You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ij...@apache.org on 2019/03/14 00:34:31 UTC

[nifi] 02/02: NIFI-6082: Refactor the way to handle fields nullable

This is an automated email from the ASF dual-hosted git repository.

ijokarumawak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit 4db5446c878a9be1d621429686f2835dc642d550
Author: Koji Kawamura <ij...@apache.org>
AuthorDate: Wed Mar 13 16:50:01 2019 +0900

    NIFI-6082: Refactor the way to handle fields nullable
    
    - Make enriched fields nullable in LookupRecord.
    - Removed the unnecessary AvroConversionOptions and reader schema creation,
    because ResultSetRecordSet can generate the NiFi Record Schema from the
    ResultSet directly. No Avro schema is needed to do that.
---
 .../serialization/record/ResultSetRecordSet.java   | 18 ++--------
 .../nifi/processors/standard/LookupRecord.java     | 11 ++++--
 .../nifi/processors/standard/TestLookupRecord.java | 40 ++++++++++++++++++++++
 .../lookup/db/DatabaseRecordLookupService.java     | 18 +---------
 4 files changed, 53 insertions(+), 34 deletions(-)

diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
index fc3d60f..ee47c63 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
@@ -55,21 +55,9 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
     private static final String FLOAT_CLASS_NAME = Float.class.getName();
 
     public ResultSetRecordSet(final ResultSet rs, final RecordSchema readerSchema) throws SQLException {
-        this(rs, readerSchema, false);
-    }
-
-    /**
-     * Constructs a ResultSetRecordSet with a given ResultSet and RecordSchema
-     *
-     * @param rs The underlying ResultSet for this RecordSet
-     * @param readerSchema The schema to which this RecordSet adheres
-     * @param allFieldsNullable Whether to override the database column's "nullable" metadata. If true then all fields in the RecordSet are nullable.
-     * @throws SQLException if an error occurs while creating the schema or reading the result set's metadata
-     */
-    public ResultSetRecordSet(final ResultSet rs, final RecordSchema readerSchema, boolean allFieldsNullable) throws SQLException {
         this.rs = rs;
         moreRows = rs.next();
-        this.schema = createSchema(rs, readerSchema, allFieldsNullable);
+        this.schema = createSchema(rs, readerSchema);
 
         rsColumnNames = new HashSet<>();
         final ResultSetMetaData metadata = rs.getMetaData();
@@ -152,7 +140,7 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
         return value;
     }
 
