You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ra...@apache.org on 2008/05/22 20:21:34 UTC

svn commit: r659188 - in /hadoop/core/trunk: CHANGES.txt src/java/org/apache/hadoop/dfs/FSDataset.java src/java/org/apache/hadoop/fs/DU.java src/test/org/apache/hadoop/fs/TestDU.java

Author: rangadi
Date: Thu May 22 11:21:33 2008
New Revision: 659188

URL: http://svn.apache.org/viewvc?rev=659188&view=rev
Log:
HADOOP-3232. DU class runs the 'du' command in a separate thread so
that it does not block the user. DataNode misses heartbeats on large
nodes otherwise. (Johan Oskarsson via rangadi)

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSDataset.java
    hadoop/core/trunk/src/java/org/apache/hadoop/fs/DU.java
    hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestDU.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=659188&r1=659187&r2=659188&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Thu May 22 11:21:33 2008
@@ -162,6 +162,10 @@
     HADOOP-2867. Adds the task's CWD to its LD_LIBRARY_PATH. 
     (Amareshwari Sriramadasu via ddas)
 
+    HADOOP-3232. DU class runs the 'du' command in a separate thread so
+    that it does not block the user. DataNode misses heartbeats on large
+    nodes otherwise. (Johan Oskarsson via rangadi)
+
   OPTIMIZATIONS
 
     HADOOP-3274. The default constructor of BytesWritable creates empty 

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSDataset.java?rev=659188&r1=659187&r2=659188&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSDataset.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSDataset.java Thu May 22 11:21:33 2008
@@ -316,6 +316,7 @@
       }
       this.usage = new DF(parent, conf);
       this.dfsUsage = new DU(parent, conf);
