You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ki...@apache.org on 2022/01/05 11:08:17 UTC

[incubator-seatunnel] branch dev updated: Optimize Use TypesafeConfigUtils (#945)

This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 0f45b63  Optimize Use TypesafeConfigUtils (#945)
0f45b63 is described below

commit 0f45b63cb8ada88f12daa4a8fe131fa192f01c44
Author: xtr_1993 <38...@users.noreply.github.com>
AuthorDate: Wed Jan 5 19:08:12 2022 +0800

    Optimize Use TypesafeConfigUtils (#945)
    
    * Use TypesafeConfigUtils
    
    * Use TypesafeConfigUtils
    
    * Scala checkStyle
    
    * Scala checkStyle
    
    Co-authored-by: xutengrui_1993 <xu...@yoozoo.com>
---
 .../org/apache/seatunnel/spark/sink/Doris.scala    | 26 +++++++++++++++-------
 .../apache/seatunnel/spark/sink/DorisUtil.scala    |  9 +++++---
 2 files changed, 24 insertions(+), 11 deletions(-)

diff --git a/seatunnel-connectors/seatunnel-connector-spark-doris/src/main/scala/org/apache/seatunnel/spark/sink/Doris.scala b/seatunnel-connectors/seatunnel-connector-spark-doris/src/main/scala/org/apache/seatunnel/spark/sink/Doris.scala
index 01a8068..3117c86 100644
--- a/seatunnel-connectors/seatunnel-connector-spark-doris/src/main/scala/org/apache/seatunnel/spark/sink/Doris.scala
+++ b/seatunnel-connectors/seatunnel-connector-spark-doris/src/main/scala/org/apache/seatunnel/spark/sink/Doris.scala
@@ -17,7 +17,7 @@
 
 package org.apache.seatunnel.spark.sink
 
-import org.apache.seatunnel.common.config.CheckResult
+import org.apache.seatunnel.common.config.{CheckResult, TypesafeConfigUtils}
 import org.apache.seatunnel.spark.SparkEnvironment
 import org.apache.seatunnel.spark.batch.SparkBatchSink
 import org.apache.spark.sql.{Dataset, Row}
@@ -40,7 +40,15 @@ class Doris extends SparkBatchSink with Serializable {
     }
     val sparkSession = env.getSparkSession
     import sparkSession.implicits._
-    val dataFrame = data.map(x => x.toString().replaceAll("\\[|\\]", "").replace(",", column_separator))
+    val fields = data.schema.fields
+    val dataFrame = data.map(row => {
+      val builder = new StringBuilder
+      fields.foreach(f => {
+        val filedValue = row.getAs[Any](f.name)
+        builder.append(filedValue).append(column_separator)
+      })
+      builder.substring(0, builder.length - 1)
+    })
     dataFrame.foreachPartition { partition =>
       var count: Int = 0
       val buffer = new ListBuffer[String]
@@ -78,14 +86,16 @@ class Doris extends SparkBatchSink with Serializable {
       val dataBase: String = config.getString(Config.DATABASE)
       val tableName: String = config.getString(Config.TABLE_NAME)
       this.apiUrl = s"http://$host/api/$dataBase/$tableName/_stream_load"
-      val httpConfig = JavaConversions.asScalaSet(config.entrySet()).filter(x => x.getKey.startsWith(Config.ARGS_PREFIX))
-      if (httpConfig.nonEmpty) {
-        httpConfig.foreach(tuple => {
-          val split = tuple.getKey.split(".")
+      if (TypesafeConfigUtils.hasSubConfig(config, Config.ARGS_PREFIX)) {
+        val properties = TypesafeConfigUtils.extractSubConfig(config, Config.ARGS_PREFIX, true)
+        val iterator = properties.entrySet().iterator()
+        while (iterator.hasNext) {
+          val map = iterator.next()
+          val split = map.getKey.split("\\.")
           if (split.size == 2) {
-            propertiesMap.put(split(1), tuple.getValue.render())
+            propertiesMap.put(split(1), String.valueOf(map.getValue.unwrapped))
           }
-        })
+        }
       }
       new CheckResult(true, Config.CHECK_SUCCESS)
     }
diff --git a/seatunnel-connectors/seatunnel-connector-spark-doris/src/main/scala/org/apache/seatunnel/spark/sink/DorisUtil.scala b/seatunnel-connectors/seatunnel-connector-spark-doris/src/main/scala/org/apache/seatunnel/spark/sink/DorisUtil.scala
index ed16533..e4a5309 100644
--- a/seatunnel-connectors/seatunnel-connector-spark-doris/src/main/scala/org/apache/seatunnel/spark/sink/DorisUtil.scala
+++ b/seatunnel-connectors/seatunnel-connector-spark-doris/src/main/scala/org/apache/seatunnel/spark/sink/DorisUtil.scala
@@ -24,9 +24,9 @@ import org.apache.http.client.methods.{CloseableHttpResponse, HttpPut}
 import org.apache.http.entity.StringEntity
 import org.apache.http.impl.client.{CloseableHttpClient, DefaultConnectionKeepAliveStrategy, DefaultRedirectStrategy, HttpClientBuilder}
 import org.apache.log4j.Logger
-
 import java.io.{BufferedReader, InputStreamReader}
 import java.nio.charset.{Charset, StandardCharsets}
+
 import scala.util.{Failure, Success, Try}
 
 object DorisUtil extends Serializable {
@@ -94,6 +94,7 @@ object DorisUtil extends Serializable {
     (status, httpclient, response)
   }
 
+
   def basicAuthHeader(username: String, password: String): String = {
     val tobeEncode: String = username + ":" + password
     val encoded = Base64.encodeBase64(tobeEncode.getBytes(StandardCharsets.UTF_8))
@@ -107,13 +108,15 @@ class DorisUtil(httpHeader: Map[String, String], apiUrl: String, user: String, p
     val httpClient = DorisUtil.createClient
     val result = Try(DorisUtil.streamLoad(httpClient, httpHeader, messages, apiUrl, user, password))
     result match {
-      case Success(_) =>
+      case Success(_) => {
         httpClient.close()
         result.get._2.close()
-      case Failure(var1: Exception) =>
+      }
+      case Failure(var1: Exception) => {
         httpClient.close()
         result.get._2.close()
         throw new RuntimeException(var1.getMessage)
+      }
     }
   }
 }