Posted to commits@hive.apache.org by pr...@apache.org on 2018/06/08 17:57:28 UTC
hive git commit: HIVE-19817: Hive streaming API + dynamic partitioning + json/regex writer does not work (Prasanth Jayachandran reviewed by Ashutosh Chauhan)
Repository: hive
Updated Branches:
refs/heads/master 13960aa99 -> a0c465d1c
HIVE-19817: Hive streaming API + dynamic partitioning + json/regex writer does not work (Prasanth Jayachandran reviewed by Ashutosh Chauhan)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/a0c465d1
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/a0c465d1
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/a0c465d1
Branch: refs/heads/master
Commit: a0c465d1c083a0fe97fa6592805ea90a3914d039
Parents: 13960aa
Author: Prasanth Jayachandran <pr...@apache.org>
Authored: Fri Jun 8 10:57:07 2018 -0700
Committer: Prasanth Jayachandran <pr...@apache.org>
Committed: Fri Jun 8 10:57:07 2018 -0700
----------------------------------------------------------------------
.../apache/hadoop/hive/serde2/JsonSerDe.java | 2 +-
.../hive/streaming/HiveStreamingConnection.java | 4 +
.../apache/hive/streaming/StrictJsonWriter.java | 5 ++
.../hive/streaming/StrictRegexWriter.java | 19 ++++-
.../TestStreamingDynamicPartitioning.java | 77 ++++++++++++++++++++
5 files changed, 104 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/a0c465d1/serde/src/java/org/apache/hadoop/hive/serde2/JsonSerDe.java
----------------------------------------------------------------------
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/JsonSerDe.java b/serde/src/java/org/apache/hadoop/hive/serde2/JsonSerDe.java
index 40b2e8e..1119fa2 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/JsonSerDe.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/JsonSerDe.java
@@ -178,7 +178,7 @@ public class JsonSerDe extends AbstractSerDe {
if (token != JsonToken.FIELD_NAME) {
throw new IOException("Field name expected");
}
- String fieldName = p.getText();
+ String fieldName = p.getText().toLowerCase();
int fpos = s.getAllStructFieldNames().indexOf(fieldName);
if (fpos == -1) {
fpos = getPositionFromHiveInternalColumnName(fieldName);
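For context (not part of the patch): Hive normalizes struct field names to lowercase, so a mixed-case JSON key such as "Country" never matched getAllStructFieldNames() and the field, dynamic partition columns included, fell through to the fallback lookup. A minimal, self-contained sketch of the failing index lookup, using hypothetical field names:

    import java.util.Arrays;
    import java.util.List;

    public class FieldLookupSketch {
      public static void main(String[] args) {
        // field names as Hive's ObjectInspector reports them: always lowercase
        List<String> structFieldNames = Arrays.asList("id", "msg", "continent", "country");

        String jsonField = "Country";                     // key as it appears in the JSON record
        int before = structFieldNames.indexOf(jsonField); // -1: mixed case misses the lookup
        int after = structFieldNames.indexOf(jsonField.toLowerCase()); // 3: matches after the fix

        System.out.println("before: " + before + ", after: " + after);
      }
    }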
http://git-wip-us.apache.org/repos/asf/hive/blob/a0c465d1/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java b/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java
index 0f9260d..1d8fdff 100644
--- a/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java
+++ b/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java
@@ -899,6 +899,10 @@ public class HiveStreamingConnection implements StreamingConnection {
}
private void abortImpl(boolean abortAllRemaining) throws StreamingException {
+ if (minTxnId == null) {
+ return;
+ }
+
transactionLock.lock();
try {
if (abortAllRemaining) {
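A plausible reading of this guard (an assumption; the commit message does not spell it out): abort can run before any transaction id has been allocated, e.g. when close() is called on a connection that never began a transaction, and returning early avoids dereferencing the still-null minTxnId. An illustrative sketch of that scenario, assuming a reachable metastore and a hypothetical transactional table named "acid_tbl":

    // Illustrative only; requires a live Hive metastore and an ACID table.
    StreamingConnection connection = HiveStreamingConnection.newBuilder()
        .withDatabase("default")
        .withTable("acid_tbl")        // hypothetical table name
        .withAgentInfo("example-agent")
        .withHiveConf(conf)
        .withRecordWriter(StrictJsonWriter.newBuilder().build())
        .connect();

    // close() without beginTransaction(): remaining transactions are aborted
    // on close, and abortImpl() now returns early because no transaction id
    // was ever allocated (minTxnId == null).
    connection.close();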
http://git-wip-us.apache.org/repos/asf/hive/blob/a0c465d1/streaming/src/java/org/apache/hive/streaming/StrictJsonWriter.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/StrictJsonWriter.java b/streaming/src/java/org/apache/hive/streaming/StrictJsonWriter.java
index 0f9b652..cabb64c 100644
--- a/streaming/src/java/org/apache/hive/streaming/StrictJsonWriter.java
+++ b/streaming/src/java/org/apache/hive/streaming/StrictJsonWriter.java
@@ -20,11 +20,14 @@ package org.apache.hive.streaming;
import java.util.Properties;
+import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.JsonSerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.SerDeUtils;
import org.apache.hadoop.io.Text;
+import com.google.common.base.Joiner;
+
/**
* Streaming Writer handles utf8 encoded Json (Strict syntax).
* Uses org.apache.hadoop.hive.serde2.JsonSerDe to process Json input
@@ -64,6 +67,8 @@ public class StrictJsonWriter extends AbstractRecordWriter {
public JsonSerDe createSerde() throws SerializationError {
try {
Properties tableProps = table.getMetadata();
+ tableProps.setProperty(serdeConstants.LIST_COLUMNS, Joiner.on(",").join(inputColumns));
+ tableProps.setProperty(serdeConstants.LIST_COLUMN_TYPES, Joiner.on(":").join(inputTypes));
JsonSerDe serde = new JsonSerDe();
SerDeUtils.initializeSerDe(serde, conf, tableProps, null);
this.serde = serde;
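For context (not part of the patch): serdeConstants.LIST_COLUMNS and serdeConstants.LIST_COLUMN_TYPES are the "columns" and "columns.types" table properties, and the two added lines seed them from the writer's input schema, presumably so JsonSerDe can resolve every incoming field, partition columns included. A minimal sketch of the property values they produce, with hypothetical columns:

    import java.util.Arrays;
    import java.util.List;
    import java.util.Properties;
    import com.google.common.base.Joiner;

    public class SerdePropsSketch {
      public static void main(String[] args) {
        List<String> inputColumns = Arrays.asList("id", "msg", "continent", "country");
        List<String> inputTypes = Arrays.asList("int", "string", "string", "string");

        Properties tableProps = new Properties();
        tableProps.setProperty("columns", Joiner.on(",").join(inputColumns));     // id,msg,continent,country
        tableProps.setProperty("columns.types", Joiner.on(":").join(inputTypes)); // int:string:string:string

        System.out.println(tableProps);
      }
    }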
http://git-wip-us.apache.org/repos/asf/hive/blob/a0c465d1/streaming/src/java/org/apache/hive/streaming/StrictRegexWriter.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/StrictRegexWriter.java b/streaming/src/java/org/apache/hive/streaming/StrictRegexWriter.java
index 3651fa1..12516f5 100644
--- a/streaming/src/java/org/apache/hive/streaming/StrictRegexWriter.java
+++ b/streaming/src/java/org/apache/hive/streaming/StrictRegexWriter.java
@@ -18,15 +18,20 @@
package org.apache.hive.streaming;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Properties;
-import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.RegexSerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.SerDeUtils;
import org.apache.hadoop.io.Text;
+import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+
/**
* Streaming Writer handles text input data with regex. Uses
* org.apache.hadoop.hive.serde2.RegexSerDe
@@ -75,7 +80,17 @@ public class StrictRegexWriter extends AbstractRecordWriter {
try {
Properties tableProps = table.getMetadata();
tableProps.setProperty(RegexSerDe.INPUT_REGEX, regex);
- tableProps.setProperty(serdeConstants.LIST_COLUMNS, StringUtils.join(inputColumns, ","));
+ tableProps.setProperty(serdeConstants.LIST_COLUMNS, Joiner.on(",").join(inputColumns));
+ tableProps.setProperty(serdeConstants.LIST_COLUMN_TYPES, Joiner.on(":").join(inputTypes));
+ final String columnComments = tableProps.getProperty("columns.comments");
+ if (columnComments != null) {
+ List<String> comments = Lists.newArrayList(Splitter.on('\0').split(columnComments));
+ int commentsSize = comments.size();
+ for (int i = 0; i < inputColumns.size() - commentsSize; i++) {
+ comments.add("");
+ }
+ tableProps.setProperty("columns.comments", Joiner.on('\0').join(comments));
+ }
RegexSerDe serde = new RegexSerDe();
SerDeUtils.initializeSerDe(serde, conf, tableProps, null);
this.serde = serde;
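For context (not part of the patch): "columns.comments" is a '\0'-delimited property with one entry per column, so once partition columns are appended to the column list, the comment list is padded to the same length (assumption: serde initialization expects one comment entry per column). A standalone sketch of the padding logic above, with hypothetical values:

    import java.util.List;
    import com.google.common.base.Joiner;
    import com.google.common.base.Splitter;
    import com.google.common.collect.Lists;

    public class CommentPaddingSketch {
      public static void main(String[] args) {
        int columnCount = 4;                           // e.g. id, msg, continent, country
        String columnComments = "row id\u0000message"; // comments exist for the first two columns only

        List<String> comments = Lists.newArrayList(Splitter.on('\u0000').split(columnComments));
        // pad with empty comments until the list length matches the column count
        while (comments.size() < columnCount) {
          comments.add("");
        }
        // 'row id', 'message', '', '' -- rejoined with the '\0' separator,
        // shown here with '|' in place of '\0' for readability
        System.out.println(Joiner.on('\u0000').join(comments).replace('\u0000', '|'));
      }
    }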
http://git-wip-us.apache.org/repos/asf/hive/blob/a0c465d1/streaming/src/test/org/apache/hive/streaming/TestStreamingDynamicPartitioning.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/org/apache/hive/streaming/TestStreamingDynamicPartitioning.java b/streaming/src/test/org/apache/hive/streaming/TestStreamingDynamicPartitioning.java
index e513915..32a6d06 100644
--- a/streaming/src/test/org/apache/hive/streaming/TestStreamingDynamicPartitioning.java
+++ b/streaming/src/test/org/apache/hive/streaming/TestStreamingDynamicPartitioning.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
+import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
@@ -579,6 +580,82 @@ public class TestStreamingDynamicPartitioning {
}
@Test
+ public void testRegexInputStreamDP() throws Exception {
+ String regex = "([^,]*),(.*),(.*),(.*)";
+ StrictRegexWriter writer = StrictRegexWriter.newBuilder()
+ // no line delimiter pattern specified; the default, [\r\n], is used to split input lines
+ .withRegex(regex)
+ .build();
+ StreamingConnection connection = HiveStreamingConnection.newBuilder()
+ .withDatabase(dbName)
+ .withTable(tblName)
+ .withAgentInfo("UT_" + Thread.currentThread().getName())
+ .withHiveConf(conf)
+ .withRecordWriter(writer)
+ .connect();
+
+ String rows = "1,foo,Asia,India\r2,bar,Europe,Germany\r3,baz,Asia,China\r4,cat,Australia,";
+ ByteArrayInputStream bais = new ByteArrayInputStream(rows.getBytes());
+ connection.beginTransaction();
+ connection.write(bais);
+ connection.commitTransaction();
+ bais.close();
+ connection.close();
+
+ List<String> rs = queryTable(driver, "select * from " + dbName + "." + tblName + " order by id");
+ Assert.assertEquals(4, rs.size());
+ Assert.assertEquals("1\tfoo\tAsia\tIndia", rs.get(0));
+ Assert.assertEquals("2\tbar\tEurope\tGermany", rs.get(1));
+ Assert.assertEquals("3\tbaz\tAsia\tChina", rs.get(2));
+ Assert.assertEquals("4\tcat\tAustralia\t__HIVE_DEFAULT_PARTITION__", rs.get(3));
+ rs = queryTable(driver, "show partitions " + dbName + "." + tblName);
+ Assert.assertEquals(4, rs.size());
+ Assert.assertTrue(rs.contains("continent=Asia/country=India"));
+ Assert.assertTrue(rs.contains("continent=Asia/country=China"));
+ Assert.assertTrue(rs.contains("continent=Europe/country=Germany"));
+ Assert.assertTrue(rs.contains("continent=Australia/country=__HIVE_DEFAULT_PARTITION__"));
+ }
+
+ @Test
+ public void testJsonInputStreamDP() throws Exception {
+ StrictJsonWriter writer = StrictJsonWriter.newBuilder()
+ .withLineDelimiterPattern("\\|")
+ .build();
+ HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
+ .withDatabase(dbName)
+ .withTable(tblName)
+ .withAgentInfo("UT_" + Thread.currentThread().getName())
+ .withRecordWriter(writer)
+ .withHiveConf(conf)
+ .connect();
+
+ // 1st Txn
+ connection.beginTransaction();
+ Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN, connection.getCurrentTransactionState());
+ String records = "{\"id\" : 1, \"msg\": \"Hello streaming\", \"continent\": \"Asia\", \"Country\": \"India\"}|" +
+ "{\"id\" : 2, \"msg\": \"Hello world\", \"continent\": \"Europe\", \"Country\": \"Germany\"}|" +
+ "{\"id\" : 3, \"msg\": \"Hello world!!\", \"continent\": \"Asia\", \"Country\": \"China\"}|" +
+ "{\"id\" : 4, \"msg\": \"Hmm..\", \"continent\": \"Australia\", \"Unknown-field\": \"whatever\"}|";
+ ByteArrayInputStream bais = new ByteArrayInputStream(records.getBytes());
+ connection.write(bais);
+ connection.commitTransaction();
+ bais.close();
+ connection.close();
+ List<String> rs = queryTable(driver, "select * from " + dbName + "." + tblName + " order by id");
+ Assert.assertEquals(4, rs.size());
+ Assert.assertEquals("1\tHello streaming\tAsia\tIndia", rs.get(0));
+ Assert.assertEquals("2\tHello world\tEurope\tGermany", rs.get(1));
+ Assert.assertEquals("3\tHello world!!\tAsia\tChina", rs.get(2));
+ Assert.assertEquals("4\tHmm..\tAustralia\t__HIVE_DEFAULT_PARTITION__", rs.get(3));
+ rs = queryTable(driver, "show partitions " + dbName + "." + tblName);
+ Assert.assertEquals(4, rs.size());
+ Assert.assertTrue(rs.contains("continent=Asia/country=India"));
+ Assert.assertTrue(rs.contains("continent=Asia/country=China"));
+ Assert.assertTrue(rs.contains("continent=Europe/country=Germany"));
+ Assert.assertTrue(rs.contains("continent=Australia/country=__HIVE_DEFAULT_PARTITION__"));
+ }
+
+ @Test
public void testWriteAfterClose() throws Exception {
StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
.withFieldDelimiter(',')