You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@griffin.apache.org by gu...@apache.org on 2019/10/14 12:16:19 UTC
[griffin] branch master updated: [GRIFFIN-293][SERVICE]
livy.need.queue=true
This is an automated email from the ASF dual-hosted git repository.
guoyp pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/griffin.git
The following commit(s) were added to refs/heads/master by this push:
new e9dda8b [GRIFFIN-293][SERVICE] livy.need.queue=true
e9dda8b is described below
commit e9dda8b53aadfc127854a89ac797c761b95de92a
Author: neveljkovic <nv...@plume.com>
AuthorDate: Mon Oct 14 20:14:56 2019 +0800
[GRIFFIN-293][SERVICE] livy.need.queue=true
This is how we fixed the issue described in https://issues.apache.org/jira/browse/GRIFFIN-293.
The solution is deployed to our servers and works OK.
Author: neveljkovic <nv...@plume.com>
Closes #541 from neveljkovic/griffin-293.
---
.../java/org/apache/griffin/core/job/JobServiceImpl.java | 4 ++--
.../org/apache/griffin/core/job/LivyTaskSubmitHelper.java | 1 +
.../java/org/apache/griffin/core/job/SparkSubmitJob.java | 12 ++++++------
3 files changed, 9 insertions(+), 8 deletions(-)
diff --git a/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java b/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java
index 77fbec3..3dfc830 100644
--- a/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java
+++ b/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java
@@ -629,8 +629,8 @@ public class JobServiceImpl implements JobService {
.getProperty("yarn.uri") + "/cluster/app/" + appId);
instanceRepo.save(instance);
// If Livy returns to success or dead, task execution completes one,TaskNum--
- if (SUCCESS.equals(state) || DEAD.equals(state)) {
- livyTaskSubmitHelper.decreaseCurTaskNum(instance.getId());
+ if (instance.getState().equals(SUCCESS) || instance.getState().equals(DEAD)) {
+ livyTaskSubmitHelper.decreaseCurTaskNum(instance.getSessionId());
}
}
}
diff --git a/service/src/main/java/org/apache/griffin/core/job/LivyTaskSubmitHelper.java b/service/src/main/java/org/apache/griffin/core/job/LivyTaskSubmitHelper.java
index 54cc582..558c311 100644
--- a/service/src/main/java/org/apache/griffin/core/job/LivyTaskSubmitHelper.java
+++ b/service/src/main/java/org/apache/griffin/core/job/LivyTaskSubmitHelper.java
@@ -61,6 +61,7 @@ public class LivyTaskSubmitHelper {
public static final int DEFAULT_QUEUE_SIZE = 20000;
private static final int SLEEP_TIME = 300;
+ @Autowired
private SparkSubmitJob sparkSubmitJob;
private ConcurrentMap<Long, Integer> taskAppIdMap = new ConcurrentHashMap<>();
// Current number of tasks
diff --git a/service/src/main/java/org/apache/griffin/core/job/SparkSubmitJob.java b/service/src/main/java/org/apache/griffin/core/job/SparkSubmitJob.java
index e48053d..8af38ab 100644
--- a/service/src/main/java/org/apache/griffin/core/job/SparkSubmitJob.java
+++ b/service/src/main/java/org/apache/griffin/core/job/SparkSubmitJob.java
@@ -100,12 +100,6 @@ public class SparkSubmitJob implements Job {
public void execute(JobExecutionContext context) {
JobDetail jd = context.getJobDetail();
try {
- initParam(jd);
- setLivyConf();
- if (!success(mPredicates)) {
- updateJobInstanceState(context);
- return;
- }
if (isNeedLivyQueue) {
//livy batch limit
livyTaskSubmitHelper.addTaskToWaitingQueue(jd);
@@ -210,6 +204,12 @@ public class SparkSubmitJob implements Job {
IOException {
// If result is null, it may livy uri is wrong
// or livy parameter is wrong.
+ initParam(jd);
+ setLivyConf();
+ if (!success(mPredicates)) {
+ updateJobInstanceState((JobExecutionContext) jd);
+ return;
+ }
Map<String, Object> resultMap = post2LivyWithRetry();
String group = jd.getKey().getGroup();
String name = jd.getKey().getName();