You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ga...@apache.org on 2017/08/18 23:01:04 UTC

[2/3] hive git commit: HIVE-17241 Change metastore classes to not use the shims. This closes #228. (Alan Gates, reviewed by Vaibhav Gumashta)

http://git-wip-us.apache.org/repos/asf/hive/blob/e2770d6e/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java
index 6a54306..bbe13fd 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java
@@ -65,6 +65,8 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.hive.metastore.api.Decimal;
 import org.apache.hadoop.hive.metastore.api.Order;
 import org.apache.hadoop.hive.metastore.api.SkewedInfo;
+import org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge;
+import org.apache.hadoop.hive.shims.ShimLoader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -108,8 +110,6 @@ import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
-import org.apache.hadoop.hive.shims.ShimLoader;
-import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge;
 import org.apache.hadoop.security.SaslRpcServer;
 import org.apache.hive.common.util.HiveStringUtils;
 import org.apache.hive.common.util.ReflectionUtil;
@@ -1258,7 +1258,7 @@ public class MetaStoreUtils {
   }
 
   public static int startMetaStore() throws Exception {
-    return startMetaStore(ShimLoader.getHadoopThriftAuthBridge(), null);
+    return startMetaStore(HadoopThriftAuthBridge.getBridge(), null);
   }
 
   public static int startMetaStore(final HadoopThriftAuthBridge bridge, HiveConf conf) throws Exception {
@@ -1268,7 +1268,7 @@ public class MetaStoreUtils {
   }
 
   public static int startMetaStore(HiveConf conf) throws Exception {
-    return startMetaStore(ShimLoader.getHadoopThriftAuthBridge(), conf);
+    return startMetaStore(HadoopThriftAuthBridge.getBridge(), conf);
   }
 
   public static void startMetaStore(final int port, final HadoopThriftAuthBridge bridge) throws Exception {

http://git-wip-us.apache.org/repos/asf/hive/blob/e2770d6e/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
index 897fc4e..b878115 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
@@ -121,6 +121,7 @@ import org.apache.hadoop.hive.metastore.api.Type;
 import org.apache.hadoop.hive.metastore.api.UnknownDBException;
 import org.apache.hadoop.hive.metastore.api.UnknownPartitionException;
 import org.apache.hadoop.hive.metastore.api.UnknownTableException;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.model.MColumnDescriptor;
 import org.apache.hadoop.hive.metastore.model.MConstraint;
 import org.apache.hadoop.hive.metastore.model.MDBPrivilege;
@@ -157,7 +158,6 @@ import org.apache.hadoop.hive.metastore.parser.ExpressionTree.FilterBuilder;
 import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
 import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
-import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hive.common.util.HiveStringUtils;
 import org.apache.thrift.TException;
@@ -493,8 +493,7 @@ public class ObjectStore implements RawStore, Configurable {
     }
     // Password may no longer be in the conf, use getPassword()
     try {
-      String passwd =
-          ShimLoader.getHadoopShims().getPassword(conf, HiveConf.ConfVars.METASTOREPWD.varname);
+      String passwd = MetastoreConf.getPassword(conf, MetastoreConf.ConfVars.PWD);
       if (passwd != null && !passwd.isEmpty()) {
         prop.setProperty(HiveConf.ConfVars.METASTOREPWD.varname, passwd);
       }

http://git-wip-us.apache.org/repos/asf/hive/blob/e2770d6e/metastore/src/java/org/apache/hadoop/hive/metastore/TUGIBasedProcessor.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/TUGIBasedProcessor.java b/metastore/src/java/org/apache/hadoop/hive/metastore/TUGIBasedProcessor.java
index 89f4701..64f0b96 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/TUGIBasedProcessor.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/TUGIBasedProcessor.java
@@ -25,13 +25,13 @@ import java.security.PrivilegedExceptionAction;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.hadoop.hive.metastore.security.TUGIContainingTransport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore.Iface;
 import org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore.set_ugi_args;
 import org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore.set_ugi_result;
-import org.apache.hadoop.hive.thrift.TUGIContainingTransport;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.thrift.ProcessFunction;
 import org.apache.thrift.TApplicationException;

http://git-wip-us.apache.org/repos/asf/hive/blob/e2770d6e/metastore/src/java/org/apache/hadoop/hive/metastore/Warehouse.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/Warehouse.java b/metastore/src/java/org/apache/hadoop/hive/metastore/Warehouse.java
index 778550b..1dd50de 100755
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/Warehouse.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/Warehouse.java
@@ -33,6 +33,7 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.hive.metastore.utils.HdfsUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -52,7 +53,6 @@ import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.util.ReflectionUtils;
 
 /**
@@ -252,7 +252,7 @@ public class Warehouse {
     try {
       fs = getFs(path);
       stat = fs.getFileStatus(path);
-      ShimLoader.getHadoopShims().checkFileAccess(fs, stat, FsAction.WRITE);
+      HdfsUtils.checkFileAccess(fs, stat, FsAction.WRITE);
       return true;
     } catch (FileNotFoundException fnfe){
       // File named by path doesn't exist; nothing to validate.

http://git-wip-us.apache.org/repos/asf/hive/blob/e2770d6e/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
index f72c379..0161894 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
@@ -28,10 +28,10 @@ import java.sql.Statement;
 import java.util.Properties;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.shims.ShimLoader;
 
 /**
  * Utility methods for creating and destroying txn database/schema, plus methods for
@@ -328,8 +328,7 @@ public final class TxnDbUtil {
     Properties prop = new Properties();
     String driverUrl = HiveConf.getVar(conf, HiveConf.ConfVars.METASTORECONNECTURLKEY);
     String user = HiveConf.getVar(conf, HiveConf.ConfVars.METASTORE_CONNECTION_USER_NAME);
-    String passwd =
-      ShimLoader.getHadoopShims().getPassword(conf, HiveConf.ConfVars.METASTOREPWD.varname);
+    String passwd = MetastoreConf.getPassword(conf, MetastoreConf.ConfVars.PWD);
     prop.setProperty("user", user);
     prop.setProperty("password", passwd);
     Connection conn = driver.connect(driverUrl, prop);

http://git-wip-us.apache.org/repos/asf/hive/blob/e2770d6e/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index b722af6..f3968e4 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hive.common.classification.RetrySemantics;
 import org.apache.hadoop.hive.metastore.DatabaseProduct;
 import org.apache.hadoop.hive.metastore.HouseKeeperService;
 import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.commons.dbcp.PoolingDataSource;
@@ -45,7 +46,6 @@ import org.apache.hadoop.hive.common.StringableMap;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConfUtil;
 import org.apache.hadoop.hive.metastore.api.*;
-import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.util.StringUtils;
 
 import javax.sql.DataSource;
@@ -3701,8 +3701,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
 
   private static String getMetastoreJdbcPasswd(HiveConf conf) throws SQLException {
     try {
-      return ShimLoader.getHadoopShims().getPassword(conf,
-          HiveConf.ConfVars.METASTOREPWD.varname);
+      return MetastoreConf.getPassword(conf, MetastoreConf.ConfVars.PWD);
     } catch (IOException err) {
       throw new SQLException("Error getting metastore password", err);
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/e2770d6e/standalone-metastore/pom.xml
----------------------------------------------------------------------
diff --git a/standalone-metastore/pom.xml b/standalone-metastore/pom.xml
index b8826c6..2e0c51e 100644
--- a/standalone-metastore/pom.xml
+++ b/standalone-metastore/pom.xml
@@ -53,6 +53,22 @@
         </exclusion>
       </exclusions>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <version>${hadoop.version}</version>
+      <optional>true</optional>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commmons-logging</groupId>
+          <artifactId>commons-logging</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
     <!-- This is our one and only Hive dependency.-->
     <dependency>
       <groupId>org.apache.hive</groupId>

http://git-wip-us.apache.org/repos/asf/hive/blob/e2770d6e/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
index d8ec1d9..0fb878a 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hive.metastore.conf;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.security.MetastoreDelegationTokenManager;
 import org.apache.hadoop.hive.metastore.utils.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -361,6 +362,17 @@ public class MetastoreConf {
         "The default partition name in case the dynamic partition column value is null/empty string or any other values that cannot be escaped. \n" +
             "This value must not contain any special character used in HDFS URI (e.g., ':', '%', '/' etc). \n" +
             "The user has to be aware that the dynamic partition value should not contain this value to avoid confusions."),
+    DELEGATION_KEY_UPDATE_INTERVAL("metastore.cluster.delegation.key.update-interval",
+        "hive.cluster.delegation.key.update-interval", 1, TimeUnit.DAYS, ""),
+    DELEGATION_TOKEN_GC_INTERVAL("metastore.cluster.delegation.token.gc-interval",
+        "hive.cluster.delegation.token.gc-interval", 1, TimeUnit.HOURS, ""),
+    DELEGATION_TOKEN_MAX_LIFETIME("metastore.cluster.delegation.token.max-lifetime",
+        "hive.cluster.delegation.token.max-lifetime", 7, TimeUnit.DAYS, ""),
+    DELEGATION_TOKEN_RENEW_INTERVAL("metastore.cluster.delegation.token.renew-interval",
+      "hive.cluster.delegation.token.renew-interval", 1, TimeUnit.DAYS, ""),
+    DELEGATION_TOKEN_STORE_CLS("metastore.cluster.delegation.token.store.class",
+        "hive.cluster.delegation.token.store.class", MetastoreDelegationTokenManager.class.getName(),
+        "Class to store delegation tokens"),
     DETACH_ALL_ON_COMMIT("javax.jdo.option.DetachAllOnCommit",
         "javax.jdo.option.DetachAllOnCommit", true,
         "Detaches all objects from session so that they can be used after transaction is committed"),

http://git-wip-us.apache.org/repos/asf/hive/blob/e2770d6e/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/DelegationTokenIdentifier.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/DelegationTokenIdentifier.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/DelegationTokenIdentifier.java
new file mode 100644
index 0000000..ba6c7e3
--- /dev/null
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/DelegationTokenIdentifier.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.security;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
+
+/**
+ * A delegation token identifier that is specific to Hive.
+ */
+public class DelegationTokenIdentifier
+    extends AbstractDelegationTokenIdentifier {
+  public static final Text HIVE_DELEGATION_KIND = new Text("HIVE_DELEGATION_TOKEN");
+
+  /**
+   * Create an empty delegation token identifier for reading into.
+   */
+  public DelegationTokenIdentifier() {
+  }
+
+  /**
+   * Create a new delegation token identifier
+   * @param owner the effective username of the token owner
+   * @param renewer the username of the renewer
+   * @param realUser the real username of the token owner
+   */
+  public DelegationTokenIdentifier(Text owner, Text renewer, Text realUser) {
+    super(owner, renewer, realUser);
+  }
+
+  @Override
+  public Text getKind() {
+    return HIVE_DELEGATION_KIND;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/e2770d6e/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/DelegationTokenSecretManager.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/DelegationTokenSecretManager.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/DelegationTokenSecretManager.java
new file mode 100644
index 0000000..aae96a5
--- /dev/null
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/DelegationTokenSecretManager.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.security;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
+
+/**
+ * A Hive specific delegation token secret manager.
+ * The secret manager is responsible for generating and accepting the password
+ * for each token.
+ */
+public class DelegationTokenSecretManager
+    extends AbstractDelegationTokenSecretManager<DelegationTokenIdentifier> {
+
+  /**
+   * Create a secret manager
+   * @param delegationKeyUpdateInterval the number of seconds for rolling new
+   *        secret keys.
+   * @param delegationTokenMaxLifetime the maximum lifetime of the delegation
+   *        tokens
+   * @param delegationTokenRenewInterval how often the tokens must be renewed
+   * @param delegationTokenRemoverScanInterval how often the tokens are scanned
+   *        for expired tokens
+   */
+  public DelegationTokenSecretManager(long delegationKeyUpdateInterval,
+                                      long delegationTokenMaxLifetime,
+                                      long delegationTokenRenewInterval,
+                                      long delegationTokenRemoverScanInterval) {
+    super(delegationKeyUpdateInterval, delegationTokenMaxLifetime,
+          delegationTokenRenewInterval, delegationTokenRemoverScanInterval);
+  }
+
+  @Override
+  public DelegationTokenIdentifier createIdentifier() {
+    return new DelegationTokenIdentifier();
+  }
+
+  /**
+   * Verify token string
+   * @param tokenStrForm
+   * @return user name
+   * @throws IOException
+   */
+  public synchronized String verifyDelegationToken(String tokenStrForm) throws IOException {
+    Token<DelegationTokenIdentifier> t = new Token<>();
+    t.decodeFromUrlString(tokenStrForm);
+
+    DelegationTokenIdentifier id = getTokenIdentifier(t);
+    verifyToken(id, t.getPassword());
+    return id.getUser().getShortUserName();
+  }
+
+  protected DelegationTokenIdentifier getTokenIdentifier(Token<DelegationTokenIdentifier> token)
+      throws IOException {
+    // turn bytes back into identifier for cache lookup
+    ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier());
+    DataInputStream in = new DataInputStream(buf);
+    DelegationTokenIdentifier id = createIdentifier();
+    id.readFields(in);
+    return id;
+  }
+
+  public synchronized void cancelDelegationToken(String tokenStrForm) throws IOException {
+    Token<DelegationTokenIdentifier> t= new Token<>();
+    t.decodeFromUrlString(tokenStrForm);
+    String user = UserGroupInformation.getCurrentUser().getUserName();
+    cancelToken(t, user);
+  }
+
+  public synchronized long renewDelegationToken(String tokenStrForm) throws IOException {
+    Token<DelegationTokenIdentifier> t= new Token<>();
+    t.decodeFromUrlString(tokenStrForm);
+    String user = UserGroupInformation.getCurrentUser().getUserName();
+    return renewToken(t, user);
+  }
+
+  public synchronized String getDelegationToken(String renewer) throws IOException {
+    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+    Text owner = new Text(ugi.getUserName());
+    Text realUser = null;
+    if (ugi.getRealUser() != null) {
+      realUser = new Text(ugi.getRealUser().getUserName());
+    }
+    DelegationTokenIdentifier ident =
+      new DelegationTokenIdentifier(owner, new Text(renewer), realUser);
+    Token<DelegationTokenIdentifier> t = new Token<>(
+        ident, this);
+    return t.encodeToUrlString();
+  }
+
+  public String getUserFromToken(String tokenStr) throws IOException {
+    Token<DelegationTokenIdentifier> delegationToken = new Token<>();
+    delegationToken.decodeFromUrlString(tokenStr);
+
+    ByteArrayInputStream buf = new ByteArrayInputStream(delegationToken.getIdentifier());
+    DataInputStream in = new DataInputStream(buf);
+    DelegationTokenIdentifier id = createIdentifier();
+    id.readFields(in);
+    return id.getUser().getShortUserName();
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/hive/blob/e2770d6e/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/DelegationTokenStore.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/DelegationTokenStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/DelegationTokenStore.java
new file mode 100644
index 0000000..0cafeff
--- /dev/null
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/DelegationTokenStore.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.security;
+
+import java.io.Closeable;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge.Server.ServerMode;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation;
+
+/**
+ * Interface for pluggable token store that can be implemented with shared external
+ * storage for load balancing and high availability (for example using ZooKeeper).
+ * Internal, store specific errors are translated into {@link TokenStoreException}.
+ */
+public interface DelegationTokenStore extends Configurable, Closeable {
+
+  /**
+   * Exception for internal token store errors that typically cannot be handled by the caller.
+   */
+  class TokenStoreException extends RuntimeException {
+    private static final long serialVersionUID = -8693819817623074083L;
+
+    public TokenStoreException(Throwable cause) {
+      super(cause);
+    }
+
+    public TokenStoreException(String message, Throwable cause) {
+      super(message, cause);
+    }
+  }
+
+  /**
+   * Add new master key. The token store assigns and returns the sequence number.
+   * Caller needs to use the identifier to update the key (since it is embedded in the key).
+   *
+   * @param s
+   * @return sequence number for new key
+   */
+  int addMasterKey(String s) throws TokenStoreException;
+
+  /**
+   * Update master key (for expiration and setting store assigned sequence within key)
+   * @param keySeq
+   * @param s
+   * @throws TokenStoreException
+   */
+  void updateMasterKey(int keySeq, String s) throws TokenStoreException;
+
+  /**
+   * Remove key for given id.
+   * @param keySeq
+   * @return false if key no longer present, true otherwise.
+   */
+  boolean removeMasterKey(int keySeq);
+
+  /**
+   * Return all master keys.
+   * @return
+   * @throws TokenStoreException
+   */
+  String[] getMasterKeys() throws TokenStoreException;
+
+  /**
+   * Add token. If identifier is already present, token won't be added.
+   * @param tokenIdentifier
+   * @param token
+   * @return true if token was added, false for existing identifier
+   */
+  boolean addToken(DelegationTokenIdentifier tokenIdentifier,
+      DelegationTokenInformation token) throws TokenStoreException;
+
+  /**
+   * Get token. Returns null if the token does not exist.
+   * @param tokenIdentifier
+   * @return
+   */
+  DelegationTokenInformation getToken(DelegationTokenIdentifier tokenIdentifier)
+      throws TokenStoreException;
+
+  /**
+   * Remove token. Return value can be used by caller to detect concurrency.
+   * @param tokenIdentifier
+   * @return true if token was removed, false if it was already removed.
+   * @throws TokenStoreException
+   */
+  boolean removeToken(DelegationTokenIdentifier tokenIdentifier) throws TokenStoreException;
+
+  /**
+   * List of all token identifiers in the store. This is used to remove expired tokens
+   * and a potential scalability improvement would be to partition by master key id
+   * @return
+   */
+  List<DelegationTokenIdentifier> getAllDelegationTokenIdentifiers() throws TokenStoreException;
+
+  /**
+   * @param hmsHandler ObjectStore used by DBTokenStore
+   * @param smode Indicate whether this is a metastore or hiveserver2 token store
+   */
+  void init(Object hmsHandler, ServerMode smode);
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/e2770d6e/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/HadoopThriftAuthBridge.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/HadoopThriftAuthBridge.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/HadoopThriftAuthBridge.java
new file mode 100644
index 0000000..3f02ffd
--- /dev/null
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/HadoopThriftAuthBridge.java
@@ -0,0 +1,684 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.security;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.Socket;
+import java.net.UnknownHostException;
+import java.security.PrivilegedAction;
+import java.security.PrivilegedExceptionAction;
+import java.util.Locale;
+import java.util.Map;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.sasl.AuthorizeCallback;
+import javax.security.sasl.RealmCallback;
+import javax.security.sasl.RealmChoiceCallback;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+
+import org.apache.commons.codec.binary.Base64;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.security.SaslRpcServer;
+import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.thrift.TException;
+import org.apache.thrift.TProcessor;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TSaslClientTransport;
+import org.apache.thrift.transport.TSaslServerTransport;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.apache.thrift.transport.TTransportFactory;
+
+/**
+ * Functions that bridge Thrift's SASL transports to Hadoop's
+ * SASL callback handlers and authentication classes.
+ * HIVE-11378 This class is not directly used anymore.  It now exists only as a shell to be
+ * extended by HadoopThriftAuthBridge23 in 0.23 shims.  I have made it abstract
+ * to avoid maintenance errors.
+ */
+public abstract class HadoopThriftAuthBridge {
+  private static final Logger LOG = LoggerFactory.getLogger(HadoopThriftAuthBridge.class);
+
+  // We want to have only one auth bridge.  In the past this was handled by ShimLoader, but since
+  // we're no longer using that we'll do it here.
+  private static HadoopThriftAuthBridge self = null;
+
+  public static HadoopThriftAuthBridge getBridge() {
+    if (self == null) {
+      synchronized (HadoopThriftAuthBridge.class) {
+        if (self == null) self = new HadoopThriftAuthBridge23();
+      }
+    }
+    return self;
+  }
+
+  public Client createClient() {
+    return new Client();
+  }
+
+  public Client createClientWithConf(String authMethod) {
+    UserGroupInformation ugi;
+    try {
+      ugi = UserGroupInformation.getLoginUser();
+    } catch(IOException e) {
+      throw new IllegalStateException("Unable to get current login user: " + e, e);
+    }
+    if (loginUserHasCurrentAuthMethod(ugi, authMethod)) {
+      LOG.debug("Not setting UGI conf as passed-in authMethod of " + authMethod + " = current.");
+      return new Client();
+    } else {
+      LOG.debug("Setting UGI conf as passed-in authMethod of " + authMethod + " != current.");
+      Configuration conf = new Configuration();
+      conf.set(HADOOP_SECURITY_AUTHENTICATION, authMethod);
+      UserGroupInformation.setConfiguration(conf);
+      return new Client();
+    }
+  }
+
+  public Server createServer(String keytabFile, String principalConf) throws TTransportException {
+    return new Server(keytabFile, principalConf);
+  }
+
+
+  public String getServerPrincipal(String principalConfig, String host)
+      throws IOException {
+    String serverPrincipal = SecurityUtil.getServerPrincipal(principalConfig, host);
+    String names[] = SaslRpcServer.splitKerberosName(serverPrincipal);
+    if (names.length != 3) {
+      throw new IOException(
+          "Kerberos principal name does NOT have the expected hostname part: "
+              + serverPrincipal);
+    }
+    return serverPrincipal;
+  }
+
+  /**
+   * Method to get canonical-ized hostname, given a hostname (possibly a CNAME).
+   * This should allow for service-principals to use simplified CNAMEs.
+   * @param hostName The hostname to be canonical-ized.
+   * @return Given a CNAME, the canonical-ized hostname is returned. If not found, the original hostname is returned.
+   */
+  public String getCanonicalHostName(String hostName) {
+    try {
+      return InetAddress.getByName(hostName).getCanonicalHostName();
+    }
+    catch(UnknownHostException exception) {
+      LOG.warn("Could not retrieve canonical hostname for " + hostName, exception);
+      return hostName;
+    }
+  }
+
+  public UserGroupInformation getCurrentUGIWithConf(String authMethod)
+      throws IOException {
+    UserGroupInformation ugi;
+    try {
+      ugi = UserGroupInformation.getCurrentUser();
+    } catch(IOException e) {
+      throw new IllegalStateException("Unable to get current user: " + e, e);
+    }
+    if (loginUserHasCurrentAuthMethod(ugi, authMethod)) {
+      LOG.debug("Not setting UGI conf as passed-in authMethod of " + authMethod + " = current.");
+      return ugi;
+    } else {
+      LOG.debug("Setting UGI conf as passed-in authMethod of " + authMethod + " != current.");
+      Configuration conf = new Configuration();
+      conf.set(HADOOP_SECURITY_AUTHENTICATION, authMethod);
+      UserGroupInformation.setConfiguration(conf);
+      return UserGroupInformation.getCurrentUser();
+    }
+  }
+
+  /**
+   * Return true if the current login user is already using the given authMethod.
+   *
+   * Used above to ensure we do not create a new Configuration object and as such
+   * lose other settings such as the cluster to which the JVM is connected. Required
+   * for oozie since it does not have a core-site.xml see HIVE-7682
+   */
+  private boolean loginUserHasCurrentAuthMethod(UserGroupInformation ugi, String sAuthMethod) {
+    AuthenticationMethod authMethod;
+    try {
+      // based on SecurityUtil.getAuthenticationMethod()
+      authMethod = Enum.valueOf(AuthenticationMethod.class, sAuthMethod.toUpperCase(Locale.ENGLISH));
+    } catch (IllegalArgumentException iae) {
+      throw new IllegalArgumentException("Invalid attribute value for " +
+          HADOOP_SECURITY_AUTHENTICATION + " of " + sAuthMethod, iae);
+    }
+    LOG.debug("Current authMethod = " + ugi.getAuthenticationMethod());
+    return ugi.getAuthenticationMethod().equals(authMethod);
+  }
+
+
+  /**
+   * Read and return Hadoop SASL configuration which can be configured using
+   * "hadoop.rpc.protection"
+   * @param conf
+   * @return Hadoop SASL configuration
+   */
+
+  public abstract Map<String, String> getHadoopSaslProperties(Configuration conf);
+
+  public static class Client {
+    /**
+     * Create a client-side SASL transport that wraps an underlying transport.
+     *
+     * @param methodStr The authentication method to use. Currently only KERBEROS is
+     *               supported.
+     * @param principalConfig The Kerberos principal of the target server.
+     * @param underlyingTransport The underlying transport mechanism, usually a TSocket.
+     * @param saslProps the sasl properties to create the client with
+     */
+
+
+    public TTransport createClientTransport(
+        String principalConfig, String host,
+        String methodStr, String tokenStrForm, final TTransport underlyingTransport,
+        final Map<String, String> saslProps) throws IOException {
+      final AuthMethod method = AuthMethod.valueOf(AuthMethod.class, methodStr);
+
+      TTransport saslTransport = null;
+      switch (method) {
+      case DIGEST:
+        Token<DelegationTokenIdentifier> t= new Token<>();
+        t.decodeFromUrlString(tokenStrForm);
+        saslTransport = new TSaslClientTransport(
+            method.getMechanismName(),
+            null,
+            null, SaslRpcServer.SASL_DEFAULT_REALM,
+            saslProps, new SaslClientCallbackHandler(t),
+            underlyingTransport);
+        return new TUGIAssumingTransport(saslTransport, UserGroupInformation.getCurrentUser());
+
+      case KERBEROS:
+        String serverPrincipal = SecurityUtil.getServerPrincipal(principalConfig, host);
+        final String names[] = SaslRpcServer.splitKerberosName(serverPrincipal);
+        if (names.length != 3) {
+          throw new IOException(
+              "Kerberos principal name does NOT have the expected hostname part: "
+                  + serverPrincipal);
+        }
+        try {
+          return UserGroupInformation.getCurrentUser().doAs(
+              new PrivilegedExceptionAction<TUGIAssumingTransport>() {
+                @Override
+                public TUGIAssumingTransport run() throws IOException {
+                  TTransport saslTransport = new TSaslClientTransport(
+                    method.getMechanismName(),
+                    null,
+                    names[0], names[1],
+                    saslProps, null,
+                    underlyingTransport);
+                  return new TUGIAssumingTransport(saslTransport, UserGroupInformation.getCurrentUser());
+                }
+              });
+        } catch (InterruptedException | SaslException se) {
+          throw new IOException("Could not instantiate SASL transport", se);
+        }
+
+      default:
+        throw new IOException("Unsupported authentication method: " + method);
+      }
+    }
+    private static class SaslClientCallbackHandler implements CallbackHandler {
+      private final String userName;
+      private final char[] userPassword;
+
+      public SaslClientCallbackHandler(Token<? extends TokenIdentifier> token) {
+        this.userName = encodeIdentifier(token.getIdentifier());
+        this.userPassword = encodePassword(token.getPassword());
+      }
+
+
+      @Override
+      public void handle(Callback[] callbacks)
+          throws UnsupportedCallbackException {
+        NameCallback nc = null;
+        PasswordCallback pc = null;
+        RealmCallback rc = null;
+        for (Callback callback : callbacks) {
+          if (callback instanceof RealmChoiceCallback) {
+            continue;
+          } else if (callback instanceof NameCallback) {
+            nc = (NameCallback) callback;
+          } else if (callback instanceof PasswordCallback) {
+            pc = (PasswordCallback) callback;
+          } else if (callback instanceof RealmCallback) {
+            rc = (RealmCallback) callback;
+          } else {
+            throw new UnsupportedCallbackException(callback,
+                "Unrecognized SASL client callback");
+          }
+        }
+        if (nc != null) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("SASL client callback: setting username: " + userName);
+          }
+          nc.setName(userName);
+        }
+        if (pc != null) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("SASL client callback: setting userPassword");
+          }
+          pc.setPassword(userPassword);
+        }
+        if (rc != null) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("SASL client callback: setting realm: "
+                + rc.getDefaultText());
+          }
+          rc.setText(rc.getDefaultText());
+        }
+      }
+
+      static String encodeIdentifier(byte[] identifier) {
+        return new String(Base64.encodeBase64(identifier));
+      }
+
+      static char[] encodePassword(byte[] password) {
+        return new String(Base64.encodeBase64(password)).toCharArray();
+      }
+    }
+  }
+
+  public static class Server {
+    public enum ServerMode {
+      HIVESERVER2, METASTORE
+    };
+
+    protected final UserGroupInformation realUgi;
+    protected DelegationTokenSecretManager secretManager;
+
+    public Server() throws TTransportException {
+      try {
+        realUgi = UserGroupInformation.getCurrentUser();
+      } catch (IOException ioe) {
+        throw new TTransportException(ioe);
+      }
+    }
+    /**
+     * Create a server with a kerberos keytab/principal.
+     */
+    protected Server(String keytabFile, String principalConf)
+        throws TTransportException {
+      if (keytabFile == null || keytabFile.isEmpty()) {
+        throw new TTransportException("No keytab specified");
+      }
+      if (principalConf == null || principalConf.isEmpty()) {
+        throw new TTransportException("No principal specified");
+      }
+
+      // Login from the keytab
+      String kerberosName;
+      try {
+        kerberosName =
+            SecurityUtil.getServerPrincipal(principalConf, "0.0.0.0");
+        UserGroupInformation.loginUserFromKeytab(
+            kerberosName, keytabFile);
+        realUgi = UserGroupInformation.getLoginUser();
+        assert realUgi.isFromKeytab();
+      } catch (IOException ioe) {
+        throw new TTransportException(ioe);
+      }
+    }
+
+    public void setSecretManager(DelegationTokenSecretManager secretManager) {
+      this.secretManager = secretManager;
+    }
+
+    /**
+     * Create a TTransportFactory that, upon connection of a client socket,
+     * negotiates a Kerberized SASL transport. The resulting TTransportFactory
+     * can be passed as both the input and output transport factory when
+     * instantiating a TThreadPoolServer, for example.
+     *
+     * @param saslProps Map of SASL properties
+     */
+
+    public TTransportFactory createTransportFactory(Map<String, String> saslProps)
+        throws TTransportException {
+
+      TSaslServerTransport.Factory transFactory = createSaslServerTransportFactory(saslProps);
+
+      return new TUGIAssumingTransportFactory(transFactory, realUgi);
+    }
+
+    /**
+     * Create a TSaslServerTransport.Factory that, upon connection of a client
+     * socket, negotiates a Kerberized SASL transport.
+     *
+     * @param saslProps Map of SASL properties
+     */
+    public TSaslServerTransport.Factory createSaslServerTransportFactory(
+        Map<String, String> saslProps) throws TTransportException {
+      // Parse out the kerberos principal, host, realm.
+      String kerberosName = realUgi.getUserName();
+      final String names[] = SaslRpcServer.splitKerberosName(kerberosName);
+      if (names.length != 3) {
+        throw new TTransportException("Kerberos principal should have 3 parts: " + kerberosName);
+      }
+
+      TSaslServerTransport.Factory transFactory = new TSaslServerTransport.Factory();
+      transFactory.addServerDefinition(
+          AuthMethod.KERBEROS.getMechanismName(),
+          names[0], names[1],  // two parts of kerberos principal
+          saslProps,
+          new SaslRpcServer.SaslGssCallbackHandler());
+      transFactory.addServerDefinition(AuthMethod.DIGEST.getMechanismName(),
+          null, SaslRpcServer.SASL_DEFAULT_REALM,
+          saslProps, new SaslDigestCallbackHandler(secretManager));
+
+      return transFactory;
+    }
+
+    /**
+     * Wrap a TTransportFactory in such a way that, before processing any RPC, it
+     * assumes the UserGroupInformation of the user authenticated by
+     * the SASL transport.
+     */
+    public TTransportFactory wrapTransportFactory(TTransportFactory transFactory) {
+      return new TUGIAssumingTransportFactory(transFactory, realUgi);
+    }
+
+    /**
+     * Wrap a TProcessor in such a way that, before processing any RPC, it
+     * assumes the UserGroupInformation of the user authenticated by
+     * the SASL transport.
+     */
+
+    public TProcessor wrapProcessor(TProcessor processor) {
+      return new TUGIAssumingProcessor(processor, secretManager, true);
+    }
+
+    /**
+     * Wrap a TProcessor to capture the client information like connecting userid, ip etc
+     */
+
+    public TProcessor wrapNonAssumingProcessor(TProcessor processor) {
+      return new TUGIAssumingProcessor(processor, secretManager, false);
+    }
+
+    final static ThreadLocal<InetAddress> remoteAddress =
+        new ThreadLocal<InetAddress>() {
+
+      @Override
+      protected InetAddress initialValue() {
+        return null;
+      }
+    };
+
+    public InetAddress getRemoteAddress() {
+      return remoteAddress.get();
+    }
+
+    final static ThreadLocal<AuthenticationMethod> authenticationMethod =
+        new ThreadLocal<AuthenticationMethod>() {
+
+      @Override
+      protected AuthenticationMethod initialValue() {
+        return AuthenticationMethod.TOKEN;
+      }
+    };
+
+    private static ThreadLocal<String> remoteUser = new ThreadLocal<String> () {
+
+      @Override
+      protected String initialValue() {
+        return null;
+      }
+    };
+
+
+    public String getRemoteUser() {
+      return remoteUser.get();
+    }
+
+    private final static ThreadLocal<String> userAuthMechanism =
+        new ThreadLocal<String>() {
+
+      @Override
+      protected String initialValue() {
+        return AuthMethod.KERBEROS.getMechanismName();
+      }
+    };
+
+    public String getUserAuthMechanism() {
+      return userAuthMechanism.get();
+    }
+    /** CallbackHandler for SASL DIGEST-MD5 mechanism */
+    // This code is pretty much completely based on Hadoop's
+    // SaslRpcServer.SaslDigestCallbackHandler - the only reason we could not
+    // use that Hadoop class as-is was because it needs a Server.Connection object
+    // which is relevant in hadoop rpc but not here in the metastore - so the
+    // code below does not deal with the Connection Server.object.
+    static class SaslDigestCallbackHandler implements CallbackHandler {
+      private final DelegationTokenSecretManager secretManager;
+
+      public SaslDigestCallbackHandler(
+          DelegationTokenSecretManager secretManager) {
+        this.secretManager = secretManager;
+      }
+
+      private char[] getPassword(DelegationTokenIdentifier tokenid) throws InvalidToken {
+        return encodePassword(secretManager.retrievePassword(tokenid));
+      }
+
+      private char[] encodePassword(byte[] password) {
+        return new String(Base64.encodeBase64(password)).toCharArray();
+      }
+      /** {@inheritDoc} */
+
+      @Override
+      public void handle(Callback[] callbacks) throws InvalidToken,
+      UnsupportedCallbackException {
+        NameCallback nc = null;
+        PasswordCallback pc = null;
+        AuthorizeCallback ac = null;
+        for (Callback callback : callbacks) {
+          if (callback instanceof AuthorizeCallback) {
+            ac = (AuthorizeCallback) callback;
+          } else if (callback instanceof NameCallback) {
+            nc = (NameCallback) callback;
+          } else if (callback instanceof PasswordCallback) {
+            pc = (PasswordCallback) callback;
+          } else if (callback instanceof RealmCallback) {
+            continue; // realm is ignored
+          } else {
+            throw new UnsupportedCallbackException(callback,
+                "Unrecognized SASL DIGEST-MD5 Callback");
+          }
+        }
+        if (pc != null) {
+          DelegationTokenIdentifier tokenIdentifier = SaslRpcServer.
+              getIdentifier(nc.getDefaultName(), secretManager);
+          char[] password = getPassword(tokenIdentifier);
+
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("SASL server DIGEST-MD5 callback: setting password "
+                + "for client: " + tokenIdentifier.getUser());
+          }
+          pc.setPassword(password);
+        }
+        if (ac != null) {
+          String authid = ac.getAuthenticationID();
+          String authzid = ac.getAuthorizationID();
+          if (authid.equals(authzid)) {
+            ac.setAuthorized(true);
+          } else {
+            ac.setAuthorized(false);
+          }
+          if (ac.isAuthorized()) {
+            if (LOG.isDebugEnabled()) {
+              String username =
+                  SaslRpcServer.getIdentifier(authzid, secretManager).getUser().getUserName();
+              LOG.debug("SASL server DIGEST-MD5 callback: setting "
+                  + "canonicalized client ID: " + username);
+            }
+            ac.setAuthorizedID(authzid);
+          }
+        }
+      }
+    }
+
+    /**
+     * Processor that pulls the SaslServer object out of the transport, and
+     * assumes the remote user's UGI before calling through to the original
+     * processor.
+     *
+     * This is used on the server side to set the UGI for each specific call.
+     */
+    protected class TUGIAssumingProcessor implements TProcessor {
+      final TProcessor wrapped;
+      DelegationTokenSecretManager secretManager;
+      boolean useProxy;
+      TUGIAssumingProcessor(TProcessor wrapped, DelegationTokenSecretManager secretManager,
+          boolean useProxy) {
+        this.wrapped = wrapped;
+        this.secretManager = secretManager;
+        this.useProxy = useProxy;
+      }
+
+
+      @Override
+      public boolean process(final TProtocol inProt, final TProtocol outProt) throws TException {
+        TTransport trans = inProt.getTransport();
+        if (!(trans instanceof TSaslServerTransport)) {
+          throw new TException("Unexpected non-SASL transport " + trans.getClass());
+        }
+        TSaslServerTransport saslTrans = (TSaslServerTransport)trans;
+        SaslServer saslServer = saslTrans.getSaslServer();
+        String authId = saslServer.getAuthorizationID();
+        LOG.debug("AUTH ID ======>" + authId);
+        String endUser = authId;
+
+        Socket socket = ((TSocket)(saslTrans.getUnderlyingTransport())).getSocket();
+        remoteAddress.set(socket.getInetAddress());
+
+        String mechanismName = saslServer.getMechanismName();
+        userAuthMechanism.set(mechanismName);
+        if (AuthMethod.PLAIN.getMechanismName().equalsIgnoreCase(mechanismName)) {
+          remoteUser.set(endUser);
+          return wrapped.process(inProt, outProt);
+        }
+
+        authenticationMethod.set(AuthenticationMethod.KERBEROS);
+        if(AuthMethod.TOKEN.getMechanismName().equalsIgnoreCase(mechanismName)) {
+          try {
+            TokenIdentifier tokenId = SaslRpcServer.getIdentifier(authId,
+                secretManager);
+            endUser = tokenId.getUser().getUserName();
+            authenticationMethod.set(AuthenticationMethod.TOKEN);
+          } catch (InvalidToken e) {
+            throw new TException(e.getMessage());
+          }
+        }
+
+        UserGroupInformation clientUgi = null;
+        try {
+          if (useProxy) {
+            clientUgi = UserGroupInformation.createProxyUser(
+                endUser, UserGroupInformation.getLoginUser());
+            remoteUser.set(clientUgi.getShortUserName());
+            LOG.debug("Set remoteUser :" + remoteUser.get());
+            return clientUgi.doAs(new PrivilegedExceptionAction<Boolean>() {
+
+              @Override
+              public Boolean run() {
+                try {
+                  return wrapped.process(inProt, outProt);
+                } catch (TException te) {
+                  throw new RuntimeException(te);
+                }
+              }
+            });
+          } else {
+            // use the short user name for the request
+            UserGroupInformation endUserUgi = UserGroupInformation.createRemoteUser(endUser);
+            remoteUser.set(endUserUgi.getShortUserName());
+            LOG.debug("Set remoteUser :" + remoteUser.get() + ", from endUser :" + endUser);
+            return wrapped.process(inProt, outProt);
+          }
+        } catch (RuntimeException rte) {
+          if (rte.getCause() instanceof TException) {
+            throw (TException)rte.getCause();
+          }
+          throw rte;
+        } catch (InterruptedException ie) {
+          throw new RuntimeException(ie); // unexpected!
+        } catch (IOException ioe) {
+          throw new RuntimeException(ioe); // unexpected!
+        }
+        finally {
+          if (clientUgi != null) {
+            try { FileSystem.closeAllForUGI(clientUgi); }
+            catch(IOException exception) {
+              LOG.error("Could not clean up file-system handles for UGI: " + clientUgi, exception);
+            }
+          }
+        }
+      }
+    }
+
+    /**
+     * A TransportFactory that wraps another one, but assumes a specified UGI
+     * before calling through.
+     *
+     * This is used on the server side to assume the server's Principal when accepting
+     * clients.
+     */
+    static class TUGIAssumingTransportFactory extends TTransportFactory {
+      private final UserGroupInformation ugi;
+      private final TTransportFactory wrapped;
+
+      public TUGIAssumingTransportFactory(TTransportFactory wrapped, UserGroupInformation ugi) {
+        assert wrapped != null;
+        assert ugi != null;
+        this.wrapped = wrapped;
+        this.ugi = ugi;
+      }
+
+
+      @Override
+      public TTransport getTransport(final TTransport trans) {
+        return ugi.doAs(new PrivilegedAction<TTransport>() {
+          @Override
+          public TTransport run() {
+            return wrapped.getTransport(trans);
+          }
+        });
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/e2770d6e/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/HadoopThriftAuthBridge23.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/HadoopThriftAuthBridge23.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/HadoopThriftAuthBridge23.java
new file mode 100644
index 0000000..dc76535
--- /dev/null
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/HadoopThriftAuthBridge23.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.security;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.SaslRpcServer;
+
+/**
+ * Functions that bridge Thrift's SASL transports to Hadoop's SASL callback
+ * handlers and authentication classes.
+ *
+ * This is a 0.23/2.x specific implementation
+ */
+public class HadoopThriftAuthBridge23 extends HadoopThriftAuthBridge {
+
+  private static Field SASL_PROPS_FIELD;
+  private static Class<?> SASL_PROPERTIES_RESOLVER_CLASS;
+  private static Method RES_GET_INSTANCE_METHOD;
+  private static Method GET_DEFAULT_PROP_METHOD;
+  static {
+    SASL_PROPERTIES_RESOLVER_CLASS = null;
+    SASL_PROPS_FIELD = null;
+    final String SASL_PROP_RES_CLASSNAME = "org.apache.hadoop.security.SaslPropertiesResolver";
+    try {
+      SASL_PROPERTIES_RESOLVER_CLASS = Class.forName(SASL_PROP_RES_CLASSNAME);
+
+    } catch (ClassNotFoundException e) {
+    }
+
+    if (SASL_PROPERTIES_RESOLVER_CLASS != null) {
+      // found the class, so this would be hadoop version 2.4 or newer (See
+      // HADOOP-10221, HADOOP-10451)
+      try {
+        RES_GET_INSTANCE_METHOD = SASL_PROPERTIES_RESOLVER_CLASS.getMethod("getInstance",
+            Configuration.class);
+        GET_DEFAULT_PROP_METHOD = SASL_PROPERTIES_RESOLVER_CLASS.getMethod("getDefaultProperties");
+      } catch (Exception e) {
+        // this must be hadoop 2.4 , where getDefaultProperties was protected
+      }
+    }
+
+    if (SASL_PROPERTIES_RESOLVER_CLASS == null || GET_DEFAULT_PROP_METHOD == null) {
+      // this must be a hadoop 2.4 version or earlier.
+      // Resorting to the earlier method of getting the properties, which uses SASL_PROPS field
+      try {
+        SASL_PROPS_FIELD = SaslRpcServer.class.getField("SASL_PROPS");
+      } catch (NoSuchFieldException e) {
+        // Older version of hadoop should have had this field
+        throw new IllegalStateException("Error finding hadoop SASL_PROPS field in "
+            + SaslRpcServer.class.getSimpleName(), e);
+      }
+    }
+  }
+
+  // TODO RIVEN switch this back to package level when we can move TestHadoopAuthBridge23 into
+  // riven.
+  // Package permission so that HadoopThriftAuthBridge can construct it but others cannot.
+  protected HadoopThriftAuthBridge23() {
+
+  }
+
+  /**
+   * Read and return Hadoop SASL configuration which can be configured using
+   * "hadoop.rpc.protection"
+   *
+   * @param conf
+   * @return Hadoop SASL configuration
+   */
+  @SuppressWarnings("unchecked")
+  @Override
+  public Map<String, String> getHadoopSaslProperties(Configuration conf) {
+    if (SASL_PROPS_FIELD != null) {
+      // hadoop 2.4 and earlier way of finding the sasl property settings
+      // Initialize the SaslRpcServer to ensure QOP parameters are read from
+      // conf
+      SaslRpcServer.init(conf);
+      try {
+        return (Map<String, String>) SASL_PROPS_FIELD.get(null);
+      } catch (Exception e) {
+        throw new IllegalStateException("Error finding hadoop SASL properties", e);
+      }
+    }
+    // 2.5 and later way of finding sasl property
+    try {
+      Configurable saslPropertiesResolver = (Configurable) RES_GET_INSTANCE_METHOD.invoke(null,
+          conf);
+      saslPropertiesResolver.setConf(conf);
+      return (Map<String, String>) GET_DEFAULT_PROP_METHOD.invoke(saslPropertiesResolver);
+    } catch (Exception e) {
+      throw new IllegalStateException("Error finding hadoop SASL properties", e);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/e2770d6e/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/MemoryTokenStore.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/MemoryTokenStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/MemoryTokenStore.java
new file mode 100644
index 0000000..c484cd3
--- /dev/null
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/MemoryTokenStore.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.security;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge.Server.ServerMode;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Default in-memory token store implementation.
+ */
+public class MemoryTokenStore implements DelegationTokenStore {
+  private static final Logger LOG = LoggerFactory.getLogger(MemoryTokenStore.class);
+
+  private final Map<Integer, String> masterKeys = new ConcurrentHashMap<>();
+
+  private final ConcurrentHashMap<DelegationTokenIdentifier, DelegationTokenInformation> tokens
+      = new ConcurrentHashMap<>();
+
+  private final AtomicInteger masterKeySeq = new AtomicInteger();
+  private Configuration conf;
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return this.conf;
+  }
+
+  @Override
+  public int addMasterKey(String s) {
+    int keySeq = masterKeySeq.getAndIncrement();
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("addMasterKey: s = " + s + ", keySeq = " + keySeq);
+    }
+    masterKeys.put(keySeq, s);
+    return keySeq;
+  }
+
+  @Override
+  public void updateMasterKey(int keySeq, String s) {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("updateMasterKey: s = " + s + ", keySeq = " + keySeq);
+    }
+    masterKeys.put(keySeq, s);
+  }
+
+  @Override
+  public boolean removeMasterKey(int keySeq) {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("removeMasterKey: keySeq = " + keySeq);
+    }
+    return masterKeys.remove(keySeq) != null;
+  }
+
+  @Override
+  public String[] getMasterKeys() {
+    return masterKeys.values().toArray(new String[0]);
+  }
+
+  @Override
+  public boolean addToken(DelegationTokenIdentifier tokenIdentifier,
+    DelegationTokenInformation token) {
+    DelegationTokenInformation tokenInfo = tokens.putIfAbsent(tokenIdentifier, token);
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("addToken: tokenIdentifier = " + tokenIdentifier + ", added = " + (tokenInfo == null));
+    }
+    return (tokenInfo == null);
+  }
+
+  @Override
+  public boolean removeToken(DelegationTokenIdentifier tokenIdentifier) {
+    DelegationTokenInformation tokenInfo = tokens.remove(tokenIdentifier);
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("removeToken: tokenIdentifier = " + tokenIdentifier + ", removed = " + (tokenInfo != null));
+    }
+    return tokenInfo != null;
+  }
+
+  @Override
+  public DelegationTokenInformation getToken(DelegationTokenIdentifier tokenIdentifier) {
+    DelegationTokenInformation result = tokens.get(tokenIdentifier);
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("getToken: tokenIdentifier = " + tokenIdentifier + ", result = " + result);
+    }
+    return result;
+  }
+
+  @Override
+  public List<DelegationTokenIdentifier> getAllDelegationTokenIdentifiers() {
+    List<DelegationTokenIdentifier> result = new ArrayList<>(
+        tokens.size());
+    for (DelegationTokenIdentifier id : tokens.keySet()) {
+        result.add(id);
+    }
+    return result;
+  }
+
+  @Override
+  public void close() throws IOException {
+    //no-op
+  }
+
+  @Override
+  public void init(Object hmsHandler, ServerMode smode) throws TokenStoreException {
+    // no-op
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/e2770d6e/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/MetastoreDelegationTokenManager.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/MetastoreDelegationTokenManager.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/MetastoreDelegationTokenManager.java
new file mode 100644
index 0000000..2b0110f
--- /dev/null
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/MetastoreDelegationTokenManager.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.hadoop.hive.metastore.security;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.ProxyUsers;
+import org.apache.hadoop.util.ReflectionUtils;
+
+public class MetastoreDelegationTokenManager {
+
+  protected DelegationTokenSecretManager secretManager;
+
+  public MetastoreDelegationTokenManager() {
+  }
+
+  public DelegationTokenSecretManager getSecretManager() {
+    return secretManager;
+  }
+
+  public void startDelegationTokenSecretManager(Configuration conf, Object hms, HadoopThriftAuthBridge.Server.ServerMode smode)
+      throws IOException {
+    long secretKeyInterval = MetastoreConf.getTimeVar(conf,
+        MetastoreConf.ConfVars.DELEGATION_KEY_UPDATE_INTERVAL, TimeUnit.MILLISECONDS);
+    long tokenMaxLifetime = MetastoreConf.getTimeVar(conf,
+        MetastoreConf.ConfVars.DELEGATION_TOKEN_MAX_LIFETIME, TimeUnit.MILLISECONDS);
+    long tokenRenewInterval = MetastoreConf.getTimeVar(conf,
+        MetastoreConf.ConfVars.DELEGATION_TOKEN_RENEW_INTERVAL, TimeUnit.MILLISECONDS);
+    long tokenGcInterval = MetastoreConf.getTimeVar(conf,
+        MetastoreConf.ConfVars.DELEGATION_TOKEN_GC_INTERVAL, TimeUnit.MILLISECONDS);
+
+    DelegationTokenStore dts = getTokenStore(conf);
+    dts.setConf(conf);
+    dts.init(hms, smode);
+    secretManager =
+        new TokenStoreDelegationTokenSecretManager(secretKeyInterval, tokenMaxLifetime,
+            tokenRenewInterval, tokenGcInterval, dts);
+    secretManager.startThreads();
+  }
+
+  public String getDelegationToken(final String owner, final String renewer, String remoteAddr)
+      throws IOException,
+      InterruptedException {
+    /*
+     * If the user asking the token is same as the 'owner' then don't do
+     * any proxy authorization checks. For cases like oozie, where it gets
+     * a delegation token for another user, we need to make sure oozie is
+     * authorized to get a delegation token.
+     */
+    // Do all checks on short names
+    UserGroupInformation currUser = UserGroupInformation.getCurrentUser();
+    UserGroupInformation ownerUgi = UserGroupInformation.createRemoteUser(owner);
+    if (!ownerUgi.getShortUserName().equals(currUser.getShortUserName())) {
+      // in the case of proxy users, the getCurrentUser will return the
+      // real user (for e.g. oozie) due to the doAs that happened just before the
+      // server started executing the method getDelegationToken in the MetaStore
+      ownerUgi = UserGroupInformation.createProxyUser(owner, UserGroupInformation.getCurrentUser());
+      ProxyUsers.authorize(ownerUgi, remoteAddr, null);
+    }
+    return ownerUgi.doAs(new PrivilegedExceptionAction<String>() {
+
+      @Override
+      public String run() throws IOException {
+        return secretManager.getDelegationToken(renewer);
+      }
+    });
+  }
+
+  public String getDelegationTokenWithService(String owner, String renewer, String service, String remoteAddr)
+      throws IOException, InterruptedException {
+    String token = getDelegationToken(owner, renewer, remoteAddr);
+    return addServiceToToken(token, service);
+  }
+
+  public long renewDelegationToken(String tokenStrForm)
+      throws IOException {
+    return secretManager.renewDelegationToken(tokenStrForm);
+  }
+
+  public String getUserFromToken(String tokenStr) throws IOException {
+    return secretManager.getUserFromToken(tokenStr);
+  }
+
+  public void cancelDelegationToken(String tokenStrForm) throws IOException {
+    secretManager.cancelDelegationToken(tokenStrForm);
+  }
+
+  /**
+   * Verify token string
+   * @param tokenStrForm
+   * @return user name
+   * @throws IOException
+   */
+  public String verifyDelegationToken(String tokenStrForm) throws IOException {
+    return secretManager.verifyDelegationToken(tokenStrForm);
+  }
+
+  private DelegationTokenStore getTokenStore(Configuration conf) throws IOException {
+    String tokenStoreClassName =
+        MetastoreConf.getVar(conf, MetastoreConf.ConfVars.DELEGATION_TOKEN_STORE_CLS, "");
+    // The second half of this if is to catch cases where users are passing in a HiveConf for
+    // configuration.  It will have set the default value of
+    // "hive.cluster.delegation.token.store .class" to
+    // "org.apache.hadoop.hive.thrift.MemoryTokenStore" as part of its construction.  But this is
+    // the hive-shims version of the memory store.  We want to convert this to our default value.
+    if (StringUtils.isBlank(tokenStoreClassName) ||
+        "org.apache.hadoop.hive.thrift.MemoryTokenStore".equals(tokenStoreClassName)) {
+      return new MemoryTokenStore();
+    }
+    try {
+      Class<? extends DelegationTokenStore> storeClass =
+          Class.forName(tokenStoreClassName).asSubclass(DelegationTokenStore.class);
+      return ReflectionUtils.newInstance(storeClass, conf);
+    } catch (ClassNotFoundException e) {
+      throw new IOException("Error initializing delegation token store: " + tokenStoreClassName, e);
+    }
+  }
+
+  /**
+   * Add a given service to delegation token string.
+   * @param tokenStr
+   * @param tokenService
+   * @return
+   * @throws IOException
+   */
+  public static String addServiceToToken(String tokenStr, String tokenService)
+      throws IOException {
+    Token<DelegationTokenIdentifier> delegationToken = createToken(tokenStr, tokenService);
+    return delegationToken.encodeToUrlString();
+  }
+
+  /**
+   * Create a new token using the given string and service
+   * @param tokenStr
+   * @param tokenService
+   * @return
+   * @throws IOException
+   */
+  private static Token<DelegationTokenIdentifier> createToken(String tokenStr, String tokenService)
+      throws IOException {
+    Token<DelegationTokenIdentifier> delegationToken = new Token<>();
+    delegationToken.decodeFromUrlString(tokenStr);
+    delegationToken.setService(new Text(tokenService));
+    return delegationToken;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/e2770d6e/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/TFilterTransport.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/TFilterTransport.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/TFilterTransport.java
new file mode 100644
index 0000000..79d317a
--- /dev/null
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/TFilterTransport.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.security;
+
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+
+/**
+  * Transport that simply wraps another transport.
+  * This is the equivalent of FilterInputStream for Thrift transports.
+  */
+ public class TFilterTransport extends TTransport {
+   protected final TTransport wrapped;
+
+   public TFilterTransport(TTransport wrapped) {
+     this.wrapped = wrapped;
+   }
+
+   @Override
+   public void open() throws TTransportException {
+     wrapped.open();
+   }
+
+   @Override
+   public boolean isOpen() {
+     return wrapped.isOpen();
+   }
+
+   @Override
+   public boolean peek() {
+     return wrapped.peek();
+   }
+
+   @Override
+   public void close() {
+     wrapped.close();
+   }
+
+   @Override
+   public int read(byte[] buf, int off, int len) throws TTransportException {
+     return wrapped.read(buf, off, len);
+   }
+
+   @Override
+   public int readAll(byte[] buf, int off, int len) throws TTransportException {
+     return wrapped.readAll(buf, off, len);
+   }
+
+   @Override
+   public void write(byte[] buf) throws TTransportException {
+     wrapped.write(buf);
+   }
+
+   @Override
+   public void write(byte[] buf, int off, int len) throws TTransportException {
+     wrapped.write(buf, off, len);
+   }
+
+   @Override
+   public void flush() throws TTransportException {
+     wrapped.flush();
+   }
+
+   @Override
+   public byte[] getBuffer() {
+     return wrapped.getBuffer();
+   }
+
+   @Override
+   public int getBufferPosition() {
+     return wrapped.getBufferPosition();
+   }
+
+   @Override
+   public int getBytesRemainingInBuffer() {
+     return wrapped.getBytesRemainingInBuffer();
+   }
+
+   @Override
+   public void consumeBuffer(int len) {
+     wrapped.consumeBuffer(len);
+   }
+ }

http://git-wip-us.apache.org/repos/asf/hive/blob/e2770d6e/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/TUGIAssumingTransport.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/TUGIAssumingTransport.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/TUGIAssumingTransport.java
new file mode 100644
index 0000000..38a946e
--- /dev/null
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/TUGIAssumingTransport.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.security;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+
+/**
+  * The Thrift SASL transports call Sasl.createSaslServer and Sasl.createSaslClient
+  * inside open(). So, we need to assume the correct UGI when the transport is opened
+  * so that the SASL mechanisms have access to the right principal. This transport
+  * wraps the Sasl transports to set up the right UGI context for open().
+  *
+  * This is used on the client side, where the API explicitly opens a transport to
+  * the server.
+  */
+ public class TUGIAssumingTransport extends TFilterTransport {
+   protected UserGroupInformation ugi;
+
+   public TUGIAssumingTransport(TTransport wrapped, UserGroupInformation ugi) {
+     super(wrapped);
+     this.ugi = ugi;
+   }
+
+   @Override
+   public void open() throws TTransportException {
+     try {
+       ugi.doAs(new PrivilegedExceptionAction<Void>() {
+         public Void run() {
+           try {
+             wrapped.open();
+           } catch (TTransportException tte) {
+             // Wrap the transport exception in an RTE, since UGI.doAs() then goes
+             // and unwraps this for us out of the doAs block. We then unwrap one
+             // more time in our catch clause to get back the TTE. (ugh)
+             throw new RuntimeException(tte);
+           }
+           return null;
+         }
+       });
+     } catch (IOException ioe) {
+       throw new RuntimeException("Received an ioe we never threw!", ioe);
+     } catch (InterruptedException ie) {
+       throw new RuntimeException("Received an ie we never threw!", ie);
+     } catch (RuntimeException rte) {
+       if (rte.getCause() instanceof TTransportException) {
+         throw (TTransportException)rte.getCause();
+       } else {
+         throw rte;
+       }
+     }
+   }
+ }

http://git-wip-us.apache.org/repos/asf/hive/blob/e2770d6e/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/TUGIContainingTransport.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/TUGIContainingTransport.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/TUGIContainingTransport.java
new file mode 100644
index 0000000..acfe949
--- /dev/null
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/TUGIContainingTransport.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.security;
+
+import java.net.Socket;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportFactory;
+
+import com.google.common.collect.MapMaker;
+
+/** TUGIContainingTransport associates ugi information with connection (transport).
+ *  Wraps underlying <code>TSocket</code> transport and annotates it with ugi.
+*/
+
+public class TUGIContainingTransport extends TFilterTransport {
+
+  private UserGroupInformation ugi;
+
+  public TUGIContainingTransport(TTransport wrapped) {
+    super(wrapped);
+  }
+
+  public UserGroupInformation getClientUGI(){
+    return ugi;
+  }
+
+  public void setClientUGI(UserGroupInformation ugi){
+    this.ugi = ugi;
+  }
+
+  /**
+   * If the underlying TTransport is an instance of TSocket, it returns the Socket object
+   * which it contains.  Otherwise it returns null.
+   */
+  public Socket getSocket() {
+    if (wrapped instanceof TSocket) {
+      return (((TSocket)wrapped).getSocket());
+    }
+
+    return null;
+  }
+
+  /** Factory to create TUGIContainingTransport.
+   */
+
+  public static class Factory extends TTransportFactory {
+
+    // Need a concurrent weakhashmap. WeakKeys() so that when underlying transport gets out of
+    // scope, it still can be GC'ed. Since value of map has a ref to key, need weekValues as well.
+    private static final ConcurrentMap<TTransport, TUGIContainingTransport> transMap =
+        new MapMaker().weakKeys().weakValues().makeMap();
+
+    /**
+     * Get a new <code>TUGIContainingTransport</code> instance, or reuse the
+     * existing one if a <code>TUGIContainingTransport</code> has already been
+     * created before using the given <code>TTransport</code> as an underlying
+     * transport. This ensures that a given underlying transport instance
+     * receives the same <code>TUGIContainingTransport</code>.
+     */
+    @Override
+    public TUGIContainingTransport getTransport(TTransport trans) {
+
+      // UGI information is not available at connection setup time, it will be set later
+      // via set_ugi() rpc.
+      TUGIContainingTransport tugiTrans = transMap.get(trans);
+      if (tugiTrans == null) {
+        tugiTrans = new TUGIContainingTransport(trans);
+        TUGIContainingTransport prev = transMap.putIfAbsent(trans, tugiTrans);
+        if (prev != null) {
+          return prev;
+        }
+      }
+      return tugiTrans;
+    }
+  }
+}