Posted to commits@inlong.apache.org by GitBox <gi...@apache.org> on 2022/08/02 08:34:24 UTC

[GitHub] [inlong] EMsnap opened a new pull request, #5329: [INLONG-5156][Sort] Add metric for SQLServer source with Flink metrics group

EMsnap opened a new pull request, #5329:
URL: https://github.com/apache/inlong/pull/5329

   - Fixes #5156 
   
   ### Motivation
   
   Add a metric for the SQLServer source, registered via the Flink metrics group.
   
   ### Modifications
   
   Add a metric for the SQLServer source, registered via the Flink metrics group.
   
   ### Documentation
   
     - Does this pull request introduce a new feature? (no)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [inlong] gong commented on pull request #5329: [INLONG-5156][Sort] Add metric for SQLServer source with Flink metrics group

Posted by GitBox <gi...@apache.org>.
gong commented on PR #5329:
URL: https://github.com/apache/inlong/pull/5329#issuecomment-1204737547

   Please add the missing license files.




[GitHub] [inlong] gong commented on a diff in pull request #5329: [INLONG-5156][Sort] Add metric for SQLServer source with Flink metrics group

Posted by GitBox <gi...@apache.org>.
gong commented on code in PR #5329:
URL: https://github.com/apache/inlong/pull/5329#discussion_r937331520


##########
inlong-sort/sort-connectors/sqlserver-cdc/pom.xml:
##########
@@ -36,6 +36,11 @@
             <groupId>com.ververica</groupId>
             <artifactId>flink-connector-sqlserver-cdc</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>sort-connector-base</artifactId>
+            <version>${project.version}</version>
+        </dependency>

Review Comment:
   You need to add `<include>org.apache.inlong:*</include>` to the Maven packaging (shade plugin) configuration; otherwise the packaged jar will be missing the class files of connector-base.
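   
   For reference, a minimal sketch of the requested change, assuming the sqlserver-cdc module packages itself with maven-shade-plugin the way the other Sort connectors do (the surrounding plugin block in the actual pom.xml may differ):
   
   ```xml
   <plugin>
       <groupId>org.apache.maven.plugins</groupId>
       <artifactId>maven-shade-plugin</artifactId>
       <executions>
           <execution>
               <phase>package</phase>
               <goals>
                   <goal>shade</goal>
               </goals>
               <configuration>
                   <artifactSet>
                       <includes>
                           <!-- without this include, the shaded jar silently drops the
                                org.apache.inlong:sort-connector-base class files -->
                           <include>org.apache.inlong:*</include>
                       </includes>
                   </artifactSet>
               </configuration>
           </execution>
       </executions>
   </plugin>
   ```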
                       





[GitHub] [inlong] EMsnap merged pull request #5329: [INLONG-5156][Sort] Add metric for SQLServer source with Flink metrics group

Posted by GitBox <gi...@apache.org>.
EMsnap merged PR #5329:
URL: https://github.com/apache/inlong/pull/5329




[GitHub] [inlong] EMsnap commented on a diff in pull request #5329: [INLONG-5156][Sort] Add metric for SQLServer source with Flink metrics group

Posted by GitBox <gi...@apache.org>.
EMsnap commented on code in PR #5329:
URL: https://github.com/apache/inlong/pull/5329#discussion_r944194383


##########
inlong-sort/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/cdc/sqlserver/table/DebeziumSourceFunction.java:
##########
@@ -0,0 +1,627 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.cdc.sqlserver.table;
+
+import static com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.registerHistory;
+import static com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.retrieveHistory;
+
+import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
+import com.ververica.cdc.debezium.Validator;
+import com.ververica.cdc.debezium.internal.DebeziumChangeConsumer;
+import com.ververica.cdc.debezium.internal.DebeziumChangeFetcher;
+import com.ververica.cdc.debezium.internal.DebeziumOffset;
+import com.ververica.cdc.debezium.internal.DebeziumOffsetSerializer;
+import com.ververica.cdc.debezium.internal.FlinkDatabaseHistory;
+import com.ververica.cdc.debezium.internal.FlinkDatabaseSchemaHistory;
+import com.ververica.cdc.debezium.internal.FlinkOffsetBackingStore;
+import com.ververica.cdc.debezium.internal.Handover;
+import com.ververica.cdc.debezium.internal.SchemaRecord;
+import io.debezium.document.DocumentReader;
+import io.debezium.document.DocumentWriter;
+import io.debezium.embedded.Connect;
+import io.debezium.engine.DebeziumEngine;
+import io.debezium.engine.spi.OffsetCommitPolicy;
+import io.debezium.heartbeat.Heartbeat;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import org.apache.commons.collections.map.LinkedMap;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.OperatorStateStore;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.inlong.sort.base.Constants;
+import org.apache.inlong.sort.base.metric.SourceMetricData;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The {@link DebeziumSourceFunction} is a streaming data source that pulls captured change data
+ * from databases into Flink.
+ *
+ * <p>Please refer to Debezium's documentation for the available configuration properties:
+ * https://debezium.io/documentation/reference/1.5/development/engine.html#engine-properties
+ */
+@PublicEvolving
+public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
+        implements CheckpointedFunction, CheckpointListener, ResultTypeQueryable<T> {
+
+    private static final long serialVersionUID = -5808108641062931623L;
+
+    protected static final Logger LOG = LoggerFactory.getLogger(DebeziumSourceFunction.class);
+
+    /**
+     * State name of the consumer's partition offset states.
+     */
+    public static final String OFFSETS_STATE_NAME = "offset-states";
+
+    /**
+     * State name of the consumer's history records state.
+     */
+    public static final String HISTORY_RECORDS_STATE_NAME = "history-records-states";
+
+    /**
+     * The maximum number of pending non-committed checkpoints to track, to avoid memory leaks.
+     */
+    public static final int MAX_NUM_PENDING_CHECKPOINTS = 100;
+
+    /**
+     * The configuration represents the Debezium MySQL Connector uses the legacy implementation or
+     * not.
+     */
+    public static final String LEGACY_IMPLEMENTATION_KEY = "internal.implementation";
+
+    /**
+     * The configuration value represents legacy implementation.
+     */
+    public static final String LEGACY_IMPLEMENTATION_VALUE = "legacy";
+
+    // ---------------------------------------------------------------------------------------
+    // Properties
+    // ---------------------------------------------------------------------------------------
+
+    /**
+     * The schema to convert from Debezium's messages into Flink's objects.
+     */
+    private final DebeziumDeserializationSchema<T> deserializer;
+
+    /**
+     * User-supplied properties for Kafka. *
+     */
+    private final Properties properties;
+
+    /**
+     * The specific binlog offset to read from when the first startup.
+     */
+    private final @Nullable
+    DebeziumOffset specificOffset;
+
+    /**
+     * Data for pending but uncommitted offsets.
+     */
+    private final LinkedMap pendingOffsetsToCommit = new LinkedMap();
+
+    /**
+     * Flag indicating whether the Debezium Engine is started.
+     */
+    private volatile boolean debeziumStarted = false;
+
+    /**
+     * Validator to validate the connected database satisfies the cdc connector's requirements.
+     */
+    private final Validator validator;
+
+    // ---------------------------------------------------------------------------------------
+    // State
+    // ---------------------------------------------------------------------------------------
+
+    /**
+     * The offsets to restore to, if the consumer restores state from a checkpoint.
+     *
+     * <p>This map will be populated by the {@link #initializeState(FunctionInitializationContext)}
+     * method.
+     *
+     * <p>Using a String because we are encoding the offset state in JSON bytes.
+     */
+    private transient volatile String restoredOffsetState;
+
+    /**
+     * Accessor for state in the operator state backend.
+     */
+    private transient ListState<byte[]> offsetState;
+
+    /**
+     * State to store the history records, i.e. schema changes.
+     *
+     * @see FlinkDatabaseHistory
+     * @see FlinkDatabaseSchemaHistory
+     */
+    private transient ListState<String> schemaRecordsState;
+
+    // ---------------------------------------------------------------------------------------
+    // Worker
+    // ---------------------------------------------------------------------------------------
+
+    private transient ExecutorService executor;
+    private transient DebeziumEngine<?> engine;
+    /**
+     * Unique name of this Debezium Engine instance across all the jobs. Currently we randomly
+     * generate a UUID for it. This is used for {@link FlinkDatabaseHistory}.
+     */
+    private transient String engineInstanceName;
+
+    /**
+     * Consume the events from the engine and commit the offset to the engine.
+     */
+    private transient DebeziumChangeConsumer changeConsumer;
+
+    /**
+     * The consumer to fetch records from {@link Handover}.
+     */
+    private transient DebeziumChangeFetcher<T> debeziumChangeFetcher;
+
+    /**
+     * Buffer the events from the source and record the errors from the debezium.
+     */
+    private transient Handover handover;
+
+    private String inlongMetric;
+
+    private SourceMetricData metricData;
+
+    // ---------------------------------------------------------------------------------------
+
+    public DebeziumSourceFunction(
+            DebeziumDeserializationSchema<T> deserializer,
+            Properties properties,
+            @Nullable DebeziumOffset specificOffset,
+            Validator validator, String inlongMetric) {
+        this.deserializer = deserializer;
+        this.properties = properties;
+        this.specificOffset = specificOffset;
+        this.validator = validator;
+        this.inlongMetric = inlongMetric;
+    }
+
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        validator.validate();
+        super.open(parameters);
+        ThreadFactory threadFactory =
+                new ThreadFactoryBuilder().setNameFormat("debezium-engine").build();
+        this.executor = Executors.newSingleThreadExecutor(threadFactory);
+        this.handover = new Handover();
+        this.changeConsumer = new DebeziumChangeConsumer(handover);
+    }
+
+    // ------------------------------------------------------------------------
+    //  Checkpoint and restore
+    // ------------------------------------------------------------------------
+
+    @Override
+    public void initializeState(FunctionInitializationContext context) throws Exception {
+        OperatorStateStore stateStore = context.getOperatorStateStore();
+        this.offsetState =
+                stateStore.getUnionListState(
+                        new ListStateDescriptor<>(
+                                OFFSETS_STATE_NAME,
+                                PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO));
+        this.schemaRecordsState =
+                stateStore.getUnionListState(
+                        new ListStateDescriptor<>(
+                                HISTORY_RECORDS_STATE_NAME, BasicTypeInfo.STRING_TYPE_INFO));
+
+        if (context.isRestored()) {
+            restoreOffsetState();
+            restoreHistoryRecordsState();
+        } else {
+            if (specificOffset != null) {
+                byte[] serializedOffset =
+                        DebeziumOffsetSerializer.INSTANCE.serialize(specificOffset);
+                restoredOffsetState = new String(serializedOffset, StandardCharsets.UTF_8);
+                LOG.info(
+                        "Consumer subtask {} starts to read from specified offset {}.",
+                        getRuntimeContext().getIndexOfThisSubtask(),
+                        restoredOffsetState);
+            } else {
+                LOG.info(
+                        "Consumer subtask {} has no restore state.",
+                        getRuntimeContext().getIndexOfThisSubtask());
+            }
+        }
+    }
+
+    private void restoreOffsetState() throws Exception {
+        for (byte[] serializedOffset : offsetState.get()) {
+            if (restoredOffsetState == null) {
+                restoredOffsetState = new String(serializedOffset, StandardCharsets.UTF_8);
+            } else {
+                throw new RuntimeException(
+                        "Debezium Source only support single task, "
+                                + "however, this is restored from multiple tasks.");
+            }
+        }
+        LOG.info(
+                "Consumer subtask {} restored offset state: {}.",
+                getRuntimeContext().getIndexOfThisSubtask(),
+                restoredOffsetState);
+    }
+
+    private void restoreHistoryRecordsState() throws Exception {
+        DocumentReader reader = DocumentReader.defaultReader();
+        ConcurrentLinkedQueue<SchemaRecord> historyRecords = new ConcurrentLinkedQueue<>();
+        int recordsCount = 0;
+        boolean firstEntry = true;
+        for (String record : schemaRecordsState.get()) {
+            if (firstEntry) {
+                // we store the engine instance name in the first element
+                this.engineInstanceName = record;
+                firstEntry = false;
+            } else {
+                // Put the records into the state. The database history should read, reorganize and
+                // register the state.
+                historyRecords.add(new SchemaRecord(reader.read(record)));
+                recordsCount++;
+            }
+        }
+        if (engineInstanceName != null) {
+            registerHistory(engineInstanceName, historyRecords);
+        }
+        LOG.info(
+                "Consumer subtask {} restored history records state: {} with {} records.",
+                getRuntimeContext().getIndexOfThisSubtask(),
+                engineInstanceName,
+                recordsCount);
+    }
+
+    @Override
+    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
+        if (handover.hasError()) {
+            LOG.debug("snapshotState() called on closed source");
+            throw new FlinkRuntimeException(
+                    "Call snapshotState() on closed source, checkpoint failed.");
+        } else {
+            snapshotOffsetState(functionSnapshotContext.getCheckpointId());
+            snapshotHistoryRecordsState();
+        }
+    }
+
+    private void snapshotOffsetState(long checkpointId) throws Exception {
+        offsetState.clear();
+
+        final DebeziumChangeFetcher<?> fetcher = this.debeziumChangeFetcher;
+
+        byte[] serializedOffset = null;
+        if (fetcher == null) {
+            // the fetcher has not yet been initialized, which means we need to return the
+            // originally restored offsets
+            if (restoredOffsetState != null) {
+                serializedOffset = restoredOffsetState.getBytes(StandardCharsets.UTF_8);
+            }
+        } else {
+            byte[] currentState = fetcher.snapshotCurrentState();
+            if (currentState == null && restoredOffsetState != null) {
+                // the fetcher has been initialized, but has not yet received any data,
+                // which means we need to return the originally restored offsets.
+                serializedOffset = restoredOffsetState.getBytes(StandardCharsets.UTF_8);
+            } else {
+                serializedOffset = currentState;
+            }
+        }
+
+        if (serializedOffset != null) {
+            offsetState.add(serializedOffset);
+            // the map cannot be asynchronously updated, because only one checkpoint call
+            // can happen on this function at a time: either snapshotState() or
+            // notifyCheckpointComplete()
+            pendingOffsetsToCommit.put(checkpointId, serializedOffset);
+            // truncate the map of pending offsets to commit, to prevent infinite growth
+            while (pendingOffsetsToCommit.size() > MAX_NUM_PENDING_CHECKPOINTS) {
+                pendingOffsetsToCommit.remove(0);
+            }
+        }
+    }
+
+    private void snapshotHistoryRecordsState() throws Exception {
+        schemaRecordsState.clear();
+
+        if (engineInstanceName != null) {
+            schemaRecordsState.add(engineInstanceName);
+            Collection<SchemaRecord> records = retrieveHistory(engineInstanceName);
+            DocumentWriter writer = DocumentWriter.defaultWriter();
+            for (SchemaRecord record : records) {
+                schemaRecordsState.add(writer.write(record.toDocument()));
+            }
+        }
+    }
+
+    @Override
+    public void run(SourceContext<T> sourceContext) throws Exception {
+
+        // initialize metrics
+        // make RuntimeContext#getMetricGroup compatible between Flink 1.13 and Flink 1.14
+        final Method getMetricGroupMethod =
+                getRuntimeContext().getClass().getMethod("getMetricGroup");
+        getMetricGroupMethod.setAccessible(true);
+        final MetricGroup metricGroup =
+                (MetricGroup) getMetricGroupMethod.invoke(getRuntimeContext());
+
+        metricGroup.gauge(
+                "currentFetchEventTimeLag",
+                (Gauge<Long>) () -> debeziumChangeFetcher.getFetchDelay());
+        metricGroup.gauge(
+                "currentEmitEventTimeLag",
+                (Gauge<Long>) () -> debeziumChangeFetcher.getEmitDelay());
+        metricGroup.gauge(
+                "sourceIdleTime", (Gauge<Long>) () -> debeziumChangeFetcher.getIdleTime());
+        if (StringUtils.isNotEmpty(this.inlongMetric)) {
+            String[] inlongMetricArray = inlongMetric.split(Constants.DELIMITER);
+            String groupId = inlongMetricArray[0];
+            String streamId = inlongMetricArray[1];
+            String nodeId = inlongMetricArray[2];
+            metricData = new SourceMetricData(metricGroup);
+            metricData.registerMetricsForNumRecordsIn(groupId, streamId, nodeId, "numRecordsIn");

Review Comment:
   done



##########
inlong-sort/sort-connectors/sqlserver-cdc/pom.xml:
##########
@@ -36,6 +36,11 @@
             <groupId>com.ververica</groupId>
             <artifactId>flink-connector-sqlserver-cdc</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>sort-connector-base</artifactId>
+            <version>${project.version}</version>
+        </dependency>

Review Comment:
   done





[GitHub] [inlong] yunqingmoswu commented on a diff in pull request #5329: [INLONG-5156][Sort] Add metric for SQLServer source with Flink metrics group

Posted by GitBox <gi...@apache.org>.
yunqingmoswu commented on code in PR #5329:
URL: https://github.com/apache/inlong/pull/5329#discussion_r937465300


##########
inlong-sort/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/cdc/sqlserver/table/DebeziumSourceFunction.java:
##########
@@ -0,0 +1,627 @@
+    [... ~380 lines omitted: the license header, imports, fields, and checkpoint/restore code in this quoted hunk are identical to the DebeziumSourceFunction.java diff quoted in the previous review comment ...]
+    @Override
+    public void run(SourceContext<T> sourceContext) throws Exception {
+
+        // initialize metrics
+        // make RuntimeContext#getMetricGroup compatible between Flink 1.13 and Flink 1.14
+        final Method getMetricGroupMethod =
+                getRuntimeContext().getClass().getMethod("getMetricGroup");
+        getMetricGroupMethod.setAccessible(true);
+        final MetricGroup metricGroup =
+                (MetricGroup) getMetricGroupMethod.invoke(getRuntimeContext());
+
+        metricGroup.gauge(
+                "currentFetchEventTimeLag",
+                (Gauge<Long>) () -> debeziumChangeFetcher.getFetchDelay());
+        metricGroup.gauge(
+                "currentEmitEventTimeLag",
+                (Gauge<Long>) () -> debeziumChangeFetcher.getEmitDelay());
+        metricGroup.gauge(
+                "sourceIdleTime", (Gauge<Long>) () -> debeziumChangeFetcher.getIdleTime());
+        if (StringUtils.isNotEmpty(this.inlongMetric)) {
+            String[] inlongMetricArray = inlongMetric.split(Constants.DELIMITER);
+            String groupId = inlongMetricArray[0];
+            String streamId = inlongMetricArray[1];
+            String nodeId = inlongMetricArray[2];
+            metricData = new SourceMetricData(metricGroup);
+            metricData.registerMetricsForNumRecordsIn(groupId, streamId, nodeId, "numRecordsIn");

Review Comment:
   Please replace the `"numRecordsIn"` string literal with the corresponding constant defined in connector-base.
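   
   For illustration, a sketch of the change being requested, assuming connector-base's `Constants` class exposes a constant for this metric name (the identifier `NUM_RECORDS_IN` is an assumption, not confirmed in this thread):
   
   ```java
   // Constants is already imported from connector-base at the top of this file:
   // import org.apache.inlong.sort.base.Constants;
   
   metricData = new SourceMetricData(metricGroup);
   // NUM_RECORDS_IN is an assumed constant holding "numRecordsIn"; use the
   // identifier that connector-base actually defines.
   metricData.registerMetricsForNumRecordsIn(
           groupId, streamId, nodeId, Constants.NUM_RECORDS_IN);
   ```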





[GitHub] [inlong] yunqingmoswu commented on a diff in pull request #5329: [INLONG-5156][Sort] Add metric for SQLServer source with Flink metrics group

Posted by GitBox <gi...@apache.org>.
yunqingmoswu commented on code in PR #5329:
URL: https://github.com/apache/inlong/pull/5329#discussion_r944394886


##########
inlong-sort/sort-connectors/sqlserver-cdc/pom.xml:
##########
@@ -36,6 +36,21 @@
             <groupId>com.ververica</groupId>
             <artifactId>flink-connector-sqlserver-cdc</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>sort-connector-base</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>sort-connector-base</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.inlong</groupId>

Review Comment:
   Remove it; this dependency is already declared in the pom.xml of sort-connector-base, so it is pulled in transitively.
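   
   For clarity, the intended end state is a dependencies block that declares sort-connector-base exactly once; a minimal sketch (the final pom.xml may contain other dependencies not shown here):
   
   ```xml
   <dependency>
       <groupId>com.ververica</groupId>
       <artifactId>flink-connector-sqlserver-cdc</artifactId>
   </dependency>
   <!-- declared once; the duplicated copies in the diff above are removed -->
   <dependency>
       <groupId>org.apache.inlong</groupId>
       <artifactId>sort-connector-base</artifactId>
       <version>${project.version}</version>
   </dependency>
   ```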





[GitHub] [inlong] EMsnap commented on pull request #5329: [INLONG-5156][Sort] Add metric for SQLServer source with Flink metrics group

Posted by GitBox <gi...@apache.org>.
EMsnap commented on PR #5329:
URL: https://github.com/apache/inlong/pull/5329#issuecomment-1212911312

   > Please add the missing license files.
   
   done

