You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2017/04/11 23:33:36 UTC
[13/19] nifi git commit: NIFI-1280 added support for RecordSchema in
SchemaRegistry
http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResult.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResult.java
new file mode 100644
index 0000000..2102813
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResult.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.avro;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.text.DateFormat;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericData.Array;
+import org.apache.avro.generic.GenericData.StringType;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.junit.Test;
+
+public class TestWriteAvroResult {
+
+ @Test
+ public void testLogicalTypes() throws IOException, ParseException {
+ final Schema schema = new Schema.Parser().parse(new File("src/test/resources/avro/logical-types.avsc"));
+ final WriteAvroResult writer = new WriteAvroResult(schema);
+
+ final List<RecordField> fields = new ArrayList<>();
+ fields.add(new RecordField("timeMillis", RecordFieldType.TIME.getDataType()));
+ fields.add(new RecordField("timeMicros", RecordFieldType.TIME.getDataType()));
+ fields.add(new RecordField("timestampMillis", RecordFieldType.TIMESTAMP.getDataType()));
+ fields.add(new RecordField("timestampMicros", RecordFieldType.TIMESTAMP.getDataType()));
+ fields.add(new RecordField("date", RecordFieldType.DATE.getDataType()));
+ final RecordSchema recordSchema = new SimpleRecordSchema(fields);
+
+ final String expectedTime = "2017-04-04 14:20:33.000";
+ final DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
+ df.setTimeZone(TimeZone.getTimeZone("gmt"));
+ final long timeLong = df.parse(expectedTime).getTime();
+
+ final Map<String, Object> values = new HashMap<>();
+ values.put("timeMillis", new Time(timeLong));
+ values.put("timeMicros", new Time(timeLong));
+ values.put("timestampMillis", new Timestamp(timeLong));
+ values.put("timestampMicros", new Timestamp(timeLong));
+ values.put("date", new Date(timeLong));
+ final Record record = new MapRecord(recordSchema, values);
+
+ final byte[] data;
+ try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+ writer.write(RecordSet.of(record.getSchema(), record), baos);
+ data = baos.toByteArray();
+ }
+
+ try (final InputStream in = new ByteArrayInputStream(data)) {
+ final DataFileStream<GenericRecord> dataFileStream = new DataFileStream<>(in, new GenericDatumReader<GenericRecord>());
+ final Schema avroSchema = dataFileStream.getSchema();
+ GenericData.setStringType(avroSchema, StringType.String);
+
+ final GenericRecord avroRecord = dataFileStream.next();
+ final long secondsSinceMidnight = 33 + (20 * 60) + (14 * 60 * 60);
+ final long millisSinceMidnight = secondsSinceMidnight * 1000L;
+
+ assertEquals((int) millisSinceMidnight, avroRecord.get("timeMillis"));
+ assertEquals(millisSinceMidnight * 1000L, avroRecord.get("timeMicros"));
+ assertEquals(timeLong, avroRecord.get("timestampMillis"));
+ assertEquals(timeLong * 1000L, avroRecord.get("timestampMicros"));
+ assertEquals(17260, avroRecord.get("date"));
+ }
+ }
+
+
+ @Test
+ public void testDataTypes() throws IOException {
+ // TODO: Test Enums
+ final Schema schema = new Schema.Parser().parse(new File("src/test/resources/avro/datatypes.avsc"));
+ final WriteAvroResult writer = new WriteAvroResult(schema);
+
+ final List<RecordField> subRecordFields = Collections.singletonList(new RecordField("field1", RecordFieldType.STRING.getDataType()));
+ final RecordSchema subRecordSchema = new SimpleRecordSchema(subRecordFields);
+
+ final List<RecordField> fields = new ArrayList<>();
+ fields.add(new RecordField("string", RecordFieldType.STRING.getDataType()));
+ fields.add(new RecordField("int", RecordFieldType.INT.getDataType()));
+ fields.add(new RecordField("long", RecordFieldType.LONG.getDataType()));
+ fields.add(new RecordField("double", RecordFieldType.DOUBLE.getDataType()));
+ fields.add(new RecordField("float", RecordFieldType.FLOAT.getDataType()));
+ fields.add(new RecordField("boolean", RecordFieldType.BOOLEAN.getDataType()));
+ fields.add(new RecordField("bytes", RecordFieldType.ARRAY.getChoiceDataType(Collections.singletonList(RecordFieldType.BYTE.getDataType()))));
+ fields.add(new RecordField("nullOrLong", RecordFieldType.LONG.getDataType()));
+ fields.add(new RecordField("array", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.INT.getDataType())));
+ fields.add(new RecordField("record", RecordFieldType.RECORD.getRecordDataType(subRecordSchema)));
+ final RecordSchema recordSchema = new SimpleRecordSchema(fields);
+
+ final Record innerRecord = new MapRecord(subRecordSchema, Collections.singletonMap("field1", "hello"));
+
+ final Map<String, Object> values = new HashMap<>();
+ values.put("string", "hello");
+ values.put("int", 8);
+ values.put("long", 42L);
+ values.put("double", 3.14159D);
+ values.put("float", 1.23456F);
+ values.put("boolean", true);
+ values.put("bytes", AvroTypeUtil.convertByteArray("hello".getBytes()));
+ values.put("nullOrLong", null);
+ values.put("array", new Integer[] {1, 2, 3});
+ values.put("record", innerRecord);
+
+ final Record record = new MapRecord(recordSchema, values);
+
+ final byte[] data;
+ try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+ writer.write(RecordSet.of(record.getSchema(), record), baos);
+ data = baos.toByteArray();
+ }
+
+ try (final InputStream in = new ByteArrayInputStream(data)) {
+ final DataFileStream<GenericRecord> dataFileStream = new DataFileStream<>(in, new GenericDatumReader<GenericRecord>());
+ final Schema avroSchema = dataFileStream.getSchema();
+ GenericData.setStringType(avroSchema, StringType.String);
+
+ final GenericRecord avroRecord = dataFileStream.next();
+ assertMatch(record, avroRecord);
+ }
+ }
+
+ private void assertMatch(final Record record, final GenericRecord avroRecord) {
+ for (final String fieldName : record.getSchema().getFieldNames()) {
+ Object avroValue = avroRecord.get(fieldName);
+ final Object recordValue = record.getValue(fieldName);
+
+ if (recordValue instanceof String) {
+ assertNotNull(fieldName + " should not have been null", avroValue);
+ avroValue = avroValue.toString();
+ }
+
+ if (recordValue instanceof Object[] && avroValue instanceof ByteBuffer) {
+ final ByteBuffer bb = (ByteBuffer) avroValue;
+ final Object[] objectArray = (Object[]) recordValue;
+ assertEquals("For field " + fieldName + ", byte buffer remaining should have been " + objectArray.length + " but was " + bb.remaining(),
+ objectArray.length, bb.remaining());
+
+ for (int i = 0; i < objectArray.length; i++) {
+ assertEquals(objectArray[i], bb.get());
+ }
+ } else if (recordValue instanceof Object[]) {
+ assertTrue(fieldName + " should have been instanceof Array", avroValue instanceof Array);
+ final Array<?> avroArray = (Array<?>) avroValue;
+ final Object[] recordArray = (Object[]) recordValue;
+ assertEquals(fieldName + " not equal", recordArray.length, avroArray.size());
+ for (int i = 0; i < recordArray.length; i++) {
+ assertEquals(fieldName + "[" + i + "] not equal", recordArray[i], avroArray.get(i));
+ }
+ } else if (recordValue instanceof byte[]) {
+ final ByteBuffer bb = ByteBuffer.wrap((byte[]) recordValue);
+ assertEquals(fieldName + " not equal", bb, avroValue);
+ } else if (recordValue instanceof Record) {
+ assertMatch((Record) recordValue, (GenericRecord) avroValue);
+ } else {
+ assertEquals(fieldName + " not equal", recordValue, avroValue);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestCSVRecordReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestCSVRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestCSVRecordReader.java
index 1e53d89..cb790f1 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestCSVRecordReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestCSVRecordReader.java
@@ -20,18 +20,24 @@ package org.apache.nifi.csv;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
+import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
-import java.util.Arrays;
-import java.util.HashMap;
+import java.sql.Date;
+import java.util.ArrayList;
+import java.util.Calendar;
import java.util.List;
-import java.util.Map;
+import java.util.TimeZone;
+import org.apache.commons.csv.CSVFormat;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.junit.Assert;
@@ -39,21 +45,50 @@ import org.junit.Test;
import org.mockito.Mockito;
public class TestCSVRecordReader {
- private final DataType stringDataType = RecordFieldType.STRING.getDataType();
private final DataType doubleDataType = RecordFieldType.DOUBLE.getDataType();
- private final DataType timeDataType = RecordFieldType.TIME.getDataType();
+ private final CSVFormat format = CSVFormat.DEFAULT.withFirstRecordAsHeader().withTrim().withQuote('"');
+
+ private List<RecordField> getDefaultFields() {
+ final List<RecordField> fields = new ArrayList<>();
+ for (final String fieldName : new String[] {"id", "name", "balance", "address", "city", "state", "zipCode", "country"}) {
+ fields.add(new RecordField(fieldName, RecordFieldType.STRING.getDataType()));
+ }
+ return fields;
+ }
+
+ @Test
+ public void testDate() throws IOException, MalformedRecordException {
+ final String text = "date\n11/30/1983";
+
+ final List<RecordField> fields = new ArrayList<>();
+ fields.add(new RecordField("date", RecordFieldType.DATE.getDataType()));
+ final RecordSchema schema = new SimpleRecordSchema(fields);
+
+ try (final InputStream bais = new ByteArrayInputStream(text.getBytes());
+ final CSVRecordReader reader = new CSVRecordReader(bais, Mockito.mock(ComponentLog.class), schema, format,
+ "MM/dd/yyyy", RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat())) {
+
+ final Record record = reader.nextRecord();
+ final java.sql.Date date = (Date) record.getValue("date");
+ final Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("gmt"));
+ calendar.setTimeInMillis(date.getTime());
+
+ assertEquals(1983, calendar.get(Calendar.YEAR));
+ assertEquals(10, calendar.get(Calendar.MONTH));
+ assertEquals(30, calendar.get(Calendar.DAY_OF_MONTH));
+ }
+ }
@Test
public void testSimpleParse() throws IOException, MalformedRecordException {
- final Map<String, DataType> overrides = new HashMap<>();
- overrides.put("balance", doubleDataType);
- overrides.put("other", timeDataType);
+ final List<RecordField> fields = getDefaultFields();
+ fields.replaceAll(f -> f.getFieldName().equals("balance") ? new RecordField("balance", doubleDataType) : f);
- try (final InputStream fis = new FileInputStream(new File("src/test/resources/csv/single-bank-account.csv"))) {
- final CSVRecordReader reader = new CSVRecordReader(fis, null, overrides);
+ final RecordSchema schema = new SimpleRecordSchema(fields);
- final RecordSchema schema = reader.getSchema();
- verifyFields(schema);
+ try (final InputStream fis = new FileInputStream(new File("src/test/resources/csv/single-bank-account.csv"))) {
+ final CSVRecordReader reader = new CSVRecordReader(fis, Mockito.mock(ComponentLog.class), schema, format,
+ RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat());
final Object[] record = reader.nextRecord().getValues();
final Object[] expectedValues = new Object[] {"1", "John Doe", 4750.89D, "123 My Street", "My City", "MS", "11111", "USA"};
@@ -65,14 +100,14 @@ public class TestCSVRecordReader {
@Test
public void testMultipleRecords() throws IOException, MalformedRecordException {
- final Map<String, DataType> overrides = new HashMap<>();
- overrides.put("balance", doubleDataType);
+ final List<RecordField> fields = getDefaultFields();
+ fields.replaceAll(f -> f.getFieldName().equals("balance") ? new RecordField("balance", doubleDataType) : f);
- try (final InputStream fis = new FileInputStream(new File("src/test/resources/csv/multi-bank-account.csv"))) {
- final CSVRecordReader reader = new CSVRecordReader(fis, null, overrides);
+ final RecordSchema schema = new SimpleRecordSchema(fields);
- final RecordSchema schema = reader.getSchema();
- verifyFields(schema);
+ try (final InputStream fis = new FileInputStream(new File("src/test/resources/csv/multi-bank-account.csv"))) {
+ final CSVRecordReader reader = new CSVRecordReader(fis, Mockito.mock(ComponentLog.class), schema, format,
+ RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat());
final Object[] firstRecord = reader.nextRecord().getValues();
final Object[] firstExpectedValues = new Object[] {"1", "John Doe", 4750.89D, "123 My Street", "My City", "MS", "11111", "USA"};
@@ -88,14 +123,14 @@ public class TestCSVRecordReader {
@Test
public void testExtraWhiteSpace() throws IOException, MalformedRecordException {
- final Map<String, DataType> overrides = new HashMap<>();
- overrides.put("balance", doubleDataType);
+ final List<RecordField> fields = getDefaultFields();
+ fields.replaceAll(f -> f.getFieldName().equals("balance") ? new RecordField("balance", doubleDataType) : f);
- try (final InputStream fis = new FileInputStream(new File("src/test/resources/csv/extra-white-space.csv"))) {
- final CSVRecordReader reader = new CSVRecordReader(fis, Mockito.mock(ComponentLog.class), overrides);
+ final RecordSchema schema = new SimpleRecordSchema(fields);
- final RecordSchema schema = reader.getSchema();
- verifyFields(schema);
+ try (final InputStream fis = new FileInputStream(new File("src/test/resources/csv/extra-white-space.csv"))) {
+ final CSVRecordReader reader = new CSVRecordReader(fis, Mockito.mock(ComponentLog.class), schema, format,
+ RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat());
final Object[] firstRecord = reader.nextRecord().getValues();
final Object[] firstExpectedValues = new Object[] {"1", "John Doe", 4750.89D, "123 My Street", "My City", "MS", "11111", "USA"};
@@ -108,15 +143,4 @@ public class TestCSVRecordReader {
assertNull(reader.nextRecord());
}
}
-
- private void verifyFields(final RecordSchema schema) {
- final List<String> fieldNames = schema.getFieldNames();
- final List<String> expectedFieldNames = Arrays.asList("id", "name", "balance", "address", "city", "state", "zipCode", "country");
- assertEquals(expectedFieldNames, fieldNames);
-
- final List<DataType> dataTypes = schema.getDataTypes();
- final List<DataType> expectedDataTypes = Arrays.asList(stringDataType, stringDataType, doubleDataType,
- stringDataType, stringDataType, stringDataType, stringDataType, stringDataType);
- assertEquals(expectedDataTypes, dataTypes);
- }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestWriteCSVResult.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestWriteCSVResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestWriteCSVResult.java
index 04f8479..1e8997b 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestWriteCSVResult.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestWriteCSVResult.java
@@ -26,12 +26,16 @@ import java.nio.charset.StandardCharsets;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
+import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.TimeZone;
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.QuoteMode;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.MapRecord;
@@ -47,7 +51,8 @@ public class TestWriteCSVResult {
@Test
public void testDataTypes() throws IOException {
- final WriteCSVResult result = new WriteCSVResult(RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat());
+ final CSVFormat csvFormat = CSVFormat.DEFAULT.withQuoteMode(QuoteMode.ALL).withRecordSeparator("\n");
+ final WriteCSVResult result = new WriteCSVResult(csvFormat, RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat());
final StringBuilder headerBuilder = new StringBuilder();
final List<RecordField> fields = new ArrayList<>();
@@ -57,7 +62,7 @@ public class TestWriteCSVResult {
possibleTypes.add(RecordFieldType.INT.getDataType());
possibleTypes.add(RecordFieldType.LONG.getDataType());
- fields.add(new RecordField(fieldType.name().toLowerCase(), fieldType.getDataType(possibleTypes)));
+ fields.add(new RecordField(fieldType.name().toLowerCase(), fieldType.getChoiceDataType(possibleTypes)));
} else {
fields.add(new RecordField(fieldType.name().toLowerCase(), fieldType.getDataType()));
}
@@ -81,7 +86,7 @@ public class TestWriteCSVResult {
valueMap.put("date", new Date(now));
valueMap.put("time", new Time(now));
valueMap.put("timestamp", new Timestamp(now));
- valueMap.put("object", null);
+ valueMap.put("record", null);
valueMap.put("choice", 48L);
valueMap.put("array", null);
@@ -105,9 +110,9 @@ public class TestWriteCSVResult {
final StringBuilder expectedBuilder = new StringBuilder();
expectedBuilder.append("\"string\",\"true\",\"1\",\"c\",\"8\",\"9\",\"8\",\"8\",\"8.0\",\"8.0\",");
- final String dateValue = new SimpleDateFormat(RecordFieldType.DATE.getDefaultFormat()).format(now);
- final String timeValue = new SimpleDateFormat(RecordFieldType.TIME.getDefaultFormat()).format(now);
- final String timestampValue = new SimpleDateFormat(RecordFieldType.TIMESTAMP.getDefaultFormat()).format(now);
+ final String dateValue = getDateFormat(RecordFieldType.DATE.getDefaultFormat()).format(now);
+ final String timeValue = getDateFormat(RecordFieldType.TIME.getDefaultFormat()).format(now);
+ final String timestampValue = getDateFormat(RecordFieldType.TIMESTAMP.getDefaultFormat()).format(now);
expectedBuilder.append('"').append(dateValue).append('"').append(',');
expectedBuilder.append('"').append(timeValue).append('"').append(',');
@@ -118,4 +123,10 @@ public class TestWriteCSVResult {
assertEquals(expectedValues, values);
}
+ private DateFormat getDateFormat(final String format) {
+ final DateFormat df = new SimpleDateFormat(format);
+ df.setTimeZone(TimeZone.getTimeZone("gmt"));
+ return df;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/grok/TestGrokRecordReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/grok/TestGrokRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/grok/TestGrokRecordReader.java
index 3757ab1..a741ad1 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/grok/TestGrokRecordReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/grok/TestGrokRecordReader.java
@@ -28,7 +28,6 @@ import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
-import java.util.Collections;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.record.Record;
@@ -46,7 +45,7 @@ public class TestGrokRecordReader {
grok.addPatternFromFile("src/main/resources/default-grok-patterns.txt");
grok.compile("%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{GREEDYDATA:message}");
- final GrokRecordReader deserializer = new GrokRecordReader(fis, grok, Collections.emptyMap());
+ final GrokRecordReader deserializer = new GrokRecordReader(fis, grok, null);
final String[] logLevels = new String[] {"INFO", "WARN", "ERROR", "FATAL", "FINE"};
final String[] messages = new String[] {"Test Message 1", "Red", "Green", "Blue", "Yellow"};
@@ -76,7 +75,7 @@ public class TestGrokRecordReader {
final String msg = "2016-08-04 13:26:32,473 INFO [Leader Election Notification Thread-1] o.a.n.LoggerClass \n"
+ "org.apache.nifi.exception.UnitTestException: Testing to ensure we are able to capture stack traces";
final InputStream bais = new ByteArrayInputStream(msg.getBytes(StandardCharsets.UTF_8));
- final GrokRecordReader deserializer = new GrokRecordReader(bais, grok, Collections.emptyMap());
+ final GrokRecordReader deserializer = new GrokRecordReader(bais, grok, null);
final Object[] values = deserializer.nextRecord().getValues();
@@ -99,7 +98,7 @@ public class TestGrokRecordReader {
grok.addPatternFromFile("src/main/resources/default-grok-patterns.txt");
grok.compile("%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} \\[%{DATA:thread}\\] %{DATA:class} %{GREEDYDATA:message}");
- final GrokRecordReader deserializer = new GrokRecordReader(fis, grok, Collections.emptyMap());
+ final GrokRecordReader deserializer = new GrokRecordReader(fis, grok, null);
final String[] logLevels = new String[] {"INFO", "INFO", "INFO", "WARN", "WARN"};
@@ -123,7 +122,7 @@ public class TestGrokRecordReader {
grok.addPatternFromFile("src/main/resources/default-grok-patterns.txt");
grok.compile("%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} \\[%{DATA:thread}\\] %{DATA:class} %{GREEDYDATA:message}?");
- final GrokRecordReader deserializer = new GrokRecordReader(fis, grok, Collections.emptyMap());
+ final GrokRecordReader deserializer = new GrokRecordReader(fis, grok, null);
final String[] logLevels = new String[] {"INFO", "INFO", "ERROR", "WARN", "WARN"};
@@ -155,7 +154,7 @@ public class TestGrokRecordReader {
grok.addPatternFromFile("src/main/resources/default-grok-patterns.txt");
grok.compile("%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{GREEDYDATA:message}");
- final GrokRecordReader deserializer = new GrokRecordReader(fis, grok, Collections.emptyMap());
+ final GrokRecordReader deserializer = new GrokRecordReader(fis, grok, null);
final String[] logLevels = new String[] {"INFO", "ERROR", "INFO"};
final String[] messages = new String[] {"message without stack trace",
http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonPathRowRecordReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonPathRowRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonPathRowRecordReader.java
index fa41396..11e2828 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonPathRowRecordReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonPathRowRecordReader.java
@@ -26,17 +26,18 @@ import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
-import java.util.Map;
import java.util.stream.Collectors;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.junit.Assert;
@@ -47,6 +48,10 @@ import org.mockito.Mockito;
import com.jayway.jsonpath.JsonPath;
public class TestJsonPathRowRecordReader {
+ private final String dateFormat = RecordFieldType.DATE.getDefaultFormat();
+ private final String timeFormat = RecordFieldType.TIME.getDefaultFormat();
+ private final String timestampFormat = RecordFieldType.TIMESTAMP.getDefaultFormat();
+
private final LinkedHashMap<String, JsonPath> allJsonPaths = new LinkedHashMap<>();
@Before
@@ -63,12 +68,36 @@ public class TestJsonPathRowRecordReader {
allJsonPaths.put("country", JsonPath.compile("$.country"));
}
+
+ private List<RecordField> getDefaultFields() {
+ final List<RecordField> fields = new ArrayList<>();
+ fields.add(new RecordField("id", RecordFieldType.INT.getDataType()));
+ fields.add(new RecordField("name", RecordFieldType.STRING.getDataType()));
+ fields.add(new RecordField("balance", RecordFieldType.DOUBLE.getDataType()));
+ fields.add(new RecordField("address", RecordFieldType.STRING.getDataType()));
+ fields.add(new RecordField("city", RecordFieldType.STRING.getDataType()));
+ fields.add(new RecordField("state", RecordFieldType.STRING.getDataType()));
+ fields.add(new RecordField("zipCode", RecordFieldType.STRING.getDataType()));
+ fields.add(new RecordField("country", RecordFieldType.STRING.getDataType()));
+ return fields;
+ }
+
+ private RecordSchema getAccountSchema() {
+ final List<RecordField> accountFields = new ArrayList<>();
+ accountFields.add(new RecordField("id", RecordFieldType.INT.getDataType()));
+ accountFields.add(new RecordField("balance", RecordFieldType.DOUBLE.getDataType()));
+
+ final RecordSchema accountSchema = new SimpleRecordSchema(accountFields);
+ return accountSchema;
+ }
+
+
@Test
public void testReadArray() throws IOException, MalformedRecordException {
- try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-array.json"));
- final JsonPathRowRecordReader reader = new JsonPathRowRecordReader(allJsonPaths, Collections.emptyMap(), in, Mockito.mock(ComponentLog.class))) {
+ final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
- final RecordSchema schema = reader.getSchema();
+ try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-array.json"));
+ final JsonPathRowRecordReader reader = new JsonPathRowRecordReader(allJsonPaths, schema, in, Mockito.mock(ComponentLog.class), dateFormat, timeFormat, timestampFormat)) {
final List<String> fieldNames = schema.getFieldNames();
final List<String> expectedFieldNames = Arrays.asList(new String[] {"id", "name", "balance", "address", "city", "state", "zipCode", "country"});
@@ -91,10 +120,10 @@ public class TestJsonPathRowRecordReader {
@Test
public void testSingleJsonElement() throws IOException, MalformedRecordException {
- try (final InputStream in = new FileInputStream(new File("src/test/resources/json/single-bank-account.json"));
- final JsonPathRowRecordReader reader = new JsonPathRowRecordReader(allJsonPaths, Collections.emptyMap(), in, Mockito.mock(ComponentLog.class))) {
+ final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
- final RecordSchema schema = reader.getSchema();
+ try (final InputStream in = new FileInputStream(new File("src/test/resources/json/single-bank-account.json"));
+ final JsonPathRowRecordReader reader = new JsonPathRowRecordReader(allJsonPaths, schema, in, Mockito.mock(ComponentLog.class), dateFormat, timeFormat, timestampFormat)) {
final List<String> fieldNames = schema.getFieldNames();
final List<String> expectedFieldNames = Arrays.asList(new String[] {"id", "name", "balance", "address", "city", "state", "zipCode", "country"});
@@ -119,17 +148,20 @@ public class TestJsonPathRowRecordReader {
final LinkedHashMap<String, JsonPath> jsonPaths = new LinkedHashMap<>(allJsonPaths);
jsonPaths.put("account", JsonPath.compile("$.account"));
- try (final InputStream in = new FileInputStream(new File("src/test/resources/json/single-element-nested.json"));
- final JsonPathRowRecordReader reader = new JsonPathRowRecordReader(jsonPaths, Collections.emptyMap(), in, Mockito.mock(ComponentLog.class))) {
+ final DataType accountType = RecordFieldType.RECORD.getRecordDataType(getAccountSchema());
+ final List<RecordField> fields = getDefaultFields();
+ fields.add(new RecordField("account", accountType));
+ final RecordSchema schema = new SimpleRecordSchema(fields);
- final RecordSchema schema = reader.getSchema();
+ try (final InputStream in = new FileInputStream(new File("src/test/resources/json/single-element-nested.json"));
+ final JsonPathRowRecordReader reader = new JsonPathRowRecordReader(jsonPaths, schema, in, Mockito.mock(ComponentLog.class), dateFormat, timeFormat, timestampFormat)) {
final List<String> fieldNames = schema.getFieldNames();
final List<String> expectedFieldNames = Arrays.asList(new String[] {"id", "name", "balance", "address", "city", "state", "zipCode", "country", "account"});
assertEquals(expectedFieldNames, fieldNames);
final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(dt -> dt.getFieldType()).collect(Collectors.toList());
- final List<RecordFieldType> expectedTypes = Arrays.asList(new RecordFieldType[] {RecordFieldType.INT, RecordFieldType.STRING, RecordFieldType.STRING,
+ final List<RecordFieldType> expectedTypes = Arrays.asList(new RecordFieldType[] {RecordFieldType.INT, RecordFieldType.STRING, RecordFieldType.DOUBLE,
RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.RECORD});
assertEquals(expectedTypes, dataTypes);
@@ -152,10 +184,15 @@ public class TestJsonPathRowRecordReader {
final LinkedHashMap<String, JsonPath> jsonPaths = new LinkedHashMap<>(allJsonPaths);
jsonPaths.put("accounts", JsonPath.compile("$.accounts"));
- try (final InputStream in = new FileInputStream(new File("src/test/resources/json/single-element-nested-array.json"));
- final JsonPathRowRecordReader reader = new JsonPathRowRecordReader(jsonPaths, Collections.emptyMap(), in, Mockito.mock(ComponentLog.class))) {
+ final DataType accountRecordType = RecordFieldType.RECORD.getRecordDataType(getAccountSchema());
+ final DataType accountsType = RecordFieldType.ARRAY.getArrayDataType(accountRecordType);
+ final List<RecordField> fields = getDefaultFields();
+ fields.add(new RecordField("accounts", accountsType));
+ final RecordSchema schema = new SimpleRecordSchema(fields);
- final RecordSchema schema = reader.getSchema();
+
+ try (final InputStream in = new FileInputStream(new File("src/test/resources/json/single-element-nested-array.json"));
+ final JsonPathRowRecordReader reader = new JsonPathRowRecordReader(jsonPaths, schema, in, Mockito.mock(ComponentLog.class), dateFormat, timeFormat, timestampFormat)) {
final List<String> fieldNames = schema.getFieldNames();
final List<String> expectedFieldNames = Arrays.asList(new String[] {
@@ -163,7 +200,7 @@ public class TestJsonPathRowRecordReader {
assertEquals(expectedFieldNames, fieldNames);
final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(dt -> dt.getFieldType()).collect(Collectors.toList());
- final List<RecordFieldType> expectedTypes = Arrays.asList(new RecordFieldType[] {RecordFieldType.INT, RecordFieldType.STRING, RecordFieldType.STRING,
+ final List<RecordFieldType> expectedTypes = Arrays.asList(new RecordFieldType[] {RecordFieldType.INT, RecordFieldType.STRING, RecordFieldType.DOUBLE,
RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.ARRAY});
assertEquals(expectedTypes, dataTypes);
@@ -177,17 +214,17 @@ public class TestJsonPathRowRecordReader {
final Object[] array = (Object[]) lastRecord;
assertEquals(2, array.length);
final Object firstElement = array[0];
- assertTrue(firstElement instanceof Map);
+ assertTrue(firstElement instanceof Record);
- final Map<?, ?> firstMap = (Map<?, ?>) firstElement;
- assertEquals(42, firstMap.get("id"));
- assertEquals(4750.89D, firstMap.get("balance"));
+ final Record firstRecord = (Record) firstElement;
+ assertEquals(42, firstRecord.getValue("id"));
+ assertEquals(4750.89D, firstRecord.getValue("balance"));
final Object secondElement = array[1];
- assertTrue(secondElement instanceof Map);
- final Map<?, ?> secondMap = (Map<?, ?>) secondElement;
- assertEquals(43, secondMap.get("id"));
- assertEquals(48212.38D, secondMap.get("balance"));
+ assertTrue(secondElement instanceof Record);
+ final Record secondRecord = (Record) secondElement;
+ assertEquals(43, secondRecord.getValue("id"));
+ assertEquals(48212.38D, secondRecord.getValue("balance"));
assertNull(reader.nextRecord());
}
@@ -195,10 +232,10 @@ public class TestJsonPathRowRecordReader {
@Test
public void testReadArrayDifferentSchemas() throws IOException, MalformedRecordException {
- try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-array-different-schemas.json"));
- final JsonPathRowRecordReader reader = new JsonPathRowRecordReader(allJsonPaths, Collections.emptyMap(), in, Mockito.mock(ComponentLog.class))) {
+ final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
- final RecordSchema schema = reader.getSchema();
+ try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-array-different-schemas.json"));
+ final JsonPathRowRecordReader reader = new JsonPathRowRecordReader(allJsonPaths, schema, in, Mockito.mock(ComponentLog.class), dateFormat, timeFormat, timestampFormat)) {
final List<String> fieldNames = schema.getFieldNames();
final List<String> expectedFieldNames = Arrays.asList(new String[] {"id", "name", "balance", "address", "city", "state", "zipCode", "country"});
@@ -226,11 +263,14 @@ public class TestJsonPathRowRecordReader {
public void testReadArrayDifferentSchemasWithOverride() throws IOException, MalformedRecordException {
final LinkedHashMap<String, JsonPath> jsonPaths = new LinkedHashMap<>(allJsonPaths);
jsonPaths.put("address2", JsonPath.compile("$.address2"));
- final Map<String, DataType> typeOverrides = Collections.singletonMap("address2", RecordFieldType.STRING.getDataType());
+
+ final List<RecordField> fields = getDefaultFields();
+ fields.add(new RecordField("address2", RecordFieldType.STRING.getDataType()));
+ final RecordSchema schema = new SimpleRecordSchema(fields);
+
try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-array-different-schemas.json"));
- final JsonPathRowRecordReader reader = new JsonPathRowRecordReader(jsonPaths, typeOverrides, in, Mockito.mock(ComponentLog.class))) {
- final RecordSchema schema = reader.getSchema();
+ final JsonPathRowRecordReader reader = new JsonPathRowRecordReader(jsonPaths, schema, in, Mockito.mock(ComponentLog.class), dateFormat, timeFormat, timestampFormat)) {
final List<String> fieldNames = schema.getFieldNames();
final List<String> expectedFieldNames = Arrays.asList(new String[] {"id", "name", "balance", "address", "city", "state", "zipCode", "country", "address2"});
@@ -259,10 +299,13 @@ public class TestJsonPathRowRecordReader {
final LinkedHashMap<String, JsonPath> jsonPaths = new LinkedHashMap<>(allJsonPaths);
jsonPaths.put("accountIds", JsonPath.compile("$.accountIds"));
- try (final InputStream in = new FileInputStream(new File("src/test/resources/json/primitive-type-array.json"));
- final JsonPathRowRecordReader reader = new JsonPathRowRecordReader(jsonPaths, Collections.emptyMap(), in, Mockito.mock(ComponentLog.class))) {
+ final List<RecordField> fields = getDefaultFields();
+ final DataType idsType = RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.INT.getDataType());
+ fields.add(new RecordField("accountIds", idsType));
+ final RecordSchema schema = new SimpleRecordSchema(fields);
- final RecordSchema schema = reader.getSchema();
+ try (final InputStream in = new FileInputStream(new File("src/test/resources/json/primitive-type-array.json"));
+ final JsonPathRowRecordReader reader = new JsonPathRowRecordReader(jsonPaths, schema, in, Mockito.mock(ComponentLog.class), dateFormat, timeFormat, timestampFormat)) {
final List<String> fieldNames = schema.getFieldNames();
final List<String> expectedFieldNames = Arrays.asList(new String[] {"id", "name", "balance", "address", "city", "state", "zipCode", "country", "accountIds"});
http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java
index c5ee0e3..2422206 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java
@@ -25,8 +25,8 @@ import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -34,8 +34,10 @@ import java.util.stream.Collectors;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.junit.Assert;
@@ -43,13 +45,38 @@ import org.junit.Test;
import org.mockito.Mockito;
public class TestJsonTreeRowRecordReader {
+ private final String dateFormat = RecordFieldType.DATE.getDefaultFormat();
+ private final String timeFormat = RecordFieldType.TIME.getDefaultFormat();
+ private final String timestampFormat = RecordFieldType.TIMESTAMP.getDefaultFormat();
+
+ private List<RecordField> getDefaultFields() {
+ final List<RecordField> fields = new ArrayList<>();
+ fields.add(new RecordField("id", RecordFieldType.INT.getDataType()));
+ fields.add(new RecordField("name", RecordFieldType.STRING.getDataType()));
+ fields.add(new RecordField("balance", RecordFieldType.DOUBLE.getDataType()));
+ fields.add(new RecordField("address", RecordFieldType.STRING.getDataType()));
+ fields.add(new RecordField("city", RecordFieldType.STRING.getDataType()));
+ fields.add(new RecordField("state", RecordFieldType.STRING.getDataType()));
+ fields.add(new RecordField("zipCode", RecordFieldType.STRING.getDataType()));
+ fields.add(new RecordField("country", RecordFieldType.STRING.getDataType()));
+ return fields;
+ }
+
+ private RecordSchema getAccountSchema() {
+ final List<RecordField> accountFields = new ArrayList<>();
+ accountFields.add(new RecordField("id", RecordFieldType.INT.getDataType()));
+ accountFields.add(new RecordField("balance", RecordFieldType.DOUBLE.getDataType()));
+
+ final RecordSchema accountSchema = new SimpleRecordSchema(accountFields);
+ return accountSchema;
+ }
@Test
public void testReadArray() throws IOException, MalformedRecordException {
- try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-array.json"));
- final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, Mockito.mock(ComponentLog.class), Collections.emptyMap())) {
+ final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
- final RecordSchema schema = reader.getSchema();
+ try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-array.json"));
+ final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, Mockito.mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
final List<String> fieldNames = schema.getFieldNames();
final List<String> expectedFieldNames = Arrays.asList(new String[] {"id", "name", "balance", "address", "city", "state", "zipCode", "country"});
@@ -72,10 +99,10 @@ public class TestJsonTreeRowRecordReader {
@Test
public void testSingleJsonElement() throws IOException, MalformedRecordException {
- try (final InputStream in = new FileInputStream(new File("src/test/resources/json/single-bank-account.json"));
- final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, Mockito.mock(ComponentLog.class), Collections.emptyMap())) {
+ final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
- final RecordSchema schema = reader.getSchema();
+ try (final InputStream in = new FileInputStream(new File("src/test/resources/json/single-bank-account.json"));
+ final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, Mockito.mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
final List<String> fieldNames = schema.getFieldNames();
final List<String> expectedFieldNames = Arrays.asList(new String[] {"id", "name", "balance", "address", "city", "state", "zipCode", "country"});
@@ -95,14 +122,14 @@ public class TestJsonTreeRowRecordReader {
@Test
public void testElementWithNestedData() throws IOException, MalformedRecordException {
- try (final InputStream in = new FileInputStream(new File("src/test/resources/json/single-element-nested.json"));
- final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, Mockito.mock(ComponentLog.class), Collections.emptyMap())) {
-
- final RecordSchema schema = reader.getSchema();
+ final DataType accountType = RecordFieldType.RECORD.getRecordDataType(getAccountSchema());
+ final List<RecordField> fields = getDefaultFields();
+ fields.add(new RecordField("account", accountType));
+ fields.remove(new RecordField("balance", RecordFieldType.DOUBLE.getDataType()));
+ final RecordSchema schema = new SimpleRecordSchema(fields);
- final List<String> fieldNames = schema.getFieldNames();
- final List<String> expectedFieldNames = Arrays.asList(new String[] {"id", "name", "address", "city", "state", "zipCode", "country", "account"});
- assertEquals(expectedFieldNames, fieldNames);
+ try (final InputStream in = new FileInputStream(new File("src/test/resources/json/single-element-nested.json"));
+ final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, Mockito.mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(dt -> dt.getFieldType()).collect(Collectors.toList());
final List<RecordFieldType> expectedTypes = Arrays.asList(new RecordFieldType[] {RecordFieldType.INT, RecordFieldType.STRING,
@@ -125,10 +152,16 @@ public class TestJsonTreeRowRecordReader {
@Test
public void testElementWithNestedArray() throws IOException, MalformedRecordException {
- try (final InputStream in = new FileInputStream(new File("src/test/resources/json/single-element-nested-array.json"));
- final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, Mockito.mock(ComponentLog.class), Collections.emptyMap())) {
+ final DataType accountRecordType = RecordFieldType.RECORD.getRecordDataType(getAccountSchema());
+ final DataType accountsType = RecordFieldType.ARRAY.getArrayDataType(accountRecordType);
+
+ final List<RecordField> fields = getDefaultFields();
+ fields.add(new RecordField("accounts", accountsType));
+ fields.remove(new RecordField("balance", RecordFieldType.DOUBLE.getDataType()));
+ final RecordSchema schema = new SimpleRecordSchema(fields);
- final RecordSchema schema = reader.getSchema();
+ try (final InputStream in = new FileInputStream(new File("src/test/resources/json/single-element-nested-array.json"));
+ final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, Mockito.mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
final List<String> fieldNames = schema.getFieldNames();
final List<String> expectedFieldNames = Arrays.asList(new String[] {
@@ -153,10 +186,10 @@ public class TestJsonTreeRowRecordReader {
@Test
public void testReadArrayDifferentSchemas() throws IOException, MalformedRecordException {
- try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-array-different-schemas.json"));
- final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, Mockito.mock(ComponentLog.class), Collections.emptyMap())) {
+ final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
- final RecordSchema schema = reader.getSchema();
+ try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-array-different-schemas.json"));
+ final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, Mockito.mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
final List<String> fieldNames = schema.getFieldNames();
final List<String> expectedFieldNames = Arrays.asList(new String[] {"id", "name", "balance", "address", "city", "state", "zipCode", "country"});
@@ -185,10 +218,12 @@ public class TestJsonTreeRowRecordReader {
final Map<String, DataType> overrides = new HashMap<>();
overrides.put("address2", RecordFieldType.STRING.getDataType());
- try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-array-different-schemas.json"));
- final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, Mockito.mock(ComponentLog.class), overrides)) {
+ final List<RecordField> fields = getDefaultFields();
+ fields.add(new RecordField("address2", RecordFieldType.STRING.getDataType()));
+ final RecordSchema schema = new SimpleRecordSchema(fields);
- final RecordSchema schema = reader.getSchema();
+ try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-array-different-schemas.json"));
+ final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, Mockito.mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
final List<String> fieldNames = schema.getFieldNames();
final List<String> expectedFieldNames = Arrays.asList(new String[] {"id", "name", "balance", "address", "city", "state", "zipCode", "country", "address2"});
@@ -214,13 +249,10 @@ public class TestJsonTreeRowRecordReader {
@Test
public void testReadArrayDifferentSchemasWithOptionalElementOverridden() throws IOException, MalformedRecordException {
- final Map<String, DataType> overrides = new HashMap<>();
- overrides.put("balance", RecordFieldType.DOUBLE.getDataType());
+ final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-array-optional-balance.json"));
- final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, Mockito.mock(ComponentLog.class), overrides)) {
-
- final RecordSchema schema = reader.getSchema();
+ final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, Mockito.mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
final List<String> fieldNames = schema.getFieldNames();
final List<String> expectedFieldNames = Arrays.asList(new String[] {"id", "name", "balance", "address", "city", "state", "zipCode", "country"});
@@ -247,8 +279,22 @@ public class TestJsonTreeRowRecordReader {
@Test
public void testReadUnicodeCharacters() throws IOException, MalformedRecordException {
+
+ final List<RecordField> fromFields = new ArrayList<>();
+ fromFields.add(new RecordField("id", RecordFieldType.LONG.getDataType()));
+ fromFields.add(new RecordField("name", RecordFieldType.STRING.getDataType()));
+ final RecordSchema fromSchema = new SimpleRecordSchema(fromFields);
+ final DataType fromType = RecordFieldType.RECORD.getRecordDataType(fromSchema);
+
+ final List<RecordField> fields = new ArrayList<>();
+ fields.add(new RecordField("created_at", RecordFieldType.STRING.getDataType()));
+ fields.add(new RecordField("id", RecordFieldType.LONG.getDataType()));
+ fields.add(new RecordField("unicode", RecordFieldType.STRING.getDataType()));
+ fields.add(new RecordField("from", fromType));
+ final RecordSchema schema = new SimpleRecordSchema(fields);
+
try (final InputStream in = new FileInputStream(new File("src/test/resources/json/json-with-unicode.json"));
- final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, Mockito.mock(ComponentLog.class), Collections.emptyMap())) {
+ final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, Mockito.mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
final Object[] firstRecordValues = reader.nextRecord().getValues();
http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestWriteJsonResult.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestWriteJsonResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestWriteJsonResult.java
index f9849ba..6119d36 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestWriteJsonResult.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestWriteJsonResult.java
@@ -27,12 +27,14 @@ import java.nio.file.Paths;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
+import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.TimeZone;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.serialization.SimpleRecordSchema;
@@ -60,14 +62,17 @@ public class TestWriteJsonResult {
possibleTypes.add(RecordFieldType.INT.getDataType());
possibleTypes.add(RecordFieldType.LONG.getDataType());
- fields.add(new RecordField(fieldType.name().toLowerCase(), fieldType.getDataType(possibleTypes)));
+ fields.add(new RecordField(fieldType.name().toLowerCase(), fieldType.getChoiceDataType(possibleTypes)));
} else {
fields.add(new RecordField(fieldType.name().toLowerCase(), fieldType.getDataType()));
}
}
final RecordSchema schema = new SimpleRecordSchema(fields);
- final long time = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS").parse("2017/01/01 17:00:00.000").getTime();
+ final DateFormat df = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS");
+ df.setTimeZone(TimeZone.getTimeZone("gmt"));
+ final long time = df.parse("2017/01/01 17:00:00.000").getTime();
+
final Map<String, Object> valueMap = new LinkedHashMap<>();
valueMap.put("string", "string");
valueMap.put("boolean", true);
http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/datatypes.avsc
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/datatypes.avsc b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/datatypes.avsc
new file mode 100644
index 0000000..cc7f60e
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/datatypes.avsc
@@ -0,0 +1,47 @@
+{
+ "namespace": "nifi",
+ "name": "data_types",
+ "type": "record",
+ "fields": [
+ {
+ "name": "string",
+ "type": "string"
+ }, {
+ "name": "int",
+ "type": "int"
+ }, {
+ "name": "long",
+ "type": "long"
+ }, {
+ "name": "double",
+ "type": "double"
+ }, {
+ "name": "float",
+ "type": "float"
+ }, {
+ "name": "boolean",
+ "type": "boolean"
+ }, {
+ "name": "bytes",
+ "type": "bytes"
+ }, {
+ "name": "nullOrLong",
+ "type": [ "null", "long" ]
+ }, {
+ "name": "array",
+ "type" : {
+ "type": "array",
+ "items": "int"
+ }
+ }, {
+ "name": "record",
+ "type": {
+ "type": "record",
+ "name": "subRecord",
+ "fields": [
+ { "name": "field1", "type": "string" }
+ ]
+ }
+ }
+ ]
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/logical-types.avsc
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/logical-types.avsc b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/logical-types.avsc
new file mode 100644
index 0000000..d8315b2
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/logical-types.avsc
@@ -0,0 +1,34 @@
+{
+ "namespace": "nifi",
+ "name": "data_types",
+ "type": "record",
+ "fields": [
+ {
+ "name" : "timeMillis",
+ "type": {
+ "type": "int",
+ "logicalType": "time-millis"
+ }
+ }, {
+ "name" : "timeMicros", "type": {
+ "type" : "long",
+ "logicalType" : "time-micros"
+ }
+ }, {
+ "name" : "timestampMillis", "type": {
+ "type" : "long",
+ "logicalType" : "timestamp-millis"
+ }
+ }, {
+ "name" : "timestampMicros", "type": {
+ "type" : "long",
+ "logicalType" : "timestamp-micros"
+ }
+ }, {
+ "name" : "date", "type": {
+ "type" : "int",
+ "logicalType" : "date"
+ }
+ }
+ ]
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-schema-registry-service-api/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-schema-registry-service-api/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-schema-registry-service-api/pom.xml
new file mode 100644
index 0000000..265eb71
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-schema-registry-service-api/pom.xml
@@ -0,0 +1,32 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <!-- Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with this
+ work for additional information regarding copyright ownership. The ASF licenses
+ this file to You under the Apache License, Version 2.0 (the "License"); you
+ may not use this file except in compliance with the License. You may obtain
+ a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless
+ required by applicable law or agreed to in writing, software distributed
+ under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
+ OR CONDITIONS OF ANY KIND, either express or implied. See the License for
+ the specific language governing permissions and limitations under the License. -->
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-standard-services</artifactId>
+ <version>1.2.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>nifi-schema-registry-service-api</artifactId>
+ <packaging>jar</packaging>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-record-serialization-service-api</artifactId>
+ </dependency>
+ </dependencies>
+</project>
http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-schema-registry-service-api/src/main/java/org/apache/nifi/schemaregistry/services/SchemaRegistry.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-schema-registry-service-api/src/main/java/org/apache/nifi/schemaregistry/services/SchemaRegistry.java b/nifi-nar-bundles/nifi-standard-services/nifi-schema-registry-service-api/src/main/java/org/apache/nifi/schemaregistry/services/SchemaRegistry.java
new file mode 100644
index 0000000..68c2461
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-schema-registry-service-api/src/main/java/org/apache/nifi/schemaregistry/services/SchemaRegistry.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.schemaregistry.services;
+
+import java.util.Map;
+
+import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+/**
+ * Represents {@link ControllerService} strategy to expose internal and/or
+ * integrate with external Schema Registry
+ */
+public interface SchemaRegistry extends ControllerService, AutoCloseable {
+
+ public static final String SCHEMA_NAME_ATTR = "schema.name";
+
+
+ /**
+ * Retrieves and returns the textual representation of the schema based on
+ * the provided name of the schema available in Schema Registry. Will throw
+ * an runtime exception if schema can not be found.
+ */
+ String retrieveSchemaText(String schemaName);
+
+ /**
+ * Retrieves and returns the textual representation of the schema based on
+ * the provided name of the schema available in Schema Registry and optional
+ * additional attributes. Will throw an runtime exception if schema can not
+ * be found.
+ */
+ String retrieveSchemaText(String schemaName, Map<String, String> attributes);
+
+
+ RecordSchema retrieveSchema(String schemaName);
+
+
+ RecordSchema retrieveSchema(String schemaName, Map<String, String> attributes);
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-standard-services-api-nar/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-standard-services-api-nar/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-standard-services-api-nar/pom.xml
index eae3515..5cee52e 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-standard-services-api-nar/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-standard-services-api-nar/pom.xml
@@ -58,6 +58,11 @@
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-schema-registry-service-api</artifactId>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
<artifactId>nifi-record-serialization-service-api</artifactId>
<scope>compile</scope>
</dependency>
http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/pom.xml b/nifi-nar-bundles/nifi-standard-services/pom.xml
index 3948a1b..4fac7d2 100644
--- a/nifi-nar-bundles/nifi-standard-services/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-services/pom.xml
@@ -35,6 +35,7 @@
<module>nifi-dbcp-service-bundle</module>
<module>nifi-hbase-client-service-api</module>
<module>nifi-hbase_1_1_2-client-service-bundle</module>
+ <module>nifi-schema-registry-service-api</module>
<module>nifi-record-serialization-service-api</module>
<module>nifi-record-serialization-services-bundle</module>
</modules>
http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 057832b..0173b04 100644
--- a/pom.xml
+++ b/pom.xml
@@ -681,7 +681,7 @@ language governing permissions and limitations under the License. -->
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
- <version>1.7.7</version>
+ <version>1.8.1</version>
</dependency>
<dependency>
<groupId>com.sun.jersey</groupId>
@@ -921,6 +921,11 @@ language governing permissions and limitations under the License. -->
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-registry-service</artifactId>
+ <version>1.2.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
<artifactId>nifi-resources</artifactId>
<version>1.2.0-SNAPSHOT</version>
<classifier>resources</classifier>
@@ -971,6 +976,11 @@ language governing permissions and limitations under the License. -->
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-schema-registry-service-api</artifactId>
+ <version>1.2.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
<artifactId>nifi-distributed-cache-services-nar</artifactId>
<version>1.2.0-SNAPSHOT</version>
<type>nar</type>