You are viewing a plain-text version of this content; the canonical HTML version is available from the mailing-list archive.
Posted to commits@nifi.apache.org by pv...@apache.org on 2018/08/24 09:09:16 UTC

nifi git commit: NIFI-5517 - Add support for remaining Hive types to PutHive3Streaming

Repository: nifi
Updated Branches:
  refs/heads/master aac2c6a60 -> cfc858c90


NIFI-5517 - Add support for remaining Hive types to PutHive3Streaming

Signed-off-by: Pierre Villard <pi...@gmail.com>

This closes #2950.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/cfc858c9
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/cfc858c9
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/cfc858c9

Branch: refs/heads/master
Commit: cfc858c901b82eb6fba60a844cb370e1b59aaa77
Parents: aac2c6a
Author: Matthew Burgess <ma...@apache.org>
Authored: Tue Aug 14 14:06:46 2018 -0400
Committer: Pierre Villard <pi...@gmail.com>
Committed: Fri Aug 24 11:09:05 2018 +0200

----------------------------------------------------------------------
 .../apache/hive/streaming/NiFiRecordSerDe.java  | 21 ++++++++++++----
 .../processors/hive/TestPutHive3Streaming.java  | 26 ++++++++++++++++++--
 .../src/test/resources/datatype_test.avsc       |  7 ++++++
 3 files changed, 47 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/cfc858c9/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/NiFiRecordSerDe.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/NiFiRecordSerDe.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/NiFiRecordSerDe.java
index 0f15096..932772e 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/NiFiRecordSerDe.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/NiFiRecordSerDe.java
@@ -17,6 +17,7 @@
 package org.apache.hive.streaming;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.AbstractSerDe;
@@ -45,9 +46,11 @@ import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.serialization.record.util.DataTypeUtils;
 
 import java.io.IOException;
+import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -219,23 +222,31 @@ public class NiFiRecordSerDe extends AbstractSerDe {
                         val = AvroTypeUtil.convertByteArray(array).array();
                         break;
                     case DATE:
-                        val = record.getAsDate(fieldName, field.getDataType().getFormat());
+                        Date d = record.getAsDate(fieldName, field.getDataType().getFormat());
+                        org.apache.hadoop.hive.common.type.Date hiveDate = new org.apache.hadoop.hive.common.type.Date();
+                        hiveDate.setTimeInMillis(d.getTime());
+                        val = hiveDate;
                         break;
+                    // ORC doesn't currently handle TIMESTAMPLOCALTZ
                     case TIMESTAMP:
-                        val = DataTypeUtils.toTimestamp(record.getValue(fieldName), () -> DataTypeUtils.getDateFormat(field.getDataType().getFormat()), fieldName);
+                        Timestamp ts = DataTypeUtils.toTimestamp(record.getValue(fieldName), () -> DataTypeUtils.getDateFormat(field.getDataType().getFormat()), fieldName);
+                        // Convert to Hive's Timestamp type
+                        org.apache.hadoop.hive.common.type.Timestamp hivetimestamp = new org.apache.hadoop.hive.common.type.Timestamp();
+                        hivetimestamp.setTimeInMillis(ts.getTime(), ts.getNanos());
+                        val = hivetimestamp;
                         break;
                     case DECIMAL:
-                        val = record.getAsDouble(fieldName);
+                        val = HiveDecimal.create(record.getAsDouble(fieldName));
                         break;
                     default:
-                        throw new IllegalArgumentException("Field " + fieldName + " cannot be converted to unknown type: " + primitiveCategory.name());
+                        throw new IllegalArgumentException("Field " + fieldName + " cannot be converted to type: " + primitiveCategory.name());
                 }
                 break;
             case LIST:
                 val = Arrays.asList(record.getAsArray(fieldName));
                 break;
             case MAP:
-                val = DataTypeUtils.convertRecordFieldtoObject(record.getValue(fieldName), field.getDataType());
+                val = record.getValue(fieldName);
                 break;
             case STRUCT:
                 // The Hive StandardStructObjectInspector expects the object corresponding to a "struct" to be an array or List rather than a Map.

http://git-wip-us.apache.org/repos/asf/nifi/blob/cfc858c9/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHive3Streaming.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHive3Streaming.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHive3Streaming.java
index da463e6..5fd759f 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHive3Streaming.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHive3Streaming.java
@@ -77,7 +77,10 @@ import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.charset.StandardCharsets;
+import java.sql.Timestamp;
+import java.time.Instant;
 import java.util.Arrays;
