You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/07/01 06:19:51 UTC

[inlong] branch master updated: [INLONG-4726][SortStandalone] Support ClickHouse sink (#4728)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new bf8eb3a35 [INLONG-4726][SortStandalone] Support ClickHouse sink (#4728)
bf8eb3a35 is described below

commit bf8eb3a3514b927d1030291a7f30c4fa922ef2f2
Author: 卢春亮 <94...@qq.com>
AuthorDate: Fri Jul 1 14:19:46 2022 +0800

    [INLONG-4726][SortStandalone] Support ClickHouse sink (#4728)
---
 .../sort-standalone-source/pom.xml                 |   4 +
 .../sink/clickhouse/ClickHouseChannelWorker.java   | 158 ++++++++
 .../sink/clickhouse/ClickHouseIdConfig.java        | 218 ++++++++++
 .../standalone/sink/clickhouse/ClickHouseSink.java | 155 ++++++++
 .../sink/clickhouse/ClickHouseSinkContext.java     | 438 +++++++++++++++++++++
 .../sink/clickhouse/DefaultEventHandler.java       | 207 ++++++++++
 .../standalone/sink/clickhouse/IEventHandler.java  |  50 +++
 7 files changed, 1230 insertions(+)

diff --git a/inlong-sort-standalone/sort-standalone-source/pom.xml b/inlong-sort-standalone/sort-standalone-source/pom.xml
index 2ab041050..569fba2ad 100644
--- a/inlong-sort-standalone/sort-standalone-source/pom.xml
+++ b/inlong-sort-standalone/sort-standalone-source/pom.xml
@@ -47,5 +47,9 @@
             <artifactId>audit-sdk</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>ru.yandex.clickhouse</groupId>
+            <artifactId>clickhouse-jdbc</artifactId>
+        </dependency>
     </dependencies>
 </project>
\ No newline at end of file
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/clickhouse/ClickHouseChannelWorker.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/clickhouse/ClickHouseChannelWorker.java
new file mode 100644
index 000000000..1d85a5fd0
--- /dev/null
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/clickhouse/ClickHouseChannelWorker.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.sink.clickhouse;
+
+import org.apache.flume.lifecycle.LifecycleState;
+import org.apache.inlong.sort.standalone.channel.ProfileEvent;
+import org.apache.inlong.sort.standalone.dispatch.DispatchProfile;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.Map;
+
+/**
+ * Worker thread that polls {@link DispatchProfile} batches from the sink dispatch queue and writes
+ * each batch to ClickHouse with one JDBC batch insert.
+ */
+public class ClickHouseChannelWorker extends Thread {
+
+    public static final Logger LOG = LoggerFactory.getLogger(ClickHouseChannelWorker.class);
+
+    private final ClickHouseSinkContext context;
+    private final int workerIndex;
+    // volatile: close() is called from another thread while run() polls this flag
+    private volatile LifecycleState status;
+    private final IEventHandler handler;
+    // owned by the worker thread only; opened lazily by doRun()
+    private Connection conn;
+
+    /**
+     * Constructor
+     *
+     * @param context     sink context providing the dispatch queue, id configs and JDBC settings
+     * @param workerIndex index of this worker, only used in log output
+     */
+    public ClickHouseChannelWorker(ClickHouseSinkContext context, int workerIndex) {
+        this.context = context;
+        this.workerIndex = workerIndex;
+        this.status = LifecycleState.IDLE;
+        this.handler = this.context.createEventHandler();
+    }
+
+    /**
+     * Calls {@link #doRun()} in a loop until the status leaves START, then closes the connection.
+     */
+    @Override
+    public void run() {
+        status = LifecycleState.START;
+        LOG.info("start to ClickHouseChannelWorker:{},status:{},index:{}", context.getTaskName(), status, workerIndex);
+        try {
+            while (status == LifecycleState.START) {
+                try {
+                    this.doRun();
+                } catch (Throwable t) {
+                    LOG.error(t.getMessage(), t);
+                }
+            }
+        } finally {
+            // release the JDBC connection once the worker stops
+            this.closeConnection();
+        }
+    }
+
+    /**
+     * Processes at most one batch: polls it, validates its id config and insert SQL, then inserts
+     * all of its events in one JDBC batch. A batch without usable config is counted as failed and
+     * acked; a batch whose write fails is put back into the dispatch queue for a later retry.
+     */
+    public void doRun() {
+        DispatchProfile currentRecord = context.getDispatchQueue().poll();
+        if (currentRecord == null) {
+            // nothing to do, wait for the next interval
+            this.sleepOneInterval();
+            return;
+        }
+        try {
+            // check config
+            ClickHouseIdConfig idConfig = context.getIdConfig(currentRecord.getUid());
+            if (idConfig == null) {
+                context.addSendFailMetric("idConfig is null", currentRecord);
+                currentRecord.ack();
+                return;
+            }
+            // check sql
+            String insertSql = idConfig.getInsertSql();
+            if (insertSql == null) {
+                context.addSendFailMetric("sql is null", currentRecord);
+                currentRecord.ack();
+                return;
+            }
+            // execute sql
+            if (this.conn == null) {
+                this.reconnect();
+            }
+            try (PreparedStatement pstat = this.conn.prepareStatement(insertSql)) {
+                for (ProfileEvent event : currentRecord.getEvents()) {
+                    Map<String, String> columnValueMap = this.handler.parse(idConfig, event);
+                    this.handler.setValue(idConfig, columnValueMap, pstat);
+                    pstat.addBatch();
+                }
+                pstat.executeBatch();
+                this.conn.commit();
+            } catch (Exception e) {
+                // drop the possibly broken connection (the next batch reconnects lazily) and rethrow
+                // so the outer handler requeues the batch instead of silently losing it
+                this.closeConnection();
+                throw e;
+            }
+            // the batch is committed, release it
+            currentRecord.ack();
+        } catch (Throwable e) {
+            LOG.error(e.getMessage(), e);
+            // retry later: put the batch back and back off for one interval
+            context.getDispatchQueue().add(currentRecord);
+            this.sleepOneInterval();
+        }
+    }
+
+    /**
+     * Asks the worker to stop; the run loop exits after the current iteration.
+     */
+    public void close() {
+        this.status = LifecycleState.STOP;
+    }
+
+    /**
+     * Sleeps for the configured process interval; an interrupt is treated as a stop request.
+     */
+    private void sleepOneInterval() {
+        try {
+            Thread.sleep(context.getProcessInterval());
+        } catch (InterruptedException e1) {
+            LOG.error(e1.getMessage(), e1);
+            // keep the interrupt flag and stop, otherwise every later sleep would fail immediately
+            Thread.currentThread().interrupt();
+            this.status = LifecycleState.STOP;
+        }
+    }
+
+    /**
+     * Closes the current connection, if any, and opens a new one with auto-commit disabled.
+     * @throws SQLException if the new connection cannot be opened or configured
+     */
+    private void reconnect() throws SQLException {
+        this.closeConnection();
+        this.conn = DriverManager.getConnection(context.getJdbcUrl(), context.getJdbcUsername(),
+                context.getJdbcPassword());
+        this.conn.setAutoCommit(false);
+    }
+
+    /**
+     * Closes the connection, logging instead of propagating any failure, and forgets it.
+     */
+    private void closeConnection() {
+        if (this.conn != null) {
+            try {
+                this.conn.close();
+            } catch (Exception e) {
+                LOG.error(e.getMessage(), e);
+            }
+            this.conn = null;
+        }
+    }
+}
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/clickhouse/ClickHouseIdConfig.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/clickhouse/ClickHouseIdConfig.java
new file mode 100644
index 000000000..bafac7bc2
--- /dev/null
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/clickhouse/ClickHouseIdConfig.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.sink.clickhouse;
+
+import org.apache.commons.math3.util.Pair;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Per-stream settings of the ClickHouse sink (separator, content field names, table name, DB field
+ * names) together with the values ClickHouseSinkContext derives from them: the parsed field lists
+ * and the insert SQL.
+ */
+public class ClickHouseIdConfig {
+
+    public static final String FIELD_NAME_SEPARATOR = ",";
+
+    private String inlongGroupId;
+    private String inlongStreamId;
+    private String separator = "|";
+    private String contentFieldNames;
+    // NOTE(review): the original comment read "except for boss + tab(1)"; presumably the number of
+    // leading content fields to skip -- confirm against the event handler
+    private int contentOffset = 0;
+    private String tableName;
+    private String dbFieldNames;
+    // parse
+    private List<String> contentFieldList;
+    private List<Pair<String, Integer>> dbFieldList;
+    private String insertSql;
+
+    /**
+     * Splits a comma separated list of field names.
+     * <p>
+     * Every name is trimmed, so {@code "a, b"} yields {@code [a, b]}. A null or blank input yields
+     * an empty list rather than a list holding one empty name. Empty names between two separators
+     * are kept, so the position of the remaining names does not change.
+     *
+     * @param  fieldNames comma separated field names, may be null
+     * @return            a new mutable list of the field names, never null
+     */
+    public static List<String> parseFieldNames(String fieldNames) {
+        List<String> fieldList = new ArrayList<>();
+        if (fieldNames == null || fieldNames.trim().isEmpty()) {
+            return fieldList;
+        }
+        String[] fieldNameArray = fieldNames.split(FIELD_NAME_SEPARATOR);
+        for (int i = 0; i < fieldNameArray.length; i++) {
+            fieldNameArray[i] = fieldNameArray[i].trim();
+        }
+        fieldList.addAll(Arrays.asList(fieldNameArray));
+        return fieldList;
+    }
+
+    /**
+     * get inlongGroupId
+     * @return the inlongGroupId
+     */
+    public String getInlongGroupId() {
+        return inlongGroupId;
+    }
+
+    /**
+     * set inlongGroupId
+     * @param inlongGroupId the inlongGroupId to set
+     */
+    public void setInlongGroupId(String inlongGroupId) {
+        this.inlongGroupId = inlongGroupId;
+    }
+
+    /**
+     * get inlongStreamId
+     * @return the inlongStreamId
+     */
+    public String getInlongStreamId() {
+        return inlongStreamId;
+    }
+
+    /**
+     * set inlongStreamId
+     * @param inlongStreamId the inlongStreamId to set
+     */
+    public void setInlongStreamId(String inlongStreamId) {
+        this.inlongStreamId = inlongStreamId;
+    }
+
+    /**
+     * get separator
+     * @return the separator, "|" unless configured otherwise
+     */
+    public String getSeparator() {
+        return separator;
+    }
+
+    /**
+     * set separator
+     * @param separator the separator to set
+     */
+    public void setSeparator(String separator) {
+        this.separator = separator;
+    }
+
+    /**
+     * get contentFieldNames
+     * @return the comma separated contentFieldNames as configured
+     */
+    public String getContentFieldNames() {
+        return contentFieldNames;
+    }
+
+    /**
+     * set contentFieldNames
+     * @param contentFieldNames the contentFieldNames to set
+     */
+    public void setContentFieldNames(String contentFieldNames) {
+        this.contentFieldNames = contentFieldNames;
+    }
+
+    /**
+     * get contentOffset
+     * @return the contentOffset, 0 unless configured otherwise
+     */
+    public int getContentOffset() {
+        return contentOffset;
+    }
+
+    /**
+     * set contentOffset
+     * @param contentOffset the contentOffset to set
+     */
+    public void setContentOffset(int contentOffset) {
+        this.contentOffset = contentOffset;
+    }
+
+    /**
+     * get tableName
+     * @return the tableName
+     */
+    public String getTableName() {
+        return tableName;
+    }
+
+    /**
+     * set tableName
+     * @param tableName the tableName to set
+     */
+    public void setTableName(String tableName) {
+        this.tableName = tableName;
+    }
+
+    /**
+     * get dbFieldNames
+     * @return the comma separated dbFieldNames as configured
+     */
+    public String getDbFieldNames() {
+        return dbFieldNames;
+    }
+
+    /**
+     * set dbFieldNames
+     * @param dbFieldNames the dbFieldNames to set
+     */
+    public void setDbFieldNames(String dbFieldNames) {
+        this.dbFieldNames = dbFieldNames;
+    }
+
+    /**
+     * get contentFieldList
+     * @return the parsed content field names
+     */
+    public List<String> getContentFieldList() {
+        return contentFieldList;
+    }
+
+    /**
+     * set contentFieldList
+     * @param contentFieldList the contentFieldList to set
+     */
+    public void setContentFieldList(List<String> contentFieldList) {
+        this.contentFieldList = contentFieldList;
+    }
+
+    /**
+     * get dbFieldList
+     * @return the DB fields as pairs of column name and java.sql.Types code
+     */
+    public List<Pair<String, Integer>> getDbFieldList() {
+        return dbFieldList;
+    }
+
+    /**
+     * set dbFieldList
+     * @param dbFieldList the dbFieldList to set
+     */
+    public void setDbFieldList(List<Pair<String, Integer>> dbFieldList) {
+        this.dbFieldList = dbFieldList;
+    }
+
+    /**
+     * get insertSql
+     * @return the insertSql, null when it has not been generated
+     */
+    public String getInsertSql() {
+        return insertSql;
+    }
+
+    /**
+     * set insertSql
+     * @param insertSql the insertSql to set
+     */
+    public void setInsertSql(String insertSql) {
+        this.insertSql = insertSql;
+    }
+
+}
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/clickhouse/ClickHouseSink.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/clickhouse/ClickHouseSink.java
new file mode 100644
index 000000000..5ae235549
--- /dev/null
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/clickhouse/ClickHouseSink.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.sink.clickhouse;
+
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.Transaction;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.sink.AbstractSink;
+import org.apache.inlong.sort.standalone.channel.ProfileEvent;
+import org.apache.inlong.sort.standalone.dispatch.DispatchManager;
+import org.apache.inlong.sort.standalone.dispatch.DispatchProfile;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Flume sink that moves {@link ProfileEvent}s from the channel into a {@link DispatchManager};
+ * {@link ClickHouseChannelWorker} threads consume the dispatch queue that the manager fills.
+ */
+public class ClickHouseSink extends AbstractSink implements Configurable {
+
+    public static final Logger LOG = LoggerFactory.getLogger(ClickHouseSink.class);
+
+    private Context parentContext;
+    private ClickHouseSinkContext context;
+    private DispatchManager dispatchManager;
+    private final LinkedBlockingQueue<DispatchProfile> dispatchQueue = new LinkedBlockingQueue<>();
+    // workers
+    private final List<ClickHouseChannelWorker> workers = new ArrayList<>();
+    // schedule
+    private ScheduledExecutorService scheduledPool;
+
+    /**
+     * Starts the sink context, the worker threads and the timer that periodically asks the dispatch
+     * manager to output overtime data. A failure is logged and not propagated.
+     */
+    @Override
+    public void start() {
+        super.start();
+        try {
+            this.context = new ClickHouseSinkContext(getName(), parentContext, getChannel(), dispatchQueue);
+            this.context.start();
+            for (int i = 0; i < context.getMaxThreads(); i++) {
+                ClickHouseChannelWorker worker = new ClickHouseChannelWorker(context, i);
+                this.workers.add(worker);
+                worker.start();
+            }
+            this.dispatchManager = new DispatchManager(parentContext, dispatchQueue);
+            this.scheduledPool = Executors.newScheduledThreadPool(1);
+            // dispatch
+            long dispatchTimeout = this.dispatchManager.getDispatchTimeout();
+            this.scheduledPool.scheduleWithFixedDelay(() -> dispatchManager.setNeedOutputOvertimeData(),
+                    dispatchTimeout, dispatchTimeout, TimeUnit.MILLISECONDS);
+        } catch (Exception e) {
+            LOG.error(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Stops the workers, the sink context and the timer. Safe to call after a failed start: parts
+     * that were never created are skipped.
+     */
+    @Override
+    public void stop() {
+        // the lifecycle state is switched exactly once
+        super.stop();
+        try {
+            for (ClickHouseChannelWorker worker : this.workers) {
+                worker.close();
+            }
+            this.workers.clear();
+            if (this.context != null) {
+                this.context.close();
+            }
+        } catch (Exception e) {
+            LOG.error(e.getMessage(), e);
+        } finally {
+            // always release the timer thread, even when closing the context failed
+            if (this.scheduledPool != null) {
+                this.scheduledPool.shutdown();
+            }
+        }
+    }
+
+    /**
+     * Stores the Flume context; it is used when the sink starts.
+     *
+     * @param context Flume configuration of this sink
+     */
+    @Override
+    public void configure(Context context) {
+        LOG.info("start to configure:{}, context:{}.", this.getName(), context.toString());
+        this.parentContext = context;
+    }
+
+    /**
+     * Takes one event from the channel inside a channel transaction and hands it to the dispatch
+     * manager. An event that is not a {@link ProfileEvent} is counted as failed and discarded.
+     *
+     * @return                        READY after an event was taken, BACKOFF when the channel was
+     *                                empty or the processing failed
+     * @throws EventDeliveryException declared by the Flume contract, not thrown by this method
+     */
+    @Override
+    public Status process() throws EventDeliveryException {
+        this.dispatchManager.outputOvertimeData();
+        Channel channel = getChannel();
+        Transaction tx = channel.getTransaction();
+        tx.begin();
+        try {
+            Event event = channel.take();
+            if (event == null) {
+                tx.commit();
+                return Status.BACKOFF;
+            }
+            if (!(event instanceof ProfileEvent)) {
+                tx.commit();
+                this.context.addSendFailMetric("event is not ProfileEvent");
+                return Status.READY;
+            }
+            //
+            ProfileEvent profileEvent = (ProfileEvent) event;
+            this.dispatchManager.addEvent(profileEvent);
+            tx.commit();
+            return Status.READY;
+        } catch (Throwable t) {
+            LOG.error("Process event failed!" + t.getMessage(), t);
+            try {
+                tx.rollback();
+            } catch (Throwable e) {
+                LOG.error("Channel take transaction rollback exception:" + e.getMessage(), e);
+            }
+            return Status.BACKOFF;
+        } finally {
+            tx.close();
+        }
+    }
+
+}
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/clickhouse/ClickHouseSinkContext.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/clickhouse/ClickHouseSinkContext.java
new file mode 100644
index 000000000..1f33ff347
--- /dev/null
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/clickhouse/ClickHouseSinkContext.java
@@ -0,0 +1,438 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.sink.clickhouse;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.apache.commons.lang.ClassUtils;
+import org.apache.commons.math3.util.Pair;
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig;
+import org.apache.inlong.common.util.NetworkUtils;
+import org.apache.inlong.sort.standalone.channel.ProfileEvent;
+import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
+import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
+import org.apache.inlong.sort.standalone.config.pojo.InlongId;
+import org.apache.inlong.sort.standalone.dispatch.DispatchProfile;
+import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
+import org.apache.inlong.sort.standalone.metrics.audit.AuditUtils;
+import org.apache.inlong.sort.standalone.sink.SinkContext;
+import org.apache.inlong.sort.standalone.utils.Constants;
+import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
+import org.slf4j.Logger;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * 
+ * ClickHouseSinkContext
+ */
+public class ClickHouseSinkContext extends SinkContext {
+
+    public static final Logger LOG = InlongLoggerFactory.getLogger(ClickHouseSinkContext.class);
+    public static final String KEY_NODE_ID = "nodeId";
+    public static final String KEY_JDBC_DRIVER = "jdbcDriver";
+    public static final String DEFAULT_JDBC_DRIVER = "ru.yandex.clickhouse.ClickHouseDriver";
+    public static final String KEY_JDBC_URL = "jdbcUrl";
+    public static final String KEY_JDBC_USERNAME = "jdbcUsername";
+    public static final String KEY_JDBC_PASSWORD = "jdbcPassword";
+    public static final String KEY_EVENT_HANDLER = "clickHouseEventHandler";
+
+    private Context parentContext;
+    private String nodeId;
+    private Map<String, ClickHouseIdConfig> idConfigMap = new ConcurrentHashMap<>();
+    private final LinkedBlockingQueue<DispatchProfile> dispatchQueue;
+    // jdbc config
+    private String jdbcDriver;
+    private String jdbcUrl;
+    private String jdbcUsername;
+    private String jdbcPassword;
+
+    /**
+     * Creates the context of one ClickHouse sink.
+     *
+     * @param sinkName      name of the sink, passed to the parent context
+     * @param context       Flume context of the sink, also kept as the base of the JDBC settings
+     * @param channel       channel of the sink, passed to the parent context
+     * @param dispatchQueue queue of dispatched batches exposed to the workers
+     */
+    public ClickHouseSinkContext(String sinkName, Context context, Channel channel,
+            LinkedBlockingQueue<DispatchProfile> dispatchQueue) {
+        super(sinkName, context, channel);
+        this.dispatchQueue = dispatchQueue;
+        this.parentContext = context;
+        // the node id falls back to the local IP when it is not configured
+        String localIp = NetworkUtils.getLocalIp();
+        this.nodeId = CommonPropertiesHolder.getString(KEY_NODE_ID, localIp);
+    }
+
+    /**
+     * Reloads the task configuration: rebuilds the id config map, refreshes the JDBC settings and
+     * regenerates the insert SQL of every id. Does nothing when the task config is missing or
+     * unchanged. Any failure is logged and the previous id config map stays in use; the JDBC
+     * settings may already have been replaced at that point.
+     */
+    public void reload() {
+        try {
+            // ObjectMapper is costly to create, so one instance serves the whole reload
+            ObjectMapper objectMapper = new ObjectMapper();
+            SortTaskConfig newSortTaskConfig = SortClusterConfigHolder.getTaskConfig(taskName);
+            // NOTE(review): this dump and the currentContext logged at the end appear to include
+            // the sink params, i.e. jdbcPassword in clear text -- confirm and mask if so
+            LOG.info("start to get SortTaskConfig:taskName:{}:config:{}", taskName,
+                    objectMapper.writeValueAsString(newSortTaskConfig));
+            if (newSortTaskConfig == null) {
+                // no config for this task: keep the current one instead of failing with an NPE
+                LOG.error("Can not find SortTaskConfig,taskName:{}", taskName);
+                return;
+            }
+            if (this.sortTaskConfig != null && this.sortTaskConfig.equals(newSortTaskConfig)) {
+                return;
+            }
+            // parse the config of id and topic
+            Map<String, ClickHouseIdConfig> newIdConfigMap = new ConcurrentHashMap<>();
+            List<Map<String, String>> idList = newSortTaskConfig.getIdParams();
+            for (Map<String, String> idParam : idList) {
+                String inlongGroupId = idParam.get(Constants.INLONG_GROUP_ID);
+                String inlongStreamId = idParam.get(Constants.INLONG_STREAM_ID);
+                String uid = InlongId.generateUid(inlongGroupId, inlongStreamId);
+                // map the id parameters onto the config bean through a JSON round trip
+                String jsonIdConfig = objectMapper.writeValueAsString(idParam);
+                ClickHouseIdConfig idConfig = objectMapper.readValue(jsonIdConfig, ClickHouseIdConfig.class);
+                newIdConfigMap.put(uid, idConfig);
+            }
+            // jdbc config: the sink params of the task override the parameters of the Flume context
+            Context currentContext = new Context(this.parentContext.getParameters());
+            currentContext.putAll(newSortTaskConfig.getSinkParams());
+            this.jdbcDriver = currentContext.getString(KEY_JDBC_DRIVER, DEFAULT_JDBC_DRIVER);
+            this.jdbcUrl = currentContext.getString(KEY_JDBC_URL);
+            this.jdbcUsername = currentContext.getString(KEY_JDBC_USERNAME);
+            this.jdbcPassword = currentContext.getString(KEY_JDBC_PASSWORD);
+            // fails early when the driver class is missing
+            Class.forName(this.jdbcDriver);
+            // load DB field
+            this.initIdConfig(newIdConfigMap);
+            // change current config
+            this.sortTaskConfig = newSortTaskConfig;
+            this.idConfigMap = newIdConfigMap;
+            LOG.info("end to get SortTaskConfig,taskName:{},newIdConfigMap:{},currentContext:{}", taskName,
+                    objectMapper.writeValueAsString(newIdConfigMap), currentContext);
+        } catch (Throwable e) {
+            LOG.error(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Completes every id config: parses the content field names, looks up the JDBC type of each
+     * configured DB field from the table metadata (VARCHAR when it cannot be found) and builds the
+     * insert SQL. An id without any DB field gets no insert SQL.
+     * @param newIdConfigMap id configs to complete, keyed by uid
+     * @throws SQLException if the connection or the statement cannot be created
+     */
+    private void initIdConfig(Map<String, ClickHouseIdConfig> newIdConfigMap) throws SQLException {
+        try (Connection conn = DriverManager.getConnection(jdbcUrl, jdbcUsername, jdbcPassword);
+                Statement stat = conn.createStatement()) {
+            for (Entry<String, ClickHouseIdConfig> entry : newIdConfigMap.entrySet()) {
+                // parse field list
+                ClickHouseIdConfig idConfig = entry.getValue();
+                idConfig.setContentFieldList(ClickHouseIdConfig.parseFieldNames(idConfig.getContentFieldNames()));
+                // load db field type; "where 1=0" asks for the column metadata only, the original
+                // "select *" without a filter queried every row of the table
+                Map<String, Integer> fullTypeMap = new HashMap<>();
+                String metaSql = "select * from " + idConfig.getTableName() + " where 1=0";
+                try (ResultSet rs = stat.executeQuery(metaSql)) {
+                    ResultSetMetaData meta = rs.getMetaData();
+                    int columnCount = meta.getColumnCount();
+                    for (int i = 1; i <= columnCount; i++) {
+                        fullTypeMap.put(meta.getColumnName(i), meta.getColumnType(i));
+                    }
+                } catch (Exception e) {
+                    // best effort: without metadata every field is treated as VARCHAR below
+                    LOG.error("Can not get metadata,group:{},stream:{},error:{}", idConfig.getInlongGroupId(),
+                            idConfig.getInlongStreamId(), e.getMessage(), e);
+                }
+                // parse db field type
+                List<String> dbFieldNameList = ClickHouseIdConfig.parseFieldNames(idConfig.getDbFieldNames());
+                List<Pair<String, Integer>> dbFieldList = new ArrayList<>(dbFieldNameList.size());
+                for (String fieldName : dbFieldNameList) {
+                    dbFieldList.add(new Pair<>(fieldName, fullTypeMap.getOrDefault(fieldName, Types.VARCHAR)));
+                }
+                idConfig.setDbFieldList(dbFieldList);
+                if (dbFieldList.isEmpty()) {
+                    // no column to insert: the generated statement would be malformed, so leave the
+                    // insert SQL unset and let the worker report the batch as "sql is null"
+                    LOG.error("No DB field is configured,group:{},stream:{}", idConfig.getInlongGroupId(),
+                            idConfig.getInlongStreamId());
+                    idConfig.setInsertSql(null);
+                    continue;
+                }
+                // load db sql: insert into <table> (c1,c2,...) values (?,?,...)
+                StringBuilder columns = new StringBuilder();
+                StringBuilder placeholders = new StringBuilder();
+                for (Pair<String, Integer> field : dbFieldList) {
+                    // placeholders is never empty after the first field, unlike a column name
+                    if (placeholders.length() > 0) {
+                        columns.append(',');
+                        placeholders.append(',');
+                    }
+                    columns.append(field.getKey());
+                    placeholders.append('?');
+                }
+                // NOTE(review): table and column names are concatenated into the SQL text, so they
+                // must come from trusted configuration
+                idConfig.setInsertSql("insert into " + idConfig.getTableName() + " (" + columns
+                        + ") values (" + placeholders + ")");
+            }
+        }
+    }
+
+    /**
+     * Adds the count and size of a dispatched batch to the send counters of the metric item that
+     * matches the batch's group, stream and dispatch time (truncated to the audit format interval).
+     *
+     * @param currentRecord batch whose count and size are recorded
+     */
+    public void addSendMetric(DispatchProfile currentRecord) {
+        // msgTime: dispatch time truncated to a multiple of the audit format interval
+        long dispatchTime = currentRecord.getDispatchTime();
+        long alignedTime = dispatchTime - dispatchTime % CommonPropertiesHolder.getAuditFormatInterval();
+        // dimensions that identify the metric item
+        Map<String, String> dims = new HashMap<>();
+        dims.put(SortMetricItem.KEY_CLUSTER_ID, this.getClusterId());
+        dims.put(SortMetricItem.KEY_TASK_NAME, this.getTaskName());
+        dims.put(SortMetricItem.KEY_SINK_ID, this.getSinkName());
+        dims.put(SortMetricItem.KEY_INLONG_GROUP_ID, currentRecord.getInlongGroupId());
+        dims.put(SortMetricItem.KEY_INLONG_STREAM_ID, currentRecord.getInlongStreamId());
+        dims.put(SortMetricItem.KEY_MESSAGE_TIME, String.valueOf(alignedTime));
+        // source and sink data ids are not known here
+        dims.put(SortMetricItem.KEY_SOURCE_ID, "-");
+        dims.put(SortMetricItem.KEY_SOURCE_DATA_ID, "-");
+        dims.put(SortMetricItem.KEY_SINK_DATA_ID, "-");
+        // find metric
+        SortMetricItem item = this.getMetricItemSet().findMetricItem(dims);
+        item.sendCount.addAndGet(currentRecord.getCount());
+        item.sendSize.addAndGet(currentRecord.getSize());
+    }
+
+    /**
+     * addSendFailMetric for a whole batch: adds the batch's count and size to the readFail counters
+     * of the metric item identified by the batch's group and stream, the error message and the
+     * current time (truncated to the audit format interval).
+     * NOTE(review): the method is named "send fail" but updates the readFail counters -- confirm
+     * that this is intended.
+     * @param errorMsg      reason of the failure, stored as the sink data id dimension
+     * @param currentRecord batch that failed
+     */
+    public void addSendFailMetric(String errorMsg, DispatchProfile currentRecord) {
+        // dimensions that identify the metric item
+        Map<String, String> dims = new HashMap<>();
+        dims.put(SortMetricItem.KEY_INLONG_GROUP_ID, currentRecord.getInlongGroupId());
+        dims.put(SortMetricItem.KEY_INLONG_STREAM_ID, currentRecord.getInlongStreamId());
+        dims.put(SortMetricItem.KEY_SINK_DATA_ID, errorMsg);
+        dims.put(SortMetricItem.KEY_SINK_ID, this.getSinkName());
+        dims.put(SortMetricItem.KEY_TASK_NAME, this.getTaskName());
+        dims.put(SortMetricItem.KEY_CLUSTER_ID, this.getClusterId());
+        dims.put(SortMetricItem.KEY_SOURCE_ID, "-");
+        dims.put(SortMetricItem.KEY_SOURCE_DATA_ID, "-");
+        // msgTime: current time truncated to a multiple of the audit format interval
+        long now = System.currentTimeMillis();
+        long alignedTime = now - now % CommonPropertiesHolder.getAuditFormatInterval();
+        dims.put(SortMetricItem.KEY_MESSAGE_TIME, String.valueOf(alignedTime));
+        // find metric
+        SortMetricItem item = this.getMetricItemSet().findMetricItem(dims);
+        item.readFailCount.addAndGet(currentRecord.getCount());
+        item.readFailSize.addAndGet(currentRecord.getSize());
+    }
+
+    /**
+     * addSendFailMetric for a single event: increments the readFail count by one and adds the
+     * event's body length to the readFail size of the metric item identified by the event's group
+     * and stream, the error message and the current time (truncated to the audit format interval).
+     * NOTE(review): the method is named "send fail" but updates the readFail counters -- confirm
+     * that this is intended.
+     * @param errorMsg reason of the failure, stored as the sink data id dimension
+     * @param event    event that failed
+     */
+    public void addSendFailMetric(String errorMsg, ProfileEvent event) {
+        // dimensions that identify the metric item
+        Map<String, String> dims = new HashMap<>();
+        dims.put(SortMetricItem.KEY_INLONG_GROUP_ID, event.getInlongGroupId());
+        dims.put(SortMetricItem.KEY_INLONG_STREAM_ID, event.getInlongStreamId());
+        dims.put(SortMetricItem.KEY_SINK_DATA_ID, errorMsg);
+        dims.put(SortMetricItem.KEY_SINK_ID, this.getSinkName());
+        dims.put(SortMetricItem.KEY_TASK_NAME, this.getTaskName());
+        dims.put(SortMetricItem.KEY_CLUSTER_ID, this.getClusterId());
+        dims.put(SortMetricItem.KEY_SOURCE_ID, "-");
+        dims.put(SortMetricItem.KEY_SOURCE_DATA_ID, "-");
+        // msgTime: current time truncated to a multiple of the audit format interval
+        long now = System.currentTimeMillis();
+        long alignedTime = now - now % CommonPropertiesHolder.getAuditFormatInterval();
+        dims.put(SortMetricItem.KEY_MESSAGE_TIME, String.valueOf(alignedTime));
+        // find metric
+        SortMetricItem item = this.getMetricItemSet().findMetricItem(dims);
+        item.readFailCount.incrementAndGet();
+        item.readFailSize.addAndGet(event.getBody().length);
+    }
+
+    /**
+     * Records a failure metric that is not tied to any event or batch.
+     * <p>
+     * Group id and stream id dimensions are set to "-"; the time dimension is the current wall-clock
+     * time rounded down to the audit format interval. Only the failure count is incremented, no size.
+     * <p>
+     * NOTE(review): although the method is named "send fail", the counter updated is
+     * readFailCount - confirm this is intended.
+     *
+     * @param errorMsg text stored in the KEY_SINK_DATA_ID dimension
+     */
+    public void addSendFailMetric(String errorMsg) {
+        final Map<String, String> dims = new HashMap<>();
+        dims.put(SortMetricItem.KEY_CLUSTER_ID, this.getClusterId());
+        dims.put(SortMetricItem.KEY_TASK_NAME, this.getTaskName());
+        dims.put(SortMetricItem.KEY_SOURCE_ID, "-");
+        dims.put(SortMetricItem.KEY_SOURCE_DATA_ID, "-");
+        dims.put(SortMetricItem.KEY_SINK_ID, this.getSinkName());
+        dims.put(SortMetricItem.KEY_SINK_DATA_ID, errorMsg);
+        dims.put(SortMetricItem.KEY_INLONG_GROUP_ID, "-");
+        dims.put(SortMetricItem.KEY_INLONG_STREAM_ID, "-");
+        // failures are bucketed by "now", rounded down to the start of the audit interval
+        final long now = System.currentTimeMillis();
+        final long bucketStart = now - now % CommonPropertiesHolder.getAuditFormatInterval();
+        dims.put(SortMetricItem.KEY_MESSAGE_TIME, String.valueOf(bucketStart));
+        this.getMetricItemSet().findMetricItem(dims).readFailCount.incrementAndGet();
+    }
+
+    /**
+     * Records success metrics for every event of a dispatched batch.
+     * <p>
+     * Each event is accounted separately because the time dimension is derived from the event's own
+     * raw log time (rounded down to the audit format interval). For each event the success count and
+     * size are increased, three durations are accumulated and an AUDIT_ID_SEND_SUCCESS audit entry
+     * is added.
+     *
+     * @param currentRecord batch whose events were sent successfully
+     * @param sendTime      timestamp subtracted from the current time to obtain the sink duration
+     */
+    public void addSendSuccessMetric(DispatchProfile currentRecord, long sendTime) {
+        final Map<String, String> dims = new HashMap<>();
+        dims.put(SortMetricItem.KEY_CLUSTER_ID, this.getClusterId());
+        dims.put(SortMetricItem.KEY_TASK_NAME, this.getTaskName());
+        dims.put(SortMetricItem.KEY_SOURCE_ID, "-");
+        dims.put(SortMetricItem.KEY_SOURCE_DATA_ID, "-");
+        dims.put(SortMetricItem.KEY_SINK_ID, this.getSinkName());
+        dims.put(SortMetricItem.KEY_SINK_DATA_ID, "-");
+        dims.put(SortMetricItem.KEY_INLONG_GROUP_ID, currentRecord.getInlongGroupId());
+        dims.put(SortMetricItem.KEY_INLONG_STREAM_ID, currentRecord.getInlongStreamId());
+        final long now = System.currentTimeMillis();
+        // the sink duration is the same for every event of the batch
+        final long sinkCost = now - sendTime;
+        for (ProfileEvent profileEvent : currentRecord.getEvents()) {
+            final long rawLogTime = profileEvent.getRawLogTime();
+            final long bucketStart = rawLogTime - rawLogTime % CommonPropertiesHolder.getAuditFormatInterval();
+            // only the time dimension changes between events; the map is reused
+            dims.put(SortMetricItem.KEY_MESSAGE_TIME, String.valueOf(bucketStart));
+            final SortMetricItem item = this.getMetricItemSet().findMetricItem(dims);
+            item.sendSuccessCount.incrementAndGet();
+            item.sendSuccessSize.addAndGet(profileEvent.getBody().length);
+            final long nodeCost = now - profileEvent.getFetchTime();
+            final long wholeCost = now - rawLogTime;
+            item.sinkDuration.addAndGet(sinkCost);
+            item.nodeDuration.addAndGet(nodeCost);
+            item.wholeDuration.addAndGet(wholeCost);
+            AuditUtils.add(AuditUtils.AUDIT_ID_SEND_SUCCESS, profileEvent);
+        }
+    }
+
+    /**
+     * Returns the nodeId stored in this object.
+     *
+     * @return the current value of the nodeId field
+     */
+    public String getNodeId() {
+        return nodeId;
+    }
+
+    /**
+     * Replaces the nodeId stored in this object.
+     *
+     * @param nodeId the new value; stored as-is, without validation
+     */
+    public void setNodeId(String nodeId) {
+        this.nodeId = nodeId;
+    }
+
+    /**
+     * Returns the jdbcDriver stored in this object.
+     *
+     * @return the current value of the jdbcDriver field
+     */
+    public String getJdbcDriver() {
+        return jdbcDriver;
+    }
+
+    /**
+     * Replaces the jdbcDriver stored in this object.
+     *
+     * @param jdbcDriver the new value; stored as-is, without validation
+     */
+    public void setJdbcDriver(String jdbcDriver) {
+        this.jdbcDriver = jdbcDriver;
+    }
+
+    /**
+     * Returns the jdbcUrl stored in this object.
+     *
+     * @return the current value of the jdbcUrl field
+     */
+    public String getJdbcUrl() {
+        return jdbcUrl;
+    }
+
+    /**
+     * Replaces the jdbcUrl stored in this object.
+     *
+     * @param jdbcUrl the new value; stored as-is, without validation
+     */
+    public void setJdbcUrl(String jdbcUrl) {
+        this.jdbcUrl = jdbcUrl;
+    }
+
+    /**
+     * Returns the jdbcUsername stored in this object.
+     *
+     * @return the current value of the jdbcUsername field
+     */
+    public String getJdbcUsername() {
+        return jdbcUsername;
+    }
+
+    /**
+     * Replaces the jdbcUsername stored in this object.
+     *
+     * @param jdbcUsername the new value; stored as-is, without validation
+     */
+    public void setJdbcUsername(String jdbcUsername) {
+        this.jdbcUsername = jdbcUsername;
+    }
+
+    /**
+     * Returns the jdbcPassword stored in this object.
+     *
+     * @return the current value of the jdbcPassword field
+     */
+    public String getJdbcPassword() {
+        return jdbcPassword;
+    }
+
+    /**
+     * Replaces the jdbcPassword stored in this object.
+     *
+     * @param jdbcPassword the new value; stored as-is, without validation
+     */
+    public void setJdbcPassword(String jdbcPassword) {
+        this.jdbcPassword = jdbcPassword;
+    }
+
+    /**
+     * Returns the queue of dispatch profiles held by this object.
+     * <p>
+     * The field itself is returned, not a copy, so callers operate on the same queue instance.
+     *
+     * @return the dispatchQueue field
+     */
+    public LinkedBlockingQueue<DispatchProfile> getDispatchQueue() {
+        return dispatchQueue;
+    }
+
+    /**
+     * Looks up the ClickHouse id configuration stored in idConfigMap under the given uid.
+     *
+     * @param  uid key to look up in idConfigMap
+     * @return whatever idConfigMap.get(uid) yields; presumably null for an unknown uid
+     *         (idConfigMap is declared outside this section - confirm its type)
+     */
+    public ClickHouseIdConfig getIdConfig(String uid) {
+        return this.idConfigMap.get(uid);
+    }
+
+    /**
+     * Creates a new event handler from the class name read via KEY_EVENT_HANDLER.
+     * <p>
+     * The class is loaded by name and instantiated through its no-argument constructor. Every failure
+     * (class not found, no usable constructor, constructor throwing, class not implementing
+     * {@link IEventHandler}) is logged and reported to the caller as {@code null}; nothing is thrown.
+     *
+     * @return a new IEventHandler instance, or {@code null} if the handler could not be created
+     */
+    public IEventHandler createEventHandler() {
+        // DefaultEventHandler's class name is passed as the second argument of getString,
+        // presumably the fallback used when the key is not configured
+        String eventHandlerClass = CommonPropertiesHolder.getString(KEY_EVENT_HANDLER,
+                DefaultEventHandler.class.getName());
+        try {
+            Class<?> handlerClass = ClassUtils.getClass(eventHandlerClass);
+            Object handlerObject = handlerClass.getDeclaredConstructor().newInstance();
+            if (handlerObject instanceof IEventHandler) {
+                return (IEventHandler) handlerObject;
+            }
+            // a class of the wrong type used to yield null without any trace in the log
+            LOG.error("Fail to init IEventHandler,handlerClass:{},error:class does not implement {}",
+                    eventHandlerClass, IEventHandler.class.getName());
+        } catch (Throwable t) {
+            // deliberately broad: loading and instantiating a user-supplied class may also raise Errors
+            LOG.error("Fail to init IEventHandler,handlerClass:{},error:{}",
+                    eventHandlerClass, t.getMessage(), t);
+        }
+        return null;
+    }
+}
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/clickhouse/DefaultEventHandler.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/clickhouse/DefaultEventHandler.java
new file mode 100644
index 000000000..aa8a08b76
--- /dev/null
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/clickhouse/DefaultEventHandler.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.sink.clickhouse;
+
+import org.apache.commons.math3.util.Pair;
+import org.apache.inlong.sdk.commons.protocol.EventConstants;
+import org.apache.inlong.sort.standalone.channel.ProfileEvent;
+import org.apache.inlong.sort.standalone.utils.UnescapeHelper;
+import org.apache.pulsar.shade.org.apache.commons.lang3.math.NumberUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.Charset;
+import java.sql.Date;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.sql.Types;
+import java.text.SimpleDateFormat;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Default {@link IEventHandler} implementation.
+ * <p>
+ * {@link #parse} splits a delimiter-separated event body into named column values and adds the
+ * "ftime" and "extinfo" columns; {@link #setValue} binds those values to a prepared statement,
+ * converting each one according to the JDBC type configured for its column. Values that cannot be
+ * converted fall back to a zero / epoch default instead of failing the row.
+ */
+public class DefaultEventHandler implements IEventHandler {
+
+    public static final Logger LOG = LoggerFactory.getLogger(DefaultEventHandler.class);
+
+    public static final String KEY_EXTINFO = "extinfo";
+
+    // SimpleDateFormat is not thread-safe.
+    // NOTE(review): confirm that a handler instance is never shared between threads.
+    private final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+
+    /**
+     * Converts an event into a map of column name to column value.
+     * <p>
+     * The body, starting at the configured content offset, is decoded with the platform default
+     * charset and split on the first character of the configured separator. The resulting values are
+     * paired positionally with the configured content field names; surplus names or values are ignored.
+     * Two extra entries are always added: "ftime" (the event's raw log time formatted as
+     * yyyy-MM-dd HH:mm:ss) and "extinfo" (see {@link #getExtInfo(ProfileEvent)}).
+     *
+     * @param idConfig configuration supplying separator, content offset and content field names
+     * @param event    event whose body and headers are parsed
+     * @return a new mutable map of column name to value
+     */
+    @Override
+    public Map<String, String> parse(ClickHouseIdConfig idConfig, ProfileEvent event) {
+        final Map<String, String> resultMap = new HashMap<>();
+        // parse fields
+        String delimeter = idConfig.getSeparator();
+        char cDelimeter = delimeter.charAt(0);
+        String strContext = decodeContent(event.getBody(), idConfig.getContentOffset());
+        // unescape
+        List<String> columnValues = UnescapeHelper.toFiledList(strContext, cDelimeter);
+        // column size
+        List<String> contentFieldList = idConfig.getContentFieldList();
+        int matchSize = Math.min(contentFieldList.size(), columnValues.size());
+        for (int i = 0; i < matchSize; i++) {
+            resultMap.put(contentFieldList.get(i), columnValues.get(i));
+        }
+
+        // ftime
+        String ftime = dateFormat.format(new Date(event.getRawLogTime()));
+        resultMap.put("ftime", ftime);
+        // extinfo
+        String extinfo = getExtInfo(event);
+        resultMap.put(KEY_EXTINFO, extinfo);
+        return resultMap;
+    }
+
+    /**
+     * Decodes the part of the body that follows the first contentOffset bytes.
+     * <p>
+     * A non-positive offset decodes the whole body. An offset at or beyond the end of the body yields
+     * an empty string; the previous inline code threw StringIndexOutOfBoundsException when the body
+     * was shorter than the offset.
+     *
+     * @param bodyBytes     raw event body
+     * @param contentOffset number of leading bytes to skip
+     * @return the decoded content, never null
+     */
+    private static String decodeContent(byte[] bodyBytes, int contentOffset) {
+        if (contentOffset <= 0) {
+            return new String(bodyBytes, Charset.defaultCharset());
+        }
+        if (contentOffset >= bodyBytes.length) {
+            return "";
+        }
+        return new String(bodyBytes, contentOffset, bodyBytes.length - contentOffset, Charset.defaultCharset());
+    }
+
+    /**
+     * Builds the value of the "extinfo" column from the event headers.
+     * <p>
+     * If the event has an "extinfo" header, the result is "extinfo=" followed by that header value;
+     * otherwise it is "extinfo=" followed by the value of the source-ip header (which renders as
+     * "extinfo=null" when that header is absent too).
+     *
+     * @param  event event whose headers are read
+     * @return the extinfo string, never null
+     */
+    public static String getExtInfo(ProfileEvent event) {
+        String extinfoValue = event.getHeaders().get(KEY_EXTINFO);
+        if (extinfoValue != null) {
+            return KEY_EXTINFO + "=" + extinfoValue;
+        }
+        extinfoValue = KEY_EXTINFO + "=" + event.getHeaders().get(EventConstants.HEADER_KEY_SOURCE_IP);
+        return extinfoValue;
+    }
+
+    /**
+     * Binds the parsed column values to the prepared statement.
+     * <p>
+     * Parameter i (1-based) receives the value of the i-th configured database field. A column that
+     * is missing from the map is treated as an empty string. The value is converted according to the
+     * field's java.sql.Types code; numeric values that cannot be parsed become 0 and date/time values
+     * that cannot be parsed become the epoch. Unknown type codes are bound as strings.
+     *
+     * @param idConfig       configuration supplying the ordered (column name, JDBC type) list
+     * @param columnValueMap column name to value, as produced by {@link #parse}
+     * @param pstat          statement whose parameters are set
+     * @throws SQLException if the statement rejects a parameter
+     */
+    @Override
+    public void setValue(ClickHouseIdConfig idConfig, Map<String, String> columnValueMap, PreparedStatement pstat)
+            throws SQLException {
+        List<Pair<String, Integer>> dbFieldList = idConfig.getDbFieldList();
+        for (int i = 1; i <= dbFieldList.size(); i++) {
+            Pair<String, Integer> pair = dbFieldList.get(i - 1);
+            String fieldValue = columnValueMap.getOrDefault(pair.getKey(), "");
+            int fieldType = pair.getValue();
+            switch (fieldType) {
+                // Int8 - [-128 : 127]
+                case Types.TINYINT :
+                    // TINYINT = -6;
+                    pstat.setByte(i, NumberUtils.toByte(fieldValue, (byte) 0));
+                    break;
+                // Int16 - [-32768 : 32767]
+                case Types.SMALLINT :
+                    // SMALLINT= 5;
+                    pstat.setShort(i, NumberUtils.toShort(fieldValue, (short) 0));
+                    break;
+                // Int32 - [-2147483648 : 2147483647]
+                case Types.INTEGER :
+                    // INTEGER = 4;
+                    pstat.setInt(i, NumberUtils.toInt(fieldValue, 0));
+                    break;
+                // Int64 - [-9223372036854775808 : 9223372036854775807]
+                // UInt8 - [0 : 255]
+                // UInt16 - [0 : 65535]
+                // UInt32 - [0 : 4294967295]
+                // UInt64 - [0 : 18446744073709551615]
+                case Types.BIGINT :
+                    // BIGINT = -5;
+                    pstat.setLong(i, NumberUtils.toLong(fieldValue, 0));
+                    break;
+                // Float32 - float
+                case Types.FLOAT :
+                    // FLOAT = 6;
+                    pstat.setFloat(i, NumberUtils.toFloat(fieldValue, 0));
+                    break;
+                // Float64 – double
+                case Types.DOUBLE :
+                    // DOUBLE = 8;
+                    pstat.setDouble(i, NumberUtils.toDouble(fieldValue, 0));
+                    break;
+                // Decimal32(s)
+                // Decimal64(s)
+                // Decimal128(s)
+                case Types.NUMERIC :
+                    // NUMERIC = 2;
+                    pstat.setBigDecimal(i, this.parseDecimal(fieldValue));
+                    break;
+                // String
+                // FixedString(N)
+                // Enum8
+                // Enum16
+                case Types.VARCHAR :
+                    // VARCHAR = 12;
+                case Types.LONGVARCHAR :
+                    // LONGVARCHAR = -1;
+                    pstat.setString(i, fieldValue);
+                    break;
+                // Date
+                case Types.DATE :
+                    // DATE= 91;
+                    pstat.setDate(i, this.parseDate(fieldValue));
+                    break;
+                // Datetime
+                // Datetime64
+                case Types.TIMESTAMP :
+                    // TIMESTAMP = 93;
+                    pstat.setTimestamp(i, new Timestamp(this.parseDate(fieldValue).getTime()));
+                    break;
+
+                case Types.TIME :
+                    // TIME= 92;
+                    pstat.setTime(i, new Time(this.parseDate(fieldValue).getTime()));
+                    break;
+                default :
+                    pstat.setString(i, fieldValue);
+                    break;
+            }
+        }
+    }
+
+    /**
+     * Parses a decimal column value, falling back to zero when it is blank or malformed.
+     * <p>
+     * NumberUtils.toScaledBigDecimal throws NumberFormatException for such input; without this guard
+     * a missing decimal column (bound as "") aborted the whole row, unlike the other numeric types.
+     *
+     * @param fieldValue text to parse
+     * @return the value as scaled by NumberUtils.toScaledBigDecimal, or zero if it cannot be parsed
+     */
+    private java.math.BigDecimal parseDecimal(String fieldValue) {
+        try {
+            return NumberUtils.toScaledBigDecimal(fieldValue);
+        } catch (NumberFormatException ignored) {
+            // same best-effort default as the other numeric branches
+            return java.math.BigDecimal.ZERO;
+        }
+    }
+
+    /**
+     * Parses a "yyyy-MM-dd HH:mm:ss" value, falling back to the epoch when it cannot be parsed.
+     *
+     * @param fieldValue text to parse
+     * @return the parsed date, or a date at epoch millisecond 0 on any failure
+     */
+    private Date parseDate(String fieldValue) {
+        try {
+            return new Date(dateFormat.parse(fieldValue).getTime());
+        } catch (Exception ignored) {
+            // best-effort: an unparseable or null value is stored as the epoch rather than failing the row
+            return new Date(0);
+        }
+    }
+}
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/clickhouse/IEventHandler.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/clickhouse/IEventHandler.java
new file mode 100644
index 000000000..11b2aae8a
--- /dev/null
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/clickhouse/IEventHandler.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.sink.clickhouse;
+
+import org.apache.inlong.sort.standalone.channel.ProfileEvent;
+
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.Map;
+
+/**
+ * Strategy for turning an event into the column values of one row and for binding those values to
+ * a JDBC prepared statement.
+ */
+public interface IEventHandler {
+
+    /**
+     * Parses an event into column values.
+     * 
+     * @param  idConfig configuration of the data stream the event belongs to
+     * @param  event    event to parse
+     * @return map of column name to column value as text
+     */
+    Map<String, String> parse(ClickHouseIdConfig idConfig, ProfileEvent event);
+
+    /**
+     * Binds column values to the parameters of a prepared statement.
+     *
+     * @param idConfig       configuration of the data stream the values belong to
+     * @param columnValueMap map of column name to column value as text
+     * @param pstat          statement whose parameters are set
+     * @throws SQLException if setting a parameter on the statement fails
+     */
+    void setValue(ClickHouseIdConfig idConfig, Map<String, String> columnValueMap, PreparedStatement pstat)
+            throws SQLException;
+}