You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2020/09/18 00:56:57 UTC

[nifi] 08/11: NIFI-5583: Add cdc processor for MySQL referring to GTID.

This is an automated email from the ASF dual-hosted git repository.

alopresto pushed a commit to branch support/nifi-1.12.x
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit 174720104872b43dbaf94a268690d986de7f6b03
Author: yoshiata <yo...@yahoo-corp.jp>
AuthorDate: Mon Jul 2 15:55:50 2018 +0900

    NIFI-5583: Add cdc processor for MySQL referring to GTID.
    
    Signed-off-by: Pierre Villard <pi...@gmail.com>
    
    This closes #2997.
---
 .../nifi/cdc/mysql/event/BaseBinlogEventInfo.java  |  10 ++
 .../cdc/mysql/event/BaseBinlogRowEventInfo.java    |   6 +
 .../cdc/mysql/event/BaseBinlogTableEventInfo.java  |   5 +
 .../cdc/mysql/event/BeginTransactionEventInfo.java |   5 +
 .../nifi/cdc/mysql/event/BinlogEventInfo.java      |   3 +
 .../mysql/event/CommitTransactionEventInfo.java    |   5 +
 .../apache/nifi/cdc/mysql/event/DDLEventInfo.java  |   5 +
 .../nifi/cdc/mysql/event/DeleteRowsEventInfo.java  |   4 +
 .../nifi/cdc/mysql/event/InsertRowsEventInfo.java  |   5 +
 .../nifi/cdc/mysql/event/UpdateRowsEventInfo.java  |   5 +
 .../mysql/event/io/AbstractBinlogEventWriter.java  |  19 +-
 .../cdc/mysql/processors/CaptureChangeMySQL.java   | 177 ++++++++++++++-----
 .../mysql/processors/CaptureChangeMySQLTest.groovy | 193 +++++++++++++++++++++
 13 files changed, 393 insertions(+), 49 deletions(-)

diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BaseBinlogEventInfo.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BaseBinlogEventInfo.java
index 7089cda..9106e93 100644
--- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BaseBinlogEventInfo.java
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BaseBinlogEventInfo.java
@@ -25,6 +25,7 @@ public class BaseBinlogEventInfo extends BaseEventInfo implements BinlogEventInf
 
     private String binlogFilename;
     private Long binlogPosition;
+    private String binlogGtidSet;
 
     public BaseBinlogEventInfo(String eventType, Long timestamp, String binlogFilename, Long binlogPosition) {
         super(eventType, timestamp);
@@ -32,6 +33,11 @@ public class BaseBinlogEventInfo extends BaseEventInfo implements BinlogEventInf
         this.binlogPosition = binlogPosition;
     }
 
+    public BaseBinlogEventInfo(String eventType, Long timestamp, String binlogGtidSet) {
+        super(eventType, timestamp);
+        this.binlogGtidSet = binlogGtidSet;
+    }
+
     public String getBinlogFilename() {
         return binlogFilename;
     }
@@ -39,4 +45,8 @@ public class BaseBinlogEventInfo extends BaseEventInfo implements BinlogEventInf
     public Long getBinlogPosition() {
         return binlogPosition;
     }
+
+    public String getBinlogGtidSet() {
+        return binlogGtidSet;
+    }
 }
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BaseBinlogRowEventInfo.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BaseBinlogRowEventInfo.java
index 4796947..6517225 100644
--- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BaseBinlogRowEventInfo.java
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BaseBinlogRowEventInfo.java
@@ -37,6 +37,12 @@ public class BaseBinlogRowEventInfo<RowEventDataType> extends BaseBinlogTableEve
         this.delegate = new BaseRowEventInfo<>(tableInfo, type, timestamp, rows);
     }
 
+    public BaseBinlogRowEventInfo(TableInfo tableInfo, String type, Long timestamp, String binlogGtidSet, BitSet includedColumns, List<RowEventDataType> rows) {
+        super(tableInfo, type, timestamp, binlogGtidSet);
+        this.includedColumns = includedColumns;
+        this.delegate = new BaseRowEventInfo<>(tableInfo, type, timestamp, rows);
+    }
+
     public BitSet getIncludedColumns() {
         return includedColumns;
     }
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BaseBinlogTableEventInfo.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BaseBinlogTableEventInfo.java
index c5d3ddf..ea23023 100644
--- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BaseBinlogTableEventInfo.java
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BaseBinlogTableEventInfo.java
@@ -35,6 +35,11 @@ public class BaseBinlogTableEventInfo extends BaseBinlogEventInfo implements Bin
         this.delegate = new BaseTableEventInfo(tableInfo, DDL_EVENT, timestamp);
     }
 
