You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by in...@apache.org on 2020/01/22 01:45:45 UTC

[hadoop] branch trunk updated: Revert "YARN-9768. RM Renew Delegation token thread should timeout and retry. Contributed by Manikandan R."

This is an automated email from the ASF dual-hosted git repository.

inigoiri pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b4870bc  Revert "YARN-9768. RM Renew Delegation token thread should timeout and retry. Contributed by Manikandan R."
b4870bc is described below

commit b4870bce3a8336dbd638d26b8662037c4d4cdae9
Author: Inigo Goiri <in...@apache.org>
AuthorDate: Tue Jan 21 17:45:17 2020 -0800

    Revert "YARN-9768. RM Renew Delegation token thread should timeout and retry. Contributed by Manikandan R."
    
    This reverts commit 0696828a090bc06446f75b29c967697f1d6d845b.
---
 .../apache/hadoop/yarn/conf/YarnConfiguration.java |  14 --
 .../src/main/resources/yarn-default.xml            |  24 ---
 .../security/DelegationTokenRenewer.java           | 144 +----------------
 .../security/TestDelegationTokenRenewer.java       | 177 +--------------------
 4 files changed, 4 insertions(+), 355 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index be7cc89..06c3fa4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -26,7 +26,6 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -730,19 +729,6 @@ public class YarnConfiguration extends Configuration {
   public static final int DEFAULT_RM_DELEGATION_TOKEN_MAX_CONF_SIZE_BYTES =
       12800;
 
-  public static final String RM_DT_RENEWER_THREAD_TIMEOUT =
-      RM_PREFIX + "delegation-token-renewer.thread-timeout";
-  public static final long DEFAULT_RM_DT_RENEWER_THREAD_TIMEOUT =
-      TimeUnit.SECONDS.toMillis(60); // 60 Seconds
-  public static final String RM_DT_RENEWER_THREAD_RETRY_INTERVAL =
-      RM_PREFIX + "delegation-token-renewer.thread-retry-interval";
-  public static final long DEFAULT_RM_DT_RENEWER_THREAD_RETRY_INTERVAL =
-      TimeUnit.SECONDS.toMillis(60); // 60 Seconds
-  public static final String RM_DT_RENEWER_THREAD_RETRY_MAX_ATTEMPTS =
-      RM_PREFIX + "delegation-token-renewer.thread-retry-max-attempts";
-  public static final int DEFAULT_RM_DT_RENEWER_THREAD_RETRY_MAX_ATTEMPTS =
-      10;
-
   public static final String RECOVERY_ENABLED = RM_PREFIX + "recovery.enabled";
   public static final boolean DEFAULT_RM_RECOVERY_ENABLED = false;
 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 5277be4..c96a7e4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -959,30 +959,6 @@
 
   <property>
     <description>
-    RM DelegationTokenRenewer thread timeout
-    </description>
-    <name>yarn.resourcemanager.delegation-token-renewer.thread-timeout</name>
-    <value>60s</value>
-  </property>
-
-  <property>
-    <description>
-    Default maximum number of retries for each RM DelegationTokenRenewer thread
-    </description>
-    <name>yarn.resourcemanager.delegation-token-renewer.thread-retry-max-attempts</name>
-    <value>10</value>
-  </property>
-
-  <property>
-    <description>
-    Time interval between each RM DelegationTokenRenewer thread retry attempt
-    </description>
-    <name>yarn.resourcemanager.delegation-token-renewer.thread-retry-interval</name>
-    <value>60s</value>
-  </property>
-
-  <property>
-    <description>
     Thread pool size for RMApplicationHistoryWriter.
     </description>
     <name>yarn.resourcemanager.history-writer.multi-threaded-dispatcher.pool-size</name>
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java
index fd8935d..d3ed503 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java
@@ -26,7 +26,6 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Date;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -37,12 +36,10 @@ import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -118,12 +115,6 @@ public class DelegationTokenRenewer extends AbstractService {
   private boolean tokenKeepAliveEnabled;
   private boolean hasProxyUserPrivileges;
   private long credentialsValidTimeRemaining;
-  private long tokenRenewerThreadTimeout;
-  private long tokenRenewerThreadRetryInterval;
-  private int tokenRenewerThreadRetryMaxAttempts;
-  private final Map<DelegationTokenRenewerEvent, Future<?>> futures =
-      new HashMap<>();
-  private boolean delegationTokenRenewerPoolTrackerFlag = true;
 
   // this config is supposedly not used by end-users.
   public static final String RM_SYSTEM_CREDENTIALS_VALID_TIME_REMAINING =
@@ -149,17 +140,6 @@ public class DelegationTokenRenewer extends AbstractService {
     this.credentialsValidTimeRemaining =
         conf.getLong(RM_SYSTEM_CREDENTIALS_VALID_TIME_REMAINING,
           DEFAULT_RM_SYSTEM_CREDENTIALS_VALID_TIME_REMAINING);
-    tokenRenewerThreadTimeout =
-        conf.getTimeDuration(YarnConfiguration.RM_DT_RENEWER_THREAD_TIMEOUT,
-            YarnConfiguration.DEFAULT_RM_DT_RENEWER_THREAD_TIMEOUT,
-            TimeUnit.MILLISECONDS);
-    tokenRenewerThreadRetryInterval = conf.getTimeDuration(
-        YarnConfiguration.RM_DT_RENEWER_THREAD_RETRY_INTERVAL,
-        YarnConfiguration.DEFAULT_RM_DT_RENEWER_THREAD_RETRY_INTERVAL,
-        TimeUnit.MILLISECONDS);
-    tokenRenewerThreadRetryMaxAttempts =
-        conf.getInt(YarnConfiguration.RM_DT_RENEWER_THREAD_RETRY_MAX_ATTEMPTS,
-            YarnConfiguration.DEFAULT_RM_DT_RENEWER_THREAD_RETRY_MAX_ATTEMPTS);
     setLocalSecretManagerAndServiceAddr();
     renewerService = createNewThreadPoolService(conf);
     pendingEventQueue = new LinkedBlockingQueue<DelegationTokenRenewerEvent>();
@@ -204,11 +184,6 @@ public class DelegationTokenRenewer extends AbstractService {
     serviceStateLock.writeLock().lock();
     isServiceStarted = true;
     serviceStateLock.writeLock().unlock();
-
-    if (delegationTokenRenewerPoolTrackerFlag) {
-      renewerService.submit(new DelegationTokenRenewerPoolTracker());
-    }
-
     while(!pendingEventQueue.isEmpty()) {
       processDelegationTokenRenewerEvent(pendingEventQueue.take());
     }
@@ -220,9 +195,7 @@ public class DelegationTokenRenewer extends AbstractService {
     serviceStateLock.readLock().lock();
     try {
       if (isServiceStarted) {
-        Future<?> future =
-            renewerService.submit(new DelegationTokenRenewerRunnable(evt));
-        futures.put(evt, future);
+        renewerService.execute(new DelegationTokenRenewerRunnable(evt));
       } else {
         pendingEventQueue.add(evt);
       }
@@ -503,8 +476,7 @@ public class DelegationTokenRenewer extends AbstractService {
               for (Iterator<Map.Entry<String, String>> itor =
                    tokenConf.iterator(); itor.hasNext(); ) {
                 Map.Entry<String, String> entry = itor.next();
-                LOG.debug("Token conf key is {} and value is {}",
-                    entry.getKey(), entry.getValue());
+                LOG.info(entry.getKey() + " ===> " + entry.getValue());
               }
             }
           }  else {
@@ -922,100 +894,7 @@ public class DelegationTokenRenewer extends AbstractService {
   public void setRMContext(RMContext rmContext) {
     this.rmContext = rmContext;
   }
-
-  @VisibleForTesting
-  public void setDelegationTokenRenewerPoolTracker(boolean flag) {
-    delegationTokenRenewerPoolTrackerFlag = flag;
-  }
-
-  /**
-   * Create a timer task to retry the token renewer event which would be
-   * scheduled at defined intervals based on the configuration.
-   *
-   * @param evt
-   * @return Timer Task
-   */
-  private TimerTask getTimerTask(AbstractDelegationTokenRenewerAppEvent evt) {
-    return new TimerTask() {
-      @Override
-      public void run() {
-        LOG.info("Retrying token renewer thread for appid = {} and "
-            + "attempt is {}", evt.getApplicationId(),
-            evt.getAttempt());
-        evt.incrAttempt();
-
-        Collection<Token<?>> tokens =
-            evt.getCredentials().getAllTokens();
-        for (Token<?> token : tokens) {
-          DelegationTokenToRenew dttr = allTokens.get(token);
-          if (dttr != null) {
-            removeFailedDelegationToken(dttr);
-          }
-        }
-
-        DelegationTokenRenewerAppRecoverEvent event =
-            new DelegationTokenRenewerAppRecoverEvent(
-                evt.getApplicationId(), evt.getCredentials(),
-                evt.shouldCancelAtEnd(), evt.getUser(), evt.getTokenConf());
-        event.setAttempt(evt.getAttempt());
-        processDelegationTokenRenewerEvent(event);
-      }
-    };
-  }
-
-  /**
-   * Runnable class to set timeout for futures of all threads running in
-   * renewerService thread pool executor asynchronously.
-   *
-   * In case of timeout exception, retries would be attempted with defined
-   * intervals till no. of retry attempt reaches max attempt.
-   */
-  private final class DelegationTokenRenewerPoolTracker
-      implements Runnable {
-
-    DelegationTokenRenewerPoolTracker() {
-    }
-
-    /**
-     * Keep traversing <Future> of renewer pool threads and wait for specific
-     * timeout. In case of timeout exception, retry the event till no. of
-     * attempts reaches max attempts with specific interval.
-     */
-    @Override
-    public void run() {
-      while (true) {
-        for (Map.Entry<DelegationTokenRenewerEvent, Future<?>> entry : futures
-            .entrySet()) {
-          DelegationTokenRenewerEvent evt = entry.getKey();
-          Future<?> future = entry.getValue();
-          try {
-            future.get(tokenRenewerThreadTimeout, TimeUnit.MILLISECONDS);
-          } catch (TimeoutException e) {
-
-            // Cancel thread and retry the same event in case of timeout
-            if (future != null && !future.isDone() && !future.isCancelled()) {
-              future.cancel(true);
-              futures.remove(evt);
-              if (evt.getAttempt() < tokenRenewerThreadRetryMaxAttempts) {
-                renewalTimer.schedule(
-                    getTimerTask((AbstractDelegationTokenRenewerAppEvent) evt),
-                    tokenRenewerThreadRetryInterval);
-              } else {
-                LOG.info(
-                    "Exhausted max retry attempts {} in token renewer "
-                        + "thread for {}",
-                    tokenRenewerThreadRetryMaxAttempts, evt.getApplicationId());
-              }
-            }
-          } catch (Exception e) {
-            LOG.info("Problem in submitting renew tasks in token renewer "
-                + "thread.", e);
-          }
-        }
-      }
-    }
-  }
-
+  
   /*
    * This will run as a separate thread and will process individual events. It
    * is done in this way to make sure that the token renewal as a part of
@@ -1137,10 +1016,6 @@ public class DelegationTokenRenewer extends AbstractService {
     public String getUser() {
       return user;
     }
-
-    private Configuration getTokenConf() {
-      return tokenConf;
-    }
   }
   
   enum DelegationTokenRenewerEventType {
@@ -1153,7 +1028,6 @@ public class DelegationTokenRenewer extends AbstractService {
       AbstractEvent<DelegationTokenRenewerEventType> {
 
     private ApplicationId appId;
-    private int attempt = 1;
 
     public DelegationTokenRenewerEvent(ApplicationId appId,
         DelegationTokenRenewerEventType type) {
@@ -1164,18 +1038,6 @@ public class DelegationTokenRenewer extends AbstractService {
     public ApplicationId getApplicationId() {
       return appId;
     }
-
-    public void incrAttempt() {
-      attempt++;
-    }
-
-    public int getAttempt() {
-      return attempt;
-    }
-
-    public void setAttempt(int attempt) {
-      this.attempt = attempt;
-    }
   }
 
   // only for testing
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java
index 0205460..5f6d440 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java
@@ -21,7 +21,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.security;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doAnswer;
@@ -43,7 +42,6 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.BrokenBarrierException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CyclicBarrier;
-import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -95,7 +93,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmitter;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.TestRMRestart.TestSecurityMockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
@@ -233,7 +230,6 @@ public class TestDelegationTokenRenewer {
     InetSocketAddress sockAddr =
         InetSocketAddress.createUnresolved("localhost", 1234);
     when(mockClientRMService.getBindAddress()).thenReturn(sockAddr);
-    delegationTokenRenewer.setDelegationTokenRenewerPoolTracker(false);
     delegationTokenRenewer.setRMContext(mockContext);
     delegationTokenRenewer.init(conf);
     delegationTokenRenewer.start();
@@ -636,7 +632,6 @@ public class TestDelegationTokenRenewer {
     InetSocketAddress sockAddr =
         InetSocketAddress.createUnresolved("localhost", 1234);
     when(mockClientRMService.getBindAddress()).thenReturn(sockAddr);
-    localDtr.setDelegationTokenRenewerPoolTracker(false);
     localDtr.setRMContext(mockContext);
     localDtr.init(lconf);
     localDtr.start();
@@ -717,7 +712,6 @@ public class TestDelegationTokenRenewer {
     InetSocketAddress sockAddr =
         InetSocketAddress.createUnresolved("localhost", 1234);
     when(mockClientRMService.getBindAddress()).thenReturn(sockAddr);
-    localDtr.setDelegationTokenRenewerPoolTracker(false);
     localDtr.setRMContext(mockContext);
     localDtr.init(lconf);
     localDtr.start();
@@ -1618,173 +1612,4 @@ public class TestDelegationTokenRenewer {
     // Ensure incrTokenSequenceNo has been called for token renewal as well.
     Mockito.verify(mockContext, Mockito.times(2)).incrTokenSequenceNo();
   }
-
-  /**
-   * Test case to ensure token renewer threads are timed out by inducing
-   * artificial delay.
-   *
-   * Because of time out, retries would be attempted till it reaches max retry
-   * attempt and finally asserted using used threads count.
-   *
-   * @throws Exception
-   */
-  @Test(timeout = 30000)
-  public void testTokenThreadTimeout() throws Exception {
-    Configuration yarnConf = new YarnConfiguration();
-    yarnConf.setBoolean(YarnConfiguration.RM_PROXY_USER_PRIVILEGES_ENABLED,
-        true);
-    yarnConf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
-        "kerberos");
-    yarnConf.setClass(YarnConfiguration.RM_STORE, MemoryRMStateStore.class,
-        RMStateStore.class);
-    yarnConf.setTimeDuration(YarnConfiguration.RM_DT_RENEWER_THREAD_TIMEOUT, 5,
-        TimeUnit.SECONDS);
-    yarnConf.setTimeDuration(
-        YarnConfiguration.RM_DT_RENEWER_THREAD_RETRY_INTERVAL, 5,
-        TimeUnit.SECONDS);
-    yarnConf.setInt(YarnConfiguration.RM_DT_RENEWER_THREAD_RETRY_MAX_ATTEMPTS,
-        3);
-    UserGroupInformation.setConfiguration(yarnConf);
-
-    Text userText = new Text("user1");
-    DelegationTokenIdentifier dtId = new DelegationTokenIdentifier(userText,
-        new Text("renewer1"), userText);
-    final Token<DelegationTokenIdentifier> originalToken =
-        new Token<>(dtId.getBytes(), "password1".getBytes(), dtId.getKind(),
-            new Text("service1"));
-
-    Credentials credentials = new Credentials();
-    credentials.addToken(userText, originalToken);
-
-    AtomicBoolean renewDelay = new AtomicBoolean(false);
-
-    // -1 is because of thread allocated to pool tracker runnable tasks
-    AtomicInteger threadCounter = new AtomicInteger(-1);
-    renewDelay.set(true);
-    DelegationTokenRenewer renewer = createNewDelegationTokenRenewerForTimeout(
-        yarnConf, threadCounter, renewDelay);
-
-    MockRM rm = new TestSecurityMockRM(yarnConf) {
-      @Override
-      protected DelegationTokenRenewer createDelegationTokenRenewer() {
-        return renewer;
-      }
-    };
-
-    rm.start();
-    rm.submitApp(200, "name", "user",
-        new HashMap<ApplicationAccessType, String>(), false, "default", 1,
-        credentials);
-
-    int attempts = yarnConf.getInt(
-        YarnConfiguration.RM_DT_RENEWER_THREAD_RETRY_MAX_ATTEMPTS,
-        YarnConfiguration.DEFAULT_RM_DT_RENEWER_THREAD_RETRY_MAX_ATTEMPTS);
-
-    GenericTestUtils.waitFor(() -> threadCounter.get() >= attempts, 2000,
-        30000);
-
-    // Ensure no. of threads has been used in renewer service thread pool is
-    // higher than the configured max retry attempts
-    assertTrue(threadCounter.get() >= attempts);
-    rm.close();
-  }
-
-  /**
-   * Test case to ensure token renewer threads are running as usual and finally
-   * asserted only 1 thread has been used.
-   *
-   * @throws Exception
-   */
-  @Test(timeout = 30000)
-  public void testTokenThreadTimeoutWithoutDelay() throws Exception {
-    Configuration yarnConf = new YarnConfiguration();
-    yarnConf.setBoolean(YarnConfiguration.RM_PROXY_USER_PRIVILEGES_ENABLED,
-        true);
-    yarnConf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
-        "kerberos");
-    yarnConf.set(YarnConfiguration.RM_STORE,
-        MemoryRMStateStore.class.getName());
-    yarnConf.setTimeDuration(YarnConfiguration.RM_DT_RENEWER_THREAD_TIMEOUT, 5,
-        TimeUnit.SECONDS);
-    yarnConf.setTimeDuration(
-        YarnConfiguration.RM_DT_RENEWER_THREAD_RETRY_INTERVAL, 5,
-        TimeUnit.SECONDS);
-    yarnConf.setInt(YarnConfiguration.RM_DT_RENEWER_THREAD_RETRY_MAX_ATTEMPTS,
-        3);
-    UserGroupInformation.setConfiguration(yarnConf);
-
-    Text userText = new Text("user1");
-    DelegationTokenIdentifier dtId = new DelegationTokenIdentifier(userText,
-        new Text("renewer1"), userText);
-    final Token<DelegationTokenIdentifier> originalToken =
-        new Token<>(dtId.getBytes(), "password1".getBytes(), dtId.getKind(),
-            new Text("service1"));
-
-    Credentials credentials = new Credentials();
-    credentials.addToken(userText, originalToken);
-
-    AtomicBoolean renewDelay = new AtomicBoolean(false);
-
-    // -1 is because of thread allocated to pool tracker runnable tasks
-    AtomicInteger threadCounter = new AtomicInteger(-1);
-    DelegationTokenRenewer renwer = createNewDelegationTokenRenewerForTimeout(
-        yarnConf, threadCounter, renewDelay);
-
-    MockRM rm = new TestSecurityMockRM(yarnConf) {
-      @Override
-      protected DelegationTokenRenewer createDelegationTokenRenewer() {
-        return renwer;
-      }
-    };
-
-    rm.start();
-    rm.submitApp(200, "name", "user",
-        new HashMap<ApplicationAccessType, String>(), false, "default", 1,
-        credentials);
-
-    GenericTestUtils.waitFor(() -> threadCounter.get() == 1, 2000, 40000);
-
-    // Ensure only one thread has been used in renewer service thread pool.
-    assertEquals(threadCounter.get(), 1);
-    rm.close();
-  }
-
-  private DelegationTokenRenewer createNewDelegationTokenRenewerForTimeout(
-      Configuration config, final AtomicInteger renewerCounter,
-      final AtomicBoolean renewDelay) {
-    DelegationTokenRenewer renew = new DelegationTokenRenewer() {
-      @Override
-      protected ThreadPoolExecutor createNewThreadPoolService(
-          Configuration configuration) {
-        ThreadPoolExecutor pool = new ThreadPoolExecutor(5, 5, 3L,
-            TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()) {
-          @Override
-          public Future<?> submit(Runnable r) {
-            renewerCounter.incrementAndGet();
-            return super.submit(r);
-          }
-        };
-        return pool;
-      }
-
-      @Override
-      protected void renewToken(final DelegationTokenToRenew dttr)
-          throws IOException {
-        try {
-          if (renewDelay.get()) {
-            // Delay for 4 times than the configured timeout
-            Thread.sleep(config.getTimeDuration(
-                YarnConfiguration.RM_DT_RENEWER_THREAD_TIMEOUT,
-                YarnConfiguration.DEFAULT_RM_DT_RENEWER_THREAD_TIMEOUT,
-                TimeUnit.MILLISECONDS) * 4);
-          }
-          super.renewToken(dttr);
-        } catch (InterruptedException e) {
-          LOG.info("Sleep Interrupted", e);
-        }
-      }
-    };
-    renew.setDelegationTokenRenewerPoolTracker(true);
-    return renew;
-  }
-}
\ No newline at end of file
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org