Posted to commits@hive.apache.org by ha...@apache.org on 2014/11/27 02:07:35 UTC
svn commit: r1641980 [3/4] - in /hive/trunk: ./
beeline/src/test/org/apache/hive/beeline/
common/src/java/org/apache/hadoop/hive/common/
common/src/java/org/apache/hadoop/hive/conf/
hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/ hcatalog/...
Modified: hive/trunk/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/ZooKeeperTokenStore.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/ZooKeeperTokenStore.java?rev=1641980&r1=1641979&r2=1641980&view=diff
==============================================================================
--- hive/trunk/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/ZooKeeperTokenStore.java (original)
+++ hive/trunk/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/ZooKeeperTokenStore.java Thu Nov 27 01:07:32 2014
@@ -33,6 +33,7 @@ import org.apache.curator.framework.imps
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.hive.shims.Utils;
import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge.Server.ServerMode;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation;
@@ -134,7 +135,7 @@ public class ZooKeeperTokenStore impleme
default:
throw new AssertionError("Unexpected server mode " + serverMode);
}
- ShimLoader.getHadoopShims().setZookeeperClientKerberosJaasConfig(principal, keytab);
+ Utils.setZookeeperClientKerberosJaasConfig(principal, keytab);
}
private String getNonEmptyConfVar(Configuration conf, String param) throws IOException {
@@ -431,32 +432,32 @@ public class ZooKeeperTokenStore impleme
public void init(Object objectStore, ServerMode smode) {
this.serverMode = smode;
zkConnectString =
- conf.get(HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR, null);
+ conf.get(HadoopThriftAuthBridge.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR, null);
if (zkConnectString == null || zkConnectString.trim().isEmpty()) {
// try alternate config param
zkConnectString =
conf.get(
- HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR_ALTERNATE,
+ HadoopThriftAuthBridge.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR_ALTERNATE,
null);
if (zkConnectString == null || zkConnectString.trim().isEmpty()) {
throw new IllegalArgumentException("Zookeeper connect string has to be specified through "
- + "either " + HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR
+ + "either " + HadoopThriftAuthBridge.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR
+ " or "
- + HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR_ALTERNATE
+ + HadoopThriftAuthBridge.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR_ALTERNATE
+ WHEN_ZK_DSTORE_MSG);
}
}
connectTimeoutMillis =
conf.getInt(
- HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_TIMEOUTMILLIS,
+ HadoopThriftAuthBridge.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_TIMEOUTMILLIS,
CuratorFrameworkFactory.builder().getConnectionTimeoutMs());
- String aclStr = conf.get(HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_ACL, null);
+ String aclStr = conf.get(HadoopThriftAuthBridge.Server.DELEGATION_TOKEN_STORE_ZK_ACL, null);
if (StringUtils.isNotBlank(aclStr)) {
this.newNodeAcl = parseACLs(aclStr);
}
rootNode =
- conf.get(HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_ZNODE,
- HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_ZNODE_DEFAULT) + serverMode;
+ conf.get(HadoopThriftAuthBridge.Server.DELEGATION_TOKEN_STORE_ZK_ZNODE,
+ HadoopThriftAuthBridge.Server.DELEGATION_TOKEN_STORE_ZK_ZNODE_DEFAULT) + serverMode;
try {
// Install the JAAS Configuration for the runtime
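For illustration, a minimal sketch of the new call site, now that the JAAS setup is a plain static helper instead of a shim method (the principal and keytab values below are hypothetical):

    import java.io.IOException;
    import org.apache.hadoop.hive.shims.Utils;

    public class JaasSetupSketch {
      public static void main(String[] args) throws IOException {
        // Hypothetical values; the helper resolves the principal against
        // "0.0.0.0" via SecurityUtil.getServerPrincipal before use.
        String principal = "hive/_HOST@EXAMPLE.COM";
        String keytab = "/etc/security/keytabs/hive.keytab";

        // Replaces the old ShimLoader.getHadoopShims()
        //     .setZookeeperClientKerberosJaasConfig(principal, keytab) indirection.
        Utils.setZookeeperClientKerberosJaasConfig(principal, keytab);
      }
    }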
Modified: hive/trunk/shims/common/pom.xml
URL: http://svn.apache.org/viewvc/hive/trunk/shims/common/pom.xml?rev=1641980&r1=1641979&r2=1641980&view=diff
==============================================================================
--- hive/trunk/shims/common/pom.xml (original)
+++ hive/trunk/shims/common/pom.xml Thu Nov 27 01:07:32 2014
@@ -53,7 +53,7 @@
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
- <version>${hadoop-20.version}</version>
+ <version>${hadoop-20S.version}</version>
<optional>true</optional>
</dependency>
<dependency>
@@ -66,5 +66,16 @@
<artifactId>libthrift</artifactId>
<version>${libthrift.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
+ <version>${zookeeper.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-core</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
</dependencies>
</project>
Modified: hive/trunk/shims/common/src/main/java/org/apache/hadoop/fs/DefaultFileAccess.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/common/src/main/java/org/apache/hadoop/fs/DefaultFileAccess.java?rev=1641980&r1=1641979&r2=1641980&view=diff
==============================================================================
--- hive/trunk/shims/common/src/main/java/org/apache/hadoop/fs/DefaultFileAccess.java (original)
+++ hive/trunk/shims/common/src/main/java/org/apache/hadoop/fs/DefaultFileAccess.java Thu Nov 27 01:07:32 2014
@@ -35,6 +35,7 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.hive.shims.Utils;
import org.apache.hadoop.security.UserGroupInformation;
/**
@@ -51,9 +52,9 @@ public class DefaultFileAccess {
public static void checkFileAccess(FileSystem fs, FileStatus stat, FsAction action)
throws IOException, AccessControlException, LoginException {
// Get the user/groups for checking permissions based on the current UGI.
- UserGroupInformation currentUgi = ShimLoader.getHadoopShims().getUGIForConf(fs.getConf());
+ UserGroupInformation currentUgi = Utils.getUGIForConf(fs.getConf());
DefaultFileAccess.checkFileAccess(fs, stat, action,
- ShimLoader.getHadoopShims().getShortUserName(currentUgi),
+ currentUgi.getShortUserName(),
Arrays.asList(currentUgi.getGroupNames()));
}
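A minimal sketch of the same pattern for callers, assuming only the classes touched in this commit (the configuration here is a default, empty one):

    import java.io.IOException;
    import javax.security.auth.login.LoginException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.shims.Utils;
    import org.apache.hadoop.security.UserGroupInformation;

    public class UgiLookupSketch {
      public static void main(String[] args) throws IOException, LoginException {
        Configuration conf = new Configuration();
        // Utils.getUGIForConf honors HADOOP_USER_NAME as a proxy-user
        // override; otherwise it returns the current user.
        UserGroupInformation ugi = Utils.getUGIForConf(conf);
        // The short name now comes from the UGI itself instead of
        // HadoopShims.getShortUserName(ugi).
        System.out.println(ugi.getShortUserName());
      }
    }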
Modified: hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java?rev=1641980&r1=1641979&r2=1641980&view=diff
==============================================================================
--- hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java (original)
+++ hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java Thu Nov 27 01:07:32 2014
@@ -23,19 +23,13 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.net.URI;
-import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.security.AccessControlException;
-import java.security.PrivilegedExceptionAction;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
-import javax.security.auth.login.LoginException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
@@ -54,6 +48,7 @@ import org.apache.hadoop.mapred.JobProfi
import org.apache.hadoop.mapred.JobStatus;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.lib.CombineFileSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobID;
@@ -74,8 +69,6 @@ import org.apache.hadoop.util.Progressab
*/
public interface HadoopShims {
- static final Log LOG = LogFactory.getLog(HadoopShims.class);
-
/**
* Constructs and Returns TaskAttempt Log Url
* or null if the TaskLogServlet is not available
@@ -125,148 +118,6 @@ public interface HadoopShims {
CombineFileInputFormatShim getCombineFileInputFormat();
- String getInputFormatClassName();
-
- int createHadoopArchive(Configuration conf, Path parentDir, Path destDir,
- String archiveName) throws Exception;
-
- public URI getHarUri(URI original, URI base, URI originalBase)
- throws URISyntaxException;
- /**
- * Hive uses side effect files exclusively for its output. It also manages
- * the setup/cleanup/commit of output from the hive client. As a result it does
- * not need support for the same inside the MR framework.
- *
- * This routine sets the appropriate options related to bypass setup/cleanup/commit
- * support in the MR framework, but does not set the OutputFormat class.
- */
- void prepareJobOutput(JobConf conf);
-
- /**
- * Used by TaskLogProcessor to remove HTML quoting from a string
- * @param item the string to unquote
- * @return the unquoted string
- *
- */
- public String unquoteHtmlChars(String item);
-
-
-
- public void closeAllForUGI(UserGroupInformation ugi);
-
- /**
- * Get the UGI that the given job configuration will run as.
- *
- * In secure versions of Hadoop, this simply returns the current
- * access control context's user, ignoring the configuration.
- */
- public UserGroupInformation getUGIForConf(Configuration conf) throws LoginException, IOException;
-
- /**
- * Used by metastore server to perform requested rpc in client context.
- * @param <T>
- * @param ugi
- * @param pvea
- * @throws IOException
- * @throws InterruptedException
- */
- public <T> T doAs(UserGroupInformation ugi, PrivilegedExceptionAction<T> pvea) throws
- IOException, InterruptedException;
-
- /**
- * Once a delegation token is stored in a file, the location is specified
- * for a child process that runs hadoop operations, using an environment
- * variable.
- * @return the name of the environment variable used by hadoop to find
- * the location of the token file
- */
- public String getTokenFileLocEnvName();
-
-
- /**
- * Get delegation token from filesystem and write the token along with
- * metastore tokens into a file
- * @param conf
- * @return Path of the file with token credential
- * @throws IOException
- */
- public Path createDelegationTokenFile(final Configuration conf) throws IOException;
-
-
- /**
- * Used to create a UGI object for a remote user.
- * @param userName remote User Name
- * @param groupNames group names associated with remote user name
- * @return UGI created for the remote user.
- */
- public UserGroupInformation createRemoteUser(String userName, List<String> groupNames);
-
- /**
- * Get the short name corresponding to the subject in the passed UGI
- *
- * In secure versions of Hadoop, this returns the short name (after
- * undergoing the translation in the kerberos name rule mapping).
- * In insecure versions of Hadoop, this returns the name of the subject
- */
- public String getShortUserName(UserGroupInformation ugi);
-
- /**
- * Return true if the Shim is based on Hadoop Security APIs.
- */
- public boolean isSecureShimImpl();
-
- /**
- * Return true if the hadoop configuration has security enabled
- * @return
- */
- public boolean isSecurityEnabled();
-
- /**
- * Get the string form of the token given a token signature.
- * The signature is used as the value of the "service" field in the token for lookup.
- * Ref: AbstractDelegationTokenSelector in Hadoop. If there exists such a token
- * in the token cache (credential store) of the job, the lookup returns that.
- * This is relevant only when running against a "secure" hadoop release
- * The method gets hold of the tokens if they are set up by hadoop - this should
- * happen on the map/reduce tasks if the client added the tokens into hadoop's
- * credential store in the front end during job submission. The method will
- * select the hive delegation token among the set of tokens and return the string
- * form of it
- * @param tokenSignature
- * @return the string form of the token found
- * @throws IOException
- */
- public String getTokenStrForm(String tokenSignature) throws IOException;
-
- /**
- * Dynamically sets up the JAAS configuration that uses kerberos
- * @param principal
- * @param keyTabFile
- * @throws IOException
- */
- public void setZookeeperClientKerberosJaasConfig(String principal, String keyTabFile)
- throws IOException;
-
- /**
- * Add a delegation token to the given ugi
- * @param ugi
- * @param tokenStr
- * @param tokenService
- * @throws IOException
- */
- public void setTokenStr(UserGroupInformation ugi, String tokenStr, String tokenService)
- throws IOException;
-
- /**
- * Add given service to the string format token
- * @param tokenStr
- * @param tokenService
- * @return
- * @throws IOException
- */
- public String addServiceToToken(String tokenStr, String tokenService)
- throws IOException;
-
enum JobTrackerState { INITIALIZING, RUNNING };
/**
@@ -315,43 +166,6 @@ public interface HadoopShims {
*/
public String getJobLauncherHttpAddress(Configuration conf);
-
- /**
- * Perform kerberos login using the given principal and keytab
- * @throws IOException
- */
- public void loginUserFromKeytab(String principal, String keytabFile) throws IOException;
-
- /**
- * Perform kerberos login using the given principal and keytab,
- * and return the UGI object
- * @throws IOException
- */
- public UserGroupInformation loginUserFromKeytabAndReturnUGI(String principal,
- String keytabFile) throws IOException;
-
- /**
- * Convert Kerberos principal name pattern to valid Kerberos principal names.
- * @param principal (principal name pattern)
- * @return
- * @throws IOException
- */
- public String getResolvedPrincipal(String principal) throws IOException;
-
- /**
- * Perform kerberos re-login using the given principal and keytab, to renew
- * the credentials
- * @throws IOException
- */
- public void reLoginUserFromKeytab() throws IOException;
-
- /***
- * Check if the current UGI is keytab based
- * @return
- * @throws IOException
- */
- public boolean isLoginKeytabBased() throws IOException;
-
/**
* Move the directory/file to trash. In case of the symlinks or mount points, the file is
* moved to the trashbin in the actual volume of the path p being deleted
@@ -392,20 +206,6 @@ public interface HadoopShims {
throws IOException;
/**
- * Create the proxy ugi for the given userid
- * @param userName
- * @return
- */
- public UserGroupInformation createProxyUser(String userName) throws IOException;
-
- /**
- * Verify proxy access to given UGI for given user
- * @param ugi
- */
- public void authorizeProxyAccess(String proxyUser, UserGroupInformation realUserUgi,
- String ipAddress, Configuration conf) throws IOException;
-
- /**
* The method to set the partition file has a different signature between
* hadoop versions.
* @param jobConf
@@ -416,53 +216,6 @@ public interface HadoopShims {
Comparator<LongWritable> getLongComparator();
/**
- * InputSplitShim.
- *
- */
- public interface InputSplitShim extends InputSplit {
- JobConf getJob();
-
- @Override
- long getLength();
-
- /** Returns an array containing the startoffsets of the files in the split. */
- long[] getStartOffsets();
-
- /** Returns an array containing the lengths of the files in the split. */
- long[] getLengths();
-
- /** Returns the start offset of the i<sup>th</sup> Path. */
- long getOffset(int i);
-
- /** Returns the length of the i<sup>th</sup> Path. */
- long getLength(int i);
-
- /** Returns the number of Paths in the split. */
- int getNumPaths();
-
- /** Returns the i<sup>th</sup> Path. */
- Path getPath(int i);
-
- /** Returns all the Paths in the split. */
- Path[] getPaths();
-
- /** Returns all the Paths where this input-split resides. */
- @Override
- String[] getLocations() throws IOException;
-
- void shrinkSplit(long length);
-
- @Override
- String toString();
-
- @Override
- void readFields(DataInput in) throws IOException;
-
- @Override
- void write(DataOutput out) throws IOException;
- }
-
- /**
* CombineFileInputFormatShim.
*
* @param <K>
@@ -473,11 +226,11 @@ public interface HadoopShims {
void createPool(JobConf conf, PathFilter... filters);
- InputSplitShim[] getSplits(JobConf job, int numSplits) throws IOException;
+ CombineFileSplit[] getSplits(JobConf job, int numSplits) throws IOException;
- InputSplitShim getInputSplitShim() throws IOException;
+ CombineFileSplit getInputSplitShim() throws IOException;
- RecordReader getRecordReader(JobConf job, InputSplitShim split, Reporter reporter,
+ RecordReader getRecordReader(JobConf job, CombineFileSplit split, Reporter reporter,
Class<RecordReader<K, V>> rrClass) throws IOException;
}
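Since getSplits and getRecordReader now traffic in the stock org.apache.hadoop.mapred.lib.CombineFileSplit, a caller sketch might look like the following (the input path is hypothetical and must exist for getSplits to succeed):

    import java.io.IOException;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.shims.HadoopShims;
    import org.apache.hadoop.hive.shims.ShimLoader;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.lib.CombineFileSplit;

    public class CombineSplitSketch {
      public static void main(String[] args) throws IOException {
        JobConf job = new JobConf();
        FileInputFormat.setInputPaths(job, new Path("/tmp/hive-input")); // hypothetical
        HadoopShims.CombineFileInputFormatShim shim =
            ShimLoader.getHadoopShims().getCombineFileInputFormat();
        // Stock CombineFileSplit[] replaces the removed InputSplitShim[].
        CombineFileSplit[] splits = shim.getSplits(job, 1);
        for (CombineFileSplit split : splits) {
          System.out.println(split.getNumPaths() + " paths, "
              + split.getLength() + " bytes");
        }
      }
    }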
Modified: hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/shims/ShimLoader.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/shims/ShimLoader.java?rev=1641980&r1=1641979&r2=1641980&view=diff
==============================================================================
--- hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/shims/ShimLoader.java (original)
+++ hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/shims/ShimLoader.java Thu Nov 27 01:07:32 2014
@@ -20,7 +20,6 @@ package org.apache.hadoop.hive.shims;
import java.util.HashMap;
import java.util.Map;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge;
import org.apache.hadoop.util.VersionInfo;
import org.apache.log4j.AppenderSkeleton;
@@ -43,7 +42,6 @@ public abstract class ShimLoader {
new HashMap<String, String>();
static {
- HADOOP_SHIM_CLASSES.put("0.20", "org.apache.hadoop.hive.shims.Hadoop20Shims");
HADOOP_SHIM_CLASSES.put("0.20S", "org.apache.hadoop.hive.shims.Hadoop20SShims");
HADOOP_SHIM_CLASSES.put("0.23", "org.apache.hadoop.hive.shims.Hadoop23Shims");
}
@@ -56,7 +54,6 @@ public abstract class ShimLoader {
new HashMap<String, String>();
static {
- JETTY_SHIM_CLASSES.put("0.20", "org.apache.hadoop.hive.shims.Jetty20Shims");
JETTY_SHIM_CLASSES.put("0.20S", "org.apache.hadoop.hive.shims.Jetty20SShims");
JETTY_SHIM_CLASSES.put("0.23", "org.apache.hadoop.hive.shims.Jetty23Shims");
}
@@ -68,7 +65,6 @@ public abstract class ShimLoader {
new HashMap<String, String>();
static {
- EVENT_COUNTER_SHIM_CLASSES.put("0.20", "org.apache.hadoop.metrics.jvm.EventCounter");
EVENT_COUNTER_SHIM_CLASSES.put("0.20S", "org.apache.hadoop.log.metrics.EventCounter");
EVENT_COUNTER_SHIM_CLASSES.put("0.23", "org.apache.hadoop.log.metrics.EventCounter");
}
@@ -80,10 +76,8 @@ public abstract class ShimLoader {
new HashMap<String, String>();
static {
- HADOOP_THRIFT_AUTH_BRIDGE_CLASSES.put("0.20",
- "org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge");
HADOOP_THRIFT_AUTH_BRIDGE_CLASSES.put("0.20S",
- "org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge20S");
+ "org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge");
HADOOP_THRIFT_AUTH_BRIDGE_CLASSES.put("0.23",
"org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge23");
}
@@ -153,9 +147,7 @@ public abstract class ShimLoader {
/**
* Return the "major" version of Hadoop currently on the classpath.
- * For releases in the 0.x series this is simply the first two
- * components of the version, e.g. "0.20" or "0.23". Releases in
- * the 1.x and 2.x series are mapped to the appropriate
+ * Releases in the 1.x and 2.x series are mapped to the appropriate
* 0.x release series, e.g. 1.x is mapped to "0.20S" and 2.x
* is mapped to "0.23".
*/
@@ -168,10 +160,7 @@ public abstract class ShimLoader {
" (expected A.B.* format)");
}
- // Special handling for Hadoop 1.x and 2.x
switch (Integer.parseInt(parts[0])) {
- case 0:
- break;
case 1:
return "0.20S";
case 2:
@@ -179,19 +168,6 @@ public abstract class ShimLoader {
default:
throw new IllegalArgumentException("Unrecognized Hadoop major version number: " + vers);
}
-
- String majorVersion = parts[0] + "." + parts[1];
-
- // If we are running a security release, we won't have UnixUserGroupInformation
- // (removed by HADOOP-6299 when switching to JAAS for Login)
- try {
- Class.forName("org.apache.hadoop.security.UnixUserGroupInformation");
- } catch (ClassNotFoundException cnf) {
- if ("0.20".equals(majorVersion)) {
- majorVersion += "S";
- }
- }
- return majorVersion;
}
private ShimLoader() {
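The resulting version mapping is simple enough to restate; this standalone sketch mirrors the surviving switch (it is not the Hive class itself):

    public class MajorVersionSketch {
      // 1.x -> "0.20S", 2.x -> "0.23"; the 0.x branch and the
      // UnixUserGroupInformation classpath probe are gone.
      static String getMajorVersion(String vers) {
        String[] parts = vers.split("\\.");
        if (parts.length < 2) {
          throw new RuntimeException("Illegal Hadoop Version: " + vers
              + " (expected A.B.* format)");
        }
        switch (Integer.parseInt(parts[0])) {
        case 1:
          return "0.20S";
        case 2:
          return "0.23";
        default:
          throw new IllegalArgumentException(
              "Unrecognized Hadoop major version number: " + vers);
        }
      }

      public static void main(String[] args) {
        System.out.println(getMajorVersion("1.2.1")); // 0.20S
        System.out.println(getMajorVersion("2.5.0")); // 0.23
      }
    }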
Added: hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/shims/Utils.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/shims/Utils.java?rev=1641980&view=auto
==============================================================================
--- hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/shims/Utils.java (added)
+++ hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/shims/Utils.java Thu Nov 27 01:07:32 2014
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.shims;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.auth.login.LoginException;
+import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.thrift.DelegationTokenIdentifier;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.util.KerberosUtil;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.security.token.TokenSelector;
+import org.apache.hadoop.security.token.delegation.DelegationTokenSelector;
+import org.apache.zookeeper.client.ZooKeeperSaslClient;
+
+public class Utils {
+
+ public static UserGroupInformation getUGIForConf(Configuration conf) throws LoginException, IOException {
+ String doAs = System.getenv("HADOOP_USER_NAME");
+ if(doAs != null && doAs.length() > 0) {
+ /*
+ * this allows doAs (proxy user) to be passed along across process boundary where
+ * delegation tokens are not supported. For example, a DDL stmt via WebHCat with
+ * a doAs parameter, forks to 'hcat' which needs to start a Session that
+ * proxies the end user
+ */
+ return UserGroupInformation.createProxyUser(doAs, UserGroupInformation.getLoginUser());
+ }
+ return UserGroupInformation.getCurrentUser();
+ }
+
+ /**
+ * Get the string form of the token given a token signature.
+ * The signature is used as the value of the "service" field in the token for lookup.
+ * Ref: AbstractDelegationTokenSelector in Hadoop. If there exists such a token
+ * in the token cache (credential store) of the job, the lookup returns that.
+ * This is relevant only when running against a "secure" hadoop release
+ * The method gets hold of the tokens if they are set up by hadoop - this should
+ * happen on the map/reduce tasks if the client added the tokens into hadoop's
+ * credential store in the front end during job submission. The method will
+ * select the hive delegation token among the set of tokens and return the string
+ * form of it
+ * @param tokenSignature
+ * @return the string form of the token found
+ * @throws IOException
+ */
+ public static String getTokenStrForm(String tokenSignature) throws IOException {
+ UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+ TokenSelector<? extends TokenIdentifier> tokenSelector = new DelegationTokenSelector();
+
+ Token<? extends TokenIdentifier> token = tokenSelector.selectToken(
+ tokenSignature == null ? new Text() : new Text(tokenSignature), ugi.getTokens());
+ return token != null ? token.encodeToUrlString() : null;
+ }
+
+ /**
+ * Create a delegation token object for the given token string and service.
+ * Add the token to given UGI
+ * @param ugi
+ * @param tokenStr
+ * @param tokenService
+ * @throws IOException
+ */
+ public static void setTokenStr(UserGroupInformation ugi, String tokenStr, String tokenService)
+ throws IOException {
+ Token<DelegationTokenIdentifier> delegationToken = createToken(tokenStr, tokenService);
+ ugi.addToken(delegationToken);
+ }
+
+ /**
+ * Add a given service to delegation token string.
+ * @param tokenStr
+ * @param tokenService
+ * @return
+ * @throws IOException
+ */
+ public static String addServiceToToken(String tokenStr, String tokenService)
+ throws IOException {
+ Token<DelegationTokenIdentifier> delegationToken = createToken(tokenStr, tokenService);
+ return delegationToken.encodeToUrlString();
+ }
+
+ /**
+ * Create a new token using the given string and service
+ * @param tokenStr
+ * @param tokenService
+ * @return
+ * @throws IOException
+ */
+ private static Token<DelegationTokenIdentifier> createToken(String tokenStr, String tokenService)
+ throws IOException {
+ Token<DelegationTokenIdentifier> delegationToken = new Token<DelegationTokenIdentifier>();
+ delegationToken.decodeFromUrlString(tokenStr);
+ delegationToken.setService(new Text(tokenService));
+ return delegationToken;
+ }
+
+ /**
+ * Dynamically sets up the JAAS configuration that uses kerberos
+ * @param principal
+ * @param keyTabFile
+ * @throws IOException
+ */
+ public static void setZookeeperClientKerberosJaasConfig(String principal, String keyTabFile) throws IOException {
+ // ZooKeeper property name to pick the correct JAAS conf section
+ final String SASL_LOGIN_CONTEXT_NAME = "HiveZooKeeperClient";
+ System.setProperty(ZooKeeperSaslClient.LOGIN_CONTEXT_NAME_KEY, SASL_LOGIN_CONTEXT_NAME);
+
+ principal = SecurityUtil.getServerPrincipal(principal, "0.0.0.0");
+ JaasConfiguration jaasConf = new JaasConfiguration(SASL_LOGIN_CONTEXT_NAME, principal, keyTabFile);
+
+ // Install the Configuration in the runtime.
+ javax.security.auth.login.Configuration.setConfiguration(jaasConf);
+ }
+
+ /**
+ * A JAAS configuration for ZooKeeper clients intended to use for SASL
+ * Kerberos.
+ */
+ private static class JaasConfiguration extends javax.security.auth.login.Configuration {
+ // Current installed Configuration
+ private final javax.security.auth.login.Configuration baseConfig = javax.security.auth.login.Configuration
+ .getConfiguration();
+ private final String loginContextName;
+ private final String principal;
+ private final String keyTabFile;
+
+ public JaasConfiguration(String hiveLoginContextName, String principal, String keyTabFile) {
+ this.loginContextName = hiveLoginContextName;
+ this.principal = principal;
+ this.keyTabFile = keyTabFile;
+ }
+
+ @Override
+ public AppConfigurationEntry[] getAppConfigurationEntry(String appName) {
+ if (loginContextName.equals(appName)) {
+ Map<String, String> krbOptions = new HashMap<String, String>();
+ krbOptions.put("doNotPrompt", "true");
+ krbOptions.put("storeKey", "true");
+ krbOptions.put("useKeyTab", "true");
+ krbOptions.put("principal", principal);
+ krbOptions.put("keyTab", keyTabFile);
+ krbOptions.put("refreshKrb5Config", "true");
+ AppConfigurationEntry hiveZooKeeperClientEntry = new AppConfigurationEntry(
+ KerberosUtil.getKrb5LoginModuleName(), LoginModuleControlFlag.REQUIRED, krbOptions);
+ return new AppConfigurationEntry[] { hiveZooKeeperClientEntry };
+ }
+ // Try the base config
+ if (baseConfig != null) {
+ return baseConfig.getAppConfigurationEntry(appName);
+ }
+ return null;
+ }
+ }
+
+
+}
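A brief usage sketch for the token helpers above; the service names are hypothetical, and getTokenStrForm simply returns null when the current UGI holds no matching token:

    import java.io.IOException;
    import org.apache.hadoop.hive.shims.Utils;
    import org.apache.hadoop.security.UserGroupInformation;

    public class TokenHelperSketch {
      public static void main(String[] args) throws IOException {
        // Select the hive delegation token with this signature from the
        // current UGI's credential store; null if none was set up.
        String tokenStr = Utils.getTokenStrForm("hiveserver2ClientToken");
        if (tokenStr != null) {
          // Re-tag the token with a second service name and attach it
          // back to the current UGI.
          UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
          Utils.setTokenStr(ugi, tokenStr, "hs2-backup-service");
        }
      }
    }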
Added: hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/thrift/DelegationTokenIdentifier.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/thrift/DelegationTokenIdentifier.java?rev=1641980&view=auto
==============================================================================
--- hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/thrift/DelegationTokenIdentifier.java (added)
+++ hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/thrift/DelegationTokenIdentifier.java Thu Nov 27 01:07:32 2014
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.thrift;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
+
+/**
+ * A delegation token identifier that is specific to Hive.
+ */
+public class DelegationTokenIdentifier
+ extends AbstractDelegationTokenIdentifier {
+ public static final Text HIVE_DELEGATION_KIND = new Text("HIVE_DELEGATION_TOKEN");
+
+ /**
+ * Create an empty delegation token identifier for reading into.
+ */
+ public DelegationTokenIdentifier() {
+ }
+
+ /**
+ * Create a new delegation token identifier
+ * @param owner the effective username of the token owner
+ * @param renewer the username of the renewer
+ * @param realUser the real username of the token owner
+ */
+ public DelegationTokenIdentifier(Text owner, Text renewer, Text realUser) {
+ super(owner, renewer, realUser);
+ }
+
+ @Override
+ public Text getKind() {
+ return HIVE_DELEGATION_KIND;
+ }
+
+}
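A small sketch of the identifier's contract (the names are illustrative): every Hive identifier reports the fixed HIVE_DELEGATION_KIND, which is what Hadoop's token selectors match on.

    import org.apache.hadoop.hive.thrift.DelegationTokenIdentifier;
    import org.apache.hadoop.io.Text;

    public class IdentifierSketch {
      public static void main(String[] args) {
        DelegationTokenIdentifier id = new DelegationTokenIdentifier(
            new Text("alice"), new Text("oozie"), new Text("alice"));
        System.out.println(id.getKind()); // HIVE_DELEGATION_TOKEN
      }
    }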
Added: hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/thrift/DelegationTokenSecretManager.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/thrift/DelegationTokenSecretManager.java?rev=1641980&view=auto
==============================================================================
--- hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/thrift/DelegationTokenSecretManager.java (added)
+++ hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/thrift/DelegationTokenSecretManager.java Thu Nov 27 01:07:32 2014
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.thrift;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
+
+/**
+ * A Hive specific delegation token secret manager.
+ * The secret manager is responsible for generating and accepting the password
+ * for each token.
+ */
+public class DelegationTokenSecretManager
+ extends AbstractDelegationTokenSecretManager<DelegationTokenIdentifier> {
+
+ /**
+ * Create a secret manager
+ * @param delegationKeyUpdateInterval the number of seconds for rolling new
+ * secret keys.
+ * @param delegationTokenMaxLifetime the maximum lifetime of the delegation
+ * tokens
+ * @param delegationTokenRenewInterval how often the tokens must be renewed
+ * @param delegationTokenRemoverScanInterval how often the tokens are scanned
+ * for expired tokens
+ */
+ public DelegationTokenSecretManager(long delegationKeyUpdateInterval,
+ long delegationTokenMaxLifetime,
+ long delegationTokenRenewInterval,
+ long delegationTokenRemoverScanInterval) {
+ super(delegationKeyUpdateInterval, delegationTokenMaxLifetime,
+ delegationTokenRenewInterval, delegationTokenRemoverScanInterval);
+ }
+
+ @Override
+ public DelegationTokenIdentifier createIdentifier() {
+ return new DelegationTokenIdentifier();
+ }
+
+ public synchronized void cancelDelegationToken(String tokenStrForm) throws IOException {
+ Token<DelegationTokenIdentifier> t= new Token<DelegationTokenIdentifier>();
+ t.decodeFromUrlString(tokenStrForm);
+ String user = UserGroupInformation.getCurrentUser().getUserName();
+ cancelToken(t, user);
+ }
+
+ public synchronized long renewDelegationToken(String tokenStrForm) throws IOException {
+ Token<DelegationTokenIdentifier> t= new Token<DelegationTokenIdentifier>();
+ t.decodeFromUrlString(tokenStrForm);
+ String user = UserGroupInformation.getCurrentUser().getUserName();
+ return renewToken(t, user);
+ }
+
+ public synchronized String getDelegationToken(String renewer) throws IOException {
+ UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+ Text owner = new Text(ugi.getUserName());
+ Text realUser = null;
+ if (ugi.getRealUser() != null) {
+ realUser = new Text(ugi.getRealUser().getUserName());
+ }
+ DelegationTokenIdentifier ident =
+ new DelegationTokenIdentifier(owner, new Text(renewer), realUser);
+ Token<DelegationTokenIdentifier> t = new Token<DelegationTokenIdentifier>(
+ ident, this);
+ return t.encodeToUrlString();
+ }
+
+ public String getUserFromToken(String tokenStr) throws IOException {
+ Token<DelegationTokenIdentifier> delegationToken = new Token<DelegationTokenIdentifier>();
+ delegationToken.decodeFromUrlString(tokenStr);
+
+ ByteArrayInputStream buf = new ByteArrayInputStream(delegationToken.getIdentifier());
+ DataInputStream in = new DataInputStream(buf);
+ DelegationTokenIdentifier id = createIdentifier();
+ id.readFields(in);
+ return id.getUser().getShortUserName();
+ }
+}
+
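A minimal local lifecycle sketch, assuming the documented defaults (in the server this manager is created and started by HadoopThriftAuthBridge.Server.startDelegationTokenSecretManager instead):

    import org.apache.hadoop.hive.thrift.DelegationTokenSecretManager;
    import org.apache.hadoop.security.UserGroupInformation;

    public class SecretManagerSketch {
      public static void main(String[] args) throws Exception {
        // Intervals in ms: 1 day key update, 7 day max lifetime,
        // 1 day renew interval, 1 hour expired-token scan.
        DelegationTokenSecretManager mgr = new DelegationTokenSecretManager(
            24 * 60 * 60 * 1000L, 7 * 24 * 60 * 60 * 1000L,
            24 * 60 * 60 * 1000L, 60 * 60 * 1000L);
        mgr.startThreads(); // must be running before tokens can be issued
        try {
          // Name the current user as renewer so renew/cancel succeed below.
          String renewer = UserGroupInformation.getCurrentUser().getUserName();
          String tokenStr = mgr.getDelegationToken(renewer);
          System.out.println("owner: " + mgr.getUserFromToken(tokenStr));
          mgr.renewDelegationToken(tokenStr);
          mgr.cancelDelegationToken(tokenStr);
        } finally {
          mgr.stopThreads();
        }
      }
    }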
Added: hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/thrift/DelegationTokenStore.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/thrift/DelegationTokenStore.java?rev=1641980&view=auto
==============================================================================
--- hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/thrift/DelegationTokenStore.java (added)
+++ hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/thrift/DelegationTokenStore.java Thu Nov 27 01:07:32 2014
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.thrift;
+
+import java.io.Closeable;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge.Server.ServerMode;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation;
+
+/**
+ * Interface for pluggable token store that can be implemented with shared external
+ * storage for load balancing and high availability (for example using ZooKeeper).
+ * Internal, store-specific errors are translated into {@link TokenStoreException}.
+ */
+public interface DelegationTokenStore extends Configurable, Closeable {
+
+ /**
+ * Exception for internal token store errors that typically cannot be handled by the caller.
+ */
+ public static class TokenStoreException extends RuntimeException {
+ private static final long serialVersionUID = -8693819817623074083L;
+
+ public TokenStoreException(Throwable cause) {
+ super(cause);
+ }
+
+ public TokenStoreException(String message, Throwable cause) {
+ super(message, cause);
+ }
+ }
+
+ /**
+ * Add new master key. The token store assigns and returns the sequence number.
+ * Caller needs to use the identifier to update the key (since it is embedded in the key).
+ *
+ * @param s
+ * @return sequence number for new key
+ */
+ int addMasterKey(String s) throws TokenStoreException;
+
+ /**
+ * Update master key (for expiration and setting store assigned sequence within key)
+ * @param keySeq
+ * @param s
+ * @throws TokenStoreException
+ */
+ void updateMasterKey(int keySeq, String s) throws TokenStoreException;
+
+ /**
+ * Remove key for given id.
+ * @param keySeq
+ * @return false if key no longer present, true otherwise.
+ */
+ boolean removeMasterKey(int keySeq);
+
+ /**
+ * Return all master keys.
+ * @return
+ * @throws TokenStoreException
+ */
+ String[] getMasterKeys() throws TokenStoreException;
+
+ /**
+ * Add token. If identifier is already present, token won't be added.
+ * @param tokenIdentifier
+ * @param token
+ * @return true if token was added, false for existing identifier
+ */
+ boolean addToken(DelegationTokenIdentifier tokenIdentifier,
+ DelegationTokenInformation token) throws TokenStoreException;
+
+ /**
+ * Get token. Returns null if the token does not exist.
+ * @param tokenIdentifier
+ * @return
+ */
+ DelegationTokenInformation getToken(DelegationTokenIdentifier tokenIdentifier)
+ throws TokenStoreException;
+
+ /**
+ * Remove token. Return value can be used by caller to detect concurrency.
+ * @param tokenIdentifier
+ * @return true if token was removed, false if it was already removed.
+ * @throws TokenStoreException
+ */
+ boolean removeToken(DelegationTokenIdentifier tokenIdentifier) throws TokenStoreException;
+
+ /**
+ * List of all token identifiers in the store. This is used to remove expired tokens;
+ * a potential scalability improvement would be to partition by master key id.
+ * @return
+ */
+ List<DelegationTokenIdentifier> getAllDelegationTokenIdentifiers() throws TokenStoreException;
+
+ /**
+ * @param hmsHandler ObjectStore used by DBTokenStore
+ * @param smode Indicate whether this is a metastore or hiveserver2 token store
+ */
+ void init(Object hmsHandler, ServerMode smode);
+
+}
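The default implementation referenced later in this commit is MemoryTokenStore; a simplified in-memory sketch of the same contract (not the actual Hive class) shows what implementors supply:

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;
    import java.util.concurrent.atomic.AtomicInteger;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.thrift.DelegationTokenIdentifier;
    import org.apache.hadoop.hive.thrift.DelegationTokenStore;
    import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge.Server.ServerMode;
    import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation;

    public class InMemoryTokenStoreSketch implements DelegationTokenStore {
      private final ConcurrentMap<Integer, String> masterKeys =
          new ConcurrentHashMap<Integer, String>();
      private final ConcurrentMap<DelegationTokenIdentifier, DelegationTokenInformation> tokens =
          new ConcurrentHashMap<DelegationTokenIdentifier, DelegationTokenInformation>();
      private final AtomicInteger nextKeySeq = new AtomicInteger();
      private Configuration conf;

      @Override
      public int addMasterKey(String s) {
        int seq = nextKeySeq.getAndIncrement(); // store assigns the sequence
        masterKeys.put(seq, s);
        return seq;
      }

      @Override
      public void updateMasterKey(int keySeq, String s) {
        masterKeys.put(keySeq, s);
      }

      @Override
      public boolean removeMasterKey(int keySeq) {
        return masterKeys.remove(keySeq) != null;
      }

      @Override
      public String[] getMasterKeys() {
        return masterKeys.values().toArray(new String[0]);
      }

      @Override
      public boolean addToken(DelegationTokenIdentifier id,
          DelegationTokenInformation token) {
        return tokens.putIfAbsent(id, token) == null; // false if already present
      }

      @Override
      public DelegationTokenInformation getToken(DelegationTokenIdentifier id) {
        return tokens.get(id);
      }

      @Override
      public boolean removeToken(DelegationTokenIdentifier id) {
        return tokens.remove(id) != null;
      }

      @Override
      public List<DelegationTokenIdentifier> getAllDelegationTokenIdentifiers() {
        return new ArrayList<DelegationTokenIdentifier>(tokens.keySet());
      }

      @Override
      public void init(Object hmsHandler, ServerMode smode) {
        // Nothing to initialize for a purely in-memory store.
      }

      @Override
      public void setConf(Configuration conf) {
        this.conf = conf;
      }

      @Override
      public Configuration getConf() {
        return conf;
      }

      @Override
      public void close() throws IOException {
        // No external resources to release.
      }
    }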
Modified: hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge.java?rev=1641980&r1=1641979&r2=1641980&view=diff
==============================================================================
--- hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge.java (original)
+++ hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge.java Thu Nov 27 01:07:32 2014
@@ -15,107 +15,726 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.hadoop.hive.thrift;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION;
+
import java.io.IOException;
import java.net.InetAddress;
+import java.net.Socket;
+import java.security.PrivilegedAction;
+import java.security.PrivilegedExceptionAction;
+import java.util.Locale;
import java.util.Map;
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.sasl.AuthorizeCallback;
+import javax.security.sasl.RealmCallback;
+import javax.security.sasl.RealmChoiceCallback;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.hive.shims.Utils;
+import org.apache.hadoop.hive.thrift.client.TUGIAssumingTransport;
+import org.apache.hadoop.security.SaslRpcServer;
+import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
+import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
+import org.apache.hadoop.security.authorize.AuthorizationException;
+import org.apache.hadoop.security.authorize.ProxyUsers;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.thrift.TException;
import org.apache.thrift.TProcessor;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TSaslClientTransport;
+import org.apache.thrift.transport.TSaslServerTransport;
+import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import org.apache.thrift.transport.TTransportFactory;
/**
- * This class is only overridden by the secure hadoop shim. It allows
- * the Thrift SASL support to bridge to Hadoop's UserGroupInformation
- * & DelegationToken infrastructure.
+ * Functions that bridge Thrift's SASL transports to Hadoop's
+ * SASL callback handlers and authentication classes.
*/
public class HadoopThriftAuthBridge {
+ static final Log LOG = LogFactory.getLog(HadoopThriftAuthBridge.class);
+
public Client createClient() {
- throw new UnsupportedOperationException(
- "The current version of Hadoop does not support Authentication");
+ return new Client();
}
- public Client createClientWithConf(String authType) {
- throw new UnsupportedOperationException(
- "The current version of Hadoop does not support Authentication");
+ public Client createClientWithConf(String authMethod) {
+ UserGroupInformation ugi;
+ try {
+ ugi = UserGroupInformation.getLoginUser();
+ } catch(IOException e) {
+ throw new IllegalStateException("Unable to get current login user: " + e, e);
+ }
+ if (loginUserHasCurrentAuthMethod(ugi, authMethod)) {
+ LOG.debug("Not setting UGI conf as passed-in authMethod of " + authMethod + " = current.");
+ return new Client();
+ } else {
+ LOG.debug("Setting UGI conf as passed-in authMethod of " + authMethod + " != current.");
+ Configuration conf = new Configuration();
+ conf.set(HADOOP_SECURITY_AUTHENTICATION, authMethod);
+ UserGroupInformation.setConfiguration(conf);
+ return new Client();
+ }
}
- public UserGroupInformation getCurrentUGIWithConf(String authType)
- throws IOException {
- throw new UnsupportedOperationException(
- "The current version of Hadoop does not support Authentication");
+ public Server createServer(String keytabFile, String principalConf) throws TTransportException {
+ return new Server(keytabFile, principalConf);
}
public String getServerPrincipal(String principalConfig, String host)
throws IOException {
- throw new UnsupportedOperationException(
- "The current version of Hadoop does not support Authentication");
+ String serverPrincipal = SecurityUtil.getServerPrincipal(principalConfig, host);
+ String names[] = SaslRpcServer.splitKerberosName(serverPrincipal);
+ if (names.length != 3) {
+ throw new IOException(
+ "Kerberos principal name does NOT have the expected hostname part: "
+ + serverPrincipal);
+ }
+ return serverPrincipal;
}
- public Server createServer(String keytabFile, String principalConf)
- throws TTransportException {
- throw new UnsupportedOperationException(
- "The current version of Hadoop does not support Authentication");
+
+ public UserGroupInformation getCurrentUGIWithConf(String authMethod)
+ throws IOException {
+ UserGroupInformation ugi;
+ try {
+ ugi = UserGroupInformation.getCurrentUser();
+ } catch(IOException e) {
+ throw new IllegalStateException("Unable to get current user: " + e, e);
+ }
+ if (loginUserHasCurrentAuthMethod(ugi, authMethod)) {
+ LOG.debug("Not setting UGI conf as passed-in authMethod of " + authMethod + " = current.");
+ return ugi;
+ } else {
+ LOG.debug("Setting UGI conf as passed-in authMethod of " + authMethod + " != current.");
+ Configuration conf = new Configuration();
+ conf.set(HADOOP_SECURITY_AUTHENTICATION, authMethod);
+ UserGroupInformation.setConfiguration(conf);
+ return UserGroupInformation.getCurrentUser();
+ }
+ }
+
+ /**
+ * Return true if the current login user is already using the given authMethod.
+ *
+ * Used above to ensure we do not create a new Configuration object and as such
+ * lose other settings such as the cluster to which the JVM is connected. Required
+ * for oozie, since it does not have a core-site.xml; see HIVE-7682.
+ */
+ private boolean loginUserHasCurrentAuthMethod(UserGroupInformation ugi, String sAuthMethod) {
+ AuthenticationMethod authMethod;
+ try {
+ // based on SecurityUtil.getAuthenticationMethod()
+ authMethod = Enum.valueOf(AuthenticationMethod.class, sAuthMethod.toUpperCase(Locale.ENGLISH));
+ } catch (IllegalArgumentException iae) {
+ throw new IllegalArgumentException("Invalid attribute value for " +
+ HADOOP_SECURITY_AUTHENTICATION + " of " + sAuthMethod, iae);
+ }
+ LOG.debug("Current authMethod = " + ugi.getAuthenticationMethod());
+ return ugi.getAuthenticationMethod().equals(authMethod);
}
/**
* Read and return Hadoop SASL configuration which can be configured using
* "hadoop.rpc.protection"
- *
* @param conf
* @return Hadoop SASL configuration
*/
+
public Map<String, String> getHadoopSaslProperties(Configuration conf) {
- throw new UnsupportedOperationException(
- "The current version of Hadoop does not support Authentication");
+ // Initialize the SaslRpcServer to ensure QOP parameters are read from conf
+ SaslRpcServer.init(conf);
+ return SaslRpcServer.SASL_PROPS;
}
- public static abstract class Client {
+ public static class Client {
/**
+ * Create a client-side SASL transport that wraps an underlying transport.
*
- * @param principalConfig In the case of Kerberos authentication this will
- * be the kerberos principal name, for DIGEST-MD5 (delegation token) based
- * authentication this will be null
- * @param host The metastore server host name
- * @param methodStr "KERBEROS" or "DIGEST"
- * @param tokenStrForm This is url encoded string form of
- * org.apache.hadoop.security.token.
- * @param underlyingTransport the underlying transport
- * @return the transport
- * @throws IOException
+ * @param methodStr The authentication method to use: KERBEROS or DIGEST
+ * (delegation token).
+ * @param principalConfig The Kerberos principal of the target server (unused for DIGEST).
+ * @param underlyingTransport The underlying transport mechanism, usually a TSocket.
+ * @param saslProps the sasl properties to create the client with
*/
- public abstract TTransport createClientTransport(
+
+
+ public TTransport createClientTransport(
String principalConfig, String host,
String methodStr, String tokenStrForm, TTransport underlyingTransport,
- Map<String, String> saslProps)
- throws IOException;
+ Map<String, String> saslProps) throws IOException {
+ AuthMethod method = AuthMethod.valueOf(AuthMethod.class, methodStr);
+
+ TTransport saslTransport = null;
+ switch (method) {
+ case DIGEST:
+ Token<DelegationTokenIdentifier> t= new Token<DelegationTokenIdentifier>();
+ t.decodeFromUrlString(tokenStrForm);
+ saslTransport = new TSaslClientTransport(
+ method.getMechanismName(),
+ null,
+ null, SaslRpcServer.SASL_DEFAULT_REALM,
+ saslProps, new SaslClientCallbackHandler(t),
+ underlyingTransport);
+ return new TUGIAssumingTransport(saslTransport, UserGroupInformation.getCurrentUser());
+
+ case KERBEROS:
+ String serverPrincipal = SecurityUtil.getServerPrincipal(principalConfig, host);
+ String names[] = SaslRpcServer.splitKerberosName(serverPrincipal);
+ if (names.length != 3) {
+ throw new IOException(
+ "Kerberos principal name does NOT have the expected hostname part: "
+ + serverPrincipal);
+ }
+ try {
+ saslTransport = new TSaslClientTransport(
+ method.getMechanismName(),
+ null,
+ names[0], names[1],
+ saslProps, null,
+ underlyingTransport);
+ return new TUGIAssumingTransport(saslTransport, UserGroupInformation.getCurrentUser());
+ } catch (SaslException se) {
+ throw new IOException("Could not instantiate SASL transport", se);
+ }
+
+ default:
+ throw new IOException("Unsupported authentication method: " + method);
+ }
+ }
+ private static class SaslClientCallbackHandler implements CallbackHandler {
+ private final String userName;
+ private final char[] userPassword;
+
+ public SaslClientCallbackHandler(Token<? extends TokenIdentifier> token) {
+ this.userName = encodeIdentifier(token.getIdentifier());
+ this.userPassword = encodePassword(token.getPassword());
+ }
+
+
+ @Override
+ public void handle(Callback[] callbacks)
+ throws UnsupportedCallbackException {
+ NameCallback nc = null;
+ PasswordCallback pc = null;
+ RealmCallback rc = null;
+ for (Callback callback : callbacks) {
+ if (callback instanceof RealmChoiceCallback) {
+ continue;
+ } else if (callback instanceof NameCallback) {
+ nc = (NameCallback) callback;
+ } else if (callback instanceof PasswordCallback) {
+ pc = (PasswordCallback) callback;
+ } else if (callback instanceof RealmCallback) {
+ rc = (RealmCallback) callback;
+ } else {
+ throw new UnsupportedCallbackException(callback,
+ "Unrecognized SASL client callback");
+ }
+ }
+ if (nc != null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("SASL client callback: setting username: " + userName);
+ }
+ nc.setName(userName);
+ }
+ if (pc != null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("SASL client callback: setting userPassword");
+ }
+ pc.setPassword(userPassword);
+ }
+ if (rc != null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("SASL client callback: setting realm: "
+ + rc.getDefaultText());
+ }
+ rc.setText(rc.getDefaultText());
+ }
+ }
+
+ static String encodeIdentifier(byte[] identifier) {
+ return new String(Base64.encodeBase64(identifier));
+ }
+
+ static char[] encodePassword(byte[] password) {
+ return new String(Base64.encodeBase64(password)).toCharArray();
+ }
+ }
}
- public static abstract class Server {
+ public static class Server {
public enum ServerMode {
HIVESERVER2, METASTORE
};
- public abstract TTransportFactory createTransportFactory(Map<String, String> saslProps) throws TTransportException;
- public abstract TProcessor wrapProcessor(TProcessor processor);
- public abstract TProcessor wrapNonAssumingProcessor(TProcessor processor);
- public abstract InetAddress getRemoteAddress();
- public abstract void startDelegationTokenSecretManager(Configuration conf,
- Object hmsHandler, ServerMode smode) throws IOException;
- public abstract String getDelegationToken(String owner, String renewer)
- throws IOException, InterruptedException;
- public abstract String getDelegationTokenWithService(String owner, String renewer, String service)
- throws IOException, InterruptedException;
- public abstract String getRemoteUser();
- public abstract long renewDelegationToken(String tokenStrForm) throws IOException;
- public abstract void cancelDelegationToken(String tokenStrForm) throws IOException;
- public abstract String getUserFromToken(String tokenStr) throws IOException;
+ final UserGroupInformation realUgi;
+ DelegationTokenSecretManager secretManager;
+ private final static long DELEGATION_TOKEN_GC_INTERVAL = 3600000; // 1 hour
+ //Delegation token related keys
+ public static final String DELEGATION_KEY_UPDATE_INTERVAL_KEY =
+ "hive.cluster.delegation.key.update-interval";
+ public static final long DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT =
+ 24*60*60*1000; // 1 day
+ public static final String DELEGATION_TOKEN_RENEW_INTERVAL_KEY =
+ "hive.cluster.delegation.token.renew-interval";
+ public static final long DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT =
+ 24*60*60*1000; // 1 day
+ public static final String DELEGATION_TOKEN_MAX_LIFETIME_KEY =
+ "hive.cluster.delegation.token.max-lifetime";
+ public static final long DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT =
+ 7*24*60*60*1000; // 7 days
+ public static final String DELEGATION_TOKEN_STORE_CLS =
+ "hive.cluster.delegation.token.store.class";
+ public static final String DELEGATION_TOKEN_STORE_ZK_CONNECT_STR =
+ "hive.cluster.delegation.token.store.zookeeper.connectString";
+ // alternate connect string specification configuration
+ public static final String DELEGATION_TOKEN_STORE_ZK_CONNECT_STR_ALTERNATE =
+ "hive.zookeeper.quorum";
+
+ public static final String DELEGATION_TOKEN_STORE_ZK_CONNECT_TIMEOUTMILLIS =
+ "hive.cluster.delegation.token.store.zookeeper.connectTimeoutMillis";
+ public static final String DELEGATION_TOKEN_STORE_ZK_ZNODE =
+ "hive.cluster.delegation.token.store.zookeeper.znode";
+ public static final String DELEGATION_TOKEN_STORE_ZK_ACL =
+ "hive.cluster.delegation.token.store.zookeeper.acl";
+ public static final String DELEGATION_TOKEN_STORE_ZK_ZNODE_DEFAULT =
+ "/hivedelegation";
+
+ public Server() throws TTransportException {
+ try {
+ realUgi = UserGroupInformation.getCurrentUser();
+ } catch (IOException ioe) {
+ throw new TTransportException(ioe);
+ }
+ }
+ /**
+ * Create a server with a kerberos keytab/principal.
+ */
+ protected Server(String keytabFile, String principalConf)
+ throws TTransportException {
+ if (keytabFile == null || keytabFile.isEmpty()) {
+ throw new TTransportException("No keytab specified");
+ }
+ if (principalConf == null || principalConf.isEmpty()) {
+ throw new TTransportException("No principal specified");
+ }
+
+ // Login from the keytab
+ String kerberosName;
+ try {
+ kerberosName =
+ SecurityUtil.getServerPrincipal(principalConf, "0.0.0.0");
+ UserGroupInformation.loginUserFromKeytab(
+ kerberosName, keytabFile);
+ realUgi = UserGroupInformation.getLoginUser();
+ assert realUgi.isFromKeytab();
+ } catch (IOException ioe) {
+ throw new TTransportException(ioe);
+ }
+ }
+
+ /**
+ * Create a TTransportFactory that, upon connection of a client socket,
+ * negotiates a Kerberized SASL transport. The resulting TTransportFactory
+ * can be passed as both the input and output transport factory when
+ * instantiating a TThreadPoolServer, for example.
+ *
+ * @param saslProps Map of SASL properties
+ */
+
+ public TTransportFactory createTransportFactory(Map<String, String> saslProps)
+ throws TTransportException {
+ // Parse out the kerberos principal, host, realm.
+ String kerberosName = realUgi.getUserName();
+ final String names[] = SaslRpcServer.splitKerberosName(kerberosName);
+ if (names.length != 3) {
+ throw new TTransportException("Kerberos principal should have 3 parts: " + kerberosName);
+ }
+
+ TSaslServerTransport.Factory transFactory = new TSaslServerTransport.Factory();
+ transFactory.addServerDefinition(
+ AuthMethod.KERBEROS.getMechanismName(),
+ names[0], names[1], // two parts of kerberos principal
+ saslProps,
+ new SaslRpcServer.SaslGssCallbackHandler());
+ transFactory.addServerDefinition(AuthMethod.DIGEST.getMechanismName(),
+ null, SaslRpcServer.SASL_DEFAULT_REALM,
+ saslProps, new SaslDigestCallbackHandler(secretManager));
+
+ return new TUGIAssumingTransportFactory(transFactory, realUgi);
+ }
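A minimal sketch of how this factory and the processor wrapper below fit together when standing up a Thrift server; keytabFile, principal, saslProps, baseProcessor, and port are placeholders, and the usual libthrift imports (TServerSocket, TThreadPoolServer) are assumed:

    HadoopThriftAuthBridge.Server saslServer =
        ShimLoader.getHadoopThriftAuthBridge().createServer(keytabFile, principal);
    TTransportFactory transFactory = saslServer.createTransportFactory(saslProps);
    TProcessor processor = saslServer.wrapProcessor(baseProcessor);
    // The same factory serves as both input and output transport factory.
    TServer server = new TThreadPoolServer(
        new TThreadPoolServer.Args(new TServerSocket(port))
            .transportFactory(transFactory)
            .processor(processor));
    server.serve();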
+
+ /**
+ * Wrap a TProcessor in such a way that, before processing any RPC, it
+ * assumes the UserGroupInformation of the user authenticated by
+ * the SASL transport.
+ */
+ public TProcessor wrapProcessor(TProcessor processor) {
+ return new TUGIAssumingProcessor(processor, secretManager, true);
+ }
+
+ /**
+ * Wrap a TProcessor to capture client information such as the connecting
+ * user id and IP address.
+ */
+ public TProcessor wrapNonAssumingProcessor(TProcessor processor) {
+ return new TUGIAssumingProcessor(processor, secretManager, false);
+ }
+
+ protected DelegationTokenStore getTokenStore(Configuration conf)
+ throws IOException {
+ String tokenStoreClassName = conf.get(DELEGATION_TOKEN_STORE_CLS, "");
+ if (StringUtils.isBlank(tokenStoreClassName)) {
+ return new MemoryTokenStore();
+ }
+ try {
+ Class<? extends DelegationTokenStore> storeClass = Class
+ .forName(tokenStoreClassName).asSubclass(
+ DelegationTokenStore.class);
+ return ReflectionUtils.newInstance(storeClass, conf);
+ } catch (ClassNotFoundException e) {
+ throw new IOException("Error initializing delegation token store: " + tokenStoreClassName,
+ e);
+ }
+ }
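For illustration, a deployment picks a different store through the key defined above; if the key is unset or blank, the MemoryTokenStore added later in this commit is used. A minimal sketch:

    Configuration conf = new Configuration();
    // Swap the in-memory default for the ZooKeeper-backed store.
    conf.set(DELEGATION_TOKEN_STORE_CLS,
        "org.apache.hadoop.hive.thrift.ZooKeeperTokenStore");
    conf.set(DELEGATION_TOKEN_STORE_ZK_CONNECT_STR, "zk1:2181,zk2:2181"); // example quorum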
+
+
+ public void startDelegationTokenSecretManager(Configuration conf, Object rawStore, ServerMode smode)
+ throws IOException {
+ long secretKeyInterval =
+ conf.getLong(DELEGATION_KEY_UPDATE_INTERVAL_KEY,
+ DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT);
+ long tokenMaxLifetime =
+ conf.getLong(DELEGATION_TOKEN_MAX_LIFETIME_KEY,
+ DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT);
+ long tokenRenewInterval =
+ conf.getLong(DELEGATION_TOKEN_RENEW_INTERVAL_KEY,
+ DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT);
+
+ DelegationTokenStore dts = getTokenStore(conf);
+ dts.init(rawStore, smode);
+ secretManager = new TokenStoreDelegationTokenSecretManager(secretKeyInterval,
+ tokenMaxLifetime,
+ tokenRenewInterval,
+ DELEGATION_TOKEN_GC_INTERVAL, dts);
+ secretManager.startThreads();
+ }
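A one-line sketch of the expected startup call; saslServer, conf, and rawStore are placeholders. Note the garbage-collection interval is not configurable here; it is pinned to the one-hour DELEGATION_TOKEN_GC_INTERVAL constant above:

    // Starts the secret manager's key-rolling and token-expiry threads.
    saslServer.startDelegationTokenSecretManager(conf, rawStore, ServerMode.METASTORE);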
+
+
+ public String getDelegationToken(final String owner, final String renewer)
+ throws IOException, InterruptedException {
+ if (!authenticationMethod.get().equals(AuthenticationMethod.KERBEROS)) {
+ throw new AuthorizationException(
+ "Delegation Token can be issued only with kerberos authentication. " +
+ "Current AuthenticationMethod: " + authenticationMethod.get()
+ );
+ }
+ // If the user asking for the token is the same as the 'owner', skip the
+ // proxy authorization checks. For cases like Oozie, which fetches a
+ // delegation token on behalf of another user, we need to make sure Oozie
+ // is authorized to get a delegation token.
+ // Do all checks on short names.
+ UserGroupInformation currUser = UserGroupInformation.getCurrentUser();
+ UserGroupInformation ownerUgi = UserGroupInformation.createRemoteUser(owner);
+ if (!ownerUgi.getShortUserName().equals(currUser.getShortUserName())) {
+ // In the case of proxy users, getCurrentUser() returns the real user
+ // (e.g. Oozie) because of the doAs that happened just before the server
+ // started executing getDelegationToken in the metastore.
+ ownerUgi = UserGroupInformation.createProxyUser(owner,
+ UserGroupInformation.getCurrentUser());
+ InetAddress remoteAddr = getRemoteAddress();
+ ProxyUsers.authorize(ownerUgi, remoteAddr.getHostAddress(), null);
+ }
+ return ownerUgi.doAs(new PrivilegedExceptionAction<String>() {
+
+ @Override
+ public String run() throws IOException {
+ return secretManager.getDelegationToken(renewer);
+ }
+ });
+ }
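The ProxyUsers.authorize call above is driven by Hadoop's standard proxy-user settings; a hedged sketch of granting an Oozie-style proxy (the host and group values are examples only):

    conf.set("hadoop.proxyuser.oozie.hosts", "oozie.example.com");
    conf.set("hadoop.proxyuser.oozie.groups", "users");
    // Reload the proxy-user ACLs so the grants above take effect.
    ProxyUsers.refreshSuperUserGroupsConfiguration(conf);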
+
+
+ public String getDelegationTokenWithService(String owner, String renewer, String service)
+ throws IOException, InterruptedException {
+ String token = getDelegationToken(owner, renewer);
+ return Utils.addServiceToToken(token, service);
+ }
+
+
+ public long renewDelegationToken(String tokenStrForm) throws IOException {
+ if (!authenticationMethod.get().equals(AuthenticationMethod.KERBEROS)) {
+ throw new AuthorizationException(
+ "Delegation Token can be issued only with kerberos authentication. " +
+ "Current AuthenticationMethod: " + authenticationMethod.get()
+ );
+ }
+ return secretManager.renewDelegationToken(tokenStrForm);
+ }
+
+
+ public String getUserFromToken(String tokenStr) throws IOException {
+ return secretManager.getUserFromToken(tokenStr);
+ }
+
+
+ public void cancelDelegationToken(String tokenStrForm) throws IOException {
+ secretManager.cancelDelegationToken(tokenStrForm);
+ }
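Taken together, the methods above cover the full token lifecycle; a minimal sketch, with saslServer and the principal names as placeholders:

    String tokenStr = saslServer.getDelegationToken("alice", "alice"); // owner == renewer
    long nextExpiry = saslServer.renewDelegationToken(tokenStr);       // extend validity
    saslServer.cancelDelegationToken(tokenStr);                        // revoke when done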
+
+ final static ThreadLocal<InetAddress> remoteAddress =
+ new ThreadLocal<InetAddress>() {
+
+ @Override
+ protected synchronized InetAddress initialValue() {
+ return null;
+ }
+ };
+
+
+ public InetAddress getRemoteAddress() {
+ return remoteAddress.get();
+ }
+
+ final static ThreadLocal<AuthenticationMethod> authenticationMethod =
+ new ThreadLocal<AuthenticationMethod>() {
+
+ @Override
+ protected synchronized AuthenticationMethod initialValue() {
+ return AuthenticationMethod.TOKEN;
+ }
+ };
+
+ private static ThreadLocal<String> remoteUser = new ThreadLocal<String> () {
+
+ @Override
+ protected synchronized String initialValue() {
+ return null;
+ }
+ };
+
+
+ public String getRemoteUser() {
+ return remoteUser.get();
+ }
+
+ /** CallbackHandler for SASL DIGEST-MD5 mechanism */
+ // This code is based almost entirely on Hadoop's
+ // SaslRpcServer.SaslDigestCallbackHandler. The only reason we could not
+ // use that Hadoop class as-is is that it requires a Server.Connection
+ // object, which is relevant in Hadoop RPC but not here in the metastore,
+ // so the code below does not deal with Server.Connection at all.
+ static class SaslDigestCallbackHandler implements CallbackHandler {
+ private final DelegationTokenSecretManager secretManager;
+
+ public SaslDigestCallbackHandler(
+ DelegationTokenSecretManager secretManager) {
+ this.secretManager = secretManager;
+ }
+
+ private char[] getPassword(DelegationTokenIdentifier tokenid) throws InvalidToken {
+ return encodePassword(secretManager.retrievePassword(tokenid));
+ }
+
+ private char[] encodePassword(byte[] password) {
+ return new String(Base64.encodeBase64(password)).toCharArray();
+ }
+ /** {@inheritDoc} */
+ @Override
+ public void handle(Callback[] callbacks) throws InvalidToken,
+ UnsupportedCallbackException {
+ NameCallback nc = null;
+ PasswordCallback pc = null;
+ AuthorizeCallback ac = null;
+ for (Callback callback : callbacks) {
+ if (callback instanceof AuthorizeCallback) {
+ ac = (AuthorizeCallback) callback;
+ } else if (callback instanceof NameCallback) {
+ nc = (NameCallback) callback;
+ } else if (callback instanceof PasswordCallback) {
+ pc = (PasswordCallback) callback;
+ } else if (callback instanceof RealmCallback) {
+ continue; // realm is ignored
+ } else {
+ throw new UnsupportedCallbackException(callback,
+ "Unrecognized SASL DIGEST-MD5 Callback");
+ }
+ }
+ if (pc != null) {
+ DelegationTokenIdentifier tokenIdentifier = SaslRpcServer.
+ getIdentifier(nc.getDefaultName(), secretManager);
+ char[] password = getPassword(tokenIdentifier);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("SASL server DIGEST-MD5 callback: setting password "
+ + "for client: " + tokenIdentifier.getUser());
+ }
+ pc.setPassword(password);
+ }
+ if (ac != null) {
+ String authid = ac.getAuthenticationID();
+ String authzid = ac.getAuthorizationID();
+ ac.setAuthorized(authid.equals(authzid));
+ if (ac.isAuthorized()) {
+ if (LOG.isDebugEnabled()) {
+ String username =
+ SaslRpcServer.getIdentifier(authzid, secretManager).getUser().getUserName();
+ LOG.debug("SASL server DIGEST-MD5 callback: setting "
+ + "canonicalized client ID: " + username);
+ }
+ ac.setAuthorizedID(authzid);
+ }
+ }
+ }
+ }
+
+ /**
+ * Processor that pulls the SaslServer object out of the transport, and
+ * assumes the remote user's UGI before calling through to the original
+ * processor.
+ *
+ * This is used on the server side to set the UGI for each specific call.
+ */
+ protected class TUGIAssumingProcessor implements TProcessor {
+ final TProcessor wrapped;
+ DelegationTokenSecretManager secretManager;
+ boolean useProxy;
+ TUGIAssumingProcessor(TProcessor wrapped, DelegationTokenSecretManager secretManager,
+ boolean useProxy) {
+ this.wrapped = wrapped;
+ this.secretManager = secretManager;
+ this.useProxy = useProxy;
+ }
+
+
+ @Override
+ public boolean process(final TProtocol inProt, final TProtocol outProt) throws TException {
+ TTransport trans = inProt.getTransport();
+ if (!(trans instanceof TSaslServerTransport)) {
+ throw new TException("Unexpected non-SASL transport " + trans.getClass());
+ }
+ TSaslServerTransport saslTrans = (TSaslServerTransport)trans;
+ SaslServer saslServer = saslTrans.getSaslServer();
+ String authId = saslServer.getAuthorizationID();
+ authenticationMethod.set(AuthenticationMethod.KERBEROS);
+ LOG.debug("AUTH ID ======>" + authId);
+ String endUser = authId;
+
+ if(saslServer.getMechanismName().equals("DIGEST-MD5")) {
+ try {
+ TokenIdentifier tokenId = SaslRpcServer.getIdentifier(authId,
+ secretManager);
+ endUser = tokenId.getUser().getUserName();
+ authenticationMethod.set(AuthenticationMethod.TOKEN);
+ } catch (InvalidToken e) {
+ throw new TException(e.getMessage());
+ }
+ }
+ Socket socket = ((TSocket)(saslTrans.getUnderlyingTransport())).getSocket();
+ remoteAddress.set(socket.getInetAddress());
+ UserGroupInformation clientUgi = null;
+ try {
+ if (useProxy) {
+ clientUgi = UserGroupInformation.createProxyUser(
+ endUser, UserGroupInformation.getLoginUser());
+ remoteUser.set(clientUgi.getShortUserName());
+ LOG.debug("Set remoteUser :" + remoteUser.get());
+ return clientUgi.doAs(new PrivilegedExceptionAction<Boolean>() {
+
+ @Override
+ public Boolean run() {
+ try {
+ return wrapped.process(inProt, outProt);
+ } catch (TException te) {
+ throw new RuntimeException(te);
+ }
+ }
+ });
+ } else {
+ // use the short user name for the request
+ UserGroupInformation endUserUgi = UserGroupInformation.createRemoteUser(endUser);
+ remoteUser.set(endUserUgi.getShortUserName());
+ LOG.debug("Set remoteUser :" + remoteUser.get() + ", from endUser :" + endUser);
+ return wrapped.process(inProt, outProt);
+ }
+ } catch (RuntimeException rte) {
+ if (rte.getCause() instanceof TException) {
+ throw (TException)rte.getCause();
+ }
+ throw rte;
+ } catch (InterruptedException ie) {
+ throw new RuntimeException(ie); // unexpected!
+ } catch (IOException ioe) {
+ throw new RuntimeException(ioe); // unexpected!
+ }
+ finally {
+ if (clientUgi != null) {
+ try { FileSystem.closeAllForUGI(clientUgi); }
+ catch(IOException exception) {
+ LOG.error("Could not clean up file-system handles for UGI: " + clientUgi, exception);
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * A TransportFactory that wraps another one, but assumes a specified UGI
+ * before calling through.
+ *
+ * This is used on the server side to assume the server's Principal when accepting
+ * clients.
+ */
+ static class TUGIAssumingTransportFactory extends TTransportFactory {
+ private final UserGroupInformation ugi;
+ private final TTransportFactory wrapped;
+
+ public TUGIAssumingTransportFactory(TTransportFactory wrapped, UserGroupInformation ugi) {
+ assert wrapped != null;
+ assert ugi != null;
+ this.wrapped = wrapped;
+ this.ugi = ugi;
+ }
+
+
+ @Override
+ public TTransport getTransport(final TTransport trans) {
+ return ugi.doAs(new PrivilegedAction<TTransport>() {
+ @Override
+ public TTransport run() {
+ return wrapped.getTransport(trans);
+ }
+ });
+ }
+ }
}
}
-
Added: hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/thrift/MemoryTokenStore.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/thrift/MemoryTokenStore.java?rev=1641980&view=auto
==============================================================================
--- hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/thrift/MemoryTokenStore.java (added)
+++ hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/thrift/MemoryTokenStore.java Thu Nov 27 01:07:32 2014
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.thrift;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge.Server.ServerMode;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Default in-memory token store implementation.
+ */
+public class MemoryTokenStore implements DelegationTokenStore {
+ private static final Logger LOG = LoggerFactory.getLogger(MemoryTokenStore.class);
+
+ private final Map<Integer, String> masterKeys
+ = new ConcurrentHashMap<Integer, String>();
+
+ private final ConcurrentHashMap<DelegationTokenIdentifier, DelegationTokenInformation> tokens
+ = new ConcurrentHashMap<DelegationTokenIdentifier, DelegationTokenInformation>();
+
+ private final AtomicInteger masterKeySeq = new AtomicInteger();
+ private Configuration conf;
+
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ }
+
+ @Override
+ public Configuration getConf() {
+ return this.conf;
+ }
+
+ @Override
+ public int addMasterKey(String s) {
+ int keySeq = masterKeySeq.getAndIncrement();
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("addMasterKey: s = " + s + ", keySeq = " + keySeq);
+ }
+ masterKeys.put(keySeq, s);
+ return keySeq;
+ }
+
+ @Override
+ public void updateMasterKey(int keySeq, String s) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("updateMasterKey: s = " + s + ", keySeq = " + keySeq);
+ }
+ masterKeys.put(keySeq, s);
+ }
+
+ @Override
+ public boolean removeMasterKey(int keySeq) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("removeMasterKey: keySeq = " + keySeq);
+ }
+ return masterKeys.remove(keySeq) != null;
+ }
+
+ @Override
+ public String[] getMasterKeys() {
+ return masterKeys.values().toArray(new String[0]);
+ }
+
+ @Override
+ public boolean addToken(DelegationTokenIdentifier tokenIdentifier,
+ DelegationTokenInformation token) {
+ DelegationTokenInformation tokenInfo = tokens.putIfAbsent(tokenIdentifier, token);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("addToken: tokenIdentifier = " + tokenIdentifier + ", addded = " + (tokenInfo == null));
+ }
+ return (tokenInfo == null);
+ }
+
+ @Override
+ public boolean removeToken(DelegationTokenIdentifier tokenIdentifier) {
+ DelegationTokenInformation tokenInfo = tokens.remove(tokenIdentifier);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("removeToken: tokenIdentifier = " + tokenIdentifier + ", removed = " + (tokenInfo != null));
+ }
+ return tokenInfo != null;
+ }
+
+ @Override
+ public DelegationTokenInformation getToken(DelegationTokenIdentifier tokenIdentifier) {
+ DelegationTokenInformation result = tokens.get(tokenIdentifier);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("getToken: tokenIdentifier = " + tokenIdentifier + ", result = " + result);
+ }
+ return result;
+ }
+
+ @Override
+ public List<DelegationTokenIdentifier> getAllDelegationTokenIdentifiers() {
+ return new ArrayList<DelegationTokenIdentifier>(tokens.keySet());
+ }
+
+ @Override
+ public void close() throws IOException {
+ //no-op
+ }
+
+ @Override
+ public void init(Object hmsHandler, ServerMode smode) throws TokenStoreException {
+ // no-op
+ }
+}
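A short usage sketch for the store above, exercising only methods it defines (the key-material strings are placeholders):

    MemoryTokenStore store = new MemoryTokenStore();
    store.setConf(new Configuration());
    store.init(null, ServerMode.METASTORE);           // no-op for this implementation
    int seq = store.addMasterKey("key-material");
    store.updateMasterKey(seq, "rotated-key-material");
    boolean removed = store.removeMasterKey(seq);     // true on first removal
    store.close();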