You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by GitBox <gi...@apache.org> on 2022/05/17 04:58:52 UTC

[GitHub] [incubator-inlong] healchow commented on a diff in pull request #4229: [INLONG-4224][Sort] Add debezium module to connectors

healchow commented on code in PR #4229:
URL: https://github.com/apache/incubator-inlong/pull/4229#discussion_r874366900


##########
inlong-sort/sort-connectors/sort-connector-debezium/src/main/java/org/apache/inlong/cdc/debezium/utils/TemporalConversions.java:
##########
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.cdc.debezium.utils;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.OffsetDateTime;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Temporal conversion constants.
+ */
+public final class TemporalConversions {
+
+    static final long MILLISECONDS_PER_SECOND = TimeUnit.SECONDS.toMillis(1);
+    static final long MICROSECONDS_PER_SECOND = TimeUnit.SECONDS.toMicros(1);
+    static final long MICROSECONDS_PER_MILLISECOND = TimeUnit.MILLISECONDS.toMicros(1);
+    static final long NANOSECONDS_PER_MILLISECOND = TimeUnit.MILLISECONDS.toNanos(1);
+    static final long NANOSECONDS_PER_MICROSECOND = TimeUnit.MICROSECONDS.toNanos(1);
+    static final long NANOSECONDS_PER_SECOND = TimeUnit.SECONDS.toNanos(1);
+    static final long NANOSECONDS_PER_DAY = TimeUnit.DAYS.toNanos(1);
+    static final long SECONDS_PER_DAY = TimeUnit.DAYS.toSeconds(1);
+    static final long MICROSECONDS_PER_DAY = TimeUnit.DAYS.toMicros(1);
+    static final LocalDate EPOCH = LocalDate.ofEpochDay(0);
+
+    private TemporalConversions() {
+    }
+
+    public static LocalDate toLocalDate(Object obj) {

Review Comment:
   Please add Javadoc comments for these static methods, thanks.



##########
inlong-sort/sort-connectors/sort-connector-debezium/src/main/java/org/apache/inlong/cdc/debezium/internal/FlinkOffsetBackingStore.java:
##########
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.cdc.debezium.internal;
+
+import io.debezium.embedded.EmbeddedEngine;
+import io.debezium.engine.DebeziumEngine;
+import org.apache.inlong.cdc.debezium.DebeziumSourceFunction;
+import org.apache.kafka.common.utils.ThreadUtils;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.json.JsonConverter;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.OffsetBackingStore;
+import org.apache.kafka.connect.storage.OffsetStorageWriter;
+import org.apache.kafka.connect.util.Callback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * A implementation of {@link OffsetBackingStore} backed on Flink's state mechanism.
+ *
+ * <p>The {@link #OFFSET_STATE_VALUE} in the {@link WorkerConfig} is the raw position and offset
+ * data in JSON format. It is set into the config when recovery from failover by {@link
+ * DebeziumSourceFunction} before startup the {@link DebeziumEngine}. If it is not a restoration,
+ * the {@link #OFFSET_STATE_VALUE} is empty. {@link DebeziumEngine} relies on the {@link
+ * OffsetBackingStore} for failover recovery.
+ *
+ * @see DebeziumSourceFunction
+ */
+public class FlinkOffsetBackingStore implements OffsetBackingStore {
+
+    public static final String OFFSET_STATE_VALUE = "offset.storage.flink.state.value";
+    public static final int FLUSH_TIMEOUT_SECONDS = 10;
+    private static final Logger LOG = LoggerFactory.getLogger(FlinkOffsetBackingStore.class);
+    protected Map<ByteBuffer, ByteBuffer> data = new HashMap<>();
+    protected ExecutorService executor;
+
+    @Override
+    public void configure(WorkerConfig config) {
+        // eagerly initialize the executor, because OffsetStorageWriter will use it later
+        start();
+
+        Map<String, ?> conf = config.originals();
+        if (!conf.containsKey(OFFSET_STATE_VALUE)) {
+            // a normal startup from clean state, not need to initialize the offset
+            return;
+        }
+
+        String stateJson = (String) conf.get(OFFSET_STATE_VALUE);
+        DebeziumOffsetSerializer serializer = new DebeziumOffsetSerializer();
+        DebeziumOffset debeziumOffset;
+        try {
+            debeziumOffset = serializer.deserialize(stateJson.getBytes(StandardCharsets.UTF_8));
+        } catch (IOException e) {
+            LOG.error("Can't deserialize debezium offset state from JSON: " + stateJson, e);
+            throw new RuntimeException(e);
+        }
+
+        final String engineName = (String) conf.get(EmbeddedEngine.ENGINE_NAME.name());
+        Converter keyConverter = new JsonConverter();
+        Converter valueConverter = new JsonConverter();
+        keyConverter.configure(config.originals(), true);
+        Map<String, Object> valueConfigs = new HashMap<>(conf);
+        valueConfigs.put("schemas.enable", false);
+        valueConverter.configure(valueConfigs, true);
+        OffsetStorageWriter offsetWriter =
+                new OffsetStorageWriter(
+                        this,
+                        // must use engineName as namespace to align with Debezium Engine
+                        // implementation
+                        engineName,
+                        keyConverter,
+                        valueConverter);
+
+        offsetWriter.offset(debeziumOffset.sourcePartition, debeziumOffset.sourceOffset);
+
+        // flush immediately
+        if (!offsetWriter.beginFlush()) {
+            // if nothing is needed to be flushed, there must be something wrong with the
+            // initialization
+            LOG.warn(
+                    "Initialize FlinkOffsetBackingStore from empty offset state, this shouldn't happen.");
+            return;
+        }
+
+        // trigger flushing
+        Future<Void> flushFuture =
+                offsetWriter.doFlush(
+                        (error, result) -> {
+                            if (error != null) {
+                                LOG.error("Failed to flush initial offset.", error);
+                            } else {
+                                LOG.debug("Successfully flush initial offset.");
+                            }
+                        });
+
+        // wait until flushing finished
+        try {
+            flushFuture.get(FLUSH_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+            LOG.info(
+                    "Flush offsets successfully, partition: {}, offsets: {}",
+                    debeziumOffset.sourcePartition,
+                    debeziumOffset.sourceOffset);
+        } catch (InterruptedException e) {
+            LOG.warn("Flush offsets interrupted, cancelling.", e);
+            offsetWriter.cancelFlush();
+        } catch (ExecutionException e) {
+            LOG.error("Flush offsets threw an unexpected exception.", e);
+            offsetWriter.cancelFlush();
+        } catch (TimeoutException e) {
+            LOG.error("Timed out waiting to flush offsets to storage.", e);
+            offsetWriter.cancelFlush();
+        }
+    }
+
+    @Override
+    public void start() {
+        if (executor == null) {
+            executor =

Review Comment:
   Please reformat these wrapped lines so the statement fits on one line, or on as few lines as possible, thanks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org