+    public BaseBinlogTableEventInfo(TableInfo tableInfo, String eventType, Long timestamp, String binlogGtidSet) {
+        super(eventType, timestamp, binlogGtidSet);
+        this.delegate = new BaseTableEventInfo(tableInfo, DDL_EVENT, timestamp);
+    }
+
     @Override
     public String getDatabaseName() {
         return delegate.getDatabaseName();
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BeginTransactionEventInfo.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BeginTransactionEventInfo.java
index c6f5af0..ed899f5 100644
--- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BeginTransactionEventInfo.java
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BeginTransactionEventInfo.java
@@ -28,6 +28,11 @@ public class BeginTransactionEventInfo extends BaseBinlogEventInfo {
         this.databaseName = databaseName;
     }
 
+    public BeginTransactionEventInfo(String databaseName, Long timestamp, String binlogGtidSet) {
+        super(BEGIN_EVENT, timestamp, binlogGtidSet);
+        this.databaseName = databaseName;
+    }
+
     public String getDatabaseName() {
         return databaseName;
     }
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BinlogEventInfo.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BinlogEventInfo.java
index 6398d4f..9784fc4 100644
--- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BinlogEventInfo.java
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BinlogEventInfo.java
@@ -25,8 +25,11 @@ public interface BinlogEventInfo extends EventInfo {
 
     String BINLOG_FILENAME_KEY = "binlog.filename";
     String BINLOG_POSITION_KEY = "binlog.position";
+    String BINLOG_GTIDSET_KEY = "binlog.gtidset";
 
     String getBinlogFilename();
 
     Long getBinlogPosition();
+
+    String getBinlogGtidSet();
 }
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/CommitTransactionEventInfo.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/CommitTransactionEventInfo.java
index b4a230a..a5406ea 100644
--- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/CommitTransactionEventInfo.java
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/CommitTransactionEventInfo.java
@@ -29,6 +29,11 @@ public class CommitTransactionEventInfo extends BaseBinlogEventInfo {
         this.databaseName = databaseName;
     }
 
+    public CommitTransactionEventInfo(String databaseName, Long timestamp, String binlogGtidSet) {
+        super(COMMIT_EVENT, timestamp, binlogGtidSet);
+        this.databaseName = databaseName;
+    }
+
     public String getDatabaseName() {
         return databaseName;
     }
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/DDLEventInfo.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/DDLEventInfo.java
index bc2c871..6da8d80 100644
--- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/DDLEventInfo.java
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/DDLEventInfo.java
@@ -32,6 +32,11 @@ public class DDLEventInfo extends BaseBinlogTableEventInfo implements TableEvent
         this.query = query;
     }
 
+    public DDLEventInfo(TableInfo tableInfo, Long timestamp, String binlogGtidSet, String query) {
+        super(tableInfo, DDL_EVENT, timestamp, binlogGtidSet);
+        this.query = query;
+    }
+
     public String getQuery() {
         return query;
     }
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/DeleteRowsEventInfo.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/DeleteRowsEventInfo.java
index 3b538ec..98db096 100644
--- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/DeleteRowsEventInfo.java
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/DeleteRowsEventInfo.java
@@ -29,4 +29,8 @@ public class DeleteRowsEventInfo extends BaseBinlogRowEventInfo<Serializable[]>
     public DeleteRowsEventInfo(TableInfo tableInfo, Long timestamp, String binlogFilename, Long binlogPosition, DeleteRowsEventData data) {
         super(tableInfo, DELETE_EVENT, timestamp, binlogFilename, binlogPosition, data.getIncludedColumns(), data.getRows());
     }
+
+    public DeleteRowsEventInfo(TableInfo tableInfo, Long timestamp, String binlogGtidSet, DeleteRowsEventData data) {
+        super(tableInfo, DELETE_EVENT, timestamp, binlogGtidSet, data.getIncludedColumns(), data.getRows());
+    }
 }
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/InsertRowsEventInfo.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/InsertRowsEventInfo.java
index 7d75511..edce548 100644
--- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/InsertRowsEventInfo.java
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/InsertRowsEventInfo.java
@@ -32,4 +32,9 @@ public class InsertRowsEventInfo extends BaseBinlogRowEventInfo<Serializable[]>
         super(tableInfo, INSERT_EVENT, timestamp, binlogFilename, binlogPosition, data.getIncludedColumns(), data.getRows());
         this.data = data;
     }
+
+    public InsertRowsEventInfo(TableInfo tableInfo, Long timestamp, String binlogGtidSet, WriteRowsEventData data) {
+        super(tableInfo, INSERT_EVENT, timestamp, binlogGtidSet, data.getIncludedColumns(), data.getRows());
+        this.data = data;
+    }
 }
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/UpdateRowsEventInfo.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/UpdateRowsEventInfo.java
index 2e20dc6..14edcaa 100644
--- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/UpdateRowsEventInfo.java
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/UpdateRowsEventInfo.java
@@ -35,6 +35,11 @@ public class UpdateRowsEventInfo extends BaseBinlogRowEventInfo<Map.Entry<Serial
         includedColumnsBeforeUpdate = data.getIncludedColumnsBeforeUpdate();
     }
 
