You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2019/02/11 19:52:19 UTC

[GitHub] mattyb149 commented on a change in pull request #3223: NIFI-5903: Allow RecordPath to be used in QueryRecord processor. Also…

mattyb149 commented on a change in pull request #3223: NIFI-5903: Allow RecordPath to be used in QueryRecord processor. Also…
URL: https://github.com/apache/nifi/pull/3223#discussion_r255662738
 
 

 ##########
 File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java
 ##########
 @@ -70,6 +78,398 @@ public TestRunner getRunner() {
         return runner;
     }
 
+
+    @Test
+    public void testRecordPathFunctions() throws InitializationException {
+        final Record record = createHierarchicalRecord();
+        final ArrayListRecordReader recordReader = new ArrayListRecordReader(record.getSchema());
+        recordReader.addRecord(record);
+
+        final ArrayListRecordWriter writer = new ArrayListRecordWriter(record.getSchema());
+
+        TestRunner runner = getRunner();
+        runner.addControllerService("reader", recordReader);
+        runner.enableControllerService(recordReader);
+        runner.addControllerService("writer", writer);
+        runner.enableControllerService(writer);
+
+        runner.setProperty(QueryRecord.RECORD_READER_FACTORY, "reader");
+        runner.setProperty(QueryRecord.RECORD_WRITER_FACTORY, "writer");
+        runner.setProperty(REL_NAME,
+            "SELECT RPATH_STRING(person, '/name') AS name," +
+            " RPATH_INT(person, '/age') AS age," +
+            " RPATH(person, '/name') AS nameObj," +
+            " RPATH(person, '/age') AS ageObj," +
+            " RPATH(person, '/favoriteColors') AS colors," +
+            " RPATH(person, '//name') AS names," +
+            " RPATH_DATE(person, '/dob') AS dob," +
+            " RPATH_LONG(person, '/dobTimestamp') AS dobTimestamp," +
+            " RPATH_DATE(person, 'toDate(/joinTimestamp, \"yyyy-MM-dd\")') AS joinTime, " +
+            " RPATH_DOUBLE(person, '/weight') AS weight" +
+            " FROM FLOWFILE");
+
+        runner.enqueue(new byte[0]);
+
+        runner.run();
+
+        runner.assertTransferCount(REL_NAME, 1);
+
+        final List<Record> written = writer.getRecordsWritten();
+        assertEquals(1, written.size());
+
+        final Record output = written.get(0);
+        assertEquals("John Doe", output.getValue("name"));
+        assertEquals("John Doe", output.getValue("nameObj"));
+        assertEquals(30, output.getValue("age"));
+        assertEquals(30, output.getValue("ageObj"));
+        assertArrayEquals(new String[] { "red", "green"}, (Object[]) output.getValue("colors"));
+        assertArrayEquals(new String[] { "John Doe", "Jane Doe"}, (Object[]) output.getValue("names"));
+        assertEquals("1517702400000", output.getAsString("joinTime"));
+        assertEquals(Double.valueOf(180.8D), output.getAsDouble("weight"));
+    }
+
+    @Test
+    public void testRecordPathInAggregate() throws InitializationException {
+        final Record record = createHierarchicalRecord();
+
+        final ArrayListRecordReader recordReader = new ArrayListRecordReader(record.getSchema());
+        for (int i=0; i < 100; i++) {
+            final Record toAdd = createHierarchicalRecord();
+            final Record person = (Record) toAdd.getValue("person");
+
+            person.setValue("name", "Person " + i);
+            person.setValue("age", i);
+            recordReader.addRecord(toAdd);
+        }
+
+        final ArrayListRecordWriter writer = new ArrayListRecordWriter(record.getSchema());
+
+        TestRunner runner = getRunner();
+        runner.addControllerService("reader", recordReader);
+        runner.enableControllerService(recordReader);
+        runner.addControllerService("writer", writer);
+        runner.enableControllerService(writer);
+
+        runner.setProperty(QueryRecord.RECORD_READER_FACTORY, "reader");
+        runner.setProperty(QueryRecord.RECORD_WRITER_FACTORY, "writer");
+        runner.setProperty(REL_NAME,
+            "SELECT RPATH_STRING(person, '/name') AS name FROM FLOWFILE" +
+                " WHERE RPATH_INT(person, '/age') > (" +
+                "   SELECT AVG( RPATH_INT(person, '/age') ) FROM FLOWFILE" +
+                ")");
+
+        runner.enqueue(new byte[0]);
+
+        runner.run();
+
+        runner.assertTransferCount(REL_NAME, 1);
+
+        final List<Record> written = writer.getRecordsWritten();
+        assertEquals(50, written.size());
+
+        int i=50;
+        for (final Record writtenRecord : written) {
+            final String name = writtenRecord.getAsString("name");
+            assertEquals("Person " + i, name);
+            i++;
+        }
+    }
+
+
+    @Test
+    public void testRecordPathWithArray() throws InitializationException {
+        final Record record = createHierarchicalArrayRecord();
+        final ArrayListRecordReader recordReader = new ArrayListRecordReader(record.getSchema());
+        recordReader.addRecord(record);
+
+        final ArrayListRecordWriter writer = new ArrayListRecordWriter(record.getSchema());
+
+        TestRunner runner = getRunner();
+        runner.addControllerService("reader", recordReader);
+        runner.enableControllerService(recordReader);
+        runner.addControllerService("writer", writer);
+        runner.enableControllerService(writer);
+
+        runner.setProperty(QueryRecord.RECORD_READER_FACTORY, "reader");
+        runner.setProperty(QueryRecord.RECORD_WRITER_FACTORY, "writer");
+        runner.setProperty(REL_NAME,
+            "SELECT title, name" +
+            "    FROM FLOWFILE" +
+            "    WHERE RPATH(addresses, '/state[/label = ''home'']') <>" +
+            "          RPATH(addresses, '/state[/label = ''work'']')");
+
+        runner.enqueue(new byte[0]);
+
+        runner.run();
+
+        runner.assertTransferCount(REL_NAME, 1);
+
+        final List<Record> written = writer.getRecordsWritten();
+        assertEquals(1, written.size());
+
+        final Record output = written.get(0);
+        assertEquals("John Doe", output.getValue("name"));
+        assertEquals("Software Engineer", output.getValue("title"));
+    }
+
+    @Test
+    public void testCompareResultsOfTwoRecordPathsAgainstArray() throws InitializationException {
+        final Record record = createHierarchicalArrayRecord();
+
+        // Change the value of the 'state' field of both addresses to NY.
+        // This allows us to use an equals operator to ensure that we do get back the same values,
+        // whereas the unit test above tests <> and that may result in 'false confidence' if the software
+        // were to provide the wrong values but values that were not equal.
+        Record[] addresses = (Record[]) record.getValue("addresses");
+        for (final Record address : addresses) {
+            address.setValue("state", "NY");
+        }
+
+        final ArrayListRecordReader recordReader = new ArrayListRecordReader(record.getSchema());
+        recordReader.addRecord(record);
+
+        final ArrayListRecordWriter writer = new ArrayListRecordWriter(record.getSchema());
+
+        TestRunner runner = getRunner();
+        runner.addControllerService("reader", recordReader);
+        runner.enableControllerService(recordReader);
+        runner.addControllerService("writer", writer);
+        runner.enableControllerService(writer);
+
+        runner.setProperty(QueryRecord.RECORD_READER_FACTORY, "reader");
+        runner.setProperty(QueryRecord.RECORD_WRITER_FACTORY, "writer");
+        runner.setProperty(REL_NAME,
+            "SELECT title, name" +
+                "    FROM FLOWFILE" +
+                "    WHERE RPATH(addresses, '/state[/label = ''home'']') =" +
+                "          RPATH(addresses, '/state[/label = ''work'']')");
+
+        runner.enqueue(new byte[0]);
+
+        runner.run();
+
+        runner.assertTransferCount(REL_NAME, 1);
+
+        final List<Record> written = writer.getRecordsWritten();
+        assertEquals(1, written.size());
+
+        final Record output = written.get(0);
+        assertEquals("John Doe", output.getValue("name"));
+        assertEquals("Software Engineer", output.getValue("title"));
+    }
+
+
+    @Test
+    public void testRecordPathWithArrayAndOnlyOneElementMatchingRPath() throws InitializationException {
+        final Record record = createHierarchicalArrayRecord();
+        final ArrayListRecordReader recordReader = new ArrayListRecordReader(record.getSchema());
+        recordReader.addRecord(record);
+
+        final ArrayListRecordWriter writer = new ArrayListRecordWriter(record.getSchema());
+
+        TestRunner runner = getRunner();
+        runner.addControllerService("reader", recordReader);
+        runner.enableControllerService(recordReader);
+        runner.addControllerService("writer", writer);
+        runner.enableControllerService(writer);
+
+        runner.setProperty(QueryRecord.RECORD_READER_FACTORY, "reader");
+        runner.setProperty(QueryRecord.RECORD_WRITER_FACTORY, "writer");
+        runner.setProperty(REL_NAME,
+            "SELECT title, name" +
+                "    FROM FLOWFILE" +
+                "    WHERE RPATH(addresses, '/state[. = ''NY'']') = 'NY'");
+
+        runner.enqueue(new byte[0]);
+
+        runner.run();
+
+        runner.assertTransferCount(REL_NAME, 1);
+
+        final List<Record> written = writer.getRecordsWritten();
+        assertEquals(1, written.size());
+
+        final Record output = written.get(0);
+        assertEquals("John Doe", output.getValue("name"));
+        assertEquals("Software Engineer", output.getValue("title"));
+    }
+
+
+    @Test
+    public void testLikeWithRecordPath() throws InitializationException {
+        final Record record = createHierarchicalArrayRecord();
+        final ArrayListRecordReader recordReader = new ArrayListRecordReader(record.getSchema());
+        recordReader.addRecord(record);
+
+        final ArrayListRecordWriter writer = new ArrayListRecordWriter(record.getSchema());
+
+        TestRunner runner = getRunner();
+        runner.addControllerService("reader", recordReader);
+        runner.enableControllerService(recordReader);
+        runner.addControllerService("writer", writer);
+        runner.enableControllerService(writer);
+
+        runner.setProperty(QueryRecord.RECORD_READER_FACTORY, "reader");
+        runner.setProperty(QueryRecord.RECORD_WRITER_FACTORY, "writer");
+        runner.setProperty(REL_NAME,
+            "SELECT title, name" +
+                "    FROM FLOWFILE" +
+                "    WHERE RPATH_STRING(addresses, '/state[. = ''NY'']') LIKE 'N%'");
+
+        runner.enqueue(new byte[0]);
+
+        runner.run();
+
+        runner.assertTransferCount(REL_NAME, 1);
+
+        final List<Record> written = writer.getRecordsWritten();
+        assertEquals(1, written.size());
+
+        final Record output = written.get(0);
+        assertEquals("John Doe", output.getValue("name"));
+        assertEquals("Software Engineer", output.getValue("title"));
+    }
+
+
+
+    /**
+     * Returns a Record that, if written in JSON, would look like:
+     * <code><pre>
+     * {
+     *    "person": {
+     *        "name": "John Doe",
+     *        "age": 30,
+     *        "favoriteColors": [ "red", "green" },
 
 Review comment:
   Very minor typo here (since it's javadoc on a private method), the `favoriteColors` array has a curly brace closer instead of a square bracket 😝 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services