You are viewing a plain-text version of this content. The link to its canonical version was a hyperlink in the original page and is not preserved in this plain-text rendering.
Posted to commits@nifi.apache.org by ma...@apache.org on 2020/05/21 17:14:13 UTC

[nifi] branch master updated: NIFI-7462: This adds a way to convert or cast a choice object into a valid type for use with calcite query functions

This is an automated email from the ASF dual-hosted git repository.

mattyb149 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new b6ef7e1  NIFI-7462: This adds a way to convert or cast a choice object into a valid type for use with calcite query functions
b6ef7e1 is described below

commit b6ef7e13bf076fb88fd94ce49d2a217db3f19aaa
Author: pcgrenier <pc...@gmail.com>
AuthorDate: Fri May 15 20:03:01 2020 -0400

    NIFI-7462: This adds a way to convert or cast a choice object into a valid type for use with calcite query functions
    
    NIFI-7462: Update to allow FlowFile Table's schema to be more intelligent when using CHOICE types
    
    NIFI-7462: Fixed checkstyle violation, removed documentation around the CAST functions that were no longer needed
    
    Signed-off-by: Matthew Burgess <ma...@apache.org>
    
    This closes #4282
---
 .../nifi/processors/standard/QueryRecord.java      |  26 ++---
 .../org/apache/nifi/queryrecord/FlowFileTable.java |  58 +++++++++
 .../nifi/processors/standard/TestQueryRecord.java  | 129 +++++++++++++++++++++
 3 files changed, 199 insertions(+), 14 deletions(-)

diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java
index 82aea6f..a620a60 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java
@@ -784,12 +784,6 @@ public class QueryRecord extends AbstractProcessor {
         }
     }
 
-    public static class RecordRecordPath extends RecordPathFunction {
-        public Record eval(Object record, String recordPath) {
-            return eval(record, recordPath, Record.class::cast);
-        }
-    }
-
 
     public static class RecordPathFunction {
         private static final RecordField ROOT_RECORD_FIELD = new RecordField("root", RecordFieldType.MAP.getMapDataType(RecordFieldType.STRING.getDataType()));
@@ -803,14 +797,18 @@ public class QueryRecord extends AbstractProcessor {
                 return null;
             }
 
-            if (record instanceof Record) {
-                return eval((Record) record, recordPath, transform);
-            } else if (record instanceof Record[]) {
-                return eval((Record[]) record, recordPath, transform);
-            } else if (record instanceof Iterable) {
-                return eval((Iterable<Record>) record, recordPath, transform);
-            } else if (record instanceof Map) {
-                return eval((Map<?, ?>) record, recordPath, transform);
+            try {
+                if (record instanceof Record) {
+                    return eval((Record) record, recordPath, transform);
+                } else if (record instanceof Record[]) {
+                    return eval((Record[]) record, recordPath, transform);
+                } else if (record instanceof Iterable) {
+                    return eval((Iterable<Record>) record, recordPath, transform);
+                } else if (record instanceof Map) {
+                    return eval((Map<?, ?>) record, recordPath, transform);
+                }
+            } catch (IllegalArgumentException e) {
+                throw new RuntimeException("Cannot evaluate RecordPath " + recordPath + " against " + record, e);
             }
 
             throw new RuntimeException("Cannot evaluate RecordPath " + recordPath + " against given argument because the argument is of type " + record.getClass() + " instead of Record");
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileTable.java
index 3030008..18cbc63 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileTable.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileTable.java
@@ -43,6 +43,7 @@ import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordField;
 import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.serialization.record.type.ArrayDataType;
+import org.apache.nifi.serialization.record.type.ChoiceDataType;
 
 import java.lang.reflect.Type;
 import java.math.BigInteger;
@@ -223,12 +224,69 @@ public class FlowFileTable extends AbstractTable implements QueryableTable, Tran
             case BIGINT:
                 return typeFactory.createJavaType(BigInteger.class);
             case CHOICE:
+                final ChoiceDataType choiceDataType = (ChoiceDataType) fieldType;
+                DataType widestDataType = choiceDataType.getPossibleSubTypes().get(0);
+                for (final DataType possibleType : choiceDataType.getPossibleSubTypes()) {
+                    if (possibleType == widestDataType) {
+                        continue;
+                    }
+                    if (possibleType.getFieldType().isWiderThan(widestDataType.getFieldType())) {
+                        widestDataType = possibleType;
+                        continue;
+                    }
+                    if (widestDataType.getFieldType().isWiderThan(possibleType.getFieldType())) {
+                        continue;
+                    }
+
+                    // Neither is wider than the other.
+                    widestDataType = null;
+                    break;
+                }
+
+                // If one of the CHOICE data types is the widest, use it.
+                if (widestDataType != null) {
+                    return getRelDataType(widestDataType, typeFactory);
+                }
+
+                // None of the data types is strictly the widest. Check if all data types are numeric.
+                // This would happen, for instance, if the data type is a choice between float and integer.
+                // If that is the case, we can use a String type for the table schema because all values will fit
+                // into a String. This will still allow for casting, etc. if the query requires it.
+                boolean allNumeric = true;
+                for (final DataType possibleType : choiceDataType.getPossibleSubTypes()) {
+                    if (!isNumeric(possibleType)) {
+                        allNumeric = false;
+                        break;
+                    }
+                }
+
+                if (allNumeric) {
+                    return typeFactory.createJavaType(String.class);
+                }
+
+                // There is no specific type that we can use for the schema. This would happen, for instance, if our
+                // CHOICE is between an integer and a Record.
                 return typeFactory.createJavaType(Object.class);
         }
 
         throw new IllegalArgumentException("Unknown Record Field Type: " + fieldType);
     }
 