+    public UpdateRowsEventInfo(TableInfo tableInfo, Long timestamp, String binlogGtidSet, UpdateRowsEventData data) {
+        super(tableInfo, UPDATE_EVENT, timestamp, binlogGtidSet, data.getIncludedColumns(), data.getRows());
+        includedColumnsBeforeUpdate = data.getIncludedColumnsBeforeUpdate();
+    }
+
     public BitSet getIncludedColumnsBeforeUpdate() {
         return includedColumnsBeforeUpdate;
     }
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/AbstractBinlogEventWriter.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/AbstractBinlogEventWriter.java
index df4424c..09186f2 100644
--- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/AbstractBinlogEventWriter.java
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/AbstractBinlogEventWriter.java
@@ -33,8 +33,14 @@ import java.util.Map;
 public abstract class AbstractBinlogEventWriter<T extends BinlogEventInfo> extends AbstractEventWriter<T> {
 
     protected void writeJson(T event) throws IOException {
-        jsonGenerator.writeStringField("binlog_filename", event.getBinlogFilename());
-        jsonGenerator.writeNumberField("binlog_position", event.getBinlogPosition());
+        String gtidSet = event.getBinlogGtidSet();
+
+        if (gtidSet == null) {
+            jsonGenerator.writeStringField("binlog_filename", event.getBinlogFilename());
+            jsonGenerator.writeNumberField("binlog_position", event.getBinlogPosition());
+        } else {
+            jsonGenerator.writeStringField("binlog_gtidset", event.getBinlogGtidSet());
+        }
     }
 
     protected Map<String, String> getCommonAttributes(final long sequenceId, BinlogEventInfo eventInfo) {
@@ -42,8 +48,13 @@ public abstract class AbstractBinlogEventWriter<T extends BinlogEventInfo> exten
             {
                 put(SEQUENCE_ID_KEY, Long.toString(sequenceId));
                 put(CDC_EVENT_TYPE_ATTRIBUTE, eventInfo.getEventType());
-                put(BinlogEventInfo.BINLOG_FILENAME_KEY, eventInfo.getBinlogFilename());
-                put(BinlogEventInfo.BINLOG_POSITION_KEY, Long.toString(eventInfo.getBinlogPosition()));
+                String gtidSet = eventInfo.getBinlogGtidSet();
+                if (gtidSet == null) {
+                    put(BinlogEventInfo.BINLOG_FILENAME_KEY, eventInfo.getBinlogFilename());
+                    put(BinlogEventInfo.BINLOG_POSITION_KEY, Long.toString(eventInfo.getBinlogPosition()));
+                } else {
+                    put(BinlogEventInfo.BINLOG_GTIDSET_KEY, gtidSet);
+                }
                 put(CoreAttributes.MIME_TYPE.key(), APPLICATION_JSON);
             }
         };
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
index 1fcbfa6..edeb49c 100644
--- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
@@ -26,9 +26,11 @@ import static com.github.shyiko.mysql.binlog.event.EventType.ROTATE;
 import static com.github.shyiko.mysql.binlog.event.EventType.WRITE_ROWS;
 
 import com.github.shyiko.mysql.binlog.BinaryLogClient;
+import com.github.shyiko.mysql.binlog.GtidSet;
 import com.github.shyiko.mysql.binlog.event.Event;
 import com.github.shyiko.mysql.binlog.event.EventHeaderV4;
 import com.github.shyiko.mysql.binlog.event.EventType;
+import com.github.shyiko.mysql.binlog.event.GtidEventData;
 import com.github.shyiko.mysql.binlog.event.QueryEventData;
 import com.github.shyiko.mysql.binlog.event.RotateEventData;
 import com.github.shyiko.mysql.binlog.event.TableMapEventData;
@@ -314,7 +316,8 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
             .name("capture-change-mysql-init-binlog-filename")
             .displayName("Initial Binlog Filename")
             .description("Specifies an initial binlog filename to use if this processor's State does not have a current binlog filename. If a filename is present "
-                    + "in the processor's State, this property is ignored. This can be used along with Initial Binlog Position to \"skip ahead\" if previous events are not desired. "
+                    + "in the processor's State or \"Use GTID\" property is set to false, this property is ignored. "
+                    + "This can be used along with Initial Binlog Position to \"skip ahead\" if previous events are not desired. "
                     + "Note that NiFi Expression Language is supported, but this property is evaluated when the processor is configured, so FlowFile attributes may not be used. Expression "
                     + "Language is supported to enable the use of the Variable Registry and/or environment properties.")
             .required(false)
@@ -326,30 +329,57 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
             .name("capture-change-mysql-init-binlog-position")
             .displayName("Initial Binlog Position")
             .description("Specifies an initial offset into a binlog (specified by Initial Binlog Filename) to use if this processor's State does not have a current "
-                    + "binlog filename. If a filename is present in the processor's State, this property is ignored. This can be used along with Initial Binlog Filename "
-                    + "to \"skip ahead\" if previous events are not desired. Note that NiFi Expression Language is supported, but this property is evaluated when the "
-                    + "processor is configured, so FlowFile attributes may not be used. Expression Language is supported to enable the use of the Variable Registry "
-                    + "and/or environment properties.")
+                    + "binlog filename. If a filename is present in the processor's State or \"Use GTID\" property is false, this property is ignored. "
+                    + "This can be used along with Initial Binlog Filename to \"skip ahead\" if previous events are not desired. Note that NiFi Expression Language "
+                    + "is supported, but this property is evaluated when the processor is configured, so FlowFile attributes may not be used. Expression Language is "
+                    + "supported to enable the use of the Variable Registry and/or environment properties.")
             .required(false)
             .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
             .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .build();
 
+    public static final PropertyDescriptor USE_BINLOG_GTID = new PropertyDescriptor.Builder()
+            .name("capture-change-mysql-use-gtid")
+            .displayName("Use Binlog GTID")
+            .description("Specifies whether to use Global Transaction ID (GTID) for binlog tracking. If set to true, processor's state of binlog file name and position is ignored. "
+                    + "The main benefit of using GTID is to have much reliable failover than using binlog filename/position.")
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor INIT_BINLOG_GTID = new PropertyDescriptor.Builder()
+            .name("capture-change-mysql-init-gtid")
+            .displayName("Initial Binlog GTID")
+            .description("Specifies an initial GTID to use if this processor's State does not have a current GTID. "
+                    + "If a GTID is present in the processor's State or \"Use GTID\" property is set to false, this property is ignored. "
+                    + "This can be used to \"skip ahead\" if previous events are not desired. "
+                    + "Note that NiFi Expression Language is supported, but this property is evaluated when the processor is configured, so FlowFile attributes may not be used. "
+                    + "Expression Language is supported to enable the use of the Variable Registry and/or environment properties.")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
     private static final List<PropertyDescriptor> propDescriptors;
 
     private volatile ProcessSession currentSession;
     private BinaryLogClient binlogClient;
     private BinlogEventListener eventListener;
     private BinlogLifecycleListener lifecycleListener;
+    private GtidSet gtidSet;
 
     private final LinkedBlockingQueue<RawBinlogEvent> queue = new LinkedBlockingQueue<>();
     private volatile String currentBinlogFile = null;
     private volatile long currentBinlogPosition = 4;
+    private volatile String currentGtidSet = null;
 
