You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ra...@apache.org on 2008/05/22 20:21:34 UTC
svn commit: r659188 - in /hadoop/core/trunk: CHANGES.txt
src/java/org/apache/hadoop/dfs/FSDataset.java
src/java/org/apache/hadoop/fs/DU.java
src/test/org/apache/hadoop/fs/TestDU.java
Author: rangadi
Date: Thu May 22 11:21:33 2008
New Revision: 659188
URL: http://svn.apache.org/viewvc?rev=659188&view=rev
Log:
HADOOP-3232. DU class runs the 'du' command in a separate thread so
that it does not block user. DataNode misses heartbeats in large
nodes otherwise. (Johan Oskarsson via rangadi)
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSDataset.java
hadoop/core/trunk/src/java/org/apache/hadoop/fs/DU.java
hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestDU.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=659188&r1=659187&r2=659188&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Thu May 22 11:21:33 2008
@@ -162,6 +162,10 @@
HADOOP-2867. Adds the task's CWD to its LD_LIBRARY_PATH.
(Amareshwari Sriramadasu via ddas)
+ HADOOP-3232. DU class runs the 'du' command in a seperate thread so
+ that it does not block user. DataNode misses heartbeats in large
+ nodes otherwise. (Johan Oskarsson via rangadi)
+
OPTIMIZATIONS
HADOOP-3274. The default constructor of BytesWritable creates empty
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSDataset.java?rev=659188&r1=659187&r2=659188&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSDataset.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSDataset.java Thu May 22 11:21:33 2008
@@ -316,6 +316,7 @@
}
this.usage = new DF(parent, conf);
this.dfsUsage = new DU(parent, conf);
+ this.dfsUsage.start();
}
void decDfsUsed(long value) {
@@ -1006,6 +1007,14 @@
public void shutdown() {
if (mbeanName != null)
MBeanUtil.unregisterMBean(mbeanName);
+
+ if(volumes != null) {
+ for (FSVolume volume : volumes.volumes) {
+ if(volume != null) {
+ volume.dfsUsage.shutdown();
+ }
+ }
+ }
}
public String getStorageInfo() {
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/fs/DU.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/fs/DU.java?rev=659188&r1=659187&r2=659188&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/fs/DU.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/fs/DU.java Thu May 22 11:21:33 2008
@@ -18,46 +18,152 @@
package org.apache.hadoop.fs;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.dfs.FSConstants;
import org.apache.hadoop.util.Shell;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
/** Filesystem disk space usage statistics. Uses the unix 'du' program*/
public class DU extends Shell {
private String dirPath;
- private long used;
-
+ private AtomicLong used = new AtomicLong();
+ private volatile boolean shouldRun = true;
+ private Thread refreshUsed;
+ private IOException duException = null;
+ private long refreshInterval;
+
+ /**
+ * Keeps track of disk usage.
+ * @param path the path to check disk usage in
+ * @param interval refresh the disk usage at this interval
+ * @throws IOException if we fail to refresh the disk usage
+ */
public DU(File path, long interval) throws IOException {
- super(interval);
+ super(0);
+
+ //we set the Shell interval to 0 so it will always run our command
+ //and use this one to set the thread sleep interval
+ this.refreshInterval = interval;
this.dirPath = path.getCanonicalPath();
+
+ //populate the used variable
+ run();
}
+ /**
+ * Keeps track of disk usage.
+ * @param path the path to check disk usage in
+ * @param conf configuration object
+ * @throws IOException if we fail to refresh the disk usage
+ */
public DU(File path, Configuration conf) throws IOException {
- this(path, conf.getLong("dfs.blockreport.intervalMsec",
- FSConstants.BLOCKREPORT_INTERVAL));
- }
-
- synchronized public void decDfsUsed(long value) {
- used -= value;
+ this(path, 600000L);
+ //10 minutes default refresh interval
}
- synchronized public void incDfsUsed(long value) {
- used += value;
+ /**
+ * This thread refreshes the "used" variable.
+ *
+ * Future improvements could be to not permanently
+ * run this thread, instead run when getUsed is called.
+ **/
+ class DURefreshThread implements Runnable {
+
+ public void run() {
+
+ while(shouldRun) {
+
+ try {
+ Thread.sleep(refreshInterval);
+
+ try {
+ //update the used variable
+ DU.this.run();
+ } catch (IOException e) {
+ synchronized (DU.this) {
+ //save the latest exception so we can return it in getUsed()
+ duException = e;
+ }
+
+ LOG.warn("Could not get disk usage information", e);
+ }
+ } catch (InterruptedException e) {
+ }
+ }
+ }
}
- synchronized public long getUsed() throws IOException {
- run();
- return used;
+ /**
+ * Decrease how much disk space we use.
+ * @param value decrease by this value
+ */
+ public void decDfsUsed(long value) {
+ used.addAndGet(-value);
+ }
+
+ /**
+ * Increase how much disk space we use.
+ * @param value increase by this value
+ */
+ public void incDfsUsed(long value) {
+ used.addAndGet(value);
+ }
+
+ /**
+ * @return disk space used
+ * @throws IOException if the shell command fails
+ */
+ public long getUsed() throws IOException {
+ //if the updating thread isn't started, update on demand
+ if(refreshUsed == null) {
+ run();
+ } else {
+ synchronized (DU.this) {
+ //if an exception was thrown in the last run, rethrow
+ if(duException != null) {
+ IOException tmp = duException;
+ duException = null;
+ throw tmp;
+ }
+ }
+ }
+
+ return used.longValue();
}
+ /**
+ * @return the path of which we're keeping track of disk usage
+ */
public String getDirPath() {
return dirPath;
}
+ /**
+ * Start the disk usage checking thread.
+ */
+ public void start() {
+ //only start the thread if the interval is sane
+ if(refreshInterval > 0) {
+ refreshUsed = new Thread(new DURefreshThread(),
+ "refreshUsed-"+dirPath);
+ refreshUsed.setDaemon(true);
+ refreshUsed.start();
+ }
+ }
+
+ /**
+ * Shut down the refreshing thread.
+ */
+ public void shutdown() {
+ this.shouldRun = false;
+
+ if(this.refreshUsed != null) {
+ this.refreshUsed.interrupt();
+ }
+ }
public String toString() {
return
@@ -66,25 +172,26 @@
}
protected String[] getExecString() {
- return new String[] {"du","-sk", dirPath};
+ return new String[] {"du", "-sk", dirPath};
}
protected void parseExecResult(BufferedReader lines) throws IOException {
String line = lines.readLine();
if (line == null) {
- throw new IOException( "Expecting a line not the end of stream" );
+ throw new IOException("Expecting a line not the end of stream");
}
String[] tokens = line.split("\t");
if(tokens.length == 0) {
throw new IOException("Illegal du output");
}
- this.used = Long.parseLong(tokens[0])*1024;
+ this.used.set(Long.parseLong(tokens[0])*1024);
}
public static void main(String[] args) throws Exception {
String path = ".";
- if (args.length > 0)
+ if (args.length > 0) {
path = args[0];
+ }
System.out.println(new DU(new File(path), new Configuration()).toString());
}
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestDU.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestDU.java?rev=659188&r1=659187&r2=659188&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestDU.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestDU.java Thu May 22 11:21:33 2008
@@ -71,9 +71,25 @@
Thread.sleep(5000); // let the metadata updater catch up
- DU du = new DU(file, 0);
+ DU du = new DU(file, 10000);
+ du.start();
long duSize = du.getUsed();
+ du.shutdown();
assertEquals(writtenSize, duSize);
+
+ //test with 0 interval, will not launch thread
+ du = new DU(file, 0);
+ du.start();
+ duSize = du.getUsed();
+ du.shutdown();
+
+ assertEquals(writtenSize, duSize);
+
+ //test without launching thread
+ du = new DU(file, 10000);
+ duSize = du.getUsed();
+
+ assertEquals(writtenSize, duSize);
}
}