Posted to dev@griffin.apache.org by yangtong0912 <gi...@git.apache.org> on 2018/12/07 06:05:20 UTC

[GitHub] griffin pull request #468: Livy concurrent submission queue

GitHub user yangtong0912 opened a pull request:

    https://github.com/apache/griffin/pull/468

    Livy concurrent submission queue

    When Spark cluster resources are constrained or too many tasks run at the same time, Livy fails to submit jobs. To prevent this, add a blocking queue in front of Livy and limit the number of concurrent tasks.
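
The producer side of such a queue can be sketched as follows. This is a minimal illustration under stated assumptions, not the code from the pull request: the class name `BoundedSubmitQueue` and its methods are hypothetical. It relies on `BlockingQueue.offer`, which returns `false` when a bounded queue is full, so no separate capacity check is needed before inserting.

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical helper: a bounded waiting queue for Livy submission requests. */
final class BoundedSubmitQueue {
    private final BlockingQueue<Map<String, Object>> queue;

    BoundedSubmitQueue(int capacity) {
        // A LinkedBlockingQueue built with a capacity rejects inserts once it is full.
        this.queue = new LinkedBlockingQueue<>(capacity);
    }

    /** Returns false (instead of throwing) when the task is null or the queue is full. */
    boolean enqueue(Map<String, Object> task) {
        return task != null && queue.offer(task);
    }

    /** Number of tasks currently waiting to be submitted. */
    int pending() {
        return queue.size();
    }
}
```

Because `offer` checks capacity and inserts in a single call, two producers cannot both pass a capacity check and then overflow the queue.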

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/yangtong0912/griffin master

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/griffin/pull/468.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #468
    
----
commit 52b4c4b7ecd96892ecc27be383cf3143e0c8c091
Author: yangtong0912 <yt...@...>
Date:   2018-12-07T05:21:35Z

    For task GRIFFIN-220 Livy concurrent submission queue

commit 27de281a0e31773a7f5967c94b56b48e5f4dd2f9
Author: yangtong0912 <yt...@...>
Date:   2018-12-07T05:31:03Z

    For task GRIFFIN-220 Livy concurrent submission queue

commit cb3103ffd922f50ab7dbd1f51eb4d66e43a9b4eb
Author: yangtong0912 <yt...@...>
Date:   2018-12-07T05:35:05Z

    For task GRIFFIN-220 Livy concurrent submission queue

commit b4cc0cc61c6635423219e4beb65073dabfcb9686
Author: yangtong0912 <yt...@...>
Date:   2018-12-07T05:55:19Z

    For task GRIFFIN-220 Livy concurrent submission queue

----


---

[GitHub] griffin pull request #468: Livy concurrent submission queue

Posted by chemikadze <gi...@git.apache.org>.
Github user chemikadze commented on a diff in the pull request:

    https://github.com/apache/griffin/pull/468#discussion_r239994161
  
    --- Diff: service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java ---
    @@ -606,6 +610,10 @@ private void setJobInstanceIdAndUri(JobInstanceBean instance, HashMap<String
                 instance.setAppUri(appId == null ? null : env
                     .getProperty("yarn.uri") + "/cluster/app/" + appId);
                 instanceRepo.save(instance);
    +            // If Livy returns to success or dead, task execution completes one,TaskNum--
    +            if ("SUCCESS".equals(state) || "DEAD".equals(state)) {
    --- End diff --
    
    It feels more correct to compare against enum values rather than strings (or at least against the toString of the enum values).
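
A minimal sketch of an enum-based check, under stated assumptions: `SessionState` is a hypothetical stand-in for Griffin's `LivySessionStates.State` that lists only a few values, and `TerminalStates.isTerminal` is an illustrative helper that does not exist in the pull request. Normalizing the raw string to upper case is also an assumption about how the state arrives.

```java
import java.util.EnumSet;
import java.util.Locale;
import java.util.Set;

/** Hypothetical stand-in for LivySessionStates.State; only a few values are listed. */
enum SessionState { STARTING, RUNNING, SUCCESS, DEAD, UNKNOWN }

final class TerminalStates {
    // States after which a task no longer counts as running.
    private static final Set<SessionState> TERMINAL =
            EnumSet.of(SessionState.SUCCESS, SessionState.DEAD);

    private TerminalStates() {
    }

    /** Parses the raw state once, then compares enum constants instead of strings. */
    static boolean isTerminal(String rawState) {
        if (rawState == null) {
            return false;
        }
        try {
            SessionState state =
                    SessionState.valueOf(rawState.trim().toUpperCase(Locale.ROOT));
            return TERMINAL.contains(state);
        } catch (IllegalArgumentException e) {
            return false; // unrecognized state: treated as non-terminal in this sketch
        }
    }
}
```

Keeping the terminal states in one `EnumSet` means a typo in a state name becomes a compile error rather than a comparison that silently never matches.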


---

[GitHub] griffin pull request #468: Livy concurrent submission queue

Posted by toyboxman <gi...@git.apache.org>.
Github user toyboxman commented on a diff in the pull request:

    https://github.com/apache/griffin/pull/468#discussion_r242442619
  
    --- Diff: service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java ---
    @@ -83,14 +83,7 @@ Licensed to the Apache Software Foundation (ASF) under one
     import static org.apache.griffin.core.exception.GriffinExceptionMessage.MEASURE_TYPE_DOES_NOT_SUPPORT;
     import static org.apache.griffin.core.exception.GriffinExceptionMessage.NO_SUCH_JOB_ACTION;
     import static org.apache.griffin.core.exception.GriffinExceptionMessage.QUARTZ_JOB_ALREADY_EXIST;
    -import static org.apache.griffin.core.job.entity.LivySessionStates.State.BUSY;
    -import static org.apache.griffin.core.job.entity.LivySessionStates.State.DEAD;
    -import static org.apache.griffin.core.job.entity.LivySessionStates.State.IDLE;
    -import static org.apache.griffin.core.job.entity.LivySessionStates.State.NOT_STARTED;
    -import static org.apache.griffin.core.job.entity.LivySessionStates.State.RECOVERING;
    -import static org.apache.griffin.core.job.entity.LivySessionStates.State.RUNNING;
    -import static org.apache.griffin.core.job.entity.LivySessionStates.State.STARTING;
    -import static org.apache.griffin.core.job.entity.LivySessionStates.State.UNKNOWN;
    +import static org.apache.griffin.core.job.entity.LivySessionStates.State.*;
    --- End diff --
    
    Please follow https://github.com/apache/griffin/blob/master/griffin-doc/dev/code-style.md
    
    Remove the * from the import statement.


---

[GitHub] griffin pull request #468: Livy concurrent submission queue

Posted by chemikadze <gi...@git.apache.org>.
Github user chemikadze commented on a diff in the pull request:

    https://github.com/apache/griffin/pull/468#discussion_r239993455
  
    --- Diff: service/pom.xml ---
    @@ -124,6 +124,12 @@ under the License.
                 <artifactId>jackson-databind</artifactId>
                 <version>2.6.3</version>
             </dependency>
    +
    +        <dependency>
    +            <groupId>com.alibaba</groupId>
    --- End diff --
    
    Do we really need this dependency just for logging?


---

[GitHub] griffin pull request #468: Livy concurrent submission queue

Posted by ahutsunshine <gi...@git.apache.org>.
Github user ahutsunshine commented on a diff in the pull request:

    https://github.com/apache/griffin/pull/468#discussion_r239800609
  
    --- Diff: service/src/main/java/org/apache/griffin/core/job/SparkSubmitJob.java ---
    @@ -98,7 +100,12 @@ public void execute(JobExecutionContext context) {
                     updateJobInstanceState(context);
                     return;
                 }
    -            saveJobInstance(jd);
    +            if(livyTaskSubmitHelper.isNeedLivyQueue()){
    +                //livy batch limit
    +                livyTaskSubmitHelper.addTaskToWaitingQueue(livyConfMap);
    +            }else{
    +                saveJobInstance(jd);
    +            }
    --- End diff --
    
    Since you modify the job execution process, you need to update the relevant tests in the SparkSubmitJobTest class so that the Travis build passes.


---

[GitHub] griffin pull request #468: Livy concurrent submission queue

Posted by chemikadze <gi...@git.apache.org>.
Github user chemikadze commented on a diff in the pull request:

    https://github.com/apache/griffin/pull/468#discussion_r239993948
  
    --- Diff: service/src/main/java/org/apache/griffin/core/job/LivyTaskSubmitHelper.java ---
    @@ -0,0 +1,238 @@
    +package org.apache.griffin.core.job;
    +
    +import com.alibaba.fastjson.JSON;
    +import com.google.gson.Gson;
    +import org.apache.commons.lang.StringUtils;
    +import org.apache.griffin.core.job.entity.JobInstanceBean;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import org.springframework.beans.factory.annotation.Autowired;
    +import org.springframework.core.env.Environment;
    +import org.springframework.http.HttpEntity;
    +import org.springframework.http.HttpHeaders;
    +import org.springframework.http.MediaType;
    +import org.springframework.stereotype.Component;
    +import org.springframework.web.client.RestTemplate;
    +
    +import javax.annotation.PostConstruct;
    +import java.util.Map;
    +import java.util.concurrent.BlockingQueue;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.ConcurrentMap;
    +import java.util.concurrent.LinkedBlockingQueue;
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +import static org.apache.griffin.core.job.entity.LivySessionStates.State.FOUND;
    +import static org.apache.griffin.core.util.JsonUtil.toJsonWithFormat;
    +
    +@Component
    +public class LivyTaskSubmitHelper {
    +
    +    private static final Logger logger = LoggerFactory
    +            .getLogger(LivyTaskSubmitHelper.class);
    +
    +    private static final String REQUEST_BY_HEADER = "X-Requested-By";
    +    private JobInstanceBean jobInstance;
    +    private SparkSubmitJob sparkSubmitJob;
    +    private ConcurrentMap<Long, Integer> taskAppIdMap = new ConcurrentHashMap<>();
    +    // Current number of tasks
    +    private AtomicInteger curConcurrentTaskNum = new AtomicInteger(0);
    +    private String workerNamePre;
    +    private RestTemplate restTemplate = new RestTemplate();
    +    // queue for pub or sub
    +    private BlockingQueue<Map<String, Object>> queue;
    +    public static final int DEFAULT_QUEUE_SIZE = 20000;
    +    private String uri;
    +
    +    @Autowired
    +    private Environment env;
    +
    +    @PostConstruct
    +    public void init() {
    +        startWorker();
    +        uri = env.getProperty("livy.uri");
    +        logger.info("Livy uri : {}",uri);
    +    }
    +
    +    public LivyTaskSubmitHelper() {
    +        this.workerNamePre = "livy-task-submit-worker";
    +    }
    +
    +    public void startWorker() {
    +        queue = new LinkedBlockingQueue<>(DEFAULT_QUEUE_SIZE);
    +        BatchTaskWorker worker = new BatchTaskWorker();
    +        worker.setDaemon(true);
    +        worker.setName(workerNamePre + "-" + worker.getName());
    +        worker.start();
    +    }
    +
    +    public void addTaskToWaitingQueue(Map<String, Object> t) {
    +        if (t == null) {
    +            logger.warn("task is blank, workerNamePre: {}", workerNamePre);
    +            return;
    +        }
    +
    +        if (queue.remainingCapacity() <= 0) {
    +            logger.warn("task is discard, workerNamePre: {}, task: {}", workerNamePre, JSON.toJSON(t));
    +            return;
    +        }
    +
    +        queue.add(t);
    +
    +        logger.info("add_task_to_waiting_queue_success, workerNamePre: {}, task: {}", workerNamePre, JSON.toJSON(t));
    +    }
    +
    +    /**
    +     * Consumer thread
    +     */
    +    class BatchTaskWorker extends Thread {
    +        public void run() {
    +            long insertTime = System.currentTimeMillis();
    +
    +            // Keep sequential execution within a limited number of tasks
    +            while (true) {
    +                try {
    +                    if (curConcurrentTaskNum.get() < getMaxConcurrentTaskCount()
    +                            && (System.currentTimeMillis() - insertTime) >= getBatchIntervalSecond() * 1000) {
    +                        Map<String, Object> task = queue.take();
    +                        doTask(task);
    +                        insertTime = System.currentTimeMillis();
    +                    }
    +                } catch (Throwable e) {
    +                    logger.error("Async_worker_doTask_failed, {}", workerNamePre + e.getMessage(), e);
    +                }
    +            }
    +        }
    +    }
    +
    +    public void increaseCurTaskNum(Long scheduleId) {
    +        curConcurrentTaskNum.incrementAndGet();
    +        if (scheduleId != null) taskAppIdMap.put(scheduleId, 1);
    +    }
    +
    +    //Remove tasks after job status updates
    +    public void decreaseCurTaskNum(Long scheduleId) {
    +        if (scheduleId != null && taskAppIdMap.containsKey(scheduleId)) {
    +            curConcurrentTaskNum.decrementAndGet();
    +            taskAppIdMap.remove(scheduleId);
    +        }
    +    }
    +
    +    /**
    +     * Submit a task to Livy and concurrent TaskNum++
    +     */
    +    protected void doTask(Map<String, Object> livyConfMap) {
    +        try {
    +            HttpHeaders headers = new HttpHeaders();
    +            headers.setContentType(MediaType.APPLICATION_JSON);
    +            headers.set(REQUEST_BY_HEADER, "admin");
    +
    +            HttpEntity<String> springEntity = new HttpEntity<String>(toJsonWithFormat(livyConfMap), headers);
    +
    +            String result = restTemplate.postForObject(uri, springEntity, String.class);
    +            logger.info("submit livy result: {}", result);
    +
    +
    +            Gson gson = new Gson();
    +            try {
    +                jobInstance = gson.fromJson(result, JobInstanceBean.class);
    +
    +                // The first time didn't get appId, try again several times
    +                if (StringUtils.isBlank(jobInstance.getAppId())) {
    +                    jobInstance = retryLivyGetAppId(jobInstance);
    +                }
    +
    +                logger.info("submit livy scheduleResult: {}", jobInstance);
    +            } catch (Exception e) {
    +                logger.error("submit livy scheduleResult covert error!", e);
    +            }
    +
    +            if (jobInstance != null) {
    +                //save result info into DataBase
    +                sparkSubmitJob.saveJobInstance(result, FOUND);
    --- End diff --
    
    I don't see `sparkSubmitJob` getting set anywhere; where is it coming from?


---

[GitHub] griffin pull request #468: Livy concurrent submission queue

Posted by yangtong0912 <gi...@git.apache.org>.
Github user yangtong0912 commented on a diff in the pull request:

    https://github.com/apache/griffin/pull/468#discussion_r239994489
  
    --- Diff: service/src/main/java/org/apache/griffin/core/job/LivyTaskSubmitHelper.java ---
    @@ -0,0 +1,238 @@
    +package org.apache.griffin.core.job;
    +
    +import com.alibaba.fastjson.JSON;
    +import com.google.gson.Gson;
    +import org.apache.commons.lang.StringUtils;
    +import org.apache.griffin.core.job.entity.JobInstanceBean;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import org.springframework.beans.factory.annotation.Autowired;
    +import org.springframework.core.env.Environment;
    +import org.springframework.http.HttpEntity;
    +import org.springframework.http.HttpHeaders;
    +import org.springframework.http.MediaType;
    +import org.springframework.stereotype.Component;
    +import org.springframework.web.client.RestTemplate;
    +
    +import javax.annotation.PostConstruct;
    +import java.util.Map;
    +import java.util.concurrent.BlockingQueue;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.ConcurrentMap;
    +import java.util.concurrent.LinkedBlockingQueue;
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +import static org.apache.griffin.core.job.entity.LivySessionStates.State.FOUND;
    +import static org.apache.griffin.core.util.JsonUtil.toJsonWithFormat;
    +
    +@Component
    +public class LivyTaskSubmitHelper {
    +
    +    private static final Logger logger = LoggerFactory
    +            .getLogger(LivyTaskSubmitHelper.class);
    +
    +    private static final String REQUEST_BY_HEADER = "X-Requested-By";
    +    private JobInstanceBean jobInstance;
    +    private SparkSubmitJob sparkSubmitJob;
    +    private ConcurrentMap<Long, Integer> taskAppIdMap = new ConcurrentHashMap<>();
    +    // Current number of tasks
    +    private AtomicInteger curConcurrentTaskNum = new AtomicInteger(0);
    +    private String workerNamePre;
    +    private RestTemplate restTemplate = new RestTemplate();
    +    // queue for pub or sub
    +    private BlockingQueue<Map<String, Object>> queue;
    +    public static final int DEFAULT_QUEUE_SIZE = 20000;
    +    private String uri;
    +
    +    @Autowired
    +    private Environment env;
    +
    +    @PostConstruct
    +    public void init() {
    +        startWorker();
    +        uri = env.getProperty("livy.uri");
    +        logger.info("Livy uri : {}",uri);
    +    }
    +
    +    public LivyTaskSubmitHelper() {
    +        this.workerNamePre = "livy-task-submit-worker";
    +    }
    +
    +    public void startWorker() {
    +        queue = new LinkedBlockingQueue<>(DEFAULT_QUEUE_SIZE);
    +        BatchTaskWorker worker = new BatchTaskWorker();
    +        worker.setDaemon(true);
    +        worker.setName(workerNamePre + "-" + worker.getName());
    +        worker.start();
    +    }
    +
    +    public void addTaskToWaitingQueue(Map<String, Object> t) {
    +        if (t == null) {
    +            logger.warn("task is blank, workerNamePre: {}", workerNamePre);
    +            return;
    +        }
    +
    +        if (queue.remainingCapacity() <= 0) {
    +            logger.warn("task is discard, workerNamePre: {}, task: {}", workerNamePre, JSON.toJSON(t));
    +            return;
    +        }
    +
    +        queue.add(t);
    +
    +        logger.info("add_task_to_waiting_queue_success, workerNamePre: {}, task: {}", workerNamePre, JSON.toJSON(t));
    +    }
    +
    +    /**
    +     * Consumer thread
    +     */
    +    class BatchTaskWorker extends Thread {
    +        public void run() {
    +            long insertTime = System.currentTimeMillis();
    +
    +            // Keep sequential execution within a limited number of tasks
    +            while (true) {
    +                try {
    +                    if (curConcurrentTaskNum.get() < getMaxConcurrentTaskCount()
    +                            && (System.currentTimeMillis() - insertTime) >= getBatchIntervalSecond() * 1000) {
    +                        Map<String, Object> task = queue.take();
    +                        doTask(task);
    +                        insertTime = System.currentTimeMillis();
    +                    }
    +                } catch (Throwable e) {
    +                    logger.error("Async_worker_doTask_failed, {}", workerNamePre + e.getMessage(), e);
    +                }
    +            }
    +        }
    +    }
    +
    +    public void increaseCurTaskNum(Long scheduleId) {
    +        curConcurrentTaskNum.incrementAndGet();
    +        if (scheduleId != null) taskAppIdMap.put(scheduleId, 1);
    +    }
    +
    +    //Remove tasks after job status updates
    +    public void decreaseCurTaskNum(Long scheduleId) {
    +        if (scheduleId != null && taskAppIdMap.containsKey(scheduleId)) {
    +            curConcurrentTaskNum.decrementAndGet();
    +            taskAppIdMap.remove(scheduleId);
    +        }
    +    }
    +
    +    /**
    +     * Submit a task to Livy and concurrent TaskNum++
    +     */
    +    protected void doTask(Map<String, Object> livyConfMap) {
    +        try {
    +            HttpHeaders headers = new HttpHeaders();
    +            headers.setContentType(MediaType.APPLICATION_JSON);
    +            headers.set(REQUEST_BY_HEADER, "admin");
    +
    +            HttpEntity<String> springEntity = new HttpEntity<String>(toJsonWithFormat(livyConfMap), headers);
    +
    +            String result = restTemplate.postForObject(uri, springEntity, String.class);
    +            logger.info("submit livy result: {}", result);
    +
    +
    +            Gson gson = new Gson();
    +            try {
    +                jobInstance = gson.fromJson(result, JobInstanceBean.class);
    +
    +                // The first time didn't get appId, try again several times
    +                if (StringUtils.isBlank(jobInstance.getAppId())) {
    +                    jobInstance = retryLivyGetAppId(jobInstance);
    +                }
    +
    +                logger.info("submit livy scheduleResult: {}", jobInstance);
    +            } catch (Exception e) {
    +                logger.error("submit livy scheduleResult covert error!", e);
    +            }
    +
    +            if (jobInstance != null) {
    +                //save result info into DataBase
    +                sparkSubmitJob.saveJobInstance(result, FOUND);
    +
    +                // Successful submission of a task
    +                increaseCurTaskNum(jobInstance.getId());
    +            }
    +        } catch (Exception e) {
    +            logger.error("submit task to livy error.", e);
    +        }
    +    }
    +
    +    private JobInstanceBean retryLivyGetAppId(JobInstanceBean jobInstance) {
    +
    +        int retryCount = getAppIdRetryCount();
    +
    +        if (retryCount <= 0) {
    +            return jobInstance;
    +        }
    +
    +        Long livyBatchesId = jobInstance.getId();
    +        if (livyBatchesId == null) {
    +            return jobInstance;
    +        }
    +
    +        while (retryCount-- > 0) {
    +            try {
    +                Thread.sleep(300);
    +            } catch (InterruptedException e) {
    +                logger.error(e.getMessage(), e);
    +            }
    +            String result = restTemplate.getForObject(uri + "/" + livyBatchesId, String.class);
    +            logger.info("retry get livy result: {}, batches id : {}", result, livyBatchesId);
    +
    +            Gson gson = new Gson();
    +            JobInstanceBean newJobInstance = gson.fromJson(result, JobInstanceBean.class);
    +            if (StringUtils.isNotBlank(newJobInstance.getAppId())) {
    +                return newJobInstance;
    +            }
    +        }
    +
    +        return jobInstance;
    +    }
    +
    +    // Maximum number of parallel tasks
    +    protected int getMaxConcurrentTaskCount() {
    --- End diff --
    
    > Can't these getters be a `@Value`?
    
    Sure, that's right. I'll give it a try.


---

[GitHub] griffin pull request #468: Livy concurrent submission queue

Posted by yangtong0912 <gi...@git.apache.org>.
Github user yangtong0912 commented on a diff in the pull request:

    https://github.com/apache/griffin/pull/468#discussion_r243890050
  
    --- Diff: service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java ---
    @@ -83,14 +83,7 @@ Licensed to the Apache Software Foundation (ASF) under one
     import static org.apache.griffin.core.exception.GriffinExceptionMessage.MEASURE_TYPE_DOES_NOT_SUPPORT;
     import static org.apache.griffin.core.exception.GriffinExceptionMessage.NO_SUCH_JOB_ACTION;
     import static org.apache.griffin.core.exception.GriffinExceptionMessage.QUARTZ_JOB_ALREADY_EXIST;
    -import static org.apache.griffin.core.job.entity.LivySessionStates.State.BUSY;
    -import static org.apache.griffin.core.job.entity.LivySessionStates.State.DEAD;
    -import static org.apache.griffin.core.job.entity.LivySessionStates.State.IDLE;
    -import static org.apache.griffin.core.job.entity.LivySessionStates.State.NOT_STARTED;
    -import static org.apache.griffin.core.job.entity.LivySessionStates.State.RECOVERING;
    -import static org.apache.griffin.core.job.entity.LivySessionStates.State.RUNNING;
    -import static org.apache.griffin.core.job.entity.LivySessionStates.State.STARTING;
    -import static org.apache.griffin.core.job.entity.LivySessionStates.State.UNKNOWN;
    +import static org.apache.griffin.core.job.entity.LivySessionStates.State.*;
    --- End diff --
    
    Okay, I've checked the code. Thanks.


---

[GitHub] griffin pull request #468: Livy concurrent submission queue

Posted by chemikadze <gi...@git.apache.org>.
Github user chemikadze commented on a diff in the pull request:

    https://github.com/apache/griffin/pull/468#discussion_r239993879
  
    --- Diff: service/src/main/java/org/apache/griffin/core/job/LivyTaskSubmitHelper.java ---
    @@ -0,0 +1,238 @@
    +package org.apache.griffin.core.job;
    +
    +import com.alibaba.fastjson.JSON;
    +import com.google.gson.Gson;
    +import org.apache.commons.lang.StringUtils;
    +import org.apache.griffin.core.job.entity.JobInstanceBean;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import org.springframework.beans.factory.annotation.Autowired;
    +import org.springframework.core.env.Environment;
    +import org.springframework.http.HttpEntity;
    +import org.springframework.http.HttpHeaders;
    +import org.springframework.http.MediaType;
    +import org.springframework.stereotype.Component;
    +import org.springframework.web.client.RestTemplate;
    +
    +import javax.annotation.PostConstruct;
    +import java.util.Map;
    +import java.util.concurrent.BlockingQueue;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.ConcurrentMap;
    +import java.util.concurrent.LinkedBlockingQueue;
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +import static org.apache.griffin.core.job.entity.LivySessionStates.State.FOUND;
    +import static org.apache.griffin.core.util.JsonUtil.toJsonWithFormat;
    +
    +@Component
    +public class LivyTaskSubmitHelper {
    +
    +    private static final Logger logger = LoggerFactory
    +            .getLogger(LivyTaskSubmitHelper.class);
    +
    +    private static final String REQUEST_BY_HEADER = "X-Requested-By";
    +    private JobInstanceBean jobInstance;
    +    private SparkSubmitJob sparkSubmitJob;
    +    private ConcurrentMap<Long, Integer> taskAppIdMap = new ConcurrentHashMap<>();
    +    // Current number of tasks
    +    private AtomicInteger curConcurrentTaskNum = new AtomicInteger(0);
    +    private String workerNamePre;
    +    private RestTemplate restTemplate = new RestTemplate();
    +    // queue for pub or sub
    +    private BlockingQueue<Map<String, Object>> queue;
    +    public static final int DEFAULT_QUEUE_SIZE = 20000;
    +    private String uri;
    +
    +    @Autowired
    +    private Environment env;
    +
    +    @PostConstruct
    +    public void init() {
    +        startWorker();
    +        uri = env.getProperty("livy.uri");
    +        logger.info("Livy uri : {}",uri);
    +    }
    +
    +    public LivyTaskSubmitHelper() {
    +        this.workerNamePre = "livy-task-submit-worker";
    +    }
    +
    +    public void startWorker() {
    +        queue = new LinkedBlockingQueue<>(DEFAULT_QUEUE_SIZE);
    +        BatchTaskWorker worker = new BatchTaskWorker();
    +        worker.setDaemon(true);
    +        worker.setName(workerNamePre + "-" + worker.getName());
    +        worker.start();
    +    }
    +
    +    public void addTaskToWaitingQueue(Map<String, Object> t) {
    +        if (t == null) {
    +            logger.warn("task is blank, workerNamePre: {}", workerNamePre);
    +            return;
    +        }
    +
    +        if (queue.remainingCapacity() <= 0) {
    +            logger.warn("task is discard, workerNamePre: {}, task: {}", workerNamePre, JSON.toJSON(t));
    +            return;
    +        }
    +
    +        queue.add(t);
    +
    +        logger.info("add_task_to_waiting_queue_success, workerNamePre: {}, task: {}", workerNamePre, JSON.toJSON(t));
    +    }
    +
    +    /**
    +     * Consumer thread
    +     */
    +    class BatchTaskWorker extends Thread {
    +        public void run() {
    +            long insertTime = System.currentTimeMillis();
    +
    +            // Keep sequential execution within a limited number of tasks
    +            while (true) {
    +                try {
    +                    if (curConcurrentTaskNum.get() < getMaxConcurrentTaskCount()
    +                            && (System.currentTimeMillis() - insertTime) >= getBatchIntervalSecond() * 1000) {
    +                        Map<String, Object> task = queue.take();
    +                        doTask(task);
    +                        insertTime = System.currentTimeMillis();
    +                    }
    +                } catch (Throwable e) {
    +                    logger.error("Async_worker_doTask_failed, {}", workerNamePre + e.getMessage(), e);
    +                }
    +            }
    +        }
    +    }
    +
    +    public void increaseCurTaskNum(Long scheduleId) {
    +        curConcurrentTaskNum.incrementAndGet();
    +        if (scheduleId != null) taskAppIdMap.put(scheduleId, 1);
    +    }
    +
    +    //Remove tasks after job status updates
    +    public void decreaseCurTaskNum(Long scheduleId) {
    +        if (scheduleId != null && taskAppIdMap.containsKey(scheduleId)) {
    +            curConcurrentTaskNum.decrementAndGet();
    +            taskAppIdMap.remove(scheduleId);
    +        }
    +    }
    +
    +    /**
    +     * Submit a task to Livy and concurrent TaskNum++
    +     */
    +    protected void doTask(Map<String, Object> livyConfMap) {
    --- End diff --
    
    This looks like it duplicates "post2Livy"; it is probably better to merge the two.


---

[GitHub] griffin pull request #468: Livy concurrent submission queue

Posted by chemikadze <gi...@git.apache.org>.
Github user chemikadze commented on a diff in the pull request:

    https://github.com/apache/griffin/pull/468#discussion_r239993856
  
    --- Diff: service/src/main/java/org/apache/griffin/core/job/LivyTaskSubmitHelper.java ---
    @@ -0,0 +1,238 @@
    +package org.apache.griffin.core.job;
    +
    +import com.alibaba.fastjson.JSON;
    +import com.google.gson.Gson;
    +import org.apache.commons.lang.StringUtils;
    +import org.apache.griffin.core.job.entity.JobInstanceBean;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import org.springframework.beans.factory.annotation.Autowired;
    +import org.springframework.core.env.Environment;
    +import org.springframework.http.HttpEntity;
    +import org.springframework.http.HttpHeaders;
    +import org.springframework.http.MediaType;
    +import org.springframework.stereotype.Component;
    +import org.springframework.web.client.RestTemplate;
    +
    +import javax.annotation.PostConstruct;
    +import java.util.Map;
    +import java.util.concurrent.BlockingQueue;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.ConcurrentMap;
    +import java.util.concurrent.LinkedBlockingQueue;
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +import static org.apache.griffin.core.job.entity.LivySessionStates.State.FOUND;
    +import static org.apache.griffin.core.util.JsonUtil.toJsonWithFormat;
    +
    +@Component
    +public class LivyTaskSubmitHelper {
    +
    +    private static final Logger logger = LoggerFactory
    +            .getLogger(LivyTaskSubmitHelper.class);
    +
    +    private static final String REQUEST_BY_HEADER = "X-Requested-By";
    +    private JobInstanceBean jobInstance;
    +    private SparkSubmitJob sparkSubmitJob;
    +    private ConcurrentMap<Long, Integer> taskAppIdMap = new ConcurrentHashMap<>();
    +    // Current number of tasks
    +    private AtomicInteger curConcurrentTaskNum = new AtomicInteger(0);
    +    private String workerNamePre;
    +    private RestTemplate restTemplate = new RestTemplate();
    +    // queue for pub or sub
    +    private BlockingQueue<Map<String, Object>> queue;
    +    public static final int DEFAULT_QUEUE_SIZE = 20000;
    +    private String uri;
    +
    +    @Autowired
    +    private Environment env;
    +
    +    @PostConstruct
    +    public void init() {
    +        startWorker();
    +        uri = env.getProperty("livy.uri");
    +        logger.info("Livy uri : {}",uri);
    +    }
    +
    +    public LivyTaskSubmitHelper() {
    +        this.workerNamePre = "livy-task-submit-worker";
    +    }
    +
    +    public void startWorker() {
    +        queue = new LinkedBlockingQueue<>(DEFAULT_QUEUE_SIZE);
    +        BatchTaskWorker worker = new BatchTaskWorker();
    +        worker.setDaemon(true);
    +        worker.setName(workerNamePre + "-" + worker.getName());
    +        worker.start();
    +    }
    +
    +    public void addTaskToWaitingQueue(Map<String, Object> t) {
    +        if (t == null) {
    +            logger.warn("task is blank, workerNamePre: {}", workerNamePre);
    +            return;
    +        }
    +
    +        if (queue.remainingCapacity() <= 0) {
    +            logger.warn("task is discard, workerNamePre: {}, task: {}", workerNamePre, JSON.toJSON(t));
    +            return;
    +        }
    +
    +        queue.add(t);
    +
    +        logger.info("add_task_to_waiting_queue_success, workerNamePre: {}, task: {}", workerNamePre, JSON.toJSON(t));
    +    }
    +
    +    /**
    +     * Consumer thread
    +     */
    +    class BatchTaskWorker extends Thread {
    +        public void run() {
    +            long insertTime = System.currentTimeMillis();
    +
    +            // Keep sequential execution within a limited number of tasks
    +            while (true) {
    +                try {
    +                    if (curConcurrentTaskNum.get() < getMaxConcurrentTaskCount()
    --- End diff --
    
    Do I understand correctly that if `curConcurrentTaskNum.get() == getMaxConcurrentTaskCount()`, or `(System.currentTimeMillis() - insertTime) < getBatchIntervalSecond() * 1000`, this loop will spin without any pause? Given that `curConcurrentTaskNum` decreases only when the actual Livy job ends, it will occupy a whole core for long periods of time.
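
One way to avoid the spin is to let the worker park on blocking primitives rather than re-checking a counter. The following is only a minimal sketch under assumptions, not the code from this pull request: the class name `BlockingSubmitWorker`, the `jobFinished()` callback and the `Consumer`-based submitter are hypothetical stand-ins for `BatchTaskWorker`, `decreaseCurTaskNum` and `doTask`.

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.function.Consumer;

/** Hypothetical sketch: a submit worker that blocks instead of spinning. */
class BlockingSubmitWorker extends Thread {
    private final BlockingQueue<Map<String, Object>> queue;
    private final Semaphore slots;        // one permit per allowed concurrent job
    private final long intervalMillis;    // minimum pause after each submission
    private final Consumer<Map<String, Object>> submitter; // stand-in for doTask

    BlockingSubmitWorker(BlockingQueue<Map<String, Object>> queue, int maxConcurrent,
                         long intervalMillis, Consumer<Map<String, Object>> submitter) {
        this.queue = queue;
        this.slots = new Semaphore(maxConcurrent);
        this.intervalMillis = intervalMillis;
        this.submitter = submitter;
        setDaemon(true);
    }

    /** Assumed to be called when a job ends, in the role of decreaseCurTaskNum. */
    void jobFinished() {
        slots.release();
    }

    @Override
    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                slots.acquire();                         // parks until a slot is free
                Map<String, Object> task = queue.take(); // parks until a task arrives
                try {
                    submitter.accept(task);
                } catch (RuntimeException e) {
                    slots.release(); // submission failed, so the slot is not in use
                }
                Thread.sleep(intervalMillis);            // pause between submissions
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();          // exit quietly on shutdown
        }
    }
}
```

With this shape the thread is parked in `acquire()`, `take()` or `sleep()` whenever it has nothing to do, so it should not consume CPU while the limit is reached. Whether a semaphore fits the existing `taskAppIdMap` bookkeeping is a separate design question.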


---

[GitHub] griffin pull request #468: Livy concurrent submission queue

Posted by chemikadze <gi...@git.apache.org>.
Github user chemikadze commented on a diff in the pull request:

    https://github.com/apache/griffin/pull/468#discussion_r239993250
  
    --- Diff: service/src/main/java/org/apache/griffin/core/job/LivyTaskSubmitHelper.java ---
    @@ -0,0 +1,238 @@
    +package org.apache.griffin.core.job;
    +
    +import com.alibaba.fastjson.JSON;
    +import com.google.gson.Gson;
    +import org.apache.commons.lang.StringUtils;
    +import org.apache.griffin.core.job.entity.JobInstanceBean;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import org.springframework.beans.factory.annotation.Autowired;
    +import org.springframework.core.env.Environment;
    +import org.springframework.http.HttpEntity;
    +import org.springframework.http.HttpHeaders;
    +import org.springframework.http.MediaType;
    +import org.springframework.stereotype.Component;
    +import org.springframework.web.client.RestTemplate;
    +
    +import javax.annotation.PostConstruct;
    +import java.util.Map;
    +import java.util.concurrent.BlockingQueue;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.ConcurrentMap;
    +import java.util.concurrent.LinkedBlockingQueue;
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +import static org.apache.griffin.core.job.entity.LivySessionStates.State.FOUND;
    +import static org.apache.griffin.core.util.JsonUtil.toJsonWithFormat;
    +
    +@Component
    +public class LivyTaskSubmitHelper {
    +
    +    private static final Logger logger = LoggerFactory
    +            .getLogger(LivyTaskSubmitHelper.class);
    +
    +    private static final String REQUEST_BY_HEADER = "X-Requested-By";
    +    private JobInstanceBean jobInstance;
    +    private SparkSubmitJob sparkSubmitJob;
    +    private ConcurrentMap<Long, Integer> taskAppIdMap = new ConcurrentHashMap<>();
    +    // Current number of tasks
    +    private AtomicInteger curConcurrentTaskNum = new AtomicInteger(0);
    +    private String workerNamePre;
    +    private RestTemplate restTemplate = new RestTemplate();
    +    // queue for pub or sub
    +    private BlockingQueue<Map<String, Object>> queue;
    +    public static final int DEFAULT_QUEUE_SIZE = 20000;
    +    private String uri;
    +
    +    @Autowired
    +    private Environment env;
    +
    +    @PostConstruct
    +    public void init() {
    +        startWorker();
    +        uri = env.getProperty("livy.uri");
    +        logger.info("Livy uri : {}",uri);
    +    }
    +
    +    public LivyTaskSubmitHelper() {
    +        this.workerNamePre = "livy-task-submit-worker";
    +    }
    +
    +    public void startWorker() {
    +        queue = new LinkedBlockingQueue<>(DEFAULT_QUEUE_SIZE);
    +        BatchTaskWorker worker = new BatchTaskWorker();
    +        worker.setDaemon(true);
    +        worker.setName(workerNamePre + "-" + worker.getName());
    +        worker.start();
    +    }
    +
    +    public void addTaskToWaitingQueue(Map<String, Object> t) {
    +        if (t == null) {
    +            logger.warn("task is blank, workerNamePre: {}", workerNamePre);
    +            return;
    +        }
    +
    +        if (queue.remainingCapacity() <= 0) {
    +            logger.warn("task is discard, workerNamePre: {}, task: {}", workerNamePre, JSON.toJSON(t));
    +            return;
    +        }
    +
    +        queue.add(t);
    +
    +        logger.info("add_task_to_waiting_queue_success, workerNamePre: {}, task: {}", workerNamePre, JSON.toJSON(t));
    +    }
    +
    +    /**
    +     * Consumer thread
    +     */
    +    class BatchTaskWorker extends Thread {
    +        public void run() {
    +            long insertTime = System.currentTimeMillis();
    +
    +            // Keep sequential execution within a limited number of tasks
    +            while (true) {
    +                try {
    +                    if (curConcurrentTaskNum.get() < getMaxConcurrentTaskCount()
    +                            && (System.currentTimeMillis() - insertTime) >= getBatchIntervalSecond() * 1000) {
    +                        Map<String, Object> task = queue.take();
    +                        doTask(task);
    +                        insertTime = System.currentTimeMillis();
    +                    }
    +                } catch (Throwable e) {
    +                    logger.error("Async_worker_doTask_failed, {}", workerNamePre + e.getMessage(), e);
    +                }
    +            }
    +        }
    +    }
    +
    +    public void increaseCurTaskNum(Long scheduleId) {
    +        curConcurrentTaskNum.incrementAndGet();
    +        if (scheduleId != null) taskAppIdMap.put(scheduleId, 1);
    +    }
    +
    +    //Remove tasks after job status updates
    +    public void decreaseCurTaskNum(Long scheduleId) {
    +        if (scheduleId != null && taskAppIdMap.containsKey(scheduleId)) {
    +            curConcurrentTaskNum.decrementAndGet();
    +            taskAppIdMap.remove(scheduleId);
    +        }
    +    }
    +
    +    /**
    +     * Submit a task to Livy and concurrent TaskNum++
    +     */
    +    protected void doTask(Map<String, Object> livyConfMap) {
    +        try {
    +            HttpHeaders headers = new HttpHeaders();
    +            headers.setContentType(MediaType.APPLICATION_JSON);
    +            headers.set(REQUEST_BY_HEADER, "admin");
    +
    +            HttpEntity<String> springEntity = new HttpEntity<String>(toJsonWithFormat(livyConfMap), headers);
    +
    +            String result = restTemplate.postForObject(uri, springEntity, String.class);
    +            logger.info("submit livy result: {}", result);
    +
    +
    +            Gson gson = new Gson();
    +            try {
    +                jobInstance = gson.fromJson(result, JobInstanceBean.class);
    +
    +                // The first time didn't get appId, try again several times
    +                if (StringUtils.isBlank(jobInstance.getAppId())) {
    +                    jobInstance = retryLivyGetAppId(jobInstance);
    +                }
    +
    +                logger.info("submit livy scheduleResult: {}", jobInstance);
    +            } catch (Exception e) {
    +                logger.error("submit livy scheduleResult covert error!", e);
    +            }
    +
    +            if (jobInstance != null) {
    +                //save result info into DataBase
    +                sparkSubmitJob.saveJobInstance(result, FOUND);
    +
    +                // Successful submission of a task
    +                increaseCurTaskNum(jobInstance.getId());
    +            }
    +        } catch (Exception e) {
    +            logger.error("submit task to livy error.", e);
    +        }
    +    }
    +
    +    private JobInstanceBean retryLivyGetAppId(JobInstanceBean jobInstance) {
    +
    +        int retryCount = getAppIdRetryCount();
    +
    +        if (retryCount <= 0) {
    +            return jobInstance;
    +        }
    +
    +        Long livyBatchesId = jobInstance.getId();
    +        if (livyBatchesId == null) {
    +            return jobInstance;
    +        }
    +
    +        while (retryCount-- > 0) {
    +            try {
    +                Thread.sleep(300);
    +            } catch (InterruptedException e) {
    +                logger.error(e.getMessage(), e);
    +            }
    +            String result = restTemplate.getForObject(uri + "/" + livyBatchesId, String.class);
    +            logger.info("retry get livy result: {}, batches id : {}", result, livyBatchesId);
    +
    +            Gson gson = new Gson();
    +            JobInstanceBean newJobInstance = gson.fromJson(result, JobInstanceBean.class);
    +            if (StringUtils.isNotBlank(newJobInstance.getAppId())) {
    +                return newJobInstance;
    +            }
    +        }
    +
    +        return jobInstance;
    +    }
    +
    +    // Maximum number of parallel tasks
    +    protected int getMaxConcurrentTaskCount() {
    --- End diff --
    
    Couldn't these getters be replaced by `@Value`-injected fields?


---

[GitHub] griffin issue #468: Livy concurrent submission queue

Posted by yangtong0912 <gi...@git.apache.org>.
Github user yangtong0912 commented on the issue:

    https://github.com/apache/griffin/pull/468
  
    Now there are other failures: Hive connection timeouts. I haven't changed the related code or configuration files. Could you please take a look? Does something else need to be configured?


---

[GitHub] griffin pull request #468: Livy concurrent submission queue

Posted by yangtong0912 <gi...@git.apache.org>.
Github user yangtong0912 commented on a diff in the pull request:

    https://github.com/apache/griffin/pull/468#discussion_r239994469
  
    --- Diff: service/src/main/java/org/apache/griffin/core/job/SparkSubmitJob.java ---
    @@ -98,7 +100,12 @@ public void execute(JobExecutionContext context) {
                     updateJobInstanceState(context);
                     return;
                 }
    -            saveJobInstance(jd);
    +            if(livyTaskSubmitHelper.isNeedLivyQueue()){
    +                //livy batch limit
    +                livyTaskSubmitHelper.addTaskToWaitingQueue(livyConfMap);
    +            }else{
    +                saveJobInstance(jd);
    +            }
    --- End diff --
    
    > As you modify the job execution process, you need to update relevant tests in SparkSubmitJobTest class to make travis build pass.
    
    Thanks. I added a mock bean in the test, and the build now succeeds.


---

[GitHub] griffin pull request #468: Livy concurrent submission queue

Posted by chemikadze <gi...@git.apache.org>.
Github user chemikadze commented on a diff in the pull request:

    https://github.com/apache/griffin/pull/468#discussion_r239993349
  
    --- Diff: service/src/main/java/org/apache/griffin/core/job/LivyTaskSubmitHelper.java ---
    @@ -0,0 +1,238 @@
    +package org.apache.griffin.core.job;
    +
    +import com.alibaba.fastjson.JSON;
    +import com.google.gson.Gson;
    +import org.apache.commons.lang.StringUtils;
    +import org.apache.griffin.core.job.entity.JobInstanceBean;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import org.springframework.beans.factory.annotation.Autowired;
    +import org.springframework.core.env.Environment;
    +import org.springframework.http.HttpEntity;
    +import org.springframework.http.HttpHeaders;
    +import org.springframework.http.MediaType;
    +import org.springframework.stereotype.Component;
    +import org.springframework.web.client.RestTemplate;
    +
    +import javax.annotation.PostConstruct;
    +import java.util.Map;
    +import java.util.concurrent.BlockingQueue;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.ConcurrentMap;
    +import java.util.concurrent.LinkedBlockingQueue;
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +import static org.apache.griffin.core.job.entity.LivySessionStates.State.FOUND;
    +import static org.apache.griffin.core.util.JsonUtil.toJsonWithFormat;
    +
    +@Component
    +public class LivyTaskSubmitHelper {
    +
    +    private static final Logger logger = LoggerFactory
    +            .getLogger(LivyTaskSubmitHelper.class);
    +
    +    private static final String REQUEST_BY_HEADER = "X-Requested-By";
    +    private JobInstanceBean jobInstance;
    +    private SparkSubmitJob sparkSubmitJob;
    +    private ConcurrentMap<Long, Integer> taskAppIdMap = new ConcurrentHashMap<>();
    +    // Current number of tasks
    +    private AtomicInteger curConcurrentTaskNum = new AtomicInteger(0);
    +    private String workerNamePre;
    +    private RestTemplate restTemplate = new RestTemplate();
    +    // queue for pub or sub
    +    private BlockingQueue<Map<String, Object>> queue;
    +    public static final int DEFAULT_QUEUE_SIZE = 20000;
    +    private String uri;
    +
    +    @Autowired
    +    private Environment env;
    +
    +    @PostConstruct
    +    public void init() {
    +        startWorker();
    +        uri = env.getProperty("livy.uri");
    +        logger.info("Livy uri : {}",uri);
    +    }
    +
    +    public LivyTaskSubmitHelper() {
    +        this.workerNamePre = "livy-task-submit-worker";
    +    }
    +
    +    public void startWorker() {
    +        queue = new LinkedBlockingQueue<>(DEFAULT_QUEUE_SIZE);
    +        BatchTaskWorker worker = new BatchTaskWorker();
    +        worker.setDaemon(true);
    +        worker.setName(workerNamePre + "-" + worker.getName());
    +        worker.start();
    +    }
    +
    +    public void addTaskToWaitingQueue(Map<String, Object> t) {
    +        if (t == null) {
    +            logger.warn("task is blank, workerNamePre: {}", workerNamePre);
    +            return;
    +        }
    +
    +        if (queue.remainingCapacity() <= 0) {
    +            logger.warn("task is discard, workerNamePre: {}, task: {}", workerNamePre, JSON.toJSON(t));
    --- End diff --
    
    What will happen to the job instance status after that?
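
To make the question concrete: in the diff the task is logged and dropped, and the caller gets no signal. In addition, `add()` throws `IllegalStateException` if another producer fills the queue between the `remainingCapacity()` check and the insert. A minimal sketch of an alternative, assuming a hypothetical `WaitingQueue` wrapper and `tryAdd` method that are not part of this pull request, is to return the outcome so the caller can decide what to record on the job instance:

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical sketch: report a rejected task to the caller instead of dropping it silently. */
class WaitingQueue {
    private final BlockingQueue<Map<String, Object>> queue;

    WaitingQueue(int capacity) {
        this.queue = new LinkedBlockingQueue<>(capacity);
    }

    /** Returns true if the task was queued, false if it was null or the queue was full. */
    boolean tryAdd(Map<String, Object> task) {
        if (task == null) {
            return false; // offer(null) would throw NullPointerException
        }
        // offer() checks capacity and inserts in one step and does not throw when full
        return queue.offer(task);
    }

    int size() {
        return queue.size();
    }
}
```

A caller such as `SparkSubmitJob.execute` could then branch on the returned flag and persist some state for the rejected instance. Which state that should be is exactly the open question here.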


---

[GitHub] griffin pull request #468: Livy concurrent submission queue

Posted by yangtong0912 <gi...@git.apache.org>.
Github user yangtong0912 commented on a diff in the pull request:

    https://github.com/apache/griffin/pull/468#discussion_r239994552
  
    --- Diff: service/pom.xml ---
    @@ -124,6 +124,12 @@ under the License.
                 <artifactId>jackson-databind</artifactId>
                 <version>2.6.3</version>
             </dependency>
    +
    +        <dependency>
    +            <groupId>com.alibaba</groupId>
    --- End diff --
    
    > Do we really need this dependency just for logging?
    
    Thanks. I'll remove this dependency.


---

[GitHub] griffin issue #468: Livy concurrent submission queue

Posted by bhlx3lyx7 <gi...@git.apache.org>.
Github user bhlx3lyx7 commented on the issue:

    https://github.com/apache/griffin/pull/468
  
    The CI check fails; would you take a look at this first?


---