You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2015/05/05 01:43:10 UTC

[06/33] hadoop git commit: HDFS-8086. Move LeaseRenewer to the hdfs.client.impl package. Contributed by Takanobu Asanuma

HDFS-8086. Move LeaseRenewer to the hdfs.client.impl package.  Contributed by Takanobu Asanuma


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d3d019c3
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d3d019c3
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d3d019c3

Branch: refs/heads/HDFS-7240
Commit: d3d019c337ecc10e9c6bbefc3a97c6cd1f5283c3
Parents: 64d30a6
Author: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Authored: Fri May 1 15:11:09 2015 -0700
Committer: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Committed: Fri May 1 15:12:18 2015 -0700

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 +
 .../java/org/apache/hadoop/hdfs/DFSClient.java  |  13 +-
 .../org/apache/hadoop/hdfs/LeaseRenewer.java    | 512 ------------------
 .../hadoop/hdfs/client/impl/LeaseRenewer.java   | 514 +++++++++++++++++++
 .../hadoop/hdfs/TestDistributedFileSystem.java  |  59 ++-
 .../java/org/apache/hadoop/hdfs/TestLease.java  |   1 +
 .../apache/hadoop/hdfs/TestLeaseRenewer.java    | 207 --------
 .../hdfs/client/impl/TestLeaseRenewer.java      | 209 ++++++++
 8 files changed, 769 insertions(+), 749 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3d019c3/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index b5c5e6b..179fe7e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -494,6 +494,9 @@ Release 2.8.0 - UNRELEASED
     HDFS-8292. Move conditional in fmt_time from dfs-dust.js to status.html.
     (Charles Lamb via wang)
 
+    HDFS-8086. Move LeaseRenewer to the hdfs.client.impl package.  (Takanobu
+    Asanuma via szetszwo)
+
   OPTIMIZATIONS
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3d019c3/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index d47992b..aaba543 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -101,6 +101,7 @@ import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
 import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
+import org.apache.hadoop.hdfs.client.impl.LeaseRenewer;
 import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.hdfs.net.TcpPeerServer;
 import org.apache.hadoop.hdfs.protocol.AclException;
@@ -481,7 +482,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
    *  enforced to consistently update its local dfsclients array and 
    *  client's filesBeingWritten map.
    */