-    private static RecordSchema createSchema(final ResultSet rs, final RecordSchema readerSchema, boolean allFieldsNullable) throws SQLException {
+    private static RecordSchema createSchema(final ResultSet rs, final RecordSchema readerSchema) throws SQLException {
         final ResultSetMetaData metadata = rs.getMetaData();
         final int numCols = metadata.getColumnCount();
         final List<RecordField> fields = new ArrayList<>(numCols);
@@ -166,7 +154,7 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
 
             final int nullableFlag = metadata.isNullable(column);
             final boolean nullable;
-            if (nullableFlag == ResultSetMetaData.columnNoNulls && !allFieldsNullable) {
+            if (nullableFlag == ResultSetMetaData.columnNoNulls) {
                 nullable = false;
             } else {
                 nullable = true;
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java
index 96a8d3e..23d1325 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java
@@ -316,7 +316,7 @@ public class LookupRecord extends AbstractRouteRecord<Tuple<Map<String, RecordPa
             if (RESULT_RECORD_FIELDS.getValue().equals(resultContentsValue) && lookupValue instanceof Record) {
                 final Record lookupRecord = (Record) lookupValue;
 
-                // Use wants to add all fields of the resultant Record to the specified Record Path.
+                // User wants to add all fields of the resultant Record to the specified Record Path.
                 // If the destination Record Path returns to us a Record, then we will add all field values of
                 // the Lookup Record to the destination Record. However, if the destination Record Path returns
                 // something other than a Record, then we can't add the fields to it. We can only replace it,
@@ -332,7 +332,14 @@ public class LookupRecord extends AbstractRouteRecord<Tuple<Map<String, RecordPa
 
                             final Optional<RecordField> recordFieldOption = lookupRecord.getSchema().getField(fieldName);
                             if (recordFieldOption.isPresent()) {
-                                destinationRecord.setValue(recordFieldOption.get(), value);
+                                // Even if the looked up field is not nullable, if the lookup key didn't match with any record,
+                                // and matched/unmatched records are written to the same FlowFile routed to 'success' relationship,
+                                // then enriched fields should be nullable to support unmatched records whose enriched fields will be null.
+                                RecordField field = recordFieldOption.get();
+                                if (!routeToMatchedUnmatched && !field.isNullable()) {
+                                    field = new RecordField(field.getFieldName(), field.getDataType(), field.getDefaultValue(), field.getAliases(), true);
+                                }
+                                destinationRecord.setValue(field, value);
                             } else {
                                 destinationRecord.setValue(fieldName, value);
                             }
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java
index 3efd9d1..f8fb158 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java
@@ -395,6 +395,46 @@ public class TestLookupRecord {
             || outputContents.equals("John Doe,48,MapRecord[{least=soccer, favorite=basketball}]\n"));
     }
 
+    @Test
+    public void testAddFieldsToExistingRecordRouteToSuccess() throws InitializationException {
+        final RecordLookup lookupService = new RecordLookup();
+        runner.addControllerService("lookup", lookupService);
+        runner.enableControllerService(lookupService);
+        runner.setProperty(LookupRecord.ROUTING_STRATEGY, LookupRecord.ROUTE_TO_SUCCESS);
+
+        // Even if the looked up record's original schema is not nullable, the result record's enriched fields should be nullable.
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("favorite", RecordFieldType.STRING.getDataType(), false));
+        fields.add(new RecordField("least", RecordFieldType.STRING.getDataType(), true));
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+        final Record sports = new MapRecord(schema, new HashMap<>());
+
+        sports.setValue("favorite", "basketball");
+        sports.setValue("least", "soccer");
+
+        lookupService.addValue("John Doe", sports);
+
+        // Incoming Record doesn't have the fields to be enriched.
+        recordReader = new MockRecordParser();
+        recordReader.addSchemaField("name", RecordFieldType.STRING);
+        recordReader.addSchemaField("age", RecordFieldType.INT);
+
+        recordReader.addRecord("John Doe", 48);
+        recordReader.addRecord("Jane Doe", 47);
+
+        runner.addControllerService("reader", recordReader);
+        runner.enableControllerService(recordReader);
+
+        runner.setProperty("lookup", "/name");
+        runner.setProperty(LookupRecord.RESULT_RECORD_PATH, "/");
+        runner.setProperty(LookupRecord.RESULT_CONTENTS, LookupRecord.RESULT_RECORD_FIELDS);
+
+        runner.enqueue("");
+        runner.run();
+
+        final MockFlowFile out = runner.getFlowFilesForRelationship(LookupRecord.REL_SUCCESS).get(0);
+        out.assertContentEquals("John Doe,48,soccer,basketball\nJane Doe,47\n");
+    }
 
     private static class MapLookup extends AbstractControllerService implements StringLookupService {
         private final Map<String, String> values = new HashMap<>();
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/db/DatabaseRecordLookupService.java b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/db/DatabaseRecordLookupService.java
index fdb1452..b176b33 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/db/DatabaseRecordLookupService.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/db/DatabaseRecordLookupService.java
@@ -19,12 +19,10 @@ package org.apache.nifi.lookup.db;
 import com.github.benmanes.caffeine.cache.Cache;
 import com.github.benmanes.caffeine.cache.Caffeine;
 import com.github.benmanes.caffeine.cache.Expiry;
-import org.apache.avro.Schema;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnEnabled;
-import org.apache.nifi.avro.AvroTypeUtil;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.controller.ControllerServiceInitializationContext;
@@ -34,10 +32,8 @@ import org.apache.nifi.lookup.LookupFailureException;
 import org.apache.nifi.lookup.RecordLookupService;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.serialization.record.Record;
-import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.serialization.record.ResultSetRecordSet;
 import org.apache.nifi.util.Tuple;
-import org.apache.nifi.util.db.JdbcCommon;
 
 import java.io.IOException;
 import java.sql.Connection;
@@ -62,7 +58,6 @@ import java.util.stream.Stream;
 public class DatabaseRecordLookupService extends AbstractDatabaseLookupService implements RecordLookupService {
 
     private volatile Cache<Tuple<String, Object>, Record> cache;
-    private volatile JdbcCommon.AvroConversionOptions options;
 
     static final PropertyDescriptor LOOKUP_VALUE_COLUMNS = new PropertyDescriptor.Builder()
             .name("dbrecord-lookup-value-columns")
@@ -120,15 +115,6 @@ public class DatabaseRecordLookupService extends AbstractDatabaseLookupService i
                         .build();
             }
         }
-
-        options = JdbcCommon.AvroConversionOptions.builder()
-                .recordName("NiFi_DB_Record_Lookup")
-                // Ignore duplicates
-                .maxRows(1)
-                // Keep column names as field names
-                .convertNames(false)
-                .useLogicalTypes(true)
-                .build();
     }
 
     @Override
@@ -173,9 +159,7 @@ public class DatabaseRecordLookupService extends AbstractDatabaseLookupService i
 
                 st.setObject(1, key);
                 ResultSet resultSet = st.executeQuery();
-                final Schema avroSchema = JdbcCommon.createSchema(resultSet, options);
-                final RecordSchema recordAvroSchema = AvroTypeUtil.createSchema(avroSchema);
-                ResultSetRecordSet resultSetRecordSet = new ResultSetRecordSet(resultSet, recordAvroSchema, true);
+                ResultSetRecordSet resultSetRecordSet = new ResultSetRecordSet(resultSet, null);
                 foundRecord = resultSetRecordSet.next();
 
                 // Populate the cache if the record is present