Posted to commits@hive.apache.org by se...@apache.org on 2018/10/26 21:12:07 UTC

[57/75] [abbrv] hive git commit: HIVE-20701: Allow HiveStreaming to receive a key value to commit atomically together with the transaction (Jaume M reviewed by Prasanth Jayachandran)

HIVE-20701: Allow HiveStreaming to receive a key value to commit atomically together with the transaction (Jaume M reviewed by Prasanth Jayachandran)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/7765e90a
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/7765e90a
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/7765e90a

Branch: refs/heads/master-tez092
Commit: 7765e90aad44747860b3c1adbe8a4857d864912d
Parents: cbe3228
Author: Jaume Marhuenda <ja...@gmail.com>
Authored: Mon Oct 22 14:18:20 2018 -0700
Committer: Prasanth Jayachandran <pr...@apache.org>
Committed: Mon Oct 22 14:18:49 2018 -0700

----------------------------------------------------------------------
 .../streaming/AbstractStreamingTransaction.java |  6 ++-
 .../hive/streaming/HiveStreamingConnection.java | 13 +++++--
 .../hive/streaming/StreamingConnection.java     | 23 ++++++++---
 .../hive/streaming/StreamingTransaction.java    | 14 ++++++-
 .../apache/hive/streaming/TransactionBatch.java | 26 +++++++++++--
 .../streaming/UnManagedSingleTransaction.java   |  3 +-
 .../apache/hive/streaming/TestStreaming.java    | 41 +++++++++++++++++++-
 7 files changed, 109 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
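
For context, end-to-end usage of the new API looks like the following minimal sketch, adapted from the testCommitWithKeyValue test added in this commit. The table default.keyvalue, the delimiter, and the key/value strings are only illustrative; the three-argument commitTransaction is the method this change introduces.

  import org.apache.hadoop.hive.conf.HiveConf;
  import org.apache.hive.streaming.HiveStreamingConnection;
  import org.apache.hive.streaming.StrictDelimitedInputWriter;

  // Writer for comma-delimited records.
  StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
      .withFieldDelimiter(',')
      .build();
  HiveConf conf = new HiveConf();
  HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
      .withDatabase("default")
      .withTable("keyvalue")          // assumes an existing transactional ORC table
      .withAgentInfo("example-agent")
      .withRecordWriter(writer)
      .withHiveConf(conf)
      .connect();
  connection.beginTransaction();
  connection.write("a1,b2".getBytes());
  // The key/value pair is committed in the same transaction as the rows written above.
  connection.commitTransaction(null, "_metamykey", "myvalue");
  connection.close();
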


http://git-wip-us.apache.org/repos/asf/hive/blob/7765e90a/streaming/src/java/org/apache/hive/streaming/AbstractStreamingTransaction.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/AbstractStreamingTransaction.java b/streaming/src/java/org/apache/hive/streaming/AbstractStreamingTransaction.java
index a99fdba..6ab3ffe 100644
--- a/streaming/src/java/org/apache/hive/streaming/AbstractStreamingTransaction.java
+++ b/streaming/src/java/org/apache/hive/streaming/AbstractStreamingTransaction.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
 
 import java.io.InputStream;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
@@ -151,6 +152,9 @@ abstract class AbstractStreamingTransaction
   }
 
   public void commit() throws StreamingException {
-    commitWithPartitions(null);
+    commit(null);
+  }
+  public void commit(Set<String> partitions) throws StreamingException {
+    commit(partitions, null, null);
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/7765e90a/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java b/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java
index f79b844..74fc531 100644
--- a/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java
+++ b/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java
@@ -146,6 +146,7 @@ public class HiveStreamingConnection implements StreamingConnection {
   private boolean manageTransactions;
   private int countTransactions = 0;
   private Set<String> partitions;
+  private Long tableId;
 
   private HiveStreamingConnection(Builder builder) throws StreamingException {
     this.database = builder.database.toLowerCase();
@@ -574,12 +575,18 @@ public class HiveStreamingConnection implements StreamingConnection {
 
   @Override
   public void commitTransaction() throws StreamingException {
-    commitTransactionWithPartition(null);
+    commitTransaction(null);
   }
 
   @Override
-  public void commitTransactionWithPartition(Set<String> partitions)
+  public void commitTransaction(Set<String> partitions)
       throws StreamingException {
+    commitTransaction(partitions, null, null);
+  }
+
+  @Override
+  public void commitTransaction(Set<String> partitions, String key,
+      String value) throws StreamingException {
     checkState();
 
     Set<String> createdPartitions = new HashSet<>();
@@ -598,7 +605,7 @@ public class HiveStreamingConnection implements StreamingConnection {
       connectionStats.incrementTotalPartitions(partitions.size());
     }
 
-    currentTransactionBatch.commitWithPartitions(createdPartitions);
+    currentTransactionBatch.commit(createdPartitions, key, value);
     this.partitions.addAll(
         currentTransactionBatch.getPartitions());
     connectionStats.incrementCreatedPartitions(createdPartitions.size());

http://git-wip-us.apache.org/repos/asf/hive/blob/7765e90a/streaming/src/java/org/apache/hive/streaming/StreamingConnection.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/StreamingConnection.java b/streaming/src/java/org/apache/hive/streaming/StreamingConnection.java
index 92016e5..ba4c6a5 100644
--- a/streaming/src/java/org/apache/hive/streaming/StreamingConnection.java
+++ b/streaming/src/java/org/apache/hive/streaming/StreamingConnection.java
@@ -66,13 +66,26 @@ public interface StreamingConnection extends ConnectionInfo, PartitionHandler {
   void commitTransaction() throws StreamingException;
 
   /**
-   * Commit a transaction to make the writes visible for readers. Include
-   * other partitions that may have been added independently.
-   *
+   * Commits the transaction together with a key value atomically.
    * @param partitions - extra partitions to commit.
-   * @throws StreamingException - if there are errors when committing the open transaction.
+   * @param key - key to commit.
+   * @param value - value to commit.
+   * @throws StreamingException - if there are errors when committing
+   * the open transaction.
    */
-  default void commitTransactionWithPartition(@Nullable Set<String> partitions)
+  default void commitTransaction(@Nullable Set<String> partitions,
+      @Nullable String key, @Nullable String value) throws StreamingException {
+    throw new UnsupportedOperationException();
+  }
+
+    /**
+     * Commit a transaction to make the writes visible for readers. Include
+     * other partitions that may have been added independently.
+     *
+     * @param partitions - extra partitions to commit.
+     * @throws StreamingException - if there are errors when committing the open transaction.
+     */
+  default void commitTransaction(@Nullable Set<String> partitions)
       throws StreamingException {
     throw new UnsupportedOperationException();
   }
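
The new overloads are declared as default methods that throw UnsupportedOperationException, so existing StreamingConnection implementations keep compiling and opt in by overriding them. Below is a hypothetical sketch of the expected wiring, mirroring how HiveStreamingConnection delegates its narrower overloads to the widest one in this patch; the class name is illustrative and not part of the change.

  import java.util.Set;
  import org.apache.hive.streaming.StreamingConnection;
  import org.apache.hive.streaming.StreamingException;

  // Sketch only: narrower overloads delegate to the widest one.
  public abstract class DelegatingStreamingConnection implements StreamingConnection {
    @Override
    public void commitTransaction() throws StreamingException {
      commitTransaction(null);
    }

    @Override
    public void commitTransaction(Set<String> partitions) throws StreamingException {
      commitTransaction(partitions, null, null);
    }

    @Override
    public void commitTransaction(Set<String> partitions, String key, String value)
        throws StreamingException {
      // A concrete subclass performs the actual commit here.
    }
  }
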

http://git-wip-us.apache.org/repos/asf/hive/blob/7765e90a/streaming/src/java/org/apache/hive/streaming/StreamingTransaction.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/StreamingTransaction.java b/streaming/src/java/org/apache/hive/streaming/StreamingTransaction.java
index 83b2f15..c0ee034 100644
--- a/streaming/src/java/org/apache/hive/streaming/StreamingTransaction.java
+++ b/streaming/src/java/org/apache/hive/streaming/StreamingTransaction.java
@@ -19,6 +19,8 @@
 package org.apache.hive.streaming;
 
 import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
+
+import javax.annotation.Nullable;
 import java.io.InputStream;
 import java.util.List;
 import java.util.Set;
@@ -45,7 +47,17 @@ public interface StreamingTransaction {
    * @param partitions to commit.
    * @throws StreamingException
    */
-  void commitWithPartitions(Set<String> partitions) throws StreamingException;
+  void commit(@Nullable Set<String> partitions) throws StreamingException;
+
+  /**
+   * Commits atomically together with a key and a value.
+   * @param partitions to commit.
+   * @param key to commit.
+   * @param value to commit.
+   * @throws StreamingException
+   */
+  void commit(@Nullable Set<String> partitions, @Nullable String key,
+      @Nullable String value) throws StreamingException;
 
   /**
    * Abort a transaction.

http://git-wip-us.apache.org/repos/asf/hive/blob/7765e90a/streaming/src/java/org/apache/hive/streaming/TransactionBatch.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/TransactionBatch.java b/streaming/src/java/org/apache/hive/streaming/TransactionBatch.java
index dabbe21..a625759 100644
--- a/streaming/src/java/org/apache/hive/streaming/TransactionBatch.java
+++ b/streaming/src/java/org/apache/hive/streaming/TransactionBatch.java
@@ -80,6 +80,11 @@ public class TransactionBatch extends AbstractStreamingTransaction {
 
   private String agentInfo;
   private int numTxns;
+
+  /**
+   * Id of the table from the streaming connection.
+   */
+  private final long tableId;
   /**
    * Tracks the state of each transaction.
    */
@@ -107,6 +112,7 @@ public class TransactionBatch extends AbstractStreamingTransaction {
       this.recordWriter = conn.getRecordWriter();
       this.agentInfo = conn.getAgentInfo();
       this.numTxns = conn.getTransactionBatchSize();
+      this.tableId = conn.getTable().getTTable().getId();
 
       setupHeartBeatThread();
 
@@ -244,19 +250,26 @@ public class TransactionBatch extends AbstractStreamingTransaction {
     }
   }
 
-  public void commitWithPartitions(Set<String> partitions) throws StreamingException {
+  public void commit(Set<String> partitions, String key, String value)
+      throws StreamingException {
     checkIsClosed();
     boolean success = false;
     try {
-      commitImpl(partitions);
+      commitImpl(partitions, key, value);
       success = true;
     } finally {
       markDead(success);
     }
   }
 
-  private void commitImpl(Set<String> partitions) throws StreamingException {
+  private void commitImpl(Set<String> partitions, String key, String value)
+      throws StreamingException {
     try {
+      if ((key == null && value != null) || (key != null && value == null)) {
+        throw new StreamingException(String.format(
+            "If key is set, the value should be as well and vice versa,"
+                + " key, value = %s, %s", key, value));
+      }
       recordWriter.flush();
       TxnToWriteId txnToWriteId = txnToWriteIds.get(currentTxnIndex);
       if (conn.isDynamicPartitioning()) {
@@ -274,7 +287,12 @@ public class TransactionBatch extends AbstractStreamingTransaction {
       }
       transactionLock.lock();
       try {
-        conn.getMSC().commitTxn(txnToWriteId.getTxnId());
+        if (key != null) {
+          conn.getMSC().commitTxnWithKeyValue(txnToWriteId.getTxnId(),
+              tableId, key, value);
+        } else {
+          conn.getMSC().commitTxn(txnToWriteId.getTxnId());
+        }
         // increment the min txn id so that heartbeat thread will heartbeat only from the next open transaction.
         // the current transaction is going to committed or fail, so don't need heartbeat for current transaction.
         if (currentTxnIndex + 1 < txnToWriteIds.size()) {
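
When a key is supplied, the commit goes through commitTxnWithKeyValue on the metastore client rather than plain commitTxn, so the key/value ends up in the table's properties in the same metastore transaction that commits the writes. A hedged sketch of how a reader could check the result afterwards; msClient is assumed to be an IMetaStoreClient for the same metastore, and the table and key come from the test further down.

  // After connection.commitTransaction(null, "_metamykey", "myvalue") succeeds,
  // the pair is visible as a table property (the test verifies this via SHOW TBLPROPERTIES).
  org.apache.hadoop.hive.metastore.api.Table table = msClient.getTable("default", "keyvalue");
  String value = table.getParameters().get("_metamykey");  // expected: "myvalue"
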

http://git-wip-us.apache.org/repos/asf/hive/blob/7765e90a/streaming/src/java/org/apache/hive/streaming/UnManagedSingleTransaction.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/UnManagedSingleTransaction.java b/streaming/src/java/org/apache/hive/streaming/UnManagedSingleTransaction.java
index 68b0906..75779d5 100644
--- a/streaming/src/java/org/apache/hive/streaming/UnManagedSingleTransaction.java
+++ b/streaming/src/java/org/apache/hive/streaming/UnManagedSingleTransaction.java
@@ -69,7 +69,8 @@ public class UnManagedSingleTransaction extends AbstractStreamingTransaction {
   }
 
   @Override
-  public void commitWithPartitions(Set<String> partitions) throws StreamingException {
+  public void commit(Set<String> partitions, String key, String value)
+      throws StreamingException {
     checkIsClosed();
     boolean success = false;
     try {

http://git-wip-us.apache.org/repos/asf/hive/blob/7765e90a/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/org/apache/hive/streaming/TestStreaming.java b/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
index 1c9e43f..50433b6 100644
--- a/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
+++ b/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
@@ -439,6 +439,43 @@ public class TestStreaming {
   }
 
   @Test
+  public void testCommitWithKeyValue() throws Exception {
+    queryTable(driver, "drop table if exists default.keyvalue");
+    queryTable(driver, "create table default.keyvalue (a string, b string) stored as orc " +
+        "TBLPROPERTIES('transactional'='true')");
+    queryTable(driver, "insert into default.keyvalue values('foo','bar')");
+    queryTable(driver, "ALTER TABLE default.keyvalue SET TBLPROPERTIES('_metamykey' = 'myvalue')");
+    List<String> rs = queryTable(driver, "select * from default.keyvalue");
+    Assert.assertEquals(1, rs.size());
+    Assert.assertEquals("foo\tbar", rs.get(0));
+    StrictDelimitedInputWriter wr = StrictDelimitedInputWriter.newBuilder()
+        .withFieldDelimiter(',')
+        .build();
+    HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
+        .withDatabase("Default")
+        .withTable("keyvalue")
+        .withAgentInfo("UT_" + Thread.currentThread().getName())
+        .withTransactionBatchSize(2)
+        .withRecordWriter(wr)
+        .withHiveConf(conf)
+        .connect();
+    connection.beginTransaction();
+    connection.write("a1,b2".getBytes());
+    connection.write("a3,b4".getBytes());
+    connection.commitTransaction(null,  "_metamykey", "myvalue");
+    connection.close();
+
+    rs = queryTable(driver, "select ROW__ID, a, b, INPUT__FILE__NAME from default.keyvalue order by ROW__ID");
+    Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\ta1\tb2"));
+    Assert.assertTrue(rs.get(1), rs.get(1).endsWith("keyvalue/delta_0000002_0000003/bucket_00000"));
+    Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\ta3\tb4"));
+    Assert.assertTrue(rs.get(2), rs.get(2).endsWith("keyvalue/delta_0000002_0000003/bucket_00000"));
+
+    rs = queryTable(driver, "SHOW TBLPROPERTIES default.keyvalue('_metamykey')");
+    Assert.assertEquals(rs.get(0), "_metamykey\tmyvalue", rs.get(0));
+  }
+
+  @Test
   public void testConnectionWithWriteId() throws Exception {
     queryTable(driver, "drop table if exists default.writeidconnection");
     queryTable(driver, "create table default.writeidconnection (a string, b string) stored as orc " +
@@ -1139,7 +1176,7 @@ public class TestStreaming {
       Assert.fail("Partition shouldn't exist so a NoSuchObjectException should have been raised");
     } catch (NoSuchObjectException e) {}
 
-    transactionConnection.commitTransactionWithPartition(partitions);
+    transactionConnection.commitTransaction(partitions);
 
     // Ensure partition is present
     Partition p = msClient.getPartition(dbName, tblName, newPartVals);
@@ -1217,7 +1254,7 @@ public class TestStreaming {
 
     partitionsOne.addAll(partitionsTwo);
     Set<String> allPartitions = partitionsOne;
-    transactionConnection.commitTransactionWithPartition(allPartitions);
+    transactionConnection.commitTransaction(allPartitions);
 
     // Ensure partition is present
     for (String partition : allPartitions) {