You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@linkis.apache.org by pe...@apache.org on 2023/03/08 03:44:41 UTC
[linkis] branch dev-1.4.0 updated: [feat] EngineConn no longer imports the dependencies of the underlying engine (#4278)
This is an automated email from the ASF dual-hosted git repository.
peacewong pushed a commit to branch dev-1.4.0
in repository https://gitbox.apache.org/repos/asf/linkis.git
The following commit(s) were added to refs/heads/dev-1.4.0 by this push:
new 9563457f8 [feat] EngineConn no longer imports the dependencies of the underlying engine (#4278)
9563457f8 is described below
commit 9563457f8b44f6d307a79bc5b617916eccfc9164
Author: GuoPhilipse <46...@users.noreply.github.com>
AuthorDate: Wed Mar 8 11:44:33 2023 +0800
[feat] EngineConn no longer imports the dependencies of the underlying engine (#4278)
* EngineConn no longer imports the dependencies of the underlying engine
---
linkis-dist/release-docs/LICENSE | 8 +-
linkis-engineconn-plugins/elasticsearch/pom.xml | 4 -
linkis-engineconn-plugins/flink/pom.xml | 11 --
linkis-engineconn-plugins/hive/pom.xml | 7 -
linkis-engineconn-plugins/io_file/pom.xml | 10 --
.../io/executor/IoEngineConnExecutor.scala | 5 -
linkis-engineconn-plugins/jdbc/pom.xml | 7 -
linkis-engineconn-plugins/openlookeng/pom.xml | 4 -
linkis-engineconn-plugins/pipeline/pom.xml | 10 --
linkis-engineconn-plugins/presto/pom.xml | 4 -
linkis-engineconn-plugins/python/pom.xml | 10 --
linkis-engineconn-plugins/seatunnel/pom.xml | 3 -
linkis-engineconn-plugins/shell/pom.xml | 10 --
linkis-engineconn-plugins/spark/pom.xml | 7 -
.../spark/Interpreter/Interpreter.scala | 43 ------
.../spark/Interpreter/ProcessInterpreter.scala | 125 -----------------
.../spark/Interpreter/PythonInterpreter.scala | 155 +--------------------
.../engineplugin/spark/common/SparkKind.scala | 23 ---
.../engineplugin/spark/imexport/ExportData.scala | 13 +-
.../engineplugin/spark/imexport/LoadData.scala | 33 ++---
linkis-engineconn-plugins/sqoop/pom.xml | 4 -
linkis-engineconn-plugins/trino/pom.xml | 4 +-
linkis-hadoop-hdfs-client-shade/pom.xml | 2 +-
pom.xml | 16 ++-
tool/dependencies/known-dependencies.txt | 8 +-
25 files changed, 41 insertions(+), 485 deletions(-)
diff --git a/linkis-dist/release-docs/LICENSE b/linkis-dist/release-docs/LICENSE
index 7fd83cb1c..8fb514892 100644
--- a/linkis-dist/release-docs/LICENSE
+++ b/linkis-dist/release-docs/LICENSE
@@ -446,10 +446,10 @@ See licenses/ for text of these licenses.
(Apache License, Version 2.0) jackson-databind (com.fasterxml.jackson.core:jackson-databind:2.13.4.1 - http://github.com/FasterXML/jackson)
(Apache License, Version 2.0) jackson-module-scala (com.fasterxml.jackson.module:jackson-module-scala_2.11:2.13.4 - http://wiki.fasterxml.com/JacksonModuleScala)
(Apache License, Version 2.0) javax.inject (javax.inject:javax.inject:1 - http://code.google.com/p/atinject/)
- (Apache License, Version 2.0) json4s-ast (org.json4s:json4s-ast_2.11:3.5.3 - https://github.com/json4s/json4s)
- (Apache License, Version 2.0) json4s-core (org.json4s:json4s-core_2.11:3.5.3 - https://github.com/json4s/json4s)
- (Apache License, Version 2.0) json4s-jackson (org.json4s:json4s-jackson_2.11:3.5.3 - https://github.com/json4s/json4s)
- (Apache License, Version 2.0) json4s-scalap (org.json4s:json4s-scalap_2.11:3.5.3 - https://github.com/json4s/json4s)
+ (Apache License, Version 2.0) json4s-ast (org.json4s:json4s-ast_2.11:3.7.0-M11 - https://github.com/json4s/json4s)
+ (Apache License, Version 2.0) json4s-core (org.json4s:json4s-core_2.11:3.7.0-M11 - https://github.com/json4s/json4s)
+ (Apache License, Version 2.0) json4s-jackson (org.json4s:json4s-jackson_2.11:3.7.0-M11 - https://github.com/json4s/json4s)
+ (Apache License, Version 2.0) json4s-scalap (org.json4s:json4s-scalap_2.11:3.7.0-M11 - https://github.com/json4s/json4s)
(Apache License, Version 2.0) jna (net.java.dev.jna:jna:5.6.0 - https://github.com/java-native-access/jna)
(Apache License, Version 2.0) jna-platform (net.java.dev.jna:jna-platform:5.6.0 - https://github.com/java-native-access/jna)
(Apache License, Version 2.0) micrometer-core (io.micrometer:micrometer-core:1.3.1 - https://github.com/micrometer-metrics/micrometer)
diff --git a/linkis-engineconn-plugins/elasticsearch/pom.xml b/linkis-engineconn-plugins/elasticsearch/pom.xml
index 6b8cc7d1a..cb54e5d77 100644
--- a/linkis-engineconn-plugins/elasticsearch/pom.xml
+++ b/linkis-engineconn-plugins/elasticsearch/pom.xml
@@ -26,10 +26,6 @@
<artifactId>linkis-engineplugin-elasticsearch</artifactId>
- <properties>
- <elasticsearch.version>7.6.2</elasticsearch.version>
- </properties>
-
<dependencies>
<dependency>
<groupId>org.apache.linkis</groupId>
diff --git a/linkis-engineconn-plugins/flink/pom.xml b/linkis-engineconn-plugins/flink/pom.xml
index 7fe87fb2d..ed9474e42 100644
--- a/linkis-engineconn-plugins/flink/pom.xml
+++ b/linkis-engineconn-plugins/flink/pom.xml
@@ -25,10 +25,6 @@
</parent>
<artifactId>linkis-engineconn-plugin-flink</artifactId>
- <properties>
- <flink.version>1.12.2</flink.version>
- <commons-cli.version>1.3.1</commons-cli.version>
- </properties>
<dependencies>
@@ -414,13 +410,6 @@
<scope>provided</scope>
</dependency>
- <dependency>
- <groupId>org.json4s</groupId>
- <artifactId>json4s-jackson_${scala.binary.version}</artifactId>
- <version>${json4s.version}</version>
- <scope>provided</scope>
- </dependency>
-
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
diff --git a/linkis-engineconn-plugins/hive/pom.xml b/linkis-engineconn-plugins/hive/pom.xml
index 28b60fff0..74cc14314 100644
--- a/linkis-engineconn-plugins/hive/pom.xml
+++ b/linkis-engineconn-plugins/hive/pom.xml
@@ -312,13 +312,6 @@
<scope>provided</scope>
</dependency>
- <dependency>
- <groupId>org.json4s</groupId>
- <artifactId>json4s-jackson_${scala.binary.version}</artifactId>
- <version>${json4s.version}</version>
- <scope>provided</scope>
- </dependency>
-
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
diff --git a/linkis-engineconn-plugins/io_file/pom.xml b/linkis-engineconn-plugins/io_file/pom.xml
index 4aeddd125..cc8136e96 100644
--- a/linkis-engineconn-plugins/io_file/pom.xml
+++ b/linkis-engineconn-plugins/io_file/pom.xml
@@ -25,9 +25,6 @@
</parent>
<artifactId>linkis-engineplugin-io_file</artifactId>
- <properties>
- <io_file.version>1.0</io_file.version>
- </properties>
<dependencies>
@@ -80,13 +77,6 @@
<scope>provided</scope>
</dependency>
- <dependency>
- <groupId>org.json4s</groupId>
- <artifactId>json4s-jackson_${scala.binary.version}</artifactId>
- <version>${json4s.version}</version>
- <scope>provided</scope>
- </dependency>
-
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
diff --git a/linkis-engineconn-plugins/io_file/src/main/scala/org/apache/linkis/manager/engineplugin/io/executor/IoEngineConnExecutor.scala b/linkis-engineconn-plugins/io_file/src/main/scala/org/apache/linkis/manager/engineplugin/io/executor/IoEngineConnExecutor.scala
index fbc4a77d1..ef9ba73b3 100644
--- a/linkis-engineconn-plugins/io_file/src/main/scala/org/apache/linkis/manager/engineplugin/io/executor/IoEngineConnExecutor.scala
+++ b/linkis-engineconn-plugins/io_file/src/main/scala/org/apache/linkis/manager/engineplugin/io/executor/IoEngineConnExecutor.scala
@@ -29,7 +29,6 @@ import org.apache.linkis.manager.common.entity.resource.{
LoadResource,
NodeResource
}
-import org.apache.linkis.manager.engineplugin.common.conf.EngineConnPluginConf
import org.apache.linkis.manager.engineplugin.common.util.NodeResourceUtils
import org.apache.linkis.manager.engineplugin.io.conf.IOEngineConnConfiguration
import org.apache.linkis.manager.engineplugin.io.domain.FSInfo
@@ -61,14 +60,10 @@ import java.util.concurrent.atomic.AtomicLong
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
-import org.json4s.DefaultFormats
-
class IoEngineConnExecutor(val id: Int, val outputLimit: Int = 10)
extends ConcurrentComputationExecutor(outputLimit)
with Logging {
- implicit val formats = DefaultFormats
-
val fsIdCount = new AtomicLong()
val FS_ID_LIMIT = IOEngineConnConfiguration.IO_FS_ID_LIMIT.getValue
diff --git a/linkis-engineconn-plugins/jdbc/pom.xml b/linkis-engineconn-plugins/jdbc/pom.xml
index b42cb1ae8..bb4367308 100644
--- a/linkis-engineconn-plugins/jdbc/pom.xml
+++ b/linkis-engineconn-plugins/jdbc/pom.xml
@@ -172,13 +172,6 @@
<scope>provided</scope>
</dependency>
- <dependency>
- <groupId>org.json4s</groupId>
- <artifactId>json4s-jackson_${scala.binary.version}</artifactId>
- <version>${json4s.version}</version>
- <scope>provided</scope>
- </dependency>
-
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
diff --git a/linkis-engineconn-plugins/openlookeng/pom.xml b/linkis-engineconn-plugins/openlookeng/pom.xml
index 7f9f02f3a..8ccc2b4dc 100644
--- a/linkis-engineconn-plugins/openlookeng/pom.xml
+++ b/linkis-engineconn-plugins/openlookeng/pom.xml
@@ -27,10 +27,6 @@
<artifactId>linkis-engineplugin-openlookeng</artifactId>
- <properties>
- <openlookeng.version>1.5.0</openlookeng.version>
- </properties>
-
<dependencies>
<dependency>
<groupId>org.apache.linkis</groupId>
diff --git a/linkis-engineconn-plugins/pipeline/pom.xml b/linkis-engineconn-plugins/pipeline/pom.xml
index 4ff0bdf93..2f720f1e3 100644
--- a/linkis-engineconn-plugins/pipeline/pom.xml
+++ b/linkis-engineconn-plugins/pipeline/pom.xml
@@ -25,9 +25,6 @@
</parent>
<artifactId>linkis-engineplugin-pipeline</artifactId>
- <properties>
- <pipeline.version>1</pipeline.version>
- </properties>
<dependencies>
@@ -80,13 +77,6 @@
<scope>provided</scope>
</dependency>
- <dependency>
- <groupId>org.json4s</groupId>
- <artifactId>json4s-jackson_${scala.binary.version}</artifactId>
- <version>${json4s.version}</version>
- <scope>provided</scope>
- </dependency>
-
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
diff --git a/linkis-engineconn-plugins/presto/pom.xml b/linkis-engineconn-plugins/presto/pom.xml
index 0394a0fcb..2b2d234d7 100644
--- a/linkis-engineconn-plugins/presto/pom.xml
+++ b/linkis-engineconn-plugins/presto/pom.xml
@@ -26,10 +26,6 @@
<artifactId>linkis-engineplugin-presto</artifactId>
- <properties>
- <presto.version>0.234</presto.version>
- </properties>
-
<dependencies>
<dependency>
<groupId>org.apache.linkis</groupId>
diff --git a/linkis-engineconn-plugins/python/pom.xml b/linkis-engineconn-plugins/python/pom.xml
index 6ede3100f..6a9020dc8 100644
--- a/linkis-engineconn-plugins/python/pom.xml
+++ b/linkis-engineconn-plugins/python/pom.xml
@@ -25,9 +25,6 @@
</parent>
<artifactId>linkis-engineplugin-python</artifactId>
- <properties>
- <python.version>python2</python.version>
- </properties>
<dependencies>
<dependency>
@@ -90,13 +87,6 @@
<scope>provided</scope>
</dependency>
- <dependency>
- <groupId>org.json4s</groupId>
- <artifactId>json4s-jackson_${scala.binary.version}</artifactId>
- <version>${json4s.version}</version>
- <scope>provided</scope>
- </dependency>
-
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
diff --git a/linkis-engineconn-plugins/seatunnel/pom.xml b/linkis-engineconn-plugins/seatunnel/pom.xml
index e831db2be..ed843f9f2 100644
--- a/linkis-engineconn-plugins/seatunnel/pom.xml
+++ b/linkis-engineconn-plugins/seatunnel/pom.xml
@@ -25,9 +25,6 @@
</parent>
<artifactId>linkis-engineplugin-seatunnel</artifactId>
- <properties>
- <seatunnel.version>2.1.2</seatunnel.version>
- </properties>
<dependencies>
<dependency>
diff --git a/linkis-engineconn-plugins/shell/pom.xml b/linkis-engineconn-plugins/shell/pom.xml
index b622d7756..ad10c0c1a 100755
--- a/linkis-engineconn-plugins/shell/pom.xml
+++ b/linkis-engineconn-plugins/shell/pom.xml
@@ -25,9 +25,6 @@
</parent>
<artifactId>linkis-engineplugin-shell</artifactId>
- <properties>
- <shell.version>1</shell.version>
- </properties>
<dependencies>
<dependency>
@@ -98,13 +95,6 @@
<scope>provided</scope>
</dependency>
- <dependency>
- <groupId>org.json4s</groupId>
- <artifactId>json4s-jackson_${scala.binary.version}</artifactId>
- <version>${json4s.version}</version>
- <scope>provided</scope>
- </dependency>
-
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
diff --git a/linkis-engineconn-plugins/spark/pom.xml b/linkis-engineconn-plugins/spark/pom.xml
index eebadfbb4..c7993b28d 100644
--- a/linkis-engineconn-plugins/spark/pom.xml
+++ b/linkis-engineconn-plugins/spark/pom.xml
@@ -159,13 +159,6 @@
<scope>provided</scope>
</dependency>
- <dependency>
- <groupId>org.json4s</groupId>
- <artifactId>json4s-jackson_${scala.binary.version}</artifactId>
- <version>${json4s.version}</version>
- <scope>provided</scope>
- </dependency>
-
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/Interpreter/Interpreter.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/Interpreter/Interpreter.scala
deleted file mode 100644
index f6d6c797e..000000000
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/Interpreter/Interpreter.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.linkis.engineplugin.spark.Interpreter
-
-import org.apache.linkis.common.utils.Utils
-import org.apache.linkis.engineplugin.spark.common.State
-import org.apache.linkis.scheduler.executer.ExecuteResponse
-
-import scala.concurrent.TimeoutException
-import scala.concurrent.duration.Duration
-
-/**
- */
-
-trait Interpreter {
- def state: State
-
- def execute(code: String): ExecuteResponse
-
- def close(): Unit
-
- @throws(classOf[TimeoutException])
- @throws(classOf[InterruptedException])
- final def waitForStateChange(oldState: State, atMost: Duration): Unit = {
- Utils.waitUntil({ () => state != oldState }, atMost)
- }
-
-}
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/Interpreter/ProcessInterpreter.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/Interpreter/ProcessInterpreter.scala
deleted file mode 100644
index 171d48e4f..000000000
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/Interpreter/ProcessInterpreter.scala
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.linkis.engineplugin.spark.Interpreter
-
-import org.apache.linkis.common.utils.{Logging, Utils}
-import org.apache.linkis.engineplugin.spark.common._
-import org.apache.linkis.scheduler.executer.{
- ErrorExecuteResponse,
- ExecuteResponse,
- SuccessExecuteResponse
-}
-
-import org.apache.commons.io.IOUtils
-
-import java.io.{BufferedReader, InputStreamReader, PrintWriter}
-import java.util.concurrent.TimeUnit
-
-import scala.concurrent.{Await, ExecutionContext, Future}
-import scala.concurrent.duration.Duration
-
-import org.json4s._
-
-/**
- */
-abstract class ProcessInterpreter(process: Process) extends Interpreter with Logging {
-
- implicit val executor: ExecutionContext = ExecutionContext.global
-
- protected[this] var _state: State = Starting()
-
- protected[this] val stdin = new PrintWriter(process.getOutputStream)
-
- protected[this] val stdout =
- new BufferedReader(new InputStreamReader(process.getInputStream()), 1)
-
- protected[this] val errOut = new LineBufferedStream(process.getErrorStream())
-
- override def state: State = _state
-
- override def execute(code: String): ExecuteResponse = {
- if (code == "sc.cancelAllJobs" || code == "sc.cancelAllJobs()") {
- sendExecuteRequest(code)
- }
- _state match {
- case (Dead() | ShuttingDown() | Error() | Success()) =>
- throw new IllegalStateException("interpreter is not running")
- case Idle() =>
- require(state == Idle())
- code match {
- case "SHUTDOWN" =>
- sendShutdownRequest()
- close()
- ErrorExecuteResponse("shutdown", new Exception("shutdown"))
- case _ =>
- _state = Busy()
- sendExecuteRequest(code) match {
- case Some(rep) =>
- _state = Idle()
- // ExecuteComplete(rep)
- SuccessExecuteResponse()
- case None =>
- _state = Error()
- val errorMsg = errOut.lines.mkString(", ")
- throw new Exception(errorMsg)
- }
- }
- case _ =>
- throw new IllegalStateException(s"interpreter is in ${_state} state, cannot do query.")
- }
- }
-
- Future {
- val exitCode = process.waitFor()
- if (exitCode != 0) {
- // scalastyle:off println
- errOut.lines.foreach(println)
- println(getClass.getSimpleName + " has stopped with exit code " + process.exitValue)
- _state = Error()
- } else {
- println(getClass.getSimpleName + " has finished.")
- _state = Success()
- }
- }
-
- protected def waitUntilReady(): Unit
-
- protected def sendExecuteRequest(request: String): Option[JValue]
-
- protected def sendShutdownRequest(): Unit = {}
-
- override def close(): Unit = {
- val future = Future {
- _state match {
- case (Dead() | ShuttingDown() | Success()) =>
- Future.successful()
- case _ =>
- sendShutdownRequest()
- }
- }
- _state = Dead()
- IOUtils.closeQuietly(stdin)
- IOUtils.closeQuietly(stdout)
- errOut.close
- // scalastyle:off awaitresult
- Utils.tryFinally(Await.result(future, Duration(10, TimeUnit.SECONDS))) {
- process.destroy()
- }
- }
-
-}
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/Interpreter/PythonInterpreter.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/Interpreter/PythonInterpreter.scala
index 4223db8ba..dbbac2623 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/Interpreter/PythonInterpreter.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/Interpreter/PythonInterpreter.scala
@@ -17,178 +17,27 @@
package org.apache.linkis.engineplugin.spark.Interpreter
-import org.apache.linkis.common.conf.CommonVars
import org.apache.linkis.common.io.FsPath
import org.apache.linkis.common.utils.{ClassUtils, Logging, Utils}
-import org.apache.linkis.engineplugin.spark.common.LineBufferedStream
import org.apache.linkis.engineplugin.spark.config.SparkConfiguration
import org.apache.linkis.storage.FSFactory
import org.apache.commons.io.IOUtils
-import org.apache.spark.{SparkContext, SparkException}
+import org.apache.spark.SparkContext
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.catalyst.expressions.Attribute
import java.io._
import java.nio.file.Files
-import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
-import org.json4s.{DefaultFormats, JValue}
-import org.json4s.jackson.JsonMethods._
-import org.json4s.jackson.Serialization
-import py4j.GatewayServer
-
/**
*/
object PythonInterpreter {
- def create(): Interpreter = {
- val pythonExec = CommonVars("PYSPARK_DRIVER_PYTHON", "python").getValue
-
- val gatewayServer = new GatewayServer(SQLSession, 0)
- gatewayServer.start()
-
- val builder = new ProcessBuilder(Array(pythonExec, createFakeShell().toString).toList.asJava)
-
- val env = builder.environment()
- env.put("PYTHONPATH", pythonPath)
- env.put("PYTHONUNBUFFERED", "YES")
- env.put("PYSPARK_GATEWAY_PORT", "" + gatewayServer.getListeningPort)
- env.put("SPARK_HOME", SparkConfiguration.SPARK_HOME.getValue)
-
- val process = builder.start()
-
- new PythonInterpreter(process, gatewayServer)
- }
-
- def pythonPath: String = {
- val pythonPath = new ArrayBuffer[String]
- val pythonHomePath = new File(SparkConfiguration.SPARK_HOME.getValue, "python").getPath
- val pythonParentPath = new File(pythonHomePath, "lib")
- pythonPath += pythonHomePath
- pythonParentPath
- .listFiles(new FileFilter {
- override def accept(pathname: File): Boolean = pathname.getName.endsWith(".zip")
- })
- .foreach(f => pythonPath += f.getPath)
- ClassUtils.jarOfClass(classOf[SparkContext]).foreach(pythonPath += _)
- pythonPath.mkString(File.pathSeparator)
- }
-
- def createFakeShell(): File = createFakeShell("python/fake_shell.py")
-
- def createFakeShell(script: String, fileType: String = ".py"): File = {
- val source: InputStream = getClass.getClassLoader.getResourceAsStream(script)
-
- val file = Files.createTempFile("", fileType).toFile
- file.deleteOnExit()
-
- val sink = new FileOutputStream(file)
- val buf = new Array[Byte](1024)
- var n = source.read(buf)
-
- while (n > 0) {
- sink.write(buf, 0, n)
- n = source.read(buf)
- }
-
- source.close()
- sink.close()
-
- file
- }
-
- private def createFakePySpark(): File = {
- val source: InputStream = getClass.getClassLoader.getResourceAsStream("fake_pyspark.sh")
-
- val file = Files.createTempFile("", "").toFile
- file.deleteOnExit()
-
- file.setExecutable(true)
-
- val sink = new FileOutputStream(file)
- val buf = new Array[Byte](1024)
- var n = source.read(buf)
-
- while (n > 0) {
- sink.write(buf, 0, n)
- n = source.read(buf)
- }
-
- source.close()
- sink.close()
-
- file
- }
-
-}
-
-private class PythonInterpreter(process: Process, gatewayServer: GatewayServer)
- extends ProcessInterpreter(process)
- with Logging {
- implicit val formats = DefaultFormats
-
- override def close(): Unit = {
- try {
- super.close()
- } finally {
- gatewayServer.shutdown()
- }
- }
-
- final override protected def waitUntilReady(): Unit = {
- var running = false
- val code =
- try process.exitValue
- catch { case t: IllegalThreadStateException => running = true; -1 }
- if (!running) {
- throw new SparkException(
- s"Spark python application has already finished with exit code $code, now exit..."
- )
- }
- var continue = true
- val initOut = new LineBufferedStream(process.getInputStream)
- val iterable = initOut.iterator
- while (continue && iterable.hasNext) {
- iterable.next match {
- // scalastyle:off println
- case "READY" => println("Start python application succeed."); continue = false
- case str: String => println(str)
- case _ =>
- }
- }
- initOut.close
- }
-
- override protected def sendExecuteRequest(code: String): Option[JValue] = {
- val rep = sendRequest(Map("msg_type" -> "execute_request", "content" -> Map("code" -> code)))
- rep.map { rep =>
- assert((rep \ "msg_type").extract[String] == "execute_reply")
-
- val content: JValue = rep \ "content"
-
- content
- }
- }
-
- override protected def sendShutdownRequest(): Unit = {
- sendRequest(Map("msg_type" -> "shutdown_request", "content" -> ())).foreach { rep =>
- logger.warn(f"process failed to shut down while returning $rep")
- }
- }
-
- private def sendRequest(request: Map[String, Any]): Option[JValue] = {
- // scalastyle:off println
- stdin.println(Serialization.write(request))
- stdin.flush()
-
- Option(stdout.readLine()).map { line => parse(line) }
- }
-
def pythonPath: String = {
val pythonPath = new ArrayBuffer[String]
val pythonHomePath = new File(SparkConfiguration.SPARK_HOME.getValue, "python").getPath
@@ -296,7 +145,7 @@ object SQLSession extends Logging {
logger.warn(s"Fetched $colCount col(s) : $index row(s).")
sc.clearJobGroup()
Utils.tryFinally({
- msg.flush();
+ msg.flush()
msg.toString
}) { () => IOUtils.closeQuietly(msg) }
}
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/common/SparkKind.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/common/SparkKind.scala
index 46e8b5def..26c8ea3fc 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/common/SparkKind.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/common/SparkKind.scala
@@ -17,9 +17,6 @@
package org.apache.linkis.engineplugin.spark.common
-import org.json4s.CustomSerializer
-import org.json4s.JsonAST.JString
-
/**
*/
object SparkKind {
@@ -91,23 +88,3 @@ case class SparkDataCalc() extends Kind {
case class SparkMLSQL() extends Kind {
override val toString = SparkKind.SPARKMLSQL_TYPE
}
-
-case object SparkSessionKindSerializer
- extends CustomSerializer[Kind](implicit formats =>
- (
- {
- case JString(SparkKind.SPARKSCALA_TYPE) | JString(SparkKind.SCALA_LAN) => SparkScala()
- case JString(SparkKind.PYSPARK_TYPE) | JString(SparkKind.PYTHON_LAN) | JString(
- SparkKind.PYTHON_END
- ) =>
- PySpark()
- case JString(SparkKind.SPARKR_TYPE) | JString(SparkKind.R_LAN) => SparkR()
- case JString(SparkKind.SPARKMIX_TYPE) | JString(SparkKind.MIX_TYPE) => SparkMix()
- case JString(SparkKind.SQL_LAN) | JString(SparkKind.SPARKSQL_TYPE) => SparkSQL()
- case JString(SparkKind.SPARKMLSQL_TYPE) | JString(SparkKind.ML_LAN) => SparkMLSQL()
- },
- { case kind: Kind =>
- JString(kind.toString)
- }
- )
- )
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/imexport/ExportData.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/imexport/ExportData.scala
index 187277349..cbfb5195c 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/imexport/ExportData.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/imexport/ExportData.scala
@@ -20,22 +20,19 @@ package org.apache.linkis.engineplugin.spark.imexport
import org.apache.linkis.common.utils.Logging
import org.apache.linkis.engineplugin.spark.config.SparkConfiguration
import org.apache.linkis.engineplugin.spark.imexport.util.BackGroundServiceUtils
+import org.apache.linkis.server.BDPJettyServerHelper
import org.apache.spark.sql.SparkSession
-import org.json4s.{DefaultFormats, _}
-import org.json4s.jackson.JsonMethods._
-
/**
*/
object ExportData extends Logging {
- implicit val formats = DefaultFormats
def exportData(spark: SparkSession, dataInfo: String, destination: String): Unit = {
exportDataFromFile(
spark,
- parse(dataInfo).extract[Map[String, Any]],
- parse(destination).extract[Map[String, Any]]
+ BDPJettyServerHelper.gson.fromJson(dataInfo, classOf[Map[String, Any]]),
+ BDPJettyServerHelper.gson.fromJson(destination, classOf[Map[String, Any]])
)
}
@@ -43,8 +40,8 @@ object ExportData extends Logging {
val dataInfo = BackGroundServiceUtils.exchangeExecutionCode(dataInfoPath)
exportDataFromFile(
spark,
- parse(dataInfo).extract[Map[String, Any]],
- parse(destination).extract[Map[String, Any]]
+ BDPJettyServerHelper.gson.fromJson(dataInfo, classOf[Map[String, Any]]),
+ BDPJettyServerHelper.gson.fromJson(destination, classOf[Map[String, Any]])
)
}
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/imexport/LoadData.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/imexport/LoadData.scala
index 6d278175b..fd8e5cac5 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/imexport/LoadData.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/imexport/LoadData.scala
@@ -22,6 +22,7 @@ import org.apache.linkis.engineplugin.spark.config.SparkConfiguration
import org.apache.linkis.engineplugin.spark.imexport.util.{BackGroundServiceUtils, ImExportUtils}
import org.apache.linkis.hadoop.common.conf.HadoopConf
import org.apache.linkis.hadoop.common.utils.HDFSUtils
+import org.apache.linkis.server.BDPJettyServerHelper
import org.apache.linkis.storage.excel.XlsUtils
import org.apache.commons.lang3.StringUtils
@@ -35,26 +36,22 @@ import java.util.Locale
import scala.collection.JavaConverters._
-import org.json4s._
-import org.json4s.jackson.JsonMethods._
-
/**
*/
object LoadData {
- implicit val formats = DefaultFormats
def loadDataToTable(spark: SparkSession, source: String, destination: String): Unit = {
- create_table_from_a_file(spark, parse(source), parse(destination))
+ create_table_from_a_file(spark, source, destination)
}
def loadDataToTableByFile(spark: SparkSession, destinationPath: String, source: String): Unit = {
val destination = BackGroundServiceUtils.exchangeExecutionCode(destinationPath)
- create_table_from_a_file(spark, parse(source), parse(destination))
+ create_table_from_a_file(spark, source, destination)
}
- def create_table_from_a_file(spark: SparkSession, src: JValue, dest: JValue): Unit = {
- val source = src.extract[Map[String, Any]]
- val destination = dest.extract[Map[String, Any]]
+ def create_table_from_a_file(spark: SparkSession, src: String, dest: String): Unit = {
+ val source = BDPJettyServerHelper.gson.fromJson(src, classOf[Map[String, Any]])
+ val destination = BDPJettyServerHelper.gson.fromJson(dest, classOf[Map[String, Any]])
var path = getMapValue[String](source, "path")
val pathType = getMapValue[String](source, "pathType", "share")
@@ -79,7 +76,9 @@ object LoadData {
val partition = getMapValue[String](destination, "partition", "ds")
val partitionValue = getMapValue[String](destination, "partitionValue", "1993-01-02")
- val columns = (dest \ "columns").extract[List[Map[String, Any]]]
+ val columnsJson = getMapValue[String](destination, "columns", "")
+ val columns = BDPJettyServerHelper.gson.fromJson(columnsJson, classOf[List[Map[String, Any]]])
+
val dateFormats =
columns.map(_.get("dateFormat").get.toString).map(f => if (f isEmpty) "yyyy-MM-dd" else f)
var isFirst = true
@@ -204,20 +203,6 @@ object LoadData {
hdfsPath
}
- def getNodeValue[T](json: JValue, node: String, default: T = null.asInstanceOf[T])(implicit
- m: Manifest[T]
- ): T = {
- json \ node match {
- case JNothing => default
- case value: JValue =>
- if ("JString()".equals(value.toString)) default
- else {
- try value.extract[T]
- catch { case t: Throwable => default }
- }
- }
- }
-
def getMapValue[T](map: Map[String, Any], key: String, default: T = null.asInstanceOf[T]): T = {
val value = map.get(key).map(_.asInstanceOf[T]).getOrElse(default)
if (StringUtils.isEmpty(value.toString)) {
diff --git a/linkis-engineconn-plugins/sqoop/pom.xml b/linkis-engineconn-plugins/sqoop/pom.xml
index 5428047fe..96ca23cd8 100644
--- a/linkis-engineconn-plugins/sqoop/pom.xml
+++ b/linkis-engineconn-plugins/sqoop/pom.xml
@@ -25,10 +25,6 @@
</parent>
<artifactId>linkis-engineplugin-sqoop</artifactId>
- <properties>
- <sqoop.version>1.4.6</sqoop.version>
- <hive.version>3.1.2</hive.version>
- </properties>
<dependencies>
<dependency>
diff --git a/linkis-engineconn-plugins/trino/pom.xml b/linkis-engineconn-plugins/trino/pom.xml
index 246f54751..d9ea2d686 100644
--- a/linkis-engineconn-plugins/trino/pom.xml
+++ b/linkis-engineconn-plugins/trino/pom.xml
@@ -25,9 +25,7 @@
</parent>
<artifactId>linkis-engineplugin-trino</artifactId>
- <properties>
- <trino.version>371</trino.version>
- </properties>
+
<dependencies>
<dependency>
<groupId>org.apache.linkis</groupId>
diff --git a/linkis-hadoop-hdfs-client-shade/pom.xml b/linkis-hadoop-hdfs-client-shade/pom.xml
index e4990f857..560bbaa69 100644
--- a/linkis-hadoop-hdfs-client-shade/pom.xml
+++ b/linkis-hadoop-hdfs-client-shade/pom.xml
@@ -207,7 +207,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
- <version>3.3.0</version>
+ <version>${maven-shade-plugin.version}</version>
<configuration>
<shadeSourcesContent>true</shadeSourcesContent>
<shadedArtifactAttached>false</shadedArtifactAttached>
diff --git a/pom.xml b/pom.xml
index 8549ec8e9..0d4170613 100644
--- a/pom.xml
+++ b/pom.xml
@@ -105,6 +105,18 @@
<properties>
<revision>1.3.2-SNAPSHOT</revision>
<jedis.version>2.9.2</jedis.version>
+ <elasticsearch.version>7.6.2</elasticsearch.version>
+ <flink.version>1.12.2</flink.version>
+ <io_file.version>1.0</io_file.version>
+ <jdbc.version>4</jdbc.version>
+ <trino.version>371</trino.version>
+ <openlookeng.version>1.5.0</openlookeng.version>
+ <pipeline.version>1</pipeline.version>
+ <presto.version>0.234</presto.version>
+ <python.version>python2</python.version>
+ <seatunnel.version>2.1.2</seatunnel.version>
+ <shell.version>1</shell.version>
+ <sqoop.version>1.4.6</sqoop.version>
<spark.version>3.2.1</spark.version>
<hive.version>3.1.3</hive.version>
<hadoop.version>3.3.4</hadoop.version>
@@ -124,7 +136,6 @@
<!-- json -->
<gson.version>2.8.9</gson.version>
<jackson-bom.version>2.13.4.20221013</jackson-bom.version>
- <!-- spark2.4 use 3.5.3, spark3.2 use 3.7.0-M11 -->
<json4s.version>3.7.0-M11</json4s.version>
<jersey.version>1.19.4</jersey.version>
@@ -141,6 +152,7 @@
<mysql.connector.version>8.0.28</mysql.connector.version>
<druid.version>1.1.22</druid.version>
<javassist.version>3.27.0-GA</javassist.version>
+ <commons-cli.version>1.3.1</commons-cli.version>
<commons-collections.version>3.2.2</commons-collections.version>
<commons-lang.version>2.6</commons-lang.version>
<commons-lang3.version>3.12.0</commons-lang3.version>
@@ -201,6 +213,8 @@
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
<maven-jar-plugin.version>3.3.0</maven-jar-plugin.version>
+ <maven-shade-plugin.version>3.3.0</maven-shade-plugin.version>
+ <maven-helper-plugin.version>3.2.0</maven-helper-plugin.version>
<scala-maven-plugin.version>4.7.1</scala-maven-plugin.version>
<spotless-maven-plugin.version>2.24.1</spotless-maven-plugin.version>
<jacoco-maven-plugin.version>0.8.7</jacoco-maven-plugin.version>
diff --git a/tool/dependencies/known-dependencies.txt b/tool/dependencies/known-dependencies.txt
index 3c4cbf9d3..55f0163aa 100644
--- a/tool/dependencies/known-dependencies.txt
+++ b/tool/dependencies/known-dependencies.txt
@@ -311,10 +311,10 @@ jpam-1.1.jar
json-0.191.jar
json-0.193.jar
json-1.8.jar
-json4s-ast_2.11-3.5.3.jar
-json4s-core_2.11-3.5.3.jar
-json4s-jackson_2.11-3.5.3.jar
-json4s-scalap_2.11-3.5.3.jar
+json4s-ast_2.11-3.7.0-M11.jar
+json4s-core_2.11-3.7.0-M11.jar
+json4s-jackson_2.11-3.7.0-M11.jar
+json4s-scalap_2.11-3.7.0-M11.jar
jsp-api-2.1.jar
jsqlparser-1.0.jar
jsqlparser-4.2.jar
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org