You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2017/04/24 21:03:51 UTC
[5/7] nifi git commit: NIFI-3682: This closes #1682. Add Schema
Access Strategy and Schema Write Strategy Record Readers and Writers;
bug fixes.
http://git-wip-us.apache.org/repos/asf/nifi/blob/50ea1083/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryFlowFile.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryFlowFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryFlowFile.java
deleted file mode 100644
index 8e1c7ed..0000000
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryFlowFile.java
+++ /dev/null
@@ -1,265 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.standard;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.nifi.controller.AbstractControllerService;
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processors.standard.util.record.MockRecordParser;
-import org.apache.nifi.processors.standard.util.record.MockRecordWriter;
-import org.apache.nifi.reporting.InitializationException;
-import org.apache.nifi.serialization.RecordSetWriter;
-import org.apache.nifi.serialization.RecordSetWriterFactory;
-import org.apache.nifi.serialization.WriteResult;
-import org.apache.nifi.serialization.record.Record;
-import org.apache.nifi.serialization.record.RecordFieldType;
-import org.apache.nifi.serialization.record.RecordSet;
-import org.apache.nifi.util.MockFlowFile;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class TestQueryFlowFile {
-
- static {
- System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info");
- System.setProperty("org.slf4j.simpleLogger.showDateTime", "true");
- System.setProperty("org.slf4j.simpleLogger.log.nifi.io.nio", "debug");
- System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.processors.standard.SQLTransform", "debug");
- }
-
- private static final String REL_NAME = "success";
-
- @Test
- public void testSimple() throws InitializationException, IOException, SQLException {
- final MockRecordParser parser = new MockRecordParser();
- parser.addSchemaField("name", RecordFieldType.STRING);
- parser.addSchemaField("age", RecordFieldType.INT);
- parser.addRecord("Tom", 49);
-
- final MockRecordWriter writer = new MockRecordWriter("\"name\",\"points\"");
-
- final TestRunner runner = TestRunners.newTestRunner(QueryFlowFile.class);
- runner.addControllerService("parser", parser);
- runner.enableControllerService(parser);
- runner.addControllerService("writer", writer);
- runner.enableControllerService(writer);
-
- runner.setProperty(REL_NAME, "select name, age from FLOWFILE WHERE name <> ''");
- runner.setProperty(QueryFlowFile.RECORD_READER_FACTORY, "parser");
- runner.setProperty(QueryFlowFile.RECORD_WRITER_FACTORY, "writer");
-
- final int numIterations = 1;
- for (int i = 0; i < numIterations; i++) {
- runner.enqueue(new byte[0]);
- }
-
- runner.setThreadCount(4);
- runner.run(2 * numIterations);
-
- runner.assertTransferCount(REL_NAME, 1);
- final MockFlowFile out = runner.getFlowFilesForRelationship(REL_NAME).get(0);
- System.out.println(new String(out.toByteArray()));
- out.assertContentEquals("\"name\",\"points\"\n\"Tom\",\"49\"\n");
- }
-
- @Test
- public void testParseFailure() throws InitializationException, IOException, SQLException {
- final MockRecordParser parser = new MockRecordParser();
- parser.addSchemaField("name", RecordFieldType.STRING);
- parser.addSchemaField("age", RecordFieldType.INT);
- parser.addRecord("Tom", 49);
-
- final MockRecordWriter writer = new MockRecordWriter("\"name\",\"points\"");
-
- final TestRunner runner = TestRunners.newTestRunner(QueryFlowFile.class);
- runner.addControllerService("parser", parser);
- runner.enableControllerService(parser);
- runner.addControllerService("writer", writer);
- runner.enableControllerService(writer);
-
- runner.setProperty(REL_NAME, "select name, age from FLOWFILE WHERE name <> ''");
- runner.setProperty(QueryFlowFile.RECORD_READER_FACTORY, "parser");
- runner.setProperty(QueryFlowFile.RECORD_WRITER_FACTORY, "writer");
-
- final int numIterations = 1;
- for (int i = 0; i < numIterations; i++) {
- runner.enqueue(new byte[0]);
- }
-
- runner.setThreadCount(4);
- runner.run(2 * numIterations);
-
- runner.assertTransferCount(REL_NAME, 1);
- final MockFlowFile out = runner.getFlowFilesForRelationship(REL_NAME).get(0);
- System.out.println(new String(out.toByteArray()));
- out.assertContentEquals("\"name\",\"points\"\n\"Tom\",\"49\"\n");
- }
-
-
- @Test
- public void testTransformCalc() throws InitializationException, IOException, SQLException {
- final MockRecordParser parser = new MockRecordParser();
- parser.addSchemaField("ID", RecordFieldType.INT);
- parser.addSchemaField("AMOUNT1", RecordFieldType.FLOAT);
- parser.addSchemaField("AMOUNT2", RecordFieldType.FLOAT);
- parser.addSchemaField("AMOUNT3", RecordFieldType.FLOAT);
-
- parser.addRecord("008", 10.05F, 15.45F, 89.99F);
- parser.addRecord("100", 20.25F, 25.25F, 45.25F);
- parser.addRecord("105", 20.05F, 25.05F, 45.05F);
- parser.addRecord("200", 34.05F, 25.05F, 75.05F);
-
- final MockRecordWriter writer = new MockRecordWriter("\"NAME\",\"POINTS\"");
-
- final TestRunner runner = TestRunners.newTestRunner(QueryFlowFile.class);
- runner.addControllerService("parser", parser);
- runner.enableControllerService(parser);
- runner.addControllerService("writer", writer);
- runner.enableControllerService(writer);
-
- runner.setProperty(REL_NAME, "select ID, AMOUNT1+AMOUNT2+AMOUNT3 as TOTAL from FLOWFILE where ID=100");
- runner.setProperty(QueryFlowFile.RECORD_READER_FACTORY, "parser");
- runner.setProperty(QueryFlowFile.RECORD_WRITER_FACTORY, "writer");
-
- runner.enqueue(new byte[0]);
- runner.run();
-
- runner.assertTransferCount(REL_NAME, 1);
- final MockFlowFile out = runner.getFlowFilesForRelationship(REL_NAME).get(0);
-
- out.assertContentEquals("\"NAME\",\"POINTS\"\n\"100\",\"90.75\"\n");
- }
-
-
- @Test
- public void testAggregateFunction() throws InitializationException, IOException {
- final MockRecordParser parser = new MockRecordParser();
- parser.addSchemaField("name", RecordFieldType.STRING);
- parser.addSchemaField("points", RecordFieldType.INT);
- parser.addRecord("Tom", 1);
- parser.addRecord("Jerry", 2);
- parser.addRecord("Tom", 99);
-
- final MockRecordWriter writer = new MockRecordWriter("\"name\",\"points\"");
-
- final TestRunner runner = TestRunners.newTestRunner(QueryFlowFile.class);
- runner.addControllerService("parser", parser);
- runner.enableControllerService(parser);
- runner.addControllerService("writer", writer);
- runner.enableControllerService(writer);
-
- runner.setProperty(REL_NAME, "select name, sum(points) as points from FLOWFILE GROUP BY name");
- runner.setProperty(QueryFlowFile.RECORD_READER_FACTORY, "parser");
- runner.setProperty(QueryFlowFile.RECORD_WRITER_FACTORY, "writer");
-
- runner.enqueue("");
- runner.run();
-
- runner.assertTransferCount(REL_NAME, 1);
- final MockFlowFile flowFileOut = runner.getFlowFilesForRelationship(ExecuteProcess.REL_SUCCESS).get(0);
- flowFileOut.assertContentEquals("\"name\",\"points\"\n\"Tom\",\"100\"\n\"Jerry\",\"2\"\n");
- }
-
- @Test
- public void testColumnNames() throws InitializationException, IOException {
- final MockRecordParser parser = new MockRecordParser();
- parser.addSchemaField("name", RecordFieldType.STRING);
- parser.addSchemaField("points", RecordFieldType.INT);
- parser.addSchemaField("greeting", RecordFieldType.STRING);
- parser.addRecord("Tom", 1, "Hello");
- parser.addRecord("Jerry", 2, "Hi");
- parser.addRecord("Tom", 99, "Howdy");
-
- final List<String> colNames = new ArrayList<>();
- colNames.add("name");
- colNames.add("points");
- colNames.add("greeting");
- colNames.add("FAV_GREETING");
- final ResultSetValidatingRecordWriter writer = new ResultSetValidatingRecordWriter(colNames);
-
- final TestRunner runner = TestRunners.newTestRunner(QueryFlowFile.class);
- runner.addControllerService("parser", parser);
- runner.enableControllerService(parser);
- runner.addControllerService("writer", writer);
- runner.enableControllerService(writer);
-
- runner.setProperty(REL_NAME, "select *, greeting AS FAV_GREETING from FLOWFILE");
- runner.setProperty(QueryFlowFile.RECORD_READER_FACTORY, "parser");
- runner.setProperty(QueryFlowFile.RECORD_WRITER_FACTORY, "writer");
-
- runner.enqueue("");
- runner.run();
-
- runner.assertTransferCount(REL_NAME, 1);
- }
-
-
- private static class ResultSetValidatingRecordWriter extends AbstractControllerService implements RecordSetWriterFactory {
- private final List<String> columnNames;
-
- public ResultSetValidatingRecordWriter(final List<String> colNames) {
- this.columnNames = new ArrayList<>(colNames);
- }
-
- @Override
- public RecordSetWriter createWriter(ComponentLog logger) {
- return new RecordSetWriter() {
- @Override
- public WriteResult write(final RecordSet rs, final OutputStream out) throws IOException {
- final int colCount = rs.getSchema().getFieldCount();
- Assert.assertEquals(columnNames.size(), colCount);
-
- final List<String> colNames = new ArrayList<>(colCount);
- for (int i = 0; i < colCount; i++) {
- colNames.add(rs.getSchema().getField(i).getFieldName());
- }
-
- Assert.assertEquals(columnNames, colNames);
-
- // Iterate over the rest of the records to ensure that we read the entire stream. If we don't
- // do this, we won't consume all of the data and as a result we will not close the stream properly
- Record record;
- while ((record = rs.next()) != null) {
- System.out.println(record);
- }
-
- return WriteResult.of(0, Collections.emptyMap());
- }
-
- @Override
- public String getMimeType() {
- return "text/plain";
- }
-
- @Override
- public WriteResult write(Record record, OutputStream out) throws IOException {
- return null;
- }
- };
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/nifi/blob/50ea1083/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java
new file mode 100644
index 0000000..32c3635
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processors.standard.util.record.MockRecordParser;
+import org.apache.nifi.processors.standard.util.record.MockRecordWriter;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestQueryRecord {
+ // These system properties configure the SLF4J simple logger; they are set once, when this class is initialized.
+ static {
+ System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info");
+ System.setProperty("org.slf4j.simpleLogger.showDateTime", "true");
+ System.setProperty("org.slf4j.simpleLogger.log.nifi.io.nio", "debug");
+ System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.processors.standard.SQLTransform", "debug");
+ }
+ // Used both as the name of the property that carries the SQL and as the relationship name the assertions inspect.
+ private static final String REL_NAME = "success";
+ /** Selects two columns from a single-record FlowFile and checks the exact text produced by the mock writer. */
+ @Test
+ public void testSimple() throws InitializationException, IOException, SQLException {
+ final MockRecordParser parser = new MockRecordParser();
+ parser.addSchemaField("name", RecordFieldType.STRING);
+ parser.addSchemaField("age", RecordFieldType.INT);
+ parser.addRecord("Tom", 49);
+
+ final MockRecordWriter writer = new MockRecordWriter("\"name\",\"points\"");
+
+ final TestRunner runner = TestRunners.newTestRunner(QueryRecord.class);
+ runner.addControllerService("parser", parser);
+ runner.enableControllerService(parser);
+ runner.addControllerService("writer", writer);
+ runner.enableControllerService(writer);
+
+ runner.setProperty(REL_NAME, "select name, age from FLOWFILE WHERE name <> ''");
+ runner.setProperty(QueryRecord.RECORD_READER_FACTORY, "parser");
+ runner.setProperty(QueryRecord.RECORD_WRITER_FACTORY, "writer");
+
+ final int numIterations = 1;
+ for (int i = 0; i < numIterations; i++) {
+ runner.enqueue(new byte[0]);
+ }
+
+ runner.setThreadCount(4);
+ runner.run(2 * numIterations);
+
+ runner.assertTransferCount(REL_NAME, 1);
+ final MockFlowFile out = runner.getFlowFilesForRelationship(REL_NAME).get(0);
+ System.out.println(new String(out.toByteArray()));
+ out.assertContentEquals("\"name\",\"points\"\n\"Tom\",\"49\"\n");
+ }
+ /** NOTE(review): despite its name, this body is identical to testSimple and provokes no parse failure - TODO confirm intent. */
+ @Test
+ public void testParseFailure() throws InitializationException, IOException, SQLException {
+ final MockRecordParser parser = new MockRecordParser();
+ parser.addSchemaField("name", RecordFieldType.STRING);
+ parser.addSchemaField("age", RecordFieldType.INT);
+ parser.addRecord("Tom", 49);
+
+ final MockRecordWriter writer = new MockRecordWriter("\"name\",\"points\"");
+
+ final TestRunner runner = TestRunners.newTestRunner(QueryRecord.class);
+ runner.addControllerService("parser", parser);
+ runner.enableControllerService(parser);
+ runner.addControllerService("writer", writer);
+ runner.enableControllerService(writer);
+
+ runner.setProperty(REL_NAME, "select name, age from FLOWFILE WHERE name <> ''");
+ runner.setProperty(QueryRecord.RECORD_READER_FACTORY, "parser");
+ runner.setProperty(QueryRecord.RECORD_WRITER_FACTORY, "writer");
+
+ final int numIterations = 1;
+ for (int i = 0; i < numIterations; i++) {
+ runner.enqueue(new byte[0]);
+ }
+
+ runner.setThreadCount(4);
+ runner.run(2 * numIterations);
+
+ runner.assertTransferCount(REL_NAME, 1);
+ final MockFlowFile out = runner.getFlowFilesForRelationship(REL_NAME).get(0);
+ System.out.println(new String(out.toByteArray()));
+ out.assertContentEquals("\"name\",\"points\"\n\"Tom\",\"49\"\n");
+ }
+
+ /** Adds three FLOAT columns for the row whose ID is 100 and checks the computed total in the written output. */
+ @Test
+ public void testTransformCalc() throws InitializationException, IOException, SQLException {
+ final MockRecordParser parser = new MockRecordParser();
+ parser.addSchemaField("ID", RecordFieldType.INT);
+ parser.addSchemaField("AMOUNT1", RecordFieldType.FLOAT);
+ parser.addSchemaField("AMOUNT2", RecordFieldType.FLOAT);
+ parser.addSchemaField("AMOUNT3", RecordFieldType.FLOAT);
+
+ parser.addRecord("008", 10.05F, 15.45F, 89.99F);
+ parser.addRecord("100", 20.25F, 25.25F, 45.25F);
+ parser.addRecord("105", 20.05F, 25.05F, 45.05F);
+ parser.addRecord("200", 34.05F, 25.05F, 75.05F);
+
+ final MockRecordWriter writer = new MockRecordWriter("\"NAME\",\"POINTS\"");
+
+ final TestRunner runner = TestRunners.newTestRunner(QueryRecord.class);
+ runner.addControllerService("parser", parser);
+ runner.enableControllerService(parser);
+ runner.addControllerService("writer", writer);
+ runner.enableControllerService(writer);
+
+ runner.setProperty(REL_NAME, "select ID, AMOUNT1+AMOUNT2+AMOUNT3 as TOTAL from FLOWFILE where ID=100");
+ runner.setProperty(QueryRecord.RECORD_READER_FACTORY, "parser");
+ runner.setProperty(QueryRecord.RECORD_WRITER_FACTORY, "writer");
+
+ runner.enqueue(new byte[0]);
+ runner.run();
+
+ runner.assertTransferCount(REL_NAME, 1);
+ final MockFlowFile out = runner.getFlowFilesForRelationship(REL_NAME).get(0);
+
+ out.assertContentEquals("\"NAME\",\"POINTS\"\n\"100\",\"90.75\"\n");
+ }
+ /** Expects the FlowFile to be routed to failure when one of several queries casts a non-numeric string to DOUBLE. */
+ @Test
+ public void testHandlingWithInvalidSchema() throws InitializationException {
+ final MockRecordParser parser = new MockRecordParser();
+ parser.addSchemaField("name", RecordFieldType.STRING);
+ parser.addSchemaField("favorite_color", RecordFieldType.STRING);
+ parser.addSchemaField("address", RecordFieldType.STRING);
+ parser.addRecord("Tom", "blue", null);
+ parser.addRecord("Jerry", "red", null);
+
+ final MockRecordWriter writer = new MockRecordWriter("\"name\",\"points\"");
+
+ final TestRunner runner = TestRunners.newTestRunner(QueryRecord.class);
+ runner.enforceReadStreamsClosed(false);
+ runner.addControllerService("parser", parser);
+ runner.enableControllerService(parser);
+ runner.addControllerService("writer", writer);
+ runner.enableControllerService(writer);
+
+ runner.setProperty(QueryRecord.INCLUDE_ZERO_RECORD_FLOWFILES, "false");
+ runner.setProperty("rel1", "select * from FLOWFILE where address IS NOT NULL");
+ runner.setProperty("rel2", "select name, CAST(favorite_color AS DOUBLE) AS num from FLOWFILE");
+ runner.setProperty("rel3", "select * from FLOWFILE where address IS NOT NULL");
+ runner.setProperty(QueryRecord.RECORD_READER_FACTORY, "parser");
+ runner.setProperty(QueryRecord.RECORD_WRITER_FACTORY, "writer");
+
+ runner.enqueue("");
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(QueryRecord.REL_FAILURE, 1);
+ }
+ /** Groups by name with sum(points) and checks the aggregated rows in the written output. */
+ @Test
+ public void testAggregateFunction() throws InitializationException, IOException {
+ final MockRecordParser parser = new MockRecordParser();
+ parser.addSchemaField("name", RecordFieldType.STRING);
+ parser.addSchemaField("points", RecordFieldType.INT);
+ parser.addRecord("Tom", 1);
+ parser.addRecord("Jerry", 2);
+ parser.addRecord("Tom", 99);
+
+ final MockRecordWriter writer = new MockRecordWriter("\"name\",\"points\"");
+
+ final TestRunner runner = TestRunners.newTestRunner(QueryRecord.class);
+ runner.addControllerService("parser", parser);
+ runner.enableControllerService(parser);
+ runner.addControllerService("writer", writer);
+ runner.enableControllerService(writer);
+
+ runner.setProperty(REL_NAME, "select name, sum(points) as points from FLOWFILE GROUP BY name");
+ runner.setProperty(QueryRecord.RECORD_READER_FACTORY, "parser");
+ runner.setProperty(QueryRecord.RECORD_WRITER_FACTORY, "writer");
+
+ runner.enqueue("");
+ runner.run();
+
+ runner.assertTransferCount(REL_NAME, 1);
+ final MockFlowFile flowFileOut = runner.getFlowFilesForRelationship(REL_NAME).get(0);
+ flowFileOut.assertContentEquals("\"name\",\"points\"\n\"Tom\",\"100\"\n\"Jerry\",\"2\"\n");
+ }
+ /** Checks that a wildcard select plus an aliased column yields the expected field names; the check runs inside the writer. */
+ @Test
+ public void testColumnNames() throws InitializationException, IOException {
+ final MockRecordParser parser = new MockRecordParser();
+ parser.addSchemaField("name", RecordFieldType.STRING);
+ parser.addSchemaField("points", RecordFieldType.INT);
+ parser.addSchemaField("greeting", RecordFieldType.STRING);
+ parser.addRecord("Tom", 1, "Hello");
+ parser.addRecord("Jerry", 2, "Hi");
+ parser.addRecord("Tom", 99, "Howdy");
+
+ final List<String> colNames = new ArrayList<>();
+ colNames.add("name");
+ colNames.add("points");
+ colNames.add("greeting");
+ colNames.add("FAV_GREETING");
+ final ResultSetValidatingRecordWriter writer = new ResultSetValidatingRecordWriter(colNames);
+
+ final TestRunner runner = TestRunners.newTestRunner(QueryRecord.class);
+ runner.addControllerService("parser", parser);
+ runner.enableControllerService(parser);
+ runner.addControllerService("writer", writer);
+ runner.enableControllerService(writer);
+
+ runner.setProperty(REL_NAME, "select *, greeting AS FAV_GREETING from FLOWFILE");
+ runner.setProperty(QueryRecord.RECORD_READER_FACTORY, "parser");
+ runner.setProperty(QueryRecord.RECORD_WRITER_FACTORY, "writer");
+
+ runner.enqueue("");
+ runner.run();
+
+ runner.assertTransferCount(REL_NAME, 1);
+ }
+
+ /** Writer factory whose writer asserts the result schema's field names and writes nothing to the output stream. */
+ private static class ResultSetValidatingRecordWriter extends AbstractControllerService implements RecordSetWriterFactory {
+ private final List<String> columnNames;
+
+ public ResultSetValidatingRecordWriter(final List<String> colNames) {
+ this.columnNames = new ArrayList<>(colNames);
+ }
+
+ @Override
+ public RecordSetWriter createWriter(ComponentLog logger, FlowFile flowFile, InputStream in) {
+ return new RecordSetWriter() {
+ @Override
+ public WriteResult write(final RecordSet rs, final OutputStream out) throws IOException {
+ final int colCount = rs.getSchema().getFieldCount();
+ Assert.assertEquals(columnNames.size(), colCount);
+
+ final List<String> colNames = new ArrayList<>(colCount);
+ for (int i = 0; i < colCount; i++) {
+ colNames.add(rs.getSchema().getField(i).getFieldName());
+ }
+
+ Assert.assertEquals(columnNames, colNames);
+
+ // Iterate over the rest of the records to ensure that we read the entire stream. If we don't
+ // do this, we won't consume all of the data and as a result we will not close the stream properly
+ Record record;
+ while ((record = rs.next()) != null) {
+ System.out.println(record);
+ }
+
+ return WriteResult.of(0, Collections.emptyMap());
+ }
+
+ @Override
+ public String getMimeType() {
+ return "text/plain";
+ }
+ // NOTE(review): returns null instead of a WriteResult - presumably never called by these tests; confirm.
+ @Override
+ public WriteResult write(Record record, OutputStream out) throws IOException {
+ return null;
+ }
+ };
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/50ea1083/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/record/MockRecordParser.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/record/MockRecordParser.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/record/MockRecordParser.java
index 1a39b82..fcf0d10 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/record/MockRecordParser.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/record/MockRecordParser.java
@@ -28,9 +28,10 @@ import java.util.Map;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
-import org.apache.nifi.serialization.RowRecordReaderFactory;
+import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
@@ -38,7 +39,7 @@ import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
-public class MockRecordParser extends AbstractControllerService implements RowRecordReaderFactory {
+public class MockRecordParser extends AbstractControllerService implements RecordReaderFactory {
private final List<Object[]> records = new ArrayList<>();
private final List<RecordField> fields = new ArrayList<>();
private final int failAfterN;
@@ -61,7 +62,7 @@ public class MockRecordParser extends AbstractControllerService implements RowRe
}
@Override
- public RecordReader createRecordReader(FlowFile flowFile, InputStream in, ComponentLog logger) throws IOException {
+ public RecordReader createRecordReader(FlowFile flowFile, InputStream in, ComponentLog logger) throws IOException, SchemaNotFoundException {
final Iterator<Object[]> itr = records.iterator();
return new RecordReader() {
@@ -99,9 +100,4 @@ public class MockRecordParser extends AbstractControllerService implements RowRe
}
};
}
-
- @Override
- public RecordSchema getSchema(FlowFile flowFile) throws MalformedRecordException, IOException {
- return null;
- }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi/blob/50ea1083/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/record/MockRecordWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/record/MockRecordWriter.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/record/MockRecordWriter.java
index 0a57b29..1dbfd04 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/record/MockRecordWriter.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/record/MockRecordWriter.java
@@ -18,10 +18,12 @@
package org.apache.nifi.processors.standard.util.record;
import java.io.IOException;
+import java.io.InputStream;
import java.io.OutputStream;
import java.util.Collections;
import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
@@ -49,7 +51,7 @@ public class MockRecordWriter extends AbstractControllerService implements Recor
}
@Override
- public RecordSetWriter createWriter(final ComponentLog logger) {
+ public RecordSetWriter createWriter(final ComponentLog logger, final FlowFile flowFile, final InputStream in) {
return new RecordSetWriter() {
@Override
public WriteResult write(final RecordSet rs, final OutputStream out) throws IOException {
@@ -69,9 +71,11 @@ public class MockRecordWriter extends AbstractControllerService implements Recor
final String val = record.getAsString(fieldName);
if (quoteValues) {
out.write("\"".getBytes());
- out.write(val.getBytes());
+ if (val != null) {
+ out.write(val.getBytes());
+ }
out.write("\"".getBytes());
- } else {
+ } else if (val != null) {
out.write(val.getBytes());
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/50ea1083/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/schema/access/SchemaField.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/schema/access/SchemaField.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/schema/access/SchemaField.java
new file mode 100644
index 0000000..2fe06f4
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/schema/access/SchemaField.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.schema.access;
+
+public enum SchemaField {
+ SCHEMA_TEXT("Schema Text"),
+ SCHEMA_TEXT_FORMAT("Schema Text Format"),
+ SCHEMA_NAME("Schema Name"),
+ SCHEMA_IDENTIFIER("Schema Identifier"),
+ SCHEMA_VERSION("Schema Version");
+ // Human-readable label of the constant; toString() reports this instead of the constant's identifier.
+ private final String label;
+
+ SchemaField(final String label) {
+ this.label = label;
+ }
+ /** Returns the human-readable label given to this constant at declaration. */
+ @Override
+ public String toString() {
+ return this.label;
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/50ea1083/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/schema/access/SchemaNotFoundException.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/schema/access/SchemaNotFoundException.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/schema/access/SchemaNotFoundException.java
new file mode 100644
index 0000000..9a064ff
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/schema/access/SchemaNotFoundException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.schema.access;
+
+/**
+ * Indicates that a schema could not be found.
+ */
+public class SchemaNotFoundException extends Exception {
+ /**
+ * @param message a description of why the schema could not be found
+ */
+ public SchemaNotFoundException(final String message) {
+ super(message);
+ }
+
+ /**
+ * @param message a description of why the schema could not be found
+ * @param cause the underlying failure
+ */
+ public SchemaNotFoundException(final String message, final Throwable cause) {
+ // Forward both arguments; passing only the cause would discard the caller's message.
+ super(message, cause);
+ }
+
+ /**
+ * @param cause the underlying failure
+ */
+ public SchemaNotFoundException(final Throwable cause) {
+ super(cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/50ea1083/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordReaderFactory.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordReaderFactory.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordReaderFactory.java
new file mode 100644
index 0000000..7d7268e
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordReaderFactory.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.serialization;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+
+/**
+ * <p>
+ * A Controller Service that is responsible for creating a {@link RecordReader}.
+ * </p>
+ */
+public interface RecordReaderFactory extends ControllerService {
+
+ /**
+ * Creates a {@link RecordReader} for the given FlowFile and InputStream.
+ *
+ * @param flowFile the FlowFile that the reader is being created for
+ * @param in the InputStream handed to the reader; presumably the content of the given FlowFile (TODO confirm)
+ * @param logger the logger supplied by the caller
+ * @return a RecordReader
+ * @throws MalformedRecordException presumably if the data cannot be parsed into records (TODO confirm)
+ * @throws IOException presumably if the given InputStream cannot be read (TODO confirm)
+ * @throws SchemaNotFoundException presumably if no schema can be found for the data (TODO confirm)
+ */
+ RecordReader createRecordReader(FlowFile flowFile, InputStream in, ComponentLog logger) throws MalformedRecordException, IOException, SchemaNotFoundException;
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/50ea1083/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordSetWriterFactory.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordSetWriterFactory.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordSetWriterFactory.java
index 2286f3f..e23ad20 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordSetWriterFactory.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordSetWriterFactory.java
@@ -17,14 +17,47 @@
package org.apache.nifi.serialization;
+import java.io.IOException;
+import java.io.InputStream;
+
import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
/**
* <p>
- * A Controller Service that is responsible for creating a {@link RecordSetWriter}.
+ * A Controller Service that is responsible for creating a {@link RecordSetWriter}. The writer is created
+ * based on a FlowFile and an InputStream for that FlowFile, but it is important to note that the FlowFile passed
+ * to {@link #createWriter(ComponentLog, FlowFile, InputStream)} may not be the FlowFile that the Writer will write to.
+ * Rather, it is the FlowFile and InputStream from which the Writer's Schema should be determined. This is done because most
+ * Processors that make use of Record Writers also make use of Record Readers and the schema for the output is often determined
+ * by either reading the schema from the content of the input FlowFile or from referencing attributes of the
+ * input FlowFile.
+ * </p>
+ *
+ * <p>
+ * PLEASE NOTE: This interface is still considered 'unstable' and may change in a non-backward-compatible
+ * manner between minor or incremental releases of NiFi.
* </p>
*/
public interface RecordSetWriterFactory extends ControllerService {
- RecordSetWriter createWriter(ComponentLog logger);
+
+ /**
+ * <p>
+ * Creates a new RecordSetWriter that is capable of writing record contents to an OutputStream. Note that the
+ * FlowFile and InputStream that are given may well be different than the FlowFile that the writer is intended
+ * to write to. The given FlowFile and InputStream are intended to be used for determining the schema that should
+ * be used when writing records.
+ * </p>
+ *
+ * @param logger the logger to use when logging information. This is passed in, rather than using the logger of the Controller Service
+ * because it allows messages to be logged for the component that is calling this Controller Service.
+ * @param schemaFlowFile the FlowFile from which the schema should be determined.
+ * @param schemaFlowFileContent the contents of the FlowFile from which to determine the schema
+ * @return a RecordSetWriter that can write record sets to an OutputStream
+ * @throws SchemaNotFoundException if unable to find the schema
+ * @throws IOException if unable to read from the given InputStream
+ */
+ RecordSetWriter createWriter(ComponentLog logger, FlowFile schemaFlowFile, InputStream schemaFlowFileContent) throws SchemaNotFoundException, IOException;
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/50ea1083/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RowRecordReaderFactory.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RowRecordReaderFactory.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RowRecordReaderFactory.java
deleted file mode 100644
index fbd8a21..0000000
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RowRecordReaderFactory.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.serialization;
-
-import java.io.IOException;
-import java.io.InputStream;
-
-import org.apache.nifi.controller.ControllerService;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.serialization.record.RecordSchema;
-
-/**
- * <p>
- * A Controller Service that is responsible for creating a {@link RecordReader}.
- * </p>
- */
-public interface RowRecordReaderFactory extends ControllerService {
-
- RecordReader createRecordReader(FlowFile flowFile, InputStream in, ComponentLog logger) throws MalformedRecordException, IOException;
-
- RecordSchema getSchema(FlowFile flowFile) throws MalformedRecordException, IOException;
-}
http://git-wip-us.apache.org/repos/asf/nifi/blob/50ea1083/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/SimpleRecordSchema.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/SimpleRecordSchema.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/SimpleRecordSchema.java
index 246e0af..017aef1 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/SimpleRecordSchema.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/SimpleRecordSchema.java
@@ -29,21 +29,69 @@ import java.util.stream.Collectors;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.SchemaIdentifier;
public class SimpleRecordSchema implements RecordSchema {
private final List<RecordField> fields;
private final Map<String, Integer> fieldIndices;
+ private final boolean textAvailable;
+ private final String text;
+ private final String schemaFormat;
+ private final SchemaIdentifier schemaIdentifier;
public SimpleRecordSchema(final List<RecordField> fields) {
+ this(fields, createText(fields), null, false, SchemaIdentifier.EMPTY);
+ }
+
+ public SimpleRecordSchema(final List<RecordField> fields, final SchemaIdentifier id) {
+ this(fields, createText(fields), null, false, id);
+ }
+
+ public SimpleRecordSchema(final List<RecordField> fields, final String text, final String schemaFormat, final SchemaIdentifier id) {
+ this(fields, text, schemaFormat, true, id);
+ }
+
+ /**
+ * Builds the schema and indexes every field under its name and under each of its aliases.
+ *
+ * @param fields the fields of the schema
+ * @param text the textual form of the schema
+ * @param schemaFormat the format of the schema text; may be <code>null</code>
+ * @param textAvailable whether {@link #getSchemaText()} should expose the given text
+ * @param id the identifier of the schema
+ * @throws IllegalArgumentException if a field name or alias is used more than once
+ */
+ private SimpleRecordSchema(final List<RecordField> fields, final String text, final String schemaFormat, final boolean textAvailable, final SchemaIdentifier id) {
+ this.text = text;
+ this.schemaFormat = schemaFormat;
+ this.schemaIdentifier = id;
+ this.textAvailable = textAvailable;
this.fields = Collections.unmodifiableList(new ArrayList<>(fields));
this.fieldIndices = new HashMap<>(fields.size());
int index = 0;
for (final RecordField field : fields) {
- fieldIndices.put(field.getFieldName(), index++);
+ // put() returns the previous mapping, so a non-null result means the key was already taken.
+ Integer previousValue = fieldIndices.put(field.getFieldName(), index);
+ if (previousValue != null) {
+ throw new IllegalArgumentException("Two fields are given with the same name (or alias) of '" + field.getFieldName() + "'");
+ }
+
+ for (final String alias : field.getAliases()) {
+ previousValue = fieldIndices.put(alias, index);
+ if (previousValue != null) {
+ // Report the alias that actually collided rather than the name of the field that owns it.
+ throw new IllegalArgumentException("Two fields are given with the same name (or alias) of '" + alias + "'");
+ }
+ }
+
+ index++;
+ }
+ }
+
+ @Override
+ public Optional<String> getSchemaText() {
+ if (textAvailable) {
+ return Optional.ofNullable(text);
+ } else {
+ return Optional.empty();
}
}
+
+ @Override
+ public Optional<String> getSchemaFormat() {
+ return Optional.ofNullable(schemaFormat);
+ }
+
@Override
public List<RecordField> getFields() {
return fields;
@@ -77,6 +125,16 @@ public class SimpleRecordSchema implements RecordSchema {
return idx.isPresent() ? Optional.of(fields.get(idx.getAsInt()).getDataType()) : Optional.empty();
}
+ /**
+ * Looks up the field registered under the given name.
+ *
+ * @param fieldName the name to look up
+ * @return the matching field, or an empty Optional if no field is registered under that name
+ */
+ @Override
+ public Optional<RecordField> getField(final String fieldName) {
+ final OptionalInt position = getFieldIndex(fieldName);
+ return position.isPresent() ? Optional.of(fields.get(position.getAsInt())) : Optional.empty();
+ }
+
private OptionalInt getFieldIndex(final String fieldName) {
final Integer index = fieldIndices.get(fieldName);
return index == null ? OptionalInt.empty() : OptionalInt.of(index);
@@ -103,8 +161,7 @@ public class SimpleRecordSchema implements RecordSchema {
return 143 + 3 * fields.hashCode();
}
- @Override
- public String toString() {
+ private static String createText(final List<RecordField> fields) {
final StringBuilder sb = new StringBuilder("[");
for (int i = 0; i < fields.size(); i++) {
@@ -123,4 +180,14 @@ public class SimpleRecordSchema implements RecordSchema {
sb.append("]");
return sb.toString();
}
+
+ @Override
+ public String toString() {
+ return text;
+ }
+
+ @Override
+ public SchemaIdentifier getIdentifier() {
+ return schemaIdentifier;
+ }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/50ea1083/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/DataType.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/DataType.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/DataType.java
index b72c107..6ed4bd6 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/DataType.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/DataType.java
@@ -17,6 +17,8 @@
package org.apache.nifi.serialization.record;
+import java.util.Objects;
+
public class DataType {
private final RecordFieldType fieldType;
private final String format;
@@ -36,7 +38,7 @@ public class DataType {
@Override
public int hashCode() {
- return 31 + 41 * fieldType.hashCode() + 41 * (format == null ? 0 : format.hashCode());
+ return 31 + 41 * getFieldType().hashCode() + 41 * (getFormat() == null ? 0 : getFormat().hashCode());
}
@Override
@@ -52,15 +54,15 @@ public class DataType {
}
final DataType other = (DataType) obj;
- return fieldType.equals(other.fieldType) && ((format == null && other.format == null) || (format != null && format.equals(other.format)));
+ return getFieldType().equals(other.getFieldType()) && Objects.equals(getFormat(), other.getFormat());
}
@Override
public String toString() {
- if (format == null) {
- return fieldType.toString();
+ if (getFormat() == null) {
+ return getFieldType().toString();
} else {
- return fieldType.toString() + ":" + format;
+ return getFieldType().toString() + ":" + getFormat();
}
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/50ea1083/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/MapRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/MapRecord.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/MapRecord.java
index 0bbb534..56cf909 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/MapRecord.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/MapRecord.java
@@ -42,15 +42,86 @@ public class MapRecord implements Record {
public Object[] getValues() {
final Object[] values = new Object[schema.getFieldCount()];
int i = 0;
- for (final String fieldName : schema.getFieldNames()) {
- values[i++] = getValue(fieldName);
+ for (final RecordField recordField : schema.getFields()) {
+ values[i++] = getValue(recordField);
}
return values;
}
@Override
public Object getValue(final String fieldName) {
- return values.get(fieldName);
+ final Optional<RecordField> fieldOption = schema.getField(fieldName);
+ if (fieldOption.isPresent()) {
+ return getValue(fieldOption.get());
+ }
+
+ return null;
+ }
+
+ /**
+ * Returns the value of the given field, trying the following sources in order: a non-null value stored
+ * under the field's name or one of its aliases; a non-null value stored under the name or aliases of the
+ * field of this record's schema that the given field resolves to (only if that field differs from the
+ * given one); the given field's default value; and finally the resolved field's default value.
+ *
+ * @param field the field whose value is wanted
+ * @return the first non-null value found in the order above, or <code>null</code> if there is none
+ */
+ @Override
+ public Object getValue(final RecordField field) {
+ Object explicitValue = getExplicitValue(field);
+ if (explicitValue != null) {
+ return explicitValue;
+ }
+
+ // The caller's field may not be the same object as the one in this record's schema, so look
+ // up the schema's own field and retry with its name and aliases.
+ final Optional<RecordField> resolvedField = resolveField(field);
+ final boolean resolvedFieldDifferent = resolvedField.isPresent() && !resolvedField.get().equals(field);
+ if (resolvedFieldDifferent) {
+ explicitValue = getExplicitValue(resolvedField.get());
+ if (explicitValue != null) {
+ return explicitValue;
+ }
+ }
+
+ // No stored value: the given field's default takes precedence over the resolved field's default.
+ Object defaultValue = field.getDefaultValue();
+ if (defaultValue != null) {
+ return defaultValue;
+ }
+
+ if (resolvedFieldDifferent) {
+ return resolvedField.get().getDefaultValue();
+ }
+
+ return null;
+ }
+
+ /**
+ * Finds the field of this record's schema that corresponds to the given field, matching first on
+ * the field's name and then on each of its aliases.
+ *
+ * @param field the field to resolve against this record's schema
+ * @return the first match, or an empty Optional if neither the name nor any alias is known to the schema
+ */
+ private Optional<RecordField> resolveField(final RecordField field) {
+ final Optional<RecordField> byName = schema.getField(field.getFieldName());
+ if (byName.isPresent()) {
+ return byName;
+ }
+
+ // Evaluated lazily: aliases are tried in iteration order and the search stops at the first hit.
+ return field.getAliases().stream()
+ .map(schema::getField)
+ .filter(Optional::isPresent)
+ .findFirst()
+ .orElse(Optional.empty());
+ }
+
+ /**
+ * Returns the value stored in this record for the given field, without consulting default values.
+ *
+ * @param field the field whose stored value is wanted
+ * @return the first non-null value stored under the field's name or one of its aliases,
+ * or <code>null</code> if there is none
+ */
+ private Object getExplicitValue(final RecordField field) {
+ final String canonicalFieldName = field.getFieldName();
+
+ // Look the value up under the canonical field name first and then under each alias.
+ // Note that this uses get() with a null check rather than containsKey(), so an entry
+ // that is explicitly mapped to null is treated the same as a missing entry and the
+ // lookup falls through to the aliases.
+ Object value = values.get(canonicalFieldName);
+ if (value != null) {
+ return value;
+ }
+
+ for (final String alias : field.getAliases()) {
+ value = values.get(alias);
+ if (value != null) {
+ return value;
+ }
+ }
+
+ return null;
@Override
@@ -68,6 +139,11 @@ public class MapRecord implements Record {
return convertToString(getValue(fieldName), format);
}
+ @Override
+ public String getAsString(final RecordField field, final String format) {
+ return convertToString(getValue(field), format);
+ }
+
private String getFormat(final String optionalFormat, final RecordFieldType fieldType) {
return (optionalFormat == null) ? fieldType.getDefaultFormat() : optionalFormat;
}
@@ -85,42 +161,42 @@ public class MapRecord implements Record {
@Override
public Long getAsLong(final String fieldName) {
- return DataTypeUtils.toLong(getValue(fieldName));
+ return DataTypeUtils.toLong(getValue(fieldName), fieldName);
}
@Override
public Integer getAsInt(final String fieldName) {
- return DataTypeUtils.toInteger(getValue(fieldName));
+ return DataTypeUtils.toInteger(getValue(fieldName), fieldName);
}
@Override
public Double getAsDouble(final String fieldName) {
- return DataTypeUtils.toDouble(getValue(fieldName));
+ return DataTypeUtils.toDouble(getValue(fieldName), fieldName);
}
@Override
public Float getAsFloat(final String fieldName) {
- return DataTypeUtils.toFloat(getValue(fieldName));
+ return DataTypeUtils.toFloat(getValue(fieldName), fieldName);
}
@Override
public Record getAsRecord(String fieldName, final RecordSchema schema) {
- return DataTypeUtils.toRecord(getValue(fieldName), schema);
+ return DataTypeUtils.toRecord(getValue(fieldName), schema, fieldName);
}
@Override
public Boolean getAsBoolean(final String fieldName) {
- return DataTypeUtils.toBoolean(getValue(fieldName));
+ return DataTypeUtils.toBoolean(getValue(fieldName), fieldName);
}
@Override
public Date getAsDate(final String fieldName, final String format) {
- return DataTypeUtils.toDate(getValue(fieldName), format);
+ return DataTypeUtils.toDate(getValue(fieldName), format, fieldName);
}
@Override
public Object[] getAsArray(final String fieldName) {
- return DataTypeUtils.toArray(getValue(fieldName));
+ return DataTypeUtils.toArray(getValue(fieldName), fieldName);
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/50ea1083/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/Record.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/Record.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/Record.java
index e1d52e9..5e5e7ba 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/Record.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/Record.java
@@ -38,10 +38,14 @@ public interface Record {
Object getValue(String fieldName);
+ Object getValue(RecordField field);
+
String getAsString(String fieldName);
String getAsString(String fieldName, String format);
+ String getAsString(RecordField field, String format);
+
Long getAsLong(String fieldName);
Integer getAsInt(String fieldName);
http://git-wip-us.apache.org/repos/asf/nifi/blob/50ea1083/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/RecordField.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/RecordField.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/RecordField.java
index 135ae66..fe3d8e5 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/RecordField.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/RecordField.java
@@ -17,29 +17,66 @@
package org.apache.nifi.serialization.record;
+import java.util.Collections;
+import java.util.Objects;
+import java.util.Set;
+
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+
public class RecordField {
private final String fieldName;
private final DataType dataType;
+ private final Set<String> aliases;
+ private final Object defaultValue;
public RecordField(final String fieldName, final DataType dataType) {
- this.fieldName = fieldName;
- this.dataType = dataType;
+ this(fieldName, dataType, null, Collections.emptySet());
+ }
+
+ public RecordField(final String fieldName, final DataType dataType, final Object defaultValue) {
+ this(fieldName, dataType, defaultValue, Collections.emptySet());
+ }
+
+ public RecordField(final String fieldName, final DataType dataType, final Set<String> aliases) {
+ this(fieldName, dataType, null, aliases);
+ }
+
+ public RecordField(final String fieldName, final DataType dataType, final Object defaultValue, final Set<String> aliases) {
+ if (defaultValue != null && !DataTypeUtils.isCompatibleDataType(defaultValue, dataType)) {
+ throw new IllegalArgumentException("Cannot set the default value for field [" + fieldName + "] to [" + defaultValue
+ + "] because that is not a valid value for Data Type [" + dataType + "]");
+ }
+
+ this.fieldName = Objects.requireNonNull(fieldName);
+ this.dataType = Objects.requireNonNull(dataType);
+ this.aliases = Collections.unmodifiableSet(Objects.requireNonNull(aliases));
+ this.defaultValue = defaultValue;
}
public String getFieldName() {
return fieldName;
}
+ public Set<String> getAliases() {
+ return aliases;
+ }
+
public DataType getDataType() {
return dataType;
}
+ public Object getDefaultValue() {
+ return defaultValue;
+ }
+
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
- result = prime * result + ((dataType == null) ? 0 : dataType.hashCode());
- result = prime * result + ((fieldName == null) ? 0 : fieldName.hashCode());
+ result = prime * result + dataType.hashCode();
+ result = prime * result + fieldName.hashCode();
+ result = prime * result + aliases.hashCode();
+ result = prime * result + ((defaultValue == null) ? 0 : defaultValue.hashCode());
return result;
}
@@ -57,7 +94,7 @@ public class RecordField {
}
RecordField other = (RecordField) obj;
- return dataType.equals(other.getDataType()) && fieldName.equals(other.getFieldName());
+ return dataType.equals(other.getDataType()) && fieldName.equals(other.getFieldName()) && aliases.equals(other.getAliases()) && Objects.equals(defaultValue, other.defaultValue);
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/50ea1083/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/RecordFieldType.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/RecordFieldType.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/RecordFieldType.java
index cc83a41..785b8d2 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/RecordFieldType.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/RecordFieldType.java
@@ -25,6 +25,7 @@ import java.util.Map;
import org.apache.nifi.serialization.record.type.ArrayDataType;
import org.apache.nifi.serialization.record.type.ChoiceDataType;
+import org.apache.nifi.serialization.record.type.MapDataType;
import org.apache.nifi.serialization.record.type.RecordDataType;
public enum RecordFieldType {
@@ -149,7 +150,7 @@ public enum RecordFieldType {
/**
* <p>
- * An array field type. Records should be updated using an {@code Object[]} value for this field. Note that we are explicitly indicating that
+ * An array field type. Fields of this type use a {@code Object[]} value. Note that we are explicitly indicating that
* Object[] should be used here and not primitive array types. For instance, setting a value of {@code int[]} is not allowed. The DataType for
* this field should be created using the {@link #getArrayDataType(DataType)} method:
* </p>
@@ -173,7 +174,34 @@ public enum RecordFieldType {
* </pre>
* </code>
*/
- ARRAY("array", null, new ArrayDataType(null));
+ ARRAY("array", null, new ArrayDataType(null)),
+
+ /**
+ * <p>
+ * A map field type. Fields of this type use a {@code Map<String, Object>} value. A Map DataType should be
+ * created by providing the {@link DataType} for the values:
+ * </p>
+ *
+ * <code>
+ * final DataType mapType = RecordFieldType.MAP.getMapDataType( RecordFieldType.STRING.getDataType() );
+ * </code>
+ *
+ * <p>
+ * A field of type MAP should always have a {@link MapDataType}, so the following idiom is acceptable for use:
+ * </p>
+ *
+ * <code>
+ * <pre>
+ * final DataType dataType = ...;
+ * if (dataType.getFieldType() == RecordFieldType.MAP) {
+ * final MapDataType mapDataType = (MapDataType) dataType;
+ * final DataType valueType = mapDataType.getValueType();
+ * ...
+ * }
+ * </pre>
+ * </code>
+ */
+ MAP("map", null, new MapDataType(null));
private static final Map<String, RecordFieldType> SIMPLE_NAME_MAP = new HashMap<String, RecordFieldType>();
@@ -235,11 +263,11 @@ public enum RecordFieldType {
}
/**
- * Returns a Data Type that represents a "RECORD" or "ARRAY" type with the given schema.
+ * Returns a Data Type that represents an "ARRAY" type with the given element type.
*
* @param elementType the type of the elements in the array
- * @return a DataType that represents a Record or Array with the given schema, or <code>null</code> if this RecordFieldType
- * is not the RECORD or ARRAY type.
+ * @return a DataType that represents an Array with the given element type, or <code>null</code> if this RecordFieldType
+ * is not the ARRAY type.
*/
public DataType getArrayDataType(final DataType elementType) {
if (this != ARRAY) {
@@ -287,6 +315,21 @@ public enum RecordFieldType {
return new ChoiceDataType(list);
}
+ /**
+ * Creates the Data Type for a "MAP" whose values have the given type.
+ *
+ * @param valueDataType the type of the values in the map
+ * @return a {@link MapDataType} with the given value type, or <code>null</code> if this RecordFieldType
+ * is not the MAP type.
+ */
+ public DataType getMapDataType(final DataType valueDataType) {
+ return this == MAP ? new MapDataType(valueDataType) : null;
+ }
+
public static RecordFieldType of(final String typeString) {
return SIMPLE_NAME_MAP.get(typeString);
http://git-wip-us.apache.org/repos/asf/nifi/blob/50ea1083/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/RecordSchema.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/RecordSchema.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/RecordSchema.java
index 115fb51..367f2b0 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/RecordSchema.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/RecordSchema.java
@@ -55,4 +55,25 @@ public interface RecordSchema {
* <code>null</code> if the schema does not contain a field with the given name
*/
Optional<DataType> getDataType(String fieldName);
+
+ /**
+ * @return the textual representation of the schema, if one is available
+ */
+ Optional<String> getSchemaText();
+
+ /**
+ * @return the format of the schema text, if schema text is present
+ */
+ Optional<String> getSchemaFormat();
+
+ /**
+ * @param fieldName the name of the field
+ * @return an Optional RecordField for the field with the given name
+ */
+ Optional<RecordField> getField(String fieldName);
+
+ /**
+ * @return the SchemaIdentifier, which provides various attributes for identifying a schema
+ */
+ SchemaIdentifier getIdentifier();
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/50ea1083/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
index be064ab..b6daab7 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
@@ -157,6 +157,20 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
case Types.LONGVARBINARY:
case Types.VARBINARY:
return RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType());
+ case Types.OTHER:
+ // If we have no records to inspect, we can't really know its schema so we simply use the default data type.
+ if (rs.isAfterLast()) {
+ return RecordFieldType.RECORD.getDataType();
+ }
+
+ final Object obj = rs.getObject(columnIndex);
+ if (obj == null || !(obj instanceof Record)) {
+ return RecordFieldType.RECORD.getDataType();
+ }
+
+ final Record record = (Record) obj;
+ final RecordSchema recordSchema = record.getSchema();
+ return RecordFieldType.RECORD.getRecordDataType(recordSchema);
default:
return getFieldType(sqlType).getDataType();
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/50ea1083/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/SchemaIdentifier.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/SchemaIdentifier.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/SchemaIdentifier.java
new file mode 100644
index 0000000..b711952
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/SchemaIdentifier.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.serialization.record;
+
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+
+public interface SchemaIdentifier {
+
+    /** @return the name of the schema, or an empty Optional if no name has been defined */
+    Optional<String> getName();
+
+    /** @return the numeric identifier of the schema, or an empty OptionalLong if none has been defined */
+    OptionalLong getIdentifier();
+
+    /** @return the version of the schema, or an empty OptionalInt if none has been defined */
+    OptionalInt getVersion();
+
+    /** A SchemaIdentifier whose name, identifier, and version are all empty. */
+    public static SchemaIdentifier EMPTY = new SchemaIdentifier() {
+        @Override
+        public Optional<String> getName() {
+            return Optional.empty();
+        }
+
+        @Override
+        public OptionalLong getIdentifier() {
+            return OptionalLong.empty();
+        }
+
+        @Override
+        public OptionalInt getVersion() {
+            return OptionalInt.empty();
+        }
+    };
+
+    /**
+     * @return a SchemaIdentifier with the given name (empty if name is null) and no identifier or version
+     */
+    public static SchemaIdentifier ofName(final String name) {
+        return new SchemaIdentifier() {
+            @Override
+            public Optional<String> getName() {
+                return Optional.ofNullable(name);
+            }
+
+            @Override
+            public OptionalLong getIdentifier() {
+                return OptionalLong.empty();
+            }
+
+            @Override
+            public OptionalInt getVersion() {
+                return OptionalInt.empty();
+            }
+        };
+    }
+
+    /**
+     * @return a SchemaIdentifier with the given name (empty if name is null), identifier, and version
+     */
+    public static SchemaIdentifier of(final String name, final long identifier, final int version) {
+        return new SchemaIdentifier() {
+            @Override
+            public Optional<String> getName() {
+                return Optional.ofNullable(name);
+            }
+
+            @Override
+            public OptionalLong getIdentifier() {
+                return OptionalLong.of(identifier);
+            }
+
+            @Override
+            public OptionalInt getVersion() {
+                return OptionalInt.of(version);
+            }
+        };
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi/blob/50ea1083/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/ArrayDataType.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/ArrayDataType.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/ArrayDataType.java
index f507f23..0c21239 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/ArrayDataType.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/ArrayDataType.java
@@ -52,7 +52,7 @@ public class ArrayDataType extends DataType {
if (obj == null) {
return false;
}
- if (!(obj instanceof RecordDataType)) {
+ if (!(obj instanceof ArrayDataType)) {
return false;
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/50ea1083/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/ChoiceDataType.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/ChoiceDataType.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/ChoiceDataType.java
index b74cdcc..038b147 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/ChoiceDataType.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/ChoiceDataType.java
@@ -53,7 +53,7 @@ public class ChoiceDataType extends DataType {
if (obj == null) {
return false;
}
- if (!(obj instanceof RecordDataType)) {
+ if (!(obj instanceof ChoiceDataType)) {
return false;
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/50ea1083/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/MapDataType.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/MapDataType.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/MapDataType.java
new file mode 100644
index 0000000..a85fb5e
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/MapDataType.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.serialization.record.type;
+
+import java.util.Objects;
+
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.RecordFieldType;
+
+/** A DataType for MAP fields that also records the DataType of the map's values. */
+public class MapDataType extends DataType {
+    private final DataType valueType; // may be null; hashCode() and equals() both tolerate that
+
+    /** @param elementType the type of the map's values; stored as-is, without a null check */
+    public MapDataType(final DataType elementType) {
+        super(RecordFieldType.MAP, null);
+        this.valueType = elementType;
+    }
+
+    public DataType getValueType() {
+        return valueType;
+    }
+
+    @Override
+    public RecordFieldType getFieldType() {
+        return RecordFieldType.MAP;
+    }
+
+    @Override
+    public int hashCode() {
+        return 31 + 41 * getFieldType().hashCode() + 41 * (valueType == null ? 0 : valueType.hashCode());
+    }
+
+    /** Equal when obj is a MapDataType with the same field type and an equal (or equally null) value type. */
+    @Override
+    public boolean equals(final Object obj) {
+        if (obj == this) {
+            return true;
+        }
+        if (!(obj instanceof MapDataType)) { // instanceof is false for null, so this also rejects null
+            return false;
+        }
+        final MapDataType other = (MapDataType) obj;
+        // Objects.equals keeps the comparison null-safe, matching the null handling in hashCode()
+        return getFieldType().equals(other.getFieldType()) && Objects.equals(valueType, other.valueType);
+    }
+
+    @Override
+    public String toString() {
+        return "MAP[" + valueType + "]";
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/50ea1083/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/RecordDataType.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/RecordDataType.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/RecordDataType.java
index f24d036..006d34c 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/RecordDataType.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/RecordDataType.java
@@ -60,4 +60,9 @@ public class RecordDataType extends DataType {
final RecordDataType other = (RecordDataType) obj;
return getFieldType().equals(other.getFieldType()) && Objects.equals(childSchema, other.childSchema);
}
+
+    @Override
+    public String toString() { // renders only the RECORD field type; the child schema is not included
+        return String.valueOf(RecordFieldType.RECORD);
+    }
}