You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2017/05/01 20:12:06 UTC
[13/17] nifi git commit: NIFI-3724 - Initial commit of Parquet bundle
with PutParquet and FetchParquet - Creating nifi-records-utils to share
utility code from record services - Refactoring Parquet tests to use
MockRecorderParser and MockRecordWriter - R
http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java
new file mode 100644
index 0000000..a6b965d
--- /dev/null
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.serialization.record;
+
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+
+import java.util.Date;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+public class MapRecord implements Record {
+ private final RecordSchema schema;
+ private final Map<String, Object> values;
+
+ public MapRecord(final RecordSchema schema, final Map<String, Object> values) {
+ this.schema = Objects.requireNonNull(schema);
+ this.values = Objects.requireNonNull(values);
+ }
+
+ @Override
+ public RecordSchema getSchema() {
+ return schema;
+ }
+
+ @Override
+ public Object[] getValues() {
+ final Object[] values = new Object[schema.getFieldCount()];
+ int i = 0;
+ for (final RecordField recordField : schema.getFields()) {
+ values[i++] = getValue(recordField);
+ }
+ return values;
+ }
+
+ @Override
+ public Object getValue(final String fieldName) {
+ final Optional<RecordField> fieldOption = schema.getField(fieldName);
+ if (fieldOption.isPresent()) {
+ return getValue(fieldOption.get());
+ }
+
+ return null;
+ }
+
+ @Override
+ public Object getValue(final RecordField field) {
+ Object explicitValue = getExplicitValue(field);
+ if (explicitValue != null) {
+ return explicitValue;
+ }
+
+ final Optional<RecordField> resolvedField = resolveField(field);
+ final boolean resolvedFieldDifferent = resolvedField.isPresent() && !resolvedField.get().equals(field);
+ if (resolvedFieldDifferent) {
+ explicitValue = getExplicitValue(resolvedField.get());
+ if (explicitValue != null) {
+ return explicitValue;
+ }
+ }
+
+ Object defaultValue = field.getDefaultValue();
+ if (defaultValue != null) {
+ return defaultValue;
+ }
+
+ if (resolvedFieldDifferent) {
+ return resolvedField.get().getDefaultValue();
+ }
+
+ return null;
+ }
+
+ private Optional<RecordField> resolveField(final RecordField field) {
+ Optional<RecordField> resolved = schema.getField(field.getFieldName());
+ if (resolved.isPresent()) {
+ return resolved;
+ }
+
+ for (final String alias : field.getAliases()) {
+ resolved = schema.getField(alias);
+ if (resolved.isPresent()) {
+ return resolved;
+ }
+ }
+
+ return Optional.empty();
+ }
+
+ private Object getExplicitValue(final RecordField field) {
+ final String canonicalFieldName = field.getFieldName();
+
+ // We use containsKey here instead of just calling get() and checking for a null value
+ // because if the true field name is set to null, we want to return null, rather than
+ // what the alias points to. Likewise for a specific alias, since aliases are defined
+ // in a List with a specific ordering.
+ Object value = values.get(canonicalFieldName);
+ if (value != null) {
+ return value;
+ }
+
+ for (final String alias : field.getAliases()) {
+ value = values.get(alias);
+ if (value != null) {
+ return value;
+ }
+ }
+
+ return null;
+ }
+
+ @Override
+ public String getAsString(final String fieldName) {
+ final Optional<DataType> dataTypeOption = schema.getDataType(fieldName);
+ if (!dataTypeOption.isPresent()) {
+ return null;
+ }
+
+ return convertToString(getValue(fieldName), dataTypeOption.get().getFormat());
+ }
+
+ @Override
+ public String getAsString(final String fieldName, final String format) {
+ return convertToString(getValue(fieldName), format);
+ }
+
+ @Override
+ public String getAsString(final RecordField field, final String format) {
+ return convertToString(getValue(field), format);
+ }
+
+ private String getFormat(final String optionalFormat, final RecordFieldType fieldType) {
+ return (optionalFormat == null) ? fieldType.getDefaultFormat() : optionalFormat;
+ }
+
+ private String convertToString(final Object value, final String format) {
+ if (value == null) {
+ return null;
+ }
+
+ final String dateFormat = getFormat(format, RecordFieldType.DATE);
+ final String timestampFormat = getFormat(format, RecordFieldType.TIMESTAMP);
+ final String timeFormat = getFormat(format, RecordFieldType.TIME);
+ return DataTypeUtils.toString(value, dateFormat, timeFormat, timestampFormat);
+ }
+
+ @Override
+ public Long getAsLong(final String fieldName) {
+ return DataTypeUtils.toLong(getValue(fieldName), fieldName);
+ }
+
+ @Override
+ public Integer getAsInt(final String fieldName) {
+ return DataTypeUtils.toInteger(getValue(fieldName), fieldName);
+ }
+
+ @Override
+ public Double getAsDouble(final String fieldName) {
+ return DataTypeUtils.toDouble(getValue(fieldName), fieldName);
+ }
+
+ @Override
+ public Float getAsFloat(final String fieldName) {
+ return DataTypeUtils.toFloat(getValue(fieldName), fieldName);
+ }
+
+ @Override
+ public Record getAsRecord(String fieldName, final RecordSchema schema) {
+ return DataTypeUtils.toRecord(getValue(fieldName), schema, fieldName);
+ }
+
+ @Override
+ public Boolean getAsBoolean(final String fieldName) {
+ return DataTypeUtils.toBoolean(getValue(fieldName), fieldName);
+ }
+
+ @Override
+ public Date getAsDate(final String fieldName, final String format) {
+ return DataTypeUtils.toDate(getValue(fieldName), format, fieldName);
+ }
+
+ @Override
+ public Object[] getAsArray(final String fieldName) {
+ return DataTypeUtils.toArray(getValue(fieldName), fieldName);
+ }
+
+
+ @Override
+ public int hashCode() {
+ return 31 + 41 * values.hashCode() + 7 * schema.hashCode();
+ }
+
+ @Override
+ public boolean equals(final Object obj) {
+ if (obj == this) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (!(obj instanceof MapRecord)) {
+ return false;
+ }
+ final MapRecord other = (MapRecord) obj;
+ return schema.equals(other.schema) && values.equals(other.values);
+ }
+
+ @Override
+ public String toString() {
+ return "MapRecord[values=" + values + "]";
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/PushBackRecordSet.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/PushBackRecordSet.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/PushBackRecordSet.java
new file mode 100644
index 0000000..a186611
--- /dev/null
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/PushBackRecordSet.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.serialization.record;
+
+import java.io.IOException;
+
+public class PushBackRecordSet implements RecordSet {
+ private final RecordSet original;
+ private Record pushback;
+
+ public PushBackRecordSet(final RecordSet original) {
+ this.original = original;
+ }
+
+ @Override
+ public RecordSchema getSchema() throws IOException {
+ return original.getSchema();
+ }
+
+ @Override
+ public Record next() throws IOException {
+ if (pushback != null) {
+ final Record record = pushback;
+ pushback = null;
+ return record;
+ }
+
+ return original.next();
+ }
+
+ public void pushback(final Record record) {
+ if (pushback != null) {
+ throw new IllegalStateException("RecordSet already has a Record pushed back. Cannot push back more than one record at a time.");
+ }
+
+ this.pushback = record;
+ }
+
+ public boolean isAnotherRecord() throws IOException {
+ if (pushback != null) {
+ return true;
+ }
+
+ final Record nextRecord = next();
+ if (nextRecord == null) {
+ return false;
+ }
+
+ pushback(nextRecord);
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/Record.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/Record.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/Record.java
new file mode 100644
index 0000000..5e5e7ba
--- /dev/null
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/Record.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.serialization.record;
+
+import java.util.Date;
+
+public interface Record {
+
+ RecordSchema getSchema();
+
+ /**
+ * <p>
+ * Returns a view of the the values of the fields in this Record.
+ * </p>
+ *
+ * <b>NOTE:</b> The array that is returned may be an underlying array that is backing
+ * the contents of the Record. As such, modifying the array in any way may result in
+ * modifying the record.
+ *
+ * @return a view of the values of the fields in this Record
+ */
+ Object[] getValues();
+
+ Object getValue(String fieldName);
+
+ Object getValue(RecordField field);
+
+ String getAsString(String fieldName);
+
+ String getAsString(String fieldName, String format);
+
+ String getAsString(RecordField field, String format);
+
+ Long getAsLong(String fieldName);
+
+ Integer getAsInt(String fieldName);
+
+ Double getAsDouble(String fieldName);
+
+ Float getAsFloat(String fieldName);
+
+ Record getAsRecord(String fieldName, RecordSchema schema);
+
+ Boolean getAsBoolean(String fieldName);
+
+ Date getAsDate(String fieldName, String format);
+
+ Object[] getAsArray(String fieldName);
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordField.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordField.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordField.java
new file mode 100644
index 0000000..dc68b01
--- /dev/null
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordField.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.serialization.record;
+
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+
+import java.util.Collections;
+import java.util.Objects;
+import java.util.Set;
+
+public class RecordField {
+ private final String fieldName;
+ private final DataType dataType;
+ private final Set<String> aliases;
+ private final Object defaultValue;
+
+ public RecordField(final String fieldName, final DataType dataType) {
+ this(fieldName, dataType, null, Collections.emptySet());
+ }
+
+ public RecordField(final String fieldName, final DataType dataType, final Object defaultValue) {
+ this(fieldName, dataType, defaultValue, Collections.emptySet());
+ }
+
+ public RecordField(final String fieldName, final DataType dataType, final Set<String> aliases) {
+ this(fieldName, dataType, null, aliases);
+ }
+
+ public RecordField(final String fieldName, final DataType dataType, final Object defaultValue, final Set<String> aliases) {
+ if (defaultValue != null && !DataTypeUtils.isCompatibleDataType(defaultValue, dataType)) {
+ throw new IllegalArgumentException("Cannot set the default value for field [" + fieldName + "] to [" + defaultValue
+ + "] because that is not a valid value for Data Type [" + dataType + "]");
+ }
+
+ this.fieldName = Objects.requireNonNull(fieldName);
+ this.dataType = Objects.requireNonNull(dataType);
+ this.aliases = Collections.unmodifiableSet(Objects.requireNonNull(aliases));
+ this.defaultValue = defaultValue;
+ }
+
+ public String getFieldName() {
+ return fieldName;
+ }
+
+ public Set<String> getAliases() {
+ return aliases;
+ }
+
+ public DataType getDataType() {
+ return dataType;
+ }
+
+ public Object getDefaultValue() {
+ return defaultValue;
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + dataType.hashCode();
+ result = prime * result + fieldName.hashCode();
+ result = prime * result + aliases.hashCode();
+ result = prime * result + ((defaultValue == null) ? 0 : defaultValue.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+
+ RecordField other = (RecordField) obj;
+ return dataType.equals(other.getDataType()) && fieldName.equals(other.getFieldName()) && aliases.equals(other.getAliases()) && Objects.equals(defaultValue, other.defaultValue);
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordFieldType.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordFieldType.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordFieldType.java
new file mode 100644
index 0000000..2e6b5d7
--- /dev/null
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordFieldType.java
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.serialization.record;
+
+import org.apache.nifi.serialization.record.type.ArrayDataType;
+import org.apache.nifi.serialization.record.type.ChoiceDataType;
+import org.apache.nifi.serialization.record.type.MapDataType;
+import org.apache.nifi.serialization.record.type.RecordDataType;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public enum RecordFieldType {
+ /**
+ * A String field type. Fields of this type use a {@code java.lang.String} value.
+ */
+ STRING("string"),
+
+ /**
+ * A boolean field type. Fields of this type use a {@code boolean} value.
+ */
+ BOOLEAN("boolean"),
+
+ /**
+ * A byte field type. Fields of this type use a {@code byte} value.
+ */
+ BYTE("byte"),
+
+ /**
+ * A char field type. Fields of this type use a {@code char} value.
+ */
+ CHAR("char"),
+
+ /**
+ * A short field type. Fields of this type use a {@code short} value.
+ */
+ SHORT("short"),
+
+ /**
+ * An int field type. Fields of this type use an {@code int} value.
+ */
+ INT("int"),
+
+ /**
+ * A bigint field type. Fields of this type use a {@code java.math.BigInteger} value.
+ */
+ BIGINT("bigint"),
+
+ /**
+ * A long field type. Fields of this type use a {@code long} value.
+ */
+ LONG("long"),
+
+ /**
+ * A float field type. Fields of this type use a {@code float} value.
+ */
+ FLOAT("float"),
+
+ /**
+ * A double field type. Fields of this type use a {@code double} value.
+ */
+ DOUBLE("double"),
+
+ /**
+ * A date field type. Fields of this type use a {@code java.sql.Date} value.
+ */
+ DATE("date", "yyyy-MM-dd"),
+
+ /**
+ * A time field type. Fields of this type use a {@code java.sql.Time} value.
+ */
+ TIME("time", "HH:mm:ss"),
+
+ /**
+ * A timestamp field type. Fields of this type use a {@code java.sql.Timestamp} value.
+ */
+ TIMESTAMP("timestamp", "yyyy-MM-dd HH:mm:ss"),
+
+ /**
+ * <p>
+ * A record field type. Fields of this type use a {@code org.apache.nifi.serialization.record.Record} value. A Record DataType should be
+ * created by providing the {@link RecordSchema} for the record:
+ * </p>
+ *
+ * <code>
+ * final DataType recordType = RecordFieldType.RECORD.getRecordDataType(recordSchema);
+ * </code>
+ *
+ * <p>
+ * A field of type RECORD should always have a {@link RecordDataType}, so the following idiom is acceptable for use:
+ * </p>
+ *
+ * <code>
+ * <pre>
+ * final DataType dataType = ...;
+ * if (dataType.getFieldType() == RecordFieldType.RECORD) {
+ * final RecordDataType recordDataType = (RecordDataType) dataType;
+ * final RecordSchema childSchema = recordDataType.getChildSchema();
+ * ...
+ * }
+ * </pre>
+ * </code>
+ */
+ RECORD("record", null, new RecordDataType(null)),
+
+ /**
+ * <p>
+ * A choice field type. A field of type choice can be one of any number of different types, which are defined by the DataType that is used.
+ * For example, if a field should allow either a Long or an Integer, this can be accomplished by using:
+ * </p>
+ *
+ * <code>
+ * final DataType choiceType = RecordFieldType.CHOICE.getChoiceDataType( RecordFieldType.INT.getDataType(), RecordFieldType.LONG.getDataType() );
+ * </code>
+ *
+ * <p>
+ * A field of type CHOICE should always have a {@link ChoiceDataType}, so the following idiom is acceptable for use:
+ * </p>
+ *
+ * <code>
+ * <pre>
+ * final DataType dataType = ...;
+ * if (dataType.getFieldType() == RecordFieldType.CHOICE) {
+ * final ChoiceDataType choiceDataType = (ChoiceDataType) dataType;
+ * final List<DataType> allowableTypes = choiceDataType.getPossibleSubTypes();
+ * ...
+ * }
+ * </pre>
+ * </code>
+ */
+ CHOICE("choice", null, new ChoiceDataType(Collections.emptyList())),
+
+ /**
+ * <p>
+ * An array field type. Fields of this type use a {@code Object[]} value. Note that we are explicitly indicating that
+ * Object[] should be used here and not primitive array types. For instance, setting a value of {@code int[]} is not allowed. The DataType for
+ * this field should be created using the {@link #getArrayDataType(DataType)} method:
+ * </p>
+ *
+ * <code>
+ * final DataType arrayType = RecordFieldType.ARRAY.getArrayDataType( RecordFieldType.INT.getDataType() );
+ * </code>
+ *
+ * <p>
+ * A field of type ARRAY should always have an {@link ArrayDataType}, so the following idiom is acceptable for use:
+ * </p>
+ *
+ * <code>
+ * <pre>
+ * final DataType dataType = ...;
+ * if (dataType.getFieldType() == RecordFieldType.ARRAY) {
+ * final ArrayDataType arrayDataType = (ArrayDataType) dataType;
+ * final DataType elementType = arrayDataType.getElementType();
+ * ...
+ * }
+ * </pre>
+ * </code>
+ */
+ ARRAY("array", null, new ArrayDataType(null)),
+
+ /**
+ * <p>
+ * A record field type. Fields of this type use a {@code Map<String, Object>} value. A Map DataType should be
+ * created by providing the {@link DataType} for the values:
+ * </p>
+ *
+ * <code>
+ * final DataType recordType = RecordFieldType.MAP.getRecordDataType( RecordFieldType.STRING.getDataType() );
+ * </code>
+ *
+ * <p>
+ * A field of type MAP should always have a {@link MapDataType}, so the following idiom is acceptable for use:
+ * </p>
+ *
+ * <code>
+ * <pre>
+ * final DataType dataType = ...;
+ * if (dataType.getFieldType() == RecordFieldType.MAP) {
+ * final MapDataType mapDataType = (MapDataType) dataType;
+ * final DataType valueType = mapDataType.getValueType();
+ * ...
+ * }
+ * </pre>
+ * </code>
+ */
+ MAP("map", null, new MapDataType(null));
+
+
+ private static final Map<String, RecordFieldType> SIMPLE_NAME_MAP = new HashMap<String, RecordFieldType>();
+
+ static {
+ for (RecordFieldType value : values()) {
+ SIMPLE_NAME_MAP.put(value.simpleName, value);
+ }
+ }
+
+ private final String simpleName;
+ private final String defaultFormat;
+ private final DataType defaultDataType;
+
+ private RecordFieldType(final String simpleName) {
+ this(simpleName, null);
+ }
+
+ private RecordFieldType(final String simpleName, final String defaultFormat) {
+ this.simpleName = simpleName;
+ this.defaultFormat = defaultFormat;
+ this.defaultDataType = new DataType(this, defaultFormat);
+ }
+
+ private RecordFieldType(final String simpleName, final String defaultFormat, final DataType defaultDataType) {
+ this.simpleName = simpleName;
+ this.defaultFormat = defaultFormat;
+ this.defaultDataType = defaultDataType;
+ }
+
+ public String getDefaultFormat() {
+ return defaultFormat;
+ }
+
+ /**
+ * @return the DataType with the default format
+ */
+ public DataType getDataType() {
+ return defaultDataType;
+ }
+
+ public DataType getDataType(final String format) {
+ return new DataType(this, format);
+ }
+
+ /**
+ * Returns a Data Type that represents a "RECORD" or "ARRAY" type with the given schema.
+ *
+ * @param childSchema the Schema for the Record or Array
+ * @return a DataType that represents a Record or Array with the given schema, or <code>null</code> if this RecordFieldType
+ * is not the RECORD or ARRAY type.
+ */
+ public DataType getRecordDataType(final RecordSchema childSchema) {
+ if (this != RECORD) {
+ return null;
+ }
+
+ return new RecordDataType(childSchema);
+ }
+
+ /**
+ * Returns a Data Type that represents an "ARRAY" type with the given element type.
+ *
+ * @param elementType the type of the arrays in the element
+ * @return a DataType that represents an Array with the given element type, or <code>null</code> if this RecordFieldType
+ * is not the ARRAY type.
+ */
+ public DataType getArrayDataType(final DataType elementType) {
+ if (this != ARRAY) {
+ return null;
+ }
+
+ return new ArrayDataType(elementType);
+ }
+
+
+ /**
+ * Returns a Data Type that represents a "CHOICE" of multiple possible types. This method is
+ * only applicable for a RecordFieldType of {@link #CHOICE}.
+ *
+ * @param possibleChildTypes the possible types that are allowable
+ * @return a DataType that represents a "CHOICE" of multiple possible types, or <code>null</code> if this RecordFieldType
+ * is not the CHOICE type.
+ */
+ public DataType getChoiceDataType(final List<DataType> possibleChildTypes) {
+ if (this != CHOICE) {
+ return null;
+ }
+
+ return new ChoiceDataType(possibleChildTypes);
+ }
+
+ /**
+ * Returns a Data Type that represents a "CHOICE" of multiple possible types. This method is
+ * only applicable for a RecordFieldType of {@link #CHOICE}.
+ *
+ * @param possibleChildTypes the possible types that are allowable
+ * @return a DataType that represents a "CHOICE" of multiple possible types, or <code>null</code> if this RecordFieldType
+ * is not the CHOICE type.
+ */
+ public DataType getChoiceDataType(final DataType... possibleChildTypes) {
+ if (this != CHOICE) {
+ return null;
+ }
+
+ final List<DataType> list = new ArrayList<>(possibleChildTypes.length);
+ for (final DataType type : possibleChildTypes) {
+ list.add(type);
+ }
+
+ return new ChoiceDataType(list);
+ }
+
+ /**
+ * Returns a Data Type that represents a "MAP" type with the given value type.
+ *
+ * @param valueDataType the type of the values in the map
+ * @return a DataType that represents a Map with the given value type, or <code>null</code> if this RecordFieldType
+ * is not the MAP type.
+ */
+ public DataType getMapDataType(final DataType valueDataType) {
+ if (this != MAP) {
+ return null;
+ }
+
+ return new MapDataType(valueDataType);
+ }
+
+
+ public static RecordFieldType of(final String typeString) {
+ return SIMPLE_NAME_MAP.get(typeString);
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordSchema.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordSchema.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordSchema.java
new file mode 100644
index 0000000..367f2b0
--- /dev/null
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordSchema.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.serialization.record;
+
+import java.util.List;
+import java.util.Optional;
+
+public interface RecordSchema {
+ /**
+ * @return the list of fields that are present in the schema
+ */
+ List<RecordField> getFields();
+
+ /**
+ * @return the number of fields in the schema
+ */
+ int getFieldCount();
+
+ /**
+ * @param index the 0-based index of which field to return
+ * @return the index'th field
+ *
+ * @throws IndexOutOfBoundsException if the index is < 0 or >= the number of fields (determined by {@link #getFieldCount()}).
+ */
+ RecordField getField(int index);
+
+ /**
+ * @return the data types of the fields
+ */
+ List<DataType> getDataTypes();
+
+ /**
+ * @return the names of the fields
+ */
+ List<String> getFieldNames();
+
+ /**
+ * @param fieldName the name of the field whose type is desired
+ * @return the RecordFieldType associated with the field that has the given name, or
+ * <code>null</code> if the schema does not contain a field with the given name
+ */
+ Optional<DataType> getDataType(String fieldName);
+
+ /**
+ * @return the textual representation of the schema, if one is available
+ */
+ Optional<String> getSchemaText();
+
+ /**
+ * @return the format of the schema text, if schema text is present
+ */
+ Optional<String> getSchemaFormat();
+
+ /**
+ * @param fieldName the name of the field
+ * @return an Optional RecordField for the field with the given name
+ */
+ Optional<RecordField> getField(String fieldName);
+
+ /**
+ * @return the SchemaIdentifier, which provides various attributes for identifying a schema
+ */
+ SchemaIdentifier getIdentifier();
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordSet.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordSet.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordSet.java
new file mode 100644
index 0000000..9e67346
--- /dev/null
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordSet.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.serialization.record;
+
+import java.io.IOException;
+
+public interface RecordSet {
+
+ /**
+ * @return the {@link RecordSchema} that applies to the records in this RecordSet
+ */
+ RecordSchema getSchema() throws IOException;
+
+ /**
+ * @return the next {@link Record} in the set or <code>null</code> if there are no more records
+ */
+ Record next() throws IOException;
+
+ /**
+ * Returns a new Record Set that will return no more than {@code maxRecords} records from this
+ * RecordSet. Any Records that are pulled from this newly created RecordSet will also advance
+ * the cursor in this Record Set and vice versa.
+ *
+ * @param maxRecords the maximum number of records to return from the new RecordSet
+ * @return a view of this RecordSet that limits the number of records returned
+ */
+ default RecordSet limit(final int maxRecords) {
+ if (maxRecords < 0) {
+ throw new IllegalArgumentException("Cannot limit number of records to " + maxRecords + ". Limit must be a non-negative integer");
+ }
+
+ final RecordSet original = this;
+ return new RecordSet() {
+ private int count = 0;
+
+ @Override
+ public RecordSchema getSchema() throws IOException {
+ return original.getSchema();
+ }
+
+ @Override
+ public Record next() throws IOException {
+ if (count >= maxRecords) {
+ return null;
+ }
+
+ final Record record = original.next();
+ if (record != null) {
+ count++;
+ }
+
+ return record;
+ }
+ };
+ }
+
+ public static RecordSet of(final RecordSchema schema, final Record... records) {
+ return new RecordSet() {
+ private int index = 0;
+
+ @Override
+ public RecordSchema getSchema() {
+ return schema;
+ }
+
+ @Override
+ public Record next() {
+ if (index >= records.length) {
+ return null;
+ }
+
+ return records[index++];
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
new file mode 100644
index 0000000..b6daab7
--- /dev/null
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.serialization.record;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.math.BigInteger;
+import java.sql.Array;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ResultSetRecordSet implements RecordSet, Closeable {
+ private static final Logger logger = LoggerFactory.getLogger(ResultSetRecordSet.class);
+ private final ResultSet rs;
+ private final RecordSchema schema;
+ private final Set<String> rsColumnNames;
+ private boolean moreRows;
+
+ public ResultSetRecordSet(final ResultSet rs) throws SQLException {
+ this.rs = rs;
+ moreRows = rs.next();
+ this.schema = createSchema(rs);
+
+ rsColumnNames = new HashSet<>();
+ final ResultSetMetaData metadata = rs.getMetaData();
+ for (int i = 0; i < metadata.getColumnCount(); i++) {
+ rsColumnNames.add(metadata.getColumnLabel(i + 1));
+ }
+ }
+
+ @Override
+ public RecordSchema getSchema() {
+ return schema;
+ }
+
+ @Override
+ public Record next() throws IOException {
+ try {
+ if (moreRows) {
+ final Record record = createRecord(rs);
+ moreRows = rs.next();
+ return record;
+ } else {
+ return null;
+ }
+ } catch (final SQLException e) {
+ throw new IOException("Could not obtain next record from ResultSet", e);
+ }
+ }
+
+ @Override
+ public void close() {
+ try {
+ rs.close();
+ } catch (final SQLException e) {
+ logger.error("Failed to close ResultSet", e);
+ }
+ }
+
+ private Record createRecord(final ResultSet rs) throws SQLException {
+ final Map<String, Object> values = new HashMap<>(schema.getFieldCount());
+
+ for (final RecordField field : schema.getFields()) {
+ final String fieldName = field.getFieldName();
+
+ final Object value;
+ if (rsColumnNames.contains(fieldName)) {
+ value = normalizeValue(rs.getObject(fieldName));
+ } else {
+ value = null;
+ }
+
+ values.put(fieldName, value);
+ }
+
+ return new MapRecord(schema, values);
+ }
+
+ @SuppressWarnings("rawtypes")
+ private Object normalizeValue(final Object value) {
+ if (value == null) {
+ return null;
+ }
+
+ if (value instanceof List) {
+ return ((List) value).toArray();
+ }
+
+ return value;
+ }
+
+ private static RecordSchema createSchema(final ResultSet rs) throws SQLException {
+ final ResultSetMetaData metadata = rs.getMetaData();
+ final int numCols = metadata.getColumnCount();
+ final List<RecordField> fields = new ArrayList<>(numCols);
+
+ for (int i = 0; i < numCols; i++) {
+ final int column = i + 1;
+ final int sqlType = metadata.getColumnType(column);
+
+ final DataType dataType = getDataType(sqlType, rs, column);
+ final String fieldName = metadata.getColumnLabel(column);
+ final RecordField field = new RecordField(fieldName, dataType);
+ fields.add(field);
+ }
+
+ return new SimpleRecordSchema(fields);
+ }
+
+ private static DataType getDataType(final int sqlType, final ResultSet rs, final int columnIndex) throws SQLException {
+ switch (sqlType) {
+ case Types.ARRAY:
+ // The JDBC API does not allow us to know what the base type of an array is through the metadata.
+ // As a result, we have to obtain the actual Array for this record. Once we have this, we can determine
+ // the base type. However, if the base type is, itself, an array, we will simply return a base type of
+ // String because otherwise, we need the ResultSet for the array itself, and many JDBC Drivers do not
+ // support calling Array.getResultSet() and will throw an Exception if that is not supported.
+ if (rs.isAfterLast()) {
+ return RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.STRING.getDataType());
+ }
+
+ final Array array = rs.getArray(columnIndex);
+ if (array == null) {
+ return RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.STRING.getDataType());
+ }
+
+ final DataType baseType = getArrayBaseType(array);
+ return RecordFieldType.ARRAY.getArrayDataType(baseType);
+ case Types.BINARY:
+ case Types.LONGVARBINARY:
+ case Types.VARBINARY:
+ return RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType());
+ case Types.OTHER:
+ // If we have no records to inspect, we can't really know its schema so we simply use the default data type.
+ if (rs.isAfterLast()) {
+ return RecordFieldType.RECORD.getDataType();
+ }
+
+ final Object obj = rs.getObject(columnIndex);
+ if (obj == null || !(obj instanceof Record)) {
+ return RecordFieldType.RECORD.getDataType();
+ }
+
+ final Record record = (Record) obj;
+ final RecordSchema recordSchema = record.getSchema();
+ return RecordFieldType.RECORD.getRecordDataType(recordSchema);
+ default:
+ return getFieldType(sqlType).getDataType();
+ }
+ }
+
+ private static DataType getArrayBaseType(final Array array) throws SQLException {
+ final Object arrayValue = array.getArray();
+ if (arrayValue == null) {
+ return RecordFieldType.STRING.getDataType();
+ }
+
+ if (arrayValue instanceof byte[]) {
+ return RecordFieldType.BYTE.getDataType();
+ }
+ if (arrayValue instanceof int[]) {
+ return RecordFieldType.INT.getDataType();
+ }
+ if (arrayValue instanceof long[]) {
+ return RecordFieldType.LONG.getDataType();
+ }
+ if (arrayValue instanceof boolean[]) {
+ return RecordFieldType.BOOLEAN.getDataType();
+ }
+ if (arrayValue instanceof short[]) {
+ return RecordFieldType.SHORT.getDataType();
+ }
+ if (arrayValue instanceof byte[]) {
+ return RecordFieldType.BYTE.getDataType();
+ }
+ if (arrayValue instanceof float[]) {
+ return RecordFieldType.FLOAT.getDataType();
+ }
+ if (arrayValue instanceof double[]) {
+ return RecordFieldType.DOUBLE.getDataType();
+ }
+ if (arrayValue instanceof char[]) {
+ return RecordFieldType.CHAR.getDataType();
+ }
+ if (arrayValue instanceof Object[]) {
+ final Object[] values = (Object[]) arrayValue;
+ if (values.length == 0) {
+ return RecordFieldType.STRING.getDataType();
+ }
+
+ Object valueToLookAt = null;
+ for (int i = 0; i < values.length; i++) {
+ valueToLookAt = values[i];
+ if (valueToLookAt != null) {
+ break;
+ }
+ }
+ if (valueToLookAt == null) {
+ return RecordFieldType.STRING.getDataType();
+ }
+
+ if (valueToLookAt instanceof String) {
+ return RecordFieldType.STRING.getDataType();
+ }
+ if (valueToLookAt instanceof Long) {
+ return RecordFieldType.LONG.getDataType();
+ }
+ if (valueToLookAt instanceof Integer) {
+ return RecordFieldType.INT.getDataType();
+ }
+ if (valueToLookAt instanceof Short) {
+ return RecordFieldType.SHORT.getDataType();
+ }
+ if (valueToLookAt instanceof Byte) {
+ return RecordFieldType.BYTE.getDataType();
+ }
+ if (valueToLookAt instanceof Float) {
+ return RecordFieldType.FLOAT.getDataType();
+ }
+ if (valueToLookAt instanceof Double) {
+ return RecordFieldType.DOUBLE.getDataType();
+ }
+ if (valueToLookAt instanceof Boolean) {
+ return RecordFieldType.BOOLEAN.getDataType();
+ }
+ if (valueToLookAt instanceof Character) {
+ return RecordFieldType.CHAR.getDataType();
+ }
+ if (valueToLookAt instanceof BigInteger) {
+ return RecordFieldType.BIGINT.getDataType();
+ }
+ if (valueToLookAt instanceof Integer) {
+ return RecordFieldType.INT.getDataType();
+ }
+ if (valueToLookAt instanceof java.sql.Time) {
+ return RecordFieldType.TIME.getDataType();
+ }
+ if (valueToLookAt instanceof java.sql.Date) {
+ return RecordFieldType.DATE.getDataType();
+ }
+ if (valueToLookAt instanceof java.sql.Timestamp) {
+ return RecordFieldType.TIMESTAMP.getDataType();
+ }
+ if (valueToLookAt instanceof Record) {
+ return RecordFieldType.RECORD.getDataType();
+ }
+ }
+
+ return RecordFieldType.STRING.getDataType();
+ }
+
+
+ private static RecordFieldType getFieldType(final int sqlType) {
+ switch (sqlType) {
+ case Types.BIGINT:
+ case Types.ROWID:
+ return RecordFieldType.LONG;
+ case Types.BIT:
+ case Types.BOOLEAN:
+ return RecordFieldType.BOOLEAN;
+ case Types.CHAR:
+ return RecordFieldType.CHAR;
+ case Types.DATE:
+ return RecordFieldType.DATE;
+ case Types.DECIMAL:
+ case Types.DOUBLE:
+ case Types.NUMERIC:
+ case Types.REAL:
+ return RecordFieldType.DOUBLE;
+ case Types.FLOAT:
+ return RecordFieldType.FLOAT;
+ case Types.INTEGER:
+ return RecordFieldType.INT;
+ case Types.SMALLINT:
+ return RecordFieldType.SHORT;
+ case Types.TINYINT:
+ return RecordFieldType.BYTE;
+ case Types.LONGNVARCHAR:
+ case Types.LONGVARCHAR:
+ case Types.NCHAR:
+ case Types.NULL:
+ case Types.NVARCHAR:
+ case Types.VARCHAR:
+ return RecordFieldType.STRING;
+ case Types.OTHER:
+ case Types.JAVA_OBJECT:
+ return RecordFieldType.RECORD;
+ case Types.TIME:
+ case Types.TIME_WITH_TIMEZONE:
+ return RecordFieldType.TIME;
+ case Types.TIMESTAMP:
+ case Types.TIMESTAMP_WITH_TIMEZONE:
+ return RecordFieldType.TIMESTAMP;
+ }
+
+ return RecordFieldType.STRING;
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/SchemaIdentifier.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/SchemaIdentifier.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/SchemaIdentifier.java
new file mode 100644
index 0000000..d7f5664
--- /dev/null
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/SchemaIdentifier.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.serialization.record;
+
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+
+public interface SchemaIdentifier {
+
+ /**
+ * @return the name of the schema, if one has been defined.
+ */
+ Optional<String> getName();
+
+ /**
+ * @return the identifier of the schema, if one has been defined.
+ */
+ OptionalLong getIdentifier();
+
+ /**
+ * @return the version of the schema, if one has been defined.
+ */
+ OptionalInt getVersion();
+
+
+ public static SchemaIdentifier EMPTY = new StandardSchemaIdentifier(null, null, null);
+
+ public static SchemaIdentifier ofName(final String name) {
+ return new StandardSchemaIdentifier(name, null, null);
+ }
+
+ public static SchemaIdentifier of(final String name, final long identifier, final int version) {
+ return new StandardSchemaIdentifier(name, identifier, version);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/StandardSchemaIdentifier.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/StandardSchemaIdentifier.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/StandardSchemaIdentifier.java
new file mode 100644
index 0000000..86db284
--- /dev/null
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/StandardSchemaIdentifier.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.serialization.record;
+
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+
+public class StandardSchemaIdentifier implements SchemaIdentifier {
+ private final Optional<String> name;
+ private final OptionalLong identifier;
+ private final OptionalInt version;
+
+ StandardSchemaIdentifier(final String name, final Long identifier, final Integer version) {
+ this.name = Optional.ofNullable(name);
+ this.identifier = identifier == null ? OptionalLong.empty() : OptionalLong.of(identifier);;
+ this.version = version == null ? OptionalInt.empty() : OptionalInt.of(version);;
+ }
+
+ @Override
+ public Optional<String> getName() {
+ return name;
+ }
+
+ @Override
+ public OptionalLong getIdentifier() {
+ return identifier;
+ }
+
+ @Override
+ public OptionalInt getVersion() {
+ return version;
+ }
+
+ @Override
+ public int hashCode() {
+ return 31 + 41 * getName().hashCode() + 41 * getIdentifier().hashCode() + 41 * getVersion().hashCode();
+ }
+
+ @Override
+ public boolean equals(final Object obj) {
+ if (obj == this) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (!(obj instanceof SchemaIdentifier)) {
+ return false;
+ }
+ final SchemaIdentifier other = (SchemaIdentifier) obj;
+ return getName().equals(other.getName()) && getIdentifier().equals(other.getIdentifier()) && getVersion().equals(other.getVersion());
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/TypeMismatchException.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/TypeMismatchException.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/TypeMismatchException.java
new file mode 100644
index 0000000..af5f909
--- /dev/null
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/TypeMismatchException.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.serialization.record;
+
+public class TypeMismatchException extends RuntimeException {
+ public TypeMismatchException(String message) {
+ super(message);
+ }
+
+ public TypeMismatchException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/type/ArrayDataType.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/type/ArrayDataType.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/type/ArrayDataType.java
new file mode 100644
index 0000000..46dc447
--- /dev/null
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/type/ArrayDataType.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.serialization.record.type;
+
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.RecordFieldType;
+
+import java.util.Objects;
+
+public class ArrayDataType extends DataType {
+ private final DataType elementType;
+
+ public ArrayDataType(final DataType elementType) {
+ super(RecordFieldType.ARRAY, null);
+ this.elementType = elementType;
+ }
+
+ public DataType getElementType() {
+ return elementType;
+ }
+
+ @Override
+ public RecordFieldType getFieldType() {
+ return RecordFieldType.ARRAY;
+ }
+
+ @Override
+ public int hashCode() {
+ return 31 + 41 * getFieldType().hashCode() + 41 * (elementType == null ? 0 : elementType.hashCode());
+ }
+
+ @Override
+ public boolean equals(final Object obj) {
+ if (obj == this) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (!(obj instanceof ArrayDataType)) {
+ return false;
+ }
+
+ final ArrayDataType other = (ArrayDataType) obj;
+ return getFieldType().equals(other.getFieldType()) && Objects.equals(elementType, other.elementType);
+ }
+
+ @Override
+ public String toString() {
+ return "ARRAY[" + elementType + "]";
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/type/ChoiceDataType.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/type/ChoiceDataType.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/type/ChoiceDataType.java
new file mode 100644
index 0000000..9fcdf73
--- /dev/null
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/type/ChoiceDataType.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.serialization.record.type;
+
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.RecordFieldType;
+
+import java.util.List;
+import java.util.Objects;
+
+public class ChoiceDataType extends DataType {
+ private final List<DataType> possibleSubTypes;
+
+ public ChoiceDataType(final List<DataType> possibleSubTypes) {
+ super(RecordFieldType.CHOICE, null);
+ this.possibleSubTypes = Objects.requireNonNull(possibleSubTypes);
+ }
+
+ public List<DataType> getPossibleSubTypes() {
+ return possibleSubTypes;
+ }
+
+ @Override
+ public RecordFieldType getFieldType() {
+ return RecordFieldType.CHOICE;
+ }
+
+ @Override
+ public int hashCode() {
+ return 31 + 41 * getFieldType().hashCode() + 41 * (possibleSubTypes == null ? 0 : possibleSubTypes.hashCode());
+ }
+
+ @Override
+ public boolean equals(final Object obj) {
+ if (obj == this) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (!(obj instanceof ChoiceDataType)) {
+ return false;
+ }
+
+ final ChoiceDataType other = (ChoiceDataType) obj;
+ return getFieldType().equals(other.getFieldType()) && Objects.equals(possibleSubTypes, other.possibleSubTypes);
+ }
+
+ @Override
+ public String toString() {
+ return "CHOICE" + possibleSubTypes;
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/type/MapDataType.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/type/MapDataType.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/type/MapDataType.java
new file mode 100644
index 0000000..5ed1c39
--- /dev/null
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/type/MapDataType.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.serialization.record.type;
+
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.RecordFieldType;
+
+import java.util.Objects;
+
+public class MapDataType extends DataType {
+ private final DataType valueType;
+
+ public MapDataType(final DataType elementType) {
+ super(RecordFieldType.MAP, null);
+ this.valueType = elementType;
+ }
+
+ public DataType getValueType() {
+ return valueType;
+ }
+
+ @Override
+ public RecordFieldType getFieldType() {
+ return RecordFieldType.MAP;
+ }
+
+ @Override
+ public int hashCode() {
+ return 31 + 41 * getFieldType().hashCode() + 41 * (valueType == null ? 0 : valueType.hashCode());
+ }
+
+ @Override
+ public boolean equals(final Object obj) {
+ if (obj == this) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (!(obj instanceof MapDataType)) {
+ return false;
+ }
+
+ final MapDataType other = (MapDataType) obj;
+ return getValueType().equals(other.getValueType()) && Objects.equals(valueType, other.valueType);
+ }
+
+ @Override
+ public String toString() {
+ return "MAP[" + valueType + "]";
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/type/RecordDataType.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/type/RecordDataType.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/type/RecordDataType.java
new file mode 100644
index 0000000..fc6993f
--- /dev/null
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/type/RecordDataType.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.serialization.record.type;
+
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import java.util.Objects;
+
+public class RecordDataType extends DataType {
+ private final RecordSchema childSchema;
+
+ public RecordDataType(final RecordSchema childSchema) {
+ super(RecordFieldType.RECORD, null);
+ this.childSchema = childSchema;
+ }
+
+ @Override
+ public RecordFieldType getFieldType() {
+ return RecordFieldType.RECORD;
+ }
+
+ public RecordSchema getChildSchema() {
+ return childSchema;
+ }
+
+ @Override
+ public int hashCode() {
+ return 31 + 41 * getFieldType().hashCode() + 41 * (childSchema == null ? 0 : childSchema.hashCode());
+ }
+
+ @Override
+ public boolean equals(final Object obj) {
+ if (obj == this) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (!(obj instanceof RecordDataType)) {
+ return false;
+ }
+
+ final RecordDataType other = (RecordDataType) obj;
+ return getFieldType().equals(other.getFieldType()) && Objects.equals(childSchema, other.childSchema);
+ }
+
+ @Override
+ public String toString() {
+ return RecordFieldType.RECORD.toString();
+ }
+}