You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2016/04/29 18:32:54 UTC
[06/25] spark git commit: [SPARK-14987][SQL] inline hive-service
(cli) into sql/hive-thriftserver
http://git-wip-us.apache.org/repos/asf/spark/blob/7feeb82c/sql/hive-thriftserver/src/main/java/org/apache/hive/service/auth/HttpAuthUtils.java
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/auth/HttpAuthUtils.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/auth/HttpAuthUtils.java
new file mode 100644
index 0000000..3ef5577
--- /dev/null
+++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/auth/HttpAuthUtils.java
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.service.auth;
+
+import java.security.AccessControlContext;
+import java.security.AccessController;
+import java.security.PrivilegedExceptionAction;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.StringTokenizer;
+
+import javax.security.auth.Subject;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.http.protocol.BasicHttpContext;
+import org.apache.http.protocol.HttpContext;
+import org.ietf.jgss.GSSContext;
+import org.ietf.jgss.GSSManager;
+import org.ietf.jgss.GSSName;
+import org.ietf.jgss.Oid;
+
+/**
+ * Utility functions for HTTP mode authentication.
+ */
+public final class HttpAuthUtils {
+ public static final String WWW_AUTHENTICATE = "WWW-Authenticate";
+ public static final String AUTHORIZATION = "Authorization";
+ public static final String BASIC = "Basic";
+ public static final String NEGOTIATE = "Negotiate";
+ private static final Log LOG = LogFactory.getLog(HttpAuthUtils.class);
+ private static final String COOKIE_ATTR_SEPARATOR = "&";
+ private static final String COOKIE_CLIENT_USER_NAME = "cu";
+ private static final String COOKIE_CLIENT_RAND_NUMBER = "rn";
+ private static final String COOKIE_KEY_VALUE_SEPARATOR = "=";
+ private final static Set<String> COOKIE_ATTRIBUTES =
+ new HashSet<String>(Arrays.asList(COOKIE_CLIENT_USER_NAME, COOKIE_CLIENT_RAND_NUMBER));
+
+ /**
+ * @return Stringified Base64 encoded kerberosAuthHeader on success
+ * @throws Exception
+ */
+ public static String getKerberosServiceTicket(String principal, String host,
+ String serverHttpUrl, boolean assumeSubject) throws Exception {
+ String serverPrincipal =
+ ShimLoader.getHadoopThriftAuthBridge().getServerPrincipal(principal, host);
+ if (assumeSubject) {
+ // With this option, we're assuming that the external application,
+ // using the JDBC driver has done a JAAS kerberos login already
+ AccessControlContext context = AccessController.getContext();
+ Subject subject = Subject.getSubject(context);
+ if (subject == null) {
+ throw new Exception("The Subject is not set");
+ }
+ return Subject.doAs(subject, new HttpKerberosClientAction(serverPrincipal, serverHttpUrl));
+ } else {
+ // JAAS login from ticket cache to setup the client UserGroupInformation
+ UserGroupInformation clientUGI =
+ ShimLoader.getHadoopThriftAuthBridge().getCurrentUGIWithConf("kerberos");
+ return clientUGI.doAs(new HttpKerberosClientAction(serverPrincipal, serverHttpUrl));
+ }
+ }
+
+ /**
+ * Creates and returns a HS2 cookie token.
+ * @param clientUserName Client User name.
+ * @return An unsigned cookie token generated from input parameters.
+ * The final cookie generated is of the following format :
+ * cu=<username>&rn=<randomNumber>&s=<cookieSignature>
+ */
+ public static String createCookieToken(String clientUserName) {
+ StringBuffer sb = new StringBuffer();
+ sb.append(COOKIE_CLIENT_USER_NAME).append(COOKIE_KEY_VALUE_SEPARATOR).append(clientUserName).
+ append(COOKIE_ATTR_SEPARATOR);
+ sb.append(COOKIE_CLIENT_RAND_NUMBER).append(COOKIE_KEY_VALUE_SEPARATOR).
+ append((new Random(System.currentTimeMillis())).nextLong());
+ return sb.toString();
+ }
+
+ /**
+ * Parses a cookie token to retrieve client user name.
+ * @param tokenStr Token String.
+ * @return A valid user name if input is of valid format, else returns null.
+ */
+ public static String getUserNameFromCookieToken(String tokenStr) {
+ Map<String, String> map = splitCookieToken(tokenStr);
+
+ if (!map.keySet().equals(COOKIE_ATTRIBUTES)) {
+ LOG.error("Invalid token with missing attributes " + tokenStr);
+ return null;
+ }
+ return map.get(COOKIE_CLIENT_USER_NAME);
+ }
+
+ /**
+ * Splits the cookie token into attributes pairs.
+ * @param str input token.
+ * @return a map with the attribute pairs of the token if the input is valid.
+ * Else, returns null.
+ */
+ private static Map<String, String> splitCookieToken(String tokenStr) {
+ Map<String, String> map = new HashMap<String, String>();
+ StringTokenizer st = new StringTokenizer(tokenStr, COOKIE_ATTR_SEPARATOR);
+
+ while (st.hasMoreTokens()) {
+ String part = st.nextToken();
+ int separator = part.indexOf(COOKIE_KEY_VALUE_SEPARATOR);
+ if (separator == -1) {
+ LOG.error("Invalid token string " + tokenStr);
+ return null;
+ }
+ String key = part.substring(0, separator);
+ String value = part.substring(separator + 1);
+ map.put(key, value);
+ }
+ return map;
+ }
+
+
+ private HttpAuthUtils() {
+ throw new UnsupportedOperationException("Can't initialize class");
+ }
+
+ /**
+ * We'll create an instance of this class within a doAs block so that the client's TGT credentials
+ * can be read from the Subject
+ */
+ public static class HttpKerberosClientAction implements PrivilegedExceptionAction<String> {
+ public static final String HTTP_RESPONSE = "HTTP_RESPONSE";
+ public static final String SERVER_HTTP_URL = "SERVER_HTTP_URL";
+ private final String serverPrincipal;
+ private final String serverHttpUrl;
+ private final Base64 base64codec;
+ private final HttpContext httpContext;
+
+ public HttpKerberosClientAction(String serverPrincipal, String serverHttpUrl) {
+ this.serverPrincipal = serverPrincipal;
+ this.serverHttpUrl = serverHttpUrl;
+ base64codec = new Base64(0);
+ httpContext = new BasicHttpContext();
+ httpContext.setAttribute(SERVER_HTTP_URL, serverHttpUrl);
+ }
+
+ @Override
+ public String run() throws Exception {
+ // This Oid for Kerberos GSS-API mechanism.
+ Oid mechOid = new Oid("1.2.840.113554.1.2.2");
+ // Oid for kerberos principal name
+ Oid krb5PrincipalOid = new Oid("1.2.840.113554.1.2.2.1");
+ GSSManager manager = GSSManager.getInstance();
+ // GSS name for server
+ GSSName serverName = manager.createName(serverPrincipal, krb5PrincipalOid);
+ // Create a GSSContext for authentication with the service.
+ // We're passing client credentials as null since we want them to be read from the Subject.
+ GSSContext gssContext =
+ manager.createContext(serverName, mechOid, null, GSSContext.DEFAULT_LIFETIME);
+ gssContext.requestMutualAuth(false);
+ // Establish context
+ byte[] inToken = new byte[0];
+ byte[] outToken = gssContext.initSecContext(inToken, 0, inToken.length);
+ gssContext.dispose();
+ // Base64 encoded and stringified token for server
+ return new String(base64codec.encode(outToken));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/7feeb82c/sql/hive-thriftserver/src/main/java/org/apache/hive/service/auth/HttpAuthenticationException.java
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/auth/HttpAuthenticationException.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/auth/HttpAuthenticationException.java
new file mode 100644
index 0000000..5764325
--- /dev/null
+++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/auth/HttpAuthenticationException.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. See accompanying LICENSE file.
+ */
+
+package org.apache.hive.service.auth;
+
+public class HttpAuthenticationException extends Exception {
+
+ private static final long serialVersionUID = 0;
+
+ /**
+ * @param cause original exception
+ */
+ public HttpAuthenticationException(Throwable cause) {
+ super(cause);
+ }
+
+ /**
+ * @param msg exception message
+ */
+ public HttpAuthenticationException(String msg) {
+ super(msg);
+ }
+
+ /**
+ * @param msg exception message
+ * @param cause original exception
+ */
+ public HttpAuthenticationException(String msg, Throwable cause) {
+ super(msg, cause);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/7feeb82c/sql/hive-thriftserver/src/main/java/org/apache/hive/service/auth/KerberosSaslHelper.java
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/auth/KerberosSaslHelper.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/auth/KerberosSaslHelper.java
new file mode 100644
index 0000000..11d2669
--- /dev/null
+++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/auth/KerberosSaslHelper.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.service.auth;
+
+import java.io.IOException;
+import java.util.Map;
+import javax.security.sasl.SaslException;
+
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge;
+import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge.Server;
+import org.apache.hive.service.cli.thrift.TCLIService;
+import org.apache.hive.service.cli.thrift.TCLIService.Iface;
+import org.apache.hive.service.cli.thrift.ThriftCLIService;
+import org.apache.thrift.TProcessor;
+import org.apache.thrift.TProcessorFactory;
+import org.apache.thrift.transport.TSaslClientTransport;
+import org.apache.thrift.transport.TTransport;
+
+public final class KerberosSaslHelper {
+
+ public static TProcessorFactory getKerberosProcessorFactory(Server saslServer,
+ ThriftCLIService service) {
+ return new CLIServiceProcessorFactory(saslServer, service);
+ }
+
+ public static TTransport getKerberosTransport(String principal, String host,
+ TTransport underlyingTransport, Map<String, String> saslProps, boolean assumeSubject)
+ throws SaslException {
+ try {
+ String[] names = principal.split("[/@]");
+ if (names.length != 3) {
+ throw new IllegalArgumentException("Kerberos principal should have 3 parts: " + principal);
+ }
+
+ if (assumeSubject) {
+ return createSubjectAssumedTransport(principal, underlyingTransport, saslProps);
+ } else {
+ HadoopThriftAuthBridge.Client authBridge =
+ ShimLoader.getHadoopThriftAuthBridge().createClientWithConf("kerberos");
+ return authBridge.createClientTransport(principal, host, "KERBEROS", null,
+ underlyingTransport, saslProps);
+ }
+ } catch (IOException e) {
+ throw new SaslException("Failed to open client transport", e);
+ }
+ }
+
+ public static TTransport createSubjectAssumedTransport(String principal,
+ TTransport underlyingTransport, Map<String, String> saslProps) throws IOException {
+ String[] names = principal.split("[/@]");
+ try {
+ TTransport saslTransport =
+ new TSaslClientTransport("GSSAPI", null, names[0], names[1], saslProps, null,
+ underlyingTransport);
+ return new TSubjectAssumingTransport(saslTransport);
+ } catch (SaslException se) {
+ throw new IOException("Could not instantiate SASL transport", se);
+ }
+ }
+
+ public static TTransport getTokenTransport(String tokenStr, String host,
+ TTransport underlyingTransport, Map<String, String> saslProps) throws SaslException {
+ HadoopThriftAuthBridge.Client authBridge =
+ ShimLoader.getHadoopThriftAuthBridge().createClientWithConf("kerberos");
+
+ try {
+ return authBridge.createClientTransport(null, host, "DIGEST", tokenStr, underlyingTransport,
+ saslProps);
+ } catch (IOException e) {
+ throw new SaslException("Failed to open client transport", e);
+ }
+ }
+
+ private KerberosSaslHelper() {
+ throw new UnsupportedOperationException("Can't initialize class");
+ }
+
+ private static class CLIServiceProcessorFactory extends TProcessorFactory {
+
+ private final ThriftCLIService service;
+ private final Server saslServer;
+
+ public CLIServiceProcessorFactory(Server saslServer, ThriftCLIService service) {
+ super(null);
+ this.service = service;
+ this.saslServer = saslServer;
+ }
+
+ @Override
+ public TProcessor getProcessor(TTransport trans) {
+ TProcessor sqlProcessor = new TCLIService.Processor<Iface>(service);
+ return saslServer.wrapNonAssumingProcessor(sqlProcessor);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/7feeb82c/sql/hive-thriftserver/src/main/java/org/apache/hive/service/auth/LdapAuthenticationProviderImpl.java
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/auth/LdapAuthenticationProviderImpl.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/auth/LdapAuthenticationProviderImpl.java
new file mode 100644
index 0000000..4e2ef90
--- /dev/null
+++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/auth/LdapAuthenticationProviderImpl.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.service.auth;
+
+import java.util.Hashtable;
+import javax.naming.Context;
+import javax.naming.NamingException;
+import javax.naming.directory.InitialDirContext;
+import javax.security.sasl.AuthenticationException;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hive.service.ServiceUtils;
+
+public class LdapAuthenticationProviderImpl implements PasswdAuthenticationProvider {
+
+ private final String ldapURL;
+ private final String baseDN;
+ private final String ldapDomain;
+
+ LdapAuthenticationProviderImpl() {
+ HiveConf conf = new HiveConf();
+ ldapURL = conf.getVar(HiveConf.ConfVars.HIVE_SERVER2_PLAIN_LDAP_URL);
+ baseDN = conf.getVar(HiveConf.ConfVars.HIVE_SERVER2_PLAIN_LDAP_BASEDN);
+ ldapDomain = conf.getVar(HiveConf.ConfVars.HIVE_SERVER2_PLAIN_LDAP_DOMAIN);
+ }
+
+ @Override
+ public void Authenticate(String user, String password) throws AuthenticationException {
+
+ Hashtable<String, Object> env = new Hashtable<String, Object>();
+ env.put(Context.INITIAL_CONTEXT_FACTORY, "com.sun.jndi.ldap.LdapCtxFactory");
+ env.put(Context.PROVIDER_URL, ldapURL);
+
+ // If the domain is available in the config, then append it unless domain is
+ // already part of the username. LDAP providers like Active Directory use a
+ // fully qualified user name like foo@bar.com.
+ if (!hasDomain(user) && ldapDomain != null) {
+ user = user + "@" + ldapDomain;
+ }
+
+ if (password == null || password.isEmpty() || password.getBytes()[0] == 0) {
+ throw new AuthenticationException("Error validating LDAP user:" +
+ " a null or blank password has been provided");
+ }
+
+ // setup the security principal
+ String bindDN;
+ if (baseDN == null) {
+ bindDN = user;
+ } else {
+ bindDN = "uid=" + user + "," + baseDN;
+ }
+ env.put(Context.SECURITY_AUTHENTICATION, "simple");
+ env.put(Context.SECURITY_PRINCIPAL, bindDN);
+ env.put(Context.SECURITY_CREDENTIALS, password);
+
+ try {
+ // Create initial context
+ Context ctx = new InitialDirContext(env);
+ ctx.close();
+ } catch (NamingException e) {
+ throw new AuthenticationException("Error validating LDAP user", e);
+ }
+ }
+
+ private boolean hasDomain(String userName) {
+ return (ServiceUtils.indexOfDomainMatch(userName) > 0);
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/7feeb82c/sql/hive-thriftserver/src/main/java/org/apache/hive/service/auth/PamAuthenticationProviderImpl.java
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/auth/PamAuthenticationProviderImpl.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/auth/PamAuthenticationProviderImpl.java
new file mode 100644
index 0000000..68f62c4
--- /dev/null
+++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/auth/PamAuthenticationProviderImpl.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.service.auth;
+
+import javax.security.sasl.AuthenticationException;
+
+import net.sf.jpam.Pam;
+import org.apache.hadoop.hive.conf.HiveConf;
+
+public class PamAuthenticationProviderImpl implements PasswdAuthenticationProvider {
+
+ private final String pamServiceNames;
+
+ PamAuthenticationProviderImpl() {
+ HiveConf conf = new HiveConf();
+ pamServiceNames = conf.getVar(HiveConf.ConfVars.HIVE_SERVER2_PAM_SERVICES);
+ }
+
+ @Override
+ public void Authenticate(String user, String password) throws AuthenticationException {
+
+ if (pamServiceNames == null || pamServiceNames.trim().isEmpty()) {
+ throw new AuthenticationException("No PAM services are set.");
+ }
+
+ String[] pamServices = pamServiceNames.split(",");
+ for (String pamService : pamServices) {
+ Pam pam = new Pam(pamService);
+ boolean isAuthenticated = pam.authenticateSuccessful(user, password);
+ if (!isAuthenticated) {
+ throw new AuthenticationException(
+ "Error authenticating with the PAM service: " + pamService);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/7feeb82c/sql/hive-thriftserver/src/main/java/org/apache/hive/service/auth/PasswdAuthenticationProvider.java
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/auth/PasswdAuthenticationProvider.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/auth/PasswdAuthenticationProvider.java
new file mode 100644
index 0000000..e2a6de1
--- /dev/null
+++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/auth/PasswdAuthenticationProvider.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.service.auth;
+
+import javax.security.sasl.AuthenticationException;
+
+public interface PasswdAuthenticationProvider {
+
+ /**
+ * The Authenticate method is called by the HiveServer2 authentication layer
+ * to authenticate users for their requests.
+ * If a user is to be granted, return nothing/throw nothing.
+ * When a user is to be disallowed, throw an appropriate {@link AuthenticationException}.
+ * <p/>
+ * For an example implementation, see {@link LdapAuthenticationProviderImpl}.
+ *
+ * @param user The username received over the connection request
+ * @param password The password received over the connection request
+ *
+ * @throws AuthenticationException When a user is found to be
+ * invalid by the implementation
+ */
+ void Authenticate(String user, String password) throws AuthenticationException;
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/7feeb82c/sql/hive-thriftserver/src/main/java/org/apache/hive/service/auth/PlainSaslHelper.java
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/auth/PlainSaslHelper.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/auth/PlainSaslHelper.java
new file mode 100644
index 0000000..afc1441
--- /dev/null
+++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/auth/PlainSaslHelper.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.service.auth;
+
+import java.io.IOException;
+import java.security.Security;
+import java.util.HashMap;
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.auth.login.LoginException;
+import javax.security.sasl.AuthenticationException;
+import javax.security.sasl.AuthorizeCallback;
+import javax.security.sasl.SaslException;
+
+import org.apache.hive.service.auth.AuthenticationProviderFactory.AuthMethods;
+import org.apache.hive.service.auth.PlainSaslServer.SaslPlainProvider;
+import org.apache.hive.service.cli.thrift.TCLIService.Iface;
+import org.apache.hive.service.cli.thrift.ThriftCLIService;
+import org.apache.thrift.TProcessor;
+import org.apache.thrift.TProcessorFactory;
+import org.apache.thrift.transport.TSaslClientTransport;
+import org.apache.thrift.transport.TSaslServerTransport;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportFactory;
+
+public final class PlainSaslHelper {
+
+ public static TProcessorFactory getPlainProcessorFactory(ThriftCLIService service) {
+ return new SQLPlainProcessorFactory(service);
+ }
+
+ // Register Plain SASL server provider
+ static {
+ Security.addProvider(new SaslPlainProvider());
+ }
+
+ public static TTransportFactory getPlainTransportFactory(String authTypeStr)
+ throws LoginException {
+ TSaslServerTransport.Factory saslFactory = new TSaslServerTransport.Factory();
+ try {
+ saslFactory.addServerDefinition("PLAIN", authTypeStr, null, new HashMap<String, String>(),
+ new PlainServerCallbackHandler(authTypeStr));
+ } catch (AuthenticationException e) {
+ throw new LoginException("Error setting callback handler" + e);
+ }
+ return saslFactory;
+ }
+
+ public static TTransport getPlainTransport(String username, String password,
+ TTransport underlyingTransport) throws SaslException {
+ return new TSaslClientTransport("PLAIN", null, null, null, new HashMap<String, String>(),
+ new PlainCallbackHandler(username, password), underlyingTransport);
+ }
+
+ private PlainSaslHelper() {
+ throw new UnsupportedOperationException("Can't initialize class");
+ }
+
+ private static final class PlainServerCallbackHandler implements CallbackHandler {
+
+ private final AuthMethods authMethod;
+
+ PlainServerCallbackHandler(String authMethodStr) throws AuthenticationException {
+ authMethod = AuthMethods.getValidAuthMethod(authMethodStr);
+ }
+
+ @Override
+ public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
+ String username = null;
+ String password = null;
+ AuthorizeCallback ac = null;
+
+ for (Callback callback : callbacks) {
+ if (callback instanceof NameCallback) {
+ NameCallback nc = (NameCallback) callback;
+ username = nc.getName();
+ } else if (callback instanceof PasswordCallback) {
+ PasswordCallback pc = (PasswordCallback) callback;
+ password = new String(pc.getPassword());
+ } else if (callback instanceof AuthorizeCallback) {
+ ac = (AuthorizeCallback) callback;
+ } else {
+ throw new UnsupportedCallbackException(callback);
+ }
+ }
+ PasswdAuthenticationProvider provider =
+ AuthenticationProviderFactory.getAuthenticationProvider(authMethod);
+ provider.Authenticate(username, password);
+ if (ac != null) {
+ ac.setAuthorized(true);
+ }
+ }
+ }
+
+ public static class PlainCallbackHandler implements CallbackHandler {
+
+ private final String username;
+ private final String password;
+
+ public PlainCallbackHandler(String username, String password) {
+ this.username = username;
+ this.password = password;
+ }
+
+ @Override
+ public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
+ for (Callback callback : callbacks) {
+ if (callback instanceof NameCallback) {
+ NameCallback nameCallback = (NameCallback) callback;
+ nameCallback.setName(username);
+ } else if (callback instanceof PasswordCallback) {
+ PasswordCallback passCallback = (PasswordCallback) callback;
+ passCallback.setPassword(password.toCharArray());
+ } else {
+ throw new UnsupportedCallbackException(callback);
+ }
+ }
+ }
+ }
+
+ private static final class SQLPlainProcessorFactory extends TProcessorFactory {
+
+ private final ThriftCLIService service;
+
+ SQLPlainProcessorFactory(ThriftCLIService service) {
+ super(null);
+ this.service = service;
+ }
+
+ @Override
+ public TProcessor getProcessor(TTransport trans) {
+ return new TSetIpAddressProcessor<Iface>(service);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/7feeb82c/sql/hive-thriftserver/src/main/java/org/apache/hive/service/auth/PlainSaslServer.java
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/auth/PlainSaslServer.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/auth/PlainSaslServer.java
new file mode 100644
index 0000000..cd675da
--- /dev/null
+++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/auth/PlainSaslServer.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.service.auth;
+
+import java.io.IOException;
+import java.security.Provider;
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.Map;
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.sasl.AuthorizeCallback;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+import javax.security.sasl.SaslServerFactory;
+
+import org.apache.hive.service.auth.AuthenticationProviderFactory.AuthMethods;
+
+/**
+ * Sun JDK only provides a PLAIN client and no server. This class implements the Plain SASL server
+ * conforming to RFC #4616 (http://www.ietf.org/rfc/rfc4616.txt).
+ */
+public class PlainSaslServer implements SaslServer {
+
+ public static final String PLAIN_METHOD = "PLAIN";
+ private String user;
+ private final CallbackHandler handler;
+
+ PlainSaslServer(CallbackHandler handler, String authMethodStr) throws SaslException {
+ this.handler = handler;
+ AuthMethods.getValidAuthMethod(authMethodStr);
+ }
+
+ @Override
+ public String getMechanismName() {
+ return PLAIN_METHOD;
+ }
+
+ @Override
+ public byte[] evaluateResponse(byte[] response) throws SaslException {
+ try {
+ // parse the response
+ // message = [authzid] UTF8NUL authcid UTF8NUL passwd'
+
+ Deque<String> tokenList = new ArrayDeque<String>();
+ StringBuilder messageToken = new StringBuilder();
+ for (byte b : response) {
+ if (b == 0) {
+ tokenList.addLast(messageToken.toString());
+ messageToken = new StringBuilder();
+ } else {
+ messageToken.append((char) b);
+ }
+ }
+ tokenList.addLast(messageToken.toString());
+
+ // validate response
+ if (tokenList.size() < 2 || tokenList.size() > 3) {
+ throw new SaslException("Invalid message format");
+ }
+ String passwd = tokenList.removeLast();
+ user = tokenList.removeLast();
+ // optional authzid
+ String authzId;
+ if (tokenList.isEmpty()) {
+ authzId = user;
+ } else {
+ authzId = tokenList.removeLast();
+ }
+ if (user == null || user.isEmpty()) {
+ throw new SaslException("No user name provided");
+ }
+ if (passwd == null || passwd.isEmpty()) {
+ throw new SaslException("No password name provided");
+ }
+
+ NameCallback nameCallback = new NameCallback("User");
+ nameCallback.setName(user);
+ PasswordCallback pcCallback = new PasswordCallback("Password", false);
+ pcCallback.setPassword(passwd.toCharArray());
+ AuthorizeCallback acCallback = new AuthorizeCallback(user, authzId);
+
+ Callback[] cbList = {nameCallback, pcCallback, acCallback};
+ handler.handle(cbList);
+ if (!acCallback.isAuthorized()) {
+ throw new SaslException("Authentication failed");
+ }
+ } catch (IllegalStateException eL) {
+ throw new SaslException("Invalid message format", eL);
+ } catch (IOException eI) {
+ throw new SaslException("Error validating the login", eI);
+ } catch (UnsupportedCallbackException eU) {
+ throw new SaslException("Error validating the login", eU);
+ }
+ return null;
+ }
+
+ @Override
+ public boolean isComplete() {
+ return user != null;
+ }
+
+ @Override
+ public String getAuthorizationID() {
+ return user;
+ }
+
+ @Override
+ public byte[] unwrap(byte[] incoming, int offset, int len) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public byte[] wrap(byte[] outgoing, int offset, int len) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Object getNegotiatedProperty(String propName) {
+ return null;
+ }
+
+ @Override
+ public void dispose() {}
+
+ public static class SaslPlainServerFactory implements SaslServerFactory {
+
+ @Override
+ public SaslServer createSaslServer(String mechanism, String protocol, String serverName,
+ Map<String, ?> props, CallbackHandler cbh) {
+ if (PLAIN_METHOD.equals(mechanism)) {
+ try {
+ return new PlainSaslServer(cbh, protocol);
+ } catch (SaslException e) {
+ /* This is to fulfill the contract of the interface which states that an exception shall
+ be thrown when a SaslServer cannot be created due to an error but null should be
+ returned when a Server can't be created due to the parameters supplied. And the only
+ thing PlainSaslServer can fail on is a non-supported authentication mechanism.
+ That's why we return null instead of throwing the Exception */
+ return null;
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public String[] getMechanismNames(Map<String, ?> props) {
+ return new String[] {PLAIN_METHOD};
+ }
+ }
+
+ public static class SaslPlainProvider extends Provider {
+
+ public SaslPlainProvider() {
+ super("HiveSaslPlain", 1.0, "Hive Plain SASL provider");
+ put("SaslServerFactory.PLAIN", SaslPlainServerFactory.class.getName());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/7feeb82c/sql/hive-thriftserver/src/main/java/org/apache/hive/service/auth/SaslQOP.java
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/auth/SaslQOP.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/auth/SaslQOP.java
new file mode 100644
index 0000000..479ebf3
--- /dev/null
+++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/auth/SaslQOP.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.service.auth;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Possible values of SASL quality-of-protection value.
+ */
+public enum SaslQOP {
+ AUTH("auth"), // Authentication only.
+ AUTH_INT("auth-int"), // Authentication and integrity checking by using signatures.
+ AUTH_CONF("auth-conf"); // Authentication, integrity and confidentiality checking
+ // by using signatures and encryption.
+
+ public final String saslQop;
+
+ private static final Map<String, SaslQOP> STR_TO_ENUM = new HashMap<String, SaslQOP>();
+
+ static {
+ for (SaslQOP saslQop : values()) {
+ STR_TO_ENUM.put(saslQop.toString(), saslQop);
+ }
+ }
+
+ SaslQOP(String saslQop) {
+ this.saslQop = saslQop;
+ }
+
+ public String toString() {
+ return saslQop;
+ }
+
+ public static SaslQOP fromString(String str) {
+ if (str != null) {
+ str = str.toLowerCase();
+ }
+ SaslQOP saslQOP = STR_TO_ENUM.get(str);
+ if (saslQOP == null) {
+ throw new IllegalArgumentException(
+ "Unknown auth type: " + str + " Allowed values are: " + STR_TO_ENUM.keySet());
+ }
+ return saslQOP;
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/7feeb82c/sql/hive-thriftserver/src/main/java/org/apache/hive/service/auth/TSetIpAddressProcessor.java
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/auth/TSetIpAddressProcessor.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/auth/TSetIpAddressProcessor.java
new file mode 100644
index 0000000..645e3e2
--- /dev/null
+++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/auth/TSetIpAddressProcessor.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.service.auth;
+
+import org.apache.hive.service.cli.thrift.TCLIService;
+import org.apache.hive.service.cli.thrift.TCLIService.Iface;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TSaslClientTransport;
+import org.apache.thrift.transport.TSaslServerTransport;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class is responsible for setting the ipAddress for operations executed via HiveServer2.
+ * <p>
+ * <ul>
+ * <li>IP address is only set for operations that calls listeners with hookContext</li>
+ * <li>IP address is only set if the underlying transport mechanism is socket</li>
+ * </ul>
+ * </p>
+ *
+ * @see org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext
+ */
+public class TSetIpAddressProcessor<I extends Iface> extends TCLIService.Processor<Iface> {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(TSetIpAddressProcessor.class.getName());
+
+ public TSetIpAddressProcessor(Iface iface) {
+ super(iface);
+ }
+
+ @Override
+ public boolean process(final TProtocol in, final TProtocol out) throws TException {
+ setIpAddress(in);
+ setUserName(in);
+ try {
+ return super.process(in, out);
+ } finally {
+ THREAD_LOCAL_USER_NAME.remove();
+ THREAD_LOCAL_IP_ADDRESS.remove();
+ }
+ }
+
+ private void setUserName(final TProtocol in) {
+ TTransport transport = in.getTransport();
+ if (transport instanceof TSaslServerTransport) {
+ String userName = ((TSaslServerTransport) transport).getSaslServer().getAuthorizationID();
+ THREAD_LOCAL_USER_NAME.set(userName);
+ }
+ }
+
+ protected void setIpAddress(final TProtocol in) {
+ TTransport transport = in.getTransport();
+ TSocket tSocket = getUnderlyingSocketFromTransport(transport);
+ if (tSocket == null) {
+ LOGGER.warn("Unknown Transport, cannot determine ipAddress");
+ } else {
+ THREAD_LOCAL_IP_ADDRESS.set(tSocket.getSocket().getInetAddress().getHostAddress());
+ }
+ }
+
+ private TSocket getUnderlyingSocketFromTransport(TTransport transport) {
+ while (transport != null) {
+ if (transport instanceof TSaslServerTransport) {
+ transport = ((TSaslServerTransport) transport).getUnderlyingTransport();
+ }
+ if (transport instanceof TSaslClientTransport) {
+ transport = ((TSaslClientTransport) transport).getUnderlyingTransport();
+ }
+ if (transport instanceof TSocket) {
+ return (TSocket) transport;
+ }
+ }
+ return null;
+ }
+
+ private static final ThreadLocal<String> THREAD_LOCAL_IP_ADDRESS = new ThreadLocal<String>() {
+ @Override
+ protected synchronized String initialValue() {
+ return null;
+ }
+ };
+
+ private static final ThreadLocal<String> THREAD_LOCAL_USER_NAME = new ThreadLocal<String>() {
+ @Override
+ protected synchronized String initialValue() {
+ return null;
+ }
+ };
+
+ public static String getUserIpAddress() {
+ return THREAD_LOCAL_IP_ADDRESS.get();
+ }
+
+ public static String getUserName() {
+ return THREAD_LOCAL_USER_NAME.get();
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/7feeb82c/sql/hive-thriftserver/src/main/java/org/apache/hive/service/auth/TSubjectAssumingTransport.java
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/auth/TSubjectAssumingTransport.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/auth/TSubjectAssumingTransport.java
new file mode 100644
index 0000000..2422e86
--- /dev/null
+++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/auth/TSubjectAssumingTransport.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.service.auth;
+
+import java.security.AccessControlContext;
+import java.security.AccessController;
+import java.security.PrivilegedActionException;
+import java.security.PrivilegedExceptionAction;
+import javax.security.auth.Subject;
+
+import org.apache.hadoop.hive.thrift.TFilterTransport;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+
+/**
+ * This is used on the client side, where the API explicitly opens a transport to
+ * the server using the Subject.doAs().
+ */
+public class TSubjectAssumingTransport extends TFilterTransport {
+
+ public TSubjectAssumingTransport(TTransport wrapped) {
+ super(wrapped);
+ }
+
+ @Override
+ public void open() throws TTransportException {
+ try {
+ AccessControlContext context = AccessController.getContext();
+ Subject subject = Subject.getSubject(context);
+ Subject.doAs(subject, new PrivilegedExceptionAction<Void>() {
+ public Void run() {
+ try {
+ wrapped.open();
+ } catch (TTransportException tte) {
+ // Wrap the transport exception in an RTE, since Subject.doAs() then goes
+ // and unwraps this for us out of the doAs block. We then unwrap one
+ // more time in our catch clause to get back the TTE. (ugh)
+ throw new RuntimeException(tte);
+ }
+ return null;
+ }
+ });
+ } catch (PrivilegedActionException ioe) {
+ throw new RuntimeException("Received an ioe we never threw!", ioe);
+ } catch (RuntimeException rte) {
+ if (rte.getCause() instanceof TTransportException) {
+ throw (TTransportException) rte.getCause();
+ } else {
+ throw rte;
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/7feeb82c/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/CLIService.java
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/CLIService.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/CLIService.java
new file mode 100644
index 0000000..a3af7b2
--- /dev/null
+++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/CLIService.java
@@ -0,0 +1,508 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.service.cli;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import javax.security.auth.login.LoginException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.shims.Utils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hive.service.CompositeService;
+import org.apache.hive.service.ServiceException;
+import org.apache.hive.service.auth.HiveAuthFactory;
+import org.apache.hive.service.cli.operation.Operation;
+import org.apache.hive.service.cli.session.SessionManager;
+import org.apache.hive.service.cli.thrift.TProtocolVersion;
+import org.apache.hive.service.server.HiveServer2;
+
+/**
+ * CLIService.
+ *
+ */
+public class CLIService extends CompositeService implements ICLIService {
+
+ public static final TProtocolVersion SERVER_VERSION;
+
+ static {
+ TProtocolVersion[] protocols = TProtocolVersion.values();
+ SERVER_VERSION = protocols[protocols.length - 1];
+ }
+
+ private final Log LOG = LogFactory.getLog(CLIService.class.getName());
+
+ private HiveConf hiveConf;
+ private SessionManager sessionManager;
+ private UserGroupInformation serviceUGI;
+ private UserGroupInformation httpUGI;
+ // The HiveServer2 instance running this service
+ private final HiveServer2 hiveServer2;
+
+ public CLIService(HiveServer2 hiveServer2) {
+ super(CLIService.class.getSimpleName());
+ this.hiveServer2 = hiveServer2;
+ }
+
+ @Override
+ public synchronized void init(HiveConf hiveConf) {
+ this.hiveConf = hiveConf;
+ sessionManager = new SessionManager(hiveServer2);
+ addService(sessionManager);
+ // If the hadoop cluster is secure, do a kerberos login for the service from the keytab
+ if (UserGroupInformation.isSecurityEnabled()) {
+ try {
+ HiveAuthFactory.loginFromKeytab(hiveConf);
+ this.serviceUGI = Utils.getUGI();
+ } catch (IOException e) {
+ throw new ServiceException("Unable to login to kerberos with given principal/keytab", e);
+ } catch (LoginException e) {
+ throw new ServiceException("Unable to login to kerberos with given principal/keytab", e);
+ }
+
+ // Also try creating a UGI object for the SPNego principal
+ String principal = hiveConf.getVar(ConfVars.HIVE_SERVER2_SPNEGO_PRINCIPAL);
+ String keyTabFile = hiveConf.getVar(ConfVars.HIVE_SERVER2_SPNEGO_KEYTAB);
+ if (principal.isEmpty() || keyTabFile.isEmpty()) {
+ LOG.info("SPNego httpUGI not created, spNegoPrincipal: " + principal +
+ ", ketabFile: " + keyTabFile);
+ } else {
+ try {
+ this.httpUGI = HiveAuthFactory.loginFromSpnegoKeytabAndReturnUGI(hiveConf);
+ LOG.info("SPNego httpUGI successfully created.");
+ } catch (IOException e) {
+ LOG.warn("SPNego httpUGI creation failed: ", e);
+ }
+ }
+ }
+ // creates connection to HMS and thus *must* occur after kerberos login above
+ try {
+ applyAuthorizationConfigPolicy(hiveConf);
+ } catch (Exception e) {
+ throw new RuntimeException("Error applying authorization policy on hive configuration: "
+ + e.getMessage(), e);
+ }
+ setupBlockedUdfs();
+ super.init(hiveConf);
+ }
+
+ private void applyAuthorizationConfigPolicy(HiveConf newHiveConf) throws HiveException,
+ MetaException {
+ // authorization setup using SessionState should be revisited eventually, as
+ // authorization and authentication are not session specific settings
+ SessionState ss = new SessionState(newHiveConf);
+ ss.setIsHiveServerQuery(true);
+ SessionState.start(ss);
+ ss.applyAuthorizationPolicy();
+ }
+
+ private void setupBlockedUdfs() {
+ FunctionRegistry.setupPermissionsForBuiltinUDFs(
+ hiveConf.getVar(ConfVars.HIVE_SERVER2_BUILTIN_UDF_WHITELIST),
+ hiveConf.getVar(ConfVars.HIVE_SERVER2_BUILTIN_UDF_BLACKLIST));
+ }
+
+ public UserGroupInformation getServiceUGI() {
+ return this.serviceUGI;
+ }
+
+ public UserGroupInformation getHttpUGI() {
+ return this.httpUGI;
+ }
+
+ @Override
+ public synchronized void start() {
+ super.start();
+ // Initialize and test a connection to the metastore
+ IMetaStoreClient metastoreClient = null;
+ try {
+ metastoreClient = new HiveMetaStoreClient(hiveConf);
+ metastoreClient.getDatabases("default");
+ } catch (Exception e) {
+ throw new ServiceException("Unable to connect to MetaStore!", e);
+ }
+ finally {
+ if (metastoreClient != null) {
+ metastoreClient.close();
+ }
+ }
+ }
+
+ @Override
+ public synchronized void stop() {
+ super.stop();
+ }
+
+ /**
+ * @deprecated Use {@link #openSession(TProtocolVersion, String, String, String, Map)}
+ */
+ @Deprecated
+ public SessionHandle openSession(TProtocolVersion protocol, String username, String password,
+ Map<String, String> configuration) throws HiveSQLException {
+ SessionHandle sessionHandle = sessionManager.openSession(protocol, username, password, null, configuration, false, null);
+ LOG.debug(sessionHandle + ": openSession()");
+ return sessionHandle;
+ }
+
+ /**
+ * @deprecated Use {@link #openSessionWithImpersonation(TProtocolVersion, String, String, String, Map, String)}
+ */
+ @Deprecated
+ public SessionHandle openSessionWithImpersonation(TProtocolVersion protocol, String username,
+ String password, Map<String, String> configuration, String delegationToken)
+ throws HiveSQLException {
+ SessionHandle sessionHandle = sessionManager.openSession(protocol, username, password, null, configuration,
+ true, delegationToken);
+ LOG.debug(sessionHandle + ": openSessionWithImpersonation()");
+ return sessionHandle;
+ }
+
+ public SessionHandle openSession(TProtocolVersion protocol, String username, String password, String ipAddress,
+ Map<String, String> configuration) throws HiveSQLException {
+ SessionHandle sessionHandle = sessionManager.openSession(protocol, username, password, ipAddress, configuration, false, null);
+ LOG.debug(sessionHandle + ": openSession()");
+ return sessionHandle;
+ }
+
+ public SessionHandle openSessionWithImpersonation(TProtocolVersion protocol, String username,
+ String password, String ipAddress, Map<String, String> configuration, String delegationToken)
+ throws HiveSQLException {
+ SessionHandle sessionHandle = sessionManager.openSession(protocol, username, password, ipAddress, configuration,
+ true, delegationToken);
+ LOG.debug(sessionHandle + ": openSession()");
+ return sessionHandle;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hive.service.cli.ICLIService#openSession(java.lang.String, java.lang.String, java.util.Map)
+ */
+ @Override
+ public SessionHandle openSession(String username, String password, Map<String, String> configuration)
+ throws HiveSQLException {
+ SessionHandle sessionHandle = sessionManager.openSession(SERVER_VERSION, username, password, null, configuration, false, null);
+ LOG.debug(sessionHandle + ": openSession()");
+ return sessionHandle;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hive.service.cli.ICLIService#openSession(java.lang.String, java.lang.String, java.util.Map)
+ */
+ @Override
+ public SessionHandle openSessionWithImpersonation(String username, String password, Map<String, String> configuration,
+ String delegationToken) throws HiveSQLException {
+ SessionHandle sessionHandle = sessionManager.openSession(SERVER_VERSION, username, password, null, configuration,
+ true, delegationToken);
+ LOG.debug(sessionHandle + ": openSession()");
+ return sessionHandle;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hive.service.cli.ICLIService#closeSession(org.apache.hive.service.cli.SessionHandle)
+ */
+ @Override
+ public void closeSession(SessionHandle sessionHandle)
+ throws HiveSQLException {
+ sessionManager.closeSession(sessionHandle);
+ LOG.debug(sessionHandle + ": closeSession()");
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hive.service.cli.ICLIService#getInfo(org.apache.hive.service.cli.SessionHandle, java.util.List)
+ */
+ @Override
+ public GetInfoValue getInfo(SessionHandle sessionHandle, GetInfoType getInfoType)
+ throws HiveSQLException {
+ GetInfoValue infoValue = sessionManager.getSession(sessionHandle)
+ .getInfo(getInfoType);
+ LOG.debug(sessionHandle + ": getInfo()");
+ return infoValue;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hive.service.cli.ICLIService#executeStatement(org.apache.hive.service.cli.SessionHandle,
+ * java.lang.String, java.util.Map)
+ */
+ @Override
+ public OperationHandle executeStatement(SessionHandle sessionHandle, String statement,
+ Map<String, String> confOverlay)
+ throws HiveSQLException {
+ OperationHandle opHandle = sessionManager.getSession(sessionHandle)
+ .executeStatement(statement, confOverlay);
+ LOG.debug(sessionHandle + ": executeStatement()");
+ return opHandle;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hive.service.cli.ICLIService#executeStatementAsync(org.apache.hive.service.cli.SessionHandle,
+ * java.lang.String, java.util.Map)
+ */
+ @Override
+ public OperationHandle executeStatementAsync(SessionHandle sessionHandle, String statement,
+ Map<String, String> confOverlay) throws HiveSQLException {
+ OperationHandle opHandle = sessionManager.getSession(sessionHandle)
+ .executeStatementAsync(statement, confOverlay);
+ LOG.debug(sessionHandle + ": executeStatementAsync()");
+ return opHandle;
+ }
+
+
+ /* (non-Javadoc)
+ * @see org.apache.hive.service.cli.ICLIService#getTypeInfo(org.apache.hive.service.cli.SessionHandle)
+ */
+ @Override
+ public OperationHandle getTypeInfo(SessionHandle sessionHandle)
+ throws HiveSQLException {
+ OperationHandle opHandle = sessionManager.getSession(sessionHandle)
+ .getTypeInfo();
+ LOG.debug(sessionHandle + ": getTypeInfo()");
+ return opHandle;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hive.service.cli.ICLIService#getCatalogs(org.apache.hive.service.cli.SessionHandle)
+ */
+ @Override
+ public OperationHandle getCatalogs(SessionHandle sessionHandle)
+ throws HiveSQLException {
+ OperationHandle opHandle = sessionManager.getSession(sessionHandle)
+ .getCatalogs();
+ LOG.debug(sessionHandle + ": getCatalogs()");
+ return opHandle;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hive.service.cli.ICLIService#getSchemas(org.apache.hive.service.cli.SessionHandle, java.lang.String, java.lang.String)
+ */
+ @Override
+ public OperationHandle getSchemas(SessionHandle sessionHandle,
+ String catalogName, String schemaName)
+ throws HiveSQLException {
+ OperationHandle opHandle = sessionManager.getSession(sessionHandle)
+ .getSchemas(catalogName, schemaName);
+ LOG.debug(sessionHandle + ": getSchemas()");
+ return opHandle;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hive.service.cli.ICLIService#getTables(org.apache.hive.service.cli.SessionHandle, java.lang.String, java.lang.String, java.lang.String, java.util.List)
+ */
+ @Override
+ public OperationHandle getTables(SessionHandle sessionHandle,
+ String catalogName, String schemaName, String tableName, List<String> tableTypes)
+ throws HiveSQLException {
+ OperationHandle opHandle = sessionManager.getSession(sessionHandle)
+ .getTables(catalogName, schemaName, tableName, tableTypes);
+ LOG.debug(sessionHandle + ": getTables()");
+ return opHandle;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hive.service.cli.ICLIService#getTableTypes(org.apache.hive.service.cli.SessionHandle)
+ */
+ @Override
+ public OperationHandle getTableTypes(SessionHandle sessionHandle)
+ throws HiveSQLException {
+ OperationHandle opHandle = sessionManager.getSession(sessionHandle)
+ .getTableTypes();
+ LOG.debug(sessionHandle + ": getTableTypes()");
+ return opHandle;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hive.service.cli.ICLIService#getColumns(org.apache.hive.service.cli.SessionHandle)
+ */
+ @Override
+ public OperationHandle getColumns(SessionHandle sessionHandle,
+ String catalogName, String schemaName, String tableName, String columnName)
+ throws HiveSQLException {
+ OperationHandle opHandle = sessionManager.getSession(sessionHandle)
+ .getColumns(catalogName, schemaName, tableName, columnName);
+ LOG.debug(sessionHandle + ": getColumns()");
+ return opHandle;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hive.service.cli.ICLIService#getFunctions(org.apache.hive.service.cli.SessionHandle)
+ */
+ @Override
+ public OperationHandle getFunctions(SessionHandle sessionHandle,
+ String catalogName, String schemaName, String functionName)
+ throws HiveSQLException {
+ OperationHandle opHandle = sessionManager.getSession(sessionHandle)
+ .getFunctions(catalogName, schemaName, functionName);
+ LOG.debug(sessionHandle + ": getFunctions()");
+ return opHandle;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hive.service.cli.ICLIService#getOperationStatus(org.apache.hive.service.cli.OperationHandle)
+ */
+ @Override
+ public OperationStatus getOperationStatus(OperationHandle opHandle)
+ throws HiveSQLException {
+ Operation operation = sessionManager.getOperationManager().getOperation(opHandle);
+ /**
+ * If this is a background operation run asynchronously,
+ * we block for a configured duration, before we return
+ * (duration: HIVE_SERVER2_LONG_POLLING_TIMEOUT).
+ * However, if the background operation is complete, we return immediately.
+ */
+ if (operation.shouldRunAsync()) {
+ HiveConf conf = operation.getParentSession().getHiveConf();
+ long timeout = HiveConf.getTimeVar(conf,
+ HiveConf.ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT, TimeUnit.MILLISECONDS);
+ try {
+ operation.getBackgroundHandle().get(timeout, TimeUnit.MILLISECONDS);
+ } catch (TimeoutException e) {
+ // No Op, return to the caller since long polling timeout has expired
+ LOG.trace(opHandle + ": Long polling timed out");
+ } catch (CancellationException e) {
+ // The background operation thread was cancelled
+ LOG.trace(opHandle + ": The background operation was cancelled", e);
+ } catch (ExecutionException e) {
+ // The background operation thread was aborted
+ LOG.warn(opHandle + ": The background operation was aborted", e);
+ } catch (InterruptedException e) {
+ // No op, this thread was interrupted
+ // In this case, the call might return sooner than long polling timeout
+ }
+ }
+ OperationStatus opStatus = operation.getStatus();
+ LOG.debug(opHandle + ": getOperationStatus()");
+ return opStatus;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hive.service.cli.ICLIService#cancelOperation(org.apache.hive.service.cli.OperationHandle)
+ */
+ @Override
+ public void cancelOperation(OperationHandle opHandle)
+ throws HiveSQLException {
+ sessionManager.getOperationManager().getOperation(opHandle)
+ .getParentSession().cancelOperation(opHandle);
+ LOG.debug(opHandle + ": cancelOperation()");
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hive.service.cli.ICLIService#closeOperation(org.apache.hive.service.cli.OperationHandle)
+ */
+ @Override
+ public void closeOperation(OperationHandle opHandle)
+ throws HiveSQLException {
+ sessionManager.getOperationManager().getOperation(opHandle)
+ .getParentSession().closeOperation(opHandle);
+ LOG.debug(opHandle + ": closeOperation");
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hive.service.cli.ICLIService#getResultSetMetadata(org.apache.hive.service.cli.OperationHandle)
+ */
+ @Override
+ public TableSchema getResultSetMetadata(OperationHandle opHandle)
+ throws HiveSQLException {
+ TableSchema tableSchema = sessionManager.getOperationManager()
+ .getOperation(opHandle).getParentSession().getResultSetMetadata(opHandle);
+ LOG.debug(opHandle + ": getResultSetMetadata()");
+ return tableSchema;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hive.service.cli.ICLIService#fetchResults(org.apache.hive.service.cli.OperationHandle)
+ */
+ @Override
+ public RowSet fetchResults(OperationHandle opHandle)
+ throws HiveSQLException {
+ return fetchResults(opHandle, Operation.DEFAULT_FETCH_ORIENTATION,
+ Operation.DEFAULT_FETCH_MAX_ROWS, FetchType.QUERY_OUTPUT);
+ }
+
+ @Override
+ public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation,
+ long maxRows, FetchType fetchType) throws HiveSQLException {
+ RowSet rowSet = sessionManager.getOperationManager().getOperation(opHandle)
+ .getParentSession().fetchResults(opHandle, orientation, maxRows, fetchType);
+ LOG.debug(opHandle + ": fetchResults()");
+ return rowSet;
+ }
+
+ // obtain delegation token for the give user from metastore
+ public synchronized String getDelegationTokenFromMetaStore(String owner)
+ throws HiveSQLException, UnsupportedOperationException, LoginException, IOException {
+ if (!hiveConf.getBoolVar(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL) ||
+ !hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS)) {
+ throw new UnsupportedOperationException(
+ "delegation token is can only be obtained for a secure remote metastore");
+ }
+
+ try {
+ Hive.closeCurrent();
+ return Hive.get(hiveConf).getDelegationToken(owner, owner);
+ } catch (HiveException e) {
+ if (e.getCause() instanceof UnsupportedOperationException) {
+ throw (UnsupportedOperationException)e.getCause();
+ } else {
+ throw new HiveSQLException("Error connect metastore to setup impersonation", e);
+ }
+ }
+ }
+
+ @Override
+ public String getDelegationToken(SessionHandle sessionHandle, HiveAuthFactory authFactory,
+ String owner, String renewer) throws HiveSQLException {
+ String delegationToken = sessionManager.getSession(sessionHandle).
+ getDelegationToken(authFactory, owner, renewer);
+ LOG.info(sessionHandle + ": getDelegationToken()");
+ return delegationToken;
+ }
+
+ @Override
+ public void cancelDelegationToken(SessionHandle sessionHandle, HiveAuthFactory authFactory,
+ String tokenStr) throws HiveSQLException {
+ sessionManager.getSession(sessionHandle).
+ cancelDelegationToken(authFactory, tokenStr);
+ LOG.info(sessionHandle + ": cancelDelegationToken()");
+ }
+
+ @Override
+ public void renewDelegationToken(SessionHandle sessionHandle, HiveAuthFactory authFactory,
+ String tokenStr) throws HiveSQLException {
+ sessionManager.getSession(sessionHandle).renewDelegationToken(authFactory, tokenStr);
+ LOG.info(sessionHandle + ": renewDelegationToken()");
+ }
+
+ public SessionManager getSessionManager() {
+ return sessionManager;
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/7feeb82c/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/CLIServiceClient.java
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/CLIServiceClient.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/CLIServiceClient.java
new file mode 100644
index 0000000..3155c23
--- /dev/null
+++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/CLIServiceClient.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.service.cli;
+
+import java.util.Collections;
+
+import org.apache.hive.service.auth.HiveAuthFactory;
+
+
+/**
+ * CLIServiceClient.
+ *
+ */
+public abstract class CLIServiceClient implements ICLIService {
+ private static final long DEFAULT_MAX_ROWS = 1000;
+
+ public SessionHandle openSession(String username, String password)
+ throws HiveSQLException {
+ return openSession(username, password, Collections.<String, String>emptyMap());
+ }
+
+ @Override
+ public RowSet fetchResults(OperationHandle opHandle) throws HiveSQLException {
+ // TODO: provide STATIC default value
+ return fetchResults(opHandle, FetchOrientation.FETCH_NEXT, DEFAULT_MAX_ROWS, FetchType.QUERY_OUTPUT);
+ }
+
+ @Override
+ public abstract String getDelegationToken(SessionHandle sessionHandle, HiveAuthFactory authFactory,
+ String owner, String renewer) throws HiveSQLException;
+
+ @Override
+ public abstract void cancelDelegationToken(SessionHandle sessionHandle, HiveAuthFactory authFactory,
+ String tokenStr) throws HiveSQLException;
+
+ @Override
+ public abstract void renewDelegationToken(SessionHandle sessionHandle, HiveAuthFactory authFactory,
+ String tokenStr) throws HiveSQLException;
+
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/7feeb82c/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/CLIServiceUtils.java
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/CLIServiceUtils.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/CLIServiceUtils.java
new file mode 100644
index 0000000..9d64b10
--- /dev/null
+++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/CLIServiceUtils.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.service.cli;
+
+import org.apache.log4j.Layout;
+import org.apache.log4j.PatternLayout;
+
+/**
+ * CLIServiceUtils.
+ *
+ */
+public class CLIServiceUtils {
+
+
+ private static final char SEARCH_STRING_ESCAPE = '\\';
+ public static final Layout verboseLayout = new PatternLayout(
+ "%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n");
+ public static final Layout nonVerboseLayout = new PatternLayout(
+ "%-5p : %m%n");
+
+ /**
+ * Convert a SQL search pattern into an equivalent Java Regex.
+ *
+ * @param pattern input which may contain '%' or '_' wildcard characters, or
+ * these characters escaped using {@link #getSearchStringEscape()}.
+ * @return replace %/_ with regex search characters, also handle escaped
+ * characters.
+ */
+ public static String patternToRegex(String pattern) {
+ if (pattern == null) {
+ return ".*";
+ } else {
+ StringBuilder result = new StringBuilder(pattern.length());
+
+ boolean escaped = false;
+ for (int i = 0, len = pattern.length(); i < len; i++) {
+ char c = pattern.charAt(i);
+ if (escaped) {
+ if (c != SEARCH_STRING_ESCAPE) {
+ escaped = false;
+ }
+ result.append(c);
+ } else {
+ if (c == SEARCH_STRING_ESCAPE) {
+ escaped = true;
+ continue;
+ } else if (c == '%') {
+ result.append(".*");
+ } else if (c == '_') {
+ result.append('.');
+ } else {
+ result.append(Character.toLowerCase(c));
+ }
+ }
+ }
+ return result.toString();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/7feeb82c/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/Column.java
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/Column.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/Column.java
new file mode 100644
index 0000000..2e21f18
--- /dev/null
+++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/Column.java
@@ -0,0 +1,423 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.service.cli;
+
+import java.nio.ByteBuffer;
+import java.util.AbstractList;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.List;
+
+import com.google.common.primitives.Booleans;
+import com.google.common.primitives.Bytes;
+import com.google.common.primitives.Doubles;
+import com.google.common.primitives.Ints;
+import com.google.common.primitives.Longs;
+import com.google.common.primitives.Shorts;
+import org.apache.hive.service.cli.thrift.TBinaryColumn;
+import org.apache.hive.service.cli.thrift.TBoolColumn;
+import org.apache.hive.service.cli.thrift.TByteColumn;
+import org.apache.hive.service.cli.thrift.TColumn;
+import org.apache.hive.service.cli.thrift.TDoubleColumn;
+import org.apache.hive.service.cli.thrift.TI16Column;
+import org.apache.hive.service.cli.thrift.TI32Column;
+import org.apache.hive.service.cli.thrift.TI64Column;
+import org.apache.hive.service.cli.thrift.TStringColumn;
+
+/**
+ * Column.
+ */
+public class Column extends AbstractList {
+
+ private static final int DEFAULT_SIZE = 100;
+
+ private final Type type;
+
+ private BitSet nulls;
+
+ private int size;
+ private boolean[] boolVars;
+ private byte[] byteVars;
+ private short[] shortVars;
+ private int[] intVars;
+ private long[] longVars;
+ private double[] doubleVars;
+ private List<String> stringVars;
+ private List<ByteBuffer> binaryVars;
+
+ public Column(Type type, BitSet nulls, Object values) {
+ this.type = type;
+ this.nulls = nulls;
+ if (type == Type.BOOLEAN_TYPE) {
+ boolVars = (boolean[]) values;
+ size = boolVars.length;
+ } else if (type == Type.TINYINT_TYPE) {
+ byteVars = (byte[]) values;
+ size = byteVars.length;
+ } else if (type == Type.SMALLINT_TYPE) {
+ shortVars = (short[]) values;
+ size = shortVars.length;
+ } else if (type == Type.INT_TYPE) {
+ intVars = (int[]) values;
+ size = intVars.length;
+ } else if (type == Type.BIGINT_TYPE) {
+ longVars = (long[]) values;
+ size = longVars.length;
+ } else if (type == Type.DOUBLE_TYPE) {
+ doubleVars = (double[]) values;
+ size = doubleVars.length;
+ } else if (type == Type.BINARY_TYPE) {
+ binaryVars = (List<ByteBuffer>) values;
+ size = binaryVars.size();
+ } else if (type == Type.STRING_TYPE) {
+ stringVars = (List<String>) values;
+ size = stringVars.size();
+ } else {
+ throw new IllegalStateException("invalid union object");
+ }
+ }
+
+ public Column(Type type) {
+ nulls = new BitSet();
+ switch (type) {
+ case BOOLEAN_TYPE:
+ boolVars = new boolean[DEFAULT_SIZE];
+ break;
+ case TINYINT_TYPE:
+ byteVars = new byte[DEFAULT_SIZE];
+ break;
+ case SMALLINT_TYPE:
+ shortVars = new short[DEFAULT_SIZE];
+ break;
+ case INT_TYPE:
+ intVars = new int[DEFAULT_SIZE];
+ break;
+ case BIGINT_TYPE:
+ longVars = new long[DEFAULT_SIZE];
+ break;
+ case FLOAT_TYPE:
+ case DOUBLE_TYPE:
+ type = Type.DOUBLE_TYPE;
+ doubleVars = new double[DEFAULT_SIZE];
+ break;
+ case BINARY_TYPE:
+ binaryVars = new ArrayList<ByteBuffer>();
+ break;
+ default:
+ type = Type.STRING_TYPE;
+ stringVars = new ArrayList<String>();
+ }
+ this.type = type;
+ }
+
+ public Column(TColumn colValues) {
+ if (colValues.isSetBoolVal()) {
+ type = Type.BOOLEAN_TYPE;
+ nulls = toBitset(colValues.getBoolVal().getNulls());
+ boolVars = Booleans.toArray(colValues.getBoolVal().getValues());
+ size = boolVars.length;
+ } else if (colValues.isSetByteVal()) {
+ type = Type.TINYINT_TYPE;
+ nulls = toBitset(colValues.getByteVal().getNulls());
+ byteVars = Bytes.toArray(colValues.getByteVal().getValues());
+ size = byteVars.length;
+ } else if (colValues.isSetI16Val()) {
+ type = Type.SMALLINT_TYPE;
+ nulls = toBitset(colValues.getI16Val().getNulls());
+ shortVars = Shorts.toArray(colValues.getI16Val().getValues());
+ size = shortVars.length;
+ } else if (colValues.isSetI32Val()) {
+ type = Type.INT_TYPE;
+ nulls = toBitset(colValues.getI32Val().getNulls());
+ intVars = Ints.toArray(colValues.getI32Val().getValues());
+ size = intVars.length;
+ } else if (colValues.isSetI64Val()) {
+ type = Type.BIGINT_TYPE;
+ nulls = toBitset(colValues.getI64Val().getNulls());
+ longVars = Longs.toArray(colValues.getI64Val().getValues());
+ size = longVars.length;
+ } else if (colValues.isSetDoubleVal()) {
+ type = Type.DOUBLE_TYPE;
+ nulls = toBitset(colValues.getDoubleVal().getNulls());
+ doubleVars = Doubles.toArray(colValues.getDoubleVal().getValues());
+ size = doubleVars.length;
+ } else if (colValues.isSetBinaryVal()) {
+ type = Type.BINARY_TYPE;
+ nulls = toBitset(colValues.getBinaryVal().getNulls());
+ binaryVars = colValues.getBinaryVal().getValues();
+ size = binaryVars.size();
+ } else if (colValues.isSetStringVal()) {
+ type = Type.STRING_TYPE;
+ nulls = toBitset(colValues.getStringVal().getNulls());
+ stringVars = colValues.getStringVal().getValues();
+ size = stringVars.size();
+ } else {
+ throw new IllegalStateException("invalid union object");
+ }
+ }
+
+ public Column extractSubset(int start, int end) {
+ BitSet subNulls = nulls.get(start, end);
+ if (type == Type.BOOLEAN_TYPE) {
+ Column subset = new Column(type, subNulls, Arrays.copyOfRange(boolVars, start, end));
+ boolVars = Arrays.copyOfRange(boolVars, end, size);
+ nulls = nulls.get(start, size);
+ size = boolVars.length;
+ return subset;
+ }
+ if (type == Type.TINYINT_TYPE) {
+ Column subset = new Column(type, subNulls, Arrays.copyOfRange(byteVars, start, end));
+ byteVars = Arrays.copyOfRange(byteVars, end, size);
+ nulls = nulls.get(start, size);
+ size = byteVars.length;
+ return subset;
+ }
+ if (type == Type.SMALLINT_TYPE) {
+ Column subset = new Column(type, subNulls, Arrays.copyOfRange(shortVars, start, end));
+ shortVars = Arrays.copyOfRange(shortVars, end, size);
+ nulls = nulls.get(start, size);
+ size = shortVars.length;
+ return subset;
+ }
+ if (type == Type.INT_TYPE) {
+ Column subset = new Column(type, subNulls, Arrays.copyOfRange(intVars, start, end));
+ intVars = Arrays.copyOfRange(intVars, end, size);
+ nulls = nulls.get(start, size);
+ size = intVars.length;
+ return subset;
+ }
+ if (type == Type.BIGINT_TYPE) {
+ Column subset = new Column(type, subNulls, Arrays.copyOfRange(longVars, start, end));
+ longVars = Arrays.copyOfRange(longVars, end, size);
+ nulls = nulls.get(start, size);
+ size = longVars.length;
+ return subset;
+ }
+ if (type == Type.DOUBLE_TYPE) {
+ Column subset = new Column(type, subNulls, Arrays.copyOfRange(doubleVars, start, end));
+ doubleVars = Arrays.copyOfRange(doubleVars, end, size);
+ nulls = nulls.get(start, size);
+ size = doubleVars.length;
+ return subset;
+ }
+ if (type == Type.BINARY_TYPE) {
+ Column subset = new Column(type, subNulls, binaryVars.subList(start, end));
+ binaryVars = binaryVars.subList(end, binaryVars.size());
+ nulls = nulls.get(start, size);
+ size = binaryVars.size();
+ return subset;
+ }
+ if (type == Type.STRING_TYPE) {
+ Column subset = new Column(type, subNulls, stringVars.subList(start, end));
+ stringVars = stringVars.subList(end, stringVars.size());
+ nulls = nulls.get(start, size);
+ size = stringVars.size();
+ return subset;
+ }
+ throw new IllegalStateException("invalid union object");
+ }
+
+ private static final byte[] MASKS = new byte[] {
+ 0x01, 0x02, 0x04, 0x08, 0x10, 0x20, 0x40, (byte)0x80
+ };
+
+ private static BitSet toBitset(byte[] nulls) {
+ BitSet bitset = new BitSet();
+ int bits = nulls.length * 8;
+ for (int i = 0; i < bits; i++) {
+ bitset.set(i, (nulls[i / 8] & MASKS[i % 8]) != 0);
+ }
+ return bitset;
+ }
+
+ private static byte[] toBinary(BitSet bitset) {
+ byte[] nulls = new byte[1 + (bitset.length() / 8)];
+ for (int i = 0; i < bitset.length(); i++) {
+ nulls[i / 8] |= bitset.get(i) ? MASKS[i % 8] : 0;
+ }
+ return nulls;
+ }
+
+ public Type getType() {
+ return type;
+ }
+
+ @Override
+ public Object get(int index) {
+ if (nulls.get(index)) {
+ return null;
+ }
+ switch (type) {
+ case BOOLEAN_TYPE:
+ return boolVars[index];
+ case TINYINT_TYPE:
+ return byteVars[index];
+ case SMALLINT_TYPE:
+ return shortVars[index];
+ case INT_TYPE:
+ return intVars[index];
+ case BIGINT_TYPE:
+ return longVars[index];
+ case DOUBLE_TYPE:
+ return doubleVars[index];
+ case STRING_TYPE:
+ return stringVars.get(index);
+ case BINARY_TYPE:
+ return binaryVars.get(index).array();
+ }
+ return null;
+ }
+
+ @Override
+ public int size() {
+ return size;
+ }
+
+ public TColumn toTColumn() {
+ TColumn value = new TColumn();
+ ByteBuffer nullMasks = ByteBuffer.wrap(toBinary(nulls));
+ switch (type) {
+ case BOOLEAN_TYPE:
+ value.setBoolVal(new TBoolColumn(Booleans.asList(Arrays.copyOfRange(boolVars, 0, size)), nullMasks));
+ break;
+ case TINYINT_TYPE:
+ value.setByteVal(new TByteColumn(Bytes.asList(Arrays.copyOfRange(byteVars, 0, size)), nullMasks));
+ break;
+ case SMALLINT_TYPE:
+ value.setI16Val(new TI16Column(Shorts.asList(Arrays.copyOfRange(shortVars, 0, size)), nullMasks));
+ break;
+ case INT_TYPE:
+ value.setI32Val(new TI32Column(Ints.asList(Arrays.copyOfRange(intVars, 0, size)), nullMasks));
+ break;
+ case BIGINT_TYPE:
+ value.setI64Val(new TI64Column(Longs.asList(Arrays.copyOfRange(longVars, 0, size)), nullMasks));
+ break;
+ case DOUBLE_TYPE:
+ value.setDoubleVal(new TDoubleColumn(Doubles.asList(Arrays.copyOfRange(doubleVars, 0, size)), nullMasks));
+ break;
+ case STRING_TYPE:
+ value.setStringVal(new TStringColumn(stringVars, nullMasks));
+ break;
+ case BINARY_TYPE:
+ value.setBinaryVal(new TBinaryColumn(binaryVars, nullMasks));
+ break;
+ }
+ return value;
+ }
+
+ private static final ByteBuffer EMPTY_BINARY = ByteBuffer.allocate(0);
+ private static final String EMPTY_STRING = "";
+
+ public void addValue(Type type, Object field) {
+ switch (type) {
+ case BOOLEAN_TYPE:
+ nulls.set(size, field == null);
+ boolVars()[size] = field == null ? true : (Boolean)field;
+ break;
+ case TINYINT_TYPE:
+ nulls.set(size, field == null);
+ byteVars()[size] = field == null ? 0 : (Byte) field;
+ break;
+ case SMALLINT_TYPE:
+ nulls.set(size, field == null);
+ shortVars()[size] = field == null ? 0 : (Short)field;
+ break;
+ case INT_TYPE:
+ nulls.set(size, field == null);
+ intVars()[size] = field == null ? 0 : (Integer)field;
+ break;
+ case BIGINT_TYPE:
+ nulls.set(size, field == null);
+ longVars()[size] = field == null ? 0 : (Long)field;
+ break;
+ case FLOAT_TYPE:
+ nulls.set(size, field == null);
+ doubleVars()[size] = field == null ? 0 : ((Float)field).doubleValue();
+ break;
+ case DOUBLE_TYPE:
+ nulls.set(size, field == null);
+ doubleVars()[size] = field == null ? 0 : (Double)field;
+ break;
+ case BINARY_TYPE:
+ nulls.set(binaryVars.size(), field == null);
+ binaryVars.add(field == null ? EMPTY_BINARY : ByteBuffer.wrap((byte[])field));
+ break;
+ default:
+ nulls.set(stringVars.size(), field == null);
+ stringVars.add(field == null ? EMPTY_STRING : String.valueOf(field));
+ break;
+ }
+ size++;
+ }
+
+ private boolean[] boolVars() {
+ if (boolVars.length == size) {
+ boolean[] newVars = new boolean[size << 1];
+ System.arraycopy(boolVars, 0, newVars, 0, size);
+ return boolVars = newVars;
+ }
+ return boolVars;
+ }
+
+ private byte[] byteVars() {
+ if (byteVars.length == size) {
+ byte[] newVars = new byte[size << 1];
+ System.arraycopy(byteVars, 0, newVars, 0, size);
+ return byteVars = newVars;
+ }
+ return byteVars;
+ }
+
+ private short[] shortVars() {
+ if (shortVars.length == size) {
+ short[] newVars = new short[size << 1];
+ System.arraycopy(shortVars, 0, newVars, 0, size);
+ return shortVars = newVars;
+ }
+ return shortVars;
+ }
+
+ private int[] intVars() {
+ if (intVars.length == size) {
+ int[] newVars = new int[size << 1];
+ System.arraycopy(intVars, 0, newVars, 0, size);
+ return intVars = newVars;
+ }
+ return intVars;
+ }
+
+ private long[] longVars() {
+ if (longVars.length == size) {
+ long[] newVars = new long[size << 1];
+ System.arraycopy(longVars, 0, newVars, 0, size);
+ return longVars = newVars;
+ }
+ return longVars;
+ }
+
+ private double[] doubleVars() {
+ if (doubleVars.length == size) {
+ double[] newVars = new double[size << 1];
+ System.arraycopy(doubleVars, 0, newVars, 0, size);
+ return doubleVars = newVars;
+ }
+ return doubleVars;
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org