You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by GitBox <gi...@apache.org> on 2022/09/05 18:42:47 UTC

[GitHub] [incubator-seatunnel] githublaohu commented on a diff in pull request #2651: [Feature][Connector] New SeaTunnel API Connectors #1946 Add Druid Source&Sink

githublaohu commented on code in PR #2651:
URL: https://github.com/apache/incubator-seatunnel/pull/2651#discussion_r963081268


##########
docs/en/connector-v2/source/Druid.md:
##########
@@ -0,0 +1,54 @@
+# Druid
+
+> Druid source connector
+
+## Description
+
+Read data from Apache Druid.
+
+## Options
+
+| name       | type           | required | default value |
+| ---------- | -------------- | -------- | ------------- |
+| url        | `String`       | yes      | -             |
+| datasource | `String`       | yes      | -             |
+| start_date | `String`       | no       | -             |
+| end_date   | `String`       | no       | -             |
+| columns    | `List<String>` | no       | `*`           |
+
+### url [`String`]
+
+The URL of JDBC of Apache Druid.
+
+### datasource [`String`]
+
+The DataSource name in Apache Druid.
+
+### start_date [`String`]
+
+The start date of DataSource, for example, `'2016-06-27'`, `'2016-06-27 00:00:00'`, etc.
+
+### end_date [`String`]
+
+The end date of DataSource, for example, `'2016-06-28'`, `'2016-06-28 00:00:00'`, etc.
+
+### columns [`List<String>`]
+
+These columns that you want to write  of DataSource.
+
+### common options [string]
+
+Source Plugin common parameters, refer to [Source Plugin](common-options.mdx) for details
+
+
+## Example
+
+```hocon
+DruidSource {
+  url = "jdbc:avatica:remote:url=http://localhost:8082/druid/v2/sql/avatica/"

Review Comment:
   1. jdbc的协议有http吗?
   2. 请使用说明对url进行标注, http://{IP or domain}:[port]/
   3. 字段作用与说明请写上



##########
seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/client/DruidInputFormat.java:
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.connectors.seatunnel.druid.client;
+
+import lombok.Data;
+import org.apache.seatunnel.api.table.type.*;
+import org.apache.seatunnel.connectors.seatunnel.druid.config.DruidSourceOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.seatunnel.connectors.seatunnel.druid.config.DruidTypeMapper;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.sql.*;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.List;
+import java.util.TimeZone;
+
+@Data
+public class DruidInputFormat implements Serializable {
+    private static final Logger LOGGER = LoggerFactory.getLogger(DruidInputFormat.class);
+
+    protected transient Connection connection;
+    protected transient PreparedStatement statement;
+    protected transient ResultSet resultSet;
+    protected SeaTunnelRowType rowTypeInfo;
+    protected DruidSourceOptions druidSourceOptions;
+
+    protected static final String COLUMNS_DEFAULT = "*";
+    protected static final String QUERY_TEMPLATE = "SELECT %s FROM %s WHERE 1=1";
+    protected String quarySQL ;
+    protected boolean hasNext;
+
+    public DruidInputFormat(DruidSourceOptions druidSourceOptions) {
+        this.druidSourceOptions = druidSourceOptions;
+        this.rowTypeInfo = initTableField();
+    }
+
+    public ResultSetMetaData getResultSetMetaData() throws SQLException {
+        try {
+            quarySQL = getSQL();
+            connection = DriverManager.getConnection(druidSourceOptions.getURL());
+            statement = connection.prepareStatement(quarySQL, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+            return statement.getMetaData();
+        } catch (SQLException se) {
+            throw new SQLException("ResultSetMetaData() failed." + se.getMessage(), se);

Review Comment:
   与openInputFormat的异常不一样,不应该使用一样的异常吗?



##########
seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/client/DruidInputFormat.java:
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.connectors.seatunnel.druid.client;
+
+import lombok.Data;
+import org.apache.seatunnel.api.table.type.*;
+import org.apache.seatunnel.connectors.seatunnel.druid.config.DruidSourceOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.seatunnel.connectors.seatunnel.druid.config.DruidTypeMapper;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.sql.*;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.List;
+import java.util.TimeZone;
+
+@Data
+public class DruidInputFormat implements Serializable {
+    private static final Logger LOGGER = LoggerFactory.getLogger(DruidInputFormat.class);
+
+    protected transient Connection connection;
+    protected transient PreparedStatement statement;
+    protected transient ResultSet resultSet;
+    protected SeaTunnelRowType rowTypeInfo;
+    protected DruidSourceOptions druidSourceOptions;
+
+    protected static final String COLUMNS_DEFAULT = "*";
+    protected static final String QUERY_TEMPLATE = "SELECT %s FROM %s WHERE 1=1";
+    protected String quarySQL ;
+    protected boolean hasNext;
+
+    public DruidInputFormat(DruidSourceOptions druidSourceOptions) {
+        this.druidSourceOptions = druidSourceOptions;
+        this.rowTypeInfo = initTableField();
+    }
+
+    public ResultSetMetaData getResultSetMetaData() throws SQLException {
+        try {
+            quarySQL = getSQL();
+            connection = DriverManager.getConnection(druidSourceOptions.getURL());
+            statement = connection.prepareStatement(quarySQL, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+            return statement.getMetaData();
+        } catch (SQLException se) {
+            throw new SQLException("ResultSetMetaData() failed." + se.getMessage(), se);
+        }
+    }
+    public void openInputFormat() {
+        try {
+            connection = DriverManager.getConnection(druidSourceOptions.getURL());
+            statement = connection.prepareStatement(quarySQL, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+            resultSet = statement.executeQuery();
+            hasNext = resultSet.next();
+        } catch (SQLException se) {
+            throw new IllegalArgumentException("openInputFormat() failed." + se.getMessage(), se);
+        }
+    }
+
+    private String getSQL() throws SQLException {
+        String columns = COLUMNS_DEFAULT;
+        String startTimestamp = druidSourceOptions.getStartTimestamp();
+        String endTimestamp = druidSourceOptions.getEndTimestamp();
+        String dataSource = druidSourceOptions.getDatasource();
+
+        if (druidSourceOptions.getColumns() != null && druidSourceOptions.getColumns().size() > 0) {
+            columns = String.join(",", druidSourceOptions.getColumns());
+        }
+
+        String sql = String.format(QUERY_TEMPLATE, columns, dataSource);

Review Comment:
   请使用stringbuffer,代码可维护性好些



##########
seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/client/DruidOutputFormat.java:
##########
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.connectors.seatunnel.druid.client;
+
+import com.fasterxml.jackson.datatype.joda.JodaModule;
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.MapperFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import lombok.Data;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.druid.data.input.MaxSizeSplitHintSpec;
+import org.apache.druid.data.input.impl.CsvInputFormat;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.InlineInputSource;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIOConfig;
+import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIngestionSpec;
+import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask;
+import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+@Data
+public class DruidOutputFormat implements Serializable {
+    private static final Logger LOGGER = LoggerFactory.getLogger(DruidOutputFormat.class);
+
+    private static final String DEFAULT_TIMESTAMP_COLUMN = "timestamp";
+    private static final String DEFAULT_TIMESTAMP_FORMAT = "auto";
+    private static final DateTime DEFAULT_TIMESTAMP_MISSING_VALUE = new DateTime();
+    public static final String DEFAULT_LINE_DELIMITER = "\n";
+    public static final String DEFAULT_FIELD_DELIMITER = ",";
+
+    private final transient StringBuffer data;
+    private final String coordinatorURL;
+    private final String datasource;
+    private final String timestampColumn;
+    private final String timestampFormat;
+    private final DateTime timestampMissingValue;
+    private  List<String> columns;
+
+    public DruidOutputFormat(String coordinatorURL,
+                             String datasource,
+                             String timestampColumn,
+                             String timestampFormat,
+                             String timestampMissingValue,
+                             List<String> columns
+                             ) {
+        this.data = new StringBuffer();
+        this.coordinatorURL = coordinatorURL;
+        this.datasource = datasource;
+        this.timestampColumn = timestampColumn == null ? DEFAULT_TIMESTAMP_COLUMN : timestampColumn;
+        this.timestampFormat = timestampFormat == null ? DEFAULT_TIMESTAMP_FORMAT : timestampFormat;
+        this.timestampMissingValue = timestampMissingValue == null ? DEFAULT_TIMESTAMP_MISSING_VALUE : DateTimes.of(timestampMissingValue);
+        this.columns = columns;
+    }
+
+    public void write(SeaTunnelRow element) {
+        int fieldIndex = element.getArity();
+        for (int i = 0; i < fieldIndex; i++) {
+            Object v = element.getField(i);
+            if (i != 0) {
+                this.data.append(DEFAULT_FIELD_DELIMITER);
+            }
+            if (v != null) {
+                this.data.append(v);
+            }
+        }
+        this.data.append(DEFAULT_LINE_DELIMITER);
+    }
+
+    public void closeOutputFormat() {
+        try {
+            ParallelIndexIOConfig ioConfig = parallelIndexIOConfig();
+            ParallelIndexTuningConfig tuningConfig = tuningConfig();
+            ParallelIndexSupervisorTask indexTask = parallelIndexSupervisorTask(ioConfig, tuningConfig);
+            ObjectMapper mapper = new ObjectMapper();
+            mapper.registerModule(new JodaModule());
+            mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+            mapper.configure(MapperFeature.AUTO_DETECT_GETTERS, false);
+            mapper.configure(MapperFeature.AUTO_DETECT_FIELDS, false);
+            mapper.configure(MapperFeature.AUTO_DETECT_IS_GETTERS, false);
+            mapper.configure(MapperFeature.AUTO_DETECT_SETTERS, false);
+            mapper.configure(SerializationFeature.INDENT_OUTPUT, false);
+            mapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
+            String taskJSON = mapper.writeValueAsString(indexTask);
+            JSONObject jsonObject = JSON.parseObject(taskJSON);
+            jsonObject.remove("id");
+            jsonObject.remove("groupId");
+            jsonObject.remove("resource");
+            JSONObject spec = jsonObject.getJSONObject("spec");
+            spec.remove("tuningConfig");
+            jsonObject.put("spec", spec);
+            taskJSON = jsonObject.toJSONString();
+
+            URL url = new URL(this.coordinatorURL + "druid/indexer/v1/task");
+            HttpURLConnection con = (HttpURLConnection) url.openConnection();
+            con.setRequestMethod("POST");
+            con.setRequestProperty("Content-Type", "application/json");
+            con.setRequestProperty("Accept", "application/json, text/plain, */*");
+            con.setDoOutput(true);
+            try (OutputStream os = con.getOutputStream()) {
+                byte[] input = taskJSON.getBytes(StandardCharsets.UTF_8);
+                os.write(input, 0, input.length);
+            }
+            try (BufferedReader br = new BufferedReader(new InputStreamReader(con.getInputStream(), StandardCharsets.UTF_8))) {
+                StringBuilder response = new StringBuilder();
+                String responseLine;
+                while ((responseLine = br.readLine()) != null) {
+                    response.append(responseLine.trim());
+                }
+                LOGGER.info("Druid write task has been sent, and the response is {}", response.toString());
+            }
+        }catch (IOException e){
+            e.printStackTrace();
+        }
+    }
+
+    private ParallelIndexSupervisorTask parallelIndexSupervisorTask(ParallelIndexIOConfig ioConfig, ParallelIndexTuningConfig tuningConfig) {
+        return new ParallelIndexSupervisorTask(

Review Comment:
   请使用构造者模式,这样的代码可维护与可读太差了



##########
seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/client/DruidOutputFormat.java:
##########
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.connectors.seatunnel.druid.client;
+
+import com.fasterxml.jackson.datatype.joda.JodaModule;
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.MapperFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import lombok.Data;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.druid.data.input.MaxSizeSplitHintSpec;
+import org.apache.druid.data.input.impl.CsvInputFormat;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.InlineInputSource;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIOConfig;
+import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIngestionSpec;
+import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask;
+import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+@Data
+public class DruidOutputFormat implements Serializable {
+    private static final Logger LOGGER = LoggerFactory.getLogger(DruidOutputFormat.class);
+
+    private static final String DEFAULT_TIMESTAMP_COLUMN = "timestamp";
+    private static final String DEFAULT_TIMESTAMP_FORMAT = "auto";
+    private static final DateTime DEFAULT_TIMESTAMP_MISSING_VALUE = new DateTime();
+    public static final String DEFAULT_LINE_DELIMITER = "\n";
+    public static final String DEFAULT_FIELD_DELIMITER = ",";
+
+    private final transient StringBuffer data;
+    private final String coordinatorURL;
+    private final String datasource;
+    private final String timestampColumn;
+    private final String timestampFormat;
+    private final DateTime timestampMissingValue;
+    private  List<String> columns;
+
+    public DruidOutputFormat(String coordinatorURL,
+                             String datasource,
+                             String timestampColumn,
+                             String timestampFormat,
+                             String timestampMissingValue,
+                             List<String> columns
+                             ) {
+        this.data = new StringBuffer();
+        this.coordinatorURL = coordinatorURL;
+        this.datasource = datasource;
+        this.timestampColumn = timestampColumn == null ? DEFAULT_TIMESTAMP_COLUMN : timestampColumn;
+        this.timestampFormat = timestampFormat == null ? DEFAULT_TIMESTAMP_FORMAT : timestampFormat;
+        this.timestampMissingValue = timestampMissingValue == null ? DEFAULT_TIMESTAMP_MISSING_VALUE : DateTimes.of(timestampMissingValue);
+        this.columns = columns;
+    }
+
+    public void write(SeaTunnelRow element) {
+        int fieldIndex = element.getArity();
+        for (int i = 0; i < fieldIndex; i++) {
+            Object v = element.getField(i);
+            if (i != 0) {
+                this.data.append(DEFAULT_FIELD_DELIMITER);
+            }
+            if (v != null) {
+                this.data.append(v);
+            }
+        }
+        this.data.append(DEFAULT_LINE_DELIMITER);
+    }
+
+    public void closeOutputFormat() {
+        try {
+            ParallelIndexIOConfig ioConfig = parallelIndexIOConfig();
+            ParallelIndexTuningConfig tuningConfig = tuningConfig();
+            ParallelIndexSupervisorTask indexTask = parallelIndexSupervisorTask(ioConfig, tuningConfig);
+            ObjectMapper mapper = new ObjectMapper();
+            mapper.registerModule(new JodaModule());
+            mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+            mapper.configure(MapperFeature.AUTO_DETECT_GETTERS, false);
+            mapper.configure(MapperFeature.AUTO_DETECT_FIELDS, false);
+            mapper.configure(MapperFeature.AUTO_DETECT_IS_GETTERS, false);
+            mapper.configure(MapperFeature.AUTO_DETECT_SETTERS, false);
+            mapper.configure(SerializationFeature.INDENT_OUTPUT, false);
+            mapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
+            String taskJSON = mapper.writeValueAsString(indexTask);
+            JSONObject jsonObject = JSON.parseObject(taskJSON);
+            jsonObject.remove("id");
+            jsonObject.remove("groupId");
+            jsonObject.remove("resource");
+            JSONObject spec = jsonObject.getJSONObject("spec");
+            spec.remove("tuningConfig");
+            jsonObject.put("spec", spec);
+            taskJSON = jsonObject.toJSONString();
+
+            URL url = new URL(this.coordinatorURL + "druid/indexer/v1/task");
+            HttpURLConnection con = (HttpURLConnection) url.openConnection();
+            con.setRequestMethod("POST");
+            con.setRequestProperty("Content-Type", "application/json");
+            con.setRequestProperty("Accept", "application/json, text/plain, */*");
+            con.setDoOutput(true);
+            try (OutputStream os = con.getOutputStream()) {
+                byte[] input = taskJSON.getBytes(StandardCharsets.UTF_8);
+                os.write(input, 0, input.length);
+            }
+            try (BufferedReader br = new BufferedReader(new InputStreamReader(con.getInputStream(), StandardCharsets.UTF_8))) {
+                StringBuilder response = new StringBuilder();
+                String responseLine;
+                while ((responseLine = br.readLine()) != null) {
+                    response.append(responseLine.trim());
+                }
+                LOGGER.info("Druid write task has been sent, and the response is {}", response.toString());
+            }
+        }catch (IOException e){
+            e.printStackTrace();
+        }
+    }
+
+    private ParallelIndexSupervisorTask parallelIndexSupervisorTask(ParallelIndexIOConfig ioConfig, ParallelIndexTuningConfig tuningConfig) {
+        return new ParallelIndexSupervisorTask(
+                null,
+                null,
+                null,
+                new ParallelIndexIngestionSpec(
+                        new DataSchema(
+                                this.datasource,
+                                new TimestampSpec(this.timestampColumn, this.timestampFormat, this.timestampMissingValue),
+                                new DimensionsSpec(Collections.emptyList()),
+                                null,
+                                new UniformGranularitySpec(Granularities.HOUR, Granularities.MINUTE, false, null),
+                                null
+                        ),
+                        ioConfig,
+                        tuningConfig
+                ),
+                null
+        );
+    }
+
+    private ParallelIndexIOConfig parallelIndexIOConfig() {
+        List columnss = new ArrayList();
+        CollectionUtils.addAll(columnss,this.getColumns().get(0).split(","));
+        columnss.add(timestampColumn);
+
+        return new ParallelIndexIOConfig(
+                null,
+                new InlineInputSource(this.data.toString()),
+                new CsvInputFormat(
+                        columnss,
+                        "|",
+                        null,
+                        false,
+                        0
+                ),
+                false,
+                null
+        );
+    }
+
+    private ParallelIndexTuningConfig tuningConfig() {
+        return new ParallelIndexTuningConfig(

Review Comment:
   同151



##########
seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/client/DruidOutputFormat.java:
##########
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.connectors.seatunnel.druid.client;
+
+import com.fasterxml.jackson.datatype.joda.JodaModule;
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.MapperFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import lombok.Data;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.druid.data.input.MaxSizeSplitHintSpec;
+import org.apache.druid.data.input.impl.CsvInputFormat;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.InlineInputSource;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIOConfig;
+import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIngestionSpec;
+import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask;
+import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+@Data
+public class DruidOutputFormat implements Serializable {
+    private static final Logger LOGGER = LoggerFactory.getLogger(DruidOutputFormat.class);
+
+    private static final String DEFAULT_TIMESTAMP_COLUMN = "timestamp";
+    private static final String DEFAULT_TIMESTAMP_FORMAT = "auto";
+    private static final DateTime DEFAULT_TIMESTAMP_MISSING_VALUE = new DateTime();
+    public static final String DEFAULT_LINE_DELIMITER = "\n";
+    public static final String DEFAULT_FIELD_DELIMITER = ",";
+
+    private final transient StringBuffer data;
+    private final String coordinatorURL;
+    private final String datasource;
+    private final String timestampColumn;
+    private final String timestampFormat;
+    private final DateTime timestampMissingValue;
+    private  List<String> columns;
+
+    public DruidOutputFormat(String coordinatorURL,
+                             String datasource,
+                             String timestampColumn,
+                             String timestampFormat,
+                             String timestampMissingValue,
+                             List<String> columns
+                             ) {
+        this.data = new StringBuffer();
+        this.coordinatorURL = coordinatorURL;
+        this.datasource = datasource;
+        this.timestampColumn = timestampColumn == null ? DEFAULT_TIMESTAMP_COLUMN : timestampColumn;
+        this.timestampFormat = timestampFormat == null ? DEFAULT_TIMESTAMP_FORMAT : timestampFormat;
+        this.timestampMissingValue = timestampMissingValue == null ? DEFAULT_TIMESTAMP_MISSING_VALUE : DateTimes.of(timestampMissingValue);
+        this.columns = columns;
+    }
+
+    public void write(SeaTunnelRow element) {
+        int fieldIndex = element.getArity();
+        for (int i = 0; i < fieldIndex; i++) {
+            Object v = element.getField(i);
+            if (i != 0) {
+                this.data.append(DEFAULT_FIELD_DELIMITER);
+            }
+            if (v != null) {
+                this.data.append(v);
+            }
+        }
+        this.data.append(DEFAULT_LINE_DELIMITER);
+    }
+
+    public void closeOutputFormat() {
+        try {
+            ParallelIndexIOConfig ioConfig = parallelIndexIOConfig();
+            ParallelIndexTuningConfig tuningConfig = tuningConfig();
+            ParallelIndexSupervisorTask indexTask = parallelIndexSupervisorTask(ioConfig, tuningConfig);
+            ObjectMapper mapper = new ObjectMapper();
+            mapper.registerModule(new JodaModule());
+            mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+            mapper.configure(MapperFeature.AUTO_DETECT_GETTERS, false);
+            mapper.configure(MapperFeature.AUTO_DETECT_FIELDS, false);
+            mapper.configure(MapperFeature.AUTO_DETECT_IS_GETTERS, false);
+            mapper.configure(MapperFeature.AUTO_DETECT_SETTERS, false);
+            mapper.configure(SerializationFeature.INDENT_OUTPUT, false);
+            mapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
+            String taskJSON = mapper.writeValueAsString(indexTask);
+            JSONObject jsonObject = JSON.parseObject(taskJSON);
+            jsonObject.remove("id");
+            jsonObject.remove("groupId");
+            jsonObject.remove("resource");
+            JSONObject spec = jsonObject.getJSONObject("spec");
+            spec.remove("tuningConfig");
+            jsonObject.put("spec", spec);
+            taskJSON = jsonObject.toJSONString();
+
+            URL url = new URL(this.coordinatorURL + "druid/indexer/v1/task");
+            HttpURLConnection con = (HttpURLConnection) url.openConnection();
+            con.setRequestMethod("POST");
+            con.setRequestProperty("Content-Type", "application/json");
+            con.setRequestProperty("Accept", "application/json, text/plain, */*");
+            con.setDoOutput(true);
+            try (OutputStream os = con.getOutputStream()) {
+                byte[] input = taskJSON.getBytes(StandardCharsets.UTF_8);
+                os.write(input, 0, input.length);
+            }
+            try (BufferedReader br = new BufferedReader(new InputStreamReader(con.getInputStream(), StandardCharsets.UTF_8))) {
+                StringBuilder response = new StringBuilder();
+                String responseLine;
+                while ((responseLine = br.readLine()) != null) {
+                    response.append(responseLine.trim());
+                }
+                LOGGER.info("Druid write task has been sent, and the response is {}", response.toString());
+            }
+        }catch (IOException e){
+            e.printStackTrace();
+        }
+    }
+
+    private ParallelIndexSupervisorTask parallelIndexSupervisorTask(ParallelIndexIOConfig ioConfig, ParallelIndexTuningConfig tuningConfig) {
+        return new ParallelIndexSupervisorTask(
+                null,
+                null,
+                null,
+                new ParallelIndexIngestionSpec(
+                        new DataSchema(
+                                this.datasource,
+                                new TimestampSpec(this.timestampColumn, this.timestampFormat, this.timestampMissingValue),
+                                new DimensionsSpec(Collections.emptyList()),
+                                null,
+                                new UniformGranularitySpec(Granularities.HOUR, Granularities.MINUTE, false, null),
+                                null
+                        ),
+                        ioConfig,
+                        tuningConfig
+                ),
+                null
+        );
+    }
+
+    private ParallelIndexIOConfig parallelIndexIOConfig() {
+        List columnss = new ArrayList();
+        CollectionUtils.addAll(columnss,this.getColumns().get(0).split(","));
+        columnss.add(timestampColumn);
+
+        return new ParallelIndexIOConfig(

Review Comment:
   同151行



##########
seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/config/DruidSinkOptions.java:
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.connectors.seatunnel.druid.config;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.joda.time.DateTime;
+
+import java.io.Serializable;
+import java.util.List;
+
+@Data
+@AllArgsConstructor
+public class DruidSinkOptions implements Serializable {
+    private  String coordinatorURL;
+    private  String datasource;
+    private  String timestampColumn;
+    private  String timestampFormat;
+    private  String  timestampMissingValue;
+    private List<String> columns;
+    private  int parallelism;
+
+    private static final String DEFAULT_TIMESTAMP_COLUMN = "timestamp";
+    private static final String DEFAULT_TIMESTAMP_FORMAT = "auto";
+    private static final DateTime DEFAULT_TIMESTAMP_MISSING_VALUE = null;
+    private static final int DEFAULT_PARALLELISM = 1;
+
+    public DruidSinkOptions(Config pluginConfig) {
+        this.coordinatorURL = pluginConfig.getString(DruidSinkConfig.COORDINATOR_URL);
+        this.datasource = pluginConfig.getString(DruidSinkConfig.DATASOURCE);
+        this.columns = pluginConfig.getStringList(DruidSinkConfig.COLUMNS);
+        this.timestampColumn = pluginConfig.hasPath(DruidSinkConfig.TIMESTAMP_COLUMN) ? pluginConfig.getString(DruidSinkConfig.TIMESTAMP_COLUMN) : null;
+        this.timestampFormat = pluginConfig.hasPath(DruidSinkConfig.TIMESTAMP_FORMAT) ? pluginConfig.getString(DruidSinkConfig.TIMESTAMP_FORMAT) : null;

Review Comment:
   是否可以加强Config对象, pluginConfig.getString(DruidSinkConfig.TIMESTAMP_COLUMN,null);
   这样谁用都简单



##########
seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/client/DruidInputFormat.java:
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.connectors.seatunnel.druid.client;
+
+import lombok.Data;
+import org.apache.seatunnel.api.table.type.*;
+import org.apache.seatunnel.connectors.seatunnel.druid.config.DruidSourceOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.seatunnel.connectors.seatunnel.druid.config.DruidTypeMapper;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.sql.*;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.List;
+import java.util.TimeZone;
+
+@Data
+public class DruidInputFormat implements Serializable {
+    private static final Logger LOGGER = LoggerFactory.getLogger(DruidInputFormat.class);
+
+    protected transient Connection connection;
+    protected transient PreparedStatement statement;
+    protected transient ResultSet resultSet;
+    protected SeaTunnelRowType rowTypeInfo;
+    protected DruidSourceOptions druidSourceOptions;
+
+    protected static final String COLUMNS_DEFAULT = "*";
+    protected static final String QUERY_TEMPLATE = "SELECT %s FROM %s WHERE 1=1";
+    protected String quarySQL ;
+    protected boolean hasNext;
+
+    public DruidInputFormat(DruidSourceOptions druidSourceOptions) {
+        this.druidSourceOptions = druidSourceOptions;
+        this.rowTypeInfo = initTableField();
+    }
+
+    public ResultSetMetaData getResultSetMetaData() throws SQLException {
+        try {
+            quarySQL = getSQL();
+            connection = DriverManager.getConnection(druidSourceOptions.getURL());
+            statement = connection.prepareStatement(quarySQL, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+            return statement.getMetaData();
+        } catch (SQLException se) {
+            throw new SQLException("ResultSetMetaData() failed." + se.getMessage(), se);
+        }
+    }
+    public void openInputFormat() {
+        try {
+            connection = DriverManager.getConnection(druidSourceOptions.getURL());
+            statement = connection.prepareStatement(quarySQL, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);

Review Comment:
   是否重复执行了 69,70



##########
seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/client/DruidInputFormat.java:
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.connectors.seatunnel.druid.client;
+
+import lombok.Data;
+import org.apache.seatunnel.api.table.type.*;
+import org.apache.seatunnel.connectors.seatunnel.druid.config.DruidSourceOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.seatunnel.connectors.seatunnel.druid.config.DruidTypeMapper;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.sql.*;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.List;
+import java.util.TimeZone;
+
+@Data
+public class DruidInputFormat implements Serializable {
+    private static final Logger LOGGER = LoggerFactory.getLogger(DruidInputFormat.class);
+
+    protected transient Connection connection;
+    protected transient PreparedStatement statement;
+    protected transient ResultSet resultSet;
+    protected SeaTunnelRowType rowTypeInfo;
+    protected DruidSourceOptions druidSourceOptions;
+
+    protected static final String COLUMNS_DEFAULT = "*";
+    protected static final String QUERY_TEMPLATE = "SELECT %s FROM %s WHERE 1=1";
+    protected String quarySQL ;
+    protected boolean hasNext;
+
+    public DruidInputFormat(DruidSourceOptions druidSourceOptions) {
+        this.druidSourceOptions = druidSourceOptions;
+        this.rowTypeInfo = initTableField();
+    }
+
+    public ResultSetMetaData getResultSetMetaData() throws SQLException {
+        try {
+            quarySQL = getSQL();
+            connection = DriverManager.getConnection(druidSourceOptions.getURL());
+            statement = connection.prepareStatement(quarySQL, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+            return statement.getMetaData();

Review Comment:
   是否可以把 60,62独立一个方法。因为与69,70一样。或则与openInputFormat方法进行合并下



##########
seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/client/DruidInputFormat.java:
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.connectors.seatunnel.druid.client;
+
+import lombok.Data;
+import org.apache.seatunnel.api.table.type.*;
+import org.apache.seatunnel.connectors.seatunnel.druid.config.DruidSourceOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.seatunnel.connectors.seatunnel.druid.config.DruidTypeMapper;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.sql.*;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.List;
+import java.util.TimeZone;
+
+@Data
+public class DruidInputFormat implements Serializable {
+    private static final Logger LOGGER = LoggerFactory.getLogger(DruidInputFormat.class);
+
+    protected transient Connection connection;
+    protected transient PreparedStatement statement;
+    protected transient ResultSet resultSet;
+    protected SeaTunnelRowType rowTypeInfo;
+    protected DruidSourceOptions druidSourceOptions;
+
+    protected static final String COLUMNS_DEFAULT = "*";
+    protected static final String QUERY_TEMPLATE = "SELECT %s FROM %s WHERE 1=1";
+    protected String quarySQL ;
+    protected boolean hasNext;
+
+    public DruidInputFormat(DruidSourceOptions druidSourceOptions) {
+        this.druidSourceOptions = druidSourceOptions;
+        this.rowTypeInfo = initTableField();
+    }
+
+    public ResultSetMetaData getResultSetMetaData() throws SQLException {
+        try {
+            quarySQL = getSQL();
+            connection = DriverManager.getConnection(druidSourceOptions.getURL());
+            statement = connection.prepareStatement(quarySQL, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+            return statement.getMetaData();
+        } catch (SQLException se) {
+            throw new SQLException("ResultSetMetaData() failed." + se.getMessage(), se);
+        }
+    }
+    public void openInputFormat() {
+        try {
+            connection = DriverManager.getConnection(druidSourceOptions.getURL());
+            statement = connection.prepareStatement(quarySQL, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+            resultSet = statement.executeQuery();
+            hasNext = resultSet.next();
+        } catch (SQLException se) {
+            throw new IllegalArgumentException("openInputFormat() failed." + se.getMessage(), se);
+        }
+    }
+
+    private String getSQL() throws SQLException {
+        String columns = COLUMNS_DEFAULT;
+        String startTimestamp = druidSourceOptions.getStartTimestamp();
+        String endTimestamp = druidSourceOptions.getEndTimestamp();
+        String dataSource = druidSourceOptions.getDatasource();
+
+        if (druidSourceOptions.getColumns() != null && druidSourceOptions.getColumns().size() > 0) {
+            columns = String.join(",", druidSourceOptions.getColumns());
+        }
+
+        String sql = String.format(QUERY_TEMPLATE, columns, dataSource);
+        if (startTimestamp != null) {
+            sql += " AND __time >=  '" + startTimestamp + "'";
+        }
+        if (endTimestamp != null) {
+            sql += " AND __time <  '" + endTimestamp + "'";
+        }
+        return sql;
+    }
+
+    public void closeInputFormat() {
+        try {
+            if (resultSet != null) {
+                resultSet.close();
+            }
+            if (statement != null) {
+                statement.close();
+            }
+        } catch (SQLException se) {
+            LOGGER.error("DruidInputFormat Statement couldn't be closed", se);
+        } finally {
+            statement = null;
+            resultSet = null;
+            try {
+                if (connection != null) {
+                    connection.close();
+                }
+            } catch (SQLException se) {
+                LOGGER.error("DruidInputFormat Connection couldn't be closed", se);
+            } finally {
+                connection = null;
+            }
+        }
+    }
+
+
+    /**
+     * Checks whether all data has been read.
+     *
+     * @return boolean value indication whether all data has been read.
+     */
+    public boolean reachedEnd() throws IOException {
+        return !hasNext;
+    }
+
+    /**
+     * Convert a row of data to seatunnelRow
+     */
+    public SeaTunnelRow nextRecord() throws IOException {
+        try {
+            if (!hasNext) {
+                return null;
+            }
+            SeaTunnelRow seaTunnelRow = toInternal(resultSet, rowTypeInfo);
+            // update hasNext after we've read the record
+            hasNext = resultSet.next();
+            return seaTunnelRow;
+        } catch (SQLException se) {
+            throw new IOException("Couldn't read data - " + se.getMessage(), se);
+        } catch (NullPointerException npe) {
+            throw new IOException("Couldn't access resultSet", npe);
+        }
+    }
+
+    public SeaTunnelRow toInternal(ResultSet rs, SeaTunnelRowType rowTypeInfo) throws SQLException{

Review Comment:
   方法不应该定义为私有吗?



##########
seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/config/DruidSinkConfig.java:
##########
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.connectors.seatunnel.druid.config;
+
+import java.io.Serializable;
+
+public class DruidSinkConfig {

Review Comment:
   方法命名是否应该是常量类,而不是配置类。提交里面好多常量,可以收敛。



##########
seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/client/DruidInputFormat.java:
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.connectors.seatunnel.druid.client;
+
+import lombok.Data;
+import org.apache.seatunnel.api.table.type.*;
+import org.apache.seatunnel.connectors.seatunnel.druid.config.DruidSourceOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.seatunnel.connectors.seatunnel.druid.config.DruidTypeMapper;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.sql.*;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.List;
+import java.util.TimeZone;
+
+@Data
+public class DruidInputFormat implements Serializable {
+    private static final Logger LOGGER = LoggerFactory.getLogger(DruidInputFormat.class);
+
+    protected transient Connection connection;
+    protected transient PreparedStatement statement;
+    protected transient ResultSet resultSet;
+    protected SeaTunnelRowType rowTypeInfo;
+    protected DruidSourceOptions druidSourceOptions;
+
+    protected static final String COLUMNS_DEFAULT = "*";
+    protected static final String QUERY_TEMPLATE = "SELECT %s FROM %s WHERE 1=1";
+    protected String quarySQL ;
+    protected boolean hasNext;
+
+    public DruidInputFormat(DruidSourceOptions druidSourceOptions) {
+        this.druidSourceOptions = druidSourceOptions;
+        this.rowTypeInfo = initTableField();
+    }
+
+    public ResultSetMetaData getResultSetMetaData() throws SQLException {
+        try {
+            quarySQL = getSQL();
+            connection = DriverManager.getConnection(druidSourceOptions.getURL());
+            statement = connection.prepareStatement(quarySQL, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+            return statement.getMetaData();
+        } catch (SQLException se) {
+            throw new SQLException("ResultSetMetaData() failed." + se.getMessage(), se);
+        }
+    }
+    public void openInputFormat() {
+        try {
+            connection = DriverManager.getConnection(druidSourceOptions.getURL());
+            statement = connection.prepareStatement(quarySQL, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+            resultSet = statement.executeQuery();
+            hasNext = resultSet.next();
+        } catch (SQLException se) {
+            throw new IllegalArgumentException("openInputFormat() failed." + se.getMessage(), se);
+        }
+    }
+
+    private String getSQL() throws SQLException {
+        String columns = COLUMNS_DEFAULT;
+        String startTimestamp = druidSourceOptions.getStartTimestamp();
+        String endTimestamp = druidSourceOptions.getEndTimestamp();
+        String dataSource = druidSourceOptions.getDatasource();
+
+        if (druidSourceOptions.getColumns() != null && druidSourceOptions.getColumns().size() > 0) {
+            columns = String.join(",", druidSourceOptions.getColumns());
+        }
+
+        String sql = String.format(QUERY_TEMPLATE, columns, dataSource);
+        if (startTimestamp != null) {
+            sql += " AND __time >=  '" + startTimestamp + "'";
+        }
+        if (endTimestamp != null) {
+            sql += " AND __time <  '" + endTimestamp + "'";
+        }
+        return sql;
+    }
+
+    public void closeInputFormat() {
+        try {

Review Comment:
   是否可以使用try-with解决方案,可以提前释放statement,connent



##########
seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/client/DruidInputFormat.java:
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.connectors.seatunnel.druid.client;
+
+import lombok.Data;
+import org.apache.seatunnel.api.table.type.*;
+import org.apache.seatunnel.connectors.seatunnel.druid.config.DruidSourceOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.seatunnel.connectors.seatunnel.druid.config.DruidTypeMapper;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.sql.*;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.List;
+import java.util.TimeZone;
+
+@Data
+public class DruidInputFormat implements Serializable {
+    private static final Logger LOGGER = LoggerFactory.getLogger(DruidInputFormat.class);
+
+    protected transient Connection connection;
+    protected transient PreparedStatement statement;
+    protected transient ResultSet resultSet;
+    protected SeaTunnelRowType rowTypeInfo;
+    protected DruidSourceOptions druidSourceOptions;
+
+    protected static final String COLUMNS_DEFAULT = "*";
+    protected static final String QUERY_TEMPLATE = "SELECT %s FROM %s WHERE 1=1";
+    protected String quarySQL ;
+    protected boolean hasNext;
+
+    public DruidInputFormat(DruidSourceOptions druidSourceOptions) {
+        this.druidSourceOptions = druidSourceOptions;
+        this.rowTypeInfo = initTableField();
+    }
+
+    public ResultSetMetaData getResultSetMetaData() throws SQLException {
+        try {
+            quarySQL = getSQL();
+            connection = DriverManager.getConnection(druidSourceOptions.getURL());
+            statement = connection.prepareStatement(quarySQL, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+            return statement.getMetaData();
+        } catch (SQLException se) {
+            throw new SQLException("ResultSetMetaData() failed." + se.getMessage(), se);
+        }
+    }
+    public void openInputFormat() {
+        try {
+            connection = DriverManager.getConnection(druidSourceOptions.getURL());
+            statement = connection.prepareStatement(quarySQL, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+            resultSet = statement.executeQuery();
+            hasNext = resultSet.next();
+        } catch (SQLException se) {
+            throw new IllegalArgumentException("openInputFormat() failed." + se.getMessage(), se);
+        }
+    }
+
+    private String getSQL() throws SQLException {
+        String columns = COLUMNS_DEFAULT;
+        String startTimestamp = druidSourceOptions.getStartTimestamp();
+        String endTimestamp = druidSourceOptions.getEndTimestamp();
+        String dataSource = druidSourceOptions.getDatasource();
+
+        if (druidSourceOptions.getColumns() != null && druidSourceOptions.getColumns().size() > 0) {
+            columns = String.join(",", druidSourceOptions.getColumns());
+        }
+
+        String sql = String.format(QUERY_TEMPLATE, columns, dataSource);
+        if (startTimestamp != null) {
+            sql += " AND __time >=  '" + startTimestamp + "'";
+        }
+        if (endTimestamp != null) {
+            sql += " AND __time <  '" + endTimestamp + "'";
+        }
+        return sql;
+    }
+
+    public void closeInputFormat() {
+        try {
+            if (resultSet != null) {
+                resultSet.close();
+            }
+            if (statement != null) {
+                statement.close();
+            }
+        } catch (SQLException se) {
+            LOGGER.error("DruidInputFormat Statement couldn't be closed", se);
+        } finally {
+            statement = null;
+            resultSet = null;
+            try {
+                if (connection != null) {
+                    connection.close();
+                }
+            } catch (SQLException se) {
+                LOGGER.error("DruidInputFormat Connection couldn't be closed", se);
+            } finally {
+                connection = null;
+            }
+        }
+    }
+
+
+    /**
+     * Checks whether all data has been read.
+     *
+     * @return boolean value indication whether all data has been read.
+     */
+    public boolean reachedEnd() throws IOException {
+        return !hasNext;
+    }
+
+    /**
+     * Convert a row of data to seatunnelRow
+     */
+    public SeaTunnelRow nextRecord() throws IOException {
+        try {
+            if (!hasNext) {
+                return null;
+            }
+            SeaTunnelRow seaTunnelRow = toInternal(resultSet, rowTypeInfo);
+            // update hasNext after we've read the record
+            hasNext = resultSet.next();
+            return seaTunnelRow;
+        } catch (SQLException se) {
+            throw new IOException("Couldn't read data - " + se.getMessage(), se);
+        } catch (NullPointerException npe) {
+            throw new IOException("Couldn't access resultSet", npe);
+        }
+    }
+
+    public SeaTunnelRow toInternal(ResultSet rs, SeaTunnelRowType rowTypeInfo) throws SQLException{
+        List<Object> fields = new ArrayList<>();
+        SeaTunnelDataType<?>[] seaTunnelDataTypes = rowTypeInfo.getFieldTypes();
+
+        for (int i = 1; i <= seaTunnelDataTypes.length; i++) {
+            Object seatunnelField;
+            SeaTunnelDataType<?> seaTunnelDataType = seaTunnelDataTypes[i - 1];
+            if (null == rs.getObject(i)) {
+                seatunnelField = null;
+            }
+            else if (BasicType.BOOLEAN_TYPE.equals(seaTunnelDataType)) {
+                seatunnelField = rs.getBoolean(i);
+            } else if (BasicType.BYTE_TYPE.equals(seaTunnelDataType)) {
+                seatunnelField = rs.getByte(i);
+            } else if (BasicType.SHORT_TYPE.equals(seaTunnelDataType)) {
+                seatunnelField = rs.getShort(i);
+            } else if (BasicType.INT_TYPE.equals(seaTunnelDataType)) {
+                seatunnelField = rs.getInt(i);
+            } else if (BasicType.LONG_TYPE.equals(seaTunnelDataType)) {
+                seatunnelField = rs.getLong(i);
+            } else if (BasicType.FLOAT_TYPE.equals(seaTunnelDataType)) {
+                seatunnelField = rs.getFloat(i);
+            } else if (BasicType.DOUBLE_TYPE.equals(seaTunnelDataType)) {
+                seatunnelField = rs.getDouble(i);
+            } else if (BasicType.STRING_TYPE.equals(seaTunnelDataType)) {
+                seatunnelField = rs.getString(i);
+            }else if (LocalTimeType.LOCAL_DATE_TIME_TYPE.equals(seaTunnelDataType)) {
+                Timestamp ts = rs.getTimestamp(i, Calendar.getInstance(TimeZone.getTimeZone("UTC")));
+                LocalDateTime localDateTime = LocalDateTime.ofInstant(ts.toInstant(), ZoneId.of("UTC"));  // good
+                seatunnelField = localDateTime;
+            }else if (LocalTimeType.LOCAL_TIME_TYPE.equals(seaTunnelDataType)) {
+                seatunnelField = rs.getDate(i);

Review Comment:
   Date 与 LocalTime 一样?



##########
seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/client/DruidOutputFormat.java:
##########
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.connectors.seatunnel.druid.client;
+
+import com.fasterxml.jackson.datatype.joda.JodaModule;
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.MapperFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import lombok.Data;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.druid.data.input.MaxSizeSplitHintSpec;
+import org.apache.druid.data.input.impl.CsvInputFormat;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.InlineInputSource;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIOConfig;
+import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIngestionSpec;
+import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask;
+import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+@Data
+public class DruidOutputFormat implements Serializable {
+    private static final Logger LOGGER = LoggerFactory.getLogger(DruidOutputFormat.class);
+
+    private static final String DEFAULT_TIMESTAMP_COLUMN = "timestamp";
+    private static final String DEFAULT_TIMESTAMP_FORMAT = "auto";
+    private static final DateTime DEFAULT_TIMESTAMP_MISSING_VALUE = new DateTime();
+    public static final String DEFAULT_LINE_DELIMITER = "\n";
+    public static final String DEFAULT_FIELD_DELIMITER = ",";
+
+    private final transient StringBuffer data;
+    private final String coordinatorURL;
+    private final String datasource;
+    private final String timestampColumn;
+    private final String timestampFormat;
+    private final DateTime timestampMissingValue;
+    private  List<String> columns;
+
+    public DruidOutputFormat(String coordinatorURL,
+                             String datasource,
+                             String timestampColumn,
+                             String timestampFormat,
+                             String timestampMissingValue,
+                             List<String> columns
+                             ) {
+        this.data = new StringBuffer();
+        this.coordinatorURL = coordinatorURL;
+        this.datasource = datasource;
+        this.timestampColumn = timestampColumn == null ? DEFAULT_TIMESTAMP_COLUMN : timestampColumn;
+        this.timestampFormat = timestampFormat == null ? DEFAULT_TIMESTAMP_FORMAT : timestampFormat;
+        this.timestampMissingValue = timestampMissingValue == null ? DEFAULT_TIMESTAMP_MISSING_VALUE : DateTimes.of(timestampMissingValue);
+        this.columns = columns;
+    }
+
+    public void write(SeaTunnelRow element) {
+        int fieldIndex = element.getArity();
+        for (int i = 0; i < fieldIndex; i++) {
+            Object v = element.getField(i);
+            if (i != 0) {
+                this.data.append(DEFAULT_FIELD_DELIMITER);
+            }
+            if (v != null) {
+                this.data.append(v);
+            }
+        }
+        this.data.append(DEFAULT_LINE_DELIMITER);
+    }
+
+    public void closeOutputFormat() {
+        try {
+            ParallelIndexIOConfig ioConfig = parallelIndexIOConfig();
+            ParallelIndexTuningConfig tuningConfig = tuningConfig();
+            ParallelIndexSupervisorTask indexTask = parallelIndexSupervisorTask(ioConfig, tuningConfig);
+            ObjectMapper mapper = new ObjectMapper();
+            mapper.registerModule(new JodaModule());
+            mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+            mapper.configure(MapperFeature.AUTO_DETECT_GETTERS, false);
+            mapper.configure(MapperFeature.AUTO_DETECT_FIELDS, false);
+            mapper.configure(MapperFeature.AUTO_DETECT_IS_GETTERS, false);
+            mapper.configure(MapperFeature.AUTO_DETECT_SETTERS, false);
+            mapper.configure(SerializationFeature.INDENT_OUTPUT, false);
+            mapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
+            String taskJSON = mapper.writeValueAsString(indexTask);
+            JSONObject jsonObject = JSON.parseObject(taskJSON);
+            jsonObject.remove("id");
+            jsonObject.remove("groupId");
+            jsonObject.remove("resource");
+            JSONObject spec = jsonObject.getJSONObject("spec");
+            spec.remove("tuningConfig");
+            jsonObject.put("spec", spec);
+            taskJSON = jsonObject.toJSONString();
+
+            URL url = new URL(this.coordinatorURL + "druid/indexer/v1/task");
+            HttpURLConnection con = (HttpURLConnection) url.openConnection();
+            con.setRequestMethod("POST");
+            con.setRequestProperty("Content-Type", "application/json");
+            con.setRequestProperty("Accept", "application/json, text/plain, */*");
+            con.setDoOutput(true);
+            try (OutputStream os = con.getOutputStream()) {
+                byte[] input = taskJSON.getBytes(StandardCharsets.UTF_8);
+                os.write(input, 0, input.length);
+            }
+            try (BufferedReader br = new BufferedReader(new InputStreamReader(con.getInputStream(), StandardCharsets.UTF_8))) {
+                StringBuilder response = new StringBuilder();
+                String responseLine;
+                while ((responseLine = br.readLine()) != null) {
+                    response.append(responseLine.trim());
+                }
+                LOGGER.info("Druid write task has been sent, and the response is {}", response.toString());
+            }
+        }catch (IOException e){
+            e.printStackTrace();

Review Comment:
   P0,故障。请thorw异常,或则把异常打印出来



##########
seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/config/DruidSourceConfig.java:
##########
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.connectors.seatunnel.druid.config;
+
+import java.io.Serializable;
+
+public class DruidSourceConfig {
+    public static final String URL = "url";

Review Comment:
   是否可以收敛成为常量类



##########
seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/client/DruidOutputFormat.java:
##########
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.connectors.seatunnel.druid.client;
+
+import com.fasterxml.jackson.datatype.joda.JodaModule;
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.MapperFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import lombok.Data;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.druid.data.input.MaxSizeSplitHintSpec;
+import org.apache.druid.data.input.impl.CsvInputFormat;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.InlineInputSource;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIOConfig;
+import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIngestionSpec;
+import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask;
+import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+@Data
+public class DruidOutputFormat implements Serializable {
+    private static final Logger LOGGER = LoggerFactory.getLogger(DruidOutputFormat.class);
+
+    private static final String DEFAULT_TIMESTAMP_COLUMN = "timestamp";
+    private static final String DEFAULT_TIMESTAMP_FORMAT = "auto";
+    private static final DateTime DEFAULT_TIMESTAMP_MISSING_VALUE = new DateTime();
+    public static final String DEFAULT_LINE_DELIMITER = "\n";
+    public static final String DEFAULT_FIELD_DELIMITER = ",";
+
+    private final transient StringBuffer data;
+    private final String coordinatorURL;
+    private final String datasource;
+    private final String timestampColumn;
+    private final String timestampFormat;
+    private final DateTime timestampMissingValue;
+    private  List<String> columns;
+
+    public DruidOutputFormat(String coordinatorURL,
+                             String datasource,
+                             String timestampColumn,
+                             String timestampFormat,
+                             String timestampMissingValue,
+                             List<String> columns
+                             ) {
+        this.data = new StringBuffer();
+        this.coordinatorURL = coordinatorURL;
+        this.datasource = datasource;
+        this.timestampColumn = timestampColumn == null ? DEFAULT_TIMESTAMP_COLUMN : timestampColumn;

Review Comment:
   使用Object.isNull()是否更好



##########
seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/config/DruidSinkOptions.java:
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.connectors.seatunnel.druid.config;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.joda.time.DateTime;
+
+import java.io.Serializable;
+import java.util.List;
+
+@Data
+@AllArgsConstructor
+public class DruidSinkOptions implements Serializable {
+    private  String coordinatorURL;
+    private  String datasource;
+    private  String timestampColumn;
+    private  String timestampFormat;
+    private  String  timestampMissingValue;
+    private List<String> columns;
+    private  int parallelism;
+
+    private static final String DEFAULT_TIMESTAMP_COLUMN = "timestamp";

Review Comment:
   是否可以收敛



##########
seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/config/DruidTypeMapper.java:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.connectors.seatunnel.druid.config;
+
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class DruidTypeMapper {

Review Comment:
   是否可以收敛,同时在api里面定义



##########
seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/config/DruidSourceOptions.java:
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.connectors.seatunnel.druid.config;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Optional;
+
+@Data
+@AllArgsConstructor
+public class DruidSourceOptions implements Serializable {
+    private String URL;

Review Comment:
   是否可以变量小写,github编辑器都提示变量名是类



##########
seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/sink/DruidSink.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.connectors.seatunnel.druid.sink;
+
+import com.google.auto.service.AutoService;
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
+import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+import org.apache.seatunnel.connectors.seatunnel.druid.config.DruidSinkOptions;
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import java.io.IOException;
+
+@AutoService(SeaTunnelSink.class)
+public class DruidSink  extends AbstractSimpleSink<SeaTunnelRow, Void> {
+
+    private Config config;

Review Comment:
   config变量,是否可以放到AbstratSimpleSink里面



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org