You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2020/08/16 14:41:19 UTC

[shardingsphere-elasticjob] branch dag updated: add dag (#1376)

This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch dag
in repository https://gitbox.apache.org/repos/asf/shardingsphere-elasticjob.git


The following commit(s) were added to refs/heads/dag by this push:
     new cd324fa  add dag (#1376)
cd324fa is described below

commit cd324fade73ff8aba4a1695e4eba45c6a7d8df53
Author: coodajingang <11...@users.noreply.github.com>
AuthorDate: Sun Aug 16 22:41:05 2020 +0800

    add dag (#1376)
    
    * Added dag
    
    * modify ci error
    
    * modify ci error
    
    * Add unit test
    
    * Add licenses
    
    * Add licenses
    
    Co-authored-by: dutengxiao <du...@aibank.com>
---
 .../elasticjob/api/JobConfiguration.java           |  29 +-
 .../elasticjob/api/JobDagConfiguration.java        |  64 +--
 .../elasticjob/api/JobConfigurationTest.java       |  28 ++
 .../elasticjob/cloud/facade/CloudJobFacade.java    |  23 +-
 .../cloud/executor/facade/CloudJobFacadeTest.java  |  15 +
 .../elasticjob/executor/ElasticJobExecutor.java    |  16 +-
 .../elasticjob/executor/JobFacade.java             |  28 +-
 .../executor/ElasticJobExecutorTest.java           |  41 +-
 .../infra/exception/DagRuntimeException.java       |  39 +-
 .../infra/pojo/JobConfigurationPOJO.java           |  29 +-
 .../infra/exception/DagRuntimeExceptionTest.java   |  40 +-
 .../infra/pojo/JobConfigurationPOJOTest.java       |  69 ++-
 .../tracing/event/DagJobExecutionEvent.java        |  90 ++++
 .../tracing/listener/TracingListener.java          |  10 +
 .../tracing/fixture/TestTracingListener.java       |   9 +-
 .../tracing/rdb/listener/RDBTracingListener.java   |   6 +
 .../tracing/rdb/storage/RDBJobEventStorage.java    |  53 +-
 .../tracing/rdb/storage/RDBStorageSQLMapper.java   |   6 +
 .../src/main/resources/META-INF/sql/DB2.properties |   3 +
 .../src/main/resources/META-INF/sql/H2.properties  |   3 +
 .../main/resources/META-INF/sql/MySQL.properties   |   3 +
 .../main/resources/META-INF/sql/Oracle.properties  |   3 +
 .../resources/META-INF/sql/PostgreSQL.properties   |   3 +
 .../main/resources/META-INF/sql/SQL92.properties   |   3 +
 .../resources/META-INF/sql/SQLServer.properties    |   3 +
 .../rdb/listener/RDBTracingListenerTest.java       |   8 +
 .../rdb/storage/RDBJobEventStorageTest.java        |   9 +-
 .../elasticjob/lite/internal/dag/DagJobStates.java |  73 +++
 .../lite/internal/dag/DagNodeStorage.java          | 499 ++++++++++++++++++
 .../elasticjob/lite/internal/dag/DagService.java   | 557 +++++++++++++++++++++
 .../elasticjob/lite/internal/dag/DagStates.java    |  60 ++-
 .../lite/internal/dag/JobRetryTrigger.java         |  78 +++
 .../lite/internal/schedule/JobScheduler.java       |   8 +-
 .../lite/internal/schedule/LiteJobFacade.java      |  65 ++-
 .../lite/internal/sharding/ExecutionService.java   |  57 ++-
 .../lite/internal/state/JobStateEnum.java          |  67 +++
 .../lite/internal/state/JobStateNode.java          | 102 ++++
 .../lite/internal/storage/JobNodeStorage.java      |  14 +-
 .../lite/internal/dag/DagJobRetryTriggerTest.java  |  57 +++
 .../lite/internal/dag/DagNodeStorageTest.java      | 233 +++++++++
 .../lite/internal/dag/DagServiceTest.java          | 123 +++++
 .../lite/internal/dag/JobDagConfigTest.java        |  95 ++++
 .../lite/internal/schedule/LiteJobFacadeTest.java  |  25 +
 .../internal/sharding/ExecutionServiceTest.java    |  36 +-
 .../lite/internal/state/JobStateEnumTest.java}     |  40 +-
 .../lite/internal/state/JobStateNodeTest.java}     |  55 +-
 .../src/test/resources/logback-test.xml            |   3 +
 .../lite/lifecycle/api/DagOperateAPI.java          |  83 +++
 .../lite/lifecycle/api/JobAPIFactory.java          |  13 +
 .../lite/lifecycle/domain/DagBriefInfo.java}       |  48 +-
 .../internal/operate/DagOperateAPIImpl.java        | 177 +++++++
 .../lite/lifecycle/api/JobAPIFactoryTest.java      |   5 +
 .../lifecycle/fixture/LifecycleYamlConstants.java  |   4 +
 .../internal/operate/DagOperateAPIImplTest.java    | 105 ++++
 .../job/ElasticJobConfigurationProperties.java     |  20 +-
 .../spring/boot/job/ElasticJobSpringBootTest.java  |  17 +
 .../src/test/resources/application-elasticjob.yml  |   4 +
 .../job/parser/JobBeanDefinitionParser.java        |  17 +
 .../namespace/job/tag/JobBeanDefinitionTag.java    |  12 +
 .../resources/META-INF/namespace/elasticjob.xsd    |   6 +
 .../fixture/job/ref/RefFooSimpleElasticJob.java    |   6 +
 .../job/JobSpringNamespaceWithRefDagTest.java      |  67 +++
 .../test/resources/META-INF/job/withJobRefDag.xml  |  50 ++
 .../src/test/resources/conf/job/conf.properties    |   4 +
 64 files changed, 3308 insertions(+), 210 deletions(-)

diff --git a/elasticjob-api/src/main/java/org/apache/shardingsphere/elasticjob/api/JobConfiguration.java b/elasticjob-api/src/main/java/org/apache/shardingsphere/elasticjob/api/JobConfiguration.java
index a567d02..5ca2347 100644
--- a/elasticjob-api/src/main/java/org/apache/shardingsphere/elasticjob/api/JobConfiguration.java
+++ b/elasticjob-api/src/main/java/org/apache/shardingsphere/elasticjob/api/JobConfiguration.java
@@ -66,7 +66,9 @@ public final class JobConfiguration {
     private final boolean disabled;
     
     private final boolean overwrite;
-    
+
+    private final JobDagConfiguration jobDagConfiguration;
+
     /**
      * Create ElasticJob configuration builder.
      *
@@ -114,7 +116,9 @@ public final class JobConfiguration {
         private boolean disabled;
         
         private boolean overwrite;
-    
+
+        private JobDagConfiguration jobDagConfiguration;
+
         /**
          * Cron expression.
          *
@@ -331,7 +335,18 @@ public final class JobConfiguration {
             this.overwrite = overwrite;
             return this;
         }
-        
+
+        /**
+         * Set Dag configuration.
+         *
+         * @param jobDagConfiguration dag configuration
+         * @return ElasticJob configuration builder
+         */
+        public Builder jobDagConfiguration(final JobDagConfiguration jobDagConfiguration) {
+            this.jobDagConfiguration = jobDagConfiguration;
+            return this;
+        }
+
         /**
          * Build ElasticJob configuration.
          * 
@@ -340,9 +355,15 @@ public final class JobConfiguration {
         public final JobConfiguration build() {
             Preconditions.checkArgument(!Strings.isNullOrEmpty(jobName), "jobName can not be empty.");
             Preconditions.checkArgument(shardingTotalCount > 0, "shardingTotalCount should larger than zero.");
+
+            if (null != jobDagConfiguration) {
+                Preconditions.checkArgument(!Strings.isNullOrEmpty(jobDagConfiguration.getDagName()), "dagName can not be empty When use DAG");
+                Preconditions.checkArgument(!Strings.isNullOrEmpty(jobDagConfiguration.getDagDependencies()), "dagDependencies can not be empty When use DAG");
+            }
+
             return new JobConfiguration(jobName, cron, shardingTotalCount, shardingItemParameters, jobParameter, 
                     monitorExecution, failover, misfire, maxTimeDiffSeconds, reconcileIntervalMinutes,
-                    jobShardingStrategyType, jobExecutorServiceHandlerType, jobErrorHandlerType, description, props, disabled, overwrite);
+                    jobShardingStrategyType, jobExecutorServiceHandlerType, jobErrorHandlerType, description, props, disabled, overwrite, jobDagConfiguration);
         }
     }
 }
diff --git a/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/test/java/org/apache/shardingsphere/elasticjob/lite/spring/namespace/fixture/job/ref/RefFooSimpleElasticJob.java b/elasticjob-api/src/main/java/org/apache/shardingsphere/elasticjob/api/JobDagConfiguration.java
similarity index 50%
copy from elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/test/java/org/apache/shardingsphere/elasticjob/lite/spring/namespace/fixture/job/ref/RefFooSimpleElasticJob.java
copy to elasticjob-api/src/main/java/org/apache/shardingsphere/elasticjob/api/JobDagConfiguration.java
index 157d33d..5f31600 100644
--- a/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/test/java/org/apache/shardingsphere/elasticjob/lite/spring/namespace/fixture/job/ref/RefFooSimpleElasticJob.java
+++ b/elasticjob-api/src/main/java/org/apache/shardingsphere/elasticjob/api/JobDagConfiguration.java
@@ -7,7 +7,7 @@
  * the License.  You may obtain a copy of the License at
  *
  *     http://www.apache.org/licenses/LICENSE-2.0
- *  
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -15,33 +15,37 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.elasticjob.lite.spring.namespace.fixture.job.ref;
-
-import lombok.Getter;
-import lombok.Setter;
-import org.apache.shardingsphere.elasticjob.api.ShardingContext;
-import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
-import org.apache.shardingsphere.elasticjob.lite.spring.namespace.fixture.service.FooService;
-
-public class RefFooSimpleElasticJob implements SimpleJob {
-
-    @Getter
-    private static volatile boolean completed;
-    
-    @Getter
-    @Setter
-    private FooService fooService;
-    
-    @Override
-    public void execute(final ShardingContext shardingContext) {
-        fooService.foo();
-        completed = true;
-    }
-    
-    /**
-     * Set completed to false.
-     */
-    public static void reset() {
-        completed = false;
-    }
+package org.apache.shardingsphere.elasticjob.api;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.ToString;
+
+/**
+ * Job Dag configuration.
+ *
+ **/
+@AllArgsConstructor
+@NoArgsConstructor
+@Data
+@ToString
+public class JobDagConfiguration {
+    /** DAG name. */
+    private String dagName;
+
+    /** DAG dependencies. */
+    private String dagDependencies;
+
+    /** DAG retry times. */
+    private int retryTimes;
+
+    /** DAG retry interval. */
+    private int retryInterval;
+
+    /** Whether the DAG job can run alone. */
+    private boolean dagRunAlone;
+
+    /** Whether the DAG job can be skipped when it fails. */
+    private boolean dagSkipWhenFail;
 }
diff --git a/elasticjob-api/src/test/java/org/apache/shardingsphere/elasticjob/api/JobConfigurationTest.java b/elasticjob-api/src/test/java/org/apache/shardingsphere/elasticjob/api/JobConfigurationTest.java
index a1e85ef..9056aa1 100644
--- a/elasticjob-api/src/test/java/org/apache/shardingsphere/elasticjob/api/JobConfigurationTest.java
+++ b/elasticjob-api/src/test/java/org/apache/shardingsphere/elasticjob/api/JobConfigurationTest.java
@@ -20,7 +20,9 @@ package org.apache.shardingsphere.elasticjob.api;
 import org.junit.Test;
 
 import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
@@ -87,4 +89,30 @@ public final class JobConfigurationTest {
     public void assertBuildWithInvalidShardingTotalCount() {
         JobConfiguration.newBuilder("test_job", -1).cron("0/1 * * * * ?").build();
     }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void assertBuildWithEmptyDagJobConfiguration() {
+        JobDagConfiguration jobDagConfiguration = new JobDagConfiguration();
+        JobConfiguration.newBuilder("test_job", 3).cron("0/1 * * * * ?").jobDagConfiguration(jobDagConfiguration).build();
+    }
+
+    @Test
+    public void assertBuildNullDagJobConfiguration() {
+        JobConfiguration actual = JobConfiguration.newBuilder("test_job", 3).cron("0/1 * * * * ?").jobDagConfiguration(null).build();
+        assertNull(actual.getJobDagConfiguration());
+    }
+
+    @Test
+    public void assertBuildDagJobConfiguration() {
+        JobDagConfiguration jobDagConfiguration = new JobDagConfiguration("fake_dag", "fake_dependencies", 3, 500, true, false);
+        JobConfiguration actual = JobConfiguration.newBuilder("test_job", 3).cron("0/1 * * * * ?").jobDagConfiguration(jobDagConfiguration).build();
+        assertThat(actual.getJobName(), is("test_job"));
+        assertNotNull(actual.getJobDagConfiguration());
+        assertTrue(actual.getJobDagConfiguration().isDagRunAlone());
+        assertFalse(actual.getJobDagConfiguration().isDagSkipWhenFail());
+        assertEquals(actual.getJobDagConfiguration().getDagName(), "fake_dag");
+        assertEquals(actual.getJobDagConfiguration().getDagDependencies(), "fake_dependencies");
+        assertEquals(actual.getJobDagConfiguration().getRetryTimes(), 3);
+        assertEquals(actual.getJobDagConfiguration().getRetryInterval(), 500);
+    }
 }
diff --git a/elasticjob-cloud/elasticjob-cloud-executor/src/main/java/org/apache/shardingsphere/elasticjob/cloud/facade/CloudJobFacade.java b/elasticjob-cloud/elasticjob-cloud-executor/src/main/java/org/apache/shardingsphere/elasticjob/cloud/facade/CloudJobFacade.java
index 7329e0b..21ca2cf 100755
--- a/elasticjob-cloud/elasticjob-cloud-executor/src/main/java/org/apache/shardingsphere/elasticjob/cloud/facade/CloudJobFacade.java
+++ b/elasticjob-cloud/elasticjob-cloud-executor/src/main/java/org/apache/shardingsphere/elasticjob/cloud/facade/CloudJobFacade.java
@@ -29,6 +29,7 @@ import org.apache.shardingsphere.elasticjob.tracing.event.JobStatusTraceEvent.So
 import org.apache.shardingsphere.elasticjob.tracing.event.JobStatusTraceEvent.State;
 
 import java.util.Collection;
+import java.util.Map;
 
 /**
  * Cloud job facade.
@@ -68,7 +69,12 @@ public final class CloudJobFacade implements JobFacade {
     @Override
     public void registerJobCompleted(final ShardingContexts shardingContexts) {
     }
-    
+
+    @Override
+    public void registerJobCompleted(final ShardingContexts shardingContexts, final Map<Integer, String> itemErrorMessages) {
+
+    }
+
     @Override
     public ShardingContexts getShardingContexts() {
         return shardingContexts;
@@ -112,4 +118,19 @@ public final class CloudJobFacade implements JobFacade {
         jobEventBus.post(new JobStatusTraceEvent(taskContext.getMetaInfo().getJobName(), taskContext.getId(), taskContext.getSlaveId(), 
                 Source.CLOUD_EXECUTOR, taskContext.getType().toString(), String.valueOf(taskContext.getMetaInfo().getShardingItems()), state, message));
     }
+
+    @Override
+    public boolean isDagJob() {
+        return false;
+    }
+
+    @Override
+    public void dagStatesCheck() {
+
+    }
+
+    @Override
+    public void dagJobDependenciesCheck() {
+
+    }
 }
diff --git a/elasticjob-cloud/elasticjob-cloud-executor/src/test/java/org/apache/shardingsphere/elasticjob/cloud/executor/facade/CloudJobFacadeTest.java b/elasticjob-cloud/elasticjob-cloud-executor/src/test/java/org/apache/shardingsphere/elasticjob/cloud/executor/facade/CloudJobFacadeTest.java
index a38a6fb..e23d8db 100755
--- a/elasticjob-cloud/elasticjob-cloud-executor/src/test/java/org/apache/shardingsphere/elasticjob/cloud/executor/facade/CloudJobFacadeTest.java
+++ b/elasticjob-cloud/elasticjob-cloud-executor/src/test/java/org/apache/shardingsphere/elasticjob/cloud/executor/facade/CloudJobFacadeTest.java
@@ -133,4 +133,19 @@ public final class CloudJobFacadeTest {
     public void assertPostJobStatusTraceEvent() {
         jobFacade.postJobStatusTraceEvent(String.format("%s@-@0@-@%s@-@fake_slave_id@-@0", "test_job", ExecutionType.READY), State.TASK_RUNNING, "message is empty.");
     }
+
+    @Test
+    public void assertIsDagJob() {
+        assertFalse(jobFacade.isDagJob());
+    }
+
+    @Test
+    public void assertDagStatesCheck() {
+        jobFacade.dagStatesCheck();
+    }
+
+    @Test
+    public void assertDagJobDependenciesCheck() {
+        jobFacade.dagJobDependenciesCheck();
+    }
 }
diff --git a/elasticjob-executor/elasticjob-executor-kernel/src/main/java/org/apache/shardingsphere/elasticjob/executor/ElasticJobExecutor.java b/elasticjob-executor/elasticjob-executor-kernel/src/main/java/org/apache/shardingsphere/elasticjob/executor/ElasticJobExecutor.java
index a21428e..d65f478 100644
--- a/elasticjob-executor/elasticjob-executor-kernel/src/main/java/org/apache/shardingsphere/elasticjob/executor/ElasticJobExecutor.java
+++ b/elasticjob-executor/elasticjob-executor-kernel/src/main/java/org/apache/shardingsphere/elasticjob/executor/ElasticJobExecutor.java
@@ -86,6 +86,19 @@ public final class ElasticJobExecutor {
         } catch (final JobExecutionEnvironmentException cause) {
             jobErrorHandler.handleException(jobConfig.getJobName(), cause);
         }
+
+        if (jobFacade.isDagJob()) {
+            try {
+                jobFacade.dagStatesCheck();
+                jobFacade.dagJobDependenciesCheck();
+                //CHECKSTYLE:OFF
+            } catch (Exception e) {
+                //CHECKSTYLE:ON
+                log.error("DAG job - {} exception! Check !", jobConfig.getJobName(), e);
+                return;
+            }
+        }
+
         ShardingContexts shardingContexts = jobFacade.getShardingContexts();
         jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_STAGING, String.format("Job '%s' execute begin.", jobConfig.getJobName()));
         if (jobFacade.misfireIfRunning(shardingContexts.getShardingItemParameters().keySet())) {
@@ -127,8 +140,7 @@ public final class ElasticJobExecutor {
         try {
             process(shardingContexts, executionSource);
         } finally {
-            // TODO Consider increasing the status of job failure, and how to handle the overall loop of job failure
-            jobFacade.registerJobCompleted(shardingContexts);
+            jobFacade.registerJobCompleted(shardingContexts, itemErrorMessages);
             if (itemErrorMessages.isEmpty()) {
                 jobFacade.postJobStatusTraceEvent(taskId, State.TASK_FINISHED, "");
             } else {
diff --git a/elasticjob-executor/elasticjob-executor-kernel/src/main/java/org/apache/shardingsphere/elasticjob/executor/JobFacade.java b/elasticjob-executor/elasticjob-executor-kernel/src/main/java/org/apache/shardingsphere/elasticjob/executor/JobFacade.java
index 6d0c79b..d6d7e2a 100644
--- a/elasticjob-executor/elasticjob-executor-kernel/src/main/java/org/apache/shardingsphere/elasticjob/executor/JobFacade.java
+++ b/elasticjob-executor/elasticjob-executor-kernel/src/main/java/org/apache/shardingsphere/elasticjob/executor/JobFacade.java
@@ -24,6 +24,7 @@ import org.apache.shardingsphere.elasticjob.tracing.event.JobExecutionEvent;
 import org.apache.shardingsphere.elasticjob.tracing.event.JobStatusTraceEvent.State;
 
 import java.util.Collection;
+import java.util.Map;
 
 /**
  * Job facade.
@@ -63,7 +64,15 @@ public interface JobFacade {
      * @param shardingContexts sharding contexts
      */
     void registerJobCompleted(ShardingContexts shardingContexts);
-    
+
+    /**
+     * Register job completed, together with the sharding items that had errors.
+     *
+     * @param shardingContexts sharding contexts
+     * @param itemErrorMessages error messages keyed by sharding item
+     */
+    void registerJobCompleted(ShardingContexts shardingContexts, Map<Integer, String> itemErrorMessages);
+
     /**
      * Get sharding contexts.
      *
@@ -130,4 +139,21 @@ public interface JobFacade {
      * @param message job message
      */
     void postJobStatusTraceEvent(String taskId, State state, String message);
+
+    /**
+     * Whether the current job belongs to a DAG.
+     *
+     * @return is dag job
+     */
+    boolean isDagJob();
+
+    /**
+     * Check Dag Group States.
+     */
+    void dagStatesCheck();
+
+    /**
+     * Check whether all of the current job's dependencies have succeeded.
+     */
+    void dagJobDependenciesCheck();
 }
diff --git a/elasticjob-executor/elasticjob-executor-kernel/src/test/java/org/apache/shardingsphere/elasticjob/executor/ElasticJobExecutorTest.java b/elasticjob-executor/elasticjob-executor-kernel/src/test/java/org/apache/shardingsphere/elasticjob/executor/ElasticJobExecutorTest.java
index f3d27a4..1f0699b 100644
--- a/elasticjob-executor/elasticjob-executor-kernel/src/test/java/org/apache/shardingsphere/elasticjob/executor/ElasticJobExecutorTest.java
+++ b/elasticjob-executor/elasticjob-executor-kernel/src/test/java/org/apache/shardingsphere/elasticjob/executor/ElasticJobExecutorTest.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.elasticjob.executor;
 
 import lombok.SneakyThrows;
 import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
+import org.apache.shardingsphere.elasticjob.api.JobDagConfiguration;
 import org.apache.shardingsphere.elasticjob.api.listener.ShardingContexts;
 import org.apache.shardingsphere.elasticjob.executor.fixture.executor.ClassedFooJobExecutor;
 import org.apache.shardingsphere.elasticjob.executor.fixture.job.FooJob;
@@ -35,6 +36,7 @@ import java.lang.reflect.Field;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
@@ -70,7 +72,15 @@ public final class ElasticJobExecutorTest {
         return JobConfiguration.newBuilder("test_job", 3)
                 .cron("0/1 * * * * ?").shardingItemParameters("0=A,1=B,2=C").jobParameter("param").failover(true).misfire(false).jobErrorHandlerType("THROW").description("desc").build();
     }
-    
+
+    private JobConfiguration createJobConfigurationWithDag() {
+        JobDagConfiguration jobDagConfiguration = new JobDagConfiguration("fakeDag", "job1,job2", 3, 400,
+                false, false);
+        return JobConfiguration.newBuilder("test_job", 3)
+                .cron("0/1 * * * * ?").shardingItemParameters("0=A,1=B,2=C").jobParameter("param").failover(true).misfire(false).jobErrorHandlerType("THROW").description("desc")
+                .jobDagConfiguration(jobDagConfiguration).build();
+    }
+
     @SneakyThrows
     private void setJobItemExecutor() {
         Field field = ElasticJobExecutor.class.getDeclaredField("jobItemExecutor");
@@ -87,7 +97,17 @@ public final class ElasticJobExecutorTest {
             verify(jobItemExecutor, times(0)).process(eq(fooJob), eq(jobConfig), eq(jobFacade), any());
         }
     }
-    
+
+    @Test
+    public void assertExecuteWhenDagConfigEmpty() {
+        ShardingContexts shardingContexts = new ShardingContexts("fake_task_id", "test_job", 3, "", Collections.emptyMap());
+        when(jobFacade.getShardingContexts()).thenReturn(shardingContexts);
+        when(jobFacade.isDagJob()).thenReturn(true);
+        elasticJobExecutor.execute();
+        verify(jobFacade).postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_STAGING, "Job 'test_job' execute begin.");
+        verify(jobItemExecutor, times(0)).process(eq(fooJob), eq(jobConfig), eq(jobFacade), any());
+    }
+
     @Test
     public void assertExecuteWhenPreviousJobStillRunning() {
         ShardingContexts shardingContexts = new ShardingContexts("fake_task_id", "test_job", 3, "", Collections.emptyMap());
@@ -131,10 +151,19 @@ public final class ElasticJobExecutorTest {
             verify(jobFacade).postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_ERROR, getErrorMessage(shardingContexts));
             verify(jobFacade).registerJobBegin(shardingContexts);
             verify(jobItemExecutor, times(shardingContexts.getShardingTotalCount())).process(eq(fooJob), eq(jobConfig), eq(jobFacade), any());
-            verify(jobFacade).registerJobCompleted(shardingContexts);
+            verify(jobFacade).registerJobCompleted(shardingContexts, getItemErrorMessage(shardingContexts));
         }
     }
-    
+
+    private Map<Integer, String> getItemErrorMessage(final ShardingContexts shardingContexts) {
+        Map<Integer, String> itemErrorMsg = new ConcurrentHashMap<>(jobConfig.getShardingTotalCount(), 1);
+        itemErrorMsg.put(0, "java.lang.RuntimeException" + System.lineSeparator());
+        if (shardingContexts.getShardingItemParameters().size() > 1) {
+            itemErrorMsg.put(1, "java.lang.RuntimeException" + System.lineSeparator());
+        }
+        return itemErrorMsg;
+    }
+
     private String getErrorMessage(final ShardingContexts shardingContexts) {
         return 1 == shardingContexts.getShardingItemParameters().size()
                 ? "{0=java.lang.RuntimeException" + System.lineSeparator() + "}"
@@ -190,7 +219,7 @@ public final class ElasticJobExecutorTest {
         verify(jobFacade).misfireIfRunning(shardingContexts.getShardingItemParameters().keySet());
         verify(jobFacade, times(2)).registerJobBegin(shardingContexts);
         verify(jobItemExecutor, times(4)).process(eq(fooJob), eq(jobConfig), eq(jobFacade), any());
-        verify(jobFacade, times(2)).registerJobCompleted(shardingContexts);
+        verify(jobFacade, times(2)).registerJobCompleted(shardingContexts, new ConcurrentHashMap<>(jobConfig.getShardingTotalCount(), 1));
     }
     
     @Test(expected = JobSystemException.class)
@@ -245,7 +274,7 @@ public final class ElasticJobExecutorTest {
         verify(jobFacade).postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_STAGING, "Job 'test_job' execute begin.");
         verify(jobFacade).beforeJobExecuted(shardingContexts);
         verify(jobFacade).registerJobBegin(shardingContexts);
-        verify(jobFacade).registerJobCompleted(shardingContexts);
+        verify(jobFacade).registerJobCompleted(shardingContexts, new ConcurrentHashMap<>(jobConfig.getShardingTotalCount(), 1));
         verify(jobFacade).afterJobExecuted(shardingContexts);
     }
 }
diff --git a/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/test/java/org/apache/shardingsphere/elasticjob/lite/spring/namespace/fixture/job/ref/RefFooSimpleElasticJob.java b/elasticjob-infra/elasticjob-infra-common/src/main/java/org/apache/shardingsphere/elasticjob/infra/exception/DagRuntimeException.java
similarity index 50%
copy from elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/test/java/org/apache/shardingsphere/elasticjob/lite/spring/namespace/fixture/job/ref/RefFooSimpleElasticJob.java
copy to elasticjob-infra/elasticjob-infra-common/src/main/java/org/apache/shardingsphere/elasticjob/infra/exception/DagRuntimeException.java
index 157d33d..c744e22 100644
--- a/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/test/java/org/apache/shardingsphere/elasticjob/lite/spring/namespace/fixture/job/ref/RefFooSimpleElasticJob.java
+++ b/elasticjob-infra/elasticjob-infra-common/src/main/java/org/apache/shardingsphere/elasticjob/infra/exception/DagRuntimeException.java
@@ -7,7 +7,7 @@
  * the License.  You may obtain a copy of the License at
  *
  *     http://www.apache.org/licenses/LICENSE-2.0
- *  
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -15,33 +15,20 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.elasticjob.lite.spring.namespace.fixture.job.ref;
-
-import lombok.Getter;
-import lombok.Setter;
-import org.apache.shardingsphere.elasticjob.api.ShardingContext;
-import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
-import org.apache.shardingsphere.elasticjob.lite.spring.namespace.fixture.service.FooService;
+package org.apache.shardingsphere.elasticjob.infra.exception;
 
-public class RefFooSimpleElasticJob implements SimpleJob {
+/**
+ * Dag runtime exception.
+ *
+ **/
+public class DagRuntimeException extends RuntimeException {
+    private static final long serialVersionUID = 3244908974343209468L;
 
-    @Getter
-    private static volatile boolean completed;
-    
-    @Getter
-    @Setter
-    private FooService fooService;
-    
-    @Override
-    public void execute(final ShardingContext shardingContext) {
-        fooService.foo();
-        completed = true;
+    public DagRuntimeException(final String errorMessage, final Object... args) {
+        super(String.format(errorMessage, args));
     }
-    
-    /**
-     * Set completed to false.
-     */
-    public static void reset() {
-        completed = false;
+
+    public DagRuntimeException(final Throwable cause) {
+        super(cause);
     }
 }
diff --git a/elasticjob-infra/elasticjob-infra-common/src/main/java/org/apache/shardingsphere/elasticjob/infra/pojo/JobConfigurationPOJO.java b/elasticjob-infra/elasticjob-infra-common/src/main/java/org/apache/shardingsphere/elasticjob/infra/pojo/JobConfigurationPOJO.java
index a9ee4c7..b6982a8 100644
--- a/elasticjob-infra/elasticjob-infra-common/src/main/java/org/apache/shardingsphere/elasticjob/infra/pojo/JobConfigurationPOJO.java
+++ b/elasticjob-infra/elasticjob-infra-common/src/main/java/org/apache/shardingsphere/elasticjob/infra/pojo/JobConfigurationPOJO.java
@@ -19,7 +19,9 @@ package org.apache.shardingsphere.elasticjob.infra.pojo;
 
 import lombok.Getter;
 import lombok.Setter;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
+import org.apache.shardingsphere.elasticjob.api.JobDagConfiguration;
 
 import java.util.Properties;
 
@@ -63,6 +65,18 @@ public final class JobConfigurationPOJO {
     private boolean disabled;
     
     private boolean overwrite;
+
+    private String dagName;
+
+    private String dagDependencies;
+
+    private int retryTimes;
+
+    private int retryInterval;
+
+    private boolean dagRunAlone;
+
+    private boolean dagSkipWhenFail;
     
     /**
      * Convert to job configuration.
@@ -70,12 +84,17 @@ public final class JobConfigurationPOJO {
      * @return job configuration
      */
     public JobConfiguration toJobConfiguration() {
+        JobDagConfiguration jobDagConfiguration = null;
+        if (StringUtils.isNotEmpty(dagName)) {
+            jobDagConfiguration = new JobDagConfiguration(dagName, dagDependencies, retryTimes, retryInterval, dagRunAlone, dagSkipWhenFail);
+        }
+
         JobConfiguration result = JobConfiguration.newBuilder(jobName, shardingTotalCount)
                 .cron(cron).shardingItemParameters(shardingItemParameters).jobParameter(jobParameter)
                 .monitorExecution(monitorExecution).failover(failover).misfire(misfire)
                 .maxTimeDiffSeconds(maxTimeDiffSeconds).reconcileIntervalMinutes(reconcileIntervalMinutes)
                 .jobShardingStrategyType(jobShardingStrategyType).jobExecutorServiceHandlerType(jobExecutorServiceHandlerType).jobErrorHandlerType(jobErrorHandlerType)
-                .description(description).disabled(disabled).overwrite(overwrite).build();
+                .description(description).disabled(disabled).overwrite(overwrite).jobDagConfiguration(jobDagConfiguration).build();
         for (Object each : props.keySet()) {
             result.getProps().setProperty(each.toString(), props.get(each.toString()).toString());
         }
@@ -107,6 +126,14 @@ public final class JobConfigurationPOJO {
         result.setProps(jobConfiguration.getProps());
         result.setDisabled(jobConfiguration.isDisabled());
         result.setOverwrite(jobConfiguration.isOverwrite());
+        if (jobConfiguration.getJobDagConfiguration() != null) {
+            result.setDagName(jobConfiguration.getJobDagConfiguration().getDagName());
+            result.setDagDependencies(jobConfiguration.getJobDagConfiguration().getDagDependencies());
+            result.setRetryTimes(jobConfiguration.getJobDagConfiguration().getRetryTimes());
+            result.setRetryInterval(jobConfiguration.getJobDagConfiguration().getRetryInterval());
+            result.setDagRunAlone(jobConfiguration.getJobDagConfiguration().isDagRunAlone());
+            result.setDagSkipWhenFail(jobConfiguration.getJobDagConfiguration().isDagSkipWhenFail());
+        }
         return result;
     }
 }
diff --git a/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/test/java/org/apache/shardingsphere/elasticjob/lite/spring/namespace/fixture/job/ref/RefFooSimpleElasticJob.java b/elasticjob-infra/elasticjob-infra-common/src/test/java/org/apache/shardingsphere/elasticjob/infra/exception/DagRuntimeExceptionTest.java
similarity index 51%
copy from elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/test/java/org/apache/shardingsphere/elasticjob/lite/spring/namespace/fixture/job/ref/RefFooSimpleElasticJob.java
copy to elasticjob-infra/elasticjob-infra-common/src/test/java/org/apache/shardingsphere/elasticjob/infra/exception/DagRuntimeExceptionTest.java
index 157d33d..861536d 100644
--- a/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/test/java/org/apache/shardingsphere/elasticjob/lite/spring/namespace/fixture/job/ref/RefFooSimpleElasticJob.java
+++ b/elasticjob-infra/elasticjob-infra-common/src/test/java/org/apache/shardingsphere/elasticjob/infra/exception/DagRuntimeExceptionTest.java
@@ -7,7 +7,7 @@
  * the License.  You may obtain a copy of the License at
  *
  *     http://www.apache.org/licenses/LICENSE-2.0
- *  
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -15,33 +15,23 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.elasticjob.lite.spring.namespace.fixture.job.ref;
+package org.apache.shardingsphere.elasticjob.infra.exception;
+
+import org.junit.Test;
 
-import lombok.Getter;
-import lombok.Setter;
-import org.apache.shardingsphere.elasticjob.api.ShardingContext;
-import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
-import org.apache.shardingsphere.elasticjob.lite.spring.namespace.fixture.service.FooService;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
 
-public class RefFooSimpleElasticJob implements SimpleJob {
+public class DagRuntimeExceptionTest {
 
-    @Getter
-    private static volatile boolean completed;
-    
-    @Getter
-    @Setter
-    private FooService fooService;
-    
-    @Override
-    public void execute(final ShardingContext shardingContext) {
-        fooService.foo();
-        completed = true;
+    @Test
+    public void assertGetMessage() {
+        assertThat(new DagRuntimeException("message is: '%s'", "test").getMessage(), is("message is: 'test'"));
     }
-    
-    /**
-     * Set completed to false.
-     */
-    public static void reset() {
-        completed = false;
+
+    @Test
+    public void assertGetCause() {
+        assertThat(new DagRuntimeException(new RuntimeException()).getCause(), instanceOf(RuntimeException.class));
     }
 }
diff --git a/elasticjob-infra/elasticjob-infra-common/src/test/java/org/apache/shardingsphere/elasticjob/infra/pojo/JobConfigurationPOJOTest.java b/elasticjob-infra/elasticjob-infra-common/src/test/java/org/apache/shardingsphere/elasticjob/infra/pojo/JobConfigurationPOJOTest.java
index 0ed703c..b5d54c9 100644
--- a/elasticjob-infra/elasticjob-infra-common/src/test/java/org/apache/shardingsphere/elasticjob/infra/pojo/JobConfigurationPOJOTest.java
+++ b/elasticjob-infra/elasticjob-infra-common/src/test/java/org/apache/shardingsphere/elasticjob/infra/pojo/JobConfigurationPOJOTest.java
@@ -30,6 +30,8 @@ import static org.junit.Assert.assertTrue;
 public final class JobConfigurationPOJOTest {
     
     private static final String YAML = "cron: 0/1 * * * * ?\n"
+            + "dagRunAlone: false\n"
+            + "dagSkipWhenFail: false\n"
             + "description: Job description\n"
             + "disabled: false\n"
             + "failover: false\n"
@@ -45,10 +47,39 @@ public final class JobConfigurationPOJOTest {
             + "props:\n"
             + "  key: value\n"
             + "reconcileIntervalMinutes: 0\n"
+            + "retryInterval: 0\n"
+            + "retryTimes: 0\n"
             + "shardingItemParameters: 0=A,1=B,2=C\n"
             + "shardingTotalCount: 3\n";
-    
+
+    private static final String DAG_YAML = "cron: 0/1 * * * * ?\n"
+            + "dagDependencies: jobA,jobB\n"
+            + "dagName: fooDag\n"
+            + "dagRunAlone: false\n"
+            + "dagSkipWhenFail: true\n"
+            + "description: Job description\n"
+            + "disabled: false\n"
+            + "failover: false\n"
+            + "jobErrorHandlerType: IGNORE\n"
+            + "jobExecutorServiceHandlerType: CPU\n"
+            + "jobName: test_job\n"
+            + "jobParameter: param\n"
+            + "jobShardingStrategyType: AVG_ALLOCATION\n"
+            + "maxTimeDiffSeconds: -1\n"
+            + "misfire: false\n"
+            + "monitorExecution: false\n"
+            + "overwrite: false\n"
+            + "props:\n"
+            + "  key: value\n"
+            + "reconcileIntervalMinutes: 0\n"
+            + "retryInterval: 500\n"
+            + "retryTimes: 3\n"
+            + "shardingItemParameters: 0=A,1=B,2=C\n"
+            + "shardingTotalCount: 3\n";
+
     private static final String YAML_WITH_NULL = "cron: 0/1 * * * * ?\n"
+            + "dagRunAlone: false\n"
+            + "dagSkipWhenFail: false\n"
             + "disabled: false\n"
             + "failover: false\n"
             + "jobName: test_job\n"
@@ -57,6 +88,8 @@ public final class JobConfigurationPOJOTest {
             + "monitorExecution: false\n"
             + "overwrite: false\n"
             + "reconcileIntervalMinutes: 0\n"
+            + "retryInterval: 0\n"
+            + "retryTimes: 0\n"
             + "shardingTotalCount: 3\n";
     
     @Test
@@ -186,4 +219,38 @@ public final class JobConfigurationPOJOTest {
         assertFalse(actual.isDisabled());
         assertFalse(actual.isOverwrite());
     }
+
+    @Test
+    public void assertMarshalWithDag() {
+        JobConfigurationPOJO actual = new JobConfigurationPOJO();
+        actual.setJobName("test_job");
+        actual.setCron("0/1 * * * * ?");
+        actual.setShardingTotalCount(3);
+        actual.setShardingItemParameters("0=A,1=B,2=C");
+        actual.setJobParameter("param");
+        actual.setMaxTimeDiffSeconds(-1);
+        actual.setJobShardingStrategyType("AVG_ALLOCATION");
+        actual.setJobExecutorServiceHandlerType("CPU");
+        actual.setJobErrorHandlerType("IGNORE");
+        actual.setDescription("Job description");
+        actual.getProps().setProperty("key", "value");
+        actual.setDagSkipWhenFail(true);
+        actual.setDagRunAlone(false);
+        actual.setDagName("fooDag");
+        actual.setDagDependencies("jobA,jobB");
+        actual.setRetryInterval(500);
+        actual.setRetryTimes(3);
+        assertThat(YamlEngine.marshal(actual), is(DAG_YAML));
+    }
+
+    @Test
+    public void assertUnMarshalWithDag() {
+        JobConfigurationPOJO actual = YamlEngine.unmarshal(DAG_YAML, JobConfigurationPOJO.class);
+        assertThat(actual.getDagName(), is("fooDag"));
+        assertThat(actual.getDagDependencies(), is("jobA,jobB"));
+        assertThat(actual.getRetryInterval(), is(500));
+        assertThat(actual.getRetryTimes(), is(3));
+        assertTrue(actual.isDagSkipWhenFail());
+        assertFalse(actual.isDagRunAlone());
+    }
 }
diff --git a/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-api/src/main/java/org/apache/shardingsphere/elasticjob/tracing/event/DagJobExecutionEvent.java b/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-api/src/main/java/org/apache/shardingsphere/elasticjob/tracing/event/DagJobExecutionEvent.java
new file mode 100644
index 0000000..d974502
--- /dev/null
+++ b/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-api/src/main/java/org/apache/shardingsphere/elasticjob/tracing/event/DagJobExecutionEvent.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.elasticjob.tracing.event;
+
+import lombok.Getter;
+import lombok.ToString;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.time.DateFormatUtils;
+
+import java.util.Date;
+import java.util.UUID;
+
+/**
+ * Dag job execution event.
+ *
+ * <p>Carries the dag name, job name, execution time and date, batch number, state and a message limited to 255 characters.</p>
+ */
+@Getter
+@ToString
+public class DagJobExecutionEvent implements JobEvent {
+    private static final int MAX_MESSAGE_LENGTH = 255;
+
+    private static final String EXEC_TIME_PATTERN = "HHmmss";
+
+    private static final String EXEC_DATE_PATTERN = "yyyyMMdd";
+
+    private final String id;
+
+    private final String dagName;
+
+    private final String jobName;
+
+    private final String execTime;
+
+    private final String execDate;
+
+    private final String batchNo;
+
+    private final String state;
+
+    private final String message;
+
+    /**
+     * Create an event with a generated id, stamped with the current time.
+     *
+     * @param dagName dag name
+     * @param jobName job name
+     * @param batchNo batch number
+     * @param state state
+     * @param message message, truncated to 255 characters
+     */
+    public DagJobExecutionEvent(final String dagName, final String jobName, final String batchNo, final String state,
+                                final String message) {
+        // One snapshot of "now" feeds both execTime and execDate, so they cannot straddle midnight.
+        this(dagName, jobName, new Date(), batchNo, state, message);
+    }
+
+    private DagJobExecutionEvent(final String dagName, final String jobName, final Date execTimestamp, final String batchNo,
+                                 final String state, final String message) {
+        this(dagName, jobName, DateFormatUtils.format(execTimestamp, EXEC_TIME_PATTERN),
+                DateFormatUtils.format(execTimestamp, EXEC_DATE_PATTERN), batchNo, state, message);
+    }
+
+    /**
+     * Create an event with a generated id and explicit execution time and date.
+     *
+     * @param dagName dag name
+     * @param jobName job name
+     * @param execTime execution time
+     * @param execDate execution date
+     * @param batchNo batch number
+     * @param state state
+     * @param message message, truncated to 255 characters
+     */
+    public DagJobExecutionEvent(final String dagName, final String jobName, final String execTime, final String execDate,
+                                final String batchNo, final String state, final String message) {
+        this(UUID.randomUUID().toString(), dagName, jobName, execTime, execDate, batchNo, state, message);
+    }
+
+    /**
+     * Create an event from explicit values for every field.
+     *
+     * @param id event id
+     * @param dagName dag name
+     * @param jobName job name
+     * @param execTime execution time
+     * @param execDate execution date
+     * @param batchNo batch number
+     * @param state state
+     * @param message message, truncated to 255 characters
+     */
+    public DagJobExecutionEvent(final String id, final String dagName, final String jobName, final String execTime,
+                                final String execDate, final String batchNo, final String state, final String message) {
+        this.id = id;
+        this.dagName = dagName;
+        this.jobName = jobName;
+        this.execTime = execTime;
+        this.execDate = execDate;
+        this.batchNo = batchNo;
+        this.state = state;
+        this.message = truncateMessage(message);
+    }
+
+    private static String truncateMessage(final String str) {
+        // StringUtils.substring is null-safe and returns shorter strings unchanged.
+        return StringUtils.substring(str, 0, MAX_MESSAGE_LENGTH);
+    }
+
+    @Override
+    public String getJobName() {
+        return jobName;
+    }
+}
diff --git a/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-api/src/main/java/org/apache/shardingsphere/elasticjob/tracing/listener/TracingListener.java b/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-api/src/main/java/org/apache/shardingsphere/elasticjob/tracing/listener/TracingListener.java
index 414ddeb..6391c72 100644
--- a/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-api/src/main/java/org/apache/shardingsphere/elasticjob/tracing/listener/TracingListener.java
+++ b/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-api/src/main/java/org/apache/shardingsphere/elasticjob/tracing/listener/TracingListener.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.elasticjob.tracing.listener;
 
 import com.google.common.eventbus.AllowConcurrentEvents;
 import com.google.common.eventbus.Subscribe;
+import org.apache.shardingsphere.elasticjob.tracing.event.DagJobExecutionEvent;
 import org.apache.shardingsphere.elasticjob.tracing.event.JobExecutionEvent;
 import org.apache.shardingsphere.elasticjob.tracing.event.JobStatusTraceEvent;
 
@@ -44,4 +45,13 @@ public interface TracingListener {
     @Subscribe
     @AllowConcurrentEvents
     void listen(JobStatusTraceEvent jobStatusTraceEvent);
+
+    /**
+     * Listen dag job execution event.
+     *
+     * @param dagJobExecutionEvent dag job execution event
+     */
+    @Subscribe
+    @AllowConcurrentEvents
+    void listen(DagJobExecutionEvent dagJobExecutionEvent);
 }
diff --git a/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-api/src/test/java/org/apache/shardingsphere/elasticjob/tracing/fixture/TestTracingListener.java b/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-api/src/test/java/org/apache/shardingsphere/elasticjob/tracing/fixture/TestTracingListener.java
index 48f09f0..e7aaf62 100644
--- a/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-api/src/test/java/org/apache/shardingsphere/elasticjob/tracing/fixture/TestTracingListener.java
+++ b/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-api/src/test/java/org/apache/shardingsphere/elasticjob/tracing/fixture/TestTracingListener.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.elasticjob.tracing.fixture;
 
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.elasticjob.tracing.event.DagJobExecutionEvent;
 import org.apache.shardingsphere.elasticjob.tracing.listener.TracingListener;
 import org.apache.shardingsphere.elasticjob.tracing.event.JobExecutionEvent;
 import org.apache.shardingsphere.elasticjob.tracing.event.JobStatusTraceEvent;
@@ -41,7 +42,13 @@ public final class TestTracingListener implements TracingListener {
     public void listen(final JobStatusTraceEvent jobStatusTraceEvent) {
         jobEventCaller.call();
     }
-    
+
+    @Override
+    public void listen(final DagJobExecutionEvent dagJobExecutionEvent) {
+        jobEventCaller.call();
+        executionEventCalled = true;
+    }
+
     /**
      * Set executionEventCalled to false.
      */
diff --git a/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-rdb/src/main/java/org/apache/shardingsphere/elasticjob/tracing/rdb/listener/RDBTracingListener.java b/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-rdb/src/main/java/org/apache/shardingsphere/elasticjob/tracing/rdb/listener/RDBTracingListener.java
index 5369302..e19b849 100644
--- a/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-rdb/src/main/java/org/apache/shardingsphere/elasticjob/tracing/rdb/listener/RDBTracingListener.java
+++ b/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-rdb/src/main/java/org/apache/shardingsphere/elasticjob/tracing/rdb/listener/RDBTracingListener.java
@@ -17,6 +17,7 @@
 
 package org.apache.shardingsphere.elasticjob.tracing.rdb.listener;
 
+import org.apache.shardingsphere.elasticjob.tracing.event.DagJobExecutionEvent;
 import org.apache.shardingsphere.elasticjob.tracing.event.JobExecutionEvent;
 import org.apache.shardingsphere.elasticjob.tracing.event.JobStatusTraceEvent;
 import org.apache.shardingsphere.elasticjob.tracing.listener.TracingListener;
@@ -45,4 +46,9 @@ public final class RDBTracingListener implements TracingListener {
     public void listen(final JobStatusTraceEvent jobStatusTraceEvent) {
         repository.addJobStatusTraceEvent(jobStatusTraceEvent);
     }
+
+    @Override
+    public void listen(final DagJobExecutionEvent dagJobExecutionEvent) {
+        repository.addDagJobExecutionEvent(dagJobExecutionEvent);
+    }
 }
diff --git a/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-rdb/src/main/java/org/apache/shardingsphere/elasticjob/tracing/rdb/storage/RDBJobEventStorage.java b/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-rdb/src/main/java/org/apache/shardingsphere/elasticjob/tracing/rdb/storage/RDBJobEventStorage.java
index b2b0263..a984261 100644
--- a/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-rdb/src/main/java/org/apache/shardingsphere/elasticjob/tracing/rdb/storage/RDBJobEventStorage.java
+++ b/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-rdb/src/main/java/org/apache/shardingsphere/elasticjob/tracing/rdb/storage/RDBJobEventStorage.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.elasticjob.tracing.rdb.storage;
 
 import com.google.common.base.Strings;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.elasticjob.tracing.event.DagJobExecutionEvent;
 import org.apache.shardingsphere.elasticjob.tracing.event.JobExecutionEvent;
 import org.apache.shardingsphere.elasticjob.tracing.event.JobStatusTraceEvent;
 import org.apache.shardingsphere.elasticjob.tracing.event.JobStatusTraceEvent.Source;
@@ -53,6 +54,8 @@ public final class RDBJobEventStorage {
     private static final String TABLE_JOB_STATUS_TRACE_LOG = "JOB_STATUS_TRACE_LOG";
     
     private static final String TASK_ID_STATE_INDEX = "TASK_ID_STATE_INDEX";
+
+    private static final String TABLE_DAG_JOB_EXECUTION_LOG = "DAG_JOB_EXECUTION_LOG";
     
     private static final Map<String, DatabaseType> DATABASE_TYPES = new HashMap<>();
     
@@ -91,6 +94,7 @@ public final class RDBJobEventStorage {
         try (Connection connection = dataSource.getConnection()) {
             createJobExecutionTableAndIndexIfNeeded(connection);
             createJobStatusTraceTableAndIndexIfNeeded(connection);
+            createDagJobExecutionTableAndIndexIfNeeded(connection);
         }
     }
     
@@ -112,7 +116,16 @@ public final class RDBJobEventStorage {
         }
         createTaskIdIndexIfNeeded(connection);
     }
-    
+
+    private void createDagJobExecutionTableAndIndexIfNeeded(final Connection connection) throws SQLException {
+        DatabaseMetaData dbMetaData = connection.getMetaData();
+        try (ResultSet resultSet = dbMetaData.getTables(connection.getCatalog(), null, TABLE_DAG_JOB_EXECUTION_LOG, new String[]{"TABLE"})) {
+            if (!resultSet.next()) {
+                createDagJobExecutionTable(connection);
+            }
+        }
+    }
+
     private void createTaskIdIndexIfNeeded(final Connection connection) throws SQLException {
         DatabaseMetaData dbMetaData = connection.getMetaData();
         try (ResultSet resultSet = dbMetaData.getIndexInfo(connection.getCatalog(), null, TABLE_JOB_STATUS_TRACE_LOG, false, false)) {
@@ -133,7 +146,13 @@ public final class RDBJobEventStorage {
             preparedStatement.execute();
         }
     }
-    
+
+    private void createDagJobExecutionTable(final Connection connection) throws SQLException {
+        try (PreparedStatement preparedStatement = connection.prepareStatement(sqlMapper.getCreateTableForDagJobExecutionLog())) {
+            preparedStatement.execute();
+        }
+    }
+
     private void createJobStatusTraceTable(final Connection connection) throws SQLException {
         try (PreparedStatement preparedStatement = connection.prepareStatement(sqlMapper.getCreateTableForJobStatusTraceLog())) {
             preparedStatement.execute();
@@ -363,4 +382,34 @@ public final class RDBJobEventStorage {
         }
         return result;
     }
+
+    /**
+     * Add dag execution event to db.
+     *
+     * @param dagJobExecutionEvent dag execution event
+     * @return true if the event was inserted, false if the insert failed
+     */
+    public boolean addDagJobExecutionEvent(final DagJobExecutionEvent dagJobExecutionEvent) {
+        boolean result = false;
+        try (
+                Connection connection = dataSource.getConnection();
+                PreparedStatement preparedStatement = connection.prepareStatement(sqlMapper.getInsertForDagJobExecutionLog())) {
+            preparedStatement.setString(1, dagJobExecutionEvent.getId());
+            preparedStatement.setString(2, dagJobExecutionEvent.getDagName());
+            preparedStatement.setString(3, dagJobExecutionEvent.getJobName());
+            preparedStatement.setString(4, dagJobExecutionEvent.getExecTime());
+            preparedStatement.setString(5, dagJobExecutionEvent.getExecDate());
+            preparedStatement.setString(6, dagJobExecutionEvent.getBatchNo());
+            preparedStatement.setString(7, dagJobExecutionEvent.getState());
+            preparedStatement.setString(8, dagJobExecutionEvent.getMessage());
+            preparedStatement.execute();
+            result = true;
+        } catch (final SQLException ex) {
+            if (!isDuplicateRecord(ex)) {
+                // TODO log failure directly to output log, consider to be configurable in the future
+                log.error(ex.getMessage(), ex);
+            }
+        }
+        return result;
+    }
 }
diff --git a/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-rdb/src/main/java/org/apache/shardingsphere/elasticjob/tracing/rdb/storage/RDBStorageSQLMapper.java b/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-rdb/src/main/java/org/apache/shardingsphere/elasticjob/tracing/rdb/storage/RDBStorageSQLMapper.java
index a92d92f..08de9a5 100644
--- a/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-rdb/src/main/java/org/apache/shardingsphere/elasticjob/tracing/rdb/storage/RDBStorageSQLMapper.java
+++ b/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-rdb/src/main/java/org/apache/shardingsphere/elasticjob/tracing/rdb/storage/RDBStorageSQLMapper.java
@@ -50,6 +50,10 @@ public final class RDBStorageSQLMapper {
     private final String selectForJobStatusTraceLog;
     
     private final String selectOriginalTaskIdForJobStatusTraceLog;
+
+    private final String createTableForDagJobExecutionLog;
+
+    private final String insertForDagJobExecutionLog;
     
     public RDBStorageSQLMapper(final String sqlPropertiesFileName) {
         Properties props = loadProps(sqlPropertiesFileName);
@@ -64,6 +68,8 @@ public final class RDBStorageSQLMapper {
         insertForJobStatusTraceLog = props.getProperty("JOB_STATUS_TRACE_LOG.INSERT");
         selectForJobStatusTraceLog = props.getProperty("JOB_STATUS_TRACE_LOG.SELECT");
         selectOriginalTaskIdForJobStatusTraceLog = props.getProperty("JOB_STATUS_TRACE_LOG.SELECT_ORIGINAL_TASK_ID");
+        createTableForDagJobExecutionLog = props.getProperty("DAG_JOB_EXECUTION_LOG.TABLE.CREATE");
+        insertForDagJobExecutionLog = props.getProperty("DAG_JOB_EXECUTION_LOG.INSERT");
     }
     
     @SneakyThrows
diff --git a/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-rdb/src/main/resources/META-INF/sql/DB2.properties b/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-rdb/src/main/resources/META-INF/sql/DB2.properties
index 61dcf4f..0eef217 100644
--- a/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-rdb/src/main/resources/META-INF/sql/DB2.properties
+++ b/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-rdb/src/main/resources/META-INF/sql/DB2.properties
@@ -29,3 +29,6 @@ TASK_ID_STATE_INDEX.INDEX.CREATE=CREATE INDEX TASK_ID_STATE_INDEX ON JOB_STATUS_
 JOB_STATUS_TRACE_LOG.INSERT=INSERT INTO JOB_STATUS_TRACE_LOG (id, job_name, original_task_id, task_id, slave_id, source, execution_type, sharding_item, state, message, creation_time) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
 JOB_STATUS_TRACE_LOG.SELECT=SELECT * FROM JOB_STATUS_TRACE_LOG WHERE task_id = ?
 JOB_STATUS_TRACE_LOG.SELECT_ORIGINAL_TASK_ID=SELECT * FROM (SELECT ROWNUMBER() OVER() AS ROW, A.* FROM JOB_STATUS_TRACE_LOG A WHERE A.TASK_ID = '4' AND A.STATE= 'TASK_STAGING') AS B WHERE B.ROW = 1
+
+DAG_JOB_EXECUTION_LOG.TABLE.CREATE=CREATE TABLE DAG_JOB_EXECUTION_LOG (id VARCHAR(40) not null ,dag_name varchar(40) not null ,job_name varchar(100) not null, exec_time varchar(20) not null,exec_date varchar(20) not null,batch_no varchar(40), state varchar(8) not null,message varchar(255) ,create_time TIMESTAMP NOT NULL DEFAULT CURRENT TIMESTAMP,update_time TIMESTAMP NOT NULL DEFAULT CURRENT TIMESTAMP,PRIMARY KEY (id))
+DAG_JOB_EXECUTION_LOG.INSERT=INSERT INTO DAG_JOB_EXECUTION_LOG (id,dag_name, job_name, exec_time, exec_date,batch_no, state, message) VALUES (?,?, ?, ?, ?, ?, ?, ?)
diff --git a/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-rdb/src/main/resources/META-INF/sql/H2.properties b/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-rdb/src/main/resources/META-INF/sql/H2.properties
index c4a6e44..08f29bc 100644
--- a/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-rdb/src/main/resources/META-INF/sql/H2.properties
+++ b/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-rdb/src/main/resources/META-INF/sql/H2.properties
@@ -29,3 +29,6 @@ TASK_ID_STATE_INDEX.INDEX.CREATE=CREATE INDEX IF NOT EXISTS TASK_ID_STATE_INDEX
 JOB_STATUS_TRACE_LOG.INSERT=INSERT INTO JOB_STATUS_TRACE_LOG (id, job_name, original_task_id, task_id, slave_id, source, execution_type, sharding_item, state, message, creation_time) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
 JOB_STATUS_TRACE_LOG.SELECT=SELECT * FROM JOB_STATUS_TRACE_LOG WHERE task_id = ?
 JOB_STATUS_TRACE_LOG.SELECT_ORIGINAL_TASK_ID=SELECT original_task_id FROM JOB_STATUS_TRACE_LOG WHERE task_id = ? and state= 'TASK_STAGING' LIMIT 1
+
+DAG_JOB_EXECUTION_LOG.TABLE.CREATE=CREATE TABLE DAG_JOB_EXECUTION_LOG (id VARCHAR(40) not null ,dag_name varchar(40) not null ,job_name varchar(100) not null, exec_time varchar(20) not null,exec_date varchar(20) not null,batch_no varchar(40), state varchar(8) not null,message varchar(255) ,create_time datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,update_time datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,PRIMARY KEY (id))
+DAG_JOB_EXECUTION_LOG.INSERT=INSERT INTO DAG_JOB_EXECUTION_LOG (id,dag_name, job_name, exec_time, exec_date,batch_no, state, message) VALUES (?,?, ?, ?, ?, ?, ?, ?)
diff --git a/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-rdb/src/main/resources/META-INF/sql/MySQL.properties b/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-rdb/src/main/resources/META-INF/sql/MySQL.properties
index 8be48c5..fe60af3 100644
--- a/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-rdb/src/main/resources/META-INF/sql/MySQL.properties
+++ b/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-rdb/src/main/resources/META-INF/sql/MySQL.properties
@@ -29,3 +29,6 @@ TASK_ID_STATE_INDEX.INDEX.CREATE=CREATE INDEX TASK_ID_STATE_INDEX ON JOB_STATUS_
 JOB_STATUS_TRACE_LOG.INSERT=INSERT INTO JOB_STATUS_TRACE_LOG (id, job_name, original_task_id, task_id, slave_id, source, execution_type, sharding_item, state, message, creation_time) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
 JOB_STATUS_TRACE_LOG.SELECT=SELECT * FROM JOB_STATUS_TRACE_LOG WHERE task_id = ?
 JOB_STATUS_TRACE_LOG.SELECT_ORIGINAL_TASK_ID=SELECT original_task_id FROM JOB_STATUS_TRACE_LOG WHERE task_id = ? and state= 'TASK_STAGING' LIMIT 1
+
+DAG_JOB_EXECUTION_LOG.TABLE.CREATE=CREATE TABLE DAG_JOB_EXECUTION_LOG (id VARCHAR(40) not null ,dag_name varchar(40) not null ,job_name varchar(100) not null, exec_time varchar(20) not null,exec_date varchar(20) not null,batch_no varchar(40), state varchar(8) not null,message varchar(255) ,create_time datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,update_time datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,PRIMARY KEY (id))
+DAG_JOB_EXECUTION_LOG.INSERT=INSERT INTO DAG_JOB_EXECUTION_LOG (id,dag_name, job_name, exec_time, exec_date,batch_no, state, message) VALUES (?,?, ?, ?, ?, ?, ?, ?)
diff --git a/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-rdb/src/main/resources/META-INF/sql/Oracle.properties b/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-rdb/src/main/resources/META-INF/sql/Oracle.properties
index 5952f09..75ba48f 100644
--- a/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-rdb/src/main/resources/META-INF/sql/Oracle.properties
+++ b/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-rdb/src/main/resources/META-INF/sql/Oracle.properties
@@ -29,3 +29,6 @@ TASK_ID_STATE_INDEX.INDEX.CREATE=CREATE INDEX TASK_ID_STATE_INDEX ON JOB_STATUS_
 JOB_STATUS_TRACE_LOG.INSERT=INSERT INTO JOB_STATUS_TRACE_LOG (id, job_name, original_task_id, task_id, slave_id, source, execution_type, sharding_item, state, message, creation_time) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
 JOB_STATUS_TRACE_LOG.SELECT=SELECT * FROM JOB_STATUS_TRACE_LOG WHERE task_id = ?
 JOB_STATUS_TRACE_LOG.SELECT_ORIGINAL_TASK_ID=SELECT original_task_id FROM JOB_STATUS_TRACE_LOG WHERE task_id = ? and state= 'TASK_STAGING' and ROWNUM = 1
+
+DAG_JOB_EXECUTION_LOG.TABLE.CREATE=CREATE TABLE DAG_JOB_EXECUTION_LOG (id VARCHAR(40) not null ,dag_name varchar(40) not null ,job_name varchar(100) not null, exec_time varchar(20) not null,exec_date varchar(20) not null,batch_no varchar(40), state varchar(8) not null,message varchar(255) ,create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL,update_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL,PRIMARY KEY (id))
+DAG_JOB_EXECUTION_LOG.INSERT=INSERT INTO DAG_JOB_EXECUTION_LOG (id,dag_name, job_name, exec_time, exec_date,batch_no, state, message) VALUES (?,?, ?, ?, ?, ?, ?, ?)
diff --git a/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-rdb/src/main/resources/META-INF/sql/PostgreSQL.properties b/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-rdb/src/main/resources/META-INF/sql/PostgreSQL.properties
index 815b6b9..c16e0bb 100644
--- a/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-rdb/src/main/resources/META-INF/sql/PostgreSQL.properties
+++ b/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-rdb/src/main/resources/META-INF/sql/PostgreSQL.properties
@@ -29,3 +29,6 @@ TASK_ID_STATE_INDEX.INDEX.CREATE=CREATE INDEX TASK_ID_STATE_INDEX ON JOB_STATUS_
 JOB_STATUS_TRACE_LOG.INSERT=INSERT INTO JOB_STATUS_TRACE_LOG (id, job_name, original_task_id, task_id, slave_id, source, execution_type, sharding_item,  state, message, creation_time) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
 JOB_STATUS_TRACE_LOG.SELECT=SELECT * FROM JOB_STATUS_TRACE_LOG WHERE task_id=?
 JOB_STATUS_TRACE_LOG.SELECT_ORIGINAL_TASK_ID=SELECT original_task_id FROM JOB_STATUS_TRACE_LOG WHERE task_id=? and state='TASK_STAGING' LIMIT 1
+
+DAG_JOB_EXECUTION_LOG.TABLE.CREATE=CREATE TABLE DAG_JOB_EXECUTION_LOG (id VARCHAR(40) not null ,dag_name varchar(40) not null ,job_name varchar(100) not null, exec_time varchar(20) not null,exec_date varchar(20) not null,batch_no varchar(40), state varchar(8) not null,message varchar(255) ,create_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,update_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,PRIMARY KEY (id))
+DAG_JOB_EXECUTION_LOG.INSERT=INSERT INTO DAG_JOB_EXECUTION_LOG (id,dag_name, job_name, exec_time, exec_date,batch_no, state, message) VALUES (?,?, ?, ?, ?, ?, ?, ?)
diff --git a/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-rdb/src/main/resources/META-INF/sql/SQL92.properties b/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-rdb/src/main/resources/META-INF/sql/SQL92.properties
index 1d0f5c1..d3fb39d 100644
--- a/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-rdb/src/main/resources/META-INF/sql/SQL92.properties
+++ b/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-rdb/src/main/resources/META-INF/sql/SQL92.properties
@@ -29,3 +29,6 @@ TASK_ID_STATE_INDEX.INDEX.CREATE=CREATE INDEX TASK_ID_STATE_INDEX ON JOB_STATUS_
 JOB_STATUS_TRACE_LOG.INSERT=INSERT INTO JOB_STATUS_TRACE_LOG (id, job_name, original_task_id, task_id, slave_id, source, execution_type, sharding_item,  state, message, creation_time) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
 JOB_STATUS_TRACE_LOG.SELECT=SELECT * FROM JOB_STATUS_TRACE_LOG WHERE task_id=?
 JOB_STATUS_TRACE_LOG.SELECT_ORIGINAL_TASK_ID=SELECT original_task_id FROM JOB_STATUS_TRACE_LOG WHERE task_id=? and state='TASK_STAGING' LIMIT 1
+
+# DATETIME is not an SQL-92 type; TIMESTAMP is the standard date-time column type.
+DAG_JOB_EXECUTION_LOG.TABLE.CREATE=CREATE TABLE DAG_JOB_EXECUTION_LOG (id VARCHAR(40) not null ,dag_name varchar(40) not null ,job_name varchar(100) not null, exec_time varchar(20) not null,exec_date varchar(20) not null,batch_no varchar(40), state varchar(8) not null,message varchar(255) ,create_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,update_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,PRIMARY KEY (id))
+DAG_JOB_EXECUTION_LOG.INSERT=INSERT INTO DAG_JOB_EXECUTION_LOG (id,dag_name, job_name, exec_time, exec_date,batch_no, state, message) VALUES (?,?, ?, ?, ?, ?, ?, ?)
diff --git a/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-rdb/src/main/resources/META-INF/sql/SQLServer.properties b/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-rdb/src/main/resources/META-INF/sql/SQLServer.properties
index 30f3e92..f638ce8 100644
--- a/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-rdb/src/main/resources/META-INF/sql/SQLServer.properties
+++ b/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-rdb/src/main/resources/META-INF/sql/SQLServer.properties
@@ -29,3 +29,6 @@ TASK_ID_STATE_INDEX.INDEX.CREATE=CREATE INDEX TASK_ID_STATE_INDEX ON JOB_STATUS_
 JOB_STATUS_TRACE_LOG.INSERT=INSERT INTO JOB_STATUS_TRACE_LOG (id, job_name, original_task_id, task_id, slave_id, source, execution_type, sharding_item, state, message, creation_time) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
 JOB_STATUS_TRACE_LOG.SELECT=SELECT * FROM JOB_STATUS_TRACE_LOG WHERE task_id = ?
 JOB_STATUS_TRACE_LOG.SELECT_ORIGINAL_TASK_ID=SELECT TOP 1 original_task_id FROM JOB_STATUS_TRACE_LOG WHERE task_id = ? and state = 'TASK_STAGING'
+
+DAG_JOB_EXECUTION_LOG.TABLE.CREATE=CREATE TABLE DAG_JOB_EXECUTION_LOG (id VARCHAR(40) not null ,dag_name varchar(40) not null ,job_name varchar(100) not null, exec_time varchar(20) not null,exec_date varchar(20) not null,batch_no varchar(40), state varchar(8) not null,message varchar(255) ,create_time datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,update_time datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,PRIMARY KEY (id))
+DAG_JOB_EXECUTION_LOG.INSERT=INSERT INTO DAG_JOB_EXECUTION_LOG (id,dag_name, job_name, exec_time, exec_date,batch_no, state, message) VALUES (?,?, ?, ?, ?, ?, ?, ?)
diff --git a/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-rdb/src/test/java/org/apache/shardingsphere/elasticjob/tracing/rdb/listener/RDBTracingListenerTest.java b/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-rdb/src/test/java/org/apache/shardingsphere/elasticjob/tracing/rdb/listener/RDBTracingListenerTest.java
index 049677c..6e79a14 100644
--- a/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-rdb/src/test/java/org/apache/shardingsphere/elasticjob/tracing/rdb/listener/RDBTracingListenerTest.java
+++ b/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-rdb/src/test/java/org/apache/shardingsphere/elasticjob/tracing/rdb/listener/RDBTracingListenerTest.java
@@ -21,6 +21,7 @@ import lombok.SneakyThrows;
 import org.apache.commons.dbcp.BasicDataSource;
 import org.apache.shardingsphere.elasticjob.tracing.JobEventBus;
 import org.apache.shardingsphere.elasticjob.tracing.api.TracingConfiguration;
+import org.apache.shardingsphere.elasticjob.tracing.event.DagJobExecutionEvent;
 import org.apache.shardingsphere.elasticjob.tracing.event.JobExecutionEvent;
 import org.apache.shardingsphere.elasticjob.tracing.event.JobStatusTraceEvent;
 import org.apache.shardingsphere.elasticjob.tracing.event.JobStatusTraceEvent.Source;
@@ -81,4 +82,11 @@ public final class RDBTracingListenerTest {
         jobEventBus.post(jobStatusTraceEvent);
         verify(repository, atMost(1)).addJobStatusTraceEvent(jobStatusTraceEvent);
     }
+
+    @Test
+    public void assertDagJobExecutionEvent() {
+        DagJobExecutionEvent event = new DagJobExecutionEvent("dagName", JOB_NAME, "batchno", "1", "message is empty.");
+        jobEventBus.post(event);
+        // atMost(1) only caps the number of invocations; it also passes when the listener was never invoked.
+        verify(repository, atMost(1)).addDagJobExecutionEvent(event);
+    }
 }
diff --git a/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-rdb/src/test/java/org/apache/shardingsphere/elasticjob/tracing/rdb/storage/RDBJobEventStorageTest.java b/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-rdb/src/test/java/org/apache/shardingsphere/elasticjob/tracing/rdb/storage/RDBJobEventStorageTest.java
index 92a4c04..55ed466 100644
--- a/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-rdb/src/test/java/org/apache/shardingsphere/elasticjob/tracing/rdb/storage/RDBJobEventStorageTest.java
+++ b/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-rdb/src/test/java/org/apache/shardingsphere/elasticjob/tracing/rdb/storage/RDBJobEventStorageTest.java
@@ -18,6 +18,7 @@
 package org.apache.shardingsphere.elasticjob.tracing.rdb.storage;
 
 import org.apache.commons.dbcp.BasicDataSource;
+import org.apache.shardingsphere.elasticjob.tracing.event.DagJobExecutionEvent;
 import org.apache.shardingsphere.elasticjob.tracing.event.JobExecutionEvent;
 import org.apache.shardingsphere.elasticjob.tracing.event.JobStatusTraceEvent;
 import org.apache.shardingsphere.elasticjob.tracing.event.JobStatusTraceEvent.Source;
@@ -85,7 +86,13 @@ public final class RDBJobEventStorageTest {
             assertThat(jobStatusTraceEvent.getOriginalTaskId(), is("original_fake_failed_failover_task_id"));
         }
     }
-    
+
+    @Test
+    public void assertAddDagJobExecutionEvent() {
+        // Storing a freshly created DAG event is expected to report success.
+        DagJobExecutionEvent event = new DagJobExecutionEvent("fake_dag", "fake_job", "batchno", "1", "message is empty.");
+        boolean stored = storage.addDagJobExecutionEvent(event);
+        assertTrue(stored);
+    }
+
     @Test
     public void assertUpdateJobExecutionEventWhenSuccess() {
         JobExecutionEvent startEvent = new JobExecutionEvent("localhost", "127.0.0.1", "fake_task_id", "test_job", JobExecutionEvent.ExecutionSource.NORMAL_TRIGGER, 0);
diff --git a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/dag/DagJobStates.java b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/dag/DagJobStates.java
new file mode 100644
index 0000000..a389a3c
--- /dev/null
+++ b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/dag/DagJobStates.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.elasticjob.lite.internal.dag;
+
+import org.apache.commons.lang3.StringUtils;
+
+/**
+ * States of a job that belongs to a DAG, each carrying a lower-case string value.
+ */
+public enum DagJobStates {
+    NONE("none"),
+    REG("register"),
+    INIT("graph"),
+    READY("ready"),
+    RUNNING("running"),
+    PAUSE("pause"),
+    FAIL("fail"),
+    SUCCESS("success"),
+    SKIP("skip"),
+    RETRY("retry"),
+    ERROR("error");
+
+    private String value;
+
+    DagJobStates(final String value) {
+        this.value = value;
+    }
+
+    /**
+     * Look up the state whose string value matches the given text, ignoring case.
+     *
+     * @param value enum string value, may be null
+     * @return the matching state, or {@link #NONE} when nothing matches
+     */
+    public static DagJobStates of(final String value) {
+        DagJobStates result = DagJobStates.NONE;
+        for (DagJobStates each : DagJobStates.values()) {
+            if (StringUtils.equalsIgnoreCase(value, each.getValue())) {
+                result = each;
+                break;
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Get value.
+     *
+     * @return string value
+     */
+    public String getValue() {
+        return value;
+    }
+
+    /**
+     * Set value.
+     *
+     * <p>Enum constants are shared, so the new value is seen by every user of this constant
+     * and changes what {@link #of(String)} matches.</p>
+     *
+     * @param value enum value
+     */
+    public void setValue(final String value) {
+        this.value = value;
+    }
+
+}
diff --git a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/dag/DagNodeStorage.java b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/dag/DagNodeStorage.java
new file mode 100644
index 0000000..5df835f
--- /dev/null
+++ b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/dag/DagNodeStorage.java
@@ -0,0 +1,499 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.elasticjob.lite.internal.dag;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.transaction.CuratorOp;
+import org.apache.shardingsphere.elasticjob.lite.internal.state.JobStateEnum;
+import org.apache.shardingsphere.elasticjob.lite.internal.state.JobStateNode;
+import org.apache.shardingsphere.elasticjob.lite.internal.storage.JobNodePath;
+import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Dag storage class.
+ *             DAG zk path:
+ *             /dag/
+ *                 {dagName}/
+ *                             config/ {jobName} value=comma-separated dependencies
+ *                             graph/ {jobName} value=comma-separated dependencies
+ *                             states  value=dag state
+ *                             running/ {jobName}
+ *                             success/ {jobName}
+ *                             fail/ {jobName}
+ *                             skip/ {jobName}
+ *                             retry/ {jobName}
+ */
+@Slf4j
+public class DagNodeStorage {
+
+    private static final String DAG_ROOT = "/dag/%s";
+
+    private static final String DAG_CONFIG = "/dag/%s/config";
+
+    private static final String DAG_CONFIG_JOB = "/dag/%s/config/%s";
+
+    private static final String DAG_STATES = "/dag/%s/states";
+
+    private static final String DAG_GRAPH = "/dag/%s/graph";
+
+    private static final String DAG_GRAPH_JOB = "/dag/%s/graph/%s";
+
+    private static final String DAG_GRAPH_JOB_RETRYTIMES = "/dag/%s/graph/%s/retry";
+
+    private static final String DAG_RUNNING = "/dag/%s/running";
+
+    private static final String DAG_RUNNING_JOB = "/dag/%s/running/%s";
+
+    private static final String DAG_SUCCESS = "/dag/%s/success";
+
+    private static final String DAG_SUCCESS_JOB = "/dag/%s/success/%s";
+
+    private static final String DAG_FAIL = "/dag/%s/fail";
+
+    private static final String DAG_FAIL_JOB = "/dag/%s/fail/%s";
+
+    private static final String DAG_SKIP = "/dag/%s/skip";
+
+    private static final String DAG_SKIP_JOB = "/dag/%s/skip/%s";
+
+    private static final String DAG_RETRY = "/dag/%s/retry";
+
+    private static final String DAG_RETRY_JOB = "/dag/%s/retry/%s";
+
+    private final CoordinatorRegistryCenter regCenter;
+
+    private final String jobName;
+
+    private final String dagName;
+
+    /**
+     * Create a storage bound to one registry center, one DAG and one job of that DAG.
+     *
+     * @param regCenter register center
+     * @param dagName dag name
+     * @param jobName dag job name
+     */
+    public DagNodeStorage(final CoordinatorRegistryCenter regCenter, final String dagName, final String jobName) {
+        this.dagName = dagName;
+        this.jobName = jobName;
+        this.regCenter = regCenter;
+    }
+
+    /**
+     * Persist the dependencies of this job to /dag/dagName/config/jobName.
+     *
+     * @param value comma-separated job dependencies
+     */
+    public void persistDagConfig(final String value) {
+        final String configJobPath = pathOfDagConfigJob(jobName);
+        regCenter.persist(configJobPath, value);
+    }
+
+    // Path helpers: each one fills the matching DAG_* format constant with this storage's dagName
+    // and, for the job-level variants, with the job name passed in.
+    private String pathOfDagRoot() {
+        return String.format(DAG_ROOT, dagName);
+    }
+
+    private String pathOfDagConfig() {
+        return String.format(DAG_CONFIG, dagName);
+    }
+
+    private String pathOfDagConfigJob(final String jobName) {
+        return String.format(DAG_CONFIG_JOB, dagName, jobName);
+    }
+
+    private String pathOfDagStates() {
+        return String.format(DAG_STATES, dagName);
+    }
+
+    private String pathOfDagGraph() {
+        return String.format(DAG_GRAPH, dagName);
+    }
+
+    private String pathOfDagGraphJob(final String jobName) {
+        return String.format(DAG_GRAPH_JOB, dagName, jobName);
+    }
+
+    // Unlike the other job-level helpers, this one takes no parameter and always uses the jobName field.
+    private String pathOfDagGraphJobRetryTimes() {
+        return String.format(DAG_GRAPH_JOB_RETRYTIMES, dagName, jobName);
+    }
+
+    private String pathOfDagRunning() {
+        return String.format(DAG_RUNNING, dagName);
+    }
+
+    private String pathOfDagRunningJob(final String jobName) {
+        return String.format(DAG_RUNNING_JOB, dagName, jobName);
+    }
+
+    private String pathOfDagSuccess() {
+        return String.format(DAG_SUCCESS, dagName);
+    }
+
+    private String pathOfDagSuccessJob(final String jobName) {
+        return String.format(DAG_SUCCESS_JOB, dagName, jobName);
+    }
+
+    private String pathOfDagFail() {
+        return String.format(DAG_FAIL, dagName);
+    }
+
+    private String pathOfDagFailJob(final String jobName) {
+        return String.format(DAG_FAIL_JOB, dagName, jobName);
+    }
+
+    private String pathOfDagSkip() {
+        return String.format(DAG_SKIP, dagName);
+    }
+
+    private String pathOfDagSkipJob(final String jobName) {
+        return String.format(DAG_SKIP_JOB, dagName, jobName);
+    }
+
+    private String pathOfDagRetry() {
+        return String.format(DAG_RETRY, dagName);
+    }
+
+    private String pathOfDagRetryJob(final String jobName) {
+        return String.format(DAG_RETRY_JOB, dagName, jobName);
+    }
+
+    /**
+     * Build the state node path of this job, i.e. /jobName/ followed by the state root node name.
+     *
+     * @return path of job state.
+     */
+    public String pathOfJobNodeState() {
+        return "/" + jobName + "/" + JobStateNode.ROOT_STATE;
+    }
+
+    /**
+     * Init dag graph before run.
+     *
+     * <p>Removes the existing graph, states and per-state job nodes of this DAG, recreates them
+     * (the graph node holds the batch no, the others are empty) and writes one graph child per job.</p>
+     *
+     * @param allDagNode dag job name and dependencies.
+     * @param batchNo batch no
+     */
+    public void initDagGraph(final Map<String, Set<String>> allDagNode, final String batchNo) {
+        final String graphPath = pathOfDagGraph();
+        final List<String> emptyValuePaths = Lists.newArrayList(
+                pathOfDagStates(), pathOfDagRunning(), pathOfDagSuccess(), pathOfDagFail(), pathOfDagSkip(), pathOfDagRetry());
+
+        log.debug("Dag-{}, before create Dag Graph, clean exist path.", dagName);
+        // Clean what is left from a previous run, graph node first.
+        if (regCenter.isExisted(graphPath)) {
+            regCenter.remove(graphPath);
+        }
+        for (String each : emptyValuePaths) {
+            if (regCenter.isExisted(each)) {
+                regCenter.remove(each);
+            }
+        }
+
+        log.debug("Dag-{}, Create Dag Graph, create path.", dagName);
+        // Recreate the nodes in the same order; only the graph node carries a value.
+        regCenter.persist(graphPath, batchNo);
+        for (String each : emptyValuePaths) {
+            regCenter.persist(each, "");
+        }
+
+        log.debug("Dag-{}, Create Dag Graph, create graph.", dagName);
+        // One graph child per job, its value being the comma-joined dependencies.
+        allDagNode.forEach((job, dependencies) -> regCenter.persist(pathOfDagGraphJob(job), Joiner.on(",").join(dependencies)));
+
+        log.info("Dag-{}, Create Dag Graph success.", dagName);
+    }
+
+    /**
+     * Read the value stored on /dag/dagName/graph directly from the registry center.
+     *
+     * @return batch no.
+     */
+    public String currentDagBatchNo() {
+        final String graphPath = pathOfDagGraph();
+        return regCenter.getDirectly(graphPath);
+    }
+
+    /**
+     * Get value of /dag/dagName/states.
+     *
+     * @return dag state, or an empty string when the states node does not exist
+     */
+    public String currentDagStates() {
+        if (!regCenter.isExisted(pathOfDagStates())) {
+            return "";
+        }
+        return regCenter.getDirectly(pathOfDagStates());
+    }
+
+    /**
+     * Update value of /dag/dagName/states.
+     *
+     * @param dagStates dag state whose string value is written
+     */
+    public void updateDagStates(final DagStates dagStates) {
+        final String newValue = dagStates.getValue();
+        regCenter.update(pathOfDagStates(), newValue);
+    }
+
+
+    /**
+     * Record the state of this job under the matching DAG node.
+     * running:/dag/dagName/running/{}.
+     * success:/dag/dagName/success/{}.
+     * fail:/dag/dagName/fail/{}.
+     * skip:/dag/dagName/skip/{}.
+     *
+     * <p>For any state other than RUNNING the running node of the job is removed first;
+     * states other than SUCCESS, FAIL and SKIP then write nothing.</p>
+     *
+     * @param jobState job state
+     */
+    public void updateDagJobStates(final JobStateEnum jobState) {
+        if (JobStateEnum.RUNNING == jobState) {
+            regCenter.persist(pathOfDagRunningJob(jobName), String.valueOf(System.currentTimeMillis()));
+            return;
+        }
+        regCenter.remove(pathOfDagRunningJob(jobName));
+        final String finishedPath;
+        if (JobStateEnum.SUCCESS == jobState) {
+            finishedPath = pathOfDagSuccessJob(jobName);
+        } else if (JobStateEnum.FAIL == jobState) {
+            finishedPath = pathOfDagFailJob(jobName);
+        } else if (JobStateEnum.SKIP == jobState) {
+            finishedPath = pathOfDagSkipJob(jobName);
+        } else {
+            return;
+        }
+        // The node value is the time at which the state was recorded.
+        regCenter.persist(finishedPath, String.valueOf(System.currentTimeMillis()));
+    }
+
+    /**
+     * Get all jobs under /dag/dagName/config together with their dependencies.
+     *
+     * @return dag config info, keyed by job name
+     */
+    public Map<String, Set<String>> getAllDagConfigJobs() {
+        final Map<String, Set<String>> result = Maps.newHashMap();
+        for (String each : regCenter.getChildrenKeys(pathOfDagConfig())) {
+            final String dependencies = regCenter.getDirectly(pathOfDagConfigJob(each));
+            result.put(each, Sets.newHashSet(splitJobDeps(dependencies)));
+        }
+        return result;
+    }
+
+    /**
+     * Get all nodes under /dag/dagName/graph together with their dependencies.
+     *
+     * @return dag graph info, keyed by job name
+     */
+    public Map<String, Set<String>> getAllDagGraphJobs() {
+        final List<String> graphJobs = regCenter.getChildrenKeys(pathOfDagGraph());
+        final Map<String, Set<String>> result = Maps.newHashMap();
+        for (String each : graphJobs) {
+            final String dependencies = regCenter.getDirectly(pathOfDagGraphJob(each));
+            result.put(each, Sets.newHashSet(splitJobDeps(dependencies)));
+        }
+        return result;
+    }
+
+    /**
+     * Split a comma-separated dependency string into a set of trimmed job names.
+     *
+     * <p>A null value yields an empty set instead of a NullPointerException, and blank
+     * entries are dropped so that an empty value does not produce a dependency named "".</p>
+     *
+     * @param j comma-separated dependencies, may be null or empty
+     * @return mutable set of dependency names, never null
+     */
+    private Set<String> splitJobDeps(final String j) {
+        if (null == j) {
+            return Sets.newHashSet();
+        }
+        return Splitter.on(",").trimResults().omitEmptyStrings().splitToList(j).stream().collect(Collectors.toSet());
+    }
+
+    /**
+     * Get the names of the DAG jobs recorded under the node of the given state.
+     *
+     * @param dagJobState job state
+     * @return job name list; empty for null and for states other than RUNNING, SUCCESS, FAIL, SKIP and RETRY
+     */
+    public List<String> getDagJobListByState(final DagJobStates dagJobState) {
+        if (null == dagJobState) {
+            return Lists.newArrayList();
+        }
+        switch (dagJobState) {
+            case RUNNING:
+                return regCenter.getChildrenKeys(pathOfDagRunning());
+            case SUCCESS:
+                return regCenter.getChildrenKeys(pathOfDagSuccess());
+            case FAIL:
+                return regCenter.getChildrenKeys(pathOfDagFail());
+            case SKIP:
+                return regCenter.getChildrenKeys(pathOfDagSkip());
+            case RETRY:
+                return regCenter.getChildrenKeys(pathOfDagRetry());
+            default:
+                return Lists.newArrayList();
+        }
+    }
+
+    // Returns the registry center's raw client cast to CuratorFramework;
+    // the cast fails with ClassCastException if the raw client is of another type.
+    private CuratorFramework getClient() {
+        return (CuratorFramework) regCenter.getRawClient();
+    }
+
+    /**
+     * Get the dependencies of this job from the graph node in the registry center.
+     *
+     * @return job dependencies, split on commas
+     */
+    public String[] getJobDenpendencies() {
+        final String dependencies = regCenter.get(pathOfDagGraphJob(jobName));
+        return StringUtils.split(dependencies, ",");
+    }
+
+    /**
+     * Get the run state of a job by checking which state node contains it.
+     *
+     * <p>The nodes are checked in the order success, fail, skip, running; the first hit wins.</p>
+     *
+     * @param depJob job name.
+     * @return job state, READY when the job is under none of the checked nodes
+     */
+    public DagJobStates getDagJobRunStates(final String depJob) {
+        final DagJobStates result;
+        if (regCenter.isExisted(pathOfDagSuccessJob(depJob))) {
+            result = DagJobStates.SUCCESS;
+        } else if (regCenter.isExisted(pathOfDagFailJob(depJob))) {
+            result = DagJobStates.FAIL;
+        } else if (regCenter.isExisted(pathOfDagSkipJob(depJob))) {
+            result = DagJobStates.SKIP;
+        } else if (regCenter.isExisted(pathOfDagRunningJob(depJob))) {
+            result = DagJobStates.RUNNING;
+        } else {
+            result = DagJobStates.READY;
+        }
+        return result;
+    }
+
+    /**
+     * Trigger the job.
+     *
+     * <p>Does nothing when the job already has a running, success or fail node. Otherwise one
+     * transaction creates the running node of the job and writes the trigger flag to every
+     * instance node of the job.</p>
+     *
+     * <p>A transaction failure is not rethrown. It is logged at DEBUG when the job turns out to be
+     * triggered anyway (presumably by a concurrent caller) and at WARN otherwise, so genuine
+     * failures are visible without DEBUG logging.</p>
+     *
+     * @param job job name
+     */
+    public void triggerJob(final String job) {
+        if (isJobTriggered(job)) {
+            log.debug("Dag-{} Job-{} has already been tiggered!", dagName, job);
+            return;
+        }
+        log.debug("Dag-{} trigger job-[{}] in transaction.", dagName, job);
+        try {
+            CuratorFramework rawClient = getClient();
+            // Encode once with an explicit charset instead of the platform default.
+            byte[] triggerFlag = "TRIGGER".getBytes(StandardCharsets.UTF_8);
+            List<CuratorOp> opList = new ArrayList<>();
+            opList.add(rawClient.transactionOp().create().forPath(pathOfDagRunningJob(job)));
+            JobNodePath jobNodePath = new JobNodePath(job);
+            for (String each : regCenter.getChildrenKeys(jobNodePath.getInstancesNodePath())) {
+                opList.add(rawClient.transactionOp().setData().forPath(jobNodePath.getInstanceNodePath(each), triggerFlag));
+            }
+            rawClient.transaction().forOperations(opList);
+            //CHECKSTYLE:OFF
+        } catch (final Exception exp) {
+            //CHECKSTYLE:ON
+            if (isJobTriggered(job)) {
+                log.debug("Dag-{}[{}] trigger job in transaction Exception!", dagName, job, exp);
+            } else {
+                log.warn("Dag-{}[{}] trigger job in transaction Exception!", dagName, job, exp);
+            }
+        }
+
+        log.info("Dag-{}[{}] has been triggered [{}]", dagName, job, isJobTriggered(job));
+        printZkPath();
+    }
+
+    /**
+     * Trigger retry job.
+     *
+     * <p>One transaction deletes the retry node of this job and writes the trigger flag to every
+     * instance node of the job. The success message is logged only when the transaction succeeds.</p>
+     *
+     * <p>A transaction failure is not rethrown. It is logged at WARN when the retry node still
+     * exists, and at DEBUG when it is already gone (presumably removed by a concurrent caller).</p>
+     */
+    public void triggerRetryJob() {
+        log.debug("Dag-{} trigger RETRY job-[{}] in transaction.", dagName, jobName);
+        try {
+            CuratorFramework rawClient = getClient();
+            // Encode once with an explicit charset instead of the platform default.
+            byte[] triggerFlag = "TRIGGER".getBytes(StandardCharsets.UTF_8);
+            List<CuratorOp> opList = new ArrayList<>();
+            opList.add(rawClient.transactionOp().delete().forPath(pathOfDagRetryJob(jobName)));
+            JobNodePath jobNodePath = new JobNodePath(jobName);
+            for (String each : regCenter.getChildrenKeys(jobNodePath.getInstancesNodePath())) {
+                opList.add(rawClient.transactionOp().setData().forPath(jobNodePath.getInstanceNodePath(each), triggerFlag));
+            }
+            rawClient.transaction().forOperations(opList);
+            log.info("Dag-{}[{}] RETRY job has been triggered!", dagName, jobName);
+            //CHECKSTYLE:OFF
+        } catch (final Exception exp) {
+            //CHECKSTYLE:ON
+            if (regCenter.isExisted(pathOfDagRetryJob(jobName))) {
+                log.warn("Dag-{}[{}] trigger RETRY job in transaction Exception!", dagName, jobName, exp);
+            } else {
+                log.debug("Dag-{}[{}] trigger RETRY job in transaction Exception!", dagName, jobName, exp);
+            }
+        }
+
+        printZkPath();
+    }
+
+    // Dumps the job lists of the success/running/fail/skip/retry nodes; skipped entirely
+    // when DEBUG is off so that the registry center is not queried for nothing.
+    private void printZkPath() {
+        if (!log.isDebugEnabled()) {
+            return;
+        }
+        log.debug("Dag-{}[{}] after trigger:", dagName, jobName);
+        log.debug("Dag-{}[{}] ZK path of SUCC  : {}", dagName, jobName, getDagJobListByState(DagJobStates.SUCCESS));
+        log.debug("Dag-{}[{}] ZK path of RUN   : {}", dagName, jobName, getDagJobListByState(DagJobStates.RUNNING));
+        log.debug("Dag-{}[{}] ZK path of FAIL  : {}", dagName, jobName, getDagJobListByState(DagJobStates.FAIL));
+        log.debug("Dag-{}[{}] ZK path of SKIP  : {}", dagName, jobName, getDagJobListByState(DagJobStates.SKIP));
+        log.debug("Dag-{}[{}] ZK path of RETRY : {}", dagName, jobName, getDagJobListByState(DagJobStates.RETRY));
+    }
+
+    // A job counts as triggered once it has a running, success or fail node; checked in that order.
+    private boolean isJobTriggered(final String job) {
+        if (regCenter.isExisted(pathOfDagRunningJob(job))) {
+            return true;
+        }
+        if (regCenter.isExisted(pathOfDagSuccessJob(job))) {
+            return true;
+        }
+        return regCenter.isExisted(pathOfDagFailJob(job));
+    }
+
+    /**
+     * Get current job retry times from the retry child of the job's graph node.
+     *
+     * @return retry times, 0 when the stored value is null or empty
+     */
+    public int getJobRetryTimes() {
+        final String retryTimes = regCenter.getDirectly(pathOfDagGraphJobRetryTimes());
+        return StringUtils.isEmpty(retryTimes) ? 0 : Integer.parseInt(retryTimes);
+    }
+
+
+    /**
+     * Add dag retry job times.
+     * Persist jobName under '/dag/dagName/retry', then store the retry times on the job's graph node.
+     *
+     * @param i retry times.
+     */
+    public void updateJobRetryTimes(final int i) {
+        final String retryTimes = String.valueOf(i);
+        regCenter.persist(pathOfDagRetryJob(jobName), "");
+        regCenter.persist(pathOfDagGraphJobRetryTimes(), retryTimes);
+    }
+
+    /**
+     * Remove the fail node of the given job from the register center.
+     *
+     * @param job fail job name.
+     */
+    public void removeFailJob(final String job) {
+        final String failJobPath = pathOfDagFailJob(job);
+        regCenter.remove(failJobPath);
+    }
+
+    /**
+     * Get dag list, i.e. the children keys of '/dag'.
+     *
+     * @return list of dag names
+     */
+    public List<String> getAllDags() {
+        final List<String> dagNames = regCenter.getChildrenKeys("/dag");
+        return dagNames;
+    }
+}
diff --git a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/dag/DagService.java b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/dag/DagService.java
new file mode 100644
index 0000000..037f51f
--- /dev/null
+++ b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/dag/DagService.java
@@ -0,0 +1,557 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.elasticjob.lite.internal.dag;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.time.DateFormatUtils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.curator.framework.recipes.cache.CuratorCache;
+import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
+import org.apache.curator.framework.recipes.locks.InterProcessMutex;
+import org.apache.curator.framework.recipes.queue.DistributedDelayQueue;
+import org.apache.curator.framework.recipes.queue.QueueBuilder;
+import org.apache.curator.framework.recipes.queue.QueueSerializer;
+import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
+import org.apache.shardingsphere.elasticjob.api.JobDagConfiguration;
+import org.apache.shardingsphere.elasticjob.infra.concurrent.BlockUtils;
+import org.apache.shardingsphere.elasticjob.infra.env.IpUtils;
+import org.apache.shardingsphere.elasticjob.infra.exception.DagRuntimeException;
+import org.apache.shardingsphere.elasticjob.lite.internal.config.ConfigurationService;
+import org.apache.shardingsphere.elasticjob.lite.internal.state.JobStateEnum;
+import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
+import org.apache.shardingsphere.elasticjob.tracing.JobEventBus;
+import org.apache.shardingsphere.elasticjob.tracing.event.DagJobExecutionEvent;
+
+import java.io.UnsupportedEncodingException;
+import java.lang.management.ManagementFactory;
+import java.nio.charset.StandardCharsets;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Job dag service.
+ */
+@Slf4j
+public class DagService implements CuratorCacheListener {
+    public static final String ROOT_JOB = "self";
+
+    private static final String DAG_LATCH_PATH = "/daglatch/";
+
+    private static final int DEFAULT_RETRY_INTERVAL = 30;
+
+    private static final String RETRY_PATH = "/dagretry/%s/%s";
+
+    private final DagNodeStorage dagNodeStorage;
+
+    private final JobDagConfiguration jobDagConfig;
+
+    private final String jobName;
+
+    private final String dagName;
+
+    private final InterProcessMutex mutex;
+
+    private final CoordinatorRegistryCenter regCenter;
+
+    private final JobEventBus jobEventBus;
+
+    private CuratorCache jobStateCache;
+
+    private DistributedDelayQueue<String> delayQueue;
+
+    /**
+     * Create the dag service of one job and register the job's dag configuration.
+     *
+     * @param regCenter register center
+     * @param jobName job name
+     * @param jobEventBus event bus used to post dag events
+     * @param jobDagConfig dag configuration of the job
+     */
+    public DagService(final CoordinatorRegistryCenter regCenter, final String jobName, final JobEventBus jobEventBus, final JobDagConfiguration jobDagConfig) {
+        this.regCenter = regCenter;
+        this.jobName = jobName;
+        this.jobEventBus = jobEventBus;
+        this.jobDagConfig = jobDagConfig;
+        this.dagName = jobDagConfig.getDagName();
+        this.dagNodeStorage = new DagNodeStorage(regCenter, dagName, jobName);
+        // Only a root job gets the mutex; changeDagStatesAndReGraph refuses to run without it.
+        this.mutex = StringUtils.equals(jobDagConfig.getDagDependencies(), ROOT_JOB)
+                ? new InterProcessMutex((CuratorFramework) regCenter.getRawClient(), DAG_LATCH_PATH + dagName) : null;
+        regDagConfig();
+    }
+
+    /**
+     * Create a dag service bound to a dag only.
+     * The job name is empty and no dag configuration, mutex, state cache, delay queue or event bus is set.
+     *
+     * @param regCenter register center
+     * @param dagName dag name
+     * @param dagNodeStorage storage used to access the dag nodes
+     */
+    public DagService(final CoordinatorRegistryCenter regCenter, final String dagName, final DagNodeStorage dagNodeStorage) {
+        this.dagName = dagName;
+        this.jobName = "";
+        this.regCenter = regCenter;
+        this.dagNodeStorage = dagNodeStorage;
+        this.jobDagConfig = null;
+        this.jobEventBus = null;
+        this.mutex = null;
+        this.delayQueue = null;
+        this.jobStateCache = null;
+    }
+
+    /**
+     * Build and start the delay queue that holds this job's retry messages.
+     *
+     * <p>Queue items are encoded and decoded as UTF-8, and consumed messages are handed to a {@link JobRetryTrigger}.
+     * A failure to start the queue is logged and the queue is still returned.</p>
+     *
+     * @return DistributedDelayQueue
+     */
+    private DistributedDelayQueue<String> initDelayQueue() {
+        String retryPath = String.format(RETRY_PATH, dagName, jobName);
+        QueueSerializer<String> serializer = new QueueSerializer<String>() {
+            @Override
+            public byte[] serialize(final String item) {
+                return item.getBytes(StandardCharsets.UTF_8);
+            }
+
+            @Override
+            public String deserialize(final byte[] bytes) {
+                // Decode with the charset used by serialize, not the platform default.
+                return new String(bytes, StandardCharsets.UTF_8);
+            }
+        };
+        DistributedDelayQueue<String> queue = QueueBuilder.builder((CuratorFramework) regCenter.getRawClient(), new JobRetryTrigger(regCenter, dagName), serializer, retryPath).buildDelayQueue();
+
+        try {
+            queue.start();
+            log.info("Dag-{}[{}] start delay queue, path={}", dagName, jobName, retryPath);
+            //CHECKSTYLE:OFF
+        } catch (Exception e) {
+            //CHECKSTYLE:ON
+            log.error("Dag-{}[{}] start delay queue Exception, path={}", dagName, jobName, retryPath, e);
+        }
+
+        return queue;
+    }
+
+    /**
+     * Register this service as listener of the job state cache and start the cache.
+     *
+     * <p>A REG event is posted for both outcomes. A start failure is logged and not rethrown;
+     * the success message is logged only when the cache actually started.</p>
+     */
+    private void startJobStateListener() {
+        jobStateCache.listenable().addListener(this);
+        try {
+            jobStateCache.start();
+            postEvent(DagJobStates.REG.getValue(), "Job register success");
+            log.info("Dag-{} job-{} state path listener has started success.", dagName, jobName);
+            //CHECKSTYLE:OFF
+        } catch (Exception exp) {
+            //CHECKSTYLE:ON
+            log.error("Start dag-{} job-{} state path listener Exception.", dagName, jobName, exp);
+            // ignore
+            postEvent(DagJobStates.REG.getValue(), "Job register Error:" + exp.getMessage());
+        }
+    }
+
+    /**
+     * Close the job state cache.
+     */
+    private void stopJobStateListener() {
+        this.jobStateCache.close();
+    }
+
+    /**
+     * Is dag root job.
+     *
+     * <p>A job is the root of its dag when its configured dependencies equal {@link #ROOT_JOB}.
+     * A service that holds no job dag configuration is never a root job.</p>
+     *
+     * @return boolean is dag root job
+     */
+    public boolean isDagRootJob() {
+        return null != jobDagConfig && StringUtils.equals(jobDagConfig.getDagDependencies(), ROOT_JOB);
+    }
+
+    /**
+     * current dag status.
+     *
+     * @return DagStates
+     */
+    public DagStates getDagStates() {
+        return DagStates.of(this.dagNodeStorage.currentDagStates());
+    }
+
+    /**
+     * Persist this job's dag dependencies, then set up the retry delay queue and the job state listener.
+     */
+    private void regDagConfig() {
+        dagNodeStorage.persistDagConfig(genDependenciesString());
+        delayQueue = initDelayQueue();
+        CuratorFramework client = (CuratorFramework) regCenter.getRawClient();
+        jobStateCache = CuratorCache.build(client, dagNodeStorage.pathOfJobNodeState());
+        startJobStateListener();
+    }
+
+    private String genDependenciesString() {
+        return jobDagConfig.getDagDependencies();
+    }
+
+    /**
+     * 1. select leader ;
+     * 2. ReGraph DAG ;
+     * 3. Change DAG states to running
+     */
+    public void changeDagStatesAndReGraph() {
+        if (null == mutex) {
+            log.error("Need root job when change dag states and regraph!");
+            throw new DagRuntimeException("Need root job when change dag states and regraph!");
+        }
+
+        if (!acquireDagLeader()) {
+            blockUntilCompleted();
+            return;
+        }
+
+        if (getDagStates() == DagStates.RUNNING) {
+            log.info("Dag-{} states already RUNNING", dagName);
+            return;
+        }
+
+        try {
+            String batchNo = getBatchNo();
+            Map<String, Set<String>> allDagNode = dagNodeStorage.getAllDagConfigJobs();
+            checkCycle(allDagNode);
+            dagNodeStorage.initDagGraph(allDagNode, batchNo);
+            dagNodeStorage.updateDagStates(DagStates.RUNNING);
+            dagNodeStorage.updateDagJobStates(JobStateEnum.RUNNING);
+            // create graph event
+            postEvent(DagJobStates.INIT.getValue(), "Create graph success");
+            //CHECKSTYLE:OFF
+        } catch (Exception ex) {
+            //CHECKSTYLE:ON
+            postEvent(DagJobStates.INIT.getValue(), "Create graph error:" + ex.getMessage());
+        } finally {
+            releaseDagLeader();
+        }
+    }
+
+    /**
+     * Poll the dag state every 300 ms until it is RUNNING.
+     *
+     * @throws DagRuntimeException when the state is still not RUNNING after more than 200 polls
+     */
+    private void blockUntilCompleted() {
+        for (int attempt = 1; getDagStates() != DagStates.RUNNING; attempt++) {
+            log.debug("DAG '{}' sleep short time until DAG graph completed. {}", dagName, attempt);
+            BlockUtils.sleep(300L);
+            if (attempt > 200) {
+                log.error("Dag-{} Wait a long time with Dag graph NOT complete", dagName);
+                throw new DagRuntimeException("Dag graph not complete!");
+            }
+        }
+    }
+
+    /**
+     * Try to acquire the dag mutex, waiting at most 200 ms.
+     *
+     * @return true if the mutex was acquired, false on timeout or error
+     */
+    private boolean acquireDagLeader() {
+        boolean acquired = false;
+        try {
+            acquired = mutex.acquire(200, TimeUnit.MILLISECONDS);
+            //CHECKSTYLE:OFF
+        } catch (Exception exp) {
+            //CHECKSTYLE:ON
+            log.debug("Dag-{} acquire lock error!", dagName, exp);
+        }
+        return acquired;
+    }
+
+    /**
+     * Release the dag mutex if it is acquired in this process.
+     * Errors are logged at debug level and not rethrown.
+     */
+    private void releaseDagLeader() {
+        try {
+            if (!mutex.isAcquiredInThisProcess()) {
+                return;
+            }
+            mutex.release();
+            //CHECKSTYLE:OFF
+        } catch (Exception exp) {
+            //CHECKSTYLE:ON
+            log.debug("Dag-{} release lock error!", dagName, exp);
+        }
+    }
+
+    /**
+     * Verify that the dag config contains no cycle.
+     *
+     * <p>Works on a copy of the config: jobs without pending dependencies are removed round by round;
+     * whatever is left when no more job can be removed takes part in a cycle.</p>
+     *
+     * @param allDagNode dag config info, job name mapped to the names it depends on
+     * @throws DagRuntimeException when jobs remain that can not be resolved
+     */
+    private void checkCycle(final Map<String, Set<String>> allDagNode) {
+        Map<String, Set<String>> remaining = Maps.newHashMap();
+        for (Map.Entry<String, Set<String>> entry : allDagNode.entrySet()) {
+            remaining.put(entry.getKey(), Sets.newHashSet(entry.getValue()));
+        }
+
+        while (removeSelf(remaining)) {
+            log.debug("Dag-{} remove root job.", dagName);
+        }
+        if (remaining.isEmpty()) {
+            log.info("Dag {} checkCycle success", dagName);
+            return;
+        }
+        log.error("Dag {} find cycle {}", dagName, remaining.size());
+        printCycleNode(remaining);
+        throw new DagRuntimeException("Dag job find cycles");
+    }
+
+    private void printCycleNode(final Map<String, Set<String>> cloneMap) {
+        cloneMap.forEach((k, v) -> {
+            log.error("{} has cycle with {}", k, Joiner.on("|").join(v));
+        });
+    }
+
+    /**
+     * Run one removal round of the cycle check.
+     * Drops the "self" marker from every job, and removes each job that has no dependency left
+     * after clearing it from the dependencies of all other jobs.
+     *
+     * @param cloneMap working copy of the dag config, modified in place
+     * @return true if at least one job was removed in this round
+     */
+    private boolean removeSelf(final Map<String, Set<String>> cloneMap) {
+        boolean anyRemoved = false;
+        for (Iterator<Map.Entry<String, Set<String>>> it = cloneMap.entrySet().iterator(); it.hasNext();) {
+            Map.Entry<String, Set<String>> entry = it.next();
+            Set<String> pendingDeps = entry.getValue();
+            pendingDeps.remove("self");
+            if (!pendingDeps.isEmpty()) {
+                continue;
+            }
+            markKeyAsSelf(cloneMap, entry.getKey());
+            it.remove();
+            anyRemoved = true;
+        }
+        return anyRemoved;
+    }
+
+    /**
+     * Remove the given job name from the dependency set of every job.
+     *
+     * @param cloneMap working copy of the dag config, modified in place
+     * @param key job name to remove
+     */
+    private void markKeyAsSelf(final Map<String, Set<String>> cloneMap, final String key) {
+        for (Set<String> deps : cloneMap.values()) {
+            deps.remove(key);
+        }
+    }
+
+    /**
+     * Build a batch number from the dag name, the local ip, the part of the runtime MXBean name
+     * before '@' and the current time formatted as yyMMddHHmmss.
+     *
+     * @return batch number
+     */
+    private String getBatchNo() {
+        String timestamp = DateFormatUtils.format(new Date(), "yyMMddHHmmss");
+        String ip = IpUtils.getIp();
+        String runtimeNamePrefix = ManagementFactory.getRuntimeMXBean().getName().split("@")[0];
+        return dagName + ip + runtimeNamePrefix + timestamp;
+    }
+
+    /**
+     * Check, before a dag job runs, that the job may run now.
+     *
+     * <p>A job whose own run state is already SUCCESS or FAIL is rejected. A root job passes without further checks.
+     * For any other job every dependency except the "self" marker has to be in SUCCESS or SKIP state.</p>
+     *
+     * @throws DagRuntimeException when the job already completed or a dependency is not ready
+     */
+    public void checkJobDependenciesState() {
+        DagJobStates ownState = dagNodeStorage.getDagJobRunStates(jobName);
+        boolean alreadyCompleted = DagJobStates.SUCCESS == ownState || DagJobStates.FAIL == ownState;
+        if (alreadyCompleted) {
+            log.info("DAG- {} job- {} 's states is {},Can not run again!", jobDagConfig.getDagName(), jobName, ownState);
+            throw new DagRuntimeException("Dag job has been completed");
+        }
+        if (isDagRootJob()) {
+            log.debug("DAG {} job {} is root,No deps.", jobDagConfig.getDagName(), jobName);
+            return;
+        }
+
+        // Every dependency is required to be in SKIP or SUCCESS state.
+        for (String dep : dagNodeStorage.getJobDenpendencies()) {
+            if (StringUtils.equals(dep, "self")) {
+                continue;
+            }
+            DagJobStates depState = dagNodeStorage.getDagJobRunStates(dep);
+            boolean ready = DagJobStates.SUCCESS == depState || DagJobStates.SKIP == depState;
+            if (!ready) {
+                log.info("DAG- {} job- {} Dependens job- {} Not ready!", dagName, jobName, dep);
+                throw new DagRuntimeException("Dag dependencies jobs not Ready");
+            }
+        }
+    }
+
+    /**
+     * Decide whether the failed job is retried and, if so, put it on the retry delay queue.
+     *
+     * <p>When no retry is configured or the retries are used up, the job state is stored as SKIP
+     * (if skip-when-fail is configured) or FAIL. When putting the job on the queue fails, the error is
+     * logged and no state is stored here.</p>
+     *
+     * @return true if the job was queued for retry
+     */
+    private boolean retryJob() {
+        // Read the config from the register center to decide whether a retry is allowed.
+        JobDagConfiguration latestConfig = getJobDagConfig(false);
+        int usedTimes = dagNodeStorage.getJobRetryTimes();
+
+        boolean retryExhausted = latestConfig.getRetryTimes() < 1 || latestConfig.getRetryTimes() <= usedTimes;
+        if (retryExhausted) {
+            log.debug("Dag-{}[{}] config retry times{}, current times {} , skip retry!", dagName, jobName, latestConfig.getRetryTimes(), usedTimes);
+            boolean skipWhenFail = latestConfig.isDagSkipWhenFail();
+            if (skipWhenFail) {
+                log.info("Dag-{}[{}] fail, mark as SKIP!", dagName, jobName);
+            }
+            dagNodeStorage.updateDagJobStates(skipWhenFail ? JobStateEnum.SKIP : JobStateEnum.FAIL);
+            return false;
+        }
+
+        // Hand the job to the retry queue; a non-positive interval falls back to the default.
+        try {
+            long delayMillis = (latestConfig.getRetryInterval() <= 0 ? DEFAULT_RETRY_INTERVAL : latestConfig.getRetryInterval()) * 1000L;
+            long dueTime = System.currentTimeMillis() + delayMillis;
+            delayQueue.put(dagName + "||" + jobName, dueTime);
+            //CHECKSTYLE:OFF
+        } catch (Exception exp) {
+            //CHECKSTYLE:ON
+            log.error("Dag-{}[{}] retry job to Delay queue Exception!", dagName, jobName, exp);
+            return false;
+        }
+
+        int nextTimes = usedTimes + 1;
+        dagNodeStorage.updateJobRetryTimes(nextTimes);
+        log.info("Dag-{}[{}] Retry job to delay queue success, times-[{}]", dagName, jobName, nextTimes);
+        postEvent("retry", "Put to DQ");
+        return true;
+    }
+
+    /**
+     * Get the job dag config, either the local one or the one stored in the register center.
+     * Falls back to the local config when the register center holds no job config or no dag config.
+     *
+     * @param fromLocal true to return the local config without loading from the register center
+     * @return dag config
+     */
+    private JobDagConfiguration getJobDagConfig(final boolean fromLocal) {
+        if (fromLocal) {
+            return jobDagConfig;
+        }
+        JobConfiguration registered = new ConfigurationService(regCenter, jobName).load(false);
+        boolean missing = null == registered || null == registered.getJobDagConfiguration();
+        return missing ? jobDagConfig : registered.getJobDagConfiguration();
+    }
+
+    /**
+     * Tell whether the list of dag jobs in RUNNING state is empty.
+     *
+     * @return true if no job running
+     */
+    private boolean hasNoJobRunning() {
+        List<String> runningJobs = dagNodeStorage.getDagJobListByState(DagJobStates.RUNNING);
+        return runningJobs.isEmpty();
+    }
+
+    /**
+     * Acquire next should trigger jobs.
+     *
+     * @return next should trigger jobs
+     */
+    public List<String> nextShouldTriggerJob() {
+        final Map<String, Set<String>> allDagRunJobs = dagNodeStorage.getAllDagGraphJobs();
+        final List<String> successList = dagNodeStorage.getDagJobListByState(DagJobStates.SUCCESS);
+        final List<String> failList = dagNodeStorage.getDagJobListByState(DagJobStates.FAIL);
+        final List<String> runningList = dagNodeStorage.getDagJobListByState(DagJobStates.RUNNING);
+        final List<String> skipList = dagNodeStorage.getDagJobListByState(DagJobStates.SKIP);
+        List<String> nextList = Lists.newLinkedList();
+
+        allDagRunJobs.values().forEach(s -> {
+            if (s.removeIf(x -> successList.contains(x) || skipList.contains(x))) {
+                s.add("self");
+            }
+        });
+
+        successList.stream().forEach(j -> allDagRunJobs.remove(j));
+        skipList.stream().forEach(j -> allDagRunJobs.remove(j));
+
+        allDagRunJobs.entrySet().forEach(x -> {
+            if (x.getValue().isEmpty() || (x.getValue().size() == 1 && x.getValue().contains("self"))) {
+                nextList.add(x.getKey());
+            }
+        });
+
+        nextList.removeAll(runningList);
+        nextList.removeAll(failList);
+
+        log.info("Dag-{} acquire next should Trigger jobs: {}", dagName, nextList);
+        if (log.isDebugEnabled()) {
+            log.debug("Dag-{} NextTrigger all job nodes size:{}", dagName, allDagRunJobs.size());
+            allDagRunJobs.forEach((key, value) -> log.debug("Dag-{} acquire next should Trigger JOBS, Graph- {} = {}", dagName, key, Joiner.on(",").join(value)));
+            log.debug("Dag-{} acquire next should Trigger JOBS, SUCC-[{}], FAIL-[{}], RUN-[{}] , SKIP-[{}] ,Will Trigger-[{}].", dagName, successList, failList, runningList, skipList, nextList);
+        }
+
+        return nextList;
+    }
+
+    /**
+     * When Dag jobs all completed, Statistics dag state.
+     *
+     */
+    private void statisticsDagState() {
+        DagStates dagStates = null;
+        Map<String, Set<String>> allDagRunJobs = dagNodeStorage.getAllDagGraphJobs();
+        List<String> successList = dagNodeStorage.getDagJobListByState(DagJobStates.SUCCESS);
+        List<String> runningList = dagNodeStorage.getDagJobListByState(DagJobStates.RUNNING);
+        List<String> failList = dagNodeStorage.getDagJobListByState(DagJobStates.FAIL);
+        List<String> skipList = dagNodeStorage.getDagJobListByState(DagJobStates.SKIP);
+        int totalJob = allDagRunJobs.size();
+        int succJob = successList.size();
+        int failJob = failList.size();
+        int skipJob = skipList.size();
+        int runningJob = runningList.size();
+
+        log.info("Dag-{}[Statistics] totalJob-{}, successJob-{}, failJob-{}, runningJob-{}, skipJob-{}", dagName, totalJob, succJob, failJob, runningJob, skipJob);
+        if (log.isDebugEnabled()) {
+            log.debug("Dag-{}[Statistics] SUCC-[{}], FAIL-[{}], RUNNING-[{}], SKIP-[{}]", dagName, successList, failList, runningList, skipList);
+        }
+
+        if ((succJob + skipJob) == totalJob) {
+            dagStates = DagStates.SUCCESS;
+        } else if (failJob > 0) {
+            dagStates = DagStates.FAIL;
+        } else {
+            dagStates = DagStates.RUNNING;
+        }
+
+        postEvent(dagStates.getValue(), "Dag Complete");
+        log.info("Dag-{}[Statistics] DAG run complete, Final State-[{}]!", dagName, dagStates);
+        dagNodeStorage.updateDagStates(dagStates);
+    }
+
+    /**
+     * Post a dag job execution event for this job, if an event bus is set.
+     *
+     * @param state state to report
+     * @param message message to report
+     */
+    private void postEvent(final String state, final String message) {
+        if (null == jobEventBus) {
+            return;
+        }
+        String batchNo = dagNodeStorage.currentDagBatchNo();
+        jobEventBus.post(new DagJobExecutionEvent(dagName, jobName, batchNo, state, message));
+    }
+
+    /**
+     * Listener callback for the job state node watched by the job state cache.
+     *
+     * <p>Only node created and node changed events are handled, and only when the new job state is
+     * neither RUNNING nor NONE. A FAIL state first goes through the retry logic; when the job was put on
+     * the retry queue nothing else happens. Any other state is stored as the dag job state. Afterwards,
+     * unless the dag is in PAUSE state, the next runnable jobs are triggered, or the dag state is computed
+     * when there is nothing to trigger and no job is running.</p>
+     *
+     * @param type event type
+     * @param oldData old data
+     * @param data new data
+     */
+    @Override
+    public void event(final Type type, final ChildData oldData, final ChildData data) {
+        if (!(type == Type.NODE_CHANGED || type == Type.NODE_CREATED)) {
+            return;
+        }
+
+        JobStateEnum jobState = JobStateEnum.of(new String(data.getData()));
+        log.info("Dag-{}[{}] receive job state EVENT-{}", dagName, jobName, jobState);
+
+        if (jobState == JobStateEnum.RUNNING || jobState == JobStateEnum.NONE) {
+            log.debug("Dag-{}[{}] receive job state EVENT NOT last state-[{}], skip.", dagName, jobName, jobState);
+            return;
+        }
+
+        // A failed job goes through the retry logic first; once it is queued for retry there is nothing more to do here.
+        if (jobState == JobStateEnum.FAIL) {
+            if (retryJob()) {
+                log.info("Dag-{}[{}] Put FAIL job to DQ Success! Waiting Triggered!", dagName, jobName);
+                return;
+            }
+        } else {
+            dagNodeStorage.updateDagJobStates(jobState);
+        }
+
+        postEvent(jobState.getValue(), "Job Complete");
+
+        DagStates dagState = getDagStates();
+        if (dagState == DagStates.PAUSE) {
+            log.info("Dag-{} current dag state is PAUSE, Do not trigger next jobs!", dagName);
+            return;
+        }
+
+        // Work out which jobs should be triggered next.
+        List<String> willTriggerJobs = nextShouldTriggerJob();
+
+        // If there is no job to trigger and no job is running any more, compute the dag state.
+        if (willTriggerJobs.isEmpty()) {
+            if (hasNoJobRunning()) {
+                log.info("Dag-{}, No job running, Start statistics The DAG State.", dagName);
+                statisticsDagState();
+                return;
+            }
+            log.info("Dag-{}[{}] No trigger job, Wating for other running jobs.", dagName, jobName);
+        } else {
+            // Otherwise trigger every job of the list; the trigger event is posted under this listener's job name.
+            log.info("Dag-{}[{}] trigger job list [{}].", dagName, jobName, willTriggerJobs);
+            willTriggerJobs.forEach(job -> {
+                postEvent("trigger", "Trigger Job");
+                dagNodeStorage.triggerJob(job);
+            });
+        }
+    }
+}
diff --git a/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-api/src/main/java/org/apache/shardingsphere/elasticjob/tracing/listener/TracingListener.java b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/dag/DagStates.java
similarity index 50%
copy from elasticjob-infra/elasticjob-tracing/elasticjob-tracing-api/src/main/java/org/apache/shardingsphere/elasticjob/tracing/listener/TracingListener.java
copy to elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/dag/DagStates.java
index 414ddeb..2b1d68d 100644
--- a/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-api/src/main/java/org/apache/shardingsphere/elasticjob/tracing/listener/TracingListener.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/dag/DagStates.java
@@ -7,7 +7,7 @@
  * the License.  You may obtain a copy of the License at
  *
  *     http://www.apache.org/licenses/LICENSE-2.0
- *  
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -15,33 +15,53 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.elasticjob.tracing.listener;
+package org.apache.shardingsphere.elasticjob.lite.internal.dag;
 
-import com.google.common.eventbus.AllowConcurrentEvents;
-import com.google.common.eventbus.Subscribe;
-import org.apache.shardingsphere.elasticjob.tracing.event.JobExecutionEvent;
-import org.apache.shardingsphere.elasticjob.tracing.event.JobStatusTraceEvent;
+import org.apache.commons.lang3.StringUtils;
 
 /**
- * Tracing listener.
+ * Dag state.
+ *
  */
-public interface TracingListener {
-    
+public enum DagStates {
+    NONE("none"),
+
+    RUNNING("running"),
+
+    PAUSE("pause"),
+
+    FAIL("fail"),
+
+    SUCCESS("success");
+
+    private String value;
+
+    DagStates(final String value) {
+        this.value = value;
+    }
+
     /**
-     * Listen job execution event.
+     * Return the enum constant whose value equals the given string, ignoring case; NONE if nothing matches.
      *
-     * @param jobExecutionEvent job execution event
+     * @param value enum value
+     * @return DagStates enum.
      */
-    @Subscribe
-    @AllowConcurrentEvents
-    void listen(JobExecutionEvent jobExecutionEvent);
-    
+    public static DagStates of(final String value) {
+        for (DagStates states : DagStates.values()) {
+            if (StringUtils.equalsIgnoreCase(value, states.getValue())) {
+                return states;
+            }
+        }
+        return DagStates.NONE;
+    }
+
     /**
-     * Listen job status trace event.
+     * Get enum value.
      *
-     * @param jobStatusTraceEvent job status trace event
+     * @return string
      */
-    @Subscribe
-    @AllowConcurrentEvents
-    void listen(JobStatusTraceEvent jobStatusTraceEvent);
+    public String getValue() {
+        return value;
+    }
+
 }
diff --git a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/dag/JobRetryTrigger.java b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/dag/JobRetryTrigger.java
new file mode 100644
index 0000000..85c70b6
--- /dev/null
+++ b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/dag/JobRetryTrigger.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.elasticjob.lite.internal.dag;
+
+import com.google.common.base.Splitter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.queue.QueueConsumer;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
+
+import java.util.List;
+
+/**
+ * Trigger retry dag job.
+ *
+ * <p>Consumer of the retry delay queue. Messages have the format {@code dagName||jobName};
+ * the named job is triggered for retry only while the dag state is RUNNING.</p>
+ */
+@Slf4j
+public class JobRetryTrigger implements QueueConsumer<String> {
+    private final CoordinatorRegistryCenter regCenter;
+
+    private final String dagName;
+
+    public JobRetryTrigger(final CoordinatorRegistryCenter regCenter, final String dagName) {
+        this.regCenter = regCenter;
+        this.dagName = dagName;
+    }
+
+    /**
+     * Consume one retry message and trigger the retry of the job it names.
+     * Empty or malformed messages are logged and dropped, as are messages that arrive while the dag is not RUNNING.
+     *
+     * @param message retry message, format: dagName||jobName
+     * @throws Exception declared by the consumer interface
+     */
+    @Override
+    public void consumeMessage(final String message) throws Exception {
+        // message format: dagName||jobName
+        // trigger the job only when dag state is running
+        if (StringUtils.isEmpty(message)) {
+            log.info("Dag-{} Retry job Receive message is empty, return!", dagName);
+            return;
+        }
+
+        List<String> strings = Splitter.on("||").splitToList(message);
+        if (strings.size() != 2) {
+            log.info("Dag-{} Retry job message format not right! {}", dagName, message);
+            return;
+        }
+
+        String jobName = strings.get(1);
+        log.info("Dag-{} start Retry job-{}", dagName, jobName);
+        DagNodeStorage dagNodeStorage = new DagNodeStorage(regCenter, dagName, jobName);
+
+        DagStates dagState = DagStates.of(dagNodeStorage.currentDagStates());
+        if (dagState != DagStates.RUNNING) {
+            // One placeholder per argument, so that the job name is not printed as the dag state.
+            log.info("Dag-{} retry job-{}, dag state-{} not RUNNING, quit!", dagName, jobName, dagState);
+            return;
+        }
+
+        dagNodeStorage.triggerRetryJob();
+    }
+
+    @Override
+    public void stateChanged(final CuratorFramework client, final ConnectionState newState) {
+        // Connection state changes are not handled.
+    }
+
+}
diff --git a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/JobScheduler.java b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/JobScheduler.java
index 559cfaf..f175907 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/JobScheduler.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/JobScheduler.java
@@ -82,10 +82,10 @@ public final class JobScheduler {
         this.elasticJobListeners = Arrays.asList(elasticJobListeners);
         setUpFacade = new SetUpFacade(regCenter, jobConfig.getJobName(), this.elasticJobListeners);
         schedulerFacade = new SchedulerFacade(regCenter, jobConfig.getJobName());
-        jobFacade = new LiteJobFacade(regCenter, jobConfig.getJobName(), this.elasticJobListeners, tracingConfig);
-        jobExecutor = null == elasticJob ? new ElasticJobExecutor(elasticJobType, jobConfig, jobFacade) : new ElasticJobExecutor(elasticJob, jobConfig, jobFacade);
         String jobClassName = JobClassNameProviderFactory.getProvider().getJobClassName(elasticJob);
         this.jobConfig = setUpFacade.setUpJobConfiguration(jobClassName, jobConfig);
+        jobFacade = new LiteJobFacade(regCenter, jobConfig.getJobName(), this.elasticJobListeners, tracingConfig, jobConfig.getJobDagConfiguration());
+        jobExecutor = null == elasticJob ? new ElasticJobExecutor(elasticJobType, jobConfig, jobFacade) : new ElasticJobExecutor(elasticJob, jobConfig, jobFacade);
         setGuaranteeServiceForElasticJobListeners(regCenter, this.elasticJobListeners);
         jobScheduleController = createJobScheduleController();
     }
@@ -102,9 +102,9 @@ public final class JobScheduler {
         this.elasticJobListeners = Arrays.asList(elasticJobListeners);
         setUpFacade = new SetUpFacade(regCenter, jobConfig.getJobName(), this.elasticJobListeners);
         schedulerFacade = new SchedulerFacade(regCenter, jobConfig.getJobName());
-        jobFacade = new LiteJobFacade(regCenter, jobConfig.getJobName(), this.elasticJobListeners, tracingConfig);
-        jobExecutor = null == elasticJob ? new ElasticJobExecutor(elasticJobType, jobConfig, jobFacade) : new ElasticJobExecutor(elasticJob, jobConfig, jobFacade);
         this.jobConfig = setUpFacade.setUpJobConfiguration(elasticJobType, jobConfig);
+        jobFacade = new LiteJobFacade(regCenter, jobConfig.getJobName(), this.elasticJobListeners, tracingConfig, jobConfig.getJobDagConfiguration());
+        jobExecutor = null == elasticJob ? new ElasticJobExecutor(elasticJobType, jobConfig, jobFacade) : new ElasticJobExecutor(elasticJob, jobConfig, jobFacade);
         setGuaranteeServiceForElasticJobListeners(regCenter, this.elasticJobListeners);
         jobScheduleController = createJobScheduleController();
     }
diff --git a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/LiteJobFacade.java b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/LiteJobFacade.java
index 23067b5..af72da4 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/LiteJobFacade.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/LiteJobFacade.java
@@ -20,12 +20,16 @@ package org.apache.shardingsphere.elasticjob.lite.internal.schedule;
 import com.google.common.base.Strings;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
+import org.apache.shardingsphere.elasticjob.api.JobDagConfiguration;
 import org.apache.shardingsphere.elasticjob.api.listener.ElasticJobListener;
 import org.apache.shardingsphere.elasticjob.api.listener.ShardingContexts;
 import org.apache.shardingsphere.elasticjob.executor.JobFacade;
 import org.apache.shardingsphere.elasticjob.infra.context.TaskContext;
+import org.apache.shardingsphere.elasticjob.infra.exception.DagRuntimeException;
 import org.apache.shardingsphere.elasticjob.infra.exception.JobExecutionEnvironmentException;
 import org.apache.shardingsphere.elasticjob.lite.internal.config.ConfigurationService;
+import org.apache.shardingsphere.elasticjob.lite.internal.dag.DagService;
+import org.apache.shardingsphere.elasticjob.lite.internal.dag.DagStates;
 import org.apache.shardingsphere.elasticjob.lite.internal.failover.FailoverService;
 import org.apache.shardingsphere.elasticjob.lite.internal.sharding.ExecutionContextService;
 import org.apache.shardingsphere.elasticjob.lite.internal.sharding.ExecutionService;
@@ -40,6 +44,7 @@ import org.apache.shardingsphere.elasticjob.tracing.event.JobStatusTraceEvent.St
 
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 
 /**
  * Lite job facade.
@@ -60,6 +65,8 @@ public final class LiteJobFacade implements JobFacade {
     private final List<ElasticJobListener> elasticJobListeners;
     
     private final JobEventBus jobEventBus;
+
+    private final DagService dagService;
     
     public LiteJobFacade(final CoordinatorRegistryCenter regCenter, final String jobName, final List<ElasticJobListener> elasticJobListeners, final TracingConfiguration tracingConfig) {
         configService = new ConfigurationService(regCenter, jobName);
@@ -69,8 +76,25 @@ public final class LiteJobFacade implements JobFacade {
         failoverService = new FailoverService(regCenter, jobName);
         this.elasticJobListeners = elasticJobListeners;
         this.jobEventBus = null == tracingConfig ? new JobEventBus() : new JobEventBus(tracingConfig);
+        this.dagService = null;
     }
-    
+
+    /**
+     * Create a facade for a job that may be part of a DAG; a null DAG configuration means the job is not a DAG job.
+     */
+    public LiteJobFacade(final CoordinatorRegistryCenter regCenter, final String jobName, final List<ElasticJobListener> elasticJobListeners,
+                         final TracingConfiguration tracingConfig, final JobDagConfiguration jobDagConfiguration) {
+        configService = new ConfigurationService(regCenter, jobName);
+        shardingService = new ShardingService(regCenter, jobName);
+        executionContextService = new ExecutionContextService(regCenter, jobName);
+        executionService = new ExecutionService(regCenter, jobName);
+        failoverService = new FailoverService(regCenter, jobName);
+        this.elasticJobListeners = elasticJobListeners;
+        this.jobEventBus = null == tracingConfig ? new JobEventBus() : new JobEventBus(tracingConfig);
+        // The DAG service shares this facade's event bus and is only built when a DAG configuration is supplied.
+        this.dagService = null == jobDagConfiguration ? null : new DagService(regCenter, jobName, jobEventBus, jobDagConfiguration);
+    }
+
     @Override
     public JobConfiguration loadJobConfiguration(final boolean fromCache) {
         return configService.load(fromCache);
@@ -100,7 +124,15 @@ public final class LiteJobFacade implements JobFacade {
             failoverService.updateFailoverComplete(shardingContexts.getShardingItemParameters().keySet());
         }
     }
-    
+
+    @Override
+    public void registerJobCompleted(final ShardingContexts shardingContexts, final Map<Integer, String> itemErrorMessages) {
+        executionService.registerJobCompleted(shardingContexts, itemErrorMessages); // the execution service receives the per-item error messages
+        if (configService.load(true).isFailover()) { // the failover flag is read from the cached job configuration
+            failoverService.updateFailoverComplete(shardingContexts.getShardingItemParameters().keySet());
+        }
+    }
+
     @Override
     public ShardingContexts getShardingContexts() {
         boolean isFailover = configService.load(true).isFailover();
@@ -167,4 +199,33 @@ public final class LiteJobFacade implements JobFacade {
             log.trace(message);
         }
     }
+
+    @Override
+    public boolean isDagJob() {
+        return null != dagService; // the DAG service is only created when a DAG configuration was supplied
+    }
+
+    @Override
+    public void dagStatesCheck() {
+        DagStates statesOnEntry = dagService.getDagStates(); // one read, so both checks below test the same snapshot
+        if (DagStates.RUNNING == statesOnEntry) {
+            return;
+        }
+        if (DagStates.PAUSE == statesOnEntry) {
+            throw new DagRuntimeException("Dag Job states PAUSE");
+        }
+        if (dagService.isDagRootJob()) {
+            dagService.changeDagStatesAndReGraph();
+        }
+        DagStates latestStates = dagService.getDagStates(); // read again: a root job may just have changed the states above
+        if (DagStates.RUNNING != latestStates) {
+            log.info("DAG states not right {}", latestStates.getValue()); // log the value that was tested, not a fresh read
+            throw new DagRuntimeException("Dag states Not right!");
+        }
+    }
+
+    @Override
+    public void dagJobDependenciesCheck() {
+        dagService.checkJobDependenciesState(); // dagService is null for non-DAG jobs (see isDagJob()), so call this for DAG jobs only
+    }
 }
diff --git a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ExecutionService.java b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ExecutionService.java
index de8a711..34169ff 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ExecutionService.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ExecutionService.java
@@ -21,12 +21,15 @@ import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
 import org.apache.shardingsphere.elasticjob.api.listener.ShardingContexts;
 import org.apache.shardingsphere.elasticjob.lite.internal.config.ConfigurationService;
 import org.apache.shardingsphere.elasticjob.lite.internal.schedule.JobRegistry;
+import org.apache.shardingsphere.elasticjob.lite.internal.state.JobStateEnum;
+import org.apache.shardingsphere.elasticjob.lite.internal.state.JobStateNode;
 import org.apache.shardingsphere.elasticjob.lite.internal.storage.JobNodeStorage;
 import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
 
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 
 /**
  * Execution service.
@@ -58,6 +61,16 @@ public final class ExecutionService {
         for (int each : shardingContexts.getShardingItemParameters().keySet()) {
             jobNodeStorage.fillEphemeralJobNode(ShardingNode.getRunningNode(each), "");
         }
+        if (!jobNodeStorage.isJobNodeExisted(JobStateNode.getRootState())) {
+            jobNodeStorage.createJobNodeIfNeeded(JobStateNode.getRootState(), JobStateEnum.RUNNING);
+        } else {
+            JobStateEnum jobState = JobStateEnum.of(jobNodeStorage.getJobNodeDataDirectly(JobStateNode.getRootState()));
+            if (jobState != JobStateEnum.RUNNING) {
+                jobNodeStorage.updateJobNode(JobStateNode.getRootState(), JobStateEnum.RUNNING);
+            }
+        }
+        jobNodeStorage.createJobNodeIfNeeded(JobStateNode.getRooProcSucc());
+        jobNodeStorage.createJobNodeIfNeeded(JobStateNode.getRooProcFail());
     }
     
     /**
@@ -74,7 +87,49 @@ public final class ExecutionService {
             jobNodeStorage.removeJobNodeIfExisted(ShardingNode.getRunningNode(each));
         }
     }
-    
+
+    /**
+     * Register job completed and record the outcome of every executed sharding item.
+     *
+     * <p>The local running flag is always cleared. Only when execution monitoring is enabled are the running nodes
+     * removed and each item marked under the fail or success marker root; once both roots together hold as many
+     * markers as the sharding total count, the overall job state is written and both marker roots are removed.</p>
+     *
+     * @param shardingContexts sharding contexts
+     * @param itemErrorMessages error messages keyed by sharding item; an item whose key is present is recorded as failed
+     */
+    public void registerJobCompleted(final ShardingContexts shardingContexts, final Map<Integer, String> itemErrorMessages) {
+        JobRegistry.getInstance().setJobRunning(jobName, false);
+        // Without execution monitoring no node is touched below.
+        if (!configService.load(true).isMonitorExecution()) {
+            return;
+        }
+
+        for (int each : shardingContexts.getShardingItemParameters().keySet()) {
+            jobNodeStorage.removeJobNodeIfExisted(ShardingNode.getRunningNode(each));
+            // Keep the item under exactly one marker root: drop the opposite marker, then create the current one.
+            boolean failed = itemErrorMessages.containsKey(each);
+            String staleMarker = failed ? JobStateNode.getProcSucc(each) : JobStateNode.getProcFail(each);
+            String currentMarker = failed ? JobStateNode.getProcFail(each) : JobStateNode.getProcSucc(each);
+            jobNodeStorage.removeJobNodeIfExisted(staleMarker);
+            jobNodeStorage.createJobNodeIfNeeded(currentMarker);
+        }
+
+        // Count the markers currently stored under both roots.
+        List<String> succeededItems = jobNodeStorage.getJobNodeChildrenKeys(JobStateNode.getRooProcSucc());
+        List<String> failedItems = jobNodeStorage.getJobNodeChildrenKeys(JobStateNode.getRooProcFail());
+        int reportedCount = succeededItems.size() + failedItems.size();
+        if (reportedCount != shardingContexts.getShardingTotalCount()) {
+            return;
+        }
+
+        // All items are accounted for: any failure marker makes the whole job FAIL.
+        JobStateEnum overallState = failedItems.isEmpty() ? JobStateEnum.SUCCESS : JobStateEnum.FAIL;
+        jobNodeStorage.updateJobNode(JobStateNode.getRootState(), overallState);
+        jobNodeStorage.removeJobNodeIfExisted(JobStateNode.getRooProcFail());
+        jobNodeStorage.removeJobNodeIfExisted(JobStateNode.getRooProcSucc());
+    }
+
     /**
      * Clear all running info.
      */
diff --git a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/state/JobStateEnum.java b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/state/JobStateEnum.java
new file mode 100644
index 0000000..c2068b6
--- /dev/null
+++ b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/state/JobStateEnum.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.elasticjob.lite.internal.state;
+
+import org.apache.commons.lang3.StringUtils;
+
+/**
+ * Job state enum.
+ *
+ * <p>Each constant carries a string value; {@link #of(String)} maps such a string back to its constant.</p>
+ */
+public enum JobStateEnum {
+    NONE("none"),
+
+    RUNNING("running"),
+
+    SKIP("skip"),
+
+    FAIL("fail"),
+
+    SUCCESS("success");
+
+    private final String value;
+
+    JobStateEnum(final String value) {
+        this.value = value;
+    }
+
+    /**
+     * Resolve a state from its string value, ignoring case.
+     *
+     * @param value string value of the state
+     * @return the matching state, or {@link #NONE} when no state has that value
+     */
+    public static JobStateEnum of(final String value) {
+        for (JobStateEnum each : values()) {
+            if (StringUtils.equalsIgnoreCase(value, each.value)) {
+                return each;
+            }
+        }
+        return NONE;
+    }
+
+    /**
+     * Get the string value of this state.
+     *
+     * @return string value
+     */
+    public String getValue() {
+        return value;
+    }
+}
diff --git a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/state/JobStateNode.java b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/state/JobStateNode.java
new file mode 100644
index 0000000..dbf648c
--- /dev/null
+++ b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/state/JobStateNode.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.elasticjob.lite.internal.state;
+
+import org.apache.shardingsphere.elasticjob.lite.internal.storage.JobNodePath;
+
+/**
+ * Job state node path.
+ * All values are relative node names; in this change they are handed to JobNodeStorage, which resolves the full path.
+ */
+public class JobStateNode {
+    public static final String ROOT_STATE = "state/state";
+
+    public static final String ROOT_STATE_FOR_CACHE = "state";
+
+    private static final String ROOT_PROC = "proc";
+
+    private static final String ROOT_FAIL = "proc/fail";
+
+    private static final String ROOT_SUCC = "proc/succ";
+
+    private static final String PROC_FAIL = "proc/fail/%s";
+
+    private static final String PROC_SUCC = "proc/succ/%s";
+
+    private final JobNodePath jobNodePath; // assigned in the constructor; no method of this class reads it
+
+    public JobStateNode(final String jobName) {
+        jobNodePath = new JobNodePath(jobName);
+    }
+
+    /**
+     * Get the path of the node that holds the job's overall state.
+     *
+     * @return root state path.
+     */
+    public static String getRootState() {
+        return ROOT_STATE;
+    }
+
+    /**
+     * Get the job's proc path, the common parent of the fail and success marker roots.
+     *
+     * @return proc path
+     */
+    public static String getRootProc() {
+        return ROOT_PROC;
+    }
+
+    /**
+     * Get the fail marker path of one sharding item.
+     *
+     * @param item sharding item
+     * @return fail path.
+     */
+    public static String getProcFail(final int item) {
+        return String.format(PROC_FAIL, item);
+    }
+
+    /**
+     * Get the success marker path of one sharding item.
+     * @param item sharding item.
+     * @return success path.
+     */
+    public static String getProcSucc(final int item) {
+        return String.format(PROC_SUCC, item);
+    }
+
+    /**
+     * Get the root path of all fail markers; the method name is spelled "Roo" and callers use that spelling.
+     *
+     * @return proc fail path
+     */
+    public static String getRooProcFail() {
+        return ROOT_FAIL;
+    }
+
+    /**
+     * Get the root path of all success markers; the method name is spelled "Roo" and callers use that spelling.
+     *
+     * @return proc success path
+     */
+    public static String getRooProcSucc() {
+        return ROOT_SUCC;
+    }
+
+}
diff --git a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/storage/JobNodeStorage.java b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/storage/JobNodeStorage.java
index 63e8eef..55d0f3c 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/storage/JobNodeStorage.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/storage/JobNodeStorage.java
@@ -118,7 +118,19 @@ public final class JobNodeStorage {
             regCenter.persist(jobNodePath.getFullPath(node), "");
         }
     }
-    
+
+    /**
+     * Create job node with the given value, but only if the job root node exists and the node itself does not yet exist.
+     *
+     * @param node node
+     * @param value value to store, persisted as {@code value.toString()}; a null value fails once the node is actually created
+     */
+    public void createJobNodeIfNeeded(final String node, final Object value) {
+        if (isJobRootNodeExisted() && !isJobNodeExisted(node)) {
+            regCenter.persist(jobNodePath.getFullPath(node), value.toString());
+        }
+    }
+
     /**
      * Remove job node if existed.
      * 
diff --git a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/dag/DagJobRetryTriggerTest.java b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/dag/DagJobRetryTriggerTest.java
new file mode 100644
index 0000000..611b8cd
--- /dev/null
+++ b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/dag/DagJobRetryTriggerTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.elasticjob.lite.internal.dag;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.shardingsphere.elasticjob.lite.util.ReflectionUtils;
+import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class DagJobRetryTriggerTest {
+    // Mocked registry center; the JobRetryTrigger under test is built on top of it in setUp().
+    @Mock
+    private CoordinatorRegistryCenter regCenter;
+
+    private JobRetryTrigger jobRetryTrigger;
+
+    @Before
+    public void setUp() {
+        jobRetryTrigger = new JobRetryTrigger(regCenter, "testDag");
+        ReflectionUtils.setFieldValue(jobRetryTrigger, "regCenter", regCenter);
+    }
+
+    @Test
+    public void consumeMessage() throws Exception {
+        when(regCenter.isExisted("/dag/testDag/states")).thenReturn(true);
+        when(regCenter.getDirectly("/dag/testDag/states")).thenReturn("running");
+        jobRetryTrigger.consumeMessage("testDag||job1"); // no explicit assertion or verify follows this call
+    }
+
+    @Test
+    public void stateChanged() {
+        jobRetryTrigger.stateChanged((CuratorFramework) regCenter.getRawClient(), ConnectionState.CONNECTED); // getRawClient() is not stubbed here; nothing is asserted
+    }
+}
diff --git a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/dag/DagNodeStorageTest.java b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/dag/DagNodeStorageTest.java
new file mode 100644
index 0000000..7865545
--- /dev/null
+++ b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/dag/DagNodeStorageTest.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.elasticjob.lite.internal.dag;
+
+import com.google.common.collect.Lists;
+import org.apache.shardingsphere.elasticjob.lite.internal.state.JobStateEnum;
+import org.apache.shardingsphere.elasticjob.lite.util.ReflectionUtils;
+import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.internal.util.collections.Sets;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class DagNodeStorageTest {
+    // Mocked registry center: the tests stub reads on it and verify the paths written to it.
+    @Mock
+    private CoordinatorRegistryCenter regCenter;
+
+    private DagNodeStorage dagNodeStorage;
+
+    @Before
+    public void setUp() {
+        dagNodeStorage = new DagNodeStorage(regCenter, "testDag", "testJob");
+        ReflectionUtils.setFieldValue(dagNodeStorage, "regCenter", regCenter);
+    }
+
+    private Map<String, Set<String>> createAllDagNode() {
+        Map<String, Set<String>> allDagNode = new HashMap<>(6);
+        allDagNode.put("testJob", Sets.newSet("self")); // presumably job name -> dependency names, with "self" marking the root; confirm against DagService
+        allDagNode.put("job1", Sets.newSet("testJob"));
+        allDagNode.put("job2", Sets.newSet("testJob"));
+        allDagNode.put("job3", Sets.newSet("testJob", "job1"));
+        allDagNode.put("job4", Sets.newSet("job1", "job2"));
+        allDagNode.put("job5", Sets.newSet("job3", "job4"));
+        return allDagNode;
+    }
+
+    @Test
+    public void persistDagConfig() {
+        dagNodeStorage.persistDagConfig("value");
+        verify(regCenter).persist("/dag/testDag/config/testJob", "value");
+    }
+
+    @Test
+    public void pathOfJobNodeState() {
+        String s = dagNodeStorage.pathOfJobNodeState();
+        assertThat(s, is("/testJob/state/state"));
+    }
+
+    @Test
+    public void initDagGraph() {
+        when(regCenter.isExisted("/dag/testDag/graph")).thenReturn(true);
+        dagNodeStorage.initDagGraph(createAllDagNode(), "batch-foo-no");
+        verify(regCenter).remove("/dag/testDag/graph");
+        verify(regCenter).persist("/dag/testDag/graph", "batch-foo-no");
+    }
+
+    @Test
+    public void currentDagBatchNo() {
+        when(regCenter.getDirectly("/dag/testDag/graph")).thenReturn("batch-foo-no");
+        assertThat(dagNodeStorage.currentDagBatchNo(), is("batch-foo-no"));
+        verify(regCenter).getDirectly("/dag/testDag/graph");
+    }
+
+    @Test
+    public void currentDagStates() {
+        when(regCenter.isExisted("/dag/testDag/states")).thenReturn(true);
+        when(regCenter.getDirectly("/dag/testDag/states")).thenReturn("fail");
+        assertThat(dagNodeStorage.currentDagStates(), is("fail"));
+        verify(regCenter).getDirectly("/dag/testDag/states");
+    }
+
+    @Test
+    public void updateDagStates() {
+        dagNodeStorage.updateDagStates(DagStates.SUCCESS);
+        verify(regCenter).update("/dag/testDag/states", "success");
+    }
+
+    @Test
+    public void updateDagJobStatesRunning() {
+        dagNodeStorage.updateDagJobStates(JobStateEnum.RUNNING);
+        verify(regCenter, times(1)).persist(eq("/dag/testDag/running/testJob"), anyString());
+    }
+
+    @Test
+    public void updateDagJobStatesSkip() {
+        dagNodeStorage.updateDagJobStates(JobStateEnum.SKIP);
+        verify(regCenter).remove("/dag/testDag/running/testJob");
+        verify(regCenter, times(1)).persist(eq("/dag/testDag/skip/testJob"), anyString());
+    }
+
+    @Test
+    public void updateDagJobStatesSuccess() {
+        dagNodeStorage.updateDagJobStates(JobStateEnum.SUCCESS);
+        verify(regCenter).remove("/dag/testDag/running/testJob");
+        verify(regCenter, times(1)).persist(eq("/dag/testDag/success/testJob"), anyString());
+    }
+
+    @Test
+    public void updateDagJobStatesFail() {
+        dagNodeStorage.updateDagJobStates(JobStateEnum.FAIL);
+        verify(regCenter).remove("/dag/testDag/running/testJob");
+        verify(regCenter, times(1)).persist(eq("/dag/testDag/fail/testJob"), anyString());
+    }
+
+    @Test
+    public void getAllDagConfigJobs() {
+        when(regCenter.getChildrenKeys("/dag/testDag/config")).thenReturn(Lists.newArrayList("job1", "job2", "job3"));
+        when(regCenter.getDirectly("/dag/testDag/config/job1")).thenReturn("self");
+        when(regCenter.getDirectly("/dag/testDag/config/job2")).thenReturn("self,job1");
+        when(regCenter.getDirectly("/dag/testDag/config/job3")).thenReturn("job1,job2");
+        Map<String, Set<String>> allDagConfigJobs = dagNodeStorage.getAllDagConfigJobs();
+        assertThat(allDagConfigJobs.size(), is(3));
+    }
+
+    @Test
+    public void getAllDagGraphJobs() {
+        when(regCenter.getChildrenKeys("/dag/testDag/graph")).thenReturn(Lists.newArrayList("job1", "job2", "job3"));
+        when(regCenter.getDirectly("/dag/testDag/graph/job1")).thenReturn("self");
+        when(regCenter.getDirectly("/dag/testDag/graph/job2")).thenReturn("self,job1");
+        when(regCenter.getDirectly("/dag/testDag/graph/job3")).thenReturn("job1,job2");
+        Map<String, Set<String>> allDagGraphJobs = dagNodeStorage.getAllDagGraphJobs();
+        assertThat(allDagGraphJobs.size(), is(3));
+    }
+
+    @Test
+    public void getDagJobListByState() {
+        when(regCenter.getChildrenKeys("/dag/testDag/running")).thenReturn(Lists.newArrayList("job1", "job2"));
+        when(regCenter.getChildrenKeys("/dag/testDag/success")).thenReturn(Lists.newArrayList("job1", "job2"));
+        when(regCenter.getChildrenKeys("/dag/testDag/fail")).thenReturn(Lists.newArrayList("job1", "job2"));
+        when(regCenter.getChildrenKeys("/dag/testDag/skip")).thenReturn(Lists.newArrayList("job1", "job2"));
+        when(regCenter.getChildrenKeys("/dag/testDag/retry")).thenReturn(Lists.newArrayList("job1", "job2"));
+        assertThat(dagNodeStorage.getDagJobListByState(DagJobStates.RUNNING).size(), is(2));
+        assertThat(dagNodeStorage.getDagJobListByState(DagJobStates.SUCCESS).size(), is(2));
+        assertThat(dagNodeStorage.getDagJobListByState(DagJobStates.FAIL).size(), is(2));
+        assertThat(dagNodeStorage.getDagJobListByState(DagJobStates.SKIP).size(), is(2));
+        assertThat(dagNodeStorage.getDagJobListByState(DagJobStates.RETRY).size(), is(2));
+    }
+
+    @Test
+    public void getJobDenpendencies() {
+        when(regCenter.get("/dag/testDag/graph/testJob")).thenReturn("job1,job2");
+        String[] jobDenpendencies = dagNodeStorage.getJobDenpendencies();
+        assertThat(jobDenpendencies, is(new String[]{"job1", "job2"}));
+    }
+
+    @Test
+    public void getDagJobRunStates() {
+        when(regCenter.isExisted("/dag/testDag/success/job1")).thenReturn(true);
+        when(regCenter.isExisted("/dag/testDag/fail/job2")).thenReturn(true);
+        when(regCenter.isExisted("/dag/testDag/running/job3")).thenReturn(true);
+        when(regCenter.isExisted("/dag/testDag/skip/job4")).thenReturn(true);
+        assertThat(dagNodeStorage.getDagJobRunStates("job1"), is(DagJobStates.SUCCESS));
+        assertThat(dagNodeStorage.getDagJobRunStates("job2"), is(DagJobStates.FAIL));
+        assertThat(dagNodeStorage.getDagJobRunStates("job3"), is(DagJobStates.RUNNING));
+        assertThat(dagNodeStorage.getDagJobRunStates("job4"), is(DagJobStates.SKIP));
+    }
+
+    @Test
+    public void triggerJobWhenTrigged() {
+        when(regCenter.isExisted("/dag/testDag/running/job1")).thenReturn(true);
+        dagNodeStorage.triggerJob("job1"); // no explicit assertion or verify follows this call
+    }
+
+    @Test
+    public void triggerJob() {
+        when(regCenter.isExisted("/dag/testDag/running/job1")).thenReturn(false);
+        when(regCenter.isExisted("/dag/testDag/success/job1")).thenReturn(false);
+        when(regCenter.isExisted("/dag/testDag/fail/job1")).thenReturn(false);
+        dagNodeStorage.triggerJob("job1"); // no explicit assertion or verify follows this call
+    }
+
+    @Test
+    public void triggerRetryJob() {
+        dagNodeStorage.triggerRetryJob(); // no stubbing and no assertion: the call is only exercised
+    }
+
+    @Test
+    public void getJobRetryTimes() {
+        when(regCenter.getDirectly("/dag/testDag/graph/testJob/retry")).thenReturn("3");
+        assertThat(dagNodeStorage.getJobRetryTimes(), is(3));
+    }
+
+    @Test
+    public void updateJobRetryTimes() {
+        dagNodeStorage.updateJobRetryTimes(3);
+        verify(regCenter).persist("/dag/testDag/retry/testJob", "");
+        verify(regCenter).persist("/dag/testDag/graph/testJob/retry", "3");
+    }
+
+    @Test
+    public void removeFailJob() {
+        dagNodeStorage.removeFailJob("job1");
+        verify(regCenter).remove("/dag/testDag/fail/job1");
+    }
+
+    @Test
+    public void getAllDags() {
+        dagNodeStorage.getAllDags();
+        verify(regCenter).getChildrenKeys("/dag");
+    }
+
+}
diff --git a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/dag/DagServiceTest.java b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/dag/DagServiceTest.java
new file mode 100644
index 0000000..d5baf2e
--- /dev/null
+++ b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/dag/DagServiceTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.elasticjob.lite.internal.dag;
+
+import com.google.common.collect.Lists;
+import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
+import org.apache.shardingsphere.elasticjob.api.JobDagConfiguration;
+import org.apache.shardingsphere.elasticjob.infra.exception.DagRuntimeException;
+import org.apache.shardingsphere.elasticjob.lite.util.ReflectionUtils;
+import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
+import org.apache.shardingsphere.elasticjob.tracing.JobEventBus;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.internal.util.collections.Sets;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class DagServiceTest { // Unit tests for DagService; registry center, node storage and event bus are Mockito mocks.
+
+    @Mock
+    private CoordinatorRegistryCenter regCenter;
+
+    @Mock
+    private DagNodeStorage dagNodeStorage;
+
+    @Mock
+    private JobEventBus jobEventBus;
+
+    private DagService dagService;
+
+    @Before
+    public void setUp() { // Creates the service for dag "testDag" and injects the fixture state through reflection.
+        dagService = new DagService(regCenter, "testDag", dagNodeStorage);
+        ReflectionUtils.setFieldValue(dagNodeStorage, "regCenter", regCenter); // NOTE(review): dagNodeStorage is a mock; confirm this write is needed
+        ReflectionUtils.setFieldValue(dagService, "jobEventBus", jobEventBus);
+        ReflectionUtils.setFieldValue(dagService, "dagNodeStorage", dagNodeStorage);
+        ReflectionUtils.setFieldValue(dagService, "jobDagConfig",
+                new JobDagConfiguration("testDag", "job1, job2", 3, 300, false, false)); // presumably (dagName, dagDependencies, ...) - confirm against the constructor
+        ReflectionUtils.setFieldValue(dagService, "jobName", "testJob");
+    }
+
+    @Test
+    public void isDagRootJob() { // With the fixture from setUp the job must not be reported as the dag's root job.
+        assertFalse(dagService.isDagRootJob());
+    }
+
+    @Test
+    public void getDagStates() { // The storage's state text "running" must be returned as DagStates.RUNNING.
+        when(dagNodeStorage.currentDagStates()).thenReturn("running");
+        assertThat(dagService.getDagStates(), is(DagStates.RUNNING));
+    }
+
+    @Test(expected = DagRuntimeException.class)
+    public void changeDagStatesAndReGraph() { // Nothing is stubbed, so the mocks answer with defaults; the call is expected to throw.
+        dagService.changeDagStatesAndReGraph();
+    }
+
+    @Test(expected = DagRuntimeException.class)
+    public void checkJobDependenciesState() { // A FAIL run state for testJob is expected to make the check throw.
+        when(dagNodeStorage.getDagJobRunStates("testJob")).thenReturn(DagJobStates.FAIL);
+        dagService.checkJobDependenciesState();
+    }
+
+    @Test
+    public void checkJobDependenciesStateRunning() { // RUNNING state with the single dependency "self": the check must not throw.
+        when(dagNodeStorage.getDagJobRunStates("testJob")).thenReturn(DagJobStates.RUNNING);
+        when(dagNodeStorage.getJobDenpendencies()).thenReturn(new String[]{"self"});
+        dagService.checkJobDependenciesState(); // NOTE(review): no assertion follows; passes as long as no exception is thrown
+    }
+
+    @Test
+    public void nextShouldTriggerJob() { // With the graph fixture and the stubbed per-state job lists, only job4 is expected next.
+        when(dagNodeStorage.getAllDagGraphJobs()).thenReturn(createAllDagGraph());
+        when(dagNodeStorage.getDagJobListByState(DagJobStates.RUNNING)).thenReturn(Lists.newArrayList("job3"));
+        when(dagNodeStorage.getDagJobListByState(DagJobStates.SKIP)).thenReturn(Lists.newArrayList("job1"));
+        when(dagNodeStorage.getDagJobListByState(DagJobStates.SUCCESS)).thenReturn(Lists.newArrayList("job2", "testJob"));
+        when(dagNodeStorage.getDagJobListByState(DagJobStates.FAIL)).thenReturn(Lists.newArrayList());
+        assertThat(dagService.nextShouldTriggerJob(), is(Lists.newArrayList("job4")));
+    }
+
+    private Map<String, Set<String>> createAllDagGraph() { // Fixture graph; presumably job name -> names of the jobs it depends on.
+        Map<String, Set<String>> allDagNode = new HashMap<>(6);
+        allDagNode.put("testJob", Sets.newSet("self"));
+        allDagNode.put("job1", Sets.newSet("testJob"));
+        allDagNode.put("job2", Sets.newSet("testJob"));
+        allDagNode.put("job3", Sets.newSet("testJob", "job1"));
+        allDagNode.put("job4", Sets.newSet("job1", "job2"));
+        allDagNode.put("job5", Sets.newSet("job3", "job4"));
+        return allDagNode;
+    }
+
+    @Test
+    public void event() { // Feeds a NODE_CHANGED event for /job1/state/state with payloads "running" and "success"; nothing is asserted.
+        dagService.event(CuratorCacheListener.Type.NODE_CHANGED, new ChildData("/job1/state/state", null, "running".getBytes()), new ChildData("/job1/state/state", null, "success".getBytes()));
+    }
+}
diff --git a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/dag/JobDagConfigTest.java b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/dag/JobDagConfigTest.java
new file mode 100644
index 0000000..b76f9ec
--- /dev/null
+++ b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/dag/JobDagConfigTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.elasticjob.lite.internal.dag;
+
+import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
+import org.apache.shardingsphere.elasticjob.api.JobDagConfiguration;
+import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
+import org.apache.shardingsphere.elasticjob.infra.yaml.YamlEngine;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+
+//CHECKSTYLE:OFF
+//CHECKSTYLE:ON
+
+/**
+ * Tests building a {@link JobConfiguration} with a {@link JobDagConfiguration} and round-tripping it through YAML.
+ */
+public class JobDagConfigTest {
+
+    @Test
+    public void assertBuildJobDagConfig() {
+        JobDagConfiguration jobDagConfig = new JobDagConfiguration();
+        jobDagConfig.setDagName("dagGroup");
+        jobDagConfig.setDagDependencies("jobb,jobc");
+        JobConfiguration actual = JobConfiguration.newBuilder("test_job", 3)
+                .jobDagConfiguration(jobDagConfig)
+                .cron("0/1 * * * * ?")
+                .build();
+        assertNotNull(actual.getJobDagConfiguration());
+        assertThat(actual.getJobDagConfiguration().getDagName(), is("dagGroup"));
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void assertBuildJobDagConfigEmpty() {
+        JobDagConfiguration jobDagConfig = new JobDagConfiguration();
+        // build() is expected to throw, so its result is deliberately not assigned.
+        JobConfiguration.newBuilder("test_job", 3)
+                .jobDagConfiguration(jobDagConfig)
+                .build();
+    }
+
+    @Test
+    public void assertJobDagConfigGson() {
+        // Despite the method name, serialization goes through YamlEngine (YAML), not Gson.
+        JobDagConfiguration jobDagConfig = new JobDagConfiguration();
+        jobDagConfig.setDagName("dagGroup");
+        jobDagConfig.setDagDependencies("jobb,jobc");
+        jobDagConfig.setDagSkipWhenFail(true);
+        jobDagConfig.setDagRunAlone(false);
+        jobDagConfig.setRetryInterval(5000);
+        jobDagConfig.setRetryTimes(5);
+        JobConfiguration original = JobConfiguration.newBuilder("test_job", 3)
+                .jobDagConfiguration(jobDagConfig)
+                .cron("0/1 * * * * ?")
+                .jobExecutorServiceHandlerType("simple")
+                .description("test yaml")
+                .failover(true)
+                .jobParameter("a=1,b=-2")
+                .overwrite(true)
+                .build();
+
+        // Round trip: configuration -> POJO -> YAML text -> POJO -> configuration.
+        String yaml = YamlEngine.marshal(JobConfigurationPOJO.fromJobConfiguration(original));
+        JobConfiguration restored = YamlEngine.unmarshal(yaml, JobConfigurationPOJO.class).toJobConfiguration();
+
+        // Every DAG setting must come back unchanged; expected value first, as assertEquals requires.
+        assertNotNull(restored);
+        assertNotNull(restored.getJobDagConfiguration());
+        assertEquals("dagGroup", restored.getJobDagConfiguration().getDagName());
+        assertEquals("jobb,jobc", restored.getJobDagConfiguration().getDagDependencies());
+        assertEquals(5, restored.getJobDagConfiguration().getRetryTimes());
+        assertEquals(5000, restored.getJobDagConfiguration().getRetryInterval());
+        assertThat(restored.getJobDagConfiguration().isDagRunAlone(), is(false));
+        assertThat(restored.getJobDagConfiguration().isDagSkipWhenFail(), is(true));
+    }
+}
diff --git a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/LiteJobFacadeTest.java b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/LiteJobFacadeTest.java
index 1642ab6..fc438a3 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/LiteJobFacadeTest.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/LiteJobFacadeTest.java
@@ -20,10 +20,13 @@ package org.apache.shardingsphere.elasticjob.lite.internal.schedule;
 import com.google.common.collect.Lists;
 import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
 import org.apache.shardingsphere.elasticjob.api.listener.ShardingContexts;
+import org.apache.shardingsphere.elasticjob.infra.exception.DagRuntimeException;
 import org.apache.shardingsphere.elasticjob.infra.exception.JobExecutionEnvironmentException;
 import org.apache.shardingsphere.elasticjob.lite.api.listener.fixture.ElasticJobListenerCaller;
 import org.apache.shardingsphere.elasticjob.lite.api.listener.fixture.TestElasticJobListener;
 import org.apache.shardingsphere.elasticjob.lite.internal.config.ConfigurationService;
+import org.apache.shardingsphere.elasticjob.lite.internal.dag.DagService;
+import org.apache.shardingsphere.elasticjob.lite.internal.dag.DagStates;
 import org.apache.shardingsphere.elasticjob.lite.internal.failover.FailoverService;
 import org.apache.shardingsphere.elasticjob.lite.internal.sharding.ExecutionContextService;
 import org.apache.shardingsphere.elasticjob.lite.internal.sharding.ExecutionService;
@@ -41,6 +44,7 @@ import java.util.Collections;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -68,6 +72,9 @@ public final class LiteJobFacadeTest {
     
     @Mock
     private ElasticJobListenerCaller caller;
+
+    @Mock
+    private DagService dagService;
     
     private LiteJobFacade liteJobFacade;
     
@@ -80,6 +87,7 @@ public final class LiteJobFacadeTest {
         ReflectionUtils.setFieldValue(liteJobFacade, "executionService", executionService);
         ReflectionUtils.setFieldValue(liteJobFacade, "failoverService", failoverService);
         ReflectionUtils.setFieldValue(liteJobFacade, "jobEventBus", jobEventBus);
+        ReflectionUtils.setFieldValue(liteJobFacade, "dagService", dagService);
     }
     
     @Test
@@ -212,4 +220,21 @@ public final class LiteJobFacadeTest {
         liteJobFacade.postJobExecutionEvent(null);
         verify(jobEventBus).post(null);
     }
+
+    @Test
+    public void assertIsDagJob() { // The facade built in setUp must report itself as a DAG job.
+        assertTrue(liteJobFacade.isDagJob()); // NOTE(review): what makes this true is set up outside this hunk - confirm in setUp
+    }
+
+    @Test(expected = DagRuntimeException.class)
+    public void assertDagStatesCheck() { // A dag reported as PAUSE must make the state check throw DagRuntimeException.
+        when(dagService.getDagStates()).thenReturn(DagStates.PAUSE);
+        liteJobFacade.dagStatesCheck();
+    }
+
+    @Test
+    public void assertDagJobDependenciesCheck() { // The dependency check must be delegated to DagService exactly once.
+        liteJobFacade.dagJobDependenciesCheck();
+        verify(dagService, times(1)).checkJobDependenciesState();
+    }
 }
diff --git a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ExecutionServiceTest.java b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ExecutionServiceTest.java
index defe681..67c1fa1 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ExecutionServiceTest.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ExecutionServiceTest.java
@@ -21,6 +21,7 @@ import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
 import org.apache.shardingsphere.elasticjob.api.listener.ShardingContexts;
 import org.apache.shardingsphere.elasticjob.lite.internal.config.ConfigurationService;
 import org.apache.shardingsphere.elasticjob.lite.internal.schedule.JobRegistry;
+import org.apache.shardingsphere.elasticjob.lite.internal.state.JobStateEnum;
 import org.apache.shardingsphere.elasticjob.lite.internal.storage.JobNodeStorage;
 import org.apache.shardingsphere.elasticjob.lite.util.ReflectionUtils;
 import org.junit.After;
@@ -82,6 +83,9 @@ public final class ExecutionServiceTest {
         verify(jobNodeStorage).fillEphemeralJobNode("sharding/1/running", "");
         verify(jobNodeStorage).fillEphemeralJobNode("sharding/2/running", "");
         assertTrue(JobRegistry.getInstance().isJobRunning("test_job"));
+        verify(jobNodeStorage).createJobNodeIfNeeded("state/state", JobStateEnum.RUNNING);
+        verify(jobNodeStorage).createJobNodeIfNeeded("proc/succ");
+        verify(jobNodeStorage).createJobNodeIfNeeded("proc/fail");
     }
     
     @Test
@@ -104,7 +108,37 @@ public final class ExecutionServiceTest {
         verify(jobNodeStorage).removeJobNodeIfExisted("sharding/2/running");
         assertFalse(JobRegistry.getInstance().isJobRunning("test_job"));
     }
-    
+
+    private Map<Integer, String> itemErrorMessages() { // Fixture: the same error message for items 0 and 1.
+        Map<Integer, String> items = new HashMap<>(2);
+        items.put(0, "Some error!");
+        items.put(1, "Some error!");
+        return items;
+    }
+
+    @Test
+    public void assertRegisterJobCompletedWithItemWithoutMonitorExecution() { // monitorExecution=false: no job node may be touched.
+        JobRegistry.getInstance().setJobRunning("test_job", true);
+        when(configService.load(true)).thenReturn(JobConfiguration.newBuilder("test_job", 3).cron("0/1 * * * * ?").monitorExecution(false).build());
+        executionService.registerJobCompleted(new ShardingContexts("fake_task_id", "test_job", 10, "", Collections.emptyMap()), itemErrorMessages());
+        verify(jobNodeStorage, times(0)).removeJobNodeIfExisted(any());
+        verify(jobNodeStorage, times(0)).createJobNodeIfNeeded(any()); // only the single-argument overload is covered by this check
+        assertFalse(JobRegistry.getInstance().isJobRunning("test_job")); // the local running flag is still cleared
+    }
+
+    @Test
+    public void assertRegisterJobCompletedWithItemWithMonitorExecution() { // monitorExecution=true: running nodes removed, failures recorded.
+        JobRegistry.getInstance().setJobRunning("test_job", true);
+        when(configService.load(true)).thenReturn(JobConfiguration.newBuilder("test_job", 3).cron("0/1 * * * * ?").monitorExecution(true).build());
+        executionService.registerJobCompleted(getShardingContext(), itemErrorMessages());
+        verify(jobNodeStorage).removeJobNodeIfExisted("sharding/0/running");
+        verify(jobNodeStorage).removeJobNodeIfExisted("sharding/1/running");
+        verify(jobNodeStorage).removeJobNodeIfExisted("sharding/2/running");
+        assertFalse(JobRegistry.getInstance().isJobRunning("test_job"));
+        verify(jobNodeStorage).createJobNodeIfNeeded("proc/fail/0"); // one proc/fail node per item in itemErrorMessages()
+        verify(jobNodeStorage).createJobNodeIfNeeded("proc/fail/1");
+    }
+
     @Test
     public void assertClearAllRunningInfo() {
         when(configService.load(true)).thenReturn(JobConfiguration.newBuilder("test_job", 3).cron("0/1 * * * * ?").monitorExecution(false).build());
diff --git a/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/test/java/org/apache/shardingsphere/elasticjob/lite/spring/namespace/fixture/job/ref/RefFooSimpleElasticJob.java b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/state/JobStateEnumTest.java
similarity index 50%
copy from elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/test/java/org/apache/shardingsphere/elasticjob/lite/spring/namespace/fixture/job/ref/RefFooSimpleElasticJob.java
copy to elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/state/JobStateEnumTest.java
index 157d33d..53c27b5 100644
--- a/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/test/java/org/apache/shardingsphere/elasticjob/lite/spring/namespace/fixture/job/ref/RefFooSimpleElasticJob.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/state/JobStateEnumTest.java
@@ -7,7 +7,7 @@
  * the License.  You may obtain a copy of the License at
  *
  *     http://www.apache.org/licenses/LICENSE-2.0
- *  
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -15,33 +15,21 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.elasticjob.lite.spring.namespace.fixture.job.ref;
+package org.apache.shardingsphere.elasticjob.lite.internal.state;
 
-import lombok.Getter;
-import lombok.Setter;
-import org.apache.shardingsphere.elasticjob.api.ShardingContext;
-import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
-import org.apache.shardingsphere.elasticjob.lite.spring.namespace.fixture.service.FooService;
+import org.junit.Test;
 
-public class RefFooSimpleElasticJob implements SimpleJob {
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
 
-    @Getter
-    private static volatile boolean completed;
-    
-    @Getter
-    @Setter
-    private FooService fooService;
-    
-    @Override
-    public void execute(final ShardingContext shardingContext) {
-        fooService.foo();
-        completed = true;
-    }
-    
-    /**
-     * Set completed to false.
-     */
-    public static void reset() {
-        completed = false;
+public class JobStateEnumTest {
+
+    @Test
+    public void assertEnumOf() {
+        assertThat(JobStateEnum.of("none"), is(JobStateEnum.NONE));
+        assertThat(JobStateEnum.of("running"), is(JobStateEnum.RUNNING));
+        assertThat(JobStateEnum.of("success"), is(JobStateEnum.SUCCESS));
+        assertThat(JobStateEnum.of("skip"), is(JobStateEnum.SKIP));
+        assertThat(JobStateEnum.of("fail"), is(JobStateEnum.FAIL));
     }
 }
diff --git a/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/test/java/org/apache/shardingsphere/elasticjob/lite/spring/namespace/fixture/job/ref/RefFooSimpleElasticJob.java b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/state/JobStateNodeTest.java
similarity index 51%
copy from elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/test/java/org/apache/shardingsphere/elasticjob/lite/spring/namespace/fixture/job/ref/RefFooSimpleElasticJob.java
copy to elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/state/JobStateNodeTest.java
index 157d33d..37961ac 100644
--- a/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/test/java/org/apache/shardingsphere/elasticjob/lite/spring/namespace/fixture/job/ref/RefFooSimpleElasticJob.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/state/JobStateNodeTest.java
@@ -7,7 +7,7 @@
  * the License.  You may obtain a copy of the License at
  *
  *     http://www.apache.org/licenses/LICENSE-2.0
- *  
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -15,33 +15,32 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.elasticjob.lite.spring.namespace.fixture.job.ref;
-
-import lombok.Getter;
-import lombok.Setter;
-import org.apache.shardingsphere.elasticjob.api.ShardingContext;
-import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
-import org.apache.shardingsphere.elasticjob.lite.spring.namespace.fixture.service.FooService;
-
-public class RefFooSimpleElasticJob implements SimpleJob {
-
-    @Getter
-    private static volatile boolean completed;
-    
-    @Getter
-    @Setter
-    private FooService fooService;
-    
-    @Override
-    public void execute(final ShardingContext shardingContext) {
-        fooService.foo();
-        completed = true;
+package org.apache.shardingsphere.elasticjob.lite.internal.state;
+
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+public class JobStateNodeTest {
+
+    @Test
+    public void assertGetRootState() {
+        assertThat(JobStateNode.getRootState(), is("state/state"));
     }
-    
-    /**
-     * Set completed to false.
-     */
-    public static void reset() {
-        completed = false;
+
+    @Test
+    public void assertGetRootProc() {
+        assertThat(JobStateNode.getRootProc(), is("proc"));
+    }
+
+    @Test
+    public void assertGetProcFail() {
+        assertThat(JobStateNode.getProcFail(3), is("proc/fail/3"));
+    }
+
+    @Test
+    public void assertGetProcSucc() {
+        assertThat(JobStateNode.getProcSucc(2), is("proc/succ/2"));
     }
 }
diff --git a/elasticjob-lite/elasticjob-lite-core/src/test/resources/logback-test.xml b/elasticjob-lite/elasticjob-lite-core/src/test/resources/logback-test.xml
index 473355d..428a781 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/test/resources/logback-test.xml
+++ b/elasticjob-lite/elasticjob-lite-core/src/test/resources/logback-test.xml
@@ -37,6 +37,9 @@
     </root>
     
     <logger name="org.apache.shardingsphere.elasticjob.lite.internal.snapshot.SnapshotService" level="OFF" />
+    <logger name="org.apache.shardingsphere.elasticjob.lite.internal.dag" level="debug" />
+    <logger name="org.apache.zookeeper.ClientCnxn" level="ERROR" />
     <logger name="org.apache.shardingsphere.elasticjob.infra.handler.error.impl.LogJobErrorHandler" level="OFF" />
     <logger name="org.apache.curator.framework.listen.MappingListenerManager" level="OFF" />
+
 </configuration>
diff --git a/elasticjob-lite/elasticjob-lite-lifecycle/src/main/java/org/apache/shardingsphere/elasticjob/lite/lifecycle/api/DagOperateAPI.java b/elasticjob-lite/elasticjob-lite-lifecycle/src/main/java/org/apache/shardingsphere/elasticjob/lite/lifecycle/api/DagOperateAPI.java
new file mode 100644
index 0000000..6cebf1b
--- /dev/null
+++ b/elasticjob-lite/elasticjob-lite-lifecycle/src/main/java/org/apache/shardingsphere/elasticjob/lite/lifecycle/api/DagOperateAPI.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.elasticjob.lite.lifecycle.api;
+
+import org.apache.shardingsphere.elasticjob.lite.lifecycle.domain.DagBriefInfo;
+
+import java.util.List;
+
+/**
+ * Dag operate API.
+ * Controls a dag (start, pause, resume, stop, rerun failed jobs) and queries dag information.
+ */
+public interface DagOperateAPI {
+    /**
+     * Pause the running dag so that the next job is not triggered.
+     *
+     * @param dagName dag name
+     * @return whether the toggle succeeded
+     */
+    boolean toggleDagPause(String dagName);
+
+    /**
+     * Resume the paused dag.
+     *
+     * @param dagName dag name
+     * @return whether the toggle succeeded
+     */
+    boolean toggleDagResume(String dagName);
+
+    /**
+     * Start the dag by triggering its first job.
+     *
+     * @param dagName dag name
+     * @return whether the toggle succeeded
+     */
+    boolean toggleDagStart(String dagName);
+
+    /**
+     * Stop the paused dag; a stopped dag can not be resumed.
+     *
+     * @param dagName dag name
+     * @return whether the toggle succeeded
+     */
+    boolean toggleDagStop(String dagName);
+
+    /**
+     * Rerun the failed job when the dag is in the fail state.
+     *
+     * @param dagName dag name
+     * @return whether the toggle succeeded
+     */
+    boolean toggleDagRerunWhenFail(String dagName);
+
+    /**
+     * Query the dag list.
+     *
+     * @return dag info list
+     */
+    List<DagBriefInfo> getDagList();
+
+    /**
+     * Get the jobs of a dag and their dependencies.
+     *
+     * @param dagName dag name
+     * @return dag info list
+     */
+    List<DagBriefInfo> getDagJobDependencies(String dagName);
+}
diff --git a/elasticjob-lite/elasticjob-lite-lifecycle/src/main/java/org/apache/shardingsphere/elasticjob/lite/lifecycle/api/JobAPIFactory.java b/elasticjob-lite/elasticjob-lite-lifecycle/src/main/java/org/apache/shardingsphere/elasticjob/lite/lifecycle/api/JobAPIFactory.java
index b1a842b..dfa6a29 100644
--- a/elasticjob-lite/elasticjob-lite-lifecycle/src/main/java/org/apache/shardingsphere/elasticjob/lite/lifecycle/api/JobAPIFactory.java
+++ b/elasticjob-lite/elasticjob-lite-lifecycle/src/main/java/org/apache/shardingsphere/elasticjob/lite/lifecycle/api/JobAPIFactory.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.elasticjob.lite.lifecycle.api;
 
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
+import org.apache.shardingsphere.elasticjob.lite.lifecycle.internal.operate.DagOperateAPIImpl;
 import org.apache.shardingsphere.elasticjob.lite.lifecycle.internal.operate.JobOperateAPIImpl;
 import org.apache.shardingsphere.elasticjob.lite.lifecycle.internal.operate.ShardingOperateAPIImpl;
 import org.apache.shardingsphere.elasticjob.lite.lifecycle.internal.reg.RegistryCenterFactory;
@@ -104,4 +105,16 @@ public final class JobAPIFactory {
     public static ShardingStatisticsAPI createShardingStatisticsAPI(final String connectString, final String namespace, final String digest) {
         return new ShardingStatisticsAPIImpl(RegistryCenterFactory.createCoordinatorRegistryCenter(connectString, namespace, digest));
     }
+
+    /**
+     * Create dag operate API.
+     *
+     * @param connectString registry center connect string
+     * @param namespace registry center namespace
+     * @param digest registry center digest
+     * @return dag operate API
+     */
+    public static DagOperateAPI createDagOperateAPI(final String connectString, final String namespace, final String digest) {
+        return new DagOperateAPIImpl(RegistryCenterFactory.createCoordinatorRegistryCenter(connectString, namespace, digest));
+    }
 }
diff --git a/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/test/java/org/apache/shardingsphere/elasticjob/lite/spring/namespace/fixture/job/ref/RefFooSimpleElasticJob.java b/elasticjob-lite/elasticjob-lite-lifecycle/src/main/java/org/apache/shardingsphere/elasticjob/lite/lifecycle/domain/DagBriefInfo.java
similarity index 51%
copy from elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/test/java/org/apache/shardingsphere/elasticjob/lite/spring/namespace/fixture/job/ref/RefFooSimpleElasticJob.java
copy to elasticjob-lite/elasticjob-lite-lifecycle/src/main/java/org/apache/shardingsphere/elasticjob/lite/lifecycle/domain/DagBriefInfo.java
index 157d33d..84a8ce4 100644
--- a/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/test/java/org/apache/shardingsphere/elasticjob/lite/spring/namespace/fixture/job/ref/RefFooSimpleElasticJob.java
+++ b/elasticjob-lite/elasticjob-lite-lifecycle/src/main/java/org/apache/shardingsphere/elasticjob/lite/lifecycle/domain/DagBriefInfo.java
@@ -7,7 +7,7 @@
  * the License.  You may obtain a copy of the License at
  *
  *     http://www.apache.org/licenses/LICENSE-2.0
- *  
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -15,33 +15,31 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.elasticjob.lite.spring.namespace.fixture.job.ref;
+package org.apache.shardingsphere.elasticjob.lite.lifecycle.domain;
+
+import lombok.Data;
+import lombok.ToString;
 
-import lombok.Getter;
-import lombok.Setter;
-import org.apache.shardingsphere.elasticjob.api.ShardingContext;
-import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
-import org.apache.shardingsphere.elasticjob.lite.spring.namespace.fixture.service.FooService;
+/**
+ * Dag config domain.
+ *
+ **/
+@Data
+@ToString
+public class DagBriefInfo {
+    private String dagName;
 
-public class RefFooSimpleElasticJob implements SimpleJob {
+    private String jobName;
 
-    @Getter
-    private static volatile boolean completed;
-    
-    @Getter
-    @Setter
-    private FooService fooService;
-    
-    @Override
-    public void execute(final ShardingContext shardingContext) {
-        fooService.foo();
-        completed = true;
+    private String dependencies;
+
+    public DagBriefInfo(final String group) {
+        this.dagName = group;
     }
-    
-    /**
-     * Set completed to false.
-     */
-    public static void reset() {
-        completed = false;
+
+    public DagBriefInfo(final String dagName, final String jobName, final String dependencies) {
+        this.dagName = dagName;
+        this.jobName = jobName;
+        this.dependencies = dependencies;
     }
 }
diff --git a/elasticjob-lite/elasticjob-lite-lifecycle/src/main/java/org/apache/shardingsphere/elasticjob/lite/lifecycle/internal/operate/DagOperateAPIImpl.java b/elasticjob-lite/elasticjob-lite-lifecycle/src/main/java/org/apache/shardingsphere/elasticjob/lite/lifecycle/internal/operate/DagOperateAPIImpl.java
new file mode 100644
index 0000000..ba5ad22
--- /dev/null
+++ b/elasticjob-lite/elasticjob-lite-lifecycle/src/main/java/org/apache/shardingsphere/elasticjob/lite/lifecycle/internal/operate/DagOperateAPIImpl.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.elasticjob.lite.lifecycle.internal.operate;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.transaction.CuratorOp;
+import org.apache.curator.framework.api.transaction.CuratorTransactionFinal;
+import org.apache.shardingsphere.elasticjob.lite.internal.dag.DagJobStates;
+import org.apache.shardingsphere.elasticjob.lite.internal.dag.DagNodeStorage;
+import org.apache.shardingsphere.elasticjob.lite.internal.dag.DagService;
+import org.apache.shardingsphere.elasticjob.lite.internal.dag.DagStates;
+import org.apache.shardingsphere.elasticjob.lite.internal.storage.JobNodePath;
+import org.apache.shardingsphere.elasticjob.lite.lifecycle.api.DagOperateAPI;
+import org.apache.shardingsphere.elasticjob.lite.lifecycle.domain.DagBriefInfo;
+import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * DAG operate API implementation.
+ *
+ * <p>State-changing operations read the current DAG state through {@link DagNodeStorage} and return
+ * {@code false}, after logging an error, when that state does not permit the operation.</p>
+ */
+@Slf4j
+public class DagOperateAPIImpl implements DagOperateAPI {
+    private final CoordinatorRegistryCenter regCenter;
+
+    public DagOperateAPIImpl(final CoordinatorRegistryCenter regCenter) {
+        this.regCenter = regCenter;
+    }
+
+    /**
+     * Pause a DAG. Only a DAG whose current state is RUNNING can be paused.
+     *
+     * @param dagName DAG name
+     * @return true if the DAG state was updated to PAUSE, false if the DAG was not RUNNING
+     */
+    @Override
+    public boolean toggleDagPause(final String dagName) {
+        DagNodeStorage dagNodeStorage = new DagNodeStorage(regCenter, dagName, "");
+        DagStates dagStates = DagStates.of(dagNodeStorage.currentDagStates());
+        if (dagStates != DagStates.RUNNING) {
+            log.error("Current dag-{} not RUNNING , Cannot turn to PAUSE", dagName);
+            return false;
+        }
+        dagNodeStorage.updateDagStates(DagStates.PAUSE);
+        return true;
+    }
+
+    /**
+     * Resume a paused DAG: set its state back to RUNNING and trigger the jobs reported by
+     * {@link DagService#nextShouldTriggerJob()}.
+     *
+     * @param dagName DAG name
+     * @return true if the DAG was resumed, false if the DAG was not in PAUSE state
+     */
+    @Override
+    public boolean toggleDagResume(final String dagName) {
+        DagNodeStorage dagNodeStorage = new DagNodeStorage(regCenter, dagName, "");
+        DagStates dagStates = DagStates.of(dagNodeStorage.currentDagStates());
+        if (dagStates != DagStates.PAUSE) {
+            log.error("Current dag-{} not PAUSE , Cannot turn to RUNNING", dagName);
+            return false;
+        }
+
+        // The jobs to trigger are collected before the state is switched back to RUNNING.
+        DagService dagService = new DagService(regCenter, dagName, dagNodeStorage);
+        List<String> nextJobs = dagService.nextShouldTriggerJob();
+        dagNodeStorage.updateDagStates(DagStates.RUNNING);
+        nextJobs.forEach(this::triggerJobManual);
+        return true;
+    }
+
+    /**
+     * Start a DAG by triggering its root jobs, i.e. the jobs whose only configured dependency is
+     * {@link DagService#ROOT_JOB}. Refused while the DAG is RUNNING or PAUSE.
+     *
+     * @param dagName DAG name
+     * @return true if the root jobs were asked to trigger, false if the DAG was RUNNING or PAUSE
+     */
+    @Override
+    public boolean toggleDagStart(final String dagName) {
+        DagNodeStorage dagNodeStorage = new DagNodeStorage(regCenter, dagName, "");
+        DagStates dagStates = DagStates.of(dagNodeStorage.currentDagStates());
+        if (dagStates == DagStates.RUNNING || dagStates == DagStates.PAUSE) {
+            log.error("Can not start DAG-{}, current state-{}", dagName, dagStates);
+            return false;
+        }
+        // A root job depends on nothing but the ROOT_JOB marker.
+        List<String> rootJobs = Lists.newArrayList();
+        Map<String, Set<String>> allDagConfigJobs = dagNodeStorage.getAllDagConfigJobs();
+
+        allDagConfigJobs.forEach((key, value) -> {
+            if (value.size() == 1 && value.contains(DagService.ROOT_JOB)) {
+                rootJobs.add(key);
+            }
+        });
+
+        log.info("Dag-{} start jobs [{}]", dagName, rootJobs);
+        rootJobs.forEach(this::triggerJobManual);
+        return true;
+    }
+
+    /**
+     * Stop a paused DAG by updating its state to FAIL.
+     *
+     * @param dagName DAG name
+     * @return true if the DAG state was updated to FAIL, false if the DAG was not in PAUSE state
+     */
+    @Override
+    public boolean toggleDagStop(final String dagName) {
+        DagNodeStorage dagNodeStorage = new DagNodeStorage(regCenter, dagName, "");
+        DagStates dagStates = DagStates.of(dagNodeStorage.currentDagStates());
+        if (dagStates != DagStates.PAUSE) {
+            log.error("Current dag-{} not PAUSE , Cannot turn to FAIL", dagName);
+            return false;
+        }
+        dagNodeStorage.updateDagStates(DagStates.FAIL);
+        return true;
+    }
+
+    /**
+     * Rerun the failed jobs of a DAG in FAIL state: remove their fail records, set the DAG state to
+     * RUNNING and trigger them again.
+     *
+     * @param dagName DAG name
+     * @return true if the failed jobs were asked to rerun, false if the DAG was not in FAIL state or has no failed job
+     */
+    @Override
+    public boolean toggleDagRerunWhenFail(final String dagName) {
+        DagNodeStorage dagNodeStorage = new DagNodeStorage(regCenter, dagName, "");
+        DagStates dagStates = DagStates.of(dagNodeStorage.currentDagStates());
+        if (dagStates != DagStates.FAIL) {
+            log.error("Current dag-{} not FAIL , Cannot rerun fail jobs", dagName);
+            return false;
+        }
+
+        // Remove the fail records of the failed jobs before triggering them again.
+        List<String> failJobs = dagNodeStorage.getDagJobListByState(DagJobStates.FAIL);
+        if (failJobs.isEmpty()) {
+            log.error("Dag-{} don't have fail jobs", dagName);
+            return false;
+        }
+        log.info("Dag-{} rerun fail jobs [{}]", dagName, failJobs);
+        failJobs.forEach(dagNodeStorage::removeFailJob);
+        dagNodeStorage.updateDagStates(DagStates.RUNNING);
+        failJobs.forEach(this::triggerJobManual);
+        return true;
+    }
+
+    /**
+     * Get all DAGs.
+     *
+     * @return one brief info per DAG, carrying the DAG name only
+     */
+    @Override
+    public List<DagBriefInfo> getDagList() {
+        DagNodeStorage dagNodeStorage = new DagNodeStorage(regCenter, "", "");
+        List<String> allDags = dagNodeStorage.getAllDags();
+        List<DagBriefInfo> resList = Lists.newArrayList();
+        allDags.forEach(name -> resList.add(new DagBriefInfo(name)));
+        return resList;
+    }
+
+    /**
+     * Get the jobs configured in a DAG together with their dependencies.
+     *
+     * @param dagName DAG name
+     * @return one brief info per job, with the job's dependencies joined by comma
+     */
+    @Override
+    public List<DagBriefInfo> getDagJobDependencies(final String dagName) {
+        DagNodeStorage dagNodeStorage = new DagNodeStorage(regCenter, dagName, "");
+        Map<String, Set<String>> allDagConfigJobs = dagNodeStorage.getAllDagConfigJobs();
+        List<DagBriefInfo> resList = Lists.newArrayList();
+        allDagConfigJobs.forEach((key, value) -> resList.add(new DagBriefInfo(dagName, key, Joiner.on(",").join(value))));
+        return resList;
+    }
+
+    /**
+     * Trigger a job by writing {@code TRIGGER} to all of its instance nodes in one Curator transaction.
+     *
+     * <p>Any exception is logged and not rethrown, so callers cannot tell whether the trigger succeeded.</p>
+     *
+     * @param job job name
+     */
+    private void triggerJobManual(final String job) {
+        CuratorTransactionFinal curatorTransactionFinal = null;
+
+        log.info("Trigger Dag job-{} manually.", job);
+        try {
+            JobNodePath jobNodePath = new JobNodePath(job);
+            CuratorFramework rawClient = (CuratorFramework) regCenter.getRawClient();
+            List<CuratorOp> opList = new ArrayList<>();
+            for (String each : regCenter.getChildrenKeys(jobNodePath.getInstancesNodePath())) {
+                // Encode explicitly instead of relying on the platform default charset.
+                opList.add(rawClient.transactionOp().setData().forPath(jobNodePath.getInstanceNodePath(each), "TRIGGER".getBytes(StandardCharsets.UTF_8)));
+            }
+            if (opList.isEmpty()) {
+                log.warn("Dag job-{} has no instance node, nothing to trigger.", job);
+                return;
+            }
+            rawClient.transaction().forOperations(opList);
+            //CHECKSTYLE:OFF
+        } catch (final Exception exp) {
+            //CHECKSTYLE:ON
+            log.error("Trigger Dag job-{} by hand in transaction Exception!", job, exp);
+        }
+    }
+
+}
diff --git a/elasticjob-lite/elasticjob-lite-lifecycle/src/test/java/org/apache/shardingsphere/elasticjob/lite/lifecycle/api/JobAPIFactoryTest.java b/elasticjob-lite/elasticjob-lite-lifecycle/src/test/java/org/apache/shardingsphere/elasticjob/lite/lifecycle/api/JobAPIFactoryTest.java
index 63eea65..da9c7c1 100644
--- a/elasticjob-lite/elasticjob-lite-lifecycle/src/test/java/org/apache/shardingsphere/elasticjob/lite/lifecycle/api/JobAPIFactoryTest.java
+++ b/elasticjob-lite/elasticjob-lite-lifecycle/src/test/java/org/apache/shardingsphere/elasticjob/lite/lifecycle/api/JobAPIFactoryTest.java
@@ -54,4 +54,10 @@ public final class JobAPIFactoryTest extends AbstractEmbedZookeeperBaseTest {
     public void assertCreateShardingStatisticsAPI() {
         assertThat(JobAPIFactory.createShardingStatisticsAPI(getConnectionString(), "namespace", null), instanceOf(ShardingStatisticsAPI.class));
     }
+
+    // createDagOperateAPI must return a DagOperateAPI instance, mirroring the check for the factory method above.
+    @Test
+    public void assertCreateDagOperateAPI() {
+        assertThat(JobAPIFactory.createDagOperateAPI(getConnectionString(), "namespace", null), instanceOf(DagOperateAPI.class));
+    }
 }
diff --git a/elasticjob-lite/elasticjob-lite-lifecycle/src/test/java/org/apache/shardingsphere/elasticjob/lite/lifecycle/fixture/LifecycleYamlConstants.java b/elasticjob-lite/elasticjob-lite-lifecycle/src/test/java/org/apache/shardingsphere/elasticjob/lite/lifecycle/fixture/LifecycleYamlConstants.java
index b9e2db8..89933b8 100644
--- a/elasticjob-lite/elasticjob-lite-lifecycle/src/test/java/org/apache/shardingsphere/elasticjob/lite/lifecycle/fixture/LifecycleYamlConstants.java
+++ b/elasticjob-lite/elasticjob-lite-lifecycle/src/test/java/org/apache/shardingsphere/elasticjob/lite/lifecycle/fixture/LifecycleYamlConstants.java
@@ -37,6 +37,8 @@ public final class LifecycleYamlConstants {
             + "overwrite: false\n";
     
     private static final String DATAFLOW_JOB_YAML = "cron: 0/1 * * * * ?\n"
+            + "dagRunAlone: false\n"
+            + "dagSkipWhenFail: false\n"
             + "description: ''\n"
             + "disabled: false\n"
             + "failover: false\n"
@@ -49,6 +51,8 @@ public final class LifecycleYamlConstants {
             + "props:\n"
             + "  streaming.process: 'true'\n"
             + "reconcileIntervalMinutes: 10\n"
+            + "retryInterval: 0\n"
+            + "retryTimes: 0\n"
             + "shardingTotalCount: 3\n";
     
     private static final String SCRIPT_JOB_YAML = "jobName: test_job\n"
diff --git a/elasticjob-lite/elasticjob-lite-lifecycle/src/test/java/org/apache/shardingsphere/elasticjob/lite/lifecycle/internal/operate/DagOperateAPIImplTest.java b/elasticjob-lite/elasticjob-lite-lifecycle/src/test/java/org/apache/shardingsphere/elasticjob/lite/lifecycle/internal/operate/DagOperateAPIImplTest.java
new file mode 100644
index 0000000..f029ddf
--- /dev/null
+++ b/elasticjob-lite/elasticjob-lite-lifecycle/src/test/java/org/apache/shardingsphere/elasticjob/lite/lifecycle/internal/operate/DagOperateAPIImplTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.elasticjob.lite.lifecycle.internal.operate;
+
+import com.google.common.collect.Lists;
+import org.apache.shardingsphere.elasticjob.lite.lifecycle.api.DagOperateAPI;
+import org.apache.shardingsphere.elasticjob.lite.lifecycle.domain.DagBriefInfo;
+import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.util.List;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class DagOperateAPIImplTest {
+
+    @Mock
+    private CoordinatorRegistryCenter regCenter;
+
+    private DagOperateAPI dagOperateAPI;
+
+    @Before
+    public void setUp() {
+        dagOperateAPI = new DagOperateAPIImpl(regCenter);
+    }
+
+    @Test
+    public void toggleDagPause() {
+        when(regCenter.isExisted("/dag/testDag/states")).thenReturn(true);
+        when(regCenter.getDirectly("/dag/testDag/states")).thenReturn("running");
+        assertTrue(dagOperateAPI.toggleDagPause("testDag"));
+        verify(regCenter).update("/dag/testDag/states", "pause");
+    }
+
+    @Test
+    public void toggleDagResume() {
+        when(regCenter.isExisted("/dag/testDag/states")).thenReturn(true);
+        when(regCenter.getDirectly("/dag/testDag/states")).thenReturn("pause");
+        assertTrue(dagOperateAPI.toggleDagResume("testDag"));
+        verify(regCenter).update("/dag/testDag/states", "running");
+    }
+
+    @Test
+    public void toggleDagStart() {
+        when(regCenter.isExisted("/dag/testDag/states")).thenReturn(true);
+        when(regCenter.getDirectly("/dag/testDag/states")).thenReturn("pause");
+        // Rejection path: a DAG in PAUSE state must not be started.
+        assertFalse(dagOperateAPI.toggleDagStart("testDag"));
+    }
+
+    @Test
+    public void toggleDagStop() {
+        when(regCenter.isExisted("/dag/testDag/states")).thenReturn(true);
+        when(regCenter.getDirectly("/dag/testDag/states")).thenReturn("pause");
+        assertTrue(dagOperateAPI.toggleDagStop("testDag"));
+        verify(regCenter).update("/dag/testDag/states", "fail");
+    }
+
+    @Test
+    public void toggleDagRerunWhenFail() {
+        when(regCenter.isExisted("/dag/testDag/states")).thenReturn(true);
+        when(regCenter.getDirectly("/dag/testDag/states")).thenReturn("pause");
+        // Rejection path: rerun is only accepted while the DAG is in FAIL state.
+        assertFalse(dagOperateAPI.toggleDagRerunWhenFail("testDag"));
+    }
+
+    @Test
+    public void getDagList() {
+        when(regCenter.getChildrenKeys("/dag")).thenReturn(Lists.newArrayList("d1", "d2"));
+        List<DagBriefInfo> dagList = dagOperateAPI.getDagList();
+        assertThat(dagList.size(), is(2));
+    }
+
+    @Test
+    public void getDagJobDependencies() {
+        when(regCenter.getChildrenKeys("/dag/testDag/config")).thenReturn(Lists.newArrayList("job1", "job2", "job3"));
+        when(regCenter.getDirectly("/dag/testDag/config/job1")).thenReturn("self");
+        when(regCenter.getDirectly("/dag/testDag/config/job2")).thenReturn("self,job1");
+        when(regCenter.getDirectly("/dag/testDag/config/job3")).thenReturn("job1,job2");
+        List<DagBriefInfo> testDag = dagOperateAPI.getDagJobDependencies("testDag");
+        assertThat(testDag.size(), is(3));
+    }
+}
diff --git a/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-boot-starter/src/main/java/org/apache/shardingsphere/elasticjob/lite/spring/boot/job/ElasticJobConfigurationProperties.java b/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-boot-starter/src/main/java/org/apache/shardingsphere/elasticjob/lite/spring/boot/job/ElasticJobConfigurationProperties.java
index ba9348d..5d9095a 100644
--- a/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-boot-starter/src/main/java/org/apache/shardingsphere/elasticjob/lite/spring/boot/job/ElasticJobConfigurationProperties.java
+++ b/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-boot-starter/src/main/java/org/apache/shardingsphere/elasticjob/lite/spring/boot/job/ElasticJobConfigurationProperties.java
@@ -19,8 +19,10 @@ package org.apache.shardingsphere.elasticjob.lite.spring.boot.job;
 
 import lombok.Getter;
 import lombok.Setter;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.shardingsphere.elasticjob.api.ElasticJob;
 import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
+import org.apache.shardingsphere.elasticjob.api.JobDagConfiguration;
 
 import java.util.Properties;
 
@@ -66,6 +68,18 @@ public class ElasticJobConfigurationProperties {
 
     private boolean overwrite;
 
+    private String dagName;
+
+    private String dagDependencies;
+
+    private int dagRetryTimes;
+
+    private int dagRetryInterval;
+
+    private boolean dagRunAlone;
+
+    private boolean dagSkipWhenFail;
+
     /**
      * Convert to job configuration.
      *
@@ -73,12 +87,16 @@ public class ElasticJobConfigurationProperties {
      * @return job configuration
      */
     public JobConfiguration toJobConfiguration(final String jobName) {
+        JobDagConfiguration jobDagConfiguration = null;
+        if (StringUtils.isNotEmpty(dagName)) {
+            jobDagConfiguration = new JobDagConfiguration(dagName, dagDependencies, dagRetryTimes, dagRetryInterval, dagRunAlone, dagSkipWhenFail);
+        }
         JobConfiguration result = JobConfiguration.newBuilder(jobName, shardingTotalCount)
                 .cron(cron).shardingItemParameters(shardingItemParameters).jobParameter(jobParameter)
                 .monitorExecution(monitorExecution).failover(failover).misfire(misfire)
                 .maxTimeDiffSeconds(maxTimeDiffSeconds).reconcileIntervalMinutes(reconcileIntervalMinutes)
                 .jobShardingStrategyType(jobShardingStrategyType).jobExecutorServiceHandlerType(jobExecutorServiceHandlerType).jobErrorHandlerType(jobErrorHandlerType)
-                .description(description).disabled(disabled).overwrite(overwrite).build();
+                .description(description).disabled(disabled).overwrite(overwrite).jobDagConfiguration(jobDagConfiguration).build();
         for (Object each : props.keySet()) {
             result.getProps().setProperty(each.toString(), props.get(each.toString()).toString());
         }
diff --git a/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-boot-starter/src/test/java/org/apache/shardingsphere/elasticjob/lite/spring/boot/job/ElasticJobSpringBootTest.java b/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-boot-starter/src/test/java/org/apache/shardingsphere/elasticjob/lite/spring/boot/job/ElasticJobSpringBootTest.java
index 3959198..7867359 100644
--- a/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-boot-starter/src/test/java/org/apache/shardingsphere/elasticjob/lite/spring/boot/job/ElasticJobSpringBootTest.java
+++ b/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-boot-starter/src/test/java/org/apache/shardingsphere/elasticjob/lite/spring/boot/job/ElasticJobSpringBootTest.java
@@ -18,6 +18,7 @@
 package org.apache.shardingsphere.elasticjob.lite.spring.boot.job;
 
 import org.apache.shardingsphere.elasticjob.api.ElasticJob;
+import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
 import org.apache.shardingsphere.elasticjob.infra.concurrent.BlockUtils;
 import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.JobBootstrap;
 import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
@@ -37,6 +38,7 @@ import java.sql.SQLException;
 import java.util.Map;
 
 import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
@@ -96,4 +98,21 @@ public class ElasticJobSpringBootTest extends AbstractJUnit4SpringContextTests {
         assertNotNull(applicationContext.getBean("customTestJobBean", OneOffJobBootstrap.class));
         assertNotNull(applicationContext.getBean("printTestJobBean", OneOffJobBootstrap.class));
     }
+
+    @Test
+    public void assertJobDagConfiguration() {
+        assertNotNull(applicationContext);
+        ElasticJobProperties bean = applicationContext.getBean(ElasticJobProperties.class);
+        assertNotNull(bean);
+        assertNotNull(bean.getJobs());
+        Map<String, ElasticJobConfigurationProperties> jobs = bean.getJobs();
+        // Each configured job is expected to convert into a JobConfiguration whose DAG is named dagA.
+        jobs.forEach((key, value) -> {
+            JobConfiguration job = value.toJobConfiguration("name");
+            assertNotNull(job);
+            assertNotNull(job.getJobDagConfiguration());
+            // assertEquals takes the expected value first, so a failure message reports the right sides.
+            assertEquals("dagA", job.getJobDagConfiguration().getDagName());
+        });
+    }
 }
diff --git a/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-boot-starter/src/test/resources/application-elasticjob.yml b/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-boot-starter/src/test/resources/application-elasticjob.yml
index fccac3f..ff4a499 100644
--- a/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-boot-starter/src/test/resources/application-elasticjob.yml
+++ b/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-boot-starter/src/test/resources/application-elasticjob.yml
@@ -33,9 +33,13 @@ elasticjob:
       elasticJobClass: org.apache.shardingsphere.elasticjob.lite.spring.boot.job.fixture.job.impl.CustomTestJob
       jobBootstrapBeanName: customTestJobBean
       shardingTotalCount: 3
+      dagName: dagA
+      dagDependencies: job1,printTestJob
     printTestJob:
       elasticJobType: PRINT
       jobBootstrapBeanName: printTestJobBean
       shardingTotalCount: 3
       props:
         print.content: "test print job"
+      dagName: dagA
+      dagDependencies: job1,customTestJob
diff --git a/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/main/java/org/apache/shardingsphere/elasticjob/lite/spring/namespace/job/parser/JobBeanDefinitionParser.java b/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/main/java/org/apache/shardingsphere/elasticjob/lite/spring/namespace/job/parser/JobBeanDefinitionParser.java
index 5d9da40..3245d77 100644
--- a/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/main/java/org/apache/shardingsphere/elasticjob/lite/spring/namespace/job/parser/JobBeanDefinitionParser.java
+++ b/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/main/java/org/apache/shardingsphere/elasticjob/lite/spring/namespace/job/parser/JobBeanDefinitionParser.java
@@ -18,6 +18,7 @@
 package org.apache.shardingsphere.elasticjob.lite.spring.namespace.job.parser;
 
 import com.google.common.base.Strings;
+import org.apache.shardingsphere.elasticjob.api.JobDagConfiguration;
 import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
 import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.ScheduleJobBootstrap;
 import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
@@ -29,6 +30,7 @@ import org.springframework.beans.factory.support.BeanDefinitionBuilder;
 import org.springframework.beans.factory.support.ManagedList;
 import org.springframework.beans.factory.xml.AbstractBeanDefinitionParser;
 import org.springframework.beans.factory.xml.ParserContext;
+import org.springframework.util.StringUtils;
 import org.springframework.util.xml.DomUtils;
 import org.w3c.dom.Element;
 
@@ -84,6 +86,7 @@ public final class JobBeanDefinitionParser extends AbstractBeanDefinitionParser
         result.addConstructorArgValue(parsePropsElement(element, parserContext));
         result.addConstructorArgValue(element.getAttribute(JobBeanDefinitionTag.DISABLED_ATTRIBUTE));
         result.addConstructorArgValue(element.getAttribute(JobBeanDefinitionTag.OVERWRITE_ATTRIBUTE));
+        result.addConstructorArgValue(createJobDagConfig(element));
         return result.getBeanDefinition();
     }
     
@@ -110,4 +113,21 @@ public final class JobBeanDefinitionParser extends AbstractBeanDefinitionParser
         }
         return result;
     }
+
+    // Build the JobDagConfiguration bean definition from the dag-* attributes; null when no DAG name is set.
+    private BeanDefinition createJobDagConfig(final Element element) {
+        String dagName = element.getAttribute(JobBeanDefinitionTag.DAG_NAME);
+        if (StringUtils.isEmpty(dagName)) {
+            return null;
+        }
+        BeanDefinitionBuilder builder = BeanDefinitionBuilder.rootBeanDefinition(JobDagConfiguration.class);
+        builder.addConstructorArgValue(dagName);
+        // Remaining arguments, in order: dependencies, retry times, retry interval, run alone, skip when fail.
+        String[] attributeNames = {JobBeanDefinitionTag.DAG_DEPENDENCIES, JobBeanDefinitionTag.DAG_RETRY_TIMES,
+            JobBeanDefinitionTag.DAG_RETRY_INTERVAL, JobBeanDefinitionTag.DAG_RUN_ALONE, JobBeanDefinitionTag.DAG_SKIP_WHEN_FAIL};
+        for (String each : attributeNames) {
+            builder.addConstructorArgValue(element.getAttribute(each));
+        }
+        return builder.getBeanDefinition();
+    }
 }
diff --git a/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/main/java/org/apache/shardingsphere/elasticjob/lite/spring/namespace/job/tag/JobBeanDefinitionTag.java b/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/main/java/org/apache/shardingsphere/elasticjob/lite/spring/namespace/job/tag/JobBeanDefinitionTag.java
index 96cc6d2..de25a78 100644
--- a/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/main/java/org/apache/shardingsphere/elasticjob/lite/spring/namespace/job/tag/JobBeanDefinitionTag.java
+++ b/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/main/java/org/apache/shardingsphere/elasticjob/lite/spring/namespace/job/tag/JobBeanDefinitionTag.java
@@ -65,4 +65,16 @@ public final class JobBeanDefinitionTag {
     public static final String DISABLED_ATTRIBUTE = "disabled";
     
     public static final String OVERWRITE_ATTRIBUTE = "overwrite";
+
+    public static final String DAG_NAME = "dag-name";
+
+    public static final String DAG_DEPENDENCIES = "dag-dependencies";
+
+    public static final String DAG_RETRY_TIMES = "dag-retry-times";
+
+    public static final String DAG_RETRY_INTERVAL = "dag-retry-interval";
+
+    public static final String DAG_RUN_ALONE = "dag-run-alone";
+
+    public static final String DAG_SKIP_WHEN_FAIL = "dag-skip-when-fail";
 }
diff --git a/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/main/resources/META-INF/namespace/elasticjob.xsd b/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/main/resources/META-INF/namespace/elasticjob.xsd
index d42c23b..ae8b9c9 100644
--- a/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/main/resources/META-INF/namespace/elasticjob.xsd
+++ b/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/main/resources/META-INF/namespace/elasticjob.xsd
@@ -65,6 +65,12 @@
                     <xsd:attribute name="description" type="xsd:string" />
                     <xsd:attribute name="disabled" type="xsd:string" default="false" />
                     <xsd:attribute name="overwrite" type="xsd:string" default="false" />
+                    <xsd:attribute name="dag-name" type="xsd:string" />
+                    <xsd:attribute name="dag-dependencies" type="xsd:string" />
+                    <xsd:attribute name="dag-retry-times" type="xsd:string" default="0" />
+                    <xsd:attribute name="dag-retry-interval" type="xsd:string" default="0" />
+                    <xsd:attribute name="dag-run-alone" type="xsd:string" default="false" />
+                    <xsd:attribute name="dag-skip-when-fail" type="xsd:string" default="false" />
                 </xsd:extension>
             </xsd:complexContent>
         </xsd:complexType>
diff --git a/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/test/java/org/apache/shardingsphere/elasticjob/lite/spring/namespace/fixture/job/ref/RefFooSimpleElasticJob.java b/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/test/java/org/apache/shardingsphere/elasticjob/lite/spring/namespace/fixture/job/ref/RefFooSimpleElasticJob.java
index 157d33d..bc9190d 100644
--- a/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/test/java/org/apache/shardingsphere/elasticjob/lite/spring/namespace/fixture/job/ref/RefFooSimpleElasticJob.java
+++ b/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/test/java/org/apache/shardingsphere/elasticjob/lite/spring/namespace/fixture/job/ref/RefFooSimpleElasticJob.java
@@ -23,10 +23,15 @@ import org.apache.shardingsphere.elasticjob.api.ShardingContext;
 import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
 import org.apache.shardingsphere.elasticjob.lite.spring.namespace.fixture.service.FooService;
 
+import java.util.concurrent.atomic.AtomicInteger;
+
 public class RefFooSimpleElasticJob implements SimpleJob {
 
     @Getter
     private static volatile boolean completed;
+
+    @Getter
+    private static volatile AtomicInteger times = new AtomicInteger(0);
     
     @Getter
     @Setter
@@ -36,6 +41,7 @@ public class RefFooSimpleElasticJob implements SimpleJob {
     public void execute(final ShardingContext shardingContext) {
         fooService.foo();
         completed = true;
+        times.getAndIncrement();
     }
     
     /**
diff --git a/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/test/java/org/apache/shardingsphere/elasticjob/lite/spring/namespace/job/JobSpringNamespaceWithRefDagTest.java b/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/test/java/org/apache/shardingsphere/elasticjob/lite/spring/namespace/job/JobSpringNamespaceWithRefDagTest.java
new file mode 100644
index 0000000..c21a32d
--- /dev/null
+++ b/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/test/java/org/apache/shardingsphere/elasticjob/lite/spring/namespace/job/JobSpringNamespaceWithRefDagTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.elasticjob.lite.spring.namespace.job;
+
+import org.apache.shardingsphere.elasticjob.infra.concurrent.BlockUtils;
+import org.apache.shardingsphere.elasticjob.lite.internal.schedule.JobRegistry;
+import org.apache.shardingsphere.elasticjob.lite.spring.namespace.fixture.job.ref.RefFooSimpleElasticJob;
+import org.apache.shardingsphere.elasticjob.lite.spring.namespace.test.AbstractZookeeperJUnit4SpringContextTests;
+import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.springframework.test.context.ContextConfiguration;
+
+import javax.annotation.Resource;
+
+import static org.junit.Assert.assertTrue;
+
+@ContextConfiguration(locations = "classpath:META-INF/job/withJobRefDag.xml")
+public final class JobSpringNamespaceWithRefDagTest extends AbstractZookeeperJUnit4SpringContextTests {
+    
+    private final String rootJob = "simpleElasticJob_job_ref_root";
+
+    private final String job2 = "simpleElasticJob_job_ref_job2";
+
+    @Resource
+    private CoordinatorRegistryCenter regCenter;
+    
+    @Before
+    @After
+    public void reset() {
+        RefFooSimpleElasticJob.reset();
+    }
+    
+    @After
+    public void tearDown() {
+        JobRegistry.getInstance().shutdown(rootJob);
+        JobRegistry.getInstance().shutdown(job2);
+    }
+    
+    // Bounded by a timeout: the polling loop below has no exit of its own if the executions never reach 9.
+    @Test(timeout = 60000L)
+    public void assertSpringJobBean() {
+        assertSimpleElasticJobDagBean();
+    }
+
+    // Poll until the shared job class has counted 9 executions, then check its completed flag.
+    private void assertSimpleElasticJobDagBean() {
+        while (RefFooSimpleElasticJob.getTimes().intValue() < 9) {
+            BlockUtils.sleep(300L);
+        }
+        BlockUtils.sleep(300L);
+        assertTrue(RefFooSimpleElasticJob.isCompleted());
+    }
+}
diff --git a/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/test/resources/META-INF/job/withJobRefDag.xml b/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/test/resources/META-INF/job/withJobRefDag.xml
new file mode 100644
index 0000000..f6d2c24
--- /dev/null
+++ b/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/test/resources/META-INF/job/withJobRefDag.xml
@@ -0,0 +1,50 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xmlns:elasticjob="http://shardingsphere.apache.org/schema/elasticjob"
+       xsi:schemaLocation="http://www.springframework.org/schema/beans 
+                           http://www.springframework.org/schema/beans/spring-beans.xsd
+                           http://shardingsphere.apache.org/schema/elasticjob
+                           http://shardingsphere.apache.org/schema/elasticjob/elasticjob.xsd
+                           ">
+    <import resource="base.xml"/>
+    
+    <!-- Three separate bean instances of the same job class; each one is referenced by exactly one elasticjob:job below. -->
+    <bean id="refSimpleJobRoot" class="org.apache.shardingsphere.elasticjob.lite.spring.namespace.fixture.job.ref.RefFooSimpleElasticJob">
+        <property name="fooService" ref="foo" />
+    </bean>
+    <bean id="refSimpleJobJob1" class="org.apache.shardingsphere.elasticjob.lite.spring.namespace.fixture.job.ref.RefFooSimpleElasticJob">
+        <property name="fooService" ref="foo" />
+    </bean>
+    <bean id="refSimpleJobJob2" class="org.apache.shardingsphere.elasticjob.lite.spring.namespace.fixture.job.ref.RefFooSimpleElasticJob">
+        <property name="fooService" ref="foo" />
+    </bean>
+    
+    <!-- All three jobs share dag-name "fooDag". This one declares dag-dependencies="self"; presumably that marks the DAG root (no upstream jobs) - confirm against the DAG implementation. -->
+    <elasticjob:job id="simpleElasticJob_job_ref_root" job-ref="refSimpleJobRoot" registry-center-ref="regCenter"
+             cron="${simpelJobDagRoot.cron}" sharding-total-count="${simpleJob.shardingTotalCount}" sharding-item-parameters="${simpleJob.shardingItemParameters}"
+             disabled="${simpleJob.disabled}" overwrite="${simpleJob.overwrite}" job-executor-service-handler-type="SINGLE_THREAD" dag-name="fooDag" dag-dependencies="self" />
+
+    <!-- Declares the root job as its only DAG dependency. -->
+    <elasticjob:job id="simpleElasticJob_job_ref_job1" job-ref="refSimpleJobJob1" registry-center-ref="regCenter"
+                    cron="${simpelJobDagDep.cron}" sharding-total-count="${simpleJob.shardingTotalCount}" sharding-item-parameters="${simpleJob.shardingItemParameters}"
+                    disabled="${simpleJob.disabled}" overwrite="${simpleJob.overwrite}" job-executor-service-handler-type="SINGLE_THREAD" dag-name="fooDag" dag-dependencies="simpleElasticJob_job_ref_root" />
+
+    <!-- Declares both the root job and job1 as DAG dependencies. NOTE(review): the spaces around the names in dag-dependencies look deliberate, presumably to exercise trimming of the comma-separated list - confirm. -->
+    <elasticjob:job id="simpleElasticJob_job_ref_job2" job-ref="refSimpleJobJob2" registry-center-ref="regCenter"
+                    cron="${simpelJobDagDep.cron}" sharding-total-count="${simpleJob.shardingTotalCount}" sharding-item-parameters="${simpleJob.shardingItemParameters}"
+                    disabled="${simpleJob.disabled}" overwrite="${simpleJob.overwrite}" job-executor-service-handler-type="SINGLE_THREAD" dag-name="fooDag" dag-dependencies=" simpleElasticJob_job_ref_root , simpleElasticJob_job_ref_job1"  />
+</beans>
diff --git a/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/test/resources/conf/job/conf.properties b/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/test/resources/conf/job/conf.properties
index 18714ea..9829492 100644
--- a/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/test/resources/conf/job/conf.properties
+++ b/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/test/resources/conf/job/conf.properties
@@ -41,3 +41,7 @@ dataflowJob.streamingProcess=true
 # need absolute path
 #script.scriptCommandLine=your_path/elasticjob-lite/elasticjob-lite-spring/src/test/resources/script/demo.sh
 script.scriptCommandLine=echo test
+
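+# Cron expressions for the DAG job test: the root job fires on its own schedule, dependent jobs use a far-future cron (year 2099).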
+simpelJobDagRoot.cron=0/59 * * * * ?
+simpelJobDagDep.cron=0 * * 1 1 ? 2099