Posted to commits@nifi.apache.org by pv...@apache.org on 2018/08/29 11:22:21 UTC

nifi git commit: NIFI-5553: Fixed issue with rollback on result set errors

Repository: nifi
Updated Branches:
  refs/heads/master 53969adcc -> 8a8a8cb07


NIFI-5553: Fixed issue with rollback on result set errors

Signed-off-by: Pierre Villard <pi...@gmail.com>

This closes #2967.
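
For context, the fix wraps the per-fragment ResultSet-to-Avro conversion and its
bookkeeping in a try/catch, removes the partially written result FlowFile on
failure, and rethrows as a ProcessException. The sketch below condenses the new
pattern from the diff that follows; the names mirror the processor code, and the
surrounding do/while loop, attribute handling, and setup are omitted:

    try {
        resultSetFF = session.write(resultSetFF, out -> {
            try {
                nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, options, null));
            } catch (SQLException e) {
                throw new ProcessException(e);
            }
        });
        // ... set result attributes, collect the FlowFile, transfer when the batch fills ...
    } catch (Exception e) {
        // Drop the incomplete result FlowFile, then propagate the exception so the
        // caller can roll back and route the incoming FlowFile to failure.
        session.remove(resultSetFF);
        throw (e instanceof ProcessException) ? (ProcessException) e : new ProcessException(e);
    }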


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/8a8a8cb0
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/8a8a8cb0
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/8a8a8cb0

Branch: refs/heads/master
Commit: 8a8a8cb072c458a0cea458ae026a2e5ffebccfaf
Parents: 53969ad
Author: Matthew Burgess <ma...@apache.org>
Authored: Fri Aug 24 13:56:32 2018 -0400
Committer: Pierre Villard <pi...@gmail.com>
Committed: Wed Aug 29 13:22:10 2018 +0200

----------------------------------------------------------------------
 .../nifi/processors/standard/ExecuteSQL.java    | 86 +++++++++++---------
 .../processors/standard/TestExecuteSQL.java     | 33 ++++++++
 2 files changed, 81 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/8a8a8cb0/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
index 6cd3ae6..b8c48d9 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
@@ -313,49 +313,59 @@ public class ExecuteSQL extends AbstractProcessor {
                                 resultSetFF = session.putAllAttributes(resultSetFF, fileToProcess.getAttributes());
                             }
 
-                            resultSetFF = session.write(resultSetFF, out -> {
-                                try {
-                                    nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, options, null));
-                                } catch (SQLException e) {
-                                    throw new ProcessException(e);
-                                }
-                            });
-
-                            long fetchTimeElapsed = fetchTime.getElapsed(TimeUnit.MILLISECONDS);
-
-                            // set attribute how many rows were selected
-                            resultSetFF = session.putAttribute(resultSetFF, RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
-                            resultSetFF = session.putAttribute(resultSetFF, RESULT_QUERY_DURATION, String.valueOf(executionTimeElapsed + fetchTimeElapsed));
-                            resultSetFF = session.putAttribute(resultSetFF, RESULT_QUERY_EXECUTION_TIME, String.valueOf(executionTimeElapsed));
-                            resultSetFF = session.putAttribute(resultSetFF, RESULT_QUERY_FETCH_TIME, String.valueOf(fetchTimeElapsed));
-                            resultSetFF = session.putAttribute(resultSetFF, CoreAttributes.MIME_TYPE.key(), JdbcCommon.MIME_TYPE_AVRO_BINARY);
-                            resultSetFF = session.putAttribute(resultSetFF, RESULTSET_INDEX, String.valueOf(resultCount));
-
-                            // if fragmented ResultSet, determine if we should keep this fragment; set fragment attributes
-                            if(maxRowsPerFlowFile > 0) {
-                                // if row count is zero and this is not the first fragment, drop it instead of committing it.
-                                if(nrOfRows.get() == 0 && fragmentIndex > 0) {
-                                    session.remove(resultSetFF);
-                                    break;
+                            try {
+                                resultSetFF = session.write(resultSetFF, out -> {
+                                    try {
+                                        nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, options, null));
+                                    } catch (SQLException e) {
+                                        throw new ProcessException(e);
+                                    }
+                                });
+
+                                long fetchTimeElapsed = fetchTime.getElapsed(TimeUnit.MILLISECONDS);
+
+                                // set attribute how many rows were selected
+                                resultSetFF = session.putAttribute(resultSetFF, RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
+                                resultSetFF = session.putAttribute(resultSetFF, RESULT_QUERY_DURATION, String.valueOf(executionTimeElapsed + fetchTimeElapsed));
+                                resultSetFF = session.putAttribute(resultSetFF, RESULT_QUERY_EXECUTION_TIME, String.valueOf(executionTimeElapsed));
+                                resultSetFF = session.putAttribute(resultSetFF, RESULT_QUERY_FETCH_TIME, String.valueOf(fetchTimeElapsed));
+                                resultSetFF = session.putAttribute(resultSetFF, CoreAttributes.MIME_TYPE.key(), JdbcCommon.MIME_TYPE_AVRO_BINARY);
+                                resultSetFF = session.putAttribute(resultSetFF, RESULTSET_INDEX, String.valueOf(resultCount));
+
+                                // if fragmented ResultSet, determine if we should keep this fragment; set fragment attributes
+                                if (maxRowsPerFlowFile > 0) {
+                                    // if row count is zero and this is not the first fragment, drop it instead of committing it.
+                                    if (nrOfRows.get() == 0 && fragmentIndex > 0) {
+                                        session.remove(resultSetFF);
+                                        break;
+                                    }
+
+                                    resultSetFF = session.putAttribute(resultSetFF, FRAGMENT_ID, fragmentId);
+                                    resultSetFF = session.putAttribute(resultSetFF, FRAGMENT_INDEX, String.valueOf(fragmentIndex));
                                 }
 
-                                resultSetFF = session.putAttribute(resultSetFF, FRAGMENT_ID, fragmentId);
-                                resultSetFF = session.putAttribute(resultSetFF, FRAGMENT_INDEX, String.valueOf(fragmentIndex));
-                            }
+                                logger.info("{} contains {} Avro records; transferring to 'success'",
+                                        new Object[]{resultSetFF, nrOfRows.get()});
+                                session.getProvenanceReporter().modifyContent(resultSetFF, "Retrieved " + nrOfRows.get() + " rows", executionTimeElapsed + fetchTimeElapsed);
+                                resultSetFlowFiles.add(resultSetFF);
 
-                            logger.info("{} contains {} Avro records; transferring to 'success'",
-                                    new Object[]{resultSetFF, nrOfRows.get()});
-                            session.getProvenanceReporter().modifyContent(resultSetFF, "Retrieved " + nrOfRows.get() + " rows", executionTimeElapsed + fetchTimeElapsed);
-                            resultSetFlowFiles.add(resultSetFF);
+                                // If we've reached the batch size, send out the flow files
+                                if (outputBatchSize > 0 && resultSetFlowFiles.size() >= outputBatchSize) {
+                                    session.transfer(resultSetFlowFiles, REL_SUCCESS);
+                                    session.commit();
+                                    resultSetFlowFiles.clear();
+                                }
 
-                            // If we've reached the batch size, send out the flow files
-                            if (outputBatchSize > 0 && resultSetFlowFiles.size() >= outputBatchSize) {
-                                session.transfer(resultSetFlowFiles, REL_SUCCESS);
-                                session.commit();
-                                resultSetFlowFiles.clear();
+                                fragmentIndex++;
+                            } catch (Exception e) {
+                                // Remove the result set flow file and propagate the exception
+                                session.remove(resultSetFF);
+                                if (e instanceof ProcessException) {
+                                    throw (ProcessException) e;
+                                } else {
+                                    throw new ProcessException(e);
+                                }
                             }
-
-                            fragmentIndex++;
                         } while (maxRowsPerFlowFile > 0 && nrOfRows.get() == maxRowsPerFlowFile);
 
                         // If we are splitting results but not outputting batches, set count on all FlowFiles

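A note on the design choice above: the new catch block removes the partially
written result FlowFile before rethrowing. A FlowFile that is neither
transferred nor removed would otherwise fail the session when it is committed,
so removing it here lets the processor's outer error handling route the
incoming FlowFile to the failure relationship cleanly instead of leaving an
orphaned FlowFile behind.
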
http://git-wip-us.apache.org/repos/asf/nifi/blob/8a8a8cb0/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
index 9405bd0..10d9edd 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
@@ -17,12 +17,18 @@
 package org.apache.nifi.processors.standard;
 
 import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.sql.Connection;
 import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.HashMap;
@@ -365,6 +371,33 @@ public class TestExecuteSQL {
         runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 0);
     }
 
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testWithSqlExceptionErrorProcessingResultSet() throws Exception {
+        DBCPService dbcp = mock(DBCPService.class);
+        Connection conn = mock(Connection.class);
+        when(dbcp.getConnection(any(Map.class))).thenReturn(conn);
+        when(dbcp.getIdentifier()).thenReturn("mockdbcp");
+        PreparedStatement statement = mock(PreparedStatement.class);
+        when(conn.prepareStatement(anyString())).thenReturn(statement);
+        when(statement.execute()).thenReturn(true);
+        ResultSet rs = mock(ResultSet.class);
+        when(statement.getResultSet()).thenReturn(rs);
+        // Throw an exception the first time you access the ResultSet, this is after the flow file to hold the results has been created.
+        when(rs.getMetaData()).thenThrow(SQLException.class);
+
+        runner.addControllerService("mockdbcp", dbcp, new HashMap<>());
+        runner.enableControllerService(dbcp);
+        runner.setProperty(ExecuteSQL.DBCP_SERVICE, "mockdbcp");
+
+        runner.setIncomingConnection(true);
+        runner.enqueue("SELECT 1");
+        runner.run();
+
+        runner.assertTransferCount(ExecuteSQL.REL_FAILURE, 1);
+        runner.assertTransferCount(ExecuteSQL.REL_SUCCESS, 0);
+    }
+
     public void invokeOnTrigger(final Integer queryTimeout, final String query, final boolean incomingFlowFile, final Map<String,String> attrs, final boolean setQueryProperty)
         throws InitializationException, ClassNotFoundException, SQLException, IOException {
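
As a usage note on the test above: it mocks the DBCPService and the JDBC
objects so that the first access to the ResultSet (getMetaData(), invoked when
the Avro conversion begins, after the result FlowFile has already been created)
throws SQLException. The run is then expected to route the incoming FlowFile to
REL_FAILURE and emit nothing to REL_SUCCESS, which exercises exactly the
remove-and-rethrow path added in ExecuteSQL.java.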