You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ec...@apache.org on 2015/08/11 22:00:22 UTC
hbase git commit: HBASE-14208 Remove yarn dependencies on -common and -client
Repository: hbase
Updated Branches:
refs/heads/master 6da753553 -> 38b94709e
HBASE-14208 Remove yarn dependencies on -common and -client
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/38b94709
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/38b94709
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/38b94709
Branch: refs/heads/master
Commit: 38b94709ee3727832cb58446b4fa60cf5c37b9a6
Parents: 6da7535
Author: Elliott Clark <ec...@apache.org>
Authored: Tue Aug 11 09:10:29 2015 -0700
Committer: Elliott Clark <ec...@apache.org>
Committed: Tue Aug 11 13:00:13 2015 -0700
----------------------------------------------------------------------
hbase-client/pom.xml | 46 ---
.../hadoop/hbase/security/token/TokenUtil.java | 374 -------------------
hbase-common/pom.xml | 4 -
.../org/apache/hadoop/hbase/security/User.java | 78 +---
.../hadoop/hbase/security/token/TokenUtil.java | 374 +++++++++++++++++++
5 files changed, 375 insertions(+), 501 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/38b94709/hbase-client/pom.xml
----------------------------------------------------------------------
diff --git a/hbase-client/pom.xml b/hbase-client/pom.xml
index dc0da77..425bd05 100644
--- a/hbase-client/pom.xml
+++ b/hbase-client/pom.xml
@@ -291,52 +291,6 @@
</exclusion>
</exclusions>
</dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-mapreduce-client-core</artifactId>
- <exclusions>
- <exclusion>
- <groupId>com.sun.jersey.jersey-test-framework</groupId>
- <artifactId>jersey-test-framework-grizzly2</artifactId>
- </exclusion>
- <exclusion>
- <groupId>javax.servlet</groupId>
- <artifactId>servlet-api</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.sun.jersey</groupId>
- <artifactId>jersey-server</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.sun.jersey</groupId>
- <artifactId>jersey-core</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.sun.jersey</groupId>
- <artifactId>jersey-json</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.sun.jersey.contribs</groupId>
- <artifactId>jersey-guice</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.google.inject</groupId>
- <artifactId>guice</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.google.inject.extensions</groupId>
- <artifactId>guice-servlet</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.codehaus.jackson</groupId>
- <artifactId>jackson-jaxrs</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.codehaus.jackson</groupId>
- <artifactId>jackson-xc</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
</dependencies>
</profile>
http://git-wip-us.apache.org/repos/asf/hbase/blob/38b94709/hbase-client/src/main/java/org/apache/hadoop/hbase/security/token/TokenUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/token/TokenUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/token/TokenUtil.java
deleted file mode 100644
index 9be33d7..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/token/TokenUtil.java
+++ /dev/null
@@ -1,374 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.security.token;
-
-import java.io.IOException;
-import java.lang.reflect.UndeclaredThrowableException;
-import java.security.PrivilegedExceptionAction;
-
-import com.google.protobuf.ServiceException;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.classification.InterfaceStability;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos;
-import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.security.UserProvider;
-import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.zookeeper.KeeperException;
-
-/**
- * Utility methods for obtaining authentication tokens.
- */
-@InterfaceAudience.Public
-@InterfaceStability.Evolving
-public class TokenUtil {
- // This class is referenced indirectly by User out in common; instances are created by reflection
- private static final Log LOG = LogFactory.getLog(TokenUtil.class);
-
- /**
- * Obtain and return an authentication token for the current user.
- * @param conf the configuration for connecting to the cluster
- * @return the authentication token instance
- * @deprecated Replaced by {@link #obtainToken(Connection)}
- */
- @Deprecated
- public static Token<AuthenticationTokenIdentifier> obtainToken(
- Configuration conf) throws IOException {
- try (Connection connection = ConnectionFactory.createConnection(conf)) {
- return obtainToken(connection);
- }
- }
-
- /**
- * Obtain and return an authentication token for the current user.
- * @param conn The HBase cluster connection
- * @return the authentication token instance
- */
- public static Token<AuthenticationTokenIdentifier> obtainToken(
- Connection conn) throws IOException {
- Table meta = null;
- try {
- meta = conn.getTable(TableName.META_TABLE_NAME);
- CoprocessorRpcChannel rpcChannel = meta.coprocessorService(HConstants.EMPTY_START_ROW);
- AuthenticationProtos.AuthenticationService.BlockingInterface service =
- AuthenticationProtos.AuthenticationService.newBlockingStub(rpcChannel);
- AuthenticationProtos.GetAuthenticationTokenResponse response = service.getAuthenticationToken(null,
- AuthenticationProtos.GetAuthenticationTokenRequest.getDefaultInstance());
-
- return ProtobufUtil.toToken(response.getToken());
- } catch (ServiceException se) {
- ProtobufUtil.toIOException(se);
- } finally {
- if (meta != null) {
- meta.close();
- }
- }
- // dummy return for ServiceException block
- return null;
- }
-
- /**
- * Obtain and return an authentication token for the current user.
- * @param conn The HBase cluster connection
- * @return the authentication token instance
- */
- public static Token<AuthenticationTokenIdentifier> obtainToken(
- final Connection conn, User user) throws IOException, InterruptedException {
- return user.runAs(new PrivilegedExceptionAction<Token<AuthenticationTokenIdentifier>>() {
- @Override
- public Token<AuthenticationTokenIdentifier> run() throws Exception {
- return obtainToken(conn);
- }
- });
- }
-
-
- private static Text getClusterId(Token<AuthenticationTokenIdentifier> token)
- throws IOException {
- return token.getService() != null
- ? token.getService() : new Text("default");
- }
-
- /**
- * Obtain an authentication token for the given user and add it to the
- * user's credentials.
- * @param conf The configuration for connecting to the cluster
- * @param user The user for whom to obtain the token
- * @throws IOException If making a remote call to the authentication service fails
- * @throws InterruptedException If executing as the given user is interrupted
- * @deprecated Replaced by {@link #obtainAndCacheToken(Connection,User)}
- */
- @Deprecated
- public static void obtainAndCacheToken(final Configuration conf,
- UserGroupInformation user)
- throws IOException, InterruptedException {
- Connection conn = ConnectionFactory.createConnection(conf);
- try {
- UserProvider userProvider = UserProvider.instantiate(conf);
- obtainAndCacheToken(conn, userProvider.create(user));
- } finally {
- conn.close();
- }
- }
-
- /**
- * Obtain an authentication token for the given user and add it to the
- * user's credentials.
- * @param conn The HBase cluster connection
- * @param user The user for whom to obtain the token
- * @throws IOException If making a remote call to the authentication service fails
- * @throws InterruptedException If executing as the given user is interrupted
- */
- public static void obtainAndCacheToken(final Connection conn,
- User user)
- throws IOException, InterruptedException {
- try {
- Token<AuthenticationTokenIdentifier> token = obtainToken(conn, user);
-
- if (token == null) {
- throw new IOException("No token returned for user " + user.getName());
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Obtained token " + token.getKind().toString() + " for user " +
- user.getName());
- }
- user.addToken(token);
- } catch (IOException ioe) {
- throw ioe;
- } catch (InterruptedException ie) {
- throw ie;
- } catch (RuntimeException re) {
- throw re;
- } catch (Exception e) {
- throw new UndeclaredThrowableException(e,
- "Unexpected exception obtaining token for user " + user.getName());
- }
- }
-
- /**
- * Obtain an authentication token on behalf of the given user and add it to
- * the credentials for the given map reduce job.
- * @param conf The configuration for connecting to the cluster
- * @param user The user for whom to obtain the token
- * @param job The job instance in which the token should be stored
- * @throws IOException If making a remote call to the authentication service fails
- * @throws InterruptedException If executing as the given user is interrupted
- * @deprecated Replaced by {@link #obtainTokenForJob(Connection,User,Job)}
- */
- @Deprecated
- public static void obtainTokenForJob(final Configuration conf,
- UserGroupInformation user, Job job)
- throws IOException, InterruptedException {
- Connection conn = ConnectionFactory.createConnection(conf);
- try {
- UserProvider userProvider = UserProvider.instantiate(conf);
- obtainTokenForJob(conn, userProvider.create(user), job);
- } finally {
- conn.close();
- }
- }
-
- /**
- * Obtain an authentication token on behalf of the given user and add it to
- * the credentials for the given map reduce job.
- * @param conn The HBase cluster connection
- * @param user The user for whom to obtain the token
- * @param job The job instance in which the token should be stored
- * @throws IOException If making a remote call to the authentication service fails
- * @throws InterruptedException If executing as the given user is interrupted
- */
- public static void obtainTokenForJob(final Connection conn,
- User user, Job job)
- throws IOException, InterruptedException {
- try {
- Token<AuthenticationTokenIdentifier> token = obtainToken(conn, user);
-
- if (token == null) {
- throw new IOException("No token returned for user " + user.getName());
- }
- Text clusterId = getClusterId(token);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Obtained token " + token.getKind().toString() + " for user " +
- user.getName() + " on cluster " + clusterId.toString());
- }
- job.getCredentials().addToken(clusterId, token);
- } catch (IOException ioe) {
- throw ioe;
- } catch (InterruptedException ie) {
- throw ie;
- } catch (RuntimeException re) {
- throw re;
- } catch (Exception e) {
- throw new UndeclaredThrowableException(e,
- "Unexpected exception obtaining token for user " + user.getName());
- }
- }
-
- /**
- * Obtain an authentication token on behalf of the given user and add it to
- * the credentials for the given map reduce job.
- * @param user The user for whom to obtain the token
- * @param job The job configuration in which the token should be stored
- * @throws IOException If making a remote call to the authentication service fails
- * @throws InterruptedException If executing as the given user is interrupted
- * @deprecated Replaced by {@link #obtainTokenForJob(Connection,JobConf,User)}
- */
- @Deprecated
- public static void obtainTokenForJob(final JobConf job,
- UserGroupInformation user)
- throws IOException, InterruptedException {
- Connection conn = ConnectionFactory.createConnection(job);
- try {
- UserProvider userProvider = UserProvider.instantiate(job);
- obtainTokenForJob(conn, job, userProvider.create(user));
- } finally {
- conn.close();
- }
- }
-
- /**
- * Obtain an authentication token on behalf of the given user and add it to
- * the credentials for the given map reduce job.
- * @param conn The HBase cluster connection
- * @param user The user for whom to obtain the token
- * @param job The job configuration in which the token should be stored
- * @throws IOException If making a remote call to the authentication service fails
- * @throws InterruptedException If executing as the given user is interrupted
- */
- public static void obtainTokenForJob(final Connection conn, final JobConf job, User user)
- throws IOException, InterruptedException {
- try {
- Token<AuthenticationTokenIdentifier> token = obtainToken(conn, user);
-
- if (token == null) {
- throw new IOException("No token returned for user " + user.getName());
- }
- Text clusterId = getClusterId(token);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Obtained token " + token.getKind().toString() + " for user " +
- user.getName() + " on cluster " + clusterId.toString());
- }
- job.getCredentials().addToken(clusterId, token);
- } catch (IOException ioe) {
- throw ioe;
- } catch (InterruptedException ie) {
- throw ie;
- } catch (RuntimeException re) {
- throw re;
- } catch (Exception e) {
- throw new UndeclaredThrowableException(e,
- "Unexpected exception obtaining token for user "+user.getName());
- }
- }
-
- /**
- * Checks for an authentication token for the given user, obtaining a new token if necessary,
- * and adds it to the credentials for the given map reduce job.
- *
- * @param conn The HBase cluster connection
- * @param user The user for whom to obtain the token
- * @param job The job configuration in which the token should be stored
- * @throws IOException If making a remote call to the authentication service fails
- * @throws InterruptedException If executing as the given user is interrupted
- */
- public static void addTokenForJob(final Connection conn, final JobConf job, User user)
- throws IOException, InterruptedException {
-
- Token<AuthenticationTokenIdentifier> token = getAuthToken(conn.getConfiguration(), user);
- if (token == null) {
- token = obtainToken(conn, user);
- }
- job.getCredentials().addToken(token.getService(), token);
- }
-
- /**
- * Checks for an authentication token for the given user, obtaining a new token if necessary,
- * and adds it to the credentials for the given map reduce job.
- *
- * @param conn The HBase cluster connection
- * @param user The user for whom to obtain the token
- * @param job The job instance in which the token should be stored
- * @throws IOException If making a remote call to the authentication service fails
- * @throws InterruptedException If executing as the given user is interrupted
- */
- public static void addTokenForJob(final Connection conn, User user, Job job)
- throws IOException, InterruptedException {
- Token<AuthenticationTokenIdentifier> token = getAuthToken(conn.getConfiguration(), user);
- if (token == null) {
- token = obtainToken(conn, user);
- }
- job.getCredentials().addToken(token.getService(), token);
- }
-
- /**
- * Checks if an authentication tokens exists for the connected cluster,
- * obtaining one if needed and adding it to the user's credentials.
- *
- * @param conn The HBase cluster connection
- * @param user The user for whom to obtain the token
- * @throws IOException If making a remote call to the authentication service fails
- * @throws InterruptedException If executing as the given user is interrupted
- * @return true if the token was added, false if it already existed
- */
- public static boolean addTokenIfMissing(Connection conn, User user)
- throws IOException, InterruptedException {
- Token<AuthenticationTokenIdentifier> token = getAuthToken(conn.getConfiguration(), user);
- if (token == null) {
- token = obtainToken(conn, user);
- user.getUGI().addToken(token.getService(), token);
- return true;
- }
- return false;
- }
-
- /**
- * Get the authentication token of the user for the cluster specified in the configuration
- * @return null if the user does not have the token, otherwise the auth token for the cluster.
- */
- private static Token<AuthenticationTokenIdentifier> getAuthToken(Configuration conf, User user)
- throws IOException, InterruptedException {
- ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "TokenUtil-getAuthToken", null);
- try {
- String clusterId = ZKClusterId.readClusterIdZNode(zkw);
- if (clusterId == null) {
- throw new IOException("Failed to get cluster ID");
- }
- return new AuthenticationTokenSelector().selectToken(new Text(clusterId), user.getTokens());
- } catch (KeeperException e) {
- throw new IOException(e);
- } finally {
- zkw.close();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/38b94709/hbase-common/pom.xml
----------------------------------------------------------------------
diff --git a/hbase-common/pom.xml b/hbase-common/pom.xml
index d782c6c..3e315e4 100644
--- a/hbase-common/pom.xml
+++ b/hbase-common/pom.xml
@@ -357,10 +357,6 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
</dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-mapreduce-client-core</artifactId>
- </dependency>
</dependencies>
<build>
<plugins>
http://git-wip-us.apache.org/repos/asf/hbase/blob/38b94709/hbase-common/src/main/java/org/apache/hadoop/hbase/security/User.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/security/User.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/security/User.java
index 0efb402..b0e2ed2 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/security/User.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/security/User.java
@@ -20,7 +20,6 @@
package org.apache.hadoop.hbase.security;
import java.io.IOException;
-import java.lang.reflect.UndeclaredThrowableException;
import java.security.PrivilegedAction;
import java.security.PrivilegedExceptionAction;
import java.util.Collection;
@@ -29,8 +28,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.util.Methods;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
@@ -100,30 +97,6 @@ public abstract class User {
throws IOException, InterruptedException;
/**
- * Requests an authentication token for this user and stores it in the
- * user's credentials.
- *
- * @throws IOException
- * @deprecated Use {@code TokenUtil.obtainAuthTokenForJob(Connection,User,Job)}
- * instead.
- */
- @Deprecated
- public abstract void obtainAuthTokenForJob(Configuration conf, Job job)
- throws IOException, InterruptedException;
-
- /**
- * Requests an authentication token for this user and stores it in the
- * user's credentials.
- *
- * @throws IOException
- * @deprecated Use {@code TokenUtil.obtainAuthTokenForJob(Connection,JobConf,User)}
- * instead.
- */
- @Deprecated
- public abstract void obtainAuthTokenForJob(JobConf job)
- throws IOException, InterruptedException;
-
- /**
* Returns the Token of the specified kind associated with this user,
* or null if the Token is not present.
*
@@ -278,7 +251,7 @@ public abstract class User {
* {@link org.apache.hadoop.security.UserGroupInformation} for secure Hadoop
* 0.20 and versions 0.21 and above.
*/
- private static class SecureHadoopUser extends User {
+ private static final class SecureHadoopUser extends User {
private String shortName;
private SecureHadoopUser() throws IOException {
@@ -312,55 +285,6 @@ public abstract class User {
return ugi.doAs(action);
}
- @Override
- public void obtainAuthTokenForJob(Configuration conf, Job job)
- throws IOException, InterruptedException {
- try {
- Class<?> c = Class.forName(
- "org.apache.hadoop.hbase.security.token.TokenUtil");
- Methods.call(c, null, "obtainTokenForJob",
- new Class[]{Configuration.class, UserGroupInformation.class,
- Job.class},
- new Object[]{conf, ugi, job});
- } catch (ClassNotFoundException cnfe) {
- throw new RuntimeException("Failure loading TokenUtil class, "
- +"is secure RPC available?", cnfe);
- } catch (IOException ioe) {
- throw ioe;
- } catch (InterruptedException ie) {
- throw ie;
- } catch (RuntimeException re) {
- throw re;
- } catch (Exception e) {
- throw new UndeclaredThrowableException(e,
- "Unexpected error calling TokenUtil.obtainAndCacheToken()");
- }
- }
-
- @Override
- public void obtainAuthTokenForJob(JobConf job)
- throws IOException, InterruptedException {
- try {
- Class<?> c = Class.forName(
- "org.apache.hadoop.hbase.security.token.TokenUtil");
- Methods.call(c, null, "obtainTokenForJob",
- new Class[]{JobConf.class, UserGroupInformation.class},
- new Object[]{job, ugi});
- } catch (ClassNotFoundException cnfe) {
- throw new RuntimeException("Failure loading TokenUtil class, "
- +"is secure RPC available?", cnfe);
- } catch (IOException ioe) {
- throw ioe;
- } catch (InterruptedException ie) {
- throw ie;
- } catch (RuntimeException re) {
- throw re;
- } catch (Exception e) {
- throw new UndeclaredThrowableException(e,
- "Unexpected error calling TokenUtil.obtainAndCacheToken()");
- }
- }
-
/** @see User#createUserForTesting(org.apache.hadoop.conf.Configuration, String, String[]) */
public static User createUserForTesting(Configuration conf,
String name, String[] groups) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/38b94709/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/TokenUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/TokenUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/TokenUtil.java
new file mode 100644
index 0000000..9be33d7
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/TokenUtil.java
@@ -0,0 +1,374 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.security.token;
+
+import java.io.IOException;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.security.PrivilegedExceptionAction;
+
+import com.google.protobuf.ServiceException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Utility methods for obtaining authentication tokens.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class TokenUtil {
+ // This class is referenced indirectly by User out in common; instances are created by reflection
+ private static final Log LOG = LogFactory.getLog(TokenUtil.class);
+
+ /**
+ * Obtain and return an authentication token for the current user.
+ * @param conf the configuration for connecting to the cluster
+ * @return the authentication token instance
+ * @deprecated Replaced by {@link #obtainToken(Connection)}
+ */
+ @Deprecated
+ public static Token<AuthenticationTokenIdentifier> obtainToken(
+ Configuration conf) throws IOException {
+ try (Connection connection = ConnectionFactory.createConnection(conf)) {
+ return obtainToken(connection);
+ }
+ }
+
+ /**
+ * Obtain and return an authentication token for the current user.
+ * @param conn The HBase cluster connection
+ * @return the authentication token instance
+ */
+ public static Token<AuthenticationTokenIdentifier> obtainToken(
+ Connection conn) throws IOException {
+ Table meta = null;
+ try {
+ meta = conn.getTable(TableName.META_TABLE_NAME);
+ CoprocessorRpcChannel rpcChannel = meta.coprocessorService(HConstants.EMPTY_START_ROW);
+ AuthenticationProtos.AuthenticationService.BlockingInterface service =
+ AuthenticationProtos.AuthenticationService.newBlockingStub(rpcChannel);
+ AuthenticationProtos.GetAuthenticationTokenResponse response = service.getAuthenticationToken(null,
+ AuthenticationProtos.GetAuthenticationTokenRequest.getDefaultInstance());
+
+ return ProtobufUtil.toToken(response.getToken());
+ } catch (ServiceException se) {
+ ProtobufUtil.toIOException(se);
+ } finally {
+ if (meta != null) {
+ meta.close();
+ }
+ }
+ // dummy return for ServiceException block
+ return null;
+ }
+
+ /**
+ * Obtain and return an authentication token for the current user.
+ * @param conn The HBase cluster connection
+ * @return the authentication token instance
+ */
+ public static Token<AuthenticationTokenIdentifier> obtainToken(
+ final Connection conn, User user) throws IOException, InterruptedException {
+ return user.runAs(new PrivilegedExceptionAction<Token<AuthenticationTokenIdentifier>>() {
+ @Override
+ public Token<AuthenticationTokenIdentifier> run() throws Exception {
+ return obtainToken(conn);
+ }
+ });
+ }
+
+
+ private static Text getClusterId(Token<AuthenticationTokenIdentifier> token)
+ throws IOException {
+ return token.getService() != null
+ ? token.getService() : new Text("default");
+ }
+
+ /**
+ * Obtain an authentication token for the given user and add it to the
+ * user's credentials.
+ * @param conf The configuration for connecting to the cluster
+ * @param user The user for whom to obtain the token
+ * @throws IOException If making a remote call to the authentication service fails
+ * @throws InterruptedException If executing as the given user is interrupted
+ * @deprecated Replaced by {@link #obtainAndCacheToken(Connection,User)}
+ */
+ @Deprecated
+ public static void obtainAndCacheToken(final Configuration conf,
+ UserGroupInformation user)
+ throws IOException, InterruptedException {
+ Connection conn = ConnectionFactory.createConnection(conf);
+ try {
+ UserProvider userProvider = UserProvider.instantiate(conf);
+ obtainAndCacheToken(conn, userProvider.create(user));
+ } finally {
+ conn.close();
+ }
+ }
+
+ /**
+ * Obtain an authentication token for the given user and add it to the
+ * user's credentials.
+ * @param conn The HBase cluster connection
+ * @param user The user for whom to obtain the token
+ * @throws IOException If making a remote call to the authentication service fails
+ * @throws InterruptedException If executing as the given user is interrupted
+ */
+ public static void obtainAndCacheToken(final Connection conn,
+ User user)
+ throws IOException, InterruptedException {
+ try {
+ Token<AuthenticationTokenIdentifier> token = obtainToken(conn, user);
+
+ if (token == null) {
+ throw new IOException("No token returned for user " + user.getName());
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Obtained token " + token.getKind().toString() + " for user " +
+ user.getName());
+ }
+ user.addToken(token);
+ } catch (IOException ioe) {
+ throw ioe;
+ } catch (InterruptedException ie) {
+ throw ie;
+ } catch (RuntimeException re) {
+ throw re;
+ } catch (Exception e) {
+ throw new UndeclaredThrowableException(e,
+ "Unexpected exception obtaining token for user " + user.getName());
+ }
+ }
+
+ /**
+ * Obtain an authentication token on behalf of the given user and add it to
+ * the credentials for the given map reduce job.
+ * @param conf The configuration for connecting to the cluster
+ * @param user The user for whom to obtain the token
+ * @param job The job instance in which the token should be stored
+ * @throws IOException If making a remote call to the authentication service fails
+ * @throws InterruptedException If executing as the given user is interrupted
+ * @deprecated Replaced by {@link #obtainTokenForJob(Connection,User,Job)}
+ */
+ @Deprecated
+ public static void obtainTokenForJob(final Configuration conf,
+ UserGroupInformation user, Job job)
+ throws IOException, InterruptedException {
+ Connection conn = ConnectionFactory.createConnection(conf);
+ try {
+ UserProvider userProvider = UserProvider.instantiate(conf);
+ obtainTokenForJob(conn, userProvider.create(user), job);
+ } finally {
+ conn.close();
+ }
+ }
+
+ /**
+  * Fetches an authentication token for the given user over an existing connection and
+  * registers it in the credentials of the given map reduce job, keyed by the cluster id
+  * derived from the token.
+  * @param conn The HBase cluster connection
+  * @param user The user for whom to obtain the token
+  * @param job The job instance in which the token should be stored
+  * @throws IOException If making a remote call to the authentication service fails,
+  *   or if no token is returned
+  * @throws InterruptedException If executing as the given user is interrupted
+  */
+ public static void obtainTokenForJob(final Connection conn,
+     User user, Job job)
+     throws IOException, InterruptedException {
+   try {
+     final Token<AuthenticationTokenIdentifier> authToken = obtainToken(conn, user);
+     if (authToken == null) {
+       throw new IOException("No token returned for user " + user.getName());
+     }
+
+     final Text alias = getClusterId(authToken);
+     if (LOG.isDebugEnabled()) {
+       String kind = authToken.getKind().toString();
+       LOG.debug("Obtained token " + kind + " for user " +
+           user.getName() + " on cluster " + alias.toString());
+     }
+     job.getCredentials().addToken(alias, authToken);
+   } catch (RuntimeException unchecked) {
+     // Unchecked and declared exceptions propagate untouched.
+     throw unchecked;
+   } catch (IOException io) {
+     throw io;
+   } catch (InterruptedException interrupted) {
+     throw interrupted;
+   } catch (Exception other) {
+     // Anything else is an undeclared checked exception; wrap it.
+     throw new UndeclaredThrowableException(other,
+         "Unexpected exception obtaining token for user " + user.getName());
+   }
+ }
+
+ /**
+  * Opens a short-lived cluster connection using the job configuration, fetches an
+  * authentication token for the given user and stores it in the credentials of that
+  * job configuration. The connection is always closed before this method returns.
+  * @param job The job configuration in which the token should be stored; it is also
+  *   used as the configuration for connecting to the cluster
+  * @param user The user for whom to obtain the token
+  * @throws IOException If making a remote call to the authentication service fails
+  * @throws InterruptedException If executing as the given user is interrupted
+  * @deprecated Replaced by {@link #obtainTokenForJob(Connection,JobConf,User)}
+  */
+ @Deprecated
+ public static void obtainTokenForJob(final JobConf job,
+     UserGroupInformation user)
+     throws IOException, InterruptedException {
+   final Connection connection = ConnectionFactory.createConnection(job);
+   try {
+     // Wrap the Hadoop UGI in an HBase User and delegate to the Connection-based overload.
+     User hbaseUser = UserProvider.instantiate(job).create(user);
+     obtainTokenForJob(connection, job, hbaseUser);
+   } finally {
+     connection.close();
+   }
+ }
+
+ /**
+  * Fetches an authentication token for the given user over an existing connection and
+  * registers it in the credentials of the given job configuration, keyed by the cluster
+  * id derived from the token.
+  * @param conn The HBase cluster connection
+  * @param job The job configuration in which the token should be stored
+  * @param user The user for whom to obtain the token
+  * @throws IOException If making a remote call to the authentication service fails,
+  *   or if no token is returned
+  * @throws InterruptedException If executing as the given user is interrupted
+  */
+ public static void obtainTokenForJob(final Connection conn, final JobConf job, User user)
+     throws IOException, InterruptedException {
+   try {
+     final Token<AuthenticationTokenIdentifier> authToken = obtainToken(conn, user);
+     if (authToken == null) {
+       throw new IOException("No token returned for user " + user.getName());
+     }
+
+     final Text alias = getClusterId(authToken);
+     if (LOG.isDebugEnabled()) {
+       String kind = authToken.getKind().toString();
+       LOG.debug("Obtained token " + kind + " for user " +
+           user.getName() + " on cluster " + alias.toString());
+     }
+     job.getCredentials().addToken(alias, authToken);
+   } catch (RuntimeException unchecked) {
+     // Unchecked and declared exceptions propagate untouched.
+     throw unchecked;
+   } catch (IOException io) {
+     throw io;
+   } catch (InterruptedException interrupted) {
+     throw interrupted;
+   } catch (Exception other) {
+     // Anything else is an undeclared checked exception; wrap it.
+     throw new UndeclaredThrowableException(other,
+         "Unexpected exception obtaining token for user "+user.getName());
+   }
+ }
+
+ /**
+  * Checks for an authentication token for the given user, obtaining a new token if necessary,
+  * and adds it to the credentials for the given map reduce job. The token is stored under
+  * the service name reported by the token itself.
+  *
+  * @param conn The HBase cluster connection
+  * @param job The job configuration in which the token should be stored
+  * @param user The user for whom to obtain the token
+  * @throws IOException If making a remote call to the authentication service fails,
+  *   or if no token could be obtained for the user
+  * @throws InterruptedException If executing as the given user is interrupted
+  */
+ public static void addTokenForJob(final Connection conn, final JobConf job, User user)
+     throws IOException, InterruptedException {
+
+   Token<AuthenticationTokenIdentifier> token = getAuthToken(conn.getConfiguration(), user);
+   if (token == null) {
+     token = obtainToken(conn, user);
+   }
+   // Fail with a descriptive IOException, as the obtainTokenForJob methods do, rather
+   // than with a NullPointerException on the dereference below.
+   if (token == null) {
+     throw new IOException("No token returned for user " + user.getName());
+   }
+   job.getCredentials().addToken(token.getService(), token);
+ }
+
+ /**
+  * Checks for an authentication token for the given user, obtaining a new token if necessary,
+  * and adds it to the credentials for the given map reduce job. The token is stored under
+  * the service name reported by the token itself.
+  *
+  * @param conn The HBase cluster connection
+  * @param user The user for whom to obtain the token
+  * @param job The job instance in which the token should be stored
+  * @throws IOException If making a remote call to the authentication service fails,
+  *   or if no token could be obtained for the user
+  * @throws InterruptedException If executing as the given user is interrupted
+  */
+ public static void addTokenForJob(final Connection conn, User user, Job job)
+     throws IOException, InterruptedException {
+   Token<AuthenticationTokenIdentifier> token = getAuthToken(conn.getConfiguration(), user);
+   if (token == null) {
+     token = obtainToken(conn, user);
+   }
+   // Fail with a descriptive IOException, as the obtainTokenForJob methods do, rather
+   // than with a NullPointerException on the dereference below.
+   if (token == null) {
+     throw new IOException("No token returned for user " + user.getName());
+   }
+   job.getCredentials().addToken(token.getService(), token);
+ }
+
+ /**
+  * Checks if an authentication token exists for the connected cluster,
+  * obtaining one if needed and adding it to the user's credentials.
+  *
+  * @param conn The HBase cluster connection
+  * @param user The user for whom to obtain the token
+  * @throws IOException If making a remote call to the authentication service fails,
+  *   or if no token could be obtained for the user
+  * @throws InterruptedException If executing as the given user is interrupted
+  * @return true if the token was added, false if it already existed
+  */
+ public static boolean addTokenIfMissing(Connection conn, User user)
+     throws IOException, InterruptedException {
+   Token<AuthenticationTokenIdentifier> token = getAuthToken(conn.getConfiguration(), user);
+   if (token != null) {
+     // The user already holds a token for this cluster; nothing to add.
+     return false;
+   }
+   token = obtainToken(conn, user);
+   // Fail with a descriptive IOException, as the obtainTokenForJob methods do, rather
+   // than with a NullPointerException on the dereference below.
+   if (token == null) {
+     throw new IOException("No token returned for user " + user.getName());
+   }
+   user.getUGI().addToken(token.getService(), token);
+   return true;
+ }
+
+ /**
+ * Get the authentication token of the user for the cluster specified in the configuration
+ * @return null if the user does not have the token, otherwise the auth token for the cluster.
+ */
+ private static Token<AuthenticationTokenIdentifier> getAuthToken(Configuration conf, User user)
+ throws IOException, InterruptedException {
+ ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "TokenUtil-getAuthToken", null);
+ try {
+ String clusterId = ZKClusterId.readClusterIdZNode(zkw);
+ if (clusterId == null) {
+ throw new IOException("Failed to get cluster ID");
+ }
+ return new AuthenticationTokenSelector().selectToken(new Text(clusterId), user.getTokens());
+ } catch (KeeperException e) {
+ throw new IOException(e);
+ } finally {
+ zkw.close();
+ }
+ }
+}