Posted to commits@hive.apache.org by pr...@apache.org on 2018/05/02 16:45:00 UTC
[4/7] hive git commit: HIVE-19211: New streaming ingest API and
support for dynamic partitioning (Prasanth Jayachandran reviewed by Eugene
Koifman)
http://git-wip-us.apache.org/repos/asf/hive/blob/bf8d305a/streaming/src/java/org/apache/hive/streaming/HiveEndPoint.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/HiveEndPoint.java b/streaming/src/java/org/apache/hive/streaming/HiveEndPoint.java
deleted file mode 100644
index b04e137..0000000
--- a/streaming/src/java/org/apache/hive/streaming/HiveEndPoint.java
+++ /dev/null
@@ -1,1117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hive.streaming;
-
-import java.io.IOException;
-import java.security.PrivilegedExceptionAction;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.hive.cli.CliSessionState;
-import org.apache.hadoop.hive.common.JavaUtils;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.IMetaStoreClient;
-import org.apache.hadoop.hive.metastore.LockComponentBuilder;
-import org.apache.hadoop.hive.metastore.LockRequestBuilder;
-import org.apache.hadoop.hive.metastore.Warehouse;
-import org.apache.hadoop.hive.metastore.api.DataOperationType;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeResponse;
-import org.apache.hadoop.hive.metastore.api.LockRequest;
-import org.apache.hadoop.hive.metastore.api.LockResponse;
-import org.apache.hadoop.hive.metastore.api.LockState;
-import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
-import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
-import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
-import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
-import org.apache.hadoop.hive.ql.DriverFactory;
-import org.apache.hadoop.hive.ql.IDriver;
-import org.apache.hadoop.hive.ql.io.AcidUtils;
-import org.apache.hadoop.hive.ql.session.SessionState;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hive.hcatalog.common.HCatUtil;
-import org.apache.thrift.TException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Information about the hive end point (i.e. table or partition) to write to.
- * A lightweight object that does NOT internally hold on to resources such as
- * network connections. It can be stored in hashed containers such as sets and hash tables.
- */
-public class HiveEndPoint {
- public final String metaStoreUri;
- public final String database;
- public final String table;
- public final ArrayList<String> partitionVals;
-
-
- static final private Logger LOG = LoggerFactory.getLogger(HiveEndPoint.class.getName());
-
- /**
- *
- * @param metaStoreUri URI of the metastore to connect to eg: thrift://localhost:9083
- * @param database Name of the Hive database
- * @param table Name of table to stream to
- * @param partitionVals Indicates the specific partition to stream to. Can be null or empty List
- * if streaming to a table without partitions. The order of values in this
- * list must correspond exactly to the order of partition columns specified
- * during the table creation. E.g. For a table partitioned by
- * (continent string, country string), partitionVals could be the list
- * ("Asia", "India").
- */
- public HiveEndPoint(String metaStoreUri
- , String database, String table, List<String> partitionVals) {
- this.metaStoreUri = metaStoreUri;
- if (database==null) {
- throw new IllegalArgumentException("Database cannot be null for HiveEndPoint");
- }
- this.database = database;
- this.table = table;
- if (table==null) {
- throw new IllegalArgumentException("Table cannot be null for HiveEndPoint");
- }
- this.partitionVals = partitionVals==null ? new ArrayList<String>()
- : new ArrayList<String>( partitionVals );
- }
-
-
- /**
- * @deprecated As of release 1.3/2.1. Replaced by {@link #newConnection(boolean, String)}
- */
- @Deprecated
- public StreamingConnection newConnection(final boolean createPartIfNotExists)
- throws ConnectionError, InvalidPartition, InvalidTable, PartitionCreationFailed
- , ImpersonationFailed , InterruptedException {
- return newConnection(createPartIfNotExists, null, null, null);
- }
- /**
- * @deprecated As of release 1.3/2.1. Replaced by {@link #newConnection(boolean, HiveConf, String)}
- */
- @Deprecated
- public StreamingConnection newConnection(final boolean createPartIfNotExists, HiveConf conf)
- throws ConnectionError, InvalidPartition, InvalidTable, PartitionCreationFailed
- , ImpersonationFailed , InterruptedException {
- return newConnection(createPartIfNotExists, conf, null, null);
- }
- /**
- * @deprecated As of release 1.3/2.1. Replaced by {@link #newConnection(boolean, HiveConf, UserGroupInformation, String)}
- */
- @Deprecated
- public StreamingConnection newConnection(final boolean createPartIfNotExists, final HiveConf conf,
- final UserGroupInformation authenticatedUser)
- throws ConnectionError, InvalidPartition,
- InvalidTable, PartitionCreationFailed, ImpersonationFailed , InterruptedException {
- return newConnection(createPartIfNotExists, conf, authenticatedUser, null);
- }
- /**
- * Acquire a new connection to MetaStore for streaming
- * @param createPartIfNotExists If true, the partition specified in the endpoint
- * will be auto created if it does not exist
- * @param agentInfo should uniquely identify the process/entity that is using this batch. This
- * should be something that can be correlated with calling application log files
- * and/or monitoring consoles.
- * @return
- * @throws ConnectionError if problem connecting
- * @throws InvalidPartition if specified partition is not valid (createPartIfNotExists = false)
- * @throws ImpersonationFailed if not able to impersonate 'proxyUser'
- * @throws PartitionCreationFailed if failed to create partition
- * @throws InterruptedException
- */
- public StreamingConnection newConnection(final boolean createPartIfNotExists, String agentInfo)
- throws ConnectionError, InvalidPartition, InvalidTable, PartitionCreationFailed
- , ImpersonationFailed , InterruptedException {
- return newConnection(createPartIfNotExists, null, null, agentInfo);
- }
-
- /**
- * Acquire a new connection to MetaStore for streaming
- * @param createPartIfNotExists If true, the partition specified in the endpoint
- * will be auto created if it does not exist
- * @param conf HiveConf object, set it to null if not using advanced hive settings.
- * @param agentInfo should uniquely identify the process/entity that is using this batch. This
- * should be something that can be correlated with calling application log files
- * and/or monitoring consoles.
- * @return
- * @throws ConnectionError if problem connecting
- * @throws InvalidPartition if specified partition is not valid (createPartIfNotExists = false)
- * @throws ImpersonationFailed if not able to impersonate 'proxyUser'
- * @throws PartitionCreationFailed if failed to create partition
- * @throws InterruptedException
- */
- public StreamingConnection newConnection(final boolean createPartIfNotExists, HiveConf conf, String agentInfo)
- throws ConnectionError, InvalidPartition, InvalidTable, PartitionCreationFailed
- , ImpersonationFailed , InterruptedException {
- return newConnection(createPartIfNotExists, conf, null, agentInfo);
- }
-
- /**
- * Acquire a new connection to MetaStore for streaming. To connect using Kerberos,
- * 'authenticatedUser' argument should have been used to do a kerberos login. Additionally the
- * 'hive.metastore.kerberos.principal' setting should be set correctly either in hive-site.xml or
- * in the 'conf' argument (if not null). If using hive-site.xml, it should be in classpath.
- *
- * @param createPartIfNotExists If true, the partition specified in the endpoint
- * will be auto created if it does not exist
- * @param conf HiveConf object to be used for the connection. Can be null.
- * @param authenticatedUser UserGroupInformation object obtained from successful authentication.
- * Uses non-secure mode if this argument is null.
- * @param agentInfo should uniquely identify the process/entity that is using this batch. This
- * should be something that can be correlated with calling application log files
- * and/or monitoring consoles.
- * @return
- * @throws ConnectionError if there is a connection problem
- * @throws InvalidPartition if specified partition is not valid (createPartIfNotExists = false)
- * @throws ImpersonationFailed if not able to impersonate 'username'
- * @throws PartitionCreationFailed if failed to create partition
- * @throws InterruptedException
- */
- public StreamingConnection newConnection(final boolean createPartIfNotExists, final HiveConf conf,
- final UserGroupInformation authenticatedUser, final String agentInfo)
- throws ConnectionError, InvalidPartition,
- InvalidTable, PartitionCreationFailed, ImpersonationFailed , InterruptedException {
-
- if( authenticatedUser==null ) {
- return newConnectionImpl(authenticatedUser, createPartIfNotExists, conf, agentInfo);
- }
-
- try {
- return authenticatedUser.doAs (
- new PrivilegedExceptionAction<StreamingConnection>() {
- @Override
- public StreamingConnection run()
- throws ConnectionError, InvalidPartition, InvalidTable
- , PartitionCreationFailed {
- return newConnectionImpl(authenticatedUser, createPartIfNotExists, conf, agentInfo);
- }
- }
- );
- } catch (IOException e) {
- throw new ConnectionError("Failed to connect as : " + authenticatedUser.getShortUserName(), e);
- }
- }
-
- private StreamingConnection newConnectionImpl(UserGroupInformation ugi,
- boolean createPartIfNotExists, HiveConf conf, String agentInfo)
- throws ConnectionError, InvalidPartition, InvalidTable
- , PartitionCreationFailed {
- return new ConnectionImpl(this, ugi, conf, createPartIfNotExists, agentInfo);
- }
-
- private static UserGroupInformation getUserGroupInfo(String user)
- throws ImpersonationFailed {
- try {
- return UserGroupInformation.createProxyUser(
- user, UserGroupInformation.getLoginUser());
- } catch (IOException e) {
- LOG.error("Unable to get UserGroupInfo for user : " + user, e);
- throw new ImpersonationFailed(user,e);
- }
- }
-
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- HiveEndPoint endPoint = (HiveEndPoint) o;
-
- if (database != null
- ? !database.equals(endPoint.database)
- : endPoint.database != null ) {
- return false;
- }
- if (metaStoreUri != null
- ? !metaStoreUri.equals(endPoint.metaStoreUri)
- : endPoint.metaStoreUri != null ) {
- return false;
- }
- if (!partitionVals.equals(endPoint.partitionVals)) {
- return false;
- }
- if (table != null ? !table.equals(endPoint.table) : endPoint.table != null) {
- return false;
- }
-
- return true;
- }
-
- @Override
- public int hashCode() {
- int result = metaStoreUri != null ? metaStoreUri.hashCode() : 0;
- result = 31 * result + (database != null ? database.hashCode() : 0);
- result = 31 * result + (table != null ? table.hashCode() : 0);
- result = 31 * result + partitionVals.hashCode();
- return result;
- }
-
- @Override
- public String toString() {
- return "{" +
- "metaStoreUri='" + metaStoreUri + '\'' +
- ", database='" + database + '\'' +
- ", table='" + table + '\'' +
- ", partitionVals=" + partitionVals + " }";
- }
-
-
- private static class ConnectionImpl implements StreamingConnection {
- private final IMetaStoreClient msClient;
- private final IMetaStoreClient heartbeaterMSClient;
- private final HiveEndPoint endPt;
- private final UserGroupInformation ugi;
- private final String username;
- private final boolean secureMode;
- private final String agentInfo;
-
- /**
- * @param endPoint end point to connect to
- * @param ugi on behalf of whom streaming is done. cannot be null
- * @param conf HiveConf object
- * @param createPart create the partition if it does not exist
- * @throws ConnectionError if there is trouble connecting
- * @throws InvalidPartition if specified partition does not exist (and createPart=false)
- * @throws InvalidTable if specified table does not exist
- * @throws PartitionCreationFailed if createPart=true and not able to create partition
- */
- private ConnectionImpl(HiveEndPoint endPoint, UserGroupInformation ugi,
- HiveConf conf, boolean createPart, String agentInfo)
- throws ConnectionError, InvalidPartition, InvalidTable
- , PartitionCreationFailed {
- this.endPt = endPoint;
- this.ugi = ugi;
- this.agentInfo = agentInfo;
- this.username = ugi==null ? System.getProperty("user.name") : ugi.getShortUserName();
- if (conf==null) {
- conf = HiveEndPoint.createHiveConf(this.getClass(), endPoint.metaStoreUri);
- }
- else {
- overrideConfSettings(conf);
- }
- this.secureMode = ugi==null ? false : ugi.hasKerberosCredentials();
- this.msClient = getMetaStoreClient(endPoint, conf, secureMode);
- // We use a separate metastore client for heartbeat calls to ensure heartbeat RPC calls are
- // isolated from the other transaction related RPC calls.
- this.heartbeaterMSClient = getMetaStoreClient(endPoint, conf, secureMode);
- checkEndPoint(endPoint, msClient);
- if (createPart && !endPoint.partitionVals.isEmpty()) {
- createPartitionIfNotExists(endPoint, msClient, conf);
- }
- }
-
- /**
- * Checks the validity of endpoint
- * @param endPoint the HiveEndPoint to be checked
- * @param msClient the metastore client
- * @throws InvalidTable
- */
- private void checkEndPoint(HiveEndPoint endPoint, IMetaStoreClient msClient)
- throws InvalidTable, ConnectionError {
- Table t;
- try {
- t = msClient.getTable(endPoint.database, endPoint.table);
- } catch (Exception e) {
- LOG.warn("Unable to check the endPoint: " + endPoint, e);
- throw new InvalidTable(endPoint.database, endPoint.table, e);
- }
- // 1 - check that the table is Acid
- if (!AcidUtils.isFullAcidTable(t)) {
- LOG.error("HiveEndPoint " + endPoint + " must use an acid table");
- throw new InvalidTable(endPoint.database, endPoint.table, "is not an Acid table");
- }
-
- // 2 - check if partitionvals are legitimate
- if (t.getPartitionKeys() != null && !t.getPartitionKeys().isEmpty()
- && endPoint.partitionVals.isEmpty()) {
- // Invalid if table is partitioned, but endPoint's partitionVals is empty
- String errMsg = "HiveEndPoint " + endPoint + " doesn't specify any partitions for " +
- "partitioned table";
- LOG.error(errMsg);
- throw new ConnectionError(errMsg);
- }
- if ((t.getPartitionKeys() == null || t.getPartitionKeys().isEmpty())
- && !endPoint.partitionVals.isEmpty()) {
- // Invalid if table is not partitioned, but endPoint's partitionVals is not empty
- String errMsg = "HiveEndPoint" + endPoint + " specifies partitions for unpartitioned table";
- LOG.error(errMsg);
- throw new ConnectionError(errMsg);
- }
- }
-
- /**
- * Close connection
- */
- @Override
- public void close() {
- if (ugi==null) {
- msClient.close();
- heartbeaterMSClient.close();
- return;
- }
- try {
- ugi.doAs (
- new PrivilegedExceptionAction<Void>() {
- @Override
- public Void run() throws Exception {
- msClient.close();
- heartbeaterMSClient.close();
- return null;
- }
- } );
- try {
- FileSystem.closeAllForUGI(ugi);
- } catch (IOException exception) {
- LOG.error("Could not clean up file-system handles for UGI: " + ugi, exception);
- }
- } catch (IOException e) {
- LOG.error("Error closing connection to " + endPt, e);
- } catch (InterruptedException e) {
- LOG.error("Interrupted when closing connection to " + endPt, e);
- }
- }
-
- @Override
- public UserGroupInformation getUserGroupInformation() {
- return ugi;
- }
-
- /**
- * Acquires a new batch of transactions from Hive.
- *
- * @param numTransactions is a hint from client indicating how many transactions client needs.
- * @param recordWriter Used to write record. The same writer instance can
- * be shared with another TransactionBatch (to the same endpoint)
- * only after the first TransactionBatch has been closed.
- * Writer will be closed when the TransactionBatch is closed.
- * @return
- * @throws StreamingIOFailure if failed to create new RecordUpdater for batch
- * @throws TransactionBatchUnAvailable if failed to acquire a new Transaction batch
- * @throws ImpersonationFailed failed to run command as proxyUser
- * @throws InterruptedException
- */
- @Override
- public TransactionBatch fetchTransactionBatch(final int numTransactions,
- final RecordWriter recordWriter)
- throws StreamingException, TransactionBatchUnAvailable, ImpersonationFailed
- , InterruptedException {
- if (ugi==null) {
- return fetchTransactionBatchImpl(numTransactions, recordWriter);
- }
- try {
- return ugi.doAs (
- new PrivilegedExceptionAction<TransactionBatch>() {
- @Override
- public TransactionBatch run() throws StreamingException, InterruptedException {
- return fetchTransactionBatchImpl(numTransactions, recordWriter);
- }
- }
- );
- } catch (IOException e) {
- throw new ImpersonationFailed("Failed to fetch Txn Batch as user '" + ugi.getShortUserName()
- + "' when acquiring Transaction Batch on endPoint " + endPt, e);
- }
- }
-
- private TransactionBatch fetchTransactionBatchImpl(int numTransactions,
- RecordWriter recordWriter)
- throws StreamingException, TransactionBatchUnAvailable, InterruptedException {
- return new TransactionBatchImpl(username, ugi, endPt, numTransactions, msClient,
- heartbeaterMSClient, recordWriter, agentInfo);
- }
-
-
- private static void createPartitionIfNotExists(HiveEndPoint ep,
- IMetaStoreClient msClient, HiveConf conf)
- throws InvalidTable, PartitionCreationFailed {
- if (ep.partitionVals.isEmpty()) {
- return;
- }
- SessionState localSession = null;
- if(SessionState.get() == null) {
- localSession = SessionState.start(new CliSessionState(conf));
- }
- IDriver driver = DriverFactory.newDriver(conf);
-
- try {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Attempting to create partition (if not existent) " + ep);
- }
-
- List<FieldSchema> partKeys = msClient.getTable(ep.database, ep.table)
- .getPartitionKeys();
- runDDL(driver, "use " + ep.database);
- String query = "alter table " + ep.table + " add if not exists partition "
- + partSpecStr(partKeys, ep.partitionVals);
- runDDL(driver, query);
- } catch (MetaException e) {
- LOG.error("Failed to create partition : " + ep, e);
- throw new PartitionCreationFailed(ep, e);
- } catch (NoSuchObjectException e) {
- LOG.error("Failed to create partition : " + ep, e);
- throw new InvalidTable(ep.database, ep.table);
- } catch (TException e) {
- LOG.error("Failed to create partition : " + ep, e);
- throw new PartitionCreationFailed(ep, e);
- } catch (QueryFailedException e) {
- LOG.error("Failed to create partition : " + ep, e);
- throw new PartitionCreationFailed(ep, e);
- } finally {
- driver.close();
- try {
- if(localSession != null) {
- localSession.close();
- }
- } catch (IOException e) {
- LOG.warn("Error closing SessionState used to run Hive DDL.");
- }
- }
- }
-
- private static boolean runDDL(IDriver driver, String sql) throws QueryFailedException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Running Hive Query: " + sql);
- }
- driver.run(sql);
- return true;
- }
-
- private static String partSpecStr(List<FieldSchema> partKeys, ArrayList<String> partVals) {
- if (partKeys.size()!=partVals.size()) {
- throw new IllegalArgumentException("Partition values:" + partVals +
- ", does not match the partition Keys in table :" + partKeys );
- }
- StringBuilder buff = new StringBuilder(partKeys.size()*20);
- buff.append(" ( ");
- int i=0;
- for (FieldSchema schema : partKeys) {
- buff.append(schema.getName());
- buff.append("='");
- buff.append(partVals.get(i));
- buff.append("'");
- if (i!=partKeys.size()-1) {
- buff.append(",");
- }
- ++i;
- }
- buff.append(" )");
- return buff.toString();
- }
-
- private static IMetaStoreClient getMetaStoreClient(HiveEndPoint endPoint, HiveConf conf, boolean secureMode)
- throws ConnectionError {
-
- if (endPoint.metaStoreUri!= null) {
- conf.setVar(HiveConf.ConfVars.METASTOREURIS, endPoint.metaStoreUri);
- }
- if(secureMode) {
- conf.setBoolVar(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL,true);
- }
- try {
- return HCatUtil.getHiveMetastoreClient(conf);
- } catch (MetaException e) {
- throw new ConnectionError("Error connecting to Hive Metastore URI: "
- + endPoint.metaStoreUri + ". " + e.getMessage(), e);
- } catch (IOException e) {
- throw new ConnectionError("Error connecting to Hive Metastore URI: "
- + endPoint.metaStoreUri + ". " + e.getMessage(), e);
- }
- }
- } // class ConnectionImpl
-
- private static class TransactionBatchImpl implements TransactionBatch {
- private final String username;
- private final UserGroupInformation ugi;
- private final HiveEndPoint endPt;
- private final IMetaStoreClient msClient;
- private final IMetaStoreClient heartbeaterMSClient;
- private final RecordWriter recordWriter;
- private final List<TxnToWriteId> txnToWriteIds;
-
- //volatile because heartbeat() may be in a "different" thread; updates of this are "piggybacking"
- private volatile int currentTxnIndex = -1;
- private final String partNameForLock;
- //volatile because heartbeat() may be in a "different" thread
- private volatile TxnState state;
- private LockRequest lockRequest = null;
- /**
- * once any operation on this batch encounters a system exception
- * (e.g. IOException on write) it's safest to assume that we can't write to the
- * file backing this batch any more. This guards important public methods
- */
- private volatile boolean isClosed = false;
- private final String agentInfo;
- /**
- * Tracks the state of each transaction
- */
- private final TxnState[] txnStatus;
- /**
- * ID of the last txn used by {@link #beginNextTransactionImpl()}
- */
- private long lastTxnUsed;
-
- /**
- * Represents a batch of transactions acquired from MetaStore
- *
- * @throws StreamingException if failed to create new RecordUpdater for batch
- * @throws TransactionBatchUnAvailable if failed to acquire a new Transaction batch
- */
- private TransactionBatchImpl(final String user, UserGroupInformation ugi, HiveEndPoint endPt,
- final int numTxns, final IMetaStoreClient msClient,
- final IMetaStoreClient heartbeaterMSClient, RecordWriter recordWriter, String agentInfo)
- throws StreamingException, TransactionBatchUnAvailable, InterruptedException {
- boolean success = false;
- try {
- if ( endPt.partitionVals!=null && !endPt.partitionVals.isEmpty() ) {
- Table tableObj = msClient.getTable(endPt.database, endPt.table);
- List<FieldSchema> partKeys = tableObj.getPartitionKeys();
- partNameForLock = Warehouse.makePartName(partKeys, endPt.partitionVals);
- } else {
- partNameForLock = null;
- }
- this.username = user;
- this.ugi = ugi;
- this.endPt = endPt;
- this.msClient = msClient;
- this.heartbeaterMSClient = heartbeaterMSClient;
- this.recordWriter = recordWriter;
- this.agentInfo = agentInfo;
-
- List<Long> txnIds = openTxnImpl(msClient, user, numTxns, ugi);
- txnToWriteIds = allocateWriteIdsImpl(msClient, txnIds, ugi);
- assert(txnToWriteIds.size() == numTxns);
-
- txnStatus = new TxnState[numTxns];
- for(int i = 0; i < txnStatus.length; i++) {
- assert(txnToWriteIds.get(i).getTxnId() == txnIds.get(i));
- txnStatus[i] = TxnState.OPEN;//Open matches Metastore state
- }
- this.state = TxnState.INACTIVE;
-
- // The Write Ids returned for the transaction batch is also sequential
- recordWriter.newBatch(txnToWriteIds.get(0).getWriteId(), txnToWriteIds.get(numTxns-1).getWriteId());
- success = true;
- } catch (TException e) {
- throw new TransactionBatchUnAvailable(endPt, e);
- } catch (IOException e) {
- throw new TransactionBatchUnAvailable(endPt, e);
- }
- finally {
- //clean up if above throws
- markDead(success);
- }
- }
-
- private List<Long> openTxnImpl(final IMetaStoreClient msClient, final String user, final int numTxns, UserGroupInformation ugi)
- throws IOException, TException, InterruptedException {
- if(ugi==null) {
- return msClient.openTxns(user, numTxns).getTxn_ids();
- }
- return (List<Long>) ugi.doAs(new PrivilegedExceptionAction<Object>() {
- @Override
- public Object run() throws Exception {
- return msClient.openTxns(user, numTxns).getTxn_ids();
- }
- });
- }
-
- private List<TxnToWriteId> allocateWriteIdsImpl(final IMetaStoreClient msClient,
- final List<Long> txnIds, UserGroupInformation ugi)
- throws IOException, TException, InterruptedException {
- if(ugi==null) {
- return msClient.allocateTableWriteIdsBatch(txnIds, endPt.database, endPt.table);
- }
- return (List<TxnToWriteId>) ugi.doAs(new PrivilegedExceptionAction<Object>() {
- @Override
- public Object run() throws Exception {
- return msClient.allocateTableWriteIdsBatch(txnIds, endPt.database, endPt.table);
- }
- });
- }
-
- @Override
- public String toString() {
- if (txnToWriteIds==null || txnToWriteIds.isEmpty()) {
- return "{}";
- }
- StringBuilder sb = new StringBuilder(" TxnStatus[");
- for(TxnState state : txnStatus) {
- //'state' should not be null - future proofing
- sb.append(state == null ? "N" : state);
- }
- sb.append("] LastUsed ").append(JavaUtils.txnIdToString(lastTxnUsed));
- return "TxnId/WriteIds=[" + txnToWriteIds.get(0).getTxnId()
- + "/" + txnToWriteIds.get(0).getWriteId()
- + "..."
- + txnToWriteIds.get(txnToWriteIds.size()-1).getTxnId()
- + "/" + txnToWriteIds.get(txnToWriteIds.size()-1).getWriteId()
- + "] on endPoint = " + endPt + "; " + sb;
- }
-
- /**
- * Activate the next available transaction in the current transaction batch
- * @throws TransactionError failed to switch to next transaction
- */
- @Override
- public void beginNextTransaction() throws TransactionError, ImpersonationFailed,
- InterruptedException {
- checkIsClosed();
- if (ugi==null) {
- beginNextTransactionImpl();
- return;
- }
- try {
- ugi.doAs (
- new PrivilegedExceptionAction<Void>() {
- @Override
- public Void run() throws TransactionError {
- beginNextTransactionImpl();
- return null;
- }
- }
- );
- } catch (IOException e) {
- throw new ImpersonationFailed("Failed switching to next Txn as user '" + username +
- "' in Txn batch :" + this, e);
- }
- }
-
- private void beginNextTransactionImpl() throws TransactionError {
- state = TxnState.INACTIVE;//clear state from previous txn
-
- if ((currentTxnIndex + 1) >= txnToWriteIds.size()) {
- throw new InvalidTrasactionState("No more transactions available in" +
- " current batch for end point : " + endPt);
- }
- ++currentTxnIndex;
- state = TxnState.OPEN;
- lastTxnUsed = getCurrentTxnId();
- lockRequest = createLockRequest(endPt, partNameForLock, username, getCurrentTxnId(), agentInfo);
- try {
- LockResponse res = msClient.lock(lockRequest);
- if (res.getState() != LockState.ACQUIRED) {
- throw new TransactionError("Unable to acquire lock on " + endPt);
- }
- } catch (TException e) {
- throw new TransactionError("Unable to acquire lock on " + endPt, e);
- }
- }
-
- /**
- * Get Id of currently open transaction.
- * @return -1 if there is no open TX
- */
- @Override
- public Long getCurrentTxnId() {
- if (currentTxnIndex >= 0) {
- return txnToWriteIds.get(currentTxnIndex).getTxnId();
- }
- return -1L;
- }
-
- /**
- * Get Id of currently open transaction.
- * @return -1 if there is no open TX
- */
- @Override
- public Long getCurrentWriteId() {
- if (currentTxnIndex >= 0) {
- return txnToWriteIds.get(currentTxnIndex).getWriteId();
- }
- return -1L;
- }
-
- /**
- * get state of current transaction
- * @return
- */
- @Override
- public TxnState getCurrentTransactionState() {
- return state;
- }
-
- /**
- * Remaining transactions are the ones that are not committed or aborted or active.
- * Active transaction is not considered part of remaining txns.
- * @return number of transactions remaining this batch.
- */
- @Override
- public int remainingTransactions() {
- if (currentTxnIndex>=0) {
- return txnToWriteIds.size() - currentTxnIndex -1;
- }
- return txnToWriteIds.size();
- }
-
-
- /**
- * Write record using RecordWriter
- * @param record the data to be written
- * @throws StreamingIOFailure I/O failure
- * @throws SerializationError serialization error
- * @throws ImpersonationFailed error writing on behalf of proxyUser
- * @throws InterruptedException
- */
- @Override
- public void write(final byte[] record)
- throws StreamingException, InterruptedException {
- write(Collections.singletonList(record));
- }
- private void checkIsClosed() throws IllegalStateException {
- if(isClosed) {
- throw new IllegalStateException("TransactionBatch " + toString() + " has been closed()");
- }
- }
- /**
- * A transaction batch opens a single HDFS file and writes multiple transactions to it. If there is any issue
- * with the write, we can't continue to write to the same file any more as it may be corrupted now (at the tail).
- * This ensures that a client can't ignore these failures and continue to write.
- */
- private void markDead(boolean success) {
- if(success) {
- return;
- }
- isClosed = true;//also ensures that heartbeat() is no-op since client is likely doing it async
- try {
- abort(true);//abort all remaining txns
- }
- catch(Exception ex) {
- LOG.error("Fatal error on " + toString() + "; cause " + ex.getMessage(), ex);
- }
- try {
- closeImpl();
- }
- catch (Exception ex) {
- LOG.error("Fatal error on " + toString() + "; cause " + ex.getMessage(), ex);
- }
- }
-
-
- /**
- * Write records using RecordWriter
- * @param records collection of rows to be written
- * @throws StreamingException serialization error
- * @throws ImpersonationFailed error writing on behalf of proxyUser
- * @throws InterruptedException
- */
- @Override
- public void write(final Collection<byte[]> records)
- throws StreamingException, InterruptedException,
- ImpersonationFailed {
- checkIsClosed();
- boolean success = false;
- try {
- if (ugi == null) {
- writeImpl(records);
- } else {
- ugi.doAs(
- new PrivilegedExceptionAction<Void>() {
- @Override
- public Void run() throws StreamingException {
- writeImpl(records);
- return null;
- }
- }
- );
- }
- success = true;
- } catch(SerializationError ex) {
- //this exception indicates that a {@code record} could not be parsed and the
- //caller can decide whether to drop it or send it to dead letter queue.
- //rolling back the txn and retrying won't help since the tuple will be exactly the same
- //when it's replayed.
- success = true;
- throw ex;
- } catch(IOException e){
- throw new ImpersonationFailed("Failed writing as user '" + username +
- "' to endPoint :" + endPt + ". Transaction Id: "
- + getCurrentTxnId(), e);
- }
- finally {
- markDead(success);
- }
- }
-
- private void writeImpl(Collection<byte[]> records)
- throws StreamingException {
- for (byte[] record : records) {
- recordWriter.write(getCurrentWriteId(), record);
- }
- }
-
-
- /**
- * Commit the currently open transaction
- * @throws TransactionError
- * @throws StreamingIOFailure if flushing records failed
- * @throws ImpersonationFailed if
- * @throws InterruptedException
- */
- @Override
- public void commit() throws TransactionError, StreamingException,
- ImpersonationFailed, InterruptedException {
- checkIsClosed();
- boolean success = false;
- try {
- if (ugi == null) {
- commitImpl();
- }
- else {
- ugi.doAs(
- new PrivilegedExceptionAction<Void>() {
- @Override
- public Void run() throws StreamingException {
- commitImpl();
- return null;
- }
- }
- );
- }
- success = true;
- } catch (IOException e) {
- throw new ImpersonationFailed("Failed committing Txn ID " + getCurrentTxnId() + " as user '"
- + username + "'on endPoint :" + endPt + ". Transaction Id: ", e);
- }
- finally {
- markDead(success);
- }
- }
-
- private void commitImpl() throws TransactionError, StreamingException {
- try {
- recordWriter.flush();
- msClient.commitTxn(txnToWriteIds.get(currentTxnIndex).getTxnId());
- state = TxnState.COMMITTED;
- txnStatus[currentTxnIndex] = TxnState.COMMITTED;
- } catch (NoSuchTxnException e) {
- throw new TransactionError("Invalid transaction id : "
- + getCurrentTxnId(), e);
- } catch (TxnAbortedException e) {
- throw new TransactionError("Aborted transaction cannot be committed"
- , e);
- } catch (TException e) {
- throw new TransactionError("Unable to commit transaction"
- + getCurrentTxnId(), e);
- }
- }
-
- /**
- * Abort the currently open transaction
- * @throws TransactionError
- */
- @Override
- public void abort() throws TransactionError, StreamingException
- , ImpersonationFailed, InterruptedException {
- if(isClosed) {
- /**
- * isClosed is only set internally by this class. {@link #markDead(boolean)} will abort all
- * remaining txns, so make this a no-op to make sure that a well-behaved client that calls abort()
- * after an error doesn't get misleading errors
- */
- return;
- }
- abort(false);
- }
- private void abort(final boolean abortAllRemaining) throws TransactionError, StreamingException
- , ImpersonationFailed, InterruptedException {
- if (ugi==null) {
- abortImpl(abortAllRemaining);
- return;
- }
- try {
- ugi.doAs (
- new PrivilegedExceptionAction<Void>() {
- @Override
- public Void run() throws StreamingException {
- abortImpl(abortAllRemaining);
- return null;
- }
- }
- );
- } catch (IOException e) {
- throw new ImpersonationFailed("Failed aborting Txn " + getCurrentTxnId() + " as user '"
- + username + "' on endPoint :" + endPt, e);
- }
- }
-
- private void abortImpl(boolean abortAllRemaining) throws TransactionError, StreamingException {
- try {
- if(abortAllRemaining) {
- //when last txn finished (abort/commit) the currentTxnIndex is pointing at that txn
- //so we need to start from the next one, if any. Also if the batch was acquired but
- //beginNextTransaction() was never called, we want to start with the first txn
- int minOpenTxnIndex = Math.max(currentTxnIndex +
- (state == TxnState.ABORTED || state == TxnState.COMMITTED ? 1 : 0), 0);
- for(currentTxnIndex = minOpenTxnIndex;
- currentTxnIndex < txnToWriteIds.size(); currentTxnIndex++) {
- msClient.rollbackTxn(txnToWriteIds.get(currentTxnIndex).getTxnId());
- txnStatus[currentTxnIndex] = TxnState.ABORTED;
- }
- currentTxnIndex--;//since the loop left it == txnToWriteIds.size()
- }
- else {
- if (getCurrentTxnId() > 0) {
- msClient.rollbackTxn(getCurrentTxnId());
- txnStatus[currentTxnIndex] = TxnState.ABORTED;
- }
- }
- state = TxnState.ABORTED;
- recordWriter.clear();
- } catch (NoSuchTxnException e) {
- throw new TransactionError("Unable to abort invalid transaction id : "
- + getCurrentTxnId(), e);
- } catch (TException e) {
- throw new TransactionError("Unable to abort transaction id : "
- + getCurrentTxnId(), e);
- }
- }
-
- @Override
- public void heartbeat() throws StreamingException, HeartBeatFailure {
- if(isClosed) {
- return;
- }
- if(state != TxnState.OPEN && currentTxnIndex >= txnToWriteIds.size() - 1) {
- //here means last txn in the batch is resolved but the close() hasn't been called yet so
- //there is nothing to heartbeat
- return;
- }
- //if here after commit()/abort() but before next beginNextTransaction(), currentTxnIndex still
- //points at the last txn which we don't want to heartbeat
- Long first = txnToWriteIds.get(state == TxnState.OPEN ? currentTxnIndex : currentTxnIndex + 1).getTxnId();
- Long last = txnToWriteIds.get(txnToWriteIds.size()-1).getTxnId();
- try {
- HeartbeatTxnRangeResponse resp = heartbeaterMSClient.heartbeatTxnRange(first, last);
- if (!resp.getAborted().isEmpty() || !resp.getNosuch().isEmpty()) {
- throw new HeartBeatFailure(resp.getAborted(), resp.getNosuch());
- }
- } catch (TException e) {
- throw new StreamingException("Failure to heartbeat on ids (" + first + "src/gen/thrift"
- + last + ") on end point : " + endPt );
- }
- }
-
- @Override
- public boolean isClosed() {
- return isClosed;
- }
- /**
- * Close the TransactionBatch. This will abort any still open txns in this batch.
- * @throws StreamingIOFailure I/O failure when closing transaction batch
- */
- @Override
- public void close() throws StreamingException, ImpersonationFailed, InterruptedException {
- if(isClosed) {
- return;
- }
- isClosed = true;
- abortImpl(true);//abort proactively so that we don't wait for timeout
- closeImpl();//perhaps we should add a version of RecordWriter.closeBatch(boolean abort) which
- //will call RecordUpdater.close(boolean abort)
- }
- private void closeImpl() throws StreamingException, InterruptedException{
- state = TxnState.INACTIVE;
- if(ugi == null) {
- recordWriter.closeBatch();
- return;
- }
- try {
- ugi.doAs (
- new PrivilegedExceptionAction<Void>() {
- @Override
- public Void run() throws StreamingException {
- recordWriter.closeBatch();
- return null;
- }
- }
- );
- try {
- FileSystem.closeAllForUGI(ugi);
- } catch (IOException exception) {
- LOG.error("Could not clean up file-system handles for UGI: " + ugi, exception);
- }
- } catch (IOException e) {
- throw new ImpersonationFailed("Failed closing Txn Batch as user '" + username +
- "' on endPoint :" + endPt, e);
- }
- }
-
- private static LockRequest createLockRequest(final HiveEndPoint hiveEndPoint,
- String partNameForLock, String user, long txnId, String agentInfo) {
- LockRequestBuilder rqstBuilder = agentInfo == null ?
- new LockRequestBuilder() : new LockRequestBuilder(agentInfo);
- rqstBuilder.setUser(user);
- rqstBuilder.setTransactionId(txnId);
-
- LockComponentBuilder lockCompBuilder = new LockComponentBuilder()
- .setDbName(hiveEndPoint.database)
- .setTableName(hiveEndPoint.table)
- .setShared()
- .setOperationType(DataOperationType.INSERT);
- if (partNameForLock!=null && !partNameForLock.isEmpty() ) {
- lockCompBuilder.setPartitionName(partNameForLock);
- }
- rqstBuilder.addLockComponent(lockCompBuilder.build());
-
- return rqstBuilder.build();
- }
- } // class TransactionBatchImpl
-
- static HiveConf createHiveConf(Class<?> clazz, String metaStoreUri) {
- HiveConf conf = new HiveConf(clazz);
- if (metaStoreUri!= null) {
- setHiveConf(conf, HiveConf.ConfVars.METASTOREURIS, metaStoreUri);
- }
- HiveEndPoint.overrideConfSettings(conf);
- return conf;
- }
-
- private static void overrideConfSettings(HiveConf conf) {
- setHiveConf(conf, HiveConf.ConfVars.HIVE_TXN_MANAGER,
- "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager");
- setHiveConf(conf, HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, true);
- setHiveConf(conf, HiveConf.ConfVars.METASTORE_EXECUTE_SET_UGI, true);
- // Avoids creating Tez Client sessions internally as it takes much longer currently
- setHiveConf(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "mr");
- }
-
- private static void setHiveConf(HiveConf conf, HiveConf.ConfVars var, String value) {
- if( LOG.isDebugEnabled() ) {
- LOG.debug("Overriding HiveConf setting : " + var + " = " + value);
- }
- conf.setVar(var, value);
- }
-
- private static void setHiveConf(HiveConf conf, HiveConf.ConfVars var, boolean value) {
- if( LOG.isDebugEnabled() ) {
- LOG.debug("Overriding HiveConf setting : " + var + " = " + value);
- }
- conf.setBoolVar(var, value);
- }
-
-} // class HiveEndPoint
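
For orientation, a minimal sketch of how the removed HiveEndPoint API above was typically driven, using only methods shown in the deleted file. The metastore URI, database/table names, partition values and agent string are illustrative assumptions, and the RecordWriter construction is elided because no writer implementation appears in this part of the diff.

import java.util.Arrays;

import org.apache.hive.streaming.HiveEndPoint;
import org.apache.hive.streaming.RecordWriter;
import org.apache.hive.streaming.StreamingConnection;
import org.apache.hive.streaming.TransactionBatch;

public class OldStreamingApiSketch {
  // 'writer' is any RecordWriter implementation from the removed API; construction is elided here.
  static void ingest(RecordWriter writer) throws Exception {
    // Endpoint for an illustrative default.alerts table, partition (continent='Asia', country='India').
    HiveEndPoint endPoint = new HiveEndPoint("thrift://localhost:9083",
        "default", "alerts", Arrays.asList("Asia", "India"));
    // true => auto-create the partition if missing; the agent string identifies this client in logs.
    StreamingConnection connection = endPoint.newConnection(true, "example-agent");
    try {
      TransactionBatch batch = connection.fetchTransactionBatch(10, writer);
      try {
        batch.beginNextTransaction();
        batch.write("1,foo".getBytes());
        batch.write("2,bar".getBytes());
        batch.commit();
      } finally {
        batch.close(); // aborts any still-open transactions in this batch
      }
    } finally {
      connection.close();
    }
  }
}
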
http://git-wip-us.apache.org/repos/asf/hive/blob/bf8d305a/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java b/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java
new file mode 100644
index 0000000..205ed6c
--- /dev/null
+++ b/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java
@@ -0,0 +1,1039 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.LockComponentBuilder;
+import org.apache.hadoop.hive.metastore.LockRequestBuilder;
+import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
+import org.apache.hadoop.hive.metastore.api.DataOperationType;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeResponse;
+import org.apache.hadoop.hive.metastore.api.LockRequest;
+import org.apache.hadoop.hive.metastore.api.LockResponse;
+import org.apache.hadoop.hive.metastore.api.LockState;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
+import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
+import org.apache.hadoop.hive.ql.lockmgr.LockException;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.plan.AddPartitionDesc;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * Streaming connection implementation for Hive. To create a streaming connection, use the builder API
+ * to create the record writer first, followed by the connection itself. Once the connection is created, clients can
+ * begin a transaction, keep writing using the connection, commit the transaction and close the connection when done.
+ * To bind to the correct metastore, the HiveConf object has to be created from hive-site.xml or HIVE_CONF_DIR.
+ * If the hive conf is created manually, the metastore URI has to be set correctly. If no hive conf object is
+ * specified, "thrift://localhost:9083" will be used as the default.
+ * <br/><br/>
+ * NOTE: The streaming connection APIs and record writer APIs are not thread-safe. Streaming connection creation,
+ * begin/commit/abort transaction and write/close calls have to be made from the same thread. If close() or
+ * abortTransaction() has to be triggered from a separate thread, it has to be coordinated via external variables or
+ * a synchronization mechanism.
+ * <br/><br/>
+ * Example usage:
+ * <pre>{@code
+ * // create delimited record writer whose schema exactly matches table schema
+ * StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
+ * .withFieldDelimiter(',')
+ * .build();
+ *
+ * // create and open streaming connection (default.src table has to exist already)
+ * StreamingConnection connection = HiveStreamingConnection.newBuilder()
+ * .withDatabase("default")
+ * .withTable("src")
+ * .withAgentInfo("nifi-agent")
+ * .withRecordWriter(writer)
+ * .withHiveConf(hiveConf)
+ * .connect();
+ *
+ * // begin a transaction, write records and commit 1st transaction
+ * connection.beginTransaction();
+ * connection.write("key1,val1".getBytes());
+ * connection.write("key2,val2".getBytes());
+ * connection.commitTransaction();
+ *
+ * // begin another transaction, write more records and commit 2nd transaction
+ * connection.beginTransaction();
+ * connection.write("key3,val3".getBytes());
+ * connection.write("key4,val4".getBytes());
+ * connection.commitTransaction();
+ *
+ * // close the streaming connection
+ * connection.close();
+ * }
+ * </pre>
+ */
+public class HiveStreamingConnection implements StreamingConnection {
+ private static final Logger LOG = LoggerFactory.getLogger(HiveStreamingConnection.class.getName());
+
+ private static final String DEFAULT_METASTORE_URI = "thrift://localhost:9083";
+ private static final int DEFAULT_TRANSACTION_BATCH_SIZE = 1;
+ private static final int DEFAULT_HEARTBEAT_INTERVAL = 60 * 1000;
+ private static final boolean DEFAULT_STREAMING_OPTIMIZATIONS_ENABLED = true;
+
+ public enum TxnState {
+ INACTIVE("I"), OPEN("O"), COMMITTED("C"), ABORTED("A");
+
+ private final String code;
+
+ TxnState(String code) {
+ this.code = code;
+ }
+
+ public String toString() {
+ return code;
+ }
+ }
+
+ // fields populated from builder
+ private String database;
+ private String table;
+ private List<String> staticPartitionValues;
+ private String agentInfo;
+ private int transactionBatchSize;
+ private RecordWriter recordWriter;
+ private TransactionBatch currentTransactionBatch;
+ private HiveConf conf;
+ private boolean streamingOptimizations;
+ private AtomicBoolean isConnectionClosed = new AtomicBoolean(false);
+
+ // internal fields
+ private boolean isPartitionedTable;
+ private IMetaStoreClient msClient;
+ private IMetaStoreClient heartbeatMSClient;
+ private final String username;
+ private final boolean secureMode;
+ private Table tableObject = null;
+ private String metastoreUri;
+
+ private HiveStreamingConnection(Builder builder) throws StreamingException {
+ this.database = builder.database.toLowerCase();
+ this.table = builder.table.toLowerCase();
+ this.staticPartitionValues = builder.staticPartitionValues;
+ this.conf = builder.hiveConf;
+ this.agentInfo = builder.agentInfo;
+ this.streamingOptimizations = builder.streamingOptimizations;
+ UserGroupInformation loggedInUser = null;
+ try {
+ loggedInUser = UserGroupInformation.getLoginUser();
+ } catch (IOException e) {
+ LOG.warn("Unable to get logged in user via UGI. err: {}", e.getMessage());
+ }
+ if (loggedInUser == null) {
+ this.username = System.getProperty("user.name");
+ this.secureMode = false;
+ } else {
+ this.username = loggedInUser.getShortUserName();
+ this.secureMode = loggedInUser.hasKerberosCredentials();
+ }
+ this.transactionBatchSize = builder.transactionBatchSize;
+ this.recordWriter = builder.recordWriter;
+ if (agentInfo == null) {
+ try {
+ agentInfo = username + ":" + InetAddress.getLocalHost().getHostName() + ":" + Thread.currentThread().getName();
+ } catch (UnknownHostException e) {
+ // ignore and use UUID instead
+ this.agentInfo = UUID.randomUUID().toString();
+ }
+ }
+ if (conf == null) {
+ conf = createHiveConf(this.getClass(), DEFAULT_METASTORE_URI);
+ }
+ overrideConfSettings(conf);
+ this.metastoreUri = conf.get(MetastoreConf.ConfVars.THRIFT_URIS.getHiveName());
+ this.msClient = getMetaStoreClient(conf, metastoreUri, secureMode);
+ // We use a separate metastore client for heartbeat calls to ensure heartbeat RPC calls are
+ // isolated from the other transaction related RPC calls.
+ this.heartbeatMSClient = getMetaStoreClient(conf, metastoreUri, secureMode);
+ validateTable();
+ LOG.info("STREAMING CONNECTION INFO: {}", toConnectionInfoString());
+ }
+
+ public static Builder newBuilder() {
+ return new Builder();
+ }
+
+ public static class Builder {
+ private String database;
+ private String table;
+ private List<String> staticPartitionValues;
+ private String agentInfo;
+ private HiveConf hiveConf;
+ private int transactionBatchSize = DEFAULT_TRANSACTION_BATCH_SIZE;
+ private boolean streamingOptimizations = DEFAULT_STREAMING_OPTIMIZATIONS_ENABLED;
+ private RecordWriter recordWriter;
+
+ /**
+ * Specify database to use for streaming connection.
+ *
+ * @param database - db name
+ * @return - builder
+ */
+ public Builder withDatabase(final String database) {
+ this.database = database;
+ return this;
+ }
+
+ /**
+ * Specify table to use for streaming connection.
+ *
+ * @param table - table name
+ * @return - builder
+ */
+ public Builder withTable(final String table) {
+ this.table = table;
+ return this;
+ }
+
+ /**
+ * Specify the static partition values to use for the streaming connection.
+ *
+ * @param staticPartitionValues - static partition values
+ * @return - builder
+ */
+ public Builder withStaticPartitionValues(final List<String> staticPartitionValues) {
+ this.staticPartitionValues = staticPartitionValues == null ? null : new ArrayList<>(staticPartitionValues);
+ return this;
+ }
+
+ /**
+ * Specify agent info to use for streaming connection.
+ *
+ * @param agentInfo - agent info
+ * @return - builder
+ */
+ public Builder withAgentInfo(final String agentInfo) {
+ this.agentInfo = agentInfo;
+ return this;
+ }
+
+ /**
+ * Specify hive configuration object to use for streaming connection.
+ * Generate this object by pointing to an already existing hive-site.xml or HIVE_CONF_DIR.
+ * Make sure the metastore URI is set correctly, else thrift://localhost:9083 will be
+ * used as the default.
+ *
+ * @param hiveConf - hive conf object
+ * @return - builder
+ */
+ public Builder withHiveConf(final HiveConf hiveConf) {
+ this.hiveConf = hiveConf;
+ return this;
+ }
+
+ /**
+ * Transaction batch size to use (default value is 1). This is an expert-level configuration.
+ * For every transaction batch a delta directory will be created, which impacts
+ * when compaction will trigger.
+ * NOTE: This is an evolving API and is subject to change/might not be honored in future releases.
+ *
+ * @param transactionBatchSize - transaction batch size
+ * @return - builder
+ */
+ @InterfaceStability.Evolving
+ public Builder withTransactionBatchSize(final int transactionBatchSize) {
+ this.transactionBatchSize = transactionBatchSize;
+ return this;
+ }
+
+ /**
+ * Whether to enable streaming optimizations. This is an expert-level configuration.
+ * Disabling streaming optimizations will have a significant impact on performance and memory consumption.
+ *
+ * @param enable - flag to enable or not
+ * @return - builder
+ */
+ public Builder withStreamingOptimizations(final boolean enable) {
+ this.streamingOptimizations = enable;
+ return this;
+ }
+
+ /**
+ * Record writer to use for writing records to destination table.
+ *
+ * @param recordWriter - record writer
+ * @return - builder
+ */
+ public Builder withRecordWriter(final RecordWriter recordWriter) {
+ this.recordWriter = recordWriter;
+ return this;
+ }
+
+ /**
+ * Returns a streaming connection to Hive.
+ *
+ * @return - hive streaming connection
+ */
+ public HiveStreamingConnection connect() throws StreamingException {
+ if (database == null) {
+ throw new StreamingException("Database cannot be null for streaming connection");
+ }
+ if (table == null) {
+ throw new StreamingException("Table cannot be null for streaming connection");
+ }
+ if (recordWriter == null) {
+ throw new StreamingException("Record writer cannot be null for streaming connection");
+ }
+ return new HiveStreamingConnection(this);
+ }
+ }
+
+ private void setPartitionedTable(boolean isPartitionedTable) {
+ this.isPartitionedTable = isPartitionedTable;
+ }
+
+ @Override
+ public String toString() {
+ return "{ metaStoreUri: " + metastoreUri + ", database: " + database + ", table: " + table + " }";
+ }
+
+ private String toConnectionInfoString() {
+ return "{ metastore-uri: " + metastoreUri + ", " +
+ "database: " + database + ", " +
+ "table: " + table + ", " +
+ "partitioned-table: " + isPartitionedTable() + ", " +
+ "dynamic-partitioning: " + isDynamicPartitioning() + ", " +
+ "username: " + username + ", " +
+ "secure-mode: " + secureMode + ", " +
+ "record-writer: " + recordWriter.getClass().getSimpleName() + ", " +
+ "agent-info: " + agentInfo + " }";
+ }
+
+ @VisibleForTesting
+ String toTransactionString() {
+ return currentTransactionBatch == null ? "" : currentTransactionBatch.toString();
+ }
+
+ @Override
+ public PartitionInfo createPartitionIfNotExists(final List<String> partitionValues) throws StreamingException {
+ String partLocation = null;
+ String partName = null;
+ boolean exists = false;
+ try {
+ Map<String, String> partSpec = Warehouse.makeSpecFromValues(tableObject.getPartitionKeys(), partitionValues);
+ AddPartitionDesc addPartitionDesc = new AddPartitionDesc(database, table, true);
+ partName = Warehouse.makePartName(tableObject.getPartitionKeys(), partitionValues);
+ partLocation = new Path(tableObject.getDataLocation(), Warehouse.makePartPath(partSpec)).toString();
+ addPartitionDesc.addPartition(partSpec, partLocation);
+ Partition partition = Hive.convertAddSpecToMetaPartition(tableObject, addPartitionDesc.getPartition(0), conf);
+ msClient.add_partition(partition);
+ } catch (AlreadyExistsException e) {
+ exists = true;
+ } catch (HiveException | TException e) {
+ throw new StreamingException("Unable to creation partition for values: " + partitionValues + " connection: " +
+ toConnectionInfoString());
+ }
+ return new PartitionInfo(partName, partLocation, exists);
+ }
+
+ private void validateTable() throws InvalidTable, ConnectionError {
+ try {
+ tableObject = new Table(msClient.getTable(database, table));
+ } catch (Exception e) {
+ LOG.warn("Unable to validate the table for connection: " + toConnectionInfoString(), e);
+ throw new InvalidTable(database, table, e);
+ }
+ // 1 - check that the table is Acid
+ if (!AcidUtils.isFullAcidTable(tableObject)) {
+ LOG.error("HiveEndPoint " + this + " must use an acid table");
+ throw new InvalidTable(database, table, "is not an Acid table");
+ }
+
+ if (tableObject.getPartitionKeys() != null && !tableObject.getPartitionKeys().isEmpty()) {
+ setPartitionedTable(true);
+ } else {
+ setPartitionedTable(false);
+ }
+
+ // partition values are specified on non-partitioned table
+ if (!isPartitionedTable() && (staticPartitionValues != null && !staticPartitionValues.isEmpty())) {
+ // Invalid if the table is not partitioned but static partition values were specified
+ String errMsg = this.toString() + " specifies partitions for un-partitioned table";
+ LOG.error(errMsg);
+ throw new ConnectionError(errMsg);
+ }
+ }
+
+ private static class HeartbeatRunnable implements Runnable {
+ private final IMetaStoreClient heartbeatMSClient;
+ private final AtomicLong minTxnId;
+ private final long maxTxnId;
+ private final ReentrantLock transactionLock;
+ private final AtomicBoolean isTxnClosed;
+
+ HeartbeatRunnable(final IMetaStoreClient heartbeatMSClient, final AtomicLong minTxnId, final long maxTxnId,
+ final ReentrantLock transactionLock, final AtomicBoolean isTxnClosed) {
+ this.heartbeatMSClient = heartbeatMSClient;
+ this.minTxnId = minTxnId;
+ this.maxTxnId = maxTxnId;
+ this.transactionLock = transactionLock;
+ this.isTxnClosed = isTxnClosed;
+ }
+
+ @Override
+ public void run() {
+ transactionLock.lock();
+ try {
+ if (minTxnId.get() > 0) {
+ HeartbeatTxnRangeResponse resp = heartbeatMSClient.heartbeatTxnRange(minTxnId.get(), maxTxnId);
+ if (!resp.getAborted().isEmpty() || !resp.getNosuch().isEmpty()) {
+ LOG.error("Heartbeat failure: {}", resp.toString());
+ isTxnClosed.set(true);
+ } else {
+ LOG.info("Heartbeat sent for range: [{}-{}]", minTxnId.get(), maxTxnId);
+ }
+ }
+ } catch (TException e) {
+ LOG.warn("Failure to heartbeat for transaction range: [" + minTxnId.get() + "-" + maxTxnId + "]", e);
+ } finally {
+ transactionLock.unlock();
+ }
+ }
+ }
+
+ private void beginNextTransaction() throws StreamingException {
+ if (currentTransactionBatch == null) {
+ currentTransactionBatch = createNewTransactionBatch();
+ LOG.info("Opened new transaction batch {}", currentTransactionBatch);
+ }
+
+ if (currentTransactionBatch.isClosed()) {
+ throw new IllegalStateException("Cannot begin next transaction on a closed streaming connection");
+ }
+
+ if (currentTransactionBatch.remainingTransactions() == 0) {
+ LOG.info("Transaction batch {} is done. Rolling over to next transaction batch.",
+ currentTransactionBatch);
+ currentTransactionBatch.close();
+ currentTransactionBatch = createNewTransactionBatch();
+ LOG.info("Rolled over to new transaction batch {}", currentTransactionBatch);
+ }
+ currentTransactionBatch.beginNextTransaction();
+ }
+
+ private TransactionBatch createNewTransactionBatch() throws StreamingException {
+ return new TransactionBatch(this);
+ }
+
+ private void checkClosedState() throws StreamingException {
+ if (isConnectionClosed.get()) {
+ throw new StreamingException("Streaming connection is closed already.");
+ }
+ }
+
+ private void checkState() throws StreamingException {
+ checkClosedState();
+ if (currentTransactionBatch == null) {
+ throw new StreamingException("Transaction batch is null. Missing beginTransaction?");
+ }
+ if (currentTransactionBatch.state != TxnState.OPEN) {
+ throw new StreamingException("Transaction state is not OPEN. Missing beginTransaction?");
+ }
+ }
+
+ @Override
+ public void write(final byte[] record) throws StreamingException {
+ checkState();
+ currentTransactionBatch.write(record);
+ }
+
+ @Override
+ public void beginTransaction() throws StreamingException {
+ checkClosedState();
+ beginNextTransaction();
+ }
+
+ @Override
+ public void commitTransaction() throws StreamingException {
+ checkState();
+ currentTransactionBatch.commit();
+ }
+
+ @Override
+ public void abortTransaction() throws StreamingException {
+ checkState();
+ currentTransactionBatch.abort();
+ }
+
+ @Override
+ public void close() {
+ if (isConnectionClosed.get()) {
+ return;
+ }
+ isConnectionClosed.set(true);
+ try {
+ if (currentTransactionBatch != null) {
+ currentTransactionBatch.close();
+ }
+ } catch (StreamingException e) {
+ LOG.error("Unable to close current transaction batch: " + currentTransactionBatch, e);
+ } finally {
+ msClient.close();
+ heartbeatMSClient.close();
+ }
+ }
+
+ private static IMetaStoreClient getMetaStoreClient(HiveConf conf, String metastoreUri, boolean secureMode)
+ throws ConnectionError {
+ if (metastoreUri != null) {
+ conf.set(MetastoreConf.ConfVars.THRIFT_URIS.getHiveName(), metastoreUri);
+ }
+ if (secureMode) {
+ conf.setBoolean(MetastoreConf.ConfVars.USE_THRIFT_SASL.getHiveName(), true);
+ }
+ try {
+ return HiveMetaStoreUtils.getHiveMetastoreClient(conf);
+ } catch (MetaException | IOException e) {
+ throw new ConnectionError("Error connecting to Hive Metastore URI: "
+ + metastoreUri + ". " + e.getMessage(), e);
+ }
+ }
+
+ @VisibleForTesting
+ TxnState getCurrentTransactionState() {
+ return currentTransactionBatch.getCurrentTransactionState();
+ }
+
+ @VisibleForTesting
+ int remainingTransactions() {
+ return currentTransactionBatch.remainingTransactions();
+ }
+
+ @VisibleForTesting
+ Long getCurrentTxnId() {
+ return currentTransactionBatch.getCurrentTxnId();
+ }
+
+ private static class TransactionBatch {
+ private String username;
+ private HiveStreamingConnection conn;
+ private IMetaStoreClient msClient;
+ private IMetaStoreClient heartbeatMSClient;
+ private ScheduledExecutorService scheduledExecutorService;
+ private RecordWriter recordWriter;
+ private String partNameForLock = null;
+ private List<TxnToWriteId> txnToWriteIds;
+ private int currentTxnIndex = -1;
+ private TxnState state;
+ private LockRequest lockRequest = null;
+ // heartbeats can only be sent for open transactions.
+ // there is a race between committing/aborting a transaction and the heartbeat.
+ // Example: if a heartbeat is sent for a committed txn, an exception will be thrown.
+ // Similarly, if we don't send a heartbeat, the metastore server might abort a txn
+ // for a missed heartbeat right before the commit call.
+ // This lock is used to mutually exclude commit/abort and heartbeat calls.
+ private final ReentrantLock transactionLock = new ReentrantLock();
+ // min txn id is incremented linearly within a transaction batch.
+ // keeping minTxnId atomic as it is shared with heartbeat thread
+ private final AtomicLong minTxnId;
+ // max txn id does not change for a transaction batch
+ private final long maxTxnId;
+
+ /**
+ * once any operation on this batch encounters a system exception
+ * (e.g. IOException on write) it's safest to assume that we can't write to the
+ * file backing this batch any more. This guards important public methods
+ */
+ private final AtomicBoolean isTxnClosed = new AtomicBoolean(false);
+ private String agentInfo;
+ private int numTxns;
+ /**
+ * Tracks the state of each transaction
+ */
+ private TxnState[] txnStatus;
+ /**
+ * ID of the last txn used by {@link #beginNextTransactionImpl()}
+ */
+ private long lastTxnUsed;
+
+ /**
+ * Represents a batch of transactions acquired from MetaStore
+ *
+ * @param conn - hive streaming connection
+ * @throws StreamingException if the transaction batch cannot be opened or the record writer cannot be initialized
+ */
+ private TransactionBatch(HiveStreamingConnection conn) throws StreamingException {
+ boolean success = false;
+ try {
+ if (conn.isPartitionedTable() && !conn.isDynamicPartitioning()) {
+ List<FieldSchema> partKeys = conn.tableObject.getPartitionKeys();
+ partNameForLock = Warehouse.makePartName(partKeys, conn.staticPartitionValues);
+ }
+ this.conn = conn;
+ this.username = conn.username;
+ this.msClient = conn.msClient;
+ this.heartbeatMSClient = conn.heartbeatMSClient;
+ this.recordWriter = conn.recordWriter;
+ this.agentInfo = conn.agentInfo;
+ this.numTxns = conn.transactionBatchSize;
+
+ setupHeartBeatThread();
+
+ List<Long> txnIds = openTxnImpl(msClient, username, numTxns);
+ txnToWriteIds = allocateWriteIdsImpl(msClient, txnIds);
+ assert (txnToWriteIds.size() == numTxns);
+
+ txnStatus = new TxnState[numTxns];
+ for (int i = 0; i < txnStatus.length; i++) {
+ assert (txnToWriteIds.get(i).getTxnId() == txnIds.get(i));
+ txnStatus[i] = TxnState.OPEN; //Open matches Metastore state
+ }
+ this.state = TxnState.INACTIVE;
+
+ // initialize record writer with connection and write id info
+ recordWriter.init(conn, txnToWriteIds.get(0).getWriteId(), txnToWriteIds.get(numTxns - 1).getWriteId());
+ this.minTxnId = new AtomicLong(txnIds.get(0));
+ this.maxTxnId = txnIds.get(txnIds.size() - 1);
+ success = true;
+ } catch (TException e) {
+ throw new StreamingException(conn.toString(), e);
+ } finally {
+ //clean up if above throws
+ markDead(success);
+ }
+ }
+
+ private void setupHeartBeatThread() {
+ // start heartbeat thread
+ ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true)
+ .setNameFormat("HiveStreamingConnection-Heartbeat-Thread")
+ .build();
+ this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(threadFactory);
+ long heartBeatInterval;
+ long initialDelay;
+ try {
+ // if HIVE_TXN_TIMEOUT is defined, heartbeat interval will be HIVE_TXN_TIMEOUT/2
+ heartBeatInterval = DbTxnManager.getHeartbeatInterval(conn.conf);
+ } catch (LockException e) {
+ heartBeatInterval = DEFAULT_HEARTBEAT_INTERVAL;
+ }
+ // to introduce some randomness and to avoid hammering the metastore at the same time (same logic as DbTxnManager)
+ initialDelay = (long) (heartBeatInterval * 0.75 * Math.random());
+ LOG.info("Starting heartbeat thread with interval: {} ms initialDelay: {} ms for agentInfo: {}",
+ heartBeatInterval, initialDelay, conn.agentInfo);
+ Runnable runnable = new HeartbeatRunnable(heartbeatMSClient, minTxnId, maxTxnId, transactionLock, isTxnClosed);
+ this.scheduledExecutorService.scheduleWithFixedDelay(runnable, initialDelay, heartBeatInterval, TimeUnit
+ .MILLISECONDS);
+ }
+
+ private List<Long> openTxnImpl(final IMetaStoreClient msClient, final String user, final int numTxns)
+ throws TException {
+ return msClient.openTxns(user, numTxns).getTxn_ids();
+ }
+
+ private List<TxnToWriteId> allocateWriteIdsImpl(final IMetaStoreClient msClient,
+ final List<Long> txnIds) throws TException {
+ return msClient.allocateTableWriteIdsBatch(txnIds, conn.database, conn.table);
+ }
+
+ @Override
+ public String toString() {
+ if (txnToWriteIds == null || txnToWriteIds.isEmpty()) {
+ return "{}";
+ }
+ StringBuilder sb = new StringBuilder(" TxnStatus[");
+ for (TxnState state : txnStatus) {
+ //'state' should not be null - future proofing
+ sb.append(state == null ? "N" : state);
+ }
+ sb.append("] LastUsed ").append(JavaUtils.txnIdToString(lastTxnUsed));
+ return "TxnId/WriteIds=[" + txnToWriteIds.get(0).getTxnId()
+ + "/" + txnToWriteIds.get(0).getWriteId()
+ + "..."
+ + txnToWriteIds.get(txnToWriteIds.size() - 1).getTxnId()
+ + "/" + txnToWriteIds.get(txnToWriteIds.size() - 1).getWriteId()
+ + "] on connection = " + conn + "; " + sb;
+ }
+
+ private void beginNextTransaction() throws StreamingException {
+ checkIsClosed();
+ beginNextTransactionImpl();
+ }
+
+ private void beginNextTransactionImpl() throws TransactionError {
+ state = TxnState.INACTIVE;//clear state from previous txn
+ if ((currentTxnIndex + 1) >= txnToWriteIds.size()) {
+ throw new InvalidTransactionState("No more transactions available in" +
+ " current batch for connection: " + conn + " user: " + username);
+ }
+ currentTxnIndex++;
+ state = TxnState.OPEN;
+ lastTxnUsed = getCurrentTxnId();
+ lockRequest = createLockRequest(conn, partNameForLock, username, getCurrentTxnId(), agentInfo);
+ try {
+ LockResponse res = msClient.lock(lockRequest);
+ if (res.getState() != LockState.ACQUIRED) {
+ throw new TransactionError("Unable to acquire lock on " + conn);
+ }
+ } catch (TException e) {
+ throw new TransactionError("Unable to acquire lock on " + conn, e);
+ }
+ }
+
+ long getCurrentTxnId() {
+ if (currentTxnIndex >= 0) {
+ return txnToWriteIds.get(currentTxnIndex).getTxnId();
+ }
+ return -1L;
+ }
+
+ long getCurrentWriteId() {
+ if (currentTxnIndex >= 0) {
+ return txnToWriteIds.get(currentTxnIndex).getWriteId();
+ }
+ return -1L;
+ }
+
+ TxnState getCurrentTransactionState() {
+ return state;
+ }
+
+ int remainingTransactions() {
+ if (currentTxnIndex >= 0) {
+ return txnToWriteIds.size() - currentTxnIndex - 1;
+ }
+ return txnToWriteIds.size();
+ }
+
+ public void write(final byte[] record) throws StreamingException {
+ checkIsClosed();
+ boolean success = false;
+ try {
+ recordWriter.write(getCurrentWriteId(), record);
+ success = true;
+ } catch (SerializationError ex) {
+ //this exception indicates that a {@code record} could not be parsed and the
+ //caller can decide whether to drop it or send it to dead letter queue.
+ //rolling back the txn and retrying won't help since the tuple will be exactly the same
+ //when it's replayed.
+ success = true;
+ throw ex;
+ } finally {
+ markDead(success);
+ }
+ }
+
+ private void checkIsClosed() throws StreamingException {
+ if (isTxnClosed.get()) {
+ throw new StreamingException("Transaction" + toString() + " is closed()");
+ }
+ }
+
+ /**
+ * A transaction batch opens a single HDFS file and writes multiple transactions to it. If there is any issue
+ * with the write, we can't continue to write to the same file any more as it may be corrupted now (at the tail).
+ * This ensures that a client can't ignore these failures and continue to write.
+ */
+ private void markDead(boolean success) {
+ if (success) {
+ return;
+ }
+ isTxnClosed.set(true); //also ensures that heartbeat() is no-op since client is likely doing it async
+ try {
+ abort(true);//abort all remaining txns
+ } catch (Exception ex) {
+ LOG.error("Fatal error on " + toString() + "; cause " + ex.getMessage(), ex);
+ }
+ try {
+ closeImpl();
+ } catch (Exception ex) {
+ LOG.error("Fatal error on " + toString() + "; cause " + ex.getMessage(), ex);
+ }
+ }
+
+ void commit() throws StreamingException {
+ checkIsClosed();
+ boolean success = false;
+ try {
+ commitImpl();
+ success = true;
+ } finally {
+ markDead(success);
+ }
+ }
+
+ private void commitImpl() throws StreamingException {
+ try {
+ recordWriter.flush();
+ TxnToWriteId txnToWriteId = txnToWriteIds.get(currentTxnIndex);
+ if (conn.isDynamicPartitioning()) {
+ List<String> partNames = new ArrayList<>(recordWriter.getPartitions());
+ msClient.addDynamicPartitions(txnToWriteId.getTxnId(), txnToWriteId.getWriteId(), conn.database, conn.table,
+ partNames, DataOperationType.INSERT);
+ }
+ transactionLock.lock();
+ try {
+ msClient.commitTxn(txnToWriteId.getTxnId());
+ // increment the min txn id so that heartbeat thread will heartbeat only from the next open transaction.
+ // the current transaction is going to be committed or will fail, so it no longer needs a heartbeat.
+ if (currentTxnIndex + 1 < txnToWriteIds.size()) {
+ minTxnId.set(txnToWriteIds.get(currentTxnIndex + 1).getTxnId());
+ } else {
+ // exhausted the batch, no longer have to heartbeat for current txn batch
+ minTxnId.set(-1);
+ }
+ } finally {
+ transactionLock.unlock();
+ }
+ state = TxnState.COMMITTED;
+ txnStatus[currentTxnIndex] = TxnState.COMMITTED;
+ } catch (NoSuchTxnException e) {
+ throw new TransactionError("Invalid transaction id : "
+ + getCurrentTxnId(), e);
+ } catch (TxnAbortedException e) {
+ throw new TransactionError("Aborted transaction cannot be committed"
+ , e);
+ } catch (TException e) {
+ throw new TransactionError("Unable to commitTransaction transaction"
+ + getCurrentTxnId(), e);
+ }
+ }
+
+ void abort() throws StreamingException {
+ if (isTxnClosed.get()) {
+ /*
+ * isTxnClosed is only set internally by this class. {@link #markDead(boolean)} will abort all
+ * remaining txns, so make this a no-op to make sure that a well-behaved client that calls abortTransaction()
+ * after an error doesn't get misleading exceptions
+ */
+ return;
+ }
+ abort(false);
+ }
+
+ private void abort(final boolean abortAllRemaining) throws StreamingException {
+ abortImpl(abortAllRemaining);
+ }
+
+ private void abortImpl(boolean abortAllRemaining) throws StreamingException {
+ transactionLock.lock();
+ try {
+ if (abortAllRemaining) {
+ // we are aborting all txns in the current batch, so no need to heartbeat
+ minTxnId.set(-1);
+ //when the last txn finished (abortTransaction/commitTransaction) currentTxnIndex is pointing at that txn,
+ //so we need to start from the next one, if any. Also, if the batch was created but
+ //beginNextTransaction() was never called, we want to start with the first txn
+ int minOpenTxnIndex = Math.max(currentTxnIndex +
+ (state == TxnState.ABORTED || state == TxnState.COMMITTED ? 1 : 0), 0);
+ for (currentTxnIndex = minOpenTxnIndex;
+ currentTxnIndex < txnToWriteIds.size(); currentTxnIndex++) {
+ msClient.rollbackTxn(txnToWriteIds.get(currentTxnIndex).getTxnId());
+ txnStatus[currentTxnIndex] = TxnState.ABORTED;
+ }
+ currentTxnIndex--;//since the loop left it == txnToWriteIds.size()
+ } else {
+ // we are aborting only the current transaction, so move the min range for heartbeat or disable heartbeat
+ // if the current txn is last in the batch.
+ if (currentTxnIndex + 1 < txnToWriteIds.size()) {
+ minTxnId.set(txnToWriteIds.get(currentTxnIndex + 1).getTxnId());
+ } else {
+ // exhausted the batch, no longer have to heartbeat
+ minTxnId.set(-1);
+ }
+ long currTxnId = getCurrentTxnId();
+ if (currTxnId > 0) {
+ msClient.rollbackTxn(currTxnId);
+ txnStatus[currentTxnIndex] = TxnState.ABORTED;
+ }
+ }
+ state = TxnState.ABORTED;
+ } catch (NoSuchTxnException e) {
+ throw new TransactionError("Unable to abort invalid transaction id : "
+ + getCurrentTxnId(), e);
+ } catch (TException e) {
+ throw new TransactionError("Unable to abort transaction id : "
+ + getCurrentTxnId(), e);
+ } finally {
+ transactionLock.unlock();
+ }
+ }
+
+ public boolean isClosed() {
+ return isTxnClosed.get();
+ }
+
+ /**
+ * Close the TransactionBatch. This will abort any still open txns in this batch.
+ *
+ * @throws StreamingException - failure when closing transaction batch
+ */
+ public void close() throws StreamingException {
+ if (isTxnClosed.get()) {
+ return;
+ }
+ isTxnClosed.set(true);
+ abortImpl(true);
+ closeImpl();
+ }
+
+ private void closeImpl() throws StreamingException {
+ state = TxnState.INACTIVE;
+ recordWriter.close();
+ if (scheduledExecutorService != null) {
+ scheduledExecutorService.shutdownNow();
+ }
+ }
+
+ static LockRequest createLockRequest(final HiveStreamingConnection connection,
+ String partNameForLock, String user, long txnId, String agentInfo) {
+ LockRequestBuilder requestBuilder = new LockRequestBuilder(agentInfo);
+ requestBuilder.setUser(user);
+ requestBuilder.setTransactionId(txnId);
+
+ LockComponentBuilder lockCompBuilder = new LockComponentBuilder()
+ .setDbName(connection.database)
+ .setTableName(connection.table)
+ .setShared()
+ .setOperationType(DataOperationType.INSERT);
+ if (connection.isDynamicPartitioning()) {
+ lockCompBuilder.setIsDynamicPartitionWrite(true);
+ }
+ if (partNameForLock != null && !partNameForLock.isEmpty()) {
+ lockCompBuilder.setPartitionName(partNameForLock);
+ }
+ requestBuilder.addLockComponent(lockCompBuilder.build());
+
+ return requestBuilder.build();
+ }
+ }
+
+ private HiveConf createHiveConf(Class<?> clazz, String metaStoreUri) {
+ HiveConf conf = new HiveConf(clazz);
+ if (metaStoreUri != null) {
+ conf.set(MetastoreConf.ConfVars.THRIFT_URIS.getHiveName(), metaStoreUri);
+ }
+ return conf;
+ }
+
+ private void overrideConfSettings(HiveConf conf) {
+ setHiveConf(conf, HiveConf.ConfVars.HIVE_TXN_MANAGER, DbTxnManager.class.getName());
+ setHiveConf(conf, HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, true);
+ setHiveConf(conf, MetastoreConf.ConfVars.EXECUTE_SET_UGI.getHiveName());
+ // Avoids creating Tez Client sessions internally as it takes much longer currently
+ setHiveConf(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "mr");
+ setHiveConf(conf, HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict");
+ if (streamingOptimizations) {
+ setHiveConf(conf, HiveConf.ConfVars.HIVE_ORC_DELTA_STREAMING_OPTIMIZATIONS_ENABLED, true);
+ }
+ // since the same thread creates the metastore clients for both the streaming connection and the heartbeat thread,
+ // we explicitly disable the metastore client cache
+ setHiveConf(conf, HiveConf.ConfVars.METASTORE_CLIENT_CACHE_ENABLED, false);
+ }
+
+ private static void setHiveConf(HiveConf conf, HiveConf.ConfVars var, String value) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Overriding HiveConf setting : " + var + " = " + value);
+ }
+ conf.setVar(var, value);
+ }
+
+ private static void setHiveConf(HiveConf conf, HiveConf.ConfVars var, boolean value) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Overriding HiveConf setting : " + var + " = " + value);
+ }
+ conf.setBoolVar(var, value);
+ }
+
+ private static void setHiveConf(HiveConf conf, String var) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Overriding HiveConf setting : " + var + " = " + true);
+ }
+ conf.setBoolean(var, true);
+ }
+
+ @Override
+ public HiveConf getHiveConf() {
+ return conf;
+ }
+
+ @Override
+ public String getMetastoreUri() {
+ return metastoreUri;
+ }
+
+ @Override
+ public String getDatabase() {
+ return database;
+ }
+
+ @Override
+ public String getTable() {
+ return table;
+ }
+
+ @Override
+ public List<String> getStaticPartitionValues() {
+ return staticPartitionValues;
+ }
+
+ @Override
+ public String getAgentInfo() {
+ return agentInfo;
+ }
+
+ @Override
+ public boolean isPartitionedTable() {
+ return isPartitionedTable;
+ }
+
+ @Override
+ public boolean isDynamicPartitioning() {
+ return isPartitionedTable() && (staticPartitionValues == null || staticPartitionValues.isEmpty());
+ }
+}
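
For orientation, here is a minimal usage sketch of the connection-level API added in this file (beginTransaction/write/commitTransaction/close). It is not part of the patch: how the HiveStreamingConnection instance is built is outside this hunk, so the connection passed in below is assumed to come from elsewhere (e.g. the new API's builder), and the byte[] records are assumed to be in whatever format the configured RecordWriter expects. Only methods declared in this file are used.

    import java.util.List;

    import org.apache.hive.streaming.HiveStreamingConnection;
    import org.apache.hive.streaming.StreamingException;

    // Sketch only; see assumptions above.
    public class StreamingIngestSketch {
      /** Writes one transaction's worth of records and then closes the connection. */
      public void ingest(HiveStreamingConnection connection, List<byte[]> records) throws StreamingException {
        try {
          connection.beginTransaction();   // open (or roll over to) the next transaction in the current batch
          for (byte[] record : records) {
            connection.write(record);      // written against the current transaction's write id
          }
          connection.commitTransaction();  // also registers dynamic partitions collected by the record writer
        } finally {
          connection.close();              // aborts any transactions still open in the batch and stops the heartbeat
        }
      }
    }

abortTransaction() can be called instead of commitTransaction() for an explicit rollback of the current transaction; after a write failure the batch marks itself dead, so close() is enough to clean up the remaining transactions.
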
http://git-wip-us.apache.org/repos/asf/hive/blob/bf8d305a/streaming/src/java/org/apache/hive/streaming/ImpersonationFailed.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/ImpersonationFailed.java b/streaming/src/java/org/apache/hive/streaming/ImpersonationFailed.java
deleted file mode 100644
index 23e17e7..0000000
--- a/streaming/src/java/org/apache/hive/streaming/ImpersonationFailed.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hive.streaming;
-
-public class ImpersonationFailed extends StreamingException {
- public ImpersonationFailed(String username, Exception e) {
- super("Failed to impersonate user " + username, e);
- }
-}