You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ha...@apache.org on 2014/11/27 02:07:35 UTC

svn commit: r1641980 [3/4] - in /hive/trunk: ./ beeline/src/test/org/apache/hive/beeline/ common/src/java/org/apache/hadoop/hive/common/ common/src/java/org/apache/hadoop/hive/conf/ hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/ hcatalog/...

Modified: hive/trunk/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/ZooKeeperTokenStore.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/ZooKeeperTokenStore.java?rev=1641980&r1=1641979&r2=1641980&view=diff
==============================================================================
--- hive/trunk/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/ZooKeeperTokenStore.java (original)
+++ hive/trunk/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/ZooKeeperTokenStore.java Thu Nov 27 01:07:32 2014
@@ -33,6 +33,7 @@ import org.apache.curator.framework.imps
 import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.hive.shims.Utils;
 import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge.Server.ServerMode;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation;
@@ -134,7 +135,7 @@ public class ZooKeeperTokenStore impleme
     default:
       throw new AssertionError("Unexpected server mode " + serverMode);
     }
-    ShimLoader.getHadoopShims().setZookeeperClientKerberosJaasConfig(principal, keytab);
+    Utils.setZookeeperClientKerberosJaasConfig(principal, keytab);
   }
 
   private String getNonEmptyConfVar(Configuration conf, String param) throws IOException {
@@ -431,32 +432,32 @@ public class ZooKeeperTokenStore impleme
   public void init(Object objectStore, ServerMode smode) {
     this.serverMode = smode;
     zkConnectString =
-        conf.get(HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR, null);
+        conf.get(HadoopThriftAuthBridge.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR, null);
     if (zkConnectString == null || zkConnectString.trim().isEmpty()) {
       // try alternate config param
       zkConnectString =
           conf.get(
-              HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR_ALTERNATE,
+              HadoopThriftAuthBridge.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR_ALTERNATE,
               null);
       if (zkConnectString == null || zkConnectString.trim().isEmpty()) {
         throw new IllegalArgumentException("Zookeeper connect string has to be specifed through "
-            + "either " + HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR
+            + "either " + HadoopThriftAuthBridge.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR
             + " or "
-            + HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR_ALTERNATE
+            + HadoopThriftAuthBridge.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR_ALTERNATE
             + WHEN_ZK_DSTORE_MSG);
       }
     }
     connectTimeoutMillis =
         conf.getInt(
-            HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_TIMEOUTMILLIS,
+            HadoopThriftAuthBridge.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_TIMEOUTMILLIS,
             CuratorFrameworkFactory.builder().getConnectionTimeoutMs());
-    String aclStr = conf.get(HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_ACL, null);
+    String aclStr = conf.get(HadoopThriftAuthBridge.Server.DELEGATION_TOKEN_STORE_ZK_ACL, null);
     if (StringUtils.isNotBlank(aclStr)) {
       this.newNodeAcl = parseACLs(aclStr);
     }
     rootNode =
-        conf.get(HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_ZNODE,
-            HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_ZNODE_DEFAULT) + serverMode;
+        conf.get(HadoopThriftAuthBridge.Server.DELEGATION_TOKEN_STORE_ZK_ZNODE,
+            HadoopThriftAuthBridge.Server.DELEGATION_TOKEN_STORE_ZK_ZNODE_DEFAULT) + serverMode;
 
     try {
       // Install the JAAS Configuration for the runtime

Modified: hive/trunk/shims/common/pom.xml
URL: http://svn.apache.org/viewvc/hive/trunk/shims/common/pom.xml?rev=1641980&r1=1641979&r2=1641980&view=diff
==============================================================================
--- hive/trunk/shims/common/pom.xml (original)
+++ hive/trunk/shims/common/pom.xml Thu Nov 27 01:07:32 2014
@@ -53,7 +53,7 @@
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-core</artifactId>
-      <version>${hadoop-20.version}</version>
+      <version>${hadoop-20S.version}</version>
       <optional>true</optional>
     </dependency>
     <dependency>
@@ -66,5 +66,16 @@
       <artifactId>libthrift</artifactId>
       <version>${libthrift.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.zookeeper</groupId>
+      <artifactId>zookeeper</artifactId>
+      <version>${zookeeper.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-core</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
   </dependencies>
 </project>

Modified: hive/trunk/shims/common/src/main/java/org/apache/hadoop/fs/DefaultFileAccess.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/common/src/main/java/org/apache/hadoop/fs/DefaultFileAccess.java?rev=1641980&r1=1641979&r2=1641980&view=diff
==============================================================================
--- hive/trunk/shims/common/src/main/java/org/apache/hadoop/fs/DefaultFileAccess.java (original)
+++ hive/trunk/shims/common/src/main/java/org/apache/hadoop/fs/DefaultFileAccess.java Thu Nov 27 01:07:32 2014
@@ -35,6 +35,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.hive.shims.Utils;
 import org.apache.hadoop.security.UserGroupInformation;
 
 /**
@@ -51,9 +52,9 @@ public class DefaultFileAccess {
   public static void checkFileAccess(FileSystem fs, FileStatus stat, FsAction action)
       throws IOException, AccessControlException, LoginException {
     // Get the user/groups for checking permissions based on the current UGI.
-    UserGroupInformation currentUgi = ShimLoader.getHadoopShims().getUGIForConf(fs.getConf());
+    UserGroupInformation currentUgi = Utils.getUGIForConf(fs.getConf());
     DefaultFileAccess.checkFileAccess(fs, stat, action,
-        ShimLoader.getHadoopShims().getShortUserName(currentUgi),
+        currentUgi.getShortUserName(),
         Arrays.asList(currentUgi.getGroupNames()));
   }
 

Modified: hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java?rev=1641980&r1=1641979&r2=1641980&view=diff
==============================================================================
--- hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java (original)
+++ hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java Thu Nov 27 01:07:32 2014
@@ -23,19 +23,13 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.MalformedURLException;
 import java.net.URI;
-import java.net.URISyntaxException;
 import java.nio.ByteBuffer;
 import java.security.AccessControlException;
-import java.security.PrivilegedExceptionAction;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 
-import javax.security.auth.login.LoginException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -54,6 +48,7 @@ import org.apache.hadoop.mapred.JobProfi
 import org.apache.hadoop.mapred.JobStatus;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.lib.CombineFileSplit;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobID;
@@ -74,8 +69,6 @@ import org.apache.hadoop.util.Progressab
  */
 public interface HadoopShims {
 
-  static final Log LOG = LogFactory.getLog(HadoopShims.class);
-
   /**
    * Constructs and Returns TaskAttempt Log Url
    * or null if the TaskLogServlet is not available
@@ -125,148 +118,6 @@ public interface HadoopShims {
 
   CombineFileInputFormatShim getCombineFileInputFormat();
 
-  String getInputFormatClassName();
-
-  int createHadoopArchive(Configuration conf, Path parentDir, Path destDir,
-      String archiveName) throws Exception;
-
-  public URI getHarUri(URI original, URI base, URI originalBase)
-      throws URISyntaxException;
-  /**
-   * Hive uses side effect files exclusively for it's output. It also manages
-   * the setup/cleanup/commit of output from the hive client. As a result it does
-   * not need support for the same inside the MR framework
-   *
-   * This routine sets the appropriate options related to bypass setup/cleanup/commit
-   * support in the MR framework, but does not set the OutputFormat class.
-   */
-  void prepareJobOutput(JobConf conf);
-
-  /**
-   * Used by TaskLogProcessor to Remove HTML quoting from a string
-   * @param item the string to unquote
-   * @return the unquoted string
-   *
-   */
-  public String unquoteHtmlChars(String item);
-
-
-
-  public void closeAllForUGI(UserGroupInformation ugi);
-
-  /**
-   * Get the UGI that the given job configuration will run as.
-   *
-   * In secure versions of Hadoop, this simply returns the current
-   * access control context's user, ignoring the configuration.
-   */
-  public UserGroupInformation getUGIForConf(Configuration conf) throws LoginException, IOException;
-
-  /**
-   * Used by metastore server to perform requested rpc in client context.
-   * @param <T>
-   * @param ugi
-   * @param pvea
-   * @throws IOException
-   * @throws InterruptedException
-   */
-  public <T> T doAs(UserGroupInformation ugi, PrivilegedExceptionAction<T> pvea) throws
-  IOException, InterruptedException;
-
-  /**
-   * Once a delegation token is stored in a file, the location is specified
-   * for a child process that runs hadoop operations, using an environment
-   * variable .
-   * @return Return the name of environment variable used by hadoop to find
-   *  location of token file
-   */
-  public String getTokenFileLocEnvName();
-
-
-  /**
-   * Get delegation token from filesystem and write the token along with
-   * metastore tokens into a file
-   * @param conf
-   * @return Path of the file with token credential
-   * @throws IOException
-   */
-  public Path createDelegationTokenFile(final Configuration conf) throws IOException;
-
-
-  /**
-   * Used to creates UGI object for a remote user.
-   * @param userName remote User Name
-   * @param groupNames group names associated with remote user name
-   * @return UGI created for the remote user.
-   */
-  public UserGroupInformation createRemoteUser(String userName, List<String> groupNames);
-
-  /**
-   * Get the short name corresponding to the subject in the passed UGI
-   *
-   * In secure versions of Hadoop, this returns the short name (after
-   * undergoing the translation in the kerberos name rule mapping).
-   * In unsecure versions of Hadoop, this returns the name of the subject
-   */
-  public String getShortUserName(UserGroupInformation ugi);
-
-  /**
-   * Return true if the Shim is based on Hadoop Security APIs.
-   */
-  public boolean isSecureShimImpl();
-
-  /**
-   * Return true if the hadoop configuration has security enabled
-   * @return
-   */
-  public boolean isSecurityEnabled();
-
-  /**
-   * Get the string form of the token given a token signature.
-   * The signature is used as the value of the "service" field in the token for lookup.
-   * Ref: AbstractDelegationTokenSelector in Hadoop. If there exists such a token
-   * in the token cache (credential store) of the job, the lookup returns that.
-   * This is relevant only when running against a "secure" hadoop release
-   * The method gets hold of the tokens if they are set up by hadoop - this should
-   * happen on the map/reduce tasks if the client added the tokens into hadoop's
-   * credential store in the front end during job submission. The method will
-   * select the hive delegation token among the set of tokens and return the string
-   * form of it
-   * @param tokenSignature
-   * @return the string form of the token found
-   * @throws IOException
-   */
-  public String getTokenStrForm(String tokenSignature) throws IOException;
-
-  /**
-   * Dynamically sets up the JAAS configuration that uses kerberos
-   * @param principal
-   * @param keyTabFile
-   * @throws IOException
-   */
-  public void setZookeeperClientKerberosJaasConfig(String principal, String keyTabFile)
-      throws IOException;
-
-  /**
-   * Add a delegation token to the given ugi
-   * @param ugi
-   * @param tokenStr
-   * @param tokenService
-   * @throws IOException
-   */
-  public void setTokenStr(UserGroupInformation ugi, String tokenStr, String tokenService)
-      throws IOException;
-
-  /**
-   * Add given service to the string format token
-   * @param tokenStr
-   * @param tokenService
-   * @return
-   * @throws IOException
-   */
-  public String addServiceToToken(String tokenStr, String tokenService)
-      throws IOException;
-
   enum JobTrackerState { INITIALIZING, RUNNING };
 
   /**
@@ -315,43 +166,6 @@ public interface HadoopShims {
    */
   public String getJobLauncherHttpAddress(Configuration conf);
 
-
-  /**
-   *  Perform kerberos login using the given principal and keytab
-   * @throws IOException
-   */
-  public void loginUserFromKeytab(String principal, String keytabFile) throws IOException;
-
-  /**
-   *  Perform kerberos login using the given principal and keytab,
-   *  and return the UGI object
-   * @throws IOException
-   */
-  public UserGroupInformation loginUserFromKeytabAndReturnUGI(String principal,
-      String keytabFile) throws IOException;
-
-  /**
-   * Convert Kerberos principal name pattern to valid Kerberos principal names.
-   * @param principal (principal name pattern)
-   * @return
-   * @throws IOException
-   */
-  public String getResolvedPrincipal(String principal) throws IOException;
-
-  /**
-   * Perform kerberos re-login using the given principal and keytab, to renew
-   * the credentials
-   * @throws IOException
-   */
-  public void reLoginUserFromKeytab() throws IOException;
-
-  /***
-   * Check if the current UGI is keytab based
-   * @return
-   * @throws IOException
-   */
-  public boolean isLoginKeytabBased() throws IOException;
-
   /**
    * Move the directory/file to trash. In case of the symlinks or mount points, the file is
    * moved to the trashbin in the actual volume of the path p being deleted
@@ -392,20 +206,6 @@ public interface HadoopShims {
       throws IOException;
 
   /**
-   * Create the proxy ugi for the given userid
-   * @param userName
-   * @return
-   */
-  public UserGroupInformation createProxyUser(String userName) throws IOException;
-
-  /**
-   * Verify proxy access to given UGI for given user
-   * @param ugi
-   */
-  public void authorizeProxyAccess(String proxyUser, UserGroupInformation realUserUgi,
-      String ipAddress, Configuration conf) throws IOException;
-
-  /**
    * The method sets to set the partition file has a different signature between
    * hadoop versions.
    * @param jobConf
@@ -416,53 +216,6 @@ public interface HadoopShims {
   Comparator<LongWritable> getLongComparator();
 
   /**
-   * InputSplitShim.
-   *
-   */
-  public interface InputSplitShim extends InputSplit {
-    JobConf getJob();
-
-    @Override
-    long getLength();
-
-    /** Returns an array containing the startoffsets of the files in the split. */
-    long[] getStartOffsets();
-
-    /** Returns an array containing the lengths of the files in the split. */
-    long[] getLengths();
-
-    /** Returns the start offset of the i<sup>th</sup> Path. */
-    long getOffset(int i);
-
-    /** Returns the length of the i<sup>th</sup> Path. */
-    long getLength(int i);
-
-    /** Returns the number of Paths in the split. */
-    int getNumPaths();
-
-    /** Returns the i<sup>th</sup> Path. */
-    Path getPath(int i);
-
-    /** Returns all the Paths in the split. */
-    Path[] getPaths();
-
-    /** Returns all the Paths where this input-split resides. */
-    @Override
-    String[] getLocations() throws IOException;
-
-    void shrinkSplit(long length);
-
-    @Override
-    String toString();
-
-    @Override
-    void readFields(DataInput in) throws IOException;
-
-    @Override
-    void write(DataOutput out) throws IOException;
-  }
-
-  /**
    * CombineFileInputFormatShim.
    *
    * @param <K>
@@ -473,11 +226,11 @@ public interface HadoopShims {
 
     void createPool(JobConf conf, PathFilter... filters);
 
-    InputSplitShim[] getSplits(JobConf job, int numSplits) throws IOException;
+    CombineFileSplit[] getSplits(JobConf job, int numSplits) throws IOException;
 
-    InputSplitShim getInputSplitShim() throws IOException;
+    CombineFileSplit getInputSplitShim() throws IOException;
 
-    RecordReader getRecordReader(JobConf job, InputSplitShim split, Reporter reporter,
+    RecordReader getRecordReader(JobConf job, CombineFileSplit split, Reporter reporter,
         Class<RecordReader<K, V>> rrClass) throws IOException;
   }
 

Modified: hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/shims/ShimLoader.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/shims/ShimLoader.java?rev=1641980&r1=1641979&r2=1641980&view=diff
==============================================================================
--- hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/shims/ShimLoader.java (original)
+++ hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/shims/ShimLoader.java Thu Nov 27 01:07:32 2014
@@ -20,7 +20,6 @@ package org.apache.hadoop.hive.shims;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge;
 import org.apache.hadoop.util.VersionInfo;
 import org.apache.log4j.AppenderSkeleton;
@@ -43,7 +42,6 @@ public abstract class ShimLoader {
       new HashMap<String, String>();
 
   static {
-    HADOOP_SHIM_CLASSES.put("0.20", "org.apache.hadoop.hive.shims.Hadoop20Shims");
     HADOOP_SHIM_CLASSES.put("0.20S", "org.apache.hadoop.hive.shims.Hadoop20SShims");
     HADOOP_SHIM_CLASSES.put("0.23", "org.apache.hadoop.hive.shims.Hadoop23Shims");
   }
@@ -56,7 +54,6 @@ public abstract class ShimLoader {
       new HashMap<String, String>();
 
   static {
-    JETTY_SHIM_CLASSES.put("0.20", "org.apache.hadoop.hive.shims.Jetty20Shims");
     JETTY_SHIM_CLASSES.put("0.20S", "org.apache.hadoop.hive.shims.Jetty20SShims");
     JETTY_SHIM_CLASSES.put("0.23", "org.apache.hadoop.hive.shims.Jetty23Shims");
   }
@@ -68,7 +65,6 @@ public abstract class ShimLoader {
       new HashMap<String, String>();
 
   static {
-    EVENT_COUNTER_SHIM_CLASSES.put("0.20", "org.apache.hadoop.metrics.jvm.EventCounter");
     EVENT_COUNTER_SHIM_CLASSES.put("0.20S", "org.apache.hadoop.log.metrics.EventCounter");
     EVENT_COUNTER_SHIM_CLASSES.put("0.23", "org.apache.hadoop.log.metrics.EventCounter");
   }
@@ -80,10 +76,8 @@ public abstract class ShimLoader {
       new HashMap<String, String>();
 
   static {
-    HADOOP_THRIFT_AUTH_BRIDGE_CLASSES.put("0.20",
-        "org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge");
     HADOOP_THRIFT_AUTH_BRIDGE_CLASSES.put("0.20S",
-        "org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge20S");
+        "org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge");
     HADOOP_THRIFT_AUTH_BRIDGE_CLASSES.put("0.23",
         "org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge23");
   }
@@ -153,9 +147,7 @@ public abstract class ShimLoader {
 
   /**
    * Return the "major" version of Hadoop currently on the classpath.
-   * For releases in the 0.x series this is simply the first two
-   * components of the version, e.g. "0.20" or "0.23". Releases in
-   * the 1.x and 2.x series are mapped to the appropriate
+   * Releases in the 1.x and 2.x series are mapped to the appropriate
    * 0.x release series, e.g. 1.x is mapped to "0.20S" and 2.x
    * is mapped to "0.23".
    */
@@ -168,10 +160,7 @@ public abstract class ShimLoader {
           " (expected A.B.* format)");
     }
 
-    // Special handling for Hadoop 1.x and 2.x
     switch (Integer.parseInt(parts[0])) {
-    case 0:
-      break;
     case 1:
       return "0.20S";
     case 2:
@@ -179,19 +168,6 @@ public abstract class ShimLoader {
     default:
       throw new IllegalArgumentException("Unrecognized Hadoop major version number: " + vers);
     }
-
-    String majorVersion = parts[0] + "." + parts[1];
-
-    // If we are running a security release, we won't have UnixUserGroupInformation
-    // (removed by HADOOP-6299 when switching to JAAS for Login)
-    try {
-      Class.forName("org.apache.hadoop.security.UnixUserGroupInformation");
-    } catch (ClassNotFoundException cnf) {
-      if ("0.20".equals(majorVersion)) {
-        majorVersion += "S";
-      }
-    }
-    return majorVersion;
   }
 
   private ShimLoader() {

Added: hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/shims/Utils.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/shims/Utils.java?rev=1641980&view=auto
==============================================================================
--- hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/shims/Utils.java (added)
+++ hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/shims/Utils.java Thu Nov 27 01:07:32 2014
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.shims;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.auth.login.LoginException;
+import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.thrift.DelegationTokenIdentifier;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.util.KerberosUtil;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.security.token.TokenSelector;
+import org.apache.hadoop.security.token.delegation.DelegationTokenSelector;
+import org.apache.zookeeper.client.ZooKeeperSaslClient;
+
+public class Utils {
+
+  public static UserGroupInformation getUGIForConf(Configuration conf) throws LoginException, IOException {
+    String doAs = System.getenv("HADOOP_USER_NAME");
+    if(doAs != null && doAs.length() > 0) {
+     /*
+      * this allows doAs (proxy user) to be passed along across process boundary where
+      * delegation tokens are not supported.  For example, a DDL stmt via WebHCat with
+      * a doAs parameter, forks to 'hcat' which needs to start a Session that
+      * proxies the end user
+      */
+      return UserGroupInformation.createProxyUser(doAs, UserGroupInformation.getLoginUser());
+    }
+    return UserGroupInformation.getCurrentUser();
+  }
+
+  /**
+   * Get the string form of the token given a token signature.
+   * The signature is used as the value of the "service" field in the token for lookup.
+   * Ref: AbstractDelegationTokenSelector in Hadoop. If there exists such a token
+   * in the token cache (credential store) of the job, the lookup returns that.
+   * This is relevant only when running against a "secure" hadoop release
+   * The method gets hold of the tokens if they are set up by hadoop - this should
+   * happen on the map/reduce tasks if the client added the tokens into hadoop's
+   * credential store in the front end during job submission. The method will
+   * select the hive delegation token among the set of tokens and return the string
+   * form of it
+   * @param tokenSignature
+   * @return the string form of the token found
+   * @throws IOException
+   */
+  public static String getTokenStrForm(String tokenSignature) throws IOException {
+    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+    TokenSelector<? extends TokenIdentifier> tokenSelector = new DelegationTokenSelector();
+
+    Token<? extends TokenIdentifier> token = tokenSelector.selectToken(
+        tokenSignature == null ? new Text() : new Text(tokenSignature), ugi.getTokens());
+    return token != null ? token.encodeToUrlString() : null;
+  }
+
+  /**
+   * Create a delegation token object for the given token string and service.
+   * Add the token to given UGI
+   * @param ugi
+   * @param tokenStr
+   * @param tokenService
+   * @throws IOException
+   */
+  public static void setTokenStr(UserGroupInformation ugi, String tokenStr, String tokenService)
+      throws IOException {
+    Token<DelegationTokenIdentifier> delegationToken = createToken(tokenStr, tokenService);
+    ugi.addToken(delegationToken);
+  }
+
+  /**
+   * Add a given service to delegation token string.
+   * @param tokenStr
+   * @param tokenService
+   * @return
+   * @throws IOException
+   */
+  public static String addServiceToToken(String tokenStr, String tokenService)
+      throws IOException {
+    Token<DelegationTokenIdentifier> delegationToken = createToken(tokenStr, tokenService);
+    return delegationToken.encodeToUrlString();
+  }
+
+  /**
+   * Create a new token using the given string and service
+   * @param tokenStr
+   * @param tokenService
+   * @return
+   * @throws IOException
+   */
+  private static Token<DelegationTokenIdentifier> createToken(String tokenStr, String tokenService)
+      throws IOException {
+    Token<DelegationTokenIdentifier> delegationToken = new Token<DelegationTokenIdentifier>();
+    delegationToken.decodeFromUrlString(tokenStr);
+    delegationToken.setService(new Text(tokenService));
+    return delegationToken;
+  }
+
+  /**
+   * Dynamically sets up the JAAS configuration that uses kerberos
+   * @param principal
+   * @param keyTabFile
+   * @throws IOException
+   */
+  public static void setZookeeperClientKerberosJaasConfig(String principal, String keyTabFile) throws IOException {
+    // ZooKeeper property name to pick the correct JAAS conf section
+    final String SASL_LOGIN_CONTEXT_NAME = "HiveZooKeeperClient";
+    System.setProperty(ZooKeeperSaslClient.LOGIN_CONTEXT_NAME_KEY, SASL_LOGIN_CONTEXT_NAME);
+
+    principal = SecurityUtil.getServerPrincipal(principal, "0.0.0.0");
+    JaasConfiguration jaasConf = new JaasConfiguration(SASL_LOGIN_CONTEXT_NAME, principal, keyTabFile);
+
+    // Install the Configuration in the runtime.
+    javax.security.auth.login.Configuration.setConfiguration(jaasConf);
+  }
+
+  /**
+   * A JAAS configuration for ZooKeeper clients intended to use for SASL
+   * Kerberos.
+   */
+  private static class JaasConfiguration extends javax.security.auth.login.Configuration {
+    // Current installed Configuration
+    private final javax.security.auth.login.Configuration baseConfig = javax.security.auth.login.Configuration
+        .getConfiguration();
+    private final String loginContextName;
+    private final String principal;
+    private final String keyTabFile;
+
+    public JaasConfiguration(String hiveLoginContextName, String principal, String keyTabFile) {
+      this.loginContextName = hiveLoginContextName;
+      this.principal = principal;
+      this.keyTabFile = keyTabFile;
+    }
+
+    @Override
+    public AppConfigurationEntry[] getAppConfigurationEntry(String appName) {
+      if (loginContextName.equals(appName)) {
+        Map<String, String> krbOptions = new HashMap<String, String>();
+        krbOptions.put("doNotPrompt", "true");
+        krbOptions.put("storeKey", "true");
+        krbOptions.put("useKeyTab", "true");
+        krbOptions.put("principal", principal);
+        krbOptions.put("keyTab", keyTabFile);
+        krbOptions.put("refreshKrb5Config", "true");
+        AppConfigurationEntry hiveZooKeeperClientEntry = new AppConfigurationEntry(
+            KerberosUtil.getKrb5LoginModuleName(), LoginModuleControlFlag.REQUIRED, krbOptions);
+        return new AppConfigurationEntry[] { hiveZooKeeperClientEntry };
+      }
+      // Try the base config
+      if (baseConfig != null) {
+        return baseConfig.getAppConfigurationEntry(appName);
+      }
+      return null;
+    }
+  }
+
+
+}

Added: hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/thrift/DelegationTokenIdentifier.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/thrift/DelegationTokenIdentifier.java?rev=1641980&view=auto
==============================================================================
--- hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/thrift/DelegationTokenIdentifier.java (added)
+++ hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/thrift/DelegationTokenIdentifier.java Thu Nov 27 01:07:32 2014
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.thrift;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
+
+/**
+ * A delegation token identifier that is specific to Hive.
+ */
+public class DelegationTokenIdentifier
+    extends AbstractDelegationTokenIdentifier {
+  public static final Text HIVE_DELEGATION_KIND = new Text("HIVE_DELEGATION_TOKEN");
+
+  /**
+   * Create an empty delegation token identifier for reading into.
+   */
+  public DelegationTokenIdentifier() {
+  }
+
+  /**
+   * Create a new delegation token identifier
+   * @param owner the effective username of the token owner
+   * @param renewer the username of the renewer
+   * @param realUser the real username of the token owner
+   */
+  public DelegationTokenIdentifier(Text owner, Text renewer, Text realUser) {
+    super(owner, renewer, realUser);
+  }
+
+  @Override
+  public Text getKind() {
+    return HIVE_DELEGATION_KIND;
+  }
+
+}

Added: hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/thrift/DelegationTokenSecretManager.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/thrift/DelegationTokenSecretManager.java?rev=1641980&view=auto
==============================================================================
--- hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/thrift/DelegationTokenSecretManager.java (added)
+++ hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/thrift/DelegationTokenSecretManager.java Thu Nov 27 01:07:32 2014
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.thrift;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
+
+/**
+ * A Hive specific delegation token secret manager.
+ * The secret manager is responsible for generating and accepting the password
+ * for each token.
+ */
+public class DelegationTokenSecretManager
+    extends AbstractDelegationTokenSecretManager<DelegationTokenIdentifier> {
+
+  /**
+   * Create a secret manager
+   * @param delegationKeyUpdateInterval the number of seconds for rolling new
+   *        secret keys.
+   * @param delegationTokenMaxLifetime the maximum lifetime of the delegation
+   *        tokens
+   * @param delegationTokenRenewInterval how often the tokens must be renewed
+   * @param delegationTokenRemoverScanInterval how often the tokens are scanned
+   *        for expired tokens
+   */
+  public DelegationTokenSecretManager(long delegationKeyUpdateInterval,
+                                      long delegationTokenMaxLifetime,
+                                      long delegationTokenRenewInterval,
+                                      long delegationTokenRemoverScanInterval) {
+    super(delegationKeyUpdateInterval, delegationTokenMaxLifetime,
+          delegationTokenRenewInterval, delegationTokenRemoverScanInterval);
+  }
+
+  @Override
+  public DelegationTokenIdentifier createIdentifier() {
+    return new DelegationTokenIdentifier();
+  }
+
+  public synchronized void cancelDelegationToken(String tokenStrForm) throws IOException {
+    Token<DelegationTokenIdentifier> t= new Token<DelegationTokenIdentifier>();
+    t.decodeFromUrlString(tokenStrForm);
+    String user = UserGroupInformation.getCurrentUser().getUserName();
+    cancelToken(t, user);
+  }
+
+  public synchronized long renewDelegationToken(String tokenStrForm) throws IOException {
+    Token<DelegationTokenIdentifier> t= new Token<DelegationTokenIdentifier>();
+    t.decodeFromUrlString(tokenStrForm);
+    String user = UserGroupInformation.getCurrentUser().getUserName();
+    return renewToken(t, user);
+  }
+
+  public synchronized String getDelegationToken(String renewer) throws IOException {
+    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+    Text owner = new Text(ugi.getUserName());
+    Text realUser = null;
+    if (ugi.getRealUser() != null) {
+      realUser = new Text(ugi.getRealUser().getUserName());
+    }
+    DelegationTokenIdentifier ident =
+      new DelegationTokenIdentifier(owner, new Text(renewer), realUser);
+    Token<DelegationTokenIdentifier> t = new Token<DelegationTokenIdentifier>(
+        ident, this);
+    return t.encodeToUrlString();
+  }
+
+  public String getUserFromToken(String tokenStr) throws IOException {
+    Token<DelegationTokenIdentifier> delegationToken = new Token<DelegationTokenIdentifier>();
+    delegationToken.decodeFromUrlString(tokenStr);
+
+    ByteArrayInputStream buf = new ByteArrayInputStream(delegationToken.getIdentifier());
+    DataInputStream in = new DataInputStream(buf);
+    DelegationTokenIdentifier id = createIdentifier();
+    id.readFields(in);
+    return id.getUser().getShortUserName();
+  }
+}
+

Added: hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/thrift/DelegationTokenStore.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/thrift/DelegationTokenStore.java?rev=1641980&view=auto
==============================================================================
--- hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/thrift/DelegationTokenStore.java (added)
+++ hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/thrift/DelegationTokenStore.java Thu Nov 27 01:07:32 2014
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.thrift;
+
+import java.io.Closeable;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge.Server.ServerMode;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation;
+
+/**
+ * Interface for pluggable token store that can be implemented with shared external
+ * storage for load balancing and high availability (for example using ZooKeeper).
+ * Internal, store specific errors are translated into {@link TokenStoreException}.
+ */
+public interface DelegationTokenStore extends Configurable, Closeable {
+
+  /**
+   * Exception for internal token store errors that typically cannot be handled by the caller.
+   */
+  public static class TokenStoreException extends RuntimeException {
+    private static final long serialVersionUID = -8693819817623074083L;
+
+    public TokenStoreException(Throwable cause) {
+      super(cause);
+    }
+
+    public TokenStoreException(String message, Throwable cause) {
+      super(message, cause);
+    }
+  }
+
+  /**
+   * Add new master key. The token store assigns and returns the sequence number.
+   * Caller needs to use the identifier to update the key (since it is embedded in the key).
+   *
+   * @param s
+   * @return sequence number for new key
+   */
+  int addMasterKey(String s) throws TokenStoreException;
+
+  /**
+   * Update master key (for expiration and setting store assigned sequence within key)
+   * @param keySeq
+   * @param s
+   * @throws TokenStoreException
+   */
+  void updateMasterKey(int keySeq, String s) throws TokenStoreException;
+
+  /**
+   * Remove key for given id.
+   * @param keySeq
+   * @return false if key no longer present, true otherwise.
+   */
+  boolean removeMasterKey(int keySeq);
+
+  /**
+   * Return all master keys.
+   * @return
+   * @throws TokenStoreException
+   */
+  String[] getMasterKeys() throws TokenStoreException;
+
+  /**
+   * Add token. If identifier is already present, token won't be added.
+   * @param tokenIdentifier
+   * @param token
+   * @return true if token was added, false for existing identifier
+   */
+  boolean addToken(DelegationTokenIdentifier tokenIdentifier,
+      DelegationTokenInformation token) throws TokenStoreException;
+
+  /**
+   * Get token. Returns null if the token does not exist.
+   * @param tokenIdentifier
+   * @return
+   */
+  DelegationTokenInformation getToken(DelegationTokenIdentifier tokenIdentifier)
+      throws TokenStoreException;
+
+  /**
+   * Remove token. Return value can be used by caller to detect concurrency.
+   * @param tokenIdentifier
+   * @return true if token was removed, false if it was already removed.
+   * @throws TokenStoreException
+   */
+  boolean removeToken(DelegationTokenIdentifier tokenIdentifier) throws TokenStoreException;
+
+  /**
+   * List of all token identifiers in the store. This is used to remove expired tokens
+   * and a potential scalability improvement would be to partition by master key id
+   * @return
+   */
+  List<DelegationTokenIdentifier> getAllDelegationTokenIdentifiers() throws TokenStoreException;
+
+  /**
+   * @param hmsHandler ObjectStore used by DBTokenStore
+   * @param smode Indicate whether this is a metastore or hiveserver2 token store
+   */
+  void init(Object hmsHandler, ServerMode smode);
+
+}

Modified: hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge.java?rev=1641980&r1=1641979&r2=1641980&view=diff
==============================================================================
--- hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge.java (original)
+++ hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge.java Thu Nov 27 01:07:32 2014
@@ -15,107 +15,726 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.hadoop.hive.thrift;
 
+import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION;
+
 import java.io.IOException;
 import java.net.InetAddress;
+import java.net.Socket;
+import java.security.PrivilegedAction;
+import java.security.PrivilegedExceptionAction;
+import java.util.Locale;
 import java.util.Map;
 
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.sasl.AuthorizeCallback;
+import javax.security.sasl.RealmCallback;
+import javax.security.sasl.RealmChoiceCallback;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.hive.shims.Utils;
+import org.apache.hadoop.hive.thrift.client.TUGIAssumingTransport;
+import org.apache.hadoop.security.SaslRpcServer;
+import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
+import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
+import org.apache.hadoop.security.authorize.AuthorizationException;
+import org.apache.hadoop.security.authorize.ProxyUsers;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.thrift.TException;
 import org.apache.thrift.TProcessor;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TSaslClientTransport;
+import org.apache.thrift.transport.TSaslServerTransport;
+import org.apache.thrift.transport.TSocket;
 import org.apache.thrift.transport.TTransport;
 import org.apache.thrift.transport.TTransportException;
 import org.apache.thrift.transport.TTransportFactory;
 
 /**
- * This class is only overridden by the secure hadoop shim. It allows
- * the Thrift SASL support to bridge to Hadoop's UserGroupInformation
- * & DelegationToken infrastructure.
+ * Functions that bridge Thrift's SASL transports to Hadoop's
+ * SASL callback handlers and authentication classes.
  */
 public class HadoopThriftAuthBridge {
+  static final Log LOG = LogFactory.getLog(HadoopThriftAuthBridge.class);
+
   public Client createClient() {
-    throw new UnsupportedOperationException(
-        "The current version of Hadoop does not support Authentication");
+    return new Client();
   }
 
-  public Client createClientWithConf(String authType) {
-    throw new UnsupportedOperationException(
-        "The current version of Hadoop does not support Authentication");
+  public Client createClientWithConf(String authMethod) {
+    UserGroupInformation ugi;
+    try {
+      ugi = UserGroupInformation.getLoginUser();
+    } catch(IOException e) {
+      throw new IllegalStateException("Unable to get current login user: " + e, e);
+    }
+    if (loginUserHasCurrentAuthMethod(ugi, authMethod)) {
+      LOG.debug("Not setting UGI conf as passed-in authMethod of " + authMethod + " = current.");
+      return new Client();
+    } else {
+      LOG.debug("Setting UGI conf as passed-in authMethod of " + authMethod + " != current.");
+      Configuration conf = new Configuration();
+      conf.set(HADOOP_SECURITY_AUTHENTICATION, authMethod);
+      UserGroupInformation.setConfiguration(conf);
+      return new Client();
+    }
   }
 
-  public UserGroupInformation getCurrentUGIWithConf(String authType)
-      throws IOException {
-    throw new UnsupportedOperationException(
-        "The current version of Hadoop does not support Authentication");
+  public Server createServer(String keytabFile, String principalConf) throws TTransportException {
+    return new Server(keytabFile, principalConf);
   }
 
 
   public String getServerPrincipal(String principalConfig, String host)
       throws IOException {
-    throw new UnsupportedOperationException(
-        "The current version of Hadoop does not support Authentication");
+    String serverPrincipal = SecurityUtil.getServerPrincipal(principalConfig, host);
+    String names[] = SaslRpcServer.splitKerberosName(serverPrincipal);
+    if (names.length != 3) {
+      throw new IOException(
+          "Kerberos principal name does NOT have the expected hostname part: "
+              + serverPrincipal);
+    }
+    return serverPrincipal;
   }
 
-  public Server createServer(String keytabFile, String principalConf)
-      throws TTransportException {
-    throw new UnsupportedOperationException(
-        "The current version of Hadoop does not support Authentication");
+
+  public UserGroupInformation getCurrentUGIWithConf(String authMethod)
+      throws IOException {
+    UserGroupInformation ugi;
+    try {
+      ugi = UserGroupInformation.getCurrentUser();
+    } catch(IOException e) {
+      throw new IllegalStateException("Unable to get current user: " + e, e);
+    }
+    if (loginUserHasCurrentAuthMethod(ugi, authMethod)) {
+      LOG.debug("Not setting UGI conf as passed-in authMethod of " + authMethod + " = current.");
+      return ugi;
+    } else {
+      LOG.debug("Setting UGI conf as passed-in authMethod of " + authMethod + " != current.");
+      Configuration conf = new Configuration();
+      conf.set(HADOOP_SECURITY_AUTHENTICATION, authMethod);
+      UserGroupInformation.setConfiguration(conf);
+      return UserGroupInformation.getCurrentUser();
+    }
+  }
+
+  /**
+   * Return true if the current login user is already using the given authMethod.
+   *
+   * Used above to ensure we do not create a new Configuration object and as such
+   * lose other settings such as the cluster to which the JVM is connected. Required
+   * for oozie since it does not have a core-site.xml see HIVE-7682
+   */
+  private boolean loginUserHasCurrentAuthMethod(UserGroupInformation ugi, String sAuthMethod) {
+    AuthenticationMethod authMethod;
+    try {
+      // based on SecurityUtil.getAuthenticationMethod()
+      authMethod = Enum.valueOf(AuthenticationMethod.class, sAuthMethod.toUpperCase(Locale.ENGLISH));
+    } catch (IllegalArgumentException iae) {
+      throw new IllegalArgumentException("Invalid attribute value for " +
+          HADOOP_SECURITY_AUTHENTICATION + " of " + sAuthMethod, iae);
+    }
+    LOG.debug("Current authMethod = " + ugi.getAuthenticationMethod());
+    return ugi.getAuthenticationMethod().equals(authMethod);
   }
 
 
   /**
    * Read and return Hadoop SASL configuration which can be configured using
    * "hadoop.rpc.protection"
-   *
    * @param conf
    * @return Hadoop SASL configuration
    */
+
   public Map<String, String> getHadoopSaslProperties(Configuration conf) {
-    throw new UnsupportedOperationException(
-        "The current version of Hadoop does not support Authentication");
+    // Initialize the SaslRpcServer to ensure QOP parameters are read from conf
+    SaslRpcServer.init(conf);
+    return SaslRpcServer.SASL_PROPS;
   }
 
-  public static abstract class Client {
+  public static class Client {
     /**
+     * Create a client-side SASL transport that wraps an underlying transport.
      *
-     * @param principalConfig In the case of Kerberos authentication this will
-     * be the kerberos principal name, for DIGEST-MD5 (delegation token) based
-     * authentication this will be null
-     * @param host The metastore server host name
-     * @param methodStr "KERBEROS" or "DIGEST"
-     * @param tokenStrForm This is url encoded string form of
-     * org.apache.hadoop.security.token.
-     * @param underlyingTransport the underlying transport
-     * @return the transport
-     * @throws IOException
+     * @param method The authentication method to use. Currently only KERBEROS is
+     *               supported.
+     * @param serverPrincipal The Kerberos principal of the target server.
+     * @param underlyingTransport The underlying transport mechanism, usually a TSocket.
+     * @param saslProps the sasl properties to create the client with
      */
-    public abstract TTransport createClientTransport(
+
+
+    public TTransport createClientTransport(
         String principalConfig, String host,
         String methodStr, String tokenStrForm, TTransport underlyingTransport,
-        Map<String, String> saslProps)
-            throws IOException;
+        Map<String, String> saslProps) throws IOException {
+      AuthMethod method = AuthMethod.valueOf(AuthMethod.class, methodStr);
+
+      TTransport saslTransport = null;
+      switch (method) {
+      case DIGEST:
+        Token<DelegationTokenIdentifier> t= new Token<DelegationTokenIdentifier>();
+        t.decodeFromUrlString(tokenStrForm);
+        saslTransport = new TSaslClientTransport(
+            method.getMechanismName(),
+            null,
+            null, SaslRpcServer.SASL_DEFAULT_REALM,
+            saslProps, new SaslClientCallbackHandler(t),
+            underlyingTransport);
+        return new TUGIAssumingTransport(saslTransport, UserGroupInformation.getCurrentUser());
+
+      case KERBEROS:
+        String serverPrincipal = SecurityUtil.getServerPrincipal(principalConfig, host);
+        String names[] = SaslRpcServer.splitKerberosName(serverPrincipal);
+        if (names.length != 3) {
+          throw new IOException(
+              "Kerberos principal name does NOT have the expected hostname part: "
+                  + serverPrincipal);
+        }
+        try {
+          saslTransport = new TSaslClientTransport(
+              method.getMechanismName(),
+              null,
+              names[0], names[1],
+              saslProps, null,
+              underlyingTransport);
+          return new TUGIAssumingTransport(saslTransport, UserGroupInformation.getCurrentUser());
+        } catch (SaslException se) {
+          throw new IOException("Could not instantiate SASL transport", se);
+        }
+
+      default:
+        throw new IOException("Unsupported authentication method: " + method);
+      }
+    }
+    private static class SaslClientCallbackHandler implements CallbackHandler {
+      private final String userName;
+      private final char[] userPassword;
+
+      public SaslClientCallbackHandler(Token<? extends TokenIdentifier> token) {
+        this.userName = encodeIdentifier(token.getIdentifier());
+        this.userPassword = encodePassword(token.getPassword());
+      }
+
+
+      @Override
+      public void handle(Callback[] callbacks)
+          throws UnsupportedCallbackException {
+        NameCallback nc = null;
+        PasswordCallback pc = null;
+        RealmCallback rc = null;
+        for (Callback callback : callbacks) {
+          if (callback instanceof RealmChoiceCallback) {
+            continue;
+          } else if (callback instanceof NameCallback) {
+            nc = (NameCallback) callback;
+          } else if (callback instanceof PasswordCallback) {
+            pc = (PasswordCallback) callback;
+          } else if (callback instanceof RealmCallback) {
+            rc = (RealmCallback) callback;
+          } else {
+            throw new UnsupportedCallbackException(callback,
+                "Unrecognized SASL client callback");
+          }
+        }
+        if (nc != null) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("SASL client callback: setting username: " + userName);
+          }
+          nc.setName(userName);
+        }
+        if (pc != null) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("SASL client callback: setting userPassword");
+          }
+          pc.setPassword(userPassword);
+        }
+        if (rc != null) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("SASL client callback: setting realm: "
+                + rc.getDefaultText());
+          }
+          rc.setText(rc.getDefaultText());
+        }
+      }
+
+      static String encodeIdentifier(byte[] identifier) {
+        return new String(Base64.encodeBase64(identifier));
+      }
+
+      static char[] encodePassword(byte[] password) {
+        return new String(Base64.encodeBase64(password)).toCharArray();
+      }
+    }
   }
 
-  public static abstract class Server {
+  public static class Server {
     public enum ServerMode {
       HIVESERVER2, METASTORE
     };
-    public abstract TTransportFactory createTransportFactory(Map<String, String> saslProps) throws TTransportException;
-    public abstract TProcessor wrapProcessor(TProcessor processor);
-    public abstract TProcessor wrapNonAssumingProcessor(TProcessor processor);
-    public abstract InetAddress getRemoteAddress();
-    public abstract void startDelegationTokenSecretManager(Configuration conf,
-        Object hmsHandler, ServerMode smode) throws IOException;
-    public abstract String getDelegationToken(String owner, String renewer)
-        throws IOException, InterruptedException;
-    public abstract String getDelegationTokenWithService(String owner, String renewer, String service)
-        throws IOException, InterruptedException;
-    public abstract String getRemoteUser();
-    public abstract long renewDelegationToken(String tokenStrForm) throws IOException;
-    public abstract void cancelDelegationToken(String tokenStrForm) throws IOException;
-    public abstract String getUserFromToken(String tokenStr) throws IOException;
+    final UserGroupInformation realUgi;
+    DelegationTokenSecretManager secretManager;
+    private final static long DELEGATION_TOKEN_GC_INTERVAL = 3600000; // 1 hour
+    //Delegation token related keys
+    public static final String  DELEGATION_KEY_UPDATE_INTERVAL_KEY =
+        "hive.cluster.delegation.key.update-interval";
+    public static final long    DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT =
+        24*60*60*1000; // 1 day
+    public static final String  DELEGATION_TOKEN_RENEW_INTERVAL_KEY =
+        "hive.cluster.delegation.token.renew-interval";
+    public static final long    DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT =
+        24*60*60*1000;  // 1 day
+    public static final String  DELEGATION_TOKEN_MAX_LIFETIME_KEY =
+        "hive.cluster.delegation.token.max-lifetime";
+    public static final long    DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT =
+        7*24*60*60*1000; // 7 days
+    public static final String DELEGATION_TOKEN_STORE_CLS =
+        "hive.cluster.delegation.token.store.class";
+    public static final String DELEGATION_TOKEN_STORE_ZK_CONNECT_STR =
+        "hive.cluster.delegation.token.store.zookeeper.connectString";
+    // alternate connect string specification configuration
+    public static final String DELEGATION_TOKEN_STORE_ZK_CONNECT_STR_ALTERNATE =
+        "hive.zookeeper.quorum";
+
+    public static final String DELEGATION_TOKEN_STORE_ZK_CONNECT_TIMEOUTMILLIS =
+        "hive.cluster.delegation.token.store.zookeeper.connectTimeoutMillis";
+    public static final String DELEGATION_TOKEN_STORE_ZK_ZNODE =
+        "hive.cluster.delegation.token.store.zookeeper.znode";
+    public static final String DELEGATION_TOKEN_STORE_ZK_ACL =
+        "hive.cluster.delegation.token.store.zookeeper.acl";
+    public static final String DELEGATION_TOKEN_STORE_ZK_ZNODE_DEFAULT =
+        "/hivedelegation";
+
+    public Server() throws TTransportException {
+      try {
+        realUgi = UserGroupInformation.getCurrentUser();
+      } catch (IOException ioe) {
+        throw new TTransportException(ioe);
+      }
+    }
+    /**
+     * Create a server with a kerberos keytab/principal.
+     */
+    protected Server(String keytabFile, String principalConf)
+        throws TTransportException {
+      if (keytabFile == null || keytabFile.isEmpty()) {
+        throw new TTransportException("No keytab specified");
+      }
+      if (principalConf == null || principalConf.isEmpty()) {
+        throw new TTransportException("No principal specified");
+      }
+
+      // Login from the keytab
+      String kerberosName;
+      try {
+        kerberosName =
+            SecurityUtil.getServerPrincipal(principalConf, "0.0.0.0");
+        UserGroupInformation.loginUserFromKeytab(
+            kerberosName, keytabFile);
+        realUgi = UserGroupInformation.getLoginUser();
+        assert realUgi.isFromKeytab();
+      } catch (IOException ioe) {
+        throw new TTransportException(ioe);
+      }
+    }
+
+    /**
+     * Create a TTransportFactory that, upon connection of a client socket,
+     * negotiates a Kerberized SASL transport. The resulting TTransportFactory
+     * can be passed as both the input and output transport factory when
+     * instantiating a TThreadPoolServer, for example.
+     *
+     * @param saslProps Map of SASL properties
+     */
+
+    public TTransportFactory createTransportFactory(Map<String, String> saslProps)
+        throws TTransportException {
+      // Parse out the kerberos principal, host, realm.
+      String kerberosName = realUgi.getUserName();
+      final String names[] = SaslRpcServer.splitKerberosName(kerberosName);
+      if (names.length != 3) {
+        throw new TTransportException("Kerberos principal should have 3 parts: " + kerberosName);
+      }
+
+      TSaslServerTransport.Factory transFactory = new TSaslServerTransport.Factory();
+      transFactory.addServerDefinition(
+          AuthMethod.KERBEROS.getMechanismName(),
+          names[0], names[1],  // two parts of kerberos principal
+          saslProps,
+          new SaslRpcServer.SaslGssCallbackHandler());
+      transFactory.addServerDefinition(AuthMethod.DIGEST.getMechanismName(),
+          null, SaslRpcServer.SASL_DEFAULT_REALM,
+          saslProps, new SaslDigestCallbackHandler(secretManager));
+
+      return new TUGIAssumingTransportFactory(transFactory, realUgi);
+    }
+
+    /**
+     * Wrap a TProcessor in such a way that, before processing any RPC, it
+     * assumes the UserGroupInformation of the user authenticated by
+     * the SASL transport.
+     */
+
+    public TProcessor wrapProcessor(TProcessor processor) {
+      return new TUGIAssumingProcessor(processor, secretManager, true);
+    }
+
+    /**
+     * Wrap a TProcessor to capture the client information like connecting userid, ip etc
+     */
+
+    public TProcessor wrapNonAssumingProcessor(TProcessor processor) {
+      return new TUGIAssumingProcessor(processor, secretManager, false);
+    }
+
+    protected DelegationTokenStore getTokenStore(Configuration conf)
+        throws IOException {
+      String tokenStoreClassName = conf.get(DELEGATION_TOKEN_STORE_CLS, "");
+      if (StringUtils.isBlank(tokenStoreClassName)) {
+        return new MemoryTokenStore();
+      }
+      try {
+        Class<? extends DelegationTokenStore> storeClass = Class
+            .forName(tokenStoreClassName).asSubclass(
+                DelegationTokenStore.class);
+        return ReflectionUtils.newInstance(storeClass, conf);
+      } catch (ClassNotFoundException e) {
+        throw new IOException("Error initializing delegation token store: " + tokenStoreClassName,
+            e);
+      }
+    }
+
+
+    public void startDelegationTokenSecretManager(Configuration conf, Object rawStore, ServerMode smode)
+        throws IOException{
+      long secretKeyInterval =
+          conf.getLong(DELEGATION_KEY_UPDATE_INTERVAL_KEY,
+              DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT);
+      long tokenMaxLifetime =
+          conf.getLong(DELEGATION_TOKEN_MAX_LIFETIME_KEY,
+              DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT);
+      long tokenRenewInterval =
+          conf.getLong(DELEGATION_TOKEN_RENEW_INTERVAL_KEY,
+              DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT);
+
+      DelegationTokenStore dts = getTokenStore(conf);
+      dts.init(rawStore, smode);
+      secretManager = new TokenStoreDelegationTokenSecretManager(secretKeyInterval,
+          tokenMaxLifetime,
+          tokenRenewInterval,
+          DELEGATION_TOKEN_GC_INTERVAL, dts);
+      secretManager.startThreads();
+    }
+
+
+    public String getDelegationToken(final String owner, final String renewer)
+        throws IOException, InterruptedException {
+      if (!authenticationMethod.get().equals(AuthenticationMethod.KERBEROS)) {
+        throw new AuthorizationException(
+            "Delegation Token can be issued only with kerberos authentication. " +
+                "Current AuthenticationMethod: " + authenticationMethod.get()
+            );
+      }
+      //if the user asking the token is same as the 'owner' then don't do
+      //any proxy authorization checks. For cases like oozie, where it gets
+      //a delegation token for another user, we need to make sure oozie is
+      //authorized to get a delegation token.
+      //Do all checks on short names
+      UserGroupInformation currUser = UserGroupInformation.getCurrentUser();
+      UserGroupInformation ownerUgi = UserGroupInformation.createRemoteUser(owner);
+      if (!ownerUgi.getShortUserName().equals(currUser.getShortUserName())) {
+        //in the case of proxy users, the getCurrentUser will return the
+        //real user (for e.g. oozie) due to the doAs that happened just before the
+        //server started executing the method getDelegationToken in the MetaStore
+        ownerUgi = UserGroupInformation.createProxyUser(owner,
+            UserGroupInformation.getCurrentUser());
+        InetAddress remoteAddr = getRemoteAddress();
+        ProxyUsers.authorize(ownerUgi,remoteAddr.getHostAddress(), null);
+      }
+      return ownerUgi.doAs(new PrivilegedExceptionAction<String>() {
+
+        @Override
+        public String run() throws IOException {
+          return secretManager.getDelegationToken(renewer);
+        }
+      });
+    }
+
+
+    public String getDelegationTokenWithService(String owner, String renewer, String service)
+        throws IOException, InterruptedException {
+      String token = getDelegationToken(owner, renewer);
+      return Utils.addServiceToToken(token, service);
+    }
+
+
+    public long renewDelegationToken(String tokenStrForm) throws IOException {
+      if (!authenticationMethod.get().equals(AuthenticationMethod.KERBEROS)) {
+        throw new AuthorizationException(
+            "Delegation Token can be issued only with kerberos authentication. " +
+                "Current AuthenticationMethod: " + authenticationMethod.get()
+            );
+      }
+      return secretManager.renewDelegationToken(tokenStrForm);
+    }
+
+
+    public String getUserFromToken(String tokenStr) throws IOException {
+      return secretManager.getUserFromToken(tokenStr);
+    }
+
+
+    public void cancelDelegationToken(String tokenStrForm) throws IOException {
+      secretManager.cancelDelegationToken(tokenStrForm);
+    }
+
+    final static ThreadLocal<InetAddress> remoteAddress =
+        new ThreadLocal<InetAddress>() {
+
+      @Override
+      protected synchronized InetAddress initialValue() {
+        return null;
+      }
+    };
+
+
+    public InetAddress getRemoteAddress() {
+      return remoteAddress.get();
+    }
+
+    final static ThreadLocal<AuthenticationMethod> authenticationMethod =
+        new ThreadLocal<AuthenticationMethod>() {
+
+      @Override
+      protected synchronized AuthenticationMethod initialValue() {
+        return AuthenticationMethod.TOKEN;
+      }
+    };
+
+    private static ThreadLocal<String> remoteUser = new ThreadLocal<String> () {
+
+      @Override
+      protected synchronized String initialValue() {
+        return null;
+      }
+    };
+
+
+    public String getRemoteUser() {
+      return remoteUser.get();
+    }
+
+    /** CallbackHandler for SASL DIGEST-MD5 mechanism */
+    // This code is pretty much completely based on Hadoop's
+    // SaslRpcServer.SaslDigestCallbackHandler - the only reason we could not
+    // use that Hadoop class as-is was because it needs a Server.Connection object
+    // which is relevant in hadoop rpc but not here in the metastore - so the
+    // code below does not deal with the Connection Server.object.
+    static class SaslDigestCallbackHandler implements CallbackHandler {
+      private final DelegationTokenSecretManager secretManager;
+
+      public SaslDigestCallbackHandler(
+          DelegationTokenSecretManager secretManager) {
+        this.secretManager = secretManager;
+      }
+
+      private char[] getPassword(DelegationTokenIdentifier tokenid) throws InvalidToken {
+        return encodePassword(secretManager.retrievePassword(tokenid));
+      }
+
+      private char[] encodePassword(byte[] password) {
+        return new String(Base64.encodeBase64(password)).toCharArray();
+      }
+      /** {@inheritDoc} */
+
+      @Override
+      public void handle(Callback[] callbacks) throws InvalidToken,
+      UnsupportedCallbackException {
+        NameCallback nc = null;
+        PasswordCallback pc = null;
+        AuthorizeCallback ac = null;
+        for (Callback callback : callbacks) {
+          if (callback instanceof AuthorizeCallback) {
+            ac = (AuthorizeCallback) callback;
+          } else if (callback instanceof NameCallback) {
+            nc = (NameCallback) callback;
+          } else if (callback instanceof PasswordCallback) {
+            pc = (PasswordCallback) callback;
+          } else if (callback instanceof RealmCallback) {
+            continue; // realm is ignored
+          } else {
+            throw new UnsupportedCallbackException(callback,
+                "Unrecognized SASL DIGEST-MD5 Callback");
+          }
+        }
+        if (pc != null) {
+          DelegationTokenIdentifier tokenIdentifier = SaslRpcServer.
+              getIdentifier(nc.getDefaultName(), secretManager);
+          char[] password = getPassword(tokenIdentifier);
+
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("SASL server DIGEST-MD5 callback: setting password "
+                + "for client: " + tokenIdentifier.getUser());
+          }
+          pc.setPassword(password);
+        }
+        if (ac != null) {
+          String authid = ac.getAuthenticationID();
+          String authzid = ac.getAuthorizationID();
+          if (authid.equals(authzid)) {
+            ac.setAuthorized(true);
+          } else {
+            ac.setAuthorized(false);
+          }
+          if (ac.isAuthorized()) {
+            if (LOG.isDebugEnabled()) {
+              String username =
+                  SaslRpcServer.getIdentifier(authzid, secretManager).getUser().getUserName();
+              LOG.debug("SASL server DIGEST-MD5 callback: setting "
+                  + "canonicalized client ID: " + username);
+            }
+            ac.setAuthorizedID(authzid);
+          }
+        }
+      }
+    }
+
+    /**
+     * Processor that pulls the SaslServer object out of the transport, and
+     * assumes the remote user's UGI before calling through to the original
+     * processor.
+     *
+     * This is used on the server side to set the UGI for each specific call.
+     */
+    protected class TUGIAssumingProcessor implements TProcessor {
+      final TProcessor wrapped;
+      DelegationTokenSecretManager secretManager;
+      boolean useProxy;
+      TUGIAssumingProcessor(TProcessor wrapped, DelegationTokenSecretManager secretManager,
+          boolean useProxy) {
+        this.wrapped = wrapped;
+        this.secretManager = secretManager;
+        this.useProxy = useProxy;
+      }
+
+
+      @Override
+      public boolean process(final TProtocol inProt, final TProtocol outProt) throws TException {
+        TTransport trans = inProt.getTransport();
+        if (!(trans instanceof TSaslServerTransport)) {
+          throw new TException("Unexpected non-SASL transport " + trans.getClass());
+        }
+        TSaslServerTransport saslTrans = (TSaslServerTransport)trans;
+        SaslServer saslServer = saslTrans.getSaslServer();
+        String authId = saslServer.getAuthorizationID();
+        authenticationMethod.set(AuthenticationMethod.KERBEROS);
+        LOG.debug("AUTH ID ======>" + authId);
+        String endUser = authId;
+
+        if(saslServer.getMechanismName().equals("DIGEST-MD5")) {
+          try {
+            TokenIdentifier tokenId = SaslRpcServer.getIdentifier(authId,
+                secretManager);
+            endUser = tokenId.getUser().getUserName();
+            authenticationMethod.set(AuthenticationMethod.TOKEN);
+          } catch (InvalidToken e) {
+            throw new TException(e.getMessage());
+          }
+        }
+        Socket socket = ((TSocket)(saslTrans.getUnderlyingTransport())).getSocket();
+        remoteAddress.set(socket.getInetAddress());
+        UserGroupInformation clientUgi = null;
+        try {
+          if (useProxy) {
+            clientUgi = UserGroupInformation.createProxyUser(
+                endUser, UserGroupInformation.getLoginUser());
+            remoteUser.set(clientUgi.getShortUserName());
+            LOG.debug("Set remoteUser :" + remoteUser.get());
+            return clientUgi.doAs(new PrivilegedExceptionAction<Boolean>() {
+
+              @Override
+              public Boolean run() {
+                try {
+                  return wrapped.process(inProt, outProt);
+                } catch (TException te) {
+                  throw new RuntimeException(te);
+                }
+              }
+            });
+          } else {
+            // use the short user name for the request
+            UserGroupInformation endUserUgi = UserGroupInformation.createRemoteUser(endUser);
+            remoteUser.set(endUserUgi.getShortUserName());
+            LOG.debug("Set remoteUser :" + remoteUser.get() + ", from endUser :" + endUser);
+            return wrapped.process(inProt, outProt);
+          }
+        } catch (RuntimeException rte) {
+          if (rte.getCause() instanceof TException) {
+            throw (TException)rte.getCause();
+          }
+          throw rte;
+        } catch (InterruptedException ie) {
+          throw new RuntimeException(ie); // unexpected!
+        } catch (IOException ioe) {
+          throw new RuntimeException(ioe); // unexpected!
+        }
+        finally {
+          if (clientUgi != null) {
+            try { FileSystem.closeAllForUGI(clientUgi); }
+            catch(IOException exception) {
+              LOG.error("Could not clean up file-system handles for UGI: " + clientUgi, exception);
+            }
+          }
+        }
+      }
+    }
+
+    /**
+     * A TransportFactory that wraps another one, but assumes a specified UGI
+     * before calling through.
+     *
+     * This is used on the server side to assume the server's Principal when accepting
+     * clients.
+     */
+    static class TUGIAssumingTransportFactory extends TTransportFactory {
+      private final UserGroupInformation ugi;
+      private final TTransportFactory wrapped;
+
+      public TUGIAssumingTransportFactory(TTransportFactory wrapped, UserGroupInformation ugi) {
+        assert wrapped != null;
+        assert ugi != null;
+        this.wrapped = wrapped;
+        this.ugi = ugi;
+      }
+
+
+      @Override
+      public TTransport getTransport(final TTransport trans) {
+        return ugi.doAs(new PrivilegedAction<TTransport>() {
+          @Override
+          public TTransport run() {
+            return wrapped.getTransport(trans);
+          }
+        });
+      }
+    }
   }
 }
-

Added: hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/thrift/MemoryTokenStore.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/thrift/MemoryTokenStore.java?rev=1641980&view=auto
==============================================================================
--- hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/thrift/MemoryTokenStore.java (added)
+++ hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/thrift/MemoryTokenStore.java Thu Nov 27 01:07:32 2014
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.thrift;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge.Server.ServerMode;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Default in-memory token store implementation.
+ */
+public class MemoryTokenStore implements DelegationTokenStore {
+  private static final Logger LOG = LoggerFactory.getLogger(MemoryTokenStore.class);
+
+  private final Map<Integer, String> masterKeys
+      = new ConcurrentHashMap<Integer, String>();
+
+  private final ConcurrentHashMap<DelegationTokenIdentifier, DelegationTokenInformation> tokens
+      = new ConcurrentHashMap<DelegationTokenIdentifier, DelegationTokenInformation>();
+
+  private final AtomicInteger masterKeySeq = new AtomicInteger();
+  private Configuration conf;
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return this.conf;
+  }
+
+  @Override
+  public int addMasterKey(String s) {
+    int keySeq = masterKeySeq.getAndIncrement();
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("addMasterKey: s = " + s + ", keySeq = " + keySeq);
+    }
+    masterKeys.put(keySeq, s);
+    return keySeq;
+  }
+
+  @Override
+  public void updateMasterKey(int keySeq, String s) {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("updateMasterKey: s = " + s + ", keySeq = " + keySeq);
+    }
+    masterKeys.put(keySeq, s);
+  }
+
+  @Override
+  public boolean removeMasterKey(int keySeq) {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("removeMasterKey: keySeq = " + keySeq);
+    }
+    return masterKeys.remove(keySeq) != null;
+  }
+
+  @Override
+  public String[] getMasterKeys() {
+    return masterKeys.values().toArray(new String[0]);
+  }
+
+  @Override
+  public boolean addToken(DelegationTokenIdentifier tokenIdentifier,
+    DelegationTokenInformation token) {
+    DelegationTokenInformation tokenInfo = tokens.putIfAbsent(tokenIdentifier, token);
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("addToken: tokenIdentifier = " + tokenIdentifier + ", addded = " + (tokenInfo == null));
+    }
+    return (tokenInfo == null);
+  }
+
+  @Override
+  public boolean removeToken(DelegationTokenIdentifier tokenIdentifier) {
+    DelegationTokenInformation tokenInfo = tokens.remove(tokenIdentifier);
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("removeToken: tokenIdentifier = " + tokenIdentifier + ", removed = " + (tokenInfo != null));
+    }
+    return tokenInfo != null;
+  }
+
+  @Override
+  public DelegationTokenInformation getToken(DelegationTokenIdentifier tokenIdentifier) {
+    DelegationTokenInformation result = tokens.get(tokenIdentifier);
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("getToken: tokenIdentifier = " + tokenIdentifier + ", result = " + result);
+    }
+    return result;
+  }
+
+  @Override
+  public List<DelegationTokenIdentifier> getAllDelegationTokenIdentifiers() {
+    List<DelegationTokenIdentifier> result = new ArrayList<DelegationTokenIdentifier>(
+        tokens.size());
+    for (DelegationTokenIdentifier id : tokens.keySet()) {
+        result.add(id);
+    }
+    return result;
+  }
+
+  @Override
+  public void close() throws IOException {
+    //no-op
+  }
+
+  @Override
+  public void init(Object hmsHandler, ServerMode smode) throws TokenStoreException {
+    // no-op
+  }
+}