You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jg...@apache.org on 2022/03/02 16:19:11 UTC
[nifi] branch main updated: NIFI-9064:Support Oracle timestamp when `Use Avro Logical Types` is true for ExecuteSQLRecord and QueryDatabaseTableRecord
This is an automated email from the ASF dual-hosted git repository.
jgresock pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new a85cafe NIFI-9064:Support Oracle timestamp when `Use Avro Logical Types` is true for ExecuteSQLRecord and QueryDatabaseTableRecord
a85cafe is described below
commit a85cafe7718aee5a4080ccd35432b6517d90dab4
Author: zhangcheng <zh...@foxmail.com>
AuthorDate: Thu Aug 19 21:09:31 2021 +0800
NIFI-9064:Support Oracle timestamp when `Use Avro Logical Types` is true for ExecuteSQLRecord and QueryDatabaseTableRecord
Signed-off-by: Joe Gresock <jg...@gmail.com>
This closes #5807.
---
.../serialization/record/ResultSetRecordSet.java | 33 +++++++++-------------
.../record/ResultSetRecordSetTest.java | 10 ++++++-
2 files changed, 22 insertions(+), 21 deletions(-)
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
index 5229dd5..cfb1e4b 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
@@ -44,6 +44,8 @@ import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import static org.apache.nifi.serialization.record.RecordFieldType.TIMESTAMP;
+
public class ResultSetRecordSet implements RecordSet, Closeable {
private static final Logger logger = LoggerFactory.getLogger(ResultSetRecordSet.class);
private static final int JDBC_DEFAULT_PRECISION_VALUE = 10;
@@ -81,7 +83,7 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
try {
tempSchema = createSchema(rs, readerSchema, useLogicalTypes);
moreRows = rs.next();
- } catch(SQLException se) {
+ } catch (SQLException se) {
// Tried to create the schema with a ResultSet without calling next() first (probably for DB2), now try the other way around
moreRows = rs.next();
tempSchema = createSchema(rs, readerSchema, useLogicalTypes);
@@ -136,13 +138,12 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
for (final RecordField field : schema.getFields()) {
final String fieldName = field.getFieldName();
-
+ RecordFieldType fieldType = field.getDataType().getFieldType();
final Object value;
- if (rsColumnNames.contains(fieldName)) {
- value = normalizeValue(rs.getObject(fieldName));
- } else {
- value = null;
- }
+
+ value = rsColumnNames.contains(fieldName)
+ ? normalizeValue((fieldType == TIMESTAMP) ? rs.getTimestamp(fieldName) : rs.getObject(fieldName))
+ : null;
values.put(fieldName, value);
}
@@ -186,12 +187,7 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
final String fieldName = metadata.getColumnLabel(column);
final int nullableFlag = metadata.isNullable(column);
- final boolean nullable;
- if (nullableFlag == ResultSetMetaData.columnNoNulls) {
- nullable = false;
- } else {
- nullable = true;
- }
+ final boolean nullable = nullableFlag != ResultSetMetaData.columnNoNulls;
final RecordField field = new RecordField(fieldName, dataType, nullable);
fields.add(field);
@@ -240,7 +236,7 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
if (!(obj instanceof Record)) {
final List<DataType> dataTypes = Stream.of(RecordFieldType.BIGINT, RecordFieldType.BOOLEAN, RecordFieldType.BYTE, RecordFieldType.CHAR, RecordFieldType.DATE,
RecordFieldType.DECIMAL, RecordFieldType.DOUBLE, RecordFieldType.FLOAT, RecordFieldType.INT, RecordFieldType.LONG, RecordFieldType.SHORT, RecordFieldType.STRING,
- RecordFieldType.TIME, RecordFieldType.TIMESTAMP)
+ RecordFieldType.TIME, TIMESTAMP)
.map(RecordFieldType::getDataType)
.collect(Collectors.toList());
@@ -279,7 +275,7 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
&& (fieldType == RecordFieldType.DECIMAL
|| fieldType == RecordFieldType.DATE
|| fieldType == RecordFieldType.TIME
- || fieldType == RecordFieldType.TIMESTAMP)) {
+ || fieldType == TIMESTAMP)) {
return RecordFieldType.STRING.getDataType();
} else {
return dataType;
@@ -424,9 +420,6 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
if (valueToLookAt instanceof BigInteger) {
return RecordFieldType.BIGINT.getDataType();
}
- if (valueToLookAt instanceof Integer) {
- return RecordFieldType.INT.getDataType();
- }
if (valueToLookAt instanceof java.sql.Time) {
return getDataType(RecordFieldType.TIME, useLogicalTypes);
}
@@ -434,7 +427,7 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
return getDataType(RecordFieldType.DATE, useLogicalTypes);
}
if (valueToLookAt instanceof java.sql.Timestamp) {
- return getDataType(RecordFieldType.TIMESTAMP, useLogicalTypes);
+ return getDataType(TIMESTAMP, useLogicalTypes);
}
if (valueToLookAt instanceof Record) {
final Record record = (Record) valueToLookAt;
@@ -518,7 +511,7 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
case Types.TIMESTAMP_WITH_TIMEZONE:
case -101: // Oracle's TIMESTAMP WITH TIME ZONE
case -102: // Oracle's TIMESTAMP WITH LOCAL TIME ZONE
- return getRecordFieldType(RecordFieldType.TIMESTAMP, useLogicalTypes);
+ return getRecordFieldType(TIMESTAMP, useLogicalTypes);
}
return RecordFieldType.STRING;
diff --git a/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/ResultSetRecordSetTest.java b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/ResultSetRecordSetTest.java
index ab28dab..edc36da 100644
--- a/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/ResultSetRecordSetTest.java
+++ b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/ResultSetRecordSetTest.java
@@ -19,6 +19,7 @@ package org.apache.nifi.serialization.record;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.type.ArrayDataType;
import org.apache.nifi.serialization.record.type.DecimalDataType;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -40,6 +41,7 @@ import java.sql.Time;
import java.sql.Timestamp;
import java.sql.Types;
import java.time.LocalDate;
+import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Arrays;
@@ -67,6 +69,7 @@ public class ResultSetRecordSetTest {
private static final String COLUMN_NAME_BOOLEAN = "boolean";
private static final String COLUMN_NAME_CHAR = "char";
private static final String COLUMN_NAME_DATE = "date";
+ private static final String COLUMN_NAME_TIMESTAMP = "timestamp";
private static final String COLUMN_NAME_INTEGER = "integer";
private static final String COLUMN_NAME_DOUBLE = "double";
private static final String COLUMN_NAME_REAL = "real";
@@ -99,7 +102,8 @@ public class ResultSetRecordSetTest {
new TestColumn(15, COLUMN_NAME_BIG_DECIMAL_2, Types.NUMERIC, RecordFieldType.DECIMAL.getDecimalDataType(4, 0)),
new TestColumn(16, COLUMN_NAME_BIG_DECIMAL_3, Types.JAVA_OBJECT, RecordFieldType.DECIMAL.getDecimalDataType(501, 1)),
new TestColumn(17, COLUMN_NAME_BIG_DECIMAL_4, Types.DECIMAL, RecordFieldType.DECIMAL.getDecimalDataType(10, 3)),
- new TestColumn(18, COLUMN_NAME_BIG_DECIMAL_5, Types.DECIMAL, RecordFieldType.DECIMAL.getDecimalDataType(3, 10))
+ new TestColumn(18, COLUMN_NAME_BIG_DECIMAL_5, Types.DECIMAL, RecordFieldType.DECIMAL.getDecimalDataType(3, 10)),
+ new TestColumn(19, COLUMN_NAME_TIMESTAMP, Types.TIMESTAMP, RecordFieldType.TIMESTAMP.getDataType())
};
@Mock
@@ -252,6 +256,7 @@ public class ResultSetRecordSetTest {
final RecordSchema recordSchema = givenRecordSchema(COLUMNS);
LocalDate testDate = LocalDate.of(2021, 1, 26);
+ LocalDateTime testDateTime = LocalDateTime.of(2021, 9, 10, 11, 11, 11);
final String varcharValue = "varchar";
final Long bigintValue = 1234567890123456789L;
@@ -260,6 +265,7 @@ public class ResultSetRecordSetTest {
final Boolean booleanValue = Boolean.TRUE;
final Character charValue = 'c';
final Date dateValue = Date.valueOf(testDate);
+ final Timestamp timestampValue = Timestamp.valueOf(testDateTime);
final Integer integerValue = 1234567890;
final Double doubleValue = 0.12;
final Double realValue = 3.45;
@@ -279,6 +285,7 @@ public class ResultSetRecordSetTest {
when(resultSet.getObject(COLUMN_NAME_BOOLEAN)).thenReturn(booleanValue);
when(resultSet.getObject(COLUMN_NAME_CHAR)).thenReturn(charValue);
when(resultSet.getObject(COLUMN_NAME_DATE)).thenReturn(dateValue);
+ when(resultSet.getTimestamp(COLUMN_NAME_TIMESTAMP)).thenReturn(timestampValue);
when(resultSet.getObject(COLUMN_NAME_INTEGER)).thenReturn(integerValue);
when(resultSet.getObject(COLUMN_NAME_DOUBLE)).thenReturn(doubleValue);
when(resultSet.getObject(COLUMN_NAME_REAL)).thenReturn(realValue);
@@ -306,6 +313,7 @@ public class ResultSetRecordSetTest {
// Date is expected in UTC normalized form
Date expectedDate = new Date(testDate.atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli());
assertEquals(expectedDate, record.getAsDate(COLUMN_NAME_DATE, null));
+ assertEquals(timestampValue, DataTypeUtils.toTimestamp(record.getValue(COLUMN_NAME_TIMESTAMP), null, COLUMN_NAME_TIMESTAMP));
assertEquals(integerValue, record.getAsInt(COLUMN_NAME_INTEGER));
assertEquals(doubleValue, record.getAsDouble(COLUMN_NAME_DOUBLE));