You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ki...@apache.org on 2022/01/07 13:22:37 UTC
[incubator-seatunnel] branch dev updated: [SeaTunnel #925] Add spark-kudu sink plugin (#926)
This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 72b57b2 [SeaTunnel #925] Add spark-kudu sink plugin (#926)
72b57b2 is described below
commit 72b57b22688f17376fe7e5cf522b4bdd3f62cce0
Author: Simon <zh...@cvte.com>
AuthorDate: Fri Jan 7 21:21:47 2022 +0800
[SeaTunnel #925] Add spark-kudu sink plugin (#926)
---
docs/en/configuration/sink-plugins/Kudu.md | 31 +++++++++++
.../org.apache.seatunnel.spark.BaseSparkSink | 1 +
.../org/apache/seatunnel/spark/sink/Kudu.scala | 60 ++++++++++++++++++++++
3 files changed, 92 insertions(+)
diff --git a/docs/en/configuration/sink-plugins/Kudu.md b/docs/en/configuration/sink-plugins/Kudu.md
new file mode 100644
index 0000000..aae48e4
--- /dev/null
+++ b/docs/en/configuration/sink-plugins/Kudu.md
@@ -0,0 +1,31 @@
+# Sink plugin: Kudu
+
+## Description
+
+Write data to Kudu.
+
+## Options
+
+| name | type | required | default value |
+| -------------- | ------ | -------- | ------------- |
+| [kudu_master](#kudu_master-string) | string | yes | - |
+| [kudu_table](#kudu_table-string) | string | yes | - |
+| [mode](#mode-string) | string | no | insert |
+
+### kudu_master [string]
+Address of the Kudu master. Separate multiple masters with commas, e.g. `host1:7051,host2:7051`.
+
+### kudu_table [string]
+Name of the Kudu table to write to. The table must already exist.
+
+### mode [string]
+Write mode used when writing to Kudu. Supported values are `insert`, `update`, `upsert` and `insertIgnore` (case-sensitive); the default is `insert`.
+## Example
+
+```bash
+kudu {
+ kudu_master="hadoop01:7051,hadoop02:7051,hadoop03:7051"
+ kudu_table="my_kudu_table"
+ mode="upsert"
+ }
+```
diff --git a/seatunnel-connectors/seatunnel-connector-spark-kudu/src/main/resources/META-INF/services/org.apache.seatunnel.spark.BaseSparkSink b/seatunnel-connectors/seatunnel-connector-spark-kudu/src/main/resources/META-INF/services/org.apache.seatunnel.spark.BaseSparkSink
new file mode 100644
index 0000000..c4b318d
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connector-spark-kudu/src/main/resources/META-INF/services/org.apache.seatunnel.spark.BaseSparkSink
@@ -0,0 +1 @@
+org.apache.seatunnel.spark.sink.Kudu
\ No newline at end of file
diff --git a/seatunnel-connectors/seatunnel-connector-spark-kudu/src/main/scala/org/apache/seatunnel/spark/sink/Kudu.scala b/seatunnel-connectors/seatunnel-connector-spark-kudu/src/main/scala/org/apache/seatunnel/spark/sink/Kudu.scala
new file mode 100644
index 0000000..0f3162e
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connector-spark-kudu/src/main/scala/org/apache/seatunnel/spark/sink/Kudu.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.spark.sink
+
+import scala.collection.JavaConversions._
+
+import org.apache.kudu.spark.kudu._
+import org.apache.seatunnel.common.config.CheckResult
+import org.apache.seatunnel.config.ConfigFactory
+import org.apache.seatunnel.spark.SparkEnvironment
+import org.apache.seatunnel.spark.batch.SparkBatchSink
+import org.apache.spark.sql.{Dataset, Row}
+
+/**
+ * Spark batch sink that writes a Dataset into an existing Kudu table through a KuduContext.
+ *
+ * Options:
+ *  - kudu_master: Kudu master address(es), comma separated (required)
+ *  - kudu_table: name of the target table, which must already exist (required)
+ *  - mode: write mode, one of insert | update | upsert | insertIgnore (defaults to insert)
+ */
+class Kudu extends SparkBatchSink {
+
+  /** Write modes understood by [[output]], listed in the order reported in error messages. */
+  private val supportedModes = Seq("insert", "update", "upsert", "insertIgnore")
+
+  /** Applies defaults for optional options: `mode` falls back to "insert" when it is not set. */
+  override def prepare(env: SparkEnvironment): Unit = {
+    val defaultConfig = ConfigFactory.parseMap(
+      Map(
+        "mode" -> "insert"))
+    this.config = config.withFallback(defaultConfig)
+  }
+
+  /**
+   * Checks that the required options are present and, when `mode` is given explicitly, that it
+   * names a supported write mode, so a typo is reported at check time instead of mid-job.
+   */
+  override def checkConfig(): CheckResult = {
+    if (!config.hasPath("kudu_master") || !config.hasPath("kudu_table")) {
+      new CheckResult(false, "please specify [kudu_master] and [kudu_table] ")
+    } else if (config.hasPath("mode") && !supportedModes.contains(config.getString("mode"))) {
+      new CheckResult(false, unsupportedModeMessage(config.getString("mode")))
+    } else {
+      new CheckResult(true, "")
+    }
+  }
+
+  /**
+   * Writes `df` into the configured Kudu table using the configured write mode.
+   *
+   * @param df          rows to write
+   * @param environment SeaTunnel Spark environment (not used by this sink)
+   * @throws IllegalArgumentException if `mode` is not one of the supported write modes
+   */
+  override def output(df: Dataset[Row], environment: SparkEnvironment): Unit = {
+    val mode = config.getString("mode")
+    // Validate before building the KuduContext, so an unknown mode fails with a descriptive
+    // message rather than surfacing as a scala.MatchError from the dispatch below.
+    require(supportedModes.contains(mode), unsupportedModeMessage(mode))
+
+    val kuduContext = new KuduContext(config.getString("kudu_master"), df.sparkSession.sparkContext)
+    val table = config.getString("kudu_table")
+    mode match {
+      case "insert" => kuduContext.insertRows(df, table)
+      case "update" => kuduContext.updateRows(df, table)
+      case "upsert" => kuduContext.upsertRows(df, table)
+      case "insertIgnore" => kuduContext.insertIgnoreRows(df, table)
+    }
+  }
+
+  /** Builds the error text shared by [[checkConfig]] and [[output]] for an unknown mode. */
+  private def unsupportedModeMessage(mode: String): String =
+    s"unsupported [mode] '$mode', supported modes are: ${supportedModes.mkString(", ")}"
+}