You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by in...@apache.org on 2019/02/20 22:22:04 UTC

[hadoop] 38/41: HDFS-13358. RBF: Support for Delegation Token (RPC). Contributed by CR Hota.

This is an automated email from the ASF dual-hosted git repository.

inigoiri pushed a commit to branch HDFS-13891
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit 5f5ba94c80270c97e762da2cecf9e150cf7e4527
Author: Brahma Reddy Battula <br...@apache.org>
AuthorDate: Thu Feb 14 08:16:45 2019 +0530

    HDFS-13358. RBF: Support for Delegation Token (RPC). Contributed by CR Hota.
---
 .../server/federation/router/RBFConfigKeys.java    |   9 +
 .../federation/router/RouterClientProtocol.java    |  16 +-
 .../server/federation/router/RouterRpcServer.java  |  21 +-
 .../router/security/RouterSecurityManager.java     | 239 +++++++++++++++++++++
 .../federation/router/security/package-info.java   |  28 +++
 .../token/ZKDelegationTokenSecretManagerImpl.java  |  56 +++++
 .../router/security/token/package-info.java        |  29 +++
 .../src/main/resources/hdfs-rbf-default.xml        |  11 +-
 .../fs/contract/router/SecurityConfUtil.java       |   4 +
 .../TestRouterHDFSContractDelegationToken.java     | 101 +++++++++
 .../security/MockDelegationTokenSecretManager.java |  52 +++++
 .../security/TestRouterSecurityManager.java        |  93 ++++++++
 12 files changed, 652 insertions(+), 7 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java
index 5e907c8..657b6cf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java
@@ -28,6 +28,8 @@ import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver;
 import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
 import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreSerializerPBImpl;
 import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreZooKeeperImpl;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
+import org.apache.hadoop.hdfs.server.federation.router.security.token.ZKDelegationTokenSecretManagerImpl;
 
 import java.util.concurrent.TimeUnit;
 
