You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/06/29 03:54:00 UTC
[incubator-seatunnel] branch dev updated: Support Rsync to transfer clickhouse data file (#2074)
This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 858c5a853 Support Rsync to transfer clickhouse data file (#2074)
858c5a853 is described below
commit 858c5a85366f86abd477732ce59e329021b1c0b2
Author: wangtao <em...@foxmail.com>
AuthorDate: Wed Jun 29 11:53:54 2022 +0800
Support Rsync to transfer clickhouse data file (#2074)
* Support Rsync to transfer clickhouse data file
* replace rsync4j with built-in package scala.sys.process
* resolve scalastyle problem
---
.../spark/clickhouse/sink/ClickhouseFile.scala | 31 ++++++-----
.../sink/filetransfer/RsyncFileTransfer.scala | 65 ++++++++++++++++++++--
2 files changed, 77 insertions(+), 19 deletions(-)
diff --git a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/ClickhouseFile.scala b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/ClickhouseFile.scala
index 095b318a1..c7f37bffc 100644
--- a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/ClickhouseFile.scala
+++ b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/ClickhouseFile.scala
@@ -29,8 +29,7 @@ import org.apache.seatunnel.spark.batch.SparkBatchSink
import org.apache.seatunnel.spark.clickhouse.Config.{CLICKHOUSE_LOCAL_PATH, COPY_METHOD, DATABASE, FIELDS, HOST, NODE_ADDRESS, NODE_FREE_PASSWORD, NODE_PASS, PASSWORD, SHARDING_KEY, TABLE, TMP_BATCH_CACHE_LINE, USERNAME}
import org.apache.seatunnel.spark.clickhouse.sink.Clickhouse._
import org.apache.seatunnel.spark.clickhouse.sink.ClickhouseFile.{CLICKHOUSE_FILE_PREFIX, LOGGER, UUID_LENGTH, getClickhouseTableInfo}
-import org.apache.seatunnel.spark.clickhouse.sink.Table
-import org.apache.seatunnel.spark.clickhouse.sink.filetransfer.{FileTransfer, ScpFileTransfer}
+import org.apache.seatunnel.spark.clickhouse.sink.filetransfer.{FileTransfer, RsyncFileTransfer, ScpFileTransfer}
import org.apache.seatunnel.spark.clickhouse.sink.filetransfer.TransferMethod.{RSYNC, SCP, TransferMethod, getCopyMethod}
import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
import org.apache.spark.sql.{Dataset, Encoders, Row}
@@ -165,21 +164,25 @@ class ClickhouseFile extends SparkBatchSink {
private def moveFileToServer(shard: Shard, paths: List[String]): Unit = {
var fileTransfer: FileTransfer = null
- if (this.copyFileMethod == SCP) {
- var scpFileTransfer: ScpFileTransfer = null
- if (nodePass.contains(shard.hostAddress)) {
- scpFileTransfer = new ScpFileTransfer(shard.hostAddress, nodePass(shard.hostAddress))
- } else {
- scpFileTransfer = new ScpFileTransfer(shard.hostAddress)
+ this.copyFileMethod match {
+ case SCP => {
+ if (freePass || !nodePass.contains(shard.hostAddress)) {
+ fileTransfer = new ScpFileTransfer(shard.hostAddress)
+ } else {
+ fileTransfer = new ScpFileTransfer(shard.hostAddress, nodePass(shard.hostAddress))
+ }
}
- scpFileTransfer.init()
- fileTransfer = scpFileTransfer
- } else if (this.copyFileMethod == RSYNC) {
- throw new UnsupportedOperationException(s"not support copy file method: '$copyFileMethod' yet")
- } else {
- throw new UnsupportedOperationException(s"unknown copy file method: '$copyFileMethod', please use " +
+ case RSYNC => {
+ if (freePass || !nodePass.contains(shard.hostAddress)) {
+ fileTransfer = new RsyncFileTransfer(shard.hostAddress)
+ } else {
+ fileTransfer = new RsyncFileTransfer(shard.hostAddress, nodePass(shard.hostAddress))
+ }
+ }
+ case _ => throw new UnsupportedOperationException(s"unknown copy file method: '$copyFileMethod', please use " +
s"scp/rsync instead")
}
+ fileTransfer.init()
fileTransfer.transferAndChown(paths, s"${this.table.getLocalDataPath(shard).head}detached/")
fileTransfer.close()
diff --git a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/filetransfer/RsyncFileTransfer.scala b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/filetransfer/RsyncFileTransfer.scala
index 57fad6bea..1618683e8 100644
--- a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/filetransfer/RsyncFileTransfer.scala
+++ b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/filetransfer/RsyncFileTransfer.scala
@@ -17,21 +17,76 @@
package org.apache.seatunnel.spark.clickhouse.sink.filetransfer
-object RsyncFileTransfer extends FileTransfer {
+import org.apache.sshd.client.SshClient
+import org.apache.sshd.client.session.ClientSession
+import org.slf4j.LoggerFactory
+
+import scala.collection.mutable
+import scala.sys.process.Process
+
+class RsyncFileTransfer(host: String) extends FileTransfer {
+
+ private val LOGGER = LoggerFactory.getLogger(classOf[RsyncFileTransfer])
+ var password: String = _
+
+ def this(host: String, password: String) {
+ this(host)
+ this.password = password
+ }
+
+ private var session: ClientSession = _
+ private var client: SshClient = _
override def transferAndChown(sourcePath: String, targetPath: String): Unit = {
- throw new UnsupportedOperationException("not support rsync file transfer yet")
+
+ try {
+ // we use sshpass to support non-interactive password authentication
+ val sshParameter = if (password!=null) s"sshpass -p $password ssh -o StrictHostKeyChecking=no -p 22" else "ssh -o StrictHostKeyChecking=no -p 22"
+ val exec = mutable.ListBuffer[String]()
+ exec.append("rsync")
+ // recursive
+ exec.append("-r")
+ // compress files during transfer with -z
+ exec.append("-z")
+ // use ssh protocol with -e
+ exec.append("-e")
+ exec.append(sshParameter)
+ exec.append(sourcePath)
+ exec.append(s"root@$host:$targetPath")
+ val command = Process(exec)
+ LOGGER.info(command.lineStream.mkString("\n"))
+ // run a remote command to change the file owner; the ATTACH command only works
+ // when the file owner matches the server's clickhouse user.
+ session.executeRemoteCommand("ls -l " + targetPath.substring(0, targetPath.stripSuffix("/").lastIndexOf("/")) +
+ "/ | tail -n 1 | awk '{print $3}' | xargs -t -i chown -R {}:{} " + targetPath)
+ } catch {
+ case e: Exception =>
+ // this always reports an error because xargs returns the shell command's result
+ }
}
override def init(): Unit = {
- throw new UnsupportedOperationException("not support rsync file transfer yet")
+ client = SshClient.setUpDefaultClient()
+ client.start()
+ session = client.connect("root", this.host, 22).verify().getSession
+ if (password != null) {
+ session.addPasswordIdentity(this.password)
+ }
+ val isSuccess = session.auth.verify.isSuccess
+ if (!isSuccess) {
+ throw new IllegalArgumentException(s"ssh host '$host' verify failed, please check your config")
+ }
}
override def transferAndChown(sourcePath: List[String], targetPath: String): Unit = {
- throw new UnsupportedOperationException("not support rsync file transfer yet")
+ sourcePath.foreach(s => {
+ transferAndChown(s, targetPath)
+ })
}
override def close(): Unit = {
- throw new UnsupportedOperationException("not support rsync file transfer yet")
+ if (session != null && session.isOpen) {
+ session.close()
+ }
}
}