You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2015/05/05 01:43:10 UTC
[06/33] hadoop git commit: HDFS-8086. Move LeaseRenewer to the
hdfs.client.impl package. Contributed by Takanobu
HDFS-8086. Move LeaseRenewer to the hdfs.client.impl package. Contributed by Takanobu
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d3d019c3
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d3d019c3
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d3d019c3
Branch: refs/heads/HDFS-7240
Commit: d3d019c337ecc10e9c6bbefc3a97c6cd1f5283c3
Parents: 64d30a6
Author: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Authored: Fri May 1 15:11:09 2015 -0700
Committer: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Committed: Fri May 1 15:12:18 2015 -0700
----------------------------------------------------------------------
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 +
.../java/org/apache/hadoop/hdfs/DFSClient.java | 13 +-
.../org/apache/hadoop/hdfs/LeaseRenewer.java | 512 ------------------
.../hadoop/hdfs/client/impl/LeaseRenewer.java | 514 +++++++++++++++++++
.../hadoop/hdfs/TestDistributedFileSystem.java | 59 ++-
.../java/org/apache/hadoop/hdfs/TestLease.java | 1 +
.../apache/hadoop/hdfs/TestLeaseRenewer.java | 207 --------
.../hdfs/client/impl/TestLeaseRenewer.java | 209 ++++++++
8 files changed, 769 insertions(+), 749 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3d019c3/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index b5c5e6b..179fe7e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -494,6 +494,9 @@ Release 2.8.0 - UNRELEASED
HDFS-8292. Move conditional in fmt_time from dfs-dust.js to status.html.
(Charles Lamb via wang)
+ HDFS-8086. Move LeaseRenewer to the hdfs.client.impl package. (Takanobu
+ Asanuma via szetszwo)
+
OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3d019c3/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index d47992b..aaba543 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -101,6 +101,7 @@ import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
+import org.apache.hadoop.hdfs.client.impl.LeaseRenewer;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.net.TcpPeerServer;
import org.apache.hadoop.hdfs.protocol.AclException;
@@ -481,7 +482,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
* enforced to consistently update its local dfsclients array and
* client's filesBeingWritten map.
*/
- void putFileBeingWritten(final long inodeId, final DFSOutputStream out) {
+ public void putFileBeingWritten(final long inodeId, final DFSOutputStream out) {
synchronized(filesBeingWritten) {
filesBeingWritten.put(inodeId, out);
// update the last lease renewal time only when there was no
@@ -494,7 +495,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
}
/** Remove a file. Only called from LeaseRenewer. */
- void removeFileBeingWritten(final long inodeId) {
+ public void removeFileBeingWritten(final long inodeId) {
synchronized(filesBeingWritten) {
filesBeingWritten.remove(inodeId);
if (filesBeingWritten.isEmpty()) {
@@ -504,14 +505,14 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
}
/** Is file-being-written map empty? */
- boolean isFilesBeingWrittenEmpty() {
+ public boolean isFilesBeingWrittenEmpty() {
synchronized(filesBeingWritten) {
return filesBeingWritten.isEmpty();
}
}
/** @return true if the client is running */
- boolean isClientRunning() {
+ public boolean isClientRunning() {
return clientRunning;
}
@@ -533,7 +534,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
* @return true if lease was renewed. May return false if this
* client has been closed or has no files open.
**/
- boolean renewLease() throws IOException {
+ public boolean renewLease() throws IOException {
if (clientRunning && !isFilesBeingWrittenEmpty()) {
try {
namenode.renewLease(clientName);
@@ -565,7 +566,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
}
/** Abort and release resources held. Ignore all errors. */
- void abort() {
+ public void abort() {
clientRunning = false;
closeAllFilesBeingWritten(true);
try {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3d019c3/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/LeaseRenewer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/LeaseRenewer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/LeaseRenewer.java
deleted file mode 100644
index 511bddb..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/LeaseRenewer.java
+++ /dev/null
@@ -1,512 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs;
-
-import java.io.IOException;
-import java.net.SocketTimeoutException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.HadoopIllegalArgumentException;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.Daemon;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.util.Time;
-import com.google.common.annotations.VisibleForTesting;
-
-/**
- * <p>
- * Used by {@link DFSClient} for renewing file-being-written leases
- * on the namenode.
- * When a file is opened for write (create or append),
- * namenode stores a file lease for recording the identity of the writer.
- * The writer (i.e. the DFSClient) is required to renew the lease periodically.
- * When the lease is not renewed before it expires,
- * the namenode considers the writer as failed and then it may either let
- * another writer to obtain the lease or close the file.
- * </p>
- * <p>
- * This class also provides the following functionality:
- * <ul>
- * <li>
- * It maintains a map from (namenode, user) pairs to lease renewers.
- * The same {@link LeaseRenewer} instance is used for renewing lease
- * for all the {@link DFSClient} to the same namenode and the same user.
- * </li>
- * <li>
- * Each renewer maintains a list of {@link DFSClient}.
- * Periodically the leases for all the clients are renewed.
- * A client is removed from the list when the client is closed.
- * </li>
- * <li>
- * A thread per namenode per user is used by the {@link LeaseRenewer}
- * to renew the leases.
- * </li>
- * </ul>
- * </p>
- */
-@InterfaceAudience.Private
-class LeaseRenewer {
- static final Log LOG = LogFactory.getLog(LeaseRenewer.class);
-
- static final long LEASE_RENEWER_GRACE_DEFAULT = 60*1000L;
- static final long LEASE_RENEWER_SLEEP_DEFAULT = 1000L;
-
- /** Get a {@link LeaseRenewer} instance */
- static LeaseRenewer getInstance(final String authority,
- final UserGroupInformation ugi, final DFSClient dfsc) throws IOException {
- final LeaseRenewer r = Factory.INSTANCE.get(authority, ugi);
- r.addClient(dfsc);
- return r;
- }
-
- /**
- * A factory for sharing {@link LeaseRenewer} objects
- * among {@link DFSClient} instances
- * so that there is only one renewer per authority per user.
- */
- private static class Factory {
- private static final Factory INSTANCE = new Factory();
-
- private static class Key {
- /** Namenode info */
- final String authority;
- /** User info */
- final UserGroupInformation ugi;
-
- private Key(final String authority, final UserGroupInformation ugi) {
- if (authority == null) {
- throw new HadoopIllegalArgumentException("authority == null");
- } else if (ugi == null) {
- throw new HadoopIllegalArgumentException("ugi == null");
- }
-
- this.authority = authority;
- this.ugi = ugi;
- }
-
- @Override
- public int hashCode() {
- return authority.hashCode() ^ ugi.hashCode();
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj == this) {
- return true;
- }
- if (obj != null && obj instanceof Key) {
- final Key that = (Key)obj;
- return this.authority.equals(that.authority)
- && this.ugi.equals(that.ugi);
- }
- return false;
- }
-
- @Override
- public String toString() {
- return ugi.getShortUserName() + "@" + authority;
- }
- }
-
- /** A map for per user per namenode renewers. */
- private final Map<Key, LeaseRenewer> renewers = new HashMap<Key, LeaseRenewer>();
-
- /** Get a renewer. */
- private synchronized LeaseRenewer get(final String authority,
- final UserGroupInformation ugi) {
- final Key k = new Key(authority, ugi);
- LeaseRenewer r = renewers.get(k);
- if (r == null) {
- r = new LeaseRenewer(k);
- renewers.put(k, r);
- }
- return r;
- }
-
- /** Remove the given renewer. */
- private synchronized void remove(final LeaseRenewer r) {
- final LeaseRenewer stored = renewers.get(r.factorykey);
- //Since a renewer may expire, the stored renewer can be different.
- if (r == stored) {
- if (!r.clientsRunning()) {
- renewers.remove(r.factorykey);
- }
- }
- }
- }
-
- /** The time in milliseconds that the map became empty. */
- private long emptyTime = Long.MAX_VALUE;
- /** A fixed lease renewal time period in milliseconds */
- private long renewal = HdfsConstants.LEASE_SOFTLIMIT_PERIOD/2;
-
- /** A daemon for renewing lease */
- private Daemon daemon = null;
- /** Only the daemon with currentId should run. */
- private int currentId = 0;
-
- /**
- * A period in milliseconds that the lease renewer thread should run
- * after the map became empty.
- * In other words,
- * if the map is empty for a time period longer than the grace period,
- * the renewer should terminate.
- */
- private long gracePeriod;
- /**
- * The time period in milliseconds
- * that the renewer sleeps for each iteration.
- */
- private long sleepPeriod;
-
- private final Factory.Key factorykey;
-
- /** A list of clients corresponding to this renewer. */
- private final List<DFSClient> dfsclients = new ArrayList<DFSClient>();
-
- /**
- * A stringified stack trace of the call stack when the Lease Renewer
- * was instantiated. This is only generated if trace-level logging is
- * enabled on this class.
- */
- private final String instantiationTrace;
-
- private LeaseRenewer(Factory.Key factorykey) {
- this.factorykey = factorykey;
- unsyncSetGraceSleepPeriod(LEASE_RENEWER_GRACE_DEFAULT);
-
- if (LOG.isTraceEnabled()) {
- instantiationTrace = StringUtils.stringifyException(
- new Throwable("TRACE"));
- } else {
- instantiationTrace = null;
- }
- }
-
- /** @return the renewal time in milliseconds. */
- private synchronized long getRenewalTime() {
- return renewal;
- }
-
- /** Add a client. */
- private synchronized void addClient(final DFSClient dfsc) {
- for(DFSClient c : dfsclients) {
- if (c == dfsc) {
- //client already exists, nothing to do.
- return;
- }
- }
- //client not found, add it
- dfsclients.add(dfsc);
-
- //update renewal time
- final int hdfsTimeout = dfsc.getConf().getHdfsTimeout();
- if (hdfsTimeout > 0) {
- final long half = hdfsTimeout/2;
- if (half < renewal) {
- this.renewal = half;
- }
- }
- }
-
- private synchronized boolean clientsRunning() {
- for(Iterator<DFSClient> i = dfsclients.iterator(); i.hasNext(); ) {
- if (!i.next().isClientRunning()) {
- i.remove();
- }
- }
- return !dfsclients.isEmpty();
- }
-
- private synchronized long getSleepPeriod() {
- return sleepPeriod;
- }
-
- /** Set the grace period and adjust the sleep period accordingly. */
- synchronized void setGraceSleepPeriod(final long gracePeriod) {
- unsyncSetGraceSleepPeriod(gracePeriod);
- }
-
- private void unsyncSetGraceSleepPeriod(final long gracePeriod) {
- if (gracePeriod < 100L) {
- throw new HadoopIllegalArgumentException(gracePeriod
- + " = gracePeriod < 100ms is too small.");
- }
- this.gracePeriod = gracePeriod;
- final long half = gracePeriod/2;
- this.sleepPeriod = half < LEASE_RENEWER_SLEEP_DEFAULT?
- half: LEASE_RENEWER_SLEEP_DEFAULT;
- }
-
- /** Is the daemon running? */
- synchronized boolean isRunning() {
- return daemon != null && daemon.isAlive();
- }
-
- /** Does this renewer have nothing to renew? */
- public boolean isEmpty() {
- return dfsclients.isEmpty();
- }
-
- /** Used only by tests */
- synchronized String getDaemonName() {
- return daemon.getName();
- }
-
- /** Is the empty period longer than the grace period? */
- private synchronized boolean isRenewerExpired() {
- return emptyTime != Long.MAX_VALUE
- && Time.monotonicNow() - emptyTime > gracePeriod;
- }
-
- synchronized void put(final long inodeId, final DFSOutputStream out,
- final DFSClient dfsc) {
- if (dfsc.isClientRunning()) {
- if (!isRunning() || isRenewerExpired()) {
- //start a new deamon with a new id.
- final int id = ++currentId;
- daemon = new Daemon(new Runnable() {
- @Override
- public void run() {
- try {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Lease renewer daemon for " + clientsString()
- + " with renew id " + id + " started");
- }
- LeaseRenewer.this.run(id);
- } catch(InterruptedException e) {
- if (LOG.isDebugEnabled()) {
- LOG.debug(LeaseRenewer.this.getClass().getSimpleName()
- + " is interrupted.", e);
- }
- } finally {
- synchronized(LeaseRenewer.this) {
- Factory.INSTANCE.remove(LeaseRenewer.this);
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Lease renewer daemon for " + clientsString()
- + " with renew id " + id + " exited");
- }
- }
- }
-
- @Override
- public String toString() {
- return String.valueOf(LeaseRenewer.this);
- }
- });
- daemon.start();
- }
- dfsc.putFileBeingWritten(inodeId, out);
- emptyTime = Long.MAX_VALUE;
- }
- }
-
- @VisibleForTesting
- synchronized void setEmptyTime(long time) {
- emptyTime = time;
- }
-
- /** Close a file. */
- void closeFile(final long inodeId, final DFSClient dfsc) {
- dfsc.removeFileBeingWritten(inodeId);
-
- synchronized(this) {
- if (dfsc.isFilesBeingWrittenEmpty()) {
- dfsclients.remove(dfsc);
- }
- //update emptyTime if necessary
- if (emptyTime == Long.MAX_VALUE) {
- for(DFSClient c : dfsclients) {
- if (!c.isFilesBeingWrittenEmpty()) {
- //found a non-empty file-being-written map
- return;
- }
- }
- //discover the first time that all file-being-written maps are empty.
- emptyTime = Time.monotonicNow();
- }
- }
- }
-
- /** Close the given client. */
- synchronized void closeClient(final DFSClient dfsc) {
- dfsclients.remove(dfsc);
- if (dfsclients.isEmpty()) {
- if (!isRunning() || isRenewerExpired()) {
- Factory.INSTANCE.remove(LeaseRenewer.this);
- return;
- }
- if (emptyTime == Long.MAX_VALUE) {
- //discover the first time that the client list is empty.
- emptyTime = Time.monotonicNow();
- }
- }
-
- //update renewal time
- if (renewal == dfsc.getConf().getHdfsTimeout()/2) {
- long min = HdfsConstants.LEASE_SOFTLIMIT_PERIOD;
- for(DFSClient c : dfsclients) {
- final int timeout = c.getConf().getHdfsTimeout();
- if (timeout > 0 && timeout < min) {
- min = timeout;
- }
- }
- renewal = min/2;
- }
- }
-
- void interruptAndJoin() throws InterruptedException {
- Daemon daemonCopy = null;
- synchronized (this) {
- if (isRunning()) {
- daemon.interrupt();
- daemonCopy = daemon;
- }
- }
-
- if (daemonCopy != null) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("Wait for lease checker to terminate");
- }
- daemonCopy.join();
- }
- }
-
- private void renew() throws IOException {
- final List<DFSClient> copies;
- synchronized(this) {
- copies = new ArrayList<DFSClient>(dfsclients);
- }
- //sort the client names for finding out repeated names.
- Collections.sort(copies, new Comparator<DFSClient>() {
- @Override
- public int compare(final DFSClient left, final DFSClient right) {
- return left.getClientName().compareTo(right.getClientName());
- }
- });
- String previousName = "";
- for(int i = 0; i < copies.size(); i++) {
- final DFSClient c = copies.get(i);
- //skip if current client name is the same as the previous name.
- if (!c.getClientName().equals(previousName)) {
- if (!c.renewLease()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Did not renew lease for client " +
- c);
- }
- continue;
- }
- previousName = c.getClientName();
- if (LOG.isDebugEnabled()) {
- LOG.debug("Lease renewed for client " + previousName);
- }
- }
- }
- }
-
- /**
- * Periodically check in with the namenode and renew all the leases
- * when the lease period is half over.
- */
- private void run(final int id) throws InterruptedException {
- for(long lastRenewed = Time.monotonicNow(); !Thread.interrupted();
- Thread.sleep(getSleepPeriod())) {
- final long elapsed = Time.monotonicNow() - lastRenewed;
- if (elapsed >= getRenewalTime()) {
- try {
- renew();
- if (LOG.isDebugEnabled()) {
- LOG.debug("Lease renewer daemon for " + clientsString()
- + " with renew id " + id + " executed");
- }
- lastRenewed = Time.monotonicNow();
- } catch (SocketTimeoutException ie) {
- LOG.warn("Failed to renew lease for " + clientsString() + " for "
- + (elapsed/1000) + " seconds. Aborting ...", ie);
- synchronized (this) {
- while (!dfsclients.isEmpty()) {
- dfsclients.get(0).abort();
- }
- }
- break;
- } catch (IOException ie) {
- LOG.warn("Failed to renew lease for " + clientsString() + " for "
- + (elapsed/1000) + " seconds. Will retry shortly ...", ie);
- }
- }
-
- synchronized(this) {
- if (id != currentId || isRenewerExpired()) {
- if (LOG.isDebugEnabled()) {
- if (id != currentId) {
- LOG.debug("Lease renewer daemon for " + clientsString()
- + " with renew id " + id + " is not current");
- } else {
- LOG.debug("Lease renewer daemon for " + clientsString()
- + " with renew id " + id + " expired");
- }
- }
- //no longer the current daemon or expired
- return;
- }
-
- // if no clients are in running state or there is no more clients
- // registered with this renewer, stop the daemon after the grace
- // period.
- if (!clientsRunning() && emptyTime == Long.MAX_VALUE) {
- emptyTime = Time.monotonicNow();
- }
- }
- }
- }
-
- @Override
- public String toString() {
- String s = getClass().getSimpleName() + ":" + factorykey;
- if (LOG.isTraceEnabled()) {
- return s + ", clients=" + clientsString()
- + ", created at " + instantiationTrace;
- }
- return s;
- }
-
- /** Get the names of all clients */
- private synchronized String clientsString() {
- if (dfsclients.isEmpty()) {
- return "[]";
- } else {
- final StringBuilder b = new StringBuilder("[").append(
- dfsclients.get(0).getClientName());
- for(int i = 1; i < dfsclients.size(); i++) {
- b.append(", ").append(dfsclients.get(i).getClientName());
- }
- return b.append("]").toString();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3d019c3/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/impl/LeaseRenewer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/impl/LeaseRenewer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/impl/LeaseRenewer.java
new file mode 100644
index 0000000..4cdf168
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/impl/LeaseRenewer.java
@@ -0,0 +1,514 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.client.impl;
+
+import java.io.IOException;
+import java.net.SocketTimeoutException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.HadoopIllegalArgumentException;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSOutputStream;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Time;
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * <p>
+ * Used by {@link org.apache.hadoop.hdfs.DFSClient} for renewing file-being-written leases
+ * on the namenode.
+ * When a file is opened for write (create or append),
+ * namenode stores a file lease for recording the identity of the writer.
+ * The writer (i.e. the DFSClient) is required to renew the lease periodically.
+ * When the lease is not renewed before it expires,
+ * the namenode considers the writer as failed and then it may either let
+ * another writer to obtain the lease or close the file.
+ * </p>
+ * <p>
+ * This class also provides the following functionality:
+ * <ul>
+ * <li>
+ * It maintains a map from (namenode, user) pairs to lease renewers.
+ * The same {@link LeaseRenewer} instance is used for renewing lease
+ * for all the {@link org.apache.hadoop.hdfs.DFSClient} to the same namenode and the same user.
+ * </li>
+ * <li>
+ * Each renewer maintains a list of {@link org.apache.hadoop.hdfs.DFSClient}.
+ * Periodically the leases for all the clients are renewed.
+ * A client is removed from the list when the client is closed.
+ * </li>
+ * <li>
+ * A thread per namenode per user is used by the {@link LeaseRenewer}
+ * to renew the leases.
+ * </li>
+ * </ul>
+ * </p>
+ */
+@InterfaceAudience.Private
+public class LeaseRenewer {
+ static final Log LOG = LogFactory.getLog(LeaseRenewer.class);
+
+ static final long LEASE_RENEWER_GRACE_DEFAULT = 60*1000L;
+ static final long LEASE_RENEWER_SLEEP_DEFAULT = 1000L;
+
+ /** Get a {@link LeaseRenewer} instance */
+ public static LeaseRenewer getInstance(final String authority,
+ final UserGroupInformation ugi, final DFSClient dfsc) throws IOException {
+ final LeaseRenewer r = Factory.INSTANCE.get(authority, ugi);
+ r.addClient(dfsc);
+ return r;
+ }
+
+ /**
+ * A factory for sharing {@link LeaseRenewer} objects
+ * among {@link DFSClient} instances
+ * so that there is only one renewer per authority per user.
+ */
+ private static class Factory {
+ private static final Factory INSTANCE = new Factory();
+
+ private static class Key {
+ /** Namenode info */
+ final String authority;
+ /** User info */
+ final UserGroupInformation ugi;
+
+ private Key(final String authority, final UserGroupInformation ugi) {
+ if (authority == null) {
+ throw new HadoopIllegalArgumentException("authority == null");
+ } else if (ugi == null) {
+ throw new HadoopIllegalArgumentException("ugi == null");
+ }
+
+ this.authority = authority;
+ this.ugi = ugi;
+ }
+
+ @Override
+ public int hashCode() {
+ return authority.hashCode() ^ ugi.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == this) {
+ return true;
+ }
+ if (obj != null && obj instanceof Key) {
+ final Key that = (Key)obj;
+ return this.authority.equals(that.authority)
+ && this.ugi.equals(that.ugi);
+ }
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ return ugi.getShortUserName() + "@" + authority;
+ }
+ }
+
+ /** A map for per user per namenode renewers. */
+ private final Map<Key, LeaseRenewer> renewers = new HashMap<Key, LeaseRenewer>();
+
+ /** Get a renewer. */
+ private synchronized LeaseRenewer get(final String authority,
+ final UserGroupInformation ugi) {
+ final Key k = new Key(authority, ugi);
+ LeaseRenewer r = renewers.get(k);
+ if (r == null) {
+ r = new LeaseRenewer(k);
+ renewers.put(k, r);
+ }
+ return r;
+ }
+
+ /** Remove the given renewer. */
+ private synchronized void remove(final LeaseRenewer r) {
+ final LeaseRenewer stored = renewers.get(r.factorykey);
+ //Since a renewer may expire, the stored renewer can be different.
+ if (r == stored) {
+ if (!r.clientsRunning()) {
+ renewers.remove(r.factorykey);
+ }
+ }
+ }
+ }
+
+ /** The time in milliseconds that the map became empty. */
+ private long emptyTime = Long.MAX_VALUE;
+ /** A fixed lease renewal time period in milliseconds */
+ private long renewal = HdfsConstants.LEASE_SOFTLIMIT_PERIOD/2;
+
+ /** A daemon for renewing lease */
+ private Daemon daemon = null;
+ /** Only the daemon with currentId should run. */
+ private int currentId = 0;
+
+ /**
+ * A period in milliseconds that the lease renewer thread should run
+ * after the map became empty.
+ * In other words,
+ * if the map is empty for a time period longer than the grace period,
+ * the renewer should terminate.
+ */
+ private long gracePeriod;
+ /**
+ * The time period in milliseconds
+ * that the renewer sleeps for each iteration.
+ */
+ private long sleepPeriod;
+
+ private final Factory.Key factorykey;
+
+ /** A list of clients corresponding to this renewer. */
+ private final List<DFSClient> dfsclients = new ArrayList<DFSClient>();
+
+ /**
+ * A stringified stack trace of the call stack when the Lease Renewer
+ * was instantiated. This is only generated if trace-level logging is
+ * enabled on this class.
+ */
+ private final String instantiationTrace;
+
+ private LeaseRenewer(Factory.Key factorykey) {
+ this.factorykey = factorykey;
+ unsyncSetGraceSleepPeriod(LEASE_RENEWER_GRACE_DEFAULT);
+
+ if (LOG.isTraceEnabled()) {
+ instantiationTrace = StringUtils.stringifyException(
+ new Throwable("TRACE"));
+ } else {
+ instantiationTrace = null;
+ }
+ }
+
+ /** @return the renewal time in milliseconds. */
+ private synchronized long getRenewalTime() {
+ return renewal;
+ }
+
+ /** Add a client. */
+ private synchronized void addClient(final DFSClient dfsc) {
+ for(DFSClient c : dfsclients) {
+ if (c == dfsc) {
+ //client already exists, nothing to do.
+ return;
+ }
+ }
+ //client not found, add it
+ dfsclients.add(dfsc);
+
+ //update renewal time
+ final int hdfsTimeout = dfsc.getConf().getHdfsTimeout();
+ if (hdfsTimeout > 0) {
+ final long half = hdfsTimeout/2;
+ if (half < renewal) {
+ this.renewal = half;
+ }
+ }
+ }
+
+ private synchronized boolean clientsRunning() {
+ for(Iterator<DFSClient> i = dfsclients.iterator(); i.hasNext(); ) {
+ if (!i.next().isClientRunning()) {
+ i.remove();
+ }
+ }
+ return !dfsclients.isEmpty();
+ }
+
+ private synchronized long getSleepPeriod() {
+ return sleepPeriod;
+ }
+
+ /** Set the grace period and adjust the sleep period accordingly. */
+ synchronized void setGraceSleepPeriod(final long gracePeriod) {
+ unsyncSetGraceSleepPeriod(gracePeriod);
+ }
+
+ private void unsyncSetGraceSleepPeriod(final long gracePeriod) {
+ if (gracePeriod < 100L) {
+ throw new HadoopIllegalArgumentException(gracePeriod
+ + " = gracePeriod < 100ms is too small.");
+ }
+ this.gracePeriod = gracePeriod;
+ final long half = gracePeriod/2;
+ this.sleepPeriod = half < LEASE_RENEWER_SLEEP_DEFAULT?
+ half: LEASE_RENEWER_SLEEP_DEFAULT;
+ }
+
+ /** Is the daemon running? */
+ synchronized boolean isRunning() {
+ return daemon != null && daemon.isAlive();
+ }
+
+ /** Does this renewer have nothing to renew? */
+ public boolean isEmpty() {
+ return dfsclients.isEmpty();
+ }
+
+ /** Used only by tests */
+ synchronized String getDaemonName() {
+ return daemon.getName();
+ }
+
+ /** Is the empty period longer than the grace period? */
+ private synchronized boolean isRenewerExpired() {
+ return emptyTime != Long.MAX_VALUE
+ && Time.monotonicNow() - emptyTime > gracePeriod;
+ }
+
+ public synchronized void put(final long inodeId, final DFSOutputStream out,
+ final DFSClient dfsc) {
+ if (dfsc.isClientRunning()) {
+ if (!isRunning() || isRenewerExpired()) {
+ //start a new deamon with a new id.
+ final int id = ++currentId;
+ daemon = new Daemon(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Lease renewer daemon for " + clientsString()
+ + " with renew id " + id + " started");
+ }
+ LeaseRenewer.this.run(id);
+ } catch(InterruptedException e) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(LeaseRenewer.this.getClass().getSimpleName()
+ + " is interrupted.", e);
+ }
+ } finally {
+ synchronized(LeaseRenewer.this) {
+ Factory.INSTANCE.remove(LeaseRenewer.this);
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Lease renewer daemon for " + clientsString()
+ + " with renew id " + id + " exited");
+ }
+ }
+ }
+
+ @Override
+ public String toString() {
+ return String.valueOf(LeaseRenewer.this);
+ }
+ });
+ daemon.start();
+ }
+ dfsc.putFileBeingWritten(inodeId, out);
+ emptyTime = Long.MAX_VALUE;
+ }
+ }
+
+ @VisibleForTesting
+ synchronized void setEmptyTime(long time) {
+ emptyTime = time;
+ }
+
+ /** Close a file. */
+ public void closeFile(final long inodeId, final DFSClient dfsc) {
+ dfsc.removeFileBeingWritten(inodeId);
+
+ synchronized(this) {
+ if (dfsc.isFilesBeingWrittenEmpty()) {
+ dfsclients.remove(dfsc);
+ }
+ //update emptyTime if necessary
+ if (emptyTime == Long.MAX_VALUE) {
+ for(DFSClient c : dfsclients) {
+ if (!c.isFilesBeingWrittenEmpty()) {
+ //found a non-empty file-being-written map
+ return;
+ }
+ }
+ //discover the first time that all file-being-written maps are empty.
+ emptyTime = Time.monotonicNow();
+ }
+ }
+ }
+
+ /** Close the given client. */
+ public synchronized void closeClient(final DFSClient dfsc) {
+ dfsclients.remove(dfsc);
+ if (dfsclients.isEmpty()) {
+ if (!isRunning() || isRenewerExpired()) {
+ Factory.INSTANCE.remove(LeaseRenewer.this);
+ return;
+ }
+ if (emptyTime == Long.MAX_VALUE) {
+ //discover the first time that the client list is empty.
+ emptyTime = Time.monotonicNow();
+ }
+ }
+
+ //update renewal time
+ if (renewal == dfsc.getConf().getHdfsTimeout()/2) {
+ long min = HdfsConstants.LEASE_SOFTLIMIT_PERIOD;
+ for(DFSClient c : dfsclients) {
+ final int timeout = c.getConf().getHdfsTimeout();
+ if (timeout > 0 && timeout < min) {
+ min = timeout;
+ }
+ }
+ renewal = min/2;
+ }
+ }
+
+ public void interruptAndJoin() throws InterruptedException {
+ Daemon daemonCopy = null;
+ synchronized (this) {
+ if (isRunning()) {
+ daemon.interrupt();
+ daemonCopy = daemon;
+ }
+ }
+
+ if (daemonCopy != null) {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Wait for lease checker to terminate");
+ }
+ daemonCopy.join();
+ }
+ }
+
+ private void renew() throws IOException {
+ final List<DFSClient> copies;
+ synchronized(this) {
+ copies = new ArrayList<DFSClient>(dfsclients);
+ }
+ //sort the client names for finding out repeated names.
+ Collections.sort(copies, new Comparator<DFSClient>() {
+ @Override
+ public int compare(final DFSClient left, final DFSClient right) {
+ return left.getClientName().compareTo(right.getClientName());
+ }
+ });
+ String previousName = "";
+ for(int i = 0; i < copies.size(); i++) {
+ final DFSClient c = copies.get(i);
+ //skip if current client name is the same as the previous name.
+ if (!c.getClientName().equals(previousName)) {
+ if (!c.renewLease()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Did not renew lease for client " +
+ c);
+ }
+ continue;
+ }
+ previousName = c.getClientName();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Lease renewed for client " + previousName);
+ }
+ }
+ }
+ }
+
+ /**
+ * Periodically check in with the namenode and renew all the leases
+ * when the lease period is half over.
+ */
+ private void run(final int id) throws InterruptedException {
+ for(long lastRenewed = Time.monotonicNow(); !Thread.interrupted();
+ Thread.sleep(getSleepPeriod())) {
+ final long elapsed = Time.monotonicNow() - lastRenewed;
+ if (elapsed >= getRenewalTime()) {
+ try {
+ renew();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Lease renewer daemon for " + clientsString()
+ + " with renew id " + id + " executed");
+ }
+ lastRenewed = Time.monotonicNow();
+ } catch (SocketTimeoutException ie) {
+ LOG.warn("Failed to renew lease for " + clientsString() + " for "
+ + (elapsed/1000) + " seconds. Aborting ...", ie);
+ synchronized (this) {
+ while (!dfsclients.isEmpty()) {
+ dfsclients.get(0).abort();
+ }
+ }
+ break;
+ } catch (IOException ie) {
+ LOG.warn("Failed to renew lease for " + clientsString() + " for "
+ + (elapsed/1000) + " seconds. Will retry shortly ...", ie);
+ }
+ }
+
+ synchronized(this) {
+ if (id != currentId || isRenewerExpired()) {
+ if (LOG.isDebugEnabled()) {
+ if (id != currentId) {
+ LOG.debug("Lease renewer daemon for " + clientsString()
+ + " with renew id " + id + " is not current");
+ } else {
+ LOG.debug("Lease renewer daemon for " + clientsString()
+ + " with renew id " + id + " expired");
+ }
+ }
+ //no longer the current daemon or expired
+ return;
+ }
+
+ // if no clients are in running state or there is no more clients
+ // registered with this renewer, stop the daemon after the grace
+ // period.
+ if (!clientsRunning() && emptyTime == Long.MAX_VALUE) {
+ emptyTime = Time.monotonicNow();
+ }
+ }
+ }
+ }
+
+ @Override
+ public String toString() {
+ String s = getClass().getSimpleName() + ":" + factorykey;
+ if (LOG.isTraceEnabled()) {
+ return s + ", clients=" + clientsString()
+ + ", created at " + instantiationTrace;
+ }
+ return s;
+ }
+
+ /** Get the names of all clients */
+ private synchronized String clientsString() {
+ if (dfsclients.isEmpty()) {
+ return "[]";
+ } else {
+ final StringBuilder b = new StringBuilder("[").append(
+ dfsclients.get(0).getClientName());
+ for(int i = 1; i < dfsclients.size(); i++) {
+ b.append(", ").append(dfsclients.get(i).getClientName());
+ }
+ return b.append("]").toString();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3d019c3/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
index 0689a53..837665e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
@@ -31,6 +31,7 @@ import static org.mockito.Mockito.mock;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.SocketTimeoutException;
@@ -64,6 +65,7 @@ import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.VolumeId;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
+import org.apache.hadoop.hdfs.client.impl.LeaseRenewer;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
@@ -264,78 +266,84 @@ public class TestDistributedFileSystem {
{
final DistributedFileSystem dfs = cluster.getFileSystem();
- dfs.dfs.getLeaseRenewer().setGraceSleepPeriod(grace);
- assertFalse(dfs.dfs.getLeaseRenewer().isRunning());
+ Method setMethod = dfs.dfs.getLeaseRenewer().getClass()
+ .getDeclaredMethod("setGraceSleepPeriod", long.class);
+ setMethod.setAccessible(true);
+ setMethod.invoke(dfs.dfs.getLeaseRenewer(), grace);
+ Method checkMethod = dfs.dfs.getLeaseRenewer().getClass()
+ .getDeclaredMethod("isRunning");
+ checkMethod.setAccessible(true);
+ assertFalse((boolean) checkMethod.invoke(dfs.dfs.getLeaseRenewer()));
{
//create a file
final FSDataOutputStream out = dfs.create(filepaths[0]);
- assertTrue(dfs.dfs.getLeaseRenewer().isRunning());
+ assertTrue((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer()));
//write something
out.writeLong(millis);
- assertTrue(dfs.dfs.getLeaseRenewer().isRunning());
+ assertTrue((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer()));
//close
out.close();
Thread.sleep(grace/4*3);
//within grace period
- assertTrue(dfs.dfs.getLeaseRenewer().isRunning());
+ assertTrue((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer()));
for(int i = 0; i < 3; i++) {
- if (dfs.dfs.getLeaseRenewer().isRunning()) {
+ if ((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer())) {
Thread.sleep(grace/2);
}
}
//passed grace period
- assertFalse(dfs.dfs.getLeaseRenewer().isRunning());
+ assertFalse((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer()));
}
{
//create file1
final FSDataOutputStream out1 = dfs.create(filepaths[1]);
- assertTrue(dfs.dfs.getLeaseRenewer().isRunning());
+ assertTrue((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer()));
//create file2
final FSDataOutputStream out2 = dfs.create(filepaths[2]);
- assertTrue(dfs.dfs.getLeaseRenewer().isRunning());
+ assertTrue((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer()));
//write something to file1
out1.writeLong(millis);
- assertTrue(dfs.dfs.getLeaseRenewer().isRunning());
+ assertTrue((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer()));
//close file1
out1.close();
- assertTrue(dfs.dfs.getLeaseRenewer().isRunning());
+ assertTrue((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer()));
//write something to file2
out2.writeLong(millis);
- assertTrue(dfs.dfs.getLeaseRenewer().isRunning());
+ assertTrue((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer()));
//close file2
out2.close();
Thread.sleep(grace/4*3);
//within grace period
- assertTrue(dfs.dfs.getLeaseRenewer().isRunning());
+ assertTrue((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer()));
}
{
//create file3
final FSDataOutputStream out3 = dfs.create(filepaths[3]);
- assertTrue(dfs.dfs.getLeaseRenewer().isRunning());
+ assertTrue((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer()));
Thread.sleep(grace/4*3);
//passed previous grace period, should still running
- assertTrue(dfs.dfs.getLeaseRenewer().isRunning());
+ assertTrue((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer()));
//write something to file3
out3.writeLong(millis);
- assertTrue(dfs.dfs.getLeaseRenewer().isRunning());
+ assertTrue((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer()));
//close file3
out3.close();
- assertTrue(dfs.dfs.getLeaseRenewer().isRunning());
+ assertTrue((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer()));
Thread.sleep(grace/4*3);
//within grace period
- assertTrue(dfs.dfs.getLeaseRenewer().isRunning());
+ assertTrue((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer()));
for(int i = 0; i < 3; i++) {
- if (dfs.dfs.getLeaseRenewer().isRunning()) {
+ if ((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer())) {
Thread.sleep(grace/2);
}
}
//passed grace period
- assertFalse(dfs.dfs.getLeaseRenewer().isRunning());
+ assertFalse((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer()));
}
dfs.close();
@@ -364,15 +372,18 @@ public class TestDistributedFileSystem {
{
final DistributedFileSystem dfs = cluster.getFileSystem();
- assertFalse(dfs.dfs.getLeaseRenewer().isRunning());
+ Method checkMethod = dfs.dfs.getLeaseRenewer().getClass()
+ .getDeclaredMethod("isRunning");
+ checkMethod.setAccessible(true);
+ assertFalse((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer()));
//open and check the file
FSDataInputStream in = dfs.open(filepaths[0]);
- assertFalse(dfs.dfs.getLeaseRenewer().isRunning());
+ assertFalse((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer()));
assertEquals(millis, in.readLong());
- assertFalse(dfs.dfs.getLeaseRenewer().isRunning());
+ assertFalse((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer()));
in.close();
- assertFalse(dfs.dfs.getLeaseRenewer().isRunning());
+ assertFalse((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer()));
dfs.close();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3d019c3/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLease.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLease.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLease.java
index 1cf7add..9b5a7c0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLease.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLease.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.client.impl.LeaseRenewer;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3d019c3/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRenewer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRenewer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRenewer.java
deleted file mode 100644
index f091db7..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRenewer.java
+++ /dev/null
@@ -1,207 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs;
-
-import static org.junit.Assert.assertSame;
-
-import java.io.IOException;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.test.GenericTestUtils;
-import org.apache.hadoop.util.Time;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import com.google.common.base.Supplier;
-
-public class TestLeaseRenewer {
- private final String FAKE_AUTHORITY="hdfs://nn1/";
- private final UserGroupInformation FAKE_UGI_A =
- UserGroupInformation.createUserForTesting(
- "myuser", new String[]{"group1"});
- private final UserGroupInformation FAKE_UGI_B =
- UserGroupInformation.createUserForTesting(
- "myuser", new String[]{"group1"});
-
- private DFSClient MOCK_DFSCLIENT;
- private LeaseRenewer renewer;
-
- /** Cause renewals often so test runs quickly. */
- private static final long FAST_GRACE_PERIOD = 100L;
-
- @Before
- public void setupMocksAndRenewer() throws IOException {
- MOCK_DFSCLIENT = createMockClient();
-
- renewer = LeaseRenewer.getInstance(
- FAKE_AUTHORITY, FAKE_UGI_A, MOCK_DFSCLIENT);
- renewer.setGraceSleepPeriod(FAST_GRACE_PERIOD);
-}
-
- private DFSClient createMockClient() {
- final DfsClientConf mockConf = Mockito.mock(DfsClientConf.class);
- Mockito.doReturn((int)FAST_GRACE_PERIOD).when(mockConf).getHdfsTimeout();
-
- DFSClient mock = Mockito.mock(DFSClient.class);
- Mockito.doReturn(true).when(mock).isClientRunning();
- Mockito.doReturn(mockConf).when(mock).getConf();
- Mockito.doReturn("myclient").when(mock).getClientName();
- return mock;
- }
-
- @Test
- public void testInstanceSharing() throws IOException {
- // Two lease renewers with the same UGI should return
- // the same instance
- LeaseRenewer lr = LeaseRenewer.getInstance(
- FAKE_AUTHORITY, FAKE_UGI_A, MOCK_DFSCLIENT);
- LeaseRenewer lr2 = LeaseRenewer.getInstance(
- FAKE_AUTHORITY, FAKE_UGI_A, MOCK_DFSCLIENT);
- Assert.assertSame(lr, lr2);
-
- // But a different UGI should return a different instance
- LeaseRenewer lr3 = LeaseRenewer.getInstance(
- FAKE_AUTHORITY, FAKE_UGI_B, MOCK_DFSCLIENT);
- Assert.assertNotSame(lr, lr3);
-
- // A different authority with same UGI should also be a different
- // instance.
- LeaseRenewer lr4 = LeaseRenewer.getInstance(
- "someOtherAuthority", FAKE_UGI_B, MOCK_DFSCLIENT);
- Assert.assertNotSame(lr, lr4);
- Assert.assertNotSame(lr3, lr4);
- }
-
- @Test
- public void testRenewal() throws Exception {
- // Keep track of how many times the lease gets renewed
- final AtomicInteger leaseRenewalCount = new AtomicInteger();
- Mockito.doAnswer(new Answer<Boolean>() {
- @Override
- public Boolean answer(InvocationOnMock invocation) throws Throwable {
- leaseRenewalCount.incrementAndGet();
- return true;
- }
- }).when(MOCK_DFSCLIENT).renewLease();
-
-
- // Set up a file so that we start renewing our lease.
- DFSOutputStream mockStream = Mockito.mock(DFSOutputStream.class);
- long fileId = 123L;
- renewer.put(fileId, mockStream, MOCK_DFSCLIENT);
-
- // Wait for lease to get renewed
- long failTime = Time.monotonicNow() + 5000;
- while (Time.monotonicNow() < failTime &&
- leaseRenewalCount.get() == 0) {
- Thread.sleep(50);
- }
- if (leaseRenewalCount.get() == 0) {
- Assert.fail("Did not renew lease at all!");
- }
-
- renewer.closeFile(fileId, MOCK_DFSCLIENT);
- }
-
- /**
- * Regression test for HDFS-2810. In this bug, the LeaseRenewer has handles
- * to several DFSClients with the same name, the first of which has no files
- * open. Previously, this was causing the lease to not get renewed.
- */
- @Test
- public void testManyDfsClientsWhereSomeNotOpen() throws Exception {
- // First DFSClient has no files open so doesn't renew leases.
- final DFSClient mockClient1 = createMockClient();
- Mockito.doReturn(false).when(mockClient1).renewLease();
- assertSame(renewer, LeaseRenewer.getInstance(
- FAKE_AUTHORITY, FAKE_UGI_A, mockClient1));
-
- // Set up a file so that we start renewing our lease.
- DFSOutputStream mockStream1 = Mockito.mock(DFSOutputStream.class);
- long fileId = 456L;
- renewer.put(fileId, mockStream1, mockClient1);
-
- // Second DFSClient does renew lease
- final DFSClient mockClient2 = createMockClient();
- Mockito.doReturn(true).when(mockClient2).renewLease();
- assertSame(renewer, LeaseRenewer.getInstance(
- FAKE_AUTHORITY, FAKE_UGI_A, mockClient2));
-
- // Set up a file so that we start renewing our lease.
- DFSOutputStream mockStream2 = Mockito.mock(DFSOutputStream.class);
- renewer.put(fileId, mockStream2, mockClient2);
-
-
- // Wait for lease to get renewed
- GenericTestUtils.waitFor(new Supplier<Boolean>() {
- @Override
- public Boolean get() {
- try {
- Mockito.verify(mockClient1, Mockito.atLeastOnce()).renewLease();
- Mockito.verify(mockClient2, Mockito.atLeastOnce()).renewLease();
- return true;
- } catch (AssertionError err) {
- LeaseRenewer.LOG.warn("Not yet satisfied", err);
- return false;
- } catch (IOException e) {
- // should not throw!
- throw new RuntimeException(e);
- }
- }
- }, 100, 10000);
-
- renewer.closeFile(fileId, mockClient1);
- renewer.closeFile(fileId, mockClient2);
- }
-
- @Test
- public void testThreadName() throws Exception {
- DFSOutputStream mockStream = Mockito.mock(DFSOutputStream.class);
- long fileId = 789L;
- Assert.assertFalse("Renewer not initially running",
- renewer.isRunning());
-
- // Pretend to open a file
- renewer.put(fileId, mockStream, MOCK_DFSCLIENT);
-
- Assert.assertTrue("Renewer should have started running",
- renewer.isRunning());
-
- // Check the thread name is reasonable
- String threadName = renewer.getDaemonName();
- Assert.assertEquals("LeaseRenewer:myuser@hdfs://nn1/", threadName);
-
- // Pretend to close the file
- renewer.closeFile(fileId, MOCK_DFSCLIENT);
- renewer.setEmptyTime(Time.monotonicNow());
-
- // Should stop the renewer running within a few seconds
- long failTime = Time.monotonicNow() + 5000;
- while (renewer.isRunning() && Time.monotonicNow() < failTime) {
- Thread.sleep(50);
- }
- Assert.assertFalse(renewer.isRunning());
- }
-
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3d019c3/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestLeaseRenewer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestLeaseRenewer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestLeaseRenewer.java
new file mode 100644
index 0000000..a4e00d9
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestLeaseRenewer.java
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.client.impl;
+
+import static org.junit.Assert.assertSame;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSOutputStream;
+import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.Time;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import com.google.common.base.Supplier;
+
+public class TestLeaseRenewer {
+ private final String FAKE_AUTHORITY="hdfs://nn1/";
+ private final UserGroupInformation FAKE_UGI_A =
+ UserGroupInformation.createUserForTesting(
+ "myuser", new String[]{"group1"});
+ private final UserGroupInformation FAKE_UGI_B =
+ UserGroupInformation.createUserForTesting(
+ "myuser", new String[]{"group1"});
+
+ private DFSClient MOCK_DFSCLIENT;
+ private LeaseRenewer renewer;
+
+ /** Cause renewals often so test runs quickly. */
+ private static final long FAST_GRACE_PERIOD = 100L;
+
+ @Before
+ public void setupMocksAndRenewer() throws IOException {
+ MOCK_DFSCLIENT = createMockClient();
+
+ renewer = LeaseRenewer.getInstance(
+ FAKE_AUTHORITY, FAKE_UGI_A, MOCK_DFSCLIENT);
+ renewer.setGraceSleepPeriod(FAST_GRACE_PERIOD);
+}
+
+ private DFSClient createMockClient() {
+ final DfsClientConf mockConf = Mockito.mock(DfsClientConf.class);
+ Mockito.doReturn((int)FAST_GRACE_PERIOD).when(mockConf).getHdfsTimeout();
+
+ DFSClient mock = Mockito.mock(DFSClient.class);
+ Mockito.doReturn(true).when(mock).isClientRunning();
+ Mockito.doReturn(mockConf).when(mock).getConf();
+ Mockito.doReturn("myclient").when(mock).getClientName();
+ return mock;
+ }
+
+ @Test
+ public void testInstanceSharing() throws IOException {
+ // Two lease renewers with the same UGI should return
+ // the same instance
+ LeaseRenewer lr = LeaseRenewer.getInstance(
+ FAKE_AUTHORITY, FAKE_UGI_A, MOCK_DFSCLIENT);
+ LeaseRenewer lr2 = LeaseRenewer.getInstance(
+ FAKE_AUTHORITY, FAKE_UGI_A, MOCK_DFSCLIENT);
+ Assert.assertSame(lr, lr2);
+
+ // But a different UGI should return a different instance
+ LeaseRenewer lr3 = LeaseRenewer.getInstance(
+ FAKE_AUTHORITY, FAKE_UGI_B, MOCK_DFSCLIENT);
+ Assert.assertNotSame(lr, lr3);
+
+ // A different authority with same UGI should also be a different
+ // instance.
+ LeaseRenewer lr4 = LeaseRenewer.getInstance(
+ "someOtherAuthority", FAKE_UGI_B, MOCK_DFSCLIENT);
+ Assert.assertNotSame(lr, lr4);
+ Assert.assertNotSame(lr3, lr4);
+ }
+
+ @Test
+ public void testRenewal() throws Exception {
+ // Keep track of how many times the lease gets renewed
+ final AtomicInteger leaseRenewalCount = new AtomicInteger();
+ Mockito.doAnswer(new Answer<Boolean>() {
+ @Override
+ public Boolean answer(InvocationOnMock invocation) throws Throwable {
+ leaseRenewalCount.incrementAndGet();
+ return true;
+ }
+ }).when(MOCK_DFSCLIENT).renewLease();
+
+
+ // Set up a file so that we start renewing our lease.
+ DFSOutputStream mockStream = Mockito.mock(DFSOutputStream.class);
+ long fileId = 123L;
+ renewer.put(fileId, mockStream, MOCK_DFSCLIENT);
+
+ // Wait for lease to get renewed
+ long failTime = Time.monotonicNow() + 5000;
+ while (Time.monotonicNow() < failTime &&
+ leaseRenewalCount.get() == 0) {
+ Thread.sleep(50);
+ }
+ if (leaseRenewalCount.get() == 0) {
+ Assert.fail("Did not renew lease at all!");
+ }
+
+ renewer.closeFile(fileId, MOCK_DFSCLIENT);
+ }
+
+ /**
+ * Regression test for HDFS-2810. In this bug, the LeaseRenewer has handles
+ * to several DFSClients with the same name, the first of which has no files
+ * open. Previously, this was causing the lease to not get renewed.
+ */
+ @Test
+ public void testManyDfsClientsWhereSomeNotOpen() throws Exception {
+ // First DFSClient has no files open so doesn't renew leases.
+ final DFSClient mockClient1 = createMockClient();
+ Mockito.doReturn(false).when(mockClient1).renewLease();
+ assertSame(renewer, LeaseRenewer.getInstance(
+ FAKE_AUTHORITY, FAKE_UGI_A, mockClient1));
+
+ // Set up a file so that we start renewing our lease.
+ DFSOutputStream mockStream1 = Mockito.mock(DFSOutputStream.class);
+ long fileId = 456L;
+ renewer.put(fileId, mockStream1, mockClient1);
+
+ // Second DFSClient does renew lease
+ final DFSClient mockClient2 = createMockClient();
+ Mockito.doReturn(true).when(mockClient2).renewLease();
+ assertSame(renewer, LeaseRenewer.getInstance(
+ FAKE_AUTHORITY, FAKE_UGI_A, mockClient2));
+
+ // Set up a file so that we start renewing our lease.
+ DFSOutputStream mockStream2 = Mockito.mock(DFSOutputStream.class);
+ renewer.put(fileId, mockStream2, mockClient2);
+
+
+ // Wait for lease to get renewed
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ try {
+ Mockito.verify(mockClient1, Mockito.atLeastOnce()).renewLease();
+ Mockito.verify(mockClient2, Mockito.atLeastOnce()).renewLease();
+ return true;
+ } catch (AssertionError err) {
+ LeaseRenewer.LOG.warn("Not yet satisfied", err);
+ return false;
+ } catch (IOException e) {
+ // should not throw!
+ throw new RuntimeException(e);
+ }
+ }
+ }, 100, 10000);
+
+ renewer.closeFile(fileId, mockClient1);
+ renewer.closeFile(fileId, mockClient2);
+ }
+
+ @Test
+ public void testThreadName() throws Exception {
+ DFSOutputStream mockStream = Mockito.mock(DFSOutputStream.class);
+ long fileId = 789L;
+ Assert.assertFalse("Renewer not initially running",
+ renewer.isRunning());
+
+ // Pretend to open a file
+ renewer.put(fileId, mockStream, MOCK_DFSCLIENT);
+
+ Assert.assertTrue("Renewer should have started running",
+ renewer.isRunning());
+
+ // Check the thread name is reasonable
+ String threadName = renewer.getDaemonName();
+ Assert.assertEquals("LeaseRenewer:myuser@hdfs://nn1/", threadName);
+
+ // Pretend to close the file
+ renewer.closeFile(fileId, MOCK_DFSCLIENT);
+ renewer.setEmptyTime(Time.monotonicNow());
+
+ // Should stop the renewer running within a few seconds
+ long failTime = Time.monotonicNow() + 5000;
+ while (renewer.isRunning() && Time.monotonicNow() < failTime) {
+ Thread.sleep(50);
+ }
+ Assert.assertFalse(renewer.isRunning());
+ }
+
+}