You are viewing a plain-text version of this content. The link to the canonical version ("here") was not preserved in this plain-text rendering.
Posted to commits@hive.apache.org by pr...@apache.org on 2018/10/22 21:19:05 UTC
hive git commit: HIVE-20701: Allow HiveStreaming to receive a key value to commit atomically together with the transaction (Jaume M reviewed by Prasanth Jayachandran)
Repository: hive
Updated Branches:
refs/heads/master cbe3228c2 -> 7765e90aa
HIVE-20701: Allow HiveStreaming to receive a key value to commit atomically together with the transaction (Jaume M reviewed by Prasanth Jayachandran)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/7765e90a
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/7765e90a
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/7765e90a
Branch: refs/heads/master
Commit: 7765e90aad44747860b3c1adbe8a4857d864912d
Parents: cbe3228
Author: Jaume Marhuenda <ja...@gmail.com>
Authored: Mon Oct 22 14:18:20 2018 -0700
Committer: Prasanth Jayachandran <pr...@apache.org>
Committed: Mon Oct 22 14:18:49 2018 -0700
----------------------------------------------------------------------
.../streaming/AbstractStreamingTransaction.java | 6 ++-
.../hive/streaming/HiveStreamingConnection.java | 13 +++++--
.../hive/streaming/StreamingConnection.java | 23 ++++++++---
.../hive/streaming/StreamingTransaction.java | 14 ++++++-
.../apache/hive/streaming/TransactionBatch.java | 26 +++++++++++--
.../streaming/UnManagedSingleTransaction.java | 3 +-
.../apache/hive/streaming/TestStreaming.java | 41 +++++++++++++++++++-
7 files changed, 109 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/7765e90a/streaming/src/java/org/apache/hive/streaming/AbstractStreamingTransaction.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/AbstractStreamingTransaction.java b/streaming/src/java/org/apache/hive/streaming/AbstractStreamingTransaction.java
index a99fdba..6ab3ffe 100644
--- a/streaming/src/java/org/apache/hive/streaming/AbstractStreamingTransaction.java
+++ b/streaming/src/java/org/apache/hive/streaming/AbstractStreamingTransaction.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
import java.io.InputStream;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
/**
@@ -151,6 +152,9 @@ abstract class AbstractStreamingTransaction
}
public void commit() throws StreamingException {
- commitWithPartitions(null);
+ commit(null);
+ }
+ public void commit(Set<String> partitions) throws StreamingException {
+ commit(partitions, null, null);
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/7765e90a/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java b/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java
index f79b844..74fc531 100644
--- a/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java
+++ b/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java
@@ -146,6 +146,7 @@ public class HiveStreamingConnection implements StreamingConnection {
private boolean manageTransactions;
private int countTransactions = 0;
private Set<String> partitions;
+ private Long tableId;
private HiveStreamingConnection(Builder builder) throws StreamingException {
this.database = builder.database.toLowerCase();
@@ -574,12 +575,18 @@ public class HiveStreamingConnection implements StreamingConnection {
@Override
public void commitTransaction() throws StreamingException {
- commitTransactionWithPartition(null);
+ commitTransaction(null);
}
@Override
- public void commitTransactionWithPartition(Set<String> partitions)
+ public void commitTransaction(Set<String> partitions)
throws StreamingException {
+ commitTransaction(partitions, null, null);
+ }
+
+ @Override
+ public void commitTransaction(Set<String> partitions, String key,
+ String value) throws StreamingException {
checkState();
Set<String> createdPartitions = new HashSet<>();
@@ -598,7 +605,7 @@ public class HiveStreamingConnection implements StreamingConnection {
connectionStats.incrementTotalPartitions(partitions.size());
}
- currentTransactionBatch.commitWithPartitions(createdPartitions);
+ currentTransactionBatch.commit(createdPartitions, key, value);
this.partitions.addAll(
currentTransactionBatch.getPartitions());
connectionStats.incrementCreatedPartitions(createdPartitions.size());
http://git-wip-us.apache.org/repos/asf/hive/blob/7765e90a/streaming/src/java/org/apache/hive/streaming/StreamingConnection.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/StreamingConnection.java b/streaming/src/java/org/apache/hive/streaming/StreamingConnection.java
index 92016e5..ba4c6a5 100644
--- a/streaming/src/java/org/apache/hive/streaming/StreamingConnection.java
+++ b/streaming/src/java/org/apache/hive/streaming/StreamingConnection.java
@@ -66,13 +66,26 @@ public interface StreamingConnection extends ConnectionInfo, PartitionHandler {
void commitTransaction() throws StreamingException;
/**
- * Commit a transaction to make the writes visible for readers. Include
- * other partitions that may have been added independently.
- *
+ * Commits the transaction together with a key value atomically.
* @param partitions - extra partitions to commit.
- * @throws StreamingException - if there are errors when committing the open transaction.
+ * @param key - key to commit.
+ * @param value - value to commit.
+ * @throws StreamingException - if there are errors when committing
+ * the open transaction.
*/
- default void commitTransactionWithPartition(@Nullable Set<String> partitions)
+ default void commitTransaction(@Nullable Set<String> partitions,
+ @Nullable String key, @Nullable String value) throws StreamingException {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Commit a transaction to make the writes visible for readers. Include
+ * other partitions that may have been added independently.
+ *
+ * @param partitions - extra partitions to commit.
+ * @throws StreamingException - if there are errors when committing the open transaction.
+ */
+ default void commitTransaction(@Nullable Set<String> partitions)
throws StreamingException {
throw new UnsupportedOperationException();
}
http://git-wip-us.apache.org/repos/asf/hive/blob/7765e90a/streaming/src/java/org/apache/hive/streaming/StreamingTransaction.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/StreamingTransaction.java b/streaming/src/java/org/apache/hive/streaming/StreamingTransaction.java
index 83b2f15..c0ee034 100644
--- a/streaming/src/java/org/apache/hive/streaming/StreamingTransaction.java
+++ b/streaming/src/java/org/apache/hive/streaming/StreamingTransaction.java
@@ -19,6 +19,8 @@
package org.apache.hive.streaming;
import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
+
+import javax.annotation.Nullable;
import java.io.InputStream;
import java.util.List;
import java.util.Set;
@@ -45,7 +47,17 @@ public interface StreamingTransaction {
* @param partitions to commit.
* @throws StreamingException
*/
- void commitWithPartitions(Set<String> partitions) throws StreamingException;
+ void commit(@Nullable Set<String> partitions) throws StreamingException;
+
+ /**
+ * Commits atomically together with a key and a value.
+ * @param partitions to commit.
+ * @param key to commit.
+ * @param value to commit.
+ * @throws StreamingException
+ */
+ void commit(@Nullable Set<String> partitions, @Nullable String key,
+ @Nullable String value) throws StreamingException;
/**
* Abort a transaction.
http://git-wip-us.apache.org/repos/asf/hive/blob/7765e90a/streaming/src/java/org/apache/hive/streaming/TransactionBatch.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/TransactionBatch.java b/streaming/src/java/org/apache/hive/streaming/TransactionBatch.java
index dabbe21..a625759 100644
--- a/streaming/src/java/org/apache/hive/streaming/TransactionBatch.java
+++ b/streaming/src/java/org/apache/hive/streaming/TransactionBatch.java
@@ -80,6 +80,11 @@ public class TransactionBatch extends AbstractStreamingTransaction {
private String agentInfo;
private int numTxns;
+
+ /**
+ * Id of the table from the streaming connection.
+ */
+ private final long tableId;
/**
* Tracks the state of each transaction.
*/
@@ -107,6 +112,7 @@ public class TransactionBatch extends AbstractStreamingTransaction {
this.recordWriter = conn.getRecordWriter();
this.agentInfo = conn.getAgentInfo();
this.numTxns = conn.getTransactionBatchSize();
+ this.tableId = conn.getTable().getTTable().getId();
setupHeartBeatThread();
@@ -244,19 +250,26 @@ public class TransactionBatch extends AbstractStreamingTransaction {
}
}
- public void commitWithPartitions(Set<String> partitions) throws StreamingException {
+ public void commit(Set<String> partitions, String key, String value)
+ throws StreamingException {
checkIsClosed();
boolean success = false;
try {
- commitImpl(partitions);
+ commitImpl(partitions, key, value);
success = true;
} finally {
markDead(success);
}
}
- private void commitImpl(Set<String> partitions) throws StreamingException {
+ private void commitImpl(Set<String> partitions, String key, String value)
+ throws StreamingException {
try {
+ if ((key == null && value != null) || (key != null && value == null)) {
+ throw new StreamingException(String.format(
+ "If key is set, the value should be as well and vice versa,"
+ + " key, value = %s, %s", key, value));
+ }
recordWriter.flush();
TxnToWriteId txnToWriteId = txnToWriteIds.get(currentTxnIndex);
if (conn.isDynamicPartitioning()) {
@@ -274,7 +287,12 @@ public class TransactionBatch extends AbstractStreamingTransaction {
}
transactionLock.lock();
try {
- conn.getMSC().commitTxn(txnToWriteId.getTxnId());
+ if (key != null) {
+ conn.getMSC().commitTxnWithKeyValue(txnToWriteId.getTxnId(),
+ tableId, key, value);
+ } else {
+ conn.getMSC().commitTxn(txnToWriteId.getTxnId());
+ }
// increment the min txn id so that heartbeat thread will heartbeat only from the next open transaction.
// the current transaction is going to committed or fail, so don't need heartbeat for current transaction.
if (currentTxnIndex + 1 < txnToWriteIds.size()) {
http://git-wip-us.apache.org/repos/asf/hive/blob/7765e90a/streaming/src/java/org/apache/hive/streaming/UnManagedSingleTransaction.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/UnManagedSingleTransaction.java b/streaming/src/java/org/apache/hive/streaming/UnManagedSingleTransaction.java
index 68b0906..75779d5 100644
--- a/streaming/src/java/org/apache/hive/streaming/UnManagedSingleTransaction.java
+++ b/streaming/src/java/org/apache/hive/streaming/UnManagedSingleTransaction.java
@@ -69,7 +69,8 @@ public class UnManagedSingleTransaction extends AbstractStreamingTransaction {
}
@Override
- public void commitWithPartitions(Set<String> partitions) throws StreamingException {
+ public void commit(Set<String> partitions, String key, String value)
+ throws StreamingException {
checkIsClosed();
boolean success = false;
try {
http://git-wip-us.apache.org/repos/asf/hive/blob/7765e90a/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/org/apache/hive/streaming/TestStreaming.java b/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
index 1c9e43f..50433b6 100644
--- a/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
+++ b/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
@@ -439,6 +439,43 @@ public class TestStreaming {
}
@Test
+ public void testCommitWithKeyValue() throws Exception {
+ queryTable(driver, "drop table if exists default.keyvalue");
+ queryTable(driver, "create table default.keyvalue (a string, b string) stored as orc TBLPROPERTIES('transactional'='true')");
+ queryTable(driver, "insert into default.keyvalue values('foo','bar')");
+ // Seed the key with a value other than the one committed below, so the final SHOW TBLPROPERTIES check proves the streaming commit wrote it.
+ queryTable(driver, "ALTER TABLE default.keyvalue SET TBLPROPERTIES('_metamykey' = 'initialvalue')");
+ List<String> rs = queryTable(driver, "select * from default.keyvalue");
+ Assert.assertEquals(1, rs.size());
+ Assert.assertEquals("foo\tbar", rs.get(0));
+ StrictDelimitedInputWriter wr = StrictDelimitedInputWriter.newBuilder()
+ .withFieldDelimiter(',')
+ .build();
+ HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
+ .withDatabase("Default")
+ .withTable("keyvalue")
+ .withAgentInfo("UT_" + Thread.currentThread().getName())
+ .withTransactionBatchSize(2)
+ .withRecordWriter(wr)
+ .withHiveConf(conf)
+ .connect();
+ connection.beginTransaction();
+ connection.write("a1,b2".getBytes());
+ connection.write("a3,b4".getBytes());
+ connection.commitTransaction(null, "_metamykey", "myvalue");
+ connection.close();
+
+ rs = queryTable(driver, "select ROW__ID, a, b, INPUT__FILE__NAME from default.keyvalue order by ROW__ID");
+ Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\ta1\tb2"));
+ Assert.assertTrue(rs.get(1), rs.get(1).endsWith("keyvalue/delta_0000002_0000003/bucket_00000"));
+ Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\ta3\tb4"));
+ Assert.assertTrue(rs.get(2), rs.get(2).endsWith("keyvalue/delta_0000002_0000003/bucket_00000"));
+
+ rs = queryTable(driver, "SHOW TBLPROPERTIES default.keyvalue('_metamykey')");
+ Assert.assertEquals(rs.get(0), "_metamykey\tmyvalue", rs.get(0));
+ }
+
+ @Test
public void testConnectionWithWriteId() throws Exception {
queryTable(driver, "drop table if exists default.writeidconnection");
queryTable(driver, "create table default.writeidconnection (a string, b string) stored as orc " +
@@ -1139,7 +1176,7 @@ public class TestStreaming {
Assert.fail("Partition shouldn't exist so a NoSuchObjectException should have been raised");
} catch (NoSuchObjectException e) {}
- transactionConnection.commitTransactionWithPartition(partitions);
+ transactionConnection.commitTransaction(partitions);
// Ensure partition is present
Partition p = msClient.getPartition(dbName, tblName, newPartVals);
@@ -1217,7 +1254,7 @@ public class TestStreaming {
partitionsOne.addAll(partitionsTwo);
Set<String> allPartitions = partitionsOne;
- transactionConnection.commitTransactionWithPartition(allPartitions);
+ transactionConnection.commitTransaction(allPartitions);
// Ensure partition is present
for (String partition : allPartitions) {