You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/05/26 03:11:33 UTC

[incubator-seatunnel] branch dev updated: [Feature] [clickhouse| ClickhouseFile support ReplicatedMergeTree (#1911)

This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 5057448c [Feature] [clickhouse| ClickhouseFile support ReplicatedMergeTree (#1911)
5057448c is described below

commit 5057448c96293bfe39f472dc89fa4eb1da2190fd
Author: robin <97...@users.noreply.github.com>
AuthorDate: Thu May 26 11:11:28 2022 +0800

    [Feature] [clickhouse| ClickhouseFile support ReplicatedMergeTree (#1911)
    
    * ClickhouseFile support ReplicatedMergeTree
    
    * format the dependency
---
 .../seatunnel-connector-flink-clickhouse/pom.xml   |  6 +++
 .../clickhouse/sink/client/ClickhouseClient.java   | 25 +++++++++
 .../flink/clickhouse/sink/ClickhouseFileTest.java  | 60 ++++++++++++++++++++++
 .../seatunnel-connector-spark-clickhouse/pom.xml   |  7 +++
 .../seatunnel/spark/clickhouse/sink/Table.scala    | 20 +++++++-
 .../spark/clickhouse/sink/ClickhouseFileTest.scala | 44 ++++++++++++++++
 6 files changed, 161 insertions(+), 1 deletion(-)

diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/pom.xml b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/pom.xml
index bf198afd..44034e57 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/pom.xml
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/pom.xml
@@ -59,6 +59,12 @@
             <artifactId>sshd-scp</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+
     </dependencies>
 
 </project>
\ No newline at end of file
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/client/ClickhouseClient.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/client/ClickhouseClient.java
index 2e6d121d..012e1403 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/client/ClickhouseClient.java
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/client/ClickhouseClient.java
@@ -196,6 +196,15 @@ public class ClickhouseClient {
             DistributedEngine distributedEngine = null;
             if ("Distributed".equals(engine)) {
                 distributedEngine = getClickhouseDistributedTable(connection, database, table);
+                String localTableSQL = String.format("select engine,create_table_query from system.tables where database = '%s' and name = '%s'",
+                        distributedEngine.getDatabase(), distributedEngine.getTable());
+                ResultSet rs = statement.executeQuery(localTableSQL);
+                if (!rs.next()) {
+                    throw new RuntimeException("Cannot get table from clickhouse, resultSet is empty");
+                }
+                String localEngine = rs.getString(1);
+                String createLocalTableDDL = rs.getString(2);
+                createTableDDL = localizationEngine(localEngine, createLocalTableDDL);
             }
             return new ClickhouseTable(
                 database,
@@ -212,4 +221,20 @@ public class ClickhouseClient {
 
     }
 
+    private static final java.util.regex.Pattern REPLICATED_MERGE_TREE_ENGINE =
+            java.util.regex.Pattern.compile("ENGINE\\s*=\\s*ReplicatedMergeTree\\([^)]*\\)");
+    /**
+     * Localizes a ClickHouse local table's createTableDDL by rewriting only its {@code ENGINE = ReplicatedMergeTree(...)}
+     * clause to {@code ENGINE = MergeTree()}; DDL of any other engine is returned unchanged. TODO: support more engines.
+     * @param engine original engine of the ClickHouse local table (compared case-insensitively)
+     * @param ddl createTableDDL of the ClickHouse local table
+     * @return the DDL with the ENGINE clause localized, or {@code ddl} itself when no rewrite applies
+     */
+    public String localizationEngine(String engine, String ddl) {
+        if (!"ReplicatedMergeTree".equalsIgnoreCase(engine)) {
+            return ddl;
+        }
+        return REPLICATED_MERGE_TREE_ENGINE.matcher(ddl).replaceFirst("ENGINE = MergeTree()");
+    }
+
 }
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/test/java/org/apache/seatunnel/flink/clickhouse/sink/ClickhouseFileTest.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/test/java/org/apache/seatunnel/flink/clickhouse/sink/ClickhouseFileTest.java
new file mode 100644
index 00000000..56768860
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/test/java/org/apache/seatunnel/flink/clickhouse/sink/ClickhouseFileTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.flink.clickhouse.sink;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.seatunnel.flink.clickhouse.sink.client.ClickhouseClient;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class ClickhouseFileTest {
+
+    @Test
+    public void testLocalizationEngine() {
+        final String replicatedDdl = "CREATE TABLE default.replicatedMergeTreeTable (`shard_key` Int32, `order_id` String, "
+                + "`user_name` String, `user_id` String, `order_time` DateTime, `bi_dt` String)"
+                + " ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/default/replicatedMergeTreeTable', "
+                + "'{replica}') PARTITION BY bi_dt ORDER BY (order_time, user_id)"
+                + " SETTINGS index_granularity = 8192";
+        final String expectedDdl = "CREATE TABLE default.replicatedMergeTreeTable (`shard_key` Int32, `order_id` String, "
+                + "`user_name` String, `user_id` String, `order_time` DateTime, `bi_dt` String)"
+                + " ENGINE = MergeTree() PARTITION BY bi_dt ORDER BY (order_time, user_id)"
+                + " SETTINGS index_granularity = 8192";
+        final ClickhouseClient client = new ClickhouseClient(getConfig());
+        final String actualDdl = client.localizationEngine("ReplicatedMergeTree", replicatedDdl);
+        assertEquals(expectedDdl, actualDdl);
+    }
+
+    /** Builds a minimal client configuration (placeholder username, password, host and database). */
+    public Config getConfig() {
+        Map<String, Object> settings = new HashMap<>();
+        settings.put("username", "test");
+        settings.put("password", "test");
+        settings.put("host", "localhost:8080");
+        settings.put("database", "test");
+        return ConfigFactory.parseMap(settings);
+    }
+
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/pom.xml b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/pom.xml
index d255ab9e..f9560d02 100644
--- a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/pom.xml
+++ b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/pom.xml
@@ -56,6 +56,13 @@
             <artifactId>sshd-scp</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>org.scalatest</groupId>
+            <artifactId>scalatest_${scala.binary.version}</artifactId>
+            <version>3.2.3</version>
+            <scope>test</scope>
+        </dependency>
+
     </dependencies>
 
 </project>
\ No newline at end of file
diff --git a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/Table.scala b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/Table.scala
index 89f0a4d8..3dcbade4 100644
--- a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/Table.scala
+++ b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/Table.scala
@@ -38,6 +38,7 @@ class Table(val name: String, val database: String, val engine: String, val crea
   var tableSchema: util.LinkedHashMap[String, String] = new util.LinkedHashMap[String, String]()
   var shardKeyType: String = _
   var localCreateTableDDL: String = createTableDDL
+  var localTableEngine: String = _
 
   def initTableInfo(hosts: List[HostAndPort], conn: ClickHouseConnectionImpl): Unit = {
     if (shards.size() == 0) {
@@ -51,7 +52,9 @@ class Table(val name: String, val database: String, val engine: String, val crea
           weight += elem.shardWeight
         }
         this.shardWeightCount = weight
-        this.localCreateTableDDL = getClickhouseTableInfo(conn, localTable.database, localTable.table)._2.createTableDDL
+        val localTableInfo = getClickhouseTableInfo(conn, localTable.database, localTable.table)._2
+        this.localTableEngine = localTableInfo.engine
+        this.localCreateTableDDL = localizationEngine(this.localTableEngine, localTableInfo.createTableDDL)
       } else {
         this.shards.put(0, Shard(1, 1, 1, hosts.head.host, hosts.head.host, hosts.head.port, database))
       }
@@ -102,4 +105,19 @@ class Table(val name: String, val database: String, val engine: String, val crea
       CheckResult.success()
     }
   }
+
+  /**
+   * Localizes a ClickHouse local table's createTableDDL, e.g. ReplicatedMergeTree becomes MergeTree.
+   * Only the `ENGINE = ReplicatedMergeTree(...)` clause is rewritten; other text in the DDL is left as-is.
+   * @param engine original engine of the ClickHouse local table (compared case-insensitively)
+   * @param ddl createTableDDL of the ClickHouse local table
+   * @return the DDL with `ENGINE = MergeTree()`, or `ddl` unchanged for any other engine
+   * TODO: support more engines
+   */
+  def localizationEngine(engine: String, ddl: String): String = {
+    if ("ReplicatedMergeTree".equalsIgnoreCase(engine)) {
+      """ENGINE\s*=\s*ReplicatedMergeTree\([^)]*\)""".r.replaceFirstIn(ddl, "ENGINE = MergeTree()")
+    } else ddl
+  }
+
 }
diff --git a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/test/scala/org/apache/seatunnel/spark/clickhouse/sink/ClickhouseFileTest.scala b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/test/scala/org/apache/seatunnel/spark/clickhouse/sink/ClickhouseFileTest.scala
new file mode 100644
index 00000000..8037dc57
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/test/scala/org/apache/seatunnel/spark/clickhouse/sink/ClickhouseFileTest.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.spark.clickhouse.sink
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory
+
+import org.scalatest.funsuite.AnyFunSuite
+
+class ClickhouseFileTest extends AnyFunSuite {
+
+  test("test for localizationEngine") {
+    val originalEngine: String = "ReplicatedMergeTree"
+    val replicatedMergeTreeDDL: String = "CREATE TABLE default.replicatedMergeTreeTable (`shard_key` Int32, `order_id` String, " +
+      "`user_name` String, `user_id` String, `order_time` DateTime, `bi_dt` String)" +
+      " ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/default/replicatedMergeTreeTable', " +
+      "'{replica}') PARTITION BY bi_dt ORDER BY (order_time, user_id)" +
+      " SETTINGS index_granularity = 8192"
+    val localizationDDL: String = "CREATE TABLE default.replicatedMergeTreeTable (`shard_key` Int32, `order_id` String, " +
+      "`user_name` String, `user_id` String, `order_time` DateTime, `bi_dt` String)" +
+      " ENGINE = MergeTree() PARTITION BY bi_dt ORDER BY (order_time, user_id)" +
+      " SETTINGS index_granularity = 8192"
+    val table: Table = new Table(name = "test", database = "test", engine = originalEngine,
+      createTableDDL = replicatedMergeTreeDDL, engineFull = "replicatedMergeTree", dataPaths = List[String]())
+    // `==` (rather than `.equals`) lets ScalaTest's assert macro print both sides when the check fails
+    assert(table.localizationEngine(originalEngine, replicatedMergeTreeDDL) == localizationDDL)
+  }
+
+}