You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ki...@apache.org on 2022/01/07 13:22:37 UTC

[incubator-seatunnel] branch dev updated: [SeaTunnel #925] Add spark-kudu sink plugin (#926)

This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 72b57b2  [SeaTunnel #925] Add spark-kudu sink plugin (#926)
72b57b2 is described below

commit 72b57b22688f17376fe7e5cf522b4bdd3f62cce0
Author: Simon <zh...@cvte.com>
AuthorDate: Fri Jan 7 21:21:47 2022 +0800

    [SeaTunnel #925] Add spark-kudu sink plugin (#926)
---
 docs/en/configuration/sink-plugins/Kudu.md         | 31 +++++++++++
 .../org.apache.seatunnel.spark.BaseSparkSink       |  1 +
 .../org/apache/seatunnel/spark/sink/Kudu.scala     | 60 ++++++++++++++++++++++
 3 files changed, 92 insertions(+)

diff --git a/docs/en/configuration/sink-plugins/Kudu.md b/docs/en/configuration/sink-plugins/Kudu.md
new file mode 100644
index 0000000..aae48e4
--- /dev/null
+++ b/docs/en/configuration/sink-plugins/Kudu.md
@@ -0,0 +1,31 @@
+# Sink plugin: Kudu
+
+## Description
+
+Write data to Kudu.
+
+## Options
+
+| name           | type   | required | default value |
+| -------------- | ------ | -------- | ------------- |
+| [kudu_master](#kudu_master-string)            | string | yes      | -             |
+| [kudu_table](#kudu_table-string)       | string | yes      | -         |
+| [mode](#mode-string)       | string | no      | insert         |
+
+### kudu_master [string]
+Address of the Kudu master. Separate multiple masters with commas.
+
+### kudu_table [string]
+The name of the Kudu table to write to. The table must already exist.
+
+### mode [string]
+The write mode used when writing to Kudu. Supported values are `insert`, `update`, `upsert` and `insertIgnore`; the default is `insert`.
+## Example
+
+```bash
+kudu {
+   kudu_master="hadoop01:7051,hadoop02:7051,hadoop03:7051"
+   kudu_table="my_kudu_table"
+   mode="upsert"
+ }
+```
diff --git a/seatunnel-connectors/seatunnel-connector-spark-kudu/src/main/resources/META-INF/services/org.apache.seatunnel.spark.BaseSparkSink b/seatunnel-connectors/seatunnel-connector-spark-kudu/src/main/resources/META-INF/services/org.apache.seatunnel.spark.BaseSparkSink
new file mode 100644
index 0000000..c4b318d
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connector-spark-kudu/src/main/resources/META-INF/services/org.apache.seatunnel.spark.BaseSparkSink
@@ -0,0 +1 @@
+org.apache.seatunnel.spark.sink.Kudu
\ No newline at end of file
diff --git a/seatunnel-connectors/seatunnel-connector-spark-kudu/src/main/scala/org/apache/seatunnel/spark/sink/Kudu.scala b/seatunnel-connectors/seatunnel-connector-spark-kudu/src/main/scala/org/apache/seatunnel/spark/sink/Kudu.scala
new file mode 100644
index 0000000..0f3162e
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connector-spark-kudu/src/main/scala/org/apache/seatunnel/spark/sink/Kudu.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.spark.sink
+
+import scala.collection.JavaConversions._
+
+import org.apache.kudu.spark.kudu._
+import org.apache.seatunnel.common.config.CheckResult
+import org.apache.seatunnel.config.ConfigFactory
+import org.apache.seatunnel.spark.SparkEnvironment
+import org.apache.seatunnel.spark.batch.SparkBatchSink
+import org.apache.spark.sql.{Dataset, Row}
+
+/**
+ * Spark batch sink that writes a DataFrame to a Kudu table through `KuduContext`.
+ *
+ * Options read from the plugin config:
+ *  - `kudu_master` (required): master address string handed to `KuduContext`.
+ *  - `kudu_table` (required): name of the target table.
+ *  - `mode` (optional, defaults to `insert`): one of `insert`, `update`,
+ *    `upsert`, `insertIgnore`. The value is matched case-sensitively.
+ */
+class Kudu extends SparkBatchSink {
+
+  /** Write modes that `output` knows how to dispatch. */
+  private val supportedModes = Seq("insert", "update", "upsert", "insertIgnore")
+
+  /**
+   * Adds the default write mode (`insert`) as a fallback, so that `mode`
+   * is always resolvable once this method has run.
+   *
+   * @param env the Spark environment (not used here)
+   */
+  override def prepare(env: SparkEnvironment): Unit = {
+    val defaultConfig = ConfigFactory.parseMap(
+      Map(
+        "mode" -> "insert"))
+    this.config = config.withFallback(defaultConfig)
+  }
+
+  /**
+   * Validates the plugin configuration.
+   *
+   * @return a failed result when `kudu_master` or `kudu_table` is missing, or
+   *         when `mode` is configured with an unsupported value; a successful
+   *         result otherwise
+   */
+  override def checkConfig(): CheckResult = {
+    if (!(config.hasPath("kudu_master") && config.hasPath("kudu_table"))) {
+      new CheckResult(false, "please specify [kudu_master] and [kudu_table] ")
+    } else if (config.hasPath("mode") && !supportedModes.contains(config.getString("mode"))) {
+      // `mode` is only checked when present: the default is injected by
+      // `prepare`, which may not have run yet when this check executes.
+      new CheckResult(
+        false,
+        s"unsupported [mode] '${config.getString("mode")}', " +
+          s"expected one of: ${supportedModes.mkString(", ")}")
+    } else {
+      new CheckResult(true, "")
+    }
+  }
+
+  /**
+   * Writes `df` to the configured Kudu table using the configured mode.
+   *
+   * @param df          the rows to write
+   * @param environment the Spark environment (not used here)
+   * @throws IllegalArgumentException if `mode` is not a supported write mode
+   */
+  override def output(df: Dataset[Row], environment: SparkEnvironment): Unit = {
+    val kuduContext = new KuduContext(
+      config.getString("kudu_master"),
+      df.sparkSession.sparkContext)
+
+    val table = config.getString("kudu_table")
+    config.getString("mode") match {
+      case "insert" => kuduContext.insertRows(df, table)
+      case "update" => kuduContext.updateRows(df, table)
+      case "upsert" => kuduContext.upsertRows(df, table)
+      case "insertIgnore" => kuduContext.insertIgnoreRows(df, table)
+      // Fail with a readable message instead of an opaque scala.MatchError.
+      case other =>
+        throw new IllegalArgumentException(
+          s"unsupported [mode] '$other', expected one of: ${supportedModes.mkString(", ")}")
+    }
+  }
+}