You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain-text copy.
Posted to commits@linkis.apache.org by pe...@apache.org on 2022/06/07 12:44:55 UTC

[incubator-linkis] 09/09: Fix NPE

This is an automated email from the ASF dual-hosted git repository.

peacewong pushed a commit to branch dev-1.1.3
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git

commit 16f645bb840ed6d48f9ae9fa61c0f76cbd945f52
Author: peacewong <wp...@gmail.com>
AuthorDate: Tue Jun 7 20:00:37 2022 +0800

    Fix NPE
---
 .../linkis/entrance/restful/EntranceRestfulApi.java       |  2 +-
 .../linkis/entrance/execute/DefaultEntranceExecutor.scala | 15 +++++++++------
 .../operation/progress/DefaultProgressOperation.scala     |  2 +-
 .../operation/progress/ProgressProcessor.scala            |  2 +-
 4 files changed, 12 insertions(+), 9 deletions(-)

diff --git a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/restful/EntranceRestfulApi.java b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/restful/EntranceRestfulApi.java
index 7703f8b25..31e49f748 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/restful/EntranceRestfulApi.java
+++ b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/restful/EntranceRestfulApi.java
@@ -311,7 +311,7 @@ public class EntranceRestfulApi implements EntranceRestfulRemote {
                                         .reduce((x, y) -> x + y);
                         float corePercent = 0.0f;
                         float memoryPercent = 0.0f;
-                        if (cores.isPresent()) {
+                        if (cores.isPresent() && memory.isPresent()) {
                             corePercent =
                                     cores.get().floatValue()
                                             / EntranceConfiguration.YARN_QUEUE_CORES_MAX()
diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/DefaultEntranceExecutor.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/DefaultEntranceExecutor.scala
index c6165523c..1a1fed310 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/DefaultEntranceExecutor.scala
+++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/DefaultEntranceExecutor.scala
@@ -90,8 +90,10 @@ class DefaultEntranceExecutor(id: Long, mark: MarkReq, entranceExecutorManager:
             if (runningIndex >= 0 && runningIndex <= jobGroupSize -1) {
               val totalProgress = 1.0 * (runningIndex + progressInfoEvent.progress) / jobGroupSize
               //Update progress info
-              progressInfoEvent.progressInfo.foreach(progressInfo =>
-                subJobInfo.getProgressInfoMap.put(progressInfo.id, progressInfo))
+              if (null != progressInfoEvent.progressInfo) {
+                progressInfoEvent.progressInfo.foreach(progressInfo =>
+                  subJobInfo.getProgressInfoMap.put(progressInfo.id, progressInfo))
+              }
               entranceJob.getProgressListener.foreach(_.onProgressUpdate(entranceJob, totalProgress.toFloat,
                 entranceJob.getProgressInfo))
             } else {
@@ -108,10 +110,11 @@ class DefaultEntranceExecutor(id: Long, mark: MarkReq, entranceExecutorManager:
         }
         val metricsMap = entranceJob.getJobRequest.getMetrics
         val resourceMap = metricsMap.get(TaskConstant.ENTRANCEJOB_YARNRESOURCE)
+        val ecResourceMap = if (progressInfoEvent.resourceMap == null) new util.HashMap[String, ResourceWithStatus] else progressInfoEvent.resourceMap
         if(resourceMap != null) {
-          resourceMap.asInstanceOf[util.HashMap[String, ResourceWithStatus]].putAll(progressInfoEvent.resourceMap)
+          resourceMap.asInstanceOf[util.HashMap[String, ResourceWithStatus]].putAll(ecResourceMap)
         } else {
-          metricsMap.put(TaskConstant.ENTRANCEJOB_YARNRESOURCE, progressInfoEvent.resourceMap)
+          metricsMap.put(TaskConstant.ENTRANCEJOB_YARNRESOURCE, ecResourceMap)
         }
         // update engine info
         // todo
@@ -123,9 +126,9 @@ class DefaultEntranceExecutor(id: Long, mark: MarkReq, entranceExecutorManager:
           metricsMap.put(TaskConstant.ENTRANCEJOB_ENGINECONN_MAP, engineInstanceMap)
         }
         val infoMap = progressInfoEvent.infoMap
-        if (infoMap.containsKey(TaskConstant.ENGINE_INSTANCE)) {
+        if (null != infoMap && infoMap.containsKey(TaskConstant.ENGINE_INSTANCE)) {
           val instance = infoMap.get(TaskConstant.ENGINE_INSTANCE).asInstanceOf[String]
-          val engineExtraInfoMap =  engineInstanceMap.getOrDefault(instance, new util.HashMap[String, Object]).asInstanceOf[util.HashMap[String, Object]]
+          val engineExtraInfoMap = engineInstanceMap.getOrDefault(instance, new util.HashMap[String, Object]).asInstanceOf[util.HashMap[String, Object]]
           engineInstanceMap.put(instance, engineExtraInfoMap)
           engineExtraInfoMap.putAll(infoMap)
         } else {
diff --git a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/operation/progress/DefaultProgressOperation.scala b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/operation/progress/DefaultProgressOperation.scala
index f812976e9..0f492c138 100644
--- a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/operation/progress/DefaultProgressOperation.scala
+++ b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/operation/progress/DefaultProgressOperation.scala
@@ -55,7 +55,7 @@ class DefaultProgressOperation(orchestratorSession: OrchestratorSession) extends
 
   /**
     * To deal with the progress event from engineConn
-    * @param taskRunningInfoEvent progress event
+    * @param oriTaskRunningInfoEvent progress event
     */
   override def onProgressOn(oriTaskRunningInfoEvent: TaskRunningInfoEvent): Unit = {
     var taskRunningInfoEvent: TaskRunningInfoEvent = null
diff --git a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/operation/progress/ProgressProcessor.scala b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/operation/progress/ProgressProcessor.scala
index 807c4f1ad..6a1fbd126 100644
--- a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/operation/progress/ProgressProcessor.scala
+++ b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/operation/progress/ProgressProcessor.scala
@@ -41,7 +41,7 @@ class ProgressProcessor(rootExecTaskId: String,
   }
 
   def onProgress(progress: Float, progressInfo: Array[JobProgressInfo], resourceMap: util.HashMap[String, ResourceWithStatus], infoMap: util.HashMap[String, Object]): Unit = {
-     val progressInfoWithResourceEvent = ProgressInfoEvent(orchestration, progress, progressInfo, resourceMap, infoMap: util.HashMap[String, Object])
+     val progressInfoWithResourceEvent = ProgressInfoEvent(orchestration, progress, progressInfo, resourceMap, infoMap)
      listeners.foreach(_(progressInfoWithResourceEvent))
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org