You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@griffin.apache.org by gu...@apache.org on 2019/10/14 12:16:19 UTC

[griffin] branch master updated: [GRIFFIN-293][SERVICE] livy.need.queue=true

This is an automated email from the ASF dual-hosted git repository.

guoyp pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/griffin.git


The following commit(s) were added to refs/heads/master by this push:
     new e9dda8b  [GRIFFIN-293][SERVICE] livy.need.queue=true
e9dda8b is described below

commit e9dda8b53aadfc127854a89ac797c761b95de92a
Author: neveljkovic <nv...@plume.com>
AuthorDate: Mon Oct 14 20:14:56 2019 +0800

    [GRIFFIN-293][SERVICE] livy.need.queue=true
    
    This is how we fixed issue described in https://issues.apache.org/jira/browse/GRIFFIN-293
    Solution is deployed to our servers and works OK.
    
    Author: neveljkovic <nv...@plume.com>
    
    Closes #541 from neveljkovic/griffin-293.
---
 .../java/org/apache/griffin/core/job/JobServiceImpl.java     |  4 ++--
 .../org/apache/griffin/core/job/LivyTaskSubmitHelper.java    |  1 +
 .../java/org/apache/griffin/core/job/SparkSubmitJob.java     | 12 ++++++------
 3 files changed, 9 insertions(+), 8 deletions(-)

diff --git a/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java b/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java
index 77fbec3..3dfc830 100644
--- a/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java
+++ b/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java
@@ -629,8 +629,8 @@ public class JobServiceImpl implements JobService {
                 .getProperty("yarn.uri") + "/cluster/app/" + appId);
             instanceRepo.save(instance);
             // If Livy returns to success or dead, task execution completes one,TaskNum--
-            if (SUCCESS.equals(state) || DEAD.equals(state)) {
-                livyTaskSubmitHelper.decreaseCurTaskNum(instance.getId());
+            if (instance.getState().equals(SUCCESS) || instance.getState().equals(DEAD)) {
+                livyTaskSubmitHelper.decreaseCurTaskNum(instance.getSessionId());
             }
         }
     }
diff --git a/service/src/main/java/org/apache/griffin/core/job/LivyTaskSubmitHelper.java b/service/src/main/java/org/apache/griffin/core/job/LivyTaskSubmitHelper.java
index 54cc582..558c311 100644
--- a/service/src/main/java/org/apache/griffin/core/job/LivyTaskSubmitHelper.java
+++ b/service/src/main/java/org/apache/griffin/core/job/LivyTaskSubmitHelper.java
@@ -61,6 +61,7 @@ public class LivyTaskSubmitHelper {
     public static final int DEFAULT_QUEUE_SIZE = 20000;
     private static final int SLEEP_TIME = 300;
 
+    @Autowired
     private SparkSubmitJob sparkSubmitJob;
     private ConcurrentMap<Long, Integer> taskAppIdMap = new ConcurrentHashMap<>();
     // Current number of tasks
diff --git a/service/src/main/java/org/apache/griffin/core/job/SparkSubmitJob.java b/service/src/main/java/org/apache/griffin/core/job/SparkSubmitJob.java
index e48053d..8af38ab 100644
--- a/service/src/main/java/org/apache/griffin/core/job/SparkSubmitJob.java
+++ b/service/src/main/java/org/apache/griffin/core/job/SparkSubmitJob.java
@@ -100,12 +100,6 @@ public class SparkSubmitJob implements Job {
     public void execute(JobExecutionContext context) {
         JobDetail jd = context.getJobDetail();
         try {
-            initParam(jd);
-            setLivyConf();
-            if (!success(mPredicates)) {
-                updateJobInstanceState(context);
-                return;
-            }
             if (isNeedLivyQueue) {
                 //livy batch limit
                 livyTaskSubmitHelper.addTaskToWaitingQueue(jd);
@@ -210,6 +204,12 @@ public class SparkSubmitJob implements Job {
         IOException {
         // If result is null, it may livy uri is wrong
         // or livy parameter is wrong.
+        initParam(jd);
+        setLivyConf();
+        if (!success(mPredicates)) {
+            updateJobInstanceState((JobExecutionContext) jd);
+            return;
+        }
         Map<String, Object> resultMap = post2LivyWithRetry();
         String group = jd.getKey().getGroup();
         String name = jd.getKey().getName();