You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2022/06/21 21:55:14 UTC
[nifi] branch main updated: NIFI-9983 Add output relation name attribute in QueryRecord processor (#6011)
This is an automated email from the ASF dual-hosted git repository.
markap14 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new bc84532a8b NIFI-9983 Add output relation name attribute in QueryRecord processor (#6011)
bc84532a8b is described below
commit bc84532a8b27eae2e7f3ab284473c1cbc9035be6
Author: Juldrixx <31...@users.noreply.github.com>
AuthorDate: Tue Jun 21 23:55:07 2022 +0200
NIFI-9983 Add output relation name attribute in QueryRecord processor (#6011)
* NIFI-9983 Add output relation name attribute
---
.../nifi/processors/standard/QueryRecord.java | 16 +++++--
.../nifi/processors/standard/TestQueryRecord.java | 54 ++++++++++++++++++++++
2 files changed, 67 insertions(+), 3 deletions(-)
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java
index aec971aa52..f87e175eb9 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java
@@ -116,9 +116,13 @@ import static org.apache.nifi.util.db.JdbcProperties.DEFAULT_SCALE;
+ "that is selected being routed to the relationship whose name is the property name")
@WritesAttributes({
@WritesAttribute(attribute = "mime.type", description = "Sets the mime.type attribute to the MIME Type specified by the Record Writer"),
- @WritesAttribute(attribute = "record.count", description = "The number of records selected by the query")
+ @WritesAttribute(attribute = "record.count", description = "The number of records selected by the query"),
+ @WritesAttribute(attribute = QueryRecord.ROUTE_ATTRIBUTE_KEY, description = "The relation to which the FlowFile was routed")
})
public class QueryRecord extends AbstractProcessor {
+
+ public static final String ROUTE_ATTRIBUTE_KEY = "QueryRecord.Route";
+
static final PropertyDescriptor RECORD_READER_FACTORY = new PropertyDescriptor.Builder()
.name("record-reader")
.displayName("Record Reader")
@@ -258,7 +262,7 @@ public class QueryRecord extends AbstractProcessor {
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) {
- final FlowFile original = session.get();
+ FlowFile original = session.get();
if (original == null) {
return;
}
@@ -284,6 +288,7 @@ public class QueryRecord extends AbstractProcessor {
writerSchema = recordSetWriterFactory.getSchema(originalAttributes, readerSchema);
} catch (final Exception e) {
getLogger().error("Failed to determine Record Schema from {}; routing to failure", new Object[] {original, e});
+ original = session.putAttribute(original, ROUTE_ATTRIBUTE_KEY, REL_FAILURE.getName());
session.transfer(original, REL_FAILURE);
return;
}
@@ -312,6 +317,7 @@ public class QueryRecord extends AbstractProcessor {
final QueryResult queryResult = query(session, original, readerSchema, sql, recordReaderFactory);
final AtomicReference<String> mimeTypeRef = new AtomicReference<>();
+ final FlowFile originalFlowFile = original;
try {
final ResultSet rs = queryResult.getResultSet();
transformed = session.write(transformed, new OutputStreamCallback() {
@@ -328,7 +334,7 @@ public class QueryRecord extends AbstractProcessor {
throw new ProcessException(e);
}
- try (final RecordSetWriter resultSetWriter = recordSetWriterFactory.createWriter(getLogger(), writeSchema, out, original)) {
+ try (final RecordSetWriter resultSetWriter = recordSetWriterFactory.createWriter(getLogger(), writeSchema, out, originalFlowFile)) {
writeResultRef.set(resultSetWriter.write(recordSet));
mimeTypeRef.set(resultSetWriter.getMimeType());
} catch (final Exception e) {
@@ -355,6 +361,7 @@ public class QueryRecord extends AbstractProcessor {
attributesToAdd.put(CoreAttributes.MIME_TYPE.key(), mimeTypeRef.get());
attributesToAdd.put("record.count", String.valueOf(result.getRecordCount()));
+ attributesToAdd.put(ROUTE_ATTRIBUTE_KEY, relationship.getName());
transformed = session.putAllAttributes(transformed, attributesToAdd);
transformedFlowFiles.put(transformed, relationship);
@@ -382,13 +389,16 @@ public class QueryRecord extends AbstractProcessor {
}
getLogger().info("Successfully queried {} in {} millis", new Object[] {original, elapsedMillis});
+ original = session.putAttribute(original, ROUTE_ATTRIBUTE_KEY, REL_ORIGINAL.getName());
session.transfer(original, REL_ORIGINAL);
} catch (final SQLException e) {
getLogger().error("Unable to query {} due to {}", new Object[] {original, e.getCause() == null ? e : e.getCause()});
+ original = session.putAttribute(original, ROUTE_ATTRIBUTE_KEY, REL_FAILURE.getName());
session.remove(createdFlowFiles);
session.transfer(original, REL_FAILURE);
} catch (final Exception e) {
getLogger().error("Unable to query {} due to {}", new Object[] {original, e});
+ original = session.putAttribute(original, ROUTE_ATTRIBUTE_KEY, REL_FAILURE.getName());
session.remove(createdFlowFiles);
session.transfer(original, REL_FAILURE);
}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java
index 765fececee..341487b89a 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java
@@ -128,6 +128,11 @@ public class TestQueryRecord {
runner.assertTransferCount(REL_NAME, 1);
+ final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(REL_NAME);
+ flowFiles.get(0).assertAttributeEquals(QueryRecord.ROUTE_ATTRIBUTE_KEY, REL_NAME);
+ final List<MockFlowFile> flowFilesOriginal = runner.getFlowFilesForRelationship(QueryRecord.REL_ORIGINAL);
+ flowFilesOriginal.get(0).assertAttributeEquals(QueryRecord.ROUTE_ATTRIBUTE_KEY, QueryRecord.REL_ORIGINAL.getName());
+
final List<Record> written = writer.getRecordsWritten();
assertEquals(1, written.size());
@@ -182,6 +187,9 @@ public class TestQueryRecord {
runner.assertTransferCount(REL_NAME, 1);
+ final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(REL_NAME);
+ flowFiles.get(0).assertAttributeEquals(QueryRecord.ROUTE_ATTRIBUTE_KEY, REL_NAME);
+
final List<Record> written = writer.getRecordsWritten();
assertEquals(50, written.size());
@@ -222,6 +230,9 @@ public class TestQueryRecord {
runner.assertTransferCount(REL_NAME, 1);
+ final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(REL_NAME);
+ flowFiles.get(0).assertAttributeEquals(QueryRecord.ROUTE_ATTRIBUTE_KEY, REL_NAME);
+
final List<Record> written = writer.getRecordsWritten();
assertEquals(1, written.size());
@@ -257,6 +268,9 @@ public class TestQueryRecord {
runner.assertTransferCount(REL_NAME, 1);
+ final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(REL_NAME);
+ flowFiles.get(0).assertAttributeEquals(QueryRecord.ROUTE_ATTRIBUTE_KEY, REL_NAME);
+
final List<Record> written = writer.getRecordsWritten();
assertEquals(1, written.size());
@@ -295,6 +309,9 @@ public class TestQueryRecord {
runner.assertTransferCount(REL_NAME, 1);
+ final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(REL_NAME);
+ flowFiles.get(0).assertAttributeEquals(QueryRecord.ROUTE_ATTRIBUTE_KEY, REL_NAME);
+
final List<Record> written = writer.getRecordsWritten();
assertEquals(1, written.size());
@@ -335,6 +352,9 @@ public class TestQueryRecord {
runner.assertTransferCount(REL_NAME, 1);
+ final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(REL_NAME);
+ flowFiles.get(0).assertAttributeEquals(QueryRecord.ROUTE_ATTRIBUTE_KEY, REL_NAME);
+
final List<Record> written = writer.getRecordsWritten();
assertEquals(1, written.size());
@@ -378,6 +398,9 @@ public class TestQueryRecord {
runner.assertTransferCount(REL_NAME, 1);
+ final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(REL_NAME);
+ flowFiles.get(0).assertAttributeEquals(QueryRecord.ROUTE_ATTRIBUTE_KEY, REL_NAME);
+
final List<Record> written = writer.getRecordsWritten();
assertEquals(1, written.size());
@@ -417,6 +440,9 @@ public class TestQueryRecord {
runner.assertTransferCount(REL_NAME, 1);
+ final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(REL_NAME);
+ flowFiles.get(0).assertAttributeEquals(QueryRecord.ROUTE_ATTRIBUTE_KEY, REL_NAME);
+
final List<Record> written = writer.getRecordsWritten();
assertEquals(2, written.size());
@@ -458,6 +484,9 @@ public class TestQueryRecord {
runner.assertTransferCount(REL_NAME, 1);
+ final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(REL_NAME);
+ flowFiles.get(0).assertAttributeEquals(QueryRecord.ROUTE_ATTRIBUTE_KEY, REL_NAME);
+
final List<Record> written = writer.getRecordsWritten();
assertEquals(3, written.size());
@@ -518,6 +547,9 @@ public class TestQueryRecord {
runner.assertTransferCount(REL_NAME, 1);
+ final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(REL_NAME);
+ flowFiles.get(0).assertAttributeEquals(QueryRecord.ROUTE_ATTRIBUTE_KEY, REL_NAME);
+
final List<Record> written = writer.getRecordsWritten();
assertEquals(1, written.size());
@@ -554,6 +586,9 @@ public class TestQueryRecord {
runner.assertTransferCount(REL_NAME, 1);
+ final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(REL_NAME);
+ flowFiles.get(0).assertAttributeEquals(QueryRecord.ROUTE_ATTRIBUTE_KEY, REL_NAME);
+
final List<Record> written = writer.getRecordsWritten();
assertEquals(1, written.size());
@@ -590,6 +625,9 @@ public class TestQueryRecord {
runner.assertTransferCount(REL_NAME, 1);
+ final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(REL_NAME);
+ flowFiles.get(0).assertAttributeEquals(QueryRecord.ROUTE_ATTRIBUTE_KEY, REL_NAME);
+
final List<Record> written = writer.getRecordsWritten();
assertEquals(1, written.size());
@@ -627,6 +665,9 @@ public class TestQueryRecord {
runner.assertTransferCount(REL_NAME, 1);
+ final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(REL_NAME);
+ flowFiles.get(0).assertAttributeEquals(QueryRecord.ROUTE_ATTRIBUTE_KEY, REL_NAME);
+
final List<Record> written = writer.getRecordsWritten();
assertEquals(1, written.size());
@@ -871,6 +912,7 @@ public class TestQueryRecord {
runner.assertTransferCount(REL_NAME, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(REL_NAME).get(0);
+ out.assertAttributeEquals(QueryRecord.ROUTE_ATTRIBUTE_KEY, REL_NAME);
System.out.println(new String(out.toByteArray()));
out.assertContentEquals("\"name\",\"points\"\n\"Tom\",\"49\"\n");
}
@@ -906,6 +948,7 @@ public class TestQueryRecord {
runner.assertTransferCount(REL_NAME, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(REL_NAME).get(0);
+ out.assertAttributeEquals(QueryRecord.ROUTE_ATTRIBUTE_KEY, REL_NAME);
System.out.println(new String(out.toByteArray()));
out.assertContentEquals("\"name\",\"points\"\n\"Tom\",\"49\"\n\"Alice\",\n,\"36\"\n");
}
@@ -939,6 +982,7 @@ public class TestQueryRecord {
runner.assertTransferCount(REL_NAME, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(REL_NAME).get(0);
+ out.assertAttributeEquals(QueryRecord.ROUTE_ATTRIBUTE_KEY, REL_NAME);
System.out.println(new String(out.toByteArray()));
out.assertContentEquals("\"name\",\"points\"\n\"Tom\",\"49\"\n");
}
@@ -967,6 +1011,7 @@ public class TestQueryRecord {
runner.run();
runner.assertTransferCount(REL_NAME, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(REL_NAME).get(0);
+ out.assertAttributeEquals(QueryRecord.ROUTE_ATTRIBUTE_KEY, REL_NAME);
System.out.println(new String(out.toByteArray()));
out.assertContentEquals("\"name\",\"age\"\n");
}
@@ -1002,6 +1047,7 @@ public class TestQueryRecord {
runner.assertTransferCount(REL_NAME, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(REL_NAME).get(0);
+ out.assertAttributeEquals(QueryRecord.ROUTE_ATTRIBUTE_KEY, REL_NAME);
out.assertContentEquals("\"NAME\",\"POINTS\"\n\"100\",\"90.75\"\n");
}
@@ -1035,6 +1081,8 @@ public class TestQueryRecord {
runner.run();
runner.assertAllFlowFilesTransferred(QueryRecord.REL_FAILURE, 1);
+ final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(QueryRecord.REL_FAILURE);
+ flowFiles.get(0).assertAttributeEquals(QueryRecord.ROUTE_ATTRIBUTE_KEY, QueryRecord.REL_FAILURE.getName());
}
@Test
@@ -1063,6 +1111,7 @@ public class TestQueryRecord {
runner.assertTransferCount(REL_NAME, 1);
final MockFlowFile flowFileOut = runner.getFlowFilesForRelationship(REL_NAME).get(0);
+ flowFileOut.assertAttributeEquals(QueryRecord.ROUTE_ATTRIBUTE_KEY, REL_NAME);
flowFileOut.assertContentEquals("\"name\",\"points\"\n\"Tom\",\"100\"\n\"Jerry\",\"2\"\n");
}
@@ -1095,9 +1144,11 @@ public class TestQueryRecord {
runner.assertTransferCount(REL_NAME, 1);
runner.assertTransferCount("count", 1);
final MockFlowFile flowFileOut = runner.getFlowFilesForRelationship(REL_NAME).get(0);
+ flowFileOut.assertAttributeEquals(QueryRecord.ROUTE_ATTRIBUTE_KEY, REL_NAME);
flowFileOut.assertContentEquals("1\n\n\n3\n");
final MockFlowFile countFlowFile = runner.getFlowFilesForRelationship("count").get(0);
+ countFlowFile.assertAttributeEquals(QueryRecord.ROUTE_ATTRIBUTE_KEY, "count");
countFlowFile.assertContentEquals("2\n");
}
@@ -1132,6 +1183,8 @@ public class TestQueryRecord {
runner.run();
runner.assertTransferCount(REL_NAME, 1);
+ final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(REL_NAME);
+ flowFiles.get(0).assertAttributeEquals(QueryRecord.ROUTE_ATTRIBUTE_KEY, REL_NAME);
}
@Test
@@ -1156,6 +1209,7 @@ public class TestQueryRecord {
runner.assertTransferCount(REL_NAME, 1);
final MockFlowFile flowFileOut = runner.getFlowFilesForRelationship(REL_NAME).get(0);
+ flowFileOut.assertAttributeEquals(QueryRecord.ROUTE_ATTRIBUTE_KEY, REL_NAME);
flowFileOut.assertContentEquals("[]");
}