You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mt...@apache.org on 2018/04/24 14:28:34 UTC
nifi git commit: NIFI-4561 ExecuteSQL returns no FlowFile for some
queries
Repository: nifi
Updated Branches:
refs/heads/master 5ca6261de -> 0390c0f19
NIFI-4561 ExecuteSQL returns no FlowFile for some queries
This closes #2243
Signed-off-by: Mike Thomsen <mi...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/0390c0f1
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/0390c0f1
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/0390c0f1
Branch: refs/heads/master
Commit: 0390c0f1967d1a57a333d15e1ec41b06ceb88590
Parents: 5ca6261
Author: patricker <pa...@gmail.com>
Authored: Wed Nov 1 10:25:26 2017 +0800
Committer: Mike Thomsen <mi...@gmail.com>
Committed: Tue Apr 24 10:27:44 2018 -0400
----------------------------------------------------------------------
.../nifi/processors/standard/ExecuteSQL.java | 99 ++++++++++++--------
.../processors/standard/TestExecuteSQL.java | 26 +++++
2 files changed, 87 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/0390c0f1/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
index 203d02a..13be8d5 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
@@ -21,6 +21,7 @@ import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
+import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
@@ -91,12 +92,15 @@ import static org.apache.nifi.processors.standard.util.JdbcCommon.USE_AVRO_LOGIC
})
@WritesAttributes({
@WritesAttribute(attribute="executesql.row.count", description = "Contains the number of rows returned in the select query"),
- @WritesAttribute(attribute="executesql.query.duration", description = "Duration of the query in milliseconds")
+ @WritesAttribute(attribute="executesql.query.duration", description = "Duration of the query in milliseconds"),
+ @WritesAttribute(attribute="executesql.resultset.index", description = "Assuming multiple result sets are returned, "
+ + "the zero-based index of this result set.")
})
public class ExecuteSQL extends AbstractProcessor {
public static final String RESULT_ROW_COUNT = "executesql.row.count";
public static final String RESULT_QUERY_DURATION = "executesql.query.duration";
+ public static final String RESULTSET_INDEX = "executesql.resultset.index";
// Relationships
public static final Relationship REL_SUCCESS = new Relationship.Builder()
@@ -220,53 +224,60 @@ public class ExecuteSQL extends AbstractProcessor {
JdbcCommon.setParameters(st, fileToProcess.getAttributes());
}
logger.debug("Executing query {}", new Object[]{selectQuery});
- boolean results = st.execute();
+ boolean hasResults = st.execute();
+ boolean hasUpdateCount = st.getUpdateCount() != -1;
+ while(hasResults || hasUpdateCount) {
+ //getMoreResults() and execute() return false to indicate that the result of the statement is just a number and not a ResultSet
+ if (hasResults) {
+ FlowFile resultSetFF;
+ if (fileToProcess == null) {
+ resultSetFF = session.create();
+ } else {
+ resultSetFF = session.create(fileToProcess);
+ resultSetFF = session.putAllAttributes(resultSetFF, fileToProcess.getAttributes());
+ }
- while(results){
- FlowFile resultSetFF;
- if(fileToProcess == null){
- resultSetFF = session.create();
- } else {
- resultSetFF = session.create(fileToProcess);
- resultSetFF = session.putAllAttributes(resultSetFF, fileToProcess.getAttributes());
- }
+ final AtomicLong nrOfRows = new AtomicLong(0L);
+ resultSetFF = session.write(resultSetFF, out -> {
+ try {
- final AtomicLong nrOfRows = new AtomicLong(0L);
- resultSetFF = session.write(resultSetFF, out -> {
- try {
-
- final ResultSet resultSet = st.getResultSet();
- final JdbcCommon.AvroConversionOptions options = JdbcCommon.AvroConversionOptions.builder()
- .convertNames(convertNamesForAvro)
- .useLogicalTypes(useAvroLogicalTypes)
- .defaultPrecision(defaultPrecision)
- .defaultScale(defaultScale)
- .build();
- nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, options, null));
- } catch (final SQLException e) {
- throw new ProcessException(e);
- }
- });
+ final ResultSet resultSet = st.getResultSet();
+ final JdbcCommon.AvroConversionOptions options = JdbcCommon.AvroConversionOptions.builder()
+ .convertNames(convertNamesForAvro)
+ .useLogicalTypes(useAvroLogicalTypes)
+ .defaultPrecision(defaultPrecision)
+ .defaultScale(defaultScale)
+ .build();
+ nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, options, null));
+ } catch (final SQLException e) {
+ throw new ProcessException(e);
+ }
+ });
- long duration = stopWatch.getElapsed(TimeUnit.MILLISECONDS);
+ long duration = stopWatch.getElapsed(TimeUnit.MILLISECONDS);
- // set attribute how many rows were selected
- resultSetFF = session.putAttribute(resultSetFF, RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
- resultSetFF = session.putAttribute(resultSetFF, RESULT_QUERY_DURATION, String.valueOf(duration));
- resultSetFF = session.putAttribute(resultSetFF, CoreAttributes.MIME_TYPE.key(), JdbcCommon.MIME_TYPE_AVRO_BINARY);
+ // set attribute how many rows were selected
+ resultSetFF = session.putAttribute(resultSetFF, RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
+ resultSetFF = session.putAttribute(resultSetFF, RESULT_QUERY_DURATION, String.valueOf(duration));
+ resultSetFF = session.putAttribute(resultSetFF, CoreAttributes.MIME_TYPE.key(), JdbcCommon.MIME_TYPE_AVRO_BINARY);
+ resultSetFF = session.putAttribute(resultSetFF, RESULTSET_INDEX, String.valueOf(resultCount));
- logger.info("{} contains {} Avro records; transferring to 'success'",
- new Object[]{resultSetFF, nrOfRows.get()});
- session.getProvenanceReporter().modifyContent(resultSetFF, "Retrieved " + nrOfRows.get() + " rows", duration);
- session.transfer(resultSetFF, REL_SUCCESS);
+ logger.info("{} contains {} Avro records; transferring to 'success'",
+ new Object[]{resultSetFF, nrOfRows.get()});
+ session.getProvenanceReporter().modifyContent(resultSetFF, "Retrieved " + nrOfRows.get() + " rows", duration);
+ session.transfer(resultSetFF, REL_SUCCESS);
+
+ resultCount++;
+ }
- resultCount++;
// are there any more result sets?
try{
- results = st.getMoreResults();
+ hasResults = st.getMoreResults(Statement.CLOSE_CURRENT_RESULT);
+ hasUpdateCount = st.getUpdateCount() != -1;
} catch(SQLException ex){
- results = false;
+ hasResults = false;
+ hasUpdateCount = false;
}
}
@@ -278,8 +289,20 @@ public class ExecuteSQL extends AbstractProcessor {
} else {
fileToProcess = session.write(fileToProcess, JdbcCommon::createEmptyAvroStream);
+ fileToProcess = session.putAttribute(fileToProcess, RESULT_ROW_COUNT, "0");
+ fileToProcess = session.putAttribute(fileToProcess, CoreAttributes.MIME_TYPE.key(), JdbcCommon.MIME_TYPE_AVRO_BINARY);
session.transfer(fileToProcess, REL_SUCCESS);
}
+ } else if(resultCount == 0){
+ //If we had no inbound FlowFile, no exceptions, and the SQL generated no result sets (Insert/Update/Delete statements only),
+ // then generate an empty output FlowFile
+ FlowFile resultSetFF = session.create();
+
+ resultSetFF = session.write(resultSetFF, out -> JdbcCommon.createEmptyAvroStream(out));
+
+ resultSetFF = session.putAttribute(resultSetFF, RESULT_ROW_COUNT, "0");
+ resultSetFF = session.putAttribute(resultSetFF, CoreAttributes.MIME_TYPE.key(), JdbcCommon.MIME_TYPE_AVRO_BINARY);
+ session.transfer(resultSetFF, REL_SUCCESS);
}
} catch (final ProcessException | SQLException e) {
//If we had at least one result then it's OK to drop the original file, but if we had no results then
http://git-wip-us.apache.org/repos/asf/nifi/blob/0390c0f1/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
index 3a0b773..b4a7c69 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
@@ -189,6 +189,32 @@ public class TestExecuteSQL {
runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 1);
runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS).get(0).assertAttributeEquals(ExecuteSQL.RESULT_ROW_COUNT, "2");
+ runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS).get(0).assertAttributeEquals(ExecuteSQL.RESULTSET_INDEX, "0");
+ }
+
+ @Test
+ public void testInsertStatementCreatesFlowFile() throws SQLException {
+ // remove previous test database, if any
+ final File dbLocation = new File(DB_LOCATION);
+ dbLocation.delete();
+
+ // load test data to database
+ final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+ Statement stmt = con.createStatement();
+
+ try {
+ stmt.execute("drop table TEST_NULL_INT");
+ } catch (final SQLException sqle) {
+ }
+
+ stmt.execute("create table TEST_NULL_INT (id integer not null, val1 integer, val2 integer, constraint my_pk primary key (id))");
+
+ runner.setIncomingConnection(false);
+ runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "insert into TEST_NULL_INT (id, val1, val2) VALUES (0, NULL, 1)");
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 1);
+ runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS).get(0).assertAttributeEquals(ExecuteSQL.RESULT_ROW_COUNT, "0");
}
@Test