You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by pv...@apache.org on 2021/12/10 09:32:05 UTC
[nifi] branch main updated: NIFI-9457 Support microseconds for String Timestamps in PutKudu
This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new b7ad1f9 NIFI-9457 Support microseconds for String Timestamps in PutKudu
b7ad1f9 is described below
commit b7ad1f924d2a279f6b87748639a5a677f8e340a6
Author: exceptionfactory <ex...@apache.org>
AuthorDate: Thu Dec 9 12:22:04 2021 -0600
NIFI-9457 Support microseconds for String Timestamps in PutKudu
- Implemented override for Timestamp Record Field Type format handling to add support for optional microseconds
- Added FieldConverter and ObjectTimestampFieldConverter implementation for generalized Timestamp parsing using DateTimeFormatter
- Updated PutKudu unit tests for standard Timestamp and Timestamp with microseconds
Signed-off-by: Pierre Villard <pi...@gmail.com>
This closes #5589.
---
.../serialization/record/field/FieldConverter.java | 37 +++++++
.../field/ObjectTimestampFieldConverter.java | 86 ++++++++++++++++
.../field/ObjectTimestampFieldConverterTest.java | 114 +++++++++++++++++++++
.../processors/kudu/AbstractKuduProcessor.java | 31 +++++-
.../apache/nifi/processors/kudu/TestPutKudu.java | 39 +++++++
5 files changed, 304 insertions(+), 3 deletions(-)
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/field/FieldConverter.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/field/FieldConverter.java
new file mode 100644
index 0000000..7520be3
--- /dev/null
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/field/FieldConverter.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.serialization.record.field;
+
+import java.util.Optional;
+
+/**
+ * Generalized Field Converter interface for handling type conversion with optional format parsing.
+ * Implementations convert a single input field value to the requested output type.
+ *
+ * @param <I> Input Field Type
+ * @param <O> Output Field Type
+ */
+public interface FieldConverter<I, O> {
+ /**
+ * Convert the input field to the Output Field Type, optionally using a format pattern for parsing
+ *
+ * @param field Input field to be converted, may be null
+ * @param pattern Optional format pattern for parsing; Optional.empty() indicates no pattern is available
+ * @param name Input field name for tracking, for example to identify the field in error messages
+ * @return Converted Field, which can be null when the input field is null or empty
+ */
+ O convertField(I field, Optional<String> pattern, String name);
+}
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/field/ObjectTimestampFieldConverter.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/field/ObjectTimestampFieldConverter.java
new file mode 100644
index 0000000..a624845
--- /dev/null
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/field/ObjectTimestampFieldConverter.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.serialization.record.field;
+
+import org.apache.nifi.serialization.record.util.IllegalTypeConversionException;
+
+import java.sql.Timestamp;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeParseException;
+import java.util.Date;
+import java.util.Optional;
+
+/**
+ * Convert Object to java.sql.Timestamp using instanceof evaluation and optional format pattern for DateTimeFormatter.
+ *
+ * Supported input types are evaluated in this order: Timestamp, java.util.Date, Number and String.
+ * Any other non-null input type results in an IllegalTypeConversionException.
+ */
+public class ObjectTimestampFieldConverter implements FieldConverter<Object, Timestamp> {
+ /**
+ * Convert Object field to java.sql.Timestamp using optional format supported in DateTimeFormatter
+ *
+ * Conversion rules:
+ * - null returns null
+ * - Timestamp is returned as the same instance
+ * - java.util.Date and Number values are passed to the Timestamp(long) constructor as epoch milliseconds
+ * - String values are trimmed; an empty result returns null; otherwise the value is parsed with the
+ * pattern when present or with Long.parseLong() when the pattern is absent
+ *
+ * NOTE(review): DateTimeFormatter.ofPattern() is called outside the try block, so an invalid pattern
+ * propagates as IllegalArgumentException instead of IllegalTypeConversionException.
+ *
+ * @param field Field can be null or a supported input type
+ * @param pattern Format pattern optional for parsing
+ * @param name Field name for tracking
+ * @return Timestamp or null when input field is null or empty string
+ * @throws IllegalTypeConversionException Thrown on parsing failures or unsupported types of input fields
+ */
+ @Override
+ public Timestamp convertField(final Object field, final Optional<String> pattern, final String name) {
+ if (field == null) {
+ return null;
+ }
+ // Timestamp extends java.util.Date, so this check must precede the Date check to return the instance unchanged
+ if (field instanceof Timestamp) {
+ return (Timestamp) field;
+ }
+ if (field instanceof Date) {
+ final Date date = (Date) field;
+ return new Timestamp(date.getTime());
+ }
+ if (field instanceof Number) {
+ // longValue() discards any fractional part of floating point inputs
+ final Number number = (Number) field;
+ return new Timestamp(number.longValue());
+ }
+ if (field instanceof String) {
+ final String string = field.toString().trim();
+ if (string.isEmpty()) {
+ return null;
+ }
+
+ if (pattern.isPresent()) {
+ // NOTE(review): a new formatter is built on every call; DateTimeFormatter is immutable and could be cached per pattern
+ final DateTimeFormatter formatter = DateTimeFormatter.ofPattern(pattern.get());
+ try {
+ // Timestamp.valueOf(LocalDateTime) interprets the parsed value in the local time zone
+ final LocalDateTime localDateTime = LocalDateTime.parse(string, formatter);
+ return Timestamp.valueOf(localDateTime);
+ } catch (final DateTimeParseException e) {
+ // The message reports the original field value, not the trimmed string; the caught exception is not attached as the cause
+ final String message = String.format("Convert Field Name [%s] Value [%s] to Timestamp LocalDateTime parsing failed: %s", name, field, e.getMessage());
+ throw new IllegalTypeConversionException(message);
+ }
+ } else {
+ // Without a pattern the String is expected to contain a long, passed to Timestamp(long) as epoch milliseconds
+ try {
+ final long number = Long.parseLong(string);
+ return new Timestamp(number);
+ } catch (final NumberFormatException e) {
+ final String message = String.format("Convert Field Name [%s] Value [%s] to Timestamp Long parsing failed: %s", name, field, e.getMessage());
+ throw new IllegalTypeConversionException(message);
+ }
+ }
+ }
+
+ // Reached only for non-null inputs that matched none of the supported types above
+ final String message = String.format("Convert Field Name [%s] Value [%s] Class [%s] to Timestamp not supported", name, field, field.getClass());
+ throw new IllegalTypeConversionException(message);
+ }
+}
diff --git a/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/field/ObjectTimestampFieldConverterTest.java b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/field/ObjectTimestampFieldConverterTest.java
new file mode 100644
index 0000000..9424d0f
--- /dev/null
+++ b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/field/ObjectTimestampFieldConverterTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.serialization.record.field;
+
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.util.IllegalTypeConversionException;
+import org.junit.jupiter.api.Test;
+
+import java.sql.Timestamp;
+import java.util.Date;
+import java.util.Optional;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Unit tests for ObjectTimestampFieldConverter covering each supported input type,
+ * String parsing with and without a format pattern, and parsing failures
+ */
+public class ObjectTimestampFieldConverterTest {
+ private static final ObjectTimestampFieldConverter CONVERTER = new ObjectTimestampFieldConverter();
+
+ /** Default format pattern defined by RecordFieldType.TIMESTAMP */
+ private static final Optional<String> DEFAULT_PATTERN = Optional.of(RecordFieldType.TIMESTAMP.getDefaultFormat());
+
+ /** Field name passed to the converter; the value is the simple class name of Timestamp */
+ private static final String FIELD_NAME = Timestamp.class.getSimpleName();
+
+ private static final String EMPTY = "";
+
+ /** Date and time without fractional seconds */
+ private static final String DATE_TIME_DEFAULT = "2000-01-01 12:00:00";
+
+ /** Custom pattern with nine fractional second digits */
+ private static final Optional<String> DATE_TIME_NANOSECONDS_PATTERN = Optional.of("yyyy-MM-dd HH:mm:ss.SSSSSSSSS");
+
+ private static final String DATE_TIME_NANOSECONDS = "2000-01-01 12:00:00.123456789";
+
+ /** Null input returns null */
+ @Test
+ public void testConvertFieldNull() {
+ final Timestamp timestamp = CONVERTER.convertField(null, DEFAULT_PATTERN, FIELD_NAME);
+ assertNull(timestamp);
+ }
+
+ /** Timestamp input returns an equal Timestamp */
+ @Test
+ public void testConvertFieldTimestamp() {
+ final Timestamp field = new Timestamp(System.currentTimeMillis());
+ final Timestamp timestamp = CONVERTER.convertField(field, DEFAULT_PATTERN, FIELD_NAME);
+ assertEquals(field, timestamp);
+ }
+
+ /** java.util.Date input returns a Timestamp with the same epoch milliseconds */
+ @Test
+ public void testConvertFieldDate() {
+ final Date field = new Date();
+ final Timestamp timestamp = CONVERTER.convertField(field, DEFAULT_PATTERN, FIELD_NAME);
+ assertEquals(field.getTime(), timestamp.getTime());
+ }
+
+ /** long input is boxed to a Number and used as epoch milliseconds */
+ @Test
+ public void testConvertFieldLong() {
+ final long field = System.currentTimeMillis();
+ final Timestamp timestamp = CONVERTER.convertField(field, DEFAULT_PATTERN, FIELD_NAME);
+ assertEquals(field, timestamp.getTime());
+ }
+
+ /** Empty String input returns null */
+ @Test
+ public void testConvertFieldStringEmpty() {
+ final Timestamp timestamp = CONVERTER.convertField(EMPTY, DEFAULT_PATTERN, FIELD_NAME);
+ assertNull(timestamp);
+ }
+
+ /** String input without a pattern, passed as Optional.empty(), is parsed as epoch milliseconds */
+ @Test
+ public void testConvertFieldStringFormatNull() {
+ final long currentTime = System.currentTimeMillis();
+ final String field = Long.toString(currentTime);
+ final Timestamp timestamp = CONVERTER.convertField(field, Optional.empty(), FIELD_NAME);
+ assertEquals(currentTime, timestamp.getTime());
+ }
+
+ /** Non-numeric String input without a pattern fails with a message containing the input value */
+ @Test
+ public void testConvertFieldStringFormatNullNumberFormatException() {
+ final String field = String.class.getSimpleName();
+ final IllegalTypeConversionException exception = assertThrows(IllegalTypeConversionException.class, () -> CONVERTER.convertField(field, Optional.empty(), FIELD_NAME));
+ assertTrue(exception.getMessage().contains(field));
+ }
+
+ /** String input matching the default pattern produces the same result as Timestamp.valueOf() */
+ @Test
+ public void testConvertFieldStringFormatDefault() {
+ final Timestamp timestamp = CONVERTER.convertField(DATE_TIME_DEFAULT, DEFAULT_PATTERN, FIELD_NAME);
+ final Timestamp expected = Timestamp.valueOf(DATE_TIME_DEFAULT);
+ assertEquals(expected, timestamp);
+ }
+
+ /** String input with nine fractional digits is parsed with the custom nanoseconds pattern */
+ @Test
+ public void testConvertFieldStringFormatCustomNanoseconds() {
+ final Timestamp timestamp = CONVERTER.convertField(DATE_TIME_NANOSECONDS, DATE_TIME_NANOSECONDS_PATTERN, FIELD_NAME);
+ final Timestamp expected = Timestamp.valueOf(DATE_TIME_NANOSECONDS);
+ assertEquals(expected, timestamp);
+ }
+
+ /** String input without fractional seconds does not match the nanoseconds pattern and fails with a message containing the input value */
+ @Test
+ public void testConvertFieldStringFormatCustomFormatterException() {
+ final IllegalTypeConversionException exception = assertThrows(IllegalTypeConversionException.class, () -> CONVERTER.convertField(DATE_TIME_DEFAULT, DATE_TIME_NANOSECONDS_PATTERN, FIELD_NAME));
+ assertTrue(exception.getMessage().contains(DATE_TIME_DEFAULT));
+ }
+}
diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
index 667058d..9bda759 100644
--- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
+++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
@@ -50,6 +50,8 @@ import org.apache.nifi.security.krb.KerberosUser;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.field.FieldConverter;
+import org.apache.nifi.serialization.record.field.ObjectTimestampFieldConverter;
import org.apache.nifi.serialization.record.type.DecimalDataType;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
import org.apache.nifi.util.StringUtils;
@@ -161,6 +163,10 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor {
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
+ /** Shared converter used to turn Record field values into java.sql.Timestamp for UNIXTIME_MICROS columns */
+ private static final FieldConverter<Object, Timestamp> TIMESTAMP_FIELD_CONVERTER = new ObjectTimestampFieldConverter();
+ /**
+ * Timestamp Pattern overrides default RecordFieldType.TIMESTAMP pattern of yyyy-MM-dd HH:mm:ss with optional microseconds.
+ * Square brackets mark an optional section in DateTimeFormatter patterns; when the fraction is present
+ * the SSSSSS element expects six digits.
+ */
+ private static final String MICROSECOND_TIMESTAMP_PATTERN = "yyyy-MM-dd HH:mm:ss[.SSSSSS]";
+
private volatile KuduClient kuduClient;
private final ReadWriteLock kuduClientReadWriteLock = new ReentrantReadWriteLock();
private final Lock kuduClientReadLock = kuduClientReadWriteLock.readLock();
@@ -419,9 +425,9 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor {
row.addLong(columnIndex, DataTypeUtils.toLong(value, recordFieldName));
break;
case UNIXTIME_MICROS:
- DataType fieldType = record.getSchema().getDataType(recordFieldName).get();
- Timestamp timestamp = DataTypeUtils.toTimestamp(record.getValue(recordFieldName),
- () -> DataTypeUtils.getDateFormat(fieldType.getFormat()), recordFieldName);
+ final Optional<DataType> optionalDataType = record.getSchema().getDataType(recordFieldName);
+ final Optional<String> optionalPattern = getTimestampPattern(optionalDataType);
+ final Timestamp timestamp = TIMESTAMP_FIELD_CONVERTER.convertField(value, optionalPattern, recordFieldName);
row.addTimestamp(columnIndex, timestamp);
break;
case STRING:
@@ -454,6 +460,25 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor {
}
/**
+ * Get Timestamp Pattern and override Timestamp Record Field pattern with optional microsecond pattern
+ *
+ * When the Data Type is present and its Field Type is RecordFieldType.TIMESTAMP, the returned pattern is always
+ * MICROSECOND_TIMESTAMP_PATTERN and the format configured on the Data Type is not consulted.
+ * For any other Field Type the format of the Data Type is returned.
+ *
+ * @param optionalDataType Optional Data Type
+ * @return Optional Timestamp Pattern, empty when the Data Type is absent or its format is null
+ */
+ private Optional<String> getTimestampPattern(final Optional<DataType> optionalDataType) {
+ String pattern = null;
+ if (optionalDataType.isPresent()) {
+ final DataType dataType = optionalDataType.get();
+ if (RecordFieldType.TIMESTAMP == dataType.getFieldType()) {
+ // NOTE(review): a custom format set on a TIMESTAMP field is ignored here
+ pattern = MICROSECOND_TIMESTAMP_PATTERN;
+ } else {
+ pattern = dataType.getFormat();
+ }
+ }
+ // ofNullable maps a null pattern to Optional.empty()
+ return Optional.ofNullable(pattern);
+ }
+
+ /**
* Get java.sql.Date from Record Field Value with optional parsing when input value is a String
*
* @param value Record Field Value
diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java
index 126ca6e..8006907 100644
--- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java
+++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java
@@ -96,6 +96,10 @@ public class TestPutKudu {
private static final String ISO_8601_YEAR_MONTH_DAY = "2000-01-01";
private static final String ISO_8601_YEAR_MONTH_DAY_PATTERN = "yyyy-MM-dd";
+ private static final String TIMESTAMP_FIELD = "updated";
+ private static final String TIMESTAMP_STANDARD = "2000-01-01 12:00:00";
+ private static final String TIMESTAMP_MICROSECONDS = "2000-01-01 12:00:00.123456";
+
private TestRunner testRunner;
private MockPutKudu processor;
@@ -525,6 +529,41 @@ public class TestPutKudu {
assertPartialRowDateFieldEquals(ISO_8601_YEAR_MONTH_DAY);
}
+ /** Timestamp String without fractional seconds is written to the Partial Row as the equivalent Timestamp */
+ @Test
+ public void testBuildPartialRowWithTimestampStandardString() {
+ assertPartialRowTimestampFieldEquals(TIMESTAMP_STANDARD);
+ }
+
+ /** Timestamp String with six fractional second digits is written to the Partial Row as the equivalent Timestamp */
+ @Test
+ public void testBuildPartialRowWithTimestampMicrosecondsString() {
+ assertPartialRowTimestampFieldEquals(TIMESTAMP_MICROSECONDS);
+ }
+
+ /**
+ * Build a Partial Row from the Timestamp field value and assert that the stored Timestamp
+ * equals Timestamp.valueOf() applied to the String form of the value
+ *
+ * @param timestampFieldValue Timestamp field value whose toString() must be accepted by Timestamp.valueOf()
+ */
+ private void assertPartialRowTimestampFieldEquals(final Object timestampFieldValue) {
+ final PartialRow row = buildPartialRowTimestampField(timestampFieldValue);
+ final Timestamp timestamp = row.getTimestamp(TIMESTAMP_FIELD);
+ final Timestamp expected = Timestamp.valueOf(timestampFieldValue.toString());
+ assertEquals("Partial Row Timestamp Field not matched", expected, timestamp);
+ }
+
+ /**
+ * Build a Partial Row containing a single Timestamp field using the processor under test
+ *
+ * @param timestampFieldValue Value stored in the Record under TIMESTAMP_FIELD
+ * @return Partial Row populated through processor.buildPartialRow()
+ */
+ private PartialRow buildPartialRowTimestampField(final Object timestampFieldValue) {
+ // Kudu schema with one nullable UNIXTIME_MICROS column
+ final Schema kuduSchema = new Schema(Collections.singletonList(
+ new ColumnSchema.ColumnSchemaBuilder(TIMESTAMP_FIELD, Type.UNIXTIME_MICROS).nullable(true).build()
+ ));
+
+ // Record schema with one field using the standard TIMESTAMP Data Type
+ final RecordSchema schema = new SimpleRecordSchema(Collections.singletonList(
+ new RecordField(TIMESTAMP_FIELD, RecordFieldType.TIMESTAMP.getDataType())
+ ));
+
+ final Map<String, Object> values = new HashMap<>();
+ values.put(TIMESTAMP_FIELD, timestampFieldValue);
+ final MapRecord record = new MapRecord(schema, values);
+
+ final PartialRow row = kuduSchema.newPartialRow();
+ // NOTE(review): the meaning of the two trailing boolean arguments is not visible in this diff - confirm against buildPartialRow()
+ processor.buildPartialRow(kuduSchema, row, record, schema.getFieldNames(), true, true);
+ return row;
+ }
+
private void assertPartialRowDateFieldEquals(final Object dateFieldValue) {
final PartialRow row = buildPartialRowDateField(dateFieldValue, Type.DATE);
final java.sql.Date rowDate = row.getDate(DATE_FIELD);