You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jg...@apache.org on 2022/03/02 16:19:11 UTC

[nifi] branch main updated: NIFI-9064: Support Oracle timestamp when `Use Avro Logical Types` is true for ExecuteSQLRecord and QueryDatabaseTableRecord

This is an automated email from the ASF dual-hosted git repository.

jgresock pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new a85cafe  NIFI-9064: Support Oracle timestamp when `Use Avro Logical Types` is true for ExecuteSQLRecord and QueryDatabaseTableRecord
a85cafe is described below

commit a85cafe7718aee5a4080ccd35432b6517d90dab4
Author: zhangcheng <zh...@foxmail.com>
AuthorDate: Thu Aug 19 21:09:31 2021 +0800

    NIFI-9064: Support Oracle timestamp when `Use Avro Logical Types` is true for ExecuteSQLRecord and QueryDatabaseTableRecord
    
    Signed-off-by: Joe Gresock <jg...@gmail.com>
    
    This closes #5807.
---
 .../serialization/record/ResultSetRecordSet.java   | 33 +++++++++-------------
 .../record/ResultSetRecordSetTest.java             | 10 ++++++-
 2 files changed, 22 insertions(+), 21 deletions(-)

diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
index 5229dd5..cfb1e4b 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
@@ -44,6 +44,8 @@ import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static org.apache.nifi.serialization.record.RecordFieldType.TIMESTAMP;
+
 public class ResultSetRecordSet implements RecordSet, Closeable {
     private static final Logger logger = LoggerFactory.getLogger(ResultSetRecordSet.class);
     private static final int JDBC_DEFAULT_PRECISION_VALUE = 10;
@@ -81,7 +83,7 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
         try {
             tempSchema = createSchema(rs, readerSchema, useLogicalTypes);
             moreRows = rs.next();
-        } catch(SQLException se) {
+        } catch (SQLException se) {
             // Tried to create the schema with a ResultSet without calling next() first (probably for DB2), now try the other way around
             moreRows = rs.next();
             tempSchema = createSchema(rs, readerSchema, useLogicalTypes);
@@ -136,13 +138,12 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
 
         for (final RecordField field : schema.getFields()) {
             final String fieldName = field.getFieldName();
-
+            RecordFieldType fieldType = field.getDataType().getFieldType();
             final Object value;
-            if (rsColumnNames.contains(fieldName)) {
-                value = normalizeValue(rs.getObject(fieldName));
-            } else {
-                value = null;
-            }
+
+            value = rsColumnNames.contains(fieldName)
+                    ? normalizeValue((fieldType == TIMESTAMP) ? rs.getTimestamp(fieldName) : rs.getObject(fieldName))
+                    : null;
 
             values.put(fieldName, value);
         }
@@ -186,12 +187,7 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
             final String fieldName = metadata.getColumnLabel(column);
 
             final int nullableFlag = metadata.isNullable(column);
-            final boolean nullable;
-            if (nullableFlag == ResultSetMetaData.columnNoNulls) {
-                nullable = false;
-            } else {
-                nullable = true;
-            }
+            final boolean nullable = nullableFlag != ResultSetMetaData.columnNoNulls;
 
             final RecordField field = new RecordField(fieldName, dataType, nullable);
             fields.add(field);
@@ -240,7 +236,7 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
                 if (!(obj instanceof Record)) {
                     final List<DataType> dataTypes = Stream.of(RecordFieldType.BIGINT, RecordFieldType.BOOLEAN, RecordFieldType.BYTE, RecordFieldType.CHAR, RecordFieldType.DATE,
                         RecordFieldType.DECIMAL, RecordFieldType.DOUBLE, RecordFieldType.FLOAT, RecordFieldType.INT, RecordFieldType.LONG, RecordFieldType.SHORT, RecordFieldType.STRING,
-                            RecordFieldType.TIME, RecordFieldType.TIMESTAMP)
+                            RecordFieldType.TIME, TIMESTAMP)
                     .map(RecordFieldType::getDataType)
                     .collect(Collectors.toList());
 
@@ -279,7 +275,7 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
                 && (fieldType == RecordFieldType.DECIMAL
                 || fieldType == RecordFieldType.DATE
                 || fieldType == RecordFieldType.TIME
-                || fieldType == RecordFieldType.TIMESTAMP)) {
+                || fieldType == TIMESTAMP)) {
             return RecordFieldType.STRING.getDataType();
         } else {
             return dataType;
@@ -424,9 +420,6 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
             if (valueToLookAt instanceof BigInteger) {
                 return RecordFieldType.BIGINT.getDataType();
             }
-            if (valueToLookAt instanceof Integer) {
-                return RecordFieldType.INT.getDataType();
-            }
             if (valueToLookAt instanceof java.sql.Time) {
                 return getDataType(RecordFieldType.TIME, useLogicalTypes);
             }
@@ -434,7 +427,7 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
                 return getDataType(RecordFieldType.DATE, useLogicalTypes);
             }
             if (valueToLookAt instanceof java.sql.Timestamp) {
-                return getDataType(RecordFieldType.TIMESTAMP, useLogicalTypes);
+                return getDataType(TIMESTAMP, useLogicalTypes);
             }
             if (valueToLookAt instanceof Record) {
                 final Record record = (Record) valueToLookAt;
@@ -518,7 +511,7 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
             case Types.TIMESTAMP_WITH_TIMEZONE:
             case -101: // Oracle's TIMESTAMP WITH TIME ZONE
             case -102: // Oracle's TIMESTAMP WITH LOCAL TIME ZONE
-                return getRecordFieldType(RecordFieldType.TIMESTAMP, useLogicalTypes);
+                return getRecordFieldType(TIMESTAMP, useLogicalTypes);
         }
 
         return RecordFieldType.STRING;
