You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2021/02/25 05:57:29 UTC

[nifi] 20/24: NIFI-8253: Restore call to session.commit() in GenerateTableFetch

This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch support/nifi-1.13
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit f44e9a1028cad6a4371602b7b20618930916716a
Author: Matthew Burgess <ma...@apache.org>
AuthorDate: Tue Feb 23 15:03:36 2021 -0500

    NIFI-8253: Restore call to session.commit() in GenerateTableFetch
    
    Signed-off-by: Pierre Villard <pi...@gmail.com>
    
    This closes #4840.
---
 .../main/java/org/apache/nifi/util/MockSessionFactory.java   |  2 +-
 .../apache/nifi/processors/standard/GenerateTableFetch.java  |  3 +++
 .../nifi/processors/standard/TestGenerateTableFetch.java     | 12 ++++++++++++
 3 files changed, 16 insertions(+), 1 deletion(-)

diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockSessionFactory.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockSessionFactory.java
index 92df86f..0ec9bdd 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/MockSessionFactory.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockSessionFactory.java
@@ -47,7 +47,7 @@ public class MockSessionFactory implements ProcessSessionFactory {
         return session;
     }
 
-    Set<MockProcessSession> getCreatedSessions() {
+    public Set<MockProcessSession> getCreatedSessions() {
         return Collections.unmodifiableSet(createdSessions);
     }
 }
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
index 671f326..fbc0e8b 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
@@ -554,6 +554,9 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
                 logger.error("{} failed to update State Manager, observed maximum values will not be recorded. "
                                 + "Also, any generated SQL statements may be duplicated.", this, ioe);
             }
+
+            session.commit();
+
         } catch (final ProcessException pe) {
             // Log the cause of the ProcessException if it is available
             Throwable t = (pe.getCause() == null ? pe : pe.getCause());
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java
index c8141a6..1f4494f 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java
@@ -25,6 +25,8 @@ import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processors.standard.db.impl.DerbyDatabaseAdapter;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.MockProcessSession;
+import org.apache.nifi.util.MockSessionFactory;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.apache.nifi.util.file.FileUtils;
@@ -46,6 +48,7 @@ import java.sql.Types;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import static org.apache.nifi.processors.standard.AbstractDatabaseFetchProcessor.DB_TYPE;
 import static org.apache.nifi.processors.standard.AbstractDatabaseFetchProcessor.FRAGMENT_COUNT;
@@ -141,6 +144,15 @@ public class TestGenerateTableFetch {
         runner.setProperty(GenerateTableFetch.MAX_VALUE_COLUMN_NAMES, "ID");
 
         runner.run();
+
+        // Assert all the sessions were committed
+        MockSessionFactory runnerSessionFactory = (MockSessionFactory) runner.getProcessSessionFactory();
+        Set<MockProcessSession> sessions = runnerSessionFactory.getCreatedSessions();
+        for (MockProcessSession session : sessions) {
+            session.assertCommitted();
+        }
+
+        // Verify the expected FlowFile
         runner.assertAllFlowFilesTransferred(REL_SUCCESS, 1);
         MockFlowFile flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
         String query = new String(flowFile.toByteArray());