You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/08/17 02:22:22 UTC

[inlong] branch master updated: [INLONG-5046][Agent] Support collect data from PostgreSQL (#5367)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new e11d7d850 [INLONG-5046][Agent] Support collect data from PostgreSQL (#5367)
e11d7d850 is described below

commit e11d7d850b3f1f3dcde2369e02d270b61ec06794
Author: iamsee123 <61...@users.noreply.github.com>
AuthorDate: Wed Aug 17 10:22:18 2022 +0800

    [INLONG-5046][Agent] Support collect data from PostgreSQL (#5367)
---
 .../inlong/agent/constant/PostgreSQLConstants.java |  36 +++
 inlong-agent/agent-plugins/pom.xml                 |   4 +
 .../agent/plugin/sources/PostgreSQLSource.java     |  48 ++++
 .../plugin/sources/reader/PostgreSQLReader.java    | 300 +++++++++++++++++++++
 .../sources/snapshot/PostgreSQLSnapshotBase.java   | 110 ++++++++
 .../sources/PostgreSQLOffsetManagerTest.java       |  67 +++++
 .../agent/plugin/sources/PostgreSQLReaderTest.java | 101 +++++++
 licenses/inlong-agent/LICENSE                      |   1 +
 pom.xml                                            |   5 +
 9 files changed, 672 insertions(+)

diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/PostgreSQLConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/PostgreSQLConstants.java
new file mode 100644
index 000000000..7f114a92d
--- /dev/null
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/PostgreSQLConstants.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.constant;
+
+/**
+ * String constants naming the snapshot modes of a PostgreSQL collection job.
+ *
+ * <p>NOTE(review): apart from {@link #CUSTOM}, the values presumably mirror the
+ * {@code snapshot.mode} options of the Debezium PostgreSQL connector - confirm against the
+ * Debezium version in use.
+ */
+public class PostgreSQLConstants {
+
+    /** Snapshot mode {@code "initial"}. */
+    public static final String INITIAL = "initial";
+    /** Snapshot mode {@code "initial_only"}. */
+    public static final String INITIAL_ONLY = "initial_only";
+    /** Snapshot mode {@code "always"}. */
+    public static final String ALWAYS = "always";
+    /** Snapshot mode {@code "exported"}. */
+    public static final String EXPORTED = "exported";
+    /** Snapshot mode {@code "custom"}. */
+    public static final String CUSTOM = "custom";
+
+    /** Snapshot mode {@code "never"}; note that the constant name does not match its value. */
+    public static final String EARLIEST_OFFSET = "never";
+}
diff --git a/inlong-agent/agent-plugins/pom.xml b/inlong-agent/agent-plugins/pom.xml
index 48303112a..46f2e040a 100644
--- a/inlong-agent/agent-plugins/pom.xml
+++ b/inlong-agent/agent-plugins/pom.xml
@@ -88,6 +88,10 @@
             <groupId>org.postgresql</groupId>
             <artifactId>postgresql</artifactId>
         </dependency>
+        <dependency>
+            <groupId>io.debezium</groupId>
+            <artifactId>debezium-connector-postgres</artifactId>
+        </dependency>
         <dependency>
             <artifactId>awaitility</artifactId>
             <groupId>org.awaitility</groupId>
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PostgreSQLSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PostgreSQLSource.java
new file mode 100644
index 000000000..243d205dd
--- /dev/null
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PostgreSQLSource.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.sources;
+
+import org.apache.inlong.agent.conf.JobProfile;
+import org.apache.inlong.agent.plugin.Reader;
+import org.apache.inlong.agent.plugin.sources.reader.PostgreSQLReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Source for PostgreSQL jobs; each job is served by exactly one {@link PostgreSQLReader}.
+ */
+public class PostgreSQLSource extends AbstractSource {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(PostgreSQLSource.class);
+
+    public PostgreSQLSource() {
+
+    }
+
+    /**
+     * Initialises the source with the job profile and creates the reader for the job.
+     *
+     * @param conf job profile of the PostgreSQL job
+     * @return a mutable list holding a single, not yet initialised {@link PostgreSQLReader}
+     */
+    @Override
+    public List<Reader> split(JobProfile conf) {
+        super.init(conf);
+        List<Reader> readers = new ArrayList<>();
+        readers.add(new PostgreSQLReader());
+        // one successful split is recorded per call
+        sourceMetric.sourceSuccessCount.incrementAndGet();
+        return readers;
+    }
+}
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/PostgreSQLReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/PostgreSQLReader.java
new file mode 100644
index 000000000..5d3e9207c
--- /dev/null
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/PostgreSQLReader.java
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.sources.reader;
+
+import com.google.common.base.Preconditions;
+import com.google.gson.Gson;
+import io.debezium.connector.postgresql.PostgresConnector;
+import io.debezium.engine.ChangeEvent;
+import io.debezium.engine.DebeziumEngine;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.inlong.agent.conf.AgentConfiguration;
+import org.apache.inlong.agent.conf.JobProfile;
+import org.apache.inlong.agent.constant.AgentConstants;
+import org.apache.inlong.agent.constant.PostgreSQLConstants;
+import org.apache.inlong.agent.message.DefaultMessage;
+import org.apache.inlong.agent.metrics.AgentMetricItem;
+import org.apache.inlong.agent.metrics.audit.AuditUtils;
+import org.apache.inlong.agent.plugin.Message;
+import org.apache.inlong.agent.plugin.sources.snapshot.PostgreSQLSnapshotBase;
+import org.apache.inlong.agent.plugin.utils.InLongFileOffsetBackingStore;
+import org.apache.inlong.agent.pojo.DebeziumFormat;
+import org.apache.inlong.agent.pojo.DebeziumOffset;
+import org.apache.inlong.agent.utils.AgentUtils;
+import org.apache.inlong.agent.utils.DebeziumOffsetSerializer;
+import org.apache.kafka.connect.storage.FileOffsetBackingStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_MAP_CAPACITY;
+import static org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_DATA;
+
+/**
+ * Read postgreSQL data
+ */
+public class PostgreSQLReader extends AbstractReader {
+
+    public static final String COMPONENT_NAME = "PostgreSQLReader";
+    public static final String JOB_POSTGRESQL_USER = "job.postgreSQLJob.user";
+    public static final String JOB_DATABASE_PASSWORD = "job.postgreSQLJob.password";
+    public static final String JOB_DATABASE_HOSTNAME = "job.postgreSQLJob.hostname";
+    public static final String JOB_DATABASE_PORT = "job.postgreSQLJob.port";
+    public static final String JOB_DATABASE_STORE_OFFSET_INTERVAL_MS = "job.postgreSQLjob.offset.intervalMs";
+    public static final String JOB_DATABASE_STORE_HISTORY_FILENAME = "job.postgreSQLjob.history.filename";
+    public static final String JOB_DATABASE_SNAPSHOT_MODE = "job.postgreSQLjob.snapshot.mode";
+    public static final String JOB_DATABASE_QUEUE_SIZE = "job.postgreSQLjob.queueSize";
+    public static final String JOB_DATABASE_OFFSETS = "job.postgreSQLjob.offsets";
+    public static final String JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE = "job.postgreSQLjob.offset.specificOffsetFile";
+    public static final String JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS = "job.postgreSQLjob.offset.specificOffsetPos";
+    public static final String JOB_DATABASE_DBNAME = "job.postgreSQLjob.dbname";
+    public static final String JOB_DATABASE_SERVER_NAME = "job.postgreSQLjob.servername";
+    public static final String JOB_DATABASE_PLUGIN_NAME = "job.postgreSQLjob.pluginname";
+    private static final Gson GSON = new Gson();
+    private static final Logger LOGGER = LoggerFactory.getLogger(PostgreSQLReader.class);
+    private final AgentConfiguration agentConf = AgentConfiguration.getAgentConf();
+    protected AgentMetricItem readerMetric;
+    private String userName;
+    private String password;
+    private String hostName;
+    private String port;
+    private String offsetFlushIntervalMs;
+    private String offsetStoreFileName;
+    private String snapshotMode;
+    private String instanceId;
+    private String offset;
+    private String specificOffsetFile;
+    private String specificOffsetPos;
+    private String dbName;
+    private String pluginName;
+    private String serverName;
+    private PostgreSQLSnapshotBase postgreSQLSnapshot;
+    private boolean finished = false;
+    private ExecutorService executor;
+    /**
+     * pair.left : table name
+     * pair.right : actual data
+     */
+    private LinkedBlockingQueue<Pair<String, String>> postgreSQLMessageQueue;
+    private JobProfile jobProfile;
+    private boolean destroyed = false;
+
+    public PostgreSQLReader() {
+    }
+
+    @Override
+    public Message read() {
+        if (!postgreSQLMessageQueue.isEmpty()) {
+            readerMetric.pluginReadCount.incrementAndGet();
+            return getPostgreSQLMessage();
+        } else {
+            return null;
+        }
+    }
+
+    private DefaultMessage getPostgreSQLMessage() {
+        Pair<String, String> message = postgreSQLMessageQueue.poll();
+        Map<String, String> header = new HashMap<>(DEFAULT_MAP_CAPACITY);
+        header.put(PROXY_KEY_DATA, message.getKey());
+        return new DefaultMessage(message.getValue().getBytes(StandardCharsets.UTF_8), header);
+    }
+
+    @Override
+    public void init(JobProfile jobConf) {
+        super.init(jobConf);
+        jobProfile = jobConf;
+        LOGGER.info("init PostgreSQL reader with jobConf {}", jobConf.toJsonStr());
+        userName = jobConf.get(JOB_POSTGRESQL_USER);
+        password = jobConf.get(JOB_DATABASE_PASSWORD);
+        hostName = jobConf.get(JOB_DATABASE_HOSTNAME);
+        port = jobConf.get(JOB_DATABASE_PORT);
+        dbName = jobConf.get(JOB_DATABASE_DBNAME);
+        serverName = jobConf.get(JOB_DATABASE_SERVER_NAME);
+        pluginName = jobConf.get(JOB_DATABASE_PLUGIN_NAME, "pgoutput");
+        instanceId = jobConf.getInstanceId();
+        offsetFlushIntervalMs = jobConf.get(JOB_DATABASE_STORE_OFFSET_INTERVAL_MS, "100000");
+        offsetStoreFileName = jobConf.get(JOB_DATABASE_STORE_HISTORY_FILENAME,
+                tryToInitAndGetHistoryPath()) + "/offset.dat" + instanceId;
+        snapshotMode = jobConf.get(JOB_DATABASE_SNAPSHOT_MODE, "");
+        postgreSQLMessageQueue = new LinkedBlockingQueue<>(jobConf.getInt(JOB_DATABASE_QUEUE_SIZE, 1000));
+        finished = false;
+
+        offset = jobConf.get(JOB_DATABASE_OFFSETS, "");
+        specificOffsetFile = jobConf.get(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE, "");
+        specificOffsetPos = jobConf.get(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS, "-1");
+        postgreSQLSnapshot = new PostgreSQLSnapshotBase(offsetStoreFileName);
+        postgreSQLSnapshot.save(offset);
+
+        Properties props = getEngineProps();
+
+        DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(
+                        io.debezium.engine.format.Json.class)
+                .using(props)
+                .notifying((records, committer) -> {
+                    try {
+                        for (ChangeEvent<String, String> record : records) {
+                            DebeziumFormat debeziumFormat = GSON
+                                    .fromJson(record.value(), DebeziumFormat.class);
+                            postgreSQLMessageQueue.put(Pair.of(debeziumFormat.getSource().getTable(), record.value()));
+                            committer.markProcessed(record);
+                        }
+                        committer.markBatchFinished();
+                        AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, inlongGroupId, inlongStreamId,
+                                System.currentTimeMillis(), records.size());
+                        readerMetric.pluginReadCount.addAndGet(records.size());
+                    } catch (Exception e) {
+                        readerMetric.pluginReadFailCount.addAndGet(records.size());
+                        LOGGER.error("parse binlog message error", e);
+                    }
+                })
+                .using((success, message, error) -> {
+                    if (!success) {
+                        LOGGER.error("PostgreSQL job with jobConf {} has error {}", instanceId, message, error);
+                    }
+                }).build();
+
+        executor = Executors.newSingleThreadExecutor();
+        executor.execute(engine);
+
+        LOGGER.info("get initial snapshot of job {}, snapshot {}", instanceId, getSnapshot());
+    }
+
+    private String tryToInitAndGetHistoryPath() {
+        String historyPath = agentConf.get(
+                AgentConstants.AGENT_HISTORY_PATH, AgentConstants.DEFAULT_AGENT_HISTORY_PATH);
+        String parentPath = agentConf.get(
+                AgentConstants.AGENT_HOME, AgentConstants.DEFAULT_AGENT_HOME);
+        return AgentUtils.makeDirsIfNotExist(historyPath, parentPath).getAbsolutePath();
+    }
+
+    private Properties getEngineProps() {
+        Properties props = new Properties();
+
+        props.setProperty("name", "engine" + instanceId);
+        props.setProperty("connector.class", PostgresConnector.class.getCanonicalName());
+        props.setProperty("database.server.name", serverName);
+        props.setProperty("plugin.name", pluginName);
+        props.setProperty("database.hostname", hostName);
+        props.setProperty("database.port", port);
+        props.setProperty("database.user", userName);
+        props.setProperty("database.dbname", dbName);
+        props.setProperty("database.password", password);
+
+        props.setProperty("offset.flush.interval.ms", offsetFlushIntervalMs);
+        props.setProperty("database.snapshot.mode", snapshotMode);
+        props.setProperty("key.converter.schemas.enable", "false");
+        props.setProperty("value.converter.schemas.enable", "false");
+        props.setProperty("snapshot.mode", snapshotMode);
+        props.setProperty("offset.storage.file.filename", offsetStoreFileName);
+        if (PostgreSQLConstants.CUSTOM.equals(snapshotMode)) {
+            props.setProperty("offset.storage", InLongFileOffsetBackingStore.class.getCanonicalName());
+            props.setProperty(InLongFileOffsetBackingStore.OFFSET_STATE_VALUE, serializeOffset());
+        } else {
+            props.setProperty("offset.storage", FileOffsetBackingStore.class.getCanonicalName());
+        }
+        props.setProperty("tombstones.on.delete", "false");
+        props.setProperty("converters", "datetime");
+        props.setProperty("datetime.type", "org.apache.inlong.agent.plugin.utils.BinlogTimeConverter");
+        props.setProperty("datetime.format.date", "yyyy-MM-dd");
+        props.setProperty("datetime.format.time", "HH:mm:ss");
+        props.setProperty("datetime.format.datetime", "yyyy-MM-dd HH:mm:ss");
+        props.setProperty("datetime.format.timestamp", "yyyy-MM-dd HH:mm:ss");
+
+        LOGGER.info("PostgreSQL job {} start with props {}", jobProfile.getInstanceId(), props);
+        return props;
+    }
+
+    private String serializeOffset() {
+        Map<String, Object> sourceOffset = new HashMap<>();
+        Preconditions.checkNotNull(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE,
+                JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE + " cannot be null");
+        sourceOffset.put("file", specificOffsetFile);
+        Preconditions.checkNotNull(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS,
+                JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS + " cannot be null");
+        sourceOffset.put("pos", specificOffsetPos);
+        DebeziumOffset specificOffset = new DebeziumOffset();
+        specificOffset.setSourceOffset(sourceOffset);
+        Map<String, String> sourcePartition = new HashMap<>();
+        sourcePartition.put("server", instanceId);
+        specificOffset.setSourcePartition(sourcePartition);
+        byte[] serializedOffset = new byte[0];
+        try {
+            serializedOffset = DebeziumOffsetSerializer.INSTANCE.serialize(specificOffset);
+        } catch (IOException e) {
+            LOGGER.error("serialize offset message error", e);
+        }
+        return new String(serializedOffset, StandardCharsets.UTF_8);
+    }
+
+    @Override
+    public void destroy() {
+        synchronized (this) {
+            if (!destroyed) {
+                executor.shutdownNow();
+                postgreSQLSnapshot.close();
+                destroyed = true;
+            }
+        }
+    }
+
+    @Override
+    public boolean isFinished() {
+        return finished;
+    }
+
+    @Override
+    public String getReadSource() {
+        return instanceId;
+    }
+
+    @Override
+    public void setReadTimeout(long mill) {
+        return;
+    }
+
+    @Override
+    public void setWaitMillisecond(long millis) {
+        return;
+    }
+
+    @Override
+    public String getSnapshot() {
+        if (postgreSQLSnapshot != null) {
+            return postgreSQLSnapshot.getSnapshot();
+        }
+        return "";
+    }
+
+    @Override
+    public void finishRead() {
+        finished = true;
+    }
+
+    @Override
+    public boolean isSourceExist() {
+        return true;
+    }
+}
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/snapshot/PostgreSQLSnapshotBase.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/snapshot/PostgreSQLSnapshotBase.java
new file mode 100644
index 000000000..d27213389
--- /dev/null
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/snapshot/PostgreSQLSnapshotBase.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.sources.snapshot;
+
+import org.apache.inlong.agent.utils.ThreadUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.util.Base64;
+
+/**
+ * PostgreSQL Snapshot
+ */
+public class PostgreSQLSnapshotBase implements SnapshotBase {
+
+    public static final int BUFFER_SIZE = 1024;
+    public static final int START_OFFSET = 0;
+    private static final Logger LOGGER = LoggerFactory.getLogger(PostgreSQLSnapshotBase.class);
+    private final Base64.Decoder decoder = Base64.getDecoder();
+    private final Base64.Encoder encoder = Base64.getEncoder();
+    private File file;
+    private byte[] offset;
+
+    public PostgreSQLSnapshotBase(String filePath) {
+        file = new File(filePath);
+    }
+
+    @Override
+    public String getSnapshot() {
+        load();
+        return encoder.encodeToString(offset);
+    }
+
+    @Override
+    public void close() {
+
+    }
+
+    /**
+     * Load postgres offset from local file
+     */
+    private void load() {
+        try {
+            if (!file.exists()) {
+                // if parentDir not exist, create first
+                File parentDir = file.getParentFile();
+                if (parentDir == null) {
+                    LOGGER.info("no parent dir, file:{}", file.getAbsolutePath());
+                    return;
+                }
+                if (!parentDir.exists()) {
+                    boolean success = parentDir.mkdir();
+                    LOGGER.info("create dir {} result {}", parentDir, success);
+                }
+                file.createNewFile();
+            }
+            FileInputStream fis = new FileInputStream(file);
+            BufferedInputStream inputStream = new BufferedInputStream(fis);
+            ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+            int len;
+            byte[] buf = new byte[BUFFER_SIZE];
+            while ((len = inputStream.read(buf)) != -1) {
+                outputStream.write(buf, START_OFFSET, len);
+            }
+            offset = outputStream.toByteArray();
+            inputStream.close();
+            outputStream.close();
+        } catch (Throwable ex) {
+            LOGGER.error("load PostgreSQL WAL log error", ex);
+            ThreadUtils.threadThrowableHandler(Thread.currentThread(), ex);
+        }
+    }
+
+    /**
+     * Save PostgreSQL offset to local file
+     */
+    public void save(String snapshot) {
+        byte[] bytes = decoder.decode(snapshot);
+        if (bytes.length != 0) {
+            offset = bytes;
+            try (OutputStream output = Files.newOutputStream(file.toPath())) {
+                output.write(bytes);
+            } catch (Throwable e) {
+                LOGGER.error("save offset to file error", e);
+                ThreadUtils.threadThrowableHandler(Thread.currentThread(), e);
+            }
+        }
+    }
+}
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/PostgreSQLOffsetManagerTest.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/PostgreSQLOffsetManagerTest.java
new file mode 100644
index 000000000..63cb8ae93
--- /dev/null
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/PostgreSQLOffsetManagerTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.sources;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.inlong.agent.plugin.AgentBaseTestsHelper;
+import org.apache.inlong.agent.plugin.sources.snapshot.PostgreSQLSnapshotBase;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
+/**
+ * Test for PostgreSQL snapshot
+ */
+public class PostgreSQLOffsetManagerTest {
+
+    private static final String FILE_NAME = "testPostgreSQL.txt";
+
+    private static AgentBaseTestsHelper helper;
+
+    private static Path filePath;
+
+    @BeforeClass
+    public static void setup() {
+        helper = new AgentBaseTestsHelper(PostgreSQLOffsetManagerTest.class.getName()).setupAgentHome();
+        Path testDir = helper.getTestRootDir();
+        filePath = Paths.get(testDir.toString(), FILE_NAME);
+    }
+
+    @AfterClass
+    public static void teardown() {
+        helper.teardownAgentHome();
+    }
+
+    /**
+     * A saved snapshot must be returned unchanged by getSnapshot() and must exist as a file.
+     */
+    @Test
+    public void testOffset() {
+        PostgreSQLSnapshotBase snapshotManager = new PostgreSQLSnapshotBase(filePath.toString());
+        byte[] snapshotBytes = new byte[]{-65, -14, 23};
+        final Base64 base64 = new Base64();
+        String encodeSnapshot = base64.encodeAsString(snapshotBytes);
+        snapshotManager.save(encodeSnapshot);
+        // JUnit expects (expected, actual); the order matters for the failure message
+        Assert.assertEquals(encodeSnapshot, snapshotManager.getSnapshot());
+        File file = new File(filePath.toString());
+        Assert.assertTrue(file.exists());
+    }
+}
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/PostgreSQLReaderTest.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/PostgreSQLReaderTest.java
new file mode 100644
index 000000000..812f7604a
--- /dev/null
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/PostgreSQLReaderTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.sources;
+
+import com.google.gson.Gson;
+import org.apache.inlong.agent.conf.JobProfile;
+import org.apache.inlong.agent.constant.SnapshotModeConstants;
+import org.apache.inlong.agent.plugin.Message;
+import org.apache.inlong.agent.plugin.sources.reader.PostgreSQLReader;
+import org.apache.inlong.agent.pojo.DebeziumFormat;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_GROUP_ID;
+import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID;
+
+/**
+ * Test for PostgreSQL reader
+ */
+public class PostgreSQLReaderTest {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(PostgreSQLReaderTest.class);
+    private static final Gson GSON = new Gson();
+
+    /**
+     * A Debezium JSON record must deserialize into DebeziumFormat with its source fields populated.
+     */
+    @Test
+    public void testDebeziumFormat() {
+        String debeziumJson = "{\n"
+                + "    \"before\": null,\n"
+                + "    \"after\": {\n"
+                + "      \"id\": 1004,\n"
+                + "      \"first_name\": \"name1\",\n"
+                + "      \"last_name\": \"name2\"\n"
+                + "    },\n"
+                + "    \"source\": {\n"
+                + "      \"version\": \"12\",\n"
+                + "      \"name\": \"myserver\",\n"
+                + "      \"ts_sec\": 0,\n"
+                + "      \"gtid\": null,\n"
+                + "      \"file\": \"000000010000000000000001\",\n"
+                + "      \"row\": 0,\n"
+                + "      \"snapshot\": true,\n"
+                + "      \"thread\": null,\n"
+                + "      \"db\": \"postgres\",\n"
+                + "      \"table\": \"customers\"\n"
+                + "    },\n"
+                + "    \"op\": \"r\",\n"
+                + "    \"ts_ms\": 1486500577691\n"
+                + "  }";
+        DebeziumFormat debeziumFormat = GSON.fromJson(debeziumJson, DebeziumFormat.class);
+        Assert.assertEquals("customers", debeziumFormat.getSource().getTable());
+        Assert.assertEquals("true", debeziumFormat.getSource().getSnapshot());
+    }
+
+    /**
+     * Manual test that collects data from a PostgreSQL instance configured below (localhost:5432).
+     * The {@code @Test} annotation is commented out so the method does not run as part of the
+     * regular unit tests.
+     */
+    // @Test
+    public void postgresLoadTest() {
+        JobProfile jobProfile = new JobProfile();
+        jobProfile.set(PostgreSQLReader.JOB_POSTGRESQL_USER, "postgres");
+        jobProfile.set(PostgreSQLReader.JOB_DATABASE_SERVER_NAME, "postgres");
+        jobProfile.set(PostgreSQLReader.JOB_DATABASE_PLUGIN_NAME, "pgoutput");
+        jobProfile.set(PostgreSQLReader.JOB_DATABASE_PASSWORD, "123456");
+        jobProfile.set(PostgreSQLReader.JOB_DATABASE_HOSTNAME, "localhost");
+        jobProfile.set(PostgreSQLReader.JOB_DATABASE_PORT, "5432");
+        jobProfile.set(PostgreSQLReader.JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE, "000000010000000000000001");
+        jobProfile.set(PostgreSQLReader.JOB_DATABASE_SNAPSHOT_MODE, SnapshotModeConstants.INITIAL);
+        jobProfile.set(PostgreSQLReader.JOB_DATABASE_DBNAME, "postgres");
+        jobProfile.set("job.instance.id", "_1");
+        jobProfile.set(PROXY_INLONG_GROUP_ID, "groupid");
+        jobProfile.set(PROXY_INLONG_STREAM_ID, "streamid");
+        PostgreSQLReader postgreSQLReader = new PostgreSQLReader();
+        postgreSQLReader.init(jobProfile);
+        try {
+            // poll until the first message arrives
+            while (true) {
+                Message message = postgreSQLReader.read();
+                if (message != null) {
+                    LOGGER.info("read message is {}", message);
+                    break;
+                }
+            }
+        } finally {
+            // shut the engine executor down so the test does not leave its thread running
+            postgreSQLReader.destroy();
+        }
+    }
+}
diff --git a/licenses/inlong-agent/LICENSE b/licenses/inlong-agent/LICENSE
index 94420705e..dc8bd8022 100644
--- a/licenses/inlong-agent/LICENSE
+++ b/licenses/inlong-agent/LICENSE
@@ -367,6 +367,7 @@ The text of each license is the standard Apache 2.0 license.
   org.apache.curator:curator-recipes:2.12.0 - Curator Recipes (https://curator.apache.org/curator-recipes), (The Apache Software License, Version 2.0)
   io.debezium:debezium-api:1.8.0.Final - Debezium API (https://github.com/debezium/debezium/tree/v1.8.0.Final), (The Apache Software License, Version 2.0)
   io.debezium:debezium-connector-mysql:1.8.0.Final - Debezium Connector for MySQL (https://github.com/debezium/debezium/tree/v1.8.0.Final), (The Apache Software License, Version 2.0)
+  io.debezium:debezium-connector-postgres:1.8.0.Final - Debezium Connector for PostgreSQL (https://github.com/debezium/debezium/tree/v1.8.0.Final), (The Apache Software License, Version 2.0)
   io.debezium:debezium-core:1.8.0.Final - Debezium Core (https://github.com/debezium/debezium/tree/v1.8.0.Final), (The Apache Software License, Version 2.0)
   io.debezium:debezium-ddl-parser:1.8.0.Final - Debezium ANTLR DDL parsers (https://github.com/debezium/debezium/tree/v1.8.0.Final), (The Apache Software License, Version 2.0)
   io.debezium:debezium-embedded:1.8.0.Final - Debezium Embedded (https://github.com/debezium/debezium/tree/v1.8.0.Final), (The Apache Software License, Version 2.0)
diff --git a/pom.xml b/pom.xml
index 05800d85a..aa43f1121 100644
--- a/pom.xml
+++ b/pom.xml
@@ -589,6 +589,11 @@
                 <artifactId>debezium-connector-mysql</artifactId>
                 <version>${debezium.version}</version>
             </dependency>
+            <dependency>
+                <groupId>io.debezium</groupId>
+                <artifactId>debezium-connector-postgres</artifactId>
+                <version>${debezium.version}</version>
+            </dependency>
             <dependency>
                 <groupId>com.h2database</groupId>
                 <artifactId>h2</artifactId>