You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@griffin.apache.org by gu...@apache.org on 2019/01/30 15:19:15 UTC

[griffin] branch master updated: Job rate limit for scheduler

This is an automated email from the ASF dual-hosted git repository.

guoyp pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/griffin.git


The following commit(s) were added to refs/heads/master by this push:
     new 53aa8bd  Job rate limit for scheduler
53aa8bd is described below

commit 53aa8bdb2c04f5b03d9b6d42c98183d9bc5f6199
Author: yangtong0912 <yt...@163.com>
AuthorDate: Wed Jan 30 23:19:09 2019 +0800

    Job rate limit for scheduler
    
    To keep Livy job submissions from failing when the Spark cluster is short on resources or too many tasks run at the same time, add a blocking queue in front of Livy and limit the number of concurrent tasks. GRIFFIN-220
    
    Author: yangtong0912 <yt...@163.com>
    Author: michael.yang <mi...@vipshop.com>
    
    Closes #468 from yangtong0912/master.
---
 pom.xml                                            |   2 +-
 service/pom.xml                                    |   1 +
 .../apache/griffin/core/job/JobServiceImpl.java    |  19 +-
 .../griffin/core/job/LivyTaskSubmitHelper.java     | 224 +++++++++++++++++++++
 .../apache/griffin/core/job/SparkSubmitJob.java    |  57 ++++--
 .../src/main/resources/application-dev.properties  |   2 +-
 .../main/resources/application-docker.properties   |   5 +-
 .../src/main/resources/application-prod.properties |   4 +
 service/src/main/resources/application.properties  |   4 +
 .../griffin/core/job/SparkSubmitJobTest.java       |   2 +
 10 files changed, 297 insertions(+), 23 deletions(-)

diff --git a/pom.xml b/pom.xml
index efc1f71..18c5f64 100644
--- a/pom.xml
+++ b/pom.xml
@@ -176,4 +176,4 @@ under the License.
         </plugins>
     </build>
 
-</project>
+</project>
\ No newline at end of file
diff --git a/service/pom.xml b/service/pom.xml
index 40e48ab..fed98bc 100644
--- a/service/pom.xml
+++ b/service/pom.xml
@@ -129,6 +129,7 @@ under the License.
             <artifactId>jackson-databind</artifactId>
             <version>2.6.3</version>
         </dependency>
+
         <!-- to access metastore from hive-->
         <dependency>
             <groupId>org.apache.hadoop</groupId>
diff --git a/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java b/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java
index 69f965f..7dc7f6d 100644
--- a/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java
+++ b/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java
@@ -83,14 +83,16 @@ import static org.apache.griffin.core.exception.GriffinExceptionMessage.JOB_TYPE
 import static org.apache.griffin.core.exception.GriffinExceptionMessage.MEASURE_TYPE_DOES_NOT_SUPPORT;
 import static org.apache.griffin.core.exception.GriffinExceptionMessage.NO_SUCH_JOB_ACTION;
 import static org.apache.griffin.core.exception.GriffinExceptionMessage.QUARTZ_JOB_ALREADY_EXIST;
-import static org.apache.griffin.core.job.entity.LivySessionStates.State.BUSY;
-import static org.apache.griffin.core.job.entity.LivySessionStates.State.DEAD;
-import static org.apache.griffin.core.job.entity.LivySessionStates.State.IDLE;
+import static org.apache.griffin.core.job.entity.LivySessionStates.State.STARTING;
+import static org.apache.griffin.core.job.entity.LivySessionStates.State.UNKNOWN;
 import static org.apache.griffin.core.job.entity.LivySessionStates.State.NOT_STARTED;
 import static org.apache.griffin.core.job.entity.LivySessionStates.State.RECOVERING;
+import static org.apache.griffin.core.job.entity.LivySessionStates.State.IDLE;
 import static org.apache.griffin.core.job.entity.LivySessionStates.State.RUNNING;
-import static org.apache.griffin.core.job.entity.LivySessionStates.State.STARTING;
-import static org.apache.griffin.core.job.entity.LivySessionStates.State.UNKNOWN;
+import static org.apache.griffin.core.job.entity.LivySessionStates.State.BUSY;
+import static org.apache.griffin.core.job.entity.LivySessionStates.State.DEAD;
+import static org.apache.griffin.core.job.entity.LivySessionStates.State.SUCCESS;
+
 import static org.apache.griffin.core.job.entity.LivySessionStates.isActive;
 import static org.apache.griffin.core.measure.entity.GriffinMeasure.ProcessType.BATCH;
 import static org.apache.griffin.core.measure.entity.GriffinMeasure.ProcessType.STREAMING;
