You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2021/02/25 05:57:29 UTC
[nifi] 20/24: NIFI-8253: Restore call to session.commit() in GenerateTableFetch
This is an automated email from the ASF dual-hosted git repository.
joewitt pushed a commit to branch support/nifi-1.13
in repository https://gitbox.apache.org/repos/asf/nifi.git
commit f44e9a1028cad6a4371602b7b20618930916716a
Author: Matthew Burgess <ma...@apache.org>
AuthorDate: Tue Feb 23 15:03:36 2021 -0500
NIFI-8253: Restore call to session.commit() in GenerateTableFetch
Signed-off-by: Pierre Villard <pi...@gmail.com>
This closes #4840.
---
.../main/java/org/apache/nifi/util/MockSessionFactory.java | 2 +-
.../apache/nifi/processors/standard/GenerateTableFetch.java | 3 +++
.../nifi/processors/standard/TestGenerateTableFetch.java | 12 ++++++++++++
3 files changed, 16 insertions(+), 1 deletion(-)
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockSessionFactory.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockSessionFactory.java
index 92df86f..0ec9bdd 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/MockSessionFactory.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockSessionFactory.java
@@ -47,7 +47,7 @@ public class MockSessionFactory implements ProcessSessionFactory {
return session;
}
- Set<MockProcessSession> getCreatedSessions() {
+ public Set<MockProcessSession> getCreatedSessions() {
return Collections.unmodifiableSet(createdSessions);
}
}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
index 671f326..fbc0e8b 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
@@ -554,6 +554,9 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
logger.error("{} failed to update State Manager, observed maximum values will not be recorded. "
+ "Also, any generated SQL statements may be duplicated.", this, ioe);
}
+
+ session.commit();
+
} catch (final ProcessException pe) {
// Log the cause of the ProcessException if it is available
Throwable t = (pe.getCause() == null ? pe : pe.getCause());
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java
index c8141a6..1f4494f 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java
@@ -25,6 +25,8 @@ import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processors.standard.db.impl.DerbyDatabaseAdapter;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.MockProcessSession;
+import org.apache.nifi.util.MockSessionFactory;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.apache.nifi.util.file.FileUtils;
@@ -46,6 +48,7 @@ import java.sql.Types;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import static org.apache.nifi.processors.standard.AbstractDatabaseFetchProcessor.DB_TYPE;
import static org.apache.nifi.processors.standard.AbstractDatabaseFetchProcessor.FRAGMENT_COUNT;
@@ -141,6 +144,15 @@ public class TestGenerateTableFetch {
runner.setProperty(GenerateTableFetch.MAX_VALUE_COLUMN_NAMES, "ID");
runner.run();
+
+ // Assert all the sessions were committed
+ MockSessionFactory runnerSessionFactory = (MockSessionFactory) runner.getProcessSessionFactory();
+ Set<MockProcessSession> sessions = runnerSessionFactory.getCreatedSessions();
+ for (MockProcessSession session : sessions) {
+ session.assertCommitted();
+ }
+
+ // Verify the expected FlowFile
runner.assertAllFlowFilesTransferred(REL_SUCCESS, 1);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
String query = new String(flowFile.toByteArray());