-    // The following variables save the value of the binlog filename and position (and sequence id) at the beginning of a transaction. Used for rollback
+    // The following variables save the value of the binlog filename, position, (sequence id), and gtid at the beginning of a transaction. Used for rollback
     private volatile String xactBinlogFile = null;
     private volatile long xactBinlogPosition = 4;
     private volatile long xactSequenceId = 0;
+    private volatile String xactGtidSet = null;
 
     private volatile TableInfo currentTable = null;
     private volatile String currentDatabase = null;
@@ -357,6 +387,7 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
     private volatile Pattern tableNamePattern;
     private volatile boolean includeBeginCommit = false;
     private volatile boolean includeDDLEvents = false;
+    private volatile boolean useGtid = false;
 
     private volatile boolean inTransaction = false;
     private volatile boolean skipTable = false;
@@ -408,6 +439,8 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
         pds.add(INIT_SEQUENCE_ID);
         pds.add(INIT_BINLOG_FILENAME);
         pds.add(INIT_BINLOG_POSITION);
+        pds.add(USE_BINLOG_GTID);
+        pds.add(INIT_BINLOG_GTID);
         propDescriptors = Collections.unmodifiableList(pds);
     }
 
@@ -450,32 +483,48 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
 
         includeBeginCommit = context.getProperty(INCLUDE_BEGIN_COMMIT).asBoolean();
         includeDDLEvents = context.getProperty(INCLUDE_DDL_EVENTS).asBoolean();
-
-        // Set current binlog filename to whatever is in State, falling back to the Retrieve All Records then Initial Binlog Filename if no State variable is present
-        currentBinlogFile = stateMap.get(BinlogEventInfo.BINLOG_FILENAME_KEY);
-        if (currentBinlogFile == null) {
-            if (!getAllRecords) {
-                if (context.getProperty(INIT_BINLOG_FILENAME).isSet()) {
-                    currentBinlogFile = context.getProperty(INIT_BINLOG_FILENAME).evaluateAttributeExpressions().getValue();
+        useGtid = context.getProperty(USE_BINLOG_GTID).asBoolean();
+
+        if (useGtid) {
+            // Set current gtid to whatever is in State, falling back to the Retrieve All Records then Initial Gtid if no State variable is present
+            currentGtidSet = stateMap.get(BinlogEventInfo.BINLOG_GTIDSET_KEY);
+            if (currentGtidSet == null) {
+                if (!getAllRecords && context.getProperty(INIT_BINLOG_GTID).isSet()) {
+                    currentGtidSet = context.getProperty(INIT_BINLOG_GTID).evaluateAttributeExpressions().getValue();
+                } else {
+                    // If we're starting from the beginning of all binlogs, the binlog gtid must be the empty string (not null)
+                    currentGtidSet = "";
+                }
+            }
+            currentBinlogFile = "";
+            currentBinlogPosition = DO_NOT_SET;
+        } else {
+            // Set current binlog filename to whatever is in State, falling back to the Retrieve All Records then Initial Binlog Filename if no State variable is present
+            currentBinlogFile = stateMap.get(BinlogEventInfo.BINLOG_FILENAME_KEY);
+            if (currentBinlogFile == null) {
+                if (!getAllRecords) {
+                    if (context.getProperty(INIT_BINLOG_FILENAME).isSet()) {
+                        currentBinlogFile = context.getProperty(INIT_BINLOG_FILENAME).evaluateAttributeExpressions().getValue();
+                    }
+                } else {
+                    // If we're starting from the beginning of all binlogs, the binlog filename must be the empty string (not null)
+                    currentBinlogFile = "";
                 }
-            } else {
-                // If we're starting from the beginning of all binlogs, the binlog filename must be the empty string (not null)
-                currentBinlogFile = "";
             }
-        }
 
-        // Set current binlog position to whatever is in State, falling back to the Retrieve All Records then Initial Binlog Filename if no State variable is present
-        String binlogPosition = stateMap.get(BinlogEventInfo.BINLOG_POSITION_KEY);
-        if (binlogPosition != null) {
-            currentBinlogPosition = Long.valueOf(binlogPosition);
-        } else if (!getAllRecords) {
-            if (context.getProperty(INIT_BINLOG_POSITION).isSet()) {
-                currentBinlogPosition = context.getProperty(INIT_BINLOG_POSITION).evaluateAttributeExpressions().asLong();
+            // Set current binlog position to whatever is in State, falling back to the Retrieve All Records then Initial Binlog Filename if no State variable is present
+            String binlogPosition = stateMap.get(BinlogEventInfo.BINLOG_POSITION_KEY);
+            if (binlogPosition != null) {
+                currentBinlogPosition = Long.valueOf(binlogPosition);
+            } else if (!getAllRecords) {
+                if (context.getProperty(INIT_BINLOG_POSITION).isSet()) {
+                    currentBinlogPosition = context.getProperty(INIT_BINLOG_POSITION).evaluateAttributeExpressions().asLong();
+                } else {
+                    currentBinlogPosition = DO_NOT_SET;
+                }
             } else {
-                currentBinlogPosition = DO_NOT_SET;
+                currentBinlogPosition = -1;
             }
-        } else {
-            currentBinlogPosition = -1;
         }
 
         // Get current sequence ID from state
@@ -571,7 +620,7 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
             long timeSinceLastUpdate = now - lastStateUpdate;
 
             if (stateUpdateInterval != 0 && timeSinceLastUpdate >= stateUpdateInterval) {
-                updateState(stateManager, currentBinlogFile, currentBinlogPosition, currentSequenceId.get());
+                updateState(stateManager, currentBinlogFile, currentBinlogPosition, currentSequenceId.get(), currentGtidSet);
                 lastStateUpdate = now;
             }
         } catch (IOException ioe) {
@@ -580,6 +629,7 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
                 currentBinlogFile = xactBinlogFile == null ? "" : xactBinlogFile;
                 currentBinlogPosition = xactBinlogPosition;
                 currentSequenceId.set(xactSequenceId);
+                currentGtidSet = xactGtidSet;
                 inTransaction = false;
                 stop(stateManager);
                 queue.clear();
@@ -681,6 +731,9 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
                 binlogClient.setBinlogPosition(currentBinlogPosition);
             }
 
+            binlogClient.setGtidSet(currentGtidSet);
+            binlogClient.setGtidSetFallbackToPurged(true);
+
             if (serverId != null) {
                 binlogClient.setServerId(serverId);
             }
@@ -719,6 +772,7 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
             }
         }
 
+        gtidSet = new GtidSet(binlogClient.getGtidSet());
         doStop.set(false);
     }
 