@@ -131,6 +133,8 @@ public class JobServiceImpl implements JobService {
     private StreamingJobOperatorImpl streamingJobOp;
     @Autowired
     private GriffinEventManager eventManager;
+    @Autowired
+    private LivyTaskSubmitHelper livyTaskSubmitHelper;
 
     private RestTemplate restTemplate;
 
@@ -529,6 +533,7 @@ public class JobServiceImpl implements JobService {
             LOGGER.warn("sessionId({}) appId({}) {}.", instance.getSessionId(),
                 instance.getAppId(), e.getMessage());
             setStateByYarn(instance, e);
+            livyTaskSubmitHelper.decreaseCurTaskNum(instance.getId());
         } catch (Exception e) {
             LOGGER.error(e.getMessage());
         }
@@ -606,6 +611,10 @@ public class JobServiceImpl implements JobService {
             instance.setAppUri(appId == null ? null : env
                 .getProperty("yarn.uri") + "/cluster/app/" + appId);
             instanceRepo.save(instance);
+            // Livy reports SUCCESS or DEAD once a task has finished: release its concurrency slot.
+            if (SUCCESS.equals(state) || DEAD.equals(state)) {
+                livyTaskSubmitHelper.decreaseCurTaskNum(instance.getId());
+            }
         }
     }
 
