You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@linkis.apache.org by wa...@apache.org on 2022/05/09 07:53:01 UTC

[incubator-linkis] branch dev-1.1.2 updated (613e142e4 -> d3b4b443e)

This is an automated email from the ASF dual-hosted git repository.

wangzhen pushed a change to branch dev-1.1.2
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git


    from 613e142e4 UseFormat is required in functionManagement and udfManagement (#2087)
     new 50cd8a952 Optimize the progress get logic of Hive and Spark
     new 490333e83 add engine conn status scan time conf
     new d3b4b443e optimize code

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../computation/executor/utlis/ProgressUtils.scala | 33 ++++++++++++----------
 .../conf/AccessibleExecutorConfiguration.scala     |  1 +
 .../execution/AccessibleEngineConnExecution.scala  |  4 +--
 .../hive/executor/HiveEngineConnExecutor.scala     |  9 ++++--
 .../spark/executor/SparkEngineConnExecutor.scala   | 27 +++++-------------
 5 files changed, 35 insertions(+), 39 deletions(-)
 copy linkis-engineconn-plugins/engineconn-plugins/pipeline/src/main/scala/org/apache/linkis/manager/engineplugin/pipeline/executor/PipeLineExecutor.scala => linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/utlis/ProgressUtils.scala (57%)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org


[incubator-linkis] 01/03: Optimize the progress get logic of Hive and Spark

Posted by wa...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

wangzhen pushed a commit to branch dev-1.1.2
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git

commit 50cd8a952b4a26c4e8ce28204f1443e968d01a48
Author: peacewong <wp...@gmail.com>
AuthorDate: Thu May 5 18:02:58 2022 +0800

    Optimize the progress get logic of Hive and Spark
---
 .../computation/executor/utlis/ProgressUtils.scala | 41 ++++++++++++++++++++++
 .../hive/executor/HiveEngineConnExecutor.scala     |  9 +++--
 .../spark/executor/SparkEngineConnExecutor.scala   | 27 ++++----------
 3 files changed, 55 insertions(+), 22 deletions(-)

diff --git a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/utlis/ProgressUtils.scala b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/utlis/ProgressUtils.scala
new file mode 100644
index 000000000..695b783f3
--- /dev/null
+++ b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/utlis/ProgressUtils.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineconn.computation.executor.utlis
+
+import org.apache.linkis.engineconn.computation.executor.execute.EngineExecutionContext
+
+object ProgressUtils {
+
+  private val OLD_PROGRESS_KEY = "oldProgress"
+
+   def getOldProgress(engineExecutionContext: EngineExecutionContext): Float = {
+    if (null == engineExecutionContext) {
+      0f
+    } else {
+      val value = engineExecutionContext.getProperties.get(OLD_PROGRESS_KEY)
+      if (null == value) 0f else value.asInstanceOf[Float]
+    }
+  }
+
+   def putProgress(newProgress: Float, engineExecutionContext: EngineExecutionContext): Unit = {
+    if (null != engineExecutionContext) {
+      engineExecutionContext.getProperties.put(OLD_PROGRESS_KEY, newProgress.asInstanceOf[AnyRef])
+    }
+  }
+
+}
diff --git a/linkis-engineconn-plugins/engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala b/linkis-engineconn-plugins/engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala
index 68875f9ca..d91225d95 100644
--- a/linkis-engineconn-plugins/engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala
+++ b/linkis-engineconn-plugins/engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala
@@ -55,6 +55,7 @@ import org.apache.linkis.engineconn.executor.entity.ResourceFetchExecutor
 import org.apache.linkis.engineplugin.hive.conf.Counters
 import org.apache.linkis.manager.common.protocol.resource.ResourceWithStatus
 import org.apache.commons.lang.StringUtils
+import org.apache.linkis.engineconn.computation.executor.utlis.ProgressUtils
 import org.apache.linkis.governance.common.utils.JobUtils
 
 import scala.collection.JavaConversions._
@@ -405,8 +406,12 @@ class HiveEngineConnExecutor(id: Int,
       }
 
       logger.debug(s"hive progress is $totalProgress")
-      if (totalProgress.isNaN || totalProgress.isInfinite) return currentBegin
-      totalProgress + currentBegin
+      val newProgress = if (totalProgress.isNaN || totalProgress.isInfinite)  currentBegin else totalProgress + currentBegin
+      val oldProgress = ProgressUtils.getOldProgress(this.engineExecutorContext)
+      if(newProgress < oldProgress) oldProgress else {
+        ProgressUtils.putProgress(newProgress, this.engineExecutorContext)
+        newProgress
+      }
     } else 0.0f
   }
 
