You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by tp...@apache.org on 2023/02/22 18:28:15 UTC

[nifi] branch main updated: NIFI-11094: Allow CaptureChangeMySQL to send multiple events per FlowFile

This is an automated email from the ASF dual-hosted git repository.

tpalfy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new a2b98e0c9c NIFI-11094: Allow CaptureChangeMySQL to send multiple events per FlowFile
a2b98e0c9c is described below

commit a2b98e0c9c17ba7603dcc666bfae543abeb1ea7a
Author: Matthew Burgess <ma...@apache.org>
AuthorDate: Mon Jan 30 16:59:53 2023 -0500

    NIFI-11094: Allow CaptureChangeMySQL to send multiple events per FlowFile
    
    This closes #6907.
    
    Co-authored-by: Tamas Palfy <tp...@apache.org>
    
    Signed-off-by: Tamas Palfy <tp...@apache.org>
---
 .../nifi/cdc/event/io/AbstractEventWriter.java     |  10 +-
 .../org/apache/nifi/cdc/event/io/EventWriter.java  |  16 +-
 .../cdc/event/io/EventWriterConfiguration.java     |  82 ++++++++
 .../cdc/event/io/FlowFileEventWriteStrategy.java   |  54 +++++
 .../mysql/event/io/AbstractBinlogEventWriter.java  | 128 +++++++++--
 .../event/io/AbstractBinlogTableEventWriter.java   |  19 --
 .../event/io/CommitTransactionEventWriter.java     |  15 ++
 .../nifi/cdc/mysql/event/io/DDLEventWriter.java    |  31 ++-
 .../nifi/cdc/mysql/event/io/DeleteRowsWriter.java  |  35 ++--
 .../nifi/cdc/mysql/event/io/InsertRowsWriter.java  |  34 +--
 .../nifi/cdc/mysql/event/io/UpdateRowsWriter.java  |  34 +--
 .../cdc/mysql/processors/CaptureChangeMySQL.java   | 165 ++++++++++++---
 .../mysql/processors/CaptureChangeMySQLTest.groovy | 233 +++++++++++++++++++++
 13 files changed, 731 insertions(+), 125 deletions(-)

diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/io/AbstractEventWriter.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/io/AbstractEventWriter.java
index c805d3cf7f..e18a3db5d1 100644
--- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/io/AbstractEventWriter.java
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/io/AbstractEventWriter.java
@@ -33,7 +33,9 @@ public abstract class AbstractEventWriter<T extends EventInfo> implements EventW
 
     // Common method to create a JSON generator and start the root object. Should be called by sub-classes unless they need their own generator and such.
     protected void startJson(OutputStream outputStream, T event) throws IOException {
-        jsonGenerator = createJsonGenerator(outputStream);
+        if (jsonGenerator == null) {
+            jsonGenerator = createJsonGenerator(outputStream);
+        }
         jsonGenerator.writeStartObject();
         String eventType = event.getEventType();
         if (eventType == null) {
@@ -54,11 +56,15 @@ public abstract class AbstractEventWriter<T extends EventInfo> implements EventW
             throw new IOException("endJson called without a JsonGenerator");
         }
         jsonGenerator.writeEndObject();
+    }
+
+    protected void endFile() throws IOException {
         jsonGenerator.flush();
         jsonGenerator.close();
+        jsonGenerator = null;
     }
 
