Posted to commits@linkis.apache.org by ra...@apache.org on 2023/03/08 06:41:57 UTC

[linkis] branch dev-1.4.0 updated: [feat] Support spark3.3+ and spark2.2- compile (#4301)

This is an automated email from the ASF dual-hosted git repository.

rarexixi pushed a commit to branch dev-1.4.0
in repository https://gitbox.apache.org/repos/asf/linkis.git


The following commit(s) were added to refs/heads/dev-1.4.0 by this push:
     new 2c5abd7a0 [feat] Support spark3.3+ and spark2.2- compile (#4301)
2c5abd7a0 is described below

commit 2c5abd7a0504db7b2ef5c34a2e43257c66d783c0
Author: GuoPhilipse <46...@users.noreply.github.com>
AuthorDate: Wed Mar 8 14:41:49 2023 +0800

    [feat] Support spark3.3+ and spark2.2- compile (#4301)
    
    * Support spark3.3+ and spark2.2- compile
    
    * fix unknown dependency
    
    * improve code
    
    * remove unused dependency
    
    * remove unused dependency for hive sink
    
    ---------
    
    Co-authored-by: gf13871 <gf...@ly.com>
---
 linkis-engineconn-plugins/spark/pom.xml                  | 10 ++++++++++
 .../engineplugin/spark/datacalc/sink/HiveSink.scala      | 16 ++++++++--------
 .../engineplugin/spark/datacalc/sink/JdbcSink.scala      | 16 ++++++++++++----
 tool/dependencies/known-dependencies.txt                 |  1 +
 4 files changed, 31 insertions(+), 12 deletions(-)

diff --git a/linkis-engineconn-plugins/spark/pom.xml b/linkis-engineconn-plugins/spark/pom.xml
index c7993b28d..a5e7523d0 100644
--- a/linkis-engineconn-plugins/spark/pom.xml
+++ b/linkis-engineconn-plugins/spark/pom.xml
@@ -180,12 +180,22 @@
       <artifactId>linkis-rpc</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>net.sf.py4j</groupId>
+      <artifactId>py4j</artifactId>
+      <version>0.10.7</version>
+      <scope>provided</scope>
+    </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-core_${scala.binary.version}</artifactId>
       <version>${spark.version}</version>
       <scope>provided</scope>
       <exclusions>
+        <exclusion>
+          <groupId>net.sf.py4j</groupId>
+          <artifactId>py4j</artifactId>
+        </exclusion>
         <exclusion>
           <groupId>org.apache.hadoop</groupId>
           <artifactId>hadoop-common</artifactId>
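
[Editor's note, not part of the commit] A quick sanity check for the pom.xml change above is to print which py4j jar is actually loaded at runtime, now that py4j 0.10.7 is declared with provided scope and the copy pulled in transitively through spark-core is excluded. The sketch below assumes py4j's standard py4j.GatewayServer class is on the runtime classpath.

// Illustrative only: report where py4j.GatewayServer was loaded from.
object Py4jVersionCheck {
  def main(args: Array[String]): Unit = {
    val gatewayClass = Class.forName("py4j.GatewayServer")
    val source = Option(gatewayClass.getProtectionDomain.getCodeSource)
    println(s"py4j loaded from: ${source.map(_.getLocation).getOrElse("unknown")}")
  }
}
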
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/sink/HiveSink.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/sink/HiveSink.scala
index 8ba618776..1a81d6537 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/sink/HiveSink.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/sink/HiveSink.scala
@@ -24,14 +24,11 @@ import org.apache.linkis.engineplugin.spark.errorcode.SparkErrorCodeSummary
 
 import org.apache.commons.lang3.StringUtils
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
 import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.sources.DataSourceRegister
 import org.apache.spark.sql.types.StructField
 
-import org.slf4j.{Logger, LoggerFactory}
-
 class HiveSink extends DataCalcSink[HiveSinkConfig] with Logging {
 
   def output(spark: SparkSession, ds: Dataset[Row]): Unit = {
@@ -122,7 +119,9 @@ class HiveSink extends DataCalcSink[HiveSinkConfig] with Logging {
       logFields(sourceFields, targetFields)
       throw new HiveSinkException(
         SparkErrorCodeSummary.DATA_CALC_COLUMN_NUM_NOT_MATCH.getErrorCode,
-        s"$targetTable requires that the data to be inserted have the same number of columns as the target table: target table has ${targetFields.length} column(s) but the inserted data has ${sourceFields.length} column(s)"
+        s"$targetTable requires that the data to be inserted have the same number of columns " +
+          s"as the target table: target table has ${targetFields.length} column(s) " +
+          s"but the inserted data has ${sourceFields.length} column(s)"
       )
     }
 
@@ -184,17 +183,18 @@ class HiveSink extends DataCalcSink[HiveSinkConfig] with Logging {
           logicalRelation.relation match {
             case hadoopFsRelation: HadoopFsRelation =>
               hadoopFsRelation.fileFormat match {
-                case _: org.apache.spark.sql.execution.datasources.orc.OrcFileFormat =>
-                  fileFormat = FileFormat.ORC
                 case _: org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat =>
                   fileFormat = FileFormat.PARQUET
                 case dataSourceRegister: DataSourceRegister =>
                   fileFormat = FileFormat.withName(dataSourceRegister.shortName.toUpperCase)
                 case _ =>
+                  if (hadoopFsRelation.fileFormat.getClass.getSimpleName.equals("OrcFileFormat")) {
+                    fileFormat = FileFormat.ORC
+                  }
               }
           }
-        case hiveTableRelation: HiveTableRelation =>
-        // todo
+        // case hiveTableRelation: HiveTableRelation =>
+        // todo please note `HiveTableRelation` was added after spark 2.2.1
       }
       fileFormat
     } catch {
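
[Editor's note, not part of the commit] The HiveSink change above is about cross-version compilation: the explicit match on org.apache.spark.sql.execution.datasources.orc.OrcFileFormat (a class that newer Spark lines provide but older ones do not) is replaced by a runtime check on the simple class name, and the HiveTableRelation case is commented out for the same reason. A minimal, editor-added sketch of the same class-name-based dispatch, not the committed code verbatim:

import org.apache.spark.sql.execution.datasources.HadoopFsRelation
import org.apache.spark.sql.sources.DataSourceRegister

object FileFormatName {
  // Resolve a format name without a compile-time reference to any version-specific
  // OrcFileFormat class: ParquetFileFormat and DataSourceRegister exist in every
  // supported Spark line, while ORC is detected by its simple class name.
  def of(relation: HadoopFsRelation): String =
    relation.fileFormat match {
      case _: org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat => "PARQUET"
      case register: DataSourceRegister => register.shortName.toUpperCase
      case other if other.getClass.getSimpleName == "OrcFileFormat" => "ORC"
      case other => other.getClass.getSimpleName.toUpperCase
    }
}
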
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/sink/JdbcSink.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/sink/JdbcSink.scala
index ab8a21c3f..e9d60bd2b 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/sink/JdbcSink.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/sink/JdbcSink.scala
@@ -17,14 +17,16 @@
 
 package org.apache.linkis.engineplugin.spark.datacalc.sink
 
+import org.apache.linkis.common.utils.ClassUtils.getFieldVal
 import org.apache.linkis.common.utils.Logging
 import org.apache.linkis.engineplugin.spark.datacalc.api.DataCalcSink
 
 import org.apache.commons.lang3.StringUtils
+import org.apache.spark.SPARK_VERSION
 import org.apache.spark.sql.{Dataset, Row, SparkSession}
-import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils}
+import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
 
-import java.sql.Connection
+import java.sql.{Connection, DriverManager}
 
 import scala.collection.JavaConverters._
 
@@ -58,7 +60,8 @@ class JdbcSink extends DataCalcSink[JdbcSinkConfig] with Logging {
         .repartition(1)
         .foreachPartition((_: Iterator[Row]) => {
           val jdbcOptions = new JDBCOptions(options)
-          val conn: Connection = JdbcUtils.createConnectionFactory(jdbcOptions)()
+          val conn: Connection =
+            DriverManager.getConnection(config.getUrl, config.getUser, config.getPassword)
           try {
             config.getPreQueries.asScala.foreach(query => {
               logger.info(s"Execute pre query: $query")
@@ -86,7 +89,12 @@ class JdbcSink extends DataCalcSink[JdbcSinkConfig] with Logging {
     logger.info("Execute query: {}", query)
     val statement = conn.prepareStatement(query)
     try {
-      statement.setQueryTimeout(jdbcOptions.queryTimeout)
+      // `queryTimeout` was added after spark2.4.0, more details please check SPARK-23856
+      if (SPARK_VERSION >= "2.4") {
+        val queryTimeout = getFieldVal(jdbcOptions, "queryTimeout").asInstanceOf[Int]
+        statement.setQueryTimeout(queryTimeout)
+      }
+
       val rows = statement.executeUpdate()
       logger.info("{} rows affected", rows)
     } catch {
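
[Editor's note, not part of the commit] The JdbcSink change above drops two compile-time dependencies that vary across Spark lines: JdbcUtils.createConnectionFactory gives way to plain java.sql.DriverManager (presumably because its signature differs between Spark releases), and JDBCOptions.queryTimeout, which only exists from Spark 2.4 onward (SPARK-23856), is read reflectively behind a version guard via Linkis's getFieldVal helper. Below is a standalone, editor-added sketch of that guard using plain Java reflection instead of getFieldVal; the field name "queryTimeout" is an assumption based on current Spark sources.

import java.sql.PreparedStatement

import org.apache.spark.SPARK_VERSION
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions

object QueryTimeoutCompat {
  // Apply the JDBC query timeout only on Spark >= 2.4, where JDBCOptions carries it.
  // The value is read reflectively so this also compiles against older Spark, where
  // the member does not exist; on those runtimes the call is simply skipped.
  def apply(statement: PreparedStatement, jdbcOptions: JDBCOptions): Unit = {
    if (SPARK_VERSION >= "2.4") {
      val field = jdbcOptions.getClass.getDeclaredField("queryTimeout")
      field.setAccessible(true)
      statement.setQueryTimeout(field.get(jdbcOptions).asInstanceOf[Int])
    }
  }
}

The committed code relies on getFieldVal for the same lookup; the sketch only makes the reflection explicit so the version-guard pattern is visible on its own.
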
diff --git a/tool/dependencies/known-dependencies.txt b/tool/dependencies/known-dependencies.txt
index 55f0163aa..5b3b459d6 100644
--- a/tool/dependencies/known-dependencies.txt
+++ b/tool/dependencies/known-dependencies.txt
@@ -434,6 +434,7 @@ protostuff-collectionschema-1.6.2.jar
 protostuff-core-1.6.2.jar
 protostuff-runtime-1.6.2.jar
 py4j-0.10.4.jar
+py4j-0.10.7.jar
 quartz-2.3.2.jar
 reactive-streams-1.0.3.jar
 reactor-core-3.3.17.RELEASE.jar

