Posted to commits@nifi.apache.org by ma...@apache.org on 2019/02/06 20:55:58 UTC

[nifi] branch master updated: NIFI-5983: handling parse problems in recordReader implementations

This is an automated email from the ASF dual-hosted git repository.

mattyb149 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 24a7d48  NIFI-5983: handling parse problems in recordReader implementations
24a7d48 is described below

commit 24a7d480c8fde18a7dd64d7de80812d18eb2c5a4
Author: Endre Zoltan Kovacs <ek...@hortonworks.com>
AuthorDate: Thu Jan 31 14:47:21 2019 +0100

    NIFI-5983: handling parse problems in recordReader implementations
    
    Fixed Checkstyle violation
    
    Signed-off-by: Matthew Burgess <ma...@apache.org>
    
    This closes #3282
---
 .../nifi-record-serialization-services/pom.xml     |  5 ++
 .../org/apache/nifi/avro/AvroRecordReader.java     | 25 +++++---
 .../java/org/apache/nifi/csv/CSVRecordReader.java  | 68 ++++++++++++----------
 .../apache/nifi/csv/ITApacheCSVRecordReader.java   | 35 ++++++++++-
 4 files changed, 93 insertions(+), 40 deletions(-)
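
For readers skimming the diff: the change wraps each reader's record-fetching logic in a try/catch so that unexpected runtime exceptions from the underlying parser surface as a checked MalformedRecordException, with Guava's Throwables.getRootCause used to name the original failure in the message. A minimal sketch of the pattern; GuardedRecordReader and parseNextRecord are hypothetical names for illustration, not NiFi API:

    import java.io.IOException;

    import com.google.common.base.Throwables;

    import org.apache.nifi.serialization.MalformedRecordException;
    import org.apache.nifi.serialization.record.Record;

    abstract class GuardedRecordReader {

        // Reader-specific parsing; may throw arbitrary runtime exceptions.
        protected abstract Record parseNextRecord() throws IOException, MalformedRecordException;

        public Record nextRecord() throws IOException, MalformedRecordException {
            try {
                return parseNextRecord();
            } catch (IOException | MalformedRecordException e) {
                throw e; // already meaningful to callers; pass through untouched
            } catch (Exception e) {
                // Anything unexpected becomes a MalformedRecordException whose
                // message names the deepest cause of the failure.
                throw new MalformedRecordException(
                        "Error while getting next record. Root cause: " + Throwables.getRootCause(e), e);
            }
        }
    }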

diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
index 140a74b..b16464e 100755
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
@@ -119,6 +119,11 @@
             <artifactId>caffeine</artifactId>
             <version>2.6.2</version>
         </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <version>27.0.1-jre</version>
+        </dependency>
     </dependencies>
     <build>
         <plugins>
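
As far as this commit shows, the new Guava dependency is pulled in just for Throwables.getRootCause, which walks the cause chain down to the deepest throwable so the wrapped exception's message can name the original parser error. A rough plain-Java equivalent (Guava's real implementation additionally guards against cycles in the cause chain):

    // Approximation of com.google.common.base.Throwables.getRootCause(t):
    static Throwable rootCause(Throwable t) {
        Throwable cause;
        while ((cause = t.getCause()) != null) {
            t = cause;
        }
        return t;
    }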
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordReader.java
index c1d87b6..c9a624f 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordReader.java
@@ -24,6 +24,8 @@ import org.apache.nifi.serialization.record.MapRecord;
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordSchema;
 
+import com.google.common.base.Throwables;
+
 import java.io.IOException;
 import java.util.Map;
 
@@ -33,14 +35,21 @@ public abstract class AvroRecordReader implements RecordReader {
 
     @Override
     public Record nextRecord(final boolean coerceTypes, final boolean dropUnknownFields) throws IOException, MalformedRecordException {
-        GenericRecord record = nextAvroRecord();
-        if (record == null) {
-            return null;
+        try {
+            GenericRecord record = nextAvroRecord();
+            if (record == null) {
+                return null;
+            }
+
+            final RecordSchema schema = getSchema();
+            final Map<String, Object> values = AvroTypeUtil.convertAvroRecordToMap(record, schema);
+            return new MapRecord(schema, values);
+        } catch (IOException e) {
+            throw e;
+        } catch (MalformedRecordException e) {
+            throw e;
+        } catch (Exception e) {
+            throw new MalformedRecordException("Error while getting next record. Root cause: " + Throwables.getRootCause(e), e);
         }
-
-        final RecordSchema schema = getSchema();
-        final Map<String, Object> values = AvroTypeUtil.convertAvroRecordToMap(record, schema);
-        return new MapRecord(schema, values);
     }
-
 }
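
With this change, callers of AvroRecordReader.nextRecord() can rely on a failed parse surfacing as a checked MalformedRecordException instead of, say, a raw AvroRuntimeException escaping the call. A hypothetical caller-side sketch, not actual NiFi processor code:

    import java.io.IOException;

    import org.apache.nifi.logging.ComponentLog;
    import org.apache.nifi.serialization.MalformedRecordException;
    import org.apache.nifi.serialization.RecordReader;
    import org.apache.nifi.serialization.record.Record;

    class RecordConsumer {
        static void consume(final RecordReader reader, final ComponentLog logger) throws IOException {
            try {
                Record record;
                while ((record = reader.nextRecord()) != null) {
                    // placeholder for real per-record work
                }
            } catch (MalformedRecordException e) {
                // e.getCause() still carries the original parser exception
                logger.error("Skipping unparseable input: " + e.getMessage(), e);
            }
        }
    }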
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordReader.java
index 299ad05..22a2e8a 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordReader.java
@@ -42,6 +42,8 @@ import org.apache.nifi.serialization.record.RecordField;
 import org.apache.nifi.serialization.record.RecordFieldType;
 import org.apache.nifi.serialization.record.RecordSchema;
 
+import com.google.common.base.Throwables;
+
 
 public class CSVRecordReader extends AbstractCSVRecordReader {
     private final CSVParser csvParser;
@@ -72,45 +74,49 @@ public class CSVRecordReader extends AbstractCSVRecordReader {
 
     @Override
     public Record nextRecord(final boolean coerceTypes, final boolean dropUnknownFields) throws IOException, MalformedRecordException {
-        final RecordSchema schema = getSchema();
-
-        final List<RecordField> recordFields = getRecordFields();
-        final int numFieldNames = recordFields.size();
-
-        for (final CSVRecord csvRecord : csvParser) {
-            final Map<String, Object> values = new LinkedHashMap<>(recordFields.size() * 2);
-            for (int i = 0; i < csvRecord.size(); i++) {
-                final String rawValue = csvRecord.get(i);
 
-                final String rawFieldName;
-                final DataType dataType;
-                if (i >= numFieldNames) {
-                    if (!dropUnknownFields) {
-                        values.put("unknown_field_index_" + i, rawValue);
+        try {
+            final RecordSchema schema = getSchema();
+
+            final List<RecordField> recordFields = getRecordFields();
+            final int numFieldNames = recordFields.size();
+            for (final CSVRecord csvRecord : csvParser) {
+                final Map<String, Object> values = new LinkedHashMap<>(recordFields.size() * 2);
+                for (int i = 0; i < csvRecord.size(); i++) {
+                    final String rawValue = csvRecord.get(i);
+
+                    final String rawFieldName;
+                    final DataType dataType;
+                    if (i >= numFieldNames) {
+                        if (!dropUnknownFields) {
+                            values.put("unknown_field_index_" + i, rawValue);
+                        }
+
+                        continue;
+                    } else {
+                        final RecordField recordField = recordFields.get(i);
+                        rawFieldName = recordField.getFieldName();
+                        dataType = recordField.getDataType();
                     }
 
-                    continue;
-                } else {
-                    final RecordField recordField = recordFields.get(i);
-                    rawFieldName = recordField.getFieldName();
-                    dataType = recordField.getDataType();
-                }
 
+                    final Object value;
+                    if (coerceTypes) {
+                        value = convert(rawValue, dataType, rawFieldName);
+                    } else {
+                        // The CSV Reader is going to return all fields as Strings, because CSV doesn't have any way to
+                        // dictate a field type. As a result, we will use the schema that we have to attempt to convert
+                        // the value into the desired type if it's a simple type.
+                        value = convertSimpleIfPossible(rawValue, dataType, rawFieldName);
+                    }
 
-                final Object value;
-                if (coerceTypes) {
-                    value = convert(rawValue, dataType, rawFieldName);
-                } else {
-                    // The CSV Reader is going to return all fields as Strings, because CSV doesn't have any way to
-                    // dictate a field type. As a result, we will use the schema that we have to attempt to convert
-                    // the value into the desired type if it's a simple type.
-                    value = convertSimpleIfPossible(rawValue, dataType, rawFieldName);
+                    values.put(rawFieldName, value);
                 }
 
-                values.put(rawFieldName, value);
+                return new MapRecord(schema, values, coerceTypes, dropUnknownFields);
             }
-
-            return new MapRecord(schema, values, coerceTypes, dropUnknownFields);
+        } catch (Exception e) {
+            throw new MalformedRecordException("Error while getting next record. Root cause: " +  Throwables.getRootCause(e), e);
         }
 
         return null;
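
Note that the try block spans the whole for-each loop: commons-csv's record iterator rethrows mid-stream IOExceptions as unchecked exceptions (an IllegalStateException wrapping the IOException in the 1.x line, if memory serves), which is exactly what the new catch converts into a MalformedRecordException. A standalone sketch of that failure mode, under the same assumption about the iterator's behavior:

    import java.io.StringReader;

    import org.apache.commons.csv.CSVFormat;
    import org.apache.commons.csv.CSVParser;
    import org.apache.commons.csv.CSVRecord;

    public class CsvFailureDemo {
        public static void main(String[] args) throws Exception {
            // A stray character after a closing quote makes the second row malformed.
            String csv = "\"id\",\"name\"\n\"1\"oops,\"John Doe\"\n";
            try (CSVParser parser = CSVFormat.DEFAULT.withFirstRecordAsHeader()
                    .parse(new StringReader(csv))) {
                for (CSVRecord record : parser) { // throws unchecked on the bad row
                    System.out.println(record);
                }
            } catch (RuntimeException e) {
                System.out.println("root cause: " + e.getCause()); // the underlying IOException
            }
        }
    }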
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/ITApacheCSVRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/ITApacheCSVRecordReader.java
index 30c05c0..17649cd 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/ITApacheCSVRecordReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/ITApacheCSVRecordReader.java
@@ -17,6 +17,7 @@
 package org.apache.nifi.csv;
 
 import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.QuoteMode;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.serialization.MalformedRecordException;
 import org.apache.nifi.serialization.SimpleRecordSchema;
@@ -27,22 +28,30 @@ import org.apache.nifi.serialization.record.RecordSchema;
 import org.junit.Test;
 import org.mockito.Mockito;
 
+import com.google.common.base.Throwables;
+
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.List;
 
+import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
 
 public class ITApacheCSVRecordReader {
 
     private final CSVFormat format = CSVFormat.DEFAULT.withFirstRecordAsHeader().withTrim().withQuote('"');
 
     private List<RecordField> getDefaultFields() {
+        return createStringFields(new String[]{"id", "name", "balance", "address", "city", "state", "zipCode", "country"});
+    }
+
+    private List<RecordField> createStringFields(String[] fieldNames) {
         final List<RecordField> fields = new ArrayList<>();
-        for (final String fieldName : new String[]{"id", "name", "balance", "address", "city", "state", "zipCode", "country"}) {
+        for (final String fieldName : fieldNames) {
             fields.add(new RecordField(fieldName, RecordFieldType.STRING.getDataType()));
         }
         return fields;
@@ -71,4 +80,28 @@ public class ITApacheCSVRecordReader {
             assertEquals(NUM_LINES, numRecords);
         }
     }
+
+    @Test
+    public void testExceptionThrownOnParseProblem() throws IOException, MalformedRecordException {
+        CSVFormat csvFormat = CSVFormat.DEFAULT.withFirstRecordAsHeader().withQuoteMode(QuoteMode.ALL).withTrim().withDelimiter(',');
+        final int NUM_LINES = 25;
+        StringBuilder sb = new StringBuilder("\"id\",\"name\",\"balance\"");
+        for (int i = 0; i < NUM_LINES; i++) {
+            sb.append(String.format("\"%s\",\"John Doe\",\"4750.89D\"\n", i));
+        }
+        // cause a parse problem
+        sb.append(String.format("\"%s\"dieParser,\"John Doe\",\"4750.89D\"\n", NUM_LINES ));
+        sb.append(String.format("\"%s\",\"John Doe\",\"4750.89D\"\n", NUM_LINES + 1));
+        final RecordSchema schema = new SimpleRecordSchema(createStringFields(new String[] {"id", "name", "balance"}));
+
+        try (final InputStream bais = new ByteArrayInputStream(sb.toString().getBytes());
+             final CSVRecordReader reader = new CSVRecordReader(bais, Mockito.mock(ComponentLog.class), schema, csvFormat, true, false,
+                     RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), "UTF-8")) {
+
+            while (reader.nextRecord() != null) {}
+        } catch (Exception e) {
+            assertThat(e, instanceOf(MalformedRecordException.class));
+            assertThat(Throwables.getRootCause(e), instanceOf(IOException.class));
+        }
+    }
 }
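
One caveat about the test above: because the assertions live in a catch block, the test would also pass if no exception were thrown at all. A stricter variant (a sketch reusing the same sb, schema, and csvFormat, plus a static import of org.junit.Assert.fail) fails explicitly when the expected exception never arrives:

    try (final InputStream bais = new ByteArrayInputStream(sb.toString().getBytes());
         final CSVRecordReader reader = new CSVRecordReader(bais, Mockito.mock(ComponentLog.class), schema, csvFormat, true, false,
                 RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), "UTF-8")) {

        while (reader.nextRecord() != null) {}
        fail("Expected a MalformedRecordException for the malformed row");
    } catch (MalformedRecordException e) {
        assertThat(Throwables.getRootCause(e), instanceOf(IOException.class));
    }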