You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2022/11/30 19:53:41 UTC
[nifi] 03/22: NIFI-6428 Add 'inTransaction' value to state,fix bug
This is an automated email from the ASF dual-hosted git repository.
joewitt pushed a commit to branch support/nifi-1.19
in repository https://gitbox.apache.org/repos/asf/nifi.git
commit 0a09aa57fd9d69626cc6b2a1ae24fb515f1d4944
Author: sssqhai <sh...@outlook.com>
AuthorDate: Sat May 7 16:35:08 2022 +0800
NIFI-6428 Add 'inTransaction' value to state,fix bug
Signed-off-by: Matthew Burgess <ma...@apache.org>
This closes #6036
---
.../cdc/mysql/processors/CaptureChangeMySQL.java | 26 +-
.../mysql/processors/CaptureChangeMySQLTest.groovy | 21 +-
.../nifi/cdc/mysql/CaptureChangeMySQLTest.java | 393 +++++++++++++++++++++
.../nifi/cdc/mysql/MockBinlogClientJava.java | 108 ++++++
4 files changed, 535 insertions(+), 13 deletions(-)
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
index cc9d86137c..3e245c7ab0 100644
--- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
@@ -130,6 +130,8 @@ import static com.github.shyiko.mysql.binlog.event.EventType.PRE_GA_DELETE_ROWS;
import static com.github.shyiko.mysql.binlog.event.EventType.PRE_GA_WRITE_ROWS;
import static com.github.shyiko.mysql.binlog.event.EventType.ROTATE;
import static com.github.shyiko.mysql.binlog.event.EventType.WRITE_ROWS;
+import static com.github.shyiko.mysql.binlog.event.EventType.XID;
+
/**
* A processor to retrieve Change Data Capture (CDC) events and send them as flow files.
@@ -626,6 +628,8 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
} else {
currentSequenceId.set(Long.parseLong(seqIdString));
}
+ //get inTransaction value from state
+ inTransaction = "true".equals(stateMap.get("inTransaction"));
// Get reference to Distributed Cache if one exists. If it does not, no enrichment (resolution of column names, e.g.) will be performed
boolean createEnrichmentConnection = false;
@@ -942,6 +946,8 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
currentSequenceId.set(beginEventWriter.writeEvent(currentSession, transitUri, beginEvent, currentSequenceId.get(), REL_SUCCESS));
}
inTransaction = true;
+ //update inTransaction value to state
+ updateState(session);
} else if ("COMMIT".equals(sql)) {
if (!inTransaction) {
throw new IOException("COMMIT event received while not processing a transaction (i.e. no corresponding BEGIN event). "
@@ -954,12 +960,11 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
: new CommitTransactionEventInfo(currentDatabase, timestamp, currentBinlogFile, currentBinlogPosition);
currentSequenceId.set(commitEventWriter.writeEvent(currentSession, transitUri, commitTransactionEvent, currentSequenceId.get(), REL_SUCCESS));
}
-
+ //update inTransaction value to state
+ inTransaction = false;
updateState(session);
-
// Commit the NiFi session
session.commitAsync();
- inTransaction = false;
currentTable = null;
} else {
// Check for DDL events (alter table, e.g.). Normalize the query to do string matching on the type of change
@@ -1005,9 +1010,12 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
currentSequenceId.set(commitEventWriter.writeEvent(currentSession, transitUri, commitTransactionEvent, currentSequenceId.get(), REL_SUCCESS));
}
// Commit the NiFi session
+ // Update the inTransaction value and save the next binlog position,
+ // so that when this processor is restarted we do not read the XID event again
+ inTransaction = false;
+ currentBinlogPosition = header.getNextPosition();
updateState(session);
session.commitAsync();
- inTransaction = false;
currentTable = null;
currentDatabase = null;
break;
@@ -1089,7 +1097,7 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
// Advance the current binlog position. This way if no more events are received and the processor is stopped, it will resume after the event that was just processed.
// We always get ROTATE and FORMAT_DESCRIPTION messages no matter where we start (even from the end), and they won't have the correct "next position" value, so only
// advance the position if it is not that type of event.
- if (eventType != ROTATE && eventType != FORMAT_DESCRIPTION && !useGtid) {
+ if (eventType != ROTATE && eventType != FORMAT_DESCRIPTION && !useGtid && eventType != XID) {
currentBinlogPosition = header.getNextPosition();
}
}
@@ -1133,10 +1141,10 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
}
private void updateState(ProcessSession session) throws IOException {
- updateState(session, currentBinlogFile, currentBinlogPosition, currentSequenceId.get(), currentGtidSet);
+ updateState(session, currentBinlogFile, currentBinlogPosition, currentSequenceId.get(), currentGtidSet, inTransaction);
}
- private void updateState(ProcessSession session, String binlogFile, long binlogPosition, long sequenceId, String gtidSet) throws IOException {
+ private void updateState(ProcessSession session, String binlogFile, long binlogPosition, long sequenceId, String gtidSet, boolean inTransaction) throws IOException {
// Update state with latest values
final Map<String, String> newStateMap = new HashMap<>(session.getState(Scope.CLUSTER).toMap());
@@ -1147,6 +1155,8 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
newStateMap.put(BinlogEventInfo.BINLOG_POSITION_KEY, Long.toString(binlogPosition));
newStateMap.put(EventWriter.SEQUENCE_ID_KEY, String.valueOf(sequenceId));
+ //add inTransaction value into state
+ newStateMap.put("inTransaction", inTransaction ? "true" : "false");
if (gtidSet != null) {
newStateMap.put(BinlogEventInfo.BINLOG_GTIDSET_KEY, gtidSet);
@@ -1178,7 +1188,7 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
}
- BinaryLogClient createBinlogClient(String hostname, int port, String username, String password) {
+ protected BinaryLogClient createBinlogClient(String hostname, int port, String username, String password) {
return new BinaryLogClient(hostname, port, username, password);
}
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy
index 5b166599fb..37231e533e 100644
--- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy
@@ -868,7 +868,12 @@ class CaptureChangeMySQLTest {
testRunner.setProperty(CaptureChangeMySQL.USERNAME, 'root')
testRunner.setProperty(CaptureChangeMySQL.PASSWORD, 'password')
testRunner.setProperty(CaptureChangeMySQL.CONNECT_TIMEOUT, '2 seconds')
-
+ final DistributedMapCacheClientImpl cacheClient = createCacheClient()
+ def clientProperties = [:]
+ clientProperties.put(DistributedMapCacheClientService.HOSTNAME.getName(), 'localhost')
+ testRunner.addControllerService('client', cacheClient, clientProperties)
+ testRunner.setProperty(CaptureChangeMySQL.DIST_CACHE_CLIENT, 'client')
+ testRunner.enableControllerService(cacheClient)
testRunner.run(1, false, true)
// ROTATE
@@ -907,7 +912,7 @@ class CaptureChangeMySQLTest {
testRunner.run(1, false, false)
testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_FILENAME_KEY, 'master.000001', Scope.CLUSTER)
- testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_POSITION_KEY, '4', Scope.CLUSTER)
+ testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_POSITION_KEY, '6', Scope.CLUSTER)
testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_GTIDSET_KEY, null, Scope.CLUSTER)
// COMMIT
@@ -931,6 +936,12 @@ class CaptureChangeMySQLTest {
testRunner.setProperty(CaptureChangeMySQL.PASSWORD, 'password')
testRunner.setProperty(CaptureChangeMySQL.CONNECT_TIMEOUT, '2 seconds')
testRunner.setProperty(CaptureChangeMySQL.USE_BINLOG_GTID, 'true')
+ final DistributedMapCacheClientImpl cacheClient = createCacheClient()
+ def clientProperties = [:]
+ clientProperties.put(DistributedMapCacheClientService.HOSTNAME.getName(), 'localhost')
+ testRunner.addControllerService('client', cacheClient, clientProperties)
+ testRunner.setProperty(CaptureChangeMySQL.DIST_CACHE_CLIENT, 'client')
+ testRunner.enableControllerService(cacheClient)
testRunner.run(1, false, true)
@@ -956,7 +967,7 @@ class CaptureChangeMySQLTest {
// Stop the processor and verify the state is set
testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_FILENAME_KEY, '', Scope.CLUSTER)
- testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_POSITION_KEY, '-1000', Scope.CLUSTER)
+ testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_POSITION_KEY, '6', Scope.CLUSTER)
testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_GTIDSET_KEY, 'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx:1-1', Scope.CLUSTER)
((CaptureChangeMySQL) testRunner.getProcessor()).clearState()
@@ -989,7 +1000,7 @@ class CaptureChangeMySQLTest {
testRunner.run(1, false, false)
testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_FILENAME_KEY, '', Scope.CLUSTER)
- testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_POSITION_KEY, '-1000', Scope.CLUSTER)
+ testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_POSITION_KEY, '12', Scope.CLUSTER)
testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_GTIDSET_KEY, 'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx:2-2', Scope.CLUSTER)
// GTID
@@ -1013,7 +1024,7 @@ class CaptureChangeMySQLTest {
testRunner.run(1, true, false)
testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_FILENAME_KEY, '', Scope.CLUSTER)
- testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_POSITION_KEY, '-1000', Scope.CLUSTER)
+ testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_POSITION_KEY, '18', Scope.CLUSTER)
testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_GTIDSET_KEY, 'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx:2-3', Scope.CLUSTER)
}
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/java/org/apache/nifi/cdc/mysql/CaptureChangeMySQLTest.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/java/org/apache/nifi/cdc/mysql/CaptureChangeMySQLTest.java
new file mode 100644
index 0000000000..0d3fd0f257
--- /dev/null
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/java/org/apache/nifi/cdc/mysql/CaptureChangeMySQLTest.java
@@ -0,0 +1,393 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cdc.mysql;
+
+
+import com.github.shyiko.mysql.binlog.BinaryLogClient;
+import com.github.shyiko.mysql.binlog.event.Event;
+import com.github.shyiko.mysql.binlog.event.EventData;
+import com.github.shyiko.mysql.binlog.event.EventHeaderV4;
+import com.github.shyiko.mysql.binlog.event.EventType;
+import com.github.shyiko.mysql.binlog.event.QueryEventData;
+import com.github.shyiko.mysql.binlog.event.RotateEventData;
+import com.github.shyiko.mysql.binlog.network.SSLMode;
+import org.apache.commons.io.output.WriterOutputStream;
+import org.apache.nifi.cdc.event.ColumnDefinition;
+import org.apache.nifi.cdc.event.TableInfo;
+import org.apache.nifi.cdc.event.TableInfoCacheKey;
+import org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.distributed.cache.client.Deserializer;
+import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
+import org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService;
+import org.apache.nifi.distributed.cache.client.Serializer;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.state.MockStateManager;
+import org.apache.nifi.util.MockComponentLog;
+import org.apache.nifi.util.MockControllerServiceInitializationContext;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.nio.charset.StandardCharsets;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeoutException;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class CaptureChangeMySQLTest {
+
+    // Placeholder driver location; MockCaptureChangeMySQL overrides registerDriver() as a no-op, so nothing is loaded from it
+    private static final String DRIVER_LOCATION = "http://mysql-driver.com/driver.jar";
+    // Processor under test and its runner; both are re-created before every test in setUp()
+    CaptureChangeMySQL processor;
+    TestRunner testRunner;
+    // Mock binlog client returned to the processor by MockCaptureChangeMySQL.createBinlogClient()
+    MockBinlogClientJava client = new MockBinlogClientJava("localhost", 3306, "root", "password");
+
+    /**
+     * Creates a fresh mock processor and a test runner wrapping it before each test.
+     */
+    @BeforeEach
+    void setUp() throws Exception {
+        processor = new MockCaptureChangeMySQL();
+        testRunner = TestRunners.newTestRunner(processor);
+    }
+
+ @Test
+ void testConnectionFailures() throws Exception {
+ testRunner.setProperty(CaptureChangeMySQL.DRIVER_LOCATION, DRIVER_LOCATION);
+ testRunner.setProperty(CaptureChangeMySQL.HOSTS, "localhost:3306");
+ testRunner.setProperty(CaptureChangeMySQL.USERNAME, "root");
+ testRunner.setProperty(CaptureChangeMySQL.SERVER_ID, "1");
+ final DistributedMapCacheClientImpl cacheClient = createCacheClient();
+ Map<String, String> clientProperties = new HashMap<>();
+ clientProperties.put(DistributedMapCacheClientService.HOSTNAME.getName(), "localhost");
+ testRunner.addControllerService("client", cacheClient, clientProperties);
+ testRunner.setProperty(CaptureChangeMySQL.DIST_CACHE_CLIENT, "client");
+ testRunner.enableControllerService(cacheClient);
+ client.connectionError = true;
+ try {
+ testRunner.run();
+ } catch (AssertionError ae) {
+ Throwable pe = ae.getCause();
+ assertTrue(pe instanceof ProcessException);
+ Throwable ioe = pe.getCause();
+ assertTrue(ioe instanceof IOException);
+ assertEquals("Could not connect binlog client to any of the specified hosts due to: Error during connect", ioe.getMessage());
+ assertTrue(ioe.getCause() instanceof IOException);
+ }
+ client.connectionError = false;
+
+ client.connectionTimeout = true;
+ try {
+ testRunner.run();
+ } catch (AssertionError ae) {
+ Throwable pe = ae.getCause();
+ assertTrue(pe instanceof ProcessException);
+ Throwable ioe = pe.getCause();
+ assertTrue(ioe instanceof IOException);
+ assertEquals("Could not connect binlog client to any of the specified hosts due to: Connection timed out", ioe.getMessage());
+ assertTrue(ioe.getCause() instanceof TimeoutException);
+ }
+ client.connectionTimeout = false;
+ }
+
+    /**
+     * Checks that the processor configuration is valid when SSL Mode is DISABLED
+     * and no SSL context service has been configured on the runner.
+     */
+    @Test
+    void testSslModeDisabledSslContextServiceNotRequired() {
+        testRunner.setProperty(CaptureChangeMySQL.HOSTS, "localhost:3306");
+        testRunner.setProperty(CaptureChangeMySQL.SSL_MODE, SSLMode.DISABLED.toString());
+        // No SSL context service property is set anywhere in this test
+        testRunner.assertValid();
+    }
+
+ @Test
+ void testGetXIDEvents() throws Exception {
+ testRunner.setProperty(CaptureChangeMySQL.DRIVER_LOCATION, DRIVER_LOCATION);
+ testRunner.setProperty(CaptureChangeMySQL.HOSTS, "localhost:3306");
+ testRunner.setProperty(CaptureChangeMySQL.USERNAME, "root");
+ testRunner.setProperty(CaptureChangeMySQL.CONNECT_TIMEOUT, "2 seconds");
+ testRunner.setProperty(CaptureChangeMySQL.INCLUDE_BEGIN_COMMIT, "true");
+ final DistributedMapCacheClientImpl cacheClient = createCacheClient();
+ Map<String, String> clientProperties = new HashMap<>();
+ clientProperties.put(DistributedMapCacheClientService.HOSTNAME.getName(), "localhost");
+ testRunner.addControllerService("client", cacheClient, clientProperties);
+ testRunner.setProperty(CaptureChangeMySQL.DIST_CACHE_CLIENT, "client");
+ testRunner.enableControllerService(cacheClient);
+
+ testRunner.run(1, false, true);
+ // COMMIT
+ EventHeaderV4 header2 = new EventHeaderV4();
+ header2.setEventType(EventType.XID);
+ header2.setNextPosition(12);
+ header2.setTimestamp(new Date().getTime());
+ EventData eventData = new EventData() {
+ };
+ client.sendEvent(new Event(header2, eventData));
+
+ // when we ge a xid event without having got a 'begin' event ,throw an exception
+ assertThrows(AssertionError.class, () -> testRunner.run(1, false, false));
+ }
+
+    /**
+     * Regression test for NIFI-6428: a BEGIN followed by an XID (commit) event is processed,
+     * the processor is stopped and triggered again, and then a new BEGIN event is processed.
+     * According to the original author this sequence used to raise an exception.
+     *
+     * <p>The test passes when none of the runs throws.</p>
+     */
+    @Test
+    void testBeginCommitTransaction() throws Exception {
+        testRunner.setProperty(CaptureChangeMySQL.DRIVER_LOCATION, DRIVER_LOCATION);
+        testRunner.setProperty(CaptureChangeMySQL.HOSTS, "localhost:3306");
+        testRunner.setProperty(CaptureChangeMySQL.USERNAME, "root");
+        testRunner.setProperty(CaptureChangeMySQL.CONNECT_TIMEOUT, "2 seconds");
+        testRunner.setProperty(CaptureChangeMySQL.INCLUDE_BEGIN_COMMIT, "true");
+        final DistributedMapCacheClientImpl cacheClient = createCacheClient();
+        Map<String, String> clientProperties = new HashMap<>();
+        clientProperties.put(DistributedMapCacheClientService.HOSTNAME.getName(), "localhost");
+        testRunner.addControllerService("client", cacheClient, clientProperties);
+        testRunner.setProperty(CaptureChangeMySQL.DIST_CACHE_CLIENT, "client");
+        testRunner.enableControllerService(cacheClient);
+
+        // First run initializes the processor and leaves it running
+        testRunner.run(1, false, true);
+
+        // ROTATE
+        EventHeaderV4 rotateHeader = new EventHeaderV4();
+        rotateHeader.setEventType(EventType.ROTATE);
+        rotateHeader.setNextPosition(2);
+        rotateHeader.setTimestamp(new Date().getTime());
+        RotateEventData rotateEventData = new RotateEventData();
+        rotateEventData.setBinlogFilename("mysql-bin.000001");
+        rotateEventData.setBinlogPosition(4L);
+        client.sendEvent(new Event(rotateHeader, rotateEventData));
+
+        // BEGIN
+        EventHeaderV4 beginHeader = new EventHeaderV4();
+        beginHeader.setEventType(EventType.QUERY);
+        beginHeader.setNextPosition(6);
+        beginHeader.setTimestamp(new Date().getTime());
+        QueryEventData beginEventData = new QueryEventData();
+        beginEventData.setDatabase("myDB");
+        beginEventData.setSql("BEGIN");
+        client.sendEvent(new Event(beginHeader, beginEventData));
+
+        // COMMIT (signalled by an XID event)
+        EventHeaderV4 xidHeader = new EventHeaderV4();
+        xidHeader.setEventType(EventType.XID);
+        xidHeader.setNextPosition(12);
+        xidHeader.setTimestamp(new Date().getTime());
+        EventData xidEventData = new EventData() {
+        };
+        client.sendEvent(new Event(xidHeader, xidEventData));
+
+        // After the XID event, run once with stopOnFinish = true and then trigger the processor again;
+        // this stop-and-restart sequence used to raise an exception
+        testRunner.run(1, true, false);
+        testRunner.run(1, false, false);
+
+        // Next transaction: BEGIN
+        EventHeaderV4 nextBeginHeader = new EventHeaderV4();
+        nextBeginHeader.setEventType(EventType.QUERY);
+        nextBeginHeader.setNextPosition(16);
+        nextBeginHeader.setTimestamp(new Date().getTime());
+        QueryEventData nextBeginEventData = new QueryEventData();
+        nextBeginEventData.setDatabase("myDB");
+        nextBeginEventData.setSql("BEGIN");
+        client.sendEvent(new Event(nextBeginHeader, nextBeginEventData));
+
+        testRunner.run(1, true, false);
+    }
+
+ /********************************
+ * Mock and helper classes below
+ ********************************/
+
+    /**
+     * Builds an in-memory cache client and initializes it with a mock controller-service context
+     * registered under the identifier "client".
+     *
+     * @return the initialized cache client
+     * @throws InitializationException if the client fails to initialize
+     */
+    static DistributedMapCacheClientImpl createCacheClient() throws InitializationException {
+        final DistributedMapCacheClientImpl cacheClient = new DistributedMapCacheClientImpl();
+
+        final ComponentLog componentLog = new MockComponentLog("client", cacheClient);
+        final MockStateManager stateManager = new MockStateManager(cacheClient);
+        final MockControllerServiceInitializationContext initContext =
+                new MockControllerServiceInitializationContext(cacheClient, "client", componentLog, stateManager);
+        cacheClient.initialize(initContext);
+
+        return cacheClient;
+    }
+
+    /**
+     * In-memory stand-in for a distributed map cache, backed by a {@link HashMap} of
+     * serialized key strings to serialized value strings.
+     *
+     * <p>Keys and values are serialized with the supplied serializers and held as UTF-8 text,
+     * matching the UTF-8 bytes handed to deserializers on read. This assumes serializers emit
+     * UTF-8 text; arbitrary binary output would not round-trip.</p>
+     */
+    static final class DistributedMapCacheClientImpl extends AbstractControllerService implements DistributedMapCacheClient {
+
+        private final Map<String, String> cacheMap = new HashMap<>();
+
+        @Override
+        protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+            List<PropertyDescriptor> descriptors = new ArrayList<>();
+            descriptors.add(DistributedMapCacheClientService.HOSTNAME);
+            descriptors.add(DistributedMapCacheClientService.COMMUNICATIONS_TIMEOUT);
+            descriptors.add(DistributedMapCacheClientService.PORT);
+            descriptors.add(DistributedMapCacheClientService.SSL_CONTEXT_SERVICE);
+            return descriptors;
+        }
+
+        /**
+         * Stores the value only when the key is not yet present.
+         *
+         * @return {@code true} if the value was stored, {@code false} if the key already existed
+         */
+        @Override
+        public <K, V> boolean putIfAbsent(
+                final K key,
+                final V value,
+                final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException {
+
+            final String keyString = serializeToString(key, keySerializer);
+            if (cacheMap.containsKey(keyString)) {
+                return false;
+            }
+
+            cacheMap.put(keyString, serializeToString(value, valueSerializer));
+            return true;
+        }
+
+        /**
+         * Returns the existing value for the key, or stores the given value and returns {@code null}
+         * when the key was absent.
+         */
+        @Override
+        public <K, V> V getAndPutIfAbsent(
+                final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer,
+                final Deserializer<V> valueDeserializer) throws IOException {
+
+            final String keyString = serializeToString(key, keySerializer);
+            if (cacheMap.containsKey(keyString)) {
+                return valueDeserializer.deserialize(cacheMap.get(keyString).getBytes(StandardCharsets.UTF_8));
+            }
+
+            cacheMap.put(keyString, serializeToString(value, valueSerializer));
+            return null;
+        }
+
+        @Override
+        public <K> boolean containsKey(final K key, final Serializer<K> keySerializer) throws IOException {
+            return cacheMap.containsKey(serializeToString(key, keySerializer));
+        }
+
+        /**
+         * Returns the deserialized value for the key, or {@code null} when the key is absent.
+         */
+        @Override
+        public <K, V> V get(
+                final K key,
+                final Serializer<K> keySerializer, final Deserializer<V> valueDeserializer) throws IOException {
+
+            final String keyString = serializeToString(key, keySerializer);
+            if (!cacheMap.containsKey(keyString)) {
+                return null;
+            }
+            return valueDeserializer.deserialize(cacheMap.get(keyString).getBytes(StandardCharsets.UTF_8));
+        }
+
+        @Override
+        public void close() throws IOException {
+            // Nothing to release: the cache lives entirely in memory
+        }
+
+        /**
+         * Removes the entry for the key.
+         *
+         * @return {@code true} if an entry existed and was removed
+         */
+        @Override
+        public <K> boolean remove(final K key, final Serializer<K> serializer) throws IOException {
+            final String keyString = serializeToString(key, serializer);
+
+            final boolean removed = cacheMap.containsKey(keyString);
+            cacheMap.remove(keyString);
+            return removed;
+        }
+
+        /**
+         * Removes every entry whose key fully matches the regular expression.
+         *
+         * @return the number of entries removed
+         */
+        @Override
+        public long removeByPattern(String regex) throws IOException {
+            final Pattern pattern = Pattern.compile(regex);
+
+            // Collect the matching keys first so the map is not modified while it is being iterated
+            final List<String> matchingKeys = new ArrayList<>();
+            for (String key : cacheMap.keySet()) {
+                final Matcher matcher = pattern.matcher(key);
+                if (matcher.matches()) {
+                    matchingKeys.add(key);
+                }
+            }
+
+            for (String key : matchingKeys) {
+                cacheMap.remove(key);
+            }
+            return matchingKeys.size();
+        }
+
+        /**
+         * Stores the value under the key, replacing any previous value.
+         */
+        @Override
+        public <K, V> void put(
+                final K key,
+                final V value,
+                final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException {
+
+            cacheMap.put(serializeToString(key, keySerializer), serializeToString(value, valueSerializer));
+        }
+
+        /**
+         * Serializes the object and returns the output decoded as UTF-8 text.
+         * The stream is closed before the writer is read because {@link WriterOutputStream}
+         * buffers decoded characters until it is flushed or closed.
+         */
+        private static <T> String serializeToString(final T object, final Serializer<T> serializer) throws IOException {
+            final StringWriter writer = new StringWriter();
+            try (WriterOutputStream out = new WriterOutputStream(writer, StandardCharsets.UTF_8)) {
+                serializer.serialize(object, out);
+            }
+            return writer.toString();
+        }
+    }
+
+    /**
+     * Processor subclass that replaces external dependencies with test doubles: the binlog client
+     * is the enclosing test's mock {@code client}, driver registration is skipped, JDBC access is
+     * mocked with Mockito, and table metadata is generated on demand.
+     *
+     * <p>Deliberately a non-static inner class because it reads the enclosing test's {@code client} field.</p>
+     */
+    public class MockCaptureChangeMySQL extends CaptureChangeMySQL {
+
+        private final Map<TableInfoCacheKey, TableInfo> cache = new HashMap<>();
+
+        /**
+         * Returns the test's mock client regardless of the connection arguments.
+         * {@code @Override} makes the compiler verify that this really replaces the processor's factory method.
+         */
+        @Override
+        public BinaryLogClient createBinlogClient(String hostname, int port, String username, String password) {
+            return client;
+        }
+
+        /**
+         * Returns a cached {@link TableInfo} for the key, creating a two-column definition
+         * ("id", "string1") the first time a key is requested.
+         */
+        @Override
+        public TableInfo loadTableInfo(TableInfoCacheKey key) {
+            TableInfo tableInfo = cache.get(key);
+            if (tableInfo == null) {
+                // NOTE(review): type codes 4 and -4 presumably map to java.sql.Types.INTEGER and LONGVARBINARY - confirm
+                final List<ColumnDefinition> columns = new ArrayList<>();
+                columns.add(new ColumnDefinition((byte) 4, "id"));
+                columns.add(new ColumnDefinition((byte) -4, "string1"));
+
+                tableInfo = new TableInfo(key.getDatabaseName(), key.getTableName(), key.getTableId(), columns);
+                cache.put(key, tableInfo);
+            }
+            return tableInfo;
+        }
+
+        @Override
+        protected void registerDriver(String locationString, String drvName) throws InitializationException {
+            // Intentionally empty: no JDBC driver is loaded in tests
+        }
+
+        /**
+         * Returns a Mockito connection whose statements answer every query with a mock {@link ResultSet}.
+         */
+        @Override
+        protected Connection getJdbcConnection() throws SQLException {
+            Connection mockConnection = mock(Connection.class);
+            Statement mockStatement = mock(Statement.class);
+            when(mockConnection.createStatement()).thenReturn(mockStatement);
+            ResultSet mockResultSet = mock(ResultSet.class);
+            when(mockStatement.executeQuery(anyString())).thenReturn(mockResultSet);
+            return mockConnection;
+        }
+    }
+}
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/java/org/apache/nifi/cdc/mysql/MockBinlogClientJava.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/java/org/apache/nifi/cdc/mysql/MockBinlogClientJava.java
new file mode 100644
index 0000000000..d23822292e
--- /dev/null
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/java/org/apache/nifi/cdc/mysql/MockBinlogClientJava.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cdc.mysql;
+
+import com.github.shyiko.mysql.binlog.BinaryLogClient;
+import com.github.shyiko.mysql.binlog.event.Event;
+import com.github.shyiko.mysql.binlog.network.SSLSocketFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Test double for {@link BinaryLogClient} that never opens a network connection.
+ * Connection outcomes are controlled through the {@link #connectionTimeout} and
+ * {@link #connectionError} flags, and events are delivered to registered listeners
+ * by calling {@link #sendEvent(Event)}.
+ */
+public class MockBinlogClientJava extends BinaryLogClient {
+    String hostname;
+    int port;
+    String username;
+    String password;
+
+    boolean connected;
+    // When true, connect() throws a TimeoutException
+    public boolean connectionTimeout = false;
+    // When true, connect() throws an IOException
+    public boolean connectionError = false;
+
+    List<LifecycleListener> lifecycleListeners = new ArrayList<>();
+    SSLSocketFactory sslSocketFactory;
+
+    List<EventListener> eventListeners = new ArrayList<>();
+
+
+    public MockBinlogClientJava(String hostname, int port, String username, String password) {
+        super(hostname, port, username, password);
+        this.hostname = hostname;
+        this.port = port;
+        this.username = username;
+        this.password = password;
+    }
+
+    /**
+     * Simulates a connection attempt: fails according to the configured flags (timeout is checked first),
+     * rejects a null password, and otherwise marks the client as connected.
+     */
+    @Override
+    public void connect(long timeoutInMilliseconds) throws IOException, TimeoutException {
+        if (connectionTimeout) {
+            throw new TimeoutException("Connection timed out");
+        }
+        if (connectionError) {
+            throw new IOException("Error during connect");
+        }
+        if (password == null) {
+            throw new NullPointerException("Password can't be null");
+        }
+        connected = true;
+    }
+
+    @Override
+    public void disconnect() throws IOException {
+        connected = false;
+    }
+
+    @Override
+    public boolean isConnected() {
+        return connected;
+    }
+
+    @Override
+    public void registerEventListener(EventListener eventListener) {
+        eventListeners.add(eventListener);
+    }
+
+    @Override
+    public void unregisterEventListener(EventListener eventListener) {
+        eventListeners.remove(eventListener);
+    }
+
+    @Override
+    public void registerLifecycleListener(LifecycleListener lifecycleListener) {
+        if (!lifecycleListeners.contains(lifecycleListener)) {
+            lifecycleListeners.add(lifecycleListener);
+        }
+    }
+
+    @Override
+    public void unregisterLifecycleListener(LifecycleListener lifecycleListener) {
+        lifecycleListeners.remove(lifecycleListener);
+    }
+
+    @Override
+    public void setSslSocketFactory(SSLSocketFactory sslSocketFactory) {
+        super.setSslSocketFactory(sslSocketFactory);
+        this.sslSocketFactory = sslSocketFactory;
+    }
+
+    /**
+     * Delivers the event to every currently registered event listener.
+     * Iterates over a snapshot so a listener may register or unregister listeners while handling the event.
+     */
+    public void sendEvent(Event event) {
+        for (EventListener eventListener : new ArrayList<>(eventListeners)) {
+            eventListener.onEvent(event);
+        }
+    }
+}