You are viewing a plain-text version of this content. The canonical link for it was a hyperlink on the original archive page and is not preserved in this rendering.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/05/26 03:11:33 UTC
[incubator-seatunnel] branch dev updated: [Feature] [clickhouse| ClickhouseFile support ReplicatedMergeTree (#1911)
This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 5057448c [Feature] [clickhouse| ClickhouseFile support ReplicatedMergeTree (#1911)
5057448c is described below
commit 5057448c96293bfe39f472dc89fa4eb1da2190fd
Author: robin <97...@users.noreply.github.com>
AuthorDate: Thu May 26 11:11:28 2022 +0800
[Feature] [clickhouse| ClickhouseFile support ReplicatedMergeTree (#1911)
* ClickhouseFile support ReplicatedMergeTree
* format the dependency
---
.../seatunnel-connector-flink-clickhouse/pom.xml | 6 +++
.../clickhouse/sink/client/ClickhouseClient.java | 25 +++++++++
.../flink/clickhouse/sink/ClickhouseFileTest.java | 60 ++++++++++++++++++++++
.../seatunnel-connector-spark-clickhouse/pom.xml | 7 +++
.../seatunnel/spark/clickhouse/sink/Table.scala | 20 +++++++-
.../spark/clickhouse/sink/ClickhouseFileTest.scala | 44 ++++++++++++++++
6 files changed, 161 insertions(+), 1 deletion(-)
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/pom.xml b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/pom.xml
index bf198afd..44034e57 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/pom.xml
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/pom.xml
@@ -59,6 +59,12 @@
<artifactId>sshd-scp</artifactId>
</dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+
</dependencies>
</project>
\ No newline at end of file
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/client/ClickhouseClient.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/client/ClickhouseClient.java
index 2e6d121d..012e1403 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/client/ClickhouseClient.java
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/client/ClickhouseClient.java
@@ -196,6 +196,15 @@ public class ClickhouseClient {
DistributedEngine distributedEngine = null;
if ("Distributed".equals(engine)) {
distributedEngine = getClickhouseDistributedTable(connection, database, table);
+ String localTableSQL = String.format("select engine,create_table_query from system.tables where database = '%s' and name = '%s'",
+ distributedEngine.getDatabase(), distributedEngine.getTable());
+ ResultSet rs = statement.executeQuery(localTableSQL);
+ if (!rs.next()) {
+ throw new RuntimeException("Cannot get table from clickhouse, resultSet is empty");
+ }
+ String localEngine = rs.getString(1);
+ String createLocalTableDDL = rs.getString(2);
+ createTableDDL = localizationEngine(localEngine, createLocalTableDDL);
}
return new ClickhouseTable(
database,
@@ -212,4 +221,20 @@ public class ClickhouseClient {
}
+ /**
+ * Localization the engine in clickhouse local table's createTableDDL to support specific engine.
+ * For example: change ReplicatedMergeTree to MergeTree.
+ * @param engine original engine of clickhouse local table
+ * @param ddl createTableDDL of clickhouse local table
+ * @return createTableDDL of clickhouse local table which can support specific engine
+ * TODO: support more engine
+ */
+ public String localizationEngine(String engine, String ddl) {
+ if ("ReplicatedMergeTree".equalsIgnoreCase(engine)) {
+ return ddl.replaceAll("ReplicatedMergeTree(\\([^\\)]*\\))", "MergeTree()");
+ } else {
+ return ddl;
+ }
+ }
+
}
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/test/java/org/apache/seatunnel/flink/clickhouse/sink/ClickhouseFileTest.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/test/java/org/apache/seatunnel/flink/clickhouse/sink/ClickhouseFileTest.java
new file mode 100644
index 00000000..56768860
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/test/java/org/apache/seatunnel/flink/clickhouse/sink/ClickhouseFileTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.flink.clickhouse.sink;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.seatunnel.flink.clickhouse.sink.client.ClickhouseClient;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class ClickhouseFileTest {
+
+ @Test
+ public void testLocalizationEngine() {
+ String originalEngine = "ReplicatedMergeTree";
+ String replicatedMergeTreeDDL = "CREATE TABLE default.replicatedMergeTreeTable (`shard_key` Int32, `order_id` String, " +
+ "`user_name` String, `user_id` String, `order_time` DateTime, `bi_dt` String)" +
+ " ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/default/replicatedMergeTreeTable', " +
+ "'{replica}') PARTITION BY bi_dt ORDER BY (order_time, user_id)" +
+ " SETTINGS index_granularity = 8192";
+ String localizationDDL = "CREATE TABLE default.replicatedMergeTreeTable (`shard_key` Int32, `order_id` String, " +
+ "`user_name` String, `user_id` String, `order_time` DateTime, `bi_dt` String)" +
+ " ENGINE = MergeTree() PARTITION BY bi_dt ORDER BY (order_time, user_id)" +
+ " SETTINGS index_granularity = 8192";
+ Config config = getConfig();
+ ClickhouseClient client = new ClickhouseClient(config);
+ assertEquals(localizationDDL, client.localizationEngine(originalEngine, replicatedMergeTreeDDL));
+ }
+
+ public Config getConfig() {
+ Map<String, Object> configMap = new HashMap<>();
+ configMap.put("username", "test");
+ configMap.put("password", "test");
+ configMap.put("host", "localhost:8080");
+ configMap.put("database", "test");
+ return ConfigFactory.parseMap(configMap);
+ }
+
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/pom.xml b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/pom.xml
index d255ab9e..f9560d02 100644
--- a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/pom.xml
+++ b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/pom.xml
@@ -56,6 +56,13 @@
<artifactId>sshd-scp</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest_${scala.binary.version}</artifactId>
+ <version>3.2.3</version>
+ <scope>test</scope>
+ </dependency>
+
</dependencies>
</project>
\ No newline at end of file
diff --git a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/Table.scala b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/Table.scala
index 89f0a4d8..3dcbade4 100644
--- a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/Table.scala
+++ b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/Table.scala
@@ -38,6 +38,7 @@ class Table(val name: String, val database: String, val engine: String, val crea
var tableSchema: util.LinkedHashMap[String, String] = new util.LinkedHashMap[String, String]()
var shardKeyType: String = _
var localCreateTableDDL: String = createTableDDL
+ var localTableEngine: String = _
def initTableInfo(hosts: List[HostAndPort], conn: ClickHouseConnectionImpl): Unit = {
if (shards.size() == 0) {
@@ -51,7 +52,9 @@ class Table(val name: String, val database: String, val engine: String, val crea
weight += elem.shardWeight
}
this.shardWeightCount = weight
- this.localCreateTableDDL = getClickhouseTableInfo(conn, localTable.database, localTable.table)._2.createTableDDL
+ val localTableInfo = getClickhouseTableInfo(conn, localTable.database, localTable.table)._2
+ this.localTableEngine = localTableInfo.engine
+ this.localCreateTableDDL = localizationEngine(this.localTableEngine, localTableInfo.createTableDDL)
} else {
this.shards.put(0, Shard(1, 1, 1, hosts.head.host, hosts.head.host, hosts.head.port, database))
}
@@ -102,4 +105,19 @@ class Table(val name: String, val database: String, val engine: String, val crea
CheckResult.success()
}
}
+
/**
 * Localizes the engine in a clickhouse local table's createTableDDL so that it can be executed
 * without replication, e.g. `ReplicatedMergeTree('/zk/path', '{replica}')` becomes `MergeTree()`.
 *
 * Only the first `ENGINE = ReplicatedMergeTree...` clause is rewritten. Its arguments are scanned
 * with awareness of single-quoted strings and nested parentheses. The two leading string-literal
 * arguments (ZooKeeper path and replica name) are dropped and any remaining arguments are kept,
 * so the legacy form `ReplicatedMergeTree(path, replica, date, (key), 8192)` becomes
 * `MergeTree(date, (key), 8192)`.
 *
 * TODO: support more engines (e.g. other Replicated*MergeTree variants).
 *
 * @param engine original engine of clickhouse local table (compared case-insensitively)
 * @param ddl createTableDDL of clickhouse local table; may be null
 * @return the DDL with a MergeTree engine clause, or `ddl` unchanged when the engine is not
 *         ReplicatedMergeTree, `ddl` is null, or the engine clause cannot be parsed
 */
def localizationEngine(engine: String, ddl: String): String = {
  if (ddl == null || !"ReplicatedMergeTree".equalsIgnoreCase(engine)) {
    ddl
  } else {
    // Group 1 captures the "ENGINE =" prefix so its original spacing is preserved.
    val replicatedEngine = """(?i)(ENGINE\s*=\s*)ReplicatedMergeTree\b""".r
    replicatedEngine.findFirstMatchIn(ddl) match {
      case None => ddl
      case Some(m) => rewriteEngineClause(ddl, m)
    }
  }
}

/** Replaces the matched `ReplicatedMergeTree[(args)]` clause with its MergeTree equivalent. */
private def rewriteEngineClause(ddl: String, m: scala.util.matching.Regex.Match): String = {
  var cursor = m.end
  while (cursor < ddl.length && Character.isWhitespace(ddl.charAt(cursor))) cursor += 1
  if (cursor < ddl.length && ddl.charAt(cursor) == '(') {
    splitEngineArguments(ddl, cursor) match {
      // Unterminated argument list: returning the input is safer than emitting a broken statement.
      case None => ddl
      case Some((args, closeIndex)) =>
        // Replication args are the two leading string literals; when omitted, keep everything.
        val kept =
          if (args.size >= 2 && isStringLiteral(args.head) && isStringLiteral(args(1))) args.drop(2)
          else args
        ddl.substring(0, m.start) + m.group(1) + kept.mkString("MergeTree(", ", ", ")") +
          ddl.substring(closeIndex + 1)
    }
  } else {
    ddl.substring(0, m.start) + m.group(1) + "MergeTree()" + ddl.substring(m.end)
  }
}

/**
 * Splits the top-level, comma-separated arguments of the parenthesised list opening at
 * `openIndex`, honouring single-quoted strings (with backslash escapes) and nested parentheses.
 *
 * @return the trimmed arguments and the index of the matching `)`, or None if unterminated
 */
private def splitEngineArguments(ddl: String, openIndex: Int): Option[(List[String], Int)] = {
  val args = scala.collection.mutable.ListBuffer[String]()
  var depth = 0
  var inQuote = false
  var argStart = openIndex + 1
  var i = openIndex
  while (i < ddl.length) {
    val c = ddl.charAt(i)
    if (inQuote) {
      if (c == '\\') i += 1 // skip the escaped character
      else if (c == '\'') inQuote = false
    } else {
      c match {
        case '\'' => inQuote = true
        case '(' => depth += 1
        case ')' =>
          depth -= 1
          if (depth == 0) {
            val last = ddl.substring(argStart, i).trim
            if (last.nonEmpty || args.nonEmpty) args += last
            return Some((args.toList, i))
          }
        case ',' if depth == 1 =>
          args += ddl.substring(argStart, i).trim
          argStart = i + 1
        case _ =>
      }
    }
    i += 1
  }
  None
}

/** True when `arg` is a single-quoted SQL string literal. */
private def isStringLiteral(arg: String): Boolean =
  arg.length >= 2 && arg.head == '\'' && arg.last == '\''
+
}
diff --git a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/test/scala/org/apache/seatunnel/spark/clickhouse/sink/ClickhouseFileTest.scala b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/test/scala/org/apache/seatunnel/spark/clickhouse/sink/ClickhouseFileTest.scala
new file mode 100644
index 00000000..8037dc57
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/test/scala/org/apache/seatunnel/spark/clickhouse/sink/ClickhouseFileTest.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.spark.clickhouse.sink
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory
+
+import org.scalatest.funsuite.AnyFunSuite
+
class ClickhouseFileTest extends AnyFunSuite {

  /** DDL of a replicated local table, shaped like system.tables.create_table_query output. */
  private val replicatedMergeTreeDDL: String =
    "CREATE TABLE default.replicatedMergeTreeTable (`shard_key` Int32, `order_id` String, " +
      "`user_name` String, `user_id` String, `order_time` DateTime, `bi_dt` String)" +
      " ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/default/replicatedMergeTreeTable', " +
      "'{replica}') PARTITION BY bi_dt ORDER BY (order_time, user_id)" +
      " SETTINGS index_granularity = 8192"

  /** Expected result of localizing `replicatedMergeTreeDDL`. */
  private val localizationDDL: String =
    "CREATE TABLE default.replicatedMergeTreeTable (`shard_key` Int32, `order_id` String, " +
      "`user_name` String, `user_id` String, `order_time` DateTime, `bi_dt` String)" +
      " ENGINE = MergeTree() PARTITION BY bi_dt ORDER BY (order_time, user_id)" +
      " SETTINGS index_granularity = 8192"

  /** Builds a Table fixture; localizationEngine itself does not read these fields. */
  private def newTable(engine: String, ddl: String): Table =
    new Table(name = "test", database = "test",
      engine = engine, createTableDDL = ddl,
      engineFull = engine, dataPaths = List[String]())

  test("test for localizationEngine") {
    val originalEngine = "ReplicatedMergeTree"
    val table = newTable(originalEngine, replicatedMergeTreeDDL)
    // `==` inside ScalaTest's assert macro reports both sides on failure.
    assert(table.localizationEngine(originalEngine, replicatedMergeTreeDDL) == localizationDDL)
  }

  test("localizationEngine keeps non-replicated engines unchanged") {
    val table = newTable("MergeTree", localizationDDL)
    assert(table.localizationEngine("MergeTree", localizationDDL) == localizationDDL)
  }

}