You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2021/12/14 17:15:32 UTC

[nifi] 14/15: NIFI-9185 Add Avro logical type to SelectHive3QL processor

This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch support/nifi-1.15
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit 2f9963a534d55df2608fe4efaa0841bee7bba393
Author: Timea Barna <ti...@gmail.com>
AuthorDate: Tue Aug 31 13:44:34 2021 +0200

    NIFI-9185 Add Avro logical type to SelectHive3QL processor
    
    Modifying unit test to avoid system default timezone usage
    
    NIFI-9185 Applying review recommendations removing duplicate dependency from pom.xml
    
    Signed-off-by: Matthew Burgess <ma...@apache.org>
    
    This closes #5358
---
 .../apache/nifi/processors/hive/SelectHive3QL.java |  19 +++-
 .../org/apache/nifi/util/hive/HiveJdbcCommon.java  |  72 ++++++++++--
 .../nifi/processors/hive/TestSelectHive3QL.java    | 124 +++++++++++++++++++++
 3 files changed, 202 insertions(+), 13 deletions(-)

diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/SelectHive3QL.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/SelectHive3QL.java
index f124c73..af87bd1 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/SelectHive3QL.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/SelectHive3QL.java
@@ -236,6 +236,21 @@ public class SelectHive3QL extends AbstractHive3QLProcessor {
             .expressionLanguageSupported(ExpressionLanguageScope.NONE)
             .build();
 
+    public static final PropertyDescriptor USE_AVRO_LOGICAL_TYPES = new PropertyDescriptor.Builder()
+            .name("use-logical-types")
+            .displayName("Use Avro Logical Types")
+            .description("Whether to use Avro Logical Types for DECIMAL, DATE and TIMESTAMP columns. "
+                    + "If disabled, written as string. "
+                    + "If enabled, Logical types are used and written as its underlying type, specifically, "
+                    + "DECIMAL as logical 'decimal': written as bytes with additional precision and scale meta data, "
+                    + "DATE as logical 'date': written as int denoting days since Unix epoch (1970-01-01), "
+                    + "and TIMESTAMP as logical 'timestamp-millis': written as long denoting milliseconds since Unix epoch. "
+                    + "If a reader of written Avro records also knows these logical types, then these values can be deserialized with more context depending on reader implementation.")
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .required(true)
+            .build();
+
     private final static List<PropertyDescriptor> propertyDescriptors;
     private final static Set<Relationship> relationships;
 
@@ -255,6 +270,7 @@ public class SelectHive3QL extends AbstractHive3QLProcessor {
         _propertyDescriptors.add(MAX_FRAGMENTS);
         _propertyDescriptors.add(HIVEQL_OUTPUT_FORMAT);
         _propertyDescriptors.add(NORMALIZE_NAMES_FOR_AVRO);
+        _propertyDescriptors.add(USE_AVRO_LOGICAL_TYPES);
         _propertyDescriptors.add(HIVEQL_CSV_HEADER);
         _propertyDescriptors.add(HIVEQL_CSV_ALT_HEADER);
         _propertyDescriptors.add(HIVEQL_CSV_DELIMITER);
@@ -344,6 +360,7 @@ public class SelectHive3QL extends AbstractHive3QLProcessor {
         final String delimiter = context.getProperty(HIVEQL_CSV_DELIMITER).evaluateAttributeExpressions(fileToProcess).getValue();
         final boolean quote = context.getProperty(HIVEQL_CSV_QUOTE).asBoolean();
         final boolean escape = context.getProperty(HIVEQL_CSV_HEADER).asBoolean();
+        final boolean useLogicalTypes = context.getProperty(USE_AVRO_LOGICAL_TYPES).asBoolean();
         final String fragmentIdentifier = UUID.randomUUID().toString();
 
         try (final Connection con = dbcpService.getConnection(fileToProcess == null ? Collections.emptyMap() : fileToProcess.getAttributes());
@@ -411,7 +428,7 @@ public class SelectHive3QL extends AbstractHive3QLProcessor {
                         flowfile = session.write(flowfile, out -> {
                             try {
                                 if (AVRO.equals(outputFormat)) {
-                                    nrOfRows.set(HiveJdbcCommon.convertToAvroStream(resultSet, out, maxRowsPerFlowFile, convertNamesForAvro));
+                                    nrOfRows.set(HiveJdbcCommon.convertToAvroStream(resultSet, out, maxRowsPerFlowFile, convertNamesForAvro, useLogicalTypes));
                                 } else if (CSV.equals(outputFormat)) {
                                     CsvOutputOptions options = new CsvOutputOptions(header, altHeader, delimiter, quote, escape, maxRowsPerFlowFile);
                                     nrOfRows.set(HiveJdbcCommon.convertToCsvStream(resultSet, out, options));
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/util/hive/HiveJdbcCommon.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/util/hive/HiveJdbcCommon.java
index 09eecce..2a704c0 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/util/hive/HiveJdbcCommon.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/util/hive/HiveJdbcCommon.java
@@ -16,6 +16,7 @@
  */
 package org.apache.nifi.util.hive;
 
+import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
 import org.apache.avro.SchemaBuilder;
 import org.apache.avro.SchemaBuilder.FieldAssembler;
@@ -29,6 +30,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.nifi.avro.AvroTypeUtil;
 import org.apache.nifi.components.PropertyDescriptor;
 
 import java.io.IOException;
@@ -88,6 +90,10 @@ public class HiveJdbcCommon {
     public static final String MIME_TYPE_AVRO_BINARY = "application/avro-binary";
     public static final String CSV_MIME_TYPE = "text/csv";
 
+    private static final Schema TIMESTAMP_MILLIS_SCHEMA = LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG));
+    private static final Schema DATE_SCHEMA = LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT));
+    private static final int DEFAULT_PRECISION = 10;
+    private static final int DEFAULT_SCALE = 0;
 
     public static final PropertyDescriptor NORMALIZE_NAMES_FOR_AVRO = new PropertyDescriptor.Builder()
             .name("hive-normalize-avro")
@@ -99,14 +105,15 @@ public class HiveJdbcCommon {
             .required(true)
             .build();
 
-    public static long convertToAvroStream(final ResultSet rs, final OutputStream outStream, final int maxRows, boolean convertNames) throws SQLException, IOException {
-        return convertToAvroStream(rs, outStream, null, maxRows, convertNames, null);
+    public static long convertToAvroStream(final ResultSet rs, final OutputStream outStream, final int maxRows, boolean convertNames, final boolean useLogicalTypes) throws SQLException, IOException {
+        return convertToAvroStream(rs, outStream, null, maxRows, convertNames, null, useLogicalTypes);
     }
 
 
-    public static long convertToAvroStream(final ResultSet rs, final OutputStream outStream, String recordName, final int maxRows, boolean convertNames, ResultSetRowCallback callback)
+    public static long convertToAvroStream(final ResultSet rs, final OutputStream outStream, String recordName, final int maxRows, boolean convertNames,
+                                           ResultSetRowCallback callback, final boolean useLogicalTypes)
             throws SQLException, IOException {
-        final Schema schema = createSchema(rs, recordName, convertNames);
+        final Schema schema = createSchema(rs, recordName, convertNames, useLogicalTypes);
         final GenericRecord rec = new GenericData.Record(schema);
 
         final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
@@ -149,7 +156,16 @@ public class HiveJdbcCommon {
                         // org.apache.avro.AvroRuntimeException: Unknown datum type java.lang.Byte
                         rec.put(i - 1, ((Byte) value).intValue());
 
-                    } else if (value instanceof BigDecimal || value instanceof BigInteger) {
+                    } else if (value instanceof BigDecimal) {
+                        if (useLogicalTypes) {
+                            final int precision = getPrecision(meta.getPrecision(i));
+                            final int scale = getScale(meta.getScale(i));
+                            rec.put(i - 1, AvroTypeUtil.convertToAvroObject(value, LogicalTypes.decimal(precision, scale).addToSchema(Schema.create(Schema.Type.BYTES))));
+                        } else {
+                            rec.put(i - 1, value.toString());
+                        }
+
+                    } else if (value instanceof BigInteger) {
                         // Avro can't handle BigDecimal and BigInteger as numbers - it will throw an AvroRuntimeException such as: "Unknown datum type: java.math.BigDecimal: 38"
                         rec.put(i - 1, value.toString());
 
@@ -170,10 +186,14 @@ public class HiveJdbcCommon {
                         rec.put(i - 1, value);
                     } else if (value instanceof java.sql.SQLXML) {
                         rec.put(i - 1, ((java.sql.SQLXML) value).getString());
+                    } else if (useLogicalTypes && javaSqlType == DATE) {
+                        rec.put(i - 1, AvroTypeUtil.convertToAvroObject(value, DATE_SCHEMA));
+                    } else if (useLogicalTypes && javaSqlType == TIMESTAMP) {
+                        rec.put(i - 1, AvroTypeUtil.convertToAvroObject(value, TIMESTAMP_MILLIS_SCHEMA));
                     } else {
                         // The different types that we support are numbers (int, long, double, float),
-                        // as well as boolean values and Strings. Since Avro doesn't provide
-                        // timestamp types, we want to convert those to Strings. So we will cast anything other
+                        // as well as boolean values, decimals, dates, timestamps and Strings. Time values get no
+                        // logical-type conversion here, so we want to convert those to Strings. So we will cast anything other
                         // than numbers or booleans to strings by using the toString() method.
                         rec.put(i - 1, value.toString());
                     }
@@ -190,7 +210,7 @@ public class HiveJdbcCommon {
     }
 
     public static Schema createSchema(final ResultSet rs, boolean convertNames) throws SQLException {
-        return createSchema(rs, null, false);
+        return createSchema(rs, null, false, false);
     }
 
     /**
@@ -203,7 +223,7 @@ public class HiveJdbcCommon {
      * @return A Schema object representing the result set converted to an Avro record
      * @throws SQLException if any error occurs during conversion
      */
-    public static Schema createSchema(final ResultSet rs, String recordName, boolean convertNames) throws SQLException {
+    public static Schema createSchema(final ResultSet rs, String recordName, boolean convertNames, final boolean useLogicalTypes) throws SQLException {
         final ResultSetMetaData meta = rs.getMetaData();
         final int nrOfColumns = meta.getColumnCount();
         String tableName = StringUtils.isEmpty(recordName) ? "NiFi_SelectHiveQL_Record" : recordName;
@@ -298,14 +318,32 @@ public class HiveJdbcCommon {
                 // Did not find direct suitable type, need to be clarified!!!!
                 case DECIMAL:
                 case NUMERIC:
-                    builder.name(columnName).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault();
+                    if (useLogicalTypes) {
+                        final int precision = getPrecision(meta.getPrecision(i));
+                        final int scale = getScale(meta.getScale(i));
+                        builder.name(columnName).type().unionOf().nullBuilder().endNull().and()
+                                .type(LogicalTypes.decimal(precision, scale).addToSchema(Schema.create(Schema.Type.BYTES))).endUnion().noDefault();
+                    } else {
+                        builder.name(columnName).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault();
+                    }
                     break;
 
-                // Did not find direct suitable type, need to be clarified!!!!
+                // Dates were introduced in Hive 0.12.0
                 case DATE:
+                    if (useLogicalTypes) {
+                        builder.name(columnName).type().unionOf().nullBuilder().endNull().and().type(DATE_SCHEMA).endUnion().noDefault();
+                    } else {
+                        builder.name(columnName).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault();
+                    }
+                    break;
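+                // No directly suitable type was found for TIME. NOTE(review): it shares TIMESTAMP's schema here, but the writer appears to convert only TIMESTAMP values - confirm.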
                 case TIME:
                 case TIMESTAMP:
-                    builder.name(columnName).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault();
+                    if (useLogicalTypes) {
+                        builder.name(columnName).type().unionOf().nullBuilder().endNull().and().type(TIMESTAMP_MILLIS_SCHEMA).endUnion().noDefault();
+                    } else {
+                        builder.name(columnName).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault();
+                    }
                     break;
 
                 case BINARY:
@@ -461,4 +499,14 @@ public class HiveJdbcCommon {
         }
         return hiveConfig;
     }
+
+    // If the result set metadata reports a precision that is not greater than 1, use the Hive default precision.
+    private static int getPrecision(int precision) {
+        return precision > 1 ? precision : DEFAULT_PRECISION;
+    }
+
+    // If the result set metadata reports a scale that is not positive, use the Hive default scale.
+    private static int getScale(int scale) {
+        return scale > 0 ? scale : DEFAULT_SCALE;
+    }
 }
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHive3QL.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHive3QL.java
index 356106f..d319742 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHive3QL.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHive3QL.java
@@ -16,6 +16,9 @@
  */
 package org.apache.nifi.processors.hive;
 
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
 import org.apache.avro.file.DataFileStream;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
@@ -44,11 +47,15 @@ import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.sql.Timestamp;
 import java.sql.Types;
+import java.time.LocalDate;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -67,6 +74,11 @@ public class TestSelectHive3QL {
     private static final Logger LOGGER;
     private final static String MAX_ROWS_KEY = "maxRows";
     private final int NUM_OF_ROWS = 100;
+    private static final int ID = 1;
+    private static final String NAME = "Joe Smith";
+    private static final String BIRTH_DATE = "1956-11-22";
+    private static final String BIG_NUMBER = "12345678.12";
+    private static final String CREATED_ON = "1962-09-23 03:23:34.234";
 
 
     static {
@@ -690,6 +702,111 @@ public class TestSelectHive3QL {
         runner.clearTransferState();
     }
 
+    @Test
+    public void testAvroRecordCreatedWithoutLogicalTypesByDefault() throws SQLException, IOException {
+        final Schema expectedSchema = SchemaBuilder.record("NiFi_SelectHiveQL_Record").namespace("any.data").fields()
+                .name("ID").type().unionOf().nullBuilder().endNull().and().intType().endUnion().noDefault()
+                .name("NAME").type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault()
+                .name("BIRTH_DATE").type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault()
+                .name("BIG_NUMBER").type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault()
+                .name("CREATED_ON").type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault()
+                .endRecord();
+
+        // load test data to database
+        final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+        final Statement stmt = con.createStatement();
+        final InputStream in;
+        final MockFlowFile mff;
+
+        try {
+            stmt.execute("drop table TEST_QUERY_DB_TABLE");
+        } catch (final SQLException sqle) {
+            // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+        }
+
+        stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, name varchar(100), birth_date date, big_number decimal(10,2),created_on timestamp)");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, birth_date, big_number, created_on) VALUES (" +
+                ID + ", '" + NAME + "', '" + BIRTH_DATE + "', " + BIG_NUMBER + ", '" + CREATED_ON + "')");
+
+        runner.setIncomingConnection(false);
+        runner.setProperty(SelectHive3QL.HIVEQL_SELECT_QUERY, "SELECT * FROM TEST_QUERY_DB_TABLE");
+
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(SelectHive3QL.REL_SUCCESS, 1);
+        mff = runner.getFlowFilesForRelationship(SelectHive3QL.REL_SUCCESS).get(0);
+        in = new ByteArrayInputStream(mff.toByteArray());
+
+        final GenericRecord record = getFirstRecordFromStream(in);
+
+        assertEquals(expectedSchema, record.getSchema());
+        assertEquals(ID, record.get("ID"));
+        assertEquals(NAME, record.get("NAME").toString());
+        assertEquals(BIRTH_DATE, record.get("BIRTH_DATE").toString());
+        assertEquals(BIG_NUMBER, record.get("BIG_NUMBER").toString());
+        assertEquals(CREATED_ON, record.get("CREATED_ON").toString());
+
+        runner.clearTransferState();
+    }
+
+    @Test
+    public void testAvroRecordCreatedWithLogicalTypesWhenSet() throws SQLException, IOException {
+        final Schema expectedSchema = SchemaBuilder.record("NiFi_SelectHiveQL_Record").namespace("any.data").fields()
+                .name("ID").type().unionOf().nullBuilder().endNull().and().intType().endUnion().noDefault()
+                .name("NAME").type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault()
+                .name("BIRTH_DATE").type().unionOf().nullBuilder().endNull().and()
+                .type(LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT))).endUnion().noDefault()
+                .name("BIG_NUMBER").type().unionOf().nullBuilder().endNull().and()
+                .type(LogicalTypes.decimal(10, 2).addToSchema(Schema.create(Schema.Type.BYTES))).endUnion().noDefault()
+                .name("CREATED_ON").type().unionOf().nullBuilder().endNull().and()
+                .type(LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG))).endUnion().noDefault()
+                .endRecord();
+
+        final int expectedBirthDate = (int) LocalDate.parse(BIRTH_DATE).toEpochDay();
+        final BigDecimal decimal = new BigDecimal(BIG_NUMBER).setScale(2, BigDecimal.ROUND_HALF_EVEN);
+        final ByteBuffer expectedBigNumber = ByteBuffer.wrap(decimal.unscaledValue().toByteArray());
+        final Timestamp timestamp = Timestamp.valueOf(CREATED_ON);
+        final long expectedCreatedOn = timestamp.getTime();
+
+        // load test data to database
+        final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+        final Statement stmt = con.createStatement();
+        final InputStream in;
+        final MockFlowFile mff;
+
+        try {
+            stmt.execute("drop table TEST_QUERY_DB_TABLE");
+        } catch (final SQLException sqle) {
+            // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+        }
+
+        stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, name varchar(100), birth_date date, big_number decimal(10,2),created_on timestamp)");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, birth_date, big_number, created_on) VALUES (" +
+                ID + ", '" + NAME + "', '" + BIRTH_DATE + "', " + BIG_NUMBER + ", '" + CREATED_ON + "')");
+
+        runner.setIncomingConnection(false);
+        runner.setProperty(SelectHive3QL.HIVEQL_SELECT_QUERY, "SELECT * FROM TEST_QUERY_DB_TABLE");
+        runner.setProperty(SelectHive3QL.USE_AVRO_LOGICAL_TYPES, "true");
+
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(SelectHive3QL.REL_SUCCESS, 1);
+        mff = runner.getFlowFilesForRelationship(SelectHive3QL.REL_SUCCESS).get(0);
+        in = new ByteArrayInputStream(mff.toByteArray());
+
+        final GenericRecord record = getFirstRecordFromStream(in);
+
+        assertEquals(expectedSchema, record.getSchema());
+        assertEquals(ID, record.get("ID"));
+        assertEquals(NAME, record.get("NAME").toString());
+        assertEquals(expectedBirthDate, record.get("BIRTH_DATE"));
+        assertEquals(expectedBigNumber, record.get("BIG_NUMBER"));
+        assertEquals(expectedCreatedOn, record.get("CREATED_ON"));
+
+
+        runner.clearTransferState();
+    }
+
     private long getNumberOfRecordsFromStream(InputStream in) throws IOException {
         final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
         try (DataFileStream<GenericRecord> dataFileReader = new DataFileStream<>(in, datumReader)) {
@@ -707,6 +824,13 @@ public class TestSelectHive3QL {
         }
     }
 
+    private GenericRecord getFirstRecordFromStream(InputStream in) throws IOException {
+        final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
+        try (DataFileStream<GenericRecord> dataFileReader = new DataFileStream<>(in, datumReader)) {
+            return dataFileReader.next();
+        }
+    }
+
     /**
      * Simple implementation only for SelectHive3QL processor testing.
      */