You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by sz...@apache.org on 2011/04/20 19:23:17 UTC

svn commit: r1095461 - in /hadoop/hdfs/trunk: CHANGES.txt src/java/org/apache/hadoop/hdfs/DFSClient.java src/test/hdfs/org/apache/hadoop/hdfs/TestDistributedFileSystem.java

Author: szetszwo
Date: Wed Apr 20 17:23:17 2011
New Revision: 1095461

URL: http://svn.apache.org/viewvc?rev=1095461&view=rev
Log:
HDFS-1840. In DFSClient, terminate the lease renewing thread when all files being written are closed for a grace period, and start a new thread when new files are opened for write.

Modified:
    hadoop/hdfs/trunk/CHANGES.txt
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDistributedFileSystem.java

Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=1095461&r1=1095460&r2=1095461&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Wed Apr 20 17:23:17 2011
@@ -121,6 +121,10 @@ Trunk (unreleased changes)
     HDFS-1844. Move "fs -help" shell command tests from HDFS to COMMOM; see
     also HADOOP-7230.  (Daryn Sharp via szetszwo)
 
+    HDFS-1840. In DFSClient, terminate the lease renewing thread when all files
+    being written are closed for a grace period, and start a new thread when
+    new files are opened for write.  (szetszwo)
+
   OPTIMIZATIONS
 
     HDFS-1458. Improve checkpoint performance by avoiding unnecessary image

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1095461&r1=1095460&r2=1095461&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java Wed Apr 20 17:23:17 2011
@@ -45,6 +45,7 @@ import javax.net.SocketFactory;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
@@ -128,7 +129,6 @@ public class DFSClient implements FSCons
   private volatile long serverDefaultsLastUpdate;
   static Random r = new Random();
   final String clientName;
-  final LeaseChecker leasechecker = new LeaseChecker();
   Configuration conf;
   long defaultBlockSize;
   private short defaultReplication;
@@ -138,6 +138,7 @@ public class DFSClient implements FSCons
   final DataTransferProtocol.ReplaceDatanodeOnFailure dtpReplaceDatanodeOnFailure;
   final FileSystem.Statistics stats;
   final int hdfsTimeout;    // timeout value for a DFS operation.
+  final LeaseChecker leasechecker;
 
   /**
    * The locking hierarchy is to first acquire lock on DFSClient object, followed by 
@@ -253,6 +254,7 @@ public class DFSClient implements FSCons
 
     // The hdfsTimeout is currently the same as the ipc timeout 
     this.hdfsTimeout = Client.getTimeout(conf);
+    this.leasechecker = new LeaseChecker(hdfsTimeout);
 
     this.ugi = UserGroupInformation.getCurrentUser();
     
@@ -1358,38 +1360,106 @@ public class DFSClient implements FSCons
     }
   }
 
-  boolean isLeaseCheckerStarted() {
-    return leasechecker.daemon != null;
-  }
-
   /** Lease management*/
