You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2022/11/30 19:53:41 UTC

[nifi] 03/22: NIFI-6428 Add 'inTransaction' value to state,fix bug

This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch support/nifi-1.19
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit 0a09aa57fd9d69626cc6b2a1ae24fb515f1d4944
Author: sssqhai <sh...@outlook.com>
AuthorDate: Sat May 7 16:35:08 2022 +0800

    NIFI-6428 Add 'inTransaction' value to state,fix bug
    
    Signed-off-by: Matthew Burgess <ma...@apache.org>
    
    This closes #6036
---
 .../cdc/mysql/processors/CaptureChangeMySQL.java   |  26 +-
 .../mysql/processors/CaptureChangeMySQLTest.groovy |  21 +-
 .../nifi/cdc/mysql/CaptureChangeMySQLTest.java     | 393 +++++++++++++++++++++
 .../nifi/cdc/mysql/MockBinlogClientJava.java       | 108 ++++++
 4 files changed, 535 insertions(+), 13 deletions(-)

diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
index cc9d86137c..3e245c7ab0 100644
--- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
@@ -130,6 +130,8 @@ import static com.github.shyiko.mysql.binlog.event.EventType.PRE_GA_DELETE_ROWS;
 import static com.github.shyiko.mysql.binlog.event.EventType.PRE_GA_WRITE_ROWS;
 import static com.github.shyiko.mysql.binlog.event.EventType.ROTATE;
 import static com.github.shyiko.mysql.binlog.event.EventType.WRITE_ROWS;
