Posted to issues@nifi.apache.org by "mattyb149 (via GitHub)" <gi...@apache.org> on 2023/04/03 20:20:39 UTC

[GitHub] [nifi] mattyb149 opened a new pull request, #7116: NIFI-11380: Refactor CaptureChangeMySQL with improvements

mattyb149 opened a new pull request, #7116:
URL: https://github.com/apache/nifi/pull/7116

   # Summary
   
   [NIFI-11380](https://issues.apache.org/jira/browse/NIFI-11380) This PR makes a number of improvements to the readability, maintainability, and capability of the CaptureChangeMySQL processor. See the Jira for a detailed list.
   
   # Tracking
   
   Please complete the following tracking steps prior to pull request creation.
   
   ### Issue Tracking
   
   - [x] [Apache NiFi Jira](https://issues.apache.org/jira/browse/NIFI) issue created
   
   ### Pull Request Tracking
   
   - [x] Pull Request title starts with Apache NiFi Jira issue number, such as `NIFI-00000`
   - [x] Pull Request commit message starts with Apache NiFi Jira issue number, such as `NIFI-00000`
   
   ### Pull Request Formatting
   
   - [x] Pull Request based on current revision of the `main` branch
   - [x] Pull Request refers to a feature branch with one commit containing changes
   
   # Verification
   
   Please indicate the verification steps performed prior to pull request creation.
   
   ### Build
   
   - [ ] Build completed using `mvn clean install -P contrib-check`
     - [x] JDK 11
     - [ ] JDK 17
   
   ### Licensing
   
   - [ ] New dependencies are compatible with the [Apache License 2.0](https://apache.org/licenses/LICENSE-2.0) according to the [License Policy](https://www.apache.org/legal/resolved.html)
   - [ ] New dependencies are documented in applicable `LICENSE` and `NOTICE` files
   
   ### Documentation
   
   - [ ] Documentation formatting appears as expected in rendered files
   




[GitHub] [nifi] mattyb149 commented on a diff in pull request #7116: NIFI-11380: Refactor CaptureChangeMySQL with improvements

Posted by "mattyb149 (via GitHub)" <gi...@apache.org>.
mattyb149 commented on code in PR #7116:
URL: https://github.com/apache/nifi/pull/7116#discussion_r1183708724


##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java:
##########
@@ -464,58 +460,44 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
 
     private static final List<PropertyDescriptor> propDescriptors;
 
-    private volatile ProcessSession currentSession;
-    private BinaryLogClient binlogClient;
-    private BinlogEventListener eventListener;
-    private BinlogLifecycleListener lifecycleListener;
-    private GtidSet gtidSet;
+    private volatile BinaryLogClient binlogClient;
+    private volatile BinlogEventListener eventListener;
+    private volatile BinlogLifecycleListener lifecycleListener;
+    private volatile GtidSet gtidSet;
 
     // Set queue capacity to avoid excessive memory consumption
     private final BlockingQueue<RawBinlogEvent> queue = new LinkedBlockingQueue<>(1000);
-    private volatile String currentBinlogFile = null;
-    private volatile long currentBinlogPosition = 4;
-    private volatile String currentGtidSet = null;
-
-    // The following variables save the value of the binlog filename, position, (sequence id), and gtid at the beginning of a transaction. Used for rollback
-    private volatile String xactBinlogFile = null;
-    private volatile long xactBinlogPosition = 4;
-    private volatile long xactSequenceId = 0;
-    private volatile String xactGtidSet = null;
-
-    private volatile TableInfo currentTable = null;
-    private volatile String currentDatabase = null;
-    private volatile Pattern databaseNamePattern;
-    private volatile Pattern tableNamePattern;
-    private volatile boolean includeBeginCommit = false;
-    private volatile boolean includeDDLEvents = false;
-    private volatile boolean useGtid = false;
 
-    private volatile boolean inTransaction = false;
-    private volatile boolean skipTable = false;
-    private final AtomicBoolean hasRun = new AtomicBoolean(false);
+    private final Map<TableInfoCacheKey, TableInfo> tableInfoCache = new HashMap<>();
 
-    private int currentHost = 0;
-    private String transitUri = "<unknown>";
+    private volatile ProcessSession currentSession;
+    private DataCaptureState currentDataCaptureState = new DataCaptureState();
 
-    private final AtomicLong currentSequenceId = new AtomicLong(0);
+    private volatile BinlogResourceInfo binlogResourceInfo = new BinlogResourceInfo();
 
-    private volatile DistributedMapCacheClient cacheClient = null;
-    private final Serializer<TableInfoCacheKey> cacheKeySerializer = new TableInfoCacheKey.Serializer();
-    private final Serializer<TableInfo> cacheValueSerializer = new TableInfo.Serializer();
-    private final Deserializer<TableInfo> cacheValueDeserializer = new TableInfo.Deserializer();
+    private volatile Pattern databaseNamePattern;
+    private volatile Pattern tableNamePattern;
+    private volatile boolean skipTable = false;
+    private int currentHost = 0;
+    private volatile JDBCConnectionHolder jdbcConnectionHolder = null;
 
-    private JDBCConnectionHolder jdbcConnectionHolder = null;
+    private final BinlogEventState binlogEventState = new BinlogEventState();
 
     private final BeginTransactionEventWriter beginEventWriter = new BeginTransactionEventWriter();
+    private final BeginEventHandler beginEventHandler = new BeginEventHandler();
     private final CommitTransactionEventWriter commitEventWriter = new CommitTransactionEventWriter();
+    private final CommitEventHandler commitEventHandler = new CommitEventHandler();
     private final DDLEventWriter ddlEventWriter = new DDLEventWriter();
+    private final DDLEventHandler ddlEventHandler = new DDLEventHandler();
     private final InsertRowsWriter insertRowsWriter = new InsertRowsWriter();
+    private final InsertEventHandler insertEventHandler = new InsertEventHandler();

Review Comment:
   That was what I was hoping to do, but I think the required parameters made the interface method pretty awkward. I can take another look.





[GitHub] [nifi] mattyb149 commented on pull request #7116: NIFI-11380: Refactor CaptureChangeMySQL with improvements

Posted by "mattyb149 (via GitHub)" <gi...@apache.org>.
mattyb149 commented on PR #7116:
URL: https://github.com/apache/nifi/pull/7116#issuecomment-1529554574

   @pvillard I had tried to upgrade, but I noticed the breaking tests, so I went back to the last version that didn't involve a regression or change in behavior. Is this the time to upgrade and change the logic? I suppose that as long as it doesn't manifest itself to the user, we just have to change our logic to support any changes in behavior, so I will take another look. If it will affect the user (such as giving different output from before), then we might need to keep this version or at least add it to the Migration Guide.




[GitHub] [nifi] asfgit closed pull request #7116: NIFI-11380: Refactor CaptureChangeMySQL with improvements

Posted by "asfgit (via GitHub)" <gi...@apache.org>.
asfgit closed pull request #7116: NIFI-11380: Refactor CaptureChangeMySQL with improvements
URL: https://github.com/apache/nifi/pull/7116




[GitHub] [nifi] pvillard31 commented on pull request #7116: NIFI-11380: Refactor CaptureChangeMySQL with improvements

Posted by "pvillard31 (via GitHub)" <gi...@apache.org>.
pvillard31 commented on PR #7116:
URL: https://github.com/apache/nifi/pull/7116#issuecomment-1529485618

   Hey @mattyb149 - while you're doing this, can we upgrade mysql-binlog-connector to 0.28.0? The upgrade to this version breaks a few tests in the Groovy test class though:
   
   ````
   [ERROR] Failures: 
   [ERROR]   CaptureChangeMySQLTest.testInitialGtidIgnoredWhenStatePresent:1352 java.lang.NumberFormatException: Error at index 0 in: "xxxxxxxx"
   [ERROR]   CaptureChangeMySQLTest.testInitialGtidNoStatePresent:1395 java.lang.NumberFormatException: Error at index 0 in: "xxxxxxxx"
   [ERROR]   CaptureChangeMySQLTest.testUpdateStateUseGtid:1207 expected: <> but was: <null>
   [INFO] 
   [ERROR] Tests run: 34, Failures: 3, Errors: 0, Skipped: 0
   ````




[GitHub] [nifi] nandorsoma commented on a diff in pull request #7116: NIFI-11380: Refactor CaptureChangeMySQL with improvements

Posted by "nandorsoma (via GitHub)" <gi...@apache.org>.
nandorsoma commented on code in PR #7116:
URL: https://github.com/apache/nifi/pull/7116#discussion_r1176308032


##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/handler/DDLEventHandler.java:
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cdc.mysql.event.handler;
+
+import com.github.shyiko.mysql.binlog.event.QueryEventData;
+import org.apache.nifi.cdc.event.TableInfo;
+import org.apache.nifi.cdc.event.io.EventWriterConfiguration;
+import org.apache.nifi.cdc.mysql.event.DDLEventInfo;
+import org.apache.nifi.cdc.mysql.event.DataCaptureState;
+import org.apache.nifi.cdc.mysql.event.io.AbstractBinlogEventWriter;
+import org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL;
+import org.apache.nifi.processor.ProcessSession;
+
+import static org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL.REL_SUCCESS;
+
+public class DDLEventHandler implements BinlogEventHandler<QueryEventData, DDLEventInfo> {
+    @Override
+    public void handleEvent(final QueryEventData eventData, final boolean writeEvent, DataCaptureState dataCaptureState,
+                            CaptureChangeMySQL.BinlogResourceInfo binlogResourceInfo, CaptureChangeMySQL.BinlogEventState binlogEventState,
+                            final String sql, AbstractBinlogEventWriter<DDLEventInfo> eventWriter,
+                            EventWriterConfiguration eventWriterConfiguration, ProcessSession session, final long timestamp) {
+        TableInfo ddlTableInfo = (binlogResourceInfo.getCurrentTable() != null)

Review Comment:
   The parentheses are not needed, and the variable can be final. Variables in other cases could also be final. Could you fix them?
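   For reference, the change asked for here would look roughly like the following (only the ddlTableInfo assignment is shown, reusing the names from the diff):
   ```java
   // Condition parentheses dropped and the local made final
   final TableInfo ddlTableInfo = binlogResourceInfo.getCurrentTable() != null
           ? binlogResourceInfo.getCurrentTable()
           : new TableInfo(binlogResourceInfo.getCurrentDatabase(), null, null, null);
   ```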



##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/handler/DDLEventHandler.java:
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cdc.mysql.event.handler;
+
+import com.github.shyiko.mysql.binlog.event.QueryEventData;
+import org.apache.nifi.cdc.event.TableInfo;
+import org.apache.nifi.cdc.event.io.EventWriterConfiguration;
+import org.apache.nifi.cdc.mysql.event.DDLEventInfo;
+import org.apache.nifi.cdc.mysql.event.DataCaptureState;
+import org.apache.nifi.cdc.mysql.event.io.AbstractBinlogEventWriter;
+import org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL;
+import org.apache.nifi.processor.ProcessSession;
+
+import static org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL.REL_SUCCESS;
+
+public class DDLEventHandler implements BinlogEventHandler<QueryEventData, DDLEventInfo> {
+    @Override
+    public void handleEvent(final QueryEventData eventData, final boolean writeEvent, DataCaptureState dataCaptureState,
+                            CaptureChangeMySQL.BinlogResourceInfo binlogResourceInfo, CaptureChangeMySQL.BinlogEventState binlogEventState,
+                            final String sql, AbstractBinlogEventWriter<DDLEventInfo> eventWriter,
+                            EventWriterConfiguration eventWriterConfiguration, ProcessSession session, final long timestamp) {
+        TableInfo ddlTableInfo = (binlogResourceInfo.getCurrentTable() != null)
+                ? binlogResourceInfo.getCurrentTable()
+                : new TableInfo(binlogResourceInfo.getCurrentDatabase(), null, null, null);
+        DDLEventInfo ddlEvent = dataCaptureState.isUseGtid()

Review Comment:
   I think ddlTableInfo and ddlEvent creation could go into the `if (writeEvent)` block to eliminate unnecessary memory allocation. 
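   A sketch of that restructuring, assuming the write call stays in the same method (the `getGtidSet()` accessor name is an assumption, not taken from the PR):
   ```java
   if (writeEvent) {
       // Only allocate the event objects when they will actually be written
       final TableInfo ddlTableInfo = binlogResourceInfo.getCurrentTable() != null
               ? binlogResourceInfo.getCurrentTable()
               : new TableInfo(binlogResourceInfo.getCurrentDatabase(), null, null, null);
       final DDLEventInfo ddlEvent = dataCaptureState.isUseGtid()
               ? new DDLEventInfo(ddlTableInfo, timestamp, dataCaptureState.getGtidSet(), sql)
               : new DDLEventInfo(ddlTableInfo, timestamp, dataCaptureState.getBinlogFile(), dataCaptureState.getBinlogPosition(), sql);
       // ... hand ddlEvent to the eventWriter as before ...
   }
   ```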



##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/handler/DeleteEventHandler.java:
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cdc.mysql.event.handler;
+
+import com.github.shyiko.mysql.binlog.event.DeleteRowsEventData;
+import org.apache.nifi.cdc.event.io.EventWriterConfiguration;
+import org.apache.nifi.cdc.mysql.event.DataCaptureState;
+import org.apache.nifi.cdc.mysql.event.DeleteRowsEventInfo;
+import org.apache.nifi.cdc.mysql.event.io.AbstractBinlogEventWriter;
+import org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL;
+import org.apache.nifi.processor.ProcessSession;
+
+import static org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL.REL_SUCCESS;
+
+public class DeleteEventHandler implements BinlogEventHandler<DeleteRowsEventData, DeleteRowsEventInfo> {
+    @Override
+    public void handleEvent(final DeleteRowsEventData eventData, final boolean writeEvent, DataCaptureState dataCaptureState,
+                            CaptureChangeMySQL.BinlogResourceInfo binlogResourceInfo, CaptureChangeMySQL.BinlogEventState binlogEventState,
+                            final String sql, AbstractBinlogEventWriter<DeleteRowsEventInfo> eventWriter,
+                            EventWriterConfiguration eventWriterConfiguration, ProcessSession session, final long timestamp) {
+        DeleteRowsEventInfo eventInfo = dataCaptureState.isUseGtid()

Review Comment:
   Same as in DDLEventHandler. We could move those lines into the `if (writeEvent)` block.



##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/handler/UpdateEventHandler.java:
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cdc.mysql.event.handler;
+
+import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData;
+import org.apache.nifi.cdc.event.io.EventWriterConfiguration;
+import org.apache.nifi.cdc.mysql.event.DataCaptureState;
+import org.apache.nifi.cdc.mysql.event.UpdateRowsEventInfo;
+import org.apache.nifi.cdc.mysql.event.io.AbstractBinlogEventWriter;
+import org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL;
+import org.apache.nifi.processor.ProcessSession;
+
+import static org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL.REL_SUCCESS;
+
+public class UpdateEventHandler implements BinlogEventHandler<UpdateRowsEventData, UpdateRowsEventInfo> {
+    @Override
+    public void handleEvent(final UpdateRowsEventData eventData, final boolean writeEvent, DataCaptureState dataCaptureState,
+                            CaptureChangeMySQL.BinlogResourceInfo binlogResourceInfo, CaptureChangeMySQL.BinlogEventState binlogEventState,
+                            final String sql, AbstractBinlogEventWriter<UpdateRowsEventInfo> eventWriter,
+                            EventWriterConfiguration eventWriterConfiguration, ProcessSession session, final long timestamp) {
+        UpdateRowsEventInfo eventInfo = dataCaptureState.isUseGtid()

Review Comment:
   Same as in the other cases.



##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/AbstractBinlogTableEventWriter.java:
##########
@@ -19,12 +19,35 @@
 import org.apache.nifi.cdc.mysql.event.BinlogTableEventInfo;
 
 import java.io.IOException;
+import java.io.Serializable;
 
 /**
  * An abstract base class for writing MYSQL table-related binlog events into flow file(s), e.g.
  */
 public abstract class AbstractBinlogTableEventWriter<T extends BinlogTableEventInfo> extends AbstractBinlogEventWriter<T> {
 
+    protected Object getWritableObject(Integer type, Serializable value) {
+        if (value == null) {
+            return null;
+        }
+        if (type == null) {
+            if (value instanceof byte[]) {
+                return new String((byte[]) value);
+            } else if (value instanceof Number) {
+                return value;
+            }
+        } else if (value instanceof Number) {
+            return value;
+        } else {
+            if (value instanceof byte[]) {
+                return new String((byte[]) value);
+            } else {
+                return value.toString();
+            }
+        }
+        return null;
+    }

Review Comment:
   I think it is a bit easier to read this way:
   ```suggestion
       protected Object getWritableObject(Integer type, Serializable value) {
           if (value == null) {
               return null;
           }
   
           if (type == null) {
               if (value instanceof byte[]) {
                   return new String((byte[]) value);
               } else if (value instanceof Number) {
                   return value;
               } else {
                   return null;
               }
           } else {
               if (value instanceof byte[]) {
                   return new String((byte[]) value);
               } else if (value instanceof Number) {
                   return value;
               } else {
                   return value.toString();
               }
           }
       }
   ```



##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/handler/InsertEventHandler.java:
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cdc.mysql.event.handler;
+
+import com.github.shyiko.mysql.binlog.event.WriteRowsEventData;
+import org.apache.nifi.cdc.event.io.EventWriterConfiguration;
+import org.apache.nifi.cdc.mysql.event.DataCaptureState;
+import org.apache.nifi.cdc.mysql.event.InsertRowsEventInfo;
+import org.apache.nifi.cdc.mysql.event.io.AbstractBinlogEventWriter;
+import org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL;
+import org.apache.nifi.processor.ProcessSession;
+
+import static org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL.REL_SUCCESS;
+
+public class InsertEventHandler implements BinlogEventHandler<WriteRowsEventData, InsertRowsEventInfo> {
+    @Override
+    public void handleEvent(final WriteRowsEventData eventData, final boolean writeEvent, DataCaptureState dataCaptureState,
+                            CaptureChangeMySQL.BinlogResourceInfo binlogResourceInfo, CaptureChangeMySQL.BinlogEventState binlogEventState,
+                            final String sql, AbstractBinlogEventWriter<InsertRowsEventInfo> eventWriter,
+                            EventWriterConfiguration eventWriterConfiguration, ProcessSession session, final long timestamp) {
+        InsertRowsEventInfo eventInfo = dataCaptureState.isUseGtid()

Review Comment:
   Same as in the other cases.



##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/handler/BinlogEventHandler.java:
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cdc.mysql.event.handler;
+
+import com.github.shyiko.mysql.binlog.event.EventData;
+import org.apache.nifi.cdc.event.io.EventWriterConfiguration;
+import org.apache.nifi.cdc.mysql.event.BinlogEventInfo;
+import org.apache.nifi.cdc.mysql.event.DataCaptureState;
+import org.apache.nifi.cdc.mysql.event.io.AbstractBinlogEventWriter;
+import org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL;
+import org.apache.nifi.processor.ProcessSession;
+
+public interface BinlogEventHandler<T extends EventData, S extends BinlogEventInfo> {
+
+    void handleEvent(T eventData,
+                     final boolean writeEvent,

Review Comment:
   I'm not really in favor of using finals, but I think when we use them, we should use them consistently. It seems like the other parameters in this signature could be final as well.
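   Applied consistently, the declaration (as completed by the handler implementations elsewhere in this PR) would read:
   ```java
   void handleEvent(final T eventData,
                    final boolean writeEvent,
                    final DataCaptureState dataCaptureState,
                    final CaptureChangeMySQL.BinlogResourceInfo binlogResourceInfo,
                    final CaptureChangeMySQL.BinlogEventState binlogEventState,
                    final String sql,
                    final AbstractBinlogEventWriter<S> eventWriter,
                    final EventWriterConfiguration eventWriterConfiguration,
                    final ProcessSession session,
                    final long timestamp);
   ```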



##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java:
##########
@@ -228,13 +225,14 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
 
     public static final PropertyDescriptor HOSTS = new PropertyDescriptor.Builder()
             .name("capture-change-mysql-hosts")
-            .displayName("MySQL Hosts")
-            .description("A list of hostname/port entries corresponding to nodes in a MySQL cluster. The entries should be comma separated "
-                    + "using a colon such as host1:port,host2:port,....  For example mysql.myhost.com:3306. This processor will attempt to connect to "
+            .displayName("MySQL Nodes")

Review Comment:
   In a separate PR we could rename this property on the main branch. What do you think?



##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/handler/CommitEventHandler.java:
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cdc.mysql.event.handler;
+
+import com.github.shyiko.mysql.binlog.event.EventData;
+import org.apache.nifi.cdc.event.io.EventWriterConfiguration;
+import org.apache.nifi.cdc.mysql.event.CommitTransactionEventInfo;
+import org.apache.nifi.cdc.mysql.event.DataCaptureState;
+import org.apache.nifi.cdc.mysql.event.io.AbstractBinlogEventWriter;
+import org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessSession;
+
+import static org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL.REL_SUCCESS;
+
+public class CommitEventHandler implements BinlogEventHandler<EventData, CommitTransactionEventInfo> {
+    @Override
+    public void handleEvent(final EventData eventData, final boolean writeEvent, DataCaptureState dataCaptureState,
+                            CaptureChangeMySQL.BinlogResourceInfo binlogResourceInfo, CaptureChangeMySQL.BinlogEventState binlogEventState,
+                            final String sql, AbstractBinlogEventWriter<CommitTransactionEventInfo> eventWriter,
+                            EventWriterConfiguration eventWriterConfiguration, ProcessSession session, final long timestamp) {
+        final String currentDatabase = binlogResourceInfo.getCurrentDatabase();

Review Comment:
   As in the other cases, it is enough to create currentDatabase and commitEvent in the `if (writeEvent)` block.



##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java:
##########
@@ -317,10 +315,8 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
 
     public static final PropertyDescriptor DIST_CACHE_CLIENT = new PropertyDescriptor.Builder()
             .name("capture-change-mysql-dist-map-cache-client")
-            .displayName("Distributed Map Cache Client")
-            .description("Identifies a Distributed Map Cache Client controller service to be used for keeping information about the various table columns, datatypes, etc. "
-                    + "needed by the processor. If a client is not specified, the generated events will not include column type or name information (but they will include database "
-                    + "and table information.")
+            .displayName("Distributed Map Cache Client - unused")

Review Comment:
   In a separate PR we could remove this property on the main branch. I also see that there is another deprecated property "State Update Interval". Maybe we could remove that also. What do you think?



##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java:
##########
@@ -464,58 +460,44 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
 
     private static final List<PropertyDescriptor> propDescriptors;
 
-    private volatile ProcessSession currentSession;
-    private BinaryLogClient binlogClient;
-    private BinlogEventListener eventListener;
-    private BinlogLifecycleListener lifecycleListener;
-    private GtidSet gtidSet;
+    private volatile BinaryLogClient binlogClient;
+    private volatile BinlogEventListener eventListener;
+    private volatile BinlogLifecycleListener lifecycleListener;
+    private volatile GtidSet gtidSet;
 
     // Set queue capacity to avoid excessive memory consumption
     private final BlockingQueue<RawBinlogEvent> queue = new LinkedBlockingQueue<>(1000);
-    private volatile String currentBinlogFile = null;
-    private volatile long currentBinlogPosition = 4;
-    private volatile String currentGtidSet = null;
-
-    // The following variables save the value of the binlog filename, position, (sequence id), and gtid at the beginning of a transaction. Used for rollback
-    private volatile String xactBinlogFile = null;
-    private volatile long xactBinlogPosition = 4;
-    private volatile long xactSequenceId = 0;
-    private volatile String xactGtidSet = null;
-
-    private volatile TableInfo currentTable = null;
-    private volatile String currentDatabase = null;
-    private volatile Pattern databaseNamePattern;
-    private volatile Pattern tableNamePattern;
-    private volatile boolean includeBeginCommit = false;
-    private volatile boolean includeDDLEvents = false;
-    private volatile boolean useGtid = false;
 
-    private volatile boolean inTransaction = false;
-    private volatile boolean skipTable = false;
-    private final AtomicBoolean hasRun = new AtomicBoolean(false);
+    private final Map<TableInfoCacheKey, TableInfo> tableInfoCache = new HashMap<>();
 
-    private int currentHost = 0;
-    private String transitUri = "<unknown>";
+    private volatile ProcessSession currentSession;
+    private DataCaptureState currentDataCaptureState = new DataCaptureState();
 
-    private final AtomicLong currentSequenceId = new AtomicLong(0);
+    private volatile BinlogResourceInfo binlogResourceInfo = new BinlogResourceInfo();

Review Comment:
   It's my personal taste, so feel free to ignore it, but I think it is better to initialize the objects when you actually fill them up with values, which in this case is the setup method. Also, when you stop the processor, you can clean it up by assigning null to it. This way it will use less memory when the processor is stopped. What do you think?
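   For illustration, the lifecycle being described might look like this sketch (the method names and annotation usage are illustrative, not necessarily the processor's actual ones):
   ```java
   // Illustrative sketch only: allocate per-run state at setup time and release it on stop
   private volatile BinlogResourceInfo binlogResourceInfo;

   public void setup(final ProcessContext context) {
       binlogResourceInfo = new BinlogResourceInfo();
       // ... rest of the existing setup ...
   }

   @OnStopped
   public void stop() {
       // Drop the reference so the state can be garbage collected while the processor is stopped
       binlogResourceInfo = null;
   }
   ```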



##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/handler/BinlogEventHandler.java:
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cdc.mysql.event.handler;
+
+import com.github.shyiko.mysql.binlog.event.EventData;
+import org.apache.nifi.cdc.event.io.EventWriterConfiguration;
+import org.apache.nifi.cdc.mysql.event.BinlogEventInfo;
+import org.apache.nifi.cdc.mysql.event.DataCaptureState;
+import org.apache.nifi.cdc.mysql.event.io.AbstractBinlogEventWriter;
+import org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL;
+import org.apache.nifi.processor.ProcessSession;
+
+public interface BinlogEventHandler<T extends EventData, S extends BinlogEventInfo> {
+
+    void handleEvent(T eventData,
+                     final boolean writeEvent,
+                     DataCaptureState dataCaptureState,
+                     CaptureChangeMySQL.BinlogResourceInfo binlogResourceInfo,
+                     CaptureChangeMySQL.BinlogEventState binlogEventState,
+                     final String sql,
+                     AbstractBinlogEventWriter<S> eventWriter,

Review Comment:
   It seems like there is a 1:1 connection between EventHandlers and EventWriters. So InsertEventHandler will always use InsertEventWriter. Is there a specific reason then that the eventWriter is coming from "outside"? Am I missing something?
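   For comparison, the 1:1 pairing could be made explicit by letting each handler own its writer, along these lines (a sketch that assumes BinlogEventHandler#handleEvent is changed to drop the eventWriter parameter):
   ```java
   public class InsertEventHandler implements BinlogEventHandler<WriteRowsEventData, InsertRowsEventInfo> {

       // The handler constructs its paired writer instead of receiving one from the processor
       private final InsertRowsWriter insertRowsWriter = new InsertRowsWriter();

       @Override
       public void handleEvent(final WriteRowsEventData eventData, final boolean writeEvent,
                               final DataCaptureState dataCaptureState,
                               final CaptureChangeMySQL.BinlogResourceInfo binlogResourceInfo,
                               final CaptureChangeMySQL.BinlogEventState binlogEventState,
                               final String sql, final EventWriterConfiguration eventWriterConfiguration,
                               final ProcessSession session, final long timestamp) {
           // insertRowsWriter is used here directly, so no writer needs to be passed in
       }
   }
   ```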



##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java:
##########
@@ -974,139 +938,92 @@ public void outputEvents(ProcessSession session, ComponentLog log) throws IOExce
 
                     if (!skipTable) {
                         TableInfoCacheKey key = new TableInfoCacheKey(this.getIdentifier(), data.getDatabase(), data.getTable(), data.getTableId());
-                        if (cacheClient != null) {
+                        binlogResourceInfo.setCurrentTable(tableInfoCache.get(key));
+                        if (binlogResourceInfo.getCurrentTable() == null) {
+                            // We don't have an entry for this table yet, so fetch the info from the database and populate the cache
                             try {
-                                currentTable = cacheClient.get(key, cacheKeySerializer, cacheValueDeserializer);
-                            } catch (ConnectException ce) {
-                                throw new IOException("Could not connect to Distributed Map Cache server to get table information", ce);
-                            }
-
-                            if (currentTable == null) {
-                                // We don't have an entry for this table yet, so fetch the info from the database and populate the cache
-                                try {
-                                    currentTable = loadTableInfo(key);
-                                    try {
-                                        cacheClient.put(key, currentTable, cacheKeySerializer, cacheValueSerializer);
-                                    } catch (ConnectException ce) {
-                                        throw new IOException("Could not connect to Distributed Map Cache server to put table information", ce);
-                                    }
-                                } catch (SQLException se) {
-                                    // Propagate the error up, so things like rollback and logging/bulletins can be handled
-                                    throw new IOException(se.getMessage(), se);
-                                }
+                                binlogResourceInfo.setCurrentTable(loadTableInfo(key));
+                                tableInfoCache.put(key, binlogResourceInfo.getCurrentTable());
+                            } catch (SQLException se) {
+                                // Propagate the error up, so things like rollback and logging/bulletins can be handled
+                                throw new IOException(se.getMessage(), se);
                             }
-                        } else {
-                            // Populate a limited version of TableInfo without column information
-                            currentTable = new TableInfo(key.getDatabaseName(), key.getTableName(), key.getTableId(), Collections.emptyList());
                         }
                     } else {
-                        // Clear the current table, to force a reload next time we get a TABLE_MAP event we care about
-                        currentTable = null;
+                        // Clear the current table, to force reload next time we get a TABLE_MAP event we care about
+                        binlogResourceInfo.setCurrentTable(null);
                     }
                     break;
                 case QUERY:
                     QueryEventData queryEventData = event.getData();
-                    currentDatabase = queryEventData.getDatabase();
+                    binlogResourceInfo.setCurrentDatabase(queryEventData.getDatabase());
 
                     String sql = queryEventData.getSql();
 
                     // Is this the start of a transaction?
                     if ("BEGIN".equals(sql)) {
                         // If we're already in a transaction, something bad happened, alert the user
-                        if (inTransaction) {
+                        if (binlogResourceInfo.isInTransaction()) {
                             getLogger().debug("BEGIN event received at pos={} file={} while already processing a transaction. This could indicate that your binlog position is invalid "
-                                    + "or the event stream is out of sync or there was an issue with the processor state.", currentBinlogPosition, currentBinlogFile);
+                                    + "or the event stream is out of sync or there was an issue with the processor state.", dataCaptureState.getBinlogPosition(), dataCaptureState.getBinlogFile());
                         }
-                        // Mark the current binlog position and GTID in case we have to rollback the transaction (if the processor is stopped, e.g.)
-                        xactBinlogFile = currentBinlogFile;
-                        xactBinlogPosition = currentBinlogPosition;
-                        xactSequenceId = currentSequenceId.get();
-                        xactGtidSet = currentGtidSet;
-
-                        if (includeBeginCommit && (databaseNamePattern == null || databaseNamePattern.matcher(currentDatabase).matches())) {
-                            BeginTransactionEventInfo beginEvent = useGtid
-                                    ? new BeginTransactionEventInfo(currentDatabase, timestamp, currentGtidSet)
-                                    : new BeginTransactionEventInfo(currentDatabase, timestamp, currentBinlogFile, currentBinlogPosition);
-                            currentEventInfo = beginEvent;
-                            currentEventWriter = beginEventWriter;
-                            currentSequenceId.set(beginEventWriter.writeEvent(currentSession, transitUri, beginEvent, currentSequenceId.get(), REL_SUCCESS, eventWriterConfiguration));
 
+                        if (!(databaseNamePattern != null && !databaseNamePattern.matcher(binlogResourceInfo.getCurrentDatabase()).matches())) {

Review Comment:
   I think this can be simplified.
   ```suggestion
                           if ((databaseNamePattern == null || databaseNamePattern.matcher(binlogResourceInfo.getCurrentDatabase()).matches())) {
   ```



##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java:
##########
@@ -464,58 +460,44 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
 
     private static final List<PropertyDescriptor> propDescriptors;
 
-    private volatile ProcessSession currentSession;
-    private BinaryLogClient binlogClient;
-    private BinlogEventListener eventListener;
-    private BinlogLifecycleListener lifecycleListener;
-    private GtidSet gtidSet;
+    private volatile BinaryLogClient binlogClient;
+    private volatile BinlogEventListener eventListener;
+    private volatile BinlogLifecycleListener lifecycleListener;
+    private volatile GtidSet gtidSet;
 
     // Set queue capacity to avoid excessive memory consumption
     private final BlockingQueue<RawBinlogEvent> queue = new LinkedBlockingQueue<>(1000);
-    private volatile String currentBinlogFile = null;
-    private volatile long currentBinlogPosition = 4;
-    private volatile String currentGtidSet = null;
-
-    // The following variables save the value of the binlog filename, position, (sequence id), and gtid at the beginning of a transaction. Used for rollback
-    private volatile String xactBinlogFile = null;
-    private volatile long xactBinlogPosition = 4;
-    private volatile long xactSequenceId = 0;
-    private volatile String xactGtidSet = null;
-
-    private volatile TableInfo currentTable = null;
-    private volatile String currentDatabase = null;
-    private volatile Pattern databaseNamePattern;
-    private volatile Pattern tableNamePattern;
-    private volatile boolean includeBeginCommit = false;
-    private volatile boolean includeDDLEvents = false;
-    private volatile boolean useGtid = false;
 
-    private volatile boolean inTransaction = false;
-    private volatile boolean skipTable = false;
-    private final AtomicBoolean hasRun = new AtomicBoolean(false);
+    private final Map<TableInfoCacheKey, TableInfo> tableInfoCache = new HashMap<>();
 
-    private int currentHost = 0;
-    private String transitUri = "<unknown>";
+    private volatile ProcessSession currentSession;
+    private DataCaptureState currentDataCaptureState = new DataCaptureState();
 
-    private final AtomicLong currentSequenceId = new AtomicLong(0);
+    private volatile BinlogResourceInfo binlogResourceInfo = new BinlogResourceInfo();
 
-    private volatile DistributedMapCacheClient cacheClient = null;
-    private final Serializer<TableInfoCacheKey> cacheKeySerializer = new TableInfoCacheKey.Serializer();
-    private final Serializer<TableInfo> cacheValueSerializer = new TableInfo.Serializer();
-    private final Deserializer<TableInfo> cacheValueDeserializer = new TableInfo.Deserializer();
+    private volatile Pattern databaseNamePattern;
+    private volatile Pattern tableNamePattern;
+    private volatile boolean skipTable = false;
+    private int currentHost = 0;
+    private volatile JDBCConnectionHolder jdbcConnectionHolder = null;
 
-    private JDBCConnectionHolder jdbcConnectionHolder = null;
+    private final BinlogEventState binlogEventState = new BinlogEventState();
 
     private final BeginTransactionEventWriter beginEventWriter = new BeginTransactionEventWriter();
+    private final BeginEventHandler beginEventHandler = new BeginEventHandler();
     private final CommitTransactionEventWriter commitEventWriter = new CommitTransactionEventWriter();
+    private final CommitEventHandler commitEventHandler = new CommitEventHandler();
     private final DDLEventWriter ddlEventWriter = new DDLEventWriter();
+    private final DDLEventHandler ddlEventHandler = new DDLEventHandler();
     private final InsertRowsWriter insertRowsWriter = new InsertRowsWriter();
+    private final InsertEventHandler insertEventHandler = new InsertEventHandler();

Review Comment:
   I'm wondering, is there an option to extract these handlers to a different class that is able to retrieve the required handler? This could greatly simplify the code, but I'm not sure that we can shrink the required parameters to a simple enough interface.
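   One possible shape for that extraction is a small registry keyed on the binlog event type, roughly like the sketch below (the class name is hypothetical, the generics are simplified, and QUERY events would still need separate dispatching on the SQL text):
   ```java
   import java.util.EnumMap;
   import java.util.Map;

   import com.github.shyiko.mysql.binlog.event.EventType;

   public class BinlogEventHandlerRegistry {

       private final Map<EventType, BinlogEventHandler<?, ?>> handlers = new EnumMap<>(EventType.class);

       public BinlogEventHandlerRegistry() {
           handlers.put(EventType.WRITE_ROWS, new InsertEventHandler());
           handlers.put(EventType.UPDATE_ROWS, new UpdateEventHandler());
           handlers.put(EventType.DELETE_ROWS, new DeleteEventHandler());
       }

       // Returns the handler registered for the given event type, or null if none applies
       public BinlogEventHandler<?, ?> getHandler(final EventType eventType) {
           return handlers.get(eventType);
       }
   }
   ```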



##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java:
##########
@@ -974,139 +938,92 @@ public void outputEvents(ProcessSession session, ComponentLog log) throws IOExce
 
                     if (!skipTable) {
                         TableInfoCacheKey key = new TableInfoCacheKey(this.getIdentifier(), data.getDatabase(), data.getTable(), data.getTableId());
-                        if (cacheClient != null) {
+                        binlogResourceInfo.setCurrentTable(tableInfoCache.get(key));
+                        if (binlogResourceInfo.getCurrentTable() == null) {
+                            // We don't have an entry for this table yet, so fetch the info from the database and populate the cache
                             try {
-                                currentTable = cacheClient.get(key, cacheKeySerializer, cacheValueDeserializer);
-                            } catch (ConnectException ce) {
-                                throw new IOException("Could not connect to Distributed Map Cache server to get table information", ce);
-                            }
-
-                            if (currentTable == null) {
-                                // We don't have an entry for this table yet, so fetch the info from the database and populate the cache
-                                try {
-                                    currentTable = loadTableInfo(key);
-                                    try {
-                                        cacheClient.put(key, currentTable, cacheKeySerializer, cacheValueSerializer);
-                                    } catch (ConnectException ce) {
-                                        throw new IOException("Could not connect to Distributed Map Cache server to put table information", ce);
-                                    }
-                                } catch (SQLException se) {
-                                    // Propagate the error up, so things like rollback and logging/bulletins can be handled
-                                    throw new IOException(se.getMessage(), se);
-                                }
+                                binlogResourceInfo.setCurrentTable(loadTableInfo(key));
+                                tableInfoCache.put(key, binlogResourceInfo.getCurrentTable());
+                            } catch (SQLException se) {
+                                // Propagate the error up, so things like rollback and logging/bulletins can be handled
+                                throw new IOException(se.getMessage(), se);
                             }
-                        } else {
-                            // Populate a limited version of TableInfo without column information
-                            currentTable = new TableInfo(key.getDatabaseName(), key.getTableName(), key.getTableId(), Collections.emptyList());
                         }
                     } else {
-                        // Clear the current table, to force a reload next time we get a TABLE_MAP event we care about
-                        currentTable = null;
+                        // Clear the current table, to force reload next time we get a TABLE_MAP event we care about
+                        binlogResourceInfo.setCurrentTable(null);
                     }
                     break;
                 case QUERY:
                     QueryEventData queryEventData = event.getData();
-                    currentDatabase = queryEventData.getDatabase();
+                    binlogResourceInfo.setCurrentDatabase(queryEventData.getDatabase());
 
                     String sql = queryEventData.getSql();
 
                     // Is this the start of a transaction?
                     if ("BEGIN".equals(sql)) {
                         // If we're already in a transaction, something bad happened, alert the user
-                        if (inTransaction) {
+                        if (binlogResourceInfo.isInTransaction()) {
                             getLogger().debug("BEGIN event received at pos={} file={} while already processing a transaction. This could indicate that your binlog position is invalid "
-                                    + "or the event stream is out of sync or there was an issue with the processor state.", currentBinlogPosition, currentBinlogFile);
+                                    + "or the event stream is out of sync or there was an issue with the processor state.", dataCaptureState.getBinlogPosition(), dataCaptureState.getBinlogFile());
                         }
-                        // Mark the current binlog position and GTID in case we have to rollback the transaction (if the processor is stopped, e.g.)
-                        xactBinlogFile = currentBinlogFile;
-                        xactBinlogPosition = currentBinlogPosition;
-                        xactSequenceId = currentSequenceId.get();
-                        xactGtidSet = currentGtidSet;
-
-                        if (includeBeginCommit && (databaseNamePattern == null || databaseNamePattern.matcher(currentDatabase).matches())) {
-                            BeginTransactionEventInfo beginEvent = useGtid
-                                    ? new BeginTransactionEventInfo(currentDatabase, timestamp, currentGtidSet)
-                                    : new BeginTransactionEventInfo(currentDatabase, timestamp, currentBinlogFile, currentBinlogPosition);
-                            currentEventInfo = beginEvent;
-                            currentEventWriter = beginEventWriter;
-                            currentSequenceId.set(beginEventWriter.writeEvent(currentSession, transitUri, beginEvent, currentSequenceId.get(), REL_SUCCESS, eventWriterConfiguration));
 
+                        if (!(databaseNamePattern != null && !databaseNamePattern.matcher(binlogResourceInfo.getCurrentDatabase()).matches())) {
+                            beginEventHandler.handleEvent(queryEventData, includeBeginCommit, currentDataCaptureState, binlogResourceInfo,
+                                    binlogEventState, sql, beginEventWriter, eventWriterConfiguration, currentSession, timestamp);
                         }
-                        inTransaction = true;
-                        //update inTransaction value to state
-                        updateState(session);
+                        // Whether we skip this event or not, it's still the beginning of a transaction
+                        binlogResourceInfo.setInTransaction(true);
+
+                        // Update inTransaction value to state
+                        updateState(session, dataCaptureState);
                     } else if ("COMMIT".equals(sql)) {
-                        if (!inTransaction) {
+                        // InnoDB generates XID events for "commit", but MyISAM generates Query events with "COMMIT", so handle that here
+                        if (!binlogResourceInfo.isInTransaction()) {
                             getLogger().debug("COMMIT event received at pos={} file={} while not processing a transaction (i.e. no corresponding BEGIN event). "
                                     + "This could indicate that your binlog position is invalid or the event stream is out of sync or there was an issue with the processor state "
-                                    + "or there was an issue with the processor state.", currentBinlogPosition, currentBinlogFile);
+                                    + "or there was an issue with the processor state.", dataCaptureState.getBinlogPosition(), dataCaptureState.getBinlogFile());
                         }
-                        // InnoDB generates XID events for "commit", but MyISAM generates Query events with "COMMIT", so handle that here
-                        if (includeBeginCommit) {
-                            if (databaseNamePattern == null || databaseNamePattern.matcher(currentDatabase).matches()) {
-                                CommitTransactionEventInfo commitTransactionEvent = useGtid
-                                        ? new CommitTransactionEventInfo(currentDatabase, timestamp, currentGtidSet)
-                                        : new CommitTransactionEventInfo(currentDatabase, timestamp, currentBinlogFile, currentBinlogPosition);
-                                currentEventInfo = commitTransactionEvent;
-                                currentEventWriter = commitEventWriter;
-                                currentSequenceId.set(commitEventWriter.writeEvent(currentSession, transitUri, commitTransactionEvent, currentSequenceId.get(), REL_SUCCESS, eventWriterConfiguration));
-                            }
-                        } else {
-                            // If the COMMIT event is not to be written, the FlowFile should still be finished and the session committed.
-                            if (currentSession != null) {
-                                FlowFile flowFile = eventWriterConfiguration.getCurrentFlowFile();
-                                if (flowFile != null && currentEventWriter != null) {
-                                    // Flush the events to the FlowFile when the processor is stopped
-                                    currentEventWriter.finishAndTransferFlowFile(currentSession, eventWriterConfiguration, transitUri, currentSequenceId.get(), currentEventInfo, REL_SUCCESS);
-                                }
-                                currentSession.commitAsync();
-                            }
+                        if (!(databaseNamePattern != null && !databaseNamePattern.matcher(binlogResourceInfo.getCurrentDatabase()).matches())) {
+                            commitEventHandler.handleEvent(queryEventData, includeBeginCommit, currentDataCaptureState, binlogResourceInfo,
+                                    binlogEventState, sql, commitEventWriter, eventWriterConfiguration, currentSession, timestamp);
                         }
-
-                        //update inTransaction value to state
-                        inTransaction = false;
-                        updateState(session);
+                        // Whether we skip this event or not, it's the end of a transaction
+                        binlogResourceInfo.setInTransaction(false);
+                        updateState(session, dataCaptureState);
                         // If there is no FlowFile open, commit the session
                         if (eventWriterConfiguration.getCurrentFlowFile() == null) {
                             // Commit the NiFi session
                             session.commitAsync();
                         }
-                        currentTable = null;
+                        binlogResourceInfo.setCurrentTable(null);
+                        binlogResourceInfo.setCurrentDatabase(null);
                     } else {
                         // Check for DDL events (alter table, e.g.). Normalize the query to do string matching on the type of change
                         String normalizedQuery = normalizeQuery(sql);
 
-                        if (normalizedQuery.startsWith("alter table")
-                                || normalizedQuery.startsWith("alter ignore table")
-                                || normalizedQuery.startsWith("create table")
-                                || normalizedQuery.startsWith("truncate table")
-                                || normalizedQuery.startsWith("rename table")
-                                || normalizedQuery.startsWith("drop table")
-                                || normalizedQuery.startsWith("drop database")) {
-
-                            if (includeDDLEvents && (databaseNamePattern == null || databaseNamePattern.matcher(currentDatabase).matches())) {
-                                // If we don't have table information, we can still use the database name
-                                TableInfo ddlTableInfo = (currentTable != null) ? currentTable : new TableInfo(currentDatabase, null, null, null);
-                                DDLEventInfo ddlEvent = useGtid
-                                        ? new DDLEventInfo(ddlTableInfo, timestamp, currentGtidSet, sql)
-                                        : new DDLEventInfo(ddlTableInfo, timestamp, currentBinlogFile, currentBinlogPosition, sql);
-                                currentEventInfo = ddlEvent;
-                                currentEventWriter = ddlEventWriter;
-                                currentSequenceId.set(ddlEventWriter.writeEvent(currentSession, transitUri, ddlEvent, currentSequenceId.get(), REL_SUCCESS, eventWriterConfiguration));
-                            }
-                            // Remove all the keys from the cache that this processor added
-                            if (cacheClient != null) {
-                                cacheClient.removeByPattern(this.getIdentifier() + ".*");
+                        if (isQueryDDL(normalizedQuery)) {
+                            if (!(databaseNamePattern != null && !databaseNamePattern.matcher(binlogResourceInfo.getCurrentDatabase()).matches())) {

Review Comment:
   This can be simplified.
   ```suggestion
                            if (databaseNamePattern == null || databaseNamePattern.matcher(binlogResourceInfo.getCurrentDatabase()).matches()) {
   ```
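   For anyone following along, a quick standalone check (not part of the PR; the pattern and database name are placeholders) showing that the suggested form is equivalent to the original double-negated guard by De Morgan's law:
   ```java
   import java.util.regex.Pattern;

   class GuardEquivalenceCheck {
       public static void main(String[] args) {
           // Placeholder values; the real processor gets these from its configuration and binlog state.
           Pattern databaseNamePattern = Pattern.compile("mydb.*");
           String currentDatabase = "mydb1";

           boolean original = !(databaseNamePattern != null && !databaseNamePattern.matcher(currentDatabase).matches());
           boolean simplified = databaseNamePattern == null || databaseNamePattern.matcher(currentDatabase).matches();

           // De Morgan's law: !(A && !B) == !A || B, so the two guards always agree.
           System.out.println(original == simplified); // prints: true
       }
   }
   ```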



##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java:
##########
@@ -839,19 +802,18 @@ protected void connect(List<InetSocketAddress> hosts, String username, String pa
         InetSocketAddress connectedHost = null;
         Exception lastConnectException = new Exception("Unknown connection error");
 
-        if (createEnrichmentConnection) {
-            try {
-                // Ensure driverLocation and driverName are correct before establishing binlog connection
-                // to avoid failing after binlog messages are received.
-                // Actual JDBC connection is created after binlog client gets started, because we need
-                // the connect-able host same as the binlog client.
-                registerDriver(driverLocation, driverName);
-            } catch (InitializationException e) {
-                throw new RuntimeException("Failed to register JDBC driver. Ensure MySQL Driver Location(s)" +
-                        " and MySQL Driver Class Name are configured correctly. " + e, e);
-            }
+        try {

Review Comment:
   Can you explain why the `createEnrichmentConnection` check is no longer needed? I don't follow.



##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java:
##########
@@ -1391,8 +1296,12 @@ private class JDBCConnectionHolder {
         private JDBCConnectionHolder(InetSocketAddress host, String username, String password, Map<String, String> customProperties, long connectionTimeoutMillis) {
             this.connectionUrl = "jdbc:mysql://" + host.getHostString() + ":" + host.getPort();
             connectionProps.putAll(customProperties);
-            connectionProps.put("user", username);
-            connectionProps.put("password", password);
+            if (username != null) {
+                connectionProps.put("user", username);
+            }
+            if (password != null) {

Review Comment:
   Does it make sense to set the password when the username is null?
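   A minimal sketch of the alternative this question points at (not code from the PR; the constructor context is assumed from the diff above): only add credentials when a username is present, and only add the password alongside it.
   ```java
   // Hypothetical guard for the JDBCConnectionHolder constructor: credentials are only
   // added when a username is provided, and the password only together with that username.
   if (username != null) {
       connectionProps.put("user", username);
       if (password != null) {
           connectionProps.put("password", password);
       }
   }
   ```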



##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/handler/InsertEventHandler.java:
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cdc.mysql.event.handler;
+
+import com.github.shyiko.mysql.binlog.event.WriteRowsEventData;
+import org.apache.nifi.cdc.event.io.EventWriterConfiguration;
+import org.apache.nifi.cdc.mysql.event.DataCaptureState;
+import org.apache.nifi.cdc.mysql.event.InsertRowsEventInfo;
+import org.apache.nifi.cdc.mysql.event.io.AbstractBinlogEventWriter;
+import org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL;
+import org.apache.nifi.processor.ProcessSession;
+
+import static org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL.REL_SUCCESS;
+
+public class InsertEventHandler implements BinlogEventHandler<WriteRowsEventData, InsertRowsEventInfo> {
+    @Override
+    public void handleEvent(final WriteRowsEventData eventData, final boolean writeEvent, DataCaptureState dataCaptureState,
+                            CaptureChangeMySQL.BinlogResourceInfo binlogResourceInfo, CaptureChangeMySQL.BinlogEventState binlogEventState,
+                            final String sql, AbstractBinlogEventWriter<InsertRowsEventInfo> eventWriter,
+                            EventWriterConfiguration eventWriterConfiguration, ProcessSession session, final long timestamp) {
+        InsertRowsEventInfo eventInfo = dataCaptureState.isUseGtid()
+                ? new InsertRowsEventInfo(binlogResourceInfo.getCurrentTable(), timestamp, dataCaptureState.getGtidSet(), eventData)
+                : new InsertRowsEventInfo(binlogResourceInfo.getCurrentTable(), timestamp, dataCaptureState.getBinlogFile(), dataCaptureState.getBinlogPosition(), eventData);
+        if (writeEvent) {

Review Comment:
   There are multiple cases where the handler does nothing because the event is not a write event. If the handler were able to decide on its own whether the event should be written, that would be fine. But in this case, I don't think we should even call the event handler. What do you think?
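   A rough sketch of that caller-side alternative (the variable names are taken from the diffs above, but the handler and writer field names and the exact call site are assumptions, not the PR's actual code): decide once in the processor whether the event should be written and only invoke the handler when it should, so handlers no longer need a writeEvent flag.
   ```java
   // Hypothetical call site in CaptureChangeMySQL: the write decision is made up front,
   // and the handler is skipped entirely when the event is not to be written.
   final boolean matchesDatabase = databaseNamePattern == null
           || databaseNamePattern.matcher(binlogResourceInfo.getCurrentDatabase()).matches();
   if (matchesDatabase) {
       insertEventHandler.handleEvent(eventData, true, dataCaptureState, binlogResourceInfo,
               binlogEventState, sql, insertRowsEventWriter, eventWriterConfiguration, currentSession, timestamp);
   }
   ```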





[GitHub] [nifi] pvillard31 commented on pull request #7116: NIFI-11380: Refactor CaptureChangeMySQL with improvements

Posted by "pvillard31 (via GitHub)" <gi...@apache.org>.
pvillard31 commented on PR #7116:
URL: https://github.com/apache/nifi/pull/7116#issuecomment-1529564795

   Yep, definitely agree with you. I guess it's worth looking into it as they mention performance improvements.




[GitHub] [nifi] mattyb149 commented on a diff in pull request #7116: NIFI-11380: Refactor CaptureChangeMySQL with improvements

Posted by "mattyb149 (via GitHub)" <gi...@apache.org>.
mattyb149 commented on code in PR #7116:
URL: https://github.com/apache/nifi/pull/7116#discussion_r1183705169


##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java:
##########
@@ -839,19 +802,18 @@ protected void connect(List<InetSocketAddress> hosts, String username, String pa
         InetSocketAddress connectedHost = null;
         Exception lastConnectException = new Exception("Unknown connection error");
 
-        if (createEnrichmentConnection) {
-            try {
-                // Ensure driverLocation and driverName are correct before establishing binlog connection
-                // to avoid failing after binlog messages are received.
-                // Actual JDBC connection is created after binlog client gets started, because we need
-                // the connect-able host same as the binlog client.
-                registerDriver(driverLocation, driverName);
-            } catch (InitializationException e) {
-                throw new RuntimeException("Failed to register JDBC driver. Ensure MySQL Driver Location(s)" +
-                        " and MySQL Driver Class Name are configured correctly. " + e, e);
-            }
+        try {

Review Comment:
   We stopped using the DistributedMapCache and always use an internal Map now, so we always create an enrichment connection and no longer need to check whether one is required.
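   For context, a simplified sketch of what such an internal cache could look like inside the processor class (the method name loadTableInfo is a placeholder rather than the PR's actual code; TableInfo and TableInfoCacheKey are assumed to be the existing nifi-cdc types):
   ```java
   // Hypothetical internal table-info cache replacing the DistributedMapCache client.
   // Misses are resolved over the enrichment JDBC connection, which is why that
   // connection is now always established.
   private final Map<TableInfoCacheKey, TableInfo> tableInfoCache = new ConcurrentHashMap<>();

   private TableInfo getOrLoadTableInfo(final TableInfoCacheKey key) throws SQLException {
       TableInfo tableInfo = tableInfoCache.get(key);
       if (tableInfo == null) {
           // Placeholder for a lookup against information_schema over the JDBC connection
           tableInfo = loadTableInfo(key);
           tableInfoCache.put(key, tableInfo);
       }
       return tableInfo;
   }
   ```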


