You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by GitBox <gi...@apache.org> on 2020/08/11 02:55:15 UTC

[GitHub] [shardingsphere-elasticjob] coodajingang opened a new pull request #1358: Add dag

coodajingang opened a new pull request #1358:
URL: https://github.com/apache/shardingsphere-elasticjob/pull/1358


   Fixes #ISSUSE_ID.
   
   Changes proposed in this pull request:
   ## Dag 
   ### dag job 自注册 
   在job的配置类`JobConfigruation` 中增加`JobDagConfiguration`类,用来配置Dag相关信息,包括dag的名称dagName、该job依赖的其他jobName、job重试次数和间隔等。
   当job注册自身到zk节点{jobName}/config 时,dag配置信息也会注册到其下。 同时,dag job也会把自己注册到{dagName}/config/{jobName} 节点下,其值为该job的依赖。
   ### 触发机制
   注册完后,dag job会启动一个`CuratorCacheListener`,监听路径是 {jobName}/state ,当该job运行结束时,成功失败状态会写到该路径下,监听器接收到事件后进行下一依赖job触发或统计整个dag状态,这是dag实现自我触发的动力。
   ### job 增加状态统计
   ZK结构如下:
   * /{namespace}	/{jobName}	/state/state	    运行状态,success-成功;fail-失败;running-处理中
   * /{namespace}	/{jobName}	/proc/succ/{item}	成功分片
   * /{namespace}	/{jobName}	/proc/fail/{item}	失败分片
   
   当job开始运行时状态置为running; 
   当一个分片执行结束时,会根据其成功失败状态登记到proc/succ或fail下,当所有的分片都有终态后,更新/state/state的值。
   
   ### dag 相关zk路径
   * /{namespace}    /dag    /{dagName}    /config     /{jobName}  值为依赖job,逗号分割;job自注册时登记;
   * /{namespace}    /dag    /{dagName}    /graph      /{jobName}  值为依赖job,逗号分割;当根节点运行时根据config生成,可以理解为执行计划
   * /{namespace}    /dag    /{dagName}    /graph      /{jobName}/retry  值为当前的重试次数
   * /{namespace}    /dag    /{dagName}    /states                 值为dag的状态,有成功、失败、运行中、暂停状态;
   * /{namespace}    /dag    /{dagName}    /running     /{jobName} 指正在运行中的job,job被触发运行时登记在该路径下
   * /{namespace}    /dag    /{dagName}    /success     /{jobName} 指运行成功的job,job运行成功后由running路径转移到该路径下
   * /{namespace}    /dag    /{dagName}    /fail        /{jobName} 指运行失败的job,job运行失败后由running路径转移到该路径下
   * /{namespace}    /dag    /{dagName}    /skip        /{jobName} 指skip的job,job运行失败时如果标记为可以跳过,则由running路径转移到该路径下
   * /{namespace}    /dag    /{dagName}    /retry       /{jobName} 指等待被重试的job,job运行失败时根据重试参数进行重试的登记在该路径下 
   * /{namespace}    /daglatch   /{dagName}                        根job选主路径 
   * /{namespace}    /dagretry   /{dagName}   /{jobName}           job重试延迟队列  
   
   ### 关于dag job 重试 
   dag job可以配置重试参数,失败时检查重试参数,需要重试则放入zk的延时队列进行重试;
   延时队列到期后触发job进行执行。 
   为了防止重试时重复触发,通过事务执行: 删除dag retry下该job + 写instances trigger进行控制, 保证trigger只会写一遍。  
   
   ### 关于graph 
   graph下的配置信息跟config下一样,引入原因有二:
   1. dag需要检查有无环,而config下信息可能会在dag运行期间变动,所以在运行前先生成dag graph,在本次dag运行中,不管config下如何变动都以graph为准执行。
   2. graph下保存有当前重试次数,每次执行时若dag达到终态都会重新生成,便于保存运行中参数。
   
   ### dag 状态 
   dag状态有running、pause 、 fail 、success , 状态转换如下:
   running:通过控制台暂停按钮转换为pause,将不再触发后续依赖job; 当没有可触发的job时,更新为fail或success; 
   pause: 通过控制台恢复安装转换为running,并触发后续依赖job ;
   fail: 在无可触发的job时,若存在失败job,则更新为fail 
   success:在无可触发的job时,若graph中所有的job都成功,更新为success;
   
   ### dag job cron 
   根job,指依赖为`self`的job, 在一个dagName中可以有多个,其cron表达式按正常定时需要来; 
   其他job,指依赖为其他job的job,它不应该被定时调起,而应该被依赖的job调起,其cron请设置为`1/59 * * * * ? 2099`
   
   ### 支持spring boot 配置


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [shardingsphere-elasticjob] coodajingang closed pull request #1358: Add dag

