You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2022/06/21 21:55:14 UTC

[nifi] branch main updated: NIFI-9983 Add output relation name attribute in QueryRecord processor (#6011)

This is an automated email from the ASF dual-hosted git repository.

markap14 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new bc84532a8b NIFI-9983 Add output relation name attribute in QueryRecord processor (#6011)
bc84532a8b is described below

commit bc84532a8b27eae2e7f3ab284473c1cbc9035be6
Author: Juldrixx <31...@users.noreply.github.com>
AuthorDate: Tue Jun 21 23:55:07 2022 +0200

    NIFI-9983 Add output relation name attribute in QueryRecord processor (#6011)
    
    * NIFI-9983 Add output relation name attribute
---
 .../nifi/processors/standard/QueryRecord.java      | 16 +++++--
 .../nifi/processors/standard/TestQueryRecord.java  | 54 ++++++++++++++++++++++
 2 files changed, 67 insertions(+), 3 deletions(-)

diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java
index aec971aa52..f87e175eb9 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java
@@ -116,9 +116,13 @@ import static org.apache.nifi.util.db.JdbcProperties.DEFAULT_SCALE;
                          + "that is selected being routed to the relationship whose name is the property name")
 @WritesAttributes({
     @WritesAttribute(attribute = "mime.type", description = "Sets the mime.type attribute to the MIME Type specified by the Record Writer"),
-    @WritesAttribute(attribute = "record.count", description = "The number of records selected by the query")
+    @WritesAttribute(attribute = "record.count", description = "The number of records selected by the query"),
+    @WritesAttribute(attribute = QueryRecord.ROUTE_ATTRIBUTE_KEY, description = "The relation to which the FlowFile was routed")
 })
 public class QueryRecord extends AbstractProcessor {
+
+    public static final String ROUTE_ATTRIBUTE_KEY = "QueryRecord.Route";
+
     static final PropertyDescriptor RECORD_READER_FACTORY = new PropertyDescriptor.Builder()
         .name("record-reader")
         .displayName("Record Reader")
@@ -258,7 +262,7 @@ public class QueryRecord extends AbstractProcessor {
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) {
-        final FlowFile original = session.get();
+        FlowFile original = session.get();
         if (original == null) {
             return;
         }
@@ -284,6 +288,7 @@ public class QueryRecord extends AbstractProcessor {
             writerSchema = recordSetWriterFactory.getSchema(originalAttributes, readerSchema);
         } catch (final Exception e) {
             getLogger().error("Failed to determine Record Schema from {}; routing to failure", new Object[] {original, e});
+            original = session.putAttribute(original, ROUTE_ATTRIBUTE_KEY, REL_FAILURE.getName());
             session.transfer(original, REL_FAILURE);
             return;
         }
@@ -312,6 +317,7 @@ public class QueryRecord extends AbstractProcessor {
                     final QueryResult queryResult = query(session, original, readerSchema, sql, recordReaderFactory);
 
                     final AtomicReference<String> mimeTypeRef = new AtomicReference<>();
+                    final FlowFile originalFlowFile = original;
                     try {
                         final ResultSet rs = queryResult.getResultSet();
                         transformed = session.write(transformed, new OutputStreamCallback() {
@@ -328,7 +334,7 @@ public class QueryRecord extends AbstractProcessor {
                                     throw new ProcessException(e);
                                 }
 
-                                try (final RecordSetWriter resultSetWriter = recordSetWriterFactory.createWriter(getLogger(), writeSchema, out, original)) {
+                                try (final RecordSetWriter resultSetWriter = recordSetWriterFactory.createWriter(getLogger(), writeSchema, out, originalFlowFile)) {
                                     writeResultRef.set(resultSetWriter.write(recordSet));
                                     mimeTypeRef.set(resultSetWriter.getMimeType());
                                 } catch (final Exception e) {
@@ -355,6 +361,7 @@ public class QueryRecord extends AbstractProcessor {
 
                         attributesToAdd.put(CoreAttributes.MIME_TYPE.key(), mimeTypeRef.get());
                         attributesToAdd.put("record.count", String.valueOf(result.getRecordCount()));
+                        attributesToAdd.put(ROUTE_ATTRIBUTE_KEY, relationship.getName());
                         transformed = session.putAllAttributes(transformed, attributesToAdd);
                         transformedFlowFiles.put(transformed, relationship);
 
@@ -382,13 +389,16 @@ public class QueryRecord extends AbstractProcessor {
             }
 
             getLogger().info("Successfully queried {} in {} millis", new Object[] {original, elapsedMillis});
+            original = session.putAttribute(original, ROUTE_ATTRIBUTE_KEY, REL_ORIGINAL.getName());
             session.transfer(original, REL_ORIGINAL);
         } catch (final SQLException e) {
             getLogger().error("Unable to query {} due to {}", new Object[] {original, e.getCause() == null ? e : e.getCause()});
+            original = session.putAttribute(original, ROUTE_ATTRIBUTE_KEY, REL_FAILURE.getName());
             session.remove(createdFlowFiles);
             session.transfer(original, REL_FAILURE);
         } catch (final Exception e) {
             getLogger().error("Unable to query {} due to {}", new Object[] {original, e});
+            original = session.putAttribute(original, ROUTE_ATTRIBUTE_KEY, REL_FAILURE.getName());
             session.remove(createdFlowFiles);
             session.transfer(original, REL_FAILURE);
         }
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java
index 765fececee..341487b89a 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java
@@ -128,6 +128,11 @@ public class TestQueryRecord {
 
         runner.assertTransferCount(REL_NAME, 1);
 
+        final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(REL_NAME);
+        flowFiles.get(0).assertAttributeEquals(QueryRecord.ROUTE_ATTRIBUTE_KEY, REL_NAME);
+        final List<MockFlowFile> flowFilesOriginal = runner.getFlowFilesForRelationship(QueryRecord.REL_ORIGINAL);
+        flowFilesOriginal.get(0).assertAttributeEquals(QueryRecord.ROUTE_ATTRIBUTE_KEY, QueryRecord.REL_ORIGINAL.getName());
+
         final List<Record> written = writer.getRecordsWritten();
         assertEquals(1, written.size());
 
@@ -182,6 +187,9 @@ public class TestQueryRecord {
 
         runner.assertTransferCount(REL_NAME, 1);
 
+        final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(REL_NAME);
+        flowFiles.get(0).assertAttributeEquals(QueryRecord.ROUTE_ATTRIBUTE_KEY, REL_NAME);
+
         final List<Record> written = writer.getRecordsWritten();
         assertEquals(50, written.size());
 
@@ -222,6 +230,9 @@ public class TestQueryRecord {
 
         runner.assertTransferCount(REL_NAME, 1);
 
+        final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(REL_NAME);
+        flowFiles.get(0).assertAttributeEquals(QueryRecord.ROUTE_ATTRIBUTE_KEY, REL_NAME);
+
         final List<Record> written = writer.getRecordsWritten();
         assertEquals(1, written.size());
 
@@ -257,6 +268,9 @@ public class TestQueryRecord {
 
         runner.assertTransferCount(REL_NAME, 1);
 
+        final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(REL_NAME);
+        flowFiles.get(0).assertAttributeEquals(QueryRecord.ROUTE_ATTRIBUTE_KEY, REL_NAME);
+
         final List<Record> written = writer.getRecordsWritten();
         assertEquals(1, written.size());
 
@@ -295,6 +309,9 @@ public class TestQueryRecord {
 
         runner.assertTransferCount(REL_NAME, 1);
 
+        final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(REL_NAME);
+        flowFiles.get(0).assertAttributeEquals(QueryRecord.ROUTE_ATTRIBUTE_KEY, REL_NAME);
+
         final List<Record> written = writer.getRecordsWritten();
         assertEquals(1, written.size());
 
@@ -335,6 +352,9 @@ public class TestQueryRecord {
 
         runner.assertTransferCount(REL_NAME, 1);
 
+        final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(REL_NAME);
+        flowFiles.get(0).assertAttributeEquals(QueryRecord.ROUTE_ATTRIBUTE_KEY, REL_NAME);
+
         final List<Record> written = writer.getRecordsWritten();
         assertEquals(1, written.size());
 
@@ -378,6 +398,9 @@ public class TestQueryRecord {
 
         runner.assertTransferCount(REL_NAME, 1);
 
+        final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(REL_NAME);
+        flowFiles.get(0).assertAttributeEquals(QueryRecord.ROUTE_ATTRIBUTE_KEY, REL_NAME);
+
         final List<Record> written = writer.getRecordsWritten();
         assertEquals(1, written.size());
 
@@ -417,6 +440,9 @@ public class TestQueryRecord {
 
         runner.assertTransferCount(REL_NAME, 1);
 
+        final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(REL_NAME);
+        flowFiles.get(0).assertAttributeEquals(QueryRecord.ROUTE_ATTRIBUTE_KEY, REL_NAME);
+
         final List<Record> written = writer.getRecordsWritten();
         assertEquals(2, written.size());
 
@@ -458,6 +484,9 @@ public class TestQueryRecord {
 
         runner.assertTransferCount(REL_NAME, 1);
 
+        final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(REL_NAME);
+        flowFiles.get(0).assertAttributeEquals(QueryRecord.ROUTE_ATTRIBUTE_KEY, REL_NAME);
+
         final List<Record> written = writer.getRecordsWritten();
         assertEquals(3, written.size());
 
@@ -518,6 +547,9 @@ public class TestQueryRecord {
 
         runner.assertTransferCount(REL_NAME, 1);
 
+        final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(REL_NAME);
+        flowFiles.get(0).assertAttributeEquals(QueryRecord.ROUTE_ATTRIBUTE_KEY, REL_NAME);
+
         final List<Record> written = writer.getRecordsWritten();
         assertEquals(1, written.size());
 
@@ -554,6 +586,9 @@ public class TestQueryRecord {
 
         runner.assertTransferCount(REL_NAME, 1);
 
+        final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(REL_NAME);
+        flowFiles.get(0).assertAttributeEquals(QueryRecord.ROUTE_ATTRIBUTE_KEY, REL_NAME);
+
         final List<Record> written = writer.getRecordsWritten();
         assertEquals(1, written.size());
 
@@ -590,6 +625,9 @@ public class TestQueryRecord {
 
         runner.assertTransferCount(REL_NAME, 1);
 
+        final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(REL_NAME);
+        flowFiles.get(0).assertAttributeEquals(QueryRecord.ROUTE_ATTRIBUTE_KEY, REL_NAME);
+
         final List<Record> written = writer.getRecordsWritten();
         assertEquals(1, written.size());
 
@@ -627,6 +665,9 @@ public class TestQueryRecord {
 
         runner.assertTransferCount(REL_NAME, 1);
 
+        final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(REL_NAME);
+        flowFiles.get(0).assertAttributeEquals(QueryRecord.ROUTE_ATTRIBUTE_KEY, REL_NAME);
+
         final List<Record> written = writer.getRecordsWritten();
         assertEquals(1, written.size());
 
@@ -871,6 +912,7 @@ public class TestQueryRecord {
 
         runner.assertTransferCount(REL_NAME, 1);
         final MockFlowFile out = runner.getFlowFilesForRelationship(REL_NAME).get(0);
+        out.assertAttributeEquals(QueryRecord.ROUTE_ATTRIBUTE_KEY, REL_NAME);
         System.out.println(new String(out.toByteArray()));
         out.assertContentEquals("\"name\",\"points\"\n\"Tom\",\"49\"\n");
     }
@@ -906,6 +948,7 @@ public class TestQueryRecord {
 
         runner.assertTransferCount(REL_NAME, 1);
         final MockFlowFile out = runner.getFlowFilesForRelationship(REL_NAME).get(0);
+        out.assertAttributeEquals(QueryRecord.ROUTE_ATTRIBUTE_KEY, REL_NAME);
         System.out.println(new String(out.toByteArray()));
         out.assertContentEquals("\"name\",\"points\"\n\"Tom\",\"49\"\n\"Alice\",\n,\"36\"\n");
     }
@@ -939,6 +982,7 @@ public class TestQueryRecord {
 
         runner.assertTransferCount(REL_NAME, 1);
         final MockFlowFile out = runner.getFlowFilesForRelationship(REL_NAME).get(0);
+        out.assertAttributeEquals(QueryRecord.ROUTE_ATTRIBUTE_KEY, REL_NAME);
         System.out.println(new String(out.toByteArray()));
         out.assertContentEquals("\"name\",\"points\"\n\"Tom\",\"49\"\n");
     }
@@ -967,6 +1011,7 @@ public class TestQueryRecord {
         runner.run();
         runner.assertTransferCount(REL_NAME, 1);
         final MockFlowFile out = runner.getFlowFilesForRelationship(REL_NAME).get(0);
+        out.assertAttributeEquals(QueryRecord.ROUTE_ATTRIBUTE_KEY, REL_NAME);
         System.out.println(new String(out.toByteArray()));
         out.assertContentEquals("\"name\",\"age\"\n");
     }
@@ -1002,6 +1047,7 @@ public class TestQueryRecord {
 
         runner.assertTransferCount(REL_NAME, 1);
         final MockFlowFile out = runner.getFlowFilesForRelationship(REL_NAME).get(0);
+        out.assertAttributeEquals(QueryRecord.ROUTE_ATTRIBUTE_KEY, REL_NAME);
 
         out.assertContentEquals("\"NAME\",\"POINTS\"\n\"100\",\"90.75\"\n");
     }
@@ -1035,6 +1081,8 @@ public class TestQueryRecord {
         runner.run();
 
         runner.assertAllFlowFilesTransferred(QueryRecord.REL_FAILURE, 1);
+        final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(QueryRecord.REL_FAILURE);
+        flowFiles.get(0).assertAttributeEquals(QueryRecord.ROUTE_ATTRIBUTE_KEY, QueryRecord.REL_FAILURE.getName());
     }
 
     @Test
@@ -1063,6 +1111,7 @@ public class TestQueryRecord {
 
         runner.assertTransferCount(REL_NAME, 1);
         final MockFlowFile flowFileOut = runner.getFlowFilesForRelationship(REL_NAME).get(0);
+        flowFileOut.assertAttributeEquals(QueryRecord.ROUTE_ATTRIBUTE_KEY, REL_NAME);
         flowFileOut.assertContentEquals("\"name\",\"points\"\n\"Tom\",\"100\"\n\"Jerry\",\"2\"\n");
     }
 
@@ -1095,9 +1144,11 @@ public class TestQueryRecord {
         runner.assertTransferCount(REL_NAME, 1);
         runner.assertTransferCount("count", 1);
         final MockFlowFile flowFileOut = runner.getFlowFilesForRelationship(REL_NAME).get(0);
+        flowFileOut.assertAttributeEquals(QueryRecord.ROUTE_ATTRIBUTE_KEY, REL_NAME);
         flowFileOut.assertContentEquals("1\n\n\n3\n");
 
         final MockFlowFile countFlowFile = runner.getFlowFilesForRelationship("count").get(0);
+        countFlowFile.assertAttributeEquals(QueryRecord.ROUTE_ATTRIBUTE_KEY, "count");
         countFlowFile.assertContentEquals("2\n");
     }
 
@@ -1132,6 +1183,8 @@ public class TestQueryRecord {
         runner.run();
 
         runner.assertTransferCount(REL_NAME, 1);
+        final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(REL_NAME);
+        flowFiles.get(0).assertAttributeEquals(QueryRecord.ROUTE_ATTRIBUTE_KEY, REL_NAME);
     }
 
     @Test
@@ -1156,6 +1209,7 @@ public class TestQueryRecord {
 
         runner.assertTransferCount(REL_NAME, 1);
         final MockFlowFile flowFileOut = runner.getFlowFilesForRelationship(REL_NAME).get(0);
+        flowFileOut.assertAttributeEquals(QueryRecord.ROUTE_ATTRIBUTE_KEY, REL_NAME);
         flowFileOut.assertContentEquals("[]");
     }