-  class LeaseChecker implements Runnable {
+  class LeaseChecker {
+    static final long LEASE_RENEWER_GRACE_DEFAULT = 60*1000L;
+    static final long LEASE_RENEWER_SLEEP_DEFAULT = 1000L;
     /** A map from src -> DFSOutputStream of files that are currently being
      * written by this client.
      */
     private final SortedMap<String, OutputStream> pendingCreates
         = new TreeMap<String, OutputStream>();
+    /** The time in milliseconds that the map became empty. */
+    private long emptyTime = Long.MAX_VALUE;
+    /** A fixed lease renewal time period in milliseconds */
+    private final long renewal;
 
+    /** A daemon for renewing lease */
     private Daemon daemon = null;
-    
+    /** Only the daemon with currentId should run. */
+    private int currentId = 0;
+
+    /** 
+     * A period in milliseconds that the lease renewer thread should run
+     * after the map became empty.
+     * If the map is empty for a time period longer than the grace period,
+     * the renewer should terminate.  
+     */
+    private long gracePeriod;
+    /**
+     * The time period in milliseconds
+     * that the renewer sleeps for each iteration. 
+     */
+    private volatile long sleepPeriod;
+
+    private LeaseChecker(final long timeout) {
+      this.renewal = (timeout > 0 && timeout < LEASE_SOFTLIMIT_PERIOD)? 
+          timeout/2: LEASE_SOFTLIMIT_PERIOD/2;
+      setGraceSleepPeriod(LEASE_RENEWER_GRACE_DEFAULT);
+    }
+
+    /** Set the grace period and adjust the sleep period accordingly. */
+    void setGraceSleepPeriod(final long gracePeriod) {
+      if (gracePeriod < 100L) {
+        throw new HadoopIllegalArgumentException(gracePeriod
+            + " = gracePeriod < 100ms is too small.");
+      }
+      synchronized(this) {
+        this.gracePeriod = gracePeriod;
+      }
+      final long half = gracePeriod/2;
+      this.sleepPeriod = half < LEASE_RENEWER_SLEEP_DEFAULT?
+          half: LEASE_RENEWER_SLEEP_DEFAULT;
+    }
+
+    /** Is the daemon running? */
+    synchronized boolean isRunning() {
+      return daemon != null && daemon.isAlive();
+    }
+
+    /** Is the empty period longer than the grace period? */  
+    private synchronized boolean isRenewerExpired() {
+      return emptyTime != Long.MAX_VALUE
+          && System.currentTimeMillis() - emptyTime > gracePeriod;
+    }
+
     synchronized void put(String src, OutputStream out) {
       if (clientRunning) {
-        if (daemon == null) {
-          daemon = new Daemon(this);
+        if (daemon == null || isRenewerExpired()) {
+          //start a new daemon with a new id.
+          final int id = ++currentId;
+          daemon = new Daemon(new Runnable() {
+            @Override
+            public void run() {
+              try {
+                LeaseChecker.this.run(id);
+              } catch(InterruptedException e) {
+                if (LOG.isDebugEnabled()) {
+                  LOG.debug(LeaseChecker.this.getClass().getSimpleName()
+                      + " is interrupted.", e);
+                }
+              }
+            }
+          });
           daemon.start();
         }
         pendingCreates.put(src, out);
+        emptyTime = Long.MAX_VALUE;
       }
     }
     
     synchronized void remove(String src) {
       pendingCreates.remove(src);
+      if (pendingCreates.isEmpty() && emptyTime == Long.MAX_VALUE) {
+        //discover the first time that the map is empty.
+        emptyTime = System.currentTimeMillis();
+      }
     }
     
     void interruptAndJoin() throws InterruptedException {
       Daemon daemonCopy = null;
       synchronized (this) {
-        if (daemon != null) {
+        if (isRunning()) {
           daemon.interrupt();
           daemonCopy = daemon;
         }
@@ -1456,37 +1526,30 @@ public class DFSClient implements FSCons
      * Periodically check in with the namenode and renew all the leases
      * when the lease period is half over.
      */
-    public void run() {
-      long lastRenewed = 0;
-      int renewal = (int)(LEASE_SOFTLIMIT_PERIOD / 2);
-      if (hdfsTimeout > 0) {
-        renewal = Math.min(renewal, hdfsTimeout/2);
-      }
-      while (clientRunning && !Thread.interrupted()) {
-        if (System.currentTimeMillis() - lastRenewed > renewal) {
+    private void run(final int id) throws InterruptedException {
+      for(long lastRenewed = System.currentTimeMillis();
+          clientRunning && !Thread.interrupted();
+          Thread.sleep(sleepPeriod)) {
+        if (System.currentTimeMillis() - lastRenewed >= renewal) {
           try {
             renew();
             lastRenewed = System.currentTimeMillis();
           } catch (SocketTimeoutException ie) {
-            LOG.warn("Problem renewing lease for " + clientName +
-                     " for a period of " + (hdfsTimeout/1000) +
-                     " seconds. Shutting down HDFS client...", ie);
+            LOG.warn("Failed to renew lease for " + clientName + " for "
+                + (renewal/1000) + " seconds.  Aborting ...", ie);
             abort();
             break;
           } catch (IOException ie) {
-            LOG.warn("Problem renewing lease for " + clientName +
-                     " for a period of " + (hdfsTimeout/1000) +
-                     " seconds. Will retry shortly...", ie);
+            LOG.warn("Failed to renew lease for " + clientName + " for "
+                + (renewal/1000) + " seconds.  Will retry shortly ...", ie);
           }
         }
 
-        try {
-          Thread.sleep(1000);
-        } catch (InterruptedException ie) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug(this + " is interrupted.", ie);
+        synchronized(this) {
+          if (id != currentId || isRenewerExpired()) {
+            //no longer the current daemon or expired
+            return;
           }
-          return;
         }
       }
     }

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDistributedFileSystem.java?rev=1095461&r1=1095460&r2=1095461&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDistributedFileSystem.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDistributedFileSystem.java Wed Apr 20 17:23:17 2011
@@ -44,6 +44,10 @@ import org.junit.Test;
 public class TestDistributedFileSystem {
   private static final Random RAN = new Random();
 
+  {
+    ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
+  }
+
   private boolean dualPortTesting = false;
   
   private HdfsConfiguration getTestConfiguration() {
@@ -100,26 +104,94 @@ public class TestDistributedFileSystem {
   @Test
   public void testDFSClient() throws Exception {
     Configuration conf = getTestConfiguration();
+    final long grace = 1000L;
     MiniDFSCluster cluster = null;
 
     try {
       cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
-      final Path filepath = new Path("/test/LeaseChecker/foo");
+      final String filepathstring = "/test/LeaseChecker/foo";
+      final Path[] filepaths = new Path[4];
+      for(int i = 0; i < filepaths.length; i++) {
+        filepaths[i] = new Path(filepathstring + i);
+      }
       final long millis = System.currentTimeMillis();
 
       {
         DistributedFileSystem dfs = (DistributedFileSystem)cluster.getFileSystem();
-        assertFalse(dfs.dfs.isLeaseCheckerStarted());
-  
-        //create a file
-        FSDataOutputStream out = dfs.create(filepath);
-        assertTrue(dfs.dfs.isLeaseCheckerStarted());
+        dfs.dfs.leasechecker.setGraceSleepPeriod(grace);
+        assertFalse(dfs.dfs.leasechecker.isRunning());
   
-        //write something and close
-        out.writeLong(millis);
-        assertTrue(dfs.dfs.isLeaseCheckerStarted());
-        out.close();
-        assertTrue(dfs.dfs.isLeaseCheckerStarted());
+        {
+          //create a file
+          final FSDataOutputStream out = dfs.create(filepaths[0]);
+          assertTrue(dfs.dfs.leasechecker.isRunning());
+          //write something
+          out.writeLong(millis);
+          assertTrue(dfs.dfs.leasechecker.isRunning());
+          //close
+          out.close();
+          Thread.sleep(grace/4*3);
+          //within grace period
+          assertTrue(dfs.dfs.leasechecker.isRunning());
+          for(int i = 0; i < 3; i++) {
+            if (dfs.dfs.leasechecker.isRunning()) {
+              Thread.sleep(grace/2);
+            }
+          }
+          //passed grace period
+          assertFalse(dfs.dfs.leasechecker.isRunning());
+        }
+
+        {
+          //create file1
+          final FSDataOutputStream out1 = dfs.create(filepaths[1]);
+          assertTrue(dfs.dfs.leasechecker.isRunning());
+          //create file2
+          final FSDataOutputStream out2 = dfs.create(filepaths[2]);
+          assertTrue(dfs.dfs.leasechecker.isRunning());
+
+          //write something to file1
+          out1.writeLong(millis);
+          assertTrue(dfs.dfs.leasechecker.isRunning());
+          //close file1
+          out1.close();
+          assertTrue(dfs.dfs.leasechecker.isRunning());
+
+          //write something to file2
+          out2.writeLong(millis);
+          assertTrue(dfs.dfs.leasechecker.isRunning());
+          //close file2
+          out2.close();
+          Thread.sleep(grace/4*3);
+          //within grace period
+          assertTrue(dfs.dfs.leasechecker.isRunning());
+        }
+
+        {
+          //create file3
+          final FSDataOutputStream out3 = dfs.create(filepaths[3]);
+          assertTrue(dfs.dfs.leasechecker.isRunning());
+          Thread.sleep(grace/4*3);
+          //passed previous grace period, should still be running
+          assertTrue(dfs.dfs.leasechecker.isRunning());
+          //write something to file3
+          out3.writeLong(millis);
+          assertTrue(dfs.dfs.leasechecker.isRunning());
+          //close file3
+          out3.close();
+          assertTrue(dfs.dfs.leasechecker.isRunning());
+          Thread.sleep(grace/4*3);
+          //within grace period
+          assertTrue(dfs.dfs.leasechecker.isRunning());
+          for(int i = 0; i < 3; i++) {
+            if (dfs.dfs.leasechecker.isRunning()) {
+              Thread.sleep(grace/2);
+            }
+          }
+          //passed grace period
+          assertFalse(dfs.dfs.leasechecker.isRunning());
+        }
+
         dfs.close();
       }
 
@@ -146,15 +218,15 @@ public class TestDistributedFileSystem {
 
       {
         DistributedFileSystem dfs = (DistributedFileSystem)cluster.getFileSystem();
-        assertFalse(dfs.dfs.isLeaseCheckerStarted());
+        assertFalse(dfs.dfs.leasechecker.isRunning());
 
         //open and check the file
-        FSDataInputStream in = dfs.open(filepath);
-        assertFalse(dfs.dfs.isLeaseCheckerStarted());
+        FSDataInputStream in = dfs.open(filepaths[0]);
+        assertFalse(dfs.dfs.leasechecker.isRunning());
         assertEquals(millis, in.readLong());
-        assertFalse(dfs.dfs.isLeaseCheckerStarted());
+        assertFalse(dfs.dfs.leasechecker.isRunning());
         in.close();
-        assertFalse(dfs.dfs.isLeaseCheckerStarted());
+        assertFalse(dfs.dfs.leasechecker.isRunning());
         dfs.close();
       }