You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mt...@apache.org on 2018/09/25 13:58:28 UTC
nifi git commit: NIFI-5591 - Added avro compression format to ExecuteSQL
Repository: nifi
Updated Branches:
refs/heads/master 030129c7c -> 78c4e223f
NIFI-5591 - Added avro compression format to ExecuteSQL
This closes #3023
Signed-off-by: Mike Thomsen <mi...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/78c4e223
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/78c4e223
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/78c4e223
Branch: refs/heads/master
Commit: 78c4e223fcaf78a819e04f8b6fa6541bfff2782f
Parents: 030129c
Author: Pierre Villard <pi...@gmail.com>
Authored: Sun Sep 23 21:42:26 2018 +0200
Committer: Mike Thomsen <mi...@gmail.com>
Committed: Tue Sep 25 09:25:13 2018 -0400
----------------------------------------------------------------------
.../nifi/processors/standard/ExecuteSQL.java | 15 ++++++
.../nifi/processors/standard/util/AvroUtil.java | 51 ++++++++++++++++++++
.../processors/standard/util/JdbcCommon.java | 16 +++++-
.../processors/standard/TestExecuteSQL.java | 37 ++++++++++++++
4 files changed, 117 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/78c4e223/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
index b8c48d9..df82e2e 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
@@ -55,6 +55,7 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.standard.util.AvroUtil.CodecType;
import org.apache.nifi.processors.standard.util.JdbcCommon;
import org.apache.nifi.util.StopWatch;
@@ -187,6 +188,16 @@ public class ExecuteSQL extends AbstractProcessor {
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
+ // Selects the Avro codec used for the processor's output. The allowable values are the
+ // AvroUtil.CodecType constants, the default is NONE, and Expression Language is not supported.
+ public static final PropertyDescriptor COMPRESSION_FORMAT = new PropertyDescriptor.Builder()
+ .name("compression-format")
+ .displayName("Compression Format")
+ .description("Compression type to use when writing Avro files. Default is None.")
+ .allowableValues(CodecType.values())
+ .defaultValue(CodecType.NONE.toString())
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+ .required(true)
+ .build();
+
private final List<PropertyDescriptor> propDescriptors;
public ExecuteSQL() {
@@ -205,6 +216,7 @@ public class ExecuteSQL extends AbstractProcessor {
pds.add(DEFAULT_SCALE);
pds.add(MAX_ROWS_PER_FLOW_FILE);
pds.add(OUTPUT_BATCH_SIZE);
+ pds.add(COMPRESSION_FORMAT);
propDescriptors = Collections.unmodifiableList(pds);
}
@@ -255,6 +267,8 @@ public class ExecuteSQL extends AbstractProcessor {
final int outputBatchSize = outputBatchSizeField == null ? 0 : outputBatchSizeField;
final Integer defaultPrecision = context.getProperty(DEFAULT_PRECISION).evaluateAttributeExpressions(fileToProcess).asInteger();
final Integer defaultScale = context.getProperty(DEFAULT_SCALE).evaluateAttributeExpressions(fileToProcess).asInteger();
+ final String codec = context.getProperty(COMPRESSION_FORMAT).getValue();
+
final String selectQuery;
if (context.getProperty(SQL_SELECT_QUERY).isSet()) {
selectQuery = context.getProperty(SQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue();
@@ -300,6 +314,7 @@ public class ExecuteSQL extends AbstractProcessor {
.defaultPrecision(defaultPrecision)
.defaultScale(defaultScale)
.maxRows(maxRowsPerFlowFile)
+ .codecFactory(codec)
.build();
do {
http://git-wip-us.apache.org/repos/asf/nifi/blob/78c4e223/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/AvroUtil.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/AvroUtil.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/AvroUtil.java
new file mode 100644
index 0000000..970c7c2
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/AvroUtil.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.util;
+
+import org.apache.avro.file.CodecFactory;
+
+/**
+ * Helpers for translating a compression format name into an Avro {@link CodecFactory}.
+ */
+public class AvroUtil {
+
+    /**
+     * Compression formats that can be requested by name.
+     *
+     * <p>NOTE: {@link #LZO} does not produce LZO output. {@link CodecFactory} offers no LZO
+     * codec, and {@link AvroUtil#getCodecFactory(String)} maps this constant to the XZ codec.
+     * The constant name is kept as is because the names of these constants are exposed as
+     * selectable property values.</p>
+     */
+    public enum CodecType {
+        BZIP2,
+        DEFLATE,
+        NONE,
+        SNAPPY,
+        LZO
+    }
+
+    /**
+     * Resolves a compression format name to the Avro codec factory used when writing data files.
+     *
+     * @param property the exact (case-sensitive) name of a {@link CodecType} constant;
+     *                 {@code null} is treated as "no compression"
+     * @return the matching codec factory; the null (no-op) codec for {@code NONE} or a
+     *         {@code null} name, and the XZ codec for {@code LZO}
+     * @throws IllegalArgumentException if {@code property} is non-null and does not name a
+     *                                  {@link CodecType} constant
+     */
+    public static CodecFactory getCodecFactory(String property) {
+        // An absent setting means "do not compress" rather than a NullPointerException
+        // out of Enum.valueOf.
+        if (property == null) {
+            return CodecFactory.nullCodec();
+        }
+
+        final CodecType type = CodecType.valueOf(property);
+        switch (type) {
+            case BZIP2:
+                return CodecFactory.bzip2Codec();
+            case DEFLATE:
+                return CodecFactory.deflateCodec(CodecFactory.DEFAULT_DEFLATE_LEVEL);
+            case LZO:
+                // Intentionally XZ: see the note on CodecType.
+                return CodecFactory.xzCodec(CodecFactory.DEFAULT_XZ_LEVEL);
+            case SNAPPY:
+                return CodecFactory.snappyCodec();
+            case NONE:
+            default:
+                return CodecFactory.nullCodec();
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/78c4e223/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
index 88c8cda..03761c6 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
@@ -90,6 +90,7 @@ import org.apache.avro.SchemaBuilder.BaseTypeBuilder;
import org.apache.avro.SchemaBuilder.FieldAssembler;
import org.apache.avro.SchemaBuilder.NullDefault;
import org.apache.avro.SchemaBuilder.UnionAccumulator;
+import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
@@ -211,20 +212,24 @@ public class JdbcCommon {
}
public static class AvroConversionOptions {
+
private final String recordName;
private final int maxRows;
private final boolean convertNames;
private final boolean useLogicalTypes;
private final int defaultPrecision;
private final int defaultScale;
+ private final CodecFactory codec;
- private AvroConversionOptions(String recordName, int maxRows, boolean convertNames, boolean useLogicalTypes, int defaultPrecision, int defaultScale) {
+ private AvroConversionOptions(String recordName, int maxRows, boolean convertNames, boolean useLogicalTypes,
+ int defaultPrecision, int defaultScale, CodecFactory codec) {
this.recordName = recordName;
this.maxRows = maxRows;
this.convertNames = convertNames;
this.useLogicalTypes = useLogicalTypes;
this.defaultPrecision = defaultPrecision;
this.defaultScale = defaultScale;
+ this.codec = codec;
}
public static Builder builder() {
@@ -238,6 +243,7 @@ public class JdbcCommon {
private boolean useLogicalTypes = false;
private int defaultPrecision = DEFAULT_PRECISION_VALUE;
private int defaultScale = DEFAULT_SCALE_VALUE;
+ private CodecFactory codec = CodecFactory.nullCodec();
/**
* Specify a priori record name to use if it cannot be determined from the result set.
@@ -272,8 +278,13 @@ public class JdbcCommon {
return this;
}
+            /**
+             * Selects the Avro compression codec by name.
+             *
+             * @param codec the compression format name, resolved through
+             *              {@link AvroUtil#getCodecFactory(String)}
+             * @return this builder, for call chaining
+             */
+            public Builder codecFactory(String codec) {
+                final CodecFactory resolved = AvroUtil.getCodecFactory(codec);
+                this.codec = resolved;
+                return this;
+            }
+
public AvroConversionOptions build() {
- return new AvroConversionOptions(recordName, maxRows, convertNames, useLogicalTypes, defaultPrecision, defaultScale);
+ return new AvroConversionOptions(recordName, maxRows, convertNames, useLogicalTypes, defaultPrecision, defaultScale, codec);
}
}
}
@@ -285,6 +296,7 @@ public class JdbcCommon {
final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
try (final DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter)) {
+ dataFileWriter.setCodec(options.codec);
dataFileWriter.create(schema, outStream);
final ResultSetMetaData meta = rs.getMetaData();
http://git-wip-us.apache.org/repos/asf/nifi/blob/78c4e223/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
index 10d9edd..33633f2 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
@@ -35,14 +35,17 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.avro.file.DataFileConstants;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
+import org.apache.commons.compress.compressors.CompressorException;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.flowfile.attributes.FragmentAttributes;
import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.standard.util.AvroUtil;
import org.apache.nifi.processors.standard.util.TestJdbcHugeStream;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.MockFlowFile;
@@ -200,6 +203,40 @@ public class TestExecuteSQL {
}
@Test
+ public void testCompression() throws SQLException, CompressorException, IOException {
+ // remove previous test database, if any
+ final File dbLocation = new File(DB_LOCATION);
+ dbLocation.delete();
+
+ // load test data to database
+ final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+ Statement stmt = con.createStatement();
+
+ try {
+ stmt.execute("drop table TEST_NULL_INT");
+ } catch (final SQLException sqle) {
+ }
+
+ stmt.execute("create table TEST_NULL_INT (id integer not null, val1 integer, val2 integer, constraint my_pk primary key (id))");
+
+ stmt.execute("insert into TEST_NULL_INT (id, val1, val2) VALUES (0, NULL, 1)");
+ stmt.execute("insert into TEST_NULL_INT (id, val1, val2) VALUES (1, 1, 1)");
+
+ runner.setIncomingConnection(false);
+ runner.setProperty(ExecuteSQL.COMPRESSION_FORMAT, AvroUtil.CodecType.BZIP2.name());
+ runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "SELECT * FROM TEST_NULL_INT");
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 1);
+
+ MockFlowFile flowFile = runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS).get(0);
+
+ try (DataFileStream<GenericRecord> dfs = new DataFileStream<>(new ByteArrayInputStream(flowFile.toByteArray()), new GenericDatumReader<GenericRecord>())) {
+ assertEquals(AvroUtil.CodecType.BZIP2.name().toLowerCase(), dfs.getMetaString(DataFileConstants.CODEC).toLowerCase());
+ }
+ }
+
+ @Test
public void testWithOutputBatching() throws SQLException {
// remove previous test database, if any
final File dbLocation = new File(DB_LOCATION);