You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mt...@apache.org on 2018/09/25 13:58:28 UTC

nifi git commit: NIFI-5591 - Added avro compression format to ExecuteSQL

Repository: nifi
Updated Branches:
  refs/heads/master 030129c7c -> 78c4e223f


NIFI-5591 - Added avro compression format to ExecuteSQL

This closes #3023

Signed-off-by: Mike Thomsen <mi...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/78c4e223
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/78c4e223
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/78c4e223

Branch: refs/heads/master
Commit: 78c4e223fcaf78a819e04f8b6fa6541bfff2782f
Parents: 030129c
Author: Pierre Villard <pi...@gmail.com>
Authored: Sun Sep 23 21:42:26 2018 +0200
Committer: Mike Thomsen <mi...@gmail.com>
Committed: Tue Sep 25 09:25:13 2018 -0400

----------------------------------------------------------------------
 .../nifi/processors/standard/ExecuteSQL.java    | 15 ++++++
 .../nifi/processors/standard/util/AvroUtil.java | 51 ++++++++++++++++++++
 .../processors/standard/util/JdbcCommon.java    | 16 +++++-
 .../processors/standard/TestExecuteSQL.java     | 37 ++++++++++++++
 4 files changed, 117 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/78c4e223/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
index b8c48d9..df82e2e 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
@@ -55,6 +55,7 @@ import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.standard.util.AvroUtil.CodecType;
 import org.apache.nifi.processors.standard.util.JdbcCommon;
 import org.apache.nifi.util.StopWatch;
 
