You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by pv...@apache.org on 2018/08/29 11:22:21 UTC
nifi git commit: NIFI-5553: Fixed issue with rollback on result set
errors
Repository: nifi
Updated Branches:
refs/heads/master 53969adcc -> 8a8a8cb07
NIFI-5553: Fixed issue with rollback on result set errors
Signed-off-by: Pierre Villard <pi...@gmail.com>
This closes #2967.
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/8a8a8cb0
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/8a8a8cb0
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/8a8a8cb0
Branch: refs/heads/master
Commit: 8a8a8cb072c458a0cea458ae026a2e5ffebccfaf
Parents: 53969ad
Author: Matthew Burgess <ma...@apache.org>
Authored: Fri Aug 24 13:56:32 2018 -0400
Committer: Pierre Villard <pi...@gmail.com>
Committed: Wed Aug 29 13:22:10 2018 +0200
----------------------------------------------------------------------
.../nifi/processors/standard/ExecuteSQL.java | 86 +++++++++++---------
.../processors/standard/TestExecuteSQL.java | 33 ++++++++
2 files changed, 81 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/8a8a8cb0/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
index 6cd3ae6..b8c48d9 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
@@ -313,49 +313,59 @@ public class ExecuteSQL extends AbstractProcessor {
resultSetFF = session.putAllAttributes(resultSetFF, fileToProcess.getAttributes());
}
- resultSetFF = session.write(resultSetFF, out -> {
- try {
- nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, options, null));
- } catch (SQLException e) {
- throw new ProcessException(e);
- }
- });
-
- long fetchTimeElapsed = fetchTime.getElapsed(TimeUnit.MILLISECONDS);
-
- // set attribute how many rows were selected
- resultSetFF = session.putAttribute(resultSetFF, RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
- resultSetFF = session.putAttribute(resultSetFF, RESULT_QUERY_DURATION, String.valueOf(executionTimeElapsed + fetchTimeElapsed));
- resultSetFF = session.putAttribute(resultSetFF, RESULT_QUERY_EXECUTION_TIME, String.valueOf(executionTimeElapsed));
- resultSetFF = session.putAttribute(resultSetFF, RESULT_QUERY_FETCH_TIME, String.valueOf(fetchTimeElapsed));
- resultSetFF = session.putAttribute(resultSetFF, CoreAttributes.MIME_TYPE.key(), JdbcCommon.MIME_TYPE_AVRO_BINARY);
- resultSetFF = session.putAttribute(resultSetFF, RESULTSET_INDEX, String.valueOf(resultCount));
-
- // if fragmented ResultSet, determine if we should keep this fragment; set fragment attributes
- if(maxRowsPerFlowFile > 0) {
- // if row count is zero and this is not the first fragment, drop it instead of committing it.
- if(nrOfRows.get() == 0 && fragmentIndex > 0) {
- session.remove(resultSetFF);
- break;
+ try {
+ resultSetFF = session.write(resultSetFF, out -> {
+ try {
+ nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, options, null));
+ } catch (SQLException e) {
+ throw new ProcessException(e);
+ }
+ });
+
+ long fetchTimeElapsed = fetchTime.getElapsed(TimeUnit.MILLISECONDS);
+
+ // set attribute how many rows were selected
+ resultSetFF = session.putAttribute(resultSetFF, RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
+ resultSetFF = session.putAttribute(resultSetFF, RESULT_QUERY_DURATION, String.valueOf(executionTimeElapsed + fetchTimeElapsed));
+ resultSetFF = session.putAttribute(resultSetFF, RESULT_QUERY_EXECUTION_TIME, String.valueOf(executionTimeElapsed));
+ resultSetFF = session.putAttribute(resultSetFF, RESULT_QUERY_FETCH_TIME, String.valueOf(fetchTimeElapsed));
+ resultSetFF = session.putAttribute(resultSetFF, CoreAttributes.MIME_TYPE.key(), JdbcCommon.MIME_TYPE_AVRO_BINARY);
+ resultSetFF = session.putAttribute(resultSetFF, RESULTSET_INDEX, String.valueOf(resultCount));
+
+ // if fragmented ResultSet, determine if we should keep this fragment; set fragment attributes
+ if (maxRowsPerFlowFile > 0) {
+ // if row count is zero and this is not the first fragment, drop it instead of committing it.
+ if (nrOfRows.get() == 0 && fragmentIndex > 0) {
+ session.remove(resultSetFF);
+ break;
+ }
+
+ resultSetFF = session.putAttribute(resultSetFF, FRAGMENT_ID, fragmentId);
+ resultSetFF = session.putAttribute(resultSetFF, FRAGMENT_INDEX, String.valueOf(fragmentIndex));
}
- resultSetFF = session.putAttribute(resultSetFF, FRAGMENT_ID, fragmentId);
- resultSetFF = session.putAttribute(resultSetFF, FRAGMENT_INDEX, String.valueOf(fragmentIndex));
- }
+ logger.info("{} contains {} Avro records; transferring to 'success'",
+ new Object[]{resultSetFF, nrOfRows.get()});
+ session.getProvenanceReporter().modifyContent(resultSetFF, "Retrieved " + nrOfRows.get() + " rows", executionTimeElapsed + fetchTimeElapsed);
+ resultSetFlowFiles.add(resultSetFF);
- logger.info("{} contains {} Avro records; transferring to 'success'",
- new Object[]{resultSetFF, nrOfRows.get()});
- session.getProvenanceReporter().modifyContent(resultSetFF, "Retrieved " + nrOfRows.get() + " rows", executionTimeElapsed + fetchTimeElapsed);
- resultSetFlowFiles.add(resultSetFF);
+ // If we've reached the batch size, send out the flow files
+ if (outputBatchSize > 0 && resultSetFlowFiles.size() >= outputBatchSize) {
+ session.transfer(resultSetFlowFiles, REL_SUCCESS);
+ session.commit();
+ resultSetFlowFiles.clear();
+ }
- // If we've reached the batch size, send out the flow files
- if (outputBatchSize > 0 && resultSetFlowFiles.size() >= outputBatchSize) {
- session.transfer(resultSetFlowFiles, REL_SUCCESS);
- session.commit();
- resultSetFlowFiles.clear();
+ fragmentIndex++;
+ } catch (Exception e) {
+ // Remove the result set flow file and propagate the exception
+ session.remove(resultSetFF);
+ if (e instanceof ProcessException) {
+ throw (ProcessException) e;
+ } else {
+ throw new ProcessException(e);
+ }
}
-
- fragmentIndex++;
} while (maxRowsPerFlowFile > 0 && nrOfRows.get() == maxRowsPerFlowFile);
// If we are splitting results but not outputting batches, set count on all FlowFiles
http://git-wip-us.apache.org/repos/asf/nifi/blob/8a8a8cb0/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
index 9405bd0..10d9edd 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
@@ -17,12 +17,18 @@
package org.apache.nifi.processors.standard;
import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.sql.Connection;
import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;
@@ -365,6 +371,33 @@ public class TestExecuteSQL {
runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 0);
}
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testWithSqlExceptionErrorProcessingResultSet() throws Exception {
+ DBCPService dbcp = mock(DBCPService.class);
+ Connection conn = mock(Connection.class);
+ when(dbcp.getConnection(any(Map.class))).thenReturn(conn);
+ when(dbcp.getIdentifier()).thenReturn("mockdbcp");
+ PreparedStatement statement = mock(PreparedStatement.class);
+ when(conn.prepareStatement(anyString())).thenReturn(statement);
+ when(statement.execute()).thenReturn(true);
+ ResultSet rs = mock(ResultSet.class);
+ when(statement.getResultSet()).thenReturn(rs);
+ // Throw an exception the first time you access the ResultSet, this is after the flow file to hold the results has been created.
+ when(rs.getMetaData()).thenThrow(SQLException.class);
+
+ runner.addControllerService("mockdbcp", dbcp, new HashMap<>());
+ runner.enableControllerService(dbcp);
+ runner.setProperty(ExecuteSQL.DBCP_SERVICE, "mockdbcp");
+
+ runner.setIncomingConnection(true);
+ runner.enqueue("SELECT 1");
+ runner.run();
+
+ runner.assertTransferCount(ExecuteSQL.REL_FAILURE, 1);
+ runner.assertTransferCount(ExecuteSQL.REL_SUCCESS, 0);
+ }
+
public void invokeOnTrigger(final Integer queryTimeout, final String query, final boolean incomingFlowFile, final Map<String,String> attrs, final boolean setQueryProperty)
throws InitializationException, ClassNotFoundException, SQLException, IOException {