You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2018/07/12 01:31:12 UTC

[04/10] nifi git commit: NIFI-5316 Fixed array handling for Avro that comes from Parquet's Avro reader

NIFI-5316 Fixed array handling for Avro that comes from Parquet's Avro reader

Signed-off-by: zenfenan <ze...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/260bc29e
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/260bc29e
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/260bc29e

Branch: refs/heads/support/nifi-1.7.x
Commit: 260bc29e1014a2fd21c3775e27f434756e13e0ee
Parents: 54bb511
Author: Bryan Bende <bb...@apache.org>
Authored: Mon Jun 25 10:36:55 2018 -0400
Committer: Andy LoPresto <al...@apache.org>
Committed: Wed Jul 11 18:29:22 2018 -0700

----------------------------------------------------------------------
 .../java/org/apache/nifi/avro/AvroTypeUtil.java |  22 +++-
 .../nifi-parquet-processors/pom.xml             |   2 +
 .../processors/parquet/FetchParquetTest.java    | 110 +++++++++++++++++++
 .../test/resources/avro/user-with-array.avsc    |   9 ++
 .../avro/user-with-nullable-array.avsc          |   9 ++
 5 files changed, 146 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/260bc29e/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
index c214819..23f74b8 100755
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
@@ -894,13 +894,23 @@ public class AvroTypeUtil {
             case STRING:
                 return value.toString();
             case ARRAY:
-                final GenericData.Array<?> array = (GenericData.Array<?>) value;
-                final Object[] valueArray = new Object[array.size()];
-                for (int i = 0; i < array.size(); i++) {
-                    final Schema elementSchema = avroSchema.getElementType();
-                    valueArray[i] = normalizeValue(array.get(i), elementSchema, fieldName + "[" + i + "]");
+                // GenericData.Array implements java.util.List, so one List-based path handles
+                // both GenericData.Array values and the other List implementations that can
+                // arrive from Parquet's Avro reader (NIFI-5316). A non-List value still fails
+                // the cast below with a ClassCastException, exactly as the previous
+                // (GenericData.Array<?>) cast did, so invalid input behaves as before.
+                // The braces keep these locals scoped to this case so they cannot collide
+                // with names declared by other cases of the enclosing switch.
+                {
+                    final List<?> list = (List<?>) value;
+                    // the element schema does not depend on the index, so resolve it once
+                    final Schema elementSchema = avroSchema.getElementType();
+                    final Object[] valueArray = new Object[list.size()];
+                    for (int i = 0; i < list.size(); i++) {
+                        valueArray[i] = normalizeValue(list.get(i), elementSchema, fieldName + "[" + i + "]");
+                    }
+                    return valueArray;
                 }
-                return valueArray;
             case MAP:
                 final Map<?, ?> avroMap = (Map<?, ?>) value;
                 final Map<String, Object> map = new HashMap<>(avroMap.size());

http://git-wip-us.apache.org/repos/asf/nifi/blob/260bc29e/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/pom.xml b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/pom.xml
index daefadf..4b552d3 100644
--- a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/pom.xml
@@ -101,6 +101,8 @@
                 <configuration>
                     <excludes combine.children="append">
                         <exclude>src/test/resources/avro/user.avsc</exclude>
+                        <exclude>src/test/resources/avro/user-with-array.avsc</exclude>
+                        <exclude>src/test/resources/avro/user-with-nullable-array.avsc</exclude>
                     </excludes>
                 </configuration>
             </plugin>

http://git-wip-us.apache.org/repos/asf/nifi/blob/260bc29e/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/FetchParquetTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/FetchParquetTest.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/FetchParquetTest.java
index 76d44aa..83b11f2 100644
--- a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/FetchParquetTest.java
+++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/FetchParquetTest.java
@@ -62,6 +62,8 @@ public class FetchParquetTest {
     static final String RECORD_HEADER = "name,favorite_number,favorite_color";
 
     private Schema schema;
+    private Schema schemaWithArray;
+    private Schema schemaWithNullableArray;
     private Configuration testConf;
     private FetchParquet proc;
     private TestRunner testRunner;
@@ -71,6 +73,12 @@ public class FetchParquetTest {
         final String avroSchema = IOUtils.toString(new FileInputStream("src/test/resources/avro/user.avsc"), StandardCharsets.UTF_8);
         schema = new Schema.Parser().parse(avroSchema);
 
+        final String avroSchemaWithArray = IOUtils.toString(new FileInputStream("src/test/resources/avro/user-with-array.avsc"), StandardCharsets.UTF_8);
+        schemaWithArray = new Schema.Parser().parse(avroSchemaWithArray);
+
+        final String avroSchemaWithNullableArray = IOUtils.toString(new FileInputStream("src/test/resources/avro/user-with-nullable-array.avsc"), StandardCharsets.UTF_8);
+        schemaWithNullableArray = new Schema.Parser().parse(avroSchemaWithNullableArray);
+
         testConf = new Configuration();
         testConf.addResource(new Path(TEST_CONF_PATH));
 
@@ -243,6 +251,42 @@ public class FetchParquetTest {
         flowFile.assertContentEquals("TRIGGER");
     }
 
+    @Test
+    public void testFetchWithArray() throws InitializationException, IOException {
+        configure(proc);
+
+        final File dir = new File(DIRECTORY);
+        final File file = new File(dir, "testFetchParquetWithArrayToCSV.parquet");
+        writeParquetUsersWithArray(file, 10);
+
+        final Map<String, String> flowFileAttributes = new HashMap<>();
+        flowFileAttributes.put(CoreAttributes.PATH.key(), dir.getAbsolutePath());
+        flowFileAttributes.put(CoreAttributes.FILENAME.key(), file.getName());
+
+        // the path + filename attributes identify the Parquet file written above
+        testRunner.enqueue("TRIGGER", flowFileAttributes);
+        testRunner.run();
+        testRunner.assertAllFlowFilesTransferred(FetchParquet.REL_SUCCESS, 1);
+    }
+
+    @Test
+    public void testFetchWithNullableArray() throws InitializationException, IOException {
+        configure(proc);
+
+        final File dir = new File(DIRECTORY);
+        final File file = new File(dir, "testFetchParquetWithNullableArrayToCSV.parquet");
+        writeParquetUsersWithNullableArray(file, 10);
+
+        final Map<String, String> flowFileAttributes = new HashMap<>();
+        flowFileAttributes.put(CoreAttributes.PATH.key(), dir.getAbsolutePath());
+        flowFileAttributes.put(CoreAttributes.FILENAME.key(), file.getName());
+
+        // the path + filename attributes identify the Parquet file written above
+        testRunner.enqueue("TRIGGER", flowFileAttributes);
+        testRunner.run();
+        testRunner.assertAllFlowFilesTransferred(FetchParquet.REL_SUCCESS, 1);
+    }
+
     protected void verifyCSVRecords(int numUsers, String csvContent) {
         final String[] splits = csvContent.split("[\\n]");
         Assert.assertEquals(numUsers, splits.length);
@@ -278,4 +322,70 @@ public class FetchParquetTest {
 
     }
 
+    /** Writes {@code numUsers} records conforming to {@code schemaWithArray} to the given Parquet file. */
+    private void writeParquetUsersWithArray(final File parquetFile, int numUsers) throws IOException {
+        if (parquetFile.exists()) {
+            Assert.assertTrue(parquetFile.delete());
+        }
+
+        final Path parquetPath = new Path(parquetFile.getPath());
+
+        final AvroParquetWriter.Builder<GenericRecord> writerBuilder = AvroParquetWriter
+                .<GenericRecord>builder(parquetPath)
+                .withSchema(schemaWithArray)
+                .withConf(testConf);
+
+        final Schema favoriteColorsSchema = schemaWithArray.getField("favorite_colors").schema();
+
+        try (final ParquetWriter<GenericRecord> writer = writerBuilder.build()) {
+            for (int i = 0; i < numUsers; i++) {
+                // build each record against the writer's schema; the array belongs in favorite_colors
+                final GenericRecord user = new GenericData.Record(schemaWithArray);
+                user.put("name", "Bob" + i);
+                user.put("favorite_number", i);
+
+                final GenericData.Array<String> colors = new GenericData.Array<>(1, favoriteColorsSchema);
+                colors.add("blue" + i);
+
+                user.put("favorite_colors", colors);
+
+                writer.write(user);
+            }
+        }
+    }
+
+    /** Writes {@code numUsers} records conforming to {@code schemaWithNullableArray} to the given Parquet file. */
+    private void writeParquetUsersWithNullableArray(final File parquetFile, int numUsers) throws IOException {
+        if (parquetFile.exists()) {
+            Assert.assertTrue(parquetFile.delete());
+        }
+
+        final Path parquetPath = new Path(parquetFile.getPath());
+
+        final AvroParquetWriter.Builder<GenericRecord> writerBuilder = AvroParquetWriter
+                .<GenericRecord>builder(parquetPath)
+                .withSchema(schemaWithNullableArray)
+                .withConf(testConf);
+
+        // schemaWithNullableArray declares favorite_colors as a union of null and the array, so take the
+        // bare array schema from schemaWithArray, whose favorite_colors field is the same array type
+        final Schema favoriteColorsSchema = schemaWithArray.getField("favorite_colors").schema();
+
+        try (final ParquetWriter<GenericRecord> writer = writerBuilder.build()) {
+            for (int i = 0; i < numUsers; i++) {
+                // build each record against the writer's schema; the array belongs in favorite_colors
+                final GenericRecord user = new GenericData.Record(schemaWithNullableArray);
+                user.put("name", "Bob" + i);
+                user.put("favorite_number", i);
+
+                final GenericData.Array<String> colors = new GenericData.Array<>(1, favoriteColorsSchema);
+                colors.add("blue" + i);
+
+                user.put("favorite_colors", colors);
+
+                writer.write(user);
+            }
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/260bc29e/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/resources/avro/user-with-array.avsc
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/resources/avro/user-with-array.avsc b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/resources/avro/user-with-array.avsc
new file mode 100644
index 0000000..67a0cca
--- /dev/null
+++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/resources/avro/user-with-array.avsc
@@ -0,0 +1,9 @@
+{"namespace": "example.avro",
+ "type": "record",
+ "name": "User",
+ "fields": [
+     {"name": "name", "type": "string"},
+     {"name": "favorite_number",  "type": ["int", "null"]},
+     {"name": "favorite_colors",  "type": { "type": "array", "items": ["string","null"] }, "default": [] }
+ ]
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/260bc29e/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/resources/avro/user-with-nullable-array.avsc
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/resources/avro/user-with-nullable-array.avsc b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/resources/avro/user-with-nullable-array.avsc
new file mode 100644
index 0000000..8986eba
--- /dev/null
+++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/resources/avro/user-with-nullable-array.avsc
@@ -0,0 +1,9 @@
+{"namespace": "example.avro",
+ "type": "record",
+ "name": "User",
+ "fields": [
+     {"name": "name", "type": "string"},
+     {"name": "favorite_number",  "type": ["int", "null"]},
+     {"name": "favorite_colors", "type": [ "null", { "type": "array", "items": ["string","null"] } ], "default": null }
+ ]
+}
\ No newline at end of file