You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@linkis.apache.org by ja...@apache.org on 2023/02/07 13:36:45 UTC
[linkis] branch dev-1.3.2 updated: [feat] support different spark version (#4146)
This is an automated email from the ASF dual-hosted git repository.
jackxu2011 pushed a commit to branch dev-1.3.2
in repository https://gitbox.apache.org/repos/asf/linkis.git
The following commit(s) were added to refs/heads/dev-1.3.2 by this push:
new 5375d9550 [feat] support different spark version (#4146)
5375d9550 is described below
commit 5375d95500382aff0a6e68da636fc4fa0a17e196
Author: GuoPhilipse <46...@users.noreply.github.com>
AuthorDate: Tue Feb 7 21:36:38 2023 +0800
[feat] support different spark version (#4146)
* add support different spark version
Co-authored-by: gf13871 <gf...@ly.com>
---
linkis-engineconn-plugins/spark/pom.xml | 4 ----
.../linkis/engineplugin/spark/datacalc/sink/JdbcSink.scala | 2 +-
.../engineplugin/spark/executor/SparkDataCalcExecutor.scala | 8 --------
.../engineplugin/spark/metadata/SparkSQLHistoryParser.scala | 10 +++++++++-
pom.xml | 10 ++++++++++
5 files changed, 20 insertions(+), 14 deletions(-)
diff --git a/linkis-engineconn-plugins/spark/pom.xml b/linkis-engineconn-plugins/spark/pom.xml
index 4885ea986..46ed7abab 100644
--- a/linkis-engineconn-plugins/spark/pom.xml
+++ b/linkis-engineconn-plugins/spark/pom.xml
@@ -25,12 +25,8 @@
</parent>
<artifactId>linkis-engineplugin-spark</artifactId>
- <properties>
- <spark.version>2.4.3</spark.version>
- </properties>
<dependencies>
-
<dependency>
<groupId>org.apache.linkis</groupId>
<artifactId>linkis-engineconn-plugin-core</artifactId>
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/sink/JdbcSink.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/sink/JdbcSink.scala
index 498c000ae..1d97ba22a 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/sink/JdbcSink.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/sink/JdbcSink.scala
@@ -59,7 +59,7 @@ class JdbcSink extends DataCalcSink[JdbcSinkConfig] {
spark
.sql("select 1")
.repartition(1)
- .foreachPartition(_ => {
+ .foreachPartition((_: Iterator[Row]) => {
val jdbcOptions = new JDBCOptions(options)
val conn: Connection = JdbcUtils.createConnectionFactory(jdbcOptions)()
try {
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkDataCalcExecutor.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkDataCalcExecutor.scala
index 81679dce0..c2757c34e 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkDataCalcExecutor.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkDataCalcExecutor.scala
@@ -34,9 +34,7 @@ import org.apache.linkis.scheduler.executer.{
SuccessExecuteResponse
}
-import org.apache.arrow.memory.OutOfMemoryException
import org.apache.commons.lang.exception.ExceptionUtils
-import org.apache.orc.storage.common.io.Allocator.AllocatorOutOfMemoryException
import java.lang.reflect.InvocationTargetException
@@ -74,16 +72,10 @@ class SparkDataCalcExecutor(sparkEngineSession: SparkEngineSession, id: Long)
var cause = ExceptionUtils.getCause(e)
if (cause == null) cause = e
ErrorExecuteResponse(ExceptionUtils.getRootCauseMessage(e), cause)
- case e: OutOfMemoryException =>
- getErrorResponse(e, true)
- case e: AllocatorOutOfMemoryException =>
- getErrorResponse(e, true)
case e: FatalException =>
getErrorResponse(e, true)
case e: Exception =>
getErrorResponse(e, false)
- case err: OutOfMemoryError =>
- getErrorResponse(err, true)
case err: VirtualMachineError =>
getErrorResponse(err, true)
case err: Error =>
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/metadata/SparkSQLHistoryParser.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/metadata/SparkSQLHistoryParser.scala
index 3fa5b1322..adce99739 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/metadata/SparkSQLHistoryParser.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/metadata/SparkSQLHistoryParser.scala
@@ -23,6 +23,7 @@ import org.apache.linkis.cs.common.entity.metadata.CSColumn
import org.apache.linkis.engineplugin.spark.metadata.{SparkHiveObject => HPO}
import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObject.HivePrivilegeObjectType
+import org.apache.spark.SPARK_VERSION
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.catalog.CatalogTable
@@ -262,7 +263,14 @@ object SparkSQLHistoryParser {
columns = toCSColumnsByNamed(c.output),
actionType = TableOperationType.CREATE
)
- ParseQuery(c.child, inputObjects)
+
+ // after spark 3.2.0, `child` field will be replaced by `plan` in CreateViewCommand
+ val logicalPlan = if (SPARK_VERSION < "3.2") {
+ getFieldVal(c, "child").asInstanceOf[LogicalPlan]
+ } else {
+ getFieldVal(c, "plan").asInstanceOf[LogicalPlan]
+ }
+ ParseQuery(logicalPlan, inputObjects)
case l: LoadDataCommand => addTableOrViewLevelObjs(l.table, outputObjects)
diff --git a/pom.xml b/pom.xml
index d4af75e6c..6e42ea39d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -105,6 +105,7 @@
<properties>
<revision>1.3.2-SNAPSHOT</revision>
<jedis.version>2.9.2</jedis.version>
+ <spark.version>2.4.3</spark.version>
<hadoop.version>2.7.2</hadoop.version>
<hadoop-hdfs-client.artifact>hadoop-hdfs</hadoop-hdfs-client.artifact>
<hadoop-hdfs-client-shade.version>2.7.2</hadoop-hdfs-client-shade.version>
@@ -1367,6 +1368,15 @@
<curator.version>2.7.1</curator.version>
</properties>
</profile>
+ <profile>
+ <id>spark-3.2</id>
+ <properties>
+ <json4s.version>3.7.0-M11</json4s.version>
+ <spark.version>3.2.1</spark.version>
+ <scala.version>2.12.15</scala.version>
+ <scala.binary.version>2.12</scala.binary.version>
+ </properties>
+ </profile>
<!-- jacoco: mvn validate -Pjacoco -->
<profile>
<id>jacoco</id>
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org