You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/06/29 03:54:00 UTC

[incubator-seatunnel] branch dev updated: Support Rsync to transfer clickhouse data file (#2074)

This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 858c5a853 Support Rsync to transfer clickhouse data file (#2074)
858c5a853 is described below

commit 858c5a85366f86abd477732ce59e329021b1c0b2
Author: wangtao <em...@foxmail.com>
AuthorDate: Wed Jun 29 11:53:54 2022 +0800

    Support Rsync to transfer clickhouse data file (#2074)
    
    * Support Rsync to transfer clickhouse data file
    
    * replace rsync4j with built-in package scala.sys.process
    
    * resolve scalastyle problem
---
 .../spark/clickhouse/sink/ClickhouseFile.scala     | 31 ++++++-----
 .../sink/filetransfer/RsyncFileTransfer.scala      | 65 ++++++++++++++++++++--
 2 files changed, 77 insertions(+), 19 deletions(-)

diff --git a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/ClickhouseFile.scala b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/ClickhouseFile.scala
index 095b318a1..c7f37bffc 100644
--- a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/ClickhouseFile.scala
+++ b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/ClickhouseFile.scala
@@ -29,8 +29,7 @@ import org.apache.seatunnel.spark.batch.SparkBatchSink
 import org.apache.seatunnel.spark.clickhouse.Config.{CLICKHOUSE_LOCAL_PATH, COPY_METHOD, DATABASE, FIELDS, HOST, NODE_ADDRESS, NODE_FREE_PASSWORD, NODE_PASS, PASSWORD, SHARDING_KEY, TABLE, TMP_BATCH_CACHE_LINE, USERNAME}
 import org.apache.seatunnel.spark.clickhouse.sink.Clickhouse._
 import org.apache.seatunnel.spark.clickhouse.sink.ClickhouseFile.{CLICKHOUSE_FILE_PREFIX, LOGGER, UUID_LENGTH, getClickhouseTableInfo}
-import org.apache.seatunnel.spark.clickhouse.sink.Table
-import org.apache.seatunnel.spark.clickhouse.sink.filetransfer.{FileTransfer, ScpFileTransfer}
+import org.apache.seatunnel.spark.clickhouse.sink.filetransfer.{FileTransfer, RsyncFileTransfer, ScpFileTransfer}
 import org.apache.seatunnel.spark.clickhouse.sink.filetransfer.TransferMethod.{RSYNC, SCP, TransferMethod, getCopyMethod}
 import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
 import org.apache.spark.sql.{Dataset, Encoders, Row}
@@ -165,21 +164,25 @@ class ClickhouseFile extends SparkBatchSink {
   private def moveFileToServer(shard: Shard, paths: List[String]): Unit = {
 
     var fileTransfer: FileTransfer = null
-    if (this.copyFileMethod == SCP) {
-      var scpFileTransfer: ScpFileTransfer = null
-      if (nodePass.contains(shard.hostAddress)) {
-        scpFileTransfer = new ScpFileTransfer(shard.hostAddress, nodePass(shard.hostAddress))
-      } else {
-        scpFileTransfer = new ScpFileTransfer(shard.hostAddress)
+    this.copyFileMethod match {
+      case SCP => {
+        if (freePass || !nodePass.contains(shard.hostAddress)) {
+          fileTransfer = new ScpFileTransfer(shard.hostAddress)
+        } else {
+          fileTransfer = new ScpFileTransfer(shard.hostAddress, nodePass(shard.hostAddress))
+        }
       }
-      scpFileTransfer.init()
-      fileTransfer = scpFileTransfer
-    } else if (this.copyFileMethod == RSYNC) {
-      throw new UnsupportedOperationException(s"not support copy file method: '$copyFileMethod' yet")
-    } else {
-      throw new UnsupportedOperationException(s"unknown copy file method: '$copyFileMethod', please use " +
+      case RSYNC => {
+        if (freePass || !nodePass.contains(shard.hostAddress)) {
+          fileTransfer = new RsyncFileTransfer(shard.hostAddress)
+        } else {
+          fileTransfer = new RsyncFileTransfer(shard.hostAddress, nodePass(shard.hostAddress))
+        }
+      }
+      case _ => throw new UnsupportedOperationException(s"unknown copy file method: '$copyFileMethod', please use " +
         s"scp/rsync instead")
     }
+    fileTransfer.init()
     fileTransfer.transferAndChown(paths, s"${this.table.getLocalDataPath(shard).head}detached/")
 
     fileTransfer.close()
diff --git a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/filetransfer/RsyncFileTransfer.scala b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/filetransfer/RsyncFileTransfer.scala
index 57fad6bea..1618683e8 100644
--- a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/filetransfer/RsyncFileTransfer.scala
+++ b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/filetransfer/RsyncFileTransfer.scala
@@ -17,21 +17,76 @@
 
 package org.apache.seatunnel.spark.clickhouse.sink.filetransfer
 
-object RsyncFileTransfer extends FileTransfer {
+import org.apache.sshd.client.SshClient
+import org.apache.sshd.client.session.ClientSession
+import org.slf4j.LoggerFactory
+
+import scala.collection.mutable
+import scala.sys.process.Process
+
+class RsyncFileTransfer(host: String) extends FileTransfer {
+
+  private val LOGGER = LoggerFactory.getLogger(classOf[RsyncFileTransfer])
+  var password: String = _
+
+  def this(host: String, password: String) {
+    this(host)
+    this.password = password
+  }
+
+  private var session: ClientSession = _
+  private var client: SshClient = _
 
   override def transferAndChown(sourcePath: String, targetPath: String): Unit = {
-    throw new UnsupportedOperationException("not support rsync file transfer yet")
+
+    try {
+      //  we use sshpass to support non-interactive password authentication
+      val sshParameter = if (password!=null) s"sshpass -p $password ssh -o StrictHostKeyChecking=no -p 22" else "ssh -o StrictHostKeyChecking=no -p 22"
+      val exec = mutable.ListBuffer[String]()
+      exec.append("rsync")
+      // recursive
+      exec.append("-r")
+      //  compress during transfer file with -z
+      exec.append("-z")
+      //  use ssh protocol with -e
+      exec.append("-e")
+      exec.append(sshParameter)
+      exec.append(sourcePath)
+      exec.append(s"root@$host:$targetPath")
+      val command = Process(exec)
+      LOGGER.info(command.lineStream.mkString("\n"))
+      // remote exec command to change file owner. Only file owner equal with server's clickhouse user can
+      // make ATTACH command work.
+      session.executeRemoteCommand("ls -l " + targetPath.substring(0, targetPath.stripSuffix("/").lastIndexOf("/")) +
+        "/ | tail -n 1 | awk '{print $3}' | xargs -t -i chown -R {}:{} " + targetPath)
+    } catch {
+      case e: Exception =>
+      // always return error cause xargs return shell command result
+    }
   }
 
   override def init(): Unit = {
-    throw new UnsupportedOperationException("not support rsync file transfer yet")
+    client = SshClient.setUpDefaultClient()
+    client.start()
+    session = client.connect("root", this.host, 22).verify().getSession
+    if (password != null) {
+      session.addPasswordIdentity(this.password)
+    }
+    val isSuccess = session.auth.verify.isSuccess
+    if (!isSuccess) {
+      throw new IllegalArgumentException(s"ssh host '$host' verify failed, please check your config")
+    }
   }
 
   override def transferAndChown(sourcePath: List[String], targetPath: String): Unit = {
-    throw new UnsupportedOperationException("not support rsync file transfer yet")
+    sourcePath.foreach(s => {
+      transferAndChown(s, targetPath)
+    })
   }
 
   override def close(): Unit = {
-    throw new UnsupportedOperationException("not support rsync file transfer yet")
+    if (session != null && session.isOpen) {
+      session.close()
+    }
   }
 }