You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/07/01 06:19:51 UTC
[inlong] branch master updated: [INLONG-4726][SortStandalone] Support ClickHouse sink (#4728)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new bf8eb3a35 [INLONG-4726][SortStandalone] Support ClickHouse sink (#4728)
bf8eb3a35 is described below
commit bf8eb3a3514b927d1030291a7f30c4fa922ef2f2
Author: 卢春亮 <94...@qq.com>
AuthorDate: Fri Jul 1 14:19:46 2022 +0800
[INLONG-4726][SortStandalone] Support ClickHouse sink (#4728)
---
.../sort-standalone-source/pom.xml | 4 +
.../sink/clickhouse/ClickHouseChannelWorker.java | 158 ++++++++
.../sink/clickhouse/ClickHouseIdConfig.java | 218 ++++++++++
.../standalone/sink/clickhouse/ClickHouseSink.java | 155 ++++++++
.../sink/clickhouse/ClickHouseSinkContext.java | 438 +++++++++++++++++++++
.../sink/clickhouse/DefaultEventHandler.java | 207 ++++++++++
.../standalone/sink/clickhouse/IEventHandler.java | 50 +++
7 files changed, 1230 insertions(+)
diff --git a/inlong-sort-standalone/sort-standalone-source/pom.xml b/inlong-sort-standalone/sort-standalone-source/pom.xml
index 2ab041050..569fba2ad 100644
--- a/inlong-sort-standalone/sort-standalone-source/pom.xml
+++ b/inlong-sort-standalone/sort-standalone-source/pom.xml
@@ -47,5 +47,9 @@
<artifactId>audit-sdk</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>ru.yandex.clickhouse</groupId>
+ <artifactId>clickhouse-jdbc</artifactId>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/clickhouse/ClickHouseChannelWorker.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/clickhouse/ClickHouseChannelWorker.java
new file mode 100644
index 000000000..1d85a5fd0
--- /dev/null
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/clickhouse/ClickHouseChannelWorker.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.sink.clickhouse;
+
+import org.apache.flume.lifecycle.LifecycleState;
+import org.apache.inlong.sort.standalone.channel.ProfileEvent;
+import org.apache.inlong.sort.standalone.dispatch.DispatchProfile;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.Map;
+
+/**
+ * ClickHouseChannelWorker
+ */
+public class ClickHouseChannelWorker extends Thread {
+
+ public static final Logger LOG = LoggerFactory.getLogger(ClickHouseChannelWorker.class);
+
+ private final ClickHouseSinkContext context;
+ private final int workerIndex;
+ private LifecycleState status;
+ private IEventHandler handler;
+ private Connection conn;
+
+ /**
+ * Constructor
+ *
+ * @param context
+ * @param workerIndex
+ */
+ public ClickHouseChannelWorker(ClickHouseSinkContext context, int workerIndex) {
+ this.context = context;
+ this.workerIndex = workerIndex;
+ this.status = LifecycleState.IDLE;
+ this.handler = this.context.createEventHandler();
+ }
+
+ /**
+ * run
+ */
+ @Override
+ public void run() {
+ status = LifecycleState.START;
+ LOG.info("start to ClickHouseChannelWorker:{},status:{},index:{}", context.getTaskName(), status, workerIndex);
+ while (status == LifecycleState.START) {
+ try {
+ this.doRun();
+ } catch (Throwable t) {
+ LOG.error(t.getMessage(), t);
+ }
+ }
+ }
+
+ /**
+ * doRun
+ */
+ public void doRun() {
+ DispatchProfile currentRecord = context.getDispatchQueue().poll();
+ try {
+ // prepare
+ if (currentRecord == null) {
+ this.sleepOneInterval();
+ return;
+ }
+ // check config
+ ClickHouseIdConfig idConfig = context.getIdConfig(currentRecord.getUid());
+ if (idConfig == null) {
+ context.addSendFailMetric("idConfig is null", currentRecord);
+ currentRecord.ack();
+ return;
+ }
+ // check sql
+ String insertSql = idConfig.getInsertSql();
+ if (insertSql == null) {
+ context.addSendFailMetric("sql is null", currentRecord);
+ currentRecord.ack();
+ return;
+ }
+ // execute sql
+ if (this.conn == null) {
+ this.reconnect();
+ }
+ try (PreparedStatement pstat = this.conn.prepareStatement(insertSql)) {
+ for (ProfileEvent event : currentRecord.getEvents()) {
+ Map<String, String> columnValueMap = this.handler.parse(idConfig, event);
+ this.handler.setValue(idConfig, columnValueMap, pstat);
+ pstat.addBatch();
+ }
+ pstat.executeBatch();
+ this.conn.commit();
+ } catch (Exception e) {
+ this.reconnect();
+ }
+ } catch (Throwable e) {
+ LOG.error(e.getMessage(), e);
+ if (currentRecord != null) {
+ context.getDispatchQueue().add(currentRecord);
+ }
+ this.sleepOneInterval();
+ }
+ }
+
+ /**
+ * close
+ */
+ public void close() {
+ this.status = LifecycleState.STOP;
+ }
+
+ /**
+ * sleepOneInterval
+ */
+ private void sleepOneInterval() {
+ try {
+ Thread.sleep(context.getProcessInterval());
+ } catch (InterruptedException e1) {
+ LOG.error(e1.getMessage(), e1);
+ }
+ }
+
+ /**
+ * reconnect
+ * @throws SQLException
+ */
+ private void reconnect() throws SQLException {
+ if (this.conn != null) {
+ try {
+ this.conn.close();
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ }
+ this.conn = null;
+ }
+ this.conn = DriverManager.getConnection(context.getJdbcUrl(), context.getJdbcUsername(),
+ context.getJdbcPassword());
+ this.conn.setAutoCommit(false);
+ }
+}
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/clickhouse/ClickHouseIdConfig.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/clickhouse/ClickHouseIdConfig.java
new file mode 100644
index 000000000..bafac7bc2
--- /dev/null
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/clickhouse/ClickHouseIdConfig.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.sink.clickhouse;
+
+import org.apache.commons.math3.util.Pair;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ *
+ * ClickHouseIdConfig
+ */
+public class ClickHouseIdConfig {
+
+ public static final String FIELD_NAME_SEPARATOR = ",";
+
+ private String inlongGroupId;
+ private String inlongStreamId;
+ private String separator = "|";
+ private String contentFieldNames;
+ private int contentOffset = 0;// except for boss + tab(1)
+ private String tableName;
+ private String dbFieldNames;
+ // parse
+ private List<String> contentFieldList;
+ private List<Pair<String, Integer>> dbFieldList;
+ private String insertSql;
+
+ /**
+ * parseFieldList
+ */
+ public static List<String> parseFieldNames(String fieldNames) {
+ List<String> fieldList = new ArrayList<>();
+ if (fieldNames != null) {
+ String[] fieldNameArray = fieldNames.split(FIELD_NAME_SEPARATOR);
+ fieldList.addAll(Arrays.asList(fieldNameArray));
+ }
+ return fieldList;
+ }
+
+ /**
+ * get inlongGroupId
+ * @return the inlongGroupId
+ */
+ public String getInlongGroupId() {
+ return inlongGroupId;
+ }
+
+ /**
+ * set inlongGroupId
+ * @param inlongGroupId the inlongGroupId to set
+ */
+ public void setInlongGroupId(String inlongGroupId) {
+ this.inlongGroupId = inlongGroupId;
+ }
+
+ /**
+ * get inlongStreamId
+ * @return the inlongStreamId
+ */
+ public String getInlongStreamId() {
+ return inlongStreamId;
+ }
+
+ /**
+ * set inlongStreamId
+ * @param inlongStreamId the inlongStreamId to set
+ */
+ public void setInlongStreamId(String inlongStreamId) {
+ this.inlongStreamId = inlongStreamId;
+ }
+
+ /**
+ * get separator
+ * @return the separator
+ */
+ public String getSeparator() {
+ return separator;
+ }
+
+ /**
+ * set separator
+ * @param separator the separator to set
+ */
+ public void setSeparator(String separator) {
+ this.separator = separator;
+ }
+
+ /**
+ * get contentFieldNames
+ * @return the contentFieldNames
+ */
+ public String getContentFieldNames() {
+ return contentFieldNames;
+ }
+
+ /**
+ * set contentFieldNames
+ * @param contentFieldNames the contentFieldNames to set
+ */
+ public void setContentFieldNames(String contentFieldNames) {
+ this.contentFieldNames = contentFieldNames;
+ }
+
+ /**
+ * get contentOffset
+ * @return the contentOffset
+ */
+ public int getContentOffset() {
+ return contentOffset;
+ }
+
+ /**
+ * set contentOffset
+ * @param contentOffset the contentOffset to set
+ */
+ public void setContentOffset(int contentOffset) {
+ this.contentOffset = contentOffset;
+ }
+
+ /**
+ * get tableName
+ * @return the tableName
+ */
+ public String getTableName() {
+ return tableName;
+ }
+
+ /**
+ * set tableName
+ * @param tableName the tableName to set
+ */
+ public void setTableName(String tableName) {
+ this.tableName = tableName;
+ }
+
+ /**
+ * get dbFieldNames
+ * @return the dbFieldNames
+ */
+ public String getDbFieldNames() {
+ return dbFieldNames;
+ }
+
+ /**
+ * set dbFieldNames
+ * @param dbFieldNames the dbFieldNames to set
+ */
+ public void setDbFieldNames(String dbFieldNames) {
+ this.dbFieldNames = dbFieldNames;
+ }
+
+ /**
+ * get contentFieldList
+ * @return the contentFieldList
+ */
+ public List<String> getContentFieldList() {
+ return contentFieldList;
+ }
+
+ /**
+ * set contentFieldList
+ * @param contentFieldList the contentFieldList to set
+ */
+ public void setContentFieldList(List<String> contentFieldList) {
+ this.contentFieldList = contentFieldList;
+ }
+
+ /**
+ * get dbFieldList
+ * @return the dbFieldList
+ */
+ public List<Pair<String, Integer>> getDbFieldList() {
+ return dbFieldList;
+ }
+
+ /**
+ * set dbFieldList
+ * @param dbFieldList the dbFieldList to set
+ */
+ public void setDbFieldList(List<Pair<String, Integer>> dbFieldList) {
+ this.dbFieldList = dbFieldList;
+ }
+
+ /**
+ * get insertSql
+ * @return the insertSql
+ */
+ public String getInsertSql() {
+ return insertSql;
+ }
+
+ /**
+ * set insertSql
+ * @param insertSql the insertSql to set
+ */
+ public void setInsertSql(String insertSql) {
+ this.insertSql = insertSql;
+ }
+
+}
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/clickhouse/ClickHouseSink.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/clickhouse/ClickHouseSink.java
new file mode 100644
index 000000000..5ae235549
--- /dev/null
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/clickhouse/ClickHouseSink.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.sink.clickhouse;
+
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.Transaction;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.sink.AbstractSink;
+import org.apache.inlong.sort.standalone.channel.ProfileEvent;
+import org.apache.inlong.sort.standalone.dispatch.DispatchManager;
+import org.apache.inlong.sort.standalone.dispatch.DispatchProfile;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * ClickHouseSink
+ */
+public class ClickHouseSink extends AbstractSink implements Configurable {
+
+ public static final Logger LOG = LoggerFactory.getLogger(ClickHouseSink.class);
+
+ private Context parentContext;
+ private ClickHouseSinkContext context;
+ private DispatchManager dispatchManager;
+ private LinkedBlockingQueue<DispatchProfile> dispatchQueue = new LinkedBlockingQueue<>();
+ // workers
+ private List<ClickHouseChannelWorker> workers = new ArrayList<>();
+ // schedule
+ private ScheduledExecutorService scheduledPool;
+
+ /**
+ * start
+ */
+ @Override
+ public void start() {
+ super.start();
+ try {
+ this.context = new ClickHouseSinkContext(getName(), parentContext, getChannel(), dispatchQueue);
+ this.context.start();
+ for (int i = 0; i < context.getMaxThreads(); i++) {
+ ClickHouseChannelWorker worker = new ClickHouseChannelWorker(context, i);
+ this.workers.add(worker);
+ worker.start();
+ }
+ this.dispatchManager = new DispatchManager(parentContext, dispatchQueue);
+ this.scheduledPool = Executors.newScheduledThreadPool(1);
+ // dispatch
+ this.scheduledPool.scheduleWithFixedDelay(new Runnable() {
+
+ public void run() {
+ dispatchManager.setNeedOutputOvertimeData();
+ }
+ }, this.dispatchManager.getDispatchTimeout(), this.dispatchManager.getDispatchTimeout(),
+ TimeUnit.MILLISECONDS);
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ }
+ }
+
+ /**
+ * stop
+ */
+ @Override
+ public void stop() {
+ super.stop();
+ try {
+ for (ClickHouseChannelWorker worker : this.workers) {
+ worker.close();
+ }
+ this.context.close();
+ this.scheduledPool.shutdown();
+ super.stop();
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ }
+ }
+
+ /**
+ * configure
+ *
+ * @param context
+ */
+ @Override
+ public void configure(Context context) {
+ LOG.info("start to configure:{}, context:{}.", this.getName(), context.toString());
+ this.parentContext = context;
+ }
+
+ /**
+ * process
+ *
+ * @return Status
+ * @throws EventDeliveryException
+ */
+ @Override
+ public Status process() throws EventDeliveryException {
+ this.dispatchManager.outputOvertimeData();
+ Channel channel = getChannel();
+ Transaction tx = channel.getTransaction();
+ tx.begin();
+ try {
+ Event event = channel.take();
+ if (event == null) {
+ tx.commit();
+ return Status.BACKOFF;
+ }
+ if (!(event instanceof ProfileEvent)) {
+ tx.commit();
+ this.context.addSendFailMetric("event is not ProfileEvent");
+ return Status.READY;
+ }
+ //
+ ProfileEvent profileEvent = (ProfileEvent) event;
+ this.dispatchManager.addEvent(profileEvent);
+ tx.commit();
+ return Status.READY;
+ } catch (Throwable t) {
+ LOG.error("Process event failed!" + t.getMessage(), t);
+ try {
+ tx.rollback();
+ } catch (Throwable e) {
+ LOG.error("Channel take transaction rollback exception:" + e.getMessage(), e);
+ }
+ return Status.BACKOFF;
+ } finally {
+ tx.close();
+ }
+ }
+
+}
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/clickhouse/ClickHouseSinkContext.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/clickhouse/ClickHouseSinkContext.java
new file mode 100644
index 000000000..1f33ff347
--- /dev/null
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/clickhouse/ClickHouseSinkContext.java
@@ -0,0 +1,438 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.sink.clickhouse;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.apache.commons.lang.ClassUtils;
+import org.apache.commons.math3.util.Pair;
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig;
+import org.apache.inlong.common.util.NetworkUtils;
+import org.apache.inlong.sort.standalone.channel.ProfileEvent;
+import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
+import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
+import org.apache.inlong.sort.standalone.config.pojo.InlongId;
+import org.apache.inlong.sort.standalone.dispatch.DispatchProfile;
+import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
+import org.apache.inlong.sort.standalone.metrics.audit.AuditUtils;
+import org.apache.inlong.sort.standalone.sink.SinkContext;
+import org.apache.inlong.sort.standalone.utils.Constants;
+import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
+import org.slf4j.Logger;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ *
+ * ClickHouseSinkContext
+ */
+public class ClickHouseSinkContext extends SinkContext {
+
+ public static final Logger LOG = InlongLoggerFactory.getLogger(ClickHouseSinkContext.class);
+ public static final String KEY_NODE_ID = "nodeId";
+ public static final String KEY_JDBC_DRIVER = "jdbcDriver";
+ public static final String DEFAULT_JDBC_DRIVER = "ru.yandex.clickhouse.ClickHouseDriver";
+ public static final String KEY_JDBC_URL = "jdbcUrl";
+ public static final String KEY_JDBC_USERNAME = "jdbcUsername";
+ public static final String KEY_JDBC_PASSWORD = "jdbcPassword";
+ public static final String KEY_EVENT_HANDLER = "clickHouseEventHandler";
+
+ private Context parentContext;
+ private String nodeId;
+ private Map<String, ClickHouseIdConfig> idConfigMap = new ConcurrentHashMap<>();
+ private final LinkedBlockingQueue<DispatchProfile> dispatchQueue;
+ // jdbc config
+ private String jdbcDriver;
+ private String jdbcUrl;
+ private String jdbcUsername;
+ private String jdbcPassword;
+
+ /**
+ * Constructor
+ *
+ * @param sinkName
+ * @param context
+ * @param channel
+ */
+ public ClickHouseSinkContext(String sinkName, Context context, Channel channel,
+ LinkedBlockingQueue<DispatchProfile> dispatchQueue) {
+ super(sinkName, context, channel);
+ this.parentContext = context;
+ this.dispatchQueue = dispatchQueue;
+ this.nodeId = CommonPropertiesHolder.getString(KEY_NODE_ID, NetworkUtils.getLocalIp());
+ }
+
+ /**
+ * reload
+ */
+ public void reload() {
+ try {
+ SortTaskConfig newSortTaskConfig = SortClusterConfigHolder.getTaskConfig(taskName);
+ LOG.info("start to get SortTaskConfig:taskName:{}:config:{}", taskName,
+ new ObjectMapper().writeValueAsString(newSortTaskConfig));
+ if (this.sortTaskConfig != null && this.sortTaskConfig.equals(newSortTaskConfig)) {
+ return;
+ }
+ // parse the config of id and topic
+ Map<String, ClickHouseIdConfig> newIdConfigMap = new ConcurrentHashMap<>();
+ List<Map<String, String>> idList = newSortTaskConfig.getIdParams();
+ ObjectMapper objectMapper = new ObjectMapper();
+ for (Map<String, String> idParam : idList) {
+ String inlongGroupId = idParam.get(Constants.INLONG_GROUP_ID);
+ String inlongStreamId = idParam.get(Constants.INLONG_STREAM_ID);
+ String uid = InlongId.generateUid(inlongGroupId, inlongStreamId);
+ String jsonIdConfig = objectMapper.writeValueAsString(idParam);
+ ClickHouseIdConfig idConfig = objectMapper.readValue(jsonIdConfig, ClickHouseIdConfig.class);
+ newIdConfigMap.put(uid, idConfig);
+ }
+ // jdbc config
+ Context currentContext = new Context(this.parentContext.getParameters());
+ currentContext.putAll(newSortTaskConfig.getSinkParams());
+ this.jdbcDriver = currentContext.getString(KEY_JDBC_DRIVER, DEFAULT_JDBC_DRIVER);
+ this.jdbcUrl = currentContext.getString(KEY_JDBC_URL);
+ this.jdbcUsername = currentContext.getString(KEY_JDBC_USERNAME);
+ this.jdbcPassword = currentContext.getString(KEY_JDBC_PASSWORD);
+ Class.forName(this.jdbcDriver);
+ // load DB field
+ this.initIdConfig(newIdConfigMap);
+ // change current config
+ this.sortTaskConfig = newSortTaskConfig;
+ this.idConfigMap = newIdConfigMap;
+ LOG.info("end to get SortTaskConfig,taskName:{},newIdConfigMap:{},currentContext:{}", taskName,
+ new ObjectMapper().writeValueAsString(newIdConfigMap), currentContext);
+ } catch (Throwable e) {
+ LOG.error(e.getMessage(), e);
+ }
+ }
+
+ /**
+ * initIdConfig
+ * @param newIdConfigMap
+ * @throws SQLException
+ */
+ private void initIdConfig(Map<String, ClickHouseIdConfig> newIdConfigMap) throws SQLException {
+ try (Connection conn = DriverManager.getConnection(jdbcUrl, jdbcUsername, jdbcPassword);
+ Statement stat = conn.createStatement();) {
+ for (Entry<String, ClickHouseIdConfig> entry : newIdConfigMap.entrySet()) {
+ // parse field list
+ ClickHouseIdConfig idConfig = entry.getValue();
+ idConfig.setContentFieldList(ClickHouseIdConfig.parseFieldNames(idConfig.getContentFieldNames()));
+ // load db field type
+ Map<String, Integer> fullTypeMap = new HashMap<>();
+ try (ResultSet rs = stat.executeQuery("select * from " + idConfig.getTableName())) {
+ ResultSetMetaData meta = rs.getMetaData();
+ int columnCount = meta.getColumnCount();
+ for (int i = 1; i <= columnCount; i++) {
+ fullTypeMap.put(meta.getColumnName(i), meta.getColumnType(i));
+ }
+ } catch (Exception e) {
+ LOG.error("Can not get metadata,group:{},stream:{},error:{}", idConfig.getInlongGroupId(),
+ idConfig.getInlongStreamId(), e.getMessage(), e);
+ }
+ // parse db field type
+ List<String> dbFieldNameList = ClickHouseIdConfig.parseFieldNames(idConfig.getDbFieldNames());
+ List<Pair<String, Integer>> dbFieldList = new ArrayList<>(dbFieldNameList.size());
+ dbFieldNameList.forEach((fieldName) -> {
+ dbFieldList.add(new Pair<>(fieldName, fullTypeMap.getOrDefault(fieldName, Types.VARCHAR)));
+ });
+ idConfig.setDbFieldList(dbFieldList);
+ // load db sql
+ StringBuilder insertSql = new StringBuilder();
+ insertSql.append("insert into ").append(idConfig.getTableName()).append(" (");
+ idConfig.getDbFieldList().forEach((field) -> {
+ insertSql.append(field.getKey()).append(',');
+ });
+ insertSql.deleteCharAt(insertSql.length() - 1);
+ insertSql.append(") values (");
+ idConfig.getDbFieldList().forEach((field) -> {
+ insertSql.append("?,");
+ });
+ insertSql.deleteCharAt(insertSql.length() - 1);
+ insertSql.append(")");
+ idConfig.setInsertSql(insertSql.toString());
+ }
+ }
+ }
+
+ /**
+ * addSendMetric
+ *
+ * @param currentRecord
+ */
+ public void addSendMetric(DispatchProfile currentRecord) {
+ Map<String, String> dimensions = new HashMap<>();
+ dimensions.put(SortMetricItem.KEY_CLUSTER_ID, this.getClusterId());
+ dimensions.put(SortMetricItem.KEY_TASK_NAME, this.getTaskName());
+ dimensions.put(SortMetricItem.KEY_SOURCE_ID, "-");
+ dimensions.put(SortMetricItem.KEY_SOURCE_DATA_ID, "-");
+ dimensions.put(SortMetricItem.KEY_SINK_ID, this.getSinkName());
+ dimensions.put(SortMetricItem.KEY_SINK_DATA_ID, "-");
+ dimensions.put(SortMetricItem.KEY_INLONG_GROUP_ID, currentRecord.getInlongGroupId());
+ dimensions.put(SortMetricItem.KEY_INLONG_STREAM_ID, currentRecord.getInlongStreamId());
+ // msgTime
+ long msgTime = currentRecord.getDispatchTime();
+ long auditFormatTime = msgTime - msgTime % CommonPropertiesHolder.getAuditFormatInterval();
+ dimensions.put(SortMetricItem.KEY_MESSAGE_TIME, String.valueOf(auditFormatTime));
+ // find metric
+ SortMetricItem metricItem = this.getMetricItemSet().findMetricItem(dimensions);
+ metricItem.sendCount.addAndGet(currentRecord.getCount());
+ metricItem.sendSize.addAndGet(currentRecord.getSize());
+ }
+
+ /**
+ * addReadFailMetric
+ * @param errorMsg
+ * @param currentRecord
+ */
+ public void addSendFailMetric(String errorMsg, DispatchProfile currentRecord) {
+ Map<String, String> dimensions = new HashMap<>();
+ dimensions.put(SortMetricItem.KEY_CLUSTER_ID, this.getClusterId());
+ dimensions.put(SortMetricItem.KEY_TASK_NAME, this.getTaskName());
+ dimensions.put(SortMetricItem.KEY_SOURCE_ID, "-");
+ dimensions.put(SortMetricItem.KEY_SOURCE_DATA_ID, "-");
+ dimensions.put(SortMetricItem.KEY_SINK_ID, this.getSinkName());
+ dimensions.put(SortMetricItem.KEY_SINK_DATA_ID, errorMsg);
+ dimensions.put(SortMetricItem.KEY_INLONG_GROUP_ID, currentRecord.getInlongGroupId());
+ dimensions.put(SortMetricItem.KEY_INLONG_STREAM_ID, currentRecord.getInlongStreamId());
+ // msgTime
+ long msgTime = System.currentTimeMillis();
+ long auditFormatTime = msgTime - msgTime % CommonPropertiesHolder.getAuditFormatInterval();
+ dimensions.put(SortMetricItem.KEY_MESSAGE_TIME, String.valueOf(auditFormatTime));
+ // find metric
+ SortMetricItem metricItem = this.getMetricItemSet().findMetricItem(dimensions);
+ metricItem.readFailCount.addAndGet(currentRecord.getCount());
+ metricItem.readFailSize.addAndGet(currentRecord.getSize());
+ }
+
+ /**
+ * addReadFailMetric
+ * @param errorMsg
+ * @param event
+ */
+ public void addSendFailMetric(String errorMsg, ProfileEvent event) {
+ Map<String, String> dimensions = new HashMap<>();
+ dimensions.put(SortMetricItem.KEY_CLUSTER_ID, this.getClusterId());
+ dimensions.put(SortMetricItem.KEY_TASK_NAME, this.getTaskName());
+ dimensions.put(SortMetricItem.KEY_SOURCE_ID, "-");
+ dimensions.put(SortMetricItem.KEY_SOURCE_DATA_ID, "-");
+ dimensions.put(SortMetricItem.KEY_SINK_ID, this.getSinkName());
+ dimensions.put(SortMetricItem.KEY_SINK_DATA_ID, errorMsg);
+ dimensions.put(SortMetricItem.KEY_INLONG_GROUP_ID, event.getInlongGroupId());
+ dimensions.put(SortMetricItem.KEY_INLONG_STREAM_ID, event.getInlongStreamId());
+ // msgTime
+ long msgTime = System.currentTimeMillis();
+ long auditFormatTime = msgTime - msgTime % CommonPropertiesHolder.getAuditFormatInterval();
+ dimensions.put(SortMetricItem.KEY_MESSAGE_TIME, String.valueOf(auditFormatTime));
+ // find metric
+ SortMetricItem metricItem = this.getMetricItemSet().findMetricItem(dimensions);
+ metricItem.readFailCount.incrementAndGet();
+ metricItem.readFailSize.addAndGet(event.getBody().length);
+ }
+
+ /**
+ * addReadFailMetric
+ */
+ public void addSendFailMetric(String errorMsg) {
+ Map<String, String> dimensions = new HashMap<>();
+ dimensions.put(SortMetricItem.KEY_CLUSTER_ID, this.getClusterId());
+ dimensions.put(SortMetricItem.KEY_TASK_NAME, this.getTaskName());
+ dimensions.put(SortMetricItem.KEY_SOURCE_ID, "-");
+ dimensions.put(SortMetricItem.KEY_SOURCE_DATA_ID, "-");
+ dimensions.put(SortMetricItem.KEY_SINK_ID, this.getSinkName());
+ dimensions.put(SortMetricItem.KEY_SINK_DATA_ID, errorMsg);
+ dimensions.put(SortMetricItem.KEY_INLONG_GROUP_ID, "-");
+ dimensions.put(SortMetricItem.KEY_INLONG_STREAM_ID, "-");
+ // msgTime
+ long msgTime = System.currentTimeMillis();
+ long auditFormatTime = msgTime - msgTime % CommonPropertiesHolder.getAuditFormatInterval();
+ dimensions.put(SortMetricItem.KEY_MESSAGE_TIME, String.valueOf(auditFormatTime));
+ // find metric
+ SortMetricItem metricItem = this.getMetricItemSet().findMetricItem(dimensions);
+ metricItem.readFailCount.incrementAndGet();
+ }
+
+ /**
+ * addSendSuccessMetric
+ *
+ * @param currentRecord
+ * @param sendTime
+ */
+ public void addSendSuccessMetric(DispatchProfile currentRecord, long sendTime) {
+ Map<String, String> dimensions = new HashMap<>();
+ dimensions.put(SortMetricItem.KEY_CLUSTER_ID, this.getClusterId());
+ dimensions.put(SortMetricItem.KEY_TASK_NAME, this.getTaskName());
+ dimensions.put(SortMetricItem.KEY_SOURCE_ID, "-");
+ dimensions.put(SortMetricItem.KEY_SOURCE_DATA_ID, "-");
+ dimensions.put(SortMetricItem.KEY_SINK_ID, this.getSinkName());
+ dimensions.put(SortMetricItem.KEY_SINK_DATA_ID, "-");
+ dimensions.put(SortMetricItem.KEY_INLONG_GROUP_ID, currentRecord.getInlongGroupId());
+ dimensions.put(SortMetricItem.KEY_INLONG_STREAM_ID, currentRecord.getInlongStreamId());
+ long currentTime = System.currentTimeMillis();
+ for (ProfileEvent event : currentRecord.getEvents()) {
+ long msgTime = event.getRawLogTime();
+ long auditFormatTime = msgTime - msgTime % CommonPropertiesHolder.getAuditFormatInterval();
+ dimensions.put(SortMetricItem.KEY_MESSAGE_TIME, String.valueOf(auditFormatTime));
+ SortMetricItem metricItem = this.getMetricItemSet().findMetricItem(dimensions);
+ metricItem.sendSuccessCount.incrementAndGet();
+ metricItem.sendSuccessSize.addAndGet(event.getBody().length);
+ long sinkDuration = currentTime - sendTime;
+ long nodeDuration = currentTime - event.getFetchTime();
+ long wholeDuration = currentTime - msgTime;
+ metricItem.sinkDuration.addAndGet(sinkDuration);
+ metricItem.nodeDuration.addAndGet(nodeDuration);
+ metricItem.wholeDuration.addAndGet(wholeDuration);
+ AuditUtils.add(AuditUtils.AUDIT_ID_SEND_SUCCESS, event);
+ }
+ }
+
+ /**
+ * get nodeId
+ * @return the nodeId
+ */
+ public String getNodeId() {
+ return nodeId;
+ }
+
+ /**
+ * set nodeId
+ * @param nodeId the nodeId to set
+ */
+ public void setNodeId(String nodeId) {
+ this.nodeId = nodeId;
+ }
+
+ /**
+ * get jdbcDriver
+ * @return the jdbcDriver
+ */
+ public String getJdbcDriver() {
+ return jdbcDriver;
+ }
+
+ /**
+ * set jdbcDriver
+ * @param jdbcDriver the jdbcDriver to set
+ */
+ public void setJdbcDriver(String jdbcDriver) {
+ this.jdbcDriver = jdbcDriver;
+ }
+
+ /**
+ * get jdbcUrl
+ * @return the jdbcUrl
+ */
+ public String getJdbcUrl() {
+ return jdbcUrl;
+ }
+
+ /**
+ * set jdbcUrl
+ * @param jdbcUrl the jdbcUrl to set
+ */
+ public void setJdbcUrl(String jdbcUrl) {
+ this.jdbcUrl = jdbcUrl;
+ }
+
+ /**
+ * get jdbcUsername
+ * @return the jdbcUsername
+ */
+ public String getJdbcUsername() {
+ return jdbcUsername;
+ }
+
+ /**
+ * set jdbcUsername
+ * @param jdbcUsername the jdbcUsername to set
+ */
+ public void setJdbcUsername(String jdbcUsername) {
+ this.jdbcUsername = jdbcUsername;
+ }
+
+ /**
+ * get jdbcPassword
+ * @return the jdbcPassword
+ */
+ public String getJdbcPassword() {
+ return jdbcPassword;
+ }
+
+ /**
+ * set jdbcPassword
+ * @param jdbcPassword the jdbcPassword to set
+ */
+ public void setJdbcPassword(String jdbcPassword) {
+ this.jdbcPassword = jdbcPassword;
+ }
+
+ /**
+ * get dispatchQueue
+ * @return the dispatchQueue
+ */
+ public LinkedBlockingQueue<DispatchProfile> getDispatchQueue() {
+ return dispatchQueue;
+ }
+
+ /**
+ * getIdConfig
+ *
+ * @param uid
+ * @return
+ */
+ public ClickHouseIdConfig getIdConfig(String uid) {
+ return this.idConfigMap.get(uid);
+ }
+
+ /**
+ * create createEventHandler
+ *
+ * @return the IEventHandler
+ */
+ public IEventHandler createEventHandler() {
+ // IEventHandler
+ String eventHandlerClass = CommonPropertiesHolder.getString(KEY_EVENT_HANDLER,
+ DefaultEventHandler.class.getName());
+ try {
+ Class<?> handlerClass = ClassUtils.getClass(eventHandlerClass);
+ Object handlerObject = handlerClass.getDeclaredConstructor().newInstance();
+ if (handlerObject instanceof IEventHandler) {
+ IEventHandler handler = (IEventHandler) handlerObject;
+ return handler;
+ }
+ } catch (Throwable t) {
+ LOG.error("Fail to init IEventHandler,handlerClass:{},error:{}",
+ eventHandlerClass, t.getMessage(), t);
+ }
+ return null;
+ }
+}
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/clickhouse/DefaultEventHandler.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/clickhouse/DefaultEventHandler.java
new file mode 100644
index 000000000..aa8a08b76
--- /dev/null
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/clickhouse/DefaultEventHandler.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.sink.clickhouse;
+
+import org.apache.commons.math3.util.Pair;
+import org.apache.inlong.sdk.commons.protocol.EventConstants;
+import org.apache.inlong.sort.standalone.channel.ProfileEvent;
+import org.apache.inlong.sort.standalone.utils.UnescapeHelper;
+import org.apache.pulsar.shade.org.apache.commons.lang3.math.NumberUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.Charset;
+import java.sql.Date;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.sql.Types;
+import java.text.SimpleDateFormat;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * DefaultEventHandler
+ */
+public class DefaultEventHandler implements IEventHandler {
+
+ public static final Logger LOG = LoggerFactory.getLogger(DefaultEventHandler.class);
+
+ public static final String KEY_EXTINFO = "extinfo";
+
+ private SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+
+ /**
+ * parse
+ * @param idConfig
+ * @param event
+ * @return
+ */
+ @Override
+ public Map<String, String> parse(ClickHouseIdConfig idConfig, ProfileEvent event) {
+ final Map<String, String> resultMap = new HashMap<>();
+ // parse fields
+ String delimeter = idConfig.getSeparator();
+ char cDelimeter = delimeter.charAt(0);
+ String strContext = null;
+ // for tab separator
+ byte[] bodyBytes = event.getBody();
+ int msgLength = event.getBody().length;
+ int contentOffset = idConfig.getContentOffset();
+ if (contentOffset > 0 && msgLength >= 1) {
+ strContext = new String(bodyBytes, contentOffset, msgLength - contentOffset, Charset.defaultCharset());
+ } else {
+ strContext = new String(bodyBytes, Charset.defaultCharset());
+ }
+ // unescape
+ List<String> columnValues = UnescapeHelper.toFiledList(strContext, cDelimeter);
+ // column size
+ List<String> contentFieldList = idConfig.getContentFieldList();
+ int matchSize = Math.min(contentFieldList.size(), columnValues.size());
+ for (int i = 0; i < matchSize; i++) {
+ resultMap.put(contentFieldList.get(i), columnValues.get(i));
+ }
+
+ // ftime
+ String ftime = dateFormat.format(new Date(event.getRawLogTime()));
+ resultMap.put("ftime", ftime);
+ // extinfo
+ String extinfo = getExtInfo(event);
+ resultMap.put("extinfo", extinfo);
+ return resultMap;
+ }
+
+ /**
+ * getExtInfo
+ *
+ * @param event
+ * @return
+ */
+ public static String getExtInfo(ProfileEvent event) {
+ String extinfoValue = event.getHeaders().get(KEY_EXTINFO);
+ if (extinfoValue != null) {
+ return KEY_EXTINFO + "=" + extinfoValue;
+ }
+ extinfoValue = KEY_EXTINFO + "=" + event.getHeaders().get(EventConstants.HEADER_KEY_SOURCE_IP);
+ return extinfoValue;
+ }
+
+ /**
+ * setValue
+ * @param idConfig
+ * @param columnValueMap
+ * @param pstat
+ * @throws SQLException
+ */
+ public void setValue(ClickHouseIdConfig idConfig, Map<String, String> columnValueMap, PreparedStatement pstat)
+ throws SQLException {
+ List<Pair<String, Integer>> dbFieldList = idConfig.getDbFieldList();
+ for (int i = 1; i <= dbFieldList.size(); i++) {
+ Pair<String, Integer> pair = dbFieldList.get(i - 1);
+ String fieldValue = columnValueMap.getOrDefault(pair.getKey(), "");
+ int fieldType = pair.getValue();
+ switch (fieldType) {
+ // Int8 - [-128 : 127]
+ case Types.TINYINT :
+ // TINYINT = -6;
+ pstat.setByte(i, NumberUtils.toByte(fieldValue, (byte) 0));
+ break;
+ // Int16 - [-32768 : 32767]
+ case Types.SMALLINT :
+ // SMALLINT= 5;
+ pstat.setShort(i, NumberUtils.toShort(fieldValue, (short) 0));
+ break;
+ // Int32 - [-2147483648 : 2147483647]
+ case Types.INTEGER :
+ // INTEGER = 4;
+ pstat.setInt(i, NumberUtils.toInt(fieldValue, 0));
+ break;
+ // Int64 - [-9223372036854775808 : 9223372036854775807]
+ // UInt8 - [0 : 255]
+ // UInt16 - [0 : 65535]
+ // UInt32 - [0 : 4294967295]
+ // UInt64 - [0 : 18446744073709551615]
+ case Types.BIGINT :
+ // BIGINT = -5;
+ pstat.setLong(i, NumberUtils.toLong(fieldValue, 0));
+ break;
+ // Float32 - float
+ case Types.FLOAT :
+ // FLOAT = 6;
+ pstat.setFloat(i, NumberUtils.toFloat(fieldValue, 0));
+ break;
+ // Float64 – double
+ case Types.DOUBLE :
+ // DOUBLE = 8;
+ pstat.setDouble(i, NumberUtils.toDouble(fieldValue, 0));
+ break;
+ // Decimal32(s)
+ // Decimal64(s)
+ // Decimal128(s)
+ case Types.NUMERIC :
+ // NUMERIC = 2;
+ pstat.setBigDecimal(i, NumberUtils.toScaledBigDecimal(fieldValue));
+ break;
+ // String
+ // FixedString(N)
+ // Enum8
+ // Enum16
+ case Types.VARCHAR :
+ // VARCHAR = 12;
+ case Types.LONGVARCHAR :
+ // LONGVARCHAR = -1;
+ pstat.setString(i, fieldValue);
+ break;
+ // Date
+ case Types.DATE :
+ // DATE= 91;
+ pstat.setDate(i, this.parseDate(fieldValue));
+ break;
+ // Datetime
+ // Datetime64
+ case Types.TIMESTAMP :
+ // TIMESTAMP = 93;
+ pstat.setTimestamp(i, new Timestamp(this.parseDate(fieldValue).getTime()));
+ break;
+
+ case Types.TIME :
+ // TIME= 92;
+ pstat.setTime(i, new Time(this.parseDate(fieldValue).getTime()));
+ break;
+ default :
+ pstat.setString(i, fieldValue);
+ break;
+ }
+ }
+ }
+
+ /**
+ * parseDate
+ * @param fieldValue
+ * @return
+ */
+ private Date parseDate(String fieldValue) {
+ try {
+ return new Date(dateFormat.parse(fieldValue).getTime());
+ } catch (Exception e) {
+ return new Date(0);
+ }
+ }
+}
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/clickhouse/IEventHandler.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/clickhouse/IEventHandler.java
new file mode 100644
index 000000000..11b2aae8a
--- /dev/null
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/clickhouse/IEventHandler.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.sink.clickhouse;
+
+import org.apache.inlong.sort.standalone.channel.ProfileEvent;
+
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.Map;
+
+/**
+ *
+ * IEventHandler
+ */
+public interface IEventHandler {
+
+ /**
+ * parse
+ *
+ * @param idConfig
+ * @param event
+ * @return
+ */
+ Map<String, String> parse(ClickHouseIdConfig idConfig, ProfileEvent event);
+
+ /**
+ * setValue
+ * @param idConfig
+ * @param columnValueMap
+ * @param pstat
+ * @throws SQLException
+ */
+ void setValue(ClickHouseIdConfig idConfig, Map<String, String> columnValueMap, PreparedStatement pstat)
+ throws SQLException;
+}