You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by pv...@apache.org on 2021/12/10 09:32:05 UTC

[nifi] branch main updated: NIFI-9457 Support microseconds for String Timestamps in PutKudu

This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new b7ad1f9  NIFI-9457 Support microseconds for String Timestamps in PutKudu
b7ad1f9 is described below

commit b7ad1f924d2a279f6b87748639a5a677f8e340a6
Author: exceptionfactory <ex...@apache.org>
AuthorDate: Thu Dec 9 12:22:04 2021 -0600

    NIFI-9457 Support microseconds for String Timestamps in PutKudu
    
    - Implemented override for Timestamp Record Field Type format handling to add support for optional microseconds
    - Added FieldConverter and ObjectTimestampFieldConverter implementation for generalized Timestamp parsing using DateTimeFormatter
    - Updated PutKudu unit tests for standard Timestamp and Timestamp with microseconds
    
    Signed-off-by: Pierre Villard <pi...@gmail.com>
    
    This closes #5589.
---
 .../serialization/record/field/FieldConverter.java |  37 +++++++
 .../field/ObjectTimestampFieldConverter.java       |  86 ++++++++++++++++
 .../field/ObjectTimestampFieldConverterTest.java   | 114 +++++++++++++++++++++
 .../processors/kudu/AbstractKuduProcessor.java     |  31 +++++-
 .../apache/nifi/processors/kudu/TestPutKudu.java   |  39 +++++++
 5 files changed, 304 insertions(+), 3 deletions(-)

diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/field/FieldConverter.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/field/FieldConverter.java
new file mode 100644
index 0000000..7520be3
--- /dev/null
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/field/FieldConverter.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.serialization.record.field;
+
+import java.util.Optional;
+
+/**
+ * Generalized Field Converter interface for converting an input field to an output type with optional format parsing
+ *
+ * @param <I> Input Field Type
+ * @param <O> Output Field Type
+ */
+public interface FieldConverter<I, O> {
+    /**
+     * Convert the input field to the Output Field Type, using the format pattern for parsing when one is provided
+     *
+     * @param field Input field to be converted
+     * @param pattern Optional format pattern for parsing; empty when no pattern applies
+     * @param name Input field name for tracking, such as inclusion in error messages
+     * @return Converted field, which can be null when the input field is null or empty
+     */
+    O convertField(I field, Optional<String> pattern, String name);
+}
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/field/ObjectTimestampFieldConverter.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/field/ObjectTimestampFieldConverter.java
new file mode 100644
index 0000000..a624845
--- /dev/null
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/field/ObjectTimestampFieldConverter.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.serialization.record.field;
+
+import org.apache.nifi.serialization.record.util.IllegalTypeConversionException;
+
+import java.sql.Timestamp;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeParseException;
+import java.util.Date;
+import java.util.Optional;
+
+/**
+ * Convert Object to java.sql.Timestamp using instanceof evaluation and optional format pattern for DateTimeFormatter
+ */
+public class ObjectTimestampFieldConverter implements FieldConverter<Object, Timestamp> {
+    /**
+     * Convert Object field to java.sql.Timestamp using optional format supported in DateTimeFormatter
+     *
+     * @param field Field can be null, a Timestamp, a Date, a Number, or a String; Numbers and Strings without a pattern are read as milliseconds since the epoch
+     * @param pattern Format pattern for parsing Strings as LocalDateTime in the default time zone; when empty, Strings are parsed as long numbers
+     * @param name Field name included in exception messages
+     * @return Timestamp or null when input field is null or a String that is empty after trimming
+     * @throws IllegalTypeConversionException Thrown on parsing failures, carrying the parsing exception as the cause, or on unsupported types of input fields
+     */
+    @Override
+    public Timestamp convertField(final Object field, final Optional<String> pattern, final String name) {
+        if (field == null) {
+            return null;
+        }
+        if (field instanceof Timestamp) {
+            return (Timestamp) field;
+        }
+        if (field instanceof Date) {
+            final Date date = (Date) field;
+            return new Timestamp(date.getTime());
+        }
+        if (field instanceof Number) {
+            final Number number = (Number) field;
+            return new Timestamp(number.longValue());
+        }
+        if (field instanceof String) {
+            final String string = field.toString().trim();
+            if (string.isEmpty()) {
+                return null;
+            }
+            // Both parsing failures keep the original exception as the cause so the stack trace is not lost
+            if (pattern.isPresent()) {
+                final DateTimeFormatter formatter = DateTimeFormatter.ofPattern(pattern.get());
+                try {
+                    final LocalDateTime localDateTime = LocalDateTime.parse(string, formatter);
+                    return Timestamp.valueOf(localDateTime);
+                } catch (final DateTimeParseException e) {
+                    final String message = String.format("Convert Field Name [%s] Value [%s] to Timestamp LocalDateTime parsing failed: %s", name, field, e.getMessage());
+                    throw new IllegalTypeConversionException(message, e);
+                }
+            } else {
+                try {
+                    final long number = Long.parseLong(string);
+                    return new Timestamp(number);
+                } catch (final NumberFormatException e) {
+                    final String message = String.format("Convert Field Name [%s] Value [%s] to Timestamp Long parsing failed: %s", name, field, e.getMessage());
+                    throw new IllegalTypeConversionException(message, e);
+                }
+            }
+        }
+
+        final String message = String.format("Convert Field Name [%s] Value [%s] Class [%s] to Timestamp not supported", name, field, field.getClass());
+        throw new IllegalTypeConversionException(message);
+    }
+}
diff --git a/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/field/ObjectTimestampFieldConverterTest.java b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/field/ObjectTimestampFieldConverterTest.java
new file mode 100644
index 0000000..9424d0f
--- /dev/null
+++ b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/field/ObjectTimestampFieldConverterTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.serialization.record.field;
+
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.util.IllegalTypeConversionException;
+import org.junit.jupiter.api.Test;
+
+import java.sql.Timestamp;
+import java.util.Date;
+import java.util.Optional;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ObjectTimestampFieldConverterTest {
+    private static final ObjectTimestampFieldConverter CONVERTER = new ObjectTimestampFieldConverter();
+
+    private static final Optional<String> DEFAULT_PATTERN = Optional.of(RecordFieldType.TIMESTAMP.getDefaultFormat());
+
+    private static final String FIELD_NAME = Timestamp.class.getSimpleName();
+
+    private static final String EMPTY = "";
+
+    private static final String DATE_TIME_DEFAULT = "2000-01-01 12:00:00";
+
+    private static final Optional<String> DATE_TIME_NANOSECONDS_PATTERN = Optional.of("yyyy-MM-dd HH:mm:ss.SSSSSSSSS");
+
+    private static final String DATE_TIME_NANOSECONDS = "2000-01-01 12:00:00.123456789";
+
+    @Test
+    public void testConvertFieldNull() {
+        // Null input is expected to produce a null Timestamp
+        assertNull(CONVERTER.convertField(null, DEFAULT_PATTERN, FIELD_NAME));
+    }
+
+    @Test
+    public void testConvertFieldTimestamp() {
+        final Timestamp input = new Timestamp(System.currentTimeMillis());
+        // Timestamp input is expected back as an equal Timestamp
+        assertEquals(input, CONVERTER.convertField(input, DEFAULT_PATTERN, FIELD_NAME));
+    }
+
+    @Test
+    public void testConvertFieldDate() {
+        final Date input = new Date();
+        final Timestamp converted = CONVERTER.convertField(input, DEFAULT_PATTERN, FIELD_NAME);
+        assertEquals(input.getTime(), converted.getTime());
+    }
+
+    @Test
+    public void testConvertFieldLong() {
+        final long epochMilliseconds = System.currentTimeMillis();
+        final Timestamp converted = CONVERTER.convertField(epochMilliseconds, DEFAULT_PATTERN, FIELD_NAME);
+        assertEquals(epochMilliseconds, converted.getTime());
+    }
+
+    @Test
+    public void testConvertFieldStringEmpty() {
+        // Empty String input is expected to produce a null Timestamp
+        assertNull(CONVERTER.convertField(EMPTY, DEFAULT_PATTERN, FIELD_NAME));
+    }
+
+    @Test
+    public void testConvertFieldStringFormatNull() {
+        final long epochMilliseconds = System.currentTimeMillis();
+        final Optional<String> noPattern = Optional.empty();
+        final Timestamp converted = CONVERTER.convertField(Long.toString(epochMilliseconds), noPattern, FIELD_NAME);
+        assertEquals(epochMilliseconds, converted.getTime());
+    }
+
+    @Test
+    public void testConvertFieldStringFormatNullNumberFormatException() {
+        final String input = String.class.getSimpleName();
+        final IllegalTypeConversionException thrown = assertThrows(IllegalTypeConversionException.class, () -> CONVERTER.convertField(input, Optional.empty(), FIELD_NAME));
+        assertTrue(thrown.getMessage().contains(input));
+    }
+
+    @Test
+    public void testConvertFieldStringFormatDefault() {
+        final Timestamp expected = Timestamp.valueOf(DATE_TIME_DEFAULT);
+        final Timestamp converted = CONVERTER.convertField(DATE_TIME_DEFAULT, DEFAULT_PATTERN, FIELD_NAME);
+        assertEquals(expected, converted);
+    }
+
+    @Test
+    public void testConvertFieldStringFormatCustomNanoseconds() {
+        final Timestamp expected = Timestamp.valueOf(DATE_TIME_NANOSECONDS);
+        final Timestamp converted = CONVERTER.convertField(DATE_TIME_NANOSECONDS, DATE_TIME_NANOSECONDS_PATTERN, FIELD_NAME);
+        assertEquals(expected, converted);
+    }
+
+    @Test
+    public void testConvertFieldStringFormatCustomFormatterException() {
+        final IllegalTypeConversionException thrown = assertThrows(IllegalTypeConversionException.class, () -> CONVERTER.convertField(DATE_TIME_DEFAULT, DATE_TIME_NANOSECONDS_PATTERN, FIELD_NAME));
+        assertTrue(thrown.getMessage().contains(DATE_TIME_DEFAULT));
+    }
+}
diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
index 667058d..9bda759 100644
--- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
+++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
@@ -50,6 +50,8 @@ import org.apache.nifi.security.krb.KerberosUser;
 import org.apache.nifi.serialization.record.DataType;
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.field.FieldConverter;
+import org.apache.nifi.serialization.record.field.ObjectTimestampFieldConverter;
 import org.apache.nifi.serialization.record.type.DecimalDataType;
 import org.apache.nifi.serialization.record.util.DataTypeUtils;
 import org.apache.nifi.util.StringUtils;
