Posted to common-commits@hadoop.apache.org by su...@apache.org on 2011/09/03 02:29:20 UTC

svn commit: r1164770 - in /hadoop/common/branches/branch-0.20-security: ./ src/hdfs/org/apache/hadoop/hdfs/ src/hdfs/org/apache/hadoop/hdfs/protocol/ src/hdfs/org/apache/hadoop/hdfs/server/namenode/ src/test/org/apache/hadoop/hdfs/

Author: suresh
Date: Sat Sep  3 00:29:19 2011
New Revision: 1164770

URL: http://svn.apache.org/viewvc?rev=1164770&view=rev
Log:
Port from 0.20-append - HDFS-1520. Lightweight NameNode operation recoverLease to trigger lease recovery. Contributed by Hairong Kuang.

Modified:
    hadoop/common/branches/branch-0.20-security/CHANGES.txt
    hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/DistributedFileSystem.java
    hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
    hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java
    hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestDFSClientRetries.java
    hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestLeaseRecovery2.java

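For context before the diffs: a minimal sketch of how an application could use the new operation once this change is in. DistributedFileSystem.recoverLease(Path) and the exceptions it unwraps (FileNotFoundException, AccessControlException) come from the patch below; the path handling and the cast are illustrative only, and the call may still surface an AlreadyBeingCreatedException (wrapped in a RemoteException) while the previous writer's file is being recovered.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;

public class RecoverLeaseExample {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    Path file = new Path(args[0]);            // path of the abandoned file (illustrative)
    FileSystem fs = file.getFileSystem(conf);
    if (fs instanceof DistributedFileSystem) {
      // Ask the NameNode to reclaim the lease held by a dead or stuck writer
      // so the file can be closed and then reopened by a new writer.
      ((DistributedFileSystem) fs).recoverLease(file);
    }
  }
}
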
Modified: hadoop/common/branches/branch-0.20-security/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/CHANGES.txt?rev=1164770&r1=1164769&r2=1164770&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20-security/CHANGES.txt Sat Sep  3 00:29:19 2011
@@ -22,6 +22,9 @@ Release 0.20.205.0 - unreleased
     HDFS-895. Allow hflush/sync to occur in parallel with new writes to
     the file. (Todd Lipcon via hairong)
 
+    HDFS-1520. Lightweight NameNode operation recoverLease to trigger 
+    lease recovery. (Hairong Kuang via dhruba)
+
   BUG FIXES
 
     MAPREDUCE-2324. Removed usage of broken

Modified: hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java?rev=1164770&r1=1164769&r2=1164770&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java Sat Sep  3 00:29:19 2011
@@ -552,6 +552,22 @@ public class DFSClient implements FSCons
   }
 
   /**
+   * Recover a file's lease
+   * @param src a file's path
+   * @throws IOException
+   */
+  void recoverLease(String src) throws IOException {
+    checkOpen();
+    
+    try {
+      namenode.recoverLease(src, clientName);
+    } catch (RemoteException re) {
+      throw re.unwrapRemoteException(FileNotFoundException.class,
+                                     AccessControlException.class);
+    }
+  }
+  
+  /**
    * Append to an existing HDFS file.  
    * 
    * @param src file name

Modified: hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/DistributedFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/DistributedFileSystem.java?rev=1164770&r1=1164769&r2=1164770&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/DistributedFileSystem.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/DistributedFileSystem.java Sat Sep  3 00:29:19 2011
@@ -188,6 +188,15 @@ public class DistributedFileSystem exten
           dfs.open(getPathName(f), bufferSize, verifyChecksum, statistics));
   }
 
+  /** 
+   * Trigger the lease recovery of a file
+   * @param f a file
+   * @throws IOException if an error occurs
+   */
+  public void recoverLease(Path f) throws IOException {
+    dfs.recoverLease(getPathName(f));
+  }
+
   /** This optional operation is not yet supported. */
   public FSDataOutputStream append(Path f, int bufferSize,
       Progressable progress) throws IOException {

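One note on semantics, visible in the FSNamesystem change further down: recoverLease() only returns cleanly once the file is no longer under construction; until then the NameNode answers with an AlreadyBeingCreatedException wrapped in a RemoteException. A caller that needs to wait for recovery to finish can therefore poll, as in this hedged sketch modelled on the recoverLease() helper added to TestLeaseRecovery2 below (the attempt count and back-off are arbitrary):

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;

public class LeaseRecoveryPoller {
  /** Returns true once recoverLease() succeeds, false after maxAttempts failures. */
  static boolean recoverWithRetry(DistributedFileSystem dfs, Path f, int maxAttempts)
      throws InterruptedException {
    for (int i = 0; i < maxAttempts; i++) {
      try {
        dfs.recoverLease(f);   // returns only after the lease is recovered and the file is closed
        return true;
      } catch (IOException ioe) {
        String message = ioe.getMessage();
        if (message == null ||
            !message.contains(AlreadyBeingCreatedException.class.getSimpleName())) {
          return false;        // unexpected failure; do not keep looping
        }
        // The file is still under construction; the NameNode starts recovery
        // once the current holder's soft limit has expired. Retry after a pause.
      }
      Thread.sleep(1000);      // arbitrary back-off between attempts
    }
    return false;
  }
}
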
Modified: hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientProtocol.java?rev=1164770&r1=1164769&r2=1164770&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientProtocol.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientProtocol.java Sat Sep  3 00:29:19 2011
@@ -50,12 +50,9 @@ public interface ClientProtocol extends 
    * Compared to the previous version the following changes have been introduced:
    * (Only the latest change is reflected.
    * The log of historical changes can be retrieved from the svn).
-   * 61: Serialized format of BlockTokenIdentifier changed to contain
-   *     multiple blocks within a single BlockTokenIdentifier 
-   *     
-   *     (bumped to 61 to bring in line with trunk)
+   * 62: Introduce a lightweight recoverLease RPC
    */
-  public static final long versionID = 61L;
+  public static final long versionID = 62L;
   
   ///////////////////////////////////////
   // File contents
@@ -135,6 +132,14 @@ public interface ClientProtocol extends 
    * @throws IOException if other errors occur.
    */
   public LocatedBlock append(String src, String clientName) throws IOException;
+  
+  /**
+   * Trigger lease recovery to happen
+   * @param src path of the file to trigger lease recovery
+   * @param clientName name of the current client
+   * @throws IOException
+   */
+  public void recoverLease(String src, String clientName) throws IOException;
 
   /**
    * Set replication for an existing file.

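The versionID bump above is what keeps mismatched clients and NameNodes from disagreeing about the new method: RPC.getProxy() compares the client's ClientProtocol.versionID (now 62) against the server's version and fails with RPC.VersionMismatch if they differ. A rough sketch of a raw protocol client follows, assuming simple (non-Kerberos) authentication; the NameNode address and the arguments passed to recoverLease() are illustrative only.

import java.net.InetSocketAddress;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.ipc.RPC;

public class RawRecoverLeaseCall {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Illustrative address; real clients derive this from fs.default.name.
    InetSocketAddress nnAddr = new InetSocketAddress("namenode.example.com", 8020);
    // getProxy() checks ClientProtocol.versionID against the server's version,
    // so a pre-HDFS-1520 client (versionID 61) cannot talk to an upgraded NameNode.
    ClientProtocol namenode = (ClientProtocol) RPC.getProxy(
        ClientProtocol.class, ClientProtocol.versionID, nnAddr, conf);
    try {
      namenode.recoverLease("/illustrative/path", "example-client-name");
    } finally {
      RPC.stopProxy(namenode);
    }
  }
}
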
Modified: hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1164770&r1=1164769&r2=1164770&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Sat Sep  3 00:29:19 2011
@@ -1216,48 +1216,7 @@ public class FSNamesystem implements FSC
 
     try {
       INode myFile = dir.getFileINode(src);
-      if (myFile != null && myFile.isUnderConstruction()) {
-        INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction) myFile;
-        //
-        // If the file is under construction , then it must be in our
-        // leases. Find the appropriate lease record.
-        //
-        Lease lease = leaseManager.getLeaseByPath(src);
-        if (lease == null) {
-          throw new AlreadyBeingCreatedException(
-              "failed to create file " + src + " for " + holder +
-              " on client " + clientMachine + 
-              " because pendingCreates is non-null but no leases found.");
-        }
-        //
-        // We found the lease for this file. And surprisingly the original
-        // holder is trying to recreate this file. This should never occur.
-        //
-        if (lease.getHolder().equals(holder)) {
-          throw new AlreadyBeingCreatedException(
-              "failed to create file " + src + " for " + holder +
-              " on client " + clientMachine + 
-              " because current leaseholder is trying to recreate file.");
-        }
-        assert lease.getHolder().equals(pendingFile.getClientName()) :
-            "Current lease holder " + lease.getHolder() +
-            " does not match file creator " + pendingFile.getClientName();
-        //
-        // Current lease holder is different from the requester.
-        // If the original holder has not renewed in the last SOFTLIMIT 
-        // period, then start lease recovery.
-        //
-        if (lease.expiredSoftLimit()) {
-          LOG.info("startFile: recover lease " + lease + ", src=" + src +
-                   " from client " + pendingFile.clientName);
-          internalReleaseLease(lease, src);
-        }
-        throw new AlreadyBeingCreatedException("failed to create file " + src + " for " + holder +
-                                               " on client " + clientMachine + 
-                                               ", because this file is already being created by " +
-                                               pendingFile.getClientName() + 
-                                               " on " + pendingFile.getClientMachine());
-      }
+      recoverLeaseInternal(myFile, src, holder, clientMachine);
 
       try {
         verifyReplication(src, replication, clientMachine);
@@ -1332,6 +1291,90 @@ public class FSNamesystem implements FSC
   }
 
   /**
+   * Trigger lease recovery;
+   * When the method returns successfully, the lease has been recovered and
+   * the file is closed.
+   * 
+   * @param src the path of the file to recover the lease on
+   * @param holder the lease holder's name
+   * @param clientMachine the client machine's name
+   * @throws IOException
+   */
+  synchronized void recoverLease(String src, String holder, String clientMachine)
+  throws IOException {
+    if (isInSafeMode()) {
+      throw new SafeModeException(
+          "Cannot recover the lease of " + src, safeMode);
+    }
+    if (!DFSUtil.isValidName(src)) {
+      throw new IOException("Invalid file name: " + src);
+    }
+
+    INode inode = dir.getFileINode(src);
+    if (inode == null) {
+      throw new FileNotFoundException("File not found " + src);
+    }
+
+    if (isPermissionEnabled) {
+      checkPathAccess(src, FsAction.WRITE);
+    }
+
+    recoverLeaseInternal(inode, src, holder, clientMachine);
+  }
+  
+  private void recoverLeaseInternal(INode fileInode, 
+      String src, String holder, String clientMachine)
+  throws IOException {
+    if (fileInode != null && fileInode.isUnderConstruction()) {
+      INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction) fileInode;
+      //
+      // If the file is under construction , then it must be in our
+      // leases. Find the appropriate lease record.
+      //
+      Lease lease = leaseManager.getLease(holder);
+      //
+      // We found the lease for this file. And surprisingly the original
+      // holder is trying to recreate this file. This should never occur.
+      //
+      if (lease != null) {
+        Lease leaseFile = leaseManager.getLeaseByPath(src);
+        if (leaseFile != null && leaseFile.equals(lease)) { 
+          throw new AlreadyBeingCreatedException(
+                    "failed to create file " + src + " for " + holder +
+                    " on client " + clientMachine + 
+                    " because current leaseholder is trying to recreate file.");
+        }
+      }
+      //
+      // Find the original holder.
+      //
+      lease = leaseManager.getLease(pendingFile.clientName);
+      if (lease == null) {
+        throw new AlreadyBeingCreatedException(
+                    "failed to create file " + src + " for " + holder +
+                    " on client " + clientMachine + 
+                    " because pendingCreates is non-null but no leases found.");
+      }
+      //
+      // If the original holder has not renewed in the last SOFTLIMIT 
+      // period, then start lease recovery.
+      //
+      if (lease.expiredSoftLimit()) {
+        LOG.info("startFile: recover lease " + lease + ", src=" + src +
+                 " from client " + pendingFile.clientName);
+        internalReleaseLease(lease, src);
+      }
+      throw new AlreadyBeingCreatedException(
+                    "failed to create file " + src + " for " + holder +
+                    " on client " + clientMachine + 
+                    ", because this file is already being created by " +
+                    pendingFile.getClientName() + 
+                    " on " + pendingFile.getClientMachine());
+    }
+
+  }
+  
+  /**
    * Append to an existing file in the namespace.
    */
   LocatedBlock appendFile(String src, String holder, String clientMachine

Modified: hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=1164770&r1=1164769&r2=1164770&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java Sat Sep  3 00:29:19 2011
@@ -575,6 +575,12 @@ public class NameNode implements ClientP
   }
 
   /** {@inheritDoc} */
+  public void recoverLease(String src, String clientName) throws IOException {
+    String clientMachine = getClientMachine();
+    namesystem.recoverLease(src, clientName, clientMachine);
+  }
+  
+  /** {@inheritDoc} */
   public boolean setReplication(String src, 
                                 short replication
                                 ) throws IOException {

Modified: hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestDFSClientRetries.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestDFSClientRetries.java?rev=1164770&r1=1164769&r2=1164770&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestDFSClientRetries.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestDFSClientRetries.java Sat Sep  3 00:29:19 2011
@@ -289,6 +289,8 @@ public class TestDFSClientRetries extend
 
     public void setTimes(String src, long mtime, long atime) throws IOException {}
 
+    public void recoverLease(String src, String clientName) throws IOException {}
+
     public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer)
         throws IOException {
       return null;

Modified: hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestLeaseRecovery2.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestLeaseRecovery2.java?rev=1164770&r1=1164769&r2=1164770&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestLeaseRecovery2.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestLeaseRecovery2.java Sat Sep  3 00:29:19 2011
@@ -44,7 +44,9 @@ public class TestLeaseRecovery2 extends 
   static final long BLOCK_SIZE = 1024;
   static final int FILE_SIZE = 1024*16;
   static final short REPLICATION_NUM = (short)3;
-  static byte[] buffer = new byte[FILE_SIZE];
+  private static byte[] buffer = new byte[FILE_SIZE];
+  private final Configuration conf = new Configuration();
+  private final int bufferSize = conf.getInt("io.file.buffer.size", 4096);
   
   static private String fakeUsername = "fakeUser1";
   static private String fakeGroup = "supergroup";
@@ -52,9 +54,6 @@ public class TestLeaseRecovery2 extends 
   public void testBlockSynchronization() throws Exception {
     final long softLease = 1000;
     final long hardLease = 60 * 60 *1000;
-    final short repl = 3;
-    final Configuration conf = new Configuration();
-    final int bufferSize = conf.getInt("io.file.buffer.size", 4096);
     conf.setLong("dfs.block.size", BLOCK_SIZE);
     conf.setInt("dfs.heartbeat.interval", 1);
   //  conf.setInt("io.bytes.per.checksum", 16);
@@ -75,80 +74,27 @@ public class TestLeaseRecovery2 extends 
 
       //create a file
       DistributedFileSystem dfs = (DistributedFileSystem)cluster.getFileSystem();
-      // create a random file name
-      String filestr = "/foo" + AppendTestUtil.nextInt();
-      System.out.println("filestr=" + filestr);
-      Path filepath = new Path(filestr);
-      FSDataOutputStream stm = dfs.create(filepath, true,
-          bufferSize, repl, BLOCK_SIZE);
-      assertTrue(dfs.dfs.exists(filestr));
-
-      // write random number of bytes into it.
       int size = AppendTestUtil.nextInt(FILE_SIZE);
-      System.out.println("size=" + size);
-      stm.write(buffer, 0, size);
-
-      // sync file
-      AppendTestUtil.LOG.info("sync");
-      stm.sync();
-      AppendTestUtil.LOG.info("leasechecker.interruptAndJoin()");
-      dfs.dfs.leasechecker.interruptAndJoin();
+      Path filepath = createFile(dfs, size);
 
       // set the soft limit to be 1 second so that the
       // namenode triggers lease recovery on next attempt to write-for-open.
       cluster.setLeasePeriod(softLease, hardLease);
 
-      // try to re-open the file before closing the previous handle. This
-      // should fail but will trigger lease recovery.
-      {
-        UserGroupInformation ugi = 
-          UserGroupInformation.createUserForTesting(fakeUsername, 
-                                                    new String [] { fakeGroup});
-        
-        FileSystem dfs2 = DFSTestUtil.getFileSystemAs(ugi, conf);
-  
-        boolean done = false;
-        for(int i = 0; i < 10 && !done; i++) {
-          AppendTestUtil.LOG.info("i=" + i);
-          try {
-            dfs2.create(filepath, false, bufferSize, repl, BLOCK_SIZE);
-            fail("Creation of an existing file should never succeed.");
-          } catch (IOException ioe) {
-            final String message = ioe.getMessage();
-            if (message.contains("file exists")) {
-              AppendTestUtil.LOG.info("done", ioe);
-              done = true;
-            }
-            else if (message.contains(AlreadyBeingCreatedException.class.getSimpleName())) {
-              AppendTestUtil.LOG.info("GOOD! got " + message);
-            }
-            else {
-              AppendTestUtil.LOG.warn("UNEXPECTED IOException", ioe);
-            }
-          }
-
-          if (!done) {
-            AppendTestUtil.LOG.info("sleep " + 5000 + "ms");
-            try {Thread.sleep(5000);} catch (InterruptedException e) {}
-          }
-        }
-        assertTrue(done);
-      }
+      recoverLeaseUsingCreate(filepath);
+      verifyFile(dfs, filepath, actual, size);
+
+      //test recoverLease
+      size = AppendTestUtil.nextInt(FILE_SIZE);
+      filepath = createFile(dfs, size);
 
-      AppendTestUtil.LOG.info("Lease for file " +  filepath + " is recovered. "
-          + "Validating its contents now...");
+      // set the soft limit to be 1 second so that the
+      // namenode triggers lease recovery on next attempt to write-for-open.
+      cluster.setLeasePeriod(softLease, hardLease);
+
+      recoverLease(filepath);
+      verifyFile(dfs, filepath, actual, size);
 
-      // verify that file-size matches
-      assertTrue("File should be " + size + " bytes, but is actually " +
-                 " found to be " + dfs.getFileStatus(filepath).getLen() +
-                 " bytes",
-                 dfs.getFileStatus(filepath).getLen() == size);
-
-      // verify that there is enough data to read.
-      System.out.println("File size is good. Now validating sizes from datanodes...");
-      FSDataInputStream stmin = dfs.open(filepath);
-      stmin.readFully(0, actual, 0, size);
-      stmin.close();
     }
     finally {
       try {
@@ -158,4 +104,112 @@ public class TestLeaseRecovery2 extends 
       }
     }
   }
+  
+  private void recoverLease(Path filepath) throws IOException, InterruptedException {
+    UserGroupInformation ugi = 
+      UserGroupInformation.createUserForTesting(fakeUsername, 
+                                                new String [] { fakeGroup});
+        
+    DistributedFileSystem dfs2 = (DistributedFileSystem)
+      DFSTestUtil.getFileSystemAs(ugi, conf);
+
+    boolean done = false;
+    while (!done) {
+      try {
+        dfs2.recoverLease(filepath);
+        done = true;
+      } catch (IOException ioe) {
+        final String message = ioe.getMessage();
+        if (message.contains(AlreadyBeingCreatedException.class.getSimpleName())) {
+          AppendTestUtil.LOG.info("GOOD! got " + message);
+        }
+        else {
+          AppendTestUtil.LOG.warn("UNEXPECTED IOException", ioe);
+        }
+      }
+
+      if (!done) {
+        AppendTestUtil.LOG.info("sleep " + 5000 + "ms");
+        try {Thread.sleep(5000);} catch (InterruptedException e) {}
+      }
+    }
+  }
+  
+  // try to re-open the file before closing the previous handle. This
+  // should fail but will trigger lease recovery.
+  private Path createFile(DistributedFileSystem dfs, int size)
+  throws IOException, InterruptedException {
+    // create a random file name
+    String filestr = "/foo" + AppendTestUtil.nextInt();
+    System.out.println("filestr=" + filestr);
+    Path filepath = new Path(filestr);
+    FSDataOutputStream stm = dfs.create(filepath, true,
+        bufferSize, REPLICATION_NUM, BLOCK_SIZE);
+    assertTrue(dfs.dfs.exists(filestr));
+
+    // write random number of bytes into it.
+    System.out.println("size=" + size);
+    stm.write(buffer, 0, size);
+
+    // sync file
+    AppendTestUtil.LOG.info("sync");
+    stm.sync();
+    AppendTestUtil.LOG.info("leasechecker.interruptAndJoin()");
+    dfs.dfs.leasechecker.interruptAndJoin();
+    return filepath;
+  }
+  
+  private void recoverLeaseUsingCreate(Path filepath)
+  throws IOException, InterruptedException {
+    UserGroupInformation ugi = 
+      UserGroupInformation.createUserForTesting(fakeUsername, 
+                                                new String [] { fakeGroup});
+    FileSystem dfs2 = DFSTestUtil.getFileSystemAs(ugi, conf);
+
+    boolean done = false;
+    for(int i = 0; i < 10 && !done; i++) {
+      AppendTestUtil.LOG.info("i=" + i);
+      try {
+        dfs2.create(filepath, false, bufferSize, (short)1, BLOCK_SIZE);
+        fail("Creation of an existing file should never succeed.");
+      } catch (IOException ioe) {
+        final String message = ioe.getMessage();
+        if (message.contains("file exists")) {
+          AppendTestUtil.LOG.info("done", ioe);
+          done = true;
+        }
+        else if (message.contains(AlreadyBeingCreatedException.class.getSimpleName())) {
+          AppendTestUtil.LOG.info("GOOD! got " + message);
+        }
+        else {
+          AppendTestUtil.LOG.warn("UNEXPECTED IOException", ioe);
+        }
+      }
+
+      if (!done) {
+        AppendTestUtil.LOG.info("sleep " + 5000 + "ms");
+        try {Thread.sleep(5000);} catch (InterruptedException e) {}
+      }
+    }
+    assertTrue(done);
+
+  }
+  
+  private void verifyFile(FileSystem dfs, Path filepath, byte[] actual,
+      int size) throws IOException {
+    AppendTestUtil.LOG.info("Lease for file " +  filepath + " is recovered. "
+        + "Validating its contents now...");
+
+    // verify that file-size matches
+    assertTrue("File should be " + size + " bytes, but is actually " +
+               " found to be " + dfs.getFileStatus(filepath).getLen() +
+               " bytes",
+               dfs.getFileStatus(filepath).getLen() == size);
+
+    // verify that there is enough data to read.
+    System.out.println("File size is good. Now validating sizes from datanodes...");
+    FSDataInputStream stmin = dfs.open(filepath);
+    stmin.readFully(0, actual, 0, size);
+    stmin.close();
+  }
 }