You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@linkis.apache.org by pe...@apache.org on 2022/01/05 08:40:14 UTC

[incubator-linkis] branch dev-1.0.3 updated: Pyspark support CDH and HDP versions

This is an automated email from the ASF dual-hosted git repository.

peacewong pushed a commit to branch dev-1.0.3
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git


The following commit(s) were added to refs/heads/dev-1.0.3 by this push:
     new d042983  Pyspark support CDH and HDP versions
     new ceea4c7  Merge pull request #1269 from Zosimer/pyspark_sparkversion_bug
d042983 is described below

commit d0429834a45e02d69fb76b0d44b29e8f5f46ef63
Author: e <31...@qq.com>
AuthorDate: Tue Jan 4 20:49:09 2022 +0800

    Pyspark support CDH and HDP versions
    
    Pyspark obtains the Spark version number through 'spark-submit --version' and converts it into an integer. For example, spark-2.4.1 has a version number of 241, but the Spark version of CDH is of the form 2.4.1-cdh6.3.2, which cannot be converted to an integer. The Spark version of HDP is 2.3.2.3.1.4.0-315; after conversion it remains 2.3.2.3.1.4.0-315, and an error is then reported.
    Therefore, I extract the leading numeric portion of the version string; for example, from 2.4.1-cdh6.3.2 I take 2.4.1 as the version number. The same applies to HDP, which meets the requirements.
---
 .../engineplugin/spark/utils/EngineUtils.scala     | 51 +++++++++++++---------
 1 file changed, 30 insertions(+), 21 deletions(-)

diff --git a/linkis-engineconn-plugins/engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/utils/EngineUtils.scala b/linkis-engineconn-plugins/engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/utils/EngineUtils.scala
index c7f86e3..1962d8c 100644
--- a/linkis-engineconn-plugins/engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/utils/EngineUtils.scala
+++ b/linkis-engineconn-plugins/engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/utils/EngineUtils.scala
@@ -14,13 +14,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
- 
+
 package org.apache.linkis.engineplugin.spark.utils
 
 import java.io.{IOException, InputStream, OutputStream}
 import java.net.ServerSocket
 import java.util.HashMap
 
+import org.apache.commons.lang3.StringUtils
 import org.apache.linkis.common.conf.CommonVars
 import org.apache.linkis.common.io.FsPath
 import org.apache.linkis.common.utils.Utils
@@ -29,11 +30,11 @@ import org.apache.linkis.rpc.Sender
 import org.apache.linkis.storage.resultset.ResultSetReader
 import org.apache.linkis.storage.utils.StorageUtils
 import org.apache.linkis.storage.{FSFactory, LineMetaData}
-
+import org.apache.linkis.common.utils.Logging
 /**
-  *
-  */
-object EngineUtils {
+ *
+ */
+object EngineUtils extends  Logging{
   private val user:String = System.getProperty("user.name")
   private var sparkVersion: String = _
   private  var fileSystem : org.apache.linkis.common.io.Fs = _
@@ -46,24 +47,32 @@ object EngineUtils {
   }
 
   def sparkSubmitVersion(): String = {
-    if(sparkVersion != null) {
+    if (sparkVersion != null) {
       return sparkVersion
     }
-    val sparkSubmit = CommonVars("wds.linkis.server.spark-submit", "spark-submit").getValue
-    val pb = new ProcessBuilder(sparkSubmit, "--version")
-    pb.redirectErrorStream(true)
-    pb.redirectInput(ProcessBuilder.Redirect.PIPE)
-
-    val process = new LineBufferedProcess(pb.start())
-    val exitCode = process.waitFor()
-    val output = process.inputIterator.mkString("\n")
-
-    val regex = """version (.*)""".r.unanchored
-
-    sparkVersion = output match {
-      case regex(version) => version
-      case _ => throw new IOException(f"Unable to determing spark-submit version [$exitCode]:\n$output")
+    val sparkVersionVar = CommonVars("wds.linkis.engine.spark.version", "")
+    if (StringUtils.isNotBlank(sparkVersionVar.getValue.trim)) {
+      val output = sparkVersionVar.getValue.trim
+      val regex = """([\d.]*)""".r.unanchored
+      sparkVersion = output match {
+        case regex(version) => version
+        case _ => throw new IOException(f"spark version  is invalid :\n$output")
+      }
+    } else {
+      val sparkSubmit = CommonVars("wds.linkis.server.spark-submit", "spark-submit").getValue
+      val pb = new ProcessBuilder(sparkSubmit, "--version")
+      pb.redirectErrorStream(true)
+      pb.redirectInput(ProcessBuilder.Redirect.PIPE)
+      val process = new LineBufferedProcess(pb.start())
+      val exitCode = process.waitFor()
+      val output = process.inputIterator.mkString("\n")
+      val regex = """version ([\d.]*)""".r.unanchored
+      sparkVersion = output match {
+        case regex(version) => version
+        case _ => throw new IOException(f"Unable to determing spark-submit version [$exitCode]:\n$output")
+      }
     }
+    info("spark version is " + sparkVersion)
     sparkVersion
   }
 
@@ -113,7 +122,7 @@ object EngineUtils {
         while (resultSetReader.hasNext){
           sb.append(resultSetReader.getRecord).append("\n")
         }
-       sb.toString()
+        sb.toString()
       case _ => dolphinContent
     }
     errorMsg

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org