You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@linkis.apache.org by pe...@apache.org on 2022/01/05 08:40:14 UTC
[incubator-linkis] branch dev-1.0.3 updated: Pyspark support CDH and HDP versions
This is an automated email from the ASF dual-hosted git repository.
peacewong pushed a commit to branch dev-1.0.3
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git
The following commit(s) were added to refs/heads/dev-1.0.3 by this push:
new d042983 Pyspark support CDH and HDP versions
new ceea4c7 Merge pull request #1269 from Zosimer/pyspark_sparkversion_bug
d042983 is described below
commit d0429834a45e02d69fb76b0d44b29e8f5f46ef63
Author: e <31...@qq.com>
AuthorDate: Tue Jan 4 20:49:09 2022 +0800
Pyspark support CDH and HDP versions
PySpark obtains the Spark version number through `spark-submit --version` and converts the version number into an integer. For example, spark-2.4.1 has a version number of 241. However, the Spark version of CDH is of the form 2.4.1-cdh6.3.2, and this type cannot be converted to an integer. Likewise, the Spark version of HDP is 2.3.2.3.1.4.0-315; after conversion it is still 2.3.2.3.1.4.0-315, and an error is then reported.
Therefore, I extract the first five characters of the version number: for example, from 2.4.1-cdh6.3.2, I take 2.4.1 as the version number. HDP is handled similarly, and this meets the requirements.
---
.../engineplugin/spark/utils/EngineUtils.scala | 51 +++++++++++++---------
1 file changed, 30 insertions(+), 21 deletions(-)
diff --git a/linkis-engineconn-plugins/engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/utils/EngineUtils.scala b/linkis-engineconn-plugins/engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/utils/EngineUtils.scala
index c7f86e3..1962d8c 100644
--- a/linkis-engineconn-plugins/engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/utils/EngineUtils.scala
+++ b/linkis-engineconn-plugins/engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/utils/EngineUtils.scala
@@ -14,13 +14,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
+
package org.apache.linkis.engineplugin.spark.utils
import java.io.{IOException, InputStream, OutputStream}
import java.net.ServerSocket
import java.util.HashMap
+import org.apache.commons.lang3.StringUtils
import org.apache.linkis.common.conf.CommonVars
import org.apache.linkis.common.io.FsPath
import org.apache.linkis.common.utils.Utils
@@ -29,11 +30,11 @@ import org.apache.linkis.rpc.Sender
import org.apache.linkis.storage.resultset.ResultSetReader
import org.apache.linkis.storage.utils.StorageUtils
import org.apache.linkis.storage.{FSFactory, LineMetaData}
-
+import org.apache.linkis.common.utils.Logging
/**
- *
- */
-object EngineUtils {
+ *
+ */
+object EngineUtils extends Logging{
private val user:String = System.getProperty("user.name")
private var sparkVersion: String = _
private var fileSystem : org.apache.linkis.common.io.Fs = _
@@ -46,24 +47,32 @@ object EngineUtils {
}
def sparkSubmitVersion(): String = {
- if(sparkVersion != null) {
+ if (sparkVersion != null) {
return sparkVersion
}
- val sparkSubmit = CommonVars("wds.linkis.server.spark-submit", "spark-submit").getValue
- val pb = new ProcessBuilder(sparkSubmit, "--version")
- pb.redirectErrorStream(true)
- pb.redirectInput(ProcessBuilder.Redirect.PIPE)
-
- val process = new LineBufferedProcess(pb.start())
- val exitCode = process.waitFor()
- val output = process.inputIterator.mkString("\n")
-
- val regex = """version (.*)""".r.unanchored
-
- sparkVersion = output match {
- case regex(version) => version
- case _ => throw new IOException(f"Unable to determing spark-submit version [$exitCode]:\n$output")
+ val sparkVersionVar = CommonVars("wds.linkis.engine.spark.version", "")
+ if (StringUtils.isNotBlank(sparkVersionVar.getValue.trim)) {
+ val output = sparkVersionVar.getValue.trim
+ val regex = """([\d.]*)""".r.unanchored
+ sparkVersion = output match {
+ case regex(version) => version
+ case _ => throw new IOException(f"spark version is invalid :\n$output")
+ }
+ } else {
+ val sparkSubmit = CommonVars("wds.linkis.server.spark-submit", "spark-submit").getValue
+ val pb = new ProcessBuilder(sparkSubmit, "--version")
+ pb.redirectErrorStream(true)
+ pb.redirectInput(ProcessBuilder.Redirect.PIPE)
+ val process = new LineBufferedProcess(pb.start())
+ val exitCode = process.waitFor()
+ val output = process.inputIterator.mkString("\n")
+ val regex = """version ([\d.]*)""".r.unanchored
+ sparkVersion = output match {
+ case regex(version) => version
+ case _ => throw new IOException(f"Unable to determing spark-submit version [$exitCode]:\n$output")
+ }
}
+ info("spark version is " + sparkVersion)
sparkVersion
}
@@ -113,7 +122,7 @@ object EngineUtils {
while (resultSetReader.hasNext){
sb.append(resultSetReader.getRecord).append("\n")
}
- sb.toString()
+ sb.toString()
case _ => dolphinContent
}
errorMsg
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org