You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2018/04/09 18:53:47 UTC
nifi git commit: NIFI-4857: Support String<->byte[] conversion for
records This closes #2570.
Repository: nifi
Updated Branches:
refs/heads/master 5f16f48a2 -> b29304df7
NIFI-4857: Support String<->byte[] conversion for records
This closes #2570.
Signed-off-by: Mark Payne <ma...@hotmail.com>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/b29304df
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/b29304df
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/b29304df
Branch: refs/heads/master
Commit: b29304df79e78c5687b0c9411d5fab6cb93e6541
Parents: 5f16f48
Author: Matthew Burgess <ma...@apache.org>
Authored: Tue Mar 20 22:02:47 2018 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Mon Apr 9 14:53:21 2018 -0400
----------------------------------------------------------------------
.../nifi/record/path/functions/ToBytes.java | 76 +++++++++++++
.../nifi/record/path/functions/ToString.java | 83 ++++++++++++++
.../record/path/paths/RecordPathCompiler.java | 10 ++
.../apache/nifi/record/path/TestRecordPath.java | 67 ++++++++++++
.../nifi/serialization/record/MapRecord.java | 3 +-
.../record/util/DataTypeUtils.java | 108 +++++++++++++++++--
.../serialization/record/TestDataTypeUtils.java | 34 +++++-
.../src/main/asciidoc/record-path-guide.adoc | 57 ++++++++++
.../java/org/apache/nifi/avro/AvroTypeUtil.java | 47 +++++---
.../org/apache/nifi/avro/TestAvroTypeUtil.java | 27 ++++-
10 files changed, 483 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/b29304df/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/ToBytes.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/ToBytes.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/ToBytes.java
new file mode 100644
index 0000000..47275cf
--- /dev/null
+++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/ToBytes.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.record.path.functions;
+
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPathEvaluationContext;
+import org.apache.nifi.record.path.StandardFieldValue;
+import org.apache.nifi.record.path.paths.RecordPathSegment;
+import org.apache.nifi.record.path.util.RecordPathUtils;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.stream.Stream;
+
+/**
+ * Record path function {@code toBytes(<path>, <charset>)}: encodes each selected String value into a
+ * {@code byte[]} using the named character set. Selected fields whose value is null are skipped.
+ */
+public class ToBytes extends RecordPathSegment {
+
+    private final RecordPathSegment recordPath;
+    private final RecordPathSegment charsetSegment;
+
+    public ToBytes(final RecordPathSegment recordPath, final RecordPathSegment charsetSegment, final boolean absolute) {
+        super("toBytes", null, absolute);
+        this.recordPath = recordPath;
+        this.charsetSegment = charsetSegment;
+    }
+
+    @Override
+    public Stream<FieldValue> evaluate(final RecordPathEvaluationContext context) {
+        final Stream<FieldValue> fieldValues = recordPath.evaluate(context);
+        return fieldValues.filter(fv -> fv.getValue() != null)
+            .map(fv -> {
+                final Object value = fv.getValue();
+                if (!(value instanceof String)) {
+                    throw new IllegalArgumentException("Argument supplied to toBytes must be a String");
+                }
+
+                final Charset charset = getCharset(this.charsetSegment, context);
+                final byte[] bytesValue = ((String) value).getBytes(charset);
+                return new StandardFieldValue(bytesValue, fv.getField(), fv.getParent().orElse(null));
+            });
+    }
+
+    /**
+     * Resolves the character set named by {@code charsetSegment}. UTF-8 is used when there is no segment or it
+     * evaluates to a null or empty name, so {@link String#getBytes(Charset)} is never handed a null charset.
+     * An illegal or unsupported name propagates the exception thrown by {@link DataTypeUtils#getCharset(String)}.
+     */
+    private Charset getCharset(final RecordPathSegment charsetSegment, final RecordPathEvaluationContext context) {
+        if (charsetSegment == null) {
+            return StandardCharsets.UTF_8;
+        }
+
+        final String charsetString = RecordPathUtils.getFirstStringValue(charsetSegment, context);
+        if (charsetString == null || charsetString.isEmpty()) {
+            return StandardCharsets.UTF_8;
+        }
+
+        return DataTypeUtils.getCharset(charsetString);
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/b29304df/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/ToString.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/ToString.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/ToString.java
new file mode 100644
index 0000000..7204ffc
--- /dev/null
+++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/ToString.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.record.path.functions;
+
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPathEvaluationContext;
+import org.apache.nifi.record.path.StandardFieldValue;
+import org.apache.nifi.record.path.paths.RecordPathSegment;
+import org.apache.nifi.record.path.util.RecordPathUtils;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.stream.Stream;
+
+/**
+ * Record path function {@code toString(<path>, <charset>)}: converts each selected value to a String.
+ * Byte content (a {@code byte[]}, or an object array whose elements are all {@link Byte}s) is decoded with
+ * the named character set. Selected fields whose value is null are skipped.
+ */
+public class ToString extends RecordPathSegment {
+
+    private final RecordPathSegment recordPath;
+    private final RecordPathSegment charsetSegment;
+
+    public ToString(final RecordPathSegment recordPath, final RecordPathSegment charsetSegment, final boolean absolute) {
+        super("toString", null, absolute);
+        this.recordPath = recordPath;
+        this.charsetSegment = charsetSegment;
+    }
+
+    @Override
+    public Stream<FieldValue> evaluate(final RecordPathEvaluationContext context) {
+        final Stream<FieldValue> fieldValues = recordPath.evaluate(context);
+        return fieldValues.filter(fv -> fv.getValue() != null)
+            .map(fv -> {
+                final Charset charset = getCharset(this.charsetSegment, context);
+                final String stringValue = convertToString(fv.getValue(), charset);
+                return new StandardFieldValue(stringValue, fv.getField(), fv.getParent().orElse(null));
+            });
+    }
+
+    /**
+     * Converts a single non-null value to a String, decoding byte content with {@code charset}.
+     * An object array that does not hold only Bytes (e.g. a String[]) is rendered with
+     * {@link Arrays#toString(Object[])} rather than failing on a Byte cast; an empty array yields "".
+     */
+    private static String convertToString(final Object value, final Charset charset) {
+        if (value instanceof byte[]) {
+            return DataTypeUtils.toString(value, (String) null, charset);
+        }
+
+        if (value instanceof Object[]) {
+            final Object[] array = (Object[]) value;
+            if (array.length == 0) {
+                return ""; // Empty array = empty string
+            }
+
+            final byte[] bytes = new byte[array.length];
+            for (int i = 0; i < array.length; i++) {
+                if (!(array[i] instanceof Byte)) {
+                    return Arrays.toString(array);
+                }
+                bytes[i] = (Byte) array[i];
+            }
+            return new String(bytes, charset);
+        }
+
+        return value.toString();
+    }
+
+    /**
+     * Resolves the character set named by {@code charsetSegment}. UTF-8 is used when there is no segment or it
+     * evaluates to a null or empty name, so decoding is never handed a null charset.
+     * An illegal or unsupported name propagates the exception thrown by {@link DataTypeUtils#getCharset(String)}.
+     */
+    private Charset getCharset(final RecordPathSegment charsetSegment, final RecordPathEvaluationContext context) {
+        if (charsetSegment == null) {
+            return StandardCharsets.UTF_8;
+        }
+
+        final String charsetString = RecordPathUtils.getFirstStringValue(charsetSegment, context);
+        if (charsetString == null || charsetString.isEmpty()) {
+            return StandardCharsets.UTF_8;
+        }
+
+        return DataTypeUtils.getCharset(charsetString);
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/b29304df/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathCompiler.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathCompiler.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathCompiler.java
index cfb5c06..9a2821a 100644
--- a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathCompiler.java
+++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathCompiler.java
@@ -75,7 +75,9 @@ import org.apache.nifi.record.path.functions.SubstringAfter;
import org.apache.nifi.record.path.functions.SubstringAfterLast;
import org.apache.nifi.record.path.functions.SubstringBefore;
import org.apache.nifi.record.path.functions.SubstringBeforeLast;
+import org.apache.nifi.record.path.functions.ToBytes;
import org.apache.nifi.record.path.functions.ToDate;
+import org.apache.nifi.record.path.functions.ToString;
public class RecordPathCompiler {
@@ -250,6 +252,14 @@ public class RecordPathCompiler {
final RecordPathSegment[] args = getArgPaths(argumentListTree, 2, functionName, absolute);
return new ToDate(args[0], args[1], absolute);
}
+ case "toString": {
+ final RecordPathSegment[] args = getArgPaths(argumentListTree, 2, functionName, absolute);
+ return new ToString(args[0], args[1], absolute);
+ }
+ case "toBytes": {
+ final RecordPathSegment[] args = getArgPaths(argumentListTree, 2, functionName, absolute);
+ return new ToBytes(args[0], args[1], absolute);
+ }
case "format": {
final RecordPathSegment[] args = getArgPaths(argumentListTree, 2, functionName, absolute);
return new Format(args[0], args[1], absolute);
http://git-wip-us.apache.org/repos/asf/nifi/blob/b29304df/nifi-commons/nifi-record-path/src/test/java/org/apache/nifi/record/path/TestRecordPath.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record-path/src/test/java/org/apache/nifi/record/path/TestRecordPath.java b/nifi-commons/nifi-record-path/src/test/java/org/apache/nifi/record/path/TestRecordPath.java
index 67e390a..dbf5fba 100644
--- a/nifi-commons/nifi-record-path/src/test/java/org/apache/nifi/record/path/TestRecordPath.java
+++ b/nifi-commons/nifi-record-path/src/test/java/org/apache/nifi/record/path/TestRecordPath.java
@@ -21,6 +21,8 @@ import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import java.nio.charset.IllegalCharsetNameException;
+import java.nio.charset.StandardCharsets;
import java.sql.Date;
import java.text.DateFormat;
import java.text.ParseException;
@@ -1210,6 +1212,71 @@ public class TestRecordPath {
assertEquals("John Doe", RecordPath.compile("format(/name, 'yyyy-MM')").evaluate(record).getSelectedFields().findFirst().get().getValue());
}
+    /**
+     * toString() decodes a byte[] field using the charset named in the record path.
+     */
+    @Test
+    public void testToString() {
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("id", RecordFieldType.INT.getDataType()));
+        fields.add(new RecordField("bytes", RecordFieldType.CHOICE.getChoiceDataType(RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType()))));
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        final byte[] encoded = "Hello World!".getBytes(StandardCharsets.UTF_16);
+        final Map<String, Object> values = new HashMap<>();
+        values.put("id", 48);
+        values.put("bytes", encoded);
+        final Record record = new MapRecord(schema, values);
+
+        final RecordPath path = RecordPath.compile("toString(/bytes, \"UTF-16\")");
+        final Object decoded = path.evaluate(record).getSelectedFields().findFirst().get().getValue();
+        assertEquals("Hello World!", decoded);
+    }
+
+    /**
+     * toString() with an illegal charset name fails with IllegalCharsetNameException.
+     */
+    @Test(expected = IllegalCharsetNameException.class)
+    public void testToStringBadCharset() {
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("id", RecordFieldType.INT.getDataType()));
+        fields.add(new RecordField("bytes", RecordFieldType.CHOICE.getChoiceDataType(RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType()))));
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        final byte[] encoded = "Hello World!".getBytes(StandardCharsets.UTF_16);
+        final Map<String, Object> values = new HashMap<>();
+        values.put("id", 48);
+        values.put("bytes", encoded);
+        final Record record = new MapRecord(schema, values);
+
+        final RecordPath path = RecordPath.compile("toString(/bytes, \"NOT A REAL CHARSET\")");
+        // The bad charset name is rejected when the selected value is evaluated
+        path.evaluate(record).getSelectedFields().findFirst().get().getValue();
+    }
+
+    /**
+     * toBytes() encodes a String field using the charset named in the record path.
+     */
+    @Test
+    public void testToBytes() {
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("id", RecordFieldType.INT.getDataType()));
+        fields.add(new RecordField("s", RecordFieldType.STRING.getDataType()));
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        final Map<String, Object> values = new HashMap<>();
+        values.put("id", 48);
+        values.put("s", "Hello World!");
+        final Record record = new MapRecord(schema, values);
+
+        // Compare against the JDK's own encoding with the same charset
+        final byte[] expected = "Hello World!".getBytes(StandardCharsets.UTF_16LE);
+        final RecordPath path = RecordPath.compile("toBytes(/s, \"UTF-16LE\")");
+        final byte[] actual = (byte[]) path.evaluate(record).getSelectedFields().findFirst().get().getValue();
+        assertArrayEquals(expected, actual);
+    }
+
+    /**
+     * toBytes() with an illegal charset name fails with IllegalCharsetNameException.
+     */
+    @Test(expected = IllegalCharsetNameException.class)
+    public void testToBytesBadCharset() {
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("id", RecordFieldType.INT.getDataType()));
+        fields.add(new RecordField("s", RecordFieldType.STRING.getDataType()));
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        final Map<String, Object> values = new HashMap<>();
+        values.put("id", 48);
+        values.put("s", "Hello World!");
+        final Record record = new MapRecord(schema, values);
+
+        final RecordPath path = RecordPath.compile("toBytes(/s, \"NOT A REAL CHARSET\")");
+        // The bad charset name is rejected when the selected value is evaluated
+        path.evaluate(record).getSelectedFields().findFirst().get().getValue();
+    }
+
private List<RecordField> getDefaultFields() {
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("id", RecordFieldType.INT.getDataType()));
http://git-wip-us.apache.org/repos/asf/nifi/blob/b29304df/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java
index 0335bd2..57b7ac3 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java
@@ -17,6 +17,7 @@
package org.apache.nifi.serialization.record;
+import java.nio.charset.StandardCharsets;
import java.text.DateFormat;
import java.util.Date;
import java.util.HashMap;
@@ -268,7 +269,7 @@ public class MapRecord implements Record {
@Override
public Object[] getAsArray(final String fieldName) {
- return DataTypeUtils.toArray(getValue(fieldName), fieldName);
+ return DataTypeUtils.toArray(getValue(fieldName), fieldName, null, StandardCharsets.UTF_8);
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/b29304df/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
index 145c17c..477b02a 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
@@ -18,6 +18,8 @@
package org.apache.nifi.serialization.record.util;
import java.math.BigInteger;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
@@ -84,7 +86,11 @@ public class DataTypeUtils {
private static final Supplier<DateFormat> DEFAULT_TIMESTAMP_FORMAT = () -> getDateFormat(RecordFieldType.TIMESTAMP.getDefaultFormat());
public static Object convertType(final Object value, final DataType dataType, final String fieldName) {
- return convertType(value, dataType, DEFAULT_DATE_FORMAT, DEFAULT_TIME_FORMAT, DEFAULT_TIMESTAMP_FORMAT, fieldName);
+ return convertType(value, dataType, fieldName, StandardCharsets.UTF_8);
+ }
+
+ public static Object convertType(final Object value, final DataType dataType, final String fieldName, final Charset charset) {
+ return convertType(value, dataType, DEFAULT_DATE_FORMAT, DEFAULT_TIME_FORMAT, DEFAULT_TIMESTAMP_FORMAT, fieldName, charset);
}
public static DateFormat getDateFormat(final RecordFieldType fieldType, final Supplier<DateFormat> dateFormat,
@@ -102,7 +108,12 @@ public class DataTypeUtils {
}
public static Object convertType(final Object value, final DataType dataType, final Supplier<DateFormat> dateFormat, final Supplier<DateFormat> timeFormat,
- final Supplier<DateFormat> timestampFormat, final String fieldName) {
+ final Supplier<DateFormat> timestampFormat, final String fieldName) {
+ return convertType(value, dataType, dateFormat, timeFormat, timestampFormat, fieldName, StandardCharsets.UTF_8);
+ }
+
+ public static Object convertType(final Object value, final DataType dataType, final Supplier<DateFormat> dateFormat, final Supplier<DateFormat> timeFormat,
+ final Supplier<DateFormat> timestampFormat, final String fieldName, final Charset charset) {
if (value == null) {
return null;
@@ -130,19 +141,19 @@ public class DataTypeUtils {
case SHORT:
return toShort(value, fieldName);
case STRING:
- return toString(value, () -> getDateFormat(dataType.getFieldType(), dateFormat, timeFormat, timestampFormat));
+ return toString(value, () -> getDateFormat(dataType.getFieldType(), dateFormat, timeFormat, timestampFormat), charset);
case TIME:
return toTime(value, timeFormat, fieldName);
case TIMESTAMP:
return toTimestamp(value, timestampFormat, fieldName);
case ARRAY:
- return toArray(value, fieldName);
+ return toArray(value, fieldName, ((ArrayDataType)dataType).getElementType(), charset);
case MAP:
return toMap(value, fieldName);
case RECORD:
final RecordDataType recordType = (RecordDataType) dataType;
final RecordSchema childSchema = recordType.getChildSchema();
- return toRecord(value, childSchema, fieldName);
+ return toRecord(value, childSchema, fieldName, charset);
case CHOICE: {
final ChoiceDataType choiceDataType = (ChoiceDataType) dataType;
final DataType chosenDataType = chooseDataType(value, choiceDataType);
@@ -151,7 +162,7 @@ public class DataTypeUtils {
+ " for field " + fieldName + " to any of the following available Sub-Types for a Choice: " + choiceDataType.getPossibleSubTypes());
}
- return convertType(value, chosenDataType, fieldName);
+ return convertType(value, chosenDataType, fieldName, charset);
}
}
@@ -162,7 +173,7 @@ public class DataTypeUtils {
public static boolean isCompatibleDataType(final Object value, final DataType dataType) {
switch (dataType.getFieldType()) {
case ARRAY:
- return isArrayTypeCompatible(value);
+ return isArrayTypeCompatible(value, ((ArrayDataType) dataType).getElementType());
case BIGINT:
return isBigIntTypeCompatible(value);
case BOOLEAN:
@@ -217,6 +228,10 @@ public class DataTypeUtils {
}
public static Record toRecord(final Object value, final RecordSchema recordSchema, final String fieldName) {
+ return toRecord(value, recordSchema, fieldName, StandardCharsets.UTF_8);
+ }
+
+ public static Record toRecord(final Object value, final RecordSchema recordSchema, final String fieldName, final Charset charset) {
if (value == null) {
return null;
}
@@ -247,7 +262,7 @@ public class DataTypeUtils {
}
final Object rawValue = entry.getValue();
- final Object coercedValue = convertType(rawValue, desiredTypeOption.get(), fieldName);
+ final Object coercedValue = convertType(rawValue, desiredTypeOption.get(), fieldName, charset);
coercedValues.put(key, coercedValue);
}
@@ -261,7 +276,11 @@ public class DataTypeUtils {
return value != null && value instanceof Record;
}
- public static Object[] toArray(final Object value, final String fieldName) {
+ public static Object[] toArray(final Object value, final String fieldName, final DataType elementDataType) {
+ return toArray(value, fieldName, elementDataType, StandardCharsets.UTF_8);
+ }
+
+ public static Object[] toArray(final Object value, final String fieldName, final DataType elementDataType, final Charset charset) {
if (value == null) {
return null;
}
@@ -270,11 +289,32 @@ public class DataTypeUtils {
return (Object[]) value;
}
+ if (value instanceof String && RecordFieldType.BYTE.getDataType().equals(elementDataType)) {
+ byte[] src = ((String) value).getBytes(charset);
+ Byte[] dest = new Byte[src.length];
+ for (int i = 0; i < src.length; i++) {
+ dest[i] = src[i];
+ }
+ return dest;
+ }
+
+ if (value instanceof byte[]) {
+ byte[] src = (byte[]) value;
+ Byte[] dest = new Byte[src.length];
+ for (int i = 0; i < src.length; i++) {
+ dest[i] = src[i];
+ }
+ return dest;
+ }
+
throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Object Array for field " + fieldName);
}
- public static boolean isArrayTypeCompatible(final Object value) {
- return value != null && value instanceof Object[];
+    /**
+     * Determines whether {@code value} can be treated as an array whose elements are of {@code elementDataType}.
+     * Any Object[] qualifies; a String (to be encoded) or a primitive byte[] qualifies only when the element
+     * type is BYTE, mirroring the String and byte[] branches of toArray.
+     */
+    public static boolean isArrayTypeCompatible(final Object value, final DataType elementDataType) {
+        if (value == null) {
+            return false;
+        }
+        if (value instanceof Object[]) {
+            return true;
+        }
+        final boolean byteElements = RecordFieldType.BYTE.getDataType().equals(elementDataType);
+        return byteElements && (value instanceof String || value instanceof byte[]);
}
@SuppressWarnings("unchecked")
@@ -416,6 +456,10 @@ public class DataTypeUtils {
public static String toString(final Object value, final Supplier<DateFormat> format) {
+ return toString(value, format, StandardCharsets.UTF_8);
+ }
+
+ public static String toString(final Object value, final Supplier<DateFormat> format, final Charset charset) {
if (value == null) {
return null;
}
@@ -432,6 +476,32 @@ public class DataTypeUtils {
return formatDate((java.util.Date) value, format);
}
+        if (value instanceof byte[]) {
+            return new String((byte[]) value, charset);
+        }
+
+        // Boxed byte arrays (a Byte[], or an Object[] whose elements are all Bytes) are decoded with the charset;
+        // any other Object[] (e.g. a String[]) falls through to value.toString() instead of failing on a cast.
+        if (value instanceof Object[]) {
+            final Object[] array = (Object[]) value;
+            if (array.length == 0) {
+                return ""; // Empty array = empty string
+            }
+
+            final byte[] bytes = new byte[array.length];
+            boolean allBytes = true;
+            for (int i = 0; i < array.length && allBytes; i++) {
+                if (array[i] instanceof Byte) {
+                    bytes[i] = (Byte) array[i];
+                } else {
+                    allBytes = false;
+                }
+            }
+            if (allBytes) {
+                return new String(bytes, charset);
+            }
+        }
+
return value.toString();
}
@@ -445,6 +515,10 @@ public class DataTypeUtils {
}
public static String toString(final Object value, final String format) {
+ return toString(value, format, StandardCharsets.UTF_8);
+ }
+
+ public static String toString(final Object value, final String format, final Charset charset) {
if (value == null) {
return null;
}
@@ -474,6 +548,10 @@ public class DataTypeUtils {
return Arrays.toString((Object[]) value);
}
+ if (value instanceof byte[]) {
+ return new String((byte[]) value, charset);
+ }
+
return value.toString();
}
@@ -1100,4 +1178,12 @@ public class DataTypeUtils {
return true;
}
+
+    /**
+     * Looks up a character set by name, defaulting to UTF-8 when no name is supplied.
+     *
+     * @param charsetName the character set name, or {@code null} to select UTF-8
+     * @return {@link StandardCharsets#UTF_8} for a null name, otherwise the result of {@link Charset#forName(String)}
+     * @throws java.nio.charset.IllegalCharsetNameException if the name is not a legal charset name
+     * @throws java.nio.charset.UnsupportedCharsetException if the named charset is not available in this JVM
+     */
+    public static Charset getCharset(final String charsetName) {
+        return charsetName == null ? StandardCharsets.UTF_8 : Charset.forName(charsetName);
+    }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/b29304df/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestDataTypeUtils.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestDataTypeUtils.java b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestDataTypeUtils.java
index a239ea7..2c068c2 100644
--- a/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestDataTypeUtils.java
+++ b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestDataTypeUtils.java
@@ -21,6 +21,7 @@ import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
import org.junit.Test;
+import java.nio.charset.StandardCharsets;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.HashMap;
@@ -124,8 +125,8 @@ public class TestDataTypeUtils {
complexValueRecord2.put("a",new String[] {"hello","world!"});
complexValueRecord2.put("b",new String[] {"5","4","3"});
- complexValues.put("complex1", DataTypeUtils.toRecord(complexValueRecord1, nestedRecordSchema, "complex1"));
- complexValues.put("complex2", DataTypeUtils.toRecord(complexValueRecord2, nestedRecordSchema, "complex2"));
+ complexValues.put("complex1", DataTypeUtils.toRecord(complexValueRecord1, nestedRecordSchema, "complex1", StandardCharsets.UTF_8));
+ complexValues.put("complex2", DataTypeUtils.toRecord(complexValueRecord2, nestedRecordSchema, "complex2", StandardCharsets.UTF_8));
values.put("complex", complexValues);
final Record inputRecord = new MapRecord(schema, values);
@@ -165,4 +166,33 @@ public class TestDataTypeUtils {
assertEquals("4", ((String[])o)[1]);
}
+
+    /**
+     * Converting a String to ARRAY&lt;BYTE&gt; yields a Byte[] holding the String's bytes in the given charset.
+     */
+    @Test
+    public void testStringToBytes() {
+        final Object bytes = DataTypeUtils.convertType("Hello", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType()), null, StandardCharsets.UTF_8);
+        // Check non-null first: instanceof is already false for null, so the reverse order never reaches assertNotNull
+        assertNotNull(bytes);
+        assertTrue(bytes instanceof Byte[]);
+        final Byte[] b = (Byte[]) bytes;
+        final byte[] expected = "Hello".getBytes(StandardCharsets.UTF_8);
+        assertEquals("Conversion from String to byte[] produced the wrong length", expected.length, b.length);
+        for (int i = 0; i < expected.length; i++) {
+            assertEquals("Conversion from String to byte[] failed at index " + i, (long) expected[i], (long) b[i]);
+        }
+    }
+
+    /**
+     * Converting a byte[] to STRING decodes it with the supplied charset.
+     */
+    @Test
+    public void testBytesToString() {
+        final byte[] utf16Bytes = "Hello".getBytes(StandardCharsets.UTF_16);
+        final Object converted = DataTypeUtils.convertType(utf16Bytes, RecordFieldType.STRING.getDataType(), null, StandardCharsets.UTF_16);
+        assertNotNull(converted);
+        assertTrue(converted instanceof String);
+        assertEquals("Conversion from byte[] to String failed", "Hello", converted);
+    }
+
+    /**
+     * Converting a primitive byte[] to ARRAY&lt;BYTE&gt; yields an equivalent boxed Byte[].
+     */
+    @Test
+    public void testBytesToBytes() {
+        final byte[] expected = "Hello".getBytes(StandardCharsets.UTF_16);
+        final Object b = DataTypeUtils.convertType(expected, RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType()), null, StandardCharsets.UTF_16);
+        assertNotNull(b);
+        assertTrue(b instanceof Byte[]);
+        final Byte[] actual = (Byte[]) b;
+        assertEquals("Conversion from byte[] to Byte[] produced the wrong length", expected.length, actual.length);
+        for (int i = 0; i < expected.length; i++) {
+            assertEquals("Conversion from byte[] to Byte[] failed at index " + i, (Object) expected[i], actual[i]);
+        }
+    }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/b29304df/nifi-docs/src/main/asciidoc/record-path-guide.adoc
----------------------------------------------------------------------
diff --git a/nifi-docs/src/main/asciidoc/record-path-guide.adoc b/nifi-docs/src/main/asciidoc/record-path-guide.adoc
index f011c52..8de98ee 100644
--- a/nifi-docs/src/main/asciidoc/record-path-guide.adoc
+++ b/nifi-docs/src/main/asciidoc/record-path-guide.adoc
@@ -502,6 +502,63 @@ The following record path would parse the eventDate field into a Date:
`toDate( /eventDate, "yyyy-MM-dd'T'HH:mm:ss'Z'")`
+=== toString
+
+Converts a value to a String, using the given character set if the input type is "bytes". For example,
+given a schema such as:
+
+----
+{
+ "type": "record",
+ "name": "events",
+ "fields": [
+ { "name": "name", "type": "string" },
+ { "name": "bytes", "type" : "bytes"}
+ ]
+}
+----
+
+and a record such as:
+
+----
+{
+ "name" : "My Event",
+ "bytes" : "Hello World!"
+}
+----
+
+The following record path would decode the bytes field into a String using the UTF-8 character set:
+
+`toString( /bytes, "UTF-8")`
+
+=== toBytes
+
+Converts a String to a byte array (byte[]), encoding it with the given character set. For example, given a schema such as:
+
+----
+{
+ "type": "record",
+ "name": "events",
+ "fields": [
+ { "name": "name", "type": "string" },
+ { "name": "s", "type" : "string"}
+ ]
+}
+----
+
+and a record such as:
+
+----
+{
+ "name" : "My Event",
+ "s" : "Hello World!"
+}
+----
+
+The following record path would convert the String field into a byte array using UTF-16 encoding:
+
+`toBytes( /s, "UTF-16")`
+
=== format
Converts a Date to a String in the given format.
http://git-wip-us.apache.org/repos/asf/nifi/blob/b29304df/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
index a01e03d..411ca68 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
@@ -20,6 +20,8 @@ package org.apache.nifi.avro;
import java.io.IOException;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
import java.sql.Time;
import java.sql.Timestamp;
import java.time.Duration;
@@ -460,6 +462,10 @@ public class AvroTypeUtil {
}
public static GenericRecord createAvroRecord(final Record record, final Schema avroSchema) throws IOException {
+ return createAvroRecord(record, avroSchema, StandardCharsets.UTF_8);
+ }
+
+ public static GenericRecord createAvroRecord(final Record record, final Schema avroSchema, final Charset charset) throws IOException {
final GenericRecord rec = new GenericData.Record(avroSchema);
final RecordSchema recordSchema = record.getSchema();
@@ -473,7 +479,7 @@ public class AvroTypeUtil {
continue;
}
- final Object converted = convertToAvroObject(rawValue, field.schema(), fieldName);
+ final Object converted = convertToAvroObject(rawValue, field.schema(), fieldName, charset);
rec.put(fieldName, converted);
}
@@ -490,11 +496,19 @@ public class AvroTypeUtil {
}
/**
- * Convert a raw value to an Avro object to serialize in Avro type system.
+ * Convert a raw value to an Avro object to serialize in Avro type system, using UTF-8 whenever a character set is needed.
* The counter-part method which reads an Avro object back to a raw value is {@link #normalizeValue(Object, Schema, String)}.
*/
public static Object convertToAvroObject(final Object rawValue, final Schema fieldSchema) {
- return convertToAvroObject(rawValue, fieldSchema, fieldSchema.getName());
+ return convertToAvroObject(rawValue, fieldSchema, StandardCharsets.UTF_8);
+ }
+
+ /**
+ * Convert a raw value to an Avro object to serialize in Avro type system, using the provided character set when necessary.
+ * The counter-part method which reads an Avro object back to a raw value is {@link #normalizeValue(Object, Schema, String)}.
+ */
+ public static Object convertToAvroObject(final Object rawValue, final Schema fieldSchema, final Charset charset) {
+ return convertToAvroObject(rawValue, fieldSchema, fieldSchema.getName(), charset);
}
/**
@@ -512,7 +526,7 @@ public class AvroTypeUtil {
recordFields.add(new RecordField(fieldName, dataType, field.aliases(), nullable));
} else {
Object defaultValue = field.defaultVal();
- if (fieldSchema.getType() == Schema.Type.ARRAY && !DataTypeUtils.isArrayTypeCompatible(defaultValue)) {
+ if (fieldSchema.getType() == Schema.Type.ARRAY && !DataTypeUtils.isArrayTypeCompatible(defaultValue, ((ArrayDataType) dataType).getElementType())) {
defaultValue = defaultValue instanceof List ? ((List<?>) defaultValue).toArray() : new Object[0];
}
recordFields.add(new RecordField(fieldName, dataType, defaultValue, field.aliases(), nullable));
@@ -526,7 +540,7 @@ public class AvroTypeUtil {
}
@SuppressWarnings("unchecked")
- private static Object convertToAvroObject(final Object rawValue, final Schema fieldSchema, final String fieldName) {
+ private static Object convertToAvroObject(final Object rawValue, final Schema fieldSchema, final String fieldName, final Charset charset) {
if (rawValue == null) {
return null;
}
@@ -609,6 +623,9 @@ public class AvroTypeUtil {
if (rawValue instanceof byte[]) {
return ByteBuffer.wrap((byte[]) rawValue);
}
+ if (rawValue instanceof String) {
+ return ByteBuffer.wrap(((String) rawValue).getBytes(charset));
+ }
if (rawValue instanceof Object[]) {
return AvroTypeUtil.convertByteArray((Object[]) rawValue);
} else {
@@ -630,7 +647,7 @@ public class AvroTypeUtil {
final Map<String, Object> objectMap = (Map<String, Object>) rawValue;
final Map<String, Object> map = new HashMap<>(objectMap.size());
for (final String s : objectMap.keySet()) {
- final Object converted = convertToAvroObject(objectMap.get(s), fieldSchema.getValueType(), fieldName + "[" + s + "]");
+ final Object converted = convertToAvroObject(objectMap.get(s), fieldSchema.getValueType(), fieldName + "[" + s + "]", charset);
map.put(s, converted);
}
return map;
@@ -650,18 +667,18 @@ public class AvroTypeUtil {
continue;
}
- final Object converted = convertToAvroObject(recordFieldValue, field.schema(), fieldName + "/" + recordFieldName);
+ final Object converted = convertToAvroObject(recordFieldValue, field.schema(), fieldName + "/" + recordFieldName, charset);
avroRecord.put(recordFieldName, converted);
}
return avroRecord;
case UNION:
- return convertUnionFieldValue(rawValue, fieldSchema, schema -> convertToAvroObject(rawValue, schema, fieldName), fieldName);
+ return convertUnionFieldValue(rawValue, fieldSchema, schema -> convertToAvroObject(rawValue, schema, fieldName, charset), fieldName);
case ARRAY:
final Object[] objectArray = (Object[]) rawValue;
final List<Object> list = new ArrayList<>(objectArray.length);
int i = 0;
for (final Object o : objectArray) {
- final Object converted = convertToAvroObject(o, fieldSchema.getElementType(), fieldName + "[" + i + "]");
+ final Object converted = convertToAvroObject(o, fieldSchema.getElementType(), fieldName + "[" + i + "]", charset);
list.add(converted);
i++;
}
@@ -677,13 +694,17 @@ public class AvroTypeUtil {
case ENUM:
return new GenericData.EnumSymbol(fieldSchema, rawValue);
case STRING:
- return DataTypeUtils.toString(rawValue, (String) null);
+ return DataTypeUtils.toString(rawValue, (String) null, charset);
}
return rawValue;
}
public static Map<String, Object> convertAvroRecordToMap(final GenericRecord avroRecord, final RecordSchema recordSchema) {
+ return convertAvroRecordToMap(avroRecord, recordSchema, StandardCharsets.UTF_8);
+ }
+
+ public static Map<String, Object> convertAvroRecordToMap(final GenericRecord avroRecord, final RecordSchema recordSchema, final Charset charset) {
final Map<String, Object> values = new HashMap<>(recordSchema.getFieldCount());
for (final RecordField recordField : recordSchema.getFields()) {
@@ -710,7 +731,7 @@ public class AvroTypeUtil {
final Object rawValue = normalizeValue(value, fieldSchema, fieldName);
final DataType desiredType = recordField.getDataType();
- final Object coercedValue = DataTypeUtils.convertType(rawValue, desiredType, fieldName);
+ final Object coercedValue = DataTypeUtils.convertType(rawValue, desiredType, fieldName, charset);
values.put(fieldName, coercedValue);
} catch (Exception ex) {
@@ -779,7 +800,7 @@ public class AvroTypeUtil {
}
break;
case ARRAY:
- if (value instanceof Array || value instanceof List) {
+ if (value instanceof Array || value instanceof List || value instanceof ByteBuffer) {
return true;
}
break;
@@ -795,7 +816,7 @@ public class AvroTypeUtil {
/**
* Convert an Avro object to a normal Java objects for further processing.
- * The counter-part method which convert a raw value to an Avro object is {@link #convertToAvroObject(Object, Schema, String)}
+ * The counter-part method which convert a raw value to an Avro object is {@link #convertToAvroObject(Object, Schema, String, Charset)}
*/
private static Object normalizeValue(final Object value, final Schema avroSchema, final String fieldName) {
if (value == null) {
http://git-wip-us.apache.org/repos/asf/nifi/blob/b29304df/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/java/org/apache/nifi/avro/TestAvroTypeUtil.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/java/org/apache/nifi/avro/TestAvroTypeUtil.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/java/org/apache/nifi/avro/TestAvroTypeUtil.java
index bf7d255..e4e515b 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/java/org/apache/nifi/avro/TestAvroTypeUtil.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/java/org/apache/nifi/avro/TestAvroTypeUtil.java
@@ -25,6 +25,8 @@ import static org.junit.Assert.fail;
import java.io.IOException;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -325,7 +327,7 @@ public class TestAvroTypeUtil {
try (DataFileStream<GenericRecord> r = new DataFileStream<>(getClass().getResourceAsStream("data.avro"),
new GenericDatumReader<>())) {
GenericRecord n = r.next();
- AvroTypeUtil.convertAvroRecordToMap(n, recordASchema);
+ AvroTypeUtil.convertAvroRecordToMap(n, recordASchema, StandardCharsets.UTF_8);
}
}
@@ -372,7 +374,7 @@ public class TestAvroTypeUtil {
expects.forEach((rawValue, expect) -> {
final Object convertedValue;
try {
- convertedValue = AvroTypeUtil.convertToAvroObject(rawValue, fieldSchema);
+ convertedValue = AvroTypeUtil.convertToAvroObject(rawValue, fieldSchema, StandardCharsets.UTF_8);
} catch (Exception e) {
if (expect.equals(e.getClass().getCanonicalName())) {
// Expected behavior.
@@ -394,4 +396,25 @@ public class TestAvroTypeUtil {
}
+ @Test
+ public void testStringToBytesConversion() {
+ Object o = AvroTypeUtil.convertToAvroObject("Hello", Schema.create(Type.BYTES), StandardCharsets.UTF_16);
+ assertTrue(o instanceof ByteBuffer);
+ assertEquals("Hello", new String(((ByteBuffer) o).array(), StandardCharsets.UTF_16));
+ }
+
+ @Test
+ public void testStringToNullableBytesConversion() {
+ Object o = AvroTypeUtil.convertToAvroObject("Hello", Schema.createUnion(Schema.create(Type.NULL), Schema.create(Type.BYTES)), StandardCharsets.UTF_16);
+ assertTrue(o instanceof ByteBuffer);
+ assertEquals("Hello", new String(((ByteBuffer) o).array(), StandardCharsets.UTF_16));
+ }
+
+ @Test
+ public void testBytesToStringConversion() {
+ final Charset charset = Charset.forName("UTF_32LE");
+ Object o = AvroTypeUtil.convertToAvroObject("Hello".getBytes(charset), Schema.create(Type.STRING), charset);
+ assertTrue(o instanceof String);
+ assertEquals("Hello", o);
+ }
}