Posted to commits@hive.apache.org by pr...@apache.org on 2018/05/02 16:45:00 UTC

[4/7] hive git commit: HIVE-19211: New streaming ingest API and support for dynamic partitioning (Prasanth Jayachandran reviewed by Eugene Koifman)

http://git-wip-us.apache.org/repos/asf/hive/blob/bf8d305a/streaming/src/java/org/apache/hive/streaming/HiveEndPoint.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/HiveEndPoint.java b/streaming/src/java/org/apache/hive/streaming/HiveEndPoint.java
deleted file mode 100644
index b04e137..0000000
--- a/streaming/src/java/org/apache/hive/streaming/HiveEndPoint.java
+++ /dev/null
@@ -1,1117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hive.streaming;
-
-import java.io.IOException;
-import java.security.PrivilegedExceptionAction;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.hive.cli.CliSessionState;
-import org.apache.hadoop.hive.common.JavaUtils;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.IMetaStoreClient;
-import org.apache.hadoop.hive.metastore.LockComponentBuilder;
-import org.apache.hadoop.hive.metastore.LockRequestBuilder;
-import org.apache.hadoop.hive.metastore.Warehouse;
-import org.apache.hadoop.hive.metastore.api.DataOperationType;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeResponse;
-import org.apache.hadoop.hive.metastore.api.LockRequest;
-import org.apache.hadoop.hive.metastore.api.LockResponse;
-import org.apache.hadoop.hive.metastore.api.LockState;
-import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
-import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
-import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
-import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
-import org.apache.hadoop.hive.ql.DriverFactory;
-import org.apache.hadoop.hive.ql.IDriver;
-import org.apache.hadoop.hive.ql.io.AcidUtils;
-import org.apache.hadoop.hive.ql.session.SessionState;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hive.hcatalog.common.HCatUtil;
-import org.apache.thrift.TException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Information about the Hive end point (i.e. table or partition) to write to.
- * A lightweight object that does NOT internally hold on to resources such as
- * network connections. It can be stored in hashed containers such as sets and hash tables.
- */
-public class HiveEndPoint {
-  public final String metaStoreUri;
-  public final String database;
-  public final String table;
-  public final ArrayList<String> partitionVals;
-
-
-  static final private Logger LOG = LoggerFactory.getLogger(HiveEndPoint.class.getName());
-
-  /**
-   *
-   * @param metaStoreUri   URI of the metastore to connect to, e.g. thrift://localhost:9083
-   * @param database       Name of the Hive database
-   * @param table          Name of table to stream to
-   * @param partitionVals  Indicates the specific partition to stream to. Can be null or empty List
-   *                       if streaming to a table without partitions. The order of values in this
-   *                       list must correspond exactly to the order of partition columns specified
-   *                       during the table creation. E.g. For a table partitioned by
-   *                       (continent string, country string), partitionVals could be the list
-   *                       ("Asia", "India").
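-   *                       For example, a minimal construction sketch (the database and
-   *                       table names below are placeholders):
-   *                       <pre>{@code
-   *                       HiveEndPoint ep = new HiveEndPoint("thrift://localhost:9083",
-   *                           "mydb", "alerts", Arrays.asList("Asia", "India"));
-   *                       }</pre>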
-   */
-  public HiveEndPoint(String metaStoreUri
-          , String database, String table, List<String> partitionVals) {
-    this.metaStoreUri = metaStoreUri;
-    if (database == null) {
-      throw new IllegalArgumentException("Database cannot be null for HiveEndPoint");
-    }
-    this.database = database;
-    if (table == null) {
-      throw new IllegalArgumentException("Table cannot be null for HiveEndPoint");
-    }
-    this.table = table;
-    this.partitionVals = partitionVals==null ? new ArrayList<String>()
-                                             : new ArrayList<String>( partitionVals );
-  }
-
-
-  /**
-   * @deprecated As of release 1.3/2.1.  Replaced by {@link #newConnection(boolean, String)}
-   */
-  @Deprecated
-  public StreamingConnection newConnection(final boolean createPartIfNotExists)
-    throws ConnectionError, InvalidPartition, InvalidTable, PartitionCreationFailed
-    , ImpersonationFailed , InterruptedException {
-    return newConnection(createPartIfNotExists, null, null, null);
-  }
-  /**
-   * @deprecated As of release 1.3/2.1.  Replaced by {@link #newConnection(boolean, HiveConf, String)}
-   */
-  @Deprecated
-  public StreamingConnection newConnection(final boolean createPartIfNotExists, HiveConf conf)
-    throws ConnectionError, InvalidPartition, InvalidTable, PartitionCreationFailed
-    , ImpersonationFailed , InterruptedException {
-    return newConnection(createPartIfNotExists, conf, null, null);
-  }
-  /**
-   * @deprecated As of release 1.3/2.1.  Replaced by {@link #newConnection(boolean, HiveConf, UserGroupInformation, String)}
-   */
-  @Deprecated
-  public StreamingConnection newConnection(final boolean createPartIfNotExists, final HiveConf conf,
-                                           final UserGroupInformation authenticatedUser)
-    throws ConnectionError, InvalidPartition,
-    InvalidTable, PartitionCreationFailed, ImpersonationFailed , InterruptedException {
-    return newConnection(createPartIfNotExists, conf, authenticatedUser, null);
-  }
-  /**
-   * Acquire a new connection to MetaStore for streaming
-   * @param createPartIfNotExists If true, the partition specified in the endpoint
-   *                              will be auto created if it does not exist
-   * @param agentInfo should uniquely identify the process/entity that is using this batch.  This
-   *                  should be something that can be correlated with calling application log files
-   *                  and/or monitoring consoles.
-   * @return a streaming connection to the endpoint
-   * @throws ConnectionError if problem connecting
-   * @throws InvalidPartition  if specified partition is not valid (createPartIfNotExists = false)
-   * @throws ImpersonationFailed  if not able to impersonate 'proxyUser'
-   * @throws PartitionCreationFailed if failed to create partition
-   * @throws InterruptedException
-   */
-  public StreamingConnection newConnection(final boolean createPartIfNotExists, String agentInfo)
-    throws ConnectionError, InvalidPartition, InvalidTable, PartitionCreationFailed
-    , ImpersonationFailed , InterruptedException {
-    return newConnection(createPartIfNotExists, null, null, agentInfo);
-  }
-
-  /**
-   * Acquire a new connection to MetaStore for streaming
-   * @param createPartIfNotExists If true, the partition specified in the endpoint
-   *                              will be auto created if it does not exist
-   * @param conf HiveConf object, set it to null if not using advanced hive settings.
-   * @param agentInfo should uniquely identify the process/entity that is using this batch.  This
-   *                  should be something that can be correlated with calling application log files
-   *                  and/or monitoring consoles.
-   * @return a streaming connection to the endpoint
-   * @throws ConnectionError if problem connecting
-   * @throws InvalidPartition  if specified partition is not valid (createPartIfNotExists = false)
-   * @throws ImpersonationFailed  if not able to impersonate 'proxyUser'
-   * @throws PartitionCreationFailed if failed to create partition
-   * @throws InterruptedException
-   */
-  public StreamingConnection newConnection(final boolean createPartIfNotExists, HiveConf conf, String agentInfo)
-          throws ConnectionError, InvalidPartition, InvalidTable, PartitionCreationFailed
-          , ImpersonationFailed , InterruptedException {
-    return newConnection(createPartIfNotExists, conf, null, agentInfo);
-  }
-
-  /**
-   * Acquire a new connection to MetaStore for streaming. To connect using Kerberos, the
-   *   'authenticatedUser' argument should have been used to do a Kerberos login.  Additionally the
-   *   'hive.metastore.kerberos.principal' setting should be set correctly either in hive-site.xml or
-   *    in the 'conf' argument (if not null). If using hive-site.xml, it should be on the classpath.
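-   *
-   *    For example, a minimal sketch (the endpoint and conf objects are assumed to exist;
-   *    the principal and keytab path are placeholders):
-   *    <pre>{@code
-   *    UserGroupInformation ugi = UserGroupInformation
-   *        .loginUserFromKeytabAndReturnUGI("client@EXAMPLE.COM", "/path/to/client.keytab");
-   *    StreamingConnection conn = endPoint.newConnection(true, conf, ugi, "example-agent");
-   *    }</pre>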
-   *
-   * @param createPartIfNotExists If true, the partition specified in the endpoint
-   *                              will be auto created if it does not exist
-   * @param conf               HiveConf object to be used for the connection. Can be null.
-   * @param authenticatedUser  UserGroupInformation object obtained from successful authentication.
-   *                           Uses non-secure mode if this argument is null.
-   * @param agentInfo should uniquely identify the process/entity that is using this batch.  This
-   *                  should be something that can be correlated with calling application log files
-   *                  and/or monitoring consoles.
-   * @return a streaming connection to the endpoint
-   * @throws ConnectionError if there is a connection problem
-   * @throws InvalidPartition  if specified partition is not valid (createPartIfNotExists = false)
-   * @throws ImpersonationFailed  if not able to impersonate 'username'
-   * @throws PartitionCreationFailed if failed to create partition
-   * @throws InterruptedException
-   */
-  public StreamingConnection newConnection(final boolean createPartIfNotExists, final HiveConf conf,
-                                           final UserGroupInformation authenticatedUser, final String agentInfo)
-          throws ConnectionError, InvalidPartition,
-               InvalidTable, PartitionCreationFailed, ImpersonationFailed , InterruptedException {
-
-    if( authenticatedUser==null ) {
-      return newConnectionImpl(authenticatedUser, createPartIfNotExists, conf, agentInfo);
-    }
-
-    try {
-      return authenticatedUser.doAs (
-             new PrivilegedExceptionAction<StreamingConnection>() {
-                @Override
-                public StreamingConnection run()
-                        throws ConnectionError, InvalidPartition, InvalidTable
-                        , PartitionCreationFailed {
-                  return newConnectionImpl(authenticatedUser, createPartIfNotExists, conf, agentInfo);
-                }
-             }
-      );
-    } catch (IOException e) {
-      throw new ConnectionError("Failed to connect as : " + authenticatedUser.getShortUserName(), e);
-    }
-  }
-
-  private StreamingConnection newConnectionImpl(UserGroupInformation ugi,
-                                               boolean createPartIfNotExists, HiveConf conf, String agentInfo)
-          throws ConnectionError, InvalidPartition, InvalidTable
-          , PartitionCreationFailed {
-    return new ConnectionImpl(this, ugi, conf, createPartIfNotExists, agentInfo);
-  }
-
-  private static UserGroupInformation getUserGroupInfo(String user)
-          throws ImpersonationFailed {
-    try {
-      return UserGroupInformation.createProxyUser(
-              user, UserGroupInformation.getLoginUser());
-    } catch (IOException e) {
-      LOG.error("Unable to get UserGroupInfo for user : " + user, e);
-      throw new ImpersonationFailed(user,e);
-    }
-  }
-
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    }
-    if (o == null || getClass() != o.getClass()) {
-      return false;
-    }
-
-    HiveEndPoint endPoint = (HiveEndPoint) o;
-
-    if (database != null
-            ? !database.equals(endPoint.database)
-            : endPoint.database != null ) {
-      return false;
-    }
-    if (metaStoreUri != null
-            ? !metaStoreUri.equals(endPoint.metaStoreUri)
-            : endPoint.metaStoreUri != null ) {
-      return false;
-    }
-    if (!partitionVals.equals(endPoint.partitionVals)) {
-      return false;
-    }
-    if (table != null ? !table.equals(endPoint.table) : endPoint.table != null) {
-      return false;
-    }
-
-    return true;
-  }
-
-  @Override
-  public int hashCode() {
-    int result = metaStoreUri != null ? metaStoreUri.hashCode() : 0;
-    result = 31 * result + (database != null ? database.hashCode() : 0);
-    result = 31 * result + (table != null ? table.hashCode() : 0);
-    result = 31 * result + partitionVals.hashCode();
-    return result;
-  }
-
-  @Override
-  public String toString() {
-    return "{" +
-            "metaStoreUri='" + metaStoreUri + '\'' +
-            ", database='" + database + '\'' +
-            ", table='" + table + '\'' +
-            ", partitionVals=" + partitionVals + " }";
-  }
-
-
-  private static class ConnectionImpl implements StreamingConnection {
-    private final IMetaStoreClient msClient;
-    private final IMetaStoreClient heartbeaterMSClient;
-    private final HiveEndPoint endPt;
-    private final UserGroupInformation ugi;
-    private final String username;
-    private final boolean secureMode;
-    private final String agentInfo;
-
-    /**
-     * @param endPoint end point to connect to
-     * @param ugi user on whose behalf streaming is done; may be null for non-secure mode
-     * @param conf HiveConf object
-     * @param createPart create the partition if it does not exist
-     * @throws ConnectionError if there is trouble connecting
-     * @throws InvalidPartition if specified partition does not exist (and createPart=false)
-     * @throws InvalidTable if specified table does not exist
-     * @throws PartitionCreationFailed if createPart=true and not able to create partition
-     */
-    private ConnectionImpl(HiveEndPoint endPoint, UserGroupInformation ugi,
-                           HiveConf conf, boolean createPart, String agentInfo)
-            throws ConnectionError, InvalidPartition, InvalidTable
-                   , PartitionCreationFailed {
-      this.endPt = endPoint;
-      this.ugi = ugi;
-      this.agentInfo = agentInfo;
-      this.username = ugi==null ? System.getProperty("user.name") : ugi.getShortUserName();
-      if (conf==null) {
-        conf = HiveEndPoint.createHiveConf(this.getClass(), endPoint.metaStoreUri);
-      }
-      else {
-          overrideConfSettings(conf);
-      }
-      this.secureMode = ugi==null ? false : ugi.hasKerberosCredentials();
-      this.msClient = getMetaStoreClient(endPoint, conf, secureMode);
-      // We use a separate metastore client for heartbeat calls to ensure heartbeat RPC calls are
-      // isolated from the other transaction related RPC calls.
-      this.heartbeaterMSClient = getMetaStoreClient(endPoint, conf, secureMode);
-      checkEndPoint(endPoint, msClient);
-      if (createPart  &&  !endPoint.partitionVals.isEmpty()) {
-        createPartitionIfNotExists(endPoint, msClient, conf);
-      }
-    }
-
-    /**
-     * Checks the validity of endpoint
-     * @param endPoint the HiveEndPoint to be checked
-     * @param msClient the metastore client
-     * @throws InvalidTable
-     */
-    private void checkEndPoint(HiveEndPoint endPoint, IMetaStoreClient msClient)
-        throws InvalidTable, ConnectionError {
-      Table t;
-      try {
-        t = msClient.getTable(endPoint.database, endPoint.table);
-      } catch (Exception e) {
-        LOG.warn("Unable to check the endPoint: " + endPoint, e);
-        throw new InvalidTable(endPoint.database, endPoint.table, e);
-      }
-      // 1 - check that the table is Acid
-      if (!AcidUtils.isFullAcidTable(t)) {
-        LOG.error("HiveEndPoint " + endPoint + " must use an acid table");
-        throw new InvalidTable(endPoint.database, endPoint.table, "is not an Acid table");
-      }
-
-      // 2 - check if partitionvals are legitimate
-      if (t.getPartitionKeys() != null && !t.getPartitionKeys().isEmpty()
-          && endPoint.partitionVals.isEmpty()) {
-        // Invalid if table is partitioned, but endPoint's partitionVals is empty
-        String errMsg = "HiveEndPoint " + endPoint + " doesn't specify any partitions for " +
-            "partitioned table";
-        LOG.error(errMsg);
-        throw new ConnectionError(errMsg);
-      }
-      if ((t.getPartitionKeys() == null || t.getPartitionKeys().isEmpty())
-          && !endPoint.partitionVals.isEmpty()) {
-        // Invalid if table is not partitioned, but endPoint's partitionVals is not empty
-        String errMsg = "HiveEndPoint" + endPoint + " specifies partitions for unpartitioned table";
-        LOG.error(errMsg);
-        throw new ConnectionError(errMsg);
-      }
-    }
-
-    /**
-     * Close connection
-     */
-    @Override
-    public void close() {
-      if (ugi==null) {
-        msClient.close();
-        heartbeaterMSClient.close();
-        return;
-      }
-      try {
-        ugi.doAs (
-            new PrivilegedExceptionAction<Void>() {
-              @Override
-              public Void run() throws Exception {
-                msClient.close();
-                heartbeaterMSClient.close();
-                return null;
-              }
-            } );
-        try {
-          FileSystem.closeAllForUGI(ugi);
-        } catch (IOException exception) {
-          LOG.error("Could not clean up file-system handles for UGI: " + ugi, exception);
-        }
-      } catch (IOException e) {
-        LOG.error("Error closing connection to " + endPt, e);
-      } catch (InterruptedException e) {
-        LOG.error("Interrupted when closing connection to " + endPt, e);
-      }
-    }
-
-    @Override
-    public UserGroupInformation getUserGroupInformation() {
-      return ugi;
-    }
-
-    /**
-     * Acquires a new batch of transactions from Hive.
-     *
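-     * <p>A minimal usage sketch (the connection and writer are assumed to exist;
-     * error handling is omitted):</p>
-     * <pre>{@code
-     * TransactionBatch batch = connection.fetchTransactionBatch(10, writer);
-     * while (batch.remainingTransactions() > 0) {
-     *   batch.beginNextTransaction();
-     *   batch.write("a,b,c".getBytes());
-     *   batch.commit();
-     * }
-     * batch.close();
-     * }</pre>
-     *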
-     * @param numTransactions is a hint from the client indicating how many transactions it needs.
-     * @param recordWriter  Used to write records. The same writer instance can
-     *                      be shared with another TransactionBatch (to the same endpoint)
-     *                      only after the first TransactionBatch has been closed.
-     *                      Writer will be closed when the TransactionBatch is closed.
-     * @return a new {@link TransactionBatch}
-     * @throws StreamingIOFailure if failed to create new RecordUpdater for batch
-     * @throws TransactionBatchUnAvailable if failed to acquire a new Transaction batch
-     * @throws ImpersonationFailed failed to run command as proxyUser
-     * @throws InterruptedException
-     */
-    @Override
-    public TransactionBatch fetchTransactionBatch(final int numTransactions,
-                                                      final RecordWriter recordWriter)
-            throws StreamingException, TransactionBatchUnAvailable, ImpersonationFailed
-                  , InterruptedException {
-      if (ugi==null) {
-        return fetchTransactionBatchImpl(numTransactions, recordWriter);
-      }
-      try {
-        return ugi.doAs (
-                new PrivilegedExceptionAction<TransactionBatch>() {
-                  @Override
-                  public TransactionBatch run() throws StreamingException, InterruptedException {
-                    return fetchTransactionBatchImpl(numTransactions, recordWriter);
-                  }
-                }
-        );
-      } catch (IOException e) {
-        throw new ImpersonationFailed("Failed to fetch Txn Batch as user '" + ugi.getShortUserName()
-                + "' when acquiring Transaction Batch on endPoint " + endPt, e);
-      }
-    }
-
-    private TransactionBatch fetchTransactionBatchImpl(int numTransactions,
-                                                  RecordWriter recordWriter)
-            throws StreamingException, TransactionBatchUnAvailable, InterruptedException {
-      return new TransactionBatchImpl(username, ugi, endPt, numTransactions, msClient,
-          heartbeaterMSClient, recordWriter, agentInfo);
-    }
-
-
-    private static void createPartitionIfNotExists(HiveEndPoint ep,
-                                                   IMetaStoreClient msClient, HiveConf conf)
-            throws InvalidTable, PartitionCreationFailed {
-      if (ep.partitionVals.isEmpty()) {
-        return;
-      }
-      SessionState localSession = null;
-      if(SessionState.get() == null) {
-        localSession = SessionState.start(new CliSessionState(conf));
-      }
-      IDriver driver = DriverFactory.newDriver(conf);
-
-      try {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Attempting to create partition (if not existent) " + ep);
-        }
-
-        List<FieldSchema> partKeys = msClient.getTable(ep.database, ep.table)
-                .getPartitionKeys();
-        runDDL(driver, "use " + ep.database);
-        String query = "alter table " + ep.table + " add if not exists partition "
-                + partSpecStr(partKeys, ep.partitionVals);
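-        // e.g. "alter table alerts add if not exists partition ( continent='Asia',country='India' )"
-        // (the table name and partition values above are illustrative)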
-        runDDL(driver, query);
-      } catch (MetaException e) {
-        LOG.error("Failed to create partition : " + ep, e);
-        throw new PartitionCreationFailed(ep, e);
-      } catch (NoSuchObjectException e) {
-        LOG.error("Failed to create partition : " + ep, e);
-        throw new InvalidTable(ep.database, ep.table);
-      } catch (TException e) {
-        LOG.error("Failed to create partition : " + ep, e);
-        throw new PartitionCreationFailed(ep, e);
-      } catch (QueryFailedException e) {
-        LOG.error("Failed to create partition : " + ep, e);
-        throw new PartitionCreationFailed(ep, e);
-      } finally {
-        driver.close();
-        try {
-          if(localSession != null) {
-            localSession.close();
-          }
-        } catch (IOException e) {
-          LOG.warn("Error closing SessionState used to run Hive DDL.");
-        }
-      }
-    }
-
-    private static boolean runDDL(IDriver driver, String sql) throws QueryFailedException {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Running Hive Query: " + sql);
-      }
-      driver.run(sql);
-      return true;
-    }
-
-    private static String partSpecStr(List<FieldSchema> partKeys, ArrayList<String> partVals) {
-      if (partKeys.size()!=partVals.size()) {
-        throw new IllegalArgumentException("Partition values:" + partVals +
-                ", does not match the partition Keys in table :" + partKeys );
-      }
-      StringBuilder buff = new StringBuilder(partKeys.size()*20);
-      buff.append(" ( ");
-      int i=0;
-      for (FieldSchema schema : partKeys) {
-        buff.append(schema.getName());
-        buff.append("='");
-        buff.append(partVals.get(i));
-        buff.append("'");
-        if (i!=partKeys.size()-1) {
-          buff.append(",");
-        }
-        ++i;
-      }
-      buff.append(" )");
-      return buff.toString();
-    }
-
-    private static IMetaStoreClient getMetaStoreClient(HiveEndPoint endPoint, HiveConf conf, boolean secureMode)
-            throws ConnectionError {
-
-      if (endPoint.metaStoreUri!= null) {
-        conf.setVar(HiveConf.ConfVars.METASTOREURIS, endPoint.metaStoreUri);
-      }
-      if(secureMode) {
-        conf.setBoolVar(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL,true);
-      }
-      try {
-        return HCatUtil.getHiveMetastoreClient(conf);
-      } catch (MetaException e) {
-        throw new ConnectionError("Error connecting to Hive Metastore URI: "
-                + endPoint.metaStoreUri + ". " + e.getMessage(), e);
-      } catch (IOException e) {
-        throw new ConnectionError("Error connecting to Hive Metastore URI: "
-            + endPoint.metaStoreUri + ". " + e.getMessage(), e);
-      }
-    }
-  } // class ConnectionImpl
-
-  private static class TransactionBatchImpl implements TransactionBatch {
-    private final String username;
-    private final UserGroupInformation ugi;
-    private final HiveEndPoint endPt;
-    private final IMetaStoreClient msClient;
-    private final IMetaStoreClient heartbeaterMSClient;
-    private final RecordWriter recordWriter;
-    private final List<TxnToWriteId> txnToWriteIds;
-
-    //volatile because heartbeat() may be in a "different" thread; updates of this are "piggybacking"
-    private volatile int currentTxnIndex = -1;
-    private final String partNameForLock;
-    //volatile because heartbeat() may be in a "different" thread
-    private volatile TxnState state;
-    private LockRequest lockRequest = null;
-    /**
-     * once any operation on this batch encounters a system exception
-     * (e.g. IOException on write) it's safest to assume that we can't write to the
-     * file backing this batch any more.  This guards important public methods
-     */
-    private volatile boolean isClosed = false;
-    private final String agentInfo;
-    /**
-     * Tracks the state of each transaction
-     */
-    private final TxnState[] txnStatus;
-    /**
-     * ID of the last txn used by {@link #beginNextTransactionImpl()}
-     */
-    private long lastTxnUsed;
-
-    /**
-     * Represents a batch of transactions acquired from MetaStore
-     *
-     * @throws StreamingException if failed to create new RecordUpdater for batch
-     * @throws TransactionBatchUnAvailable if failed to acquire a new Transaction batch
-     */
-    private TransactionBatchImpl(final String user, UserGroupInformation ugi, HiveEndPoint endPt,
-        final int numTxns, final IMetaStoreClient msClient,
-        final IMetaStoreClient heartbeaterMSClient, RecordWriter recordWriter, String agentInfo)
-        throws StreamingException, TransactionBatchUnAvailable, InterruptedException {
-      boolean success = false;
-      try {
-        if ( endPt.partitionVals!=null   &&   !endPt.partitionVals.isEmpty() ) {
-          Table tableObj = msClient.getTable(endPt.database, endPt.table);
-          List<FieldSchema> partKeys = tableObj.getPartitionKeys();
-          partNameForLock = Warehouse.makePartName(partKeys, endPt.partitionVals);
-        } else {
-          partNameForLock = null;
-        }
-        this.username = user;
-        this.ugi = ugi;
-        this.endPt = endPt;
-        this.msClient = msClient;
-        this.heartbeaterMSClient = heartbeaterMSClient;
-        this.recordWriter = recordWriter;
-        this.agentInfo = agentInfo;
-
-        List<Long> txnIds = openTxnImpl(msClient, user, numTxns, ugi);
-        txnToWriteIds = allocateWriteIdsImpl(msClient, txnIds, ugi);
-        assert(txnToWriteIds.size() == numTxns);
-
-        txnStatus = new TxnState[numTxns];
-        for(int i = 0; i < txnStatus.length; i++) {
-          assert(txnToWriteIds.get(i).getTxnId() == txnIds.get(i));
-          txnStatus[i] = TxnState.OPEN;//Open matches Metastore state
-        }
-        this.state = TxnState.INACTIVE;
-
-        // The write ids returned for the transaction batch are also sequential
-        recordWriter.newBatch(txnToWriteIds.get(0).getWriteId(), txnToWriteIds.get(numTxns-1).getWriteId());
-        success = true;
-      } catch (TException e) {
-        throw new TransactionBatchUnAvailable(endPt, e);
-      } catch (IOException e) {
-        throw new TransactionBatchUnAvailable(endPt, e);
-      }
-      finally {
-        //clean up if above throws
-        markDead(success);
-      }
-    }
-
-    private List<Long> openTxnImpl(final IMetaStoreClient msClient, final String user, final int numTxns, UserGroupInformation ugi)
-            throws IOException, TException,  InterruptedException {
-      if(ugi==null) {
-        return  msClient.openTxns(user, numTxns).getTxn_ids();
-      }
-      return (List<Long>) ugi.doAs(new PrivilegedExceptionAction<Object>() {
-        @Override
-        public Object run() throws Exception {
-          return msClient.openTxns(user, numTxns).getTxn_ids();
-        }
-      });
-    }
-
-    private List<TxnToWriteId> allocateWriteIdsImpl(final IMetaStoreClient msClient,
-                                                    final List<Long> txnIds, UserGroupInformation ugi)
-            throws IOException, TException,  InterruptedException {
-      if(ugi==null) {
-        return  msClient.allocateTableWriteIdsBatch(txnIds, endPt.database, endPt.table);
-      }
-      return (List<TxnToWriteId>) ugi.doAs(new PrivilegedExceptionAction<Object>() {
-        @Override
-        public Object run() throws Exception {
-          return msClient.allocateTableWriteIdsBatch(txnIds, endPt.database, endPt.table);
-        }
-      });
-    }
-
-    @Override
-    public String toString() {
-      if (txnToWriteIds==null || txnToWriteIds.isEmpty()) {
-        return "{}";
-      }
-      StringBuilder sb = new StringBuilder(" TxnStatus[");
-      for(TxnState state : txnStatus) {
-        //'state' should not be null - future proofing
-        sb.append(state == null ? "N" : state);
-      }
-      sb.append("] LastUsed ").append(JavaUtils.txnIdToString(lastTxnUsed));
-      return "TxnId/WriteIds=[" + txnToWriteIds.get(0).getTxnId()
-              + "/" + txnToWriteIds.get(0).getWriteId()
-              + "..."
-              + txnToWriteIds.get(txnToWriteIds.size()-1).getTxnId()
-              + "/" + txnToWriteIds.get(txnToWriteIds.size()-1).getWriteId()
-              + "] on endPoint = " + endPt + "; " + sb;
-    }
-
-    /**
-     * Activate the next available transaction in the current transaction batch
-     * @throws TransactionError failed to switch to next transaction
-     */
-    @Override
-    public void beginNextTransaction() throws TransactionError, ImpersonationFailed,
-            InterruptedException {
-      checkIsClosed();
-      if (ugi==null) {
-        beginNextTransactionImpl();
-        return;
-      }
-      try {
-        ugi.doAs (
-              new PrivilegedExceptionAction<Void>() {
-                @Override
-                public Void run() throws TransactionError {
-                  beginNextTransactionImpl();
-                  return null;
-                }
-              }
-        );
-      } catch (IOException e) {
-        throw new ImpersonationFailed("Failed switching to next Txn as user '" + username +
-                "' in Txn batch :" + this, e);
-      }
-    }
-
-    private void beginNextTransactionImpl() throws TransactionError {
-      state = TxnState.INACTIVE;//clear state from previous txn
-
-      if ((currentTxnIndex + 1) >= txnToWriteIds.size()) {
-        throw new InvalidTrasactionState("No more transactions available in" +
-                " current batch for end point : " + endPt);
-      }
-      ++currentTxnIndex;
-      state = TxnState.OPEN;
-      lastTxnUsed = getCurrentTxnId();
-      lockRequest = createLockRequest(endPt, partNameForLock, username, getCurrentTxnId(), agentInfo);
-      try {
-        LockResponse res = msClient.lock(lockRequest);
-        if (res.getState() != LockState.ACQUIRED) {
-          throw new TransactionError("Unable to acquire lock on " + endPt);
-        }
-      } catch (TException e) {
-        throw new TransactionError("Unable to acquire lock on " + endPt, e);
-      }
-    }
-
-    /**
-     * Get Id of currently open transaction.
-     * @return -1 if there is no open TX
-     */
-    @Override
-    public Long getCurrentTxnId() {
-      if (currentTxnIndex >= 0) {
-        return txnToWriteIds.get(currentTxnIndex).getTxnId();
-      }
-      return -1L;
-    }
-
-    /**
-     * Get write Id of the currently open transaction.
-     * @return -1 if there is no open TX
-     */
-    @Override
-    public Long getCurrentWriteId() {
-      if (currentTxnIndex >= 0) {
-        return txnToWriteIds.get(currentTxnIndex).getWriteId();
-      }
-      return -1L;
-    }
-
-    /**
-     * Get the state of the current transaction.
-     * @return state of the current transaction
-     */
-    @Override
-    public TxnState getCurrentTransactionState() {
-      return state;
-    }
-
-    /**
-     * Remaining transactions are the ones that are not committed, aborted or active.
-     * The active transaction is not considered part of the remaining txns.
-     * @return number of transactions remaining in this batch.
-     */
-    @Override
-    public int remainingTransactions() {
-      if (currentTxnIndex>=0) {
-        return txnToWriteIds.size() - currentTxnIndex -1;
-      }
-      return txnToWriteIds.size();
-    }
-
-
-    /**
-     *  Write record using RecordWriter
-     * @param record  the data to be written
-     * @throws StreamingIOFailure I/O failure
-     * @throws SerializationError  serialization error
-     * @throws ImpersonationFailed error writing on behalf of proxyUser
-     * @throws InterruptedException
-     */
-    @Override
-    public void write(final byte[] record)
-            throws StreamingException, InterruptedException {
-      write(Collections.singletonList(record));
-    }
-    private void checkIsClosed() throws IllegalStateException {
-      if(isClosed) {
-        throw new IllegalStateException("TransactionBatch " + toString() + " has been closed()");
-      }
-    }
-    /**
-     * A transaction batch opens a single HDFS file and writes multiple transactions to it.  If there is any issue
-     * with the write, we can't continue to write to the same file any more as it may be corrupted now (at the tail).
-     * This ensures that a client can't ignore these failures and continue to write.
-     */
-    private void markDead(boolean success) {
-      if(success) {
-        return;
-      }
-      isClosed = true;//also ensures that heartbeat() is no-op since client is likely doing it async
-      try {
-        abort(true);//abort all remaining txns
-      }
-      catch(Exception ex) {
-        LOG.error("Fatal error on " + toString() + "; cause " + ex.getMessage(), ex);
-      }
-      try {
-        closeImpl();
-      }
-      catch (Exception ex) {
-        LOG.error("Fatal error on " + toString() + "; cause " + ex.getMessage(), ex);
-      }
-    }
-
-
-    /**
-     *  Write records using RecordWriter
-     * @param records collection of rows to be written
-     * @throws StreamingException  serialization error
-     * @throws ImpersonationFailed error writing on behalf of proxyUser
-     * @throws InterruptedException
-     */
-    @Override
-    public void write(final Collection<byte[]> records)
-            throws StreamingException, InterruptedException,
-            ImpersonationFailed {
-      checkIsClosed();
-      boolean success = false;
-      try {
-        if (ugi == null) {
-          writeImpl(records);
-        } else {
-          ugi.doAs(
-            new PrivilegedExceptionAction<Void>() {
-              @Override
-              public Void run() throws StreamingException {
-                writeImpl(records);
-                return null;
-              }
-            }
-          );
-        }
-        success = true;
-      } catch(SerializationError ex) {
-        //this exception indicates that a {@code record} could not be parsed and the
-        //caller can decide whether to drop it or send it to dead letter queue.
-        //rolling back the txn and retrying won't help since the tuple will be exactly the same
-        //when it's replayed.
-        success = true;
-        throw ex;
-      } catch(IOException e){
-        throw new ImpersonationFailed("Failed writing as user '" + username +
-          "' to endPoint :" + endPt + ". Transaction Id: "
-          + getCurrentTxnId(), e);
-      }
-      finally {
-        markDead(success);
-      }
-    }
-
-    private void writeImpl(Collection<byte[]> records)
-            throws StreamingException {
-      for (byte[] record : records) {
-        recordWriter.write(getCurrentWriteId(), record);
-      }
-    }
-
-
-    /**
-     * Commit the currently open transaction
-     * @throws TransactionError
-     * @throws StreamingIOFailure  if flushing records failed
-     * @throws ImpersonationFailed if not able to commit on behalf of the proxy user
-     * @throws InterruptedException
-     */
-    @Override
-    public void commit()  throws TransactionError, StreamingException,
-           ImpersonationFailed, InterruptedException {
-      checkIsClosed();
-      boolean success = false;
-      try {
-        if (ugi == null) {
-          commitImpl();
-        }
-        else {
-          ugi.doAs(
-            new PrivilegedExceptionAction<Void>() {
-              @Override
-              public Void run() throws StreamingException {
-                commitImpl();
-                return null;
-              }
-            }
-          );
-        }
-        success = true;
-      } catch (IOException e) {
-        throw new ImpersonationFailed("Failed committing Txn ID " + getCurrentTxnId() + " as user '"
-                + username + "'on endPoint :" + endPt + ". Transaction Id: ", e);
-      }
-      finally {
-        markDead(success);
-      }
-    }
-
-    private void commitImpl() throws TransactionError, StreamingException {
-      try {
-        recordWriter.flush();
-        msClient.commitTxn(txnToWriteIds.get(currentTxnIndex).getTxnId());
-        state = TxnState.COMMITTED;
-        txnStatus[currentTxnIndex] = TxnState.COMMITTED;
-      } catch (NoSuchTxnException e) {
-        throw new TransactionError("Invalid transaction id : "
-                + getCurrentTxnId(), e);
-      } catch (TxnAbortedException e) {
-        throw new TransactionError("Aborted transaction cannot be committed"
-                , e);
-      } catch (TException e) {
-        throw new TransactionError("Unable to commit transaction"
-                + getCurrentTxnId(), e);
-      }
-    }
-
-    /**
-     * Abort the currently open transaction
-     * @throws TransactionError
-     */
-    @Override
-    public void abort() throws TransactionError, StreamingException
-                      , ImpersonationFailed, InterruptedException {
-      if(isClosed) {
-        /**
-         * isClosed is only set internally by this class.  {@link #markDead(boolean)} will abort all
-         * remaining txns, so make this a no-op to make sure that a well-behaved client that calls
-         * abort() doesn't get misleading errors
-         */
-        return;
-      }
-      abort(false);
-    }
-    private void abort(final boolean abortAllRemaining) throws TransactionError, StreamingException
-        , ImpersonationFailed, InterruptedException {
-      if (ugi==null) {
-        abortImpl(abortAllRemaining);
-        return;
-      }
-      try {
-        ugi.doAs (
-                new PrivilegedExceptionAction<Void>() {
-                  @Override
-                  public Void run() throws StreamingException {
-                    abortImpl(abortAllRemaining);
-                    return null;
-                  }
-                }
-        );
-      } catch (IOException e) {
-        throw new ImpersonationFailed("Failed aborting Txn " + getCurrentTxnId()  + " as user '"
-                + username + "' on endPoint :" + endPt, e);
-      }
-    }
-
-    private void abortImpl(boolean abortAllRemaining) throws TransactionError, StreamingException {
-      try {
-        if(abortAllRemaining) {
-          //when last txn finished (abort/commit) the currentTxnIndex is pointing at that txn
-          //so we need to start from next one, if any.  Also if batch was created but
-          //beginNextTransaction() was never called, we want to start with first txn
-          int minOpenTxnIndex = Math.max(currentTxnIndex +
-            (state == TxnState.ABORTED || state == TxnState.COMMITTED ? 1 : 0), 0);
-          for(currentTxnIndex = minOpenTxnIndex;
-              currentTxnIndex < txnToWriteIds.size(); currentTxnIndex++) {
-            msClient.rollbackTxn(txnToWriteIds.get(currentTxnIndex).getTxnId());
-            txnStatus[currentTxnIndex] = TxnState.ABORTED;
-          }
-          currentTxnIndex--;//since the loop left it == txnToWriteIds.size()
-        }
-        else {
-          if (getCurrentTxnId() > 0) {
-            msClient.rollbackTxn(getCurrentTxnId());
-            txnStatus[currentTxnIndex] = TxnState.ABORTED;
-          }
-        }
-        state = TxnState.ABORTED;
-        recordWriter.clear();
-      } catch (NoSuchTxnException e) {
-        throw new TransactionError("Unable to abort invalid transaction id : "
-                + getCurrentTxnId(), e);
-      } catch (TException e) {
-        throw new TransactionError("Unable to abort transaction id : "
-                + getCurrentTxnId(), e);
-      }
-    }
-
-    @Override
-    public void heartbeat() throws StreamingException, HeartBeatFailure {
-      if(isClosed) {
-        return;
-      }
-      if(state != TxnState.OPEN && currentTxnIndex >= txnToWriteIds.size() - 1) {
-        //here means last txn in the batch is resolved but the close() hasn't been called yet so
-        //there is nothing to heartbeat
-        return;
-      }
-      //if here after commit()/abort() but before next beginNextTransaction(), currentTxnIndex still
-      //points at the last txn which we don't want to heartbeat
-      Long first = txnToWriteIds.get(state == TxnState.OPEN ? currentTxnIndex : currentTxnIndex + 1).getTxnId();
-      Long last = txnToWriteIds.get(txnToWriteIds.size()-1).getTxnId();
-      try {
-        HeartbeatTxnRangeResponse resp = heartbeaterMSClient.heartbeatTxnRange(first, last);
-        if (!resp.getAborted().isEmpty() || !resp.getNosuch().isEmpty()) {
-          throw new HeartBeatFailure(resp.getAborted(), resp.getNosuch());
-        }
-      } catch (TException e) {
-        throw new StreamingException("Failure to heartbeat on ids (" + first + "src/gen/thrift"
-                + last + ") on end point : " + endPt );
-      }
-    }
-
-    @Override
-    public boolean isClosed() {
-      return isClosed;
-    }
-    /**
-     * Close the TransactionBatch.  This will abort any still open txns in this batch.
-     * @throws StreamingIOFailure I/O failure when closing transaction batch
-     */
-    @Override
-    public void close() throws StreamingException, ImpersonationFailed, InterruptedException {
-      if(isClosed) {
-        return;
-      }
-      isClosed = true;
-      abortImpl(true);//abort proactively so that we don't wait for timeout
-      closeImpl();//perhaps we should add a version of RecordWriter.closeBatch(boolean abort) which
-      //will call RecordUpdater.close(boolean abort)
-    }
-    private void closeImpl() throws StreamingException, InterruptedException{
-      state = TxnState.INACTIVE;
-      if(ugi == null) {
-        recordWriter.closeBatch();
-        return;
-      }
-      try {
-        ugi.doAs (
-                new PrivilegedExceptionAction<Void>() {
-                  @Override
-                  public Void run() throws StreamingException {
-                    recordWriter.closeBatch();
-                    return null;
-                  }
-                }
-        );
-        try {
-          FileSystem.closeAllForUGI(ugi);
-        } catch (IOException exception) {
-          LOG.error("Could not clean up file-system handles for UGI: " + ugi, exception);
-        }
-      } catch (IOException e) {
-        throw new ImpersonationFailed("Failed closing Txn Batch as user '" + username +
-                "' on  endPoint :" + endPt, e);
-      }
-    }
-
-    private static LockRequest createLockRequest(final HiveEndPoint hiveEndPoint,
-            String partNameForLock, String user, long txnId, String agentInfo)  {
-      LockRequestBuilder rqstBuilder = agentInfo == null ?
-        new LockRequestBuilder() : new LockRequestBuilder(agentInfo);
-      rqstBuilder.setUser(user);
-      rqstBuilder.setTransactionId(txnId);
-
-      LockComponentBuilder lockCompBuilder = new LockComponentBuilder()
-              .setDbName(hiveEndPoint.database)
-              .setTableName(hiveEndPoint.table)
-              .setShared()
-              .setOperationType(DataOperationType.INSERT);
-      if (partNameForLock!=null && !partNameForLock.isEmpty() ) {
-          lockCompBuilder.setPartitionName(partNameForLock);
-      }
-      rqstBuilder.addLockComponent(lockCompBuilder.build());
-
-      return rqstBuilder.build();
-    }
-  } // class TransactionBatchImpl
-
-  static HiveConf createHiveConf(Class<?> clazz, String metaStoreUri) {
-    HiveConf conf = new HiveConf(clazz);
-    if (metaStoreUri!= null) {
-      setHiveConf(conf, HiveConf.ConfVars.METASTOREURIS, metaStoreUri);
-    }
-    HiveEndPoint.overrideConfSettings(conf);
-    return conf;
-  }
-
-  private static void overrideConfSettings(HiveConf conf) {
-    setHiveConf(conf, HiveConf.ConfVars.HIVE_TXN_MANAGER,
-            "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager");
-    setHiveConf(conf, HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, true);
-    setHiveConf(conf, HiveConf.ConfVars.METASTORE_EXECUTE_SET_UGI, true);
-    // Avoids creating Tez Client sessions internally as it takes much longer currently
-    setHiveConf(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "mr");
-  }
-
-  private static void setHiveConf(HiveConf conf, HiveConf.ConfVars var, String value) {
-    if( LOG.isDebugEnabled() ) {
-      LOG.debug("Overriding HiveConf setting : " + var + " = " + value);
-    }
-    conf.setVar(var, value);
-  }
-
-  private static void setHiveConf(HiveConf conf, HiveConf.ConfVars var, boolean value) {
-    if( LOG.isDebugEnabled() ) {
-      LOG.debug("Overriding HiveConf setting : " + var + " = " + value);
-    }
-    conf.setBoolVar(var, value);
-  }
-
-}  // class HiveEndPoint

http://git-wip-us.apache.org/repos/asf/hive/blob/bf8d305a/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java b/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java
new file mode 100644
index 0000000..205ed6c
--- /dev/null
+++ b/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java
@@ -0,0 +1,1039 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.LockComponentBuilder;
+import org.apache.hadoop.hive.metastore.LockRequestBuilder;
+import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
+import org.apache.hadoop.hive.metastore.api.DataOperationType;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeResponse;
+import org.apache.hadoop.hive.metastore.api.LockRequest;
+import org.apache.hadoop.hive.metastore.api.LockResponse;
+import org.apache.hadoop.hive.metastore.api.LockState;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
+import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
+import org.apache.hadoop.hive.ql.lockmgr.LockException;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.plan.AddPartitionDesc;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * Streaming connection implementation for Hive. To create a streaming connection, use the builder API
+ * to create the record writer first, followed by the connection itself. Once the connection is created, clients can
+ * begin a transaction, keep writing using the connection, commit the transaction and close the connection when done.
+ * To bind to the correct metastore, the HiveConf object has to be created from hive-site.xml or HIVE_CONF_DIR.
+ * If the hive conf is created manually, the metastore URI has to be set correctly. If no hive conf object is
+ * specified, "thrift://localhost:9083" will be used as the default.
+ * <br/><br/>
+ * NOTE: The streaming connection APIs and record writer APIs are not thread-safe. Streaming connection creation,
+ * begin/commit/abort transaction, write and close have to be called from the same thread. If close() or
+ * abortTransaction() has to be triggered from a separate thread, it has to be co-ordinated via external variables
+ * or a synchronization mechanism.
+ * <br/><br/>
+ * Example usage:
+ * <pre>{@code
+ * // create delimited record writer whose schema exactly matches table schema
+ * StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
+ *                                      .withFieldDelimiter(',')
+ *                                      .build();
+ *
+ * // create and open streaming connection (default.src table has to exist already)
+ * StreamingConnection connection = HiveStreamingConnection.newBuilder()
+ *                                    .withDatabase("default")
+ *                                    .withTable("src")
+ *                                    .withAgentInfo("nifi-agent")
+ *                                    .withRecordWriter(writer)
+ *                                    .withHiveConf(hiveConf)
+ *                                    .connect();
+ *
+ * // begin a transaction, write records and commit 1st transaction
+ * connection.beginTransaction();
+ * connection.write("key1,val1".getBytes());
+ * connection.write("key2,val2".getBytes());
+ * connection.commitTransaction();
+ *
+ * // begin another transaction, write more records and commit 2nd transaction
+ * connection.beginTransaction();
+ * connection.write("key3,val3".getBytes());
+ * connection.write("key4,val4".getBytes());
+ * connection.commitTransaction();
+ *
+ * // close the streaming connection
+ * connection.close();
+ * }
+ * </pre>
+ */
+public class HiveStreamingConnection implements StreamingConnection {
+  private static final Logger LOG = LoggerFactory.getLogger(HiveStreamingConnection.class.getName());
+
+  private static final String DEFAULT_METASTORE_URI = "thrift://localhost:9083";
+  private static final int DEFAULT_TRANSACTION_BATCH_SIZE = 1;
+  private static final int DEFAULT_HEARTBEAT_INTERVAL = 60 * 1000;
+  private static final boolean DEFAULT_STREAMING_OPTIMIZATIONS_ENABLED = true;
+
+  public enum TxnState {
+    INACTIVE("I"), OPEN("O"), COMMITTED("C"), ABORTED("A");
+
+    private final String code;
+
+    TxnState(String code) {
+      this.code = code;
+    }
+
+    public String toString() {
+      return code;
+    }
+  }
+
+  // fields populated from builder
+  private String database;
+  private String table;
+  private List<String> staticPartitionValues;
+  private String agentInfo;
+  private int transactionBatchSize;
+  private RecordWriter recordWriter;
+  private TransactionBatch currentTransactionBatch;
+  private HiveConf conf;
+  private boolean streamingOptimizations;
+  private AtomicBoolean isConnectionClosed = new AtomicBoolean(false);
+
+  // internal fields
+  private boolean isPartitionedTable;
+  private IMetaStoreClient msClient;
+  private IMetaStoreClient heartbeatMSClient;
+  private final String username;
+  private final boolean secureMode;
+  private Table tableObject = null;
+  private String metastoreUri;
+
+  private HiveStreamingConnection(Builder builder) throws StreamingException {
+    this.database = builder.database.toLowerCase();
+    this.table = builder.table.toLowerCase();
+    this.staticPartitionValues = builder.staticPartitionValues;
+    this.conf = builder.hiveConf;
+    this.agentInfo = builder.agentInfo;
+    this.streamingOptimizations = builder.streamingOptimizations;
+    UserGroupInformation loggedInUser = null;
+    try {
+      loggedInUser = UserGroupInformation.getLoginUser();
+    } catch (IOException e) {
+      LOG.warn("Unable to get logged in user via UGI. err: {}", e.getMessage());
+    }
+    if (loggedInUser == null) {
+      this.username = System.getProperty("user.name");
+      this.secureMode = false;
+    } else {
+      this.username = loggedInUser.getShortUserName();
+      this.secureMode = loggedInUser.hasKerberosCredentials();
+    }
+    this.transactionBatchSize = builder.transactionBatchSize;
+    this.recordWriter = builder.recordWriter;
+    if (agentInfo == null) {
+      try {
+        agentInfo = username + ":" + InetAddress.getLocalHost().getHostName() + ":" + Thread.currentThread().getName();
+      } catch (UnknownHostException e) {
+        // ignore and use UUID instead
+        this.agentInfo = UUID.randomUUID().toString();
+      }
+    }
+    if (conf == null) {
+      conf = createHiveConf(this.getClass(), DEFAULT_METASTORE_URI);
+    }
+    overrideConfSettings(conf);
+    this.metastoreUri = conf.get(MetastoreConf.ConfVars.THRIFT_URIS.getHiveName());
+    this.msClient = getMetaStoreClient(conf, metastoreUri, secureMode);
+    // We use a separate metastore client for heartbeat calls to ensure heartbeat RPC calls are
+    // isolated from the other transaction related RPC calls.
+    this.heartbeatMSClient = getMetaStoreClient(conf, metastoreUri, secureMode);
+    validateTable();
+    LOG.info("STREAMING CONNECTION INFO: {}", toConnectionInfoString());
+  }
+
+  public static Builder newBuilder() {
+    return new Builder();
+  }
+
+  public static class Builder {
+    private String database;
+    private String table;
+    private List<String> staticPartitionValues;
+    private String agentInfo;
+    private HiveConf hiveConf;
+    private int transactionBatchSize = DEFAULT_TRANSACTION_BATCH_SIZE;
+    private boolean streamingOptimizations = DEFAULT_STREAMING_OPTIMIZATIONS_ENABLED;
+    private RecordWriter recordWriter;
+
+    /**
+     * Specify database to use for streaming connection.
+     *
+     * @param database - db name
+     * @return - builder
+     */
+    public Builder withDatabase(final String database) {
+      this.database = database;
+      return this;
+    }
+
+    /**
+     * Specify table to use for streaming connection.
+     *
+     * @param table - table name
+     * @return - builder
+     */
+    public Builder withTable(final String table) {
+      this.table = table;
+      return this;
+    }
+
+    /**
+     * Specify the static partition values to use for the streaming connection.
+     *
+     * @param staticPartitionValues - static partition values
+     * @return - builder
+     */
+    public Builder withStaticPartitionValues(final List<String> staticPartitionValues) {
+      this.staticPartitionValues = staticPartitionValues == null ? null : new ArrayList<>(staticPartitionValues);
+      return this;
+    }
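+
+    // Illustrative note (editorial; the partition columns and values below are assumptions,
+    // not from the original source): for a table partitioned by (year, month),
+    //   withStaticPartitionValues(Arrays.asList("2018", "05"))
+    // streams every record into the single partition year=2018/month=05. Leaving this
+    // unset on a partitioned table switches the connection to dynamic partitioning,
+    // where the record writer is expected to derive partition values from each record.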
+
+    /**
+     * Specify agent info to use for streaming connection.
+     *
+     * @param agentInfo - agent info
+     * @return - builder
+     */
+    public Builder withAgentInfo(final String agentInfo) {
+      this.agentInfo = agentInfo;
+      return this;
+    }
+
+    /**
+     * Specify hive configuration object to use for streaming connection.
+     * Generate this object by pointing to an already existing hive-site.xml or HIVE_CONF_DIR.
+     * Make sure the metastore URI is set correctly, otherwise thrift://localhost:9083 will be
+     * used as the default.
+     *
+     * @param hiveConf - hive conf object
+     * @return - builder
+     */
+    public Builder withHiveConf(final HiveConf hiveConf) {
+      this.hiveConf = hiveConf;
+      return this;
+    }
+
+    /**
+     * Transaction batch size to use (default value is 10). This is an expert-level configuration.
+     * A delta directory is created for every transaction batch, which affects when compaction
+     * is triggered.
+     * NOTE: This is an evolving API and is subject to change or may not be honored in future releases.
+     *
+     * @param transactionBatchSize - transaction batch size
+     * @return - builder
+     */
+    @InterfaceStability.Evolving
+    public Builder withTransactionBatchSize(final int transactionBatchSize) {
+      this.transactionBatchSize = transactionBatchSize;
+      return this;
+    }
+
+    /**
+     * Whether to enable streaming optimizations. This is an expert-level configuration.
+     * Disabling streaming optimizations can significantly impact performance and memory consumption.
+     *
+     * @param enable - flag to enable or not
+     * @return - builder
+     */
+    public Builder withStreamingOptimizations(final boolean enable) {
+      this.streamingOptimizations = enable;
+      return this;
+    }
+
+    /**
+     * Record writer to use for writing records to destination table.
+     *
+     * @param recordWriter - record writer
+     * @return - builder
+     */
+    public Builder withRecordWriter(final RecordWriter recordWriter) {
+      this.recordWriter = recordWriter;
+      return this;
+    }
+
+    /**
+     * Returns a streaming connection to Hive.
+     *
+     * @return - hive streaming connection
+     */
+    public HiveStreamingConnection connect() throws StreamingException {
+      if (database == null) {
+        throw new StreamingException("Database cannot be null for streaming connection");
+      }
+      if (table == null) {
+        throw new StreamingException("Table cannot be null for streaming connection");
+      }
+      if (recordWriter == null) {
+        throw new StreamingException("Record writer cannot be null for streaming connection");
+      }
+      return new HiveStreamingConnection(this);
+    }
+  }
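+
+  /*
+   * Illustrative usage sketch (editorial, not part of the original source). The database and
+   * table names, record contents and the "writer" and "conf" instances are assumptions; any
+   * RecordWriter implementation can be supplied via withRecordWriter().
+   *
+   *   HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
+   *     .withDatabase("default")
+   *     .withTable("alerts")
+   *     .withAgentInfo("example-agent-1")
+   *     .withRecordWriter(writer)
+   *     .withHiveConf(conf)
+   *     .connect();
+   *   connection.beginTransaction();
+   *   connection.write("1,value1".getBytes());
+   *   connection.write("2,value2".getBytes());
+   *   connection.commitTransaction();
+   *   connection.close();
+   */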
+
+  private void setPartitionedTable(boolean isPartitionedTable) {
+    this.isPartitionedTable = isPartitionedTable;
+  }
+
+  @Override
+  public String toString() {
+    return "{ metaStoreUri: " + metastoreUri + ", database: " + database + ", table: " + table + " }";
+  }
+
+  private String toConnectionInfoString() {
+    return "{ metastore-uri: " + metastoreUri + ", " +
+      "database: " + database + ", " +
+      "table: " + table + ", " +
+      "partitioned-table: " + isPartitionedTable() + ", " +
+      "dynamic-partitioning: " + isDynamicPartitioning() + ", " +
+      "username: " + username + ", " +
+      "secure-mode: " + secureMode + ", " +
+      "record-writer: " + recordWriter.getClass().getSimpleName() + ", " +
+      "agent-info: " + agentInfo + " }";
+  }
+
+  @VisibleForTesting
+  String toTransactionString() {
+    return currentTransactionBatch == null ? "" : currentTransactionBatch.toString();
+  }
+
+  @Override
+  public PartitionInfo createPartitionIfNotExists(final List<String> partitionValues) throws StreamingException {
+    String partLocation = null;
+    String partName = null;
+    boolean exists = false;
+    try {
+      Map<String, String> partSpec = Warehouse.makeSpecFromValues(tableObject.getPartitionKeys(), partitionValues);
+      AddPartitionDesc addPartitionDesc = new AddPartitionDesc(database, table, true);
+      partName = Warehouse.makePartName(tableObject.getPartitionKeys(), partitionValues);
+      partLocation = new Path(tableObject.getDataLocation(), Warehouse.makePartPath(partSpec)).toString();
+      addPartitionDesc.addPartition(partSpec, partLocation);
+      Partition partition = Hive.convertAddSpecToMetaPartition(tableObject, addPartitionDesc.getPartition(0), conf);
+      msClient.add_partition(partition);
+    } catch (AlreadyExistsException e) {
+      exists = true;
+    } catch (HiveException | TException e) {
+      throw new StreamingException("Unable to creation partition for values: " + partitionValues + " connection: " +
+        toConnectionInfoString());
+    }
+    return new PartitionInfo(partName, partLocation, exists);
+  }
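+
+  // Illustrative sketch (editorial assumption about writer behavior; the partition values
+  // are examples): a RecordWriter handling dynamic partitioning can call
+  //   connection.createPartitionIfNotExists(Arrays.asList("2018", "05"))
+  // before its first write to a new partition; the returned PartitionInfo carries the
+  // partition name, its location, and whether the partition already existed.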
+
+  private void validateTable() throws InvalidTable, ConnectionError {
+    try {
+      tableObject = new Table(msClient.getTable(database, table));
+    } catch (Exception e) {
+      LOG.warn("Unable to validate the table for connection: " + toConnectionInfoString(), e);
+      throw new InvalidTable(database, table, e);
+    }
+    // 1 - check that the table is Acid
+    if (!AcidUtils.isFullAcidTable(tableObject)) {
+      LOG.error("HiveEndPoint " + this + " must use an acid table");
+      throw new InvalidTable(database, table, "is not an Acid table");
+    }
+
+    if (tableObject.getPartitionKeys() != null && !tableObject.getPartitionKeys().isEmpty()) {
+      setPartitionedTable(true);
+    } else {
+      setPartitionedTable(false);
+    }
+
+    // partition values are specified on non-partitioned table
+    if (!isPartitionedTable() && (staticPartitionValues != null && !staticPartitionValues.isEmpty())) {
+      // Invalid if the table is not partitioned but static partition values were supplied
+      String errMsg = this.toString() + " specifies partitions for an unpartitioned table";
+      LOG.error(errMsg);
+      throw new ConnectionError(errMsg);
+    }
+  }
+
+  private static class HeartbeatRunnable implements Runnable {
+    private final IMetaStoreClient heartbeatMSClient;
+    private final AtomicLong minTxnId;
+    private final long maxTxnId;
+    private final ReentrantLock transactionLock;
+    private final AtomicBoolean isTxnClosed;
+
+    HeartbeatRunnable(final IMetaStoreClient heartbeatMSClient, final AtomicLong minTxnId, final long maxTxnId,
+      final ReentrantLock transactionLock, final AtomicBoolean isTxnClosed) {
+      this.heartbeatMSClient = heartbeatMSClient;
+      this.minTxnId = minTxnId;
+      this.maxTxnId = maxTxnId;
+      this.transactionLock = transactionLock;
+      this.isTxnClosed = isTxnClosed;
+    }
+
+    @Override
+    public void run() {
+      transactionLock.lock();
+      try {
+        if (minTxnId.get() > 0) {
+          HeartbeatTxnRangeResponse resp = heartbeatMSClient.heartbeatTxnRange(minTxnId.get(), maxTxnId);
+          if (!resp.getAborted().isEmpty() || !resp.getNosuch().isEmpty()) {
+            LOG.error("Heartbeat failure: {}", resp.toString());
+            isTxnClosed.set(true);
+          } else {
+            LOG.info("Heartbeat sent for range: [{}-{}]", minTxnId.get(), maxTxnId);
+          }
+        }
+      } catch (TException e) {
+        LOG.warn("Failure to heartbeat for transaction range: [" + minTxnId.get() + "-" + maxTxnId + "]", e);
+      } finally {
+        transactionLock.unlock();
+      }
+    }
+  }
+
+  private void beginNextTransaction() throws StreamingException {
+    if (currentTransactionBatch == null) {
+      currentTransactionBatch = createNewTransactionBatch();
+      LOG.info("Opened new transaction batch {}", currentTransactionBatch);
+    }
+
+    if (currentTransactionBatch.isClosed()) {
+      throw new IllegalStateException("Cannot begin next transaction on a closed streaming connection");
+    }
+
+    if (currentTransactionBatch.remainingTransactions() == 0) {
+      LOG.info("Transaction batch {} is done. Rolling over to next transaction batch.",
+        currentTransactionBatch);
+      currentTransactionBatch.close();
+      currentTransactionBatch = createNewTransactionBatch();
+      LOG.info("Rolled over to new transaction batch {}", currentTransactionBatch);
+    }
+    currentTransactionBatch.beginNextTransaction();
+  }
+
+  private TransactionBatch createNewTransactionBatch() throws StreamingException {
+    return new TransactionBatch(this);
+  }
+
+  private void checkClosedState() throws StreamingException {
+    if (isConnectionClosed.get()) {
+      throw new StreamingException("Streaming connection is closed already.");
+    }
+  }
+
+  private void checkState() throws StreamingException {
+    checkClosedState();
+    if (currentTransactionBatch == null) {
+      throw new StreamingException("Transaction batch is null. Missing beginTransaction?");
+    }
+    if (currentTransactionBatch.state != TxnState.OPEN) {
+      throw new StreamingException("Transaction state is not OPEN. Missing beginTransaction?");
+    }
+  }
+
+  @Override
+  public void write(final byte[] record) throws StreamingException {
+    checkState();
+    currentTransactionBatch.write(record);
+  }
+
+  @Override
+  public void beginTransaction() throws StreamingException {
+    checkClosedState();
+    beginNextTransaction();
+  }
+
+  @Override
+  public void commitTransaction() throws StreamingException {
+    checkState();
+    currentTransactionBatch.commit();
+  }
+
+  @Override
+  public void abortTransaction() throws StreamingException {
+    checkState();
+    currentTransactionBatch.abort();
+  }
+
+  @Override
+  public void close() {
+    if (isConnectionClosed.get()) {
+      return;
+    }
+    isConnectionClosed.set(true);
+    try {
+      if (currentTransactionBatch != null) {
+        currentTransactionBatch.close();
+      }
+    } catch (StreamingException e) {
+      LOG.error("Unable to close current transaction batch: " + currentTransactionBatch, e);
+    } finally {
+      msClient.close();
+      heartbeatMSClient.close();
+    }
+  }
+
+  private static IMetaStoreClient getMetaStoreClient(HiveConf conf, String metastoreUri, boolean secureMode)
+    throws ConnectionError {
+    if (metastoreUri != null) {
+      conf.set(MetastoreConf.ConfVars.THRIFT_URIS.getHiveName(), metastoreUri);
+    }
+    if (secureMode) {
+      conf.setBoolean(MetastoreConf.ConfVars.USE_THRIFT_SASL.getHiveName(), true);
+    }
+    try {
+      return HiveMetaStoreUtils.getHiveMetastoreClient(conf);
+    } catch (MetaException | IOException e) {
+      throw new ConnectionError("Error connecting to Hive Metastore URI: "
+        + metastoreUri + ". " + e.getMessage(), e);
+    }
+  }
+
+  @VisibleForTesting
+  TxnState getCurrentTransactionState() {
+    return currentTransactionBatch.getCurrentTransactionState();
+  }
+
+  @VisibleForTesting
+  int remainingTransactions() {
+    return currentTransactionBatch.remainingTransactions();
+  }
+
+  @VisibleForTesting
+  Long getCurrentTxnId() {
+    return currentTransactionBatch.getCurrentTxnId();
+  }
+
+  private static class TransactionBatch {
+    private String username;
+    private HiveStreamingConnection conn;
+    private IMetaStoreClient msClient;
+    private IMetaStoreClient heartbeatMSClient;
+    private ScheduledExecutorService scheduledExecutorService;
+    private RecordWriter recordWriter;
+    private String partNameForLock = null;
+    private List<TxnToWriteId> txnToWriteIds;
+    private int currentTxnIndex = -1;
+    private TxnState state;
+    private LockRequest lockRequest = null;
+    // heartbeats can only be sent for open transactions.
+    // there is a race between committing/aborting a transaction and heartbeat.
+    // Example: If a heartbeat is sent for committed txn, exception will be thrown.
+    // Similarly if we don't send a heartbeat, metastore server might abort a txn
+    // for missed heartbeat right before commit txn call.
+    // This lock is used to mutex commit/abort and heartbeat calls
+    private final ReentrantLock transactionLock = new ReentrantLock();
+    // min txn id is incremented linearly within a transaction batch.
+    // keeping minTxnId atomic as it is shared with heartbeat thread
+    private final AtomicLong minTxnId;
+    // max txn id does not change for a transaction batch
+    private final long maxTxnId;
+
+    /**
+     * Once any operation on this batch encounters a system exception
+     * (e.g. IOException on write), it's safest to assume that we can't write to the
+     * file backing this batch any more.  This guards the important public methods.
+     */
+    private final AtomicBoolean isTxnClosed = new AtomicBoolean(false);
+    private String agentInfo;
+    private int numTxns;
+    /**
+     * Tracks the state of each transaction
+     */
+    private TxnState[] txnStatus;
+    /**
+     * ID of the last txn used by {@link #beginNextTransactionImpl()}
+     */
+    private long lastTxnUsed;
+
+    /**
+     * Represents a batch of transactions acquired from MetaStore
+     *
+     * @param conn - hive streaming connection
+     * @throws StreamingException if failed to create new RecordUpdater for batch
+     */
+    private TransactionBatch(HiveStreamingConnection conn) throws StreamingException {
+      boolean success = false;
+      try {
+        if (conn.isPartitionedTable() && !conn.isDynamicPartitioning()) {
+          List<FieldSchema> partKeys = conn.tableObject.getPartitionKeys();
+          partNameForLock = Warehouse.makePartName(partKeys, conn.staticPartitionValues);
+        }
+        this.conn = conn;
+        this.username = conn.username;
+        this.msClient = conn.msClient;
+        this.heartbeatMSClient = conn.heartbeatMSClient;
+        this.recordWriter = conn.recordWriter;
+        this.agentInfo = conn.agentInfo;
+        this.numTxns = conn.transactionBatchSize;
+
+        setupHeartBeatThread();
+
+        List<Long> txnIds = openTxnImpl(msClient, username, numTxns);
+        txnToWriteIds = allocateWriteIdsImpl(msClient, txnIds);
+        assert (txnToWriteIds.size() == numTxns);
+
+        txnStatus = new TxnState[numTxns];
+        for (int i = 0; i < txnStatus.length; i++) {
+          assert (txnToWriteIds.get(i).getTxnId() == txnIds.get(i));
+          txnStatus[i] = TxnState.OPEN; //Open matches Metastore state
+        }
+        this.state = TxnState.INACTIVE;
+
+        // initialize record writer with connection and write id info
+        recordWriter.init(conn, txnToWriteIds.get(0).getWriteId(), txnToWriteIds.get(numTxns - 1).getWriteId());
+        this.minTxnId = new AtomicLong(txnIds.get(0));
+        this.maxTxnId = txnIds.get(txnIds.size() - 1);
+        success = true;
+      } catch (TException e) {
+        throw new StreamingException(conn.toString(), e);
+      } finally {
+        //clean up if above throws
+        markDead(success);
+      }
+    }
+
+    private void setupHeartBeatThread() {
+      // start heartbeat thread
+      ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true)
+        .setNameFormat("HiveStreamingConnection-Heartbeat-Thread")
+        .build();
+      this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(threadFactory);
+      long heartBeatInterval;
+      long initialDelay;
+      try {
+        // if HIVE_TXN_TIMEOUT is defined, heartbeat interval will be HIVE_TXN_TIMEOUT/2
+        heartBeatInterval = DbTxnManager.getHeartbeatInterval(conn.conf);
+      } catch (LockException e) {
+        heartBeatInterval = DEFAULT_HEARTBEAT_INTERVAL;
+      }
+      // to introduce some randomness and to avoid hammering the metastore at the same time (same logic as DbTxnManager)
+      initialDelay = (long) (heartBeatInterval * 0.75 * Math.random());
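+      // e.g. (illustrative numbers) with hive.txn.timeout=300s the interval becomes
+      // 150000 ms and the initial delay a random value in [0, 112500) ms.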
+      LOG.info("Starting heartbeat thread with interval: {} ms initialDelay: {} ms for agentInfo: {}",
+        heartBeatInterval, initialDelay, conn.agentInfo);
+      Runnable runnable = new HeartbeatRunnable(heartbeatMSClient, minTxnId, maxTxnId, transactionLock, isTxnClosed);
+      this.scheduledExecutorService.scheduleWithFixedDelay(runnable, initialDelay, heartBeatInterval, TimeUnit
+        .MILLISECONDS);
+    }
+
+    private List<Long> openTxnImpl(final IMetaStoreClient msClient, final String user, final int numTxns)
+      throws TException {
+      return msClient.openTxns(user, numTxns).getTxn_ids();
+    }
+
+    private List<TxnToWriteId> allocateWriteIdsImpl(final IMetaStoreClient msClient,
+      final List<Long> txnIds) throws TException {
+      return msClient.allocateTableWriteIdsBatch(txnIds, conn.database, conn.table);
+    }
+
+    @Override
+    public String toString() {
+      if (txnToWriteIds == null || txnToWriteIds.isEmpty()) {
+        return "{}";
+      }
+      StringBuilder sb = new StringBuilder(" TxnStatus[");
+      for (TxnState state : txnStatus) {
+        //'state' should not be null - future proofing
+        sb.append(state == null ? "N" : state);
+      }
+      sb.append("] LastUsed ").append(JavaUtils.txnIdToString(lastTxnUsed));
+      return "TxnId/WriteIds=[" + txnToWriteIds.get(0).getTxnId()
+        + "/" + txnToWriteIds.get(0).getWriteId()
+        + "..."
+        + txnToWriteIds.get(txnToWriteIds.size() - 1).getTxnId()
+        + "/" + txnToWriteIds.get(txnToWriteIds.size() - 1).getWriteId()
+        + "] on connection = " + conn + "; " + sb;
+    }
+
+    private void beginNextTransaction() throws StreamingException {
+      checkIsClosed();
+      beginNextTransactionImpl();
+    }
+
+    private void beginNextTransactionImpl() throws TransactionError {
+      state = TxnState.INACTIVE;//clear state from previous txn
+      if ((currentTxnIndex + 1) >= txnToWriteIds.size()) {
+        throw new InvalidTransactionState("No more transactions available in" +
+          " the current transaction batch for connection: " + conn + " user: " + username);
+      }
+      currentTxnIndex++;
+      state = TxnState.OPEN;
+      lastTxnUsed = getCurrentTxnId();
+      lockRequest = createLockRequest(conn, partNameForLock, username, getCurrentTxnId(), agentInfo);
+      try {
+        LockResponse res = msClient.lock(lockRequest);
+        if (res.getState() != LockState.ACQUIRED) {
+          throw new TransactionError("Unable to acquire lock on " + conn);
+        }
+      } catch (TException e) {
+        throw new TransactionError("Unable to acquire lock on " + conn, e);
+      }
+    }
+
+    long getCurrentTxnId() {
+      if (currentTxnIndex >= 0) {
+        return txnToWriteIds.get(currentTxnIndex).getTxnId();
+      }
+      return -1L;
+    }
+
+    long getCurrentWriteId() {
+      if (currentTxnIndex >= 0) {
+        return txnToWriteIds.get(currentTxnIndex).getWriteId();
+      }
+      return -1L;
+    }
+
+    TxnState getCurrentTransactionState() {
+      return state;
+    }
+
+    int remainingTransactions() {
+      if (currentTxnIndex >= 0) {
+        return txnToWriteIds.size() - currentTxnIndex - 1;
+      }
+      return txnToWriteIds.size();
+    }
+
+    public void write(final byte[] record) throws StreamingException {
+      checkIsClosed();
+      boolean success = false;
+      try {
+        recordWriter.write(getCurrentWriteId(), record);
+        success = true;
+      } catch (SerializationError ex) {
+        //this exception indicates that a {@code record} could not be parsed and the
+        //caller can decide whether to drop it or send it to dead letter queue.
+        //rolling back the txn and retrying won't help since the tuple will be exactly the same
+        //when it's replayed.
+        success = true;
+        throw ex;
+      } finally {
+        markDead(success);
+      }
+    }
+
+    private void checkIsClosed() throws StreamingException {
+      if (isTxnClosed.get()) {
+        throw new StreamingException("Transaction" + toString() + " is closed()");
+      }
+    }
+
+    /**
+     * A transaction batch opens a single HDFS file and writes multiple transactions to it.  If there is any issue
+     * with a write, we can't continue to write to the same file any more as it may now be corrupted (at the tail).
+     * This ensures that a client can't ignore these failures and continue to write.
+     */
+    private void markDead(boolean success) {
+      if (success) {
+        return;
+      }
+      isTxnClosed.set(true); //also ensures that heartbeat() is no-op since client is likely doing it async
+      try {
+        abort(true);//abort all remaining txns
+      } catch (Exception ex) {
+        LOG.error("Fatal error on " + toString() + "; cause " + ex.getMessage(), ex);
+      }
+      try {
+        closeImpl();
+      } catch (Exception ex) {
+        LOG.error("Fatal error on " + toString() + "; cause " + ex.getMessage(), ex);
+      }
+    }
+
+    void commit() throws StreamingException {
+      checkIsClosed();
+      boolean success = false;
+      try {
+        commitImpl();
+        success = true;
+      } finally {
+        markDead(success);
+      }
+    }
+
+    private void commitImpl() throws StreamingException {
+      try {
+        recordWriter.flush();
+        TxnToWriteId txnToWriteId = txnToWriteIds.get(currentTxnIndex);
+        if (conn.isDynamicPartitioning()) {
+          List<String> partNames = new ArrayList<>(recordWriter.getPartitions());
+          msClient.addDynamicPartitions(txnToWriteId.getTxnId(), txnToWriteId.getWriteId(), conn.database, conn.table,
+            partNames, DataOperationType.INSERT);
+        }
+        transactionLock.lock();
+        try {
+          msClient.commitTxn(txnToWriteId.getTxnId());
+          // increment the min txn id so that heartbeat thread will heartbeat only from the next open transaction.
+          // the current transaction is going to be committed or will fail, so it no longer needs a heartbeat.
+          if (currentTxnIndex + 1 < txnToWriteIds.size()) {
+            minTxnId.set(txnToWriteIds.get(currentTxnIndex + 1).getTxnId());
+          } else {
+            // exhausted the batch, no longer have to heartbeat for current txn batch
+            minTxnId.set(-1);
+          }
+        } finally {
+          transactionLock.unlock();
+        }
+        state = TxnState.COMMITTED;
+        txnStatus[currentTxnIndex] = TxnState.COMMITTED;
+      } catch (NoSuchTxnException e) {
+        throw new TransactionError("Invalid transaction id : "
+          + getCurrentTxnId(), e);
+      } catch (TxnAbortedException e) {
+        throw new TransactionError("Aborted transaction cannot be committed"
+          , e);
+      } catch (TException e) {
+        throw new TransactionError("Unable to commitTransaction transaction"
+          + getCurrentTxnId(), e);
+      }
+    }
+
+    void abort() throws StreamingException {
+      if (isTxnClosed.get()) {
+        /*
+         * isTxnClosed is only set internally by this class.  {@link #markDead(boolean)} will abort all
+         * remaining txns, so make this a no-op to ensure that a well-behaved client calling
+         * abortTransaction() after an error doesn't get misleading errors.
+         */
+        return;
+      }
+      abort(false);
+    }
+
+    private void abort(final boolean abortAllRemaining) throws StreamingException {
+      abortImpl(abortAllRemaining);
+    }
+
+    private void abortImpl(boolean abortAllRemaining) throws StreamingException {
+      transactionLock.lock();
+      try {
+        if (abortAllRemaining) {
+          // we are aborting all txns in the current batch, so no need to heartbeat
+          minTxnId.set(-1);
+          //when the last txn finished (abortTransaction/commitTransaction) currentTxnIndex points at that txn,
+          //so we need to start from the next one, if any.  Also, if the batch was created but
+          //beginNextTransaction() was never called, we want to start with the first txn
+          int minOpenTxnIndex = Math.max(currentTxnIndex +
+            (state == TxnState.ABORTED || state == TxnState.COMMITTED ? 1 : 0), 0);
+          for (currentTxnIndex = minOpenTxnIndex;
+            currentTxnIndex < txnToWriteIds.size(); currentTxnIndex++) {
+            msClient.rollbackTxn(txnToWriteIds.get(currentTxnIndex).getTxnId());
+            txnStatus[currentTxnIndex] = TxnState.ABORTED;
+          }
+          currentTxnIndex--;//since the loop left it == txnToWriteIds.size()
+        } else {
+          // we are aborting only the current transaction, so move the min range for heartbeat or disable heartbeat
+          // if the current txn is last in the batch.
+          if (currentTxnIndex + 1 < txnToWriteIds.size()) {
+            minTxnId.set(txnToWriteIds.get(currentTxnIndex + 1).getTxnId());
+          } else {
+            // exhausted the batch, no longer have to heartbeat
+            minTxnId.set(-1);
+          }
+          long currTxnId = getCurrentTxnId();
+          if (currTxnId > 0) {
+            msClient.rollbackTxn(currTxnId);
+            txnStatus[currentTxnIndex] = TxnState.ABORTED;
+          }
+        }
+        state = TxnState.ABORTED;
+      } catch (NoSuchTxnException e) {
+        throw new TransactionError("Unable to abort invalid transaction id : "
+          + getCurrentTxnId(), e);
+      } catch (TException e) {
+        throw new TransactionError("Unable to abort transaction id : "
+          + getCurrentTxnId(), e);
+      } finally {
+        transactionLock.unlock();
+      }
+    }
+
+    public boolean isClosed() {
+      return isTxnClosed.get();
+    }
+
+    /**
+     * Close the TransactionBatch.  This will abort any still open txns in this batch.
+     *
+     * @throws StreamingException - failure when closing transaction batch
+     */
+    public void close() throws StreamingException {
+      if (isTxnClosed.get()) {
+        return;
+      }
+      isTxnClosed.set(true);
+      abortImpl(true);
+      closeImpl();
+    }
+
+    private void closeImpl() throws StreamingException {
+      state = TxnState.INACTIVE;
+      recordWriter.close();
+      if (scheduledExecutorService != null) {
+        scheduledExecutorService.shutdownNow();
+      }
+    }
+
+    static LockRequest createLockRequest(final HiveStreamingConnection connection,
+      String partNameForLock, String user, long txnId, String agentInfo) {
+      LockRequestBuilder requestBuilder = new LockRequestBuilder(agentInfo);
+      requestBuilder.setUser(user);
+      requestBuilder.setTransactionId(txnId);
+
+      LockComponentBuilder lockCompBuilder = new LockComponentBuilder()
+        .setDbName(connection.database)
+        .setTableName(connection.table)
+        .setShared()
+        .setOperationType(DataOperationType.INSERT);
+      if (connection.isDynamicPartitioning()) {
+        lockCompBuilder.setIsDynamicPartitionWrite(true);
+      }
+      if (partNameForLock != null && !partNameForLock.isEmpty()) {
+        lockCompBuilder.setPartitionName(partNameForLock);
+      }
+      requestBuilder.addLockComponent(lockCompBuilder.build());
+
+      return requestBuilder.build();
+    }
+  }
+
+  private HiveConf createHiveConf(Class<?> clazz, String metaStoreUri) {
+    HiveConf conf = new HiveConf(clazz);
+    if (metaStoreUri != null) {
+      conf.set(MetastoreConf.ConfVars.THRIFT_URIS.getHiveName(), metaStoreUri);
+    }
+    return conf;
+  }
+
+  private void overrideConfSettings(HiveConf conf) {
+    setHiveConf(conf, HiveConf.ConfVars.HIVE_TXN_MANAGER, DbTxnManager.class.getName());
+    setHiveConf(conf, HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, true);
+    setHiveConf(conf, MetastoreConf.ConfVars.EXECUTE_SET_UGI.getHiveName());
+    // Avoids creating Tez Client sessions internally as it takes much longer currently
+    setHiveConf(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "mr");
+    setHiveConf(conf, HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict");
+    if (streamingOptimizations) {
+      setHiveConf(conf, HiveConf.ConfVars.HIVE_ORC_DELTA_STREAMING_OPTIMIZATIONS_ENABLED, true);
+    }
+    // since the same thread creates the metastore clients for both the streaming connection and the heartbeat
+    // thread, we explicitly disable the metastore client cache
+    setHiveConf(conf, HiveConf.ConfVars.METASTORE_CLIENT_CACHE_ENABLED, false);
+  }
+
+  private static void setHiveConf(HiveConf conf, HiveConf.ConfVars var, String value) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Overriding HiveConf setting : " + var + " = " + value);
+    }
+    conf.setVar(var, value);
+  }
+
+  private static void setHiveConf(HiveConf conf, HiveConf.ConfVars var, boolean value) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Overriding HiveConf setting : " + var + " = " + value);
+    }
+    conf.setBoolVar(var, value);
+  }
+
+  private static void setHiveConf(HiveConf conf, String var) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Overriding HiveConf setting : " + var + " = " + true);
+    }
+    conf.setBoolean(var, true);
+  }
+
+  @Override
+  public HiveConf getHiveConf() {
+    return conf;
+  }
+
+  @Override
+  public String getMetastoreUri() {
+    return metastoreUri;
+  }
+
+  @Override
+  public String getDatabase() {
+    return database;
+  }
+
+  @Override
+  public String getTable() {
+    return table;
+  }
+
+  @Override
+  public List<String> getStaticPartitionValues() {
+    return staticPartitionValues;
+  }
+
+  @Override
+  public String getAgentInfo() {
+    return agentInfo;
+  }
+
+  @Override
+  public boolean isPartitionedTable() {
+    return isPartitionedTable;
+  }
+
+  @Override
+  public boolean isDynamicPartitioning() {
+    return isPartitionedTable() && (staticPartitionValues == null || staticPartitionValues.isEmpty());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/bf8d305a/streaming/src/java/org/apache/hive/streaming/ImpersonationFailed.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/ImpersonationFailed.java b/streaming/src/java/org/apache/hive/streaming/ImpersonationFailed.java
deleted file mode 100644
index 23e17e7..0000000
--- a/streaming/src/java/org/apache/hive/streaming/ImpersonationFailed.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hive.streaming;
-
-public class ImpersonationFailed extends StreamingException {
-  public ImpersonationFailed(String username, Exception e) {
-    super("Failed to impersonate user " + username, e);
-  }
-}