You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by sz...@apache.org on 2011/04/20 19:23:17 UTC
svn commit: r1095461 - in /hadoop/hdfs/trunk: CHANGES.txt
src/java/org/apache/hadoop/hdfs/DFSClient.java
src/test/hdfs/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
Author: szetszwo
Date: Wed Apr 20 17:23:17 2011
New Revision: 1095461
URL: http://svn.apache.org/viewvc?rev=1095461&view=rev
Log:
HDFS-1840. In DFSClient, terminate the lease renewing thread when all files being written are closed for a grace period, and start a new thread when new files are opened for write.
Modified:
hadoop/hdfs/trunk/CHANGES.txt
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java
hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=1095461&r1=1095460&r2=1095461&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Wed Apr 20 17:23:17 2011
@@ -121,6 +121,10 @@ Trunk (unreleased changes)
HDFS-1844. Move "fs -help" shell command tests from HDFS to COMMON; see
also HADOOP-7230. (Daryn Sharp via szetszwo)
+ HDFS-1840. In DFSClient, terminate the lease renewing thread when all files
+ being written are closed for a grace period, and start a new thread when
+ new files are opened for write. (szetszwo)
+
OPTIMIZATIONS
HDFS-1458. Improve checkpoint performance by avoiding unnecessary image
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1095461&r1=1095460&r2=1095461&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java Wed Apr 20 17:23:17 2011
@@ -45,6 +45,7 @@ import javax.net.SocketFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
@@ -128,7 +129,6 @@ public class DFSClient implements FSCons
private volatile long serverDefaultsLastUpdate;
static Random r = new Random();
final String clientName;
- final LeaseChecker leasechecker = new LeaseChecker();
Configuration conf;
long defaultBlockSize;
private short defaultReplication;
@@ -138,6 +138,7 @@ public class DFSClient implements FSCons
final DataTransferProtocol.ReplaceDatanodeOnFailure dtpReplaceDatanodeOnFailure;
final FileSystem.Statistics stats;
final int hdfsTimeout; // timeout value for a DFS operation.
+ final LeaseChecker leasechecker;
/**
* The locking hierarchy is to first acquire lock on DFSClient object, followed by
@@ -253,6 +254,7 @@ public class DFSClient implements FSCons
// The hdfsTimeout is currently the same as the ipc timeout
this.hdfsTimeout = Client.getTimeout(conf);
+ this.leasechecker = new LeaseChecker(hdfsTimeout);
this.ugi = UserGroupInformation.getCurrentUser();
@@ -1358,38 +1360,106 @@ public class DFSClient implements FSCons
}
}
- boolean isLeaseCheckerStarted() {
- return leasechecker.daemon != null;
- }
-
/** Lease management*/
- class LeaseChecker implements Runnable {
+ class LeaseChecker {
+ static final long LEASE_RENEWER_GRACE_DEFAULT = 60*1000L;
+ static final long LEASE_RENEWER_SLEEP_DEFAULT = 1000L;
/** A map from src -> DFSOutputStream of files that are currently being
* written by this client.
*/
private final SortedMap<String, OutputStream> pendingCreates
= new TreeMap<String, OutputStream>();
+ /** The time in milliseconds that the map became empty. */
+ private long emptyTime = Long.MAX_VALUE;
+ /** A fixed lease renewal time period in milliseconds */
+ private final long renewal;
+ /** A daemon for renewing lease */
private Daemon daemon = null;
-
+ /** Only the daemon with currentId should run. */
+ private int currentId = 0;
+
+ /**
+ * A period in milliseconds that the lease renewer thread should run
+ * after the map became empty.
+ * If the map is empty for a time period longer than the grace period,
+ * the renewer should terminate.
+ */
+ private long gracePeriod;
+ /**
+ * The time period in milliseconds
+ * that the renewer sleeps for each iteration.
+ */
+ private volatile long sleepPeriod;
+
+ private LeaseChecker(final long timeout) {
+ this.renewal = (timeout > 0 && timeout < LEASE_SOFTLIMIT_PERIOD)?
+ timeout/2: LEASE_SOFTLIMIT_PERIOD/2;
+ setGraceSleepPeriod(LEASE_RENEWER_GRACE_DEFAULT);
+ }
+
+ /** Set the grace period and adjust the sleep period accordingly. */
+ void setGraceSleepPeriod(final long gracePeriod) {
+ if (gracePeriod < 100L) {
+ throw new HadoopIllegalArgumentException(gracePeriod
+ + " = gracePeriod < 100ms is too small.");
+ }
+ synchronized(this) {
+ this.gracePeriod = gracePeriod;
+ }
+ final long half = gracePeriod/2;
+ this.sleepPeriod = half < LEASE_RENEWER_SLEEP_DEFAULT?
+ half: LEASE_RENEWER_SLEEP_DEFAULT;
+ }
+
+ /** Is the daemon running? */
+ synchronized boolean isRunning() {
+ return daemon != null && daemon.isAlive();
+ }
+
+ /** Is the empty period longer than the grace period? */
+ private synchronized boolean isRenewerExpired() {
+ return emptyTime != Long.MAX_VALUE
+ && System.currentTimeMillis() - emptyTime > gracePeriod;
+ }
+
synchronized void put(String src, OutputStream out) {
if (clientRunning) {
- if (daemon == null) {
- daemon = new Daemon(this);
+ if (daemon == null || isRenewerExpired()) {
+ //start a new daemon with a new id.
+ final int id = ++currentId;
+ daemon = new Daemon(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ LeaseChecker.this.run(id);
+ } catch(InterruptedException e) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(LeaseChecker.this.getClass().getSimpleName()
+ + " is interrupted.", e);
+ }
+ }
+ }
+ });
daemon.start();
}
pendingCreates.put(src, out);
+ emptyTime = Long.MAX_VALUE;
}
}
synchronized void remove(String src) {
pendingCreates.remove(src);
+ if (pendingCreates.isEmpty() && emptyTime == Long.MAX_VALUE) {
+ //discover the first time that the map is empty.
+ emptyTime = System.currentTimeMillis();
+ }
}
void interruptAndJoin() throws InterruptedException {
Daemon daemonCopy = null;
synchronized (this) {
- if (daemon != null) {
+ if (isRunning()) {
daemon.interrupt();
daemonCopy = daemon;
}
@@ -1456,37 +1526,30 @@ public class DFSClient implements FSCons
* Periodically check in with the namenode and renew all the leases
* when the lease period is half over.
*/
- public void run() {
- long lastRenewed = 0;
- int renewal = (int)(LEASE_SOFTLIMIT_PERIOD / 2);
- if (hdfsTimeout > 0) {
- renewal = Math.min(renewal, hdfsTimeout/2);
- }
- while (clientRunning && !Thread.interrupted()) {
- if (System.currentTimeMillis() - lastRenewed > renewal) {
+ private void run(final int id) throws InterruptedException {
+ for(long lastRenewed = System.currentTimeMillis();
+ clientRunning && !Thread.interrupted();
+ Thread.sleep(sleepPeriod)) {
+ if (System.currentTimeMillis() - lastRenewed >= renewal) {
try {
renew();
lastRenewed = System.currentTimeMillis();
} catch (SocketTimeoutException ie) {
- LOG.warn("Problem renewing lease for " + clientName +
- " for a period of " + (hdfsTimeout/1000) +
- " seconds. Shutting down HDFS client...", ie);
+ LOG.warn("Failed to renew lease for " + clientName + " for "
+ + (renewal/1000) + " seconds. Aborting ...", ie);
abort();
break;
} catch (IOException ie) {
- LOG.warn("Problem renewing lease for " + clientName +
- " for a period of " + (hdfsTimeout/1000) +
- " seconds. Will retry shortly...", ie);
+ LOG.warn("Failed to renew lease for " + clientName + " for "
+ + (renewal/1000) + " seconds. Will retry shortly ...", ie);
}
}
- try {
- Thread.sleep(1000);
- } catch (InterruptedException ie) {
- if (LOG.isDebugEnabled()) {
- LOG.debug(this + " is interrupted.", ie);
+ synchronized(this) {
+ if (id != currentId || isRenewerExpired()) {
+ //no longer the current daemon or expired
+ return;
}
- return;
}
}
}
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDistributedFileSystem.java?rev=1095461&r1=1095460&r2=1095461&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDistributedFileSystem.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDistributedFileSystem.java Wed Apr 20 17:23:17 2011
@@ -44,6 +44,10 @@ import org.junit.Test;
public class TestDistributedFileSystem {
private static final Random RAN = new Random();
+ {
+ ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
+ }
+
private boolean dualPortTesting = false;
private HdfsConfiguration getTestConfiguration() {
@@ -100,26 +104,94 @@ public class TestDistributedFileSystem {
@Test
public void testDFSClient() throws Exception {
Configuration conf = getTestConfiguration();
+ final long grace = 1000L;
MiniDFSCluster cluster = null;
try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
- final Path filepath = new Path("/test/LeaseChecker/foo");
+ final String filepathstring = "/test/LeaseChecker/foo";
+ final Path[] filepaths = new Path[4];
+ for(int i = 0; i < filepaths.length; i++) {
+ filepaths[i] = new Path(filepathstring + i);
+ }
final long millis = System.currentTimeMillis();
{
DistributedFileSystem dfs = (DistributedFileSystem)cluster.getFileSystem();
- assertFalse(dfs.dfs.isLeaseCheckerStarted());
-
- //create a file
- FSDataOutputStream out = dfs.create(filepath);
- assertTrue(dfs.dfs.isLeaseCheckerStarted());
+ dfs.dfs.leasechecker.setGraceSleepPeriod(grace);
+ assertFalse(dfs.dfs.leasechecker.isRunning());
- //write something and close
- out.writeLong(millis);
- assertTrue(dfs.dfs.isLeaseCheckerStarted());
- out.close();
- assertTrue(dfs.dfs.isLeaseCheckerStarted());
+ {
+ //create a file
+ final FSDataOutputStream out = dfs.create(filepaths[0]);
+ assertTrue(dfs.dfs.leasechecker.isRunning());
+ //write something
+ out.writeLong(millis);
+ assertTrue(dfs.dfs.leasechecker.isRunning());
+ //close
+ out.close();
+ Thread.sleep(grace/4*3);
+ //within grace period
+ assertTrue(dfs.dfs.leasechecker.isRunning());
+ for(int i = 0; i < 3; i++) {
+ if (dfs.dfs.leasechecker.isRunning()) {
+ Thread.sleep(grace/2);
+ }
+ }
+ //passed grace period
+ assertFalse(dfs.dfs.leasechecker.isRunning());
+ }
+
+ {
+ //create file1
+ final FSDataOutputStream out1 = dfs.create(filepaths[1]);
+ assertTrue(dfs.dfs.leasechecker.isRunning());
+ //create file2
+ final FSDataOutputStream out2 = dfs.create(filepaths[2]);
+ assertTrue(dfs.dfs.leasechecker.isRunning());
+
+ //write something to file1
+ out1.writeLong(millis);
+ assertTrue(dfs.dfs.leasechecker.isRunning());
+ //close file1
+ out1.close();
+ assertTrue(dfs.dfs.leasechecker.isRunning());
+
+ //write something to file2
+ out2.writeLong(millis);
+ assertTrue(dfs.dfs.leasechecker.isRunning());
+ //close file2
+ out2.close();
+ Thread.sleep(grace/4*3);
+ //within grace period
+ assertTrue(dfs.dfs.leasechecker.isRunning());
+ }
+
+ {
+ //create file3
+ final FSDataOutputStream out3 = dfs.create(filepaths[3]);
+ assertTrue(dfs.dfs.leasechecker.isRunning());
+ Thread.sleep(grace/4*3);
+ //passed previous grace period, should still be running
+ assertTrue(dfs.dfs.leasechecker.isRunning());
+ //write something to file3
+ out3.writeLong(millis);
+ assertTrue(dfs.dfs.leasechecker.isRunning());
+ //close file3
+ out3.close();
+ assertTrue(dfs.dfs.leasechecker.isRunning());
+ Thread.sleep(grace/4*3);
+ //within grace period
+ assertTrue(dfs.dfs.leasechecker.isRunning());
+ for(int i = 0; i < 3; i++) {
+ if (dfs.dfs.leasechecker.isRunning()) {
+ Thread.sleep(grace/2);
+ }
+ }
+ //passed grace period
+ assertFalse(dfs.dfs.leasechecker.isRunning());
+ }
+
dfs.close();
}
@@ -146,15 +218,15 @@ public class TestDistributedFileSystem {
{
DistributedFileSystem dfs = (DistributedFileSystem)cluster.getFileSystem();
- assertFalse(dfs.dfs.isLeaseCheckerStarted());
+ assertFalse(dfs.dfs.leasechecker.isRunning());
//open and check the file
- FSDataInputStream in = dfs.open(filepath);
- assertFalse(dfs.dfs.isLeaseCheckerStarted());
+ FSDataInputStream in = dfs.open(filepaths[0]);
+ assertFalse(dfs.dfs.leasechecker.isRunning());
assertEquals(millis, in.readLong());
- assertFalse(dfs.dfs.isLeaseCheckerStarted());
+ assertFalse(dfs.dfs.leasechecker.isRunning());
in.close();
- assertFalse(dfs.dfs.isLeaseCheckerStarted());
+ assertFalse(dfs.dfs.leasechecker.isRunning());
dfs.close();
}