@@ -736,7 +790,7 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
             // We always get ROTATE and FORMAT_DESCRIPTION messages no matter where we start (even from the end), and they won't have the correct "next position" value, so only
             // advance the position if it is not that type of event. ROTATE events don't generate output CDC events and have the current binlog position in a special field, which
             // is filled in during the ROTATE case
-            if (eventType != ROTATE && eventType != FORMAT_DESCRIPTION) {
+            if (eventType != ROTATE && eventType != FORMAT_DESCRIPTION && !useGtid) {
                 currentBinlogPosition = header.getPosition();
             }
             log.debug("Got message event type: {} ", new Object[]{header.getEventType().toString()});
@@ -790,13 +844,16 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
                         if (inTransaction) {
                             throw new IOException("BEGIN event received while already processing a transaction. This could indicate that your binlog position is invalid.");
                         }
-                        // Mark the current binlog position in case we have to rollback the transaction (if the processor is stopped, e.g.)
+                        // Mark the current binlog position and GTID in case we have to rollback the transaction (if the processor is stopped, e.g.)
                         xactBinlogFile = currentBinlogFile;
                         xactBinlogPosition = currentBinlogPosition;
                         xactSequenceId = currentSequenceId.get();
+                        xactGtidSet = currentGtidSet;
 
                         if (includeBeginCommit && (databaseNamePattern == null || databaseNamePattern.matcher(currentDatabase).matches())) {
-                            BeginTransactionEventInfo beginEvent = new BeginTransactionEventInfo(currentDatabase, timestamp, currentBinlogFile, currentBinlogPosition);
+                            BeginTransactionEventInfo beginEvent = useGtid
+                                    ? new BeginTransactionEventInfo(currentDatabase, timestamp, currentGtidSet)
+                                    : new BeginTransactionEventInfo(currentDatabase, timestamp, currentBinlogFile, currentBinlogPosition);
                             currentSequenceId.set(beginEventWriter.writeEvent(currentSession, transitUri, beginEvent, currentSequenceId.get(), REL_SUCCESS));
                         }
                         inTransaction = true;
@@ -807,7 +864,9 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
                         }
                         // InnoDB generates XID events for "commit", but MyISAM generates Query events with "COMMIT", so handle that here
                         if (includeBeginCommit && (databaseNamePattern == null || databaseNamePattern.matcher(currentDatabase).matches())) {
-                            CommitTransactionEventInfo commitTransactionEvent = new CommitTransactionEventInfo(currentDatabase, timestamp, currentBinlogFile, currentBinlogPosition);
+                            CommitTransactionEventInfo commitTransactionEvent = useGtid
+                                    ? new CommitTransactionEventInfo(currentDatabase, timestamp, currentGtidSet)
+                                    : new CommitTransactionEventInfo(currentDatabase, timestamp, currentBinlogFile, currentBinlogPosition);
                             currentSequenceId.set(commitEventWriter.writeEvent(currentSession, transitUri, commitTransactionEvent, currentSequenceId.get(), REL_SUCCESS));
                         }
                         // Commit the NiFi session
@@ -829,7 +888,9 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
                             if (includeDDLEvents && (databaseNamePattern == null || databaseNamePattern.matcher(currentDatabase).matches())) {
                                 // If we don't have table information, we can still use the database name
                                 TableInfo ddlTableInfo = (currentTable != null) ? currentTable : new TableInfo(currentDatabase, null, null, null);
-                                DDLEventInfo ddlEvent = new DDLEventInfo(ddlTableInfo, timestamp, currentBinlogFile, currentBinlogPosition, sql);
+                                DDLEventInfo ddlEvent = useGtid
+                                        ? new DDLEventInfo(ddlTableInfo, timestamp, currentGtidSet, sql)
+                                        : new DDLEventInfo(ddlTableInfo, timestamp, currentBinlogFile, currentBinlogPosition, sql);
                                 currentSequenceId.set(ddlEventWriter.writeEvent(currentSession, transitUri, ddlEvent, currentSequenceId.get(), REL_SUCCESS));
                             }
                             // Remove all the keys from the cache that this processor added
@@ -850,7 +911,9 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
                                 + "This could indicate that your binlog position is invalid.");
                     }
                     if (includeBeginCommit && (databaseNamePattern == null || databaseNamePattern.matcher(currentDatabase).matches())) {
-                        CommitTransactionEventInfo commitTransactionEvent = new CommitTransactionEventInfo(currentDatabase, timestamp, currentBinlogFile, currentBinlogPosition);
+                        CommitTransactionEventInfo commitTransactionEvent = useGtid
+                                ? new CommitTransactionEventInfo(currentDatabase, timestamp, currentGtidSet)
+                                : new CommitTransactionEventInfo(currentDatabase, timestamp, currentBinlogFile, currentBinlogPosition);
                         currentSequenceId.set(commitEventWriter.writeEvent(currentSession, transitUri, commitTransactionEvent, currentSequenceId.get(), REL_SUCCESS));
                     }
                     // Commit the NiFi session
