You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by zi...@apache.org on 2022/02/26 11:51:41 UTC

[incubator-inlong] branch master updated: [INLONG-2687][Agent] Agent provides binlog reader ability using debezium engine (#2737)

This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 1fc08fd  [INLONG-2687][Agent] Agent provides binlog reader ability using debezium engine (#2737)
1fc08fd is described below

commit 1fc08fd808e60d050f450f766a47748bb13ddd30
Author: ziruipeng <zp...@connect.ust.hk>
AuthorDate: Sat Feb 26 19:50:45 2022 +0800

    [INLONG-2687][Agent] Agent provides binlog reader ability using debezium engine (#2737)
    
    * [INLONG-2687][Agent][Feature] Agent provide binlog reader ability using debezium engine #2687
---
 inlong-agent/agent-plugins/pom.xml                 |  16 ++
 .../inlong/agent/plugin/sources/BinlogSource.java  |  64 +++++++
 .../agent/plugin/sources/reader/BinlogReader.java  | 203 +++++++++++++++++++++
 .../sources/snapshot/BinlogSnapshotBase.java       |  87 +++++++++
 .../plugin/sources/snapshot/SnapshotBase.java      |  33 ++++
 .../plugin/sources/TestBinlogOffsetManager.java    |  57 ++++++
 6 files changed, 460 insertions(+)

diff --git a/inlong-agent/agent-plugins/pom.xml b/inlong-agent/agent-plugins/pom.xml
index 2c9b22a..5124dad 100644
--- a/inlong-agent/agent-plugins/pom.xml
+++ b/inlong-agent/agent-plugins/pom.xml
@@ -43,6 +43,7 @@
     <properties>
         <powermock.version>2.0.2</powermock.version>
         <mysql-connector-java.version>8.0.26</mysql-connector-java.version>
+        <version.debezium>1.8.1.Final</version.debezium>
     </properties>
 
     <dependencies>
@@ -51,6 +52,21 @@
             <groupId>com.google.guava</groupId>
         </dependency>
         <dependency>
+            <groupId>io.debezium</groupId>
+            <artifactId>debezium-api</artifactId>
+            <version>${version.debezium}</version>
+        </dependency>
+        <dependency>
+            <groupId>io.debezium</groupId>
+            <artifactId>debezium-embedded</artifactId>
+            <version>${version.debezium}</version>
+        </dependency>
+        <dependency>
+            <groupId>io.debezium</groupId>
+            <artifactId>debezium-connector-mysql</artifactId>
+            <version>${version.debezium}</version>
+        </dependency>
+        <dependency>
             <groupId>org.powermock</groupId>
             <artifactId>powermock-module-junit4</artifactId>
             <version>${powermock.version}</version>
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/BinlogSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/BinlogSource.java
new file mode 100644
index 0000000..dd073ec
--- /dev/null
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/BinlogSource.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.sources;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.inlong.agent.conf.JobProfile;
+import org.apache.inlong.agent.plugin.Reader;
+import org.apache.inlong.agent.plugin.Source;
+import org.apache.inlong.agent.plugin.metrics.SourceJmxMetric;
+import org.apache.inlong.agent.plugin.metrics.SourceMetrics;
+import org.apache.inlong.agent.plugin.metrics.SourcePrometheusMetrics;
+import org.apache.inlong.agent.plugin.sources.reader.BinlogReader;
+import org.apache.inlong.agent.utils.AgentUtils;
+import org.apache.inlong.agent.utils.ConfigUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Source plugin for binlog jobs.
+ *
+ * <p>On construction it registers a source metric (Prometheus-based when
+ * {@code ConfigUtil.isPrometheusEnabled()} is true, JMX-based otherwise).
+ * {@link #split(JobProfile)} hands out a single {@link BinlogReader}.</p>
+ */
+public class BinlogSource implements Source {
+
+    // Logger is bound to this class so that log lines are attributed correctly.
+    private static final Logger LOGGER = LoggerFactory.getLogger(BinlogSource.class);
+
+    private static final String BINLOG_SOURCE_TAG_NAME = "BinlogSourceMetric";
+
+    // Shared by all instances; the incremented value is passed to AgentUtils.getUniqId
+    // together with the tag name to build the metric name.
+    private static final AtomicLong METRICS_INDEX = new AtomicLong(0);
+
+    private final SourceMetrics sourceMetrics;
+
+    /**
+     * Creates the source and its metric object.
+     */
+    public BinlogSource() {
+        if (ConfigUtil.isPrometheusEnabled()) {
+            this.sourceMetrics = new SourcePrometheusMetrics(AgentUtils.getUniqId(
+                BINLOG_SOURCE_TAG_NAME, METRICS_INDEX.incrementAndGet()));
+        } else {
+            this.sourceMetrics = new SourceJmxMetric(AgentUtils.getUniqId(
+                BINLOG_SOURCE_TAG_NAME, METRICS_INDEX.incrementAndGet()));
+        }
+    }
+
+    /**
+     * Builds the readers for a job.
+     *
+     * @param conf job profile; not consulted here, the reader receives its
+     *             configuration later through {@code Reader.init}
+     * @return a mutable list holding exactly one new {@link BinlogReader}
+     */
+    @Override
+    public List<Reader> split(JobProfile conf) {
+        List<Reader> readerList = new ArrayList<>();
+        readerList.add(new BinlogReader());
+        sourceMetrics.incSourceSuccessCount();
+        return readerList;
+    }
+
+}
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java
new file mode 100644
index 0000000..a702542
--- /dev/null
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.sources.reader;
+
+import io.debezium.connector.mysql.MySqlConnector;
+import io.debezium.engine.ChangeEvent;
+import io.debezium.engine.DebeziumEngine;
+import io.debezium.relational.history.FileDatabaseHistory;
+import java.nio.charset.StandardCharsets;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import org.apache.inlong.agent.conf.JobProfile;
+import org.apache.inlong.agent.message.DefaultMessage;
+import org.apache.inlong.agent.plugin.Message;
+import org.apache.inlong.agent.plugin.Reader;
+import org.apache.inlong.agent.plugin.sources.snapshot.BinlogSnapshotBase;
+import org.apache.kafka.connect.storage.FileOffsetBackingStore;
+import org.slf4j.Logger;
+
+import org.slf4j.LoggerFactory;
+
+/**
+ * Reader that runs an embedded Debezium MySQL engine and exposes the change
+ * events it produces as agent {@link Message}s.
+ *
+ * <p>{@link #init(JobProfile)} reads the {@code job.binlogJob.*} settings, builds the
+ * engine and starts it on a single-thread executor. The engine callback pushes the
+ * JSON value of every change event into an in-memory queue that {@link #read()}
+ * drains. {@link #destroy()} stops the engine and the executor.</p>
+ */
+public class BinlogReader implements Reader {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(BinlogReader.class);
+
+    private static final String JOB_DATABASE_USER = "job.binlogJob.user";
+    private static final String JOB_DATABASE_PASSWORD = "job.binlogJob.password";
+    private static final String JOB_DATABASE_HOSTNAME = "job.binlogJob.hostname";
+    private static final String JOB_TABLE_WHITELIST = "job.binlogJob.tableWhiteList";
+    private static final String JOB_DATABASE_WHITELIST = "job.binlogJob.databaseWhiteList";
+
+    private static final String JOB_DATABASE_SNAPSHOT = "job.binlogJob.offset";
+    // NOTE(review): this key is declared but never read; the offset file name is derived
+    // from JOB_DATABASE_STORE_HISTORY_FILENAME in init() - confirm that is intended.
+    private static final String JOB_DATABASE_OFFSET_FILENAME = "job.binlogJob.offset.filename";
+
+    private static final String JOB_DATABASE_SERVER_TIME_ZONE = "job.binlogJob.serverTimezone";
+    private static final String JOB_DATABASE_STORE_OFFSET_INTERVAL_MS = "job.binlogJob.offset.intervalMs";
+
+    private static final String JOB_DATABASE_STORE_HISTORY_FILENAME = "job.binlogJob.history.filename";
+    private static final String JOB_DATABASE_INCLUDE_SCHEMA_CHANGES = "job.binlogJob.schema";
+    private static final String JOB_DATABASE_SNAPSHOT_MODE = "job.binlogJob.snapshot.mode";
+    private static final String JOB_DATABASE_HISTORY_MONITOR_DDL = "job.binlogJob.ddl";
+    private static final String JOB_DATABASE_PORT = "job.binlogJob.port";
+
+    private static final String ENGINE_PROP_PASSWORD = "database.password";
+    private static final String MASKED_VALUE = "******";
+
+    // One queue per reader: a static queue would be shared by every reader instance and
+    // mix the change events of unrelated jobs.
+    private final LinkedBlockingQueue<String> messageQueue = new LinkedBlockingQueue<>();
+
+    private boolean finished = false;
+    private String userName;
+    private String password;
+    private String hostName;
+    private String port;
+    private String tableWhiteList;
+    private String databaseWhiteList;
+    private String serverTimeZone;
+    private String offsetStoreFileName;
+    private String offsetFlushIntervalMs;
+    private String databaseStoreHistoryName;
+    private String includeSchemaChanges;
+    private String snapshotMode;
+    private String historyMonitorDdl;
+    private String instanceId;
+    private ExecutorService executor;
+    private DebeziumEngine<ChangeEvent<String, String>> engine;
+    private String offset;
+    private BinlogSnapshotBase binlogSnapshot;
+    private JobProfile jobProfile;
+
+    public BinlogReader() {
+    }
+
+    /**
+     * Returns the next queued change event without blocking.
+     *
+     * @return a message holding the UTF-8 bytes of the event JSON, or {@code null}
+     *         when no event is queued
+     */
+    @Override
+    public Message read() {
+        // A single poll() replaces the isEmpty()/poll() pair, so a null element can
+        // never be dereferenced.
+        String event = messageQueue.poll();
+        if (event == null) {
+            return null;
+        }
+        return new DefaultMessage(event.getBytes(StandardCharsets.UTF_8));
+    }
+
+    /**
+     * Reads the job configuration, persists the configured start offset (if any) and
+     * starts the Debezium engine on a dedicated single-thread executor.
+     *
+     * @param jobConf profile of the binlog job
+     */
+    @Override
+    public void init(JobProfile jobConf) {
+        jobProfile = jobConf;
+        // The job JSON carries the database password, so only the instance id is logged.
+        LOGGER.info("init binlog reader for job instance {}", jobConf.getInstanceId());
+        userName = jobConf.get(JOB_DATABASE_USER);
+        password = jobConf.get(JOB_DATABASE_PASSWORD);
+        hostName = jobConf.get(JOB_DATABASE_HOSTNAME);
+        port = jobConf.get(JOB_DATABASE_PORT);
+        tableWhiteList = jobConf.get(JOB_TABLE_WHITELIST, "");
+        databaseWhiteList = jobConf.get(JOB_DATABASE_WHITELIST, "");
+        serverTimeZone = jobConf.get(JOB_DATABASE_SERVER_TIME_ZONE, "");
+        offsetFlushIntervalMs = jobConf.get(JOB_DATABASE_STORE_OFFSET_INTERVAL_MS, "1000");
+        // Both the history file and the offset file are placed under the same configured
+        // prefix and suffixed with the instance id.
+        databaseStoreHistoryName = jobConf.get(JOB_DATABASE_STORE_HISTORY_FILENAME)
+            + "history.dat" + jobConf.getInstanceId();
+        offsetStoreFileName = jobConf.get(JOB_DATABASE_STORE_HISTORY_FILENAME)
+            + "offset.dat" + jobConf.getInstanceId();
+        snapshotMode = jobConf.get(JOB_DATABASE_SNAPSHOT_MODE, "");
+        includeSchemaChanges = jobConf.get(JOB_DATABASE_INCLUDE_SCHEMA_CHANGES, "false");
+        historyMonitorDdl = jobConf.get(JOB_DATABASE_HISTORY_MONITOR_DDL, "false");
+        instanceId = jobConf.getInstanceId();
+        finished = false;
+
+        // Write the configured offset into the file the engine uses as its offset store;
+        // save() does nothing for an empty offset.
+        offset = jobConf.get(JOB_DATABASE_SNAPSHOT, "");
+        binlogSnapshot = new BinlogSnapshotBase(offsetStoreFileName);
+        binlogSnapshot.save(offset);
+
+        Properties props = getEngineProps();
+
+        engine = DebeziumEngine.create(
+            io.debezium.engine.format.Json.class)
+            .using(props)
+            .notifying((records, committer) -> {
+                for (ChangeEvent<String, String> record : records) {
+                    String value = record.value();
+                    // LinkedBlockingQueue rejects null elements; skip events without a value.
+                    if (value != null) {
+                        messageQueue.add(value);
+                    }
+                    committer.markProcessed(record);
+                }
+                committer.markBatchFinished();
+            })
+            .using((success, message, error) -> {
+                // Surface engine failures instead of silently dropping them.
+                if (!success) {
+                    LOGGER.error("binlog engine of job {} stopped with failure: {}",
+                        instanceId, message, error);
+                }
+            }).build();
+
+        executor = Executors.newSingleThreadExecutor();
+        executor.execute(engine);
+    }
+
+    /**
+     * Builds the Debezium engine configuration from the fields populated by
+     * {@link #init(JobProfile)}.
+     *
+     * @return engine properties for the MySQL connector
+     */
+    private Properties getEngineProps() {
+        Properties props = new Properties();
+
+        props.setProperty("name", "engine" + instanceId);
+        props.setProperty("connector.class", MySqlConnector.class.getCanonicalName());
+
+        props.setProperty("database.server.name", instanceId);
+        props.setProperty("database.hostname", hostName);
+        props.setProperty("database.port", port);
+        props.setProperty("database.user", userName);
+        props.setProperty(ENGINE_PROP_PASSWORD, password);
+        props.setProperty("database.serverTimezone", serverTimeZone);
+        props.setProperty("table.whitelist", tableWhiteList);
+        props.setProperty("database.whitelist", databaseWhiteList);
+
+        props.setProperty("offset.storage", FileOffsetBackingStore.class.getCanonicalName());
+        props.setProperty("offset.storage.file.filename", offsetStoreFileName);
+        props.setProperty("offset.flush.interval.ms", offsetFlushIntervalMs);
+        props.setProperty("database.history", FileDatabaseHistory.class.getCanonicalName());
+        props.setProperty("database.history.file.filename", databaseStoreHistoryName);
+        props.setProperty("database.snapshot.mode", snapshotMode);
+        props.setProperty("database.history.store.only.monitored.tables.ddl", historyMonitorDdl);
+        props.setProperty("key.converter.schemas.enable", "false");
+        props.setProperty("value.converter.schemas.enable", "false");
+        props.setProperty("include.schema.changes", includeSchemaChanges);
+        props.setProperty("snapshot.mode", snapshotMode);
+        props.setProperty("tombstones.on.delete", "false");
+
+        // Log a copy with the password masked so the credential never reaches the log.
+        Properties loggedProps = new Properties();
+        loggedProps.putAll(props);
+        loggedProps.setProperty(ENGINE_PROP_PASSWORD, MASKED_VALUE);
+        LOGGER.info("binlog job {} start with props {}", jobProfile.getInstanceId(), loggedProps);
+        return props;
+    }
+
+    /**
+     * Marks the reader as finished and releases the engine, the executor and the
+     * snapshot helper. Safe to call even if {@link #init(JobProfile)} never ran.
+     */
+    @Override
+    public void destroy() {
+        finished = true;
+        if (engine != null) {
+            try {
+                // Ask the engine to stop before the executor thread is interrupted.
+                engine.close();
+            } catch (java.io.IOException e) {
+                LOGGER.warn("close binlog engine of job {} error", instanceId, e);
+            }
+        }
+        if (executor != null) {
+            executor.shutdownNow();
+        }
+        if (binlogSnapshot != null) {
+            binlogSnapshot.close();
+        }
+    }
+
+    @Override
+    public boolean isFinished() {
+        return finished;
+    }
+
+    /**
+     * @return the job instance id captured in {@link #init(JobProfile)}
+     */
+    @Override
+    public String getReadSource() {
+        return instanceId;
+    }
+
+    /**
+     * No-op: this reader does not use a read timeout.
+     */
+    @Override
+    public void setReadTimeout(long mill) {
+        return;
+    }
+
+    /**
+     * No-op: this reader does not use a wait interval.
+     */
+    @Override
+    public void setWaitMillisecs(long millis) {
+        return;
+    }
+
+    /**
+     * @return the current content of the offset file, as returned by
+     *         {@link BinlogSnapshotBase#getSnapshot()}
+     */
+    @Override
+    public String getSnapshot() {
+        return binlogSnapshot.getSnapshot();
+    }
+
+}
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/snapshot/BinlogSnapshotBase.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/snapshot/BinlogSnapshotBase.java
new file mode 100644
index 0000000..0c8b8dd
--- /dev/null
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/snapshot/BinlogSnapshotBase.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.sources.snapshot;
+
+import java.io.BufferedInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BinlogSnapshotBase implements SnapshotBase {
+
+    private static final Logger log = LoggerFactory.getLogger(BinlogSnapshotBase.class);
+    public static final int BUFFER_SIZE = 1024;
+    public static final int START_OFFSET = 0;
+
+    private File file;
+
+    private byte[] offset;
+
+    public BinlogSnapshotBase(String filePath) {
+        file = new File(filePath);
+    }
+
+    @Override
+    public String getSnapshot() {
+        load();
+        return new String(offset, StandardCharsets.ISO_8859_1);
+    }
+
+    @Override
+    public void close() {
+    }
+
+    public void load() {
+        try {
+            if (!file.exists()) {
+                file.createNewFile();
+            }
+            FileInputStream fis = new FileInputStream(file);
+            BufferedInputStream inputStream = new BufferedInputStream(fis);
+            ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+            int len;
+            byte[] buf = new byte[BUFFER_SIZE];
+            while ((len = inputStream.read(buf)) != -1) {
+                outputStream.write(buf, START_OFFSET, len);
+            }
+            offset = outputStream.toByteArray();
+            inputStream.close();
+            outputStream.close();
+        } catch (Exception ex) {
+            log.error("load binlog offset error", ex);
+        }
+    }
+
+    public void save(String snapshot) {
+        byte[] bytes = snapshot.getBytes(StandardCharsets.ISO_8859_1);
+        if (bytes.length != 0) {
+            offset = bytes;
+            try (OutputStream output = new FileOutputStream(file)) {
+                output.write(bytes);
+            } catch (Exception e) {
+                log.error("save offset to file error", e);
+            }
+        }
+    }
+
+}
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/snapshot/SnapshotBase.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/snapshot/SnapshotBase.java
new file mode 100644
index 0000000..cef7cfe
--- /dev/null
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/snapshot/SnapshotBase.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.sources.snapshot;
+
+/**
+ * Contract for objects that expose the snapshot of a job and hold resources
+ * that must be released when the job ends.
+ */
+public interface SnapshotBase {
+
+    /**
+     * Get the snapshot of the job.
+     *
+     * @return the snapshot as a string
+     */
+    String getSnapshot();
+
+    /**
+     * Close resources held by this snapshot.
+     */
+    void close();
+
+}
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestBinlogOffsetManager.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestBinlogOffsetManager.java
new file mode 100644
index 0000000..ff45ec0
--- /dev/null
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestBinlogOffsetManager.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.sources;
+
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Path;
+import org.apache.inlong.agent.plugin.AgentBaseTestsHelper;
+import org.apache.inlong.agent.plugin.sources.snapshot.BinlogSnapshotBase;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Verifies that {@link BinlogSnapshotBase} returns exactly the offset that was saved,
+ * including bytes outside the ASCII range.
+ */
+public class TestBinlogOffsetManager {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(TestBinlogOffsetManager.class);
+    private static final String OFFSET_FILE_NAME = "offset.dat";
+    private static Path testDir;
+    private static AgentBaseTestsHelper helper;
+
+    @BeforeClass
+    public static void setup() {
+        // Use this class's own name so the test home is not shared with TestTextFileReader.
+        helper = new AgentBaseTestsHelper(TestBinlogOffsetManager.class.getName()).setupAgentHome();
+        testDir = helper.getTestRootDir();
+    }
+
+    @AfterClass
+    public static void teardown() {
+        helper.teardownAgentHome();
+    }
+
+    @Test
+    public void testOffset() {
+        // Point the snapshot at a file inside the test directory; passing the directory
+        // itself makes every file read/write fail, so only the in-memory copy was checked.
+        String offsetFile = testDir.resolve(OFFSET_FILE_NAME).toString();
+        BinlogSnapshotBase snapshotManager = new BinlogSnapshotBase(offsetFile);
+        byte[] snapshotBytes = new byte[]{-65, -14, -23};
+        String snapshotString = new String(snapshotBytes, StandardCharsets.ISO_8859_1);
+        snapshotManager.save(snapshotString);
+        // JUnit expects (expected, actual).
+        Assert.assertEquals(snapshotString, snapshotManager.getSnapshot());
+    }
+
+}