You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by br...@apache.org on 2014/09/08 06:38:26 UTC

svn commit: r1623263 [28/28] - in /hive/branches/spark: ./ accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/ ant/src/org/apache/hadoop/hive/ant/ beeline/src/java/org/apache/hive/beeline/ beeline/src/test/org/apache/hive/beeline/ bin/...

Modified: hive/branches/spark/service/src/java/org/apache/hive/service/auth/PlainSaslHelper.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/service/src/java/org/apache/hive/service/auth/PlainSaslHelper.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/service/src/java/org/apache/hive/service/auth/PlainSaslHelper.java (original)
+++ hive/branches/spark/service/src/java/org/apache/hive/service/auth/PlainSaslHelper.java Mon Sep  8 04:38:17 2014
@@ -18,8 +18,8 @@
 package org.apache.hive.service.auth;
 
 import java.io.IOException;
+import java.security.Security;
 import java.util.HashMap;
-
 import javax.security.auth.callback.Callback;
 import javax.security.auth.callback.CallbackHandler;
 import javax.security.auth.callback.NameCallback;
@@ -30,10 +30,8 @@ import javax.security.sasl.Authenticatio
 import javax.security.sasl.AuthorizeCallback;
 import javax.security.sasl.SaslException;
 
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hive.service.auth.PlainSaslServer.SaslPlainProvider;
 import org.apache.hive.service.auth.AuthenticationProviderFactory.AuthMethods;
-import org.apache.hive.service.cli.thrift.TCLIService;
+import org.apache.hive.service.auth.PlainSaslServer.SaslPlainProvider;
 import org.apache.hive.service.cli.thrift.TCLIService.Iface;
 import org.apache.hive.service.cli.thrift.ThriftCLIService;
 import org.apache.thrift.TProcessor;
@@ -43,78 +41,108 @@ import org.apache.thrift.transport.TSasl
 import org.apache.thrift.transport.TTransport;
 import org.apache.thrift.transport.TTransportFactory;
 
-public class PlainSaslHelper {
+public final class PlainSaslHelper {
+
+  public static TProcessorFactory getPlainProcessorFactory(ThriftCLIService service) {
+    return new SQLPlainProcessorFactory(service);
+  }
+
+  // Register Plain SASL server provider
+  static {
+    Security.addProvider(new SaslPlainProvider());
+  }
+
+  public static TTransportFactory getPlainTransportFactory(String authTypeStr)
+    throws LoginException {
+    TSaslServerTransport.Factory saslFactory = new TSaslServerTransport.Factory();
+    try {
+      saslFactory.addServerDefinition("PLAIN", authTypeStr, null, new HashMap<String, String>(),
+        new PlainServerCallbackHandler(authTypeStr));
+    } catch (AuthenticationException e) {
+      throw new LoginException("Error setting callback handler" + e);
+    }
+    return saslFactory;
+  }
+
+  public static TTransport getPlainTransport(String username, String password,
+    TTransport underlyingTransport) throws SaslException {
+    return new TSaslClientTransport("PLAIN", null, null, null, new HashMap<String, String>(),
+      new PlainCallbackHandler(username, password), underlyingTransport);
+  }
+
+  private PlainSaslHelper() {
+    throw new UnsupportedOperationException("Can't initialize class");
+  }
+
+  private static final class PlainServerCallbackHandler implements CallbackHandler {
 
-  private static class PlainServerCallbackHandler implements CallbackHandler {
     private final AuthMethods authMethod;
-    public PlainServerCallbackHandler(String authMethodStr) throws AuthenticationException {
+
+    PlainServerCallbackHandler(String authMethodStr) throws AuthenticationException {
       authMethod = AuthMethods.getValidAuthMethod(authMethodStr);
     }
 
     @Override
     public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
-      String userName = null;
-      String passWord = null;
+      String username = null;
+      String password = null;
       AuthorizeCallback ac = null;
 
-      for (int i = 0; i < callbacks.length; i++) {
-        if (callbacks[i] instanceof NameCallback) {
-          NameCallback nc = (NameCallback)callbacks[i];
-          userName = nc.getName();
-        } else if (callbacks[i] instanceof PasswordCallback) {
-          PasswordCallback pc = (PasswordCallback)callbacks[i];
-          passWord = new String(pc.getPassword());
-        } else if (callbacks[i] instanceof AuthorizeCallback) {
-          ac = (AuthorizeCallback) callbacks[i];
+      for (Callback callback : callbacks) {
+        if (callback instanceof NameCallback) {
+          NameCallback nc = (NameCallback) callback;
+          username = nc.getName();
+        } else if (callback instanceof PasswordCallback) {
+          PasswordCallback pc = (PasswordCallback) callback;
+          password = new String(pc.getPassword());
+        } else if (callback instanceof AuthorizeCallback) {
+          ac = (AuthorizeCallback) callback;
         } else {
-          throw new UnsupportedCallbackException(callbacks[i]);
+          throw new UnsupportedCallbackException(callback);
         }
       }
       PasswdAuthenticationProvider provider =
-            AuthenticationProviderFactory.getAuthenticationProvider(authMethod);
-      provider.Authenticate(userName, passWord);
+        AuthenticationProviderFactory.getAuthenticationProvider(authMethod);
+      provider.Authenticate(username, password);
       if (ac != null) {
         ac.setAuthorized(true);
       }
     }
   }
 
-  public static class PlainClientbackHandler implements CallbackHandler {
+  public static class PlainCallbackHandler implements CallbackHandler {
 
-    private final String userName;
-    private final String passWord;
+    private final String username;
+    private final String password;
 
-    public PlainClientbackHandler (String userName, String passWord) {
-      this.userName = userName;
-      this.passWord = passWord;
+    public PlainCallbackHandler(String username, String password) {
+      this.username = username;
+      this.password = password;
     }
 
     @Override
-    public void handle(Callback[] callbacks)
-          throws IOException, UnsupportedCallbackException {
-      AuthorizeCallback ac = null;
-      for (int i = 0; i < callbacks.length; i++) {
-        if (callbacks[i] instanceof NameCallback) {
-          NameCallback nameCallback = (NameCallback)callbacks[i];
-          nameCallback.setName(userName);
-        } else if (callbacks[i] instanceof PasswordCallback) {
-          PasswordCallback passCallback = (PasswordCallback) callbacks[i];
-          passCallback.setPassword(passWord.toCharArray());
+    public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
+      for (Callback callback : callbacks) {
+        if (callback instanceof NameCallback) {
+          NameCallback nameCallback = (NameCallback) callback;
+          nameCallback.setName(username);
+        } else if (callback instanceof PasswordCallback) {
+          PasswordCallback passCallback = (PasswordCallback) callback;
+          passCallback.setPassword(password.toCharArray());
         } else {
-          throw new UnsupportedCallbackException(callbacks[i]);
+          throw new UnsupportedCallbackException(callback);
         }
       }
     }
   }
 
-  private static class SQLPlainProcessorFactory extends TProcessorFactory {
+  private static final class SQLPlainProcessorFactory extends TProcessorFactory {
+
     private final ThriftCLIService service;
-    private final HiveConf conf;
 
-    public SQLPlainProcessorFactory(ThriftCLIService service) {
+    SQLPlainProcessorFactory(ThriftCLIService service) {
       super(null);
       this.service = service;
-      this.conf = service.getHiveConf();
     }
 
     @Override
@@ -123,33 +151,4 @@ public class PlainSaslHelper {
     }
   }
 
-  public static TProcessorFactory getPlainProcessorFactory(ThriftCLIService service) {
-    return new SQLPlainProcessorFactory(service);
-  }
-
-  // Register Plain SASL server provider
-  static {
-    java.security.Security.addProvider(new SaslPlainProvider());
-  }
-
-  public static TTransportFactory getPlainTransportFactory(String authTypeStr)
-      throws LoginException {
-    TSaslServerTransport.Factory saslFactory = new TSaslServerTransport.Factory();
-    try {
-      saslFactory.addServerDefinition("PLAIN",
-          authTypeStr, null, new HashMap<String, String>(),
-          new PlainServerCallbackHandler(authTypeStr));
-    } catch (AuthenticationException e) {
-      throw new LoginException ("Error setting callback handler" + e);
-    }
-    return saslFactory;
-  }
-
-  public static TTransport getPlainTransport(String userName, String passwd,
-      final TTransport underlyingTransport) throws SaslException {
-    return new TSaslClientTransport("PLAIN", null,
-        null, null, new HashMap<String, String>(),
-        new PlainClientbackHandler(userName, passwd), underlyingTransport);
-  }
-
 }

Modified: hive/branches/spark/service/src/java/org/apache/hive/service/auth/PlainSaslServer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/service/src/java/org/apache/hive/service/auth/PlainSaslServer.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/service/src/java/org/apache/hive/service/auth/PlainSaslServer.java (original)
+++ hive/branches/spark/service/src/java/org/apache/hive/service/auth/PlainSaslServer.java Mon Sep  8 04:38:17 2014
@@ -18,10 +18,10 @@
 package org.apache.hive.service.auth;
 
 import java.io.IOException;
+import java.security.Provider;
 import java.util.ArrayDeque;
 import java.util.Deque;
 import java.util.Map;
-
 import javax.security.auth.callback.Callback;
 import javax.security.auth.callback.CallbackHandler;
 import javax.security.auth.callback.NameCallback;
@@ -35,27 +35,26 @@ import javax.security.sasl.SaslServerFac
 import org.apache.hive.service.auth.AuthenticationProviderFactory.AuthMethods;
 
 /**
- *
- * PlainSaslServer.
- * Sun JDK only provides PLAIN client and not server. This class implements the Plain SASL server
- * conforming to RFC #4616 (http://www.ietf.org/rfc/rfc4616.txt)
+ * Sun JDK only provides a PLAIN client and no server. This class implements the Plain SASL server
+ * conforming to RFC #4616 (http://www.ietf.org/rfc/rfc4616.txt).
  */
-public class PlainSaslServer implements SaslServer  {
-  private final AuthMethods authMethod;
+public class PlainSaslServer implements SaslServer {
+
+  public static final String PLAIN_METHOD = "PLAIN";
   private String user;
-  private String passwd;
-  private String authzId;
   private final CallbackHandler handler;
 
   PlainSaslServer(CallbackHandler handler, String authMethodStr) throws SaslException {
     this.handler = handler;
-    this.authMethod = AuthMethods.getValidAuthMethod(authMethodStr);
+    AuthMethods.getValidAuthMethod(authMethodStr);
   }
 
+  @Override
   public String getMechanismName() {
-    return "PLAIN";
+    return PLAIN_METHOD;
   }
 
+  @Override
   public byte[] evaluateResponse(byte[] response) throws SaslException {
     try {
       // parse the response
@@ -68,28 +67,29 @@ public class PlainSaslServer implements 
           tokenList.addLast(messageToken.toString());
           messageToken = new StringBuilder();
         } else {
-          messageToken.append((char)b);
+          messageToken.append((char) b);
         }
       }
       tokenList.addLast(messageToken.toString());
 
       // validate response
-      if ((tokenList.size() < 2) || (tokenList.size() > 3)) {
+      if (tokenList.size() < 2 || tokenList.size() > 3) {
         throw new SaslException("Invalid message format");
       }
-      passwd = tokenList.removeLast();
+      String passwd = tokenList.removeLast();
       user = tokenList.removeLast();
       // optional authzid
-      if (!tokenList.isEmpty()) {
-        authzId = tokenList.removeLast();
-      } else {
+      String authzId;
+      if (tokenList.isEmpty()) {
         authzId = user;
+      } else {
+        authzId = tokenList.removeLast();
       }
       if (user == null || user.isEmpty()) {
-        throw new SaslException("No user name provide");
+        throw new SaslException("No user name provided");
       }
       if (passwd == null || passwd.isEmpty()) {
-        throw new SaslException("No password name provide");
+        throw new SaslException("No password name provided");
       }
 
       NameCallback nameCallback = new NameCallback("User");
@@ -98,7 +98,7 @@ public class PlainSaslServer implements 
       pcCallback.setPassword(passwd.toCharArray());
       AuthorizeCallback acCallback = new AuthorizeCallback(user, authzId);
 
-      Callback[] cbList = new Callback[] {nameCallback, pcCallback, acCallback};
+      Callback[] cbList = {nameCallback, pcCallback, acCallback};
       handler.handle(cbList);
       if (!acCallback.isAuthorized()) {
         throw new SaslException("Authentication failed");
@@ -113,49 +113,62 @@ public class PlainSaslServer implements 
     return null;
   }
 
+  @Override
   public boolean isComplete() {
     return user != null;
   }
 
+  @Override
   public String getAuthorizationID() {
     return user;
   }
 
+  @Override
   public byte[] unwrap(byte[] incoming, int offset, int len) {
-      throw new UnsupportedOperationException();
+    throw new UnsupportedOperationException();
   }
 
+  @Override
   public byte[] wrap(byte[] outgoing, int offset, int len) {
     throw new UnsupportedOperationException();
   }
 
+  @Override
   public Object getNegotiatedProperty(String propName) {
     return null;
   }
 
+  @Override
   public void dispose() {}
 
   public static class SaslPlainServerFactory implements SaslServerFactory {
 
-    public SaslServer createSaslServer(
-      String mechanism, String protocol, String serverName, Map<String,?> props, CallbackHandler cbh)
-    {
-      if ("PLAIN".equals(mechanism)) {
+    @Override
+    public SaslServer createSaslServer(String mechanism, String protocol, String serverName,
+      Map<String, ?> props, CallbackHandler cbh) {
+      if (PLAIN_METHOD.equals(mechanism)) {
         try {
           return new PlainSaslServer(cbh, protocol);
         } catch (SaslException e) {
+          /* This is to fulfill the contract of the interface which states that an exception shall
+             be thrown when a SaslServer cannot be created due to an error but null should be
+             returned when a Server can't be created due to the parameters supplied. And the only
+             thing PlainSaslServer can fail on is a non-supported authentication mechanism.
+             That's why we return null instead of throwing the Exception */
           return null;
         }
       }
       return null;
     }
 
+    @Override
     public String[] getMechanismNames(Map<String, ?> props) {
-      return new String[] { "PLAIN" };
+      return new String[] {PLAIN_METHOD};
     }
   }
 
-  public static class SaslPlainProvider extends java.security.Provider {
+  public static class SaslPlainProvider extends Provider {
+
     public SaslPlainProvider() {
       super("HiveSaslPlain", 1.0, "Hive Plain SASL provider");
       put("SaslServerFactory.PLAIN", SaslPlainServerFactory.class.getName());

Modified: hive/branches/spark/service/src/java/org/apache/hive/service/auth/SaslQOP.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/service/src/java/org/apache/hive/service/auth/SaslQOP.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/service/src/java/org/apache/hive/service/auth/SaslQOP.java (original)
+++ hive/branches/spark/service/src/java/org/apache/hive/service/auth/SaslQOP.java Mon Sep  8 04:38:17 2014
@@ -22,7 +22,7 @@ import java.util.HashMap;
 import java.util.Map;
 
 /**
- * Possible values of  SASL quality-of-protection value.
+ * Possible values of SASL quality-of-protection value.
  */
 public enum SaslQOP {
   AUTH("auth"), // Authentication only.
@@ -32,14 +32,15 @@ public enum SaslQOP {
 
   public final String saslQop;
 
-  private static final Map<String, SaslQOP> strToEnum
-          = new HashMap<String, SaslQOP>();
+  private static final Map<String, SaslQOP> STR_TO_ENUM = new HashMap<String, SaslQOP>();
+
   static {
-    for (SaslQOP SaslQOP : values())
-      strToEnum.put(SaslQOP.toString(), SaslQOP);
+    for (SaslQOP saslQop : values()) {
+      STR_TO_ENUM.put(saslQop.toString(), saslQop);
+    }
   }
 
-  private SaslQOP(final String saslQop) {
+  SaslQOP(String saslQop) {
     this.saslQop = saslQop;
   }
 
@@ -48,13 +49,13 @@ public enum SaslQOP {
   }
 
   public static SaslQOP fromString(String str) {
-    if(str != null) {
+    if (str != null) {
       str = str.toLowerCase();
     }
-    SaslQOP saslQOP = strToEnum.get(str);
-    if(saslQOP == null) {
-      throw new IllegalArgumentException("Unknown auth type: " + str + " Allowed values are: "
-              + strToEnum.keySet());
+    SaslQOP saslQOP = STR_TO_ENUM.get(str);
+    if (saslQOP == null) {
+      throw new IllegalArgumentException(
+        "Unknown auth type: " + str + " Allowed values are: " + STR_TO_ENUM.keySet());
     }
     return saslQOP;
   }

Modified: hive/branches/spark/service/src/java/org/apache/hive/service/auth/TSetIpAddressProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/service/src/java/org/apache/hive/service/auth/TSetIpAddressProcessor.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/service/src/java/org/apache/hive/service/auth/TSetIpAddressProcessor.java (original)
+++ hive/branches/spark/service/src/java/org/apache/hive/service/auth/TSetIpAddressProcessor.java Mon Sep  8 04:38:17 2014
@@ -34,10 +34,12 @@ import org.slf4j.LoggerFactory;
  * This class is responsible for setting the ipAddress for operations executed via HiveServer2.
  * <p>
  * <ul>
- * <li>Ipaddress is only set for operations that calls listeners with hookContext @see ExecuteWithHookContext.</li>
- * <li>Ipaddress is only set if the underlying transport mechanism is socket. </li>
+ * <li>IP address is only set for operations that call listeners with hookContext</li>
+ * <li>IP address is only set if the underlying transport mechanism is socket</li>
  * </ul>
  * </p>
+ *
+ * @see org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext
  */
 public class TSetIpAddressProcessor<I extends Iface> extends TCLIService.Processor<Iface> {
 
@@ -54,26 +56,26 @@ public class TSetIpAddressProcessor<I ex
     try {
       return super.process(in, out);
     } finally {
-      threadLocalUserName.remove();
-      threadLocalIpAddress.remove();
+      THREAD_LOCAL_USER_NAME.remove();
+      THREAD_LOCAL_IP_ADDRESS.remove();
     }
   }
 
   private void setUserName(final TProtocol in) {
     TTransport transport = in.getTransport();
     if (transport instanceof TSaslServerTransport) {
-      String userName = ((TSaslServerTransport)transport).getSaslServer().getAuthorizationID();
-      threadLocalUserName.set(userName);
+      String userName = ((TSaslServerTransport) transport).getSaslServer().getAuthorizationID();
+      THREAD_LOCAL_USER_NAME.set(userName);
     }
   }
 
   protected void setIpAddress(final TProtocol in) {
     TTransport transport = in.getTransport();
     TSocket tSocket = getUnderlyingSocketFromTransport(transport);
-    if (tSocket != null) {
-      threadLocalIpAddress.set(tSocket.getSocket().getInetAddress().toString());
-    } else {
+    if (tSocket == null) {
       LOGGER.warn("Unknown Transport, cannot determine ipAddress");
+    } else {
+      THREAD_LOCAL_IP_ADDRESS.set(tSocket.getSocket().getInetAddress().toString());
     }
   }
 
@@ -92,14 +94,14 @@ public class TSetIpAddressProcessor<I ex
     return null;
   }
 
-  private static ThreadLocal<String> threadLocalIpAddress = new ThreadLocal<String>() {
+  private static final ThreadLocal<String> THREAD_LOCAL_IP_ADDRESS = new ThreadLocal<String>() {
     @Override
     protected synchronized String initialValue() {
       return null;
     }
   };
 
-  private static ThreadLocal<String> threadLocalUserName = new ThreadLocal<String>(){
+  private static final ThreadLocal<String> THREAD_LOCAL_USER_NAME = new ThreadLocal<String>() {
     @Override
     protected synchronized String initialValue() {
       return null;
@@ -107,10 +109,10 @@ public class TSetIpAddressProcessor<I ex
   };
 
   public static String getUserIpAddress() {
-    return threadLocalIpAddress.get();
+    return THREAD_LOCAL_IP_ADDRESS.get();
   }
 
   public static String getUserName() {
-    return threadLocalUserName.get();
+    return THREAD_LOCAL_USER_NAME.get();
   }
-}
\ No newline at end of file
+}

Modified: hive/branches/spark/service/src/java/org/apache/hive/service/auth/TSubjectAssumingTransport.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/service/src/java/org/apache/hive/service/auth/TSubjectAssumingTransport.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/service/src/java/org/apache/hive/service/auth/TSubjectAssumingTransport.java (original)
+++ hive/branches/spark/service/src/java/org/apache/hive/service/auth/TSubjectAssumingTransport.java Mon Sep  8 04:38:17 2014
@@ -22,7 +22,6 @@ import java.security.AccessControlContex
 import java.security.AccessController;
 import java.security.PrivilegedActionException;
 import java.security.PrivilegedExceptionAction;
-
 import javax.security.auth.Subject;
 
 import org.apache.hadoop.hive.thrift.TFilterTransport;
@@ -30,43 +29,42 @@ import org.apache.thrift.transport.TTran
 import org.apache.thrift.transport.TTransportException;
 
 /**
-  *
-  * This is used on the client side, where the API explicitly opens a transport to
-  * the server using the Subject.doAs()
-  */
- public class TSubjectAssumingTransport extends TFilterTransport {
-
-   public TSubjectAssumingTransport(TTransport wrapped) {
-     super(wrapped);
-   }
-
-   @Override
-   public void open() throws TTransportException {
-     try {
-       AccessControlContext context = AccessController.getContext();
-       Subject subject = Subject.getSubject(context);
-       Subject.doAs(subject, new PrivilegedExceptionAction<Void>() {
-         public Void run() {
-           try {
-             wrapped.open();
-           } catch (TTransportException tte) {
-             // Wrap the transport exception in an RTE, since Subject.doAs() then goes
-             // and unwraps this for us out of the doAs block. We then unwrap one
-             // more time in our catch clause to get back the TTE. (ugh)
-             throw new RuntimeException(tte);
-           }
-           return null;
-         }
-       });
-     } catch (PrivilegedActionException ioe) {
-       throw new RuntimeException("Received an ioe we never threw!", ioe);
-     } catch (RuntimeException rte) {
-       if (rte.getCause() instanceof TTransportException) {
-         throw (TTransportException)rte.getCause();
-       } else {
-         throw rte;
-       }
-     }
-   }
+ * This is used on the client side, where the API explicitly opens a transport to
+ * the server using the Subject.doAs().
+ */
+public class TSubjectAssumingTransport extends TFilterTransport {
+
+  public TSubjectAssumingTransport(TTransport wrapped) {
+    super(wrapped);
+  }
+
+  @Override
+  public void open() throws TTransportException {
+    try {
+      AccessControlContext context = AccessController.getContext();
+      Subject subject = Subject.getSubject(context);
+      Subject.doAs(subject, new PrivilegedExceptionAction<Void>() {
+        public Void run() {
+          try {
+            wrapped.open();
+          } catch (TTransportException tte) {
+            // Wrap the transport exception in an RTE, since Subject.doAs() then goes
+            // and unwraps this for us out of the doAs block. We then unwrap one
+            // more time in our catch clause to get back the TTE. (ugh)
+            throw new RuntimeException(tte);
+          }
+          return null;
+        }
+      });
+    } catch (PrivilegedActionException ioe) {
+      throw new RuntimeException("Received an ioe we never threw!", ioe);
+    } catch (RuntimeException rte) {
+      if (rte.getCause() instanceof TTransportException) {
+        throw (TTransportException) rte.getCause();
+      } else {
+        throw rte;
+      }
+    }
+  }
 
- }
+}

Modified: hive/branches/spark/service/src/java/org/apache/hive/service/cli/CLIService.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/service/src/java/org/apache/hive/service/cli/CLIService.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/service/src/java/org/apache/hive/service/cli/CLIService.java (original)
+++ hive/branches/spark/service/src/java/org/apache/hive/service/cli/CLIService.java Mon Sep  8 04:38:17 2014
@@ -30,12 +30,8 @@ import javax.security.auth.login.LoginEx
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.conf.SystemVariables;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.ql.metadata.Hive;
@@ -118,15 +114,6 @@ public class CLIService extends Composit
   @Override
   public synchronized void start() {
     super.start();
-
-    try {
-      // make sure that the base scratch directories exists and writable
-      setupStagingDir(hiveConf.getVar(HiveConf.ConfVars.SCRATCHDIR), false);
-      setupStagingDir(hiveConf.getVar(HiveConf.ConfVars.LOCALSCRATCHDIR), true);
-      setupStagingDir(hiveConf.getVar(HiveConf.ConfVars.DOWNLOADED_RESOURCES_DIR), true);
-    } catch (IOException eIO) {
-      throw new ServiceException("Error setting stage directories", eIO);
-    }
     // Initialize and test a connection to the metastore
     IMetaStoreClient metastoreClient = null;
     try {
@@ -362,8 +349,9 @@ public class CLIService extends Composit
      * However, if the background operation is complete, we return immediately.
      */
     if (operation.shouldRunAsync()) {
-      long timeout = operation.getParentSession().getHiveConf().getLongVar(
-          HiveConf.ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT);
+      HiveConf conf = operation.getParentSession().getHiveConf();
+      long timeout = HiveConf.getTimeVar(conf,
+          HiveConf.ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT, TimeUnit.MILLISECONDS);
       try {
         operation.getBackgroundHandle().get(timeout, TimeUnit.MILLISECONDS);
       } catch (TimeoutException e) {
@@ -459,25 +447,6 @@ public class CLIService extends Composit
     }
   }
 
-  // create the give Path if doesn't exists and make it writable
-  private void setupStagingDir(String dirPath, boolean isLocal) throws IOException {
-    Path scratchDir = getStaticPath(new Path(dirPath));
-    if (scratchDir == null) {
-      return;
-    }
-    FileSystem fs;
-    if (isLocal) {
-      fs = FileSystem.getLocal(hiveConf);
-    } else {
-      fs = scratchDir.getFileSystem(hiveConf);
-    }
-    if (!fs.exists(scratchDir)) {
-      fs.mkdirs(scratchDir);
-    }
-    FsPermission fsPermission = new FsPermission((short)0777);
-    fs.setPermission(scratchDir, fsPermission);
-  }
-
   @Override
   public String getDelegationToken(SessionHandle sessionHandle, HiveAuthFactory authFactory,
       String owner, String renewer) throws HiveSQLException {
@@ -501,16 +470,4 @@ public class CLIService extends Composit
     sessionManager.getSession(sessionHandle).renewDelegationToken(authFactory, tokenStr);
     LOG.info(sessionHandle  + ": renewDelegationToken()");
   }
-
-  // DOWNLOADED_RESOURCES_DIR for example, which is by default ${system:java.io.tmpdir}/${hive.session.id}_resources,
-  // {system:java.io.tmpdir} would be already evaluated but ${hive.session.id} would be not in here.
-  // for that case, this returns evaluatd parts only, in this case, "/tmp"
-  // what for ${hive.session.id}_resources/${system:java.io.tmpdir}? just don't do that.
-  private Path getStaticPath(Path path) {
-    Path current = path;
-    for (; current != null && SystemVariables.containsVar(current.getName());
-        current = current.getParent()) {
-    }
-    return current;
-  }
 }

Modified: hive/branches/spark/service/src/java/org/apache/hive/service/cli/OperationState.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/service/src/java/org/apache/hive/service/cli/OperationState.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/service/src/java/org/apache/hive/service/cli/OperationState.java (original)
+++ hive/branches/spark/service/src/java/org/apache/hive/service/cli/OperationState.java Mon Sep  8 04:38:17 2014
@@ -25,29 +25,26 @@ import org.apache.hive.service.cli.thrif
  *
  */
 public enum OperationState {
-  INITIALIZED(TOperationState.INITIALIZED_STATE),
-  RUNNING(TOperationState.RUNNING_STATE),
-  FINISHED(TOperationState.FINISHED_STATE),
-  CANCELED(TOperationState.CANCELED_STATE),
-  CLOSED(TOperationState.CLOSED_STATE),
-  ERROR(TOperationState.ERROR_STATE),
-  UNKNOWN(TOperationState.UKNOWN_STATE),
-  PENDING(TOperationState.PENDING_STATE);
+  INITIALIZED(TOperationState.INITIALIZED_STATE, false),
+  RUNNING(TOperationState.RUNNING_STATE, false),
+  FINISHED(TOperationState.FINISHED_STATE, true),
+  CANCELED(TOperationState.CANCELED_STATE, true),
+  CLOSED(TOperationState.CLOSED_STATE, true),
+  ERROR(TOperationState.ERROR_STATE, true),
+  UNKNOWN(TOperationState.UKNOWN_STATE, false),
+  PENDING(TOperationState.PENDING_STATE, false);
 
   private final TOperationState tOperationState;
+  private final boolean terminal;
 
-  OperationState(TOperationState tOperationState) {
+  OperationState(TOperationState tOperationState, boolean terminal) {
     this.tOperationState = tOperationState;
+    this.terminal = terminal;
   }
 
+  // Declaration order must stay in sync with TOperationState: the lookup indexes by its value.
   public static OperationState getOperationState(TOperationState tOperationState) {
-    // TODO: replace this with a Map?
-    for (OperationState opState : values()) {
-      if (tOperationState.equals(opState.tOperationState)) {
-        return opState;
-      }
-    }
-    return OperationState.UNKNOWN;
+    return OperationState.values()[tOperationState.getValue()];
   }
 
   public static void validateTransition(OperationState oldState,
@@ -91,7 +88,8 @@ public enum OperationState {
     default:
       // fall-through
     }
-    throw new HiveSQLException("Illegal Operation state transition");
+    throw new HiveSQLException("Illegal Operation state transition " +
+        "from " + oldState + " to " + newState);
   }
 
   public void validateTransition(OperationState newState)
@@ -102,4 +100,8 @@ public enum OperationState {
   public TOperationState toTOperationState() {
     return tOperationState;
   }
+
+  public boolean isTerminal() {
+    return terminal;
+  }
 }

Modified: hive/branches/spark/service/src/java/org/apache/hive/service/cli/operation/Operation.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/service/src/java/org/apache/hive/service/cli/operation/Operation.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/service/src/java/org/apache/hive/service/cli/operation/Operation.java (original)
+++ hive/branches/spark/service/src/java/org/apache/hive/service/cli/operation/Operation.java Mon Sep  8 04:38:17 2014
@@ -21,6 +21,7 @@ import java.io.File;
 import java.io.FileNotFoundException;
 import java.util.EnumSet;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -52,14 +53,19 @@ public abstract class Operation {
   protected OperationLog operationLog;
   protected boolean isOperationLogEnabled;
 
+  private long operationTimeout;
+  private long lastAccessTime;
+
   protected static final EnumSet<FetchOrientation> DEFAULT_FETCH_ORIENTATION_SET =
       EnumSet.of(FetchOrientation.FETCH_NEXT,FetchOrientation.FETCH_FIRST);
 
   protected Operation(HiveSession parentSession, OperationType opType, boolean runInBackground) {
-    super();
     this.parentSession = parentSession;
     this.runAsync = runInBackground;
     this.opHandle = new OperationHandle(opType, parentSession.getProtocolVersion());
+    lastAccessTime = System.currentTimeMillis();
+    operationTimeout = HiveConf.getTimeVar(parentSession.getHiveConf(),
+        HiveConf.ConfVars.HIVE_SERVER2_IDLE_OPERATION_TIMEOUT, TimeUnit.MILLISECONDS);
   }
 
   public Future<?> getBackgroundHandle() {
@@ -111,7 +117,6 @@ public abstract class Operation {
     opHandle.setHasResultSet(hasResultSet);
   }
 
-
   public OperationLog getOperationLog() {
     return operationLog;
   }
@@ -119,9 +124,33 @@ public abstract class Operation {
   protected final OperationState setState(OperationState newState) throws HiveSQLException {
     state.validateTransition(newState);
     this.state = newState;
+    this.lastAccessTime = System.currentTimeMillis();
     return this.state;
   }
 
+  public boolean isTimedOut(long current) {
+    if (operationTimeout == 0) {
+      return false;  // a zero timeout disables expiry
+    }
+    long idle = current - lastAccessTime;  // subtract: lastAccessTime + timeout may overflow
+    if (operationTimeout > 0) {
+      return state.isTerminal() && idle >= operationTimeout;  // positive: terminal states only
+    }
+    return idle >= -operationTimeout;  // negative: expire in any state
+  }
+
+  public long getLastAccessTime() {
+    return lastAccessTime;
+  }
+
+  public long getOperationTimeout() {
+    return operationTimeout;
+  }
+
+  public void setOperationTimeout(long operationTimeout) {
+    this.operationTimeout = operationTimeout;
+  }
+
   protected void setOperationException(HiveSQLException operationException) {
     this.operationException = operationException;
   }
@@ -130,6 +159,7 @@ public abstract class Operation {
     if (this.state != state) {
       throw new HiveSQLException("Expected state " + state + ", but found " + this.state);
     }
+    this.lastAccessTime = System.currentTimeMillis();
   }
 
   public boolean isRunning() {

Modified: hive/branches/spark/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java (original)
+++ hive/branches/spark/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java Mon Sep  8 04:38:17 2014
@@ -19,6 +19,7 @@
 package org.apache.hive.service.cli.operation;
 
 import java.util.Enumeration;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -155,15 +156,27 @@ public class OperationManager extends Ab
     return operation;
   }
 
-  public synchronized Operation getOperation(OperationHandle operationHandle)
-      throws HiveSQLException {
-    Operation operation = handleToOperation.get(operationHandle);
+  public Operation getOperation(OperationHandle operationHandle) throws HiveSQLException {
+    Operation operation = getOperationInternal(operationHandle);
     if (operation == null) {
       throw new HiveSQLException("Invalid OperationHandle: " + operationHandle);
     }
     return operation;
   }
 
+  private synchronized Operation getOperationInternal(OperationHandle operationHandle) {
+    return handleToOperation.get(operationHandle);
+  }
+
+  private synchronized Operation removeTimedOutOperation(OperationHandle operationHandle) {
+    Operation candidate = handleToOperation.get(operationHandle);
+    if (candidate == null || !candidate.isTimedOut(System.currentTimeMillis())) {
+      return null;  // unknown handle, or not expired yet: nothing is removed
+    }
+    handleToOperation.remove(operationHandle);
+    return candidate;
+  }
+
   private synchronized void addOperation(Operation operation) {
     handleToOperation.put(operation.getHandle(), operation);
   }
@@ -252,4 +265,16 @@ public class OperationManager extends Ab
   public OperationLog getOperationLogByThread() {
     return OperationLog.getCurrentOperationLog();
   }
+
+  public List<Operation> removeExpiredOperations(OperationHandle[] handles) {
+    List<Operation> expired = new ArrayList<Operation>();  // handed back to the caller
+    for (int i = 0; i < handles.length; i++) {
+      Operation timedOut = removeTimedOutOperation(handles[i]);
+      if (timedOut != null) {  // null: handle is unknown or has not timed out
+        LOG.warn("Operation " + handles[i] + " is timed-out and will be closed");
+        expired.add(timedOut);
+      }
+    }
+    return expired;
+  }
 }

Modified: hive/branches/spark/service/src/java/org/apache/hive/service/cli/session/HiveSession.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/service/src/java/org/apache/hive/service/cli/session/HiveSession.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/service/src/java/org/apache/hive/service/cli/session/HiveSession.java (original)
+++ hive/branches/spark/service/src/java/org/apache/hive/service/cli/session/HiveSession.java Mon Sep  8 04:38:17 2014
@@ -27,9 +27,9 @@ import org.apache.hive.service.cli.*;
 
 public interface HiveSession extends HiveSessionBase {
 
-  public void open();
+  void open();
 
-  public IMetaStoreClient getMetaStoreClient() throws HiveSQLException;
+  IMetaStoreClient getMetaStoreClient() throws HiveSQLException;
 
   /**
    * getInfo operation handler
@@ -37,7 +37,7 @@ public interface HiveSession extends Hiv
    * @return
    * @throws HiveSQLException
    */
-  public GetInfoValue getInfo(GetInfoType getInfoType) throws HiveSQLException;
+  GetInfoValue getInfo(GetInfoType getInfoType) throws HiveSQLException;
 
   /**
    * execute operation handler
@@ -46,7 +46,7 @@ public interface HiveSession extends Hiv
    * @return
    * @throws HiveSQLException
    */
-  public OperationHandle executeStatement(String statement,
+  OperationHandle executeStatement(String statement,
       Map<String, String> confOverlay) throws HiveSQLException;
 
   /**
@@ -56,7 +56,7 @@ public interface HiveSession extends Hiv
    * @return
    * @throws HiveSQLException
    */
-  public OperationHandle executeStatementAsync(String statement,
+  OperationHandle executeStatementAsync(String statement,
       Map<String, String> confOverlay) throws HiveSQLException;
 
   /**
@@ -64,14 +64,14 @@ public interface HiveSession extends Hiv
    * @return
    * @throws HiveSQLException
    */
-  public OperationHandle getTypeInfo() throws HiveSQLException;
+  OperationHandle getTypeInfo() throws HiveSQLException;
 
   /**
    * getCatalogs operation handler
    * @return
    * @throws HiveSQLException
    */
-  public OperationHandle getCatalogs() throws HiveSQLException;
+  OperationHandle getCatalogs() throws HiveSQLException;
 
   /**
    * getSchemas operation handler
@@ -80,7 +80,7 @@ public interface HiveSession extends Hiv
    * @return
    * @throws HiveSQLException
    */
-  public OperationHandle getSchemas(String catalogName, String schemaName)
+  OperationHandle getSchemas(String catalogName, String schemaName)
       throws HiveSQLException;
 
   /**
@@ -92,7 +92,7 @@ public interface HiveSession extends Hiv
    * @return
    * @throws HiveSQLException
    */
-  public OperationHandle getTables(String catalogName, String schemaName,
+  OperationHandle getTables(String catalogName, String schemaName,
       String tableName, List<String> tableTypes) throws HiveSQLException;
 
   /**
@@ -100,7 +100,7 @@ public interface HiveSession extends Hiv
    * @return
    * @throws HiveSQLException
    */
-  public OperationHandle getTableTypes() throws HiveSQLException ;
+  OperationHandle getTableTypes() throws HiveSQLException ;
 
   /**
    * getColumns operation handler
@@ -111,7 +111,7 @@ public interface HiveSession extends Hiv
    * @return
    * @throws HiveSQLException
    */
-  public OperationHandle getColumns(String catalogName, String schemaName,
+  OperationHandle getColumns(String catalogName, String schemaName,
       String tableName, String columnName)  throws HiveSQLException;
 
   /**
@@ -122,31 +122,33 @@ public interface HiveSession extends Hiv
    * @return
    * @throws HiveSQLException
    */
-  public OperationHandle getFunctions(String catalogName, String schemaName,
+  OperationHandle getFunctions(String catalogName, String schemaName,
       String functionName) throws HiveSQLException;
 
   /**
    * close the session
    * @throws HiveSQLException
    */
-  public void close() throws HiveSQLException;
+  void close() throws HiveSQLException;
 
-  public void cancelOperation(OperationHandle opHandle) throws HiveSQLException;
+  void cancelOperation(OperationHandle opHandle) throws HiveSQLException;
 
-  public void closeOperation(OperationHandle opHandle) throws HiveSQLException;
+  void closeOperation(OperationHandle opHandle) throws HiveSQLException;
 
-  public TableSchema getResultSetMetadata(OperationHandle opHandle)
+  TableSchema getResultSetMetadata(OperationHandle opHandle)
       throws HiveSQLException;
 
-  public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation,
+  RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation,
       long maxRows, FetchType fetchType) throws HiveSQLException;
 
-  public String getDelegationToken(HiveAuthFactory authFactory, String owner,
+  String getDelegationToken(HiveAuthFactory authFactory, String owner,
       String renewer) throws HiveSQLException;
 
-  public void cancelDelegationToken(HiveAuthFactory authFactory, String tokenStr)
+  void cancelDelegationToken(HiveAuthFactory authFactory, String tokenStr)
       throws HiveSQLException;
 
-  public void renewDelegationToken(HiveAuthFactory authFactory, String tokenStr)
+  void renewDelegationToken(HiveAuthFactory authFactory, String tokenStr)
       throws HiveSQLException;
+
+  void closeExpiredOperations();
 }

Modified: hive/branches/spark/service/src/java/org/apache/hive/service/cli/session/HiveSessionBase.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/service/src/java/org/apache/hive/service/cli/session/HiveSessionBase.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/service/src/java/org/apache/hive/service/cli/session/HiveSessionBase.java (original)
+++ hive/branches/spark/service/src/java/org/apache/hive/service/cli/session/HiveSessionBase.java Mon Sep  8 04:38:17 2014
@@ -18,6 +18,8 @@
 
 package org.apache.hive.service.cli.session;
 
+import java.util.Map;
+
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hive.service.cli.SessionHandle;
@@ -92,4 +94,6 @@ public interface HiveSessionBase {
   String getIpAddress();
 
   void setIpAddress(String ipAddress);
+
+  long getLastAccessTime();
 }

Modified: hive/branches/spark/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java (original)
+++ hive/branches/spark/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java Mon Sep  8 04:38:17 2014
@@ -54,6 +54,7 @@ import org.apache.hive.service.cli.opera
 import org.apache.hive.service.cli.operation.GetTableTypesOperation;
 import org.apache.hive.service.cli.operation.GetTypeInfoOperation;
 import org.apache.hive.service.cli.operation.MetadataOperation;
+import org.apache.hive.service.cli.operation.Operation;
 import org.apache.hive.service.cli.operation.OperationManager;
 import org.apache.hive.service.cli.thrift.TProtocolVersion;
 import org.apache.hive.service.server.ThreadWithGarbageCleanup;
@@ -68,7 +69,7 @@ public class HiveSessionImpl implements 
 
   private String username;
   private final String password;
-  private final HiveConf hiveConf;
+  private HiveConf hiveConf;
   private final SessionState sessionState;
   private String ipAddress;
 
@@ -84,6 +85,8 @@ public class HiveSessionImpl implements 
   private boolean isOperationLogEnabled;
   private File sessionLogDir;
 
+  private volatile long lastAccessTime;
+
   public HiveSessionImpl(TProtocolVersion protocol, String username, String password,
       HiveConf serverhiveConf, String ipAddress) {
     this.username = username;
@@ -108,6 +111,8 @@ public class HiveSessionImpl implements 
     sessionState = new SessionState(hiveConf, username);
     sessionState.setUserIpAddress(ipAddress);
     sessionState.setIsHiveServerQuery(true);
+
+    lastAccessTime = System.currentTimeMillis();
     SessionState.start(sessionState);
   }
 
@@ -115,6 +120,15 @@ public class HiveSessionImpl implements 
   public void initialize(Map<String, String> sessionConfMap) throws Exception {
     // Process global init file: .hiverc
     processGlobalInitFile();
+    try {
+      sessionState.reloadAuxJars();
+    } catch (IOException e) {
+      String msg = "fail to load reloadable jar file path" + e;
+      LOG.error(msg, e);
+      throw new Exception(msg, e);
+    }
+    SessionState.setCurrentSessionState(sessionState);
+
     // Set conf properties specified by user from client side
     if (sessionConfMap != null) {
       configureSession(sessionConfMap);
@@ -235,14 +249,23 @@ public class HiveSessionImpl implements 
   }
 
+  /**
+   * Opens a new HiveServer2 session for the client connection.
+   * Note that if doAs is true, this call goes through a proxy object,
+   * which wraps the method logic in a UserGroupInformation#doAs.
+   * That is why it is important to call SessionState#start here rather than the constructor.
+   */
   @Override
   public void open() {
     SessionState.start(sessionState);
   }
 
-  protected synchronized void acquire() throws HiveSQLException {
+  protected synchronized void acquire(boolean userAccess) {
     // Need to make sure that the this HiveServer2's session's session state is
     // stored in the thread local for the handler thread.
     SessionState.setCurrentSessionState(sessionState);
+    if (userAccess) {
+      lastAccessTime = System.currentTimeMillis();
+    }
   }
 
   /**
@@ -252,14 +275,16 @@ public class HiveSessionImpl implements 
    * when this thread is garbage collected later.
    * @see org.apache.hive.service.server.ThreadWithGarbageCleanup#finalize()
    */
-  protected synchronized void release() {
-    assert sessionState != null;
+  protected synchronized void release(boolean userAccess) {
     SessionState.detachSession();
     if (ThreadWithGarbageCleanup.currentThread() instanceof ThreadWithGarbageCleanup) {
       ThreadWithGarbageCleanup currentThread =
           (ThreadWithGarbageCleanup) ThreadWithGarbageCleanup.currentThread();
       currentThread.cacheThreadLocalRawStore();
     }
+    if (userAccess) {
+      lastAccessTime = System.currentTimeMillis();
+    }
   }
 
   @Override
@@ -298,7 +323,7 @@ public class HiveSessionImpl implements 
   @Override
   public GetInfoValue getInfo(GetInfoType getInfoType)
       throws HiveSQLException {
-    acquire();
+    acquire(true);
     try {
       switch (getInfoType) {
       case CLI_SERVER_NAME:
@@ -318,7 +343,7 @@ public class HiveSessionImpl implements 
         throw new HiveSQLException("Unrecognized GetInfoType value: " + getInfoType.toString());
       }
     } finally {
-      release();
+      release(true);
     }
   }
 
@@ -337,7 +362,7 @@ public class HiveSessionImpl implements 
   private OperationHandle executeStatementInternal(String statement, Map<String, String> confOverlay,
       boolean runAsync)
           throws HiveSQLException {
-    acquire();
+    acquire(true);
 
     OperationManager operationManager = getOperationManager();
     ExecuteStatementOperation operation = operationManager
@@ -355,14 +380,14 @@ public class HiveSessionImpl implements 
       }
       throw e;
     } finally {
-      release();
+      release(true);
     }
   }
 
   @Override
   public OperationHandle getTypeInfo()
       throws HiveSQLException {
-    acquire();
+    acquire(true);
 
     OperationManager operationManager = getOperationManager();
     GetTypeInfoOperation operation = operationManager.newGetTypeInfoOperation(getSession());
@@ -375,14 +400,14 @@ public class HiveSessionImpl implements 
       operationManager.closeOperation(opHandle);
       throw e;
     } finally {
-      release();
+      release(true);
     }
   }
 
   @Override
   public OperationHandle getCatalogs()
       throws HiveSQLException {
-    acquire();
+    acquire(true);
 
     OperationManager operationManager = getOperationManager();
     GetCatalogsOperation operation = operationManager.newGetCatalogsOperation(getSession());
@@ -395,14 +420,14 @@ public class HiveSessionImpl implements 
       operationManager.closeOperation(opHandle);
       throw e;
     } finally {
-      release();
+      release(true);
     }
   }
 
   @Override
   public OperationHandle getSchemas(String catalogName, String schemaName)
       throws HiveSQLException {
-    acquire();
+    acquire(true);
 
     OperationManager operationManager = getOperationManager();
     GetSchemasOperation operation =
@@ -416,7 +441,7 @@ public class HiveSessionImpl implements 
       operationManager.closeOperation(opHandle);
       throw e;
     } finally {
-      release();
+      release(true);
     }
   }
 
@@ -424,7 +449,7 @@ public class HiveSessionImpl implements 
   public OperationHandle getTables(String catalogName, String schemaName, String tableName,
       List<String> tableTypes)
           throws HiveSQLException {
-    acquire();
+    acquire(true);
 
     OperationManager operationManager = getOperationManager();
     MetadataOperation operation =
@@ -438,14 +463,14 @@ public class HiveSessionImpl implements 
       operationManager.closeOperation(opHandle);
       throw e;
     } finally {
-      release();
+      release(true);
     }
   }
 
   @Override
   public OperationHandle getTableTypes()
       throws HiveSQLException {
-    acquire();
+    acquire(true);
 
     OperationManager operationManager = getOperationManager();
     GetTableTypesOperation operation = operationManager.newGetTableTypesOperation(getSession());
@@ -458,14 +483,14 @@ public class HiveSessionImpl implements 
       operationManager.closeOperation(opHandle);
       throw e;
     } finally {
-      release();
+      release(true);
     }
   }
 
   @Override
   public OperationHandle getColumns(String catalogName, String schemaName,
       String tableName, String columnName)  throws HiveSQLException {
-    acquire();
+    acquire(true);
 
     OperationManager operationManager = getOperationManager();
     GetColumnsOperation operation = operationManager.newGetColumnsOperation(getSession(),
@@ -479,14 +504,14 @@ public class HiveSessionImpl implements 
       operationManager.closeOperation(opHandle);
       throw e;
     } finally {
-      release();
+      release(true);
     }
   }
 
   @Override
   public OperationHandle getFunctions(String catalogName, String schemaName, String functionName)
       throws HiveSQLException {
-    acquire();
+    acquire(true);
 
     OperationManager operationManager = getOperationManager();
     GetFunctionsOperation operation = operationManager
@@ -500,14 +525,14 @@ public class HiveSessionImpl implements 
       operationManager.closeOperation(opHandle);
       throw e;
     } finally {
-      release();
+      release(true);
     }
   }
 
   @Override
   public void close() throws HiveSQLException {
     try {
-      acquire();
+      acquire(true);
       /**
        * For metadata operations like getTables(), getColumns() etc,
        * the session allocates a private metastore handler which should be
@@ -532,7 +557,7 @@ public class HiveSessionImpl implements 
     } catch (IOException ioe) {
       throw new HiveSQLException("Failure to close", ioe);
     } finally {
-      release();
+      release(true);
     }
   }
 
@@ -562,50 +587,79 @@ public class HiveSessionImpl implements 
   }
 
   @Override
+  public long getLastAccessTime() {
+    return lastAccessTime;
+  }
+
+  @Override
+  public void closeExpiredOperations() {
+    OperationHandle[] handles = opHandleSet.toArray(new OperationHandle[opHandleSet.size()]);
+    if (handles.length > 0) {
+      List<Operation> operations = operationManager.removeExpiredOperations(handles);
+      if (!operations.isEmpty()) {
+        closeTimedOutOperations(operations);
+      }
+    }
+  }
+
+  private void closeTimedOutOperations(List<Operation> operations) {
+    acquire(false);
+    try {
+      for (Operation operation : operations) {
+        opHandleSet.remove(operation.getHandle());
+        try {
+          operation.close();
+        } catch (Exception e) {
+          LOG.warn("Exception is thrown closing timed-out operation " + operation.getHandle(), e);
+        }
+      }
+    } finally {
+      release(false);
+    }
+  }
+
+  @Override
   public void cancelOperation(OperationHandle opHandle) throws HiveSQLException {
-    acquire();
+    acquire(true);
     try {
       sessionManager.getOperationManager().cancelOperation(opHandle);
     } finally {
-      release();
+      release(true);
     }
   }
 
   @Override
   public void closeOperation(OperationHandle opHandle) throws HiveSQLException {
-    acquire();
+    acquire(true);
     try {
       operationManager.closeOperation(opHandle);
       opHandleSet.remove(opHandle);
     } finally {
-      release();
+      release(true);
     }
   }
 
   @Override
   public TableSchema getResultSetMetadata(OperationHandle opHandle) throws HiveSQLException {
-    acquire();
+    acquire(true);
     try {
       return sessionManager.getOperationManager().getOperationResultSetSchema(opHandle);
     } finally {
-      release();
+      release(true);
     }
   }
 
   @Override
   public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation,
       long maxRows, FetchType fetchType) throws HiveSQLException {
-    acquire();
+    acquire(true);
     try {
       if (fetchType == FetchType.QUERY_OUTPUT) {
-        return sessionManager.getOperationManager()
-            .getOperationNextRowSet(opHandle, orientation, maxRows);
-      } else {
-        return sessionManager.getOperationManager()
-            .getOperationLogRowSet(opHandle, orientation, maxRows);
+        return operationManager.getOperationNextRowSet(opHandle, orientation, maxRows);
       }
+      return operationManager.getOperationLogRowSet(opHandle, orientation, maxRows);
     } finally {
-      release();
+      release(true);
     }
   }
 

Modified: hive/branches/spark/service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java (original)
+++ hive/branches/spark/service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java Mon Sep  8 04:38:17 2014
@@ -19,7 +19,6 @@
 package org.apache.hive.service.cli.session;
 
 import java.io.IOException;
-import java.util.Map;
 
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.metadata.Hive;
@@ -83,8 +82,8 @@ public class HiveSessionImplwithUGI exte
   }
 
   @Override
-  protected synchronized void acquire() throws HiveSQLException {
-    super.acquire();
+  protected synchronized void acquire(boolean userAccess) {
+    super.acquire(userAccess);
     // if we have a metastore connection with impersonation, then set it first
     if (sessionHive != null) {
       Hive.set(sessionHive);
@@ -98,11 +97,11 @@ public class HiveSessionImplwithUGI exte
   @Override
   public void close() throws HiveSQLException {
     try {
-    acquire();
+    acquire(true);
     ShimLoader.getHadoopShims().closeAllForUGI(sessionUgi);
     cancelDelegationToken();
     } finally {
-      release();
+      release(true);
       super.close();
     }
   }

Modified: hive/branches/spark/service/src/java/org/apache/hive/service/cli/session/SessionManager.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/service/src/java/org/apache/hive/service/cli/session/SessionManager.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/service/src/java/org/apache/hive/service/cli/session/SessionManager.java (original)
+++ hive/branches/spark/service/src/java/org/apache/hive/service/cli/session/SessionManager.java Mon Sep  8 04:38:17 2014
@@ -20,6 +20,8 @@ package org.apache.hive.service.cli.sess
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Date;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -59,6 +61,11 @@ public class SessionManager extends Comp
   private boolean isOperationLogEnabled;
   private File operationLogRootDir;
 
+  private long checkInterval;
+  private long sessionTimeout;
+
+  private volatile boolean shutdown;
+
   public SessionManager() {
     super("SessionManager");
   }
@@ -81,20 +88,28 @@ public class SessionManager extends Comp
   }
 
   private void createBackgroundOperationPool() {
-    int backgroundPoolSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_THREADS);
-    LOG.info("HiveServer2: Background operation thread pool size: " + backgroundPoolSize);
-    int backgroundPoolQueueSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_WAIT_QUEUE_SIZE);
-    LOG.info("HiveServer2: Background operation thread wait queue size: " + backgroundPoolQueueSize);
-    int keepAliveTime = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_KEEPALIVE_TIME);
-    LOG.info("HiveServer2: Background operation thread keepalive time: " + keepAliveTime);
-    // Create a thread pool with #backgroundPoolSize threads
+    int poolSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_THREADS);
+    LOG.info("HiveServer2: Background operation thread pool size: " + poolSize);
+    int poolQueueSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_WAIT_QUEUE_SIZE);
+    LOG.info("HiveServer2: Background operation thread wait queue size: " + poolQueueSize);
+    long keepAliveTime = HiveConf.getTimeVar(
+        hiveConf, ConfVars.HIVE_SERVER2_ASYNC_EXEC_KEEPALIVE_TIME, TimeUnit.SECONDS);
+    LOG.info(
+        "HiveServer2: Background operation thread keepalive time: " + keepAliveTime + " seconds");
+
+    // Create a thread pool with #poolSize threads
     // Threads terminate when they are idle for more than the keepAliveTime
-    // A bounded blocking queue is used to queue incoming operations, if #operations > backgroundPoolSize
+    // A bounded blocking queue is used to queue incoming operations, if #operations > poolSize
     String threadPoolName = "HiveServer2-Background-Pool";
-    backgroundOperationPool = new ThreadPoolExecutor(backgroundPoolSize, backgroundPoolSize,
-        keepAliveTime, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(backgroundPoolQueueSize),
+    backgroundOperationPool = new ThreadPoolExecutor(poolSize, poolSize,
+        keepAliveTime, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(poolQueueSize),
         new ThreadFactoryWithGarbageCleanup(threadPoolName));
     backgroundOperationPool.allowCoreThreadTimeOut(true);
+
+    checkInterval = HiveConf.getTimeVar(
+        hiveConf, ConfVars.HIVE_SERVER2_SESSION_CHECK_INTERVAL, TimeUnit.MILLISECONDS);
+    sessionTimeout = HiveConf.getTimeVar(
+        hiveConf, ConfVars.HIVE_SERVER2_IDLE_SESSION_TIMEOUT, TimeUnit.MILLISECONDS);
   }
 
   private void applyAuthorizationConfigPolicy(HiveConf newHiveConf) throws HiveException {
@@ -139,20 +154,61 @@ public class SessionManager extends Comp
   @Override
   public synchronized void start() {
     super.start();
+    if (checkInterval > 0) {
+      startTimeoutChecker();
+    }
+  }
+
+  private void startTimeoutChecker() {
+    final long interval = Math.max(checkInterval, 3000L);  // minimum 3 seconds
+    Runnable timeoutChecker = new Runnable() {
+      @Override
+      public void run() {
+        while (sleepInterval(interval)) {  // sleep, then sweep; ends on shutdown or interrupt
+          long current = System.currentTimeMillis();  // elapsed-time checks avoid overflow
+          for (HiveSession session : new ArrayList<HiveSession>(handleToSession.values())) {
+            if (sessionTimeout > 0 && current - session.getLastAccessTime() >= sessionTimeout) {
+              SessionHandle handle = session.getSessionHandle();
+              LOG.warn("Session " + handle + " is Timed-out (last access : " +
+                  new Date(session.getLastAccessTime()) + ") and will be closed");
+              try {
+                closeSession(handle);
+              } catch (HiveSQLException e) {
+                LOG.warn("Exception is thrown closing session " + handle, e);
+              }
+            } else {
+              session.closeExpiredOperations();
+            }
+          }
+        }
+      }
+      private boolean sleepInterval(long interval) {  // false once the checker must stop
+        try {
+          Thread.sleep(interval);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();  // restore the flag; do not swallow it
+        }
+        return !shutdown && !Thread.currentThread().isInterrupted();
+      }
+    };
+    backgroundOperationPool.execute(timeoutChecker);
   }
 
   @Override
   public synchronized void stop() {
     super.stop();
+    shutdown = true;
     if (backgroundOperationPool != null) {
       backgroundOperationPool.shutdown();
-      int timeout = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT);
+      long timeout = hiveConf.getTimeVar(
+          ConfVars.HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS);
       try {
         backgroundOperationPool.awaitTermination(timeout, TimeUnit.SECONDS);
       } catch (InterruptedException e) {
         LOG.warn("HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT = " + timeout +
             " seconds has been exceeded. RUNNING background operations will be shut down", e);
       }
+      backgroundOperationPool = null;
     }
     cleanupLoggingRootDir();
   }
@@ -177,11 +233,13 @@ public class SessionManager extends Comp
       Map<String, String> sessionConf, boolean withImpersonation, String delegationToken)
           throws HiveSQLException {
     HiveSession session;
+    // If doAs is set to true for HiveServer2, we will create a proxy object for the session impl.
+    // Within the proxy object, we wrap the method call in a UserGroupInformation#doAs
     if (withImpersonation) {
-      HiveSessionImplwithUGI hiveSessionUgi = new HiveSessionImplwithUGI(protocol, username, password,
-        hiveConf, ipAddress, delegationToken);
-      session = HiveSessionProxy.getProxy(hiveSessionUgi, hiveSessionUgi.getSessionUgi());
-      hiveSessionUgi.setProxySession(session);
+      HiveSessionImplwithUGI sessionWithUGI = new HiveSessionImplwithUGI(protocol, username, password,
+          hiveConf, ipAddress, delegationToken);
+      session = HiveSessionProxy.getProxy(sessionWithUGI, sessionWithUGI.getSessionUgi());
+      sessionWithUGI.setProxySession(session);
     } else {
       session = new HiveSessionImpl(protocol, username, password, hiveConf, ipAddress);
     }

Modified: hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java (original)
+++ hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java Mon Sep  8 04:38:17 2014
@@ -70,7 +70,8 @@ public class ThriftBinaryCLIService exte
 
       minWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_MIN_WORKER_THREADS);
       maxWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_MAX_WORKER_THREADS);
-      workerKeepAliveTime = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_WORKER_KEEPALIVE_TIME);
+      workerKeepAliveTime = hiveConf.getTimeVar(
+          ConfVars.HIVE_SERVER2_THRIFT_WORKER_KEEPALIVE_TIME, TimeUnit.SECONDS);
       String threadPoolName = "HiveServer2-Handler-Pool";
       ExecutorService executorService = new ThreadPoolExecutor(minWorkerThreads, maxWorkerThreads,
           workerKeepAliveTime, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),

Modified: hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java (original)
+++ hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java Mon Sep  8 04:38:17 2014
@@ -61,7 +61,7 @@ public abstract class ThriftCLIService e
 
   protected int minWorkerThreads;
   protected int maxWorkerThreads;
-  protected int workerKeepAliveTime;
+  protected long workerKeepAliveTime;
 
   protected static HiveAuthFactory hiveAuthFactory;
 

Modified: hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java (original)
+++ hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java Mon Sep  8 04:38:17 2014
@@ -69,7 +69,8 @@ public class ThriftHttpCLIService extend
 
       minWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_MIN_WORKER_THREADS);
       maxWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_MAX_WORKER_THREADS);
-      workerKeepAliveTime = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_WORKER_KEEPALIVE_TIME);
+      workerKeepAliveTime = hiveConf.getTimeVar(
+          ConfVars.HIVE_SERVER2_THRIFT_HTTP_WORKER_KEEPALIVE_TIME, TimeUnit.SECONDS);
 
       String httpPath =  getHttpPath(hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_HTTP_PATH));
 
@@ -110,7 +111,8 @@ public class ThriftHttpCLIService extend
       // Linux:yes, Windows:no
       connector.setReuseAddress(!Shell.WINDOWS);
       
-      int maxIdleTime = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_MAX_IDLE_TIME);
+      int maxIdleTime = (int) hiveConf.getTimeVar(
+          ConfVars.HIVE_SERVER2_THRIFT_HTTP_MAX_IDLE_TIME, TimeUnit.MILLISECONDS);
       connector.setMaxIdleTime(maxIdleTime);
       
       httpServer.addConnector(connector);

Modified: hive/branches/spark/service/src/java/org/apache/hive/service/server/HiveServer2.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/service/src/java/org/apache/hive/service/server/HiveServer2.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/service/src/java/org/apache/hive/service/server/HiveServer2.java (original)
+++ hive/branches/spark/service/src/java/org/apache/hive/service/server/HiveServer2.java Mon Sep  8 04:38:17 2014
@@ -144,6 +144,7 @@ public class HiveServer2 extends Composi
   }
 
   public static void main(String[] args) {
+    HiveConf.setLoadHiveServer2Config(true);
     try {
       ServerOptionsProcessor oproc = new ServerOptionsProcessor("hiveserver2");
       if (!oproc.process(args)) {

Modified: hive/branches/spark/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java (original)
+++ hive/branches/spark/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java Mon Sep  8 04:38:17 2014
@@ -26,9 +26,9 @@ import static org.junit.Assert.fail;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -202,7 +202,8 @@ public abstract class CLIServiceTest {
      * to give a compile time error.
      * (compilation is done synchronous as of now)
      */
-    longPollingTimeout = new HiveConf().getLongVar(ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT);
+    longPollingTimeout = HiveConf.getTimeVar(new HiveConf(),
+        HiveConf.ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT, TimeUnit.MILLISECONDS);
     queryString = "SELECT NON_EXISTING_COLUMN FROM " + tableName;
     try {
       runQueryAsync(sessionHandle, queryString, confOverlay, OperationState.ERROR, longPollingTimeout);
@@ -295,7 +296,7 @@ public abstract class CLIServiceTest {
     long longPollingTimeDelta;
     OperationStatus opStatus = null;
     OperationState state = null;
-    confOverlay.put(HiveConf.ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT.varname, String.valueOf(longPollingTimeout));
+    confOverlay.put(HiveConf.ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT.varname, longPollingTimeout + "ms");
     OperationHandle opHandle = client.executeStatementAsync(sessionHandle, queryString, confOverlay);
     int count = 0;
     while (true) {

Modified: hive/branches/spark/shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java (original)
+++ hive/branches/spark/shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java Mon Sep  8 04:38:17 2014
@@ -55,6 +55,7 @@ import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.ProxyFileSystem;
 import org.apache.hadoop.fs.Trash;
 import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil;
 import org.apache.hadoop.io.LongWritable;
@@ -895,4 +896,14 @@ public class Hadoop20Shims implements Ha
     // No password API, just retrieve value from conf
     return conf.get(name);
   }
+
+  @Override
+  public boolean supportStickyBit() {
+    return false;
+  }
+
+  @Override
+  public boolean hasStickyBit(FsPermission permission) {
+    return false;   // not supported
+  }
 }

Modified: hive/branches/spark/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java (original)
+++ hive/branches/spark/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java Mon Sep  8 04:38:17 2014
@@ -40,6 +40,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.ProxyFileSystem;
 import org.apache.hadoop.fs.Trash;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.mapred.ClusterStatus;
@@ -513,4 +514,14 @@ public class Hadoop20SShims extends Hado
     // No password API, just retrieve value from conf
     return conf.get(name);
   }
+
+  @Override
+  public boolean supportStickyBit() {
+    return false;
+  }
+
+  @Override
+  public boolean hasStickyBit(FsPermission permission) {
+    return false;   // not supported
+  }
 }

Modified: hive/branches/spark/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java (original)
+++ hive/branches/spark/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java Mon Sep  8 04:38:17 2014
@@ -19,21 +19,17 @@ package org.apache.hadoop.hive.shims;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.net.InetSocketAddress;
 import java.net.MalformedURLException;
 import java.net.URI;
 import java.security.AccessControlException;
-import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.commons.lang.ArrayUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
@@ -302,6 +298,7 @@ public class Hadoop23Shims extends Hadoo
 
       mr = new MiniTezCluster("hive", numberOfTaskTrackers);
       conf.set("fs.defaultFS", nameNode);
+      conf.set("tez.am.log.level", "DEBUG");
       conf.set(MRJobConfig.MR_AM_STAGING_DIR, "/apps_staging_dir");
       mr.init(conf);
       mr.start();
@@ -816,4 +813,14 @@ public class Hadoop23Shims extends Hadoo
       }
     }
   }
+
+  @Override
+  public boolean supportStickyBit() {
+    return true;
+  }
+
+  @Override
+  public boolean hasStickyBit(FsPermission permission) {
+    return permission.getStickyBit();
+  }
 }

Modified: hive/branches/spark/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java (original)
+++ hive/branches/spark/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java Mon Sep  8 04:38:17 2014
@@ -24,6 +24,7 @@ import java.net.InetAddress;
 import java.net.Socket;
 import java.security.PrivilegedAction;
 import java.security.PrivilegedExceptionAction;
+import java.util.Locale;
 import java.util.Map;
 
 import javax.security.auth.callback.Callback;
@@ -79,11 +80,23 @@ public class HadoopThriftAuthBridge20S e
   }
 
   @Override
-  public Client createClientWithConf(String authType) {
-    Configuration conf = new Configuration();
-    conf.set(HADOOP_SECURITY_AUTHENTICATION, authType);
-    UserGroupInformation.setConfiguration(conf);
-    return new Client();
+  public Client createClientWithConf(String authMethod) {
+    UserGroupInformation ugi;
+    try {
+      ugi = UserGroupInformation.getLoginUser();
+    } catch(IOException e) {
+      throw new IllegalStateException("Unable to get current login user: " + e, e);
+    }
+    if (loginUserHasCurrentAuthMethod(ugi, authMethod)) {
+      LOG.debug("Not setting UGI conf as passed-in authMethod of " + authMethod + " = current.");
+      return new Client();
+    } else {
+      LOG.debug("Setting UGI conf as passed-in authMethod of " + authMethod + " != current.");
+      Configuration conf = new Configuration();
+      conf.set(HADOOP_SECURITY_AUTHENTICATION, authMethod);
+      UserGroupInformation.setConfiguration(conf);
+      return new Client();
+    }
   }
 
   @Override
@@ -105,15 +118,48 @@ public class HadoopThriftAuthBridge20S e
   }
 
   @Override
-  public UserGroupInformation getCurrentUGIWithConf(String authType)
+  public UserGroupInformation getCurrentUGIWithConf(String authMethod)
       throws IOException {
-    Configuration conf = new Configuration();
-    conf.set(HADOOP_SECURITY_AUTHENTICATION, authType);
-    UserGroupInformation.setConfiguration(conf);
-    return UserGroupInformation.getCurrentUser();
+    UserGroupInformation ugi;
+    try {
+      ugi = UserGroupInformation.getCurrentUser();
+    } catch(IOException e) {
+      throw new IllegalStateException("Unable to get current user: " + e, e);
+    }
+    if (loginUserHasCurrentAuthMethod(ugi, authMethod)) {
+      LOG.debug("Not setting UGI conf as passed-in authMethod of " + authMethod + " = current.");
+      return ugi;
+    } else {
+      LOG.debug("Setting UGI conf as passed-in authMethod of " + authMethod + " != current.");
+      Configuration conf = new Configuration();
+      conf.set(HADOOP_SECURITY_AUTHENTICATION, authMethod);
+      UserGroupInformation.setConfiguration(conf);
+      return UserGroupInformation.getCurrentUser();
+    }
   }
 
   /**
+   * Return true if the given user (UGI) is already using the given authMethod.
+   *
+   * Used above to ensure we do not create a new Configuration object and as such
+   * lose other settings such as the cluster to which the JVM is connected. Required
+   * for Oozie, since it does not have a core-site.xml; see HIVE-7682.
+   */
+  private boolean loginUserHasCurrentAuthMethod(UserGroupInformation ugi, String sAuthMethod) {
+    AuthenticationMethod authMethod;
+    try {
+      // based on SecurityUtil.getAuthenticationMethod()
+      authMethod = Enum.valueOf(AuthenticationMethod.class, sAuthMethod.toUpperCase(Locale.ENGLISH));
+    } catch (IllegalArgumentException iae) {
+      throw new IllegalArgumentException("Invalid attribute value for " +
+          HADOOP_SECURITY_AUTHENTICATION + " of " + sAuthMethod, iae);
+    }
+    LOG.debug("Current authMethod = " + ugi.getAuthenticationMethod());
+    return ugi.getAuthenticationMethod().equals(authMethod);
+  }
+
+
+  /**
    * Read and return Hadoop SASL configuration which can be configured using
    * "hadoop.rpc.protection"
    * @param conf

Modified: hive/branches/spark/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java (original)
+++ hive/branches/spark/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java Mon Sep  8 04:38:17 2014
@@ -44,6 +44,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.mapred.ClusterStatus;
 import org.apache.hadoop.mapred.InputSplit;
@@ -694,4 +695,16 @@ public interface HadoopShims {
    */
   public String getPassword(Configuration conf, String name) throws IOException;
 
+  /**
+   * Check whether the current Hadoop version supports the sticky bit.
+   * @return true if the sticky bit is supported, false otherwise
+   */
+  boolean supportStickyBit();
+
+  /**
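+   * Check whether the sticky bit is set in the given permission.
+   * @param permission the permission to inspect
+   * @return true if the sticky bit is set; always false where the sticky bit is not supported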
+   */
+  boolean hasStickyBit(FsPermission permission);
 }