You are viewing a plain-text version of this content. The link to its canonical version was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@hive.apache.org by pr...@apache.org on 2018/06/08 17:57:28 UTC

hive git commit: HIVE-19817: Hive streaming API + dynamic partitioning + json/regex writer does not work (Prasanth Jayachandran reviewed by Ashutosh Chauhan)

Repository: hive
Updated Branches:
  refs/heads/master 13960aa99 -> a0c465d1c


HIVE-19817: Hive streaming API + dynamic partitioning + json/regex writer does not work (Prasanth Jayachandran reviewed by Ashutosh Chauhan)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/a0c465d1
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/a0c465d1
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/a0c465d1

Branch: refs/heads/master
Commit: a0c465d1c083a0fe97fa6592805ea90a3914d039
Parents: 13960aa
Author: Prasanth Jayachandran <pr...@apache.org>
Authored: Fri Jun 8 10:57:07 2018 -0700
Committer: Prasanth Jayachandran <pr...@apache.org>
Committed: Fri Jun 8 10:57:07 2018 -0700

----------------------------------------------------------------------
 .../apache/hadoop/hive/serde2/JsonSerDe.java    |  2 +-
 .../hive/streaming/HiveStreamingConnection.java |  4 +
 .../apache/hive/streaming/StrictJsonWriter.java |  5 ++
 .../hive/streaming/StrictRegexWriter.java       | 19 ++++-
 .../TestStreamingDynamicPartitioning.java       | 77 ++++++++++++++++++++
 5 files changed, 104 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/a0c465d1/serde/src/java/org/apache/hadoop/hive/serde2/JsonSerDe.java
