Posted to commits@hive.apache.org by pr...@apache.org on 2018/10/11 23:23:55 UTC

[1/2] hive git commit: HIVE-20291: Allow HiveStreamingConnection to receive a WriteId (Jaume Marhuenda reviewed by Prasanth Jayachandran)

Repository: hive
Updated Branches:
  refs/heads/master 7c4d48ec2 -> bdbd3bcff


http://git-wip-us.apache.org/repos/asf/hive/blob/bdbd3bcf/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/org/apache/hive/streaming/TestStreaming.java b/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
index 8b5e508..1c9e43f 100644
--- a/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
+++ b/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
@@ -35,6 +35,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -60,13 +61,13 @@ import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse;
 import org.apache.hadoop.hive.metastore.api.LockState;
 import org.apache.hadoop.hive.metastore.api.LockType;
 import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.ShowLocksRequest;
 import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
 import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
 import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
 import org.apache.hadoop.hive.metastore.api.TxnInfo;
-import org.apache.hadoop.hive.metastore.api.TxnState;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.metastore.txn.AcidHouseKeeperService;
 import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
@@ -82,6 +83,7 @@ import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
 import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
 import org.apache.hadoop.hive.ql.io.orc.Reader;
 import org.apache.hadoop.hive.ql.io.orc.RecordReader;
+import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.txn.compactor.Worker;
@@ -150,7 +152,8 @@ public class TestStreaming {
       if (file.canExecute()) {
         mod |= 0111;
       }
-      return new FileStatus(file.length(), file.isDirectory(), 1, 1024,
+      return new FileStatus(file.length(), file.isDirectory(),
+          1, 1024,
         file.lastModified(), file.lastModified(),
         FsPermission.createImmutable(mod), "owen", "users", path);
     }
@@ -419,6 +422,123 @@ public class TestStreaming {
   }
 
   @Test
+  public void testGetDeltaPath() throws Exception {
+    StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
+        .withFieldDelimiter(',')
+        .build();
+    HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
+        .withDatabase(dbName)
+        .withTable(tblName)
+        .withRecordWriter(writer)
+        .withHiveConf(conf)
+        .connect();
+    Path path = connection.getDeltaFileLocation(partitionVals, 0,
+        5L, 5L, 9);
+    Assert.assertTrue(path.toString().endsWith("testing.db/alerts/continent"
+        + "=Asia/country=India/delta_0000005_0000005_0009/bucket_00000"));
+  }
+
+  @Test
+  public void testConnectionWithWriteId() throws Exception {
+    queryTable(driver, "drop table if exists default.writeidconnection");
+    queryTable(driver, "create table default.writeidconnection (a string, b string) stored as orc " +
+        "TBLPROPERTIES('transactional'='true')");
+    queryTable(driver, "insert into default.writeidconnection values('a0','bar')");
+
+    List<String> rs = queryTable(driver, "select * from default.writeidconnection");
+    Assert.assertEquals(1, rs.size());
+    Assert.assertEquals("a0\tbar", rs.get(0));
+
+    StrictDelimitedInputWriter writerT = StrictDelimitedInputWriter.newBuilder()
+        .withFieldDelimiter(',')
+        .build();
+    HiveStreamingConnection transactionConnection = HiveStreamingConnection.newBuilder()
+        .withDatabase("Default")
+        .withTable("writeidconnection")
+        .withRecordWriter(writerT)
+        .withHiveConf(conf)
+        .connect();
+    transactionConnection.beginTransaction();
+
+    Table tObject = transactionConnection.getTable();
+    Long writeId = transactionConnection.getCurrentWriteId();
+
+    Assert.assertNotNull(tObject);
+    Assert.assertNotNull(writeId);
+
+    StrictDelimitedInputWriter writerOne = StrictDelimitedInputWriter.newBuilder()
+        .withFieldDelimiter(',')
+        .build();
+    HiveStreamingConnection connectionOne = HiveStreamingConnection.newBuilder()
+        .withDatabase("Default")
+        .withTable("writeidconnection")
+        .withRecordWriter(writerOne)
+        .withHiveConf(conf)
+        .withWriteId(writeId)
+        .withStatementId(1)
+        .withTableObject(tObject)
+        .connect();
+
+    StrictDelimitedInputWriter writerTwo = StrictDelimitedInputWriter.newBuilder()
+        .withFieldDelimiter(',')
+        .build();
+    HiveStreamingConnection connectionTwo = HiveStreamingConnection.newBuilder()
+        .withDatabase("Default")
+        .withRecordWriter(writerTwo)
+        .withHiveConf(conf)
+        .withWriteId(writeId)
+        .withStatementId(2)
+        .withTableObject(tObject)
+        .connect();
+
+    Assert.assertNotNull(connectionOne);
+    Assert.assertNotNull(connectionTwo);
+
+    connectionOne.beginTransaction();
+    connectionTwo.beginTransaction();
+    connectionOne.write("a1,b2".getBytes());
+    connectionTwo.write("a5,b6".getBytes());
+    connectionOne.write("a3,b4".getBytes());
+    connectionOne.commitTransaction();
+    connectionTwo.commitTransaction();
+
+    Assert.assertEquals(HiveStreamingConnection.TxnState.PREPARED_FOR_COMMIT,
+        connectionOne.getCurrentTransactionState());
+    Assert.assertEquals(HiveStreamingConnection.TxnState.PREPARED_FOR_COMMIT,
+        connectionTwo.getCurrentTransactionState());
+
+    try {
+      connectionOne.beginTransaction();
+      Assert.fail("second beginTransaction should have thrown a "
+          + "StreamingException");
+    } catch (StreamingException e) {
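+      // expected: only one transaction is allowed when the connection was created with a writeId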
+
+    }
+
+    connectionOne.close();
+    connectionTwo.close();
+
+    rs = queryTable(driver, "select ROW__ID, a, b, "
+        + "INPUT__FILE__NAME from default.writeidconnection order by ROW__ID");
+    // Only the pre-existing row is visible; the streamed rows have not been committed yet
+    Assert.assertEquals(1, rs.size());
+
+    transactionConnection.commitTransaction();
+
+    rs = queryTable(driver, "select ROW__ID, a, b, "
+        + "INPUT__FILE__NAME from default.writeidconnection order by a");
+    Assert.assertEquals(4, rs.size());
+    Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\ta0\tbar"));
+    Assert.assertTrue(rs.get(0), rs.get(0).endsWith("bucket_00000"));
+    Assert.assertTrue(rs.get(1), rs.get(1).contains("\"rowid\":0}\ta1\tb2"));
+    Assert.assertTrue(rs.get(1), rs.get(1).endsWith("bucket_00000"));
+    Assert.assertTrue(rs.get(2), rs.get(2).contains("\"rowid\":1}\ta3\tb4"));
+    Assert.assertTrue(rs.get(2), rs.get(2).endsWith("bucket_00000"));
+    Assert.assertTrue(rs.get(3), rs.get(3).contains("\ta5\tb6"));
+    Assert.assertTrue(rs.get(3), rs.get(3).endsWith("bucket_00000"));
+  }
+
+  @Test
   public void testAllTypesDelimitedWriter() throws Exception {
     queryTable(driver, "drop table if exists default.alltypes");
     queryTable(driver,
@@ -619,7 +739,7 @@ public class TestStreaming {
   }
 
   /**
-   * this is a clone from TestTxnStatement2....
+   * this is a clone from TestHiveStreamingConnection.TxnStatement2....
    */
   public static void runWorker(HiveConf hiveConf) throws MetaException {
     AtomicBoolean stop = new AtomicBoolean(true);
@@ -956,12 +1076,159 @@ public class TestStreaming {
     // Create partition
     Assert.assertNotNull(connection);
 
+    connection.beginTransaction();
+    connection.write("3,Hello streaming - once again".getBytes());
+    connection.commitTransaction();
+
+    // Ensure partition is present
+    Partition p = msClient.getPartition(dbName, tblName, newPartVals);
+    Assert.assertNotNull("Did not find added partition", p);
+  }
+
+  @Test
+  public void testAddPartitionWithWriteId() throws Exception {
+    List<String> newPartVals = new ArrayList<String>(2);
+    newPartVals.add("WriteId_continent");
+    newPartVals.add("WriteId_country");
+
+    StrictDelimitedInputWriter writerT = StrictDelimitedInputWriter.newBuilder()
+        .withFieldDelimiter(',')
+        .build();
+    HiveStreamingConnection transactionConnection = HiveStreamingConnection.newBuilder()
+        .withDatabase(dbName)
+        .withTable(tblName)
+        .withStaticPartitionValues(newPartVals)
+        .withRecordWriter(writerT)
+        .withHiveConf(conf)
+        .connect();
+    transactionConnection.beginTransaction();
+
+    Table tObject = transactionConnection.getTable();
+    Long writeId = transactionConnection.getCurrentWriteId();
+
+    Assert.assertNotNull(tObject);
+    Assert.assertNotNull(writeId);
+
+    StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
+        .withFieldDelimiter(',')
+        .build();
+    HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
+        .withDatabase(dbName)
+        .withTable(tblName)
+        .withStaticPartitionValues(newPartVals)
+        .withRecordWriter(writer)
+        .withHiveConf(conf)
+        .withWriteId(writeId)
+        .withStatementId(1)
+        .withTableObject(tObject)
+        .connect();
+
+    Assert.assertNotNull(connection);
+
+    connection.beginTransaction();
+    connection.write("3,Hello streaming - once again".getBytes());
+    connection.commitTransaction();
+
+    Set<String> partitions = new HashSet<>(connection.getPartitions());
+
+    connection.close();
+
+    // Ensure partition is not present
+    try {
+      msClient.getPartition(dbName, tblName, newPartVals);
+      Assert.fail("Partition shouldn't exist so a NoSuchObjectException should have been raised");
+    } catch (NoSuchObjectException e) {}
+
+    transactionConnection.commitTransactionWithPartition(partitions);
+
     // Ensure partition is present
-    Partition p = msClient.getPartition(dbName, tblName, partitionVals);
+    Partition p = msClient.getPartition(dbName, tblName, newPartVals);
     Assert.assertNotNull("Did not find added partition", p);
   }
 
   @Test
+  public void testAddDynamicPartitionWithWriteId() throws Exception {
+    queryTable(driver, "drop table if exists default.writeiddynamic");
+    queryTable(driver, "create table default.writeiddynamic (a"
+        + " string, b string) partitioned by (c string, d string)"
+        + " stored as orc TBLPROPERTIES('transactional'='true')");
+
+    StrictDelimitedInputWriter writerT =
+        StrictDelimitedInputWriter.newBuilder().withFieldDelimiter(',').build();
+    HiveStreamingConnection transactionConnection =
+        HiveStreamingConnection.newBuilder().withDatabase("default")
+            .withTable("writeiddynamic").withRecordWriter(writerT)
+            .withHiveConf(conf).connect();
+    transactionConnection.beginTransaction();
+
+    Table tObject = transactionConnection.getTable();
+    Long writeId = transactionConnection.getCurrentWriteId();
+
+    Assert.assertNotNull(tObject);
+    Assert.assertNotNull(writeId);
+
+    StrictDelimitedInputWriter writerOne =
+        StrictDelimitedInputWriter.newBuilder().withFieldDelimiter(',').build();
+    HiveStreamingConnection connectionOne =
+        HiveStreamingConnection.newBuilder().withDatabase("default")
+            .withTable("writeiddynamic").withRecordWriter(writerOne)
+            .withHiveConf(conf).withWriteId(writeId).withStatementId(1)
+            .withTableObject(tObject).connect();
+
+    StrictDelimitedInputWriter writerTwo =
+        StrictDelimitedInputWriter.newBuilder().withFieldDelimiter(',').build();
+    HiveStreamingConnection connectionTwo =
+        HiveStreamingConnection.newBuilder().withDatabase("default")
+            .withTable("writeiddynamic")
+            .withRecordWriter(writerTwo)
+            .withHiveConf(conf).withWriteId(writeId).withStatementId(1)
+            .withTableObject(tObject)
+            .connect();
+
+    Assert.assertNotNull(connectionOne);
+
+    connectionTwo.beginTransaction();
+    connectionOne.beginTransaction();
+    connectionOne.write("1,2,3,4".getBytes());
+    connectionOne.write("1,2,5,6".getBytes());
+    connectionTwo.write("1,2,30,40".getBytes());
+    connectionOne.write("1,2,7,8".getBytes());
+    connectionTwo.write("1,2,50,60".getBytes());
+    connectionOne.write("1,2,9,10".getBytes());
+    connectionOne.commitTransaction();
+    connectionTwo.commitTransaction();
+
+    Set<String> partitionsOne = new HashSet<>(connectionOne.getPartitions());
+    Assert.assertEquals(4, partitionsOne.size());
+
+    Set<String> partitionsTwo = new HashSet<>(connectionTwo.getPartitions());
+    Assert.assertEquals(2, partitionsTwo.size());
+
+    connectionOne.close();
+    connectionTwo.close();
+
+    try {
+      String partitionName = partitionsOne.iterator().next();
+      msClient.getPartition("default", "writeiddynamic", partitionName);
+      Assert.fail(
+          "Partition shouldn't exist so a NoSuchObjectException should have been raised");
+    } catch (NoSuchObjectException e) {
+    }
+
+    partitionsOne.addAll(partitionsTwo);
+    Set<String> allPartitions = partitionsOne;
+    transactionConnection.commitTransactionWithPartition(allPartitions);
+
+    // Ensure the partitions are present
+    for (String partition : allPartitions) {
+      Partition p =
+          msClient.getPartition("default", "writeiddynamic",
+              partition);
+      Assert.assertNotNull("Did not find added partition", p);
+    }
+  }
+
+  @Test
   public void testTransactionBatchEmptyCommit() throws Exception {
     // 1)  to partitioned table
     StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
@@ -977,8 +1244,8 @@ public class TestStreaming {
       .connect();
     connection.beginTransaction();
     connection.commitTransaction();
-    Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED
-      , connection.getCurrentTransactionState());
+    Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED,
+        connection.getCurrentTransactionState());
     connection.close();
 
     // 2) To unpartitioned table
@@ -995,8 +1262,8 @@ public class TestStreaming {
 
     connection.beginTransaction();
     connection.commitTransaction();
-    Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED
-      , connection.getCurrentTransactionState());
+    Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED,
+        connection.getCurrentTransactionState());
     connection.close();
   }
 
@@ -1115,8 +1382,8 @@ public class TestStreaming {
 
     connection.beginTransaction();
     connection.abortTransaction();
-    Assert.assertEquals(HiveStreamingConnection.TxnState.ABORTED
-      , connection.getCurrentTransactionState());
+    Assert.assertEquals(HiveStreamingConnection.TxnState.ABORTED,
+        connection.getCurrentTransactionState());
     connection.close();
 
     // 2) to unpartitioned table
@@ -1133,8 +1400,8 @@ public class TestStreaming {
 
     connection.beginTransaction();
     connection.abortTransaction();
-    Assert.assertEquals(HiveStreamingConnection.TxnState.ABORTED
-      , connection.getCurrentTransactionState());
+    Assert.assertEquals(HiveStreamingConnection.TxnState.ABORTED,
+        connection.getCurrentTransactionState());
     connection.close();
   }
 
@@ -1156,20 +1423,20 @@ public class TestStreaming {
 
     // 1st Txn
     connection.beginTransaction();
-    Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN
-      , connection.getCurrentTransactionState());
+    Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN,
+        connection.getCurrentTransactionState());
     connection.write("1,Hello streaming".getBytes());
     connection.commitTransaction();
 
     checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
 
-    Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED
-      , connection.getCurrentTransactionState());
+    Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED,
+        connection.getCurrentTransactionState());
 
     // 2nd Txn
     connection.beginTransaction();
-    Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN
-      , connection.getCurrentTransactionState());
+    Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN,
+        connection.getCurrentTransactionState());
     connection.write("2,Welcome to streaming".getBytes());
 
     // data should not be visible
@@ -1182,8 +1449,8 @@ public class TestStreaming {
 
     connection.close();
 
-    Assert.assertEquals(HiveStreamingConnection.TxnState.INACTIVE
-      , connection.getCurrentTransactionState());
+    Assert.assertEquals(HiveStreamingConnection.TxnState.INACTIVE,
+        connection.getCurrentTransactionState());
 
 
     // To Unpartitioned table
@@ -1199,13 +1466,13 @@ public class TestStreaming {
       .connect();
     // 1st Txn
     connection.beginTransaction();
-    Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN
-      , connection.getCurrentTransactionState());
+    Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN,
+        connection.getCurrentTransactionState());
     connection.write("1,Hello streaming".getBytes());
     connection.commitTransaction();
 
-    Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED
-      , connection.getCurrentTransactionState());
+    Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED,
+        connection.getCurrentTransactionState());
     connection.close();
   }
 
@@ -1227,20 +1494,20 @@ public class TestStreaming {
 
     // 1st Txn
     connection.beginTransaction();
-    Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN
-      , connection.getCurrentTransactionState());
+    Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN,
+        connection.getCurrentTransactionState());
     connection.write("1,Hello streaming".getBytes());
     connection.commitTransaction();
 
     checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
 
-    Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED
-      , connection.getCurrentTransactionState());
+    Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED,
+        connection.getCurrentTransactionState());
 
     // 2nd Txn
     connection.beginTransaction();
-    Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN
-      , connection.getCurrentTransactionState());
+    Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN,
+        connection.getCurrentTransactionState());
     connection.write("2,Welcome to streaming".getBytes());
 
     // data should not be visible
@@ -1252,8 +1519,8 @@ public class TestStreaming {
       "{2, Welcome to streaming}");
 
     connection.close();
-    Assert.assertEquals(HiveStreamingConnection.TxnState.INACTIVE
-      , connection.getCurrentTransactionState());
+    Assert.assertEquals(HiveStreamingConnection.TxnState.INACTIVE,
+        connection.getCurrentTransactionState());
 
     // To Unpartitioned table
     regex = "([^:]*):(.*)";
@@ -1271,13 +1538,13 @@ public class TestStreaming {
 
     // 1st Txn
     connection.beginTransaction();
-    Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN
-      , connection.getCurrentTransactionState());
+    Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN,
+        connection.getCurrentTransactionState());
     connection.write("1:Hello streaming".getBytes());
     connection.commitTransaction();
 
-    Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED
-      , connection.getCurrentTransactionState());
+    Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED,
+        connection.getCurrentTransactionState());
     connection.close();
   }
 
@@ -1328,20 +1595,20 @@ public class TestStreaming {
 
     // 1st Txn
     connection.beginTransaction();
-    Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN
-      , connection.getCurrentTransactionState());
+    Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN,
+        connection.getCurrentTransactionState());
     String rec1 = "{\"id\" : 1, \"msg\": \"Hello streaming\"}";
     connection.write(rec1.getBytes());
     connection.commitTransaction();
 
     checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
 
-    Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED
-      , connection.getCurrentTransactionState());
+    Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED,
+        connection.getCurrentTransactionState());
 
     connection.close();
-    Assert.assertEquals(HiveStreamingConnection.TxnState.INACTIVE
-      , connection.getCurrentTransactionState());
+    Assert.assertEquals(HiveStreamingConnection.TxnState.INACTIVE,
+        connection.getCurrentTransactionState());
 
     List<String> rs = queryTable(driver, "select * from " + dbName + "." + tblName);
     Assert.assertEquals(1, rs.size());
@@ -1399,20 +1666,20 @@ public class TestStreaming {
       connection.beginTransaction();
       Assert.assertEquals(--initialCount, connection.remainingTransactions());
       for (int rec = 0; rec < 2; ++rec) {
-        Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN
-          , connection.getCurrentTransactionState());
+        Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN,
+            connection.getCurrentTransactionState());
         connection.write((batch * rec + ",Hello streaming").getBytes());
       }
       connection.commitTransaction();
-      Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED
-        , connection.getCurrentTransactionState());
+      Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED,
+          connection.getCurrentTransactionState());
       ++batch;
     }
     Assert.assertEquals(0, connection.remainingTransactions());
     connection.close();
 
-    Assert.assertEquals(HiveStreamingConnection.TxnState.INACTIVE
-      , connection.getCurrentTransactionState());
+    Assert.assertEquals(HiveStreamingConnection.TxnState.INACTIVE,
+        connection.getCurrentTransactionState());
 
     connection = HiveStreamingConnection.newBuilder()
       .withDatabase(dbName)
@@ -1430,20 +1697,20 @@ public class TestStreaming {
       connection.beginTransaction();
       Assert.assertEquals(--initialCount, connection.remainingTransactions());
       for (int rec = 0; rec < 2; ++rec) {
-        Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN
-          , connection.getCurrentTransactionState());
+        Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN,
+            connection.getCurrentTransactionState());
         connection.write((batch * rec + ",Hello streaming").getBytes());
       }
       connection.abortTransaction();
-      Assert.assertEquals(HiveStreamingConnection.TxnState.ABORTED
-        , connection.getCurrentTransactionState());
+      Assert.assertEquals(HiveStreamingConnection.TxnState.ABORTED,
+          connection.getCurrentTransactionState());
       ++batch;
     }
     Assert.assertEquals(0, connection.remainingTransactions());
     connection.close();
 
-    Assert.assertEquals(HiveStreamingConnection.TxnState.INACTIVE
-      , connection.getCurrentTransactionState());
+    Assert.assertEquals(HiveStreamingConnection.TxnState.INACTIVE,
+        connection.getCurrentTransactionState());
   }
 
   @Test
@@ -1468,8 +1735,8 @@ public class TestStreaming {
 
     checkNothingWritten(partLoc);
 
-    Assert.assertEquals(HiveStreamingConnection.TxnState.ABORTED
-      , connection.getCurrentTransactionState());
+    Assert.assertEquals(HiveStreamingConnection.TxnState.ABORTED,
+        connection.getCurrentTransactionState());
 
     connection.close();
 
@@ -1507,8 +1774,8 @@ public class TestStreaming {
 
     checkNothingWritten(partLoc);
 
-    Assert.assertEquals(HiveStreamingConnection.TxnState.ABORTED
-      , connection.getCurrentTransactionState());
+    Assert.assertEquals(HiveStreamingConnection.TxnState.ABORTED,
+        connection.getCurrentTransactionState());
 
     connection.beginTransaction();
     connection.write("1,Hello streaming".getBytes());
@@ -1577,8 +1844,8 @@ public class TestStreaming {
       "2\tWelcome to streaming", "3\tHello streaming - once again",
       "4\tWelcome to streaming - once again");
 
-    Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED
-      , connection.getCurrentTransactionState());
+    Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED,
+        connection.getCurrentTransactionState());
 
     connection.close();
   }
@@ -1690,10 +1957,10 @@ public class TestStreaming {
       "3\tHello streaming - once again",
       "4\tWelcome to streaming - once again");
 
-    Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED
-      , connection.getCurrentTransactionState());
-    Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED
-      , connection2.getCurrentTransactionState());
+    Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED,
+        connection.getCurrentTransactionState());
+    Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED,
+        connection2.getCurrentTransactionState());
 
     connection.close();
     connection2.close();
@@ -2511,7 +2778,8 @@ public class TestStreaming {
   }
 
   @Test
-  public void testErrorHandling() throws Exception {
+  public void testErrorHandling()
+      throws Exception {
     String agentInfo = "UT_" + Thread.currentThread().getName();
     runCmdOnDriver("create database testErrors");
     runCmdOnDriver("use testErrors");
@@ -2538,8 +2806,12 @@ public class TestStreaming {
     GetOpenTxnsInfoResponse r = msClient.showTxns();
     Assert.assertEquals("HWM didn'table match", 17, r.getTxn_high_water_mark());
     List<TxnInfo> ti = r.getOpen_txns();
-    Assert.assertEquals("wrong status ti(0)", TxnState.ABORTED, ti.get(0).getState());
-    Assert.assertEquals("wrong status ti(1)", TxnState.ABORTED, ti.get(1).getState());
+    Assert.assertEquals("wrong status ti(0)",
+        org.apache.hadoop.hive.metastore.api.TxnState.ABORTED,
+        ti.get(0).getState());
+    Assert.assertEquals("wrong status ti(1)",
+        org.apache.hadoop.hive.metastore.api.TxnState.ABORTED,
+        ti.get(1).getState());
 
 
     try {
@@ -2621,10 +2893,16 @@ public class TestStreaming {
     r = msClient.showTxns();
     Assert.assertEquals("HWM didn't match", 19, r.getTxn_high_water_mark());
     ti = r.getOpen_txns();
-    Assert.assertEquals("wrong status ti(0)", TxnState.ABORTED, ti.get(0).getState());
-    Assert.assertEquals("wrong status ti(1)", TxnState.ABORTED, ti.get(1).getState());
+    Assert.assertEquals("wrong status ti(0)",
+        org.apache.hadoop.hive.metastore.api.TxnState.ABORTED,
+        ti.get(0).getState());
+    Assert.assertEquals("wrong status ti(1)",
+        org.apache.hadoop.hive.metastore.api.TxnState.ABORTED,
+        ti.get(1).getState());
     //txnid 3 was committed and thus not open
-    Assert.assertEquals("wrong status ti(2)", TxnState.ABORTED, ti.get(2).getState());
+    Assert.assertEquals("wrong status ti(2)",
+        org.apache.hadoop.hive.metastore.api.TxnState.ABORTED,
+        ti.get(2).getState());
     connection.close();
 
     writer.disableErrors();
@@ -2651,8 +2929,12 @@ public class TestStreaming {
     r = msClient.showTxns();
     Assert.assertEquals("HWM didn'table match", 21, r.getTxn_high_water_mark());
     ti = r.getOpen_txns();
-    Assert.assertEquals("wrong status ti(3)", TxnState.ABORTED, ti.get(3).getState());
-    Assert.assertEquals("wrong status ti(4)", TxnState.ABORTED, ti.get(4).getState());
+    Assert.assertEquals("wrong status ti(3)",
+        org.apache.hadoop.hive.metastore.api.TxnState.ABORTED,
+        ti.get(3).getState());
+    Assert.assertEquals("wrong status ti(4)",
+        org.apache.hadoop.hive.metastore.api.TxnState.ABORTED,
+        ti.get(4).getState());
   }
 
   // assumes un partitioned table
@@ -2953,5 +3235,12 @@ public class TestStreaming {
     void disableErrors() {
       shouldThrow = false;
     }
+
+    @Override
+    public Path getDeltaFileLocation(List<String> partitionValues,
+        Integer bucketId, Long minWriteId, Long maxWriteId, Integer statementId,
+        Table table) throws StreamingException {
+      return null;
+    }
   }
 }


[2/2] hive git commit: HIVE-20291: Allow HiveStreamingConnection to receive a WriteId (Jaume Marhuenda reviewed by Prasanth Jayachandran)

Posted by pr...@apache.org.
HIVE-20291: Allow HiveStreamingConnection to receive a WriteId (Jaume Marhuenda reviewed by Prasanth Jayachandran)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/bdbd3bcf
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/bdbd3bcf
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/bdbd3bcf

Branch: refs/heads/master
Commit: bdbd3bcffac9f7fe1d3babb45eb40547b1499bb5
Parents: 7c4d48e
Author: Jaume Marhuenda <ja...@gmail.com>
Authored: Thu Oct 11 16:22:22 2018 -0700
Committer: Prasanth Jayachandran <pr...@apache.org>
Committed: Thu Oct 11 16:23:34 2018 -0700

----------------------------------------------------------------------
 .../hive/streaming/AbstractRecordWriter.java    |  65 +-
 .../streaming/AbstractStreamingTransaction.java | 156 +++++
 .../apache/hive/streaming/ConnectionStats.java  |  38 +-
 .../hive/streaming/HiveStreamingConnection.java | 697 ++++++-------------
 .../hive/streaming/InvalidTransactionState.java |   5 +-
 .../apache/hive/streaming/PartitionInfo.java    |   1 +
 .../org/apache/hive/streaming/RecordWriter.java |  38 +-
 .../hive/streaming/StreamingConnection.java     |  43 ++
 .../hive/streaming/StreamingTransaction.java    | 113 +++
 .../apache/hive/streaming/TransactionBatch.java | 430 ++++++++++++
 .../apache/hive/streaming/TransactionError.java |   7 +-
 .../streaming/UnManagedSingleTransaction.java   | 135 ++++
 .../org/apache/hive/streaming/package-info.java |  22 +
 .../java/org/apache/hive/streaming/package.html |   3 +-
 .../apache/hive/streaming/TestStreaming.java    | 433 ++++++++++--
 15 files changed, 1616 insertions(+), 570 deletions(-)
----------------------------------------------------------------------
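Not part of the patch, but as a quick orientation: the sketch below shows how a client might use the new builder options (withWriteId, withStatementId, withTableObject) introduced by this change, following the pattern exercised in TestStreaming.testConnectionWithWriteId. The HiveConf instance (conf) and the database/table names are placeholders.

// Illustrative sketch only. "conf" and the database/table names are assumed.
// The controlling connection owns the transaction and allocates the write id.
HiveStreamingConnection controller = HiveStreamingConnection.newBuilder()
    .withDatabase("default")
    .withTable("alerts")
    .withRecordWriter(StrictDelimitedInputWriter.newBuilder()
        .withFieldDelimiter(',').build())
    .withHiveConf(conf)
    .connect();
controller.beginTransaction();
Table tableObject = controller.getTable();
long writeId = controller.getCurrentWriteId();

// A secondary connection joins the same write id. It does not manage the
// transaction itself, so the table object has to be handed to it directly.
HiveStreamingConnection secondary = HiveStreamingConnection.newBuilder()
    .withDatabase("default")
    .withTable("alerts")
    .withRecordWriter(StrictDelimitedInputWriter.newBuilder()
        .withFieldDelimiter(',').build())
    .withHiveConf(conf)
    .withWriteId(writeId)
    .withStatementId(1)
    .withTableObject(tableObject)
    .connect();

secondary.beginTransaction();
secondary.write("a1,b2".getBytes());
secondary.commitTransaction();   // leaves the transaction PREPARED_FOR_COMMIT
Set<String> partitions = new HashSet<>(secondary.getPartitions());
secondary.close();

// Rows become visible only once the controlling transaction commits, optionally
// registering any partitions created by the secondary writers.
controller.commitTransactionWithPartition(partitions);
controller.close();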


http://git-wip-us.apache.org/repos/asf/hive/blob/bdbd3bcf/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java b/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
index 9e90d36..88a7d82 100644
--- a/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
+++ b/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
@@ -46,9 +46,11 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.llap.LlapUtil;
 import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.RecordUpdater;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.serde2.AbstractSerDe;
@@ -66,6 +68,7 @@ public abstract class AbstractRecordWriter implements RecordWriter {
   private static final Logger LOG = LoggerFactory.getLogger(AbstractRecordWriter.class.getName());
 
   private static final String DEFAULT_LINE_DELIMITER_PATTERN = "[\r\n]";
+  private Integer statementId;
   protected HiveConf conf;
   protected StreamingConnection conn;
   protected Table table;
@@ -128,13 +131,21 @@ public abstract class AbstractRecordWriter implements RecordWriter {
   }
 
   @Override
-  public void init(StreamingConnection conn, long minWriteId, long maxWriteId) throws StreamingException {
+  public void init(StreamingConnection conn, long minWriteId, long maxWriteId)
+      throws StreamingException {
+    init(conn, minWriteId, maxWriteId, -1);
+  }
+
+  @Override
+  public void init(StreamingConnection conn, long minWriteId, long maxWriteId,
+      int statementId) throws StreamingException {
     if (conn == null) {
       throw new StreamingException("Streaming connection cannot be null during record writer initialization");
     }
     this.conn = conn;
     this.curBatchMinWriteId = minWriteId;
     this.curBatchMaxWriteId = maxWriteId;
+    this.statementId = statementId;
     this.conf = conn.getHiveConf();
     this.defaultPartitionName = conf.getVar(HiveConf.ConfVars.DEFAULTPARTITIONNAME);
     this.table = conn.getTable();
@@ -431,6 +442,7 @@ public abstract class AbstractRecordWriter implements RecordWriter {
       int bucket = getBucket(encodedRow);
       List<String> partitionValues = getPartitionValues(encodedRow);
       getRecordUpdater(partitionValues, bucket).insert(writeId, encodedRow);
+
       // ingest size bytes gets resetted on flush() whereas connection stats is not
       conn.getConnectionStats().incrementRecordsWritten();
       conn.getConnectionStats().incrementRecordsSize(record.length);
@@ -492,10 +504,53 @@ public abstract class AbstractRecordWriter implements RecordWriter {
         .tableProperties(tblProperties)
         .minimumWriteId(minWriteId)
         .maximumWriteId(maxWriteID)
-        .statementId(-1)
+        .statementId(statementId)
         .finalDestination(partitionPath));
   }
 
+  /**
+   * Returns the file that would be used to store rows under these
+   * parameters.
+   * @param partitionValues partition values
+   * @param bucketId bucket id
+   * @param minWriteId min write Id
+   * @param maxWriteId max write Id
+   * @param statementId statement Id
+   * @param table table
+   * @return the location of the file.
+   * @throws StreamingException when the path is not found
+   */
+  @Override
+  public Path getDeltaFileLocation(List<String> partitionValues,
+      Integer bucketId, Long minWriteId, Long maxWriteId, Integer statementId,
+      Table table) throws StreamingException {
+    Path destLocation;
+    if (partitionValues == null) {
+      destLocation = new Path(table.getSd().getLocation());
+    } else {
+      Map<String, String> partSpec = Warehouse.makeSpecFromValues(
+          table.getPartitionKeys(), partitionValues);
+      try {
+        destLocation = new Path(table.getDataLocation(), Warehouse.makePartPath(partSpec));
+      } catch (MetaException e) {
+        throw new StreamingException("Unable to retrieve the delta file location"
+            + " for values: " + partitionValues
+            + ", minWriteId: " + minWriteId
+            + ", maxWriteId: " + maxWriteId
+            + ", statementId: " + statementId, e);
+      }
+    }
+    AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf)
+        .filesystem(fs)
+        .inspector(outputRowObjectInspector)
+        .bucket(bucketId)
+        .minimumWriteId(minWriteId)
+        .maximumWriteId(maxWriteId)
+        .statementId(statementId)
+        .finalDestination(destLocation);
+    return AcidUtils.createFilename(destLocation, options);
+  }
+
   protected RecordUpdater getRecordUpdater(List<String> partitionValues, int bucketId) throws StreamingIOFailure {
     RecordUpdater recordUpdater;
     String key;
@@ -516,12 +571,10 @@ public abstract class AbstractRecordWriter implements RecordWriter {
           // partitions to TxnHandler
           if (!partitionInfo.isExists()) {
             addedPartitions.add(partitionInfo.getName());
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Created partition {} for table {}", partitionInfo.getName(), fullyQualifiedTableName);
-            }
           } else {
             if (LOG.isDebugEnabled()) {
-              LOG.debug("Partition {} already exists for table {}", partitionInfo.getName(), fullyQualifiedTableName);
+              LOG.debug("Partition {} already exists for table {}",
+                  partitionInfo.getName(), fullyQualifiedTableName);
             }
           }
           destLocation = new Path(partitionInfo.getPartitionLocation());

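As a usage note (illustrative, not part of the patch): the same functionality is exposed on StreamingConnection as getDeltaFileLocation(partitionValues, bucketId, minWriteId, maxWriteId, statementId) further down, so a client can predict where a row would be written, as TestStreaming.testGetDeltaPath does. A minimal sketch, assuming an existing connection and partition values:

// Sketch only: "connection" and "partitionVals" are assumed to exist already.
Path deltaFile = connection.getDeltaFileLocation(
    partitionVals,  // partition values, or null for an unpartitioned table
    0,              // bucket id
    5L,             // min write id
    5L,             // max write id
    9);             // statement id
// For a partitioned table this resolves to a path ending in something like
// .../continent=Asia/country=India/delta_0000005_0000005_0009/bucket_00000
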
http://git-wip-us.apache.org/repos/asf/hive/blob/bdbd3bcf/streaming/src/java/org/apache/hive/streaming/AbstractStreamingTransaction.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/AbstractStreamingTransaction.java b/streaming/src/java/org/apache/hive/streaming/AbstractStreamingTransaction.java
new file mode 100644
index 0000000..a99fdba
--- /dev/null
+++ b/streaming/src/java/org/apache/hive/streaming/AbstractStreamingTransaction.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
+
+import java.io.InputStream;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Common methods for the implementing classes.
+ */
+abstract class AbstractStreamingTransaction
+    implements StreamingTransaction {
+
+  /**
+   * This variable should be initialized by the children.
+   */
+  protected RecordWriter recordWriter;
+
+  /**
+   * This variable should be initialized by the children.
+   */
+  protected List<TxnToWriteId> txnToWriteIds;
+
+
+  /**
+   * Once any operation on this batch encounters a system exception
+   * (e.g. IOException on write), it's safest to assume that we can't write to the
+   * file backing this batch any more. This guards the important public methods.
+   */
+  protected final AtomicBoolean isTxnClosed = new AtomicBoolean(false);
+
+  protected int currentTxnIndex = -1;
+  protected HiveStreamingConnection.TxnState state;
+
+
+  protected void checkIsClosed() throws StreamingException {
+    if (isTxnClosed.get()) {
+      throw new StreamingException("Transaction " + toString() + " is closed()");
+    }
+  }
+
+  protected void beginNextTransactionImpl(String errorMessage)
+      throws StreamingException {
+    state = HiveStreamingConnection.TxnState.INACTIVE; //clear state from previous txn
+    if ((currentTxnIndex + 1) >= txnToWriteIds.size()) {
+      throw new InvalidTransactionState(errorMessage);
+    }
+    currentTxnIndex++;
+    state = HiveStreamingConnection.TxnState.OPEN;
+  }
+
+  public void write(final byte[] record) throws StreamingException {
+    checkIsClosed();
+    boolean success = false;
+    try {
+      recordWriter.write(getCurrentWriteId(), record);
+      success = true;
+    } catch (SerializationError ex) {
+      //this exception indicates that a {@code record} could not be parsed and the
+      //caller can decide whether to drop it or send it to dead letter queue.
+      //rolling back the txn and retrying won't help since the tuple will be exactly the same
+      //when it's replayed.
+      success = true;
+      throw ex;
+    } finally {
+      markDead(success);
+    }
+  }
+
+  public void write(final InputStream inputStream) throws StreamingException {
+    checkIsClosed();
+    boolean success = false;
+    try {
+      recordWriter.write(getCurrentWriteId(), inputStream);
+      success = true;
+    } catch (SerializationError ex) {
+      //this exception indicates that a {@code record} could not be parsed and the
+      //caller can decide whether to drop it or send it to dead letter queue.
+      //rolling back the txn and retrying won't help since the tuple will be exactly the same
+      //when it's replayed.
+      success = true;
+      throw ex;
+    } finally {
+      markDead(success);
+    }
+  }
+
+  /**
+   * A transaction batch opens a single HDFS file and writes multiple transactions to it.  If there is any issue
+   * with the write, we can't continue to write to the same file as it may be corrupted now (at the tail).
+   * This ensures that a client can't ignore these failures and continue to write.
+   */
+  protected void markDead(boolean success) throws StreamingException {
+    if (success) {
+      return;
+    }
+    close();
+  }
+
+  public long getCurrentWriteId() {
+    if (currentTxnIndex >= 0) {
+      return txnToWriteIds.get(currentTxnIndex).getWriteId();
+    }
+    return -1L;
+  }
+
+  public int remainingTransactions() {
+    if (currentTxnIndex >= 0) {
+      return txnToWriteIds.size() - currentTxnIndex - 1;
+    }
+    return txnToWriteIds.size();
+  }
+
+  public boolean isClosed() {
+    return isTxnClosed.get();
+  }
+
+  public HiveStreamingConnection.TxnState getCurrentTransactionState() {
+    return state;
+  }
+
+  public long getCurrentTxnId() {
+    if (currentTxnIndex >= 0) {
+      return txnToWriteIds.get(currentTxnIndex).getTxnId();
+    }
+    return -1L;
+  }
+
+  @Override
+  public List<TxnToWriteId> getTxnToWriteIds() {
+    return txnToWriteIds;
+  }
+
+  public void commit() throws StreamingException {
+    commitWithPartitions(null);
+  }
+}

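One note on the write() contract above (illustration only, not part of the patch): SerializationError is rethrown without closing the transaction, so the caller decides what to do with a record that cannot be parsed. A minimal sketch of that caller-side handling, where deadLetterQueue is a hypothetical application-side sink:

try {
  connection.write(record);  // "connection" and "record" are assumed to exist
} catch (SerializationError e) {
  // The record could not be parsed; the transaction is still usable, so the
  // application can drop the record or route it to a dead letter queue.
  deadLetterQueue.add(record);  // hypothetical sink, not part of the API
}
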
http://git-wip-us.apache.org/repos/asf/hive/blob/bdbd3bcf/streaming/src/java/org/apache/hive/streaming/ConnectionStats.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/ConnectionStats.java b/streaming/src/java/org/apache/hive/streaming/ConnectionStats.java
index 355456e..c0e7d5c 100644
--- a/streaming/src/java/org/apache/hive/streaming/ConnectionStats.java
+++ b/streaming/src/java/org/apache/hive/streaming/ConnectionStats.java
@@ -31,6 +31,16 @@ public class ConnectionStats {
   private LongAdder autoFlushCount = new LongAdder();
   private LongAdder metastoreCalls = new LongAdder();
 
+  /**
+   * Total partitions that have been affected.
+   */
+  private LongAdder totalPartitions = new LongAdder();
+
+  /**
+   * Number of partitions that were created.
+   */
+  private LongAdder createdPartitions = new LongAdder();
+
   public void incrementRecordsWritten() {
     recordsWritten.increment();
   }
@@ -55,6 +65,22 @@ public class ConnectionStats {
     recordsSize.add(delta);
   }
 
+  /**
+   * Increment by delta the number of created partitions.
+   * @param delta to increment by.
+   */
+  public void incrementCreatedPartitions(long delta) {
+    createdPartitions.add(delta);
+  }
+
+  /**
+   * Increment by delta the total partitions.
+   * @param delta to increment by.
+   */
+  public void incrementTotalPartitions(long delta) {
+    totalPartitions.add(delta);
+  }
+
   public long getRecordsWritten() {
     return recordsWritten.longValue();
   }
@@ -79,10 +105,20 @@ public class ConnectionStats {
     return metastoreCalls.longValue();
   }
 
+  public LongAdder getTotalPartitions() {
+    return totalPartitions;
+  }
+
+  public LongAdder getCreatedPartitions() {
+    return createdPartitions;
+  }
+
   @Override
   public String toString() {
     return "{records-written: " + recordsWritten + ", records-size: "+ recordsSize + ", committed-transactions: " +
       committedTransactions + ", aborted-transactions: " + abortedTransactions + ", auto-flushes: " + autoFlushCount +
-      ", metastore-calls: " + metastoreCalls + " }";
+      ", metastore-calls: " + metastoreCalls
+        + ", created-partitions: " + createdPartitions
+        + ", total-partitions: " + totalPartitions + " }";
   }
 }

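A small usage note (not part of the patch): the new partition counters are reachable through the connection's stats object and also appear in ConnectionStats.toString(). A sketch, assuming an existing connection:

ConnectionStats stats = connection.getConnectionStats();
// Both counters are LongAdders; read them after commit for a consistent view.
long created = stats.getCreatedPartitions().longValue();
long total = stats.getTotalPartitions().longValue();
System.out.println("created-partitions: " + created
    + ", total-partitions: " + total);
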
http://git-wip-us.apache.org/repos/asf/hive/blob/bdbd3bcf/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java b/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java
index 6cf14b0..f79b844 100644
--- a/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java
+++ b/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java
@@ -23,43 +23,27 @@ import java.io.InputStream;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
-import org.apache.hadoop.hive.metastore.LockComponentBuilder;
-import org.apache.hadoop.hive.metastore.LockRequestBuilder;
 import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
-import org.apache.hadoop.hive.metastore.api.DataOperationType;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeResponse;
-import org.apache.hadoop.hive.metastore.api.LockRequest;
-import org.apache.hadoop.hive.metastore.api.LockResponse;
-import org.apache.hadoop.hive.metastore.api.LockState;
 import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
 import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
 import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
-import org.apache.hadoop.hive.ql.lockmgr.LockException;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Table;
@@ -71,7 +55,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 /**
  * Streaming connection implementation for hive. To create a streaming connection, use the builder API
@@ -120,11 +103,11 @@ public class HiveStreamingConnection implements StreamingConnection {
 
   private static final String DEFAULT_METASTORE_URI = "thrift://localhost:9083";
   private static final int DEFAULT_TRANSACTION_BATCH_SIZE = 1;
-  private static final int DEFAULT_HEARTBEAT_INTERVAL = 60 * 1000;
   private static final boolean DEFAULT_STREAMING_OPTIMIZATIONS_ENABLED = true;
 
   public enum TxnState {
-    INACTIVE("I"), OPEN("O"), COMMITTED("C"), ABORTED("A");
+    INACTIVE("I"), OPEN("O"), COMMITTED("C"), ABORTED("A"),
+    PREPARED_FOR_COMMIT("P");
 
     private final String code;
 
@@ -144,7 +127,7 @@ public class HiveStreamingConnection implements StreamingConnection {
   private String agentInfo;
   private int transactionBatchSize;
   private RecordWriter recordWriter;
-  private TransactionBatch currentTransactionBatch;
+  private StreamingTransaction currentTransactionBatch;
   private HiveConf conf;
   private boolean streamingOptimizations;
   private AtomicBoolean isConnectionClosed = new AtomicBoolean(false);
@@ -158,6 +141,11 @@ public class HiveStreamingConnection implements StreamingConnection {
   private Table tableObject = null;
   private String metastoreUri;
   private ConnectionStats connectionStats;
+  private final Long writeId;
+  private final Integer statementId;
+  private boolean manageTransactions;
+  private int countTransactions = 0;
+  private Set<String> partitions;
 
   private HiveStreamingConnection(Builder builder) throws StreamingException {
     this.database = builder.database.toLowerCase();
@@ -166,6 +154,12 @@ public class HiveStreamingConnection implements StreamingConnection {
     this.conf = builder.hiveConf;
     this.agentInfo = builder.agentInfo;
     this.streamingOptimizations = builder.streamingOptimizations;
+    this.writeId = builder.writeId;
+    this.statementId = builder.statementId;
+    this.tableObject = builder.tableObject;
+    this.setPartitionedTable(builder.isPartitioned);
+    this.manageTransactions = builder.manageTransactions;
+
     UserGroupInformation loggedInUser = null;
     try {
       loggedInUser = UserGroupInformation.getLoginUser();
@@ -193,13 +187,18 @@ public class HiveStreamingConnection implements StreamingConnection {
     if (conf == null) {
       conf = createHiveConf(this.getClass(), DEFAULT_METASTORE_URI);
     }
+
     overrideConfSettings(conf);
-    this.metastoreUri = conf.get(MetastoreConf.ConfVars.THRIFT_URIS.getHiveName());
-    this.msClient = getMetaStoreClient(conf, metastoreUri, secureMode, "streaming-connection");
-    // We use a separate metastore client for heartbeat calls to ensure heartbeat RPC calls are
-    // isolated from the other transaction related RPC calls.
-    this.heartbeatMSClient = getMetaStoreClient(conf, metastoreUri, secureMode, "streaming-connection-heartbeat");
-    validateTable();
+    if (manageTransactions) {
+      this.metastoreUri = conf.get(MetastoreConf.ConfVars.THRIFT_URIS.getHiveName());
+      this.msClient = getMetaStoreClient(conf, metastoreUri, secureMode,
+          "streaming-connection");
+      // We use a separate metastore client for heartbeat calls to ensure heartbeat RPC calls are
+      // isolated from the other transaction related RPC calls.
+      this.heartbeatMSClient = getMetaStoreClient(conf, metastoreUri, secureMode,
+          "streaming-connection-heartbeat");
+      validateTable();
+    }
 
     LOG.info("STREAMING CONNECTION INFO: {}", toConnectionInfoString());
   }
@@ -217,6 +216,11 @@ public class HiveStreamingConnection implements StreamingConnection {
     private int transactionBatchSize = DEFAULT_TRANSACTION_BATCH_SIZE;
     private boolean streamingOptimizations = DEFAULT_STREAMING_OPTIMIZATIONS_ENABLED;
     private RecordWriter recordWriter;
+    private long writeId = -1;
+    private int statementId = -1;
+    private boolean manageTransactions = true;
+    private Table tableObject;
+    private boolean isPartitioned;
 
     /**
      * Specify database to use for streaming connection.
@@ -315,6 +319,44 @@ public class HiveStreamingConnection implements StreamingConnection {
     }
 
     /**
+     * Specify this parameter if we want the current connection
+     * to join an ongoing transaction without having to query
+     * the metastore to create it.
+     * @param writeId write id
+     * @return builder
+     */
+    public Builder withWriteId(final long writeId) {
+      this.writeId = writeId;
+      manageTransactions = false;
+      return this;
+    }
+
+    /**
+     * Specify this parameter to set a statement id in the writer.
+     * This really only makes sense when a writeId is
+     * provided as well.
+     * @param statementId statement id
+     * @return builder
+     */
+    public Builder withStatementId(final int statementId) {
+      this.statementId = statementId;
+      return this;
+    }
+
+    /**
+     * Specify the table object since sometimes no connections
+     * to the metastore will be opened.
+     * @param table table object.
+     * @return builder
+     */
+    public Builder withTableObject(Table table) {
+      this.tableObject = table;
+      this.isPartitioned = tableObject.getPartitionKeys() != null
+          && !tableObject.getPartitionKeys().isEmpty();
+      return this;
+    }
+
+    /**
      * Returning a streaming connection to hive.
      *
      * @return - hive streaming connection
@@ -324,11 +366,27 @@ public class HiveStreamingConnection implements StreamingConnection {
         throw new StreamingException("Database cannot be null for streaming connection");
       }
       if (table == null) {
-        throw new StreamingException("Table cannot be null for streaming connection");
+        if (tableObject == null) {
+          throw new StreamingException("Table and table object cannot be "
+              + "null for streaming connection");
+        } else {
+          table = tableObject.getTableName();
+        }
+      }
+
+      if (tableObject != null && !tableObject.getTableName().equals(table)) {
+        throw new StreamingException("Table must match tableObject table name");
       }
+
       if (recordWriter == null) {
         throw new StreamingException("Record writer cannot be null for streaming connection");
       }
+      if ((writeId != -1 && tableObject == null) ||
+          (writeId == -1 && tableObject != null)){
+        throw new StreamingException("If writeId is set, tableObject "
+            + "must be set as well and vice versa");
+      }
+
       HiveStreamingConnection streamingConnection = new HiveStreamingConnection(this);
       // assigning higher priority than FileSystem shutdown hook so that streaming connection gets closed first before
       // filesystem close (to avoid ClosedChannelException)
@@ -338,7 +396,7 @@ public class HiveStreamingConnection implements StreamingConnection {
     }
   }
 
-  private void setPartitionedTable(boolean isPartitionedTable) {
+  private void setPartitionedTable(Boolean isPartitionedTable) {
     this.isPartitionedTable = isPartitionedTable;
   }
 
@@ -356,7 +414,9 @@ public class HiveStreamingConnection implements StreamingConnection {
       "username: " + username + ", " +
       "secure-mode: " + secureMode + ", " +
       "record-writer: " + recordWriter.getClass().getSimpleName() + ", " +
-      "agent-info: " + agentInfo + " }";
+      "agent-info: " + agentInfo + ", " +
+      "writeId: " + writeId +  ", " +
+      "statementId: " + statementId + " }";
   }
 
   @VisibleForTesting
@@ -369,6 +429,7 @@ public class HiveStreamingConnection implements StreamingConnection {
     String partLocation = null;
     String partName = null;
     boolean exists = false;
+
     try {
       Map<String, String> partSpec = Warehouse.makeSpecFromValues(tableObject.getPartitionKeys(), partitionValues);
       AddPartitionDesc addPartitionDesc = new AddPartitionDesc(database, table, true);
@@ -376,7 +437,18 @@ public class HiveStreamingConnection implements StreamingConnection {
       partLocation = new Path(tableObject.getDataLocation(), Warehouse.makePartPath(partSpec)).toString();
       addPartitionDesc.addPartition(partSpec, partLocation);
       Partition partition = Hive.convertAddSpecToMetaPartition(tableObject, addPartitionDesc.getPartition(0), conf);
+
+      if (getMSC() == null) {
+        // We assume it doesn't exist if we can't check it
+        // so the driver will decide
+        return new PartitionInfo(partName, partLocation, false);
+      }
+
       getMSC().add_partition(partition);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Created partition {} for table {}", partName,
+            tableObject.getFullyQualifiedName());
+      }
     } catch (AlreadyExistsException e) {
       exists = true;
     } catch (HiveException | TException e) {
@@ -386,6 +458,25 @@ public class HiveStreamingConnection implements StreamingConnection {
     return new PartitionInfo(partName, partLocation, exists);
   }
 
+  /**
+   * Returns the file that would be used to store rows under these
+   * parameters.
+   * @param partitionValues partition values
+   * @param bucketId bucket id
+   * @param minWriteId min write Id
+   * @param maxWriteId max write Id
+   * @param statementId statement Id
+   * @return the location of the file.
+   * @throws StreamingException when the path is not found
+   */
+  @Override
+  public Path getDeltaFileLocation(List<String> partitionValues,
+      Integer bucketId, Long minWriteId, Long maxWriteId, Integer statementId)
+      throws StreamingException {
+    return recordWriter.getDeltaFileLocation(partitionValues,
+        bucketId, minWriteId, maxWriteId, statementId, tableObject);
+  }
+
   IMetaStoreClient getMSC() {
     connectionStats.incrementMetastoreCalls();
     return msClient;
@@ -424,43 +515,6 @@ public class HiveStreamingConnection implements StreamingConnection {
     }
   }
 
-  private static class HeartbeatRunnable implements Runnable {
-    private final HiveStreamingConnection conn;
-    private final AtomicLong minTxnId;
-    private final long maxTxnId;
-    private final ReentrantLock transactionLock;
-    private final AtomicBoolean isTxnClosed;
-
-    HeartbeatRunnable(final HiveStreamingConnection conn, final AtomicLong minTxnId, final long maxTxnId,
-      final ReentrantLock transactionLock, final AtomicBoolean isTxnClosed) {
-      this.conn = conn;
-      this.minTxnId = minTxnId;
-      this.maxTxnId = maxTxnId;
-      this.transactionLock = transactionLock;
-      this.isTxnClosed = isTxnClosed;
-    }
-
-    @Override
-    public void run() {
-      transactionLock.lock();
-      try {
-        if (minTxnId.get() > 0) {
-          HeartbeatTxnRangeResponse resp = conn.getHeatbeatMSC().heartbeatTxnRange(minTxnId.get(), maxTxnId);
-          if (!resp.getAborted().isEmpty() || !resp.getNosuch().isEmpty()) {
-            LOG.error("Heartbeat failure: {}", resp.toString());
-            isTxnClosed.set(true);
-          } else {
-            LOG.info("Heartbeat sent for range: [{}-{}]", minTxnId.get(), maxTxnId);
-          }
-        }
-      } catch (TException e) {
-        LOG.warn("Failure to heartbeat for transaction range: [" + minTxnId.get() + "-" + maxTxnId + "]", e);
-      } finally {
-        transactionLock.unlock();
-      }
-    }
-  }
-
   private void beginNextTransaction() throws StreamingException {
     if (currentTransactionBatch == null) {
       currentTransactionBatch = createNewTransactionBatch();
@@ -481,8 +535,18 @@ public class HiveStreamingConnection implements StreamingConnection {
     currentTransactionBatch.beginNextTransaction();
   }
 
-  private TransactionBatch createNewTransactionBatch() throws StreamingException {
-    return new TransactionBatch(this);
+  private StreamingTransaction createNewTransactionBatch() throws StreamingException {
+    countTransactions++;
+    if (manageTransactions) {
+      return new TransactionBatch(this);
+    } else {
+      if (countTransactions > 1) {
+        throw new StreamingException("If a writeId is passed for the "
+            + "construction of HiveStreaming only one transaction batch"
+            + " can be done");
+      }
+      return new UnManagedSingleTransaction(this);
+    }
   }
 
   private void checkClosedState() throws StreamingException {
@@ -496,7 +560,7 @@ public class HiveStreamingConnection implements StreamingConnection {
     if (currentTransactionBatch == null) {
       throw new StreamingException("Transaction batch is null. Missing beginTransaction?");
     }
-    if (currentTransactionBatch.state != TxnState.OPEN) {
+    if (currentTransactionBatch.getCurrentTransactionState() != TxnState.OPEN) {
       throw new StreamingException("Transaction state is not OPEN. Missing beginTransaction?");
     }
   }
@@ -504,13 +568,40 @@ public class HiveStreamingConnection implements StreamingConnection {
   @Override
   public void beginTransaction() throws StreamingException {
     checkClosedState();
+    partitions = new HashSet<>();
     beginNextTransaction();
   }
 
   @Override
   public void commitTransaction() throws StreamingException {
+    commitTransactionWithPartition(null);
+  }
+
+  @Override
+  public void commitTransactionWithPartition(Set<String> partitions)
+      throws StreamingException {
     checkState();
-    currentTransactionBatch.commit();
+
+    Set<String> createdPartitions = new HashSet<>();
+    if (partitions != null) {
+      for (String partition: partitions) {
+        try {
+          PartitionInfo info = createPartitionIfNotExists(
+              Warehouse.getPartValuesFromPartName(partition));
+          if (!info.isExists()) {
+            createdPartitions.add(partition);
+          }
+        } catch (MetaException e) {
+          throw new StreamingException("Partition " + partition + " is invalid.", e);
+        }
+      }
+      connectionStats.incrementTotalPartitions(partitions.size());
+    }
+
+    currentTransactionBatch.commitWithPartitions(createdPartitions);
+    this.partitions.addAll(
+        currentTransactionBatch.getPartitions());
+    connectionStats.incrementCreatedPartitions(createdPartitions.size());
     connectionStats.incrementCommittedTransactions();
   }
 
@@ -549,8 +640,10 @@ public class HiveStreamingConnection implements StreamingConnection {
     } catch (StreamingException e) {
       LOG.warn("Unable to close current transaction batch: " + currentTransactionBatch, e);
     } finally {
-      getMSC().close();
-      getHeatbeatMSC().close();
+      if (manageTransactions) {
+        getMSC().close();
+        getHeatbeatMSC().close();
+      }
     }
     if (LOG.isInfoEnabled()) {
       LOG.info("Closed streaming connection. Agent: {} Stats: {}", getAgentInfo(), getConnectionStats());
@@ -595,418 +688,6 @@ public class HiveStreamingConnection implements StreamingConnection {
     return currentTransactionBatch.getCurrentTxnId();
   }
 
-  private static class TransactionBatch {
-    private String username;
-    private HiveStreamingConnection conn;
-    private ScheduledExecutorService scheduledExecutorService;
-    private RecordWriter recordWriter;
-    private String partNameForLock = null;
-    private List<TxnToWriteId> txnToWriteIds;
-    private int currentTxnIndex = -1;
-    private TxnState state;
-    private LockRequest lockRequest = null;
-    // heartbeats can only be sent for open transactions.
-    // there is a race between committing/aborting a transaction and heartbeat.
-    // Example: If a heartbeat is sent for committed txn, exception will be thrown.
-    // Similarly if we don't send a heartbeat, metastore server might abort a txn
-    // for missed heartbeat right before commit txn call.
-    // This lock is used to mutex commit/abort and heartbeat calls
-    private final ReentrantLock transactionLock = new ReentrantLock();
-    // min txn id is incremented linearly within a transaction batch.
-    // keeping minTxnId atomic as it is shared with heartbeat thread
-    private final AtomicLong minTxnId;
-    // max txn id does not change for a transaction batch
-    private final long maxTxnId;
-
-    /**
-     * once any operation on this batch encounters a system exception
-     * (e.g. IOException on write) it's safest to assume that we can't write to the
-     * file backing this batch any more.  This guards important public methods
-     */
-    private final AtomicBoolean isTxnClosed = new AtomicBoolean(false);
-    private String agentInfo;
-    private int numTxns;
-    /**
-     * Tracks the state of each transaction
-     */
-    private TxnState[] txnStatus;
-    /**
-     * ID of the last txn used by {@link #beginNextTransactionImpl()}
-     */
-    private long lastTxnUsed;
-
-    /**
-     * Represents a batch of transactions acquired from MetaStore
-     *
-     * @param conn - hive streaming connection
-     * @throws StreamingException if failed to create new RecordUpdater for batch
-     */
-    private TransactionBatch(HiveStreamingConnection conn) throws StreamingException {
-      boolean success = false;
-      try {
-        if (conn.isPartitionedTable() && !conn.isDynamicPartitioning()) {
-          List<FieldSchema> partKeys = conn.tableObject.getPartitionKeys();
-          partNameForLock = Warehouse.makePartName(partKeys, conn.staticPartitionValues);
-        }
-        this.conn = conn;
-        this.username = conn.username;
-        this.recordWriter = conn.recordWriter;
-        this.agentInfo = conn.agentInfo;
-        this.numTxns = conn.transactionBatchSize;
-
-        setupHeartBeatThread();
-
-        List<Long> txnIds = openTxnImpl(username, numTxns);
-        txnToWriteIds = allocateWriteIdsImpl(txnIds);
-        assert (txnToWriteIds.size() == numTxns);
-
-        txnStatus = new TxnState[numTxns];
-        for (int i = 0; i < txnStatus.length; i++) {
-          assert (txnToWriteIds.get(i).getTxnId() == txnIds.get(i));
-          txnStatus[i] = TxnState.OPEN; //Open matches Metastore state
-        }
-        this.state = TxnState.INACTIVE;
-
-        // initialize record writer with connection and write id info
-        recordWriter.init(conn, txnToWriteIds.get(0).getWriteId(), txnToWriteIds.get(numTxns - 1).getWriteId());
-        this.minTxnId = new AtomicLong(txnIds.get(0));
-        this.maxTxnId = txnIds.get(txnIds.size() - 1);
-        success = true;
-      } catch (TException e) {
-        throw new StreamingException(conn.toString(), e);
-      } finally {
-        //clean up if above throws
-        markDead(success);
-      }
-    }
-
-    private void setupHeartBeatThread() {
-      // start heartbeat thread
-      ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true)
-        .setNameFormat("HiveStreamingConnection-Heartbeat-Thread")
-        .build();
-      this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(threadFactory);
-      long heartBeatInterval;
-      long initialDelay;
-      try {
-        // if HIVE_TXN_TIMEOUT is defined, heartbeat interval will be HIVE_TXN_TIMEOUT/2
-        heartBeatInterval = DbTxnManager.getHeartbeatInterval(conn.conf);
-      } catch (LockException e) {
-        heartBeatInterval = DEFAULT_HEARTBEAT_INTERVAL;
-      }
-      // to introduce some randomness and to avoid hammering the metastore at the same time (same logic as DbTxnManager)
-      initialDelay = (long) (heartBeatInterval * 0.75 * Math.random());
-      LOG.info("Starting heartbeat thread with interval: {} ms initialDelay: {} ms for agentInfo: {}",
-        heartBeatInterval, initialDelay, conn.agentInfo);
-      Runnable runnable = new HeartbeatRunnable(conn, minTxnId, maxTxnId, transactionLock, isTxnClosed);
-      this.scheduledExecutorService.scheduleWithFixedDelay(runnable, initialDelay, heartBeatInterval, TimeUnit
-        .MILLISECONDS);
-    }
-
-    private List<Long> openTxnImpl(final String user, final int numTxns) throws TException {
-      return conn.getMSC().openTxns(user, numTxns).getTxn_ids();
-    }
-
-    private List<TxnToWriteId> allocateWriteIdsImpl(final List<Long> txnIds) throws TException {
-      return conn.getMSC().allocateTableWriteIdsBatch(txnIds, conn.database, conn.table);
-    }
-
-    @Override
-    public String toString() {
-      if (txnToWriteIds == null || txnToWriteIds.isEmpty()) {
-        return "{}";
-      }
-      StringBuilder sb = new StringBuilder(" TxnStatus[");
-      for (TxnState state : txnStatus) {
-        //'state' should not be null - future proofing
-        sb.append(state == null ? "N" : state);
-      }
-      sb.append("] LastUsed ").append(JavaUtils.txnIdToString(lastTxnUsed));
-      return "TxnId/WriteIds=[" + txnToWriteIds.get(0).getTxnId()
-        + "/" + txnToWriteIds.get(0).getWriteId()
-        + "..."
-        + txnToWriteIds.get(txnToWriteIds.size() - 1).getTxnId()
-        + "/" + txnToWriteIds.get(txnToWriteIds.size() - 1).getWriteId()
-        + "] on connection = " + conn + "; " + sb;
-    }
-
-    private void beginNextTransaction() throws StreamingException {
-      checkIsClosed();
-      beginNextTransactionImpl();
-    }
-
-    private void beginNextTransactionImpl() throws TransactionError {
-      state = TxnState.INACTIVE;//clear state from previous txn
-      if ((currentTxnIndex + 1) >= txnToWriteIds.size()) {
-        throw new InvalidTransactionState("No more transactions available in" +
-          " next batch for connection: " + conn + " user: " + username);
-      }
-      currentTxnIndex++;
-      state = TxnState.OPEN;
-      lastTxnUsed = getCurrentTxnId();
-      lockRequest = createLockRequest(conn, partNameForLock, username, getCurrentTxnId(), agentInfo);
-      try {
-        LockResponse res = conn.getMSC().lock(lockRequest);
-        if (res.getState() != LockState.ACQUIRED) {
-          throw new TransactionError("Unable to acquire lock on " + conn);
-        }
-      } catch (TException e) {
-        throw new TransactionError("Unable to acquire lock on " + conn, e);
-      }
-    }
-
-    long getCurrentTxnId() {
-      if (currentTxnIndex >= 0) {
-        return txnToWriteIds.get(currentTxnIndex).getTxnId();
-      }
-      return -1L;
-    }
-
-    long getCurrentWriteId() {
-      if (currentTxnIndex >= 0) {
-        return txnToWriteIds.get(currentTxnIndex).getWriteId();
-      }
-      return -1L;
-    }
-
-    TxnState getCurrentTransactionState() {
-      return state;
-    }
-
-    int remainingTransactions() {
-      if (currentTxnIndex >= 0) {
-        return txnToWriteIds.size() - currentTxnIndex - 1;
-      }
-      return txnToWriteIds.size();
-    }
-
-
-    public void write(final byte[] record) throws StreamingException {
-      checkIsClosed();
-      boolean success = false;
-      try {
-        recordWriter.write(getCurrentWriteId(), record);
-        success = true;
-      } catch (SerializationError ex) {
-        //this exception indicates that a {@code record} could not be parsed and the
-        //caller can decide whether to drop it or send it to dead letter queue.
-        //rolling back the txn and retrying won't help since the tuple will be exactly the same
-        //when it's replayed.
-        success = true;
-        throw ex;
-      } finally {
-        markDead(success);
-      }
-    }
-
-    public void write(final InputStream inputStream) throws StreamingException {
-      checkIsClosed();
-      boolean success = false;
-      try {
-        recordWriter.write(getCurrentWriteId(), inputStream);
-        success = true;
-      } catch (SerializationError ex) {
-        //this exception indicates that a {@code record} could not be parsed and the
-        //caller can decide whether to drop it or send it to dead letter queue.
-        //rolling back the txn and retrying won'table help since the tuple will be exactly the same
-        //when it's replayed.
-        success = true;
-        throw ex;
-      } finally {
-        markDead(success);
-      }
-    }
-
-    private void checkIsClosed() throws StreamingException {
-      if (isTxnClosed.get()) {
-        throw new StreamingException("Transaction" + toString() + " is closed()");
-      }
-    }
-
-    /**
-     * A transaction batch opens a single HDFS file and writes multiple transaction to it.  If there is any issue
-     * with the write, we can't continue to write to the same file any as it may be corrupted now (at the tail).
-     * This ensures that a client can't ignore these failures and continue to write.
-     */
-    private void markDead(boolean success) throws StreamingException {
-      if (success) {
-        return;
-      }
-      close();
-    }
-
-
-    void commit() throws StreamingException {
-      checkIsClosed();
-      boolean success = false;
-      try {
-        commitImpl();
-        success = true;
-      } finally {
-        markDead(success);
-      }
-    }
-
-    private void commitImpl() throws StreamingException {
-      try {
-        recordWriter.flush();
-        TxnToWriteId txnToWriteId = txnToWriteIds.get(currentTxnIndex);
-        if (conn.isDynamicPartitioning()) {
-          List<String> partNames = new ArrayList<>(recordWriter.getPartitions());
-          conn.getMSC().addDynamicPartitions(txnToWriteId.getTxnId(), txnToWriteId.getWriteId(), conn.database, conn.table,
-            partNames, DataOperationType.INSERT);
-        }
-        transactionLock.lock();
-        try {
-          conn.getMSC().commitTxn(txnToWriteId.getTxnId());
-          // increment the min txn id so that heartbeat thread will heartbeat only from the next open transaction.
-          // the current transaction is going to committed or fail, so don't need heartbeat for current transaction.
-          if (currentTxnIndex + 1 < txnToWriteIds.size()) {
-            minTxnId.set(txnToWriteIds.get(currentTxnIndex + 1).getTxnId());
-          } else {
-            // exhausted the batch, no longer have to heartbeat for current txn batch
-            minTxnId.set(-1);
-          }
-        } finally {
-          transactionLock.unlock();
-        }
-        state = TxnState.COMMITTED;
-        txnStatus[currentTxnIndex] = TxnState.COMMITTED;
-      } catch (NoSuchTxnException e) {
-        throw new TransactionError("Invalid transaction id : "
-          + getCurrentTxnId(), e);
-      } catch (TxnAbortedException e) {
-        throw new TransactionError("Aborted transaction cannot be committed"
-          , e);
-      } catch (TException e) {
-        throw new TransactionError("Unable to commitTransaction transaction"
-          + getCurrentTxnId(), e);
-      }
-    }
-
-    void abort() throws StreamingException {
-      if (isTxnClosed.get()) {
-        /*
-         * isDead is only set internally by this class.  {@link #markDead(boolean)} will abort all
-         * remaining txns, so make this no-op to make sure that a well-behaved client that calls abortTransaction()
-         * error doesn't get misleading errors
-         */
-        return;
-      }
-      abort(false);
-    }
-
-    private void abort(final boolean abortAllRemaining) throws StreamingException {
-      abortImpl(abortAllRemaining);
-    }
-
-    private void abortImpl(boolean abortAllRemaining) throws StreamingException {
-      if (minTxnId == null) {
-        return;
-      }
-
-      transactionLock.lock();
-      try {
-        if (abortAllRemaining) {
-          // we are aborting all txns in the current batch, so no need to heartbeat
-          minTxnId.set(-1);
-          //when last txn finished (abortTransaction/commitTransaction) the currentTxnIndex is pointing at that txn
-          //so we need to start from next one, if any.  Also if batch was created but
-          //fetchTransactionBatch() was never called, we want to start with first txn
-          int minOpenTxnIndex = Math.max(currentTxnIndex +
-            (state == TxnState.ABORTED || state == TxnState.COMMITTED ? 1 : 0), 0);
-          for (currentTxnIndex = minOpenTxnIndex;
-            currentTxnIndex < txnToWriteIds.size(); currentTxnIndex++) {
-            conn.getMSC().rollbackTxn(txnToWriteIds.get(currentTxnIndex).getTxnId());
-            txnStatus[currentTxnIndex] = TxnState.ABORTED;
-          }
-          currentTxnIndex--;//since the loop left it == txnToWriteIds.size()
-        } else {
-          // we are aborting only the current transaction, so move the min range for heartbeat or disable heartbeat
-          // if the current txn is last in the batch.
-          if (currentTxnIndex + 1 < txnToWriteIds.size()) {
-            minTxnId.set(txnToWriteIds.get(currentTxnIndex + 1).getTxnId());
-          } else {
-            // exhausted the batch, no longer have to heartbeat
-            minTxnId.set(-1);
-          }
-          long currTxnId = getCurrentTxnId();
-          if (currTxnId > 0) {
-            conn.getMSC().rollbackTxn(currTxnId);
-            txnStatus[currentTxnIndex] = TxnState.ABORTED;
-          }
-        }
-        state = TxnState.ABORTED;
-      } catch (NoSuchTxnException e) {
-        throw new TransactionError("Unable to abort invalid transaction id : "
-          + getCurrentTxnId(), e);
-      } catch (TException e) {
-        throw new TransactionError("Unable to abort transaction id : "
-          + getCurrentTxnId(), e);
-      } finally {
-        transactionLock.unlock();
-      }
-    }
-
-    public boolean isClosed() {
-      return isTxnClosed.get();
-    }
-
-    /**
-     * Close the TransactionBatch.  This will abort any still open txns in this batch.
-     *
-     * @throws StreamingException - failure when closing transaction batch
-     */
-    public void close() throws StreamingException {
-      if (isTxnClosed.get()) {
-        return;
-      }
-      isTxnClosed.set(true); //also ensures that heartbeat() is no-op since client is likely doing it async
-      try {
-        abort(true);//abort all remaining txns
-      } catch (Exception ex) {
-        LOG.error("Fatal error on " + toString() + "; cause " + ex.getMessage(), ex);
-        throw new StreamingException("Unable to abort", ex);
-      }
-      try {
-        closeImpl();
-      } catch (Exception ex) {
-        LOG.error("Fatal error on " + toString() + "; cause " + ex.getMessage(), ex);
-        throw new StreamingException("Unable to close", ex);
-      }
-    }
-
-    private void closeImpl() throws StreamingException {
-      state = TxnState.INACTIVE;
-      recordWriter.close();
-      if (scheduledExecutorService != null) {
-        scheduledExecutorService.shutdownNow();
-      }
-    }
-
-    static LockRequest createLockRequest(final HiveStreamingConnection connection,
-      String partNameForLock, String user, long txnId, String agentInfo) {
-      LockRequestBuilder requestBuilder = new LockRequestBuilder(agentInfo);
-      requestBuilder.setUser(user);
-      requestBuilder.setTransactionId(txnId);
-
-      LockComponentBuilder lockCompBuilder = new LockComponentBuilder()
-        .setDbName(connection.database)
-        .setTableName(connection.table)
-        .setShared()
-        .setOperationType(DataOperationType.INSERT);
-      if (connection.isDynamicPartitioning()) {
-        lockCompBuilder.setIsDynamicPartitionWrite(true);
-      }
-      if (partNameForLock != null && !partNameForLock.isEmpty()) {
-        lockCompBuilder.setPartitionName(partNameForLock);
-      }
-      requestBuilder.addLockComponent(lockCompBuilder.build());
-
-      return requestBuilder.build();
-    }
-  }
-
   private HiveConf createHiveConf(Class<?> clazz, String metaStoreUri) {
     HiveConf conf = new HiveConf(clazz);
     if (metaStoreUri != null) {
@@ -1049,6 +730,13 @@ public class HiveStreamingConnection implements StreamingConnection {
     conf.setBoolean(var, true);
   }
 
+  public List<TxnToWriteId> getTxnToWriteIds() {
+    if (currentTransactionBatch != null) {
+      return currentTransactionBatch.getTxnToWriteIds();
+    }
+    return null;
+  }
+
   @Override
   public HiveConf getHiveConf() {
     return conf;
@@ -1083,4 +771,41 @@ public class HiveStreamingConnection implements StreamingConnection {
   public boolean isDynamicPartitioning() {
     return isPartitionedTable() && (staticPartitionValues == null || staticPartitionValues.isEmpty());
   }
+
+  @Override
+  public Set<String> getPartitions() {
+    return partitions;
+  }
+
+  public String getUsername() {
+    return username;
+  }
+
+  public String getDatabase() {
+    return database;
+  }
+
+  public RecordWriter getRecordWriter() {
+    return recordWriter;
+  }
+
+  public int getTransactionBatchSize() {
+    return transactionBatchSize;
+  }
+
+  public HiveConf getConf() {
+    return conf;
+  }
+
+  public Long getWriteId() {
+    return writeId;
+  }
+
+  public Integer getStatementId() {
+    return statementId;
+  }
+
+  public Long getCurrentWriteId() {
+    return currentTransactionBatch.getCurrentWriteId();
+  }
 }
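
A minimal usage sketch of the unmanaged write-id path added above, for reference. The
withTableObject() call and commitTransactionWithPartition() are part of this patch;
withWriteId() and withStatementId() are the setter names assumed for the writeId and
statementId fields that build() checks, and the database, record and partition values
are illustrative only:

    import java.util.Collections;
    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.metadata.Table;
    import org.apache.hive.streaming.HiveStreamingConnection;
    import org.apache.hive.streaming.StrictDelimitedInputWriter;

    public class UnmanagedWriteIdSketch {
      public void stream(HiveConf conf, Table tableObject, long writeId) throws Exception {
        StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
            .withFieldDelimiter(',')
            .build();
        HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
            .withDatabase("mydb")             // illustrative database name
            .withTableObject(tableObject)     // table object introduced by this patch
            .withRecordWriter(writer)
            .withHiveConf(conf)
            .withWriteId(writeId)             // assumed setter for the writeId field
            .withStatementId(1)               // assumed setter for the statementId field
            .connect();
        connection.beginTransaction();
        connection.write("1,hello".getBytes());
        // partitions created outside this connection can be handed in at commit time
        connection.commitTransactionWithPartition(Collections.singleton("year=2018"));
        connection.close();
      }
    }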

http://git-wip-us.apache.org/repos/asf/hive/blob/bdbd3bcf/streaming/src/java/org/apache/hive/streaming/InvalidTransactionState.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/InvalidTransactionState.java b/streaming/src/java/org/apache/hive/streaming/InvalidTransactionState.java
index 9d92dfa..d076ce7 100644
--- a/streaming/src/java/org/apache/hive/streaming/InvalidTransactionState.java
+++ b/streaming/src/java/org/apache/hive/streaming/InvalidTransactionState.java
@@ -18,8 +18,11 @@
 
 package org.apache.hive.streaming;
 
+/**
+ * Invalid transaction state.
+ */
 public class InvalidTransactionState extends TransactionError {
-  InvalidTransactionState(String msg) {
+  public InvalidTransactionState(String msg) {
     super(msg);
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/bdbd3bcf/streaming/src/java/org/apache/hive/streaming/PartitionInfo.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/PartitionInfo.java b/streaming/src/java/org/apache/hive/streaming/PartitionInfo.java
index ce9f76a..d16a7a5 100644
--- a/streaming/src/java/org/apache/hive/streaming/PartitionInfo.java
+++ b/streaming/src/java/org/apache/hive/streaming/PartitionInfo.java
@@ -26,6 +26,7 @@ public class PartitionInfo {
   private String partitionLocation;
   private boolean exists;
 
+
   public PartitionInfo(final String name, final String partitionLocation, final boolean exists) {
     this.name = name;
     this.partitionLocation = partitionLocation;

http://git-wip-us.apache.org/repos/asf/hive/blob/bdbd3bcf/streaming/src/java/org/apache/hive/streaming/RecordWriter.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/RecordWriter.java b/streaming/src/java/org/apache/hive/streaming/RecordWriter.java
index d9c4455..5b02754 100644
--- a/streaming/src/java/org/apache/hive/streaming/RecordWriter.java
+++ b/streaming/src/java/org/apache/hive/streaming/RecordWriter.java
@@ -19,8 +19,12 @@
 package org.apache.hive.streaming;
 
 
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.metadata.Table;
+
 import java.io.InputStream;
 
+import java.util.List;
 import java.util.Set;
 
 public interface RecordWriter {
@@ -36,6 +40,20 @@ public interface RecordWriter {
   void init(StreamingConnection connection, long minWriteId, long maxWriteID) throws StreamingException;
 
   /**
+   * Initialize record writer.
+   *
+   * @param connection - streaming connection
+   * @param minWriteId - min write id
+   * @param maxWriteID - max write id
+   * @param statementId - statement id. Note this number can't be bigger than 2^12
+   * @throws StreamingException - thrown when initialization failed
+   */
+  default void init(StreamingConnection connection, long minWriteId,
+      long maxWriteID, int statementId) throws StreamingException {
+    init(connection, minWriteId, maxWriteID);
+  }
+
+  /**
    * Writes using a hive RecordUpdater.
    *
    * @param writeId - the write ID of the table mapping to Txn in which the write occurs
@@ -69,9 +87,27 @@ public interface RecordWriter {
   void close() throws StreamingException;
 
   /**
-   * Get the set of partitions that were added by the record writer.
+   * Get the set of partitions that were used by the record writer but may
+   * or may not have been added to the metastore.
    *
    * @return - set of partitions
    */
   Set<String> getPartitions();
+
+  /**
+   * Returns the location of the delta directory.
+   * @param partitionValues partition values
+   * @param bucketId bucket id
+   * @param minWriteId min write Id
+   * @param maxWriteId max write Id
+   * @param statementId statement Id
+   * @param table table
+   * @return the location of the file
+   * @throws StreamingException when the path is not found
+   */
+  default Path getDeltaFileLocation(List<String> partitionValues,
+      Integer bucketId, Long minWriteId, Long maxWriteId, Integer statementId,
+      Table table) throws StreamingException {
+    throw new UnsupportedOperationException();
+  }
 }
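
A hedged sketch of how a writer implementation might pick up the new statement id: the
four-argument init() default above simply delegates to the existing three-argument
init(), so a writer that needs the id overrides the new overload and falls back itself.
StatementAwareWriter is a hypothetical class, not part of this patch:

    import org.apache.hive.streaming.RecordWriter;
    import org.apache.hive.streaming.StreamingConnection;
    import org.apache.hive.streaming.StreamingException;

    public abstract class StatementAwareWriter implements RecordWriter {
      private int statementId = -1;

      @Override
      public void init(StreamingConnection connection, long minWriteId,
          long maxWriteID, int statementId) throws StreamingException {
        // remember the statement id (per the javadoc it must fit within 2^12)
        this.statementId = statementId;
        // reuse the existing two-write-id initialization
        init(connection, minWriteId, maxWriteID);
      }

      protected int getStatementId() {
        return statementId;
      }
    }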

http://git-wip-us.apache.org/repos/asf/hive/blob/bdbd3bcf/streaming/src/java/org/apache/hive/streaming/StreamingConnection.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/StreamingConnection.java b/streaming/src/java/org/apache/hive/streaming/StreamingConnection.java
index fbe00db..92016e5 100644
--- a/streaming/src/java/org/apache/hive/streaming/StreamingConnection.java
+++ b/streaming/src/java/org/apache/hive/streaming/StreamingConnection.java
@@ -19,9 +19,14 @@
 package org.apache.hive.streaming;
 
 import java.io.InputStream;
+import java.util.List;
+import java.util.Set;
 
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 
+import javax.annotation.Nullable;
+
 public interface StreamingConnection extends ConnectionInfo, PartitionHandler {
   /**
    * Returns hive configuration object used during connection creation.
@@ -61,6 +66,18 @@ public interface StreamingConnection extends ConnectionInfo, PartitionHandler {
   void commitTransaction() throws StreamingException;
 
   /**
+   * Commit a transaction to make the writes visible for readers. Include
+   * other partitions that may have been added independently.
+   *
+   * @param partitions - extra partitions to commit.
+   * @throws StreamingException - if there are errors when committing the open transaction.
+   */
+  default void commitTransactionWithPartition(@Nullable Set<String> partitions)
+      throws StreamingException {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
    * Manually abort the opened transaction.
    *
    * @throws StreamingException - if there are errors when aborting the transaction
@@ -78,4 +95,30 @@ public interface StreamingConnection extends ConnectionInfo, PartitionHandler {
    * @return - connection stats
    */
   ConnectionStats getConnectionStats();
+
+  /**
+   * Get the partitions used during streaming. These partitions haven't
+   * been committed to the metastore.
+   * @return partitions.
+   */
+  default Set<String> getPartitions() {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Returns the file that would be used by the writer to write the rows,
+   * given the parameters.
+   * @param partitionValues partition values
+   * @param bucketId bucket id
+   * @param minWriteId min write Id
+   * @param maxWriteId max write Id
+   * @param statementId statement Id
+   * @return the location of the file.
+   * @throws StreamingException when the path is not found
+   */
+  default Path getDeltaFileLocation(List<String> partitionValues,
+      Integer bucketId, Long minWriteId, Long maxWriteId, Integer statementId)
+      throws StreamingException {
+    throw new UnsupportedOperationException();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/bdbd3bcf/streaming/src/java/org/apache/hive/streaming/StreamingTransaction.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/StreamingTransaction.java b/streaming/src/java/org/apache/hive/streaming/StreamingTransaction.java
new file mode 100644
index 0000000..83b2f15
--- /dev/null
+++ b/streaming/src/java/org/apache/hive/streaming/StreamingTransaction.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
+import java.io.InputStream;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Common interface for transactions in HiveStreamingConnection.
+ */
+public interface StreamingTransaction {
+  /**
+   * Get ready for the next transaction.
+   * @throws StreamingException
+   */
+  void beginNextTransaction() throws StreamingException;
+
+  /**
+   * Commit the transaction.
+   * @throws StreamingException
+   */
+  void commit() throws StreamingException;
+
+  /**
+   * Commit the transaction and send to the metastore the partitions
+   * created in the process.
+   * @param partitions to commit.
+   * @throws StreamingException
+   */
+  void commitWithPartitions(Set<String> partitions) throws StreamingException;
+
+  /**
+   * Abort a transaction.
+   * @throws StreamingException
+   */
+  void abort() throws StreamingException;
+
+  /**
+   * Write data within a transaction. This expects beginNextTransaction
+   * to have been called before this and commit to be called after.
+   * @param record bytes to write.
+   * @throws StreamingException
+   */
+  void write(byte[] record) throws StreamingException;
+
+  /**
+   * Write data within a transaction.
+   * @param stream stream to write.
+   * @throws StreamingException
+   */
+  void write(InputStream stream) throws StreamingException;
+
+  /**
+   * Free/close resources used by the streaming transaction.
+   * @throws StreamingException
+   */
+  void close() throws StreamingException;
+
+  /**
+   * @return true if closed.
+   */
+  boolean isClosed();
+
+  /**
+   * @return the state of the current transaction.
+   */
+  HiveStreamingConnection.TxnState getCurrentTransactionState();
+
+  /**
+   * @return remaining number of transactions
+   */
+  int remainingTransactions();
+
+  /**
+   * @return the current transaction id being used
+   */
+  long getCurrentTxnId();
+
+  /**
+   * @return the current write id being used
+   */
+  long getCurrentWriteId();
+
+  /**
+   * Get the partitions that were used in this transaction. They may have
+   * been created
+   * @return list of partitions
+   */
+  Set<String> getPartitions();
+
+  /**
+   * @return the pairs of transaction ids <--> write ids
+   */
+  List<TxnToWriteId> getTxnToWriteIds();
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/bdbd3bcf/streaming/src/java/org/apache/hive/streaming/TransactionBatch.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/TransactionBatch.java b/streaming/src/java/org/apache/hive/streaming/TransactionBatch.java
new file mode 100644
index 0000000..dabbe21
--- /dev/null
+++ b/streaming/src/java/org/apache/hive/streaming/TransactionBatch.java
@@ -0,0 +1,430 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.hadoop.hive.metastore.LockComponentBuilder;
+import org.apache.hadoop.hive.metastore.LockRequestBuilder;
+import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.DataOperationType;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeResponse;
+import org.apache.hadoop.hive.metastore.api.LockRequest;
+import org.apache.hadoop.hive.metastore.api.LockResponse;
+import org.apache.hadoop.hive.metastore.api.LockState;
+import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
+import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
+import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
+import org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
+import org.apache.hadoop.hive.ql.lockmgr.LockException;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Streaming transaction to use in most cases. It queries the
+ * metastore to get the transaction ids and the write ids and then
+ * commits them.
+ */
+public class TransactionBatch extends AbstractStreamingTransaction {
+  private static final Logger LOG = LoggerFactory.getLogger(
+      TransactionBatch.class.getName());
+  private static final int DEFAULT_HEARTBEAT_INTERVAL = 60 * 1000;
+  protected Set<String> createdPartitions = null;
+  private String username;
+  private HiveStreamingConnection conn;
+  private ScheduledExecutorService scheduledExecutorService;
+  private String partNameForLock = null;
+  private LockRequest lockRequest = null;
+  // heartbeats can only be sent for open transactions.
+  // there is a race between committing/aborting a transaction and heartbeat.
+  // Example: If a heartbeat is sent for committed txn, exception will be thrown.
+  // Similarly if we don't send a heartbeat, metastore server might abort a txn
+  // for missed heartbeat right before commit txn call.
+  // This lock is used to mutex commit/abort and heartbeat calls
+  private final ReentrantLock transactionLock = new ReentrantLock();
+  // min txn id is incremented linearly within a transaction batch.
+  // keeping minTxnId atomic as it is shared with heartbeat thread
+  private final AtomicLong minTxnId;
+  // max txn id does not change for a transaction batch
+  private final long maxTxnId;
+
+  private String agentInfo;
+  private int numTxns;
+  /**
+   * Tracks the state of each transaction.
+   */
+  private HiveStreamingConnection.TxnState[] txnStatus;
+  /**
+   * ID of the last txn used by {@link #beginNextTransactionImpl()}.
+   */
+  private long lastTxnUsed;
+
+  /**
+   * Represents a batch of transactions acquired from MetaStore.
+   *
+   * @param conn - hive streaming connection
+   * @throws StreamingException if failed to create new RecordUpdater for batch
+   */
+  public TransactionBatch(HiveStreamingConnection conn) throws StreamingException {
+    boolean success = false;
+    try {
+      if (conn.isPartitionedTable() && !conn.isDynamicPartitioning()) {
+        List<FieldSchema> partKeys = conn.getTable().getPartitionKeys();
+        partNameForLock = Warehouse.makePartName(partKeys, conn.getStaticPartitionValues());
+      }
+      this.conn = conn;
+      this.username = conn.getUsername();
+      this.recordWriter = conn.getRecordWriter();
+      this.agentInfo = conn.getAgentInfo();
+      this.numTxns = conn.getTransactionBatchSize();
+
+      setupHeartBeatThread();
+
+      List<Long> txnIds = openTxnImpl(username, numTxns);
+      txnToWriteIds = allocateWriteIdsImpl(txnIds);
+      assert (txnToWriteIds.size() == numTxns);
+
+      txnStatus = new HiveStreamingConnection.TxnState[numTxns];
+      for (int i = 0; i < txnStatus.length; i++) {
+        assert (txnToWriteIds.get(i).getTxnId() == txnIds.get(i));
+        txnStatus[i] = HiveStreamingConnection.TxnState.OPEN; //Open matches Metastore state
+      }
+      this.state = HiveStreamingConnection.TxnState.INACTIVE;
+
+      // initialize record writer with connection and write id info
+      recordWriter.init(conn, txnToWriteIds.get(0).getWriteId(),
+          txnToWriteIds.get(numTxns - 1).getWriteId(), conn.getStatementId());
+      this.minTxnId = new AtomicLong(txnIds.get(0));
+      this.maxTxnId = txnIds.get(txnIds.size() - 1);
+      success = true;
+    } catch (TException e) {
+      throw new StreamingException(conn.toString(), e);
+    } finally {
+      //clean up if above throws
+      markDead(success);
+    }
+  }
+
+  private static class HeartbeatRunnable implements Runnable {
+    private final HiveStreamingConnection conn;
+    private final AtomicLong minTxnId;
+    private final long maxTxnId;
+    private final ReentrantLock transactionLock;
+    private final AtomicBoolean isTxnClosed;
+
+    HeartbeatRunnable(final HiveStreamingConnection conn, final AtomicLong minTxnId, final long maxTxnId,
+        final ReentrantLock transactionLock, final AtomicBoolean isTxnClosed) {
+      this.conn = conn;
+      this.minTxnId = minTxnId;
+      this.maxTxnId = maxTxnId;
+      this.transactionLock = transactionLock;
+      this.isTxnClosed = isTxnClosed;
+    }
+
+    @Override
+    public void run() {
+      transactionLock.lock();
+      try {
+        if (minTxnId.get() > 0) {
+          HeartbeatTxnRangeResponse resp = conn.getHeatbeatMSC().heartbeatTxnRange(minTxnId.get(), maxTxnId);
+          if (!resp.getAborted().isEmpty() || !resp.getNosuch().isEmpty()) {
+            LOG.error("Heartbeat failure: {}", resp.toString());
+            isTxnClosed.set(true);
+          } else {
+            LOG.info("Heartbeat sent for range: [{}-{}]", minTxnId.get(), maxTxnId);
+          }
+        }
+      } catch (TException e) {
+        LOG.warn("Failure to heartbeat for transaction range: [" + minTxnId.get() + "-" + maxTxnId + "]", e);
+      } finally {
+        transactionLock.unlock();
+      }
+    }
+  }
+
+  private void setupHeartBeatThread() {
+    // start heartbeat thread
+    ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true)
+        .setNameFormat("HiveStreamingConnection-Heartbeat-Thread")
+        .build();
+    this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(threadFactory);
+    long heartBeatInterval;
+    long initialDelay;
+    try {
+      // if HIVE_TXN_TIMEOUT is defined, heartbeat interval will be HIVE_TXN_TIMEOUT/2
+      heartBeatInterval = DbTxnManager.getHeartbeatInterval(conn.getConf());
+    } catch (LockException e) {
+      heartBeatInterval = DEFAULT_HEARTBEAT_INTERVAL;
+    }
+    // to introduce some randomness and to avoid hammering the metastore at the same time (same logic as DbTxnManager)
+    initialDelay = (long) (heartBeatInterval * 0.75 * Math.random());
+    LOG.info("Starting heartbeat thread with interval: {} ms initialDelay: {} ms for agentInfo: {}",
+        heartBeatInterval, initialDelay, conn.getAgentInfo());
+    Runnable runnable = new HeartbeatRunnable(conn, minTxnId, maxTxnId, transactionLock, isTxnClosed);
+    this.scheduledExecutorService.scheduleWithFixedDelay(runnable, initialDelay, heartBeatInterval, TimeUnit
+        .MILLISECONDS);
+  }
+
+  private List<Long> openTxnImpl(final String user, final int numTxns) throws TException {
+    return conn.getMSC().openTxns(user, numTxns).getTxn_ids();
+  }
+
+  private List<TxnToWriteId> allocateWriteIdsImpl(final List<Long> txnIds) throws TException {
+    return conn.getMSC().allocateTableWriteIdsBatch(txnIds, conn.getDatabase(),
+        conn.getTable().getTableName());
+  }
+
+  @Override
+  public String toString() {
+    if (txnToWriteIds == null || txnToWriteIds.isEmpty()) {
+      return "{}";
+    }
+    StringBuilder sb = new StringBuilder(" TxnStatus[");
+    for (HiveStreamingConnection.TxnState state : txnStatus) {
+      //'state' should not be null - future proofing
+      sb.append(state == null ? "N" : state);
+    }
+    sb.append("] LastUsed ").append(JavaUtils.txnIdToString(lastTxnUsed));
+    return "TxnId/WriteIds=[" + txnToWriteIds.get(0).getTxnId()
+        + "/" + txnToWriteIds.get(0).getWriteId()
+        + "..."
+        + txnToWriteIds.get(txnToWriteIds.size() - 1).getTxnId()
+        + "/" + txnToWriteIds.get(txnToWriteIds.size() - 1).getWriteId()
+        + "] on connection = " + conn + "; " + sb;
+  }
+
+  public void beginNextTransaction() throws StreamingException {
+    checkIsClosed();
+    beginNextTransactionImpl();
+  }
+
+  private void beginNextTransactionImpl() throws StreamingException {
+    beginNextTransactionImpl("No more transactions available in" +
+        " next batch for connection: " + conn + " user: " + username);
+    lastTxnUsed = getCurrentTxnId();
+    lockRequest = createLockRequest(conn, partNameForLock, username, getCurrentTxnId(), agentInfo);
+    createdPartitions = Sets.newHashSet();
+    try {
+      LockResponse res = conn.getMSC().lock(lockRequest);
+      if (res.getState() != LockState.ACQUIRED) {
+        throw new TransactionError("Unable to acquire lock on " + conn);
+      }
+    } catch (TException e) {
+      throw new TransactionError("Unable to acquire lock on " + conn, e);
+    }
+  }
+
+  public void commitWithPartitions(Set<String> partitions) throws StreamingException {
+    checkIsClosed();
+    boolean success = false;
+    try {
+      commitImpl(partitions);
+      success = true;
+    } finally {
+      markDead(success);
+    }
+  }
+
+  private void commitImpl(Set<String> partitions) throws StreamingException {
+    try {
+      recordWriter.flush();
+      TxnToWriteId txnToWriteId = txnToWriteIds.get(currentTxnIndex);
+      if (conn.isDynamicPartitioning()) {
+        List<String> partNames = new ArrayList<>(recordWriter.getPartitions());
+        createdPartitions.addAll(partNames);
+        if (partitions != null) {
+          partNames.addAll(partitions);
+        }
+        if (!partNames.isEmpty()) {
+          conn.getMSC().addDynamicPartitions(txnToWriteId.getTxnId(),
+              txnToWriteId.getWriteId(), conn.getDatabase(),
+              conn.getTable().getTableName(), partNames,
+              DataOperationType.INSERT);
+        }
+      }
+      transactionLock.lock();
+      try {
+        conn.getMSC().commitTxn(txnToWriteId.getTxnId());
+        // increment the min txn id so that heartbeat thread will heartbeat only from the next open transaction.
+        // the current transaction is going to be committed or fail, so don't need heartbeat for current transaction.
+        if (currentTxnIndex + 1 < txnToWriteIds.size()) {
+          minTxnId.set(txnToWriteIds.get(currentTxnIndex + 1).getTxnId());
+        } else {
+          // exhausted the batch, no longer have to heartbeat for current txn batch
+          minTxnId.set(-1);
+        }
+      } finally {
+        transactionLock.unlock();
+      }
+      state = HiveStreamingConnection.TxnState.COMMITTED;
+      txnStatus[currentTxnIndex] = HiveStreamingConnection.TxnState.COMMITTED;
+    } catch (NoSuchTxnException e) {
+      throw new TransactionError("Invalid transaction id : "
+          + getCurrentTxnId(), e);
+    } catch (TxnAbortedException e) {
+      throw new TransactionError("Aborted transaction "
+          + "cannot be committed", e);
+    } catch (TException e) {
+      throw new TransactionError("Unable to commitTransaction transaction"
+          + getCurrentTxnId(), e);
+    }
+  }
+
+  public void abort() throws StreamingException {
+    if (isTxnClosed.get()) {
+      /*
+       * isDead is only set internally by this class.  {@link #markDead(boolean)} will abort all
+       * remaining txns, so make this no-op to make sure that a well-behaved client that calls abortTransaction()
+       * error doesn't get misleading errors
+       */
+      return;
+    }
+    abort(false);
+  }
+
+  private void abort(final boolean abortAllRemaining) throws StreamingException {
+    abortImpl(abortAllRemaining);
+  }
+
+  private void abortImpl(boolean abortAllRemaining) throws StreamingException {
+    if (minTxnId == null) {
+      return;
+    }
+
+    transactionLock.lock();
+    try {
+      if (abortAllRemaining) {
+        // we are aborting all txns in the current batch, so no need to heartbeat
+        minTxnId.set(-1);
+        //when last txn finished (abortTransaction/commitTransaction) the currentTxnIndex is pointing at that txn
+        //so we need to start from next one, if any.  Also if batch was created but
+        //fetchTransactionBatch() was never called, we want to start with first txn
+        int minOpenTxnIndex = Math.max(currentTxnIndex +
+            (state == HiveStreamingConnection.TxnState.ABORTED
+                || state == HiveStreamingConnection.TxnState.COMMITTED
+                ? 1 : 0), 0);
+        for (currentTxnIndex = minOpenTxnIndex;
+             currentTxnIndex < txnToWriteIds.size(); currentTxnIndex++) {
+          conn.getMSC().rollbackTxn(txnToWriteIds.get(currentTxnIndex).getTxnId());
+          txnStatus[currentTxnIndex] = HiveStreamingConnection.TxnState.ABORTED;
+        }
+        currentTxnIndex--; //since the loop left it == txnToWriteIds.size()
+      } else {
+        // we are aborting only the current transaction, so move the min range for heartbeat or disable heartbeat
+        // if the current txn is last in the batch.
+        if (currentTxnIndex + 1 < txnToWriteIds.size()) {
+          minTxnId.set(txnToWriteIds.get(currentTxnIndex + 1).getTxnId());
+        } else {
+          // exhausted the batch, no longer have to heartbeat
+          minTxnId.set(-1);
+        }
+        long currTxnId = getCurrentTxnId();
+        if (currTxnId > 0) {
+          conn.getMSC().rollbackTxn(currTxnId);
+          txnStatus[currentTxnIndex] = HiveStreamingConnection.TxnState.ABORTED;
+        }
+      }
+      state = HiveStreamingConnection.TxnState.ABORTED;
+    } catch (NoSuchTxnException e) {
+      throw new TransactionError("Unable to abort invalid transaction id : "
+          + getCurrentTxnId(), e);
+    } catch (TException e) {
+      throw new TransactionError("Unable to abort transaction id : "
+          + getCurrentTxnId(), e);
+    } finally {
+      transactionLock.unlock();
+    }
+  }
+
+  /**
+   * Close the TransactionBatch.  This will abort any still open txns in this batch.
+   *
+   * @throws StreamingException - failure when closing transaction batch
+   */
+  public void close() throws StreamingException {
+    if (isClosed()) {
+      return;
+    }
+    isTxnClosed.set(true); //also ensures that heartbeat() is no-op since client is likely doing it async
+    try {
+      abort(true); //abort all remaining txns
+    } catch (Exception ex) {
+      LOG.error("Fatal error on " + toString() + "; cause " + ex.getMessage(), ex);
+      throw new StreamingException("Unable to abort", ex);
+    }
+    try {
+      closeImpl();
+    } catch (Exception ex) {
+      LOG.error("Fatal error on " + toString() + "; cause " + ex.getMessage(), ex);
+      throw new StreamingException("Unable to close", ex);
+    }
+  }
+
+  private void closeImpl() throws StreamingException {
+    state = HiveStreamingConnection.TxnState.INACTIVE;
+    recordWriter.close();
+    if (scheduledExecutorService != null) {
+      scheduledExecutorService.shutdownNow();
+    }
+  }
+
+  private static LockRequest createLockRequest(final HiveStreamingConnection connection,
+      String partNameForLock, String user, long txnId, String agentInfo) {
+    LockRequestBuilder requestBuilder = new LockRequestBuilder(agentInfo);
+    requestBuilder.setUser(user);
+    requestBuilder.setTransactionId(txnId);
+
+    LockComponentBuilder lockCompBuilder = new LockComponentBuilder()
+        .setDbName(connection.getDatabase())
+        .setTableName(connection.getTable().getTableName())
+        .setShared()
+        .setOperationType(DataOperationType.INSERT);
+    if (connection.isDynamicPartitioning()) {
+      lockCompBuilder.setIsDynamicPartitionWrite(true);
+    }
+    if (partNameForLock != null && !partNameForLock.isEmpty()) {
+      lockCompBuilder.setPartitionName(partNameForLock);
+    }
+    requestBuilder.addLockComponent(lockCompBuilder.build());
+
+    return requestBuilder.build();
+  }
+
+  /**
+   * @return the list of created partitions.
+   */
+  @Override
+  public Set<String> getPartitions() {
+    return createdPartitions;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/bdbd3bcf/streaming/src/java/org/apache/hive/streaming/TransactionError.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/TransactionError.java b/streaming/src/java/org/apache/hive/streaming/TransactionError.java
index ae56e7c..7033012 100644
--- a/streaming/src/java/org/apache/hive/streaming/TransactionError.java
+++ b/streaming/src/java/org/apache/hive/streaming/TransactionError.java
@@ -18,12 +18,15 @@
 
 package org.apache.hive.streaming;
 
+/**
+ * Transaction error.
+ */
 public class TransactionError extends StreamingException {
-  TransactionError(String msg, Exception e) {
+  public TransactionError(String msg, Exception e) {
     super(msg + (e == null ? "" : ": " + e.getMessage()), e);
   }
 
-  TransactionError(String msg) {
+  public TransactionError(String msg) {
     super(msg);
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/bdbd3bcf/streaming/src/java/org/apache/hive/streaming/UnManagedSingleTransaction.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/UnManagedSingleTransaction.java b/streaming/src/java/org/apache/hive/streaming/UnManagedSingleTransaction.java
new file mode 100644
index 0000000..68b0906
--- /dev/null
+++ b/streaming/src/java/org/apache/hive/streaming/UnManagedSingleTransaction.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Receives a single writeId. Doesn't open connections to the metastore
+ * so the commit has to be done externally by the entity that created
+ * the writeId.
+ */
+public class UnManagedSingleTransaction extends AbstractStreamingTransaction {
+  private static final Logger LOG = LoggerFactory.getLogger(
+      UnManagedSingleTransaction.class.getName());
+  private final String username;
+  private final HiveStreamingConnection conn;
+  private final Set<String> partitions = Sets.newHashSet();
+
+  public UnManagedSingleTransaction(HiveStreamingConnection conn)
+      throws StreamingException {
+    assert conn.getWriteId() != null;
+
+    this.conn = conn;
+    this.username = conn.getUsername();
+    this.recordWriter = conn.getRecordWriter();
+    this.state = HiveStreamingConnection.TxnState.INACTIVE;
+
+    txnToWriteIds = Lists.newArrayList(new TxnToWriteId(-1,
+        conn.getWriteId()));
+
+    boolean success = false;
+    try {
+      recordWriter.init(conn, txnToWriteIds.get(0).getWriteId(),
+          txnToWriteIds.get(0).getWriteId(), conn.getStatementId());
+      success = true;
+    } finally {
+      markDead(success);
+    }
+  }
+
+  @Override
+  public void beginNextTransaction() throws StreamingException {
+    beginNextTransactionImpl("No more transactions available in" +
+        " next batch for connection: " + conn + " user: " + username);
+  }
+
+  @Override
+  public void commitWithPartitions(Set<String> partitions) throws StreamingException {
+    checkIsClosed();
+    boolean success = false;
+    try {
+      commitImpl();
+      success = true;
+    } finally {
+      markDead(success);
+    }
+  }
+
+  private void commitImpl() throws StreamingException {
+    recordWriter.flush();
+    List<String> partNames = new ArrayList<>(recordWriter.getPartitions());
+    partitions.addAll(partNames);
+    state = HiveStreamingConnection.TxnState.PREPARED_FOR_COMMIT;
+  }
+
+  @Override
+  public void abort() {
+    if (isTxnClosed.get()) {
+      return;
+    }
+    state = HiveStreamingConnection.TxnState.ABORTED;
+  }
+
+  @Override
+  public void close() throws StreamingException {
+    if (isClosed()) {
+      return;
+    }
+    isTxnClosed.set(true);
+    abort();
+    try {
+      closeImpl();
+    } catch (Exception ex) {
+      LOG.error("Fatal error on " + toString() + "; cause " + ex.getMessage(), ex);
+      throw new StreamingException("Unable to close", ex);
+    }
+  }
+
+  private void closeImpl() throws StreamingException {
+    state = HiveStreamingConnection.TxnState.INACTIVE;
+    recordWriter.close();
+  }
+
+  @Override
+  public String toString() {
+    if (txnToWriteIds == null || txnToWriteIds.isEmpty()) {
+      return "{}";
+    }
+    return "TxnId/WriteIds=[" + txnToWriteIds.get(0).getWriteId()
+        + "] on connection = " + conn + "; " + "status=" + state;
+  }
+
+  /**
+   * @return the partitions seen by the record writer. This class doesn't have
+   * a connection to the metastore, so it won't create any partitions.
+   */
+  @Override
+  public Set<String> getPartitions() {
+    return partitions;
+  }
+}
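
A minimal usage sketch (not part of the patch) of the unmanaged path: a caller
that already owns a write id, for example one allocated by an external
transaction manager, streams records and lets the owner of that write id commit
the transaction. The builder methods withWriteId()/withStatementId() are
assumed here to match the getWriteId()/getStatementId() accessors used above;
database, table, and record layout are illustrative only.

  // Hypothetical caller; writeId and conf are supplied externally.
  StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
      .withFieldDelimiter(',')
      .build();
  HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
      .withDatabase("testing")
      .withTable("alerts")
      .withRecordWriter(writer)
      .withHiveConf(conf)
      .withWriteId(writeId)        // assumed builder method added by HIVE-20291
      .withStatementId(1)          // assumed builder method added by HIVE-20291
      .connect();
  connection.beginTransaction();
  connection.write("1,val1".getBytes());
  // For an unmanaged transaction this only flushes the writer and moves the
  // state to PREPARED_FOR_COMMIT; the owner of the write id commits the
  // transaction against the metastore.
  connection.commitTransaction();
  connection.close();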

http://git-wip-us.apache.org/repos/asf/hive/blob/bdbd3bcf/streaming/src/java/org/apache/hive/streaming/package-info.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/package-info.java b/streaming/src/java/org/apache/hive/streaming/package-info.java
new file mode 100644
index 0000000..a9f3fae
--- /dev/null
+++ b/streaming/src/java/org/apache/hive/streaming/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package grouping streaming classes.
+ */
+package org.apache.hive.streaming;

http://git-wip-us.apache.org/repos/asf/hive/blob/bdbd3bcf/streaming/src/java/org/apache/hive/streaming/package.html
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/package.html b/streaming/src/java/org/apache/hive/streaming/package.html
index 2b45792..19b39e7 100644
--- a/streaming/src/java/org/apache/hive/streaming/package.html
+++ b/streaming/src/java/org/apache/hive/streaming/package.html
@@ -43,7 +43,8 @@ performed to HDFS via Hive wrapper APIs that bypass MetaStore. </p>
 
 <p>
 <b>Note on packaging</b>: The APIs are defined in the 
-<b>org.apache.hive.streaming</b> Java package and included as
+<b>org.apache.hive.streaming</b> and
+<b>org.apache.hive.streaming.transaction</b> Java packages and included as
 the hive-streaming jar.</p>
 
 <h2>STREAMING REQUIREMENTS</h2>