Posted to commits@seatunnel.apache.org by ki...@apache.org on 2022/01/01 12:11:21 UTC

[incubator-seatunnel] branch dev updated: [SeaTunnel #910] Support spark-hive sink (#911)

This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 2f7b9fa  [SeaTunnel #910] Support spark-hive sink (#911)
2f7b9fa is described below

commit 2f7b9fa327c5aff5848392aed8dba74e744c65cb
Author: Simon <36...@qq.com>
AuthorDate: Sat Jan 1 20:11:17 2022 +0800

    [SeaTunnel #910] Support spark-hive sink (#911)
    
    * spark.sink.Hive
    
    * spark.sink.Hive
    
    * Update Hive.md
---
 docs/en/configuration/sink-plugins/Hive.md         | 63 +++++++++++++++++++
 .../org.apache.seatunnel.spark.BaseSparkSink       |  1 +
 .../org/apache/seatunnel/spark/sink/Hive.scala     | 71 ++++++++++++++++++++++
 3 files changed, 135 insertions(+)

diff --git a/docs/en/configuration/sink-plugins/Hive.md b/docs/en/configuration/sink-plugins/Hive.md
new file mode 100644
index 0000000..61c2f65
--- /dev/null
+++ b/docs/en/configuration/sink-plugins/Hive.md
@@ -0,0 +1,63 @@
+# Sink plugin: Hive
+
+### Description
+
+Write Rows to [Apache Hive](https://hive.apache.org).
+
+### Options
+
+| name                                           | type          | required | default value |
+| ---------------------------------------------- | ------------- | -------- | ------------- |
+| [sql](#sql-string)                             | string        | no       | -             |
+| [source_table_name](#source_table_name-string) | string        | no       | -             |
+| [result_table_name](#result_table_name-string) | string        | no       | -             |
+| [sink_columns](#sink_columns-string)           | string        | no       | -             |
+| [save_mode](#save_mode-string)                 | string        | no       | -             |
+| [partition_by](#partition_by-arraystring)      | Array[string] | no       | -             |
+
+##### sql [string]
+
+Hive SQL to execute, e.g. `insert into/overwrite $table select * from xxx_table`.
+
+If this option is set, all other options are ignored.
+
+##### source_table_name [string]
+
+The name of the source table this plugin reads from.
+
+##### result_table_name [string]
+
+The name of the output Hive table.
+
+##### save_mode [string]
+
+The Spark save mode, passed to `DataFrameWriter.mode` (e.g. `append`, `overwrite`).
+
+##### sink_columns [string]
+
+A comma-separated list of fields to select from `source_table_name` and write to `result_table_name`.
+
+##### partition_by [Array[string]]
+
+The Hive partition fields, passed to `DataFrameWriter.partitionBy`.
+
+### Example
+
+```conf
+sink {
+  Hive {
+    sql = "insert overwrite table seatunnel.test1 partition(province) select name,age,province from myTable2"
+  }
+}
+```
+
+```conf
+sink {
+  Hive {
+    source_table_name = "myTable2"
+    result_table_name = "seatunnel.test1"
+    save_mode = "overwrite"
+    sink_columns = "name,age,province"
+    partition_by = ["province"]
+  }
+}
+```
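
For reference, the option-based example above corresponds to roughly the following plain Spark calls (a minimal sketch, assuming an active `SparkSession` with Hive support enabled; it mirrors the connector code added below):

```scala
import org.apache.spark.sql.SparkSession

// Sketch: what the second example config resolves to inside the connector --
// select the sink columns from the registered source table, then write them
// to the Hive table with the configured save mode and partition columns.
val spark = SparkSession.builder()
  .appName("hive-sink-sketch")
  .enableHiveSupport()
  .getOrCreate()

val sinkFrame = spark.sql("select name,age,province from myTable2")
sinkFrame.write
  .mode("overwrite")
  .partitionBy("province")
  .saveAsTable("seatunnel.test1")
```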
diff --git a/seatunnel-connectors/seatunnel-connector-spark-hive/src/main/resources/META-INF/services/org.apache.seatunnel.spark.BaseSparkSink b/seatunnel-connectors/seatunnel-connector-spark-hive/src/main/resources/META-INF/services/org.apache.seatunnel.spark.BaseSparkSink
new file mode 100644
index 0000000..3e6b5e8
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connector-spark-hive/src/main/resources/META-INF/services/org.apache.seatunnel.spark.BaseSparkSink
@@ -0,0 +1 @@
+org.apache.seatunnel.spark.sink.Hive
\ No newline at end of file
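
The one-line service file above registers the new sink for plugin discovery. Assuming SeaTunnel resolves connector plugins through Java's standard `ServiceLoader` mechanism (which is what a `META-INF/services` entry implies; the loading code itself is not part of this commit), discovery looks roughly like this sketch:

```scala
import java.util.ServiceLoader

import scala.collection.JavaConverters._

// Sketch: enumerate every implementation registered for the service
// interface via META-INF/services files on the classpath; with this commit
// the list includes org.apache.seatunnel.spark.sink.Hive.
val serviceInterface = Class.forName("org.apache.seatunnel.spark.BaseSparkSink")
val sinks = ServiceLoader.load(serviceInterface).iterator().asScala
sinks.foreach(sink => println(sink.getClass.getName))
```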
diff --git a/seatunnel-connectors/seatunnel-connector-spark-hive/src/main/scala/org/apache/seatunnel/spark/sink/Hive.scala b/seatunnel-connectors/seatunnel-connector-spark-hive/src/main/scala/org/apache/seatunnel/spark/sink/Hive.scala
new file mode 100644
index 0000000..cd3ee1f
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connector-spark-hive/src/main/scala/org/apache/seatunnel/spark/sink/Hive.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.spark.sink
+
+import java.util
+
+import scala.collection.JavaConversions._
+
+import org.apache.seatunnel.common.config.CheckResult
+import org.apache.seatunnel.spark.SparkEnvironment
+import org.apache.seatunnel.spark.batch.SparkBatchSink
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{DataFrameWriter, Dataset, Row}
+
+class Hive extends SparkBatchSink with Logging {
+
+  override def checkConfig(): CheckResult = {
+    // Either a raw "sql" statement or both table names must be provided.
+    if (config.hasPath("sql") ||
+      (config.hasPath("source_table_name") && config.hasPath("result_table_name"))) {
+      new CheckResult(true, "")
+    } else {
+      new CheckResult(false, "please specify [sql] or both [source_table_name] and [result_table_name]")
+    }
+  }
+
+  override def prepare(env: SparkEnvironment): Unit = {}
+
+  override def output(df: Dataset[Row], environment: SparkEnvironment): Unit = {
+    val sparkSession = df.sparkSession
+    if (config.hasPath("sql")) {
+      // Run the user-supplied statement directly; all other options are ignored.
+      sparkSession.sql(config.getString("sql"))
+    } else {
+      val sourceTableName = config.getString("source_table_name")
+      val resultTableName = config.getString("result_table_name")
+      // Default to all columns when sink_columns is not configured.
+      val sinkColumns =
+        if (config.hasPath("sink_columns")) config.getString("sink_columns") else "*"
+      val sinkFrame = sparkSession.sql(s"select $sinkColumns from $sourceTableName")
+      // Apply the save mode only when one is configured.
+      val frameWriter: DataFrameWriter[Row] =
+        if (config.hasPath("save_mode")) {
+          sinkFrame.write.mode(config.getString("save_mode"))
+        } else {
+          sinkFrame.write
+        }
+      if (config.hasPath("partition_by")) {
+        val partitionList: util.List[String] = config.getStringList("partition_by")
+        frameWriter.partitionBy(partitionList: _*).saveAsTable(resultTableName)
+      } else {
+        frameWriter.saveAsTable(resultTableName)
+      }
+    }
+  }
+}
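
The `checkConfig` guard above can be exercised on its own. A minimal sketch, assuming the plugin's `config` behaves like a Typesafe `Config` (consistent with the `hasPath`/`getString`/`getStringList` calls used in the class); `isValid` is a hypothetical helper written for illustration, not part of the connector:

```scala
import com.typesafe.config.{Config, ConfigFactory}

// Hypothetical standalone version of the rule in Hive.checkConfig:
// a config is valid when "sql" is set, or when both table names are set.
def isValid(config: Config): Boolean =
  config.hasPath("sql") ||
    (config.hasPath("source_table_name") && config.hasPath("result_table_name"))

val withSql = ConfigFactory.parseString(
  "sql = \"insert overwrite table seatunnel.test1 select * from myTable2\"")
val missingSource = ConfigFactory.parseString(
  "result_table_name = \"seatunnel.test1\"")

println(isValid(withSql))       // true  -- the sql form is sufficient
println(isValid(missingSource)) // false -- needs source_table_name as well
```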