-    private JsonGenerator createJsonGenerator(OutputStream out) throws IOException {
+    protected JsonGenerator createJsonGenerator(OutputStream out) throws IOException {
         return JSON_FACTORY.createGenerator(out);
     }
 }
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/io/EventWriter.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/io/EventWriter.java
index 096e3c1e69..e74a56ee99 100644
--- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/io/EventWriter.java
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/io/EventWriter.java
@@ -32,12 +32,14 @@ public interface EventWriter<T extends EventInfo> {
     /**
      * Writes the given event to the process session, possibly via transferring it to the specified relationship (usually used for success)
      *
-     * @param session           The session to write the event to
-     * @param transitUri        The URI indicating the source MySQL system from which the specified event is associated
-     * @param eventInfo         The event data
-     * @param currentSequenceId the current sequence ID
-     * @param relationship      A relationship to transfer any flowfile(s) to
-     * @return a sequence ID, usually incremented from the specified current sequence id by the number of flow files transferred and/or committed
+     * @param session                   The session to write the event to
+     * @param transitUri                The URI indicating the source MySQL system from which the specified event is associated
+     * @param eventInfo                 The event data
+     * @param currentSequenceId         The current sequence ID
+     * @param relationship              A relationship to transfer any flowfile(s) to
+     * @param eventWriterConfiguration  A configuration object used for FlowFile management (e.g. how many events to write to each FlowFile)
+     * @return a sequence ID, usually incremented from the specified current sequence ID by the number of FlowFiles transferred and/or committed
      */
-    long writeEvent(final ProcessSession session, String transitUri, final T eventInfo, final long currentSequenceId, Relationship relationship);
+    long writeEvent(final ProcessSession session, String transitUri, final T eventInfo, final long currentSequenceId, Relationship relationship,
+                    final EventWriterConfiguration eventWriterConfiguration);
 }
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/io/EventWriterConfiguration.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/io/EventWriterConfiguration.java
new file mode 100644
index 0000000000..153ce8319c
--- /dev/null
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/io/EventWriterConfiguration.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cdc.event.io;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.nifi.flowfile.FlowFile;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/** Carries the FlowFile write strategy together with the current FlowFile, its output stream and its JSON generator. */
+public class EventWriterConfiguration {
+
+    private final FlowFileEventWriteStrategy flowFileEventWriteStrategy;
+    private final int numberOfEventsPerFlowFile;
+    private int numberOfEventsWritten;
+    private FlowFile currentFlowFile;
+    private OutputStream flowFileOutputStream;
+    private JsonGenerator jsonGenerator;
+
+    public EventWriterConfiguration(FlowFileEventWriteStrategy flowFileEventWriteStrategy, int numberOfEventsPerFlowFile) {
+        this.flowFileEventWriteStrategy = flowFileEventWriteStrategy;
+        this.numberOfEventsPerFlowFile = numberOfEventsPerFlowFile;
+    }
+
+    public FlowFileEventWriteStrategy getFlowFileEventWriteStrategy() {
+        return flowFileEventWriteStrategy;
+    }
+
+    public int getNumberOfEventsWritten() {
+        return numberOfEventsWritten;
+    }
+
+    public void incrementNumberOfEventsWritten() {
+        this.numberOfEventsWritten++;
+    }
+
+    public void startNewFlowFile(FlowFile flowFile, OutputStream flowFileOutputStream, JsonGenerator jsonGenerator) {
+        this.currentFlowFile = flowFile;
+        this.flowFileOutputStream = flowFileOutputStream;
+        this.jsonGenerator = jsonGenerator;
+    }
+
+    public void cleanUp() throws IOException {
+        try (OutputStream ignored = this.flowFileOutputStream) { // skipped when null; closed only after the state below is reset
+            this.currentFlowFile = null;
+            this.flowFileOutputStream = null;
+            this.jsonGenerator = null;
+            this.numberOfEventsWritten = 0;
+        }
+    }
+
+    public int getNumberOfEventsPerFlowFile() {
+        return numberOfEventsPerFlowFile;
+    }
+
+    public FlowFile getCurrentFlowFile() {
+        return currentFlowFile;
+    }
+
+    public OutputStream getFlowFileOutputStream() {
+        return flowFileOutputStream;
+    }
+
+    public JsonGenerator getJsonGenerator() {
+        return jsonGenerator;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/io/FlowFileEventWriteStrategy.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/io/FlowFileEventWriteStrategy.java
new file mode 100644
index 0000000000..bd39174db2
--- /dev/null
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/io/FlowFileEventWriteStrategy.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cdc.event.io;
+
+import org.apache.nifi.components.DescribedValue;
+
+public enum FlowFileEventWriteStrategy implements DescribedValue {
+    MAX_EVENTS_PER_FLOWFILE(
+            "Max Events Per FlowFile",
+            "This strategy causes at most the number of events specified in the 'Number of Events Per FlowFile' property to be written per FlowFile. If the processor is stopped before the "
+                    + "specified number of events has been written (or the event queue becomes empty), the fewer number of events will still be written as a FlowFile before stopping."
+    ),
+    ONE_TRANSACTION_PER_FLOWFILE(
+            "One Transaction Per FlowFile",
+            "This strategy causes all events from a transaction (from BEGIN to COMMIT) to be written to a single FlowFile"
+    );
+
+    private final String displayName; // returned by getDisplayName()
+    private final String description; // returned by getDescription()
+
+    FlowFileEventWriteStrategy(String displayName, String description) {
+        this.displayName = displayName;
+        this.description = description;
+    }
+
+    @Override
+    public String getValue() {
+        return name();
+    }
+
+    @Override
+    public String getDisplayName() {
+        return displayName;
+    }
+
+    @Override
+    public String getDescription() {
+        return description;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/AbstractBinlogEventWriter.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/AbstractBinlogEventWriter.java
index 09186f23c8..420da6c2d4 100644
--- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/AbstractBinlogEventWriter.java
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/AbstractBinlogEventWriter.java
@@ -16,14 +16,21 @@
  */
 package org.apache.nifi.cdc.mysql.event.io;
 
+import org.apache.nifi.cdc.event.EventInfo;
+import org.apache.nifi.cdc.event.io.EventWriterConfiguration;
+import org.apache.nifi.cdc.event.io.FlowFileEventWriteStrategy;
 import org.apache.nifi.cdc.mysql.event.BinlogEventInfo;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.cdc.event.io.AbstractEventWriter;
 import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.FlowFileAccessException;
+import org.apache.nifi.processor.exception.ProcessException;
 
 import java.io.IOException;
+import java.io.OutputStream;
+import java.io.UncheckedIOException;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -44,35 +51,114 @@ public abstract class AbstractBinlogEventWriter<T extends BinlogEventInfo> exten
     }
 
     protected Map<String, String> getCommonAttributes(final long sequenceId, BinlogEventInfo eventInfo) {
-        return new HashMap<String, String>() {
-            {
-                put(SEQUENCE_ID_KEY, Long.toString(sequenceId));
-                put(CDC_EVENT_TYPE_ATTRIBUTE, eventInfo.getEventType());
-                String gtidSet = eventInfo.getBinlogGtidSet();
-                if (gtidSet == null) {
-                    put(BinlogEventInfo.BINLOG_FILENAME_KEY, eventInfo.getBinlogFilename());
-                    put(BinlogEventInfo.BINLOG_POSITION_KEY, Long.toString(eventInfo.getBinlogPosition()));
-                } else {
-                    put(BinlogEventInfo.BINLOG_GTIDSET_KEY, gtidSet);
-                }
-                put(CoreAttributes.MIME_TYPE.key(), APPLICATION_JSON);
-            }
-        };
+        final Map<String, String> commonAttributeMap = new HashMap<>();
+
+        commonAttributeMap.put(SEQUENCE_ID_KEY, Long.toString(sequenceId));
+        commonAttributeMap.put(CDC_EVENT_TYPE_ATTRIBUTE, eventInfo.getEventType());
+        String gtidSet = eventInfo.getBinlogGtidSet();
+        if (gtidSet == null) {
+            commonAttributeMap.put(BinlogEventInfo.BINLOG_FILENAME_KEY, eventInfo.getBinlogFilename());
+            commonAttributeMap.put(BinlogEventInfo.BINLOG_POSITION_KEY, Long.toString(eventInfo.getBinlogPosition()));
+        } else {
+            commonAttributeMap.put(BinlogEventInfo.BINLOG_GTIDSET_KEY, gtidSet);
+        }
+        commonAttributeMap.put(CoreAttributes.MIME_TYPE.key(), APPLICATION_JSON);
+
+        return commonAttributeMap;
     }
 
     // Default implementation for binlog events
     @Override
-    public long writeEvent(ProcessSession session, String transitUri, T eventInfo, long currentSequenceId, Relationship relationship) {
-        FlowFile flowFile = session.create();
-        flowFile = session.write(flowFile, (outputStream) -> {
+    public long writeEvent(ProcessSession session, String transitUri, T eventInfo, long currentSequenceId, Relationship relationship,
+                           final EventWriterConfiguration eventWriterConfiguration) {
+        configureEventWriter(eventWriterConfiguration, session, eventInfo);
+
+        OutputStream outputStream = eventWriterConfiguration.getFlowFileOutputStream();
+        try {
             super.startJson(outputStream, eventInfo);
             writeJson(eventInfo);
             // Nothing in the body
             super.endJson();
-        });
-        flowFile = session.putAllAttributes(flowFile, getCommonAttributes(currentSequenceId, eventInfo));
-        session.transfer(flowFile, relationship);
-        session.getProvenanceReporter().receive(flowFile, transitUri);
+        } catch (IOException ioe) {
+            throw new UncheckedIOException("Write JSON event failed", ioe);
+        }
+
+        eventWriterConfiguration.incrementNumberOfEventsWritten();
+
+        // Check if it is time to finish the FlowFile
+        if (maxEventsPerFlowFile(eventWriterConfiguration)
+                && eventWriterConfiguration.getNumberOfEventsWritten() == eventWriterConfiguration.getNumberOfEventsPerFlowFile()) {
+            finishAndTransferFlowFile(session, eventWriterConfiguration, transitUri, currentSequenceId, eventInfo, relationship);
+        }
         return currentSequenceId + 1;
     }
+
+    public void finishAndTransferFlowFile(final ProcessSession session, final EventWriterConfiguration eventWriterConfiguration, final String transitUri, final long seqId,
+                                          final BinlogEventInfo eventInfo, final Relationship relationship) {
+        // Validate first: without an open FlowFile the generator may be null, and the caller should get the ProcessException below rather than an NPE
+        FlowFile flowFile = eventWriterConfiguration.getCurrentFlowFile();
+        if (session == null || flowFile == null) {
+            throw new ProcessException("No open FlowFile or ProcessSession to write to");
+        }
+        if (writtenMultipleEvents(eventWriterConfiguration)) {
+            try {
+                jsonGenerator.writeEndArray();
+            } catch (IOException ioe) {
+                throw new UncheckedIOException("Write JSON end array failed", ioe);
+            }
+        }
+        try {
+            endFile();
+            flowFile = session.putAllAttributes(flowFile, getCommonAttributes(seqId, eventInfo));
+            session.transfer(flowFile, relationship);
+            session.getProvenanceReporter().receive(flowFile, transitUri);
+
+            eventWriterConfiguration.cleanUp();
+        } catch (IOException ioe) {
+            throw new FlowFileAccessException("Failed to close event writer", ioe);
+        }
+    }
+
+    protected void configureEventWriter(final EventWriterConfiguration eventWriterConfiguration, final ProcessSession session, final EventInfo eventInfo) {
+        FlowFile flowFile = eventWriterConfiguration.getCurrentFlowFile();
+        if (flowFile == null) {
+            flowFile = session.create();
+            OutputStream flowFileOutputStream = session.write(flowFile);
+            if (eventWriterConfiguration.getJsonGenerator() == null) {
+                try {
+                    jsonGenerator = createJsonGenerator(flowFileOutputStream);
+                } catch (IOException ioe) {
+                    throw new UncheckedIOException("JSON Generator creation failed", ioe);
+                }
+            }
+            if (multipleEventsPerFlowFile(eventWriterConfiguration)) {
+                try {
+                    jsonGenerator.writeStartArray();
+                } catch (IOException ioe) {
+                    throw new UncheckedIOException("Write JSON start array failed", ioe);
+                }
+            }
+            eventWriterConfiguration.startNewFlowFile(flowFile, flowFileOutputStream, jsonGenerator);
+        }
+        jsonGenerator = eventWriterConfiguration.getJsonGenerator();
+    }
+
+    private boolean multipleEventsPerFlowFile(EventWriterConfiguration eventWriterConfiguration) {
+        return (maxEventsPerFlowFile(eventWriterConfiguration)
+                && eventWriterConfiguration.getNumberOfEventsPerFlowFile() > 1)
+                || oneTransactionPerFlowFile(eventWriterConfiguration);
+    }
+
+    private boolean writtenMultipleEvents(EventWriterConfiguration eventWriterConfiguration) {
+        return eventWriterConfiguration.getNumberOfEventsWritten() > 1
+                || oneTransactionPerFlowFile(eventWriterConfiguration);
+    }
+
+    protected boolean maxEventsPerFlowFile(EventWriterConfiguration eventWriterConfiguration) {
+        return FlowFileEventWriteStrategy.MAX_EVENTS_PER_FLOWFILE.equals(eventWriterConfiguration.getFlowFileEventWriteStrategy());
+    }
+
+    protected boolean oneTransactionPerFlowFile(EventWriterConfiguration eventWriterConfiguration) {
+        return FlowFileEventWriteStrategy.ONE_TRANSACTION_PER_FLOWFILE.equals(eventWriterConfiguration.getFlowFileEventWriteStrategy());
+    }
 }
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/AbstractBinlogTableEventWriter.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/AbstractBinlogTableEventWriter.java
index a674386de7..f618a76a6f 100644
--- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/AbstractBinlogTableEventWriter.java
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/AbstractBinlogTableEventWriter.java
@@ -16,9 +16,6 @@
  */
 package org.apache.nifi.cdc.mysql.event.io;
 
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.cdc.mysql.event.BinlogTableEventInfo;
 
 import java.io.IOException;
@@ -46,20 +43,4 @@ public abstract class AbstractBinlogTableEventWriter<T extends BinlogTableEventI
             jsonGenerator.writeNullField("table_id");
         }
     }
-
-    // Default implementation for table-related binlog events
-    @Override
-    public long writeEvent(ProcessSession session, String transitUri, T eventInfo, long currentSequenceId, Relationship relationship) {
-        FlowFile flowFile = session.create();
-        flowFile = session.write(flowFile, (outputStream) -> {
-            super.startJson(outputStream, eventInfo);
-            writeJson(eventInfo);
-            // Nothing in the body
-            super.endJson();
-        });
-        flowFile = session.putAllAttributes(flowFile, getCommonAttributes(currentSequenceId, eventInfo));
-        session.transfer(flowFile, relationship);
-        session.getProvenanceReporter().receive(flowFile, transitUri);
-        return currentSequenceId + 1;
-    }
 }
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/CommitTransactionEventWriter.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/CommitTransactionEventWriter.java
index c69b4b242b..9ed576b023 100644
--- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/CommitTransactionEventWriter.java
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/CommitTransactionEventWriter.java
@@ -17,7 +17,10 @@
 package org.apache.nifi.cdc.mysql.event.io;
 
 
