You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this copy.
Posted to commits@nifi.apache.org by ma...@apache.org on 2020/05/21 17:14:13 UTC
[nifi] branch master updated: NIFI-7462: This adds a way to convert or cast a choice object into a valid type for use with calcite query functions
This is an automated email from the ASF dual-hosted git repository.
mattyb149 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new b6ef7e1 NIFI-7462: This adds a way to convert or cast a choice object into a valid type for use with calcite query functions
b6ef7e1 is described below
commit b6ef7e13bf076fb88fd94ce49d2a217db3f19aaa
Author: pcgrenier <pc...@gmail.com>
AuthorDate: Fri May 15 20:03:01 2020 -0400
NIFI-7462: This adds a way to convert or cast a choice object into a valid type for use with calcite query functions
NIFI-7462: Update to allow FlowFile Table's schema to be more intelligent when using CHOICE types
NIFI-7462: Fixed checkstyle violation, removed documentation around the CAST functions that were no longer needed
Signed-off-by: Matthew Burgess <ma...@apache.org>
This closes #4282
---
.../nifi/processors/standard/QueryRecord.java | 26 ++---
.../org/apache/nifi/queryrecord/FlowFileTable.java | 58 +++++++++
.../nifi/processors/standard/TestQueryRecord.java | 129 +++++++++++++++++++++
3 files changed, 199 insertions(+), 14 deletions(-)
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java
index 82aea6f..a620a60 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java
@@ -784,12 +784,6 @@ public class QueryRecord extends AbstractProcessor {
}
}
- public static class RecordRecordPath extends RecordPathFunction {
- public Record eval(Object record, String recordPath) {
- return eval(record, recordPath, Record.class::cast);
- }
- }
-
public static class RecordPathFunction {
private static final RecordField ROOT_RECORD_FIELD = new RecordField("root", RecordFieldType.MAP.getMapDataType(RecordFieldType.STRING.getDataType()));
@@ -803,14 +797,18 @@ public class QueryRecord extends AbstractProcessor {
return null;
}
- if (record instanceof Record) {
- return eval((Record) record, recordPath, transform);
- } else if (record instanceof Record[]) {
- return eval((Record[]) record, recordPath, transform);
- } else if (record instanceof Iterable) {
- return eval((Iterable<Record>) record, recordPath, transform);
- } else if (record instanceof Map) {
- return eval((Map<?, ?>) record, recordPath, transform);
+ try {
+ if (record instanceof Record) {
+ return eval((Record) record, recordPath, transform);
+ } else if (record instanceof Record[]) {
+ return eval((Record[]) record, recordPath, transform);
+ } else if (record instanceof Iterable) {
+ return eval((Iterable<Record>) record, recordPath, transform);
+ } else if (record instanceof Map) {
+ return eval((Map<?, ?>) record, recordPath, transform);
+ }
+ } catch (IllegalArgumentException e) {
+ throw new RuntimeException("Cannot evaluate RecordPath " + recordPath + " against " + record, e);
}
throw new RuntimeException("Cannot evaluate RecordPath " + recordPath + " against given argument because the argument is of type " + record.getClass() + " instead of Record");
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileTable.java
index 3030008..18cbc63 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileTable.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileTable.java
@@ -43,6 +43,7 @@ import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.type.ArrayDataType;
+import org.apache.nifi.serialization.record.type.ChoiceDataType;
import java.lang.reflect.Type;
import java.math.BigInteger;
@@ -223,12 +224,69 @@ public class FlowFileTable extends AbstractTable implements QueryableTable, Tran
case BIGINT:
return typeFactory.createJavaType(BigInteger.class);
case CHOICE:
+ final ChoiceDataType choiceDataType = (ChoiceDataType) fieldType;
+ DataType widestDataType = choiceDataType.getPossibleSubTypes().get(0);
+ for (final DataType possibleType : choiceDataType.getPossibleSubTypes()) {
+ if (possibleType == widestDataType) {
+ continue;
+ }
+ if (possibleType.getFieldType().isWiderThan(widestDataType.getFieldType())) {
+ widestDataType = possibleType;
+ continue;
+ }
+ if (widestDataType.getFieldType().isWiderThan(possibleType.getFieldType())) {
+ continue;
+ }
+
+ // Neither is wider than the other.
+ widestDataType = null;
+ break;
+ }
+
+ // If one of the CHOICE data types is the widest, use it.
+ if (widestDataType != null) {
+ return getRelDataType(widestDataType, typeFactory);
+ }
+
+ // None of the data types is strictly the widest. Check if all data types are numeric.
+ // This would happen, for instance, if the data type is a choice between float and integer.
+ // If that is the case, we can use a String type for the table schema because all values will fit
+ // into a String. This will still allow for casting, etc. if the query requires it.
+ boolean allNumeric = true;
+ for (final DataType possibleType : choiceDataType.getPossibleSubTypes()) {
+ if (!isNumeric(possibleType)) {
+ allNumeric = false;
+ break;
+ }
+ }
+
+ if (allNumeric) {
+ return typeFactory.createJavaType(String.class);
+ }
+
+ // There is no specific type that we can use for the schema. This would happen, for instance, if our
+ // CHOICE is between an integer and a Record.
return typeFactory.createJavaType(Object.class);
}
throw new IllegalArgumentException("Unknown Record Field Type: " + fieldType);
}
+ private boolean isNumeric(final DataType dataType) {
+ switch (dataType.getFieldType()) {
+ case BIGINT:
+ case BYTE:
+ case DOUBLE:
+ case FLOAT:
+ case INT:
+ case LONG:
+ case SHORT:
+ return true;
+ default:
+ return false;
+ }
+ }
+
@Override
public TableType getJdbcTableType() {
return TableType.TEMPORARY_TABLE;
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java
index 0c26980..a2c2b19 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java
@@ -41,6 +41,7 @@ import org.junit.Test;
import java.io.IOException;
import java.io.OutputStream;
+import java.math.BigDecimal;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.ArrayList;
@@ -249,6 +250,130 @@ public class TestQueryRecord {
}
@Test
+ public void testCollectionFunctionsWithoutCastFailure() throws InitializationException {
+ final Record record = createHierarchicalArrayRecord();
+ final Record record2 = createHierarchicalArrayRecord();
+ record2.setValue("height", 30);
+
+ final ArrayListRecordReader recordReader = new ArrayListRecordReader(record.getSchema());
+ recordReader.addRecord(record);
+ recordReader.addRecord(record2);
+ final ArrayListRecordWriter writer = new ArrayListRecordWriter(record.getSchema());
+
+ TestRunner runner = getRunner();
+ runner.addControllerService("reader", recordReader);
+ runner.enableControllerService(recordReader);
+ runner.addControllerService("writer", writer);
+ runner.enableControllerService(writer);
+
+ runner.setProperty(QueryRecord.RECORD_READER_FACTORY, "reader");
+ runner.setProperty(QueryRecord.RECORD_WRITER_FACTORY, "writer");
+ runner.setProperty(REL_NAME,
+ "SELECT title, name, sum(height) as height_total " +
+ "FROM FLOWFILE " +
+ "GROUP BY title, name");
+
+ runner.enqueue(new byte[0]);
+
+ runner.run();
+
+ runner.assertTransferCount(REL_NAME, 1);
+
+ final List<Record> written = writer.getRecordsWritten();
+ assertEquals(1, written.size());
+
+ final Record output = written.get(0);
+ assertEquals("John Doe", output.getValue("name"));
+ assertEquals("Software Engineer", output.getValue("title"));
+ assertEquals(BigDecimal.valueOf(90.5D), output.getValue("height_total"));
+ }
+
+ @Test
+ public void testCollectionFunctionsWithCastChoice() throws InitializationException {
+ final Record record = createHierarchicalArrayRecord();
+
+ final ArrayListRecordReader recordReader = new ArrayListRecordReader(record.getSchema());
+ recordReader.addRecord(record);
+ recordReader.addRecord(record);
+
+ final ArrayListRecordWriter writer = new ArrayListRecordWriter(record.getSchema());
+
+ TestRunner runner = getRunner();
+ runner.addControllerService("reader", recordReader);
+ runner.enableControllerService(recordReader);
+ runner.addControllerService("writer", writer);
+ runner.enableControllerService(writer);
+
+ runner.setProperty(QueryRecord.RECORD_READER_FACTORY, "reader");
+ runner.setProperty(QueryRecord.RECORD_WRITER_FACTORY, "writer");
+ runner.setProperty(REL_NAME,
+ "SELECT title, name, " +
+ "sum(CAST(height AS DOUBLE)) as height_total_double, " +
+ "sum(CAST(height AS REAL)) as height_total_float " +
+ "FROM FLOWFILE " +
+ "GROUP BY title, name");
+
+ runner.enqueue(new byte[0]);
+
+ runner.run();
+
+ runner.assertTransferCount(REL_NAME, 1);
+
+ final List<Record> written = writer.getRecordsWritten();
+ assertEquals(1, written.size());
+
+ final Number height = 121.0;
+ final Record output = written.get(0);
+ assertEquals("John Doe", output.getValue("name"));
+ assertEquals("Software Engineer", output.getValue("title"));
+ assertEquals(height.doubleValue(), output.getValue("height_total_double"));
+ assertEquals(height.floatValue(), output.getValue("height_total_float"));
+ }
+
+ @Test
+ public void testCollectionFunctionsWithCastChoiceWithInts() throws InitializationException {
+ final Record record = createHierarchicalArrayRecord();
+ record.setValue("height", 30);
+
+ final ArrayListRecordReader recordReader = new ArrayListRecordReader(record.getSchema());
+ recordReader.addRecord(record);
+ recordReader.addRecord(record);
+
+ final ArrayListRecordWriter writer = new ArrayListRecordWriter(record.getSchema());
+
+ TestRunner runner = getRunner();
+ runner.addControllerService("reader", recordReader);
+ runner.enableControllerService(recordReader);
+ runner.addControllerService("writer", writer);
+ runner.enableControllerService(writer);
+
+ runner.setProperty(QueryRecord.RECORD_READER_FACTORY, "reader");
+ runner.setProperty(QueryRecord.RECORD_WRITER_FACTORY, "writer");
+ runner.setProperty(REL_NAME,
+ "SELECT title, name, " +
+ "sum(CAST(height AS INT)) as height_total_int, " +
+ "sum(CAST(height AS BIGINT)) as height_total_long " +
+ "FROM FLOWFILE " +
+ "GROUP BY title, name");
+
+ runner.enqueue(new byte[0]);
+
+ runner.run();
+
+ runner.assertTransferCount(REL_NAME, 1);
+
+ final List<Record> written = writer.getRecordsWritten();
+ assertEquals(1, written.size());
+
+ final Number height = 60;
+ final Record output = written.get(0);
+ assertEquals("John Doe", output.getValue("name"));
+ assertEquals("Software Engineer", output.getValue("title"));
+ assertEquals(height.longValue(), output.getValue("height_total_long"));
+ assertEquals(height.intValue(), output.getValue("height_total_int"));
+ }
+
+ @Test
public void testCollectionFunctionsWithWhereClause() throws InitializationException {
final Record sample = createTaggedRecord("1", "a", "b", "c");
@@ -534,6 +659,7 @@ public class TestQueryRecord {
personFields.add(new RecordField("dobTimestamp", RecordFieldType.LONG.getDataType()));
personFields.add(new RecordField("joinTimestamp", RecordFieldType.STRING.getDataType()));
personFields.add(new RecordField("weight", RecordFieldType.DOUBLE.getDataType()));
+ personFields.add(new RecordField("height", RecordFieldType.CHOICE.getChoiceDataType(RecordFieldType.LONG.getDataType(), RecordFieldType.INT.getDataType())));
personFields.add(new RecordField("mother", RecordFieldType.RECORD.getRecordDataType(namedPersonSchema)));
final RecordSchema personSchema = new SimpleRecordSchema(personFields);
@@ -559,6 +685,7 @@ public class TestQueryRecord {
map.put("dobTimestamp", ts);
map.put("joinTimestamp", "2018-02-04 10:20:55.802");
map.put("weight", 180.8D);
+ map.put("height", 60.5);
map.put("mother", mother);
final Record person = new MapRecord(personSchema, map);
@@ -640,6 +767,7 @@ public class TestQueryRecord {
personFields.add(new RecordField("name", RecordFieldType.STRING.getDataType()));
personFields.add(new RecordField("age", RecordFieldType.INT.getDataType()));
personFields.add(new RecordField("title", RecordFieldType.STRING.getDataType()));
+ personFields.add(new RecordField("height", RecordFieldType.CHOICE.getChoiceDataType(RecordFieldType.DOUBLE.getDataType(), RecordFieldType.INT.getDataType())));
personFields.add(new RecordField("addresses", RecordFieldType.ARRAY.getArrayDataType( RecordFieldType.RECORD.getRecordDataType(addressSchema)) ));
final RecordSchema personSchema = new SimpleRecordSchema(personFields);
@@ -666,6 +794,7 @@ public class TestQueryRecord {
final Map<String, Object> map = new HashMap<>();
map.put("name", "John Doe");
map.put("age", 30);
+ map.put("height", 60.5);
map.put("title", "Software Engineer");
map.put("addresses", new Record[] {homeAddress, workAddress});
final Record person = new MapRecord(personSchema, map);