@@ -887,29 +950,47 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
                             || eventType == EXT_WRITE_ROWS
                             || eventType == PRE_GA_WRITE_ROWS) {
 
-                        InsertRowsEventInfo eventInfo = new InsertRowsEventInfo(currentTable, timestamp, currentBinlogFile, currentBinlogPosition, event.getData());
+                        InsertRowsEventInfo eventInfo = useGtid
+                                ? new InsertRowsEventInfo(currentTable, timestamp, currentGtidSet, event.getData())
+                                : new InsertRowsEventInfo(currentTable, timestamp, currentBinlogFile, currentBinlogPosition, event.getData());
                         currentSequenceId.set(insertRowsWriter.writeEvent(currentSession, transitUri, eventInfo, currentSequenceId.get(), REL_SUCCESS));
 
                     } else if (eventType == DELETE_ROWS
                             || eventType == EXT_DELETE_ROWS
                             || eventType == PRE_GA_DELETE_ROWS) {
 
-                        DeleteRowsEventInfo eventInfo = new DeleteRowsEventInfo(currentTable, timestamp, currentBinlogFile, currentBinlogPosition, event.getData());
+                        DeleteRowsEventInfo eventInfo = useGtid
+                                ? new DeleteRowsEventInfo(currentTable, timestamp, currentGtidSet, event.getData())
+                                : new DeleteRowsEventInfo(currentTable, timestamp, currentBinlogFile, currentBinlogPosition, event.getData());
                         currentSequenceId.set(deleteRowsWriter.writeEvent(currentSession, transitUri, eventInfo, currentSequenceId.get(), REL_SUCCESS));
 
                     } else {
                         // Update event
-                        UpdateRowsEventInfo eventInfo = new UpdateRowsEventInfo(currentTable, timestamp, currentBinlogFile, currentBinlogPosition, event.getData());
+                        UpdateRowsEventInfo eventInfo = useGtid
+                                ? new UpdateRowsEventInfo(currentTable, timestamp, currentGtidSet, event.getData())
+                                : new UpdateRowsEventInfo(currentTable, timestamp, currentBinlogFile, currentBinlogPosition, event.getData());
                         currentSequenceId.set(updateRowsWriter.writeEvent(currentSession, transitUri, eventInfo, currentSequenceId.get(), REL_SUCCESS));
                     }
                     break;
 
                 case ROTATE:
-                    // Update current binlog filename
-                    RotateEventData rotateEventData = event.getData();
-                    currentBinlogFile = rotateEventData.getBinlogFilename();
-                    currentBinlogPosition = rotateEventData.getBinlogPosition();
+                    if (!useGtid) {
+                        // Update current binlog filename
+                        RotateEventData rotateEventData = event.getData();
+                        currentBinlogFile = rotateEventData.getBinlogFilename();
+                        currentBinlogPosition = rotateEventData.getBinlogPosition();
+                    }
                     break;
+
+                case GTID:
+                    if (useGtid) {
+                        // Update current binlog gtid
+                        GtidEventData gtidEventData = event.getData();
+                        gtidSet.add(gtidEventData.getGtid());
+                        currentGtidSet = gtidSet.toString();
+                    }
+                    break;
+
                 default:
                     break;
             }
@@ -917,7 +998,7 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
             // Advance the current binlog position. This way if no more events are received and the processor is stopped, it will resume after the event that was just processed.
             // We always get ROTATE and FORMAT_DESCRIPTION messages no matter where we start (even from the end), and they won't have the correct "next position" value, so only
             // advance the position if it is not that type of event.
-            if (eventType != ROTATE && eventType != FORMAT_DESCRIPTION) {
+            if (eventType != ROTATE && eventType != FORMAT_DESCRIPTION && !useGtid) {
                 currentBinlogPosition = header.getNextPosition();
             }
         }
@@ -937,7 +1018,7 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
             doStop.set(true);
 
             if (hasRun.getAndSet(false)) {
-                updateState(stateManager, currentBinlogFile, currentBinlogPosition, currentSequenceId.get());
+                updateState(stateManager, currentBinlogFile, currentBinlogPosition, currentSequenceId.get(), currentGtidSet);
             }
             currentBinlogPosition = -1;
 
@@ -952,17 +1033,23 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
         }
     }
 
-    private void updateState(StateManager stateManager, String binlogFile, long binlogPosition, long sequenceId) throws IOException {
+    private void updateState(StateManager stateManager, String binlogFile, long binlogPosition, long sequenceId, String gtidSet) throws IOException {
         // Update state with latest values
         if (stateManager != null) {
             Map<String, String> newStateMap = new HashMap<>(stateManager.getState(Scope.CLUSTER).toMap());
 
-            // Save current binlog filename and position to the state map
+            // Save current binlog filename, position and GTID to the state map
             if (binlogFile != null) {
                 newStateMap.put(BinlogEventInfo.BINLOG_FILENAME_KEY, binlogFile);
             }
+
             newStateMap.put(BinlogEventInfo.BINLOG_POSITION_KEY, Long.toString(binlogPosition));
             newStateMap.put(EventWriter.SEQUENCE_ID_KEY, String.valueOf(sequenceId));
+
+            if (gtidSet != null) {
+                newStateMap.put(BinlogEventInfo.BINLOG_GTIDSET_KEY, gtidSet);
+            }
+
             stateManager.setState(newStateMap, Scope.CLUSTER);
         }
     }
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy
index 5df44f3..bde8479 100644
--- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy
@@ -22,6 +22,7 @@ import com.github.shyiko.mysql.binlog.event.Event
 import com.github.shyiko.mysql.binlog.event.EventData
 import com.github.shyiko.mysql.binlog.event.EventHeaderV4
 import com.github.shyiko.mysql.binlog.event.EventType
+import com.github.shyiko.mysql.binlog.event.GtidEventData
 import com.github.shyiko.mysql.binlog.event.QueryEventData
 import com.github.shyiko.mysql.binlog.event.RotateEventData
 import com.github.shyiko.mysql.binlog.event.TableMapEventData
@@ -825,11 +826,13 @@ class CaptureChangeMySQLTest {
         // Ensure state not set, as the processor hasn't been stopped and no State Update Interval has been set
         testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_FILENAME_KEY, null, Scope.CLUSTER)
         testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_POSITION_KEY, null, Scope.CLUSTER)
