You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2021/12/14 17:15:32 UTC
[nifi] 14/15: NIFI-9185 Add Avro logical type to SelectHive3QL processor
This is an automated email from the ASF dual-hosted git repository.
joewitt pushed a commit to branch support/nifi-1.15
in repository https://gitbox.apache.org/repos/asf/nifi.git
commit 2f9963a534d55df2608fe4efaa0841bee7bba393
Author: Timea Barna <ti...@gmail.com>
AuthorDate: Tue Aug 31 13:44:34 2021 +0200
NIFI-9185 Add Avro logical type to SelectHive3QL processor
Modifying unit test to avoid system default timezone usage
NIFI-9185 Applying review recommendations removing duplicate dependency from pom.xml
Signed-off-by: Matthew Burgess <ma...@apache.org>
This closes #5358
---
.../apache/nifi/processors/hive/SelectHive3QL.java | 19 +++-
.../org/apache/nifi/util/hive/HiveJdbcCommon.java | 72 ++++++++++--
.../nifi/processors/hive/TestSelectHive3QL.java | 124 +++++++++++++++++++++
3 files changed, 202 insertions(+), 13 deletions(-)
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/SelectHive3QL.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/SelectHive3QL.java
index f124c73..af87bd1 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/SelectHive3QL.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/SelectHive3QL.java
@@ -236,6 +236,21 @@ public class SelectHive3QL extends AbstractHive3QLProcessor {
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.build();
+ /**
+ * Processor property that toggles Avro logical types for DECIMAL, DATE and TIMESTAMP columns.
+ * Only "true" and "false" are accepted; the property is required and defaults to "false".
+ */
+ public static final PropertyDescriptor USE_AVRO_LOGICAL_TYPES = new PropertyDescriptor.Builder()
+ .name("use-logical-types")
+ .displayName("Use Avro Logical Types")
+ .description("Whether to use Avro Logical Types for DECIMAL, DATE and TIMESTAMP columns. "
+ + "If disabled, written as string. "
+ + "If enabled, Logical types are used and written as its underlying type, specifically, "
+ + "DECIMAL as logical 'decimal': written as bytes with additional precision and scale meta data, "
+ + "DATE as logical 'date': written as int denoting days since Unix epoch (1970-01-01), "
+ + "and TIMESTAMP as logical 'timestamp-millis': written as long denoting milliseconds since Unix epoch. "
+ + "If a reader of written Avro records also knows these logical types, then these values can be deserialized with more context depending on reader implementation.")
+ .allowableValues("true", "false")
+ .defaultValue("false")
+ .required(true)
+ .build();
+
private final static List<PropertyDescriptor> propertyDescriptors;
private final static Set<Relationship> relationships;
@@ -255,6 +270,7 @@ public class SelectHive3QL extends AbstractHive3QLProcessor {
_propertyDescriptors.add(MAX_FRAGMENTS);
_propertyDescriptors.add(HIVEQL_OUTPUT_FORMAT);
_propertyDescriptors.add(NORMALIZE_NAMES_FOR_AVRO);
+ _propertyDescriptors.add(USE_AVRO_LOGICAL_TYPES);
_propertyDescriptors.add(HIVEQL_CSV_HEADER);
_propertyDescriptors.add(HIVEQL_CSV_ALT_HEADER);
_propertyDescriptors.add(HIVEQL_CSV_DELIMITER);
@@ -344,6 +360,7 @@ public class SelectHive3QL extends AbstractHive3QLProcessor {
final String delimiter = context.getProperty(HIVEQL_CSV_DELIMITER).evaluateAttributeExpressions(fileToProcess).getValue();
final boolean quote = context.getProperty(HIVEQL_CSV_QUOTE).asBoolean();
final boolean escape = context.getProperty(HIVEQL_CSV_HEADER).asBoolean();
+ final boolean useLogicalTypes = context.getProperty(USE_AVRO_LOGICAL_TYPES).asBoolean();
final String fragmentIdentifier = UUID.randomUUID().toString();
try (final Connection con = dbcpService.getConnection(fileToProcess == null ? Collections.emptyMap() : fileToProcess.getAttributes());
@@ -411,7 +428,7 @@ public class SelectHive3QL extends AbstractHive3QLProcessor {
flowfile = session.write(flowfile, out -> {
try {
if (AVRO.equals(outputFormat)) {
- nrOfRows.set(HiveJdbcCommon.convertToAvroStream(resultSet, out, maxRowsPerFlowFile, convertNamesForAvro));
+ nrOfRows.set(HiveJdbcCommon.convertToAvroStream(resultSet, out, maxRowsPerFlowFile, convertNamesForAvro, useLogicalTypes));
} else if (CSV.equals(outputFormat)) {
CsvOutputOptions options = new CsvOutputOptions(header, altHeader, delimiter, quote, escape, maxRowsPerFlowFile);
nrOfRows.set(HiveJdbcCommon.convertToCsvStream(resultSet, out, options));
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/util/hive/HiveJdbcCommon.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/util/hive/HiveJdbcCommon.java
index 09eecce..2a704c0 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/util/hive/HiveJdbcCommon.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/util/hive/HiveJdbcCommon.java
@@ -16,6 +16,7 @@
*/
package org.apache.nifi.util.hive;
+import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.SchemaBuilder.FieldAssembler;
@@ -29,6 +30,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.nifi.avro.AvroTypeUtil;
import org.apache.nifi.components.PropertyDescriptor;
import java.io.IOException;
@@ -88,6 +90,10 @@ public class HiveJdbcCommon {
public static final String MIME_TYPE_AVRO_BINARY = "application/avro-binary";
public static final String CSV_MIME_TYPE = "text/csv";
+ private static final Schema TIMESTAMP_MILLIS_SCHEMA = LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG));
+ private static final Schema DATE_SCHEMA = LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT));
+ private static final int DEFAULT_PRECISION = 10;
+ private static final int DEFAULT_SCALE = 0;
public static final PropertyDescriptor NORMALIZE_NAMES_FOR_AVRO = new PropertyDescriptor.Builder()
.name("hive-normalize-avro")
@@ -99,14 +105,15 @@ public class HiveJdbcCommon {
.required(true)
.build();
+ /**
+ * Convenience overload: delegates to the full convertToAvroStream variant, passing null for both the
+ * record name and the row callback.
+ */
- public static long convertToAvroStream(final ResultSet rs, final OutputStream outStream, final int maxRows, boolean convertNames) throws SQLException, IOException {
- return convertToAvroStream(rs, outStream, null, maxRows, convertNames, null);
+ public static long convertToAvroStream(final ResultSet rs, final OutputStream outStream, final int maxRows, boolean convertNames, final boolean useLogicalTypes) throws SQLException, IOException {
+ return convertToAvroStream(rs, outStream, null, maxRows, convertNames, null, useLogicalTypes);
}
- public static long convertToAvroStream(final ResultSet rs, final OutputStream outStream, String recordName, final int maxRows, boolean convertNames, ResultSetRowCallback callback)
+ public static long convertToAvroStream(final ResultSet rs, final OutputStream outStream, String recordName, final int maxRows, boolean convertNames,
+ ResultSetRowCallback callback, final boolean useLogicalTypes)
throws SQLException, IOException {
- final Schema schema = createSchema(rs, recordName, convertNames);
+ final Schema schema = createSchema(rs, recordName, convertNames, useLogicalTypes);
final GenericRecord rec = new GenericData.Record(schema);
final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
@@ -149,7 +156,16 @@ public class HiveJdbcCommon {
// org.apache.avro.AvroRuntimeException: Unknown datum type java.lang.Byte
rec.put(i - 1, ((Byte) value).intValue());
- } else if (value instanceof BigDecimal || value instanceof BigInteger) {
+ } else if (value instanceof BigDecimal) {
+ if (useLogicalTypes) {
+ final int precision = getPrecision(meta.getPrecision(i));
+ final int scale = getScale(meta.getScale(i));
+ rec.put(i - 1, AvroTypeUtil.convertToAvroObject(value, LogicalTypes.decimal(precision, scale).addToSchema(Schema.create(Schema.Type.BYTES))));
+ } else {
+ rec.put(i - 1, value.toString());
+ }
+
+ } else if (value instanceof BigInteger) {
// Avro can't handle BigDecimal and BigInteger as numbers - it will throw an AvroRuntimeException such as: "Unknown datum type: java.math.BigDecimal: 38"
rec.put(i - 1, value.toString());
@@ -170,10 +186,14 @@ public class HiveJdbcCommon {
rec.put(i - 1, value);
} else if (value instanceof java.sql.SQLXML) {
rec.put(i - 1, ((java.sql.SQLXML) value).getString());
+ } else if (useLogicalTypes && javaSqlType == DATE) {
+ rec.put(i - 1, AvroTypeUtil.convertToAvroObject(value, DATE_SCHEMA));
+ } else if (useLogicalTypes && javaSqlType == TIMESTAMP) {
+ rec.put(i - 1, AvroTypeUtil.convertToAvroObject(value, TIMESTAMP_MILLIS_SCHEMA));
} else {
// The different types that we support are numbers (int, long, double, float),
- // as well as boolean values and Strings. Since Avro doesn't provide
- // timestamp types, we want to convert those to Strings. So we will cast anything other
+ // as well as boolean values, decimal, date, timestamp and Strings. TIME values are not mapped to an
+ // Avro type here, so we want to convert those to Strings. So we will cast anything other
// than numbers or booleans to strings by using the toString() method.
rec.put(i - 1, value.toString());
}
@@ -190,7 +210,7 @@ public class HiveJdbcCommon {
}
public static Schema createSchema(final ResultSet rs, boolean convertNames) throws SQLException {
- return createSchema(rs, null, false);
+ // Forward the caller's convertNames flag (it was previously ignored in favour of a hard-coded false).
+ // This overload uses the default record name and keeps Avro logical types disabled.
+ return createSchema(rs, null, convertNames, false);
}
/**
@@ -203,7 +223,7 @@ public class HiveJdbcCommon {
* @return A Schema object representing the result set converted to an Avro record
* @throws SQLException if any error occurs during conversion
*/
- public static Schema createSchema(final ResultSet rs, String recordName, boolean convertNames) throws SQLException {
+ public static Schema createSchema(final ResultSet rs, String recordName, boolean convertNames, final boolean useLogicalTypes) throws SQLException {
final ResultSetMetaData meta = rs.getMetaData();
final int nrOfColumns = meta.getColumnCount();
String tableName = StringUtils.isEmpty(recordName) ? "NiFi_SelectHiveQL_Record" : recordName;
@@ -298,14 +318,32 @@ public class HiveJdbcCommon {
// Did not find direct suitable type, need to be clarified!!!!
case DECIMAL:
case NUMERIC:
- builder.name(columnName).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault();
+ if (useLogicalTypes) {
+ final int precision = getPrecision(meta.getPrecision(i));
+ final int scale = getScale(meta.getScale(i));
+ builder.name(columnName).type().unionOf().nullBuilder().endNull().and()
+ .type(LogicalTypes.decimal(precision, scale).addToSchema(Schema.create(Schema.Type.BYTES))).endUnion().noDefault();
+ } else {
+ builder.name(columnName).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault();
+ }
break;
- // Did not find direct suitable type, need to be clarified!!!!
+ // Dates were introduced in Hive 0.12.0
case DATE:
+ if (useLogicalTypes) {
+ builder.name(columnName).type().unionOf().nullBuilder().endNull().and().type(DATE_SCHEMA).endUnion().noDefault();
+ } else {
+ builder.name(columnName).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault();
+ }
+ break;
+ // Did not find direct suitable type, need to be clarified!!!!
case TIME:
case TIMESTAMP:
- builder.name(columnName).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault();
+ if (useLogicalTypes) {
+ builder.name(columnName).type().unionOf().nullBuilder().endNull().and().type(TIMESTAMP_MILLIS_SCHEMA).endUnion().noDefault();
+ } else {
+ builder.name(columnName).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault();
+ }
break;
case BINARY:
@@ -461,4 +499,14 @@ public class HiveJdbcCommon {
}
return hiveConfig;
}
+
+ //If data in result set contains invalid (non-positive) precision value use Hive default precision.
+ //A reported precision of 1 is a legitimate DECIMAL precision, so it is kept rather than replaced by the default.
+ private static int getPrecision(int precision) {
+ return precision > 0 ? precision : DEFAULT_PRECISION;
+ }
+
+ //Falls back to the Hive default scale whenever the result set metadata reports a scale that is not positive.
+ private static int getScale(int scale) {
+ if (scale <= 0) {
+ return DEFAULT_SCALE;
+ }
+ return scale;
+ }
}
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHive3QL.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHive3QL.java
index 356106f..d319742 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHive3QL.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHive3QL.java
@@ -16,6 +16,9 @@
*/
package org.apache.nifi.processors.hive;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
@@ -44,11 +47,15 @@ import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
+import java.sql.Timestamp;
import java.sql.Types;
+import java.time.LocalDate;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -67,6 +74,11 @@ public class TestSelectHive3QL {
private static final Logger LOGGER;
private final static String MAX_ROWS_KEY = "maxRows";
private final int NUM_OF_ROWS = 100;
+ private static final int ID = 1;
+ private static final String NAME = "Joe Smith";
+ private static final String BIRTH_DATE = "1956-11-22";
+ private static final String BIG_NUMBER = "12345678.12";
+ private static final String CREATED_ON = "1962-09-23 03:23:34.234";
static {
@@ -690,6 +702,111 @@ public class TestSelectHive3QL {
runner.clearTransferState();
}
+ @Test
+ public void testAvroRecordCreatedWithoutLogicalTypesByDefault() throws SQLException, IOException {
+ final Schema expectedSchema = SchemaBuilder.record("NiFi_SelectHiveQL_Record").namespace("any.data").fields()
+ .name("ID").type().unionOf().nullBuilder().endNull().and().intType().endUnion().noDefault()
+ .name("NAME").type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault()
+ .name("BIRTH_DATE").type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault()
+ .name("BIG_NUMBER").type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault()
+ .name("CREATED_ON").type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault()
+ .endRecord();
+
+ // load test data to database
+ final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+ final Statement stmt = con.createStatement();
+ final InputStream in;
+ final MockFlowFile mff;
+
+ try {
+ stmt.execute("drop table TEST_QUERY_DB_TABLE");
+ } catch (final SQLException sqle) {
+ // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+ }
+
+ stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, name varchar(100), birth_date date, big_number decimal(10,2),created_on timestamp)");
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, birth_date, big_number, created_on) VALUES (" +
+ ID + ", '" + NAME + "', '" + BIRTH_DATE + "', " + BIG_NUMBER + ", '" + CREATED_ON + "')");
+
+ runner.setIncomingConnection(false);
+ runner.setProperty(SelectHive3QL.HIVEQL_SELECT_QUERY, "SELECT * FROM TEST_QUERY_DB_TABLE");
+
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(SelectHive3QL.REL_SUCCESS, 1);
+ mff = runner.getFlowFilesForRelationship(SelectHive3QL.REL_SUCCESS).get(0);
+ in = new ByteArrayInputStream(mff.toByteArray());
+
+ final GenericRecord record = getFirstRecordFromStream(in);
+
+ assertEquals(expectedSchema, record.getSchema());
+ assertEquals(ID, record.get("ID"));
+ assertEquals(NAME, record.get("NAME").toString());
+ assertEquals(BIRTH_DATE, record.get("BIRTH_DATE").toString());
+ assertEquals(BIG_NUMBER, record.get("BIG_NUMBER").toString());
+ assertEquals(CREATED_ON, record.get("CREATED_ON").toString());
+
+ runner.clearTransferState();
+ }
+
+ @Test
+ public void testAvroRecordCreatedWithLogicalTypesWhenSet() throws SQLException, IOException {
+ final Schema expectedSchema = SchemaBuilder.record("NiFi_SelectHiveQL_Record").namespace("any.data").fields()
+ .name("ID").type().unionOf().nullBuilder().endNull().and().intType().endUnion().noDefault()
+ .name("NAME").type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault()
+ .name("BIRTH_DATE").type().unionOf().nullBuilder().endNull().and()
+ .type(LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT))).endUnion().noDefault()
+ .name("BIG_NUMBER").type().unionOf().nullBuilder().endNull().and()
+ .type(LogicalTypes.decimal(10, 2).addToSchema(Schema.create(Schema.Type.BYTES))).endUnion().noDefault()
+ .name("CREATED_ON").type().unionOf().nullBuilder().endNull().and()
+ .type(LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG))).endUnion().noDefault()
+ .endRecord();
+
+ final int expectedBirthDate = (int) LocalDate.parse(BIRTH_DATE).toEpochDay();
+ final BigDecimal decimal = new BigDecimal(BIG_NUMBER).setScale(2, BigDecimal.ROUND_HALF_EVEN);
+ final ByteBuffer expectedBigNumber = ByteBuffer.wrap(decimal.unscaledValue().toByteArray());
+ final Timestamp timestamp = Timestamp.valueOf(CREATED_ON);
+ final long expectedCreatedOn = timestamp.getTime();
+
+ // load test data to database
+ final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+ final Statement stmt = con.createStatement();
+ final InputStream in;
+ final MockFlowFile mff;
+
+ try {
+ stmt.execute("drop table TEST_QUERY_DB_TABLE");
+ } catch (final SQLException sqle) {
+ // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+ }
+
+ stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, name varchar(100), birth_date date, big_number decimal(10,2),created_on timestamp)");
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, birth_date, big_number, created_on) VALUES (" +
+ ID + ", '" + NAME + "', '" + BIRTH_DATE + "', " + BIG_NUMBER + ", '" + CREATED_ON + "')");
+
+ runner.setIncomingConnection(false);
+ runner.setProperty(SelectHive3QL.HIVEQL_SELECT_QUERY, "SELECT * FROM TEST_QUERY_DB_TABLE");
+ runner.setProperty(SelectHive3QL.USE_AVRO_LOGICAL_TYPES, "true");
+
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(SelectHive3QL.REL_SUCCESS, 1);
+ mff = runner.getFlowFilesForRelationship(SelectHive3QL.REL_SUCCESS).get(0);
+ in = new ByteArrayInputStream(mff.toByteArray());
+
+ final GenericRecord record = getFirstRecordFromStream(in);
+
+ assertEquals(expectedSchema, record.getSchema());
+ assertEquals(ID, record.get("ID"));
+ assertEquals(NAME, record.get("NAME").toString());
+ assertEquals(expectedBirthDate, record.get("BIRTH_DATE"));
+ assertEquals(expectedBigNumber, record.get("BIG_NUMBER"));
+ assertEquals(expectedCreatedOn, record.get("CREATED_ON"));
+
+
+ runner.clearTransferState();
+ }
+
private long getNumberOfRecordsFromStream(InputStream in) throws IOException {
final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
try (DataFileStream<GenericRecord> dataFileReader = new DataFileStream<>(in, datumReader)) {
@@ -707,6 +824,13 @@ public class TestSelectHive3QL {
}
}
+ /**
+ * Opens the given stream as an Avro data file and returns the first record it yields.
+ * The Avro stream is closed before returning.
+ */
+ private GenericRecord getFirstRecordFromStream(InputStream in) throws IOException {
+ try (DataFileStream<GenericRecord> avroStream = new DataFileStream<>(in, new GenericDatumReader<GenericRecord>())) {
+ final GenericRecord firstRecord = avroStream.next();
+ return firstRecord;
+ }
+ }
+
/**
* Simple implementation only for SelectHive3QL processor testing.
*/