You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by su...@apache.org on 2011/09/03 02:29:20 UTC
svn commit: r1164770 - in /hadoop/common/branches/branch-0.20-security: ./
src/hdfs/org/apache/hadoop/hdfs/ src/hdfs/org/apache/hadoop/hdfs/protocol/
src/hdfs/org/apache/hadoop/hdfs/server/namenode/
src/test/org/apache/hadoop/hdfs/
Author: suresh
Date: Sat Sep 3 00:29:19 2011
New Revision: 1164770
URL: http://svn.apache.org/viewvc?rev=1164770&view=rev
Log:
Port from 0.20-append - HDFS-1520. Lightweight NameNode operation recoverLease to trigger
lease recovery. Contributed by Hairong Kuang.
Modified:
hadoop/common/branches/branch-0.20-security/CHANGES.txt
hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/DistributedFileSystem.java
hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java
hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestDFSClientRetries.java
hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestLeaseRecovery2.java
Modified: hadoop/common/branches/branch-0.20-security/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/CHANGES.txt?rev=1164770&r1=1164769&r2=1164770&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20-security/CHANGES.txt Sat Sep 3 00:29:19 2011
@@ -22,6 +22,9 @@ Release 0.20.205.0 - unreleased
HDFS-895. Allow hflush/sync to occur in parallel with new writes to
the file. (Todd Lipcon via hairong)
+ HDFS-1520. Lightweight NameNode operation recoverLease to trigger
+ lease recovery. (Hairong Kuang via dhruba)
+
BUG FIXES
MAPREDUCE-2324. Removed usage of broken
Modified: hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java?rev=1164770&r1=1164769&r2=1164770&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java Sat Sep 3 00:29:19 2011
@@ -552,6 +552,22 @@ public class DFSClient implements FSCons
}
/**
+ * Recover a file's lease
+ * @param src a file's path
+ * @throws IOException
+ */
+ void recoverLease(String src) throws IOException {
+ checkOpen();
+
+ try {
+ namenode.recoverLease(src, clientName);
+ } catch (RemoteException re) {
+ throw re.unwrapRemoteException(FileNotFoundException.class,
+ AccessControlException.class);
+ }
+ }
+
+ /**
* Append to an existing HDFS file.
*
* @param src file name
Modified: hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/DistributedFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/DistributedFileSystem.java?rev=1164770&r1=1164769&r2=1164770&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/DistributedFileSystem.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/DistributedFileSystem.java Sat Sep 3 00:29:19 2011
@@ -188,6 +188,15 @@ public class DistributedFileSystem exten
dfs.open(getPathName(f), bufferSize, verifyChecksum, statistics));
}
+ /**
+ * Trigger the lease recovery of a file
+ * @param f a file
+ * @throws IOException if an error occurs
+ */
+ public void recoverLease(Path f) throws IOException {
+ dfs.recoverLease(getPathName(f));
+ }
+
/** This optional operation is not yet supported. */
public FSDataOutputStream append(Path f, int bufferSize,
Progressable progress) throws IOException {
Modified: hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientProtocol.java?rev=1164770&r1=1164769&r2=1164770&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientProtocol.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientProtocol.java Sat Sep 3 00:29:19 2011
@@ -50,12 +50,9 @@ public interface ClientProtocol extends
* Compared to the previous version the following changes have been introduced:
* (Only the latest change is reflected.
* The log of historical changes can be retrieved from the svn).
- * 61: Serialized format of BlockTokenIdentifier changed to contain
- * multiple blocks within a single BlockTokenIdentifier
- *
- * (bumped to 61 to bring in line with trunk)
+ * 62: Introduce a lightweight recoverLease RPC
*/
- public static final long versionID = 61L;
+ public static final long versionID = 62L;
///////////////////////////////////////
// File contents
@@ -135,6 +132,14 @@ public interface ClientProtocol extends
* @throws IOException if other errors occur.
*/
public LocatedBlock append(String src, String clientName) throws IOException;
+
+ /**
+ * Trigger lease recovery to happen
+ * @param src path of the file to trigger lease recovery
+ * @param clientName name of the current client
+ * @throws IOException
+ */
+ public void recoverLease(String src, String clientName) throws IOException;
/**
* Set replication for an existing file.
Modified: hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1164770&r1=1164769&r2=1164770&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Sat Sep 3 00:29:19 2011
@@ -1216,48 +1216,7 @@ public class FSNamesystem implements FSC
try {
INode myFile = dir.getFileINode(src);
- if (myFile != null && myFile.isUnderConstruction()) {
- INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction) myFile;
- //
- // If the file is under construction , then it must be in our
- // leases. Find the appropriate lease record.
- //
- Lease lease = leaseManager.getLeaseByPath(src);
- if (lease == null) {
- throw new AlreadyBeingCreatedException(
- "failed to create file " + src + " for " + holder +
- " on client " + clientMachine +
- " because pendingCreates is non-null but no leases found.");
- }
- //
- // We found the lease for this file. And surprisingly the original
- // holder is trying to recreate this file. This should never occur.
- //
- if (lease.getHolder().equals(holder)) {
- throw new AlreadyBeingCreatedException(
- "failed to create file " + src + " for " + holder +
- " on client " + clientMachine +
- " because current leaseholder is trying to recreate file.");
- }
- assert lease.getHolder().equals(pendingFile.getClientName()) :
- "Current lease holder " + lease.getHolder() +
- " does not match file creator " + pendingFile.getClientName();
- //
- // Current lease holder is different from the requester.
- // If the original holder has not renewed in the last SOFTLIMIT
- // period, then start lease recovery.
- //
- if (lease.expiredSoftLimit()) {
- LOG.info("startFile: recover lease " + lease + ", src=" + src +
- " from client " + pendingFile.clientName);
- internalReleaseLease(lease, src);
- }
- throw new AlreadyBeingCreatedException("failed to create file " + src + " for " + holder +
- " on client " + clientMachine +
- ", because this file is already being created by " +
- pendingFile.getClientName() +
- " on " + pendingFile.getClientMachine());
- }
+ recoverLeaseInternal(myFile, src, holder, clientMachine);
try {
verifyReplication(src, replication, clientMachine);
@@ -1332,6 +1291,90 @@ public class FSNamesystem implements FSC
}
/**
+ * Trigger lease recovery for a file.
+ * When the method returns successfully, the lease has been recovered and
+ * the file is closed.
+ *
+ * @param src the path of the file whose lease should be recovered
+ * @param holder the lease holder's name
+ * @param clientMachine the client machine's name
+ * @throws IOException
+ */
+ synchronized void recoverLease(String src, String holder, String clientMachine)
+ throws IOException {
+ if (isInSafeMode()) {
+ throw new SafeModeException(
+ "Cannot recover the lease of " + src, safeMode);
+ }
+ if (!DFSUtil.isValidName(src)) {
+ throw new IOException("Invalid file name: " + src);
+ }
+
+ INode inode = dir.getFileINode(src);
+ if (inode == null) {
+ throw new FileNotFoundException("File not found " + src);
+ }
+
+ if (isPermissionEnabled) {
+ checkPathAccess(src, FsAction.WRITE);
+ }
+
+ recoverLeaseInternal(inode, src, holder, clientMachine);
+ }
+
+ private void recoverLeaseInternal(INode fileInode,
+ String src, String holder, String clientMachine)
+ throws IOException {
+ if (fileInode != null && fileInode.isUnderConstruction()) {
+ INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction) fileInode;
+ //
+ // The file is under construction, so it should be covered by a lease.
+ // First look up the lease (if any) held by the requesting client.
+ //
+ Lease lease = leaseManager.getLease(holder);
+ //
+ // If the requester's lease is also the lease on this path, the current
+ // leaseholder is trying to recreate the file. This should never occur.
+ //
+ if (lease != null) {
+ Lease leaseFile = leaseManager.getLeaseByPath(src);
+ if (leaseFile != null && leaseFile.equals(lease)) {
+ throw new AlreadyBeingCreatedException(
+ "failed to create file " + src + " for " + holder +
+ " on client " + clientMachine +
+ " because current leaseholder is trying to recreate file.");
+ }
+ }
+ //
+ // Find the original holder.
+ //
+ lease = leaseManager.getLease(pendingFile.clientName);
+ if (lease == null) {
+ throw new AlreadyBeingCreatedException(
+ "failed to create file " + src + " for " + holder +
+ " on client " + clientMachine +
+ " because pendingCreates is non-null but no leases found.");
+ }
+ //
+ // If the original holder has not renewed in the last SOFTLIMIT
+ // period, then start lease recovery.
+ //
+ if (lease.expiredSoftLimit()) {
+ LOG.info("startFile: recover lease " + lease + ", src=" + src +
+ " from client " + pendingFile.clientName);
+ internalReleaseLease(lease, src);
+ }
+ throw new AlreadyBeingCreatedException(
+ "failed to create file " + src + " for " + holder +
+ " on client " + clientMachine +
+ ", because this file is already being created by " +
+ pendingFile.getClientName() +
+ " on " + pendingFile.getClientMachine());
+ }
+
+ }
+
+ /**
* Append to an existing file in the namespace.
*/
LocatedBlock appendFile(String src, String holder, String clientMachine
Modified: hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=1164770&r1=1164769&r2=1164770&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java Sat Sep 3 00:29:19 2011
@@ -575,6 +575,12 @@ public class NameNode implements ClientP
}
/** {@inheritDoc} */
+ public void recoverLease(String src, String clientName) throws IOException {
+ String clientMachine = getClientMachine();
+ namesystem.recoverLease(src, clientName, clientMachine);
+ }
+
+ /** {@inheritDoc} */
public boolean setReplication(String src,
short replication
) throws IOException {
Modified: hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestDFSClientRetries.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestDFSClientRetries.java?rev=1164770&r1=1164769&r2=1164770&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestDFSClientRetries.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestDFSClientRetries.java Sat Sep 3 00:29:19 2011
@@ -289,6 +289,8 @@ public class TestDFSClientRetries extend
public void setTimes(String src, long mtime, long atime) throws IOException {}
+ public void recoverLease(String src, String clientName) throws IOException {}
+
public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer)
throws IOException {
return null;
Modified: hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestLeaseRecovery2.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestLeaseRecovery2.java?rev=1164770&r1=1164769&r2=1164770&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestLeaseRecovery2.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestLeaseRecovery2.java Sat Sep 3 00:29:19 2011
@@ -44,7 +44,9 @@ public class TestLeaseRecovery2 extends
static final long BLOCK_SIZE = 1024;
static final int FILE_SIZE = 1024*16;
static final short REPLICATION_NUM = (short)3;
- static byte[] buffer = new byte[FILE_SIZE];
+ private static byte[] buffer = new byte[FILE_SIZE];
+ private final Configuration conf = new Configuration();
+ private final int bufferSize = conf.getInt("io.file.buffer.size", 4096);
static private String fakeUsername = "fakeUser1";
static private String fakeGroup = "supergroup";
@@ -52,9 +54,6 @@ public class TestLeaseRecovery2 extends
public void testBlockSynchronization() throws Exception {
final long softLease = 1000;
final long hardLease = 60 * 60 *1000;
- final short repl = 3;
- final Configuration conf = new Configuration();
- final int bufferSize = conf.getInt("io.file.buffer.size", 4096);
conf.setLong("dfs.block.size", BLOCK_SIZE);
conf.setInt("dfs.heartbeat.interval", 1);
// conf.setInt("io.bytes.per.checksum", 16);
@@ -75,80 +74,27 @@ public class TestLeaseRecovery2 extends
//create a file
DistributedFileSystem dfs = (DistributedFileSystem)cluster.getFileSystem();
- // create a random file name
- String filestr = "/foo" + AppendTestUtil.nextInt();
- System.out.println("filestr=" + filestr);
- Path filepath = new Path(filestr);
- FSDataOutputStream stm = dfs.create(filepath, true,
- bufferSize, repl, BLOCK_SIZE);
- assertTrue(dfs.dfs.exists(filestr));
-
- // write random number of bytes into it.
int size = AppendTestUtil.nextInt(FILE_SIZE);
- System.out.println("size=" + size);
- stm.write(buffer, 0, size);
-
- // sync file
- AppendTestUtil.LOG.info("sync");
- stm.sync();
- AppendTestUtil.LOG.info("leasechecker.interruptAndJoin()");
- dfs.dfs.leasechecker.interruptAndJoin();
+ Path filepath = createFile(dfs, size);
// set the soft limit to be 1 second so that the
// namenode triggers lease recovery on next attempt to write-for-open.
cluster.setLeasePeriod(softLease, hardLease);
- // try to re-open the file before closing the previous handle. This
- // should fail but will trigger lease recovery.
- {
- UserGroupInformation ugi =
- UserGroupInformation.createUserForTesting(fakeUsername,
- new String [] { fakeGroup});
-
- FileSystem dfs2 = DFSTestUtil.getFileSystemAs(ugi, conf);
-
- boolean done = false;
- for(int i = 0; i < 10 && !done; i++) {
- AppendTestUtil.LOG.info("i=" + i);
- try {
- dfs2.create(filepath, false, bufferSize, repl, BLOCK_SIZE);
- fail("Creation of an existing file should never succeed.");
- } catch (IOException ioe) {
- final String message = ioe.getMessage();
- if (message.contains("file exists")) {
- AppendTestUtil.LOG.info("done", ioe);
- done = true;
- }
- else if (message.contains(AlreadyBeingCreatedException.class.getSimpleName())) {
- AppendTestUtil.LOG.info("GOOD! got " + message);
- }
- else {
- AppendTestUtil.LOG.warn("UNEXPECTED IOException", ioe);
- }
- }
-
- if (!done) {
- AppendTestUtil.LOG.info("sleep " + 5000 + "ms");
- try {Thread.sleep(5000);} catch (InterruptedException e) {}
- }
- }
- assertTrue(done);
- }
+ recoverLeaseUsingCreate(filepath);
+ verifyFile(dfs, filepath, actual, size);
+
+ //test recoverLease
+ size = AppendTestUtil.nextInt(FILE_SIZE);
+ filepath = createFile(dfs, size);
- AppendTestUtil.LOG.info("Lease for file " + filepath + " is recovered. "
- + "Validating its contents now...");
+ // set the soft limit to be 1 second so that the
+ // namenode triggers lease recovery on the next recoverLease call.
+ cluster.setLeasePeriod(softLease, hardLease);
+
+ recoverLease(filepath);
+ verifyFile(dfs, filepath, actual, size);
- // verify that file-size matches
- assertTrue("File should be " + size + " bytes, but is actually " +
- " found to be " + dfs.getFileStatus(filepath).getLen() +
- " bytes",
- dfs.getFileStatus(filepath).getLen() == size);
-
- // verify that there is enough data to read.
- System.out.println("File size is good. Now validating sizes from datanodes...");
- FSDataInputStream stmin = dfs.open(filepath);
- stmin.readFully(0, actual, 0, size);
- stmin.close();
}
finally {
try {
@@ -158,4 +104,112 @@ public class TestLeaseRecovery2 extends
}
}
}
+
+ /**
+  * Asks the NameNode, as a second (fake) user, to recover the lease on
+  * the given file. The call is retried until it returns normally or the
+  * attempt budget is used up, in which case the test fails instead of
+  * hanging forever.
+  *
+  * @param filepath file whose lease should be recovered
+  * @throws IOException if the second user's file system cannot be obtained
+  * @throws InterruptedException if interrupted while waiting between attempts
+  */
+ private void recoverLease(Path filepath) throws IOException, InterruptedException {
+   // Same retry budget as recoverLeaseUsingCreate: 10 attempts, 5s apart.
+   final int maxAttempts = 10;
+   final long retryIntervalMs = 5000;
+
+   UserGroupInformation ugi =
+       UserGroupInformation.createUserForTesting(fakeUsername,
+           new String [] { fakeGroup});
+
+   DistributedFileSystem dfs2 = (DistributedFileSystem)
+       DFSTestUtil.getFileSystemAs(ugi, conf);
+
+   boolean done = false;
+   for (int i = 0; i < maxAttempts && !done; i++) {
+     try {
+       dfs2.recoverLease(filepath);
+       done = true;
+     } catch (IOException ioe) {
+       // getMessage() may be null; String.valueOf keeps the check NPE-free.
+       final String message = String.valueOf(ioe.getMessage());
+       if (message.contains(AlreadyBeingCreatedException.class.getSimpleName())) {
+         AppendTestUtil.LOG.info("GOOD! got " + message);
+       }
+       else {
+         AppendTestUtil.LOG.warn("UNEXPECTED IOException", ioe);
+       }
+     }
+
+     if (!done) {
+       // Log the interval that is actually slept, and let an interrupt
+       // propagate (the method already declares InterruptedException).
+       AppendTestUtil.LOG.info("sleep " + retryIntervalMs + "ms");
+       Thread.sleep(retryIntervalMs);
+     }
+   }
+   assertTrue("Lease of " + filepath + " was not recovered after "
+       + maxAttempts + " attempts", done);
+ }
+
+ // create a file, write and sync size bytes, and leave it open so that a
+ // later re-open (or recoverLease) by another client triggers lease recovery.
+ private Path createFile(DistributedFileSystem dfs, int size)
+ throws IOException, InterruptedException {
+ // create a random file name
+ String filestr = "/foo" + AppendTestUtil.nextInt();
+ System.out.println("filestr=" + filestr);
+ Path filepath = new Path(filestr);
+ FSDataOutputStream stm = dfs.create(filepath, true,
+ bufferSize, REPLICATION_NUM, BLOCK_SIZE);
+ assertTrue(dfs.dfs.exists(filestr));
+
+ // write random number of bytes into it.
+ System.out.println("size=" + size);
+ stm.write(buffer, 0, size);
+
+ // sync file
+ AppendTestUtil.LOG.info("sync");
+ stm.sync();
+ AppendTestUtil.LOG.info("leasechecker.interruptAndJoin()");
+ dfs.dfs.leasechecker.interruptAndJoin();
+ return filepath;
+ }
+
+ private void recoverLeaseUsingCreate(Path filepath)
+ throws IOException, InterruptedException {
+ UserGroupInformation ugi =
+ UserGroupInformation.createUserForTesting(fakeUsername,
+ new String [] { fakeGroup});
+ FileSystem dfs2 = DFSTestUtil.getFileSystemAs(ugi, conf);
+
+ boolean done = false;
+ for(int i = 0; i < 10 && !done; i++) {
+ AppendTestUtil.LOG.info("i=" + i);
+ try {
+ dfs2.create(filepath, false, bufferSize, (short)1, BLOCK_SIZE);
+ fail("Creation of an existing file should never succeed.");
+ } catch (IOException ioe) {
+ final String message = ioe.getMessage();
+ if (message.contains("file exists")) {
+ AppendTestUtil.LOG.info("done", ioe);
+ done = true;
+ }
+ else if (message.contains(AlreadyBeingCreatedException.class.getSimpleName())) {
+ AppendTestUtil.LOG.info("GOOD! got " + message);
+ }
+ else {
+ AppendTestUtil.LOG.warn("UNEXPECTED IOException", ioe);
+ }
+ }
+
+ if (!done) {
+ AppendTestUtil.LOG.info("sleep " + 5000 + "ms");
+ try {Thread.sleep(5000);} catch (InterruptedException e) {}
+ }
+ }
+ assertTrue(done);
+
+ }
+
+ private void verifyFile(FileSystem dfs, Path filepath, byte[] actual,
+ int size) throws IOException {
+ AppendTestUtil.LOG.info("Lease for file " + filepath + " is recovered. "
+ + "Validating its contents now...");
+
+ // verify that file-size matches
+ assertTrue("File should be " + size + " bytes, but is actually " +
+ " found to be " + dfs.getFileStatus(filepath).getLen() +
+ " bytes",
+ dfs.getFileStatus(filepath).getLen() == size);
+
+ // verify that there is enough data to read.
+ System.out.println("File size is good. Now validating sizes from datanodes...");
+ FSDataInputStream stmin = dfs.open(filepath);
+ stmin.readFully(0, actual, 0, size);
+ stmin.close();
+ }
}