Posted to commits@nifi.apache.org by ma...@apache.org on 2019/02/14 20:09:34 UTC

[nifi] branch master updated: NIFI-6000 Also catch IllegalArgumentException in the ConvertAvroToORC Hive processor; add support for Avro null types.

This is an automated email from the ASF dual-hosted git repository.

mattyb149 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new e598b30  NIFI-6000 Also catch IllegalArgumentException in the ConvertAvroToORC Hive processor; add support for Avro null types.
e598b30 is described below

commit e598b30d6dc85111762f9618f1b7cfefc68e612b
Author: Aleksandr Salatich <sa...@gmail.com>
AuthorDate: Tue Feb 12 18:28:48 2019 +0300

    NIFI-6000 Also catch IllegalArgumentException in the ConvertAvroToORC Hive processor; add support for Avro null types.
    
    Signed-off-by: Matthew Burgess <ma...@apache.org>
    
    This closes #3302
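
For context, a minimal sketch of the kind of schema this commit makes convertible: a record with a null-typed field, mirroring the new test helper buildAvroSchemaWithNull() in the diff below. Such a field previously had no ORC mapping (hence the IllegalArgumentException this commit also catches); it now maps to BOOLEAN, every value being necessarily null. The class name here is illustrative, not part of the commit:

    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;

    public class NullTypeSchemaSketch {
        public static void main(String[] args) {
            // A record with a null-typed field, as built by buildAvroSchemaWithNull().
            // After this commit, ConvertAvroToORC maps the field to BOOLEAN and emits
            // DDL like:
            //   CREATE EXTERNAL TABLE IF NOT EXISTS test_record
            //     (string STRING, null BOOLEAN) STORED AS ORC
            Schema schema = SchemaBuilder.record("test.record").namespace("any.data")
                    .fields()
                    .name("string").type().stringType().stringDefault("default")
                    .name("null").type().nullType().noDefault()
                    .endRecord();
            System.out.println(schema.toString(true));
        }
    }
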
---
 .../apache/hadoop/hive/ql/io/orc/NiFiOrcUtils.java |  3 +
 .../nifi/processors/hive/ConvertAvroToORC.java     |  4 +-
 .../nifi/processors/hive/TestConvertAvroToORC.java | 95 ++++++++++++++++++++++
 .../org/apache/nifi/util/orc/TestNiFiOrcUtils.java | 50 +++++++++++-
 4 files changed, 149 insertions(+), 3 deletions(-)

diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/hadoop/hive/ql/io/orc/NiFiOrcUtils.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/hadoop/hive/ql/io/orc/NiFiOrcUtils.java
index 687073e..ce06f82 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/hadoop/hive/ql/io/orc/NiFiOrcUtils.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/hadoop/hive/ql/io/orc/NiFiOrcUtils.java
@@ -244,6 +244,7 @@ public class NiFiOrcUtils {
             case DOUBLE:
             case FLOAT:
             case STRING:
+            case NULL:
                 return getPrimitiveOrcTypeFromPrimitiveAvroType(fieldType);
 
             case UNION:
@@ -335,6 +336,7 @@ public class NiFiOrcUtils {
             case LONG:
                 return TypeInfoFactory.getPrimitiveTypeInfo("bigint");
             case BOOLEAN:
+            case NULL: // ORC has no null type, so just pick the smallest. All values are necessarily null.
                 return TypeInfoFactory.getPrimitiveTypeInfo("boolean");
             case BYTES:
                 return TypeInfoFactory.getPrimitiveTypeInfo("binary");
@@ -362,6 +364,7 @@ public class NiFiOrcUtils {
             case LONG:
                 return "BIGINT";
             case BOOLEAN:
+            case NULL: // Hive has no null type; we picked boolean as the ORC type, so use it for Hive DDL too. All values are necessarily null.
                 return "BOOLEAN";
             case BYTES:
                 return "BINARY";
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/ConvertAvroToORC.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/ConvertAvroToORC.java
index f211ac5..e8ee2a2 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/ConvertAvroToORC.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/ConvertAvroToORC.java
@@ -283,8 +283,8 @@ public class ConvertAvroToORC extends AbstractProcessor {
             session.transfer(flowFile, REL_SUCCESS);
             session.getProvenanceReporter().modifyContent(flowFile, "Converted "+totalRecordCount.get()+" records", System.currentTimeMillis() - startTime);
 
-        } catch (final ProcessException pe) {
-            getLogger().error("Failed to convert {} from Avro to ORC due to {}; transferring to failure", new Object[]{flowFile, pe});
+        } catch (ProcessException | IllegalArgumentException e) {
+            getLogger().error("Failed to convert {} from Avro to ORC due to {}; transferring to failure", new Object[]{flowFile, e});
             session.transfer(flowFile, REL_FAILURE);
         }
     }
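
For contrast, a hedged sketch of a schema that still has no ORC mapping: Avro fixed, exercised by the new fixed-type test below. Converting it raises IllegalArgumentException, which the widened catch above now answers by routing the untouched Avro flow file to REL_FAILURE instead of letting the exception escape onTrigger. The class name here is illustrative, not part of the commit:

    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;

    public class FixedTypeSchemaSketch {
        public static void main(String[] args) {
            // A record with a fixed field, as built by buildAvroSchemaWithFixed().
            // FIXED still has no ORC mapping, so conversion fails; the processor
            // now catches the IllegalArgumentException and transfers the original
            // flow file to REL_FAILURE, as the new test asserts.
            Schema schema = SchemaBuilder.record("test.record").namespace("any.data")
                    .fields()
                    .name("fixed").type().fixed("fixedField").size(6).fixedDefault("123456")
                    .endRecord();
            System.out.println(schema.toString(true));
        }
    }
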
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestConvertAvroToORC.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestConvertAvroToORC.java
index 282b42d..f34a647 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestConvertAvroToORC.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestConvertAvroToORC.java
@@ -17,10 +17,13 @@
 package org.apache.nifi.processors.hive;
 
 import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileStream;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.DatumWriter;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -43,7 +46,9 @@ import org.apache.nifi.util.orc.TestNiFiOrcUtils;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.nio.charset.StandardCharsets;
@@ -55,6 +60,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
@@ -80,6 +86,95 @@ public class TestConvertAvroToORC {
     }
 
     @Test
+    public void test_onTrigger_routing_to_success_null_type() throws Exception {
+        String testString = "Hello World";
+        GenericData.Record record = TestNiFiOrcUtils.buildAvroRecordWithNull(testString);
+
+        DatumWriter<GenericData.Record> writer = new GenericDatumWriter<>(record.getSchema());
+        DataFileWriter<GenericData.Record> fileWriter = new DataFileWriter<>(writer);
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        fileWriter.create(record.getSchema(), out);
+        fileWriter.append(record);
+        fileWriter.flush();
+        fileWriter.close();
+        out.close();
+
+        Map<String, String> attributes = new HashMap<String, String>() {{
+            put(CoreAttributes.FILENAME.key(), "test.avro");
+        }};
+        runner.enqueue(out.toByteArray(), attributes);
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ConvertAvroToORC.REL_SUCCESS, 1);
+        MockFlowFile resultFlowFile = runner.getFlowFilesForRelationship(ConvertAvroToORC.REL_SUCCESS).get(0);
+        assertEquals("test.orc", resultFlowFile.getAttribute(CoreAttributes.FILENAME.key()));
+        assertEquals("CREATE EXTERNAL TABLE IF NOT EXISTS test_record (string STRING, null BOOLEAN) STORED AS ORC",
+                resultFlowFile.getAttribute(ConvertAvroToORC.HIVE_DDL_ATTRIBUTE));
+    }
+
+    @Test
+    public void test_onTrigger_routing_to_success_empty_array_type() throws Exception {
+        String testString = "Hello World";
+        GenericData.Record record = TestNiFiOrcUtils.buildAvroRecordWithEmptyArray(testString);
+
+        DatumWriter<GenericData.Record> writer = new GenericDatumWriter<>(record.getSchema());
+        DataFileWriter<GenericData.Record> fileWriter = new DataFileWriter<>(writer);
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        fileWriter.create(record.getSchema(), out);
+        fileWriter.append(record);
+        fileWriter.flush();
+        fileWriter.close();
+        out.close();
+
+        Map<String, String> attributes = new HashMap<String, String>() {{
+            put(CoreAttributes.FILENAME.key(), "test.avro");
+        }};
+        runner.enqueue(out.toByteArray(), attributes);
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ConvertAvroToORC.REL_SUCCESS, 1);
+        MockFlowFile resultFlowFile = runner.getFlowFilesForRelationship(ConvertAvroToORC.REL_SUCCESS).get(0);
+        assertEquals("test.orc", resultFlowFile.getAttribute(CoreAttributes.FILENAME.key()));
+        assertEquals("CREATE EXTERNAL TABLE IF NOT EXISTS test_record (string STRING, emptyArray ARRAY<BOOLEAN>) STORED AS ORC",
+                resultFlowFile.getAttribute(ConvertAvroToORC.HIVE_DDL_ATTRIBUTE));
+    }
+
+    @Test
+    public void test_onTrigger_routing_to_failure_fixed_type() throws Exception {
+        String testString = "Hello!";
+        GenericData.Record record = TestNiFiOrcUtils.buildAvroRecordWithFixed(testString);
+
+        DatumWriter<GenericData.Record> writer = new GenericDatumWriter<>(record.getSchema());
+        DataFileWriter<GenericData.Record> fileWriter = new DataFileWriter<>(writer);
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        fileWriter.create(record.getSchema(), out);
+        fileWriter.append(record);
+        fileWriter.flush();
+        fileWriter.close();
+        out.close();
+
+        Map<String, String> attributes = new HashMap<String, String>() {{
+            put(CoreAttributes.FILENAME.key(), "test.avro");
+        }};
+        runner.enqueue(out.toByteArray(), attributes);
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ConvertAvroToORC.REL_FAILURE, 1);
+        MockFlowFile resultFlowFile = runner.getFlowFilesForRelationship(ConvertAvroToORC.REL_FAILURE).get(0);
+        assertEquals("test.avro", resultFlowFile.getAttribute(CoreAttributes.FILENAME.key()));
+
+        final InputStream in = new ByteArrayInputStream(resultFlowFile.toByteArray());
+        final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
+        try (DataFileStream<GenericRecord> dataFileReader = new DataFileStream<>(in, datumReader)) {
+            assertTrue(dataFileReader.hasNext());
+            GenericRecord testedRecord = dataFileReader.next();
+
+            assertNotNull(testedRecord.get("fixed"));
+            assertArrayEquals(testString.getBytes(StandardCharsets.UTF_8), ((GenericData.Fixed) testedRecord.get("fixed")).bytes());
+        }
+    }
+
+    @Test
     public void test_onTrigger_primitive_record() throws Exception {
         GenericData.Record record = TestNiFiOrcUtils.buildPrimitiveAvroRecord(10, 20L, true, 30.0f, 40, StandardCharsets.UTF_8.encode("Hello"), "World");
 
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/util/orc/TestNiFiOrcUtils.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/util/orc/TestNiFiOrcUtils.java
index 47ee3a5..cd7847f 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/util/orc/TestNiFiOrcUtils.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/util/orc/TestNiFiOrcUtils.java
@@ -34,7 +34,9 @@ import org.apache.hadoop.io.Text;
 import org.junit.Test;
 
 import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -271,7 +273,7 @@ public class TestNiFiOrcUtils {
 
     @Test
     public void test_convertToORCObject() {
-        Schema schema = SchemaBuilder.enumeration("myEnum").symbols("x","y","z");
+        Schema schema = SchemaBuilder.enumeration("myEnum").symbols("x", "y", "z");
         List<Object> objects = Arrays.asList(new Utf8("Hello"), new GenericData.EnumSymbol(schema, "x"));
         objects.forEach((avroObject) -> {
             Object o = NiFiOrcUtils.convertToORCObject(TypeInfoUtils.getTypeInfoFromTypeString("uniontype<bigint,string>"), avroObject);
@@ -304,6 +306,29 @@ public class TestNiFiOrcUtils {
         return builder.endRecord();
     }
 
+    public static Schema buildAvroSchemaWithNull() {
+        // Build a fake Avro schema which contains a null-type field
+        final SchemaBuilder.FieldAssembler<Schema> builder = SchemaBuilder.record("test.record").namespace("any.data").fields();
+        builder.name("string").type().stringType().stringDefault("default");
+        builder.name("null").type().nullType().noDefault();
+        return builder.endRecord();
+    }
+
+    public static Schema buildAvroSchemaWithEmptyArray() {
+        // Build a fake Avro schema which contains an array field (the test leaves it empty)
+        final SchemaBuilder.FieldAssembler<Schema> builder = SchemaBuilder.record("test.record").namespace("any.data").fields();
+        builder.name("string").type().stringType().stringDefault("default");
+        builder.name("emptyArray").type().array().items().nullType().noDefault();
+        return builder.endRecord();
+    }
+
+    public static Schema buildAvroSchemaWithFixed() {
+        // Build a fake Avro schema which contains a fixed field
+        final SchemaBuilder.FieldAssembler<Schema> builder = SchemaBuilder.record("test.record").namespace("any.data").fields();
+        builder.name("fixed").type().fixed("fixedField").size(6).fixedDefault("123456");
+        return builder.endRecord();
+    }
+
     public static GenericData.Record buildPrimitiveAvroRecord(int i, long l, boolean b, float f, double d, ByteBuffer bytes, String string) {
         Schema schema = buildPrimitiveAvroSchema();
         GenericData.Record row = new GenericData.Record(schema);
@@ -351,6 +376,29 @@ public class TestNiFiOrcUtils {
         return row;
     }
 
+    public static GenericData.Record buildAvroRecordWithNull(String string) {
+        Schema schema = buildAvroSchemaWithNull();
+        GenericData.Record row = new GenericData.Record(schema);
+        row.put("string", string);
+        row.put("null", null);
+        return row;
+    }
+
+    public static GenericData.Record buildAvroRecordWithEmptyArray(String string) {
+        Schema schema = buildAvroSchemaWithEmptyArray();
+        GenericData.Record row = new GenericData.Record(schema);
+        row.put("string", string);
+        row.put("emptyArray", Collections.emptyList());
+        return row;
+    }
+
+    public static GenericData.Record buildAvroRecordWithFixed(String string) {
+        Schema schema = buildAvroSchemaWithFixed();
+        GenericData.Record row = new GenericData.Record(schema);
+        row.put("fixed", new GenericData.Fixed(schema, string.getBytes(StandardCharsets.UTF_8)));
+        return row;
+    }
+
     public static TypeInfo buildComplexOrcSchema() {
         return TypeInfoUtils.getTypeInfoFromTypeString("struct<myInt:int,myMap:map<string,double>,myEnum:string,myLongOrFloat:uniontype<int>,myIntList:array<int>>");
     }