You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@linkis.apache.org by pe...@apache.org on 2022/06/07 12:44:55 UTC
[incubator-linkis] 09/09: Fix NPE
This is an automated email from the ASF dual-hosted git repository.
peacewong pushed a commit to branch dev-1.1.3
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git
commit 16f645bb840ed6d48f9ae9fa61c0f76cbd945f52
Author: peacewong <wp...@gmail.com>
AuthorDate: Tue Jun 7 20:00:37 2022 +0800
Fix NPE
---
.../linkis/entrance/restful/EntranceRestfulApi.java | 2 +-
.../linkis/entrance/execute/DefaultEntranceExecutor.scala | 15 +++++++++------
.../operation/progress/DefaultProgressOperation.scala | 2 +-
.../operation/progress/ProgressProcessor.scala | 2 +-
4 files changed, 12 insertions(+), 9 deletions(-)
diff --git a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/restful/EntranceRestfulApi.java b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/restful/EntranceRestfulApi.java
index 7703f8b25..31e49f748 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/restful/EntranceRestfulApi.java
+++ b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/restful/EntranceRestfulApi.java
@@ -311,7 +311,7 @@ public class EntranceRestfulApi implements EntranceRestfulRemote {
.reduce((x, y) -> x + y);
float corePercent = 0.0f;
float memoryPercent = 0.0f;
- if (cores.isPresent()) {
+ if (cores.isPresent() && memory.isPresent()) {
corePercent =
cores.get().floatValue()
/ EntranceConfiguration.YARN_QUEUE_CORES_MAX()
diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/DefaultEntranceExecutor.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/DefaultEntranceExecutor.scala
index c6165523c..1a1fed310 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/DefaultEntranceExecutor.scala
+++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/DefaultEntranceExecutor.scala
@@ -90,8 +90,10 @@ class DefaultEntranceExecutor(id: Long, mark: MarkReq, entranceExecutorManager:
if (runningIndex >= 0 && runningIndex <= jobGroupSize -1) {
val totalProgress = 1.0 * (runningIndex + progressInfoEvent.progress) / jobGroupSize
//Update progress info
- progressInfoEvent.progressInfo.foreach(progressInfo =>
- subJobInfo.getProgressInfoMap.put(progressInfo.id, progressInfo))
+ if (null != progressInfoEvent.progressInfo) {
+ progressInfoEvent.progressInfo.foreach(progressInfo =>
+ subJobInfo.getProgressInfoMap.put(progressInfo.id, progressInfo))
+ }
entranceJob.getProgressListener.foreach(_.onProgressUpdate(entranceJob, totalProgress.toFloat,
entranceJob.getProgressInfo))
} else {
@@ -108,10 +110,11 @@ class DefaultEntranceExecutor(id: Long, mark: MarkReq, entranceExecutorManager:
}
val metricsMap = entranceJob.getJobRequest.getMetrics
val resourceMap = metricsMap.get(TaskConstant.ENTRANCEJOB_YARNRESOURCE)
+ val ecResourceMap = if (progressInfoEvent.resourceMap == null) new util.HashMap[String, ResourceWithStatus] else progressInfoEvent.resourceMap
if(resourceMap != null) {
- resourceMap.asInstanceOf[util.HashMap[String, ResourceWithStatus]].putAll(progressInfoEvent.resourceMap)
+ resourceMap.asInstanceOf[util.HashMap[String, ResourceWithStatus]].putAll(ecResourceMap)
} else {
- metricsMap.put(TaskConstant.ENTRANCEJOB_YARNRESOURCE, progressInfoEvent.resourceMap)
+ metricsMap.put(TaskConstant.ENTRANCEJOB_YARNRESOURCE, ecResourceMap)
}
// update engine info
// todo
@@ -123,9 +126,9 @@ class DefaultEntranceExecutor(id: Long, mark: MarkReq, entranceExecutorManager:
metricsMap.put(TaskConstant.ENTRANCEJOB_ENGINECONN_MAP, engineInstanceMap)
}
val infoMap = progressInfoEvent.infoMap
- if (infoMap.containsKey(TaskConstant.ENGINE_INSTANCE)) {
+ if (null != infoMap && infoMap.containsKey(TaskConstant.ENGINE_INSTANCE)) {
val instance = infoMap.get(TaskConstant.ENGINE_INSTANCE).asInstanceOf[String]
- val engineExtraInfoMap = engineInstanceMap.getOrDefault(instance, new util.HashMap[String, Object]).asInstanceOf[util.HashMap[String, Object]]
+ val engineExtraInfoMap = engineInstanceMap.getOrDefault(instance, new util.HashMap[String, Object]).asInstanceOf[util.HashMap[String, Object]]
engineInstanceMap.put(instance, engineExtraInfoMap)
engineExtraInfoMap.putAll(infoMap)
} else {
diff --git a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/operation/progress/DefaultProgressOperation.scala b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/operation/progress/DefaultProgressOperation.scala
index f812976e9..0f492c138 100644
--- a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/operation/progress/DefaultProgressOperation.scala
+++ b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/operation/progress/DefaultProgressOperation.scala
@@ -55,7 +55,7 @@ class DefaultProgressOperation(orchestratorSession: OrchestratorSession) extends
/**
* To deal with the progress event from engineConn
- * @param taskRunningInfoEvent progress event
+ * @param oriTaskRunningInfoEvent progress event
*/
override def onProgressOn(oriTaskRunningInfoEvent: TaskRunningInfoEvent): Unit = {
var taskRunningInfoEvent: TaskRunningInfoEvent = null
diff --git a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/operation/progress/ProgressProcessor.scala b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/operation/progress/ProgressProcessor.scala
index 807c4f1ad..6a1fbd126 100644
--- a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/operation/progress/ProgressProcessor.scala
+++ b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/operation/progress/ProgressProcessor.scala
@@ -41,7 +41,7 @@ class ProgressProcessor(rootExecTaskId: String,
}
def onProgress(progress: Float, progressInfo: Array[JobProgressInfo], resourceMap: util.HashMap[String, ResourceWithStatus], infoMap: util.HashMap[String, Object]): Unit = {
- val progressInfoWithResourceEvent = ProgressInfoEvent(orchestration, progress, progressInfo, resourceMap, infoMap: util.HashMap[String, Object])
+ val progressInfoWithResourceEvent = ProgressInfoEvent(orchestration, progress, progressInfo, resourceMap, infoMap)
listeners.foreach(_(progressInfoWithResourceEvent))
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org