diff --git a/linkis-engineconn-plugins/engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkEngineConnExecutor.scala b/linkis-engineconn-plugins/engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkEngineConnExecutor.scala
index 88f1f2b01..6da22b3a6 100644
--- a/linkis-engineconn-plugins/engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkEngineConnExecutor.scala
+++ b/linkis-engineconn-plugins/engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkEngineConnExecutor.scala
@@ -18,12 +18,10 @@
 package org.apache.linkis.engineplugin.spark.executor
 
 import org.apache.commons.lang3.StringUtils
-
-import java.util
-import java.util.concurrent.atomic.AtomicLong
 import org.apache.linkis.common.log.LogUtils
 import org.apache.linkis.common.utils.{ByteTimeUtils, Logging, Utils}
 import org.apache.linkis.engineconn.computation.executor.execute.{ComputationExecutor, EngineExecutionContext}
+import org.apache.linkis.engineconn.computation.executor.utlis.ProgressUtils
 import org.apache.linkis.engineconn.executor.entity.ResourceFetchExecutor
 import org.apache.linkis.engineplugin.spark.common.Kind
 import org.apache.linkis.engineplugin.spark.cs.CSSparkHelper
@@ -40,6 +38,8 @@ import org.apache.linkis.protocol.engine.JobProgressInfo
 import org.apache.linkis.scheduler.executer.ExecuteResponse
 import org.apache.spark.SparkContext
 
+import java.util
+import java.util.concurrent.atomic.AtomicLong
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 
@@ -48,7 +48,7 @@ abstract class SparkEngineConnExecutor(val sc: SparkContext, id: Long) extends C
 
   private var initialized: Boolean = false
 
-  private val OLD_PROGRESS_KEY = "oldProgress"
+
 
   private var jobGroup: String = _
 
@@ -112,28 +112,15 @@ abstract class SparkEngineConnExecutor(val sc: SparkContext, id: Long) extends C
   }
 
 
-  private def getOldProgress(): Float = {
-    if (null == this.engineExecutionContext) {
-      0f
-    } else {
-      val value = this.engineExecutionContext.getProperties.get(OLD_PROGRESS_KEY)
-      if (null == value) 0f else value.asInstanceOf[Float]
-    }
-  }
 
-  private def putProgress(newProgress: Float): Unit = {
-    if (null != this.engineExecutionContext) {
-      this.engineExecutionContext.getProperties.put(OLD_PROGRESS_KEY, newProgress.asInstanceOf[AnyRef])
-    }
-  }
 
