You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by tp...@apache.org on 2023/02/22 18:28:15 UTC
[nifi] branch main updated: NIFI-11094: Allow CaptureChangeMySQL to send multiple events per FlowFile
This is an automated email from the ASF dual-hosted git repository.
tpalfy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new a2b98e0c9c NIFI-11094: Allow CaptureChangeMySQL to send multiple events per FlowFile
a2b98e0c9c is described below
commit a2b98e0c9c17ba7603dcc666bfae543abeb1ea7a
Author: Matthew Burgess <ma...@apache.org>
AuthorDate: Mon Jan 30 16:59:53 2023 -0500
NIFI-11094: Allow CaptureChangeMySQL to send multiple events per FlowFile
This closes #6907.
Co-authored-by: Tamas Palfy <tp...@apache.org>
Signed-off-by: Tamas Palfy <tp...@apache.org>
---
.../nifi/cdc/event/io/AbstractEventWriter.java | 10 +-
.../org/apache/nifi/cdc/event/io/EventWriter.java | 16 +-
.../cdc/event/io/EventWriterConfiguration.java | 82 ++++++++
.../cdc/event/io/FlowFileEventWriteStrategy.java | 54 +++++
.../mysql/event/io/AbstractBinlogEventWriter.java | 128 +++++++++--
.../event/io/AbstractBinlogTableEventWriter.java | 19 --
.../event/io/CommitTransactionEventWriter.java | 15 ++
.../nifi/cdc/mysql/event/io/DDLEventWriter.java | 31 ++-
.../nifi/cdc/mysql/event/io/DeleteRowsWriter.java | 35 ++--
.../nifi/cdc/mysql/event/io/InsertRowsWriter.java | 34 +--
.../nifi/cdc/mysql/event/io/UpdateRowsWriter.java | 34 +--
.../cdc/mysql/processors/CaptureChangeMySQL.java | 165 ++++++++++++---
.../mysql/processors/CaptureChangeMySQLTest.groovy | 233 +++++++++++++++++++++
13 files changed, 731 insertions(+), 125 deletions(-)
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/io/AbstractEventWriter.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/io/AbstractEventWriter.java
index c805d3cf7f..e18a3db5d1 100644
--- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/io/AbstractEventWriter.java
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/io/AbstractEventWriter.java
@@ -33,7 +33,9 @@ public abstract class AbstractEventWriter<T extends EventInfo> implements EventW
// Common method to create a JSON generator and start the root object. Should be called by sub-classes unless they need their own generator and such.
protected void startJson(OutputStream outputStream, T event) throws IOException {
- jsonGenerator = createJsonGenerator(outputStream);
+ if (jsonGenerator == null) {
+ jsonGenerator = createJsonGenerator(outputStream);
+ }
jsonGenerator.writeStartObject();
String eventType = event.getEventType();
if (eventType == null) {
@@ -54,11 +56,15 @@ public abstract class AbstractEventWriter<T extends EventInfo> implements EventW
throw new IOException("endJson called without a JsonGenerator");
}
jsonGenerator.writeEndObject();
+ }
+
+ protected void endFile() throws IOException {
jsonGenerator.flush();
jsonGenerator.close();
+ jsonGenerator = null;
}
- private JsonGenerator createJsonGenerator(OutputStream out) throws IOException {
+ protected JsonGenerator createJsonGenerator(OutputStream out) throws IOException {
return JSON_FACTORY.createGenerator(out);
}
}
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/io/EventWriter.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/io/EventWriter.java
index 096e3c1e69..e74a56ee99 100644
--- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/io/EventWriter.java
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/io/EventWriter.java
@@ -32,12 +32,14 @@ public interface EventWriter<T extends EventInfo> {
/**
* Writes the given event to the process session, possibly via transferring it to the specified relationship (usually used for success)
*
- * @param session The session to write the event to
- * @param transitUri The URI indicating the source MySQL system from which the specified event is associated
- * @param eventInfo The event data
- * @param currentSequenceId the current sequence ID
- * @param relationship A relationship to transfer any flowfile(s) to
- * @return a sequence ID, usually incremented from the specified current sequence id by the number of flow files transferred and/or committed
+ * @param session The session to write the event to
+ * @param transitUri The URI indicating the source MySQL system from which the specified event is associated
+ * @param eventInfo The event data
+ * @param currentSequenceId The current sequence ID
+ * @param relationship A relationship to transfer any flowfile(s) to
+ * @param eventWriterConfiguration A configuration object used for FlowFile management (e.g., how many events to write to each FlowFile)
+ * @return a sequence ID, usually incremented from the specified current sequence ID by the number of FlowFiles transferred and/or committed
*/
- long writeEvent(final ProcessSession session, String transitUri, final T eventInfo, final long currentSequenceId, Relationship relationship);
+ long writeEvent(final ProcessSession session, String transitUri, final T eventInfo, final long currentSequenceId, Relationship relationship,
+ final EventWriterConfiguration eventWriterConfiguration);
}
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/io/EventWriterConfiguration.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/io/EventWriterConfiguration.java
new file mode 100644
index 0000000000..153ce8319c
--- /dev/null
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/io/EventWriterConfiguration.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cdc.event.io;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.nifi.flowfile.FlowFile;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+public class EventWriterConfiguration {
+
+ private final FlowFileEventWriteStrategy flowFileEventWriteStrategy;
+ private final int numberOfEventsPerFlowFile;
+
+ private int numberOfEventsWritten;
+
+ private FlowFile currentFlowFile;
+ private OutputStream flowFileOutputStream;
+ private JsonGenerator jsonGenerator;
+
+ public EventWriterConfiguration(FlowFileEventWriteStrategy flowFileEventWriteStrategy, int numberOfEventsPerFlowFile) {
+ this.flowFileEventWriteStrategy = flowFileEventWriteStrategy;
+ this.numberOfEventsPerFlowFile = numberOfEventsPerFlowFile;
+ }
+
+ public FlowFileEventWriteStrategy getFlowFileEventWriteStrategy() {
+ return flowFileEventWriteStrategy;
+ }
+
+ public int getNumberOfEventsWritten() {
+ return numberOfEventsWritten;
+ }
+
+ public void incrementNumberOfEventsWritten() {
+ this.numberOfEventsWritten++;
+ }
+
+ public void startNewFlowFile(FlowFile flowFile, OutputStream flowFileOutputStream, JsonGenerator jsonGenerator) {
+ this.currentFlowFile = flowFile;
+ this.flowFileOutputStream = flowFileOutputStream;
+ this.jsonGenerator = jsonGenerator;
+ }
+
+ public void cleanUp() throws IOException {
+ this.currentFlowFile = null;
+ this.flowFileOutputStream.close();
+ this.flowFileOutputStream = null;
+ this.jsonGenerator = null;
+ this.numberOfEventsWritten = 0;
+ }
+
+ public int getNumberOfEventsPerFlowFile() {
+ return numberOfEventsPerFlowFile;
+ }
+
+ public FlowFile getCurrentFlowFile() {
+ return currentFlowFile;
+ }
+
+ public OutputStream getFlowFileOutputStream() {
+ return flowFileOutputStream;
+ }
+
+ public JsonGenerator getJsonGenerator() {
+ return jsonGenerator;
+ }
+}
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/io/FlowFileEventWriteStrategy.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/io/FlowFileEventWriteStrategy.java
new file mode 100644
index 0000000000..bd39174db2
--- /dev/null
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/io/FlowFileEventWriteStrategy.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cdc.event.io;
+
+import org.apache.nifi.components.DescribedValue;
+
+public enum FlowFileEventWriteStrategy implements DescribedValue {
+ MAX_EVENTS_PER_FLOWFILE(
+ "Max Events Per FlowFile",
+ "This strategy causes at most the number of events specified in the 'Number of Events Per FlowFile' property to be written per FlowFile. If the processor is stopped before the "
+ + "specified number of events has been written (or the event queue becomes empty), the fewer number of events will still be written as a FlowFile before stopping."
+ ),
+ ONE_TRANSACTION_PER_FLOWFILE(
+ "One Transaction Per FlowFile",
+ "This strategy causes each event from a transaction (from BEGIN to COMMIT) to be written to a FlowFile"
+ );
+
+ private String displayName;
+ private String description;
+
+ FlowFileEventWriteStrategy(String displayName, String description) {
+ this.displayName = displayName;
+ this.description = description;
+ }
+
+ @Override
+ public String getValue() {
+ return name();
+ }
+
+ @Override
+ public String getDisplayName() {
+ return displayName;
+ }
+
+ @Override
+ public String getDescription() {
+ return description;
+ }
+}
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/AbstractBinlogEventWriter.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/AbstractBinlogEventWriter.java
index 09186f23c8..420da6c2d4 100644
--- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/AbstractBinlogEventWriter.java
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/AbstractBinlogEventWriter.java
@@ -16,14 +16,21 @@
*/
package org.apache.nifi.cdc.mysql.event.io;
+import org.apache.nifi.cdc.event.EventInfo;
+import org.apache.nifi.cdc.event.io.EventWriterConfiguration;
+import org.apache.nifi.cdc.event.io.FlowFileEventWriteStrategy;
import org.apache.nifi.cdc.mysql.event.BinlogEventInfo;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.cdc.event.io.AbstractEventWriter;
import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.FlowFileAccessException;
+import org.apache.nifi.processor.exception.ProcessException;
import java.io.IOException;
+import java.io.OutputStream;
+import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;
@@ -44,35 +51,114 @@ public abstract class AbstractBinlogEventWriter<T extends BinlogEventInfo> exten
}
protected Map<String, String> getCommonAttributes(final long sequenceId, BinlogEventInfo eventInfo) {
- return new HashMap<String, String>() {
- {
- put(SEQUENCE_ID_KEY, Long.toString(sequenceId));
- put(CDC_EVENT_TYPE_ATTRIBUTE, eventInfo.getEventType());
- String gtidSet = eventInfo.getBinlogGtidSet();
- if (gtidSet == null) {
- put(BinlogEventInfo.BINLOG_FILENAME_KEY, eventInfo.getBinlogFilename());
- put(BinlogEventInfo.BINLOG_POSITION_KEY, Long.toString(eventInfo.getBinlogPosition()));
- } else {
- put(BinlogEventInfo.BINLOG_GTIDSET_KEY, gtidSet);
- }
- put(CoreAttributes.MIME_TYPE.key(), APPLICATION_JSON);
- }
- };
+ final Map<String, String> commonAttributeMap = new HashMap<>();
+
+ commonAttributeMap.put(SEQUENCE_ID_KEY, Long.toString(sequenceId));
+ commonAttributeMap.put(CDC_EVENT_TYPE_ATTRIBUTE, eventInfo.getEventType());
+ String gtidSet = eventInfo.getBinlogGtidSet();
+ if (gtidSet == null) {
+ commonAttributeMap.put(BinlogEventInfo.BINLOG_FILENAME_KEY, eventInfo.getBinlogFilename());
+ commonAttributeMap.put(BinlogEventInfo.BINLOG_POSITION_KEY, Long.toString(eventInfo.getBinlogPosition()));
+ } else {
+ commonAttributeMap.put(BinlogEventInfo.BINLOG_GTIDSET_KEY, gtidSet);
+ }
+ commonAttributeMap.put(CoreAttributes.MIME_TYPE.key(), APPLICATION_JSON);
+
+ return commonAttributeMap;
}
// Default implementation for binlog events
@Override
- public long writeEvent(ProcessSession session, String transitUri, T eventInfo, long currentSequenceId, Relationship relationship) {
- FlowFile flowFile = session.create();
- flowFile = session.write(flowFile, (outputStream) -> {
+ public long writeEvent(ProcessSession session, String transitUri, T eventInfo, long currentSequenceId, Relationship relationship,
+ final EventWriterConfiguration eventWriterConfiguration) {
+ configureEventWriter(eventWriterConfiguration, session, eventInfo);
+
+ OutputStream outputStream = eventWriterConfiguration.getFlowFileOutputStream();
+ try {
super.startJson(outputStream, eventInfo);
writeJson(eventInfo);
// Nothing in the body
super.endJson();
- });
- flowFile = session.putAllAttributes(flowFile, getCommonAttributes(currentSequenceId, eventInfo));
- session.transfer(flowFile, relationship);
- session.getProvenanceReporter().receive(flowFile, transitUri);
+ } catch (IOException ioe) {
+ throw new UncheckedIOException("Write JSON start array failed", ioe);
+ }
+
+ eventWriterConfiguration.incrementNumberOfEventsWritten();
+
+ // Check if it is time to finish the FlowFile
+ if (maxEventsPerFlowFile(eventWriterConfiguration)
+ && eventWriterConfiguration.getNumberOfEventsWritten() == eventWriterConfiguration.getNumberOfEventsPerFlowFile()) {
+ finishAndTransferFlowFile(session, eventWriterConfiguration, transitUri, currentSequenceId, eventInfo, relationship);
+ }
return currentSequenceId + 1;
}
+
+ public void finishAndTransferFlowFile(final ProcessSession session, final EventWriterConfiguration eventWriterConfiguration, final String transitUri, final long seqId,
+ final BinlogEventInfo eventInfo, final Relationship relationship) {
+ if (writtenMultipleEvents(eventWriterConfiguration)) {
+ try {
+ jsonGenerator.writeEndArray();
+ } catch (IOException ioe) {
+ throw new UncheckedIOException("Write JSON end array failed", ioe);
+ }
+ }
+ try {
+ endFile();
+
+ FlowFile flowFile = eventWriterConfiguration.getCurrentFlowFile();
+ if (session == null || flowFile == null) {
+ throw new ProcessException("No open FlowFile or ProcessSession to write to");
+ }
+ flowFile = session.putAllAttributes(flowFile, getCommonAttributes(seqId, eventInfo));
+ session.transfer(flowFile, relationship);
+ session.getProvenanceReporter().receive(flowFile, transitUri);
+
+ eventWriterConfiguration.cleanUp();
+ } catch (IOException ioe) {
+ throw new FlowFileAccessException("Failed to close event writer", ioe);
+ }
+ }
+
+ protected void configureEventWriter(final EventWriterConfiguration eventWriterConfiguration, final ProcessSession session, final EventInfo eventInfo) {
+ FlowFile flowFile = eventWriterConfiguration.getCurrentFlowFile();
+ if (flowFile == null) {
+ flowFile = session.create();
+ OutputStream flowFileOutputStream = session.write(flowFile);
+ if (eventWriterConfiguration.getJsonGenerator() == null) {
+ try {
+ jsonGenerator = createJsonGenerator(flowFileOutputStream);
+ } catch (IOException ioe) {
+ throw new UncheckedIOException("JSON Generator creation failed", ioe);
+ }
+ }
+ if (multipleEventsPerFlowFile(eventWriterConfiguration)) {
+ try {
+ jsonGenerator.writeStartArray();
+ } catch (IOException ioe) {
+ throw new UncheckedIOException("Write JSON start array failed", ioe);
+ }
+ }
+ eventWriterConfiguration.startNewFlowFile(flowFile, flowFileOutputStream, jsonGenerator);
+ }
+ jsonGenerator = eventWriterConfiguration.getJsonGenerator();
+ }
+
+ private boolean multipleEventsPerFlowFile(EventWriterConfiguration eventWriterConfiguration) {
+ return (maxEventsPerFlowFile(eventWriterConfiguration)
+ && eventWriterConfiguration.getNumberOfEventsPerFlowFile() > 1)
+ || oneTransactionPerFlowFile(eventWriterConfiguration);
+ }
+
+ private boolean writtenMultipleEvents(EventWriterConfiguration eventWriterConfiguration) {
+ return eventWriterConfiguration.getNumberOfEventsWritten() > 1
+ || oneTransactionPerFlowFile(eventWriterConfiguration);
+ }
+
+ protected boolean maxEventsPerFlowFile(EventWriterConfiguration eventWriterConfiguration) {
+ return FlowFileEventWriteStrategy.MAX_EVENTS_PER_FLOWFILE.equals(eventWriterConfiguration.getFlowFileEventWriteStrategy());
+ }
+
+ protected boolean oneTransactionPerFlowFile(EventWriterConfiguration eventWriterConfiguration) {
+ return FlowFileEventWriteStrategy.ONE_TRANSACTION_PER_FLOWFILE.equals(eventWriterConfiguration.getFlowFileEventWriteStrategy());
+ }
}
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/AbstractBinlogTableEventWriter.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/AbstractBinlogTableEventWriter.java
index a674386de7..f618a76a6f 100644
--- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/AbstractBinlogTableEventWriter.java
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/AbstractBinlogTableEventWriter.java
@@ -16,9 +16,6 @@
*/
package org.apache.nifi.cdc.mysql.event.io;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.Relationship;
import org.apache.nifi.cdc.mysql.event.BinlogTableEventInfo;
import java.io.IOException;
@@ -46,20 +43,4 @@ public abstract class AbstractBinlogTableEventWriter<T extends BinlogTableEventI
jsonGenerator.writeNullField("table_id");
}
}
-
- // Default implementation for table-related binlog events
- @Override
- public long writeEvent(ProcessSession session, String transitUri, T eventInfo, long currentSequenceId, Relationship relationship) {
- FlowFile flowFile = session.create();
- flowFile = session.write(flowFile, (outputStream) -> {
- super.startJson(outputStream, eventInfo);
- writeJson(eventInfo);
- // Nothing in the body
- super.endJson();
- });
- flowFile = session.putAllAttributes(flowFile, getCommonAttributes(currentSequenceId, eventInfo));
- session.transfer(flowFile, relationship);
- session.getProvenanceReporter().receive(flowFile, transitUri);
- return currentSequenceId + 1;
- }
}
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/CommitTransactionEventWriter.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/CommitTransactionEventWriter.java
index c69b4b242b..9ed576b023 100644
--- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/CommitTransactionEventWriter.java
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/CommitTransactionEventWriter.java
@@ -17,7 +17,10 @@
package org.apache.nifi.cdc.mysql.event.io;
+import org.apache.nifi.cdc.event.io.EventWriterConfiguration;
import org.apache.nifi.cdc.mysql.event.CommitTransactionEventInfo;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
import java.io.IOException;
@@ -25,6 +28,18 @@ import java.io.IOException;
* A writer for events corresponding to the end (i.e. commit) of a MySQL transaction
*/
public class CommitTransactionEventWriter extends AbstractBinlogEventWriter<CommitTransactionEventInfo> {
+
+ @Override
+ public long writeEvent(ProcessSession session, String transitUri, CommitTransactionEventInfo eventInfo, long currentSequenceId,
+ Relationship relationship, EventWriterConfiguration eventWriterConfiguration) {
+ long sequenceId = super.writeEvent(session, transitUri, eventInfo, currentSequenceId, relationship, eventWriterConfiguration);
+ // If writing one transaction per flowfile, finish the flowfile here before committing the session
+ if (oneTransactionPerFlowFile(eventWriterConfiguration)) {
+ super.finishAndTransferFlowFile(session, eventWriterConfiguration, transitUri, sequenceId, eventInfo, relationship);
+ }
+ return sequenceId;
+ }
+
protected void writeJson(CommitTransactionEventInfo event) throws IOException {
super.writeJson(event);
if (event.getDatabaseName() != null) {
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/DDLEventWriter.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/DDLEventWriter.java
index 0064c29322..afe084c6ad 100644
--- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/DDLEventWriter.java
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/DDLEventWriter.java
@@ -16,28 +16,43 @@
*/
package org.apache.nifi.cdc.mysql.event.io;
-import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.cdc.event.io.EventWriterConfiguration;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.cdc.mysql.event.DDLEventInfo;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.UncheckedIOException;
+
+
/**
* A writer class to output MySQL binlog Data Definition Language (DDL) events to flow file(s).
*/
public class DDLEventWriter extends AbstractBinlogTableEventWriter<DDLEventInfo> {
@Override
- public long writeEvent(ProcessSession session, String transitUri, DDLEventInfo eventInfo, long currentSequenceId, Relationship relationship) {
- FlowFile flowFile = session.create();
- flowFile = session.write(flowFile, (outputStream) -> {
+ public long writeEvent(ProcessSession session, String transitUri, DDLEventInfo eventInfo, long currentSequenceId, Relationship relationship,
+ final EventWriterConfiguration eventWriterConfiguration) {
+ configureEventWriter(eventWriterConfiguration, session, eventInfo);
+ OutputStream outputStream = eventWriterConfiguration.getFlowFileOutputStream();
+
+ try {
super.startJson(outputStream, eventInfo);
super.writeJson(eventInfo);
jsonGenerator.writeStringField("query", eventInfo.getQuery());
super.endJson();
- });
- flowFile = session.putAllAttributes(flowFile, getCommonAttributes(currentSequenceId, eventInfo));
- session.transfer(flowFile, relationship);
- session.getProvenanceReporter().receive(flowFile, transitUri);
+ } catch (IOException ioe) {
+ throw new UncheckedIOException("Write JSON start array failed", ioe);
+ }
+
+ eventWriterConfiguration.incrementNumberOfEventsWritten();
+
+ // Check if it is time to finish the FlowFile
+ if (maxEventsPerFlowFile(eventWriterConfiguration)
+ && eventWriterConfiguration.getNumberOfEventsWritten() == eventWriterConfiguration.getNumberOfEventsPerFlowFile()) {
+ finishAndTransferFlowFile(session, eventWriterConfiguration, transitUri, currentSequenceId, eventInfo, relationship);
+ }
return currentSequenceId + 1;
}
}
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/DeleteRowsWriter.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/DeleteRowsWriter.java
index 7e316d41c5..4828ce8662 100644
--- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/DeleteRowsWriter.java
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/DeleteRowsWriter.java
@@ -16,18 +16,18 @@
*/
package org.apache.nifi.cdc.mysql.event.io;
+import org.apache.nifi.cdc.event.io.EventWriterConfiguration;
import org.apache.nifi.cdc.mysql.event.MySQLCDCUtils;
-import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.cdc.event.ColumnDefinition;
import org.apache.nifi.cdc.mysql.event.DeleteRowsEventInfo;
import org.apache.nifi.processor.Relationship;
import java.io.IOException;
+import java.io.OutputStream;
import java.io.Serializable;
+import java.io.UncheckedIOException;
import java.util.BitSet;
-import java.util.concurrent.atomic.AtomicLong;
-
/**
* A writer class to output MySQL binlog "delete rows" events to flow file(s).
@@ -42,12 +42,13 @@ public class DeleteRowsWriter extends AbstractBinlogTableEventWriter<DeleteRowsE
* @return The next available CDC sequence ID for use by the CDC processor
*/
@Override
- public long writeEvent(final ProcessSession session, String transitUri, final DeleteRowsEventInfo eventInfo, final long currentSequenceId, Relationship relationship) {
- final AtomicLong seqId = new AtomicLong(currentSequenceId);
+ public long writeEvent(final ProcessSession session, String transitUri, final DeleteRowsEventInfo eventInfo, final long currentSequenceId, Relationship relationship,
+ final EventWriterConfiguration eventWriterConfiguration) {
+ long seqId = currentSequenceId;
for (Serializable[] row : eventInfo.getRows()) {
-
- FlowFile flowFile = session.create();
- flowFile = session.write(flowFile, outputStream -> {
+ configureEventWriter(eventWriterConfiguration, session, eventInfo);
+ OutputStream outputStream = eventWriterConfiguration.getFlowFileOutputStream();
+ try {
super.startJson(outputStream, eventInfo);
super.writeJson(eventInfo);
@@ -56,14 +57,20 @@ public class DeleteRowsWriter extends AbstractBinlogTableEventWriter<DeleteRowsE
writeRow(eventInfo, row, bitSet);
super.endJson();
- });
+ } catch (IOException ioe) {
+ throw new UncheckedIOException("Write JSON start array failed", ioe);
+ }
- flowFile = session.putAllAttributes(flowFile, getCommonAttributes(seqId.get(), eventInfo));
- session.transfer(flowFile, relationship);
- session.getProvenanceReporter().receive(flowFile, transitUri);
- seqId.getAndIncrement();
+ eventWriterConfiguration.incrementNumberOfEventsWritten();
+
+ // Check if it is time to finish the FlowFile
+ if (maxEventsPerFlowFile(eventWriterConfiguration)
+ && eventWriterConfiguration.getNumberOfEventsWritten() == eventWriterConfiguration.getNumberOfEventsPerFlowFile()) {
+ finishAndTransferFlowFile(session, eventWriterConfiguration, transitUri, seqId, eventInfo, relationship);
+ }
+ seqId++;
}
- return seqId.get();
+ return seqId;
}
protected void writeRow(DeleteRowsEventInfo event, Serializable[] row, BitSet includedColumns) throws IOException {
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/InsertRowsWriter.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/InsertRowsWriter.java
index 6cdfbdd0fb..4453889d53 100644
--- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/InsertRowsWriter.java
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/InsertRowsWriter.java
@@ -16,17 +16,18 @@
*/
package org.apache.nifi.cdc.mysql.event.io;
+import org.apache.nifi.cdc.event.io.EventWriterConfiguration;
import org.apache.nifi.cdc.mysql.event.MySQLCDCUtils;
-import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.cdc.event.ColumnDefinition;
import org.apache.nifi.cdc.mysql.event.InsertRowsEventInfo;
import org.apache.nifi.processor.Relationship;
import java.io.IOException;
+import java.io.OutputStream;
import java.io.Serializable;
+import java.io.UncheckedIOException;
import java.util.BitSet;
-import java.util.concurrent.atomic.AtomicLong;
/**
@@ -42,12 +43,13 @@ public class InsertRowsWriter extends AbstractBinlogTableEventWriter<InsertRowsE
* @return The next available CDC sequence ID for use by the CDC processor
*/
@Override
- public long writeEvent(final ProcessSession session, String transitUri, final InsertRowsEventInfo eventInfo, final long currentSequenceId, Relationship relationship) {
- final AtomicLong seqId = new AtomicLong(currentSequenceId);
+ public long writeEvent(final ProcessSession session, String transitUri, final InsertRowsEventInfo eventInfo, final long currentSequenceId, Relationship relationship,
+ final EventWriterConfiguration eventWriterConfiguration) {
+ long seqId = currentSequenceId;
for (Serializable[] row : eventInfo.getRows()) {
-
- FlowFile flowFile = session.create();
- flowFile = session.write(flowFile, outputStream -> {
+ configureEventWriter(eventWriterConfiguration, session, eventInfo);
+ OutputStream outputStream = eventWriterConfiguration.getFlowFileOutputStream();
+ try {
super.startJson(outputStream, eventInfo);
super.writeJson(eventInfo);
@@ -56,14 +58,20 @@ public class InsertRowsWriter extends AbstractBinlogTableEventWriter<InsertRowsE
writeRow(eventInfo, row, bitSet);
super.endJson();
- });
+ } catch (IOException ioe) {
+ throw new UncheckedIOException("Write JSON start array failed", ioe);
+ }
- flowFile = session.putAllAttributes(flowFile, getCommonAttributes(seqId.get(), eventInfo));
- session.transfer(flowFile, relationship);
- session.getProvenanceReporter().receive(flowFile, transitUri);
- seqId.getAndIncrement();
+ eventWriterConfiguration.incrementNumberOfEventsWritten();
+
+ // Check if it is time to finish the FlowFile
+ if (maxEventsPerFlowFile(eventWriterConfiguration)
+ && eventWriterConfiguration.getNumberOfEventsWritten() == eventWriterConfiguration.getNumberOfEventsPerFlowFile()) {
+ finishAndTransferFlowFile(session, eventWriterConfiguration, transitUri, seqId, eventInfo, relationship);
+ }
+ seqId++;
}
- return seqId.get();
+ return seqId;
}
protected void writeRow(InsertRowsEventInfo event, Serializable[] row, BitSet includedColumns) throws IOException {
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/UpdateRowsWriter.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/UpdateRowsWriter.java
index a4934fd40c..4a9a2d9785 100644
--- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/UpdateRowsWriter.java
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/UpdateRowsWriter.java
@@ -16,18 +16,19 @@
*/
package org.apache.nifi.cdc.mysql.event.io;
+import org.apache.nifi.cdc.event.io.EventWriterConfiguration;
import org.apache.nifi.cdc.mysql.event.MySQLCDCUtils;
-import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.cdc.event.ColumnDefinition;
import org.apache.nifi.cdc.mysql.event.UpdateRowsEventInfo;
import org.apache.nifi.processor.Relationship;
import java.io.IOException;
+import java.io.OutputStream;
import java.io.Serializable;
+import java.io.UncheckedIOException;
import java.util.BitSet;
import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
/**
@@ -43,13 +44,14 @@ public class UpdateRowsWriter extends AbstractBinlogTableEventWriter<UpdateRowsE
* @return The next available CDC sequence ID for use by the CDC processor
*/
@Override
- public long writeEvent(final ProcessSession session, String transitUri, final UpdateRowsEventInfo eventInfo, final long currentSequenceId, Relationship relationship) {
- final AtomicLong seqId = new AtomicLong(currentSequenceId);
+ public long writeEvent(final ProcessSession session, String transitUri, final UpdateRowsEventInfo eventInfo, final long currentSequenceId, Relationship relationship,
+ final EventWriterConfiguration eventWriterConfiguration) {
+ long seqId = currentSequenceId;
for (Map.Entry<Serializable[], Serializable[]> row : eventInfo.getRows()) {
+ configureEventWriter(eventWriterConfiguration, session, eventInfo);
+ OutputStream outputStream = eventWriterConfiguration.getFlowFileOutputStream();
- FlowFile flowFile = session.create();
- flowFile = session.write(flowFile, outputStream -> {
-
+ try {
super.startJson(outputStream, eventInfo);
super.writeJson(eventInfo);
@@ -57,14 +59,20 @@ public class UpdateRowsWriter extends AbstractBinlogTableEventWriter<UpdateRowsE
writeRow(eventInfo, row, bitSet);
super.endJson();
- });
+ } catch (IOException ioe) {
+ throw new UncheckedIOException("Write JSON start array failed", ioe);
+ }
- flowFile = session.putAllAttributes(flowFile, getCommonAttributes(seqId.get(), eventInfo));
- session.transfer(flowFile, relationship);
- session.getProvenanceReporter().receive(flowFile, transitUri);
- seqId.getAndIncrement();
+ eventWriterConfiguration.incrementNumberOfEventsWritten();
+
+ // Check if it is time to finish the FlowFile
+ if (maxEventsPerFlowFile(eventWriterConfiguration)
+ && eventWriterConfiguration.getNumberOfEventsWritten() == eventWriterConfiguration.getNumberOfEventsPerFlowFile()) {
+ finishAndTransferFlowFile(session, eventWriterConfiguration, transitUri, seqId, eventInfo, relationship);
+ }
+ seqId++;
}
- return seqId.get();
+ return seqId;
}
protected void writeRow(UpdateRowsEventInfo event, Map.Entry<Serializable[], Serializable[]> row, BitSet includedColumns) throws IOException {
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
index 71bea808e1..d2665fa231 100644
--- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
@@ -44,7 +44,6 @@ import org.apache.nifi.cdc.event.ColumnDefinition;
import org.apache.nifi.cdc.event.RowEventException;
import org.apache.nifi.cdc.event.TableInfo;
import org.apache.nifi.cdc.event.TableInfoCacheKey;
-import org.apache.nifi.cdc.event.io.EventWriter;
import org.apache.nifi.cdc.mysql.event.BeginTransactionEventInfo;
import org.apache.nifi.cdc.mysql.event.BinlogEventInfo;
import org.apache.nifi.cdc.mysql.event.BinlogEventListener;
@@ -52,9 +51,12 @@ import org.apache.nifi.cdc.mysql.event.BinlogLifecycleListener;
import org.apache.nifi.cdc.mysql.event.CommitTransactionEventInfo;
import org.apache.nifi.cdc.mysql.event.DDLEventInfo;
import org.apache.nifi.cdc.mysql.event.DeleteRowsEventInfo;
+import org.apache.nifi.cdc.event.io.EventWriterConfiguration;
+import org.apache.nifi.cdc.event.io.FlowFileEventWriteStrategy;
import org.apache.nifi.cdc.mysql.event.InsertRowsEventInfo;
import org.apache.nifi.cdc.mysql.event.RawBinlogEvent;
import org.apache.nifi.cdc.mysql.event.UpdateRowsEventInfo;
+import org.apache.nifi.cdc.mysql.event.io.AbstractBinlogEventWriter;
import org.apache.nifi.cdc.mysql.event.io.BeginTransactionEventWriter;
import org.apache.nifi.cdc.mysql.event.io.CommitTransactionEventWriter;
import org.apache.nifi.cdc.mysql.event.io.DDLEventWriter;
@@ -78,6 +80,7 @@ import org.apache.nifi.distributed.cache.client.Deserializer;
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
import org.apache.nifi.distributed.cache.client.Serializer;
import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
import org.apache.nifi.processor.ProcessContext;
@@ -132,6 +135,9 @@ import static com.github.shyiko.mysql.binlog.event.EventType.PRE_GA_WRITE_ROWS;
import static com.github.shyiko.mysql.binlog.event.EventType.ROTATE;
import static com.github.shyiko.mysql.binlog.event.EventType.WRITE_ROWS;
import static com.github.shyiko.mysql.binlog.event.EventType.XID;
+import static org.apache.nifi.cdc.event.io.EventWriter.CDC_EVENT_TYPE_ATTRIBUTE;
+import static org.apache.nifi.cdc.event.io.EventWriter.SEQUENCE_ID_KEY;
+import static org.apache.nifi.cdc.event.io.FlowFileEventWriteStrategy.MAX_EVENTS_PER_FLOWFILE;
/**
@@ -140,15 +146,17 @@ import static com.github.shyiko.mysql.binlog.event.EventType.XID;
@TriggerSerially
@PrimaryNodeOnly
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
-@Tags({"sql", "jdbc", "cdc", "mysql"})
+@Tags({"sql", "jdbc", "cdc", "mysql", "transaction", "event"})
@CapabilityDescription("Retrieves Change Data Capture (CDC) events from a MySQL database. CDC Events include INSERT, UPDATE, DELETE operations. Events "
- + "are output as individual flow files ordered by the time at which the operation occurred.")
+ + "are output as either a group of a specified number of events (the default is 1 so each event becomes its own flow file) or grouped as a full transaction (BEGIN to COMMIT). All events "
+ + "are ordered by the time at which the operation occurred. NOTE: If the processor is stopped before the specified number of events have been written to a flow file, "
+ + "the partial flow file will be output in order to maintain the consistency of the event stream.")
@Stateful(scopes = Scope.CLUSTER, description = "Information such as a 'pointer' to the current CDC event in the database is stored by this processor, such "
+ "that it can continue from the same location if restarted.")
@WritesAttributes({
- @WritesAttribute(attribute = EventWriter.SEQUENCE_ID_KEY, description = "A sequence identifier (i.e. strictly increasing integer value) specifying the order "
+ @WritesAttribute(attribute = SEQUENCE_ID_KEY, description = "A sequence identifier (i.e. strictly increasing integer value) specifying the order "
+ "of the CDC event flow file relative to the other event flow file(s)."),
- @WritesAttribute(attribute = EventWriter.CDC_EVENT_TYPE_ATTRIBUTE, description = "A string indicating the type of CDC event that occurred, including (but not limited to) "
+ @WritesAttribute(attribute = CDC_EVENT_TYPE_ATTRIBUTE, description = "A string indicating the type of CDC event that occurred, including (but not limited to) "
+ "'begin', 'insert', 'update', 'delete', 'ddl' and 'commit'."),
@WritesAttribute(attribute = "mime.type", description = "The processor outputs flow file content in JSON format, and sets the mime.type attribute to "
+ "application/json")
@@ -185,6 +193,7 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
SSLMode.VERIFY_IDENTITY.toString(),
"Connect with TLS or fail when server support not enabled. Verify server hostname matches presented X.509 certificate names or fail when not matched");
+
// Properties
public static final PropertyDescriptor DATABASE_NAME_PATTERN = new PropertyDescriptor.Builder()
.name("capture-change-mysql-db-name-pattern")
@@ -269,6 +278,31 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
+ public static final PropertyDescriptor EVENTS_PER_FLOWFILE_STRATEGY = new PropertyDescriptor.Builder()
+ .name("events-per-flowfile-strategy")
+ .displayName("Event Processing Strategy")
+ .description("Specifies the strategy to use when writing events to FlowFile(s), such as '" + MAX_EVENTS_PER_FLOWFILE.getDisplayName() + "'")
+ .required(true)
+ .sensitive(false)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .allowableValues(FlowFileEventWriteStrategy.class)
+ .defaultValue(MAX_EVENTS_PER_FLOWFILE.getValue())
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+ .build();
+
+ public static final PropertyDescriptor NUMBER_OF_EVENTS_PER_FLOWFILE = new PropertyDescriptor.Builder()
+ .name("number-of-events-per-flowfile")
+ .displayName("Events Per FlowFile")
+ .description("Specifies how many events should be written to a single FlowFile. If the processor is stopped before the specified number of events has been written,"
+ + "the events will still be written as a FlowFile before stopping.")
+ .required(true)
+ .sensitive(false)
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .defaultValue("1")
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .dependsOn(EVENTS_PER_FLOWFILE_STRATEGY, MAX_EVENTS_PER_FLOWFILE.getValue())
+ .build();
+
public static final PropertyDescriptor SERVER_ID = new PropertyDescriptor.Builder()
.name("capture-change-mysql-server-id")
.displayName("Server ID")
@@ -479,6 +513,10 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
private final DeleteRowsWriter deleteRowsWriter = new DeleteRowsWriter();
private final UpdateRowsWriter updateRowsWriter = new UpdateRowsWriter();
+ private volatile EventWriterConfiguration eventWriterConfiguration;
+ private volatile BinlogEventInfo currentEventInfo;
+ private AbstractBinlogEventWriter<? extends BinlogEventInfo> currentEventWriter;
+
static {
final Set<Relationship> r = new HashSet<>();
@@ -491,6 +529,8 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
pds.add(DRIVER_LOCATION);
pds.add(USERNAME);
pds.add(PASSWORD);
+ pds.add(EVENTS_PER_FLOWFILE_STRATEGY);
+ pds.add(NUMBER_OF_EVENTS_PER_FLOWFILE);
pds.add(SERVER_ID);
pds.add(DATABASE_NAME_PATTERN);
pds.add(TABLE_NAME_PATTERN);
@@ -569,6 +609,13 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
return;
}
+ // Build an event writer config object for the event writers to use
+ final FlowFileEventWriteStrategy flowFileEventWriteStrategy = FlowFileEventWriteStrategy.valueOf(context.getProperty(EVENTS_PER_FLOWFILE_STRATEGY).getValue());
+ eventWriterConfiguration = new EventWriterConfiguration(
+ flowFileEventWriteStrategy,
+ context.getProperty(NUMBER_OF_EVENTS_PER_FLOWFILE).evaluateAttributeExpressions().asInteger()
+ );
+
PropertyValue dbNameValue = context.getProperty(DATABASE_NAME_PATTERN);
databaseNamePattern = dbNameValue.isSet() ? Pattern.compile(dbNameValue.getValue()) : null;
@@ -624,7 +671,7 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
}
// Get current sequence ID from state
- String seqIdString = stateMap.get(EventWriter.SEQUENCE_ID_KEY);
+ String seqIdString = stateMap.get(SEQUENCE_ID_KEY);
if (StringUtils.isEmpty(seqIdString)) {
// Use Initial Sequence ID property if none is found in state
PropertyValue seqIdProp = context.getProperty(INIT_SEQUENCE_ID);
@@ -888,7 +935,7 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
if (eventType != ROTATE && eventType != FORMAT_DESCRIPTION && !useGtid) {
currentBinlogPosition = header.getPosition();
}
- log.debug("Got message event type: {} ", new Object[]{header.getEventType().toString()});
+ log.debug("Got message event type: {} ", header.getEventType().toString());
switch (eventType) {
case TABLE_MAP:
// This is sent to inform which table is about to be changed by subsequent events
@@ -952,7 +999,10 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
BeginTransactionEventInfo beginEvent = useGtid
? new BeginTransactionEventInfo(currentDatabase, timestamp, currentGtidSet)
: new BeginTransactionEventInfo(currentDatabase, timestamp, currentBinlogFile, currentBinlogPosition);
- currentSequenceId.set(beginEventWriter.writeEvent(currentSession, transitUri, beginEvent, currentSequenceId.get(), REL_SUCCESS));
+ currentEventInfo = beginEvent;
+ currentEventWriter = beginEventWriter;
+ currentSequenceId.set(beginEventWriter.writeEvent(currentSession, transitUri, beginEvent, currentSequenceId.get(), REL_SUCCESS, eventWriterConfiguration));
+
}
inTransaction = true;
//update inTransaction value to state
@@ -963,17 +1013,35 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
+ "This could indicate that your binlog position is invalid.");
}
// InnoDB generates XID events for "commit", but MyISAM generates Query events with "COMMIT", so handle that here
- if (includeBeginCommit && (databaseNamePattern == null || databaseNamePattern.matcher(currentDatabase).matches())) {
- CommitTransactionEventInfo commitTransactionEvent = useGtid
- ? new CommitTransactionEventInfo(currentDatabase, timestamp, currentGtidSet)
- : new CommitTransactionEventInfo(currentDatabase, timestamp, currentBinlogFile, currentBinlogPosition);
- currentSequenceId.set(commitEventWriter.writeEvent(currentSession, transitUri, commitTransactionEvent, currentSequenceId.get(), REL_SUCCESS));
+ if (includeBeginCommit) {
+ if (databaseNamePattern == null || databaseNamePattern.matcher(currentDatabase).matches()) {
+ CommitTransactionEventInfo commitTransactionEvent = useGtid
+ ? new CommitTransactionEventInfo(currentDatabase, timestamp, currentGtidSet)
+ : new CommitTransactionEventInfo(currentDatabase, timestamp, currentBinlogFile, currentBinlogPosition);
+ currentEventInfo = commitTransactionEvent;
+ currentEventWriter = commitEventWriter;
+ currentSequenceId.set(commitEventWriter.writeEvent(currentSession, transitUri, commitTransactionEvent, currentSequenceId.get(), REL_SUCCESS, eventWriterConfiguration));
+ }
+ } else {
+ // If the COMMIT event is not to be written, the FlowFile should still be finished and the session committed.
+ if (currentSession != null) {
+ FlowFile flowFile = eventWriterConfiguration.getCurrentFlowFile();
+ if (flowFile != null && currentEventWriter != null) {
+ // The COMMIT event is not being written, so finish and transfer the open FlowFile here
+ currentEventWriter.finishAndTransferFlowFile(currentSession, eventWriterConfiguration, transitUri, currentSequenceId.get(), currentEventInfo, REL_SUCCESS);
+ }
+ currentSession.commitAsync();
+ }
}
+
//update inTransaction value to state
inTransaction = false;
updateState(session);
- // Commit the NiFi session
- session.commitAsync();
+ // If there is no FlowFile open, commit the session
+ if (eventWriterConfiguration.getCurrentFlowFile() == null) {
+ // Commit the NiFi session
+ session.commitAsync();
+ }
currentTable = null;
} else {
// Check for DDL events (alter table, e.g.). Normalize the query to do string matching on the type of change
@@ -993,7 +1061,9 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
DDLEventInfo ddlEvent = useGtid
? new DDLEventInfo(ddlTableInfo, timestamp, currentGtidSet, sql)
: new DDLEventInfo(ddlTableInfo, timestamp, currentBinlogFile, currentBinlogPosition, sql);
- currentSequenceId.set(ddlEventWriter.writeEvent(currentSession, transitUri, ddlEvent, currentSequenceId.get(), REL_SUCCESS));
+ currentEventInfo = ddlEvent;
+ currentEventWriter = ddlEventWriter;
+ currentSequenceId.set(ddlEventWriter.writeEvent(currentSession, transitUri, ddlEvent, currentSequenceId.get(), REL_SUCCESS, eventWriterConfiguration));
}
// Remove all the keys from the cache that this processor added
if (cacheClient != null) {
@@ -1002,7 +1072,19 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
// If not in a transaction, commit the session so the DDL event(s) will be transferred
if (includeDDLEvents && !inTransaction) {
updateState(session);
- session.commitAsync();
+ if (FlowFileEventWriteStrategy.ONE_TRANSACTION_PER_FLOWFILE.equals(eventWriterConfiguration.getFlowFileEventWriteStrategy())) {
+ if (currentSession != null) {
+ FlowFile flowFile = eventWriterConfiguration.getCurrentFlowFile();
+ if (flowFile != null && currentEventWriter != null) {
+ // Not in a transaction, so finish and transfer the open FlowFile now rather than waiting for a COMMIT
+ currentEventWriter.finishAndTransferFlowFile(currentSession, eventWriterConfiguration, transitUri, currentSequenceId.get(), currentEventInfo, REL_SUCCESS);
+ }
+ }
+ }
+ // If there is no FlowFile open, commit the session
+ if (eventWriterConfiguration.getCurrentFlowFile() == null) {
+ session.commitAsync();
+ }
}
}
}
@@ -1013,19 +1095,35 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
throw new IOException("COMMIT event received while not processing a transaction (i.e. no corresponding BEGIN event). "
+ "This could indicate that your binlog position is invalid.");
}
- if (includeBeginCommit && (databaseNamePattern == null || databaseNamePattern.matcher(currentDatabase).matches())) {
- CommitTransactionEventInfo commitTransactionEvent = useGtid
- ? new CommitTransactionEventInfo(currentDatabase, timestamp, currentGtidSet)
- : new CommitTransactionEventInfo(currentDatabase, timestamp, currentBinlogFile, currentBinlogPosition);
- currentSequenceId.set(commitEventWriter.writeEvent(currentSession, transitUri, commitTransactionEvent, currentSequenceId.get(), REL_SUCCESS));
+ if (includeBeginCommit) {
+ if (databaseNamePattern == null || databaseNamePattern.matcher(currentDatabase).matches()) {
+ CommitTransactionEventInfo commitTransactionEvent = useGtid
+ ? new CommitTransactionEventInfo(currentDatabase, timestamp, currentGtidSet)
+ : new CommitTransactionEventInfo(currentDatabase, timestamp, currentBinlogFile, currentBinlogPosition);
+ currentEventInfo = commitTransactionEvent;
+ currentEventWriter = commitEventWriter;
+ currentSequenceId.set(commitEventWriter.writeEvent(currentSession, transitUri, commitTransactionEvent, currentSequenceId.get(), REL_SUCCESS, eventWriterConfiguration));
+ }
+ } else {
+ // If the COMMIT event is not to be written, the FlowFile should still be finished and the session committed.
+ if (currentSession != null) {
+ FlowFile flowFile = eventWriterConfiguration.getCurrentFlowFile();
+ if (flowFile != null && currentEventWriter != null) {
+ // The COMMIT event is not being written, so finish and transfer the open FlowFile here
+ currentEventWriter.finishAndTransferFlowFile(currentSession, eventWriterConfiguration, transitUri, currentSequenceId.get(), currentEventInfo, REL_SUCCESS);
+ }
+ currentSession.commitAsync();
+ }
}
- // Commit the NiFi session
// update inTransaction value and save next position
// so when restart this processor,we will not read xid event again
inTransaction = false;
currentBinlogPosition = header.getNextPosition();
updateState(session);
- session.commitAsync();
+ // If there is no FlowFile open, commit the session
+ if (eventWriterConfiguration.getCurrentFlowFile() == null) {
+ session.commitAsync();
+ }
currentTable = null;
currentDatabase = null;
break;
@@ -1060,7 +1158,9 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
InsertRowsEventInfo eventInfo = useGtid
? new InsertRowsEventInfo(currentTable, timestamp, currentGtidSet, event.getData())
: new InsertRowsEventInfo(currentTable, timestamp, currentBinlogFile, currentBinlogPosition, event.getData());
- currentSequenceId.set(insertRowsWriter.writeEvent(currentSession, transitUri, eventInfo, currentSequenceId.get(), REL_SUCCESS));
+ currentEventInfo = eventInfo;
+ currentEventWriter = insertRowsWriter;
+ currentSequenceId.set(insertRowsWriter.writeEvent(currentSession, transitUri, eventInfo, currentSequenceId.get(), REL_SUCCESS, eventWriterConfiguration));
} else if (eventType == DELETE_ROWS
|| eventType == EXT_DELETE_ROWS
@@ -1069,14 +1169,18 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
DeleteRowsEventInfo eventInfo = useGtid
? new DeleteRowsEventInfo(currentTable, timestamp, currentGtidSet, event.getData())
: new DeleteRowsEventInfo(currentTable, timestamp, currentBinlogFile, currentBinlogPosition, event.getData());
- currentSequenceId.set(deleteRowsWriter.writeEvent(currentSession, transitUri, eventInfo, currentSequenceId.get(), REL_SUCCESS));
+ currentEventInfo = eventInfo;
+ currentEventWriter = deleteRowsWriter;
+ currentSequenceId.set(deleteRowsWriter.writeEvent(currentSession, transitUri, eventInfo, currentSequenceId.get(), REL_SUCCESS, eventWriterConfiguration));
} else {
// Update event
UpdateRowsEventInfo eventInfo = useGtid
? new UpdateRowsEventInfo(currentTable, timestamp, currentGtidSet, event.getData())
: new UpdateRowsEventInfo(currentTable, timestamp, currentBinlogFile, currentBinlogPosition, event.getData());
- currentSequenceId.set(updateRowsWriter.writeEvent(currentSession, transitUri, eventInfo, currentSequenceId.get(), REL_SUCCESS));
+ currentEventInfo = eventInfo;
+ currentEventWriter = updateRowsWriter;
+ currentSequenceId.set(updateRowsWriter.writeEvent(currentSession, transitUri, eventInfo, currentSequenceId.get(), REL_SUCCESS, eventWriterConfiguration));
}
break;
@@ -1144,6 +1248,11 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
}
if (currentSession != null) {
+ FlowFile flowFile = eventWriterConfiguration.getCurrentFlowFile();
+ if (flowFile != null && currentEventWriter != null) {
+ // Flush the events to the FlowFile when the processor is stopped
+ currentEventWriter.finishAndTransferFlowFile(currentSession, eventWriterConfiguration, transitUri, currentSequenceId.get(), currentEventInfo, REL_SUCCESS);
+ }
currentSession.commitAsync();
}
@@ -1174,7 +1283,7 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
}
newStateMap.put(BinlogEventInfo.BINLOG_POSITION_KEY, Long.toString(binlogPosition));
- newStateMap.put(EventWriter.SEQUENCE_ID_KEY, String.valueOf(sequenceId));
+ newStateMap.put(SEQUENCE_ID_KEY, String.valueOf(sequenceId));
//add inTransaction value into state
newStateMap.put("inTransaction", inTransaction ? "true" : "false");
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy
index 66828c3570..f67567bfa5 100644
--- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy
@@ -35,6 +35,7 @@ import org.apache.nifi.cdc.event.ColumnDefinition
import org.apache.nifi.cdc.event.TableInfo
import org.apache.nifi.cdc.event.TableInfoCacheKey
import org.apache.nifi.cdc.event.io.EventWriter
+import org.apache.nifi.cdc.event.io.FlowFileEventWriteStrategy
import org.apache.nifi.cdc.mysql.MockBinlogClient
import org.apache.nifi.cdc.mysql.event.BinlogEventInfo
import org.apache.nifi.cdc.mysql.processors.ssl.BinaryLogSSLSocketFactory
@@ -734,6 +735,238 @@ class CaptureChangeMySQLTest {
assertEquals(5, resultFiles.size())
}
+ @Test
+ void testSkipTableMultipleEventsPerFlowFile() throws Exception {
+ testRunner.setProperty(CaptureChangeMySQL.DRIVER_LOCATION, DRIVER_LOCATION)
+ testRunner.setProperty(CaptureChangeMySQL.HOSTS, 'localhost:3306')
+ testRunner.setProperty(CaptureChangeMySQL.USERNAME, 'root')
+ testRunner.setProperty(CaptureChangeMySQL.PASSWORD, 'password')
+ testRunner.setProperty(CaptureChangeMySQL.CONNECT_TIMEOUT, '2 seconds')
+ testRunner.setProperty(CaptureChangeMySQL.DATABASE_NAME_PATTERN, "myDB")
+ testRunner.setProperty(CaptureChangeMySQL.TABLE_NAME_PATTERN, "user")
+ testRunner.setProperty(CaptureChangeMySQL.INCLUDE_BEGIN_COMMIT, 'true')
+ testRunner.setProperty(CaptureChangeMySQL.NUMBER_OF_EVENTS_PER_FLOWFILE, '2')
+
+ testRunner.run(1, false, true)
+
+ // ROTATE
+ client.sendEvent(new Event(
+ [timestamp: new Date().time, eventType: EventType.ROTATE, nextPosition: 2] as EventHeaderV4,
+ [binlogFilename: 'master.000001', binlogPosition: 4L] as RotateEventData
+ ))
+
+ // BEGIN
+ client.sendEvent(new Event(
+ [timestamp: new Date().time, eventType: EventType.QUERY, nextPosition: 4] as EventHeaderV4,
+ [database: 'myDB', sql: 'BEGIN'] as QueryEventData
+ ))
+
+ // TABLE MAP for table not matching the regex (note the s on the end of users vs the regex of 'user')
+ client.sendEvent(new Event(
+ [timestamp: new Date().time, eventType: EventType.TABLE_MAP, nextPosition: 6] as EventHeaderV4,
+ [tableId: 1, database: 'myDB', table: 'users', columnTypes: [4, -4] as byte[]] as TableMapEventData
+ ))
+
+ // This WRITE ROWS should be skipped
+ def cols = new BitSet()
+ cols.set(1)
+ client.sendEvent(new Event(
+ [timestamp: new Date().time, eventType: EventType.EXT_WRITE_ROWS, nextPosition: 8] as EventHeaderV4,
+ [tableId: 1, includedColumns: cols,
+ rows : [[2, 'Smith'] as Serializable[]] as List<Serializable[]>] as WriteRowsEventData
+ ))
+
+ // TABLE MAP for table matching, all modification events (1) should be emitted
+ client.sendEvent(new Event(
+ [timestamp: new Date().time, eventType: EventType.TABLE_MAP, nextPosition: 10] as EventHeaderV4,
+ [tableId: 1, database: 'myDB', table: 'user', columnTypes: [4, -4] as byte[]] as TableMapEventData
+ ))
+
+ // WRITE ROWS for matching table
+ cols = new BitSet()
+ cols.set(1)
+ client.sendEvent(new Event(
+ [timestamp: new Date().time, eventType: EventType.EXT_WRITE_ROWS, nextPosition: 12] as EventHeaderV4,
+ [tableId: 1, includedColumns: cols,
+ rows : [[10, 'Cruz'] as Serializable[]] as List<Serializable[]>] as WriteRowsEventData
+ ))
+
+ // COMMIT
+ client.sendEvent(new Event(
+ [timestamp: new Date().time, eventType: EventType.XID, nextPosition: 14] as EventHeaderV4,
+ {} as EventData
+ ))
+
+ ////////////////////////
+ // Test database filter
+ ////////////////////////
+
+ // BEGIN
+ client.sendEvent(new Event(
+ [timestamp: new Date().time, eventType: EventType.QUERY, nextPosition: 4] as EventHeaderV4,
+ [database: 'myDB', sql: 'BEGIN'] as QueryEventData
+ ))
+
+ // TABLE MAP whose table ('notMyDB') does not match the table regex (the database 'myDB' itself still matches)
+ client.sendEvent(new Event(
+ [timestamp: new Date().time, eventType: EventType.TABLE_MAP, nextPosition: 6] as EventHeaderV4,
+ [tableId: 1, database: 'myDB', table: 'notMyDB', columnTypes: [4, -4] as byte[]] as TableMapEventData
+ ))
+
+ // This WRITE ROWS should be skipped
+ cols = new BitSet()
+ cols.set(1)
+ client.sendEvent(new Event(
+ [timestamp: new Date().time, eventType: EventType.EXT_WRITE_ROWS, nextPosition: 8] as EventHeaderV4,
+ [tableId: 1, includedColumns: cols,
+ rows : [[2, 'Smith'] as Serializable[]] as List<Serializable[]>] as WriteRowsEventData
+ ))
+
+ // COMMIT
+ client.sendEvent(new Event(
+ [timestamp: new Date().time, eventType: EventType.XID, nextPosition: 14] as EventHeaderV4,
+ {} as EventData
+ ))
+
+ testRunner.run(1, true, false)
+
+ def resultFiles = testRunner.getFlowFilesForRelationship(CaptureChangeMySQL.REL_SUCCESS)
+ // Five events total, 2 max per flow file, so 3 flow files
+ assertEquals(3, resultFiles.size())
+ def json = new JsonSlurper().parseText(new String(resultFiles[0].toByteArray()))
+ assertTrue (json instanceof ArrayList)
+ assertEquals(2, json.size())
+ // BEGIN, INSERT (verifies that one of the INSERTs was skipped); the COMMIT lands in the next flow file
+ assertEquals('begin', json[0]?.type)
+ assertEquals('insert', json[1]?.type)
+
+ json = new JsonSlurper().parseText(new String(resultFiles[1].toByteArray()))
+ assertTrue (json instanceof ArrayList)
+ assertEquals(2, json.size())
+ assertEquals('commit', json[0]?.type)
+ assertEquals('begin', json[1]?.type)
+
+ json = new JsonSlurper().parseText(new String(resultFiles[2].toByteArray()))
+ assertTrue (json instanceof ArrayList)
+ // One event left
+ assertEquals(1, json.size())
+ assertEquals('commit', json[0]?.type)
+ }
+
+ @Test
+ void testSkipTableOneTransactionPerFlowFile() throws Exception {
+ testRunner.setProperty(CaptureChangeMySQL.DRIVER_LOCATION, DRIVER_LOCATION)
+ testRunner.setProperty(CaptureChangeMySQL.HOSTS, 'localhost:3306')
+ testRunner.setProperty(CaptureChangeMySQL.USERNAME, 'root')
+ testRunner.setProperty(CaptureChangeMySQL.PASSWORD, 'password')
+ testRunner.setProperty(CaptureChangeMySQL.CONNECT_TIMEOUT, '2 seconds')
+ testRunner.setProperty(CaptureChangeMySQL.DATABASE_NAME_PATTERN, "myDB")
+ testRunner.setProperty(CaptureChangeMySQL.TABLE_NAME_PATTERN, "user")
+ testRunner.setProperty(CaptureChangeMySQL.INCLUDE_BEGIN_COMMIT, 'true')
+ testRunner.setProperty(CaptureChangeMySQL.EVENTS_PER_FLOWFILE_STRATEGY, FlowFileEventWriteStrategy.ONE_TRANSACTION_PER_FLOWFILE.name())
+
+ testRunner.run(1, false, true)
+
+ // ROTATE
+ client.sendEvent(new Event(
+ [timestamp: new Date().time, eventType: EventType.ROTATE, nextPosition: 2] as EventHeaderV4,
+ [binlogFilename: 'master.000001', binlogPosition: 4L] as RotateEventData
+ ))
+
+ // BEGIN
+ client.sendEvent(new Event(
+ [timestamp: new Date().time, eventType: EventType.QUERY, nextPosition: 4] as EventHeaderV4,
+ [database: 'myDB', sql: 'BEGIN'] as QueryEventData
+ ))
+
+ // TABLE MAP for table not matching the regex (note the s on the end of users vs the regex of 'user')
+ client.sendEvent(new Event(
+ [timestamp: new Date().time, eventType: EventType.TABLE_MAP, nextPosition: 6] as EventHeaderV4,
+ [tableId: 1, database: 'myDB', table: 'users', columnTypes: [4, -4] as byte[]] as TableMapEventData
+ ))
+
+ // This WRITE ROWS should be skipped
+ def cols = new BitSet()
+ cols.set(1)
+ client.sendEvent(new Event(
+ [timestamp: new Date().time, eventType: EventType.EXT_WRITE_ROWS, nextPosition: 8] as EventHeaderV4,
+ [tableId: 1, includedColumns: cols,
+ rows : [[2, 'Smith'] as Serializable[]] as List<Serializable[]>] as WriteRowsEventData
+ ))
+
+ // TABLE MAP for table matching, all modification events (1) should be emitted
+ client.sendEvent(new Event(
+ [timestamp: new Date().time, eventType: EventType.TABLE_MAP, nextPosition: 10] as EventHeaderV4,
+ [tableId: 1, database: 'myDB', table: 'user', columnTypes: [4, -4] as byte[]] as TableMapEventData
+ ))
+
+ // WRITE ROWS for matching table
+ cols = new BitSet()
+ cols.set(1)
+ client.sendEvent(new Event(
+ [timestamp: new Date().time, eventType: EventType.EXT_WRITE_ROWS, nextPosition: 12] as EventHeaderV4,
+ [tableId: 1, includedColumns: cols,
+ rows : [[10, 'Cruz'] as Serializable[]] as List<Serializable[]>] as WriteRowsEventData
+ ))
+
+ // COMMIT
+ client.sendEvent(new Event(
+ [timestamp: new Date().time, eventType: EventType.XID, nextPosition: 14] as EventHeaderV4,
+ {} as EventData
+ ))
+
+ ////////////////////////
+ // Test database filter
+ ////////////////////////
+
+ // BEGIN
+ client.sendEvent(new Event(
+ [timestamp: new Date().time, eventType: EventType.QUERY, nextPosition: 4] as EventHeaderV4,
+ [database: 'myDB', sql: 'BEGIN'] as QueryEventData
+ ))
+
+ // TABLE MAP whose table ('notMyDB') does not match the table regex (the database 'myDB' itself still matches)
+ client.sendEvent(new Event(
+ [timestamp: new Date().time, eventType: EventType.TABLE_MAP, nextPosition: 6] as EventHeaderV4,
+ [tableId: 1, database: 'myDB', table: 'notMyDB', columnTypes: [4, -4] as byte[]] as TableMapEventData
+ ))
+
+ // This WRITE ROWS should be skipped
+ cols = new BitSet()
+ cols.set(1)
+ client.sendEvent(new Event(
+ [timestamp: new Date().time, eventType: EventType.EXT_WRITE_ROWS, nextPosition: 8] as EventHeaderV4,
+ [tableId: 1, includedColumns: cols,
+ rows : [[2, 'Smith'] as Serializable[]] as List<Serializable[]>] as WriteRowsEventData
+ ))
+
+ // COMMIT
+ client.sendEvent(new Event(
+ [timestamp: new Date().time, eventType: EventType.XID, nextPosition: 14] as EventHeaderV4,
+ {} as EventData
+ ))
+
+ testRunner.run(1, true, false)
+
+ def resultFiles = testRunner.getFlowFilesForRelationship(CaptureChangeMySQL.REL_SUCCESS)
+ // Five events total across two transactions, one transaction per flow file, so 2 flow files
+ assertEquals(2, resultFiles.size())
+ def json = new JsonSlurper().parseText(new String(resultFiles[0].toByteArray()))
+ assertTrue (json instanceof ArrayList)
+ assertEquals(3, json.size())
+ // BEGIN, INSERT, COMMIT (verifies that one of the INSERTs was skipped)
+ assertEquals('begin', json[0]?.type)
+ assertEquals('insert', json[1]?.type)
+ assertEquals('commit', json[2]?.type)
+
+ json = new JsonSlurper().parseText(new String(resultFiles[1].toByteArray()))
+ assertTrue (json instanceof ArrayList)
+ // Only two events left
+ assertEquals(2, json.size())
+ assertEquals('begin', json[0]?.type)
+ assertEquals('commit', json[1]?.type)
+ }
+
@Test
void testFilterDatabase() throws Exception {
testRunner.setProperty(CaptureChangeMySQL.DRIVER_LOCATION, DRIVER_LOCATION)