You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by to...@apache.org on 2012/11/21 13:29:38 UTC
svn commit: r1412077 - in
/hadoop/common/trunk/hadoop-common-project/hadoop-common: CHANGES.txt
src/main/java/org/apache/hadoop/fs/DelegationTokenRenewer.java
src/test/java/org/apache/hadoop/fs/TestDelegationTokenRenewer.java
Author: tomwhite
Date: Wed Nov 21 12:29:37 2012
New Revision: 1412077
URL: http://svn.apache.org/viewvc?rev=1412077&view=rev
Log:
HADOOP-9049. DelegationTokenRenewer needs to be Singleton and FileSystems should register/deregister to/from. Contributed by Karthik Kambatla.
Added:
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestDelegationTokenRenewer.java (with props)
Modified:
hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/DelegationTokenRenewer.java
Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt?rev=1412077&r1=1412076&r2=1412077&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt Wed Nov 21 12:29:37 2012
@@ -444,6 +444,9 @@ Release 2.0.3-alpha - Unreleased
HADOOP-6607. Add different variants of non caching HTTP headers. (tucu)
+ HADOOP-9049. DelegationTokenRenewer needs to be Singleton and FileSystems
+ should register/deregister to/from. (Karthik Kambatla via tomwhite)
+
Release 2.0.2-alpha - 2012-09-07
INCOMPATIBLE CHANGES
Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/DelegationTokenRenewer.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/DelegationTokenRenewer.java?rev=1412077&r1=1412076&r2=1412077&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/DelegationTokenRenewer.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/DelegationTokenRenewer.java Wed Nov 21 12:29:37 2012
@@ -33,7 +33,7 @@ import org.apache.hadoop.util.Time;
* A daemon thread that waits for the next file system to renew.
*/
@InterfaceAudience.Private
-public class DelegationTokenRenewer<T extends FileSystem & DelegationTokenRenewer.Renewable>
+public class DelegationTokenRenewer
extends Thread {
/** The renewable interface used by the renewer. */
public interface Renewable {
@@ -93,7 +93,7 @@ public class DelegationTokenRenewer<T ex
* @param newTime the new time
*/
private void updateRenewalTime() {
- renewalTime = RENEW_CYCLE + Time.now();
+ renewalTime = renewCycle + Time.now();
}
/**
@@ -134,34 +134,69 @@ public class DelegationTokenRenewer<T ex
}
/** Wait for 95% of a day between renewals */
- private static final int RENEW_CYCLE = 24 * 60 * 60 * 950;
+ private static final int RENEW_CYCLE = 24 * 60 * 60 * 950;
- private DelayQueue<RenewAction<T>> queue = new DelayQueue<RenewAction<T>>();
+ @InterfaceAudience.Private
+ protected static int renewCycle = RENEW_CYCLE;
- public DelegationTokenRenewer(final Class<T> clazz) {
+ /** Queue of pending RenewActions, consumed by {@link #run()}. */
+ private volatile DelayQueue<RenewAction<?>> queue = new DelayQueue<RenewAction<?>>();
+
+ /**
+ * The singleton instance, created lazily by {@link #getInstance()}. The
+ * renewer thread itself is started lazily in {@link #addRenewAction(FileSystem)}.
+ */
+ private static DelegationTokenRenewer INSTANCE = null;
+
+ private DelegationTokenRenewer(final Class<? extends FileSystem> clazz) {
super(clazz.getSimpleName() + "-" + DelegationTokenRenewer.class.getSimpleName());
setDaemon(true);
}
+ public static synchronized DelegationTokenRenewer getInstance() {
+ if (INSTANCE == null) {
+ INSTANCE = new DelegationTokenRenewer(FileSystem.class);
+ }
+ return INSTANCE;
+ }
+
/** Add a renew action to the queue. */
- public void addRenewAction(final T fs) {
+ public synchronized <T extends FileSystem & Renewable> void addRenewAction(final T fs) {
queue.add(new RenewAction<T>(fs));
+ if (!isAlive()) {
+ start();
+ }
}
+ /** Remove the renew action associated with the given file system from the queue, if present. */
+ public synchronized <T extends FileSystem & Renewable> void removeRenewAction(
+ final T fs) {
+ for (RenewAction<?> action : queue) {
+ if (action.weakFs.get() == fs) {
+ queue.remove(action);
+ return;
+ }
+ }
+ }
+
+ @SuppressWarnings("static-access")
@Override
public void run() {
for(;;) {
- RenewAction<T> action = null;
+ RenewAction<?> action = null;
try {
- action = queue.take();
- if (action.renew()) {
- action.updateRenewalTime();
- queue.add(action);
+ synchronized (this) {
+ action = queue.take();
+ if (action.renew()) {
+ action.updateRenewalTime();
+ queue.add(action);
+ }
}
} catch (InterruptedException ie) {
return;
} catch (Exception ie) {
- T.LOG.warn("Failed to renew token, action=" + action, ie);
+ action.weakFs.get().LOG.warn("Failed to renew token, action=" + action,
+ ie);
}
}
}
Added: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestDelegationTokenRenewer.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestDelegationTokenRenewer.java?rev=1412077&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestDelegationTokenRenewer.java (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestDelegationTokenRenewer.java Wed Nov 21 12:29:37 2012
@@ -0,0 +1,159 @@
+package org.apache.hadoop.fs;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.Progressable;
+
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestDelegationTokenRenewer {
+ private static final int RENEW_CYCLE = 1000;
+ private static final int MAX_RENEWALS = 100;
+
+ @SuppressWarnings("rawtypes")
+ static class TestToken extends Token {
+ public volatile int renewCount = 0;
+
+ @Override
+ public long renew(Configuration conf) {
+ if (renewCount == MAX_RENEWALS) {
+ Thread.currentThread().interrupt();
+ } else {
+ renewCount++;
+ }
+ return renewCount;
+ }
+ }
+
+ static class TestFileSystem extends FileSystem implements
+ DelegationTokenRenewer.Renewable {
+ private Configuration mockConf = mock(Configuration.class);;
+ private TestToken testToken = new TestToken();
+
+ @Override
+ public Configuration getConf() {
+ return mockConf;
+ }
+
+ @Override
+ public Token<?> getRenewToken() {
+ return testToken;
+ }
+
+ @Override
+ public URI getUri() {
+ return null;
+ }
+
+ @Override
+ public FSDataInputStream open(Path f, int bufferSize) throws IOException {
+ return null;
+ }
+
+ @Override
+ public FSDataOutputStream create(Path f, FsPermission permission,
+ boolean overwrite, int bufferSize, short replication, long blockSize,
+ Progressable progress) throws IOException {
+ return null;
+ }
+
+ @Override
+ public FSDataOutputStream append(Path f, int bufferSize,
+ Progressable progress) throws IOException {
+ return null;
+ }
+
+ @Override
+ public boolean rename(Path src, Path dst) throws IOException {
+ return false;
+ }
+
+ @Override
+ public boolean delete(Path f, boolean recursive) throws IOException {
+ return false;
+ }
+
+ @Override
+ public FileStatus[] listStatus(Path f) throws FileNotFoundException,
+ IOException {
+ return null;
+ }
+
+ @Override
+ public void setWorkingDirectory(Path new_dir) {
+ }
+
+ @Override
+ public Path getWorkingDirectory() {
+ return null;
+ }
+
+ @Override
+ public boolean mkdirs(Path f, FsPermission permission) throws IOException {
+ return false;
+ }
+
+ @Override
+ public FileStatus getFileStatus(Path f) throws IOException {
+ return null;
+ }
+
+ @Override
+ public <T extends TokenIdentifier> void setDelegationToken(Token<T> token) {
+ return;
+ }
+ }
+
+ private DelegationTokenRenewer renewer;
+
+ @Before
+ public void setup() {
+ DelegationTokenRenewer.renewCycle = RENEW_CYCLE;
+ renewer = DelegationTokenRenewer.getInstance();
+ }
+
+ @Test
+ public void testAddRenewAction() throws IOException, InterruptedException {
+ TestFileSystem tfs = new TestFileSystem();
+ renewer.addRenewAction(tfs);
+
+ for (int i = 0; i < 10; i++) {
+ Thread.sleep(RENEW_CYCLE);
+ if (tfs.testToken.renewCount > 0) {
+ return;
+ }
+ }
+
+ assertTrue("Token not renewed even after 10 seconds",
+ (tfs.testToken.renewCount > 0));
+ }
+
+ @Test
+ public void testRemoveRenewAction() throws IOException, InterruptedException {
+ TestFileSystem tfs = new TestFileSystem();
+ renewer.addRenewAction(tfs);
+
+ for (int i = 0; i < 10; i++) {
+ Thread.sleep(RENEW_CYCLE);
+ if (tfs.testToken.renewCount > 0) {
+ renewer.removeRenewAction(tfs);
+ break;
+ }
+ }
+
+ assertTrue("Token not renewed even once",
+ (tfs.testToken.renewCount > 0));
+ assertTrue("Token not removed",
+ (tfs.testToken.renewCount < MAX_RENEWALS));
+ }
+}
Propchange: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestDelegationTokenRenewer.java
------------------------------------------------------------------------------
svn:eol-style = native