+import static com.github.shyiko.mysql.binlog.event.EventType.XID;
+
 
 /**
  * A processor to retrieve Change Data Capture (CDC) events and send them as flow files.
@@ -626,6 +628,8 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
         } else {
             currentSequenceId.set(Long.parseLong(seqIdString));
         }
+        //get inTransaction value from state
+        inTransaction = "true".equals(stateMap.get("inTransaction"));
 
         // Get reference to Distributed Cache if one exists. If it does not, no enrichment (resolution of column names, e.g.) will be performed
         boolean createEnrichmentConnection = false;
@@ -942,6 +946,8 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
                             currentSequenceId.set(beginEventWriter.writeEvent(currentSession, transitUri, beginEvent, currentSequenceId.get(), REL_SUCCESS));
                         }
                         inTransaction = true;
+                        //update inTransaction value to state
+                        updateState(session);
                     } else if ("COMMIT".equals(sql)) {
                         if (!inTransaction) {
                             throw new IOException("COMMIT event received while not processing a transaction (i.e. no corresponding BEGIN event). "
@@ -954,12 +960,11 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
                                     : new CommitTransactionEventInfo(currentDatabase, timestamp, currentBinlogFile, currentBinlogPosition);
                             currentSequenceId.set(commitEventWriter.writeEvent(currentSession, transitUri, commitTransactionEvent, currentSequenceId.get(), REL_SUCCESS));
                         }
-
+                        //update inTransaction value to state
+                        inTransaction = false;
                         updateState(session);
-
                         // Commit the NiFi session
                         session.commitAsync();
-                        inTransaction = false;
                         currentTable = null;
                     } else {
                         // Check for DDL events (alter table, e.g.). Normalize the query to do string matching on the type of change
@@ -1005,9 +1010,12 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
                         currentSequenceId.set(commitEventWriter.writeEvent(currentSession, transitUri, commitTransactionEvent, currentSequenceId.get(), REL_SUCCESS));
                     }
                     // Commit the NiFi session
+                    // Update the inTransaction flag and save the next binlog position,
+                    // so that after a restart the processor does not read this XID event again
+                    inTransaction = false;
+                    currentBinlogPosition = header.getNextPosition();
                     updateState(session);
                     session.commitAsync();
-                    inTransaction = false;
                     currentTable = null;
                     currentDatabase = null;
                     break;
@@ -1089,7 +1097,7 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
             // Advance the current binlog position. This way if no more events are received and the processor is stopped, it will resume after the event that was just processed.
             // We always get ROTATE and FORMAT_DESCRIPTION messages no matter where we start (even from the end), and they won't have the correct "next position" value, so only
             // advance the position if it is not that type of event.
-            if (eventType != ROTATE && eventType != FORMAT_DESCRIPTION && !useGtid) {
+            if (eventType != ROTATE && eventType != FORMAT_DESCRIPTION && !useGtid && eventType != XID) {
                 currentBinlogPosition = header.getNextPosition();
             }
         }
@@ -1133,10 +1141,10 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
     }
 
     private void updateState(ProcessSession session) throws IOException {
-        updateState(session, currentBinlogFile, currentBinlogPosition, currentSequenceId.get(), currentGtidSet);
+        updateState(session, currentBinlogFile, currentBinlogPosition, currentSequenceId.get(), currentGtidSet, inTransaction);
     }
 
-    private void updateState(ProcessSession session, String binlogFile, long binlogPosition, long sequenceId, String gtidSet) throws IOException {
+    private void updateState(ProcessSession session, String binlogFile, long binlogPosition, long sequenceId, String gtidSet, boolean inTransaction) throws IOException {
         // Update state with latest values
         final Map<String, String> newStateMap = new HashMap<>(session.getState(Scope.CLUSTER).toMap());
 
@@ -1147,6 +1155,8 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
 
         newStateMap.put(BinlogEventInfo.BINLOG_POSITION_KEY, Long.toString(binlogPosition));
         newStateMap.put(EventWriter.SEQUENCE_ID_KEY, String.valueOf(sequenceId));
+        //add inTransaction value into state
+        newStateMap.put("inTransaction", inTransaction ? "true" : "false");
 
         if (gtidSet != null) {
             newStateMap.put(BinlogEventInfo.BINLOG_GTIDSET_KEY, gtidSet);
@@ -1178,7 +1188,7 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
     }
 
 
-    BinaryLogClient createBinlogClient(String hostname, int port, String username, String password) {
+    protected BinaryLogClient createBinlogClient(String hostname, int port, String username, String password) {
         return new BinaryLogClient(hostname, port, username, password);
     }
 
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy
index 5b166599fb..37231e533e 100644
--- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy
@@ -868,7 +868,12 @@ class CaptureChangeMySQLTest {
         testRunner.setProperty(CaptureChangeMySQL.USERNAME, 'root')
         testRunner.setProperty(CaptureChangeMySQL.PASSWORD, 'password')
         testRunner.setProperty(CaptureChangeMySQL.CONNECT_TIMEOUT, '2 seconds')
-
+        final DistributedMapCacheClientImpl cacheClient = createCacheClient()
+        def clientProperties = [:]
+        clientProperties.put(DistributedMapCacheClientService.HOSTNAME.getName(), 'localhost')
+        testRunner.addControllerService('client', cacheClient, clientProperties)
+        testRunner.setProperty(CaptureChangeMySQL.DIST_CACHE_CLIENT, 'client')
+        testRunner.enableControllerService(cacheClient)
         testRunner.run(1, false, true)
 
         // ROTATE
@@ -907,7 +912,7 @@ class CaptureChangeMySQLTest {
         testRunner.run(1, false, false)
 
         testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_FILENAME_KEY, 'master.000001', Scope.CLUSTER)
-        testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_POSITION_KEY, '4', Scope.CLUSTER)
+        testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_POSITION_KEY, '6', Scope.CLUSTER)
         testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_GTIDSET_KEY, null, Scope.CLUSTER)
 
         // COMMIT
@@ -931,6 +936,12 @@ class CaptureChangeMySQLTest {
         testRunner.setProperty(CaptureChangeMySQL.PASSWORD, 'password')
         testRunner.setProperty(CaptureChangeMySQL.CONNECT_TIMEOUT, '2 seconds')
         testRunner.setProperty(CaptureChangeMySQL.USE_BINLOG_GTID, 'true')
+        final DistributedMapCacheClientImpl cacheClient = createCacheClient()
+        def clientProperties = [:]
+        clientProperties.put(DistributedMapCacheClientService.HOSTNAME.getName(), 'localhost')
+        testRunner.addControllerService('client', cacheClient, clientProperties)
+        testRunner.setProperty(CaptureChangeMySQL.DIST_CACHE_CLIENT, 'client')
+        testRunner.enableControllerService(cacheClient)
 
         testRunner.run(1, false, true)
 
@@ -956,7 +967,7 @@ class CaptureChangeMySQLTest {
 
         // Stop the processor and verify the state is set
         testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_FILENAME_KEY, '', Scope.CLUSTER)
-        testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_POSITION_KEY, '-1000', Scope.CLUSTER)
+        testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_POSITION_KEY, '6', Scope.CLUSTER)
         testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_GTIDSET_KEY, 'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx:1-1', Scope.CLUSTER)
 
         ((CaptureChangeMySQL) testRunner.getProcessor()).clearState()
@@ -989,7 +1000,7 @@ class CaptureChangeMySQLTest {
         testRunner.run(1, false, false)
 
         testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_FILENAME_KEY, '', Scope.CLUSTER)
-        testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_POSITION_KEY, '-1000', Scope.CLUSTER)
+        testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_POSITION_KEY, '12', Scope.CLUSTER)
         testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_GTIDSET_KEY, 'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx:2-2', Scope.CLUSTER)
 
         // GTID
@@ -1013,7 +1024,7 @@ class CaptureChangeMySQLTest {
         testRunner.run(1, true, false)
 
         testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_FILENAME_KEY, '', Scope.CLUSTER)
-        testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_POSITION_KEY, '-1000', Scope.CLUSTER)
+        testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_POSITION_KEY, '18', Scope.CLUSTER)
         testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_GTIDSET_KEY, 'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx:2-3', Scope.CLUSTER)
     }
 
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/java/org/apache/nifi/cdc/mysql/CaptureChangeMySQLTest.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/java/org/apache/nifi/cdc/mysql/CaptureChangeMySQLTest.java
new file mode 100644
index 0000000000..0d3fd0f257
--- /dev/null
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/java/org/apache/nifi/cdc/mysql/CaptureChangeMySQLTest.java
@@ -0,0 +1,393 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cdc.mysql;
+
+
+import com.github.shyiko.mysql.binlog.BinaryLogClient;
+import com.github.shyiko.mysql.binlog.event.Event;
+import com.github.shyiko.mysql.binlog.event.EventData;
+import com.github.shyiko.mysql.binlog.event.EventHeaderV4;
+import com.github.shyiko.mysql.binlog.event.EventType;
+import com.github.shyiko.mysql.binlog.event.QueryEventData;
+import com.github.shyiko.mysql.binlog.event.RotateEventData;
+import com.github.shyiko.mysql.binlog.network.SSLMode;
+import org.apache.commons.io.output.WriterOutputStream;
+import org.apache.nifi.cdc.event.ColumnDefinition;
+import org.apache.nifi.cdc.event.TableInfo;
+import org.apache.nifi.cdc.event.TableInfoCacheKey;
+import org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.distributed.cache.client.Deserializer;
+import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
+import org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService;
+import org.apache.nifi.distributed.cache.client.Serializer;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.state.MockStateManager;
+import org.apache.nifi.util.MockComponentLog;
+import org.apache.nifi.util.MockControllerServiceInitializationContext;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.nio.charset.StandardCharsets;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeoutException;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class CaptureChangeMySQLTest {
+
+    private static final String DRIVER_LOCATION = "http://mysql-driver.com/driver.jar";
+    CaptureChangeMySQL processor;
+    TestRunner testRunner;
+    MockBinlogClientJava client = new MockBinlogClientJava("localhost", 3306, "root", "password");
+
+    @BeforeEach
+    void setUp() throws Exception { // runs before each test: new mock-backed processor and a runner wrapping it
+        processor = new MockCaptureChangeMySQL();
+        testRunner = TestRunners.newTestRunner(processor);
+    }
+
+    @Test
+    void testConnectionFailures() throws Exception {
+        testRunner.setProperty(CaptureChangeMySQL.DRIVER_LOCATION, DRIVER_LOCATION);
+        testRunner.setProperty(CaptureChangeMySQL.HOSTS, "localhost:3306");
+        testRunner.setProperty(CaptureChangeMySQL.USERNAME, "root");
+        testRunner.setProperty(CaptureChangeMySQL.SERVER_ID, "1");
+        final DistributedMapCacheClientImpl cacheClient = createCacheClient();
+        Map<String, String> clientProperties = new HashMap<>();
+        clientProperties.put(DistributedMapCacheClientService.HOSTNAME.getName(), "localhost");
+        testRunner.addControllerService("client", cacheClient, clientProperties);
+        testRunner.setProperty(CaptureChangeMySQL.DIST_CACHE_CLIENT, "client");
+        testRunner.enableControllerService(cacheClient);
+        client.connectionError = true;
+        try {
+            testRunner.run();
+        } catch (AssertionError ae) {
+            Throwable pe = ae.getCause();
+            assertTrue(pe instanceof ProcessException);
+            Throwable ioe = pe.getCause();
+            assertTrue(ioe instanceof IOException);
+            assertEquals("Could not connect binlog client to any of the specified hosts due to: Error during connect", ioe.getMessage());
+            assertTrue(ioe.getCause() instanceof IOException);
+        }
+        client.connectionError = false;
+
+        client.connectionTimeout = true;
+        try {
+            testRunner.run();
+        } catch (AssertionError ae) {
+            Throwable pe = ae.getCause();
+            assertTrue(pe instanceof ProcessException);
+            Throwable ioe = pe.getCause();
+            assertTrue(ioe instanceof IOException);
+            assertEquals("Could not connect binlog client to any of the specified hosts due to: Connection timed out", ioe.getMessage());
+            assertTrue(ioe.getCause() instanceof TimeoutException);
+        }
+        client.connectionTimeout = false;
+    }
+
+    @Test
+    void testSslModeDisabledSslContextServiceNotRequired() {
+        testRunner.setProperty(CaptureChangeMySQL.HOSTS, "localhost:3306");
+        testRunner.setProperty(CaptureChangeMySQL.SSL_MODE, SSLMode.DISABLED.toString());
+        testRunner.assertValid(); // must validate with SSL mode DISABLED although no SSL context service is configured
+    }
+
+    @Test
+    void testGetXIDEvents() throws Exception {
+        testRunner.setProperty(CaptureChangeMySQL.DRIVER_LOCATION, DRIVER_LOCATION);
+        testRunner.setProperty(CaptureChangeMySQL.HOSTS, "localhost:3306");
+        testRunner.setProperty(CaptureChangeMySQL.USERNAME, "root");
+        testRunner.setProperty(CaptureChangeMySQL.CONNECT_TIMEOUT, "2 seconds");
+        testRunner.setProperty(CaptureChangeMySQL.INCLUDE_BEGIN_COMMIT, "true");
+        final DistributedMapCacheClientImpl cacheClient = createCacheClient();
+        Map<String, String> clientProperties = new HashMap<>();
+        clientProperties.put(DistributedMapCacheClientService.HOSTNAME.getName(), "localhost");
+        testRunner.addControllerService("client", cacheClient, clientProperties);
+        testRunner.setProperty(CaptureChangeMySQL.DIST_CACHE_CLIENT, "client");
+        testRunner.enableControllerService(cacheClient);
+
+        testRunner.run(1, false, true);
+        // COMMIT (XID event), deliberately sent without a preceding BEGIN
+        EventHeaderV4 header2 = new EventHeaderV4();
+        header2.setEventType(EventType.XID);
+        header2.setNextPosition(12);
+        header2.setTimestamp(new Date().getTime());
+        EventData eventData = new EventData() {
+        };
+        client.sendEvent(new Event(header2, eventData));
+
+        // Receiving an XID event without a preceding BEGIN event must make the run fail
+        assertThrows(AssertionError.class, () -> testRunner.run(1, false, false));
+    }
+
+    /**
+     * Regression test for NIFI-6428: stopping and restarting the processor right after an XID (commit) event
+     * must not raise an error, and the BEGIN of the next transaction must be accepted afterwards.
+     */
+    @Test
+    void testBeginCommitTransaction() throws Exception {
+        testRunner.setProperty(CaptureChangeMySQL.DRIVER_LOCATION, DRIVER_LOCATION);
+        testRunner.setProperty(CaptureChangeMySQL.HOSTS, "localhost:3306");
+        testRunner.setProperty(CaptureChangeMySQL.USERNAME, "root");
+        testRunner.setProperty(CaptureChangeMySQL.CONNECT_TIMEOUT, "2 seconds");
+        testRunner.setProperty(CaptureChangeMySQL.INCLUDE_BEGIN_COMMIT, "true");
+        final DistributedMapCacheClientImpl cacheClient = createCacheClient();
+        Map<String, String> clientProperties = new HashMap<>();
+        clientProperties.put(DistributedMapCacheClientService.HOSTNAME.getName(), "localhost");
+        testRunner.addControllerService("client", cacheClient, clientProperties);
+        testRunner.setProperty(CaptureChangeMySQL.DIST_CACHE_CLIENT, "client");
+        testRunner.enableControllerService(cacheClient);
+
+        // First run initializes the processor and leaves it running for the events sent below
+        testRunner.run(1, false, true);
+
+        // ROTATE
+        EventHeaderV4 rotateHeader = new EventHeaderV4();
+        rotateHeader.setEventType(EventType.ROTATE);
+        rotateHeader.setNextPosition(2);
+        rotateHeader.setTimestamp(new Date().getTime());
+        RotateEventData rotateEventData = new RotateEventData();
+        rotateEventData.setBinlogFilename("mysql-bin.000001");
+        rotateEventData.setBinlogPosition(4L);
+        client.sendEvent(new Event(rotateHeader, rotateEventData));
+
+        // BEGIN
+        EventHeaderV4 beginHeader = new EventHeaderV4();
+        beginHeader.setEventType(EventType.QUERY);
+        beginHeader.setNextPosition(6);
+        beginHeader.setTimestamp(new Date().getTime());
+        QueryEventData beginEventData = new QueryEventData();
+        beginEventData.setDatabase("myDB");
+        beginEventData.setSql("BEGIN");
+        client.sendEvent(new Event(beginHeader, beginEventData));
+
+        // COMMIT (XID event)
+        EventHeaderV4 xidHeader = new EventHeaderV4();
+        xidHeader.setEventType(EventType.XID);
+        xidHeader.setNextPosition(12);
+        xidHeader.setTimestamp(new Date().getTime());
+        EventData xidEventData = new EventData() {
+        };
+        client.sendEvent(new Event(xidHeader, xidEventData));
+
+        // Stop the processor once the XID event has been handled, then restart it;
+        // this restart used to end in an exception
+        testRunner.run(1, true, false);
+        testRunner.run(1, false, false);
+
+        // Next transaction
+        // BEGIN
+        EventHeaderV4 nextBeginHeader = new EventHeaderV4();
+        nextBeginHeader.setEventType(EventType.QUERY);
+        nextBeginHeader.setNextPosition(16);
+        nextBeginHeader.setTimestamp(new Date().getTime());
+        QueryEventData nextBeginEventData = new QueryEventData();
+        nextBeginEventData.setDatabase("myDB");
+        nextBeginEventData.setSql("BEGIN");
+        client.sendEvent(new Event(nextBeginHeader, nextBeginEventData));
+
+        testRunner.run(1, true, false);
+    }
+
+    /********************************
+     * Mock and helper classes below
+     ********************************/
+
+    static DistributedMapCacheClientImpl createCacheClient() throws InitializationException { // builds the in-memory cache client used by the tests
+
+        final DistributedMapCacheClientImpl client = new DistributedMapCacheClientImpl();
+        final ComponentLog logger = new MockComponentLog("client", client);
+        final MockControllerServiceInitializationContext clientInitContext = new MockControllerServiceInitializationContext(client, "client", logger, new MockStateManager(client));
+
+        client.initialize(clientInitContext); // initialized with a mock context: id "client", mock logger, mock state manager
+
+        return client;
+    }
+
+    static final class DistributedMapCacheClientImpl extends AbstractControllerService implements DistributedMapCacheClient {
+
+        private final Map<String, String> cacheMap = new HashMap<>();
+
+
+        @Override
+        protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+            List<PropertyDescriptor> descriptors = new ArrayList<>();
+            descriptors.add(DistributedMapCacheClientService.HOSTNAME);
+            descriptors.add(DistributedMapCacheClientService.COMMUNICATIONS_TIMEOUT);
+            descriptors.add(DistributedMapCacheClientService.PORT);
+            descriptors.add(DistributedMapCacheClientService.SSL_CONTEXT_SERVICE);
+            return descriptors;
+        }
+
+        @Override
+        public <K, V> boolean putIfAbsent(
+                final K key,
+                final V value,
+                final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException {
+
+            StringWriter keyWriter = new StringWriter();
+            keySerializer.serialize(key, new WriterOutputStream(keyWriter));
+            String keyString = keyWriter.toString();
+
+            if (cacheMap.containsKey(keyString)) return false;
+
+            StringWriter valueWriter = new StringWriter();
+            valueSerializer.serialize(value, new WriterOutputStream(valueWriter));
+            return true;
+        }
+
+        @Override
+        @SuppressWarnings("unchecked")
+        public <K, V> V getAndPutIfAbsent(
+                final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer,
+                final Deserializer<V> valueDeserializer) throws IOException {
+            StringWriter keyWriter = new StringWriter();
+            keySerializer.serialize(key, new WriterOutputStream(keyWriter));
+            String keyString = keyWriter.toString();
+
+            if (cacheMap.containsKey(keyString))
+                return valueDeserializer.deserialize(cacheMap.get(keyString).getBytes(StandardCharsets.UTF_8));
+
+            StringWriter valueWriter = new StringWriter();
+            valueSerializer.serialize(value, new WriterOutputStream(valueWriter));
+            return null;
+        }
+
+        @Override
+        public <K> boolean containsKey(final K key, final Serializer<K> keySerializer) throws IOException {
+            StringWriter keyWriter = new StringWriter();
+            keySerializer.serialize(key, new WriterOutputStream(keyWriter));
+            String keyString = keyWriter.toString();
+
+            return cacheMap.containsKey(keyString);
+        }
+
+        @Override
+        public <K, V> V get(
+                final K key,
+                final Serializer<K> keySerializer, final Deserializer<V> valueDeserializer) throws IOException {
+            StringWriter keyWriter = new StringWriter();
+            keySerializer.serialize(key, new WriterOutputStream(keyWriter));
+            String keyString = keyWriter.toString();
+
+            return (cacheMap.containsKey(keyString)) ? valueDeserializer.deserialize(cacheMap.get(keyString).getBytes(StandardCharsets.UTF_8)) : null;
+        }
+
+        @Override
+        public void close() throws IOException {
+
+        }
+
+        @Override
+        public <K> boolean remove(final K key, final Serializer<K> serializer) throws IOException {
+            StringWriter keyWriter = new StringWriter();
+            serializer.serialize(key, new WriterOutputStream(keyWriter));
+            String keyString = keyWriter.toString();
+
+            boolean removed = (cacheMap.containsKey(keyString));
+            cacheMap.remove(keyString);
+            return removed;
+        }
+
+        @Override
+        public long removeByPattern(String regex) throws IOException {
+            final List<String> removedRecords = new ArrayList<>();
+            Pattern p = Pattern.compile(regex);
+            for (String key : cacheMap.keySet()) {
+                // Key must be backed by something that can be converted into a String
+                Matcher m = p.matcher(key);
+                if (m.matches()) {
+                    removedRecords.add(cacheMap.get(key));
+                }
+            }
+            final long numRemoved = removedRecords.size();
+            for (String it : removedRecords) {
+                cacheMap.remove(it);
+            }
+            return numRemoved;
+        }
+
+        @Override
+        public <K, V> void put(
+                final K key,
+                final V value,
+                final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException {
+            StringWriter keyWriter = new StringWriter();
+            keySerializer.serialize(key, new WriterOutputStream(keyWriter));
+            StringWriter valueWriter = new StringWriter();
+            valueSerializer.serialize(value, new WriterOutputStream(valueWriter));
+        }
+    }
+
+    /** Processor under test with its binlog client, table lookup, driver registration and JDBC connection replaced by test doubles. */
+    public class MockCaptureChangeMySQL extends CaptureChangeMySQL {
+
+        Map<TableInfoCacheKey, TableInfo> cache = new HashMap<>();
+
+        @Override
+        public BinaryLogClient createBinlogClient(String hostname, int port, String username, String password) {
+            return client;
+        }
+
+        @Override
+        public TableInfo loadTableInfo(TableInfoCacheKey key) {
+            // Build a fixed two-column definition ("id", "string1") on first use and reuse it afterwards
+            return cache.computeIfAbsent(key, k -> {
+                List<ColumnDefinition> column = new ArrayList<>();
+                column.add(new ColumnDefinition((byte) 4, "id"));
+                column.add(new ColumnDefinition((byte) -4, "string1"));
+                return new TableInfo(k.getDatabaseName(), k.getTableName(), k.getTableId(), column);
+            });
+        }
+
+        @Override
+        protected void registerDriver(String locationString, String drvName) throws InitializationException {
+            // Intentionally empty: this override registers no JDBC driver
+        }
+
+        @Override
+        protected Connection getJdbcConnection() throws SQLException {
+            Connection mockConnection = mock(Connection.class);
+            Statement mockStatement = mock(Statement.class);
+            when(mockConnection.createStatement()).thenReturn(mockStatement);
+            ResultSet mockResultSet = mock(ResultSet.class);
+            when(mockStatement.executeQuery(anyString())).thenReturn(mockResultSet);
+            return mockConnection;
+        }
+    }
+}
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/java/org/apache/nifi/cdc/mysql/MockBinlogClientJava.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/java/org/apache/nifi/cdc/mysql/MockBinlogClientJava.java
new file mode 100644
index 0000000000..d23822292e
--- /dev/null
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/java/org/apache/nifi/cdc/mysql/MockBinlogClientJava.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cdc.mysql;
+
+import com.github.shyiko.mysql.binlog.BinaryLogClient;
+import com.github.shyiko.mysql.binlog.event.Event;
+import com.github.shyiko.mysql.binlog.network.SSLSocketFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+
+/** Test double for BinaryLogClient: simulates connect outcomes and lets tests push events to the registered listeners. */
+public class MockBinlogClientJava extends BinaryLogClient {
+    String hostname;
+    int port;
+    String username;
+    String password;
+
+    boolean connected;
+    public boolean connectionTimeout = false;
+    public boolean connectionError = false;
+
+    List<LifecycleListener> lifecycleListeners = new ArrayList<>();
+    List<EventListener> eventListeners = new ArrayList<>();
+    SSLSocketFactory sslSocketFactory;
+
+    public MockBinlogClientJava(String hostname, int port, String username, String password) {
+        super(hostname, port, username, password);
+        this.hostname = hostname;
+        this.port = port;
+        this.username = username;
+        this.password = password;
+    }
+
+    @Override
+    public void connect(long timeoutInMilliseconds) throws IOException, TimeoutException { // no network I/O; outcome is driven by the flags
+        if (connectionTimeout) {
+            throw new TimeoutException("Connection timed out");
+        }
+        if (connectionError) {
+            throw new IOException("Error during connect");
+        }
+        if (password == null) {
+            throw new NullPointerException("Password can't be null");
+        }
+        connected = true;
+    }
+
+    @Override
+    public void disconnect() throws IOException {
+        connected = false;
+    }
+
+    @Override
+    public boolean isConnected() {
+        return connected;
+    }
+
+    @Override
+    public void registerEventListener(EventListener eventListener) {
+        eventListeners.add(eventListener);
+    }
+
+    @Override
+    public void unregisterEventListener(EventListener eventListener) {
+        eventListeners.remove(eventListener);
+    }
+
+    @Override
+    public void registerLifecycleListener(LifecycleListener lifecycleListener) {
+        if (!lifecycleListeners.contains(lifecycleListener)) {
+            lifecycleListeners.add(lifecycleListener);
+        }
+    }
+
+    @Override
+    public void unregisterLifecycleListener(LifecycleListener lifecycleListener) {
+        lifecycleListeners.remove(lifecycleListener);
+    }
+
+    @Override
+    public void setSslSocketFactory(SSLSocketFactory sslSocketFactory) {
+        super.setSslSocketFactory(sslSocketFactory);
+        this.sslSocketFactory = sslSocketFactory;
+    }
+
+    public void sendEvent(Event event) { // test hook: delivers the event synchronously to every registered event listener
+        for (EventListener eventListener : eventListeners) {
+            eventListener.onEvent(event);
+        }
+    }
+}