You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2020/09/18 00:56:57 UTC
[nifi] 08/11: NIFI-5583: Add cdc processor for MySQL referring to
GTID.
This is an automated email from the ASF dual-hosted git repository.
alopresto pushed a commit to branch support/nifi-1.12.x
in repository https://gitbox.apache.org/repos/asf/nifi.git
commit 174720104872b43dbaf94a268690d986de7f6b03
Author: yoshiata <yo...@yahoo-corp.jp>
AuthorDate: Mon Jul 2 15:55:50 2018 +0900
NIFI-5583: Add cdc processor for MySQL referring to GTID.
Signed-off-by: Pierre Villard <pi...@gmail.com>
This closes #2997.
---
.../nifi/cdc/mysql/event/BaseBinlogEventInfo.java | 10 ++
.../cdc/mysql/event/BaseBinlogRowEventInfo.java | 6 +
.../cdc/mysql/event/BaseBinlogTableEventInfo.java | 5 +
.../cdc/mysql/event/BeginTransactionEventInfo.java | 5 +
.../nifi/cdc/mysql/event/BinlogEventInfo.java | 3 +
.../mysql/event/CommitTransactionEventInfo.java | 5 +
.../apache/nifi/cdc/mysql/event/DDLEventInfo.java | 5 +
.../nifi/cdc/mysql/event/DeleteRowsEventInfo.java | 4 +
.../nifi/cdc/mysql/event/InsertRowsEventInfo.java | 5 +
.../nifi/cdc/mysql/event/UpdateRowsEventInfo.java | 5 +
.../mysql/event/io/AbstractBinlogEventWriter.java | 19 +-
.../cdc/mysql/processors/CaptureChangeMySQL.java | 177 ++++++++++++++-----
.../mysql/processors/CaptureChangeMySQLTest.groovy | 193 +++++++++++++++++++++
13 files changed, 393 insertions(+), 49 deletions(-)
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BaseBinlogEventInfo.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BaseBinlogEventInfo.java
index 7089cda..9106e93 100644
--- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BaseBinlogEventInfo.java
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BaseBinlogEventInfo.java
@@ -25,6 +25,7 @@ public class BaseBinlogEventInfo extends BaseEventInfo implements BinlogEventInf
private String binlogFilename;
private Long binlogPosition;
+ private String binlogGtidSet;
public BaseBinlogEventInfo(String eventType, Long timestamp, String binlogFilename, Long binlogPosition) {
super(eventType, timestamp);
@@ -32,6 +33,11 @@ public class BaseBinlogEventInfo extends BaseEventInfo implements BinlogEventInf
this.binlogPosition = binlogPosition;
}
+ public BaseBinlogEventInfo(String eventType, Long timestamp, String binlogGtidSet) { // GTID-tracking variant of the constructor
+ super(eventType, timestamp);
+ this.binlogGtidSet = binlogGtidSet; // binlogFilename and binlogPosition are not assigned here, so they stay null
+ }
+
public String getBinlogFilename() {
return binlogFilename;
}
@@ -39,4 +45,8 @@ public class BaseBinlogEventInfo extends BaseEventInfo implements BinlogEventInf
public Long getBinlogPosition() {
return binlogPosition;
}
+
+ public String getBinlogGtidSet() { // implements BinlogEventInfo#getBinlogGtidSet
+ return binlogGtidSet; // null when the event was built with the filename/position constructor
+ }
}
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BaseBinlogRowEventInfo.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BaseBinlogRowEventInfo.java
index 4796947..6517225 100644
--- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BaseBinlogRowEventInfo.java
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BaseBinlogRowEventInfo.java
@@ -37,6 +37,12 @@ public class BaseBinlogRowEventInfo<RowEventDataType> extends BaseBinlogTableEve
this.delegate = new BaseRowEventInfo<>(tableInfo, type, timestamp, rows);
}
+ public BaseBinlogRowEventInfo(TableInfo tableInfo, String type, Long timestamp, String binlogGtidSet, BitSet includedColumns, List<RowEventDataType> rows) { // GTID-tracking variant
+ super(tableInfo, type, timestamp, binlogGtidSet); // records the GTID set instead of a binlog filename/position
+ this.includedColumns = includedColumns;
+ this.delegate = new BaseRowEventInfo<>(tableInfo, type, timestamp, rows); // row data is held by the delegate, as in the filename/position constructor
+ }
+
public BitSet getIncludedColumns() {
return includedColumns;
}
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BaseBinlogTableEventInfo.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BaseBinlogTableEventInfo.java
index c5d3ddf..ea23023 100644
--- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BaseBinlogTableEventInfo.java
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BaseBinlogTableEventInfo.java
@@ -35,6 +35,11 @@ public class BaseBinlogTableEventInfo extends BaseBinlogEventInfo implements Bin
this.delegate = new BaseTableEventInfo(tableInfo, DDL_EVENT, timestamp);
}
+ public BaseBinlogTableEventInfo(TableInfo tableInfo, String eventType, Long timestamp, String binlogGtidSet) { // GTID-tracking variant
+ super(eventType, timestamp, binlogGtidSet);
+ this.delegate = new BaseTableEventInfo(tableInfo, DDL_EVENT, timestamp); // NOTE(review): delegate always gets DDL_EVENT rather than eventType, same as the existing constructor - confirm this is intended
+ }
+
@Override
public String getDatabaseName() {
return delegate.getDatabaseName();
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BeginTransactionEventInfo.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BeginTransactionEventInfo.java
index c6f5af0..ed899f5 100644
--- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BeginTransactionEventInfo.java
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BeginTransactionEventInfo.java
@@ -28,6 +28,11 @@ public class BeginTransactionEventInfo extends BaseBinlogEventInfo {
this.databaseName = databaseName;
}
+ public BeginTransactionEventInfo(String databaseName, Long timestamp, String binlogGtidSet) { // GTID-tracking variant
+ super(BEGIN_EVENT, timestamp, binlogGtidSet); // event type is fixed to BEGIN_EVENT; position is identified by GTID set only
+ this.databaseName = databaseName;
+ }
+
public String getDatabaseName() {
return databaseName;
}
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BinlogEventInfo.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BinlogEventInfo.java
index 6398d4f..9784fc4 100644
--- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BinlogEventInfo.java
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BinlogEventInfo.java
@@ -25,8 +25,11 @@ public interface BinlogEventInfo extends EventInfo {
String BINLOG_FILENAME_KEY = "binlog.filename";
String BINLOG_POSITION_KEY = "binlog.position";
+ String BINLOG_GTIDSET_KEY = "binlog.gtidset";
String getBinlogFilename();
Long getBinlogPosition();
+
+ String getBinlogGtidSet();
}
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/CommitTransactionEventInfo.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/CommitTransactionEventInfo.java
index b4a230a..a5406ea 100644
--- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/CommitTransactionEventInfo.java
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/CommitTransactionEventInfo.java
@@ -29,6 +29,11 @@ public class CommitTransactionEventInfo extends BaseBinlogEventInfo {
this.databaseName = databaseName;
}
+ public CommitTransactionEventInfo(String databaseName, Long timestamp, String binlogGtidSet) { // GTID-tracking variant
+ super(COMMIT_EVENT, timestamp, binlogGtidSet); // event type is fixed to COMMIT_EVENT; position is identified by GTID set only
+ this.databaseName = databaseName;
+ }
+
public String getDatabaseName() {
return databaseName;
}
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/DDLEventInfo.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/DDLEventInfo.java
index bc2c871..6da8d80 100644
--- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/DDLEventInfo.java
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/DDLEventInfo.java
@@ -32,6 +32,11 @@ public class DDLEventInfo extends BaseBinlogTableEventInfo implements TableEvent
this.query = query;
}
+ public DDLEventInfo(TableInfo tableInfo, Long timestamp, String binlogGtidSet, String query) { // GTID-tracking variant
+ super(tableInfo, DDL_EVENT, timestamp, binlogGtidSet); // event type is fixed to DDL_EVENT
+ this.query = query; // the value later returned by getQuery()
+ }
+
public String getQuery() {
return query;
}
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/DeleteRowsEventInfo.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/DeleteRowsEventInfo.java
index 3b538ec..98db096 100644
--- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/DeleteRowsEventInfo.java
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/DeleteRowsEventInfo.java
@@ -29,4 +29,8 @@ public class DeleteRowsEventInfo extends BaseBinlogRowEventInfo<Serializable[]>
public DeleteRowsEventInfo(TableInfo tableInfo, Long timestamp, String binlogFilename, Long binlogPosition, DeleteRowsEventData data) {
super(tableInfo, DELETE_EVENT, timestamp, binlogFilename, binlogPosition, data.getIncludedColumns(), data.getRows());
}
+
+ public DeleteRowsEventInfo(TableInfo tableInfo, Long timestamp, String binlogGtidSet, DeleteRowsEventData data) { // GTID-tracking variant
+ super(tableInfo, DELETE_EVENT, timestamp, binlogGtidSet, data.getIncludedColumns(), data.getRows()); // included columns and rows are taken from the event data, as in the filename/position constructor
+ }
}
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/InsertRowsEventInfo.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/InsertRowsEventInfo.java
index 7d75511..edce548 100644
--- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/InsertRowsEventInfo.java
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/InsertRowsEventInfo.java
@@ -32,4 +32,9 @@ public class InsertRowsEventInfo extends BaseBinlogRowEventInfo<Serializable[]>
super(tableInfo, INSERT_EVENT, timestamp, binlogFilename, binlogPosition, data.getIncludedColumns(), data.getRows());
this.data = data;
}
+
+ public InsertRowsEventInfo(TableInfo tableInfo, Long timestamp, String binlogGtidSet, WriteRowsEventData data) { // GTID-tracking variant
+ super(tableInfo, INSERT_EVENT, timestamp, binlogGtidSet, data.getIncludedColumns(), data.getRows()); // included columns and rows are taken from the event data
+ this.data = data; // the event data itself is also kept, as in the filename/position constructor
+ }
}
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/UpdateRowsEventInfo.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/UpdateRowsEventInfo.java
index 2e20dc6..14edcaa 100644
--- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/UpdateRowsEventInfo.java
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/UpdateRowsEventInfo.java
@@ -35,6 +35,11 @@ public class UpdateRowsEventInfo extends BaseBinlogRowEventInfo<Map.Entry<Serial
includedColumnsBeforeUpdate = data.getIncludedColumnsBeforeUpdate();
}
+ public UpdateRowsEventInfo(TableInfo tableInfo, Long timestamp, String binlogGtidSet, UpdateRowsEventData data) { // GTID-tracking variant
+ super(tableInfo, UPDATE_EVENT, timestamp, binlogGtidSet, data.getIncludedColumns(), data.getRows()); // included columns and rows are taken from the event data
+ includedColumnsBeforeUpdate = data.getIncludedColumnsBeforeUpdate(); // before-update column set is kept separately, as in the filename/position constructor
+ }
+
public BitSet getIncludedColumnsBeforeUpdate() {
return includedColumnsBeforeUpdate;
}
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/AbstractBinlogEventWriter.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/AbstractBinlogEventWriter.java
index df4424c..09186f2 100644
--- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/AbstractBinlogEventWriter.java
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/AbstractBinlogEventWriter.java
@@ -33,8 +33,14 @@ import java.util.Map;
public abstract class AbstractBinlogEventWriter<T extends BinlogEventInfo> extends AbstractEventWriter<T> {
protected void writeJson(T event) throws IOException {
- jsonGenerator.writeStringField("binlog_filename", event.getBinlogFilename());
- jsonGenerator.writeNumberField("binlog_position", event.getBinlogPosition());
+ String gtidSet = event.getBinlogGtidSet(); // null for events built with the filename/position constructors
+
+ if (gtidSet == null) { // filename/position tracking: emit the two legacy fields
+ jsonGenerator.writeStringField("binlog_filename", event.getBinlogFilename());
+ jsonGenerator.writeNumberField("binlog_position", event.getBinlogPosition());
+ } else { // GTID tracking: emit only the GTID set, reusing the value that was null-checked above
+ jsonGenerator.writeStringField("binlog_gtidset", gtidSet);
+ }
}
protected Map<String, String> getCommonAttributes(final long sequenceId, BinlogEventInfo eventInfo) {
@@ -42,8 +48,13 @@ public abstract class AbstractBinlogEventWriter<T extends BinlogEventInfo> exten
{
put(SEQUENCE_ID_KEY, Long.toString(sequenceId));
put(CDC_EVENT_TYPE_ATTRIBUTE, eventInfo.getEventType());
- put(BinlogEventInfo.BINLOG_FILENAME_KEY, eventInfo.getBinlogFilename());
- put(BinlogEventInfo.BINLOG_POSITION_KEY, Long.toString(eventInfo.getBinlogPosition()));
+ String gtidSet = eventInfo.getBinlogGtidSet();
+ if (gtidSet == null) {
+ put(BinlogEventInfo.BINLOG_FILENAME_KEY, eventInfo.getBinlogFilename());
+ put(BinlogEventInfo.BINLOG_POSITION_KEY, Long.toString(eventInfo.getBinlogPosition()));
+ } else {
+ put(BinlogEventInfo.BINLOG_GTIDSET_KEY, gtidSet);
+ }
put(CoreAttributes.MIME_TYPE.key(), APPLICATION_JSON);
}
};
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
index 1fcbfa6..edeb49c 100644
--- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
@@ -26,9 +26,11 @@ import static com.github.shyiko.mysql.binlog.event.EventType.ROTATE;
import static com.github.shyiko.mysql.binlog.event.EventType.WRITE_ROWS;
import com.github.shyiko.mysql.binlog.BinaryLogClient;
+import com.github.shyiko.mysql.binlog.GtidSet;
import com.github.shyiko.mysql.binlog.event.Event;
import com.github.shyiko.mysql.binlog.event.EventHeaderV4;
import com.github.shyiko.mysql.binlog.event.EventType;
+import com.github.shyiko.mysql.binlog.event.GtidEventData;
import com.github.shyiko.mysql.binlog.event.QueryEventData;
import com.github.shyiko.mysql.binlog.event.RotateEventData;
import com.github.shyiko.mysql.binlog.event.TableMapEventData;
@@ -314,7 +316,8 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
.name("capture-change-mysql-init-binlog-filename")
.displayName("Initial Binlog Filename")
.description("Specifies an initial binlog filename to use if this processor's State does not have a current binlog filename. If a filename is present "
- + "in the processor's State, this property is ignored. This can be used along with Initial Binlog Position to \"skip ahead\" if previous events are not desired. "
+ + "in the processor's State or the \"Use Binlog GTID\" property is set to true, this property is ignored. "
+ + "This can be used along with Initial Binlog Position to \"skip ahead\" if previous events are not desired. "
+ "Note that NiFi Expression Language is supported, but this property is evaluated when the processor is configured, so FlowFile attributes may not be used. Expression "
+ "Language is supported to enable the use of the Variable Registry and/or environment properties.")
.required(false)
@@ -326,30 +329,57 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
.name("capture-change-mysql-init-binlog-position")
.displayName("Initial Binlog Position")
.description("Specifies an initial offset into a binlog (specified by Initial Binlog Filename) to use if this processor's State does not have a current "
- + "binlog filename. If a filename is present in the processor's State, this property is ignored. This can be used along with Initial Binlog Filename "
- + "to \"skip ahead\" if previous events are not desired. Note that NiFi Expression Language is supported, but this property is evaluated when the "
- + "processor is configured, so FlowFile attributes may not be used. Expression Language is supported to enable the use of the Variable Registry "
- + "and/or environment properties.")
+ + "binlog filename. If a filename is present in the processor's State or the \"Use Binlog GTID\" property is set to true, this property is ignored. "
+ + "This can be used along with Initial Binlog Filename to \"skip ahead\" if previous events are not desired. Note that NiFi Expression Language "
+ + "is supported, but this property is evaluated when the processor is configured, so FlowFile attributes may not be used. Expression Language is "
+ + "supported to enable the use of the Variable Registry and/or environment properties.")
.required(false)
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
+ public static final PropertyDescriptor USE_BINLOG_GTID = new PropertyDescriptor.Builder() // switches the processor between GTID-set tracking and binlog filename/position tracking
+ .name("capture-change-mysql-use-gtid")
+ .displayName("Use Binlog GTID")
+ .description("Specifies whether to use Global Transaction ID (GTID) for binlog tracking. If set to true, processor's state of binlog file name and position is ignored. "
+ + "The main benefit of using GTID is to have much more reliable failover than using binlog filename/position.")
+ .required(true)
+ .allowableValues("true", "false")
+ .defaultValue("false") // GTID tracking is opt-in; by default the processor keeps tracking by binlog filename/position
+ .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor INIT_BINLOG_GTID = new PropertyDescriptor.Builder() // starting GTID set, consulted only when State holds none
+ .name("capture-change-mysql-init-gtid")
+ .displayName("Initial Binlog GTID")
+ .description("Specifies an initial GTID to use if this processor's State does not have a current GTID. "
+ + "If a GTID is present in the processor's State or \"Use Binlog GTID\" property is set to false, this property is ignored. "
+ + "This can be used to \"skip ahead\" if previous events are not desired. "
+ + "Note that NiFi Expression Language is supported, but this property is evaluated when the processor is configured, so FlowFile attributes may not be used. "
+ + "Expression Language is supported to enable the use of the Variable Registry and/or environment properties.")
+ .required(false) // with no GTID in State and this property unset, the processor starts from the empty GTID set
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .build();
+
private static final List<PropertyDescriptor> propDescriptors;
private volatile ProcessSession currentSession;
private BinaryLogClient binlogClient;
private BinlogEventListener eventListener;
private BinlogLifecycleListener lifecycleListener;
+ private GtidSet gtidSet;
private final LinkedBlockingQueue<RawBinlogEvent> queue = new LinkedBlockingQueue<>();
private volatile String currentBinlogFile = null;
private volatile long currentBinlogPosition = 4;
+ private volatile String currentGtidSet = null;
- // The following variables save the value of the binlog filename and position (and sequence id) at the beginning of a transaction. Used for rollback
+ // The following variables save the value of the binlog filename, position, (sequence id), and gtid at the beginning of a transaction. Used for rollback
private volatile String xactBinlogFile = null;
private volatile long xactBinlogPosition = 4;
private volatile long xactSequenceId = 0;
+ private volatile String xactGtidSet = null;
private volatile TableInfo currentTable = null;
private volatile String currentDatabase = null;
@@ -357,6 +387,7 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
private volatile Pattern tableNamePattern;
private volatile boolean includeBeginCommit = false;
private volatile boolean includeDDLEvents = false;
+ private volatile boolean useGtid = false;
private volatile boolean inTransaction = false;
private volatile boolean skipTable = false;
@@ -408,6 +439,8 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
pds.add(INIT_SEQUENCE_ID);
pds.add(INIT_BINLOG_FILENAME);
pds.add(INIT_BINLOG_POSITION);
+ pds.add(USE_BINLOG_GTID);
+ pds.add(INIT_BINLOG_GTID);
propDescriptors = Collections.unmodifiableList(pds);
}
@@ -450,32 +483,48 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
includeBeginCommit = context.getProperty(INCLUDE_BEGIN_COMMIT).asBoolean();
includeDDLEvents = context.getProperty(INCLUDE_DDL_EVENTS).asBoolean();
-
- // Set current binlog filename to whatever is in State, falling back to the Retrieve All Records then Initial Binlog Filename if no State variable is present
- currentBinlogFile = stateMap.get(BinlogEventInfo.BINLOG_FILENAME_KEY);
- if (currentBinlogFile == null) {
- if (!getAllRecords) {
- if (context.getProperty(INIT_BINLOG_FILENAME).isSet()) {
- currentBinlogFile = context.getProperty(INIT_BINLOG_FILENAME).evaluateAttributeExpressions().getValue();
+ useGtid = context.getProperty(USE_BINLOG_GTID).asBoolean();
+
+ if (useGtid) {
+ // Set current gtid to whatever is in State, falling back to the Retrieve All Records then Initial Gtid if no State variable is present
+ currentGtidSet = stateMap.get(BinlogEventInfo.BINLOG_GTIDSET_KEY);
+ if (currentGtidSet == null) {
+ if (!getAllRecords && context.getProperty(INIT_BINLOG_GTID).isSet()) {
+ currentGtidSet = context.getProperty(INIT_BINLOG_GTID).evaluateAttributeExpressions().getValue();
+ } else {
+ // No GTID in State and no usable Initial Binlog GTID (either retrieving all records or the property is unset): start from the empty GTID set, which must be the empty string (not null)
+ currentGtidSet = "";
+ }
+ }
+ currentBinlogFile = "";
+ currentBinlogPosition = DO_NOT_SET;
+ } else {
+ // Set current binlog filename to whatever is in State, falling back to the Retrieve All Records then Initial Binlog Filename if no State variable is present
+ currentBinlogFile = stateMap.get(BinlogEventInfo.BINLOG_FILENAME_KEY);
+ if (currentBinlogFile == null) {
+ if (!getAllRecords) {
+ if (context.getProperty(INIT_BINLOG_FILENAME).isSet()) {
+ currentBinlogFile = context.getProperty(INIT_BINLOG_FILENAME).evaluateAttributeExpressions().getValue();
+ }
+ } else {
+ // If we're starting from the beginning of all binlogs, the binlog filename must be the empty string (not null)
+ currentBinlogFile = "";
}
- } else {
- // If we're starting from the beginning of all binlogs, the binlog filename must be the empty string (not null)
- currentBinlogFile = "";
}
- }
- // Set current binlog position to whatever is in State, falling back to the Retrieve All Records then Initial Binlog Filename if no State variable is present
- String binlogPosition = stateMap.get(BinlogEventInfo.BINLOG_POSITION_KEY);
- if (binlogPosition != null) {
- currentBinlogPosition = Long.valueOf(binlogPosition);
- } else if (!getAllRecords) {
- if (context.getProperty(INIT_BINLOG_POSITION).isSet()) {
- currentBinlogPosition = context.getProperty(INIT_BINLOG_POSITION).evaluateAttributeExpressions().asLong();
+ // Set current binlog position to whatever is in State, falling back to the Retrieve All Records then Initial Binlog Position if no State variable is present
+ String binlogPosition = stateMap.get(BinlogEventInfo.BINLOG_POSITION_KEY);
+ if (binlogPosition != null) {
+ currentBinlogPosition = Long.valueOf(binlogPosition);
+ } else if (!getAllRecords) {
+ if (context.getProperty(INIT_BINLOG_POSITION).isSet()) {
+ currentBinlogPosition = context.getProperty(INIT_BINLOG_POSITION).evaluateAttributeExpressions().asLong();
+ } else {
+ currentBinlogPosition = DO_NOT_SET;
+ }
} else {
- currentBinlogPosition = DO_NOT_SET;
+ currentBinlogPosition = -1;
}
- } else {
- currentBinlogPosition = -1;
}
// Get current sequence ID from state
@@ -571,7 +620,7 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
long timeSinceLastUpdate = now - lastStateUpdate;
if (stateUpdateInterval != 0 && timeSinceLastUpdate >= stateUpdateInterval) {
- updateState(stateManager, currentBinlogFile, currentBinlogPosition, currentSequenceId.get());
+ updateState(stateManager, currentBinlogFile, currentBinlogPosition, currentSequenceId.get(), currentGtidSet);
lastStateUpdate = now;
}
} catch (IOException ioe) {
@@ -580,6 +629,7 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
currentBinlogFile = xactBinlogFile == null ? "" : xactBinlogFile;
currentBinlogPosition = xactBinlogPosition;
currentSequenceId.set(xactSequenceId);
+ currentGtidSet = xactGtidSet;
inTransaction = false;
stop(stateManager);
queue.clear();
@@ -681,6 +731,9 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
binlogClient.setBinlogPosition(currentBinlogPosition);
}
+ binlogClient.setGtidSet(currentGtidSet);
+ binlogClient.setGtidSetFallbackToPurged(true);
+
if (serverId != null) {
binlogClient.setServerId(serverId);
}
@@ -719,6 +772,7 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
}
}
+ gtidSet = new GtidSet(binlogClient.getGtidSet());
doStop.set(false);
}
@@ -736,7 +790,7 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
// We always get ROTATE and FORMAT_DESCRIPTION messages no matter where we start (even from the end), and they won't have the correct "next position" value, so only
// advance the position if it is not that type of event. ROTATE events don't generate output CDC events and have the current binlog position in a special field, which
// is filled in during the ROTATE case
- if (eventType != ROTATE && eventType != FORMAT_DESCRIPTION) {
+ if (eventType != ROTATE && eventType != FORMAT_DESCRIPTION && !useGtid) {
currentBinlogPosition = header.getPosition();
}
log.debug("Got message event type: {} ", new Object[]{header.getEventType().toString()});
@@ -790,13 +844,16 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
if (inTransaction) {
throw new IOException("BEGIN event received while already processing a transaction. This could indicate that your binlog position is invalid.");
}
- // Mark the current binlog position in case we have to rollback the transaction (if the processor is stopped, e.g.)
+ // Mark the current binlog position and GTID in case we have to rollback the transaction (if the processor is stopped, e.g.)
xactBinlogFile = currentBinlogFile;
xactBinlogPosition = currentBinlogPosition;
xactSequenceId = currentSequenceId.get();
+ xactGtidSet = currentGtidSet;
if (includeBeginCommit && (databaseNamePattern == null || databaseNamePattern.matcher(currentDatabase).matches())) {
- BeginTransactionEventInfo beginEvent = new BeginTransactionEventInfo(currentDatabase, timestamp, currentBinlogFile, currentBinlogPosition);
+ BeginTransactionEventInfo beginEvent = useGtid
+ ? new BeginTransactionEventInfo(currentDatabase, timestamp, currentGtidSet)
+ : new BeginTransactionEventInfo(currentDatabase, timestamp, currentBinlogFile, currentBinlogPosition);
currentSequenceId.set(beginEventWriter.writeEvent(currentSession, transitUri, beginEvent, currentSequenceId.get(), REL_SUCCESS));
}
inTransaction = true;
@@ -807,7 +864,9 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
}
// InnoDB generates XID events for "commit", but MyISAM generates Query events with "COMMIT", so handle that here
if (includeBeginCommit && (databaseNamePattern == null || databaseNamePattern.matcher(currentDatabase).matches())) {
- CommitTransactionEventInfo commitTransactionEvent = new CommitTransactionEventInfo(currentDatabase, timestamp, currentBinlogFile, currentBinlogPosition);
+ CommitTransactionEventInfo commitTransactionEvent = useGtid
+ ? new CommitTransactionEventInfo(currentDatabase, timestamp, currentGtidSet)
+ : new CommitTransactionEventInfo(currentDatabase, timestamp, currentBinlogFile, currentBinlogPosition);
currentSequenceId.set(commitEventWriter.writeEvent(currentSession, transitUri, commitTransactionEvent, currentSequenceId.get(), REL_SUCCESS));
}
// Commit the NiFi session
@@ -829,7 +888,9 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
if (includeDDLEvents && (databaseNamePattern == null || databaseNamePattern.matcher(currentDatabase).matches())) {
// If we don't have table information, we can still use the database name
TableInfo ddlTableInfo = (currentTable != null) ? currentTable : new TableInfo(currentDatabase, null, null, null);
- DDLEventInfo ddlEvent = new DDLEventInfo(ddlTableInfo, timestamp, currentBinlogFile, currentBinlogPosition, sql);
+ DDLEventInfo ddlEvent = useGtid
+ ? new DDLEventInfo(ddlTableInfo, timestamp, currentGtidSet, sql)
+ : new DDLEventInfo(ddlTableInfo, timestamp, currentBinlogFile, currentBinlogPosition, sql);
currentSequenceId.set(ddlEventWriter.writeEvent(currentSession, transitUri, ddlEvent, currentSequenceId.get(), REL_SUCCESS));
}
// Remove all the keys from the cache that this processor added
@@ -850,7 +911,9 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
+ "This could indicate that your binlog position is invalid.");
}
if (includeBeginCommit && (databaseNamePattern == null || databaseNamePattern.matcher(currentDatabase).matches())) {
- CommitTransactionEventInfo commitTransactionEvent = new CommitTransactionEventInfo(currentDatabase, timestamp, currentBinlogFile, currentBinlogPosition);
+ CommitTransactionEventInfo commitTransactionEvent = useGtid
+ ? new CommitTransactionEventInfo(currentDatabase, timestamp, currentGtidSet)
+ : new CommitTransactionEventInfo(currentDatabase, timestamp, currentBinlogFile, currentBinlogPosition);
currentSequenceId.set(commitEventWriter.writeEvent(currentSession, transitUri, commitTransactionEvent, currentSequenceId.get(), REL_SUCCESS));
}
// Commit the NiFi session
@@ -887,29 +950,47 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
|| eventType == EXT_WRITE_ROWS
|| eventType == PRE_GA_WRITE_ROWS) {
- InsertRowsEventInfo eventInfo = new InsertRowsEventInfo(currentTable, timestamp, currentBinlogFile, currentBinlogPosition, event.getData());
+ InsertRowsEventInfo eventInfo = useGtid
+ ? new InsertRowsEventInfo(currentTable, timestamp, currentGtidSet, event.getData())
+ : new InsertRowsEventInfo(currentTable, timestamp, currentBinlogFile, currentBinlogPosition, event.getData());
currentSequenceId.set(insertRowsWriter.writeEvent(currentSession, transitUri, eventInfo, currentSequenceId.get(), REL_SUCCESS));
} else if (eventType == DELETE_ROWS
|| eventType == EXT_DELETE_ROWS
|| eventType == PRE_GA_DELETE_ROWS) {
- DeleteRowsEventInfo eventInfo = new DeleteRowsEventInfo(currentTable, timestamp, currentBinlogFile, currentBinlogPosition, event.getData());
+ DeleteRowsEventInfo eventInfo = useGtid
+ ? new DeleteRowsEventInfo(currentTable, timestamp, currentGtidSet, event.getData())
+ : new DeleteRowsEventInfo(currentTable, timestamp, currentBinlogFile, currentBinlogPosition, event.getData());
currentSequenceId.set(deleteRowsWriter.writeEvent(currentSession, transitUri, eventInfo, currentSequenceId.get(), REL_SUCCESS));
} else {
// Update event
- UpdateRowsEventInfo eventInfo = new UpdateRowsEventInfo(currentTable, timestamp, currentBinlogFile, currentBinlogPosition, event.getData());
+ UpdateRowsEventInfo eventInfo = useGtid
+ ? new UpdateRowsEventInfo(currentTable, timestamp, currentGtidSet, event.getData())
+ : new UpdateRowsEventInfo(currentTable, timestamp, currentBinlogFile, currentBinlogPosition, event.getData());
currentSequenceId.set(updateRowsWriter.writeEvent(currentSession, transitUri, eventInfo, currentSequenceId.get(), REL_SUCCESS));
}
break;
case ROTATE:
- // Update current binlog filename
- RotateEventData rotateEventData = event.getData();
- currentBinlogFile = rotateEventData.getBinlogFilename();
- currentBinlogPosition = rotateEventData.getBinlogPosition();
+ if (!useGtid) {
+ // Update current binlog filename
+ RotateEventData rotateEventData = event.getData();
+ currentBinlogFile = rotateEventData.getBinlogFilename();
+ currentBinlogPosition = rotateEventData.getBinlogPosition();
+ }
break;
+
+ case GTID:
+ if (useGtid) {
+ // Update current binlog gtid
+ GtidEventData gtidEventData = event.getData();
+ gtidSet.add(gtidEventData.getGtid());
+ currentGtidSet = gtidSet.toString();
+ }
+ break;
+
default:
break;
}
@@ -917,7 +998,7 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
// Advance the current binlog position. This way if no more events are received and the processor is stopped, it will resume after the event that was just processed.
// We always get ROTATE and FORMAT_DESCRIPTION messages no matter where we start (even from the end), and they won't have the correct "next position" value, so only
// advance the position if it is not that type of event.
- if (eventType != ROTATE && eventType != FORMAT_DESCRIPTION) {
+ if (eventType != ROTATE && eventType != FORMAT_DESCRIPTION && !useGtid) {
currentBinlogPosition = header.getNextPosition();
}
}
@@ -937,7 +1018,7 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
doStop.set(true);
if (hasRun.getAndSet(false)) {
- updateState(stateManager, currentBinlogFile, currentBinlogPosition, currentSequenceId.get());
+ updateState(stateManager, currentBinlogFile, currentBinlogPosition, currentSequenceId.get(), currentGtidSet);
}
currentBinlogPosition = -1;
@@ -952,17 +1033,23 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
}
}
- private void updateState(StateManager stateManager, String binlogFile, long binlogPosition, long sequenceId) throws IOException {
+ private void updateState(StateManager stateManager, String binlogFile, long binlogPosition, long sequenceId, String gtidSet) throws IOException {
// Update state with latest values
if (stateManager != null) {
Map<String, String> newStateMap = new HashMap<>(stateManager.getState(Scope.CLUSTER).toMap());
- // Save current binlog filename and position to the state map
+ // Save current binlog filename, position and GTID set to the state map. The map starts as a copy of the existing cluster state, and the filename and GTID set are written only when non-null, so a null value leaves any previously stored entry in place.
if (binlogFile != null) {
newStateMap.put(BinlogEventInfo.BINLOG_FILENAME_KEY, binlogFile);
}
+
newStateMap.put(BinlogEventInfo.BINLOG_POSITION_KEY, Long.toString(binlogPosition));
newStateMap.put(EventWriter.SEQUENCE_ID_KEY, String.valueOf(sequenceId));
+
+ if (gtidSet != null) {
+ newStateMap.put(BinlogEventInfo.BINLOG_GTIDSET_KEY, gtidSet);
+ }
+
stateManager.setState(newStateMap, Scope.CLUSTER);
}
}
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy
index 5df44f3..bde8479 100644
--- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy
@@ -22,6 +22,7 @@ import com.github.shyiko.mysql.binlog.event.Event
import com.github.shyiko.mysql.binlog.event.EventData
import com.github.shyiko.mysql.binlog.event.EventHeaderV4
import com.github.shyiko.mysql.binlog.event.EventType
+import com.github.shyiko.mysql.binlog.event.GtidEventData
import com.github.shyiko.mysql.binlog.event.QueryEventData
import com.github.shyiko.mysql.binlog.event.RotateEventData
import com.github.shyiko.mysql.binlog.event.TableMapEventData
@@ -825,11 +826,13 @@ class CaptureChangeMySQLTest {
// Ensure state not set, as the processor hasn't been stopped and no State Update Interval has been set
testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_FILENAME_KEY, null, Scope.CLUSTER)
testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_POSITION_KEY, null, Scope.CLUSTER)
+ testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_GTIDSET_KEY, null, Scope.CLUSTER)
// Stop the processor and verify the state is set
testRunner.run(1, true, false)
testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_FILENAME_KEY, 'master.000001', Scope.CLUSTER)
testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_POSITION_KEY, '4', Scope.CLUSTER)
+ testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_GTIDSET_KEY, null, Scope.CLUSTER)
testRunner.stateManager.clear(Scope.CLUSTER)
@@ -855,6 +858,7 @@ class CaptureChangeMySQLTest {
testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_FILENAME_KEY, 'master.000001', Scope.CLUSTER)
testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_POSITION_KEY, '6', Scope.CLUSTER)
+ testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_GTIDSET_KEY, null, Scope.CLUSTER)
// COMMIT
client.sendEvent(new Event(
@@ -866,7 +870,106 @@ class CaptureChangeMySQLTest {
testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_FILENAME_KEY, 'master.000001', Scope.CLUSTER)
testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_POSITION_KEY, '12', Scope.CLUSTER)
+ testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_GTIDSET_KEY, null, Scope.CLUSTER)
+ }
+
+ @Test
+ void testUpdateStateUseGtid() throws Exception {
+ testRunner.setProperty(CaptureChangeMySQL.DRIVER_LOCATION, 'file:///path/to/mysql-connector-java-5.1.38-bin.jar')
+ testRunner.setProperty(CaptureChangeMySQL.HOSTS, 'localhost:3306')
+ testRunner.setProperty(CaptureChangeMySQL.USERNAME, 'root')
+ testRunner.setProperty(CaptureChangeMySQL.PASSWORD, 'password')
+ testRunner.setProperty(CaptureChangeMySQL.CONNECT_TIMEOUT, '2 seconds')
+ testRunner.setProperty(CaptureChangeMySQL.USE_BINLOG_GTID, 'true')
+
+ testRunner.run(1, false, true)
+
+ // GTID event announcing transaction 1; with USE_BINLOG_GTID set to 'true' it is expected to show up in the stored GTID set as 1-1
+ client.sendEvent(new Event(
+ [timestamp: new Date().time, eventType: EventType.GTID, nextPosition: 2] as EventHeaderV4,
+ [gtid: 'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx:1'] as GtidEventData
+ ))
+
+ // BEGIN: query event opening a transaction on database myDB
+ client.sendEvent(new Event(
+ [timestamp: new Date().time, eventType: EventType.QUERY, nextPosition: 4] as EventHeaderV4,
+ [database: 'myDB', sql: 'BEGIN'] as QueryEventData
+ ))
+
+ // COMMIT: XID event closing the transaction
+ client.sendEvent(new Event(
+ [timestamp: new Date().time, eventType: EventType.XID, nextPosition: 6] as EventHeaderV4,
+ {} as EventData
+ ))
+
+ testRunner.run(1, false, false)
+
+ // Ensure state not set, as the processor hasn't been stopped and no State Update Interval has been set
+ testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_FILENAME_KEY, null, Scope.CLUSTER)
+ testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_POSITION_KEY, null, Scope.CLUSTER)
+ testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_GTIDSET_KEY, null, Scope.CLUSTER)
+
+ // Stop the processor and verify the state is set: the GTID set holds transaction 1, while filename '' and position '-1000' are presumably the placeholders used when tracking by GTID (confirm against CaptureChangeMySQL)
+ testRunner.run(1, true, false)
+ testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_FILENAME_KEY, '', Scope.CLUSTER)
+ testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_POSITION_KEY, '-1000', Scope.CLUSTER)
+ testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_GTIDSET_KEY, 'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx:1-1', Scope.CLUSTER)
+
+ testRunner.stateManager.clear(Scope.CLUSTER)
+
+ // Send some events, wait for the State Update Interval, and verify the state was set even though the processor has not been stopped
+ testRunner.setProperty(CaptureChangeMySQL.STATE_UPDATE_INTERVAL, '1 second')
+ testRunner.run(1, false, true)
+
+ // GTID event for transaction 2; the state was cleared above, so the expected GTID set starts again at 2-2
+ client.sendEvent(new Event(
+ [timestamp: new Date().time, eventType: EventType.GTID, nextPosition: 8] as EventHeaderV4,
+ [gtid: 'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx:2'] as GtidEventData
+ ))
+
+ // BEGIN: query event opening a transaction on database myDB
+ client.sendEvent(new Event(
+ [timestamp: new Date().time, eventType: EventType.QUERY, nextPosition: 10] as EventHeaderV4,
+ [database: 'myDB', sql: 'BEGIN'] as QueryEventData
+ ))
+
+ // COMMIT: XID event closing the transaction
+ client.sendEvent(new Event(
+ [timestamp: new Date().time, eventType: EventType.XID, nextPosition: 12] as EventHeaderV4,
+ {} as EventData
+ ))
+
+ sleep(1000)
+
+ testRunner.run(1, false, false)
+
+ testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_FILENAME_KEY, '', Scope.CLUSTER)
+ testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_POSITION_KEY, '-1000', Scope.CLUSTER)
+ testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_GTIDSET_KEY, 'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx:2-2', Scope.CLUSTER)
+
+ // GTID event for transaction 3; expected to extend the stored set to the range 2-3 once the processor is stopped
+ client.sendEvent(new Event(
+ [timestamp: new Date().time, eventType: EventType.GTID, nextPosition: 14] as EventHeaderV4,
+ [gtid: 'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx:3'] as GtidEventData
+ ))
+
+ // BEGIN: query event opening a transaction on database myDB
+ client.sendEvent(new Event(
+ [timestamp: new Date().time, eventType: EventType.QUERY, nextPosition: 16] as EventHeaderV4,
+ [database: 'myDB', sql: 'BEGIN'] as QueryEventData
+ ))
+
+ // COMMIT: XID event closing the transaction
+ client.sendEvent(new Event(
+ [timestamp: new Date().time, eventType: EventType.XID, nextPosition: 18] as EventHeaderV4,
+ {} as EventData
+ ))
+
+ testRunner.run(1, true, false)
+ testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_FILENAME_KEY, '', Scope.CLUSTER)
+ testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_POSITION_KEY, '-1000', Scope.CLUSTER)
+ testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_GTIDSET_KEY, 'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx:2-3', Scope.CLUSTER)
}
@Test
@@ -936,6 +1039,96 @@ class CaptureChangeMySQLTest {
assertEquals(1, resultFiles.size())
}
+ @Test
+ void testInitialGtidIgnoredWhenStatePresent() throws Exception {
+ testRunner.setProperty(CaptureChangeMySQL.DRIVER_LOCATION, 'file:///path/to/mysql-connector-java-5.1.38-bin.jar')
+ testRunner.setProperty(CaptureChangeMySQL.HOSTS, 'localhost:3306')
+ testRunner.setProperty(CaptureChangeMySQL.USERNAME, 'root')
+ testRunner.setProperty(CaptureChangeMySQL.CONNECT_TIMEOUT, '2 seconds')
+ testRunner.setProperty(CaptureChangeMySQL.USE_BINLOG_GTID, 'true')
+ testRunner.setProperty(CaptureChangeMySQL.INIT_BINLOG_GTID, 'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx:1')
+ testRunner.setProperty(CaptureChangeMySQL.INIT_SEQUENCE_ID, '10')
+ testRunner.setProperty(CaptureChangeMySQL.RETRIEVE_ALL_RECORDS, 'false')
+ testRunner.setProperty(CaptureChangeMySQL.INCLUDE_BEGIN_COMMIT, 'true')
+ testRunner.getStateManager().setState([
+ ("${BinlogEventInfo.BINLOG_GTIDSET_KEY}".toString()): 'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx:2',
+ ("${EventWriter.SEQUENCE_ID_KEY}".toString()): '1'
+ ], Scope.CLUSTER)
+
+ testRunner.run(1, false, true)
+
+ // GTID event for transaction 3; the expected set 2-3 asserted below means it is merged with the GTID restored from state (:2), not with INIT_BINLOG_GTID (:1)
+ client.sendEvent(new Event(
+ [timestamp: new Date().time, eventType: EventType.GTID, nextPosition: 2] as EventHeaderV4,
+ [gtid: 'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx:3'] as GtidEventData
+ ))
+
+ // BEGIN: query event opening a transaction on myDB; INCLUDE_BEGIN_COMMIT is 'true', so this presumably accounts for one of the two expected flow files
+ client.sendEvent(new Event(
+ [timestamp: new Date().time, eventType: EventType.QUERY, nextPosition: 4] as EventHeaderV4,
+ [database: 'myDB', sql: 'BEGIN'] as QueryEventData
+ ))
+
+ // COMMIT: XID event closing the transaction; presumably the last flow file, whose GTID set attribute is asserted below
+ client.sendEvent(new Event(
+ [timestamp: new Date().time, eventType: EventType.XID, nextPosition: 12] as EventHeaderV4,
+ {} as EventData
+ ))
+
+ testRunner.run(1, true, false)
+
+ def resultFiles = testRunner.getFlowFilesForRelationship(CaptureChangeMySQL.REL_SUCCESS)
+
+ assertEquals(2, resultFiles.size())
+ assertEquals(
+ 'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx:2-3',
+ resultFiles.last().getAttribute(BinlogEventInfo.BINLOG_GTIDSET_KEY)
+ )
+ }
+
+ @Test
+ void testInitialGtidNoStatePresent() throws Exception {
+ testRunner.setProperty(CaptureChangeMySQL.DRIVER_LOCATION, 'file:///path/to/mysql-connector-java-5.1.38-bin.jar')
+ testRunner.setProperty(CaptureChangeMySQL.HOSTS, 'localhost:3306')
+ testRunner.setProperty(CaptureChangeMySQL.USERNAME, 'root')
+ testRunner.setProperty(CaptureChangeMySQL.PASSWORD, 'password')
+ testRunner.setProperty(CaptureChangeMySQL.CONNECT_TIMEOUT, '2 seconds')
+ testRunner.setProperty(CaptureChangeMySQL.USE_BINLOG_GTID, 'true')
+ testRunner.setProperty(CaptureChangeMySQL.INIT_BINLOG_GTID, 'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx:1')
+ testRunner.setProperty(CaptureChangeMySQL.RETRIEVE_ALL_RECORDS, 'false')
+ testRunner.setProperty(CaptureChangeMySQL.INCLUDE_BEGIN_COMMIT, 'true')
+
+ testRunner.run(1, false, true)
+
+ // GTID event for transaction 3; no state is stored here, so the expected set 1-1:3-3 asserted below combines INIT_BINLOG_GTID (:1) with this event
+ client.sendEvent(new Event(
+ [timestamp: new Date().time, eventType: EventType.GTID, nextPosition: 2] as EventHeaderV4,
+ [gtid: 'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx:3'] as GtidEventData
+ ))
+
+ // BEGIN: query event opening a transaction on myDB; INCLUDE_BEGIN_COMMIT is 'true', so this presumably accounts for one of the two expected flow files
+ client.sendEvent(new Event(
+ [timestamp: new Date().time, eventType: EventType.QUERY, nextPosition: 4] as EventHeaderV4,
+ [database: 'myDB', sql: 'BEGIN'] as QueryEventData
+ ))
+
+ // COMMIT: XID event closing the transaction; presumably the last flow file, whose GTID set attribute is asserted below
+ client.sendEvent(new Event(
+ [timestamp: new Date().time, eventType: EventType.XID, nextPosition: 12] as EventHeaderV4,
+ {} as EventData
+ ))
+
+ testRunner.run(1, true, false)
+
+ def resultFiles = testRunner.getFlowFilesForRelationship(CaptureChangeMySQL.REL_SUCCESS)
+
+ assertEquals(2, resultFiles.size())
+ assertEquals(
+ 'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx:1-1:3-3',
+ resultFiles.last().getAttribute(BinlogEventInfo.BINLOG_GTIDSET_KEY)
+ )
+ }
+
/********************************
* Mock and helper classes below
********************************/