You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ij...@apache.org on 2019/03/14 00:34:30 UTC
[nifi] 01/02: NIFI-6082: Added DatabaseRecordLookupService,
refactored common DB utils
This is an automated email from the ASF dual-hosted git repository.
ijokarumawak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
commit ca76fe178cfae8890b347081260cd59a62321219
Author: Matthew Burgess <ma...@apache.org>
AuthorDate: Wed Feb 27 16:17:46 2019 -0500
NIFI-6082: Added DatabaseRecordLookupService, refactored common DB utils
NIFI-6082: Added SimpleDatabaseLookupService
NIFI-6082: Merged Koji's improvements, incorporated review comments
This closes #3341.
Signed-off-by: Koji Kawamura <ij...@apache.org>
---
.../serialization/record/ResultSetRecordSet.java | 18 +-
.../{ => nifi-database-test-utils}/pom.xml | 26 +--
.../apache/nifi/util/db/SimpleCommerceDataSet.java | 112 ++++++++++
.../nifi-database-utils/pom.xml | 101 +++++++++
.../java/org/apache/nifi/util/db}/AvroUtil.java | 2 +-
.../java/org/apache/nifi/util/db}/JdbcCommon.java | 71 +------
.../apache/nifi/util/db}/JdbcCommonTestUtils.java | 2 +-
.../org/apache/nifi/util/db}/TestJdbcCommon.java | 18 +-
.../nifi/util/db}/TestJdbcCommonConvertToAvro.java | 8 +-
.../apache/nifi/util/db}/TestJdbcHugeStream.java | 2 +-
.../apache/nifi/util/db}/TestJdbcTypesDerby.java | 2 +-
.../org/apache/nifi/util/db}/TestJdbcTypesH2.java | 2 +-
nifi-nar-bundles/nifi-extension-utils/pom.xml | 2 +
.../nifi-standard-processors/pom.xml | 11 +
.../processors/standard/AbstractExecuteSQL.java | 2 +-
.../standard/AbstractQueryDatabaseTable.java | 2 +-
.../nifi/processors/standard/ExecuteSQL.java | 12 +-
.../nifi/processors/standard/ExecuteSQLRecord.java | 5 +-
.../nifi/processors/standard/LookupRecord.java | 5 +-
.../apache/nifi/processors/standard/PutSQL.java | 2 +-
.../processors/standard/QueryDatabaseTable.java | 10 +-
.../standard/QueryDatabaseTableRecord.java | 4 +-
.../standard/sql/DefaultAvroSqlWriter.java | 12 +-
.../processors/standard/sql/RecordSqlWriter.java | 14 +-
.../nifi/processors/standard/sql/SqlWriter.java | 6 +-
.../processors/standard/util/JdbcProperties.java | 81 ++++++++
.../nifi/processors/standard/TestExecuteSQL.java | 6 +-
.../processors/standard/TestExecuteSQLRecord.java | 4 +-
.../nifi-lookup-services/pom.xml | 29 ++-
.../lookup/db/AbstractDatabaseLookupService.java | 104 ++++++++++
.../lookup/db/DatabaseRecordLookupService.java | 206 ++++++++++++++++++
.../lookup/db/SimpleDatabaseLookupService.java | 174 ++++++++++++++++
.../org.apache.nifi.controller.ControllerService | 2 +
.../db/TestDatabaseRecordLookupService.groovy | 229 +++++++++++++++++++++
.../db/TestSimpleDatabaseLookupService.groovy | 184 +++++++++++++++++
35 files changed, 1322 insertions(+), 148 deletions(-)
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
index ee47c63..fc3d60f 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
@@ -55,9 +55,21 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
private static final String FLOAT_CLASS_NAME = Float.class.getName();
public ResultSetRecordSet(final ResultSet rs, final RecordSchema readerSchema) throws SQLException {
+ this(rs, readerSchema, false);
+ }
+
+ /**
+ * Constructs a ResultSetRecordSet with a given ResultSet and RecordSchema
+ *
+ * @param rs The underlying ResultSet for this RecordSet
+ * @param readerSchema The schema to which this RecordSet adheres
+ * @param allFieldsNullable Whether to override the database column's "nullable" metadata. If true then all fields in the RecordSet are nullable.
+ * @throws SQLException if an error occurs while creating the schema or reading the result set's metadata
+ */
+ public ResultSetRecordSet(final ResultSet rs, final RecordSchema readerSchema, boolean allFieldsNullable) throws SQLException {
this.rs = rs;
moreRows = rs.next();
- this.schema = createSchema(rs, readerSchema);
+ this.schema = createSchema(rs, readerSchema, allFieldsNullable);
rsColumnNames = new HashSet<>();
final ResultSetMetaData metadata = rs.getMetaData();
@@ -140,7 +152,7 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
return value;
}
- private static RecordSchema createSchema(final ResultSet rs, final RecordSchema readerSchema) throws SQLException {
+ private static RecordSchema createSchema(final ResultSet rs, final RecordSchema readerSchema, boolean allFieldsNullable) throws SQLException {
final ResultSetMetaData metadata = rs.getMetaData();
final int numCols = metadata.getColumnCount();
final List<RecordField> fields = new ArrayList<>(numCols);
@@ -154,7 +166,7 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
final int nullableFlag = metadata.isNullable(column);
final boolean nullable;
- if (nullableFlag == ResultSetMetaData.columnNoNulls) {
+ if (nullableFlag == ResultSetMetaData.columnNoNulls && !allFieldsNullable) {
nullable = false;
} else {
nullable = true;
diff --git a/nifi-nar-bundles/nifi-extension-utils/pom.xml b/nifi-nar-bundles/nifi-extension-utils/nifi-database-test-utils/pom.xml
similarity index 55%
copy from nifi-nar-bundles/nifi-extension-utils/pom.xml
copy to nifi-nar-bundles/nifi-extension-utils/nifi-database-test-utils/pom.xml
index b2c8f51..cf63b5e 100644
--- a/nifi-nar-bundles/nifi-extension-utils/pom.xml
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-database-test-utils/pom.xml
@@ -1,4 +1,4 @@
-<?xml version="1.0"?>
+<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
@@ -13,25 +13,19 @@
See the License for the specific language governing permissions and
limitations under the License.
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
- <artifactId>nifi-nar-bundles</artifactId>
+ <artifactId>nifi-extension-utils</artifactId>
<version>1.10.0-SNAPSHOT</version>
</parent>
- <packaging>pom</packaging>
- <artifactId>nifi-extension-utils</artifactId>
- <description>
- This module contains reusable utilities related to extensions that can be shared across NARs.
- </description>
- <modules>
- <module>nifi-record-utils</module>
- <module>nifi-hadoop-utils</module>
- <module>nifi-processor-utils</module>
- <module>nifi-reporting-utils</module>
- <module>nifi-syslog-utils</module>
- </modules>
+ <artifactId>nifi-database-test-utils</artifactId>
-</project>
+ <dependencies>
+ </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-database-test-utils/src/main/java/org/apache/nifi/util/db/SimpleCommerceDataSet.java b/nifi-nar-bundles/nifi-extension-utils/nifi-database-test-utils/src/main/java/org/apache/nifi/util/db/SimpleCommerceDataSet.java
new file mode 100644
index 0000000..89c2600
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-database-test-utils/src/main/java/org/apache/nifi/util/db/SimpleCommerceDataSet.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util.db;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Random;
+
+/**
+ * A sample data set for tests, consisting of the 'persons', 'products' and 'relationships' tables.
+ */
+public class SimpleCommerceDataSet {
+
+ static String dropPersons = "drop table persons";
+ static String dropProducts = "drop table products";
+ static String dropRelationships = "drop table relationships";
+ static String createPersons = "create table persons (id integer, name varchar(100), code integer)";
+ static String createProducts = "create table products (id integer, name varchar(100), code integer)";
+ static String createRelationships = "create table relationships (id integer,name varchar(100), code integer)";
+
+ public static void loadTestData2Database(Connection con, int nrOfPersons, int nrOfProducts, int nrOfRels) throws SQLException {
+
+ System.out.println(createRandomName());
+ System.out.println(createRandomName());
+ System.out.println(createRandomName());
+
+ final Statement st = con.createStatement();
+
+ // The tables may not exist yet, so a failed drop is not a serious problem.
+ try {
+ st.executeUpdate(dropPersons);
+ } catch (final Exception ignored) {
+ }
+
+ try {
+ st.executeUpdate(dropProducts);
+ } catch (final Exception ignored) {
+ }
+
+ try {
+ st.executeUpdate(dropRelationships);
+ } catch (final Exception ignored) {
+ }
+
+ st.executeUpdate(createPersons);
+ st.executeUpdate(createProducts);
+ st.executeUpdate(createRelationships);
+
+ for (int i = 0; i < nrOfPersons; i++)
+ loadPersons(st, i);
+
+ for (int i = 0; i < nrOfProducts; i++)
+ loadProducts(st, i);
+
+ for (int i = 0; i < nrOfRels; i++)
+ loadRelationships(st, i);
+
+ st.close();
+ }
+
+ static Random rng = new Random(53495);
+
+ static private void loadPersons(Statement st, int nr) throws SQLException {
+ st.executeUpdate("insert into persons values (" + nr + ", '" + createRandomName() + "', " + rng.nextInt(469946) + ")");
+ }
+
+ static private void loadProducts(Statement st, int nr) throws SQLException {
+ st.executeUpdate("insert into products values (" + nr + ", '" + createRandomName() + "', " + rng.nextInt(469946) + ")");
+ }
+
+ static private void loadRelationships(Statement st, int nr) throws SQLException {
+ st.executeUpdate("insert into relationships values (" + nr + ", '" + createRandomName() + "', " + rng.nextInt(469946) + ")");
+ }
+
+ static private String createRandomName() {
+ return createRandomString() + " " + createRandomString();
+ }
+
+ static private String createRandomString() {
+
+ final int length = rng.nextInt(10);
+ final String characters = "ABCDEFGHIJ";
+
+ final char[] text = new char[length];
+ for (int i = 0; i < length; i++) {
+ text[i] = characters.charAt(rng.nextInt(characters.length()));
+ }
+ return new String(text);
+ }
+
+ private Connection createConnection(String location) throws ClassNotFoundException, SQLException {
+ Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
+ return DriverManager.getConnection("jdbc:derby:" + location + ";create=true");
+ }
+
+}
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/pom.xml b/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/pom.xml
new file mode 100644
index 0000000..3cc62e8
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/pom.xml
@@ -0,0 +1,101 @@
+<?xml version="1.0"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-extension-utils</artifactId>
+ <version>1.10.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>nifi-database-utils</artifactId>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-standard-record-utils</artifactId>
+ <version>1.10.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-avro-record-utils</artifactId>
+ <version>1.10.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>com.github.ben-manes.caffeine</groupId>
+ <artifactId>caffeine</artifactId>
+ <version>2.6.2</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ <version>3.8.1</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ <version>1.8.1</version>
+ </dependency>
+ <!-- Other modules using nifi-database-utils are expected to have these APIs available, typically through a NAR dependency -->
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-record</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-mock</artifactId>
+ <version>1.10.0-SNAPSHOT</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ <version>2.6</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.derby</groupId>
+ <artifactId>derby</artifactId>
+ <version>10.11.1.1</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.h2database</groupId>
+ <artifactId>h2</artifactId>
+ <version>1.4.187</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.rat</groupId>
+ <artifactId>apache-rat-plugin</artifactId>
+ <configuration>
+ <excludes combine.children="append">
+ <exclude>src/test/resources/org/apache/nifi/avro/data.avro</exclude>
+ <exclude>src/test/resources/org/apache/nifi/avro/schema.json</exclude>
+ <exclude>src/test/resources/org/apache/nifi/avro/simpleSchema.json</exclude>
+ <exclude>src/test/resources/org/apache/nifi/avro/defaultArrayValue1.json</exclude>
+ <exclude>src/test/resources/org/apache/nifi/avro/defaultArrayValue2.json</exclude>
+ <exclude>src/test/resources/org/apache/nifi/avro/defaultArrayInRecords1.json</exclude>
+ <exclude>src/test/resources/org/apache/nifi/avro/defaultArrayInRecords2.json</exclude>
+ </excludes>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/AvroUtil.java b/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/main/java/org/apache/nifi/util/db/AvroUtil.java
similarity index 97%
rename from nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/AvroUtil.java
rename to nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/main/java/org/apache/nifi/util/db/AvroUtil.java
index 970c7c2..8bb2261 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/AvroUtil.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/main/java/org/apache/nifi/util/db/AvroUtil.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.nifi.processors.standard.util;
+package org.apache.nifi.util.db;
import org.apache.avro.file.CodecFactory;
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java b/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/main/java/org/apache/nifi/util/db/JdbcCommon.java
similarity index 90%
rename from nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
rename to nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/main/java/org/apache/nifi/util/db/JdbcCommon.java
index 3de86c7..e41b3cb 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/main/java/org/apache/nifi/util/db/JdbcCommon.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.nifi.processors.standard.util;
+package org.apache.nifi.util.db;
import static java.sql.Types.ARRAY;
import static java.sql.Types.BIGINT;
@@ -100,12 +100,9 @@ import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
-import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.avro.AvroTypeUtil;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.expression.ExpressionLanguageScope;
-import org.apache.nifi.processor.util.StandardValidators;
import javax.xml.bind.DatatypeConverter;
@@ -114,11 +111,11 @@ import javax.xml.bind.DatatypeConverter;
*/
public class JdbcCommon {
- private static final int MAX_DIGITS_IN_BIGINT = 19;
- private static final int MAX_DIGITS_IN_INT = 9;
+ public static final int MAX_DIGITS_IN_BIGINT = 19;
+ public static final int MAX_DIGITS_IN_INT = 9;
// Derived from MySQL default precision.
- private static final int DEFAULT_PRECISION_VALUE = 10;
- private static final int DEFAULT_SCALE_VALUE = 0;
+ public static final int DEFAULT_PRECISION_VALUE = 10;
+ public static final int DEFAULT_SCALE_VALUE = 0;
public static final Pattern LONG_PATTERN = Pattern.compile("^-?\\d{1,19}$");
public static final Pattern SQL_TYPE_ATTRIBUTE_PATTERN = Pattern.compile("sql\\.args\\.(\\d+)\\.type");
@@ -126,62 +123,6 @@ public class JdbcCommon {
public static final String MIME_TYPE_AVRO_BINARY = "application/avro-binary";
- public static final PropertyDescriptor NORMALIZE_NAMES_FOR_AVRO = new PropertyDescriptor.Builder()
- .name("dbf-normalize")
- .displayName("Normalize Table/Column Names")
- .description("Whether to change non-Avro-compatible characters in column names to Avro-compatible characters. For example, colons and periods "
- + "will be changed to underscores in order to build a valid Avro record.")
- .allowableValues("true", "false")
- .defaultValue("false")
- .required(true)
- .build();
-
- public static final PropertyDescriptor USE_AVRO_LOGICAL_TYPES = new PropertyDescriptor.Builder()
- .name("dbf-user-logical-types")
- .displayName("Use Avro Logical Types")
- .description("Whether to use Avro Logical Types for DECIMAL/NUMBER, DATE, TIME and TIMESTAMP columns. "
- + "If disabled, written as string. "
- + "If enabled, Logical types are used and written as its underlying type, specifically, "
- + "DECIMAL/NUMBER as logical 'decimal': written as bytes with additional precision and scale meta data, "
- + "DATE as logical 'date-millis': written as int denoting days since Unix epoch (1970-01-01), "
- + "TIME as logical 'time-millis': written as int denoting milliseconds since Unix epoch, "
- + "and TIMESTAMP as logical 'timestamp-millis': written as long denoting milliseconds since Unix epoch. "
- + "If a reader of written Avro records also knows these logical types, then these values can be deserialized with more context depending on reader implementation.")
- .allowableValues("true", "false")
- .defaultValue("false")
- .required(true)
- .build();
-
- public static final PropertyDescriptor DEFAULT_PRECISION = new PropertyDescriptor.Builder()
- .name("dbf-default-precision")
- .displayName("Default Decimal Precision")
- .description("When a DECIMAL/NUMBER value is written as a 'decimal' Avro logical type,"
- + " a specific 'precision' denoting number of available digits is required."
- + " Generally, precision is defined by column data type definition or database engines default."
- + " However undefined precision (0) can be returned from some database engines."
- + " 'Default Decimal Precision' is used when writing those undefined precision numbers.")
- .defaultValue(String.valueOf(DEFAULT_PRECISION_VALUE))
- .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
- .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .required(true)
- .build();
-
- public static final PropertyDescriptor DEFAULT_SCALE = new PropertyDescriptor.Builder()
- .name("dbf-default-scale")
- .displayName("Default Decimal Scale")
- .description("When a DECIMAL/NUMBER value is written as a 'decimal' Avro logical type,"
- + " a specific 'scale' denoting number of available decimal digits is required."
- + " Generally, scale is defined by column data type definition or database engines default."
- + " However when undefined precision (0) is returned, scale can also be uncertain with some database engines."
- + " 'Default Decimal Scale' is used when writing those undefined numbers."
- + " If a value has more decimals than specified scale, then the value will be rounded-up,"
- + " e.g. 1.53 becomes 2 with scale 0, and 1.5 with scale 1.")
- .defaultValue(String.valueOf(DEFAULT_SCALE_VALUE))
- .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
- .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .required(true)
- .build();
-
public static long convertToAvroStream(final ResultSet rs, final OutputStream outStream, boolean convertNames) throws SQLException, IOException {
return convertToAvroStream(rs, outStream, null, null, convertNames);
}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/JdbcCommonTestUtils.java b/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/test/java/org/apache/nifi/util/db/JdbcCommonTestUtils.java
similarity index 97%
rename from nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/JdbcCommonTestUtils.java
rename to nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/test/java/org/apache/nifi/util/db/JdbcCommonTestUtils.java
index ad57158..cc8d29e 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/JdbcCommonTestUtils.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/test/java/org/apache/nifi/util/db/JdbcCommonTestUtils.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.nifi.processors.standard.util;
+package org.apache.nifi.util.db;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommon.java b/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/test/java/org/apache/nifi/util/db/TestJdbcCommon.java
similarity index 97%
rename from nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommon.java
rename to nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/test/java/org/apache/nifi/util/db/TestJdbcCommon.java
index 9cf4fc1..fa584c0 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommon.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/test/java/org/apache/nifi/util/db/TestJdbcCommon.java
@@ -14,10 +14,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.nifi.processors.standard.util;
+package org.apache.nifi.util.db;
-import static org.apache.nifi.processors.standard.util.JdbcCommonTestUtils.convertResultSetToAvroInputStream;
-import static org.apache.nifi.processors.standard.util.JdbcCommonTestUtils.resultSetReturningMetadata;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
@@ -427,7 +425,7 @@ public class TestJdbcCommon {
when(metadata.getPrecision(1)).thenReturn(dbPrecision);
when(metadata.getScale(1)).thenReturn(expectedScale);
- final ResultSet rs = resultSetReturningMetadata(metadata);
+ final ResultSet rs = JdbcCommonTestUtils.resultSetReturningMetadata(metadata);
when(rs.getObject(Mockito.anyInt())).thenReturn(bigDecimal);
@@ -580,12 +578,12 @@ public class TestJdbcCommon {
when(metadata.getColumnName(1)).thenReturn("t_int");
when(metadata.getTableName(1)).thenReturn("table");
- final ResultSet rs = resultSetReturningMetadata(metadata);
+ final ResultSet rs = JdbcCommonTestUtils.resultSetReturningMetadata(metadata);
final short s = 25;
when(rs.getObject(Mockito.anyInt())).thenReturn(s);
- final InputStream instream = convertResultSetToAvroInputStream(rs);
+ final InputStream instream = JdbcCommonTestUtils.convertResultSetToAvroInputStream(rs);
final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
try (final DataFileStream<GenericRecord> dataFileReader = new DataFileStream<>(instream, datumReader)) {
@@ -608,12 +606,12 @@ public class TestJdbcCommon {
when(metadata.getColumnName(1)).thenReturn(mockColumnName);
when(metadata.getTableName(1)).thenReturn("table");
- final ResultSet rs = resultSetReturningMetadata(metadata);
+ final ResultSet rs = JdbcCommonTestUtils.resultSetReturningMetadata(metadata);
final Long ret = 0L;
when(rs.getObject(Mockito.anyInt())).thenReturn(ret);
- final InputStream instream = convertResultSetToAvroInputStream(rs);
+ final InputStream instream = JdbcCommonTestUtils.convertResultSetToAvroInputStream(rs);
final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
try (final DataFileStream<GenericRecord> dataFileReader = new DataFileStream<>(instream, datumReader)) {
@@ -636,12 +634,12 @@ public class TestJdbcCommon {
when(metadata.getColumnName(1)).thenReturn(mockColumnName);
when(metadata.getTableName(1)).thenReturn("table");
- final ResultSet rs = resultSetReturningMetadata(metadata);
+ final ResultSet rs = JdbcCommonTestUtils.resultSetReturningMetadata(metadata);
final Long ret = 0L;
when(rs.getObject(Mockito.anyInt())).thenReturn(ret);
- final InputStream instream = convertResultSetToAvroInputStream(rs);
+ final InputStream instream = JdbcCommonTestUtils.convertResultSetToAvroInputStream(rs);
final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
try (final DataFileStream<GenericRecord> dataFileReader = new DataFileStream<>(instream, datumReader)) {
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommonConvertToAvro.java b/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/test/java/org/apache/nifi/util/db/TestJdbcCommonConvertToAvro.java
similarity index 92%
rename from nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommonConvertToAvro.java
rename to nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/test/java/org/apache/nifi/util/db/TestJdbcCommonConvertToAvro.java
index eb736e2..e6f9743 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommonConvertToAvro.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/test/java/org/apache/nifi/util/db/TestJdbcCommonConvertToAvro.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.nifi.processors.standard.util;
+package org.apache.nifi.util.db;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericDatumReader;
@@ -40,8 +40,6 @@ import static java.sql.Types.INTEGER;
import static java.sql.Types.SMALLINT;
import static java.sql.Types.TINYINT;
import static java.sql.Types.BIGINT;
-import static org.apache.nifi.processors.standard.util.JdbcCommonTestUtils.convertResultSetToAvroInputStream;
-import static org.apache.nifi.processors.standard.util.JdbcCommonTestUtils.resultSetReturningMetadata;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -133,12 +131,12 @@ public class TestJdbcCommonConvertToAvro {
when(metadata.getColumnName(1)).thenReturn("t_int");
when(metadata.getTableName(1)).thenReturn("table");
- final ResultSet rs = resultSetReturningMetadata(metadata);
+ final ResultSet rs = JdbcCommonTestUtils.resultSetReturningMetadata(metadata);
final int ret = 0;
when(rs.getObject(Mockito.anyInt())).thenReturn(ret);
- final InputStream instream = convertResultSetToAvroInputStream(rs);
+ final InputStream instream = JdbcCommonTestUtils.convertResultSetToAvroInputStream(rs);
final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
try (final DataFileStream<GenericRecord> dataFileReader = new DataFileStream<>(instream, datumReader)) {
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcHugeStream.java b/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/test/java/org/apache/nifi/util/db/TestJdbcHugeStream.java
similarity index 99%
rename from nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcHugeStream.java
rename to nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/test/java/org/apache/nifi/util/db/TestJdbcHugeStream.java
index 499127b..e44024a 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcHugeStream.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/test/java/org/apache/nifi/util/db/TestJdbcHugeStream.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.nifi.processors.standard.util;
+package org.apache.nifi.util.db;
import static org.junit.Assert.assertEquals;
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcTypesDerby.java b/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/test/java/org/apache/nifi/util/db/TestJdbcTypesDerby.java
similarity index 99%
rename from nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcTypesDerby.java
rename to nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/test/java/org/apache/nifi/util/db/TestJdbcTypesDerby.java
index 2c3eb58..37af3ac 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcTypesDerby.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/test/java/org/apache/nifi/util/db/TestJdbcTypesDerby.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.nifi.processors.standard.util;
+package org.apache.nifi.util.db;
import static org.junit.Assert.assertNotNull;
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcTypesH2.java b/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/test/java/org/apache/nifi/util/db/TestJdbcTypesH2.java
similarity index 99%
rename from nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcTypesH2.java
rename to nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/test/java/org/apache/nifi/util/db/TestJdbcTypesH2.java
index c4f6071..5f594df 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcTypesH2.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/test/java/org/apache/nifi/util/db/TestJdbcTypesH2.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.nifi.processors.standard.util;
+package org.apache.nifi.util.db;
import static org.junit.Assert.assertNotNull;
diff --git a/nifi-nar-bundles/nifi-extension-utils/pom.xml b/nifi-nar-bundles/nifi-extension-utils/pom.xml
index b2c8f51..ccec552 100644
--- a/nifi-nar-bundles/nifi-extension-utils/pom.xml
+++ b/nifi-nar-bundles/nifi-extension-utils/pom.xml
@@ -32,6 +32,8 @@
<module>nifi-processor-utils</module>
<module>nifi-reporting-utils</module>
<module>nifi-syslog-utils</module>
+ <module>nifi-database-utils</module>
+ <module>nifi-database-test-utils</module>
</modules>
</project>
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
index 7c1a13c..fb10f6a 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
@@ -344,6 +344,11 @@
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-database-utils</artifactId>
+ <version>1.10.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
<artifactId>nifi-standard-web-test-utils</artifactId>
<version>1.10.0-SNAPSHOT</version>
<scope>test</scope>
@@ -354,6 +359,12 @@
<version>2.0.1</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-database-test-utils</artifactId>
+ <version>1.10.0-SNAPSHOT</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
<plugins>
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java
index e013a5c..212febc 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java
@@ -33,8 +33,8 @@ import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.sql.SqlWriter;
-import org.apache.nifi.processors.standard.util.JdbcCommon;
import org.apache.nifi.util.StopWatch;
+import org.apache.nifi.util.db.JdbcCommon;
import java.nio.charset.Charset;
import java.sql.Connection;
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractQueryDatabaseTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractQueryDatabaseTable.java
index 6b166d9..1df0ae2 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractQueryDatabaseTable.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractQueryDatabaseTable.java
@@ -37,8 +37,8 @@ import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.db.DatabaseAdapter;
import org.apache.nifi.processors.standard.sql.SqlWriter;
-import org.apache.nifi.processors.standard.util.JdbcCommon;
import org.apache.nifi.util.StopWatch;
+import org.apache.nifi.util.db.JdbcCommon;
import java.io.IOException;
import java.sql.Connection;
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
index cfdef29..f058b77 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
@@ -39,13 +39,13 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processors.standard.sql.DefaultAvroSqlWriter;
import org.apache.nifi.processors.standard.sql.SqlWriter;
-import org.apache.nifi.processors.standard.util.JdbcCommon;
-import org.apache.nifi.processors.standard.util.AvroUtil.CodecType;
+import org.apache.nifi.util.db.JdbcCommon;
-import static org.apache.nifi.processors.standard.util.JdbcCommon.DEFAULT_PRECISION;
-import static org.apache.nifi.processors.standard.util.JdbcCommon.DEFAULT_SCALE;
-import static org.apache.nifi.processors.standard.util.JdbcCommon.NORMALIZE_NAMES_FOR_AVRO;
-import static org.apache.nifi.processors.standard.util.JdbcCommon.USE_AVRO_LOGICAL_TYPES;
+import static org.apache.nifi.processors.standard.util.JdbcProperties.DEFAULT_PRECISION;
+import static org.apache.nifi.processors.standard.util.JdbcProperties.DEFAULT_SCALE;
+import static org.apache.nifi.processors.standard.util.JdbcProperties.NORMALIZE_NAMES_FOR_AVRO;
+import static org.apache.nifi.processors.standard.util.JdbcProperties.USE_AVRO_LOGICAL_TYPES;
+import static org.apache.nifi.util.db.AvroUtil.CodecType;
@EventDriven
@InputRequirement(Requirement.INPUT_ALLOWED)
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQLRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQLRecord.java
index 80d33c0..897a929 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQLRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQLRecord.java
@@ -32,8 +32,8 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processors.standard.sql.RecordSqlWriter;
import org.apache.nifi.processors.standard.sql.SqlWriter;
-import org.apache.nifi.processors.standard.util.JdbcCommon;
import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.util.db.JdbcCommon;
import java.util.ArrayList;
import java.util.Collections;
@@ -41,7 +41,8 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
-import static org.apache.nifi.processors.standard.util.JdbcCommon.USE_AVRO_LOGICAL_TYPES;
+import static org.apache.nifi.processors.standard.util.JdbcProperties.USE_AVRO_LOGICAL_TYPES;
+
@EventDriven
@InputRequirement(Requirement.INPUT_ALLOWED)
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java
index b9686b2..96a8d3e 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java
@@ -73,7 +73,7 @@ import java.util.stream.Collectors;
@WritesAttribute(attribute = "mime.type", description = "Sets the mime.type attribute to the MIME Type specified by the Record Writer"),
@WritesAttribute(attribute = "record.count", description = "The number of records in the FlowFile")
})
-@Tags({"lookup", "enrichment", "route", "record", "csv", "json", "avro", "logs", "convert", "filter"})
+@Tags({"lookup", "enrichment", "route", "record", "csv", "json", "avro", "database", "db", "logs", "convert", "filter"})
@CapabilityDescription("Extracts one or more fields from a Record and looks up a value for those fields in a LookupService. If a result is returned by the LookupService, "
+ "that result is optionally added to the Record. In this case, the processor functions as an Enrichment processor. Regardless, the Record is then "
+ "routed to either the 'matched' relationship or 'unmatched' relationship (if the 'Routing Strategy' property is configured to do so), "
@@ -87,7 +87,8 @@ import java.util.stream.Collectors;
+ "the schema that is configured for your Record Writer) then the fields will not be written out to the FlowFile.")
@DynamicProperty(name = "Value To Lookup", value = "Valid Record Path", expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
description = "A RecordPath that points to the field whose value will be looked up in the configured Lookup Service")
-@SeeAlso(value = {ConvertRecord.class, SplitRecord.class}, classNames = {"org.apache.nifi.lookup.SimpleKeyValueLookupService", "org.apache.nifi.lookup.maxmind.IPLookupService"})
+@SeeAlso(value = {ConvertRecord.class, SplitRecord.class},
+ classNames = {"org.apache.nifi.lookup.SimpleKeyValueLookupService", "org.apache.nifi.lookup.maxmind.IPLookupService", "org.apache.nifi.lookup.db.DatabaseRecordLookupService"})
public class LookupRecord extends AbstractRouteRecord<Tuple<Map<String, RecordPath>, RecordPath>> {
private volatile RecordPathCache recordPathCache = new RecordPathCache(25);
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
index 8834821..6a4e3a6 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
@@ -51,8 +51,8 @@ import org.apache.nifi.processor.util.pattern.PartialFunctions.FlowFileGroup;
import org.apache.nifi.processor.util.pattern.PutGroup;
import org.apache.nifi.processor.util.pattern.RollbackOnFailure;
import org.apache.nifi.processor.util.pattern.RoutingResult;
-import org.apache.nifi.processors.standard.util.JdbcCommon;
import org.apache.nifi.stream.io.StreamUtils;
+import org.apache.nifi.util.db.JdbcCommon;
import java.io.IOException;
import java.io.InputStream;
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
index 1089370..b8cc75c 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
@@ -35,7 +35,7 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processors.standard.sql.DefaultAvroSqlWriter;
import org.apache.nifi.processors.standard.sql.SqlWriter;
-import org.apache.nifi.processors.standard.util.JdbcCommon;
+import org.apache.nifi.util.db.JdbcCommon;
import java.util.ArrayList;
import java.util.Collections;
@@ -43,10 +43,10 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
-import static org.apache.nifi.processors.standard.util.JdbcCommon.DEFAULT_PRECISION;
-import static org.apache.nifi.processors.standard.util.JdbcCommon.DEFAULT_SCALE;
-import static org.apache.nifi.processors.standard.util.JdbcCommon.NORMALIZE_NAMES_FOR_AVRO;
-import static org.apache.nifi.processors.standard.util.JdbcCommon.USE_AVRO_LOGICAL_TYPES;
+import static org.apache.nifi.processors.standard.util.JdbcProperties.DEFAULT_PRECISION;
+import static org.apache.nifi.processors.standard.util.JdbcProperties.DEFAULT_SCALE;
+import static org.apache.nifi.processors.standard.util.JdbcProperties.NORMALIZE_NAMES_FOR_AVRO;
+import static org.apache.nifi.processors.standard.util.JdbcProperties.USE_AVRO_LOGICAL_TYPES;
@TriggerSerially
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecord.java
index 4464842..371d225 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecord.java
@@ -35,8 +35,8 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processors.standard.sql.RecordSqlWriter;
import org.apache.nifi.processors.standard.sql.SqlWriter;
-import org.apache.nifi.processors.standard.util.JdbcCommon;
import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.util.db.JdbcCommon;
import java.util.ArrayList;
import java.util.Collections;
@@ -44,7 +44,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
-import static org.apache.nifi.processors.standard.util.JdbcCommon.USE_AVRO_LOGICAL_TYPES;
+import static org.apache.nifi.processors.standard.util.JdbcProperties.USE_AVRO_LOGICAL_TYPES;
@TriggerSerially
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/DefaultAvroSqlWriter.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/DefaultAvroSqlWriter.java
index 574aca7..d5b51c8 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/DefaultAvroSqlWriter.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/DefaultAvroSqlWriter.java
@@ -19,8 +19,7 @@ package org.apache.nifi.processors.standard.sql;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processors.standard.AbstractQueryDatabaseTable;
-import org.apache.nifi.processors.standard.util.JdbcCommon;
+import org.apache.nifi.util.db.JdbcCommon;
import java.io.IOException;
import java.io.OutputStream;
@@ -29,20 +28,23 @@ import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
+import static org.apache.nifi.util.db.JdbcCommon.AvroConversionOptions;
+import static org.apache.nifi.util.db.JdbcCommon.ResultSetRowCallback;
+
public class DefaultAvroSqlWriter implements SqlWriter {
- private final JdbcCommon.AvroConversionOptions options;
+ private final AvroConversionOptions options;
private final Map<String,String> attributesToAdd = new HashMap<String,String>() {{
put(CoreAttributes.MIME_TYPE.key(), JdbcCommon.MIME_TYPE_AVRO_BINARY);
}};
- public DefaultAvroSqlWriter(JdbcCommon.AvroConversionOptions options) {
+ public DefaultAvroSqlWriter(AvroConversionOptions options) {
this.options = options;
}
@Override
- public long writeResultSet(ResultSet resultSet, OutputStream outputStream, ComponentLog logger, AbstractQueryDatabaseTable.MaxValueResultSetRowCollector callback) throws Exception {
+ public long writeResultSet(ResultSet resultSet, OutputStream outputStream, ComponentLog logger, ResultSetRowCallback callback) throws Exception {
try {
return JdbcCommon.convertToAvroStream(resultSet, outputStream, options, callback);
} catch (SQLException e) {
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/RecordSqlWriter.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/RecordSqlWriter.java
index c1a76b4..d5d798b 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/RecordSqlWriter.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/RecordSqlWriter.java
@@ -22,8 +22,6 @@ import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processors.standard.AbstractQueryDatabaseTable;
-import org.apache.nifi.processors.standard.util.JdbcCommon;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
@@ -32,6 +30,7 @@ import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.RecordSet;
import org.apache.nifi.serialization.record.ResultSetRecordSet;
+import org.apache.nifi.util.db.JdbcCommon;
import java.io.IOException;
import java.io.OutputStream;
@@ -41,6 +40,9 @@ import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
+import static org.apache.nifi.util.db.JdbcCommon.AvroConversionOptions;
+import static org.apache.nifi.util.db.JdbcCommon.ResultSetRowCallback;
+
public class RecordSqlWriter implements SqlWriter {
private final RecordSetWriterFactory recordSetWriterFactory;
@@ -52,7 +54,7 @@ public class RecordSqlWriter implements SqlWriter {
private RecordSchema writeSchema;
private String mimeType;
- public RecordSqlWriter(RecordSetWriterFactory recordSetWriterFactory, JdbcCommon.AvroConversionOptions options, int maxRowsPerFlowFile, Map<String, String> originalAttributes) {
+ public RecordSqlWriter(RecordSetWriterFactory recordSetWriterFactory, AvroConversionOptions options, int maxRowsPerFlowFile, Map<String, String> originalAttributes) {
this.recordSetWriterFactory = recordSetWriterFactory;
this.writeResultRef = new AtomicReference<>();
this.maxRowsPerFlowFile = maxRowsPerFlowFile;
@@ -61,7 +63,7 @@ public class RecordSqlWriter implements SqlWriter {
}
@Override
- public long writeResultSet(ResultSet resultSet, OutputStream outputStream, ComponentLog logger, AbstractQueryDatabaseTable.MaxValueResultSetRowCollector callback) throws Exception {
+ public long writeResultSet(ResultSet resultSet, OutputStream outputStream, ComponentLog logger, ResultSetRowCallback callback) throws Exception {
final RecordSet recordSet;
try {
if (fullRecordSet == null) {
@@ -129,9 +131,9 @@ public class RecordSqlWriter implements SqlWriter {
private static class ResultSetRecordSetWithCallback extends ResultSetRecordSet {
- private final AbstractQueryDatabaseTable.MaxValueResultSetRowCollector callback;
+ private final ResultSetRowCallback callback;
- ResultSetRecordSetWithCallback(ResultSet rs, RecordSchema readerSchema, AbstractQueryDatabaseTable.MaxValueResultSetRowCollector callback) throws SQLException {
+ ResultSetRecordSetWithCallback(ResultSet rs, RecordSchema readerSchema, ResultSetRowCallback callback) throws SQLException {
super(rs, readerSchema);
this.callback = callback;
}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/SqlWriter.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/SqlWriter.java
index 08fc3fd..abbe842 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/SqlWriter.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/SqlWriter.java
@@ -18,7 +18,6 @@ package org.apache.nifi.processors.standard.sql;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processors.standard.AbstractQueryDatabaseTable;
import java.io.IOException;
import java.io.OutputStream;
@@ -26,6 +25,9 @@ import java.sql.ResultSet;
import java.util.Collections;
import java.util.Map;
+import static org.apache.nifi.util.db.JdbcCommon.ResultSetRowCallback;
+
+
/**
* The SqlWriter interface provides a standard way for processors such as ExecuteSQL, ExecuteSQLRecord, QueryDatabaseTable, and QueryDatabaseTableRecord
* to write SQL result sets out to a flow file in whichever manner is appropriate. For example, ExecuteSQL writes the result set as Avro but ExecuteSQLRecord
@@ -42,7 +44,7 @@ public interface SqlWriter {
* @return the number of rows written to the output stream
* @throws Exception if any errors occur during the writing of the result set to the output stream
*/
- long writeResultSet(ResultSet resultSet, OutputStream outputStream, ComponentLog logger, AbstractQueryDatabaseTable.MaxValueResultSetRowCollector callback) throws Exception;
+ long writeResultSet(ResultSet resultSet, OutputStream outputStream, ComponentLog logger, ResultSetRowCallback callback) throws Exception;
/**
* Returns a map of attribute key/value pairs to be added to any outgoing flow file(s). The default implementation is to return an empty map.
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcProperties.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcProperties.java
new file mode 100644
index 0000000..4683cbc
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcProperties.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.util;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.db.JdbcCommon;
+
+public class JdbcProperties { // Shared PropertyDescriptor constants for JDBC-to-Avro conversion options; statically imported by the ExecuteSQL* and QueryDatabaseTable* processors.
+
+ public static final PropertyDescriptor NORMALIZE_NAMES_FOR_AVRO = new PropertyDescriptor.Builder() // Required "true"/"false" toggle, default "false".
+ .name("dbf-normalize")
+ .displayName("Normalize Table/Column Names")
+ .description("Whether to change non-Avro-compatible characters in column names to Avro-compatible characters. For example, colons and periods "
+ + "will be changed to underscores in order to build a valid Avro record.")
+ .allowableValues("true", "false")
+ .defaultValue("false")
+ .required(true)
+ .build();
+
+ public static final PropertyDescriptor USE_AVRO_LOGICAL_TYPES = new PropertyDescriptor.Builder() // Required "true"/"false" toggle, default "false".
+ .name("dbf-user-logical-types") // NOTE(review): "user" looks like a typo for "use", but this string is the property's identifier; left verbatim so existing references keep resolving.
+ .displayName("Use Avro Logical Types")
+ .description("Whether to use Avro Logical Types for DECIMAL/NUMBER, DATE, TIME and TIMESTAMP columns. "
+ + "If disabled, written as string. "
+ + "If enabled, Logical types are used and written as its underlying type, specifically, "
+ + "DECIMAL/NUMBER as logical 'decimal': written as bytes with additional precision and scale meta data, "
+ + "DATE as logical 'date': written as int denoting days since Unix epoch (1970-01-01), " // Avro names this logical type 'date'; there is no 'date-millis'.
+ + "TIME as logical 'time-millis': written as int denoting milliseconds since midnight, " // Per the Avro spec, time-millis counts from midnight, not from the Unix epoch.
+ + "and TIMESTAMP as logical 'timestamp-millis': written as long denoting milliseconds since Unix epoch. "
+ + "If a reader of written Avro records also knows these logical types, then these values can be deserialized with more context depending on reader implementation.")
+ .allowableValues("true", "false")
+ .defaultValue("false")
+ .required(true)
+ .build();
+
+ public static final PropertyDescriptor DEFAULT_PRECISION = new PropertyDescriptor.Builder() // Required; positive integer; expression language evaluated against FlowFile attributes.
+ .name("dbf-default-precision")
+ .displayName("Default Decimal Precision")
+ .description("When a DECIMAL/NUMBER value is written as a 'decimal' Avro logical type,"
+ + " a specific 'precision' denoting number of available digits is required."
+ + " Generally, precision is defined by column data type definition or database engines default."
+ + " However undefined precision (0) can be returned from some database engines."
+ + " 'Default Decimal Precision' is used when writing those undefined precision numbers.")
+ .defaultValue(String.valueOf(JdbcCommon.DEFAULT_PRECISION_VALUE)) // Default is taken from the shared JdbcCommon constant rather than duplicated here.
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .required(true)
+ .build();
+
+ public static final PropertyDescriptor DEFAULT_SCALE = new PropertyDescriptor.Builder() // Required; non-negative integer (0 is allowed); expression language evaluated against FlowFile attributes.
+ .name("dbf-default-scale")
+ .displayName("Default Decimal Scale")
+ .description("When a DECIMAL/NUMBER value is written as a 'decimal' Avro logical type,"
+ + " a specific 'scale' denoting number of available decimal digits is required."
+ + " Generally, scale is defined by column data type definition or database engines default."
+ + " However when undefined precision (0) is returned, scale can also be uncertain with some database engines."
+ + " 'Default Decimal Scale' is used when writing those undefined numbers."
+ + " If a value has more decimals than specified scale, then the value will be rounded-up,"
+ + " e.g. 1.53 becomes 2 with scale 0, and 1.5 with scale 1.")
+ .defaultValue(String.valueOf(JdbcCommon.DEFAULT_SCALE_VALUE)) // Default is taken from the shared JdbcCommon constant rather than duplicated here.
+ .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .required(true)
+ .build();
+}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
index 5458434..9961c6f 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
@@ -46,12 +46,12 @@ import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.flowfile.attributes.FragmentAttributes;
import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processors.standard.util.AvroUtil;
-import org.apache.nifi.processors.standard.util.TestJdbcHugeStream;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
+import org.apache.nifi.util.db.AvroUtil;
+import org.apache.nifi.util.db.SimpleCommerceDataSet;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -540,7 +540,7 @@ public class TestExecuteSQL {
// load test data to database
final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
- TestJdbcHugeStream.loadTestData2Database(con, 100, 200, 100);
+ SimpleCommerceDataSet.loadTestData2Database(con, 100, 200, 100);
LOGGER.info("test data loaded");
// ResultSet size will be 1x200x100 = 20 000 rows
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java
index 6f6a091..375cb98 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java
@@ -28,7 +28,6 @@ import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.flowfile.attributes.FragmentAttributes;
import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processors.standard.util.TestJdbcHugeStream;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.schema.access.SchemaAccessUtils;
@@ -36,6 +35,7 @@ import org.apache.nifi.serialization.record.MockRecordWriter;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
+import org.apache.nifi.util.db.SimpleCommerceDataSet;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -492,7 +492,7 @@ public class TestExecuteSQLRecord {
// load test data to database
final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
- TestJdbcHugeStream.loadTestData2Database(con, 100, 200, 100);
+ SimpleCommerceDataSet.loadTestData2Database(con, 100, 200, 100);
LOGGER.info("test data loaded");
// ResultSet size will be 1x200x100 = 20 000 rows
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/pom.xml
index 2429ca4..ee3ba71 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/pom.xml
@@ -83,9 +83,8 @@
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
- <artifactId>nifi-mock</artifactId>
+ <artifactId>nifi-database-utils</artifactId>
<version>1.10.0-SNAPSHOT</version>
- <scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
@@ -105,6 +104,24 @@
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-dbcp-service-api</artifactId>
+ <version>1.10.0-SNAPSHOT</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.burgstaller</groupId>
+ <artifactId>okhttp-digest</artifactId>
+ <version>1.18</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-mock</artifactId>
+ <version>1.10.0-SNAPSHOT</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock-record-utils</artifactId>
<version>1.10.0-SNAPSHOT</version>
<scope>test</scope>
@@ -141,10 +158,10 @@
<scope>test</scope>
</dependency>
<dependency>
- <groupId>com.burgstaller</groupId>
- <artifactId>okhttp-digest</artifactId>
- <version>1.18</version>
- <scope>compile</scope>
+ <groupId>org.apache.derby</groupId>
+ <artifactId>derby</artifactId>
+ <version>10.11.1.1</version>
+ <scope>test</scope>
</dependency>
</dependencies>
<build>
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/db/AbstractDatabaseLookupService.java b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/db/AbstractDatabaseLookupService.java
new file mode 100644
index 0000000..e91e6d7
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/db/AbstractDatabaseLookupService.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.lookup.db;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.dbcp.DBCPService;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Shared base for the database-backed lookup services in this package. It declares the lookup
+ * coordinate name, the property descriptors that the concrete services register in their own
+ * {@code init} methods, and the fields those services populate when they are enabled.
+ */
+public class AbstractDatabaseLookupService extends AbstractControllerService {
+
+    /** Name of the coordinate that carries the value to look up. */
+    static final String KEY = "key";
+
+    /** Unmodifiable set holding the single coordinate name a caller has to supply. */
+    static final Set<String> REQUIRED_KEYS = Collections.unmodifiableSet(Stream.of(KEY).collect(Collectors.toSet()));
+
+    /** Connection pool service from which database connections are obtained. */
+    static final PropertyDescriptor DBCP_SERVICE = new PropertyDescriptor.Builder()
+            .name("dbrecord-lookup-dbcp-service")
+            .displayName("Database Connection Pooling Service")
+            .description("The Controller Service that is used to obtain connection to database")
+            .identifiesControllerService(DBCPService.class)
+            .required(true)
+            .build();
+
+    /** Table to query; expression language is evaluated against flow file attributes. */
+    static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
+            .name("dbrecord-lookup-table-name")
+            .displayName("Table Name")
+            .description("The name of the database table to be queried. Note that this may be case-sensitive depending on the database.")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    /** Column compared against the lookup key; expression language uses the variable registry. */
+    static final PropertyDescriptor LOOKUP_KEY_COLUMN = new PropertyDescriptor.Builder()
+            .name("dbrecord-lookup-key-column")
+            .displayName("Lookup Key Column")
+            .description("The column in the table that will serve as the lookup key. This is the column that will be matched against "
+                    + "the property specified in the lookup processor. Note that this may be case-sensitive depending on the database.")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    /** Maximum number of cached lookup results; the default of zero disables caching. */
+    static final PropertyDescriptor CACHE_SIZE = new PropertyDescriptor.Builder()
+            .name("dbrecord-lookup-cache-size")
+            .displayName("Cache Size")
+            .description("Specifies how many lookup values/records should be cached. The cache is shared for all tables and keeps a map of lookup values to records. "
+                    + "Setting this property to zero means no caching will be done and the table will be queried for each lookup value in each record. If the lookup "
+                    + "table changes often or the most recent data must be retrieved, do not use the cache.")
+            .required(true)
+            .defaultValue("0")
+            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+            .build();
+
+    /** Whether enabling the service discards previously cached results; defaults to true. */
+    static final PropertyDescriptor CLEAR_CACHE_ON_ENABLED = new PropertyDescriptor.Builder()
+            .name("dbrecord-lookup-clear-cache-on-enabled")
+            .displayName("Clear Cache on Enabled")
+            .description("Whether to clear the cache when this service is enabled. If the Cache Size is zero then this property is ignored. Clearing the cache when the "
+                    + "service is enabled ensures that the service will first go to the database to get the most recent data.")
+            .required(true)
+            .defaultValue("true")
+            .allowableValues("true", "false")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .build();
+
+    /** Optional time period after which cached entries expire. */
+    static final PropertyDescriptor CACHE_EXPIRATION = new PropertyDescriptor.Builder()
+            .name("Cache Expiration")
+            .description("Time interval to clear all cache entries. If the Cache Size is zero then this property is ignored.")
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .build();
+
+    // Supported descriptors; this class never assigns the list, subclasses are expected to
+    protected List<PropertyDescriptor> properties;
+
+    // Connection pool used to run the lookup queries
+    DBCPService dbcpService;
+
+    // Name of the column matched against the lookup key
+    volatile String lookupKeyColumn;
+
+    /**
+     * Returns the descriptor list held in {@link #properties}.
+     *
+     * @return the supported property descriptors, exactly as stored in the field
+     */
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return this.properties;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/db/DatabaseRecordLookupService.java b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/db/DatabaseRecordLookupService.java
new file mode 100644
index 0000000..fdb1452
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/db/DatabaseRecordLookupService.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.lookup.db;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.Expiry;
+import org.apache.avro.Schema;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.avro.AvroTypeUtil;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.ControllerServiceInitializationContext;
+import org.apache.nifi.dbcp.DBCPService;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.lookup.LookupFailureException;
+import org.apache.nifi.lookup.RecordLookupService;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.ResultSetRecordSet;
+import org.apache.nifi.util.Tuple;
+import org.apache.nifi.util.db.JdbcCommon;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+/**
+ * Lookup service that resolves a key against a database table and returns the matching row as a
+ * {@link Record}. Results can optionally be cached per (table name, key) pair.
+ */
+@Tags({"lookup", "cache", "enrich", "join", "rdbms", "database", "reloadable", "key", "value", "record"})
+@CapabilityDescription("A relational-database-based lookup service. When the lookup key is found in the database, "
+        + "the specified columns (or all if Lookup Value Columns are not specified) are returned as a Record. Only one row "
+        + "will be returned for each lookup, duplicate database entries are ignored.")
+public class DatabaseRecordLookupService extends AbstractDatabaseLookupService implements RecordLookupService {
+
+    // Lookup results keyed by (table name, lookup key); assigned in onEnabled
+    private volatile Cache<Tuple<String, Object>, Record> cache;
+    // Avro conversion settings used to derive a schema from each result set; assigned in onEnabled
+    private volatile JdbcCommon.AvroConversionOptions options;
+
+    /** Optional comma-delimited list of columns to return; all columns are selected when unset. */
+    static final PropertyDescriptor LOOKUP_VALUE_COLUMNS = new PropertyDescriptor.Builder()
+            .name("dbrecord-lookup-value-columns")
+            .displayName("Lookup Value Columns")
+            .description("A comma-delimited list of columns in the table that will be returned when the lookup key matches. Note that this may be case-sensitive depending on the database.")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    /**
+     * Registers the property descriptors supported by this service.
+     *
+     * @param context the initialization context (not used here)
+     */
+    @Override
+    protected void init(final ControllerServiceInitializationContext context) {
+        final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(DBCP_SERVICE);
+        properties.add(TABLE_NAME);
+        properties.add(LOOKUP_KEY_COLUMN);
+        properties.add(LOOKUP_VALUE_COLUMNS);
+        properties.add(CACHE_SIZE);
+        properties.add(CLEAR_CACHE_ON_ENABLED);
+        properties.add(CACHE_EXPIRATION);
+        this.properties = Collections.unmodifiableList(properties);
+    }
+
+    /**
+     * Reads the configuration, (re)creates the cache when needed and prepares the Avro conversion options.
+     *
+     * @param context the configuration the service is being enabled with
+     */
+    @OnEnabled
+    public void onEnabled(final ConfigurationContext context) {
+        this.dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
+        this.lookupKeyColumn = context.getProperty(LOOKUP_KEY_COLUMN).evaluateAttributeExpressions().getValue();
+        final int cacheSize = context.getProperty(CACHE_SIZE).evaluateAttributeExpressions().asInteger();
+        final boolean clearCache = context.getProperty(CLEAR_CACHE_ON_ENABLED).asBoolean();
+        final long durationNanos = context.getProperty(CACHE_EXPIRATION).isSet() ? context.getProperty(CACHE_EXPIRATION).evaluateAttributeExpressions().asTimePeriod(TimeUnit.NANOSECONDS) : 0L;
+
+        // Build a new cache on first enable or when clearing is requested. Also rebuild when the size is
+        // zero, so that entries kept from an earlier enablement cannot be served once caching is turned off.
+        if (this.cache == null || cacheSize == 0 || clearCache) {
+            this.cache = createCache(cacheSize, durationNanos);
+        }
+
+        options = JdbcCommon.AvroConversionOptions.builder()
+                .recordName("NiFi_DB_Record_Lookup")
+                // Ignore duplicates
+                .maxRows(1)
+                // Keep column names as field names
+                .convertNames(false)
+                .useLogicalTypes(true)
+                .build();
+    }
+
+    /**
+     * Creates a size-bounded cache whose entries expire a fixed time after creation when a positive duration is given.
+     *
+     * @param cacheSize     maximum number of entries
+     * @param durationNanos entry lifetime in nanoseconds; values less than or equal to zero disable expiration
+     * @return the new, empty cache
+     */
+    private static Cache<Tuple<String, Object>, Record> createCache(final int cacheSize, final long durationNanos) {
+        if (durationNanos <= 0) {
+            return Caffeine.newBuilder()
+                    .maximumSize(cacheSize)
+                    .build();
+        }
+
+        return Caffeine.newBuilder()
+                .maximumSize(cacheSize)
+                .expireAfter(new Expiry<Tuple<String, Object>, Record>() {
+                    @Override
+                    public long expireAfterCreate(Tuple<String, Object> stringObjectTuple, Record record, long currentTime) {
+                        return durationNanos;
+                    }
+
+                    // Updates and reads leave the remaining lifetime untouched
+                    @Override
+                    public long expireAfterUpdate(Tuple<String, Object> stringObjectTuple, Record record, long currentTime, long currentDuration) {
+                        return currentDuration;
+                    }
+
+                    @Override
+                    public long expireAfterRead(Tuple<String, Object> stringObjectTuple, Record record, long currentTime, long currentDuration) {
+                        return currentDuration;
+                    }
+                })
+                .build();
+    }
+
+    /**
+     * Looks up a record without any additional context.
+     *
+     * @param coordinates map expected to contain the lookup value under the "key" entry
+     * @return the matching record, or an empty Optional when nothing matches
+     * @throws LookupFailureException if the database query fails
+     */
+    @Override
+    public Optional<Record> lookup(Map<String, Object> coordinates) throws LookupFailureException {
+        return lookup(coordinates, null);
+    }
+
+    /**
+     * Looks up the row whose key column equals the supplied key, consulting the cache first.
+     *
+     * @param coordinates map expected to contain the lookup value under the "key" entry
+     * @param context     attributes used to evaluate the table name and value column expressions and
+     *                    passed on when obtaining the connection
+     * @return the first matching row as a record, or an empty Optional when the coordinates are null,
+     *         the key is missing or blank, or no row matches
+     * @throws LookupFailureException if executing the query or reading its result fails
+     */
+    @Override
+    public Optional<Record> lookup(final Map<String, Object> coordinates, Map<String, String> context) throws LookupFailureException {
+        if (coordinates == null) {
+            return Optional.empty();
+        }
+
+        // A missing key is treated like a blank one instead of failing with a NullPointerException
+        final Object key = coordinates.get(KEY);
+        if (key == null || StringUtils.isBlank(key.toString())) {
+            return Optional.empty();
+        }
+
+        final String tableName = getProperty(TABLE_NAME).evaluateAttributeExpressions(context).getValue();
+        final String lookupValueColumnsList = getProperty(LOOKUP_VALUE_COLUMNS).evaluateAttributeExpressions(context).getValue();
+
+        // Preserve the configured column order while dropping blanks and duplicates
+        Set<String> lookupValueColumnsSet = new LinkedHashSet<>();
+        if (lookupValueColumnsList != null) {
+            Stream.of(lookupValueColumnsList)
+                    .flatMap(path -> Arrays.stream(path.split(",")))
+                    .filter(DatabaseRecordLookupService::isNotBlank)
+                    .map(String::trim)
+                    .forEach(lookupValueColumnsSet::add);
+        }
+
+        final String lookupValueColumns = lookupValueColumnsSet.isEmpty() ? "*" : String.join(",", lookupValueColumnsSet);
+
+        // NOTE(review): the cache key does not include the requested value columns, so a cached record may
+        // carry a different column set than the current Lookup Value Columns expression asks for - confirm
+        // whether that is acceptable.
+        final Tuple<String, Object> cacheLookupKey = new Tuple<>(tableName, key);
+
+        final Record cachedRecord = cache.getIfPresent(cacheLookupKey);
+        if (cachedRecord != null) {
+            return Optional.of(cachedRecord);
+        }
+
+        // Identifiers cannot be bound as statement parameters, so table and column names are concatenated
+        // exactly as configured; only the key value is bound.
+        // NOTE(review): the table name and value columns may come from flow file attributes - confirm they are trusted.
+        final String selectQuery = "SELECT " + lookupValueColumns + " FROM " + tableName + " WHERE " + lookupKeyColumn + " = ?";
+        Record foundRecord;
+        try (final Connection con = dbcpService.getConnection(context);
+             final PreparedStatement st = con.prepareStatement(selectQuery)) {
+
+            st.setObject(1, key);
+            try (final ResultSet resultSet = st.executeQuery()) {
+                final Schema avroSchema = JdbcCommon.createSchema(resultSet, options);
+                final RecordSchema recordAvroSchema = AvroTypeUtil.createSchema(avroSchema);
+                final ResultSetRecordSet resultSetRecordSet = new ResultSetRecordSet(resultSet, recordAvroSchema, true);
+                foundRecord = resultSetRecordSet.next();
+            }
+
+            // Populate the cache if the record is present
+            if (foundRecord != null) {
+                cache.put(cacheLookupKey, foundRecord);
+            }
+
+        } catch (SQLException se) {
+            throw new LookupFailureException("Error executing SQL statement: " + selectQuery + " for value " + key
+                    + " : " + (se.getCause() == null ? se.getMessage() : se.getCause().getMessage()), se);
+        } catch (IOException ioe) {
+            throw new LookupFailureException("Error retrieving result set for SQL statement: " + selectQuery + " for value " + key
+                    + " : " + (ioe.getCause() == null ? ioe.getMessage() : ioe.getCause().getMessage()), ioe);
+        }
+
+        return Optional.ofNullable(foundRecord);
+    }
+
+    /** Returns true when the value is non-null and contains something other than whitespace. */
+    private static boolean isNotBlank(final String value) {
+        return value != null && !value.trim().isEmpty();
+    }
+
+    /**
+     * @return the coordinate names a caller must supply
+     */
+    @Override
+    public Set<String> getRequiredKeys() {
+        return REQUIRED_KEYS;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/db/SimpleDatabaseLookupService.java b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/db/SimpleDatabaseLookupService.java
new file mode 100644
index 0000000..f649582
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/db/SimpleDatabaseLookupService.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.lookup.db;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.Expiry;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.ControllerServiceInitializationContext;
+import org.apache.nifi.dbcp.DBCPService;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.lookup.LookupFailureException;
+import org.apache.nifi.lookup.StringLookupService;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.Tuple;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@Tags({"lookup", "cache", "enrich", "join", "rdbms", "database", "reloadable", "key", "value"})
+@CapabilityDescription("A relational-database-based lookup service. When the lookup key is found in the database, " +
+ "the specified lookup value column is returned. Only one value will be returned for each lookup, duplicate database entries are ignored.")
+public class SimpleDatabaseLookupService extends AbstractDatabaseLookupService implements StringLookupService {
+
+ private volatile Cache<Tuple<String, Object>, String> cache;
+
+ static final PropertyDescriptor LOOKUP_VALUE_COLUMN =
+ new PropertyDescriptor.Builder()
+ .name("lookup-value-column")
+ .displayName("Lookup Value Column")
+ .description("The column whose value will be returned when the Lookup value is matched")
+ .required(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .build();
+
+ @Override
+ protected void init(final ControllerServiceInitializationContext context) {
+ final List<PropertyDescriptor> properties = new ArrayList<>();
+ properties.add(DBCP_SERVICE);
+ properties.add(TABLE_NAME);
+ properties.add(LOOKUP_KEY_COLUMN);
+ properties.add(LOOKUP_VALUE_COLUMN);
+ properties.add(CACHE_SIZE);
+ properties.add(CLEAR_CACHE_ON_ENABLED);
+ properties.add(CACHE_EXPIRATION);
+ this.properties = Collections.unmodifiableList(properties);
+ }
+
+ @OnEnabled
+ public void onEnabled(final ConfigurationContext context) {
+ this.dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
+ this.lookupKeyColumn = context.getProperty(LOOKUP_KEY_COLUMN).evaluateAttributeExpressions().getValue();
+ int cacheSize = context.getProperty(CACHE_SIZE).evaluateAttributeExpressions().asInteger();
+ boolean clearCache = context.getProperty(CLEAR_CACHE_ON_ENABLED).asBoolean();
+ final long durationNanos = context.getProperty(CACHE_EXPIRATION).isSet() ? context.getProperty(CACHE_EXPIRATION).evaluateAttributeExpressions().asTimePeriod(TimeUnit.NANOSECONDS) : 0L;
+ if (this.cache == null || (cacheSize > 0 && clearCache)) {
+ if (durationNanos > 0) {
+ this.cache = Caffeine.newBuilder()
+ .maximumSize(cacheSize)
+ .expireAfter(new Expiry<Tuple<String, Object>, Object>() {
+ @Override
+ public long expireAfterCreate(Tuple<String, Object> stringObjectTuple, Object value, long currentTime) {
+ return durationNanos;
+ }
+
+ @Override
+ public long expireAfterUpdate(Tuple<String, Object> stringObjectTuple, Object value, long currentTime, long currentDuration) {
+ return currentDuration;
+ }
+
+ @Override
+ public long expireAfterRead(Tuple<String, Object> stringObjectTuple, Object value, long currentTime, long currentDuration) {
+ return currentDuration;
+ }
+ })
+ .build();
+ } else {
+ this.cache = Caffeine.newBuilder()
+ .maximumSize(cacheSize)
+ .build();
+ }
+ }
+ }
+
+ @Override
+ public Optional<String> lookup(Map<String, Object> coordinates) throws LookupFailureException {
+ return lookup(coordinates, null);
+ }
+
+ @Override
+ public Optional<String> lookup(Map<String, Object> coordinates, Map<String, String> context) throws LookupFailureException {
+ if (coordinates == null) {
+ return Optional.empty();
+ }
+
+ final Object key = coordinates.get(KEY);
+ if (StringUtils.isBlank(key.toString())) {
+ return Optional.empty();
+ }
+
+ final String tableName = getProperty(TABLE_NAME).evaluateAttributeExpressions(context).getValue();
+ final String lookupValueColumn = getProperty(LOOKUP_VALUE_COLUMN).evaluateAttributeExpressions(context).getValue();
+
+ Tuple<String, Object> cacheLookupKey = new Tuple<>(tableName, key);
+
+ // Not using the function param of cache.get so we can catch and handle the checked exceptions
+ String foundRecord = cache.get(cacheLookupKey, k -> null);
+
+ if (foundRecord == null) {
+ final String selectQuery = "SELECT " + lookupValueColumn + " FROM " + tableName + " WHERE " + lookupKeyColumn + " = ?";
+ try (final Connection con = dbcpService.getConnection(context);
+ final PreparedStatement st = con.prepareStatement(selectQuery)) {
+
+ st.setObject(1, key);
+ ResultSet resultSet = st.executeQuery();
+
+ if (!resultSet.next()) {
+ return Optional.empty();
+ }
+
+ Object o = resultSet.getObject(lookupValueColumn);
+ if (o == null) {
+ return Optional.empty();
+ }
+ foundRecord = o.toString();
+
+ // Populate the cache if the record is present
+ if (foundRecord != null) {
+ cache.put(cacheLookupKey, foundRecord);
+ }
+
+ } catch (SQLException se) {
+ throw new LookupFailureException("Error executing SQL statement: " + selectQuery + "for value " + key.toString()
+ + " : " + (se.getCause() == null ? se.getMessage() : se.getCause().getMessage()), se);
+ }
+ }
+
+ return Optional.ofNullable(foundRecord);
+ }
+
+ @Override
+ public Set<String> getRequiredKeys() {
+ return REQUIRED_KEYS;
+ }
+}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
index 631fdaa..06d7622 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
@@ -14,9 +14,11 @@
# limitations under the License.
org.apache.nifi.lookup.maxmind.IPLookupService
org.apache.nifi.lookup.CSVRecordLookupService
+org.apache.nifi.lookup.db.DatabaseRecordLookupService
org.apache.nifi.lookup.PropertiesFileLookupService
org.apache.nifi.lookup.RestLookupService
org.apache.nifi.lookup.SimpleKeyValueLookupService
org.apache.nifi.lookup.SimpleCsvFileLookupService
+org.apache.nifi.lookup.db.SimpleDatabaseLookupService
org.apache.nifi.lookup.XMLFileLookupService
org.apache.nifi.lookup.DistributedMapCacheLookupService
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/groovy/org/apache/nifi/lookup/db/TestDatabaseRecordLookupService.groovy b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/groovy/org/apache/nifi/lookup/db/TestDatabaseRecordLookupService.groovy
new file mode 100644
index 0000000..860a90d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/groovy/org/apache/nifi/lookup/db/TestDatabaseRecordLookupService.groovy
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.lookup.db
+
+import org.apache.nifi.controller.AbstractControllerService
+import org.apache.nifi.dbcp.DBCPService
+import org.apache.nifi.lookup.LookupFailureException
+import org.apache.nifi.lookup.LookupService
+import org.apache.nifi.lookup.TestProcessor
+import org.apache.nifi.processor.exception.ProcessException
+import org.apache.nifi.reporting.InitializationException
+import org.apache.nifi.serialization.record.Record
+import org.apache.nifi.util.TestRunner
+import org.apache.nifi.util.TestRunners
+import org.junit.Before
+import org.junit.BeforeClass
+import org.junit.Test
+
+import java.sql.Connection
+import java.sql.DriverManager
+import java.sql.SQLException
+import java.sql.Statement
+
+import static org.hamcrest.CoreMatchers.instanceOf
+import static org.junit.Assert.assertEquals
+import static org.junit.Assert.assertNull
+import static org.junit.Assert.assertThat
+
+
+class TestDatabaseRecordLookupService {
+
+    // Test runner, recreated before every test
+    private TestRunner runner
+
+    // Expected result when a key has no matching row
+    private static final Optional<Record> EMPTY_RECORD = Optional.empty()
+    // Path handed to File when the tests remove a previous test database
+    private static final String DB_LOCATION = "target/db"
+
+    /**
+     * Sets the derby.stream.error.file system property once for the whole class so that it points
+     * into the target directory.
+     */
+    @BeforeClass
+    static void setupClass() {
+        final String derbyLogProperty = "derby.stream.error.file"
+        System.setProperty(derbyLogProperty, "target/derby.log")
+    }
+
+    /**
+     * Creates a fresh test runner and registers and enables the connection pool service under the id "dbcp".
+     */
+    @Before
+    void setup() throws InitializationException {
+        runner = TestRunners.newTestRunner(TestProcessor.class)
+
+        final DBCPService connectionPool = new DBCPServiceSimpleImpl()
+        final Map<String, String> noProperties = new HashMap<>()
+        runner.addControllerService("dbcp", connectionPool, noProperties)
+        runner.enableControllerService(connectionPool)
+    }
+
+    /**
+     * Looks up whole rows (no value columns configured) and checks both existing keys and a missing key.
+     */
+    @Test
+    void testDatabaseLookupService() throws InitializationException, IOException, LookupFailureException {
+        // remove previous test database, if any
+        final File dbLocation = new File(DB_LOCATION)
+        dbLocation.delete()
+
+        // load test data to database
+        final Connection con = ((DBCPService) runner.getControllerService("dbcp")).connection
+        final Statement stmt = con.createStatement()
+
+        try {
+            try {
+                stmt.execute("drop table TEST")
+            } catch (final SQLException ignored) {
+                // Best effort: the drop fails when the table does not exist yet, which is fine here
+            }
+
+            stmt.execute("create table TEST (id integer not null, val1 integer, val2 varchar(10), constraint my_pk primary key (id))")
+            stmt.execute("insert into TEST (id, val1, val2) VALUES (0, NULL, 'Hello')")
+            stmt.execute("insert into TEST (id, val1, val2) VALUES (1, 1, 'World')")
+        } finally {
+            // Release the statement once the fixture data is loaded
+            stmt.close()
+        }
+
+        final DatabaseRecordLookupService service = new DatabaseRecordLookupService()
+
+        runner.addControllerService("db-lookup-service", service)
+        runner.setProperty(service, DatabaseRecordLookupService.DBCP_SERVICE, "dbcp")
+        runner.assertNotValid()
+        runner.setProperty(service, DatabaseRecordLookupService.TABLE_NAME, "TEST")
+        runner.setProperty(service, DatabaseRecordLookupService.LOOKUP_KEY_COLUMN, "id")
+        runner.enableControllerService(service)
+        runner.assertValid(service)
+
+        def lookupService = (DatabaseRecordLookupService) runner.processContext.controllerServiceLookup.getControllerService("db-lookup-service")
+
+        assertThat(lookupService, instanceOf(LookupService.class))
+
+        final Optional<Record> property1 = lookupService.lookup(Collections.singletonMap("key", "0"))
+        assertNull("Should be null but is not", property1.get().getAsInt("VAL1"))
+        assertEquals("Hello", property1.get().getAsString("VAL2"))
+
+        final Optional<Record> property2 = lookupService.lookup(Collections.singletonMap("key", "1"))
+        assertEquals(1, property2.get().getAsInt("VAL1"))
+        assertEquals("World", property2.get().getAsString("VAL2"))
+
+        // Key not found
+        final Optional<Record> property3 = lookupService.lookup(Collections.singletonMap("key", "2"))
+        assertEquals(EMPTY_RECORD, property3)
+    }
+
+    /**
+     * Looks up rows with Lookup Value Columns restricted to val1 and checks existing keys and a missing key.
+     */
+    @Test
+    void testDatabaseLookupServiceSpecifyColumns() throws InitializationException, IOException, LookupFailureException {
+        // remove previous test database, if any
+        final File dbLocation = new File(DB_LOCATION)
+        dbLocation.delete()
+
+        // load test data to database
+        final Connection con = ((DBCPService) runner.getControllerService("dbcp")).connection
+        final Statement stmt = con.createStatement()
+
+        try {
+            try {
+                stmt.execute("drop table TEST")
+            } catch (final SQLException ignored) {
+                // Best effort: the drop fails when the table does not exist yet, which is fine here
+            }
+
+            stmt.execute("create table TEST (id integer not null, val1 integer, val2 varchar(10), constraint my_pk primary key (id))")
+            stmt.execute("insert into TEST (id, val1, val2) VALUES (0, NULL, 'Hello')")
+            stmt.execute("insert into TEST (id, val1, val2) VALUES (1, 1, 'World')")
+        } finally {
+            // Release the statement once the fixture data is loaded
+            stmt.close()
+        }
+
+        final DatabaseRecordLookupService service = new DatabaseRecordLookupService()
+
+        runner.addControllerService("db-lookup-service", service)
+        runner.setProperty(service, DatabaseRecordLookupService.DBCP_SERVICE, "dbcp")
+        runner.assertNotValid()
+        runner.setProperty(service, DatabaseRecordLookupService.TABLE_NAME, "TEST")
+        runner.setProperty(service, DatabaseRecordLookupService.LOOKUP_KEY_COLUMN, "id")
+        runner.setProperty(service, DatabaseRecordLookupService.LOOKUP_VALUE_COLUMNS, "val1")
+        runner.enableControllerService(service)
+        runner.assertValid(service)
+
+        def lookupService = (DatabaseRecordLookupService) runner.processContext.controllerServiceLookup.getControllerService("db-lookup-service")
+
+        assertThat(lookupService, instanceOf(LookupService.class))
+
+        final Optional<Record> property1 = lookupService.lookup(Collections.singletonMap("key", "0"))
+        assertNull("Should be null but is not", property1.get().getAsInt("VAL1"))
+
+        final Optional<Record> property2 = lookupService.lookup(Collections.singletonMap("key", "1"))
+        assertEquals(1, property2.get().getAsInt("VAL1"))
+
+        // Key not found
+        final Optional<Record> property3 = lookupService.lookup(Collections.singletonMap("key", "2"))
+        assertEquals(EMPTY_RECORD, property3)
+    }
+
+    /** Looks up keys 1 and 0 twice each with CACHE_SIZE set to 10; the repeated lookup must return the same values. */
+    @Test
+    void exerciseCacheLogic() {
+        // remove previous test database, if any
+        new File(DB_LOCATION).delete()
+        // load test data to database; the connection is closed even if a statement fails
+        final Connection con = ((DBCPService) runner.getControllerService("dbcp")).connection
+        try {
+            final Statement stmt = con.createStatement()
+            try {
+                stmt.execute("drop table TEST")
+            } catch (final SQLException ignored) { // expected when TEST does not exist yet
+            }
+            stmt.execute("create table TEST (id integer not null, val1 integer, val2 varchar(10), constraint my_pk primary key (id))")
+            stmt.execute("insert into TEST (id, val1, val2) VALUES (0, NULL, 'Hello')")
+            stmt.execute("insert into TEST (id, val1, val2) VALUES (1, 1, 'World')")
+            stmt.close()
+        } finally {
+            con.close()
+        }
+
+        final DatabaseRecordLookupService service = new DatabaseRecordLookupService()
+        runner.addControllerService("db-lookup-service", service)
+        runner.setProperty(service, DatabaseRecordLookupService.DBCP_SERVICE, "dbcp")
+        runner.assertNotValid() // NOTE(review): no-arg form presumably checks the runner's processor, not the service - confirm
+        runner.setProperty(service, DatabaseRecordLookupService.TABLE_NAME, "TEST")
+        runner.setProperty(service, DatabaseRecordLookupService.LOOKUP_KEY_COLUMN, "id")
+        runner.setProperty(service, DatabaseRecordLookupService.CACHE_SIZE, "10")
+        runner.enableControllerService(service)
+        runner.assertValid(service)
+
+        def lookupService = (DatabaseRecordLookupService) runner.processContext.controllerServiceLookup.getControllerService("db-lookup-service")
+        assertThat(lookupService, instanceOf(LookupService.class))
+
+        final Optional<Record> property1 = lookupService.lookup(Collections.singletonMap("key", "1"))
+        assertEquals(1, property1.get().getAsInt("VAL1"))
+        assertEquals("World", property1.get().getAsString("VAL2"))
+
+        final Optional<Record> property2 = lookupService.lookup(Collections.singletonMap("key", "1"))
+        assertEquals(1, property2.get().getAsInt("VAL1"))
+        assertEquals("World", property2.get().getAsString("VAL2"))
+
+        final Optional<Record> property3 = lookupService.lookup(Collections.singletonMap("key", "0"))
+        assertNull(property3.get().getAsInt("VAL1"))
+        assertEquals("Hello", property3.get().getAsString("VAL2"))
+
+        final Optional<Record> property4 = lookupService.lookup(Collections.singletonMap("key", "0"))
+        assertNull(property4.get().getAsInt("VAL1"))
+        assertEquals("Hello", property4.get().getAsString("VAL2"))
+    }
+
+    /**
+     * Simple DBCPService implementation for component testing. Every getConnection() call opens a new
+     * connection to the embedded Derby database at DB_LOCATION (the JDBC URL carries create=true).
+     */
+    class DBCPServiceSimpleImpl extends AbstractControllerService implements DBCPService {
+
+        @Override
+        String getIdentifier() {
+            "dbcp"
+        }
+
+        @Override
+        Connection getConnection() throws ProcessException {
+            try {
+                Class.forName("org.apache.derby.jdbc.EmbeddedDriver")
+                DriverManager.getConnection("jdbc:derby:${DB_LOCATION};create=true")
+            } catch (e) {
+                throw new ProcessException("getConnection failed: " + e, e) // pass the cause so its stack trace is not lost
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/groovy/org/apache/nifi/lookup/db/TestSimpleDatabaseLookupService.groovy b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/groovy/org/apache/nifi/lookup/db/TestSimpleDatabaseLookupService.groovy
new file mode 100644
index 0000000..e255771
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/groovy/org/apache/nifi/lookup/db/TestSimpleDatabaseLookupService.groovy
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.lookup.db
+
+import org.apache.nifi.controller.AbstractControllerService
+import org.apache.nifi.dbcp.DBCPService
+import org.apache.nifi.lookup.LookupFailureException
+import org.apache.nifi.lookup.LookupService
+import org.apache.nifi.lookup.TestProcessor
+import org.apache.nifi.processor.exception.ProcessException
+import org.apache.nifi.reporting.InitializationException
+import org.apache.nifi.serialization.record.Record
+import org.apache.nifi.util.TestRunner
+import org.apache.nifi.util.TestRunners
+import org.junit.Before
+import org.junit.BeforeClass
+import org.junit.Test
+
+import java.sql.Connection
+import java.sql.DriverManager
+import java.sql.SQLException
+import java.sql.Statement
+
+import static org.hamcrest.CoreMatchers.instanceOf
+import static org.junit.Assert.*
+
+/** Tests SimpleDatabaseLookupService against an embedded Derby database located at DB_LOCATION. */
+class TestSimpleDatabaseLookupService {
+
+    private TestRunner runner
+
+    // Optional.empty() equals every empty Optional, so this also matches an empty Optional<String>
+    private final static Optional<Record> EMPTY_RECORD = Optional.empty()
+    private final static String DB_LOCATION = "target/db"
+
+    /** Points Derby's error log at target/derby.log. */
+    @BeforeClass
+    static void setupClass() {
+        System.setProperty("derby.stream.error.file", "target/derby.log")
+    }
+
+    /** Creates a new runner for each test and registers and enables the "dbcp" service on it. */
+    @Before
+    void setup() throws InitializationException {
+        final DBCPService dbcp = new DBCPServiceSimpleImpl()
+        final Map<String, String> dbcpProperties = new HashMap<>()
+
+        runner = TestRunners.newTestRunner(TestProcessor.class)
+        runner.addControllerService("dbcp", dbcp, dbcpProperties)
+        runner.enableControllerService(dbcp)
+    }
+
+    /** (Re)creates table TEST with rows (0, NULL, 'Hello') and (1, 1, 'World'); the connection is always closed. */
+    private void loadTestData() {
+        // remove previous test database, if any
+        new File(DB_LOCATION).delete()
+
+        final Connection con = ((DBCPService) runner.getControllerService("dbcp")).connection
+        try {
+            final Statement stmt = con.createStatement()
+            try {
+                stmt.execute("drop table TEST")
+            } catch (final SQLException ignored) {
+                // expected when TEST does not exist yet; the table is created next either way
+            }
+            stmt.execute("create table TEST (id integer not null, val1 integer, val2 varchar(10), constraint my_pk primary key (id))")
+            stmt.execute("insert into TEST (id, val1, val2) VALUES (0, NULL, 'Hello')")
+            stmt.execute("insert into TEST (id, val1, val2) VALUES (1, 1, 'World')")
+            stmt.close()
+        } finally {
+            con.close()
+        }
+    }
+
+    /** Looks up VAL1 for a NULL value and for a missing key (both must be empty), then switches to VAL2. */
+    @Test
+    void testDatabaseLookupService() throws InitializationException, IOException, LookupFailureException {
+        loadTestData()
+
+        final SimpleDatabaseLookupService service = new SimpleDatabaseLookupService()
+
+        runner.addControllerService("db-lookup-service", service)
+        runner.setProperty(service, SimpleDatabaseLookupService.DBCP_SERVICE, "dbcp")
+        // NOTE(review): the no-arg form presumably validates the runner's processor, not the service - confirm intent
+        runner.assertNotValid()
+        runner.setProperty(service, SimpleDatabaseLookupService.TABLE_NAME, "TEST")
+        runner.setProperty(service, SimpleDatabaseLookupService.LOOKUP_KEY_COLUMN, "id")
+        runner.setProperty(service, SimpleDatabaseLookupService.LOOKUP_VALUE_COLUMN, "VAL1")
+        runner.enableControllerService(service)
+        runner.assertValid(service)
+
+        def lookupService = (SimpleDatabaseLookupService) runner.processContext.controllerServiceLookup.getControllerService("db-lookup-service")
+
+        assertThat(lookupService, instanceOf(LookupService.class))
+
+        // Lookup VAL1: the row with id 0 was inserted with a NULL val1, so no value is expected
+        final Optional<String> property1 = lookupService.lookup(Collections.singletonMap("key", "0"))
+        assertFalse(property1.isPresent())
+        // Key not found
+        final Optional<String> property3 = lookupService.lookup(Collections.singletonMap("key", "2"))
+        assertEquals(EMPTY_RECORD, property3)
+
+        // switch the value column to VAL2; the service is disabled while it is reconfigured
+        runner.disableControllerService(service)
+        runner.setProperty(service, SimpleDatabaseLookupService.LOOKUP_VALUE_COLUMN, "VAL2")
+        runner.enableControllerService(service)
+        final Optional<String> property2 = lookupService.lookup(Collections.singletonMap("key", "1"))
+        assertEquals("World", property2.get())
+    }
+
+    /** Looks up both keys with CACHE_SIZE set to 10, first for VAL1 and, after reconfiguring, for VAL2. */
+    @Test
+    void exerciseCacheLogic() {
+        loadTestData()
+
+        final SimpleDatabaseLookupService service = new SimpleDatabaseLookupService()
+
+        runner.addControllerService("db-lookup-service", service)
+        runner.setProperty(service, SimpleDatabaseLookupService.DBCP_SERVICE, "dbcp")
+        runner.assertNotValid()
+        runner.setProperty(service, SimpleDatabaseLookupService.TABLE_NAME, "TEST")
+        runner.setProperty(service, SimpleDatabaseLookupService.LOOKUP_KEY_COLUMN, "id")
+        runner.setProperty(service, SimpleDatabaseLookupService.CACHE_SIZE, "10")
+        runner.setProperty(service, SimpleDatabaseLookupService.LOOKUP_VALUE_COLUMN, "VAL1")
+        runner.enableControllerService(service)
+        runner.assertValid(service)
+
+        def lookupService = (SimpleDatabaseLookupService) runner.processContext.controllerServiceLookup.getControllerService("db-lookup-service")
+
+        assertThat(lookupService, instanceOf(LookupService.class))
+
+        // Lookup VAL1
+        final Optional<String> property1 = lookupService.lookup(Collections.singletonMap("key", "1"))
+        assertEquals("1", property1.get())
+        final Optional<String> property3 = lookupService.lookup(Collections.singletonMap("key", "0"))
+        assertFalse(property3.isPresent())
+
+        // switch the value column to VAL2; a VAL1 result returned for the same keys would fail the checks below
+        runner.disableControllerService(service)
+        runner.setProperty(service, SimpleDatabaseLookupService.LOOKUP_VALUE_COLUMN, "VAL2")
+        runner.enableControllerService(service)
+        final Optional<String> property2 = lookupService.lookup(Collections.singletonMap("key", "1"))
+        assertEquals("World", property2.get())
+
+        final Optional<String> property4 = lookupService.lookup(Collections.singletonMap("key", "0"))
+        assertEquals("Hello", property4.get())
+    }
+
+    /**
+     * Simple DBCPService implementation for component testing. Every getConnection() call opens a new
+     * connection to the embedded Derby database at DB_LOCATION (the JDBC URL carries create=true).
+     */
+    class DBCPServiceSimpleImpl extends AbstractControllerService implements DBCPService {
+
+        @Override
+        String getIdentifier() {
+            "dbcp"
+        }
+
+        @Override
+        Connection getConnection() throws ProcessException {
+            try {
+                Class.forName("org.apache.derby.jdbc.EmbeddedDriver")
+                DriverManager.getConnection("jdbc:derby:${DB_LOCATION};create=true")
+            } catch (e) {
+                throw new ProcessException("getConnection failed: " + e, e) // pass the cause so its stack trace is not lost
+            }
+        }
+    }
+}
\ No newline at end of file