Posted to commits@hive.apache.org by pr...@apache.org on 2018/10/11 23:23:55 UTC
[1/2] hive git commit: HIVE-20291: Allow HiveStreamingConnection to
receive a WriteId (Jaume Marhuenda reviewed by Prasanth Jayachandran)
Repository: hive
Updated Branches:
refs/heads/master 7c4d48ec2 -> bdbd3bcff
http://git-wip-us.apache.org/repos/asf/hive/blob/bdbd3bcf/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/org/apache/hive/streaming/TestStreaming.java b/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
index 8b5e508..1c9e43f 100644
--- a/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
+++ b/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
@@ -35,6 +35,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -60,13 +61,13 @@ import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse;
import org.apache.hadoop.hive.metastore.api.LockState;
import org.apache.hadoop.hive.metastore.api.LockType;
import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.ShowLocksRequest;
import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
import org.apache.hadoop.hive.metastore.api.TxnInfo;
-import org.apache.hadoop.hive.metastore.api.TxnState;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.metastore.txn.AcidHouseKeeperService;
import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
@@ -82,6 +83,7 @@ import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
import org.apache.hadoop.hive.ql.io.orc.Reader;
import org.apache.hadoop.hive.ql.io.orc.RecordReader;
+import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.txn.compactor.Worker;
@@ -150,7 +152,8 @@ public class TestStreaming {
if (file.canExecute()) {
mod |= 0111;
}
- return new FileStatus(file.length(), file.isDirectory(), 1, 1024,
+ return new FileStatus(file.length(), file.isDirectory(),
+ 1, 1024,
file.lastModified(), file.lastModified(),
FsPermission.createImmutable(mod), "owen", "users", path);
}
@@ -419,6 +422,123 @@ public class TestStreaming {
}
@Test
+ public void testGetDeltaPath() throws Exception {
+ StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
+ .withFieldDelimiter(',')
+ .build();
+ HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
+ .withDatabase(dbName)
+ .withTable(tblName)
+ .withRecordWriter(writer)
+ .withHiveConf(conf)
+ .connect();
+ Path path = connection.getDeltaFileLocation(partitionVals, 0,
+ 5L, 5L, 9);
+ Assert.assertTrue(path.toString().endsWith("testing.db/alerts/continent"
+ + "=Asia/country=India/delta_0000005_0000005_0009/bucket_00000"));
+ }
+
+ @Test
+ public void testConnectionWithWriteId() throws Exception {
+ queryTable(driver, "drop table if exists default.writeidconnection");
+ queryTable(driver, "create table default.writeidconnection (a string, b string) stored as orc " +
+ "TBLPROPERTIES('transactional'='true')");
+ queryTable(driver, "insert into default.writeidconnection values('a0','bar')");
+
+ List<String> rs = queryTable(driver, "select * from default.writeidconnection");
+ Assert.assertEquals(1, rs.size());
+ Assert.assertEquals("a0\tbar", rs.get(0));
+
+ StrictDelimitedInputWriter writerT = StrictDelimitedInputWriter.newBuilder()
+ .withFieldDelimiter(',')
+ .build();
+ HiveStreamingConnection transactionConnection = HiveStreamingConnection.newBuilder()
+ .withDatabase("Default")
+ .withTable("writeidconnection")
+ .withRecordWriter(writerT)
+ .withHiveConf(conf)
+ .connect();
+ transactionConnection.beginTransaction();
+
+ Table tObject = transactionConnection.getTable();
+ Long writeId = transactionConnection.getCurrentWriteId();
+
+ Assert.assertNotNull(tObject);
+ Assert.assertNotNull(writeId);
+
+ StrictDelimitedInputWriter writerOne = StrictDelimitedInputWriter.newBuilder()
+ .withFieldDelimiter(',')
+ .build();
+ HiveStreamingConnection connectionOne = HiveStreamingConnection.newBuilder()
+ .withDatabase("Default")
+ .withTable("writeidconnection")
+ .withRecordWriter(writerOne)
+ .withHiveConf(conf)
+ .withWriteId(writeId)
+ .withStatementId(1)
+ .withTableObject(tObject)
+ .connect();
+
+ StrictDelimitedInputWriter writerTwo = StrictDelimitedInputWriter.newBuilder()
+ .withFieldDelimiter(',')
+ .build();
+ HiveStreamingConnection connectionTwo = HiveStreamingConnection.newBuilder()
+ .withDatabase("Default")
+ .withRecordWriter(writerTwo)
+ .withHiveConf(conf)
+ .withWriteId(writeId)
+ .withStatementId(2)
+ .withTableObject(tObject)
+ .connect();
+
+ Assert.assertNotNull(connectionOne);
+ Assert.assertNotNull(connectionTwo);
+
+ connectionOne.beginTransaction();
+ connectionTwo.beginTransaction();
+ connectionOne.write("a1,b2".getBytes());
+ connectionTwo.write("a5,b6".getBytes());
+ connectionOne.write("a3,b4".getBytes());
+ connectionOne.commitTransaction();
+ connectionTwo.commitTransaction();
+
+ Assert.assertEquals(HiveStreamingConnection.TxnState.PREPARED_FOR_COMMIT,
+ connectionOne.getCurrentTransactionState());
+ Assert.assertEquals(HiveStreamingConnection.TxnState.PREPARED_FOR_COMMIT,
+ connectionTwo.getCurrentTransactionState());
+
+ try {
+ connectionOne.beginTransaction();
+ Assert.fail("second beginTransaction should have thrown a "
+ + "StreamingException");
+ } catch (StreamingException e) {
+      // expected: a connection created with a writeId allows only one transaction batch
+ }
+
+ connectionOne.close();
+ connectionTwo.close();
+
+ rs = queryTable(driver, "select ROW__ID, a, b, "
+ + "INPUT__FILE__NAME from default.writeidconnection order by ROW__ID");
+    // Only the pre-existing row is visible; the streamed rows haven't been committed yet
+ Assert.assertEquals(1, rs.size());
+
+ transactionConnection.commitTransaction();
+
+ rs = queryTable(driver, "select ROW__ID, a, b, "
+ + "INPUT__FILE__NAME from default.writeidconnection order by a");
+ Assert.assertEquals(4, rs.size());
+ Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\ta0\tbar"));
+ Assert.assertTrue(rs.get(0), rs.get(0).endsWith("bucket_00000"));
+ Assert.assertTrue(rs.get(1), rs.get(1).contains("\"rowid\":0}\ta1\tb2"));
+ Assert.assertTrue(rs.get(1), rs.get(1).endsWith("bucket_00000"));
+ Assert.assertTrue(rs.get(2), rs.get(2).contains("\"rowid\":1}\ta3\tb4"));
+ Assert.assertTrue(rs.get(2), rs.get(2).endsWith("bucket_00000"));
+ Assert.assertTrue(rs.get(3), rs.get(3).contains("\ta5\tb6"));
+ Assert.assertTrue(rs.get(3), rs.get(3).endsWith("bucket_00000"));
+ }
+
+ @Test
public void testAllTypesDelimitedWriter() throws Exception {
queryTable(driver, "drop table if exists default.alltypes");
queryTable(driver,
@@ -619,7 +739,7 @@ public class TestStreaming {
}
/**
- * this is a clone from TestTxnStatement2....
+ * this is a clone from TestHiveStreamingConnection.TxnStatement2....
*/
public static void runWorker(HiveConf hiveConf) throws MetaException {
AtomicBoolean stop = new AtomicBoolean(true);
@@ -956,12 +1076,159 @@ public class TestStreaming {
// Create partition
Assert.assertNotNull(connection);
+ connection.beginTransaction();
+ connection.write("3,Hello streaming - once again".getBytes());
+ connection.commitTransaction();
+
+ // Ensure partition is present
+ Partition p = msClient.getPartition(dbName, tblName, newPartVals);
+ Assert.assertNotNull("Did not find added partition", p);
+ }
+
+ @Test
+ public void testAddPartitionWithWriteId() throws Exception {
+ List<String> newPartVals = new ArrayList<String>(2);
+ newPartVals.add("WriteId_continent");
+ newPartVals.add("WriteId_country");
+
+ StrictDelimitedInputWriter writerT = StrictDelimitedInputWriter.newBuilder()
+ .withFieldDelimiter(',')
+ .build();
+ HiveStreamingConnection transactionConnection = HiveStreamingConnection.newBuilder()
+ .withDatabase(dbName)
+ .withTable(tblName)
+ .withStaticPartitionValues(newPartVals)
+ .withRecordWriter(writerT)
+ .withHiveConf(conf)
+ .connect();
+ transactionConnection.beginTransaction();
+
+ Table tObject = transactionConnection.getTable();
+ Long writeId = transactionConnection.getCurrentWriteId();
+
+ Assert.assertNotNull(tObject);
+ Assert.assertNotNull(writeId);
+
+ StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
+ .withFieldDelimiter(',')
+ .build();
+ HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
+ .withDatabase(dbName)
+ .withTable(tblName)
+ .withStaticPartitionValues(newPartVals)
+ .withRecordWriter(writer)
+ .withHiveConf(conf)
+ .withWriteId(writeId)
+ .withStatementId(1)
+ .withTableObject(tObject)
+ .connect();
+
+ Assert.assertNotNull(connection);
+
+ connection.beginTransaction();
+ connection.write("3,Hello streaming - once again".getBytes());
+ connection.commitTransaction();
+
+ Set<String> partitions = new HashSet<>(connection.getPartitions());
+
+ connection.close();
+
+ // Ensure partition is not present
+ try {
+ msClient.getPartition(dbName, tblName, newPartVals);
+ Assert.fail("Partition shouldn't exist so a NoSuchObjectException should have been raised");
+    } catch (NoSuchObjectException e) { /* expected */ }
+
+ transactionConnection.commitTransactionWithPartition(partitions);
+
// Ensure partition is present
- Partition p = msClient.getPartition(dbName, tblName, partitionVals);
+ Partition p = msClient.getPartition(dbName, tblName, newPartVals);
Assert.assertNotNull("Did not find added partition", p);
}
@Test
+ public void testAddDynamicPartitionWithWriteId() throws Exception {
+ queryTable(driver, "drop table if exists default.writeiddynamic");
+ queryTable(driver, "create table default.writeiddynamic (a"
+ + " string, b string) partitioned by (c string, d string)"
+ + " stored as orc TBLPROPERTIES('transactional'='true')");
+
+ StrictDelimitedInputWriter writerT =
+ StrictDelimitedInputWriter.newBuilder().withFieldDelimiter(',').build();
+ HiveStreamingConnection transactionConnection =
+ HiveStreamingConnection.newBuilder().withDatabase("default")
+ .withTable("writeiddynamic").withRecordWriter(writerT)
+ .withHiveConf(conf).connect();
+ transactionConnection.beginTransaction();
+
+ Table tObject = transactionConnection.getTable();
+ Long writeId = transactionConnection.getCurrentWriteId();
+
+ Assert.assertNotNull(tObject);
+ Assert.assertNotNull(writeId);
+
+ StrictDelimitedInputWriter writerOne =
+ StrictDelimitedInputWriter.newBuilder().withFieldDelimiter(',').build();
+ HiveStreamingConnection connectionOne =
+ HiveStreamingConnection.newBuilder().withDatabase("default")
+ .withTable("writeiddynamic").withRecordWriter(writerOne)
+ .withHiveConf(conf).withWriteId(writeId).withStatementId(1)
+ .withTableObject(tObject).connect();
+
+ StrictDelimitedInputWriter writerTwo =
+ StrictDelimitedInputWriter.newBuilder().withFieldDelimiter(',').build();
+ HiveStreamingConnection connectionTwo =
+ HiveStreamingConnection.newBuilder().withDatabase("default")
+ .withTable("writeiddynamic")
+ .withRecordWriter(writerTwo)
+ .withHiveConf(conf).withWriteId(writeId).withStatementId(1)
+ .withTableObject(tObject)
+ .connect();
+
+ Assert.assertNotNull(connectionOne);
+
+ connectionTwo.beginTransaction();
+ connectionOne.beginTransaction();
+ connectionOne.write("1,2,3,4".getBytes());
+ connectionOne.write("1,2,5,6".getBytes());
+ connectionTwo.write("1,2,30,40".getBytes());
+ connectionOne.write("1,2,7,8".getBytes());
+ connectionTwo.write("1,2,50,60".getBytes());
+ connectionOne.write("1,2,9,10".getBytes());
+ connectionOne.commitTransaction();
+ connectionTwo.commitTransaction();
+
+ Set<String> partitionsOne = new HashSet<>(connectionOne.getPartitions());
+ Assert.assertEquals(4, partitionsOne.size());
+
+ Set<String> partitionsTwo = new HashSet<>(connectionTwo.getPartitions());
+ Assert.assertEquals(2, partitionsTwo.size());
+
+ connectionOne.close();
+ connectionTwo.close();
+
+ try {
+ String partitionName = partitionsOne.iterator().next();
+ msClient.getPartition("default", "writeiddynamic", partitionName);
+ Assert.fail(
+ "Partition shouldn't exist so a NoSuchObjectException should have been raised");
+    } catch (NoSuchObjectException e) {
+      // expected
+    }
+
+ partitionsOne.addAll(partitionsTwo);
+ Set<String> allPartitions = partitionsOne;
+ transactionConnection.commitTransactionWithPartition(allPartitions);
+
+ // Ensure partition is present
+ for (String partition : allPartitions) {
+ Partition p =
+ msClient.getPartition("default", "writeiddynamic",
+ partition);
+ Assert.assertNotNull("Did not find added partition", p);
+ }
+ }
+
+ @Test
public void testTransactionBatchEmptyCommit() throws Exception {
// 1) to partitioned table
StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
@@ -977,8 +1244,8 @@ public class TestStreaming {
.connect();
connection.beginTransaction();
connection.commitTransaction();
- Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED
- , connection.getCurrentTransactionState());
+ Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED,
+ connection.getCurrentTransactionState());
connection.close();
// 2) To unpartitioned table
@@ -995,8 +1262,8 @@ public class TestStreaming {
connection.beginTransaction();
connection.commitTransaction();
- Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED
- , connection.getCurrentTransactionState());
+ Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED,
+ connection.getCurrentTransactionState());
connection.close();
}
@@ -1115,8 +1382,8 @@ public class TestStreaming {
connection.beginTransaction();
connection.abortTransaction();
- Assert.assertEquals(HiveStreamingConnection.TxnState.ABORTED
- , connection.getCurrentTransactionState());
+ Assert.assertEquals(HiveStreamingConnection.TxnState.ABORTED,
+ connection.getCurrentTransactionState());
connection.close();
// 2) to unpartitioned table
@@ -1133,8 +1400,8 @@ public class TestStreaming {
connection.beginTransaction();
connection.abortTransaction();
- Assert.assertEquals(HiveStreamingConnection.TxnState.ABORTED
- , connection.getCurrentTransactionState());
+ Assert.assertEquals(HiveStreamingConnection.TxnState.ABORTED,
+ connection.getCurrentTransactionState());
connection.close();
}
@@ -1156,20 +1423,20 @@ public class TestStreaming {
// 1st Txn
connection.beginTransaction();
- Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN
- , connection.getCurrentTransactionState());
+ Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN,
+ connection.getCurrentTransactionState());
connection.write("1,Hello streaming".getBytes());
connection.commitTransaction();
checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
- Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED
- , connection.getCurrentTransactionState());
+ Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED,
+ connection.getCurrentTransactionState());
// 2nd Txn
connection.beginTransaction();
- Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN
- , connection.getCurrentTransactionState());
+ Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN,
+ connection.getCurrentTransactionState());
connection.write("2,Welcome to streaming".getBytes());
// data should not be visible
@@ -1182,8 +1449,8 @@ public class TestStreaming {
connection.close();
- Assert.assertEquals(HiveStreamingConnection.TxnState.INACTIVE
- , connection.getCurrentTransactionState());
+ Assert.assertEquals(HiveStreamingConnection.TxnState.INACTIVE,
+ connection.getCurrentTransactionState());
// To Unpartitioned table
@@ -1199,13 +1466,13 @@ public class TestStreaming {
.connect();
// 1st Txn
connection.beginTransaction();
- Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN
- , connection.getCurrentTransactionState());
+ Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN,
+ connection.getCurrentTransactionState());
connection.write("1,Hello streaming".getBytes());
connection.commitTransaction();
- Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED
- , connection.getCurrentTransactionState());
+ Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED,
+ connection.getCurrentTransactionState());
connection.close();
}
@@ -1227,20 +1494,20 @@ public class TestStreaming {
// 1st Txn
connection.beginTransaction();
- Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN
- , connection.getCurrentTransactionState());
+ Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN,
+ connection.getCurrentTransactionState());
connection.write("1,Hello streaming".getBytes());
connection.commitTransaction();
checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
- Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED
- , connection.getCurrentTransactionState());
+ Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED,
+ connection.getCurrentTransactionState());
// 2nd Txn
connection.beginTransaction();
- Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN
- , connection.getCurrentTransactionState());
+ Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN,
+ connection.getCurrentTransactionState());
connection.write("2,Welcome to streaming".getBytes());
// data should not be visible
@@ -1252,8 +1519,8 @@ public class TestStreaming {
"{2, Welcome to streaming}");
connection.close();
- Assert.assertEquals(HiveStreamingConnection.TxnState.INACTIVE
- , connection.getCurrentTransactionState());
+ Assert.assertEquals(HiveStreamingConnection.TxnState.INACTIVE,
+ connection.getCurrentTransactionState());
// To Unpartitioned table
regex = "([^:]*):(.*)";
@@ -1271,13 +1538,13 @@ public class TestStreaming {
// 1st Txn
connection.beginTransaction();
- Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN
- , connection.getCurrentTransactionState());
+ Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN,
+ connection.getCurrentTransactionState());
connection.write("1:Hello streaming".getBytes());
connection.commitTransaction();
- Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED
- , connection.getCurrentTransactionState());
+ Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED,
+ connection.getCurrentTransactionState());
connection.close();
}
@@ -1328,20 +1595,20 @@ public class TestStreaming {
// 1st Txn
connection.beginTransaction();
- Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN
- , connection.getCurrentTransactionState());
+ Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN,
+ connection.getCurrentTransactionState());
String rec1 = "{\"id\" : 1, \"msg\": \"Hello streaming\"}";
connection.write(rec1.getBytes());
connection.commitTransaction();
checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
- Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED
- , connection.getCurrentTransactionState());
+ Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED,
+ connection.getCurrentTransactionState());
connection.close();
- Assert.assertEquals(HiveStreamingConnection.TxnState.INACTIVE
- , connection.getCurrentTransactionState());
+ Assert.assertEquals(HiveStreamingConnection.TxnState.INACTIVE,
+ connection.getCurrentTransactionState());
List<String> rs = queryTable(driver, "select * from " + dbName + "." + tblName);
Assert.assertEquals(1, rs.size());
@@ -1399,20 +1666,20 @@ public class TestStreaming {
connection.beginTransaction();
Assert.assertEquals(--initialCount, connection.remainingTransactions());
for (int rec = 0; rec < 2; ++rec) {
- Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN
- , connection.getCurrentTransactionState());
+ Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN,
+ connection.getCurrentTransactionState());
connection.write((batch * rec + ",Hello streaming").getBytes());
}
connection.commitTransaction();
- Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED
- , connection.getCurrentTransactionState());
+ Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED,
+ connection.getCurrentTransactionState());
++batch;
}
Assert.assertEquals(0, connection.remainingTransactions());
connection.close();
- Assert.assertEquals(HiveStreamingConnection.TxnState.INACTIVE
- , connection.getCurrentTransactionState());
+ Assert.assertEquals(HiveStreamingConnection.TxnState.INACTIVE,
+ connection.getCurrentTransactionState());
connection = HiveStreamingConnection.newBuilder()
.withDatabase(dbName)
@@ -1430,20 +1697,20 @@ public class TestStreaming {
connection.beginTransaction();
Assert.assertEquals(--initialCount, connection.remainingTransactions());
for (int rec = 0; rec < 2; ++rec) {
- Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN
- , connection.getCurrentTransactionState());
+ Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN,
+ connection.getCurrentTransactionState());
connection.write((batch * rec + ",Hello streaming").getBytes());
}
connection.abortTransaction();
- Assert.assertEquals(HiveStreamingConnection.TxnState.ABORTED
- , connection.getCurrentTransactionState());
+ Assert.assertEquals(HiveStreamingConnection.TxnState.ABORTED,
+ connection.getCurrentTransactionState());
++batch;
}
Assert.assertEquals(0, connection.remainingTransactions());
connection.close();
- Assert.assertEquals(HiveStreamingConnection.TxnState.INACTIVE
- , connection.getCurrentTransactionState());
+ Assert.assertEquals(HiveStreamingConnection.TxnState.INACTIVE,
+ connection.getCurrentTransactionState());
}
@Test
@@ -1468,8 +1735,8 @@ public class TestStreaming {
checkNothingWritten(partLoc);
- Assert.assertEquals(HiveStreamingConnection.TxnState.ABORTED
- , connection.getCurrentTransactionState());
+ Assert.assertEquals(HiveStreamingConnection.TxnState.ABORTED,
+ connection.getCurrentTransactionState());
connection.close();
@@ -1507,8 +1774,8 @@ public class TestStreaming {
checkNothingWritten(partLoc);
- Assert.assertEquals(HiveStreamingConnection.TxnState.ABORTED
- , connection.getCurrentTransactionState());
+ Assert.assertEquals(HiveStreamingConnection.TxnState.ABORTED,
+ connection.getCurrentTransactionState());
connection.beginTransaction();
connection.write("1,Hello streaming".getBytes());
@@ -1577,8 +1844,8 @@ public class TestStreaming {
"2\tWelcome to streaming", "3\tHello streaming - once again",
"4\tWelcome to streaming - once again");
- Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED
- , connection.getCurrentTransactionState());
+ Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED,
+ connection.getCurrentTransactionState());
connection.close();
}
@@ -1690,10 +1957,10 @@ public class TestStreaming {
"3\tHello streaming - once again",
"4\tWelcome to streaming - once again");
- Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED
- , connection.getCurrentTransactionState());
- Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED
- , connection2.getCurrentTransactionState());
+ Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED,
+ connection.getCurrentTransactionState());
+ Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED,
+ connection2.getCurrentTransactionState());
connection.close();
connection2.close();
@@ -2511,7 +2778,8 @@ public class TestStreaming {
}
@Test
- public void testErrorHandling() throws Exception {
+ public void testErrorHandling()
+ throws Exception {
String agentInfo = "UT_" + Thread.currentThread().getName();
runCmdOnDriver("create database testErrors");
runCmdOnDriver("use testErrors");
@@ -2538,8 +2806,12 @@ public class TestStreaming {
GetOpenTxnsInfoResponse r = msClient.showTxns();
Assert.assertEquals("HWM didn't match", 17, r.getTxn_high_water_mark());
List<TxnInfo> ti = r.getOpen_txns();
- Assert.assertEquals("wrong status ti(0)", TxnState.ABORTED, ti.get(0).getState());
- Assert.assertEquals("wrong status ti(1)", TxnState.ABORTED, ti.get(1).getState());
+ Assert.assertEquals("wrong status ti(0)",
+ org.apache.hadoop.hive.metastore.api.TxnState.ABORTED,
+ ti.get(0).getState());
+ Assert.assertEquals("wrong status ti(1)",
+ org.apache.hadoop.hive.metastore.api.TxnState.ABORTED,
+ ti.get(1).getState());
try {
@@ -2621,10 +2893,16 @@ public class TestStreaming {
r = msClient.showTxns();
Assert.assertEquals("HWM didn't match", 19, r.getTxn_high_water_mark());
ti = r.getOpen_txns();
- Assert.assertEquals("wrong status ti(0)", TxnState.ABORTED, ti.get(0).getState());
- Assert.assertEquals("wrong status ti(1)", TxnState.ABORTED, ti.get(1).getState());
+ Assert.assertEquals("wrong status ti(0)",
+ org.apache.hadoop.hive.metastore.api.TxnState.ABORTED,
+ ti.get(0).getState());
+ Assert.assertEquals("wrong status ti(1)",
+ org.apache.hadoop.hive.metastore.api.TxnState.ABORTED,
+ ti.get(1).getState());
//txnid 3 was committed and thus not open
- Assert.assertEquals("wrong status ti(2)", TxnState.ABORTED, ti.get(2).getState());
+ Assert.assertEquals("wrong status ti(2)",
+ org.apache.hadoop.hive.metastore.api.TxnState.ABORTED,
+ ti.get(2).getState());
connection.close();
writer.disableErrors();
@@ -2651,8 +2929,12 @@ public class TestStreaming {
r = msClient.showTxns();
Assert.assertEquals("HWM didn't match", 21, r.getTxn_high_water_mark());
ti = r.getOpen_txns();
- Assert.assertEquals("wrong status ti(3)", TxnState.ABORTED, ti.get(3).getState());
- Assert.assertEquals("wrong status ti(4)", TxnState.ABORTED, ti.get(4).getState());
+ Assert.assertEquals("wrong status ti(3)",
+ org.apache.hadoop.hive.metastore.api.TxnState.ABORTED,
+ ti.get(3).getState());
+ Assert.assertEquals("wrong status ti(4)",
+ org.apache.hadoop.hive.metastore.api.TxnState.ABORTED,
+ ti.get(4).getState());
}
// assumes un partitioned table
@@ -2953,5 +3235,12 @@ public class TestStreaming {
void disableErrors() {
shouldThrow = false;
}
+
+ @Override
+ public Path getDeltaFileLocation(List<String> partitionValues,
+ Integer bucketId, Long minWriteId, Long maxWriteId, Integer statementId,
+ Table table) throws StreamingException {
+ return null;
+ }
}
}
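The tests above exercise the new builder surface end to end. As a quick
reference, here is a condensed sketch of the writeId-sharing pattern they
demonstrate (abbreviated from testConnectionWithWriteId; the table name and
writer variables are illustrative, error handling omitted):

    // Controller: opens the transaction and owns the writeId.
    HiveStreamingConnection controller = HiveStreamingConnection.newBuilder()
        .withDatabase("default").withTable("t")
        .withRecordWriter(controllerWriter).withHiveConf(conf)
        .connect();
    controller.beginTransaction();
    Table tObject = controller.getTable();
    long writeId = controller.getCurrentWriteId();

    // Worker: joins the same writeId without talking to the metastore.
    HiveStreamingConnection worker = HiveStreamingConnection.newBuilder()
        .withDatabase("default").withTable("t")
        .withRecordWriter(workerWriter).withHiveConf(conf)
        .withWriteId(writeId).withStatementId(1)
        .withTableObject(tObject)
        .connect();
    worker.beginTransaction();
    worker.write("a1,b2".getBytes());
    worker.commitTransaction();  // flushes files; state is PREPARED_FOR_COMMIT
    worker.close();

    // Rows become visible only when the controller commits.
    controller.commitTransaction();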
[2/2] hive git commit: HIVE-20291: Allow HiveStreamingConnection to
receive a WriteId (Jaume Marhuenda reviewed by Prasanth Jayachandran)
Posted by pr...@apache.org.
HIVE-20291: Allow HiveStreamingConnection to receive a WriteId (Jaume Marhuenda reviewed by Prasanth Jayachandran)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/bdbd3bcf
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/bdbd3bcf
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/bdbd3bcf
Branch: refs/heads/master
Commit: bdbd3bcffac9f7fe1d3babb45eb40547b1499bb5
Parents: 7c4d48e
Author: Jaume Marhuenda <ja...@gmail.com>
Authored: Thu Oct 11 16:22:22 2018 -0700
Committer: Prasanth Jayachandran <pr...@apache.org>
Committed: Thu Oct 11 16:23:34 2018 -0700
----------------------------------------------------------------------
.../hive/streaming/AbstractRecordWriter.java | 65 +-
.../streaming/AbstractStreamingTransaction.java | 156 +++++
.../apache/hive/streaming/ConnectionStats.java | 38 +-
.../hive/streaming/HiveStreamingConnection.java | 697 ++++++-------------
.../hive/streaming/InvalidTransactionState.java | 5 +-
.../apache/hive/streaming/PartitionInfo.java | 1 +
.../org/apache/hive/streaming/RecordWriter.java | 38 +-
.../hive/streaming/StreamingConnection.java | 43 ++
.../hive/streaming/StreamingTransaction.java | 113 +++
.../apache/hive/streaming/TransactionBatch.java | 430 ++++++++++++
.../apache/hive/streaming/TransactionError.java | 7 +-
.../streaming/UnManagedSingleTransaction.java | 135 ++++
.../org/apache/hive/streaming/package-info.java | 22 +
.../java/org/apache/hive/streaming/package.html | 3 +-
.../apache/hive/streaming/TestStreaming.java | 433 ++++++++++--
15 files changed, 1616 insertions(+), 570 deletions(-)
----------------------------------------------------------------------
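For partitioned tables the same split applies to partition creation: a worker
connection only records which partitions it touched, and the controlling
connection registers them in the metastore at commit time. A condensed sketch
following testAddPartitionWithWriteId above (variable names are illustrative):

    // Worker: commit flushes the data and records the touched partitions,
    // but nothing is added to the metastore yet.
    worker.beginTransaction();
    worker.write("3,Hello streaming".getBytes());
    worker.commitTransaction();
    Set<String> partitions = new HashSet<>(worker.getPartitions());
    worker.close();

    // Controller: the partitions are created in the metastore only when
    // the controlling transaction commits with them.
    controller.commitTransactionWithPartition(partitions);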
http://git-wip-us.apache.org/repos/asf/hive/blob/bdbd3bcf/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java b/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
index 9e90d36..88a7d82 100644
--- a/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
+++ b/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
@@ -46,9 +46,11 @@ import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.llap.LlapUtil;
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.io.RecordUpdater;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.serde2.AbstractSerDe;
@@ -66,6 +68,7 @@ public abstract class AbstractRecordWriter implements RecordWriter {
private static final Logger LOG = LoggerFactory.getLogger(AbstractRecordWriter.class.getName());
private static final String DEFAULT_LINE_DELIMITER_PATTERN = "[\r\n]";
+ private Integer statementId;
protected HiveConf conf;
protected StreamingConnection conn;
protected Table table;
@@ -128,13 +131,21 @@ public abstract class AbstractRecordWriter implements RecordWriter {
}
@Override
- public void init(StreamingConnection conn, long minWriteId, long maxWriteId) throws StreamingException {
+ public void init(StreamingConnection conn, long minWriteId, long maxWriteId)
+ throws StreamingException {
+ init(conn, minWriteId, maxWriteId, -1);
+ }
+
+ @Override
+ public void init(StreamingConnection conn, long minWriteId, long maxWriteId,
+ int statementId) throws StreamingException {
if (conn == null) {
throw new StreamingException("Streaming connection cannot be null during record writer initialization");
}
this.conn = conn;
this.curBatchMinWriteId = minWriteId;
this.curBatchMaxWriteId = maxWriteId;
+ this.statementId = statementId;
this.conf = conn.getHiveConf();
this.defaultPartitionName = conf.getVar(HiveConf.ConfVars.DEFAULTPARTITIONNAME);
this.table = conn.getTable();
@@ -431,6 +442,7 @@ public abstract class AbstractRecordWriter implements RecordWriter {
int bucket = getBucket(encodedRow);
List<String> partitionValues = getPartitionValues(encodedRow);
getRecordUpdater(partitionValues, bucket).insert(writeId, encodedRow);
+
// ingest size bytes get reset on flush() whereas connection stats do not
conn.getConnectionStats().incrementRecordsWritten();
conn.getConnectionStats().incrementRecordsSize(record.length);
@@ -492,10 +504,53 @@ public abstract class AbstractRecordWriter implements RecordWriter {
.tableProperties(tblProperties)
.minimumWriteId(minWriteId)
.maximumWriteId(maxWriteID)
- .statementId(-1)
+ .statementId(statementId)
.finalDestination(partitionPath));
}
+ /**
+   * Returns the file that would be used to store rows under these
+   * parameters.
+ * @param partitionValues partition values
+ * @param bucketId bucket id
+ * @param minWriteId min write Id
+ * @param maxWriteId max write Id
+ * @param statementId statement Id
+ * @param table table
+ * @return the location of the file.
+ * @throws StreamingException when the path is not found
+ */
+ @Override
+ public Path getDeltaFileLocation(List<String> partitionValues,
+ Integer bucketId, Long minWriteId, Long maxWriteId, Integer statementId,
+ Table table) throws StreamingException {
+ Path destLocation;
+ if (partitionValues == null) {
+ destLocation = new Path(table.getSd().getLocation());
+ } else {
+ Map<String, String> partSpec = Warehouse.makeSpecFromValues(
+ table.getPartitionKeys(), partitionValues);
+ try {
+ destLocation = new Path(table.getDataLocation(), Warehouse.makePartPath(partSpec));
+ } catch (MetaException e) {
+ throw new StreamingException("Unable to retrieve the delta file location"
+ + " for values: " + partitionValues
+ + ", minWriteId: " + minWriteId
+ + ", maxWriteId: " + maxWriteId
+ + ", statementId: " + statementId, e);
+ }
+ }
+ AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf)
+ .filesystem(fs)
+ .inspector(outputRowObjectInspector)
+ .bucket(bucketId)
+ .minimumWriteId(minWriteId)
+ .maximumWriteId(maxWriteId)
+ .statementId(statementId)
+ .finalDestination(destLocation);
+ return AcidUtils.createFilename(destLocation, options);
+ }
+
protected RecordUpdater getRecordUpdater(List<String> partitionValues, int bucketId) throws StreamingIOFailure {
RecordUpdater recordUpdater;
String key;
@@ -516,12 +571,10 @@ public abstract class AbstractRecordWriter implements RecordWriter {
// partitions to TxnHandler
if (!partitionInfo.isExists()) {
addedPartitions.add(partitionInfo.getName());
- if (LOG.isDebugEnabled()) {
- LOG.debug("Created partition {} for table {}", partitionInfo.getName(), fullyQualifiedTableName);
- }
} else {
if (LOG.isDebugEnabled()) {
- LOG.debug("Partition {} already exists for table {}", partitionInfo.getName(), fullyQualifiedTableName);
+ LOG.debug("Partition {} already exists for table {}",
+ partitionInfo.getName(), fullyQualifiedTableName);
}
}
destLocation = new Path(partitionInfo.getPartitionLocation());
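The path returned by the new getDeltaFileLocation() follows the usual ACID
delta naming, delta_<minWriteId>_<maxWriteId>_<statementId>/bucket_<bucketId>
with zero-padded ids, as testGetDeltaPath asserts. A minimal call mirroring
that test, through the connection-level wrapper:

    // Resolves to .../continent=Asia/country=India/
    //                 delta_0000005_0000005_0009/bucket_00000
    Path path = connection.getDeltaFileLocation(partitionVals, 0, 5L, 5L, 9);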
http://git-wip-us.apache.org/repos/asf/hive/blob/bdbd3bcf/streaming/src/java/org/apache/hive/streaming/AbstractStreamingTransaction.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/AbstractStreamingTransaction.java b/streaming/src/java/org/apache/hive/streaming/AbstractStreamingTransaction.java
new file mode 100644
index 0000000..a99fdba
--- /dev/null
+++ b/streaming/src/java/org/apache/hive/streaming/AbstractStreamingTransaction.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
+
+import java.io.InputStream;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Common methods for the implementing classes.
+ */
+abstract class AbstractStreamingTransaction
+ implements StreamingTransaction {
+
+ /**
+ * This variable should be initialized by the children.
+ */
+ protected RecordWriter recordWriter;
+
+ /**
+ * This variable should be initialized by the children.
+ */
+ protected List<TxnToWriteId> txnToWriteIds;
+
+
+ /**
+ * once any operation on this batch encounters a system exception
+ * (e.g. IOException on write) it's safest to assume that we can't write to the
+ * file backing this batch any more. This guards important public methods
+   * file backing this batch any more. This guards important public methods.
+ protected final AtomicBoolean isTxnClosed = new AtomicBoolean(false);
+
+ protected int currentTxnIndex = -1;
+ protected HiveStreamingConnection.TxnState state;
+
+
+ protected void checkIsClosed() throws StreamingException {
+ if (isTxnClosed.get()) {
+      throw new StreamingException("Transaction " + toString() + " is closed");
+ }
+ }
+
+ protected void beginNextTransactionImpl(String errorMessage)
+      throws StreamingException {
+ state = HiveStreamingConnection.TxnState.INACTIVE; //clear state from previous txn
+ if ((currentTxnIndex + 1) >= txnToWriteIds.size()) {
+ throw new InvalidTransactionState(errorMessage);
+ }
+ currentTxnIndex++;
+ state = HiveStreamingConnection.TxnState.OPEN;
+ }
+
+ public void write(final byte[] record) throws StreamingException {
+ checkIsClosed();
+ boolean success = false;
+ try {
+ recordWriter.write(getCurrentWriteId(), record);
+ success = true;
+ } catch (SerializationError ex) {
+ //this exception indicates that a {@code record} could not be parsed and the
+ //caller can decide whether to drop it or send it to dead letter queue.
+ //rolling back the txn and retrying won't help since the tuple will be exactly the same
+ //when it's replayed.
+ success = true;
+ throw ex;
+ } finally {
+ markDead(success);
+ }
+ }
+
+ public void write(final InputStream inputStream) throws StreamingException {
+ checkIsClosed();
+ boolean success = false;
+ try {
+ recordWriter.write(getCurrentWriteId(), inputStream);
+ success = true;
+ } catch (SerializationError ex) {
+ //this exception indicates that a {@code record} could not be parsed and the
+ //caller can decide whether to drop it or send it to dead letter queue.
+      //rolling back the txn and retrying won't help since the tuple will be exactly the same
+ //when it's replayed.
+ success = true;
+ throw ex;
+ } finally {
+ markDead(success);
+ }
+ }
+
+ /**
+   * A transaction batch opens a single HDFS file and writes multiple transactions to it. If there is any issue
+   * with the write, we can't continue to write to the same file any more, as it may be corrupted now (at the tail).
+ * This ensures that a client can't ignore these failures and continue to write.
+ */
+ protected void markDead(boolean success) throws StreamingException {
+ if (success) {
+ return;
+ }
+ close();
+ }
+
+ public long getCurrentWriteId() {
+ if (currentTxnIndex >= 0) {
+ return txnToWriteIds.get(currentTxnIndex).getWriteId();
+ }
+ return -1L;
+ }
+
+ public int remainingTransactions() {
+ if (currentTxnIndex >= 0) {
+ return txnToWriteIds.size() - currentTxnIndex - 1;
+ }
+ return txnToWriteIds.size();
+ }
+
+ public boolean isClosed() {
+ return isTxnClosed.get();
+ }
+
+ public HiveStreamingConnection.TxnState getCurrentTransactionState() {
+ return state;
+ }
+
+ public long getCurrentTxnId() {
+ if (currentTxnIndex >= 0) {
+ return txnToWriteIds.get(currentTxnIndex).getTxnId();
+ }
+ return -1L;
+ }
+
+ @Override
+ public List<TxnToWriteId> getTxnToWriteIds() {
+ return txnToWriteIds;
+ }
+
+ public void commit() throws StreamingException {
+ commitWithPartitions(null);
+ }
+}
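Note the asymmetry in write() above: a SerializationError is rethrown but
deliberately does not close the transaction (success is set to true before
the rethrow), while any other failure marks the transaction dead. A hedged
caller-side sketch of what that contract allows (deadLetters is a
hypothetical sink):

    try {
      connection.write(record);
    } catch (SerializationError e) {
      // the record could not be parsed; the transaction stays open,
      // so the caller may drop or divert the record and keep streaming
      deadLetters.add(record);  // hypothetical dead letter queue
    } catch (StreamingException e) {
      // any other failure has closed the transaction; open a new one
    }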
http://git-wip-us.apache.org/repos/asf/hive/blob/bdbd3bcf/streaming/src/java/org/apache/hive/streaming/ConnectionStats.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/ConnectionStats.java b/streaming/src/java/org/apache/hive/streaming/ConnectionStats.java
index 355456e..c0e7d5c 100644
--- a/streaming/src/java/org/apache/hive/streaming/ConnectionStats.java
+++ b/streaming/src/java/org/apache/hive/streaming/ConnectionStats.java
@@ -31,6 +31,16 @@ public class ConnectionStats {
private LongAdder autoFlushCount = new LongAdder();
private LongAdder metastoreCalls = new LongAdder();
+ /**
+ * Total partitions that have been affected.
+ */
+ private LongAdder totalPartitions = new LongAdder();
+
+ /**
+   * Number of partitions that were created.
+ */
+ private LongAdder createdPartitions = new LongAdder();
+
public void incrementRecordsWritten() {
recordsWritten.increment();
}
@@ -55,6 +65,22 @@ public class ConnectionStats {
recordsSize.add(delta);
}
+ /**
+ * Increment by delta the number of created partitions.
+ * @param delta to increment by.
+ */
+ public void incrementCreatedPartitions(long delta) {
+ createdPartitions.add(delta);
+ }
+
+ /**
+ * Increment by delta the total partitions.
+ * @param delta to increment by.
+ */
+ public void incrementTotalPartitions(long delta) {
+ totalPartitions.add(delta);
+ }
+
public long getRecordsWritten() {
return recordsWritten.longValue();
}
@@ -79,10 +105,20 @@ public class ConnectionStats {
return metastoreCalls.longValue();
}
+ public LongAdder getTotalPartitions() {
+ return totalPartitions;
+ }
+
+ public LongAdder getCreatedPartitions() {
+ return createdPartitions;
+ }
+
@Override
public String toString() {
return "{records-written: " + recordsWritten + ", records-size: "+ recordsSize + ", committed-transactions: " +
committedTransactions + ", aborted-transactions: " + abortedTransactions + ", auto-flushes: " + autoFlushCount +
- ", metastore-calls: " + metastoreCalls + " }";
+ ", metastore-calls: " + metastoreCalls
+ + ", created-partitions: " + createdPartitions
+ + ", total-partitions: " + totalPartitions + " }";
}
}
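With the two new counters, the stats line logged when a connection closes
(via toString() above) now also reports partition activity; with illustrative
values it looks like:

    {records-written: 2, records-size: 34, committed-transactions: 1,
     aborted-transactions: 0, auto-flushes: 0, metastore-calls: 5,
     created-partitions: 1, total-partitions: 2 }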
http://git-wip-us.apache.org/repos/asf/hive/blob/bdbd3bcf/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java b/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java
index 6cf14b0..f79b844 100644
--- a/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java
+++ b/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java
@@ -23,43 +23,27 @@ import java.io.InputStream;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.UUID;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
-import org.apache.hadoop.hive.metastore.LockComponentBuilder;
-import org.apache.hadoop.hive.metastore.LockRequestBuilder;
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
-import org.apache.hadoop.hive.metastore.api.DataOperationType;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeResponse;
-import org.apache.hadoop.hive.metastore.api.LockRequest;
-import org.apache.hadoop.hive.metastore.api.LockResponse;
-import org.apache.hadoop.hive.metastore.api.LockState;
import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
-import org.apache.hadoop.hive.ql.lockmgr.LockException;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Table;
@@ -71,7 +55,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* Streaming connection implementation for hive. To create a streaming connection, use the builder API
@@ -120,11 +103,11 @@ public class HiveStreamingConnection implements StreamingConnection {
private static final String DEFAULT_METASTORE_URI = "thrift://localhost:9083";
private static final int DEFAULT_TRANSACTION_BATCH_SIZE = 1;
- private static final int DEFAULT_HEARTBEAT_INTERVAL = 60 * 1000;
private static final boolean DEFAULT_STREAMING_OPTIMIZATIONS_ENABLED = true;
public enum TxnState {
- INACTIVE("I"), OPEN("O"), COMMITTED("C"), ABORTED("A");
+ INACTIVE("I"), OPEN("O"), COMMITTED("C"), ABORTED("A"),
+ PREPARED_FOR_COMMIT("P");
private final String code;
@@ -144,7 +127,7 @@ public class HiveStreamingConnection implements StreamingConnection {
private String agentInfo;
private int transactionBatchSize;
private RecordWriter recordWriter;
- private TransactionBatch currentTransactionBatch;
+ private StreamingTransaction currentTransactionBatch;
private HiveConf conf;
private boolean streamingOptimizations;
private AtomicBoolean isConnectionClosed = new AtomicBoolean(false);
@@ -158,6 +141,11 @@ public class HiveStreamingConnection implements StreamingConnection {
private Table tableObject = null;
private String metastoreUri;
private ConnectionStats connectionStats;
+ private final Long writeId;
+ private final Integer statementId;
+ private boolean manageTransactions;
+ private int countTransactions = 0;
+ private Set<String> partitions;
private HiveStreamingConnection(Builder builder) throws StreamingException {
this.database = builder.database.toLowerCase();
@@ -166,6 +154,12 @@ public class HiveStreamingConnection implements StreamingConnection {
this.conf = builder.hiveConf;
this.agentInfo = builder.agentInfo;
this.streamingOptimizations = builder.streamingOptimizations;
+ this.writeId = builder.writeId;
+ this.statementId = builder.statementId;
+ this.tableObject = builder.tableObject;
+ this.setPartitionedTable(builder.isPartitioned);
+ this.manageTransactions = builder.manageTransactions;
+
UserGroupInformation loggedInUser = null;
try {
loggedInUser = UserGroupInformation.getLoginUser();
@@ -193,13 +187,18 @@ public class HiveStreamingConnection implements StreamingConnection {
if (conf == null) {
conf = createHiveConf(this.getClass(), DEFAULT_METASTORE_URI);
}
+
overrideConfSettings(conf);
- this.metastoreUri = conf.get(MetastoreConf.ConfVars.THRIFT_URIS.getHiveName());
- this.msClient = getMetaStoreClient(conf, metastoreUri, secureMode, "streaming-connection");
- // We use a separate metastore client for heartbeat calls to ensure heartbeat RPC calls are
- // isolated from the other transaction related RPC calls.
- this.heartbeatMSClient = getMetaStoreClient(conf, metastoreUri, secureMode, "streaming-connection-heartbeat");
- validateTable();
+ if (manageTransactions) {
+ this.metastoreUri = conf.get(MetastoreConf.ConfVars.THRIFT_URIS.getHiveName());
+ this.msClient = getMetaStoreClient(conf, metastoreUri, secureMode,
+ "streaming-connection");
+ // We use a separate metastore client for heartbeat calls to ensure heartbeat RPC calls are
+ // isolated from the other transaction related RPC calls.
+ this.heartbeatMSClient = getMetaStoreClient(conf, metastoreUri, secureMode,
+ "streaming-connection-heartbeat");
+ validateTable();
+ }
LOG.info("STREAMING CONNECTION INFO: {}", toConnectionInfoString());
}
@@ -217,6 +216,11 @@ public class HiveStreamingConnection implements StreamingConnection {
private int transactionBatchSize = DEFAULT_TRANSACTION_BATCH_SIZE;
private boolean streamingOptimizations = DEFAULT_STREAMING_OPTIMIZATIONS_ENABLED;
private RecordWriter recordWriter;
+ private long writeId = -1;
+ private int statementId = -1;
+ private boolean manageTransactions = true;
+ private Table tableObject;
+ private boolean isPartitioned;
/**
* Specify database to use for streaming connection.
@@ -315,6 +319,44 @@ public class HiveStreamingConnection implements StreamingConnection {
}
/**
+     * Specify this parameter if the connection should join an
+     * ongoing transaction without having to query the metastore
+     * to create one.
+ * @param writeId write id
+ * @return builder
+ */
+ public Builder withWriteId(final long writeId) {
+ this.writeId = writeId;
+ manageTransactions = false;
+ return this;
+ }
+
+ /**
+     * Specify this parameter to set a statement id in the writer.
+     * This only makes sense when a writeId is provided as well.
+ * @param statementId statement id
+ * @return builder
+ */
+ public Builder withStatementId(final int statementId) {
+ this.statementId = statementId;
+ return this;
+ }
+
+ /**
+     * Specify the table object, since in some cases no connection
+     * to the metastore will be opened.
+ * @param table table object.
+ * @return builder
+ */
+ public Builder withTableObject(Table table) {
+ this.tableObject = table;
+ this.isPartitioned = tableObject.getPartitionKeys() != null
+ && !tableObject.getPartitionKeys().isEmpty();
+ return this;
+ }
+
+ /**
* Returning a streaming connection to hive.
*
* @return - hive streaming connection
@@ -324,11 +366,27 @@ public class HiveStreamingConnection implements StreamingConnection {
throw new StreamingException("Database cannot be null for streaming connection");
}
if (table == null) {
- throw new StreamingException("Table cannot be null for streaming connection");
+ if (tableObject == null) {
+          throw new StreamingException("Table and table object cannot both "
+              + "be null for a streaming connection");
+ } else {
+ table = tableObject.getTableName();
+ }
+ }
+
+ if (tableObject != null && !tableObject.getTableName().equals(table)) {
+ throw new StreamingException("Table must match tableObject table name");
}
+
if (recordWriter == null) {
throw new StreamingException("Record writer cannot be null for streaming connection");
}
+ if ((writeId != -1 && tableObject == null) ||
+ (writeId == -1 && tableObject != null)){
+ throw new StreamingException("If writeId is set, tableObject "
+ + "must be set as well and vice versa");
+ }
+
HiveStreamingConnection streamingConnection = new HiveStreamingConnection(this);
// assigning higher priority than FileSystem shutdown hook so that streaming connection gets closed first before
// filesystem close (to avoid ClosedChannelException)
@@ -338,7 +396,7 @@ public class HiveStreamingConnection implements StreamingConnection {
}
}
- private void setPartitionedTable(boolean isPartitionedTable) {
+ private void setPartitionedTable(Boolean isPartitionedTable) {
this.isPartitionedTable = isPartitionedTable;
}
@@ -356,7 +414,9 @@ public class HiveStreamingConnection implements StreamingConnection {
"username: " + username + ", " +
"secure-mode: " + secureMode + ", " +
"record-writer: " + recordWriter.getClass().getSimpleName() + ", " +
- "agent-info: " + agentInfo + " }";
+ "agent-info: " + agentInfo + ", " +
+ "writeId: " + writeId + ", " +
+ "statementId: " + statementId + " }";
}
@VisibleForTesting
@@ -369,6 +429,7 @@ public class HiveStreamingConnection implements StreamingConnection {
String partLocation = null;
String partName = null;
boolean exists = false;
+
try {
Map<String, String> partSpec = Warehouse.makeSpecFromValues(tableObject.getPartitionKeys(), partitionValues);
AddPartitionDesc addPartitionDesc = new AddPartitionDesc(database, table, true);
@@ -376,7 +437,18 @@ public class HiveStreamingConnection implements StreamingConnection {
partLocation = new Path(tableObject.getDataLocation(), Warehouse.makePartPath(partSpec)).toString();
addPartitionDesc.addPartition(partSpec, partLocation);
Partition partition = Hive.convertAddSpecToMetaPartition(tableObject, addPartitionDesc.getPartition(0), conf);
+
+ if (getMSC() == null) {
+ // We assume it doesn't exist if we can't check it
+ // so the driver will decide
+ return new PartitionInfo(partName, partLocation, false);
+ }
+
getMSC().add_partition(partition);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Created partition {} for table {}", partName,
+ tableObject.getFullyQualifiedName());
+ }
} catch (AlreadyExistsException e) {
exists = true;
} catch (HiveException | TException e) {
@@ -386,6 +458,25 @@ public class HiveStreamingConnection implements StreamingConnection {
return new PartitionInfo(partName, partLocation, exists);
}
+ /**
+   * Returns the file that would be used to store rows under these
+   * parameters.
+ * @param partitionValues partition values
+ * @param bucketId bucket id
+ * @param minWriteId min write Id
+ * @param maxWriteId max write Id
+ * @param statementId statement Id
+ * @return the location of the file.
+ * @throws StreamingException when the path is not found
+ */
+ @Override
+ public Path getDeltaFileLocation(List<String> partitionValues,
+ Integer bucketId, Long minWriteId, Long maxWriteId, Integer statementId)
+ throws StreamingException {
+ return recordWriter.getDeltaFileLocation(partitionValues,
+ bucketId, minWriteId, maxWriteId, statementId, tableObject);
+ }
+
IMetaStoreClient getMSC() {
connectionStats.incrementMetastoreCalls();
return msClient;
@@ -424,43 +515,6 @@ public class HiveStreamingConnection implements StreamingConnection {
}
}
- private static class HeartbeatRunnable implements Runnable {
- private final HiveStreamingConnection conn;
- private final AtomicLong minTxnId;
- private final long maxTxnId;
- private final ReentrantLock transactionLock;
- private final AtomicBoolean isTxnClosed;
-
- HeartbeatRunnable(final HiveStreamingConnection conn, final AtomicLong minTxnId, final long maxTxnId,
- final ReentrantLock transactionLock, final AtomicBoolean isTxnClosed) {
- this.conn = conn;
- this.minTxnId = minTxnId;
- this.maxTxnId = maxTxnId;
- this.transactionLock = transactionLock;
- this.isTxnClosed = isTxnClosed;
- }
-
- @Override
- public void run() {
- transactionLock.lock();
- try {
- if (minTxnId.get() > 0) {
- HeartbeatTxnRangeResponse resp = conn.getHeatbeatMSC().heartbeatTxnRange(minTxnId.get(), maxTxnId);
- if (!resp.getAborted().isEmpty() || !resp.getNosuch().isEmpty()) {
- LOG.error("Heartbeat failure: {}", resp.toString());
- isTxnClosed.set(true);
- } else {
- LOG.info("Heartbeat sent for range: [{}-{}]", minTxnId.get(), maxTxnId);
- }
- }
- } catch (TException e) {
- LOG.warn("Failure to heartbeat for transaction range: [" + minTxnId.get() + "-" + maxTxnId + "]", e);
- } finally {
- transactionLock.unlock();
- }
- }
- }
-
private void beginNextTransaction() throws StreamingException {
if (currentTransactionBatch == null) {
currentTransactionBatch = createNewTransactionBatch();
@@ -481,8 +535,18 @@ public class HiveStreamingConnection implements StreamingConnection {
currentTransactionBatch.beginNextTransaction();
}
- private TransactionBatch createNewTransactionBatch() throws StreamingException {
- return new TransactionBatch(this);
+ private StreamingTransaction createNewTransactionBatch() throws StreamingException {
+ countTransactions++;
+ if (manageTransactions) {
+ return new TransactionBatch(this);
+ } else {
+ if (countTransactions > 1) {
+        throw new StreamingException("If a writeId is passed for the "
+            + "construction of HiveStreamingConnection, only one "
+            + "transaction batch can be used");
+ }
+ return new UnManagedSingleTransaction(this);
+ }
}
private void checkClosedState() throws StreamingException {
@@ -496,7 +560,7 @@ public class HiveStreamingConnection implements StreamingConnection {
if (currentTransactionBatch == null) {
throw new StreamingException("Transaction batch is null. Missing beginTransaction?");
}
- if (currentTransactionBatch.state != TxnState.OPEN) {
+ if (currentTransactionBatch.getCurrentTransactionState() != TxnState.OPEN) {
throw new StreamingException("Transaction state is not OPEN. Missing beginTransaction?");
}
}
@@ -504,13 +568,40 @@ public class HiveStreamingConnection implements StreamingConnection {
@Override
public void beginTransaction() throws StreamingException {
checkClosedState();
+ partitions = new HashSet<>();
beginNextTransaction();
}
@Override
public void commitTransaction() throws StreamingException {
+ commitTransactionWithPartition(null);
+ }
+
+ @Override
+ public void commitTransactionWithPartition(Set<String> partitions)
+ throws StreamingException {
checkState();
- currentTransactionBatch.commit();
+
+ Set<String> createdPartitions = new HashSet<>();
+ if (partitions != null) {
+ for (String partition: partitions) {
+ try {
+ PartitionInfo info = createPartitionIfNotExists(
+ Warehouse.getPartValuesFromPartName(partition));
+ if (!info.isExists()) {
+ createdPartitions.add(partition);
+ }
+ } catch (MetaException e) {
+ throw new StreamingException("Partition " + partition + " is invalid.", e);
+ }
+ }
+ connectionStats.incrementTotalPartitions(partitions.size());
+ }
+
+ currentTransactionBatch.commitWithPartitions(createdPartitions);
+ this.partitions.addAll(
+ currentTransactionBatch.getPartitions());
+ connectionStats.incrementCreatedPartitions(createdPartitions.size());
connectionStats.incrementCommittedTransactions();
}
@@ -549,8 +640,10 @@ public class HiveStreamingConnection implements StreamingConnection {
} catch (StreamingException e) {
LOG.warn("Unable to close current transaction batch: " + currentTransactionBatch, e);
} finally {
- getMSC().close();
- getHeatbeatMSC().close();
+ if (manageTransactions) {
+ getMSC().close();
+ getHeatbeatMSC().close();
+ }
}
if (LOG.isInfoEnabled()) {
LOG.info("Closed streaming connection. Agent: {} Stats: {}", getAgentInfo(), getConnectionStats());
@@ -595,418 +688,6 @@ public class HiveStreamingConnection implements StreamingConnection {
return currentTransactionBatch.getCurrentTxnId();
}
- private static class TransactionBatch {
- private String username;
- private HiveStreamingConnection conn;
- private ScheduledExecutorService scheduledExecutorService;
- private RecordWriter recordWriter;
- private String partNameForLock = null;
- private List<TxnToWriteId> txnToWriteIds;
- private int currentTxnIndex = -1;
- private TxnState state;
- private LockRequest lockRequest = null;
- // heartbeats can only be sent for open transactions.
- // there is a race between committing/aborting a transaction and heartbeat.
- // Example: If a heartbeat is sent for committed txn, exception will be thrown.
- // Similarly if we don't send a heartbeat, metastore server might abort a txn
- // for missed heartbeat right before commit txn call.
- // This lock is used to mutex commit/abort and heartbeat calls
- private final ReentrantLock transactionLock = new ReentrantLock();
- // min txn id is incremented linearly within a transaction batch.
- // keeping minTxnId atomic as it is shared with heartbeat thread
- private final AtomicLong minTxnId;
- // max txn id does not change for a transaction batch
- private final long maxTxnId;
-
- /**
- * once any operation on this batch encounters a system exception
- * (e.g. IOException on write) it's safest to assume that we can't write to the
- * file backing this batch any more. This guards important public methods
- */
- private final AtomicBoolean isTxnClosed = new AtomicBoolean(false);
- private String agentInfo;
- private int numTxns;
- /**
- * Tracks the state of each transaction
- */
- private TxnState[] txnStatus;
- /**
- * ID of the last txn used by {@link #beginNextTransactionImpl()}
- */
- private long lastTxnUsed;
-
- /**
- * Represents a batch of transactions acquired from MetaStore
- *
- * @param conn - hive streaming connection
- * @throws StreamingException if failed to create new RecordUpdater for batch
- */
- private TransactionBatch(HiveStreamingConnection conn) throws StreamingException {
- boolean success = false;
- try {
- if (conn.isPartitionedTable() && !conn.isDynamicPartitioning()) {
- List<FieldSchema> partKeys = conn.tableObject.getPartitionKeys();
- partNameForLock = Warehouse.makePartName(partKeys, conn.staticPartitionValues);
- }
- this.conn = conn;
- this.username = conn.username;
- this.recordWriter = conn.recordWriter;
- this.agentInfo = conn.agentInfo;
- this.numTxns = conn.transactionBatchSize;
-
- setupHeartBeatThread();
-
- List<Long> txnIds = openTxnImpl(username, numTxns);
- txnToWriteIds = allocateWriteIdsImpl(txnIds);
- assert (txnToWriteIds.size() == numTxns);
-
- txnStatus = new TxnState[numTxns];
- for (int i = 0; i < txnStatus.length; i++) {
- assert (txnToWriteIds.get(i).getTxnId() == txnIds.get(i));
- txnStatus[i] = TxnState.OPEN; //Open matches Metastore state
- }
- this.state = TxnState.INACTIVE;
-
- // initialize record writer with connection and write id info
- recordWriter.init(conn, txnToWriteIds.get(0).getWriteId(), txnToWriteIds.get(numTxns - 1).getWriteId());
- this.minTxnId = new AtomicLong(txnIds.get(0));
- this.maxTxnId = txnIds.get(txnIds.size() - 1);
- success = true;
- } catch (TException e) {
- throw new StreamingException(conn.toString(), e);
- } finally {
- //clean up if above throws
- markDead(success);
- }
- }
-
- private void setupHeartBeatThread() {
- // start heartbeat thread
- ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true)
- .setNameFormat("HiveStreamingConnection-Heartbeat-Thread")
- .build();
- this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(threadFactory);
- long heartBeatInterval;
- long initialDelay;
- try {
- // if HIVE_TXN_TIMEOUT is defined, heartbeat interval will be HIVE_TXN_TIMEOUT/2
- heartBeatInterval = DbTxnManager.getHeartbeatInterval(conn.conf);
- } catch (LockException e) {
- heartBeatInterval = DEFAULT_HEARTBEAT_INTERVAL;
- }
- // to introduce some randomness and to avoid hammering the metastore at the same time (same logic as DbTxnManager)
- initialDelay = (long) (heartBeatInterval * 0.75 * Math.random());
- LOG.info("Starting heartbeat thread with interval: {} ms initialDelay: {} ms for agentInfo: {}",
- heartBeatInterval, initialDelay, conn.agentInfo);
- Runnable runnable = new HeartbeatRunnable(conn, minTxnId, maxTxnId, transactionLock, isTxnClosed);
- this.scheduledExecutorService.scheduleWithFixedDelay(runnable, initialDelay, heartBeatInterval, TimeUnit
- .MILLISECONDS);
- }
-
- private List<Long> openTxnImpl(final String user, final int numTxns) throws TException {
- return conn.getMSC().openTxns(user, numTxns).getTxn_ids();
- }
-
- private List<TxnToWriteId> allocateWriteIdsImpl(final List<Long> txnIds) throws TException {
- return conn.getMSC().allocateTableWriteIdsBatch(txnIds, conn.database, conn.table);
- }
-
- @Override
- public String toString() {
- if (txnToWriteIds == null || txnToWriteIds.isEmpty()) {
- return "{}";
- }
- StringBuilder sb = new StringBuilder(" TxnStatus[");
- for (TxnState state : txnStatus) {
- //'state' should not be null - future proofing
- sb.append(state == null ? "N" : state);
- }
- sb.append("] LastUsed ").append(JavaUtils.txnIdToString(lastTxnUsed));
- return "TxnId/WriteIds=[" + txnToWriteIds.get(0).getTxnId()
- + "/" + txnToWriteIds.get(0).getWriteId()
- + "..."
- + txnToWriteIds.get(txnToWriteIds.size() - 1).getTxnId()
- + "/" + txnToWriteIds.get(txnToWriteIds.size() - 1).getWriteId()
- + "] on connection = " + conn + "; " + sb;
- }
-
- private void beginNextTransaction() throws StreamingException {
- checkIsClosed();
- beginNextTransactionImpl();
- }
-
- private void beginNextTransactionImpl() throws TransactionError {
- state = TxnState.INACTIVE;//clear state from previous txn
- if ((currentTxnIndex + 1) >= txnToWriteIds.size()) {
- throw new InvalidTransactionState("No more transactions available in" +
- " next batch for connection: " + conn + " user: " + username);
- }
- currentTxnIndex++;
- state = TxnState.OPEN;
- lastTxnUsed = getCurrentTxnId();
- lockRequest = createLockRequest(conn, partNameForLock, username, getCurrentTxnId(), agentInfo);
- try {
- LockResponse res = conn.getMSC().lock(lockRequest);
- if (res.getState() != LockState.ACQUIRED) {
- throw new TransactionError("Unable to acquire lock on " + conn);
- }
- } catch (TException e) {
- throw new TransactionError("Unable to acquire lock on " + conn, e);
- }
- }
-
- long getCurrentTxnId() {
- if (currentTxnIndex >= 0) {
- return txnToWriteIds.get(currentTxnIndex).getTxnId();
- }
- return -1L;
- }
-
- long getCurrentWriteId() {
- if (currentTxnIndex >= 0) {
- return txnToWriteIds.get(currentTxnIndex).getWriteId();
- }
- return -1L;
- }
-
- TxnState getCurrentTransactionState() {
- return state;
- }
-
- int remainingTransactions() {
- if (currentTxnIndex >= 0) {
- return txnToWriteIds.size() - currentTxnIndex - 1;
- }
- return txnToWriteIds.size();
- }
-
-
- public void write(final byte[] record) throws StreamingException {
- checkIsClosed();
- boolean success = false;
- try {
- recordWriter.write(getCurrentWriteId(), record);
- success = true;
- } catch (SerializationError ex) {
- //this exception indicates that a {@code record} could not be parsed and the
- //caller can decide whether to drop it or send it to dead letter queue.
- //rolling back the txn and retrying won't help since the tuple will be exactly the same
- //when it's replayed.
- success = true;
- throw ex;
- } finally {
- markDead(success);
- }
- }
-
- public void write(final InputStream inputStream) throws StreamingException {
- checkIsClosed();
- boolean success = false;
- try {
- recordWriter.write(getCurrentWriteId(), inputStream);
- success = true;
- } catch (SerializationError ex) {
- //this exception indicates that a {@code record} could not be parsed and the
- //caller can decide whether to drop it or send it to dead letter queue.
- //rolling back the txn and retrying won'table help since the tuple will be exactly the same
- //when it's replayed.
- success = true;
- throw ex;
- } finally {
- markDead(success);
- }
- }
-
- private void checkIsClosed() throws StreamingException {
- if (isTxnClosed.get()) {
- throw new StreamingException("Transaction" + toString() + " is closed()");
- }
- }
-
- /**
- * A transaction batch opens a single HDFS file and writes multiple transaction to it. If there is any issue
- * with the write, we can't continue to write to the same file any as it may be corrupted now (at the tail).
- * This ensures that a client can't ignore these failures and continue to write.
- */
- private void markDead(boolean success) throws StreamingException {
- if (success) {
- return;
- }
- close();
- }
-
-
- void commit() throws StreamingException {
- checkIsClosed();
- boolean success = false;
- try {
- commitImpl();
- success = true;
- } finally {
- markDead(success);
- }
- }
-
- private void commitImpl() throws StreamingException {
- try {
- recordWriter.flush();
- TxnToWriteId txnToWriteId = txnToWriteIds.get(currentTxnIndex);
- if (conn.isDynamicPartitioning()) {
- List<String> partNames = new ArrayList<>(recordWriter.getPartitions());
- conn.getMSC().addDynamicPartitions(txnToWriteId.getTxnId(), txnToWriteId.getWriteId(), conn.database, conn.table,
- partNames, DataOperationType.INSERT);
- }
- transactionLock.lock();
- try {
- conn.getMSC().commitTxn(txnToWriteId.getTxnId());
- // increment the min txn id so that heartbeat thread will heartbeat only from the next open transaction.
- // the current transaction is going to committed or fail, so don't need heartbeat for current transaction.
- if (currentTxnIndex + 1 < txnToWriteIds.size()) {
- minTxnId.set(txnToWriteIds.get(currentTxnIndex + 1).getTxnId());
- } else {
- // exhausted the batch, no longer have to heartbeat for current txn batch
- minTxnId.set(-1);
- }
- } finally {
- transactionLock.unlock();
- }
- state = TxnState.COMMITTED;
- txnStatus[currentTxnIndex] = TxnState.COMMITTED;
- } catch (NoSuchTxnException e) {
- throw new TransactionError("Invalid transaction id : "
- + getCurrentTxnId(), e);
- } catch (TxnAbortedException e) {
- throw new TransactionError("Aborted transaction cannot be committed"
- , e);
- } catch (TException e) {
- throw new TransactionError("Unable to commitTransaction transaction"
- + getCurrentTxnId(), e);
- }
- }
-
- void abort() throws StreamingException {
- if (isTxnClosed.get()) {
- /*
- * isDead is only set internally by this class. {@link #markDead(boolean)} will abort all
- * remaining txns, so make this no-op to make sure that a well-behaved client that calls abortTransaction()
- * error doesn't get misleading errors
- */
- return;
- }
- abort(false);
- }
-
- private void abort(final boolean abortAllRemaining) throws StreamingException {
- abortImpl(abortAllRemaining);
- }
-
- private void abortImpl(boolean abortAllRemaining) throws StreamingException {
- if (minTxnId == null) {
- return;
- }
-
- transactionLock.lock();
- try {
- if (abortAllRemaining) {
- // we are aborting all txns in the current batch, so no need to heartbeat
- minTxnId.set(-1);
- //when last txn finished (abortTransaction/commitTransaction) the currentTxnIndex is pointing at that txn
- //so we need to start from next one, if any. Also if batch was created but
- //fetchTransactionBatch() was never called, we want to start with first txn
- int minOpenTxnIndex = Math.max(currentTxnIndex +
- (state == TxnState.ABORTED || state == TxnState.COMMITTED ? 1 : 0), 0);
- for (currentTxnIndex = minOpenTxnIndex;
- currentTxnIndex < txnToWriteIds.size(); currentTxnIndex++) {
- conn.getMSC().rollbackTxn(txnToWriteIds.get(currentTxnIndex).getTxnId());
- txnStatus[currentTxnIndex] = TxnState.ABORTED;
- }
- currentTxnIndex--;//since the loop left it == txnToWriteIds.size()
- } else {
- // we are aborting only the current transaction, so move the min range for heartbeat or disable heartbeat
- // if the current txn is last in the batch.
- if (currentTxnIndex + 1 < txnToWriteIds.size()) {
- minTxnId.set(txnToWriteIds.get(currentTxnIndex + 1).getTxnId());
- } else {
- // exhausted the batch, no longer have to heartbeat
- minTxnId.set(-1);
- }
- long currTxnId = getCurrentTxnId();
- if (currTxnId > 0) {
- conn.getMSC().rollbackTxn(currTxnId);
- txnStatus[currentTxnIndex] = TxnState.ABORTED;
- }
- }
- state = TxnState.ABORTED;
- } catch (NoSuchTxnException e) {
- throw new TransactionError("Unable to abort invalid transaction id : "
- + getCurrentTxnId(), e);
- } catch (TException e) {
- throw new TransactionError("Unable to abort transaction id : "
- + getCurrentTxnId(), e);
- } finally {
- transactionLock.unlock();
- }
- }
-
- public boolean isClosed() {
- return isTxnClosed.get();
- }
-
- /**
- * Close the TransactionBatch. This will abort any still open txns in this batch.
- *
- * @throws StreamingException - failure when closing transaction batch
- */
- public void close() throws StreamingException {
- if (isTxnClosed.get()) {
- return;
- }
- isTxnClosed.set(true); //also ensures that heartbeat() is no-op since client is likely doing it async
- try {
- abort(true);//abort all remaining txns
- } catch (Exception ex) {
- LOG.error("Fatal error on " + toString() + "; cause " + ex.getMessage(), ex);
- throw new StreamingException("Unable to abort", ex);
- }
- try {
- closeImpl();
- } catch (Exception ex) {
- LOG.error("Fatal error on " + toString() + "; cause " + ex.getMessage(), ex);
- throw new StreamingException("Unable to close", ex);
- }
- }
-
- private void closeImpl() throws StreamingException {
- state = TxnState.INACTIVE;
- recordWriter.close();
- if (scheduledExecutorService != null) {
- scheduledExecutorService.shutdownNow();
- }
- }
-
- static LockRequest createLockRequest(final HiveStreamingConnection connection,
- String partNameForLock, String user, long txnId, String agentInfo) {
- LockRequestBuilder requestBuilder = new LockRequestBuilder(agentInfo);
- requestBuilder.setUser(user);
- requestBuilder.setTransactionId(txnId);
-
- LockComponentBuilder lockCompBuilder = new LockComponentBuilder()
- .setDbName(connection.database)
- .setTableName(connection.table)
- .setShared()
- .setOperationType(DataOperationType.INSERT);
- if (connection.isDynamicPartitioning()) {
- lockCompBuilder.setIsDynamicPartitionWrite(true);
- }
- if (partNameForLock != null && !partNameForLock.isEmpty()) {
- lockCompBuilder.setPartitionName(partNameForLock);
- }
- requestBuilder.addLockComponent(lockCompBuilder.build());
-
- return requestBuilder.build();
- }
- }
-
private HiveConf createHiveConf(Class<?> clazz, String metaStoreUri) {
HiveConf conf = new HiveConf(clazz);
if (metaStoreUri != null) {
@@ -1049,6 +730,13 @@ public class HiveStreamingConnection implements StreamingConnection {
conf.setBoolean(var, true);
}
+ public List<TxnToWriteId> getTxnToWriteIds() {
+ if (currentTransactionBatch != null) {
+ return currentTransactionBatch.getTxnToWriteIds();
+ }
+ return null;
+ }
+
@Override
public HiveConf getHiveConf() {
return conf;
@@ -1083,4 +771,41 @@ public class HiveStreamingConnection implements StreamingConnection {
public boolean isDynamicPartitioning() {
return isPartitionedTable() && (staticPartitionValues == null || staticPartitionValues.isEmpty());
}
+
+ @Override
+ public Set<String> getPartitions() {
+ return partitions;
+ }
+
+ public String getUsername() {
+ return username;
+ }
+
+ public String getDatabase() {
+ return database;
+ }
+
+ public RecordWriter getRecordWriter() {
+ return recordWriter;
+ }
+
+ public int getTransactionBatchSize() {
+ return transactionBatchSize;
+ }
+
+ public HiveConf getConf() {
+ return conf;
+ }
+
+ public Long getWriteId() {
+ return writeId;
+ }
+
+ public Integer getStatementId() {
+ return statementId;
+ }
+
+ public Long getCurrentWriteId() {
+ return currentTransactionBatch.getCurrentWriteId();
+ }
}
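For illustration, a minimal sketch of the unmanaged flow this change enables. It assumes the connection builder exposes withWriteId() and withStatementId() setters matching the getWriteId()/getStatementId() accessors above; database, table and records are hypothetical:

// Stream against an externally allocated writeId. The caller owns the
// transaction and must commit or abort it through the metastore itself.
StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
    .withFieldDelimiter(',')
    .build();
StreamingConnection conn = HiveStreamingConnection.newBuilder()
    .withDatabase("default")
    .withTable("events")
    .withRecordWriter(writer)
    .withHiveConf(hiveConf)
    .withWriteId(writeId)     // assumed setter for the external write id
    .withStatementId(1)       // assumed setter; must stay below 2^12
    .connect();
conn.beginTransaction();      // only one transaction batch in this mode
conn.write("1,hello".getBytes());
conn.commitTransaction();     // flushes the writer; the txn commit stays external
conn.close();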
http://git-wip-us.apache.org/repos/asf/hive/blob/bdbd3bcf/streaming/src/java/org/apache/hive/streaming/InvalidTransactionState.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/InvalidTransactionState.java b/streaming/src/java/org/apache/hive/streaming/InvalidTransactionState.java
index 9d92dfa..d076ce7 100644
--- a/streaming/src/java/org/apache/hive/streaming/InvalidTransactionState.java
+++ b/streaming/src/java/org/apache/hive/streaming/InvalidTransactionState.java
@@ -18,8 +18,11 @@
package org.apache.hive.streaming;
+/**
+ * Exception thrown when a transaction is in an invalid state.
+ */
public class InvalidTransactionState extends TransactionError {
- InvalidTransactionState(String msg) {
+ public InvalidTransactionState(String msg) {
super(msg);
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/bdbd3bcf/streaming/src/java/org/apache/hive/streaming/PartitionInfo.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/PartitionInfo.java b/streaming/src/java/org/apache/hive/streaming/PartitionInfo.java
index ce9f76a..d16a7a5 100644
--- a/streaming/src/java/org/apache/hive/streaming/PartitionInfo.java
+++ b/streaming/src/java/org/apache/hive/streaming/PartitionInfo.java
@@ -26,6 +26,7 @@ public class PartitionInfo {
private String partitionLocation;
private boolean exists;
+
public PartitionInfo(final String name, final String partitionLocation, final boolean exists) {
this.name = name;
this.partitionLocation = partitionLocation;
http://git-wip-us.apache.org/repos/asf/hive/blob/bdbd3bcf/streaming/src/java/org/apache/hive/streaming/RecordWriter.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/RecordWriter.java b/streaming/src/java/org/apache/hive/streaming/RecordWriter.java
index d9c4455..5b02754 100644
--- a/streaming/src/java/org/apache/hive/streaming/RecordWriter.java
+++ b/streaming/src/java/org/apache/hive/streaming/RecordWriter.java
@@ -19,8 +19,12 @@
package org.apache.hive.streaming;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.metadata.Table;
+
import java.io.InputStream;
+import java.util.List;
import java.util.Set;
public interface RecordWriter {
@@ -36,6 +40,20 @@ public interface RecordWriter {
void init(StreamingConnection connection, long minWriteId, long maxWriteID) throws StreamingException;
/**
+ * Initialize record writer.
+ *
+ * @param connection - streaming connection
+ * @param minWriteId - min write id
+ * @param maxWriteID - max write id
+ * @param statementId - statement id. Note this number can't be bigger than 2^12
+ * @throws StreamingException - thrown when initialization failed
+ */
+ default void init(StreamingConnection connection, long minWriteId,
+ long maxWriteID, int statementId) throws StreamingException {
+ init(connection, minWriteId, maxWriteID);
+ }
+
+ /**
* Writes using a hive RecordUpdater.
*
* @param writeId - the write ID of the table mapping to Txn in which the write occurs
@@ -69,9 +87,27 @@ public interface RecordWriter {
void close() throws StreamingException;
/**
- * Get the set of partitions that were added by the record writer.
+ * Get the set of partitions that were used by the record writer but may
+ * or may not have been added to the metastore.
*
* @return - set of partitions
*/
Set<String> getPartitions();
+
+ /**
+ * Returns the location of the delta directory.
+ * @param partitionValues partition values
+ * @param bucketId bucket id
+ * @param minWriteId min write Id
+ * @param maxWriteId max write Id
+ * @param statementId statement Id
+ * @param table table
+ * @return the location of the file
+ * @throws StreamingException when the path is not found
+ */
+ default Path getDeltaFileLocation(List<String> partitionValues,
+ Integer bucketId, Long minWriteId, Long maxWriteId, Integer statementId,
+ Table table) throws StreamingException {
+ throw new UnsupportedOperationException();
+ }
}
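Both additions are default methods, so existing RecordWriter implementations keep compiling unchanged. A sketch of a hypothetical no-op writer that opts into the statementId-aware init, assuming the remaining abstract methods (write, flush) keep the signatures documented in this file:

import java.io.InputStream;
import java.util.Collections;
import java.util.Set;

/** Hypothetical writer that records the statementId handed over by the connection. */
public class StatementAwareNoOpWriter implements RecordWriter {
  private int statementId = -1;

  @Override
  public void init(StreamingConnection connection, long minWriteId,
      long maxWriteID) throws StreamingException {
    // nothing to set up in this sketch
  }

  @Override
  public void init(StreamingConnection connection, long minWriteId,
      long maxWriteID, int statementId) throws StreamingException {
    this.statementId = statementId; // e.g. for naming delta directories
    init(connection, minWriteId, maxWriteID);
  }

  @Override
  public void write(long writeId, byte[] record) throws StreamingException {
    // drop the record; a real writer hands it to a RecordUpdater
  }

  @Override
  public void write(long writeId, InputStream inputStream) throws StreamingException {
    // drop the stream
  }

  @Override
  public void flush() throws StreamingException {
  }

  @Override
  public void close() throws StreamingException {
  }

  @Override
  public Set<String> getPartitions() {
    return Collections.emptySet();
  }
}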
http://git-wip-us.apache.org/repos/asf/hive/blob/bdbd3bcf/streaming/src/java/org/apache/hive/streaming/StreamingConnection.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/StreamingConnection.java b/streaming/src/java/org/apache/hive/streaming/StreamingConnection.java
index fbe00db..92016e5 100644
--- a/streaming/src/java/org/apache/hive/streaming/StreamingConnection.java
+++ b/streaming/src/java/org/apache/hive/streaming/StreamingConnection.java
@@ -19,9 +19,14 @@
package org.apache.hive.streaming;
import java.io.InputStream;
+import java.util.List;
+import java.util.Set;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
+import javax.annotation.Nullable;
+
public interface StreamingConnection extends ConnectionInfo, PartitionHandler {
/**
* Returns hive configuration object used during connection creation.
@@ -61,6 +66,18 @@ public interface StreamingConnection extends ConnectionInfo, PartitionHandler {
void commitTransaction() throws StreamingException;
/**
+ * Commit a transaction to make the writes visible to readers, including
+ * extra partitions that may have been added independently.
+ *
+ * @param partitions - extra partitions to commit.
+ * @throws StreamingException - if there are errors when committing the open transaction.
+ */
+ default void commitTransactionWithPartition(@Nullable Set<String> partitions)
+ throws StreamingException {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
* Manually abort the opened transaction.
*
* @throws StreamingException - if there are errors when aborting the transaction
@@ -78,4 +95,30 @@ public interface StreamingConnection extends ConnectionInfo, PartitionHandler {
* @return - connection stats
*/
ConnectionStats getConnectionStats();
+
+ /**
+ * Get the partitions used during streaming. These partitions haven't
+ * been committed to the metastore.
+ * @return partitions.
+ */
+ default Set<String> getPartitions() {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Returns the file that would be used by the writer to write the rows,
+ * given the parameters.
+ * @param partitionValues partition values
+ * @param bucketId bucket id
+ * @param minWriteId min write Id
+ * @param maxWriteId max write Id
+ * @param statementId statement Id
+ * @return the location of the file.
+ * @throws StreamingException when the path is not found
+ */
+ default Path getDeltaFileLocation(List<String> partitionValues,
+ Integer bucketId, Long minWriteId, Long maxWriteId, Integer statementId)
+ throws StreamingException {
+ throw new UnsupportedOperationException();
+ }
}
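For reference, a sketch of the two new calls from client code; partition values, write ids and the statement id are illustrative. With Hive's ACID naming, write ids [7, 7] and statement id 2 resolve to a path ending in delta_0000007_0000007_0002:

// Ask the connection where rows for a given write id range would land.
Path delta = connection.getDeltaFileLocation(
    Arrays.asList("2018", "10"), // partition values, in partition-key order
    0,                           // bucket id
    7L, 7L,                      // min/max write id
    2);                          // statement id

// Commit the open transaction, registering a partition that was created
// outside of this connection's record writer.
connection.commitTransactionWithPartition(
    Collections.singleton("year=2018/month=10"));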
http://git-wip-us.apache.org/repos/asf/hive/blob/bdbd3bcf/streaming/src/java/org/apache/hive/streaming/StreamingTransaction.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/StreamingTransaction.java b/streaming/src/java/org/apache/hive/streaming/StreamingTransaction.java
new file mode 100644
index 0000000..83b2f15
--- /dev/null
+++ b/streaming/src/java/org/apache/hive/streaming/StreamingTransaction.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
+import java.io.InputStream;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Common interface for transactions in HiveStreamingConnection.
+ */
+public interface StreamingTransaction {
+ /**
+ * Get ready for the next transaction.
+ * @throws StreamingException
+ */
+ void beginNextTransaction() throws StreamingException;
+
+ /**
+ * Commit the transaction.
+ * @throws StreamingException
+ */
+ void commit() throws StreamingException;
+
+ /**
+ * Commit the transaction and send to the metastore the partitions
+ * created in the process.
+ * @param partitions to commit.
+ * @throws StreamingException
+ */
+ void commitWithPartitions(Set<String> partitions) throws StreamingException;
+
+ /**
+ * Abort a transaction.
+ * @throws StreamingException
+ */
+ void abort() throws StreamingException;
+
+ /**
+ * Write data within a transaction. This expects beginNextTransaction
+ * to have been called before this and commit to be called after.
+ * @param record bytes to write.
+ * @throws StreamingException
+ */
+ void write(byte[] record) throws StreamingException;
+
+ /**
+ * Write data within a transaction.
+ * @param stream stream to write.
+ * @throws StreamingException
+ */
+ void write(InputStream stream) throws StreamingException;
+
+ /**
+ * Free/close resources used by the streaming transaction.
+ * @throws StreamingException
+ */
+ void close() throws StreamingException;
+
+ /**
+ * @return true if closed.
+ */
+ boolean isClosed();
+
+ /**
+ * @return the state of the current transaction.
+ */
+ HiveStreamingConnection.TxnState getCurrentTransactionState();
+
+ /**
+ * @return remaining number of transactions
+ */
+ int remainingTransactions();
+
+ /**
+ * @return the current transaction id being used
+ */
+ long getCurrentTxnId();
+
+ /**
+ * @return the current write id being used
+ */
+ long getCurrentWriteId();
+
+ /**
+ * Get the partitions that were used in this transaction. They may have
+ * been created during the transaction.
+ * @return set of partitions
+ */
+ Set<String> getPartitions();
+
+ /**
+ * @return the pairs of transaction ids <--> write ids
+ */
+ List<TxnToWriteId> getTxnToWriteIds();
+}
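The connection picks between the two implementations of this interface depending on whether it manages transactions itself, mirroring createNewTransactionBatch() in HiveStreamingConnection above. A fragment, assuming conn, record and manageTransactions are in scope:

// Managed mode: open txns and allocate write ids through the metastore.
// Unmanaged mode: reuse the single, externally allocated writeId.
StreamingTransaction txn = manageTransactions
    ? new TransactionBatch(conn)
    : new UnManagedSingleTransaction(conn);
txn.beginNextTransaction();
txn.write(record);
txn.commitWithPartitions(null); // null: no extra partitions to register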
http://git-wip-us.apache.org/repos/asf/hive/blob/bdbd3bcf/streaming/src/java/org/apache/hive/streaming/TransactionBatch.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/TransactionBatch.java b/streaming/src/java/org/apache/hive/streaming/TransactionBatch.java
new file mode 100644
index 0000000..dabbe21
--- /dev/null
+++ b/streaming/src/java/org/apache/hive/streaming/TransactionBatch.java
@@ -0,0 +1,430 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.hadoop.hive.metastore.LockComponentBuilder;
+import org.apache.hadoop.hive.metastore.LockRequestBuilder;
+import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.DataOperationType;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeResponse;
+import org.apache.hadoop.hive.metastore.api.LockRequest;
+import org.apache.hadoop.hive.metastore.api.LockResponse;
+import org.apache.hadoop.hive.metastore.api.LockState;
+import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
+import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
+import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
+import org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
+import org.apache.hadoop.hive.ql.lockmgr.LockException;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Streaming transaction to use in most cases. It queries the
+ * metastore to get the transaction ids and the write ids and then
+ * commits them.
+ */
+public class TransactionBatch extends AbstractStreamingTransaction {
+ private static final Logger LOG = LoggerFactory.getLogger(
+ TransactionBatch.class.getName());
+ private static final int DEFAULT_HEARTBEAT_INTERVAL = 60 * 1000;
+ protected Set<String> createdPartitions = null;
+ private String username;
+ private HiveStreamingConnection conn;
+ private ScheduledExecutorService scheduledExecutorService;
+ private String partNameForLock = null;
+ private LockRequest lockRequest = null;
+ // heartbeats can only be sent for open transactions.
+ // there is a race between committing/aborting a transaction and heartbeat.
+ // Example: If a heartbeat is sent for committed txn, exception will be thrown.
+ // Similarly if we don't send a heartbeat, metastore server might abort a txn
+ // for missed heartbeat right before commit txn call.
+ // This lock is used to mutex commit/abort and heartbeat calls
+ private final ReentrantLock transactionLock = new ReentrantLock();
+ // min txn id is incremented linearly within a transaction batch.
+ // keeping minTxnId atomic as it is shared with heartbeat thread
+ private final AtomicLong minTxnId;
+ // max txn id does not change for a transaction batch
+ private final long maxTxnId;
+
+ private String agentInfo;
+ private int numTxns;
+ /**
+ * Tracks the state of each transaction.
+ */
+ private HiveStreamingConnection.TxnState[] txnStatus;
+ /**
+ * ID of the last txn used by {@link #beginNextTransactionImpl()}.
+ */
+ private long lastTxnUsed;
+
+ /**
+ * Represents a batch of transactions acquired from MetaStore.
+ *
+ * @param conn - hive streaming connection
+ * @throws StreamingException if failed to create new RecordUpdater for batch
+ */
+ public TransactionBatch(HiveStreamingConnection conn) throws StreamingException {
+ boolean success = false;
+ try {
+ if (conn.isPartitionedTable() && !conn.isDynamicPartitioning()) {
+ List<FieldSchema> partKeys = conn.getTable().getPartitionKeys();
+ partNameForLock = Warehouse.makePartName(partKeys, conn.getStaticPartitionValues());
+ }
+ this.conn = conn;
+ this.username = conn.getUsername();
+ this.recordWriter = conn.getRecordWriter();
+ this.agentInfo = conn.getAgentInfo();
+ this.numTxns = conn.getTransactionBatchSize();
+
+ setupHeartBeatThread();
+
+ List<Long> txnIds = openTxnImpl(username, numTxns);
+ txnToWriteIds = allocateWriteIdsImpl(txnIds);
+ assert (txnToWriteIds.size() == numTxns);
+
+ txnStatus = new HiveStreamingConnection.TxnState[numTxns];
+ for (int i = 0; i < txnStatus.length; i++) {
+ assert (txnToWriteIds.get(i).getTxnId() == txnIds.get(i));
+ txnStatus[i] = HiveStreamingConnection.TxnState.OPEN; //Open matches Metastore state
+ }
+ this.state = HiveStreamingConnection.TxnState.INACTIVE;
+
+ // initialize record writer with connection and write id info
+ recordWriter.init(conn, txnToWriteIds.get(0).getWriteId(),
+ txnToWriteIds.get(numTxns - 1).getWriteId(), conn.getStatementId());
+ this.minTxnId = new AtomicLong(txnIds.get(0));
+ this.maxTxnId = txnIds.get(txnIds.size() - 1);
+ success = true;
+ } catch (TException e) {
+ throw new StreamingException(conn.toString(), e);
+ } finally {
+ //clean up if above throws
+ markDead(success);
+ }
+ }
+
+ private static class HeartbeatRunnable implements Runnable {
+ private final HiveStreamingConnection conn;
+ private final AtomicLong minTxnId;
+ private final long maxTxnId;
+ private final ReentrantLock transactionLock;
+ private final AtomicBoolean isTxnClosed;
+
+ HeartbeatRunnable(final HiveStreamingConnection conn, final AtomicLong minTxnId, final long maxTxnId,
+ final ReentrantLock transactionLock, final AtomicBoolean isTxnClosed) {
+ this.conn = conn;
+ this.minTxnId = minTxnId;
+ this.maxTxnId = maxTxnId;
+ this.transactionLock = transactionLock;
+ this.isTxnClosed = isTxnClosed;
+ }
+
+ @Override
+ public void run() {
+ transactionLock.lock();
+ try {
+ if (minTxnId.get() > 0) {
+ HeartbeatTxnRangeResponse resp = conn.getHeatbeatMSC().heartbeatTxnRange(minTxnId.get(), maxTxnId);
+ if (!resp.getAborted().isEmpty() || !resp.getNosuch().isEmpty()) {
+ LOG.error("Heartbeat failure: {}", resp.toString());
+ isTxnClosed.set(true);
+ } else {
+ LOG.info("Heartbeat sent for range: [{}-{}]", minTxnId.get(), maxTxnId);
+ }
+ }
+ } catch (TException e) {
+ LOG.warn("Failure to heartbeat for transaction range: [" + minTxnId.get() + "-" + maxTxnId + "]", e);
+ } finally {
+ transactionLock.unlock();
+ }
+ }
+ }
+
+ private void setupHeartBeatThread() {
+ // start heartbeat thread
+ ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true)
+ .setNameFormat("HiveStreamingConnection-Heartbeat-Thread")
+ .build();
+ this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(threadFactory);
+ long heartBeatInterval;
+ long initialDelay;
+ try {
+ // if HIVE_TXN_TIMEOUT is defined, heartbeat interval will be HIVE_TXN_TIMEOUT/2
+ heartBeatInterval = DbTxnManager.getHeartbeatInterval(conn.getConf());
+ } catch (LockException e) {
+ heartBeatInterval = DEFAULT_HEARTBEAT_INTERVAL;
+ }
+ // to introduce some randomness and to avoid hammering the metastore at the same time (same logic as DbTxnManager)
+ initialDelay = (long) (heartBeatInterval * 0.75 * Math.random());
+ LOG.info("Starting heartbeat thread with interval: {} ms initialDelay: {} ms for agentInfo: {}",
+ heartBeatInterval, initialDelay, conn.getAgentInfo());
+ Runnable runnable = new HeartbeatRunnable(conn, minTxnId, maxTxnId, transactionLock, isTxnClosed);
+ this.scheduledExecutorService.scheduleWithFixedDelay(runnable, initialDelay, heartBeatInterval, TimeUnit
+ .MILLISECONDS);
+ }
+
+ private List<Long> openTxnImpl(final String user, final int numTxns) throws TException {
+ return conn.getMSC().openTxns(user, numTxns).getTxn_ids();
+ }
+
+ private List<TxnToWriteId> allocateWriteIdsImpl(final List<Long> txnIds) throws TException {
+ return conn.getMSC().allocateTableWriteIdsBatch(txnIds, conn.getDatabase(),
+ conn.getTable().getTableName());
+ }
+
+ @Override
+ public String toString() {
+ if (txnToWriteIds == null || txnToWriteIds.isEmpty()) {
+ return "{}";
+ }
+ StringBuilder sb = new StringBuilder(" TxnStatus[");
+ for (HiveStreamingConnection.TxnState state : txnStatus) {
+ //'state' should not be null - future proofing
+ sb.append(state == null ? "N" : state);
+ }
+ sb.append("] LastUsed ").append(JavaUtils.txnIdToString(lastTxnUsed));
+ return "TxnId/WriteIds=[" + txnToWriteIds.get(0).getTxnId()
+ + "/" + txnToWriteIds.get(0).getWriteId()
+ + "..."
+ + txnToWriteIds.get(txnToWriteIds.size() - 1).getTxnId()
+ + "/" + txnToWriteIds.get(txnToWriteIds.size() - 1).getWriteId()
+ + "] on connection = " + conn + "; " + sb;
+ }
+
+ public void beginNextTransaction() throws StreamingException {
+ checkIsClosed();
+ beginNextTransactionImpl();
+ }
+
+ private void beginNextTransactionImpl() throws StreamingException {
+ beginNextTransactionImpl("No more transactions available in" +
+ " next batch for connection: " + conn + " user: " + username);
+ lastTxnUsed = getCurrentTxnId();
+ lockRequest = createLockRequest(conn, partNameForLock, username, getCurrentTxnId(), agentInfo);
+ createdPartitions = Sets.newHashSet();
+ try {
+ LockResponse res = conn.getMSC().lock(lockRequest);
+ if (res.getState() != LockState.ACQUIRED) {
+ throw new TransactionError("Unable to acquire lock on " + conn);
+ }
+ } catch (TException e) {
+ throw new TransactionError("Unable to acquire lock on " + conn, e);
+ }
+ }
+
+ public void commitWithPartitions(Set<String> partitions) throws StreamingException {
+ checkIsClosed();
+ boolean success = false;
+ try {
+ commitImpl(partitions);
+ success = true;
+ } finally {
+ markDead(success);
+ }
+ }
+
+ private void commitImpl(Set<String> partitions) throws StreamingException {
+ try {
+ recordWriter.flush();
+ TxnToWriteId txnToWriteId = txnToWriteIds.get(currentTxnIndex);
+ if (conn.isDynamicPartitioning()) {
+ List<String> partNames = new ArrayList<>(recordWriter.getPartitions());
+ createdPartitions.addAll(partNames);
+ if (partitions != null) {
+ partNames.addAll(partitions);
+ }
+ if (!partNames.isEmpty()) {
+ conn.getMSC().addDynamicPartitions(txnToWriteId.getTxnId(),
+ txnToWriteId.getWriteId(), conn.getDatabase(),
+ conn.getTable().getTableName(), partNames,
+ DataOperationType.INSERT);
+ }
+ }
+ transactionLock.lock();
+ try {
+ conn.getMSC().commitTxn(txnToWriteId.getTxnId());
+ // increment the min txn id so that heartbeat thread will heartbeat only from the next open transaction.
+ // the current transaction is going to commit or fail, so no need to heartbeat for the current transaction.
+ if (currentTxnIndex + 1 < txnToWriteIds.size()) {
+ minTxnId.set(txnToWriteIds.get(currentTxnIndex + 1).getTxnId());
+ } else {
+ // exhausted the batch, no longer have to heartbeat for current txn batch
+ minTxnId.set(-1);
+ }
+ } finally {
+ transactionLock.unlock();
+ }
+ state = HiveStreamingConnection.TxnState.COMMITTED;
+ txnStatus[currentTxnIndex] = HiveStreamingConnection.TxnState.COMMITTED;
+ } catch (NoSuchTxnException e) {
+ throw new TransactionError("Invalid transaction id : "
+ + getCurrentTxnId(), e);
+ } catch (TxnAbortedException e) {
+ throw new TransactionError("Aborted transaction "
+ + "cannot be committed", e);
+ } catch (TException e) {
+ throw new TransactionError("Unable to commitTransaction transaction"
+ + getCurrentTxnId(), e);
+ }
+ }
+
+ public void abort() throws StreamingException {
+ if (isTxnClosed.get()) {
+ /*
+ * isDead is only set internally by this class. {@link #markDead(boolean)} will abort all
+ * remaining txns, so make this no-op to make sure that a well-behaved client that calls abortTransaction()
+ * error doesn't get misleading errors
+ */
+ return;
+ }
+ abort(false);
+ }
+
+ private void abort(final boolean abortAllRemaining) throws StreamingException {
+ abortImpl(abortAllRemaining);
+ }
+
+ private void abortImpl(boolean abortAllRemaining) throws StreamingException {
+ if (minTxnId == null) {
+ return;
+ }
+
+ transactionLock.lock();
+ try {
+ if (abortAllRemaining) {
+ // we are aborting all txns in the current batch, so no need to heartbeat
+ minTxnId.set(-1);
+ //when last txn finished (abortTransaction/commitTransaction) the currentTxnIndex is pointing at that txn
+ //so we need to start from next one, if any. Also if batch was created but
+ //fetchTransactionBatch() was never called, we want to start with first txn
+ int minOpenTxnIndex = Math.max(currentTxnIndex +
+ (state == HiveStreamingConnection.TxnState.ABORTED
+ || state == HiveStreamingConnection.TxnState.COMMITTED
+ ? 1 : 0), 0);
+ for (currentTxnIndex = minOpenTxnIndex;
+ currentTxnIndex < txnToWriteIds.size(); currentTxnIndex++) {
+ conn.getMSC().rollbackTxn(txnToWriteIds.get(currentTxnIndex).getTxnId());
+ txnStatus[currentTxnIndex] = HiveStreamingConnection.TxnState.ABORTED;
+ }
+ currentTxnIndex--; //since the loop left it == txnToWriteIds.size()
+ } else {
+ // we are aborting only the current transaction, so move the min range for heartbeat or disable heartbeat
+ // if the current txn is last in the batch.
+ if (currentTxnIndex + 1 < txnToWriteIds.size()) {
+ minTxnId.set(txnToWriteIds.get(currentTxnIndex + 1).getTxnId());
+ } else {
+ // exhausted the batch, no longer have to heartbeat
+ minTxnId.set(-1);
+ }
+ long currTxnId = getCurrentTxnId();
+ if (currTxnId > 0) {
+ conn.getMSC().rollbackTxn(currTxnId);
+ txnStatus[currentTxnIndex] = HiveStreamingConnection.TxnState.ABORTED;
+ }
+ }
+ state = HiveStreamingConnection.TxnState.ABORTED;
+ } catch (NoSuchTxnException e) {
+ throw new TransactionError("Unable to abort invalid transaction id : "
+ + getCurrentTxnId(), e);
+ } catch (TException e) {
+ throw new TransactionError("Unable to abort transaction id : "
+ + getCurrentTxnId(), e);
+ } finally {
+ transactionLock.unlock();
+ }
+ }
+
+ /**
+ * Close the TransactionBatch. This will abort any still open txns in this batch.
+ *
+ * @throws StreamingException - failure when closing transaction batch
+ */
+ public void close() throws StreamingException {
+ if (isClosed()) {
+ return;
+ }
+ isTxnClosed.set(true); //also ensures that heartbeat() is no-op since client is likely doing it async
+ try {
+ abort(true); //abort all remaining txns
+ } catch (Exception ex) {
+ LOG.error("Fatal error on " + toString() + "; cause " + ex.getMessage(), ex);
+ throw new StreamingException("Unable to abort", ex);
+ }
+ try {
+ closeImpl();
+ } catch (Exception ex) {
+ LOG.error("Fatal error on " + toString() + "; cause " + ex.getMessage(), ex);
+ throw new StreamingException("Unable to close", ex);
+ }
+ }
+
+ private void closeImpl() throws StreamingException {
+ state = HiveStreamingConnection.TxnState.INACTIVE;
+ recordWriter.close();
+ if (scheduledExecutorService != null) {
+ scheduledExecutorService.shutdownNow();
+ }
+ }
+
+ private static LockRequest createLockRequest(final HiveStreamingConnection connection,
+ String partNameForLock, String user, long txnId, String agentInfo) {
+ LockRequestBuilder requestBuilder = new LockRequestBuilder(agentInfo);
+ requestBuilder.setUser(user);
+ requestBuilder.setTransactionId(txnId);
+
+ LockComponentBuilder lockCompBuilder = new LockComponentBuilder()
+ .setDbName(connection.getDatabase())
+ .setTableName(connection.getTable().getTableName())
+ .setShared()
+ .setOperationType(DataOperationType.INSERT);
+ if (connection.isDynamicPartitioning()) {
+ lockCompBuilder.setIsDynamicPartitionWrite(true);
+ }
+ if (partNameForLock != null && !partNameForLock.isEmpty()) {
+ lockCompBuilder.setPartitionName(partNameForLock);
+ }
+ requestBuilder.addLockComponent(lockCompBuilder.build());
+
+ return requestBuilder.build();
+ }
+
+ /**
+ * @return the list of created partitions.
+ */
+ @Override
+ public Set<String> getPartitions() {
+ return createdPartitions;
+ }
+}
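As a worked example of the scheduling in setupHeartBeatThread(): assuming the default hive.txn.timeout of 300 seconds, the heartbeat interval is 150 000 ms and the initial delay is drawn uniformly from [0, 112 500) ms:

// Heartbeat scheduling math, mirroring setupHeartBeatThread() above.
long txnTimeoutMs = 300_000L;              // assumed hive.txn.timeout
long heartBeatInterval = txnTimeoutMs / 2; // 150_000 ms
long initialDelay = (long) (heartBeatInterval * 0.75 * Math.random());
// initialDelay falls in [0, 112_500) ms, spreading clients so they don't
// hammer the metastore with simultaneous heartbeats.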
http://git-wip-us.apache.org/repos/asf/hive/blob/bdbd3bcf/streaming/src/java/org/apache/hive/streaming/TransactionError.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/TransactionError.java b/streaming/src/java/org/apache/hive/streaming/TransactionError.java
index ae56e7c..7033012 100644
--- a/streaming/src/java/org/apache/hive/streaming/TransactionError.java
+++ b/streaming/src/java/org/apache/hive/streaming/TransactionError.java
@@ -18,12 +18,15 @@
package org.apache.hive.streaming;
+/**
+ * Transaction error.
+ */
public class TransactionError extends StreamingException {
- TransactionError(String msg, Exception e) {
+ public TransactionError(String msg, Exception e) {
super(msg + (e == null ? "" : ": " + e.getMessage()), e);
}
- TransactionError(String msg) {
+ public TransactionError(String msg) {
super(msg);
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/bdbd3bcf/streaming/src/java/org/apache/hive/streaming/UnManagedSingleTransaction.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/UnManagedSingleTransaction.java b/streaming/src/java/org/apache/hive/streaming/UnManagedSingleTransaction.java
new file mode 100644
index 0000000..68b0906
--- /dev/null
+++ b/streaming/src/java/org/apache/hive/streaming/UnManagedSingleTransaction.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Receives a single writeId. Doesn't open connections to the metastore,
+ * so the commit has to be done externally by the entity that created
+ * the writeId.
+ */
+public class UnManagedSingleTransaction extends AbstractStreamingTransaction {
+ private static final Logger LOG = LoggerFactory.getLogger(
+ UnManagedSingleTransaction.class.getName());
+ private final String username;
+ private final HiveStreamingConnection conn;
+ private final Set<String> partitions = Sets.newHashSet();
+
+ public UnManagedSingleTransaction(HiveStreamingConnection conn)
+ throws StreamingException {
+ assert conn.getWriteId() != null;
+
+ this.conn = conn;
+ this.username = conn.getUsername();
+ this.recordWriter = conn.getRecordWriter();
+ this.state = HiveStreamingConnection.TxnState.INACTIVE;
+
+ txnToWriteIds = Lists.newArrayList(new TxnToWriteId(-1,
+ conn.getWriteId()));
+
+ boolean success = false;
+ try {
+ recordWriter.init(conn, txnToWriteIds.get(0).getWriteId(),
+ txnToWriteIds.get(0).getWriteId(), conn.getStatementId());
+ success = true;
+ } finally {
+ markDead(success);
+ }
+ }
+
+ @Override
+ public void beginNextTransaction() throws StreamingException {
+ beginNextTransactionImpl("No more transactions available in" +
+ " next batch for connection: " + conn + " user: " + username);
+ }
+
+ @Override
+ public void commitWithPartitions(Set<String> partitions) throws StreamingException {
+ checkIsClosed();
+ boolean success = false;
+ try {
+ commitImpl();
+ success = true;
+ } finally {
+ markDead(success);
+ }
+ }
+
+ private void commitImpl() throws StreamingException {
+ recordWriter.flush();
+ List<String> partNames = new ArrayList<>(recordWriter.getPartitions());
+ partitions.addAll(partNames);
+ state = HiveStreamingConnection.TxnState.PREPARED_FOR_COMMIT;
+ }
+
+ @Override
+ public void abort() {
+ if (isTxnClosed.get()) {
+ return;
+ }
+ state = HiveStreamingConnection.TxnState.ABORTED;
+ }
+
+ @Override
+ public void close() throws StreamingException {
+ if (isClosed()) {
+ return;
+ }
+ isTxnClosed.set(true);
+ abort();
+ try {
+ closeImpl();
+ } catch (Exception ex) {
+ LOG.error("Fatal error on " + toString() + "; cause " + ex.getMessage(), ex);
+ throw new StreamingException("Unable to close", ex);
+ }
+ }
+
+ private void closeImpl() throws StreamingException {
+ state = HiveStreamingConnection.TxnState.INACTIVE;
+ recordWriter.close();
+ }
+
+ @Override
+ public String toString() {
+ if (txnToWriteIds == null || txnToWriteIds.isEmpty()) {
+ return "{}";
+ }
+ return "TxnId/WriteIds=[" + txnToWriteIds.get(0).getWriteId()
+ + "] on connection = " + conn + "; " + "status=" + state;
+ }
+
+ /**
+ * @return the partitions written to. This class doesn't have a connection
+ * to the metastore, so it won't create any partitions.
+ */
+ @Override
+ public Set<String> getPartitions() {
+ return partitions;
+ }
+}
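To complete the picture, a sketch of the external side of this contract: the entity that allocated the writeId opens and commits the transaction itself through the metastore client (msc is an IMetaStoreClient; database, table and error handling are illustrative):

// 1. Allocate a transaction and a table write id externally.
long txnId = msc.openTxn("streaming-agent");
List<TxnToWriteId> ids = msc.allocateTableWriteIdsBatch(
    Collections.singletonList(txnId), "default", "events");
long writeId = ids.get(0).getWriteId();

// 2. Hand writeId to HiveStreamingConnection (see the builder sketch above);
//    UnManagedSingleTransaction only flushes the writer and moves to
//    PREPARED_FOR_COMMIT.

// 3. Commit (or roll back) the transaction where it was opened.
msc.commitTxn(txnId);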
http://git-wip-us.apache.org/repos/asf/hive/blob/bdbd3bcf/streaming/src/java/org/apache/hive/streaming/package-info.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/package-info.java b/streaming/src/java/org/apache/hive/streaming/package-info.java
new file mode 100644
index 0000000..a9f3fae
--- /dev/null
+++ b/streaming/src/java/org/apache/hive/streaming/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package grouping streaming classes.
+ */
+package org.apache.hive.streaming;
http://git-wip-us.apache.org/repos/asf/hive/blob/bdbd3bcf/streaming/src/java/org/apache/hive/streaming/package.html
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/package.html b/streaming/src/java/org/apache/hive/streaming/package.html
index 2b45792..19b39e7 100644
--- a/streaming/src/java/org/apache/hive/streaming/package.html
+++ b/streaming/src/java/org/apache/hive/streaming/package.html
@@ -43,7 +43,8 @@ performed to HDFS via Hive wrapper APIs that bypass MetaStore. </p>
<p>
<b>Note on packaging</b>: The APIs are defined in the
-<b>org.apache.hive.streaming</b> Java package and included as
+<b>org.apache.hive.streaming</b> and
+<b>org.apache.hive.streaming.transaction</b> Java packages and included as
the hive-streaming jar.</p>
<h2>STREAMING REQUIREMENTS</h2>