You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2019/02/14 20:09:34 UTC
[nifi] branch master updated: NIFI-6000 Catch also
IllegalArgumentException in ConvertAvroToORC hive processor. Added support
for Avro null types.
This is an automated email from the ASF dual-hosted git repository.
mattyb149 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new e598b30 NIFI-6000 Catch also IllegalArgumentException in ConvertAvroToORC hive processor. Added support for Avro null types.
e598b30 is described below
commit e598b30d6dc85111762f9618f1b7cfefc68e612b
Author: Aleksandr Salatich <sa...@gmail.com>
AuthorDate: Tue Feb 12 18:28:48 2019 +0300
NIFI-6000 Catch also IllegalArgumentException in ConvertAvroToORC hive processor. Added support for Avro null types.
Signed-off-by: Matthew Burgess <ma...@apache.org>
This closes #3302
---
.../apache/hadoop/hive/ql/io/orc/NiFiOrcUtils.java | 3 +
.../nifi/processors/hive/ConvertAvroToORC.java | 4 +-
.../nifi/processors/hive/TestConvertAvroToORC.java | 95 ++++++++++++++++++++++
.../org/apache/nifi/util/orc/TestNiFiOrcUtils.java | 50 +++++++++++-
4 files changed, 149 insertions(+), 3 deletions(-)
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/hadoop/hive/ql/io/orc/NiFiOrcUtils.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/hadoop/hive/ql/io/orc/NiFiOrcUtils.java
index 687073e..ce06f82 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/hadoop/hive/ql/io/orc/NiFiOrcUtils.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/hadoop/hive/ql/io/orc/NiFiOrcUtils.java
@@ -244,6 +244,7 @@ public class NiFiOrcUtils {
case DOUBLE:
case FLOAT:
case STRING:
+ case NULL:
return getPrimitiveOrcTypeFromPrimitiveAvroType(fieldType);
case UNION:
@@ -335,6 +336,7 @@ public class NiFiOrcUtils {
case LONG:
return TypeInfoFactory.getPrimitiveTypeInfo("bigint");
case BOOLEAN:
+ case NULL: // ORC has no null type, so just pick the smallest. All values are necessarily null.
return TypeInfoFactory.getPrimitiveTypeInfo("boolean");
case BYTES:
return TypeInfoFactory.getPrimitiveTypeInfo("binary");
@@ -362,6 +364,7 @@ public class NiFiOrcUtils {
case LONG:
return "BIGINT";
case BOOLEAN:
+ case NULL: // Hive has no null type, we picked boolean as the ORC type so use it for Hive DDL too. All values are necessarily null.
return "BOOLEAN";
case BYTES:
return "BINARY";
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/ConvertAvroToORC.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/ConvertAvroToORC.java
index f211ac5..e8ee2a2 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/ConvertAvroToORC.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/ConvertAvroToORC.java
@@ -283,8 +283,8 @@ public class ConvertAvroToORC extends AbstractProcessor {
session.transfer(flowFile, REL_SUCCESS);
session.getProvenanceReporter().modifyContent(flowFile, "Converted "+totalRecordCount.get()+" records", System.currentTimeMillis() - startTime);
- } catch (final ProcessException pe) {
- getLogger().error("Failed to convert {} from Avro to ORC due to {}; transferring to failure", new Object[]{flowFile, pe});
+ } catch (ProcessException | IllegalArgumentException e) {
+ getLogger().error("Failed to convert {} from Avro to ORC due to {}; transferring to failure", new Object[]{flowFile, e});
session.transfer(flowFile, REL_FAILURE);
}
}
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestConvertAvroToORC.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestConvertAvroToORC.java
index 282b42d..f34a647 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestConvertAvroToORC.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestConvertAvroToORC.java
@@ -17,10 +17,13 @@
package org.apache.nifi.processors.hive;
import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileStream;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -43,7 +46,9 @@ import org.apache.nifi.util.orc.TestNiFiOrcUtils;
import org.junit.Before;
import org.junit.Test;
+import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.nio.charset.StandardCharsets;
@@ -55,6 +60,7 @@ import java.util.List;
import java.util.Map;
import java.util.TreeMap;
+import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
@@ -80,6 +86,95 @@ public class TestConvertAvroToORC {
}
@Test
+ public void test_onTrigger_null_type() throws Exception {
+ // A field of Avro type "null" has no ORC/Hive equivalent; NiFiOrcUtils maps it to BOOLEAN,
+ // so the conversion must succeed (not route to failure) and the DDL must declare that column as BOOLEAN.
+ String testString = "Hello World";
+ GenericData.Record record = TestNiFiOrcUtils.buildAvroRecordWithNull(testString);
+
+ DatumWriter<GenericData.Record> writer = new GenericDatumWriter<>(record.getSchema());
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ // close() flushes the Avro container, so no explicit flush() is needed.
+ try (DataFileWriter<GenericData.Record> fileWriter = new DataFileWriter<>(writer)) {
+ fileWriter.create(record.getSchema(), out);
+ fileWriter.append(record);
+ }
+
+ Map<String, String> attributes = new HashMap<>();
+ attributes.put(CoreAttributes.FILENAME.key(), "test.avro");
+ runner.enqueue(out.toByteArray(), attributes);
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(ConvertAvroToORC.REL_SUCCESS, 1);
+ MockFlowFile resultFlowFile = runner.getFlowFilesForRelationship(ConvertAvroToORC.REL_SUCCESS).get(0);
+ assertEquals("test.orc", resultFlowFile.getAttribute(CoreAttributes.FILENAME.key()));
+ assertEquals("CREATE EXTERNAL TABLE IF NOT EXISTS test_record (string STRING, null BOOLEAN) STORED AS ORC",
+ resultFlowFile.getAttribute(ConvertAvroToORC.HIVE_DDL_ATTRIBUTE));
+ }
+
+ @Test
+ public void test_onTrigger_empty_array_type() throws Exception {
+ // An array whose item type is Avro "null" maps to ARRAY<BOOLEAN> (Hive/ORC have no null type);
+ // an empty value of that array must convert successfully.
+ String testString = "Hello World";
+ GenericData.Record record = TestNiFiOrcUtils.buildAvroRecordWithEmptyArray(testString);
+
+ DatumWriter<GenericData.Record> writer = new GenericDatumWriter<>(record.getSchema());
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ // close() flushes the Avro container, so no explicit flush() is needed.
+ try (DataFileWriter<GenericData.Record> fileWriter = new DataFileWriter<>(writer)) {
+ fileWriter.create(record.getSchema(), out);
+ fileWriter.append(record);
+ }
+
+ Map<String, String> attributes = new HashMap<>();
+ attributes.put(CoreAttributes.FILENAME.key(), "test.avro");
+ runner.enqueue(out.toByteArray(), attributes);
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(ConvertAvroToORC.REL_SUCCESS, 1);
+ MockFlowFile resultFlowFile = runner.getFlowFilesForRelationship(ConvertAvroToORC.REL_SUCCESS).get(0);
+ assertEquals("test.orc", resultFlowFile.getAttribute(CoreAttributes.FILENAME.key()));
+ assertEquals("CREATE EXTERNAL TABLE IF NOT EXISTS test_record (string STRING, emptyArray ARRAY<BOOLEAN>) STORED AS ORC",
+ resultFlowFile.getAttribute(ConvertAvroToORC.HIVE_DDL_ATTRIBUTE));
+ }
+
+ @Test
+ public void test_onTrigger_routing_to_failure_fixed_type() throws Exception {
+ String testString = "Hello!";
+ GenericData.Record record = TestNiFiOrcUtils.buildAvroRecordWithFixed(testString);
+
+ DatumWriter<GenericData.Record> writer = new GenericDatumWriter<>(record.getSchema());
+ DataFileWriter<GenericData.Record> fileWriter = new DataFileWriter<>(writer);
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ fileWriter.create(record.getSchema(), out);
+ fileWriter.append(record);
+ fileWriter.flush();
+ fileWriter.close();
+ out.close();
+
+ Map<String, String> attributes = new HashMap<String, String>() {{
+ put(CoreAttributes.FILENAME.key(), "test.avro");
+ }};
+ runner.enqueue(out.toByteArray(), attributes);
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(ConvertAvroToORC.REL_FAILURE, 1);
+ MockFlowFile resultFlowFile = runner.getFlowFilesForRelationship(ConvertAvroToORC.REL_FAILURE).get(0);
+ assertEquals("test.avro", resultFlowFile.getAttribute(CoreAttributes.FILENAME.key()));
+
+ final InputStream in = new ByteArrayInputStream(resultFlowFile.toByteArray());
+ final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
+ try (DataFileStream<GenericRecord> dataFileReader = new DataFileStream<>(in, datumReader)) {
+ assertTrue(dataFileReader.hasNext());
+ GenericRecord testedRecord = dataFileReader.next();
+
+ assertNotNull(testedRecord.get("fixed"));
+ assertArrayEquals(testString.getBytes(StandardCharsets.UTF_8), ((GenericData.Fixed) testedRecord.get("fixed")).bytes());
+ }
+ }
+
+ @Test
public void test_onTrigger_primitive_record() throws Exception {
GenericData.Record record = TestNiFiOrcUtils.buildPrimitiveAvroRecord(10, 20L, true, 30.0f, 40, StandardCharsets.UTF_8.encode("Hello"), "World");
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/util/orc/TestNiFiOrcUtils.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/util/orc/TestNiFiOrcUtils.java
index 47ee3a5..cd7847f 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/util/orc/TestNiFiOrcUtils.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/util/orc/TestNiFiOrcUtils.java
@@ -34,7 +34,9 @@ import org.apache.hadoop.io.Text;
import org.junit.Test;
import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -271,7 +273,7 @@ public class TestNiFiOrcUtils {
@Test
public void test_convertToORCObject() {
- Schema schema = SchemaBuilder.enumeration("myEnum").symbols("x","y","z");
+ Schema schema = SchemaBuilder.enumeration("myEnum").symbols("x", "y", "z");
List<Object> objects = Arrays.asList(new Utf8("Hello"), new GenericData.EnumSymbol(schema, "x"));
objects.forEach((avroObject) -> {
Object o = NiFiOrcUtils.convertToORCObject(TypeInfoUtils.getTypeInfoFromTypeString("uniontype<bigint,string>"), avroObject);
@@ -304,6 +306,29 @@ public class TestNiFiOrcUtils {
return builder.endRecord();
}
+ /**
+ * Builds a record schema named "test.record" with a "string" field (string, default "default")
+ * and a field literally named "null" whose type is the Avro null type (no default).
+ *
+ * @return the record schema
+ */
+ public static Schema buildAvroSchemaWithNull() {
+ // Build a fake Avro record which contains null
+ final SchemaBuilder.FieldAssembler<Schema> builder = SchemaBuilder.record("test.record").namespace("any.data").fields();
+ builder.name("string").type().stringType().stringDefault("default");
+ builder.name("null").type().nullType().noDefault();
+ return builder.endRecord();
+ }
+
+ /**
+ * Builds a record schema named "test.record" with a "string" field (string, default "default")
+ * and an "emptyArray" field: an array whose item type is the Avro null type (no default).
+ *
+ * @return the record schema
+ */
+ public static Schema buildAvroSchemaWithEmptyArray() {
+ // Build a fake Avro record which contains empty array
+ final SchemaBuilder.FieldAssembler<Schema> builder = SchemaBuilder.record("test.record").namespace("any.data").fields();
+ builder.name("string").type().stringType().stringDefault("default");
+ // Null item type: there is no ORC/Hive null type, so the converter has to pick a stand-in for it.
+ builder.name("emptyArray").type().array().items().nullType().noDefault();
+ return builder.endRecord();
+ }
+
+ /**
+ * Builds a record schema named "test.record" with a single "fixed" field of Avro fixed type
+ * "fixedField", 6 bytes long, defaulting to "123456".
+ *
+ * @return the record schema
+ */
+ public static Schema buildAvroSchemaWithFixed() {
+ // Build a fake Avro record which contains a fixed-size (6-byte) field
+ final SchemaBuilder.FieldAssembler<Schema> builder = SchemaBuilder.record("test.record").namespace("any.data").fields();
+ builder.name("fixed").type().fixed("fixedField").size(6).fixedDefault("123456");
+ return builder.endRecord();
+ }
+
public static GenericData.Record buildPrimitiveAvroRecord(int i, long l, boolean b, float f, double d, ByteBuffer bytes, String string) {
Schema schema = buildPrimitiveAvroSchema();
GenericData.Record row = new GenericData.Record(schema);
@@ -351,6 +376,29 @@ public class TestNiFiOrcUtils {
return row;
}
+ /**
+ * Builds a record conforming to {@link #buildAvroSchemaWithNull()}.
+ *
+ * @param string value for the "string" field
+ * @return the populated record; its "null" field holds {@code null}
+ */
+ public static GenericData.Record buildAvroRecordWithNull(String string) {
+ Schema schema = buildAvroSchemaWithNull();
+ GenericData.Record row = new GenericData.Record(schema);
+ row.put("string", string);
+ // The only value an Avro null-typed field can hold.
+ row.put("null", null);
+ return row;
+ }
+
+ /**
+ * Builds a record conforming to {@link #buildAvroSchemaWithEmptyArray()}.
+ *
+ * @param string value for the "string" field
+ * @return the populated record; its "emptyArray" field is an empty list
+ */
+ public static GenericData.Record buildAvroRecordWithEmptyArray(String string) {
+ Schema schema = buildAvroSchemaWithEmptyArray();
+ GenericData.Record row = new GenericData.Record(schema);
+ row.put("string", string);
+ row.put("emptyArray", Collections.emptyList());
+ return row;
+ }
+
+ /**
+ * Builds a record conforming to {@link #buildAvroSchemaWithFixed()}.
+ *
+ * @param string value for the "fixed" field; its UTF-8 encoding must be exactly 6 bytes (the fixed size)
+ * @return the populated record
+ */
+ public static GenericData.Record buildAvroRecordWithFixed(String string) {
+ Schema schema = buildAvroSchemaWithFixed();
+ GenericData.Record row = new GenericData.Record(schema);
+ // A Fixed datum must carry the field's fixed schema, not the enclosing record schema.
+ Schema fixedSchema = schema.getField("fixed").schema();
+ row.put("fixed", new GenericData.Fixed(fixedSchema, string.getBytes(StandardCharsets.UTF_8)));
+ return row;
+ }
+
public static TypeInfo buildComplexOrcSchema() {
return TypeInfoUtils.getTypeInfoFromTypeString("struct<myInt:int,myMap:map<string,double>,myEnum:string,myLongOrFloat:uniontype<int>,myIntList:array<int>>");
}