You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this rendering.
Posted to commits@nifi.apache.org by ma...@apache.org on 2019/02/06 20:55:58 UTC
[nifi] branch master updated: NIFI-5983: handling parse problems in recordReader implementations
This is an automated email from the ASF dual-hosted git repository.
mattyb149 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new 24a7d48 NIFI-5983: handling parse problems in recordReader implementations
24a7d48 is described below
commit 24a7d480c8fde18a7dd64d7de80812d18eb2c5a4
Author: Endre Zoltan Kovacs <ek...@hortonworks.com>
AuthorDate: Thu Jan 31 14:47:21 2019 +0100
NIFI-5983: handling parse problems in recordReader implementations
Fixed Checkstyle violation
Signed-off-by: Matthew Burgess <ma...@apache.org>
This closes #3282
---
.../nifi-record-serialization-services/pom.xml | 5 ++
.../org/apache/nifi/avro/AvroRecordReader.java | 25 +++++---
.../java/org/apache/nifi/csv/CSVRecordReader.java | 68 ++++++++++++----------
.../apache/nifi/csv/ITApacheCSVRecordReader.java | 35 ++++++++++-
4 files changed, 93 insertions(+), 40 deletions(-)
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
index 140a74b..b16464e 100755
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
@@ -119,6 +119,11 @@
<artifactId>caffeine</artifactId>
<version>2.6.2</version>
</dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>27.0.1-jre</version>
+ </dependency>
</dependencies>
<build>
<plugins>
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordReader.java
index c1d87b6..c9a624f 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordReader.java
@@ -24,6 +24,8 @@ import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
+import com.google.common.base.Throwables;
+
import java.io.IOException;
import java.util.Map;
@@ -33,14 +35,21 @@ public abstract class AvroRecordReader implements RecordReader {
@Override
public Record nextRecord(final boolean coerceTypes, final boolean dropUnknownFields) throws IOException, MalformedRecordException {
- GenericRecord record = nextAvroRecord();
- if (record == null) {
- return null;
+ try {
+ GenericRecord record = nextAvroRecord();
+ if (record == null) {
+ return null;
+ }
+
+ final RecordSchema schema = getSchema();
+ final Map<String, Object> values = AvroTypeUtil.convertAvroRecordToMap(record, schema);
+ return new MapRecord(schema, values);
+ } catch (IOException e) {
+ throw e;
+ } catch (MalformedRecordException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new MalformedRecordException("Error while getting next record. Root cause: " + Throwables.getRootCause(e), e);
}
-
- final RecordSchema schema = getSchema();
- final Map<String, Object> values = AvroTypeUtil.convertAvroRecordToMap(record, schema);
- return new MapRecord(schema, values);
}
-
}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordReader.java
index 299ad05..22a2e8a 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordReader.java
@@ -42,6 +42,8 @@ import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
+import com.google.common.base.Throwables;
+
public class CSVRecordReader extends AbstractCSVRecordReader {
private final CSVParser csvParser;
@@ -72,45 +74,49 @@ public class CSVRecordReader extends AbstractCSVRecordReader {
@Override
public Record nextRecord(final boolean coerceTypes, final boolean dropUnknownFields) throws IOException, MalformedRecordException {
- final RecordSchema schema = getSchema();
-
- final List<RecordField> recordFields = getRecordFields();
- final int numFieldNames = recordFields.size();
-
- for (final CSVRecord csvRecord : csvParser) {
- final Map<String, Object> values = new LinkedHashMap<>(recordFields.size() * 2);
- for (int i = 0; i < csvRecord.size(); i++) {
- final String rawValue = csvRecord.get(i);
- final String rawFieldName;
- final DataType dataType;
- if (i >= numFieldNames) {
- if (!dropUnknownFields) {
- values.put("unknown_field_index_" + i, rawValue);
+ try {
+ final RecordSchema schema = getSchema();
+
+ final List<RecordField> recordFields = getRecordFields();
+ final int numFieldNames = recordFields.size();
+ for (final CSVRecord csvRecord : csvParser) {
+ final Map<String, Object> values = new LinkedHashMap<>(recordFields.size() * 2);
+ for (int i = 0; i < csvRecord.size(); i++) {
+ final String rawValue = csvRecord.get(i);
+
+ final String rawFieldName;
+ final DataType dataType;
+ if (i >= numFieldNames) {
+ if (!dropUnknownFields) {
+ values.put("unknown_field_index_" + i, rawValue);
+ }
+
+ continue;
+ } else {
+ final RecordField recordField = recordFields.get(i);
+ rawFieldName = recordField.getFieldName();
+ dataType = recordField.getDataType();
}
- continue;
- } else {
- final RecordField recordField = recordFields.get(i);
- rawFieldName = recordField.getFieldName();
- dataType = recordField.getDataType();
- }
+ final Object value;
+ if (coerceTypes) {
+ value = convert(rawValue, dataType, rawFieldName);
+ } else {
+ // The CSV Reader is going to return all fields as Strings, because CSV doesn't have any way to
+ // dictate a field type. As a result, we will use the schema that we have to attempt to convert
+ // the value into the desired type if it's a simple type.
+ value = convertSimpleIfPossible(rawValue, dataType, rawFieldName);
+ }
- final Object value;
- if (coerceTypes) {
- value = convert(rawValue, dataType, rawFieldName);
- } else {
- // The CSV Reader is going to return all fields as Strings, because CSV doesn't have any way to
- // dictate a field type. As a result, we will use the schema that we have to attempt to convert
- // the value into the desired type if it's a simple type.
- value = convertSimpleIfPossible(rawValue, dataType, rawFieldName);
+ values.put(rawFieldName, value);
}
- values.put(rawFieldName, value);
+ return new MapRecord(schema, values, coerceTypes, dropUnknownFields);
}
-
- return new MapRecord(schema, values, coerceTypes, dropUnknownFields);
+ } catch (Exception e) {
+ throw new MalformedRecordException("Error while getting next record. Root cause: " + Throwables.getRootCause(e), e);
}
return null;
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/ITApacheCSVRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/ITApacheCSVRecordReader.java
index 30c05c0..17649cd 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/ITApacheCSVRecordReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/ITApacheCSVRecordReader.java
@@ -17,6 +17,7 @@
package org.apache.nifi.csv;
import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.QuoteMode;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.SimpleRecordSchema;
@@ -27,22 +28,30 @@ import org.apache.nifi.serialization.record.RecordSchema;
import org.junit.Test;
import org.mockito.Mockito;
+import com.google.common.base.Throwables;
+
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
+import static org.hamcrest.CoreMatchers.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
public class ITApacheCSVRecordReader {
private final CSVFormat format = CSVFormat.DEFAULT.withFirstRecordAsHeader().withTrim().withQuote('"');
private List<RecordField> getDefaultFields() {
+ return createStringFields(new String[]{"id", "name", "balance", "address", "city", "state", "zipCode", "country"});
+ }
+
+ private List<RecordField> createStringFields(String[] fieldNames) {
final List<RecordField> fields = new ArrayList<>();
- for (final String fieldName : new String[]{"id", "name", "balance", "address", "city", "state", "zipCode", "country"}) {
+ for (final String fieldName : fieldNames) {
fields.add(new RecordField(fieldName, RecordFieldType.STRING.getDataType()));
}
return fields;
@@ -71,4 +80,28 @@ public class ITApacheCSVRecordReader {
assertEquals(NUM_LINES, numRecords);
}
}
+
+ @Test
+ public void testExceptionThrownOnParseProblem() throws IOException, MalformedRecordException {
+ CSVFormat csvFormat = CSVFormat.DEFAULT.withFirstRecordAsHeader().withQuoteMode(QuoteMode.ALL).withTrim().withDelimiter(',');
+ final int NUM_LINES = 25;
+ StringBuilder sb = new StringBuilder("\"id\",\"name\",\"balance\"\n");
+ for (int i = 0; i < NUM_LINES; i++) {
+ sb.append(String.format("\"%s\",\"John Doe\",\"4750.89D\"\n", i));
+ }
+ // text after a closing quote ("25"dieParser) makes commons-csv fail with an IOException
+ sb.append(String.format("\"%s\"dieParser,\"John Doe\",\"4750.89D\"\n", NUM_LINES));
+ sb.append(String.format("\"%s\",\"John Doe\",\"4750.89D\"\n", NUM_LINES + 1));
+ final RecordSchema schema = new SimpleRecordSchema(createStringFields(new String[] {"id", "name", "balance"}));
+ // the reader must surface that failure as a MalformedRecordException whose root cause is the IOException
+ try (final InputStream bais = new ByteArrayInputStream(sb.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8));
+ final CSVRecordReader reader = new CSVRecordReader(bais, Mockito.mock(ComponentLog.class), schema, csvFormat, true, false,
+ RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), "UTF-8")) {
+ while (reader.nextRecord() != null) {}
+ org.junit.Assert.fail("Expected a MalformedRecordException for the malformed line");
+ } catch (Exception e) {
+ assertThat(e, instanceOf(MalformedRecordException.class));
+ assertThat(Throwables.getRootCause(e), instanceOf(IOException.class));
+ }
+ }
}