@@ -161,6 +163,10 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor {
             .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .build();
 
+    private static final FieldConverter<Object, Timestamp> TIMESTAMP_FIELD_CONVERTER = new ObjectTimestampFieldConverter();
+    /** Timestamp Pattern used instead of the default RecordFieldType.TIMESTAMP pattern (yyyy-MM-dd HH:mm:ss) to additionally accept optional microseconds */
+    private static final String MICROSECOND_TIMESTAMP_PATTERN = "yyyy-MM-dd HH:mm:ss[.SSSSSS]";
+
     private volatile KuduClient kuduClient;
     private final ReadWriteLock kuduClientReadWriteLock = new ReentrantReadWriteLock();
     private final Lock kuduClientReadLock = kuduClientReadWriteLock.readLock();
@@ -419,9 +425,9 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor {
                         row.addLong(columnIndex,  DataTypeUtils.toLong(value, recordFieldName));
                         break;
                     case UNIXTIME_MICROS:
-                        DataType fieldType = record.getSchema().getDataType(recordFieldName).get();
-                        Timestamp timestamp = DataTypeUtils.toTimestamp(record.getValue(recordFieldName),
-                                () -> DataTypeUtils.getDateFormat(fieldType.getFormat()), recordFieldName);
+                        final Optional<DataType> optionalDataType = record.getSchema().getDataType(recordFieldName);
+                        final Optional<String> optionalPattern = getTimestampPattern(optionalDataType);
+                        final Timestamp timestamp = TIMESTAMP_FIELD_CONVERTER.convertField(value, optionalPattern, recordFieldName);
                         row.addTimestamp(columnIndex, timestamp);
                         break;
                     case STRING:
@@ -454,6 +460,25 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor {
     }
 
     /**
+     * Resolve the pattern passed to the Timestamp Field Converter. Timestamp Record Fields use the
+     * pattern with optional microseconds instead of their own format; other types keep their format.
+     *
+     * @param optionalDataType Optional Data Type
+     * @return Optional Timestamp Pattern, empty when the Data Type is absent or its format is null
+     */
+    private Optional<String> getTimestampPattern(final Optional<DataType> optionalDataType) {
+        if (!optionalDataType.isPresent()) {
+            return Optional.empty();
+        }
+
+        final DataType recordDataType = optionalDataType.get();
+        final boolean timestampField = RecordFieldType.TIMESTAMP == recordDataType.getFieldType();
+        final String timestampPattern = timestampField ? MICROSECOND_TIMESTAMP_PATTERN : recordDataType.getFormat();
+        // ofNullable maps a null format from getFormat() to an empty Optional
+        return Optional.ofNullable(timestampPattern);
+    }
+
+    /**
      * Get java.sql.Date from Record Field Value with optional parsing when input value is a String
      *
      * @param value Record Field Value
diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java
index 126ca6e..8006907 100644
--- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java
+++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java
@@ -96,6 +96,10 @@ public class TestPutKudu {
     private static final String ISO_8601_YEAR_MONTH_DAY = "2000-01-01";
     private static final String ISO_8601_YEAR_MONTH_DAY_PATTERN = "yyyy-MM-dd";
 
+    private static final String TIMESTAMP_FIELD = "updated";
+    private static final String TIMESTAMP_STANDARD = "2000-01-01 12:00:00";
+    private static final String TIMESTAMP_MICROSECONDS = "2000-01-01 12:00:00.123456";
+
     private TestRunner testRunner;
 
     private MockPutKudu processor;
@@ -525,6 +529,41 @@ public class TestPutKudu {
         assertPartialRowDateFieldEquals(ISO_8601_YEAR_MONTH_DAY);
     }
 
+    @Test
+    public void testBuildPartialRowWithTimestampStandardString() {
+        assertPartialRowTimestampFieldEquals(TIMESTAMP_STANDARD); // String value without fractional seconds
+    }
+
+    @Test
+    public void testBuildPartialRowWithTimestampMicrosecondsString() {
+        assertPartialRowTimestampFieldEquals(TIMESTAMP_MICROSECONDS); // String value with six fractional digits
+    }
+
+    private void assertPartialRowTimestampFieldEquals(final Object timestampFieldValue) {
+        final PartialRow partialRow = buildPartialRowTimestampField(timestampFieldValue);
+        final Timestamp actual = partialRow.getTimestamp(TIMESTAMP_FIELD);
+        final String expectedString = timestampFieldValue.toString();
+        assertEquals("Partial Row Timestamp Field not matched", Timestamp.valueOf(expectedString), actual);
+    }
+
+    private PartialRow buildPartialRowTimestampField(final Object timestampFieldValue) {
+        // Kudu schema with a single nullable UNIXTIME_MICROS column
+        final ColumnSchema timestampColumn = new ColumnSchema.ColumnSchemaBuilder(TIMESTAMP_FIELD, Type.UNIXTIME_MICROS).nullable(true).build();
+        final Schema kuduSchema = new Schema(Collections.singletonList(timestampColumn));
+
+        // Record with a single Timestamp field carrying the value under test
+        final RecordField timestampField = new RecordField(TIMESTAMP_FIELD, RecordFieldType.TIMESTAMP.getDataType());
+        final RecordSchema recordSchema = new SimpleRecordSchema(Collections.singletonList(timestampField));
+        final Map<String, Object> fieldValues = new HashMap<>();
+        fieldValues.put(TIMESTAMP_FIELD, timestampFieldValue);
+        final MapRecord mapRecord = new MapRecord(recordSchema, fieldValues);
+
+        // Populate the row through the processor under test
+        final PartialRow partialRow = kuduSchema.newPartialRow();
+        processor.buildPartialRow(kuduSchema, partialRow, mapRecord, recordSchema.getFieldNames(), true, true);
+        return partialRow;
+    }
+
     private void assertPartialRowDateFieldEquals(final Object dateFieldValue) {
         final PartialRow row = buildPartialRowDateField(dateFieldValue, Type.DATE);
         final java.sql.Date rowDate = row.getDate(DATE_FIELD);