You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2018/12/13 15:23:45 UTC
nifi git commit: NIFI-5891 fix handling of null logical types in Hive3Streaming processor
Repository: nifi
Updated Branches:
refs/heads/master 9a1ab4c50 -> c51512f5e
NIFI-5891 fix handling of null logical types in Hive3Streaming processor
NIFI-5891: Fixed Checkstyle issues
Signed-off-by: Matthew Burgess <ma...@apache.org>
This closes #3216
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/c51512f5
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/c51512f5
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/c51512f5
Branch: refs/heads/master
Commit: c51512f5e33cbd413b1fda8700408aa95614680e
Parents: 9a1ab4c
Author: gkkorir <gk...@Safaricom.co.ke>
Authored: Thu Dec 13 17:25:37 2018 +0300
Committer: Matthew Burgess <ma...@apache.org>
Committed: Thu Dec 13 10:23:18 2018 -0500
----------------------------------------------------------------------
.../apache/hive/streaming/NiFiRecordSerDe.java | 28 ++--
.../processors/hive/TestPutHive3Streaming.java | 140 +++++++++++++++++++
2 files changed, 159 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/c51512f5/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/NiFiRecordSerDe.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/NiFiRecordSerDe.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/NiFiRecordSerDe.java
index 932772e..e628474 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/NiFiRecordSerDe.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/NiFiRecordSerDe.java
@@ -223,27 +223,37 @@ public class NiFiRecordSerDe extends AbstractSerDe {
break;
case DATE:
Date d = record.getAsDate(fieldName, field.getDataType().getFormat());
- org.apache.hadoop.hive.common.type.Date hiveDate = new org.apache.hadoop.hive.common.type.Date();
- hiveDate.setTimeInMillis(d.getTime());
- val = hiveDate;
+ if(d != null) {
+ org.apache.hadoop.hive.common.type.Date hiveDate = new org.apache.hadoop.hive.common.type.Date();
+ hiveDate.setTimeInMillis(d.getTime());
+ val = hiveDate;
+ } else {
+ val = null;
+ }
break;
// ORC doesn't currently handle TIMESTAMPLOCALTZ
case TIMESTAMP:
Timestamp ts = DataTypeUtils.toTimestamp(record.getValue(fieldName), () -> DataTypeUtils.getDateFormat(field.getDataType().getFormat()), fieldName);
- // Convert to Hive's Timestamp type
- org.apache.hadoop.hive.common.type.Timestamp hivetimestamp = new org.apache.hadoop.hive.common.type.Timestamp();
- hivetimestamp.setTimeInMillis(ts.getTime(), ts.getNanos());
- val = hivetimestamp;
+ if(ts != null) {
+ // Convert to Hive's Timestamp type
+ org.apache.hadoop.hive.common.type.Timestamp hivetimestamp = new org.apache.hadoop.hive.common.type.Timestamp();
+ hivetimestamp.setTimeInMillis(ts.getTime(), ts.getNanos());
+ val = hivetimestamp;
+ } else {
+ val = null;
+ }
break;
case DECIMAL:
- val = HiveDecimal.create(record.getAsDouble(fieldName));
+ Double value = record.getAsDouble(fieldName);
+ val = value == null ? null : HiveDecimal.create(value);
break;
default:
throw new IllegalArgumentException("Field " + fieldName + " cannot be converted to type: " + primitiveCategory.name());
}
break;
case LIST:
- val = Arrays.asList(record.getAsArray(fieldName));
+ Object[] value = record.getAsArray(fieldName);
+ val = value == null ? null : Arrays.asList(value);
break;
case MAP:
val = record.getValue(fieldName);
http://git-wip-us.apache.org/repos/asf/nifi/blob/c51512f5/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHive3Streaming.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHive3Streaming.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHive3Streaming.java
index 5fd759f..ee05416 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHive3Streaming.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHive3Streaming.java
@@ -56,6 +56,7 @@ import org.apache.nifi.hadoop.SecurityUtil;
import org.apache.nifi.kerberos.KerberosCredentialsService;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.MockRecordParser;
@@ -752,6 +753,145 @@ public class TestPutHive3Streaming {
assertEquals("default.users", flowFile.getAttribute(ATTR_OUTPUT_TABLES));
}
+ //logical types
+
+ @Test
+ public void testNullDateHandling() throws IOException, MalformedRecordException, InitializationException {
+ String schemaText = "{ \"name\":\"test\", \"type\":\"record\", \"fields\":[ { \"name\":\"dob\", \"type\": [ \"null\", { \"type\":\"int\", \"logicalType\":\"date\" } ] } ] }";
+ schema = new Schema.Parser().parse(schemaText);
+ processor.setFields(Arrays.asList(
+ new FieldSchema("dob", serdeConstants.DATE_TYPE_NAME, "null dob")
+ ));
+ //setup runner
+ runner = TestRunners.newTestRunner(processor);
+ runner.setProperty(PutHive3Streaming.HIVE_CONFIGURATION_RESOURCES, TEST_CONF_PATH);
+ MockRecordParser readerFactory = new MockRecordParser();
+ final RecordSchema recordSchema = AvroTypeUtil.createSchema(schema);
+ for (final RecordField recordField : recordSchema.getFields()) {
+ readerFactory.addSchemaField(recordField.getFieldName(), recordField.getDataType().getFieldType(), recordField.isNullable());
+ }
+
+ readerFactory.addRecord(new Object[] { null });
+
+ runner.addControllerService("mock-reader-factory", readerFactory);
+ runner.enableControllerService(readerFactory);
+ runner.setProperty(PutHive3Streaming.RECORD_READER, "mock-reader-factory");
+
+ runner.setProperty(PutHive3Streaming.METASTORE_URI, "thrift://localhost:9083");
+ runner.setProperty(PutHive3Streaming.DB_NAME, "default");
+ runner.setProperty(PutHive3Streaming.TABLE_NAME, "dobs");
+ runner.enqueue(new byte[0]);
+ runner.run();
+
+ runner.assertTransferCount(PutHive3Streaming.REL_SUCCESS, 1);
+ final MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutHive3Streaming.REL_SUCCESS).get(0);
+ assertEquals("1", flowFile.getAttribute(HIVE_STREAMING_RECORD_COUNT_ATTR));
+ assertEquals("default.dobs", flowFile.getAttribute(ATTR_OUTPUT_TABLES));
+ }
+
+ @Test
+ public void testNullTimestampHandling() throws IOException, MalformedRecordException, InitializationException {
+ String schemaText = "{ \"name\":\"test\", \"type\":\"record\", \"fields\":[ { \"name\":\"dob\", \"type\": [ \"null\", { \"type\":\"long\", \"logicalType\":\"timestamp-millis\" } ] } ] }";
+ schema = new Schema.Parser().parse(schemaText);
+ processor.setFields(Arrays.asList(
+ new FieldSchema("dob", serdeConstants.TIMESTAMP_TYPE_NAME, "null dob")
+ ));
+ //setup runner
+ runner = TestRunners.newTestRunner(processor);
+ runner.setProperty(PutHive3Streaming.HIVE_CONFIGURATION_RESOURCES, TEST_CONF_PATH);
+ MockRecordParser readerFactory = new MockRecordParser();
+ final RecordSchema recordSchema = AvroTypeUtil.createSchema(schema);
+ for (final RecordField recordField : recordSchema.getFields()) {
+ readerFactory.addSchemaField(recordField.getFieldName(), recordField.getDataType().getFieldType(), recordField.isNullable());
+ }
+
+ readerFactory.addRecord(new Object[] { null });
+
+ runner.addControllerService("mock-reader-factory", readerFactory);
+ runner.enableControllerService(readerFactory);
+ runner.setProperty(PutHive3Streaming.RECORD_READER, "mock-reader-factory");
+
+ runner.setProperty(PutHive3Streaming.METASTORE_URI, "thrift://localhost:9083");
+ runner.setProperty(PutHive3Streaming.DB_NAME, "default");
+ runner.setProperty(PutHive3Streaming.TABLE_NAME, "ts");
+ runner.enqueue(new byte[0]);
+ runner.run();
+
+ runner.assertTransferCount(PutHive3Streaming.REL_SUCCESS, 1);
+ final MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutHive3Streaming.REL_SUCCESS).get(0);
+ assertEquals("1", flowFile.getAttribute(HIVE_STREAMING_RECORD_COUNT_ATTR));
+ assertEquals("default.ts", flowFile.getAttribute(ATTR_OUTPUT_TABLES));
+ }
+
+ @Test
+ public void testNullDecimalHandling() throws IOException, MalformedRecordException, InitializationException {
+ String schemaText = "{ \"name\":\"test\", \"type\":\"record\", \"fields\":[ { \"name\":\"amount\", \"type\": [ \"null\", { \"type\":\"bytes\", "
+ + "\"logicalType\":\"decimal\", \"precision\":18, \"scale\":2 } ] } ] }";
+ schema = new Schema.Parser().parse(schemaText);
+ processor.setFields(Arrays.asList(
+ new FieldSchema("amount", serdeConstants.DECIMAL_TYPE_NAME, "null amount")
+ ));
+ //setup runner
+ runner = TestRunners.newTestRunner(processor);
+ runner.setProperty(PutHive3Streaming.HIVE_CONFIGURATION_RESOURCES, TEST_CONF_PATH);
+ MockRecordParser readerFactory = new MockRecordParser();
+ final RecordSchema recordSchema = AvroTypeUtil.createSchema(schema);
+ for (final RecordField recordField : recordSchema.getFields()) {
+ readerFactory.addSchemaField(recordField.getFieldName(), recordField.getDataType().getFieldType(), recordField.isNullable());
+ }
+
+ readerFactory.addRecord(new Object[] { null });
+
+ runner.addControllerService("mock-reader-factory", readerFactory);
+ runner.enableControllerService(readerFactory);
+ runner.setProperty(PutHive3Streaming.RECORD_READER, "mock-reader-factory");
+
+ runner.setProperty(PutHive3Streaming.METASTORE_URI, "thrift://localhost:9083");
+ runner.setProperty(PutHive3Streaming.DB_NAME, "default");
+ runner.setProperty(PutHive3Streaming.TABLE_NAME, "transactions");
+ runner.enqueue(new byte[0]);
+ runner.run();
+
+ runner.assertTransferCount(PutHive3Streaming.REL_SUCCESS, 1);
+ final MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutHive3Streaming.REL_SUCCESS).get(0);
+ assertEquals("1", flowFile.getAttribute(HIVE_STREAMING_RECORD_COUNT_ATTR));
+ assertEquals("default.transactions", flowFile.getAttribute(ATTR_OUTPUT_TABLES));
+ }
+
+ @Test
+ public void testNullArrayHandling() throws IOException, MalformedRecordException, InitializationException {
+ String schemaText = "{ \"name\":\"test\", \"type\":\"record\", \"fields\":[ { \"name\":\"groups\", \"type\": [ \"null\", { \"type\":\"array\", \"items\":\"string\" } ] } ] }";
+ schema = new Schema.Parser().parse(schemaText);
+ processor.setFields(Arrays.asList(
+ new FieldSchema("groups", "array<string>", "null groups")
+ ));
+ //setup runner
+ runner = TestRunners.newTestRunner(processor);
+ runner.setProperty(PutHive3Streaming.HIVE_CONFIGURATION_RESOURCES, TEST_CONF_PATH);
+ MockRecordParser readerFactory = new MockRecordParser();
+ final RecordSchema recordSchema = AvroTypeUtil.createSchema(schema);
+ for (final RecordField recordField : recordSchema.getFields()) {
+ readerFactory.addSchemaField(recordField.getFieldName(), recordField.getDataType().getFieldType(), recordField.isNullable());
+ }
+
+ readerFactory.addRecord(new Object[] { null });
+
+ runner.addControllerService("mock-reader-factory", readerFactory);
+ runner.enableControllerService(readerFactory);
+ runner.setProperty(PutHive3Streaming.RECORD_READER, "mock-reader-factory");
+
+ runner.setProperty(PutHive3Streaming.METASTORE_URI, "thrift://localhost:9083");
+ runner.setProperty(PutHive3Streaming.DB_NAME, "default");
+ runner.setProperty(PutHive3Streaming.TABLE_NAME, "groups");
+ runner.enqueue(new byte[0]);
+ runner.run();
+
+ runner.assertTransferCount(PutHive3Streaming.REL_SUCCESS, 1);
+ final MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutHive3Streaming.REL_SUCCESS).get(0);
+ assertEquals("1", flowFile.getAttribute(HIVE_STREAMING_RECORD_COUNT_ATTR));
+ assertEquals("default.groups", flowFile.getAttribute(ATTR_OUTPUT_TABLES));
+ }
+
@Test
public void cleanup() {
processor.cleanup();