Posted to commits@nifi.apache.org by ab...@apache.org on 2021/06/07 09:48:12 UTC

[nifi] branch main updated: [NIFI-8610] Do not reuse generic records in convertAvroToORC

This is an automated email from the ASF dual-hosted git repository.

aboda pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new f07e17b  [NIFI-8610] Do not reuse generic records in convertAvroToORC
f07e17b is described below

commit f07e17ba74da2759213d52431e00f9d0ba5d39a5
Author: Dominik Przybysz <dp...@touk.pl>
AuthorDate: Tue May 18 10:26:58 2021 +0200

    [NIFI-8610] Do not reuse generic records in convertAvroToORC
    
    [NIFI-8610] Simplify decimal test for convertAvroToORC
    
    Signed-off-by: Arpad Boda <ab...@apache.org>
    
    This closes #1081
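    
    Background note (illustrative, not part of the commit): Avro's
    reader.next(reuse) refills the same GenericRecord container on each
    call, so mutable field values, such as the ByteBuffer backing a
    logical decimal, can be overwritten before they are consumed. Below
    is a minimal sketch of the pitfall and of the per-record allocation
    pattern this commit adopts; the file name records.avro and the field
    name myDecimal are placeholders for illustration only:
    
        import java.io.File;
        import java.io.IOException;
        import java.util.ArrayList;
        import java.util.List;
        
        import org.apache.avro.file.DataFileReader;
        import org.apache.avro.generic.GenericDatumReader;
        import org.apache.avro.generic.GenericRecord;
        
        public class AvroReuseSketch {
            public static void main(String[] args) throws IOException {
                // records.avro and the field name "myDecimal" are
                // hypothetical stand-ins, not NiFi code.
                try (DataFileReader<GenericRecord> reader = new DataFileReader<>(
                        new File("records.avro"), new GenericDatumReader<>())) {
                    List<Object> decimals = new ArrayList<>();
        
                    // Buggy pattern (removed by this commit): the reused
                    // container is refilled on every call, so values captured
                    // from earlier records may be clobbered by later ones.
                    //
                    //   GenericRecord currRecord = null;
                    //   while (reader.hasNext()) {
                    //       currRecord = reader.next(currRecord);
                    //       decimals.add(currRecord.get("myDecimal"));
                    //   }
        
                    // Fixed pattern: allocate a fresh record per iteration so
                    // captured field values stay stable across the loop.
                    while (reader.hasNext()) {
                        GenericRecord currRecord = reader.next();
                        decimals.add(currRecord.get("myDecimal"));
                    }
                }
            }
        }
    
    Allocating a fresh record per iteration gives up Avro's object-reuse
    optimization, but it removes the aliasing hazard exercised by the
    decimal test added in the diff below.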
---
 .../nifi/processors/hive/ConvertAvroToORC.java     |  5 +-
 .../nifi/processors/hive/TestConvertAvroToORC.java | 65 +++++++++++++++++++++-
 2 files changed, 66 insertions(+), 4 deletions(-)

diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/ConvertAvroToORC.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/ConvertAvroToORC.java
index b323633..d918365 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/ConvertAvroToORC.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/ConvertAvroToORC.java
@@ -234,9 +234,8 @@ public class ConvertAvroToORC extends AbstractProcessor {
                     try {
 
                         int recordCount = 0;
-                        GenericRecord currRecord = null;
                         while (reader.hasNext()) {
-                            currRecord = reader.next(currRecord);
+                            GenericRecord currRecord = reader.next();
                             List<Schema.Field> fields = currRecord.getSchema().getFields();
                             if (fields != null) {
                                 Object[] row = new Object[fields.size()];
@@ -284,7 +283,7 @@ public class ConvertAvroToORC extends AbstractProcessor {
             flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), ORC_MIME_TYPE);
             flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), newFilename.toString());
             session.transfer(flowFile, REL_SUCCESS);
-            session.getProvenanceReporter().modifyContent(flowFile, "Converted "+totalRecordCount.get()+" records", System.currentTimeMillis() - startTime);
+            session.getProvenanceReporter().modifyContent(flowFile, "Converted " + totalRecordCount.get() + " records", System.currentTimeMillis() - startTime);
 
         } catch (ProcessException | IllegalArgumentException e) {
             getLogger().error("Failed to convert {} from Avro to ORC due to {}; transferring to failure", new Object[]{flowFile, e});
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestConvertAvroToORC.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestConvertAvroToORC.java
index 9248b27..5fe6752 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestConvertAvroToORC.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestConvertAvroToORC.java
@@ -49,9 +49,9 @@ import org.junit.Test;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
-import java.io.InputStream;
 import java.io.File;
 import java.io.FileOutputStream;
+import java.io.InputStream;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
@@ -321,6 +321,69 @@ public class TestConvertAvroToORC {
     }
 
     @Test
+    public void test_onTrigger_complex_records_with_bigdecimals() throws Exception {
+
+        Map<String, Double> mapData1 = new TreeMap<String, Double>() {{
+            put("key1", 1.0);
+            put("key2", 2.0);
+        }};
+
+
+        BigDecimal sampleBigDecimal1 = new BigDecimal("3500.12");
+        BigDecimal sampleBigDecimal2 = new BigDecimal("0.01");
+
+        GenericData.Record record1 = TestNiFiOrcUtils.buildComplexAvroRecord(null, mapData1, "XYZ", 4L, Arrays.asList(100, 200), toByteBuffer(sampleBigDecimal1));
+        DatumWriter<GenericData.Record> writer = new GenericDatumWriter<>(record1.getSchema());
+        DataFileWriter<GenericData.Record> fileWriter = new DataFileWriter<>(writer);
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        fileWriter.create(record1.getSchema(), out);
+        fileWriter.append(record1);
+        fileWriter.append(TestNiFiOrcUtils.buildComplexAvroRecord(null, mapData1, "XYZ", 4L, Arrays.asList(100, 200), toByteBuffer(sampleBigDecimal2)));
+        fileWriter.flush();
+        fileWriter.close();
+        out.close();
+
+        Map<String, String> attributes = new HashMap<String, String>() {{
+            put(CoreAttributes.FILENAME.key(), "test");
+        }};
+        runner.enqueue(out.toByteArray(), attributes);
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ConvertAvroToORC.REL_SUCCESS, 1);
+
+        // Write the flow file out to disk, since the ORC Reader needs a path
+        MockFlowFile resultFlowFile = runner.getFlowFilesForRelationship(ConvertAvroToORC.REL_SUCCESS).get(0);
+        assertEquals("2", resultFlowFile.getAttribute(ConvertAvroToORC.RECORD_COUNT_ATTRIBUTE));
+        assertEquals("test.orc", resultFlowFile.getAttribute(CoreAttributes.FILENAME.key()));
+        byte[] resultContents = runner.getContentAsByteArray(resultFlowFile);
+        FileOutputStream fos = new FileOutputStream("target/test1.orc");
+        fos.write(resultContents);
+        fos.flush();
+        fos.close();
+
+        Configuration conf = new Configuration();
+        FileSystem fs = FileSystem.getLocal(conf);
+        Reader reader = OrcFile.createReader(new Path("target/test1.orc"), OrcFile.readerOptions(conf).filesystem(fs));
+        RecordReader rows = reader.rows();
+        TypeInfo resultSchema = TestNiFiOrcUtils.buildComplexOrcSchema();
+        StructObjectInspector inspector = (StructObjectInspector) OrcStruct.createObjectInspector(resultSchema);
+
+        Object result1 = rows.next(null);
+        assertNotNull(result1);
+        Object decimalFieldObject1 = inspector.getStructFieldData(result1, inspector.getStructFieldRef("myDecimal"));
+        assertEquals(sampleBigDecimal1, ((HiveDecimalWritable) decimalFieldObject1).getHiveDecimal().bigDecimalValue());
+
+        Object result2 = rows.next(null);
+        assertNotNull(result2);
+        Object decimalFieldObject2 = inspector.getStructFieldData(result2, inspector.getStructFieldRef("myDecimal"));
+        assertEquals(sampleBigDecimal2, ((HiveDecimalWritable) decimalFieldObject2).getHiveDecimal().bigDecimalValue());
+    }
+
+    private ByteBuffer toByteBuffer(BigDecimal sampleBigDecimal) {
+        return ByteBuffer.wrap(sampleBigDecimal.unscaledValue().toByteArray());
+    }
+
+    @Test
     public void test_onTrigger_array_of_records() throws Exception {
         final Schema schema = new Schema.Parser().parse(new File("src/test/resources/array_of_records.avsc"));
         List<GenericRecord> innerRecords = new LinkedList<>();