-  void putFileBeingWritten(final long inodeId, final DFSOutputStream out) {
+  public void putFileBeingWritten(final long inodeId, final DFSOutputStream out) {
     synchronized(filesBeingWritten) {
       filesBeingWritten.put(inodeId, out);
       // update the last lease renewal time only when there was no
@@ -494,7 +495,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
   }
 
   /** Remove a file. Only called from LeaseRenewer. */
-  void removeFileBeingWritten(final long inodeId) {
+  public void removeFileBeingWritten(final long inodeId) {
     synchronized(filesBeingWritten) {
       filesBeingWritten.remove(inodeId);
       if (filesBeingWritten.isEmpty()) {
@@ -504,14 +505,14 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
   }
 
   /** Is file-being-written map empty? */
-  boolean isFilesBeingWrittenEmpty() {
+  public boolean isFilesBeingWrittenEmpty() {
     synchronized(filesBeingWritten) {
       return filesBeingWritten.isEmpty();
     }
   }
   
   /** @return true if the client is running */
-  boolean isClientRunning() {
+  public boolean isClientRunning() {
     return clientRunning;
   }
 
@@ -533,7 +534,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
    * @return true if lease was renewed. May return false if this
    * client has been closed or has no files open.
    **/
-  boolean renewLease() throws IOException {
+  public boolean renewLease() throws IOException {
     if (clientRunning && !isFilesBeingWrittenEmpty()) {
       try {
         namenode.renewLease(clientName);
@@ -565,7 +566,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
   }
   
   /** Abort and release resources held.  Ignore all errors. */
-  void abort() {
+  public void abort() {
     clientRunning = false;
     closeAllFilesBeingWritten(true);
     try {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3d019c3/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/LeaseRenewer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/LeaseRenewer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/LeaseRenewer.java
deleted file mode 100644
index 511bddb..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/LeaseRenewer.java
+++ /dev/null
@@ -1,512 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs;
-
-import java.io.IOException;
-import java.net.SocketTimeoutException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.HadoopIllegalArgumentException;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.Daemon;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.util.Time;
-import com.google.common.annotations.VisibleForTesting;
-
-/**
- * <p>
- * Used by {@link DFSClient} for renewing file-being-written leases
- * on the namenode.
- * When a file is opened for write (create or append),
- * namenode stores a file lease for recording the identity of the writer.
- * The writer (i.e. the DFSClient) is required to renew the lease periodically.
- * When the lease is not renewed before it expires,
- * the namenode considers the writer as failed and then it may either let
- * another writer to obtain the lease or close the file.
- * </p>
- * <p>
- * This class also provides the following functionality:
- * <ul>
- * <li>
- * It maintains a map from (namenode, user) pairs to lease renewers. 
- * The same {@link LeaseRenewer} instance is used for renewing lease
- * for all the {@link DFSClient} to the same namenode and the same user.
- * </li>
- * <li>
- * Each renewer maintains a list of {@link DFSClient}.
- * Periodically the leases for all the clients are renewed.
- * A client is removed from the list when the client is closed.
- * </li>
- * <li>
- * A thread per namenode per user is used by the {@link LeaseRenewer}
- * to renew the leases.
- * </li>
- * </ul>
- * </p>
- */
-@InterfaceAudience.Private
-class LeaseRenewer {
-  static final Log LOG = LogFactory.getLog(LeaseRenewer.class);
-
-  static final long LEASE_RENEWER_GRACE_DEFAULT = 60*1000L;
-  static final long LEASE_RENEWER_SLEEP_DEFAULT = 1000L;
-
-  /** Get a {@link LeaseRenewer} instance */
-  static LeaseRenewer getInstance(final String authority,
-      final UserGroupInformation ugi, final DFSClient dfsc) throws IOException {
-    final LeaseRenewer r = Factory.INSTANCE.get(authority, ugi);
-    r.addClient(dfsc);
-    return r;
-  }
-
-  /** 
-   * A factory for sharing {@link LeaseRenewer} objects
-   * among {@link DFSClient} instances
-   * so that there is only one renewer per authority per user.
-   */
-  private static class Factory {
-    private static final Factory INSTANCE = new Factory();
-
-    private static class Key {
-      /** Namenode info */
-      final String authority;
-      /** User info */
-      final UserGroupInformation ugi;
-
-      private Key(final String authority, final UserGroupInformation ugi) {
-        if (authority == null) {
-          throw new HadoopIllegalArgumentException("authority == null");
-        } else if (ugi == null) {
-          throw new HadoopIllegalArgumentException("ugi == null");
-        }
-
-        this.authority = authority;
-        this.ugi = ugi;
-      }
-
-      @Override
-      public int hashCode() {
-        return authority.hashCode() ^ ugi.hashCode();
-      }
-
-      @Override
-      public boolean equals(Object obj) {
-        if (obj == this) {
-          return true;
-        }
-        if (obj != null && obj instanceof Key) {
-          final Key that = (Key)obj;
-          return this.authority.equals(that.authority)
-                 && this.ugi.equals(that.ugi);
-        }
-        return false;        
-      }
-
-      @Override
-      public String toString() {
-        return ugi.getShortUserName() + "@" + authority;
-      }
-    }
-
-    /** A map for per user per namenode renewers. */
-    private final Map<Key, LeaseRenewer> renewers = new HashMap<Key, LeaseRenewer>();
-
-    /** Get a renewer. */
-    private synchronized LeaseRenewer get(final String authority,
-        final UserGroupInformation ugi) {
-      final Key k = new Key(authority, ugi);
-      LeaseRenewer r = renewers.get(k);
-      if (r == null) {
-        r = new LeaseRenewer(k);
-        renewers.put(k, r);
-      }
-      return r;
-    }
-
-    /** Remove the given renewer. */
-    private synchronized void remove(final LeaseRenewer r) {
-      final LeaseRenewer stored = renewers.get(r.factorykey);
-      //Since a renewer may expire, the stored renewer can be different.
-      if (r == stored) {
-        if (!r.clientsRunning()) {
-          renewers.remove(r.factorykey);
-        }
-      }
-    }
-  }
-
-  /** The time in milliseconds that the map became empty. */
-  private long emptyTime = Long.MAX_VALUE;
-  /** A fixed lease renewal time period in milliseconds */
-  private long renewal = HdfsConstants.LEASE_SOFTLIMIT_PERIOD/2;
-
-  /** A daemon for renewing lease */
-  private Daemon daemon = null;
-  /** Only the daemon with currentId should run. */
-  private int currentId = 0;
-
-  /** 
-   * A period in milliseconds that the lease renewer thread should run
-   * after the map became empty.
-   * In other words,
-   * if the map is empty for a time period longer than the grace period,
-   * the renewer should terminate.  
-   */
-  private long gracePeriod;
-  /**
-   * The time period in milliseconds
-   * that the renewer sleeps for each iteration. 
-   */
-  private long sleepPeriod;
-
-  private final Factory.Key factorykey;
-
-  /** A list of clients corresponding to this renewer. */
-  private final List<DFSClient> dfsclients = new ArrayList<DFSClient>();
-
-  /**
-   * A stringified stack trace of the call stack when the Lease Renewer
-   * was instantiated. This is only generated if trace-level logging is
-   * enabled on this class.
-   */
-  private final String instantiationTrace;
-
-  private LeaseRenewer(Factory.Key factorykey) {
-    this.factorykey = factorykey;
-    unsyncSetGraceSleepPeriod(LEASE_RENEWER_GRACE_DEFAULT);
-    
-    if (LOG.isTraceEnabled()) {
-      instantiationTrace = StringUtils.stringifyException(
-        new Throwable("TRACE"));
-    } else {
-      instantiationTrace = null;
-    }
-  }
-
-  /** @return the renewal time in milliseconds. */
-  private synchronized long getRenewalTime() {
-    return renewal;
-  }
-
-  /** Add a client. */
-  private synchronized void addClient(final DFSClient dfsc) {
-    for(DFSClient c : dfsclients) {
-      if (c == dfsc) {
-        //client already exists, nothing to do.
-        return;
-      }
-    }
-    //client not found, add it
-    dfsclients.add(dfsc);
-
-    //update renewal time
-    final int hdfsTimeout = dfsc.getConf().getHdfsTimeout();
-    if (hdfsTimeout > 0) {
-      final long half = hdfsTimeout/2;
-      if (half < renewal) {
-        this.renewal = half;
-      }
-    }
-  }
-
-  private synchronized boolean clientsRunning() {
-    for(Iterator<DFSClient> i = dfsclients.iterator(); i.hasNext(); ) {
-      if (!i.next().isClientRunning()) {
-        i.remove();
-      }
-    }
-    return !dfsclients.isEmpty();
-  }
-
-  private synchronized long getSleepPeriod() {
-    return sleepPeriod;    
-  }
-
-  /** Set the grace period and adjust the sleep period accordingly. */
-  synchronized void setGraceSleepPeriod(final long gracePeriod) {
-    unsyncSetGraceSleepPeriod(gracePeriod);
-  }
-
-  private void unsyncSetGraceSleepPeriod(final long gracePeriod) {
-    if (gracePeriod < 100L) {
-      throw new HadoopIllegalArgumentException(gracePeriod
-          + " = gracePeriod < 100ms is too small.");
-    }
-    this.gracePeriod = gracePeriod;
-    final long half = gracePeriod/2;
-    this.sleepPeriod = half < LEASE_RENEWER_SLEEP_DEFAULT?
-        half: LEASE_RENEWER_SLEEP_DEFAULT;
-  }
-
-  /** Is the daemon running? */
-  synchronized boolean isRunning() {
-    return daemon != null && daemon.isAlive();
-  }
-
-  /** Does this renewer have nothing to renew? */
-  public boolean isEmpty() {
-    return dfsclients.isEmpty();
-  }
-  
-  /** Used only by tests */
-  synchronized String getDaemonName() {
-    return daemon.getName();
-  }
-
-  /** Is the empty period longer than the grace period? */  
-  private synchronized boolean isRenewerExpired() {
-    return emptyTime != Long.MAX_VALUE
-        && Time.monotonicNow() - emptyTime > gracePeriod;
-  }
-
-  synchronized void put(final long inodeId, final DFSOutputStream out,
-      final DFSClient dfsc) {
-    if (dfsc.isClientRunning()) {
-      if (!isRunning() || isRenewerExpired()) {
-        //start a new deamon with a new id.
-        final int id = ++currentId;
-        daemon = new Daemon(new Runnable() {
-          @Override
-          public void run() {
-            try {
-              if (LOG.isDebugEnabled()) {
-                LOG.debug("Lease renewer daemon for " + clientsString()
-                    + " with renew id " + id + " started");
-              }
-              LeaseRenewer.this.run(id);
-            } catch(InterruptedException e) {
-              if (LOG.isDebugEnabled()) {
-                LOG.debug(LeaseRenewer.this.getClass().getSimpleName()
-                    + " is interrupted.", e);
-              }
-            } finally {
-              synchronized(LeaseRenewer.this) {
-                Factory.INSTANCE.remove(LeaseRenewer.this);
-              }
-              if (LOG.isDebugEnabled()) {
-                LOG.debug("Lease renewer daemon for " + clientsString()
-                    + " with renew id " + id + " exited");
-              }
-            }
-          }
-          
-          @Override
-          public String toString() {
-            return String.valueOf(LeaseRenewer.this);
-          }
-        });
-        daemon.start();
-      }
-      dfsc.putFileBeingWritten(inodeId, out);
-      emptyTime = Long.MAX_VALUE;
-    }
-  }
-
-  @VisibleForTesting
-  synchronized void setEmptyTime(long time) {
-    emptyTime = time;
-  }
-
-  /** Close a file. */
-  void closeFile(final long inodeId, final DFSClient dfsc) {
-    dfsc.removeFileBeingWritten(inodeId);
-
-    synchronized(this) {
-      if (dfsc.isFilesBeingWrittenEmpty()) {
-        dfsclients.remove(dfsc);
-      }
-      //update emptyTime if necessary
-      if (emptyTime == Long.MAX_VALUE) {
-        for(DFSClient c : dfsclients) {
-          if (!c.isFilesBeingWrittenEmpty()) {
-            //found a non-empty file-being-written map
-            return;
-          }
-        }
-        //discover the first time that all file-being-written maps are empty.
-        emptyTime = Time.monotonicNow();
-      }
-    }
-  }
-
-  /** Close the given client. */
-  synchronized void closeClient(final DFSClient dfsc) {
-    dfsclients.remove(dfsc);
-    if (dfsclients.isEmpty()) {
-      if (!isRunning() || isRenewerExpired()) {
-        Factory.INSTANCE.remove(LeaseRenewer.this);
-        return;
-      }
-      if (emptyTime == Long.MAX_VALUE) {
-        //discover the first time that the client list is empty.
-        emptyTime = Time.monotonicNow();
-      }
-    }
-
-    //update renewal time
-    if (renewal == dfsc.getConf().getHdfsTimeout()/2) {
-      long min = HdfsConstants.LEASE_SOFTLIMIT_PERIOD;
-      for(DFSClient c : dfsclients) {
-        final int timeout = c.getConf().getHdfsTimeout();
-        if (timeout > 0 && timeout < min) {
-          min = timeout;
-        }
-      }
-      renewal = min/2;
-    }
-  }
-
-  void interruptAndJoin() throws InterruptedException {
-    Daemon daemonCopy = null;
-    synchronized (this) {
-      if (isRunning()) {
-        daemon.interrupt();
-        daemonCopy = daemon;
-      }
-    }
-   
-    if (daemonCopy != null) {
-      if(LOG.isDebugEnabled()) {
-        LOG.debug("Wait for lease checker to terminate");
-      }
-      daemonCopy.join();
-    }
-  }
-
-  private void renew() throws IOException {
-    final List<DFSClient> copies;
-    synchronized(this) {
-      copies = new ArrayList<DFSClient>(dfsclients);
-    }
-    //sort the client names for finding out repeated names.
-    Collections.sort(copies, new Comparator<DFSClient>() {
-      @Override
-      public int compare(final DFSClient left, final DFSClient right) {
-        return left.getClientName().compareTo(right.getClientName());
-      }
-    });
-    String previousName = "";
-    for(int i = 0; i < copies.size(); i++) {
-      final DFSClient c = copies.get(i);
-      //skip if current client name is the same as the previous name.
-      if (!c.getClientName().equals(previousName)) {
-        if (!c.renewLease()) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Did not renew lease for client " +
-                c);
-          }
-          continue;
-        }
-        previousName = c.getClientName();
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Lease renewed for client " + previousName);
-        }
-      }
-    }
-  }
-
-  /**
-   * Periodically check in with the namenode and renew all the leases
-   * when the lease period is half over.
-   */
-  private void run(final int id) throws InterruptedException {
-    for(long lastRenewed = Time.monotonicNow(); !Thread.interrupted();
-        Thread.sleep(getSleepPeriod())) {
-      final long elapsed = Time.monotonicNow() - lastRenewed;
-      if (elapsed >= getRenewalTime()) {
-        try {
-          renew();
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Lease renewer daemon for " + clientsString()
-                + " with renew id " + id + " executed");
-          }
-          lastRenewed = Time.monotonicNow();
-        } catch (SocketTimeoutException ie) {
-          LOG.warn("Failed to renew lease for " + clientsString() + " for "
-              + (elapsed/1000) + " seconds.  Aborting ...", ie);
-          synchronized (this) {
-            while (!dfsclients.isEmpty()) {
-              dfsclients.get(0).abort();
-            }
-          }
-          break;
-        } catch (IOException ie) {
-          LOG.warn("Failed to renew lease for " + clientsString() + " for "
-              + (elapsed/1000) + " seconds.  Will retry shortly ...", ie);
-        }
-      }
-
-      synchronized(this) {
-        if (id != currentId || isRenewerExpired()) {
-          if (LOG.isDebugEnabled()) {
-            if (id != currentId) {
-              LOG.debug("Lease renewer daemon for " + clientsString()
-                  + " with renew id " + id + " is not current");
-            } else {
-               LOG.debug("Lease renewer daemon for " + clientsString()
-                  + " with renew id " + id + " expired");
-            }
-          }
-          //no longer the current daemon or expired
-          return;
-        }
-
-        // if no clients are in running state or there is no more clients
-        // registered with this renewer, stop the daemon after the grace
-        // period.
-        if (!clientsRunning() && emptyTime == Long.MAX_VALUE) {
-          emptyTime = Time.monotonicNow();
-        }
-      }
-    }
-  }
-
-  @Override
-  public String toString() {
-    String s = getClass().getSimpleName() + ":" + factorykey;
-    if (LOG.isTraceEnabled()) {
-      return s + ", clients=" +  clientsString()
-        + ", created at " + instantiationTrace;
-    }
-    return s;
-  }
-
-  /** Get the names of all clients */
-  private synchronized String clientsString() {
-    if (dfsclients.isEmpty()) {
-      return "[]";
-    } else {
-      final StringBuilder b = new StringBuilder("[").append(
-          dfsclients.get(0).getClientName());
-      for(int i = 1; i < dfsclients.size(); i++) {
-        b.append(", ").append(dfsclients.get(i).getClientName());
-      }
-      return b.append("]").toString();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3d019c3/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/impl/LeaseRenewer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/impl/LeaseRenewer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/impl/LeaseRenewer.java
new file mode 100644
index 0000000..4cdf168
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/impl/LeaseRenewer.java
@@ -0,0 +1,514 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.client.impl;
+
+import java.io.IOException;
+import java.net.SocketTimeoutException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.HadoopIllegalArgumentException;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSOutputStream;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Time;
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * <p>
+ * Used by {@link org.apache.hadoop.hdfs.DFSClient} for renewing file-being-written leases
+ * on the namenode.
+ * When a file is opened for write (create or append),
+ * namenode stores a file lease for recording the identity of the writer.
+ * The writer (i.e. the DFSClient) is required to renew the lease periodically.
+ * When the lease is not renewed before it expires,
+ * the namenode considers the writer as failed and then it may either let
+ * another writer obtain the lease or close the file.
+ * </p>
+ * <p>
+ * This class also provides the following functionality:
+ * <ul>
+ * <li>
+ * It maintains a map from (namenode, user) pairs to lease renewers.
+ * The same {@link LeaseRenewer} instance is used for renewing leases
+ * for all {@link org.apache.hadoop.hdfs.DFSClient} instances with the same namenode and the same user.
+ * </li>
+ * <li>
+ * Each renewer maintains a list of {@link org.apache.hadoop.hdfs.DFSClient}.
+ * Periodically the leases for all the clients are renewed.
+ * A client is removed from the list when the client is closed.
+ * </li>
+ * <li>
+ * A thread per namenode per user is used by the {@link LeaseRenewer}
+ * to renew the leases.
+ * </li>
+ * </ul>
+ * </p>
+ */
+@InterfaceAudience.Private
+public class LeaseRenewer {
+  static final Log LOG = LogFactory.getLog(LeaseRenewer.class);
+
+  static final long LEASE_RENEWER_GRACE_DEFAULT = 60*1000L;
+  static final long LEASE_RENEWER_SLEEP_DEFAULT = 1000L;
+
+  /** Get a {@link LeaseRenewer} instance */
+  public static LeaseRenewer getInstance(final String authority,
+      final UserGroupInformation ugi, final DFSClient dfsc) throws IOException {
+    final LeaseRenewer r = Factory.INSTANCE.get(authority, ugi);
+    r.addClient(dfsc);
+    return r;
+  }
+
+  /**
+   * A factory for sharing {@link LeaseRenewer} objects
+   * among {@link DFSClient} instances
+   * so that there is only one renewer per authority per user.
+   */
+  private static class Factory {
+    private static final Factory INSTANCE = new Factory();
+
+    private static class Key {
+      /** Namenode info */
+      final String authority;
+      /** User info */
+      final UserGroupInformation ugi;
+
+      private Key(final String authority, final UserGroupInformation ugi) {
+        if (authority == null) {
+          throw new HadoopIllegalArgumentException("authority == null");
+        } else if (ugi == null) {
+          throw new HadoopIllegalArgumentException("ugi == null");
+        }
+
+        this.authority = authority;
+        this.ugi = ugi;
+      }
+
+      @Override
+      public int hashCode() {
+        return authority.hashCode() ^ ugi.hashCode();
+      }
+
+      @Override
+      public boolean equals(Object obj) {
+        if (obj == this) {
+          return true;
+        }
+        if (obj != null && obj instanceof Key) {
+          final Key that = (Key)obj;
+          return this.authority.equals(that.authority)
+                 && this.ugi.equals(that.ugi);
+        }
+        return false;
+      }
+
+      @Override
+      public String toString() {
+        return ugi.getShortUserName() + "@" + authority;
+      }
+    }
+
+    /** A map for per user per namenode renewers. */
+    private final Map<Key, LeaseRenewer> renewers = new HashMap<Key, LeaseRenewer>();
+
+    /** Get a renewer. */
+    private synchronized LeaseRenewer get(final String authority,
+        final UserGroupInformation ugi) {
+      final Key k = new Key(authority, ugi);
+      LeaseRenewer r = renewers.get(k);
+      if (r == null) {
+        r = new LeaseRenewer(k);
+        renewers.put(k, r);
+      }
+      return r;
+    }
+
+    /** Remove the given renewer. */
+    private synchronized void remove(final LeaseRenewer r) {
+      final LeaseRenewer stored = renewers.get(r.factorykey);
+      //Since a renewer may expire, the stored renewer can be different.
+      if (r == stored) {
+        if (!r.clientsRunning()) {
+          renewers.remove(r.factorykey);
+        }
+      }
+    }
+  }
+
+  /** The time in milliseconds that the map became empty. */
+  private long emptyTime = Long.MAX_VALUE;
+  /** Lease renewal period in milliseconds; may be lowered when a client is added. */
+  private long renewal = HdfsConstants.LEASE_SOFTLIMIT_PERIOD/2;
+
+  /** A daemon for renewing lease */
+  private Daemon daemon = null;
+  /** Only the daemon with currentId should run. */
+  private int currentId = 0;
+
+  /**
+   * A period in milliseconds that the lease renewer thread should run
+   * after the map became empty.
+   * In other words,
+   * if the map is empty for a time period longer than the grace period,
+   * the renewer should terminate.
+   */
+  private long gracePeriod;
+  /**
+   * The time period in milliseconds
+   * that the renewer sleeps for each iteration.
+   */
+  private long sleepPeriod;
+
+  private final Factory.Key factorykey;
+
+  /** A list of clients corresponding to this renewer. */
+  private final List<DFSClient> dfsclients = new ArrayList<DFSClient>();
+
+  /**
+   * A stringified stack trace of the call stack when the Lease Renewer
+   * was instantiated. This is only generated if trace-level logging is
+   * enabled on this class.
+   */
+  private final String instantiationTrace;
+
+  private LeaseRenewer(Factory.Key factorykey) {
+    this.factorykey = factorykey;
+    unsyncSetGraceSleepPeriod(LEASE_RENEWER_GRACE_DEFAULT);
+
+    if (LOG.isTraceEnabled()) {
+      instantiationTrace = StringUtils.stringifyException(
+        new Throwable("TRACE"));
+    } else {
+      instantiationTrace = null;
+    }
+  }
+
+  /** @return the renewal time in milliseconds. */
+  private synchronized long getRenewalTime() {
+    return renewal;
+  }
+
+  /** Add a client. */
+  private synchronized void addClient(final DFSClient dfsc) {
+    for(DFSClient c : dfsclients) {
+      if (c == dfsc) {
+        //client already exists, nothing to do.
+        return;
+      }
+    }
+    //client not found, add it
+    dfsclients.add(dfsc);
+
+    //update renewal time
+    final int hdfsTimeout = dfsc.getConf().getHdfsTimeout();
+    if (hdfsTimeout > 0) {
+      final long half = hdfsTimeout/2;
+      if (half < renewal) {
+        this.renewal = half;
+      }
+    }
+  }
+
+  private synchronized boolean clientsRunning() {
+    for(Iterator<DFSClient> i = dfsclients.iterator(); i.hasNext(); ) {
+      if (!i.next().isClientRunning()) {
+        i.remove();
+      }
+    }
+    return !dfsclients.isEmpty();
+  }
+
+  private synchronized long getSleepPeriod() {
+    return sleepPeriod;
+  }
+
+  /** Set the grace period and adjust the sleep period accordingly. */
+  synchronized void setGraceSleepPeriod(final long gracePeriod) {
+    unsyncSetGraceSleepPeriod(gracePeriod);
+  }
+
+  private void unsyncSetGraceSleepPeriod(final long gracePeriod) {
+    if (gracePeriod < 100L) {
+      throw new HadoopIllegalArgumentException(gracePeriod
+          + " = gracePeriod < 100ms is too small.");
+    }
+    this.gracePeriod = gracePeriod;
+    final long half = gracePeriod/2;
+    this.sleepPeriod = half < LEASE_RENEWER_SLEEP_DEFAULT?
+        half: LEASE_RENEWER_SLEEP_DEFAULT;
+  }
+
+  /** Is the daemon running? */
+  synchronized boolean isRunning() {
+    return daemon != null && daemon.isAlive();
+  }
+
+  /** Does this renewer have nothing to renew? */
+  public boolean isEmpty() {
+    return dfsclients.isEmpty();
+  }
+
+  /** Used only by tests */
+  synchronized String getDaemonName() {
+    return daemon.getName();
+  }
+
+  /** Is the empty period longer than the grace period? */
+  private synchronized boolean isRenewerExpired() {
+    return emptyTime != Long.MAX_VALUE
+        && Time.monotonicNow() - emptyTime > gracePeriod;
+  }
+
+  public synchronized void put(final long inodeId, final DFSOutputStream out,
+      final DFSClient dfsc) {
+    if (dfsc.isClientRunning()) {
+      if (!isRunning() || isRenewerExpired()) {
+        //start a new daemon with a new id.
+        final int id = ++currentId;
+        daemon = new Daemon(new Runnable() {
+          @Override
+          public void run() {
+            try {
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("Lease renewer daemon for " + clientsString()
+                    + " with renew id " + id + " started");
+              }
+              LeaseRenewer.this.run(id);
+            } catch(InterruptedException e) {
+              if (LOG.isDebugEnabled()) {
+                LOG.debug(LeaseRenewer.this.getClass().getSimpleName()
+                    + " is interrupted.", e);
+              }
+            } finally {
+              synchronized(LeaseRenewer.this) {
+                Factory.INSTANCE.remove(LeaseRenewer.this);
+              }
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("Lease renewer daemon for " + clientsString()
+                    + " with renew id " + id + " exited");
+              }
+            }
+          }
+
+          @Override
+          public String toString() {
+            return String.valueOf(LeaseRenewer.this);
+          }
+        });
+        daemon.start();
+      }
+      dfsc.putFileBeingWritten(inodeId, out);
+      emptyTime = Long.MAX_VALUE;
+    }
+  }
+
+  @VisibleForTesting
+  synchronized void setEmptyTime(long time) {
+    emptyTime = time;
+  }
+
+  /** Close a file. */
+  public void closeFile(final long inodeId, final DFSClient dfsc) {
+    dfsc.removeFileBeingWritten(inodeId);
+
+    synchronized(this) {
+      if (dfsc.isFilesBeingWrittenEmpty()) {
+        dfsclients.remove(dfsc);
+      }
+      //update emptyTime if necessary
+      if (emptyTime == Long.MAX_VALUE) {
+        for(DFSClient c : dfsclients) {
+          if (!c.isFilesBeingWrittenEmpty()) {
+            //found a non-empty file-being-written map
+            return;
+          }
+        }
+        //discover the first time that all file-being-written maps are empty.
+        emptyTime = Time.monotonicNow();
+      }
+    }
+  }
+
+  /** Close the given client. */
+  public synchronized void closeClient(final DFSClient dfsc) {
+    dfsclients.remove(dfsc);
+    if (dfsclients.isEmpty()) {
+      if (!isRunning() || isRenewerExpired()) {
+        Factory.INSTANCE.remove(LeaseRenewer.this);
+        return;
+      }
+      if (emptyTime == Long.MAX_VALUE) {
+        //discover the first time that the client list is empty.
+        emptyTime = Time.monotonicNow();
+      }
+    }
+
+    //update renewal time
+    if (renewal == dfsc.getConf().getHdfsTimeout()/2) {
+      long min = HdfsConstants.LEASE_SOFTLIMIT_PERIOD;
+      for(DFSClient c : dfsclients) {
+        final int timeout = c.getConf().getHdfsTimeout();
+        if (timeout > 0 && timeout < min) {
+          min = timeout;
+        }
+      }
+      renewal = min/2;
+    }
+  }
+
+  public void interruptAndJoin() throws InterruptedException {
+    Daemon daemonCopy = null;
+    synchronized (this) {
+      if (isRunning()) {
+        daemon.interrupt();
+        daemonCopy = daemon;
+      }
+    }
+
+    if (daemonCopy != null) {
+      if(LOG.isDebugEnabled()) {
+        LOG.debug("Wait for lease checker to terminate");
+      }
+      daemonCopy.join();
+    }
+  }
+
+  private void renew() throws IOException {
+    final List<DFSClient> copies;
+    synchronized(this) {
+      copies = new ArrayList<DFSClient>(dfsclients);
+    }
+    //sort the client names for finding out repeated names.
+    Collections.sort(copies, new Comparator<DFSClient>() {
+      @Override
+      public int compare(final DFSClient left, final DFSClient right) {
+        return left.getClientName().compareTo(right.getClientName());
+      }
+    });
+    String previousName = "";
+    for(int i = 0; i < copies.size(); i++) {
+      final DFSClient c = copies.get(i);
+      //skip if current client name is the same as the previous name.
+      if (!c.getClientName().equals(previousName)) {
+        if (!c.renewLease()) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Did not renew lease for client " +
+                c);
+          }
+          continue;
+        }
+        previousName = c.getClientName();
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Lease renewed for client " + previousName);
+        }
+      }
+    }
+  }
+
+  /**
+   * Periodically check in with the namenode and renew all the leases
+   * when the lease period is half over.
+   */
+  private void run(final int id) throws InterruptedException {
+    for(long lastRenewed = Time.monotonicNow(); !Thread.interrupted();
+        Thread.sleep(getSleepPeriod())) {
+      final long elapsed = Time.monotonicNow() - lastRenewed;
+      if (elapsed >= getRenewalTime()) {
+        try {
+          renew();
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Lease renewer daemon for " + clientsString()
+                + " with renew id " + id + " executed");
+          }
+          lastRenewed = Time.monotonicNow();
+        } catch (SocketTimeoutException ie) {
+          LOG.warn("Failed to renew lease for " + clientsString() + " for "
+              + (elapsed/1000) + " seconds.  Aborting ...", ie);
+          synchronized (this) {
+            while (!dfsclients.isEmpty()) {
+              dfsclients.get(0).abort();
+            }
+          }
+          break;
+        } catch (IOException ie) {
+          LOG.warn("Failed to renew lease for " + clientsString() + " for "
+              + (elapsed/1000) + " seconds.  Will retry shortly ...", ie);
+        }
+      }
+
+      synchronized(this) {
+        if (id != currentId || isRenewerExpired()) {
+          if (LOG.isDebugEnabled()) {
+            if (id != currentId) {
+              LOG.debug("Lease renewer daemon for " + clientsString()
+                  + " with renew id " + id + " is not current");
+            } else {
+               LOG.debug("Lease renewer daemon for " + clientsString()
+                  + " with renew id " + id + " expired");
+            }
+          }
+          //no longer the current daemon or expired
+          return;
+        }
+
+        // if no clients are in running state or there are no more clients
+        // registered with this renewer, stop the daemon after the grace
+        // period.
+        if (!clientsRunning() && emptyTime == Long.MAX_VALUE) {
+          emptyTime = Time.monotonicNow();
+        }
+      }
+    }
+  }
+
+  @Override
+  public String toString() {
+    String s = getClass().getSimpleName() + ":" + factorykey;
+    if (LOG.isTraceEnabled()) {
+      return s + ", clients=" +  clientsString()
+        + ", created at " + instantiationTrace;
+    }
+    return s;
+  }
+
+  /** Get the names of all clients */
+  private synchronized String clientsString() {
+    if (dfsclients.isEmpty()) {
+      return "[]";
+    } else {
+      final StringBuilder b = new StringBuilder("[").append(
+          dfsclients.get(0).getClientName());
+      for(int i = 1; i < dfsclients.size(); i++) {
+        b.append(", ").append(dfsclients.get(i).getClientName());
+      }
+      return b.append("]").toString();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3d019c3/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
index 0689a53..837665e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
@@ -31,6 +31,7 @@ import static org.mockito.Mockito.mock;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.lang.reflect.Method;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
 import java.net.SocketTimeoutException;
@@ -64,6 +65,7 @@ import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.VolumeId;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
+import org.apache.hadoop.hdfs.client.impl.LeaseRenewer;
 import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
 import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
@@ -264,78 +266,84 @@ public class TestDistributedFileSystem {
 
       {
         final DistributedFileSystem dfs = cluster.getFileSystem();
-        dfs.dfs.getLeaseRenewer().setGraceSleepPeriod(grace);
-        assertFalse(dfs.dfs.getLeaseRenewer().isRunning());
+        Method setMethod = dfs.dfs.getLeaseRenewer().getClass()
+            .getDeclaredMethod("setGraceSleepPeriod", long.class);
+        setMethod.setAccessible(true);
+        setMethod.invoke(dfs.dfs.getLeaseRenewer(), grace);
+        Method checkMethod = dfs.dfs.getLeaseRenewer().getClass()
+            .getDeclaredMethod("isRunning");
+        checkMethod.setAccessible(true);
+        assertFalse((boolean) checkMethod.invoke(dfs.dfs.getLeaseRenewer()));
   
         {
           //create a file
           final FSDataOutputStream out = dfs.create(filepaths[0]);
-          assertTrue(dfs.dfs.getLeaseRenewer().isRunning());
+          assertTrue((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer()));
           //write something
           out.writeLong(millis);
-          assertTrue(dfs.dfs.getLeaseRenewer().isRunning());
+          assertTrue((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer()));
           //close
           out.close();
           Thread.sleep(grace/4*3);
           //within grace period
-          assertTrue(dfs.dfs.getLeaseRenewer().isRunning());
+          assertTrue((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer()));
           for(int i = 0; i < 3; i++) {
-            if (dfs.dfs.getLeaseRenewer().isRunning()) {
+            if ((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer())) {
               Thread.sleep(grace/2);
             }
           }
           //passed grace period
-          assertFalse(dfs.dfs.getLeaseRenewer().isRunning());
+          assertFalse((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer()));
         }
 
         {
           //create file1
           final FSDataOutputStream out1 = dfs.create(filepaths[1]);
-          assertTrue(dfs.dfs.getLeaseRenewer().isRunning());
+          assertTrue((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer()));
           //create file2
           final FSDataOutputStream out2 = dfs.create(filepaths[2]);
-          assertTrue(dfs.dfs.getLeaseRenewer().isRunning());
+          assertTrue((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer()));
 
           //write something to file1
           out1.writeLong(millis);
-          assertTrue(dfs.dfs.getLeaseRenewer().isRunning());
+          assertTrue((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer()));
           //close file1
           out1.close();
-          assertTrue(dfs.dfs.getLeaseRenewer().isRunning());
+          assertTrue((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer()));
 
           //write something to file2
           out2.writeLong(millis);
-          assertTrue(dfs.dfs.getLeaseRenewer().isRunning());
+          assertTrue((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer()));
           //close file2
           out2.close();
           Thread.sleep(grace/4*3);
           //within grace period
-          assertTrue(dfs.dfs.getLeaseRenewer().isRunning());
+          assertTrue((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer()));
         }
 
         {
           //create file3
           final FSDataOutputStream out3 = dfs.create(filepaths[3]);
-          assertTrue(dfs.dfs.getLeaseRenewer().isRunning());
+          assertTrue((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer()));
           Thread.sleep(grace/4*3);
           //passed previous grace period, should still running
-          assertTrue(dfs.dfs.getLeaseRenewer().isRunning());
+          assertTrue((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer()));
           //write something to file3
           out3.writeLong(millis);
-          assertTrue(dfs.dfs.getLeaseRenewer().isRunning());
+          assertTrue((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer()));
           //close file3
           out3.close();
-          assertTrue(dfs.dfs.getLeaseRenewer().isRunning());
+          assertTrue((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer()));
           Thread.sleep(grace/4*3);
           //within grace period
-          assertTrue(dfs.dfs.getLeaseRenewer().isRunning());
+          assertTrue((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer()));
           for(int i = 0; i < 3; i++) {
-            if (dfs.dfs.getLeaseRenewer().isRunning()) {
+            if ((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer())) {
               Thread.sleep(grace/2);
             }
           }
           //passed grace period
-          assertFalse(dfs.dfs.getLeaseRenewer().isRunning());
+          assertFalse((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer()));
         }
 
         dfs.close();
@@ -364,15 +372,18 @@ public class TestDistributedFileSystem {
 
       {
         final DistributedFileSystem dfs = cluster.getFileSystem();
-        assertFalse(dfs.dfs.getLeaseRenewer().isRunning());
+        Method checkMethod = dfs.dfs.getLeaseRenewer().getClass()
+            .getDeclaredMethod("isRunning");
+        checkMethod.setAccessible(true);
+        assertFalse((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer()));
 
         //open and check the file
         FSDataInputStream in = dfs.open(filepaths[0]);
-        assertFalse(dfs.dfs.getLeaseRenewer().isRunning());
+        assertFalse((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer()));
         assertEquals(millis, in.readLong());
-        assertFalse(dfs.dfs.getLeaseRenewer().isRunning());
+        assertFalse((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer()));
         in.close();
-        assertFalse(dfs.dfs.getLeaseRenewer().isRunning());
+        assertFalse((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer()));
         dfs.close();
       }
       

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3d019c3/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLease.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLease.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLease.java
index 1cf7add..9b5a7c0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLease.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLease.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.client.impl.LeaseRenewer;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3d019c3/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRenewer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRenewer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRenewer.java
deleted file mode 100644
index f091db7..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRenewer.java
+++ /dev/null
@@ -1,207 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs;
-
-import static org.junit.Assert.assertSame;
-
-import java.io.IOException;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.test.GenericTestUtils;
-import org.apache.hadoop.util.Time;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import com.google.common.base.Supplier;
-
-public class TestLeaseRenewer {
-  private final String FAKE_AUTHORITY="hdfs://nn1/";
-  private final UserGroupInformation FAKE_UGI_A =
-    UserGroupInformation.createUserForTesting(
-      "myuser", new String[]{"group1"});
-  private final UserGroupInformation FAKE_UGI_B =
-    UserGroupInformation.createUserForTesting(
-      "myuser", new String[]{"group1"});
-
-  private DFSClient MOCK_DFSCLIENT;
-  private LeaseRenewer renewer;
-  
-  /** Cause renewals often so test runs quickly. */
-  private static final long FAST_GRACE_PERIOD = 100L;
-  
-  @Before
-  public void setupMocksAndRenewer() throws IOException {
-    MOCK_DFSCLIENT = createMockClient();
-    
-    renewer = LeaseRenewer.getInstance(
-        FAKE_AUTHORITY, FAKE_UGI_A, MOCK_DFSCLIENT);
-    renewer.setGraceSleepPeriod(FAST_GRACE_PERIOD);
-}
- 
-  private DFSClient createMockClient() {
-    final DfsClientConf mockConf = Mockito.mock(DfsClientConf.class);
-    Mockito.doReturn((int)FAST_GRACE_PERIOD).when(mockConf).getHdfsTimeout();
-
-    DFSClient mock = Mockito.mock(DFSClient.class);
-    Mockito.doReturn(true).when(mock).isClientRunning();
-    Mockito.doReturn(mockConf).when(mock).getConf();
-    Mockito.doReturn("myclient").when(mock).getClientName();
-    return mock;
-  }
-
-  @Test
-  public void testInstanceSharing() throws IOException {
-    // Two lease renewers with the same UGI should return
-    // the same instance
-    LeaseRenewer lr = LeaseRenewer.getInstance(
-        FAKE_AUTHORITY, FAKE_UGI_A, MOCK_DFSCLIENT);
-    LeaseRenewer lr2 = LeaseRenewer.getInstance(
-        FAKE_AUTHORITY, FAKE_UGI_A, MOCK_DFSCLIENT);
-    Assert.assertSame(lr, lr2);
-    
-    // But a different UGI should return a different instance
-    LeaseRenewer lr3 = LeaseRenewer.getInstance(
-        FAKE_AUTHORITY, FAKE_UGI_B, MOCK_DFSCLIENT);
-    Assert.assertNotSame(lr, lr3);
-    
-    // A different authority with same UGI should also be a different
-    // instance.
-    LeaseRenewer lr4 = LeaseRenewer.getInstance(
-        "someOtherAuthority", FAKE_UGI_B, MOCK_DFSCLIENT);
-    Assert.assertNotSame(lr, lr4);
-    Assert.assertNotSame(lr3, lr4);
-  }
-  
-  @Test
-  public void testRenewal() throws Exception {
-    // Keep track of how many times the lease gets renewed
-    final AtomicInteger leaseRenewalCount = new AtomicInteger();
-    Mockito.doAnswer(new Answer<Boolean>() {
-      @Override
-      public Boolean answer(InvocationOnMock invocation) throws Throwable {
-        leaseRenewalCount.incrementAndGet();
-        return true;
-      }
-    }).when(MOCK_DFSCLIENT).renewLease();
-
-    
-    // Set up a file so that we start renewing our lease.
-    DFSOutputStream mockStream = Mockito.mock(DFSOutputStream.class);
-    long fileId = 123L;
-    renewer.put(fileId, mockStream, MOCK_DFSCLIENT);
-
-    // Wait for lease to get renewed
-    long failTime = Time.monotonicNow() + 5000;
-    while (Time.monotonicNow() < failTime &&
-        leaseRenewalCount.get() == 0) {
-      Thread.sleep(50);
-    }
-    if (leaseRenewalCount.get() == 0) {
-      Assert.fail("Did not renew lease at all!");
-    }
-
-    renewer.closeFile(fileId, MOCK_DFSCLIENT);
-  }
-  
-  /**
-   * Regression test for HDFS-2810. In this bug, the LeaseRenewer has handles
-   * to several DFSClients with the same name, the first of which has no files
-   * open. Previously, this was causing the lease to not get renewed.
-   */
-  @Test
-  public void testManyDfsClientsWhereSomeNotOpen() throws Exception {
-    // First DFSClient has no files open so doesn't renew leases.
-    final DFSClient mockClient1 = createMockClient();
-    Mockito.doReturn(false).when(mockClient1).renewLease();
-    assertSame(renewer, LeaseRenewer.getInstance(
-        FAKE_AUTHORITY, FAKE_UGI_A, mockClient1));
-    
-    // Set up a file so that we start renewing our lease.
-    DFSOutputStream mockStream1 = Mockito.mock(DFSOutputStream.class);
-    long fileId = 456L;
-    renewer.put(fileId, mockStream1, mockClient1);
-
-    // Second DFSClient does renew lease
-    final DFSClient mockClient2 = createMockClient();
-    Mockito.doReturn(true).when(mockClient2).renewLease();
-    assertSame(renewer, LeaseRenewer.getInstance(
-        FAKE_AUTHORITY, FAKE_UGI_A, mockClient2));
-
-    // Set up a file so that we start renewing our lease.
-    DFSOutputStream mockStream2 = Mockito.mock(DFSOutputStream.class);
-    renewer.put(fileId, mockStream2, mockClient2);
-
-    
-    // Wait for lease to get renewed
-    GenericTestUtils.waitFor(new Supplier<Boolean>() {
-      @Override
-      public Boolean get() {
-        try {
-          Mockito.verify(mockClient1, Mockito.atLeastOnce()).renewLease();
-          Mockito.verify(mockClient2, Mockito.atLeastOnce()).renewLease();
-          return true;
-        } catch (AssertionError err) {
-          LeaseRenewer.LOG.warn("Not yet satisfied", err);
-          return false;
-        } catch (IOException e) {
-          // should not throw!
-          throw new RuntimeException(e);
-        }
-      }
-    }, 100, 10000);
-
-    renewer.closeFile(fileId, mockClient1);
-    renewer.closeFile(fileId, mockClient2);
-  }
-  
-  @Test
-  public void testThreadName() throws Exception {
-    DFSOutputStream mockStream = Mockito.mock(DFSOutputStream.class);
-    long fileId = 789L;
-    Assert.assertFalse("Renewer not initially running",
-        renewer.isRunning());
-    
-    // Pretend to open a file
-    renewer.put(fileId, mockStream, MOCK_DFSCLIENT);
-    
-    Assert.assertTrue("Renewer should have started running",
-        renewer.isRunning());
-    
-    // Check the thread name is reasonable
-    String threadName = renewer.getDaemonName();
-    Assert.assertEquals("LeaseRenewer:myuser@hdfs://nn1/", threadName);
-    
-    // Pretend to close the file
-    renewer.closeFile(fileId, MOCK_DFSCLIENT);
-    renewer.setEmptyTime(Time.monotonicNow());
-    
-    // Should stop the renewer running within a few seconds
-    long failTime = Time.monotonicNow() + 5000;
-    while (renewer.isRunning() && Time.monotonicNow() < failTime) {
-      Thread.sleep(50);
-    }
-    Assert.assertFalse(renewer.isRunning());
-  }
-  
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3d019c3/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestLeaseRenewer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestLeaseRenewer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestLeaseRenewer.java
new file mode 100644
index 0000000..a4e00d9
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestLeaseRenewer.java
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.client.impl;
+
+import static org.junit.Assert.assertSame;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSOutputStream;
+import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.Time;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import com.google.common.base.Supplier;
+
+public class TestLeaseRenewer {
+  private static final String FAKE_AUTHORITY = "hdfs://nn1/";
+  private final UserGroupInformation FAKE_UGI_A =
+    UserGroupInformation.createUserForTesting(
+      "myuser", new String[]{"group1"});
+  // Same user/group as A; testInstanceSharing expects a separate renewer.
+  private final UserGroupInformation FAKE_UGI_B =
+    UserGroupInformation.createUserForTesting(
+      "myuser", new String[]{"group1"});
+  private DFSClient MOCK_DFSCLIENT; // assigned in setupMocksAndRenewer()
+  private LeaseRenewer renewer; // assigned in setupMocksAndRenewer()
+
+  /** Cause renewals often so test runs quickly. */
+  private static final long FAST_GRACE_PERIOD = 100L;
+
+  /** Builds a fresh mock client and fetches the renewer for FAKE_UGI_A. */
+  @Before
+  public void setupMocksAndRenewer() throws IOException {
+    final DFSClient client = createMockClient();
+    MOCK_DFSCLIENT = client;
+    renewer = LeaseRenewer.getInstance(FAKE_AUTHORITY, FAKE_UGI_A, client);
+    renewer.setGraceSleepPeriod(FAST_GRACE_PERIOD);
+  }
+
+  /** Creates a running mock DFSClient whose conf has a short timeout. */
+  private DFSClient createMockClient() {
+    final DfsClientConf conf = Mockito.mock(DfsClientConf.class);
+    Mockito.when(conf.getHdfsTimeout()).thenReturn((int) FAST_GRACE_PERIOD);
+    final DFSClient client = Mockito.mock(DFSClient.class);
+    Mockito.when(client.isClientRunning()).thenReturn(true);
+    Mockito.when(client.getConf()).thenReturn(conf);
+    Mockito.when(client.getClientName()).thenReturn("myclient");
+    return client;
+  }
+
+  /** The same (authority, UGI) pair shares a renewer; other pairs do not. */
+  @Test
+  public void testInstanceSharing() throws IOException {
+    final DFSClient client = MOCK_DFSCLIENT;
+    // Same authority and same UGI, requested twice.
+    final LeaseRenewer first =
+        LeaseRenewer.getInstance(FAKE_AUTHORITY, FAKE_UGI_A, client);
+    final LeaseRenewer again =
+        LeaseRenewer.getInstance(FAKE_AUTHORITY, FAKE_UGI_A, client);
+    Assert.assertSame(first, again);
+
+    // Same authority, other UGI.
+    final LeaseRenewer otherUgi =
+        LeaseRenewer.getInstance(FAKE_AUTHORITY, FAKE_UGI_B, client);
+    Assert.assertNotSame(first, otherUgi);
+
+    // Other authority, same UGI as the previous request.
+    final LeaseRenewer otherAuthority =
+        LeaseRenewer.getInstance("someOtherAuthority", FAKE_UGI_B, client);
+    Assert.assertNotSame(first, otherAuthority);
+    Assert.assertNotSame(otherUgi, otherAuthority);
+  }
+
+  /**
+   * After a file is put, renewLease() must be called before the deadline.
+   */
+  @Test
+  public void testRenewal() throws Exception {
+    // Counts renewLease() invocations on the mock client.
+    final AtomicInteger renewals = new AtomicInteger();
+    final Answer<Boolean> countingAnswer = new Answer<Boolean>() {
+      @Override
+      public Boolean answer(InvocationOnMock invocation) throws Throwable {
+        renewals.incrementAndGet();
+        return true;
+      }
+    };
+    Mockito.doAnswer(countingAnswer).when(MOCK_DFSCLIENT).renewLease();
+
+    // Add a file so that the renewer starts renewing the lease.
+    final long fileId = 123L;
+    final DFSOutputStream stream = Mockito.mock(DFSOutputStream.class);
+    renewer.put(fileId, stream, MOCK_DFSCLIENT);
+
+    // Poll until a renewal is seen or the deadline passes.
+    final long deadline = Time.monotonicNow() + 5000;
+    while (Time.monotonicNow() < deadline && renewals.get() == 0) {
+      Thread.sleep(50);
+    }
+    Assert.assertTrue("Did not renew lease at all!", renewals.get() != 0);
+
+    renewer.closeFile(fileId, MOCK_DFSCLIENT);
+  }
+
+  /**
+   * Regression test for HDFS-2810: one LeaseRenewer serves several DFSClients
+   * that share a name, and the first of them has no files open. The lease
+   * used to go unrenewed in that situation.
+   */
+  @Test
+  public void testManyDfsClientsWhereSomeNotOpen() throws Exception {
+    final long fileId = 456L;
+
+    // Stands in for a client with no open files: renewLease() returns false.
+    final DFSClient idleClient = createMockClient();
+    Mockito.doReturn(false).when(idleClient).renewLease();
+    assertSame(renewer,
+        LeaseRenewer.getInstance(FAKE_AUTHORITY, FAKE_UGI_A, idleClient));
+
+    final DFSOutputStream idleStream = Mockito.mock(DFSOutputStream.class);
+    renewer.put(fileId, idleStream, idleClient);
+
+    // Second client, whose renewLease() returns true.
+    final DFSClient activeClient = createMockClient();
+    Mockito.doReturn(true).when(activeClient).renewLease();
+    assertSame(renewer,
+        LeaseRenewer.getInstance(FAKE_AUTHORITY, FAKE_UGI_A, activeClient));
+
+    final DFSOutputStream activeStream = Mockito.mock(DFSOutputStream.class);
+    renewer.put(fileId, activeStream, activeClient);
+
+    // True once renewLease() has been invoked on both clients.
+    final Supplier<Boolean> bothRenewed = new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        try {
+          Mockito.verify(idleClient, Mockito.atLeastOnce()).renewLease();
+          Mockito.verify(activeClient, Mockito.atLeastOnce()).renewLease();
+        } catch (AssertionError err) {
+          LeaseRenewer.LOG.warn("Not yet satisfied", err);
+          return false;
+        } catch (IOException e) {
+          // should not throw!
+          throw new RuntimeException(e);
+        }
+        return true;
+      }
+    };
+    // Re-check every 100 ms and give up after 10000 ms.
+    GenericTestUtils.waitFor(bothRenewed, 100, 10000);
+
+    renewer.closeFile(fileId, idleClient);
+    renewer.closeFile(fileId, activeClient);
+  }
+
+  /** Checks the daemon name and that the renewer stops once it is empty. */
+  @Test
+  public void testThreadName() throws Exception {
+    final long fileId = 789L;
+    final DFSOutputStream stream = Mockito.mock(DFSOutputStream.class);
+
+    Assert.assertFalse("Renewer not initially running", renewer.isRunning());
+
+    // Adding a file should start the renewer.
+    renewer.put(fileId, stream, MOCK_DFSCLIENT);
+    Assert.assertTrue("Renewer should have started running",
+        renewer.isRunning());
+
+    // Expected name: "LeaseRenewer:" + user name + "@" + authority.
+    Assert.assertEquals("LeaseRenewer:myuser@hdfs://nn1/",
+        renewer.getDaemonName());
+
+    // Drop the file and set the renewer's empty time to now.
+    renewer.closeFile(fileId, MOCK_DFSCLIENT);
+    renewer.setEmptyTime(Time.monotonicNow());
+
+    // Poll until the renewer stops or the deadline passes.
+    final long deadline = Time.monotonicNow() + 5000;
+    while (renewer.isRunning() && Time.monotonicNow() < deadline) {
+      Thread.sleep(50);
+    }
+    Assert.assertFalse(renewer.isRunning());
+  }
+
+}