Posted to commits@hive.apache.org by br...@apache.org on 2014/09/08 06:38:26 UTC
svn commit: r1623263 [28/28] - in /hive/branches/spark: ./
accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/
ant/src/org/apache/hadoop/hive/ant/ beeline/src/java/org/apache/hive/beeline/
beeline/src/test/org/apache/hive/beeline/ bin/...
Modified: hive/branches/spark/service/src/java/org/apache/hive/service/auth/PlainSaslHelper.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/service/src/java/org/apache/hive/service/auth/PlainSaslHelper.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/service/src/java/org/apache/hive/service/auth/PlainSaslHelper.java (original)
+++ hive/branches/spark/service/src/java/org/apache/hive/service/auth/PlainSaslHelper.java Mon Sep 8 04:38:17 2014
@@ -18,8 +18,8 @@
package org.apache.hive.service.auth;
import java.io.IOException;
+import java.security.Security;
import java.util.HashMap;
-
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.CallbackHandler;
import javax.security.auth.callback.NameCallback;
@@ -30,10 +30,8 @@ import javax.security.sasl.Authenticatio
import javax.security.sasl.AuthorizeCallback;
import javax.security.sasl.SaslException;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hive.service.auth.PlainSaslServer.SaslPlainProvider;
import org.apache.hive.service.auth.AuthenticationProviderFactory.AuthMethods;
-import org.apache.hive.service.cli.thrift.TCLIService;
+import org.apache.hive.service.auth.PlainSaslServer.SaslPlainProvider;
import org.apache.hive.service.cli.thrift.TCLIService.Iface;
import org.apache.hive.service.cli.thrift.ThriftCLIService;
import org.apache.thrift.TProcessor;
@@ -43,78 +41,108 @@ import org.apache.thrift.transport.TSasl
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportFactory;
-public class PlainSaslHelper {
+public final class PlainSaslHelper {
+
+ public static TProcessorFactory getPlainProcessorFactory(ThriftCLIService service) {
+ return new SQLPlainProcessorFactory(service);
+ }
+
+ // Register Plain SASL server provider
+ static {
+ Security.addProvider(new SaslPlainProvider());
+ }
+
+ public static TTransportFactory getPlainTransportFactory(String authTypeStr)
+ throws LoginException {
+ TSaslServerTransport.Factory saslFactory = new TSaslServerTransport.Factory();
+ try {
+ saslFactory.addServerDefinition("PLAIN", authTypeStr, null, new HashMap<String, String>(),
+ new PlainServerCallbackHandler(authTypeStr));
+ } catch (AuthenticationException e) {
+ throw new LoginException("Error setting callback handler: " + e);
+ }
+ return saslFactory;
+ }
+
+ public static TTransport getPlainTransport(String username, String password,
+ TTransport underlyingTransport) throws SaslException {
+ return new TSaslClientTransport("PLAIN", null, null, null, new HashMap<String, String>(),
+ new PlainCallbackHandler(username, password), underlyingTransport);
+ }
+
+ private PlainSaslHelper() {
+ throw new UnsupportedOperationException("Can't initialize class");
+ }
+
+ private static final class PlainServerCallbackHandler implements CallbackHandler {
- private static class PlainServerCallbackHandler implements CallbackHandler {
private final AuthMethods authMethod;
- public PlainServerCallbackHandler(String authMethodStr) throws AuthenticationException {
+
+ PlainServerCallbackHandler(String authMethodStr) throws AuthenticationException {
authMethod = AuthMethods.getValidAuthMethod(authMethodStr);
}
@Override
public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
- String userName = null;
- String passWord = null;
+ String username = null;
+ String password = null;
AuthorizeCallback ac = null;
- for (int i = 0; i < callbacks.length; i++) {
- if (callbacks[i] instanceof NameCallback) {
- NameCallback nc = (NameCallback)callbacks[i];
- userName = nc.getName();
- } else if (callbacks[i] instanceof PasswordCallback) {
- PasswordCallback pc = (PasswordCallback)callbacks[i];
- passWord = new String(pc.getPassword());
- } else if (callbacks[i] instanceof AuthorizeCallback) {
- ac = (AuthorizeCallback) callbacks[i];
+ for (Callback callback : callbacks) {
+ if (callback instanceof NameCallback) {
+ NameCallback nc = (NameCallback) callback;
+ username = nc.getName();
+ } else if (callback instanceof PasswordCallback) {
+ PasswordCallback pc = (PasswordCallback) callback;
+ password = new String(pc.getPassword());
+ } else if (callback instanceof AuthorizeCallback) {
+ ac = (AuthorizeCallback) callback;
} else {
- throw new UnsupportedCallbackException(callbacks[i]);
+ throw new UnsupportedCallbackException(callback);
}
}
PasswdAuthenticationProvider provider =
- AuthenticationProviderFactory.getAuthenticationProvider(authMethod);
- provider.Authenticate(userName, passWord);
+ AuthenticationProviderFactory.getAuthenticationProvider(authMethod);
+ provider.Authenticate(username, password);
if (ac != null) {
ac.setAuthorized(true);
}
}
}
- public static class PlainClientbackHandler implements CallbackHandler {
+ public static class PlainCallbackHandler implements CallbackHandler {
- private final String userName;
- private final String passWord;
+ private final String username;
+ private final String password;
- public PlainClientbackHandler (String userName, String passWord) {
- this.userName = userName;
- this.passWord = passWord;
+ public PlainCallbackHandler(String username, String password) {
+ this.username = username;
+ this.password = password;
}
@Override
- public void handle(Callback[] callbacks)
- throws IOException, UnsupportedCallbackException {
- AuthorizeCallback ac = null;
- for (int i = 0; i < callbacks.length; i++) {
- if (callbacks[i] instanceof NameCallback) {
- NameCallback nameCallback = (NameCallback)callbacks[i];
- nameCallback.setName(userName);
- } else if (callbacks[i] instanceof PasswordCallback) {
- PasswordCallback passCallback = (PasswordCallback) callbacks[i];
- passCallback.setPassword(passWord.toCharArray());
+ public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
+ for (Callback callback : callbacks) {
+ if (callback instanceof NameCallback) {
+ NameCallback nameCallback = (NameCallback) callback;
+ nameCallback.setName(username);
+ } else if (callback instanceof PasswordCallback) {
+ PasswordCallback passCallback = (PasswordCallback) callback;
+ passCallback.setPassword(password.toCharArray());
} else {
- throw new UnsupportedCallbackException(callbacks[i]);
+ throw new UnsupportedCallbackException(callback);
}
}
}
}
- private static class SQLPlainProcessorFactory extends TProcessorFactory {
+ private static final class SQLPlainProcessorFactory extends TProcessorFactory {
+
private final ThriftCLIService service;
- private final HiveConf conf;
- public SQLPlainProcessorFactory(ThriftCLIService service) {
+ SQLPlainProcessorFactory(ThriftCLIService service) {
super(null);
this.service = service;
- this.conf = service.getHiveConf();
}
@Override
@@ -123,33 +151,4 @@ public class PlainSaslHelper {
}
}
- public static TProcessorFactory getPlainProcessorFactory(ThriftCLIService service) {
- return new SQLPlainProcessorFactory(service);
- }
-
- // Register Plain SASL server provider
- static {
- java.security.Security.addProvider(new SaslPlainProvider());
- }
-
- public static TTransportFactory getPlainTransportFactory(String authTypeStr)
- throws LoginException {
- TSaslServerTransport.Factory saslFactory = new TSaslServerTransport.Factory();
- try {
- saslFactory.addServerDefinition("PLAIN",
- authTypeStr, null, new HashMap<String, String>(),
- new PlainServerCallbackHandler(authTypeStr));
- } catch (AuthenticationException e) {
- throw new LoginException ("Error setting callback handler" + e);
- }
- return saslFactory;
- }
-
- public static TTransport getPlainTransport(String userName, String passwd,
- final TTransport underlyingTransport) throws SaslException {
- return new TSaslClientTransport("PLAIN", null,
- null, null, new HashMap<String, String>(),
- new PlainClientbackHandler(userName, passwd), underlyingTransport);
- }
-
}
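
For reference, a minimal client-side sketch of how the PLAIN transport above is
obtained and opened (the host, port, and credentials are placeholder assumptions,
not part of this commit):

    import org.apache.hive.service.auth.PlainSaslHelper;
    import org.apache.thrift.transport.TSocket;
    import org.apache.thrift.transport.TTransport;

    public class PlainTransportExample {
      public static void main(String[] args) throws Exception {
        // Underlying socket transport to a HiveServer2 instance (placeholder endpoint).
        TTransport socket = new TSocket("localhost", 10000);
        // Wrap it in the PLAIN SASL client transport; the credentials are sent
        // to the server in the RFC 4616 initial response.
        TTransport transport = PlainSaslHelper.getPlainTransport("user", "password", socket);
        transport.open(); // performs the SASL PLAIN handshake
        try {
          // ... issue Thrift calls over 'transport' ...
        } finally {
          transport.close();
        }
      }
    }
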
Modified: hive/branches/spark/service/src/java/org/apache/hive/service/auth/PlainSaslServer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/service/src/java/org/apache/hive/service/auth/PlainSaslServer.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/service/src/java/org/apache/hive/service/auth/PlainSaslServer.java (original)
+++ hive/branches/spark/service/src/java/org/apache/hive/service/auth/PlainSaslServer.java Mon Sep 8 04:38:17 2014
@@ -18,10 +18,10 @@
package org.apache.hive.service.auth;
import java.io.IOException;
+import java.security.Provider;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map;
-
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.CallbackHandler;
import javax.security.auth.callback.NameCallback;
@@ -35,27 +35,26 @@ import javax.security.sasl.SaslServerFac
import org.apache.hive.service.auth.AuthenticationProviderFactory.AuthMethods;
/**
- *
- * PlainSaslServer.
- * Sun JDK only provides PLAIN client and not server. This class implements the Plain SASL server
- * conforming to RFC #4616 (http://www.ietf.org/rfc/rfc4616.txt)
+ * Sun JDK only provides a PLAIN client and no server. This class implements the Plain SASL server
+ * conforming to RFC #4616 (http://www.ietf.org/rfc/rfc4616.txt).
*/
-public class PlainSaslServer implements SaslServer {
- private final AuthMethods authMethod;
+public class PlainSaslServer implements SaslServer {
+
+ public static final String PLAIN_METHOD = "PLAIN";
private String user;
- private String passwd;
- private String authzId;
private final CallbackHandler handler;
PlainSaslServer(CallbackHandler handler, String authMethodStr) throws SaslException {
this.handler = handler;
- this.authMethod = AuthMethods.getValidAuthMethod(authMethodStr);
+ AuthMethods.getValidAuthMethod(authMethodStr);
}
+ @Override
public String getMechanismName() {
- return "PLAIN";
+ return PLAIN_METHOD;
}
+ @Override
public byte[] evaluateResponse(byte[] response) throws SaslException {
try {
// parse the response
@@ -68,28 +67,29 @@ public class PlainSaslServer implements
tokenList.addLast(messageToken.toString());
messageToken = new StringBuilder();
} else {
- messageToken.append((char)b);
+ messageToken.append((char) b);
}
}
tokenList.addLast(messageToken.toString());
// validate response
- if ((tokenList.size() < 2) || (tokenList.size() > 3)) {
+ if (tokenList.size() < 2 || tokenList.size() > 3) {
throw new SaslException("Invalid message format");
}
- passwd = tokenList.removeLast();
+ String passwd = tokenList.removeLast();
user = tokenList.removeLast();
// optional authzid
- if (!tokenList.isEmpty()) {
- authzId = tokenList.removeLast();
- } else {
+ String authzId;
+ if (tokenList.isEmpty()) {
authzId = user;
+ } else {
+ authzId = tokenList.removeLast();
}
if (user == null || user.isEmpty()) {
- throw new SaslException("No user name provide");
+ throw new SaslException("No user name provided");
}
if (passwd == null || passwd.isEmpty()) {
- throw new SaslException("No password name provide");
+ throw new SaslException("No password name provided");
}
NameCallback nameCallback = new NameCallback("User");
@@ -98,7 +98,7 @@ public class PlainSaslServer implements
pcCallback.setPassword(passwd.toCharArray());
AuthorizeCallback acCallback = new AuthorizeCallback(user, authzId);
- Callback[] cbList = new Callback[] {nameCallback, pcCallback, acCallback};
+ Callback[] cbList = {nameCallback, pcCallback, acCallback};
handler.handle(cbList);
if (!acCallback.isAuthorized()) {
throw new SaslException("Authentication failed");
@@ -113,49 +113,62 @@ public class PlainSaslServer implements
return null;
}
+ @Override
public boolean isComplete() {
return user != null;
}
+ @Override
public String getAuthorizationID() {
return user;
}
+ @Override
public byte[] unwrap(byte[] incoming, int offset, int len) {
- throw new UnsupportedOperationException();
+ throw new UnsupportedOperationException();
}
+ @Override
public byte[] wrap(byte[] outgoing, int offset, int len) {
throw new UnsupportedOperationException();
}
+ @Override
public Object getNegotiatedProperty(String propName) {
return null;
}
+ @Override
public void dispose() {}
public static class SaslPlainServerFactory implements SaslServerFactory {
- public SaslServer createSaslServer(
- String mechanism, String protocol, String serverName, Map<String,?> props, CallbackHandler cbh)
- {
- if ("PLAIN".equals(mechanism)) {
+ @Override
+ public SaslServer createSaslServer(String mechanism, String protocol, String serverName,
+ Map<String, ?> props, CallbackHandler cbh) {
+ if (PLAIN_METHOD.equals(mechanism)) {
try {
return new PlainSaslServer(cbh, protocol);
} catch (SaslException e) {
+ /* This is to fulfill the contract of the interface, which states that an exception
+ shall be thrown when a SaslServer cannot be created because of an error, while null
+ should be returned when one cannot be created from the supplied parameters. The only
+ thing PlainSaslServer can fail on is an unsupported authentication mechanism, so we
+ return null instead of throwing the exception. */
return null;
}
}
return null;
}
+ @Override
public String[] getMechanismNames(Map<String, ?> props) {
- return new String[] { "PLAIN" };
+ return new String[] {PLAIN_METHOD};
}
}
- public static class SaslPlainProvider extends java.security.Provider {
+ public static class SaslPlainProvider extends Provider {
+
public SaslPlainProvider() {
super("HiveSaslPlain", 1.0, "Hive Plain SASL provider");
put("SaslServerFactory.PLAIN", SaslPlainServerFactory.class.getName());
Modified: hive/branches/spark/service/src/java/org/apache/hive/service/auth/SaslQOP.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/service/src/java/org/apache/hive/service/auth/SaslQOP.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/service/src/java/org/apache/hive/service/auth/SaslQOP.java (original)
+++ hive/branches/spark/service/src/java/org/apache/hive/service/auth/SaslQOP.java Mon Sep 8 04:38:17 2014
@@ -22,7 +22,7 @@ import java.util.HashMap;
import java.util.Map;
/**
- * Possible values of SASL quality-of-protection value.
+ * Possible values of SASL quality-of-protection.
*/
public enum SaslQOP {
AUTH("auth"), // Authentication only.
@@ -32,14 +32,15 @@ public enum SaslQOP {
public final String saslQop;
- private static final Map<String, SaslQOP> strToEnum
- = new HashMap<String, SaslQOP>();
+ private static final Map<String, SaslQOP> STR_TO_ENUM = new HashMap<String, SaslQOP>();
+
static {
- for (SaslQOP SaslQOP : values())
- strToEnum.put(SaslQOP.toString(), SaslQOP);
+ for (SaslQOP saslQop : values()) {
+ STR_TO_ENUM.put(saslQop.toString(), saslQop);
+ }
}
- private SaslQOP(final String saslQop) {
+ SaslQOP(String saslQop) {
this.saslQop = saslQop;
}
@@ -48,13 +49,13 @@ public enum SaslQOP {
}
public static SaslQOP fromString(String str) {
- if(str != null) {
+ if (str != null) {
str = str.toLowerCase();
}
- SaslQOP saslQOP = strToEnum.get(str);
- if(saslQOP == null) {
- throw new IllegalArgumentException("Unknown auth type: " + str + " Allowed values are: "
- + strToEnum.keySet());
+ SaslQOP saslQOP = STR_TO_ENUM.get(str);
+ if (saslQOP == null) {
+ throw new IllegalArgumentException(
+ "Unknown auth type: " + str + " Allowed values are: " + STR_TO_ENUM.keySet());
}
return saslQOP;
}
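
A short usage sketch of the lookup above (the unknown-value behavior follows the
IllegalArgumentException path in fromString(); the printed wire form assumes
toString() returns the saslQop field):

    import org.apache.hive.service.auth.SaslQOP;

    public class SaslQopExample {
      public static void main(String[] args) {
        // The lookup lowercases its input before consulting STR_TO_ENUM.
        SaslQOP qop = SaslQOP.fromString("AUTH");
        System.out.println(qop); // prints the wire form, "auth"
        try {
          SaslQOP.fromString("bogus");
        } catch (IllegalArgumentException e) {
          System.out.println(e.getMessage()); // lists the allowed values
        }
      }
    }
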
Modified: hive/branches/spark/service/src/java/org/apache/hive/service/auth/TSetIpAddressProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/service/src/java/org/apache/hive/service/auth/TSetIpAddressProcessor.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/service/src/java/org/apache/hive/service/auth/TSetIpAddressProcessor.java (original)
+++ hive/branches/spark/service/src/java/org/apache/hive/service/auth/TSetIpAddressProcessor.java Mon Sep 8 04:38:17 2014
@@ -34,10 +34,12 @@ import org.slf4j.LoggerFactory;
* This class is responsible for setting the ipAddress for operations executed via HiveServer2.
* <p>
* <ul>
- * <li>Ipaddress is only set for operations that calls listeners with hookContext @see ExecuteWithHookContext.</li>
- * <li>Ipaddress is only set if the underlying transport mechanism is socket. </li>
+ * <li>IP address is only set for operations that call listeners with hookContext</li>
+ * <li>IP address is only set if the underlying transport mechanism is a socket</li>
* </ul>
* </p>
+ *
+ * @see org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext
*/
public class TSetIpAddressProcessor<I extends Iface> extends TCLIService.Processor<Iface> {
@@ -54,26 +56,26 @@ public class TSetIpAddressProcessor<I ex
try {
return super.process(in, out);
} finally {
- threadLocalUserName.remove();
- threadLocalIpAddress.remove();
+ THREAD_LOCAL_USER_NAME.remove();
+ THREAD_LOCAL_IP_ADDRESS.remove();
}
}
private void setUserName(final TProtocol in) {
TTransport transport = in.getTransport();
if (transport instanceof TSaslServerTransport) {
- String userName = ((TSaslServerTransport)transport).getSaslServer().getAuthorizationID();
- threadLocalUserName.set(userName);
+ String userName = ((TSaslServerTransport) transport).getSaslServer().getAuthorizationID();
+ THREAD_LOCAL_USER_NAME.set(userName);
}
}
protected void setIpAddress(final TProtocol in) {
TTransport transport = in.getTransport();
TSocket tSocket = getUnderlyingSocketFromTransport(transport);
- if (tSocket != null) {
- threadLocalIpAddress.set(tSocket.getSocket().getInetAddress().toString());
- } else {
+ if (tSocket == null) {
LOGGER.warn("Unknown Transport, cannot determine ipAddress");
+ } else {
+ THREAD_LOCAL_IP_ADDRESS.set(tSocket.getSocket().getInetAddress().toString());
}
}
@@ -92,14 +94,14 @@ public class TSetIpAddressProcessor<I ex
return null;
}
- private static ThreadLocal<String> threadLocalIpAddress = new ThreadLocal<String>() {
+ private static final ThreadLocal<String> THREAD_LOCAL_IP_ADDRESS = new ThreadLocal<String>() {
@Override
protected synchronized String initialValue() {
return null;
}
};
- private static ThreadLocal<String> threadLocalUserName = new ThreadLocal<String>(){
+ private static final ThreadLocal<String> THREAD_LOCAL_USER_NAME = new ThreadLocal<String>() {
@Override
protected synchronized String initialValue() {
return null;
@@ -107,10 +109,10 @@ public class TSetIpAddressProcessor<I ex
};
public static String getUserIpAddress() {
- return threadLocalIpAddress.get();
+ return THREAD_LOCAL_IP_ADDRESS.get();
}
public static String getUserName() {
- return threadLocalUserName.get();
+ return THREAD_LOCAL_USER_NAME.get();
}
-}
\ No newline at end of file
+}
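
The two thread-locals above are what downstream code reads back through the
static getters; a minimal sketch of a consumer (the audit class is illustrative,
not part of this commit):

    import org.apache.hive.service.auth.TSetIpAddressProcessor;

    public class AuditLogger {
      // Reads the per-thread identity captured by TSetIpAddressProcessor.process().
      public static void logCaller(String operation) {
        String user = TSetIpAddressProcessor.getUserName();    // null unless a SASL transport is in use
        String ip = TSetIpAddressProcessor.getUserIpAddress(); // null for non-socket transports
        System.out.println(operation + " by user=" + user + " from ip=" + ip);
      }
    }
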
Modified: hive/branches/spark/service/src/java/org/apache/hive/service/auth/TSubjectAssumingTransport.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/service/src/java/org/apache/hive/service/auth/TSubjectAssumingTransport.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/service/src/java/org/apache/hive/service/auth/TSubjectAssumingTransport.java (original)
+++ hive/branches/spark/service/src/java/org/apache/hive/service/auth/TSubjectAssumingTransport.java Mon Sep 8 04:38:17 2014
@@ -22,7 +22,6 @@ import java.security.AccessControlContex
import java.security.AccessController;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
-
import javax.security.auth.Subject;
import org.apache.hadoop.hive.thrift.TFilterTransport;
@@ -30,43 +29,42 @@ import org.apache.thrift.transport.TTran
import org.apache.thrift.transport.TTransportException;
/**
- *
- * This is used on the client side, where the API explicitly opens a transport to
- * the server using the Subject.doAs()
- */
- public class TSubjectAssumingTransport extends TFilterTransport {
-
- public TSubjectAssumingTransport(TTransport wrapped) {
- super(wrapped);
- }
-
- @Override
- public void open() throws TTransportException {
- try {
- AccessControlContext context = AccessController.getContext();
- Subject subject = Subject.getSubject(context);
- Subject.doAs(subject, new PrivilegedExceptionAction<Void>() {
- public Void run() {
- try {
- wrapped.open();
- } catch (TTransportException tte) {
- // Wrap the transport exception in an RTE, since Subject.doAs() then goes
- // and unwraps this for us out of the doAs block. We then unwrap one
- // more time in our catch clause to get back the TTE. (ugh)
- throw new RuntimeException(tte);
- }
- return null;
- }
- });
- } catch (PrivilegedActionException ioe) {
- throw new RuntimeException("Received an ioe we never threw!", ioe);
- } catch (RuntimeException rte) {
- if (rte.getCause() instanceof TTransportException) {
- throw (TTransportException)rte.getCause();
- } else {
- throw rte;
- }
- }
- }
+ * This is used on the client side, where the API explicitly opens a transport to
+ * the server using the Subject.doAs().
+ */
+public class TSubjectAssumingTransport extends TFilterTransport {
+
+ public TSubjectAssumingTransport(TTransport wrapped) {
+ super(wrapped);
+ }
+
+ @Override
+ public void open() throws TTransportException {
+ try {
+ AccessControlContext context = AccessController.getContext();
+ Subject subject = Subject.getSubject(context);
+ Subject.doAs(subject, new PrivilegedExceptionAction<Void>() {
+ public Void run() {
+ try {
+ wrapped.open();
+ } catch (TTransportException tte) {
+ // Wrap the transport exception in an RTE, since Subject.doAs() then goes
+ // and unwraps this for us out of the doAs block. We then unwrap one
+ // more time in our catch clause to get back the TTE. (ugh)
+ throw new RuntimeException(tte);
+ }
+ return null;
+ }
+ });
+ } catch (PrivilegedActionException ioe) {
+ throw new RuntimeException("Received an ioe we never threw!", ioe);
+ } catch (RuntimeException rte) {
+ if (rte.getCause() instanceof TTransportException) {
+ throw (TTransportException) rte.getCause();
+ } else {
+ throw rte;
+ }
+ }
+ }
- }
+}
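
Because open() captures the Subject from the current access-control context, the
caller has to invoke it from inside Subject.doAs. A client-side sketch (the JAAS
configuration entry name "HiveClient" and the endpoint are placeholder
assumptions):

    import java.security.PrivilegedExceptionAction;
    import javax.security.auth.Subject;
    import javax.security.auth.login.LoginContext;
    import org.apache.hive.service.auth.TSubjectAssumingTransport;
    import org.apache.thrift.transport.TSocket;
    import org.apache.thrift.transport.TTransport;

    public class SubjectTransportExample {
      public static void main(String[] args) throws Exception {
        LoginContext login = new LoginContext("HiveClient"); // placeholder JAAS entry
        login.login();
        Subject subject = login.getSubject();
        // Run inside doAs so the Subject is on the access-control context when
        // TSubjectAssumingTransport.open() looks it up.
        Subject.doAs(subject, new PrivilegedExceptionAction<Void>() {
          public Void run() throws Exception {
            TTransport transport =
                new TSubjectAssumingTransport(new TSocket("localhost", 10000));
            transport.open(); // wrapped.open() executes as the logged-in Subject
            transport.close();
            return null;
          }
        });
      }
    }
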
Modified: hive/branches/spark/service/src/java/org/apache/hive/service/cli/CLIService.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/service/src/java/org/apache/hive/service/cli/CLIService.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/service/src/java/org/apache/hive/service/cli/CLIService.java (original)
+++ hive/branches/spark/service/src/java/org/apache/hive/service/cli/CLIService.java Mon Sep 8 04:38:17 2014
@@ -30,12 +30,8 @@ import javax.security.auth.login.LoginEx
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.conf.SystemVariables;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.ql.metadata.Hive;
@@ -118,15 +114,6 @@ public class CLIService extends Composit
@Override
public synchronized void start() {
super.start();
-
- try {
- // make sure that the base scratch directories exists and writable
- setupStagingDir(hiveConf.getVar(HiveConf.ConfVars.SCRATCHDIR), false);
- setupStagingDir(hiveConf.getVar(HiveConf.ConfVars.LOCALSCRATCHDIR), true);
- setupStagingDir(hiveConf.getVar(HiveConf.ConfVars.DOWNLOADED_RESOURCES_DIR), true);
- } catch (IOException eIO) {
- throw new ServiceException("Error setting stage directories", eIO);
- }
// Initialize and test a connection to the metastore
IMetaStoreClient metastoreClient = null;
try {
@@ -362,8 +349,9 @@ public class CLIService extends Composit
* However, if the background operation is complete, we return immediately.
*/
if (operation.shouldRunAsync()) {
- long timeout = operation.getParentSession().getHiveConf().getLongVar(
- HiveConf.ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT);
+ HiveConf conf = operation.getParentSession().getHiveConf();
+ long timeout = HiveConf.getTimeVar(conf,
+ HiveConf.ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT, TimeUnit.MILLISECONDS);
try {
operation.getBackgroundHandle().get(timeout, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
@@ -459,25 +447,6 @@ public class CLIService extends Composit
}
}
- // create the give Path if doesn't exists and make it writable
- private void setupStagingDir(String dirPath, boolean isLocal) throws IOException {
- Path scratchDir = getStaticPath(new Path(dirPath));
- if (scratchDir == null) {
- return;
- }
- FileSystem fs;
- if (isLocal) {
- fs = FileSystem.getLocal(hiveConf);
- } else {
- fs = scratchDir.getFileSystem(hiveConf);
- }
- if (!fs.exists(scratchDir)) {
- fs.mkdirs(scratchDir);
- }
- FsPermission fsPermission = new FsPermission((short)0777);
- fs.setPermission(scratchDir, fsPermission);
- }
-
@Override
public String getDelegationToken(SessionHandle sessionHandle, HiveAuthFactory authFactory,
String owner, String renewer) throws HiveSQLException {
@@ -501,16 +470,4 @@ public class CLIService extends Composit
sessionManager.getSession(sessionHandle).renewDelegationToken(authFactory, tokenStr);
LOG.info(sessionHandle + ": renewDelegationToken()");
}
-
-  // DOWNLOADED_RESOURCES_DIR, for example, defaults to ${system:java.io.tmpdir}/${hive.session.id}_resources;
-  // ${system:java.io.tmpdir} would already be evaluated here, but ${hive.session.id} would not.
-  // For that case, this returns only the evaluated parts, in this case "/tmp".
-  // What about ${hive.session.id}_resources/${system:java.io.tmpdir}? Just don't do that.
- private Path getStaticPath(Path path) {
- Path current = path;
- for (; current != null && SystemVariables.containsVar(current.getName());
- current = current.getParent()) {
- }
- return current;
- }
}
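
The long-polling change above swaps getLongVar for getTimeVar, which parses
unit-suffixed values into the requested TimeUnit. The wait-with-timeout pattern
it feeds is plain java.util.concurrent; a reduced sketch:

    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    public class LongPollExample {
      // Waits up to timeoutMs for a background operation and returns either way,
      // mirroring the CLIService polling logic above.
      static void await(Future<?> backgroundHandle, long timeoutMs)
          throws InterruptedException, ExecutionException {
        try {
          backgroundHandle.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
          // Not an error: the operation simply isn't finished yet; the caller polls again.
        }
      }
    }
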
Modified: hive/branches/spark/service/src/java/org/apache/hive/service/cli/OperationState.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/service/src/java/org/apache/hive/service/cli/OperationState.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/service/src/java/org/apache/hive/service/cli/OperationState.java (original)
+++ hive/branches/spark/service/src/java/org/apache/hive/service/cli/OperationState.java Mon Sep 8 04:38:17 2014
@@ -25,29 +25,26 @@ import org.apache.hive.service.cli.thrif
*
*/
public enum OperationState {
- INITIALIZED(TOperationState.INITIALIZED_STATE),
- RUNNING(TOperationState.RUNNING_STATE),
- FINISHED(TOperationState.FINISHED_STATE),
- CANCELED(TOperationState.CANCELED_STATE),
- CLOSED(TOperationState.CLOSED_STATE),
- ERROR(TOperationState.ERROR_STATE),
- UNKNOWN(TOperationState.UKNOWN_STATE),
- PENDING(TOperationState.PENDING_STATE);
+ INITIALIZED(TOperationState.INITIALIZED_STATE, false),
+ RUNNING(TOperationState.RUNNING_STATE, false),
+ FINISHED(TOperationState.FINISHED_STATE, true),
+ CANCELED(TOperationState.CANCELED_STATE, true),
+ CLOSED(TOperationState.CLOSED_STATE, true),
+ ERROR(TOperationState.ERROR_STATE, true),
+ UNKNOWN(TOperationState.UKNOWN_STATE, false),
+ PENDING(TOperationState.PENDING_STATE, false);
private final TOperationState tOperationState;
+ private final boolean terminal;
- OperationState(TOperationState tOperationState) {
+ OperationState(TOperationState tOperationState, boolean terminal) {
this.tOperationState = tOperationState;
+ this.terminal = terminal;
}
+ // must be kept in sync with TOperationState, in declaration order
public static OperationState getOperationState(TOperationState tOperationState) {
- // TODO: replace this with a Map?
- for (OperationState opState : values()) {
- if (tOperationState.equals(opState.tOperationState)) {
- return opState;
- }
- }
- return OperationState.UNKNOWN;
+ return OperationState.values()[tOperationState.getValue()];
}
public static void validateTransition(OperationState oldState,
@@ -91,7 +88,8 @@ public enum OperationState {
default:
// fall-through
}
- throw new HiveSQLException("Illegal Operation state transition");
+ throw new HiveSQLException("Illegal Operation state transition " +
+ "from " + oldState + " to " + newState);
}
public void validateTransition(OperationState newState)
@@ -102,4 +100,8 @@ public enum OperationState {
public TOperationState toTOperationState() {
return tOperationState;
}
+
+ public boolean isTerminal() {
+ return terminal;
+ }
}
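
The new getOperationState() indexes values() by the Thrift enum's integer value,
which is why the comment insists the two enums stay in declaration order. An
order-independent alternative, sketched for illustration only (not what this
commit does), would key an explicit map on the Thrift enum:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.hive.service.cli.OperationState;
    import org.apache.hive.service.cli.thrift.TOperationState;

    public class StateMapping {
      // Built once from toTOperationState(), so it cannot drift out of order.
      private static final Map<TOperationState, OperationState> MAP =
          new HashMap<TOperationState, OperationState>();
      static {
        for (OperationState s : OperationState.values()) {
          MAP.put(s.toTOperationState(), s);
        }
      }

      public static OperationState fromThrift(TOperationState t) {
        OperationState s = MAP.get(t);
        return s == null ? OperationState.UNKNOWN : s;
      }
    }
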
Modified: hive/branches/spark/service/src/java/org/apache/hive/service/cli/operation/Operation.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/service/src/java/org/apache/hive/service/cli/operation/Operation.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/service/src/java/org/apache/hive/service/cli/operation/Operation.java (original)
+++ hive/branches/spark/service/src/java/org/apache/hive/service/cli/operation/Operation.java Mon Sep 8 04:38:17 2014
@@ -21,6 +21,7 @@ import java.io.File;
import java.io.FileNotFoundException;
import java.util.EnumSet;
import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -52,14 +53,19 @@ public abstract class Operation {
protected OperationLog operationLog;
protected boolean isOperationLogEnabled;
+ private long operationTimeout;
+ private long lastAccessTime;
+
protected static final EnumSet<FetchOrientation> DEFAULT_FETCH_ORIENTATION_SET =
EnumSet.of(FetchOrientation.FETCH_NEXT,FetchOrientation.FETCH_FIRST);
protected Operation(HiveSession parentSession, OperationType opType, boolean runInBackground) {
- super();
this.parentSession = parentSession;
this.runAsync = runInBackground;
this.opHandle = new OperationHandle(opType, parentSession.getProtocolVersion());
+ lastAccessTime = System.currentTimeMillis();
+ operationTimeout = HiveConf.getTimeVar(parentSession.getHiveConf(),
+ HiveConf.ConfVars.HIVE_SERVER2_IDLE_OPERATION_TIMEOUT, TimeUnit.MILLISECONDS);
}
public Future<?> getBackgroundHandle() {
@@ -111,7 +117,6 @@ public abstract class Operation {
opHandle.setHasResultSet(hasResultSet);
}
-
public OperationLog getOperationLog() {
return operationLog;
}
@@ -119,9 +124,33 @@ public abstract class Operation {
protected final OperationState setState(OperationState newState) throws HiveSQLException {
state.validateTransition(newState);
this.state = newState;
+ this.lastAccessTime = System.currentTimeMillis();
return this.state;
}
+ public boolean isTimedOut(long current) {
+ if (operationTimeout == 0) {
+ return false;
+ }
+ if (operationTimeout > 0) {
+ // check only when it's in terminal state
+ return state.isTerminal() && lastAccessTime + operationTimeout <= current;
+ }
+ return lastAccessTime + -operationTimeout <= current;
+ }
+
+ public long getLastAccessTime() {
+ return lastAccessTime;
+ }
+
+ public long getOperationTimeout() {
+ return operationTimeout;
+ }
+
+ public void setOperationTimeout(long operationTimeout) {
+ this.operationTimeout = operationTimeout;
+ }
+
protected void setOperationException(HiveSQLException operationException) {
this.operationException = operationException;
}
@@ -130,6 +159,7 @@ public abstract class Operation {
if (this.state != state) {
throw new HiveSQLException("Expected state " + state + ", but found " + this.state);
}
+ this.lastAccessTime = System.currentTimeMillis();
}
public boolean isRunning() {
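
The operationTimeout field above is signed on purpose: zero disables expiry, a
positive value expires only operations already in a terminal state, and a
negative value expires any idle operation. A worked sketch of the three modes
(the timestamps are illustrative milliseconds):

    public class TimeoutDemo {
      // Mirrors Operation.isTimedOut(): 0 = never, > 0 = terminal ops only, < 0 = any op.
      static boolean isTimedOut(long timeout, boolean terminal, long lastAccess, long now) {
        if (timeout == 0) {
          return false;
        }
        if (timeout > 0) {
          return terminal && lastAccess + timeout <= now;
        }
        return lastAccess + -timeout <= now;
      }

      public static void main(String[] args) {
        long lastAccess = 1000L;
        System.out.println(isTimedOut(0, true, lastAccess, 9999));     // false: expiry disabled
        System.out.println(isTimedOut(500, true, lastAccess, 2000));   // true: terminal and idle past 500 ms
        System.out.println(isTimedOut(500, false, lastAccess, 2000));  // false: still running
        System.out.println(isTimedOut(-500, false, lastAccess, 2000)); // true: negative applies to all states
      }
    }
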
Modified: hive/branches/spark/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java (original)
+++ hive/branches/spark/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java Mon Sep 8 04:38:17 2014
@@ -19,6 +19,7 @@
package org.apache.hive.service.cli.operation;
import java.util.Enumeration;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -155,15 +156,27 @@ public class OperationManager extends Ab
return operation;
}
- public synchronized Operation getOperation(OperationHandle operationHandle)
- throws HiveSQLException {
- Operation operation = handleToOperation.get(operationHandle);
+ public Operation getOperation(OperationHandle operationHandle) throws HiveSQLException {
+ Operation operation = getOperationInternal(operationHandle);
if (operation == null) {
throw new HiveSQLException("Invalid OperationHandle: " + operationHandle);
}
return operation;
}
+ private synchronized Operation getOperationInternal(OperationHandle operationHandle) {
+ return handleToOperation.get(operationHandle);
+ }
+
+ private synchronized Operation removeTimedOutOperation(OperationHandle operationHandle) {
+ Operation operation = handleToOperation.get(operationHandle);
+ if (operation != null && operation.isTimedOut(System.currentTimeMillis())) {
+ handleToOperation.remove(operationHandle);
+ return operation;
+ }
+ return null;
+ }
+
private synchronized void addOperation(Operation operation) {
handleToOperation.put(operation.getHandle(), operation);
}
@@ -252,4 +265,16 @@ public class OperationManager extends Ab
public OperationLog getOperationLogByThread() {
return OperationLog.getCurrentOperationLog();
}
+
+ public List<Operation> removeExpiredOperations(OperationHandle[] handles) {
+ List<Operation> removed = new ArrayList<Operation>();
+ for (OperationHandle handle : handles) {
+ Operation operation = removeTimedOutOperation(handle);
+ if (operation != null) {
+ LOG.warn("Operation " + handle + " is timed-out and will be closed");
+ removed.add(operation);
+ }
+ }
+ return removed;
+ }
}
Modified: hive/branches/spark/service/src/java/org/apache/hive/service/cli/session/HiveSession.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/service/src/java/org/apache/hive/service/cli/session/HiveSession.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/service/src/java/org/apache/hive/service/cli/session/HiveSession.java (original)
+++ hive/branches/spark/service/src/java/org/apache/hive/service/cli/session/HiveSession.java Mon Sep 8 04:38:17 2014
@@ -27,9 +27,9 @@ import org.apache.hive.service.cli.*;
public interface HiveSession extends HiveSessionBase {
- public void open();
+ void open();
- public IMetaStoreClient getMetaStoreClient() throws HiveSQLException;
+ IMetaStoreClient getMetaStoreClient() throws HiveSQLException;
/**
* getInfo operation handler
@@ -37,7 +37,7 @@ public interface HiveSession extends Hiv
* @return
* @throws HiveSQLException
*/
- public GetInfoValue getInfo(GetInfoType getInfoType) throws HiveSQLException;
+ GetInfoValue getInfo(GetInfoType getInfoType) throws HiveSQLException;
/**
* execute operation handler
@@ -46,7 +46,7 @@ public interface HiveSession extends Hiv
* @return
* @throws HiveSQLException
*/
- public OperationHandle executeStatement(String statement,
+ OperationHandle executeStatement(String statement,
Map<String, String> confOverlay) throws HiveSQLException;
/**
@@ -56,7 +56,7 @@ public interface HiveSession extends Hiv
* @return
* @throws HiveSQLException
*/
- public OperationHandle executeStatementAsync(String statement,
+ OperationHandle executeStatementAsync(String statement,
Map<String, String> confOverlay) throws HiveSQLException;
/**
@@ -64,14 +64,14 @@ public interface HiveSession extends Hiv
* @return
* @throws HiveSQLException
*/
- public OperationHandle getTypeInfo() throws HiveSQLException;
+ OperationHandle getTypeInfo() throws HiveSQLException;
/**
* getCatalogs operation handler
* @return
* @throws HiveSQLException
*/
- public OperationHandle getCatalogs() throws HiveSQLException;
+ OperationHandle getCatalogs() throws HiveSQLException;
/**
* getSchemas operation handler
@@ -80,7 +80,7 @@ public interface HiveSession extends Hiv
* @return
* @throws HiveSQLException
*/
- public OperationHandle getSchemas(String catalogName, String schemaName)
+ OperationHandle getSchemas(String catalogName, String schemaName)
throws HiveSQLException;
/**
@@ -92,7 +92,7 @@ public interface HiveSession extends Hiv
* @return
* @throws HiveSQLException
*/
- public OperationHandle getTables(String catalogName, String schemaName,
+ OperationHandle getTables(String catalogName, String schemaName,
String tableName, List<String> tableTypes) throws HiveSQLException;
/**
@@ -100,7 +100,7 @@ public interface HiveSession extends Hiv
* @return
* @throws HiveSQLException
*/
- public OperationHandle getTableTypes() throws HiveSQLException ;
+ OperationHandle getTableTypes() throws HiveSQLException;
/**
* getColumns operation handler
@@ -111,7 +111,7 @@ public interface HiveSession extends Hiv
* @return
* @throws HiveSQLException
*/
- public OperationHandle getColumns(String catalogName, String schemaName,
+ OperationHandle getColumns(String catalogName, String schemaName,
String tableName, String columnName) throws HiveSQLException;
/**
@@ -122,31 +122,33 @@ public interface HiveSession extends Hiv
* @return
* @throws HiveSQLException
*/
- public OperationHandle getFunctions(String catalogName, String schemaName,
+ OperationHandle getFunctions(String catalogName, String schemaName,
String functionName) throws HiveSQLException;
/**
* close the session
* @throws HiveSQLException
*/
- public void close() throws HiveSQLException;
+ void close() throws HiveSQLException;
- public void cancelOperation(OperationHandle opHandle) throws HiveSQLException;
+ void cancelOperation(OperationHandle opHandle) throws HiveSQLException;
- public void closeOperation(OperationHandle opHandle) throws HiveSQLException;
+ void closeOperation(OperationHandle opHandle) throws HiveSQLException;
- public TableSchema getResultSetMetadata(OperationHandle opHandle)
+ TableSchema getResultSetMetadata(OperationHandle opHandle)
throws HiveSQLException;
- public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation,
+ RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation,
long maxRows, FetchType fetchType) throws HiveSQLException;
- public String getDelegationToken(HiveAuthFactory authFactory, String owner,
+ String getDelegationToken(HiveAuthFactory authFactory, String owner,
String renewer) throws HiveSQLException;
- public void cancelDelegationToken(HiveAuthFactory authFactory, String tokenStr)
+ void cancelDelegationToken(HiveAuthFactory authFactory, String tokenStr)
throws HiveSQLException;
- public void renewDelegationToken(HiveAuthFactory authFactory, String tokenStr)
+ void renewDelegationToken(HiveAuthFactory authFactory, String tokenStr)
throws HiveSQLException;
+
+ void closeExpiredOperations();
}
Modified: hive/branches/spark/service/src/java/org/apache/hive/service/cli/session/HiveSessionBase.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/service/src/java/org/apache/hive/service/cli/session/HiveSessionBase.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/service/src/java/org/apache/hive/service/cli/session/HiveSessionBase.java (original)
+++ hive/branches/spark/service/src/java/org/apache/hive/service/cli/session/HiveSessionBase.java Mon Sep 8 04:38:17 2014
@@ -18,6 +18,8 @@
package org.apache.hive.service.cli.session;
+import java.util.Map;
+
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hive.service.cli.SessionHandle;
@@ -92,4 +94,6 @@ public interface HiveSessionBase {
String getIpAddress();
void setIpAddress(String ipAddress);
+
+ long getLastAccessTime();
}
Modified: hive/branches/spark/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java (original)
+++ hive/branches/spark/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java Mon Sep 8 04:38:17 2014
@@ -54,6 +54,7 @@ import org.apache.hive.service.cli.opera
import org.apache.hive.service.cli.operation.GetTableTypesOperation;
import org.apache.hive.service.cli.operation.GetTypeInfoOperation;
import org.apache.hive.service.cli.operation.MetadataOperation;
+import org.apache.hive.service.cli.operation.Operation;
import org.apache.hive.service.cli.operation.OperationManager;
import org.apache.hive.service.cli.thrift.TProtocolVersion;
import org.apache.hive.service.server.ThreadWithGarbageCleanup;
@@ -68,7 +69,7 @@ public class HiveSessionImpl implements
private String username;
private final String password;
- private final HiveConf hiveConf;
+ private HiveConf hiveConf;
private final SessionState sessionState;
private String ipAddress;
@@ -84,6 +85,8 @@ public class HiveSessionImpl implements
private boolean isOperationLogEnabled;
private File sessionLogDir;
+ private volatile long lastAccessTime;
+
public HiveSessionImpl(TProtocolVersion protocol, String username, String password,
HiveConf serverhiveConf, String ipAddress) {
this.username = username;
@@ -108,6 +111,8 @@ public class HiveSessionImpl implements
sessionState = new SessionState(hiveConf, username);
sessionState.setUserIpAddress(ipAddress);
sessionState.setIsHiveServerQuery(true);
+
+ lastAccessTime = System.currentTimeMillis();
SessionState.start(sessionState);
}
@@ -115,6 +120,15 @@ public class HiveSessionImpl implements
public void initialize(Map<String, String> sessionConfMap) throws Exception {
// Process global init file: .hiverc
processGlobalInitFile();
+ try {
+ sessionState.reloadAuxJars();
+ } catch (IOException e) {
+ String msg = "fail to load reloadable jar file path" + e;
+ LOG.error(msg, e);
+ throw new Exception(msg, e);
+ }
+ SessionState.setCurrentSessionState(sessionState);
+
// Set conf properties specified by user from client side
if (sessionConfMap != null) {
configureSession(sessionConfMap);
@@ -235,14 +249,23 @@ public class HiveSessionImpl implements
}
@Override
+ /**
+ * Opens a new HiveServer2 session for the client connection.
+ * Note that if doAs is true, this call goes through a proxy object,
+ * which wraps the method logic in a UserGroupInformation#doAs.
+ * That is why it is important to call SessionState#start here rather than in the constructor.
+ */
public void open() {
SessionState.start(sessionState);
}
- protected synchronized void acquire() throws HiveSQLException {
+ protected synchronized void acquire(boolean userAccess) {
// Need to make sure that this HiveServer2 session's SessionState is
// stored in the thread local for the handler thread.
SessionState.setCurrentSessionState(sessionState);
+ if (userAccess) {
+ lastAccessTime = System.currentTimeMillis();
+ }
}
/**
@@ -252,14 +275,16 @@ public class HiveSessionImpl implements
* when this thread is garbage collected later.
* @see org.apache.hive.service.server.ThreadWithGarbageCleanup#finalize()
*/
- protected synchronized void release() {
- assert sessionState != null;
+ protected synchronized void release(boolean userAccess) {
SessionState.detachSession();
if (ThreadWithGarbageCleanup.currentThread() instanceof ThreadWithGarbageCleanup) {
ThreadWithGarbageCleanup currentThread =
(ThreadWithGarbageCleanup) ThreadWithGarbageCleanup.currentThread();
currentThread.cacheThreadLocalRawStore();
}
+ if (userAccess) {
+ lastAccessTime = System.currentTimeMillis();
+ }
}
@Override
@@ -298,7 +323,7 @@ public class HiveSessionImpl implements
@Override
public GetInfoValue getInfo(GetInfoType getInfoType)
throws HiveSQLException {
- acquire();
+ acquire(true);
try {
switch (getInfoType) {
case CLI_SERVER_NAME:
@@ -318,7 +343,7 @@ public class HiveSessionImpl implements
throw new HiveSQLException("Unrecognized GetInfoType value: " + getInfoType.toString());
}
} finally {
- release();
+ release(true);
}
}
@@ -337,7 +362,7 @@ public class HiveSessionImpl implements
private OperationHandle executeStatementInternal(String statement, Map<String, String> confOverlay,
boolean runAsync)
throws HiveSQLException {
- acquire();
+ acquire(true);
OperationManager operationManager = getOperationManager();
ExecuteStatementOperation operation = operationManager
@@ -355,14 +380,14 @@ public class HiveSessionImpl implements
}
throw e;
} finally {
- release();
+ release(true);
}
}
@Override
public OperationHandle getTypeInfo()
throws HiveSQLException {
- acquire();
+ acquire(true);
OperationManager operationManager = getOperationManager();
GetTypeInfoOperation operation = operationManager.newGetTypeInfoOperation(getSession());
@@ -375,14 +400,14 @@ public class HiveSessionImpl implements
operationManager.closeOperation(opHandle);
throw e;
} finally {
- release();
+ release(true);
}
}
@Override
public OperationHandle getCatalogs()
throws HiveSQLException {
- acquire();
+ acquire(true);
OperationManager operationManager = getOperationManager();
GetCatalogsOperation operation = operationManager.newGetCatalogsOperation(getSession());
@@ -395,14 +420,14 @@ public class HiveSessionImpl implements
operationManager.closeOperation(opHandle);
throw e;
} finally {
- release();
+ release(true);
}
}
@Override
public OperationHandle getSchemas(String catalogName, String schemaName)
throws HiveSQLException {
- acquire();
+ acquire(true);
OperationManager operationManager = getOperationManager();
GetSchemasOperation operation =
@@ -416,7 +441,7 @@ public class HiveSessionImpl implements
operationManager.closeOperation(opHandle);
throw e;
} finally {
- release();
+ release(true);
}
}
@@ -424,7 +449,7 @@ public class HiveSessionImpl implements
public OperationHandle getTables(String catalogName, String schemaName, String tableName,
List<String> tableTypes)
throws HiveSQLException {
- acquire();
+ acquire(true);
OperationManager operationManager = getOperationManager();
MetadataOperation operation =
@@ -438,14 +463,14 @@ public class HiveSessionImpl implements
operationManager.closeOperation(opHandle);
throw e;
} finally {
- release();
+ release(true);
}
}
@Override
public OperationHandle getTableTypes()
throws HiveSQLException {
- acquire();
+ acquire(true);
OperationManager operationManager = getOperationManager();
GetTableTypesOperation operation = operationManager.newGetTableTypesOperation(getSession());
@@ -458,14 +483,14 @@ public class HiveSessionImpl implements
operationManager.closeOperation(opHandle);
throw e;
} finally {
- release();
+ release(true);
}
}
@Override
public OperationHandle getColumns(String catalogName, String schemaName,
String tableName, String columnName) throws HiveSQLException {
- acquire();
+ acquire(true);
OperationManager operationManager = getOperationManager();
GetColumnsOperation operation = operationManager.newGetColumnsOperation(getSession(),
@@ -479,14 +504,14 @@ public class HiveSessionImpl implements
operationManager.closeOperation(opHandle);
throw e;
} finally {
- release();
+ release(true);
}
}
@Override
public OperationHandle getFunctions(String catalogName, String schemaName, String functionName)
throws HiveSQLException {
- acquire();
+ acquire(true);
OperationManager operationManager = getOperationManager();
GetFunctionsOperation operation = operationManager
@@ -500,14 +525,14 @@ public class HiveSessionImpl implements
operationManager.closeOperation(opHandle);
throw e;
} finally {
- release();
+ release(true);
}
}
@Override
public void close() throws HiveSQLException {
try {
- acquire();
+ acquire(true);
/**
* For metadata operations like getTables(), getColumns() etc,
* the session allocates a private metastore handler which should be
@@ -532,7 +557,7 @@ public class HiveSessionImpl implements
} catch (IOException ioe) {
throw new HiveSQLException("Failure to close", ioe);
} finally {
- release();
+ release(true);
}
}
@@ -562,50 +587,79 @@ public class HiveSessionImpl implements
}
@Override
+ public long getLastAccessTime() {
+ return lastAccessTime;
+ }
+
+ @Override
+ public void closeExpiredOperations() {
+ OperationHandle[] handles = opHandleSet.toArray(new OperationHandle[opHandleSet.size()]);
+ if (handles.length > 0) {
+ List<Operation> operations = operationManager.removeExpiredOperations(handles);
+ if (!operations.isEmpty()) {
+ closeTimedOutOperations(operations);
+ }
+ }
+ }
+
+ private void closeTimedOutOperations(List<Operation> operations) {
+ acquire(false);
+ try {
+ for (Operation operation : operations) {
+ opHandleSet.remove(operation.getHandle());
+ try {
+ operation.close();
+ } catch (Exception e) {
+ LOG.warn("Exception is thrown closing timed-out operation " + operation.getHandle(), e);
+ }
+ }
+ } finally {
+ release(false);
+ }
+ }
+
+ @Override
public void cancelOperation(OperationHandle opHandle) throws HiveSQLException {
- acquire();
+ acquire(true);
try {
sessionManager.getOperationManager().cancelOperation(opHandle);
} finally {
- release();
+ release(true);
}
}
@Override
public void closeOperation(OperationHandle opHandle) throws HiveSQLException {
- acquire();
+ acquire(true);
try {
operationManager.closeOperation(opHandle);
opHandleSet.remove(opHandle);
} finally {
- release();
+ release(true);
}
}
@Override
public TableSchema getResultSetMetadata(OperationHandle opHandle) throws HiveSQLException {
- acquire();
+ acquire(true);
try {
return sessionManager.getOperationManager().getOperationResultSetSchema(opHandle);
} finally {
- release();
+ release(true);
}
}
@Override
public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation,
long maxRows, FetchType fetchType) throws HiveSQLException {
- acquire();
+ acquire(true);
try {
if (fetchType == FetchType.QUERY_OUTPUT) {
- return sessionManager.getOperationManager()
- .getOperationNextRowSet(opHandle, orientation, maxRows);
- } else {
- return sessionManager.getOperationManager()
- .getOperationLogRowSet(opHandle, orientation, maxRows);
+ return operationManager.getOperationNextRowSet(opHandle, orientation, maxRows);
}
+ return operationManager.getOperationLogRowSet(opHandle, orientation, maxRows);
} finally {
- release();
+ release(true);
}
}
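
Every handler method above follows the same acquire/try/finally release
discipline, with the new userAccess flag deciding whether the call counts as
client activity. The idiom, reduced to a self-contained skeleton (the handler
body is illustrative):

    public class SessionSkeleton {
      private volatile long lastAccessTime = System.currentTimeMillis();

      private synchronized void acquire(boolean userAccess) {
        // Real impl also does SessionState.setCurrentSessionState(sessionState).
        if (userAccess) {
          lastAccessTime = System.currentTimeMillis();
        }
      }

      private synchronized void release(boolean userAccess) {
        // Real impl also does SessionState.detachSession().
        if (userAccess) {
          lastAccessTime = System.currentTimeMillis();
        }
      }

      public String handlerExample() {
        acquire(true); // user-driven call: refresh lastAccessTime
        try {
          return "result"; // placeholder for the real operation
        } finally {
          release(true);
        }
      }
    }
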
Modified: hive/branches/spark/service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java (original)
+++ hive/branches/spark/service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java Mon Sep 8 04:38:17 2014
@@ -19,7 +19,6 @@
package org.apache.hive.service.cli.session;
import java.io.IOException;
-import java.util.Map;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.metadata.Hive;
@@ -83,8 +82,8 @@ public class HiveSessionImplwithUGI exte
}
@Override
- protected synchronized void acquire() throws HiveSQLException {
- super.acquire();
+ protected synchronized void acquire(boolean userAccess) {
+ super.acquire(userAccess);
// if we have a metastore connection with impersonation, then set it first
if (sessionHive != null) {
Hive.set(sessionHive);
@@ -98,11 +97,11 @@ public class HiveSessionImplwithUGI exte
@Override
public void close() throws HiveSQLException {
try {
- acquire();
+ acquire(true);
ShimLoader.getHadoopShims().closeAllForUGI(sessionUgi);
cancelDelegationToken();
} finally {
- release();
+ release(true);
super.close();
}
}
Modified: hive/branches/spark/service/src/java/org/apache/hive/service/cli/session/SessionManager.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/service/src/java/org/apache/hive/service/cli/session/SessionManager.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/service/src/java/org/apache/hive/service/cli/session/SessionManager.java (original)
+++ hive/branches/spark/service/src/java/org/apache/hive/service/cli/session/SessionManager.java Mon Sep 8 04:38:17 2014
@@ -20,6 +20,8 @@ package org.apache.hive.service.cli.sess
import java.io.File;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -59,6 +61,11 @@ public class SessionManager extends Comp
private boolean isOperationLogEnabled;
private File operationLogRootDir;
+ private long checkInterval;
+ private long sessionTimeout;
+
+ private volatile boolean shutdown;
+
public SessionManager() {
super("SessionManager");
}
@@ -81,20 +88,28 @@ public class SessionManager extends Comp
}
private void createBackgroundOperationPool() {
- int backgroundPoolSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_THREADS);
- LOG.info("HiveServer2: Background operation thread pool size: " + backgroundPoolSize);
- int backgroundPoolQueueSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_WAIT_QUEUE_SIZE);
- LOG.info("HiveServer2: Background operation thread wait queue size: " + backgroundPoolQueueSize);
- int keepAliveTime = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_KEEPALIVE_TIME);
- LOG.info("HiveServer2: Background operation thread keepalive time: " + keepAliveTime);
- // Create a thread pool with #backgroundPoolSize threads
+ int poolSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_THREADS);
+ LOG.info("HiveServer2: Background operation thread pool size: " + poolSize);
+ int poolQueueSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_WAIT_QUEUE_SIZE);
+ LOG.info("HiveServer2: Background operation thread wait queue size: " + poolQueueSize);
+ long keepAliveTime = HiveConf.getTimeVar(
+ hiveConf, ConfVars.HIVE_SERVER2_ASYNC_EXEC_KEEPALIVE_TIME, TimeUnit.SECONDS);
+ LOG.info(
+ "HiveServer2: Background operation thread keepalive time: " + keepAliveTime + " seconds");
+
+ // Create a thread pool with #poolSize threads
// Threads terminate when they are idle for more than the keepAliveTime
- // A bounded blocking queue is used to queue incoming operations, if #operations > backgroundPoolSize
+ // A bounded blocking queue is used to queue incoming operations, if #operations > poolSize
String threadPoolName = "HiveServer2-Background-Pool";
- backgroundOperationPool = new ThreadPoolExecutor(backgroundPoolSize, backgroundPoolSize,
- keepAliveTime, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(backgroundPoolQueueSize),
+ backgroundOperationPool = new ThreadPoolExecutor(poolSize, poolSize,
+ keepAliveTime, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(poolQueueSize),
new ThreadFactoryWithGarbageCleanup(threadPoolName));
backgroundOperationPool.allowCoreThreadTimeOut(true);
+
+ checkInterval = HiveConf.getTimeVar(
+ hiveConf, ConfVars.HIVE_SERVER2_SESSION_CHECK_INTERVAL, TimeUnit.MILLISECONDS);
+ sessionTimeout = HiveConf.getTimeVar(
+ hiveConf, ConfVars.HIVE_SERVER2_IDLE_SESSION_TIMEOUT, TimeUnit.MILLISECONDS);
}
private void applyAuthorizationConfigPolicy(HiveConf newHiveConf) throws HiveException {
@@ -139,20 +154,61 @@ public class SessionManager extends Comp
@Override
public synchronized void start() {
super.start();
+ if (checkInterval > 0) {
+ startTimeoutChecker();
+ }
+ }
+
+ private void startTimeoutChecker() {
+ final long interval = Math.max(checkInterval, 3000L); // minimum 3 seconds
+ Runnable timeoutChecker = new Runnable() {
+ @Override
+ public void run() {
+ for (sleepInterval(interval); !shutdown; sleepInterval(interval)) {
+ long current = System.currentTimeMillis();
+ for (HiveSession session : new ArrayList<HiveSession>(handleToSession.values())) {
+ if (sessionTimeout > 0 && session.getLastAccessTime() + sessionTimeout <= current) {
+ SessionHandle handle = session.getSessionHandle();
+ LOG.warn("Session " + handle + " is Timed-out (last access : " +
+ new Date(session.getLastAccessTime()) + ") and will be closed");
+ try {
+ closeSession(handle);
+ } catch (HiveSQLException e) {
+ LOG.warn("Exception is thrown closing session " + handle, e);
+ }
+ } else {
+ session.closeExpiredOperations();
+ }
+ }
+ }
+ }
+
+ private void sleepInterval(long interval) {
+ try {
+ Thread.sleep(interval);
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ }
+ };
+ backgroundOperationPool.execute(timeoutChecker);
}
@Override
public synchronized void stop() {
super.stop();
+ shutdown = true;
if (backgroundOperationPool != null) {
backgroundOperationPool.shutdown();
- int timeout = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT);
+ long timeout = hiveConf.getTimeVar(
+ ConfVars.HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS);
try {
backgroundOperationPool.awaitTermination(timeout, TimeUnit.SECONDS);
} catch (InterruptedException e) {
LOG.warn("HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT = " + timeout +
" seconds has been exceeded. RUNNING background operations will be shut down", e);
}
+ backgroundOperationPool = null;
}
cleanupLoggingRootDir();
}
@@ -177,11 +233,13 @@ public class SessionManager extends Comp
Map<String, String> sessionConf, boolean withImpersonation, String delegationToken)
throws HiveSQLException {
HiveSession session;
+ // If doAs is set to true for HiveServer2, we will create a proxy object for the session impl.
+ // Within the proxy object, we wrap each method call in UserGroupInformation#doAs.
if (withImpersonation) {
- HiveSessionImplwithUGI hiveSessionUgi = new HiveSessionImplwithUGI(protocol, username, password,
- hiveConf, ipAddress, delegationToken);
- session = HiveSessionProxy.getProxy(hiveSessionUgi, hiveSessionUgi.getSessionUgi());
- hiveSessionUgi.setProxySession(session);
+ HiveSessionImplwithUGI sessionWithUGI = new HiveSessionImplwithUGI(protocol, username, password,
+ hiveConf, ipAddress, delegationToken);
+ session = HiveSessionProxy.getProxy(sessionWithUGI, sessionWithUGI.getSessionUgi());
+ sessionWithUGI.setProxySession(session);
} else {
session = new HiveSessionImpl(protocol, username, password, hiveConf, ipAddress);
}
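
The idle-session reaper added above is a polling loop guarded by a volatile shutdown flag: sleep, scan a snapshot of the session map, close anything idle past the timeout, repeat. A minimal standalone sketch of the same pattern, assuming a hypothetical Session interface and made-up timeout values in place of the Hive classes:

    import java.util.Date;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    public class TimeoutCheckerSketch {

      // Hypothetical stand-in for HiveSession.
      interface Session {
        long getLastAccessTime();
        void close();
      }

      private final Map<String, Session> sessions = new ConcurrentHashMap<String, Session>();
      private volatile boolean shutdown;
      private final long sessionTimeout = 60 * 60 * 1000L; // assumed: 1 hour, in millis
      private final long checkInterval = 60 * 1000L;       // assumed: 1 minute, in millis

      public void startTimeoutChecker() {
        final long interval = Math.max(checkInterval, 3000L); // never poll faster than 3s
        Runnable checker = new Runnable() {
          public void run() {
            // Sleep first, then scan; repeat until stop() flips the flag.
            for (sleep(interval); !shutdown; sleep(interval)) {
              long now = System.currentTimeMillis();
              for (Map.Entry<String, Session> e : sessions.entrySet()) {
                Session s = e.getValue();
                if (sessionTimeout > 0 && s.getLastAccessTime() + sessionTimeout <= now) {
                  System.out.println("Closing session idle since "
                      + new Date(s.getLastAccessTime()));
                  sessions.remove(e.getKey());
                  s.close();
                }
              }
            }
          }

          private void sleep(long millis) {
            try {
              Thread.sleep(millis);
            } catch (InterruptedException e) {
              // Swallowed on purpose; the loop re-checks the shutdown flag.
            }
          }
        };
        new Thread(checker, "session-timeout-checker").start();
      }

      public void stop() {
        shutdown = true; // the checker exits after at most one more interval
      }
    }

Because the flag is volatile and re-read between sleeps, stop() needs no interrupt; the real code gets the same property by running the checker on the background operation pool and setting shutdown in stop().
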
Modified: hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java (original)
+++ hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java Mon Sep 8 04:38:17 2014
@@ -70,7 +70,8 @@ public class ThriftBinaryCLIService exte
minWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_MIN_WORKER_THREADS);
maxWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_MAX_WORKER_THREADS);
- workerKeepAliveTime = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_WORKER_KEEPALIVE_TIME);
+ workerKeepAliveTime = hiveConf.getTimeVar(
+ ConfVars.HIVE_SERVER2_THRIFT_WORKER_KEEPALIVE_TIME, TimeUnit.SECONDS);
String threadPoolName = "HiveServer2-Handler-Pool";
ExecutorService executorService = new ThreadPoolExecutor(minWorkerThreads, maxWorkerThreads,
workerKeepAliveTime, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
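
Unlike the background pool above, the handler pool pairs its keepalive with a SynchronousQueue, i.e. a direct handoff: a submitted task either goes straight to an idle thread or forces the pool to grow toward its maximum. A sketch contrasting the two queueing choices, with made-up sizes:

    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.SynchronousQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    public class PoolSketch {
      public static void main(String[] args) {
        // Direct handoff: no queueing; the pool grows from 5 toward 500 threads
        // as soon as all current threads are busy (ThriftBinaryCLIService style).
        ThreadPoolExecutor handlerPool = new ThreadPoolExecutor(5, 500,
            60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());

        // Bounded queue: the pool stays at 100 threads and buffers up to 100
        // waiting operations before rejecting (SessionManager style).
        ThreadPoolExecutor backgroundPool = new ThreadPoolExecutor(100, 100,
            10L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(100));
        backgroundPool.allowCoreThreadTimeOut(true); // let idle core threads exit too

        handlerPool.shutdown();
        backgroundPool.shutdown();
      }
    }
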
Modified: hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java (original)
+++ hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java Mon Sep 8 04:38:17 2014
@@ -61,7 +61,7 @@ public abstract class ThriftCLIService e
protected int minWorkerThreads;
protected int maxWorkerThreads;
- protected int workerKeepAliveTime;
+ protected long workerKeepAliveTime;
protected static HiveAuthFactory hiveAuthFactory;
Modified: hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java (original)
+++ hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java Mon Sep 8 04:38:17 2014
@@ -69,7 +69,8 @@ public class ThriftHttpCLIService extend
minWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_MIN_WORKER_THREADS);
maxWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_MAX_WORKER_THREADS);
- workerKeepAliveTime = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_WORKER_KEEPALIVE_TIME);
+ workerKeepAliveTime = hiveConf.getTimeVar(
+ ConfVars.HIVE_SERVER2_THRIFT_HTTP_WORKER_KEEPALIVE_TIME, TimeUnit.SECONDS);
String httpPath = getHttpPath(hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_HTTP_PATH));
@@ -110,7 +111,8 @@ public class ThriftHttpCLIService extend
// Linux:yes, Windows:no
connector.setReuseAddress(!Shell.WINDOWS);
- int maxIdleTime = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_MAX_IDLE_TIME);
+ int maxIdleTime = (int) hiveConf.getTimeVar(
+ ConfVars.HIVE_SERVER2_THRIFT_HTTP_MAX_IDLE_TIME, TimeUnit.MILLISECONDS);
connector.setMaxIdleTime(maxIdleTime);
httpServer.addConnector(connector);
Modified: hive/branches/spark/service/src/java/org/apache/hive/service/server/HiveServer2.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/service/src/java/org/apache/hive/service/server/HiveServer2.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/service/src/java/org/apache/hive/service/server/HiveServer2.java (original)
+++ hive/branches/spark/service/src/java/org/apache/hive/service/server/HiveServer2.java Mon Sep 8 04:38:17 2014
@@ -144,6 +144,7 @@ public class HiveServer2 extends Composi
}
public static void main(String[] args) {
+ HiveConf.setLoadHiveServer2Config(true);
try {
ServerOptionsProcessor oproc = new ServerOptionsProcessor("hiveserver2");
if (!oproc.process(args)) {
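
The one-line change in main() flips a static flag before any HiveConf is built, so that HiveConf instances created afterwards can also pick up HiveServer2-specific config overrides. Assuming the flag is consulted when a HiveConf initializes (which is why it must be set first), the ordering looks like this sketch:

    import org.apache.hadoop.hive.conf.HiveConf;

    public class ServerConfSketch {
      public static void main(String[] args) {
        // Set before the first HiveConf is constructed so that server-specific
        // overrides are visible to every HiveConf created afterwards.
        HiveConf.setLoadHiveServer2Config(true);
        HiveConf conf = new HiveConf();
        System.out.println(conf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_PORT));
      }
    }
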
Modified: hive/branches/spark/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java (original)
+++ hive/branches/spark/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java Mon Sep 8 04:38:17 2014
@@ -26,9 +26,9 @@ import static org.junit.Assert.fail;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -202,7 +202,8 @@ public abstract class CLIServiceTest {
* to give a compile time error.
* (compilation is done synchronous as of now)
*/
- longPollingTimeout = new HiveConf().getLongVar(ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT);
+ longPollingTimeout = HiveConf.getTimeVar(new HiveConf(),
+ HiveConf.ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT, TimeUnit.MILLISECONDS);
queryString = "SELECT NON_EXISTING_COLUMN FROM " + tableName;
try {
runQueryAsync(sessionHandle, queryString, confOverlay, OperationState.ERROR, longPollingTimeout);
@@ -295,7 +296,7 @@ public abstract class CLIServiceTest {
long longPollingTimeDelta;
OperationStatus opStatus = null;
OperationState state = null;
- confOverlay.put(HiveConf.ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT.varname, String.valueOf(longPollingTimeout));
+ confOverlay.put(HiveConf.ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT.varname, longPollingTimeout + "ms");
OperationHandle opHandle = client.executeStatementAsync(sessionHandle, queryString, confOverlay);
int count = 0;
while (true) {
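
The test overlay now appends an explicit "ms" suffix, matching the move from getIntVar to the unit-aware getTimeVar throughout this commit. A rough sketch of how such unit-suffixed values can be resolved; this illustrates the idea and is not HiveConf's actual parser:

    import java.util.concurrent.TimeUnit;

    public class TimeValueSketch {
      // Parse values like "5000ms", "30s", "1h"; a bare number is taken in the
      // caller's default unit. The suffix set here is an assumption.
      static long parse(String value, TimeUnit defaultUnit, TimeUnit outUnit) {
        String v = value.trim();
        TimeUnit unit = defaultUnit;
        if (v.endsWith("ms")) {
          unit = TimeUnit.MILLISECONDS;
          v = v.substring(0, v.length() - 2);
        } else if (v.endsWith("s")) {
          unit = TimeUnit.SECONDS;
          v = v.substring(0, v.length() - 1);
        } else if (v.endsWith("h")) {
          unit = TimeUnit.HOURS;
          v = v.substring(0, v.length() - 1);
        }
        return outUnit.convert(Long.parseLong(v.trim()), unit);
      }

      public static void main(String[] args) {
        System.out.println(parse("5000ms", TimeUnit.MILLISECONDS, TimeUnit.SECONDS)); // 5
        System.out.println(parse("30s", TimeUnit.MILLISECONDS, TimeUnit.MILLISECONDS)); // 30000
        System.out.println(parse("10", TimeUnit.SECONDS, TimeUnit.MILLISECONDS)); // 10000
      }
    }
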
Modified: hive/branches/spark/shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java (original)
+++ hive/branches/spark/shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java Mon Sep 8 04:38:17 2014
@@ -55,6 +55,7 @@ import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.ProxyFileSystem;
import org.apache.hadoop.fs.Trash;
import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil;
import org.apache.hadoop.io.LongWritable;
@@ -895,4 +896,14 @@ public class Hadoop20Shims implements Ha
// No password API, just retrieve value from conf
return conf.get(name);
}
+
+ @Override
+ public boolean supportStickyBit() {
+ return false;
+ }
+
+ @Override
+ public boolean hasStickyBit(FsPermission permission) {
+ return false; // not supported
+ }
}
Modified: hive/branches/spark/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java (original)
+++ hive/branches/spark/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java Mon Sep 8 04:38:17 2014
@@ -40,6 +40,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.ProxyFileSystem;
import org.apache.hadoop.fs.Trash;
+import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapred.ClusterStatus;
@@ -513,4 +514,14 @@ public class Hadoop20SShims extends Hado
// No password API, just retrieve value from conf
return conf.get(name);
}
+
+ @Override
+ public boolean supportStickyBit() {
+ return false;
+ }
+
+ @Override
+ public boolean hasStickyBit(FsPermission permission) {
+ return false; // not supported
+ }
}
Modified: hive/branches/spark/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java (original)
+++ hive/branches/spark/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java Mon Sep 8 04:38:17 2014
@@ -19,21 +19,17 @@ package org.apache.hadoop.hive.shims;
import java.io.FileNotFoundException;
import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.net.URI;
import java.security.AccessControlException;
-import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
@@ -302,6 +298,7 @@ public class Hadoop23Shims extends Hadoo
mr = new MiniTezCluster("hive", numberOfTaskTrackers);
conf.set("fs.defaultFS", nameNode);
+ conf.set("tez.am.log.level", "DEBUG");
conf.set(MRJobConfig.MR_AM_STAGING_DIR, "/apps_staging_dir");
mr.init(conf);
mr.start();
@@ -816,4 +813,14 @@ public class Hadoop23Shims extends Hadoo
}
}
}
+
+ @Override
+ public boolean supportStickyBit() {
+ return true;
+ }
+
+ @Override
+ public boolean hasStickyBit(FsPermission permission) {
+ return permission.getStickyBit();
+ }
}
Modified: hive/branches/spark/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java (original)
+++ hive/branches/spark/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java Mon Sep 8 04:38:17 2014
@@ -24,6 +24,7 @@ import java.net.InetAddress;
import java.net.Socket;
import java.security.PrivilegedAction;
import java.security.PrivilegedExceptionAction;
+import java.util.Locale;
import java.util.Map;
import javax.security.auth.callback.Callback;
@@ -79,11 +80,23 @@ public class HadoopThriftAuthBridge20S e
}
@Override
- public Client createClientWithConf(String authType) {
- Configuration conf = new Configuration();
- conf.set(HADOOP_SECURITY_AUTHENTICATION, authType);
- UserGroupInformation.setConfiguration(conf);
- return new Client();
+ public Client createClientWithConf(String authMethod) {
+ UserGroupInformation ugi;
+ try {
+ ugi = UserGroupInformation.getLoginUser();
+ } catch (IOException e) {
+ throw new IllegalStateException("Unable to get current login user", e);
+ }
+ if (loginUserHasCurrentAuthMethod(ugi, authMethod)) {
+ LOG.debug("Not setting UGI conf as passed-in authMethod of " + authMethod + " = current.");
+ return new Client();
+ } else {
+ LOG.debug("Setting UGI conf as passed-in authMethod of " + authMethod + " != current.");
+ Configuration conf = new Configuration();
+ conf.set(HADOOP_SECURITY_AUTHENTICATION, authMethod);
+ UserGroupInformation.setConfiguration(conf);
+ return new Client();
+ }
}
@Override
@@ -105,15 +118,48 @@ public class HadoopThriftAuthBridge20S e
}
@Override
- public UserGroupInformation getCurrentUGIWithConf(String authType)
+ public UserGroupInformation getCurrentUGIWithConf(String authMethod)
throws IOException {
- Configuration conf = new Configuration();
- conf.set(HADOOP_SECURITY_AUTHENTICATION, authType);
- UserGroupInformation.setConfiguration(conf);
- return UserGroupInformation.getCurrentUser();
+ UserGroupInformation ugi;
+ try {
+ ugi = UserGroupInformation.getCurrentUser();
+ } catch (IOException e) {
+ throw new IllegalStateException("Unable to get current user", e);
+ }
+ if (loginUserHasCurrentAuthMethod(ugi, authMethod)) {
+ LOG.debug("Not setting UGI conf as passed-in authMethod of " + authMethod + " = current.");
+ return ugi;
+ } else {
+ LOG.debug("Setting UGI conf as passed-in authMethod of " + authMethod + " != current.");
+ Configuration conf = new Configuration();
+ conf.set(HADOOP_SECURITY_AUTHENTICATION, authMethod);
+ UserGroupInformation.setConfiguration(conf);
+ return UserGroupInformation.getCurrentUser();
+ }
}
/**
+ * Returns true if the current login user is already using the given authMethod.
+ *
+ * Used above to ensure we do not create a new Configuration object and, with it,
+ * lose other settings such as the cluster to which the JVM is connected. Required
+ * for Oozie, since it does not have a core-site.xml; see HIVE-7682.
+ */
+ private boolean loginUserHasCurrentAuthMethod(UserGroupInformation ugi, String sAuthMethod) {
+ AuthenticationMethod authMethod;
+ try {
+ // based on SecurityUtil.getAuthenticationMethod()
+ authMethod = Enum.valueOf(AuthenticationMethod.class, sAuthMethod.toUpperCase(Locale.ENGLISH));
+ } catch (IllegalArgumentException iae) {
+ throw new IllegalArgumentException("Invalid attribute value for " +
+ HADOOP_SECURITY_AUTHENTICATION + " of " + sAuthMethod, iae);
+ }
+ LOG.debug("Current authMethod = " + ugi.getAuthenticationMethod());
+ return ugi.getAuthenticationMethod().equals(authMethod);
+ }
+
+
+ /**
* Read and return Hadoop SASL configuration which can be configured using
* "hadoop.rpc.protection"
* @param conf
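
The new loginUserHasCurrentAuthMethod check avoids clobbering static UGI state when the requested auth method already matches. Its enum lookup uppercases with an explicit Locale, a pattern worth copying so that, for example, a Turkish default locale cannot mangle "kerberos". A minimal sketch with a hypothetical enum standing in for Hadoop's AuthenticationMethod:

    import java.util.Locale;

    public class EnumParseSketch {
      // Hypothetical stand-in for Hadoop's AuthenticationMethod enum.
      enum AuthMethod { SIMPLE, KERBEROS, TOKEN }

      static AuthMethod parse(String s) {
        try {
          // Locale.ENGLISH avoids locale-sensitive surprises ("i" -> dotless I in tr).
          return Enum.valueOf(AuthMethod.class, s.toUpperCase(Locale.ENGLISH));
        } catch (IllegalArgumentException iae) {
          throw new IllegalArgumentException("Invalid auth method: " + s, iae);
        }
      }

      public static void main(String[] args) {
        System.out.println(parse("kerberos")); // KERBEROS
      }
    }
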
Modified: hive/branches/spark/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java (original)
+++ hive/branches/spark/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java Mon Sep 8 04:38:17 2014
@@ -44,6 +44,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapred.ClusterStatus;
import org.apache.hadoop.mapred.InputSplit;
@@ -694,4 +695,16 @@ public interface HadoopShims {
*/
public String getPassword(Configuration conf, String name) throws IOException;
+ /**
+ * Check whether the current Hadoop version supports the sticky bit.
+ * @return true if the sticky bit is supported
+ */
+ boolean supportStickyBit();
+
+ /**
+ * Check the sticky bit in the given permission.
+ * @param permission the permission to inspect
+ * @return true if the sticky bit is set
+ */
+ boolean hasStickyBit(FsPermission permission);
}
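
Together, the two new shim methods let version-independent callers guard sticky-bit logic behind a capability check. A hedged usage sketch; safeToDelete and its policy are invented for illustration, though ShimLoader.getHadoopShims() is the real entry point:

    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.shims.HadoopShims;
    import org.apache.hadoop.hive.shims.ShimLoader;

    public class StickyBitSketch {
      // Invented helper: refuse to delete from a sticky-bit directory, except
      // on shims where the bit cannot be read at all (the 0.20-era ones above).
      static boolean safeToDelete(FileSystem fs, Path dir) throws java.io.IOException {
        HadoopShims shims = ShimLoader.getHadoopShims();
        if (!shims.supportStickyBit()) {
          return true; // pre-0.23 shims always report no sticky bit
        }
        FileStatus status = fs.getFileStatus(dir);
        return !shims.hasStickyBit(status.getPermission());
      }
    }
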