You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2017/04/11 23:33:40 UTC
[17/19] nifi git commit: NIFI-1280 added support for RecordSchema in
SchemaRegistry
http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryFlowFile.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryFlowFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryFlowFile.java
index 41469ba..8e1c7ed 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryFlowFile.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryFlowFile.java
@@ -17,31 +17,22 @@
package org.apache.nifi.processors.standard;
import java.io.IOException;
-import java.io.InputStream;
import java.io.OutputStream;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
import java.util.List;
-import java.util.Map;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processors.standard.util.record.MockRecordParser;
+import org.apache.nifi.processors.standard.util.record.MockRecordWriter;
import org.apache.nifi.reporting.InitializationException;
-import org.apache.nifi.serialization.MalformedRecordException;
-import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
-import org.apache.nifi.serialization.RowRecordReaderFactory;
-import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.WriteResult;
-import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
-import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
-import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.RecordSet;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
@@ -247,58 +238,14 @@ public class TestQueryFlowFile {
Assert.assertEquals(columnNames, colNames);
- return WriteResult.of(0, Collections.emptyMap());
- }
-
- @Override
- public String getMimeType() {
- return "text/plain";
- }
-
- @Override
- public WriteResult write(Record record, OutputStream out) throws IOException {
- return null;
- }
- };
- }
-
- }
-
- private static class MockRecordWriter extends AbstractControllerService implements RecordSetWriterFactory {
- private final String header;
-
- public MockRecordWriter(final String header) {
- this.header = header;
- }
-
- @Override
- public RecordSetWriter createWriter(final ComponentLog logger) {
- return new RecordSetWriter() {
- @Override
- public WriteResult write(final RecordSet rs, final OutputStream out) throws IOException {
- out.write(header.getBytes());
- out.write("\n".getBytes());
-
- int recordCount = 0;
- final int numCols = rs.getSchema().getFieldCount();
- Record record = null;
+ // Iterate over the rest of the records to ensure that we read the entire stream. If we don't
+ // do this, we won't consume all of the data and as a result we will not close the stream properly
+ Record record;
while ((record = rs.next()) != null) {
- recordCount++;
- int i = 0;
- for (final String fieldName : record.getSchema().getFieldNames()) {
- final String val = record.getAsString(fieldName);
- out.write("\"".getBytes());
- out.write(val.getBytes());
- out.write("\"".getBytes());
-
- if (i++ < numCols - 1) {
- out.write(",".getBytes());
- }
- }
- out.write("\n".getBytes());
+ System.out.println(record);
}
- return WriteResult.of(recordCount, Collections.emptyMap());
+ return WriteResult.of(0, Collections.emptyMap());
}
@Override
@@ -312,68 +259,7 @@ public class TestQueryFlowFile {
}
};
}
- }
-
- private static class MockRecordParser extends AbstractControllerService implements RowRecordReaderFactory {
- private final List<Object[]> records = new ArrayList<>();
- private final List<RecordField> fields = new ArrayList<>();
- private final int failAfterN;
- public MockRecordParser() {
- this(-1);
- }
-
- public MockRecordParser(final int failAfterN) {
- this.failAfterN = failAfterN;
- }
-
-
- public void addSchemaField(final String fieldName, final RecordFieldType type) {
- fields.add(new RecordField(fieldName, type.getDataType()));
- }
-
- public void addRecord(Object... values) {
- records.add(values);
- }
-
- @Override
- public RecordReader createRecordReader(InputStream in, ComponentLog logger) throws IOException {
- final Iterator<Object[]> itr = records.iterator();
-
- return new RecordReader() {
- private int recordCount = 0;
-
- @Override
- public void close() throws IOException {
- }
-
- @Override
- public Record nextRecord() throws IOException, MalformedRecordException {
- if (failAfterN >= recordCount) {
- throw new MalformedRecordException("Intentional Unit Test Exception because " + recordCount + " records have been read");
- }
- recordCount++;
-
- if (!itr.hasNext()) {
- return null;
- }
-
- final Object[] values = itr.next();
- final Map<String, Object> valueMap = new HashMap<>();
- int i = 0;
- for (final RecordField field : fields) {
- final String fieldName = field.getFieldName();
- valueMap.put(fieldName, values[i++]);
- }
-
- return new MapRecord(new SimpleRecordSchema(fields), valueMap);
- }
-
- @Override
- public RecordSchema getSchema() {
- return new SimpleRecordSchema(fields);
- }
- };
- }
}
+
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/record/MockRecordParser.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/record/MockRecordParser.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/record/MockRecordParser.java
new file mode 100644
index 0000000..1a39b82
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/record/MockRecordParser.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard.util.record;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RowRecordReaderFactory;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+/**
+ * Test-only {@link RowRecordReaderFactory} that serves records held in memory.
+ * <p>
+ * The schema is built up with {@link #addSchemaField(String, RecordFieldType)} and the rows are
+ * registered with {@link #addRecord(Object...)}. The FlowFile, InputStream and logger handed to
+ * {@link #createRecordReader(FlowFile, InputStream, ComponentLog)} are ignored.
+ * </p>
+ */
+public class MockRecordParser extends AbstractControllerService implements RowRecordReaderFactory {
+    private final List<Object[]> records = new ArrayList<>();
+    private final List<RecordField> fields = new ArrayList<>();
+    private final int failAfterN;
+
+    /**
+     * Creates a parser that never throws an intentional failure.
+     */
+    public MockRecordParser() {
+        this(-1);
+    }
+
+    /**
+     * Creates a parser whose readers fail once a given number of records has been read.
+     *
+     * @param failAfterN the number of records a reader returns successfully before
+     *            {@code nextRecord()} throws a {@link MalformedRecordException};
+     *            a negative value disables the intentional failure
+     */
+    public MockRecordParser(final int failAfterN) {
+        this.failAfterN = failAfterN;
+    }
+
+    /**
+     * Appends a field to the schema reported by this parser.
+     *
+     * @param fieldName the name of the field
+     * @param type the field type; its default data type is used
+     */
+    public void addSchemaField(final String fieldName, final RecordFieldType type) {
+        fields.add(new RecordField(fieldName, type.getDataType()));
+    }
+
+    /**
+     * Registers a row to be returned by readers created from this parser.
+     *
+     * @param values the field values, matched positionally against the schema fields
+     */
+    public void addRecord(Object... values) {
+        records.add(values);
+    }
+
+    /**
+     * Creates a reader that iterates over the rows registered so far.
+     *
+     * @param flowFile ignored
+     * @param in ignored; no data is read from the stream
+     * @param logger ignored
+     * @return a reader over the in-memory rows
+     */
+    @Override
+    public RecordReader createRecordReader(FlowFile flowFile, InputStream in, ComponentLog logger) throws IOException {
+        final Iterator<Object[]> itr = records.iterator();
+
+        return new RecordReader() {
+            private int recordCount = 0;
+
+            @Override
+            public void close() throws IOException {
+            }
+
+            @Override
+            public Record nextRecord() throws IOException, MalformedRecordException {
+                // Fail only when a non-negative limit was configured and that many reads have already
+                // happened. The previous check (failAfterN >= recordCount) fired on the very first read
+                // for every non-negative limit.
+                if (failAfterN >= 0 && recordCount >= failAfterN) {
+                    throw new MalformedRecordException("Intentional Unit Test Exception because " + recordCount + " records have been read");
+                }
+                recordCount++;
+
+                if (!itr.hasNext()) {
+                    return null;
+                }
+
+                // Pair each schema field with the value at the same position in the row.
+                final Object[] values = itr.next();
+                final Map<String, Object> valueMap = new HashMap<>();
+                int i = 0;
+                for (final RecordField field : fields) {
+                    final String fieldName = field.getFieldName();
+                    valueMap.put(fieldName, values[i++]);
+                }
+
+                return new MapRecord(new SimpleRecordSchema(fields), valueMap);
+            }
+
+            @Override
+            public RecordSchema getSchema() {
+                return new SimpleRecordSchema(fields);
+            }
+        };
+    }
+
+    /**
+     * Not implemented by this mock.
+     *
+     * @param flowFile ignored
+     * @return always {@code null}
+     */
+    @Override
+    public RecordSchema getSchema(FlowFile flowFile) throws MalformedRecordException, IOException {
+        return null;
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/record/MockRecordWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/record/MockRecordWriter.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/record/MockRecordWriter.java
new file mode 100644
index 0000000..1cf2a28
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/record/MockRecordWriter.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard.util.record;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Collections;
+
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSet;
+
+/**
+ * Test-only {@link RecordSetWriterFactory} that writes a fixed header line followed by one line per
+ * record, with each value wrapped in double quotes and values separated by commas.
+ */
+public class MockRecordWriter extends AbstractControllerService implements RecordSetWriterFactory {
+    private final String header;
+
+    /**
+     * @param header the text written as the first line of every record set
+     */
+    public MockRecordWriter(final String header) {
+        this.header = header;
+    }
+
+    /**
+     * Creates a writer that serializes record sets as quoted, comma-separated text.
+     *
+     * @param logger ignored
+     * @return the writer
+     */
+    @Override
+    public RecordSetWriter createWriter(final ComponentLog logger) {
+        return new RecordSetWriter() {
+            /**
+             * Writes the header line and then every record of the set.
+             *
+             * @param rs the records to write; consumed until {@code next()} returns null
+             * @param out the stream to write to
+             * @return a result carrying the number of records written and no attributes
+             * @throws IOException if writing to the stream fails
+             */
+            @Override
+            public WriteResult write(final RecordSet rs, final OutputStream out) throws IOException {
+                out.write(header.getBytes());
+                out.write("\n".getBytes());
+
+                int recordCount = 0;
+                final int numCols = rs.getSchema().getFieldCount();
+                Record record = null;
+                while ((record = rs.next()) != null) {
+                    recordCount++;
+                    int i = 0;
+                    for (final String fieldName : record.getSchema().getFieldNames()) {
+                        final String val = record.getAsString(fieldName);
+                        // A null value is emitted as an empty column instead of failing with a
+                        // NullPointerException on val.getBytes().
+                        if (val != null) {
+                            out.write("\"".getBytes());
+                            out.write(val.getBytes());
+                            out.write("\"".getBytes());
+                        }
+
+                        // Separator after every column except the last one.
+                        if (i++ < numCols - 1) {
+                            out.write(",".getBytes());
+                        }
+                    }
+                    out.write("\n".getBytes());
+                }
+
+                return WriteResult.of(recordCount, Collections.emptyMap());
+            }
+
+            @Override
+            public String getMimeType() {
+                return "text/plain";
+            }
+
+            /**
+             * Single-record writes are not implemented by this mock.
+             *
+             * @return always {@code null}
+             */
+            @Override
+            public WriteResult write(Record record, OutputStream out) throws IOException {
+                return null;
+            }
+        };
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/pom.xml
index d7d5605..78c0381 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/pom.xml
@@ -17,7 +17,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-standard-services</artifactId>
- <version>1.1.0-SNAPSHOT</version>
+ <version>1.2.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-record-serialization-service-api</artifactId>
http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordReader.java
index a0cfc79..b728498 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordReader.java
@@ -39,7 +39,6 @@ public interface RecordReader extends Closeable {
/**
* Returns the next record in the stream or <code>null</code> if no more records are available.
*
- * @param schema the schema to use in order to determine how to interprets the fields in a record
* @return the next record in the stream or <code>null</code> if no more records are available.
*
* @throws IOException if unable to read from the underlying data
http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordWriter.java
index eef8d82..aa298d9 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordWriter.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordWriter.java
@@ -26,7 +26,7 @@ public interface RecordWriter {
/**
* Writes the given result set to the given output stream
*
- * @param recordSet the record set to serialize
+ * @param record the record to serialize
* @param out the OutputStream to write to
* @return the results of writing the data
* @throws IOException if unable to write to the given OutputStream
http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RowRecordReaderFactory.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RowRecordReaderFactory.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RowRecordReaderFactory.java
index 5ef4c7c..fbd8a21 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RowRecordReaderFactory.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RowRecordReaderFactory.java
@@ -21,7 +21,9 @@ import java.io.IOException;
import java.io.InputStream;
import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.serialization.record.RecordSchema;
/**
* <p>
@@ -29,5 +31,8 @@ import org.apache.nifi.logging.ComponentLog;
* </p>
*/
public interface RowRecordReaderFactory extends ControllerService {
- RecordReader createRecordReader(InputStream in, ComponentLog logger) throws MalformedRecordException, IOException;
+
+ RecordReader createRecordReader(FlowFile flowFile, InputStream in, ComponentLog logger) throws MalformedRecordException, IOException;
+
+ RecordSchema getSchema(FlowFile flowFile) throws MalformedRecordException, IOException;
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/DataType.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/DataType.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/DataType.java
index 0c187f1..b72c107 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/DataType.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/DataType.java
@@ -17,36 +17,15 @@
package org.apache.nifi.serialization.record;
-import java.util.Collections;
-import java.util.List;
-import java.util.Optional;
-
public class DataType {
private final RecordFieldType fieldType;
private final String format;
- private final RecordSchema childSchema;
- private final List<DataType> childTypes;
-
- DataType(final RecordFieldType fieldType, final String format) {
- this(fieldType, format, (RecordSchema) null);
- }
-
- DataType(final RecordFieldType fieldType, final String format, final RecordSchema childSchema) {
+ protected DataType(final RecordFieldType fieldType, final String format) {
this.fieldType = fieldType;
this.format = format;
- this.childSchema = childSchema;
- this.childTypes = Collections.emptyList();
}
- DataType(final RecordFieldType fieldType, final String format, final List<DataType> childTypes) {
- this.fieldType = fieldType;
- this.format = format;
- this.childSchema = null;
- this.childTypes = Collections.unmodifiableList(childTypes);
- }
-
-
public String getFormat() {
return format;
}
@@ -55,14 +34,6 @@ public class DataType {
return fieldType;
}
- public Optional<RecordSchema> getChildRecordSchema() {
- return Optional.ofNullable(childSchema);
- }
-
- public List<DataType> getPossibleTypes() {
- return childTypes;
- }
-
@Override
public int hashCode() {
return 31 + 41 * fieldType.hashCode() + 41 * (format == null ? 0 : format.hashCode());
http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/MapRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/MapRecord.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/MapRecord.java
index f3f9024..0bbb534 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/MapRecord.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/MapRecord.java
@@ -17,16 +17,13 @@
package org.apache.nifi.serialization.record;
-import java.sql.Time;
-import java.sql.Timestamp;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
import java.util.Date;
-import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+
public class MapRecord implements Record {
private final RecordSchema schema;
private final Map<String, Object> values;
@@ -80,220 +77,52 @@ public class MapRecord implements Record {
return null;
}
- if (value instanceof java.sql.Date) {
- java.sql.Date date = (java.sql.Date) value;
- final long time = date.getTime();
- return new SimpleDateFormat(getFormat(format, RecordFieldType.DATE)).format(new java.util.Date(time));
- }
- if (value instanceof java.util.Date) {
- return new SimpleDateFormat(getFormat(format, RecordFieldType.DATE)).format((java.util.Date) value);
- }
- if (value instanceof Timestamp) {
- java.sql.Timestamp date = (java.sql.Timestamp) value;
- final long time = date.getTime();
- return new SimpleDateFormat(getFormat(format, RecordFieldType.TIMESTAMP)).format(new java.util.Date(time));
- }
- if (value instanceof Time) {
- java.sql.Time date = (java.sql.Time) value;
- final long time = date.getTime();
- return new SimpleDateFormat(getFormat(format, RecordFieldType.TIME)).format(new java.util.Date(time));
- }
-
- return value.toString();
+ final String dateFormat = getFormat(format, RecordFieldType.DATE);
+ final String timestampFormat = getFormat(format, RecordFieldType.TIMESTAMP);
+ final String timeFormat = getFormat(format, RecordFieldType.TIME);
+ return DataTypeUtils.toString(value, dateFormat, timeFormat, timestampFormat);
}
@Override
public Long getAsLong(final String fieldName) {
- return convertToLong(getValue(fieldName), fieldName);
- }
-
- private Long convertToLong(final Object value, final Object fieldDesc) {
- if (value == null) {
- return null;
- }
-
- if (value instanceof Number) {
- return ((Number) value).longValue();
- }
- if (value instanceof String) {
- return Long.parseLong((String) value);
- }
- if (value instanceof Date) {
- return ((Date) value).getTime();
- }
-
- throw new TypeMismatchException("Cannot convert value of type " + value.getClass() + " to Long for field " + fieldDesc);
+ return DataTypeUtils.toLong(getValue(fieldName));
}
@Override
public Integer getAsInt(final String fieldName) {
- return convertToInt(getValue(fieldName), fieldName);
- }
-
- private Integer convertToInt(final Object value, final Object fieldDesc) {
- if (value == null) {
- return null;
- }
-
- if (value instanceof Number) {
- return ((Number) value).intValue();
- }
- if (value instanceof String) {
- return Integer.parseInt((String) value);
- }
-
- throw new TypeMismatchException("Cannot convert value of type " + value.getClass() + " to Integer for field " + fieldDesc);
+ return DataTypeUtils.toInteger(getValue(fieldName));
}
-
@Override
public Double getAsDouble(final String fieldName) {
- return convertToDouble(getValue(fieldName), fieldName);
- }
-
- private Double convertToDouble(final Object value, final Object fieldDesc) {
- if (value == null) {
- return null;
- }
-
- if (value instanceof Number) {
- return ((Number) value).doubleValue();
- }
- if (value instanceof String) {
- return Double.parseDouble((String) value);
- }
-
- throw new TypeMismatchException("Cannot convert value of type " + value.getClass() + " to Double for field " + fieldDesc);
+ return DataTypeUtils.toDouble(getValue(fieldName));
}
@Override
public Float getAsFloat(final String fieldName) {
- return convertToFloat(getValue(fieldName), fieldName);
- }
-
- private Float convertToFloat(final Object value, final Object fieldDesc) {
- if (value == null) {
- return null;
- }
-
- if (value instanceof Number) {
- return ((Number) value).floatValue();
- }
- if (value instanceof String) {
- return Float.parseFloat((String) value);
- }
-
- throw new TypeMismatchException("Cannot convert value of type " + value.getClass() + " to Float for field " + fieldDesc);
+ return DataTypeUtils.toFloat(getValue(fieldName));
}
@Override
- public Record getAsRecord(String fieldName) {
- return convertToRecord(getValue(fieldName), fieldName);
- }
-
- private Record convertToRecord(final Object value, final Object fieldDesc) {
- if (value == null) {
- return null;
- }
-
- if (value instanceof Record) {
- return (Record) value;
- }
-
- throw new TypeMismatchException("Cannot convert value of type " + value.getClass() + " to Record for field " + fieldDesc);
+ public Record getAsRecord(String fieldName, final RecordSchema schema) {
+ return DataTypeUtils.toRecord(getValue(fieldName), schema);
}
-
@Override
public Boolean getAsBoolean(final String fieldName) {
- return convertToBoolean(getValue(fieldName), fieldName);
- }
-
- private Boolean convertToBoolean(final Object value, final Object fieldDesc) {
- if (value == null) {
- return null;
- }
-
- if (value instanceof Boolean) {
- return (Boolean) value;
- }
- if (value instanceof String) {
- final String string = (String) value;
- if (string.equalsIgnoreCase("true") || string.equalsIgnoreCase("t")) {
- return Boolean.TRUE;
- }
-
- if (string.equalsIgnoreCase("false") || string.equals("f")) {
- return Boolean.FALSE;
- }
-
- throw new TypeMismatchException("Cannot convert String value to Boolean for field " + fieldDesc + " because it is not a valid boolean value");
- }
-
- throw new TypeMismatchException("Cannot convert value of type " + value.getClass() + " to Boolean for field " + fieldDesc);
- }
-
- @Override
- public Date getAsDate(final String fieldName) {
- final Optional<DataType> dataTypeOption = schema.getDataType(fieldName);
- if (!dataTypeOption.isPresent()) {
- return null;
- }
-
- return convertToDate(getValue(fieldName), fieldName, dataTypeOption.get().getFormat());
+ return DataTypeUtils.toBoolean(getValue(fieldName));
}
@Override
public Date getAsDate(final String fieldName, final String format) {
- return convertToDate(getValue(fieldName), fieldName, format);
- }
-
- private Date convertToDate(final Object value, final Object fieldDesc, final String format) {
- if (value == null) {
- return null;
- }
-
- if (value instanceof Date) {
- return (Date) value;
- }
- if (value instanceof Number) {
- final Long time = ((Number) value).longValue();
- return new Date(time);
- }
- if (value instanceof java.sql.Date) {
- return new Date(((java.sql.Date) value).getTime());
- }
- if (value instanceof String) {
- try {
- return new SimpleDateFormat(getFormat(format, RecordFieldType.DATE)).parse((String) value);
- } catch (final ParseException e) {
- throw new TypeMismatchException("Cannot convert String value to date for field " + fieldDesc + " because it is not in the correct format of: " + format, e);
- }
- }
-
- throw new TypeMismatchException("Cannot convert value of type " + value.getClass() + " to Boolean for field " + fieldDesc);
+ return DataTypeUtils.toDate(getValue(fieldName), format);
}
@Override
public Object[] getAsArray(final String fieldName) {
- return convertToArray(getValue(fieldName));
+ return DataTypeUtils.toArray(getValue(fieldName));
}
- private Object[] convertToArray(final Object value) {
- if (value == null) {
- return null;
- }
-
- if (value instanceof Object[]) {
- return (Object[]) value;
- }
-
- if (value instanceof List) {
- return ((List<?>) value).toArray();
- }
-
- return new Object[] {value};
- }
@Override
public int hashCode() {
http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/Record.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/Record.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/Record.java
index ca85741..e1d52e9 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/Record.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/Record.java
@@ -50,12 +50,10 @@ public interface Record {
Float getAsFloat(String fieldName);
- Record getAsRecord(String fieldName);
+ Record getAsRecord(String fieldName, RecordSchema schema);
Boolean getAsBoolean(String fieldName);
- Date getAsDate(String fieldName);
-
Date getAsDate(String fieldName, String format);
Object[] getAsArray(String fieldName);
http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/RecordFieldType.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/RecordFieldType.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/RecordFieldType.java
index 8ad212b..cc83a41 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/RecordFieldType.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/RecordFieldType.java
@@ -17,35 +17,171 @@
package org.apache.nifi.serialization.record;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.nifi.serialization.record.type.ArrayDataType;
+import org.apache.nifi.serialization.record.type.ChoiceDataType;
+import org.apache.nifi.serialization.record.type.RecordDataType;
+
public enum RecordFieldType {
+ /**
+ * A String field type. Fields of this type use a {@code java.lang.String} value.
+ */
STRING("string"),
+
+ /**
+ * A boolean field type. Fields of this type use a {@code boolean} value.
+ */
BOOLEAN("boolean"),
+
+ /**
+ * A byte field type. Fields of this type use a {@code byte} value.
+ */
BYTE("byte"),
+
+ /**
+ * A char field type. Fields of this type use a {@code char} value.
+ */
CHAR("char"),
+
+ /**
+ * A short field type. Fields of this type use a {@code short} value.
+ */
SHORT("short"),
+
+ /**
+ * An int field type. Fields of this type use an {@code int} value.
+ */
INT("int"),
+
+ /**
+ * A bigint field type. Fields of this type use a {@code java.math.BigInteger} value.
+ */
BIGINT("bigint"),
+
+ /**
+ * A long field type. Fields of this type use a {@code long} value.
+ */
LONG("long"),
+
+ /**
+ * A float field type. Fields of this type use a {@code float} value.
+ */
FLOAT("float"),
+
+ /**
+ * A double field type. Fields of this type use a {@code double} value.
+ */
DOUBLE("double"),
+
+ /**
+ * A date field type. Fields of this type use a {@code java.sql.Date} value.
+ */
DATE("date", "yyyy-MM-dd"),
+
+ /**
+ * A time field type. Fields of this type use a {@code java.sql.Time} value.
+ */
TIME("time", "HH:mm:ss"),
+
+ /**
+ * A timestamp field type. Fields of this type use a {@code java.sql.Timestamp} value.
+ */
TIMESTAMP("timestamp", "yyyy-MM-dd HH:mm:ss"),
- RECORD("record"),
- CHOICE("choice"),
- ARRAY("array");
+
+ /**
+ * <p>
+ * A record field type. Fields of this type use a {@code org.apache.nifi.serialization.record.Record} value. A Record DataType should be
+ * created by providing the {@link RecordSchema} for the record:
+ * </p>
+ *
+ * <code>
+ * final DataType recordType = RecordFieldType.RECORD.getRecordDataType(recordSchema);
+ * </code>
+ *
+ * <p>
+ * A field of type RECORD should always have a {@link RecordDataType}, so the following idiom is acceptable for use:
+ * </p>
+ *
+ * <code>
+ * <pre>
+ * final DataType dataType = ...;
+ * if (dataType.getFieldType() == RecordFieldType.RECORD) {
+ * final RecordDataType recordDataType = (RecordDataType) dataType;
+ * final RecordSchema childSchema = recordDataType.getChildSchema();
+ * ...
+ * }
+ * </pre>
+ * </code>
+ */
+ RECORD("record", null, new RecordDataType(null)),
+
+ /**
+ * <p>
+ * A choice field type. A field of type choice can be one of any number of different types, which are defined by the DataType that is used.
+ * For example, if a field should allow either a Long or an Integer, this can be accomplished by using:
+ * </p>
+ *
+ * <code>
+ * final DataType choiceType = RecordFieldType.CHOICE.getChoiceDataType( RecordFieldType.INT.getDataType(), RecordFieldType.LONG.getDataType() );
+ * </code>
+ *
+ * <p>
+ * A field of type CHOICE should always have a {@link ChoiceDataType}, so the following idiom is acceptable for use:
+ * </p>
+ *
+ * <code>
+ * <pre>
+ * final DataType dataType = ...;
+ * if (dataType.getFieldType() == RecordFieldType.CHOICE) {
+ * final ChoiceDataType choiceDataType = (ChoiceDataType) dataType;
+ * final List<DataType> allowableTypes = choiceDataType.getPossibleSubTypes();
+ * ...
+ * }
+ * </pre>
+ * </code>
+ */
+ CHOICE("choice", null, new ChoiceDataType(Collections.emptyList())),
+
+ /**
+ * <p>
+ * An array field type. Records should be updated using an {@code Object[]} value for this field. Note that we are explicitly indicating that
+ * Object[] should be used here and not primitive array types. For instance, setting a value of {@code int[]} is not allowed. The DataType for
+ * this field should be created using the {@link #getArrayDataType(DataType)} method:
+ * </p>
+ *
+ * <code>
+ * final DataType arrayType = RecordFieldType.ARRAY.getArrayDataType( RecordFieldType.INT.getDataType() );
+ * </code>
+ *
+ * <p>
+ * A field of type ARRAY should always have an {@link ArrayDataType}, so the following idiom is acceptable for use:
+ * </p>
+ *
+ * <code>
+ * <pre>
+ * final DataType dataType = ...;
+ * if (dataType.getFieldType() == RecordFieldType.ARRAY) {
+ * final ArrayDataType arrayDataType = (ArrayDataType) dataType;
+ * final DataType elementType = arrayDataType.getElementType();
+ * ...
+ * }
+ * </pre>
+ * </code>
+ */
+ ARRAY("array", null, new ArrayDataType(null));
private static final Map<String, RecordFieldType> SIMPLE_NAME_MAP = new HashMap<String, RecordFieldType>();
static {
- for (RecordFieldType value : values()) {
- SIMPLE_NAME_MAP.put(value.simpleName, value);
- }
+ for (RecordFieldType value : values()) {
+ SIMPLE_NAME_MAP.put(value.simpleName, value);
+ }
}
private final String simpleName;
@@ -62,6 +198,12 @@ public enum RecordFieldType {
this.defaultDataType = new DataType(this, defaultFormat);
}
+ /**
+ * Creates a field type whose default DataType is supplied by the caller instead of being built from
+ * the field type and format. The RECORD, CHOICE and ARRAY constants use this constructor to register
+ * their specialised DataType subclasses as defaults.
+ *
+ * @param simpleName the name under which this type is registered in SIMPLE_NAME_MAP
+ * @param defaultFormat the default format for this type; RECORD, CHOICE and ARRAY pass <code>null</code>
+ * @param defaultDataType the DataType stored as this type's default
+ */
+ private RecordFieldType(final String simpleName, final String defaultFormat, final DataType defaultDataType) {
+ this.simpleName = simpleName;
+ this.defaultFormat = defaultFormat;
+ this.defaultDataType = defaultDataType;
+ }
+
public String getDefaultFormat() {
return defaultFormat;
}
@@ -78,18 +220,50 @@ public enum RecordFieldType {
}
/**
- * Returns a Data Type that represents a "RECORD" type with the given schema.
+ * Returns a Data Type that represents a "RECORD" type whose records conform to the given schema. This method is
+ * only applicable for a RecordFieldType of {@link #RECORD}.
*
- * @param childSchema the Schema for the Record
- * @return a DataType that represents a Record with the given schema, or <code>null</code> if this RecordFieldType
- * is not the RECORD type.
+ * @param childSchema the schema that records of the returned type conform to
+ * @return a {@link RecordDataType} wrapping the given schema, or <code>null</code> if this RecordFieldType
+ * is not the RECORD type.
*/
- public DataType getDataType(final RecordSchema childSchema) {
+ public DataType getRecordDataType(final RecordSchema childSchema) {
if (this != RECORD) {
return null;
}
- return new DataType(this, getDefaultFormat(), childSchema);
+ return new RecordDataType(childSchema);
+ }
+
+ /**
+ * Returns a Data Type that represents an "ARRAY" type whose elements are of the given type. This method is
+ * only applicable for a RecordFieldType of {@link #ARRAY}.
+ *
+ * @param elementType the type of the elements in the array
+ * @return an {@link ArrayDataType} with the given element type, or <code>null</code> if this RecordFieldType
+ * is not the ARRAY type.
+ */
+ public DataType getArrayDataType(final DataType elementType) {
+ if (this != ARRAY) {
+ return null;
+ }
+
+ return new ArrayDataType(elementType);
+ }
+
+
+ /**
+ * Returns a Data Type that represents a "CHOICE" of multiple possible types. This method is
+ * only applicable for a RecordFieldType of {@link #CHOICE}.
+ *
+ * @param possibleChildTypes the possible types that are allowable; the list is handed to the
+ * {@link ChoiceDataType} as-is, without being copied
+ * @return a DataType that represents a "CHOICE" of multiple possible types, or <code>null</code> if this RecordFieldType
+ * is not the CHOICE type.
+ */
+ public DataType getChoiceDataType(final List<DataType> possibleChildTypes) {
+ if (this != CHOICE) {
+ return null;
+ }
+
+ return new ChoiceDataType(possibleChildTypes);
}
/**
@@ -100,14 +274,20 @@ public enum RecordFieldType {
* @return a DataType that represents a "CHOICE" of multiple possible types, or <code>null</code> if this RecordFieldType
* is not the CHOICE type.
*/
- public DataType getDataType(final List<DataType> possibleChildTypes) {
+ public DataType getChoiceDataType(final DataType... possibleChildTypes) {
if (this != CHOICE) {
return null;
}
- return new DataType(this, getDefaultFormat(), possibleChildTypes);
+ final List<DataType> list = new ArrayList<>(possibleChildTypes.length);
+ for (final DataType type : possibleChildTypes) {
+ list.add(type);
+ }
+
+ return new ChoiceDataType(list);
}
+
public static RecordFieldType of(final String typeString) {
return SIMPLE_NAME_MAP.get(typeString);
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
index e166918..be064ab 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
@@ -19,6 +19,8 @@ package org.apache.nifi.serialization.record;
import java.io.Closeable;
import java.io.IOException;
+import java.math.BigInteger;
+import java.sql.Array;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
@@ -39,9 +41,11 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
private final ResultSet rs;
private final RecordSchema schema;
private final Set<String> rsColumnNames;
+ private boolean moreRows;
public ResultSetRecordSet(final ResultSet rs) throws SQLException {
this.rs = rs;
+ moreRows = rs.next();
this.schema = createSchema(rs);
rsColumnNames = new HashSet<>();
@@ -59,14 +63,16 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
@Override
public Record next() throws IOException {
try {
- if (rs.next()) {
- return createRecord(rs);
+ if (moreRows) {
+ final Record record = createRecord(rs);
+ moreRows = rs.next();
+ return record;
+ } else {
+ return null;
}
} catch (final SQLException e) {
throw new IOException("Could not obtain next record from ResultSet", e);
}
-
- return null;
}
@Override
@@ -86,7 +92,7 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
final Object value;
if (rsColumnNames.contains(fieldName)) {
- value = rs.getObject(field.getFieldName());
+ value = normalizeValue(rs.getObject(fieldName));
} else {
value = null;
}
@@ -97,6 +103,19 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
return new MapRecord(schema, values);
}
+ /**
+ * Converts a value read from the ResultSet into the form stored in the Record: a {@link List} is
+ * replaced by the <code>Object[]</code> returned from {@link List#toArray()}; any other value,
+ * including <code>null</code>, is returned unchanged.
+ *
+ * @param value the raw column value, possibly <code>null</code>
+ * @return the array form of a List, otherwise the value itself
+ */
+ @SuppressWarnings("rawtypes")
+ private Object normalizeValue(final Object value) {
+ if (value == null) {
+ return null;
+ }
+
+ if (value instanceof List) {
+ return ((List) value).toArray();
+ }
+
+ return value;
+ }
+
private static RecordSchema createSchema(final ResultSet rs) throws SQLException {
final ResultSetMetaData metadata = rs.getMetaData();
final int numCols = metadata.getColumnCount();
@@ -106,26 +125,149 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
final int column = i + 1;
final int sqlType = metadata.getColumnType(column);
- final RecordFieldType fieldType = getFieldType(sqlType);
+ final DataType dataType = getDataType(sqlType, rs, column);
final String fieldName = metadata.getColumnLabel(column);
- final RecordField field = new RecordField(fieldName, fieldType.getDataType());
+ final RecordField field = new RecordField(fieldName, dataType);
fields.add(field);
}
return new SimpleRecordSchema(fields);
}
- private static RecordFieldType getFieldType(final int sqlType) {
+ private static DataType getDataType(final int sqlType, final ResultSet rs, final int columnIndex) throws SQLException {
switch (sqlType) {
case Types.ARRAY:
- return RecordFieldType.ARRAY;
- case Types.BIGINT:
- case Types.ROWID:
- return RecordFieldType.LONG;
+ // The JDBC API does not allow us to know what the base type of an array is through the metadata.
+ // As a result, we have to obtain the actual Array for this record. Once we have this, we can determine
+ // the base type. However, if the base type is, itself, an array, we will simply return a base type of
+ // String because otherwise, we need the ResultSet for the array itself, and many JDBC Drivers do not
+ // support calling Array.getResultSet() and will throw an Exception if that is not supported.
+ if (rs.isAfterLast()) {
+ return RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.STRING.getDataType());
+ }
+
+ final Array array = rs.getArray(columnIndex);
+ if (array == null) {
+ return RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.STRING.getDataType());
+ }
+
+ final DataType baseType = getArrayBaseType(array);
+ return RecordFieldType.ARRAY.getArrayDataType(baseType);
case Types.BINARY:
case Types.LONGVARBINARY:
case Types.VARBINARY:
- return RecordFieldType.ARRAY;
+ return RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType());
+ default:
+ return getFieldType(sqlType).getDataType();
+ }
+ }
+
+    /**
+     * Determines the element type of a JDBC array by inspecting the Java value returned from
+     * {@link Array#getArray()}.
+     * <p>
+     * A primitive array maps directly to the matching field type. For an <code>Object[]</code> the first
+     * non-null element decides the type. STRING is the fallback whenever the type cannot be determined:
+     * a null array value, an empty array, an array holding only nulls, or an unrecognised element class.
+     * </p>
+     *
+     * @param array the JDBC array to inspect
+     * @return the DataType of the array's elements, never <code>null</code>
+     * @throws SQLException if the array contents cannot be retrieved
+     */
+    private static DataType getArrayBaseType(final Array array) throws SQLException {
+        final Object arrayValue = array.getArray();
+        if (arrayValue == null) {
+            return RecordFieldType.STRING.getDataType();
+        }
+
+        // Primitive arrays identify their element type directly.
+        if (arrayValue instanceof byte[]) {
+            return RecordFieldType.BYTE.getDataType();
+        }
+        if (arrayValue instanceof int[]) {
+            return RecordFieldType.INT.getDataType();
+        }
+        if (arrayValue instanceof long[]) {
+            return RecordFieldType.LONG.getDataType();
+        }
+        if (arrayValue instanceof boolean[]) {
+            return RecordFieldType.BOOLEAN.getDataType();
+        }
+        if (arrayValue instanceof short[]) {
+            return RecordFieldType.SHORT.getDataType();
+        }
+        if (arrayValue instanceof float[]) {
+            return RecordFieldType.FLOAT.getDataType();
+        }
+        if (arrayValue instanceof double[]) {
+            return RecordFieldType.DOUBLE.getDataType();
+        }
+        if (arrayValue instanceof char[]) {
+            return RecordFieldType.CHAR.getDataType();
+        }
+
+        if (arrayValue instanceof Object[]) {
+            final Object[] values = (Object[]) arrayValue;
+            if (values.length == 0) {
+                return RecordFieldType.STRING.getDataType();
+            }
+
+            // Use the first non-null element as the representative of the whole array.
+            Object valueToLookAt = null;
+            for (int i = 0; i < values.length; i++) {
+                valueToLookAt = values[i];
+                if (valueToLookAt != null) {
+                    break;
+                }
+            }
+            if (valueToLookAt == null) {
+                return RecordFieldType.STRING.getDataType();
+            }
+
+            if (valueToLookAt instanceof String) {
+                return RecordFieldType.STRING.getDataType();
+            }
+            if (valueToLookAt instanceof Long) {
+                return RecordFieldType.LONG.getDataType();
+            }
+            if (valueToLookAt instanceof Integer) {
+                return RecordFieldType.INT.getDataType();
+            }
+            if (valueToLookAt instanceof Short) {
+                return RecordFieldType.SHORT.getDataType();
+            }
+            if (valueToLookAt instanceof Byte) {
+                return RecordFieldType.BYTE.getDataType();
+            }
+            if (valueToLookAt instanceof Float) {
+                return RecordFieldType.FLOAT.getDataType();
+            }
+            if (valueToLookAt instanceof Double) {
+                return RecordFieldType.DOUBLE.getDataType();
+            }
+            if (valueToLookAt instanceof Boolean) {
+                return RecordFieldType.BOOLEAN.getDataType();
+            }
+            if (valueToLookAt instanceof Character) {
+                return RecordFieldType.CHAR.getDataType();
+            }
+            if (valueToLookAt instanceof BigInteger) {
+                return RecordFieldType.BIGINT.getDataType();
+            }
+            if (valueToLookAt instanceof java.sql.Time) {
+                return RecordFieldType.TIME.getDataType();
+            }
+            if (valueToLookAt instanceof java.sql.Date) {
+                return RecordFieldType.DATE.getDataType();
+            }
+            if (valueToLookAt instanceof java.sql.Timestamp) {
+                return RecordFieldType.TIMESTAMP.getDataType();
+            }
+            if (valueToLookAt instanceof Record) {
+                return RecordFieldType.RECORD.getDataType();
+            }
+        }
+
+        return RecordFieldType.STRING.getDataType();
+    }
+
+
+ private static RecordFieldType getFieldType(final int sqlType) {
+ switch (sqlType) {
+ case Types.BIGINT:
+ case Types.ROWID:
+ return RecordFieldType.LONG;
case Types.BIT:
case Types.BOOLEAN:
return RecordFieldType.BOOLEAN;
http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/ArrayDataType.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/ArrayDataType.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/ArrayDataType.java
new file mode 100644
index 0000000..f507f23
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/ArrayDataType.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.serialization.record.type;
+
+import java.util.Objects;
+
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.RecordFieldType;
+
+/**
+ * A {@link DataType} for fields of type {@link RecordFieldType#ARRAY} that additionally records the
+ * DataType of the array's elements.
+ */
+public class ArrayDataType extends DataType {
+    private final DataType elementType;
+
+    /**
+     * @param elementType the type of the array's elements; <code>null</code> is accepted and is
+     *            treated as "no element type" by {@link #hashCode()} and {@link #equals(Object)}
+     */
+    public ArrayDataType(final DataType elementType) {
+        super(RecordFieldType.ARRAY, null);
+        this.elementType = elementType;
+    }
+
+    /**
+     * @return the type of the array's elements, possibly <code>null</code>
+     */
+    public DataType getElementType() {
+        return elementType;
+    }
+
+    @Override
+    public RecordFieldType getFieldType() {
+        return RecordFieldType.ARRAY;
+    }
+
+    @Override
+    public int hashCode() {
+        return 31 + 41 * getFieldType().hashCode() + 41 * (elementType == null ? 0 : elementType.hashCode());
+    }
+
+    /**
+     * Two array types are equal when their element types are equal.
+     */
+    @Override
+    public boolean equals(final Object obj) {
+        if (obj == this) {
+            return true;
+        }
+        // Must test against ArrayDataType itself: testing against another DataType subclass makes
+        // equal array types compare unequal and lets the cast below fail. instanceof also rejects null.
+        if (!(obj instanceof ArrayDataType)) {
+            return false;
+        }
+
+        final ArrayDataType other = (ArrayDataType) obj;
+        return getFieldType().equals(other.getFieldType()) && Objects.equals(elementType, other.elementType);
+    }
+
+    @Override
+    public String toString() {
+        return "ARRAY[" + elementType + "]";
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/ChoiceDataType.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/ChoiceDataType.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/ChoiceDataType.java
new file mode 100644
index 0000000..b74cdcc
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/ChoiceDataType.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.serialization.record.type;
+
+import java.util.List;
+import java.util.Objects;
+
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.RecordFieldType;
+
+/**
+ * A {@link DataType} for fields of type {@link RecordFieldType#CHOICE} that additionally records the
+ * list of DataTypes a value of the field is allowed to have.
+ */
+public class ChoiceDataType extends DataType {
+    private final List<DataType> possibleSubTypes;
+
+    /**
+     * @param possibleSubTypes the allowable types; the list is stored as given, not copied
+     * @throws NullPointerException if possibleSubTypes is <code>null</code>
+     */
+    public ChoiceDataType(final List<DataType> possibleSubTypes) {
+        super(RecordFieldType.CHOICE, null);
+        this.possibleSubTypes = Objects.requireNonNull(possibleSubTypes);
+    }
+
+    /**
+     * @return the list of allowable types that was passed to the constructor
+     */
+    public List<DataType> getPossibleSubTypes() {
+        return possibleSubTypes;
+    }
+
+    @Override
+    public RecordFieldType getFieldType() {
+        return RecordFieldType.CHOICE;
+    }
+
+    @Override
+    public int hashCode() {
+        return 31 + 41 * getFieldType().hashCode() + 41 * (possibleSubTypes == null ? 0 : possibleSubTypes.hashCode());
+    }
+
+    /**
+     * Two choice types are equal when their lists of possible sub-types are equal.
+     */
+    @Override
+    public boolean equals(final Object obj) {
+        if (obj == this) {
+            return true;
+        }
+        // Must test against ChoiceDataType itself: testing against another DataType subclass makes
+        // equal choice types compare unequal and lets the cast below fail. instanceof also rejects null.
+        if (!(obj instanceof ChoiceDataType)) {
+            return false;
+        }
+
+        final ChoiceDataType other = (ChoiceDataType) obj;
+        return getFieldType().equals(other.getFieldType()) && Objects.equals(possibleSubTypes, other.possibleSubTypes);
+    }
+
+    @Override
+    public String toString() {
+        return "CHOICE" + possibleSubTypes;
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/RecordDataType.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/RecordDataType.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/RecordDataType.java
new file mode 100644
index 0000000..f24d036
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/RecordDataType.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.serialization.record.type;
+
+import java.util.Objects;
+
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+/**
+ * A DataType for fields of type RecordFieldType.RECORD that additionally records the schema of the
+ * nested record.
+ */
+public class RecordDataType extends DataType {
+ private final RecordSchema childSchema;
+
+ /**
+ * @param childSchema the schema of the nested record; <code>null</code> is accepted and is handled
+ * by hashCode() and equals()
+ */
+ public RecordDataType(final RecordSchema childSchema) {
+ super(RecordFieldType.RECORD, null);
+ this.childSchema = childSchema;
+ }
+
+ @Override
+ public RecordFieldType getFieldType() {
+ return RecordFieldType.RECORD;
+ }
+
+ /**
+ * @return the schema of the nested record, possibly <code>null</code>
+ */
+ public RecordSchema getChildSchema() {
+ return childSchema;
+ }
+
+ @Override
+ public int hashCode() {
+ return 31 + 41 * getFieldType().hashCode() + 41 * (childSchema == null ? 0 : childSchema.hashCode());
+ }
+
+ /**
+ * Two record types are equal when their child schemas are equal.
+ */
+ @Override
+ public boolean equals(final Object obj) {
+ if (obj == this) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (!(obj instanceof RecordDataType)) {
+ return false;
+ }
+
+ final RecordDataType other = (RecordDataType) obj;
+ return getFieldType().equals(other.getFieldType()) && Objects.equals(childSchema, other.childSchema);
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
new file mode 100644
index 0000000..1cdefb8
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
@@ -0,0 +1,608 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.serialization.record.util;
+
+import java.math.BigInteger;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.text.DateFormat;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.TimeZone;
+import java.util.function.Consumer;
+
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.type.ChoiceDataType;
+import org.apache.nifi.serialization.record.type.RecordDataType;
+
+public class DataTypeUtils {
+
+ private static final TimeZone gmt = TimeZone.getTimeZone("gmt");
+
+ /**
+ * Converts the value to the given data type using the default DATE, TIME and TIMESTAMP formats
+ * declared on RecordFieldType.
+ *
+ * @param value the value to convert
+ * @param dataType the type to convert to
+ * @return the result of the five-argument convertType called with the default formats
+ */
+ public static Object convertType(final Object value, final DataType dataType) {
+ return convertType(value, dataType, RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat());
+ }
+
+ public static Object convertType(final Object value, final DataType dataType, final String dateFormat, final String timeFormat, final String timestampFormat) {
+ switch (dataType.getFieldType()) {
+ case BIGINT:
+ return toBigInt(value);
+ case BOOLEAN:
+ return toBoolean(value);
+ case BYTE:
+ return toByte(value);
+ case CHAR:
+ return toCharacter(value);
+ case DATE:
+ return toDate(value, dateFormat);
+ case DOUBLE:
+ return toDouble(value);
+ case FLOAT:
+ return toFloat(value);
+ case INT:
+ return toInteger(value);
+ case LONG:
+ return toLong(value);
+ case SHORT:
+ return toShort(value);
+ case STRING:
+ return toString(value, dateFormat, timeFormat, timestampFormat);
+ case TIME:
+ return toTime(value, timeFormat);
+ case TIMESTAMP:
+ return toTimestamp(value, timestampFormat);
+ case ARRAY:
+ return toArray(value);
+ case RECORD:
+ final RecordDataType recordType = (RecordDataType) dataType;
+ final RecordSchema childSchema = recordType.getChildSchema();
+ return toRecord(value, childSchema);
+ case CHOICE: {
+ if (value == null) {
+ return null;
+ }
+
+ final ChoiceDataType choiceDataType = (ChoiceDataType) dataType;
+ final DataType chosenDataType = chooseDataType(value, choiceDataType);
+ if (chosenDataType == null) {
+ throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass()
+ + " to any of the following available Sub-Types for a Choice: " + choiceDataType.getPossibleSubTypes());
+ }
+
+ return convertType(value, chosenDataType);
+ }
+ }
+
+ return null;
+ }
+
+
+ /**
+ * Reports whether the value is compatible with the given data type by delegating to the
+ * compatibility check that matches the data type's field type. DATE, TIME and TIMESTAMP checks
+ * use the format carried by the data type itself. A CHOICE is compatible when at least one of its
+ * sub-types is.
+ *
+ * @param value the value to test
+ * @param dataType the type to test against
+ * @return <code>true</code> if the value is compatible; <code>false</code> otherwise, including
+ * when the field type has no matching case
+ */
+ public static boolean isCompatibleDataType(final Object value, final DataType dataType) {
+ switch (dataType.getFieldType()) {
+ case ARRAY:
+ return isArrayTypeCompatible(value);
+ case BIGINT:
+ return isBigIntTypeCompatible(value);
+ case BOOLEAN:
+ return isBooleanTypeCompatible(value);
+ case BYTE:
+ return isByteTypeCompatible(value);
+ case CHAR:
+ return isCharacterTypeCompatible(value);
+ case DATE:
+ return isDateTypeCompatible(value, dataType.getFormat());
+ case DOUBLE:
+ return isDoubleTypeCompatible(value);
+ case FLOAT:
+ return isFloatTypeCompatible(value);
+ case INT:
+ return isIntegerTypeCompatible(value);
+ case LONG:
+ return isLongTypeCompatible(value);
+ case RECORD:
+ return isRecordTypeCompatible(value);
+ case SHORT:
+ return isShortTypeCompatible(value);
+ case TIME:
+ return isTimeTypeCompatible(value, dataType.getFormat());
+ case TIMESTAMP:
+ return isTimestampTypeCompatible(value, dataType.getFormat());
+ case STRING:
+ return isStringTypeCompatible(value);
+ case CHOICE: {
+ final DataType chosenDataType = chooseDataType(value, (ChoiceDataType) dataType);
+ return chosenDataType != null;
+ }
+ }
+
+ return false;
+ }
+
+ /**
+ * Picks the sub-type of a CHOICE that should be used for the given value. Sub-types are tried in
+ * list order and the first compatible one wins.
+ *
+ * @param value the value to find a type for
+ * @param choiceType the CHOICE type whose sub-types are considered
+ * @return the first compatible sub-type, or <code>null</code> if none is compatible
+ */
+ public static DataType chooseDataType(final Object value, final ChoiceDataType choiceType) {
+ for (final DataType subType : choiceType.getPossibleSubTypes()) {
+ if (isCompatibleDataType(value, subType)) {
+ return subType;
+ }
+ }
+
+ return null;
+ }
+
+ /**
+ * Converts the value to a Record. A Record is returned as-is. A Map is turned into a MapRecord
+ * with the given schema: entries with a null key or a key that is not a field of the schema are
+ * dropped, and each remaining value is coerced to the field's type using the default formats.
+ *
+ * @param value the value to convert; may be <code>null</code>
+ * @param recordSchema the schema for the resulting record; required when the value is a Map
+ * @return the Record, or <code>null</code> if the value is <code>null</code>
+ * @throws IllegalTypeConversionException if the value is a Map but no schema was given, or if the
+ * value is neither a Record nor a Map
+ */
+ public static Record toRecord(final Object value, final RecordSchema recordSchema) {
+ if (value == null) {
+ return null;
+ }
+
+ if (value instanceof Record) {
+ return ((Record) value);
+ }
+
+ if (value instanceof Map) {
+ if (recordSchema == null) {
+ throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass()
+ + " to Record because the value is a Map but no Record Schema was provided");
+ }
+
+ final Map<?, ?> map = (Map<?, ?>) value;
+ final Map<String, Object> coercedValues = new HashMap<>();
+
+ for (final Map.Entry<?, ?> entry : map.entrySet()) {
+ final Object keyValue = entry.getKey();
+ if (keyValue == null) {
+ continue;
+ }
+
+ // Keys are matched against schema field names by their String form.
+ final String key = keyValue.toString();
+ final Optional<DataType> desiredTypeOption = recordSchema.getDataType(key);
+ if (!desiredTypeOption.isPresent()) {
+ continue;
+ }
+
+ final Object rawValue = entry.getValue();
+ final Object coercedValue = convertType(rawValue, desiredTypeOption.get());
+ coercedValues.put(key, coercedValue);
+ }
+
+ return new MapRecord(recordSchema, coercedValues);
+ }
+
+ throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Record");
+ }
+
+ /**
+ * @param value the value to test
+ * @return <code>true</code> only if the value is a non-null Record; note that a Map is not
+ * reported as compatible even though toRecord can convert one
+ */
+ public static boolean isRecordTypeCompatible(final Object value) {
+ return value != null && value instanceof Record;
+ }
+
+ /**
+ * Casts the value to an Object array. No element conversion is performed, and primitive arrays
+ * such as <code>int[]</code> are rejected because they are not instances of <code>Object[]</code>.
+ *
+ * @param value the value to convert; may be <code>null</code>
+ * @return the same array instance, or <code>null</code> if the value is <code>null</code>
+ * @throws IllegalTypeConversionException if the value is not an <code>Object[]</code>
+ */
+ public static Object[] toArray(final Object value) {
+ if (value == null) {
+ return null;
+ }
+
+ if (value instanceof Object[]) {
+ return (Object[]) value;
+ }
+
+ throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Object Array");
+ }
+
+ /**
+  * Indicates whether the given value is an array of objects.
+  *
+  * @param value the value to check
+  * @return {@code true} only if the value is a non-null {@code Object[]}; primitive arrays do not qualify
+  */
+ public static boolean isArrayTypeCompatible(final Object value) {
+     // instanceof is false for null, so no separate null check is needed.
+     return value instanceof Object[];
+ }
+
+ /**
+  * Converts the given value to a String.
+  * <p>
+  * Strings are returned unchanged. Date values are formatted through {@code getDateFormat}: a
+  * {@code java.sql.Date} with {@code dateFormat}, a {@code java.sql.Time} with {@code timeFormat}, and a
+  * {@code java.sql.Timestamp} or any other {@code java.util.Date} with {@code timestampFormat}. Everything
+  * else is rendered with {@code toString()}.
+  *
+  * @param value the value to convert
+  * @param dateFormat the pattern used for {@code java.sql.Date} values
+  * @param timeFormat the pattern used for {@code java.sql.Time} values
+  * @param timestampFormat the pattern used for all other date values
+  * @return the String form of the value, or {@code null} if the value is {@code null}
+  */
+ public static String toString(final Object value, final String dateFormat, final String timeFormat, final String timestampFormat) {
+     if (value == null) {
+         return null;
+     }
+
+     if (value instanceof String) {
+         return (String) value;
+     }
+
+     if (!(value instanceof java.util.Date)) {
+         return value.toString();
+     }
+
+     // The java.sql types all extend java.util.Date, so only the pattern differs between them.
+     final java.util.Date dateValue = (java.util.Date) value;
+     final String pattern;
+     if (dateValue instanceof java.sql.Date) {
+         pattern = dateFormat;
+     } else if (dateValue instanceof java.sql.Time) {
+         pattern = timeFormat;
+     } else {
+         pattern = timestampFormat;
+     }
+
+     return getDateFormat(pattern).format(dateValue);
+ }
+
+ /**
+  * Indicates whether the given value is considered a String-compatible value.
+  *
+  * @param value the value to check
+  * @return {@code true} only if the value is a non-null String or {@code java.util.Date}
+  */
+ public static boolean isStringTypeCompatible(final Object value) {
+     // instanceof is false for null, so no separate null check is needed.
+     return value instanceof String || value instanceof java.util.Date;
+ }
+
+ /**
+  * Converts the given value to a {@code java.sql.Date}.
+  * <p>
+  * A {@code java.sql.Date} is returned as-is. Any other {@code java.util.Date} and any Number are converted
+  * from their epoch-millisecond value. A String is parsed with the given format via {@code getDateFormat}.
+  *
+  * @param value the value to convert
+  * @param format the date pattern used when the value is a String
+  * @return the converted date, or {@code null} if the value is {@code null}
+  * @throws IllegalTypeConversionException if the value is a String that does not match the format (the
+  *         {@link ParseException} is attached as the cause) or is of an unsupported type
+  */
+ public static java.sql.Date toDate(final Object value, final String format) {
+     if (value == null) {
+         return null;
+     }
+
+     if (value instanceof Date) {
+         return (Date) value;
+     }
+
+     // isDateTypeCompatible accepts every java.util.Date (e.g. a Timestamp), so convert those
+     // instead of rejecting them.
+     if (value instanceof java.util.Date) {
+         return new Date(((java.util.Date) value).getTime());
+     }
+
+     if (value instanceof Number) {
+         final long longValue = ((Number) value).longValue();
+         return new Date(longValue);
+     }
+
+     if (value instanceof String) {
+         try {
+             final java.util.Date utilDate = getDateFormat(format).parse((String) value);
+             return new Date(utilDate.getTime());
+         } catch (final ParseException e) {
+             // Keep the ParseException as the cause so the failing offset is not lost.
+             throw new IllegalTypeConversionException("Could not convert value [" + value
+                 + "] of type java.lang.String to Date because the value is not in the expected date format: " + format, e);
+         }
+     }
+
+     throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Date");
+ }
+
+ /**
+  * Indicates whether the given value can be interpreted as a date.
+  *
+  * @param value the value to check
+  * @param format the date pattern a String value must parse with
+  * @return {@code true} if the value is a {@code java.util.Date}, a Number, or a String that parses with
+  *         the given format; {@code false} otherwise, including for {@code null}
+  */
+ public static boolean isDateTypeCompatible(final Object value, final String format) {
+     if (value instanceof java.util.Date || value instanceof Number) {
+         return true;
+     }
+
+     // Covers null as well as every unsupported type.
+     if (!(value instanceof String)) {
+         return false;
+     }
+
+     try {
+         getDateFormat(format).parse((String) value);
+     } catch (final ParseException unparseable) {
+         return false;
+     }
+
+     return true;
+ }
+
+ /**
+  * Converts the given value to a {@link Time}.
+  * <p>
+  * A {@code Time} is returned as-is. Any other {@code java.util.Date} and any Number are converted from
+  * their epoch-millisecond value. A String is parsed with the given format via {@code getDateFormat}.
+  *
+  * @param value the value to convert
+  * @param format the pattern used when the value is a String
+  * @return the converted time, or {@code null} if the value is {@code null}
+  * @throws IllegalTypeConversionException if the value is a String that does not match the format (the
+  *         {@link ParseException} is attached as the cause) or is of an unsupported type
+  */
+ public static Time toTime(final Object value, final String format) {
+     if (value == null) {
+         return null;
+     }
+
+     if (value instanceof Time) {
+         return (Time) value;
+     }
+
+     // isTimeTypeCompatible accepts every java.util.Date, so convert those instead of rejecting them.
+     if (value instanceof java.util.Date) {
+         return new Time(((java.util.Date) value).getTime());
+     }
+
+     if (value instanceof Number) {
+         final long longValue = ((Number) value).longValue();
+         return new Time(longValue);
+     }
+
+     if (value instanceof String) {
+         try {
+             final java.util.Date utilDate = getDateFormat(format).parse((String) value);
+             return new Time(utilDate.getTime());
+         } catch (final ParseException e) {
+             // Keep the ParseException as the cause so the failing offset is not lost.
+             throw new IllegalTypeConversionException("Could not convert value [" + value
+                 + "] of type java.lang.String to Time because the value is not in the expected date format: " + format, e);
+         }
+     }
+
+     throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Time");
+ }
+
+ /**
+  * Builds a new {@link SimpleDateFormat} for the given pattern, using the time zone held in the
+  * class's {@code gmt} field. A fresh instance is created on every call.
+  *
+  * @param format the date/time pattern
+  * @return a newly created formatter for the pattern
+  */
+ private static DateFormat getDateFormat(final String format) {
+     final SimpleDateFormat formatter = new SimpleDateFormat(format);
+     formatter.setTimeZone(gmt);
+     return formatter;
+ }
+
+ /**
+  * Indicates whether the given value can be interpreted as a time. The check is delegated unchanged to
+  * {@code isDateTypeCompatible}.
+  *
+  * @param value the value to check
+  * @param format the pattern a String value must parse with
+  * @return the result of {@code isDateTypeCompatible(value, format)}
+  */
+ public static boolean isTimeTypeCompatible(final Object value, final String format) {
+     return isDateTypeCompatible(value, format);
+ }
+
+ /**
+  * Converts the given value to a {@link Timestamp}.
+  * <p>
+  * A {@code Timestamp} is returned as-is. Any other {@code java.util.Date} and any Number are converted
+  * from their epoch-millisecond value. A String is parsed with the given format via {@code getDateFormat}.
+  *
+  * @param value the value to convert
+  * @param format the pattern used when the value is a String
+  * @return the converted timestamp, or {@code null} if the value is {@code null}
+  * @throws IllegalTypeConversionException if the value is a String that does not match the format (the
+  *         {@link ParseException} is attached as the cause) or is of an unsupported type
+  */
+ public static Timestamp toTimestamp(final Object value, final String format) {
+     if (value == null) {
+         return null;
+     }
+
+     if (value instanceof Timestamp) {
+         return (Timestamp) value;
+     }
+
+     // isTimestampTypeCompatible accepts every java.util.Date, so convert those instead of rejecting them.
+     if (value instanceof java.util.Date) {
+         return new Timestamp(((java.util.Date) value).getTime());
+     }
+
+     if (value instanceof Number) {
+         final long longValue = ((Number) value).longValue();
+         return new Timestamp(longValue);
+     }
+
+     if (value instanceof String) {
+         try {
+             final java.util.Date utilDate = getDateFormat(format).parse((String) value);
+             return new Timestamp(utilDate.getTime());
+         } catch (final ParseException e) {
+             // Keep the ParseException as the cause so the failing offset is not lost.
+             throw new IllegalTypeConversionException("Could not convert value [" + value
+                 + "] of type java.lang.String to Timestamp because the value is not in the expected date format: " + format, e);
+         }
+     }
+
+     throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Timestamp");
+ }
+
+ /**
+  * Indicates whether the given value can be interpreted as a timestamp. The check is delegated unchanged
+  * to {@code isDateTypeCompatible}.
+  *
+  * @param value the value to check
+  * @param format the pattern a String value must parse with
+  * @return the result of {@code isDateTypeCompatible(value, format)}
+  */
+ public static boolean isTimestampTypeCompatible(final Object value, final String format) {
+     return isDateTypeCompatible(value, format);
+ }
+
+
+ /**
+  * Converts the given value to a {@link BigInteger}.
+  *
+  * @param value the value to convert; only BigInteger and Long values are supported
+  * @return the BigInteger, or {@code null} if the value is {@code null}
+  * @throws IllegalTypeConversionException if the value is of any other type
+  */
+ public static BigInteger toBigInt(final Object value) {
+     if (value == null) {
+         return null;
+     }
+
+     if (value instanceof BigInteger) {
+         return (BigInteger) value;
+     }
+
+     if (value instanceof Long) {
+         final long longValue = (Long) value;
+         return BigInteger.valueOf(longValue);
+     }
+
+     throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to BigInteger");
+ }
+
+ /**
+  * Indicates whether the given value can be converted by {@code toBigInt}.
+  *
+  * @param value the value to check
+  * @return {@code true} only if the value is a non-null {@link BigInteger} or {@link Long}
+  */
+ public static boolean isBigIntTypeCompatible(final Object value) {
+     // The previous condition required value == null, which can never hold together with an instanceof
+     // match, so the method always returned false. instanceof is already false for null.
+     return value instanceof BigInteger || value instanceof Long;
+ }
+
+ /**
+  * Converts the given value to a Boolean.
+  *
+  * @param value the value to convert; a Boolean, or a String equal to "true" or "false" ignoring case
+  * @return the Boolean, or {@code null} if the value is {@code null}
+  * @throws IllegalTypeConversionException if the value is any other String or of any other type
+  */
+ public static Boolean toBoolean(final Object value) {
+     if (value == null) {
+         return null;
+     }
+
+     if (value instanceof Boolean) {
+         return (Boolean) value;
+     }
+
+     if (value instanceof String) {
+         final String text = (String) value;
+         if ("true".equalsIgnoreCase(text)) {
+             return Boolean.TRUE;
+         }
+         if ("false".equalsIgnoreCase(text)) {
+             return Boolean.FALSE;
+         }
+     }
+
+     throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Boolean");
+ }
+
+ /**
+  * Indicates whether the given value can be converted to a Boolean.
+  *
+  * @param value the value to check
+  * @return {@code true} if the value is a Boolean, or a String equal to "true" or "false" ignoring case;
+  *         {@code false} otherwise, including for {@code null}
+  */
+ public static boolean isBooleanTypeCompatible(final Object value) {
+     if (value instanceof Boolean) {
+         return true;
+     }
+
+     // Covers null as well as every unsupported type.
+     if (!(value instanceof String)) {
+         return false;
+     }
+
+     final String text = (String) value;
+     return "true".equalsIgnoreCase(text) || "false".equalsIgnoreCase(text);
+ }
+
+ /**
+  * Converts the given value to a Double.
+  *
+  * @param value the value to convert; a Number (via {@code doubleValue()}) or a String
+  *        (via {@link Double#parseDouble(String)})
+  * @return the Double, or {@code null} if the value is {@code null}
+  * @throws NumberFormatException if the value is a String that cannot be parsed
+  * @throws IllegalTypeConversionException if the value is of an unsupported type
+  */
+ public static Double toDouble(final Object value) {
+     if (value instanceof Number) {
+         return ((Number) value).doubleValue();
+     }
+
+     if (value instanceof String) {
+         return Double.parseDouble((String) value);
+     }
+
+     if (value == null) {
+         return null;
+     }
+
+     throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Double");
+ }
+
+ /**
+  * Indicates whether the given value can be converted to a Double.
+  *
+  * @param value the value to check
+  * @return the result of {@code isNumberTypeCompatible}, using {@link Double#parseDouble(String)} to
+  *         verify String values
+  */
+ public static boolean isDoubleTypeCompatible(final Object value) {
+     return isNumberTypeCompatible(value, Double::parseDouble);
+ }
+
+ /**
+  * Shared check used by the numeric {@code is...TypeCompatible} methods.
+  *
+  * @param value the value to check
+  * @param stringValueVerifier invoked with a String value; it signals an unparseable value by throwing
+  *        {@link NumberFormatException}
+  * @return {@code true} if the value is a Number, or a String the verifier accepts without throwing
+  *         NumberFormatException; {@code false} otherwise, including for {@code null}
+  */
+ private static boolean isNumberTypeCompatible(final Object value, final Consumer<String> stringValueVerifier) {
+     if (value instanceof Number) {
+         return true;
+     }
+
+     // Covers null as well as every unsupported type.
+     if (!(value instanceof String)) {
+         return false;
+     }
+
+     try {
+         stringValueVerifier.accept((String) value);
+     } catch (final NumberFormatException unparseable) {
+         return false;
+     }
+
+     return true;
+ }
+
+ /**
+  * Converts the given value to a Float.
+  *
+  * @param value the value to convert; a Number (via {@code floatValue()}) or a String
+  *        (via {@link Float#parseFloat(String)})
+  * @return the Float, or {@code null} if the value is {@code null}
+  * @throws NumberFormatException if the value is a String that cannot be parsed
+  * @throws IllegalTypeConversionException if the value is of an unsupported type
+  */
+ public static Float toFloat(final Object value) {
+     if (value instanceof Number) {
+         return ((Number) value).floatValue();
+     }
+
+     if (value instanceof String) {
+         return Float.parseFloat((String) value);
+     }
+
+     if (value == null) {
+         return null;
+     }
+
+     throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Float");
+ }
+
+ /**
+  * Indicates whether the given value can be converted to a Float.
+  *
+  * @param value the value to check
+  * @return the result of {@code isNumberTypeCompatible}, using {@link Float#parseFloat(String)} to
+  *         verify String values
+  */
+ public static boolean isFloatTypeCompatible(final Object value) {
+     return isNumberTypeCompatible(value, Float::parseFloat);
+ }
+
+ /**
+  * Converts the given value to a Long.
+  *
+  * @param value the value to convert; a Number (via {@code longValue()}), a String
+  *        (via {@link Long#parseLong(String)}) or a {@code java.util.Date} (via {@code getTime()})
+  * @return the Long, or {@code null} if the value is {@code null}
+  * @throws NumberFormatException if the value is a String that cannot be parsed
+  * @throws IllegalTypeConversionException if the value is of an unsupported type
+  */
+ public static Long toLong(final Object value) {
+     if (value instanceof Number) {
+         return ((Number) value).longValue();
+     }
+
+     if (value instanceof String) {
+         return Long.parseLong((String) value);
+     }
+
+     if (value instanceof java.util.Date) {
+         return ((java.util.Date) value).getTime();
+     }
+
+     if (value == null) {
+         return null;
+     }
+
+     throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Long");
+ }
+
+ /**
+  * Indicates whether the given value can be converted to a Long.
+  *
+  * @param value the value to check
+  * @return {@code true} if the value is a Number, a {@code java.util.Date}, or a String accepted by
+  *         {@link Long#parseLong(String)}; {@code false} otherwise, including for {@code null}
+  */
+ public static boolean isLongTypeCompatible(final Object value) {
+     if (value instanceof Number || value instanceof java.util.Date) {
+         return true;
+     }
+
+     // Covers null as well as every unsupported type.
+     if (!(value instanceof String)) {
+         return false;
+     }
+
+     try {
+         Long.parseLong((String) value);
+     } catch (final NumberFormatException unparseable) {
+         return false;
+     }
+
+     return true;
+ }
+
+
+ /**
+  * Converts the given value to an Integer.
+  *
+  * @param value the value to convert; a Number (via {@code intValue()}) or a String
+  *        (via {@link Integer#parseInt(String)})
+  * @return the Integer, or {@code null} if the value is {@code null}
+  * @throws NumberFormatException if the value is a String that cannot be parsed
+  * @throws IllegalTypeConversionException if the value is of an unsupported type
+  */
+ public static Integer toInteger(final Object value) {
+     if (value instanceof Number) {
+         return ((Number) value).intValue();
+     }
+
+     if (value instanceof String) {
+         return Integer.parseInt((String) value);
+     }
+
+     if (value == null) {
+         return null;
+     }
+
+     throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Integer");
+ }
+
+ /**
+  * Indicates whether the given value can be converted to an Integer.
+  *
+  * @param value the value to check
+  * @return the result of {@code isNumberTypeCompatible}, using {@link Integer#parseInt(String)} to
+  *         verify String values
+  */
+ public static boolean isIntegerTypeCompatible(final Object value) {
+     return isNumberTypeCompatible(value, Integer::parseInt);
+ }
+
+
+ /**
+  * Converts the given value to a Short.
+  *
+  * @param value the value to convert; a Number (via {@code shortValue()}) or a String
+  *        (via {@link Short#parseShort(String)})
+  * @return the Short, or {@code null} if the value is {@code null}
+  * @throws NumberFormatException if the value is a String that cannot be parsed
+  * @throws IllegalTypeConversionException if the value is of an unsupported type
+  */
+ public static Short toShort(final Object value) {
+     if (value instanceof Number) {
+         return ((Number) value).shortValue();
+     }
+
+     if (value instanceof String) {
+         return Short.parseShort((String) value);
+     }
+
+     if (value == null) {
+         return null;
+     }
+
+     throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Short");
+ }
+
+ /**
+  * Indicates whether the given value can be converted to a Short.
+  *
+  * @param value the value to check
+  * @return the result of {@code isNumberTypeCompatible}, using {@link Short#parseShort(String)} to
+  *         verify String values
+  */
+ public static boolean isShortTypeCompatible(final Object value) {
+     return isNumberTypeCompatible(value, Short::parseShort);
+ }
+
+ /**
+  * Converts the given value to a Byte.
+  *
+  * @param value the value to convert; a Number (via {@code byteValue()}) or a String
+  *        (via {@link Byte#parseByte(String)})
+  * @return the Byte, or {@code null} if the value is {@code null}
+  * @throws NumberFormatException if the value is a String that cannot be parsed
+  * @throws IllegalTypeConversionException if the value is of an unsupported type
+  */
+ public static Byte toByte(final Object value) {
+     if (value instanceof Number) {
+         return ((Number) value).byteValue();
+     }
+
+     if (value instanceof String) {
+         return Byte.parseByte((String) value);
+     }
+
+     if (value == null) {
+         return null;
+     }
+
+     throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Byte");
+ }
+
+ /**
+  * Indicates whether the given value can be converted to a Byte.
+  *
+  * @param value the value to check
+  * @return the result of {@code isNumberTypeCompatible}, using {@link Byte#parseByte(String)} to
+  *         verify String values
+  */
+ public static boolean isByteTypeCompatible(final Object value) {
+     return isNumberTypeCompatible(value, Byte::parseByte);
+ }
+
+
+ /**
+  * Converts the given value to a Character.
+  *
+  * @param value the value to convert; a Character, or a CharSequence whose first character is used
+  * @return the Character, or {@code null} if the value is {@code null}
+  * @throws IllegalTypeConversionException if the value is an empty CharSequence or of an unsupported type
+  */
+ public static Character toCharacter(final Object value) {
+     if (value == null) {
+         return null;
+     }
+
+     if (value instanceof Character) {
+         return (Character) value;
+     }
+
+     if (!(value instanceof CharSequence)) {
+         throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Character");
+     }
+
+     final CharSequence text = (CharSequence) value;
+     if (text.length() == 0) {
+         throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Character because it has a length of 0");
+     }
+
+     return text.charAt(0);
+ }
+
+ /**
+  * Indicates whether the given value can be converted to a Character.
+  *
+  * @param value the value to check
+  * @return {@code true} if the value is a Character or a non-empty CharSequence; {@code false} otherwise,
+  *         including for {@code null}
+  */
+ public static boolean isCharacterTypeCompatible(final Object value) {
+     if (value instanceof Character) {
+         return true;
+     }
+
+     return value instanceof CharSequence && ((CharSequence) value).length() > 0;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/util/IllegalTypeConversionException.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/util/IllegalTypeConversionException.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/util/IllegalTypeConversionException.java
new file mode 100644
index 0000000..38b5d20
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/util/IllegalTypeConversionException.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.serialization.record.util;
+
+/**
+ * Unchecked exception indicating that a value could not be converted to the requested type.
+ */
+public class IllegalTypeConversionException extends RuntimeException {
+
+    /**
+     * Creates the exception with a description of the failed conversion.
+     *
+     * @param message the detail message
+     */
+    public IllegalTypeConversionException(final String message) {
+        super(message);
+    }
+
+    /**
+     * Creates the exception with a description of the failed conversion and the underlying failure.
+     *
+     * @param message the detail message
+     * @param cause the exception that caused the conversion to fail
+     */
+    public IllegalTypeConversionException(final String message, final Throwable cause) {
+        super(message, cause);
+    }
+}