Posted to commits@nifi.apache.org by ma...@apache.org on 2018/11/15 15:46:03 UTC

nifi git commit: NIFI-5788: Introduce batch size limit in PutDatabaseRecord processor

Repository: nifi
Updated Branches:
  refs/heads/master 13011ac6d -> d319a3ef2


NIFI-5788: Introduce batch size limit in PutDatabaseRecord processor

        Renamed 'batch size' to 'Maximum Batch Size'.
        Changed the default value of max_batch_size to zero (unlimited).
        Fixed parameter validation.
        Added unit tests.

Signed-off-by: Matthew Burgess <ma...@apache.org>

This closes #3128
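
For context, the pattern the commit introduces is: accumulate rows with addBatch(), flush the batch whenever the accumulated count reaches the configured maximum, then flush whatever remains once the record loop ends. Below is a minimal standalone sketch of that pattern; it is illustrative only (plain JDBC against an in-memory H2 database, assuming the H2 driver is on the classpath), not the processor's actual code:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.SQLException;
    import java.sql.Statement;

    public class BatchedInsertSketch {
        public static void main(String[] args) throws SQLException {
            try (Connection conn = DriverManager.getConnection("jdbc:h2:mem:demo")) {
                try (Statement ddl = conn.createStatement()) {
                    ddl.execute("CREATE TABLE PERSONS (id INT, name VARCHAR(50))");
                }
                final int maxBatchSize = 5; // 0 disables the limit: only the final flush runs
                int currentBatchSize = 0;
                try (PreparedStatement ps = conn.prepareStatement("INSERT INTO PERSONS VALUES (?, ?)")) {
                    for (int i = 1; i <= 11; i++) {
                        ps.setInt(1, i);
                        ps.setString(2, "rec" + i);
                        ps.addBatch();
                        // Flush as soon as the accumulated rows reach the limit.
                        if (++currentBatchSize == maxBatchSize) {
                            ps.executeBatch();
                            currentBatchSize = 0;
                        }
                    }
                    // Final flush for rows left over after the last full batch.
                    if (currentBatchSize > 0) {
                        ps.executeBatch();
                    }
                }
            }
        }
    }

With 11 rows and a limit of 5, executeBatch() runs three times (5 + 5 + 1), which is exactly what the new testInsertWithMaxBatchSize below verifies.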


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/d319a3ef
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/d319a3ef
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/d319a3ef

Branch: refs/heads/master
Commit: d319a3ef2f14317f29a1be5a189bc34f8b3fdbd6
Parents: 13011ac
Author: vadimar <va...@ebay.com>
Authored: Mon Nov 5 13:15:12 2018 +0200
Committer: Matthew Burgess <ma...@apache.org>
Committed: Thu Nov 15 10:31:34 2018 -0500

----------------------------------------------------------------------
 .../processors/standard/PutDatabaseRecord.java  |  29 +++++-
 .../standard/TestPutDatabaseRecord.groovy       | 103 +++++++++++++++++++
 2 files changed, 130 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/d319a3ef/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
index 2f2d901..d79cf3c 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
@@ -275,6 +275,17 @@ public class PutDatabaseRecord extends AbstractSessionFactoryProcessor {
             .required(true)
             .build();
 
+    static final PropertyDescriptor MAX_BATCH_SIZE = new PropertyDescriptor.Builder()
+            .name("put-db-record-max-batch-size")
+            .displayName("Maximum Batch Size")
+            .description("Specifies maximum batch size for INSERT and UPDATE statements. This parameter has no effect for other statements specified in 'Statement Type'."
+                            + " Zero means the batch size is not limited.")
+            .defaultValue("0")
+            .required(false)
+            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
     protected static List<PropertyDescriptor> propDescriptors;
 
     private Cache<SchemaKey, TableSchema> schemaCache;
@@ -303,6 +314,7 @@ public class PutDatabaseRecord extends AbstractSessionFactoryProcessor {
         pds.add(QUERY_TIMEOUT);
         pds.add(RollbackOnFailure.ROLLBACK_ON_FAILURE);
         pds.add(TABLE_SCHEMA_CACHE_SIZE);
+        pds.add(MAX_BATCH_SIZE);
 
         propDescriptors = Collections.unmodifiableList(pds);
     }
@@ -641,6 +653,10 @@ public class PutDatabaseRecord extends AbstractSessionFactoryProcessor {
             Record currentRecord;
             List<Integer> fieldIndexes = sqlHolder.getFieldIndexes();
 
+            final Integer maxBatchSize = context.getProperty(MAX_BATCH_SIZE).evaluateAttributeExpressions(flowFile).asInteger();
+            int currentBatchSize = 0;
+            int batchIndex = 0;
+
             while ((currentRecord = recordParser.nextRecord()) != null) {
                 Object[] values = currentRecord.getValues();
                 if (values != null) {
@@ -667,11 +683,20 @@ public class PutDatabaseRecord extends AbstractSessionFactoryProcessor {
                         }
                     }
                     ps.addBatch();
+                    if (++currentBatchSize == maxBatchSize) {
+                        batchIndex++;
+                        log.debug("Executing query {}; fieldIndexes: {}; batch index: {}; batch size: {}", new Object[]{sqlHolder.getSql(), sqlHolder.getFieldIndexes(), batchIndex, currentBatchSize});
+                        ps.executeBatch();
+                        currentBatchSize = 0;
+                    }
                 }
             }
 
-            log.debug("Executing query {}", new Object[]{sqlHolder});
-            ps.executeBatch();
+            if (currentBatchSize > 0) {
+                batchIndex++;
+                log.debug("Executing query {}; fieldIndexes: {}; batch index: {}; batch size: {}", new Object[]{sqlHolder.getSql(), sqlHolder.getFieldIndexes(), batchIndex, currentBatchSize});
+                ps.executeBatch();
+            }
             result.routeTo(flowFile, REL_SUCCESS);
             session.getProvenanceReporter().send(flowFile, functionContext.jdbcUrl);
 

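One practical consequence of the FLOWFILE_ATTRIBUTES expression-language scope declared on the new property above: the batch limit can vary per flowfile. A hedged usage sketch against the test runner used in the tests below (the attribute name 'max.batch.size' is made up for illustration, not something the processor defines):

    // 'runner' is the org.apache.nifi.util.TestRunner used in the tests below.
    runner.setProperty(PutDatabaseRecord.MAX_BATCH_SIZE, "${max.batch.size}");
    runner.enqueue(new byte[0],
            java.util.Collections.singletonMap("max.batch.size", "100"));
    runner.run();
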
http://git-wip-us.apache.org/repos/asf/nifi/blob/d319a3ef/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy
index ebf8460..0ddac80 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy
@@ -16,6 +16,8 @@
  */
 package org.apache.nifi.processors.standard
 
+
+import org.apache.commons.dbcp2.DelegatingConnection
 import org.apache.nifi.processor.exception.ProcessException
 import org.apache.nifi.processor.util.pattern.RollbackOnFailure
 import org.apache.nifi.reporting.InitializationException
@@ -36,17 +38,25 @@ import org.junit.runners.JUnit4
 
 import java.sql.Connection
 import java.sql.DriverManager
+import java.sql.PreparedStatement
 import java.sql.ResultSet
 import java.sql.SQLDataException
 import java.sql.SQLException
 import java.sql.SQLNonTransientConnectionException
 import java.sql.Statement
+import java.util.function.Supplier
 
 import static org.junit.Assert.assertEquals
 import static org.junit.Assert.assertFalse
+import static org.junit.Assert.assertNotNull
 import static org.junit.Assert.assertTrue
 import static org.junit.Assert.fail
+import static org.mockito.Matchers.anyMap
+import static org.mockito.Mockito.doAnswer
+import static org.mockito.Mockito.only
 import static org.mockito.Mockito.spy
+import static org.mockito.Mockito.times
+import static org.mockito.Mockito.verify
 
 /**
  * Unit tests for the PutDatabaseRecord processor
@@ -718,6 +728,99 @@ class TestPutDatabaseRecord {
         conn.close()
     }
 
+    @Test
+    void testInsertWithMaxBatchSize() throws InitializationException, ProcessException, SQLException, IOException {
+        recreateTable("PERSONS", createPersons)
+        final MockRecordParser parser = new MockRecordParser()
+        runner.addControllerService("parser", parser)
+        runner.enableControllerService(parser)
+
+        parser.addSchemaField("id", RecordFieldType.INT)
+        parser.addSchemaField("name", RecordFieldType.STRING)
+        parser.addSchemaField("code", RecordFieldType.INT)
+
+        (1..11).each {
+            parser.addRecord(it, "rec$it".toString(), 100 + it)
+        }
+
+        runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, 'parser')
+        runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE)
+        runner.setProperty(PutDatabaseRecord.TABLE_NAME, 'PERSONS')
+        runner.setProperty(PutDatabaseRecord.MAX_BATCH_SIZE, "5")
+
+        Supplier<PreparedStatement> spyStmt = createPreparedStatementSpy()
+
+        runner.enqueue(new byte[0])
+        runner.run()
+
+        runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1)
+
+        assertEquals(11, getTableSize())
+
+        assertNotNull(spyStmt.get())
+        verify(spyStmt.get(), times(3)).executeBatch()
+    }
+
+    @Test
+    void testInsertWithDefaultMaxBatchSize() throws InitializationException, ProcessException, SQLException, IOException {
+        recreateTable("PERSONS", createPersons)
+        final MockRecordParser parser = new MockRecordParser()
+        runner.addControllerService("parser", parser)
+        runner.enableControllerService(parser)
+
+        parser.addSchemaField("id", RecordFieldType.INT)
+        parser.addSchemaField("name", RecordFieldType.STRING)
+        parser.addSchemaField("code", RecordFieldType.INT)
+
+        (1..11).each {
+            parser.addRecord(it, "rec$it".toString(), 100 + it)
+        }
+
+        runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, 'parser')
+        runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE)
+        runner.setProperty(PutDatabaseRecord.TABLE_NAME, 'PERSONS')
+
+        Supplier<PreparedStatement> spyStmt = createPreparedStatementSpy()
+
+        runner.enqueue(new byte[0])
+        runner.run()
+
+        runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1)
+
+        assertEquals(11, getTableSize())
+
+        assertNotNull(spyStmt.get())
+        verify(spyStmt.get(), times(1)).executeBatch()
+    }
+
+    private Supplier<PreparedStatement> createPreparedStatementSpy() {
+        PreparedStatement spyStmt
+        doAnswer({ inv ->
+            new DelegatingConnection((Connection)inv.callRealMethod()) {
+                @Override
+                PreparedStatement prepareStatement(String sql) throws SQLException {
+                    spyStmt = spy(getDelegate().prepareStatement(sql))
+                }
+            }
+        }).when(dbcp).getConnection(anyMap())
+        return { spyStmt }
+    }
+
+    private int getTableSize() {
+        final Connection connection = dbcp.getConnection()
+        try {
+            final Statement stmt = connection.createStatement()
+            try {
+                final ResultSet rs = stmt.executeQuery('SELECT count(*) FROM PERSONS')
+                assertTrue(rs.next())
+                rs.getInt(1)
+            } finally {
+                stmt.close()
+            }
+        } finally {
+            connection.close()
+        }
+    }
 
     private void recreateTable(String tableName, String createSQL) throws ProcessException, SQLException {
         final Connection conn = dbcp.getConnection()