diff --git a/service/src/main/java/org/apache/griffin/core/job/LivyTaskSubmitHelper.java b/service/src/main/java/org/apache/griffin/core/job/LivyTaskSubmitHelper.java
new file mode 100644
index 0000000..df33fa1
--- /dev/null
+++ b/service/src/main/java/org/apache/griffin/core/job/LivyTaskSubmitHelper.java
@@ -0,0 +1,299 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.griffin.core.job;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+
+import static org.apache.griffin.core.job.entity.LivySessionStates.State.NOT_FOUND;
+import static org.apache.griffin.core.util.JsonUtil.toEntity;
+
+import org.quartz.JobDetail;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.core.env.Environment;
+import org.springframework.stereotype.Component;
+import org.springframework.web.client.RestTemplate;
+
+/**
+ * Rate-limits Spark job submissions to Livy.
+ *
+ * <p>Job details are parked in a bounded waiting queue by
+ * {@link #addTaskToWaitingQueue(JobDetail)} and handed to
+ * {@link SparkSubmitJob} one at a time by a single background worker. The
+ * worker only takes the next job while fewer than
+ * {@code livy.task.max.concurrent.count} tasks are tracked and at least
+ * {@code livy.task.submit.interval.second} seconds have passed since the
+ * previous hand-off. Tasks are tracked by Livy batch id through
+ * {@link #increaseCurTaskNum(Long)} and {@link #decreaseCurTaskNum(Long)}.
+ */
+@Component
+public class LivyTaskSubmitHelper {
+
+    private static final Logger logger = LoggerFactory.getLogger(LivyTaskSubmitHelper.class);
+
+    private static final String REQUEST_BY_HEADER = "X-Requested-By";
+    public static final int DEFAULT_QUEUE_SIZE = 20000;
+    // Worker idle poll interval and delay before each appId poll, in ms.
+    private static final int SLEEP_TIME = 300;
+
+    // NOTE(review): SparkSubmitJob appears to hold per-execution state
+    // (measure, livyUri); confirm the injected bean carries it for jobs
+    // submitted from the queue before enabling livy.need.queue.
+    @Autowired
+    private SparkSubmitJob sparkSubmitJob;
+    // Livy batch ids currently counted against the concurrency limit.
+    private final ConcurrentMap<Long, Integer> taskAppIdMap = new ConcurrentHashMap<>();
+    // Number of tracked tasks that have not been reported finished yet.
+    private final AtomicInteger curConcurrentTaskNum = new AtomicInteger(0);
+    private final String workerNamePre;
+    private final RestTemplate restTemplate = new RestTemplate();
+    // Bounded hand-off queue between job triggers and the worker thread.
+    private BlockingQueue<JobDetail> queue;
+    // Executor that owns the worker thread; shut down in stopWorker().
+    private ExecutorService executorService;
+    private String uri;
+
+    @Value("${livy.task.max.concurrent.count:20}")
+    private int maxConcurrentTaskCount;
+    @Value("${livy.task.submit.interval.second:3}")
+    private int batchIntervalSecond;
+
+    @Autowired
+    private Environment env;
+
+    public LivyTaskSubmitHelper() {
+        this.workerNamePre = "livy-task-submit-worker";
+    }
+
+    /**
+     * Reads the Livy batches uri and starts the consumer thread.
+     */
+    @PostConstruct
+    public void init() {
+        // Resolve the uri first so the worker can never observe it unset.
+        uri = env.getProperty("livy.uri");
+        logger.info("Livy uri : {}", uri);
+        startWorker();
+    }
+
+    /**
+     * Creates the bounded waiting queue and starts the single consumer thread.
+     */
+    public void startWorker() {
+        queue = new LinkedBlockingQueue<>(DEFAULT_QUEUE_SIZE);
+        executorService = Executors.newSingleThreadExecutor();
+        executorService.execute(new TaskInner(executorService));
+    }
+
+    /**
+     * Stops the consumer thread when the Spring context closes. The
+     * interrupt unblocks queue.take() / Thread.sleep() and ends the loop.
+     */
+    @PreDestroy
+    public void stopWorker() {
+        if (executorService != null) {
+            executorService.shutdownNow();
+        }
+    }
+
+    /**
+     * Puts a job detail into the waiting queue. If the queue is full the job
+     * is discarded and a job instance is saved with state NOT_FOUND instead.
+     *
+     * @param jd job detail; {@code null} is logged and ignored
+     * @throws IOException if saving the discarded job instance fails
+     */
+    public void addTaskToWaitingQueue(JobDetail jd) throws IOException {
+        if (jd == null) {
+            logger.warn("task is blank, workerNamePre: {}", workerNamePre);
+            return;
+        }
+
+        // offer() checks capacity and inserts atomically; remainingCapacity()
+        // followed by add() could throw IllegalStateException under contention.
+        if (!queue.offer(jd)) {
+            logger.warn("task is discard, workerNamePre: {}, task: {}", workerNamePre, jd);
+            sparkSubmitJob.saveJobInstance(null, NOT_FOUND);
+            return;
+        }
+
+        logger.info("add_task_to_waiting_queue_success, workerNamePre: {}, task: {}",
+                workerNamePre, jd);
+    }
+
+    /**
+     * Consumer loop: takes the next queued job while the number of tracked
+     * tasks is below the limit and the submit interval has elapsed, and asks
+     * SparkSubmitJob to submit it. Runs until its thread is interrupted.
+     */
+    class TaskInner implements Runnable {
+        // Kept for constructor compatibility; the loop no longer re-submits
+        // itself to this executor.
+        private final ExecutorService es;
+
+        public TaskInner(ExecutorService es) {
+            this.es = es;
+        }
+
+        @Override
+        public void run() {
+            long insertTime = System.currentTimeMillis();
+            // long arithmetic: int seconds * 1000 overflows past ~24 days.
+            long intervalMillis = batchIntervalSecond * 1000L;
+            while (!Thread.currentThread().isInterrupted()) {
+                try {
+                    if (curConcurrentTaskNum.get() < maxConcurrentTaskCount
+                            && (System.currentTimeMillis() - insertTime) >= intervalMillis) {
+                        JobDetail jd = queue.take();
+                        sparkSubmitJob.saveJobInstance(jd);
+                        insertTime = System.currentTimeMillis();
+                    } else {
+                        Thread.sleep(SLEEP_TIME);
+                    }
+                } catch (InterruptedException e) {
+                    // Shutdown requested: restore the flag so the loop exits.
+                    Thread.currentThread().interrupt();
+                } catch (Exception e) {
+                    // Log and keep looping. Re-submitting this runnable to the
+                    // single-thread executor would only queue copies that can
+                    // never start while this loop occupies the thread.
+                    logger.error("Async_worker_doTask_failed, {}", e.getMessage(), e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Starts tracking a submitted Livy batch and counts it against the limit.
+     * An id that is already tracked is not counted twice, so one
+     * {@link #decreaseCurTaskNum(Long)} call always releases it.
+     *
+     * @param scheduleId livy batch id; {@code null} is not counted because it
+     *                   could never be released again
+     */
+    public void increaseCurTaskNum(Long scheduleId) {
+        if (scheduleId == null) {
+            logger.warn("Livy batch id is null, task is not counted.");
+            return;
+        }
+        if (taskAppIdMap.putIfAbsent(scheduleId, 1) == null) {
+            curConcurrentTaskNum.incrementAndGet();
+        }
+    }
+
+    /**
+     * Stops tracking a finished task and releases its slot. Unknown or
+     * {@code null} ids are ignored.
+     *
+     * <p>NOTE(review): callers must pass the same Livy batch id that was given
+     * to {@link #increaseCurTaskNum(Long)}; any other id releases nothing and
+     * the worker stalls once the limit is reached.
+     *
+     * @param scheduleId livy batch id.
+     */
+    public void decreaseCurTaskNum(Long scheduleId) {
+        // remove() is atomic, so two concurrent callers cannot both decrement
+        // for one id (containsKey() followed by remove() could).
+        if (scheduleId != null && taskAppIdMap.remove(scheduleId) != null) {
+            curConcurrentTaskNum.decrementAndGet();
+        }
+    }
+
+    /**
+     * Parses Livy's batch-submit response and, while it has no appId, polls
+     * {@code <livy.uri>/<id>} up to {@code appIdRetryCount} times, sleeping
+     * SLEEP_TIME ms before each poll.
+     *
+     * @param result          JSON body returned by the Livy batch POST
+     * @param appIdRetryCount maximum number of polls; values &lt;= 0 disable polling
+     * @return the latest non-empty response map (it may still lack appId),
+     *         or {@code null} if {@code result} itself parses to null
+     * @throws IOException if a response body cannot be parsed
+     */
+    protected Map<String, Object> retryLivyGetAppId(String result, int appIdRetryCount)
+            throws IOException {
+        TypeReference<HashMap<String, Object>> type =
+                new TypeReference<HashMap<String, Object>>() {
+                };
+        Map<String, Object> resultMap = toEntity(result, type);
+        if (resultMap == null || resultMap.get("appId") != null) {
+            return resultMap;
+        }
+
+        Object livyBatchesId = resultMap.get("id");
+        if (livyBatchesId == null) {
+            return resultMap;
+        }
+
+        for (int attempt = 0; attempt < appIdRetryCount; attempt++) {
+            try {
+                Thread.sleep(SLEEP_TIME);
+            } catch (InterruptedException e) {
+                // Keep the interrupt for the caller and stop polling.
+                Thread.currentThread().interrupt();
+                logger.warn("Interrupted while polling livy appId, batches id : {}", livyBatchesId);
+                break;
+            }
+            Map<String, Object> polled = getResultByLivyId(livyBatchesId, type);
+            logger.info("retry get livy resultMap: {}, batches id : {}", polled, livyBatchesId);
+            if (polled == null || polled.isEmpty()) {
+                // No body: keep the previous response so its "id" is not lost.
+                continue;
+            }
+            resultMap = polled;
+            if (resultMap.get("appId") != null) {
+                break;
+            }
+        }
+        return resultMap;
+    }
+
+    /**
+     * Fetches the current state of a Livy batch from {@code <livy.uri>/<id>}.
+     *
+     * @return the parsed response, or an empty map when Livy returns no body
+     */
+    private Map<String, Object> getResultByLivyId(Object livyBatchesId,
+                                                  TypeReference<HashMap<String, Object>> type)
+            throws IOException {
+        String newResult = restTemplate.getForObject(uri + "/" + livyBatchesId, String.class);
+        if (newResult == null) {
+            return new HashMap<>();
+        }
+        return toEntity(newResult, type);
+    }
+
+}
diff --git a/service/src/main/java/org/apache/griffin/core/job/SparkSubmitJob.java b/service/src/main/java/org/apache/griffin/core/job/SparkSubmitJob.java
index cd0e7e8..28ead85 100644
--- a/service/src/main/java/org/apache/griffin/core/job/SparkSubmitJob.java
+++ b/service/src/main/java/org/apache/griffin/core/job/SparkSubmitJob.java
@@ -31,7 +31,6 @@ import static org.apache.griffin.core.job.entity.LivySessionStates.State.FOUND;
 import static org.apache.griffin.core.job.entity.LivySessionStates.State.NOT_FOUND;
 import static org.apache.griffin.core.measure.entity.GriffinMeasure.ProcessType.BATCH;
 import static org.apache.griffin.core.util.JsonUtil.toEntity;
-
 import com.fasterxml.jackson.core.type.TypeReference;
 
 import java.io.IOException;
@@ -56,16 +55,18 @@ import org.quartz.SchedulerException;
 import org.quartz.SimpleTrigger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.core.env.Environment;
+import org.springframework.http.HttpEntity;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.MediaType;
 import org.springframework.stereotype.Component;
 import org.springframework.util.CollectionUtils;
 import org.springframework.util.StringUtils;
 import org.springframework.web.client.HttpClientErrorException;
 import org.springframework.web.client.RestTemplate;
-import org.springframework.http.HttpEntity;
-import org.springframework.http.HttpHeaders;
-import org.springframework.http.MediaType;
 import static org.apache.griffin.core.util.JsonUtil.toJsonWithFormat;
 
 @PersistJobDataAfterExecution
@@ -81,6 +82,13 @@ public class SparkSubmitJob implements Job {
     private BatchJobOperatorImpl batchJobOp;
     @Autowired
     private Environment env;
+    @Autowired
+    private LivyTaskSubmitHelper livyTaskSubmitHelper;
+
+    @Value("${livy.need.queue:false}")
+    private boolean isNeedLivyQueue;
+    @Value("${livy.task.appId.retry.count:3}")
+    private int appIdRetryCount;
 
     private GriffinMeasure measure;
     private String livyUri;
@@ -98,7 +106,12 @@ public class SparkSubmitJob implements Job {
                 updateJobInstanceState(context);
                 return;
             }
-            saveJobInstance(jd);
+            if (isNeedLivyQueue) {
+                //livy batch limit
+                livyTaskSubmitHelper.addTaskToWaitingQueue(jd);
+            } else {
+                saveJobInstance(jd);
+            }
         } catch (Exception e) {
             LOGGER.error("Post spark task ERROR.", e);
         }
@@ -120,10 +133,10 @@ public class SparkSubmitJob implements Job {
             HttpHeaders headers = new HttpHeaders();
             headers.setContentType(MediaType.APPLICATION_JSON);
             headers.set(REQUEST_BY_HEADER,"admin");
-          
-            HttpEntity<String> springEntity = new HttpEntity<String>(toJsonWithFormat(livyConfMap), headers );
+
+            HttpEntity<String> springEntity = new HttpEntity<String>(toJsonWithFormat(livyConfMap),headers);
             result = restTemplate.postForObject(livyUri,springEntity,String.class);
-           
+
             LOGGER.info(result);
         } catch (HttpClientErrorException e) {
             LOGGER.error("Post to livy ERROR. \n {} {}",
@@ -205,7 +218,7 @@ public class SparkSubmitJob implements Job {
         List<String> args = new ArrayList<>();
         args.add(genEnv());
         String measureJson = JsonUtil.toJsonWithFormat(measure);
-        // to fix livy bug: character ` will be ignored by livy
+        // to fix livy bug: character ` will be ignored by livy
         String finalMeasureJson = escapeCharacter(measureJson, "\\`");
         LOGGER.info(finalMeasureJson);
         args.add(finalMeasureJson);
@@ -214,23 +227,37 @@ public class SparkSubmitJob implements Job {
     }
 
 
-    private void saveJobInstance(JobDetail jd) throws SchedulerException,
+    protected void saveJobInstance(JobDetail jd) throws SchedulerException,
             IOException {
         // If result is null, it may livy uri is wrong
         // or livy parameter is wrong.
-        String result = post2Livy();
+        Map<String, Object> resultMap = post2LivyWithRetry();
         String group = jd.getKey().getGroup();
         String name = jd.getKey().getName();
         batchJobOp.deleteJob(group, name);
         LOGGER.info("Delete predicate job({},{}) SUCCESS.", group, name);
-        saveJobInstance(result, FOUND);
+        setJobInstance(resultMap, FOUND);
+        jobInstanceRepo.save(jobInstance);
+    }
+
+    private Map<String, Object> post2LivyWithRetry()
+            throws IOException {
+        String result = post2Livy();
+        Map<String, Object> resultMap = null;
+        if (result != null) {
+            resultMap = livyTaskSubmitHelper.retryLivyGetAppId(result,appIdRetryCount);
+            if (resultMap != null) {
+                livyTaskSubmitHelper.increaseCurTaskNum(Long.valueOf(String.valueOf(resultMap.get("id"))).longValue());
+            }
+        }
+
+        return resultMap;
     }
 
-    private void saveJobInstance(String result, State state)
+    protected void saveJobInstance(String result, State state)
             throws IOException {
         TypeReference<HashMap<String, Object>> type =
-                new TypeReference<HashMap<String, Object>>() {
-                };
+                new TypeReference<HashMap<String, Object>>() {};
         Map<String, Object> resultMap = null;
         if (result != null) {
             resultMap = toEntity(result, type);
diff --git a/service/src/main/resources/application-dev.properties b/service/src/main/resources/application-dev.properties
index 544dbb4..d310ce3 100644
--- a/service/src/main/resources/application-dev.properties
+++ b/service/src/main/resources/application-dev.properties
@@ -25,4 +25,4 @@ spring.datasource.driver-class-name=org.h2.Driver
 spring.datasource.schema=classpath:init_quartz_h2.sql
 # enable h2 console, default path: http://localhost:8080/h2-console/
 spring.h2.console.enabled=true
-spring.jpa.show-sql=true
+spring.jpa.show-sql=true
\ No newline at end of file
diff --git a/service/src/main/resources/application-docker.properties b/service/src/main/resources/application-docker.properties
index e07979b..8c7806e 100644
--- a/service/src/main/resources/application-docker.properties
+++ b/service/src/main/resources/application-docker.properties
@@ -74,6 +74,9 @@ elasticsearch.scheme = http
 
 # livy
 livy.uri=http://10.148.215.23:38998/batches
-
+livy.need.queue=false
+livy.task.max.concurrent.count=20
+livy.task.submit.interval.second=3
+livy.task.appId.retry.count=3
 # yarn url
 yarn.uri=http://10.148.215.23:38088
diff --git a/service/src/main/resources/application-prod.properties b/service/src/main/resources/application-prod.properties
index 199a5a3..b977276 100644
--- a/service/src/main/resources/application-prod.properties
+++ b/service/src/main/resources/application-prod.properties
@@ -60,5 +60,9 @@ elasticsearch.scheme=http
 # elasticsearch.password = password
 # livy
 livy.uri=http://localhost:8998/batches
+livy.need.queue=false
+livy.task.max.concurrent.count=20
+livy.task.submit.interval.second=3
+livy.task.appId.retry.count=3
 # yarn url
 yarn.uri=http://localhost:8088
\ No newline at end of file
diff --git a/service/src/main/resources/application.properties b/service/src/main/resources/application.properties
index 1c26319..cb1dae5 100644
--- a/service/src/main/resources/application.properties
+++ b/service/src/main/resources/application.properties
@@ -60,6 +60,10 @@ elasticsearch.scheme=http
 # elasticsearch.password = password
 # livy
 livy.uri=http://localhost:8998/batches
+livy.need.queue=false
+livy.task.max.concurrent.count=20
+livy.task.submit.interval.second=3
+livy.task.appId.retry.count=3
 # yarn url
 yarn.uri=http://localhost:8088
 # griffin event listener
diff --git a/service/src/test/java/org/apache/griffin/core/job/SparkSubmitJobTest.java b/service/src/test/java/org/apache/griffin/core/job/SparkSubmitJobTest.java
index e9a481e..3d8fe26 100644
--- a/service/src/test/java/org/apache/griffin/core/job/SparkSubmitJobTest.java
+++ b/service/src/test/java/org/apache/griffin/core/job/SparkSubmitJobTest.java
@@ -88,6 +88,8 @@ public class SparkSubmitJobTest {
     @MockBean
     private BatchJobOperatorImpl batchJobOp;
 
+    @MockBean
+    private LivyTaskSubmitHelper livyTaskSubmitHelper;
 
     @Before
     public void setUp() {