You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@linkis.apache.org by wa...@apache.org on 2022/05/09 07:53:01 UTC
[incubator-linkis] branch dev-1.1.2 updated (613e142e4 -> d3b4b443e)
This is an automated email from the ASF dual-hosted git repository.
wangzhen pushed a change to branch dev-1.1.2
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git
from 613e142e4 UseFormat is required in functionManagement and udfManagement (#2087)
new 50cd8a952 Optimize the progress get logic of Hive and Spark
new 490333e83 add engine conn status scan time conf
new d3b4b443e optimize code
The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
Summary of changes:
.../computation/executor/utlis/ProgressUtils.scala | 33 ++++++++++++----------
.../conf/AccessibleExecutorConfiguration.scala | 1 +
.../execution/AccessibleEngineConnExecution.scala | 4 +--
.../hive/executor/HiveEngineConnExecutor.scala | 9 ++++--
.../spark/executor/SparkEngineConnExecutor.scala | 27 +++++-------------
5 files changed, 35 insertions(+), 39 deletions(-)
copy linkis-engineconn-plugins/engineconn-plugins/pipeline/src/main/scala/org/apache/linkis/manager/engineplugin/pipeline/executor/PipeLineExecutor.scala => linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/utlis/ProgressUtils.scala (57%)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org
[incubator-linkis] 01/03: Optimize the progress get logic of Hive and Spark
Posted by wa...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
wangzhen pushed a commit to branch dev-1.1.2
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git
commit 50cd8a952b4a26c4e8ce28204f1443e968d01a48
Author: peacewong <wp...@gmail.com>
AuthorDate: Thu May 5 18:02:58 2022 +0800
Optimize the progress get logic of Hive and Spark
---
.../computation/executor/utlis/ProgressUtils.scala | 41 ++++++++++++++++++++++
.../hive/executor/HiveEngineConnExecutor.scala | 9 +++--
.../spark/executor/SparkEngineConnExecutor.scala | 27 ++++----------
3 files changed, 55 insertions(+), 22 deletions(-)
diff --git a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/utlis/ProgressUtils.scala b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/utlis/ProgressUtils.scala
new file mode 100644
index 000000000..695b783f3
--- /dev/null
+++ b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/utlis/ProgressUtils.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineconn.computation.executor.utlis
+
+import org.apache.linkis.engineconn.computation.executor.execute.EngineExecutionContext
+
+object ProgressUtils {
+
+ private val OLD_PROGRESS_KEY = "oldProgress"
+
+ def getOldProgress(engineExecutionContext: EngineExecutionContext): Float = {
+ if (null == engineExecutionContext) {
+ 0f
+ } else {
+ val value = engineExecutionContext.getProperties.get(OLD_PROGRESS_KEY)
+ if (null == value) 0f else value.asInstanceOf[Float]
+ }
+ }
+
+ def putProgress(newProgress: Float, engineExecutionContext: EngineExecutionContext): Unit = {
+ if (null != engineExecutionContext) {
+ engineExecutionContext.getProperties.put(OLD_PROGRESS_KEY, newProgress.asInstanceOf[AnyRef])
+ }
+ }
+
+}
diff --git a/linkis-engineconn-plugins/engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala b/linkis-engineconn-plugins/engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala
index 68875f9ca..d91225d95 100644
--- a/linkis-engineconn-plugins/engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala
+++ b/linkis-engineconn-plugins/engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala
@@ -55,6 +55,7 @@ import org.apache.linkis.engineconn.executor.entity.ResourceFetchExecutor
import org.apache.linkis.engineplugin.hive.conf.Counters
import org.apache.linkis.manager.common.protocol.resource.ResourceWithStatus
import org.apache.commons.lang.StringUtils
+import org.apache.linkis.engineconn.computation.executor.utlis.ProgressUtils
import org.apache.linkis.governance.common.utils.JobUtils
import scala.collection.JavaConversions._
@@ -405,8 +406,12 @@ class HiveEngineConnExecutor(id: Int,
}
logger.debug(s"hive progress is $totalProgress")
- if (totalProgress.isNaN || totalProgress.isInfinite) return currentBegin
- totalProgress + currentBegin
+ val newProgress = if (totalProgress.isNaN || totalProgress.isInfinite) currentBegin else totalProgress + currentBegin
+ val oldProgress = ProgressUtils.getOldProgress(this.engineExecutorContext)
+ if(newProgress < oldProgress) oldProgress else {
+ ProgressUtils.putProgress(newProgress, this.engineExecutorContext)
+ newProgress
+ }
} else 0.0f
}
diff --git a/linkis-engineconn-plugins/engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkEngineConnExecutor.scala b/linkis-engineconn-plugins/engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkEngineConnExecutor.scala
index 88f1f2b01..6da22b3a6 100644
--- a/linkis-engineconn-plugins/engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkEngineConnExecutor.scala
+++ b/linkis-engineconn-plugins/engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkEngineConnExecutor.scala
@@ -18,12 +18,10 @@
package org.apache.linkis.engineplugin.spark.executor
import org.apache.commons.lang3.StringUtils
-
-import java.util
-import java.util.concurrent.atomic.AtomicLong
import org.apache.linkis.common.log.LogUtils
import org.apache.linkis.common.utils.{ByteTimeUtils, Logging, Utils}
import org.apache.linkis.engineconn.computation.executor.execute.{ComputationExecutor, EngineExecutionContext}
+import org.apache.linkis.engineconn.computation.executor.utlis.ProgressUtils
import org.apache.linkis.engineconn.executor.entity.ResourceFetchExecutor
import org.apache.linkis.engineplugin.spark.common.Kind
import org.apache.linkis.engineplugin.spark.cs.CSSparkHelper
@@ -40,6 +38,8 @@ import org.apache.linkis.protocol.engine.JobProgressInfo
import org.apache.linkis.scheduler.executer.ExecuteResponse
import org.apache.spark.SparkContext
+import java.util
+import java.util.concurrent.atomic.AtomicLong
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
@@ -48,7 +48,7 @@ abstract class SparkEngineConnExecutor(val sc: SparkContext, id: Long) extends C
private var initialized: Boolean = false
- private val OLD_PROGRESS_KEY = "oldProgress"
+
private var jobGroup: String = _
@@ -112,28 +112,15 @@ abstract class SparkEngineConnExecutor(val sc: SparkContext, id: Long) extends C
}
- private def getOldProgress(): Float = {
- if (null == this.engineExecutionContext) {
- 0f
- } else {
- val value = this.engineExecutionContext.getProperties.get(OLD_PROGRESS_KEY)
- if (null == value) 0f else value.asInstanceOf[Float]
- }
- }
- private def putProgress(newProgress: Float): Unit = {
- if (null != this.engineExecutionContext) {
- this.engineExecutionContext.getProperties.put(OLD_PROGRESS_KEY, newProgress.asInstanceOf[AnyRef])
- }
- }
- override def progress(taskID: String): Float = if (jobGroup == null || engineExecutionContext.getTotalParagraph == 0) getOldProgress()
+ override def progress(taskID: String): Float = if (jobGroup == null || engineExecutionContext.getTotalParagraph == 0) ProgressUtils.getOldProgress(this.engineExecutionContext)
else {
val newProgress = (engineExecutionContext.getCurrentParagraph * 1f - 1f)/ engineExecutionContext.getTotalParagraph + JobProgressUtil.progress(sc, jobGroup)/engineExecutionContext.getTotalParagraph
val normalizedProgress = if (newProgress >= 1) newProgress - 0.1f else newProgress
- val oldProgress = getOldProgress()
+ val oldProgress = ProgressUtils.getOldProgress(this.engineExecutionContext)
if(normalizedProgress < oldProgress) oldProgress else {
- putProgress(normalizedProgress)
+ ProgressUtils.putProgress(normalizedProgress, this.engineExecutionContext)
normalizedProgress
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org
[incubator-linkis] 03/03: optimize code
Posted by wa...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
wangzhen pushed a commit to branch dev-1.1.2
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git
commit d3b4b443e02e590d0958a14bc8158196d2c23977
Author: peacewong <wp...@gmail.com>
AuthorDate: Mon May 9 15:09:33 2022 +0800
optimize code
---
.../linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/linkis-engineconn-plugins/engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala b/linkis-engineconn-plugins/engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala
index d91225d95..53bf60169 100644
--- a/linkis-engineconn-plugins/engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala
+++ b/linkis-engineconn-plugins/engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala
@@ -406,7 +406,7 @@ class HiveEngineConnExecutor(id: Int,
}
logger.debug(s"hive progress is $totalProgress")
- val newProgress = if (totalProgress.isNaN || totalProgress.isInfinite) currentBegin else totalProgress + currentBegin
+ val newProgress = if (totalProgress.isNaN || totalProgress.isInfinite) currentBegin else totalProgress + currentBegin
val oldProgress = ProgressUtils.getOldProgress(this.engineExecutorContext)
if(newProgress < oldProgress) oldProgress else {
ProgressUtils.putProgress(newProgress, this.engineExecutorContext)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org
[incubator-linkis] 02/03: add engine conn status scan time conf
Posted by wa...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
wangzhen pushed a commit to branch dev-1.1.2
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git
commit 490333e837fefaee5ad30c4d52f9d6bc554dad39
Author: peacewong <wp...@gmail.com>
AuthorDate: Sat May 7 10:47:34 2022 +0800
add engine conn status scan time conf
---
.../acessible/executor/conf/AccessibleExecutorConfiguration.scala | 1 +
.../acessible/executor/execution/AccessibleEngineConnExecution.scala | 4 ++--
2 files changed, 3 insertions(+), 2 deletions(-)
diff --git a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/conf/AccessibleExecutorConfiguration.scala b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/conf/AccessibleExecutorConfiguration.scala
index dff9b7990..ce63912bd 100644
--- a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/conf/AccessibleExecutorConfiguration.scala
+++ b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/conf/AccessibleExecutorConfiguration.scala
@@ -45,6 +45,7 @@ object AccessibleExecutorConfiguration {
val ENGINECONN_HEARTBEAT_TIME = CommonVars("wds.linkis.engineconn.heartbeat.time", new TimeType("2m"))
+ val ENGINECONN_STATUS_SCAN_TIME = CommonVars("wds.linkis.engineconn.status.scan.time", new TimeType("1m"))
val ENABLE_MAINTAIN = CommonVars("wds.linkis.engineconn.maintain.enable", false)
diff --git a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/execution/AccessibleEngineConnExecution.scala b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/execution/AccessibleEngineConnExecution.scala
index 26943e2ca..a8a59d3c5 100644
--- a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/execution/AccessibleEngineConnExecution.scala
+++ b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/execution/AccessibleEngineConnExecution.scala
@@ -74,7 +74,7 @@ class AccessibleEngineConnExecution extends EngineConnExecution with Logging {
val maxFreeTimeVar = AccessibleExecutorConfiguration.ENGINECONN_MAX_FREE_TIME.getValue(context.getOptions)
val maxFreeTimeStr = maxFreeTimeVar.toString
val maxFreeTime = maxFreeTimeVar.toLong
- logger.info("executorStatusChecker created, maxFreeTimeMills is " + maxFreeTime)
+ logger.info("executorStatusChecker created, maxFreeTimeMills is " + maxFreeTime)
Utils.defaultScheduler.scheduleAtFixedRate(new Runnable {
override def run(): Unit = Utils.tryAndWarn {
val accessibleExecutor = ExecutorManager.getInstance.getReportExecutor match {
@@ -104,7 +104,7 @@ class AccessibleEngineConnExecution extends EngineConnExecution with Logging {
}
}
- }, 3 * 60 * 1000, AccessibleExecutorConfiguration.ENGINECONN_HEARTBEAT_TIME.getValue.toLong, TimeUnit.MILLISECONDS)
+ }, 3 * 60 * 1000, AccessibleExecutorConfiguration.ENGINECONN_STATUS_SCAN_TIME.getValue.toLong, TimeUnit.MILLISECONDS)
}
def requestManagerReleaseExecutor(msg: String): Unit = {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org