You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2018/07/10 23:05:26 UTC
nifi git commit: NIFI-5312 QueryDatabaseTable updates state when an
SQLException is thrown
Repository: nifi
Updated Branches:
refs/heads/master e13602f75 -> 0dd4a91a6
NIFI-5312 QueryDatabaseTable updates state when an SQLException is thrown
Signed-off-by: Matthew Burgess <ma...@apache.org>
This closes #2868
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/0dd4a91a
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/0dd4a91a
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/0dd4a91a
Branch: refs/heads/master
Commit: 0dd4a91a6741eec04965a260c8aff38b72b3828d
Parents: e13602f
Author: patricker <pa...@gmail.com>
Authored: Mon Jul 9 14:14:57 2018 -0600
Committer: Matthew Burgess <ma...@apache.org>
Committed: Tue Jul 10 19:03:14 2018 -0400
----------------------------------------------------------------------
.../processors/standard/QueryDatabaseTable.java | 22 ++++++++--
.../processors/standard/util/JdbcCommon.java | 1 +
.../standard/QueryDatabaseTableTest.java | 46 ++++++++++++++++++++
3 files changed, 65 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/0dd4a91a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
index 5c210c4..f0efbb0 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
@@ -333,14 +333,15 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
}
try (final ResultSet resultSet = st.executeQuery(selectQuery)) {
int fragmentIndex=0;
+ // Max values will be updated in the state property map by the callback
+ final MaxValueResultSetRowCollector maxValCollector = new MaxValueResultSetRowCollector(tableName, statePropertyMap, dbAdapter);
+
while(true) {
final AtomicLong nrOfRows = new AtomicLong(0L);
FlowFile fileToProcess = session.create();
try {
fileToProcess = session.write(fileToProcess, out -> {
- // Max values will be updated in the state property map by the callback
- final MaxValueResultSetRowCollector maxValCollector = new MaxValueResultSetRowCollector(tableName, statePropertyMap, dbAdapter);
try {
nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, options, maxValCollector));
} catch (SQLException | RuntimeException e) {
@@ -400,6 +401,9 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
}
}
+ // Apply state changes from the Max Value tracker
+ maxValCollector.applyStateChanges();
+
// Even though the maximum value and total count are known at this point, to maintain consistent behavior if Output Batch Size is set, do not store the attributes
if (outputBatchSize == 0) {
for (int i = 0; i < resultSetFlowFiles.size(); i++) {
@@ -499,12 +503,17 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
protected class MaxValueResultSetRowCollector implements JdbcCommon.ResultSetRowCallback {
DatabaseAdapter dbAdapter;
- Map<String, String> newColMap;
+ final Map<String, String> newColMap;
+ final Map<String, String> originalState;
String tableName;
public MaxValueResultSetRowCollector(String tableName, Map<String, String> stateMap, DatabaseAdapter dbAdapter) {
this.dbAdapter = dbAdapter;
- newColMap = stateMap;
+ this.originalState = stateMap;
+
+ this.newColMap = new HashMap<>();
+ this.newColMap.putAll(stateMap);
+
this.tableName = tableName;
}
@@ -543,5 +552,10 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
throw new IOException(e);
}
}
+
+ @Override
+ public void applyStateChanges() {
+ this.originalState.putAll(this.newColMap);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/0dd4a91a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
index 166a81c..88c8cda 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
@@ -882,6 +882,7 @@ public class JdbcCommon {
*/
public interface ResultSetRowCallback {
void processRow(ResultSet resultSet) throws IOException;
+ void applyStateChanges();
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/0dd4a91a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
index a27c529..1624c6d 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
@@ -63,6 +63,7 @@ import java.util.Map;
import java.util.TimeZone;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@@ -1284,6 +1285,51 @@ public class QueryDatabaseTableTest {
runner.run();
}
+ @Test
+ public void testWithExceptionAfterSomeRowsProcessed() throws SQLException {
+ // load test data to database
+ final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+ Statement stmt = con.createStatement();
+
+ try {
+ stmt.execute("drop table TEST_NULL_INT");
+ } catch (final SQLException sqle) {
+ // Ignore, usually due to Derby not having DROP TABLE IF EXISTS
+ }
+
+ stmt.execute("create table TEST_NULL_INT (id integer not null, val1 integer, val2 integer, constraint my_pk primary key (id))");
+
+ stmt.execute("insert into TEST_NULL_INT (id, val1, val2) VALUES (1, NULL, 1)");
+ stmt.execute("insert into TEST_NULL_INT (id, val1, val2) VALUES (2, 1, 1)");
+
+ runner.setIncomingConnection(false);
+ runner.setProperty(QueryDatabaseTable.TABLE_NAME, "TEST_NULL_INT");
+ runner.setProperty(AbstractDatabaseFetchProcessor.MAX_VALUE_COLUMN_NAMES, "id");
+
+ // Override adapter with one that fails after the first row is processed
+ QueryDatabaseTable.dbAdapters.put(dbAdapter.getName(), new GenericDatabaseAdapter() {
+ boolean fail = false;
+ @Override
+ public String getName() {
+ if(!fail) {
+ fail = true;
+ return super.getName();
+ }
+ throw new DataFileWriter.AppendWriteException(null);
+ }
+ });
+ runner.run();
+ assertTrue(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).isEmpty());
+ // State should not have been updated
+ runner.getStateManager().assertStateNotSet("test_null_int@!@id", Scope.CLUSTER);
+
+ // Restore original (working) adapter and run again
+ QueryDatabaseTable.dbAdapters.put(dbAdapter.getName(), dbAdapter);
+ runner.run();
+ assertFalse(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).isEmpty());
+ runner.getStateManager().assertStateEquals("test_null_int@!@id", "2", Scope.CLUSTER);
+ }
+
private long getNumberOfRecordsFromStream(InputStream in) throws IOException {
final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
try (DataFileStream<GenericRecord> dataFileReader = new DataFileStream<>(in, datumReader)) {