Posted to commits@linkis.apache.org by ja...@apache.org on 2023/02/07 13:36:45 UTC

[linkis] branch dev-1.3.2 updated: [feat] support different Spark versions (#4146)

This is an automated email from the ASF dual-hosted git repository.

jackxu2011 pushed a commit to branch dev-1.3.2
in repository https://gitbox.apache.org/repos/asf/linkis.git


The following commit(s) were added to refs/heads/dev-1.3.2 by this push:
     new 5375d9550 [feat] support different Spark versions (#4146)
5375d9550 is described below

commit 5375d95500382aff0a6e68da636fc4fa0a17e196
Author: GuoPhilipse <46...@users.noreply.github.com>
AuthorDate: Tue Feb 7 21:36:38 2023 +0800

    [feat] support different Spark versions (#4146)
    
    * add support for different Spark versions
    
    Co-authored-by: gf13871 <gf...@ly.com>
---
 linkis-engineconn-plugins/spark/pom.xml                        |  4 ----
 .../linkis/engineplugin/spark/datacalc/sink/JdbcSink.scala     |  2 +-
 .../engineplugin/spark/executor/SparkDataCalcExecutor.scala    |  8 --------
 .../engineplugin/spark/metadata/SparkSQLHistoryParser.scala    | 10 +++++++++-
 pom.xml                                                        | 10 ++++++++++
 5 files changed, 20 insertions(+), 14 deletions(-)

diff --git a/linkis-engineconn-plugins/spark/pom.xml b/linkis-engineconn-plugins/spark/pom.xml
index 4885ea986..46ed7abab 100644
--- a/linkis-engineconn-plugins/spark/pom.xml
+++ b/linkis-engineconn-plugins/spark/pom.xml
@@ -25,12 +25,8 @@
   </parent>
 
   <artifactId>linkis-engineplugin-spark</artifactId>
-  <properties>
-    <spark.version>2.4.3</spark.version>
-  </properties>
 
   <dependencies>
-
     <dependency>
       <groupId>org.apache.linkis</groupId>
       <artifactId>linkis-engineconn-plugin-core</artifactId>
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/sink/JdbcSink.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/sink/JdbcSink.scala
index 498c000ae..1d97ba22a 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/sink/JdbcSink.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/sink/JdbcSink.scala
@@ -59,7 +59,7 @@ class JdbcSink extends DataCalcSink[JdbcSinkConfig] {
       spark
         .sql("select 1")
         .repartition(1)
-        .foreachPartition(_ => {
+        .foreachPartition((_: Iterator[Row]) => {
           val jdbcOptions = new JDBCOptions(options)
           val conn: Connection = JdbcUtils.createConnectionFactory(jdbcOptions)()
           try {
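
A note on the JdbcSink change above: Dataset.foreachPartition carries two
overloads, a Scala function (Iterator[T] => Unit) and a Java
ForeachPartitionFunction[T]. Under Scala 2.12, the default from Spark 3 on,
SAM conversion makes an untyped lambda applicable to both, so the old
`_ => {...}` form fails to compile as ambiguous. Annotating the parameter as
Iterator[Row] selects the Scala overload on every supported Spark line. A
minimal sketch, assuming a SparkSession named `spark` is in scope:

    import org.apache.spark.sql.Row

    val df = spark.sql("select 1").repartition(1)
    // Ambiguous on Spark 3.x / Scala 2.12 with an untyped lambda:
    //   df.foreachPartition(_ => ())
    // Typing the parameter picks the Scala overload on 2.x and 3.x alike:
    df.foreachPartition((_: Iterator[Row]) => ())
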
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkDataCalcExecutor.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkDataCalcExecutor.scala
index 81679dce0..c2757c34e 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkDataCalcExecutor.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkDataCalcExecutor.scala
@@ -34,9 +34,7 @@ import org.apache.linkis.scheduler.executer.{
   SuccessExecuteResponse
 }
 
-import org.apache.arrow.memory.OutOfMemoryException
 import org.apache.commons.lang.exception.ExceptionUtils
-import org.apache.orc.storage.common.io.Allocator.AllocatorOutOfMemoryException
 
 import java.lang.reflect.InvocationTargetException
 
@@ -74,16 +72,10 @@ class SparkDataCalcExecutor(sparkEngineSession: SparkEngineSession, id: Long)
         var cause = ExceptionUtils.getCause(e)
         if (cause == null) cause = e
         ErrorExecuteResponse(ExceptionUtils.getRootCauseMessage(e), cause)
-      case e: OutOfMemoryException =>
-        getErrorResponse(e, true)
-      case e: AllocatorOutOfMemoryException =>
-        getErrorResponse(e, true)
       case e: FatalException =>
         getErrorResponse(e, true)
       case e: Exception =>
         getErrorResponse(e, false)
-      case err: OutOfMemoryError =>
-        getErrorResponse(err, true)
       case err: VirtualMachineError =>
         getErrorResponse(err, true)
       case err: Error =>
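
A note on the removed match arms: the dedicated Arrow and ORC OutOfMemory
cases were dropped, presumably because those classes live in different
packages (or are absent) across the Spark/Arrow/ORC versions this build now
targets; that reading is an inference from the removed imports. No coverage
is lost for java.lang.OutOfMemoryError either, since it extends
VirtualMachineError and the surviving arm still reports it as fatal. A quick
Scala check of that subtype relationship:

    // java.lang.OutOfMemoryError extends VirtualMachineError, so the
    // remaining `case err: VirtualMachineError` arm still matches it.
    val oom: Throwable = new OutOfMemoryError("Java heap space")
    assert(oom.isInstanceOf[VirtualMachineError])
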
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/metadata/SparkSQLHistoryParser.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/metadata/SparkSQLHistoryParser.scala
index 3fa5b1322..adce99739 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/metadata/SparkSQLHistoryParser.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/metadata/SparkSQLHistoryParser.scala
@@ -23,6 +23,7 @@ import org.apache.linkis.cs.common.entity.metadata.CSColumn
 import org.apache.linkis.engineplugin.spark.metadata.{SparkHiveObject => HPO}
 
 import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObject.HivePrivilegeObjectType
+import org.apache.spark.SPARK_VERSION
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
@@ -262,7 +263,14 @@ object SparkSQLHistoryParser {
           columns = toCSColumnsByNamed(c.output),
           actionType = TableOperationType.CREATE
         )
-        ParseQuery(c.child, inputObjects)
+
+        // from Spark 3.2.0 on, the `child` field of CreateViewCommand is replaced by `plan`
+        val logicalPlan = if (SPARK_VERSION < "3.2") {
+          getFieldVal(c, "child").asInstanceOf[LogicalPlan]
+        } else {
+          getFieldVal(c, "plan").asInstanceOf[LogicalPlan]
+        }
+        ParseQuery(logicalPlan, inputObjects)
 
       case l: LoadDataCommand => addTableOrViewLevelObjs(l.table, outputObjects)
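
A note on the reflective field access above: Spark 3.2 renamed
CreateViewCommand's `child` field to `plan` (as the new comment says), so
reading the field by name at runtime lets one parser binary work against both
layouts. A minimal sketch of what a helper like getFieldVal typically does;
the actual Linkis utility may differ:

    // Read a field off an object by name, including private fields
    // (Scala case-class constructor parameters compile to these).
    def getFieldVal(obj: AnyRef, name: String): Any = {
      val field = obj.getClass.getDeclaredField(name)
      field.setAccessible(true)
      field.get(obj)
    }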
 
diff --git a/pom.xml b/pom.xml
index d4af75e6c..6e42ea39d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -105,6 +105,7 @@
   <properties>
     <revision>1.3.2-SNAPSHOT</revision>
     <jedis.version>2.9.2</jedis.version>
+    <spark.version>2.4.3</spark.version>
     <hadoop.version>2.7.2</hadoop.version>
     <hadoop-hdfs-client.artifact>hadoop-hdfs</hadoop-hdfs-client.artifact>
     <hadoop-hdfs-client-shade.version>2.7.2</hadoop-hdfs-client-shade.version>
@@ -1367,6 +1368,15 @@
         <curator.version>2.7.1</curator.version>
       </properties>
     </profile>
+    <profile>
+      <id>spark-3.2</id>
+      <properties>
+        <json4s.version>3.7.0-M11</json4s.version>
+        <spark.version>3.2.1</spark.version>
+        <scala.version>2.12.15</scala.version>
+        <scala.binary.version>2.12</scala.binary.version>
+      </properties>
+    </profile>
     <!-- jacoco: mvn validate -Pjacoco  -->
     <profile>
       <id>jacoco</id>
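
With spark.version now defined once in the root pom and overridden by the new
profile, the target Spark line is chosen at build time via standard Maven
profile activation (any extra flags a given deployment needs are omitted
here):

    # default build: Spark 2.4.3
    mvn clean install
    # build against Spark 3.2.1 / Scala 2.12 via the new profile
    mvn clean install -Pspark-3.2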


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org