Posted by GitBox <gi...@apache.org>.
coodajingang closed pull request #1358:
URL: https://github.com/apache/shardingsphere-elasticjob/pull/1358


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [shardingsphere-elasticjob] terrymanu commented on a change in pull request #1358: Add dag

Posted by GitBox <gi...@apache.org>.
terrymanu commented on a change in pull request #1358:
URL: https://github.com/apache/shardingsphere-elasticjob/pull/1358#discussion_r470976191



##########
File path: elasticjob-executor/elasticjob-executor-kernel/src/main/java/org/apache/shardingsphere/elasticjob/executor/ElasticJobExecutor.java
##########
@@ -86,6 +86,19 @@ public void execute() {
         } catch (final JobExecutionEnvironmentException cause) {
             jobErrorHandler.handleException(jobConfig.getJobName(), cause);
         }
+

Review comment:
       Please remove useless blank line

##########
File path: elasticjob-executor/elasticjob-executor-kernel/src/main/java/org/apache/shardingsphere/elasticjob/executor/ElasticJobExecutor.java
##########
@@ -86,6 +86,19 @@ public void execute() {
         } catch (final JobExecutionEnvironmentException cause) {
             jobErrorHandler.handleException(jobConfig.getJobName(), cause);
         }
+
+        if (jobFacade.isDagJob()) {
+            try {
+                jobFacade.dagStatesCheck();
+                jobFacade.dagJobDependenciesCheck();
+                //CHECKSTYLE:OFF
+            } catch (Exception e) {
+                //CHECKSTYLE:ON
+                log.error("DAG job - {} exception! Check !", jobConfig.getJobName(), e);

Review comment:
       Throw exception is better than log only if configuration error




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [shardingsphere-elasticjob] TeslaCN commented on a change in pull request #1358: Add dag

Posted by GitBox <gi...@apache.org>.
TeslaCN commented on a change in pull request #1358:
URL: https://github.com/apache/shardingsphere-elasticjob/pull/1358#discussion_r468307674



##########
File path: elasticjob-infra/elasticjob-tracing/elasticjob-tracing-api/src/test/java/org/apache/shardingsphere/elasticjob/tracing/fixture/TestTracingListener.java
##########
@@ -41,7 +42,13 @@ public void listen(final JobExecutionEvent jobExecutionEvent) {
     public void listen(final JobStatusTraceEvent jobStatusTraceEvent) {
         jobEventCaller.call();
     }
-    
+
+    @Override
+    public void listen(DagJobExecutionEvent dagJobExecutionEvent) {

Review comment:
       Parameter dagJobExecutionEvent should be final.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [shardingsphere-elasticjob] Technoboy- commented on a change in pull request #1358: Add dag

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on a change in pull request #1358:
URL: https://github.com/apache/shardingsphere-elasticjob/pull/1358#discussion_r471057983



##########
File path: elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/dag/DagService.java
##########
@@ -0,0 +1,557 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.elasticjob.lite.internal.dag;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.time.DateFormatUtils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.curator.framework.recipes.cache.CuratorCache;
+import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
+import org.apache.curator.framework.recipes.locks.InterProcessMutex;
+import org.apache.curator.framework.recipes.queue.DistributedDelayQueue;
+import org.apache.curator.framework.recipes.queue.QueueBuilder;
+import org.apache.curator.framework.recipes.queue.QueueSerializer;
+import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
+import org.apache.shardingsphere.elasticjob.api.JobDagConfiguration;
+import org.apache.shardingsphere.elasticjob.infra.concurrent.BlockUtils;
+import org.apache.shardingsphere.elasticjob.infra.env.IpUtils;
+import org.apache.shardingsphere.elasticjob.infra.exception.DagRuntimeException;
+import org.apache.shardingsphere.elasticjob.lite.internal.config.ConfigurationService;
+import org.apache.shardingsphere.elasticjob.lite.internal.state.JobStateEnum;
+import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
+import org.apache.shardingsphere.elasticjob.tracing.JobEventBus;
+import org.apache.shardingsphere.elasticjob.tracing.event.DagJobExecutionEvent;
+
+import java.io.UnsupportedEncodingException;
+import java.lang.management.ManagementFactory;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Job dag service.
+ */
+@Slf4j
+public class DagService implements CuratorCacheListener {
+    public static final String ROOT_JOB = "self";
+
+    private static final String DAG_LATCH_PATH = "/daglatch/";
+
+    private static final int DEFAULT_RETRY_INTERVAL = 30;
+
+    private static final String RETRY_PATH = "/dagretry/%s/%s";
+
+    private final DagNodeStorage dagNodeStorage;
+
+    private final JobDagConfiguration jobDagConfig;
+
+    private final String jobName;
+
+    private final String dagName;
+
+    private final InterProcessMutex mutex;
+
+    private final CoordinatorRegistryCenter regCenter;
+
+    private final JobEventBus jobEventBus;
+
+    private CuratorCache jobStateCache;
+
+    private DistributedDelayQueue<String> delayQueue;
+
+    public DagService(final CoordinatorRegistryCenter regCenter, final String jobName, final JobEventBus jobEventBus, final JobDagConfiguration jobDagConfig) {
+        this.jobDagConfig = jobDagConfig;
+        this.regCenter = regCenter;
+        this.jobName = jobName;
+        this.dagNodeStorage = new DagNodeStorage(regCenter, jobDagConfig.getDagName(), jobName);
+        this.dagName = jobDagConfig.getDagName();
+        this.jobEventBus = jobEventBus;
+        if (StringUtils.equals(jobDagConfig.getDagDependencies(), ROOT_JOB)) {
+            this.mutex = new InterProcessMutex((CuratorFramework) regCenter.getRawClient(), DAG_LATCH_PATH + dagName);
+        } else {
+            this.mutex = null;
+        }
+        regDagConfig();
+    }
+
+    public DagService(final CoordinatorRegistryCenter regCenter, final String dagName, final DagNodeStorage dagNodeStorage) {
+        this.regCenter = regCenter;
+        this.dagName = dagName;
+        this.dagNodeStorage = dagNodeStorage;
+        this.jobName = "";
+        this.jobDagConfig = null;
+        this.mutex = null;
+        this.jobStateCache = null;
+        this.delayQueue = null;
+        this.jobEventBus = null;
+    }
+
+    /**
+     * Init delay queue for retry jobs.
+     *
+     * @return DistributedDelayQueue
+     */
+    private DistributedDelayQueue<String> initDelayQueue() {
+        String retryPath = String.format(RETRY_PATH, dagName, jobName);
+        DistributedDelayQueue<String> delayQueue = QueueBuilder.builder((CuratorFramework) regCenter.getRawClient(), new JobRetryTrigger(regCenter, dagName), new QueueSerializer<String>() {
+            @Override
+            public byte[] serialize(final String item) {
+                try {
+                    return item.getBytes("utf-8");
+                } catch (UnsupportedEncodingException e) {
+                    log.error("Dag-{}[{}] Init delay queue exception.", dagName, jobName, e);
+                }
+                return null;
+            }
+
+            @Override
+            public String deserialize(final byte[] bytes) {
+                return new String(bytes);
+            }
+        }, retryPath).buildDelayQueue();
+
+        try {
+            delayQueue.start();
+            log.info("Dag-{}[{}] start delay queue, path={}", dagName, jobName, retryPath);
+            //CHECKSTYLE:OFF
+        } catch (Exception e) {
+            //CHECKSTYLE:ON
+            log.error("Dag-{}[{}] start delay queue Exception, path={}", dagName, jobName, retryPath, e);
+        }
+
+        return delayQueue;
+    }
+
+    private void startJobStateListener() {
+        jobStateCache.listenable().addListener(this);
+        try {
+            jobStateCache.start();
+            postEvent(DagJobStates.REG.getValue(), "Job register success");
+            //CHECKSTYLE:OFF
+        } catch (Exception exp) {
+            //CHECKSTYLE:ON
+            log.error("Start dag-{} job-{} state path listener Exception.", dagName, jobName, exp);
+            // ignore
+            postEvent(DagJobStates.REG.getValue(), "Job register Error:" + exp.getMessage());
+        }
+        log.info("Dag-{} job-{} state path listener has started success.", dagName, jobName);
+    }
+
+    private void stopJobStateListener() {
+        jobStateCache.close();
+    }
+
+    /**
+     * Is dag root job.
+     *
+     * @return boolean is dag root job
+     */
+    public boolean isDagRootJob() {
+        return StringUtils.equals(jobDagConfig.getDagDependencies(), "self");
+    }
+
+    /**
+     * current dag status.
+     *
+     * @return DagStates
+     */
+    public DagStates getDagStates() {
+        return DagStates.of(this.dagNodeStorage.currentDagStates());
+    }
+
+    /**
+     * Persist Dag config into zk.
+     * always overwrite.
+     */
+    private void regDagConfig() {
+        this.dagNodeStorage.persistDagConfig(genDependenciesString());
+        this.delayQueue = initDelayQueue();
+        this.jobStateCache = CuratorCache.build((CuratorFramework) regCenter.getRawClient(), this.dagNodeStorage.pathOfJobNodeState());
+        this.startJobStateListener();
+    }
+
+    private String genDependenciesString() {
+        return jobDagConfig.getDagDependencies();
+    }
+
+    /**
+     * 1. select leader ;
+     * 2. ReGraph DAG ;
+     * 3. Change DAG states to running
+     */
+    public void changeDagStatesAndReGraph() {
+        if (null == mutex) {
+            log.error("Need root job when change dag states and regraph!");
+            throw new DagRuntimeException("Need root job when change dag states and regraph!");
+        }
+
+        if (!acquireDagLeader()) {
+            blockUntilCompleted();
+            return;
+        }
+
+        if (getDagStates() == DagStates.RUNNING) {
+            log.info("Dag-{} states already RUNNING", dagName);
+            return;
+        }
+
+        try {
+            String batchNo = getBatchNo();
+            Map<String, Set<String>> allDagNode = dagNodeStorage.getAllDagConfigJobs();
+            checkCycle(allDagNode);
+            dagNodeStorage.initDagGraph(allDagNode, batchNo);
+            dagNodeStorage.updateDagStates(DagStates.RUNNING);
+            dagNodeStorage.updateDagJobStates(JobStateEnum.RUNNING);
+            // create graph event
+            postEvent(DagJobStates.INIT.getValue(), "Create graph success");
+            //CHECKSTYLE:OFF
+        } catch (Exception ex) {
+            //CHECKSTYLE:ON
+            postEvent(DagJobStates.INIT.getValue(), "Create graph error:" + ex.getMessage());
+        } finally {
+            releaseDagLeader();
+        }
+    }
+
+    private void blockUntilCompleted() {
+        int count = 0;
+        while (getDagStates() != DagStates.RUNNING) {
+            count++;
+            log.debug("DAG '{}' sleep short time until DAG graph completed. {}", dagName, count);
+            BlockUtils.sleep(300L);
+            if (count > 200) {
+                log.error("Dag-{} Wait a long time with Dag graph NOT complete", dagName);
+                throw new DagRuntimeException("Dag graph not complete!");
+            }
+        }
+    }
+
+    private boolean acquireDagLeader() {
+        try {
+            return mutex.acquire(200, TimeUnit.MILLISECONDS);
+            //CHECKSTYLE:OFF
+        } catch (Exception exp) {
+            //CHECKSTYLE:ON
+            log.debug("Dag-{} acquire lock error!", dagName, exp);
+        }
+        return false;
+    }
+
+    private void releaseDagLeader() {
+        try {
+            if (mutex.isAcquiredInThisProcess()) {
+                mutex.release();
+            }
+            //CHECKSTYLE:OFF
+        } catch (Exception exp) {
+            //CHECKSTYLE:ON
+            log.debug("Dag-{} release lock error!", dagName, exp);
+        }
+    }
+
+    /**
+     * Check dag has cycle.
+     *
+     * @param allDagNode dag config info.
+     */
+    private void checkCycle(final Map<String, Set<String>> allDagNode) {
+        Map<String, Set<String>> cloneMap = Maps.newHashMap();
+        allDagNode.forEach((key, value) -> cloneMap.put(key, Sets.newHashSet(value)));
+
+        while (removeSelf(cloneMap)) {
+            if (log.isDebugEnabled()) {
+                log.debug("Dag-{} remove root job.", dagName);
+            }
+        }
+        if (!cloneMap.isEmpty()) {
+            log.error("Dag {} find cycle {}", dagName, cloneMap.keySet().size());
+            printCycleNode(cloneMap);
+            throw new DagRuntimeException("Dag job find cycles");
+        }
+        log.info("Dag {} checkCycle success", dagName);
+    }
+
+    private void printCycleNode(final Map<String, Set<String>> cloneMap) {
+        cloneMap.forEach((k, v) -> {
+            log.error("{} has cycle with {}", k, Joiner.on("|").join(v));
+        });
+    }
+
+    private boolean removeSelf(final Map<String, Set<String>> cloneMap) {
+        Iterator<Map.Entry<String, Set<String>>> iterator = cloneMap.entrySet().iterator();
+        boolean removed = false;
+        while (iterator.hasNext()) {
+            Map.Entry<String, Set<String>> next = iterator.next();
+            Set<String> value = next.getValue();
+            value.remove("self");
+            if (value.isEmpty()) {
+                markKeyAsSelf(cloneMap, next.getKey());
+                iterator.remove();
+                removed = true;
+            }
+        }
+        return removed;
+    }
+
+    private void markKeyAsSelf(final Map<String, Set<String>> cloneMap, final String key) {
+        cloneMap.values().forEach(s -> s.remove(key));
+    }
+
+    private String getBatchNo() {
+        String date = DateFormatUtils.format(new Date(), "yyMMddHHmmss");
+        return dagName + IpUtils.getIp() + ManagementFactory.getRuntimeMXBean().getName().split("@")[0] + date;
+    }
+
+    /**
+     * When dag job start run ,check it's dependencies jobs states.
+     */
+    public void checkJobDependenciesState() {
+        DagJobStates currentJobRunStates = dagNodeStorage.getDagJobRunStates(jobName);
+        if (currentJobRunStates == DagJobStates.SUCCESS || currentJobRunStates == DagJobStates.FAIL) {
+            log.info("DAG- {} job- {} 's states is {},Can not run again!", jobDagConfig.getDagName(), jobName, currentJobRunStates);
+            throw new DagRuntimeException("Dag job has been completed");
+        }
+        if (isDagRootJob()) {
+            log.debug("DAG {} job {} is root,No deps.", jobDagConfig.getDagName(), jobName);
+            return;
+        }
+
+        // 要求dep skip 或 success

Review comment:
       Need to change to English




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [shardingsphere-elasticjob] Technoboy- commented on a change in pull request #1358: Add dag

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on a change in pull request #1358:
URL: https://github.com/apache/shardingsphere-elasticjob/pull/1358#discussion_r471057703



##########
File path: elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/dag/DagJobStates.java
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.elasticjob.lite.internal.dag;
+
+import org.apache.commons.lang3.StringUtils;
+
+public enum DagJobStates {

Review comment:
       Need docs




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [shardingsphere-elasticjob] coodajingang commented on pull request #1358: Add dag

Posted by GitBox <gi...@apache.org>.
coodajingang commented on pull request #1358:
URL: https://github.com/apache/shardingsphere-elasticjob/pull/1358#issuecomment-674507711


   I will pull this request to the  DAG branch


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [shardingsphere-elasticjob] TeslaCN commented on a change in pull request #1358: Add dag

Posted by GitBox <gi...@apache.org>.
TeslaCN commented on a change in pull request #1358:
URL: https://github.com/apache/shardingsphere-elasticjob/pull/1358#discussion_r468308168



##########
File path: elasticjob-infra/elasticjob-tracing/elasticjob-tracing-api/src/test/java/org/apache/shardingsphere/elasticjob/tracing/fixture/TestTracingListener.java
##########
@@ -41,7 +42,13 @@ public void listen(final JobExecutionEvent jobExecutionEvent) {
     public void listen(final JobStatusTraceEvent jobStatusTraceEvent) {
         jobEventCaller.call();
     }
-    
+
+    @Override
+    public void listen(DagJobExecutionEvent dagJobExecutionEvent) {

Review comment:
       Parameter dagJobExecutionEvent should be final.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org