diff --git a/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/ResultSetRecordSetTest.java b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/ResultSetRecordSetTest.java
index ab28dab..edc36da 100644
--- a/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/ResultSetRecordSetTest.java
+++ b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/ResultSetRecordSetTest.java
@@ -19,6 +19,7 @@ package org.apache.nifi.serialization.record;
 import org.apache.nifi.serialization.SimpleRecordSchema;
 import org.apache.nifi.serialization.record.type.ArrayDataType;
 import org.apache.nifi.serialization.record.type.DecimalDataType;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -40,6 +41,7 @@ import java.sql.Time;
 import java.sql.Timestamp;
 import java.sql.Types;
 import java.time.LocalDate;
+import java.time.LocalDateTime;
 import java.time.ZoneOffset;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -67,6 +69,7 @@ public class ResultSetRecordSetTest {
     private static final String COLUMN_NAME_BOOLEAN = "boolean";
     private static final String COLUMN_NAME_CHAR = "char";
     private static final String COLUMN_NAME_DATE = "date";
+    private static final String COLUMN_NAME_TIMESTAMP = "timestamp";
     private static final String COLUMN_NAME_INTEGER = "integer";
     private static final String COLUMN_NAME_DOUBLE = "double";
     private static final String COLUMN_NAME_REAL = "real";
@@ -99,7 +102,8 @@ public class ResultSetRecordSetTest {
             new TestColumn(15, COLUMN_NAME_BIG_DECIMAL_2, Types.NUMERIC, RecordFieldType.DECIMAL.getDecimalDataType(4, 0)),
             new TestColumn(16, COLUMN_NAME_BIG_DECIMAL_3, Types.JAVA_OBJECT, RecordFieldType.DECIMAL.getDecimalDataType(501, 1)),
             new TestColumn(17, COLUMN_NAME_BIG_DECIMAL_4, Types.DECIMAL, RecordFieldType.DECIMAL.getDecimalDataType(10, 3)),
-            new TestColumn(18, COLUMN_NAME_BIG_DECIMAL_5, Types.DECIMAL, RecordFieldType.DECIMAL.getDecimalDataType(3, 10))
+            new TestColumn(18, COLUMN_NAME_BIG_DECIMAL_5, Types.DECIMAL, RecordFieldType.DECIMAL.getDecimalDataType(3, 10)),
+            new TestColumn(19, COLUMN_NAME_TIMESTAMP, Types.TIMESTAMP, RecordFieldType.TIMESTAMP.getDataType())
     };
 
     @Mock
@@ -252,6 +256,7 @@ public class ResultSetRecordSetTest {
         final RecordSchema recordSchema = givenRecordSchema(COLUMNS);
 
         LocalDate testDate = LocalDate.of(2021, 1, 26);
+        LocalDateTime testDateTime = LocalDateTime.of(2021, 9, 10, 11, 11, 11);
 
         final String varcharValue = "varchar";
         final Long bigintValue = 1234567890123456789L;
@@ -260,6 +265,7 @@ public class ResultSetRecordSetTest {
         final Boolean booleanValue = Boolean.TRUE;
         final Character charValue = 'c';
         final Date dateValue = Date.valueOf(testDate);
+        final Timestamp timestampValue = Timestamp.valueOf(testDateTime);
         final Integer integerValue = 1234567890;
         final Double doubleValue = 0.12;
         final Double realValue = 3.45;
@@ -279,6 +285,7 @@ public class ResultSetRecordSetTest {
         when(resultSet.getObject(COLUMN_NAME_BOOLEAN)).thenReturn(booleanValue);
         when(resultSet.getObject(COLUMN_NAME_CHAR)).thenReturn(charValue);
         when(resultSet.getObject(COLUMN_NAME_DATE)).thenReturn(dateValue);
+        when(resultSet.getTimestamp(COLUMN_NAME_TIMESTAMP)).thenReturn(timestampValue);
         when(resultSet.getObject(COLUMN_NAME_INTEGER)).thenReturn(integerValue);
         when(resultSet.getObject(COLUMN_NAME_DOUBLE)).thenReturn(doubleValue);
         when(resultSet.getObject(COLUMN_NAME_REAL)).thenReturn(realValue);
@@ -306,6 +313,7 @@ public class ResultSetRecordSetTest {
         // Date is expected in UTC normalized form
         Date expectedDate = new Date(testDate.atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli());
         assertEquals(expectedDate, record.getAsDate(COLUMN_NAME_DATE, null));
+        assertEquals(timestampValue, DataTypeUtils.toTimestamp(record.getValue(COLUMN_NAME_TIMESTAMP), null, COLUMN_NAME_TIMESTAMP));
 
         assertEquals(integerValue, record.getAsInt(COLUMN_NAME_INTEGER));
         assertEquals(doubleValue, record.getAsDouble(COLUMN_NAME_DOUBLE));