You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2017/05/01 20:11:56 UTC
[03/17] nifi git commit: NIFI-3724 - Initial commit of Parquet bundle
with PutParquet and FetchParquet - Creating nifi-records-utils to share
utility code from record services - Refactoring Parquet tests to use
MockRecordParser and MockRecordWriter - R
http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
deleted file mode 100644
index b6daab7..0000000
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
+++ /dev/null
@@ -1,325 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.serialization.record;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.math.BigInteger;
-import java.sql.Array;
-import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
-import java.sql.SQLException;
-import java.sql.Types;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.nifi.serialization.SimpleRecordSchema;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class ResultSetRecordSet implements RecordSet, Closeable {
- private static final Logger logger = LoggerFactory.getLogger(ResultSetRecordSet.class);
- private final ResultSet rs;
- private final RecordSchema schema;
- private final Set<String> rsColumnNames;
- private boolean moreRows;
-
- public ResultSetRecordSet(final ResultSet rs) throws SQLException {
- this.rs = rs;
- moreRows = rs.next();
- this.schema = createSchema(rs);
-
- rsColumnNames = new HashSet<>();
- final ResultSetMetaData metadata = rs.getMetaData();
- for (int i = 0; i < metadata.getColumnCount(); i++) {
- rsColumnNames.add(metadata.getColumnLabel(i + 1));
- }
- }
-
- @Override
- public RecordSchema getSchema() {
- return schema;
- }
-
- @Override
- public Record next() throws IOException {
- try {
- if (moreRows) {
- final Record record = createRecord(rs);
- moreRows = rs.next();
- return record;
- } else {
- return null;
- }
- } catch (final SQLException e) {
- throw new IOException("Could not obtain next record from ResultSet", e);
- }
- }
-
- @Override
- public void close() {
- try {
- rs.close();
- } catch (final SQLException e) {
- logger.error("Failed to close ResultSet", e);
- }
- }
-
- private Record createRecord(final ResultSet rs) throws SQLException {
- final Map<String, Object> values = new HashMap<>(schema.getFieldCount());
-
- for (final RecordField field : schema.getFields()) {
- final String fieldName = field.getFieldName();
-
- final Object value;
- if (rsColumnNames.contains(fieldName)) {
- value = normalizeValue(rs.getObject(fieldName));
- } else {
- value = null;
- }
-
- values.put(fieldName, value);
- }
-
- return new MapRecord(schema, values);
- }
-
- @SuppressWarnings("rawtypes")
- private Object normalizeValue(final Object value) {
- if (value == null) {
- return null;
- }
-
- if (value instanceof List) {
- return ((List) value).toArray();
- }
-
- return value;
- }
-
- private static RecordSchema createSchema(final ResultSet rs) throws SQLException {
- final ResultSetMetaData metadata = rs.getMetaData();
- final int numCols = metadata.getColumnCount();
- final List<RecordField> fields = new ArrayList<>(numCols);
-
- for (int i = 0; i < numCols; i++) {
- final int column = i + 1;
- final int sqlType = metadata.getColumnType(column);
-
- final DataType dataType = getDataType(sqlType, rs, column);
- final String fieldName = metadata.getColumnLabel(column);
- final RecordField field = new RecordField(fieldName, dataType);
- fields.add(field);
- }
-
- return new SimpleRecordSchema(fields);
- }
-
- private static DataType getDataType(final int sqlType, final ResultSet rs, final int columnIndex) throws SQLException {
- switch (sqlType) {
- case Types.ARRAY:
- // The JDBC API does not allow us to know what the base type of an array is through the metadata.
- // As a result, we have to obtain the actual Array for this record. Once we have this, we can determine
- // the base type. However, if the base type is, itself, an array, we will simply return a base type of
- // String because otherwise, we need the ResultSet for the array itself, and many JDBC Drivers do not
- // support calling Array.getResultSet() and will throw an Exception if that is not supported.
- if (rs.isAfterLast()) {
- return RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.STRING.getDataType());
- }
-
- final Array array = rs.getArray(columnIndex);
- if (array == null) {
- return RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.STRING.getDataType());
- }
-
- final DataType baseType = getArrayBaseType(array);
- return RecordFieldType.ARRAY.getArrayDataType(baseType);
- case Types.BINARY:
- case Types.LONGVARBINARY:
- case Types.VARBINARY:
- return RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType());
- case Types.OTHER:
- // If we have no records to inspect, we can't really know its schema so we simply use the default data type.
- if (rs.isAfterLast()) {
- return RecordFieldType.RECORD.getDataType();
- }
-
- final Object obj = rs.getObject(columnIndex);
- if (obj == null || !(obj instanceof Record)) {
- return RecordFieldType.RECORD.getDataType();
- }
-
- final Record record = (Record) obj;
- final RecordSchema recordSchema = record.getSchema();
- return RecordFieldType.RECORD.getRecordDataType(recordSchema);
- default:
- return getFieldType(sqlType).getDataType();
- }
- }
-
- private static DataType getArrayBaseType(final Array array) throws SQLException {
- final Object arrayValue = array.getArray();
- if (arrayValue == null) {
- return RecordFieldType.STRING.getDataType();
- }
-
- if (arrayValue instanceof byte[]) {
- return RecordFieldType.BYTE.getDataType();
- }
- if (arrayValue instanceof int[]) {
- return RecordFieldType.INT.getDataType();
- }
- if (arrayValue instanceof long[]) {
- return RecordFieldType.LONG.getDataType();
- }
- if (arrayValue instanceof boolean[]) {
- return RecordFieldType.BOOLEAN.getDataType();
- }
- if (arrayValue instanceof short[]) {
- return RecordFieldType.SHORT.getDataType();
- }
- if (arrayValue instanceof byte[]) {
- return RecordFieldType.BYTE.getDataType();
- }
- if (arrayValue instanceof float[]) {
- return RecordFieldType.FLOAT.getDataType();
- }
- if (arrayValue instanceof double[]) {
- return RecordFieldType.DOUBLE.getDataType();
- }
- if (arrayValue instanceof char[]) {
- return RecordFieldType.CHAR.getDataType();
- }
- if (arrayValue instanceof Object[]) {
- final Object[] values = (Object[]) arrayValue;
- if (values.length == 0) {
- return RecordFieldType.STRING.getDataType();
- }
-
- Object valueToLookAt = null;
- for (int i = 0; i < values.length; i++) {
- valueToLookAt = values[i];
- if (valueToLookAt != null) {
- break;
- }
- }
- if (valueToLookAt == null) {
- return RecordFieldType.STRING.getDataType();
- }
-
- if (valueToLookAt instanceof String) {
- return RecordFieldType.STRING.getDataType();
- }
- if (valueToLookAt instanceof Long) {
- return RecordFieldType.LONG.getDataType();
- }
- if (valueToLookAt instanceof Integer) {
- return RecordFieldType.INT.getDataType();
- }
- if (valueToLookAt instanceof Short) {
- return RecordFieldType.SHORT.getDataType();
- }
- if (valueToLookAt instanceof Byte) {
- return RecordFieldType.BYTE.getDataType();
- }
- if (valueToLookAt instanceof Float) {
- return RecordFieldType.FLOAT.getDataType();
- }
- if (valueToLookAt instanceof Double) {
- return RecordFieldType.DOUBLE.getDataType();
- }
- if (valueToLookAt instanceof Boolean) {
- return RecordFieldType.BOOLEAN.getDataType();
- }
- if (valueToLookAt instanceof Character) {
- return RecordFieldType.CHAR.getDataType();
- }
- if (valueToLookAt instanceof BigInteger) {
- return RecordFieldType.BIGINT.getDataType();
- }
- if (valueToLookAt instanceof Integer) {
- return RecordFieldType.INT.getDataType();
- }
- if (valueToLookAt instanceof java.sql.Time) {
- return RecordFieldType.TIME.getDataType();
- }
- if (valueToLookAt instanceof java.sql.Date) {
- return RecordFieldType.DATE.getDataType();
- }
- if (valueToLookAt instanceof java.sql.Timestamp) {
- return RecordFieldType.TIMESTAMP.getDataType();
- }
- if (valueToLookAt instanceof Record) {
- return RecordFieldType.RECORD.getDataType();
- }
- }
-
- return RecordFieldType.STRING.getDataType();
- }
-
-
- private static RecordFieldType getFieldType(final int sqlType) {
- switch (sqlType) {
- case Types.BIGINT:
- case Types.ROWID:
- return RecordFieldType.LONG;
- case Types.BIT:
- case Types.BOOLEAN:
- return RecordFieldType.BOOLEAN;
- case Types.CHAR:
- return RecordFieldType.CHAR;
- case Types.DATE:
- return RecordFieldType.DATE;
- case Types.DECIMAL:
- case Types.DOUBLE:
- case Types.NUMERIC:
- case Types.REAL:
- return RecordFieldType.DOUBLE;
- case Types.FLOAT:
- return RecordFieldType.FLOAT;
- case Types.INTEGER:
- return RecordFieldType.INT;
- case Types.SMALLINT:
- return RecordFieldType.SHORT;
- case Types.TINYINT:
- return RecordFieldType.BYTE;
- case Types.LONGNVARCHAR:
- case Types.LONGVARCHAR:
- case Types.NCHAR:
- case Types.NULL:
- case Types.NVARCHAR:
- case Types.VARCHAR:
- return RecordFieldType.STRING;
- case Types.OTHER:
- case Types.JAVA_OBJECT:
- return RecordFieldType.RECORD;
- case Types.TIME:
- case Types.TIME_WITH_TIMEZONE:
- return RecordFieldType.TIME;
- case Types.TIMESTAMP:
- case Types.TIMESTAMP_WITH_TIMEZONE:
- return RecordFieldType.TIMESTAMP;
- }
-
- return RecordFieldType.STRING;
- }
-}
http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/SchemaIdentifier.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/SchemaIdentifier.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/SchemaIdentifier.java
deleted file mode 100644
index d7f5664..0000000
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/SchemaIdentifier.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.serialization.record;
-
-import java.util.Optional;
-import java.util.OptionalInt;
-import java.util.OptionalLong;
-
-public interface SchemaIdentifier {
-
- /**
- * @return the name of the schema, if one has been defined.
- */
- Optional<String> getName();
-
- /**
- * @return the identifier of the schema, if one has been defined.
- */
- OptionalLong getIdentifier();
-
- /**
- * @return the version of the schema, if one has been defined.
- */
- OptionalInt getVersion();
-
-
- public static SchemaIdentifier EMPTY = new StandardSchemaIdentifier(null, null, null);
-
- public static SchemaIdentifier ofName(final String name) {
- return new StandardSchemaIdentifier(name, null, null);
- }
-
- public static SchemaIdentifier of(final String name, final long identifier, final int version) {
- return new StandardSchemaIdentifier(name, identifier, version);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/StandardSchemaIdentifier.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/StandardSchemaIdentifier.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/StandardSchemaIdentifier.java
deleted file mode 100644
index 86db284..0000000
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/StandardSchemaIdentifier.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.serialization.record;
-
-import java.util.Optional;
-import java.util.OptionalInt;
-import java.util.OptionalLong;
-
-public class StandardSchemaIdentifier implements SchemaIdentifier {
- private final Optional<String> name;
- private final OptionalLong identifier;
- private final OptionalInt version;
-
- StandardSchemaIdentifier(final String name, final Long identifier, final Integer version) {
- this.name = Optional.ofNullable(name);
- this.identifier = identifier == null ? OptionalLong.empty() : OptionalLong.of(identifier);;
- this.version = version == null ? OptionalInt.empty() : OptionalInt.of(version);;
- }
-
- @Override
- public Optional<String> getName() {
- return name;
- }
-
- @Override
- public OptionalLong getIdentifier() {
- return identifier;
- }
-
- @Override
- public OptionalInt getVersion() {
- return version;
- }
-
- @Override
- public int hashCode() {
- return 31 + 41 * getName().hashCode() + 41 * getIdentifier().hashCode() + 41 * getVersion().hashCode();
- }
-
- @Override
- public boolean equals(final Object obj) {
- if (obj == this) {
- return true;
- }
- if (obj == null) {
- return false;
- }
- if (!(obj instanceof SchemaIdentifier)) {
- return false;
- }
- final SchemaIdentifier other = (SchemaIdentifier) obj;
- return getName().equals(other.getName()) && getIdentifier().equals(other.getIdentifier()) && getVersion().equals(other.getVersion());
- }
-}
http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/TypeMismatchException.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/TypeMismatchException.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/TypeMismatchException.java
deleted file mode 100644
index af5f909..0000000
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/TypeMismatchException.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.serialization.record;
-
-public class TypeMismatchException extends RuntimeException {
- public TypeMismatchException(String message) {
- super(message);
- }
-
- public TypeMismatchException(String message, Throwable cause) {
- super(message, cause);
- }
-}
http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/ArrayDataType.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/ArrayDataType.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/ArrayDataType.java
deleted file mode 100644
index 0c21239..0000000
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/ArrayDataType.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.serialization.record.type;
-
-import java.util.Objects;
-
-import org.apache.nifi.serialization.record.DataType;
-import org.apache.nifi.serialization.record.RecordFieldType;
-
-public class ArrayDataType extends DataType {
- private final DataType elementType;
-
- public ArrayDataType(final DataType elementType) {
- super(RecordFieldType.ARRAY, null);
- this.elementType = elementType;
- }
-
- public DataType getElementType() {
- return elementType;
- }
-
- @Override
- public RecordFieldType getFieldType() {
- return RecordFieldType.ARRAY;
- }
-
- @Override
- public int hashCode() {
- return 31 + 41 * getFieldType().hashCode() + 41 * (elementType == null ? 0 : elementType.hashCode());
- }
-
- @Override
- public boolean equals(final Object obj) {
- if (obj == this) {
- return true;
- }
- if (obj == null) {
- return false;
- }
- if (!(obj instanceof ArrayDataType)) {
- return false;
- }
-
- final ArrayDataType other = (ArrayDataType) obj;
- return getFieldType().equals(other.getFieldType()) && Objects.equals(elementType, other.elementType);
- }
-
- @Override
- public String toString() {
- return "ARRAY[" + elementType + "]";
- }
-}
http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/ChoiceDataType.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/ChoiceDataType.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/ChoiceDataType.java
deleted file mode 100644
index 038b147..0000000
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/ChoiceDataType.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.serialization.record.type;
-
-import java.util.List;
-import java.util.Objects;
-
-import org.apache.nifi.serialization.record.DataType;
-import org.apache.nifi.serialization.record.RecordFieldType;
-
-public class ChoiceDataType extends DataType {
- private final List<DataType> possibleSubTypes;
-
- public ChoiceDataType(final List<DataType> possibleSubTypes) {
- super(RecordFieldType.CHOICE, null);
- this.possibleSubTypes = Objects.requireNonNull(possibleSubTypes);
- }
-
- public List<DataType> getPossibleSubTypes() {
- return possibleSubTypes;
- }
-
- @Override
- public RecordFieldType getFieldType() {
- return RecordFieldType.CHOICE;
- }
-
- @Override
- public int hashCode() {
- return 31 + 41 * getFieldType().hashCode() + 41 * (possibleSubTypes == null ? 0 : possibleSubTypes.hashCode());
- }
-
- @Override
- public boolean equals(final Object obj) {
- if (obj == this) {
- return true;
- }
- if (obj == null) {
- return false;
- }
- if (!(obj instanceof ChoiceDataType)) {
- return false;
- }
-
- final ChoiceDataType other = (ChoiceDataType) obj;
- return getFieldType().equals(other.getFieldType()) && Objects.equals(possibleSubTypes, other.possibleSubTypes);
- }
-
- @Override
- public String toString() {
- return "CHOICE" + possibleSubTypes;
- }
-}
http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/MapDataType.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/MapDataType.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/MapDataType.java
deleted file mode 100644
index a85fb5e..0000000
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/MapDataType.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.serialization.record.type;
-
-import java.util.Objects;
-
-import org.apache.nifi.serialization.record.DataType;
-import org.apache.nifi.serialization.record.RecordFieldType;
-
-public class MapDataType extends DataType {
- private final DataType valueType;
-
- public MapDataType(final DataType elementType) {
- super(RecordFieldType.MAP, null);
- this.valueType = elementType;
- }
-
- public DataType getValueType() {
- return valueType;
- }
-
- @Override
- public RecordFieldType getFieldType() {
- return RecordFieldType.MAP;
- }
-
- @Override
- public int hashCode() {
- return 31 + 41 * getFieldType().hashCode() + 41 * (valueType == null ? 0 : valueType.hashCode());
- }
-
- @Override
- public boolean equals(final Object obj) {
- if (obj == this) {
- return true;
- }
- if (obj == null) {
- return false;
- }
- if (!(obj instanceof MapDataType)) {
- return false;
- }
-
- final MapDataType other = (MapDataType) obj;
- return getValueType().equals(other.getValueType()) && Objects.equals(valueType, other.valueType);
- }
-
- @Override
- public String toString() {
- return "MAP[" + valueType + "]";
- }
-}
http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/RecordDataType.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/RecordDataType.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/RecordDataType.java
deleted file mode 100644
index 006d34c..0000000
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/RecordDataType.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.serialization.record.type;
-
-import java.util.Objects;
-
-import org.apache.nifi.serialization.record.DataType;
-import org.apache.nifi.serialization.record.RecordFieldType;
-import org.apache.nifi.serialization.record.RecordSchema;
-
-public class RecordDataType extends DataType {
- private final RecordSchema childSchema;
-
- public RecordDataType(final RecordSchema childSchema) {
- super(RecordFieldType.RECORD, null);
- this.childSchema = childSchema;
- }
-
- @Override
- public RecordFieldType getFieldType() {
- return RecordFieldType.RECORD;
- }
-
- public RecordSchema getChildSchema() {
- return childSchema;
- }
-
- @Override
- public int hashCode() {
- return 31 + 41 * getFieldType().hashCode() + 41 * (childSchema == null ? 0 : childSchema.hashCode());
- }
-
- @Override
- public boolean equals(final Object obj) {
- if (obj == this) {
- return true;
- }
- if (obj == null) {
- return false;
- }
- if (!(obj instanceof RecordDataType)) {
- return false;
- }
-
- final RecordDataType other = (RecordDataType) obj;
- return getFieldType().equals(other.getFieldType()) && Objects.equals(childSchema, other.childSchema);
- }
-
- @Override
- public String toString() {
- return RecordFieldType.RECORD.toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
deleted file mode 100644
index 05b3157..0000000
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
+++ /dev/null
@@ -1,670 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.serialization.record.util;
-
-import java.math.BigInteger;
-import java.sql.Date;
-import java.sql.Time;
-import java.sql.Timestamp;
-import java.text.DateFormat;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-import java.util.TimeZone;
-import java.util.function.Consumer;
-
-import org.apache.nifi.serialization.record.DataType;
-import org.apache.nifi.serialization.record.MapRecord;
-import org.apache.nifi.serialization.record.Record;
-import org.apache.nifi.serialization.record.RecordFieldType;
-import org.apache.nifi.serialization.record.RecordSchema;
-import org.apache.nifi.serialization.record.type.ChoiceDataType;
-import org.apache.nifi.serialization.record.type.RecordDataType;
-
-public class DataTypeUtils {
-
- private static final TimeZone gmt = TimeZone.getTimeZone("gmt");
-
- /**
-  * Converts a value to the given data type using the default date, time and timestamp
-  * formats reported by {@link RecordFieldType}.
-  *
-  * @param value the value to convert
-  * @param dataType the type to convert the value to
-  * @param fieldName the name of the field, used in error messages
-  * @return the converted value, as produced by the six-argument overload
-  */
- public static Object convertType(final Object value, final DataType dataType, final String fieldName) {
-     final String defaultDateFormat = RecordFieldType.DATE.getDefaultFormat();
-     final String defaultTimeFormat = RecordFieldType.TIME.getDefaultFormat();
-     final String defaultTimestampFormat = RecordFieldType.TIMESTAMP.getDefaultFormat();
-     return convertType(value, dataType, defaultDateFormat, defaultTimeFormat, defaultTimestampFormat, fieldName);
- }
-
- /**
-  * Converts a value to the given data type by dispatching on {@code dataType.getFieldType()}
-  * to the matching {@code toXxx} method of this class.
-  *
-  * <p>For a CHOICE type a null value yields null; otherwise the first compatible sub-type is
-  * selected and the value is converted through the three-argument overload, i.e. with the
-  * default formats rather than the formats passed to this method.
-  *
-  * @param value the value to convert
-  * @param dataType the type to convert the value to
-  * @param dateFormat the format used for DATE and STRING conversions
-  * @param timeFormat the format used for TIME and STRING conversions
-  * @param timestampFormat the format used for TIMESTAMP and STRING conversions
-  * @param fieldName the name of the field, used in error messages
-  * @return the converted value, or null when the field type is not handled here
-  * @throws IllegalTypeConversionException if no sub-type of a CHOICE is compatible with the value
-  */
- public static Object convertType(final Object value, final DataType dataType, final String dateFormat, final String timeFormat, final String timestampFormat, final String fieldName) {
-     switch (dataType.getFieldType()) {
-         // textual and boolean types
-         case STRING:
-             return toString(value, dateFormat, timeFormat, timestampFormat);
-         case CHAR:
-             return toCharacter(value, fieldName);
-         case BOOLEAN:
-             return toBoolean(value, fieldName);
-
-         // numeric types
-         case BYTE:
-             return toByte(value, fieldName);
-         case SHORT:
-             return toShort(value, fieldName);
-         case INT:
-             return toInteger(value, fieldName);
-         case LONG:
-             return toLong(value, fieldName);
-         case BIGINT:
-             return toBigInt(value, fieldName);
-         case FLOAT:
-             return toFloat(value, fieldName);
-         case DOUBLE:
-             return toDouble(value, fieldName);
-
-         // temporal types
-         case DATE:
-             return toDate(value, dateFormat, fieldName);
-         case TIME:
-             return toTime(value, timeFormat, fieldName);
-         case TIMESTAMP:
-             return toTimestamp(value, timestampFormat, fieldName);
-
-         // structured types
-         case ARRAY:
-             return toArray(value, fieldName);
-         case MAP:
-             return toMap(value, fieldName);
-         case RECORD:
-             return toRecord(value, ((RecordDataType) dataType).getChildSchema(), fieldName);
-         case CHOICE: {
-             if (value == null) {
-                 return null;
-             }
-
-             final ChoiceDataType choice = (ChoiceDataType) dataType;
-             final DataType selected = chooseDataType(value, choice);
-             if (selected != null) {
-                 return convertType(value, selected, fieldName);
-             }
-
-             throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass()
-                 + " for field " + fieldName + " to any of the following available Sub-Types for a Choice: " + choice.getPossibleSubTypes());
-         }
-
-         default:
-             return null;
-     }
- }
-
-
- /**
-  * Reports whether a value can be treated as the given data type, by dispatching on
-  * {@code dataType.getFieldType()} to the matching {@code isXxxTypeCompatible} method.
-  * Date, time and timestamp checks use the format carried by {@code dataType}; a CHOICE is
-  * compatible when at least one of its sub-types is.
-  *
-  * @param value the value to test
-  * @param dataType the type to test against
-  * @return true if the value is compatible; false otherwise, including for unhandled field types
-  */
- public static boolean isCompatibleDataType(final Object value, final DataType dataType) {
-     switch (dataType.getFieldType()) {
-         // textual and boolean types
-         case STRING:
-             return isStringTypeCompatible(value);
-         case CHAR:
-             return isCharacterTypeCompatible(value);
-         case BOOLEAN:
-             return isBooleanTypeCompatible(value);
-
-         // numeric types
-         case BYTE:
-             return isByteTypeCompatible(value);
-         case SHORT:
-             return isShortTypeCompatible(value);
-         case INT:
-             return isIntegerTypeCompatible(value);
-         case LONG:
-             return isLongTypeCompatible(value);
-         case BIGINT:
-             return isBigIntTypeCompatible(value);
-         case FLOAT:
-             return isFloatTypeCompatible(value);
-         case DOUBLE:
-             return isDoubleTypeCompatible(value);
-
-         // temporal types
-         case DATE:
-             return isDateTypeCompatible(value, dataType.getFormat());
-         case TIME:
-             return isTimeTypeCompatible(value, dataType.getFormat());
-         case TIMESTAMP:
-             return isTimestampTypeCompatible(value, dataType.getFormat());
-
-         // structured types
-         case ARRAY:
-             return isArrayTypeCompatible(value);
-         case MAP:
-             return isMapTypeCompatible(value);
-         case RECORD:
-             return isRecordTypeCompatible(value);
-         case CHOICE:
-             return chooseDataType(value, (ChoiceDataType) dataType) != null;
-
-         default:
-             return false;
-     }
- }
-
- public static DataType chooseDataType(final Object value, final ChoiceDataType choiceType) {
- for (final DataType subType : choiceType.getPossibleSubTypes()) {
- if (isCompatibleDataType(value, subType)) {
- return subType;
- }
- }
-
- return null;
- }
-
- /**
-  * Converts a value to a {@link Record}. A Record is returned unchanged. A Map is turned
-  * into a {@link MapRecord}: entries with a null key or with a key the schema has no data
-  * type for are skipped, and every remaining value is coerced with
-  * {@link #convertType(Object, DataType, String)}.
-  *
-  * @param value the value to convert
-  * @param recordSchema the schema used when the value is a Map
-  * @param fieldName the name of the field, used in error messages
-  * @return the Record, or null if the value is null
-  * @throws IllegalTypeConversionException if the value is a Map but no schema was given, or
-  *             if the value is neither a Record nor a Map
-  */
- public static Record toRecord(final Object value, final RecordSchema recordSchema, final String fieldName) {
-     if (value == null) {
-         return null;
-     }
-     if (value instanceof Record) {
-         return (Record) value;
-     }
-     if (!(value instanceof Map)) {
-         throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Record for field " + fieldName);
-     }
-     if (recordSchema == null) {
-         throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass()
-             + " to Record for field " + fieldName + " because the value is a Map but no Record Schema was provided");
-     }
-
-     final Map<String, Object> fieldValues = new HashMap<>();
-     for (final Map.Entry<?, ?> mapEntry : ((Map<?, ?>) value).entrySet()) {
-         final Object rawKey = mapEntry.getKey();
-         if (rawKey == null) {
-             continue;
-         }
-
-         final String name = rawKey.toString();
-         final Optional<DataType> fieldType = recordSchema.getDataType(name);
-         if (fieldType.isPresent()) {
-             fieldValues.put(name, convertType(mapEntry.getValue(), fieldType.get(), fieldName));
-         }
-     }
-
-     return new MapRecord(recordSchema, fieldValues);
- }
-
- /**
-  * @param value the value to test
-  * @return true if the value is a non-null {@link Record}
-  */
- public static boolean isRecordTypeCompatible(final Object value) {
-     // instanceof already evaluates to false for null
-     return value instanceof Record;
- }
-
- /**
-  * Returns the value as an Object array. No element conversion is performed: an
-  * {@code Object[]} is returned as the same instance.
-  *
-  * @param value the value to convert
-  * @param fieldName the name of the field, used in error messages
-  * @return the array, or null if the value is null
-  * @throws IllegalTypeConversionException if the value is non-null and not an {@code Object[]}
-  */
- public static Object[] toArray(final Object value, final String fieldName) {
-     if (value instanceof Object[]) {
-         return (Object[]) value;
-     }
-     if (value == null) {
-         return null;
-     }
-
-     throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Object Array for field " + fieldName);
- }
-
- /**
-  * @param value the value to test
-  * @return true if the value is a non-null {@code Object[]}; primitive arrays do not qualify
-  */
- public static boolean isArrayTypeCompatible(final Object value) {
-     // instanceof already evaluates to false for null
-     return value instanceof Object[];
- }
-
- @SuppressWarnings("unchecked")
- public static Map<String, Object> toMap(final Object value, final String fieldName) {
- if (value == null) {
- return null;
- }
-
- if (value instanceof Map) {
- final Map<?, ?> original = (Map<?, ?>) value;
-
- boolean keysAreStrings = true;
- for (final Object key : original.keySet()) {
- if (!(key instanceof String)) {
- keysAreStrings = false;
- }
- }
-
- if (keysAreStrings) {
- return (Map<String, Object>) value;
- }
-
- final Map<String, Object> transformed = new HashMap<>();
- for (final Map.Entry<?, ?> entry : original.entrySet()) {
- final Object key = entry.getKey();
- if (key == null) {
- transformed.put(null, entry.getValue());
- } else {
- transformed.put(key.toString(), entry.getValue());
- }
- }
-
- return transformed;
- }
-
- if (value instanceof Record) {
- final Record record = (Record) value;
- final RecordSchema recordSchema = record.getSchema();
- if (recordSchema == null) {
- throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type Record to Map for field " + fieldName
- + " because Record does not have an associated Schema");
- }
-
- final Map<String, Object> map = new HashMap<>();
- for (final String recordFieldName : recordSchema.getFieldNames()) {
- map.put(recordFieldName, record.getValue(recordFieldName));
- }
-
- return map;
- }
-
- throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Map for field " + fieldName);
- }
-
- /**
-  * @param value the value to test
-  * @return true if the value is a non-null {@link Map}
-  */
- public static boolean isMapTypeCompatible(final Object value) {
-     // instanceof already evaluates to false for null
-     return value instanceof Map;
- }
-
-
- /**
-  * Renders a value as a String. Strings are returned unchanged. Date values are formatted
-  * with a pattern chosen by their runtime type: {@code java.sql.Date} uses
-  * {@code dateFormat}, {@code java.sql.Time} uses {@code timeFormat}, and
-  * {@code java.sql.Timestamp} as well as any other {@code java.util.Date} use
-  * {@code timestampFormat}. Everything else is rendered with {@code toString()}.
-  *
-  * @param value the value to render
-  * @param dateFormat the pattern for {@code java.sql.Date} values
-  * @param timeFormat the pattern for {@code java.sql.Time} values
-  * @param timestampFormat the pattern for timestamps and other dates
-  * @return the String form, or null if the value is null
-  */
- public static String toString(final Object value, final String dateFormat, final String timeFormat, final String timestampFormat) {
-     if (value == null) {
-         return null;
-     }
-     if (value instanceof String) {
-         return (String) value;
-     }
-     if (!(value instanceof java.util.Date)) {
-         return value.toString();
-     }
-
-     // The java.sql date types all extend java.util.Date; pick the pattern by subtype.
-     final java.util.Date date = (java.util.Date) value;
-     final String pattern;
-     if (date instanceof java.sql.Date) {
-         pattern = dateFormat;
-     } else if (date instanceof java.sql.Time) {
-         pattern = timeFormat;
-     } else {
-         pattern = timestampFormat;
-     }
-     return getDateFormat(pattern).format(date);
- }
-
- /**
-  * Reports whether a value can be treated as a String; any non-null value qualifies.
-  *
-  * @param value the value to test
-  * @return true if the value is not null
-  */
- public static boolean isStringTypeCompatible(final Object value) {
- return value != null;
- }
-
- /**
-  * Converts a value to a {@code java.sql.Date}.
-  * <ul>
-  * <li>A {@code java.sql.Date} is returned unchanged.</li>
-  * <li>Any other {@code java.util.Date} is converted through its epoch milliseconds.</li>
-  * <li>A Number is interpreted as epoch milliseconds.</li>
-  * <li>A String is parsed with the given format.</li>
-  * </ul>
-  *
-  * @param value the value to convert
-  * @param format the pattern used to parse String values
-  * @param fieldName the name of the field, used in error messages
-  * @return the date, or null if the value is null
-  * @throws IllegalTypeConversionException if a String cannot be parsed (the ParseException
-  *             is attached as the cause) or the value is of an unsupported type
-  */
- public static java.sql.Date toDate(final Object value, final String format, final String fieldName) {
-     if (value == null) {
-         return null;
-     }
-
-     if (value instanceof Date) {
-         return (Date) value;
-     }
-
-     // isDateTypeCompatible accepts every java.util.Date, so convert the other subtypes
-     // (e.g. Timestamp) instead of rejecting a value that was reported as compatible.
-     if (value instanceof java.util.Date) {
-         return new Date(((java.util.Date) value).getTime());
-     }
-
-     if (value instanceof Number) {
-         final long longValue = ((Number) value).longValue();
-         return new Date(longValue);
-     }
-
-     if (value instanceof String) {
-         try {
-             final java.util.Date utilDate = getDateFormat(format).parse((String) value);
-             return new Date(utilDate.getTime());
-         } catch (final ParseException e) {
-             // Keep the ParseException as the cause so the failing offset is not lost.
-             throw new IllegalTypeConversionException("Could not convert value [" + value
-                 + "] of type java.lang.String to Date because the value is not in the expected date format: " + format + " for field " + fieldName, e);
-         }
-     }
-
-     throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Date for field " + fieldName);
- }
-
- /**
-  * Reports whether a value can be treated as a date: any {@code java.util.Date}, any
-  * Number, or a String that parses with the given format.
-  *
-  * @param value the value to test
-  * @param format the pattern used to parse String values
-  * @return true if the value is compatible; false otherwise, including for null
-  */
- public static boolean isDateTypeCompatible(final Object value, final String format) {
-     if (value instanceof java.util.Date || value instanceof Number) {
-         return true;
-     }
-     // instanceof is false for null, so null is rejected here as well
-     if (!(value instanceof String)) {
-         return false;
-     }
-
-     try {
-         getDateFormat(format).parse((String) value);
-     } catch (final ParseException e) {
-         return false;
-     }
-     return true;
- }
-
- /**
-  * Converts a value to a {@code java.sql.Time}.
-  * <ul>
-  * <li>A {@code java.sql.Time} is returned unchanged.</li>
-  * <li>Any other {@code java.util.Date} is converted through its epoch milliseconds.</li>
-  * <li>A Number is interpreted as epoch milliseconds.</li>
-  * <li>A String is parsed with the given format.</li>
-  * </ul>
-  *
-  * @param value the value to convert
-  * @param format the pattern used to parse String values
-  * @param fieldName the name of the field, used in error messages
-  * @return the time, or null if the value is null
-  * @throws IllegalTypeConversionException if a String cannot be parsed (the ParseException
-  *             is attached as the cause) or the value is of an unsupported type
-  */
- public static Time toTime(final Object value, final String format, final String fieldName) {
-     if (value == null) {
-         return null;
-     }
-
-     if (value instanceof Time) {
-         return (Time) value;
-     }
-
-     // isTimeTypeCompatible accepts every java.util.Date, so convert the other subtypes
-     // instead of rejecting a value that was reported as compatible.
-     if (value instanceof java.util.Date) {
-         return new Time(((java.util.Date) value).getTime());
-     }
-
-     if (value instanceof Number) {
-         final long longValue = ((Number) value).longValue();
-         return new Time(longValue);
-     }
-
-     if (value instanceof String) {
-         try {
-             final java.util.Date utilDate = getDateFormat(format).parse((String) value);
-             return new Time(utilDate.getTime());
-         } catch (final ParseException e) {
-             // Keep the ParseException as the cause so the failing offset is not lost.
-             throw new IllegalTypeConversionException("Could not convert value [" + value
-                 + "] of type java.lang.String to Time for field " + fieldName + " because the value is not in the expected date format: " + format, e);
-         }
-     }
-
-     throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Time for field " + fieldName);
- }
-
- /**
-  * Creates a new {@link SimpleDateFormat} for the given pattern with its time zone set to
-  * the class-level {@code gmt} zone. A fresh instance is created on every call.
-  *
-  * @param format the date pattern
-  * @return the configured formatter
-  */
- private static DateFormat getDateFormat(final String format) {
-     final SimpleDateFormat formatter = new SimpleDateFormat(format);
-     formatter.setTimeZone(gmt);
-     return formatter;
- }
-
- /**
-  * Reports whether a value can be treated as a time. Delegates to
-  * {@link #isDateTypeCompatible(Object, String)}, so the same rules apply.
-  *
-  * @param value the value to test
-  * @param format the pattern used to parse String values
-  * @return the result of the date compatibility check
-  */
- public static boolean isTimeTypeCompatible(final Object value, final String format) {
- return isDateTypeCompatible(value, format);
- }
-
- /**
-  * Converts a value to a {@code java.sql.Timestamp}.
-  * <ul>
-  * <li>A {@code java.sql.Timestamp} is returned unchanged.</li>
-  * <li>Any other {@code java.util.Date} is converted through its epoch milliseconds.</li>
-  * <li>A Number is interpreted as epoch milliseconds.</li>
-  * <li>A String is parsed with the given format.</li>
-  * </ul>
-  *
-  * @param value the value to convert
-  * @param format the pattern used to parse String values
-  * @param fieldName the name of the field, used in error messages
-  * @return the timestamp, or null if the value is null
-  * @throws IllegalTypeConversionException if a String cannot be parsed (the ParseException
-  *             is attached as the cause) or the value is of an unsupported type
-  */
- public static Timestamp toTimestamp(final Object value, final String format, final String fieldName) {
-     if (value == null) {
-         return null;
-     }
-
-     if (value instanceof Timestamp) {
-         return (Timestamp) value;
-     }
-
-     // isTimestampTypeCompatible accepts every java.util.Date, so convert the other
-     // subtypes instead of rejecting a value that was reported as compatible.
-     if (value instanceof java.util.Date) {
-         return new Timestamp(((java.util.Date) value).getTime());
-     }
-
-     if (value instanceof Number) {
-         final long longValue = ((Number) value).longValue();
-         return new Timestamp(longValue);
-     }
-
-     if (value instanceof String) {
-         try {
-             final java.util.Date utilDate = getDateFormat(format).parse((String) value);
-             return new Timestamp(utilDate.getTime());
-         } catch (final ParseException e) {
-             // Keep the ParseException as the cause so the failing offset is not lost.
-             throw new IllegalTypeConversionException("Could not convert value [" + value
-                 + "] of type java.lang.String to Timestamp for field " + fieldName + " because the value is not in the expected date format: " + format, e);
-         }
-     }
-
-     throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Timestamp for field " + fieldName);
- }
-
- /**
-  * Reports whether a value can be treated as a timestamp. Delegates to
-  * {@link #isDateTypeCompatible(Object, String)}, so the same rules apply.
-  *
-  * @param value the value to test
-  * @param format the pattern used to parse String values
-  * @return the result of the date compatibility check
-  */
- public static boolean isTimestampTypeCompatible(final Object value, final String format) {
- return isDateTypeCompatible(value, format);
- }
-
-
- /**
-  * Converts a value to a {@link BigInteger}. Only BigInteger and Long inputs are supported.
-  *
-  * @param value the value to convert
-  * @param fieldName the name of the field, used in error messages
-  * @return the BigInteger, or null if the value is null
-  * @throws IllegalTypeConversionException if the value is of any other type
-  */
- public static BigInteger toBigInt(final Object value, final String fieldName) {
-     if (value instanceof BigInteger) {
-         return (BigInteger) value;
-     }
-     if (value instanceof Long) {
-         return BigInteger.valueOf(((Long) value).longValue());
-     }
-     if (value == null) {
-         return null;
-     }
-
-     throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to BigInteger for field " + fieldName);
- }
-
- /**
-  * Reports whether a value can be converted by {@code toBigInt}, i.e. whether it is a
-  * {@link BigInteger} or a {@link Long}.
-  *
-  * @param value the value to test
-  * @return true for a non-null BigInteger or Long; false otherwise, including for null
-  */
- public static boolean isBigIntTypeCompatible(final Object value) {
-     // The previous "value == null && (...)" could never be true, so BIGINT was never
-     // matched. instanceof is false for null, which makes an explicit null check unnecessary.
-     return value instanceof BigInteger || value instanceof Long;
- }
-
- /**
-  * Converts a value to a Boolean. A Boolean is returned unchanged; the Strings "true" and
-  * "false" are accepted in any letter case.
-  *
-  * @param value the value to convert
-  * @param fieldName the name of the field, used in error messages
-  * @return the Boolean, or null if the value is null
-  * @throws IllegalTypeConversionException for any other String or any other type
-  */
- public static Boolean toBoolean(final Object value, final String fieldName) {
-     if (value == null) {
-         return null;
-     }
-     if (value instanceof Boolean) {
-         return (Boolean) value;
-     }
-
-     if (value instanceof String) {
-         final String text = (String) value;
-         if ("true".equalsIgnoreCase(text)) {
-             return Boolean.TRUE;
-         }
-         if ("false".equalsIgnoreCase(text)) {
-             return Boolean.FALSE;
-         }
-         // any other String falls through to the exception below
-     }
-
-     throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Boolean for field " + fieldName);
- }
-
- /**
-  * Reports whether a value can be treated as a Boolean: a Boolean, or the String "true" or
-  * "false" in any letter case.
-  *
-  * @param value the value to test
-  * @return true if the value is compatible; false otherwise, including for null
-  */
- public static boolean isBooleanTypeCompatible(final Object value) {
-     if (value instanceof Boolean) {
-         return true;
-     }
-     // instanceof is false for null, so null is rejected here as well
-     if (!(value instanceof String)) {
-         return false;
-     }
-
-     final String text = (String) value;
-     return "true".equalsIgnoreCase(text) || "false".equalsIgnoreCase(text);
- }
-
- /**
-  * Converts a value to a Double. A Number is converted with {@code doubleValue()} and a
-  * String is parsed with {@link Double#parseDouble(String)}.
-  *
-  * @param value the value to convert
-  * @param fieldName the name of the field, used in error messages
-  * @return the Double, or null if the value is null
-  * @throws NumberFormatException if a String value cannot be parsed
-  * @throws IllegalTypeConversionException if the value is of an unsupported type
-  */
- public static Double toDouble(final Object value, final String fieldName) {
-     final Double converted;
-     if (value == null) {
-         converted = null;
-     } else if (value instanceof Number) {
-         converted = ((Number) value).doubleValue();
-     } else if (value instanceof String) {
-         converted = Double.parseDouble((String) value);
-     } else {
-         throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Double for field " + fieldName);
-     }
-     return converted;
- }
-
- /**
-  * @param value the value to test
-  * @return true if the value is a Number or a String accepted by
-  *         {@link Double#parseDouble(String)}
-  */
- public static boolean isDoubleTypeCompatible(final Object value) {
-     return isNumberTypeCompatible(value, Double::parseDouble);
- }
-
- /**
-  * Shared compatibility check for the numeric types: any Number qualifies, and a String
-  * qualifies when the given verifier accepts it without throwing NumberFormatException.
-  *
-  * @param value the value to test
-  * @param stringValueVerifier parser invoked on String values; only a NumberFormatException
-  *            thrown by it is treated as "not compatible"
-  * @return true if the value is compatible; false otherwise, including for null
-  */
- private static boolean isNumberTypeCompatible(final Object value, final Consumer<String> stringValueVerifier) {
-     if (value instanceof Number) {
-         return true;
-     }
-     // instanceof is false for null, so null is rejected here as well
-     if (!(value instanceof String)) {
-         return false;
-     }
-
-     try {
-         stringValueVerifier.accept((String) value);
-     } catch (final NumberFormatException nfe) {
-         return false;
-     }
-     return true;
- }
-
- /**
-  * Converts a value to a Float. A Number is converted with {@code floatValue()} and a
-  * String is parsed with {@link Float#parseFloat(String)}.
-  *
-  * @param value the value to convert
-  * @param fieldName the name of the field, used in error messages
-  * @return the Float, or null if the value is null
-  * @throws NumberFormatException if a String value cannot be parsed
-  * @throws IllegalTypeConversionException if the value is of an unsupported type
-  */
- public static Float toFloat(final Object value, final String fieldName) {
-     final Float converted;
-     if (value == null) {
-         converted = null;
-     } else if (value instanceof Number) {
-         converted = ((Number) value).floatValue();
-     } else if (value instanceof String) {
-         converted = Float.parseFloat((String) value);
-     } else {
-         throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Float for field " + fieldName);
-     }
-     return converted;
- }
-
- /**
-  * @param value the value to test
-  * @return true if the value is a Number or a String accepted by
-  *         {@link Float#parseFloat(String)}
-  */
- public static boolean isFloatTypeCompatible(final Object value) {
-     return isNumberTypeCompatible(value, Float::parseFloat);
- }
-
- /**
-  * Converts a value to a Long. A Number is converted with {@code longValue()}, a String is
-  * parsed with {@link Long#parseLong(String)}, and a {@code java.util.Date} yields its
-  * {@code getTime()} value.
-  *
-  * @param value the value to convert
-  * @param fieldName the name of the field, used in error messages
-  * @return the Long, or null if the value is null
-  * @throws NumberFormatException if a String value cannot be parsed
-  * @throws IllegalTypeConversionException if the value is of an unsupported type
-  */
- public static Long toLong(final Object value, final String fieldName) {
-     final Long converted;
-     if (value == null) {
-         converted = null;
-     } else if (value instanceof Number) {
-         converted = ((Number) value).longValue();
-     } else if (value instanceof String) {
-         converted = Long.parseLong((String) value);
-     } else if (value instanceof java.util.Date) {
-         converted = ((java.util.Date) value).getTime();
-     } else {
-         throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Long for field " + fieldName);
-     }
-     return converted;
- }
-
- /**
-  * Reports whether a value can be treated as a Long: any Number, any
-  * {@code java.util.Date}, or a String accepted by {@link Long#parseLong(String)}.
-  *
-  * @param value the value to test
-  * @return true if the value is compatible; false otherwise, including for null
-  */
- public static boolean isLongTypeCompatible(final Object value) {
-     if (value instanceof Number || value instanceof java.util.Date) {
-         return true;
-     }
-     // instanceof is false for null, so null is rejected here as well
-     if (!(value instanceof String)) {
-         return false;
-     }
-
-     try {
-         Long.parseLong((String) value);
-     } catch (final NumberFormatException nfe) {
-         return false;
-     }
-     return true;
- }
-
-
- /**
-  * Converts a value to an Integer. A Number is converted with {@code intValue()} and a
-  * String is parsed with {@link Integer#parseInt(String)}.
-  *
-  * @param value the value to convert
-  * @param fieldName the name of the field, used in error messages
-  * @return the Integer, or null if the value is null
-  * @throws NumberFormatException if a String value cannot be parsed
-  * @throws IllegalTypeConversionException if the value is of an unsupported type
-  */
- public static Integer toInteger(final Object value, final String fieldName) {
-     final Integer converted;
-     if (value == null) {
-         converted = null;
-     } else if (value instanceof Number) {
-         converted = ((Number) value).intValue();
-     } else if (value instanceof String) {
-         converted = Integer.parseInt((String) value);
-     } else {
-         throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Integer for field " + fieldName);
-     }
-     return converted;
- }
-
- /**
-  * @param value the value to test
-  * @return true if the value is a Number or a String accepted by
-  *         {@link Integer#parseInt(String)}
-  */
- public static boolean isIntegerTypeCompatible(final Object value) {
-     return isNumberTypeCompatible(value, Integer::parseInt);
- }
-
-
- /**
-  * Converts a value to a Short. A Number is converted with {@code shortValue()} and a
-  * String is parsed with {@link Short#parseShort(String)}.
-  *
-  * @param value the value to convert
-  * @param fieldName the name of the field, used in error messages
-  * @return the Short, or null if the value is null
-  * @throws NumberFormatException if a String value cannot be parsed
-  * @throws IllegalTypeConversionException if the value is of an unsupported type
-  */
- public static Short toShort(final Object value, final String fieldName) {
-     final Short converted;
-     if (value == null) {
-         converted = null;
-     } else if (value instanceof Number) {
-         converted = ((Number) value).shortValue();
-     } else if (value instanceof String) {
-         converted = Short.parseShort((String) value);
-     } else {
-         throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Short for field " + fieldName);
-     }
-     return converted;
- }
-
- /**
-  * @param value the value to test
-  * @return true if the value is a Number or a String accepted by
-  *         {@link Short#parseShort(String)}
-  */
- public static boolean isShortTypeCompatible(final Object value) {
-     return isNumberTypeCompatible(value, Short::parseShort);
- }
-
- /**
-  * Converts a value to a Byte. A Number is converted with {@code byteValue()} and a
-  * String is parsed with {@link Byte#parseByte(String)}.
-  *
-  * @param value the value to convert
-  * @param fieldName the name of the field, used in error messages
-  * @return the Byte, or null if the value is null
-  * @throws NumberFormatException if a String value cannot be parsed
-  * @throws IllegalTypeConversionException if the value is of an unsupported type
-  */
- public static Byte toByte(final Object value, final String fieldName) {
-     final Byte converted;
-     if (value == null) {
-         converted = null;
-     } else if (value instanceof Number) {
-         converted = ((Number) value).byteValue();
-     } else if (value instanceof String) {
-         converted = Byte.parseByte((String) value);
-     } else {
-         throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Byte for field " + fieldName);
-     }
-     return converted;
- }
-
- /**
-  * @param value the value to test
-  * @return true if the value is a Number or a String accepted by
-  *         {@link Byte#parseByte(String)}
-  */
- public static boolean isByteTypeCompatible(final Object value) {
-     return isNumberTypeCompatible(value, Byte::parseByte);
- }
-
-
- /**
-  * Converts a value to a Character. A Character is returned unchanged; for a CharSequence
-  * the first character is returned.
-  *
-  * @param value the value to convert
-  * @param fieldName the name of the field, used in error messages
-  * @return the Character, or null if the value is null
-  * @throws IllegalTypeConversionException if the value is an empty CharSequence or is of an
-  *             unsupported type
-  */
- public static Character toCharacter(final Object value, final String fieldName) {
-     if (value == null) {
-         return null;
-     }
-     if (value instanceof Character) {
-         return (Character) value;
-     }
-     if (!(value instanceof CharSequence)) {
-         throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Character for field " + fieldName);
-     }
-
-     final CharSequence text = (CharSequence) value;
-     if (text.length() == 0) {
-         throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass()
-             + " to Character because it has a length of 0 for field " + fieldName);
-     }
-     return text.charAt(0);
- }
-
- /**
-  * Reports whether a value can be treated as a Character: a Character, or a CharSequence
-  * containing at least one character.
-  *
-  * @param value the value to test
-  * @return true if the value is compatible; false otherwise, including for null
-  */
- public static boolean isCharacterTypeCompatible(final Object value) {
-     if (value instanceof Character) {
-         return true;
-     }
-     // instanceof is false for null, so null is rejected here as well
-     return value instanceof CharSequence && ((CharSequence) value).length() > 0;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/util/IllegalTypeConversionException.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/util/IllegalTypeConversionException.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/util/IllegalTypeConversionException.java
deleted file mode 100644
index 38b5d20..0000000
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/util/IllegalTypeConversionException.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.serialization.record.util;
-
-/**
- * Unchecked exception used to signal that a value could not be converted to a requested type.
- */
-public class IllegalTypeConversionException extends RuntimeException {
-
-    /**
-     * @param message description of the failed conversion
-     * @param cause the underlying failure
-     */
-    public IllegalTypeConversionException(final String message, final Throwable cause) {
-        super(message, cause);
-    }
-
-    /**
-     * @param message description of the failed conversion
-     */
-    public IllegalTypeConversionException(final String message) {
-        super(message);
-    }
-}
http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/test/java/org/apache/nifi/serialization/TestSimpleRecordSchema.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/test/java/org/apache/nifi/serialization/TestSimpleRecordSchema.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/test/java/org/apache/nifi/serialization/TestSimpleRecordSchema.java
deleted file mode 100644
index 5a61275..0000000
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/test/java/org/apache/nifi/serialization/TestSimpleRecordSchema.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.serialization;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.nifi.serialization.record.RecordField;
-import org.apache.nifi.serialization.record.RecordFieldType;
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * Checks that {@link SimpleRecordSchema} refuses field lists whose names and aliases collide.
- */
-public class TestSimpleRecordSchema {
-
-    @Test
-    public void testPreventsTwoFieldsWithSameAlias() {
-        final RecordField hello = new RecordField("hello", RecordFieldType.STRING.getDataType(), null, set("foo", "bar"));
-        final RecordField goodbye = new RecordField("goodbye", RecordFieldType.STRING.getDataType(), null, set("baz", "bar"));
-
-        assertSchemaRejected("Was able to create two fields with same alias", hello, goodbye);
-    }
-
-    @Test
-    public void testPreventsTwoFieldsWithSameName() {
-        final RecordField aliased = new RecordField("hello", RecordFieldType.STRING.getDataType(), null, set("foo", "bar"));
-        final RecordField sameName = new RecordField("hello", RecordFieldType.STRING.getDataType());
-
-        assertSchemaRejected("Was able to create two fields with same name", aliased, sameName);
-    }
-
-    @Test
-    public void testPreventsTwoFieldsWithConflictingNamesAliases() {
-        final RecordField aliased = new RecordField("hello", RecordFieldType.STRING.getDataType(), null, set("foo", "bar"));
-        final RecordField namedLikeAlias = new RecordField("bar", RecordFieldType.STRING.getDataType());
-
-        assertSchemaRejected("Was able to create two fields with conflicting names/aliases", aliased, namedLikeAlias);
-    }
-
-    /**
-     * Fails with the given message unless building a schema from the two fields throws
-     * IllegalArgumentException.
-     */
-    private void assertSchemaRejected(final String failureMessage, final RecordField first, final RecordField second) {
-        final List<RecordField> fields = new ArrayList<>();
-        fields.add(first);
-        fields.add(second);
-
-        try {
-            new SimpleRecordSchema(fields);
-            Assert.fail(failureMessage);
-        } catch (final IllegalArgumentException expected) {
-            // expected: the conflicting fields must be refused
-        }
-    }
-
-    /** Builds a mutable set holding the given values. */
-    private Set<String> set(final String... values) {
-        final Set<String> result = new HashSet<>();
-        for (final String element : values) {
-            result.add(element);
-        }
-        return result;
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/test/java/org/apache/nifi/serialization/record/TestMapRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/test/java/org/apache/nifi/serialization/record/TestMapRecord.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/test/java/org/apache/nifi/serialization/record/TestMapRecord.java
deleted file mode 100644
index 82e20a6..0000000
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/test/java/org/apache/nifi/serialization/record/TestMapRecord.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.serialization.record;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.nifi.serialization.SimpleRecordSchema;
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * Tests for {@link MapRecord}: field default values, alias resolution, and how the two
- * interact.
- */
-public class TestMapRecord {
-
-    @Test
-    public void testDefaultValue() {
-        final Record record = newRecord(new HashMap<>(),
-            new RecordField("noDefault", RecordFieldType.STRING.getDataType()),
-            new RecordField("defaultOfHello", RecordFieldType.STRING.getDataType(), "hello"));
-
-        assertNull(record.getValue("noDefault"));
-        assertEquals("hello", record.getValue("defaultOfHello"));
-    }
-
-    @Test
-    public void testDefaultValueInGivenField() {
-        final Record record = newRecord(new HashMap<>(),
-            new RecordField("noDefault", RecordFieldType.STRING.getDataType()),
-            new RecordField("defaultOfHello", RecordFieldType.STRING.getDataType(), "hello"));
-
-        assertNull(record.getValue("noDefault"));
-        assertEquals("hello", record.getValue("defaultOfHello"));
-
-        // A field handed to getValue supplies its own default, even if the schema's field has none.
-        final RecordField newField = new RecordField("noDefault", RecordFieldType.STRING.getDataType(), "new");
-        assertEquals("new", record.getValue(newField));
-    }
-
-    @Test
-    public void testIllegalDefaultValue() {
-        // These combinations must be accepted.
-        new RecordField("hello", RecordFieldType.STRING.getDataType(), 84);
-        new RecordField("hello", RecordFieldType.STRING.getDataType(), (Object) null);
-        new RecordField("hello", RecordFieldType.INT.getDataType(), 84);
-        new RecordField("hello", RecordFieldType.INT.getDataType(), (Object) null);
-
-        try {
-            new RecordField("hello", RecordFieldType.INT.getDataType(), "foo");
-            Assert.fail("Was able to set a default value of \"foo\" for INT type");
-        } catch (final IllegalArgumentException expected) {
-            // expected
-        }
-    }
-
-    @Test
-    public void testAliasOneValue() {
-        final Map<String, Object> values = new HashMap<>();
-        values.put("bar", 1);
-
-        final Record record = newRecord(values,
-            new RecordField("foo", RecordFieldType.STRING.getDataType(), null, set("bar", "baz")));
-        assertFooBarBaz(1, record);
-    }
-
-    @Test
-    public void testAliasConflictingValues() {
-        final Map<String, Object> values = new HashMap<>();
-        values.put("bar", 1);
-        values.put("foo", null);
-
-        final Record record = newRecord(values,
-            new RecordField("foo", RecordFieldType.STRING.getDataType(), null, set("bar", "baz")));
-        assertFooBarBaz(1, record);
-    }
-
-    @Test
-    public void testAliasConflictingAliasValues() {
-        final Map<String, Object> values = new HashMap<>();
-        values.put("baz", 1);
-        values.put("bar", 33);
-
-        final Record record = newRecord(values,
-            new RecordField("foo", RecordFieldType.STRING.getDataType(), null, set("bar", "baz")));
-        assertFooBarBaz(33, record);
-    }
-
-    @Test
-    public void testAliasInGivenField() {
-        final Map<String, Object> values = new HashMap<>();
-        values.put("bar", 33);
-
-        final Record record = newRecord(values,
-            new RecordField("foo", RecordFieldType.STRING.getDataType(), null, set("bar", "baz")));
-        assertFooBarBaz(33, record);
-
-        final RecordField noAlias = new RecordField("hello", RecordFieldType.STRING.getDataType());
-        assertNull(record.getValue(noAlias));
-
-        final RecordField withAlias = new RecordField("hello", RecordFieldType.STRING.getDataType(), null, set("baz"));
-        assertEquals(33, record.getValue(withAlias));
-        assertEquals("33", record.getAsString(withAlias, withAlias.getDataType().getFormat()));
-    }
-
-    @Test
-    public void testDefaultValueWithAliasValue() {
-        final Map<String, Object> values = new HashMap<>();
-        values.put("baz", 1);
-        values.put("bar", 33);
-
-        final Record record = newRecord(values,
-            new RecordField("foo", RecordFieldType.STRING.getDataType(), "hello", set("bar", "baz")));
-        assertFooBarBaz(33, record);
-    }
-
-    @Test
-    public void testDefaultValueWithAliasesDefined() {
-        final Record record = newRecord(new HashMap<>(),
-            new RecordField("foo", RecordFieldType.STRING.getDataType(), "hello", set("bar", "baz")));
-        assertFooBarBaz("hello", record);
-    }
-
-    /** Builds a MapRecord over the given values with a schema made of the given fields. */
-    private Record newRecord(final Map<String, Object> values, final RecordField... schemaFields) {
-        final List<RecordField> fields = new ArrayList<>();
-        for (final RecordField field : schemaFields) {
-            fields.add(field);
-        }
-
-        final RecordSchema schema = new SimpleRecordSchema(fields);
-        return new MapRecord(schema, values);
-    }
-
-    /** Asserts that looking up "foo", "bar" and "baz" each yields the expected value. */
-    private void assertFooBarBaz(final Object expected, final Record record) {
-        assertEquals(expected, record.getValue("foo"));
-        assertEquals(expected, record.getValue("bar"));
-        assertEquals(expected, record.getValue("baz"));
-    }
-
-    /** Builds a mutable set holding the given values. */
-    private Set<String> set(final String... values) {
-        final Set<String> result = new HashSet<>();
-        for (final String element : values) {
-            result.add(element);
-        }
-        return result;
-    }
-}
http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
index 16479f1..81a6775 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
@@ -35,9 +35,17 @@
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-record</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
<artifactId>nifi-schema-registry-service-api</artifactId>
</dependency>
<dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-avro-record-utils</artifactId>
+ </dependency>
+ <dependency>
<groupId>com.jayway.jsonpath</groupId>
<artifactId>json-path</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java
index f5b4373..ae6254e 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java
@@ -27,6 +27,7 @@ import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.schema.access.SchemaAccessUtils;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
@@ -50,7 +51,7 @@ public class AvroReader extends SchemaRegistryService implements RecordReaderFac
@Override
public RecordReader createRecordReader(final FlowFile flowFile, final InputStream in, final ComponentLog logger) throws MalformedRecordException, IOException, SchemaNotFoundException {
- final String schemaAccessStrategy = getConfigurationContext().getProperty(SCHEMA_ACCESS_STRATEGY).getValue();
+ final String schemaAccessStrategy = getConfigurationContext().getProperty(SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY).getValue();
if (EMBEDDED_AVRO_SCHEMA.getValue().equals(schemaAccessStrategy)) {
return new AvroReaderWithEmbeddedSchema(in);
} else {
http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordReader.java
index 621ec74..13a8317 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordReader.java
@@ -17,39 +17,20 @@
package org.apache.nifi.avro;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.avro.LogicalType;
-import org.apache.avro.LogicalTypes;
-import org.apache.avro.Schema;
-import org.apache.avro.Schema.Field;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericData.Array;
-import org.apache.avro.generic.GenericFixed;
import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.util.Utf8;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
-import org.apache.nifi.serialization.SimpleRecordSchema;
-import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
-import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordSchema;
-import org.apache.nifi.serialization.record.util.DataTypeUtils;
-public abstract class AvroRecordReader implements RecordReader {
+import java.io.IOException;
+import java.util.Map;
+public abstract class AvroRecordReader implements RecordReader {
protected abstract GenericRecord nextAvroRecord() throws IOException;
-
@Override
public Record nextRecord() throws IOException, MalformedRecordException {
GenericRecord record = nextAvroRecord();
@@ -58,148 +39,8 @@ public abstract class AvroRecordReader implements RecordReader {
}
final RecordSchema schema = getSchema();
- final Map<String, Object> values = convertAvroRecordToMap(record, schema);
+ final Map<String, Object> values = AvroTypeUtil.convertAvroRecordToMap(record, schema);
return new MapRecord(schema, values);
}
-
- private Map<String, Object> convertAvroRecordToMap(final GenericRecord avroRecord, final RecordSchema recordSchema) {
- final Map<String, Object> values = new HashMap<>(recordSchema.getFieldCount());
-
- for (final RecordField recordField : recordSchema.getFields()) {
- Object value = avroRecord.get(recordField.getFieldName());
- if (value == null) {
- for (final String alias : recordField.getAliases()) {
- value = avroRecord.get(alias);
- if (value != null) {
- break;
- }
- }
- }
-
- final String fieldName = recordField.getFieldName();
- final Field avroField = avroRecord.getSchema().getField(fieldName);
- if (avroField == null) {
- values.put(fieldName, null);
- continue;
- }
-
- final Schema fieldSchema = avroField.schema();
- final Object rawValue = normalizeValue(value, fieldSchema);
-
- final DataType desiredType = recordField.getDataType();
- final Object coercedValue = DataTypeUtils.convertType(rawValue, desiredType, fieldName);
-
- values.put(fieldName, coercedValue);
- }
-
- return values;
- }
-
- private Object normalizeValue(final Object value, final Schema avroSchema) {
- if (value == null) {
- return null;
- }
-
- switch (avroSchema.getType()) {
- case INT: {
- final LogicalType logicalType = avroSchema.getLogicalType();
- if (logicalType == null) {
- return value;
- }
-
- final String logicalName = logicalType.getName();
- if (LogicalTypes.date().getName().equals(logicalName)) {
- // date logical name means that the value is number of days since Jan 1, 1970
- return new java.sql.Date(TimeUnit.DAYS.toMillis((int) value));
- } else if (LogicalTypes.timeMillis().equals(logicalName)) {
- // time-millis logical name means that the value is number of milliseconds since midnight.
- return new java.sql.Time((int) value);
- }
-
- break;
- }
- case LONG: {
- final LogicalType logicalType = avroSchema.getLogicalType();
- if (logicalType == null) {
- return value;
- }
-
- final String logicalName = logicalType.getName();
- if (LogicalTypes.timeMicros().getName().equals(logicalName)) {
- return new java.sql.Time(TimeUnit.MICROSECONDS.toMillis((long) value));
- } else if (LogicalTypes.timestampMillis().getName().equals(logicalName)) {
- return new java.sql.Timestamp((long) value);
- } else if (LogicalTypes.timestampMicros().getName().equals(logicalName)) {
- return new java.sql.Timestamp(TimeUnit.MICROSECONDS.toMillis((long) value));
- }
- break;
- }
- case UNION:
- if (value instanceof GenericData.Record) {
- final GenericData.Record avroRecord = (GenericData.Record) value;
- return normalizeValue(value, avroRecord.getSchema());
- }
- break;
- case RECORD:
- final GenericData.Record record = (GenericData.Record) value;
- final Schema recordSchema = record.getSchema();
- final List<Field> recordFields = recordSchema.getFields();
- final Map<String, Object> values = new HashMap<>(recordFields.size());
- for (final Field field : recordFields) {
- final Object avroFieldValue = record.get(field.name());
- final Object fieldValue = normalizeValue(avroFieldValue, field.schema());
- values.put(field.name(), fieldValue);
- }
- final RecordSchema childSchema = AvroTypeUtil.createSchema(recordSchema);
- return new MapRecord(childSchema, values);
- case BYTES:
- final ByteBuffer bb = (ByteBuffer) value;
- return AvroTypeUtil.convertByteArray(bb.array());
- case FIXED:
- final GenericFixed fixed = (GenericFixed) value;
- return AvroTypeUtil.convertByteArray(fixed.bytes());
- case ENUM:
- return value.toString();
- case NULL:
- return null;
- case STRING:
- return value.toString();
- case ARRAY:
- final Array<?> array = (Array<?>) value;
- final Object[] valueArray = new Object[array.size()];
- for (int i = 0; i < array.size(); i++) {
- final Schema elementSchema = avroSchema.getElementType();
- valueArray[i] = normalizeValue(array.get(i), elementSchema);
- }
- return valueArray;
- case MAP:
- final Map<?, ?> avroMap = (Map<?, ?>) value;
- final Map<String, Object> map = new HashMap<>(avroMap.size());
- for (final Map.Entry<?, ?> entry : avroMap.entrySet()) {
- Object obj = entry.getValue();
- if (obj instanceof Utf8 || obj instanceof CharSequence) {
- obj = obj.toString();
- }
-
- final String key = entry.getKey().toString();
- obj = normalizeValue(obj, avroSchema.getValueType());
-
- map.put(key, obj);
- }
-
- final DataType elementType = AvroTypeUtil.determineDataType(avroSchema.getValueType());
- final List<RecordField> mapFields = new ArrayList<>();
- for (final String key : map.keySet()) {
- mapFields.add(new RecordField(key, elementType));
- }
- final RecordSchema mapSchema = new SimpleRecordSchema(mapFields);
- return new MapRecord(mapSchema, map);
- }
-
- return value;
- }
-
-
-
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java
index 381e978..62e53ea 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java
@@ -33,6 +33,7 @@ import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.schema.access.SchemaAccessUtils;
import org.apache.nifi.schema.access.SchemaField;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.schemaregistry.services.SchemaRegistry;
@@ -60,7 +61,7 @@ public class AvroRecordSetWriter extends SchemaRegistryRecordSetWriter implement
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors());
- properties.add(SCHEMA_ACCESS_STRATEGY);
+ properties.add(SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY);
properties.add(SCHEMA_REGISTRY);
return properties;
}