Posted to commits@seatunnel.apache.org by ki...@apache.org on 2022/02/25 03:43:59 UTC

[incubator-seatunnel] branch dev updated: [Feature] [connector] Remove `Hdfs Connector` and make it compatible with `File Connector` (#1256)

This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 7fee8ef  [Feature] [connector] Remove `Hdfs Connector` and make it compatible with `File Connector` (#1256)
7fee8ef is described below

commit 7fee8eff00884b7040968d4865fc9cbb4ce034be
Author: Simon <zh...@cvte.com>
AuthorDate: Fri Feb 25 11:43:52 2022 +0800

    [Feature] [connector] Remove `Hdfs Connector` and make it compatible with `File Connector` (#1256)
---
 docs/en/spark/configuration/sink-plugins/File.md   |   4 +-
 docs/en/spark/configuration/sink-plugins/Hdfs.md   |  70 ------------
 docs/en/spark/configuration/source-plugins/File.md |  41 +++++++
 .../spark/{sink/Hdfs.scala => Config.scala}        |  24 ++--
 .../org/apache/seatunnel/spark/sink/File.scala     |  61 ++++++++++-
 .../apache/seatunnel/spark/sink/FileSinkBase.scala | 121 ---------------------
 .../org/apache/seatunnel/spark/source/File.scala   |  34 ++----
 7 files changed, 122 insertions(+), 233 deletions(-)

diff --git a/docs/en/spark/configuration/sink-plugins/File.md b/docs/en/spark/configuration/sink-plugins/File.md
index 5eb1ada..e79419b 100644
--- a/docs/en/spark/configuration/sink-plugins/File.md
+++ b/docs/en/spark/configuration/sink-plugins/File.md
@@ -4,7 +4,7 @@
 
 ## Description
 
-Output data to file
+Output data to a local or HDFS file.
 
 ## Options
 
@@ -28,7 +28,7 @@ Partition data based on selected fields
 
 ### path [string]
 
-Output file path, starting with `file://`
+Output file path, starting with `file://` or `hdfs://`
 
 ### path_time_format [string]
 
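With the dedicated Hdfs sink gone (its documentation is deleted below), HDFS output is expected to go through the File sink instead. A minimal sketch of such a sink block, assuming the option names shown in this diff (`path`, `serializer`, `path_time_format`, `save_mode`); the concrete path, serializer and save mode are illustrative:

```
file {
    path = "hdfs:///var/logs-${now}"
    serializer = "json"
    path_time_format = "yyyy.MM.dd"
    save_mode = "append"
}
```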
diff --git a/docs/en/spark/configuration/sink-plugins/Hdfs.md b/docs/en/spark/configuration/sink-plugins/Hdfs.md
deleted file mode 100644
index b7f3c65..0000000
--- a/docs/en/spark/configuration/sink-plugins/Hdfs.md
+++ /dev/null
@@ -1,70 +0,0 @@
-# Hdfs
-
-> Sink plugin : Hdfs [Spark]
-
-## Description
-
-Export data to HDFS
-
-## Options
-
-| name             | type   | required | default value  |
-| ---------------- | ------ | -------- | -------------- |
-| options          | object | no       | -              |
-| partition_by     | array  | no       | -              |
-| path             | string | yes      | -              |
-| path_time_format | string | no       | yyyyMMddHHmmss |
-| save_mode        | string | no       | error          |
-| serializer       | string | no       | json           |
-| common-options   | string | no       | -              |
-
-### options [object]
-
-Custom parameters
-
-### partition_by [array]
-
-Partition data based on selected fields
-
-### path [string]
-
-Output file path, starting with `hdfs://`
-
-### path_time_format [string]
-
-When the format in the `path` parameter is `xxxx-${now}` , `path_time_format` can specify the time format of the path, and the default value is `yyyy.MM.dd` . The commonly used time formats are listed as follows:
-
-| Symbol | Description        |
-| ------ | ------------------ |
-| y      | Year               |
-| M      | Month              |
-| d      | Day of month       |
-| H      | Hour in day (0-23) |
-| m      | Minute in hour     |
-| s      | Second in minute   |
-
-See [Java SimpleDateFormat](https://docs.oracle.com/javase/tutorial/i18n/format/simpleDateFormat.html) for detailed time format syntax.
-
-### save_mode [string]
-
-Storage mode, currently supports `overwrite` , `append` , `ignore` and `error` . For the specific meaning of each mode, see [save-modes](https://spark.apache.org/docs/latest/sql-programming-guide.html#save-modes)
-
-### serializer [string]
-
-Serialization method, currently supports `csv` , `json` , `parquet` , `orc` and `text`
-
-### common options [string]
-
-Sink plugin common parameters, please refer to [Sink Plugin](./sink-plugin.md) for details
-
-## Examples
-
-```bash
-hdfs {
-    path = "hdfs:///var/logs-${now}"
-    serializer = "json"
-    path_time_format = "yyyy.MM.dd"
-}
-```
-
-> Generate HDFS files on a daily basis, such as `logs-2021.12.15`
diff --git a/docs/en/spark/configuration/source-plugins/File.md b/docs/en/spark/configuration/source-plugins/File.md
new file mode 100644
index 0000000..8ac1000
--- /dev/null
+++ b/docs/en/spark/configuration/source-plugins/File.md
@@ -0,0 +1,41 @@
+# File
+
+> Source plugin : File [Spark]
+
+## Description
+Read data from a local or HDFS file.
+
+## Options
+
+| name | type | required | default value |
+| --- | --- | --- | --- |
+| format | string | no | json |
+| path | string | yes | - |
+| common-options | string | yes | - |
+
+### format [string]
+File format to read; currently supports `text`, `parquet`, `json`, `orc` and `csv`.
+
+### path [string]
+- If reading data from HDFS, the file path should start with `hdfs://`
+- If reading data from the local file system, the file path should start with `file://`
+
+### common options [string]
+
+Source plugin common parameters, please refer to [Source Plugin](./source-plugin.md) for details
+
+## Examples
+
+```
+file {
+    path = "hdfs:///var/logs"
+    result_table_name = "access_log"
+}
+```
+
+```
+file {
+    path = "file:///var/logs"
+    result_table_name = "access_log"
+}
+```
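The new source document lists `format` with a default of `json`. A sketch of a non-default read, assuming an illustrative HDFS path and that reader options can be passed through the `options.` sub-config extracted in `source/File.scala` further down in this diff (the `header` key is just a standard Spark CSV reader option used for illustration):

```
file {
    path = "hdfs:///var/access_logs"
    format = "csv"
    # forwarded to the Spark reader via the `options.` sub-config (assumed)
    options.header = "true"
    result_table_name = "access_log"
}
```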
diff --git a/seatunnel-connectors/seatunnel-connector-spark-file/src/main/scala/org/apache/seatunnel/spark/sink/Hdfs.scala b/seatunnel-connectors/seatunnel-connector-spark-file/src/main/scala/org/apache/seatunnel/spark/Config.scala
similarity index 65%
rename from seatunnel-connectors/seatunnel-connector-spark-file/src/main/scala/org/apache/seatunnel/spark/sink/Hdfs.scala
rename to seatunnel-connectors/seatunnel-connector-spark-file/src/main/scala/org/apache/seatunnel/spark/Config.scala
index 34fe6b7..a130d1f 100644
--- a/seatunnel-connectors/seatunnel-connector-spark-file/src/main/scala/org/apache/seatunnel/spark/sink/Hdfs.scala
+++ b/seatunnel-connectors/seatunnel-connector-spark-file/src/main/scala/org/apache/seatunnel/spark/Config.scala
@@ -14,19 +14,21 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.seatunnel.spark.sink
+package org.apache.seatunnel.spark
 
-import org.apache.spark.sql.{Dataset, Row}
-import org.apache.seatunnel.common.config.CheckResult
-import org.apache.seatunnel.spark.SparkEnvironment
+object Config extends Serializable {
 
-class Hdfs extends FileSinkBase {
+  val PATH = "path"
+  val PARTITION_BY = "partition_by"
+  val SAVE_MODE = "save_mode"
+  val SERIALIZER = "serializer"
+  val PATH_TIME_FORMAT = "path_time_format"
+  val FORMAT = "format"
 
-  override def checkConfig(): CheckResult = {
-    checkConfigImpl(List("hdfs://"))
-  }
+  val TEXT = "text"
+  val PARQUET = "parquet"
+  val JSON = "json"
+  val ORC = "orc"
+  val CSV = "csv"
 
-  override def output(data: Dataset[Row], env: SparkEnvironment): Unit = {
-    outputImpl(data, "hdfs://")
-  }
 }
diff --git a/seatunnel-connectors/seatunnel-connector-spark-file/src/main/scala/org/apache/seatunnel/spark/sink/File.scala b/seatunnel-connectors/seatunnel-connector-spark-file/src/main/scala/org/apache/seatunnel/spark/sink/File.scala
index 84c7476..08b5e10 100644
--- a/seatunnel-connectors/seatunnel-connector-spark-file/src/main/scala/org/apache/seatunnel/spark/sink/File.scala
+++ b/seatunnel-connectors/seatunnel-connector-spark-file/src/main/scala/org/apache/seatunnel/spark/sink/File.scala
@@ -16,17 +16,66 @@
  */
 package org.apache.seatunnel.spark.sink
 
-import org.apache.spark.sql.{Dataset, Row}
-import org.apache.seatunnel.common.config.CheckResult
+import java.util
+
+import scala.collection.JavaConversions._
+import scala.util.{Failure, Success, Try}
+
+import org.apache.seatunnel.common.config.{CheckResult, TypesafeConfigUtils}
+import org.apache.seatunnel.common.config.CheckConfigUtil.checkAllExists
+import org.apache.seatunnel.common.utils.StringTemplate
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory
+import org.apache.seatunnel.spark.Config.{CSV, JSON, ORC, PARQUET, PARTITION_BY, PATH, PATH_TIME_FORMAT, SAVE_MODE, SERIALIZER, TEXT}
 import org.apache.seatunnel.spark.SparkEnvironment
+import org.apache.seatunnel.spark.batch.SparkBatchSink
+import org.apache.spark.sql.{Dataset, Row}
 
-class File extends FileSinkBase {
+class File extends SparkBatchSink {
 
   override def checkConfig(): CheckResult = {
-    checkConfigImpl(List("file://"))
+    checkAllExists(config, PATH)
+  }
+
+  override def prepare(env: SparkEnvironment): Unit = {
+    val defaultConfig = ConfigFactory.parseMap(
+      Map(
+        PARTITION_BY -> util.Arrays.asList(),
+        SAVE_MODE -> "error", // allowed values: overwrite, append, ignore, error
+        SERIALIZER -> "json", // allowed values: csv, json, parquet, orc, text
+        PATH_TIME_FORMAT -> "yyyyMMddHHmmss" // if variable 'now' is used in path, this option specifies its time_format
+      ))
+    config = config.withFallback(defaultConfig)
   }
 
-  override def output(data: Dataset[Row], env: SparkEnvironment): Unit = {
-    outputImpl(data, "file://")
+  override def output(ds: Dataset[Row], env: SparkEnvironment): Unit = {
+    var writer = ds.write.mode(config.getString(SAVE_MODE))
+    writer = config.getStringList(PARTITION_BY).isEmpty match {
+      case true => writer
+      case false =>
+        val partitionKeys = config.getStringList(PARTITION_BY)
+        writer.partitionBy(partitionKeys: _*)
+    }
+
+    Try(TypesafeConfigUtils.extractSubConfigThrowable(config, "options.", false)) match {
+
+      case Success(options) =>
+        val optionMap = options
+          .entrySet()
+          .foldRight(Map[String, String]())((entry, m) => {
+            m + (entry.getKey -> entry.getValue.unwrapped().toString)
+          })
+
+        writer.options(optionMap)
+      case Failure(_) => // do nothing
+    }
+
+    val path = StringTemplate.substitute(config.getString(PATH), config.getString(PATH_TIME_FORMAT))
+    config.getString(SERIALIZER) match {
+      case CSV => writer.csv(path)
+      case JSON => writer.json(path)
+      case PARQUET => writer.parquet(path)
+      case TEXT => writer.text(path)
+      case ORC => writer.orc(path)
+    }
   }
 }
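The rewritten sink resolves its settings through the constants in `Config.scala` and the defaults set in `prepare()`. A sketch of a config block exercising those options, assuming a hypothetical `dt` partition column and output path; the `options.compression` entry is a standard Spark writer option shown only to illustrate the pass-through:

```
file {
    path = "file:///var/seatunnel/output-${now}"
    serializer = "parquet"
    partition_by = ["dt"]
    save_mode = "overwrite"
    # forwarded to DataFrameWriter.options via the `options.` sub-config (assumed)
    options.compression = "snappy"
}
```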
diff --git a/seatunnel-connectors/seatunnel-connector-spark-file/src/main/scala/org/apache/seatunnel/spark/sink/FileSinkBase.scala b/seatunnel-connectors/seatunnel-connector-spark-file/src/main/scala/org/apache/seatunnel/spark/sink/FileSinkBase.scala
deleted file mode 100644
index a1cd0fc..0000000
--- a/seatunnel-connectors/seatunnel-connector-spark-file/src/main/scala/org/apache/seatunnel/spark/sink/FileSinkBase.scala
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.seatunnel.spark.sink
-
-import java.util
-
-
-import scala.collection.JavaConversions._
-import scala.util.{Failure, Success, Try}
-import org.apache.spark.sql.{Dataset, Row}
-import org.apache.seatunnel.common.config.{CheckResult, TypesafeConfigUtils}
-import org.apache.seatunnel.common.utils.StringTemplate
-import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory
-import org.apache.seatunnel.spark.SparkEnvironment
-import org.apache.seatunnel.spark.batch.SparkBatchSink
-
-abstract class FileSinkBase extends SparkBatchSink {
-
-  def checkConfigImpl(allowedURISchema: List[String]): CheckResult = {
-    config.hasPath("path") && !config.getString("path").trim.isEmpty match {
-      case true => {
-        val dir = config.getString("path")
-
-        dir.startsWith("/") || uriInAllowedSchema(dir, allowedURISchema) match {
-          case true => CheckResult.success()
-          case false =>
-            CheckResult.error(
-              "invalid path URI, please set the following allowed schemas: " + allowedURISchema.mkString(
-                ", "))
-        }
-      }
-      case false => CheckResult.error("please specify [path] as non-empty string")
-    }
-  }
-
-  override def prepare(env: SparkEnvironment): Unit = {
-    val defaultConfig = ConfigFactory.parseMap(
-      Map(
-        "partition_by" -> util.Arrays.asList(),
-        "save_mode" -> "error", // allowed values: overwrite, append, ignore, error
-        "serializer" -> "json", // allowed values: csv, json, parquet, text
-        "path_time_format" -> "yyyyMMddHHmmss" // if variable 'now' is used in path, this option specifies its time_format
-      ))
-    config = config.withFallback(defaultConfig)
-  }
-
-  /**
-   * check if the schema name in this uri is allowed.
-   * @return true if schema name is allowed
-   */
-  protected def uriInAllowedSchema(uri: String, allowedURISchema: List[String]): Boolean = {
-
-    val notAllowed = allowedURISchema.forall(schema => {
-      !uri.startsWith(schema)
-    })
-
-    !notAllowed
-  }
-
-  protected def buildPathWithDefaultSchema(uri: String, defaultUriSchema: String): String = {
-
-    val path = uri.startsWith("/") match {
-      case true => defaultUriSchema + uri
-      case false => uri
-    }
-
-    path
-  }
-
-  def outputImpl(df: Dataset[Row], defaultUriSchema: String): Unit = {
-
-    var writer = df.write.mode(config.getString("save_mode"))
-
-    writer = config.getStringList("partition_by").length == 0 match {
-      case true => writer
-      case false => {
-        val partitionKeys = config.getStringList("partition_by")
-        writer.partitionBy(partitionKeys: _*)
-      }
-    }
-
-    Try(TypesafeConfigUtils.extractSubConfigThrowable(config, "options.", false)) match {
-
-      case Success(options) => {
-        val optionMap = options
-          .entrySet()
-          .foldRight(Map[String, String]())((entry, m) => {
-            m + (entry.getKey -> entry.getValue.unwrapped().toString)
-          })
-
-        writer.options(optionMap)
-      }
-      case Failure(exception) => // do nothing
-
-    }
-
-    var path = buildPathWithDefaultSchema(config.getString("path"), defaultUriSchema)
-    path = StringTemplate.substitute(path, config.getString("path_time_format"))
-    config.getString("serializer") match {
-      case "csv" => writer.csv(path)
-      case "json" => writer.json(path)
-      case "parquet" => writer.parquet(path)
-      case "text" => writer.text(path)
-      case "orc" => writer.orc(path)
-    }
-  }
-}
diff --git a/seatunnel-connectors/seatunnel-connector-spark-file/src/main/scala/org/apache/seatunnel/spark/source/File.scala b/seatunnel-connectors/seatunnel-connector-spark-file/src/main/scala/org/apache/seatunnel/spark/source/File.scala
index 2f16187..69c2249 100644
--- a/seatunnel-connectors/seatunnel-connector-spark-file/src/main/scala/org/apache/seatunnel/spark/source/File.scala
+++ b/seatunnel-connectors/seatunnel-connector-spark-file/src/main/scala/org/apache/seatunnel/spark/source/File.scala
@@ -21,6 +21,7 @@ import scala.util.{Failure, Success, Try}
 
 import org.apache.seatunnel.common.config.{CheckResult, TypesafeConfigUtils}
 import org.apache.seatunnel.common.config.CheckConfigUtil.checkAllExists
+import org.apache.seatunnel.spark.Config.{CSV, FORMAT, JSON, ORC, PARQUET, PATH, TEXT}
 import org.apache.seatunnel.spark.SparkEnvironment
 import org.apache.seatunnel.spark.batch.SparkBatchSource
 import org.apache.spark.sql.{Dataset, Row, SparkSession}
@@ -30,21 +31,19 @@ class File extends SparkBatchSource {
   override def prepare(env: SparkEnvironment): Unit = {}
 
   override def getData(env: SparkEnvironment): Dataset[Row] = {
-    val path = buildPathWithDefaultSchema(config.getString("path"), "file://")
-    fileReader(env.getSparkSession, path)
+    fileReader(env.getSparkSession, config.getString(PATH))
   }
 
   override def checkConfig(): CheckResult = {
-    checkAllExists(config, "path")
+    checkAllExists(config, PATH)
   }
 
   protected def fileReader(spark: SparkSession, path: String): Dataset[Row] = {
-    val format = config.getString("format")
+    val format = config.getString(FORMAT)
     var reader = spark.read.format(format)
 
     Try(TypesafeConfigUtils.extractSubConfigThrowable(config, "options.", false)) match {
-
-      case Success(options) => {
+      case Success(options) =>
         val optionMap = options
           .entrySet()
           .foldRight(Map[String, String]())((entry, m) => {
@@ -52,27 +51,16 @@ class File extends SparkBatchSource {
           })
 
         reader = reader.options(optionMap)
-      }
-      case Failure(_) => // do nothing
+      case Failure(_) =>
     }
 
     format match {
-      case "text" => reader.load(path).withColumnRenamed("value", "raw_message")
-      case "parquet" => reader.parquet(path)
-      case "json" => reader.json(path)
-      case "orc" => reader.orc(path)
-      case "csv" => reader.csv(path)
+      case TEXT => reader.load(path).withColumnRenamed("value", "raw_message")
+      case PARQUET => reader.parquet(path)
+      case JSON => reader.json(path)
+      case ORC => reader.orc(path)
+      case CSV => reader.csv(path)
       case _ => reader.format(format).load(path)
     }
   }
-
-  protected def buildPathWithDefaultSchema(uri: String, defaultUriSchema: String): String = {
-
-    val path = uri.startsWith("/") match {
-      case true => defaultUriSchema + uri
-      case false => uri
-    }
-
-    path
-  }
 }
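Note that the source no longer prepends a default scheme: with `buildPathWithDefaultSchema` removed, a scheme-less path is resolved by Spark against the default filesystem (typically `fs.defaultFS`), so spelling the scheme out keeps the behaviour unambiguous. A sketch, with an illustrative path and table name:

```
file {
    # the scheme is no longer prepended automatically, so state it explicitly
    path = "file:///var/logs"
    format = "text"
    result_table_name = "raw_logs"
}
```

With `format = "text"`, the reader above renames the `value` column to `raw_message`.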