You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2020/08/16 14:41:19 UTC
[shardingsphere-elasticjob] branch dag updated: add dag (#1376)
This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch dag
in repository https://gitbox.apache.org/repos/asf/shardingsphere-elasticjob.git
The following commit(s) were added to refs/heads/dag by this push:
new cd324fa add dag (#1376)
cd324fa is described below
commit cd324fade73ff8aba4a1695e4eba45c6a7d8df53
Author: coodajingang <11...@users.noreply.github.com>
AuthorDate: Sun Aug 16 22:41:05 2020 +0800
add dag (#1376)
* Added dag
* modify ci error
* modify ci error
* Add unit test
* Add licenses
* Add licenses
Co-authored-by: dutengxiao <du...@aibank.com>
---
.../elasticjob/api/JobConfiguration.java | 29 +-
.../elasticjob/api/JobDagConfiguration.java | 64 +--
.../elasticjob/api/JobConfigurationTest.java | 28 ++
.../elasticjob/cloud/facade/CloudJobFacade.java | 23 +-
.../cloud/executor/facade/CloudJobFacadeTest.java | 15 +
.../elasticjob/executor/ElasticJobExecutor.java | 16 +-
.../elasticjob/executor/JobFacade.java | 28 +-
.../executor/ElasticJobExecutorTest.java | 41 +-
.../infra/exception/DagRuntimeException.java | 39 +-
.../infra/pojo/JobConfigurationPOJO.java | 29 +-
.../infra/exception/DagRuntimeExceptionTest.java | 40 +-
.../infra/pojo/JobConfigurationPOJOTest.java | 69 ++-
.../tracing/event/DagJobExecutionEvent.java | 90 ++++
.../tracing/listener/TracingListener.java | 10 +
.../tracing/fixture/TestTracingListener.java | 9 +-
.../tracing/rdb/listener/RDBTracingListener.java | 6 +
.../tracing/rdb/storage/RDBJobEventStorage.java | 53 +-
.../tracing/rdb/storage/RDBStorageSQLMapper.java | 6 +
.../src/main/resources/META-INF/sql/DB2.properties | 3 +
.../src/main/resources/META-INF/sql/H2.properties | 3 +
.../main/resources/META-INF/sql/MySQL.properties | 3 +
.../main/resources/META-INF/sql/Oracle.properties | 3 +
.../resources/META-INF/sql/PostgreSQL.properties | 3 +
.../main/resources/META-INF/sql/SQL92.properties | 3 +
.../resources/META-INF/sql/SQLServer.properties | 3 +
.../rdb/listener/RDBTracingListenerTest.java | 8 +
.../rdb/storage/RDBJobEventStorageTest.java | 9 +-
.../elasticjob/lite/internal/dag/DagJobStates.java | 73 +++
.../lite/internal/dag/DagNodeStorage.java | 499 ++++++++++++++++++
.../elasticjob/lite/internal/dag/DagService.java | 557 +++++++++++++++++++++
.../elasticjob/lite/internal/dag/DagStates.java | 60 ++-
.../lite/internal/dag/JobRetryTrigger.java | 78 +++
.../lite/internal/schedule/JobScheduler.java | 8 +-
.../lite/internal/schedule/LiteJobFacade.java | 65 ++-
.../lite/internal/sharding/ExecutionService.java | 57 ++-
.../lite/internal/state/JobStateEnum.java | 67 +++
.../lite/internal/state/JobStateNode.java | 102 ++++
.../lite/internal/storage/JobNodeStorage.java | 14 +-
.../lite/internal/dag/DagJobRetryTriggerTest.java | 57 +++
.../lite/internal/dag/DagNodeStorageTest.java | 233 +++++++++
.../lite/internal/dag/DagServiceTest.java | 123 +++++
.../lite/internal/dag/JobDagConfigTest.java | 95 ++++
.../lite/internal/schedule/LiteJobFacadeTest.java | 25 +
.../internal/sharding/ExecutionServiceTest.java | 36 +-
.../lite/internal/state/JobStateEnumTest.java} | 40 +-
.../lite/internal/state/JobStateNodeTest.java} | 55 +-
.../src/test/resources/logback-test.xml | 3 +
.../lite/lifecycle/api/DagOperateAPI.java | 83 +++
.../lite/lifecycle/api/JobAPIFactory.java | 13 +
.../lite/lifecycle/domain/DagBriefInfo.java} | 48 +-
.../internal/operate/DagOperateAPIImpl.java | 177 +++++++
.../lite/lifecycle/api/JobAPIFactoryTest.java | 5 +
.../lifecycle/fixture/LifecycleYamlConstants.java | 4 +
.../internal/operate/DagOperateAPIImplTest.java | 105 ++++
.../job/ElasticJobConfigurationProperties.java | 20 +-
.../spring/boot/job/ElasticJobSpringBootTest.java | 17 +
.../src/test/resources/application-elasticjob.yml | 4 +
.../job/parser/JobBeanDefinitionParser.java | 17 +
.../namespace/job/tag/JobBeanDefinitionTag.java | 12 +
.../resources/META-INF/namespace/elasticjob.xsd | 6 +
.../fixture/job/ref/RefFooSimpleElasticJob.java | 6 +
.../job/JobSpringNamespaceWithRefDagTest.java | 67 +++
.../test/resources/META-INF/job/withJobRefDag.xml | 50 ++
.../src/test/resources/conf/job/conf.properties | 4 +
64 files changed, 3308 insertions(+), 210 deletions(-)
diff --git a/elasticjob-api/src/main/java/org/apache/shardingsphere/elasticjob/api/JobConfiguration.java b/elasticjob-api/src/main/java/org/apache/shardingsphere/elasticjob/api/JobConfiguration.java
index a567d02..5ca2347 100644
--- a/elasticjob-api/src/main/java/org/apache/shardingsphere/elasticjob/api/JobConfiguration.java
+++ b/elasticjob-api/src/main/java/org/apache/shardingsphere/elasticjob/api/JobConfiguration.java
@@ -66,7 +66,9 @@ public final class JobConfiguration {
private final boolean disabled;
private final boolean overwrite;
-
+
+ private final JobDagConfiguration jobDagConfiguration;
+
/**
* Create ElasticJob configuration builder.
*
@@ -114,7 +116,9 @@ public final class JobConfiguration {
private boolean disabled;
private boolean overwrite;
-
+
+ private JobDagConfiguration jobDagConfiguration;
+
/**
* Cron expression.
*
@@ -331,7 +335,18 @@ public final class JobConfiguration {
this.overwrite = overwrite;
return this;
}
-
+
+ /**
+ * Set Dag configuration.
+ *
+ * @param jobDagConfiguration dag configuration
+ * @return ElasticJob configuration builder
+ */
+ public Builder jobDagConfiguration(final JobDagConfiguration jobDagConfiguration) {
+ this.jobDagConfiguration = jobDagConfiguration;
+ return this;
+ }
+
/**
* Build ElasticJob configuration.
*
@@ -340,9 +355,15 @@ public final class JobConfiguration {
public final JobConfiguration build() {
Preconditions.checkArgument(!Strings.isNullOrEmpty(jobName), "jobName can not be empty.");
Preconditions.checkArgument(shardingTotalCount > 0, "shardingTotalCount should larger than zero.");
+
+ if (null != jobDagConfiguration) {
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(jobDagConfiguration.getDagName()), "dagName can not be empty When use DAG");
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(jobDagConfiguration.getDagDependencies()), "dagDependencies can not be empty When use DAG");
+ }
+
return new JobConfiguration(jobName, cron, shardingTotalCount, shardingItemParameters, jobParameter,
monitorExecution, failover, misfire, maxTimeDiffSeconds, reconcileIntervalMinutes,
- jobShardingStrategyType, jobExecutorServiceHandlerType, jobErrorHandlerType, description, props, disabled, overwrite);
+ jobShardingStrategyType, jobExecutorServiceHandlerType, jobErrorHandlerType, description, props, disabled, overwrite, jobDagConfiguration);
}
}
}
diff --git a/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/test/java/org/apache/shardingsphere/elasticjob/lite/spring/namespace/fixture/job/ref/RefFooSimpleElasticJob.java b/elasticjob-api/src/main/java/org/apache/shardingsphere/elasticjob/api/JobDagConfiguration.java
similarity index 50%
copy from elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/test/java/org/apache/shardingsphere/elasticjob/lite/spring/namespace/fixture/job/ref/RefFooSimpleElasticJob.java
copy to elasticjob-api/src/main/java/org/apache/shardingsphere/elasticjob/api/JobDagConfiguration.java
index 157d33d..5f31600 100644
--- a/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/test/java/org/apache/shardingsphere/elasticjob/lite/spring/namespace/fixture/job/ref/RefFooSimpleElasticJob.java
+++ b/elasticjob-api/src/main/java/org/apache/shardingsphere/elasticjob/api/JobDagConfiguration.java
@@ -7,7 +7,7 @@
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -15,33 +15,37 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.elasticjob.lite.spring.namespace.fixture.job.ref;
-
-import lombok.Getter;
-import lombok.Setter;
-import org.apache.shardingsphere.elasticjob.api.ShardingContext;
-import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
-import org.apache.shardingsphere.elasticjob.lite.spring.namespace.fixture.service.FooService;
-
-public class RefFooSimpleElasticJob implements SimpleJob {
-
- @Getter
- private static volatile boolean completed;
-
- @Getter
- @Setter
- private FooService fooService;
-
- @Override
- public void execute(final ShardingContext shardingContext) {
- fooService.foo();
- completed = true;
- }
-
- /**
- * Set completed to false.
- */
- public static void reset() {
- completed = false;
- }
+package org.apache.shardingsphere.elasticjob.api;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.ToString;
+
+/**
+ * Job Dag configuration.
+ *
+ **/
+@AllArgsConstructor
+@NoArgsConstructor
+@Data
+@ToString
+public class JobDagConfiguration {
+ /** DAG name. */
+ private String dagName;
+
+ /** DAG dependencies. */
+ private String dagDependencies;
+
+ /** DAG retry times. */
+ private int retryTimes;
+
+ /** DAG retry interval. */
+ private int retryInterval;
+
+ /** Is dag job can run alone. */
+ private boolean dagRunAlone;
+
+ /** Dag job can skip when fail. */
+ private boolean dagSkipWhenFail;
}
diff --git a/elasticjob-api/src/test/java/org/apache/shardingsphere/elasticjob/api/JobConfigurationTest.java b/elasticjob-api/src/test/java/org/apache/shardingsphere/elasticjob/api/JobConfigurationTest.java
index a1e85ef..9056aa1 100644
--- a/elasticjob-api/src/test/java/org/apache/shardingsphere/elasticjob/api/JobConfigurationTest.java
+++ b/elasticjob-api/src/test/java/org/apache/shardingsphere/elasticjob/api/JobConfigurationTest.java
@@ -20,7 +20,9 @@ package org.apache.shardingsphere.elasticjob.api;
import org.junit.Test;
import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
@@ -87,4 +89,30 @@ public final class JobConfigurationTest {
public void assertBuildWithInvalidShardingTotalCount() {
JobConfiguration.newBuilder("test_job", -1).cron("0/1 * * * * ?").build();
}
+
+ @Test(expected = IllegalArgumentException.class)
+ public void assertBuildWithEmptyDagJobConfiguration() {
+ JobDagConfiguration jobDagConfiguration = new JobDagConfiguration();
+ JobConfiguration.newBuilder("test_job", 3).cron("0/1 * * * * ?").jobDagConfiguration(jobDagConfiguration).build();
+ }
+
+ @Test
+ public void assertBuildNullDagJobConfiguration() {
+ JobConfiguration actual = JobConfiguration.newBuilder("test_job", 3).cron("0/1 * * * * ?").jobDagConfiguration(null).build();
+ assertNull(actual.getJobDagConfiguration());
+ }
+
+ @Test
+ public void assertBuildDagJobConfiguration() {
+ JobDagConfiguration jobDagConfiguration = new JobDagConfiguration("fake_dag", "fake_dependencies", 3, 500, true, false);
+ JobConfiguration actual = JobConfiguration.newBuilder("test_job", 3).cron("0/1 * * * * ?").jobDagConfiguration(jobDagConfiguration).build();
+ assertThat(actual.getJobName(), is("test_job"));
+ assertNotNull(actual.getJobDagConfiguration());
+ assertTrue(actual.getJobDagConfiguration().isDagRunAlone());
+ assertFalse(actual.getJobDagConfiguration().isDagSkipWhenFail());
+ assertEquals(actual.getJobDagConfiguration().getDagName(), "fake_dag");
+ assertEquals(actual.getJobDagConfiguration().getDagDependencies(), "fake_dependencies");
+ assertEquals(actual.getJobDagConfiguration().getRetryTimes(), 3);
+ assertEquals(actual.getJobDagConfiguration().getRetryInterval(), 500);
+ }
}
diff --git a/elasticjob-cloud/elasticjob-cloud-executor/src/main/java/org/apache/shardingsphere/elasticjob/cloud/facade/CloudJobFacade.java b/elasticjob-cloud/elasticjob-cloud-executor/src/main/java/org/apache/shardingsphere/elasticjob/cloud/facade/CloudJobFacade.java
index 7329e0b..21ca2cf 100755
--- a/elasticjob-cloud/elasticjob-cloud-executor/src/main/java/org/apache/shardingsphere/elasticjob/cloud/facade/CloudJobFacade.java
+++ b/elasticjob-cloud/elasticjob-cloud-executor/src/main/java/org/apache/shardingsphere/elasticjob/cloud/facade/CloudJobFacade.java
@@ -29,6 +29,7 @@ import org.apache.shardingsphere.elasticjob.tracing.event.JobStatusTraceEvent.So
import org.apache.shardingsphere.elasticjob.tracing.event.JobStatusTraceEvent.State;
import java.util.Collection;
+import java.util.Map;
/**
* Cloud job facade.
@@ -68,7 +69,12 @@ public final class CloudJobFacade implements JobFacade {
@Override
public void registerJobCompleted(final ShardingContexts shardingContexts) {
}
-
+
+ @Override
+ public void registerJobCompleted(final ShardingContexts shardingContexts, final Map<Integer, String> itemErrorMessages) {
+
+ }
+
@Override
public ShardingContexts getShardingContexts() {
return shardingContexts;
@@ -112,4 +118,19 @@ public final class CloudJobFacade implements JobFacade {
jobEventBus.post(new JobStatusTraceEvent(taskContext.getMetaInfo().getJobName(), taskContext.getId(), taskContext.getSlaveId(),
Source.CLOUD_EXECUTOR, taskContext.getType().toString(), String.valueOf(taskContext.getMetaInfo().getShardingItems()), state, message));
}
+
+ @Override
+ public boolean isDagJob() {
+ return false;
+ }
+
+ @Override
+ public void dagStatesCheck() {
+
+ }
+
+ @Override
+ public void dagJobDependenciesCheck() {
+
+ }
}
diff --git a/elasticjob-cloud/elasticjob-cloud-executor/src/test/java/org/apache/shardingsphere/elasticjob/cloud/executor/facade/CloudJobFacadeTest.java b/elasticjob-cloud/elasticjob-cloud-executor/src/test/java/org/apache/shardingsphere/elasticjob/cloud/executor/facade/CloudJobFacadeTest.java
index a38a6fb..e23d8db 100755
--- a/elasticjob-cloud/elasticjob-cloud-executor/src/test/java/org/apache/shardingsphere/elasticjob/cloud/executor/facade/CloudJobFacadeTest.java
+++ b/elasticjob-cloud/elasticjob-cloud-executor/src/test/java/org/apache/shardingsphere/elasticjob/cloud/executor/facade/CloudJobFacadeTest.java
@@ -133,4 +133,19 @@ public final class CloudJobFacadeTest {
public void assertPostJobStatusTraceEvent() {
jobFacade.postJobStatusTraceEvent(String.format("%s@-@0@-@%s@-@fake_slave_id@-@0", "test_job", ExecutionType.READY), State.TASK_RUNNING, "message is empty.");
}
+
+ @Test
+ public void assertIsDagJob() {
+ assertFalse(jobFacade.isDagJob());
+ }
+
+ @Test
+ public void assertDagStatesCheck() {
+ jobFacade.dagStatesCheck();
+ }
+
+ @Test
+ public void assertDagJobDependenciesCheck() {
+ jobFacade.dagJobDependenciesCheck();
+ }
}
diff --git a/elasticjob-executor/elasticjob-executor-kernel/src/main/java/org/apache/shardingsphere/elasticjob/executor/ElasticJobExecutor.java b/elasticjob-executor/elasticjob-executor-kernel/src/main/java/org/apache/shardingsphere/elasticjob/executor/ElasticJobExecutor.java
index a21428e..d65f478 100644
--- a/elasticjob-executor/elasticjob-executor-kernel/src/main/java/org/apache/shardingsphere/elasticjob/executor/ElasticJobExecutor.java
+++ b/elasticjob-executor/elasticjob-executor-kernel/src/main/java/org/apache/shardingsphere/elasticjob/executor/ElasticJobExecutor.java
@@ -86,6 +86,19 @@ public final class ElasticJobExecutor {
} catch (final JobExecutionEnvironmentException cause) {
jobErrorHandler.handleException(jobConfig.getJobName(), cause);
}
+
+ if (jobFacade.isDagJob()) {
+ try {
+ jobFacade.dagStatesCheck();
+ jobFacade.dagJobDependenciesCheck();
+ //CHECKSTYLE:OFF
+ } catch (Exception e) {
+ //CHECKSTYLE:ON
+ log.error("DAG job - {} exception! Check !", jobConfig.getJobName(), e);
+ return;
+ }
+ }
+
ShardingContexts shardingContexts = jobFacade.getShardingContexts();
jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_STAGING, String.format("Job '%s' execute begin.", jobConfig.getJobName()));
if (jobFacade.misfireIfRunning(shardingContexts.getShardingItemParameters().keySet())) {
@@ -127,8 +140,7 @@ public final class ElasticJobExecutor {
try {
process(shardingContexts, executionSource);
} finally {
- // TODO Consider increasing the status of job failure, and how to handle the overall loop of job failure
- jobFacade.registerJobCompleted(shardingContexts);
+ jobFacade.registerJobCompleted(shardingContexts, itemErrorMessages);
if (itemErrorMessages.isEmpty()) {
jobFacade.postJobStatusTraceEvent(taskId, State.TASK_FINISHED, "");
} else {
diff --git a/elasticjob-executor/elasticjob-executor-kernel/src/main/java/org/apache/shardingsphere/elasticjob/executor/JobFacade.java b/elasticjob-executor/elasticjob-executor-kernel/src/main/java/org/apache/shardingsphere/elasticjob/executor/JobFacade.java
index 6d0c79b..d6d7e2a 100644
--- a/elasticjob-executor/elasticjob-executor-kernel/src/main/java/org/apache/shardingsphere/elasticjob/executor/JobFacade.java
+++ b/elasticjob-executor/elasticjob-executor-kernel/src/main/java/org/apache/shardingsphere/elasticjob/executor/JobFacade.java
@@ -24,6 +24,7 @@ import org.apache.shardingsphere.elasticjob.tracing.event.JobExecutionEvent;
import org.apache.shardingsphere.elasticjob.tracing.event.JobStatusTraceEvent.State;
import java.util.Collection;
+import java.util.Map;
/**
* Job facade.
@@ -63,7 +64,15 @@ public interface JobFacade {
* @param shardingContexts sharding contexts
*/
void registerJobCompleted(ShardingContexts shardingContexts);
-
+
+ /**
+ * Register job completed. And error sharding items.
+ *
+ * @param shardingContexts sharding contexts
+ * @param itemErrorMessages error items
+ */
+ void registerJobCompleted(ShardingContexts shardingContexts, Map<Integer, String> itemErrorMessages);
+
/**
* Get sharding contexts.
*
@@ -130,4 +139,21 @@ public interface JobFacade {
* @param message job message
*/
void postJobStatusTraceEvent(String taskId, State state, String message);
+
+ /**
+ * Is current job belong to DAG.
+ *
+ * @return is dag job
+ */
+ boolean isDagJob();
+
+ /**
+ * Check Dag Group States.
+ */
+ void dagStatesCheck();
+
+ /**
+ * check current job's dependencies are all success.
+ */
+ void dagJobDependenciesCheck();
}
diff --git a/elasticjob-executor/elasticjob-executor-kernel/src/test/java/org/apache/shardingsphere/elasticjob/executor/ElasticJobExecutorTest.java b/elasticjob-executor/elasticjob-executor-kernel/src/test/java/org/apache/shardingsphere/elasticjob/executor/ElasticJobExecutorTest.java
index f3d27a4..1f0699b 100644
--- a/elasticjob-executor/elasticjob-executor-kernel/src/test/java/org/apache/shardingsphere/elasticjob/executor/ElasticJobExecutorTest.java
+++ b/elasticjob-executor/elasticjob-executor-kernel/src/test/java/org/apache/shardingsphere/elasticjob/executor/ElasticJobExecutorTest.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.elasticjob.executor;
import lombok.SneakyThrows;
import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
+import org.apache.shardingsphere.elasticjob.api.JobDagConfiguration;
import org.apache.shardingsphere.elasticjob.api.listener.ShardingContexts;
import org.apache.shardingsphere.elasticjob.executor.fixture.executor.ClassedFooJobExecutor;
import org.apache.shardingsphere.elasticjob.executor.fixture.job.FooJob;
@@ -35,6 +36,7 @@ import java.lang.reflect.Field;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
@@ -70,7 +72,15 @@ public final class ElasticJobExecutorTest {
return JobConfiguration.newBuilder("test_job", 3)
.cron("0/1 * * * * ?").shardingItemParameters("0=A,1=B,2=C").jobParameter("param").failover(true).misfire(false).jobErrorHandlerType("THROW").description("desc").build();
}
-
+
+ private JobConfiguration createJobConfigurationWithDag() {
+ JobDagConfiguration jobDagConfiguration = new JobDagConfiguration("fakeDag", "job1,job2", 3, 400,
+ false, false);
+ return JobConfiguration.newBuilder("test_job", 3)
+ .cron("0/1 * * * * ?").shardingItemParameters("0=A,1=B,2=C").jobParameter("param").failover(true).misfire(false).jobErrorHandlerType("THROW").description("desc")
+ .jobDagConfiguration(jobDagConfiguration).build();
+ }
+
@SneakyThrows
private void setJobItemExecutor() {
Field field = ElasticJobExecutor.class.getDeclaredField("jobItemExecutor");
@@ -87,7 +97,17 @@ public final class ElasticJobExecutorTest {
verify(jobItemExecutor, times(0)).process(eq(fooJob), eq(jobConfig), eq(jobFacade), any());
}
}
-
+
+ @Test
+ public void assertExecuteWhenDagConfigEmpty() {
+ ShardingContexts shardingContexts = new ShardingContexts("fake_task_id", "test_job", 3, "", Collections.emptyMap());
+ when(jobFacade.getShardingContexts()).thenReturn(shardingContexts);
+ when(jobFacade.isDagJob()).thenReturn(true);
+ elasticJobExecutor.execute();
+ verify(jobFacade).postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_STAGING, "Job 'test_job' execute begin.");
+ verify(jobItemExecutor, times(0)).process(eq(fooJob), eq(jobConfig), eq(jobFacade), any());
+ }
+
@Test
public void assertExecuteWhenPreviousJobStillRunning() {
ShardingContexts shardingContexts = new ShardingContexts("fake_task_id", "test_job", 3, "", Collections.emptyMap());
@@ -131,10 +151,19 @@ public final class ElasticJobExecutorTest {
verify(jobFacade).postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_ERROR, getErrorMessage(shardingContexts));
verify(jobFacade).registerJobBegin(shardingContexts);
verify(jobItemExecutor, times(shardingContexts.getShardingTotalCount())).process(eq(fooJob), eq(jobConfig), eq(jobFacade), any());
- verify(jobFacade).registerJobCompleted(shardingContexts);
+ verify(jobFacade).registerJobCompleted(shardingContexts, getItemErrorMessage(shardingContexts));
}
}
-
+
+ private Map<Integer, String> getItemErrorMessage(final ShardingContexts shardingContexts) {
+ Map<Integer, String> itemErrorMsg = new ConcurrentHashMap<>(jobConfig.getShardingTotalCount(), 1);
+ itemErrorMsg.put(0, "java.lang.RuntimeException" + System.lineSeparator());
+ if (shardingContexts.getShardingItemParameters().size() > 1) {
+ itemErrorMsg.put(1, "java.lang.RuntimeException" + System.lineSeparator());
+ }
+ return itemErrorMsg;
+ }
+
private String getErrorMessage(final ShardingContexts shardingContexts) {
return 1 == shardingContexts.getShardingItemParameters().size()
? "{0=java.lang.RuntimeException" + System.lineSeparator() + "}"
@@ -190,7 +219,7 @@ public final class ElasticJobExecutorTest {
verify(jobFacade).misfireIfRunning(shardingContexts.getShardingItemParameters().keySet());
verify(jobFacade, times(2)).registerJobBegin(shardingContexts);
verify(jobItemExecutor, times(4)).process(eq(fooJob), eq(jobConfig), eq(jobFacade), any());
- verify(jobFacade, times(2)).registerJobCompleted(shardingContexts);
+ verify(jobFacade, times(2)).registerJobCompleted(shardingContexts, new ConcurrentHashMap<>(jobConfig.getShardingTotalCount(), 1));
}
@Test(expected = JobSystemException.class)
@@ -245,7 +274,7 @@ public final class ElasticJobExecutorTest {
verify(jobFacade).postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_STAGING, "Job 'test_job' execute begin.");
verify(jobFacade).beforeJobExecuted(shardingContexts);
verify(jobFacade).registerJobBegin(shardingContexts);
- verify(jobFacade).registerJobCompleted(shardingContexts);
+ verify(jobFacade).registerJobCompleted(shardingContexts, new ConcurrentHashMap<>(jobConfig.getShardingTotalCount(), 1));
verify(jobFacade).afterJobExecuted(shardingContexts);
}
}
diff --git a/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/test/java/org/apache/shardingsphere/elasticjob/lite/spring/namespace/fixture/job/ref/RefFooSimpleElasticJob.java b/elasticjob-infra/elasticjob-infra-common/src/main/java/org/apache/shardingsphere/elasticjob/infra/exception/DagRuntimeException.java
similarity index 50%
copy from elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/test/java/org/apache/shardingsphere/elasticjob/lite/spring/namespace/fixture/job/ref/RefFooSimpleElasticJob.java
copy to elasticjob-infra/elasticjob-infra-common/src/main/java/org/apache/shardingsphere/elasticjob/infra/exception/DagRuntimeException.java
index 157d33d..c744e22 100644
--- a/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/test/java/org/apache/shardingsphere/elasticjob/lite/spring/namespace/fixture/job/ref/RefFooSimpleElasticJob.java
+++ b/elasticjob-infra/elasticjob-infra-common/src/main/java/org/apache/shardingsphere/elasticjob/infra/exception/DagRuntimeException.java
@@ -7,7 +7,7 @@
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -15,33 +15,20 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.elasticjob.lite.spring.namespace.fixture.job.ref;
-
-import lombok.Getter;
-import lombok.Setter;
-import org.apache.shardingsphere.elasticjob.api.ShardingContext;
-import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
-import org.apache.shardingsphere.elasticjob.lite.spring.namespace.fixture.service.FooService;
+package org.apache.shardingsphere.elasticjob.infra.exception;
-public class RefFooSimpleElasticJob implements SimpleJob {
+/**
+ * Dag runtime exception.
+ *
+ **/
+public class DagRuntimeException extends RuntimeException {
+ private static final long serialVersionUID = 3244908974343209468L;
- @Getter
- private static volatile boolean completed;
-
- @Getter
- @Setter
- private FooService fooService;
-
- @Override
- public void execute(final ShardingContext shardingContext) {
- fooService.foo();
- completed = true;
+ public DagRuntimeException(final String errorMessage, final Object... args) {
+ super(String.format(errorMessage, args));
}
-
- /**
- * Set completed to false.
- */
- public static void reset() {
- completed = false;
+
+ public DagRuntimeException(final Throwable cause) {
+ super(cause);
}
}
diff --git a/elasticjob-infra/elasticjob-infra-common/src/main/java/org/apache/shardingsphere/elasticjob/infra/pojo/JobConfigurationPOJO.java b/elasticjob-infra/elasticjob-infra-common/src/main/java/org/apache/shardingsphere/elasticjob/infra/pojo/JobConfigurationPOJO.java
index a9ee4c7..b6982a8 100644
--- a/elasticjob-infra/elasticjob-infra-common/src/main/java/org/apache/shardingsphere/elasticjob/infra/pojo/JobConfigurationPOJO.java
+++ b/elasticjob-infra/elasticjob-infra-common/src/main/java/org/apache/shardingsphere/elasticjob/infra/pojo/JobConfigurationPOJO.java
@@ -19,7 +19,9 @@ package org.apache.shardingsphere.elasticjob.infra.pojo;
import lombok.Getter;
import lombok.Setter;
+import org.apache.commons.lang3.StringUtils;
import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
+import org.apache.shardingsphere.elasticjob.api.JobDagConfiguration;
import java.util.Properties;
@@ -63,6 +65,18 @@ public final class JobConfigurationPOJO {
private boolean disabled;
private boolean overwrite;
+
+ private String dagName;
+
+ private String dagDependencies;
+
+ private int retryTimes;
+
+ private int retryInterval;
+
+ private boolean dagRunAlone;
+
+ private boolean dagSkipWhenFail;
/**
* Convert to job configuration.
@@ -70,12 +84,17 @@ public final class JobConfigurationPOJO {
* @return job configuration
*/
public JobConfiguration toJobConfiguration() {
+ JobDagConfiguration jobDagConfiguration = null;
+ if (StringUtils.isNotEmpty(dagName)) {
+ jobDagConfiguration = new JobDagConfiguration(dagName, dagDependencies, retryTimes, retryInterval, dagRunAlone, dagSkipWhenFail);
+ }
+
JobConfiguration result = JobConfiguration.newBuilder(jobName, shardingTotalCount)
.cron(cron).shardingItemParameters(shardingItemParameters).jobParameter(jobParameter)
.monitorExecution(monitorExecution).failover(failover).misfire(misfire)
.maxTimeDiffSeconds(maxTimeDiffSeconds).reconcileIntervalMinutes(reconcileIntervalMinutes)
.jobShardingStrategyType(jobShardingStrategyType).jobExecutorServiceHandlerType(jobExecutorServiceHandlerType).jobErrorHandlerType(jobErrorHandlerType)
- .description(description).disabled(disabled).overwrite(overwrite).build();
+ .description(description).disabled(disabled).overwrite(overwrite).jobDagConfiguration(jobDagConfiguration).build();
for (Object each : props.keySet()) {
result.getProps().setProperty(each.toString(), props.get(each.toString()).toString());
}
@@ -107,6 +126,14 @@ public final class JobConfigurationPOJO {
result.setProps(jobConfiguration.getProps());
result.setDisabled(jobConfiguration.isDisabled());
result.setOverwrite(jobConfiguration.isOverwrite());
+ if (jobConfiguration.getJobDagConfiguration() != null) {
+ result.setDagName(jobConfiguration.getJobDagConfiguration().getDagName());
+ result.setDagDependencies(jobConfiguration.getJobDagConfiguration().getDagDependencies());
+ result.setRetryTimes(jobConfiguration.getJobDagConfiguration().getRetryTimes());
+ result.setRetryInterval(jobConfiguration.getJobDagConfiguration().getRetryInterval());
+ result.setDagRunAlone(jobConfiguration.getJobDagConfiguration().isDagRunAlone());
+ result.setDagSkipWhenFail(jobConfiguration.getJobDagConfiguration().isDagSkipWhenFail());
+ }
return result;
}
}
diff --git a/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/test/java/org/apache/shardingsphere/elasticjob/lite/spring/namespace/fixture/job/ref/RefFooSimpleElasticJob.java b/elasticjob-infra/elasticjob-infra-common/src/test/java/org/apache/shardingsphere/elasticjob/infra/exception/DagRuntimeExceptionTest.java
similarity index 51%
copy from elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/test/java/org/apache/shardingsphere/elasticjob/lite/spring/namespace/fixture/job/ref/RefFooSimpleElasticJob.java
copy to elasticjob-infra/elasticjob-infra-common/src/test/java/org/apache/shardingsphere/elasticjob/infra/exception/DagRuntimeExceptionTest.java
index 157d33d..861536d 100644
--- a/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/test/java/org/apache/shardingsphere/elasticjob/lite/spring/namespace/fixture/job/ref/RefFooSimpleElasticJob.java
+++ b/elasticjob-infra/elasticjob-infra-common/src/test/java/org/apache/shardingsphere/elasticjob/infra/exception/DagRuntimeExceptionTest.java
@@ -7,7 +7,7 @@
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -15,33 +15,23 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.elasticjob.lite.spring.namespace.fixture.job.ref;
+package org.apache.shardingsphere.elasticjob.infra.exception;
+
+import org.junit.Test;
-import lombok.Getter;
-import lombok.Setter;
-import org.apache.shardingsphere.elasticjob.api.ShardingContext;
-import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
-import org.apache.shardingsphere.elasticjob.lite.spring.namespace.fixture.service.FooService;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
-public class RefFooSimpleElasticJob implements SimpleJob {
+public class DagRuntimeExceptionTest {
- @Getter
- private static volatile boolean completed;
-
- @Getter
- @Setter
- private FooService fooService;
-
- @Override
- public void execute(final ShardingContext shardingContext) {
- fooService.foo();
- completed = true;
+ @Test
+ public void assertGetMessage() {
+ assertThat(new DagRuntimeException("message is: '%s'", "test").getMessage(), is("message is: 'test'"));
}
-
- /**
- * Set completed to false.
- */
- public static void reset() {
- completed = false;
+
+ @Test
+ public void assertGetCause() {
+ assertThat(new DagRuntimeException(new RuntimeException()).getCause(), instanceOf(RuntimeException.class));
}
}
diff --git a/elasticjob-infra/elasticjob-infra-common/src/test/java/org/apache/shardingsphere/elasticjob/infra/pojo/JobConfigurationPOJOTest.java b/elasticjob-infra/elasticjob-infra-common/src/test/java/org/apache/shardingsphere/elasticjob/infra/pojo/JobConfigurationPOJOTest.java
index 0ed703c..b5d54c9 100644
--- a/elasticjob-infra/elasticjob-infra-common/src/test/java/org/apache/shardingsphere/elasticjob/infra/pojo/JobConfigurationPOJOTest.java
+++ b/elasticjob-infra/elasticjob-infra-common/src/test/java/org/apache/shardingsphere/elasticjob/infra/pojo/JobConfigurationPOJOTest.java
@@ -30,6 +30,8 @@ import static org.junit.Assert.assertTrue;
public final class JobConfigurationPOJOTest {
private static final String YAML = "cron: 0/1 * * * * ?\n"
+ + "dagRunAlone: false\n"
+ + "dagSkipWhenFail: false\n"
+ "description: Job description\n"
+ "disabled: false\n"
+ "failover: false\n"
@@ -45,10 +47,39 @@ public final class JobConfigurationPOJOTest {
+ "props:\n"
+ " key: value\n"
+ "reconcileIntervalMinutes: 0\n"
+ + "retryInterval: 0\n"
+ + "retryTimes: 0\n"
+ "shardingItemParameters: 0=A,1=B,2=C\n"
+ "shardingTotalCount: 3\n";
-
+
+ private static final String DAG_YAML = "cron: 0/1 * * * * ?\n"
+ + "dagDependencies: jobA,jobB\n"
+ + "dagName: fooDag\n"
+ + "dagRunAlone: false\n"
+ + "dagSkipWhenFail: true\n"
+ + "description: Job description\n"
+ + "disabled: false\n"
+ + "failover: false\n"
+ + "jobErrorHandlerType: IGNORE\n"
+ + "jobExecutorServiceHandlerType: CPU\n"
+ + "jobName: test_job\n"
+ + "jobParameter: param\n"
+ + "jobShardingStrategyType: AVG_ALLOCATION\n"
+ + "maxTimeDiffSeconds: -1\n"
+ + "misfire: false\n"
+ + "monitorExecution: false\n"
+ + "overwrite: false\n"
+ + "props:\n"
+ + " key: value\n"
+ + "reconcileIntervalMinutes: 0\n"
+ + "retryInterval: 500\n"
+ + "retryTimes: 3\n"
+ + "shardingItemParameters: 0=A,1=B,2=C\n"
+ + "shardingTotalCount: 3\n";
+
private static final String YAML_WITH_NULL = "cron: 0/1 * * * * ?\n"
+ + "dagRunAlone: false\n"
+ + "dagSkipWhenFail: false\n"
+ "disabled: false\n"
+ "failover: false\n"
+ "jobName: test_job\n"
@@ -57,6 +88,8 @@ public final class JobConfigurationPOJOTest {
+ "monitorExecution: false\n"
+ "overwrite: false\n"
+ "reconcileIntervalMinutes: 0\n"
+ + "retryInterval: 0\n"
+ + "retryTimes: 0\n"
+ "shardingTotalCount: 3\n";
@Test
@@ -186,4 +219,38 @@ public final class JobConfigurationPOJOTest {
assertFalse(actual.isDisabled());
assertFalse(actual.isOverwrite());
}
+
+ @Test
+ public void assertMarshalWithDag() {
+ JobConfigurationPOJO actual = new JobConfigurationPOJO();
+ actual.setJobName("test_job");
+ actual.setCron("0/1 * * * * ?");
+ actual.setShardingTotalCount(3);
+ actual.setShardingItemParameters("0=A,1=B,2=C");
+ actual.setJobParameter("param");
+ actual.setMaxTimeDiffSeconds(-1);
+ actual.setJobShardingStrategyType("AVG_ALLOCATION");
+ actual.setJobExecutorServiceHandlerType("CPU");
+ actual.setJobErrorHandlerType("IGNORE");
+ actual.setDescription("Job description");
+ actual.getProps().setProperty("key", "value");
+ actual.setDagSkipWhenFail(true);
+ actual.setDagRunAlone(false);
+ actual.setDagName("fooDag");
+ actual.setDagDependencies("jobA,jobB");
+ actual.setRetryInterval(500);
+ actual.setRetryTimes(3);
+ assertThat(YamlEngine.marshal(actual), is(DAG_YAML));
+ }
+
+ @Test
+ public void assertUnMarshalWithDag() {
+ JobConfigurationPOJO actual = YamlEngine.unmarshal(DAG_YAML, JobConfigurationPOJO.class);
+ assertThat(actual.getDagName(), is("fooDag"));
+ assertThat(actual.getDagDependencies(), is("jobA,jobB"));
+ assertThat(actual.getRetryInterval(), is(500));
+ assertThat(actual.getRetryTimes(), is(3));
+ assertTrue(actual.isDagSkipWhenFail());
+ assertFalse(actual.isDagRunAlone());
+ }
}
diff --git a/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-api/src/main/java/org/apache/shardingsphere/elasticjob/tracing/event/DagJobExecutionEvent.java b/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-api/src/main/java/org/apache/shardingsphere/elasticjob/tracing/event/DagJobExecutionEvent.java
new file mode 100644
index 0000000..d974502
--- /dev/null
+++ b/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-api/src/main/java/org/apache/shardingsphere/elasticjob/tracing/event/DagJobExecutionEvent.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.elasticjob.tracing.event;
+
+import lombok.Getter;
+import lombok.ToString;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.time.DateFormatUtils;
+
+import java.util.Date;
+import java.util.UUID;
+
+/**
+ * Dag job execution event.
+ *
+ **/
+@Getter
+@ToString
+public class DagJobExecutionEvent implements JobEvent {
+ private String id;
+
+ private String dagName;
+
+ private String jobName;
+
+ private String execTime;
+
+ private String execDate;
+
+ private String batchNo;
+
+ private String state;
+
+ private String message;
+
+ public DagJobExecutionEvent(final String dagName, final String jobName, final String batchNo, final String state,
+ final String message) {
+ this(dagName, jobName, DateFormatUtils.format(new Date(), "HHmmss"),
+ DateFormatUtils.format(new Date(), "yyyyMMdd"),
+ batchNo, state, message);
+ }
+
+ public DagJobExecutionEvent(final String dagName, final String jobName, final String execTime, final String execDate,
+ final String batchNo, final String state, final String message) {
+ this.id = UUID.randomUUID().toString();
+ this.dagName = dagName;
+ this.jobName = jobName;
+ this.execTime = execTime;
+ this.execDate = execDate;
+ this.batchNo = batchNo;
+ this.state = state;
+ this.message = truncateMessage(message);
+ }
+
+ public DagJobExecutionEvent(final String id, final String dagName, final String jobName, final String execTime,
+ final String execDate, final String batchNo, final String state, final String message) {
+ this.id = id;
+ this.dagName = dagName;
+ this.jobName = jobName;
+ this.execTime = execTime;
+ this.execDate = execDate;
+ this.batchNo = batchNo;
+ this.state = state;
+ this.message = truncateMessage(message);
+ }
+
+ private static String truncateMessage(final String str) {
+ return StringUtils.isNotEmpty(str) && str.length() > 255 ? StringUtils.substring(str, 0, 255) : str;
+ }
+
+ @Override
+ public String getJobName() {
+ return jobName;
+ }
+}
diff --git a/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-api/src/main/java/org/apache/shardingsphere/elasticjob/tracing/listener/TracingListener.java b/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-api/src/main/java/org/apache/shardingsphere/elasticjob/tracing/listener/TracingListener.java
index 414ddeb..6391c72 100644
--- a/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-api/src/main/java/org/apache/shardingsphere/elasticjob/tracing/listener/TracingListener.java
+++ b/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-api/src/main/java/org/apache/shardingsphere/elasticjob/tracing/listener/TracingListener.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.elasticjob.tracing.listener;
import com.google.common.eventbus.AllowConcurrentEvents;
import com.google.common.eventbus.Subscribe;
+import org.apache.shardingsphere.elasticjob.tracing.event.DagJobExecutionEvent;
import org.apache.shardingsphere.elasticjob.tracing.event.JobExecutionEvent;
import org.apache.shardingsphere.elasticjob.tracing.event.JobStatusTraceEvent;
@@ -44,4 +45,13 @@ public interface TracingListener {
@Subscribe
@AllowConcurrentEvents
void listen(JobStatusTraceEvent jobStatusTraceEvent);
+
+ /**
+ * Listen dag job trace event.
+ *
+ * @param dagJobExecutionEvent dag job status trace event
+ */
+ @Subscribe
+ @AllowConcurrentEvents
+ void listen(DagJobExecutionEvent dagJobExecutionEvent);
}
diff --git a/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-api/src/test/java/org/apache/shardingsphere/elasticjob/tracing/fixture/TestTracingListener.java b/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-api/src/test/java/org/apache/shardingsphere/elasticjob/tracing/fixture/TestTracingListener.java
index 48f09f0..e7aaf62 100644
--- a/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-api/src/test/java/org/apache/shardingsphere/elasticjob/tracing/fixture/TestTracingListener.java
+++ b/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-api/src/test/java/org/apache/shardingsphere/elasticjob/tracing/fixture/TestTracingListener.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.elasticjob.tracing.fixture;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.elasticjob.tracing.event.DagJobExecutionEvent;
import org.apache.shardingsphere.elasticjob.tracing.listener.TracingListener;
import org.apache.shardingsphere.elasticjob.tracing.event.JobExecutionEvent;
import org.apache.shardingsphere.elasticjob.tracing.event.JobStatusTraceEvent;
@@ -41,7 +42,13 @@ public final class TestTracingListener implements TracingListener {
public void listen(final JobStatusTraceEvent jobStatusTraceEvent) {
jobEventCaller.call();
}
-
+
+ @Override
+ public void listen(final DagJobExecutionEvent dagJobExecutionEvent) {
+ jobEventCaller.call();
+ executionEventCalled = true;
+ }
+
/**
* Set executionEventCalled to false.
*/
diff --git a/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-rdb/src/main/java/org/apache/shardingsphere/elasticjob/tracing/rdb/listener/RDBTracingListener.java b/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-rdb/src/main/java/org/apache/shardingsphere/elasticjob/tracing/rdb/listener/RDBTracingListener.java
index 5369302..e19b849 100644
--- a/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-rdb/src/main/java/org/apache/shardingsphere/elasticjob/tracing/rdb/listener/RDBTracingListener.java
+++ b/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-rdb/src/main/java/org/apache/shardingsphere/elasticjob/tracing/rdb/listener/RDBTracingListener.java
@@ -17,6 +17,7 @@
package org.apache.shardingsphere.elasticjob.tracing.rdb.listener;
+import org.apache.shardingsphere.elasticjob.tracing.event.DagJobExecutionEvent;
import org.apache.shardingsphere.elasticjob.tracing.event.JobExecutionEvent;
import org.apache.shardingsphere.elasticjob.tracing.event.JobStatusTraceEvent;
import org.apache.shardingsphere.elasticjob.tracing.listener.TracingListener;
@@ -45,4 +46,9 @@ public final class RDBTracingListener implements TracingListener {
public void listen(final JobStatusTraceEvent jobStatusTraceEvent) {
repository.addJobStatusTraceEvent(jobStatusTraceEvent);
}
+
+ @Override
+ public void listen(final DagJobExecutionEvent dagJobExecutionEvent) {
+ repository.addDagJobExecutionEvent(dagJobExecutionEvent);
+ }
}
diff --git a/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-rdb/src/main/java/org/apache/shardingsphere/elasticjob/tracing/rdb/storage/RDBJobEventStorage.java b/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-rdb/src/main/java/org/apache/shardingsphere/elasticjob/tracing/rdb/storage/RDBJobEventStorage.java
index b2b0263..a984261 100644
--- a/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-rdb/src/main/java/org/apache/shardingsphere/elasticjob/tracing/rdb/storage/RDBJobEventStorage.java
+++ b/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-rdb/src/main/java/org/apache/shardingsphere/elasticjob/tracing/rdb/storage/RDBJobEventStorage.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.elasticjob.tracing.rdb.storage;
import com.google.common.base.Strings;
import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.elasticjob.tracing.event.DagJobExecutionEvent;
import org.apache.shardingsphere.elasticjob.tracing.event.JobExecutionEvent;
import org.apache.shardingsphere.elasticjob.tracing.event.JobStatusTraceEvent;
import org.apache.shardingsphere.elasticjob.tracing.event.JobStatusTraceEvent.Source;
@@ -53,6 +54,8 @@ public final class RDBJobEventStorage {
private static final String TABLE_JOB_STATUS_TRACE_LOG = "JOB_STATUS_TRACE_LOG";
private static final String TASK_ID_STATE_INDEX = "TASK_ID_STATE_INDEX";
+
+ private static final String TABLE_DAG_JOB_EXECUTION_LOG = "DAG_JOB_EXECUTION_LOG";
private static final Map<String, DatabaseType> DATABASE_TYPES = new HashMap<>();
@@ -91,6 +94,7 @@ public final class RDBJobEventStorage {
try (Connection connection = dataSource.getConnection()) {
createJobExecutionTableAndIndexIfNeeded(connection);
createJobStatusTraceTableAndIndexIfNeeded(connection);
+ createDagJobExecutionTableAndIndexIfNeeded(connection);
}
}
@@ -112,7 +116,16 @@ public final class RDBJobEventStorage {
}
createTaskIdIndexIfNeeded(connection);
}
-
+
+ private void createDagJobExecutionTableAndIndexIfNeeded(final Connection connection) throws SQLException {
+ DatabaseMetaData dbMetaData = connection.getMetaData();
+ try (ResultSet resultSet = dbMetaData.getTables(connection.getCatalog(), null, TABLE_DAG_JOB_EXECUTION_LOG, new String[]{"TABLE"})) {
+ if (!resultSet.next()) {
+ createDagJobExecutionTable(connection);
+ }
+ }
+ }
+
private void createTaskIdIndexIfNeeded(final Connection connection) throws SQLException {
DatabaseMetaData dbMetaData = connection.getMetaData();
try (ResultSet resultSet = dbMetaData.getIndexInfo(connection.getCatalog(), null, TABLE_JOB_STATUS_TRACE_LOG, false, false)) {
@@ -133,7 +146,13 @@ public final class RDBJobEventStorage {
preparedStatement.execute();
}
}
-
+
+ private void createDagJobExecutionTable(final Connection connection) throws SQLException {
+ try (PreparedStatement preparedStatement = connection.prepareStatement(sqlMapper.getCreateTableForDagJobExecutionLog())) {
+ preparedStatement.execute();
+ }
+ }
+
private void createJobStatusTraceTable(final Connection connection) throws SQLException {
try (PreparedStatement preparedStatement = connection.prepareStatement(sqlMapper.getCreateTableForJobStatusTraceLog())) {
preparedStatement.execute();
@@ -363,4 +382,34 @@ public final class RDBJobEventStorage {
}
return result;
}
+
+ /**
+ * Add dag execution event to db.
+ *
+ * @param dagJobExecutionEvent dag execution event
+ * @return add success?
+ */
+ public boolean addDagJobExecutionEvent(final DagJobExecutionEvent dagJobExecutionEvent) {
+ boolean result = false;
+ try (
+ Connection connection = dataSource.getConnection();
+ PreparedStatement preparedStatement = connection.prepareStatement(sqlMapper.getInsertForDagJobExecutionLog())) {
+ preparedStatement.setString(1, dagJobExecutionEvent.getId());
+ preparedStatement.setString(2, dagJobExecutionEvent.getDagName());
+ preparedStatement.setString(3, dagJobExecutionEvent.getJobName());
+ preparedStatement.setString(4, dagJobExecutionEvent.getExecTime());
+ preparedStatement.setString(5, dagJobExecutionEvent.getExecDate());
+ preparedStatement.setString(6, dagJobExecutionEvent.getBatchNo());
+ preparedStatement.setString(7, dagJobExecutionEvent.getState());
+ preparedStatement.setString(8, dagJobExecutionEvent.getMessage());
+ preparedStatement.execute();
+ result = true;
+ } catch (final SQLException ex) {
+ if (!isDuplicateRecord(ex)) {
+ // TODO log failure directly to output log, consider to be configurable in the future
+ log.error(ex.getMessage());
+ }
+ }
+ return result;
+ }
}
diff --git a/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-rdb/src/main/java/org/apache/shardingsphere/elasticjob/tracing/rdb/storage/RDBStorageSQLMapper.java b/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-rdb/src/main/java/org/apache/shardingsphere/elasticjob/tracing/rdb/storage/RDBStorageSQLMapper.java
index a92d92f..08de9a5 100644
--- a/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-rdb/src/main/java/org/apache/shardingsphere/elasticjob/tracing/rdb/storage/RDBStorageSQLMapper.java
+++ b/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-rdb/src/main/java/org/apache/shardingsphere/elasticjob/tracing/rdb/storage/RDBStorageSQLMapper.java
@@ -50,6 +50,10 @@ public final class RDBStorageSQLMapper {
private final String selectForJobStatusTraceLog;
private final String selectOriginalTaskIdForJobStatusTraceLog;
+
+ private final String createTableForDagJobExecutionLog;
+
+ private final String insertForDagJobExecutionLog;
public RDBStorageSQLMapper(final String sqlPropertiesFileName) {
Properties props = loadProps(sqlPropertiesFileName);
@@ -64,6 +68,8 @@ public final class RDBStorageSQLMapper {
insertForJobStatusTraceLog = props.getProperty("JOB_STATUS_TRACE_LOG.INSERT");
selectForJobStatusTraceLog = props.getProperty("JOB_STATUS_TRACE_LOG.SELECT");
selectOriginalTaskIdForJobStatusTraceLog = props.getProperty("JOB_STATUS_TRACE_LOG.SELECT_ORIGINAL_TASK_ID");
+ createTableForDagJobExecutionLog = props.getProperty("DAG_JOB_EXECUTION_LOG.TABLE.CREATE");
+ insertForDagJobExecutionLog = props.getProperty("DAG_JOB_EXECUTION_LOG.INSERT");
}
@SneakyThrows
diff --git a/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-rdb/src/main/resources/META-INF/sql/DB2.properties b/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-rdb/src/main/resources/META-INF/sql/DB2.properties
index 61dcf4f..0eef217 100644
--- a/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-rdb/src/main/resources/META-INF/sql/DB2.properties
+++ b/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-rdb/src/main/resources/META-INF/sql/DB2.properties
@@ -29,3 +29,6 @@ TASK_ID_STATE_INDEX.INDEX.CREATE=CREATE INDEX TASK_ID_STATE_INDEX ON JOB_STATUS_
JOB_STATUS_TRACE_LOG.INSERT=INSERT INTO JOB_STATUS_TRACE_LOG (id, job_name, original_task_id, task_id, slave_id, source, execution_type, sharding_item, state, message, creation_time) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
JOB_STATUS_TRACE_LOG.SELECT=SELECT * FROM JOB_STATUS_TRACE_LOG WHERE task_id = ?
JOB_STATUS_TRACE_LOG.SELECT_ORIGINAL_TASK_ID=SELECT * FROM (SELECT ROWNUMBER() OVER() AS ROW, A.* FROM JOB_STATUS_TRACE_LOG A WHERE A.TASK_ID = '4' AND A.STATE= 'TASK_STAGING') AS B WHERE B.ROW = 1
+
+DAG_JOB_EXECUTION_LOG.TABLE.CREATE=CREATE TABLE DAG_JOB_EXECUTION_LOG (id VARCHAR(40) not null ,dag_name varchar(40) not null ,job_name varchar(100) not null, exec_time varchar(20) not null,exec_date varchar(20) not null,batch_no varchar(40), state varchar(8) not null,message varchar(255) ,create_time datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,update_time datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,PRIMARY KEY (id))
+DAG_JOB_EXECUTION_LOG.INSERT=INSERT INTO DAG_JOB_EXECUTION_LOG (id,group_name, job_name, exec_time, exec_date,batch_no, state, message) VALUES (?,?, ?, ?, ?, ?, ?, ?)
diff --git a/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-rdb/src/main/resources/META-INF/sql/H2.properties b/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-rdb/src/main/resources/META-INF/sql/H2.properties
index c4a6e44..08f29bc 100644
--- a/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-rdb/src/main/resources/META-INF/sql/H2.properties
+++ b/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-rdb/src/main/resources/META-INF/sql/H2.properties
@@ -29,3 +29,6 @@ TASK_ID_STATE_INDEX.INDEX.CREATE=CREATE INDEX IF NOT EXISTS TASK_ID_STATE_INDEX
JOB_STATUS_TRACE_LOG.INSERT=INSERT INTO JOB_STATUS_TRACE_LOG (id, job_name, original_task_id, task_id, slave_id, source, execution_type, sharding_item, state, message, creation_time) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
JOB_STATUS_TRACE_LOG.SELECT=SELECT * FROM JOB_STATUS_TRACE_LOG WHERE task_id = ?
JOB_STATUS_TRACE_LOG.SELECT_ORIGINAL_TASK_ID=SELECT original_task_id FROM JOB_STATUS_TRACE_LOG WHERE task_id = ? and state= 'TASK_STAGING' LIMIT 1
+
+DAG_JOB_EXECUTION_LOG.TABLE.CREATE=CREATE TABLE DAG_JOB_EXECUTION_LOG (id VARCHAR(40) not null ,dag_name varchar(40) not null ,job_name varchar(100) not null, exec_time varchar(20) not null,exec_date varchar(20) not null,batch_no varchar(40), state varchar(8) not null,message varchar(255) ,create_time datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,update_time datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,PRIMARY KEY (id))
+DAG_JOB_EXECUTION_LOG.INSERT=INSERT INTO DAG_JOB_EXECUTION_LOG (id,dag_name, job_name, exec_time, exec_date,batch_no, state, message) VALUES (?,?, ?, ?, ?, ?, ?, ?)
diff --git a/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-rdb/src/main/resources/META-INF/sql/MySQL.properties b/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-rdb/src/main/resources/META-INF/sql/MySQL.properties
index 8be48c5..fe60af3 100644
--- a/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-rdb/src/main/resources/META-INF/sql/MySQL.properties
+++ b/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-rdb/src/main/resources/META-INF/sql/MySQL.properties
@@ -29,3 +29,6 @@ TASK_ID_STATE_INDEX.INDEX.CREATE=CREATE INDEX TASK_ID_STATE_INDEX ON JOB_STATUS_
JOB_STATUS_TRACE_LOG.INSERT=INSERT INTO JOB_STATUS_TRACE_LOG (id, job_name, original_task_id, task_id, slave_id, source, execution_type, sharding_item, state, message, creation_time) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
JOB_STATUS_TRACE_LOG.SELECT=SELECT * FROM JOB_STATUS_TRACE_LOG WHERE task_id = ?
JOB_STATUS_TRACE_LOG.SELECT_ORIGINAL_TASK_ID=SELECT original_task_id FROM JOB_STATUS_TRACE_LOG WHERE task_id = ? and state= 'TASK_STAGING' LIMIT 1
+
+DAG_JOB_EXECUTION_LOG.TABLE.CREATE=CREATE TABLE DAG_JOB_EXECUTION_LOG (id VARCHAR(40) not null ,dag_name varchar(40) not null ,job_name varchar(100) not null, exec_time varchar(20) not null,exec_date varchar(20) not null,batch_no varchar(40), state varchar(8) not null,message varchar(255) ,create_time datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,update_time datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,PRIMARY KEY (id))
+DAG_JOB_EXECUTION_LOG.INSERT=INSERT INTO DAG_JOB_EXECUTION_LOG (id,dag_name, job_name, exec_time, exec_date,batch_no, state, message) VALUES (?,?, ?, ?, ?, ?, ?, ?)
diff --git a/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-rdb/src/main/resources/META-INF/sql/Oracle.properties b/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-rdb/src/main/resources/META-INF/sql/Oracle.properties
index 5952f09..75ba48f 100644
--- a/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-rdb/src/main/resources/META-INF/sql/Oracle.properties
+++ b/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-rdb/src/main/resources/META-INF/sql/Oracle.properties
@@ -29,3 +29,6 @@ TASK_ID_STATE_INDEX.INDEX.CREATE=CREATE INDEX TASK_ID_STATE_INDEX ON JOB_STATUS_
JOB_STATUS_TRACE_LOG.INSERT=INSERT INTO JOB_STATUS_TRACE_LOG (id, job_name, original_task_id, task_id, slave_id, source, execution_type, sharding_item, state, message, creation_time) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
JOB_STATUS_TRACE_LOG.SELECT=SELECT * FROM JOB_STATUS_TRACE_LOG WHERE task_id = ?
JOB_STATUS_TRACE_LOG.SELECT_ORIGINAL_TASK_ID=SELECT original_task_id FROM JOB_STATUS_TRACE_LOG WHERE task_id = ? and state= 'TASK_STAGING' and ROWNUM = 1
+
+DAG_JOB_EXECUTION_LOG.TABLE.CREATE=CREATE TABLE DAG_JOB_EXECUTION_LOG (id VARCHAR(40) not null ,dag_name varchar(40) not null ,job_name varchar(100) not null, exec_time varchar(20) not null,exec_date varchar(20) not null,batch_no varchar(40), state varchar(8) not null,message varchar(255) ,create_time datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,update_time datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,PRIMARY KEY (id))
+DAG_JOB_EXECUTION_LOG.INSERT=INSERT INTO DAG_JOB_EXECUTION_LOG (id,dag_name, job_name, exec_time, exec_date,batch_no, state, message) VALUES (?,?, ?, ?, ?, ?, ?, ?)
diff --git a/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-rdb/src/main/resources/META-INF/sql/PostgreSQL.properties b/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-rdb/src/main/resources/META-INF/sql/PostgreSQL.properties
index 815b6b9..c16e0bb 100644
--- a/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-rdb/src/main/resources/META-INF/sql/PostgreSQL.properties
+++ b/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-rdb/src/main/resources/META-INF/sql/PostgreSQL.properties
@@ -29,3 +29,6 @@ TASK_ID_STATE_INDEX.INDEX.CREATE=CREATE INDEX TASK_ID_STATE_INDEX ON JOB_STATUS_
JOB_STATUS_TRACE_LOG.INSERT=INSERT INTO JOB_STATUS_TRACE_LOG (id, job_name, original_task_id, task_id, slave_id, source, execution_type, sharding_item, state, message, creation_time) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
JOB_STATUS_TRACE_LOG.SELECT=SELECT * FROM JOB_STATUS_TRACE_LOG WHERE task_id=?
JOB_STATUS_TRACE_LOG.SELECT_ORIGINAL_TASK_ID=SELECT original_task_id FROM JOB_STATUS_TRACE_LOG WHERE task_id=? and state='TASK_STAGING' LIMIT 1
+
+DAG_JOB_EXECUTION_LOG.TABLE.CREATE=CREATE TABLE DAG_JOB_EXECUTION_LOG (id VARCHAR(40) not null ,dag_name varchar(40) not null ,job_name varchar(100) not null, exec_time varchar(20) not null,exec_date varchar(20) not null,batch_no varchar(40), state varchar(8) not null,message varchar(255) ,create_time datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,update_time datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,PRIMARY KEY (id))
+DAG_JOB_EXECUTION_LOG.INSERT=INSERT INTO DAG_JOB_EXECUTION_LOG (id,dag_name, job_name, exec_time, exec_date,batch_no, state, message) VALUES (?,?, ?, ?, ?, ?, ?, ?)
diff --git a/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-rdb/src/main/resources/META-INF/sql/SQL92.properties b/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-rdb/src/main/resources/META-INF/sql/SQL92.properties
index 1d0f5c1..d3fb39d 100644
--- a/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-rdb/src/main/resources/META-INF/sql/SQL92.properties
+++ b/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-rdb/src/main/resources/META-INF/sql/SQL92.properties
@@ -29,3 +29,6 @@ TASK_ID_STATE_INDEX.INDEX.CREATE=CREATE INDEX TASK_ID_STATE_INDEX ON JOB_STATUS_
JOB_STATUS_TRACE_LOG.INSERT=INSERT INTO JOB_STATUS_TRACE_LOG (id, job_name, original_task_id, task_id, slave_id, source, execution_type, sharding_item, state, message, creation_time) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
JOB_STATUS_TRACE_LOG.SELECT=SELECT * FROM JOB_STATUS_TRACE_LOG WHERE task_id=?
JOB_STATUS_TRACE_LOG.SELECT_ORIGINAL_TASK_ID=SELECT original_task_id FROM JOB_STATUS_TRACE_LOG WHERE task_id=? and state='TASK_STAGING' LIMIT 1
+
+DAG_JOB_EXECUTION_LOG.TABLE.CREATE=CREATE TABLE DAG_JOB_EXECUTION_LOG (id VARCHAR(40) not null ,dag_name varchar(40) not null ,job_name varchar(100) not null, exec_time varchar(20) not null,exec_date varchar(20) not null,batch_no varchar(40), state varchar(8) not null,message varchar(255) ,create_time datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,update_time datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,PRIMARY KEY (id))
+DAG_JOB_EXECUTION_LOG.INSERT=INSERT INTO DAG_JOB_EXECUTION_LOG (id,dag_name, job_name, exec_time, exec_date,batch_no, state, message) VALUES (?,?, ?, ?, ?, ?, ?, ?)
diff --git a/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-rdb/src/main/resources/META-INF/sql/SQLServer.properties b/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-rdb/src/main/resources/META-INF/sql/SQLServer.properties
index 30f3e92..f638ce8 100644
--- a/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-rdb/src/main/resources/META-INF/sql/SQLServer.properties
+++ b/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-rdb/src/main/resources/META-INF/sql/SQLServer.properties
@@ -29,3 +29,6 @@ TASK_ID_STATE_INDEX.INDEX.CREATE=CREATE INDEX TASK_ID_STATE_INDEX ON JOB_STATUS_
JOB_STATUS_TRACE_LOG.INSERT=INSERT INTO JOB_STATUS_TRACE_LOG (id, job_name, original_task_id, task_id, slave_id, source, execution_type, sharding_item, state, message, creation_time) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
JOB_STATUS_TRACE_LOG.SELECT=SELECT * FROM JOB_STATUS_TRACE_LOG WHERE task_id = ?
JOB_STATUS_TRACE_LOG.SELECT_ORIGINAL_TASK_ID=SELECT TOP 1 original_task_id FROM JOB_STATUS_TRACE_LOG WHERE task_id = ? and state = 'TASK_STAGING'
+
+DAG_JOB_EXECUTION_LOG.TABLE.CREATE=CREATE TABLE DAG_JOB_EXECUTION_LOG (id VARCHAR(40) not null ,dag_name varchar(40) not null ,job_name varchar(100) not null, exec_time varchar(20) not null,exec_date varchar(20) not null,batch_no varchar(40), state varchar(8) not null,message varchar(255) ,create_time datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,update_time datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,PRIMARY KEY (id))
+DAG_JOB_EXECUTION_LOG.INSERT=INSERT INTO DAG_JOB_EXECUTION_LOG (id,dag_name, job_name, exec_time, exec_date,batch_no, state, message) VALUES (?,?, ?, ?, ?, ?, ?, ?)
diff --git a/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-rdb/src/test/java/org/apache/shardingsphere/elasticjob/tracing/rdb/listener/RDBTracingListenerTest.java b/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-rdb/src/test/java/org/apache/shardingsphere/elasticjob/tracing/rdb/listener/RDBTracingListenerTest.java
index 049677c..6e79a14 100644
--- a/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-rdb/src/test/java/org/apache/shardingsphere/elasticjob/tracing/rdb/listener/RDBTracingListenerTest.java
+++ b/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-rdb/src/test/java/org/apache/shardingsphere/elasticjob/tracing/rdb/listener/RDBTracingListenerTest.java
@@ -21,6 +21,7 @@ import lombok.SneakyThrows;
import org.apache.commons.dbcp.BasicDataSource;
import org.apache.shardingsphere.elasticjob.tracing.JobEventBus;
import org.apache.shardingsphere.elasticjob.tracing.api.TracingConfiguration;
+import org.apache.shardingsphere.elasticjob.tracing.event.DagJobExecutionEvent;
import org.apache.shardingsphere.elasticjob.tracing.event.JobExecutionEvent;
import org.apache.shardingsphere.elasticjob.tracing.event.JobStatusTraceEvent;
import org.apache.shardingsphere.elasticjob.tracing.event.JobStatusTraceEvent.Source;
@@ -81,4 +82,11 @@ public final class RDBTracingListenerTest {
jobEventBus.post(jobStatusTraceEvent);
verify(repository, atMost(1)).addJobStatusTraceEvent(jobStatusTraceEvent);
}
+
+ @Test
+ public void assertDagJobExecutionEvent() {
+ DagJobExecutionEvent dagJobExecutionEvent = new DagJobExecutionEvent("dagName", JOB_NAME, "batchno", "1", "message is empty.");
+ jobEventBus.post(dagJobExecutionEvent);
+ verify(repository, atMost(1)).addDagJobExecutionEvent(dagJobExecutionEvent);
+ }
}
diff --git a/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-rdb/src/test/java/org/apache/shardingsphere/elasticjob/tracing/rdb/storage/RDBJobEventStorageTest.java b/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-rdb/src/test/java/org/apache/shardingsphere/elasticjob/tracing/rdb/storage/RDBJobEventStorageTest.java
index 92a4c04..55ed466 100644
--- a/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-rdb/src/test/java/org/apache/shardingsphere/elasticjob/tracing/rdb/storage/RDBJobEventStorageTest.java
+++ b/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-rdb/src/test/java/org/apache/shardingsphere/elasticjob/tracing/rdb/storage/RDBJobEventStorageTest.java
@@ -18,6 +18,7 @@
package org.apache.shardingsphere.elasticjob.tracing.rdb.storage;
import org.apache.commons.dbcp.BasicDataSource;
+import org.apache.shardingsphere.elasticjob.tracing.event.DagJobExecutionEvent;
import org.apache.shardingsphere.elasticjob.tracing.event.JobExecutionEvent;
import org.apache.shardingsphere.elasticjob.tracing.event.JobStatusTraceEvent;
import org.apache.shardingsphere.elasticjob.tracing.event.JobStatusTraceEvent.Source;
@@ -85,7 +86,13 @@ public final class RDBJobEventStorageTest {
assertThat(jobStatusTraceEvent.getOriginalTaskId(), is("original_fake_failed_failover_task_id"));
}
}
-
+
+ @Test
+ public void assertAddDagJobExecutionEvent() {
+ DagJobExecutionEvent dagJobExecutionEvent = new DagJobExecutionEvent("fake_dag", "fake_job", "batchno", "1", "message is empty.");
+ assertTrue(storage.addDagJobExecutionEvent(dagJobExecutionEvent));
+ }
+
@Test
public void assertUpdateJobExecutionEventWhenSuccess() {
JobExecutionEvent startEvent = new JobExecutionEvent("localhost", "127.0.0.1", "fake_task_id", "test_job", JobExecutionEvent.ExecutionSource.NORMAL_TRIGGER, 0);
diff --git a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/dag/DagJobStates.java b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/dag/DagJobStates.java
new file mode 100644
index 0000000..a389a3c
--- /dev/null
+++ b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/dag/DagJobStates.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.elasticjob.lite.internal.dag;
+
+import org.apache.commons.lang3.StringUtils;
+
+public enum DagJobStates {
+ NONE("none"),
+ REG("register"),
+ INIT("graph"),
+ READY("ready"),
+ RUNNING("running"),
+ PAUSE("pause"),
+ FAIL("fail"),
+ SUCCESS("success"),
+ SKIP("skip"),
+ RETRY("retry"),
+ ERROR("error");
+
+ private String value;
+ DagJobStates(final String value) {
+ this.value = value;
+ }
+
+ /**
+ * Give string return enums.
+ *
+ * @param value enum string value
+ * @return DagJobStates enum
+ */
+ public static DagJobStates of(final String value) {
+ for (DagJobStates states : DagJobStates.values()) {
+ if (StringUtils.equalsIgnoreCase(value, states.getValue())) {
+ return states;
+ }
+ }
+ return DagJobStates.NONE;
+ }
+
+ /**
+ * Get value.
+ *
+ * @return string value
+ */
+ public String getValue() {
+ return value;
+ }
+
+ /**
+ * Set value.
+ *
+ * @param value enum value
+ */
+ public void setValue(final String value) {
+ this.value = value;
+ }
+
+}
diff --git a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/dag/DagNodeStorage.java b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/dag/DagNodeStorage.java
new file mode 100644
index 0000000..5df835f
--- /dev/null
+++ b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/dag/DagNodeStorage.java
@@ -0,0 +1,499 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.elasticjob.lite.internal.dag;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.transaction.CuratorOp;
+import org.apache.shardingsphere.elasticjob.lite.internal.state.JobStateEnum;
+import org.apache.shardingsphere.elasticjob.lite.internal.state.JobStateNode;
+import org.apache.shardingsphere.elasticjob.lite.internal.storage.JobNodePath;
+import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Dag storage class.
+ * DAG zk path:
+ * /dag/
+ * {dagName}/
+ * config/ {jobName} value=dependencies comm split
+ * graph/ {jobName} value=dependencies comm split
+ * states value= dag states
+ * running/ {jobName}
+ * success/ {jobName}
+ * fail/ {jobName}
+ * skip/ {jobName}
+ * retry/ {jobName}
+ */
+@Slf4j
+public class DagNodeStorage {
+
+ private static final String DAG_ROOT = "/dag/%s";
+
+ private static final String DAG_CONFIG = "/dag/%s/config";
+
+ private static final String DAG_CONFIG_JOB = "/dag/%s/config/%s";
+
+ private static final String DAG_STATES = "/dag/%s/states";
+
+ private static final String DAG_GRAPH = "/dag/%s/graph";
+
+ private static final String DAG_GRAPH_JOB = "/dag/%s/graph/%s";
+
+ private static final String DAG_GRAPH_JOB_RETRYTIMES = "/dag/%s/graph/%s/retry";
+
+ private static final String DAG_RUNNING = "/dag/%s/running";
+
+ private static final String DAG_RUNNING_JOB = "/dag/%s/running/%s";
+
+ private static final String DAG_SUCCESS = "/dag/%s/success";
+
+ private static final String DAG_SUCCESS_JOB = "/dag/%s/success/%s";
+
+ private static final String DAG_FAIL = "/dag/%s/fail";
+
+ private static final String DAG_FAIL_JOB = "/dag/%s/fail/%s";
+
+ private static final String DAG_SKIP = "/dag/%s/skip";
+
+ private static final String DAG_SKIP_JOB = "/dag/%s/skip/%s";
+
+ private static final String DAG_RETRY = "/dag/%s/retry";
+
+ private static final String DAG_RETRY_JOB = "/dag/%s/retry/%s";
+
+ private final CoordinatorRegistryCenter regCenter;
+
+ private final String jobName;
+
+ private final String dagName;
+
+ /**
+ * Constructor.
+ *
+ * @param regCenter register center
+ * @param dagName dag name
+ * @param jobName dag job name
+ */
+ public DagNodeStorage(final CoordinatorRegistryCenter regCenter, final String dagName, final String jobName) {
+ this.regCenter = regCenter;
+ this.jobName = jobName;
+ this.dagName = dagName;
+ }
+
+ /**
+ * Persist /dag/dagName/config/jobName ,value=job's dependencies with comm split.
+ *
+ * @param value job dependencies.
+ */
+ public void persistDagConfig(final String value) {
+ regCenter.persist(pathOfDagConfigJob(jobName), value);
+ }
+
+ private String pathOfDagRoot() {
+ return String.format(DAG_ROOT, dagName);
+ }
+
+ private String pathOfDagConfig() {
+ return String.format(DAG_CONFIG, dagName);
+ }
+
+ private String pathOfDagConfigJob(final String jobName) {
+ return String.format(DAG_CONFIG_JOB, dagName, jobName);
+ }
+
+ private String pathOfDagStates() {
+ return String.format(DAG_STATES, dagName);
+ }
+
+ private String pathOfDagGraph() {
+ return String.format(DAG_GRAPH, dagName);
+ }
+
+ private String pathOfDagGraphJob(final String jobName) {
+ return String.format(DAG_GRAPH_JOB, dagName, jobName);
+ }
+
+ private String pathOfDagGraphJobRetryTimes() {
+ return String.format(DAG_GRAPH_JOB_RETRYTIMES, dagName, jobName);
+ }
+
+ private String pathOfDagRunning() {
+ return String.format(DAG_RUNNING, dagName);
+ }
+
+ private String pathOfDagRunningJob(final String jobName) {
+ return String.format(DAG_RUNNING_JOB, dagName, jobName);
+ }
+
+ private String pathOfDagSuccess() {
+ return String.format(DAG_SUCCESS, dagName);
+ }
+
+ private String pathOfDagSuccessJob(final String jobName) {
+ return String.format(DAG_SUCCESS_JOB, dagName, jobName);
+ }
+
+ private String pathOfDagFail() {
+ return String.format(DAG_FAIL, dagName);
+ }
+
+ private String pathOfDagFailJob(final String jobName) {
+ return String.format(DAG_FAIL_JOB, dagName, jobName);
+ }
+
+ private String pathOfDagSkip() {
+ return String.format(DAG_SKIP, dagName);
+ }
+
+ private String pathOfDagSkipJob(final String jobName) {
+ return String.format(DAG_SKIP_JOB, dagName, jobName);
+ }
+
+ private String pathOfDagRetry() {
+ return String.format(DAG_RETRY, dagName);
+ }
+
+ private String pathOfDagRetryJob(final String jobName) {
+ return String.format(DAG_RETRY_JOB, dagName, jobName);
+ }
+
+ /**
+ * job state path.
+ *
+ * @return path of job state.
+ */
+ public String pathOfJobNodeState() {
+ return String.format("/%s/%s", jobName, JobStateNode.ROOT_STATE);
+ }
+
+ /**
+ * Init dag graph before run.
+ *
+ * @param allDagNode dag job name and dependencies.
+ * @param batchNo batch no
+ */
+ public void initDagGraph(final Map<String, Set<String>> allDagNode, final String batchNo) {
+ log.debug("Dag-{}, before create Dag Graph, clean exist path.", dagName);
+ // clean
+ if (regCenter.isExisted(pathOfDagGraph())) {
+ regCenter.remove(pathOfDagGraph());
+ }
+ if (regCenter.isExisted(pathOfDagStates())) {
+ regCenter.remove(pathOfDagStates());
+ }
+ if (regCenter.isExisted(pathOfDagRunning())) {
+ regCenter.remove(pathOfDagRunning());
+ }
+ if (regCenter.isExisted(pathOfDagSuccess())) {
+ regCenter.remove(pathOfDagSuccess());
+ }
+ if (regCenter.isExisted(pathOfDagFail())) {
+ regCenter.remove(pathOfDagFail());
+ }
+ if (regCenter.isExisted(pathOfDagSkip())) {
+ regCenter.remove(pathOfDagSkip());
+ }
+ if (regCenter.isExisted(pathOfDagRetry())) {
+ regCenter.remove(pathOfDagRetry());
+ }
+
+ log.debug("Dag-{}, Create Dag Graph, create path.", dagName);
+ // create path
+ regCenter.persist(pathOfDagGraph(), batchNo);
+ regCenter.persist(pathOfDagStates(), "");
+ regCenter.persist(pathOfDagRunning(), "");
+ regCenter.persist(pathOfDagSuccess(), "");
+ regCenter.persist(pathOfDagFail(), "");
+ regCenter.persist(pathOfDagSkip(), "");
+ regCenter.persist(pathOfDagRetry(), "");
+
+ log.debug("Dag-{}, Create Dag Graph, create graph.", dagName);
+ // init graph
+ for (Map.Entry<String, Set<String>> entry : allDagNode.entrySet()) {
+ regCenter.persist(pathOfDagGraphJob(entry.getKey()), Joiner.on(",").join(entry.getValue()));
+ }
+
+ log.info("Dag-{}, Create Dag Graph success.", dagName);
+ }
+
+ /**
+ * Get value of /dag/dagName/graph.
+ *
+ * @return batch no.
+ */
+ public String currentDagBatchNo() {
+ return regCenter.getDirectly(pathOfDagGraph());
+ }
+
+ /**
+ * get value of /dag/dagName/states.
+ *
+ * @return dag state
+ */
+ public String currentDagStates() {
+ if (this.regCenter.isExisted(pathOfDagStates())) {
+ return this.regCenter.getDirectly(pathOfDagStates());
+ }
+ return "";
+ }
+
+ /**
+ * update value of /dag/dagName/states.
+ *
+ * @param dagStates dag state
+ */
+ public void updateDagStates(final DagStates dagStates) {
+ regCenter.update(pathOfDagStates(), dagStates.getValue());
+ }
+
+
+ /**
+ * running:/dag/dagName/running/{}.
+ * success:/dag/dagName/success/{}.
+ * fail:/dag/dagName/fail/{}.
+ *
+ * @param jobState job state
+ */
+ public void updateDagJobStates(final JobStateEnum jobState) {
+ if (jobState == JobStateEnum.RUNNING) {
+ regCenter.persist(pathOfDagRunningJob(jobName), String.valueOf(System.currentTimeMillis()));
+ return;
+ }
+ regCenter.remove(pathOfDagRunningJob(jobName));
+ if (jobState == JobStateEnum.SUCCESS) {
+ regCenter.persist(pathOfDagSuccessJob(jobName), String.valueOf(System.currentTimeMillis()));
+ } else if (jobState == JobStateEnum.FAIL) {
+ regCenter.persist(pathOfDagFailJob(jobName), String.valueOf(System.currentTimeMillis()));
+ } else if (jobState == JobStateEnum.SKIP) {
+ regCenter.persist(pathOfDagSkipJob(jobName), String.valueOf(System.currentTimeMillis()));
+ }
+ }
+
+ /**
+ * Get /dag/dagName/config/all config jobs with their dependencies.
+ *
+ * @return dag config info
+ */
+ public Map<String, Set<String>> getAllDagConfigJobs() {
+ Map<String, Set<String>> map = Maps.newHashMap();
+ List<String> childrenKeys = this.regCenter.getChildrenKeys(pathOfDagConfig());
+ childrenKeys.forEach(s -> map.put(s, Sets.newHashSet(splitJobDeps(regCenter.getDirectly(pathOfDagConfigJob(s))))));
+ return map;
+ }
+
+ /**
+ * Get /dag/dagName/graph/all nodes with their dependencies.
+ *
+ * @return dag graph info
+ */
+ public Map<String, Set<String>> getAllDagGraphJobs() {
+ List<String> jobList = this.regCenter.getChildrenKeys(pathOfDagGraph());
+ Map<String, Set<String>> allDagMap = Maps.newHashMap();
+ jobList.forEach(j ->
+ allDagMap.put(j, Sets.newHashSet(splitJobDeps(regCenter.getDirectly(pathOfDagGraphJob(j))))));
+ return allDagMap;
+ }
+
+ private Set<String> splitJobDeps(final String j) {
+ return Splitter.on(",").trimResults().splitToList(j).stream().collect(Collectors.toSet());
+ }
+
+ /**
+ * Get dag job lists by their state.
+ *
+ * @param dagJobState job state
+ * @return job name lists
+ */
+ public List<String> getDagJobListByState(final DagJobStates dagJobState) {
+ if (dagJobState == DagJobStates.RUNNING) {
+ return regCenter.getChildrenKeys(pathOfDagRunning());
+ }
+ if (dagJobState == DagJobStates.SUCCESS) {
+ return regCenter.getChildrenKeys(pathOfDagSuccess());
+ }
+ if (dagJobState == DagJobStates.FAIL) {
+ return regCenter.getChildrenKeys(pathOfDagFail());
+ }
+ if (dagJobState == DagJobStates.SKIP) {
+ return regCenter.getChildrenKeys(pathOfDagSkip());
+ }
+ if (dagJobState == DagJobStates.RETRY) {
+ return regCenter.getChildrenKeys(pathOfDagRetry());
+ }
+ return Lists.newArrayList();
+ }
+
+ private CuratorFramework getClient() {
+ return (CuratorFramework) regCenter.getRawClient();
+ }
+
+ /**
+ * Get job denpendencies from reg center.
+ *
+ * @return job denpendencies.
+ */
+ public String[] getJobDenpendencies() {
+ return StringUtils.split(this.regCenter.get(pathOfDagGraphJob(jobName)), ",");
+ }
+
+ /**
+ * Get job state.
+ *
+ * @param depJob job name.
+ * @return job state
+ */
+ public DagJobStates getDagJobRunStates(final String depJob) {
+ if (regCenter.isExisted(pathOfDagSuccessJob(depJob))) {
+ return DagJobStates.SUCCESS;
+ }
+ if (regCenter.isExisted(pathOfDagFailJob(depJob))) {
+ return DagJobStates.FAIL;
+ }
+ if (regCenter.isExisted(pathOfDagSkipJob(depJob))) {
+ return DagJobStates.SKIP;
+ }
+ if (regCenter.isExisted(pathOfDagRunningJob(depJob))) {
+ return DagJobStates.RUNNING;
+ }
+ return DagJobStates.READY;
+ }
+
+ /**
+ * Trigger the job.
+ *
+ * @param job job name
+ */
+ public void triggerJob(final String job) {
+ if (isJobTriggered(job)) {
+ log.debug("Dag-{} Job-{} has already been tiggered!", dagName, job);
+ return;
+ }
+ log.debug("Dag-{} trigger job-[{}] in transaction.", dagName, job);
+ try {
+ CuratorFramework rawClient = (CuratorFramework) regCenter.getRawClient();
+ List<CuratorOp> opList = new ArrayList<>();
+ opList.add(rawClient.transactionOp().create().forPath(pathOfDagRunningJob(job)));
+ JobNodePath jobNodePath = new JobNodePath(job);
+ for (String each : regCenter.getChildrenKeys(jobNodePath.getInstancesNodePath())) {
+ opList.add(rawClient.transactionOp().setData().forPath(jobNodePath.getInstanceNodePath(each), "TRIGGER".getBytes()));
+ }
+ rawClient.transaction().forOperations(opList);
+ //CHECKSTYLE:OFF
+ } catch (Exception exp) {
+ //CHECKSTYLE:ON
+ log.debug("Dag-{}[{}] trigger job in transaction Exception!", dagName, job, exp);
+ }
+
+ log.info("Dag-{}[{}] has been triggered [{}]", dagName, job, isJobTriggered(job));
+ printZkPath();
+ }
+
+ /**
+ * Trigger retry job.
+ */
+ public void triggerRetryJob() {
+ log.debug("Dag-{} trigger RETRY job-[{}] in transaction.", dagName, jobName);
+ try {
+ CuratorFramework rawClient = (CuratorFramework) regCenter.getRawClient();
+ List<CuratorOp> opList = new ArrayList<>();
+ opList.add(rawClient.transactionOp().delete().forPath(pathOfDagRetryJob(jobName)));
+ JobNodePath jobNodePath = new JobNodePath(jobName);
+ for (String each : regCenter.getChildrenKeys(jobNodePath.getInstancesNodePath())) {
+ opList.add(rawClient.transactionOp().setData().forPath(jobNodePath.getInstanceNodePath(each), "TRIGGER".getBytes()));
+ }
+ rawClient.transaction().forOperations(opList);
+ //CHECKSTYLE:OFF
+ } catch (final Exception exp) {
+ //CHECKSTYLE:ON
+ log.debug("Dag-{}[{}] trigger RETRY job in transaction Exception!", dagName, jobName, exp);
+ }
+
+ log.info("Dag-{}[{}] RETRY job has been triggered!", dagName, jobName);
+ printZkPath();
+ }
+
+ private void printZkPath() {
+ if (log.isDebugEnabled()) {
+ log.debug("Dag-{}[{}] after trigger:", dagName, jobName);
+ log.debug("Dag-{}[{}] ZK path of SUCC : {}", dagName, jobName, getDagJobListByState(DagJobStates.SUCCESS));
+ log.debug("Dag-{}[{}] ZK path of RUN : {}", dagName, jobName, getDagJobListByState(DagJobStates.RUNNING));
+ log.debug("Dag-{}[{}] ZK path of FAIL : {}", dagName, jobName, getDagJobListByState(DagJobStates.FAIL));
+ log.debug("Dag-{}[{}] ZK path of SKIP : {}", dagName, jobName, getDagJobListByState(DagJobStates.SKIP));
+ log.debug("Dag-{}[{}] ZK path of RETRY : {}", dagName, jobName, getDagJobListByState(DagJobStates.RETRY));
+ }
+ }
+
+ private boolean isJobTriggered(final String job) {
+ return regCenter.isExisted(pathOfDagRunningJob(job)) || regCenter.isExisted(pathOfDagSuccessJob(job)) || regCenter.isExisted(pathOfDagFailJob(job));
+ }
+
+ /**
+ * Get current job retry times.
+ *
+ * @return retry times
+ */
+ public int getJobRetryTimes() {
+ String times = regCenter.getDirectly(pathOfDagGraphJobRetryTimes());
+ if (StringUtils.isEmpty(times)) {
+ return 0;
+ }
+ return Integer.valueOf(times);
+ }
+
+
+ /**
+ * Add dag retry job times.
+ * Persist jobName to path '/dag/dagName/retry'
+ *
+ * @param i retry times.
+ */
+ public void updateJobRetryTimes(final int i) {
+ regCenter.persist(pathOfDagRetryJob(jobName), "");
+ regCenter.persist(pathOfDagGraphJobRetryTimes(), "" + i);
+ }
+
+ /**
+ * Remove fail job from register center.
+ *
+ * @param job fail job name.
+ */
+ public void removeFailJob(final String job) {
+ regCenter.remove(pathOfDagFailJob(job));
+ }
+
+ /**
+ * Get dag list.
+ *
+ * @return list fo dag name
+ */
+ public List<String> getAllDags() {
+ return regCenter.getChildrenKeys("/dag");
+ }
+}
diff --git a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/dag/DagService.java b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/dag/DagService.java
new file mode 100644
index 0000000..037f51f
--- /dev/null
+++ b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/dag/DagService.java
@@ -0,0 +1,557 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.elasticjob.lite.internal.dag;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.time.DateFormatUtils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.curator.framework.recipes.cache.CuratorCache;
+import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
+import org.apache.curator.framework.recipes.locks.InterProcessMutex;
+import org.apache.curator.framework.recipes.queue.DistributedDelayQueue;
+import org.apache.curator.framework.recipes.queue.QueueBuilder;
+import org.apache.curator.framework.recipes.queue.QueueSerializer;
+import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
+import org.apache.shardingsphere.elasticjob.api.JobDagConfiguration;
+import org.apache.shardingsphere.elasticjob.infra.concurrent.BlockUtils;
+import org.apache.shardingsphere.elasticjob.infra.env.IpUtils;
+import org.apache.shardingsphere.elasticjob.infra.exception.DagRuntimeException;
+import org.apache.shardingsphere.elasticjob.lite.internal.config.ConfigurationService;
+import org.apache.shardingsphere.elasticjob.lite.internal.state.JobStateEnum;
+import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
+import org.apache.shardingsphere.elasticjob.tracing.JobEventBus;
+import org.apache.shardingsphere.elasticjob.tracing.event.DagJobExecutionEvent;
+
+import java.io.UnsupportedEncodingException;
+import java.lang.management.ManagementFactory;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Job dag service.
+ */
+@Slf4j
+public class DagService implements CuratorCacheListener {
+ public static final String ROOT_JOB = "self";
+
+ private static final String DAG_LATCH_PATH = "/daglatch/";
+
+ private static final int DEFAULT_RETRY_INTERVAL = 30;
+
+ private static final String RETRY_PATH = "/dagretry/%s/%s";
+
+ private final DagNodeStorage dagNodeStorage;
+
+ private final JobDagConfiguration jobDagConfig;
+
+ private final String jobName;
+
+ private final String dagName;
+
+ private final InterProcessMutex mutex;
+
+ private final CoordinatorRegistryCenter regCenter;
+
+ private final JobEventBus jobEventBus;
+
+ private CuratorCache jobStateCache;
+
+ private DistributedDelayQueue<String> delayQueue;
+
+ public DagService(final CoordinatorRegistryCenter regCenter, final String jobName, final JobEventBus jobEventBus, final JobDagConfiguration jobDagConfig) {
+ this.jobDagConfig = jobDagConfig;
+ this.regCenter = regCenter;
+ this.jobName = jobName;
+ this.dagNodeStorage = new DagNodeStorage(regCenter, jobDagConfig.getDagName(), jobName);
+ this.dagName = jobDagConfig.getDagName();
+ this.jobEventBus = jobEventBus;
+ if (StringUtils.equals(jobDagConfig.getDagDependencies(), ROOT_JOB)) {
+ this.mutex = new InterProcessMutex((CuratorFramework) regCenter.getRawClient(), DAG_LATCH_PATH + dagName);
+ } else {
+ this.mutex = null;
+ }
+ regDagConfig();
+ }
+
+ public DagService(final CoordinatorRegistryCenter regCenter, final String dagName, final DagNodeStorage dagNodeStorage) {
+ this.regCenter = regCenter;
+ this.dagName = dagName;
+ this.dagNodeStorage = dagNodeStorage;
+ this.jobName = "";
+ this.jobDagConfig = null;
+ this.mutex = null;
+ this.jobStateCache = null;
+ this.delayQueue = null;
+ this.jobEventBus = null;
+ }
+
+ /**
+ * Init delay queue for retry jobs.
+ *
+ * @return DistributedDelayQueue
+ */
+ private DistributedDelayQueue<String> initDelayQueue() {
+ String retryPath = String.format(RETRY_PATH, dagName, jobName);
+ DistributedDelayQueue<String> delayQueue = QueueBuilder.builder((CuratorFramework) regCenter.getRawClient(), new JobRetryTrigger(regCenter, dagName), new QueueSerializer<String>() {
+ @Override
+ public byte[] serialize(final String item) {
+ try {
+ return item.getBytes("utf-8");
+ } catch (UnsupportedEncodingException e) {
+ log.error("Dag-{}[{}] Init delay queue exception.", dagName, jobName, e);
+ }
+ return null;
+ }
+
+ @Override
+ public String deserialize(final byte[] bytes) {
+ return new String(bytes);
+ }
+ }, retryPath).buildDelayQueue();
+
+ try {
+ delayQueue.start();
+ log.info("Dag-{}[{}] start delay queue, path={}", dagName, jobName, retryPath);
+ //CHECKSTYLE:OFF
+ } catch (Exception e) {
+ //CHECKSTYLE:ON
+ log.error("Dag-{}[{}] start delay queue Exception, path={}", dagName, jobName, retryPath, e);
+ }
+
+ return delayQueue;
+ }
+
+ private void startJobStateListener() {
+ jobStateCache.listenable().addListener(this);
+ try {
+ jobStateCache.start();
+ postEvent(DagJobStates.REG.getValue(), "Job register success");
+ //CHECKSTYLE:OFF
+ } catch (Exception exp) {
+ //CHECKSTYLE:ON
+ log.error("Start dag-{} job-{} state path listener Exception.", dagName, jobName, exp);
+ // ignore
+ postEvent(DagJobStates.REG.getValue(), "Job register Error:" + exp.getMessage());
+ }
+ log.info("Dag-{} job-{} state path listener has started success.", dagName, jobName);
+ }
+
+ private void stopJobStateListener() {
+ jobStateCache.close();
+ }
+
+ /**
+ * Is dag root job.
+ *
+ * @return boolean is dag root job
+ */
+ public boolean isDagRootJob() {
+ return StringUtils.equals(jobDagConfig.getDagDependencies(), "self");
+ }
+
+ /**
+ * current dag status.
+ *
+ * @return DagStates
+ */
+ public DagStates getDagStates() {
+ return DagStates.of(this.dagNodeStorage.currentDagStates());
+ }
+
+ /**
+ * Persist Dag config into zk.
+ * always overwrite.
+ */
+ private void regDagConfig() {
+ this.dagNodeStorage.persistDagConfig(genDependenciesString());
+ this.delayQueue = initDelayQueue();
+ this.jobStateCache = CuratorCache.build((CuratorFramework) regCenter.getRawClient(), this.dagNodeStorage.pathOfJobNodeState());
+ this.startJobStateListener();
+ }
+
+ private String genDependenciesString() {
+ return jobDagConfig.getDagDependencies();
+ }
+
+ /**
+ * 1. select leader ;
+ * 2. ReGraph DAG ;
+ * 3. Change DAG states to running
+ */
+ public void changeDagStatesAndReGraph() {
+ if (null == mutex) {
+ log.error("Need root job when change dag states and regraph!");
+ throw new DagRuntimeException("Need root job when change dag states and regraph!");
+ }
+
+ if (!acquireDagLeader()) {
+ blockUntilCompleted();
+ return;
+ }
+
+ if (getDagStates() == DagStates.RUNNING) {
+ log.info("Dag-{} states already RUNNING", dagName);
+ return;
+ }
+
+ try {
+ String batchNo = getBatchNo();
+ Map<String, Set<String>> allDagNode = dagNodeStorage.getAllDagConfigJobs();
+ checkCycle(allDagNode);
+ dagNodeStorage.initDagGraph(allDagNode, batchNo);
+ dagNodeStorage.updateDagStates(DagStates.RUNNING);
+ dagNodeStorage.updateDagJobStates(JobStateEnum.RUNNING);
+ // create graph event
+ postEvent(DagJobStates.INIT.getValue(), "Create graph success");
+ //CHECKSTYLE:OFF
+ } catch (Exception ex) {
+ //CHECKSTYLE:ON
+ postEvent(DagJobStates.INIT.getValue(), "Create graph error:" + ex.getMessage());
+ } finally {
+ releaseDagLeader();
+ }
+ }
+
+ private void blockUntilCompleted() {
+ int count = 0;
+ while (getDagStates() != DagStates.RUNNING) {
+ count++;
+ log.debug("DAG '{}' sleep short time until DAG graph completed. {}", dagName, count);
+ BlockUtils.sleep(300L);
+ if (count > 200) {
+ log.error("Dag-{} Wait a long time with Dag graph NOT complete", dagName);
+ throw new DagRuntimeException("Dag graph not complete!");
+ }
+ }
+ }
+
+ private boolean acquireDagLeader() {
+ try {
+ return mutex.acquire(200, TimeUnit.MILLISECONDS);
+ //CHECKSTYLE:OFF
+ } catch (Exception exp) {
+ //CHECKSTYLE:ON
+ log.debug("Dag-{} acquire lock error!", dagName, exp);
+ }
+ return false;
+ }
+
+ private void releaseDagLeader() {
+ try {
+ if (mutex.isAcquiredInThisProcess()) {
+ mutex.release();
+ }
+ //CHECKSTYLE:OFF
+ } catch (Exception exp) {
+ //CHECKSTYLE:ON
+ log.debug("Dag-{} release lock error!", dagName, exp);
+ }
+ }
+
+ /**
+ * Check dag has cycle.
+ *
+ * @param allDagNode dag config info.
+ */
+ private void checkCycle(final Map<String, Set<String>> allDagNode) {
+ Map<String, Set<String>> cloneMap = Maps.newHashMap();
+ allDagNode.forEach((key, value) -> cloneMap.put(key, Sets.newHashSet(value)));
+
+ while (removeSelf(cloneMap)) {
+ if (log.isDebugEnabled()) {
+ log.debug("Dag-{} remove root job.", dagName);
+ }
+ }
+ if (!cloneMap.isEmpty()) {
+ log.error("Dag {} find cycle {}", dagName, cloneMap.keySet().size());
+ printCycleNode(cloneMap);
+ throw new DagRuntimeException("Dag job find cycles");
+ }
+ log.info("Dag {} checkCycle success", dagName);
+ }
+
+ private void printCycleNode(final Map<String, Set<String>> cloneMap) {
+ cloneMap.forEach((k, v) -> {
+ log.error("{} has cycle with {}", k, Joiner.on("|").join(v));
+ });
+ }
+
+ private boolean removeSelf(final Map<String, Set<String>> cloneMap) {
+ Iterator<Map.Entry<String, Set<String>>> iterator = cloneMap.entrySet().iterator();
+ boolean removed = false;
+ while (iterator.hasNext()) {
+ Map.Entry<String, Set<String>> next = iterator.next();
+ Set<String> value = next.getValue();
+ value.remove("self");
+ if (value.isEmpty()) {
+ markKeyAsSelf(cloneMap, next.getKey());
+ iterator.remove();
+ removed = true;
+ }
+ }
+ return removed;
+ }
+
+ private void markKeyAsSelf(final Map<String, Set<String>> cloneMap, final String key) {
+ cloneMap.values().forEach(s -> s.remove(key));
+ }
+
+ private String getBatchNo() {
+ String date = DateFormatUtils.format(new Date(), "yyMMddHHmmss");
+ return dagName + IpUtils.getIp() + ManagementFactory.getRuntimeMXBean().getName().split("@")[0] + date;
+ }
+
+ /**
+ * When dag job start run ,check it's dependencies jobs states.
+ */
+ public void checkJobDependenciesState() {
+ DagJobStates currentJobRunStates = dagNodeStorage.getDagJobRunStates(jobName);
+ if (currentJobRunStates == DagJobStates.SUCCESS || currentJobRunStates == DagJobStates.FAIL) {
+ log.info("DAG- {} job- {} 's states is {},Can not run again!", jobDagConfig.getDagName(), jobName, currentJobRunStates);
+ throw new DagRuntimeException("Dag job has been completed");
+ }
+ if (isDagRootJob()) {
+ log.debug("DAG {} job {} is root,No deps.", jobDagConfig.getDagName(), jobName);
+ return;
+ }
+
+ // 要求dep skip 或 success
+ String[] deps = dagNodeStorage.getJobDenpendencies();
+ for (String dep : deps) {
+ if (StringUtils.equals(dep, "self")) {
+ continue;
+ }
+ DagJobStates jobRunStates = dagNodeStorage.getDagJobRunStates(dep);
+ if (jobRunStates != DagJobStates.SUCCESS && jobRunStates != DagJobStates.SKIP) {
+ log.info("DAG- {} job- {} Dependens job- {} Not ready!", dagName, jobName, dep);
+ throw new DagRuntimeException("Dag dependencies jobs not Ready");
+ }
+ }
+ }
+
+ private boolean retryJob() {
+ // get zk config ,check can retry?
+ JobDagConfiguration jobDagConfig = getJobDagConfig(false);
+ int times = dagNodeStorage.getJobRetryTimes();
+
+ if (jobDagConfig.getRetryTimes() < 1 || jobDagConfig.getRetryTimes() <= times) {
+ log.debug("Dag-{}[{}] config retry times{}, current times {} , skip retry!", dagName, jobName, jobDagConfig.getRetryTimes(), times);
+ if (jobDagConfig.isDagSkipWhenFail()) {
+ log.info("Dag-{}[{}] fail, mark as SKIP!", dagName, jobName);
+ dagNodeStorage.updateDagJobStates(JobStateEnum.SKIP);
+ } else {
+ dagNodeStorage.updateDagJobStates(JobStateEnum.FAIL);
+ }
+ return false;
+ }
+
+ // send to retry queue
+ try {
+ long interval = (jobDagConfig.getRetryInterval() <= 0 ? DEFAULT_RETRY_INTERVAL : jobDagConfig.getRetryInterval()) * 1000L;
+ delayQueue.put(dagName + "||" + jobName, System.currentTimeMillis() + interval);
+ //CHECKSTYLE:OFF
+ } catch (Exception exp) {
+ //CHECKSTYLE:ON
+ log.error("Dag-{}[{}] retry job to Delay queue Exception!", dagName, jobName, exp);
+ return false;
+ }
+
+ dagNodeStorage.updateJobRetryTimes(times + 1);
+ log.info("Dag-{}[{}] Retry job to delay queue success, times-[{}]", dagName, jobName, times + 1);
+ postEvent("retry", "Put to DQ");
+ return true;
+ }
+
+ /**
+ * Get JobDagConfig from local or zk.
+ *
+ * @param fromLocal From local or register center.
+ * @return dag config.
+ */
+ private JobDagConfiguration getJobDagConfig(final boolean fromLocal) {
+ if (fromLocal) {
+ return jobDagConfig;
+ }
+ ConfigurationService configurationService = new ConfigurationService(this.regCenter, this.jobName);
+ JobConfiguration jobConfiguration = configurationService.load(false);
+ if (jobConfiguration == null || jobConfiguration.getJobDagConfiguration() == null) {
+ return jobDagConfig;
+ }
+ return jobConfiguration.getJobDagConfiguration();
+ }
+
+ /**
+ * There is no dag job running.
+ *
+ * @return true if no job running.
+ */
+ private boolean hasNoJobRunning() {
+ return dagNodeStorage.getDagJobListByState(DagJobStates.RUNNING).isEmpty();
+ }
+
+ /**
+ * Acquire next should trigger jobs.
+ *
+ * @return next should trigger jobs
+ */
+ public List<String> nextShouldTriggerJob() {
+ final Map<String, Set<String>> allDagRunJobs = dagNodeStorage.getAllDagGraphJobs();
+ final List<String> successList = dagNodeStorage.getDagJobListByState(DagJobStates.SUCCESS);
+ final List<String> failList = dagNodeStorage.getDagJobListByState(DagJobStates.FAIL);
+ final List<String> runningList = dagNodeStorage.getDagJobListByState(DagJobStates.RUNNING);
+ final List<String> skipList = dagNodeStorage.getDagJobListByState(DagJobStates.SKIP);
+ List<String> nextList = Lists.newLinkedList();
+
+ allDagRunJobs.values().forEach(s -> {
+ if (s.removeIf(x -> successList.contains(x) || skipList.contains(x))) {
+ s.add("self");
+ }
+ });
+
+ successList.stream().forEach(j -> allDagRunJobs.remove(j));
+ skipList.stream().forEach(j -> allDagRunJobs.remove(j));
+
+ allDagRunJobs.entrySet().forEach(x -> {
+ if (x.getValue().isEmpty() || (x.getValue().size() == 1 && x.getValue().contains("self"))) {
+ nextList.add(x.getKey());
+ }
+ });
+
+ nextList.removeAll(runningList);
+ nextList.removeAll(failList);
+
+ log.info("Dag-{} acquire next should Trigger jobs: {}", dagName, nextList);
+ if (log.isDebugEnabled()) {
+ log.debug("Dag-{} NextTrigger all job nodes size:{}", dagName, allDagRunJobs.size());
+ allDagRunJobs.forEach((key, value) -> log.debug("Dag-{} acquire next should Trigger JOBS, Graph- {} = {}", dagName, key, Joiner.on(",").join(value)));
+ log.debug("Dag-{} acquire next should Trigger JOBS, SUCC-[{}], FAIL-[{}], RUN-[{}] , SKIP-[{}] ,Will Trigger-[{}].", dagName, successList, failList, runningList, skipList, nextList);
+ }
+
+ return nextList;
+ }
+
+ /**
+ * When Dag jobs all completed, Statistics dag state.
+ *
+ */
+ private void statisticsDagState() {
+ DagStates dagStates = null;
+ Map<String, Set<String>> allDagRunJobs = dagNodeStorage.getAllDagGraphJobs();
+ List<String> successList = dagNodeStorage.getDagJobListByState(DagJobStates.SUCCESS);
+ List<String> runningList = dagNodeStorage.getDagJobListByState(DagJobStates.RUNNING);
+ List<String> failList = dagNodeStorage.getDagJobListByState(DagJobStates.FAIL);
+ List<String> skipList = dagNodeStorage.getDagJobListByState(DagJobStates.SKIP);
+ int totalJob = allDagRunJobs.size();
+ int succJob = successList.size();
+ int failJob = failList.size();
+ int skipJob = skipList.size();
+ int runningJob = runningList.size();
+
+ log.info("Dag-{}[Statistics] totalJob-{}, successJob-{}, failJob-{}, runningJob-{}, skipJob-{}", dagName, totalJob, succJob, failJob, runningJob, skipJob);
+ if (log.isDebugEnabled()) {
+ log.debug("Dag-{}[Statistics] SUCC-[{}], FAIL-[{}], RUNNING-[{}], SKIP-[{}]", dagName, successList, failList, runningList, skipList);
+ }
+
+ if ((succJob + skipJob) == totalJob) {
+ dagStates = DagStates.SUCCESS;
+ } else if (failJob > 0) {
+ dagStates = DagStates.FAIL;
+ } else {
+ dagStates = DagStates.RUNNING;
+ }
+
+ postEvent(dagStates.getValue(), "Dag Complete");
+ log.info("Dag-{}[Statistics] DAG run complete, Final State-[{}]!", dagName, dagStates);
+ dagNodeStorage.updateDagStates(dagStates);
+ }
+
+ private void postEvent(final String state, final String message) {
+ if (null != jobEventBus) {
+ jobEventBus.post(new DagJobExecutionEvent(dagName, jobName, dagNodeStorage.currentDagBatchNo(), state, message));
+ }
+ }
+
+ /**
+ * zkpath: jobName/state listener.
+ * @param type event type.
+ * @param oldData old data
+ * @param data new data.
+ */
+ @Override
+ public void event(final Type type, final ChildData oldData, final ChildData data) {
+ if (!(type == Type.NODE_CHANGED || type == Type.NODE_CREATED)) {
+ return;
+ }
+
+ JobStateEnum jobState = JobStateEnum.of(new String(data.getData()));
+ log.info("Dag-{}[{}] receive job state EVENT-{}", dagName, jobName, jobState);
+
+ if (jobState == JobStateEnum.RUNNING || jobState == JobStateEnum.NONE) {
+ log.debug("Dag-{}[{}] receive job state EVENT NOT last state-[{}], skip.", dagName, jobName, jobState);
+ return;
+ }
+
+ // deal with retry when fail
+ if (jobState == JobStateEnum.FAIL) {
+ if (retryJob()) {
+ log.info("Dag-{}[{}] Put FAIL job to DQ Success! Waiting Triggered!", dagName, jobName);
+ return;
+ }
+ } else {
+ dagNodeStorage.updateDagJobStates(jobState);
+ }
+
+ postEvent(jobState.getValue(), "Job Complete");
+
+ DagStates dagState = getDagStates();
+ if (dagState == DagStates.PAUSE) {
+ log.info("Dag-{} current dag state is PAUSE, Do not trigger next jobs!", dagName);
+ return;
+ }
+
+ // Acquire next should trigger jobs
+ List<String> willTriggerJobs = nextShouldTriggerJob();
+
+ // If their is none job should trigger and current running job list is empty , start statistics dag state
+ if (willTriggerJobs.isEmpty()) {
+ if (hasNoJobRunning()) {
+ log.info("Dag-{}, No job running, Start statistics The DAG State.", dagName);
+ statisticsDagState();
+ return;
+ }
+ log.info("Dag-{}[{}] No trigger job, Wating for other running jobs.", dagName, jobName);
+ } else {
+ // Else register next trigger jobs.
+ log.info("Dag-{}[{}] trigger job list [{}].", dagName, jobName, willTriggerJobs);
+ willTriggerJobs.forEach(job -> {
+ postEvent("trigger", "Trigger Job");
+ dagNodeStorage.triggerJob(job);
+ });
+ }
+ }
+}
diff --git a/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-api/src/main/java/org/apache/shardingsphere/elasticjob/tracing/listener/TracingListener.java b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/dag/DagStates.java
similarity index 50%
copy from elasticjob-infra/elasticjob-tracing/elasticjob-tracing-api/src/main/java/org/apache/shardingsphere/elasticjob/tracing/listener/TracingListener.java
copy to elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/dag/DagStates.java
index 414ddeb..2b1d68d 100644
--- a/elasticjob-infra/elasticjob-tracing/elasticjob-tracing-api/src/main/java/org/apache/shardingsphere/elasticjob/tracing/listener/TracingListener.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/dag/DagStates.java
@@ -7,7 +7,7 @@
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -15,33 +15,53 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.elasticjob.tracing.listener;
+package org.apache.shardingsphere.elasticjob.lite.internal.dag;
-import com.google.common.eventbus.AllowConcurrentEvents;
-import com.google.common.eventbus.Subscribe;
-import org.apache.shardingsphere.elasticjob.tracing.event.JobExecutionEvent;
-import org.apache.shardingsphere.elasticjob.tracing.event.JobStatusTraceEvent;
+import org.apache.commons.lang3.StringUtils;
/**
- * Tracing listener.
+ * Dag state.
+ *
*/
-public interface TracingListener {
-
+public enum DagStates {
+ NONE("none"),
+
+ RUNNING("running"),
+
+ PAUSE("pause"),
+
+ FAIL("fail"),
+
+ SUCCESS("success");
+
+ private String value;
+
+ DagStates(final String value) {
+ this.value = value;
+ }
+
/**
- * Listen job execution event.
+ * give value return Enum.
*
- * @param jobExecutionEvent job execution event
+ * @param value enum value
+ * @return DagStates enum.
*/
- @Subscribe
- @AllowConcurrentEvents
- void listen(JobExecutionEvent jobExecutionEvent);
-
+ public static DagStates of(final String value) {
+ for (DagStates states : DagStates.values()) {
+ if (StringUtils.equalsIgnoreCase(value, states.getValue())) {
+ return states;
+ }
+ }
+ return DagStates.NONE;
+ }
+
/**
- * Listen job status trace event.
+ * Get enum value.
*
- * @param jobStatusTraceEvent job status trace event
+ * @return string
*/
- @Subscribe
- @AllowConcurrentEvents
- void listen(JobStatusTraceEvent jobStatusTraceEvent);
+ public String getValue() {
+ return value;
+ }
+
}
diff --git a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/dag/JobRetryTrigger.java b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/dag/JobRetryTrigger.java
new file mode 100644
index 0000000..85c70b6
--- /dev/null
+++ b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/dag/JobRetryTrigger.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.elasticjob.lite.internal.dag;
+
+import com.google.common.base.Splitter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.queue.QueueConsumer;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
+
+import java.util.List;
+
+/**
+ * Trigger retry dag job.
+ *
+ **/
+@Slf4j
+public class JobRetryTrigger implements QueueConsumer<String> {
+ private final CoordinatorRegistryCenter regCenter;
+
+ private final String dagName;
+
+ public JobRetryTrigger(final CoordinatorRegistryCenter regCenter, final String dagName) {
+ this.regCenter = regCenter;
+ this.dagName = dagName;
+ }
+
+ @Override
+ public void consumeMessage(final String message) throws Exception {
+ // message format: dagName||jobName
+ // trigger the job only when dag state is running
+ if (StringUtils.isEmpty(message)) {
+ log.info("Dag-{} Retry job Receive message is empty, return!", dagName);
+ return;
+ }
+
+ List<String> strings = Splitter.on("||").splitToList(message);
+ if (strings.size() != 2) {
+ log.info("Dag-{} Retry job message format not right! {}", dagName, message);
+ return;
+ }
+
+ String jobName = strings.get(1);
+ log.info("Dag-{} start Retry job-{}", dagName, jobName);
+ DagNodeStorage dagNodeStorage = new DagNodeStorage(regCenter, dagName, jobName);
+
+ DagStates dagState = DagStates.of(dagNodeStorage.currentDagStates());
+ if (dagState != DagStates.RUNNING) {
+ log.info("Dag-{} retry job, dag state-{} not RUNNING, quit!", dagName, jobName, dagState);
+ return;
+ }
+
+ dagNodeStorage.triggerRetryJob();
+ }
+
+ @Override
+ public void stateChanged(final CuratorFramework client, final ConnectionState newState) {
+
+ }
+
+}
diff --git a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/JobScheduler.java b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/JobScheduler.java
index 559cfaf..f175907 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/JobScheduler.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/JobScheduler.java
@@ -82,10 +82,10 @@ public final class JobScheduler {
this.elasticJobListeners = Arrays.asList(elasticJobListeners);
setUpFacade = new SetUpFacade(regCenter, jobConfig.getJobName(), this.elasticJobListeners);
schedulerFacade = new SchedulerFacade(regCenter, jobConfig.getJobName());
- jobFacade = new LiteJobFacade(regCenter, jobConfig.getJobName(), this.elasticJobListeners, tracingConfig);
- jobExecutor = null == elasticJob ? new ElasticJobExecutor(elasticJobType, jobConfig, jobFacade) : new ElasticJobExecutor(elasticJob, jobConfig, jobFacade);
String jobClassName = JobClassNameProviderFactory.getProvider().getJobClassName(elasticJob);
this.jobConfig = setUpFacade.setUpJobConfiguration(jobClassName, jobConfig);
+ jobFacade = new LiteJobFacade(regCenter, jobConfig.getJobName(), this.elasticJobListeners, tracingConfig, jobConfig.getJobDagConfiguration());
+ jobExecutor = null == elasticJob ? new ElasticJobExecutor(elasticJobType, jobConfig, jobFacade) : new ElasticJobExecutor(elasticJob, jobConfig, jobFacade);
setGuaranteeServiceForElasticJobListeners(regCenter, this.elasticJobListeners);
jobScheduleController = createJobScheduleController();
}
@@ -102,9 +102,9 @@ public final class JobScheduler {
this.elasticJobListeners = Arrays.asList(elasticJobListeners);
setUpFacade = new SetUpFacade(regCenter, jobConfig.getJobName(), this.elasticJobListeners);
schedulerFacade = new SchedulerFacade(regCenter, jobConfig.getJobName());
- jobFacade = new LiteJobFacade(regCenter, jobConfig.getJobName(), this.elasticJobListeners, tracingConfig);
- jobExecutor = null == elasticJob ? new ElasticJobExecutor(elasticJobType, jobConfig, jobFacade) : new ElasticJobExecutor(elasticJob, jobConfig, jobFacade);
this.jobConfig = setUpFacade.setUpJobConfiguration(elasticJobType, jobConfig);
+ jobFacade = new LiteJobFacade(regCenter, jobConfig.getJobName(), this.elasticJobListeners, tracingConfig, jobConfig.getJobDagConfiguration());
+ jobExecutor = null == elasticJob ? new ElasticJobExecutor(elasticJobType, jobConfig, jobFacade) : new ElasticJobExecutor(elasticJob, jobConfig, jobFacade);
setGuaranteeServiceForElasticJobListeners(regCenter, this.elasticJobListeners);
jobScheduleController = createJobScheduleController();
}
diff --git a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/LiteJobFacade.java b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/LiteJobFacade.java
index 23067b5..af72da4 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/LiteJobFacade.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/LiteJobFacade.java
@@ -20,12 +20,16 @@ package org.apache.shardingsphere.elasticjob.lite.internal.schedule;
import com.google.common.base.Strings;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
+import org.apache.shardingsphere.elasticjob.api.JobDagConfiguration;
import org.apache.shardingsphere.elasticjob.api.listener.ElasticJobListener;
import org.apache.shardingsphere.elasticjob.api.listener.ShardingContexts;
import org.apache.shardingsphere.elasticjob.executor.JobFacade;
import org.apache.shardingsphere.elasticjob.infra.context.TaskContext;
+import org.apache.shardingsphere.elasticjob.infra.exception.DagRuntimeException;
import org.apache.shardingsphere.elasticjob.infra.exception.JobExecutionEnvironmentException;
import org.apache.shardingsphere.elasticjob.lite.internal.config.ConfigurationService;
+import org.apache.shardingsphere.elasticjob.lite.internal.dag.DagService;
+import org.apache.shardingsphere.elasticjob.lite.internal.dag.DagStates;
import org.apache.shardingsphere.elasticjob.lite.internal.failover.FailoverService;
import org.apache.shardingsphere.elasticjob.lite.internal.sharding.ExecutionContextService;
import org.apache.shardingsphere.elasticjob.lite.internal.sharding.ExecutionService;
@@ -40,6 +44,7 @@ import org.apache.shardingsphere.elasticjob.tracing.event.JobStatusTraceEvent.St
import java.util.Collection;
import java.util.List;
+import java.util.Map;
/**
* Lite job facade.
@@ -60,6 +65,8 @@ public final class LiteJobFacade implements JobFacade {
private final List<ElasticJobListener> elasticJobListeners;
private final JobEventBus jobEventBus;
+
+ private final DagService dagService;
public LiteJobFacade(final CoordinatorRegistryCenter regCenter, final String jobName, final List<ElasticJobListener> elasticJobListeners, final TracingConfiguration tracingConfig) {
configService = new ConfigurationService(regCenter, jobName);
@@ -69,8 +76,25 @@ public final class LiteJobFacade implements JobFacade {
failoverService = new FailoverService(regCenter, jobName);
this.elasticJobListeners = elasticJobListeners;
this.jobEventBus = null == tracingConfig ? new JobEventBus() : new JobEventBus(tracingConfig);
+ this.dagService = null;
}
-
+
+ public LiteJobFacade(final CoordinatorRegistryCenter regCenter, final String jobName, final List<ElasticJobListener> elasticJobListeners,
+ final TracingConfiguration tracingConfig, final JobDagConfiguration jobDagConfiguration) {
+ configService = new ConfigurationService(regCenter, jobName);
+ shardingService = new ShardingService(regCenter, jobName);
+ executionContextService = new ExecutionContextService(regCenter, jobName);
+ executionService = new ExecutionService(regCenter, jobName);
+ failoverService = new FailoverService(regCenter, jobName);
+ this.elasticJobListeners = elasticJobListeners;
+ this.jobEventBus = null == tracingConfig ? new JobEventBus() : new JobEventBus(tracingConfig);
+ if (null == jobDagConfiguration) {
+ this.dagService = null;
+ } else {
+ this.dagService = new DagService(regCenter, jobName, jobEventBus, jobDagConfiguration);
+ }
+ }
+
@Override
public JobConfiguration loadJobConfiguration(final boolean fromCache) {
return configService.load(fromCache);
@@ -100,7 +124,15 @@ public final class LiteJobFacade implements JobFacade {
failoverService.updateFailoverComplete(shardingContexts.getShardingItemParameters().keySet());
}
}
-
+
+ @Override
+ public void registerJobCompleted(final ShardingContexts shardingContexts, final Map<Integer, String> itemErrorMessages) {
+ executionService.registerJobCompleted(shardingContexts, itemErrorMessages);
+ if (configService.load(true).isFailover()) {
+ failoverService.updateFailoverComplete(shardingContexts.getShardingItemParameters().keySet());
+ }
+ }
+
@Override
public ShardingContexts getShardingContexts() {
boolean isFailover = configService.load(true).isFailover();
@@ -167,4 +199,33 @@ public final class LiteJobFacade implements JobFacade {
log.trace(message);
}
}
+
+ @Override
+ public boolean isDagJob() {
+ return dagService != null;
+ }
+
+ @Override
+ public void dagStatesCheck() {
+ if (dagService.getDagStates() == DagStates.RUNNING) {
+ return;
+ }
+ if (dagService.getDagStates() == DagStates.PAUSE) {
+ throw new DagRuntimeException("Dag Job states PAUSE");
+ }
+
+ if (dagService.isDagRootJob()) {
+ dagService.changeDagStatesAndReGraph();
+ }
+
+ if (dagService.getDagStates() != DagStates.RUNNING) {
+ log.info("DAG states not right {}", dagService.getDagStates().getValue());
+ throw new DagRuntimeException("Dag states Not right!");
+ }
+ }
+
+ @Override
+ public void dagJobDependenciesCheck() {
+ dagService.checkJobDependenciesState();
+ }
}
diff --git a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ExecutionService.java b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ExecutionService.java
index de8a711..34169ff 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ExecutionService.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ExecutionService.java
@@ -21,12 +21,15 @@ import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import org.apache.shardingsphere.elasticjob.api.listener.ShardingContexts;
import org.apache.shardingsphere.elasticjob.lite.internal.config.ConfigurationService;
import org.apache.shardingsphere.elasticjob.lite.internal.schedule.JobRegistry;
+import org.apache.shardingsphere.elasticjob.lite.internal.state.JobStateEnum;
+import org.apache.shardingsphere.elasticjob.lite.internal.state.JobStateNode;
import org.apache.shardingsphere.elasticjob.lite.internal.storage.JobNodeStorage;
import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import java.util.Map;
/**
* Execution service.
@@ -58,6 +61,16 @@ public final class ExecutionService {
for (int each : shardingContexts.getShardingItemParameters().keySet()) {
jobNodeStorage.fillEphemeralJobNode(ShardingNode.getRunningNode(each), "");
}
+ if (!jobNodeStorage.isJobNodeExisted(JobStateNode.getRootState())) {
+ jobNodeStorage.createJobNodeIfNeeded(JobStateNode.getRootState(), JobStateEnum.RUNNING);
+ } else {
+ JobStateEnum jobState = JobStateEnum.of(jobNodeStorage.getJobNodeDataDirectly(JobStateNode.getRootState()));
+ if (jobState != JobStateEnum.RUNNING) {
+ jobNodeStorage.updateJobNode(JobStateNode.getRootState(), JobStateEnum.RUNNING);
+ }
+ }
+ jobNodeStorage.createJobNodeIfNeeded(JobStateNode.getRooProcSucc());
+ jobNodeStorage.createJobNodeIfNeeded(JobStateNode.getRooProcFail());
}
/**
@@ -74,7 +87,49 @@ public final class ExecutionService {
jobNodeStorage.removeJobNodeIfExisted(ShardingNode.getRunningNode(each));
}
}
-
+
+ /**
+ * Register job completed and Statistical the job status.
+ *
+ * @param shardingContexts sharding contexts
+ * @param itemErrorMessages error items.
+ */
+ public void registerJobCompleted(final ShardingContexts shardingContexts, final Map<Integer, String> itemErrorMessages) {
+ JobRegistry.getInstance().setJobRunning(jobName, false);
+ if (!configService.load(true).isMonitorExecution()) {
+ return;
+ }
+ for (int each : shardingContexts.getShardingItemParameters().keySet()) {
+ jobNodeStorage.removeJobNodeIfExisted(ShardingNode.getRunningNode(each));
+
+ if (itemErrorMessages.containsKey(each)) {
+ // fail
+ jobNodeStorage.removeJobNodeIfExisted(JobStateNode.getProcSucc(each));
+ jobNodeStorage.createJobNodeIfNeeded(JobStateNode.getProcFail(each));
+ } else {
+ // succ
+ jobNodeStorage.removeJobNodeIfExisted(JobStateNode.getProcFail(each));
+ jobNodeStorage.createJobNodeIfNeeded(JobStateNode.getProcSucc(each));
+ }
+ }
+
+ List<String> succList = jobNodeStorage.getJobNodeChildrenKeys(JobStateNode.getRooProcSucc());
+ List<String> failList = jobNodeStorage.getJobNodeChildrenKeys(JobStateNode.getRooProcFail());
+ int shardingTotalCount = shardingContexts.getShardingTotalCount();
+ int succCount = succList.size();
+ int failCount = failList.size();
+ if ((succCount + failCount) == shardingTotalCount) {
+ if (failCount == 0) {
+ jobNodeStorage.updateJobNode(JobStateNode.getRootState(), JobStateEnum.SUCCESS);
+ } else {
+ jobNodeStorage.updateJobNode(JobStateNode.getRootState(), JobStateEnum.FAIL);
+ }
+ jobNodeStorage.removeJobNodeIfExisted(JobStateNode.getRooProcFail());
+ jobNodeStorage.removeJobNodeIfExisted(JobStateNode.getRooProcSucc());
+ }
+
+ }
+
/**
* Clear all running info.
*/
diff --git a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/state/JobStateEnum.java b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/state/JobStateEnum.java
new file mode 100644
index 0000000..c2068b6
--- /dev/null
+++ b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/state/JobStateEnum.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.elasticjob.lite.internal.state;
+
+import org.apache.commons.lang3.StringUtils;
+
+/**
+ * job state enum.
+ *
+ **/
+public enum JobStateEnum {
+ NONE("none"),
+
+ RUNNING("running"),
+
+ SKIP("skip"),
+
+ FAIL("fail"),
+
+ SUCCESS("success");
+
+ private String value;
+
+ JobStateEnum(final String value) {
+ this.value = value;
+ }
+
+ /**
+ * Give value return enum.
+ *
+ * @param value of enum.
+ * @return JobStateEnum
+ */
+ public static JobStateEnum of(final String value) {
+ for (JobStateEnum states : JobStateEnum.values()) {
+ if (StringUtils.equalsIgnoreCase(value, states.getValue())) {
+ return states;
+ }
+ }
+ return JobStateEnum.NONE;
+ }
+
+ /**
+ * Get value of enum.
+ *
+ * @return string
+ */
+ public String getValue() {
+ return value;
+ }
+
+}
diff --git a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/state/JobStateNode.java b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/state/JobStateNode.java
new file mode 100644
index 0000000..dbf648c
--- /dev/null
+++ b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/state/JobStateNode.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.elasticjob.lite.internal.state;
+
+import org.apache.shardingsphere.elasticjob.lite.internal.storage.JobNodePath;
+
+/**
+ * Job state state node path.
+ *
+ **/
+public class JobStateNode {
+ public static final String ROOT_STATE = "state/state";
+
+ public static final String ROOT_STATE_FOR_CACHE = "state";
+
+ private static final String ROOT_PROC = "proc";
+
+ private static final String ROOT_FAIL = "proc/fail";
+
+ private static final String ROOT_SUCC = "proc/succ";
+
+ private static final String PROC_FAIL = "proc/fail/%s";
+
+ private static final String PROC_SUCC = "proc/succ/%s";
+
+ private final JobNodePath jobNodePath;
+
+ public JobStateNode(final String jobName) {
+ jobNodePath = new JobNodePath(jobName);
+ }
+
+ /**
+ * Get job's root state path.
+ *
+ * @return root state path.
+ */
+ public static String getRootState() {
+ return ROOT_STATE;
+ }
+
+ /**
+ * Get job's proc path.
+ *
+ * @return proc path
+ */
+ public static String getRootProc() {
+ return ROOT_PROC;
+ }
+
+ /**
+ * Get job item's fail path.
+ *
+ * @param item sharding item
+ * @return fail path.
+ */
+ public static String getProcFail(final int item) {
+ return String.format(PROC_FAIL, item);
+ }
+
+ /**
+ * Get job item's success path.
+ * @param item sharding item.
+ * @return success path.
+ */
+ public static String getProcSucc(final int item) {
+ return String.format(PROC_SUCC, item);
+ }
+
+ /**
+ * Get job proc fail path.
+ *
+ * @return proc fail path
+ */
+ public static String getRooProcFail() {
+ return ROOT_FAIL;
+ }
+
+ /**
+ * Get job proc success path.
+ *
+ * @return proc success path
+ */
+ public static String getRooProcSucc() {
+ return ROOT_SUCC;
+ }
+
+}
diff --git a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/storage/JobNodeStorage.java b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/storage/JobNodeStorage.java
index 63e8eef..55d0f3c 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/storage/JobNodeStorage.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/storage/JobNodeStorage.java
@@ -118,7 +118,19 @@ public final class JobNodeStorage {
regCenter.persist(jobNodePath.getFullPath(node), "");
}
}
-
+
+ /**
+ * Create job node if needed.
+ *
+ * @param node node
+ * @param value value
+ */
+ public void createJobNodeIfNeeded(final String node, final Object value) {
+ if (isJobRootNodeExisted() && !isJobNodeExisted(node)) {
+ regCenter.persist(jobNodePath.getFullPath(node), value.toString());
+ }
+ }
+
/**
* Remove job node if existed.
*
diff --git a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/dag/DagJobRetryTriggerTest.java b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/dag/DagJobRetryTriggerTest.java
new file mode 100644
index 0000000..611b8cd
--- /dev/null
+++ b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/dag/DagJobRetryTriggerTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.elasticjob.lite.internal.dag;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.shardingsphere.elasticjob.lite.util.ReflectionUtils;
+import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class DagJobRetryTriggerTest {
+
+ @Mock
+ private CoordinatorRegistryCenter regCenter;
+
+ private JobRetryTrigger jobRetryTrigger;
+
+ @Before
+ public void setUp() {
+ jobRetryTrigger = new JobRetryTrigger(regCenter, "testDag");
+ ReflectionUtils.setFieldValue(jobRetryTrigger, "regCenter", regCenter);
+ }
+
+ @Test
+ public void consumeMessage() throws Exception {
+ when(regCenter.isExisted("/dag/testDag/states")).thenReturn(true);
+ when(regCenter.getDirectly("/dag/testDag/states")).thenReturn("running");
+ jobRetryTrigger.consumeMessage("testDag||job1");
+ }
+
+ @Test
+ public void stateChanged() {
+ jobRetryTrigger.stateChanged((CuratorFramework) regCenter.getRawClient(), ConnectionState.CONNECTED);
+ }
+}
diff --git a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/dag/DagNodeStorageTest.java b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/dag/DagNodeStorageTest.java
new file mode 100644
index 0000000..7865545
--- /dev/null
+++ b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/dag/DagNodeStorageTest.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.elasticjob.lite.internal.dag;
+
+import com.google.common.collect.Lists;
+import org.apache.shardingsphere.elasticjob.lite.internal.state.JobStateEnum;
+import org.apache.shardingsphere.elasticjob.lite.util.ReflectionUtils;
+import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.internal.util.collections.Sets;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class DagNodeStorageTest {
+
+ @Mock
+ private CoordinatorRegistryCenter regCenter;
+
+ private DagNodeStorage dagNodeStorage;
+
+ @Before
+ public void setUp() {
+ dagNodeStorage = new DagNodeStorage(regCenter, "testDag", "testJob");
+ ReflectionUtils.setFieldValue(dagNodeStorage, "regCenter", regCenter);
+ }
+
+ private Map<String, Set<String>> createAllDagNode() {
+ Map<String, Set<String>> allDagNode = new HashMap<>(6);
+ allDagNode.put("testJob", Sets.newSet("self"));
+ allDagNode.put("job1", Sets.newSet("testJob"));
+ allDagNode.put("job2", Sets.newSet("testJob"));
+ allDagNode.put("job3", Sets.newSet("testJob", "job1"));
+ allDagNode.put("job4", Sets.newSet("job1", "job2"));
+ allDagNode.put("job5", Sets.newSet("job3", "job4"));
+ return allDagNode;
+ }
+
+ @Test
+ public void persistDagConfig() {
+ dagNodeStorage.persistDagConfig("value");
+ verify(regCenter).persist("/dag/testDag/config/testJob", "value");
+ }
+
+ @Test
+ public void pathOfJobNodeState() {
+ String s = dagNodeStorage.pathOfJobNodeState();
+ assertThat(s, is("/testJob/state/state"));
+ }
+
+ @Test
+ public void initDagGraph() {
+ when(regCenter.isExisted("/dag/testDag/graph")).thenReturn(true);
+ dagNodeStorage.initDagGraph(createAllDagNode(), "batch-foo-no");
+ verify(regCenter).remove("/dag/testDag/graph");
+ verify(regCenter).persist("/dag/testDag/graph", "batch-foo-no");
+ }
+
+ @Test
+ public void currentDagBatchNo() {
+ when(regCenter.getDirectly("/dag/testDag/graph")).thenReturn("batch-foo-no");
+ assertThat(dagNodeStorage.currentDagBatchNo(), is("batch-foo-no"));
+ verify(regCenter).getDirectly("/dag/testDag/graph");
+ }
+
+ @Test
+ public void currentDagStates() {
+ when(regCenter.isExisted("/dag/testDag/states")).thenReturn(true);
+ when(regCenter.getDirectly("/dag/testDag/states")).thenReturn("fail");
+ assertThat(dagNodeStorage.currentDagStates(), is("fail"));
+ verify(regCenter).getDirectly("/dag/testDag/states");
+ }
+
+ @Test
+ public void updateDagStates() {
+ dagNodeStorage.updateDagStates(DagStates.SUCCESS);
+ verify(regCenter).update("/dag/testDag/states", "success");
+ }
+
+ @Test
+ public void updateDagJobStatesRunning() {
+ dagNodeStorage.updateDagJobStates(JobStateEnum.RUNNING);
+ verify(regCenter, times(1)).persist(eq("/dag/testDag/running/testJob"), anyString());
+ }
+
+ @Test
+ public void updateDagJobStatesSkip() {
+ dagNodeStorage.updateDagJobStates(JobStateEnum.SKIP);
+ verify(regCenter).remove("/dag/testDag/running/testJob");
+ verify(regCenter, times(1)).persist(eq("/dag/testDag/skip/testJob"), anyString());
+ }
+
+ @Test
+ public void updateDagJobStatesSuccess() {
+ dagNodeStorage.updateDagJobStates(JobStateEnum.SUCCESS);
+ verify(regCenter).remove("/dag/testDag/running/testJob");
+ verify(regCenter, times(1)).persist(eq("/dag/testDag/success/testJob"), anyString());
+ }
+
+ @Test
+ public void updateDagJobStatesFail() {
+ dagNodeStorage.updateDagJobStates(JobStateEnum.FAIL);
+ verify(regCenter).remove("/dag/testDag/running/testJob");
+ verify(regCenter, times(1)).persist(eq("/dag/testDag/fail/testJob"), anyString());
+ }
+
+ @Test
+ public void getAllDagConfigJobs() {
+ when(regCenter.getChildrenKeys("/dag/testDag/config")).thenReturn(Lists.newArrayList("job1", "job2", "job3"));
+ when(regCenter.getDirectly("/dag/testDag/config/job1")).thenReturn("self");
+ when(regCenter.getDirectly("/dag/testDag/config/job2")).thenReturn("self,job1");
+ when(regCenter.getDirectly("/dag/testDag/config/job3")).thenReturn("job1,job2");
+ Map<String, Set<String>> allDagConfigJobs = dagNodeStorage.getAllDagConfigJobs();
+ assertThat(allDagConfigJobs.size(), is(3));
+ }
+
+ @Test
+ public void getAllDagGraphJobs() {
+ when(regCenter.getChildrenKeys("/dag/testDag/graph")).thenReturn(Lists.newArrayList("job1", "job2", "job3"));
+ when(regCenter.getDirectly("/dag/testDag/graph/job1")).thenReturn("self");
+ when(regCenter.getDirectly("/dag/testDag/graph/job2")).thenReturn("self,job1");
+ when(regCenter.getDirectly("/dag/testDag/graph/job3")).thenReturn("job1,job2");
+ Map<String, Set<String>> allDagGraphJobs = dagNodeStorage.getAllDagGraphJobs();
+ assertThat(allDagGraphJobs.size(), is(3));
+ }
+
+ @Test
+ public void getDagJobListByState() {
+ when(regCenter.getChildrenKeys("/dag/testDag/running")).thenReturn(Lists.newArrayList("job1", "job2"));
+ when(regCenter.getChildrenKeys("/dag/testDag/success")).thenReturn(Lists.newArrayList("job1", "job2"));
+ when(regCenter.getChildrenKeys("/dag/testDag/fail")).thenReturn(Lists.newArrayList("job1", "job2"));
+ when(regCenter.getChildrenKeys("/dag/testDag/skip")).thenReturn(Lists.newArrayList("job1", "job2"));
+ when(regCenter.getChildrenKeys("/dag/testDag/retry")).thenReturn(Lists.newArrayList("job1", "job2"));
+ assertThat(dagNodeStorage.getDagJobListByState(DagJobStates.RUNNING).size(), is(2));
+ assertThat(dagNodeStorage.getDagJobListByState(DagJobStates.SUCCESS).size(), is(2));
+ assertThat(dagNodeStorage.getDagJobListByState(DagJobStates.FAIL).size(), is(2));
+ assertThat(dagNodeStorage.getDagJobListByState(DagJobStates.SKIP).size(), is(2));
+ assertThat(dagNodeStorage.getDagJobListByState(DagJobStates.RETRY).size(), is(2));
+ }
+
+ @Test
+ public void getJobDenpendencies() {
+ when(regCenter.get("/dag/testDag/graph/testJob")).thenReturn("job1,job2");
+ String[] jobDenpendencies = dagNodeStorage.getJobDenpendencies();
+ assertThat(jobDenpendencies, is(new String[]{"job1", "job2"}));
+ }
+
+ @Test
+ public void getDagJobRunStates() {
+ when(regCenter.isExisted("/dag/testDag/success/job1")).thenReturn(true);
+ when(regCenter.isExisted("/dag/testDag/fail/job2")).thenReturn(true);
+ when(regCenter.isExisted("/dag/testDag/running/job3")).thenReturn(true);
+ when(regCenter.isExisted("/dag/testDag/skip/job4")).thenReturn(true);
+ assertThat(dagNodeStorage.getDagJobRunStates("job1"), is(DagJobStates.SUCCESS));
+ assertThat(dagNodeStorage.getDagJobRunStates("job2"), is(DagJobStates.FAIL));
+ assertThat(dagNodeStorage.getDagJobRunStates("job3"), is(DagJobStates.RUNNING));
+ assertThat(dagNodeStorage.getDagJobRunStates("job4"), is(DagJobStates.SKIP));
+ }
+
+ @Test
+ public void triggerJobWhenTrigged() {
+ when(regCenter.isExisted("/dag/testDag/running/job1")).thenReturn(true);
+ dagNodeStorage.triggerJob("job1");
+ }
+
+ @Test
+ public void triggerJob() {
+ when(regCenter.isExisted("/dag/testDag/running/job1")).thenReturn(false);
+ when(regCenter.isExisted("/dag/testDag/success/job1")).thenReturn(false);
+ when(regCenter.isExisted("/dag/testDag/fail/job1")).thenReturn(false);
+ dagNodeStorage.triggerJob("job1");
+ }
+
+ @Test
+ public void triggerRetryJob() {
+ dagNodeStorage.triggerRetryJob();
+ }
+
+ @Test
+ public void getJobRetryTimes() {
+ when(regCenter.getDirectly("/dag/testDag/graph/testJob/retry")).thenReturn("3");
+ assertThat(dagNodeStorage.getJobRetryTimes(), is(3));
+ }
+
+ @Test
+ public void updateJobRetryTimes() {
+ dagNodeStorage.updateJobRetryTimes(3);
+ verify(regCenter).persist("/dag/testDag/retry/testJob", "");
+ verify(regCenter).persist("/dag/testDag/graph/testJob/retry", "3");
+ }
+
+ @Test
+ public void removeFailJob() {
+ dagNodeStorage.removeFailJob("job1");
+ verify(regCenter).remove("/dag/testDag/fail/job1");
+ }
+
+ @Test
+ public void getAllDags() {
+ dagNodeStorage.getAllDags();
+ verify(regCenter).getChildrenKeys("/dag");
+ }
+
+}
diff --git a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/dag/DagServiceTest.java b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/dag/DagServiceTest.java
new file mode 100644
index 0000000..d5baf2e
--- /dev/null
+++ b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/dag/DagServiceTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.elasticjob.lite.internal.dag;
+
+import com.google.common.collect.Lists;
+import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
+import org.apache.shardingsphere.elasticjob.api.JobDagConfiguration;
+import org.apache.shardingsphere.elasticjob.infra.exception.DagRuntimeException;
+import org.apache.shardingsphere.elasticjob.lite.util.ReflectionUtils;
+import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
+import org.apache.shardingsphere.elasticjob.tracing.JobEventBus;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.internal.util.collections.Sets;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class DagServiceTest {
+
+ @Mock
+ private CoordinatorRegistryCenter regCenter;
+
+ @Mock
+ private DagNodeStorage dagNodeStorage;
+
+ @Mock
+ private JobEventBus jobEventBus;
+
+ private DagService dagService;
+
+ @Before
+ public void setUp() {
+ dagService = new DagService(regCenter, "testDag", dagNodeStorage);
+ ReflectionUtils.setFieldValue(dagNodeStorage, "regCenter", regCenter);
+ ReflectionUtils.setFieldValue(dagService, "jobEventBus", jobEventBus);
+ ReflectionUtils.setFieldValue(dagService, "dagNodeStorage", dagNodeStorage);
+ ReflectionUtils.setFieldValue(dagService, "jobDagConfig",
+ new JobDagConfiguration("testDag", "job1, job2", 3, 300, false, false));
+ ReflectionUtils.setFieldValue(dagService, "jobName", "testJob");
+ }
+
+ @Test
+ public void isDagRootJob() {
+ assertFalse(dagService.isDagRootJob());
+ }
+
+ @Test
+ public void getDagStates() {
+ when(dagNodeStorage.currentDagStates()).thenReturn("running");
+ assertThat(dagService.getDagStates(), is(DagStates.RUNNING));
+ }
+
+ @Test(expected = DagRuntimeException.class)
+ public void changeDagStatesAndReGraph() {
+ dagService.changeDagStatesAndReGraph();
+ }
+
+ @Test(expected = DagRuntimeException.class)
+ public void checkJobDependenciesState() {
+ when(dagNodeStorage.getDagJobRunStates("testJob")).thenReturn(DagJobStates.FAIL);
+ dagService.checkJobDependenciesState();
+ }
+
+ @Test
+ public void checkJobDependenciesStateRunning() {
+ when(dagNodeStorage.getDagJobRunStates("testJob")).thenReturn(DagJobStates.RUNNING);
+ when(dagNodeStorage.getJobDenpendencies()).thenReturn(new String[]{"self"});
+ dagService.checkJobDependenciesState();
+ }
+
+ @Test
+ public void nextShouldTriggerJob() {
+ when(dagNodeStorage.getAllDagGraphJobs()).thenReturn(createAllDagGraph());
+ when(dagNodeStorage.getDagJobListByState(DagJobStates.RUNNING)).thenReturn(Lists.newArrayList("job3"));
+ when(dagNodeStorage.getDagJobListByState(DagJobStates.SKIP)).thenReturn(Lists.newArrayList("job1"));
+ when(dagNodeStorage.getDagJobListByState(DagJobStates.SUCCESS)).thenReturn(Lists.newArrayList("job2", "testJob"));
+ when(dagNodeStorage.getDagJobListByState(DagJobStates.FAIL)).thenReturn(Lists.newArrayList());
+ assertThat(dagService.nextShouldTriggerJob(), is(Lists.newArrayList("job4")));
+ }
+
+ private Map<String, Set<String>> createAllDagGraph() {
+ Map<String, Set<String>> allDagNode = new HashMap<>(6);
+ allDagNode.put("testJob", Sets.newSet("self"));
+ allDagNode.put("job1", Sets.newSet("testJob"));
+ allDagNode.put("job2", Sets.newSet("testJob"));
+ allDagNode.put("job3", Sets.newSet("testJob", "job1"));
+ allDagNode.put("job4", Sets.newSet("job1", "job2"));
+ allDagNode.put("job5", Sets.newSet("job3", "job4"));
+ return allDagNode;
+ }
+
+ @Test
+ public void event() {
+ dagService.event(CuratorCacheListener.Type.NODE_CHANGED, new ChildData("/job1/state/state", null, "running".getBytes()), new ChildData("/job1/state/state", null, "success".getBytes()));
+ }
+}
diff --git a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/dag/JobDagConfigTest.java b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/dag/JobDagConfigTest.java
new file mode 100644
index 0000000..b76f9ec
--- /dev/null
+++ b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/dag/JobDagConfigTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.elasticjob.lite.internal.dag;
+
+import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
+import org.apache.shardingsphere.elasticjob.api.JobDagConfiguration;
+import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
+import org.apache.shardingsphere.elasticjob.infra.yaml.YamlEngine;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+
+//CHECKSTYLE:OFF
+//CHECKSTYLE:ON
+
+/**
+ * Test for dag config.
+ **/
+public class JobDagConfigTest {
+
+ @Test
+ public void assertBuildJobDagConfig() {
+ JobDagConfiguration jobDagConfig = new JobDagConfiguration();
+ jobDagConfig.setDagName("dagGroup");
+ jobDagConfig.setDagDependencies("jobb,jobc");
+ JobConfiguration actual = JobConfiguration.newBuilder("test_job", 3)
+ .jobDagConfiguration(jobDagConfig)
+ .cron("0/1 * * * * ?")
+ .build();
+ assertNotNull(actual.getJobDagConfiguration());
+ assertThat(actual.getJobDagConfiguration().getDagName(), is("dagGroup"));
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void assertBuildJobDagConfigEmpty() {
+ JobDagConfiguration jobDagConfig = new JobDagConfiguration();
+ JobConfiguration actual = JobConfiguration.newBuilder("test_job", 3)
+ .jobDagConfiguration(jobDagConfig)
+ .build();
+ }
+
+ @Test
+ public void assertJobDagConfigGson() {
+ JobDagConfiguration jobDagConfig = new JobDagConfiguration();
+ jobDagConfig.setDagName("dagGroup");
+ jobDagConfig.setDagDependencies("jobb,jobc");
+ jobDagConfig.setDagSkipWhenFail(true);
+ jobDagConfig.setDagRunAlone(false);
+ jobDagConfig.setRetryInterval(5000);
+ jobDagConfig.setRetryTimes(5);
+ JobConfiguration actual = JobConfiguration.newBuilder("test_job", 3)
+ .jobDagConfiguration(jobDagConfig)
+ .cron("0/1 * * * * ?")
+ .jobExecutorServiceHandlerType("simple")
+ .description("test yaml")
+ .failover(true)
+ .jobParameter("a=1,b=-2")
+ .overwrite(true)
+ .build();
+
+ System.out.println(actual);
+
+ String marshal = YamlEngine.marshal(JobConfigurationPOJO.fromJobConfiguration(actual));
+ System.out.println(marshal);
+
+ JobConfiguration unmarshal = YamlEngine.unmarshal(marshal, JobConfigurationPOJO.class).toJobConfiguration();
+
+ assertNotNull(unmarshal);
+ assertNotNull(unmarshal.getJobDagConfiguration());
+ assertEquals(unmarshal.getJobDagConfiguration().getDagName(), "dagGroup");
+ assertEquals(unmarshal.getJobDagConfiguration().getDagDependencies(), "jobb,jobc");
+ assertEquals(unmarshal.getJobDagConfiguration().getRetryTimes(), 5);
+ assertEquals(unmarshal.getJobDagConfiguration().getRetryInterval(), 5000);
+ assertEquals(unmarshal.getJobDagConfiguration().isDagRunAlone(), false);
+ assertEquals(unmarshal.getJobDagConfiguration().isDagSkipWhenFail(), true);
+ }
+}
diff --git a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/LiteJobFacadeTest.java b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/LiteJobFacadeTest.java
index 1642ab6..fc438a3 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/LiteJobFacadeTest.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/LiteJobFacadeTest.java
@@ -20,10 +20,13 @@ package org.apache.shardingsphere.elasticjob.lite.internal.schedule;
import com.google.common.collect.Lists;
import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import org.apache.shardingsphere.elasticjob.api.listener.ShardingContexts;
+import org.apache.shardingsphere.elasticjob.infra.exception.DagRuntimeException;
import org.apache.shardingsphere.elasticjob.infra.exception.JobExecutionEnvironmentException;
import org.apache.shardingsphere.elasticjob.lite.api.listener.fixture.ElasticJobListenerCaller;
import org.apache.shardingsphere.elasticjob.lite.api.listener.fixture.TestElasticJobListener;
import org.apache.shardingsphere.elasticjob.lite.internal.config.ConfigurationService;
+import org.apache.shardingsphere.elasticjob.lite.internal.dag.DagService;
+import org.apache.shardingsphere.elasticjob.lite.internal.dag.DagStates;
import org.apache.shardingsphere.elasticjob.lite.internal.failover.FailoverService;
import org.apache.shardingsphere.elasticjob.lite.internal.sharding.ExecutionContextService;
import org.apache.shardingsphere.elasticjob.lite.internal.sharding.ExecutionService;
@@ -41,6 +44,7 @@ import java.util.Collections;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -68,6 +72,9 @@ public final class LiteJobFacadeTest {
@Mock
private ElasticJobListenerCaller caller;
+
+ @Mock
+ private DagService dagService;
private LiteJobFacade liteJobFacade;
@@ -80,6 +87,7 @@ public final class LiteJobFacadeTest {
ReflectionUtils.setFieldValue(liteJobFacade, "executionService", executionService);
ReflectionUtils.setFieldValue(liteJobFacade, "failoverService", failoverService);
ReflectionUtils.setFieldValue(liteJobFacade, "jobEventBus", jobEventBus);
+ ReflectionUtils.setFieldValue(liteJobFacade, "dagService", dagService);
}
@Test
@@ -212,4 +220,21 @@ public final class LiteJobFacadeTest {
liteJobFacade.postJobExecutionEvent(null);
verify(jobEventBus).post(null);
}
+
+ @Test
+ public void assertIsDagJob() {
+ assertTrue(liteJobFacade.isDagJob());
+ }
+
+ @Test(expected = DagRuntimeException.class)
+ public void assertDagStatesCheck() {
+ when(dagService.getDagStates()).thenReturn(DagStates.PAUSE);
+ liteJobFacade.dagStatesCheck();
+ }
+
+ @Test
+ public void assertDagJobDependenciesCheck() {
+ liteJobFacade.dagJobDependenciesCheck();
+ verify(dagService, times(1)).checkJobDependenciesState();
+ }
}
diff --git a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ExecutionServiceTest.java b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ExecutionServiceTest.java
index defe681..67c1fa1 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ExecutionServiceTest.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ExecutionServiceTest.java
@@ -21,6 +21,7 @@ import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import org.apache.shardingsphere.elasticjob.api.listener.ShardingContexts;
import org.apache.shardingsphere.elasticjob.lite.internal.config.ConfigurationService;
import org.apache.shardingsphere.elasticjob.lite.internal.schedule.JobRegistry;
+import org.apache.shardingsphere.elasticjob.lite.internal.state.JobStateEnum;
import org.apache.shardingsphere.elasticjob.lite.internal.storage.JobNodeStorage;
import org.apache.shardingsphere.elasticjob.lite.util.ReflectionUtils;
import org.junit.After;
@@ -82,6 +83,9 @@ public final class ExecutionServiceTest {
verify(jobNodeStorage).fillEphemeralJobNode("sharding/1/running", "");
verify(jobNodeStorage).fillEphemeralJobNode("sharding/2/running", "");
assertTrue(JobRegistry.getInstance().isJobRunning("test_job"));
+ verify(jobNodeStorage).createJobNodeIfNeeded("state/state", JobStateEnum.RUNNING);
+ verify(jobNodeStorage).createJobNodeIfNeeded("proc/succ");
+ verify(jobNodeStorage).createJobNodeIfNeeded("proc/fail");
}
@Test
@@ -104,7 +108,37 @@ public final class ExecutionServiceTest {
verify(jobNodeStorage).removeJobNodeIfExisted("sharding/2/running");
assertFalse(JobRegistry.getInstance().isJobRunning("test_job"));
}
-
+
+ private Map<Integer, String> itemErrorMessages() {
+ Map<Integer, String> items = new HashMap<>(2);
+ items.put(0, "Some error!");
+ items.put(1, "Some error!");
+ return items;
+ }
+
+ @Test
+ public void assertRegisterJobCompletedWithItemWithoutMonitorExecution() {
+ JobRegistry.getInstance().setJobRunning("test_job", true);
+ when(configService.load(true)).thenReturn(JobConfiguration.newBuilder("test_job", 3).cron("0/1 * * * * ?").monitorExecution(false).build());
+ executionService.registerJobCompleted(new ShardingContexts("fake_task_id", "test_job", 10, "", Collections.emptyMap()), itemErrorMessages());
+ verify(jobNodeStorage, times(0)).removeJobNodeIfExisted(any());
+ verify(jobNodeStorage, times(0)).createJobNodeIfNeeded(any());
+ assertFalse(JobRegistry.getInstance().isJobRunning("test_job"));
+ }
+
+ @Test
+ public void assertRegisterJobCompletedWithItemWithMonitorExecution() {
+ JobRegistry.getInstance().setJobRunning("test_job", true);
+ when(configService.load(true)).thenReturn(JobConfiguration.newBuilder("test_job", 3).cron("0/1 * * * * ?").monitorExecution(true).build());
+ executionService.registerJobCompleted(getShardingContext(), itemErrorMessages());
+ verify(jobNodeStorage).removeJobNodeIfExisted("sharding/0/running");
+ verify(jobNodeStorage).removeJobNodeIfExisted("sharding/1/running");
+ verify(jobNodeStorage).removeJobNodeIfExisted("sharding/2/running");
+ assertFalse(JobRegistry.getInstance().isJobRunning("test_job"));
+ verify(jobNodeStorage).createJobNodeIfNeeded("proc/fail/0");
+ verify(jobNodeStorage).createJobNodeIfNeeded("proc/fail/1");
+ }
+
@Test
public void assertClearAllRunningInfo() {
when(configService.load(true)).thenReturn(JobConfiguration.newBuilder("test_job", 3).cron("0/1 * * * * ?").monitorExecution(false).build());
diff --git a/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/test/java/org/apache/shardingsphere/elasticjob/lite/spring/namespace/fixture/job/ref/RefFooSimpleElasticJob.java b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/state/JobStateEnumTest.java
similarity index 50%
copy from elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/test/java/org/apache/shardingsphere/elasticjob/lite/spring/namespace/fixture/job/ref/RefFooSimpleElasticJob.java
copy to elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/state/JobStateEnumTest.java
index 157d33d..53c27b5 100644
--- a/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/test/java/org/apache/shardingsphere/elasticjob/lite/spring/namespace/fixture/job/ref/RefFooSimpleElasticJob.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/state/JobStateEnumTest.java
@@ -7,7 +7,7 @@
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -15,33 +15,21 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.elasticjob.lite.spring.namespace.fixture.job.ref;
+package org.apache.shardingsphere.elasticjob.lite.internal.state;
-import lombok.Getter;
-import lombok.Setter;
-import org.apache.shardingsphere.elasticjob.api.ShardingContext;
-import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
-import org.apache.shardingsphere.elasticjob.lite.spring.namespace.fixture.service.FooService;
+import org.junit.Test;
-public class RefFooSimpleElasticJob implements SimpleJob {
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
- @Getter
- private static volatile boolean completed;
-
- @Getter
- @Setter
- private FooService fooService;
-
- @Override
- public void execute(final ShardingContext shardingContext) {
- fooService.foo();
- completed = true;
- }
-
- /**
- * Set completed to false.
- */
- public static void reset() {
- completed = false;
+public class JobStateEnumTest {
+
+ @Test
+ public void assertEnumOf() {
+ assertThat(JobStateEnum.of("none"), is(JobStateEnum.NONE));
+ assertThat(JobStateEnum.of("running"), is(JobStateEnum.RUNNING));
+ assertThat(JobStateEnum.of("success"), is(JobStateEnum.SUCCESS));
+ assertThat(JobStateEnum.of("skip"), is(JobStateEnum.SKIP));
+ assertThat(JobStateEnum.of("fail"), is(JobStateEnum.FAIL));
}
}
diff --git a/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/test/java/org/apache/shardingsphere/elasticjob/lite/spring/namespace/fixture/job/ref/RefFooSimpleElasticJob.java b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/state/JobStateNodeTest.java
similarity index 51%
copy from elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/test/java/org/apache/shardingsphere/elasticjob/lite/spring/namespace/fixture/job/ref/RefFooSimpleElasticJob.java
copy to elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/state/JobStateNodeTest.java
index 157d33d..37961ac 100644
--- a/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/test/java/org/apache/shardingsphere/elasticjob/lite/spring/namespace/fixture/job/ref/RefFooSimpleElasticJob.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/state/JobStateNodeTest.java
@@ -7,7 +7,7 @@
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -15,33 +15,32 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.elasticjob.lite.spring.namespace.fixture.job.ref;
-
-import lombok.Getter;
-import lombok.Setter;
-import org.apache.shardingsphere.elasticjob.api.ShardingContext;
-import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
-import org.apache.shardingsphere.elasticjob.lite.spring.namespace.fixture.service.FooService;
-
-public class RefFooSimpleElasticJob implements SimpleJob {
-
- @Getter
- private static volatile boolean completed;
-
- @Getter
- @Setter
- private FooService fooService;
-
- @Override
- public void execute(final ShardingContext shardingContext) {
- fooService.foo();
- completed = true;
+package org.apache.shardingsphere.elasticjob.lite.internal.state;
+
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+public class JobStateNodeTest {
+
+ @Test
+ public void assertGetRootState() {
+ assertThat(JobStateNode.getRootState(), is("state/state"));
}
-
- /**
- * Set completed to false.
- */
- public static void reset() {
- completed = false;
+
+ @Test
+ public void assertGetRootProc() {
+ assertThat(JobStateNode.getRootProc(), is("proc"));
+ }
+
+ @Test
+ public void assertGetProcFail() {
+ assertThat(JobStateNode.getProcFail(3), is("proc/fail/3"));
+ }
+
+ @Test
+ public void assertGetProcSucc() {
+ assertThat(JobStateNode.getProcSucc(2), is("proc/succ/2"));
}
}
diff --git a/elasticjob-lite/elasticjob-lite-core/src/test/resources/logback-test.xml b/elasticjob-lite/elasticjob-lite-core/src/test/resources/logback-test.xml
index 473355d..428a781 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/test/resources/logback-test.xml
+++ b/elasticjob-lite/elasticjob-lite-core/src/test/resources/logback-test.xml
@@ -37,6 +37,9 @@
</root>
<logger name="org.apache.shardingsphere.elasticjob.lite.internal.snapshot.SnapshotService" level="OFF" />
+ <logger name="org.apache.shardingsphere.elasticjob.lite.internal.dag" level="debug" />
+ <logger name="org.apache.zookeeper.ClientCnxn" level="ERROR" />
<logger name="org.apache.shardingsphere.elasticjob.infra.handler.error.impl.LogJobErrorHandler" level="OFF" />
<logger name="org.apache.curator.framework.listen.MappingListenerManager" level="OFF" />
+
</configuration>
diff --git a/elasticjob-lite/elasticjob-lite-lifecycle/src/main/java/org/apache/shardingsphere/elasticjob/lite/lifecycle/api/DagOperateAPI.java b/elasticjob-lite/elasticjob-lite-lifecycle/src/main/java/org/apache/shardingsphere/elasticjob/lite/lifecycle/api/DagOperateAPI.java
new file mode 100644
index 0000000..6cebf1b
--- /dev/null
+++ b/elasticjob-lite/elasticjob-lite-lifecycle/src/main/java/org/apache/shardingsphere/elasticjob/lite/lifecycle/api/DagOperateAPI.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.elasticjob.lite.lifecycle.api;
+
+import org.apache.shardingsphere.elasticjob.lite.lifecycle.domain.DagBriefInfo;
+
+import java.util.List;
+
+/**
+ * Dag operate api.
+ *
+ **/
+public interface DagOperateAPI {
+ /**
+ * Pause the running dag, do not trigger next job.
+ *
+ * @param dagName dag name
+ * @return toggle success?
+ */
+ boolean toggleDagPause(String dagName);
+
+ /**
+ * Resume the pause dag.
+ *
+ * @param dagName dag name
+ * @return toggle success?
+ */
+ boolean toggleDagResume(String dagName);
+
+ /**
+ * Start the dag, trigger the first job.
+ *
+ * @param dagName dag name
+ * @return toggle success?
+ */
+ boolean toggleDagStart(String dagName);
+
+ /**
+ * Stop the pause dag, can not resume.
+ *
+ * @param dagName dag name
+ * @return toggle success?
+ */
+ boolean toggleDagStop(String dagName);
+
+ /**
+ * when the dag state fail, rerun the fail job .
+ *
+ * @param dagName dag name
+ * @return toggle success?
+ */
+ boolean toggleDagRerunWhenFail(String dagName);
+
+ /**
+ * query dag list.
+ *
+ * @return dag info list
+ */
+ List<DagBriefInfo> getDagList();
+
+ /**
+ * get dag's job and it's dependencies.
+ *
+ * @param dagName dag name
+ * @return dag info list
+ */
+ List<DagBriefInfo> getDagJobDependencies(String dagName);
+}
diff --git a/elasticjob-lite/elasticjob-lite-lifecycle/src/main/java/org/apache/shardingsphere/elasticjob/lite/lifecycle/api/JobAPIFactory.java b/elasticjob-lite/elasticjob-lite-lifecycle/src/main/java/org/apache/shardingsphere/elasticjob/lite/lifecycle/api/JobAPIFactory.java
index b1a842b..dfa6a29 100644
--- a/elasticjob-lite/elasticjob-lite-lifecycle/src/main/java/org/apache/shardingsphere/elasticjob/lite/lifecycle/api/JobAPIFactory.java
+++ b/elasticjob-lite/elasticjob-lite-lifecycle/src/main/java/org/apache/shardingsphere/elasticjob/lite/lifecycle/api/JobAPIFactory.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.elasticjob.lite.lifecycle.api;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
+import org.apache.shardingsphere.elasticjob.lite.lifecycle.internal.operate.DagOperateAPIImpl;
import org.apache.shardingsphere.elasticjob.lite.lifecycle.internal.operate.JobOperateAPIImpl;
import org.apache.shardingsphere.elasticjob.lite.lifecycle.internal.operate.ShardingOperateAPIImpl;
import org.apache.shardingsphere.elasticjob.lite.lifecycle.internal.reg.RegistryCenterFactory;
@@ -104,4 +105,16 @@ public final class JobAPIFactory {
public static ShardingStatisticsAPI createShardingStatisticsAPI(final String connectString, final String namespace, final String digest) {
return new ShardingStatisticsAPIImpl(RegistryCenterFactory.createCoordinatorRegistryCenter(connectString, namespace, digest));
}
+
+ /**
+ * Create dag operate API.
+ *
+ * @param connectString registry center connect string
+ * @param namespace registry center namespace
+ * @param digest registry center digest
+ * @return job sharding statistics API
+ */
+ public static DagOperateAPI createDagOperateAPI(final String connectString, final String namespace, final String digest) {
+ return new DagOperateAPIImpl(RegistryCenterFactory.createCoordinatorRegistryCenter(connectString, namespace, digest));
+ }
}
diff --git a/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/test/java/org/apache/shardingsphere/elasticjob/lite/spring/namespace/fixture/job/ref/RefFooSimpleElasticJob.java b/elasticjob-lite/elasticjob-lite-lifecycle/src/main/java/org/apache/shardingsphere/elasticjob/lite/lifecycle/domain/DagBriefInfo.java
similarity index 51%
copy from elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/test/java/org/apache/shardingsphere/elasticjob/lite/spring/namespace/fixture/job/ref/RefFooSimpleElasticJob.java
copy to elasticjob-lite/elasticjob-lite-lifecycle/src/main/java/org/apache/shardingsphere/elasticjob/lite/lifecycle/domain/DagBriefInfo.java
index 157d33d..84a8ce4 100644
--- a/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/test/java/org/apache/shardingsphere/elasticjob/lite/spring/namespace/fixture/job/ref/RefFooSimpleElasticJob.java
+++ b/elasticjob-lite/elasticjob-lite-lifecycle/src/main/java/org/apache/shardingsphere/elasticjob/lite/lifecycle/domain/DagBriefInfo.java
@@ -7,7 +7,7 @@
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -15,33 +15,31 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.elasticjob.lite.spring.namespace.fixture.job.ref;
+package org.apache.shardingsphere.elasticjob.lite.lifecycle.domain;
+
+import lombok.Data;
+import lombok.ToString;
-import lombok.Getter;
-import lombok.Setter;
-import org.apache.shardingsphere.elasticjob.api.ShardingContext;
-import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
-import org.apache.shardingsphere.elasticjob.lite.spring.namespace.fixture.service.FooService;
+/**
+ * Dag config domain.
+ *
+ **/
+@Data
+@ToString
+public class DagBriefInfo {
+ private String dagName;
-public class RefFooSimpleElasticJob implements SimpleJob {
+ private String jobName;
- @Getter
- private static volatile boolean completed;
-
- @Getter
- @Setter
- private FooService fooService;
-
- @Override
- public void execute(final ShardingContext shardingContext) {
- fooService.foo();
- completed = true;
+ private String dependencies;
+
+ public DagBriefInfo(final String group) {
+ this.dagName = group;
}
-
- /**
- * Set completed to false.
- */
- public static void reset() {
- completed = false;
+
+ public DagBriefInfo(final String dagName, final String jobName, final String dependencies) {
+ this.dagName = dagName;
+ this.jobName = jobName;
+ this.dependencies = dependencies;
}
}
diff --git a/elasticjob-lite/elasticjob-lite-lifecycle/src/main/java/org/apache/shardingsphere/elasticjob/lite/lifecycle/internal/operate/DagOperateAPIImpl.java b/elasticjob-lite/elasticjob-lite-lifecycle/src/main/java/org/apache/shardingsphere/elasticjob/lite/lifecycle/internal/operate/DagOperateAPIImpl.java
new file mode 100644
index 0000000..ba5ad22
--- /dev/null
+++ b/elasticjob-lite/elasticjob-lite-lifecycle/src/main/java/org/apache/shardingsphere/elasticjob/lite/lifecycle/internal/operate/DagOperateAPIImpl.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.elasticjob.lite.lifecycle.internal.operate;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.transaction.CuratorOp;
+import org.apache.curator.framework.api.transaction.CuratorTransactionFinal;
+import org.apache.shardingsphere.elasticjob.lite.internal.dag.DagJobStates;
+import org.apache.shardingsphere.elasticjob.lite.internal.dag.DagNodeStorage;
+import org.apache.shardingsphere.elasticjob.lite.internal.dag.DagService;
+import org.apache.shardingsphere.elasticjob.lite.internal.dag.DagStates;
+import org.apache.shardingsphere.elasticjob.lite.internal.storage.JobNodePath;
+import org.apache.shardingsphere.elasticjob.lite.lifecycle.api.DagOperateAPI;
+import org.apache.shardingsphere.elasticjob.lite.lifecycle.domain.DagBriefInfo;
+import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Dag operate api implemention.
+ *
+ **/
+@Slf4j
+public class DagOperateAPIImpl implements DagOperateAPI {
+ private final CoordinatorRegistryCenter regCenter;
+
+ public DagOperateAPIImpl(final CoordinatorRegistryCenter regCenter) {
+ this.regCenter = regCenter;
+ }
+
+ @Override
+ public boolean toggleDagPause(final String dagName) {
+ DagNodeStorage dagNodeStorage = new DagNodeStorage(regCenter, dagName, "");
+ DagStates dagStates = DagStates.of(dagNodeStorage.currentDagStates());
+ if (dagStates != DagStates.RUNNING) {
+ log.error("Current dag-{} not RUNNING , Cannot turn to PAUSE", dagName);
+ return false;
+ }
+ dagNodeStorage.updateDagStates(DagStates.PAUSE);
+ return true;
+ }
+
+ @Override
+ public boolean toggleDagResume(final String dagName) {
+ DagNodeStorage dagNodeStorage = new DagNodeStorage(regCenter, dagName, "");
+ DagStates dagStates = DagStates.of(dagNodeStorage.currentDagStates());
+ if (dagStates != DagStates.PAUSE) {
+ log.error("Current dag-{} not PAUSE , Cannot turn to RUNNING", dagName);
+ return false;
+ }
+
+ // get next should trigger job.
+ DagService dagService = new DagService(regCenter, dagName, dagNodeStorage);
+ List<String> nextJobs = dagService.nextShouldTriggerJob();
+ dagNodeStorage.updateDagStates(DagStates.RUNNING);
+ nextJobs.forEach(job -> triggerJobManual(job));
+ return true;
+ }
+
+ @Override
+ public boolean toggleDagStart(final String dagName) {
+ DagNodeStorage dagNodeStorage = new DagNodeStorage(regCenter, dagName, "");
+ DagStates dagStates = DagStates.of(dagNodeStorage.currentDagStates());
+ if (dagStates == DagStates.RUNNING || dagStates == DagStates.PAUSE) {
+ log.error("Can not start DAG-{}, current state-{}", dagName, dagStates);
+ return false;
+ }
+ // trigger self
+ List<String> rootJobs = Lists.newArrayList();
+ Map<String, Set<String>> allDagConfigJobs = dagNodeStorage.getAllDagConfigJobs();
+
+ allDagConfigJobs.forEach((key, value) -> {
+ if (value.size() == 1 && value.contains(DagService.ROOT_JOB)) {
+ rootJobs.add(key);
+ }
+ });
+
+ log.info("Dag-{} start jobs [{}]", dagName, rootJobs);
+ rootJobs.forEach(job -> triggerJobManual(job));
+ return true;
+ }
+
+ @Override
+ public boolean toggleDagStop(final String dagName) {
+ DagNodeStorage dagNodeStorage = new DagNodeStorage(regCenter, dagName, "");
+ DagStates dagStates = DagStates.of(dagNodeStorage.currentDagStates());
+ if (dagStates != DagStates.PAUSE) {
+ log.error("Current dag-{} not PAUSE , Cannot turn to FAIL", dagName);
+ return false;
+ }
+ dagNodeStorage.updateDagStates(DagStates.FAIL);
+ return true;
+ }
+
+ @Override
+ public boolean toggleDagRerunWhenFail(final String dagName) {
+ DagNodeStorage dagNodeStorage = new DagNodeStorage(regCenter, dagName, "");
+ DagStates dagStates = DagStates.of(dagNodeStorage.currentDagStates());
+ if (dagStates != DagStates.FAIL) {
+ log.error("Current dag-{} not FAIL , Cannot rerun fail jobs", dagName);
+ return false;
+ }
+
+ // delete fail job
+ List<String> failJobs = dagNodeStorage.getDagJobListByState(DagJobStates.FAIL);
+ if (failJobs.isEmpty()) {
+ log.error("Dag-{} don't have fail jobs", dagName);
+ return false;
+ }
+ log.info("Dag-{} rerun fail jobs [{}]", dagName, failJobs);
+ failJobs.forEach(job -> dagNodeStorage.removeFailJob(job));
+ dagNodeStorage.updateDagStates(DagStates.RUNNING);
+ failJobs.forEach(job -> triggerJobManual(job));
+ return true;
+ }
+
+ @Override
+ public List<DagBriefInfo> getDagList() {
+ DagNodeStorage dagNodeStorage = new DagNodeStorage(regCenter, "", "");
+ List<String> allDags = dagNodeStorage.getAllDags();
+ List<DagBriefInfo> resList = Lists.newArrayList();
+ allDags.forEach(name -> resList.add(new DagBriefInfo(name)));
+ return resList;
+ }
+
+ @Override
+ public List<DagBriefInfo> getDagJobDependencies(final String dagName) {
+ DagNodeStorage dagNodeStorage = new DagNodeStorage(regCenter, dagName, "");
+ Map<String, Set<String>> allDagConfigJobs = dagNodeStorage.getAllDagConfigJobs();
+ List<DagBriefInfo> resList = Lists.newArrayList();
+ allDagConfigJobs.forEach((key, value) -> {
+ resList.add(new DagBriefInfo(dagName, key, Joiner.on(",").join(value)));
+ });
+ return resList;
+ }
+
+ private void triggerJobManual(final String job) {
+ CuratorTransactionFinal curatorTransactionFinal = null;
+
+ log.info("Trigger Dag job-{} manually.", job);
+ try {
+ JobNodePath jobNodePath = new JobNodePath(job);
+ CuratorFramework rawClient = (CuratorFramework) regCenter.getRawClient();
+ List<CuratorOp> opList = new ArrayList<>();
+ for (String each : regCenter.getChildrenKeys(jobNodePath.getInstancesNodePath())) {
+ opList.add(rawClient.transactionOp().setData().forPath(jobNodePath.getInstanceNodePath(each), "TRIGGER".getBytes()));
+ }
+ rawClient.transaction().forOperations(opList);
+ //CHECKSTYLE:OFF
+ } catch (final Exception exp) {
+ //CHECKSTYLE:ON
+ log.error("Trigger Dag job-{} by hand in transaction Exception!", job, exp);
+ }
+ }
+
+}
diff --git a/elasticjob-lite/elasticjob-lite-lifecycle/src/test/java/org/apache/shardingsphere/elasticjob/lite/lifecycle/api/JobAPIFactoryTest.java b/elasticjob-lite/elasticjob-lite-lifecycle/src/test/java/org/apache/shardingsphere/elasticjob/lite/lifecycle/api/JobAPIFactoryTest.java
index 63eea65..da9c7c1 100644
--- a/elasticjob-lite/elasticjob-lite-lifecycle/src/test/java/org/apache/shardingsphere/elasticjob/lite/lifecycle/api/JobAPIFactoryTest.java
+++ b/elasticjob-lite/elasticjob-lite-lifecycle/src/test/java/org/apache/shardingsphere/elasticjob/lite/lifecycle/api/JobAPIFactoryTest.java
@@ -54,4 +54,9 @@ public final class JobAPIFactoryTest extends AbstractEmbedZookeeperBaseTest {
public void assertCreateShardingStatisticsAPI() {
assertThat(JobAPIFactory.createShardingStatisticsAPI(getConnectionString(), "namespace", null), instanceOf(ShardingStatisticsAPI.class));
}
+
+ @Test
+ public void assertCreateDagOperateAPI() {
+ assertThat(JobAPIFactory.createDagOperateAPI(getConnectionString(), "namespace", null), instanceOf(DagOperateAPI.class));
+ }
}
diff --git a/elasticjob-lite/elasticjob-lite-lifecycle/src/test/java/org/apache/shardingsphere/elasticjob/lite/lifecycle/fixture/LifecycleYamlConstants.java b/elasticjob-lite/elasticjob-lite-lifecycle/src/test/java/org/apache/shardingsphere/elasticjob/lite/lifecycle/fixture/LifecycleYamlConstants.java
index b9e2db8..89933b8 100644
--- a/elasticjob-lite/elasticjob-lite-lifecycle/src/test/java/org/apache/shardingsphere/elasticjob/lite/lifecycle/fixture/LifecycleYamlConstants.java
+++ b/elasticjob-lite/elasticjob-lite-lifecycle/src/test/java/org/apache/shardingsphere/elasticjob/lite/lifecycle/fixture/LifecycleYamlConstants.java
@@ -37,6 +37,8 @@ public final class LifecycleYamlConstants {
+ "overwrite: false\n";
private static final String DATAFLOW_JOB_YAML = "cron: 0/1 * * * * ?\n"
+ + "dagRunAlone: false\n"
+ + "dagSkipWhenFail: false\n"
+ "description: ''\n"
+ "disabled: false\n"
+ "failover: false\n"
@@ -49,6 +51,8 @@ public final class LifecycleYamlConstants {
+ "props:\n"
+ " streaming.process: 'true'\n"
+ "reconcileIntervalMinutes: 10\n"
+ + "retryInterval: 0\n"
+ + "retryTimes: 0\n"
+ "shardingTotalCount: 3\n";
private static final String SCRIPT_JOB_YAML = "jobName: test_job\n"
diff --git a/elasticjob-lite/elasticjob-lite-lifecycle/src/test/java/org/apache/shardingsphere/elasticjob/lite/lifecycle/internal/operate/DagOperateAPIImplTest.java b/elasticjob-lite/elasticjob-lite-lifecycle/src/test/java/org/apache/shardingsphere/elasticjob/lite/lifecycle/internal/operate/DagOperateAPIImplTest.java
new file mode 100644
index 0000000..f029ddf
--- /dev/null
+++ b/elasticjob-lite/elasticjob-lite-lifecycle/src/test/java/org/apache/shardingsphere/elasticjob/lite/lifecycle/internal/operate/DagOperateAPIImplTest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.elasticjob.lite.lifecycle.internal.operate;
+
+import com.google.common.collect.Lists;
+import org.apache.shardingsphere.elasticjob.lite.lifecycle.api.DagOperateAPI;
+import org.apache.shardingsphere.elasticjob.lite.lifecycle.domain.DagBriefInfo;
+import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.util.List;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class DagOperateAPIImplTest {
+
+ @Mock
+ private CoordinatorRegistryCenter regCenter;
+
+ private DagOperateAPI dagOperateAPI;
+
+ @Before
+ public void setUp() {
+ dagOperateAPI = new DagOperateAPIImpl(regCenter);
+ }
+
+ @Test
+ public void toggleDagPause() {
+ when(regCenter.isExisted("/dag/testDag/states")).thenReturn(true);
+ when(regCenter.getDirectly("/dag/testDag/states")).thenReturn("running");
+ dagOperateAPI.toggleDagPause("testDag");
+ verify(regCenter).update("/dag/testDag/states", "pause");
+ }
+
+ @Test
+ public void toggleDagResume() {
+ when(regCenter.isExisted("/dag/testDag/states")).thenReturn(true);
+ when(regCenter.getDirectly("/dag/testDag/states")).thenReturn("pause");
+ dagOperateAPI.toggleDagResume("testDag");
+ verify(regCenter).update("/dag/testDag/states", "running");
+ }
+
+ @Test
+ public void toggleDagStart() {
+ when(regCenter.isExisted("/dag/testDag/states")).thenReturn(true);
+ when(regCenter.getDirectly("/dag/testDag/states")).thenReturn("pause");
+ assertFalse(dagOperateAPI.toggleDagStart("testDag"));
+ }
+
+ @Test
+ public void toggleDagStop() {
+ when(regCenter.isExisted("/dag/testDag/states")).thenReturn(true);
+ when(regCenter.getDirectly("/dag/testDag/states")).thenReturn("pause");
+ dagOperateAPI.toggleDagStop("testDag");
+ verify(regCenter).update("/dag/testDag/states", "fail");
+ }
+
+ @Test
+ public void toggleDagRerunWhenFail() {
+ when(regCenter.isExisted("/dag/testDag/states")).thenReturn(true);
+ when(regCenter.getDirectly("/dag/testDag/states")).thenReturn("pause");
+ assertFalse(dagOperateAPI.toggleDagRerunWhenFail("testDag"));
+ }
+
+ @Test
+ public void getDagList() {
+ when(regCenter.getChildrenKeys("/dag")).thenReturn(Lists.newArrayList("d1", "d2"));
+ List<DagBriefInfo> dagList = dagOperateAPI.getDagList();
+ assertThat(dagList.size(), is(2));
+ }
+
+ @Test
+ public void getDagJobDependencies() {
+ when(regCenter.getChildrenKeys("/dag/testDag/config")).thenReturn(Lists.newArrayList("job1", "job2", "job3"));
+ when(regCenter.getDirectly("/dag/testDag/config/job1")).thenReturn("self");
+ when(regCenter.getDirectly("/dag/testDag/config/job2")).thenReturn("self,job1");
+ when(regCenter.getDirectly("/dag/testDag/config/job3")).thenReturn("job1,job2");
+ List<DagBriefInfo> testDag = dagOperateAPI.getDagJobDependencies("testDag");
+ assertThat(testDag.size(), is(3));
+ }
+}
diff --git a/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-boot-starter/src/main/java/org/apache/shardingsphere/elasticjob/lite/spring/boot/job/ElasticJobConfigurationProperties.java b/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-boot-starter/src/main/java/org/apache/shardingsphere/elasticjob/lite/spring/boot/job/ElasticJobConfigurationProperties.java
index ba9348d..5d9095a 100644
--- a/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-boot-starter/src/main/java/org/apache/shardingsphere/elasticjob/lite/spring/boot/job/ElasticJobConfigurationProperties.java
+++ b/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-boot-starter/src/main/java/org/apache/shardingsphere/elasticjob/lite/spring/boot/job/ElasticJobConfigurationProperties.java
@@ -19,8 +19,10 @@ package org.apache.shardingsphere.elasticjob.lite.spring.boot.job;
import lombok.Getter;
import lombok.Setter;
+import org.apache.commons.lang3.StringUtils;
import org.apache.shardingsphere.elasticjob.api.ElasticJob;
import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
+import org.apache.shardingsphere.elasticjob.api.JobDagConfiguration;
import java.util.Properties;
@@ -66,6 +68,18 @@ public class ElasticJobConfigurationProperties {
private boolean overwrite;
+ private String dagName;
+
+ private String dagDependencies;
+
+ private int dagRetryTimes;
+
+ private int dagRetryInterval;
+
+ private boolean dagRunAlone;
+
+ private boolean dagSkipWhenFail;
+
/**
* Convert to job configuration.
*
@@ -73,12 +87,16 @@ public class ElasticJobConfigurationProperties {
* @return job configuration
*/
public JobConfiguration toJobConfiguration(final String jobName) {
+ JobDagConfiguration jobDagConfiguration = null;
+ if (StringUtils.isNotEmpty(dagName)) {
+ jobDagConfiguration = new JobDagConfiguration(dagName, dagDependencies, dagRetryTimes, dagRetryInterval, dagRunAlone, dagSkipWhenFail);
+ }
JobConfiguration result = JobConfiguration.newBuilder(jobName, shardingTotalCount)
.cron(cron).shardingItemParameters(shardingItemParameters).jobParameter(jobParameter)
.monitorExecution(monitorExecution).failover(failover).misfire(misfire)
.maxTimeDiffSeconds(maxTimeDiffSeconds).reconcileIntervalMinutes(reconcileIntervalMinutes)
.jobShardingStrategyType(jobShardingStrategyType).jobExecutorServiceHandlerType(jobExecutorServiceHandlerType).jobErrorHandlerType(jobErrorHandlerType)
- .description(description).disabled(disabled).overwrite(overwrite).build();
+ .description(description).disabled(disabled).overwrite(overwrite).jobDagConfiguration(jobDagConfiguration).build();
for (Object each : props.keySet()) {
result.getProps().setProperty(each.toString(), props.get(each.toString()).toString());
}
diff --git a/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-boot-starter/src/test/java/org/apache/shardingsphere/elasticjob/lite/spring/boot/job/ElasticJobSpringBootTest.java b/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-boot-starter/src/test/java/org/apache/shardingsphere/elasticjob/lite/spring/boot/job/ElasticJobSpringBootTest.java
index 3959198..7867359 100644
--- a/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-boot-starter/src/test/java/org/apache/shardingsphere/elasticjob/lite/spring/boot/job/ElasticJobSpringBootTest.java
+++ b/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-boot-starter/src/test/java/org/apache/shardingsphere/elasticjob/lite/spring/boot/job/ElasticJobSpringBootTest.java
@@ -18,6 +18,7 @@
package org.apache.shardingsphere.elasticjob.lite.spring.boot.job;
import org.apache.shardingsphere.elasticjob.api.ElasticJob;
+import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import org.apache.shardingsphere.elasticjob.infra.concurrent.BlockUtils;
import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.JobBootstrap;
import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
@@ -37,6 +38,7 @@ import java.sql.SQLException;
import java.util.Map;
import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
@@ -96,4 +98,19 @@ public class ElasticJobSpringBootTest extends AbstractJUnit4SpringContextTests {
assertNotNull(applicationContext.getBean("customTestJobBean", OneOffJobBootstrap.class));
assertNotNull(applicationContext.getBean("printTestJobBean", OneOffJobBootstrap.class));
}
+
+ @Test
+ public void assertJobDagConfiguration() {
+ assertNotNull(applicationContext);
+ ElasticJobProperties bean = applicationContext.getBean(ElasticJobProperties.class);
+ assertNotNull(bean);
+ assertNotNull(bean.getJobs());
+ Map<String, ElasticJobConfigurationProperties> jobs = bean.getJobs();
+ jobs.forEach((key, value) -> {
+ JobConfiguration job = value.toJobConfiguration("name");
+ assertNotNull(job);
+ assertNotNull(job.getJobDagConfiguration());
+ assertEquals(job.getJobDagConfiguration().getDagName(), "dagA");
+ });
+ }
}
diff --git a/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-boot-starter/src/test/resources/application-elasticjob.yml b/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-boot-starter/src/test/resources/application-elasticjob.yml
index fccac3f..ff4a499 100644
--- a/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-boot-starter/src/test/resources/application-elasticjob.yml
+++ b/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-boot-starter/src/test/resources/application-elasticjob.yml
@@ -33,9 +33,13 @@ elasticjob:
elasticJobClass: org.apache.shardingsphere.elasticjob.lite.spring.boot.job.fixture.job.impl.CustomTestJob
jobBootstrapBeanName: customTestJobBean
shardingTotalCount: 3
+ dagName: dagA
+ dagDependencies: job1,printTestJob
printTestJob:
elasticJobType: PRINT
jobBootstrapBeanName: printTestJobBean
shardingTotalCount: 3
props:
print.content: "test print job"
+ dagName: dagA
+ dagDependencies: job1,customTestJob
diff --git a/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/main/java/org/apache/shardingsphere/elasticjob/lite/spring/namespace/job/parser/JobBeanDefinitionParser.java b/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/main/java/org/apache/shardingsphere/elasticjob/lite/spring/namespace/job/parser/JobBeanDefinitionParser.java
index 5d9da40..3245d77 100644
--- a/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/main/java/org/apache/shardingsphere/elasticjob/lite/spring/namespace/job/parser/JobBeanDefinitionParser.java
+++ b/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/main/java/org/apache/shardingsphere/elasticjob/lite/spring/namespace/job/parser/JobBeanDefinitionParser.java
@@ -18,6 +18,7 @@
package org.apache.shardingsphere.elasticjob.lite.spring.namespace.job.parser;
import com.google.common.base.Strings;
+import org.apache.shardingsphere.elasticjob.api.JobDagConfiguration;
import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.ScheduleJobBootstrap;
import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
@@ -29,6 +30,7 @@ import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.ManagedList;
import org.springframework.beans.factory.xml.AbstractBeanDefinitionParser;
import org.springframework.beans.factory.xml.ParserContext;
+import org.springframework.util.StringUtils;
import org.springframework.util.xml.DomUtils;
import org.w3c.dom.Element;
@@ -84,6 +86,7 @@ public final class JobBeanDefinitionParser extends AbstractBeanDefinitionParser
result.addConstructorArgValue(parsePropsElement(element, parserContext));
result.addConstructorArgValue(element.getAttribute(JobBeanDefinitionTag.DISABLED_ATTRIBUTE));
result.addConstructorArgValue(element.getAttribute(JobBeanDefinitionTag.OVERWRITE_ATTRIBUTE));
+ result.addConstructorArgValue(createJobDagConfig(element));
return result.getBeanDefinition();
}
@@ -110,4 +113,18 @@ public final class JobBeanDefinitionParser extends AbstractBeanDefinitionParser
}
return result;
}
+
+ private BeanDefinition createJobDagConfig(final Element element) {
+ if (StringUtils.isEmpty(element.getAttribute(JobBeanDefinitionTag.DAG_NAME))) {
+ return null;
+ }
+ BeanDefinitionBuilder jobDagConfig = BeanDefinitionBuilder.rootBeanDefinition(JobDagConfiguration.class);
+ jobDagConfig.addConstructorArgValue(element.getAttribute(JobBeanDefinitionTag.DAG_NAME));
+ jobDagConfig.addConstructorArgValue(element.getAttribute(JobBeanDefinitionTag.DAG_DEPENDENCIES));
+ jobDagConfig.addConstructorArgValue(element.getAttribute(JobBeanDefinitionTag.DAG_RETRY_TIMES));
+ jobDagConfig.addConstructorArgValue(element.getAttribute(JobBeanDefinitionTag.DAG_RETRY_INTERVAL));
+ jobDagConfig.addConstructorArgValue(element.getAttribute(JobBeanDefinitionTag.DAG_RUN_ALONE));
+ jobDagConfig.addConstructorArgValue(element.getAttribute(JobBeanDefinitionTag.DAG_SKIP_WHEN_FAIL));
+ return jobDagConfig.getBeanDefinition();
+ }
}
diff --git a/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/main/java/org/apache/shardingsphere/elasticjob/lite/spring/namespace/job/tag/JobBeanDefinitionTag.java b/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/main/java/org/apache/shardingsphere/elasticjob/lite/spring/namespace/job/tag/JobBeanDefinitionTag.java
index 96cc6d2..de25a78 100644
--- a/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/main/java/org/apache/shardingsphere/elasticjob/lite/spring/namespace/job/tag/JobBeanDefinitionTag.java
+++ b/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/main/java/org/apache/shardingsphere/elasticjob/lite/spring/namespace/job/tag/JobBeanDefinitionTag.java
@@ -65,4 +65,16 @@ public final class JobBeanDefinitionTag {
public static final String DISABLED_ATTRIBUTE = "disabled";
public static final String OVERWRITE_ATTRIBUTE = "overwrite";
+
+ public static final String DAG_NAME = "dag-name";
+
+ public static final String DAG_DEPENDENCIES = "dag-dependencies";
+
+ public static final String DAG_RETRY_TIMES = "dag-retry-times";
+
+ public static final String DAG_RETRY_INTERVAL = "dag-retry-interval";
+
+ public static final String DAG_RUN_ALONE = "dag-run-alone";
+
+ public static final String DAG_SKIP_WHEN_FAIL = "dag-skip-when-fail";
}
diff --git a/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/main/resources/META-INF/namespace/elasticjob.xsd b/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/main/resources/META-INF/namespace/elasticjob.xsd
index d42c23b..ae8b9c9 100644
--- a/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/main/resources/META-INF/namespace/elasticjob.xsd
+++ b/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/main/resources/META-INF/namespace/elasticjob.xsd
@@ -65,6 +65,12 @@
<xsd:attribute name="description" type="xsd:string" />
<xsd:attribute name="disabled" type="xsd:string" default="false" />
<xsd:attribute name="overwrite" type="xsd:string" default="false" />
+ <xsd:attribute name="dag-name" type="xsd:string" />
+ <xsd:attribute name="dag-dependencies" type="xsd:string" />
+ <xsd:attribute name="dag-retry-times" type="xsd:string" default="0" />
+ <xsd:attribute name="dag-retry-interval" type="xsd:string" default="0" />
+ <xsd:attribute name="dag-run-alone" type="xsd:string" default="false" />
+ <xsd:attribute name="dag-skip-when-fail" type="xsd:string" default="false" />
</xsd:extension>
</xsd:complexContent>
</xsd:complexType>
diff --git a/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/test/java/org/apache/shardingsphere/elasticjob/lite/spring/namespace/fixture/job/ref/RefFooSimpleElasticJob.java b/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/test/java/org/apache/shardingsphere/elasticjob/lite/spring/namespace/fixture/job/ref/RefFooSimpleElasticJob.java
index 157d33d..bc9190d 100644
--- a/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/test/java/org/apache/shardingsphere/elasticjob/lite/spring/namespace/fixture/job/ref/RefFooSimpleElasticJob.java
+++ b/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/test/java/org/apache/shardingsphere/elasticjob/lite/spring/namespace/fixture/job/ref/RefFooSimpleElasticJob.java
@@ -23,10 +23,15 @@ import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
import org.apache.shardingsphere.elasticjob.lite.spring.namespace.fixture.service.FooService;
+import java.util.concurrent.atomic.AtomicInteger;
+
public class RefFooSimpleElasticJob implements SimpleJob {
@Getter
private static volatile boolean completed;
+
+ @Getter
+ private static volatile AtomicInteger times = new AtomicInteger(0);
@Getter
@Setter
@@ -36,6 +41,7 @@ public class RefFooSimpleElasticJob implements SimpleJob {
public void execute(final ShardingContext shardingContext) {
fooService.foo();
completed = true;
+ times.getAndIncrement();
}
/**
diff --git a/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/test/java/org/apache/shardingsphere/elasticjob/lite/spring/namespace/job/JobSpringNamespaceWithRefDagTest.java b/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/test/java/org/apache/shardingsphere/elasticjob/lite/spring/namespace/job/JobSpringNamespaceWithRefDagTest.java
new file mode 100644
index 0000000..c21a32d
--- /dev/null
+++ b/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/test/java/org/apache/shardingsphere/elasticjob/lite/spring/namespace/job/JobSpringNamespaceWithRefDagTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.elasticjob.lite.spring.namespace.job;
+
+import org.apache.shardingsphere.elasticjob.infra.concurrent.BlockUtils;
+import org.apache.shardingsphere.elasticjob.lite.internal.schedule.JobRegistry;
+import org.apache.shardingsphere.elasticjob.lite.spring.namespace.fixture.job.ref.RefFooSimpleElasticJob;
+import org.apache.shardingsphere.elasticjob.lite.spring.namespace.test.AbstractZookeeperJUnit4SpringContextTests;
+import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.springframework.test.context.ContextConfiguration;
+
+import javax.annotation.Resource;
+
+import static org.junit.Assert.assertTrue;
+
+@ContextConfiguration(locations = "classpath:META-INF/job/withJobRefDag.xml")
+public final class JobSpringNamespaceWithRefDagTest extends AbstractZookeeperJUnit4SpringContextTests {
+
+ private final String rootJob = "simpleElasticJob_job_ref_root";
+
+ private final String job2 = "simpleElasticJob_job_ref_job2";
+
+ @Resource
+ private CoordinatorRegistryCenter regCenter;
+
+ @Before
+ @After
+ public void reset() {
+ RefFooSimpleElasticJob.reset();
+ }
+
+ @After
+ public void tearDown() {
+ JobRegistry.getInstance().shutdown(rootJob);
+ }
+
+ @Test
+ public void assertSpringJobBean() {
+ assertSimpleElasticJobDagBean();
+ }
+
+ private void assertSimpleElasticJobDagBean() {
+ while (RefFooSimpleElasticJob.getTimes().intValue() < 9) {
+ BlockUtils.sleep(300L);
+ }
+ BlockUtils.sleep(300L);
+ assertTrue(RefFooSimpleElasticJob.isCompleted());
+ }
+}
diff --git a/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/test/resources/META-INF/job/withJobRefDag.xml b/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/test/resources/META-INF/job/withJobRefDag.xml
new file mode 100644
index 0000000..f6d2c24
--- /dev/null
+++ b/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/test/resources/META-INF/job/withJobRefDag.xml
@@ -0,0 +1,50 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:elasticjob="http://shardingsphere.apache.org/schema/elasticjob"
+ xsi:schemaLocation="http://www.springframework.org/schema/beans
+ http://www.springframework.org/schema/beans/spring-beans.xsd
+ http://shardingsphere.apache.org/schema/elasticjob
+ http://shardingsphere.apache.org/schema/elasticjob/elasticjob.xsd
+ ">
+ <import resource="base.xml"/>
+
+ <bean id="refSimpleJobRoot" class="org.apache.shardingsphere.elasticjob.lite.spring.namespace.fixture.job.ref.RefFooSimpleElasticJob">
+ <property name="fooService" ref="foo" />
+ </bean>
+ <bean id="refSimpleJobJob1" class="org.apache.shardingsphere.elasticjob.lite.spring.namespace.fixture.job.ref.RefFooSimpleElasticJob">
+ <property name="fooService" ref="foo" />
+ </bean>
+ <bean id="refSimpleJobJob2" class="org.apache.shardingsphere.elasticjob.lite.spring.namespace.fixture.job.ref.RefFooSimpleElasticJob">
+ <property name="fooService" ref="foo" />
+ </bean>
+
+ <elasticjob:job id="simpleElasticJob_job_ref_root" job-ref="refSimpleJobRoot" registry-center-ref="regCenter"
+ cron="${simpelJobDagRoot.cron}" sharding-total-count="${simpleJob.shardingTotalCount}" sharding-item-parameters="${simpleJob.shardingItemParameters}"
+ disabled="${simpleJob.disabled}" overwrite="${simpleJob.overwrite}" job-executor-service-handler-type="SINGLE_THREAD" dag-name="fooDag" dag-dependencies="self" />
+
+ <elasticjob:job id="simpleElasticJob_job_ref_job1" job-ref="refSimpleJobJob1" registry-center-ref="regCenter"
+ cron="${simpelJobDagDep.cron}" sharding-total-count="${simpleJob.shardingTotalCount}" sharding-item-parameters="${simpleJob.shardingItemParameters}"
+ disabled="${simpleJob.disabled}" overwrite="${simpleJob.overwrite}" job-executor-service-handler-type="SINGLE_THREAD" dag-name="fooDag" dag-dependencies="simpleElasticJob_job_ref_root" />
+
+ <elasticjob:job id="simpleElasticJob_job_ref_job2" job-ref="refSimpleJobJob2" registry-center-ref="regCenter"
+ cron="${simpelJobDagDep.cron}" sharding-total-count="${simpleJob.shardingTotalCount}" sharding-item-parameters="${simpleJob.shardingItemParameters}"
+ disabled="${simpleJob.disabled}" overwrite="${simpleJob.overwrite}" job-executor-service-handler-type="SINGLE_THREAD" dag-name="fooDag" dag-dependencies=" simpleElasticJob_job_ref_root , simpleElasticJob_job_ref_job1" />
+</beans>
diff --git a/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/test/resources/conf/job/conf.properties b/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/test/resources/conf/job/conf.properties
index 18714ea..9829492 100644
--- a/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/test/resources/conf/job/conf.properties
+++ b/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/test/resources/conf/job/conf.properties
@@ -41,3 +41,7 @@ dataflowJob.streamingProcess=true
# need absolute path
#script.scriptCommandLine=your_path/elasticjob-lite/elasticjob-lite-spring/src/test/resources/script/demo.sh
script.scriptCommandLine=echo test
+
+# for dag job test
+simpelJobDagRoot.cron=0/59 * * * * ?
+simpelJobDagDep.cron=0 * * 1 1 ? 2099