@@ -187,6 +188,16 @@ public class ExecuteSQL extends AbstractProcessor {
             .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .build();
 
+    public static final PropertyDescriptor COMPRESSION_FORMAT = new PropertyDescriptor.Builder()
+            .name("compression-format")
+            .displayName("Compression Format")
+            .description("Compression type to use when writing Avro files. Default is None.")
+            .allowableValues(CodecType.values())
+            .defaultValue(CodecType.NONE.toString())
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .required(true)
+            .build();
+
     private final List<PropertyDescriptor> propDescriptors;
 
     public ExecuteSQL() {
@@ -205,6 +216,7 @@ public class ExecuteSQL extends AbstractProcessor {
         pds.add(DEFAULT_SCALE);
         pds.add(MAX_ROWS_PER_FLOW_FILE);
         pds.add(OUTPUT_BATCH_SIZE);
+        pds.add(COMPRESSION_FORMAT);
         propDescriptors = Collections.unmodifiableList(pds);
     }
 
@@ -255,6 +267,8 @@ public class ExecuteSQL extends AbstractProcessor {
         final int outputBatchSize = outputBatchSizeField == null ? 0 : outputBatchSizeField;
         final Integer defaultPrecision = context.getProperty(DEFAULT_PRECISION).evaluateAttributeExpressions(fileToProcess).asInteger();
         final Integer defaultScale = context.getProperty(DEFAULT_SCALE).evaluateAttributeExpressions(fileToProcess).asInteger();
+        final String codec = context.getProperty(COMPRESSION_FORMAT).getValue();
+
         final String selectQuery;
         if (context.getProperty(SQL_SELECT_QUERY).isSet()) {
             selectQuery = context.getProperty(SQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue();
@@ -300,6 +314,7 @@ public class ExecuteSQL extends AbstractProcessor {
                                 .defaultPrecision(defaultPrecision)
                                 .defaultScale(defaultScale)
                                 .maxRows(maxRowsPerFlowFile)
+                                .codecFactory(codec)
                                 .build();
 
                         do {

http://git-wip-us.apache.org/repos/asf/nifi/blob/78c4e223/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/AvroUtil.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/AvroUtil.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/AvroUtil.java
new file mode 100644
index 0000000..970c7c2
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/AvroUtil.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.util;
+
+import org.apache.avro.file.CodecFactory;
+
+/**
+ * Avro Utils.
+ */
+public class AvroUtil {
+
+    public static enum CodecType {
+        BZIP2,
+        DEFLATE,
+        NONE,
+        SNAPPY,
+        LZO
+    }
+
+    public static CodecFactory getCodecFactory(String property) {
+        CodecType type = CodecType.valueOf(property);
+        switch (type) {
+            case BZIP2:
+                return CodecFactory.bzip2Codec();
+            case DEFLATE:
+                return CodecFactory.deflateCodec(CodecFactory.DEFAULT_DEFLATE_LEVEL);
+            case LZO:
+                return CodecFactory.xzCodec(CodecFactory.DEFAULT_XZ_LEVEL);
+            case SNAPPY:
+                return CodecFactory.snappyCodec();
+            case NONE:
+            default:
+                return CodecFactory.nullCodec();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/78c4e223/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
index 88c8cda..03761c6 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
@@ -90,6 +90,7 @@ import org.apache.avro.SchemaBuilder.BaseTypeBuilder;
 import org.apache.avro.SchemaBuilder.FieldAssembler;
 import org.apache.avro.SchemaBuilder.NullDefault;
 import org.apache.avro.SchemaBuilder.UnionAccumulator;
+import org.apache.avro.file.CodecFactory;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumWriter;
@@ -211,20 +212,24 @@ public class JdbcCommon {
     }
 
     public static class AvroConversionOptions {
+
         private final String recordName;
         private final int maxRows;
         private final boolean convertNames;
         private final boolean useLogicalTypes;
         private final int defaultPrecision;
         private final int defaultScale;
+        private final CodecFactory codec;
 
-        private AvroConversionOptions(String recordName, int maxRows, boolean convertNames, boolean useLogicalTypes, int defaultPrecision, int defaultScale) {
+        private AvroConversionOptions(String recordName, int maxRows, boolean convertNames, boolean useLogicalTypes,
+                int defaultPrecision, int defaultScale, CodecFactory codec) {
             this.recordName = recordName;
             this.maxRows = maxRows;
             this.convertNames = convertNames;
             this.useLogicalTypes = useLogicalTypes;
             this.defaultPrecision = defaultPrecision;
             this.defaultScale = defaultScale;
+            this.codec = codec;
         }
 
         public static Builder builder() {
@@ -238,6 +243,7 @@ public class JdbcCommon {
             private boolean useLogicalTypes = false;
             private int defaultPrecision = DEFAULT_PRECISION_VALUE;
             private int defaultScale = DEFAULT_SCALE_VALUE;
+            private CodecFactory codec = CodecFactory.nullCodec();
 
             /**
              * Specify a priori record name to use if it cannot be determined from the result set.
@@ -272,8 +278,13 @@ public class JdbcCommon {
                 return this;
             }
 
+            public Builder codecFactory(String codec) {
+                this.codec = AvroUtil.getCodecFactory(codec);
+                return this;
+            }
+
             public AvroConversionOptions build() {
-                return new AvroConversionOptions(recordName, maxRows, convertNames, useLogicalTypes, defaultPrecision, defaultScale);
+                return new AvroConversionOptions(recordName, maxRows, convertNames, useLogicalTypes, defaultPrecision, defaultScale, codec);
             }
         }
     }
@@ -285,6 +296,7 @@ public class JdbcCommon {
 
         final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
         try (final DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter)) {
+            dataFileWriter.setCodec(options.codec);
             dataFileWriter.create(schema, outStream);
 
             final ResultSetMetaData meta = rs.getMetaData();

http://git-wip-us.apache.org/repos/asf/nifi/blob/78c4e223/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
index 10d9edd..33633f2 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
@@ -35,14 +35,17 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.avro.file.DataFileConstants;
 import org.apache.avro.file.DataFileStream;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.DatumReader;
+import org.apache.commons.compress.compressors.CompressorException;
 import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.dbcp.DBCPService;
 import org.apache.nifi.flowfile.attributes.FragmentAttributes;
 import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.standard.util.AvroUtil;
 import org.apache.nifi.processors.standard.util.TestJdbcHugeStream;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.util.MockFlowFile;
@@ -200,6 +203,40 @@ public class TestExecuteSQL {
     }
 
     @Test
+    public void testCompression() throws SQLException, CompressorException, IOException {
+        // remove previous test database, if any
+        final File dbLocation = new File(DB_LOCATION);
+        dbLocation.delete();
+
+        // load test data to database
+        final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+
+        try {
+            stmt.execute("drop table TEST_NULL_INT");
+        } catch (final SQLException sqle) {
+        }
+
+        stmt.execute("create table TEST_NULL_INT (id integer not null, val1 integer, val2 integer, constraint my_pk primary key (id))");
+
+        stmt.execute("insert into TEST_NULL_INT (id, val1, val2) VALUES (0, NULL, 1)");
+        stmt.execute("insert into TEST_NULL_INT (id, val1, val2) VALUES (1, 1, 1)");
+
+        runner.setIncomingConnection(false);
+        runner.setProperty(ExecuteSQL.COMPRESSION_FORMAT, AvroUtil.CodecType.BZIP2.name());
+        runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "SELECT * FROM TEST_NULL_INT");
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 1);
+
+        MockFlowFile flowFile = runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS).get(0);
+
+        try (DataFileStream<GenericRecord> dfs = new DataFileStream<>(new ByteArrayInputStream(flowFile.toByteArray()), new GenericDatumReader<GenericRecord>())) {
+            assertEquals(AvroUtil.CodecType.BZIP2.name().toLowerCase(), dfs.getMetaString(DataFileConstants.CODEC).toLowerCase());
+        }
+    }
+
+    @Test
     public void testWithOutputBatching() throws SQLException {
         // remove previous test database, if any
         final File dbLocation = new File(DB_LOCATION);