You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by "TaoZex (via GitHub)" <gi...@apache.org> on 2024/02/10 16:17:31 UTC

[PR] [Connector]Add druid sink connector [seatunnel]

TaoZex opened a new pull request, #6346:
URL: https://github.com/apache/seatunnel/pull/6346

   <!--
   
   Thank you for contributing to SeaTunnel! Please make sure that your code changes
   are covered with tests. And in case of new features or big changes
   remember to adjust the documentation.
   
   Feel free to ping committers for the review!
   
   ## Contribution Checklist
   
     - Make sure that the pull request corresponds to a [GITHUB issue](https://github.com/apache/seatunnel/issues).
   
     - Name the pull request in the form "[Feature] [component] Title of the pull request", where *Feature* can be replaced by `Hotfix`, `Bug`, etc.
   
     - Minor fixes should be named following this pattern: `[hotfix] [docs] Fix typo in README.md doc`.
   
   -->
   
   ### Purpose of this pull request
   
   <!-- Describe the purpose of this pull request. For example: This pull request adds checkstyle plugin.-->
   
   
   ### Does this PR introduce _any_ user-facing change?
   <!--
   Note that it means *any* user-facing change including all aspects such as the documentation fix.
   If yes, please clarify the previous behavior and the change this PR proposes - provide the console output, description and/or an example to show the behavior difference if possible.
   If possible, please also clarify if this is a user-facing change compared to the released SeaTunnel versions or within the unreleased branches such as dev.
   If no, write 'No'.
   If you are adding/modifying connector documents, please follow our new specifications: https://github.com/apache/seatunnel/issues/4544.
   -->
   
   
   ### How was this patch tested?
   <!--
   If tests were added, say they were added here. Please make sure to add some test cases that check the changes thoroughly including negative and positive cases if possible.
   If it was tested in a way different from regular unit tests, please clarify how you tested step by step, ideally copy and paste-able, so that other reviewers can test and check, and descendants can verify in the future.
   If tests were not added, please describe why they were not added and/or why it was difficult to add.
   If you are adding E2E test cases, maybe refer to https://github.com/apache/seatunnel/blob/dev/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql.conf, here is a good example.
   -->
   
   
   ### Check list
   
   * [ ] If any new Jar binary package adding in your PR, please add License Notice according
     [New License Guide](https://github.com/apache/seatunnel/blob/dev/docs/en/contribution/new-license.md)
   * [ ] If necessary, please update the documentation to describe the new feature. https://github.com/apache/seatunnel/tree/dev/docs
   * [ ] If you are contributing the connector code, please check that the following files are updated:
     1. Update change log that in connector document. For more details you can refer to [connector-v2](https://github.com/apache/seatunnel/tree/dev/docs/en/connector-v2)
     2. Update [plugin-mapping.properties](https://github.com/apache/seatunnel/blob/dev/plugin-mapping.properties) and add new connector information in it
     3. Update the pom file of [seatunnel-dist](https://github.com/apache/seatunnel/blob/dev/seatunnel-dist/pom.xml)
   * [ ] Update the [`release-note`](https://github.com/apache/seatunnel/blob/dev/release-note.md).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [Connector]Add druid sink connector [seatunnel]

Posted by "Hisoka-X (via GitHub)" <gi...@apache.org>.
Hisoka-X commented on code in PR #6346:
URL: https://github.com/apache/seatunnel/pull/6346#discussion_r1560466033


##########
seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/sink/DruidSink.java:
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.druid.sink;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.connectors.druid.exception.DruidConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
+import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+
+import com.google.auto.service.AutoService;
+
+import java.io.IOException;
+
+import static org.apache.seatunnel.connectors.druid.config.DruidConfig.BATCH_SIZE;
+import static org.apache.seatunnel.connectors.druid.config.DruidConfig.BATCH_SIZE_DEFAULT;
+import static org.apache.seatunnel.connectors.druid.config.DruidConfig.COORDINATOR_URL;
+import static org.apache.seatunnel.connectors.druid.config.DruidConfig.DATASOURCE;
+
+@AutoService(SeaTunnelSink.class)

Review Comment:
   Please use TableSinkFactory to create `DruidSink` instand of SPI. Refer https://github.com/apache/seatunnel/pull/5901/files#diff-a2ab7919885ab0faaa78aa34aba3221b2e3a66ea51e82bc8a77ad76a9ac9396bR38



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [Connector]Add druid sink connector [seatunnel]

Posted by "TaoZex (via GitHub)" <gi...@apache.org>.
TaoZex commented on PR #6346:
URL: https://github.com/apache/seatunnel/pull/6346#issuecomment-2048811075

   PTAL @Hisoka-X @EricJoy2048 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [Connector]Add druid sink connector [seatunnel]

Posted by "TaoZex (via GitHub)" <gi...@apache.org>.
TaoZex commented on code in PR #6346:
URL: https://github.com/apache/seatunnel/pull/6346#discussion_r1560350408


##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-druid-e2e/src/test/java/org/apache/seatunnel/e2e/connector/druid/DruidIT.java:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.druid;
+
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.EngineType;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.DockerComposeContainer;
+import org.testcontainers.containers.wait.strategy.Wait;
+
+import java.io.File;
+import java.time.Duration;
+
+public class DruidIT extends TestSuiteBase implements TestResource {
+
+    private static final String DRUID_SERVICE_NAME = "router";
+    private static final int DRUID_SERVICE_PORT = 8888;
+    private DockerComposeContainer environment;
+
+    @BeforeAll
+    @Override
+    public void startUp() throws Exception {
+        environment =
+                new DockerComposeContainer(new File("src/test/resources/docker-compose.yml"))
+                        .withExposedService(
+                                DRUID_SERVICE_NAME,
+                                DRUID_SERVICE_PORT,
+                                Wait.forListeningPort()
+                                        .withStartupTimeout(Duration.ofSeconds(180)));
+        environment.start();
+    }
+
+    @AfterAll
+    @Override
+    public void tearDown() throws Exception {
+        environment.close();
+    }
+
+    @TestTemplate
+    @DisabledOnContainer(

Review Comment:
   Thanks. I've fixed it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [Connector]Add druid sink connector [seatunnel]

Posted by "TaoZex (via GitHub)" <gi...@apache.org>.
TaoZex commented on code in PR #6346:
URL: https://github.com/apache/seatunnel/pull/6346#discussion_r1560980964


##########
seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/sink/DruidSink.java:
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.druid.sink;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.connectors.druid.exception.DruidConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
+import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+
+import com.google.auto.service.AutoService;
+
+import java.io.IOException;
+
+import static org.apache.seatunnel.connectors.druid.config.DruidConfig.BATCH_SIZE;
+import static org.apache.seatunnel.connectors.druid.config.DruidConfig.BATCH_SIZE_DEFAULT;
+import static org.apache.seatunnel.connectors.druid.config.DruidConfig.COORDINATOR_URL;
+import static org.apache.seatunnel.connectors.druid.config.DruidConfig.DATASOURCE;
+
+@AutoService(SeaTunnelSink.class)

Review Comment:
   Thanks for the advice. I've already fixed it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [Connector]Add druid sink connector [seatunnel]

Posted by "hailin0 (via GitHub)" <gi...@apache.org>.
hailin0 commented on code in PR #6346:
URL: https://github.com/apache/seatunnel/pull/6346#discussion_r1570255083


##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-druid-e2e/src/test/java/org/apache/seatunnel/e2e/connector/druid/DruidIT.java:
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.druid;
+
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.DockerComposeContainer;
+import org.testcontainers.containers.wait.strategy.Wait;
+
+import java.io.File;
+import java.time.Duration;
+
+public class DruidIT extends TestSuiteBase implements TestResource {
+
+    private static final String DRUID_SERVICE_NAME = "router";
+    private static final int DRUID_SERVICE_PORT = 8888;
+    private DockerComposeContainer environment;
+
+    @BeforeAll
+    @Override
+    public void startUp() throws Exception {
+        environment =
+                new DockerComposeContainer(new File("src/test/resources/docker-compose.yml"))
+                        .withExposedService(
+                                DRUID_SERVICE_NAME,
+                                DRUID_SERVICE_PORT,
+                                Wait.forListeningPort()
+                                        .withStartupTimeout(Duration.ofSeconds(180)));
+        environment.start();
+    }
+
+    @AfterAll
+    @Override
+    public void tearDown() throws Exception {
+        environment.close();
+    }
+
+    @TestTemplate
+    public void testDruidSink(TestContainer container) throws Exception {
+        Container.ExecResult execResult = container.executeJob("/fakesource_to_druid.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());

Review Comment:
   Please check druid data



##########
docs/en/connector-v2/sink/Druid.md:
##########
@@ -0,0 +1,52 @@
+# Druid
+
+> Druid sink connector
+
+## Description
+
+Write data to Druid
+
+## Key features
+
+- [ ] [exactly-once](../../concept/connector-v2-features.md)
+
+## Options
+
+|      name      |  type  | required | default value |
+|----------------|--------|----------|---------------|
+| coordinatorUrl | string | yes      | -             |
+| datasource     | string | yes      | -             |
+| batchSize      | int    | no       | 1024          |
+| common-options |        | no       | -             |
+
+### coordinatorUrl [string]

Review Comment:
   Please add datatype mapping



##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-druid-e2e/src/test/resources/fakesource_to_druid.conf:
##########
@@ -0,0 +1,44 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    result_table_name = "fake"
+    row.num = 1000
+    schema = {
+      fields {
+        name = "string"
+        age = "int"

Review Comment:
   Please test all datatypes



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [Connector]Add druid sink connector [seatunnel]

Posted by "hailin0 (via GitHub)" <gi...@apache.org>.
hailin0 commented on PR #6346:
URL: https://github.com/apache/seatunnel/pull/6346#issuecomment-2063321472

   @TaoZex 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org