You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2018/11/15 15:46:03 UTC
nifi git commit: NIFI-5788: Introduce batch size limit in PutDatabaseRecord processor
Repository: nifi
Updated Branches:
refs/heads/master 13011ac6d -> d319a3ef2
NIFI-5788: Introduce batch size limit in PutDatabaseRecord processor
NIFI-5788: Introduce batch size limit in PutDatabaseRecord processor
Renamed 'batch size' to 'Maximum Batch Size'.
Changed default value of max_batch_size to zero (no limit).
Fixed parameter validation.
Added unit tests.
Signed-off-by: Matthew Burgess <ma...@apache.org>
This closes #3128
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/d319a3ef
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/d319a3ef
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/d319a3ef
Branch: refs/heads/master
Commit: d319a3ef2f14317f29a1be5a189bc34f8b3fdbd6
Parents: 13011ac
Author: vadimar <va...@ebay.com>
Authored: Mon Nov 5 13:15:12 2018 +0200
Committer: Matthew Burgess <ma...@apache.org>
Committed: Thu Nov 15 10:31:34 2018 -0500
----------------------------------------------------------------------
.../processors/standard/PutDatabaseRecord.java | 29 +++++-
.../standard/TestPutDatabaseRecord.groovy | 103 +++++++++++++++++++
2 files changed, 130 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/d319a3ef/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
index 2f2d901..d79cf3c 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
@@ -275,6 +275,17 @@ public class PutDatabaseRecord extends AbstractSessionFactoryProcessor {
.required(true)
.build();
+ /**
+  * Upper bound on how many statements are added to a JDBC batch before that batch is executed.
+  * The default of 0 leaves the batch size unlimited. The value may use FlowFile-attribute
+  * Expression Language and is validated as a non-negative integer.
+  */
+ static final PropertyDescriptor MAX_BATCH_SIZE = new PropertyDescriptor.Builder()
+ .name("put-db-record-max-batch-size")
+ .displayName("Maximum Batch Size")
+ .description("Specifies maximum batch size for INSERT and UPDATE statements. This parameter has no effect for other statements specified in 'Statement Type'."
+ + " Zero means the batch size is not limited.")
+ .required(false)
+ .defaultValue("0")
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+ .build();
+
protected static List<PropertyDescriptor> propDescriptors;
private Cache<SchemaKey, TableSchema> schemaCache;
@@ -303,6 +314,7 @@ public class PutDatabaseRecord extends AbstractSessionFactoryProcessor {
pds.add(QUERY_TIMEOUT);
pds.add(RollbackOnFailure.ROLLBACK_ON_FAILURE);
pds.add(TABLE_SCHEMA_CACHE_SIZE);
+ pds.add(MAX_BATCH_SIZE);
propDescriptors = Collections.unmodifiableList(pds);
}
@@ -641,6 +653,10 @@ public class PutDatabaseRecord extends AbstractSessionFactoryProcessor {
Record currentRecord;
List<Integer> fieldIndexes = sqlHolder.getFieldIndexes();
+ final Integer maxBatchSize = context.getProperty(MAX_BATCH_SIZE).evaluateAttributeExpressions(flowFile).asInteger();
+ int currentBatchSize = 0;
+ int batchIndex = 0;
+
while ((currentRecord = recordParser.nextRecord()) != null) {
Object[] values = currentRecord.getValues();
if (values != null) {
@@ -667,11 +683,20 @@ public class PutDatabaseRecord extends AbstractSessionFactoryProcessor {
}
}
ps.addBatch();
+ if (++currentBatchSize == maxBatchSize) {
+ batchIndex++;
+ log.debug("Executing query {}; fieldIndexes: {}; batch index: {}; batch size: {}", new Object[]{sqlHolder.getSql(), sqlHolder.getFieldIndexes(), batchIndex, currentBatchSize});
+ ps.executeBatch();
+ currentBatchSize = 0;
+ }
}
}
- log.debug("Executing query {}", new Object[]{sqlHolder});
- ps.executeBatch();
+ if (currentBatchSize > 0) {
+ batchIndex++;
+ log.debug("Executing query {}; fieldIndexes: {}; batch index: {}; batch size: {}", new Object[]{sqlHolder.getSql(), sqlHolder.getFieldIndexes(), batchIndex, currentBatchSize});
+ ps.executeBatch();
+ }
result.routeTo(flowFile, REL_SUCCESS);
session.getProvenanceReporter().send(flowFile, functionContext.jdbcUrl);
http://git-wip-us.apache.org/repos/asf/nifi/blob/d319a3ef/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy
index ebf8460..0ddac80 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy
@@ -16,6 +16,8 @@
*/
package org.apache.nifi.processors.standard
+
+import org.apache.commons.dbcp2.DelegatingConnection
import org.apache.nifi.processor.exception.ProcessException
import org.apache.nifi.processor.util.pattern.RollbackOnFailure
import org.apache.nifi.reporting.InitializationException
@@ -36,17 +38,25 @@ import org.junit.runners.JUnit4
import java.sql.Connection
import java.sql.DriverManager
+import java.sql.PreparedStatement
import java.sql.ResultSet
import java.sql.SQLDataException
import java.sql.SQLException
import java.sql.SQLNonTransientConnectionException
import java.sql.Statement
+import java.util.function.Supplier
import static org.junit.Assert.assertEquals
import static org.junit.Assert.assertFalse
+import static org.junit.Assert.assertNotNull
import static org.junit.Assert.assertTrue
import static org.junit.Assert.fail
+import static org.mockito.Matchers.anyMap
+import static org.mockito.Mockito.doAnswer
+import static org.mockito.Mockito.only
import static org.mockito.Mockito.spy
+import static org.mockito.Mockito.times
+import static org.mockito.Mockito.verify
/**
* Unit tests for the PutDatabaseRecord processor
@@ -718,6 +728,99 @@ class TestPutDatabaseRecord {
conn.close()
}
+ /**
+  * Inserts 11 records with 'Maximum Batch Size' set to 5 and checks that all rows land in
+  * PERSONS and that the prepared statement's executeBatch() is invoked three times (5 + 5 + 1).
+  */
+ @Test
+ void testInsertWithMaxBatchSize() throws InitializationException, ProcessException, SQLException, IOException {
+     recreateTable("PERSONS", createPersons)
+
+     final MockRecordParser recordReader = new MockRecordParser()
+     runner.addControllerService("parser", recordReader)
+     runner.enableControllerService(recordReader)
+     recordReader.addSchemaField("id", RecordFieldType.INT)
+     recordReader.addSchemaField("name", RecordFieldType.STRING)
+     recordReader.addSchemaField("code", RecordFieldType.INT)
+     for (int id in 1..11) {
+         recordReader.addRecord(id, "rec$id".toString(), 100 + id)
+     }
+
+     runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, 'parser')
+     runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE)
+     runner.setProperty(PutDatabaseRecord.TABLE_NAME, 'PERSONS')
+     runner.setProperty(PutDatabaseRecord.MAX_BATCH_SIZE, "5")
+
+     // Must be installed before run() so the processor's connection is wrapped
+     final Supplier<PreparedStatement> statementSpy = createPreparedStatementSpy()
+     runner.enqueue(new byte[0])
+     runner.run()
+
+     runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1)
+     assertEquals(11, getTableSize())
+
+     final PreparedStatement observed = statementSpy.get()
+     assertNotNull(observed)
+     // 11 records split into batches of 5, 5 and 1
+     verify(observed, times(3)).executeBatch()
+ }
+
+ /**
+  * Inserts 11 records without setting 'Maximum Batch Size' (default 0, i.e. unlimited) and checks
+  * that all rows land in PERSONS through a single executeBatch() call.
+  */
+ @Test
+ void testInsertWithDefaultMaxBatchSize() throws InitializationException, ProcessException, SQLException, IOException {
+     recreateTable("PERSONS", createPersons)
+
+     final MockRecordParser recordReader = new MockRecordParser()
+     runner.addControllerService("parser", recordReader)
+     runner.enableControllerService(recordReader)
+     recordReader.addSchemaField("id", RecordFieldType.INT)
+     recordReader.addSchemaField("name", RecordFieldType.STRING)
+     recordReader.addSchemaField("code", RecordFieldType.INT)
+     for (int id in 1..11) {
+         recordReader.addRecord(id, "rec$id".toString(), 100 + id)
+     }
+
+     runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, 'parser')
+     runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE)
+     runner.setProperty(PutDatabaseRecord.TABLE_NAME, 'PERSONS')
+
+     // Must be installed before run() so the processor's connection is wrapped
+     final Supplier<PreparedStatement> statementSpy = createPreparedStatementSpy()
+     runner.enqueue(new byte[0])
+     runner.run()
+
+     runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1)
+     assertEquals(11, getTableSize())
+
+     final PreparedStatement observed = statementSpy.get()
+     assertNotNull(observed)
+     // No limit configured, so every record goes into one batch
+     verify(observed, times(1)).executeBatch()
+ }
+
+ /**
+  * Stubs the dbcp spy so that each connection returned by getConnection(Map) is wrapped in a
+  * DelegatingConnection. The wrapper's prepareStatement(String) returns a Mockito spy around
+  * the real statement.
+  *
+  * NOTE(review): only the prepareStatement(String) overload is intercepted; other overloads
+  * go straight to the delegate and will not produce a spy.
+  *
+  * @return a Supplier yielding the most recently created PreparedStatement spy, or null if no
+  *         wrapped connection has had prepareStatement(String) called on it yet
+  */
+ private Supplier<PreparedStatement> createPreparedStatementSpy() {
+ // Captured by the closure/anonymous class below and overwritten on every prepareStatement call
+ PreparedStatement spyStmt
+ doAnswer({ inv ->
+ // Obtain the real connection, then wrap it so prepareStatement calls can be intercepted
+ new DelegatingConnection((Connection)inv.callRealMethod()) {
+ @Override
+ PreparedStatement prepareStatement(String sql) throws SQLException {
+ // The assignment is the last expression, so its value (the spy) is what gets returned
+ spyStmt = spy(getDelegate().prepareStatement(sql))
+ }
+ }
+ }).when(dbcp).getConnection(anyMap())
+ // Closure coerced to Supplier; spyStmt is read lazily when get() is called
+ return { spyStmt }
+ }
+
+ /**
+  * Counts the rows currently in the PERSONS table using a connection obtained from dbcp.
+  * The statement and connection are closed before returning.
+  *
+  * @return the result of SELECT count(*) FROM PERSONS
+  */
+ private int getTableSize() {
+     final Connection conn = dbcp.getConnection()
+     try {
+         final Statement query = conn.createStatement()
+         try {
+             final ResultSet counts = query.executeQuery('SELECT count(*) FROM PERSONS')
+             assertTrue(counts.next())
+             return counts.getInt(1)
+         } finally {
+             query.close()
+         }
+     } finally {
+         conn.close()
+     }
+ }
private void recreateTable(String tableName, String createSQL) throws ProcessException, SQLException {
final Connection conn = dbcp.getConnection()