You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ab...@apache.org on 2021/06/07 09:48:12 UTC
[nifi] branch main updated: [NIFI-8610] Do not reuse generic records in convertAvroToORC
This is an automated email from the ASF dual-hosted git repository.
aboda pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new f07e17b [NIFI-8610] Do not reuse generic records in convertAvroToORC
f07e17b is described below
commit f07e17ba74da2759213d52431e00f9d0ba5d39a5
Author: Dominik Przybysz <dp...@touk.pl>
AuthorDate: Tue May 18 10:26:58 2021 +0200
[NIFI-8610] Do not reuse generic records in convertAvroToORC
[NIFI-8610] Simplify decimal test for convertAvroToORC
Signed-off-by: Arpad Boda <ab...@apache.org>
This closes #1081
---
.../nifi/processors/hive/ConvertAvroToORC.java | 5 +-
.../nifi/processors/hive/TestConvertAvroToORC.java | 65 +++++++++++++++++++++-
2 files changed, 66 insertions(+), 4 deletions(-)
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/ConvertAvroToORC.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/ConvertAvroToORC.java
index b323633..d918365 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/ConvertAvroToORC.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/ConvertAvroToORC.java
@@ -234,9 +234,8 @@ public class ConvertAvroToORC extends AbstractProcessor {
try {
int recordCount = 0;
- GenericRecord currRecord = null;
while (reader.hasNext()) {
- currRecord = reader.next(currRecord);
+ GenericRecord currRecord = reader.next();
List<Schema.Field> fields = currRecord.getSchema().getFields();
if (fields != null) {
Object[] row = new Object[fields.size()];
@@ -284,7 +283,7 @@ public class ConvertAvroToORC extends AbstractProcessor {
flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), ORC_MIME_TYPE);
flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), newFilename.toString());
session.transfer(flowFile, REL_SUCCESS);
- session.getProvenanceReporter().modifyContent(flowFile, "Converted "+totalRecordCount.get()+" records", System.currentTimeMillis() - startTime);
+ session.getProvenanceReporter().modifyContent(flowFile, "Converted " + totalRecordCount.get() + " records", System.currentTimeMillis() - startTime);
} catch (ProcessException | IllegalArgumentException e) {
getLogger().error("Failed to convert {} from Avro to ORC due to {}; transferring to failure", new Object[]{flowFile, e});
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestConvertAvroToORC.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestConvertAvroToORC.java
index 9248b27..5fe6752 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestConvertAvroToORC.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestConvertAvroToORC.java
@@ -49,9 +49,9 @@ import org.junit.Test;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
-import java.io.InputStream;
import java.io.File;
import java.io.FileOutputStream;
+import java.io.InputStream;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
@@ -321,6 +321,69 @@ public class TestConvertAvroToORC {
}
@Test
+ public void test_onTrigger_complex_records_with_bigdecimals() throws Exception {
+
+ Map<String, Double> mapData1 = new TreeMap<String, Double>() {{
+ put("key1", 1.0);
+ put("key2", 2.0);
+ }};
+
+
+ BigDecimal sampleBigDecimal1 = new BigDecimal("3500.12");
+ BigDecimal sampleBigDecimal2 = new BigDecimal("0.01");
+
+ GenericData.Record record1 = TestNiFiOrcUtils.buildComplexAvroRecord(null, mapData1, "XYZ", 4L, Arrays.asList(100, 200), toByteBuffer(sampleBigDecimal1));
+ DatumWriter<GenericData.Record> writer = new GenericDatumWriter<>(record1.getSchema());
+ DataFileWriter<GenericData.Record> fileWriter = new DataFileWriter<>(writer);
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ fileWriter.create(record1.getSchema(), out);
+ fileWriter.append(record1);
+ fileWriter.append(TestNiFiOrcUtils.buildComplexAvroRecord(null, mapData1, "XYZ", 4L, Arrays.asList(100, 200), toByteBuffer(sampleBigDecimal2)));
+ fileWriter.flush();
+ fileWriter.close();
+ out.close();
+
+ Map<String, String> attributes = new HashMap<String, String>() {{
+ put(CoreAttributes.FILENAME.key(), "test");
+ }};
+ runner.enqueue(out.toByteArray(), attributes);
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(ConvertAvroToORC.REL_SUCCESS, 1);
+
+ // Write the flow file out to disk, since the ORC Reader needs a path
+ MockFlowFile resultFlowFile = runner.getFlowFilesForRelationship(ConvertAvroToORC.REL_SUCCESS).get(0);
+ assertEquals("2", resultFlowFile.getAttribute(ConvertAvroToORC.RECORD_COUNT_ATTRIBUTE));
+ assertEquals("test.orc", resultFlowFile.getAttribute(CoreAttributes.FILENAME.key()));
+ byte[] resultContents = runner.getContentAsByteArray(resultFlowFile);
+ FileOutputStream fos = new FileOutputStream("target/test1.orc");
+ fos.write(resultContents);
+ fos.flush();
+ fos.close();
+
+ Configuration conf = new Configuration();
+ FileSystem fs = FileSystem.getLocal(conf);
+ Reader reader = OrcFile.createReader(new Path("target/test1.orc"), OrcFile.readerOptions(conf).filesystem(fs));
+ RecordReader rows = reader.rows();
+ TypeInfo resultSchema = TestNiFiOrcUtils.buildComplexOrcSchema();
+ StructObjectInspector inspector = (StructObjectInspector) OrcStruct.createObjectInspector(resultSchema);
+
+ Object result1 = rows.next(null);
+ assertNotNull(result1);
+ Object decimalFieldObject1 = inspector.getStructFieldData(result1, inspector.getStructFieldRef("myDecimal"));
+ assertEquals(sampleBigDecimal1, ((HiveDecimalWritable) decimalFieldObject1).getHiveDecimal().bigDecimalValue());
+
+ Object result2 = rows.next(null);
+ assertNotNull(result2);
+ Object decimalFieldObject2 = inspector.getStructFieldData(result2, inspector.getStructFieldRef("myDecimal"));
+ assertEquals(sampleBigDecimal2, ((HiveDecimalWritable) decimalFieldObject2).getHiveDecimal().bigDecimalValue());
+ }
+
+ private ByteBuffer toByteBuffer(BigDecimal sampleBigDecimal) {
+ return ByteBuffer.wrap(sampleBigDecimal.unscaledValue().toByteArray());
+ }
+
+ @Test
public void test_onTrigger_array_of_records() throws Exception {
final Schema schema = new Schema.Parser().parse(new File("src/test/resources/array_of_records.avsc"));
List<GenericRecord> innerRecords = new LinkedList<>();