Posted to commits@linkis.apache.org by ra...@apache.org on 2023/03/08 06:41:57 UTC
[linkis] branch dev-1.4.0 updated: [feat]Support spark3.3+ and spark2.2- compile (#4301)
This is an automated email from the ASF dual-hosted git repository.
rarexixi pushed a commit to branch dev-1.4.0
in repository https://gitbox.apache.org/repos/asf/linkis.git
The following commit(s) were added to refs/heads/dev-1.4.0 by this push:
new 2c5abd7a0 [feat]Support spark3.3+ and spark2.2- compile (#4301)
2c5abd7a0 is described below
commit 2c5abd7a0504db7b2ef5c34a2e43257c66d783c0
Author: GuoPhilipse <46...@users.noreply.github.com>
AuthorDate: Wed Mar 8 14:41:49 2023 +0800
[feat]Support spark3.3+ and spark2.2- compile (#4301)
* Support spark3.3+ and spark2.2- compile
* fix unknown dependency
* improve code
* remove unused dependency
* remove unused dependency for hive sink
---------
Co-authored-by: gf13871 <gf...@ly.com>
---
linkis-engineconn-plugins/spark/pom.xml | 10 ++++++++++
.../engineplugin/spark/datacalc/sink/HiveSink.scala | 16 ++++++++--------
.../engineplugin/spark/datacalc/sink/JdbcSink.scala | 16 ++++++++++++----
tool/dependencies/known-dependencies.txt | 1 +
4 files changed, 31 insertions(+), 12 deletions(-)
diff --git a/linkis-engineconn-plugins/spark/pom.xml b/linkis-engineconn-plugins/spark/pom.xml
index c7993b28d..a5e7523d0 100644
--- a/linkis-engineconn-plugins/spark/pom.xml
+++ b/linkis-engineconn-plugins/spark/pom.xml
@@ -180,12 +180,22 @@
<artifactId>linkis-rpc</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>net.sf.py4j</groupId>
+ <artifactId>py4j</artifactId>
+ <version>0.10.7</version>
+ <scope>provided</scope>
+ </dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
<exclusions>
+ <exclusion>
+ <groupId>net.sf.py4j</groupId>
+ <artifactId>py4j</artifactId>
+ </exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
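Note on the pom change above: it pins py4j 0.10.7 as a provided dependency and excludes the copy pulled in transitively by spark-core, so the engine plugin compiles against a single py4j version regardless of which Spark line bundles it. A quick, hypothetical way to confirm which jar actually wins on the engine's classpath (the object and printout below are illustrative, not part of the commit):

    import py4j.GatewayServer

    object Py4jOnClasspath {
      def main(args: Array[String]): Unit = {
        // Where the py4j classes were loaded from; with the exclusion in place
        // this is expected to resolve to the pinned py4j-0.10.7.jar.
        val src = classOf[GatewayServer].getProtectionDomain.getCodeSource
        println(if (src == null) "py4j loaded from the bootstrap classpath" else src.getLocation.toString)
      }
    }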
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/sink/HiveSink.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/sink/HiveSink.scala
index 8ba618776..1a81d6537 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/sink/HiveSink.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/sink/HiveSink.scala
@@ -24,14 +24,11 @@ import org.apache.linkis.engineplugin.spark.errorcode.SparkErrorCodeSummary
import org.apache.commons.lang3.StringUtils
import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.sources.DataSourceRegister
import org.apache.spark.sql.types.StructField
-import org.slf4j.{Logger, LoggerFactory}
-
class HiveSink extends DataCalcSink[HiveSinkConfig] with Logging {
def output(spark: SparkSession, ds: Dataset[Row]): Unit = {
@@ -122,7 +119,9 @@ class HiveSink extends DataCalcSink[HiveSinkConfig] with Logging {
logFields(sourceFields, targetFields)
throw new HiveSinkException(
SparkErrorCodeSummary.DATA_CALC_COLUMN_NUM_NOT_MATCH.getErrorCode,
- s"$targetTable requires that the data to be inserted have the same number of columns as the target table: target table has ${targetFields.length} column(s) but the inserted data has ${sourceFields.length} column(s)"
+ s"$targetTable requires that the data to be inserted have the same number of columns " +
+ s"as the target table: target table has ${targetFields.length} column(s) " +
+ s"but the inserted data has ${sourceFields.length} column(s)"
)
}
@@ -184,17 +183,18 @@ class HiveSink extends DataCalcSink[HiveSinkConfig] with Logging {
logicalRelation.relation match {
case hadoopFsRelation: HadoopFsRelation =>
hadoopFsRelation.fileFormat match {
- case _: org.apache.spark.sql.execution.datasources.orc.OrcFileFormat =>
- fileFormat = FileFormat.ORC
case _: org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat =>
fileFormat = FileFormat.PARQUET
case dataSourceRegister: DataSourceRegister =>
fileFormat = FileFormat.withName(dataSourceRegister.shortName.toUpperCase)
case _ =>
+ if (hadoopFsRelation.fileFormat.getClass.getSimpleName.equals("OrcFileFormat")) {
+ fileFormat = FileFormat.ORC
+ }
}
}
- case hiveTableRelation: HiveTableRelation =>
- // todo
+ // case hiveTableRelation: HiveTableRelation =>
+ // todo please note `HiveTableRelation` was added after spark 2.2.1
}
fileFormat
} catch {
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/sink/JdbcSink.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/sink/JdbcSink.scala
index ab8a21c3f..e9d60bd2b 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/sink/JdbcSink.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/sink/JdbcSink.scala
@@ -17,14 +17,16 @@
package org.apache.linkis.engineplugin.spark.datacalc.sink
+import org.apache.linkis.common.utils.ClassUtils.getFieldVal
import org.apache.linkis.common.utils.Logging
import org.apache.linkis.engineplugin.spark.datacalc.api.DataCalcSink
import org.apache.commons.lang3.StringUtils
+import org.apache.spark.SPARK_VERSION
import org.apache.spark.sql.{Dataset, Row, SparkSession}
-import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils}
+import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
-import java.sql.Connection
+import java.sql.{Connection, DriverManager}
import scala.collection.JavaConverters._
@@ -58,7 +60,8 @@ class JdbcSink extends DataCalcSink[JdbcSinkConfig] with Logging {
.repartition(1)
.foreachPartition((_: Iterator[Row]) => {
val jdbcOptions = new JDBCOptions(options)
- val conn: Connection = JdbcUtils.createConnectionFactory(jdbcOptions)()
+ val conn: Connection =
+ DriverManager.getConnection(config.getUrl, config.getUser, config.getPassword)
try {
config.getPreQueries.asScala.foreach(query => {
logger.info(s"Execute pre query: $query")
@@ -86,7 +89,12 @@ class JdbcSink extends DataCalcSink[JdbcSinkConfig] with Logging {
logger.info("Execute query: {}", query)
val statement = conn.prepareStatement(query)
try {
- statement.setQueryTimeout(jdbcOptions.queryTimeout)
+ // `queryTimeout` was added after spark2.4.0, more details please check SPARK-23856
+ if (SPARK_VERSION >= "2.4") {
+ val queryTimeout = getFieldVal(jdbcOptions, "queryTimeout").asInstanceOf[Int]
+ statement.setQueryTimeout(queryTimeout)
+ }
+
val rows = statement.executeUpdate()
logger.info("{} rows affected", rows)
} catch {
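Note on the JdbcSink change above: the connection is now opened directly through java.sql.DriverManager instead of JdbcUtils.createConnectionFactory, whose shape differs across Spark releases, and JDBCOptions.queryTimeout, which only exists from Spark 2.4 onward (SPARK-23856), is read reflectively and applied only when SPARK_VERSION indicates it is available. A rough sketch of that version gate, assuming a prepared statement and a JDBCOptions-like object are in scope; the reflective lookup below is illustrative and stands in for Linkis' getFieldVal helper:

    import java.sql.PreparedStatement
    import org.apache.spark.SPARK_VERSION

    // Only touch queryTimeout on Spark versions that define it; read it through
    // reflection so the code still compiles against Spark releases older than 2.4.
    def applyQueryTimeout(statement: PreparedStatement, jdbcOptions: AnyRef): Unit =
      if (SPARK_VERSION >= "2.4") {
        val field = jdbcOptions.getClass.getDeclaredField("queryTimeout")
        field.setAccessible(true)
        statement.setQueryTimeout(field.get(jdbcOptions).asInstanceOf[Int])
      }

The plain string comparison is enough here because the Spark version strings of interest ("2.3.x", "2.4.x", "3.x") order correctly lexicographically.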
diff --git a/tool/dependencies/known-dependencies.txt b/tool/dependencies/known-dependencies.txt
index 55f0163aa..5b3b459d6 100644
--- a/tool/dependencies/known-dependencies.txt
+++ b/tool/dependencies/known-dependencies.txt
@@ -434,6 +434,7 @@ protostuff-collectionschema-1.6.2.jar
protostuff-core-1.6.2.jar
protostuff-runtime-1.6.2.jar
py4j-0.10.4.jar
+py4j-0.10.7.jar
quartz-2.3.2.jar
reactive-streams-1.0.3.jar
reactor-core-3.3.17.RELEASE.jar