+        testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_GTIDSET_KEY, null, Scope.CLUSTER)
 
         // Stop the processor and verify the state is set
         testRunner.run(1, true, false)
         testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_FILENAME_KEY, 'master.000001', Scope.CLUSTER)
         testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_POSITION_KEY, '4', Scope.CLUSTER)
+        testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_GTIDSET_KEY, null, Scope.CLUSTER)
 
         testRunner.stateManager.clear(Scope.CLUSTER)
 
@@ -855,6 +858,7 @@ class CaptureChangeMySQLTest {
 
         testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_FILENAME_KEY, 'master.000001', Scope.CLUSTER)
         testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_POSITION_KEY, '6', Scope.CLUSTER)
+        testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_GTIDSET_KEY, null, Scope.CLUSTER)
 
         // COMMIT
         client.sendEvent(new Event(
@@ -866,7 +870,106 @@ class CaptureChangeMySQLTest {
 
         testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_FILENAME_KEY, 'master.000001', Scope.CLUSTER)
         testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_POSITION_KEY, '12', Scope.CLUSTER)
+        testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_GTIDSET_KEY, null, Scope.CLUSTER)
+    }
+
+    @Test
+    void testUpdateStateUseGtid() throws Exception {
+        testRunner.setProperty(CaptureChangeMySQL.DRIVER_LOCATION, 'file:///path/to/mysql-connector-java-5.1.38-bin.jar')
+        testRunner.setProperty(CaptureChangeMySQL.HOSTS, 'localhost:3306')
+        testRunner.setProperty(CaptureChangeMySQL.USERNAME, 'root')
+        testRunner.setProperty(CaptureChangeMySQL.PASSWORD, 'password')
+        testRunner.setProperty(CaptureChangeMySQL.CONNECT_TIMEOUT, '2 seconds')
+        testRunner.setProperty(CaptureChangeMySQL.USE_BINLOG_GTID, 'true')
+
+        testRunner.run(1, false, true)
+
+        // GTID
+        client.sendEvent(new Event(
+                [timestamp: new Date().time, eventType: EventType.GTID, nextPosition: 2] as EventHeaderV4,
+                [gtid: 'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx:1'] as GtidEventData
+        ))
+
+        // BEGIN
+        client.sendEvent(new Event(
+                [timestamp: new Date().time, eventType: EventType.QUERY, nextPosition: 4] as EventHeaderV4,
+                [database: 'myDB', sql: 'BEGIN'] as QueryEventData
+        ))
+
+        // COMMIT
+        client.sendEvent(new Event(
+                [timestamp: new Date().time, eventType: EventType.XID, nextPosition: 6] as EventHeaderV4,
+                {} as EventData
+        ))
+
+        testRunner.run(1, false, false)
+
+        // Ensure state not set, as the processor hasn't been stopped and no State Update Interval has been set
+        testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_FILENAME_KEY, null, Scope.CLUSTER)
+        testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_POSITION_KEY, null, Scope.CLUSTER)
+        testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_GTIDSET_KEY, null, Scope.CLUSTER)
+
+        // Stop the processor and verify the state is set
+        testRunner.run(1, true, false)
+        testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_FILENAME_KEY, '', Scope.CLUSTER)
+        testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_POSITION_KEY, '-1000', Scope.CLUSTER)
+        testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_GTIDSET_KEY, 'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx:1-1', Scope.CLUSTER)
+
+        testRunner.stateManager.clear(Scope.CLUSTER)
+
+        // Send some events, wait for the State Update Interval, and verify the state was set
+        testRunner.setProperty(CaptureChangeMySQL.STATE_UPDATE_INTERVAL, '1 second')
+        testRunner.run(1, false, true)
+
+        // GTID
+        client.sendEvent(new Event(
+                [timestamp: new Date().time, eventType: EventType.GTID, nextPosition: 8] as EventHeaderV4,
+                [gtid: 'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx:2'] as GtidEventData
+        ))
+
+        // BEGIN
+        client.sendEvent(new Event(
+                [timestamp: new Date().time, eventType: EventType.QUERY, nextPosition: 10] as EventHeaderV4,
+                [database: 'myDB', sql: 'BEGIN'] as QueryEventData
+        ))
+
+        // COMMIT
+        client.sendEvent(new Event(
+                [timestamp: new Date().time, eventType: EventType.XID, nextPosition: 12] as EventHeaderV4,
+                {} as EventData
+        ))
+
+        sleep(1000)
+
+        testRunner.run(1, false, false)
+
+        testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_FILENAME_KEY, '', Scope.CLUSTER)
+        testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_POSITION_KEY, '-1000', Scope.CLUSTER)
+        testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_GTIDSET_KEY, 'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx:2-2', Scope.CLUSTER)
+
+        // GTID
+        client.sendEvent(new Event(
+                [timestamp: new Date().time, eventType: EventType.GTID, nextPosition: 14] as EventHeaderV4,
+                [gtid: 'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx:3'] as GtidEventData
+        ))
+
+        // BEGIN
+        client.sendEvent(new Event(
+                [timestamp: new Date().time, eventType: EventType.QUERY, nextPosition: 16] as EventHeaderV4,
+                [database: 'myDB', sql: 'BEGIN'] as QueryEventData
+        ))
+
+        // COMMIT
+        client.sendEvent(new Event(
+                [timestamp: new Date().time, eventType: EventType.XID, nextPosition: 18] as EventHeaderV4,
+                {} as EventData
+        ))
+
+        testRunner.run(1, true, false)
 
