You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2019/11/19 21:50:15 UTC

[nifi] branch master updated: NIFI-5970 Handle multiple input FlowFiles at Put.initConnection

This is an automated email from the ASF dual-hosted git repository.

mattyb149 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new b3880a4  NIFI-5970 Handle multiple input FlowFiles at Put.initConnection
b3880a4 is described below

commit b3880a4a067915c56aa3d1b602717eab7ffa02fb
Author: Koji Kawamura <ij...@apache.org>
AuthorDate: Wed Jul 17 11:28:37 2019 +0900

    NIFI-5970 Handle multiple input FlowFiles at Put.initConnection
    
    Signed-off-by: Matthew Burgess <ma...@apache.org>
    
    This closes #3583
---
 .../processor/util/pattern/PartialFunctions.java   |  2 +-
 .../apache/nifi/processor/util/pattern/Put.java    |  4 +-
 .../org/apache/nifi/processors/hive/PutHiveQL.java |  4 +-
 .../apache/nifi/processors/hive/PutHive3QL.java    |  2 +-
 .../apache/nifi/processors/hive/PutHive_1_1QL.java |  4 +-
 .../processors/standard/PutDatabaseRecord.java     |  4 +-
 .../apache/nifi/processors/standard/PutSQL.java    | 31 ++++++++--
 .../nifi/processors/standard/TestPutSQL.java       | 71 ++++++++++++++++++++++
 .../java/org/apache/nifi/dbcp/DBCPService.java     | 47 ++++++++++++++
 .../apache/nifi/dbcp/DBCPConnectionPoolLookup.java | 20 +++++-
 .../nifi/dbcp/TestDBCPConnectionPoolLookup.java    | 51 ++++++++++++++++
 11 files changed, 223 insertions(+), 17 deletions(-)

diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/pattern/PartialFunctions.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/pattern/PartialFunctions.java
index 7b969b0..9c27200 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/pattern/PartialFunctions.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/pattern/PartialFunctions.java
@@ -32,7 +32,7 @@ public class PartialFunctions {
 
     @FunctionalInterface
     public interface InitConnection<FC, C> {
-        C apply(ProcessContext context, ProcessSession session, FC functionContext, FlowFile flowFile) throws ProcessException;
+        C apply(ProcessContext context, ProcessSession session, FC functionContext, List<FlowFile> flowFiles) throws ProcessException;
     }
 
     @FunctionalInterface
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/pattern/Put.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/pattern/Put.java
index 80b8088..bcea833 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/pattern/Put.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/pattern/Put.java
@@ -93,8 +93,8 @@ public class Put<FC, C extends AutoCloseable> {
             return;
         }
 
-        // Only pass in a flow file if there is a single one present
-        try (C connection = initConnection.apply(context, session, functionContext, flowFiles.size() == 1 ? flowFiles.get(0) : null)) {
+        // Pass the FlowFiles to initialize a connection
+        try (C connection = initConnection.apply(context, session, functionContext, flowFiles)) {
 
             try {
                 // Execute the core function.
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveQL.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveQL.java
index 943e288..e1aeade 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveQL.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveQL.java
@@ -204,9 +204,9 @@ public class PutHiveQL extends AbstractHiveQLProcessor {
         }
     }
 
-    private InitConnection<FunctionContext, Connection> initConnection = (context, session, fc, ff) -> {
+    private InitConnection<FunctionContext, Connection> initConnection = (context, session, fc, ffs) -> {
         final HiveDBCPService dbcpService = context.getProperty(HIVE_DBCP_SERVICE).asControllerService(HiveDBCPService.class);
-        final Connection connection = dbcpService.getConnection(ff == null ? Collections.emptyMap() : ff.getAttributes());
+        final Connection connection = dbcpService.getConnection(ffs == null || ffs.isEmpty() ? Collections.emptyMap() : ffs.get(0).getAttributes());
         fc.connectionUrl = dbcpService.getConnectionURL();
         return connection;
     };
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3QL.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3QL.java
index 7a5b389..7d137e7 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3QL.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3QL.java
@@ -205,7 +205,7 @@ public class PutHive3QL extends AbstractHive3QLProcessor {
         }
     }
 
-    private InitConnection<FunctionContext, Connection> initConnection = (context, session, fc, ff) -> {
+    private InitConnection<FunctionContext, Connection> initConnection = (context, session, fc, ffs) -> {
         final Hive3DBCPService dbcpService = context.getProperty(HIVE_DBCP_SERVICE).asControllerService(Hive3DBCPService.class);
         final Connection connection = dbcpService.getConnection();
         fc.connectionUrl = dbcpService.getConnectionURL();
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive_1_1-processors/src/main/java/org/apache/nifi/processors/hive/PutHive_1_1QL.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive_1_1-processors/src/main/java/org/apache/nifi/processors/hive/PutHive_1_1QL.java
index d571789..d337f0d 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive_1_1-processors/src/main/java/org/apache/nifi/processors/hive/PutHive_1_1QL.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive_1_1-processors/src/main/java/org/apache/nifi/processors/hive/PutHive_1_1QL.java
@@ -204,9 +204,9 @@ public class PutHive_1_1QL extends AbstractHive_1_1QLProcessor {
         }
     }
 
-    private InitConnection<FunctionContext, Connection> initConnection = (context, session, fc, ff) -> {
+    private InitConnection<FunctionContext, Connection> initConnection = (context, session, fc, ffs) -> {
         final Hive_1_1DBCPService dbcpService = context.getProperty(HIVE_DBCP_SERVICE).asControllerService(Hive_1_1DBCPService.class);
-        final Connection connection = dbcpService.getConnection(ff == null ? Collections.emptyMap() : ff.getAttributes());
+        final Connection connection = dbcpService.getConnection(ffs == null || ffs.isEmpty() ? Collections.emptyMap() : ffs.get(0).getAttributes());
         fc.connectionUrl = dbcpService.getConnectionURL();
         return connection;
     };
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
index 8b4ad78..55adabb 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
@@ -358,9 +358,9 @@ public class PutDatabaseRecord extends AbstractSessionFactoryProcessor {
                 .build();
     }
 
-    private final PartialFunctions.InitConnection<FunctionContext, Connection> initConnection = (c, s, fc, ff) -> {
+    private final PartialFunctions.InitConnection<FunctionContext, Connection> initConnection = (c, s, fc, ffs) -> {
         final Connection connection = c.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class)
-                .getConnection(ff == null ? Collections.emptyMap() : ff.getAttributes());
+                .getConnection(ffs == null || ffs.isEmpty() ? Collections.emptyMap() : ffs.get(0).getAttributes());
         try {
             fc.originalAutoCommit = connection.getAutoCommit();
             connection.setAutoCommit(false);
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
index 6a4e3a6..8c44f96 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
@@ -275,9 +275,9 @@ public class PutSQL extends AbstractSessionFactoryProcessor {
         return poll.getFlowFiles();
     };
 
-    private final PartialFunctions.InitConnection<FunctionContext, Connection> initConnection = (c, s, fc, ff) -> {
+    private final PartialFunctions.InitConnection<FunctionContext, Connection> initConnection = (c, s, fc, ffs) -> {
         final Connection connection = c.getProperty(CONNECTION_POOL).asControllerService(DBCPService.class)
-                .getConnection(ff == null ? Collections.emptyMap() : ff.getAttributes());
+                .getConnection(ffs == null || ffs.isEmpty() ? Collections.emptyMap() : ffs.get(0).getAttributes());
         try {
             fc.originalAutoCommit = connection.getAutoCommit();
             final boolean autocommit = c.getProperty(AUTO_COMMIT).asBoolean();
@@ -621,13 +621,18 @@ public class PutSQL extends AbstractSessionFactoryProcessor {
         boolean fragmentedTransaction = false;
 
         final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
+        final FlowFileFilter dbcpServiceFlowFileFilter = context.getProperty(CONNECTION_POOL).asControllerService(DBCPService.class).getFlowFileFilter(batchSize);
         List<FlowFile> flowFiles;
         if (useTransactions) {
-            final TransactionalFlowFileFilter filter = new TransactionalFlowFileFilter();
+            final TransactionalFlowFileFilter filter = new TransactionalFlowFileFilter(dbcpServiceFlowFileFilter);
             flowFiles = session.get(filter);
             fragmentedTransaction = filter.isFragmentedTransaction();
         } else {
-            flowFiles = session.get(batchSize);
+            if (dbcpServiceFlowFileFilter == null) {
+                flowFiles = session.get(batchSize);
+            } else {
+                flowFiles = session.get(dbcpServiceFlowFileFilter);
+            }
         }
 
         if (flowFiles.isEmpty()) {
@@ -804,14 +809,28 @@ public class PutSQL extends AbstractSessionFactoryProcessor {
      * across multiple FlowFiles) or that none of the FlowFiles belongs to a fragmented transaction
      */
     static class TransactionalFlowFileFilter implements FlowFileFilter {
+        private final FlowFileFilter nonFragmentedTransactionFilter;
         private String selectedId = null;
         private int numSelected = 0;
         private boolean ignoreFragmentIdentifiers = false;
 
+        public TransactionalFlowFileFilter(FlowFileFilter nonFragmentedTransactionFilter) {
+            this.nonFragmentedTransactionFilter = nonFragmentedTransactionFilter;
+        }
+
         public boolean isFragmentedTransaction() {
             return !ignoreFragmentIdentifiers;
         }
 
+        private FlowFileFilterResult filterNonFragmentedTransaction(final FlowFile flowFile) {
+            if (nonFragmentedTransactionFilter == null) {
+                return FlowFileFilterResult.ACCEPT_AND_CONTINUE;
+            } else {
+                // Use non-fragmented tx filter for further filtering.
+                return nonFragmentedTransactionFilter.filter(flowFile);
+            }
+        }
+
         @Override
         public FlowFileFilterResult filter(final FlowFile flowFile) {
             final String fragmentId = flowFile.getAttribute(FRAGMENT_ID_ATTR);
@@ -821,7 +840,7 @@ public class PutSQL extends AbstractSessionFactoryProcessor {
             // we accept any FlowFile that is also not part of a fragmented transaction.
             if (ignoreFragmentIdentifiers) {
                 if (fragmentId == null || "1".equals(fragCount)) {
-                    return FlowFileFilterResult.ACCEPT_AND_CONTINUE;
+                    return filterNonFragmentedTransaction(flowFile);
                 } else {
                     return FlowFileFilterResult.REJECT_AND_CONTINUE;
                 }
@@ -831,7 +850,7 @@ public class PutSQL extends AbstractSessionFactoryProcessor {
                 if (selectedId == null) {
                     // Only one FlowFile in the transaction.
                     ignoreFragmentIdentifiers = true;
-                    return FlowFileFilterResult.ACCEPT_AND_CONTINUE;
+                    return filterNonFragmentedTransaction(flowFile);
                 } else {
                     // we've already selected 1 FlowFile, and this one doesn't match.
                     return FlowFileFilterResult.REJECT_AND_CONTINUE;
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java
index b804447..5d155ee 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java
@@ -16,6 +16,9 @@
  */
 package org.apache.nifi.processors.standard;
 
+import static org.apache.nifi.processor.FlowFileFilter.FlowFileFilterResult.ACCEPT_AND_CONTINUE;
+import static org.apache.nifi.processor.FlowFileFilter.FlowFileFilterResult.ACCEPT_AND_TERMINATE;
+import static org.apache.nifi.processor.FlowFileFilter.FlowFileFilterResult.REJECT_AND_CONTINUE;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -38,6 +41,7 @@ import java.time.LocalTime;
 import java.time.ZoneId;
 import java.time.format.DateTimeFormatter;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
@@ -47,6 +51,7 @@ import javax.xml.bind.DatatypeConverter;
 import org.apache.commons.lang3.RandomUtils;
 import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.dbcp.DBCPService;
+import org.apache.nifi.processor.FlowFileFilter;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.pattern.RollbackOnFailure;
 import org.apache.nifi.reporting.InitializationException;
@@ -1458,6 +1463,72 @@ public class TestPutSQL {
         }
     }
 
+    private Map<String, String> createFragmentedTransactionAttributes(String id, int count, int index) {
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("fragment.identifier", id);
+        attributes.put("fragment.count", String.valueOf(count));
+        attributes.put("fragment.index", String.valueOf(index));
+        return attributes;
+    }
+
+    @Test
+    public void testTransactionalFlowFileFilter() {
+        final MockFlowFile ff0 = new MockFlowFile(0);
+        final MockFlowFile ff1 = new MockFlowFile(1);
+        final MockFlowFile ff2 = new MockFlowFile(2);
+        final MockFlowFile ff3 = new MockFlowFile(3);
+        final MockFlowFile ff4 = new MockFlowFile(4);
+
+        ff0.putAttributes(createFragmentedTransactionAttributes("tx-1", 3, 0));
+        ff1.putAttributes(Collections.singletonMap("accept", "false"));
+        ff2.putAttributes(createFragmentedTransactionAttributes("tx-1", 3, 1));
+        ff3.putAttributes(Collections.singletonMap("accept", "true"));
+        ff4.putAttributes(createFragmentedTransactionAttributes("tx-1", 3, 2));
+
+        // TEST 1: Fragmented TX with null service filter
+        // Even if the controller service does not have filtering rule, tx filter should work.
+        FlowFileFilter txFilter = new PutSQL.TransactionalFlowFileFilter(null);
+        // Should perform a fragmented tx.
+        assertEquals(ACCEPT_AND_CONTINUE, txFilter.filter(ff0));
+        assertEquals(REJECT_AND_CONTINUE, txFilter.filter(ff1));
+        assertEquals(ACCEPT_AND_CONTINUE, txFilter.filter(ff2));
+        assertEquals(REJECT_AND_CONTINUE, txFilter.filter(ff3));
+        assertEquals(ACCEPT_AND_TERMINATE, txFilter.filter(ff4));
+
+        // TEST 2: Non-Fragmented TX with null service filter
+        txFilter = new PutSQL.TransactionalFlowFileFilter(null);
+        // Should perform a non-fragmented tx.
+        assertEquals(ACCEPT_AND_CONTINUE, txFilter.filter(ff1));
+        assertEquals(REJECT_AND_CONTINUE, txFilter.filter(ff0));
+        assertEquals(REJECT_AND_CONTINUE, txFilter.filter(ff2));
+        assertEquals(ACCEPT_AND_CONTINUE, txFilter.filter(ff3));
+        assertEquals(REJECT_AND_CONTINUE, txFilter.filter(ff4));
+
+
+        final FlowFileFilter nonTxFilter = flowFile -> "true".equals(flowFile.getAttribute("accept"))
+            ? ACCEPT_AND_CONTINUE
+            : REJECT_AND_CONTINUE;
+
+        // TEST 3: Fragmented TX with a service filter
+        // Even when the controller service provides a filtering rule, the fragmented tx filter should take precedence.
+        txFilter = new PutSQL.TransactionalFlowFileFilter(nonTxFilter);
+        // Should perform a fragmented tx. The nonTxFilter doesn't affect the result in this case.
+        assertEquals(ACCEPT_AND_CONTINUE, txFilter.filter(ff0));
+        assertEquals(REJECT_AND_CONTINUE, txFilter.filter(ff1));
+        assertEquals(ACCEPT_AND_CONTINUE, txFilter.filter(ff2));
+        assertEquals(REJECT_AND_CONTINUE, txFilter.filter(ff3));
+        assertEquals(ACCEPT_AND_TERMINATE, txFilter.filter(ff4));
+
+        // TEST 4: Non-Fragmented TX with a service filter
+        txFilter = new PutSQL.TransactionalFlowFileFilter(nonTxFilter);
+        // Should perform a non-fragmented tx and use the nonTxFilter.
+        assertEquals(REJECT_AND_CONTINUE, txFilter.filter(ff1));
+        assertEquals(REJECT_AND_CONTINUE, txFilter.filter(ff0));
+        assertEquals(REJECT_AND_CONTINUE, txFilter.filter(ff2));
+        assertEquals(ACCEPT_AND_CONTINUE, txFilter.filter(ff3));
+        assertEquals(REJECT_AND_CONTINUE, txFilter.filter(ff4));
+    }
+
     /**
      * Simple implementation only for testing purposes
      */
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-api/src/main/java/org/apache/nifi/dbcp/DBCPService.java b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-api/src/main/java/org/apache/nifi/dbcp/DBCPService.java
index ffc9b3a..3c88f83 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-api/src/main/java/org/apache/nifi/dbcp/DBCPService.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-api/src/main/java/org/apache/nifi/dbcp/DBCPService.java
@@ -18,12 +18,18 @@ package org.apache.nifi.dbcp;
 
 import java.sql.Connection;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.processor.FlowFileFilter;
+import org.apache.nifi.processor.FlowFileFilter.FlowFileFilterResult;
 import org.apache.nifi.processor.exception.ProcessException;
 
+import static org.apache.nifi.processor.FlowFileFilter.FlowFileFilterResult.ACCEPT_AND_CONTINUE;
+import static org.apache.nifi.processor.FlowFileFilter.FlowFileFilterResult.REJECT_AND_TERMINATE;
+
 /**
  * Definition for Database Connection Pooling Service.
  *
@@ -48,4 +54,45 @@ public interface DBCPService extends ControllerService {
         // without attributes
         return getConnection();
     }
+
+    /**
+     * Implementation classes should override this method to provide a DBCPService-specific FlowFile filtering rule.
+     * For example, when processing multiple incoming FlowFiles at the same time, every FlowFile should have the same attribute value.
+     * Components using this service and also accepting multiple incoming FlowFiles should use
+     * the FlowFileFilter returned by this method to get target FlowFiles from a process session.
+     * @return a FlowFileFilter or null if no service specific filtering is required
+     */
+    default FlowFileFilter getFlowFileFilter() {
+        return null;
+    }
+
+    /**
+     * A utility default method to compose the DBCPService-specific filtering provided by {@link #getFlowFileFilter()} with a batch size limitation.
+     * Implementation classes do not have to override this method. Instead, override {@link #getFlowFileFilter()} to provide service specific filtering.
+     * Components using this service and also accepting multiple incoming FlowFiles should use
+     * the FlowFileFilter returned by this method to get target FlowFiles from a process session.
+     * @param batchSize the maximum number of FlowFiles to accept
+     * @return a composite FlowFileFilter applying both the service-specific filtering and the batch size limitation, or null if no service-specific filtering is required.
+     */
+    default FlowFileFilter getFlowFileFilter(int batchSize) {
+        final FlowFileFilter filter = getFlowFileFilter();
+        if (filter == null) {
+            return null;
+        }
+
+        final AtomicInteger count = new AtomicInteger(0);
+        return flowFile -> {
+            if (count.get() >= batchSize) {
+                return REJECT_AND_TERMINATE;
+            }
+
+            final FlowFileFilterResult result = filter.filter(flowFile);
+            if (ACCEPT_AND_CONTINUE.equals(result)) {
+                count.incrementAndGet();
+                return ACCEPT_AND_CONTINUE;
+            } else {
+                return result;
+            }
+        };
+    }
 }
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/dbcp/DBCPConnectionPoolLookup.java b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/dbcp/DBCPConnectionPoolLookup.java
index b0fb964..978c912 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/dbcp/DBCPConnectionPoolLookup.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/dbcp/DBCPConnectionPoolLookup.java
@@ -16,16 +16,21 @@
  */
 package org.apache.nifi.dbcp;
 
-
+import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.annotation.behavior.DynamicProperty;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.FlowFileFilter;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.service.lookup.AbstractSingleAttributeBasedControllerServiceLookup;
 
 import java.sql.Connection;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.nifi.processor.FlowFileFilter.FlowFileFilterResult.ACCEPT_AND_CONTINUE;
+import static org.apache.nifi.processor.FlowFileFilter.FlowFileFilterResult.REJECT_AND_CONTINUE;
 
 @Tags({ "dbcp", "jdbc", "database", "connection", "pooling", "store" })
 @CapabilityDescription("Provides a DBCPService that can be used to dynamically select another DBCPService. This service " +
@@ -61,4 +66,17 @@ public class DBCPConnectionPoolLookup
     public Connection getConnection(Map<String, String> attributes) {
         return lookupService(attributes).getConnection(attributes);
     }
+
+    @Override
+    public FlowFileFilter getFlowFileFilter() {
+        final AtomicReference<String> ref = new AtomicReference<>();
+        return flowFile -> {
+            final String flowFileDBName = flowFile.getAttribute(DATABASE_NAME_ATTRIBUTE);
+            if (StringUtils.isEmpty(flowFileDBName)) {
+                throw new ProcessException("FlowFile attributes must contain an attribute name '" + DATABASE_NAME_ATTRIBUTE + "'");
+            }
+            final String databaseName = ref.compareAndSet(null, flowFileDBName) ? flowFileDBName : ref.get();
+            return flowFileDBName.equals(databaseName) ? ACCEPT_AND_CONTINUE : REJECT_AND_CONTINUE;
+        };
+    }
 }
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/test/java/org/apache/nifi/dbcp/TestDBCPConnectionPoolLookup.java b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/test/java/org/apache/nifi/dbcp/TestDBCPConnectionPoolLookup.java
index d02437f..978907e 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/test/java/org/apache/nifi/dbcp/TestDBCPConnectionPoolLookup.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/test/java/org/apache/nifi/dbcp/TestDBCPConnectionPoolLookup.java
@@ -17,14 +17,17 @@
 package org.apache.nifi.dbcp;
 
 import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.processor.FlowFileFilter;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.junit.Before;
 import org.junit.Test;
 
 import java.sql.Connection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -144,6 +147,54 @@ public class TestDBCPConnectionPoolLookup {
         runner.assertNotValid(dbcpLookupService);
     }
 
+    @Test
+    public void testFlowFileFiltering() {
+        final FlowFileFilter filter = dbcpLookupService.getFlowFileFilter();
+        assertNotNull(filter);
+
+        final MockFlowFile ff0 = new MockFlowFile(0);
+        final MockFlowFile ff1 = new MockFlowFile(1);
+        final MockFlowFile ff2 = new MockFlowFile(2);
+        final MockFlowFile ff3 = new MockFlowFile(3);
+        final MockFlowFile ff4 = new MockFlowFile(4);
+
+        ff0.putAttributes(Collections.singletonMap(DBCPConnectionPoolLookup.DATABASE_NAME_ATTRIBUTE, "db-A"));
+        ff1.putAttributes(Collections.singletonMap(DBCPConnectionPoolLookup.DATABASE_NAME_ATTRIBUTE, "db-B"));
+        ff2.putAttributes(Collections.singletonMap(DBCPConnectionPoolLookup.DATABASE_NAME_ATTRIBUTE, "db-A"));
+        ff3.putAttributes(Collections.singletonMap(DBCPConnectionPoolLookup.DATABASE_NAME_ATTRIBUTE, "db-B"));
+        ff4.putAttributes(Collections.singletonMap(DBCPConnectionPoolLookup.DATABASE_NAME_ATTRIBUTE, "db-A"));
+
+        assertEquals(FlowFileFilter.FlowFileFilterResult.ACCEPT_AND_CONTINUE, filter.filter(ff0));
+        assertEquals(FlowFileFilter.FlowFileFilterResult.REJECT_AND_CONTINUE, filter.filter(ff1));
+        assertEquals(FlowFileFilter.FlowFileFilterResult.ACCEPT_AND_CONTINUE, filter.filter(ff2));
+        assertEquals(FlowFileFilter.FlowFileFilterResult.REJECT_AND_CONTINUE, filter.filter(ff3));
+        assertEquals(FlowFileFilter.FlowFileFilterResult.ACCEPT_AND_CONTINUE, filter.filter(ff4));
+    }
+
+    @Test
+    public void testFlowFileFilteringWithBatchSize() {
+        final FlowFileFilter filter = dbcpLookupService.getFlowFileFilter(2);
+        assertNotNull(filter);
+
+        final MockFlowFile ff0 = new MockFlowFile(0);
+        final MockFlowFile ff1 = new MockFlowFile(1);
+        final MockFlowFile ff2 = new MockFlowFile(2);
+        final MockFlowFile ff3 = new MockFlowFile(3);
+        final MockFlowFile ff4 = new MockFlowFile(4);
+
+        ff0.putAttributes(Collections.singletonMap(DBCPConnectionPoolLookup.DATABASE_NAME_ATTRIBUTE, "db-A"));
+        ff1.putAttributes(Collections.singletonMap(DBCPConnectionPoolLookup.DATABASE_NAME_ATTRIBUTE, "db-B"));
+        ff2.putAttributes(Collections.singletonMap(DBCPConnectionPoolLookup.DATABASE_NAME_ATTRIBUTE, "db-A"));
+        ff3.putAttributes(Collections.singletonMap(DBCPConnectionPoolLookup.DATABASE_NAME_ATTRIBUTE, "db-B"));
+        ff4.putAttributes(Collections.singletonMap(DBCPConnectionPoolLookup.DATABASE_NAME_ATTRIBUTE, "db-A"));
+
+        assertEquals(FlowFileFilter.FlowFileFilterResult.ACCEPT_AND_CONTINUE, filter.filter(ff0));
+        assertEquals(FlowFileFilter.FlowFileFilterResult.REJECT_AND_CONTINUE, filter.filter(ff1));
+        assertEquals(FlowFileFilter.FlowFileFilterResult.ACCEPT_AND_CONTINUE, filter.filter(ff2));
+        assertEquals(FlowFileFilter.FlowFileFilterResult.REJECT_AND_TERMINATE, filter.filter(ff3));
+        assertEquals(FlowFileFilter.FlowFileFilterResult.REJECT_AND_TERMINATE, filter.filter(ff4));
+    }
+
     /**
      * A mock DBCPService that will always return the passed in MockConnection.
      */