----------------------------------------------------------------------
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/JsonSerDe.java b/serde/src/java/org/apache/hadoop/hive/serde2/JsonSerDe.java
index 40b2e8e..1119fa2 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/JsonSerDe.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/JsonSerDe.java
@@ -178,7 +178,7 @@ public class JsonSerDe extends AbstractSerDe {
     if (token != JsonToken.FIELD_NAME) {
       throw new IOException("Field name expected");
     }
-    String fieldName = p.getText();
+    String fieldName = p.getText().toLowerCase();
     int fpos = s.getAllStructFieldNames().indexOf(fieldName);
     if (fpos == -1) {
       fpos = getPositionFromHiveInternalColumnName(fieldName);

http://git-wip-us.apache.org/repos/asf/hive/blob/a0c465d1/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java b/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java
index 0f9260d..1d8fdff 100644
--- a/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java
+++ b/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java
@@ -899,6 +899,10 @@ public class HiveStreamingConnection implements StreamingConnection {
     }
 
     private void abortImpl(boolean abortAllRemaining) throws StreamingException {
+      if (minTxnId == null) {
+        return;
+      }
+
       transactionLock.lock();
       try {
         if (abortAllRemaining) {

http://git-wip-us.apache.org/repos/asf/hive/blob/a0c465d1/streaming/src/java/org/apache/hive/streaming/StrictJsonWriter.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/StrictJsonWriter.java b/streaming/src/java/org/apache/hive/streaming/StrictJsonWriter.java
index 0f9b652..cabb64c 100644
--- a/streaming/src/java/org/apache/hive/streaming/StrictJsonWriter.java
+++ b/streaming/src/java/org/apache/hive/streaming/StrictJsonWriter.java
@@ -20,11 +20,14 @@ package org.apache.hive.streaming;
 
 import java.util.Properties;
 
+import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.JsonSerDe;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.SerDeUtils;
 import org.apache.hadoop.io.Text;
 
+import com.google.common.base.Joiner;
+
 /**
  * Streaming Writer handles utf8 encoded Json (Strict syntax).
  * Uses org.apache.hadoop.hive.serde2.JsonSerDe to process Json input
@@ -64,6 +67,8 @@ public class StrictJsonWriter extends AbstractRecordWriter {
   public JsonSerDe createSerde() throws SerializationError {
     try {
       Properties tableProps = table.getMetadata();
+      tableProps.setProperty(serdeConstants.LIST_COLUMNS, Joiner.on(",").join(inputColumns));
+      tableProps.setProperty(serdeConstants.LIST_COLUMN_TYPES, Joiner.on(":").join(inputTypes));
       JsonSerDe serde = new JsonSerDe();
       SerDeUtils.initializeSerDe(serde, conf, tableProps, null);
       this.serde = serde;

http://git-wip-us.apache.org/repos/asf/hive/blob/a0c465d1/streaming/src/java/org/apache/hive/streaming/StrictRegexWriter.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/StrictRegexWriter.java b/streaming/src/java/org/apache/hive/streaming/StrictRegexWriter.java
index 3651fa1..12516f5 100644
--- a/streaming/src/java/org/apache/hive/streaming/StrictRegexWriter.java
+++ b/streaming/src/java/org/apache/hive/streaming/StrictRegexWriter.java
@@ -18,15 +18,20 @@
 
 package org.apache.hive.streaming;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Properties;
 
-import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.RegexSerDe;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.SerDeUtils;
 import org.apache.hadoop.io.Text;
 
+import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+
 /**
  * Streaming Writer handles text input data with regex. Uses
  * org.apache.hadoop.hive.serde2.RegexSerDe
@@ -75,7 +80,17 @@ public class StrictRegexWriter extends AbstractRecordWriter {
     try {
       Properties tableProps = table.getMetadata();
       tableProps.setProperty(RegexSerDe.INPUT_REGEX, regex);
-      tableProps.setProperty(serdeConstants.LIST_COLUMNS, StringUtils.join(inputColumns, ","));
+      tableProps.setProperty(serdeConstants.LIST_COLUMNS, Joiner.on(",").join(inputColumns));
+      tableProps.setProperty(serdeConstants.LIST_COLUMN_TYPES, Joiner.on(":").join(inputTypes));
+      final String columnComments = tableProps.getProperty("columns.comments");
+      if (columnComments != null) {
+        List<String> comments = Lists.newArrayList(Splitter.on('\0').split(columnComments));
+        int commentsSize = comments.size();
+        for (int i = 0; i < inputColumns.size() - commentsSize; i++) {
+          comments.add("");
+        }
+        tableProps.setProperty("columns.comments", Joiner.on('\0').join(comments));
+      }
       RegexSerDe serde = new RegexSerDe();
       SerDeUtils.initializeSerDe(serde, conf, tableProps, null);
       this.serde = serde;

http://git-wip-us.apache.org/repos/asf/hive/blob/a0c465d1/streaming/src/test/org/apache/hive/streaming/TestStreamingDynamicPartitioning.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/org/apache/hive/streaming/TestStreamingDynamicPartitioning.java b/streaming/src/test/org/apache/hive/streaming/TestStreamingDynamicPartitioning.java
index e513915..32a6d06 100644
--- a/streaming/src/test/org/apache/hive/streaming/TestStreamingDynamicPartitioning.java
+++ b/streaming/src/test/org/apache/hive/streaming/TestStreamingDynamicPartitioning.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
+import java.io.ByteArrayInputStream;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -579,6 +580,82 @@ public class TestStreamingDynamicPartitioning {
   }
 
   @Test
+  public void testRegexInputStreamDP() throws Exception {
+    String regex = "([^,]*),(.*),(.*),(.*)";
+    StrictRegexWriter writer = StrictRegexWriter.newBuilder()
+      // if unspecified, default one or [\r\n] will be used for line break
+      .withRegex(regex)
+      .build();
+    StreamingConnection connection = HiveStreamingConnection.newBuilder()
+      .withDatabase(dbName)
+      .withTable(tblName)
+      .withAgentInfo("UT_" + Thread.currentThread().getName())
+      .withHiveConf(conf)
+      .withRecordWriter(writer)
+      .connect();
+
+    String rows = "1,foo,Asia,India\r2,bar,Europe,Germany\r3,baz,Asia,China\r4,cat,Australia,";
+    ByteArrayInputStream bais = new ByteArrayInputStream(rows.getBytes());
+    connection.beginTransaction();
+    connection.write(bais);
+    connection.commitTransaction();
+    bais.close();
+    connection.close();
+
+    List<String> rs = queryTable(driver, "select * from " + dbName + "." + tblName + " order by id");
+    Assert.assertEquals(4, rs.size());
+    Assert.assertEquals("1\tfoo\tAsia\tIndia", rs.get(0));
+    Assert.assertEquals("2\tbar\tEurope\tGermany", rs.get(1));
+    Assert.assertEquals("3\tbaz\tAsia\tChina", rs.get(2));
+    Assert.assertEquals("4\tcat\tAustralia\t__HIVE_DEFAULT_PARTITION__", rs.get(3));
+    rs = queryTable(driver, "show partitions " + dbName + "." + tblName);
+    Assert.assertEquals(4, rs.size());
+    Assert.assertTrue(rs.contains("continent=Asia/country=India"));
+    Assert.assertTrue(rs.contains("continent=Asia/country=China"));
+    Assert.assertTrue(rs.contains("continent=Europe/country=Germany"));
+    Assert.assertTrue(rs.contains("continent=Australia/country=__HIVE_DEFAULT_PARTITION__"));
+  }
+
+  @Test
+  public void testJsonInputStreamDP() throws Exception {
+    StrictJsonWriter writer = StrictJsonWriter.newBuilder()
+      .withLineDelimiterPattern("\\|")
+      .build();
+    HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
+      .withDatabase(dbName)
+      .withTable(tblName)
+      .withAgentInfo("UT_" + Thread.currentThread().getName())
+      .withRecordWriter(writer)
+      .withHiveConf(conf)
+      .connect();
+
+    // 1st Txn
+    connection.beginTransaction();
+    Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN, connection.getCurrentTransactionState());
+    String records = "{\"id\" : 1, \"msg\": \"Hello streaming\", \"continent\": \"Asia\", \"Country\": \"India\"}|" +
+      "{\"id\" : 2, \"msg\": \"Hello world\", \"continent\": \"Europe\", \"Country\": \"Germany\"}|" +
+      "{\"id\" : 3, \"msg\": \"Hello world!!\", \"continent\": \"Asia\", \"Country\": \"China\"}|" +
+      "{\"id\" : 4, \"msg\": \"Hmm..\", \"continent\": \"Australia\", \"Unknown-field\": \"whatever\"}|";
+    ByteArrayInputStream bais = new ByteArrayInputStream(records.getBytes());
+    connection.write(bais);
+    connection.commitTransaction();
+    bais.close();
+    connection.close();
+    List<String> rs = queryTable(driver, "select * from " + dbName + "." + tblName + " order by id");
+    Assert.assertEquals(4, rs.size());
+    Assert.assertEquals("1\tHello streaming\tAsia\tIndia", rs.get(0));
+    Assert.assertEquals("2\tHello world\tEurope\tGermany", rs.get(1));
+    Assert.assertEquals("3\tHello world!!\tAsia\tChina", rs.get(2));
+    Assert.assertEquals("4\tHmm..\tAustralia\t__HIVE_DEFAULT_PARTITION__", rs.get(3));
+    rs = queryTable(driver, "show partitions " + dbName + "." + tblName);
+    Assert.assertEquals(4, rs.size());
+    Assert.assertTrue(rs.contains("continent=Asia/country=India"));
+    Assert.assertTrue(rs.contains("continent=Asia/country=China"));
+    Assert.assertTrue(rs.contains("continent=Europe/country=Germany"));
+    Assert.assertTrue(rs.contains("continent=Australia/country=__HIVE_DEFAULT_PARTITION__"));
+  }
+
+  @Test
   public void testWriteAfterClose() throws Exception {
     StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
       .withFieldDelimiter(',')