You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mt...@apache.org on 2018/04/24 14:28:34 UTC

nifi git commit: NIFI-4561 ExecuteSQL returns no FlowFile for some queries

Repository: nifi
Updated Branches:
  refs/heads/master 5ca6261de -> 0390c0f19


NIFI-4561 ExecuteSQL returns no FlowFile for some queries

This closes #2243

Signed-off-by: Mike Thomsen <mi...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/0390c0f1
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/0390c0f1
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/0390c0f1

Branch: refs/heads/master
Commit: 0390c0f1967d1a57a333d15e1ec41b06ceb88590
Parents: 5ca6261
Author: patricker <pa...@gmail.com>
Authored: Wed Nov 1 10:25:26 2017 +0800
Committer: Mike Thomsen <mi...@gmail.com>
Committed: Tue Apr 24 10:27:44 2018 -0400

----------------------------------------------------------------------
 .../nifi/processors/standard/ExecuteSQL.java    | 99 ++++++++++++--------
 .../processors/standard/TestExecuteSQL.java     | 26 +++++
 2 files changed, 87 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/0390c0f1/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
index 203d02a..13be8d5 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
@@ -21,6 +21,7 @@ import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
@@ -91,12 +92,15 @@ import static org.apache.nifi.processors.standard.util.JdbcCommon.USE_AVRO_LOGIC
 })
 @WritesAttributes({
     @WritesAttribute(attribute="executesql.row.count", description = "Contains the number of rows returned in the select query"),
-    @WritesAttribute(attribute="executesql.query.duration", description = "Duration of the query in milliseconds")
+    @WritesAttribute(attribute="executesql.query.duration", description = "Duration of the query in milliseconds"),
+    @WritesAttribute(attribute="executesql.resultset.index", description = "Assuming multiple result sets are returned, "
+       + "the zero based index of this result set.")
 })
 public class ExecuteSQL extends AbstractProcessor {
 
     public static final String RESULT_ROW_COUNT = "executesql.row.count";
     public static final String RESULT_QUERY_DURATION = "executesql.query.duration";
+    public static final String RESULTSET_INDEX = "executesql.resultset.index";
 
     // Relationships
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
@@ -220,53 +224,60 @@ public class ExecuteSQL extends AbstractProcessor {
                 JdbcCommon.setParameters(st, fileToProcess.getAttributes());
             }
             logger.debug("Executing query {}", new Object[]{selectQuery});
-            boolean results = st.execute();
+            boolean hasResults = st.execute();
+            boolean hasUpdateCount = st.getUpdateCount() != -1;
 
+            while(hasResults || hasUpdateCount) {
+                //getMoreResults() and execute() return false to indicate that the result of the statement is just a number and not a ResultSet
+                if (hasResults) {
+                    FlowFile resultSetFF;
+                    if (fileToProcess == null) {
+                        resultSetFF = session.create();
+                    } else {
+                        resultSetFF = session.create(fileToProcess);
+                        resultSetFF = session.putAllAttributes(resultSetFF, fileToProcess.getAttributes());
+                    }
 
-            while(results){
-                FlowFile resultSetFF;
-                if(fileToProcess == null){
-                    resultSetFF = session.create();
-                } else {
-                    resultSetFF = session.create(fileToProcess);
-                    resultSetFF = session.putAllAttributes(resultSetFF, fileToProcess.getAttributes());
-                }
+                    final AtomicLong nrOfRows = new AtomicLong(0L);
+                    resultSetFF = session.write(resultSetFF, out -> {
+                        try {
 
-                final AtomicLong nrOfRows = new AtomicLong(0L);
-                resultSetFF = session.write(resultSetFF, out -> {
-                    try {
-
-                        final ResultSet resultSet = st.getResultSet();
-                        final JdbcCommon.AvroConversionOptions options = JdbcCommon.AvroConversionOptions.builder()
-                                .convertNames(convertNamesForAvro)
-                                .useLogicalTypes(useAvroLogicalTypes)
-                                .defaultPrecision(defaultPrecision)
-                                .defaultScale(defaultScale)
-                                .build();
-                        nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, options, null));
-                    } catch (final SQLException e) {
-                        throw new ProcessException(e);
-                    }
-                });
+                            final ResultSet resultSet = st.getResultSet();
+                            final JdbcCommon.AvroConversionOptions options = JdbcCommon.AvroConversionOptions.builder()
+                                    .convertNames(convertNamesForAvro)
+                                    .useLogicalTypes(useAvroLogicalTypes)
+                                    .defaultPrecision(defaultPrecision)
+                                    .defaultScale(defaultScale)
+                                    .build();
+                            nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, options, null));
+                        } catch (final SQLException e) {
+                            throw new ProcessException(e);
+                        }
+                    });
 
-                long duration = stopWatch.getElapsed(TimeUnit.MILLISECONDS);
+                    long duration = stopWatch.getElapsed(TimeUnit.MILLISECONDS);
 
