You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ki...@apache.org on 2022/01/01 12:11:21 UTC
[incubator-seatunnel] branch dev updated: [SeaTunnel #910] Support spark-hive sink (#911)
This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 2f7b9fa [SeaTunnel #910] Support spark-hive sink (#911)
2f7b9fa is described below
commit 2f7b9fa327c5aff5848392aed8dba74e744c65cb
Author: Simon <36...@qq.com>
AuthorDate: Sat Jan 1 20:11:17 2022 +0800
[SeaTunnel #910] Support spark-hive sink (#911)
* spark.sink.Hive
* spark.sink.Hive
* Update Hive.md
---
docs/en/configuration/sink-plugins/Hive.md | 63 +++++++++++++++++++
.../org.apache.seatunnel.spark.BaseSparkSink | 1 +
.../org/apache/seatunnel/spark/sink/Hive.scala | 71 ++++++++++++++++++++++
3 files changed, 135 insertions(+)
diff --git a/docs/en/configuration/sink-plugins/Hive.md b/docs/en/configuration/sink-plugins/Hive.md
new file mode 100644
index 0000000..61c2f65
--- /dev/null
+++ b/docs/en/configuration/sink-plugins/Hive.md
@@ -0,0 +1,63 @@
+# Sink plugin: Hive
+
+### Description
+
+Write Rows to [Apache Hive](https://hive.apache.org).
+
+### Options
+
+| name | type | required | default value |
+| --------------------------------------- | ------------- | -------- | ------------- |
+| [sql](#sql-string) | string | no | - |
+| [source_table_name](#source_table_name-string) | string | no | - |
+| [result_table_name](#result_table_name-string) | string | no | - |
+| [sink_columns](#sink_columns-string) | string | no | - |
+| [save_mode](#save_mode-string) | string | no | - |
+| [partition_by](#partition_by-arraystring) | Array[string] | no | - |
+
+##### sql [string]
+Hive SQL statement to execute, e.g. `insert into/overwrite $table select * from xxx_table`.
+
+If this option is set, all other options are ignored.
+
+##### source_table_name [string]
+
+Name of the table that this plugin reads its data from.
+
+##### result_table_name [string]
+
+The output hive table name.
+
+##### save_mode [string]
+
+Save mode passed to Spark's `DataFrameWriter.mode`, e.g. `overwrite` or `append`.
+
+##### sink_columns [string]
+
+Comma-separated list of fields to select from `source_table_name` and store in `result_table_name`. If omitted, all fields (`*`) are selected.
+
+##### partition_by [Array[string]]
+
+Fields used to partition the output Hive table.
+
+### Example
+
+```conf
+sink {
+ Hive {
+ sql = "insert overwrite table seatunnel.test1 partition(province) select name,age,province from myTable2"
+ }
+}
+```
+
+```conf
+sink {
+ Hive {
+ source_table_name = "myTable2"
+ result_table_name = "seatunnel.test1"
+ save_mode = "overwrite"
+ sink_columns = "name,age,province"
+ partition_by = ["province"]
+ }
+}
+```
diff --git a/seatunnel-connectors/seatunnel-connector-spark-hive/src/main/resources/META-INF/services/org.apache.seatunnel.spark.BaseSparkSink b/seatunnel-connectors/seatunnel-connector-spark-hive/src/main/resources/META-INF/services/org.apache.seatunnel.spark.BaseSparkSink
new file mode 100644
index 0000000..3e6b5e8
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connector-spark-hive/src/main/resources/META-INF/services/org.apache.seatunnel.spark.BaseSparkSink
@@ -0,0 +1 @@
+org.apache.seatunnel.spark.sink.Hive
\ No newline at end of file
diff --git a/seatunnel-connectors/seatunnel-connector-spark-hive/src/main/scala/org/apache/seatunnel/spark/sink/Hive.scala b/seatunnel-connectors/seatunnel-connector-spark-hive/src/main/scala/org/apache/seatunnel/spark/sink/Hive.scala
new file mode 100644
index 0000000..cd3ee1f
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connector-spark-hive/src/main/scala/org/apache/seatunnel/spark/sink/Hive.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.spark.sink
+
+import java.util
+
+import scala.collection.JavaConversions._
+
+import org.apache.seatunnel.common.config.CheckResult
+import org.apache.seatunnel.spark.SparkEnvironment
+import org.apache.seatunnel.spark.batch.SparkBatchSink
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{DataFrameWriter, Dataset, Row}
+
+/**
+ * Sink that either runs a configured `sql` statement or saves the rows selected from
+ * `source_table_name` into `result_table_name` via `saveAsTable`.
+ */
+class Hive extends SparkBatchSink with Logging {
+
+  /** Accepts the config when `sql` is set, or when both table-name options are set. */
+  override def checkConfig(): CheckResult = {
+    if (config.hasPath("sql") ||
+      (config.hasPath("source_table_name") && config.hasPath("result_table_name"))) {
+      new CheckResult(true, "")
+    } else {
+      new CheckResult(false, "please specify sql or source_table_name && result_table_name")
+    }
+  }
+
+  /** No preparation is performed. */
+  override def prepare(env: SparkEnvironment): Unit = {}
+
+  /** Runs `sql` when configured; otherwise saves the selected columns as a table. */
+  override def output(df: Dataset[Row], environment: SparkEnvironment): Unit = {
+    val session = df.sparkSession
+    if (config.hasPath("sql")) {
+      // The statement is executed as given; no other option is consulted on this path.
+      session.sql(config.getString("sql"))
+    } else {
+      val source = config.getString("source_table_name")
+      val target = config.getString("result_table_name")
+      val columns = if (config.hasPath("sink_columns")) config.getString("sink_columns") else "*"
+      // Rows come from querying `source` by name; `df` is only used to obtain the session.
+      val selected = session.sql(s"select $columns from $source")
+      val writer: DataFrameWriter[Row] =
+        if (config.hasPath("save_mode")) selected.write.mode(config.getString("save_mode"))
+        else selected.write
+      val partitioned =
+        if (config.hasPath("partition_by")) {
+          val partitionCols: util.List[String] = config.getStringList("partition_by")
+          writer.partitionBy(partitionCols: _*)
+        } else writer
+      partitioned.saveAsTable(target)
+    }
+  }
+}