You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ki...@apache.org on 2022/02/25 03:43:59 UTC
[incubator-seatunnel] branch dev updated: [Feature] [connector] Remove `Hdfs Connector` and make it compatible with `File Connector` (#1256)
This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 7fee8ef [Feature] [connector] Remove `Hdfs Connector` and make it compatible with `File Connector` (#1256)
7fee8ef is described below
commit 7fee8eff00884b7040968d4865fc9cbb4ce034be
Author: Simon <zh...@cvte.com>
AuthorDate: Fri Feb 25 11:43:52 2022 +0800
[Feature] [connector] Remove `Hdfs Connector` and make it compatible with `File Connector` (#1256)
---
docs/en/spark/configuration/sink-plugins/File.md | 4 +-
docs/en/spark/configuration/sink-plugins/Hdfs.md | 70 ------------
docs/en/spark/configuration/source-plugins/File.md | 41 +++++++
.../spark/{sink/Hdfs.scala => Config.scala} | 24 ++--
.../org/apache/seatunnel/spark/sink/File.scala | 61 ++++++++++-
.../apache/seatunnel/spark/sink/FileSinkBase.scala | 121 ---------------------
.../org/apache/seatunnel/spark/source/File.scala | 34 ++----
7 files changed, 122 insertions(+), 233 deletions(-)
diff --git a/docs/en/spark/configuration/sink-plugins/File.md b/docs/en/spark/configuration/sink-plugins/File.md
index 5eb1ada..e79419b 100644
--- a/docs/en/spark/configuration/sink-plugins/File.md
+++ b/docs/en/spark/configuration/sink-plugins/File.md
@@ -4,7 +4,7 @@
## Description
-Output data to file
+Output data to local or HDFS files.
## Options
@@ -28,7 +28,7 @@ Partition data based on selected fields
### path [string]
-Output file path, starting with `file://`
+Output file path, starting with `file://` or `hdfs://`
### path_time_format [string]
diff --git a/docs/en/spark/configuration/sink-plugins/Hdfs.md b/docs/en/spark/configuration/sink-plugins/Hdfs.md
deleted file mode 100644
index b7f3c65..0000000
--- a/docs/en/spark/configuration/sink-plugins/Hdfs.md
+++ /dev/null
@@ -1,70 +0,0 @@
-# Hdfs
-
-> Sink plugin : Hdfs [Spark]
-
-## Description
-
-Export data to HDFS
-
-## Options
-
-| name | type | required | default value |
-| ---------------- | ------ | -------- | -------------- |
-| options | object | no | - |
-| partition_by | array | no | - |
-| path | string | yes | - |
-| path_time_format | string | no | yyyyMMddHHmmss |
-| save_mode | string | no | error |
-| serializer | string | no | json |
-| common-options | string | no | - |
-
-### options [object]
-
-Custom parameters
-
-### partition_by [array]
-
-Partition data based on selected fields
-
-### path [string]
-
-Output file path, starting with `hdfs://`
-
-### path_time_format [string]
-
-When the format in the `path` parameter is `xxxx-${now}` , `path_time_format` can specify the time format of the path, and the default value is `yyyy.MM.dd` . The commonly used time formats are listed as follows:
-
-| Symbol | Description |
-| ------ | ------------------ |
-| y | Year |
-| M | Month |
-| d | Day of month |
-| H | Hour in day (0-23) |
-| m | Minute in hour |
-| s | Second in minute |
-
-See [Java SimpleDateFormat](https://docs.oracle.com/javase/tutorial/i18n/format/simpleDateFormat.html) for detailed time format syntax.
-
-### save_mode [string]
-
-Storage mode, currently supports `overwrite` , `append` , `ignore` and `error` . For the specific meaning of each mode, see [save-modes](https://spark.apache.org/docs/latest/sql-programming-guide.html#save-modes)
-
-### serializer [string]
-
-Serialization method, currently supports `csv` , `json` , `parquet` , `orc` and `text`
-
-### common options [string]
-
-Sink plugin common parameters, please refer to [Sink Plugin](./sink-plugin.md) for details
-
-## Examples
-
-```bash
-hdfs {
- path = "hdfs:///var/logs-${now}"
- serializer = "json"
- path_time_format = "yyyy.MM.dd"
-}
-```
-
-> Generate HDFS files on a daily basis, such as `logs-2021.12.15`
diff --git a/docs/en/spark/configuration/source-plugins/File.md b/docs/en/spark/configuration/source-plugins/File.md
new file mode 100644
index 0000000..8ac1000
--- /dev/null
+++ b/docs/en/spark/configuration/source-plugins/File.md
@@ -0,0 +1,41 @@
+# File
+
+> Source plugin : File [Spark]
+
+## Description
+Read data from local or HDFS files.
+
+## Options
+
+| name | type | required | default value |
+| --- | --- | --- | --- |
+| format | string | no | json |
+| path | string | yes | - |
+| common-options| string | yes | - |
+
+### format [string]
+Format for reading files. Currently supports `text`, `parquet`, `json`, `orc` and `csv`.
+
+### path [string]
+- To read data from HDFS, the file path should start with `hdfs://`
+- To read data from the local file system, the file path should start with `file://`
+
+### common options [string]
+
+Source plugin common parameters, please refer to [Source Plugin](./source-plugin.md) for details
+
+## Examples
+
+```
+file {
+ path = "hdfs:///var/logs"
+ result_table_name = "access_log"
+}
+```
+
+```
+file {
+ path = "file:///var/logs"
+ result_table_name = "access_log"
+}
+```
diff --git a/seatunnel-connectors/seatunnel-connector-spark-file/src/main/scala/org/apache/seatunnel/spark/sink/Hdfs.scala b/seatunnel-connectors/seatunnel-connector-spark-file/src/main/scala/org/apache/seatunnel/spark/Config.scala
similarity index 65%
rename from seatunnel-connectors/seatunnel-connector-spark-file/src/main/scala/org/apache/seatunnel/spark/sink/Hdfs.scala
rename to seatunnel-connectors/seatunnel-connector-spark-file/src/main/scala/org/apache/seatunnel/spark/Config.scala
index 34fe6b7..a130d1f 100644
--- a/seatunnel-connectors/seatunnel-connector-spark-file/src/main/scala/org/apache/seatunnel/spark/sink/Hdfs.scala
+++ b/seatunnel-connectors/seatunnel-connector-spark-file/src/main/scala/org/apache/seatunnel/spark/Config.scala
@@ -14,19 +14,21 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.seatunnel.spark.sink
+package org.apache.seatunnel.spark
-import org.apache.spark.sql.{Dataset, Row}
-import org.apache.seatunnel.common.config.CheckResult
-import org.apache.seatunnel.spark.SparkEnvironment
+object Config extends Serializable {
-class Hdfs extends FileSinkBase {
+ val PATH = "path"
+ val PARTITION_BY = "partition_by"
+ val SAVE_MODE = "save_mode"
+ val SERIALIZER = "serializer"
+ val PATH_TIME_FORMAT = "path_time_format"
+ val FORMAT = "format"
- override def checkConfig(): CheckResult = {
- checkConfigImpl(List("hdfs://"))
- }
+ val TEXT = "text"
+ val PARQUET = "parquet"
+ val JSON = "json"
+ val ORC = "orc"
+ val CSV = "csv"
- override def output(data: Dataset[Row], env: SparkEnvironment): Unit = {
- outputImpl(data, "hdfs://")
- }
}
diff --git a/seatunnel-connectors/seatunnel-connector-spark-file/src/main/scala/org/apache/seatunnel/spark/sink/File.scala b/seatunnel-connectors/seatunnel-connector-spark-file/src/main/scala/org/apache/seatunnel/spark/sink/File.scala
index 84c7476..08b5e10 100644
--- a/seatunnel-connectors/seatunnel-connector-spark-file/src/main/scala/org/apache/seatunnel/spark/sink/File.scala
+++ b/seatunnel-connectors/seatunnel-connector-spark-file/src/main/scala/org/apache/seatunnel/spark/sink/File.scala
@@ -16,17 +16,66 @@
*/
package org.apache.seatunnel.spark.sink
-import org.apache.spark.sql.{Dataset, Row}
-import org.apache.seatunnel.common.config.CheckResult
+import java.util
+
+import scala.collection.JavaConversions._
+import scala.util.{Failure, Success, Try}
+
+import org.apache.seatunnel.common.config.{CheckResult, TypesafeConfigUtils}
+import org.apache.seatunnel.common.config.CheckConfigUtil.checkAllExists
+import org.apache.seatunnel.common.utils.StringTemplate
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory
+import org.apache.seatunnel.spark.Config.{CSV, JSON, ORC, PARQUET, PARTITION_BY, PATH, PATH_TIME_FORMAT, SAVE_MODE, SERIALIZER, TEXT}
import org.apache.seatunnel.spark.SparkEnvironment
+import org.apache.seatunnel.spark.batch.SparkBatchSink
+import org.apache.spark.sql.{Dataset, Row}
-class File extends FileSinkBase {
+class File extends SparkBatchSink {
override def checkConfig(): CheckResult = {
- checkConfigImpl(List("file://"))
+ checkAllExists(config, PATH)
+ }
+
+ override def prepare(env: SparkEnvironment): Unit = {
+ val defaultConfig = ConfigFactory.parseMap(
+ Map(
+ PARTITION_BY -> util.Arrays.asList(),
+ SAVE_MODE -> "error", // allowed values: overwrite, append, ignore, error
+ SERIALIZER -> "json", // allowed values: csv, json, parquet, text
+ PATH_TIME_FORMAT -> "yyyyMMddHHmmss" // if variable 'now' is used in path, this option specifies its time_format
+ ))
+ config = config.withFallback(defaultConfig)
}
- override def output(data: Dataset[Row], env: SparkEnvironment): Unit = {
- outputImpl(data, "file://")
+ override def output(ds: Dataset[Row], env: SparkEnvironment): Unit = {
+ var writer = ds.write.mode(config.getString(SAVE_MODE))
+ writer = config.getStringList(PARTITION_BY).isEmpty match {
+ case true => writer
+ case false =>
+ val partitionKeys = config.getStringList(PARTITION_BY)
+ writer.partitionBy(partitionKeys: _*)
+ }
+
+ Try(TypesafeConfigUtils.extractSubConfigThrowable(config, "options.", false)) match {
+
+ case Success(options) =>
+ val optionMap = options
+ .entrySet()
+ .foldRight(Map[String, String]())((entry, m) => {
+ m + (entry.getKey -> entry.getValue.unwrapped().toString)
+ })
+
+ writer.options(optionMap)
+ case Failure(_) => // do nothing
+ }
+
+ val path = StringTemplate.substitute(config.getString(PATH), config.getString(PATH_TIME_FORMAT))
+ config.getString(SERIALIZER) match {
+ case CSV => writer.csv(path)
+ case JSON => writer.json(path)
+ case PARQUET => writer.parquet(path)
+ case TEXT => writer.text(path)
+ case ORC => writer.orc(path)
+ }
}
}
diff --git a/seatunnel-connectors/seatunnel-connector-spark-file/src/main/scala/org/apache/seatunnel/spark/sink/FileSinkBase.scala b/seatunnel-connectors/seatunnel-connector-spark-file/src/main/scala/org/apache/seatunnel/spark/sink/FileSinkBase.scala
deleted file mode 100644
index a1cd0fc..0000000
--- a/seatunnel-connectors/seatunnel-connector-spark-file/src/main/scala/org/apache/seatunnel/spark/sink/FileSinkBase.scala
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.seatunnel.spark.sink
-
-import java.util
-
-
-import scala.collection.JavaConversions._
-import scala.util.{Failure, Success, Try}
-import org.apache.spark.sql.{Dataset, Row}
-import org.apache.seatunnel.common.config.{CheckResult, TypesafeConfigUtils}
-import org.apache.seatunnel.common.utils.StringTemplate
-import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory
-import org.apache.seatunnel.spark.SparkEnvironment
-import org.apache.seatunnel.spark.batch.SparkBatchSink
-
-abstract class FileSinkBase extends SparkBatchSink {
-
- def checkConfigImpl(allowedURISchema: List[String]): CheckResult = {
- config.hasPath("path") && !config.getString("path").trim.isEmpty match {
- case true => {
- val dir = config.getString("path")
-
- dir.startsWith("/") || uriInAllowedSchema(dir, allowedURISchema) match {
- case true => CheckResult.success()
- case false =>
- CheckResult.error(
- "invalid path URI, please set the following allowed schemas: " + allowedURISchema.mkString(
- ", "))
- }
- }
- case false => CheckResult.error("please specify [path] as non-empty string")
- }
- }
-
- override def prepare(env: SparkEnvironment): Unit = {
- val defaultConfig = ConfigFactory.parseMap(
- Map(
- "partition_by" -> util.Arrays.asList(),
- "save_mode" -> "error", // allowed values: overwrite, append, ignore, error
- "serializer" -> "json", // allowed values: csv, json, parquet, text
- "path_time_format" -> "yyyyMMddHHmmss" // if variable 'now' is used in path, this option specifies its time_format
- ))
- config = config.withFallback(defaultConfig)
- }
-
- /**
- * check if the schema name in this uri is allowed.
- * @return true if schema name is allowed
- */
- protected def uriInAllowedSchema(uri: String, allowedURISchema: List[String]): Boolean = {
-
- val notAllowed = allowedURISchema.forall(schema => {
- !uri.startsWith(schema)
- })
-
- !notAllowed
- }
-
- protected def buildPathWithDefaultSchema(uri: String, defaultUriSchema: String): String = {
-
- val path = uri.startsWith("/") match {
- case true => defaultUriSchema + uri
- case false => uri
- }
-
- path
- }
-
- def outputImpl(df: Dataset[Row], defaultUriSchema: String): Unit = {
-
- var writer = df.write.mode(config.getString("save_mode"))
-
- writer = config.getStringList("partition_by").length == 0 match {
- case true => writer
- case false => {
- val partitionKeys = config.getStringList("partition_by")
- writer.partitionBy(partitionKeys: _*)
- }
- }
-
- Try(TypesafeConfigUtils.extractSubConfigThrowable(config, "options.", false)) match {
-
- case Success(options) => {
- val optionMap = options
- .entrySet()
- .foldRight(Map[String, String]())((entry, m) => {
- m + (entry.getKey -> entry.getValue.unwrapped().toString)
- })
-
- writer.options(optionMap)
- }
- case Failure(exception) => // do nothing
-
- }
-
- var path = buildPathWithDefaultSchema(config.getString("path"), defaultUriSchema)
- path = StringTemplate.substitute(path, config.getString("path_time_format"))
- config.getString("serializer") match {
- case "csv" => writer.csv(path)
- case "json" => writer.json(path)
- case "parquet" => writer.parquet(path)
- case "text" => writer.text(path)
- case "orc" => writer.orc(path)
- }
- }
-}
diff --git a/seatunnel-connectors/seatunnel-connector-spark-file/src/main/scala/org/apache/seatunnel/spark/source/File.scala b/seatunnel-connectors/seatunnel-connector-spark-file/src/main/scala/org/apache/seatunnel/spark/source/File.scala
index 2f16187..69c2249 100644
--- a/seatunnel-connectors/seatunnel-connector-spark-file/src/main/scala/org/apache/seatunnel/spark/source/File.scala
+++ b/seatunnel-connectors/seatunnel-connector-spark-file/src/main/scala/org/apache/seatunnel/spark/source/File.scala
@@ -21,6 +21,7 @@ import scala.util.{Failure, Success, Try}
import org.apache.seatunnel.common.config.{CheckResult, TypesafeConfigUtils}
import org.apache.seatunnel.common.config.CheckConfigUtil.checkAllExists
+import org.apache.seatunnel.spark.Config.{CSV, FORMAT, JSON, ORC, PARQUET, PATH, TEXT}
import org.apache.seatunnel.spark.SparkEnvironment
import org.apache.seatunnel.spark.batch.SparkBatchSource
import org.apache.spark.sql.{Dataset, Row, SparkSession}
@@ -30,21 +31,19 @@ class File extends SparkBatchSource {
override def prepare(env: SparkEnvironment): Unit = {}
override def getData(env: SparkEnvironment): Dataset[Row] = {
- val path = buildPathWithDefaultSchema(config.getString("path"), "file://")
- fileReader(env.getSparkSession, path)
+ fileReader(env.getSparkSession, config.getString(PATH))
}
override def checkConfig(): CheckResult = {
- checkAllExists(config, "path")
+ checkAllExists(config, PATH)
}
protected def fileReader(spark: SparkSession, path: String): Dataset[Row] = {
- val format = config.getString("format")
+ val format = config.getString(FORMAT)
var reader = spark.read.format(format)
Try(TypesafeConfigUtils.extractSubConfigThrowable(config, "options.", false)) match {
-
- case Success(options) => {
+ case Success(options) =>
val optionMap = options
.entrySet()
.foldRight(Map[String, String]())((entry, m) => {
@@ -52,27 +51,16 @@ class File extends SparkBatchSource {
})
reader = reader.options(optionMap)
- }
- case Failure(_) => // do nothing
+ case Failure(_) =>
}
format match {
- case "text" => reader.load(path).withColumnRenamed("value", "raw_message")
- case "parquet" => reader.parquet(path)
- case "json" => reader.json(path)
- case "orc" => reader.orc(path)
- case "csv" => reader.csv(path)
+ case TEXT => reader.load(path).withColumnRenamed("value", "raw_message")
+ case PARQUET => reader.parquet(path)
+ case JSON => reader.json(path)
+ case ORC => reader.orc(path)
+ case CSV => reader.csv(path)
case _ => reader.format(format).load(path)
}
}
-
- protected def buildPathWithDefaultSchema(uri: String, defaultUriSchema: String): String = {
-
- val path = uri.startsWith("/") match {
- case true => defaultUriSchema + uri
- case false => uri
- }
-
- path
- }
}