+import java.util.Calendar;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -660,6 +663,8 @@ public class TestPutHive3Streaming {
         processor.setFields(Arrays.asList(
                 new FieldSchema("uuid", serdeConstants.STRING_TYPE_NAME, "uuid"),
                 new FieldSchema("stringc", serdeConstants.STRING_TYPE_NAME, "stringc"),
+                new FieldSchema("charc", serdeConstants.CHAR_TYPE_NAME + "(1)", "charc"),
+                new FieldSchema("varcharc", serdeConstants.VARCHAR_TYPE_NAME + "(100)", "varcharc"),
                 new FieldSchema("intc", serdeConstants.INT_TYPE_NAME, "intc"),
                 new FieldSchema("tinyintc", serdeConstants.TINYINT_TYPE_NAME, "tinyintc"),
                 new FieldSchema("smallintc", serdeConstants.SMALLINT_TYPE_NAME, "smallintc"),
@@ -667,11 +672,16 @@ public class TestPutHive3Streaming {
                 new FieldSchema("booleanc", serdeConstants.BOOLEAN_TYPE_NAME, "booleanc"),
                 new FieldSchema("floatc", serdeConstants.FLOAT_TYPE_NAME, "floatc"),
                 new FieldSchema("doublec", serdeConstants.DOUBLE_TYPE_NAME, "doublec"),
+                new FieldSchema("bytesc", serdeConstants.BINARY_TYPE_NAME, "bytesc"),
                 new FieldSchema("listc", serdeConstants.LIST_TYPE_NAME + "<" + serdeConstants.STRING_TYPE_NAME + ">", "listc"),
                 new FieldSchema("structc", serdeConstants.STRUCT_TYPE_NAME
                         + "<sint:" + serdeConstants.INT_TYPE_NAME + ","
                         + "sboolean:" + serdeConstants.BOOLEAN_TYPE_NAME + ","
                         + "sstring:" + serdeConstants.STRING_TYPE_NAME + ">", "structc"),
+                new FieldSchema("mapc", serdeConstants.MAP_TYPE_NAME + "<" + serdeConstants.STRING_TYPE_NAME + "," + serdeConstants.INT_TYPE_NAME + ">", "mapc"),
+                new FieldSchema("datec", serdeConstants.DATE_TYPE_NAME, "datec"),
+                new FieldSchema("timestampc", serdeConstants.TIMESTAMP_TYPE_NAME, "timestampc"),
+                new FieldSchema("decimalc", serdeConstants.DECIMAL_TYPE_NAME + "(4,2)", "decimalc"),
                 new FieldSchema("enumc", serdeConstants.STRING_TYPE_NAME, "enumc")));
 
         runner = TestRunners.newTestRunner(processor);
@@ -686,7 +696,7 @@ public class TestPutHive3Streaming {
         Random r = new Random();
         for (int index = 0; index < 10; index++) {
             final int i = index;
-            Record mapRecord = new MapRecord(AvroTypeUtil.createSchema(schema.getField("structc").schema().getTypes().get(1)), // Get non-null type in union
+            Record structRecord = new MapRecord(AvroTypeUtil.createSchema(schema.getField("structc").schema().getTypes().get(1)), // Get non-null type in union
                     new HashMap<String, Object>() {
                 {
                     put("sint", i + 2); // {"name": "sint", "type": "int"},
@@ -701,6 +711,8 @@ public class TestPutHive3Streaming {
             readerFactory.addRecord(
                     UUID.randomUUID(), // {"name": "uuid", "type": "string"},
                     "hello", // {"name": "stringc", "type": "string"},
+                    'a',
+                    "world",
                     i, // {"name": "intc", "type": "int"},
                     i + 1, // {"name": "tinyintc", "type": ["null", "int"]},
                     i * 10, // {"name": "smallintc", "type": "int"},
@@ -708,8 +720,18 @@ public class TestPutHive3Streaming {
                     i % 2 == 0, // {"name": "booleanc", "type": "boolean"},
                     i * 100.0f, // {"name": "floatc", "type": "floatc"},
                     i * 100.0, // {"name": "doublec", "type": "double"},
+                    "Hello".getBytes(),
                     new String[]{"a", "b"}, // {"name": "listc", "type": ["null", {"type": "array", "items": "string"}]},
-                    mapRecord,
+                    structRecord,
+                    new HashMap<String, Integer>() {  // {"name": "mapc", "type": "map", "values": "string"}
+                        {
+                            put("sint1", i + 2); // {"name": "sint", "type": "int"},
+                            put("sint2", i); // {"name": "x", "type": "int"},
+                        }
+                    },
+                    new java.sql.Date(Calendar.getInstance().getTimeInMillis()),
+                    Timestamp.from(Instant.now()),
+                    i*99.0 / 100,
                     enumc.get(r.nextInt(4)) // {"name": "enumc", "type": {"type": "enum", "name": "Suit", "symbols": ["SPADES","HEARTS","DIAMONDS","CLUBS"]}}
             );
         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/cfc858c9/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/resources/datatype_test.avsc
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/resources/datatype_test.avsc b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/resources/datatype_test.avsc
index a232ad0..cde608d 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/resources/datatype_test.avsc
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/resources/datatype_test.avsc
@@ -21,6 +21,8 @@
  "fields": [
   {"name": "uuid", "type": "string"},
   {"name": "stringc", "type": "string"},
+  {"name": "charc", "type": "string"},
+  {"name": "varcharc", "type": "string"},
   {"name": "intc", "type": "int"},
   {"name": "tinyintc", "type": ["null", "int"]},
   {"name": "smallintc", "type": "int"},
@@ -28,12 +30,17 @@
   {"name": "booleanc", "type": "boolean"},
   {"name": "floatc", "type": "float"},
   {"name": "doublec", "type": "double"},
+  {"name": "bytesc", "type": "bytes"},
   {"name": "listc", "type": ["null", {"type": "array", "items": "string"}]},
   {"name": "structc", "type": ["null", {"name": "structcRecord", "type": "record", "fields": [
 	{"name": "sint", "type": "int"},
 	{"name": "sboolean", "type": ["null","boolean"]},
     {"name": "sstring", "type": "string"}
    ]}]},
+  {"name": "mapc", "type": ["null", {"name": "mapType", "type": "map", "values": "string"}]},
+  {"name": "datec","type": {"type": "int","logicalType": "date"}},
+  {"name": "timestampc","type": {"type": "long","logicalType": "timestamp-millis"}},
+  {"name": "decimalc", "type": {"type": "bytes", "logicalType": "decimal", "precision": 4, "scale": 2}},
   {"name": "enumc", "type": {"type": "enum", "name": "Suit", "symbols": ["SPADES","HEARTS","DIAMONDS","CLUBS"]}}
  ]
 }
\ No newline at end of file