+        testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_FILENAME_KEY, '', Scope.CLUSTER)
+        testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_POSITION_KEY, '-1000', Scope.CLUSTER)
+        testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_GTIDSET_KEY, 'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx:2-3', Scope.CLUSTER)
     }
 
     @Test
@@ -936,6 +1039,96 @@ class CaptureChangeMySQLTest {
         assertEquals(1, resultFiles.size())
     }
 
+    @Test
+    void testInitialGtidIgnoredWhenStatePresent() throws Exception {
+        testRunner.setProperty(CaptureChangeMySQL.DRIVER_LOCATION, 'file:///path/to/mysql-connector-java-5.1.38-bin.jar')
+        testRunner.setProperty(CaptureChangeMySQL.HOSTS, 'localhost:3306')
+        testRunner.setProperty(CaptureChangeMySQL.USERNAME, 'root')
+        testRunner.setProperty(CaptureChangeMySQL.CONNECT_TIMEOUT, '2 seconds')
+        testRunner.setProperty(CaptureChangeMySQL.USE_BINLOG_GTID, 'true')
+        testRunner.setProperty(CaptureChangeMySQL.INIT_BINLOG_GTID, 'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx:1')
+        testRunner.setProperty(CaptureChangeMySQL.INIT_SEQUENCE_ID, '10')
+        testRunner.setProperty(CaptureChangeMySQL.RETRIEVE_ALL_RECORDS, 'false')
+        testRunner.setProperty(CaptureChangeMySQL.INCLUDE_BEGIN_COMMIT, 'true')
+        testRunner.getStateManager().setState([
+                ("${BinlogEventInfo.BINLOG_GTIDSET_KEY}".toString()): 'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx:2',
+                ("${EventWriter.SEQUENCE_ID_KEY}".toString()): '1'
+        ], Scope.CLUSTER)
+
+        testRunner.run(1, false, true)
+
+        // GTID
+        client.sendEvent(new Event(
+                [timestamp: new Date().time, eventType: EventType.GTID, nextPosition: 2] as EventHeaderV4,
+                [gtid: 'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx:3'] as GtidEventData
+        ))
+
+        // BEGIN
+        client.sendEvent(new Event(
+                [timestamp: new Date().time, eventType: EventType.QUERY, nextPosition: 4] as EventHeaderV4,
+                [database: 'myDB', sql: 'BEGIN'] as QueryEventData
+        ))
+
+        // COMMIT
+        client.sendEvent(new Event(
+                [timestamp: new Date().time, eventType: EventType.XID, nextPosition: 12] as EventHeaderV4,
+                {} as EventData
+        ))
+
+        testRunner.run(1, true, false)
+
+        def resultFiles = testRunner.getFlowFilesForRelationship(CaptureChangeMySQL.REL_SUCCESS)
+
+        assertEquals(2, resultFiles.size())
+        assertEquals(
+                'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx:2-3',
+                resultFiles.last().getAttribute(BinlogEventInfo.BINLOG_GTIDSET_KEY)
+        )
+    }
+
+    @Test
+    void testInitialGtidNoStatePresent() throws Exception {
+        testRunner.setProperty(CaptureChangeMySQL.DRIVER_LOCATION, 'file:///path/to/mysql-connector-java-5.1.38-bin.jar')
+        testRunner.setProperty(CaptureChangeMySQL.HOSTS, 'localhost:3306')
+        testRunner.setProperty(CaptureChangeMySQL.USERNAME, 'root')
+        testRunner.setProperty(CaptureChangeMySQL.PASSWORD, 'password')
+        testRunner.setProperty(CaptureChangeMySQL.CONNECT_TIMEOUT, '2 seconds')
+        testRunner.setProperty(CaptureChangeMySQL.USE_BINLOG_GTID, 'true')
+        testRunner.setProperty(CaptureChangeMySQL.INIT_BINLOG_GTID, 'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx:1')
+        testRunner.setProperty(CaptureChangeMySQL.RETRIEVE_ALL_RECORDS, 'false')
+        testRunner.setProperty(CaptureChangeMySQL.INCLUDE_BEGIN_COMMIT, 'true')
+
+        testRunner.run(1, false, true)
+
+        // GTID
+        client.sendEvent(new Event(
+                [timestamp: new Date().time, eventType: EventType.GTID, nextPosition: 2] as EventHeaderV4,
+                [gtid: 'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx:3'] as GtidEventData
+        ))
+
+        // BEGIN
+        client.sendEvent(new Event(
+                [timestamp: new Date().time, eventType: EventType.QUERY, nextPosition: 4] as EventHeaderV4,
+                [database: 'myDB', sql: 'BEGIN'] as QueryEventData
+        ))
+
+        // COMMIT
+        client.sendEvent(new Event(
+                [timestamp: new Date().time, eventType: EventType.XID, nextPosition: 12] as EventHeaderV4,
+                {} as EventData
+        ))
+
+        testRunner.run(1, true, false)
+
+        def resultFiles = testRunner.getFlowFilesForRelationship(CaptureChangeMySQL.REL_SUCCESS)
+
+        assertEquals(2, resultFiles.size())
+        assertEquals(
+                'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx:1-1:3-3',
+                resultFiles.last().getAttribute(BinlogEventInfo.BINLOG_GTIDSET_KEY)
+        )
+    }
+
     /********************************
      * Mock and helper classes below
      ********************************/