You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ij...@apache.org on 2019/03/14 00:34:31 UTC
[nifi] 02/02: NIFI-6082: Refactor the way to handle fields nullable
This is an automated email from the ASF dual-hosted git repository.
ijokarumawak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
commit 4db5446c878a9be1d621429686f2835dc642d550
Author: Koji Kawamura <ij...@apache.org>
AuthorDate: Wed Mar 13 16:50:01 2019 +0900
NIFI-6082: Refactor the way to handle fields nullable
- Make enriched fields nullable in LookupRecord.
- Remove the unnecessary AvroConversionOptions and reader schema creation,
because ResultSetRecordSet can generate the NiFi record schema directly
from the ResultSet; no Avro schema is needed for that.
---
.../serialization/record/ResultSetRecordSet.java | 18 ++--------
.../nifi/processors/standard/LookupRecord.java | 11 ++++--
.../nifi/processors/standard/TestLookupRecord.java | 40 ++++++++++++++++++++++
.../lookup/db/DatabaseRecordLookupService.java | 18 +---------
4 files changed, 53 insertions(+), 34 deletions(-)
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
index fc3d60f..ee47c63 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
@@ -55,21 +55,9 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
private static final String FLOAT_CLASS_NAME = Float.class.getName();
public ResultSetRecordSet(final ResultSet rs, final RecordSchema readerSchema) throws SQLException {
- this(rs, readerSchema, false);
- }
-
- /**
- * Constructs a ResultSetRecordSet with a given ResultSet and RecordSchema
- *
- * @param rs The underlying ResultSet for this RecordSet
- * @param readerSchema The schema to which this RecordSet adheres
- * @param allFieldsNullable Whether to override the database column's "nullable" metadata. If true then all fields in the RecordSet are nullable.
- * @throws SQLException if an error occurs while creating the schema or reading the result set's metadata
- */
- public ResultSetRecordSet(final ResultSet rs, final RecordSchema readerSchema, boolean allFieldsNullable) throws SQLException {
this.rs = rs;
moreRows = rs.next();
- this.schema = createSchema(rs, readerSchema, allFieldsNullable);
+ this.schema = createSchema(rs, readerSchema);
rsColumnNames = new HashSet<>();
final ResultSetMetaData metadata = rs.getMetaData();
@@ -152,7 +140,7 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
return value;
}
- private static RecordSchema createSchema(final ResultSet rs, final RecordSchema readerSchema, boolean allFieldsNullable) throws SQLException {
+ private static RecordSchema createSchema(final ResultSet rs, final RecordSchema readerSchema) throws SQLException {
final ResultSetMetaData metadata = rs.getMetaData();
final int numCols = metadata.getColumnCount();
final List<RecordField> fields = new ArrayList<>(numCols);
@@ -166,7 +154,7 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
final int nullableFlag = metadata.isNullable(column);
final boolean nullable;
- if (nullableFlag == ResultSetMetaData.columnNoNulls && !allFieldsNullable) {
+ if (nullableFlag == ResultSetMetaData.columnNoNulls) {
nullable = false;
} else {
nullable = true;
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java
index 96a8d3e..23d1325 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java
@@ -316,7 +316,7 @@ public class LookupRecord extends AbstractRouteRecord<Tuple<Map<String, RecordPa
if (RESULT_RECORD_FIELDS.getValue().equals(resultContentsValue) && lookupValue instanceof Record) {
final Record lookupRecord = (Record) lookupValue;
- // Use wants to add all fields of the resultant Record to the specified Record Path.
+ // User wants to add all fields of the resultant Record to the specified Record Path.
// If the destination Record Path returns to us a Record, then we will add all field values of
// the Lookup Record to the destination Record. However, if the destination Record Path returns
// something other than a Record, then we can't add the fields to it. We can only replace it,
@@ -332,7 +332,14 @@ public class LookupRecord extends AbstractRouteRecord<Tuple<Map<String, RecordPa
final Optional<RecordField> recordFieldOption = lookupRecord.getSchema().getField(fieldName);
if (recordFieldOption.isPresent()) {
- destinationRecord.setValue(recordFieldOption.get(), value);
+ // Even if the looked up field is not nullable, if the lookup key didn't match with any record,
+ // and matched/unmatched records are written to the same FlowFile routed to 'success' relationship,
+ // then enriched fields should be nullable to support unmatched records whose enriched fields will be null.
+ RecordField field = recordFieldOption.get();
+ if (!routeToMatchedUnmatched && !field.isNullable()) {
+ field = new RecordField(field.getFieldName(), field.getDataType(), field.getDefaultValue(), field.getAliases(), true);
+ }
+ destinationRecord.setValue(field, value);
} else {
destinationRecord.setValue(fieldName, value);
}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java
index 3efd9d1..f8fb158 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java
@@ -395,6 +395,46 @@ public class TestLookupRecord {
|| outputContents.equals("John Doe,48,MapRecord[{least=soccer, favorite=basketball}]\n"));
}
+ @Test
+ public void testAddFieldsToExistingRecordRouteToSuccess() throws InitializationException {
+ final RecordLookup lookupService = new RecordLookup();
+ runner.addControllerService("lookup", lookupService);
+ runner.enableControllerService(lookupService);
+ runner.setProperty(LookupRecord.ROUTING_STRATEGY, LookupRecord.ROUTE_TO_SUCCESS);
+
+ // Even if the looked up record's original schema is not nullable, the result record's enriched fields should be nullable.
+ final List<RecordField> fields = new ArrayList<>();
+ fields.add(new RecordField("favorite", RecordFieldType.STRING.getDataType(), false));
+ fields.add(new RecordField("least", RecordFieldType.STRING.getDataType(), true));
+ final RecordSchema schema = new SimpleRecordSchema(fields);
+ final Record sports = new MapRecord(schema, new HashMap<>());
+
+ sports.setValue("favorite", "basketball");
+ sports.setValue("least", "soccer");
+
+ lookupService.addValue("John Doe", sports);
+
+ // Incoming Record doesn't have the fields to be enriched.
+ recordReader = new MockRecordParser();
+ recordReader.addSchemaField("name", RecordFieldType.STRING);
+ recordReader.addSchemaField("age", RecordFieldType.INT);
+
+ recordReader.addRecord("John Doe", 48);
+ recordReader.addRecord("Jane Doe", 47);
+
+ runner.addControllerService("reader", recordReader);
+ runner.enableControllerService(recordReader);
+
+ runner.setProperty("lookup", "/name");
+ runner.setProperty(LookupRecord.RESULT_RECORD_PATH, "/");
+ runner.setProperty(LookupRecord.RESULT_CONTENTS, LookupRecord.RESULT_RECORD_FIELDS);
+
+ runner.enqueue("");
+ runner.run();
+
+ final MockFlowFile out = runner.getFlowFilesForRelationship(LookupRecord.REL_SUCCESS).get(0);
+ out.assertContentEquals("John Doe,48,soccer,basketball\nJane Doe,47\n");
+ }
private static class MapLookup extends AbstractControllerService implements StringLookupService {
private final Map<String, String> values = new HashMap<>();
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/db/DatabaseRecordLookupService.java b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/db/DatabaseRecordLookupService.java
index fdb1452..b176b33 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/db/DatabaseRecordLookupService.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/db/DatabaseRecordLookupService.java
@@ -19,12 +19,10 @@ package org.apache.nifi.lookup.db;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Expiry;
-import org.apache.avro.Schema;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
-import org.apache.nifi.avro.AvroTypeUtil;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.ControllerServiceInitializationContext;
@@ -34,10 +32,8 @@ import org.apache.nifi.lookup.LookupFailureException;
import org.apache.nifi.lookup.RecordLookupService;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.serialization.record.Record;
-import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.ResultSetRecordSet;
import org.apache.nifi.util.Tuple;
-import org.apache.nifi.util.db.JdbcCommon;
import java.io.IOException;
import java.sql.Connection;
@@ -62,7 +58,6 @@ import java.util.stream.Stream;
public class DatabaseRecordLookupService extends AbstractDatabaseLookupService implements RecordLookupService {
private volatile Cache<Tuple<String, Object>, Record> cache;
- private volatile JdbcCommon.AvroConversionOptions options;
static final PropertyDescriptor LOOKUP_VALUE_COLUMNS = new PropertyDescriptor.Builder()
.name("dbrecord-lookup-value-columns")
@@ -120,15 +115,6 @@ public class DatabaseRecordLookupService extends AbstractDatabaseLookupService i
.build();
}
}
-
- options = JdbcCommon.AvroConversionOptions.builder()
- .recordName("NiFi_DB_Record_Lookup")
- // Ignore duplicates
- .maxRows(1)
- // Keep column names as field names
- .convertNames(false)
- .useLogicalTypes(true)
- .build();
}
@Override
@@ -173,9 +159,7 @@ public class DatabaseRecordLookupService extends AbstractDatabaseLookupService i
st.setObject(1, key);
ResultSet resultSet = st.executeQuery();
- final Schema avroSchema = JdbcCommon.createSchema(resultSet, options);
- final RecordSchema recordAvroSchema = AvroTypeUtil.createSchema(avroSchema);
- ResultSetRecordSet resultSetRecordSet = new ResultSetRecordSet(resultSet, recordAvroSchema, true);
+ ResultSetRecordSet resultSetRecordSet = new ResultSetRecordSet(resultSet, null);
foundRecord = resultSetRecordSet.next();
// Populate the cache if the record is present