Posted to commits@inlong.apache.org by "e-mhui (via GitHub)" <gi...@apache.org> on 2023/04/25 13:33:38 UTC

[GitHub] [inlong] e-mhui commented on a diff in pull request #7921: [INLONG-7908][Sort] Update cdc-base for Incremental Snapshot Framework

e-mhui commented on code in PR #7921:
URL: https://github.com/apache/inlong/pull/7921#discussion_r1176483196


##########
inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/relational/JdbcSourceEventDispatcher.java:
##########
@@ -71,14 +70,14 @@ public class JdbcSourceEventDispatcher extends EventDispatcher<TableId> {
 
     private static final DocumentWriter DOCUMENT_WRITER = DocumentWriter.defaultWriter();
 
-    public final ChangeEventQueue<DataChangeEvent> queue;
-    public final HistorizedDatabaseSchema historizedSchema;
-    public final DataCollectionFilters.DataCollectionFilter<TableId> filter;
-    public final CommonConnectorConfig connectorConfig;
-    public final TopicSelector<TableId> topicSelector;
-    public final Schema schemaChangeKeySchema;
-    public final Schema schemaChangeValueSchema;
-    public final String topic;
+    private final ChangeEventQueue<DataChangeEvent> queue;
+    private final HistorizedDatabaseSchema historizedSchema;
+    private final DataCollectionFilters.DataCollectionFilter<TableId> filter;
+    private final CommonConnectorConfig connectorConfig;

Review Comment:
   These changes will affect PR https://github.com/apache/inlong/pull/7709, so please do not modify this code here.
   



##########
inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/debezium/table/AppendMetadataCollector.java:
##########
@@ -46,14 +46,14 @@ public AppendMetadataCollector(MetadataConverter[] metadataConverters, boolean m
     }
 
     public void collect(RowData physicalRow, TableChange tableSchema) {
-        GenericRowData metaRow = new GenericRowData(metadataConverters.length);
+        GenericRowData metaRow = new GenericRowData(physicalRow.getRowKind(), metadataConverters.length);

Review Comment:
   Why pass the `RowKind` here?
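In Flink, `new GenericRowData(arity)` creates a row whose kind defaults to `RowKind.INSERT`, while `new GenericRowData(kind, arity)` carries the given kind. Whether that matters here depends on how `metaRow` is later combined with `physicalRow`, which this hunk does not show. A stdlib-only sketch of the difference, using hypothetical stand-ins (`Kind`, `SimpleRow`, `MetadataRows`) rather than the Flink classes:

```java
/** Hypothetical stand-in for Flink's RowKind. */
enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

/** Hypothetical stand-in for Flink's GenericRowData. */
final class SimpleRow {
    final Kind kind;
    final Object[] fields;

    // Mirrors GenericRowData(int arity): the kind defaults to INSERT.
    SimpleRow(int arity) {
        this(Kind.INSERT, arity);
    }

    // Mirrors GenericRowData(RowKind kind, int arity).
    SimpleRow(Kind kind, int arity) {
        this.kind = kind;
        this.fields = new Object[arity];
    }
}

final class MetadataRows {
    /** Builds a metadata row that either inherits or ignores the physical row's kind. */
    static SimpleRow metaRowFor(SimpleRow physicalRow, int metadataArity, boolean inheritKind) {
        return inheritKind
                ? new SimpleRow(physicalRow.kind, metadataArity)
                : new SimpleRow(metadataArity);
    }
}
```

If downstream code always takes the kind from `physicalRow` itself, the metadata row's own kind is never read and the change has no visible effect.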
   



##########
inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/relational/JdbcSourceEventDispatcher.java:
##########
@@ -191,16 +186,14 @@ private Struct schemaChangeRecordKey(SchemaChangeEvent event) {
         }
 
         private Struct schemaChangeRecordValue(SchemaChangeEvent event) throws IOException {
+            Struct sourceInfo = event.getSource();
             Map<String, Object> source = new HashMap<>();
-            if (isMysqlConnector(event.getSource())) {
-                Struct sourceInfo = event.getSource();
-                String fileName = sourceInfo.getString(BINLOG_FILENAME_OFFSET_KEY);
-                Long pos = sourceInfo.getInt64(BINLOG_POSITION_OFFSET_KEY);
-                Long serverId = sourceInfo.getInt64(SERVER_ID_KEY);
-                source.put(SERVER_ID_KEY, serverId);
-                source.put(BINLOG_FILENAME_OFFSET_KEY, fileName);
-                source.put(BINLOG_POSITION_OFFSET_KEY, pos);
-            }
+            String fileName = sourceInfo.getString(BINLOG_FILENAME_OFFSET_KEY);
+            Long pos = sourceInfo.getInt64(BINLOG_POSITION_OFFSET_KEY);
+            Long serverId = sourceInfo.getInt64(SERVER_ID_KEY);
+            source.put(SERVER_ID_KEY, serverId);
+            source.put(BINLOG_FILENAME_OFFSET_KEY, fileName);
+            source.put(BINLOG_POSITION_OFFSET_KEY, pos);

Review Comment:
   These changes will affect PR https://github.com/apache/inlong/pull/7709, so please do not modify this code here.
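The removed `isMysqlConnector(...)` guard meant binlog offsets were copied only for MySQL source records; without it, the same reads run for every connector. Kafka Connect's `Struct.getString(...)` throws a `DataException` when the field is not in the struct's schema, so unguarded reads can fail for sources without binlog fields. A stdlib sketch of the guarded copy, modeling the source struct as a `Map`; the key values (`server_id`, `file`, `pos`, `connector`), the MySQL check, and `BinlogSourceInfo` are assumptions for illustration:

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch; the source struct is modeled as a Map, not a Kafka Connect Struct. */
final class BinlogSourceInfo {
    // Assumed values of the Debezium MySQL source-info keys used in the diff.
    static final String SERVER_ID_KEY = "server_id";
    static final String BINLOG_FILENAME_OFFSET_KEY = "file";
    static final String BINLOG_POSITION_OFFSET_KEY = "pos";
    // Assumed: the source block names its connector, e.g. "mysql" or "postgresql".
    static final String CONNECTOR_KEY = "connector";

    static boolean isMysqlConnector(Map<String, Object> sourceInfo) {
        return "mysql".equals(sourceInfo.get(CONNECTOR_KEY));
    }

    /** Copies binlog offsets only when the source info comes from MySQL. */
    static Map<String, Object> schemaChangeSource(Map<String, Object> sourceInfo) {
        Map<String, Object> source = new HashMap<>();
        if (isMysqlConnector(sourceInfo)) {
            source.put(SERVER_ID_KEY, sourceInfo.get(SERVER_ID_KEY));
            source.put(BINLOG_FILENAME_OFFSET_KEY, sourceInfo.get(BINLOG_FILENAME_OFFSET_KEY));
            source.put(BINLOG_POSITION_OFFSET_KEY, sourceInfo.get(BINLOG_POSITION_OFFSET_KEY));
        }
        return source;
    }
}
```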
   



##########
inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/source/assigner/HybridSplitAssigner.java:
##########
@@ -115,15 +115,23 @@ public Optional<SourceSplitBase> getNext() {
             // stream split assigning
             if (isStreamSplitAssigned) {
                 // no more splits for the assigner
+                LOG.trace(
+                        "No more splits for the SnapshotSplitAssigner. StreamSplit is already assigned.");
                 return Optional.empty();
             } else if (snapshotSplitAssigner.isFinished()) {
                 // we need to wait snapshot-assigner to be finished before
                 // assigning the stream split. Otherwise, records emitted from stream split
                 // might be out-of-order in terms of same primary key with snapshot splits.
                 isStreamSplitAssigned = true;
-                return Optional.of(createStreamSplit());
+                StreamSplit streamSplit = createStreamSplit();
+                LOG.trace(
+                        "SnapshotSplitAssigner is finished: creating a new stream split {}",
+                        streamSplit);

Review Comment:
   If some logs are not important, they can be removed. Excessive logging can impact performance.
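If `LOG` is an SLF4J logger, as is presumably the case here, a parameterized call such as `LOG.trace("... {}", streamSplit)` already skips message formatting when TRACE is disabled, so the remaining cost is mostly the level check. Where building a message is itself expensive, a lazily built message avoids the work entirely. A sketch using `java.util.logging` as a stdlib stand-in (`FINEST` is its closest level to TRACE); `LazyTraceLogging` and its counter are hypothetical:

```java
import java.util.logging.Logger;

final class LazyTraceLogging {
    private static final Logger LOG = Logger.getLogger(LazyTraceLogging.class.getName());

    /** Counts how often the message was actually built, to show the work is skipped. */
    static int messagesBuilt = 0;

    static void logStreamSplitCreated(Object streamSplit) {
        // The Supplier is invoked only if FINEST is loggable; with the default INFO level it is not.
        LOG.finest(() -> {
            messagesBuilt++;
            return "SnapshotSplitAssigner is finished: creating a new stream split " + streamSplit;
        });
    }
}
```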



##########
inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java:
##########
@@ -97,25 +100,27 @@ public void submitTask(FetchTask<SourceSplitBase> fetchTask) {
 
     @Override
     public boolean isFinished() {
-        return currentStreamSplit == null || !streamFetchTask.isRunning();
+        return currentStreamSplit == null || !currentTaskRunning;
     }
 
     @Nullable
     @Override
     public Iterator<SourceRecords> pollSplitRecords() throws InterruptedException {
         checkReadException();
         final List<SourceRecord> sourceRecords = new ArrayList<>();
-        if (streamFetchTask.isRunning()) {
+        if (currentTaskRunning) {

Review Comment:
   Why replace `streamFetchTask.isRunning()`?
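The hunk does not show where `currentTaskRunning` is set, so the motivation is not visible from the diff alone. One plausible reason is to let the fetcher record task termination itself (for example when the task throws) instead of relying on the task's own `isRunning()` state. A hedged sketch, assuming a `volatile` flag set on submit and cleared by the worker thread; `FlagTrackingFetcher` and its methods are hypothetical:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical fetcher that tracks task liveness in its own flag instead of asking the task. */
final class FlagTrackingFetcher {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private volatile boolean currentTaskRunning;
    private volatile Throwable readException;
    private volatile Object currentStreamSplit;

    void submitTask(Object split, Runnable task) {
        this.currentStreamSplit = split;
        this.currentTaskRunning = true;
        executor.submit(() -> {
            try {
                task.run();
            } catch (Throwable t) {
                readException = t;
            } finally {
                // Cleared by the worker thread, so readers never need to call into the task.
                currentTaskRunning = false;
            }
        });
    }

    boolean isFinished() {
        return currentStreamSplit == null || !currentTaskRunning;
    }

    Throwable readException() {
        return readException;
    }

    void close() throws InterruptedException {
        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```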
   



##########
inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/source/reader/external/JdbcSourceFetchTaskContext.java:
##########
@@ -158,7 +157,7 @@ public CommonConnectorConfig getDbzConnectorConfig() {
     }
 
     public SchemaNameAdjuster getSchemaNameAdjuster() {
-        return null;
+        return SchemaNameAdjuster.create();

Review Comment:
   Why not return null?
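Debezium's `SchemaNameAdjuster.create()` returns an adjuster that rewrites names into valid Avro schema names; returning it instead of `null` spares every caller a null check. A simplified, hypothetical sketch of that kind of adjuster (not Debezium's implementation), assuming the rule that a name starts with an ASCII letter or `_` and continues with letters, digits, `_` or `.`:

```java
/** Hypothetical, simplified adjuster in the spirit of Debezium's SchemaNameAdjuster. */
final class SimpleSchemaNameAdjuster {
    private static boolean isAsciiLetter(char c) {
        return (c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z');
    }

    /** Replaces every character not allowed at its position with '_'. */
    static String adjust(String original) {
        StringBuilder sb = new StringBuilder(original.length());
        for (int i = 0; i < original.length(); i++) {
            char c = original.charAt(i);
            boolean valid = i == 0
                    ? isAsciiLetter(c) || c == '_'
                    : isAsciiLetter(c) || (c >= '0' && c <= '9') || c == '_' || c == '.';
            sb.append(valid ? c : '_');
        }
        return sb.toString();
    }
}
```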



##########
inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/util/SerializerUtils.java:
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.cdc.base.util;
+
+import io.debezium.DebeziumException;
+import io.debezium.relational.TableId;
+import io.debezium.util.HexConverter;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+public class SerializerUtils {

Review Comment:
   This class can be imported directly from Flink CDC's cdc-base module instead of being copied.
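Judging by its imports (`ObjectOutputStream`, `ObjectInputStream`, `HexConverter`), this class Java-serializes values and encodes the bytes as hex text. For illustration only, a stdlib sketch of that round trip, with a hand-rolled hex codec standing in for Debezium's `HexConverter`; `HexSerialization` and its methods are hypothetical names:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;

final class HexSerialization {
    private static final char[] HEX = "0123456789abcdef".toCharArray();

    /** Java-serializes the array, then hex-encodes the bytes. */
    static String toHexString(Object[] row) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(row);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        byte[] bytes = bos.toByteArray();
        char[] out = new char[bytes.length * 2];
        for (int i = 0; i < bytes.length; i++) {
            out[2 * i] = HEX[(bytes[i] >> 4) & 0xF];
            out[2 * i + 1] = HEX[bytes[i] & 0xF];
        }
        return new String(out);
    }

    /** Reverses toHexString. */
    static Object[] fromHexString(String hex) {
        byte[] bytes = new byte[hex.length() / 2];
        for (int i = 0; i < bytes.length; i++) {
            bytes[i] = (byte) Integer.parseInt(hex.substring(2 * i, 2 * i + 2), 16);
        }
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (Object[]) ois.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```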



##########
inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/util/SourceRecordUtils.java:
##########
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.cdc.base.util;
+
+import io.debezium.data.Envelope;
+import io.debezium.document.DocumentReader;
+import io.debezium.relational.TableId;
+import io.debezium.relational.history.HistoryRecord;
+import io.debezium.util.SchemaNameAdjuster;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceRecord;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Arrays;
+
+import static org.apache.inlong.sort.cdc.base.relational.JdbcSourceEventDispatcher.HISTORY_RECORD_FIELD;
+import static io.debezium.connector.AbstractSourceInfo.DATABASE_NAME_KEY;
+import static io.debezium.connector.AbstractSourceInfo.SCHEMA_NAME_KEY;
+import static io.debezium.connector.AbstractSourceInfo.TABLE_NAME_KEY;
+
+/** Utility class to deal with records. */
+public class SourceRecordUtils {

Review Comment:
   This class can be imported directly from Flink CDC's cdc-base module instead of being copied.
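`SourceRecordUtils.getTableId(record)` is called from `JdbcSourceFetchTaskContext` in a later hunk; given the imported `DATABASE_NAME_KEY`, `SCHEMA_NAME_KEY` and `TABLE_NAME_KEY` constants, it presumably builds the table id from the record's `source` block. A stdlib sketch of that lookup, modeling the source struct as a `Map`; the key values `db`, `schema` and `table` are assumed to match Debezium's constants, and `TableIdSketch` is hypothetical:

```java
import java.util.Map;

/** Hypothetical sketch; the record's source struct is modeled as a Map. */
final class TableIdSketch {
    // Assumed values of Debezium's AbstractSourceInfo keys.
    static final String DATABASE_NAME_KEY = "db";
    static final String SCHEMA_NAME_KEY = "schema";
    static final String TABLE_NAME_KEY = "table";

    /** Joins database, schema and table with '.', skipping parts the connector does not set. */
    static String tableIdentifier(Map<String, String> source) {
        StringBuilder sb = new StringBuilder();
        for (String key : new String[] {DATABASE_NAME_KEY, SCHEMA_NAME_KEY, TABLE_NAME_KEY}) {
            String part = source.get(key);
            if (part != null) {
                if (sb.length() > 0) {
                    sb.append('.');
                }
                sb.append(part);
            }
        }
        return sb.toString();
    }
}
```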



##########
inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/source/EmbeddedFlinkDatabaseHistory.java:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.cdc.base.source;
+
+import io.debezium.config.Configuration;
+import io.debezium.relational.TableId;
+import io.debezium.relational.Tables;
+import io.debezium.relational.ddl.DdlParser;
+import io.debezium.relational.history.DatabaseHistory;
+import io.debezium.relational.history.DatabaseHistoryException;
+import io.debezium.relational.history.DatabaseHistoryListener;
+import io.debezium.relational.history.HistoryRecord;
+import io.debezium.relational.history.HistoryRecordComparator;
+import io.debezium.relational.history.TableChanges;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * A {@link DatabaseHistory} implementation which stores the latest table schema in Flink state.
+ *
+ * <p>It stores/recovers history using data offered by {@link SourceSplitState}.
+ */
+public class EmbeddedFlinkDatabaseHistory implements DatabaseHistory {
+

Review Comment:
   This class can be imported directly from Flink CDC's cdc-base module instead of being copied.
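Per its Javadoc, this history keeps the latest table schemas in Flink state and recovers them from data offered by `SourceSplitState`. The `ConcurrentHashMap`/`ConcurrentMap` imports suggest a static registry keyed by instance name; a stdlib sketch of that pattern under that assumption, with schemas as plain strings (`SchemaHistoryRegistry`, `register` and `recover` are hypothetical):

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical registry handing schemas restored from state to a history instance by name. */
final class SchemaHistoryRegistry {
    private static final ConcurrentMap<String, Collection<String>> TABLE_SCHEMAS =
            new ConcurrentHashMap<>();

    /** Called before the engine starts, with schemas taken from the restored split state. */
    static void register(String instanceName, Collection<String> schemas) {
        TABLE_SCHEMAS.put(instanceName, new ArrayList<>(schemas));
    }

    /** Called when the history recovers; removes and returns the registered schemas. */
    static Collection<String> recover(String instanceName) {
        Collection<String> schemas = TABLE_SCHEMAS.remove(instanceName);
        return schemas == null ? Collections.emptyList() : schemas;
    }
}
```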
   



##########
inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/source/assigner/state/PendingSplitsStateSerializer.java:
##########
@@ -367,8 +370,9 @@ private List<TableId> readTableIds(DataInputDeserializer in) throws IOException
         List<TableId> tableIds = new ArrayList<>();
         final int size = in.readInt();
         for (int i = 0; i < size; i++) {
+            boolean useCatalogBeforeSchema = in.readBoolean();
             String tableIdStr = in.readUTF();
-            tableIds.add(TableId.parse(tableIdStr));
+            tableIds.add(TableId.parse(tableIdStr, useCatalogBeforeSchema));

Review Comment:
   Please test whether restoring the task from a savepoint still works properly.
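The hunk makes `readTableIds` read an extra boolean before each table id. If state written before this change is read with the new code, the first byte of each entry's UTF length prefix is consumed as that boolean, so restoring from an older savepoint can misread ids or fail unless the read is gated on the serializer version. A hedged stdlib sketch of such a gate; the version numbers, the default of `true` for old state, and `TableIdListSerde` are assumptions, not the serializer's actual code:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical versioned table-id list; not the actual PendingSplitsStateSerializer. */
final class TableIdListSerde {
    static final int VERSION_WITHOUT_FLAG = 1; // assumed: state written before the change
    static final int CURRENT_VERSION = 2;      // assumed: state written after the change

    static final class Entry {
        final String tableId;
        final boolean useCatalogBeforeSchema;

        Entry(String tableId, boolean useCatalogBeforeSchema) {
            this.tableId = tableId;
            this.useCatalogBeforeSchema = useCatalogBeforeSchema;
        }
    }

    /** Writes entries in the layout of the given version (old versions only for the demo). */
    static byte[] write(List<String> tableIds, boolean useCatalogBeforeSchema, int version) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bos)) {
            out.writeInt(tableIds.size());
            for (String id : tableIds) {
                if (version >= CURRENT_VERSION) {
                    out.writeBoolean(useCatalogBeforeSchema);
                }
                out.writeUTF(id);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray();
    }

    /** Reads the boolean only when the state was written by a version that has it. */
    static List<Entry> read(byte[] bytes, int version) {
        List<Entry> entries = new ArrayList<>();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            int size = in.readInt();
            for (int i = 0; i < size; i++) {
                boolean useCatalogBeforeSchema = true; // assumed default for old state
                if (version >= CURRENT_VERSION) {
                    useCatalogBeforeSchema = in.readBoolean();
                }
                entries.add(new Entry(in.readUTF(), useCatalogBeforeSchema));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return entries;
    }
}
```

If the serializer implements Flink's `SimpleVersionedSerializer`, the version passed to `deserialize(int version, byte[] serialized)` is the one stored with the state, which is what makes this kind of gate possible.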



##########
inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/source/assigner/HybridSplitAssigner.java:
##########
@@ -115,15 +115,23 @@ public Optional<SourceSplitBase> getNext() {
             // stream split assigning
             if (isStreamSplitAssigned) {
                 // no more splits for the assigner
+                LOG.trace(
+                        "No more splits for the SnapshotSplitAssigner. StreamSplit is already assigned.");

Review Comment:
   If some logs are not important, they can be removed. Excessive logging can impact performance.
   



##########
inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java:
##########
@@ -97,25 +100,27 @@ public void submitTask(FetchTask<SourceSplitBase> fetchTask) {
 
     @Override
     public boolean isFinished() {
-        return currentStreamSplit == null || !streamFetchTask.isRunning();
+        return currentStreamSplit == null || !currentTaskRunning;

Review Comment:
   Why replace `!streamFetchTask.isRunning()`?
   



##########
inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/source/reader/external/JdbcSourceFetchTaskContext.java:
##########
@@ -59,18 +59,17 @@ public JdbcSourceFetchTaskContext(
 
     @Override
     public TableId getTableId(SourceRecord record) {
-        return null;
+        return SourceRecordUtils.getTableId(record);
     }
 
     @Override
     public boolean isDataChangeRecord(SourceRecord record) {
-        return false;
+        return SourceRecordUtils.isDataChangeRecord(record);
     }

Review Comment:
   Why not return null?
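With the old stubs, `isDataChangeRecord` always returned `false`, so callers such as `shouldEmit` in `IncrementalSourceStreamFetcher` (shown in a later hunk) would never take their data-change branch. Delegating to `SourceRecordUtils` presumably classifies records by the presence of a Debezium envelope operation field. A stdlib sketch of such a check, modeling the record value as a `Map`; `DataChangeCheck` is hypothetical and the field name `op` is assumed to match Debezium's `Envelope.FieldName.OPERATION`:

```java
import java.util.Map;

/** Hypothetical sketch; a record value is modeled as a field-name to value Map. */
final class DataChangeCheck {
    // Assumed to match Debezium's Envelope.FieldName.OPERATION.
    static final String OPERATION = "op";

    /** True only when the value carries a non-null operation field, as data change events do. */
    static boolean isDataChangeRecord(Map<String, Object> value) {
        return value != null && value.get(OPERATION) != null;
    }
}
```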



##########
inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/source/reader/external/JdbcSourceFetchTaskContext.java:
##########
@@ -59,18 +59,17 @@ public JdbcSourceFetchTaskContext(
 
     @Override
     public TableId getTableId(SourceRecord record) {
-        return null;
+        return SourceRecordUtils.getTableId(record);

Review Comment:
   Why not return null?



##########
inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/util/ObjectUtils.java:
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.cdc.base.util;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+
+/** Utilities for operation on {@link Object}. */
+public class ObjectUtils {

Review Comment:
   This class can be imported directly from Flink CDC's cdc-base module instead of being copied.
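Judging by its `BigDecimal`/`BigInteger` imports, this class performs arithmetic or comparisons on boxed numeric values of mixed types. A stdlib sketch of one such helper, adding an `int` to a boxed number while keeping its type; `NumberMath.plus` is hypothetical, and Flink CDC's own `ObjectUtils` may differ:

```java
import java.math.BigDecimal;
import java.math.BigInteger;

final class NumberMath {
    /** Returns number + augend, keeping the boxed type of number (int overflow is not checked). */
    static Object plus(Object number, int augend) {
        if (number instanceof Integer) {
            return (Integer) number + augend;
        } else if (number instanceof Long) {
            return (Long) number + augend;
        } else if (number instanceof BigInteger) {
            return ((BigInteger) number).add(BigInteger.valueOf(augend));
        } else if (number instanceof BigDecimal) {
            return ((BigDecimal) number).add(BigDecimal.valueOf(augend));
        }
        throw new UnsupportedOperationException(
                "Unsupported type: " + (number == null ? "null" : number.getClass().getName()));
    }
}
```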
   



##########
inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java:
##########
@@ -164,8 +171,7 @@ private boolean shouldEmit(SourceRecord sourceRecord) {
         if (taskContext.isDataChangeRecord(sourceRecord)) {
             TableId tableId = taskContext.getTableId(sourceRecord);
             Offset position = taskContext.getStreamOffset(sourceRecord);
-            // source record has no primary need no comparing for binlog position
-            if (hasEnterPureStreamPhase(tableId, position) || sourceRecord.key() == null) {
+            if (hasEnterPureStreamPhase(tableId, position)) {

Review Comment:
   These changes will affect PR https://github.com/apache/inlong/pull/7489, so please do not modify this code here.
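The deleted condition let records with a `null` key (tables without a primary key) bypass the offset comparison, presumably being emitted directly; with the new condition, such records fall through to whatever checks follow, which this hunk does not show. A stdlib sketch contrasting only the two conditions visible in the diff; `EmitConditions` and its parameters are hypothetical:

```java
final class EmitConditions {
    /** Condition before the change: pure stream phase, or a record without a key. */
    static boolean bypassesOffsetCheckBefore(boolean pureStreamPhase, Object recordKey) {
        return pureStreamPhase || recordKey == null;
    }

    /** Condition after the change: only the pure stream phase bypasses the check. */
    static boolean bypassesOffsetCheckAfter(boolean pureStreamPhase, Object recordKey) {
        return pureStreamPhase;
    }
}
```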
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
