Posted to commits@nifi.apache.org by ma...@apache.org on 2018/12/13 15:23:45 UTC

nifi git commit: NIFI-5891 fix handling of null logical types in Hive3Streaming processor

Repository: nifi
Updated Branches:
  refs/heads/master 9a1ab4c50 -> c51512f5e


NIFI-5891 fix handling of null logical types in Hive3Streaming processor

NIFI-5891: Fixed Checkstyle issues
Signed-off-by: Matthew Burgess <ma...@apache.org>

This closes #3216
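
In short: the DATE, TIMESTAMP, DECIMAL, and LIST branches of NiFiRecordSerDe converted the incoming record value unconditionally, so a nullable Avro logical-type field carrying null blew up inside the serde before the row ever reached Hive. A minimal sketch of the failure mode (simplified from the old DATE branch in the diff below; fieldName, field, record, and val are as in NiFiRecordSerDe):

    // Old DATE branch (sketch): record.getAsDate(...) returns null for a
    // null field value, so the unconditional conversion dereferences null.
    Date d = record.getAsDate(fieldName, field.getDataType().getFormat());
    org.apache.hadoop.hive.common.type.Date hiveDate = new org.apache.hadoop.hive.common.type.Date();
    hiveDate.setTimeInMillis(d.getTime()); // NullPointerException here when d == null
    val = hiveDate;

The fix guards each conversion and passes null through instead; presumably the null then reaches Hive as a SQL NULL for the column (the new tests below only assert that such records are accepted, not the stored value).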


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/c51512f5
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/c51512f5
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/c51512f5

Branch: refs/heads/master
Commit: c51512f5e33cbd413b1fda8700408aa95614680e
Parents: 9a1ab4c
Author: gkkorir <gk...@Safaricom.co.ke>
Authored: Thu Dec 13 17:25:37 2018 +0300
Committer: Matthew Burgess <ma...@apache.org>
Committed: Thu Dec 13 10:23:18 2018 -0500

----------------------------------------------------------------------
 .../apache/hive/streaming/NiFiRecordSerDe.java  |  28 ++--
 .../processors/hive/TestPutHive3Streaming.java  | 140 +++++++++++++++++++
 2 files changed, 159 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/c51512f5/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/NiFiRecordSerDe.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/NiFiRecordSerDe.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/NiFiRecordSerDe.java
index 932772e..e628474 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/NiFiRecordSerDe.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/NiFiRecordSerDe.java
@@ -223,27 +223,37 @@ public class NiFiRecordSerDe extends AbstractSerDe {
                         break;
                     case DATE:
                         Date d = record.getAsDate(fieldName, field.getDataType().getFormat());
-                        org.apache.hadoop.hive.common.type.Date hiveDate = new org.apache.hadoop.hive.common.type.Date();
-                        hiveDate.setTimeInMillis(d.getTime());
-                        val = hiveDate;
+                        if(d != null) {
+                            org.apache.hadoop.hive.common.type.Date hiveDate = new org.apache.hadoop.hive.common.type.Date();
+                            hiveDate.setTimeInMillis(d.getTime());
+                            val = hiveDate;
+                        } else {
+                            val = null;
+                        }
                         break;
                     // ORC doesn't currently handle TIMESTAMPLOCALTZ
                     case TIMESTAMP:
                         Timestamp ts = DataTypeUtils.toTimestamp(record.getValue(fieldName), () -> DataTypeUtils.getDateFormat(field.getDataType().getFormat()), fieldName);
-                        // Convert to Hive's Timestamp type
-                        org.apache.hadoop.hive.common.type.Timestamp hivetimestamp = new org.apache.hadoop.hive.common.type.Timestamp();
-                        hivetimestamp.setTimeInMillis(ts.getTime(), ts.getNanos());
-                        val = hivetimestamp;
+                        if(ts != null) {
+                            // Convert to Hive's Timestamp type
+                            org.apache.hadoop.hive.common.type.Timestamp hivetimestamp = new org.apache.hadoop.hive.common.type.Timestamp();
+                            hivetimestamp.setTimeInMillis(ts.getTime(), ts.getNanos());
+                            val = hivetimestamp;
+                        } else {
+                            val = null;
+                        }
                         break;
                     case DECIMAL:
-                        val = HiveDecimal.create(record.getAsDouble(fieldName));
+                        Double value = record.getAsDouble(fieldName);
+                        val = value == null ? null : HiveDecimal.create(value);
                         break;
                     default:
                         throw new IllegalArgumentException("Field " + fieldName + " cannot be converted to type: " + primitiveCategory.name());
                 }
                 break;
             case LIST:
-                val = Arrays.asList(record.getAsArray(fieldName));
+                Object[] value = record.getAsArray(fieldName);
+                val = value == null ? null : Arrays.asList(value);
                 break;
             case MAP:
                 val = record.getValue(fieldName);

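The same guard-then-convert shape now repeats across all four branches. For reference only, the repetition could be collapsed with a small null-propagating helper; convertIfPresent below is hypothetical and not part of this commit:

    // Hypothetical helper, not in the commit: apply the conversion only when
    // the source value is non-null, otherwise propagate null unchanged.
    private static <S, T> T convertIfPresent(S source, java.util.function.Function<S, T> converter) {
        return source == null ? null : converter.apply(source);
    }

    // Usage sketch for the DECIMAL and LIST branches above:
    val = convertIfPresent(record.getAsDouble(fieldName), HiveDecimal::create);
    val = convertIfPresent(record.getAsArray(fieldName), Arrays::asList);
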
http://git-wip-us.apache.org/repos/asf/nifi/blob/c51512f5/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHive3Streaming.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHive3Streaming.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHive3Streaming.java
index 5fd759f..ee05416 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHive3Streaming.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHive3Streaming.java
@@ -56,6 +56,7 @@ import org.apache.nifi.hadoop.SecurityUtil;
 import org.apache.nifi.kerberos.KerberosCredentialsService;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.MalformedRecordException;
 import org.apache.nifi.serialization.RecordReader;
 import org.apache.nifi.serialization.record.MapRecord;
 import org.apache.nifi.serialization.record.MockRecordParser;
@@ -752,6 +753,145 @@ public class TestPutHive3Streaming {
         assertEquals("default.users", flowFile.getAttribute(ATTR_OUTPUT_TABLES));
     }
 
+    //logical types
+
+    @Test
+    public void testNullDateHandling() throws IOException, MalformedRecordException, InitializationException {
+        String schemaText = "{ \"name\":\"test\", \"type\":\"record\", \"fields\":[ { \"name\":\"dob\", \"type\": [ \"null\", { \"type\":\"int\", \"logicalType\":\"date\"  }  ] } ] }";
+        schema = new Schema.Parser().parse(schemaText);
+        processor.setFields(Arrays.asList(
+                new FieldSchema("dob", serdeConstants.DATE_TYPE_NAME, "null dob")
+        ));
+        //setup runner
+        runner = TestRunners.newTestRunner(processor);
+        runner.setProperty(PutHive3Streaming.HIVE_CONFIGURATION_RESOURCES, TEST_CONF_PATH);
+        MockRecordParser readerFactory = new MockRecordParser();
+        final RecordSchema recordSchema = AvroTypeUtil.createSchema(schema);
+        for (final RecordField recordField : recordSchema.getFields()) {
+            readerFactory.addSchemaField(recordField.getFieldName(), recordField.getDataType().getFieldType(), recordField.isNullable());
+        }
+
+        readerFactory.addRecord(new Object[] { null });
+
+        runner.addControllerService("mock-reader-factory", readerFactory);
+        runner.enableControllerService(readerFactory);
+        runner.setProperty(PutHive3Streaming.RECORD_READER, "mock-reader-factory");
+
+        runner.setProperty(PutHive3Streaming.METASTORE_URI, "thrift://localhost:9083");
+        runner.setProperty(PutHive3Streaming.DB_NAME, "default");
+        runner.setProperty(PutHive3Streaming.TABLE_NAME, "dobs");
+        runner.enqueue(new byte[0]);
+        runner.run();
+
+        runner.assertTransferCount(PutHive3Streaming.REL_SUCCESS, 1);
+        final MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutHive3Streaming.REL_SUCCESS).get(0);
+        assertEquals("1", flowFile.getAttribute(HIVE_STREAMING_RECORD_COUNT_ATTR));
+        assertEquals("default.dobs", flowFile.getAttribute(ATTR_OUTPUT_TABLES));
+    }
+
+    @Test
+    public void testNullTimestampHandling() throws IOException, MalformedRecordException, InitializationException {
+        String schemaText = "{ \"name\":\"test\", \"type\":\"record\", \"fields\":[ { \"name\":\"dob\", \"type\": [ \"null\", { \"type\":\"long\", \"logicalType\":\"timestamp-millis\"  }  ] } ] }";
+        schema = new Schema.Parser().parse(schemaText);
+        processor.setFields(Arrays.asList(
+                new FieldSchema("dob", serdeConstants.TIMESTAMP_TYPE_NAME, "null dob")
+        ));
+        //setup runner
+        runner = TestRunners.newTestRunner(processor);
+        runner.setProperty(PutHive3Streaming.HIVE_CONFIGURATION_RESOURCES, TEST_CONF_PATH);
+        MockRecordParser readerFactory = new MockRecordParser();
+        final RecordSchema recordSchema = AvroTypeUtil.createSchema(schema);
+        for (final RecordField recordField : recordSchema.getFields()) {
+            readerFactory.addSchemaField(recordField.getFieldName(), recordField.getDataType().getFieldType(), recordField.isNullable());
+        }
+
+        readerFactory.addRecord(new Object[] { null });
+
+        runner.addControllerService("mock-reader-factory", readerFactory);
+        runner.enableControllerService(readerFactory);
+        runner.setProperty(PutHive3Streaming.RECORD_READER, "mock-reader-factory");
+
+        runner.setProperty(PutHive3Streaming.METASTORE_URI, "thrift://localhost:9083");
+        runner.setProperty(PutHive3Streaming.DB_NAME, "default");
+        runner.setProperty(PutHive3Streaming.TABLE_NAME, "ts");
+        runner.enqueue(new byte[0]);
+        runner.run();
+
+        runner.assertTransferCount(PutHive3Streaming.REL_SUCCESS, 1);
+        final MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutHive3Streaming.REL_SUCCESS).get(0);
+        assertEquals("1", flowFile.getAttribute(HIVE_STREAMING_RECORD_COUNT_ATTR));
+        assertEquals("default.ts", flowFile.getAttribute(ATTR_OUTPUT_TABLES));
+    }
+
+    @Test
+    public void testNullDecimalHandling() throws IOException, MalformedRecordException, InitializationException {
+        String schemaText = "{ \"name\":\"test\", \"type\":\"record\", \"fields\":[ { \"name\":\"amount\", \"type\": [ \"null\", { \"type\":\"bytes\", "
+            + "\"logicalType\":\"decimal\", \"precision\":18, \"scale\":2  }  ] } ] }";
+        schema = new Schema.Parser().parse(schemaText);
+        processor.setFields(Arrays.asList(
+                new FieldSchema("amount", serdeConstants.DECIMAL_TYPE_NAME, "null amount")
+        ));
+        //setup runner
+        runner = TestRunners.newTestRunner(processor);
+        runner.setProperty(PutHive3Streaming.HIVE_CONFIGURATION_RESOURCES, TEST_CONF_PATH);
+        MockRecordParser readerFactory = new MockRecordParser();
+        final RecordSchema recordSchema = AvroTypeUtil.createSchema(schema);
+        for (final RecordField recordField : recordSchema.getFields()) {
+            readerFactory.addSchemaField(recordField.getFieldName(), recordField.getDataType().getFieldType(), recordField.isNullable());
+        }
+
+        readerFactory.addRecord(new Object[] { null });
+
+        runner.addControllerService("mock-reader-factory", readerFactory);
+        runner.enableControllerService(readerFactory);
+        runner.setProperty(PutHive3Streaming.RECORD_READER, "mock-reader-factory");
+
+        runner.setProperty(PutHive3Streaming.METASTORE_URI, "thrift://localhost:9083");
+        runner.setProperty(PutHive3Streaming.DB_NAME, "default");
+        runner.setProperty(PutHive3Streaming.TABLE_NAME, "transactions");
+        runner.enqueue(new byte[0]);
+        runner.run();
+
+        runner.assertTransferCount(PutHive3Streaming.REL_SUCCESS, 1);
+        final MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutHive3Streaming.REL_SUCCESS).get(0);
+        assertEquals("1", flowFile.getAttribute(HIVE_STREAMING_RECORD_COUNT_ATTR));
+        assertEquals("default.transactions", flowFile.getAttribute(ATTR_OUTPUT_TABLES));
+    }
+
+    @Test
+    public void testNullArrayHandling() throws IOException, MalformedRecordException, InitializationException {
+        String schemaText = "{ \"name\":\"test\", \"type\":\"record\", \"fields\":[ { \"name\":\"groups\", \"type\": [ \"null\", { \"type\":\"array\", \"items\":\"string\" }  ] } ] }";
+        schema = new Schema.Parser().parse(schemaText);
+        processor.setFields(Arrays.asList(
+                new FieldSchema("groups", "array<string>", "null groups")
+        ));
+        //setup runner
+        runner = TestRunners.newTestRunner(processor);
+        runner.setProperty(PutHive3Streaming.HIVE_CONFIGURATION_RESOURCES, TEST_CONF_PATH);
+        MockRecordParser readerFactory = new MockRecordParser();
+        final RecordSchema recordSchema = AvroTypeUtil.createSchema(schema);
+        for (final RecordField recordField : recordSchema.getFields()) {
+            readerFactory.addSchemaField(recordField.getFieldName(), recordField.getDataType().getFieldType(), recordField.isNullable());
+        }
+
+        readerFactory.addRecord(new Object[] { null });
+
+        runner.addControllerService("mock-reader-factory", readerFactory);
+        runner.enableControllerService(readerFactory);
+        runner.setProperty(PutHive3Streaming.RECORD_READER, "mock-reader-factory");
+
+        runner.setProperty(PutHive3Streaming.METASTORE_URI, "thrift://localhost:9083");
+        runner.setProperty(PutHive3Streaming.DB_NAME, "default");
+        runner.setProperty(PutHive3Streaming.TABLE_NAME, "groups");
+        runner.enqueue(new byte[0]);
+        runner.run();
+
+        runner.assertTransferCount(PutHive3Streaming.REL_SUCCESS, 1);
+        final MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutHive3Streaming.REL_SUCCESS).get(0);
+        assertEquals("1", flowFile.getAttribute(HIVE_STREAMING_RECORD_COUNT_ATTR));
+        assertEquals("default.groups", flowFile.getAttribute(ATTR_OUTPUT_TABLES));
+    }
+
     @Test
     public void cleanup() {
         processor.cleanup();
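
Each of the four new tests follows the same scaffold: parse a nullable single-field Avro schema, feed one record whose only value is null through a MockRecordParser, run the processor, and assert the flow file lands on REL_SUCCESS with a record count of 1. A sketch of how that scaffold could be factored into a shared helper (hypothetical, not part of the commit; it assumes the test class's existing processor, runner, and schema fields):

    // Hypothetical helper, not in the commit: shared scaffold for the
    // null-value tests above.
    private void assertNullValueRoutesToSuccess(String schemaText, FieldSchema hiveField, String tableName)
            throws IOException, MalformedRecordException, InitializationException {
        schema = new Schema.Parser().parse(schemaText);
        processor.setFields(Arrays.asList(hiveField));
        runner = TestRunners.newTestRunner(processor);
        runner.setProperty(PutHive3Streaming.HIVE_CONFIGURATION_RESOURCES, TEST_CONF_PATH);
        MockRecordParser readerFactory = new MockRecordParser();
        final RecordSchema recordSchema = AvroTypeUtil.createSchema(schema);
        for (final RecordField recordField : recordSchema.getFields()) {
            readerFactory.addSchemaField(recordField.getFieldName(), recordField.getDataType().getFieldType(), recordField.isNullable());
        }
        readerFactory.addRecord(new Object[] { null }); // the null value under test

        runner.addControllerService("mock-reader-factory", readerFactory);
        runner.enableControllerService(readerFactory);
        runner.setProperty(PutHive3Streaming.RECORD_READER, "mock-reader-factory");
        runner.setProperty(PutHive3Streaming.METASTORE_URI, "thrift://localhost:9083");
        runner.setProperty(PutHive3Streaming.DB_NAME, "default");
        runner.setProperty(PutHive3Streaming.TABLE_NAME, tableName);
        runner.enqueue(new byte[0]);
        runner.run();

        runner.assertTransferCount(PutHive3Streaming.REL_SUCCESS, 1);
        final MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutHive3Streaming.REL_SUCCESS).get(0);
        assertEquals("1", flowFile.getAttribute(HIVE_STREAMING_RECORD_COUNT_ATTR));
        assertEquals("default." + tableName, flowFile.getAttribute(ATTR_OUTPUT_TABLES));
    }

    // Usage sketch, e.g. for the date case:
    // assertNullValueRoutesToSuccess(dateSchemaText,
    //         new FieldSchema("dob", serdeConstants.DATE_TYPE_NAME, "null dob"), "dobs");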