Posted to commits@nifi.apache.org by pv...@apache.org on 2018/10/06 08:53:39 UTC
[1/4] nifi git commit: NIFI-4517: Added ExecuteSQLRecord and QueryDatabaseTableRecord processors
Repository: nifi
Updated Branches:
refs/heads/master b4810b8dd -> c6572f042
http://git-wip-us.apache.org/repos/asf/nifi/blob/c6572f04/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java
new file mode 100644
index 0000000..04c4c00
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java
@@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.dbcp.DBCPService;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.flowfile.attributes.FragmentAttributes;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.standard.util.TestJdbcHugeStream;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.record.MockRecordWriter;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestExecuteSQLRecord {
+
+ private static final Logger LOGGER;
+
+ static {
+ System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info");
+ System.setProperty("org.slf4j.simpleLogger.showDateTime", "true");
+ System.setProperty("org.slf4j.simpleLogger.log.nifi.io.nio", "debug");
+ System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard.ExecuteSQLRecord", "debug");
+ System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard.TestExecuteSQLRecord", "debug");
+ LOGGER = LoggerFactory.getLogger(TestExecuteSQLRecord.class);
+ }
+
+ final static String DB_LOCATION = "target/db";
+
+ final static String QUERY_WITH_EL = "select "
+ + " PER.ID as PersonId, PER.NAME as PersonName, PER.CODE as PersonCode"
+ + ", PRD.ID as ProductId,PRD.NAME as ProductName,PRD.CODE as ProductCode"
+ + ", REL.ID as RelId, REL.NAME as RelName, REL.CODE as RelCode"
+ + ", ROW_NUMBER() OVER () as rownr "
+ + " from persons PER, products PRD, relationships REL"
+ + " where PER.ID = ${person.id}";
+
+ final static String QUERY_WITHOUT_EL = "select "
+ + " PER.ID as PersonId, PER.NAME as PersonName, PER.CODE as PersonCode"
+ + ", PRD.ID as ProductId,PRD.NAME as ProductName,PRD.CODE as ProductCode"
+ + ", REL.ID as RelId, REL.NAME as RelName, REL.CODE as RelCode"
+ + ", ROW_NUMBER() OVER () as rownr "
+ + " from persons PER, products PRD, relationships REL"
+ + " where PER.ID = 10";
+
+ final static String QUERY_WITHOUT_EL_WITH_PARAMS = "select "
+ + " PER.ID as PersonId, PER.NAME as PersonName, PER.CODE as PersonCode"
+ + ", PRD.ID as ProductId,PRD.NAME as ProductName,PRD.CODE as ProductCode"
+ + ", REL.ID as RelId, REL.NAME as RelName, REL.CODE as RelCode"
+ + ", ROW_NUMBER() OVER () as rownr "
+ + " from persons PER, products PRD, relationships REL"
+ + " where PER.ID < ? AND REL.ID < ?";
+
+
+ @BeforeClass
+ public static void setupClass() {
+ System.setProperty("derby.stream.error.file", "target/derby.log");
+ }
+
+ private TestRunner runner;
+
+ @Before
+ public void setup() throws InitializationException {
+ final DBCPService dbcp = new DBCPServiceSimpleImpl();
+ final Map<String, String> dbcpProperties = new HashMap<>();
+
+ runner = TestRunners.newTestRunner(ExecuteSQLRecord.class);
+ runner.addControllerService("dbcp", dbcp, dbcpProperties);
+ runner.enableControllerService(dbcp);
+ runner.setProperty(AbstractExecuteSQL.DBCP_SERVICE, "dbcp");
+ }
+
+ @Test
+ public void testIncomingConnectionWithNoFlowFile() throws InitializationException {
+ runner.setIncomingConnection(true);
+ runner.setProperty(AbstractExecuteSQL.SQL_SELECT_QUERY, "SELECT * FROM persons");
+ MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1);
+ runner.addControllerService("writer", recordWriter);
+ runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer");
+ runner.enableControllerService(recordWriter);
+ runner.run();
+ runner.assertTransferCount(AbstractExecuteSQL.REL_SUCCESS, 0);
+ runner.assertTransferCount(AbstractExecuteSQL.REL_FAILURE, 0);
+ }
+
+ @Test
+ public void testIncomingConnectionWithNoFlowFileAndNoQuery() throws InitializationException {
+ runner.setIncomingConnection(true);
+ MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1);
+ runner.addControllerService("writer", recordWriter);
+ runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer");
+ runner.enableControllerService(recordWriter);
+ runner.run();
+ runner.assertTransferCount(AbstractExecuteSQL.REL_SUCCESS, 0);
+ runner.assertTransferCount(AbstractExecuteSQL.REL_FAILURE, 0);
+ }
+
+ @Test(expected = AssertionError.class)
+ public void testNoIncomingConnectionAndNoQuery() throws InitializationException {
+ runner.setIncomingConnection(false);
+ runner.run();
+ }
+
+ @Test
+ public void testNoIncomingConnection() throws ClassNotFoundException, SQLException, InitializationException, IOException {
+ runner.setIncomingConnection(false);
+ invokeOnTriggerRecords(null, QUERY_WITHOUT_EL, false, null, true);
+ assertEquals(ProvenanceEventType.RECEIVE, runner.getProvenanceEvents().get(0).getEventType());
+ }
+
+ @Test
+ public void testSelectQueryInFlowFile() throws InitializationException, ClassNotFoundException, SQLException, IOException {
+ invokeOnTriggerRecords(null, QUERY_WITHOUT_EL, true, null, false);
+ assertEquals(ProvenanceEventType.FORK, runner.getProvenanceEvents().get(0).getEventType());
+ assertEquals(ProvenanceEventType.FETCH, runner.getProvenanceEvents().get(1).getEventType());
+ }
+
+ @Test
+ public void testMaxRowsPerFlowFile() throws Exception {
+ // remove previous test database, if any
+ final File dbLocation = new File(DB_LOCATION);
+ dbLocation.delete();
+
+ // load test data to database
+ final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+ Statement stmt = con.createStatement();
+
+ try {
+ stmt.execute("drop table TEST_NULL_INT");
+ } catch (final SQLException sqle) {
+ // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+ }
+
+ stmt.execute("create table TEST_NULL_INT (id integer not null, val1 integer, val2 integer, constraint my_pk primary key (id))");
+
+ for (int i = 0; i < 1000; i++) {
+ stmt.execute("insert into TEST_NULL_INT (id, val1, val2) VALUES (" + i + ", 1, 1)");
+ }
+
+ runner.setIncomingConnection(false);
+ runner.setProperty(AbstractExecuteSQL.MAX_ROWS_PER_FLOW_FILE, "5");
+ runner.setProperty(AbstractExecuteSQL.OUTPUT_BATCH_SIZE, "0");
+ runner.setProperty(AbstractExecuteSQL.SQL_SELECT_QUERY, "SELECT * FROM TEST_NULL_INT");
+ MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1);
+ runner.addControllerService("writer", recordWriter);
+ runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer");
+ runner.enableControllerService(recordWriter);
+ runner.run();
+
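+ // 1000 rows at 5 rows per FlowFile should yield 200 FlowFiles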
+ runner.assertAllFlowFilesTransferred(AbstractExecuteSQL.REL_SUCCESS, 200);
+ runner.assertTransferCount(AbstractExecuteSQL.REL_FAILURE, 0);
+ runner.assertAllFlowFilesContainAttribute(AbstractExecuteSQL.REL_SUCCESS, FragmentAttributes.FRAGMENT_INDEX.key());
+ runner.assertAllFlowFilesContainAttribute(AbstractExecuteSQL.REL_SUCCESS, FragmentAttributes.FRAGMENT_ID.key());
+ runner.assertAllFlowFilesContainAttribute(AbstractExecuteSQL.REL_SUCCESS, FragmentAttributes.FRAGMENT_COUNT.key());
+
+ MockFlowFile firstFlowFile = runner.getFlowFilesForRelationship(AbstractExecuteSQL.REL_SUCCESS).get(0);
+
+ firstFlowFile.assertAttributeEquals(AbstractExecuteSQL.RESULT_ROW_COUNT, "5");
+ firstFlowFile.assertAttributeEquals("record.count", "5");
+ firstFlowFile.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "text/plain"); // MockRecordWriter has text/plain MIME type
+ firstFlowFile.assertAttributeEquals(FragmentAttributes.FRAGMENT_INDEX.key(), "0");
+ firstFlowFile.assertAttributeEquals(AbstractExecuteSQL.RESULTSET_INDEX, "0");
+
+ MockFlowFile lastFlowFile = runner.getFlowFilesForRelationship(AbstractExecuteSQL.REL_SUCCESS).get(199);
+
+ lastFlowFile.assertAttributeEquals(AbstractExecuteSQL.RESULT_ROW_COUNT, "5");
+ lastFlowFile.assertAttributeEquals("record.count", "5");
+ lastFlowFile.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "text/plain"); // MockRecordWriter has text/plain MIME type
+ lastFlowFile.assertAttributeEquals(FragmentAttributes.FRAGMENT_INDEX.key(), "199");
+ lastFlowFile.assertAttributeEquals(AbstractExecuteSQL.RESULTSET_INDEX, "0");
+ }
+
+ @Test
+ public void testInsertStatementCreatesFlowFile() throws Exception {
+ // remove previous test database, if any
+ final File dbLocation = new File(DB_LOCATION);
+ dbLocation.delete();
+
+ // load test data to database
+ final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+ Statement stmt = con.createStatement();
+
+ try {
+ stmt.execute("drop table TEST_NULL_INT");
+ } catch (final SQLException sqle) {
+ // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+ }
+
+ stmt.execute("create table TEST_NULL_INT (id integer not null, val1 integer, val2 integer, constraint my_pk primary key (id))");
+
+ runner.setIncomingConnection(false);
+ runner.setProperty(AbstractExecuteSQL.SQL_SELECT_QUERY, "insert into TEST_NULL_INT (id, val1, val2) VALUES (0, NULL, 1)");
+ MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1);
+ runner.addControllerService("writer", recordWriter);
+ runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer");
+ runner.enableControllerService(recordWriter);
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(AbstractExecuteSQL.REL_SUCCESS, 1);
+ runner.getFlowFilesForRelationship(AbstractExecuteSQL.REL_SUCCESS).get(0).assertAttributeEquals(AbstractExecuteSQL.RESULT_ROW_COUNT, "0");
+ }
+
+ @Test
+ public void testNoRowsStatementCreatesEmptyFlowFile() throws Exception {
+ // remove previous test database, if any
+ final File dbLocation = new File(DB_LOCATION);
+ dbLocation.delete();
+
+ // load test data to database
+ final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+ Statement stmt = con.createStatement();
+
+ try {
+ stmt.execute("drop table TEST_NULL_INT");
+ } catch (final SQLException sqle) {
+ // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+ }
+
+ stmt.execute("create table TEST_NULL_INT (id integer not null, val1 integer, val2 integer, constraint my_pk primary key (id))");
+
+ runner.setIncomingConnection(true);
+ runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "select * from TEST_NULL_INT");
+ MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1);
+ runner.addControllerService("writer", recordWriter);
+ runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer");
+ runner.enableControllerService(recordWriter);
+ runner.enqueue("Hello".getBytes());
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 1);
+ MockFlowFile firstFlowFile = runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS).get(0);
+ firstFlowFile.assertAttributeEquals(ExecuteSQL.RESULT_ROW_COUNT, "0");
+ firstFlowFile.assertContentEquals("");
+ }
+
+ @Test
+ public void testWithSqlException() throws Exception {
+ // remove previous test database, if any
+ final File dbLocation = new File(DB_LOCATION);
+ dbLocation.delete();
+
+ // load test data to database
+ final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+ Statement stmt = con.createStatement();
+
+ try {
+ stmt.execute("drop table TEST_NO_ROWS");
+ } catch (final SQLException sqle) {
+ // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+ }
+
+ stmt.execute("create table TEST_NO_ROWS (id integer)");
+
+ runner.setIncomingConnection(false);
+ // Try a syntactically valid SQL statement that will generate an error (the column val1 does not exist in TEST_NO_ROWS)
+ runner.setProperty(AbstractExecuteSQL.SQL_SELECT_QUERY, "SELECT val1 FROM TEST_NO_ROWS");
+ MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1);
+ runner.addControllerService("writer", recordWriter);
+ runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer");
+ runner.enableControllerService(recordWriter);
+ runner.run();
+
+ // With no incoming FlowFile containing a query and an exception being thrown, no outbound FlowFile is created.
+ // There should be no FlowFiles on either relationship
+ runner.assertAllFlowFilesTransferred(AbstractExecuteSQL.REL_FAILURE, 0);
+ runner.assertAllFlowFilesTransferred(AbstractExecuteSQL.REL_SUCCESS, 0);
+ }
+
+ public void invokeOnTriggerRecords(final Integer queryTimeout, final String query, final boolean incomingFlowFile, final Map<String, String> attrs, final boolean setQueryProperty)
+ throws InitializationException, ClassNotFoundException, SQLException, IOException {
+
+ if (queryTimeout != null) {
+ runner.setProperty(AbstractExecuteSQL.QUERY_TIMEOUT, queryTimeout.toString() + " secs");
+ }
+
+ // remove previous test database, if any
+ final File dbLocation = new File(DB_LOCATION);
+ dbLocation.delete();
+
+ // load test data to database
+ final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+ TestJdbcHugeStream.loadTestData2Database(con, 100, 200, 100);
+ LOGGER.info("test data loaded");
+
+ // ResultSet size will be 1 x 200 x 100 = 20,000 rows
+ // because of the WHERE clause PER.ID = ${person.id}
+ final int nrOfRows = 20000;
+
+ MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1);
+ runner.addControllerService("writer", recordWriter);
+ runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer");
+ runner.enableControllerService(recordWriter);
+
+ if (incomingFlowFile) {
+ // incoming FlowFile content is not used, but attributes are used
+ final Map<String, String> attributes = (attrs == null) ? new HashMap<>() : attrs;
+ attributes.put("person.id", "10");
+ if (!setQueryProperty) {
+ runner.enqueue(query.getBytes(), attributes);
+ } else {
+ runner.enqueue("Hello".getBytes(), attributes);
+ }
+ }
+
+ if (setQueryProperty) {
+ runner.setProperty(AbstractExecuteSQL.SQL_SELECT_QUERY, query);
+ }
+
+ runner.run();
+ runner.assertAllFlowFilesTransferred(AbstractExecuteSQL.REL_SUCCESS, 1);
+ runner.assertAllFlowFilesContainAttribute(AbstractExecuteSQL.REL_SUCCESS, AbstractExecuteSQL.RESULT_QUERY_DURATION);
+ runner.assertAllFlowFilesContainAttribute(AbstractExecuteSQL.REL_SUCCESS, AbstractExecuteSQL.RESULT_QUERY_EXECUTION_TIME);
+ runner.assertAllFlowFilesContainAttribute(AbstractExecuteSQL.REL_SUCCESS, AbstractExecuteSQL.RESULT_QUERY_FETCH_TIME);
+ runner.assertAllFlowFilesContainAttribute(AbstractExecuteSQL.REL_SUCCESS, AbstractExecuteSQL.RESULT_ROW_COUNT);
+
+ final List<MockFlowFile> flowfiles = runner.getFlowFilesForRelationship(AbstractExecuteSQL.REL_SUCCESS);
+ final long executionTime = Long.parseLong(flowfiles.get(0).getAttribute(AbstractExecuteSQL.RESULT_QUERY_EXECUTION_TIME));
+ final long fetchTime = Long.parseLong(flowfiles.get(0).getAttribute(AbstractExecuteSQL.RESULT_QUERY_FETCH_TIME));
+ final long durationTime = Long.parseLong(flowfiles.get(0).getAttribute(AbstractExecuteSQL.RESULT_QUERY_DURATION));
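+ // The reported query duration should equal the execution time plus the fetch time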
+ assertEquals(durationTime, fetchTime + executionTime);
+ }
+
+
+ /**
+ * Simple implementation only for ExecuteSQLRecord processor testing.
+ */
+ class DBCPServiceSimpleImpl extends AbstractControllerService implements DBCPService {
+
+ @Override
+ public String getIdentifier() {
+ return "dbcp";
+ }
+
+ @Override
+ public Connection getConnection() throws ProcessException {
+ try {
+ Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
+ final Connection con = DriverManager.getConnection("jdbc:derby:" + DB_LOCATION + ";create=true");
+ return con;
+ } catch (final Exception e) {
+ throw new ProcessException("getConnection failed: " + e);
+ }
+ }
+ }
+
+}
[2/4] nifi git commit: NIFI-4517: Added ExecuteSQLRecord and QueryDatabaseTableRecord processors
Posted by pv...@apache.org.
http://git-wip-us.apache.org/repos/asf/nifi/blob/c6572f04/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecordTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecordTest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecordTest.java
new file mode 100644
index 0000000..a1d67c0
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecordTest.java
@@ -0,0 +1,1332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateManager;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.dbcp.DBCPService;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.standard.db.DatabaseAdapter;
+import org.apache.nifi.processors.standard.db.impl.GenericDatabaseAdapter;
+import org.apache.nifi.processors.standard.db.impl.MSSQLDatabaseAdapter;
+import org.apache.nifi.processors.standard.db.impl.MySQLDatabaseAdapter;
+import org.apache.nifi.processors.standard.db.impl.OracleDatabaseAdapter;
+import org.apache.nifi.processors.standard.db.impl.PhoenixDatabaseAdapter;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.record.MockRecordWriter;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.apache.nifi.util.file.FileUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.SQLNonTransientConnectionException;
+import java.sql.Statement;
+import java.sql.Types;
+import java.text.SimpleDateFormat;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TimeZone;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit tests for the QueryDatabaseTableRecord processor
+ */
+public class QueryDatabaseTableRecordTest {
+
+ MockQueryDatabaseTableRecord processor;
+ private TestRunner runner;
+ private final static String DB_LOCATION = "target/db_qdt";
+ private DatabaseAdapter dbAdapter;
+ private HashMap<String, DatabaseAdapter> origDbAdapters;
+ private final static String TABLE_NAME_KEY = "tableName";
+ private final static String MAX_ROWS_KEY = "maxRows";
+
+
+ @BeforeClass
+ public static void setupBeforeClass() {
+ System.setProperty("derby.stream.error.file", "target/derby.log");
+
+ // remove previous test database, if any
+ final File dbLocation = new File(DB_LOCATION);
+ try {
+ FileUtils.deleteFile(dbLocation, true);
+ } catch (IOException ioe) {
+ // Do nothing, may not have existed
+ }
+ }
+
+ @AfterClass
+ public static void cleanUpAfterClass() throws Exception {
+ try {
+ DriverManager.getConnection("jdbc:derby:" + DB_LOCATION + ";shutdown=true");
+ } catch (SQLNonTransientConnectionException e) {
+ // Do nothing, this is what happens at Derby shutdown
+ }
+ // remove previous test database, if any
+ final File dbLocation = new File(DB_LOCATION);
+ try {
+ FileUtils.deleteFile(dbLocation, true);
+ } catch (IOException ioe) {
+ // Do nothing, may not have existed
+ }
+ }
+
+
+ @Before
+ public void setup() throws InitializationException, IOException {
+ final DBCPService dbcp = new DBCPServiceSimpleImpl();
+ final Map<String, String> dbcpProperties = new HashMap<>();
+ origDbAdapters = new HashMap<>(QueryDatabaseTableRecord.dbAdapters);
+ dbAdapter = new GenericDatabaseAdapter();
+ QueryDatabaseTableRecord.dbAdapters.put(dbAdapter.getName(), dbAdapter);
+ processor = new MockQueryDatabaseTableRecord();
+ runner = TestRunners.newTestRunner(processor);
+ runner.addControllerService("dbcp", dbcp, dbcpProperties);
+ runner.enableControllerService(dbcp);
+ runner.setProperty(QueryDatabaseTableRecord.DBCP_SERVICE, "dbcp");
+ runner.setProperty(QueryDatabaseTableRecord.DB_TYPE, dbAdapter.getName());
+ runner.getStateManager().clear(Scope.CLUSTER);
+ MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1);
+ runner.addControllerService("writer", recordWriter);
+ runner.setProperty(QueryDatabaseTableRecord.RECORD_WRITER_FACTORY, "writer");
+ runner.enableControllerService(recordWriter);
+ }
+
+ @After
+ public void teardown() throws IOException {
+ runner.getStateManager().clear(Scope.CLUSTER);
+ runner = null;
+ QueryDatabaseTableRecord.dbAdapters.clear();
+ QueryDatabaseTableRecord.dbAdapters.putAll(origDbAdapters);
+ }
+
+ @Test
+ public void testGetQuery() throws Exception {
+ String query = processor.getQuery(dbAdapter, "myTable", null, null, null, null);
+ assertEquals("SELECT * FROM myTable", query);
+ query = processor.getQuery(dbAdapter, "myTable", "col1,col2", null, null, null);
+ assertEquals("SELECT col1,col2 FROM myTable", query);
+
+ query = processor.getQuery(dbAdapter, "myTable", null, Collections.singletonList("id"), null, null);
+ assertEquals("SELECT * FROM myTable", query);
+
+ Map<String, String> maxValues = new HashMap<>();
+ maxValues.put("id", "509");
+ StateManager stateManager = runner.getStateManager();
+ stateManager.setState(maxValues, Scope.CLUSTER);
+ processor.putColumnType(AbstractDatabaseFetchProcessor.getStateKey("mytable", "id", dbAdapter), Types.INTEGER);
+ query = processor.getQuery(dbAdapter, "myTable", null, Collections.singletonList("id"), null, stateManager.getState(Scope.CLUSTER).toMap());
+ assertEquals("SELECT * FROM myTable WHERE id > 509", query);
+
+ maxValues.put("date_created", "2016-03-07 12:34:56");
+ stateManager.setState(maxValues, Scope.CLUSTER);
+ processor.putColumnType(AbstractDatabaseFetchProcessor.getStateKey("mytable", "date_created", dbAdapter), Types.TIMESTAMP);
+ query = processor.getQuery(dbAdapter, "myTable", null, Arrays.asList("id", "DATE_CREATED"), null, stateManager.getState(Scope.CLUSTER).toMap());
+ assertEquals("SELECT * FROM myTable WHERE id > 509 AND DATE_CREATED >= '2016-03-07 12:34:56'", query);
+
+ // Double quotes can be used to escape column and table names with most ANSI compatible database engines.
+ maxValues.put("mytable@!@date-created", "2016-03-07 12:34:56");
+ stateManager.setState(maxValues, Scope.CLUSTER);
+ processor.putColumnType(AbstractDatabaseFetchProcessor.getStateKey("\"myTable\"", "\"DATE-CREATED\"", dbAdapter), Types.TIMESTAMP);
+ query = processor.getQuery(dbAdapter, "\"myTable\"", null, Arrays.asList("id", "\"DATE-CREATED\""), null, stateManager.getState(Scope.CLUSTER).toMap());
+ assertEquals("SELECT * FROM \"myTable\" WHERE id > 509 AND \"DATE-CREATED\" >= '2016-03-07 12:34:56'", query);
+
+ // Back-ticks can be used to escape MySQL column and table names.
+ dbAdapter = new MySQLDatabaseAdapter();
+ processor.putColumnType(AbstractDatabaseFetchProcessor.getStateKey("`myTable`", "`DATE-CREATED`", dbAdapter), Types.TIMESTAMP);
+ query = processor.getQuery(dbAdapter, "`myTable`", null, Arrays.asList("id", "`DATE-CREATED`"), null, stateManager.getState(Scope.CLUSTER).toMap());
+ assertEquals("SELECT * FROM `myTable` WHERE id > 509 AND `DATE-CREATED` >= '2016-03-07 12:34:56'", query);
+
+ // Square brackets can be used to escape Microsoft SQL Server column and table names.
+ dbAdapter = new MSSQLDatabaseAdapter();
+ processor.putColumnType(AbstractDatabaseFetchProcessor.getStateKey("[myTable]", "[DATE-CREATED]", dbAdapter), Types.TIMESTAMP);
+ query = processor.getQuery(dbAdapter, "[myTable]", null, Arrays.asList("id", "[DATE-CREATED]"), null, stateManager.getState(Scope.CLUSTER).toMap());
+ assertEquals("SELECT * FROM [myTable] WHERE id > 509 AND [DATE-CREATED] >= '2016-03-07 12:34:56'", query);
+
+ // Test Oracle strategy
+ dbAdapter = new OracleDatabaseAdapter();
+ query = processor.getQuery(dbAdapter, "myTable", null, Arrays.asList("id", "DATE_CREATED"), "type = \"CUSTOMER\"", stateManager.getState(Scope.CLUSTER).toMap());
+ assertEquals("SELECT * FROM myTable WHERE id > 509 AND DATE_CREATED >= timestamp '2016-03-07 12:34:56' AND (type = \"CUSTOMER\")", query);
+
+ // Test a TIME-type max-value column.
+ processor.putColumnType("mytable" + AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER + "time_created", Types.TIME);
+ maxValues.clear();
+ maxValues.put("id", "509");
+ maxValues.put("time_created", "12:34:57");
+ maxValues.put("date_created", "2016-03-07 12:34:56");
+ stateManager = runner.getStateManager();
+ stateManager.clear(Scope.CLUSTER);
+ stateManager.setState(maxValues, Scope.CLUSTER);
+ query = processor.getQuery(dbAdapter, "myTable", null, Arrays.asList("id", "DATE_CREATED", "TIME_CREATED"), "type = \"CUSTOMER\"", stateManager.getState(Scope.CLUSTER).toMap());
+ assertEquals("SELECT * FROM myTable WHERE id > 509 AND DATE_CREATED >= timestamp '2016-03-07 12:34:56' AND TIME_CREATED >= timestamp '12:34:57' AND (type = \"CUSTOMER\")", query);
+ dbAdapter = new GenericDatabaseAdapter();
+ query = processor.getQuery(dbAdapter, "myTable", null, Arrays.asList("id", "DATE_CREATED", "TIME_CREATED"), "type = \"CUSTOMER\"", stateManager.getState(Scope.CLUSTER).toMap());
+ assertEquals("SELECT * FROM myTable WHERE id > 509 AND DATE_CREATED >= '2016-03-07 12:34:56' AND TIME_CREATED >= '12:34:57' AND (type = \"CUSTOMER\")", query);
+ }
+
+ @Test
+ public void testGetQueryUsingPhoenixAdapter() throws Exception {
+ Map<String, String> maxValues = new HashMap<>();
+ StateManager stateManager = runner.getStateManager();
+ processor.putColumnType("mytable" + AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER + "id", Types.INTEGER);
+ processor.putColumnType("mytable" + AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER + "time_created", Types.TIME);
+ processor.putColumnType("mytable" + AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER + "date_created", Types.TIMESTAMP);
+
+ maxValues.put("id", "509");
+ maxValues.put("time_created", "12:34:57");
+ maxValues.put("date_created", "2016-03-07 12:34:56");
+ stateManager.setState(maxValues, Scope.CLUSTER);
+
+ dbAdapter = new PhoenixDatabaseAdapter();
+ String query = processor.getQuery(dbAdapter, "myTable", null, Arrays.asList("id", "DATE_CREATED", "TIME_CREATED"), "type = \"CUSTOMER\"", stateManager.getState(Scope.CLUSTER).toMap());
+ assertEquals("SELECT * FROM myTable WHERE id > 509 AND DATE_CREATED >= timestamp '2016-03-07 12:34:56' AND TIME_CREATED >= time '12:34:57' AND (type = \"CUSTOMER\")", query);
+ // Cover the other path
+ dbAdapter = new GenericDatabaseAdapter();
+ query = processor.getQuery(dbAdapter, "myTable", null, Arrays.asList("id", "DATE_CREATED", "TIME_CREATED"), "type = \"CUSTOMER\"", stateManager.getState(Scope.CLUSTER).toMap());
+ assertEquals("SELECT * FROM myTable WHERE id > 509 AND DATE_CREATED >= '2016-03-07 12:34:56' AND TIME_CREATED >= '12:34:57' AND (type = \"CUSTOMER\")", query);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testGetQueryNoTable() {
+ processor.getQuery(dbAdapter, null, null, null, null, null);
+ }
+
+ @Test
+ public void testAddedRows() throws SQLException, IOException {
+
+ // load test data to database
+ final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+ Statement stmt = con.createStatement();
+
+ try {
+ stmt.execute("drop table TEST_QUERY_DB_TABLE");
+ } catch (final SQLException sqle) {
+ // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+ }
+
+ stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, name varchar(100), scale float, created_on timestamp, bignum bigint default 0)");
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (0, 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')");
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (1, 'Carrie Jones', 5.0, '2000-01-01 03:23:34.234')");
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (2, NULL, 2.0, '2010-01-01 00:00:00')");
+
+ runner.setProperty(QueryDatabaseTableRecord.TABLE_NAME, "TEST_QUERY_DB_TABLE");
+ runner.setIncomingConnection(false);
+ runner.setProperty(QueryDatabaseTableRecord.MAX_VALUE_COLUMN_NAMES, "ID");
+ runner.setProperty(QueryDatabaseTableRecord.MAX_ROWS_PER_FLOW_FILE, "2");
+
+ runner.run();
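+ // 3 rows at 2 rows per FlowFile should yield 2 FlowFiles (2 rows + 1 row)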
+ runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 2);
+
+ MockFlowFile flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+ assertEquals("TEST_QUERY_DB_TABLE", flowFile.getAttribute(QueryDatabaseTableRecord.RESULT_TABLENAME));
+ assertEquals(flowFile.getAttribute("maxvalue.id"), "2");
+ runner.setProperty(QueryDatabaseTableRecord.FETCH_SIZE, "2");
+ flowFile.assertAttributeEquals("record.count", "2");
+
+ flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(1);
+ assertEquals(flowFile.getAttribute("maxvalue.id"), "2");
+ flowFile.assertAttributeEquals("record.count", "1");
+ runner.clearTransferState();
+
+ // Run again, this time no flowfiles/rows should be transferred
+ runner.run();
+ runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 0);
+ runner.clearTransferState();
+
+ //Remove Max Rows Per Flow File
+ runner.setProperty(QueryDatabaseTableRecord.MAX_ROWS_PER_FLOW_FILE, "0");
+
+ // Add a new row with a higher ID and run, one flowfile with one new row should be transferred
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (3, 'Mary West', 15.0, '2000-01-01 03:23:34.234')");
+ runner.run();
+ runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
+ flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+ assertEquals(flowFile.getAttribute("maxvalue.id"), "3");
+ flowFile.assertAttributeEquals("record.count", "1");
+
+ // Sanity check - run again, this time no flowfiles/rows should be transferred
+ runner.clearTransferState();
+ runner.run();
+ runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 0);
+ runner.clearTransferState();
+
+ // Add timestamp as a max value column name
+ runner.setProperty(QueryDatabaseTableRecord.MAX_VALUE_COLUMN_NAMES, "id, created_on");
+
+ // Add a new row with a higher ID and run, one flow file will be transferred because no max value for the timestamp has been stored
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (4, 'Marty Johnson', 15.0, '2011-01-01 03:23:34.234')");
+ runner.run();
+ runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
+ flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+ assertEquals(flowFile.getAttribute("maxvalue.id"), "4");
+ assertEquals(flowFile.getAttribute("maxvalue.created_on"), "2011-01-01 03:23:34.234");
+ flowFile.assertAttributeEquals("record.count", "1");
+ runner.clearTransferState();
+
+ // Add a new row with a higher ID but lower timestamp and run, no flow file will be transferred
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (5, 'NO NAME', 15.0, '2001-01-01 03:23:34.234')");
+ runner.run();
+ runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 0);
+ runner.clearTransferState();
+
+ // Add a new row with a higher ID and run, one flow file will be transferred
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (6, 'Mr. NiFi', 1.0, '2012-01-01 03:23:34.234')");
+ runner.run();
+ runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
+ flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+ flowFile.assertAttributeEquals("record.count", "1");
+ runner.clearTransferState();
+
+ // Set name as the max value column name (and clear the state), all rows should be returned since the max value for name has not been set
+ runner.getStateManager().clear(Scope.CLUSTER);
+ runner.setProperty(QueryDatabaseTableRecord.MAX_VALUE_COLUMN_NAMES, "name");
+ runner.run();
+ runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
+ flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+ flowFile.assertAttributeEquals("record.count", "7");
+ runner.clearTransferState();
+
+ // Add a new row with a "higher" name than the max but lower than "NULL" (to test that null values are skipped), one flow file will be transferred
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (7, 'NULK', 1.0, '2012-01-01 03:23:34.234')");
+ runner.run();
+ runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
+ flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+ flowFile.assertAttributeEquals("record.count", "1");
+ runner.clearTransferState();
+
+ // Set scale as the max value column name (and clear the state), all rows should be returned since the max value for scale has not been set
+ runner.getStateManager().clear(Scope.CLUSTER);
+ runner.setProperty(QueryDatabaseTableRecord.MAX_VALUE_COLUMN_NAMES, "scale");
+ runner.run();
+ runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
+ flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+ flowFile.assertAttributeEquals("record.count", "8");
+ runner.clearTransferState();
+
+ // Add a new row with a higher value for scale than the max, one flow file will be transferred
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (8, 'NULK', 100.0, '2012-01-01 03:23:34.234')");
+ runner.run();
+ runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
+ flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+ flowFile.assertAttributeEquals("record.count", "1");
+ runner.clearTransferState();
+
+ // Set bignum as the max value column name (and clear the state), all rows should be returned since the max value for bignum has not been set
+ runner.getStateManager().clear(Scope.CLUSTER);
+ runner.setProperty(QueryDatabaseTableRecord.MAX_VALUE_COLUMN_NAMES, "bignum");
+ runner.run();
+ runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
+ flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+ flowFile.assertAttributeEquals("record.count", "9");
+ runner.clearTransferState();
+
+ // Add a new row with a higher value for bignum than the max, one flow file will be transferred
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on, bignum) VALUES (9, 'Alice Bob', 100.0, '2012-01-01 03:23:34.234', 1)");
+ runner.run();
+ runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
+ flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+ flowFile.assertAttributeEquals("record.count", "1");
+ runner.clearTransferState();
+ }
+
+ @Test
+ public void testAddedRowsTwoTables() throws SQLException {
+
+ // load test data to database
+ final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+ Statement stmt = con.createStatement();
+
+ try {
+ stmt.execute("drop table TEST_QUERY_DB_TABLE");
+ } catch (final SQLException sqle) {
+ // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+ }
+
+ try {
+ stmt.execute("drop table TEST_QUERY_DB_TABLE2");
+ } catch (final SQLException sqle) {
+ // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+ }
+
+ stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, name varchar(100), scale float, created_on timestamp, bignum bigint default 0)");
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (0, 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')");
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (1, 'Carrie Jones', 5.0, '2000-01-01 03:23:34.234')");
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (2, NULL, 2.0, '2010-01-01 00:00:00')");
+
+ runner.setProperty(QueryDatabaseTableRecord.TABLE_NAME, "TEST_QUERY_DB_TABLE");
+ runner.setIncomingConnection(false);
+ runner.setProperty(QueryDatabaseTableRecord.MAX_VALUE_COLUMN_NAMES, "ID");
+ runner.setProperty(QueryDatabaseTableRecord.MAX_ROWS_PER_FLOW_FILE, "2");
+
+ runner.run();
+ runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 2);
+
+ MockFlowFile flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+ assertEquals("TEST_QUERY_DB_TABLE", flowFile.getAttribute(QueryDatabaseTableRecord.RESULT_TABLENAME));
+ assertEquals(flowFile.getAttribute("maxvalue.id"), "2");
+ flowFile.assertAttributeEquals("record.count", "2");
+ flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(1);
+ assertEquals(flowFile.getAttribute("maxvalue.id"), "2");
+ flowFile.assertAttributeEquals("record.count", "1");
+ runner.clearTransferState();
+
+ // Populate a second table and set it as the target table name
+ stmt.execute("create table TEST_QUERY_DB_TABLE2 (id integer not null, name varchar(100), scale float, created_on timestamp, bignum bigint default 0)");
+ stmt.execute("insert into TEST_QUERY_DB_TABLE2 (id, name, scale, created_on) VALUES (0, 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')");
+ stmt.execute("insert into TEST_QUERY_DB_TABLE2 (id, name, scale, created_on) VALUES (1, 'Carrie Jones', 5.0, '2000-01-01 03:23:34.234')");
+ stmt.execute("insert into TEST_QUERY_DB_TABLE2 (id, name, scale, created_on) VALUES (2, NULL, 2.0, '2010-01-01 00:00:00')");
+
+ runner.setProperty(QueryDatabaseTableRecord.TABLE_NAME, "TEST_QUERY_DB_TABLE2");
+ runner.setProperty(QueryDatabaseTableRecord.MAX_ROWS_PER_FLOW_FILE, "0");
+ runner.run();
+ runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
+
+ flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+ assertEquals("TEST_QUERY_DB_TABLE2", flowFile.getAttribute(QueryDatabaseTableRecord.RESULT_TABLENAME));
+ assertEquals(flowFile.getAttribute("maxvalue.id"), "2");
+ flowFile.assertAttributeEquals("record.count", "3");
+ runner.clearTransferState();
+
+ // Add a new row with a higher ID and run, one flowfile with one new row should be transferred
+ stmt.execute("insert into TEST_QUERY_DB_TABLE2 (id, name, scale, created_on) VALUES (3, 'Mary West', 15.0, '2000-01-01 03:23:34.234')");
+ runner.run();
+ runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
+ flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+ assertEquals(flowFile.getAttribute("maxvalue.id"), "3");
+ flowFile.assertAttributeEquals("record.count", "1");
+
+ // Sanity check - run again, this time no flowfiles/rows should be transferred
+ runner.clearTransferState();
+ runner.run();
+ runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 0);
+ runner.clearTransferState();
+ }
+
+ @Test
+ public void testMultiplePartitions() throws SQLException {
+
+ // load test data to database
+ final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+ Statement stmt = con.createStatement();
+
+ try {
+ stmt.execute("drop table TEST_QUERY_DB_TABLE");
+ } catch (final SQLException sqle) {
+ // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+ }
+
+ stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, bucket integer not null)");
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (0, 0)");
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (1, 0)");
+
+ runner.setProperty(QueryDatabaseTableRecord.TABLE_NAME, "TEST_QUERY_DB_TABLE");
+ runner.setIncomingConnection(false);
+ runner.setProperty(QueryDatabaseTableRecord.MAX_VALUE_COLUMN_NAMES, "ID, BUCKET");
+
+ runner.run();
+ runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
+ assertEquals("2",
+ runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0).getAttribute(QueryDatabaseTableRecord.RESULT_ROW_COUNT)
+ );
+ runner.clearTransferState();
+
+ // Add a new row in the same bucket
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (2, 0)");
+ runner.run();
+ runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
+ assertEquals("1",
+ runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0).getAttribute(QueryDatabaseTableRecord.RESULT_ROW_COUNT)
+ );
+ runner.clearTransferState();
+
+ // Add a new row in a new bucket
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (3, 1)");
+ runner.run();
+ runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
+ assertEquals("1",
+ runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0).getAttribute(QueryDatabaseTableRecord.RESULT_ROW_COUNT)
+ );
+ runner.clearTransferState();
+
+ // Add a new row in an old bucket, it should not be transferred
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (4, 0)");
+ runner.run();
+ runner.assertTransferCount(QueryDatabaseTableRecord.REL_SUCCESS, 0);
+
+ // Add a new row in the second bucket, only the new row should be transferred
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (5, 1)");
+ runner.run();
+ runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
+ assertEquals("1",
+ runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0).getAttribute(QueryDatabaseTableRecord.RESULT_ROW_COUNT)
+ );
+ runner.clearTransferState();
+ }
+
+ @Test
+ public void testTimestampNanos() throws SQLException {
+
+ // load test data to database
+ final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+ Statement stmt = con.createStatement();
+
+ try {
+ stmt.execute("drop table TEST_QUERY_DB_TABLE");
+ } catch (final SQLException sqle) {
+ // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+ }
+
+ stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, name varchar(100), scale float, created_on timestamp, bignum bigint default 0)");
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (1, 'Carrie Jones', 5.0, '2000-01-01 03:23:34.000123456')");
+
+ runner.setProperty(QueryDatabaseTableRecord.TABLE_NAME, "TEST_QUERY_DB_TABLE");
+ runner.setIncomingConnection(false);
+ runner.setProperty(QueryDatabaseTableRecord.MAX_VALUE_COLUMN_NAMES, "created_on");
+
+ runner.run();
+ runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
+ MockFlowFile flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+ flowFile.assertAttributeEquals("record.count", "1");
+ runner.clearTransferState();
+
+ // Run again, this time no flowfiles/rows should be transferred
+ runner.run();
+ runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 0);
+ runner.clearTransferState();
+
+ // Add a new row with a lower timestamp (but same millisecond value), no flow file should be transferred
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (3, 'Mary West', 15.0, '2000-01-01 03:23:34.000')");
+ runner.run();
+ runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 0);
+ runner.clearTransferState();
+
+ // Add a new row with a higher timestamp, one flow file should be transferred
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (3, 'Mary West', 15.0, '2000-01-01 03:23:34.0003')");
+ runner.run();
+ runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
+ flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+ flowFile.assertAttributeEquals("record.count", "1");
+ runner.clearTransferState();
+
+ // Run again, this time no flowfiles/rows should be transferred
+ runner.run();
+ runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 0);
+ runner.clearTransferState();
+ }
+
+ @Test
+ public void testWithNullIntColumn() throws SQLException {
+ // load test data to database
+ final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+ Statement stmt = con.createStatement();
+
+ try {
+ stmt.execute("drop table TEST_NULL_INT");
+ } catch (final SQLException sqle) {
+ // Ignore, usually due to Derby not having DROP TABLE IF EXISTS
+ }
+
+ stmt.execute("create table TEST_NULL_INT (id integer not null, val1 integer, val2 integer, constraint my_pk primary key (id))");
+
+ stmt.execute("insert into TEST_NULL_INT (id, val1, val2) VALUES (0, NULL, 1)");
+ stmt.execute("insert into TEST_NULL_INT (id, val1, val2) VALUES (1, 1, 1)");
+
+ runner.setIncomingConnection(false);
+ runner.setProperty(QueryDatabaseTableRecord.TABLE_NAME, "TEST_NULL_INT");
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
+ runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0).assertAttributeEquals(QueryDatabaseTableRecord.RESULT_ROW_COUNT, "2");
+ }
+
+ @Test
+ public void testWithRuntimeException() throws SQLException {
+ // load test data to database
+ final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+ Statement stmt = con.createStatement();
+
+ try {
+ stmt.execute("drop table TEST_NULL_INT");
+ } catch (final SQLException sqle) {
+ // Ignore, usually due to Derby not having DROP TABLE IF EXISTS
+ }
+
+ stmt.execute("create table TEST_NULL_INT (id integer not null, val1 integer, val2 integer, constraint my_pk primary key (id))");
+
+ stmt.execute("insert into TEST_NULL_INT (id, val1, val2) VALUES (0, NULL, 1)");
+ stmt.execute("insert into TEST_NULL_INT (id, val1, val2) VALUES (1, 1, 1)");
+
+ runner.setIncomingConnection(false);
+ runner.setProperty(QueryDatabaseTableRecord.TABLE_NAME, "TEST_NULL_INT");
+ runner.setProperty(AbstractDatabaseFetchProcessor.MAX_VALUE_COLUMN_NAMES, "id");
+
+ QueryDatabaseTableRecord.dbAdapters.put(dbAdapter.getName(), new GenericDatabaseAdapter() {
+ @Override
+ public String getName() {
+ throw new RuntimeException("test");
+ }
+ });
+ runner.run();
+
+ assertTrue(runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).isEmpty());
+ }
+
+ @Test
+ public void testWithSqlException() throws SQLException {
+ // load test data to database
+ final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+ Statement stmt = con.createStatement();
+
+ try {
+ stmt.execute("drop table TEST_NO_ROWS");
+ } catch (final SQLException sqle) {
+ // Ignore, usually due to Derby not having DROP TABLE IF EXISTS
+ }
+
+ stmt.execute("create table TEST_NO_ROWS (id integer)");
+
+ runner.setIncomingConnection(false);
+ // Try a syntactically valid SQL statement that will generate an error (the column val1 does not exist in TEST_NO_ROWS)
+ runner.setProperty(QueryDatabaseTableRecord.TABLE_NAME, "TEST_NO_ROWS");
+ runner.setProperty(QueryDatabaseTableRecord.COLUMN_NAMES, "val1");
+ runner.run();
+
+ assertTrue(runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).isEmpty());
+ }
+
+ @Test
+ public void testOutputBatchSize() throws SQLException {
+
+ // load test data to database
+ final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+ Statement stmt = con.createStatement();
+ MockFlowFile mff;
+
+ try {
+ stmt.execute("drop table TEST_QUERY_DB_TABLE");
+ } catch (final SQLException sqle) {
+ // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+ }
+
+ stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, name varchar(100), scale float, created_on timestamp, bignum bigint default 0)");
+ int rowCount = 0;
+ // Create larger row set
+ for (int batch = 0; batch < 100; batch++) {
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (" + rowCount + ", 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')");
+ rowCount++;
+ }
+
+ runner.setProperty(QueryDatabaseTableRecord.TABLE_NAME, "TEST_QUERY_DB_TABLE");
+ runner.setIncomingConnection(false);
+ runner.setProperty(QueryDatabaseTableRecord.MAX_VALUE_COLUMN_NAMES, "ID");
+ runner.setProperty(QueryDatabaseTableRecord.MAX_ROWS_PER_FLOW_FILE, "${" + MAX_ROWS_KEY + "}");
+ runner.setVariable(MAX_ROWS_KEY, "7");
+ runner.setProperty(QueryDatabaseTableRecord.OUTPUT_BATCH_SIZE, "${outputBatchSize}");
+ runner.setVariable("outputBatchSize", "4");
+
+ runner.run();
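+ // 100 rows at 7 rows per FlowFile should yield 15 FlowFiles (14 full plus one with the remaining 2 rows)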
+ runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 15);
+
+ // Ensure all but the last file have 7 records each
+ for (int ff = 0; ff < 14; ff++) {
+ mff = runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(ff);
+ mff.assertAttributeEquals("record.count", "7");
+
+ mff.assertAttributeExists("fragment.identifier");
+ assertEquals(Integer.toString(ff), mff.getAttribute("fragment.index"));
+ // No fragment.count set for flow files sent when Output Batch Size is set
+ assertNull(mff.getAttribute("fragment.count"));
+ }
+
+ // Last file should have 2 records
+ mff = runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(14);
+ mff.assertAttributeEquals("record.count", "2");
+ mff.assertAttributeExists("fragment.identifier");
+ assertEquals(Integer.toString(14), mff.getAttribute("fragment.index"));
+ // No fragment.count set for flow files sent when Output Batch Size is set
+ assertNull(mff.getAttribute("fragment.count"));
+ }
+
+ @Test
+ public void testMaxRowsPerFlowFile() throws IOException, SQLException {
+
+ // load test data to database
+ final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+ Statement stmt = con.createStatement();
+ MockFlowFile mff;
+
+ try {
+ stmt.execute("drop table TEST_QUERY_DB_TABLE");
+ } catch (final SQLException sqle) {
+ // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+ }
+
+ stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, name varchar(100), scale float, created_on timestamp, bignum bigint default 0)");
+ int rowCount = 0;
+ //create larger row set
+ for (int batch = 0; batch < 100; batch++) {
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (" + rowCount + ", 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')");
+ rowCount++;
+ }
+
+ runner.setProperty(QueryDatabaseTableRecord.TABLE_NAME, "TEST_QUERY_DB_TABLE");
+ runner.setIncomingConnection(false);
+ runner.setProperty(QueryDatabaseTableRecord.MAX_VALUE_COLUMN_NAMES, "ID");
+ runner.setProperty(QueryDatabaseTableRecord.MAX_ROWS_PER_FLOW_FILE, "${" + MAX_ROWS_KEY + "}");
+ runner.setVariable(MAX_ROWS_KEY, "9");
+
+ runner.run();
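+ // 100 rows at 9 rows per FlowFile should yield 12 FlowFiles (11 full plus one with the remaining row)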
+ runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 12);
+
+ //ensure all but the last file have 9 records each
+ for (int ff = 0; ff < 11; ff++) {
+ mff = runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(ff);
+ mff.assertAttributeEquals("record.count", "9");
+
+ mff.assertAttributeExists("fragment.identifier");
+ assertEquals(Integer.toString(ff), mff.getAttribute("fragment.index"));
+ assertEquals("12", mff.getAttribute("fragment.count"));
+ }
+
+ //last file should have 1 record
+ mff = runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(11);
+ mff.assertAttributeEquals("record.count", "1");
+ mff.assertAttributeExists("fragment.identifier");
+ assertEquals(Integer.toString(11), mff.getAttribute("fragment.index"));
+ assertEquals("12", mff.getAttribute("fragment.count"));
+ runner.clearTransferState();
+
+ // Run again, this time no flowfiles/rows should be transferred
+ runner.run();
+ runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 0);
+ runner.clearTransferState();
+
+ // Run again, this time there should be a single partial flow file
+ for (int batch = 0; batch < 5; batch++) {
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (" + rowCount + ", 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')");
+ rowCount++;
+ }
+
+ runner.run();
+ runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
+ mff = runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+ mff.assertAttributeExists("fragment.identifier");
+ assertEquals(Integer.toString(0), mff.getAttribute("fragment.index"));
+ assertEquals("1", mff.getAttribute("fragment.count"));
+ mff.assertAttributeEquals("record.count", "5");
+ runner.clearTransferState();
+
+ // Run again, this time there should be a full batch and a partial one
+ for (int batch = 0; batch < 14; batch++) {
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (" + rowCount + ", 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')");
+ rowCount++;
+ }
+
+ runner.run();
+ runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 2);
+ mff = runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+ mff.assertAttributeEquals("record.count", "9");
+ mff = runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(1);
+ mff.assertAttributeEquals("record.count", "5");
+ runner.clearTransferState();
+
+ // Run again with a cleaned state. Should get all rows split into batches
+ int ffCount = (int) Math.ceil(rowCount / 9D);
+ runner.getStateManager().clear(Scope.CLUSTER);
+ runner.run();
+ runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, ffCount);
+
+ //ensure all but the last file have 9 records each
+ for (int ff = 0; ff < ffCount - 1; ff++) {
+ mff = runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(ff);
+ mff.assertAttributeEquals("record.count", "9");
+ }
+
+ mff = runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(ffCount - 1);
+ mff.assertAttributeEquals("record.count", Integer.toString(rowCount % 9));
+ runner.clearTransferState();
+ }
+
+ @Test
+ public void testMaxRowsPerFlowFileWithMaxFragments() throws SQLException {
+
+ // load test data to database
+ final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+ Statement stmt = con.createStatement();
+ MockFlowFile mff;
+
+ try {
+ stmt.execute("drop table TEST_QUERY_DB_TABLE");
+ } catch (final SQLException sqle) {
+ // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+ }
+
+ stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, name varchar(100), scale float, created_on timestamp, bignum bigint default 0)");
+ int rowCount = 0;
+ //create larger row set
+ for (int batch = 0; batch < 100; batch++) {
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (" + rowCount + ", 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')");
+ rowCount++;
+ }
+
+ runner.setProperty(QueryDatabaseTableRecord.TABLE_NAME, "TEST_QUERY_DB_TABLE");
+ runner.setIncomingConnection(false);
+ runner.setProperty(QueryDatabaseTableRecord.MAX_VALUE_COLUMN_NAMES, "ID");
+ runner.setProperty(QueryDatabaseTableRecord.MAX_ROWS_PER_FLOW_FILE, "9");
+ Integer maxFragments = 3;
+ runner.setProperty(QueryDatabaseTableRecord.MAX_FRAGMENTS, maxFragments.toString());
+
+ runner.run();
+ runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, maxFragments);
+
+ for (int i = 0; i < maxFragments; i++) {
+ mff = runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(i);
+ mff.assertAttributeEquals("record.count", "9");
+
+ mff.assertAttributeExists("fragment.identifier");
+ assertEquals(Integer.toString(i), mff.getAttribute("fragment.index"));
+ assertEquals(maxFragments.toString(), mff.getAttribute("fragment.count"));
+ }
+
+ runner.clearTransferState();
+ }
+
+ @Test
+ public void testInitialMaxValue() throws SQLException {
+
+ // load test data to database
+ final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+ Statement stmt = con.createStatement();
+
+ try {
+ stmt.execute("drop table TEST_QUERY_DB_TABLE");
+ } catch (final SQLException sqle) {
+ // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+ }
+
+ stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, name varchar(100), scale float, created_on timestamp, bignum bigint default 0)");
+
+ Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
+ cal.setTimeInMillis(0);
+
+ SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
+ dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+
+ int rowCount = 0;
+ //create larger row set
+ for (int batch = 0; batch < 10; batch++) {
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (" + rowCount + ", 'Joe Smith', 1.0, '" + dateFormat.format(cal.getTime().getTime()) + "')");
+
+ rowCount++;
+ cal.add(Calendar.MINUTE, 1);
+ }
+
+ runner.setProperty(QueryDatabaseTableRecord.TABLE_NAME, "${" + TABLE_NAME_KEY + "}");
+ runner.setVariable(TABLE_NAME_KEY, "TEST_QUERY_DB_TABLE");
+ runner.setIncomingConnection(false);
+ runner.setProperty(QueryDatabaseTableRecord.MAX_VALUE_COLUMN_NAMES, "created_on");
+
+ cal.setTimeInMillis(0);
+ cal.add(Calendar.MINUTE, 5);
+ runner.setProperty("initial.maxvalue.CREATED_ON", dateFormat.format(cal.getTime().getTime()));
+ // Initial run with no previous state. Should get only last 4 records
+ runner.run();
+ runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
+ MockFlowFile flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+ flowFile.assertAttributeEquals("record.count", "4");
+ runner.getStateManager().assertStateEquals("test_query_db_table" + AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER + "created_on", "1970-01-01 00:09:00.0", Scope.CLUSTER);
+ runner.clearTransferState();
+
+ // Run again, this time no flowfiles/rows should be transferred
+ // Validate Max Value doesn't change also
+ runner.run();
+ runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 0);
+ runner.getStateManager().assertStateEquals("test_query_db_table" + AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER + "created_on", "1970-01-01 00:09:00.0", Scope.CLUSTER);
+ runner.clearTransferState();
+ }
+
+ @Test
+ public void testInitialMaxValueWithEL() throws SQLException {
+
+ // load test data to database
+ final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+ Statement stmt = con.createStatement();
+
+ try {
+ stmt.execute("drop table TEST_QUERY_DB_TABLE");
+ } catch (final SQLException sqle) {
+ // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+ }
+
+ stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, name varchar(100), scale float, created_on timestamp, bignum bigint default 0)");
+
+ Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
+ cal.setTimeInMillis(0);
+
+ SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
+ dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+
+ int rowCount = 0;
+ //create larger row set
+ for (int batch = 0; batch < 10; batch++) {
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (" + rowCount + ", 'Joe Smith', 1.0, '" + dateFormat.format(cal.getTime().getTime()) + "')");
+
+ rowCount++;
+ cal.add(Calendar.MINUTE, 1);
+ }
+
+ runner.setProperty(QueryDatabaseTableRecord.TABLE_NAME, "${" + TABLE_NAME_KEY + "}");
+ runner.setVariable(TABLE_NAME_KEY, "TEST_QUERY_DB_TABLE");
+ runner.setIncomingConnection(false);
+ runner.setProperty(QueryDatabaseTableRecord.MAX_VALUE_COLUMN_NAMES, "created_on");
+
+ cal.setTimeInMillis(0);
+ cal.add(Calendar.MINUTE, 5);
+ runner.setProperty("initial.maxvalue.CREATED_ON", "${created.on}");
+ runner.setVariable("created.on", dateFormat.format(cal.getTime().getTime()));
+ // Initial run with no previous state. Should get only last 4 records
+ runner.run();
+ runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
+ MockFlowFile flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+ flowFile.assertAttributeEquals("record.count", "4");
+ runner.getStateManager().assertStateEquals("test_query_db_table" + AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER + "created_on", "1970-01-01 00:09:00.0", Scope.CLUSTER);
+ runner.clearTransferState();
+
+ // Run again, this time no flowfiles/rows should be transferred
+ // Validate Max Value doesn't change also
+ runner.run();
+ runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 0);
+ runner.getStateManager().assertStateEquals("test_query_db_table" + AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER + "created_on", "1970-01-01 00:09:00.0", Scope.CLUSTER);
+ runner.clearTransferState();
+
+ // Append a new row; expect one flowfile containing one row
+ cal.setTimeInMillis(0);
+ cal.add(Calendar.MINUTE, rowCount);
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (" + rowCount + ", 'Joe Smith', 1.0, '" + dateFormat.format(cal.getTime().getTime()) + "')");
+
+ runner.run();
+ runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
+ flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+ flowFile.assertAttributeEquals("record.count", "1");
+ runner.getStateManager().assertStateEquals("test_query_db_table" + AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER + "created_on", "1970-01-01 00:10:00.0", Scope.CLUSTER);
+ runner.clearTransferState();
+ }
+
+ @Test
+ public void testAddedRowsCustomWhereClause() throws SQLException, IOException {
+
+ // load test data to database
+ final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+ Statement stmt = con.createStatement();
+
+ try {
+ stmt.execute("drop table TEST_QUERY_DB_TABLE");
+ } catch (final SQLException sqle) {
+ // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+ }
+
+ stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, type varchar(20), name varchar(100), scale float, created_on timestamp, bignum bigint default 0)");
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on) VALUES (0, 'male', 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')");
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on) VALUES (1, 'female', 'Carrie Jones', 5.0, '2000-01-01 03:23:34.234')");
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on) VALUES (2, NULL, NULL, 2.0, '2010-01-01 00:00:00')");
+
+ runner.setProperty(QueryDatabaseTableRecord.TABLE_NAME, "TEST_QUERY_DB_TABLE");
+ runner.setProperty(QueryDatabaseTableRecord.WHERE_CLAUSE, "type = 'male'");
+ runner.setIncomingConnection(false);
+ runner.setProperty(QueryDatabaseTableRecord.MAX_VALUE_COLUMN_NAMES, "ID");
+ runner.setProperty(QueryDatabaseTableRecord.MAX_ROWS_PER_FLOW_FILE, "2");
+
+ runner.run();
+ runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
+
+ MockFlowFile flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+ assertEquals("TEST_QUERY_DB_TABLE", flowFile.getAttribute(QueryDatabaseTableRecord.RESULT_TABLENAME));
+ assertEquals("0", flowFile.getAttribute("maxvalue.id"));
+ flowFile.assertAttributeEquals("record.count", "1");
+
+ runner.clearTransferState();
+
+ // Run again, this time no flowfiles/rows should be transferred
+ runner.run();
+ runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 0);
+ runner.clearTransferState();
+
+ //Remove Max Rows Per Flow File
+ runner.setProperty(QueryDatabaseTableRecord.MAX_ROWS_PER_FLOW_FILE, "0");
+
+ // Add a new row with a higher ID but type 'female' and run; the WHERE clause filters it out, so no flowfiles should be transferred
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on) VALUES (3, 'female', 'Mary West', 15.0, '2000-01-01 03:23:34.234')");
+ runner.run();
+ runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 0);
+ runner.clearTransferState();
+
+ // Sanity check - run again, this time no flowfiles/rows should be transferred
+ runner.run();
+ runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 0);
+ runner.clearTransferState();
+
+ // Add timestamp as a max value column name
+ runner.setProperty(QueryDatabaseTableRecord.MAX_VALUE_COLUMN_NAMES, "id, created_on");
+
+ // Add a new row with a higher ID and run, one flow file will be transferred because no max value for the timestamp has been stored
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on) VALUES (4, 'male', 'Marty Johnson', 15.0, '2011-01-01 03:23:34.234')");
+ runner.run();
+ runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
+ flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+ assertEquals("4", flowFile.getAttribute("maxvalue.id"));
+ assertEquals("2011-01-01 03:23:34.234", flowFile.getAttribute("maxvalue.created_on"));
+ flowFile.assertAttributeEquals("record.count", "1");
+ runner.clearTransferState();
+
+ // Add a new row with a higher ID but lower timestamp and run, no flow file will be transferred
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on) VALUES (5, 'male', 'NO NAME', 15.0, '2001-01-01 03:23:34.234')");
+ runner.run();
+ runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 0);
+ runner.clearTransferState();
+
+ // Add a new row with a higher ID and run, one flow file will be transferred
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on) VALUES (6, 'male', 'Mr. NiFi', 1.0, '2012-01-01 03:23:34.234')");
+ runner.run();
+ runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
+ flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+ flowFile.assertAttributeEquals("record.count", "1");
+ runner.clearTransferState();
+
+ // Set name as the max value column name (and clear the state), all rows should be returned since the max value for name has not been set
+ runner.getStateManager().clear(Scope.CLUSTER);
+ runner.setProperty(QueryDatabaseTableRecord.MAX_VALUE_COLUMN_NAMES, "name");
+ runner.run();
+ runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
+ flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+ flowFile.assertAttributeEquals("record.count", "4");
+ runner.clearTransferState();
+
+ // Add a new row with a "higher" name than the max but lower than "NULL" (to test that null values are skipped), one flow file will be transferred
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on) VALUES (7, 'male', 'NULK', 1.0, '2012-01-01 03:23:34.234')");
+ runner.run();
+ runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
+ flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+ flowFile.assertAttributeEquals("record.count", "1");
+ runner.clearTransferState();
+
+ // Set scale as the max value column name (and clear the state), all rows should be returned since the max value for scale has not been set
+ runner.getStateManager().clear(Scope.CLUSTER);
+ runner.setProperty(QueryDatabaseTableRecord.MAX_VALUE_COLUMN_NAMES, "scale");
+ runner.run();
+ runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
+ flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+ flowFile.assertAttributeEquals("record.count", "5");
+ runner.clearTransferState();
+
+ // Add a new row with a higher value for scale than the max, one flow file will be transferred
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on) VALUES (8, 'male', 'NULK', 100.0, '2012-01-01 03:23:34.234')");
+ runner.run();
+ runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
+ flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+ flowFile.assertAttributeEquals("record.count", "1");
+ runner.clearTransferState();
+
+ // Set bignum as the max value column name (and clear the state), all rows should be returned since the max value for bignum has not been set
+ runner.getStateManager().clear(Scope.CLUSTER);
+ runner.setProperty(QueryDatabaseTableRecord.MAX_VALUE_COLUMN_NAMES, "bignum");
+ runner.run();
+ runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
+ flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+ flowFile.assertAttributeEquals("record.count", "6");
+ runner.clearTransferState();
+
+ // Add a new row with a higher value for bignum than the max, but of type 'female'; the WHERE clause filters it out, so no flow file will be transferred
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on, bignum) VALUES (9, 'female', 'Alice Bob', 100.0, '2012-01-01 03:23:34.234', 1)");
+ runner.run();
+ runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 0);
+ runner.clearTransferState();
+ }
+
+ @Test
+ public void testCustomSQL() throws SQLException, IOException {
+
+ // load test data to database
+ final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+ Statement stmt = con.createStatement();
+
+ try {
+ stmt.execute("drop table TEST_QUERY_DB_TABLE");
+ } catch (final SQLException sqle) {
+ // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+ }
+
+ try {
+ stmt.execute("drop table TYPE_LIST");
+ } catch (final SQLException sqle) {
+ // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+ }
+
+ stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, type varchar(20), name varchar(100), scale float, created_on timestamp, bignum bigint default 0)");
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on) VALUES (0, 'male', 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')");
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on) VALUES (1, 'female', 'Carrie Jones', 5.0, '2000-01-01 03:23:34.234')");
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on) VALUES (2, NULL, NULL, 2.0, '2010-01-01 00:00:00')");
+
+ stmt.execute("create table TYPE_LIST (type_id integer not null, type varchar(20), descr varchar(255))");
+ stmt.execute("insert into TYPE_LIST (type_id, type,descr) VALUES (0, 'male', 'Man')");
+ stmt.execute("insert into TYPE_LIST (type_id, type,descr) VALUES (1, 'female', 'Woman')");
+ stmt.execute("insert into TYPE_LIST (type_id, type,descr) VALUES (2, '', 'Unspecified')");
+
+ runner.setProperty(QueryDatabaseTableRecord.TABLE_NAME, "TEST_QUERY_DB_TABLE");
+ runner.setProperty(QueryDatabaseTableRecord.SQL_QUERY,
+ "SELECT id, b.type as gender, b.descr, name, scale, created_on, bignum FROM TEST_QUERY_DB_TABLE a INNER JOIN TYPE_LIST b ON (a.type=b.type)");
+ runner.setProperty(QueryDatabaseTableRecord.WHERE_CLAUSE, "gender = 'male'");
+ runner.setIncomingConnection(false);
+ runner.setProperty(QueryDatabaseTableRecord.MAX_VALUE_COLUMN_NAMES, "ID");
+ runner.setProperty(QueryDatabaseTableRecord.MAX_ROWS_PER_FLOW_FILE, "2");
+
+ runner.run();
+ runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
+
+ MockFlowFile flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+ assertEquals("TEST_QUERY_DB_TABLE", flowFile.getAttribute(QueryDatabaseTableRecord.RESULT_TABLENAME));
+ assertEquals("0", flowFile.getAttribute("maxvalue.id"));
+ flowFile.assertAttributeEquals("record.count", "1");
+
+ runner.clearTransferState();
+
+ // Run again, this time no flowfiles/rows should be transferred
+ runner.run();
+ runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 0);
+ runner.clearTransferState();
+
+ //Remove Max Rows Per Flow File
+ runner.setProperty(QueryDatabaseTableRecord.MAX_ROWS_PER_FLOW_FILE, "0");
+
+ // Add a new row with a higher ID but type 'female' and run; the custom WHERE clause (gender = 'male') filters it out, so no flowfiles should be transferred
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on) VALUES (3, 'female', 'Mary West', 15.0, '2000-01-01 03:23:34.234')");
+ runner.run();
+ runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 0);
+ runner.clearTransferState();
+
+ // Sanity check - run again, this time no flowfiles/rows should be transferred
+ runner.run();
+ runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 0);
+ runner.clearTransferState();
+
+ // Add timestamp as a max value column name
+ runner.setProperty(QueryDatabaseTableRecord.MAX_VALUE_COLUMN_NAMES, "id, created_on");
+
+ // Add a new row with a higher ID and run, one flow file will be transferred because no max value for the timestamp has been stored
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on) VALUES (4, 'male', 'Marty Johnson', 15.0, '2011-01-01 03:23:34.234')");
+ runner.run();
+ runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
+ flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+ assertEquals("4", flowFile.getAttribute("maxvalue.id"));
+ assertEquals("2011-01-01 03:23:34.234", flowFile.getAttribute("maxvalue.created_on"));
+ flowFile.assertAttributeEquals("record.count", "1");
+ runner.clearTransferState();
+
+ // Add a new row with a higher ID but lower timestamp and run, no flow file will be transferred
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on) VALUES (5, 'male', 'NO NAME', 15.0, '2001-01-01 03:23:34.234')");
+ runner.run();
+ runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 0);
+ runner.clearTransferState();
+
+ // Add a new row with a higher ID and run, one flow file will be transferred
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on) VALUES (6, 'male', 'Mr. NiFi', 1.0, '2012-01-01 03:23:34.234')");
+ runner.run();
+ runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
+ flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+ flowFile.assertAttributeEquals("record.count", "1");
+ runner.clearTransferState();
+
+ // Set name as the max value column name (and clear the state), all rows should be returned since the max value for name has not been set
+ runner.getStateManager().clear(Scope.CLUSTER);
+ runner.setProperty(QueryDatabaseTableRecord.MAX_VALUE_COLUMN_NAMES, "name");
+ runner.run();
+ runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
+ flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+ flowFile.assertAttributeEquals("record.count", "4");
+ runner.clearTransferState();
+
+ // Add a new row with a "higher" name than the max but lower than "NULL" (to test that null values are skipped), one flow file will be transferred
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on) VALUES (7, 'male', 'NULK', 1.0, '2012-01-01 03:23:34.234')");
+ runner.run();
+ runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
+ flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+ flowFile.assertAttributeEquals("record.count", "1");
+ runner.clearTransferState();
+
+ // Set scale as the max value column name (and clear the state), all rows should be returned since the max value for scale has not been set
+ runner.getStateManager().clear(Scope.CLUSTER);
+ runner.setProperty(QueryDatabaseTableRecord.MAX_VALUE_COLUMN_NAMES, "scale");
+ runner.run();
+ runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
+ flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+ flowFile.assertAttributeEquals("record.count", "5");
+ runner.clearTransferState();
+
+ // Add a new row with a higher value for scale than the max, one flow file will be transferred
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on) VALUES (8, 'male', 'NULK', 100.0, '2012-01-01 03:23:34.234')");
+ runner.run();
+ runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
+ flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+ flowFile.assertAttributeEquals("record.count", "1");
+ runner.clearTransferState();
+
+ // Set bignum as the max value column name (and clear the state), all rows should be returned since the max value for bignum has not been set
+ runner.getStateManager().clear(Scope.CLUSTER);
+ runner.setProperty(QueryDatabaseTableRecord.MAX_VALUE_COLUMN_NAMES, "bignum");
+ runner.run();
+ runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
+ flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+ flowFile.assertAttributeEquals("record.count", "6");
+ runner.clearTransferState();
+
+ // Add a new row with a higher value for bignum than the max, but with gender 'female'; the WHERE clause filters it out, so no flow file will be transferred
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on, bignum) VALUES (9, 'female', 'Alice Bob', 100.0, '2012-01-01 03:23:34.234', 1)");
+ runner.run();
+ runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 0);
+ runner.clearTransferState();
+ }
+
+ @Test(expected = AssertionError.class)
+ public void testMissingColumn() throws ProcessException, SQLException {
+ // load test data to database
+ final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+ Statement stmt = con.createStatement();
+
+ try {
+ stmt.execute("drop table TEST_QUERY_DB_TABLE");
+ } catch (final SQLException sqle) {
+ // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+ }
+
+ try {
+ stmt.execute("drop table TYPE_LIST");
+ } catch (final SQLException sqle) {
+ // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+ }
+
+ stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, type varchar(20), name varchar(100), scale float, created_on timestamp, bignum bigint default 0)");
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on) VALUES (0, 'male', 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')");
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on) VALUES (1, 'female', 'Carrie Jones', 5.0, '2000-01-01 03:23:34.234')");
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on) VALUES (2, NULL, NULL, 2.0, '2010-01-01 00:00:00')");
+
+ stmt.execute("create table TYPE_LIST (type_id integer not null, type varchar(20), descr varchar(255))");
+ stmt.execute("insert into TYPE_LIST (type_id, type,descr) VALUES (0, 'male', 'Man')");
+ stmt.execute("insert into TYPE_LIST (type_id, type,descr) VALUES (1, 'female', 'Woman')");
+ stmt.execute("insert into TYPE_LIST (type_id, type,descr) VALUES (2, '', 'Unspecified')");
+
+ runner.setProperty(QueryDatabaseTableRecord.TABLE_NAME, "TYPE_LIST");
+ runner.setProperty(QueryDatabaseTableRecord.SQL_QUERY, "SELECT b.type, b.descr, name, scale, created_on, bignum FROM TEST_QUERY_DB_TABLE a INNER JOIN TYPE_LIST b ON (a.type=b.type)");
+ runner.setProperty(QueryDatabaseTableRecord.WHERE_CLAUSE, "type = 'male'");
+ runner.setIncomingConnection(false);
+ runner.setProperty(QueryDatabaseTableRecord.MAX_VALUE_COLUMN_NAMES, "ID");
+ runner.setProperty(QueryDatabaseTableRecord.MAX_ROWS_PER_FLOW_FILE, "2");
+
+ runner.run();
+ }
+
+ @Test
+ public void testWithExceptionAfterSomeRowsProcessed() throws SQLException {
+ // load test data to database
+ final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+ Statement stmt = con.createStatement();
+
+ try {
+ stmt.execute("drop table TEST_NULL_INT");
+ } catch (final SQLException sqle) {
+ // Ignore, usually due to Derby not having DROP TABLE IF EXISTS
+ }
+
+ stmt.execute("create table TEST_NULL_INT (id integer not null, val1 integer, val2 integer, constraint my_pk primary key (id))");
+
+ stmt.execute("insert into TEST_NULL_INT (id, val1, val2) VALUES (1, NULL, 1)");
+ stmt.execute("insert into TEST_NULL_INT (id, val1, val2) VALUES (2, 1, 1)");
+
+ runner.setIncomingConnection(false);
+ runner.setProperty(QueryDatabaseTableRecord.TABLE_NAME, "TEST_NULL_INT");
+ runner.setProperty(AbstractDatabaseFetchProcessor.MAX_VALUE_COLUMN_NAMES, "id");
+
+ // Override adapter with one that fails after the first row is processed
+ QueryDatabaseTableRecord.dbAdapters.put(dbAdapter.getName(), new GenericDatabaseAdapter() {
+ boolean fail = false;
+
+ @Override
+ public String getName() {
+ if (!fail) {
+ fail = true;
+ return super.getName();
+ }
+ throw new RuntimeException("test");
+ }
+ });
+ runner.run();
+ assertTrue(runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).isEmpty());
+ // State should not have been updated
+ runner.getStateManager().assertStateNotSet("test_null_int@!@id", Scope.CLUSTER);
+
+ // Restore original (working) adapter and run again
+ QueryDatabaseTableRecord.dbAdapters.put(dbAdapter.getName(), dbAdapter);
+ runner.run();
+ assertFalse(runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).isEmpty());
+ runner.getStateManager().assertStateEquals("test_null_int@!@id", "2", Scope.CLUSTER);
+ }
+
+ /**
+ * Simple implementation only for QueryDatabaseTableRecord processor testing.
+ */
+ private class DBCPServiceSimpleImpl extends AbstractControllerService implements DBCPService {
+
+ @Override
+ public String getIdentifier() {
+ return "dbcp";
+ }
+
+ @Override
+ public Connection getConnection() throws ProcessException {
+ try {
+ Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
+ return DriverManager.getConnection("jdbc:derby:" + DB_LOCATION + ";create=true");
+ } catch (final Exception e) {
+ throw new ProcessException("getConnection failed: " + e);
+ }
+ }
+ }
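+
+ // Sketch of how such a service is typically wired into a TestRunner (the identifiers and property
+ // constant below are assumptions for illustration, not taken verbatim from this commit):
+ //   final DBCPService dbcp = new DBCPServiceSimpleImpl();
+ //   runner.addControllerService("dbcp", dbcp);
+ //   runner.enableControllerService(dbcp);
+ //   runner.setProperty(QueryDatabaseTableRecord.DBCP_SERVICE, "dbcp");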
+
+ @Stateful(scopes = Scope.CLUSTER, description = "Mock for QueryDatabaseTableRecord processor")
+ private static class MockQueryDatabaseTableRecord extends QueryDatabaseTableRecord {
+ void putColumnType(String colName, Integer colType) {
+ columnTypeMap.put(colName, colType);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi/blob/c6572f04/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
index 1624c6d..8b51fe2 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
@@ -38,13 +38,13 @@ import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.apache.nifi.util.file.FileUtils;
-import org.fusesource.hawtbuf.ByteArrayInputStream;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
+import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
http://git-wip-us.apache.org/repos/asf/nifi/blob/c6572f04/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
index 33633f2..63de91a 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
@@ -22,6 +22,7 @@ import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
@@ -51,7 +52,6 @@ import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
-import org.fusesource.hawtbuf.ByteArrayInputStream;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -352,6 +352,48 @@ public class TestExecuteSQL {
}
@Test
+ public void testNoRowsStatementCreatesEmptyFlowFile() throws Exception {
+ // remove previous test database, if any
+ final File dbLocation = new File(DB_LOCATION);
+ dbLocation.delete();
+
+ // load test data to database
+ final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+ Statement stmt = con.createStatement();
+
+ try {
+ stmt.execute("drop table TEST_NULL_INT");
+ } catch (final SQLException sqle) {
+ }
+
+ stmt.execute("create table TEST_NULL_INT (id integer not null, val1 integer, val2 integer, constraint my_pk primary key (id))");
+
+ runner.setIncomingConnection(true);
+ runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "select * from TEST_NULL_INT");
+ runner.enqueue("Hello".getBytes());
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 1);
+ MockFlowFile firstFlowFile = runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS).get(0);
+ firstFlowFile.assertAttributeEquals(ExecuteSQL.RESULT_ROW_COUNT, "0");
+ final InputStream in = new ByteArrayInputStream(firstFlowFile.toByteArray());
+ final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
+ try (DataFileStream<GenericRecord> dataFileReader = new DataFileStream<>(in, datumReader)) {
+ GenericRecord record = null;
+ long recordsFromStream = 0;
+ while (dataFileReader.hasNext()) {
+ // Reuse record object by passing it to next(). This saves us from
+ // allocating and garbage collecting many objects for files with
+ // many items.
+ record = dataFileReader.next(record);
+ recordsFromStream += 1;
+ }
+
+ assertEquals(0, recordsFromStream);
+ }
+ }
+
+ @Test
public void testWithDuplicateColumns() throws SQLException {
// remove previous test database, if any
final File dbLocation = new File(DB_LOCATION);
[3/4] nifi git commit: NIFI-4517: Added ExecuteSQLRecord and
QueryDatabaseTableRecord processors
Posted by pv...@apache.org.
http://git-wip-us.apache.org/repos/asf/nifi/blob/c6572f04/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQLRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQLRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQLRecord.java
new file mode 100644
index 0000000..31d0ec8
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQLRecord.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processors.standard.sql.RecordSqlWriter;
+import org.apache.nifi.processors.standard.sql.SqlWriter;
+import org.apache.nifi.processors.standard.util.JdbcCommon;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.nifi.processors.standard.util.JdbcCommon.USE_AVRO_LOGICAL_TYPES;
+
+@EventDriven
+@InputRequirement(Requirement.INPUT_ALLOWED)
+@Tags({"sql", "select", "jdbc", "query", "database", "record"})
+@CapabilityDescription("Executes provided SQL select query. Query result will be converted to the format specified by a Record Writer. "
+ + "Streaming is used so arbitrarily large result sets are supported. This processor can be scheduled to run on "
+ + "a timer, or cron expression, using the standard scheduling methods, or it can be triggered by an incoming FlowFile. "
+ + "If it is triggered by an incoming FlowFile, then attributes of that FlowFile will be available when evaluating the "
+ + "select query, and the query may use the ? to escape parameters. In this case, the parameters to use must exist as FlowFile attributes "
+ + "with the naming convention sql.args.N.type and sql.args.N.value, where N is a positive integer. The sql.args.N.type is expected to be "
+ + "a number indicating the JDBC Type. The content of the FlowFile is expected to be in UTF-8 format. "
+ + "FlowFile attribute 'executesql.row.count' indicates how many rows were selected.")
+@ReadsAttributes({
+ @ReadsAttribute(attribute = "sql.args.N.type", description = "Incoming FlowFiles are expected to be parametrized SQL statements. The type of each Parameter is specified as an integer "
+ + "that represents the JDBC Type of the parameter."),
+ @ReadsAttribute(attribute = "sql.args.N.value", description = "Incoming FlowFiles are expected to be parametrized SQL statements. The value of the Parameters are specified as "
+ + "sql.args.1.value, sql.args.2.value, sql.args.3.value, and so on. The type of the sql.args.1.value Parameter is specified by the sql.args.1.type attribute."),
+ @ReadsAttribute(attribute = "sql.args.N.format", description = "This attribute is always optional, but default options may not always work for your data. "
+ + "Incoming FlowFiles are expected to be parametrized SQL statements. In some cases "
+ + "a format option needs to be specified, currently this is only applicable for binary data types, dates, times and timestamps. Binary Data Types (defaults to 'ascii') - "
+ + "ascii: each string character in your attribute value represents a single byte. This is the format provided by Avro Processors. "
+ + "base64: the string is a Base64 encoded string that can be decoded to bytes. "
+ + "hex: the string is hex encoded with all letters in upper case and no '0x' at the beginning. "
+ + "Dates/Times/Timestamps - "
+ + "Date, Time and Timestamp formats all support both custom formats or named format ('yyyy-MM-dd','ISO_OFFSET_DATE_TIME') "
+ + "as specified according to java.time.format.DateTimeFormatter. "
+ + "If not specified, a long value input is expected to be an unix epoch (milli seconds from 1970/1/1), or a string value in "
+ + "'yyyy-MM-dd' format for Date, 'HH:mm:ss.SSS' for Time (some database engines e.g. Derby or MySQL do not support milliseconds and will truncate milliseconds), "
+ + "'yyyy-MM-dd HH:mm:ss.SSS' for Timestamp is used.")
+})
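+// Illustrative sketch only (the statement and attribute values below are hypothetical, not part of this commit):
+// an incoming FlowFile whose content is "SELECT * FROM PERSONS WHERE ID = ?" could carry the attributes
+//   sql.args.1.type  = 4      (java.sql.Types.INTEGER)
+//   sql.args.1.value = 42
+// so that the single '?' placeholder is bound to the integer value 42 before the query is executed.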
+@WritesAttributes({
+ @WritesAttribute(attribute = "executesql.row.count", description = "Contains the number of rows returned in the select query"),
+ @WritesAttribute(attribute = "executesql.query.duration", description = "Combined duration of the query execution time and fetch time in milliseconds"),
+ @WritesAttribute(attribute = "executesql.query.executiontime", description = "Duration of the query execution time in milliseconds"),
+ @WritesAttribute(attribute = "executesql.query.fetchtime", description = "Duration of the result set fetch time in milliseconds"),
+ @WritesAttribute(attribute = "executesql.resultset.index", description = "Assuming multiple result sets are returned, "
+ + "the zero based index of this result set."),
+ @WritesAttribute(attribute = "fragment.identifier", description = "If 'Max Rows Per Flow File' is set then all FlowFiles from the same query result set "
+ + "will have the same value for the fragment.identifier attribute. This can then be used to correlate the results."),
+ @WritesAttribute(attribute = "fragment.count", description = "If 'Max Rows Per Flow File' is set then this is the total number of "
+ + "FlowFiles produced by a single ResultSet. This can be used in conjunction with the "
+ + "fragment.identifier attribute in order to know how many FlowFiles belonged to the same incoming ResultSet. If Output Batch Size is set, then this "
+ + "attribute will not be populated."),
+ @WritesAttribute(attribute = "fragment.index", description = "If 'Max Rows Per Flow File' is set then the position of this FlowFile in the list of "
+ + "outgoing FlowFiles that were all derived from the same result set FlowFile. This can be "
+ + "used in conjunction with the fragment.identifier attribute to know which FlowFiles originated from the same query result set and in what order "
+ + "FlowFiles were produced"),
+ @WritesAttribute(attribute = "mime.type", description = "Sets the mime.type attribute to the MIME Type specified by the Record Writer."),
+ @WritesAttribute(attribute = "record.count", description = "The number of records output by the Record Writer.")
+})
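+// Illustrative sketch only (numbers are hypothetical): with 'Max Rows Per Flow File' set to 100 and a
+// result set of 250 rows, three FlowFiles would be emitted sharing one fragment.identifier, carrying
+// fragment.index 0, 1 and 2, fragment.count 3, and record.count 100, 100 and 50 respectively.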
+public class ExecuteSQLRecord extends AbstractExecuteSQL {
+
+
+ public static final PropertyDescriptor RECORD_WRITER_FACTORY = new PropertyDescriptor.Builder()
+ .name("esqlrecord-record-writer")
+ .displayName("Record Writer")
+ .description("Specifies the Controller Service to use for writing results to a FlowFile. The Record Writer may use Inherit Schema to emulate the inferred schema behavior, i.e. "
+ + "an explicit schema need not be defined in the writer, and will be supplied by the same logic used to infer the schema from the column types.")
+ .identifiesControllerService(RecordSetWriterFactory.class)
+ .required(true)
+ .build();
+
+ public static final PropertyDescriptor NORMALIZE_NAMES = new PropertyDescriptor.Builder()
+ .name("esqlrecord-normalize")
+ .displayName("Normalize Table/Column Names")
+ .description("Whether to change characters in column names. For example, colons and periods will be changed to underscores.")
+ .allowableValues("true", "false")
+ .defaultValue("false")
+ .required(true)
+ .build();
+
+ public ExecuteSQLRecord() {
+ final Set<Relationship> r = new HashSet<>();
+ r.add(REL_SUCCESS);
+ r.add(REL_FAILURE);
+ relationships = Collections.unmodifiableSet(r);
+
+ final List<PropertyDescriptor> pds = new ArrayList<>();
+ pds.add(DBCP_SERVICE);
+ pds.add(SQL_SELECT_QUERY);
+ pds.add(QUERY_TIMEOUT);
+ pds.add(RECORD_WRITER_FACTORY);
+ pds.add(NORMALIZE_NAMES);
+ pds.add(USE_AVRO_LOGICAL_TYPES);
+ pds.add(MAX_ROWS_PER_FLOW_FILE);
+ pds.add(OUTPUT_BATCH_SIZE);
+ propDescriptors = Collections.unmodifiableList(pds);
+ }
+
+ @Override
+ protected SqlWriter configureSqlWriter(ProcessSession session, ProcessContext context, FlowFile fileToProcess) {
+ final Integer maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger();
+ final boolean convertNamesForAvro = context.getProperty(NORMALIZE_NAMES).asBoolean();
+ final Boolean useAvroLogicalTypes = context.getProperty(USE_AVRO_LOGICAL_TYPES).asBoolean();
+ final JdbcCommon.AvroConversionOptions options = JdbcCommon.AvroConversionOptions.builder()
+ .convertNames(convertNamesForAvro)
+ .useLogicalTypes(useAvroLogicalTypes)
+ .build();
+ final RecordSetWriterFactory recordSetWriterFactory = context.getProperty(RECORD_WRITER_FACTORY).asControllerService(RecordSetWriterFactory.class);
+
+ return new RecordSqlWriter(recordSetWriterFactory, options, maxRowsPerFlowFile, fileToProcess == null ? Collections.emptyMap() : fileToProcess.getAttributes());
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/c6572f04/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
index 1923e2c..71348ef 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
@@ -16,7 +16,6 @@
*/
package org.apache.nifi.processors.standard;
-import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
@@ -27,49 +26,21 @@ import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.state.Scope;
-import org.apache.nifi.components.state.StateManager;
-import org.apache.nifi.components.state.StateMap;
-import org.apache.nifi.dbcp.DBCPService;
-import org.apache.nifi.expression.AttributeExpression;
import org.apache.nifi.expression.ExpressionLanguageScope;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.flowfile.attributes.FragmentAttributes;
-import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.processors.standard.db.DatabaseAdapter;
+import org.apache.nifi.processors.standard.sql.DefaultAvroSqlWriter;
+import org.apache.nifi.processors.standard.sql.SqlWriter;
import org.apache.nifi.processors.standard.util.JdbcCommon;
-import org.apache.nifi.util.StopWatch;
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.DatabaseMetaData;
-import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.text.ParseException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
-import java.util.Map;
import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.stream.IntStream;
import static org.apache.nifi.processors.standard.util.JdbcCommon.DEFAULT_PRECISION;
import static org.apache.nifi.processors.standard.util.JdbcCommon.DEFAULT_SCALE;
@@ -112,60 +83,7 @@ import static org.apache.nifi.processors.standard.util.JdbcCommon.USE_AVRO_LOGIC
@DynamicProperty(name = "initial.maxvalue.<max_value_column>", value = "Initial maximum value for the specified column",
expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY, description = "Specifies an initial max value for max value column(s). Properties should "
+ "be added in the format `initial.maxvalue.<max_value_column>`. This value is only used the first time the table is accessed (when a Maximum Value Column is specified).")
-public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
-
- public static final String RESULT_TABLENAME = "tablename";
- public static final String RESULT_ROW_COUNT = "querydbtable.row.count";
-
- public static final String FRAGMENT_ID = FragmentAttributes.FRAGMENT_ID.key();
- public static final String FRAGMENT_INDEX = FragmentAttributes.FRAGMENT_INDEX.key();
-
- public static final PropertyDescriptor FETCH_SIZE = new PropertyDescriptor.Builder()
- .name("Fetch Size")
- .description("The number of result rows to be fetched from the result set at a time. This is a hint to the database driver and may not be "
- + "honored and/or exact. If the value specified is zero, then the hint is ignored.")
- .defaultValue("0")
- .required(true)
- .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
- .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
- .build();
-
- public static final PropertyDescriptor MAX_ROWS_PER_FLOW_FILE = new PropertyDescriptor.Builder()
- .name("qdbt-max-rows")
- .displayName("Max Rows Per Flow File")
- .description("The maximum number of result rows that will be included in a single FlowFile. This will allow you to break up very large "
- + "result sets into multiple FlowFiles. If the value specified is zero, then all rows are returned in a single FlowFile.")
- .defaultValue("0")
- .required(true)
- .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
- .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
- .build();
-
- public static final PropertyDescriptor OUTPUT_BATCH_SIZE = new PropertyDescriptor.Builder()
- .name("qdbt-output-batch-size")
- .displayName("Output Batch Size")
- .description("The number of output FlowFiles to queue before committing the process session. When set to zero, the session will be committed when all result set rows "
- + "have been processed and the output FlowFiles are ready for transfer to the downstream relationship. For large result sets, this can cause a large burst of FlowFiles "
- + "to be transferred at the end of processor execution. If this property is set, then when the specified number of FlowFiles are ready for transfer, then the session will "
- + "be committed, thus releasing the FlowFiles to the downstream relationship. NOTE: The maxvalue.* and fragment.count attributes will not be set on FlowFiles when this "
- + "property is set.")
- .defaultValue("0")
- .required(true)
- .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
- .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
- .build();
-
- public static final PropertyDescriptor MAX_FRAGMENTS = new PropertyDescriptor.Builder()
- .name("qdbt-max-frags")
- .displayName("Maximum Number of Fragments")
- .description("The maximum number of fragments. If the value specified is zero, then all fragments are returned. " +
- "This prevents OutOfMemoryError when this processor ingests huge table. NOTE: Setting this property can result in data loss, as the incoming results are "
- + "not ordered, and fragments may end at arbitrary boundaries where rows are not included in the result set.")
- .defaultValue("0")
- .required(true)
- .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
- .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
- .build();
+public class QueryDatabaseTable extends AbstractQueryDatabaseTable {
public QueryDatabaseTable() {
final Set<Relationship> r = new HashSet<>();
@@ -197,365 +115,22 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
}
@Override
- public Set<Relationship> getRelationships() {
- return relationships;
- }
-
- @Override
- protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
- return propDescriptors;
- }
-
- @Override
- protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
- return new PropertyDescriptor.Builder()
- .name(propertyDescriptorName)
- .required(false)
- .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true))
- .addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR)
- .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
- .dynamic(true)
- .build();
- }
-
- @OnScheduled
- public void setup(final ProcessContext context) {
- maxValueProperties = getDefaultMaxValueProperties(context, null);
- }
-
- @OnStopped
- public void stop() {
- // Reset the column type map in case properties change
- setupComplete.set(false);
- }
-
- @Override
- public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
- // Fetch the column/table info once
- if (!setupComplete.get()) {
- super.setup(context);
- }
- ProcessSession session = sessionFactory.createSession();
- final List<FlowFile> resultSetFlowFiles = new ArrayList<>();
-
- final ComponentLog logger = getLogger();
-
- final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
- final DatabaseAdapter dbAdapter = dbAdapters.get(context.getProperty(DB_TYPE).getValue());
+ protected SqlWriter configureSqlWriter(ProcessSession session, ProcessContext context) {
final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions().getValue();
- final String columnNames = context.getProperty(COLUMN_NAMES).evaluateAttributeExpressions().getValue();
- final String sqlQuery = context.getProperty(SQL_QUERY).evaluateAttributeExpressions().getValue();
- final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).evaluateAttributeExpressions().getValue();
- final String customWhereClause = context.getProperty(WHERE_CLAUSE).evaluateAttributeExpressions().getValue();
- final Integer fetchSize = context.getProperty(FETCH_SIZE).evaluateAttributeExpressions().asInteger();
+ final boolean convertNamesForAvro = context.getProperty(NORMALIZE_NAMES_FOR_AVRO).asBoolean();
+ final Boolean useAvroLogicalTypes = context.getProperty(USE_AVRO_LOGICAL_TYPES).asBoolean();
final Integer maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger();
- final Integer outputBatchSizeField = context.getProperty(OUTPUT_BATCH_SIZE).evaluateAttributeExpressions().asInteger();
- final int outputBatchSize = outputBatchSizeField == null ? 0 : outputBatchSizeField;
- final Integer maxFragments = context.getProperty(MAX_FRAGMENTS).isSet()
- ? context.getProperty(MAX_FRAGMENTS).evaluateAttributeExpressions().asInteger()
- : 0;
+ final Integer defaultPrecision = context.getProperty(DEFAULT_PRECISION).evaluateAttributeExpressions().asInteger();
+ final Integer defaultScale = context.getProperty(DEFAULT_SCALE).evaluateAttributeExpressions().asInteger();
+
final JdbcCommon.AvroConversionOptions options = JdbcCommon.AvroConversionOptions.builder()
.recordName(tableName)
+ .convertNames(convertNamesForAvro)
+ .useLogicalTypes(useAvroLogicalTypes)
+ .defaultPrecision(defaultPrecision)
+ .defaultScale(defaultScale)
.maxRows(maxRowsPerFlowFile)
- .convertNames(context.getProperty(NORMALIZE_NAMES_FOR_AVRO).asBoolean())
- .useLogicalTypes(context.getProperty(USE_AVRO_LOGICAL_TYPES).asBoolean())
- .defaultPrecision(context.getProperty(DEFAULT_PRECISION).evaluateAttributeExpressions().asInteger())
- .defaultScale(context.getProperty(DEFAULT_SCALE).evaluateAttributeExpressions().asInteger())
.build();
-
- final StateManager stateManager = context.getStateManager();
- final StateMap stateMap;
-
- try {
- stateMap = stateManager.getState(Scope.CLUSTER);
- } catch (final IOException ioe) {
- getLogger().error("Failed to retrieve observed maximum values from the State Manager. Will not perform "
- + "query until this is accomplished.", ioe);
- context.yield();
- return;
- }
- // Make a mutable copy of the current state property map. This will be updated by the result row callback, and eventually
- // set as the current state map (after the session has been committed)
- final Map<String, String> statePropertyMap = new HashMap<>(stateMap.toMap());
-
- //If an initial max value for column(s) has been specified using properties, and this column is not in the state manager, sync them to the state property map
- for (final Map.Entry<String, String> maxProp : maxValueProperties.entrySet()) {
- String maxPropKey = maxProp.getKey().toLowerCase();
- String fullyQualifiedMaxPropKey = getStateKey(tableName, maxPropKey, dbAdapter);
- if (!statePropertyMap.containsKey(fullyQualifiedMaxPropKey)) {
- String newMaxPropValue;
- // If we can't find the value at the fully-qualified key name, it is possible (under a previous scheme)
- // the value has been stored under a key that is only the column name. Fall back to check the column name,
- // but store the new initial max value under the fully-qualified key.
- if (statePropertyMap.containsKey(maxPropKey)) {
- newMaxPropValue = statePropertyMap.get(maxPropKey);
- } else {
- newMaxPropValue = maxProp.getValue();
- }
- statePropertyMap.put(fullyQualifiedMaxPropKey, newMaxPropValue);
-
- }
- }
-
- List<String> maxValueColumnNameList = StringUtils.isEmpty(maxValueColumnNames)
- ? null
- : Arrays.asList(maxValueColumnNames.split("\\s*,\\s*"));
- final String selectQuery = getQuery(dbAdapter, tableName, sqlQuery, columnNames, maxValueColumnNameList, customWhereClause, statePropertyMap);
- final StopWatch stopWatch = new StopWatch(true);
- final String fragmentIdentifier = UUID.randomUUID().toString();
-
- try (final Connection con = dbcpService.getConnection(Collections.emptyMap());
- final Statement st = con.createStatement()) {
-
- if (fetchSize != null && fetchSize > 0) {
- try {
- st.setFetchSize(fetchSize);
- } catch (SQLException se) {
- // Not all drivers support this, just log the error (at debug level) and move on
- logger.debug("Cannot set fetch size to {} due to {}", new Object[]{fetchSize, se.getLocalizedMessage()}, se);
- }
- }
-
- String jdbcURL = "DBCPService";
- try {
- DatabaseMetaData databaseMetaData = con.getMetaData();
- if (databaseMetaData != null) {
- jdbcURL = databaseMetaData.getURL();
- }
- } catch (SQLException se) {
- // Ignore and use default JDBC URL. This shouldn't happen unless the driver doesn't implement getMetaData() properly
- }
-
- final Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.SECONDS).intValue();
- st.setQueryTimeout(queryTimeout); // timeout in seconds
- if (logger.isDebugEnabled()) {
- logger.debug("Executing query {}", new Object[] { selectQuery });
- }
- try (final ResultSet resultSet = st.executeQuery(selectQuery)) {
- int fragmentIndex=0;
- // Max values will be updated in the state property map by the callback
- final MaxValueResultSetRowCollector maxValCollector = new MaxValueResultSetRowCollector(tableName, statePropertyMap, dbAdapter);
-
- while(true) {
- final AtomicLong nrOfRows = new AtomicLong(0L);
-
- FlowFile fileToProcess = session.create();
- try {
- fileToProcess = session.write(fileToProcess, out -> {
- try {
- nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, options, maxValCollector));
- } catch (SQLException | RuntimeException e) {
- throw new ProcessException("Error during database query or conversion of records to Avro.", e);
- }
- });
- } catch (ProcessException e) {
- // Add flowfile to results before rethrowing so it will be removed from session in outer catch
- resultSetFlowFiles.add(fileToProcess);
- throw e;
- }
-
- if (nrOfRows.get() > 0) {
- // set attribute how many rows were selected
- fileToProcess = session.putAttribute(fileToProcess, RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
- fileToProcess = session.putAttribute(fileToProcess, RESULT_TABLENAME, tableName);
- fileToProcess = session.putAttribute(fileToProcess, CoreAttributes.MIME_TYPE.key(), JdbcCommon.MIME_TYPE_AVRO_BINARY);
- if(maxRowsPerFlowFile > 0) {
- fileToProcess = session.putAttribute(fileToProcess, FRAGMENT_ID, fragmentIdentifier);
- fileToProcess = session.putAttribute(fileToProcess, FRAGMENT_INDEX, String.valueOf(fragmentIndex));
- }
-
- logger.info("{} contains {} Avro records; transferring to 'success'",
- new Object[]{fileToProcess, nrOfRows.get()});
-
- session.getProvenanceReporter().receive(fileToProcess, jdbcURL, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
- resultSetFlowFiles.add(fileToProcess);
- // If we've reached the batch size, send out the flow files
- if (outputBatchSize > 0 && resultSetFlowFiles.size() >= outputBatchSize) {
- session.transfer(resultSetFlowFiles, REL_SUCCESS);
- session.commit();
- resultSetFlowFiles.clear();
- }
- } else {
- // If there were no rows returned, don't send the flowfile
- session.remove(fileToProcess);
- // If no rows and this was first FlowFile, yield
- if(fragmentIndex == 0){
- context.yield();
- }
- break;
- }
-
- fragmentIndex++;
- if (maxFragments > 0 && fragmentIndex >= maxFragments) {
- break;
- }
-
- // If we aren't splitting up the data into flow files or fragments, then the result set has been entirely fetched so don't loop back around
- if (maxFragments == 0 && maxRowsPerFlowFile == 0) {
- break;
- }
-
- // If we are splitting up the data into flow files, don't loop back around if we've gotten all results
- if(maxRowsPerFlowFile > 0 && nrOfRows.get() < maxRowsPerFlowFile) {
- break;
- }
- }
-
- // Apply state changes from the Max Value tracker
- maxValCollector.applyStateChanges();
-
- // Even though the maximum value and total count are known at this point, to maintain consistent behavior if Output Batch Size is set, do not store the attributes
- if (outputBatchSize == 0) {
- for (int i = 0; i < resultSetFlowFiles.size(); i++) {
- // Add maximum values as attributes
- for (Map.Entry<String, String> entry : statePropertyMap.entrySet()) {
- // Get just the column name from the key
- String key = entry.getKey();
- String colName = key.substring(key.lastIndexOf(NAMESPACE_DELIMITER) + NAMESPACE_DELIMITER.length());
- resultSetFlowFiles.set(i, session.putAttribute(resultSetFlowFiles.get(i), "maxvalue." + colName, entry.getValue()));
- }
-
- //set count on all FlowFiles
- if (maxRowsPerFlowFile > 0) {
- resultSetFlowFiles.set(i,
- session.putAttribute(resultSetFlowFiles.get(i), "fragment.count", Integer.toString(fragmentIndex)));
- }
- }
- }
- } catch (final SQLException e) {
- throw e;
- }
-
- session.transfer(resultSetFlowFiles, REL_SUCCESS);
-
- } catch (final ProcessException | SQLException e) {
- logger.error("Unable to execute SQL select query {} due to {}", new Object[]{selectQuery, e});
- if (!resultSetFlowFiles.isEmpty()) {
- session.remove(resultSetFlowFiles);
- }
- context.yield();
- } finally {
- session.commit();
- try {
- // Update the state
- stateManager.setState(statePropertyMap, Scope.CLUSTER);
- } catch (IOException ioe) {
- getLogger().error("{} failed to update State Manager, maximum observed values will not be recorded", new Object[]{this, ioe});
- }
- }
- }
-
- protected String getQuery(DatabaseAdapter dbAdapter, String tableName, String columnNames, List<String> maxValColumnNames,
- String customWhereClause, Map<String, String> stateMap) {
-
- return getQuery(dbAdapter, tableName, null, columnNames, maxValColumnNames, customWhereClause, stateMap);
- }
-
- protected String getQuery(DatabaseAdapter dbAdapter, String tableName, String sqlQuery, String columnNames, List<String> maxValColumnNames,
- String customWhereClause, Map<String, String> stateMap) {
- if (StringUtils.isEmpty(tableName)) {
- throw new IllegalArgumentException("Table name must be specified");
- }
- final StringBuilder query;
-
- if (StringUtils.isEmpty(sqlQuery)) {
- query = new StringBuilder(dbAdapter.getSelectStatement(tableName, columnNames, null, null, null, null));
- } else {
- query = getWrappedQuery(dbAdapter, sqlQuery, tableName);
- }
-
- List<String> whereClauses = new ArrayList<>();
- // Check state map for last max values
- if (stateMap != null && !stateMap.isEmpty() && maxValColumnNames != null) {
- IntStream.range(0, maxValColumnNames.size()).forEach((index) -> {
- String colName = maxValColumnNames.get(index);
- String maxValueKey = getStateKey(tableName, colName, dbAdapter);
- String maxValue = stateMap.get(maxValueKey);
- if (StringUtils.isEmpty(maxValue)) {
- // If we can't find the value at the fully-qualified key name, it is possible (under a previous scheme)
- // the value has been stored under a key that is only the column name. Fall back to check the column name; either way, when a new
- // maximum value is observed, it will be stored under the fully-qualified key from then on.
- maxValue = stateMap.get(colName.toLowerCase());
- }
- if (!StringUtils.isEmpty(maxValue)) {
- Integer type = columnTypeMap.get(maxValueKey);
- if (type == null) {
- // This shouldn't happen as we are populating columnTypeMap when the processor is scheduled.
- throw new IllegalArgumentException("No column type found for: " + colName);
- }
- // Add a condition for the WHERE clause
- whereClauses.add(colName + (index == 0 ? " > " : " >= ") + getLiteralByType(type, maxValue, dbAdapter.getName()));
- }
- });
- }
-
- if (customWhereClause != null) {
- whereClauses.add("(" + customWhereClause + ")");
- }
-
- if (!whereClauses.isEmpty()) {
- query.append(" WHERE ");
- query.append(StringUtils.join(whereClauses, " AND "));
- }
-
- return query.toString();
- }
-
- protected class MaxValueResultSetRowCollector implements JdbcCommon.ResultSetRowCallback {
- DatabaseAdapter dbAdapter;
- final Map<String, String> newColMap;
- final Map<String, String> originalState;
- String tableName;
-
- public MaxValueResultSetRowCollector(String tableName, Map<String, String> stateMap, DatabaseAdapter dbAdapter) {
- this.dbAdapter = dbAdapter;
- this.originalState = stateMap;
-
- this.newColMap = new HashMap<>();
- this.newColMap.putAll(stateMap);
-
- this.tableName = tableName;
- }
-
- @Override
- public void processRow(ResultSet resultSet) throws IOException {
- if (resultSet == null) {
- return;
- }
- try {
- // Iterate over the row, check-and-set max values
- final ResultSetMetaData meta = resultSet.getMetaData();
- final int nrOfColumns = meta.getColumnCount();
- if (nrOfColumns > 0) {
- for (int i = 1; i <= nrOfColumns; i++) {
- String colName = meta.getColumnName(i).toLowerCase();
- String fullyQualifiedMaxValueKey = getStateKey(tableName, colName, dbAdapter);
- Integer type = columnTypeMap.get(fullyQualifiedMaxValueKey);
- // Skip any columns we're not keeping track of or whose value is null
- if (type == null || resultSet.getObject(i) == null) {
- continue;
- }
- String maxValueString = newColMap.get(fullyQualifiedMaxValueKey);
- // If we can't find the value at the fully-qualified key name, it is possible (under a previous scheme)
- // the value has been stored under a key that is only the column name. Fall back to check the column name; either way, when a new
- // maximum value is observed, it will be stored under the fully-qualified key from then on.
- if (StringUtils.isEmpty(maxValueString)) {
- maxValueString = newColMap.get(colName);
- }
- String newMaxValueString = getMaxValueFromRow(resultSet, i, type, maxValueString, dbAdapter.getName());
- if (newMaxValueString != null) {
- newColMap.put(fullyQualifiedMaxValueKey, newMaxValueString);
- }
- }
- }
- } catch (ParseException | SQLException e) {
- throw new IOException(e);
- }
- }
-
- @Override
- public void applyStateChanges() {
- this.originalState.putAll(this.newColMap);
- }
+ return new DefaultAvroSqlWriter(options);
}
}
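
For reference, a minimal sketch of how the Avro path above can be exercised on its own, assuming a ResultSet, OutputStream, and ComponentLog are already in hand. The class and method names below are illustrative only and are not part of this commit:

    import org.apache.nifi.logging.ComponentLog;
    import org.apache.nifi.processors.standard.sql.DefaultAvroSqlWriter;
    import org.apache.nifi.processors.standard.sql.SqlWriter;
    import org.apache.nifi.processors.standard.util.JdbcCommon;

    import java.io.OutputStream;
    import java.sql.ResultSet;

    public class AvroSqlWriterSketch {
        // Builds the same conversion options configureSqlWriter() builds above and streams
        // the ResultSet to Avro. The recordName and maxRows values are placeholders.
        static long writeAvro(final ResultSet resultSet, final OutputStream out, final ComponentLog logger) throws Exception {
            final JdbcCommon.AvroConversionOptions options = JdbcCommon.AvroConversionOptions.builder()
                    .recordName("mytable")   // illustrative record/table name
                    .convertNames(true)
                    .useLogicalTypes(true)
                    .maxRows(0)              // 0 = write all rows in one pass
                    .build();
            final SqlWriter writer = new DefaultAvroSqlWriter(options);
            // The processors pass a MaxValueResultSetRowCollector here; null skips max-value tracking.
            return writer.writeResultSet(resultSet, out, logger, null);
        }
    }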
http://git-wip-us.apache.org/repos/asf/nifi/blob/c6572f04/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecord.java
new file mode 100644
index 0000000..ea89256
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecord.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processors.standard.sql.RecordSqlWriter;
+import org.apache.nifi.processors.standard.sql.SqlWriter;
+import org.apache.nifi.processors.standard.util.JdbcCommon;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.nifi.processors.standard.util.JdbcCommon.USE_AVRO_LOGICAL_TYPES;
+
+
+@TriggerSerially
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
+@Tags({"sql", "select", "jdbc", "query", "database", "record"})
+@SeeAlso({GenerateTableFetch.class, ExecuteSQL.class})
+@CapabilityDescription("Generates a SQL select query, or uses a provided statement, and executes it to fetch all rows whose values in the specified "
+ + "Maximum Value column(s) are larger than the "
+ + "previously-seen maxima. Query result will be converted to the format specified by the record writer. Expression Language is supported for several properties, but no incoming "
+ + "connections are permitted. The Variable Registry may be used to provide values for any property containing Expression Language. If it is desired to "
+ + "leverage flow file attributes to perform these queries, the GenerateTableFetch and/or ExecuteSQL processors can be used for this purpose. "
+ + "Streaming is used so arbitrarily large result sets are supported. This processor can be scheduled to run on "
+ + "a timer or cron expression, using the standard scheduling methods. This processor is intended to be run on the Primary Node only. FlowFile attribute "
+ + "'querydbtable.row.count' indicates how many rows were selected.")
+@Stateful(scopes = Scope.CLUSTER, description = "After performing a query on the specified table, the maximum values for "
+ + "the specified column(s) will be retained for use in future executions of the query. This allows the Processor "
+ + "to fetch only those records that have max values greater than the retained values. This can be used for "
+ + "incremental fetching, fetching of newly added rows, etc. To clear the maximum values, clear the state of the processor "
+ + "per the State Management documentation")
+@WritesAttributes({
+ @WritesAttribute(attribute = "tablename", description="Name of the table being queried"),
+ @WritesAttribute(attribute = "querydbtable.row.count", description="The number of rows selected by the query"),
+ @WritesAttribute(attribute="fragment.identifier", description="If 'Max Rows Per Flow File' is set then all FlowFiles from the same query result set "
+ + "will have the same value for the fragment.identifier attribute. This can then be used to correlate the results."),
+ @WritesAttribute(attribute = "fragment.count", description = "If 'Max Rows Per Flow File' is set then this is the total number of "
+ + "FlowFiles produced by a single ResultSet. This can be used in conjunction with the "
+ + "fragment.identifier attribute in order to know how many FlowFiles belonged to the same incoming ResultSet. If Output Batch Size is set, then this "
+ + "attribute will not be populated."),
+ @WritesAttribute(attribute="fragment.index", description="If 'Max Rows Per Flow File' is set then the position of this FlowFile in the list of "
+ + "outgoing FlowFiles that were all derived from the same result set FlowFile. This can be "
+ + "used in conjunction with the fragment.identifier attribute to know which FlowFiles originated from the same query result set and in what order "
+ + "FlowFiles were produced"),
+ @WritesAttribute(attribute = "maxvalue.*", description = "Each attribute contains the observed maximum value of a specified 'Maximum-value Column'. The "
+ + "suffix of the attribute is the name of the column. If Output Batch Size is set, then this attribute will not be populated."),
+ @WritesAttribute(attribute = "mime.type", description = "Sets the mime.type attribute to the MIME Type specified by the Record Writer."),
+ @WritesAttribute(attribute = "record.count", description = "The number of records output by the Record Writer.")
+})
+@DynamicProperty(name = "initial.maxvalue.<max_value_column>", value = "Initial maximum value for the specified column",
+ expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY, description = "Specifies an initial max value for max value column(s). Properties should "
+ + "be added in the format `initial.maxvalue.<max_value_column>`. This value is only used the first time the table is accessed (when a Maximum Value Column is specified).")
+public class QueryDatabaseTableRecord extends AbstractQueryDatabaseTable {
+
+ public static final PropertyDescriptor RECORD_WRITER_FACTORY = new PropertyDescriptor.Builder()
+ .name("qdbtr-record-writer")
+ .displayName("Record Writer")
+ .description("Specifies the Controller Service to use for writing results to a FlowFile. The Record Writer may use Inherit Schema to emulate the inferred schema behavior, i.e. "
+ + "an explicit schema need not be defined in the writer, and will be supplied by the same logic used to infer the schema from the column types.")
+ .identifiesControllerService(RecordSetWriterFactory.class)
+ .required(true)
+ .build();
+
+ public static final PropertyDescriptor NORMALIZE_NAMES = new PropertyDescriptor.Builder()
+ .name("qdbtr-normalize")
+ .displayName("Normalize Table/Column Names")
+ .description("Whether to change characters in column names when creating the output schema. For example, colons and periods will be changed to underscores.")
+ .allowableValues("true", "false")
+ .defaultValue("false")
+ .required(true)
+ .build();
+
+ public QueryDatabaseTableRecord() {
+ final Set<Relationship> r = new HashSet<>();
+ r.add(REL_SUCCESS);
+ relationships = Collections.unmodifiableSet(r);
+
+ final List<PropertyDescriptor> pds = new ArrayList<>();
+ pds.add(DBCP_SERVICE);
+ pds.add(DB_TYPE);
+ pds.add(new PropertyDescriptor.Builder()
+ .fromPropertyDescriptor(TABLE_NAME)
+ .description("The name of the database table to be queried. When a custom query is used, this property is used to alias the query and appears as an attribute on the FlowFile.")
+ .build());
+ pds.add(COLUMN_NAMES);
+ pds.add(WHERE_CLAUSE);
+ pds.add(SQL_QUERY);
+ pds.add(RECORD_WRITER_FACTORY);
+ pds.add(MAX_VALUE_COLUMN_NAMES);
+ pds.add(QUERY_TIMEOUT);
+ pds.add(FETCH_SIZE);
+ pds.add(MAX_ROWS_PER_FLOW_FILE);
+ pds.add(OUTPUT_BATCH_SIZE);
+ pds.add(MAX_FRAGMENTS);
+ pds.add(NORMALIZE_NAMES);
+ pds.add(USE_AVRO_LOGICAL_TYPES);
+
+ propDescriptors = Collections.unmodifiableList(pds);
+ }
+
+ @Override
+ protected SqlWriter configureSqlWriter(ProcessSession session, ProcessContext context) {
+ final Integer maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger();
+ final boolean convertNamesForAvro = context.getProperty(NORMALIZE_NAMES).asBoolean();
+ final Boolean useAvroLogicalTypes = context.getProperty(USE_AVRO_LOGICAL_TYPES).asBoolean();
+ final JdbcCommon.AvroConversionOptions options = JdbcCommon.AvroConversionOptions.builder()
+ .convertNames(convertNamesForAvro)
+ .useLogicalTypes(useAvroLogicalTypes)
+ .build();
+ final RecordSetWriterFactory recordSetWriterFactory = context.getProperty(RECORD_WRITER_FACTORY).asControllerService(RecordSetWriterFactory.class);
+
+ return new RecordSqlWriter(recordSetWriterFactory, options, maxRowsPerFlowFile, Collections.emptyMap());
+ }
+}
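
A rough sketch of wiring the new processor up with the standard NiFi test utilities; the table name and max-value column are placeholders, and the DBCP_SERVICE registration (database-specific) is omitted:

    import org.apache.nifi.processors.standard.QueryDatabaseTableRecord;
    import org.apache.nifi.reporting.InitializationException;
    import org.apache.nifi.serialization.record.MockRecordWriter;
    import org.apache.nifi.util.TestRunner;
    import org.apache.nifi.util.TestRunners;

    public class QueryDatabaseTableRecordSketch {
        // The DBCP_SERVICE property must also point at a registered DBCPService before running;
        // that part depends on the database under test and is left out here.
        public static TestRunner configureRunner() throws InitializationException {
            final TestRunner runner = TestRunners.newTestRunner(QueryDatabaseTableRecord.class);

            final MockRecordWriter recordWriter = new MockRecordWriter(null, true);
            runner.addControllerService("writer", recordWriter);
            runner.enableControllerService(recordWriter);

            runner.setProperty(QueryDatabaseTableRecord.RECORD_WRITER_FACTORY, "writer");
            runner.setProperty(QueryDatabaseTableRecord.TABLE_NAME, "TEST_QUERY_DB_TABLE"); // illustrative table
            runner.setProperty(QueryDatabaseTableRecord.MAX_VALUE_COLUMN_NAMES, "id");      // illustrative column
            return runner;
        }
    }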
http://git-wip-us.apache.org/repos/asf/nifi/blob/c6572f04/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/DefaultAvroSqlWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/DefaultAvroSqlWriter.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/DefaultAvroSqlWriter.java
new file mode 100644
index 0000000..574aca7
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/DefaultAvroSqlWriter.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.sql;
+
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.standard.AbstractQueryDatabaseTable;
+import org.apache.nifi.processors.standard.util.JdbcCommon;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
+
+public class DefaultAvroSqlWriter implements SqlWriter {
+
+ private final JdbcCommon.AvroConversionOptions options;
+
+ private final Map<String,String> attributesToAdd = new HashMap<String,String>() {{
+ put(CoreAttributes.MIME_TYPE.key(), JdbcCommon.MIME_TYPE_AVRO_BINARY);
+ }};
+
+ public DefaultAvroSqlWriter(JdbcCommon.AvroConversionOptions options) {
+ this.options = options;
+ }
+
+ @Override
+ public long writeResultSet(ResultSet resultSet, OutputStream outputStream, ComponentLog logger, AbstractQueryDatabaseTable.MaxValueResultSetRowCollector callback) throws Exception {
+ try {
+ return JdbcCommon.convertToAvroStream(resultSet, outputStream, options, callback);
+ } catch (SQLException e) {
+ throw new ProcessException(e);
+ }
+ }
+
+ @Override
+ public Map<String, String> getAttributesToAdd() {
+ return attributesToAdd;
+ }
+
+ @Override
+ public void writeEmptyResultSet(OutputStream outputStream, ComponentLog logger) throws IOException {
+ JdbcCommon.createEmptyAvroStream(outputStream);
+ }
+
+ @Override
+ public String getMimeType() {
+ return JdbcCommon.MIME_TYPE_AVRO_BINARY;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi/blob/c6572f04/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/RecordSqlWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/RecordSqlWriter.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/RecordSqlWriter.java
new file mode 100644
index 0000000..c1a76b4
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/RecordSqlWriter.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.sql;
+
+import org.apache.avro.Schema;
+import org.apache.nifi.avro.AvroTypeUtil;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.standard.AbstractQueryDatabaseTable;
+import org.apache.nifi.processors.standard.util.JdbcCommon;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.serialization.record.ResultSetRecordSet;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class RecordSqlWriter implements SqlWriter {
+
+ private final RecordSetWriterFactory recordSetWriterFactory;
+ private final AtomicReference<WriteResult> writeResultRef;
+ private final JdbcCommon.AvroConversionOptions options;
+ private final int maxRowsPerFlowFile;
+ private final Map<String, String> originalAttributes;
+ private ResultSetRecordSet fullRecordSet;
+ private RecordSchema writeSchema;
+ private String mimeType;
+
+ public RecordSqlWriter(RecordSetWriterFactory recordSetWriterFactory, JdbcCommon.AvroConversionOptions options, int maxRowsPerFlowFile, Map<String, String> originalAttributes) {
+ this.recordSetWriterFactory = recordSetWriterFactory;
+ this.writeResultRef = new AtomicReference<>();
+ this.maxRowsPerFlowFile = maxRowsPerFlowFile;
+ this.options = options;
+ this.originalAttributes = originalAttributes;
+ }
+
+ @Override
+ public long writeResultSet(ResultSet resultSet, OutputStream outputStream, ComponentLog logger, AbstractQueryDatabaseTable.MaxValueResultSetRowCollector callback) throws Exception {
+ final RecordSet recordSet;
+ try {
+ if (fullRecordSet == null) {
+ final Schema avroSchema = JdbcCommon.createSchema(resultSet, options);
+ final RecordSchema recordAvroSchema = AvroTypeUtil.createSchema(avroSchema);
+ fullRecordSet = new ResultSetRecordSetWithCallback(resultSet, recordAvroSchema, callback);
+ writeSchema = recordSetWriterFactory.getSchema(originalAttributes, fullRecordSet.getSchema());
+ }
+ recordSet = (maxRowsPerFlowFile > 0) ? fullRecordSet.limit(maxRowsPerFlowFile) : fullRecordSet;
+
+ } catch (final SQLException | SchemaNotFoundException | IOException e) {
+ throw new ProcessException(e);
+ }
+ try (final RecordSetWriter resultSetWriter = recordSetWriterFactory.createWriter(logger, writeSchema, outputStream)) {
+ writeResultRef.set(resultSetWriter.write(recordSet));
+ if (mimeType == null) {
+ mimeType = resultSetWriter.getMimeType();
+ }
+ return writeResultRef.get().getRecordCount();
+ } catch (final Exception e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public Map<String, String> getAttributesToAdd() {
+ Map<String, String> attributesToAdd = new HashMap<>();
+ attributesToAdd.put(CoreAttributes.MIME_TYPE.key(), mimeType);
+
+ // Add any attributes from the record writer (if present)
+ final WriteResult result = writeResultRef.get();
+ if (result != null) {
+ if (result.getAttributes() != null) {
+ attributesToAdd.putAll(result.getAttributes());
+ }
+
+ attributesToAdd.put("record.count", String.valueOf(result.getRecordCount()));
+ }
+ return attributesToAdd;
+ }
+
+ @Override
+ public void updateCounters(ProcessSession session) {
+ final WriteResult result = writeResultRef.get();
+ if (result != null) {
+ session.adjustCounter("Records Written", result.getRecordCount(), false);
+ }
+ }
+
+ @Override
+ public void writeEmptyResultSet(OutputStream outputStream, ComponentLog logger) throws IOException {
+ try (final RecordSetWriter resultSetWriter = recordSetWriterFactory.createWriter(logger, writeSchema, outputStream)) {
+ mimeType = resultSetWriter.getMimeType();
+ resultSetWriter.beginRecordSet();
+ resultSetWriter.finishRecordSet();
+ } catch (final Exception e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public String getMimeType() {
+ return mimeType;
+ }
+
+ private static class ResultSetRecordSetWithCallback extends ResultSetRecordSet {
+
+ private final AbstractQueryDatabaseTable.MaxValueResultSetRowCollector callback;
+
+ ResultSetRecordSetWithCallback(ResultSet rs, RecordSchema readerSchema, AbstractQueryDatabaseTable.MaxValueResultSetRowCollector callback) throws SQLException {
+ super(rs, readerSchema);
+ this.callback = callback;
+ }
+
+ @Override
+ public Record next() throws IOException {
+ try {
+ if (hasMoreRows()) {
+ ResultSet rs = getResultSet();
+ final Record record = createRecord(rs);
+ if (callback != null) {
+ callback.processRow(rs);
+ }
+ setMoreRows(rs.next());
+ return record;
+ } else {
+ return null;
+ }
+ } catch (final SQLException e) {
+ throw new IOException("Could not obtain next record from ResultSet", e);
+ }
+ }
+ }
+}
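
A sketch of how RecordSqlWriter's per-FlowFile row limit behaves when driven outside a processor. The chunking loop mirrors the fragment loop in AbstractExecuteSQL below; nextOutput and the maxRows value are illustrative:

    import org.apache.nifi.logging.ComponentLog;
    import org.apache.nifi.processors.standard.sql.RecordSqlWriter;
    import org.apache.nifi.processors.standard.util.JdbcCommon;
    import org.apache.nifi.serialization.RecordSetWriterFactory;

    import java.io.OutputStream;
    import java.sql.ResultSet;
    import java.util.Collections;
    import java.util.function.Supplier;

    public class RecordSqlWriterSketch {
        // Streams a ResultSet in chunks of up to maxRows records; nextOutput stands in for
        // "open the OutputStream of the next FlowFile".
        static void writeInChunks(final RecordSetWriterFactory factory, final ResultSet rs,
                                  final ComponentLog logger, final Supplier<OutputStream> nextOutput) throws Exception {
            final int maxRows = 1000; // illustrative per-FlowFile limit
            final JdbcCommon.AvroConversionOptions options = JdbcCommon.AvroConversionOptions.builder()
                    .convertNames(true)
                    .useLogicalTypes(true)
                    .build();
            // The writer keeps its ResultSetRecordSet across calls, so each call continues where the last stopped.
            final RecordSqlWriter writer = new RecordSqlWriter(factory, options, maxRows, Collections.emptyMap());

            long written;
            do {
                try (OutputStream out = nextOutput.get()) {
                    written = writer.writeResultSet(rs, out, logger, null);
                }
            } while (written == maxRows); // same stop condition the fragment loop in AbstractExecuteSQL uses
        }
    }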
http://git-wip-us.apache.org/repos/asf/nifi/blob/c6572f04/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/SqlWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/SqlWriter.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/SqlWriter.java
new file mode 100644
index 0000000..08fc3fd
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/SqlWriter.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.sql;
+
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processors.standard.AbstractQueryDatabaseTable;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.sql.ResultSet;
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * The SqlWriter interface provides a standard way for processors such as ExecuteSQL, ExecuteSQLRecord, QueryDatabaseTable, and QueryDatabaseTableRecord
+ * to write SQL result sets out to a flow file in whichever manner is appropriate. For example, ExecuteSQL writes the result set as Avro but ExecuteSQLRecord
+ * uses the Record API to write the result set out as prescribed by the selected RecordSetWriter.
+ */
+public interface SqlWriter {
+
+ /**
+ * Writes the given result set out to the given output stream, possibly applying a callback as each row is processed.
+ * @param resultSet the ResultSet to be written
+ * @param outputStream the OutputStream to write the result set to
+ * @param logger a common logger that can be used to log messages during write
+ * @param callback a MaxValueResultSetRowCollector that may be called as each row in the ResultSet is processed
+ * @return the number of rows written to the output stream
+ * @throws Exception if any errors occur during the writing of the result set to the output stream
+ */
+ long writeResultSet(ResultSet resultSet, OutputStream outputStream, ComponentLog logger, AbstractQueryDatabaseTable.MaxValueResultSetRowCollector callback) throws Exception;
+
+ /**
+ * Returns a map of attribute key/value pairs to be added to any outgoing flow file(s). The default implementation is to return an empty map.
+ * @return a map of attribute key/value pairs
+ */
+ default Map<String, String> getAttributesToAdd() {
+ return Collections.emptyMap();
+ }
+
+ /**
+ * Updates any session counters as a result of processing result sets. The default implementation is empty; no counters will be updated.
+ * @param session the session upon which to update counters
+ */
+ default void updateCounters(ProcessSession session) {
+ }
+
+ /**
+ * Writes an empty result set to the output stream. In some cases a ResultSet might not have any viable rows, but may throw an error or
+ * behave unexpectedly if an attempt is made to retrieve rows. This method indicates that the implementation should write whatever output is
+ * appropriate for a result set with no rows.
+ * @param outputStream the OutputStream to write the empty result set to
+ * @param logger a common logger that can be used to log messages during write
+ * @throws IOException if any errors occur during the writing of an empty result set to the output stream
+ */
+ void writeEmptyResultSet(OutputStream outputStream, ComponentLog logger) throws IOException;
+
+ /**
+ * Returns the MIME type of the output format. This can be used in FlowFile attributes or to perform format-specific processing as necessary.
+ * @return the MIME type string of the output format.
+ */
+ String getMimeType();
+}
\ No newline at end of file
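
To illustrate the extension point, a hypothetical SqlWriter implementation that emits pipe-delimited text. It is not part of this commit, ignores any per-FlowFile row limit (unlike DefaultAvroSqlWriter, which passes maxRows through its Avro options), and exists only to show the shape of the contract:

    package org.apache.nifi.processors.standard.sql;

    import org.apache.nifi.logging.ComponentLog;
    import org.apache.nifi.processors.standard.AbstractQueryDatabaseTable;

    import java.io.IOException;
    import java.io.OutputStream;
    import java.nio.charset.StandardCharsets;
    import java.sql.ResultSet;
    import java.sql.ResultSetMetaData;

    public class DelimitedSqlWriter implements SqlWriter {

        @Override
        public long writeResultSet(final ResultSet rs, final OutputStream out, final ComponentLog logger,
                                   final AbstractQueryDatabaseTable.MaxValueResultSetRowCollector callback) throws Exception {
            final ResultSetMetaData meta = rs.getMetaData();
            final int columnCount = meta.getColumnCount();
            long rows = 0;
            while (rs.next()) {
                final StringBuilder line = new StringBuilder();
                for (int i = 1; i <= columnCount; i++) {
                    if (i > 1) {
                        line.append('|');
                    }
                    final Object value = rs.getObject(i);
                    line.append(value == null ? "" : value.toString());
                }
                line.append('\n');
                out.write(line.toString().getBytes(StandardCharsets.UTF_8));
                if (callback != null) {
                    callback.processRow(rs); // lets QueryDatabaseTable-style processors track max values
                }
                rows++;
            }
            return rows;
        }

        @Override
        public void writeEmptyResultSet(final OutputStream out, final ComponentLog logger) throws IOException {
            // Nothing meaningful to write for an empty pipe-delimited result set.
        }

        @Override
        public String getMimeType() {
            return "text/plain";
        }
    }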
http://git-wip-us.apache.org/repos/asf/nifi/blob/c6572f04/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index d21b7f4..bfe1403 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -35,6 +35,7 @@ org.apache.nifi.processors.standard.EvaluateXPath
org.apache.nifi.processors.standard.EvaluateXQuery
org.apache.nifi.processors.standard.ExecuteProcess
org.apache.nifi.processors.standard.ExecuteSQL
+org.apache.nifi.processors.standard.ExecuteSQLRecord
org.apache.nifi.processors.standard.ExecuteStreamCommand
org.apache.nifi.processors.standard.ExtractGrok
org.apache.nifi.processors.standard.ExtractText
@@ -96,6 +97,7 @@ org.apache.nifi.processors.standard.PutSyslog
org.apache.nifi.processors.standard.PutTCP
org.apache.nifi.processors.standard.PutUDP
org.apache.nifi.processors.standard.QueryDatabaseTable
+org.apache.nifi.processors.standard.QueryDatabaseTableRecord
org.apache.nifi.processors.standard.QueryRecord
org.apache.nifi.processors.standard.ReplaceText
org.apache.nifi.processors.standard.ReplaceTextWithMapping
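
The file above follows the standard java.util.ServiceLoader format: one fully-qualified Processor implementation per line. A rough illustration of that contract, noting that NiFi's actual discovery goes through its NAR class loaders rather than a plain classpath scan:

    import org.apache.nifi.processor.Processor;

    import java.util.ServiceLoader;

    public class ListStandardProcessors {
        public static void main(final String[] args) {
            // ServiceLoader reads META-INF/services/org.apache.nifi.processor.Processor from the
            // classpath and instantiates each class listed there.
            for (final Processor processor : ServiceLoader.load(Processor.class)) {
                System.out.println(processor.getClass().getName());
            }
        }
    }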
[4/4] nifi git commit: NIFI-4517: Added ExecuteSQLRecord and
QueryDatabaseTableRecord processors
Posted by pv...@apache.org.
NIFI-4517: Added ExecuteSQLRecord and QueryDatabaseTableRecord processors
Signed-off-by: Pierre Villard <pi...@gmail.com>
This closes #2945.
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/c6572f04
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/c6572f04
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/c6572f04
Branch: refs/heads/master
Commit: c6572f042bf1637f6faaa2b2ffe4a56e297c6d1a
Parents: b4810b8
Author: Matthew Burgess <ma...@apache.org>
Authored: Fri Aug 10 16:49:25 2018 -0400
Committer: Pierre Villard <pi...@gmail.com>
Committed: Sat Oct 6 10:53:11 2018 +0200
----------------------------------------------------------------------
.../record/ResultSetRecordSet.java | 15 +-
.../processors/standard/AbstractExecuteSQL.java | 369 +++++
.../standard/AbstractQueryDatabaseTable.java | 483 +++++++
.../nifi/processors/standard/ExecuteSQL.java | 371 +----
.../processors/standard/ExecuteSQLRecord.java | 147 ++
.../processors/standard/QueryDatabaseTable.java | 453 +-----
.../standard/QueryDatabaseTableRecord.java | 148 ++
.../standard/sql/DefaultAvroSqlWriter.java | 67 +
.../standard/sql/RecordSqlWriter.java | 158 +++
.../nifi/processors/standard/sql/SqlWriter.java | 77 +
.../org.apache.nifi.processor.Processor | 2 +
.../standard/QueryDatabaseTableRecordTest.java | 1332 ++++++++++++++++++
.../standard/QueryDatabaseTableTest.java | 2 +-
.../processors/standard/TestExecuteSQL.java | 44 +-
.../standard/TestExecuteSQLRecord.java | 376 +++++
15 files changed, 3261 insertions(+), 783 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/c6572f04/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
index 551789c..bf7d224 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
@@ -63,6 +63,19 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
return schema;
}
+ // Protected methods for subclasses to access private member variables
+ protected ResultSet getResultSet() {
+ return rs;
+ }
+
+ protected boolean hasMoreRows() {
+ return moreRows;
+ }
+
+ protected void setMoreRows(boolean moreRows) {
+ this.moreRows = moreRows;
+ }
+
@Override
public Record next() throws IOException {
try {
@@ -87,7 +100,7 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
}
}
- private Record createRecord(final ResultSet rs) throws SQLException {
+ protected Record createRecord(final ResultSet rs) throws SQLException {
final Map<String, Object> values = new HashMap<>(schema.getFieldCount());
for (final RecordField field : schema.getFields()) {
http://git-wip-us.apache.org/repos/asf/nifi/blob/c6572f04/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java
new file mode 100644
index 0000000..bf46549
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java
@@ -0,0 +1,369 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.dbcp.DBCPService;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.flowfile.attributes.FragmentAttributes;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.standard.sql.SqlWriter;
+import org.apache.nifi.processors.standard.util.JdbcCommon;
+import org.apache.nifi.util.StopWatch;
+
+import java.nio.charset.Charset;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+
+public abstract class AbstractExecuteSQL extends AbstractProcessor {
+
+ public static final String RESULT_ROW_COUNT = "executesql.row.count";
+ public static final String RESULT_QUERY_DURATION = "executesql.query.duration";
+ public static final String RESULT_QUERY_EXECUTION_TIME = "executesql.query.executiontime";
+ public static final String RESULT_QUERY_FETCH_TIME = "executesql.query.fetchtime";
+ public static final String RESULTSET_INDEX = "executesql.resultset.index";
+
+ public static final String FRAGMENT_ID = FragmentAttributes.FRAGMENT_ID.key();
+ public static final String FRAGMENT_INDEX = FragmentAttributes.FRAGMENT_INDEX.key();
+ public static final String FRAGMENT_COUNT = FragmentAttributes.FRAGMENT_COUNT.key();
+
+ // Relationships
+ public static final Relationship REL_SUCCESS = new Relationship.Builder()
+ .name("success")
+ .description("Successfully created FlowFile from SQL query result set.")
+ .build();
+ public static final Relationship REL_FAILURE = new Relationship.Builder()
+ .name("failure")
+ .description("SQL query execution failed. Incoming FlowFile will be penalized and routed to this relationship")
+ .build();
+ protected Set<Relationship> relationships;
+
+ public static final PropertyDescriptor DBCP_SERVICE = new PropertyDescriptor.Builder()
+ .name("Database Connection Pooling Service")
+ .description("The Controller Service that is used to obtain connection to database")
+ .required(true)
+ .identifiesControllerService(DBCPService.class)
+ .build();
+
+ public static final PropertyDescriptor SQL_SELECT_QUERY = new PropertyDescriptor.Builder()
+ .name("SQL select query")
+ .description("The SQL select query to execute. The query can be empty, a constant value, or built from attributes "
+ + "using Expression Language. If this property is specified, it will be used regardless of the content of "
+ + "incoming flowfiles. If this property is empty, the content of the incoming flow file is expected "
+ + "to contain a valid SQL select query, to be issued by the processor to the database. Note that Expression "
+ + "Language is not evaluated for flow file contents.")
+ .required(false)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .build();
+
+ public static final PropertyDescriptor QUERY_TIMEOUT = new PropertyDescriptor.Builder()
+ .name("Max Wait Time")
+ .description("The maximum amount of time allowed for a running SQL select query "
+ + " , zero means there is no limit. Max time less than 1 second will be equal to zero.")
+ .defaultValue("0 seconds")
+ .required(true)
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .sensitive(false)
+ .build();
+
+ public static final PropertyDescriptor MAX_ROWS_PER_FLOW_FILE = new PropertyDescriptor.Builder()
+ .name("esql-max-rows")
+ .displayName("Max Rows Per Flow File")
+ .description("The maximum number of result rows that will be included in a single FlowFile. This will allow you to break up very large "
+ + "result sets into multiple FlowFiles. If the value specified is zero, then all rows are returned in a single FlowFile.")
+ .defaultValue("0")
+ .required(true)
+ .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .build();
+
+ public static final PropertyDescriptor OUTPUT_BATCH_SIZE = new PropertyDescriptor.Builder()
+ .name("esql-output-batch-size")
+ .displayName("Output Batch Size")
+ .description("The number of output FlowFiles to queue before committing the process session. When set to zero, the session will be committed when all result set rows "
+ + "have been processed and the output FlowFiles are ready for transfer to the downstream relationship. For large result sets, this can cause a large burst of FlowFiles "
+ + "to be transferred at the end of processor execution. If this property is set, then when the specified number of FlowFiles are ready for transfer, then the session will "
+ + "be committed, thus releasing the FlowFiles to the downstream relationship. NOTE: The fragment.count attribute will not be set on FlowFiles when this "
+ + "property is set.")
+ .defaultValue("0")
+ .required(true)
+ .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .build();
+
+ protected List<PropertyDescriptor> propDescriptors;
+
+ protected DBCPService dbcpService;
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return relationships;
+ }
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return propDescriptors;
+ }
+
+ @OnScheduled
+ public void setup(ProcessContext context) {
+ // If the query is not set, an incoming connection that supplies the query in FlowFile content is required; otherwise fail initialization
+ if (!context.getProperty(SQL_SELECT_QUERY).isSet() && !context.hasIncomingConnection()) {
+ final String errorString = "Either the Select Query must be specified or there must be an incoming connection "
+ + "providing flowfile(s) containing a SQL select query";
+ getLogger().error(errorString);
+ throw new ProcessException(errorString);
+ }
+ dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
+
+ }
+
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+ FlowFile fileToProcess = null;
+ if (context.hasIncomingConnection()) {
+ fileToProcess = session.get();
+
+ // If we have no FlowFile, and all incoming connections are self-loops then we can continue on.
+ // However, if we have no FlowFile and we have connections coming from other Processors, then
+ // we know that we should run only if we have a FlowFile.
+ if (fileToProcess == null && context.hasNonLoopConnection()) {
+ return;
+ }
+ }
+
+ final List<FlowFile> resultSetFlowFiles = new ArrayList<>();
+
+ final ComponentLog logger = getLogger();
+ final Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue();
+ final Integer maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger();
+ final Integer outputBatchSizeField = context.getProperty(OUTPUT_BATCH_SIZE).evaluateAttributeExpressions().asInteger();
+ final int outputBatchSize = outputBatchSizeField == null ? 0 : outputBatchSizeField;
+
+ SqlWriter sqlWriter = configureSqlWriter(session, context, fileToProcess);
+
+ final String selectQuery;
+ if (context.getProperty(SQL_SELECT_QUERY).isSet()) {
+ selectQuery = context.getProperty(SQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue();
+ } else {
+ // If the query is not set, then an incoming flow file is required, and expected to contain a valid SQL select query.
+ // If there is no incoming connection, onTrigger will not be called as the processor will fail when scheduled.
+ final StringBuilder queryContents = new StringBuilder();
+ session.read(fileToProcess, in -> queryContents.append(IOUtils.toString(in, Charset.defaultCharset())));
+ selectQuery = queryContents.toString();
+ }
+
+ int resultCount = 0;
+ try (final Connection con = dbcpService.getConnection(fileToProcess == null ? Collections.emptyMap() : fileToProcess.getAttributes());
+ final PreparedStatement st = con.prepareStatement(selectQuery)) {
+ st.setQueryTimeout(queryTimeout); // timeout in seconds
+
+ if (fileToProcess != null) {
+ JdbcCommon.setParameters(st, fileToProcess.getAttributes());
+ }
+ logger.debug("Executing query {}", new Object[]{selectQuery});
+
+ int fragmentIndex = 0;
+ final String fragmentId = UUID.randomUUID().toString();
+
+ final StopWatch executionTime = new StopWatch(true);
+
+ boolean hasResults = st.execute();
+
+ long executionTimeElapsed = executionTime.getElapsed(TimeUnit.MILLISECONDS);
+
+ boolean hasUpdateCount = st.getUpdateCount() != -1;
+
+ while (hasResults || hasUpdateCount) {
+ //getMoreResults() and execute() return false to indicate that the result of the statement is just a number and not a ResultSet
+ if (hasResults) {
+ final AtomicLong nrOfRows = new AtomicLong(0L);
+
+ try {
+ final ResultSet resultSet = st.getResultSet();
+ do {
+ final StopWatch fetchTime = new StopWatch(true);
+
+ FlowFile resultSetFF;
+ if (fileToProcess == null) {
+ resultSetFF = session.create();
+ } else {
+ resultSetFF = session.create(fileToProcess);
+ resultSetFF = session.putAllAttributes(resultSetFF, fileToProcess.getAttributes());
+ }
+
+ try {
+ resultSetFF = session.write(resultSetFF, out -> {
+ try {
+ nrOfRows.set(sqlWriter.writeResultSet(resultSet, out, getLogger(), null));
+ } catch (Exception e) {
+ throw (e instanceof ProcessException) ? (ProcessException) e : new ProcessException(e);
+ }
+ });
+
+ long fetchTimeElapsed = fetchTime.getElapsed(TimeUnit.MILLISECONDS);
+
+ // set attributes
+ final Map<String, String> attributesToAdd = new HashMap<>();
+ attributesToAdd.put(RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
+ attributesToAdd.put(RESULT_QUERY_DURATION, String.valueOf(executionTimeElapsed + fetchTimeElapsed));
+ attributesToAdd.put(RESULT_QUERY_EXECUTION_TIME, String.valueOf(executionTimeElapsed));
+ attributesToAdd.put(RESULT_QUERY_FETCH_TIME, String.valueOf(fetchTimeElapsed));
+ attributesToAdd.put(RESULTSET_INDEX, String.valueOf(resultCount));
+ attributesToAdd.putAll(sqlWriter.getAttributesToAdd());
+ resultSetFF = session.putAllAttributes(resultSetFF, attributesToAdd);
+ sqlWriter.updateCounters(session);
+
+ // if fragmented ResultSet, determine if we should keep this fragment; set fragment attributes
+ if (maxRowsPerFlowFile > 0) {
+ // if row count is zero and this is not the first fragment, drop it instead of committing it.
+ if (nrOfRows.get() == 0 && fragmentIndex > 0) {
+ session.remove(resultSetFF);
+ break;
+ }
+
+ resultSetFF = session.putAttribute(resultSetFF, FRAGMENT_ID, fragmentId);
+ resultSetFF = session.putAttribute(resultSetFF, FRAGMENT_INDEX, String.valueOf(fragmentIndex));
+ }
+
+ logger.info("{} contains {} records; transferring to 'success'",
+ new Object[]{resultSetFF, nrOfRows.get()});
+ // Report a FETCH event if there was an incoming flow file, or a RECEIVE event otherwise
+ if(context.hasIncomingConnection()) {
+ session.getProvenanceReporter().fetch(resultSetFF, "Retrieved " + nrOfRows.get() + " rows", executionTimeElapsed + fetchTimeElapsed);
+ } else {
+ session.getProvenanceReporter().receive(resultSetFF, "Retrieved " + nrOfRows.get() + " rows", executionTimeElapsed + fetchTimeElapsed);
+ }
+ resultSetFlowFiles.add(resultSetFF);
+
+ // If we've reached the batch size, send out the flow files
+ if (outputBatchSize > 0 && resultSetFlowFiles.size() >= outputBatchSize) {
+ session.transfer(resultSetFlowFiles, REL_SUCCESS);
+ session.commit();
+ resultSetFlowFiles.clear();
+ }
+
+ fragmentIndex++;
+ } catch (Exception e) {
+ // Remove the result set flow file and propagate the exception
+ session.remove(resultSetFF);
+ if (e instanceof ProcessException) {
+ throw (ProcessException) e;
+ } else {
+ throw new ProcessException(e);
+ }
+ }
+ } while (maxRowsPerFlowFile > 0 && nrOfRows.get() == maxRowsPerFlowFile);
+
+ // If we are splitting results but not outputting batches, set count on all FlowFiles
+ if (outputBatchSize == 0 && maxRowsPerFlowFile > 0) {
+ for (int i = 0; i < resultSetFlowFiles.size(); i++) {
+ resultSetFlowFiles.set(i,
+ session.putAttribute(resultSetFlowFiles.get(i), FRAGMENT_COUNT, Integer.toString(fragmentIndex)));
+ }
+ }
+ } catch (final SQLException e) {
+ throw new ProcessException(e);
+ }
+
+ resultCount++;
+ }
+
+ // are there any more result sets?
+ try {
+ hasResults = st.getMoreResults(Statement.CLOSE_CURRENT_RESULT);
+ hasUpdateCount = st.getUpdateCount() != -1;
+ } catch (SQLException ex) {
+ hasResults = false;
+ hasUpdateCount = false;
+ }
+ }
+
+ // Transfer any remaining files to SUCCESS
+ session.transfer(resultSetFlowFiles, REL_SUCCESS);
+ resultSetFlowFiles.clear();
+
+ //If we had at least one result then it's OK to drop the original file, but if we had no results then
+ // pass the original flow file down the line to trigger downstream processors
+ if (fileToProcess != null) {
+ if (resultCount > 0) {
+ session.remove(fileToProcess);
+ } else {
+ fileToProcess = session.write(fileToProcess, out -> sqlWriter.writeEmptyResultSet(out, getLogger()));
+ fileToProcess = session.putAttribute(fileToProcess, RESULT_ROW_COUNT, "0");
+ fileToProcess = session.putAttribute(fileToProcess, CoreAttributes.MIME_TYPE.key(), sqlWriter.getMimeType());
+ session.transfer(fileToProcess, REL_SUCCESS);
+ }
+ } else if (resultCount == 0) {
+ // If we had no inbound FlowFile, no exceptions, and the SQL generated no result sets (INSERT/UPDATE/DELETE statements only),
+ // then generate an empty output FlowFile
+ FlowFile resultSetFF = session.create();
+
+ resultSetFF = session.write(resultSetFF, out -> sqlWriter.writeEmptyResultSet(out, getLogger()));
+ resultSetFF = session.putAttribute(resultSetFF, RESULT_ROW_COUNT, "0");
+ resultSetFF = session.putAttribute(resultSetFF, CoreAttributes.MIME_TYPE.key(), sqlWriter.getMimeType());
+ session.transfer(resultSetFF, REL_SUCCESS);
+ }
+ } catch (final ProcessException | SQLException e) {
+ // If there is no FlowFile to route to failure (the error occurred before one was obtained), just log and yield;
+ // otherwise penalize the FlowFile when it came from an incoming connection and route it to failure
+ if (fileToProcess == null) {
+ // This can happen if any exceptions occur while setting up the connection, statement, etc.
+ logger.error("Unable to execute SQL select query {} due to {}. No FlowFile to route to failure",
+ new Object[]{selectQuery, e});
+ context.yield();
+ } else {
+ if (context.hasIncomingConnection()) {
+ logger.error("Unable to execute SQL select query {} for {} due to {}; routing to failure",
+ new Object[]{selectQuery, fileToProcess, e});
+ fileToProcess = session.penalize(fileToProcess);
+ } else {
+ logger.error("Unable to execute SQL select query {} due to {}; routing to failure",
+ new Object[]{selectQuery, e});
+ context.yield();
+ }
+ session.transfer(fileToProcess, REL_FAILURE);
+ }
+ }
+ }
+
+ protected abstract SqlWriter configureSqlWriter(ProcessSession session, ProcessContext context, FlowFile fileToProcess);
+}
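
A note on the fragment attributes written above: when 'Max Rows Per Flow File' is greater than zero, every FlowFile produced from one result set shares the same fragment.identifier, carries its position in fragment.index, and (only when Output Batch Size is zero) the total in fragment.count. The plain-Java sketch below is illustrative only and not part of this commit; the attribute maps stand in for real FlowFile attributes. It shows how a downstream consumer could regroup and order a split result set using nothing but those attributes.

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class FragmentRegroupSketch {
        // fragments: one attribute map per FlowFile emitted by AbstractExecuteSQL (hypothetical input)
        public static Map<String, List<Map<String, String>>> groupByResultSet(final List<Map<String, String>> fragments) {
            final Map<String, List<Map<String, String>>> byId = new HashMap<>();
            for (final Map<String, String> attrs : fragments) {
                // fragment.identifier is shared by every FlowFile from the same result set
                byId.computeIfAbsent(attrs.get("fragment.identifier"), k -> new ArrayList<>()).add(attrs);
            }
            for (final List<Map<String, String>> group : byId.values()) {
                // fragment.index orders the FlowFiles within their result set
                group.sort(Comparator.comparingInt((Map<String, String> a) -> Integer.parseInt(a.get("fragment.index"))));
                // fragment.count is absent when Output Batch Size is set, so treat it as optional
                final String expected = group.get(0).get("fragment.count");
                if (expected != null && Integer.parseInt(expected) != group.size()) {
                    throw new IllegalStateException("Incomplete result set: " + group.get(0).get("fragment.identifier"));
                }
            }
            return byId;
        }
    }
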
http://git-wip-us.apache.org/repos/asf/nifi/blob/c6572f04/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractQueryDatabaseTable.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractQueryDatabaseTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractQueryDatabaseTable.java
new file mode 100644
index 0000000..06df6c1
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractQueryDatabaseTable.java
@@ -0,0 +1,483 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateManager;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.dbcp.DBCPService;
+import org.apache.nifi.expression.AttributeExpression;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.FragmentAttributes;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.standard.db.DatabaseAdapter;
+import org.apache.nifi.processors.standard.sql.SqlWriter;
+import org.apache.nifi.processors.standard.util.JdbcCommon;
+import org.apache.nifi.util.StopWatch;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.text.ParseException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.IntStream;
+
+
+public abstract class AbstractQueryDatabaseTable extends AbstractDatabaseFetchProcessor {
+
+ public static final String RESULT_TABLENAME = "tablename";
+ public static final String RESULT_ROW_COUNT = "querydbtable.row.count";
+
+ public static final String FRAGMENT_ID = FragmentAttributes.FRAGMENT_ID.key();
+ public static final String FRAGMENT_INDEX = FragmentAttributes.FRAGMENT_INDEX.key();
+
+ public static final PropertyDescriptor FETCH_SIZE = new PropertyDescriptor.Builder()
+ .name("Fetch Size")
+ .description("The number of result rows to be fetched from the result set at a time. This is a hint to the database driver and may not be "
+ + "honored and/or exact. If the value specified is zero, then the hint is ignored.")
+ .defaultValue("0")
+ .required(true)
+ .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .build();
+
+ public static final PropertyDescriptor MAX_ROWS_PER_FLOW_FILE = new PropertyDescriptor.Builder()
+ .name("qdbt-max-rows")
+ .displayName("Max Rows Per Flow File")
+ .description("The maximum number of result rows that will be included in a single FlowFile. This will allow you to break up very large "
+ + "result sets into multiple FlowFiles. If the value specified is zero, then all rows are returned in a single FlowFile.")
+ .defaultValue("0")
+ .required(true)
+ .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .build();
+
+ public static final PropertyDescriptor OUTPUT_BATCH_SIZE = new PropertyDescriptor.Builder()
+ .name("qdbt-output-batch-size")
+ .displayName("Output Batch Size")
+ .description("The number of output FlowFiles to queue before committing the process session. When set to zero, the session will be committed when all result set rows "
+ + "have been processed and the output FlowFiles are ready for transfer to the downstream relationship. For large result sets, this can cause a large burst of FlowFiles "
+ + "to be transferred at the end of processor execution. If this property is set, then when the specified number of FlowFiles are ready for transfer, then the session will "
+ + "be committed, thus releasing the FlowFiles to the downstream relationship. NOTE: The maxvalue.* and fragment.count attributes will not be set on FlowFiles when this "
+ + "property is set.")
+ .defaultValue("0")
+ .required(true)
+ .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .build();
+
+ public static final PropertyDescriptor MAX_FRAGMENTS = new PropertyDescriptor.Builder()
+ .name("qdbt-max-frags")
+ .displayName("Maximum Number of Fragments")
+ .description("The maximum number of fragments. If the value specified is zero, then all fragments are returned. " +
+ "This prevents OutOfMemoryError when this processor ingests huge table. NOTE: Setting this property can result in data loss, as the incoming results are "
+ + "not ordered, and fragments may end at arbitrary boundaries where rows are not included in the result set.")
+ .defaultValue("0")
+ .required(true)
+ .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .build();
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return relationships;
+ }
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return propDescriptors;
+ }
+
+ @Override
+ protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
+ return new PropertyDescriptor.Builder()
+ .name(propertyDescriptorName)
+ .required(false)
+ .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true))
+ .addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .dynamic(true)
+ .build();
+ }
+
+ @OnScheduled
+ public void setup(final ProcessContext context) {
+ maxValueProperties = getDefaultMaxValueProperties(context, null);
+ }
+
+ @OnStopped
+ public void stop() {
+ // Reset the column type map in case properties change
+ setupComplete.set(false);
+ }
+
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
+ // Fetch the column/table info once
+ if (!setupComplete.get()) {
+ super.setup(context);
+ }
+ ProcessSession session = sessionFactory.createSession();
+ final List<FlowFile> resultSetFlowFiles = new ArrayList<>();
+
+ final ComponentLog logger = getLogger();
+
+ final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
+ final DatabaseAdapter dbAdapter = dbAdapters.get(context.getProperty(DB_TYPE).getValue());
+ final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions().getValue();
+ final String columnNames = context.getProperty(COLUMN_NAMES).evaluateAttributeExpressions().getValue();
+ final String sqlQuery = context.getProperty(SQL_QUERY).evaluateAttributeExpressions().getValue();
+ final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).evaluateAttributeExpressions().getValue();
+ final String customWhereClause = context.getProperty(WHERE_CLAUSE).evaluateAttributeExpressions().getValue();
+ final Integer fetchSize = context.getProperty(FETCH_SIZE).evaluateAttributeExpressions().asInteger();
+ final Integer maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger();
+ final Integer outputBatchSizeField = context.getProperty(OUTPUT_BATCH_SIZE).evaluateAttributeExpressions().asInteger();
+ final int outputBatchSize = outputBatchSizeField == null ? 0 : outputBatchSizeField;
+ final Integer maxFragments = context.getProperty(MAX_FRAGMENTS).isSet()
+ ? context.getProperty(MAX_FRAGMENTS).evaluateAttributeExpressions().asInteger()
+ : 0;
+
+
+ SqlWriter sqlWriter = configureSqlWriter(session, context);
+
+ final StateManager stateManager = context.getStateManager();
+ final StateMap stateMap;
+
+ try {
+ stateMap = stateManager.getState(Scope.CLUSTER);
+ } catch (final IOException ioe) {
+ getLogger().error("Failed to retrieve observed maximum values from the State Manager. Will not perform "
+ + "query until this is accomplished.", ioe);
+ context.yield();
+ return;
+ }
+ // Make a mutable copy of the current state property map. This will be updated by the result row callback, and eventually
+ // set as the current state map (after the session has been committed)
+ final Map<String, String> statePropertyMap = new HashMap<>(stateMap.toMap());
+
+ // If initial max values for column(s) have been specified using properties and are not already in the state manager, sync them to the state property map
+ for (final Map.Entry<String, String> maxProp : maxValueProperties.entrySet()) {
+ String maxPropKey = maxProp.getKey().toLowerCase();
+ String fullyQualifiedMaxPropKey = getStateKey(tableName, maxPropKey, dbAdapter);
+ if (!statePropertyMap.containsKey(fullyQualifiedMaxPropKey)) {
+ String newMaxPropValue;
+ // If we can't find the value at the fully-qualified key name, it is possible (under a previous scheme)
+ // the value has been stored under a key that is only the column name. Fall back to check the column name,
+ // but store the new initial max value under the fully-qualified key.
+ if (statePropertyMap.containsKey(maxPropKey)) {
+ newMaxPropValue = statePropertyMap.get(maxPropKey);
+ } else {
+ newMaxPropValue = maxProp.getValue();
+ }
+ statePropertyMap.put(fullyQualifiedMaxPropKey, newMaxPropValue);
+
+ }
+ }
+
+ List<String> maxValueColumnNameList = StringUtils.isEmpty(maxValueColumnNames)
+ ? null
+ : Arrays.asList(maxValueColumnNames.split("\\s*,\\s*"));
+ final String selectQuery = getQuery(dbAdapter, tableName, sqlQuery, columnNames, maxValueColumnNameList, customWhereClause, statePropertyMap);
+ final StopWatch stopWatch = new StopWatch(true);
+ final String fragmentIdentifier = UUID.randomUUID().toString();
+
+ try (final Connection con = dbcpService.getConnection(Collections.emptyMap());
+ final Statement st = con.createStatement()) {
+
+ if (fetchSize != null && fetchSize > 0) {
+ try {
+ st.setFetchSize(fetchSize);
+ } catch (SQLException se) {
+ // Not all drivers support this, just log the error (at debug level) and move on
+ logger.debug("Cannot set fetch size to {} due to {}", new Object[]{fetchSize, se.getLocalizedMessage()}, se);
+ }
+ }
+
+ String jdbcURL = "DBCPService";
+ try {
+ DatabaseMetaData databaseMetaData = con.getMetaData();
+ if (databaseMetaData != null) {
+ jdbcURL = databaseMetaData.getURL();
+ }
+ } catch (SQLException se) {
+ // Ignore and use default JDBC URL. This shouldn't happen unless the driver doesn't implement getMetaData() properly
+ }
+
+ final Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.SECONDS).intValue();
+ st.setQueryTimeout(queryTimeout); // timeout in seconds
+ if (logger.isDebugEnabled()) {
+ logger.debug("Executing query {}", new Object[] { selectQuery });
+ }
+ try (final ResultSet resultSet = st.executeQuery(selectQuery)) {
+ int fragmentIndex=0;
+ // Max values will be updated in the state property map by the callback
+ final MaxValueResultSetRowCollector maxValCollector = new MaxValueResultSetRowCollector(tableName, statePropertyMap, dbAdapter);
+
+ while(true) {
+ final AtomicLong nrOfRows = new AtomicLong(0L);
+
+ FlowFile fileToProcess = session.create();
+ try {
+ fileToProcess = session.write(fileToProcess, out -> {
+ try {
+ nrOfRows.set(sqlWriter.writeResultSet(resultSet, out, getLogger(), maxValCollector));
+ } catch (Exception e) {
+ throw new ProcessException("Error during database query or conversion of records.", e);
+ }
+ });
+ } catch (ProcessException e) {
+ // Add flowfile to results before rethrowing so it will be removed from session in outer catch
+ resultSetFlowFiles.add(fileToProcess);
+ throw e;
+ }
+
+ if (nrOfRows.get() > 0) {
+ // set attributes
+ final Map<String, String> attributesToAdd = new HashMap<>();
+ attributesToAdd.put(RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
+ attributesToAdd.put(RESULT_TABLENAME, tableName);
+
+ if(maxRowsPerFlowFile > 0) {
+ attributesToAdd.put(FRAGMENT_ID, fragmentIdentifier);
+ attributesToAdd.put(FRAGMENT_INDEX, String.valueOf(fragmentIndex));
+ }
+
+ attributesToAdd.putAll(sqlWriter.getAttributesToAdd());
+ fileToProcess = session.putAllAttributes(fileToProcess, attributesToAdd);
+ sqlWriter.updateCounters(session);
+
+ logger.info("{} contains {} records; transferring to 'success'",
+ new Object[]{fileToProcess, nrOfRows.get()});
+
+ session.getProvenanceReporter().receive(fileToProcess, jdbcURL, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+ resultSetFlowFiles.add(fileToProcess);
+ // If we've reached the batch size, send out the flow files
+ if (outputBatchSize > 0 && resultSetFlowFiles.size() >= outputBatchSize) {
+ session.transfer(resultSetFlowFiles, REL_SUCCESS);
+ session.commit();
+ resultSetFlowFiles.clear();
+ }
+ } else {
+ // If there were no rows returned, don't send the flowfile
+ session.remove(fileToProcess);
+ // If no rows and this was first FlowFile, yield
+ if(fragmentIndex == 0){
+ context.yield();
+ }
+ break;
+ }
+
+ fragmentIndex++;
+ if (maxFragments > 0 && fragmentIndex >= maxFragments) {
+ break;
+ }
+
+ // If we aren't splitting up the data into flow files or fragments, then the result set has been entirely fetched so don't loop back around
+ if (maxFragments == 0 && maxRowsPerFlowFile == 0) {
+ break;
+ }
+
+ // If we are splitting up the data into flow files, don't loop back around if we've gotten all results
+ if(maxRowsPerFlowFile > 0 && nrOfRows.get() < maxRowsPerFlowFile) {
+ break;
+ }
+ }
+
+ // Apply state changes from the Max Value tracker
+ maxValCollector.applyStateChanges();
+
+ // Even though the maximum values and total count are known at this point, do not store them as attributes if Output Batch Size is set, to stay consistent with the FlowFiles already committed in earlier batches
+ if (outputBatchSize == 0) {
+ for (int i = 0; i < resultSetFlowFiles.size(); i++) {
+ // Add maximum values as attributes
+ for (Map.Entry<String, String> entry : statePropertyMap.entrySet()) {
+ // Get just the column name from the key
+ String key = entry.getKey();
+ String colName = key.substring(key.lastIndexOf(NAMESPACE_DELIMITER) + NAMESPACE_DELIMITER.length());
+ resultSetFlowFiles.set(i, session.putAttribute(resultSetFlowFiles.get(i), "maxvalue." + colName, entry.getValue()));
+ }
+
+ //set count on all FlowFiles
+ if (maxRowsPerFlowFile > 0) {
+ resultSetFlowFiles.set(i,
+ session.putAttribute(resultSetFlowFiles.get(i), "fragment.count", Integer.toString(fragmentIndex)));
+ }
+ }
+ }
+ } catch (final SQLException e) {
+ throw e;
+ }
+
+ session.transfer(resultSetFlowFiles, REL_SUCCESS);
+
+ } catch (final ProcessException | SQLException e) {
+ logger.error("Unable to execute SQL select query {} due to {}", new Object[]{selectQuery, e});
+ if (!resultSetFlowFiles.isEmpty()) {
+ session.remove(resultSetFlowFiles);
+ }
+ context.yield();
+ } finally {
+ session.commit();
+ try {
+ // Update the state
+ stateManager.setState(statePropertyMap, Scope.CLUSTER);
+ } catch (IOException ioe) {
+ getLogger().error("{} failed to update State Manager, maximum observed values will not be recorded", new Object[]{this, ioe});
+ }
+ }
+ }
+
+ protected String getQuery(DatabaseAdapter dbAdapter, String tableName, String columnNames, List<String> maxValColumnNames,
+ String customWhereClause, Map<String, String> stateMap) {
+
+ return getQuery(dbAdapter, tableName, null, columnNames, maxValColumnNames, customWhereClause, stateMap);
+ }
+
+ protected String getQuery(DatabaseAdapter dbAdapter, String tableName, String sqlQuery, String columnNames, List<String> maxValColumnNames,
+ String customWhereClause, Map<String, String> stateMap) {
+ if (StringUtils.isEmpty(tableName)) {
+ throw new IllegalArgumentException("Table name must be specified");
+ }
+ final StringBuilder query;
+
+ if (StringUtils.isEmpty(sqlQuery)) {
+ query = new StringBuilder(dbAdapter.getSelectStatement(tableName, columnNames, null, null, null, null));
+ } else {
+ query = getWrappedQuery(dbAdapter, sqlQuery, tableName);
+ }
+
+ List<String> whereClauses = new ArrayList<>();
+ // Check state map for last max values
+ if (stateMap != null && !stateMap.isEmpty() && maxValColumnNames != null) {
+ IntStream.range(0, maxValColumnNames.size()).forEach((index) -> {
+ String colName = maxValColumnNames.get(index);
+ String maxValueKey = getStateKey(tableName, colName, dbAdapter);
+ String maxValue = stateMap.get(maxValueKey);
+ if (StringUtils.isEmpty(maxValue)) {
+ // If we can't find the value at the fully-qualified key name, it is possible (under a previous scheme)
+ // the value has been stored under a key that is only the column name. Fall back to check the column name; either way, when a new
+ // maximum value is observed, it will be stored under the fully-qualified key from then on.
+ maxValue = stateMap.get(colName.toLowerCase());
+ }
+ if (!StringUtils.isEmpty(maxValue)) {
+ Integer type = columnTypeMap.get(maxValueKey);
+ if (type == null) {
+ // This shouldn't happen as we are populating columnTypeMap when the processor is scheduled.
+ throw new IllegalArgumentException("No column type found for: " + colName);
+ }
+ // Add a condition for the WHERE clause
+ whereClauses.add(colName + (index == 0 ? " > " : " >= ") + getLiteralByType(type, maxValue, dbAdapter.getName()));
+ }
+ });
+ }
+
+ if (customWhereClause != null) {
+ whereClauses.add("(" + customWhereClause + ")");
+ }
+
+ if (!whereClauses.isEmpty()) {
+ query.append(" WHERE ");
+ query.append(StringUtils.join(whereClauses, " AND "));
+ }
+
+ return query.toString();
+ }
+
+ public class MaxValueResultSetRowCollector implements JdbcCommon.ResultSetRowCallback {
+ DatabaseAdapter dbAdapter;
+ final Map<String, String> newColMap;
+ final Map<String, String> originalState;
+ String tableName;
+
+ public MaxValueResultSetRowCollector(String tableName, Map<String, String> stateMap, DatabaseAdapter dbAdapter) {
+ this.dbAdapter = dbAdapter;
+ this.originalState = stateMap;
+
+ this.newColMap = new HashMap<>();
+ this.newColMap.putAll(stateMap);
+
+ this.tableName = tableName;
+ }
+
+ @Override
+ public void processRow(ResultSet resultSet) throws IOException {
+ if (resultSet == null) {
+ return;
+ }
+ try {
+ // Iterate over the row, check-and-set max values
+ final ResultSetMetaData meta = resultSet.getMetaData();
+ final int nrOfColumns = meta.getColumnCount();
+ if (nrOfColumns > 0) {
+ for (int i = 1; i <= nrOfColumns; i++) {
+ String colName = meta.getColumnName(i).toLowerCase();
+ String fullyQualifiedMaxValueKey = getStateKey(tableName, colName, dbAdapter);
+ Integer type = columnTypeMap.get(fullyQualifiedMaxValueKey);
+ // Skip any columns we're not keeping track of or whose value is null
+ if (type == null || resultSet.getObject(i) == null) {
+ continue;
+ }
+ String maxValueString = newColMap.get(fullyQualifiedMaxValueKey);
+ // If we can't find the value at the fully-qualified key name, it is possible (under a previous scheme)
+ // the value has been stored under a key that is only the column name. Fall back to check the column name; either way, when a new
+ // maximum value is observed, it will be stored under the fully-qualified key from then on.
+ if (StringUtils.isEmpty(maxValueString)) {
+ maxValueString = newColMap.get(colName);
+ }
+ String newMaxValueString = getMaxValueFromRow(resultSet, i, type, maxValueString, dbAdapter.getName());
+ if (newMaxValueString != null) {
+ newColMap.put(fullyQualifiedMaxValueKey, newMaxValueString);
+ }
+ }
+ }
+ } catch (ParseException | SQLException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public void applyStateChanges() {
+ this.originalState.putAll(this.newColMap);
+ }
+ }
+
+ protected abstract SqlWriter configureSqlWriter(ProcessSession session, ProcessContext context);
+}
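
For readers following the incremental-fetch logic: getQuery() turns the persisted max-value state into a WHERE clause, using a strict '>' comparison for the first max-value column and '>=' for the remaining ones, then ANDs in the optional custom WHERE clause. The sketch below only illustrates the shape of that clause; the table, columns and stored values are hypothetical, and unlike the real code it does not quote literals per column type (getLiteralByType) or namespace the state keys.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    public class IncrementalWhereClauseSketch {
        public static String whereClause(final List<String> maxValueColumns, final Map<String, String> lastMaxByColumn, final String customWhereClause) {
            final List<String> clauses = new ArrayList<>();
            for (int i = 0; i < maxValueColumns.size(); i++) {
                final String col = maxValueColumns.get(i);
                final String lastMax = lastMaxByColumn.get(col);
                if (lastMax != null) {
                    // strictly greater-than for the first column, greater-or-equal for the rest
                    clauses.add(col + (i == 0 ? " > " : " >= ") + lastMax);
                }
            }
            if (customWhereClause != null) {
                clauses.add("(" + customWhereClause + ")");
            }
            return clauses.isEmpty() ? "" : " WHERE " + String.join(" AND ", clauses);
        }

        public static void main(final String[] args) {
            final Map<String, String> state = new LinkedHashMap<>();
            state.put("id", "1000");
            state.put("updated_at", "'2018-10-01'");
            // prints: " WHERE id > 1000 AND updated_at >= '2018-10-01' AND (active = 1)"
            System.out.println(whereClause(Arrays.asList("id", "updated_at"), state, "active = 1"));
        }
    }
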
http://git-wip-us.apache.org/repos/asf/nifi/blob/c6572f04/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
index df82e2e..cc6d508 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
@@ -16,22 +16,12 @@
*/
package org.apache.nifi.processors.standard;
-import java.nio.charset.Charset;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import org.apache.commons.io.IOUtils;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
@@ -41,23 +31,16 @@ import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.flowfile.attributes.FragmentAttributes;
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.processors.standard.util.AvroUtil.CodecType;
+import org.apache.nifi.processors.standard.sql.DefaultAvroSqlWriter;
+import org.apache.nifi.processors.standard.sql.SqlWriter;
import org.apache.nifi.processors.standard.util.JdbcCommon;
-import org.apache.nifi.util.StopWatch;
+import org.apache.nifi.processors.standard.util.AvroUtil.CodecType;
import static org.apache.nifi.processors.standard.util.JdbcCommon.DEFAULT_PRECISION;
import static org.apache.nifi.processors.standard.util.JdbcCommon.DEFAULT_SCALE;
@@ -94,99 +77,24 @@ import static org.apache.nifi.processors.standard.util.JdbcCommon.USE_AVRO_LOGIC
+ "'yyyy-MM-dd HH:mm:ss.SSS' for Timestamp is used.")
})
@WritesAttributes({
- @WritesAttribute(attribute="executesql.row.count", description = "Contains the number of rows returned in the select query"),
- @WritesAttribute(attribute="executesql.query.duration", description = "Combined duration of the query execution time and fetch time in milliseconds"),
- @WritesAttribute(attribute="executesql.query.executiontime", description = "Duration of the query execution time in milliseconds"),
- @WritesAttribute(attribute="executesql.query.fetchtime", description = "Duration of the result set fetch time in milliseconds"),
- @WritesAttribute(attribute="executesql.resultset.index", description = "Assuming multiple result sets are returned, "
- + "the zero based index of this result set."),
- @WritesAttribute(attribute="fragment.identifier", description="If 'Max Rows Per Flow File' is set then all FlowFiles from the same query result set "
- + "will have the same value for the fragment.identifier attribute. This can then be used to correlate the results."),
- @WritesAttribute(attribute = "fragment.count", description = "If 'Max Rows Per Flow File' is set then this is the total number of "
- + "FlowFiles produced by a single ResultSet. This can be used in conjunction with the "
- + "fragment.identifier attribute in order to know how many FlowFiles belonged to the same incoming ResultSet. If Output Batch Size is set, then this "
- + "attribute will not be populated."),
- @WritesAttribute(attribute="fragment.index", description="If 'Max Rows Per Flow File' is set then the position of this FlowFile in the list of "
- + "outgoing FlowFiles that were all derived from the same result set FlowFile. This can be "
- + "used in conjunction with the fragment.identifier attribute to know which FlowFiles originated from the same query result set and in what order "
+ @WritesAttribute(attribute = "executesql.row.count", description = "Contains the number of rows returned in the select query"),
+ @WritesAttribute(attribute = "executesql.query.duration", description = "Combined duration of the query execution time and fetch time in milliseconds"),
+ @WritesAttribute(attribute = "executesql.query.executiontime", description = "Duration of the query execution time in milliseconds"),
+ @WritesAttribute(attribute = "executesql.query.fetchtime", description = "Duration of the result set fetch time in milliseconds"),
+ @WritesAttribute(attribute = "executesql.resultset.index", description = "Assuming multiple result sets are returned, "
+ + "the zero based index of this result set."),
+ @WritesAttribute(attribute = "fragment.identifier", description = "If 'Max Rows Per Flow File' is set then all FlowFiles from the same query result set "
+ + "will have the same value for the fragment.identifier attribute. This can then be used to correlate the results."),
+ @WritesAttribute(attribute = "fragment.count", description = "If 'Max Rows Per Flow File' is set then this is the total number of "
+ + "FlowFiles produced by a single ResultSet. This can be used in conjunction with the "
+ + "fragment.identifier attribute in order to know how many FlowFiles belonged to the same incoming ResultSet. If Output Batch Size is set, then this "
+ + "attribute will not be populated."),
+ @WritesAttribute(attribute = "fragment.index", description = "If 'Max Rows Per Flow File' is set then the position of this FlowFile in the list of "
+ + "outgoing FlowFiles that were all derived from the same result set FlowFile. This can be "
+ + "used in conjunction with the fragment.identifier attribute to know which FlowFiles originated from the same query result set and in what order "
+ "FlowFiles were produced")
})
-public class ExecuteSQL extends AbstractProcessor {
-
- public static final String RESULT_ROW_COUNT = "executesql.row.count";
- public static final String RESULT_QUERY_DURATION = "executesql.query.duration";
- public static final String RESULT_QUERY_EXECUTION_TIME = "executesql.query.executiontime";
- public static final String RESULT_QUERY_FETCH_TIME = "executesql.query.fetchtime";
- public static final String RESULTSET_INDEX = "executesql.resultset.index";
-
- public static final String FRAGMENT_ID = FragmentAttributes.FRAGMENT_ID.key();
- public static final String FRAGMENT_INDEX = FragmentAttributes.FRAGMENT_INDEX.key();
- public static final String FRAGMENT_COUNT = FragmentAttributes.FRAGMENT_COUNT.key();
-
- // Relationships
- public static final Relationship REL_SUCCESS = new Relationship.Builder()
- .name("success")
- .description("Successfully created FlowFile from SQL query result set.")
- .build();
- public static final Relationship REL_FAILURE = new Relationship.Builder()
- .name("failure")
- .description("SQL query execution failed. Incoming FlowFile will be penalized and routed to this relationship")
- .build();
- private final Set<Relationship> relationships;
-
- public static final PropertyDescriptor DBCP_SERVICE = new PropertyDescriptor.Builder()
- .name("Database Connection Pooling Service")
- .description("The Controller Service that is used to obtain connection to database")
- .required(true)
- .identifiesControllerService(DBCPService.class)
- .build();
-
- public static final PropertyDescriptor SQL_SELECT_QUERY = new PropertyDescriptor.Builder()
- .name("SQL select query")
- .description("The SQL select query to execute. The query can be empty, a constant value, or built from attributes "
- + "using Expression Language. If this property is specified, it will be used regardless of the content of "
- + "incoming flowfiles. If this property is empty, the content of the incoming flow file is expected "
- + "to contain a valid SQL select query, to be issued by the processor to the database. Note that Expression "
- + "Language is not evaluated for flow file contents.")
- .required(false)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .build();
-
- public static final PropertyDescriptor QUERY_TIMEOUT = new PropertyDescriptor.Builder()
- .name("Max Wait Time")
- .description("The maximum amount of time allowed for a running SQL select query "
- + " , zero means there is no limit. Max time less than 1 second will be equal to zero.")
- .defaultValue("0 seconds")
- .required(true)
- .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
- .sensitive(false)
- .build();
-
- public static final PropertyDescriptor MAX_ROWS_PER_FLOW_FILE = new PropertyDescriptor.Builder()
- .name("esql-max-rows")
- .displayName("Max Rows Per Flow File")
- .description("The maximum number of result rows that will be included in a single FlowFile. This will allow you to break up very large "
- + "result sets into multiple FlowFiles. If the value specified is zero, then all rows are returned in a single FlowFile.")
- .defaultValue("0")
- .required(true)
- .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
- .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
- .build();
-
- public static final PropertyDescriptor OUTPUT_BATCH_SIZE = new PropertyDescriptor.Builder()
- .name("esql-output-batch-size")
- .displayName("Output Batch Size")
- .description("The number of output FlowFiles to queue before committing the process session. When set to zero, the session will be committed when all result set rows "
- + "have been processed and the output FlowFiles are ready for transfer to the downstream relationship. For large result sets, this can cause a large burst of FlowFiles "
- + "to be transferred at the end of processor execution. If this property is set, then when the specified number of FlowFiles are ready for transfer, then the session will "
- + "be committed, thus releasing the FlowFiles to the downstream relationship. NOTE: The fragment.count attribute will not be set on FlowFiles when this "
- + "property is set.")
- .defaultValue("0")
- .required(true)
- .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
- .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
- .build();
+public class ExecuteSQL extends AbstractExecuteSQL {
public static final PropertyDescriptor COMPRESSION_FORMAT = new PropertyDescriptor.Builder()
.name("compression-format")
@@ -198,8 +106,6 @@ public class ExecuteSQL extends AbstractProcessor {
.required(true)
.build();
- private final List<PropertyDescriptor> propDescriptors;
-
public ExecuteSQL() {
final Set<Relationship> r = new HashSet<>();
r.add(REL_SUCCESS);
@@ -212,248 +118,31 @@ public class ExecuteSQL extends AbstractProcessor {
pds.add(QUERY_TIMEOUT);
pds.add(NORMALIZE_NAMES_FOR_AVRO);
pds.add(USE_AVRO_LOGICAL_TYPES);
+ pds.add(COMPRESSION_FORMAT);
pds.add(DEFAULT_PRECISION);
pds.add(DEFAULT_SCALE);
pds.add(MAX_ROWS_PER_FLOW_FILE);
pds.add(OUTPUT_BATCH_SIZE);
- pds.add(COMPRESSION_FORMAT);
propDescriptors = Collections.unmodifiableList(pds);
}
@Override
- public Set<Relationship> getRelationships() {
- return relationships;
- }
-
- @Override
- protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
- return propDescriptors;
- }
-
- @OnScheduled
- public void setup(ProcessContext context) {
- // If the query is not set, then an incoming flow file is needed. Otherwise fail the initialization
- if (!context.getProperty(SQL_SELECT_QUERY).isSet() && !context.hasIncomingConnection()) {
- final String errorString = "Either the Select Query must be specified or there must be an incoming connection "
- + "providing flowfile(s) containing a SQL select query";
- getLogger().error(errorString);
- throw new ProcessException(errorString);
- }
- }
-
- @Override
- public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
- FlowFile fileToProcess = null;
- if (context.hasIncomingConnection()) {
- fileToProcess = session.get();
-
- // If we have no FlowFile, and all incoming connections are self-loops then we can continue on.
- // However, if we have no FlowFile and we have connections coming from other Processors, then
- // we know that we should run only if we have a FlowFile.
- if (fileToProcess == null && context.hasNonLoopConnection()) {
- return;
- }
- }
-
- final List<FlowFile> resultSetFlowFiles = new ArrayList<>();
-
- final ComponentLog logger = getLogger();
- final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
- final Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue();
+ protected SqlWriter configureSqlWriter(ProcessSession session, ProcessContext context, FlowFile fileToProcess) {
final boolean convertNamesForAvro = context.getProperty(NORMALIZE_NAMES_FOR_AVRO).asBoolean();
final Boolean useAvroLogicalTypes = context.getProperty(USE_AVRO_LOGICAL_TYPES).asBoolean();
final Integer maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger();
- final Integer outputBatchSizeField = context.getProperty(OUTPUT_BATCH_SIZE).evaluateAttributeExpressions().asInteger();
- final int outputBatchSize = outputBatchSizeField == null ? 0 : outputBatchSizeField;
final Integer defaultPrecision = context.getProperty(DEFAULT_PRECISION).evaluateAttributeExpressions(fileToProcess).asInteger();
final Integer defaultScale = context.getProperty(DEFAULT_SCALE).evaluateAttributeExpressions(fileToProcess).asInteger();
final String codec = context.getProperty(COMPRESSION_FORMAT).getValue();
- final String selectQuery;
- if (context.getProperty(SQL_SELECT_QUERY).isSet()) {
- selectQuery = context.getProperty(SQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue();
- } else {
- // If the query is not set, then an incoming flow file is required, and expected to contain a valid SQL select query.
- // If there is no incoming connection, onTrigger will not be called as the processor will fail when scheduled.
- final StringBuilder queryContents = new StringBuilder();
- session.read(fileToProcess, in -> queryContents.append(IOUtils.toString(in, Charset.defaultCharset())));
- selectQuery = queryContents.toString();
- }
-
- int resultCount=0;
- try (final Connection con = dbcpService.getConnection(fileToProcess == null ? Collections.emptyMap() : fileToProcess.getAttributes());
- final PreparedStatement st = con.prepareStatement(selectQuery)) {
- st.setQueryTimeout(queryTimeout); // timeout in seconds
-
- if (fileToProcess != null) {
- JdbcCommon.setParameters(st, fileToProcess.getAttributes());
- }
- logger.debug("Executing query {}", new Object[]{selectQuery});
-
- int fragmentIndex=0;
- final String fragmentId = UUID.randomUUID().toString();
-
- final StopWatch executionTime = new StopWatch(true);
-
- boolean hasResults = st.execute();
-
- long executionTimeElapsed = executionTime.getElapsed(TimeUnit.MILLISECONDS);
-
- boolean hasUpdateCount = st.getUpdateCount() != -1;
-
- while(hasResults || hasUpdateCount) {
- //getMoreResults() and execute() return false to indicate that the result of the statement is just a number and not a ResultSet
- if (hasResults) {
- final AtomicLong nrOfRows = new AtomicLong(0L);
-
- try {
- final ResultSet resultSet = st.getResultSet();
- final JdbcCommon.AvroConversionOptions options = JdbcCommon.AvroConversionOptions.builder()
- .convertNames(convertNamesForAvro)
- .useLogicalTypes(useAvroLogicalTypes)
- .defaultPrecision(defaultPrecision)
- .defaultScale(defaultScale)
- .maxRows(maxRowsPerFlowFile)
- .codecFactory(codec)
- .build();
-
- do {
- final StopWatch fetchTime = new StopWatch(true);
-
- FlowFile resultSetFF;
- if (fileToProcess == null) {
- resultSetFF = session.create();
- } else {
- resultSetFF = session.create(fileToProcess);
- resultSetFF = session.putAllAttributes(resultSetFF, fileToProcess.getAttributes());
- }
-
- try {
- resultSetFF = session.write(resultSetFF, out -> {
- try {
- nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, options, null));
- } catch (SQLException e) {
- throw new ProcessException(e);
- }
- });
-
- long fetchTimeElapsed = fetchTime.getElapsed(TimeUnit.MILLISECONDS);
-
- // set attribute how many rows were selected
- resultSetFF = session.putAttribute(resultSetFF, RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
- resultSetFF = session.putAttribute(resultSetFF, RESULT_QUERY_DURATION, String.valueOf(executionTimeElapsed + fetchTimeElapsed));
- resultSetFF = session.putAttribute(resultSetFF, RESULT_QUERY_EXECUTION_TIME, String.valueOf(executionTimeElapsed));
- resultSetFF = session.putAttribute(resultSetFF, RESULT_QUERY_FETCH_TIME, String.valueOf(fetchTimeElapsed));
- resultSetFF = session.putAttribute(resultSetFF, CoreAttributes.MIME_TYPE.key(), JdbcCommon.MIME_TYPE_AVRO_BINARY);
- resultSetFF = session.putAttribute(resultSetFF, RESULTSET_INDEX, String.valueOf(resultCount));
-
- // if fragmented ResultSet, determine if we should keep this fragment; set fragment attributes
- if (maxRowsPerFlowFile > 0) {
- // if row count is zero and this is not the first fragment, drop it instead of committing it.
- if (nrOfRows.get() == 0 && fragmentIndex > 0) {
- session.remove(resultSetFF);
- break;
- }
-
- resultSetFF = session.putAttribute(resultSetFF, FRAGMENT_ID, fragmentId);
- resultSetFF = session.putAttribute(resultSetFF, FRAGMENT_INDEX, String.valueOf(fragmentIndex));
- }
-
- logger.info("{} contains {} Avro records; transferring to 'success'",
- new Object[]{resultSetFF, nrOfRows.get()});
- session.getProvenanceReporter().modifyContent(resultSetFF, "Retrieved " + nrOfRows.get() + " rows", executionTimeElapsed + fetchTimeElapsed);
- resultSetFlowFiles.add(resultSetFF);
-
- // If we've reached the batch size, send out the flow files
- if (outputBatchSize > 0 && resultSetFlowFiles.size() >= outputBatchSize) {
- session.transfer(resultSetFlowFiles, REL_SUCCESS);
- session.commit();
- resultSetFlowFiles.clear();
- }
-
- fragmentIndex++;
- } catch (Exception e) {
- // Remove the result set flow file and propagate the exception
- session.remove(resultSetFF);
- if (e instanceof ProcessException) {
- throw (ProcessException) e;
- } else {
- throw new ProcessException(e);
- }
- }
- } while (maxRowsPerFlowFile > 0 && nrOfRows.get() == maxRowsPerFlowFile);
-
- // If we are splitting results but not outputting batches, set count on all FlowFiles
- if (outputBatchSize == 0 && maxRowsPerFlowFile > 0) {
- for (int i = 0; i < resultSetFlowFiles.size(); i++) {
- resultSetFlowFiles.set(i,
- session.putAttribute(resultSetFlowFiles.get(i), FRAGMENT_COUNT, Integer.toString(fragmentIndex)));
- }
- }
- } catch (final SQLException e) {
- throw new ProcessException(e);
- }
-
- resultCount++;
- }
-
- // are there anymore result sets?
- try{
- hasResults = st.getMoreResults(Statement.CLOSE_CURRENT_RESULT);
- hasUpdateCount = st.getUpdateCount() != -1;
- } catch(SQLException ex){
- hasResults = false;
- hasUpdateCount = false;
- }
- }
-
- // Transfer any remaining files to SUCCESS
- session.transfer(resultSetFlowFiles, REL_SUCCESS);
- resultSetFlowFiles.clear();
-
- //If we had at least one result then it's OK to drop the original file, but if we had no results then
- // pass the original flow file down the line to trigger downstream processors
- if(fileToProcess != null){
- if(resultCount > 0){
- session.remove(fileToProcess);
- } else {
- fileToProcess = session.write(fileToProcess, JdbcCommon::createEmptyAvroStream);
-
- fileToProcess = session.putAttribute(fileToProcess, RESULT_ROW_COUNT, "0");
- fileToProcess = session.putAttribute(fileToProcess, CoreAttributes.MIME_TYPE.key(), JdbcCommon.MIME_TYPE_AVRO_BINARY);
- session.transfer(fileToProcess, REL_SUCCESS);
- }
- } else if(resultCount == 0){
- //If we had no inbound FlowFile, no exceptions, and the SQL generated no result sets (Insert/Update/Delete statements only)
- // Then generate an empty Output FlowFile
- FlowFile resultSetFF = session.create();
-
- resultSetFF = session.write(resultSetFF, out -> JdbcCommon.createEmptyAvroStream(out));
-
- resultSetFF = session.putAttribute(resultSetFF, RESULT_ROW_COUNT, "0");
- resultSetFF = session.putAttribute(resultSetFF, CoreAttributes.MIME_TYPE.key(), JdbcCommon.MIME_TYPE_AVRO_BINARY);
- session.transfer(resultSetFF, REL_SUCCESS);
- }
- } catch (final ProcessException | SQLException e) {
- //If we had at least one result then it's OK to drop the original file, but if we had no results then
- // pass the original flow file down the line to trigger downstream processors
- if (fileToProcess == null) {
- // This can happen if any exceptions occur while setting up the connection, statement, etc.
- logger.error("Unable to execute SQL select query {} due to {}. No FlowFile to route to failure",
- new Object[]{selectQuery, e});
- context.yield();
- } else {
- if (context.hasIncomingConnection()) {
- logger.error("Unable to execute SQL select query {} for {} due to {}; routing to failure",
- new Object[]{selectQuery, fileToProcess, e});
- fileToProcess = session.penalize(fileToProcess);
- } else {
- logger.error("Unable to execute SQL select query {} due to {}; routing to failure",
- new Object[]{selectQuery, e});
- context.yield();
- }
- session.transfer(fileToProcess, REL_FAILURE);
- }
- }
+ final JdbcCommon.AvroConversionOptions options = JdbcCommon.AvroConversionOptions.builder()
+ .convertNames(convertNamesForAvro)
+ .useLogicalTypes(useAvroLogicalTypes)
+ .defaultPrecision(defaultPrecision)
+ .defaultScale(defaultScale)
+ .maxRows(maxRowsPerFlowFile)
+ .codecFactory(codec)
+ .build();
+ return new DefaultAvroSqlWriter(options);
}
}
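
After this refactor, ExecuteSQL keeps only its Avro-specific properties and supplies the writer through configureSqlWriter(); the query execution, fragmenting and provenance reporting now live in AbstractExecuteSQL. Below is a minimal usage sketch in the style of the unit tests added by this commit. It assumes the DBCP_SERVICE, SQL_SELECT_QUERY, MAX_ROWS_PER_FLOW_FILE, REL_SUCCESS and RESULT_ROW_COUNT members removed above are now inherited from AbstractExecuteSQL; the DBCPService test double and the PERSONS table are hypothetical stand-ins.

    import org.apache.nifi.dbcp.DBCPService;
    import org.apache.nifi.util.MockFlowFile;
    import org.apache.nifi.util.TestRunner;
    import org.apache.nifi.util.TestRunners;

    public class ExecuteSQLUsageSketch {
        public void runQuery(final DBCPService dbcp) throws Exception {
            final TestRunner runner = TestRunners.newTestRunner(ExecuteSQL.class);
            runner.addControllerService("dbcp", dbcp);
            runner.enableControllerService(dbcp);
            runner.setProperty(ExecuteSQL.DBCP_SERVICE, "dbcp");
            runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "SELECT * FROM PERSONS");
            runner.setProperty(ExecuteSQL.MAX_ROWS_PER_FLOW_FILE, "5");
            runner.run();
            // Each success FlowFile is an Avro stream carrying the executesql.* attributes documented
            // above; because Max Rows Per Flow File is set, the fragment.* attributes are present too.
            for (final MockFlowFile ff : runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS)) {
                ff.assertAttributeExists(ExecuteSQL.RESULT_ROW_COUNT);
            }
        }
    }
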