Posted to commits@hive.apache.org by ga...@apache.org on 2017/08/18 23:01:04 UTC
[2/3] hive git commit: HIVE-17241 Change metastore classes to not use
the shims. This closes #228. (Alan Gates, reviewed by Vaibhav Gumashta)
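The mechanical change throughout is the same: metastore classes stop resolving security helpers through the Hadoop shim layer and instead call the relocated classes in org.apache.hadoop.hive.metastore.security directly. A minimal before/after sketch of the pattern (both calls appear verbatim in the hunks below):

    // Before: bridge resolved via the shim layer
    HadoopThriftAuthBridge bridge = ShimLoader.getHadoopThriftAuthBridge();

    // After: the bridge manages its own singleton (see getBridge() further down)
    HadoopThriftAuthBridge bridge = HadoopThriftAuthBridge.getBridge();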
http://git-wip-us.apache.org/repos/asf/hive/blob/e2770d6e/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java
index 6a54306..bbe13fd 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java
@@ -65,6 +65,8 @@ import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.metastore.api.Decimal;
import org.apache.hadoop.hive.metastore.api.Order;
import org.apache.hadoop.hive.metastore.api.SkewedInfo;
+import org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge;
+import org.apache.hadoop.hive.shims.ShimLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
@@ -108,8 +110,6 @@ import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
-import org.apache.hadoop.hive.shims.ShimLoader;
-import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge;
import org.apache.hadoop.security.SaslRpcServer;
import org.apache.hive.common.util.HiveStringUtils;
import org.apache.hive.common.util.ReflectionUtil;
@@ -1258,7 +1258,7 @@ public class MetaStoreUtils {
}
public static int startMetaStore() throws Exception {
- return startMetaStore(ShimLoader.getHadoopThriftAuthBridge(), null);
+ return startMetaStore(HadoopThriftAuthBridge.getBridge(), null);
}
public static int startMetaStore(final HadoopThriftAuthBridge bridge, HiveConf conf) throws Exception {
@@ -1268,7 +1268,7 @@ public class MetaStoreUtils {
}
public static int startMetaStore(HiveConf conf) throws Exception {
- return startMetaStore(ShimLoader.getHadoopThriftAuthBridge(), conf);
+ return startMetaStore(HadoopThriftAuthBridge.getBridge(), conf);
}
public static void startMetaStore(final int port, final HadoopThriftAuthBridge bridge) throws Exception {
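With the shim indirection gone, test or embedding code can start a metastore directly. A sketch, assuming MetaStoreUtils is on the classpath (the returned int is the port, judging by the port-taking overload above):

    // imports assumed: org.apache.hadoop.hive.conf.HiveConf,
    //                  org.apache.hadoop.hive.metastore.MetaStoreUtils
    HiveConf conf = new HiveConf();
    int port = MetaStoreUtils.startMetaStore(conf); // uses HadoopThriftAuthBridge.getBridge() internally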
http://git-wip-us.apache.org/repos/asf/hive/blob/e2770d6e/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
index 897fc4e..b878115 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
@@ -121,6 +121,7 @@ import org.apache.hadoop.hive.metastore.api.Type;
import org.apache.hadoop.hive.metastore.api.UnknownDBException;
import org.apache.hadoop.hive.metastore.api.UnknownPartitionException;
import org.apache.hadoop.hive.metastore.api.UnknownTableException;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.model.MColumnDescriptor;
import org.apache.hadoop.hive.metastore.model.MConstraint;
import org.apache.hadoop.hive.metastore.model.MDBPrivilege;
@@ -157,7 +158,6 @@ import org.apache.hadoop.hive.metastore.parser.ExpressionTree.FilterBuilder;
import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
-import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.util.StringUtils;
import org.apache.hive.common.util.HiveStringUtils;
import org.apache.thrift.TException;
@@ -493,8 +493,7 @@ public class ObjectStore implements RawStore, Configurable {
}
// Password may no longer be in the conf, use getPassword()
try {
- String passwd =
- ShimLoader.getHadoopShims().getPassword(conf, HiveConf.ConfVars.METASTOREPWD.varname);
+ String passwd = MetastoreConf.getPassword(conf, MetastoreConf.ConfVars.PWD);
if (passwd != null && !passwd.isEmpty()) {
prop.setProperty(HiveConf.ConfVars.METASTOREPWD.varname, passwd);
}
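MetastoreConf.getPassword(conf, MetastoreConf.ConfVars.PWD) replaces the shim call here and again in TxnDbUtil and TxnHandler below. A hedged sketch of the calling convention (the IOException is inferred from the TxnHandler hunk, which wraps it in SQLException):

    String passwd;
    try {
      passwd = MetastoreConf.getPassword(conf, MetastoreConf.ConfVars.PWD);
    } catch (IOException e) {
      throw new RuntimeException("Error getting metastore password", e);
    }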
http://git-wip-us.apache.org/repos/asf/hive/blob/e2770d6e/metastore/src/java/org/apache/hadoop/hive/metastore/TUGIBasedProcessor.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/TUGIBasedProcessor.java b/metastore/src/java/org/apache/hadoop/hive/metastore/TUGIBasedProcessor.java
index 89f4701..64f0b96 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/TUGIBasedProcessor.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/TUGIBasedProcessor.java
@@ -25,13 +25,13 @@ import java.security.PrivilegedExceptionAction;
import java.util.List;
import java.util.Map;
+import org.apache.hadoop.hive.metastore.security.TUGIContainingTransport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore.Iface;
import org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore.set_ugi_args;
import org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore.set_ugi_result;
-import org.apache.hadoop.hive.thrift.TUGIContainingTransport;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.thrift.ProcessFunction;
import org.apache.thrift.TApplicationException;
http://git-wip-us.apache.org/repos/asf/hive/blob/e2770d6e/metastore/src/java/org/apache/hadoop/hive/metastore/Warehouse.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/Warehouse.java b/metastore/src/java/org/apache/hadoop/hive/metastore/Warehouse.java
index 778550b..1dd50de 100755
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/Warehouse.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/Warehouse.java
@@ -33,6 +33,7 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.hive.metastore.utils.HdfsUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
@@ -52,7 +53,6 @@ import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.util.ReflectionUtils;
/**
@@ -252,7 +252,7 @@ public class Warehouse {
try {
fs = getFs(path);
stat = fs.getFileStatus(path);
- ShimLoader.getHadoopShims().checkFileAccess(fs, stat, FsAction.WRITE);
+ HdfsUtils.checkFileAccess(fs, stat, FsAction.WRITE);
return true;
} catch (FileNotFoundException fnfe){
// File named by path doesn't exist; nothing to validate.
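A sketch of the new call in isolation, assuming HdfsUtils.checkFileAccess signals a denied action by throwing (which is why isWritable above treats a normal return as success):

    // imports assumed: org.apache.hadoop.fs.FileSystem, org.apache.hadoop.fs.FileStatus,
    //                  org.apache.hadoop.fs.Path, org.apache.hadoop.fs.permission.FsAction
    FileSystem fs = path.getFileSystem(conf);
    FileStatus stat = fs.getFileStatus(path);
    HdfsUtils.checkFileAccess(fs, stat, FsAction.WRITE); // throws if WRITE is denied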
http://git-wip-us.apache.org/repos/asf/hive/blob/e2770d6e/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
index f72c379..0161894 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
@@ -28,10 +28,10 @@ import java.sql.Statement;
import java.util.Properties;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.shims.ShimLoader;
/**
* Utility methods for creating and destroying txn database/schema, plus methods for
@@ -328,8 +328,7 @@ public final class TxnDbUtil {
Properties prop = new Properties();
String driverUrl = HiveConf.getVar(conf, HiveConf.ConfVars.METASTORECONNECTURLKEY);
String user = HiveConf.getVar(conf, HiveConf.ConfVars.METASTORE_CONNECTION_USER_NAME);
- String passwd =
- ShimLoader.getHadoopShims().getPassword(conf, HiveConf.ConfVars.METASTOREPWD.varname);
+ String passwd = MetastoreConf.getPassword(conf, MetastoreConf.ConfVars.PWD);
prop.setProperty("user", user);
prop.setProperty("password", passwd);
Connection conn = driver.connect(driverUrl, prop);
http://git-wip-us.apache.org/repos/asf/hive/blob/e2770d6e/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index b722af6..f3968e4 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hive.common.classification.RetrySemantics;
import org.apache.hadoop.hive.metastore.DatabaseProduct;
import org.apache.hadoop.hive.metastore.HouseKeeperService;
import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.dbcp.PoolingDataSource;
@@ -45,7 +46,6 @@ import org.apache.hadoop.hive.common.StringableMap;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConfUtil;
import org.apache.hadoop.hive.metastore.api.*;
-import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.util.StringUtils;
import javax.sql.DataSource;
@@ -3701,8 +3701,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
private static String getMetastoreJdbcPasswd(HiveConf conf) throws SQLException {
try {
- return ShimLoader.getHadoopShims().getPassword(conf,
- HiveConf.ConfVars.METASTOREPWD.varname);
+ return MetastoreConf.getPassword(conf, MetastoreConf.ConfVars.PWD);
} catch (IOException err) {
throw new SQLException("Error getting metastore password", err);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/e2770d6e/standalone-metastore/pom.xml
----------------------------------------------------------------------
diff --git a/standalone-metastore/pom.xml b/standalone-metastore/pom.xml
index b8826c6..2e0c51e 100644
--- a/standalone-metastore/pom.xml
+++ b/standalone-metastore/pom.xml
@@ -53,6 +53,22 @@
</exclusion>
</exclusions>
</dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ <version>${hadoop.version}</version>
+ <optional>true</optional>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
<!-- This is our one and only Hive dependency.-->
<dependency>
<groupId>org.apache.hive</groupId>
http://git-wip-us.apache.org/repos/asf/hive/blob/e2770d6e/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
index d8ec1d9..0fb878a 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hive.metastore.conf;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.security.MetastoreDelegationTokenManager;
import org.apache.hadoop.hive.metastore.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -361,6 +362,17 @@ public class MetastoreConf {
"The default partition name in case the dynamic partition column value is null/empty string or any other values that cannot be escaped. \n" +
"This value must not contain any special character used in HDFS URI (e.g., ':', '%', '/' etc). \n" +
"The user has to be aware that the dynamic partition value should not contain this value to avoid confusions."),
+ DELEGATION_KEY_UPDATE_INTERVAL("metastore.cluster.delegation.key.update-interval",
+ "hive.cluster.delegation.key.update-interval", 1, TimeUnit.DAYS, ""),
+ DELEGATION_TOKEN_GC_INTERVAL("metastore.cluster.delegation.token.gc-interval",
+ "hive.cluster.delegation.token.gc-interval", 1, TimeUnit.HOURS, ""),
+ DELEGATION_TOKEN_MAX_LIFETIME("metastore.cluster.delegation.token.max-lifetime",
+ "hive.cluster.delegation.token.max-lifetime", 7, TimeUnit.DAYS, ""),
+ DELEGATION_TOKEN_RENEW_INTERVAL("metastore.cluster.delegation.token.renew-interval",
+ "hive.cluster.delegation.token.renew-interval", 1, TimeUnit.DAYS, ""),
+ DELEGATION_TOKEN_STORE_CLS("metastore.cluster.delegation.token.store.class",
+ "hive.cluster.delegation.token.store.class", MetastoreDelegationTokenManager.class.getName(),
+ "Class to store delegation tokens"),
DETACH_ALL_ON_COMMIT("javax.jdo.option.DetachAllOnCommit",
"javax.jdo.option.DetachAllOnCommit", true,
"Detaches all objects from session so that they can be used after transaction is committed"),
http://git-wip-us.apache.org/repos/asf/hive/blob/e2770d6e/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/DelegationTokenIdentifier.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/DelegationTokenIdentifier.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/DelegationTokenIdentifier.java
new file mode 100644
index 0000000..ba6c7e3
--- /dev/null
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/DelegationTokenIdentifier.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.security;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
+
+/**
+ * A delegation token identifier that is specific to Hive.
+ */
+public class DelegationTokenIdentifier
+ extends AbstractDelegationTokenIdentifier {
+ public static final Text HIVE_DELEGATION_KIND = new Text("HIVE_DELEGATION_TOKEN");
+
+ /**
+ * Create an empty delegation token identifier for reading into.
+ */
+ public DelegationTokenIdentifier() {
+ }
+
+ /**
+ * Create a new delegation token identifier
+ * @param owner the effective username of the token owner
+ * @param renewer the username of the renewer
+ * @param realUser the real username of the token owner
+ */
+ public DelegationTokenIdentifier(Text owner, Text renewer, Text realUser) {
+ super(owner, renewer, realUser);
+ }
+
+ @Override
+ public Text getKind() {
+ return HIVE_DELEGATION_KIND;
+ }
+
+}
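A quick sketch of constructing the identifier (Text is org.apache.hadoop.io.Text; the serialization plumbing comes from AbstractDelegationTokenIdentifier):

    DelegationTokenIdentifier id =
        new DelegationTokenIdentifier(new Text("alice"), new Text("metastore"), null);
    assert DelegationTokenIdentifier.HIVE_DELEGATION_KIND.equals(id.getKind());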
http://git-wip-us.apache.org/repos/asf/hive/blob/e2770d6e/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/DelegationTokenSecretManager.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/DelegationTokenSecretManager.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/DelegationTokenSecretManager.java
new file mode 100644
index 0000000..aae96a5
--- /dev/null
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/DelegationTokenSecretManager.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.security;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
+
+/**
+ * A Hive specific delegation token secret manager.
+ * The secret manager is responsible for generating and accepting the password
+ * for each token.
+ */
+public class DelegationTokenSecretManager
+ extends AbstractDelegationTokenSecretManager<DelegationTokenIdentifier> {
+
+ /**
+ * Create a secret manager
+ * @param delegationKeyUpdateInterval the number of seconds for rolling new
+ * secret keys.
+ * @param delegationTokenMaxLifetime the maximum lifetime of the delegation
+ * tokens
+ * @param delegationTokenRenewInterval how often the tokens must be renewed
+ * @param delegationTokenRemoverScanInterval how often the tokens are scanned
+ * for expired tokens
+ */
+ public DelegationTokenSecretManager(long delegationKeyUpdateInterval,
+ long delegationTokenMaxLifetime,
+ long delegationTokenRenewInterval,
+ long delegationTokenRemoverScanInterval) {
+ super(delegationKeyUpdateInterval, delegationTokenMaxLifetime,
+ delegationTokenRenewInterval, delegationTokenRemoverScanInterval);
+ }
+
+ @Override
+ public DelegationTokenIdentifier createIdentifier() {
+ return new DelegationTokenIdentifier();
+ }
+
+ /**
+ * Verify token string
+ * @param tokenStrForm
+ * @return user name
+ * @throws IOException
+ */
+ public synchronized String verifyDelegationToken(String tokenStrForm) throws IOException {
+ Token<DelegationTokenIdentifier> t = new Token<>();
+ t.decodeFromUrlString(tokenStrForm);
+
+ DelegationTokenIdentifier id = getTokenIdentifier(t);
+ verifyToken(id, t.getPassword());
+ return id.getUser().getShortUserName();
+ }
+
+ protected DelegationTokenIdentifier getTokenIdentifier(Token<DelegationTokenIdentifier> token)
+ throws IOException {
+ // turn bytes back into identifier for cache lookup
+ ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier());
+ DataInputStream in = new DataInputStream(buf);
+ DelegationTokenIdentifier id = createIdentifier();
+ id.readFields(in);
+ return id;
+ }
+
+ public synchronized void cancelDelegationToken(String tokenStrForm) throws IOException {
+ Token<DelegationTokenIdentifier> t= new Token<>();
+ t.decodeFromUrlString(tokenStrForm);
+ String user = UserGroupInformation.getCurrentUser().getUserName();
+ cancelToken(t, user);
+ }
+
+ public synchronized long renewDelegationToken(String tokenStrForm) throws IOException {
+ Token<DelegationTokenIdentifier> t= new Token<>();
+ t.decodeFromUrlString(tokenStrForm);
+ String user = UserGroupInformation.getCurrentUser().getUserName();
+ return renewToken(t, user);
+ }
+
+ public synchronized String getDelegationToken(String renewer) throws IOException {
+ UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+ Text owner = new Text(ugi.getUserName());
+ Text realUser = null;
+ if (ugi.getRealUser() != null) {
+ realUser = new Text(ugi.getRealUser().getUserName());
+ }
+ DelegationTokenIdentifier ident =
+ new DelegationTokenIdentifier(owner, new Text(renewer), realUser);
+ Token<DelegationTokenIdentifier> t = new Token<>(
+ ident, this);
+ return t.encodeToUrlString();
+ }
+
+ public String getUserFromToken(String tokenStr) throws IOException {
+ Token<DelegationTokenIdentifier> delegationToken = new Token<>();
+ delegationToken.decodeFromUrlString(tokenStr);
+
+ ByteArrayInputStream buf = new ByteArrayInputStream(delegationToken.getIdentifier());
+ DataInputStream in = new DataInputStream(buf);
+ DelegationTokenIdentifier id = createIdentifier();
+ id.readFields(in);
+ return id.getUser().getShortUserName();
+ }
+}
+
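A sketch of the token lifecycle this class exposes. The interval values are illustrative and in milliseconds; startThreads()/stopThreads() come from Hadoop's AbstractDelegationTokenSecretManager and are assumptions here:

    DelegationTokenSecretManager mgr = new DelegationTokenSecretManager(
        24L * 60 * 60 * 1000,     // key update interval
        7L * 24 * 60 * 60 * 1000, // max token lifetime
        24L * 60 * 60 * 1000,     // renew interval
        60L * 60 * 1000);         // expired-token scan interval
    mgr.startThreads();                                   // assumed, from the Hadoop base class
    String tokenStr = mgr.getDelegationToken("renewer");  // issued for the current UGI user
    String owner = mgr.verifyDelegationToken(tokenStr);   // short name of the token owner
    mgr.renewDelegationToken(tokenStr);
    mgr.cancelDelegationToken(tokenStr);
    mgr.stopThreads();                                    // assumed, from the Hadoop base class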
http://git-wip-us.apache.org/repos/asf/hive/blob/e2770d6e/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/DelegationTokenStore.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/DelegationTokenStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/DelegationTokenStore.java
new file mode 100644
index 0000000..0cafeff
--- /dev/null
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/DelegationTokenStore.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.security;
+
+import java.io.Closeable;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge.Server.ServerMode;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation;
+
+/**
+ * Interface for pluggable token store that can be implemented with shared external
+ * storage for load balancing and high availability (for example using ZooKeeper).
+ * Internal, store specific errors are translated into {@link TokenStoreException}.
+ */
+public interface DelegationTokenStore extends Configurable, Closeable {
+
+ /**
+ * Exception for internal token store errors that typically cannot be handled by the caller.
+ */
+ class TokenStoreException extends RuntimeException {
+ private static final long serialVersionUID = -8693819817623074083L;
+
+ public TokenStoreException(Throwable cause) {
+ super(cause);
+ }
+
+ public TokenStoreException(String message, Throwable cause) {
+ super(message, cause);
+ }
+ }
+
+ /**
+ * Add new master key. The token store assigns and returns the sequence number.
+ * Caller needs to use the identifier to update the key (since it is embedded in the key).
+ *
+ * @param s
+ * @return sequence number for new key
+ */
+ int addMasterKey(String s) throws TokenStoreException;
+
+ /**
+ * Update master key (for expiration and setting store assigned sequence within key)
+ * @param keySeq
+ * @param s
+ * @throws TokenStoreException
+ */
+ void updateMasterKey(int keySeq, String s) throws TokenStoreException;
+
+ /**
+ * Remove key for given id.
+ * @param keySeq
+ * @return false if key no longer present, true otherwise.
+ */
+ boolean removeMasterKey(int keySeq);
+
+ /**
+ * Return all master keys.
+ * @return
+ * @throws TokenStoreException
+ */
+ String[] getMasterKeys() throws TokenStoreException;
+
+ /**
+ * Add token. If identifier is already present, token won't be added.
+ * @param tokenIdentifier
+ * @param token
+ * @return true if token was added, false for existing identifier
+ */
+ boolean addToken(DelegationTokenIdentifier tokenIdentifier,
+ DelegationTokenInformation token) throws TokenStoreException;
+
+ /**
+ * Get token. Returns null if the token does not exist.
+ * @param tokenIdentifier
+ * @return
+ */
+ DelegationTokenInformation getToken(DelegationTokenIdentifier tokenIdentifier)
+ throws TokenStoreException;
+
+ /**
+ * Remove token. Return value can be used by caller to detect concurrency.
+ * @param tokenIdentifier
+ * @return true if token was removed, false if it was already removed.
+ * @throws TokenStoreException
+ */
+ boolean removeToken(DelegationTokenIdentifier tokenIdentifier) throws TokenStoreException;
+
+ /**
+ * List of all token identifiers in the store. This is used to remove expired tokens;
+ * a potential scalability improvement would be to partition by master key id.
+ * @return
+ */
+ List<DelegationTokenIdentifier> getAllDelegationTokenIdentifiers() throws TokenStoreException;
+
+ /**
+ * @param hmsHandler ObjectStore used by DBTokenStore
+ * @param smode Indicate whether this is a metastore or hiveserver2 token store
+ */
+ void init(Object hmsHandler, ServerMode smode);
+
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/e2770d6e/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/HadoopThriftAuthBridge.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/HadoopThriftAuthBridge.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/HadoopThriftAuthBridge.java
new file mode 100644
index 0000000..3f02ffd
--- /dev/null
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/HadoopThriftAuthBridge.java
@@ -0,0 +1,684 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.security;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.Socket;
+import java.net.UnknownHostException;
+import java.security.PrivilegedAction;
+import java.security.PrivilegedExceptionAction;
+import java.util.Locale;
+import java.util.Map;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.sasl.AuthorizeCallback;
+import javax.security.sasl.RealmCallback;
+import javax.security.sasl.RealmChoiceCallback;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+
+import org.apache.commons.codec.binary.Base64;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.security.SaslRpcServer;
+import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.thrift.TException;
+import org.apache.thrift.TProcessor;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TSaslClientTransport;
+import org.apache.thrift.transport.TSaslServerTransport;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.apache.thrift.transport.TTransportFactory;
+
+/**
+ * Functions that bridge Thrift's SASL transports to Hadoop's
+ * SASL callback handlers and authentication classes.
+ * HIVE-11378 This class is no longer used directly. It now exists only as a shell to be
+ * extended by HadoopThriftAuthBridge23 (formerly in the 0.23 shims, now in this package).
+ * It is abstract to avoid maintenance errors.
+ */
+public abstract class HadoopThriftAuthBridge {
+ private static final Logger LOG = LoggerFactory.getLogger(HadoopThriftAuthBridge.class);
+
+ // We want to have only one auth bridge. In the past this was handled by ShimLoader, but since
+ // we're no longer using that we'll do it here.
+ private static HadoopThriftAuthBridge self = null;
+
+ public static HadoopThriftAuthBridge getBridge() {
+ if (self == null) {
+ synchronized (HadoopThriftAuthBridge.class) {
+ if (self == null) self = new HadoopThriftAuthBridge23();
+ }
+ }
+ return self;
+ }
+
+ public Client createClient() {
+ return new Client();
+ }
+
+ public Client createClientWithConf(String authMethod) {
+ UserGroupInformation ugi;
+ try {
+ ugi = UserGroupInformation.getLoginUser();
+ } catch(IOException e) {
+ throw new IllegalStateException("Unable to get current login user: " + e, e);
+ }
+ if (loginUserHasCurrentAuthMethod(ugi, authMethod)) {
+ LOG.debug("Not setting UGI conf as passed-in authMethod of " + authMethod + " = current.");
+ return new Client();
+ } else {
+ LOG.debug("Setting UGI conf as passed-in authMethod of " + authMethod + " != current.");
+ Configuration conf = new Configuration();
+ conf.set(HADOOP_SECURITY_AUTHENTICATION, authMethod);
+ UserGroupInformation.setConfiguration(conf);
+ return new Client();
+ }
+ }
+
+ public Server createServer(String keytabFile, String principalConf) throws TTransportException {
+ return new Server(keytabFile, principalConf);
+ }
+
+
+ public String getServerPrincipal(String principalConfig, String host)
+ throws IOException {
+ String serverPrincipal = SecurityUtil.getServerPrincipal(principalConfig, host);
+ String names[] = SaslRpcServer.splitKerberosName(serverPrincipal);
+ if (names.length != 3) {
+ throw new IOException(
+ "Kerberos principal name does NOT have the expected hostname part: "
+ + serverPrincipal);
+ }
+ return serverPrincipal;
+ }
+
+ /**
+ * Method to get canonical-ized hostname, given a hostname (possibly a CNAME).
+ * This should allow for service-principals to use simplified CNAMEs.
+ * @param hostName The hostname to be canonical-ized.
+ * @return Given a CNAME, the canonical-ized hostname is returned. If not found, the original hostname is returned.
+ */
+ public String getCanonicalHostName(String hostName) {
+ try {
+ return InetAddress.getByName(hostName).getCanonicalHostName();
+ }
+ catch(UnknownHostException exception) {
+ LOG.warn("Could not retrieve canonical hostname for " + hostName, exception);
+ return hostName;
+ }
+ }
+
+ public UserGroupInformation getCurrentUGIWithConf(String authMethod)
+ throws IOException {
+ UserGroupInformation ugi;
+ try {
+ ugi = UserGroupInformation.getCurrentUser();
+ } catch(IOException e) {
+ throw new IllegalStateException("Unable to get current user: " + e, e);
+ }
+ if (loginUserHasCurrentAuthMethod(ugi, authMethod)) {
+ LOG.debug("Not setting UGI conf as passed-in authMethod of " + authMethod + " = current.");
+ return ugi;
+ } else {
+ LOG.debug("Setting UGI conf as passed-in authMethod of " + authMethod + " != current.");
+ Configuration conf = new Configuration();
+ conf.set(HADOOP_SECURITY_AUTHENTICATION, authMethod);
+ UserGroupInformation.setConfiguration(conf);
+ return UserGroupInformation.getCurrentUser();
+ }
+ }
+
+ /**
+ * Return true if the current login user is already using the given authMethod.
+ *
+ * Used above to ensure we do not create a new Configuration object and as such
+ * lose other settings such as the cluster to which the JVM is connected. Required
+ * for oozie since it does not have a core-site.xml see HIVE-7682
+ */
+ private boolean loginUserHasCurrentAuthMethod(UserGroupInformation ugi, String sAuthMethod) {
+ AuthenticationMethod authMethod;
+ try {
+ // based on SecurityUtil.getAuthenticationMethod()
+ authMethod = Enum.valueOf(AuthenticationMethod.class, sAuthMethod.toUpperCase(Locale.ENGLISH));
+ } catch (IllegalArgumentException iae) {
+ throw new IllegalArgumentException("Invalid attribute value for " +
+ HADOOP_SECURITY_AUTHENTICATION + " of " + sAuthMethod, iae);
+ }
+ LOG.debug("Current authMethod = " + ugi.getAuthenticationMethod());
+ return ugi.getAuthenticationMethod().equals(authMethod);
+ }
+
+
+ /**
+ * Read and return Hadoop SASL configuration which can be configured using
+ * "hadoop.rpc.protection"
+ * @param conf
+ * @return Hadoop SASL configuration
+ */
+
+ public abstract Map<String, String> getHadoopSaslProperties(Configuration conf);
+
+ public static class Client {
+ /**
+ * Create a client-side SASL transport that wraps an underlying transport.
+ *
+ * @param methodStr The authentication method to use. Currently only KERBEROS is
+ * supported.
+ * @param principalConfig The Kerberos principal of the target server.
+ * @param underlyingTransport The underlying transport mechanism, usually a TSocket.
+ * @param saslProps the sasl properties to create the client with
+ */
+
+
+ public TTransport createClientTransport(
+ String principalConfig, String host,
+ String methodStr, String tokenStrForm, final TTransport underlyingTransport,
+ final Map<String, String> saslProps) throws IOException {
+ final AuthMethod method = AuthMethod.valueOf(AuthMethod.class, methodStr);
+
+ TTransport saslTransport = null;
+ switch (method) {
+ case DIGEST:
+ Token<DelegationTokenIdentifier> t= new Token<>();
+ t.decodeFromUrlString(tokenStrForm);
+ saslTransport = new TSaslClientTransport(
+ method.getMechanismName(),
+ null,
+ null, SaslRpcServer.SASL_DEFAULT_REALM,
+ saslProps, new SaslClientCallbackHandler(t),
+ underlyingTransport);
+ return new TUGIAssumingTransport(saslTransport, UserGroupInformation.getCurrentUser());
+
+ case KERBEROS:
+ String serverPrincipal = SecurityUtil.getServerPrincipal(principalConfig, host);
+ final String names[] = SaslRpcServer.splitKerberosName(serverPrincipal);
+ if (names.length != 3) {
+ throw new IOException(
+ "Kerberos principal name does NOT have the expected hostname part: "
+ + serverPrincipal);
+ }
+ try {
+ return UserGroupInformation.getCurrentUser().doAs(
+ new PrivilegedExceptionAction<TUGIAssumingTransport>() {
+ @Override
+ public TUGIAssumingTransport run() throws IOException {
+ TTransport saslTransport = new TSaslClientTransport(
+ method.getMechanismName(),
+ null,
+ names[0], names[1],
+ saslProps, null,
+ underlyingTransport);
+ return new TUGIAssumingTransport(saslTransport, UserGroupInformation.getCurrentUser());
+ }
+ });
+ } catch (InterruptedException | SaslException se) {
+ throw new IOException("Could not instantiate SASL transport", se);
+ }
+
+ default:
+ throw new IOException("Unsupported authentication method: " + method);
+ }
+ }
+ private static class SaslClientCallbackHandler implements CallbackHandler {
+ private final String userName;
+ private final char[] userPassword;
+
+ public SaslClientCallbackHandler(Token<? extends TokenIdentifier> token) {
+ this.userName = encodeIdentifier(token.getIdentifier());
+ this.userPassword = encodePassword(token.getPassword());
+ }
+
+
+ @Override
+ public void handle(Callback[] callbacks)
+ throws UnsupportedCallbackException {
+ NameCallback nc = null;
+ PasswordCallback pc = null;
+ RealmCallback rc = null;
+ for (Callback callback : callbacks) {
+ if (callback instanceof RealmChoiceCallback) {
+ continue;
+ } else if (callback instanceof NameCallback) {
+ nc = (NameCallback) callback;
+ } else if (callback instanceof PasswordCallback) {
+ pc = (PasswordCallback) callback;
+ } else if (callback instanceof RealmCallback) {
+ rc = (RealmCallback) callback;
+ } else {
+ throw new UnsupportedCallbackException(callback,
+ "Unrecognized SASL client callback");
+ }
+ }
+ if (nc != null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("SASL client callback: setting username: " + userName);
+ }
+ nc.setName(userName);
+ }
+ if (pc != null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("SASL client callback: setting userPassword");
+ }
+ pc.setPassword(userPassword);
+ }
+ if (rc != null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("SASL client callback: setting realm: "
+ + rc.getDefaultText());
+ }
+ rc.setText(rc.getDefaultText());
+ }
+ }
+
+ static String encodeIdentifier(byte[] identifier) {
+ return new String(Base64.encodeBase64(identifier));
+ }
+
+ static char[] encodePassword(byte[] password) {
+ return new String(Base64.encodeBase64(password)).toCharArray();
+ }
+ }
+ }
+
+ public static class Server {
+ public enum ServerMode {
+ HIVESERVER2, METASTORE
+ };
+
+ protected final UserGroupInformation realUgi;
+ protected DelegationTokenSecretManager secretManager;
+
+ public Server() throws TTransportException {
+ try {
+ realUgi = UserGroupInformation.getCurrentUser();
+ } catch (IOException ioe) {
+ throw new TTransportException(ioe);
+ }
+ }
+ /**
+ * Create a server with a kerberos keytab/principal.
+ */
+ protected Server(String keytabFile, String principalConf)
+ throws TTransportException {
+ if (keytabFile == null || keytabFile.isEmpty()) {
+ throw new TTransportException("No keytab specified");
+ }
+ if (principalConf == null || principalConf.isEmpty()) {
+ throw new TTransportException("No principal specified");
+ }
+
+ // Login from the keytab
+ String kerberosName;
+ try {
+ kerberosName =
+ SecurityUtil.getServerPrincipal(principalConf, "0.0.0.0");
+ UserGroupInformation.loginUserFromKeytab(
+ kerberosName, keytabFile);
+ realUgi = UserGroupInformation.getLoginUser();
+ assert realUgi.isFromKeytab();
+ } catch (IOException ioe) {
+ throw new TTransportException(ioe);
+ }
+ }
+
+ public void setSecretManager(DelegationTokenSecretManager secretManager) {
+ this.secretManager = secretManager;
+ }
+
+ /**
+ * Create a TTransportFactory that, upon connection of a client socket,
+ * negotiates a Kerberized SASL transport. The resulting TTransportFactory
+ * can be passed as both the input and output transport factory when
+ * instantiating a TThreadPoolServer, for example.
+ *
+ * @param saslProps Map of SASL properties
+ */
+
+ public TTransportFactory createTransportFactory(Map<String, String> saslProps)
+ throws TTransportException {
+
+ TSaslServerTransport.Factory transFactory = createSaslServerTransportFactory(saslProps);
+
+ return new TUGIAssumingTransportFactory(transFactory, realUgi);
+ }
+
+ /**
+ * Create a TSaslServerTransport.Factory that, upon connection of a client
+ * socket, negotiates a Kerberized SASL transport.
+ *
+ * @param saslProps Map of SASL properties
+ */
+ public TSaslServerTransport.Factory createSaslServerTransportFactory(
+ Map<String, String> saslProps) throws TTransportException {
+ // Parse out the kerberos principal, host, realm.
+ String kerberosName = realUgi.getUserName();
+ final String names[] = SaslRpcServer.splitKerberosName(kerberosName);
+ if (names.length != 3) {
+ throw new TTransportException("Kerberos principal should have 3 parts: " + kerberosName);
+ }
+
+ TSaslServerTransport.Factory transFactory = new TSaslServerTransport.Factory();
+ transFactory.addServerDefinition(
+ AuthMethod.KERBEROS.getMechanismName(),
+ names[0], names[1], // two parts of kerberos principal
+ saslProps,
+ new SaslRpcServer.SaslGssCallbackHandler());
+ transFactory.addServerDefinition(AuthMethod.DIGEST.getMechanismName(),
+ null, SaslRpcServer.SASL_DEFAULT_REALM,
+ saslProps, new SaslDigestCallbackHandler(secretManager));
+
+ return transFactory;
+ }
+
+ /**
+ * Wrap a TTransportFactory in such a way that, before processing any RPC, it
+ * assumes the UserGroupInformation of the user authenticated by
+ * the SASL transport.
+ */
+ public TTransportFactory wrapTransportFactory(TTransportFactory transFactory) {
+ return new TUGIAssumingTransportFactory(transFactory, realUgi);
+ }
+
+ /**
+ * Wrap a TProcessor in such a way that, before processing any RPC, it
+ * assumes the UserGroupInformation of the user authenticated by
+ * the SASL transport.
+ */
+
+ public TProcessor wrapProcessor(TProcessor processor) {
+ return new TUGIAssumingProcessor(processor, secretManager, true);
+ }
+
+ /**
+ * Wrap a TProcessor to capture the client information like connecting userid, ip etc
+ */
+
+ public TProcessor wrapNonAssumingProcessor(TProcessor processor) {
+ return new TUGIAssumingProcessor(processor, secretManager, false);
+ }
+
+ final static ThreadLocal<InetAddress> remoteAddress =
+ new ThreadLocal<InetAddress>() {
+
+ @Override
+ protected InetAddress initialValue() {
+ return null;
+ }
+ };
+
+ public InetAddress getRemoteAddress() {
+ return remoteAddress.get();
+ }
+
+ final static ThreadLocal<AuthenticationMethod> authenticationMethod =
+ new ThreadLocal<AuthenticationMethod>() {
+
+ @Override
+ protected AuthenticationMethod initialValue() {
+ return AuthenticationMethod.TOKEN;
+ }
+ };
+
+ private static ThreadLocal<String> remoteUser = new ThreadLocal<String> () {
+
+ @Override
+ protected String initialValue() {
+ return null;
+ }
+ };
+
+
+ public String getRemoteUser() {
+ return remoteUser.get();
+ }
+
+ private final static ThreadLocal<String> userAuthMechanism =
+ new ThreadLocal<String>() {
+
+ @Override
+ protected String initialValue() {
+ return AuthMethod.KERBEROS.getMechanismName();
+ }
+ };
+
+ public String getUserAuthMechanism() {
+ return userAuthMechanism.get();
+ }
+ /** CallbackHandler for SASL DIGEST-MD5 mechanism */
+ // This code is pretty much completely based on Hadoop's
+ // SaslRpcServer.SaslDigestCallbackHandler - the only reason we could not
+ // use that Hadoop class as-is was because it needs a Server.Connection object
+ // which is relevant in hadoop rpc but not here in the metastore - so the
+ // code below does not deal with the Server.Connection object.
+ static class SaslDigestCallbackHandler implements CallbackHandler {
+ private final DelegationTokenSecretManager secretManager;
+
+ public SaslDigestCallbackHandler(
+ DelegationTokenSecretManager secretManager) {
+ this.secretManager = secretManager;
+ }
+
+ private char[] getPassword(DelegationTokenIdentifier tokenid) throws InvalidToken {
+ return encodePassword(secretManager.retrievePassword(tokenid));
+ }
+
+ private char[] encodePassword(byte[] password) {
+ return new String(Base64.encodeBase64(password)).toCharArray();
+ }
+ /** {@inheritDoc} */
+
+ @Override
+ public void handle(Callback[] callbacks) throws InvalidToken,
+ UnsupportedCallbackException {
+ NameCallback nc = null;
+ PasswordCallback pc = null;
+ AuthorizeCallback ac = null;
+ for (Callback callback : callbacks) {
+ if (callback instanceof AuthorizeCallback) {
+ ac = (AuthorizeCallback) callback;
+ } else if (callback instanceof NameCallback) {
+ nc = (NameCallback) callback;
+ } else if (callback instanceof PasswordCallback) {
+ pc = (PasswordCallback) callback;
+ } else if (callback instanceof RealmCallback) {
+ continue; // realm is ignored
+ } else {
+ throw new UnsupportedCallbackException(callback,
+ "Unrecognized SASL DIGEST-MD5 Callback");
+ }
+ }
+ if (pc != null) {
+ DelegationTokenIdentifier tokenIdentifier = SaslRpcServer.
+ getIdentifier(nc.getDefaultName(), secretManager);
+ char[] password = getPassword(tokenIdentifier);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("SASL server DIGEST-MD5 callback: setting password "
+ + "for client: " + tokenIdentifier.getUser());
+ }
+ pc.setPassword(password);
+ }
+ if (ac != null) {
+ String authid = ac.getAuthenticationID();
+ String authzid = ac.getAuthorizationID();
+ if (authid.equals(authzid)) {
+ ac.setAuthorized(true);
+ } else {
+ ac.setAuthorized(false);
+ }
+ if (ac.isAuthorized()) {
+ if (LOG.isDebugEnabled()) {
+ String username =
+ SaslRpcServer.getIdentifier(authzid, secretManager).getUser().getUserName();
+ LOG.debug("SASL server DIGEST-MD5 callback: setting "
+ + "canonicalized client ID: " + username);
+ }
+ ac.setAuthorizedID(authzid);
+ }
+ }
+ }
+ }
+
+ /**
+ * Processor that pulls the SaslServer object out of the transport, and
+ * assumes the remote user's UGI before calling through to the original
+ * processor.
+ *
+ * This is used on the server side to set the UGI for each specific call.
+ */
+ protected class TUGIAssumingProcessor implements TProcessor {
+ final TProcessor wrapped;
+ DelegationTokenSecretManager secretManager;
+ boolean useProxy;
+ TUGIAssumingProcessor(TProcessor wrapped, DelegationTokenSecretManager secretManager,
+ boolean useProxy) {
+ this.wrapped = wrapped;
+ this.secretManager = secretManager;
+ this.useProxy = useProxy;
+ }
+
+
+ @Override
+ public boolean process(final TProtocol inProt, final TProtocol outProt) throws TException {
+ TTransport trans = inProt.getTransport();
+ if (!(trans instanceof TSaslServerTransport)) {
+ throw new TException("Unexpected non-SASL transport " + trans.getClass());
+ }
+ TSaslServerTransport saslTrans = (TSaslServerTransport)trans;
+ SaslServer saslServer = saslTrans.getSaslServer();
+ String authId = saslServer.getAuthorizationID();
+ LOG.debug("AUTH ID ======>" + authId);
+ String endUser = authId;
+
+ Socket socket = ((TSocket)(saslTrans.getUnderlyingTransport())).getSocket();
+ remoteAddress.set(socket.getInetAddress());
+
+ String mechanismName = saslServer.getMechanismName();
+ userAuthMechanism.set(mechanismName);
+ if (AuthMethod.PLAIN.getMechanismName().equalsIgnoreCase(mechanismName)) {
+ remoteUser.set(endUser);
+ return wrapped.process(inProt, outProt);
+ }
+
+ authenticationMethod.set(AuthenticationMethod.KERBEROS);
+ if(AuthMethod.TOKEN.getMechanismName().equalsIgnoreCase(mechanismName)) {
+ try {
+ TokenIdentifier tokenId = SaslRpcServer.getIdentifier(authId,
+ secretManager);
+ endUser = tokenId.getUser().getUserName();
+ authenticationMethod.set(AuthenticationMethod.TOKEN);
+ } catch (InvalidToken e) {
+ throw new TException(e.getMessage());
+ }
+ }
+
+ UserGroupInformation clientUgi = null;
+ try {
+ if (useProxy) {
+ clientUgi = UserGroupInformation.createProxyUser(
+ endUser, UserGroupInformation.getLoginUser());
+ remoteUser.set(clientUgi.getShortUserName());
+ LOG.debug("Set remoteUser :" + remoteUser.get());
+ return clientUgi.doAs(new PrivilegedExceptionAction<Boolean>() {
+
+ @Override
+ public Boolean run() {
+ try {
+ return wrapped.process(inProt, outProt);
+ } catch (TException te) {
+ throw new RuntimeException(te);
+ }
+ }
+ });
+ } else {
+ // use the short user name for the request
+ UserGroupInformation endUserUgi = UserGroupInformation.createRemoteUser(endUser);
+ remoteUser.set(endUserUgi.getShortUserName());
+ LOG.debug("Set remoteUser :" + remoteUser.get() + ", from endUser :" + endUser);
+ return wrapped.process(inProt, outProt);
+ }
+ } catch (RuntimeException rte) {
+ if (rte.getCause() instanceof TException) {
+ throw (TException)rte.getCause();
+ }
+ throw rte;
+ } catch (InterruptedException ie) {
+ throw new RuntimeException(ie); // unexpected!
+ } catch (IOException ioe) {
+ throw new RuntimeException(ioe); // unexpected!
+ }
+ finally {
+ if (clientUgi != null) {
+ try { FileSystem.closeAllForUGI(clientUgi); }
+ catch(IOException exception) {
+ LOG.error("Could not clean up file-system handles for UGI: " + clientUgi, exception);
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * A TransportFactory that wraps another one, but assumes a specified UGI
+ * before calling through.
+ *
+ * This is used on the server side to assume the server's Principal when accepting
+ * clients.
+ */
+ static class TUGIAssumingTransportFactory extends TTransportFactory {
+ private final UserGroupInformation ugi;
+ private final TTransportFactory wrapped;
+
+ public TUGIAssumingTransportFactory(TTransportFactory wrapped, UserGroupInformation ugi) {
+ assert wrapped != null;
+ assert ugi != null;
+ this.wrapped = wrapped;
+ this.ugi = ugi;
+ }
+
+
+ @Override
+ public TTransport getTransport(final TTransport trans) {
+ return ugi.doAs(new PrivilegedAction<TTransport>() {
+ @Override
+ public TTransport run() {
+ return wrapped.getTransport(trans);
+ }
+ });
+ }
+ }
+ }
+}
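Putting the server-side pieces together, a hedged sketch: the bridge calls are the ones defined above, while the Thrift server wiring (TServerSocket, TThreadPoolServer) and the rawProcessor/port/keytabFile/principalConf values are illustrative:

    HadoopThriftAuthBridge bridge = HadoopThriftAuthBridge.getBridge();
    HadoopThriftAuthBridge.Server server = bridge.createServer(keytabFile, principalConf);
    server.setSecretManager(secretManager);
    Map<String, String> saslProps = bridge.getHadoopSaslProperties(conf);
    TTransportFactory transFactory = server.createTransportFactory(saslProps);
    TProcessor processor = server.wrapProcessor(rawProcessor);
    TServerSocket socket = new TServerSocket(port);
    new TThreadPoolServer(new TThreadPoolServer.Args(socket)
        .transportFactory(transFactory)
        .processor(processor))
        .serve();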
http://git-wip-us.apache.org/repos/asf/hive/blob/e2770d6e/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/HadoopThriftAuthBridge23.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/HadoopThriftAuthBridge23.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/HadoopThriftAuthBridge23.java
new file mode 100644
index 0000000..dc76535
--- /dev/null
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/HadoopThriftAuthBridge23.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.security;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.SaslRpcServer;
+
+/**
+ * Functions that bridge Thrift's SASL transports to Hadoop's SASL callback
+ * handlers and authentication classes.
+ *
+ * This is a 0.23/2.x specific implementation
+ */
+public class HadoopThriftAuthBridge23 extends HadoopThriftAuthBridge {
+
+ private static Field SASL_PROPS_FIELD;
+ private static Class<?> SASL_PROPERTIES_RESOLVER_CLASS;
+ private static Method RES_GET_INSTANCE_METHOD;
+ private static Method GET_DEFAULT_PROP_METHOD;
+ static {
+ SASL_PROPERTIES_RESOLVER_CLASS = null;
+ SASL_PROPS_FIELD = null;
+ final String SASL_PROP_RES_CLASSNAME = "org.apache.hadoop.security.SaslPropertiesResolver";
+ try {
+ SASL_PROPERTIES_RESOLVER_CLASS = Class.forName(SASL_PROP_RES_CLASSNAME);
+
+ } catch (ClassNotFoundException e) {
+ }
+
+ if (SASL_PROPERTIES_RESOLVER_CLASS != null) {
+ // found the class, so this would be hadoop version 2.4 or newer (See
+ // HADOOP-10221, HADOOP-10451)
+ try {
+ RES_GET_INSTANCE_METHOD = SASL_PROPERTIES_RESOLVER_CLASS.getMethod("getInstance",
+ Configuration.class);
+ GET_DEFAULT_PROP_METHOD = SASL_PROPERTIES_RESOLVER_CLASS.getMethod("getDefaultProperties");
+ } catch (Exception e) {
+ // this must be hadoop 2.4, where getDefaultProperties was protected
+ }
+ }
+
+ if (SASL_PROPERTIES_RESOLVER_CLASS == null || GET_DEFAULT_PROP_METHOD == null) {
+ // this must be a hadoop 2.4 version or earlier.
+ // Resorting to the earlier method of getting the properties, which uses SASL_PROPS field
+ try {
+ SASL_PROPS_FIELD = SaslRpcServer.class.getField("SASL_PROPS");
+ } catch (NoSuchFieldException e) {
+ // Older version of hadoop should have had this field
+ throw new IllegalStateException("Error finding hadoop SASL_PROPS field in "
+ + SaslRpcServer.class.getSimpleName(), e);
+ }
+ }
+ }
+
+ // TODO RIVEN switch this back to package level when we can move TestHadoopAuthBridge23 into
+ // riven.
+ // Package permission so that HadoopThriftAuthBridge can construct it but others cannot.
+ protected HadoopThriftAuthBridge23() {
+
+ }
+
+ /**
+ * Read and return Hadoop SASL configuration which can be configured using
+ * "hadoop.rpc.protection"
+ *
+ * @param conf
+ * @return Hadoop SASL configuration
+ */
+ @SuppressWarnings("unchecked")
+ @Override
+ public Map<String, String> getHadoopSaslProperties(Configuration conf) {
+ if (SASL_PROPS_FIELD != null) {
+ // hadoop 2.4 and earlier way of finding the sasl property settings
+ // Initialize the SaslRpcServer to ensure QOP parameters are read from
+ // conf
+ SaslRpcServer.init(conf);
+ try {
+ return (Map<String, String>) SASL_PROPS_FIELD.get(null);
+ } catch (Exception e) {
+ throw new IllegalStateException("Error finding hadoop SASL properties", e);
+ }
+ }
+ // 2.5 and later way of finding sasl property
+ try {
+ Configurable saslPropertiesResolver = (Configurable) RES_GET_INSTANCE_METHOD.invoke(null,
+ conf);
+ saslPropertiesResolver.setConf(conf);
+ return (Map<String, String>) GET_DEFAULT_PROP_METHOD.invoke(saslPropertiesResolver);
+ } catch (Exception e) {
+ throw new IllegalStateException("Error finding hadoop SASL properties", e);
+ }
+ }
+
+}
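The client-side counterpart, again as a sketch (host, port, serverPrincipal, and saslProps are placeholders; the DIGEST branch would pass a token string instead of null):

    HadoopThriftAuthBridge.Client client =
        HadoopThriftAuthBridge.getBridge().createClientWithConf("kerberos");
    TTransport underlying = new TSocket(host, port);
    TTransport transport = client.createClientTransport(
        serverPrincipal, host, "KERBEROS", null /* tokenStrForm, unused for KERBEROS */,
        underlying, saslProps);
    transport.open();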
http://git-wip-us.apache.org/repos/asf/hive/blob/e2770d6e/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/MemoryTokenStore.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/MemoryTokenStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/MemoryTokenStore.java
new file mode 100644
index 0000000..c484cd3
--- /dev/null
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/MemoryTokenStore.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.security;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge.Server.ServerMode;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Default in-memory token store implementation.
+ */
+public class MemoryTokenStore implements DelegationTokenStore {
+ private static final Logger LOG = LoggerFactory.getLogger(MemoryTokenStore.class);
+
+ private final Map<Integer, String> masterKeys = new ConcurrentHashMap<>();
+
+ private final ConcurrentHashMap<DelegationTokenIdentifier, DelegationTokenInformation> tokens
+ = new ConcurrentHashMap<>();
+
+ private final AtomicInteger masterKeySeq = new AtomicInteger();
+ private Configuration conf;
+
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ }
+
+ @Override
+ public Configuration getConf() {
+ return this.conf;
+ }
+
+ @Override
+ public int addMasterKey(String s) {
+ int keySeq = masterKeySeq.getAndIncrement();
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("addMasterKey: s = " + s + ", keySeq = " + keySeq);
+ }
+ masterKeys.put(keySeq, s);
+ return keySeq;
+ }
+
+ @Override
+ public void updateMasterKey(int keySeq, String s) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("updateMasterKey: s = " + s + ", keySeq = " + keySeq);
+ }
+ masterKeys.put(keySeq, s);
+ }
+
+ @Override
+ public boolean removeMasterKey(int keySeq) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("removeMasterKey: keySeq = " + keySeq);
+ }
+ return masterKeys.remove(keySeq) != null;
+ }
+
+ @Override
+ public String[] getMasterKeys() {
+ return masterKeys.values().toArray(new String[0]);
+ }
+
+ @Override
+ public boolean addToken(DelegationTokenIdentifier tokenIdentifier,
+ DelegationTokenInformation token) {
+ DelegationTokenInformation tokenInfo = tokens.putIfAbsent(tokenIdentifier, token);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("addToken: tokenIdentifier = " + tokenIdentifier + ", added = " + (tokenInfo == null));
+ }
+ return (tokenInfo == null);
+ }
+
+ @Override
+ public boolean removeToken(DelegationTokenIdentifier tokenIdentifier) {
+ DelegationTokenInformation tokenInfo = tokens.remove(tokenIdentifier);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("removeToken: tokenIdentifier = " + tokenIdentifier + ", removed = " + (tokenInfo != null));
+ }
+ return tokenInfo != null;
+ }
+
+ @Override
+ public DelegationTokenInformation getToken(DelegationTokenIdentifier tokenIdentifier) {
+ DelegationTokenInformation result = tokens.get(tokenIdentifier);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("getToken: tokenIdentifier = " + tokenIdentifier + ", result = " + result);
+ }
+ return result;
+ }
+
+  @Override
+  public List<DelegationTokenIdentifier> getAllDelegationTokenIdentifiers() {
+    // Snapshot of the current identifiers; iteration over a ConcurrentHashMap is weakly consistent.
+    return new ArrayList<>(tokens.keySet());
+  }
+
+ @Override
+ public void close() throws IOException {
+ //no-op
+ }
+
+ @Override
+ public void init(Object hmsHandler, ServerMode smode) throws TokenStoreException {
+ // no-op
+ }
+}
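
A short lifecycle sketch for the store, assuming DelegationTokenIdentifier exposes the usual (owner, renewer, realUser) constructor inherited from Hadoop's delegation token identifiers, and using Hadoop's public DelegationTokenInformation(renewDate, password) constructor; names and values are illustrative:

    import java.util.List;
    import org.apache.hadoop.hive.metastore.security.DelegationTokenIdentifier;
    import org.apache.hadoop.hive.metastore.security.MemoryTokenStore;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation;

    public class MemoryTokenStoreSketch {
      public static void main(String[] args) throws Exception {
        MemoryTokenStore store = new MemoryTokenStore();
        int keySeq = store.addMasterKey("key-material");   // returns a fresh sequence number
        DelegationTokenIdentifier id = new DelegationTokenIdentifier(
            new Text("alice"), new Text("alice"), new Text("alice"));
        DelegationTokenInformation info = new DelegationTokenInformation(
            System.currentTimeMillis() + 3600 * 1000L, new byte[0]);
        boolean added = store.addToken(id, info);          // false if the identifier already exists
        List<DelegationTokenIdentifier> ids = store.getAllDelegationTokenIdentifiers();
        store.removeToken(id);
        store.removeMasterKey(keySeq);
        store.close();
      }
    }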
http://git-wip-us.apache.org/repos/asf/hive/blob/e2770d6e/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/MetastoreDelegationTokenManager.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/MetastoreDelegationTokenManager.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/MetastoreDelegationTokenManager.java
new file mode 100644
index 0000000..2b0110f
--- /dev/null
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/MetastoreDelegationTokenManager.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.hadoop.hive.metastore.security;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.ProxyUsers;
+import org.apache.hadoop.util.ReflectionUtils;
+
+public class MetastoreDelegationTokenManager {
+
+ protected DelegationTokenSecretManager secretManager;
+
+ public MetastoreDelegationTokenManager() {
+ }
+
+ public DelegationTokenSecretManager getSecretManager() {
+ return secretManager;
+ }
+
+ public void startDelegationTokenSecretManager(Configuration conf, Object hms, HadoopThriftAuthBridge.Server.ServerMode smode)
+ throws IOException {
+ long secretKeyInterval = MetastoreConf.getTimeVar(conf,
+ MetastoreConf.ConfVars.DELEGATION_KEY_UPDATE_INTERVAL, TimeUnit.MILLISECONDS);
+ long tokenMaxLifetime = MetastoreConf.getTimeVar(conf,
+ MetastoreConf.ConfVars.DELEGATION_TOKEN_MAX_LIFETIME, TimeUnit.MILLISECONDS);
+ long tokenRenewInterval = MetastoreConf.getTimeVar(conf,
+ MetastoreConf.ConfVars.DELEGATION_TOKEN_RENEW_INTERVAL, TimeUnit.MILLISECONDS);
+ long tokenGcInterval = MetastoreConf.getTimeVar(conf,
+ MetastoreConf.ConfVars.DELEGATION_TOKEN_GC_INTERVAL, TimeUnit.MILLISECONDS);
+
+ DelegationTokenStore dts = getTokenStore(conf);
+ dts.setConf(conf);
+ dts.init(hms, smode);
+ secretManager =
+ new TokenStoreDelegationTokenSecretManager(secretKeyInterval, tokenMaxLifetime,
+ tokenRenewInterval, tokenGcInterval, dts);
+ secretManager.startThreads();
+ }
+
+ public String getDelegationToken(final String owner, final String renewer, String remoteAddr)
+ throws IOException,
+ InterruptedException {
+    /*
+     * If the user asking for the token is the same as the 'owner', then don't do
+     * any proxy authorization checks. For cases like Oozie, where it gets a
+     * delegation token for another user, we need to make sure Oozie is authorized
+     * to get a delegation token.
+     */
+ // Do all checks on short names
+ UserGroupInformation currUser = UserGroupInformation.getCurrentUser();
+ UserGroupInformation ownerUgi = UserGroupInformation.createRemoteUser(owner);
+ if (!ownerUgi.getShortUserName().equals(currUser.getShortUserName())) {
+      // In the case of proxy users, getCurrentUser() will return the real user
+      // (e.g. Oozie) because of the doAs that happened just before the server
+      // started executing getDelegationToken in the metastore.
+ ownerUgi = UserGroupInformation.createProxyUser(owner, UserGroupInformation.getCurrentUser());
+ ProxyUsers.authorize(ownerUgi, remoteAddr, null);
+ }
+ return ownerUgi.doAs(new PrivilegedExceptionAction<String>() {
+
+ @Override
+ public String run() throws IOException {
+ return secretManager.getDelegationToken(renewer);
+ }
+ });
+ }
+
+ public String getDelegationTokenWithService(String owner, String renewer, String service, String remoteAddr)
+ throws IOException, InterruptedException {
+ String token = getDelegationToken(owner, renewer, remoteAddr);
+ return addServiceToToken(token, service);
+ }
+
+ public long renewDelegationToken(String tokenStrForm)
+ throws IOException {
+ return secretManager.renewDelegationToken(tokenStrForm);
+ }
+
+ public String getUserFromToken(String tokenStr) throws IOException {
+ return secretManager.getUserFromToken(tokenStr);
+ }
+
+ public void cancelDelegationToken(String tokenStrForm) throws IOException {
+ secretManager.cancelDelegationToken(tokenStrForm);
+ }
+
+  /**
+   * Verify a token string.
+   * @param tokenStrForm token encoded as a URL-safe string
+   * @return the user name embedded in the token
+   * @throws IOException if the token is invalid or cannot be decoded
+   */
+ public String verifyDelegationToken(String tokenStrForm) throws IOException {
+ return secretManager.verifyDelegationToken(tokenStrForm);
+ }
+
+ private DelegationTokenStore getTokenStore(Configuration conf) throws IOException {
+ String tokenStoreClassName =
+ MetastoreConf.getVar(conf, MetastoreConf.ConfVars.DELEGATION_TOKEN_STORE_CLS, "");
+    // The second half of this 'if' catches cases where users pass in a HiveConf for
+    // configuration. As part of its construction, HiveConf sets the default value of
+    // "hive.cluster.delegation.token.store.class" to
+    // "org.apache.hadoop.hive.thrift.MemoryTokenStore". But that is the hive-shims
+    // version of the memory store, so we map it to our default value instead.
+ if (StringUtils.isBlank(tokenStoreClassName) ||
+ "org.apache.hadoop.hive.thrift.MemoryTokenStore".equals(tokenStoreClassName)) {
+ return new MemoryTokenStore();
+ }
+ try {
+ Class<? extends DelegationTokenStore> storeClass =
+ Class.forName(tokenStoreClassName).asSubclass(DelegationTokenStore.class);
+ return ReflectionUtils.newInstance(storeClass, conf);
+ } catch (ClassNotFoundException e) {
+ throw new IOException("Error initializing delegation token store: " + tokenStoreClassName, e);
+ }
+ }
+
+  /**
+   * Add a given service to a delegation token string.
+   * @param tokenStr delegation token encoded as a URL-safe string
+   * @param tokenService service name to set on the token
+   * @return the token string with the service set, re-encoded
+   * @throws IOException if the token cannot be decoded
+   */
+ public static String addServiceToToken(String tokenStr, String tokenService)
+ throws IOException {
+ Token<DelegationTokenIdentifier> delegationToken = createToken(tokenStr, tokenService);
+ return delegationToken.encodeToUrlString();
+ }
+
+  /**
+   * Create a new token from the given string and set the given service on it.
+   * @param tokenStr delegation token encoded as a URL-safe string
+   * @param tokenService service name to set on the token
+   * @return the decoded token with the service set
+   * @throws IOException if the token cannot be decoded
+   */
+ private static Token<DelegationTokenIdentifier> createToken(String tokenStr, String tokenService)
+ throws IOException {
+ Token<DelegationTokenIdentifier> delegationToken = new Token<>();
+ delegationToken.decodeFromUrlString(tokenStr);
+ delegationToken.setService(new Text(tokenService));
+ return delegationToken;
+ }
+
+}
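
An end-to-end sketch of the manager, hedged: the null handler argument works only because the default MemoryTokenStore's init() is a no-op, the ServerMode value is assumed from the enum on HadoopThriftAuthBridge.Server, and requesting the token as the current user deliberately avoids the proxy-authorization path. Imports match those of the file above.

    Configuration conf = new Configuration();
    MetastoreDelegationTokenManager mgr = new MetastoreDelegationTokenManager();
    mgr.startDelegationTokenSecretManager(conf, null,
        HadoopThriftAuthBridge.Server.ServerMode.METASTORE);
    String me = UserGroupInformation.getCurrentUser().getShortUserName();
    String tokenStr = mgr.getDelegationToken(me, me, null);  // owner == caller: no proxy check
    String user = mgr.verifyDelegationToken(tokenStr);       // returns the token's owner
    long nextExpiry = mgr.renewDelegationToken(tokenStr);    // renewer is the caller, so allowed
    mgr.cancelDelegationToken(tokenStr);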
http://git-wip-us.apache.org/repos/asf/hive/blob/e2770d6e/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/TFilterTransport.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/TFilterTransport.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/TFilterTransport.java
new file mode 100644
index 0000000..79d317a
--- /dev/null
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/TFilterTransport.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.security;
+
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+
+/**
+ * Transport that simply wraps another transport.
+ * This is the equivalent of FilterInputStream for Thrift transports.
+ */
+public class TFilterTransport extends TTransport {
+ protected final TTransport wrapped;
+
+ public TFilterTransport(TTransport wrapped) {
+ this.wrapped = wrapped;
+ }
+
+ @Override
+ public void open() throws TTransportException {
+ wrapped.open();
+ }
+
+ @Override
+ public boolean isOpen() {
+ return wrapped.isOpen();
+ }
+
+ @Override
+ public boolean peek() {
+ return wrapped.peek();
+ }
+
+ @Override
+ public void close() {
+ wrapped.close();
+ }
+
+ @Override
+ public int read(byte[] buf, int off, int len) throws TTransportException {
+ return wrapped.read(buf, off, len);
+ }
+
+ @Override
+ public int readAll(byte[] buf, int off, int len) throws TTransportException {
+ return wrapped.readAll(buf, off, len);
+ }
+
+ @Override
+ public void write(byte[] buf) throws TTransportException {
+ wrapped.write(buf);
+ }
+
+ @Override
+ public void write(byte[] buf, int off, int len) throws TTransportException {
+ wrapped.write(buf, off, len);
+ }
+
+ @Override
+ public void flush() throws TTransportException {
+ wrapped.flush();
+ }
+
+ @Override
+ public byte[] getBuffer() {
+ return wrapped.getBuffer();
+ }
+
+ @Override
+ public int getBufferPosition() {
+ return wrapped.getBufferPosition();
+ }
+
+ @Override
+ public int getBytesRemainingInBuffer() {
+ return wrapped.getBytesRemainingInBuffer();
+ }
+
+ @Override
+ public void consumeBuffer(int len) {
+ wrapped.consumeBuffer(len);
+ }
+}
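
Since the class is a plain decorator, extending it is the intended use. A hypothetical subclass (not part of this patch) that counts bytes read, as an illustration:

    public class TCountingTransport extends TFilterTransport {
      private long bytesRead = 0;

      public TCountingTransport(TTransport wrapped) {
        super(wrapped);
      }

      @Override
      public int read(byte[] buf, int off, int len) throws TTransportException {
        int n = wrapped.read(buf, off, len);   // delegate, then record
        if (n > 0) {
          bytesRead += n;
        }
        return n;
      }

      public long getBytesRead() {
        return bytesRead;
      }
    }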
http://git-wip-us.apache.org/repos/asf/hive/blob/e2770d6e/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/TUGIAssumingTransport.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/TUGIAssumingTransport.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/TUGIAssumingTransport.java
new file mode 100644
index 0000000..38a946e
--- /dev/null
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/TUGIAssumingTransport.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.security;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+
+/**
+ * The Thrift SASL transports call Sasl.createSaslServer and Sasl.createSaslClient
+ * inside open(). So, we need to assume the correct UGI when the transport is opened
+ * so that the SASL mechanisms have access to the right principal. This transport
+ * wraps the Sasl transports to set up the right UGI context for open().
+ *
+ * This is used on the client side, where the API explicitly opens a transport to
+ * the server.
+ */
+public class TUGIAssumingTransport extends TFilterTransport {
+ protected UserGroupInformation ugi;
+
+ public TUGIAssumingTransport(TTransport wrapped, UserGroupInformation ugi) {
+ super(wrapped);
+ this.ugi = ugi;
+ }
+
+ @Override
+ public void open() throws TTransportException {
+ try {
+ ugi.doAs(new PrivilegedExceptionAction<Void>() {
+ public Void run() {
+ try {
+ wrapped.open();
+ } catch (TTransportException tte) {
+ // Wrap the transport exception in an RTE, since UGI.doAs() then goes
+ // and unwraps this for us out of the doAs block. We then unwrap one
+ // more time in our catch clause to get back the TTE. (ugh)
+ throw new RuntimeException(tte);
+ }
+ return null;
+ }
+ });
+ } catch (IOException ioe) {
+ throw new RuntimeException("Received an ioe we never threw!", ioe);
+ } catch (InterruptedException ie) {
+ throw new RuntimeException("Received an ie we never threw!", ie);
+ } catch (RuntimeException rte) {
+ if (rte.getCause() instanceof TTransportException) {
+ throw (TTransportException)rte.getCause();
+ } else {
+ throw rte;
+ }
+ }
+ }
+}
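
A client-side usage sketch; the host, port, and the choice of wrapped transport are illustrative (in practice the wrapped transport is the Thrift SASL client transport this class exists to serve):

    import org.apache.hadoop.hive.metastore.security.TUGIAssumingTransport;
    import org.apache.hadoop.security.UserGroupInformation;
    import org.apache.thrift.transport.TSocket;
    import org.apache.thrift.transport.TTransport;

    public class ClientOpenSketch {
      public static void main(String[] args) throws Exception {
        UserGroupInformation ugi = UserGroupInformation.getLoginUser();
        // Stand-in for the Thrift SASL client transport this wrapper normally carries.
        TTransport wrapped = new TSocket("metastore-host", 9083);
        TTransport transport = new TUGIAssumingTransport(wrapped, ugi);
        transport.open();  // wrapped.open() runs inside ugi.doAs()
        transport.close();
      }
    }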
http://git-wip-us.apache.org/repos/asf/hive/blob/e2770d6e/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/TUGIContainingTransport.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/TUGIContainingTransport.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/TUGIContainingTransport.java
new file mode 100644
index 0000000..acfe949
--- /dev/null
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/TUGIContainingTransport.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.security;
+
+import java.net.Socket;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportFactory;
+
+import com.google.common.collect.MapMaker;
+
+/** TUGIContainingTransport associates UGI information with the connection (transport).
+ * Wraps the underlying <code>TSocket</code> transport and annotates it with the client UGI.
+ */
+public class TUGIContainingTransport extends TFilterTransport {
+
+ private UserGroupInformation ugi;
+
+ public TUGIContainingTransport(TTransport wrapped) {
+ super(wrapped);
+ }
+
+ public UserGroupInformation getClientUGI(){
+ return ugi;
+ }
+
+ public void setClientUGI(UserGroupInformation ugi){
+ this.ugi = ugi;
+ }
+
+ /**
+ * If the underlying TTransport is an instance of TSocket, it returns the Socket object
+ * which it contains. Otherwise it returns null.
+ */
+ public Socket getSocket() {
+ if (wrapped instanceof TSocket) {
+ return (((TSocket)wrapped).getSocket());
+ }
+
+ return null;
+ }
+
+  /** Factory to create TUGIContainingTransport. */
+ public static class Factory extends TTransportFactory {
+
+    // Need a concurrent weak-reference map: weakKeys() so that when the underlying
+    // transport goes out of scope it can still be GC'ed. Since the map's value holds
+    // a reference to its key, we need weakValues() as well.
+ private static final ConcurrentMap<TTransport, TUGIContainingTransport> transMap =
+ new MapMaker().weakKeys().weakValues().makeMap();
+
+ /**
+ * Get a new <code>TUGIContainingTransport</code> instance, or reuse the
+ * existing one if a <code>TUGIContainingTransport</code> has already been
+ * created before using the given <code>TTransport</code> as an underlying
+ * transport. This ensures that a given underlying transport instance
+ * receives the same <code>TUGIContainingTransport</code>.
+ */
+ @Override
+ public TUGIContainingTransport getTransport(TTransport trans) {
+
+      // UGI information is not available at connection setup time; it will be set
+      // later via the set_ugi() RPC.
+ TUGIContainingTransport tugiTrans = transMap.get(trans);
+ if (tugiTrans == null) {
+ tugiTrans = new TUGIContainingTransport(trans);
+ TUGIContainingTransport prev = transMap.putIfAbsent(trans, tugiTrans);
+ if (prev != null) {
+ return prev;
+ }
+ }
+ return tugiTrans;
+ }
+ }
+}
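
A sketch of the factory's memoization guarantee described above; the base transport is illustrative, and imports match those of the file plus UserGroupInformation:

    TUGIContainingTransport.Factory factory = new TUGIContainingTransport.Factory();
    TTransport base = new TSocket("metastore-host", 9083);
    TUGIContainingTransport first = factory.getTransport(base);
    TUGIContainingTransport second = factory.getTransport(base);
    // Same wrapper instance per underlying transport, so a UGI set once is visible everywhere.
    first.setClientUGI(UserGroupInformation.createRemoteUser("alice"));
    UserGroupInformation clientUgi = second.getClientUGI();  // "alice"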