-  override def progress(taskID: String): Float = if (jobGroup == null || engineExecutionContext.getTotalParagraph == 0) getOldProgress()
+  override def progress(taskID: String): Float = if (jobGroup == null || engineExecutionContext.getTotalParagraph == 0) ProgressUtils.getOldProgress(this.engineExecutionContext)
   else {
     val newProgress = (engineExecutionContext.getCurrentParagraph * 1f - 1f)/ engineExecutionContext.getTotalParagraph + JobProgressUtil.progress(sc, jobGroup)/engineExecutionContext.getTotalParagraph
     val normalizedProgress = if (newProgress >= 1) newProgress - 0.1f else newProgress
-    val oldProgress = getOldProgress()
+    val oldProgress = ProgressUtils.getOldProgress(this.engineExecutionContext)
     if(normalizedProgress < oldProgress) oldProgress else {
-      putProgress(normalizedProgress)
+      ProgressUtils.putProgress(normalizedProgress, this.engineExecutionContext)
       normalizedProgress
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org


[incubator-linkis] 03/03: optimize code

Posted by wa...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

wangzhen pushed a commit to branch dev-1.1.2
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git

commit d3b4b443e02e590d0958a14bc8158196d2c23977
Author: peacewong <wp...@gmail.com>
AuthorDate: Mon May 9 15:09:33 2022 +0800

    optimize code
---
 .../linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala      | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/linkis-engineconn-plugins/engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala b/linkis-engineconn-plugins/engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala
index d91225d95..53bf60169 100644
--- a/linkis-engineconn-plugins/engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala
+++ b/linkis-engineconn-plugins/engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala
@@ -406,7 +406,7 @@ class HiveEngineConnExecutor(id: Int,
       }
 
       logger.debug(s"hive progress is $totalProgress")
-      val newProgress = if (totalProgress.isNaN || totalProgress.isInfinite)  currentBegin else totalProgress + currentBegin
+      val newProgress = if (totalProgress.isNaN || totalProgress.isInfinite) currentBegin else totalProgress + currentBegin
       val oldProgress = ProgressUtils.getOldProgress(this.engineExecutorContext)
       if(newProgress < oldProgress) oldProgress else {
         ProgressUtils.putProgress(newProgress, this.engineExecutorContext)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org


[incubator-linkis] 02/03: add engine conn status scan time conf

Posted by wa...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

wangzhen pushed a commit to branch dev-1.1.2
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git

commit 490333e837fefaee5ad30c4d52f9d6bc554dad39
Author: peacewong <wp...@gmail.com>
AuthorDate: Sat May 7 10:47:34 2022 +0800

    add engine conn status scan time conf
---
 .../acessible/executor/conf/AccessibleExecutorConfiguration.scala     | 1 +
 .../acessible/executor/execution/AccessibleEngineConnExecution.scala  | 4 ++--
 2 files changed, 3 insertions(+), 2 deletions(-)

diff --git a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/conf/AccessibleExecutorConfiguration.scala b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/conf/AccessibleExecutorConfiguration.scala
index dff9b7990..ce63912bd 100644
--- a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/conf/AccessibleExecutorConfiguration.scala
+++ b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/conf/AccessibleExecutorConfiguration.scala
@@ -45,6 +45,7 @@ object AccessibleExecutorConfiguration {
 
   val ENGINECONN_HEARTBEAT_TIME = CommonVars("wds.linkis.engineconn.heartbeat.time", new TimeType("2m"))
 
+  val ENGINECONN_STATUS_SCAN_TIME = CommonVars("wds.linkis.engineconn.status.scan.time", new TimeType("1m"))
 
   val ENABLE_MAINTAIN = CommonVars("wds.linkis.engineconn.maintain.enable", false)
 
diff --git a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/execution/AccessibleEngineConnExecution.scala b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/execution/AccessibleEngineConnExecution.scala
index 26943e2ca..a8a59d3c5 100644
--- a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/execution/AccessibleEngineConnExecution.scala
+++ b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/execution/AccessibleEngineConnExecution.scala
@@ -74,7 +74,7 @@ class AccessibleEngineConnExecution extends EngineConnExecution with Logging {
     val maxFreeTimeVar = AccessibleExecutorConfiguration.ENGINECONN_MAX_FREE_TIME.getValue(context.getOptions)
     val maxFreeTimeStr = maxFreeTimeVar.toString
     val maxFreeTime = maxFreeTimeVar.toLong
-    logger.info("executorStatusChecker created, maxFreeTimeMills is " + maxFreeTime)
+    logger.info("executorStatusChecker created, maxFreeTimeMills is " + maxFreeTime)
     Utils.defaultScheduler.scheduleAtFixedRate(new Runnable {
       override def run(): Unit = Utils.tryAndWarn {
         val accessibleExecutor = ExecutorManager.getInstance.getReportExecutor match {
@@ -104,7 +104,7 @@ class AccessibleEngineConnExecution extends EngineConnExecution with Logging {
 
         }
       }
-    }, 3 * 60 * 1000, AccessibleExecutorConfiguration.ENGINECONN_HEARTBEAT_TIME.getValue.toLong, TimeUnit.MILLISECONDS)
+    }, 3 * 60 * 1000, AccessibleExecutorConfiguration.ENGINECONN_STATUS_SCAN_TIME.getValue.toLong, TimeUnit.MILLISECONDS)
   }
 
   def requestManagerReleaseExecutor(msg: String): Unit = {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org