+    private boolean isNumeric(final DataType dataType) {
+        switch (dataType.getFieldType()) {
+            case BIGINT:
+            case BYTE:
+            case DOUBLE:
+            case FLOAT:
+            case INT:
+            case LONG:
+            case SHORT:
+                return true;
+            default:
+                return false;
+        }
+    }
+
     @Override
     public TableType getJdbcTableType() {
         return TableType.TEMPORARY_TABLE;
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java
index 0c26980..a2c2b19 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java
@@ -41,6 +41,7 @@ import org.junit.Test;
 
 import java.io.IOException;
 import java.io.OutputStream;
+import java.math.BigDecimal;
 import java.sql.SQLException;
 import java.util.Arrays;
 import java.util.ArrayList;
@@ -249,6 +250,130 @@ public class TestQueryRecord {
     }
 
     @Test
+    public void testCollectionFunctionsWithoutCastFailure() throws InitializationException {
+        final Record record = createHierarchicalArrayRecord();
+        final Record record2 = createHierarchicalArrayRecord();
+        record2.setValue("height", 30);
+
+        final ArrayListRecordReader recordReader = new ArrayListRecordReader(record.getSchema());
+        recordReader.addRecord(record);
+        recordReader.addRecord(record2);
+        final ArrayListRecordWriter writer = new ArrayListRecordWriter(record.getSchema());
+
+        TestRunner runner = getRunner();
+        runner.addControllerService("reader", recordReader);
+        runner.enableControllerService(recordReader);
+        runner.addControllerService("writer", writer);
+        runner.enableControllerService(writer);
+
+        runner.setProperty(QueryRecord.RECORD_READER_FACTORY, "reader");
+        runner.setProperty(QueryRecord.RECORD_WRITER_FACTORY, "writer");
+        runner.setProperty(REL_NAME,
+                "SELECT title, name, sum(height) as height_total " +
+                "FROM FLOWFILE " +
+                "GROUP BY title, name");
+
+        runner.enqueue(new byte[0]);
+
+        runner.run();
+
+        runner.assertTransferCount(REL_NAME, 1);
+
+        final List<Record> written = writer.getRecordsWritten();
+        assertEquals(1, written.size());
+
+        final Record output = written.get(0);
+        assertEquals("John Doe", output.getValue("name"));
+        assertEquals("Software Engineer", output.getValue("title"));
+        assertEquals(BigDecimal.valueOf(90.5D), output.getValue("height_total"));
+    }
+
+    @Test
+    public void testCollectionFunctionsWithCastChoice() throws InitializationException {
+        final Record record = createHierarchicalArrayRecord();
+
+        final ArrayListRecordReader recordReader = new ArrayListRecordReader(record.getSchema());
+        recordReader.addRecord(record);
+        recordReader.addRecord(record);
+
+        final ArrayListRecordWriter writer = new ArrayListRecordWriter(record.getSchema());
+
+        TestRunner runner = getRunner();
+        runner.addControllerService("reader", recordReader);
+        runner.enableControllerService(recordReader);
+        runner.addControllerService("writer", writer);
+        runner.enableControllerService(writer);
+
+        runner.setProperty(QueryRecord.RECORD_READER_FACTORY, "reader");
+        runner.setProperty(QueryRecord.RECORD_WRITER_FACTORY, "writer");
+        runner.setProperty(REL_NAME,
+                "SELECT title, name, " +
+                    "sum(CAST(height AS DOUBLE)) as height_total_double, " +
+                    "sum(CAST(height AS REAL)) as height_total_float " +
+                "FROM FLOWFILE " +
+                "GROUP BY title, name");
+
+        runner.enqueue(new byte[0]);
+
+        runner.run();
+
+        runner.assertTransferCount(REL_NAME, 1);
+
+        final List<Record> written = writer.getRecordsWritten();
+        assertEquals(1, written.size());
+
+        final Number height = 121.0;
+        final Record output = written.get(0);
+        assertEquals("John Doe", output.getValue("name"));
+        assertEquals("Software Engineer", output.getValue("title"));
+        assertEquals(height.doubleValue(), output.getValue("height_total_double"));
+        assertEquals(height.floatValue(), output.getValue("height_total_float"));
+    }
+
+    @Test
+    public void testCollectionFunctionsWithCastChoiceWithInts() throws InitializationException {
+        final Record record = createHierarchicalArrayRecord();
+        record.setValue("height", 30);
+
+        final ArrayListRecordReader recordReader = new ArrayListRecordReader(record.getSchema());
+        recordReader.addRecord(record);
+        recordReader.addRecord(record);
+
+        final ArrayListRecordWriter writer = new ArrayListRecordWriter(record.getSchema());
+
+        TestRunner runner = getRunner();
+        runner.addControllerService("reader", recordReader);
+        runner.enableControllerService(recordReader);
+        runner.addControllerService("writer", writer);
+        runner.enableControllerService(writer);
+
+        runner.setProperty(QueryRecord.RECORD_READER_FACTORY, "reader");
+        runner.setProperty(QueryRecord.RECORD_WRITER_FACTORY, "writer");
+        runner.setProperty(REL_NAME,
+            "SELECT title, name, " +
+                "sum(CAST(height AS INT)) as height_total_int, " +
+                "sum(CAST(height AS BIGINT)) as height_total_long " +
+                "FROM FLOWFILE " +
+                "GROUP BY title, name");
+
+        runner.enqueue(new byte[0]);
+
+        runner.run();
+
+        runner.assertTransferCount(REL_NAME, 1);
+
+        final List<Record> written = writer.getRecordsWritten();
+        assertEquals(1, written.size());
+
+        final Number height = 60;
+        final Record output = written.get(0);
+        assertEquals("John Doe", output.getValue("name"));
+        assertEquals("Software Engineer", output.getValue("title"));
+        assertEquals(height.longValue(), output.getValue("height_total_long"));
+        assertEquals(height.intValue(), output.getValue("height_total_int"));
+    }
+
+    @Test
     public void testCollectionFunctionsWithWhereClause() throws InitializationException {
         final Record sample = createTaggedRecord("1", "a", "b", "c");
 
@@ -534,6 +659,7 @@ public class TestQueryRecord {
         personFields.add(new RecordField("dobTimestamp", RecordFieldType.LONG.getDataType()));
         personFields.add(new RecordField("joinTimestamp", RecordFieldType.STRING.getDataType()));
         personFields.add(new RecordField("weight", RecordFieldType.DOUBLE.getDataType()));
+        personFields.add(new RecordField("height", RecordFieldType.CHOICE.getChoiceDataType(RecordFieldType.LONG.getDataType(), RecordFieldType.INT.getDataType())));
         personFields.add(new RecordField("mother", RecordFieldType.RECORD.getRecordDataType(namedPersonSchema)));
         final RecordSchema personSchema = new SimpleRecordSchema(personFields);
 
@@ -559,6 +685,7 @@ public class TestQueryRecord {
         map.put("dobTimestamp", ts);
         map.put("joinTimestamp", "2018-02-04 10:20:55.802");
         map.put("weight", 180.8D);
+        map.put("height", 60.5);
         map.put("mother", mother);
         final Record person = new MapRecord(personSchema, map);
 
@@ -640,6 +767,7 @@ public class TestQueryRecord {
         personFields.add(new RecordField("name", RecordFieldType.STRING.getDataType()));
         personFields.add(new RecordField("age", RecordFieldType.INT.getDataType()));
         personFields.add(new RecordField("title", RecordFieldType.STRING.getDataType()));
+        personFields.add(new RecordField("height", RecordFieldType.CHOICE.getChoiceDataType(RecordFieldType.DOUBLE.getDataType(), RecordFieldType.INT.getDataType())));
         personFields.add(new RecordField("addresses", RecordFieldType.ARRAY.getArrayDataType( RecordFieldType.RECORD.getRecordDataType(addressSchema)) ));
         final RecordSchema personSchema = new SimpleRecordSchema(personFields);
 
@@ -666,6 +794,7 @@ public class TestQueryRecord {
         final Map<String, Object> map = new HashMap<>();
         map.put("name", "John Doe");
         map.put("age", 30);
+        map.put("height", 60.5);
         map.put("title", "Software Engineer");
         map.put("addresses", new Record[] {homeAddress, workAddress});
         final Record person = new MapRecord(personSchema, map);