+      this.dfsUsage.start();
     }
 
     void decDfsUsed(long value) {
@@ -1006,6 +1007,14 @@
   public void shutdown() {
     if (mbeanName != null)
       MBeanUtil.unregisterMBean(mbeanName);
+    
+    if(volumes != null) {
+      for (FSVolume volume : volumes.volumes) {
+        if(volume != null) {
+          volume.dfsUsage.shutdown();
+        }
+      }
+    }
   }
 
   public String getStorageInfo() {

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/fs/DU.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/fs/DU.java?rev=659188&r1=659187&r2=659188&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/fs/DU.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/fs/DU.java Thu May 22 11:21:33 2008
@@ -18,46 +18,152 @@
 package org.apache.hadoop.fs;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.dfs.FSConstants;
 import org.apache.hadoop.util.Shell;
 
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
 
 /** Filesystem disk space usage statistics.  Uses the unix 'du' program*/
 public class DU extends Shell {
   private String  dirPath;
 
-  private long used;
-  
+  private AtomicLong used = new AtomicLong();
+  private volatile boolean shouldRun = true;
+  private Thread refreshUsed;
+  private IOException duException = null;
+  private long refreshInterval;
+  
+  /**
+   * Keeps track of disk usage.
+   * @param path the path to check disk usage in
+   * @param interval refresh the disk usage at this interval
+   * @throws IOException if we fail to refresh the disk usage
+   */
   public DU(File path, long interval) throws IOException {
-    super(interval);
+    super(0);
+    
+    //we set the Shell interval to 0 so it will always run our command
+    //and use this one to set the thread sleep interval
+    this.refreshInterval = interval;
     this.dirPath = path.getCanonicalPath();
+    
+    //populate the used variable
+    run();
   }
   
+  /**
+   * Keeps track of disk usage.
+   * @param path the path to check disk usage in
+   * @param conf configuration object
+   * @throws IOException if we fail to refresh the disk usage
+   */
   public DU(File path, Configuration conf) throws IOException {
-    this(path, conf.getLong("dfs.blockreport.intervalMsec",
-        FSConstants.BLOCKREPORT_INTERVAL));
-  }
-  
-  synchronized public void decDfsUsed(long value) {
-    used -= value;
+    this(path, 600000L);
+    //10 minutes default refresh interval
   }
 
-  synchronized public void incDfsUsed(long value) {
-    used += value;
+  /**
+   * This thread refreshes the "used" variable.
+   * 
+   * Future improvements could be to not permanently
+   * run this thread, instead run when getUsed is called.
+   **/
+  class DURefreshThread implements Runnable {
+    
+    public void run() {
+      
+      while(shouldRun) {
+
+        try {
+          Thread.sleep(refreshInterval);
+          
+          try {
+            //update the used variable
+            DU.this.run();
+          } catch (IOException e) {
+            synchronized (DU.this) {
+              //save the latest exception so we can return it in getUsed()
+              duException = e;
+            }
+            
+            LOG.warn("Could not get disk usage information", e);
+          }
+        } catch (InterruptedException e) {
+        }
+      }
+    }
   }
   
-  synchronized public long getUsed() throws IOException { 
-    run();
-    return used;
+  /**
+   * Decrease how much disk space we use.
+   * @param value decrease by this value
+   */
+  public void decDfsUsed(long value) {
+    used.addAndGet(-value);
+  }
+
+  /**
+   * Increase how much disk space we use.
+   * @param value increase by this value
+   */
+  public void incDfsUsed(long value) {
+    used.addAndGet(value);
+  }
+  
+  /**
+   * @return disk space used 
+   * @throws IOException if the shell command fails
+   */
+  public long getUsed() throws IOException {
+    //if the updating thread isn't started, update on demand
+    if(refreshUsed == null) {
+      run();
+    } else {
+      synchronized (DU.this) {
+        //if an exception was thrown in the last run, rethrow
+        if(duException != null) {
+          IOException tmp = duException;
+          duException = null;
+          throw tmp;
+        }
+      }
+    }
+    
+    return used.longValue();
   }
 
+  /**
+   * @return the path of which we're keeping track of disk usage
+   */
   public String getDirPath() {
     return dirPath;
   }
   
+  /**
+   * Start the disk usage checking thread.
+   */
+  public void start() {
+    //only start the thread if the interval is sane
+    if(refreshInterval > 0) {
+      refreshUsed = new Thread(new DURefreshThread(), 
+          "refreshUsed-"+dirPath);
+      refreshUsed.setDaemon(true);
+      refreshUsed.start();
+    }
+  }
+  
+  /**
+   * Shut down the refreshing thread.
+   */
+  public void shutdown() {
+    this.shouldRun = false;
+    
+    if(this.refreshUsed != null) {
+      this.refreshUsed.interrupt();
+    }
+  }
   
   public String toString() {
     return
@@ -66,25 +172,26 @@
   }
 
   protected String[] getExecString() {
-    return new String[] {"du","-sk", dirPath};
+    return new String[] {"du", "-sk", dirPath};
   }
   
   protected void parseExecResult(BufferedReader lines) throws IOException {
     String line = lines.readLine();
     if (line == null) {
-      throw new IOException( "Expecting a line not the end of stream" );
+      throw new IOException("Expecting a line not the end of stream");
     }
     String[] tokens = line.split("\t");
     if(tokens.length == 0) {
       throw new IOException("Illegal du output");
     }
-    this.used = Long.parseLong(tokens[0])*1024;
+    this.used.set(Long.parseLong(tokens[0])*1024);
   }
 
   public static void main(String[] args) throws Exception {
     String path = ".";
-    if (args.length > 0)
+    if (args.length > 0) {
       path = args[0];
+    }
 
     System.out.println(new DU(new File(path), new Configuration()).toString());
   }

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestDU.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestDU.java?rev=659188&r1=659187&r2=659188&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestDU.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestDU.java Thu May 22 11:21:33 2008
@@ -71,9 +71,25 @@
 
     Thread.sleep(5000); // let the metadata updater catch up
     
-    DU du = new DU(file, 0);
+    DU du = new DU(file, 10000);
+    du.start();
     long duSize = du.getUsed();
+    du.shutdown();
 
     assertEquals(writtenSize, duSize);
+    
+    //test with 0 interval, will not launch thread 
+    du = new DU(file, 0);
+    du.start();
+    duSize = du.getUsed();
+    du.shutdown();
+    
+    assertEquals(writtenSize, duSize);  
+    
+    //test without launching thread 
+    du = new DU(file, 10000);
+    duSize = du.getUsed();
+    
+    assertEquals(writtenSize, duSize);     
   }
 }