You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ga...@apache.org on 2014/09/01 19:06:19 UTC
svn commit: r1621844 -
/hive/trunk/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
Author: gates
Date: Mon Sep 1 17:06:19 2014
New Revision: 1621844
URL: http://svn.apache.org/r1621844
Log:
HIVE-7508 Kerberos support for streaming (Roshan Naik via Alan Gates)
Modified:
hive/trunk/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
Modified: hive/trunk/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java?rev=1621844&r1=1621843&r2=1621844&view=diff
==============================================================================
--- hive/trunk/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java (original)
+++ hive/trunk/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java Mon Sep 1 17:06:19 2014
@@ -107,7 +107,7 @@ public class HiveEndPoint {
public StreamingConnection newConnection(final boolean createPartIfNotExists)
throws ConnectionError, InvalidPartition, InvalidTable, PartitionCreationFailed
, ImpersonationFailed , InterruptedException {
- return newConnection(null, createPartIfNotExists, null);
+ return newConnection(createPartIfNotExists, null, null);
}
/**
@@ -126,67 +126,63 @@ public class HiveEndPoint {
public StreamingConnection newConnection(final boolean createPartIfNotExists, HiveConf conf)
throws ConnectionError, InvalidPartition, InvalidTable, PartitionCreationFailed
, ImpersonationFailed , InterruptedException {
- return newConnection(null, createPartIfNotExists, conf);
+ return newConnection(createPartIfNotExists, conf, null);
}
/**
* Acquire a new connection to MetaStore for streaming
- * @param proxyUser User on whose behalf all hdfs and hive operations will be
- * performed on this connection. Set it to null or empty string
- * to connect as user of current process without impersonation.
- * Currently this argument is not supported and must be null
* @param createPartIfNotExists If true, the partition specified in the endpoint
* will be auto created if it does not exist
+ * @param authenticatedUser UserGroupInformation object obtained from successful authentication.
+ * Uses insecure mode if this argument is null.
* @return
- * @throws ConnectionError if problem connecting
+ * @throws ConnectionError if there is a connection problem
* @throws InvalidPartition if specified partition is not valid (createPartIfNotExists = false)
- * @throws ImpersonationFailed if not able to impersonate 'proxyUser'
+ * @throws ImpersonationFailed if not able to impersonate the user given by 'authenticatedUser'
* @throws IOException if there was an I/O error when acquiring connection
* @throws PartitionCreationFailed if failed to create partition
* @throws InterruptedException
*/
- private StreamingConnection newConnection(final String proxyUser,
- final boolean createPartIfNotExists, final HiveConf conf)
+ public StreamingConnection newConnection(final boolean createPartIfNotExists, final HiveConf conf,
+ final UserGroupInformation authenticatedUser)
throws ConnectionError, InvalidPartition,
InvalidTable, PartitionCreationFailed, ImpersonationFailed , InterruptedException {
- if (proxyUser ==null || proxyUser.trim().isEmpty() ) {
- return newConnectionImpl(System.getProperty("user.name"), null, createPartIfNotExists, conf);
+
+ if( authenticatedUser==null ) {
+ return newConnectionImpl(authenticatedUser, createPartIfNotExists, conf);
}
- final UserGroupInformation ugi = getUserGroupInfo(proxyUser);
+
try {
- return ugi.doAs (
- new PrivilegedExceptionAction<StreamingConnection>() {
+ return authenticatedUser.doAs (
+ new PrivilegedExceptionAction<StreamingConnection>() {
@Override
public StreamingConnection run()
throws ConnectionError, InvalidPartition, InvalidTable
, PartitionCreationFailed {
- return newConnectionImpl(proxyUser, ugi, createPartIfNotExists, conf);
+ return newConnectionImpl(authenticatedUser, createPartIfNotExists, conf);
}
- }
+ }
);
} catch (IOException e) {
- throw new ImpersonationFailed("Failed to impersonate '" + proxyUser +
- "' when acquiring connection", e);
+ throw new ConnectionError("Failed to connect as : " + authenticatedUser.getShortUserName(), e);
}
}
-
-
- private StreamingConnection newConnectionImpl(String proxyUser, UserGroupInformation ugi,
+ private StreamingConnection newConnectionImpl(UserGroupInformation ugi,
boolean createPartIfNotExists, HiveConf conf)
throws ConnectionError, InvalidPartition, InvalidTable
, PartitionCreationFailed {
- return new ConnectionImpl(this, proxyUser, ugi, conf, createPartIfNotExists);
+ return new ConnectionImpl(this, ugi, conf, createPartIfNotExists);
}
- private static UserGroupInformation getUserGroupInfo(String proxyUser)
+ private static UserGroupInformation getUserGroupInfo(String user)
throws ImpersonationFailed {
try {
return UserGroupInformation.createProxyUser(
- proxyUser, UserGroupInformation.getLoginUser());
+ user, UserGroupInformation.getLoginUser());
} catch (IOException e) {
- LOG.error("Unable to login as proxy user. Exception follows.", e);
- throw new ImpersonationFailed(proxyUser,e);
+ LOG.error("Unable to get UserGroupInfo for user : " + user, e);
+ throw new ImpersonationFailed(user,e);
}
}
@@ -242,14 +238,12 @@ public class HiveEndPoint {
private static class ConnectionImpl implements StreamingConnection {
private final IMetaStoreClient msClient;
private final HiveEndPoint endPt;
- private final String proxyUser;
private final UserGroupInformation ugi;
+ private final String username;
/**
- *
* @param endPoint end point to connect to
- * @param proxyUser can be null
- * @param ugi of prody user. If ugi is null, impersonation of proxy user will be disabled
+ * @param ugi on behalf of whom streaming is done. If null, the current process user (system property user.name) is used
* @param conf HiveConf object
* @param createPart create the partition if it does not exist
* @throws ConnectionError if there is trouble connecting
@@ -257,15 +251,15 @@ public class HiveEndPoint {
* @throws InvalidTable if specified table does not exist
* @throws PartitionCreationFailed if createPart=true and not able to create partition
*/
- private ConnectionImpl(HiveEndPoint endPoint, String proxyUser, UserGroupInformation ugi,
+ private ConnectionImpl(HiveEndPoint endPoint, UserGroupInformation ugi,
HiveConf conf, boolean createPart)
throws ConnectionError, InvalidPartition, InvalidTable
, PartitionCreationFailed {
- this.proxyUser = proxyUser;
this.endPt = endPoint;
this.ugi = ugi;
+ this.username = ugi==null ? System.getProperty("user.name") : ugi.getShortUserName();
if (conf==null) {
- conf = HiveEndPoint.createHiveConf(this.getClass(),endPoint.metaStoreUri);
+ conf = HiveEndPoint.createHiveConf(this.getClass(), endPoint.metaStoreUri);
}
this.msClient = getMetaStoreClient(endPoint, conf);
if (createPart && !endPoint.partitionVals.isEmpty()) {
@@ -324,21 +318,21 @@ public class HiveEndPoint {
return ugi.doAs (
new PrivilegedExceptionAction<TransactionBatch>() {
@Override
- public TransactionBatch run() throws StreamingException {
+ public TransactionBatch run() throws StreamingException, InterruptedException {
return fetchTransactionBatchImpl(numTransactions, recordWriter);
}
}
);
} catch (IOException e) {
- throw new ImpersonationFailed("Failed impersonating proxy user '" + proxyUser +
- "' when acquiring Transaction Batch on endPoint " + endPt, e);
+ throw new ImpersonationFailed("Failed to fetch Txn Batch as user '" + ugi.getShortUserName()
+ + "' when acquiring Transaction Batch on endPoint " + endPt, e);
}
}
private TransactionBatch fetchTransactionBatchImpl(int numTransactions,
RecordWriter recordWriter)
- throws StreamingException, TransactionBatchUnAvailable {
- return new TransactionBatchImpl(proxyUser, ugi, endPt, numTransactions, msClient
+ throws StreamingException, TransactionBatchUnAvailable, InterruptedException {
+ return new TransactionBatchImpl(username, ugi, endPt, numTransactions, msClient
, recordWriter);
}
@@ -445,7 +439,7 @@ public class HiveEndPoint {
} // class ConnectionImpl
private static class TransactionBatchImpl implements TransactionBatch {
- private final String proxyUser;
+ private final String username;
private final UserGroupInformation ugi;
private final HiveEndPoint endPt;
private final IMetaStoreClient msClient;
@@ -461,7 +455,7 @@ public class HiveEndPoint {
/**
* Represents a batch of transactions acquired from MetaStore
*
- * @param proxyUser
+ * @param user
* @param ugi
* @param endPt
* @param numTxns
@@ -470,9 +464,9 @@ public class HiveEndPoint {
* @throws StreamingException if failed to create new RecordUpdater for batch
* @throws TransactionBatchUnAvailable if failed to acquire a new Transaction batch
*/
- private TransactionBatchImpl(String proxyUser, UserGroupInformation ugi, HiveEndPoint endPt
- , int numTxns, IMetaStoreClient msClient, RecordWriter recordWriter)
- throws StreamingException, TransactionBatchUnAvailable {
+ private TransactionBatchImpl(final String user, UserGroupInformation ugi, HiveEndPoint endPt
+ , final int numTxns, final IMetaStoreClient msClient, RecordWriter recordWriter)
+ throws StreamingException, TransactionBatchUnAvailable, InterruptedException {
try {
if ( endPt.partitionVals!=null && !endPt.partitionVals.isEmpty() ) {
Table tableObj = msClient.getTable(endPt.database, endPt.table);
@@ -481,20 +475,38 @@ public class HiveEndPoint {
} else {
partNameForLock = null;
}
- this.proxyUser = proxyUser;
+ this.username = user;
this.ugi = ugi;
this.endPt = endPt;
this.msClient = msClient;
this.recordWriter = recordWriter;
- this.txnIds = msClient.openTxns(proxyUser, numTxns).getTxn_ids();
+
+ txnIds = openTxnImpl(msClient, user, numTxns, ugi);
+
+
this.currentTxnIndex = -1;
this.state = TxnState.INACTIVE;
recordWriter.newBatch(txnIds.get(0), txnIds.get(txnIds.size()-1));
} catch (TException e) {
throw new TransactionBatchUnAvailable(endPt, e);
+ } catch (IOException e) {
+ throw new TransactionBatchUnAvailable(endPt, e);
}
}
+ private List<Long> openTxnImpl(final IMetaStoreClient msClient, final String user, final int numTxns, UserGroupInformation ugi)
+ throws IOException, TException, InterruptedException {
+ if(ugi==null) {
+ return msClient.openTxns(user, numTxns).getTxn_ids();
+ }
+ return (List<Long>) ugi.doAs(new PrivilegedExceptionAction<Object>() {
+ @Override
+ public Object run() throws Exception {
+ return msClient.openTxns(user, numTxns).getTxn_ids();
+ }
+ }) ;
+ }
+
@Override
public String toString() {
if (txnIds==null || txnIds.isEmpty()) {
@@ -526,8 +538,8 @@ public class HiveEndPoint {
}
);
} catch (IOException e) {
- throw new ImpersonationFailed("Failed impersonating proxyUser '" + proxyUser +
- "' when switch to next Transaction for endPoint :" + endPt, e);
+ throw new ImpersonationFailed("Failed switching to next Txn as user '" + username +
+ "' in Txn batch :" + this, e);
}
}
@@ -536,7 +548,7 @@ public class HiveEndPoint {
throw new InvalidTrasactionState("No more transactions available in" +
" current batch for end point : " + endPt);
++currentTxnIndex;
- lockRequest = createLockRequest(endPt, partNameForLock, proxyUser, getCurrentTxnId());
+ lockRequest = createLockRequest(endPt, partNameForLock, username, getCurrentTxnId());
try {
LockResponse res = msClient.lock(lockRequest);
if (res.getState() != LockState.ACQUIRED) {
@@ -608,8 +620,8 @@ public class HiveEndPoint {
}
);
} catch (IOException e) {
- throw new ImpersonationFailed("Failed impersonating proxy user '" + proxyUser +
- "' when writing to endPoint :" + endPt + ". Transaction Id: "
+ throw new ImpersonationFailed("Failed wirting as user '" + username +
+ "' to endPoint :" + endPt + ". Transaction Id: "
+ getCurrentTxnId(), e);
}
}
@@ -641,8 +653,8 @@ public class HiveEndPoint {
}
);
} catch (IOException e) {
- throw new ImpersonationFailed("Failed impersonating proxyUser '" + proxyUser +
- "' when writing to endPoint :" + endPt + ". Transaction Id: "
+ throw new ImpersonationFailed("Failed writing as user '" + username +
+ "' to endPoint :" + endPt + ". Transaction Id: "
+ getCurrentTxnId(), e);
}
}
@@ -680,9 +692,8 @@ public class HiveEndPoint {
}
);
} catch (IOException e) {
- throw new ImpersonationFailed("Failed impersonating proxy user '" + proxyUser +
- "' when committing Txn on endPoint :" + endPt + ". Transaction Id: "
- + getCurrentTxnId(), e);
+ throw new ImpersonationFailed("Failed committing Txn ID " + getCurrentTxnId() + " as user '"
+ + username + "'on endPoint :" + endPt + ". Transaction Id: ", e);
}
}
@@ -726,9 +737,8 @@ public class HiveEndPoint {
}
);
} catch (IOException e) {
- throw new ImpersonationFailed("Failed impersonating proxy user '" + proxyUser +
- "' when aborting Txn on endPoint :" + endPt + ". Transaction Id: "
- + getCurrentTxnId(), e);
+ throw new ImpersonationFailed("Failed aborting Txn " + getCurrentTxnId() + " as user '"
+ + username + "' on endPoint :" + endPt, e);
}
}
@@ -784,8 +794,8 @@ public class HiveEndPoint {
}
);
} catch (IOException e) {
- throw new ImpersonationFailed("Failed impersonating proxy user '" + proxyUser +
- "' when closing Txn Batch on endPoint :" + endPt, e);
+ throw new ImpersonationFailed("Failed closing Txn Batch as user '" + username +
+ "' on endPoint :" + endPt, e);
}
}