+import org.apache.nifi.cdc.event.io.EventWriterConfiguration;
 import org.apache.nifi.cdc.mysql.event.CommitTransactionEventInfo;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
 
 import java.io.IOException;
 
@@ -25,6 +28,18 @@ import java.io.IOException;
  * A writer for events corresponding to the end (i.e. commit) of a MySQL transaction
  */
 public class CommitTransactionEventWriter extends AbstractBinlogEventWriter<CommitTransactionEventInfo> {
+
+    @Override
+    public long writeEvent(ProcessSession session, String transitUri, CommitTransactionEventInfo eventInfo, long currentSequenceId,
+                           Relationship relationship, EventWriterConfiguration eventWriterConfiguration) {
+        long sequenceId = super.writeEvent(session, transitUri, eventInfo, currentSequenceId, relationship, eventWriterConfiguration);
+        // If writing one transaction per flowfile, finish the flowfile here before committing the session
+        if (oneTransactionPerFlowFile(eventWriterConfiguration)) {
+            super.finishAndTransferFlowFile(session, eventWriterConfiguration, transitUri, sequenceId, eventInfo, relationship);
+        }
+        return sequenceId;
+    }
+
     protected void writeJson(CommitTransactionEventInfo event) throws IOException {
         super.writeJson(event);
         if (event.getDatabaseName() != null) {
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/DDLEventWriter.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/DDLEventWriter.java
index 0064c29322..afe084c6ad 100644
--- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/DDLEventWriter.java
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/DDLEventWriter.java
@@ -16,28 +16,43 @@
  */
 package org.apache.nifi.cdc.mysql.event.io;
 
-import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.cdc.event.io.EventWriterConfiguration;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.cdc.mysql.event.DDLEventInfo;
 
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.UncheckedIOException;
+
+
 /**
  * A writer class to output MySQL binlog Data Definition Language (DDL) events to flow file(s).
  */
 public class DDLEventWriter extends AbstractBinlogTableEventWriter<DDLEventInfo> {
 
     @Override
-    public long writeEvent(ProcessSession session, String transitUri, DDLEventInfo eventInfo, long currentSequenceId, Relationship relationship) {
-        FlowFile flowFile = session.create();
-        flowFile = session.write(flowFile, (outputStream) -> {
+    public long writeEvent(ProcessSession session, String transitUri, DDLEventInfo eventInfo, long currentSequenceId, Relationship relationship,
+                           final EventWriterConfiguration eventWriterConfiguration) {
+        configureEventWriter(eventWriterConfiguration, session, eventInfo);
+        OutputStream outputStream = eventWriterConfiguration.getFlowFileOutputStream();
+
+        try {
             super.startJson(outputStream, eventInfo);
             super.writeJson(eventInfo);
             jsonGenerator.writeStringField("query", eventInfo.getQuery());
             super.endJson();
-        });
-        flowFile = session.putAllAttributes(flowFile, getCommonAttributes(currentSequenceId, eventInfo));
-        session.transfer(flowFile, relationship);
-        session.getProvenanceReporter().receive(flowFile, transitUri);
+        } catch (IOException ioe) {
+            throw new UncheckedIOException("Write JSON event failed", ioe);
+        }
+
+        eventWriterConfiguration.incrementNumberOfEventsWritten();
+
+        // Check if it is time to finish the FlowFile
+        if (maxEventsPerFlowFile(eventWriterConfiguration)
+                && eventWriterConfiguration.getNumberOfEventsWritten() == eventWriterConfiguration.getNumberOfEventsPerFlowFile()) {
+            finishAndTransferFlowFile(session, eventWriterConfiguration, transitUri, currentSequenceId, eventInfo, relationship);
+        }
         return currentSequenceId + 1;
     }
 }
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/DeleteRowsWriter.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/DeleteRowsWriter.java
index 7e316d41c5..4828ce8662 100644
--- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/DeleteRowsWriter.java
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/DeleteRowsWriter.java
@@ -16,18 +16,18 @@
  */
 package org.apache.nifi.cdc.mysql.event.io;
 
+import org.apache.nifi.cdc.event.io.EventWriterConfiguration;
 import org.apache.nifi.cdc.mysql.event.MySQLCDCUtils;
-import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.cdc.event.ColumnDefinition;
 import org.apache.nifi.cdc.mysql.event.DeleteRowsEventInfo;
 import org.apache.nifi.processor.Relationship;
 
 import java.io.IOException;
+import java.io.OutputStream;
 import java.io.Serializable;
+import java.io.UncheckedIOException;
 import java.util.BitSet;
-import java.util.concurrent.atomic.AtomicLong;
-
 
 /**
  * A writer class to output MySQL binlog "delete rows" events to flow file(s).
@@ -42,12 +42,13 @@ public class DeleteRowsWriter extends AbstractBinlogTableEventWriter<DeleteRowsE
      * @return The next available CDC sequence ID for use by the CDC processor
      */
     @Override
-    public long writeEvent(final ProcessSession session, String transitUri, final DeleteRowsEventInfo eventInfo, final long currentSequenceId, Relationship relationship) {
-        final AtomicLong seqId = new AtomicLong(currentSequenceId);
+    public long writeEvent(final ProcessSession session, String transitUri, final DeleteRowsEventInfo eventInfo, final long currentSequenceId, Relationship relationship,
+                           final EventWriterConfiguration eventWriterConfiguration) {
+        long seqId = currentSequenceId;
         for (Serializable[] row : eventInfo.getRows()) {
-
-            FlowFile flowFile = session.create();
-            flowFile = session.write(flowFile, outputStream -> {
+            configureEventWriter(eventWriterConfiguration, session, eventInfo);
+            OutputStream outputStream = eventWriterConfiguration.getFlowFileOutputStream();
+            try {
 
                 super.startJson(outputStream, eventInfo);
                 super.writeJson(eventInfo);
@@ -56,14 +57,20 @@ public class DeleteRowsWriter extends AbstractBinlogTableEventWriter<DeleteRowsE
                 writeRow(eventInfo, row, bitSet);
 
                 super.endJson();
-            });
+            } catch (IOException ioe) {
+                throw new UncheckedIOException("Write JSON start array failed", ioe);
+            }
 
-            flowFile = session.putAllAttributes(flowFile, getCommonAttributes(seqId.get(), eventInfo));
-            session.transfer(flowFile, relationship);
-            session.getProvenanceReporter().receive(flowFile, transitUri);
-            seqId.getAndIncrement();
+            eventWriterConfiguration.incrementNumberOfEventsWritten();
+
+            // Check if it is time to finish the FlowFile
+            if (maxEventsPerFlowFile(eventWriterConfiguration)
+                    && eventWriterConfiguration.getNumberOfEventsWritten() == eventWriterConfiguration.getNumberOfEventsPerFlowFile()) {
+                finishAndTransferFlowFile(session, eventWriterConfiguration, transitUri, seqId, eventInfo, relationship);
+            }
+            seqId++;
         }
-        return seqId.get();
+        return seqId;
     }
 
     protected void writeRow(DeleteRowsEventInfo event, Serializable[] row, BitSet includedColumns) throws IOException {
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/InsertRowsWriter.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/InsertRowsWriter.java
index 6cdfbdd0fb..4453889d53 100644
--- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/InsertRowsWriter.java
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/InsertRowsWriter.java
@@ -16,17 +16,18 @@
  */
 package org.apache.nifi.cdc.mysql.event.io;
 
+import org.apache.nifi.cdc.event.io.EventWriterConfiguration;
 import org.apache.nifi.cdc.mysql.event.MySQLCDCUtils;
-import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.cdc.event.ColumnDefinition;
 import org.apache.nifi.cdc.mysql.event.InsertRowsEventInfo;
 import org.apache.nifi.processor.Relationship;
 
 import java.io.IOException;
+import java.io.OutputStream;
 import java.io.Serializable;
+import java.io.UncheckedIOException;
 import java.util.BitSet;
-import java.util.concurrent.atomic.AtomicLong;
 
 
 /**
@@ -42,12 +43,13 @@ public class InsertRowsWriter extends AbstractBinlogTableEventWriter<InsertRowsE
      * @return The next available CDC sequence ID for use by the CDC processor
      */
     @Override
-    public long writeEvent(final ProcessSession session, String transitUri, final InsertRowsEventInfo eventInfo, final long currentSequenceId, Relationship relationship) {
-        final AtomicLong seqId = new AtomicLong(currentSequenceId);
+    public long writeEvent(final ProcessSession session, String transitUri, final InsertRowsEventInfo eventInfo, final long currentSequenceId, Relationship relationship,
+                           final EventWriterConfiguration eventWriterConfiguration) {
+        long seqId = currentSequenceId;
         for (Serializable[] row : eventInfo.getRows()) {
-
-            FlowFile flowFile = session.create();
-            flowFile = session.write(flowFile, outputStream -> {
+            configureEventWriter(eventWriterConfiguration, session, eventInfo);
+            OutputStream outputStream = eventWriterConfiguration.getFlowFileOutputStream();
+            try {
 
                 super.startJson(outputStream, eventInfo);
                 super.writeJson(eventInfo);
@@ -56,14 +58,20 @@ public class InsertRowsWriter extends AbstractBinlogTableEventWriter<InsertRowsE
                 writeRow(eventInfo, row, bitSet);
 
                 super.endJson();
-            });
+            } catch (IOException ioe) {
+                throw new UncheckedIOException("Write JSON start array failed", ioe);
+            }
 
-            flowFile = session.putAllAttributes(flowFile, getCommonAttributes(seqId.get(), eventInfo));
-            session.transfer(flowFile, relationship);
-            session.getProvenanceReporter().receive(flowFile, transitUri);
-            seqId.getAndIncrement();
+            eventWriterConfiguration.incrementNumberOfEventsWritten();
+
+            // Check if it is time to finish the FlowFile
+            if (maxEventsPerFlowFile(eventWriterConfiguration)
+                    && eventWriterConfiguration.getNumberOfEventsWritten() == eventWriterConfiguration.getNumberOfEventsPerFlowFile()) {
+                finishAndTransferFlowFile(session, eventWriterConfiguration, transitUri, seqId, eventInfo, relationship);
+            }
+            seqId++;
         }
-        return seqId.get();
+        return seqId;
     }
 
     protected void writeRow(InsertRowsEventInfo event, Serializable[] row, BitSet includedColumns) throws IOException {
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/UpdateRowsWriter.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/UpdateRowsWriter.java
index a4934fd40c..4a9a2d9785 100644
--- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/UpdateRowsWriter.java
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/UpdateRowsWriter.java
@@ -16,18 +16,19 @@
  */
 package org.apache.nifi.cdc.mysql.event.io;
 
+import org.apache.nifi.cdc.event.io.EventWriterConfiguration;
 import org.apache.nifi.cdc.mysql.event.MySQLCDCUtils;
-import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.cdc.event.ColumnDefinition;
 import org.apache.nifi.cdc.mysql.event.UpdateRowsEventInfo;
 import org.apache.nifi.processor.Relationship;
 
 import java.io.IOException;
+import java.io.OutputStream;
 import java.io.Serializable;
+import java.io.UncheckedIOException;
 import java.util.BitSet;
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
 
 
 /**
@@ -43,13 +44,14 @@ public class UpdateRowsWriter extends AbstractBinlogTableEventWriter<UpdateRowsE
      * @return The next available CDC sequence ID for use by the CDC processor
      */
     @Override
-    public long writeEvent(final ProcessSession session, String transitUri, final UpdateRowsEventInfo eventInfo, final long currentSequenceId, Relationship relationship) {
-        final AtomicLong seqId = new AtomicLong(currentSequenceId);
+    public long writeEvent(final ProcessSession session, String transitUri, final UpdateRowsEventInfo eventInfo, final long currentSequenceId, Relationship relationship,
+                           final EventWriterConfiguration eventWriterConfiguration) {
+        long seqId = currentSequenceId;
         for (Map.Entry<Serializable[], Serializable[]> row : eventInfo.getRows()) {
+            configureEventWriter(eventWriterConfiguration, session, eventInfo);
+            OutputStream outputStream = eventWriterConfiguration.getFlowFileOutputStream();
 
-            FlowFile flowFile = session.create();
-            flowFile = session.write(flowFile, outputStream -> {
-
+            try {
                 super.startJson(outputStream, eventInfo);
                 super.writeJson(eventInfo);
 
@@ -57,14 +59,20 @@ public class UpdateRowsWriter extends AbstractBinlogTableEventWriter<UpdateRowsE
                 writeRow(eventInfo, row, bitSet);
 
                 super.endJson();
-            });
+            } catch (IOException ioe) {
+                throw new UncheckedIOException("Write JSON start array failed", ioe);
+            }
 
-            flowFile = session.putAllAttributes(flowFile, getCommonAttributes(seqId.get(), eventInfo));
-            session.transfer(flowFile, relationship);
-            session.getProvenanceReporter().receive(flowFile, transitUri);
-            seqId.getAndIncrement();
+            eventWriterConfiguration.incrementNumberOfEventsWritten();
+
+            // Check if it is time to finish the FlowFile
+            if (maxEventsPerFlowFile(eventWriterConfiguration)
+                    && eventWriterConfiguration.getNumberOfEventsWritten() == eventWriterConfiguration.getNumberOfEventsPerFlowFile()) {
+                finishAndTransferFlowFile(session, eventWriterConfiguration, transitUri, seqId, eventInfo, relationship);
+            }
+            seqId++;
         }
-        return seqId.get();
+        return seqId;
     }
 
     protected void writeRow(UpdateRowsEventInfo event, Map.Entry<Serializable[], Serializable[]> row, BitSet includedColumns) throws IOException {
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
index 71bea808e1..d2665fa231 100644
--- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
@@ -44,7 +44,6 @@ import org.apache.nifi.cdc.event.ColumnDefinition;
 import org.apache.nifi.cdc.event.RowEventException;
 import org.apache.nifi.cdc.event.TableInfo;
 import org.apache.nifi.cdc.event.TableInfoCacheKey;
-import org.apache.nifi.cdc.event.io.EventWriter;
 import org.apache.nifi.cdc.mysql.event.BeginTransactionEventInfo;
 import org.apache.nifi.cdc.mysql.event.BinlogEventInfo;
 import org.apache.nifi.cdc.mysql.event.BinlogEventListener;
@@ -52,9 +51,12 @@ import org.apache.nifi.cdc.mysql.event.BinlogLifecycleListener;
 import org.apache.nifi.cdc.mysql.event.CommitTransactionEventInfo;
 import org.apache.nifi.cdc.mysql.event.DDLEventInfo;
 import org.apache.nifi.cdc.mysql.event.DeleteRowsEventInfo;
+import org.apache.nifi.cdc.event.io.EventWriterConfiguration;
+import org.apache.nifi.cdc.event.io.FlowFileEventWriteStrategy;
 import org.apache.nifi.cdc.mysql.event.InsertRowsEventInfo;
 import org.apache.nifi.cdc.mysql.event.RawBinlogEvent;
 import org.apache.nifi.cdc.mysql.event.UpdateRowsEventInfo;
+import org.apache.nifi.cdc.mysql.event.io.AbstractBinlogEventWriter;
 import org.apache.nifi.cdc.mysql.event.io.BeginTransactionEventWriter;
 import org.apache.nifi.cdc.mysql.event.io.CommitTransactionEventWriter;
 import org.apache.nifi.cdc.mysql.event.io.DDLEventWriter;
@@ -78,6 +80,7 @@ import org.apache.nifi.distributed.cache.client.Deserializer;
 import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
 import org.apache.nifi.distributed.cache.client.Serializer;
 import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
 import org.apache.nifi.processor.ProcessContext;
@@ -132,6 +135,9 @@ import static com.github.shyiko.mysql.binlog.event.EventType.PRE_GA_WRITE_ROWS;
 import static com.github.shyiko.mysql.binlog.event.EventType.ROTATE;
 import static com.github.shyiko.mysql.binlog.event.EventType.WRITE_ROWS;
 import static com.github.shyiko.mysql.binlog.event.EventType.XID;
+import static org.apache.nifi.cdc.event.io.EventWriter.CDC_EVENT_TYPE_ATTRIBUTE;
+import static org.apache.nifi.cdc.event.io.EventWriter.SEQUENCE_ID_KEY;
+import static org.apache.nifi.cdc.event.io.FlowFileEventWriteStrategy.MAX_EVENTS_PER_FLOWFILE;
 
 
 /**
@@ -140,15 +146,17 @@ import static com.github.shyiko.mysql.binlog.event.EventType.XID;
 @TriggerSerially
 @PrimaryNodeOnly
 @InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
-@Tags({"sql", "jdbc", "cdc", "mysql"})
+@Tags({"sql", "jdbc", "cdc", "mysql", "transaction", "event"})
 @CapabilityDescription("Retrieves Change Data Capture (CDC) events from a MySQL database. CDC Events include INSERT, UPDATE, DELETE operations. Events "
-        + "are output as individual flow files ordered by the time at which the operation occurred.")
+        + "are output as either a group of a specified number of events (the default is 1 so each event becomes its own flow file) or grouped as a full transaction (BEGIN to COMMIT). All events "
+        + "are ordered by the time at which the operation occurred. NOTE: If the processor is stopped before the specified number of events have been written to a flow file, "
+        + "the partial flow file will be output in order to maintain the consistency of the event stream.")
 @Stateful(scopes = Scope.CLUSTER, description = "Information such as a 'pointer' to the current CDC event in the database is stored by this processor, such "
         + "that it can continue from the same location if restarted.")
 @WritesAttributes({
-        @WritesAttribute(attribute = EventWriter.SEQUENCE_ID_KEY, description = "A sequence identifier (i.e. strictly increasing integer value) specifying the order "
+        @WritesAttribute(attribute = SEQUENCE_ID_KEY, description = "A sequence identifier (i.e. strictly increasing integer value) specifying the order "
                 + "of the CDC event flow file relative to the other event flow file(s)."),
-        @WritesAttribute(attribute = EventWriter.CDC_EVENT_TYPE_ATTRIBUTE, description = "A string indicating the type of CDC event that occurred, including (but not limited to) "
+        @WritesAttribute(attribute = CDC_EVENT_TYPE_ATTRIBUTE, description = "A string indicating the type of CDC event that occurred, including (but not limited to) "
                 + "'begin', 'insert', 'update', 'delete', 'ddl' and 'commit'."),
         @WritesAttribute(attribute = "mime.type", description = "The processor outputs flow file content in JSON format, and sets the mime.type attribute to "
                 + "application/json")
@@ -185,6 +193,7 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
             SSLMode.VERIFY_IDENTITY.toString(),
             "Connect with TLS or fail when server support not enabled. Verify server hostname matches presented X.509 certificate names or fail when not matched");
 
+
     // Properties
     public static final PropertyDescriptor DATABASE_NAME_PATTERN = new PropertyDescriptor.Builder()
             .name("capture-change-mysql-db-name-pattern")
@@ -269,6 +278,31 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
             .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .build();
 
+    public static final PropertyDescriptor EVENTS_PER_FLOWFILE_STRATEGY = new PropertyDescriptor.Builder()
+            .name("events-per-flowfile-strategy")
+            .displayName("Event Processing Strategy")
+            .description("Specifies the strategy to use when writing events to FlowFile(s), such as '" + MAX_EVENTS_PER_FLOWFILE.getDisplayName() + "'")
+            .required(true)
+            .sensitive(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .allowableValues(FlowFileEventWriteStrategy.class)
+            .defaultValue(MAX_EVENTS_PER_FLOWFILE.getValue())
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .build();
+
+    public static final PropertyDescriptor NUMBER_OF_EVENTS_PER_FLOWFILE = new PropertyDescriptor.Builder()
+            .name("number-of-events-per-flowfile")
+            .displayName("Events Per FlowFile")
+            .description("Specifies how many events should be written to a single FlowFile. If the processor is stopped before the specified number of events has been written,"
+                    + "the events will still be written as a FlowFile before stopping.")
+            .required(true)
+            .sensitive(false)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .defaultValue("1")
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .dependsOn(EVENTS_PER_FLOWFILE_STRATEGY, MAX_EVENTS_PER_FLOWFILE.getValue())
+            .build();
+
     public static final PropertyDescriptor SERVER_ID = new PropertyDescriptor.Builder()
             .name("capture-change-mysql-server-id")
             .displayName("Server ID")
@@ -479,6 +513,10 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
     private final DeleteRowsWriter deleteRowsWriter = new DeleteRowsWriter();
     private final UpdateRowsWriter updateRowsWriter = new UpdateRowsWriter();
 
+    private volatile EventWriterConfiguration eventWriterConfiguration;
+    private volatile BinlogEventInfo currentEventInfo;
+    private AbstractBinlogEventWriter<? extends BinlogEventInfo> currentEventWriter;
+
     static {
 
         final Set<Relationship> r = new HashSet<>();
@@ -491,6 +529,8 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
         pds.add(DRIVER_LOCATION);
         pds.add(USERNAME);
         pds.add(PASSWORD);
+        pds.add(EVENTS_PER_FLOWFILE_STRATEGY);
+        pds.add(NUMBER_OF_EVENTS_PER_FLOWFILE);
         pds.add(SERVER_ID);
         pds.add(DATABASE_NAME_PATTERN);
         pds.add(TABLE_NAME_PATTERN);
@@ -569,6 +609,13 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
             return;
         }
 
+        // Build an event writer configuration object for the event writers to use
+        final FlowFileEventWriteStrategy flowFileEventWriteStrategy = FlowFileEventWriteStrategy.valueOf(context.getProperty(EVENTS_PER_FLOWFILE_STRATEGY).getValue());
+        eventWriterConfiguration = new EventWriterConfiguration(
+                flowFileEventWriteStrategy,
+                context.getProperty(NUMBER_OF_EVENTS_PER_FLOWFILE).evaluateAttributeExpressions().asInteger()
+        );
+
         PropertyValue dbNameValue = context.getProperty(DATABASE_NAME_PATTERN);
         databaseNamePattern = dbNameValue.isSet() ? Pattern.compile(dbNameValue.getValue()) : null;
 
@@ -624,7 +671,7 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
         }
 
         // Get current sequence ID from state
-        String seqIdString = stateMap.get(EventWriter.SEQUENCE_ID_KEY);
+        String seqIdString = stateMap.get(SEQUENCE_ID_KEY);
         if (StringUtils.isEmpty(seqIdString)) {
             // Use Initial Sequence ID property if none is found in state
             PropertyValue seqIdProp = context.getProperty(INIT_SEQUENCE_ID);
@@ -888,7 +935,7 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
             if (eventType != ROTATE && eventType != FORMAT_DESCRIPTION && !useGtid) {
                 currentBinlogPosition = header.getPosition();
             }
-            log.debug("Got message event type: {} ", new Object[]{header.getEventType().toString()});
+            log.debug("Got message event type: {} ", header.getEventType().toString());
             switch (eventType) {
                 case TABLE_MAP:
                     // This is sent to inform which table is about to be changed by subsequent events
@@ -952,7 +999,10 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
                             BeginTransactionEventInfo beginEvent = useGtid
                                     ? new BeginTransactionEventInfo(currentDatabase, timestamp, currentGtidSet)
                                     : new BeginTransactionEventInfo(currentDatabase, timestamp, currentBinlogFile, currentBinlogPosition);
-                            currentSequenceId.set(beginEventWriter.writeEvent(currentSession, transitUri, beginEvent, currentSequenceId.get(), REL_SUCCESS));
+                            currentEventInfo = beginEvent;
+                            currentEventWriter = beginEventWriter;
+                            currentSequenceId.set(beginEventWriter.writeEvent(currentSession, transitUri, beginEvent, currentSequenceId.get(), REL_SUCCESS, eventWriterConfiguration));
+
                         }
                         inTransaction = true;
                         //update inTransaction value to state
@@ -963,17 +1013,35 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
                                     + "This could indicate that your binlog position is invalid.");
                         }
                         // InnoDB generates XID events for "commit", but MyISAM generates Query events with "COMMIT", so handle that here
-                        if (includeBeginCommit && (databaseNamePattern == null || databaseNamePattern.matcher(currentDatabase).matches())) {
-                            CommitTransactionEventInfo commitTransactionEvent = useGtid
-                                    ? new CommitTransactionEventInfo(currentDatabase, timestamp, currentGtidSet)
-                                    : new CommitTransactionEventInfo(currentDatabase, timestamp, currentBinlogFile, currentBinlogPosition);
-                            currentSequenceId.set(commitEventWriter.writeEvent(currentSession, transitUri, commitTransactionEvent, currentSequenceId.get(), REL_SUCCESS));
+                        if (includeBeginCommit) {
+                            if (databaseNamePattern == null || databaseNamePattern.matcher(currentDatabase).matches()) {
+                                CommitTransactionEventInfo commitTransactionEvent = useGtid
+                                        ? new CommitTransactionEventInfo(currentDatabase, timestamp, currentGtidSet)
+                                        : new CommitTransactionEventInfo(currentDatabase, timestamp, currentBinlogFile, currentBinlogPosition);
+                                currentEventInfo = commitTransactionEvent;
+                                currentEventWriter = commitEventWriter;
+                                currentSequenceId.set(commitEventWriter.writeEvent(currentSession, transitUri, commitTransactionEvent, currentSequenceId.get(), REL_SUCCESS, eventWriterConfiguration));
+                            }
+                        } else {
+                            // If the COMMIT event is not to be written, the FlowFile should still be finished and the session committed.
+                            if (currentSession != null) {
+                                FlowFile flowFile = eventWriterConfiguration.getCurrentFlowFile();
+                                if (flowFile != null && currentEventWriter != null) {
+                                    // The transaction has ended: finish and transfer the FlowFile that is still open
+                                    currentEventWriter.finishAndTransferFlowFile(currentSession, eventWriterConfiguration, transitUri, currentSequenceId.get(), currentEventInfo, REL_SUCCESS);
+                                }
+                                currentSession.commitAsync();
+                            }
                         }
+
                         //update inTransaction value to state
                         inTransaction = false;
                         updateState(session);
-                        // Commit the NiFi session
-                        session.commitAsync();
+                        // If there is no FlowFile open, commit the session
+                        if (eventWriterConfiguration.getCurrentFlowFile() == null) {
+                            // Commit the NiFi session
+                            session.commitAsync();
+                        }
                         currentTable = null;
                     } else {
                         // Check for DDL events (alter table, e.g.). Normalize the query to do string matching on the type of change
@@ -993,7 +1061,9 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
                                 DDLEventInfo ddlEvent = useGtid
                                         ? new DDLEventInfo(ddlTableInfo, timestamp, currentGtidSet, sql)
                                         : new DDLEventInfo(ddlTableInfo, timestamp, currentBinlogFile, currentBinlogPosition, sql);
-                                currentSequenceId.set(ddlEventWriter.writeEvent(currentSession, transitUri, ddlEvent, currentSequenceId.get(), REL_SUCCESS));
+                                currentEventInfo = ddlEvent;
+                                currentEventWriter = ddlEventWriter;
+                                currentSequenceId.set(ddlEventWriter.writeEvent(currentSession, transitUri, ddlEvent, currentSequenceId.get(), REL_SUCCESS, eventWriterConfiguration));
                             }
                             // Remove all the keys from the cache that this processor added
                             if (cacheClient != null) {
@@ -1002,7 +1072,19 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
                             // If not in a transaction, commit the session so the DDL event(s) will be transferred
                             if (includeDDLEvents && !inTransaction) {
                                 updateState(session);
-                                session.commitAsync();
+                                if (FlowFileEventWriteStrategy.ONE_TRANSACTION_PER_FLOWFILE.equals(eventWriterConfiguration.getFlowFileEventWriteStrategy())) {
+                                    if (currentSession != null) {
+                                        FlowFile flowFile = eventWriterConfiguration.getCurrentFlowFile();
+                                        if (flowFile != null && currentEventWriter != null) {
+                                            // Not in a transaction: finish and transfer the open FlowFile so the DDL event(s) are sent
+                                            currentEventWriter.finishAndTransferFlowFile(currentSession, eventWriterConfiguration, transitUri, currentSequenceId.get(), currentEventInfo, REL_SUCCESS);
+                                        }
+                                    }
+                                }
+                                // If there is no FlowFile open, commit the session
+                                if (eventWriterConfiguration.getCurrentFlowFile() == null) {
+                                    session.commitAsync();
+                                }
                             }
                         }
                     }
@@ -1013,19 +1095,35 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
                         throw new IOException("COMMIT event received while not processing a transaction (i.e. no corresponding BEGIN event). "
                                 + "This could indicate that your binlog position is invalid.");
                     }
-                    if (includeBeginCommit && (databaseNamePattern == null || databaseNamePattern.matcher(currentDatabase).matches())) {
-                        CommitTransactionEventInfo commitTransactionEvent = useGtid
-                                ? new CommitTransactionEventInfo(currentDatabase, timestamp, currentGtidSet)
-                                : new CommitTransactionEventInfo(currentDatabase, timestamp, currentBinlogFile, currentBinlogPosition);
-                        currentSequenceId.set(commitEventWriter.writeEvent(currentSession, transitUri, commitTransactionEvent, currentSequenceId.get(), REL_SUCCESS));
+                    if (includeBeginCommit) {
+                        if (databaseNamePattern == null || databaseNamePattern.matcher(currentDatabase).matches()) {
+                            CommitTransactionEventInfo commitTransactionEvent = useGtid
+                                    ? new CommitTransactionEventInfo(currentDatabase, timestamp, currentGtidSet)
+                                    : new CommitTransactionEventInfo(currentDatabase, timestamp, currentBinlogFile, currentBinlogPosition);
+                            currentEventInfo = commitTransactionEvent;
+                            currentEventWriter = commitEventWriter;
+                            currentSequenceId.set(commitEventWriter.writeEvent(currentSession, transitUri, commitTransactionEvent, currentSequenceId.get(), REL_SUCCESS, eventWriterConfiguration));
+                        }
+                    } else {
+                        // If the COMMIT event is not to be written, the FlowFile should still be finished and the session committed.
+                        if (currentSession != null) {
+                            FlowFile flowFile = eventWriterConfiguration.getCurrentFlowFile();
+                            if (flowFile != null && currentEventWriter != null) {
+                                // The transaction has ended: finish and transfer the FlowFile that is still open
+                                currentEventWriter.finishAndTransferFlowFile(currentSession, eventWriterConfiguration, transitUri, currentSequenceId.get(), currentEventInfo, REL_SUCCESS);
+                            }
+                            currentSession.commitAsync();
+                        }
                     }
-                    // Commit the NiFi session
                     // update inTransaction value and save next position
                     // so when restart this processor,we will not read xid event again
                     inTransaction = false;
                     currentBinlogPosition = header.getNextPosition();
                     updateState(session);
-                    session.commitAsync();
+                    // If there is no FlowFile open, commit the session
+                    if (eventWriterConfiguration.getCurrentFlowFile() == null) {
+                        session.commitAsync();
+                    }
                     currentTable = null;
                     currentDatabase = null;
                     break;
@@ -1060,7 +1158,9 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
                         InsertRowsEventInfo eventInfo = useGtid
                                 ? new InsertRowsEventInfo(currentTable, timestamp, currentGtidSet, event.getData())
                                 : new InsertRowsEventInfo(currentTable, timestamp, currentBinlogFile, currentBinlogPosition, event.getData());
-                        currentSequenceId.set(insertRowsWriter.writeEvent(currentSession, transitUri, eventInfo, currentSequenceId.get(), REL_SUCCESS));
+                        currentEventInfo = eventInfo;
+                        currentEventWriter = insertRowsWriter;
+                        currentSequenceId.set(insertRowsWriter.writeEvent(currentSession, transitUri, eventInfo, currentSequenceId.get(), REL_SUCCESS, eventWriterConfiguration));
 
                     } else if (eventType == DELETE_ROWS
                             || eventType == EXT_DELETE_ROWS
@@ -1069,14 +1169,18 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
                         DeleteRowsEventInfo eventInfo = useGtid
                                 ? new DeleteRowsEventInfo(currentTable, timestamp, currentGtidSet, event.getData())
                                 : new DeleteRowsEventInfo(currentTable, timestamp, currentBinlogFile, currentBinlogPosition, event.getData());
-                        currentSequenceId.set(deleteRowsWriter.writeEvent(currentSession, transitUri, eventInfo, currentSequenceId.get(), REL_SUCCESS));
+                        currentEventInfo = eventInfo;
+                        currentEventWriter = deleteRowsWriter;
+                        currentSequenceId.set(deleteRowsWriter.writeEvent(currentSession, transitUri, eventInfo, currentSequenceId.get(), REL_SUCCESS, eventWriterConfiguration));
 
                     } else {
                         // Update event
                         UpdateRowsEventInfo eventInfo = useGtid
                                 ? new UpdateRowsEventInfo(currentTable, timestamp, currentGtidSet, event.getData())
                                 : new UpdateRowsEventInfo(currentTable, timestamp, currentBinlogFile, currentBinlogPosition, event.getData());
-                        currentSequenceId.set(updateRowsWriter.writeEvent(currentSession, transitUri, eventInfo, currentSequenceId.get(), REL_SUCCESS));
+                        currentEventInfo = eventInfo;
+                        currentEventWriter = updateRowsWriter;
+                        currentSequenceId.set(updateRowsWriter.writeEvent(currentSession, transitUri, eventInfo, currentSequenceId.get(), REL_SUCCESS, eventWriterConfiguration));
                     }
                     break;
 
@@ -1144,6 +1248,11 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
             }
 
             if (currentSession != null) {
+                FlowFile flowFile = eventWriterConfiguration.getCurrentFlowFile();
+                if (flowFile != null && currentEventWriter != null) {
+                    // Flush the events to the FlowFile when the processor is stopped
+                    currentEventWriter.finishAndTransferFlowFile(currentSession, eventWriterConfiguration, transitUri, currentSequenceId.get(), currentEventInfo, REL_SUCCESS);
+                }
                 currentSession.commitAsync();
             }
 
@@ -1174,7 +1283,7 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
         }
 
         newStateMap.put(BinlogEventInfo.BINLOG_POSITION_KEY, Long.toString(binlogPosition));
-        newStateMap.put(EventWriter.SEQUENCE_ID_KEY, String.valueOf(sequenceId));
+        newStateMap.put(SEQUENCE_ID_KEY, String.valueOf(sequenceId));
         //add inTransaction value into state
         newStateMap.put("inTransaction", inTransaction ? "true" : "false");
 
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy
index 66828c3570..f67567bfa5 100644
--- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy
@@ -35,6 +35,7 @@ import org.apache.nifi.cdc.event.ColumnDefinition
 import org.apache.nifi.cdc.event.TableInfo
 import org.apache.nifi.cdc.event.TableInfoCacheKey
 import org.apache.nifi.cdc.event.io.EventWriter
+import org.apache.nifi.cdc.event.io.FlowFileEventWriteStrategy
 import org.apache.nifi.cdc.mysql.MockBinlogClient
 import org.apache.nifi.cdc.mysql.event.BinlogEventInfo
 import org.apache.nifi.cdc.mysql.processors.ssl.BinaryLogSSLSocketFactory
@@ -734,6 +735,238 @@ class CaptureChangeMySQLTest {
         assertEquals(5, resultFiles.size())
     }
 
+    @Test
+    void testSkipTableMultipleEventsPerFlowFile() throws Exception {
+        testRunner.setProperty(CaptureChangeMySQL.DRIVER_LOCATION, DRIVER_LOCATION)
+        testRunner.setProperty(CaptureChangeMySQL.HOSTS, 'localhost:3306')
+        testRunner.setProperty(CaptureChangeMySQL.USERNAME, 'root')
+        testRunner.setProperty(CaptureChangeMySQL.PASSWORD, 'password')
+        testRunner.setProperty(CaptureChangeMySQL.CONNECT_TIMEOUT, '2 seconds')
+        testRunner.setProperty(CaptureChangeMySQL.DATABASE_NAME_PATTERN, "myDB")
+        testRunner.setProperty(CaptureChangeMySQL.TABLE_NAME_PATTERN, "user")
+        testRunner.setProperty(CaptureChangeMySQL.INCLUDE_BEGIN_COMMIT, 'true')
+        testRunner.setProperty(CaptureChangeMySQL.NUMBER_OF_EVENTS_PER_FLOWFILE, '2')
+
+        testRunner.run(1, false, true)
+
+        // ROTATE
+        client.sendEvent(new Event(
+                [timestamp: new Date().time, eventType: EventType.ROTATE, nextPosition: 2] as EventHeaderV4,
+                [binlogFilename: 'master.000001', binlogPosition: 4L] as RotateEventData
+        ))
+
+        // BEGIN
+        client.sendEvent(new Event(
+                [timestamp: new Date().time, eventType: EventType.QUERY, nextPosition: 4] as EventHeaderV4,
+                [database: 'myDB', sql: 'BEGIN'] as QueryEventData
+        ))
+
+        // TABLE MAP for table not matching the regex (note the s on the end of users vs the regex of 'user')
+        client.sendEvent(new Event(
+                [timestamp: new Date().time, eventType: EventType.TABLE_MAP, nextPosition: 6] as EventHeaderV4,
+                [tableId: 1, database: 'myDB', table: 'users', columnTypes: [4, -4] as byte[]] as TableMapEventData
+        ))
+
+        // This WRITE ROWS should be skipped
+        def cols = new BitSet()
+        cols.set(1)
+        client.sendEvent(new Event(
+                [timestamp: new Date().time, eventType: EventType.EXT_WRITE_ROWS, nextPosition: 8] as EventHeaderV4,
+                [tableId: 1, includedColumns: cols,
+                 rows   : [[2, 'Smith'] as Serializable[]] as List<Serializable[]>] as WriteRowsEventData
+        ))
+
+        // TABLE MAP for table matching, all modification events (1) should be emitted
+        client.sendEvent(new Event(
+                [timestamp: new Date().time, eventType: EventType.TABLE_MAP, nextPosition: 10] as EventHeaderV4,
+                [tableId: 1, database: 'myDB', table: 'user', columnTypes: [4, -4] as byte[]] as TableMapEventData
+        ))
+
+        // WRITE ROWS for matching table
+        cols = new BitSet()
+        cols.set(1)
+        client.sendEvent(new Event(
+                [timestamp: new Date().time, eventType: EventType.EXT_WRITE_ROWS, nextPosition: 12] as EventHeaderV4,
+                [tableId: 1, includedColumns: cols,
+                 rows   : [[10, 'Cruz'] as Serializable[]] as List<Serializable[]>] as WriteRowsEventData
+        ))
+
+        // COMMIT
+        client.sendEvent(new Event(
+                [timestamp: new Date().time, eventType: EventType.XID, nextPosition: 14] as EventHeaderV4,
+                {} as EventData
+        ))
+
+        ////////////////////////
+        // Test table filter again (database matches, table does not)
+        ////////////////////////
+
+        // BEGIN
+        client.sendEvent(new Event(
+                [timestamp: new Date().time, eventType: EventType.QUERY, nextPosition: 4] as EventHeaderV4,
+                [database: 'myDB', sql: 'BEGIN'] as QueryEventData
+        ))
+
+        // TABLE MAP for a table not matching the regex (the database 'myDB' does match)
+        client.sendEvent(new Event(
+                [timestamp: new Date().time, eventType: EventType.TABLE_MAP, nextPosition: 6] as EventHeaderV4,
+                [tableId: 1, database: 'myDB', table: 'notMyDB', columnTypes: [4, -4] as byte[]] as TableMapEventData
+        ))
+
+        // This WRITE ROWS should be skipped
+        cols = new BitSet()
+        cols.set(1)
+        client.sendEvent(new Event(
+                [timestamp: new Date().time, eventType: EventType.EXT_WRITE_ROWS, nextPosition: 8] as EventHeaderV4,
+                [tableId: 1, includedColumns: cols,
+                 rows   : [[2, 'Smith'] as Serializable[]] as List<Serializable[]>] as WriteRowsEventData
+        ))
+
+        // COMMIT
+        client.sendEvent(new Event(
+                [timestamp: new Date().time, eventType: EventType.XID, nextPosition: 14] as EventHeaderV4,
+                {} as EventData
+        ))
+
+        testRunner.run(1, true, false)
+
+        def resultFiles = testRunner.getFlowFilesForRelationship(CaptureChangeMySQL.REL_SUCCESS)
+        // Five events total, 2 max per flow file, so 3 flow files
+        assertEquals(3, resultFiles.size())
+        def json = new JsonSlurper().parseText(new String(resultFiles[0].toByteArray()))
+        assertTrue (json instanceof ArrayList)
+        assertEquals(2, json.size())
+        // BEGIN, INSERT (verifies that the INSERT for the non-matching table was skipped); the COMMIT rolls over into the next flow file
+        assertEquals('begin', json[0]?.type)
+        assertEquals('insert', json[1]?.type)
+
+        json = new JsonSlurper().parseText(new String(resultFiles[1].toByteArray()))
+        assertTrue (json instanceof ArrayList)
+        assertEquals(2, json.size())
+        assertEquals('commit', json[0]?.type)
+        assertEquals('begin', json[1]?.type)
+
+        json = new JsonSlurper().parseText(new String(resultFiles[2].toByteArray()))
+        assertTrue (json instanceof ArrayList)
+        // One event left
+        assertEquals(1, json.size())
+        assertEquals('commit', json[0]?.type)
+    }
+
+    @Test
+    void testSkipTableOneTransactionPerFlowFile() throws Exception {
+        testRunner.setProperty(CaptureChangeMySQL.DRIVER_LOCATION, DRIVER_LOCATION)
+        testRunner.setProperty(CaptureChangeMySQL.HOSTS, 'localhost:3306')
+        testRunner.setProperty(CaptureChangeMySQL.USERNAME, 'root')
+        testRunner.setProperty(CaptureChangeMySQL.PASSWORD, 'password')
+        testRunner.setProperty(CaptureChangeMySQL.CONNECT_TIMEOUT, '2 seconds')
+        testRunner.setProperty(CaptureChangeMySQL.DATABASE_NAME_PATTERN, "myDB")
+        testRunner.setProperty(CaptureChangeMySQL.TABLE_NAME_PATTERN, "user")
+        testRunner.setProperty(CaptureChangeMySQL.INCLUDE_BEGIN_COMMIT, 'true')
+        testRunner.setProperty(CaptureChangeMySQL.EVENTS_PER_FLOWFILE_STRATEGY, FlowFileEventWriteStrategy.ONE_TRANSACTION_PER_FLOWFILE.name())
+
+        testRunner.run(1, false, true)
+
+        // ROTATE
+        client.sendEvent(new Event(
+                [timestamp: new Date().time, eventType: EventType.ROTATE, nextPosition: 2] as EventHeaderV4,
+                [binlogFilename: 'master.000001', binlogPosition: 4L] as RotateEventData
+        ))
+
+        // BEGIN
+        client.sendEvent(new Event(
+                [timestamp: new Date().time, eventType: EventType.QUERY, nextPosition: 4] as EventHeaderV4,
+                [database: 'myDB', sql: 'BEGIN'] as QueryEventData
+        ))
+
+        // TABLE MAP for table not matching the regex (note the s on the end of users vs the regex of 'user')
+        client.sendEvent(new Event(
+                [timestamp: new Date().time, eventType: EventType.TABLE_MAP, nextPosition: 6] as EventHeaderV4,
+                [tableId: 1, database: 'myDB', table: 'users', columnTypes: [4, -4] as byte[]] as TableMapEventData
+        ))
+
+        // This WRITE ROWS should be skipped
+        def cols = new BitSet()
+        cols.set(1)
+        client.sendEvent(new Event(
+                [timestamp: new Date().time, eventType: EventType.EXT_WRITE_ROWS, nextPosition: 8] as EventHeaderV4,
+                [tableId: 1, includedColumns: cols,
+                 rows   : [[2, 'Smith'] as Serializable[]] as List<Serializable[]>] as WriteRowsEventData
+        ))
+
+        // TABLE MAP for table matching, all modification events (1) should be emitted
+        client.sendEvent(new Event(
+                [timestamp: new Date().time, eventType: EventType.TABLE_MAP, nextPosition: 10] as EventHeaderV4,
+                [tableId: 1, database: 'myDB', table: 'user', columnTypes: [4, -4] as byte[]] as TableMapEventData
+        ))
+
+        // WRITE ROWS for matching table
+        cols = new BitSet()
+        cols.set(1)
+        client.sendEvent(new Event(
+                [timestamp: new Date().time, eventType: EventType.EXT_WRITE_ROWS, nextPosition: 12] as EventHeaderV4,
+                [tableId: 1, includedColumns: cols,
+                 rows   : [[10, 'Cruz'] as Serializable[]] as List<Serializable[]>] as WriteRowsEventData
+        ))
+
+        // COMMIT
+        client.sendEvent(new Event(
+                [timestamp: new Date().time, eventType: EventType.XID, nextPosition: 14] as EventHeaderV4,
+                {} as EventData
+        ))
+
+        ////////////////////////
+        // Test table filter again (database matches, table does not)
+        ////////////////////////
+
+        // BEGIN
+        client.sendEvent(new Event(
+                [timestamp: new Date().time, eventType: EventType.QUERY, nextPosition: 4] as EventHeaderV4,
+                [database: 'myDB', sql: 'BEGIN'] as QueryEventData
+        ))
+
+        // TABLE MAP for a table not matching the regex (the database 'myDB' does match)
+        client.sendEvent(new Event(
+                [timestamp: new Date().time, eventType: EventType.TABLE_MAP, nextPosition: 6] as EventHeaderV4,
+                [tableId: 1, database: 'myDB', table: 'notMyDB', columnTypes: [4, -4] as byte[]] as TableMapEventData
+        ))
+
+        // This WRITE ROWS should be skipped
+        cols = new BitSet()
+        cols.set(1)
+        client.sendEvent(new Event(
+                [timestamp: new Date().time, eventType: EventType.EXT_WRITE_ROWS, nextPosition: 8] as EventHeaderV4,
+                [tableId: 1, includedColumns: cols,
+                 rows   : [[2, 'Smith'] as Serializable[]] as List<Serializable[]>] as WriteRowsEventData
+        ))
+
+        // COMMIT
+        client.sendEvent(new Event(
+                [timestamp: new Date().time, eventType: EventType.XID, nextPosition: 14] as EventHeaderV4,
+                {} as EventData
+        ))
+
+        testRunner.run(1, true, false)
+
+        def resultFiles = testRunner.getFlowFilesForRelationship(CaptureChangeMySQL.REL_SUCCESS)
+        // Five events total across two transactions, one transaction per flow file, so 2 flow files
+        assertEquals(2, resultFiles.size())
+        def json = new JsonSlurper().parseText(new String(resultFiles[0].toByteArray()))
+        assertTrue (json instanceof ArrayList)
+        assertEquals(3, json.size())
+        // BEGIN, INSERT, COMMIT (verifies that one of the INSERTs was skipped)
+        assertEquals('begin', json[0]?.type)
+        assertEquals('insert', json[1]?.type)
+        assertEquals('commit', json[2]?.type)
+
+        json = new JsonSlurper().parseText(new String(resultFiles[1].toByteArray()))
+        assertTrue (json instanceof ArrayList)
+        // Only two events left
+        assertEquals(2, json.size())
+        assertEquals('begin', json[0]?.type)
+        assertEquals('commit', json[1]?.type)
+    }
+
     @Test
     void testFilterDatabase() throws Exception {
         testRunner.setProperty(CaptureChangeMySQL.DRIVER_LOCATION, DRIVER_LOCATION)