-                // set attribute how many rows were selected
-                resultSetFF = session.putAttribute(resultSetFF, RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
-                resultSetFF = session.putAttribute(resultSetFF, RESULT_QUERY_DURATION, String.valueOf(duration));
-                resultSetFF = session.putAttribute(resultSetFF, CoreAttributes.MIME_TYPE.key(), JdbcCommon.MIME_TYPE_AVRO_BINARY);
+                    // set attribute how many rows were selected
+                    resultSetFF = session.putAttribute(resultSetFF, RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
+                    resultSetFF = session.putAttribute(resultSetFF, RESULT_QUERY_DURATION, String.valueOf(duration));
+                    resultSetFF = session.putAttribute(resultSetFF, CoreAttributes.MIME_TYPE.key(), JdbcCommon.MIME_TYPE_AVRO_BINARY);
+                    resultSetFF = session.putAttribute(resultSetFF, RESULTSET_INDEX, String.valueOf(resultCount));
 
-                logger.info("{} contains {} Avro records; transferring to 'success'",
-                        new Object[]{resultSetFF, nrOfRows.get()});
-                session.getProvenanceReporter().modifyContent(resultSetFF, "Retrieved " + nrOfRows.get() + " rows", duration);
-                session.transfer(resultSetFF, REL_SUCCESS);
+                    logger.info("{} contains {} Avro records; transferring to 'success'",
+                            new Object[]{resultSetFF, nrOfRows.get()});
+                    session.getProvenanceReporter().modifyContent(resultSetFF, "Retrieved " + nrOfRows.get() + " rows", duration);
+                    session.transfer(resultSetFF, REL_SUCCESS);
+
+                    resultCount++;
+                }
 
-                resultCount++;
                 // are there anymore result sets?
                 try{
-                    results = st.getMoreResults();
+                    hasResults = st.getMoreResults(Statement.CLOSE_CURRENT_RESULT);
+                    hasUpdateCount = st.getUpdateCount() != -1;
                 } catch(SQLException ex){
-                    results = false;
+                    hasResults = false;
+                    hasUpdateCount = false;
                 }
             }
 
@@ -278,8 +289,20 @@ public class ExecuteSQL extends AbstractProcessor {
                 } else {
                     fileToProcess = session.write(fileToProcess, JdbcCommon::createEmptyAvroStream);
 
+                    fileToProcess = session.putAttribute(fileToProcess, RESULT_ROW_COUNT, "0");
+                    fileToProcess = session.putAttribute(fileToProcess, CoreAttributes.MIME_TYPE.key(), JdbcCommon.MIME_TYPE_AVRO_BINARY);
                     session.transfer(fileToProcess, REL_SUCCESS);
                 }
+            } else if(resultCount == 0){
+                //If we had no inbound FlowFile, no exceptions, and the SQL generated no result sets (Insert/Update/Delete statements only)
+                // Then generate an empty Output FlowFile
+                FlowFile resultSetFF = session.create();
+
+                resultSetFF = session.write(resultSetFF, out -> JdbcCommon.createEmptyAvroStream(out));
+
+                resultSetFF = session.putAttribute(resultSetFF, RESULT_ROW_COUNT, "0");
+                resultSetFF = session.putAttribute(resultSetFF, CoreAttributes.MIME_TYPE.key(), JdbcCommon.MIME_TYPE_AVRO_BINARY);
+                session.transfer(resultSetFF, REL_SUCCESS);
             }
         } catch (final ProcessException | SQLException e) {
             //If we had at least one result then it's OK to drop the original file, but if we had no results then

http://git-wip-us.apache.org/repos/asf/nifi/blob/0390c0f1/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
index 3a0b773..b4a7c69 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
@@ -189,6 +189,32 @@ public class TestExecuteSQL {
 
         runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 1);
         runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS).get(0).assertAttributeEquals(ExecuteSQL.RESULT_ROW_COUNT, "2");
+        runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS).get(0).assertAttributeEquals(ExecuteSQL.RESULTSET_INDEX, "0");
+    }
+
+    @Test
+    public void testInsertStatementCreatesFlowFile() throws SQLException {
+        // remove previous test database, if any
+        final File dbLocation = new File(DB_LOCATION);
+        dbLocation.delete();
+
+        // load test data to database
+        final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+
+        try {
+            stmt.execute("drop table TEST_NULL_INT");
+        } catch (final SQLException sqle) {
+        }
+
+        stmt.execute("create table TEST_NULL_INT (id integer not null, val1 integer, val2 integer, constraint my_pk primary key (id))");
+
+        runner.setIncomingConnection(false);
+        runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "insert into TEST_NULL_INT (id, val1, val2) VALUES (0, NULL, 1)");
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 1);
+        runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS).get(0).assertAttributeEquals(ExecuteSQL.RESULT_ROW_COUNT, "0");
     }
 
     @Test