You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by pv...@apache.org on 2018/11/08 08:50:59 UTC
nifi git commit: NIFI-5802: Add QueryRecord nullable field support
Repository: nifi
Updated Branches:
refs/heads/master a9045d54a -> 78a1cb7c5
NIFI-5802: Add QueryRecord nullable field support
Signed-off-by: Pierre Villard <pi...@gmail.com>
This closes #3158.
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/78a1cb7c
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/78a1cb7c
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/78a1cb7c
Branch: refs/heads/master
Commit: 78a1cb7c5eb84da5d25d50d0df69970b6d732f0a
Parents: a9045d5
Author: Koji Kawamura <ij...@apache.org>
Authored: Thu Nov 8 12:10:36 2018 +0900
Committer: Pierre Villard <pi...@gmail.com>
Committed: Thu Nov 8 09:50:45 2018 +0100
----------------------------------------------------------------------
.../apache/nifi/queryrecord/FlowFileTable.java | 3 +-
.../processors/standard/TestQueryRecord.java | 43 ++++++++++++++++++--
2 files changed, 41 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/78a1cb7c/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileTable.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileTable.java
index c40e364..9e10377 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileTable.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileTable.java
@@ -189,7 +189,8 @@ public class FlowFileTable<S, E> extends AbstractTable implements QueryableTable
final JavaTypeFactory javaTypeFactory = (JavaTypeFactory) typeFactory;
for (final RecordField field : schema.getFields()) {
names.add(field.getFieldName());
- types.add(getRelDataType(field.getDataType(), javaTypeFactory));
+ final RelDataType relDataType = getRelDataType(field.getDataType(), javaTypeFactory);
+ types.add(javaTypeFactory.createTypeWithNullability(relDataType, field.isNullable()));
}
logger.debug("Found Schema: {}", new Object[] {schema});
http://git-wip-us.apache.org/repos/asf/nifi/blob/78a1cb7c/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java
index b266b47..60fefef 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java
@@ -131,6 +131,41 @@ public class TestQueryRecord {
}
@Test
+ public void testNullable() throws InitializationException, IOException, SQLException {
+ final MockRecordParser parser = new MockRecordParser();
+ parser.addSchemaField("name", RecordFieldType.STRING, true);
+ parser.addSchemaField("age", RecordFieldType.INT, true);
+ parser.addRecord("Tom", 49);
+ parser.addRecord("Alice", null);
+ parser.addRecord(null, 36);
+
+ final MockRecordWriter writer = new MockRecordWriter("\"name\",\"points\"");
+
+ TestRunner runner = getRunner();
+ runner.addControllerService("parser", parser);
+ runner.enableControllerService(parser);
+ runner.addControllerService("writer", writer);
+ runner.enableControllerService(writer);
+
+ runner.setProperty(REL_NAME, "select name, age from FLOWFILE");
+ runner.setProperty(QueryRecord.RECORD_READER_FACTORY, "parser");
+ runner.setProperty(QueryRecord.RECORD_WRITER_FACTORY, "writer");
+
+ final int numIterations = 1;
+ for (int i = 0; i < numIterations; i++) {
+ runner.enqueue(new byte[0]);
+ }
+
+ runner.setThreadCount(4);
+ runner.run(2 * numIterations);
+
+ runner.assertTransferCount(REL_NAME, 1);
+ final MockFlowFile out = runner.getFlowFilesForRelationship(REL_NAME).get(0);
+ System.out.println(new String(out.toByteArray()));
+ out.assertContentEquals("\"name\",\"points\"\n\"Tom\",\"49\"\n\"Alice\",\n,\"36\"\n");
+ }
+
+ @Test
public void testParseFailure() throws InitializationException, IOException, SQLException {
final MockRecordParser parser = new MockRecordParser();
parser.addSchemaField("name", RecordFieldType.STRING);
@@ -172,10 +207,10 @@ public class TestQueryRecord {
parser.addSchemaField("AMOUNT2", RecordFieldType.FLOAT);
parser.addSchemaField("AMOUNT3", RecordFieldType.FLOAT);
- parser.addRecord("008", 10.05F, 15.45F, 89.99F);
- parser.addRecord("100", 20.25F, 25.25F, 45.25F);
- parser.addRecord("105", 20.05F, 25.05F, 45.05F);
- parser.addRecord("200", 34.05F, 25.05F, 75.05F);
+ parser.addRecord(8, 10.05F, 15.45F, 89.99F);
+ parser.addRecord(100, 20.25F, 25.25F, 45.25F);
+ parser.addRecord(105, 20.05F, 25.05F, 45.05F);
+ parser.addRecord(200, 34.05F, 25.05F, 75.05F);
final MockRecordWriter writer = new MockRecordWriter("\"NAME\",\"POINTS\"");