Posted to commits@nifi.apache.org by jo...@apache.org on 2019/03/12 19:39:59 UTC

[nifi] 07/21: NIFI-6040: Fixed ExecuteSQL processors when Output Batch Size is set

This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch support/nifi-1.9.x
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit 04dc1d6de2062c00c95551b6273b9b04c586de5a
Author: Matthew Burgess <ma...@apache.org>
AuthorDate: Wed Mar 6 10:48:25 2019 -0500

    NIFI-6040: Fixed ExecuteSQL processors when Output Batch Size is set
    
    This closes #3355.
    
    Signed-off-by: Bryan Bende <bb...@apache.org>
---
 .../processors/standard/AbstractExecuteSQL.java    |   5 +
 .../nifi/processors/standard/TestExecuteSQL.java   |  45 +++++++
 .../processors/standard/TestExecuteSQLRecord.java  | 146 +++++++++++++++++----
 3 files changed, 173 insertions(+), 23 deletions(-)

diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java
index 76e36fd..e013a5c 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java
@@ -312,6 +312,11 @@ public abstract class AbstractExecuteSQL extends AbstractProcessor {
                                 // If we've reached the batch size, send out the flow files
                                 if (outputBatchSize > 0 && resultSetFlowFiles.size() >= outputBatchSize) {
                                     session.transfer(resultSetFlowFiles, REL_SUCCESS);
+                                    // Need to remove the original input file if it exists
+                                    if (fileToProcess != null) {
+                                        session.remove(fileToProcess);
+                                        fileToProcess = null;
+                                    }
                                     session.commit();
                                     resultSetFlowFiles.clear();
                                 }
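
The gist of the fix: when Output Batch Size is reached mid-result-set, the accumulated FlowFiles are transferred and the session is committed before the query has finished. If the processor was triggered by an incoming FlowFile, that FlowFile has already been pulled from the session but not yet routed, and committing a session with an un-routed FlowFile fails. Removing the original input FlowFile (once) before the first intermediate commit avoids that. Below is a minimal sketch of the pattern, not the actual AbstractExecuteSQL code; the helper names are hypothetical.

    // Sketch only -- hypothetical helpers, assuming the usual ProcessSession contract
    // that a FlowFile obtained via session.get() must be transferred or removed
    // before session.commit().
    void drainResultSet(final ProcessSession session, final ResultSet rs,
                        final int outputBatchSize, FlowFile incoming) throws SQLException {
        final List<FlowFile> batch = new ArrayList<>();
        while (hasMoreRows(rs)) {                              // hypothetical helper
            batch.add(writeNextFragment(session, rs));         // hypothetical helper
            if (outputBatchSize > 0 && batch.size() >= outputBatchSize) {
                session.transfer(batch, REL_SUCCESS);
                if (incoming != null) {                        // the fix: drop the trigger
                    session.remove(incoming);                  // FlowFile before the
                    incoming = null;                           // mid-stream commit
                }
                session.commit();
                batch.clear();
            }
        }
        session.transfer(batch, REL_SUCCESS);                  // any remaining fragments
    }
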
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
index 35dfe76..5458434 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
@@ -283,6 +283,51 @@ public class TestExecuteSQL {
     }
 
     @Test
+    public void testWithOutputBatchingAndIncomingFlowFile() throws SQLException {
+        // remove previous test database, if any
+        final File dbLocation = new File(DB_LOCATION);
+        dbLocation.delete();
+
+        // load test data to database
+        final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+
+        try {
+            stmt.execute("drop table TEST_NULL_INT");
+        } catch (final SQLException sqle) {
+        }
+
+        stmt.execute("create table TEST_NULL_INT (id integer not null, val1 integer, val2 integer, constraint my_pk primary key (id))");
+
+        for (int i = 0; i < 1000; i++) {
+            stmt.execute("insert into TEST_NULL_INT (id, val1, val2) VALUES (" + i + ", 1, 1)");
+        }
+
+        runner.setIncomingConnection(true);
+        runner.setProperty(ExecuteSQL.MAX_ROWS_PER_FLOW_FILE, "5");
+        runner.setProperty(ExecuteSQL.OUTPUT_BATCH_SIZE, "1");
+        runner.enqueue("SELECT * FROM TEST_NULL_INT");
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 200);
+        runner.assertAllFlowFilesContainAttribute(ExecuteSQL.REL_SUCCESS, FragmentAttributes.FRAGMENT_INDEX.key());
+        runner.assertAllFlowFilesContainAttribute(ExecuteSQL.REL_SUCCESS, FragmentAttributes.FRAGMENT_ID.key());
+
+        MockFlowFile firstFlowFile = runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS).get(0);
+
+        firstFlowFile.assertAttributeEquals(ExecuteSQL.RESULT_ROW_COUNT, "5");
+        firstFlowFile.assertAttributeNotExists(FragmentAttributes.FRAGMENT_COUNT.key());
+        firstFlowFile.assertAttributeEquals(FragmentAttributes.FRAGMENT_INDEX.key(), "0");
+        firstFlowFile.assertAttributeEquals(ExecuteSQL.RESULTSET_INDEX, "0");
+
+        MockFlowFile lastFlowFile = runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS).get(199);
+
+        lastFlowFile.assertAttributeEquals(ExecuteSQL.RESULT_ROW_COUNT, "5");
+        lastFlowFile.assertAttributeEquals(FragmentAttributes.FRAGMENT_INDEX.key(), "199");
+        lastFlowFile.assertAttributeEquals(ExecuteSQL.RESULTSET_INDEX, "0");
+    }
+
+    @Test
     public void testMaxRowsPerFlowFile() throws SQLException {
         // remove previous test database, if any
         final File dbLocation = new File(DB_LOCATION);
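
The new TestExecuteSQL case above drives the fixed path with an incoming FlowFile. The expected numbers follow directly from the configured properties; as a quick, purely illustrative sanity check of the arithmetic (not part of the commit):

    // 1000 rows split into fragments of at most 5 rows each
    int rows = 1000, maxRowsPerFlowFile = 5;
    int expectedFragments = (rows + maxRowsPerFlowFile - 1) / maxRowsPerFlowFile;   // 200
    // so FRAGMENT_INDEX runs 0..199, and with Output Batch Size = 1 each fragment is
    // committed as soon as it is written -- which is why FRAGMENT_COUNT is asserted to
    // be absent: the total is not yet known when the early fragments are sent out.
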
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java
index b0f5cda..3d172ca 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java
@@ -157,6 +157,106 @@ public class TestExecuteSQLRecord {
     }
 
     @Test
+    public void testWithOutputBatching() throws InitializationException, SQLException {
+        // remove previous test database, if any
+        final File dbLocation = new File(DB_LOCATION);
+        dbLocation.delete();
+
+        // load test data to database
+        final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+
+        try {
+            stmt.execute("drop table TEST_NULL_INT");
+        } catch (final SQLException sqle) {
+        }
+
+        stmt.execute("create table TEST_NULL_INT (id integer not null, val1 integer, val2 integer, constraint my_pk primary key (id))");
+
+        for (int i = 0; i < 1000; i++) {
+            stmt.execute("insert into TEST_NULL_INT (id, val1, val2) VALUES (" + i + ", 1, 1)");
+        }
+
+        MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1);
+        runner.addControllerService("writer", recordWriter);
+        runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer");
+        runner.enableControllerService(recordWriter);
+
+        runner.setIncomingConnection(false);
+        runner.setProperty(ExecuteSQLRecord.MAX_ROWS_PER_FLOW_FILE, "5");
+        runner.setProperty(ExecuteSQLRecord.OUTPUT_BATCH_SIZE, "5");
+        runner.setProperty(ExecuteSQLRecord.SQL_SELECT_QUERY, "SELECT * FROM TEST_NULL_INT");
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ExecuteSQLRecord.REL_SUCCESS, 200);
+        runner.assertAllFlowFilesContainAttribute(ExecuteSQLRecord.REL_SUCCESS, FragmentAttributes.FRAGMENT_INDEX.key());
+        runner.assertAllFlowFilesContainAttribute(ExecuteSQLRecord.REL_SUCCESS, FragmentAttributes.FRAGMENT_ID.key());
+
+        MockFlowFile firstFlowFile = runner.getFlowFilesForRelationship(ExecuteSQLRecord.REL_SUCCESS).get(0);
+
+        firstFlowFile.assertAttributeEquals(ExecuteSQLRecord.RESULT_ROW_COUNT, "5");
+        firstFlowFile.assertAttributeNotExists(FragmentAttributes.FRAGMENT_COUNT.key());
+        firstFlowFile.assertAttributeEquals(FragmentAttributes.FRAGMENT_INDEX.key(), "0");
+        firstFlowFile.assertAttributeEquals(ExecuteSQLRecord.RESULTSET_INDEX, "0");
+
+        MockFlowFile lastFlowFile = runner.getFlowFilesForRelationship(ExecuteSQLRecord.REL_SUCCESS).get(199);
+
+        lastFlowFile.assertAttributeEquals(ExecuteSQLRecord.RESULT_ROW_COUNT, "5");
+        lastFlowFile.assertAttributeEquals(FragmentAttributes.FRAGMENT_INDEX.key(), "199");
+        lastFlowFile.assertAttributeEquals(ExecuteSQLRecord.RESULTSET_INDEX, "0");
+    }
+
+    @Test
+    public void testWithOutputBatchingAndIncomingFlowFile() throws InitializationException, SQLException {
+        // remove previous test database, if any
+        final File dbLocation = new File(DB_LOCATION);
+        dbLocation.delete();
+
+        // load test data to database
+        final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+
+        try {
+            stmt.execute("drop table TEST_NULL_INT");
+        } catch (final SQLException sqle) {
+        }
+
+        stmt.execute("create table TEST_NULL_INT (id integer not null, val1 integer, val2 integer, constraint my_pk primary key (id))");
+
+        for (int i = 0; i < 1000; i++) {
+            stmt.execute("insert into TEST_NULL_INT (id, val1, val2) VALUES (" + i + ", 1, 1)");
+        }
+
+        MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1);
+        runner.addControllerService("writer", recordWriter);
+        runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer");
+        runner.enableControllerService(recordWriter);
+
+        runner.setIncomingConnection(true);
+        runner.setProperty(ExecuteSQLRecord.MAX_ROWS_PER_FLOW_FILE, "5");
+        runner.setProperty(ExecuteSQLRecord.OUTPUT_BATCH_SIZE, "1");
+        runner.enqueue("SELECT * FROM TEST_NULL_INT");
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ExecuteSQLRecord.REL_SUCCESS, 200);
+        runner.assertAllFlowFilesContainAttribute(ExecuteSQLRecord.REL_SUCCESS, FragmentAttributes.FRAGMENT_INDEX.key());
+        runner.assertAllFlowFilesContainAttribute(ExecuteSQLRecord.REL_SUCCESS, FragmentAttributes.FRAGMENT_ID.key());
+
+        MockFlowFile firstFlowFile = runner.getFlowFilesForRelationship(ExecuteSQLRecord.REL_SUCCESS).get(0);
+
+        firstFlowFile.assertAttributeEquals(ExecuteSQLRecord.RESULT_ROW_COUNT, "5");
+        firstFlowFile.assertAttributeNotExists(FragmentAttributes.FRAGMENT_COUNT.key());
+        firstFlowFile.assertAttributeEquals(FragmentAttributes.FRAGMENT_INDEX.key(), "0");
+        firstFlowFile.assertAttributeEquals(ExecuteSQLRecord.RESULTSET_INDEX, "0");
+
+        MockFlowFile lastFlowFile = runner.getFlowFilesForRelationship(ExecuteSQLRecord.REL_SUCCESS).get(199);
+
+        lastFlowFile.assertAttributeEquals(ExecuteSQLRecord.RESULT_ROW_COUNT, "5");
+        lastFlowFile.assertAttributeEquals(FragmentAttributes.FRAGMENT_INDEX.key(), "199");
+        lastFlowFile.assertAttributeEquals(ExecuteSQLRecord.RESULTSET_INDEX, "0");
+    }
+
+    @Test
     public void testMaxRowsPerFlowFile() throws Exception {
         // remove previous test database, if any
         final File dbLocation = new File(DB_LOCATION);
@@ -257,7 +357,7 @@ public class TestExecuteSQLRecord {
         stmt.execute("create table TEST_NULL_INT (id integer not null, val1 integer, val2 integer, constraint my_pk primary key (id))");
 
         runner.setIncomingConnection(true);
-        runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "select * from TEST_NULL_INT");
+        runner.setProperty(ExecuteSQLRecord.SQL_SELECT_QUERY, "select * from TEST_NULL_INT");
         MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1);
         runner.addControllerService("writer", recordWriter);
         runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer");
@@ -265,9 +365,9 @@ public class TestExecuteSQLRecord {
         runner.enqueue("Hello".getBytes());
         runner.run();
 
-        runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 1);
-        MockFlowFile firstFlowFile = runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS).get(0);
-        firstFlowFile.assertAttributeEquals(ExecuteSQL.RESULT_ROW_COUNT, "0");
+        runner.assertAllFlowFilesTransferred(ExecuteSQLRecord.REL_SUCCESS, 1);
+        MockFlowFile firstFlowFile = runner.getFlowFilesForRelationship(ExecuteSQLRecord.REL_SUCCESS).get(0);
+        firstFlowFile.assertAttributeEquals(ExecuteSQLRecord.RESULT_ROW_COUNT, "0");
         firstFlowFile.assertContentEquals("");
     }
 
@@ -412,8 +512,8 @@ public class TestExecuteSQLRecord {
         stmt.execute("insert into TEST_NULL_INT values(1,2,3)");
 
         runner.setIncomingConnection(true);
-        runner.setProperty(ExecuteSQL.SQL_PRE_QUERY, "CALL SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS(1);CALL SYSCS_UTIL.SYSCS_SET_STATISTICS_TIMING(1)");
-        runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "select * from TEST_NULL_INT");
+        runner.setProperty(ExecuteSQLRecord.SQL_PRE_QUERY, "CALL SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS(1);CALL SYSCS_UTIL.SYSCS_SET_STATISTICS_TIMING(1)");
+        runner.setProperty(ExecuteSQLRecord.SQL_SELECT_QUERY, "select * from TEST_NULL_INT");
         MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1);
         runner.addControllerService("writer", recordWriter);
         runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer");
@@ -421,9 +521,9 @@ public class TestExecuteSQLRecord {
         runner.enqueue("test".getBytes());
         runner.run();
 
-        runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 1);
-        MockFlowFile firstFlowFile = runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS).get(0);
-        firstFlowFile.assertAttributeEquals(ExecuteSQL.RESULT_ROW_COUNT, "1");
+        runner.assertAllFlowFilesTransferred(ExecuteSQLRecord.REL_SUCCESS, 1);
+        MockFlowFile firstFlowFile = runner.getFlowFilesForRelationship(ExecuteSQLRecord.REL_SUCCESS).get(0);
+        firstFlowFile.assertAttributeEquals(ExecuteSQLRecord.RESULT_ROW_COUNT, "1");
     }
 
     @Test
@@ -445,9 +545,9 @@ public class TestExecuteSQLRecord {
         stmt.execute("insert into TEST_NULL_INT values(1,2,3)");
 
         runner.setIncomingConnection(true);
-        runner.setProperty(ExecuteSQL.SQL_PRE_QUERY, "CALL SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS(1);CALL SYSCS_UTIL.SYSCS_SET_STATISTICS_TIMING(1)");
-        runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "select * from TEST_NULL_INT");
-        runner.setProperty(ExecuteSQL.SQL_POST_QUERY, "CALL SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS(0);CALL SYSCS_UTIL.SYSCS_SET_STATISTICS_TIMING(0)");
+        runner.setProperty(ExecuteSQLRecord.SQL_PRE_QUERY, "CALL SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS(1);CALL SYSCS_UTIL.SYSCS_SET_STATISTICS_TIMING(1)");
+        runner.setProperty(ExecuteSQLRecord.SQL_SELECT_QUERY, "select * from TEST_NULL_INT");
+        runner.setProperty(ExecuteSQLRecord.SQL_POST_QUERY, "CALL SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS(0);CALL SYSCS_UTIL.SYSCS_SET_STATISTICS_TIMING(0)");
         MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1);
         runner.addControllerService("writer", recordWriter);
         runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer");
@@ -455,9 +555,9 @@ public class TestExecuteSQLRecord {
         runner.enqueue("test".getBytes());
         runner.run();
 
-        runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 1);
-        MockFlowFile firstFlowFile = runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS).get(0);
-        firstFlowFile.assertAttributeEquals(ExecuteSQL.RESULT_ROW_COUNT, "1");
+        runner.assertAllFlowFilesTransferred(ExecuteSQLRecord.REL_SUCCESS, 1);
+        MockFlowFile firstFlowFile = runner.getFlowFilesForRelationship(ExecuteSQLRecord.REL_SUCCESS).get(0);
+        firstFlowFile.assertAttributeEquals(ExecuteSQLRecord.RESULT_ROW_COUNT, "1");
     }
 
     @Test
@@ -479,8 +579,8 @@ public class TestExecuteSQLRecord {
 
         runner.setIncomingConnection(true);
         // Simulate failure by not providing a parameter
-        runner.setProperty(ExecuteSQL.SQL_PRE_QUERY, "CALL SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS()");
-        runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "select * from TEST_NULL_INT");
+        runner.setProperty(ExecuteSQLRecord.SQL_PRE_QUERY, "CALL SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS()");
+        runner.setProperty(ExecuteSQLRecord.SQL_SELECT_QUERY, "select * from TEST_NULL_INT");
         MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1);
         runner.addControllerService("writer", recordWriter);
         runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer");
@@ -488,7 +588,7 @@ public class TestExecuteSQLRecord {
         runner.enqueue("test".getBytes());
         runner.run();
 
-        runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_FAILURE, 1);
+        runner.assertAllFlowFilesTransferred(ExecuteSQLRecord.REL_FAILURE, 1);
     }
 
     @Test
@@ -509,10 +609,10 @@ public class TestExecuteSQLRecord {
         stmt.execute("create table TEST_NULL_INT (id integer not null, val1 integer, val2 integer, constraint my_pk primary key (id))");
 
         runner.setIncomingConnection(true);
-        runner.setProperty(ExecuteSQL.SQL_PRE_QUERY, "CALL SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS(1);CALL SYSCS_UTIL.SYSCS_SET_STATISTICS_TIMING(1)");
-        runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "select * from TEST_NULL_INT");
+        runner.setProperty(ExecuteSQLRecord.SQL_PRE_QUERY, "CALL SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS(1);CALL SYSCS_UTIL.SYSCS_SET_STATISTICS_TIMING(1)");
+        runner.setProperty(ExecuteSQLRecord.SQL_SELECT_QUERY, "select * from TEST_NULL_INT");
         // Simulate failure by not providing a parameter
-        runner.setProperty(ExecuteSQL.SQL_POST_QUERY, "CALL SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS()");
+        runner.setProperty(ExecuteSQLRecord.SQL_POST_QUERY, "CALL SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS()");
         MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1);
         runner.addControllerService("writer", recordWriter);
         runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer");
@@ -520,8 +620,8 @@ public class TestExecuteSQLRecord {
         runner.enqueue("test".getBytes());
         runner.run();
 
-        runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_FAILURE, 1);
-        MockFlowFile firstFlowFile = runner.getFlowFilesForRelationship(ExecuteSQL.REL_FAILURE).get(0);
+        runner.assertAllFlowFilesTransferred(ExecuteSQLRecord.REL_FAILURE, 1);
+        MockFlowFile firstFlowFile = runner.getFlowFilesForRelationship(ExecuteSQLRecord.REL_FAILURE).get(0);
         firstFlowFile.assertContentEquals("test");
     }