You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2018/07/10 23:05:26 UTC

nifi git commit: NIFI-5312 QueryDatabaseTable updates state when an SQLException is thrown

Repository: nifi
Updated Branches:
  refs/heads/master e13602f75 -> 0dd4a91a6


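NIFI-5312 QueryDatabaseTable updates state when an SQLException is thrown

Fix: MaxValueResultSetRowCollector now tracks maximum values in its own copy of the state map and writes them back to the original map only when applyStateChanges() is called, so a failure partway through a fetch no longer leaves partially updated state behind.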

Signed-off-by: Matthew Burgess <ma...@apache.org>

This closes #2868


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/0dd4a91a
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/0dd4a91a
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/0dd4a91a

Branch: refs/heads/master
Commit: 0dd4a91a6741eec04965a260c8aff38b72b3828d
Parents: e13602f
Author: patricker <pa...@gmail.com>
Authored: Mon Jul 9 14:14:57 2018 -0600
Committer: Matthew Burgess <ma...@apache.org>
Committed: Tue Jul 10 19:03:14 2018 -0400

----------------------------------------------------------------------
 .../processors/standard/QueryDatabaseTable.java | 22 ++++++++--
 .../processors/standard/util/JdbcCommon.java    |  1 +
 .../standard/QueryDatabaseTableTest.java        | 46 ++++++++++++++++++++
 3 files changed, 65 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/0dd4a91a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
index 5c210c4..f0efbb0 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
@@ -333,14 +333,15 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
             }
             try (final ResultSet resultSet = st.executeQuery(selectQuery)) {
                 int fragmentIndex=0;
+                // Max values will be updated in the state property map by the callback
+                final MaxValueResultSetRowCollector maxValCollector = new MaxValueResultSetRowCollector(tableName, statePropertyMap, dbAdapter);
+
                 while(true) {
                     final AtomicLong nrOfRows = new AtomicLong(0L);
 
                     FlowFile fileToProcess = session.create();
                     try {
                         fileToProcess = session.write(fileToProcess, out -> {
-                            // Max values will be updated in the state property map by the callback
-                            final MaxValueResultSetRowCollector maxValCollector = new MaxValueResultSetRowCollector(tableName, statePropertyMap, dbAdapter);
                             try {
                                 nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, options, maxValCollector));
                             } catch (SQLException | RuntimeException e) {
@@ -400,6 +401,9 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
                     }
                 }
 
+                // Apply state changes from the Max Value tracker
+                maxValCollector.applyStateChanges();
+
                 // Even though the maximum value and total count are known at this point, to maintain consistent behavior if Output Batch Size is set, do not store the attributes
                 if (outputBatchSize == 0) {
                     for (int i = 0; i < resultSetFlowFiles.size(); i++) {
@@ -499,12 +503,17 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
 
     protected class MaxValueResultSetRowCollector implements JdbcCommon.ResultSetRowCallback {
         DatabaseAdapter dbAdapter;
-        Map<String, String> newColMap;
+        final Map<String, String> newColMap;
+        final Map<String, String> originalState;
         String tableName;
 
         public MaxValueResultSetRowCollector(String tableName, Map<String, String> stateMap, DatabaseAdapter dbAdapter) {
             this.dbAdapter = dbAdapter;
-            newColMap = stateMap;
+            this.originalState = stateMap;
+
+            this.newColMap = new HashMap<>();
+            this.newColMap.putAll(stateMap);
+
             this.tableName = tableName;
         }
 
@@ -543,5 +552,10 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
                 throw new IOException(e);
             }
         }
+
+        @Override
+        public void applyStateChanges() {
+            this.originalState.putAll(this.newColMap);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/0dd4a91a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
index 166a81c..88c8cda 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
@@ -882,6 +882,7 @@ public class JdbcCommon {
      */
     public interface ResultSetRowCallback {
         void processRow(ResultSet resultSet) throws IOException;
+        void applyStateChanges();
     }
 
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/0dd4a91a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
index a27c529..1624c6d 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
@@ -63,6 +63,7 @@ import java.util.Map;
 import java.util.TimeZone;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
@@ -1284,6 +1285,51 @@ public class QueryDatabaseTableTest {
         runner.run();
     }
 
+    @Test
+    public void testWithExceptionAfterSomeRowsProcessed() throws SQLException {
+        // load test data to database
+        final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+
+        try {
+            stmt.execute("drop table TEST_NULL_INT");
+        } catch (final SQLException sqle) {
+            // Ignore, usually due to Derby not having DROP TABLE IF EXISTS
+        }
+
+        stmt.execute("create table TEST_NULL_INT (id integer not null, val1 integer, val2 integer, constraint my_pk primary key (id))");
+
+        stmt.execute("insert into TEST_NULL_INT (id, val1, val2) VALUES (1, NULL, 1)");
+        stmt.execute("insert into TEST_NULL_INT (id, val1, val2) VALUES (2, 1, 1)");
+
+        runner.setIncomingConnection(false);
+        runner.setProperty(QueryDatabaseTable.TABLE_NAME, "TEST_NULL_INT");
+        runner.setProperty(AbstractDatabaseFetchProcessor.MAX_VALUE_COLUMN_NAMES, "id");
+
+        // Override adapter with one whose getName() succeeds on its first call and throws on every call after that
+        QueryDatabaseTable.dbAdapters.put(dbAdapter.getName(), new GenericDatabaseAdapter() {
+            boolean fail = false;
+            @Override
+            public String getName() {
+                if(!fail) {
+                    fail = true;
+                    return super.getName();
+                }
+                throw new DataFileWriter.AppendWriteException(null);
+            }
+        });
+        runner.run();
+        assertTrue(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).isEmpty());
+        // State should not have been updated
+        runner.getStateManager().assertStateNotSet("test_null_int@!@id", Scope.CLUSTER);
+
+        // Restore original (working) adapter and run again
+        QueryDatabaseTable.dbAdapters.put(dbAdapter.getName(), dbAdapter);
+        runner.run();
+        assertFalse(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).isEmpty());
+        runner.getStateManager().assertStateEquals("test_null_int@!@id", "2", Scope.CLUSTER);
+    }
+
     private long getNumberOfRecordsFromStream(InputStream in) throws IOException {
         final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
         try (DataFileStream<GenericRecord> dataFileReader = new DataFileStream<>(in, datumReader)) {