@@ -294,4 +296,11 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic {
 
   public static final String DFS_ROUTER_KERBEROS_INTERNAL_SPNEGO_PRINCIPAL_KEY =
       FEDERATION_ROUTER_PREFIX + "kerberos.internal.spnego.principal";
+
+  // HDFS Router secret manager for delegation token
+  public static final String DFS_ROUTER_DELEGATION_TOKEN_DRIVER_CLASS =
+      FEDERATION_ROUTER_PREFIX + "secret.manager.class";
+  public static final Class<? extends AbstractDelegationTokenSecretManager>
+      DFS_ROUTER_DELEGATION_TOKEN_DRIVER_CLASS_DEFAULT =
+      ZKDelegationTokenSecretManagerImpl.class;
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java
index f20b4b6..5383a7d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java
@@ -77,6 +77,7 @@ import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo
 import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
 import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver;
 import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
+import org.apache.hadoop.hdfs.server.federation.router.security.RouterSecurityManager;
 import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
@@ -124,6 +125,8 @@ public class RouterClientProtocol implements ClientProtocol {
   private final ErasureCoding erasureCoding;
   /** StoragePolicy calls. **/
   private final RouterStoragePolicy storagePolicy;
+  /** Router security manager to handle token operations. */
+  private RouterSecurityManager securityManager = null;
 
   RouterClientProtocol(Configuration conf, RouterRpcServer rpcServer) {
     this.rpcServer = rpcServer;
@@ -142,13 +145,14 @@ public class RouterClientProtocol implements ClientProtocol {
         DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT);
     this.erasureCoding = new ErasureCoding(rpcServer);
     this.storagePolicy = new RouterStoragePolicy(rpcServer);
+    this.securityManager = rpcServer.getRouterSecurityManager();
   }
 
   @Override
   public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer)
       throws IOException {
-    rpcServer.checkOperation(NameNode.OperationCategory.WRITE, false);
-    return null;
+    rpcServer.checkOperation(NameNode.OperationCategory.WRITE, true);
+    return this.securityManager.getDelegationToken(renewer);
   }
 
   /**
@@ -167,14 +171,16 @@ public class RouterClientProtocol implements ClientProtocol {
   @Override
   public long renewDelegationToken(Token<DelegationTokenIdentifier> token)
       throws IOException {
-    rpcServer.checkOperation(NameNode.OperationCategory.WRITE, false);
-    return 0;
+    rpcServer.checkOperation(NameNode.OperationCategory.WRITE, true);
+    return this.securityManager.renewDelegationToken(token);
   }
 
   @Override
   public void cancelDelegationToken(Token<DelegationTokenIdentifier> token)
       throws IOException {
-    rpcServer.checkOperation(NameNode.OperationCategory.WRITE, false);
+    rpcServer.checkOperation(NameNode.OperationCategory.WRITE, true);
+    this.securityManager.cancelDelegationToken(token);
+    return;
   }
 
   @Override
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
index be6a9b0..a312d4b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
@@ -114,6 +114,7 @@ import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver;
 import org.apache.hadoop.hdfs.server.federation.resolver.PathLocation;
 import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
 import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
+import org.apache.hadoop.hdfs.server.federation.router.security.RouterSecurityManager;
 import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature;
 import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
 import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
@@ -197,6 +198,8 @@ public class RouterRpcServer extends AbstractService
   private final RouterNamenodeProtocol nnProto;
   /** ClientProtocol calls. */
   private final RouterClientProtocol clientProto;
+  /** Router security manager to handle token operations. */
+  private RouterSecurityManager securityManager = null;
 
   /**
    * Construct a router RPC server.
@@ -256,6 +259,9 @@ public class RouterRpcServer extends AbstractService
     LOG.info("RPC server binding to {} with {} handlers for Router {}",
         confRpcAddress, handlerCount, this.router.getRouterId());
 
+    // Create security manager
+    this.securityManager = new RouterSecurityManager(this.conf);
+
     this.rpcServer = new RPC.Builder(this.conf)
         .setProtocol(ClientNamenodeProtocolPB.class)
         .setInstance(clientNNPbService)
@@ -265,6 +271,7 @@ public class RouterRpcServer extends AbstractService
         .setnumReaders(readerCount)
         .setQueueSizePerHandler(handlerQueueSize)
         .setVerbose(false)
+        .setSecretManager(this.securityManager.getSecretManager())
         .build();
 
     // Add all the RPC protocols that the Router implements
@@ -344,10 +351,22 @@ public class RouterRpcServer extends AbstractService
     if (rpcMonitor != null) {
       this.rpcMonitor.close();
     }
+    if (securityManager != null) {
+      this.securityManager.stop();
+    }
     super.serviceStop();
   }
 
   /**
+   * Get the RPC security manager.
+   *
+   * @return RPC security manager.
+   */
+  public RouterSecurityManager getRouterSecurityManager() {
+    return this.securityManager;
+  }
+
+  /**
    * Get the RPC client to the Namenode.
    *
    * @return RPC clients to the Namenodes.
@@ -1457,7 +1476,7 @@ public class RouterRpcServer extends AbstractService
    * @return Remote user group information.
    * @throws IOException If we cannot get the user information.
    */
-  static UserGroupInformation getRemoteUser() throws IOException {
+  public static UserGroupInformation getRemoteUser() throws IOException {
     UserGroupInformation ugi = Server.getRemoteUser();
     return (ugi != null) ? ugi : UserGroupInformation.getCurrentUser();
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/RouterSecurityManager.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/RouterSecurityManager.java
new file mode 100644
index 0000000..0f0089a
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/RouterSecurityManager.java
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.federation.router.security;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
+import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
+import org.apache.hadoop.security.token.SecretManager;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+
+/**
+ * Manager to hold underlying delegation token secret manager implementations.
+ */
+public class RouterSecurityManager {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(RouterSecurityManager.class);
+
+  private AbstractDelegationTokenSecretManager<DelegationTokenIdentifier>
+      dtSecretManager = null;
+
+  public RouterSecurityManager(Configuration conf) {
+    this.dtSecretManager = newSecretManager(conf);
+  }
+
+  @VisibleForTesting
+  public RouterSecurityManager(AbstractDelegationTokenSecretManager
+      <DelegationTokenIdentifier> dtSecretManager) {
+    this.dtSecretManager = dtSecretManager;
+  }
+
+  /**
+   * Creates an instance of a SecretManager from the configuration.
+   *
+   * @param conf Configuration that defines the secret manager class.
+   * @return New secret manager, or null if it could not be instantiated.
+   */
+  public static AbstractDelegationTokenSecretManager<DelegationTokenIdentifier>
+      newSecretManager(Configuration conf) {
+    Class<? extends AbstractDelegationTokenSecretManager> clazz =
+        conf.getClass(
+        RBFConfigKeys.DFS_ROUTER_DELEGATION_TOKEN_DRIVER_CLASS,
+        RBFConfigKeys.DFS_ROUTER_DELEGATION_TOKEN_DRIVER_CLASS_DEFAULT,
+        AbstractDelegationTokenSecretManager.class);
+    AbstractDelegationTokenSecretManager secretManager;
+    try {
+      Constructor constructor = clazz.getConstructor(Configuration.class);
+      secretManager = (AbstractDelegationTokenSecretManager)
+          constructor.newInstance(conf);
+      LOG.info("Delegation token secret manager object instantiated");
+    } catch (ReflectiveOperationException e) {
+      LOG.error("Could not instantiate: {}", clazz.getSimpleName(), e);
+      return null;
+    } catch (RuntimeException e) {
+      LOG.error("RuntimeException to instantiate: {}",
+          clazz.getSimpleName(), e);
+      return null;
+    }
+    return secretManager;
+  }
+
+  public AbstractDelegationTokenSecretManager<DelegationTokenIdentifier>
+      getSecretManager() {
+    return this.dtSecretManager;
+  }
+
+  public void stop() {
+    LOG.info("Stopping security manager");
+    if (this.dtSecretManager != null) {
+      this.dtSecretManager.stopThreads();
+    }
+  }
+
+  private static UserGroupInformation getRemoteUser() throws IOException {
+    return RouterRpcServer.getRemoteUser();
+  }
+
+  /**
+   * Returns authentication method used to establish the connection.
+   * @return AuthenticationMethod of the caller (of the real user for a proxy).
+   * @throws IOException If the remote user cannot be determined.
+   */
+  private AuthenticationMethod getConnectionAuthenticationMethod()
+      throws IOException {
+    UserGroupInformation ugi = getRemoteUser();
+    AuthenticationMethod authMethod = ugi.getAuthenticationMethod();
+    if (authMethod == AuthenticationMethod.PROXY) {
+      authMethod = ugi.getRealUser().getAuthenticationMethod();
+    }
+    return authMethod;
+  }
+
+  /**
+   * Allowed when security is disabled or the caller used Kerberos/certificate.
+   * @return true if delegation token operation is allowed
+   */
+  private boolean isAllowedDelegationTokenOp() throws IOException {
+    AuthenticationMethod authMethod = getConnectionAuthenticationMethod();
+    return !UserGroupInformation.isSecurityEnabled()
+        || authMethod == AuthenticationMethod.KERBEROS
+        || authMethod == AuthenticationMethod.KERBEROS_SSL
+        || authMethod == AuthenticationMethod.CERTIFICATE;
+  }
+
+  /**
+   * Issue a delegation token owned by the remote caller.
+   * @param renewer Renewer information
+   * @return delegation token, or null if no secret manager is running
+   * @throws IOException on error
+   */
+  public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer)
+      throws IOException {
+    LOG.debug("Generate delegation token with renewer {}", renewer);
+    boolean success = false;
+    String tokenId = "";
+    Token<DelegationTokenIdentifier> token;
+    try {
+      if (!isAllowedDelegationTokenOp()) {
+        throw new IOException("Delegation Token can be issued only " +
+            "with kerberos or web authentication");
+      }
+      if (dtSecretManager == null || !dtSecretManager.isRunning()) {
+        LOG.warn("trying to get DT with no secret manager running");
+        return null;
+      }
+      UserGroupInformation ugi = getRemoteUser();
+      Text owner = new Text(ugi.getUserName());
+      Text realUser = null;
+      if (ugi.getRealUser() != null) {
+        realUser = new Text(ugi.getRealUser().getUserName());
+      }
+      DelegationTokenIdentifier dtId = new DelegationTokenIdentifier(owner,
+          renewer, realUser);
+      token = new Token<>(dtId, dtSecretManager);
+      tokenId = dtId.toStringStable();
+      success = true;
+    } finally {
+      logAuditEvent(success, "getDelegationToken", tokenId);
+    }
+    return token;
+  }
+
+  /**
+   * Renew a delegation token on behalf of the remote caller.
+   * @param token Token to renew
+   * @return expiry time reported by the secret manager
+   * @throws IOException on error, including a missing secret manager
+   */
+  public long renewDelegationToken(Token<DelegationTokenIdentifier> token)
+          throws SecretManager.InvalidToken, IOException {
+    LOG.debug("Renew delegation token");
+    boolean success = false;
+    String tokenId = "";
+    long expiryTime;
+    try {
+      if (!isAllowedDelegationTokenOp()) {
+        throw new IOException("Delegation Token can be renewed only " +
+            "with kerberos or web authentication");
+      }
+      if (dtSecretManager == null) {
+        throw new IOException("No secret manager to renew delegation token");
+      }
+      String renewer = getRemoteUser().getShortUserName();
+      expiryTime = dtSecretManager.renewToken(token, renewer);
+      tokenId = DFSUtil.decodeDelegationToken(token).toStringStable();
+      success = true;
+    } catch (AccessControlException ace) {
+      tokenId = DFSUtil.decodeDelegationToken(token).toStringStable();
+      throw ace;
+    } finally {
+      logAuditEvent(success, "renewDelegationToken", tokenId);
+    }
+    return expiryTime;
+  }
+
+  /**
+   * Cancel a delegation token on behalf of the remote caller.
+   * @param token Token to cancel
+   * @throws IOException on error, including a missing secret manager
+   */
+  public void cancelDelegationToken(Token<DelegationTokenIdentifier> token)
+          throws IOException {
+    LOG.debug("Cancel delegation token");
+    boolean success = false;
+    String tokenId = "";
+    try {
+      if (dtSecretManager == null) {
+        throw new IOException("No secret manager to cancel delegation token");
+      }
+      String canceller = getRemoteUser().getUserName();
+      LOG.info("Cancel request by {}", canceller);
+      tokenId = dtSecretManager.cancelToken(token, canceller).toStringStable();
+      success = true;
+    } catch (AccessControlException ace) {
+      tokenId = DFSUtil.decodeDelegationToken(token).toStringStable();
+      throw ace;
+    } finally {
+      logAuditEvent(success, "cancelDelegationToken", tokenId);
+    }
+  }
+
+  /**
+   * Log status of delegation token related operation.
+   * Extend in future to use audit logger instead of local logging.
+   */
+  void logAuditEvent(boolean succeeded, String cmd, String tokenId)
+      throws IOException {
+    LOG.debug("Operation:{} Status:{} TokenId:{}", cmd, succeeded, tokenId);
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/package-info.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/package-info.java
new file mode 100644
index 0000000..9dd12ec
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/package-info.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Includes router security manager and token store implementations.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+
+package org.apache.hadoop.hdfs.server.federation.router.security;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/ZKDelegationTokenSecretManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/ZKDelegationTokenSecretManagerImpl.java
new file mode 100644
index 0000000..3da63f8
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/ZKDelegationTokenSecretManagerImpl.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.federation.router.security.token;
+
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
+import org.apache.hadoop.security.token.delegation.ZKDelegationTokenSecretManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+
+/**
+ * Zookeeper based router delegation token store implementation.
+ */
+public class ZKDelegationTokenSecretManagerImpl extends
+    ZKDelegationTokenSecretManager<AbstractDelegationTokenIdentifier> {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ZKDelegationTokenSecretManagerImpl.class);
+
+  private Configuration conf = null; // configuration passed at construction
+
+  public ZKDelegationTokenSecretManagerImpl(Configuration conf) {
+    super(conf);
+    this.conf = conf;
+    try {
+      super.startThreads();
+    } catch (IOException e) { // not rethrown; keep the cause in the log
+      LOG.error("Error starting threads for zkDelegationTokens", e);
+    }
+    LOG.info("Zookeeper delegation token secret manager instantiated");
+  }
+
+  @Override
+  public DelegationTokenIdentifier createIdentifier() {
+    return new DelegationTokenIdentifier();
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/package-info.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/package-info.java
new file mode 100644
index 0000000..a51e455
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/package-info.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Includes implementations of token secret managers.
+ * Implementations should extend {@link AbstractDelegationTokenSecretManager}.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+
+package org.apache.hadoop.hdfs.server.federation.router.security.token;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml
index afe3ad1..1034c87 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml
@@ -584,4 +584,13 @@
     </description>
   </property>
 
-</configuration>
\ No newline at end of file
+  <property>
+    <name>dfs.federation.router.secret.manager.class</name>
+    <value>org.apache.hadoop.hdfs.server.federation.router.security.token.ZKDelegationTokenSecretManagerImpl</value>
+    <description>
+      Class that implements the secret manager for delegation tokens.
+      The default implementation uses ZooKeeper as the backend to store delegation tokens.
+    </description>
+  </property>
+
+</configuration>
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/fs/contract/router/SecurityConfUtil.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/fs/contract/router/SecurityConfUtil.java
index 100313e..d6ee3c7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/fs/contract/router/SecurityConfUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/fs/contract/router/SecurityConfUtil.java
@@ -31,6 +31,7 @@ import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_
 import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_KERBEROS_PRINCIPAL_KEY;
 import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_KEYTAB_FILE_KEY;
 import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_BIND_HOST_KEY;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_DELEGATION_TOKEN_DRIVER_CLASS;
 import static org.junit.Assert.assertTrue;
 
 import java.io.File;
@@ -43,6 +44,7 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
 import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
 import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreFileImpl;
+import org.apache.hadoop.hdfs.server.federation.security.MockDelegationTokenSecretManager;
 import org.apache.hadoop.http.HttpConfig;
 import org.apache.hadoop.minikdc.MiniKdc;
 import org.apache.hadoop.security.SecurityUtil;
@@ -144,6 +146,8 @@ public final class SecurityConfUtil {
 
     // We need to specify the host to prevent 0.0.0.0 as the host address
     conf.set(DFS_ROUTER_RPC_BIND_HOST_KEY, "localhost");
+    conf.set(DFS_ROUTER_DELEGATION_TOKEN_DRIVER_CLASS,
+        MockDelegationTokenSecretManager.class.getName());
 
     return conf;
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/fs/contract/router/TestRouterHDFSContractDelegationToken.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/fs/contract/router/TestRouterHDFSContractDelegationToken.java
new file mode 100644
index 0000000..e4c03e4
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/fs/contract/router/TestRouterHDFSContractDelegationToken.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.contract.router;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.fs.contract.AbstractFSContractTestBase;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.security.token.SecretManager;
+import org.apache.hadoop.security.token.Token;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.io.IOException;
+import static org.apache.hadoop.fs.contract.router.SecurityConfUtil.initSecurity;
+
+/**
+ * Test to verify router contracts for delegation token operations.
+ */
+public class TestRouterHDFSContractDelegationToken
+    extends AbstractFSContractTestBase {
+
+  @BeforeClass
+  public static void createCluster() throws Exception {
+    RouterHDFSContract.createCluster(initSecurity());
+  }
+
+  @AfterClass
+  public static void teardownCluster() throws IOException {
+    RouterHDFSContract.destroyCluster();
+  }
+
+  @Override
+  protected AbstractFSContract createContract(Configuration conf) {
+    return new RouterHDFSContract(conf);
+  }
+
+  @Rule
+  public ExpectedException exceptionRule = ExpectedException.none();
+
+  @Test
+  public void testRouterDelegationToken() throws Exception {
+    // Generate delegation token
+    Token<DelegationTokenIdentifier> token =
+        (Token<DelegationTokenIdentifier>) getFileSystem()
+        .getDelegationToken("router");
+    assertNotNull(token);
+    // Verify properties of the token
+    assertEquals("HDFS_DELEGATION_TOKEN", token.getKind().toString());
+    DelegationTokenIdentifier identifier = token.decodeIdentifier();
+    assertNotNull(identifier);
+    String owner = identifier.getOwner().toString();
+    // Windows will not reverse name lookup "127.0.0.1" to "localhost".
+    String host = Path.WINDOWS ? "127.0.0.1" : "localhost";
+    String expectedOwner = "router/" + host + "@EXAMPLE.COM";
+    assertEquals(expectedOwner, owner);
+    assertEquals("router", identifier.getRenewer().toString());
+    int masterKeyId = identifier.getMasterKeyId();
+    assertTrue(masterKeyId > 0);
+    int sequenceNumber = identifier.getSequenceNumber();
+    assertTrue(sequenceNumber > 0);
+    long existingMaxTime = token.decodeIdentifier().getMaxDate();
+    assertTrue(identifier.getMaxDate() >= identifier.getIssueDate());
+
+    // Renew delegation token
+    token.renew(initSecurity());
+    assertNotNull(token);
+    assertTrue(token.decodeIdentifier().getMaxDate() >= existingMaxTime);
+    // Renewal should retain old master key id and sequence number
+    identifier = token.decodeIdentifier();
+    assertEquals(masterKeyId, identifier.getMasterKeyId());
+    assertEquals(sequenceNumber, identifier.getSequenceNumber());
+
+    // Cancel delegation token
+    token.cancel(initSecurity());
+
+    // Renew a cancelled token
+    exceptionRule.expect(SecretManager.InvalidToken.class);
+    token.renew(initSecurity());
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/security/MockDelegationTokenSecretManager.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/security/MockDelegationTokenSecretManager.java
new file mode 100644
index 0000000..8f89f0a
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/security/MockDelegationTokenSecretManager.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.federation.security;
+
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
+import org.apache.hadoop.conf.Configuration;
+import java.io.IOException;
+
+/**
+ * Delegation token secret manager used only by tests. It adds nothing on top
+ * of {@link AbstractDelegationTokenSecretManager} except a concrete
+ * identifier type.
+ */
+public class MockDelegationTokenSecretManager
+    extends AbstractDelegationTokenSecretManager<DelegationTokenIdentifier> {
+
+  /** Value applied to every interval by the Configuration constructor. */
+  private static final long DEFAULT_INTERVAL = 100000;
+
+  /**
+   * Builds a secret manager with caller-supplied intervals. The four values
+   * are handed to the superclass constructor unchanged. This constructor
+   * does not call {@code startThreads()} itself.
+   *
+   * @param delegationKeyUpdateInterval interval for updating master keys.
+   * @param delegationTokenMaxLifetime maximum lifetime of a token.
+   * @param delegationTokenRenewInterval renew interval of a token.
+   * @param delegationTokenRemoverScanInterval scan interval of the remover.
+   */
+  public MockDelegationTokenSecretManager(
+      long delegationKeyUpdateInterval,
+      long delegationTokenMaxLifetime,
+      long delegationTokenRenewInterval,
+      long delegationTokenRemoverScanInterval) {
+    super(delegationKeyUpdateInterval, delegationTokenMaxLifetime,
+        delegationTokenRenewInterval, delegationTokenRemoverScanInterval);
+  }
+
+  /**
+   * Builds a secret manager with fixed intervals and starts its threads
+   * right away.
+   *
+   * @param conf configuration; it is not read by this mock.
+   * @throws IOException if the threads cannot be started.
+   */
+  public MockDelegationTokenSecretManager(Configuration conf)
+      throws IOException {
+    this(DEFAULT_INTERVAL, DEFAULT_INTERVAL, DEFAULT_INTERVAL,
+        DEFAULT_INTERVAL);
+    startThreads();
+  }
+
+  /**
+   * @return a new, empty {@link DelegationTokenIdentifier}.
+   */
+  @Override
+  public DelegationTokenIdentifier createIdentifier() {
+    return new DelegationTokenIdentifier();
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/security/TestRouterSecurityManager.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/security/TestRouterSecurityManager.java
new file mode 100644
index 0000000..fe6e0ee
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/security/TestRouterSecurityManager.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.federation.security;
+
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.hdfs.server.federation.router.security.RouterSecurityManager;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.SecretManager;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
+import org.junit.rules.ExpectedException;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNotNull;
+
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Test functionality of {@link RouterSecurityManager}, which manages
+ * delegation tokens for router.
+ */
+public class TestRouterSecurityManager {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestRouterSecurityManager.class);
+
+  /**
+   * Value used for every secret manager interval. It is deliberately much
+   * longer than the test takes to run, so that neither the token nor its
+   * master key can expire between the get, renew and cancel steps. It matches
+   * the value used by MockDelegationTokenSecretManager's Configuration
+   * constructor.
+   */
+  private static final long TOKEN_INTERVAL = 100000;
+
+  /** Secret manager backing {@link #securityManager}; kept to stop it. */
+  private static MockDelegationTokenSecretManager secretManager = null;
+
+  private static RouterSecurityManager securityManager = null;
+
+  /**
+   * Creates the mock secret manager, starts its threads and wraps it in the
+   * {@link RouterSecurityManager} under test.
+   *
+   * @throws IOException if the secret manager threads cannot be started.
+   */
+  @BeforeClass
+  public static void createMockSecretManager() throws IOException {
+    secretManager = new MockDelegationTokenSecretManager(
+        TOKEN_INTERVAL, TOKEN_INTERVAL, TOKEN_INTERVAL, TOKEN_INTERVAL);
+    secretManager.startThreads();
+    securityManager = new RouterSecurityManager(secretManager);
+  }
+
+  /**
+   * Stops the threads started in {@link #createMockSecretManager()} so they
+   * do not outlive this test class.
+   */
+  @AfterClass
+  public static void stopMockSecretManager() {
+    if (secretManager != null) {
+      secretManager.stopThreads();
+    }
+  }
+
+  @Rule
+  public ExpectedException exceptionRule = ExpectedException.none();
+
+  /**
+   * Walks one token through get, renew and cancel, then checks that renewing
+   * the cancelled token is rejected.
+   *
+   * @throws IOException if any delegation token operation fails unexpectedly.
+   */
+  @Test
+  public void testDelegationTokens() throws IOException {
+    String[] groupsForTesting = {"router_group"};
+    UserGroupInformation.setLoginUser(UserGroupInformation
+        .createUserForTesting("router", groupsForTesting));
+
+    // Get a delegation token
+    Token<DelegationTokenIdentifier> token =
+        securityManager.getDelegationToken(new Text("some_renewer"));
+    assertNotNull(token);
+
+    // Renew the delegation token as the user that was named as its renewer
+    UserGroupInformation.setLoginUser(UserGroupInformation
+        .createUserForTesting("some_renewer", groupsForTesting));
+    long updatedExpirationTime = securityManager.renewDelegationToken(token);
+    // The secret manager caps the renewed expiry at the token's max date
+    // (see AbstractDelegationTokenSecretManager#renewToken), so the expiry
+    // can never be later than the max date.
+    assertTrue(updatedExpirationTime <= token.decodeIdentifier().getMaxDate());
+
+    // Cancel the delegation token
+    securityManager.cancelDelegationToken(token);
+
+    String exceptionCause = "Renewal request for unknown token";
+    exceptionRule.expect(SecretManager.InvalidToken.class);
+    exceptionRule.expectMessage(exceptionCause);
+
+    // This throws an exception as token has been cancelled.
+    securityManager.renewDelegationToken(token);
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org