Posted to commits@hive.apache.org by ga...@apache.org on 2014/09/01 19:06:19 UTC

svn commit: r1621844 - /hive/trunk/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java

Author: gates
Date: Mon Sep  1 17:06:19 2014
New Revision: 1621844

URL: http://svn.apache.org/r1621844
Log:
HIVE-7508 Kerberos support for streaming (Roshan Naik via Alan Gates)

Modified:
    hive/trunk/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
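
For context, this patch replaces the old private newConnection(proxyUser, ...) with a public
overload that takes an already-authenticated UserGroupInformation. A minimal caller sketch
follows; the principal, keytab path, metastore URI, database and table names are placeholders
for illustration, not part of this commit:

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.security.UserGroupInformation;
    import org.apache.hive.hcatalog.streaming.HiveEndPoint;
    import org.apache.hive.hcatalog.streaming.StreamingConnection;

    public class SecureStreamingClient {
      public static void main(String[] args) throws Exception {
        // Log in from a keytab; principal and keytab path are placeholders.
        UserGroupInformation ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(
                "streaming-client@EXAMPLE.COM", "/etc/security/keytabs/streaming.keytab");

        HiveConf conf = new HiveConf();
        // Hypothetical endpoint: metastore URI, database, table, no partition values.
        HiveEndPoint endPoint = new HiveEndPoint(
                "thrift://metastore.example.com:9083", "default", "alerts", null);

        // New in this patch: pass the authenticated UGI; a null UGI falls back to
        // insecure mode (connect as the current process user).
        StreamingConnection conn = endPoint.newConnection(true, conf, ugi);
        try {
          // ... fetch transaction batches and write records here ...
        } finally {
          conn.close();
        }
      }
    }

Internally the connection keeps the UGI and wraps its metastore calls in ugi.doAs(), as the
hunks below show.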

Modified: hive/trunk/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java?rev=1621844&r1=1621843&r2=1621844&view=diff
==============================================================================
--- hive/trunk/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java (original)
+++ hive/trunk/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java Mon Sep  1 17:06:19 2014
@@ -107,7 +107,7 @@ public class HiveEndPoint {
   public StreamingConnection newConnection(final boolean createPartIfNotExists)
           throws ConnectionError, InvalidPartition, InvalidTable, PartitionCreationFailed
           , ImpersonationFailed , InterruptedException {
-    return newConnection(null, createPartIfNotExists, null);
+    return newConnection(createPartIfNotExists, null, null);
   }
 
   /**
@@ -126,67 +126,63 @@ public class HiveEndPoint {
   public StreamingConnection newConnection(final boolean createPartIfNotExists, HiveConf conf)
           throws ConnectionError, InvalidPartition, InvalidTable, PartitionCreationFailed
           , ImpersonationFailed , InterruptedException {
-    return newConnection(null, createPartIfNotExists, conf);
+    return newConnection(createPartIfNotExists, conf, null);
   }
 
   /**
    * Acquire a new connection to MetaStore for streaming
-   * @param proxyUser User on whose behalf all hdfs and hive operations will be
-   *                  performed on this connection. Set it to null or empty string
-   *                  to connect as user of current process without impersonation.
-   *                  Currently this argument is not supported and must be null
    * @param createPartIfNotExists If true, the partition specified in the endpoint
    *                              will be auto created if it does not exist
+   * @param authenticatedUser  UserGroupInformation object obtained from successful authentication.
+   *                           Uses insecure mode if this argument is null.
    * @return
-   * @throws ConnectionError if problem connecting
+   * @throws ConnectionError if there is a connection problem
    * @throws InvalidPartition  if specified partition is not valid (createPartIfNotExists = false)
-   * @throws ImpersonationFailed  if not able to impersonate 'proxyUser'
+   * @throws ImpersonationFailed  if not able to impersonate 'username'
    * @throws IOException  if there was an I/O error when acquiring connection
    * @throws PartitionCreationFailed if failed to create partition
    * @throws InterruptedException
    */
-  private StreamingConnection newConnection(final String proxyUser,
-                                            final boolean createPartIfNotExists, final HiveConf conf)
+  public StreamingConnection newConnection(final boolean createPartIfNotExists, final HiveConf conf,
+                                            final UserGroupInformation authenticatedUser)
           throws ConnectionError, InvalidPartition,
                InvalidTable, PartitionCreationFailed, ImpersonationFailed , InterruptedException {
-    if (proxyUser ==null || proxyUser.trim().isEmpty() ) {
-      return newConnectionImpl(System.getProperty("user.name"), null, createPartIfNotExists, conf);
+
+    if( authenticatedUser==null ) {
+      return newConnectionImpl(authenticatedUser, createPartIfNotExists, conf);
     }
-    final UserGroupInformation ugi = getUserGroupInfo(proxyUser);
+
     try {
-      return ugi.doAs (
-              new PrivilegedExceptionAction<StreamingConnection>() {
+      return authenticatedUser.doAs (
+             new PrivilegedExceptionAction<StreamingConnection>() {
                 @Override
                 public StreamingConnection run()
                         throws ConnectionError, InvalidPartition, InvalidTable
                         , PartitionCreationFailed {
-                  return newConnectionImpl(proxyUser, ugi, createPartIfNotExists, conf);
+                  return newConnectionImpl(authenticatedUser, createPartIfNotExists, conf);
                 }
-              }
+             }
       );
     } catch (IOException e) {
-      throw new ImpersonationFailed("Failed to impersonate '" + proxyUser +
-              "' when acquiring connection", e);
+      throw new ConnectionError("Failed to connect as : " + authenticatedUser.getShortUserName(), e);
     }
   }
 
-
-
-  private StreamingConnection newConnectionImpl(String proxyUser, UserGroupInformation ugi,
+  private StreamingConnection newConnectionImpl(UserGroupInformation ugi,
                                                boolean createPartIfNotExists, HiveConf conf)
           throws ConnectionError, InvalidPartition, InvalidTable
           , PartitionCreationFailed {
-    return new ConnectionImpl(this, proxyUser, ugi, conf, createPartIfNotExists);
+    return new ConnectionImpl(this, ugi, conf, createPartIfNotExists);
   }
 
-  private static UserGroupInformation getUserGroupInfo(String proxyUser)
+  private static UserGroupInformation getUserGroupInfo(String user)
           throws ImpersonationFailed {
     try {
       return UserGroupInformation.createProxyUser(
-              proxyUser, UserGroupInformation.getLoginUser());
+              user, UserGroupInformation.getLoginUser());
     } catch (IOException e) {
-      LOG.error("Unable to login as proxy user. Exception follows.", e);
-      throw new ImpersonationFailed(proxyUser,e);
+      LOG.error("Unable to get UserGroupInfo for user : " + user, e);
+      throw new ImpersonationFailed(user,e);
     }
   }
 
@@ -242,14 +238,12 @@ public class HiveEndPoint {
   private static class ConnectionImpl implements StreamingConnection {
     private final IMetaStoreClient msClient;
     private final HiveEndPoint endPt;
-    private final String proxyUser;
     private final UserGroupInformation ugi;
+    private final String username;
 
     /**
-     *
      * @param endPoint end point to connect to
-     * @param proxyUser  can be null
-     * @param ugi of prody user. If ugi is null, impersonation of proxy user will be disabled
+     * @param ugi on behalf of whom streaming is done. cannot be null
      * @param conf HiveConf object
      * @param createPart create the partition if it does not exist
      * @throws ConnectionError if there is trouble connecting
@@ -257,15 +251,15 @@ public class HiveEndPoint {
      * @throws InvalidTable if specified table does not exist
      * @throws PartitionCreationFailed if createPart=true and not able to create partition
      */
-    private ConnectionImpl(HiveEndPoint endPoint, String proxyUser, UserGroupInformation ugi,
+    private ConnectionImpl(HiveEndPoint endPoint, UserGroupInformation ugi,
                            HiveConf conf, boolean createPart)
             throws ConnectionError, InvalidPartition, InvalidTable
                    , PartitionCreationFailed {
-      this.proxyUser = proxyUser;
       this.endPt = endPoint;
       this.ugi = ugi;
+      this.username = ugi==null ? System.getProperty("user.name") : ugi.getShortUserName();
       if (conf==null) {
-        conf = HiveEndPoint.createHiveConf(this.getClass(),endPoint.metaStoreUri);
+        conf = HiveEndPoint.createHiveConf(this.getClass(), endPoint.metaStoreUri);
       }
       this.msClient = getMetaStoreClient(endPoint, conf);
       if (createPart  &&  !endPoint.partitionVals.isEmpty()) {
@@ -324,21 +318,21 @@ public class HiveEndPoint {
         return ugi.doAs (
                 new PrivilegedExceptionAction<TransactionBatch>() {
                   @Override
-                  public TransactionBatch run() throws StreamingException {
+                  public TransactionBatch run() throws StreamingException, InterruptedException {
                     return fetchTransactionBatchImpl(numTransactions, recordWriter);
                   }
                 }
         );
       } catch (IOException e) {
-        throw new ImpersonationFailed("Failed impersonating proxy user '" + proxyUser +
-                "' when acquiring Transaction Batch on endPoint " + endPt, e);
+        throw new ImpersonationFailed("Failed to fetch Txn Batch as user '" + ugi.getShortUserName()
+                + "' when acquiring Transaction Batch on endPoint " + endPt, e);
       }
     }
 
     private TransactionBatch fetchTransactionBatchImpl(int numTransactions,
                                                   RecordWriter recordWriter)
-            throws StreamingException, TransactionBatchUnAvailable {
-      return new TransactionBatchImpl(proxyUser, ugi, endPt, numTransactions, msClient
+            throws StreamingException, TransactionBatchUnAvailable, InterruptedException {
+      return new TransactionBatchImpl(username, ugi, endPt, numTransactions, msClient
               , recordWriter);
     }
 
@@ -445,7 +439,7 @@ public class HiveEndPoint {
   } // class ConnectionImpl
 
   private static class TransactionBatchImpl implements TransactionBatch {
-    private final String proxyUser;
+    private final String username;
     private final UserGroupInformation ugi;
     private final HiveEndPoint endPt;
     private final IMetaStoreClient msClient;
@@ -461,7 +455,7 @@ public class HiveEndPoint {
     /**
      * Represents a batch of transactions acquired from MetaStore
      *
-     * @param proxyUser
+     * @param user
      * @param ugi
      * @param endPt
      * @param numTxns
@@ -470,9 +464,9 @@ public class HiveEndPoint {
      * @throws StreamingException if failed to create new RecordUpdater for batch
      * @throws TransactionBatchUnAvailable if failed to acquire a new Transaction batch
      */
-    private TransactionBatchImpl(String proxyUser, UserGroupInformation ugi, HiveEndPoint endPt
-              , int numTxns, IMetaStoreClient msClient, RecordWriter recordWriter)
-            throws StreamingException, TransactionBatchUnAvailable {
+    private TransactionBatchImpl(final String user, UserGroupInformation ugi, HiveEndPoint endPt
+              , final int numTxns, final IMetaStoreClient msClient, RecordWriter recordWriter)
+            throws StreamingException, TransactionBatchUnAvailable, InterruptedException {
       try {
         if ( endPt.partitionVals!=null   &&   !endPt.partitionVals.isEmpty() ) {
           Table tableObj = msClient.getTable(endPt.database, endPt.table);
@@ -481,20 +475,38 @@ public class HiveEndPoint {
         } else {
           partNameForLock = null;
         }
-        this.proxyUser = proxyUser;
+        this.username = user;
         this.ugi = ugi;
         this.endPt = endPt;
         this.msClient = msClient;
         this.recordWriter = recordWriter;
-        this.txnIds = msClient.openTxns(proxyUser, numTxns).getTxn_ids();
+
+        txnIds = openTxnImpl(msClient, user, numTxns, ugi);
+
+
         this.currentTxnIndex = -1;
         this.state = TxnState.INACTIVE;
         recordWriter.newBatch(txnIds.get(0), txnIds.get(txnIds.size()-1));
       } catch (TException e) {
         throw new TransactionBatchUnAvailable(endPt, e);
+      } catch (IOException e) {
+        throw new TransactionBatchUnAvailable(endPt, e);
       }
     }
 
+    private List<Long> openTxnImpl(final IMetaStoreClient msClient, final String user, final int numTxns, UserGroupInformation ugi)
+            throws IOException, TException,  InterruptedException {
+      if(ugi==null) {
+        return  msClient.openTxns(user, numTxns).getTxn_ids();
+      }
+      return (List<Long>) ugi.doAs(new PrivilegedExceptionAction<Object>() {
+        @Override
+        public Object run() throws Exception {
+          return msClient.openTxns(user, numTxns).getTxn_ids();
+        }
+      }) ;
+    }
+
     @Override
     public String toString() {
       if (txnIds==null || txnIds.isEmpty()) {
@@ -526,8 +538,8 @@ public class HiveEndPoint {
               }
         );
       } catch (IOException e) {
-        throw new ImpersonationFailed("Failed impersonating proxyUser '" + proxyUser +
-                "' when switch to next Transaction for endPoint :" + endPt, e);
+        throw new ImpersonationFailed("Failed switching to next Txn as user '" + username +
+                "' in Txn batch :" + this, e);
       }
     }
 
@@ -536,7 +548,7 @@ public class HiveEndPoint {
         throw new InvalidTrasactionState("No more transactions available in" +
                 " current batch for end point : " + endPt);
       ++currentTxnIndex;
-      lockRequest = createLockRequest(endPt, partNameForLock, proxyUser, getCurrentTxnId());
+      lockRequest = createLockRequest(endPt, partNameForLock, username, getCurrentTxnId());
       try {
         LockResponse res = msClient.lock(lockRequest);
         if (res.getState() != LockState.ACQUIRED) {
@@ -608,8 +620,8 @@ public class HiveEndPoint {
             }
         );
       } catch (IOException e) {
-        throw new ImpersonationFailed("Failed impersonating proxy user '" + proxyUser +
-                "' when writing to endPoint :" + endPt + ". Transaction Id: "
+        throw new ImpersonationFailed("Failed wirting as user '" + username +
+                "' to endPoint :" + endPt + ". Transaction Id: "
                 + getCurrentTxnId(), e);
       }
     }
@@ -641,8 +653,8 @@ public class HiveEndPoint {
                 }
         );
       } catch (IOException e) {
-        throw new ImpersonationFailed("Failed impersonating proxyUser '" + proxyUser +
-                "' when writing to endPoint :" + endPt + ". Transaction Id: "
+        throw new ImpersonationFailed("Failed writing as user '" + username +
+                "' to endPoint :" + endPt + ". Transaction Id: "
                 + getCurrentTxnId(), e);
       }
     }
@@ -680,9 +692,8 @@ public class HiveEndPoint {
               }
         );
       } catch (IOException e) {
-        throw new ImpersonationFailed("Failed impersonating proxy user '" + proxyUser +
-                "' when committing Txn on endPoint :" + endPt + ". Transaction Id: "
-                + getCurrentTxnId(), e);
+        throw new ImpersonationFailed("Failed committing Txn ID " + getCurrentTxnId() + " as user '"
+                + username + "'on endPoint :" + endPt + ". Transaction Id: ", e);
       }
 
     }
@@ -726,9 +737,8 @@ public class HiveEndPoint {
                 }
         );
       } catch (IOException e) {
-        throw new ImpersonationFailed("Failed impersonating proxy user '" + proxyUser +
-                "' when aborting Txn on endPoint :" + endPt + ". Transaction Id: "
-                + getCurrentTxnId(), e);
+        throw new ImpersonationFailed("Failed aborting Txn " + getCurrentTxnId()  + " as user '"
+                + username + "' on endPoint :" + endPt, e);
       }
     }
 
@@ -784,8 +794,8 @@ public class HiveEndPoint {
                 }
         );
       } catch (IOException e) {
-        throw new ImpersonationFailed("Failed impersonating proxy user '" + proxyUser +
-                "' when closing Txn Batch on  endPoint :" + endPt, e);
+        throw new ImpersonationFailed("Failed closing Txn Batch as user '" + username +
+                "' on  endPoint :" + endPt, e);
       }
     }
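
From the caller's side the transaction-batch API is unchanged by these hunks; the doAs()
wrapping around openTxns, lock, commit, abort and close happens inside the connection. A
rough continuation of the sketch above (column names, delimiter and record contents are
assumptions for illustration):

    // Additional imports assumed: org.apache.hive.hcatalog.streaming.DelimitedInputWriter,
    // org.apache.hive.hcatalog.streaming.TransactionBatch.
    // Reuses 'endPoint' and 'conn' from the earlier sketch.
    DelimitedInputWriter writer =
            new DelimitedInputWriter(new String[]{"id", "msg"}, ",", endPoint);
    TransactionBatch batch = conn.fetchTransactionBatch(10, writer);
    try {
      while (batch.remainingTransactions() > 0) {
        batch.beginNextTransaction();      // acquires locks as the authenticated user
        batch.write("1,hello".getBytes()); // record format matches the writer's delimiter
        batch.commit();
      }
    } finally {
      batch.close();
    }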