You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ki...@apache.org on 2022/01/05 11:08:17 UTC
[incubator-seatunnel] branch dev updated: Optimize Use TypesafeConfigUtils (#945)
This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 0f45b63 Optimize Use TypesafeConfigUtils (#945)
0f45b63 is described below
commit 0f45b63cb8ada88f12daa4a8fe131fa192f01c44
Author: xtr_1993 <38...@users.noreply.github.com>
AuthorDate: Wed Jan 5 19:08:12 2022 +0800
Optimize Use TypesafeConfigUtils (#945)
* Use TypesafeConfigUtils
* Use TypesafeConfigUtils
* Scala checkStyle
* Scala checkStyle
Co-authored-by: xutengrui_1993 <xu...@yoozoo.com>
---
.../org/apache/seatunnel/spark/sink/Doris.scala | 26 +++++++++++++++-------
.../apache/seatunnel/spark/sink/DorisUtil.scala | 9 +++++---
2 files changed, 24 insertions(+), 11 deletions(-)
diff --git a/seatunnel-connectors/seatunnel-connector-spark-doris/src/main/scala/org/apache/seatunnel/spark/sink/Doris.scala b/seatunnel-connectors/seatunnel-connector-spark-doris/src/main/scala/org/apache/seatunnel/spark/sink/Doris.scala
index 01a8068..3117c86 100644
--- a/seatunnel-connectors/seatunnel-connector-spark-doris/src/main/scala/org/apache/seatunnel/spark/sink/Doris.scala
+++ b/seatunnel-connectors/seatunnel-connector-spark-doris/src/main/scala/org/apache/seatunnel/spark/sink/Doris.scala
@@ -17,7 +17,7 @@
package org.apache.seatunnel.spark.sink
-import org.apache.seatunnel.common.config.CheckResult
+import org.apache.seatunnel.common.config.{CheckResult, TypesafeConfigUtils}
import org.apache.seatunnel.spark.SparkEnvironment
import org.apache.seatunnel.spark.batch.SparkBatchSink
import org.apache.spark.sql.{Dataset, Row}
@@ -40,7 +40,15 @@ class Doris extends SparkBatchSink with Serializable {
}
val sparkSession = env.getSparkSession
import sparkSession.implicits._
- val dataFrame = data.map(x => x.toString().replaceAll("\\[|\\]", "").replace(",", column_separator))
+ val fields = data.schema.fields
+ val dataFrame = data.map(row => {
+ val builder = new StringBuilder
+ fields.foreach(f => {
+ val filedValue = row.getAs[Any](f.name)
+ builder.append(filedValue).append(column_separator)
+ })
+ builder.substring(0, builder.length - 1)
+ })
dataFrame.foreachPartition { partition =>
var count: Int = 0
val buffer = new ListBuffer[String]
@@ -78,14 +86,16 @@ class Doris extends SparkBatchSink with Serializable {
val dataBase: String = config.getString(Config.DATABASE)
val tableName: String = config.getString(Config.TABLE_NAME)
this.apiUrl = s"http://$host/api/$dataBase/$tableName/_stream_load"
- val httpConfig = JavaConversions.asScalaSet(config.entrySet()).filter(x => x.getKey.startsWith(Config.ARGS_PREFIX))
- if (httpConfig.nonEmpty) {
- httpConfig.foreach(tuple => {
- val split = tuple.getKey.split(".")
+ if (TypesafeConfigUtils.hasSubConfig(config, Config.ARGS_PREFIX)) {
+ val properties = TypesafeConfigUtils.extractSubConfig(config, Config.ARGS_PREFIX, true)
+ val iterator = properties.entrySet().iterator()
+ while (iterator.hasNext) {
+ val map = iterator.next()
+ val split = map.getKey.split("\\.")
if (split.size == 2) {
- propertiesMap.put(split(1), tuple.getValue.render())
+ propertiesMap.put(split(1), String.valueOf(map.getValue.unwrapped))
}
- })
+ }
}
new CheckResult(true, Config.CHECK_SUCCESS)
}
diff --git a/seatunnel-connectors/seatunnel-connector-spark-doris/src/main/scala/org/apache/seatunnel/spark/sink/DorisUtil.scala b/seatunnel-connectors/seatunnel-connector-spark-doris/src/main/scala/org/apache/seatunnel/spark/sink/DorisUtil.scala
index ed16533..e4a5309 100644
--- a/seatunnel-connectors/seatunnel-connector-spark-doris/src/main/scala/org/apache/seatunnel/spark/sink/DorisUtil.scala
+++ b/seatunnel-connectors/seatunnel-connector-spark-doris/src/main/scala/org/apache/seatunnel/spark/sink/DorisUtil.scala
@@ -24,9 +24,9 @@ import org.apache.http.client.methods.{CloseableHttpResponse, HttpPut}
import org.apache.http.entity.StringEntity
import org.apache.http.impl.client.{CloseableHttpClient, DefaultConnectionKeepAliveStrategy, DefaultRedirectStrategy, HttpClientBuilder}
import org.apache.log4j.Logger
-
import java.io.{BufferedReader, InputStreamReader}
import java.nio.charset.{Charset, StandardCharsets}
+
import scala.util.{Failure, Success, Try}
object DorisUtil extends Serializable {
@@ -94,6 +94,7 @@ object DorisUtil extends Serializable {
(status, httpclient, response)
}
+
def basicAuthHeader(username: String, password: String): String = {
val tobeEncode: String = username + ":" + password
val encoded = Base64.encodeBase64(tobeEncode.getBytes(StandardCharsets.UTF_8))
@@ -107,13 +108,15 @@ class DorisUtil(httpHeader: Map[String, String], apiUrl: String, user: String, p
val httpClient = DorisUtil.createClient
val result = Try(DorisUtil.streamLoad(httpClient, httpHeader, messages, apiUrl, user, password))
result match {
- case Success(_) =>
+ case Success(_) => {
httpClient.close()
result.get._2.close()
- case Failure(var1: Exception) =>
+ }
+ case Failure(var1: Exception) => {
httpClient.close()
result.get._2.close()
throw new RuntimeException(var1.getMessage)
+ }
}
}
}