You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by pv...@apache.org on 2018/11/08 08:50:59 UTC

nifi git commit: NIFI-5802: Add QueryRecord nullable field support

Repository: nifi
Updated Branches:
  refs/heads/master a9045d54a -> 78a1cb7c5


NIFI-5802: Add QueryRecord nullable field support

Signed-off-by: Pierre Villard <pi...@gmail.com>

This closes #3158.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/78a1cb7c
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/78a1cb7c
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/78a1cb7c

Branch: refs/heads/master
Commit: 78a1cb7c5eb84da5d25d50d0df69970b6d732f0a
Parents: a9045d5
Author: Koji Kawamura <ij...@apache.org>
Authored: Thu Nov 8 12:10:36 2018 +0900
Committer: Pierre Villard <pi...@gmail.com>
Committed: Thu Nov 8 09:50:45 2018 +0100

----------------------------------------------------------------------
 .../apache/nifi/queryrecord/FlowFileTable.java  |  3 +-
 .../processors/standard/TestQueryRecord.java    | 43 ++++++++++++++++++--
 2 files changed, 41 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/78a1cb7c/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileTable.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileTable.java
index c40e364..9e10377 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileTable.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileTable.java
@@ -189,7 +189,8 @@ public class FlowFileTable<S, E> extends AbstractTable implements QueryableTable
         final JavaTypeFactory javaTypeFactory = (JavaTypeFactory) typeFactory;
         for (final RecordField field : schema.getFields()) {
             names.add(field.getFieldName());
-            types.add(getRelDataType(field.getDataType(), javaTypeFactory));
+            final RelDataType relDataType = getRelDataType(field.getDataType(), javaTypeFactory);
+            types.add(javaTypeFactory.createTypeWithNullability(relDataType, field.isNullable()));
         }
 
         logger.debug("Found Schema: {}", new Object[] {schema});

http://git-wip-us.apache.org/repos/asf/nifi/blob/78a1cb7c/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java
index b266b47..60fefef 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java
@@ -131,6 +131,41 @@ public class TestQueryRecord {
     }
 
     @Test
+    public void testNullable() throws InitializationException, IOException, SQLException {
+        final MockRecordParser parser = new MockRecordParser();
+        parser.addSchemaField("name", RecordFieldType.STRING, true);
+        parser.addSchemaField("age", RecordFieldType.INT, true);
+        parser.addRecord("Tom", 49);
+        parser.addRecord("Alice", null);
+        parser.addRecord(null, 36);
+
+        final MockRecordWriter writer = new MockRecordWriter("\"name\",\"points\"");
+
+        TestRunner runner = getRunner();
+        runner.addControllerService("parser", parser);
+        runner.enableControllerService(parser);
+        runner.addControllerService("writer", writer);
+        runner.enableControllerService(writer);
+
+        runner.setProperty(REL_NAME, "select name, age from FLOWFILE");
+        runner.setProperty(QueryRecord.RECORD_READER_FACTORY, "parser");
+        runner.setProperty(QueryRecord.RECORD_WRITER_FACTORY, "writer");
+
+        final int numIterations = 1;
+        for (int i = 0; i < numIterations; i++) {
+            runner.enqueue(new byte[0]);
+        }
+
+        runner.setThreadCount(4);
+        runner.run(2 * numIterations);
+
+        runner.assertTransferCount(REL_NAME, 1);
+        final MockFlowFile out = runner.getFlowFilesForRelationship(REL_NAME).get(0);
+        System.out.println(new String(out.toByteArray()));
+        out.assertContentEquals("\"name\",\"points\"\n\"Tom\",\"49\"\n\"Alice\",\n,\"36\"\n");
+    }
+
+    @Test
     public void testParseFailure() throws InitializationException, IOException, SQLException {
         final MockRecordParser parser = new MockRecordParser();
         parser.addSchemaField("name", RecordFieldType.STRING);
@@ -172,10 +207,10 @@ public class TestQueryRecord {
         parser.addSchemaField("AMOUNT2", RecordFieldType.FLOAT);
         parser.addSchemaField("AMOUNT3", RecordFieldType.FLOAT);
 
-        parser.addRecord("008", 10.05F, 15.45F, 89.99F);
-        parser.addRecord("100", 20.25F, 25.25F, 45.25F);
-        parser.addRecord("105", 20.05F, 25.05F, 45.05F);
-        parser.addRecord("200", 34.05F, 25.05F, 75.05F);
+        parser.addRecord(8, 10.05F, 15.45F, 89.99F);
+        parser.addRecord(100, 20.25F, 25.25F, 45.25F);
+        parser.addRecord(105, 20.05F, 25.05F, 45.05F);
+        parser.addRecord(200, 34.05F, 25.05F, 75.05F);
 
         final MockRecordWriter writer = new MockRecordWriter("\"NAME\",\"POINTS\"");