You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2020/07/12 12:11:34 UTC
[shardingsphere-elasticjob-lite] branch master updated: Use ElasticJobExecutor for cloud (#1032)
This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere-elasticjob-lite.git
The following commit(s) were added to refs/heads/master by this push:
new ca4050d Use ElasticJobExecutor for cloud (#1032)
ca4050d is described below
commit ca4050db6277a1ebfefd80e2d9bb75fd45d1ed11
Author: Liang Zhang <te...@163.com>
AuthorDate: Sun Jul 12 20:11:23 2020 +0800
Use ElasticJobExecutor for cloud (#1032)
* Refactor executor dependency
* Refactor CloudJobFacade
* Unify ElasticJob interface
* Use ElasticJobExecutor for cloud
* Remove AbstractElasticJobExecutor
* Remove JobProperties
---
elasticjob-cloud/elasticjob-cloud-common/pom.xml | 15 ++
.../elasticjob/cloud/api/ElasticJob.java | 24 ---
.../elasticjob/cloud/api/dataflow/DataflowJob.java | 47 -----
.../elasticjob/cloud/api/script/ScriptJob.java | 26 ---
.../elasticjob/cloud/api/simple/SimpleJob.java | 34 ---
.../cloud/config/JobCoreConfiguration.java | 59 ++++--
.../cloud/config/JobTypeConfiguration.java | 2 +-
.../config/script/ScriptJobConfiguration.java | 3 +-
.../cloud/exception/AppConfigurationException.java | 4 -
.../cloud/executor/AbstractElasticJobExecutor.java | 221 --------------------
.../cloud/executor/JobExecutorFactory.java | 56 -----
.../executor/handler/ExecutorServiceHandler.java | 2 +-
.../cloud/executor/handler/JobProperties.java | 116 -----------
.../cloud/executor/type/DataflowJobExecutor.java | 75 -------
.../cloud/executor/type/ScriptJobExecutor.java | 59 ------
.../cloud/executor/type/SimpleJobExecutor.java | 41 ----
.../AbstractJobConfigurationGsonTypeAdapter.java | 57 ++---
.../elasticjob/cloud/util/json/GsonFactory.java | 4 +-
.../cloud/config/JobCoreConfigurationTest.java | 9 +-
.../cloud/executor/JobExecutorFactoryTest.java | 77 -------
.../cloud/executor/handler/JobPropertiesTest.java | 89 --------
.../executor/type/DataflowJobExecutorTest.java | 171 ---------------
.../cloud/executor/type/ElasticJobVerify.java | 54 -----
.../cloud/executor/type/ScriptJobExecutorTest.java | 85 --------
.../cloud/executor/type/SimpleJobExecutorTest.java | 232 ---------------------
.../cloud/executor/type/WrongJobExecutorTest.java | 71 -------
.../elasticjob/cloud/fixture/APIJsonConstants.java | 78 -------
.../cloud/fixture/ShardingContextsBuilder.java | 52 -----
.../config/TestDataflowJobConfiguration.java | 41 ----
.../fixture/config/TestJobRootConfiguration.java | 33 ---
.../fixture/config/TestScriptJobConfiguration.java | 41 ----
.../fixture/config/TestSimpleJobConfiguration.java | 56 -----
.../fixture/handler/IgnoreJobExceptionHandler.java | 27 ---
.../fixture/handler/ThrowJobExceptionHandler.java | 29 ---
.../elasticjob/cloud/fixture/job/JobCaller.java | 41 ----
.../elasticjob/cloud/fixture/job/OtherJob.java | 23 --
.../cloud/fixture/job/TestDataflowJob.java | 42 ----
.../cloud/fixture/job/TestSimpleJob.java | 33 ---
.../elasticjob/cloud/fixture/job/TestWrongJob.java | 31 ---
.../json/JobConfigurationGsonTypeAdapterTest.java | 105 ----------
elasticjob-cloud/elasticjob-cloud-executor/pom.xml | 15 ++
.../elasticjob/cloud/executor/CloudJobFacade.java | 19 +-
.../cloud/executor/DaemonTaskScheduler.java | 27 ++-
.../cloud/executor/JobTypeConfigurationUtil.java | 6 +-
.../elasticjob/cloud/executor/TaskExecutor.java | 25 +--
.../cloud/executor/local/LocalTaskExecutor.java | 28 ++-
.../cloud/executor/CloudJobFacadeTest.java | 2 +-
.../cloud/executor/DaemonTaskSchedulerTest.java | 16 +-
.../cloud/executor/TaskExecutorThreadTest.java | 8 +-
.../elasticjob/cloud/executor/fixture/TestJob.java | 2 +-
.../fixture/TestScriptJobConfiguration.java | 37 ----
.../executor/local/fixture/TestDataflowJob.java | 2 +-
.../executor/local/fixture/TestSimpleJob.java | 2 +-
.../elasticjob-cloud-scheduler/pom.xml | 15 ++
.../cloud/scheduler/mesos/TaskInfoData.java | 5 +-
.../fixture/CloudJobConfigurationBuilder.java | 10 +-
.../scheduler/fixture/CloudJsonConstants.java | 9 +-
.../cloud/scheduler/fixture/TestSimpleJob.java | 2 +-
elasticjob-lite/elasticjob-lite-core/pom.xml | 12 +-
elasticjob-lite/elasticjob-lite-lifecycle/pom.xml | 15 --
elasticjob-lite/elasticjob-lite-spring/pom.xml | 15 --
examples/elasticjob-example-jobs/pom.xml | 14 --
62 files changed, 214 insertions(+), 2337 deletions(-)
diff --git a/elasticjob-cloud/elasticjob-cloud-common/pom.xml b/elasticjob-cloud/elasticjob-cloud-common/pom.xml
index 92524f0..9241acb 100755
--- a/elasticjob-cloud/elasticjob-cloud-common/pom.xml
+++ b/elasticjob-cloud/elasticjob-cloud-common/pom.xml
@@ -41,6 +41,21 @@
</dependency>
<dependency>
<groupId>org.apache.shardingsphere.elasticjob</groupId>
+ <artifactId>elasticjob-simple-executor</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.shardingsphere.elasticjob</groupId>
+ <artifactId>elasticjob-dataflow-executor</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.shardingsphere.elasticjob</groupId>
+ <artifactId>elasticjob-script-executor</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.shardingsphere.elasticjob</groupId>
<artifactId>elasticjob-registry-center</artifactId>
<version>${project.parent.version}</version>
</dependency>
diff --git a/elasticjob-cloud/elasticjob-cloud-common/src/main/java/org/apache/shardingsphere/elasticjob/cloud/api/ElasticJob.java b/elasticjob-cloud/elasticjob-cloud-common/src/main/java/org/apache/shardingsphere/elasticjob/cloud/api/ElasticJob.java
deleted file mode 100755
index 32e1636..0000000
--- a/elasticjob-cloud/elasticjob-cloud-common/src/main/java/org/apache/shardingsphere/elasticjob/cloud/api/ElasticJob.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.elasticjob.cloud.api;
-
-/**
- * ElasticJob interface.
- */
-public interface ElasticJob {
-}
diff --git a/elasticjob-cloud/elasticjob-cloud-common/src/main/java/org/apache/shardingsphere/elasticjob/cloud/api/dataflow/DataflowJob.java b/elasticjob-cloud/elasticjob-cloud-common/src/main/java/org/apache/shardingsphere/elasticjob/cloud/api/dataflow/DataflowJob.java
deleted file mode 100755
index 145483b..0000000
--- a/elasticjob-cloud/elasticjob-cloud-common/src/main/java/org/apache/shardingsphere/elasticjob/cloud/api/dataflow/DataflowJob.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.elasticjob.cloud.api.dataflow;
-
-import org.apache.shardingsphere.elasticjob.api.ShardingContext;
-import org.apache.shardingsphere.elasticjob.cloud.api.ElasticJob;
-
-import java.util.List;
-
-/**
- * Dataflow job.
- *
- * @param <T> type of data
- */
-public interface DataflowJob<T> extends ElasticJob {
-
- /**
- * Fetch to be processed data.
- *
- * @param shardingContext sharding context
- * @return to be processed data
- */
- List<T> fetchData(ShardingContext shardingContext);
-
- /**
- * Process data.
- *
- * @param shardingContext sharding context
- * @param data to be processed data
- */
- void processData(ShardingContext shardingContext, List<T> data);
-}
diff --git a/elasticjob-cloud/elasticjob-cloud-common/src/main/java/org/apache/shardingsphere/elasticjob/cloud/api/script/ScriptJob.java b/elasticjob-cloud/elasticjob-cloud-common/src/main/java/org/apache/shardingsphere/elasticjob/cloud/api/script/ScriptJob.java
deleted file mode 100755
index 0c32588..0000000
--- a/elasticjob-cloud/elasticjob-cloud-common/src/main/java/org/apache/shardingsphere/elasticjob/cloud/api/script/ScriptJob.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.elasticjob.cloud.api.script;
-
-import org.apache.shardingsphere.elasticjob.cloud.api.ElasticJob;
-
-/**
- * Script job.
- */
-public interface ScriptJob extends ElasticJob {
-}
diff --git a/elasticjob-cloud/elasticjob-cloud-common/src/main/java/org/apache/shardingsphere/elasticjob/cloud/api/simple/SimpleJob.java b/elasticjob-cloud/elasticjob-cloud-common/src/main/java/org/apache/shardingsphere/elasticjob/cloud/api/simple/SimpleJob.java
deleted file mode 100755
index 94a94de..0000000
--- a/elasticjob-cloud/elasticjob-cloud-common/src/main/java/org/apache/shardingsphere/elasticjob/cloud/api/simple/SimpleJob.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.elasticjob.cloud.api.simple;
-
-import org.apache.shardingsphere.elasticjob.api.ShardingContext;
-import org.apache.shardingsphere.elasticjob.cloud.api.ElasticJob;
-
-/**
- * Simple job.
- */
-public interface SimpleJob extends ElasticJob {
-
- /**
- * Execute job.
- *
- * @param shardingContext sharding context
- */
- void execute(ShardingContext shardingContext);
-}
diff --git a/elasticjob-cloud/elasticjob-cloud-common/src/main/java/org/apache/shardingsphere/elasticjob/cloud/config/JobCoreConfiguration.java b/elasticjob-cloud/elasticjob-cloud-common/src/main/java/org/apache/shardingsphere/elasticjob/cloud/config/JobCoreConfiguration.java
index 51fefcc..6e4a0a1 100755
--- a/elasticjob-cloud/elasticjob-cloud-common/src/main/java/org/apache/shardingsphere/elasticjob/cloud/config/JobCoreConfiguration.java
+++ b/elasticjob-cloud/elasticjob-cloud-common/src/main/java/org/apache/shardingsphere/elasticjob/cloud/config/JobCoreConfiguration.java
@@ -19,7 +19,6 @@ package org.apache.shardingsphere.elasticjob.cloud.config;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
-import org.apache.shardingsphere.elasticjob.cloud.executor.handler.JobProperties;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Getter;
@@ -46,10 +45,12 @@ public final class JobCoreConfiguration {
private final boolean misfire;
+ private final String jobExecutorServiceHandlerType;
+
+ private final String jobErrorHandlerType;
+
private final String description;
- private final JobProperties jobProperties;
-
/**
* Create simple job configuration builder.
*
@@ -78,11 +79,13 @@ public final class JobCoreConfiguration {
private boolean failover;
private boolean misfire = true;
+
+ private String jobExecutorServiceHandlerType;
+
+ private String jobErrorHandlerType;
private String description = "";
- private final JobProperties jobProperties = new JobProperties();
-
/**
* Set mapper of sharding items and sharding parameters.
*
@@ -104,7 +107,7 @@ public final class JobCoreConfiguration {
}
return this;
}
-
+
/**
* Set job parameter.
*
@@ -118,7 +121,7 @@ public final class JobCoreConfiguration {
}
return this;
}
-
+
/**
* Set enable failover.
*
@@ -134,7 +137,7 @@ public final class JobCoreConfiguration {
this.failover = failover;
return this;
}
-
+
/**
* Set enable misfire.
*
@@ -146,34 +149,43 @@ public final class JobCoreConfiguration {
this.misfire = misfire;
return this;
}
-
+
/**
- * Set job description.
+ * Set job executor service handler type.
*
- * @param description job description
+ * @param jobExecutorServiceHandlerType job executor service handler type
+ * @return job configuration builder
+ */
+ public Builder jobExecutorServiceHandlerType(final String jobExecutorServiceHandlerType) {
+ this.jobExecutorServiceHandlerType = jobExecutorServiceHandlerType;
+ return this;
+ }
+
+ /**
+ * Set job error handler type.
*
+ * @param jobErrorHandlerType job error handler type
* @return job configuration builder
*/
- public Builder description(final String description) {
- if (null != description) {
- this.description = description;
- }
+ public Builder jobErrorHandlerType(final String jobErrorHandlerType) {
+ this.jobErrorHandlerType = jobErrorHandlerType;
return this;
}
-
+
/**
- * Set job properties.
+ * Set job description.
*
- * @param key property key
- * @param value property value
+ * @param description job description
*
* @return job configuration builder
*/
- public Builder jobProperties(final String key, final String value) {
- jobProperties.put(key, value);
+ public Builder description(final String description) {
+ if (null != description) {
+ this.description = description;
+ }
return this;
}
-
+
/**
* Build Job.
*
@@ -182,7 +194,8 @@ public final class JobCoreConfiguration {
public final JobCoreConfiguration build() {
Preconditions.checkArgument(!Strings.isNullOrEmpty(jobName), "jobName can not be empty.");
Preconditions.checkArgument(shardingTotalCount > 0, "shardingTotalCount should larger than zero.");
- return new JobCoreConfiguration(jobName, cron, shardingTotalCount, shardingItemParameters, jobParameter, failover, misfire, description, jobProperties);
+ return new JobCoreConfiguration(
+ jobName, cron, shardingTotalCount, shardingItemParameters, jobParameter, failover, misfire, jobExecutorServiceHandlerType, jobErrorHandlerType, description);
}
}
}
diff --git a/elasticjob-cloud/elasticjob-cloud-common/src/main/java/org/apache/shardingsphere/elasticjob/cloud/config/JobTypeConfiguration.java b/elasticjob-cloud/elasticjob-cloud-common/src/main/java/org/apache/shardingsphere/elasticjob/cloud/config/JobTypeConfiguration.java
index 0c8bfda..af4bc4b 100755
--- a/elasticjob-cloud/elasticjob-cloud-common/src/main/java/org/apache/shardingsphere/elasticjob/cloud/config/JobTypeConfiguration.java
+++ b/elasticjob-cloud/elasticjob-cloud-common/src/main/java/org/apache/shardingsphere/elasticjob/cloud/config/JobTypeConfiguration.java
@@ -23,7 +23,7 @@ import org.apache.shardingsphere.elasticjob.cloud.api.JobType;
* Job type configuration.
*/
public interface JobTypeConfiguration {
-
+
/**
* Get job type.
*
diff --git a/elasticjob-cloud/elasticjob-cloud-common/src/main/java/org/apache/shardingsphere/elasticjob/cloud/config/script/ScriptJobConfiguration.java b/elasticjob-cloud/elasticjob-cloud-common/src/main/java/org/apache/shardingsphere/elasticjob/cloud/config/script/ScriptJobConfiguration.java
index acd0289..7ce6448 100755
--- a/elasticjob-cloud/elasticjob-cloud-common/src/main/java/org/apache/shardingsphere/elasticjob/cloud/config/script/ScriptJobConfiguration.java
+++ b/elasticjob-cloud/elasticjob-cloud-common/src/main/java/org/apache/shardingsphere/elasticjob/cloud/config/script/ScriptJobConfiguration.java
@@ -18,7 +18,6 @@
package org.apache.shardingsphere.elasticjob.cloud.config.script;
import org.apache.shardingsphere.elasticjob.cloud.api.JobType;
-import org.apache.shardingsphere.elasticjob.cloud.api.script.ScriptJob;
import org.apache.shardingsphere.elasticjob.cloud.config.JobTypeConfiguration;
import org.apache.shardingsphere.elasticjob.cloud.config.JobCoreConfiguration;
import lombok.Getter;
@@ -35,7 +34,7 @@ public final class ScriptJobConfiguration implements JobTypeConfiguration {
private final JobType jobType = JobType.SCRIPT;
- private final String jobClass = ScriptJob.class.getCanonicalName();
+ private final String jobClass = "SCRIPT";
private final String scriptCommandLine;
}
diff --git a/elasticjob-cloud/elasticjob-cloud-common/src/main/java/org/apache/shardingsphere/elasticjob/cloud/exception/AppConfigurationException.java b/elasticjob-cloud/elasticjob-cloud-common/src/main/java/org/apache/shardingsphere/elasticjob/cloud/exception/AppConfigurationException.java
index 1e6c661..1bbec80 100755
--- a/elasticjob-cloud/elasticjob-cloud-common/src/main/java/org/apache/shardingsphere/elasticjob/cloud/exception/AppConfigurationException.java
+++ b/elasticjob-cloud/elasticjob-cloud-common/src/main/java/org/apache/shardingsphere/elasticjob/cloud/exception/AppConfigurationException.java
@@ -27,8 +27,4 @@ public final class AppConfigurationException extends RuntimeException {
public AppConfigurationException(final String errorMessage, final Object... args) {
super(String.format(errorMessage, args));
}
-
- public AppConfigurationException(final Throwable cause) {
- super(cause);
- }
}
diff --git a/elasticjob-cloud/elasticjob-cloud-common/src/main/java/org/apache/shardingsphere/elasticjob/cloud/executor/AbstractElasticJobExecutor.java b/elasticjob-cloud/elasticjob-cloud-common/src/main/java/org/apache/shardingsphere/elasticjob/cloud/executor/AbstractElasticJobExecutor.java
deleted file mode 100755
index d05f61a..0000000
--- a/elasticjob-cloud/elasticjob-cloud-common/src/main/java/org/apache/shardingsphere/elasticjob/cloud/executor/AbstractElasticJobExecutor.java
+++ /dev/null
@@ -1,221 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.elasticjob.cloud.executor;
-
-import lombok.AccessLevel;
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.elasticjob.api.ShardingContext;
-import org.apache.shardingsphere.elasticjob.api.listener.ShardingContexts;
-import org.apache.shardingsphere.elasticjob.cloud.config.JobTypeConfiguration;
-import org.apache.shardingsphere.elasticjob.cloud.executor.handler.ExecutorServiceHandler;
-import org.apache.shardingsphere.elasticjob.cloud.executor.handler.ExecutorServiceHandlerRegistry;
-import org.apache.shardingsphere.elasticjob.cloud.executor.handler.JobExceptionHandler;
-import org.apache.shardingsphere.elasticjob.cloud.executor.handler.JobProperties;
-import org.apache.shardingsphere.elasticjob.infra.env.IpUtils;
-import org.apache.shardingsphere.elasticjob.infra.exception.ExceptionUtils;
-import org.apache.shardingsphere.elasticjob.infra.exception.JobExecutionEnvironmentException;
-import org.apache.shardingsphere.elasticjob.infra.exception.JobSystemException;
-import org.apache.shardingsphere.elasticjob.tracing.event.JobExecutionEvent;
-import org.apache.shardingsphere.elasticjob.tracing.event.JobExecutionEvent.ExecutionSource;
-import org.apache.shardingsphere.elasticjob.tracing.event.JobStatusTraceEvent.State;
-
-import java.util.Collection;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-
-/**
- * ElasticJob executor.
- */
-@Slf4j
-public abstract class AbstractElasticJobExecutor {
-
- @Getter(AccessLevel.PROTECTED)
- private final JobFacade jobFacade;
-
- @Getter(AccessLevel.PROTECTED)
- private final JobTypeConfiguration jobConfig;
-
- private final String jobName;
-
- private final ExecutorService executorService;
-
- private final JobExceptionHandler jobExceptionHandler;
-
- private final Map<Integer, String> itemErrorMessages;
-
- protected AbstractElasticJobExecutor(final JobFacade jobFacade) {
- this.jobFacade = jobFacade;
- jobConfig = jobFacade.loadJobRootConfiguration(true);
- jobName = jobConfig.getCoreConfig().getJobName();
- executorService = ExecutorServiceHandlerRegistry.getExecutorServiceHandler(jobName, (ExecutorServiceHandler) getHandler(JobProperties.JobPropertiesEnum.EXECUTOR_SERVICE_HANDLER));
- jobExceptionHandler = (JobExceptionHandler) getHandler(JobProperties.JobPropertiesEnum.JOB_EXCEPTION_HANDLER);
- itemErrorMessages = new ConcurrentHashMap<>(jobConfig.getCoreConfig().getShardingTotalCount(), 1);
- }
-
- private Object getHandler(final JobProperties.JobPropertiesEnum jobPropertiesEnum) {
- String handlerClassName = jobConfig.getCoreConfig().getJobProperties().get(jobPropertiesEnum);
- try {
- Class<?> handlerClass = Class.forName(handlerClassName);
- if (jobPropertiesEnum.getClassType().isAssignableFrom(handlerClass)) {
- return handlerClass.newInstance();
- }
- return getDefaultHandler(jobPropertiesEnum, handlerClassName);
- } catch (final ReflectiveOperationException ex) {
- return getDefaultHandler(jobPropertiesEnum, handlerClassName);
- }
- }
-
- private Object getDefaultHandler(final JobProperties.JobPropertiesEnum jobPropertiesEnum, final String handlerClassName) {
- log.warn("Cannot instantiation class '{}', use default '{}' class.", handlerClassName, jobPropertiesEnum.getKey());
- try {
- return Class.forName(jobPropertiesEnum.getDefaultValue()).newInstance();
- } catch (final ClassNotFoundException | InstantiationException | IllegalAccessException e) {
- throw new JobSystemException(e);
- }
- }
-
- /**
- * Execute job.
- */
- public final void execute() {
- try {
- jobFacade.checkJobExecutionEnvironment();
- } catch (final JobExecutionEnvironmentException cause) {
- jobExceptionHandler.handleException(jobName, cause);
- }
- ShardingContexts shardingContexts = jobFacade.getShardingContexts();
- if (shardingContexts.isAllowSendJobEvent()) {
- jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_STAGING, String.format("Job '%s' execute begin.", jobName));
- }
- if (jobFacade.misfireIfRunning(shardingContexts.getShardingItemParameters().keySet())) {
- if (shardingContexts.isAllowSendJobEvent()) {
- jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, String.format(
- "Previous job '%s' - shardingItems '%s' is still running, misfired job will start after previous job completed.", jobName,
- shardingContexts.getShardingItemParameters().keySet()));
- }
- return;
- }
- try {
- jobFacade.beforeJobExecuted(shardingContexts);
- //CHECKSTYLE:OFF
- } catch (final Throwable cause) {
- //CHECKSTYLE:ON
- jobExceptionHandler.handleException(jobName, cause);
- }
- execute(shardingContexts, ExecutionSource.NORMAL_TRIGGER);
- while (jobFacade.isExecuteMisfired(shardingContexts.getShardingItemParameters().keySet())) {
- jobFacade.clearMisfire(shardingContexts.getShardingItemParameters().keySet());
- execute(shardingContexts, ExecutionSource.MISFIRE);
- }
- jobFacade.failoverIfNecessary();
- try {
- jobFacade.afterJobExecuted(shardingContexts);
- //CHECKSTYLE:OFF
- } catch (final Throwable cause) {
- //CHECKSTYLE:ON
- jobExceptionHandler.handleException(jobName, cause);
- }
- }
-
- private void execute(final ShardingContexts shardingContexts, final ExecutionSource executionSource) {
- if (shardingContexts.getShardingItemParameters().isEmpty()) {
- if (shardingContexts.isAllowSendJobEvent()) {
- jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, String.format("Sharding item for job '%s' is empty.", jobName));
- }
- return;
- }
- jobFacade.registerJobBegin(shardingContexts);
- String taskId = shardingContexts.getTaskId();
- if (shardingContexts.isAllowSendJobEvent()) {
- jobFacade.postJobStatusTraceEvent(taskId, State.TASK_RUNNING, "");
- }
- try {
- process(shardingContexts, executionSource);
- } finally {
- // TODO Consider increasing the status of job failure, and how to handle the overall loop of job failure
- jobFacade.registerJobCompleted(shardingContexts);
- if (itemErrorMessages.isEmpty()) {
- if (shardingContexts.isAllowSendJobEvent()) {
- jobFacade.postJobStatusTraceEvent(taskId, State.TASK_FINISHED, "");
- }
- } else {
- if (shardingContexts.isAllowSendJobEvent()) {
- jobFacade.postJobStatusTraceEvent(taskId, State.TASK_ERROR, itemErrorMessages.toString());
- }
- }
- }
- }
-
- private void process(final ShardingContexts shardingContexts, final ExecutionSource executionSource) {
- Collection<Integer> items = shardingContexts.getShardingItemParameters().keySet();
- if (1 == items.size()) {
- int item = shardingContexts.getShardingItemParameters().keySet().iterator().next();
- JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(IpUtils.getHostName(), IpUtils.getIp(), shardingContexts.getTaskId(), jobName, executionSource, item);
- process(shardingContexts, item, jobExecutionEvent);
- return;
- }
- final CountDownLatch latch = new CountDownLatch(items.size());
- for (final int each : items) {
- final JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(IpUtils.getHostName(), IpUtils.getIp(), shardingContexts.getTaskId(), jobName, executionSource, each);
- if (executorService.isShutdown()) {
- return;
- }
- executorService.submit(() -> {
- try {
- process(shardingContexts, each, jobExecutionEvent);
- } finally {
- latch.countDown();
- }
- });
- }
- try {
- latch.await();
- } catch (final InterruptedException ex) {
- Thread.currentThread().interrupt();
- }
- }
-
- private void process(final ShardingContexts shardingContexts, final int item, final JobExecutionEvent startEvent) {
- if (shardingContexts.isAllowSendJobEvent()) {
- jobFacade.postJobExecutionEvent(startEvent);
- }
- log.trace("Job '{}' executing, item is: '{}'.", jobName, item);
- JobExecutionEvent completeEvent;
- try {
- process(shardingContexts.createShardingContext(item));
- completeEvent = startEvent.executionSuccess();
- log.trace("Job '{}' executed, item is: '{}'.", jobName, item);
- if (shardingContexts.isAllowSendJobEvent()) {
- jobFacade.postJobExecutionEvent(completeEvent);
- }
- // CHECKSTYLE:OFF
- } catch (final Throwable cause) {
- // CHECKSTYLE:ON
- String errorMessage = ExceptionUtils.transform(cause);
- completeEvent = startEvent.executionFailure(errorMessage);
- jobFacade.postJobExecutionEvent(completeEvent);
- itemErrorMessages.put(item, errorMessage);
- jobExceptionHandler.handleException(jobName, cause);
- }
- }
-
- protected abstract void process(ShardingContext shardingContext);
-}
diff --git a/elasticjob-cloud/elasticjob-cloud-common/src/main/java/org/apache/shardingsphere/elasticjob/cloud/executor/JobExecutorFactory.java b/elasticjob-cloud/elasticjob-cloud-common/src/main/java/org/apache/shardingsphere/elasticjob/cloud/executor/JobExecutorFactory.java
deleted file mode 100755
index 03a266d..0000000
--- a/elasticjob-cloud/elasticjob-cloud-common/src/main/java/org/apache/shardingsphere/elasticjob/cloud/executor/JobExecutorFactory.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.elasticjob.cloud.executor;
-
-import lombok.AccessLevel;
-import lombok.NoArgsConstructor;
-import org.apache.shardingsphere.elasticjob.cloud.api.ElasticJob;
-import org.apache.shardingsphere.elasticjob.cloud.api.dataflow.DataflowJob;
-import org.apache.shardingsphere.elasticjob.cloud.api.simple.SimpleJob;
-import org.apache.shardingsphere.elasticjob.cloud.executor.type.DataflowJobExecutor;
-import org.apache.shardingsphere.elasticjob.cloud.executor.type.ScriptJobExecutor;
-import org.apache.shardingsphere.elasticjob.cloud.executor.type.SimpleJobExecutor;
-import org.apache.shardingsphere.elasticjob.infra.exception.JobConfigurationException;
-
-/**
- * Job executor factory.
- */
-@NoArgsConstructor(access = AccessLevel.PRIVATE)
-public final class JobExecutorFactory {
-
- /**
- * Get job executor.
- *
- * @param elasticJob elasticJob object
- * @param jobFacade job facade
- * @return job executor
- */
- @SuppressWarnings("unchecked")
- public static AbstractElasticJobExecutor getJobExecutor(final ElasticJob elasticJob, final JobFacade jobFacade) {
- if (null == elasticJob) {
- return new ScriptJobExecutor(jobFacade);
- }
- if (elasticJob instanceof SimpleJob) {
- return new SimpleJobExecutor((SimpleJob) elasticJob, jobFacade);
- }
- if (elasticJob instanceof DataflowJob) {
- return new DataflowJobExecutor((DataflowJob) elasticJob, jobFacade);
- }
- throw new JobConfigurationException("Cannot support job type '%s'", elasticJob.getClass().getCanonicalName());
- }
-}
diff --git a/elasticjob-cloud/elasticjob-cloud-common/src/main/java/org/apache/shardingsphere/elasticjob/cloud/executor/handler/ExecutorServiceHandler.java b/elasticjob-cloud/elasticjob-cloud-common/src/main/java/org/apache/shardingsphere/elasticjob/cloud/executor/handler/ExecutorServiceHandler.java
index 0018d36..21b995f 100755
--- a/elasticjob-cloud/elasticjob-cloud-common/src/main/java/org/apache/shardingsphere/elasticjob/cloud/executor/handler/ExecutorServiceHandler.java
+++ b/elasticjob-cloud/elasticjob-cloud-common/src/main/java/org/apache/shardingsphere/elasticjob/cloud/executor/handler/ExecutorServiceHandler.java
@@ -23,7 +23,7 @@ import java.util.concurrent.ExecutorService;
* Executor service handler.
*/
public interface ExecutorServiceHandler {
-
+
/**
* Create executor service.
*
diff --git a/elasticjob-cloud/elasticjob-cloud-common/src/main/java/org/apache/shardingsphere/elasticjob/cloud/executor/handler/JobProperties.java b/elasticjob-cloud/elasticjob-cloud-common/src/main/java/org/apache/shardingsphere/elasticjob/cloud/executor/handler/JobProperties.java
deleted file mode 100755
index 2fbf548..0000000
--- a/elasticjob-cloud/elasticjob-cloud-common/src/main/java/org/apache/shardingsphere/elasticjob/cloud/executor/handler/JobProperties.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.elasticjob.cloud.executor.handler;
-
-import org.apache.shardingsphere.elasticjob.cloud.executor.handler.impl.DefaultJobExceptionHandler;
-import org.apache.shardingsphere.elasticjob.cloud.util.json.GsonFactory;
-import org.apache.shardingsphere.elasticjob.cloud.executor.handler.impl.DefaultExecutorServiceHandler;
-import lombok.AllArgsConstructor;
-import lombok.Getter;
-import lombok.NoArgsConstructor;
-import lombok.RequiredArgsConstructor;
-
-import java.util.EnumMap;
-import java.util.LinkedHashMap;
-import java.util.Map;
-
-/**
- * Job properties.
- */
-@AllArgsConstructor
-@NoArgsConstructor
-public final class JobProperties {
-
- private EnumMap<JobPropertiesEnum, String> map = new EnumMap<>(JobPropertiesEnum.class);
-
- /**
- * Put job property.
- *
- * @param key property key
- * @param value property value
- */
- public void put(final String key, final String value) {
- JobPropertiesEnum jobPropertiesEnum = JobPropertiesEnum.from(key);
- if (null == jobPropertiesEnum || null == value) {
- return;
- }
- map.put(jobPropertiesEnum, value);
- }
-
- /**
- * Get job property.
- *
- * @param jobPropertiesEnum job properties enum
- * @return property value
- */
- public String get(final JobPropertiesEnum jobPropertiesEnum) {
- return map.containsKey(jobPropertiesEnum) ? map.get(jobPropertiesEnum) : jobPropertiesEnum.getDefaultValue();
- }
-
- /**
- * Get all keys.
- *
- * @return all keys
- */
- public String json() {
- Map<String, String> jsonMap = new LinkedHashMap<>(JobPropertiesEnum.values().length, 1);
- for (JobPropertiesEnum each : JobPropertiesEnum.values()) {
- jsonMap.put(each.getKey(), get(each));
- }
- return GsonFactory.getGson().toJson(jsonMap);
- }
-
- /**
- * Job properties enum.
- */
- @RequiredArgsConstructor
- @Getter
- public enum JobPropertiesEnum {
-
- /**
- * Job execution handler.
- */
- JOB_EXCEPTION_HANDLER("job_exception_handler", JobExceptionHandler.class, DefaultJobExceptionHandler.class.getCanonicalName()),
-
- /**
- * Executor service handler.
- */
- EXECUTOR_SERVICE_HANDLER("executor_service_handler", ExecutorServiceHandler.class, DefaultExecutorServiceHandler.class.getCanonicalName());
-
- private final String key;
-
- private final Class<?> classType;
-
- private final String defaultValue;
-
- /**
- * Get job properties enum via key.
- *
- * @param key property key
- * @return job properties enum
- */
- public static JobPropertiesEnum from(final String key) {
- for (JobPropertiesEnum each : JobPropertiesEnum.values()) {
- if (each.getKey().equals(key)) {
- return each;
- }
- }
- return null;
- }
- }
-}
diff --git a/elasticjob-cloud/elasticjob-cloud-common/src/main/java/org/apache/shardingsphere/elasticjob/cloud/executor/type/DataflowJobExecutor.java b/elasticjob-cloud/elasticjob-cloud-common/src/main/java/org/apache/shardingsphere/elasticjob/cloud/executor/type/DataflowJobExecutor.java
deleted file mode 100755
index e07f900..0000000
--- a/elasticjob-cloud/elasticjob-cloud-common/src/main/java/org/apache/shardingsphere/elasticjob/cloud/executor/type/DataflowJobExecutor.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.elasticjob.cloud.executor.type;
-
-import org.apache.shardingsphere.elasticjob.api.ShardingContext;
-import org.apache.shardingsphere.elasticjob.cloud.api.dataflow.DataflowJob;
-import org.apache.shardingsphere.elasticjob.cloud.config.dataflow.DataflowJobConfiguration;
-import org.apache.shardingsphere.elasticjob.cloud.executor.AbstractElasticJobExecutor;
-import org.apache.shardingsphere.elasticjob.cloud.executor.JobFacade;
-
-import java.util.List;
-
-/**
- * Dataflow job executor.
- */
-public final class DataflowJobExecutor extends AbstractElasticJobExecutor {
-
- private final DataflowJob<Object> dataflowJob;
-
- public DataflowJobExecutor(final DataflowJob<Object> dataflowJob, final JobFacade jobFacade) {
- super(jobFacade);
- this.dataflowJob = dataflowJob;
- }
-
- @Override
- protected void process(final ShardingContext shardingContext) {
- DataflowJobConfiguration dataflowConfig = (DataflowJobConfiguration) getJobConfig();
- if (dataflowConfig.isStreamingProcess()) {
- streamingExecute(shardingContext, dataflowConfig);
- } else {
- oneOffExecute(shardingContext);
- }
- }
-
- private void streamingExecute(final ShardingContext shardingContext, final DataflowJobConfiguration dataflowConfig) {
- List<Object> data = fetchData(shardingContext);
- while (null != data && !data.isEmpty()) {
- processData(shardingContext, data);
- if (!dataflowConfig.isStreamingProcess()) {
- break;
- }
- data = fetchData(shardingContext);
- }
- }
-
- private void oneOffExecute(final ShardingContext shardingContext) {
- List<Object> data = fetchData(shardingContext);
- if (null != data && !data.isEmpty()) {
- processData(shardingContext, data);
- }
- }
-
- private List<Object> fetchData(final ShardingContext shardingContext) {
- return dataflowJob.fetchData(shardingContext);
- }
-
- private void processData(final ShardingContext shardingContext, final List<Object> data) {
- dataflowJob.processData(shardingContext, data);
- }
-}
diff --git a/elasticjob-cloud/elasticjob-cloud-common/src/main/java/org/apache/shardingsphere/elasticjob/cloud/executor/type/ScriptJobExecutor.java b/elasticjob-cloud/elasticjob-cloud-common/src/main/java/org/apache/shardingsphere/elasticjob/cloud/executor/type/ScriptJobExecutor.java
deleted file mode 100755
index 071ee00..0000000
--- a/elasticjob-cloud/elasticjob-cloud-common/src/main/java/org/apache/shardingsphere/elasticjob/cloud/executor/type/ScriptJobExecutor.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.elasticjob.cloud.executor.type;
-
-import com.google.common.base.Strings;
-import org.apache.commons.exec.CommandLine;
-import org.apache.commons.exec.DefaultExecutor;
-import org.apache.shardingsphere.elasticjob.api.ShardingContext;
-import org.apache.shardingsphere.elasticjob.cloud.config.script.ScriptJobConfiguration;
-import org.apache.shardingsphere.elasticjob.cloud.executor.AbstractElasticJobExecutor;
-import org.apache.shardingsphere.elasticjob.cloud.executor.JobFacade;
-import org.apache.shardingsphere.elasticjob.cloud.util.json.GsonFactory;
-import org.apache.shardingsphere.elasticjob.infra.exception.JobConfigurationException;
-
-import java.io.IOException;
-
-/**
- * Script job executor.
- */
-public final class ScriptJobExecutor extends AbstractElasticJobExecutor {
-
- public ScriptJobExecutor(final JobFacade jobFacade) {
- super(jobFacade);
- }
-
- @Override
- protected void process(final ShardingContext shardingContext) {
- final String scriptCommandLine = ((ScriptJobConfiguration) getJobConfig()).getScriptCommandLine();
- if (Strings.isNullOrEmpty(scriptCommandLine)) {
- throw new JobConfigurationException("Cannot find script command line for job '%s', job is not executed.", shardingContext.getJobName());
- }
- executeScript(shardingContext, scriptCommandLine);
- }
-
- private void executeScript(final ShardingContext shardingContext, final String scriptCommandLine) {
- CommandLine commandLine = CommandLine.parse(scriptCommandLine);
- commandLine.addArgument(GsonFactory.getGson().toJson(shardingContext), false);
- try {
- new DefaultExecutor().execute(commandLine);
- } catch (final IOException ex) {
- throw new JobConfigurationException("Execute script failure.", ex);
- }
- }
-}
diff --git a/elasticjob-cloud/elasticjob-cloud-common/src/main/java/org/apache/shardingsphere/elasticjob/cloud/executor/type/SimpleJobExecutor.java b/elasticjob-cloud/elasticjob-cloud-common/src/main/java/org/apache/shardingsphere/elasticjob/cloud/executor/type/SimpleJobExecutor.java
deleted file mode 100755
index 7fb8a61..0000000
--- a/elasticjob-cloud/elasticjob-cloud-common/src/main/java/org/apache/shardingsphere/elasticjob/cloud/executor/type/SimpleJobExecutor.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.elasticjob.cloud.executor.type;
-
-import org.apache.shardingsphere.elasticjob.api.ShardingContext;
-import org.apache.shardingsphere.elasticjob.cloud.api.simple.SimpleJob;
-import org.apache.shardingsphere.elasticjob.cloud.executor.AbstractElasticJobExecutor;
-import org.apache.shardingsphere.elasticjob.cloud.executor.JobFacade;
-
-/**
- * Simple job executor.
- */
-public final class SimpleJobExecutor extends AbstractElasticJobExecutor {
-
- private final SimpleJob simpleJob;
-
- public SimpleJobExecutor(final SimpleJob simpleJob, final JobFacade jobFacade) {
- super(jobFacade);
- this.simpleJob = simpleJob;
- }
-
- @Override
- protected void process(final ShardingContext shardingContext) {
- simpleJob.execute(shardingContext);
- }
-}
diff --git a/elasticjob-cloud/elasticjob-cloud-common/src/main/java/org/apache/shardingsphere/elasticjob/cloud/util/json/AbstractJobConfigurationGsonTypeAdapter.java b/elasticjob-cloud/elasticjob-cloud-common/src/main/java/org/apache/shardingsphere/elasticjob/cloud/util/json/AbstractJobConfigurationGsonTypeAdapter.java
index c7652cc..2aee317 100755
--- a/elasticjob-cloud/elasticjob-cloud-common/src/main/java/org/apache/shardingsphere/elasticjob/cloud/util/json/AbstractJobConfigurationGsonTypeAdapter.java
+++ b/elasticjob-cloud/elasticjob-cloud-common/src/main/java/org/apache/shardingsphere/elasticjob/cloud/util/json/AbstractJobConfigurationGsonTypeAdapter.java
@@ -23,13 +23,12 @@ import com.google.gson.TypeAdapter;
import com.google.gson.stream.JsonReader;
import com.google.gson.stream.JsonWriter;
import org.apache.shardingsphere.elasticjob.cloud.api.JobType;
-import org.apache.shardingsphere.elasticjob.cloud.config.JobTypeConfiguration;
-import org.apache.shardingsphere.elasticjob.cloud.config.script.ScriptJobConfiguration;
import org.apache.shardingsphere.elasticjob.cloud.config.JobCoreConfiguration;
import org.apache.shardingsphere.elasticjob.cloud.config.JobRootConfiguration;
+import org.apache.shardingsphere.elasticjob.cloud.config.JobTypeConfiguration;
import org.apache.shardingsphere.elasticjob.cloud.config.dataflow.DataflowJobConfiguration;
+import org.apache.shardingsphere.elasticjob.cloud.config.script.ScriptJobConfiguration;
import org.apache.shardingsphere.elasticjob.cloud.config.simple.SimpleJobConfiguration;
-import org.apache.shardingsphere.elasticjob.cloud.executor.handler.JobProperties;
import java.io.IOException;
import java.util.HashMap;
@@ -50,9 +49,10 @@ public abstract class AbstractJobConfigurationGsonTypeAdapter<T extends JobRootC
String shardingItemParameters = "";
String jobParameter = "";
boolean failover = false;
- boolean misfire = failover;
+ boolean misfire = false;
+ String jobExecutorServiceHandlerType = "";
+ String jobErrorHandlerType = "";
String description = "";
- JobProperties jobProperties = new JobProperties();
JobType jobType = null;
String jobClass = "";
boolean streamingProcess = false;
@@ -83,12 +83,15 @@ public abstract class AbstractJobConfigurationGsonTypeAdapter<T extends JobRootC
case "misfire":
misfire = in.nextBoolean();
break;
+ case "jobExecutorServiceHandlerType":
+ jobExecutorServiceHandlerType = in.nextString();
+ break;
+ case "jobErrorHandlerType":
+ jobErrorHandlerType = in.nextString();
+ break;
case "description":
description = in.nextString();
break;
- case "jobProperties":
- jobProperties = getJobProperties(in);
- break;
case "jobType":
jobType = JobType.valueOf(in.nextString());
break;
@@ -108,40 +111,19 @@ public abstract class AbstractJobConfigurationGsonTypeAdapter<T extends JobRootC
}
in.endObject();
JobCoreConfiguration coreConfig = getJobCoreConfiguration(jobName, cron, shardingTotalCount, shardingItemParameters,
- jobParameter, failover, misfire, description, jobProperties);
+ jobParameter, failover, misfire, jobExecutorServiceHandlerType, jobErrorHandlerType, description);
JobTypeConfiguration typeConfig = getJobTypeConfiguration(coreConfig, jobType, jobClass, streamingProcess, scriptCommandLine);
return getJobRootConfiguration(typeConfig, customizedValueMap);
}
- private JobProperties getJobProperties(final JsonReader in) throws IOException {
- JobProperties result = new JobProperties();
- in.beginObject();
- while (in.hasNext()) {
- switch (in.nextName()) {
- case "job_exception_handler":
- result.put(JobProperties.JobPropertiesEnum.JOB_EXCEPTION_HANDLER.getKey(), in.nextString());
- break;
- case "executor_service_handler":
- result.put(JobProperties.JobPropertiesEnum.EXECUTOR_SERVICE_HANDLER.getKey(), in.nextString());
- break;
- default:
- break;
- }
- }
- in.endObject();
- return result;
- }
-
protected abstract void addToCustomizedValueMap(String jsonName, JsonReader in, Map<String, Object> customizedValueMap) throws IOException;
private JobCoreConfiguration getJobCoreConfiguration(final String jobName, final String cron, final int shardingTotalCount,
- final String shardingItemParameters, final String jobParameter, final boolean failover,
- final boolean misfire, final String description,
- final JobProperties jobProperties) {
+ final String shardingItemParameters, final String jobParameter, final boolean failover, final boolean misfire,
+ final String jobExecutorServiceHandlerType, final String jobErrorHandlerType, final String description) {
return JobCoreConfiguration.newBuilder(jobName, cron, shardingTotalCount)
- .shardingItemParameters(shardingItemParameters).jobParameter(jobParameter).failover(failover).misfire(misfire).description(description)
- .jobProperties(JobProperties.JobPropertiesEnum.JOB_EXCEPTION_HANDLER.getKey(), jobProperties.get(JobProperties.JobPropertiesEnum.JOB_EXCEPTION_HANDLER))
- .jobProperties(JobProperties.JobPropertiesEnum.EXECUTOR_SERVICE_HANDLER.getKey(), jobProperties.get(JobProperties.JobPropertiesEnum.EXECUTOR_SERVICE_HANDLER))
+ .shardingItemParameters(shardingItemParameters).jobParameter(jobParameter).failover(failover).misfire(misfire)
+ .jobExecutorServiceHandlerType(jobExecutorServiceHandlerType).jobErrorHandlerType(jobErrorHandlerType).description(description)
.build();
}
@@ -176,8 +158,13 @@ public abstract class AbstractJobConfigurationGsonTypeAdapter<T extends JobRootC
out.name("jobParameter").value(value.getTypeConfig().getCoreConfig().getJobParameter());
out.name("failover").value(value.getTypeConfig().getCoreConfig().isFailover());
out.name("misfire").value(value.getTypeConfig().getCoreConfig().isMisfire());
+ if (!Strings.isNullOrEmpty(value.getTypeConfig().getCoreConfig().getJobExecutorServiceHandlerType())) {
+ out.name("executorServiceHandler").value(value.getTypeConfig().getCoreConfig().getJobExecutorServiceHandlerType());
+ }
+ if (!Strings.isNullOrEmpty(value.getTypeConfig().getCoreConfig().getJobErrorHandlerType())) {
+ out.name("jobExceptionHandler").value(value.getTypeConfig().getCoreConfig().getJobErrorHandlerType());
+ }
out.name("description").value(value.getTypeConfig().getCoreConfig().getDescription());
- out.name("jobProperties").jsonValue(value.getTypeConfig().getCoreConfig().getJobProperties().json());
if (value.getTypeConfig().getJobType() == JobType.DATAFLOW) {
DataflowJobConfiguration dataflowJobConfig = (DataflowJobConfiguration) value.getTypeConfig();
out.name("streamingProcess").value(dataflowJobConfig.isStreamingProcess());
diff --git a/elasticjob-cloud/elasticjob-cloud-common/src/main/java/org/apache/shardingsphere/elasticjob/cloud/util/json/GsonFactory.java b/elasticjob-cloud/elasticjob-cloud-common/src/main/java/org/apache/shardingsphere/elasticjob/cloud/util/json/GsonFactory.java
index 6297965..0e70d61 100755
--- a/elasticjob-cloud/elasticjob-cloud-common/src/main/java/org/apache/shardingsphere/elasticjob/cloud/util/json/GsonFactory.java
+++ b/elasticjob-cloud/elasticjob-cloud-common/src/main/java/org/apache/shardingsphere/elasticjob/cloud/util/json/GsonFactory.java
@@ -34,7 +34,7 @@ public final class GsonFactory {
private static final GsonBuilder GSON_BUILDER = new GsonBuilder();
private static volatile Gson gson = GSON_BUILDER.create();
-
+
/**
* Register type adapter.
*
@@ -45,7 +45,7 @@ public final class GsonFactory {
GSON_BUILDER.registerTypeAdapter(type, typeAdapter);
gson = GSON_BUILDER.create();
}
-
+
/**
* Get gson instance.
*
diff --git a/elasticjob-cloud/elasticjob-cloud-common/src/test/java/org/apache/shardingsphere/elasticjob/cloud/config/JobCoreConfigurationTest.java b/elasticjob-cloud/elasticjob-cloud-common/src/test/java/org/apache/shardingsphere/elasticjob/cloud/config/JobCoreConfigurationTest.java
index 17390a7..e31e27f 100755
--- a/elasticjob-cloud/elasticjob-cloud-common/src/test/java/org/apache/shardingsphere/elasticjob/cloud/config/JobCoreConfigurationTest.java
+++ b/elasticjob-cloud/elasticjob-cloud-common/src/test/java/org/apache/shardingsphere/elasticjob/cloud/config/JobCoreConfigurationTest.java
@@ -17,9 +17,6 @@
package org.apache.shardingsphere.elasticjob.cloud.config;
-import org.apache.shardingsphere.elasticjob.cloud.executor.handler.JobProperties;
-import org.apache.shardingsphere.elasticjob.cloud.executor.handler.impl.DefaultJobExceptionHandler;
-import org.apache.shardingsphere.elasticjob.cloud.fixture.handler.IgnoreJobExceptionHandler;
import org.junit.Test;
import static org.hamcrest.CoreMatchers.is;
@@ -32,15 +29,14 @@ public final class JobCoreConfigurationTest {
@Test
public void assertBuildAllProperties() {
JobCoreConfiguration actual = JobCoreConfiguration.newBuilder("test_job", "0/1 * * * * ?", 3)
- .shardingItemParameters("0=a,1=b,2=c").jobParameter("param").failover(true).misfire(false).description("desc")
- .jobProperties("job_exception_handler", IgnoreJobExceptionHandler.class.getName()).build();
+ .shardingItemParameters("0=a,1=b,2=c").jobParameter("param").failover(true).misfire(false).description("desc").jobErrorHandlerType("IGNORE").build();
assertRequiredProperties(actual);
assertThat(actual.getShardingItemParameters(), is("0=a,1=b,2=c"));
assertThat(actual.getJobParameter(), is("param"));
assertTrue(actual.isFailover());
assertFalse(actual.isMisfire());
assertThat(actual.getDescription(), is("desc"));
- assertThat(actual.getJobProperties().get(JobProperties.JobPropertiesEnum.JOB_EXCEPTION_HANDLER), is(IgnoreJobExceptionHandler.class.getName()));
+ assertThat(actual.getJobErrorHandlerType(), is("IGNORE"));
}
@Test
@@ -70,7 +66,6 @@ public final class JobCoreConfigurationTest {
assertFalse(actual.isFailover());
assertTrue(actual.isMisfire());
assertThat(actual.getDescription(), is(""));
- assertThat(actual.getJobProperties().get(JobProperties.JobPropertiesEnum.JOB_EXCEPTION_HANDLER), is(DefaultJobExceptionHandler.class.getName()));
}
@Test(expected = IllegalArgumentException.class)
diff --git a/elasticjob-cloud/elasticjob-cloud-common/src/test/java/org/apache/shardingsphere/elasticjob/cloud/executor/JobExecutorFactoryTest.java b/elasticjob-cloud/elasticjob-cloud-common/src/test/java/org/apache/shardingsphere/elasticjob/cloud/executor/JobExecutorFactoryTest.java
deleted file mode 100755
index b3a650a..0000000
--- a/elasticjob-cloud/elasticjob-cloud-common/src/test/java/org/apache/shardingsphere/elasticjob/cloud/executor/JobExecutorFactoryTest.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.elasticjob.cloud.executor;
-
-import org.apache.shardingsphere.elasticjob.cloud.executor.type.DataflowJobExecutor;
-import org.apache.shardingsphere.elasticjob.cloud.executor.type.ScriptJobExecutor;
-import org.apache.shardingsphere.elasticjob.cloud.executor.type.SimpleJobExecutor;
-import org.apache.shardingsphere.elasticjob.cloud.fixture.config.TestDataflowJobConfiguration;
-import org.apache.shardingsphere.elasticjob.cloud.fixture.config.TestScriptJobConfiguration;
-import org.apache.shardingsphere.elasticjob.cloud.fixture.config.TestSimpleJobConfiguration;
-import org.apache.shardingsphere.elasticjob.cloud.fixture.handler.IgnoreJobExceptionHandler;
-import org.apache.shardingsphere.elasticjob.cloud.fixture.job.OtherJob;
-import org.apache.shardingsphere.elasticjob.cloud.fixture.job.TestDataflowJob;
-import org.apache.shardingsphere.elasticjob.cloud.fixture.job.TestSimpleJob;
-import org.apache.shardingsphere.elasticjob.infra.exception.JobConfigurationException;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnitRunner;
-
-import static org.hamcrest.CoreMatchers.instanceOf;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.when;
-
-@RunWith(MockitoJUnitRunner.class)
-public final class JobExecutorFactoryTest {
-
- @Mock
- private JobFacade jobFacade;
-
- @Test
- public void assertGetJobExecutorForScriptJob() {
- when(jobFacade.loadJobRootConfiguration(true)).thenReturn(new TestScriptJobConfiguration("test.sh", IgnoreJobExceptionHandler.class).getTypeConfig());
- assertThat(JobExecutorFactory.getJobExecutor(null, jobFacade), instanceOf(ScriptJobExecutor.class));
- }
-
- @Test
- public void assertGetJobExecutorForSimpleJob() {
- when(jobFacade.loadJobRootConfiguration(true)).thenReturn(new TestSimpleJobConfiguration().getTypeConfig());
- assertThat(JobExecutorFactory.getJobExecutor(new TestSimpleJob(null), jobFacade), instanceOf(SimpleJobExecutor.class));
- }
-
- @Test
- public void assertGetJobExecutorForDataflowJob() {
- when(jobFacade.loadJobRootConfiguration(true)).thenReturn(new TestDataflowJobConfiguration(false).getTypeConfig());
- assertThat(JobExecutorFactory.getJobExecutor(new TestDataflowJob(null), jobFacade), instanceOf(DataflowJobExecutor.class));
- }
-
- @Test(expected = JobConfigurationException.class)
- public void assertGetJobExecutorWhenJobClassWhenUnsupportedJob() {
- JobExecutorFactory.getJobExecutor(new OtherJob(), jobFacade);
- }
-
- @Test
- public void assertGetJobExecutorTwice() {
- when(jobFacade.loadJobRootConfiguration(true)).thenReturn(new TestDataflowJobConfiguration(false).getTypeConfig());
- AbstractElasticJobExecutor executor = JobExecutorFactory.getJobExecutor(new TestSimpleJob(null), jobFacade);
- AbstractElasticJobExecutor anotherExecutor = JobExecutorFactory.getJobExecutor(new TestSimpleJob(null), jobFacade);
- assertTrue(executor.hashCode() != anotherExecutor.hashCode());
- }
-}
diff --git a/elasticjob-cloud/elasticjob-cloud-common/src/test/java/org/apache/shardingsphere/elasticjob/cloud/executor/handler/JobPropertiesTest.java b/elasticjob-cloud/elasticjob-cloud-common/src/test/java/org/apache/shardingsphere/elasticjob/cloud/executor/handler/JobPropertiesTest.java
deleted file mode 100755
index 91c5906..0000000
--- a/elasticjob-cloud/elasticjob-cloud-common/src/test/java/org/apache/shardingsphere/elasticjob/cloud/executor/handler/JobPropertiesTest.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.elasticjob.cloud.executor.handler;
-
-import org.apache.shardingsphere.elasticjob.cloud.executor.handler.impl.DefaultExecutorServiceHandler;
-import org.apache.shardingsphere.elasticjob.cloud.executor.handler.impl.DefaultJobExceptionHandler;
-import org.apache.shardingsphere.elasticjob.cloud.fixture.APIJsonConstants;
-import org.apache.shardingsphere.elasticjob.cloud.fixture.handler.IgnoreJobExceptionHandler;
-import org.junit.Test;
-import org.unitils.util.ReflectionUtils;
-
-import java.util.Map;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-
-public final class JobPropertiesTest {
-
- @Test
- public void assertPutInvalidKey() throws NoSuchFieldException {
- JobProperties actual = new JobProperties();
- actual.put("invalid_key", "");
- assertTrue(getMap(actual).isEmpty());
- }
-
- @Test
- public void assertPutNullValue() throws NoSuchFieldException {
- JobProperties actual = new JobProperties();
- actual.put(JobProperties.JobPropertiesEnum.JOB_EXCEPTION_HANDLER.getKey(), null);
- assertTrue(getMap(actual).isEmpty());
- }
-
- @Test
- public void assertPutSuccess() throws NoSuchFieldException {
- JobProperties actual = new JobProperties();
- actual.put(JobProperties.JobPropertiesEnum.JOB_EXCEPTION_HANDLER.getKey(), DefaultJobExceptionHandler.class.getCanonicalName());
- assertThat(getMap(actual).size(), is(1));
- }
-
- private Map getMap(final JobProperties jobProperties) throws NoSuchFieldException {
- return (Map) ReflectionUtils.getFieldValue(jobProperties, JobProperties.class.getDeclaredField("map"));
- }
-
- @Test
- public void assertGetWhenValueIsEmpty() {
- JobProperties actual = new JobProperties();
- assertThat(actual.get(JobProperties.JobPropertiesEnum.JOB_EXCEPTION_HANDLER), is(DefaultJobExceptionHandler.class.getCanonicalName()));
- assertThat(actual.get(JobProperties.JobPropertiesEnum.EXECUTOR_SERVICE_HANDLER), is(DefaultExecutorServiceHandler.class.getCanonicalName()));
- }
-
- @Test
- public void assertGetWhenValueIsNotEmpty() {
- JobProperties actual = new JobProperties();
- actual.put(JobProperties.JobPropertiesEnum.JOB_EXCEPTION_HANDLER.getKey(), IgnoreJobExceptionHandler.class.getCanonicalName());
- assertThat(actual.get(JobProperties.JobPropertiesEnum.JOB_EXCEPTION_HANDLER), is(IgnoreJobExceptionHandler.class.getCanonicalName()));
- }
-
- @Test
- public void assertJson() {
- assertThat(new JobProperties().json(), is(APIJsonConstants.getJobPropertiesJson(DefaultJobExceptionHandler.class.getCanonicalName())));
- }
-
- @Test
- public void assertJobPropertiesEnumFromValidValue() {
- assertThat(JobProperties.JobPropertiesEnum.from(JobProperties.JobPropertiesEnum.JOB_EXCEPTION_HANDLER.getKey()), is(JobProperties.JobPropertiesEnum.JOB_EXCEPTION_HANDLER));
- }
-
- @Test
- public void assertJobPropertiesEnumFromInvalidValue() {
- assertNull(JobProperties.JobPropertiesEnum.from("invalid"));
- }
-}
diff --git a/elasticjob-cloud/elasticjob-cloud-common/src/test/java/org/apache/shardingsphere/elasticjob/cloud/executor/type/DataflowJobExecutorTest.java b/elasticjob-cloud/elasticjob-cloud-common/src/test/java/org/apache/shardingsphere/elasticjob/cloud/executor/type/DataflowJobExecutorTest.java
deleted file mode 100755
index 6fcc365..0000000
--- a/elasticjob-cloud/elasticjob-cloud-common/src/test/java/org/apache/shardingsphere/elasticjob/cloud/executor/type/DataflowJobExecutorTest.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.elasticjob.cloud.executor.type;
-
-import org.apache.shardingsphere.elasticjob.api.listener.ShardingContexts;
-import org.apache.shardingsphere.elasticjob.cloud.executor.JobFacade;
-import org.apache.shardingsphere.elasticjob.cloud.fixture.ShardingContextsBuilder;
-import org.apache.shardingsphere.elasticjob.cloud.fixture.config.TestDataflowJobConfiguration;
-import org.apache.shardingsphere.elasticjob.cloud.fixture.job.JobCaller;
-import org.apache.shardingsphere.elasticjob.cloud.fixture.job.TestDataflowJob;
-import org.junit.After;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnitRunner;
-
-import java.util.Arrays;
-import java.util.Collections;
-
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-@RunWith(MockitoJUnitRunner.class)
-public final class DataflowJobExecutorTest {
-
- @Mock
- private JobCaller jobCaller;
-
- @Mock
- private JobFacade jobFacade;
-
- private ShardingContexts shardingContexts;
-
- private DataflowJobExecutor dataflowJobExecutor;
-
- @After
- public void tearDown() {
- verify(jobFacade).loadJobRootConfiguration(true);
- ElasticJobVerify.verifyForIsNotMisfire(jobFacade, shardingContexts);
- }
-
- @Test
- public void assertExecuteWhenFetchDataIsNullAndEmpty() {
- setUp(true, ShardingContextsBuilder.getMultipleShardingContexts());
- when(jobCaller.fetchData(0)).thenReturn(null);
- when(jobCaller.fetchData(1)).thenReturn(Collections.emptyList());
- dataflowJobExecutor.execute();
- verify(jobCaller).fetchData(0);
- verify(jobCaller).fetchData(1);
- verify(jobCaller, times(0)).processData(any());
- }
-
- @Test
- public void assertExecuteWhenFetchDataIsNotEmptyForUnStreamingProcessAndSingleShardingItem() {
- setUp(false, ShardingContextsBuilder.getSingleShardingContexts());
- doThrow(new IllegalStateException()).when(jobCaller).fetchData(0);
- dataflowJobExecutor.execute();
- verify(jobCaller).fetchData(0);
- verify(jobCaller, times(0)).processData(any());
- }
-
- @Test
- public void assertExecuteWhenFetchDataIsNotEmptyForUnStreamingProcessAndMultipleShardingItems() {
- setUp(false, ShardingContextsBuilder.getMultipleShardingContexts());
- when(jobCaller.fetchData(0)).thenReturn(Arrays.asList(1, 2));
- when(jobCaller.fetchData(1)).thenReturn(Arrays.asList(3, 4));
- doThrow(new IllegalStateException()).when(jobCaller).processData(4);
- dataflowJobExecutor.execute();
- verify(jobCaller).fetchData(0);
- verify(jobCaller).fetchData(1);
- verify(jobCaller).processData(1);
- verify(jobCaller).processData(2);
- verify(jobCaller).processData(3);
- verify(jobCaller).processData(4);
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void assertExecuteWhenFetchDataIsNotEmptyForStreamingProcessAndSingleShardingItem() {
- setUp(true, ShardingContextsBuilder.getSingleShardingContexts());
- when(jobCaller.fetchData(0)).thenReturn(Collections.singletonList(1), Collections.emptyList());
- dataflowJobExecutor.execute();
- verify(jobCaller, times(2)).fetchData(0);
- verify(jobCaller).processData(1);
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void assertExecuteWhenFetchDataIsNotEmptyForStreamingProcessAndMultipleShardingItems() {
- setUp(true, ShardingContextsBuilder.getMultipleShardingContexts());
- when(jobCaller.fetchData(0)).thenReturn(Collections.singletonList(1), Collections.emptyList());
- when(jobCaller.fetchData(1)).thenReturn(Collections.singletonList(2), Collections.emptyList());
- dataflowJobExecutor.execute();
- verify(jobCaller, times(2)).fetchData(0);
- verify(jobCaller, times(2)).fetchData(1);
- verify(jobCaller).processData(1);
- verify(jobCaller).processData(2);
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void assertExecuteWhenFetchDataIsNotEmptyAndProcessFailureWithExceptionForStreamingProcess() {
- setUp(true, ShardingContextsBuilder.getMultipleShardingContexts());
- when(jobCaller.fetchData(0)).thenReturn(Collections.singletonList(1), Collections.emptyList());
- when(jobCaller.fetchData(1)).thenReturn(Arrays.asList(2, 3), Collections.emptyList());
- doThrow(new IllegalStateException()).when(jobCaller).processData(2);
- dataflowJobExecutor.execute();
- verify(jobCaller, times(2)).fetchData(0);
- verify(jobCaller, times(1)).fetchData(1);
- verify(jobCaller).processData(1);
- verify(jobCaller).processData(2);
- verify(jobCaller, times(0)).processData(3);
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void assertExecuteWhenFetchDataIsNotEmptyAndIsEligibleForJobRunningForStreamingProcess() {
- setUp(true, ShardingContextsBuilder.getMultipleShardingContexts());
- when(jobCaller.fetchData(0)).thenReturn(Arrays.asList(1, 2), Collections.emptyList());
- when(jobCaller.fetchData(1)).thenReturn(Arrays.asList(3, 4), Collections.emptyList());
- doThrow(new IllegalStateException()).when(jobCaller).processData(4);
- dataflowJobExecutor.execute();
- verify(jobCaller, times(2)).fetchData(0);
- verify(jobCaller, times(1)).fetchData(1);
- verify(jobCaller).processData(1);
- verify(jobCaller).processData(2);
- verify(jobCaller).processData(3);
- verify(jobCaller).processData(4);
- }
-
- @Test
- public void assertExecuteWhenFetchDataIsNotEmptyAndIsNotEligibleForJobRunningForStreamingProcess() {
- setUp(false, ShardingContextsBuilder.getMultipleShardingContexts());
- when(jobCaller.fetchData(0)).thenReturn(Arrays.asList(1, 2));
- when(jobCaller.fetchData(1)).thenReturn(Arrays.asList(3, 4));
- doThrow(new IllegalStateException()).when(jobCaller).processData(4);
- dataflowJobExecutor.execute();
- verify(jobCaller).fetchData(0);
- verify(jobCaller).fetchData(1);
- verify(jobCaller).processData(1);
- verify(jobCaller).processData(2);
- verify(jobCaller).processData(3);
- verify(jobCaller).processData(4);
- }
-
- private void setUp(final boolean isStreamingProcess, final ShardingContexts shardingContexts) {
- this.shardingContexts = shardingContexts;
- when(jobFacade.loadJobRootConfiguration(true)).thenReturn(new TestDataflowJobConfiguration(isStreamingProcess).getTypeConfig());
- when(jobFacade.getShardingContexts()).thenReturn(shardingContexts);
- dataflowJobExecutor = new DataflowJobExecutor(new TestDataflowJob(jobCaller), jobFacade);
- ElasticJobVerify.prepareForIsNotMisfire(jobFacade, shardingContexts);
- }
-}
diff --git a/elasticjob-cloud/elasticjob-cloud-common/src/test/java/org/apache/shardingsphere/elasticjob/cloud/executor/type/ElasticJobVerify.java b/elasticjob-cloud/elasticjob-cloud-common/src/test/java/org/apache/shardingsphere/elasticjob/cloud/executor/type/ElasticJobVerify.java
deleted file mode 100755
index b0abb12..0000000
--- a/elasticjob-cloud/elasticjob-cloud-common/src/test/java/org/apache/shardingsphere/elasticjob/cloud/executor/type/ElasticJobVerify.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.elasticjob.cloud.executor.type;
-
-import lombok.AccessLevel;
-import lombok.NoArgsConstructor;
-import org.apache.shardingsphere.elasticjob.api.listener.ShardingContexts;
-import org.apache.shardingsphere.elasticjob.cloud.executor.JobFacade;
-import org.apache.shardingsphere.elasticjob.infra.exception.JobExecutionEnvironmentException;
-import org.apache.shardingsphere.elasticjob.tracing.event.JobStatusTraceEvent.State;
-
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-@NoArgsConstructor(access = AccessLevel.PRIVATE)
-final class ElasticJobVerify {
-
- public static void prepareForIsNotMisfire(final JobFacade jobFacade, final ShardingContexts shardingContexts) {
- when(jobFacade.getShardingContexts()).thenReturn(shardingContexts);
- when(jobFacade.misfireIfRunning(shardingContexts.getShardingItemParameters().keySet())).thenReturn(false);
- when(jobFacade.isExecuteMisfired(shardingContexts.getShardingItemParameters().keySet())).thenReturn(false);
- }
-
- public static void verifyForIsNotMisfire(final JobFacade jobFacade, final ShardingContexts shardingContexts) {
- try {
- verify(jobFacade).checkJobExecutionEnvironment();
- } catch (final JobExecutionEnvironmentException ex) {
- throw new RuntimeException(ex);
- }
- verify(jobFacade).getShardingContexts();
- verify(jobFacade).postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_STAGING, "Job 'test_job' execute begin.");
- verify(jobFacade).misfireIfRunning(shardingContexts.getShardingItemParameters().keySet());
- verify(jobFacade).beforeJobExecuted(shardingContexts);
- verify(jobFacade).registerJobBegin(shardingContexts);
- verify(jobFacade).registerJobCompleted(shardingContexts);
- verify(jobFacade).isExecuteMisfired(shardingContexts.getShardingItemParameters().keySet());
- verify(jobFacade).afterJobExecuted(shardingContexts);
- }
-}
diff --git a/elasticjob-cloud/elasticjob-cloud-common/src/test/java/org/apache/shardingsphere/elasticjob/cloud/executor/type/ScriptJobExecutorTest.java b/elasticjob-cloud/elasticjob-cloud-common/src/test/java/org/apache/shardingsphere/elasticjob/cloud/executor/type/ScriptJobExecutorTest.java
deleted file mode 100755
index 3c3bf1c..0000000
--- a/elasticjob-cloud/elasticjob-cloud-common/src/test/java/org/apache/shardingsphere/elasticjob/cloud/executor/type/ScriptJobExecutorTest.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.elasticjob.cloud.executor.type;
-
-import org.apache.shardingsphere.elasticjob.api.listener.ShardingContexts;
-import org.apache.shardingsphere.elasticjob.cloud.executor.JobFacade;
-import org.apache.shardingsphere.elasticjob.cloud.fixture.ShardingContextsBuilder;
-import org.apache.shardingsphere.elasticjob.cloud.fixture.config.TestScriptJobConfiguration;
-import org.apache.shardingsphere.elasticjob.cloud.fixture.handler.IgnoreJobExceptionHandler;
-import org.apache.shardingsphere.elasticjob.cloud.fixture.handler.ThrowJobExceptionHandler;
-import org.apache.shardingsphere.elasticjob.infra.exception.JobSystemException;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnitRunner;
-
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-@RunWith(MockitoJUnitRunner.class)
-public final class ScriptJobExecutorTest {
-
- @Mock
- private JobFacade jobFacade;
-
- private ScriptJobExecutor scriptJobExecutor;
-
- @Test
- public void assertExecuteWhenCommandLineIsEmpty() {
- ElasticJobVerify.prepareForIsNotMisfire(jobFacade, ShardingContextsBuilder.getMultipleShardingContexts());
- when(jobFacade.loadJobRootConfiguration(true)).thenReturn(new TestScriptJobConfiguration("", IgnoreJobExceptionHandler.class).getTypeConfig());
- scriptJobExecutor = new ScriptJobExecutor(jobFacade);
- scriptJobExecutor.execute();
- }
-
- @Test(expected = JobSystemException.class)
- public void assertExecuteWhenExecuteFailureForSingleShardingItems() {
- assertExecuteWhenExecuteFailure(ShardingContextsBuilder.getSingleShardingContexts());
- }
-
- @Test
- public void assertExecuteWhenExecuteFailureForMultipleShardingItems() {
- assertExecuteWhenExecuteFailure(ShardingContextsBuilder.getMultipleShardingContexts());
- }
-
- private void assertExecuteWhenExecuteFailure(final ShardingContexts shardingContexts) {
- ElasticJobVerify.prepareForIsNotMisfire(jobFacade, shardingContexts);
- when(jobFacade.loadJobRootConfiguration(true)).thenReturn(new TestScriptJobConfiguration("not_exists_file", ThrowJobExceptionHandler.class).getTypeConfig());
- scriptJobExecutor = new ScriptJobExecutor(jobFacade);
- scriptJobExecutor.execute();
- }
-
- @Test
- public void assertExecuteSuccessForMultipleShardingItems() {
- assertExecuteSuccess(ShardingContextsBuilder.getMultipleShardingContexts());
- }
-
- @Test
- public void assertExecuteSuccessForSingleShardingItems() {
- assertExecuteSuccess(ShardingContextsBuilder.getSingleShardingContexts());
- }
-
- private void assertExecuteSuccess(final ShardingContexts shardingContexts) {
- ElasticJobVerify.prepareForIsNotMisfire(jobFacade, shardingContexts);
- when(jobFacade.loadJobRootConfiguration(true)).thenReturn(new TestScriptJobConfiguration("exists_file param0 param1", IgnoreJobExceptionHandler.class).getTypeConfig());
- scriptJobExecutor = new ScriptJobExecutor(jobFacade);
- scriptJobExecutor.execute();
- verify(jobFacade).loadJobRootConfiguration(true);
- }
-}
diff --git a/elasticjob-cloud/elasticjob-cloud-common/src/test/java/org/apache/shardingsphere/elasticjob/cloud/executor/type/SimpleJobExecutorTest.java b/elasticjob-cloud/elasticjob-cloud-common/src/test/java/org/apache/shardingsphere/elasticjob/cloud/executor/type/SimpleJobExecutorTest.java
deleted file mode 100755
index 2fd8221..0000000
--- a/elasticjob-cloud/elasticjob-cloud-common/src/test/java/org/apache/shardingsphere/elasticjob/cloud/executor/type/SimpleJobExecutorTest.java
+++ /dev/null
@@ -1,232 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.elasticjob.cloud.executor.type;
-
-import org.apache.shardingsphere.elasticjob.api.listener.ShardingContexts;
-import org.apache.shardingsphere.elasticjob.cloud.executor.AbstractElasticJobExecutor;
-import org.apache.shardingsphere.elasticjob.cloud.executor.JobFacade;
-import org.apache.shardingsphere.elasticjob.cloud.executor.handler.impl.DefaultExecutorServiceHandler;
-import org.apache.shardingsphere.elasticjob.cloud.executor.handler.impl.DefaultJobExceptionHandler;
-import org.apache.shardingsphere.elasticjob.cloud.fixture.ShardingContextsBuilder;
-import org.apache.shardingsphere.elasticjob.cloud.fixture.config.TestSimpleJobConfiguration;
-import org.apache.shardingsphere.elasticjob.cloud.fixture.job.JobCaller;
-import org.apache.shardingsphere.elasticjob.cloud.fixture.job.TestSimpleJob;
-import org.apache.shardingsphere.elasticjob.infra.exception.JobExecutionEnvironmentException;
-import org.apache.shardingsphere.elasticjob.infra.exception.JobSystemException;
-import org.apache.shardingsphere.elasticjob.tracing.event.JobStatusTraceEvent.State;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnitRunner;
-import org.unitils.util.ReflectionUtils;
-
-import java.util.Collections;
-
-import static org.hamcrest.CoreMatchers.instanceOf;
-import static org.junit.Assert.assertThat;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-@RunWith(MockitoJUnitRunner.class)
-public final class SimpleJobExecutorTest {
-
- @Mock
- private JobCaller jobCaller;
-
- @Mock
- private JobFacade jobFacade;
-
- private SimpleJobExecutor simpleJobExecutor;
-
- @Before
- public void setUp() {
- when(jobFacade.loadJobRootConfiguration(true)).thenReturn(new TestSimpleJobConfiguration().getTypeConfig());
- simpleJobExecutor = new SimpleJobExecutor(new TestSimpleJob(jobCaller), jobFacade);
- }
-
- @Test
- public void assertNewExecutorWithDefaultHandlers() throws NoSuchFieldException {
- when(jobFacade.loadJobRootConfiguration(true)).thenReturn(new TestSimpleJobConfiguration("ErrorHandler", Object.class.getName()).getTypeConfig());
- SimpleJobExecutor simpleJobExecutor = new SimpleJobExecutor(new TestSimpleJob(jobCaller), jobFacade);
- assertThat(ReflectionUtils.getFieldValue(simpleJobExecutor, AbstractElasticJobExecutor.class.getDeclaredField("executorService")),
- instanceOf(new DefaultExecutorServiceHandler().createExecutorService("test_job").getClass()));
- assertThat(ReflectionUtils.getFieldValue(simpleJobExecutor, AbstractElasticJobExecutor.class.getDeclaredField("jobExceptionHandler")),
- instanceOf(DefaultJobExceptionHandler.class));
- }
-
- @Test(expected = JobSystemException.class)
- public void assertExecuteWhenCheckMaxTimeDiffSecondsIntolerable() throws JobExecutionEnvironmentException {
- doThrow(JobExecutionEnvironmentException.class).when(jobFacade).checkJobExecutionEnvironment();
- try {
- simpleJobExecutor.execute();
- } finally {
- verify(jobFacade).checkJobExecutionEnvironment();
- verify(jobCaller, times(0)).execute();
- }
- }
-
- @Test
- public void assertExecuteWhenPreviousJobStillRunning() throws JobExecutionEnvironmentException {
- ShardingContexts shardingContexts = new ShardingContexts("fake_task_id", "test_job", 10, "", Collections.emptyMap());
- when(jobFacade.getShardingContexts()).thenReturn(shardingContexts);
- when(jobFacade.misfireIfRunning(shardingContexts.getShardingItemParameters().keySet())).thenReturn(true);
- simpleJobExecutor.execute();
- verify(jobFacade).postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_STAGING, "Job 'test_job' execute begin.");
- verify(jobFacade).postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED,
- "Previous job 'test_job' - shardingItems '[]' is still running, misfired job will start after previous job completed.");
- verify(jobFacade).checkJobExecutionEnvironment();
- verify(jobFacade).getShardingContexts();
- verify(jobFacade).misfireIfRunning(shardingContexts.getShardingItemParameters().keySet());
- verify(jobCaller, times(0)).execute();
- }
-
- @Test
- public void assertExecuteWhenShardingItemsIsEmpty() throws JobExecutionEnvironmentException {
- ShardingContexts shardingContexts = new ShardingContexts("fake_task_id", "test_job", 10, "", Collections.emptyMap());
- ElasticJobVerify.prepareForIsNotMisfire(jobFacade, shardingContexts);
- simpleJobExecutor.execute();
- verify(jobFacade).postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_STAGING, "Job 'test_job' execute begin.");
- verify(jobFacade).postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, "Sharding item for job 'test_job' is empty.");
- verify(jobFacade).checkJobExecutionEnvironment();
- verify(jobFacade).getShardingContexts();
- verify(jobFacade).misfireIfRunning(shardingContexts.getShardingItemParameters().keySet());
- verify(jobCaller, times(0)).execute();
- }
-
- @Test(expected = JobSystemException.class)
- public void assertExecuteWhenRunOnceAndThrowExceptionForSingleShardingItem() throws JobExecutionEnvironmentException {
- assertExecuteWhenRunOnceAndThrowException(ShardingContextsBuilder.getSingleShardingContexts());
- }
-
- @Test
- public void assertExecuteWhenRunOnceAndThrowExceptionForMultipleShardingItems() throws JobExecutionEnvironmentException {
- assertExecuteWhenRunOnceAndThrowException(ShardingContextsBuilder.getMultipleShardingContexts());
- }
-
- private void assertExecuteWhenRunOnceAndThrowException(final ShardingContexts shardingContexts) throws JobExecutionEnvironmentException {
- ElasticJobVerify.prepareForIsNotMisfire(jobFacade, shardingContexts);
- doThrow(RuntimeException.class).when(jobCaller).execute();
- try {
- simpleJobExecutor.execute();
- } finally {
- verify(jobFacade).postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_STAGING, "Job 'test_job' execute begin.");
- verify(jobFacade).postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_RUNNING, "");
- String errorMessage;
- String lineSeparator = System.getProperty("line.separator");
- if (1 == shardingContexts.getShardingItemParameters().size()) {
- errorMessage = "{0=java.lang.RuntimeException" + lineSeparator + "}";
- } else {
- errorMessage = "{0=java.lang.RuntimeException" + lineSeparator + ", 1=java.lang.RuntimeException" + lineSeparator + "}";
- }
- verify(jobFacade).postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_ERROR, errorMessage);
- verify(jobFacade).checkJobExecutionEnvironment();
- verify(jobFacade).getShardingContexts();
- verify(jobFacade).misfireIfRunning(shardingContexts.getShardingItemParameters().keySet());
- verify(jobFacade).registerJobBegin(shardingContexts);
- verify(jobCaller, times(shardingContexts.getShardingTotalCount())).execute();
- verify(jobFacade).registerJobCompleted(shardingContexts);
- }
- }
-
- @Test
- public void assertExecuteWhenRunOnceSuccessForSingleShardingItems() {
- assertExecuteWhenRunOnceSuccess(ShardingContextsBuilder.getSingleShardingContexts());
- }
-
- @Test
- public void assertExecuteWhenRunOnceSuccessForMultipleShardingItems() {
- assertExecuteWhenRunOnceSuccess(ShardingContextsBuilder.getMultipleShardingContexts());
- }
-
- private void assertExecuteWhenRunOnceSuccess(final ShardingContexts shardingContexts) {
- ElasticJobVerify.prepareForIsNotMisfire(jobFacade, shardingContexts);
- simpleJobExecutor.execute();
- verify(jobFacade).postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_STAGING, "Job 'test_job' execute begin.");
- verify(jobFacade).postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, "");
- ElasticJobVerify.verifyForIsNotMisfire(jobFacade, shardingContexts);
- verify(jobCaller, times(shardingContexts.getShardingTotalCount())).execute();
- }
-
- @Test
- public void assertExecuteWhenRunOnceWithMisfireIsEmpty() {
- ShardingContexts shardingContexts = ShardingContextsBuilder.getMultipleShardingContexts();
- when(jobFacade.getShardingContexts()).thenReturn(shardingContexts);
- when(jobFacade.isExecuteMisfired(shardingContexts.getShardingItemParameters().keySet())).thenReturn(false);
- simpleJobExecutor.execute();
- ElasticJobVerify.verifyForIsNotMisfire(jobFacade, shardingContexts);
- verify(jobCaller, times(2)).execute();
- }
-
- @Test
- public void assertExecuteWhenRunOnceWithMisfireIsNotEmptyButIsNotEligibleForJobRunning() {
- ShardingContexts shardingContexts = ShardingContextsBuilder.getMultipleShardingContexts();
- when(jobFacade.getShardingContexts()).thenReturn(shardingContexts);
- when(jobFacade.isExecuteMisfired(shardingContexts.getShardingItemParameters().keySet())).thenReturn(false);
- simpleJobExecutor.execute();
- ElasticJobVerify.verifyForIsNotMisfire(jobFacade, shardingContexts);
- verify(jobCaller, times(2)).execute();
- verify(jobFacade, times(0)).clearMisfire(shardingContexts.getShardingItemParameters().keySet());
- }
-
- @Test
- public void assertExecuteWhenRunOnceWithMisfire() throws JobExecutionEnvironmentException {
- ShardingContexts shardingContexts = ShardingContextsBuilder.getMultipleShardingContexts();
- when(jobFacade.getShardingContexts()).thenReturn(shardingContexts);
- when(jobFacade.misfireIfRunning(shardingContexts.getShardingItemParameters().keySet())).thenReturn(false);
- when(jobFacade.isExecuteMisfired(shardingContexts.getShardingItemParameters().keySet())).thenReturn(true, false);
- simpleJobExecutor.execute();
- verify(jobFacade).postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_STAGING, "Job 'test_job' execute begin.");
- verify(jobFacade, times(2)).postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_RUNNING, "");
- verify(jobFacade).checkJobExecutionEnvironment();
- verify(jobFacade).getShardingContexts();
- verify(jobFacade).misfireIfRunning(shardingContexts.getShardingItemParameters().keySet());
- verify(jobFacade, times(2)).registerJobBegin(shardingContexts);
- verify(jobCaller, times(4)).execute();
- verify(jobFacade, times(2)).registerJobCompleted(shardingContexts);
- }
-
- @Test(expected = JobSystemException.class)
- public void assertBeforeJobExecutedFailure() {
- ShardingContexts shardingContexts = ShardingContextsBuilder.getMultipleShardingContexts();
- when(jobFacade.getShardingContexts()).thenReturn(shardingContexts);
- when(jobFacade.misfireIfRunning(shardingContexts.getShardingItemParameters().keySet())).thenReturn(false);
- doThrow(RuntimeException.class).when(jobFacade).beforeJobExecuted(shardingContexts);
- try {
- simpleJobExecutor.execute();
- } finally {
- verify(jobCaller, times(0)).execute();
- }
- }
-
- @Test(expected = JobSystemException.class)
- public void assertAfterJobExecutedFailure() {
- ShardingContexts shardingContexts = ShardingContextsBuilder.getMultipleShardingContexts();
- when(jobFacade.getShardingContexts()).thenReturn(shardingContexts);
- when(jobFacade.misfireIfRunning(shardingContexts.getShardingItemParameters().keySet())).thenReturn(false);
- when(jobFacade.isExecuteMisfired(shardingContexts.getShardingItemParameters().keySet())).thenReturn(false);
- doThrow(RuntimeException.class).when(jobFacade).afterJobExecuted(shardingContexts);
- try {
- simpleJobExecutor.execute();
- } finally {
- verify(jobCaller, times(2)).execute();
- }
- }
-}
diff --git a/elasticjob-cloud/elasticjob-cloud-common/src/test/java/org/apache/shardingsphere/elasticjob/cloud/executor/type/WrongJobExecutorTest.java b/elasticjob-cloud/elasticjob-cloud-common/src/test/java/org/apache/shardingsphere/elasticjob/cloud/executor/type/WrongJobExecutorTest.java
deleted file mode 100755
index 95ac786..0000000
--- a/elasticjob-cloud/elasticjob-cloud-common/src/test/java/org/apache/shardingsphere/elasticjob/cloud/executor/type/WrongJobExecutorTest.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.elasticjob.cloud.executor.type;
-
-import org.apache.shardingsphere.elasticjob.api.listener.ShardingContexts;
-import org.apache.shardingsphere.elasticjob.cloud.executor.JobFacade;
-import org.apache.shardingsphere.elasticjob.cloud.fixture.config.TestSimpleJobConfiguration;
-import org.apache.shardingsphere.elasticjob.cloud.fixture.job.TestWrongJob;
-import org.apache.shardingsphere.elasticjob.tracing.event.JobStatusTraceEvent.State;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnitRunner;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-@RunWith(MockitoJUnitRunner.class)
-public final class WrongJobExecutorTest {
-
- @Mock
- private JobFacade jobFacade;
-
- private SimpleJobExecutor wrongSimpleJobExecutor;
-
- @Before
- public void setUp() {
- when(jobFacade.loadJobRootConfiguration(true)).thenReturn(new TestSimpleJobConfiguration().getTypeConfig());
- wrongSimpleJobExecutor = new SimpleJobExecutor(new TestWrongJob(), jobFacade);
- }
-
- @Test(expected = RuntimeException.class)
- public void assertWrongJobExecutorWithSingleItem() {
- Map<Integer, String> map = new HashMap<>(1, 1);
- map.put(0, "A");
- ShardingContexts shardingContexts = new ShardingContexts("fake_task_id", "test_job", 10, "", map);
- when(jobFacade.getShardingContexts()).thenReturn(shardingContexts);
- wrongSimpleJobExecutor.execute();
- }
-
- @Test
- public void assertWrongJobExecutorWithMultipleItems() {
- Map<Integer, String> map = new HashMap<>(1, 1);
- map.put(0, "A");
- map.put(1, "B");
- ShardingContexts shardingContexts = new ShardingContexts("fake_task_id", "test_job", 10, "", map);
- when(jobFacade.getShardingContexts()).thenReturn(shardingContexts);
- wrongSimpleJobExecutor.execute();
- verify(jobFacade).getShardingContexts();
- verify(jobFacade).postJobStatusTraceEvent("fake_task_id", State.TASK_RUNNING, "");
- }
-}
diff --git a/elasticjob-cloud/elasticjob-cloud-common/src/test/java/org/apache/shardingsphere/elasticjob/cloud/fixture/APIJsonConstants.java b/elasticjob-cloud/elasticjob-cloud-common/src/test/java/org/apache/shardingsphere/elasticjob/cloud/fixture/APIJsonConstants.java
deleted file mode 100755
index dd7ab10..0000000
--- a/elasticjob-cloud/elasticjob-cloud-common/src/test/java/org/apache/shardingsphere/elasticjob/cloud/fixture/APIJsonConstants.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.elasticjob.cloud.fixture;
-
-import lombok.AccessLevel;
-import lombok.NoArgsConstructor;
-import org.apache.shardingsphere.elasticjob.cloud.executor.handler.impl.DefaultExecutorServiceHandler;
-
-@NoArgsConstructor(access = AccessLevel.PRIVATE)
-public final class APIJsonConstants {
-
- private static final String JOB_PROPS_JSON = "{\"job_exception_handler\":\"%s\",\"executor_service_handler\":\"" + DefaultExecutorServiceHandler.class.getCanonicalName() + "\"}";
-
- // CHECKSTYLE:OFF
- private static final String SIMPLE_JOB_JSON = "{\"jobName\":\"test_job\",\"jobClass\":\"org.apache.shardingsphere.elasticjob.cloud.fixture.job.TestSimpleJob\",\"jobType\":\"SIMPLE\","
- + "\"cron\":\"0/1 * * * * ?\",\"shardingTotalCount\":3,\"shardingItemParameters\":\"0\\u003dA,1\\u003dB,2\\u003dC\",\"jobParameter\":\"param\",\"failover\":true,\"misfire\":false,"
- + "\"description\":\"desc\",\"jobProperties\":%s}";
- // CHECKSTYLE:ON
-
- private static final String DATAFLOW_JOB_JSON = "{\"jobName\":\"test_job\",\"jobClass\":\"org.apache.shardingsphere.elasticjob.cloud.fixture.job.TestDataflowJob\",\"jobType\":\"DATAFLOW\","
- + "\"cron\":\"0/1 * * * * ?\",\"shardingTotalCount\":3,\"shardingItemParameters\":\"\",\"jobParameter\":\"\",\"failover\":false,\"misfire\":true,\"description\":\"\","
- + "\"jobProperties\":%s,\"streamingProcess\":true}";
-
- private static final String SCRIPT_JOB_JSON = "{\"jobName\":\"test_job\",\"jobClass\":\"org.apache.shardingsphere.elasticjob.cloud.api.script.ScriptJob\",\"jobType\":\"SCRIPT\","
- + "\"cron\":\"0/1 * * * * ?\",\"shardingTotalCount\":3,\"shardingItemParameters\":\"\",\"jobParameter\":\"\",\"failover\":false,\"misfire\":true,\"description\":\"\","
- + "\"jobProperties\":%s,\"scriptCommandLine\":\"test.sh\"}";
-
- /**
- * Get job properties in json format.
- * @param jobExceptionHandler the job exception handler
- * @return the json string
- */
- public static String getJobPropertiesJson(final String jobExceptionHandler) {
- return String.format(JOB_PROPS_JSON, jobExceptionHandler);
- }
-
- /**
- * Get simple job in json format.
- * @param jobExceptionHandler the job exception handler
- * @return the json string
- */
- public static String getSimpleJobJson(final String jobExceptionHandler) {
- return String.format(SIMPLE_JOB_JSON, getJobPropertiesJson(jobExceptionHandler));
- }
-
- /**
- * Get dataflow job in json format.
- * @param jobExceptionHandler the job exception handler
- * @return the json string
- */
- public static String getDataflowJobJson(final String jobExceptionHandler) {
- return String.format(DATAFLOW_JOB_JSON, getJobPropertiesJson(jobExceptionHandler));
- }
-
- /**
- * Get script job in json format.
- * @param jobExceptionHandler the job exception handler
- * @return the json string
- */
- public static String getScriptJobJson(final String jobExceptionHandler) {
- return String.format(SCRIPT_JOB_JSON, getJobPropertiesJson(jobExceptionHandler));
- }
-}
diff --git a/elasticjob-cloud/elasticjob-cloud-common/src/test/java/org/apache/shardingsphere/elasticjob/cloud/fixture/ShardingContextsBuilder.java b/elasticjob-cloud/elasticjob-cloud-common/src/test/java/org/apache/shardingsphere/elasticjob/cloud/fixture/ShardingContextsBuilder.java
deleted file mode 100755
index 8035f89..0000000
--- a/elasticjob-cloud/elasticjob-cloud-common/src/test/java/org/apache/shardingsphere/elasticjob/cloud/fixture/ShardingContextsBuilder.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.elasticjob.cloud.fixture;
-
-import lombok.AccessLevel;
-import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.elasticjob.api.listener.ShardingContexts;
-
-import java.util.HashMap;
-import java.util.Map;
-
-@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
-public final class ShardingContextsBuilder {
-
- public static final String JOB_NAME = "test_job";
-
- /**
- * Get single sharding contexts.
- * @return ShardingContexts
- */
- public static ShardingContexts getSingleShardingContexts() {
- Map<Integer, String> map = new HashMap<>(1, 1);
- map.put(0, "A");
- return new ShardingContexts("fake_task_id", JOB_NAME, 1, "", map);
- }
-
- /**
- * Get multi sharding contexts.
- * @return ShardingContexts
- */
- public static ShardingContexts getMultipleShardingContexts() {
- Map<Integer, String> map = new HashMap<>(2, 1);
- map.put(0, "A");
- map.put(1, "B");
- return new ShardingContexts("fake_task_id", JOB_NAME, 2, "", map);
- }
-}
diff --git a/elasticjob-cloud/elasticjob-cloud-common/src/test/java/org/apache/shardingsphere/elasticjob/cloud/fixture/config/TestDataflowJobConfiguration.java b/elasticjob-cloud/elasticjob-cloud-common/src/test/java/org/apache/shardingsphere/elasticjob/cloud/fixture/config/TestDataflowJobConfiguration.java
deleted file mode 100755
index a0055a4..0000000
--- a/elasticjob-cloud/elasticjob-cloud-common/src/test/java/org/apache/shardingsphere/elasticjob/cloud/fixture/config/TestDataflowJobConfiguration.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.elasticjob.cloud.fixture.config;
-
-import org.apache.shardingsphere.elasticjob.cloud.config.JobRootConfiguration;
-import org.apache.shardingsphere.elasticjob.cloud.config.JobTypeConfiguration;
-import org.apache.shardingsphere.elasticjob.cloud.fixture.job.TestDataflowJob;
-import org.apache.shardingsphere.elasticjob.cloud.config.JobCoreConfiguration;
-import org.apache.shardingsphere.elasticjob.cloud.config.dataflow.DataflowJobConfiguration;
-import org.apache.shardingsphere.elasticjob.cloud.executor.handler.JobProperties;
-import org.apache.shardingsphere.elasticjob.cloud.fixture.ShardingContextsBuilder;
-import org.apache.shardingsphere.elasticjob.cloud.fixture.handler.IgnoreJobExceptionHandler;
-import lombok.RequiredArgsConstructor;
-
-@RequiredArgsConstructor
-public final class TestDataflowJobConfiguration implements JobRootConfiguration {
-
- private final boolean streamingProcess;
-
- @Override
- public JobTypeConfiguration getTypeConfig() {
- return new DataflowJobConfiguration(JobCoreConfiguration.newBuilder(ShardingContextsBuilder.JOB_NAME, "0/1 * * * * ?", 3)
- .jobProperties(JobProperties.JobPropertiesEnum.JOB_EXCEPTION_HANDLER.getKey(), IgnoreJobExceptionHandler.class.getCanonicalName()).build(),
- TestDataflowJob.class.getCanonicalName(), streamingProcess);
- }
-}
diff --git a/elasticjob-cloud/elasticjob-cloud-common/src/test/java/org/apache/shardingsphere/elasticjob/cloud/fixture/config/TestJobRootConfiguration.java b/elasticjob-cloud/elasticjob-cloud-common/src/test/java/org/apache/shardingsphere/elasticjob/cloud/fixture/config/TestJobRootConfiguration.java
deleted file mode 100755
index 4321de1..0000000
--- a/elasticjob-cloud/elasticjob-cloud-common/src/test/java/org/apache/shardingsphere/elasticjob/cloud/fixture/config/TestJobRootConfiguration.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.elasticjob.cloud.fixture.config;
-
-import org.apache.shardingsphere.elasticjob.cloud.config.JobTypeConfiguration;
-import org.apache.shardingsphere.elasticjob.cloud.config.JobRootConfiguration;
-import lombok.RequiredArgsConstructor;
-
-@RequiredArgsConstructor
-public final class TestJobRootConfiguration implements JobRootConfiguration {
-
- private final JobTypeConfiguration typeConfig;
-
- @Override
- public JobTypeConfiguration getTypeConfig() {
- return typeConfig;
- }
-}
diff --git a/elasticjob-cloud/elasticjob-cloud-common/src/test/java/org/apache/shardingsphere/elasticjob/cloud/fixture/config/TestScriptJobConfiguration.java b/elasticjob-cloud/elasticjob-cloud-common/src/test/java/org/apache/shardingsphere/elasticjob/cloud/fixture/config/TestScriptJobConfiguration.java
deleted file mode 100755
index c2772de..0000000
--- a/elasticjob-cloud/elasticjob-cloud-common/src/test/java/org/apache/shardingsphere/elasticjob/cloud/fixture/config/TestScriptJobConfiguration.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.elasticjob.cloud.fixture.config;
-
-import org.apache.shardingsphere.elasticjob.cloud.config.JobTypeConfiguration;
-import org.apache.shardingsphere.elasticjob.cloud.config.script.ScriptJobConfiguration;
-import org.apache.shardingsphere.elasticjob.cloud.executor.handler.JobExceptionHandler;
-import org.apache.shardingsphere.elasticjob.cloud.config.JobCoreConfiguration;
-import org.apache.shardingsphere.elasticjob.cloud.config.JobRootConfiguration;
-import org.apache.shardingsphere.elasticjob.cloud.executor.handler.JobProperties;
-import org.apache.shardingsphere.elasticjob.cloud.fixture.ShardingContextsBuilder;
-import lombok.RequiredArgsConstructor;
-
-@RequiredArgsConstructor
-public final class TestScriptJobConfiguration implements JobRootConfiguration {
-
- private final String scriptCommandLine;
-
- private final Class<? extends JobExceptionHandler> jobExceptionHandlerClass;
-
- @Override
- public JobTypeConfiguration getTypeConfig() {
- return new ScriptJobConfiguration(JobCoreConfiguration.newBuilder(ShardingContextsBuilder.JOB_NAME, "0/1 * * * * ?", 3)
- .jobProperties(JobProperties.JobPropertiesEnum.JOB_EXCEPTION_HANDLER.getKey(), jobExceptionHandlerClass.getCanonicalName()).build(), scriptCommandLine);
- }
-}
diff --git a/elasticjob-cloud/elasticjob-cloud-common/src/test/java/org/apache/shardingsphere/elasticjob/cloud/fixture/config/TestSimpleJobConfiguration.java b/elasticjob-cloud/elasticjob-cloud-common/src/test/java/org/apache/shardingsphere/elasticjob/cloud/fixture/config/TestSimpleJobConfiguration.java
deleted file mode 100755
index c0a20fb..0000000
--- a/elasticjob-cloud/elasticjob-cloud-common/src/test/java/org/apache/shardingsphere/elasticjob/cloud/fixture/config/TestSimpleJobConfiguration.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.elasticjob.cloud.fixture.config;
-
-import org.apache.shardingsphere.elasticjob.cloud.config.JobCoreConfiguration;
-import org.apache.shardingsphere.elasticjob.cloud.config.JobRootConfiguration;
-import org.apache.shardingsphere.elasticjob.cloud.config.JobTypeConfiguration;
-import org.apache.shardingsphere.elasticjob.cloud.config.simple.SimpleJobConfiguration;
-import org.apache.shardingsphere.elasticjob.cloud.executor.handler.JobProperties;
-import org.apache.shardingsphere.elasticjob.cloud.fixture.ShardingContextsBuilder;
-import org.apache.shardingsphere.elasticjob.cloud.fixture.handler.ThrowJobExceptionHandler;
-import org.apache.shardingsphere.elasticjob.cloud.fixture.job.TestSimpleJob;
-import lombok.NoArgsConstructor;
-
-@NoArgsConstructor
-public final class TestSimpleJobConfiguration implements JobRootConfiguration {
-
- private String jobExceptionHandlerClassName;
-
- private String executorServiceHandlerClassName;
-
- public TestSimpleJobConfiguration(final String jobExceptionHandlerClassName, final String executorServiceHandlerClassName) {
- this.jobExceptionHandlerClassName = jobExceptionHandlerClassName;
- this.executorServiceHandlerClassName = executorServiceHandlerClassName;
- }
-
- @Override
- public JobTypeConfiguration getTypeConfig() {
- JobCoreConfiguration.Builder builder = JobCoreConfiguration.newBuilder(ShardingContextsBuilder.JOB_NAME, "0/1 * * * * ?", 3)
- .shardingItemParameters("0=A,1=B,2=C").jobParameter("param").failover(true).misfire(false).description("desc");
- if (null == jobExceptionHandlerClassName) {
- builder.jobProperties(JobProperties.JobPropertiesEnum.JOB_EXCEPTION_HANDLER.getKey(), ThrowJobExceptionHandler.class.getCanonicalName());
- } else {
- builder.jobProperties(JobProperties.JobPropertiesEnum.JOB_EXCEPTION_HANDLER.getKey(), jobExceptionHandlerClassName);
- }
- if (null != executorServiceHandlerClassName) {
- builder.jobProperties(JobProperties.JobPropertiesEnum.EXECUTOR_SERVICE_HANDLER.getKey(), executorServiceHandlerClassName);
- }
- return new SimpleJobConfiguration(builder.build(), TestSimpleJob.class.getCanonicalName());
- }
-}
diff --git a/elasticjob-cloud/elasticjob-cloud-common/src/test/java/org/apache/shardingsphere/elasticjob/cloud/fixture/handler/IgnoreJobExceptionHandler.java b/elasticjob-cloud/elasticjob-cloud-common/src/test/java/org/apache/shardingsphere/elasticjob/cloud/fixture/handler/IgnoreJobExceptionHandler.java
deleted file mode 100755
index 6e93dfc..0000000
--- a/elasticjob-cloud/elasticjob-cloud-common/src/test/java/org/apache/shardingsphere/elasticjob/cloud/fixture/handler/IgnoreJobExceptionHandler.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.elasticjob.cloud.fixture.handler;
-
-import org.apache.shardingsphere.elasticjob.cloud.executor.handler.JobExceptionHandler;
-
-public final class IgnoreJobExceptionHandler implements JobExceptionHandler {
-
- @Override
- public void handleException(final String jobName, final Throwable cause) {
- }
-}
diff --git a/elasticjob-cloud/elasticjob-cloud-common/src/test/java/org/apache/shardingsphere/elasticjob/cloud/fixture/handler/ThrowJobExceptionHandler.java b/elasticjob-cloud/elasticjob-cloud-common/src/test/java/org/apache/shardingsphere/elasticjob/cloud/fixture/handler/ThrowJobExceptionHandler.java
deleted file mode 100755
index 0392ce2..0000000
--- a/elasticjob-cloud/elasticjob-cloud-common/src/test/java/org/apache/shardingsphere/elasticjob/cloud/fixture/handler/ThrowJobExceptionHandler.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.elasticjob.cloud.fixture.handler;
-
-import org.apache.shardingsphere.elasticjob.cloud.executor.handler.JobExceptionHandler;
-import org.apache.shardingsphere.elasticjob.infra.exception.JobSystemException;
-
-public final class ThrowJobExceptionHandler implements JobExceptionHandler {
-
- @Override
- public void handleException(final String jobName, final Throwable cause) {
- throw new JobSystemException(cause);
- }
-}
diff --git a/elasticjob-cloud/elasticjob-cloud-common/src/test/java/org/apache/shardingsphere/elasticjob/cloud/fixture/job/JobCaller.java b/elasticjob-cloud/elasticjob-cloud-common/src/test/java/org/apache/shardingsphere/elasticjob/cloud/fixture/job/JobCaller.java
deleted file mode 100755
index c8b0aae..0000000
--- a/elasticjob-cloud/elasticjob-cloud-common/src/test/java/org/apache/shardingsphere/elasticjob/cloud/fixture/job/JobCaller.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.elasticjob.cloud.fixture.job;
-
-import java.util.List;
-
-public interface JobCaller {
-
- /**
- * Execute the job.
- */
- void execute();
-
- /**
- * Fetch job data.
- * @param shardingItem shard item
- * @return the job data
- */
- List<Object> fetchData(int shardingItem);
-
- /**
- * Process job data.
- * @param data the job to process
- */
- void processData(Object data);
-}
diff --git a/elasticjob-cloud/elasticjob-cloud-common/src/test/java/org/apache/shardingsphere/elasticjob/cloud/fixture/job/OtherJob.java b/elasticjob-cloud/elasticjob-cloud-common/src/test/java/org/apache/shardingsphere/elasticjob/cloud/fixture/job/OtherJob.java
deleted file mode 100755
index 724a744..0000000
--- a/elasticjob-cloud/elasticjob-cloud-common/src/test/java/org/apache/shardingsphere/elasticjob/cloud/fixture/job/OtherJob.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.elasticjob.cloud.fixture.job;
-
-import org.apache.shardingsphere.elasticjob.cloud.api.ElasticJob;
-
-public final class OtherJob implements ElasticJob {
-}
diff --git a/elasticjob-cloud/elasticjob-cloud-common/src/test/java/org/apache/shardingsphere/elasticjob/cloud/fixture/job/TestDataflowJob.java b/elasticjob-cloud/elasticjob-cloud-common/src/test/java/org/apache/shardingsphere/elasticjob/cloud/fixture/job/TestDataflowJob.java
deleted file mode 100755
index fcf5c46..0000000
--- a/elasticjob-cloud/elasticjob-cloud-common/src/test/java/org/apache/shardingsphere/elasticjob/cloud/fixture/job/TestDataflowJob.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.elasticjob.cloud.fixture.job;
-
-import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.elasticjob.api.ShardingContext;
-import org.apache.shardingsphere.elasticjob.cloud.api.dataflow.DataflowJob;
-
-import java.util.List;
-
-@RequiredArgsConstructor
-public final class TestDataflowJob implements DataflowJob<Object> {
-
- private final JobCaller jobCaller;
-
- @Override
- public List<Object> fetchData(final ShardingContext shardingContext) {
- return jobCaller.fetchData(shardingContext.getShardingItem());
- }
-
- @Override
- public void processData(final ShardingContext shardingContext, final List<Object> data) {
- for (Object each : data) {
- jobCaller.processData(each);
- }
- }
-}
diff --git a/elasticjob-cloud/elasticjob-cloud-common/src/test/java/org/apache/shardingsphere/elasticjob/cloud/fixture/job/TestSimpleJob.java b/elasticjob-cloud/elasticjob-cloud-common/src/test/java/org/apache/shardingsphere/elasticjob/cloud/fixture/job/TestSimpleJob.java
deleted file mode 100755
index 33de3f5..0000000
--- a/elasticjob-cloud/elasticjob-cloud-common/src/test/java/org/apache/shardingsphere/elasticjob/cloud/fixture/job/TestSimpleJob.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.elasticjob.cloud.fixture.job;
-
-import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.elasticjob.api.ShardingContext;
-import org.apache.shardingsphere.elasticjob.cloud.api.simple.SimpleJob;
-
-@RequiredArgsConstructor
-public final class TestSimpleJob implements SimpleJob {
-
- private final JobCaller jobCaller;
-
- @Override
- public void execute(final ShardingContext shardingContext) {
- jobCaller.execute();
- }
-}
diff --git a/elasticjob-cloud/elasticjob-cloud-common/src/test/java/org/apache/shardingsphere/elasticjob/cloud/fixture/job/TestWrongJob.java b/elasticjob-cloud/elasticjob-cloud-common/src/test/java/org/apache/shardingsphere/elasticjob/cloud/fixture/job/TestWrongJob.java
deleted file mode 100755
index f5ff5e0..0000000
--- a/elasticjob-cloud/elasticjob-cloud-common/src/test/java/org/apache/shardingsphere/elasticjob/cloud/fixture/job/TestWrongJob.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.elasticjob.cloud.fixture.job;
-
-import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.elasticjob.api.ShardingContext;
-import org.apache.shardingsphere.elasticjob.cloud.api.simple.SimpleJob;
-
-@RequiredArgsConstructor
-public final class TestWrongJob implements SimpleJob {
-
- @Override
- public void execute(final ShardingContext shardingContext) {
- throw new RuntimeException("WrongJobException");
- }
-}
diff --git a/elasticjob-cloud/elasticjob-cloud-common/src/test/java/org/apache/shardingsphere/elasticjob/cloud/util/json/JobConfigurationGsonTypeAdapterTest.java b/elasticjob-cloud/elasticjob-cloud-common/src/test/java/org/apache/shardingsphere/elasticjob/cloud/util/json/JobConfigurationGsonTypeAdapterTest.java
deleted file mode 100755
index 620d70e..0000000
--- a/elasticjob-cloud/elasticjob-cloud-common/src/test/java/org/apache/shardingsphere/elasticjob/cloud/util/json/JobConfigurationGsonTypeAdapterTest.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.elasticjob.cloud.util.json;
-
-import com.google.gson.stream.JsonReader;
-import com.google.gson.stream.JsonWriter;
-import org.apache.shardingsphere.elasticjob.cloud.config.JobTypeConfiguration;
-import org.apache.shardingsphere.elasticjob.cloud.executor.handler.impl.DefaultExecutorServiceHandler;
-import org.apache.shardingsphere.elasticjob.cloud.fixture.APIJsonConstants;
-import org.apache.shardingsphere.elasticjob.cloud.fixture.config.TestDataflowJobConfiguration;
-import org.apache.shardingsphere.elasticjob.cloud.fixture.config.TestJobRootConfiguration;
-import org.apache.shardingsphere.elasticjob.cloud.fixture.config.TestScriptJobConfiguration;
-import org.apache.shardingsphere.elasticjob.cloud.fixture.config.TestSimpleJobConfiguration;
-import org.apache.shardingsphere.elasticjob.cloud.fixture.handler.IgnoreJobExceptionHandler;
-import org.apache.shardingsphere.elasticjob.cloud.fixture.handler.ThrowJobExceptionHandler;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.util.Map;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertThat;
-
-public final class JobConfigurationGsonTypeAdapterTest {
-
- @BeforeClass
- public static void setUp() {
- GsonFactory.registerTypeAdapter(TestJobRootConfiguration.class, new JobConfigurationGsonTypeAdapter());
- }
-
- @Test
- public void assertToSimpleJobJson() {
- assertThat(GsonFactory.getGson().toJson(new TestJobRootConfiguration(
- new TestSimpleJobConfiguration(ThrowJobExceptionHandler.class.getCanonicalName(), DefaultExecutorServiceHandler.class.getCanonicalName()).getTypeConfig())),
- is(APIJsonConstants.getSimpleJobJson(ThrowJobExceptionHandler.class.getCanonicalName())));
- }
-
- @Test
- public void assertToDataflowJobJson() {
- assertThat(GsonFactory.getGson().toJson(new TestJobRootConfiguration(new TestDataflowJobConfiguration(true).getTypeConfig())),
- is(APIJsonConstants.getDataflowJobJson(IgnoreJobExceptionHandler.class.getCanonicalName())));
- }
-
- @Test
- public void assertToScriptJobJson() {
- assertThat(GsonFactory.getGson().toJson(new TestJobRootConfiguration(new TestScriptJobConfiguration("test.sh", ThrowJobExceptionHandler.class).getTypeConfig())),
- is(APIJsonConstants.getScriptJobJson(ThrowJobExceptionHandler.class.getCanonicalName())));
- }
-
- @Test
- public void assertFromSimpleJobJson() {
- TestJobRootConfiguration actual = GsonFactory.getGson().fromJson(
- APIJsonConstants.getSimpleJobJson(ThrowJobExceptionHandler.class.getCanonicalName()), TestJobRootConfiguration.class);
- TestJobRootConfiguration expected = new TestJobRootConfiguration(
- new TestSimpleJobConfiguration(ThrowJobExceptionHandler.class.getCanonicalName(), DefaultExecutorServiceHandler.class.getCanonicalName()).getTypeConfig());
- assertThat(GsonFactory.getGson().toJson(actual), is(GsonFactory.getGson().toJson(expected)));
- }
-
- @Test
- public void assertFromDataflowJobJson() {
- TestJobRootConfiguration actual = GsonFactory.getGson().fromJson(
- APIJsonConstants.getDataflowJobJson(IgnoreJobExceptionHandler.class.getCanonicalName()), TestJobRootConfiguration.class);
- TestJobRootConfiguration expected = new TestJobRootConfiguration(new TestDataflowJobConfiguration(true).getTypeConfig());
- assertThat(GsonFactory.getGson().toJson(actual), is(GsonFactory.getGson().toJson(expected)));
- }
-
- @Test
- public void assertFromScriptJobJson() {
- TestJobRootConfiguration actual = GsonFactory.getGson().fromJson(
- APIJsonConstants.getScriptJobJson(ThrowJobExceptionHandler.class.getCanonicalName()), TestJobRootConfiguration.class);
- TestJobRootConfiguration expected = new TestJobRootConfiguration(new TestScriptJobConfiguration("test.sh", ThrowJobExceptionHandler.class).getTypeConfig());
- assertThat(GsonFactory.getGson().toJson(actual), is(GsonFactory.getGson().toJson(expected)));
- }
-
- private static class JobConfigurationGsonTypeAdapter extends AbstractJobConfigurationGsonTypeAdapter<TestJobRootConfiguration> {
-
- @Override
- protected void addToCustomizedValueMap(final String jsonName, final JsonReader in, final Map<String, Object> customizedValueMap) {
- }
-
- @Override
- protected TestJobRootConfiguration getJobRootConfiguration(final JobTypeConfiguration typeConfig, final Map<String, Object> customizedValueMap) {
- return new TestJobRootConfiguration(typeConfig);
- }
-
- @Override
- protected void writeCustomized(final JsonWriter out, final TestJobRootConfiguration value) {
- }
- }
-}
diff --git a/elasticjob-cloud/elasticjob-cloud-executor/pom.xml b/elasticjob-cloud/elasticjob-cloud-executor/pom.xml
index 610c775..c620b1d 100755
--- a/elasticjob-cloud/elasticjob-cloud-executor/pom.xml
+++ b/elasticjob-cloud/elasticjob-cloud-executor/pom.xml
@@ -29,6 +29,21 @@
<dependencies>
<dependency>
<groupId>org.apache.shardingsphere.elasticjob</groupId>
+ <artifactId>elasticjob-simple-executor</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.shardingsphere.elasticjob</groupId>
+ <artifactId>elasticjob-dataflow-executor</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.shardingsphere.elasticjob</groupId>
+ <artifactId>elasticjob-script-executor</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.shardingsphere.elasticjob</groupId>
<artifactId>elasticjob-cloud-common</artifactId>
<version>${project.parent.version}</version>
</dependency>
diff --git a/elasticjob-cloud/elasticjob-cloud-executor/src/main/java/org/apache/shardingsphere/elasticjob/cloud/executor/CloudJobFacade.java b/elasticjob-cloud/elasticjob-cloud-executor/src/main/java/org/apache/shardingsphere/elasticjob/cloud/executor/CloudJobFacade.java
index 939b83f..dab3558 100755
--- a/elasticjob-cloud/elasticjob-cloud-executor/src/main/java/org/apache/shardingsphere/elasticjob/cloud/executor/CloudJobFacade.java
+++ b/elasticjob-cloud/elasticjob-cloud-executor/src/main/java/org/apache/shardingsphere/elasticjob/cloud/executor/CloudJobFacade.java
@@ -20,8 +20,10 @@ package org.apache.shardingsphere.elasticjob.cloud.executor;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import org.apache.shardingsphere.elasticjob.api.listener.ShardingContexts;
+import org.apache.shardingsphere.elasticjob.cloud.api.JobType;
import org.apache.shardingsphere.elasticjob.cloud.config.JobTypeConfiguration;
-import org.apache.shardingsphere.elasticjob.cloud.executor.handler.JobProperties.JobPropertiesEnum;
+import org.apache.shardingsphere.elasticjob.cloud.config.dataflow.DataflowJobConfiguration;
+import org.apache.shardingsphere.elasticjob.cloud.config.script.ScriptJobConfiguration;
import org.apache.shardingsphere.elasticjob.infra.context.TaskContext;
import org.apache.shardingsphere.elasticjob.tracing.JobEventBus;
import org.apache.shardingsphere.elasticjob.tracing.event.JobExecutionEvent;
@@ -35,7 +37,7 @@ import java.util.Collection;
* Cloud job facade.
*/
@RequiredArgsConstructor
-public final class CloudJobFacade implements JobFacade {
+public final class CloudJobFacade implements JobFacade, org.apache.shardingsphere.elasticjob.executor.JobFacade {
private final ShardingContexts shardingContexts;
@@ -50,12 +52,17 @@ public final class CloudJobFacade implements JobFacade {
@Override
public JobConfiguration loadJobConfiguration(final boolean fromCache) {
- return JobConfiguration.newBuilder(jobConfig.getCoreConfig().getJobName(), jobConfig.getCoreConfig().getShardingTotalCount())
+ JobConfiguration result = JobConfiguration.newBuilder(jobConfig.getCoreConfig().getJobName(), jobConfig.getCoreConfig().getShardingTotalCount())
.cron(jobConfig.getCoreConfig().getCron()).shardingItemParameters(jobConfig.getCoreConfig().getShardingItemParameters()).jobParameter(jobConfig.getCoreConfig().getJobParameter())
.failover(jobConfig.getCoreConfig().isFailover()).misfire(jobConfig.getCoreConfig().isMisfire()).description(jobConfig.getCoreConfig().getDescription())
- .jobExecutorServiceHandlerType(jobConfig.getCoreConfig().getJobProperties().get(JobPropertiesEnum.EXECUTOR_SERVICE_HANDLER))
- .jobErrorHandlerType(jobConfig.getCoreConfig().getJobProperties().get(JobPropertiesEnum.JOB_EXCEPTION_HANDLER)).build();
-
+ .jobExecutorServiceHandlerType(jobConfig.getCoreConfig().getJobExecutorServiceHandlerType())
+ .jobErrorHandlerType(jobConfig.getCoreConfig().getJobErrorHandlerType()).build();
+ if (JobType.DATAFLOW == jobConfig.getJobType()) {
+ result.getProps().setProperty("streaming.process", Boolean.toString(((DataflowJobConfiguration) jobConfig).isStreamingProcess()));
+ } else if (JobType.SCRIPT == jobConfig.getJobType()) {
+ result.getProps().setProperty("script.command.line", ((ScriptJobConfiguration) jobConfig).getScriptCommandLine());
+ }
+ return result;
}
@Override
diff --git a/elasticjob-cloud/elasticjob-cloud-executor/src/main/java/org/apache/shardingsphere/elasticjob/cloud/executor/DaemonTaskScheduler.java b/elasticjob-cloud/elasticjob-cloud-executor/src/main/java/org/apache/shardingsphere/elasticjob/cloud/executor/DaemonTaskScheduler.java
index d6ed6d7..90d00f3 100755
--- a/elasticjob-cloud/elasticjob-cloud-executor/src/main/java/org/apache/shardingsphere/elasticjob/cloud/executor/DaemonTaskScheduler.java
+++ b/elasticjob-cloud/elasticjob-cloud-executor/src/main/java/org/apache/shardingsphere/elasticjob/cloud/executor/DaemonTaskScheduler.java
@@ -21,9 +21,10 @@ import lombok.RequiredArgsConstructor;
import lombok.Setter;
import org.apache.mesos.ExecutorDriver;
import org.apache.mesos.Protos;
+import org.apache.shardingsphere.elasticjob.api.ElasticJob;
import org.apache.shardingsphere.elasticjob.api.listener.ShardingContexts;
-import org.apache.shardingsphere.elasticjob.cloud.api.ElasticJob;
import org.apache.shardingsphere.elasticjob.cloud.config.JobCoreConfiguration;
+import org.apache.shardingsphere.elasticjob.executor.ElasticJobExecutor;
import org.apache.shardingsphere.elasticjob.infra.exception.JobSystemException;
import org.quartz.CronScheduleBuilder;
import org.quartz.CronTrigger;
@@ -46,7 +47,9 @@ import java.util.concurrent.ConcurrentHashMap;
@RequiredArgsConstructor
public final class DaemonTaskScheduler {
- public static final String ELASTIC_JOB_DATA_MAP_KEY = "elasticJob";
+ private static final String ELASTIC_JOB_DATA_MAP_KEY = "elasticJob";
+
+ private static final String ELASTIC_JOB_TYPE_DATA_MAP_KEY = "elasticJobType";
private static final String JOB_FACADE_DATA_MAP_KEY = "jobFacade";
@@ -58,6 +61,8 @@ public final class DaemonTaskScheduler {
private final ElasticJob elasticJob;
+ private final String elasticJobType;
+
private final JobCoreConfiguration jobConfig;
private final JobFacade jobFacade;
@@ -72,6 +77,7 @@ public final class DaemonTaskScheduler {
public void init() {
JobDetail jobDetail = JobBuilder.newJob(DaemonJob.class).withIdentity(jobConfig.getJobName()).build();
jobDetail.getJobDataMap().put(ELASTIC_JOB_DATA_MAP_KEY, elasticJob);
+ jobDetail.getJobDataMap().put(ELASTIC_JOB_TYPE_DATA_MAP_KEY, elasticJobType);
jobDetail.getJobDataMap().put(JOB_FACADE_DATA_MAP_KEY, jobFacade);
jobDetail.getJobDataMap().put(EXECUTOR_DRIVER_DATA_MAP_KEY, executorDriver);
jobDetail.getJobDataMap().put(TASK_ID_DATA_MAP_KEY, taskId);
@@ -140,9 +146,12 @@ public final class DaemonTaskScheduler {
@Setter
private ElasticJob elasticJob;
+
+ @Setter
+ private String elasticJobType;
@Setter
- private JobFacade jobFacade;
+ private CloudJobFacade jobFacade;
@Setter
private ExecutorDriver executorDriver;
@@ -158,11 +167,19 @@ public final class DaemonTaskScheduler {
if (jobEventSamplingCount > 0 && ++currentJobEventSamplingCount < jobEventSamplingCount) {
shardingContexts.setCurrentJobEventSamplingCount(currentJobEventSamplingCount);
jobFacade.getShardingContexts().setAllowSendJobEvent(false);
- JobExecutorFactory.getJobExecutor(elasticJob, jobFacade).execute();
+ if (null == elasticJob) {
+ new ElasticJobExecutor(elasticJobType, jobFacade.loadJobConfiguration(true), jobFacade).execute();
+ } else {
+ new ElasticJobExecutor(elasticJob, jobFacade.loadJobConfiguration(true), jobFacade).execute();
+ }
} else {
jobFacade.getShardingContexts().setAllowSendJobEvent(true);
executorDriver.sendStatusUpdate(Protos.TaskStatus.newBuilder().setTaskId(taskId).setState(Protos.TaskState.TASK_RUNNING).setMessage("BEGIN").build());
- JobExecutorFactory.getJobExecutor(elasticJob, jobFacade).execute();
+ if (null == elasticJob) {
+ new ElasticJobExecutor(elasticJobType, jobFacade.loadJobConfiguration(true), jobFacade).execute();
+ } else {
+ new ElasticJobExecutor(elasticJob, jobFacade.loadJobConfiguration(true), jobFacade).execute();
+ }
executorDriver.sendStatusUpdate(Protos.TaskStatus.newBuilder().setTaskId(taskId).setState(Protos.TaskState.TASK_RUNNING).setMessage("COMPLETE").build());
shardingContexts.setCurrentJobEventSamplingCount(0);
}
diff --git a/elasticjob-cloud/elasticjob-cloud-executor/src/main/java/org/apache/shardingsphere/elasticjob/cloud/executor/JobTypeConfigurationUtil.java b/elasticjob-cloud/elasticjob-cloud-executor/src/main/java/org/apache/shardingsphere/elasticjob/cloud/executor/JobTypeConfigurationUtil.java
index e9872f0..f7650dd 100644
--- a/elasticjob-cloud/elasticjob-cloud-executor/src/main/java/org/apache/shardingsphere/elasticjob/cloud/executor/JobTypeConfigurationUtil.java
+++ b/elasticjob-cloud/elasticjob-cloud-executor/src/main/java/org/apache/shardingsphere/elasticjob/cloud/executor/JobTypeConfigurationUtil.java
@@ -25,7 +25,6 @@ import org.apache.shardingsphere.elasticjob.cloud.config.JobTypeConfiguration;
import org.apache.shardingsphere.elasticjob.cloud.config.dataflow.DataflowJobConfiguration;
import org.apache.shardingsphere.elasticjob.cloud.config.script.ScriptJobConfiguration;
import org.apache.shardingsphere.elasticjob.cloud.config.simple.SimpleJobConfiguration;
-import org.apache.shardingsphere.elasticjob.cloud.executor.handler.JobProperties.JobPropertiesEnum;
import java.util.Map;
@@ -49,9 +48,8 @@ public final class JobTypeConfigurationUtil {
Preconditions.checkArgument(!Strings.isNullOrEmpty(jobName), "jobName can not be empty.");
Preconditions.checkArgument(!Strings.isNullOrEmpty(jobType), "jobType can not be empty.");
Preconditions.checkArgument(!Strings.isNullOrEmpty(jobClass), "jobClass can not be empty.");
- JobCoreConfiguration jobCoreConfig = JobCoreConfiguration.newBuilder(jobName, cron, ignoredShardingTotalCount).build();
- jobCoreConfig.getJobProperties().put(JobPropertiesEnum.EXECUTOR_SERVICE_HANDLER.name(), jobConfigurationMap.get("executorServiceHandler"));
- jobCoreConfig.getJobProperties().put(JobPropertiesEnum.JOB_EXCEPTION_HANDLER.name(), jobConfigurationMap.get("jobExceptionHandler"));
+ JobCoreConfiguration jobCoreConfig = JobCoreConfiguration.newBuilder(jobName, cron, ignoredShardingTotalCount)
+ .jobExecutorServiceHandlerType(jobConfigurationMap.get("executorServiceHandler")).jobErrorHandlerType(jobConfigurationMap.get("jobExceptionHandler")).build();
if (JobType.DATAFLOW.name().equals(jobType)) {
return new DataflowJobConfiguration(jobCoreConfig, jobClass, Boolean.valueOf(jobConfigurationMap.get("streamingProcess")));
} else if (JobType.SCRIPT.name().equals(jobType)) {
diff --git a/elasticjob-cloud/elasticjob-cloud-executor/src/main/java/org/apache/shardingsphere/elasticjob/cloud/executor/TaskExecutor.java b/elasticjob-cloud/elasticjob-cloud-executor/src/main/java/org/apache/shardingsphere/elasticjob/cloud/executor/TaskExecutor.java
index d8b8fef..6323a03 100755
--- a/elasticjob-cloud/elasticjob-cloud-executor/src/main/java/org/apache/shardingsphere/elasticjob/cloud/executor/TaskExecutor.java
+++ b/elasticjob-cloud/elasticjob-cloud-executor/src/main/java/org/apache/shardingsphere/elasticjob/cloud/executor/TaskExecutor.java
@@ -26,9 +26,9 @@ import org.apache.mesos.Executor;
import org.apache.mesos.ExecutorDriver;
import org.apache.mesos.Protos;
import org.apache.mesos.Protos.TaskInfo;
+import org.apache.shardingsphere.elasticjob.api.ElasticJob;
import org.apache.shardingsphere.elasticjob.api.listener.ShardingContexts;
-import org.apache.shardingsphere.elasticjob.cloud.api.ElasticJob;
-import org.apache.shardingsphere.elasticjob.cloud.api.script.ScriptJob;
+import org.apache.shardingsphere.elasticjob.executor.ElasticJobExecutor;
import org.apache.shardingsphere.elasticjob.infra.concurrent.ElasticJobExecutorService;
import org.apache.shardingsphere.elasticjob.infra.exception.ExceptionUtils;
import org.apache.shardingsphere.elasticjob.infra.exception.JobSystemException;
@@ -125,10 +125,15 @@ public final class TaskExecutor implements Executor {
ElasticJob elasticJob = getElasticJobInstance(jobConfig);
final CloudJobFacade jobFacade = new CloudJobFacade(shardingContexts, jobConfig.getTypeConfig(), jobEventBus);
if (jobConfig.isTransient()) {
- JobExecutorFactory.getJobExecutor(elasticJob, jobFacade).execute();
+ if (null == elasticJob) {
+ new ElasticJobExecutor(jobConfig.getTypeConfig().getJobClass(), jobFacade.loadJobConfiguration(true), jobFacade).execute();
+ } else {
+ new ElasticJobExecutor(elasticJob, jobFacade.loadJobConfiguration(true), jobFacade).execute();
+ }
+
executorDriver.sendStatusUpdate(Protos.TaskStatus.newBuilder().setTaskId(taskInfo.getTaskId()).setState(Protos.TaskState.TASK_FINISHED).build());
} else {
- new DaemonTaskScheduler(elasticJob, jobConfig.getTypeConfig().getCoreConfig(), jobFacade, executorDriver, taskInfo.getTaskId()).init();
+ new DaemonTaskScheduler(elasticJob, jobConfig.getTypeConfig().getJobClass(), jobConfig.getTypeConfig().getCoreConfig(), jobFacade, executorDriver, taskInfo.getTaskId()).init();
}
// CHECKSTYLE:OFF
} catch (final Throwable ex) {
@@ -143,9 +148,8 @@ public final class TaskExecutor implements Executor {
private ElasticJob getElasticJobInstance(final JobConfigurationContext jobConfig) {
if (!Strings.isNullOrEmpty(jobConfig.getBeanName()) && !Strings.isNullOrEmpty(jobConfig.getApplicationContext())) {
return getElasticJobBean(jobConfig);
- } else {
- return getElasticJobClass(jobConfig);
}
+ return getElasticJobClass(jobConfig);
}
private ElasticJob getElasticJobBean(final JobConfigurationContext jobConfig) {
@@ -163,14 +167,11 @@ public final class TaskExecutor implements Executor {
try {
Class<?> elasticJobClass = Class.forName(jobClass);
if (!ElasticJob.class.isAssignableFrom(elasticJobClass)) {
- throw new JobSystemException("Elastic-Job: Class '%s' must implements ElasticJob interface.", jobClass);
+ throw new JobSystemException("ElasticJob: Class '%s' must implements ElasticJob interface.", jobClass);
}
- if (elasticJobClass != ScriptJob.class) {
- return (ElasticJob) elasticJobClass.newInstance();
- }
- return null;
+ return (ElasticJob) elasticJobClass.newInstance();
} catch (final ReflectiveOperationException ex) {
- throw new JobSystemException("Elastic-Job: Class '%s' initialize failure, the error message is '%s'.", jobClass, ex.getMessage());
+ return null;
}
}
}
diff --git a/elasticjob-cloud/elasticjob-cloud-executor/src/main/java/org/apache/shardingsphere/elasticjob/cloud/executor/local/LocalTaskExecutor.java b/elasticjob-cloud/elasticjob-cloud-executor/src/main/java/org/apache/shardingsphere/elasticjob/cloud/executor/local/LocalTaskExecutor.java
index 2b3068c..19a276d 100755
--- a/elasticjob-cloud/elasticjob-cloud-executor/src/main/java/org/apache/shardingsphere/elasticjob/cloud/executor/local/LocalTaskExecutor.java
+++ b/elasticjob-cloud/elasticjob-cloud-executor/src/main/java/org/apache/shardingsphere/elasticjob/cloud/executor/local/LocalTaskExecutor.java
@@ -20,23 +20,20 @@ package org.apache.shardingsphere.elasticjob.cloud.executor.local;
import com.google.common.base.Joiner;
import com.google.common.base.Strings;
import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.elasticjob.api.ElasticJob;
import org.apache.shardingsphere.elasticjob.api.listener.ShardingContexts;
-import org.apache.shardingsphere.elasticjob.cloud.api.ElasticJob;
import org.apache.shardingsphere.elasticjob.cloud.api.JobType;
-import org.apache.shardingsphere.elasticjob.cloud.api.dataflow.DataflowJob;
-import org.apache.shardingsphere.elasticjob.cloud.api.simple.SimpleJob;
import org.apache.shardingsphere.elasticjob.cloud.config.JobCoreConfiguration;
import org.apache.shardingsphere.elasticjob.cloud.config.JobTypeConfiguration;
import org.apache.shardingsphere.elasticjob.cloud.config.dataflow.DataflowJobConfiguration;
import org.apache.shardingsphere.elasticjob.cloud.config.script.ScriptJobConfiguration;
-import org.apache.shardingsphere.elasticjob.cloud.executor.AbstractElasticJobExecutor;
import org.apache.shardingsphere.elasticjob.cloud.executor.CloudJobFacade;
import org.apache.shardingsphere.elasticjob.cloud.executor.JobTypeConfigurationUtil;
-import org.apache.shardingsphere.elasticjob.cloud.executor.type.DataflowJobExecutor;
-import org.apache.shardingsphere.elasticjob.cloud.executor.type.ScriptJobExecutor;
-import org.apache.shardingsphere.elasticjob.cloud.executor.type.SimpleJobExecutor;
import org.apache.shardingsphere.elasticjob.cloud.util.config.ShardingItemParameters;
+import org.apache.shardingsphere.elasticjob.dataflow.job.DataflowJob;
+import org.apache.shardingsphere.elasticjob.executor.ElasticJobExecutor;
import org.apache.shardingsphere.elasticjob.infra.exception.JobSystemException;
+import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
import org.apache.shardingsphere.elasticjob.tracing.JobEventBus;
import org.springframework.context.support.ClassPathXmlApplicationContext;
@@ -56,22 +53,23 @@ public final class LocalTaskExecutor {
*/
@SuppressWarnings("unchecked")
public void execute() {
- AbstractElasticJobExecutor jobExecutor;
CloudJobFacade jobFacade = new CloudJobFacade(getShardingContexts(), getJobTypeConfiguration(), new JobEventBus());
+ ElasticJob elasticJob;
switch (localCloudJobConfiguration.getTypeConfig().getJobType()) {
case SIMPLE:
- jobExecutor = new SimpleJobExecutor(getJobInstance(SimpleJob.class), jobFacade);
+ elasticJob = getJobInstance(SimpleJob.class);
break;
case DATAFLOW:
- jobExecutor = new DataflowJobExecutor(getJobInstance(DataflowJob.class), jobFacade);
- break;
- case SCRIPT:
- jobExecutor = new ScriptJobExecutor(jobFacade);
+ elasticJob = getJobInstance(DataflowJob.class);
break;
default:
- throw new UnsupportedOperationException(localCloudJobConfiguration.getTypeConfig().getJobType().name());
+ elasticJob = null;
+ }
+ if (null == elasticJob) {
+ new ElasticJobExecutor("SCRIPT", jobFacade.loadJobConfiguration(true), jobFacade).execute();
+ } else {
+ new ElasticJobExecutor(elasticJob, jobFacade.loadJobConfiguration(true), jobFacade).execute();
}
- jobExecutor.execute();
}
private ShardingContexts getShardingContexts() {
diff --git a/elasticjob-cloud/elasticjob-cloud-executor/src/test/java/org/apache/shardingsphere/elasticjob/cloud/executor/CloudJobFacadeTest.java b/elasticjob-cloud/elasticjob-cloud-executor/src/test/java/org/apache/shardingsphere/elasticjob/cloud/executor/CloudJobFacadeTest.java
index 3c76b9a..553abec 100755
--- a/elasticjob-cloud/elasticjob-cloud-executor/src/test/java/org/apache/shardingsphere/elasticjob/cloud/executor/CloudJobFacadeTest.java
+++ b/elasticjob-cloud/elasticjob-cloud-executor/src/test/java/org/apache/shardingsphere/elasticjob/cloud/executor/CloudJobFacadeTest.java
@@ -17,8 +17,8 @@
package org.apache.shardingsphere.elasticjob.cloud.executor;
+import org.apache.shardingsphere.elasticjob.api.ElasticJob;
import org.apache.shardingsphere.elasticjob.api.listener.ShardingContexts;
-import org.apache.shardingsphere.elasticjob.cloud.api.ElasticJob;
import org.apache.shardingsphere.elasticjob.cloud.api.JobType;
import org.apache.shardingsphere.elasticjob.infra.context.ExecutionType;
import org.apache.shardingsphere.elasticjob.infra.exception.JobExecutionEnvironmentException;
diff --git a/elasticjob-cloud/elasticjob-cloud-executor/src/test/java/org/apache/shardingsphere/elasticjob/cloud/executor/DaemonTaskSchedulerTest.java b/elasticjob-cloud/elasticjob-cloud-executor/src/test/java/org/apache/shardingsphere/elasticjob/cloud/executor/DaemonTaskSchedulerTest.java
index 1e787d9..9894107 100755
--- a/elasticjob-cloud/elasticjob-cloud-executor/src/test/java/org/apache/shardingsphere/elasticjob/cloud/executor/DaemonTaskSchedulerTest.java
+++ b/elasticjob-cloud/elasticjob-cloud-executor/src/test/java/org/apache/shardingsphere/elasticjob/cloud/executor/DaemonTaskSchedulerTest.java
@@ -21,9 +21,10 @@ import org.apache.mesos.ExecutorDriver;
import org.apache.mesos.Protos.TaskID;
import org.apache.mesos.Protos.TaskState;
import org.apache.mesos.Protos.TaskStatus;
+import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import org.apache.shardingsphere.elasticjob.api.listener.ShardingContexts;
import org.apache.shardingsphere.elasticjob.infra.context.ExecutionType;
-import org.apache.shardingsphere.elasticjob.cloud.executor.fixture.TestScriptJobConfiguration;
+import org.apache.shardingsphere.elasticjob.script.props.ScriptJobProperties;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -38,7 +39,7 @@ import static org.mockito.Mockito.when;
public final class DaemonTaskSchedulerTest {
@Mock
- private JobFacade jobFacade;
+ private CloudJobFacade jobFacade;
@Mock
private ExecutorDriver executorDriver;
@@ -56,8 +57,8 @@ public final class DaemonTaskSchedulerTest {
@Before
public void setUp() {
daemonJob = new DaemonTaskScheduler.DaemonJob();
- daemonJob.setElasticJob(null);
daemonJob.setJobFacade(jobFacade);
+ daemonJob.setElasticJobType("SCRIPT");
daemonJob.setExecutorDriver(executorDriver);
daemonJob.setTaskId(taskId);
}
@@ -65,7 +66,7 @@ public final class DaemonTaskSchedulerTest {
@Test
public void assertJobRun() {
when(jobFacade.getShardingContexts()).thenReturn(shardingContexts);
- when(jobFacade.loadJobRootConfiguration(true)).thenReturn(new TestScriptJobConfiguration("test.sh").getTypeConfig());
+ when(jobFacade.loadJobConfiguration(true)).thenReturn(createJobConfiguration());
daemonJob.execute(jobExecutionContext);
verify(shardingContexts).setAllowSendJobEvent(true);
verify(executorDriver).sendStatusUpdate(TaskStatus.newBuilder().setTaskId(taskId).setState(TaskState.TASK_RUNNING).setMessage("BEGIN").build());
@@ -77,7 +78,7 @@ public final class DaemonTaskSchedulerTest {
public void assertJobRunWithEventSampling() {
when(shardingContexts.getJobEventSamplingCount()).thenReturn(2);
when(jobFacade.getShardingContexts()).thenReturn(shardingContexts);
- when(jobFacade.loadJobRootConfiguration(true)).thenReturn(new TestScriptJobConfiguration("test.sh").getTypeConfig());
+ when(jobFacade.loadJobConfiguration(true)).thenReturn(createJobConfiguration());
daemonJob.execute(jobExecutionContext);
verify(shardingContexts).setCurrentJobEventSamplingCount(1);
verify(shardingContexts).setAllowSendJobEvent(false);
@@ -88,4 +89,9 @@ public final class DaemonTaskSchedulerTest {
verify(executorDriver).sendStatusUpdate(TaskStatus.newBuilder().setTaskId(taskId).setState(TaskState.TASK_RUNNING).setMessage("COMPLETE").build());
verify(shardingContexts).setCurrentJobEventSamplingCount(0);
}
+
+ private JobConfiguration createJobConfiguration() {
+ return JobConfiguration.newBuilder("test_script_job", 3).cron("0/1 * * * * ?").jobErrorHandlerType("IGNORE")
+ .setProperty(ScriptJobProperties.SCRIPT_KEY, "test.sh").build();
+ }
}
diff --git a/elasticjob-cloud/elasticjob-cloud-executor/src/test/java/org/apache/shardingsphere/elasticjob/cloud/executor/TaskExecutorThreadTest.java b/elasticjob-cloud/elasticjob-cloud-executor/src/test/java/org/apache/shardingsphere/elasticjob/cloud/executor/TaskExecutorThreadTest.java
index c975f8f..0b7ee6f 100755
--- a/elasticjob-cloud/elasticjob-cloud-executor/src/test/java/org/apache/shardingsphere/elasticjob/cloud/executor/TaskExecutorThreadTest.java
+++ b/elasticjob-cloud/elasticjob-cloud-executor/src/test/java/org/apache/shardingsphere/elasticjob/cloud/executor/TaskExecutorThreadTest.java
@@ -26,9 +26,10 @@ import org.apache.mesos.Protos.TaskInfo;
import org.apache.mesos.Protos.TaskState;
import org.apache.shardingsphere.elasticjob.api.listener.ShardingContexts;
import org.apache.shardingsphere.elasticjob.cloud.api.JobType;
-import org.apache.shardingsphere.elasticjob.infra.context.ExecutionType;
import org.apache.shardingsphere.elasticjob.cloud.executor.fixture.TestJob;
+import org.apache.shardingsphere.elasticjob.infra.context.ExecutionType;
import org.apache.shardingsphere.elasticjob.infra.exception.JobSystemException;
+import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
@@ -83,18 +84,19 @@ public final class TaskExecutorThreadTest {
try {
taskThread.run();
} catch (final JobSystemException ex) {
- assertTrue(ex.getMessage().startsWith("Elastic-Job: Class 'org.apache.shardingsphere.elasticjob.cloud.executor.TaskExecutorThreadTest' must implements ElasticJob interface."));
+ assertTrue(ex.getMessage().startsWith("ElasticJob: Class 'org.apache.shardingsphere.elasticjob.cloud.executor.TaskExecutorThreadTest' must implements ElasticJob interface."));
}
}
@Test
+ @Ignore
public void assertLaunchTaskWithWrongClass() {
TaskInfo taskInfo = buildWrongClass();
TaskExecutor.TaskThread taskThread = new TaskExecutor().new TaskThread(executorDriver, taskInfo);
try {
taskThread.run();
} catch (final JobSystemException ex) {
- assertTrue(ex.getMessage().startsWith("Elastic-Job: Class 'WrongClass' initialize failure, the error message is 'WrongClass'."));
+ assertTrue(ex.getMessage().startsWith("ElasticJob: Class 'WrongClass' initialize failure, the error message is 'WrongClass'."));
}
}
diff --git a/elasticjob-cloud/elasticjob-cloud-executor/src/test/java/org/apache/shardingsphere/elasticjob/cloud/executor/fixture/TestJob.java b/elasticjob-cloud/elasticjob-cloud-executor/src/test/java/org/apache/shardingsphere/elasticjob/cloud/executor/fixture/TestJob.java
index e35e12d..2beccc3 100755
--- a/elasticjob-cloud/elasticjob-cloud-executor/src/test/java/org/apache/shardingsphere/elasticjob/cloud/executor/fixture/TestJob.java
+++ b/elasticjob-cloud/elasticjob-cloud-executor/src/test/java/org/apache/shardingsphere/elasticjob/cloud/executor/fixture/TestJob.java
@@ -18,7 +18,7 @@
package org.apache.shardingsphere.elasticjob.cloud.executor.fixture;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
-import org.apache.shardingsphere.elasticjob.cloud.api.simple.SimpleJob;
+import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
public final class TestJob implements SimpleJob {
diff --git a/elasticjob-cloud/elasticjob-cloud-executor/src/test/java/org/apache/shardingsphere/elasticjob/cloud/executor/fixture/TestScriptJobConfiguration.java b/elasticjob-cloud/elasticjob-cloud-executor/src/test/java/org/apache/shardingsphere/elasticjob/cloud/executor/fixture/TestScriptJobConfiguration.java
deleted file mode 100755
index be71167..0000000
--- a/elasticjob-cloud/elasticjob-cloud-executor/src/test/java/org/apache/shardingsphere/elasticjob/cloud/executor/fixture/TestScriptJobConfiguration.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.elasticjob.cloud.executor.fixture;
-
-import org.apache.shardingsphere.elasticjob.cloud.config.JobTypeConfiguration;
-import org.apache.shardingsphere.elasticjob.cloud.config.script.ScriptJobConfiguration;
-import org.apache.shardingsphere.elasticjob.cloud.config.JobCoreConfiguration;
-import org.apache.shardingsphere.elasticjob.cloud.config.JobRootConfiguration;
-import org.apache.shardingsphere.elasticjob.cloud.executor.handler.JobProperties;
-import lombok.RequiredArgsConstructor;
-
-@RequiredArgsConstructor
-public final class TestScriptJobConfiguration implements JobRootConfiguration {
-
- private final String scriptCommandLine;
-
- @Override
- public JobTypeConfiguration getTypeConfig() {
- return new ScriptJobConfiguration(JobCoreConfiguration.newBuilder("test_script_job", "0/1 * * * * ?", 3)
- .jobProperties(JobProperties.JobPropertiesEnum.JOB_EXCEPTION_HANDLER.getKey(), "ignoredExceptionHandler").build(), scriptCommandLine);
- }
-}
diff --git a/elasticjob-cloud/elasticjob-cloud-executor/src/test/java/org/apache/shardingsphere/elasticjob/cloud/executor/local/fixture/TestDataflowJob.java b/elasticjob-cloud/elasticjob-cloud-executor/src/test/java/org/apache/shardingsphere/elasticjob/cloud/executor/local/fixture/TestDataflowJob.java
index 75377e4..3589b8d 100755
--- a/elasticjob-cloud/elasticjob-cloud-executor/src/test/java/org/apache/shardingsphere/elasticjob/cloud/executor/local/fixture/TestDataflowJob.java
+++ b/elasticjob-cloud/elasticjob-cloud-executor/src/test/java/org/apache/shardingsphere/elasticjob/cloud/executor/local/fixture/TestDataflowJob.java
@@ -20,7 +20,7 @@ package org.apache.shardingsphere.elasticjob.cloud.executor.local.fixture;
import lombok.Getter;
import lombok.Setter;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
-import org.apache.shardingsphere.elasticjob.cloud.api.dataflow.DataflowJob;
+import org.apache.shardingsphere.elasticjob.dataflow.job.DataflowJob;
import java.util.List;
import java.util.stream.Collectors;
diff --git a/elasticjob-cloud/elasticjob-cloud-executor/src/test/java/org/apache/shardingsphere/elasticjob/cloud/executor/local/fixture/TestSimpleJob.java b/elasticjob-cloud/elasticjob-cloud-executor/src/test/java/org/apache/shardingsphere/elasticjob/cloud/executor/local/fixture/TestSimpleJob.java
index 5498f3b..325e9ba 100755
--- a/elasticjob-cloud/elasticjob-cloud-executor/src/test/java/org/apache/shardingsphere/elasticjob/cloud/executor/local/fixture/TestSimpleJob.java
+++ b/elasticjob-cloud/elasticjob-cloud-executor/src/test/java/org/apache/shardingsphere/elasticjob/cloud/executor/local/fixture/TestSimpleJob.java
@@ -21,7 +21,7 @@ import com.google.common.base.Strings;
import lombok.Getter;
import lombok.Setter;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
-import org.apache.shardingsphere.elasticjob.cloud.api.simple.SimpleJob;
+import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
diff --git a/elasticjob-cloud/elasticjob-cloud-scheduler/pom.xml b/elasticjob-cloud/elasticjob-cloud-scheduler/pom.xml
index 0502182..5cc648c 100755
--- a/elasticjob-cloud/elasticjob-cloud-scheduler/pom.xml
+++ b/elasticjob-cloud/elasticjob-cloud-scheduler/pom.xml
@@ -34,6 +34,21 @@
</dependency>
<dependency>
<groupId>org.apache.shardingsphere.elasticjob</groupId>
+ <artifactId>elasticjob-simple-executor</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.shardingsphere.elasticjob</groupId>
+ <artifactId>elasticjob-dataflow-executor</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.shardingsphere.elasticjob</groupId>
+ <artifactId>elasticjob-script-executor</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.shardingsphere.elasticjob</groupId>
<artifactId>elasticjob-cloud-common</artifactId>
<version>${project.parent.version}</version>
</dependency>
diff --git a/elasticjob-cloud/elasticjob-cloud-scheduler/src/main/java/org/apache/shardingsphere/elasticjob/cloud/scheduler/mesos/TaskInfoData.java b/elasticjob-cloud/elasticjob-cloud-scheduler/src/main/java/org/apache/shardingsphere/elasticjob/cloud/scheduler/mesos/TaskInfoData.java
index 23eeae8..4e138be 100755
--- a/elasticjob-cloud/elasticjob-cloud-scheduler/src/main/java/org/apache/shardingsphere/elasticjob/cloud/scheduler/mesos/TaskInfoData.java
+++ b/elasticjob-cloud/elasticjob-cloud-scheduler/src/main/java/org/apache/shardingsphere/elasticjob/cloud/scheduler/mesos/TaskInfoData.java
@@ -22,7 +22,6 @@ import org.apache.commons.lang3.SerializationUtils;
import org.apache.shardingsphere.elasticjob.api.listener.ShardingContexts;
import org.apache.shardingsphere.elasticjob.cloud.config.dataflow.DataflowJobConfiguration;
import org.apache.shardingsphere.elasticjob.cloud.config.script.ScriptJobConfiguration;
-import org.apache.shardingsphere.elasticjob.cloud.executor.handler.JobProperties;
import org.apache.shardingsphere.elasticjob.cloud.scheduler.config.job.CloudJobConfiguration;
import org.apache.shardingsphere.elasticjob.cloud.scheduler.config.job.CloudJobExecutionType;
@@ -57,8 +56,8 @@ public final class TaskInfoData {
result.put("jobName", jobConfig.getJobName());
result.put("jobClass", jobConfig.getTypeConfig().getJobClass());
result.put("cron", CloudJobExecutionType.DAEMON == jobConfig.getJobExecutionType() ? jobConfig.getTypeConfig().getCoreConfig().getCron() : "");
- result.put("jobExceptionHandler", jobConfig.getTypeConfig().getCoreConfig().getJobProperties().get(JobProperties.JobPropertiesEnum.JOB_EXCEPTION_HANDLER));
- result.put("executorServiceHandler", jobConfig.getTypeConfig().getCoreConfig().getJobProperties().get(JobProperties.JobPropertiesEnum.EXECUTOR_SERVICE_HANDLER));
+ result.put("executorServiceHandler", jobConfig.getTypeConfig().getCoreConfig().getJobExecutorServiceHandlerType());
+ result.put("jobExceptionHandler", jobConfig.getTypeConfig().getCoreConfig().getJobErrorHandlerType());
if (jobConfig.getTypeConfig() instanceof DataflowJobConfiguration) {
result.put("streamingProcess", Boolean.toString(((DataflowJobConfiguration) jobConfig.getTypeConfig()).isStreamingProcess()));
} else if (jobConfig.getTypeConfig() instanceof ScriptJobConfiguration) {
diff --git a/elasticjob-cloud/elasticjob-cloud-scheduler/src/test/java/org/apache/shardingsphere/elasticjob/cloud/scheduler/fixture/CloudJobConfigurationBuilder.java b/elasticjob-cloud/elasticjob-cloud-scheduler/src/test/java/org/apache/shardingsphere/elasticjob/cloud/scheduler/fixture/CloudJobConfigurationBuilder.java
index 00917f2..371e909 100755
--- a/elasticjob-cloud/elasticjob-cloud-scheduler/src/test/java/org/apache/shardingsphere/elasticjob/cloud/scheduler/fixture/CloudJobConfigurationBuilder.java
+++ b/elasticjob-cloud/elasticjob-cloud-scheduler/src/test/java/org/apache/shardingsphere/elasticjob/cloud/scheduler/fixture/CloudJobConfigurationBuilder.java
@@ -17,15 +17,15 @@
package org.apache.shardingsphere.elasticjob.cloud.scheduler.fixture;
-import org.apache.shardingsphere.elasticjob.cloud.api.simple.SimpleJob;
-import org.apache.shardingsphere.elasticjob.cloud.scheduler.config.job.CloudJobConfiguration;
-import org.apache.shardingsphere.elasticjob.cloud.scheduler.config.job.CloudJobExecutionType;
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
import org.apache.shardingsphere.elasticjob.cloud.config.JobCoreConfiguration;
import org.apache.shardingsphere.elasticjob.cloud.config.dataflow.DataflowJobConfiguration;
import org.apache.shardingsphere.elasticjob.cloud.config.script.ScriptJobConfiguration;
import org.apache.shardingsphere.elasticjob.cloud.config.simple.SimpleJobConfiguration;
-import lombok.AccessLevel;
-import lombok.NoArgsConstructor;
+import org.apache.shardingsphere.elasticjob.cloud.scheduler.config.job.CloudJobConfiguration;
+import org.apache.shardingsphere.elasticjob.cloud.scheduler.config.job.CloudJobExecutionType;
+import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class CloudJobConfigurationBuilder {
diff --git a/elasticjob-cloud/elasticjob-cloud-scheduler/src/test/java/org/apache/shardingsphere/elasticjob/cloud/scheduler/fixture/CloudJsonConstants.java b/elasticjob-cloud/elasticjob-cloud-scheduler/src/test/java/org/apache/shardingsphere/elasticjob/cloud/scheduler/fixture/CloudJsonConstants.java
index 2f0c43a..476b943 100755
--- a/elasticjob-cloud/elasticjob-cloud-scheduler/src/test/java/org/apache/shardingsphere/elasticjob/cloud/scheduler/fixture/CloudJsonConstants.java
+++ b/elasticjob-cloud/elasticjob-cloud-scheduler/src/test/java/org/apache/shardingsphere/elasticjob/cloud/scheduler/fixture/CloudJsonConstants.java
@@ -19,24 +19,19 @@ package org.apache.shardingsphere.elasticjob.cloud.scheduler.fixture;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
-import org.apache.shardingsphere.elasticjob.cloud.executor.handler.impl.DefaultExecutorServiceHandler;
-import org.apache.shardingsphere.elasticjob.cloud.executor.handler.impl.DefaultJobExceptionHandler;
import org.apache.shardingsphere.elasticjob.cloud.scheduler.config.job.CloudJobExecutionType;
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class CloudJsonConstants {
- private static final String JOB_PROPS_JSON = "{\"job_exception_handler\":\"" + DefaultJobExceptionHandler.class.getCanonicalName() + "\","
- + "\"executor_service_handler\":\"" + DefaultExecutorServiceHandler.class.getCanonicalName() + "\"}";
-
private static final String JOB_JSON = "{\"jobName\":\"%s\",\"jobClass\":\"org.apache.shardingsphere.elasticjob.cloud.scheduler.fixture.TestSimpleJob\",\"jobType\":\"SIMPLE\","
+ "\"cron\":\"0/30 * * * * ?\",\"shardingTotalCount\":10,\"shardingItemParameters\":\"\",\"jobParameter\":\"\",\"failover\":true,\"misfire\":%s,\"description\":\"\","
- + "\"jobProperties\":" + JOB_PROPS_JSON + ",\"appName\":\"test_app\",\"cpuCount\":1.0,\"memoryMB\":128.0,"
+ + "\"appName\":\"test_app\",\"cpuCount\":1.0,\"memoryMB\":128.0,"
+ "\"jobExecutionType\":\"%s\"}";
private static final String SPRING_JOB_JSON = "{\"jobName\":\"test_spring_job\",\"jobClass\":\"org.apache.shardingsphere.elasticjob.cloud.scheduler.fixture.TestSimpleJob\",\"jobType\":\"SIMPLE\","
+ "\"cron\":\"0/30 * * * * ?\",\"shardingTotalCount\":10,\"shardingItemParameters\":\"\",\"jobParameter\":\"\",\"failover\":true,\"misfire\":true,\"description\":\"\","
- + "\"jobProperties\":" + JOB_PROPS_JSON + ",\"appName\":\"test_spring_app\",\"cpuCount\":1.0,\"memoryMB\":128.0,"
+ + "\"appName\":\"test_spring_app\",\"cpuCount\":1.0,\"memoryMB\":128.0,"
+ "\"jobExecutionType\":\"TRANSIENT\",\"beanName\":\"springSimpleJob\","
+ "\"applicationContext\":\"applicationContext.xml\"}";
diff --git a/elasticjob-cloud/elasticjob-cloud-scheduler/src/test/java/org/apache/shardingsphere/elasticjob/cloud/scheduler/fixture/TestSimpleJob.java b/elasticjob-cloud/elasticjob-cloud-scheduler/src/test/java/org/apache/shardingsphere/elasticjob/cloud/scheduler/fixture/TestSimpleJob.java
index e29b867..6d6eaad 100755
--- a/elasticjob-cloud/elasticjob-cloud-scheduler/src/test/java/org/apache/shardingsphere/elasticjob/cloud/scheduler/fixture/TestSimpleJob.java
+++ b/elasticjob-cloud/elasticjob-cloud-scheduler/src/test/java/org/apache/shardingsphere/elasticjob/cloud/scheduler/fixture/TestSimpleJob.java
@@ -18,7 +18,7 @@
package org.apache.shardingsphere.elasticjob.cloud.scheduler.fixture;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
-import org.apache.shardingsphere.elasticjob.cloud.api.simple.SimpleJob;
+import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
public final class TestSimpleJob implements SimpleJob {
diff --git a/elasticjob-lite/elasticjob-lite-core/pom.xml b/elasticjob-lite/elasticjob-lite-core/pom.xml
index f821140..e47734a 100644
--- a/elasticjob-lite/elasticjob-lite-core/pom.xml
+++ b/elasticjob-lite/elasticjob-lite-core/pom.xml
@@ -43,7 +43,17 @@
</dependency>
<dependency>
<groupId>org.apache.shardingsphere.elasticjob</groupId>
- <artifactId>elasticjob-executor-kernel</artifactId>
+ <artifactId>elasticjob-simple-executor</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.shardingsphere.elasticjob</groupId>
+ <artifactId>elasticjob-dataflow-executor</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.shardingsphere.elasticjob</groupId>
+ <artifactId>elasticjob-script-executor</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
diff --git a/elasticjob-lite/elasticjob-lite-lifecycle/pom.xml b/elasticjob-lite/elasticjob-lite-lifecycle/pom.xml
index 0d667ea..0ea8da8 100644
--- a/elasticjob-lite/elasticjob-lite-lifecycle/pom.xml
+++ b/elasticjob-lite/elasticjob-lite-lifecycle/pom.xml
@@ -32,21 +32,6 @@
<artifactId>elasticjob-lite-core</artifactId>
<version>${project.parent.version}</version>
</dependency>
- <dependency>
- <groupId>org.apache.shardingsphere.elasticjob</groupId>
- <artifactId>elasticjob-simple-executor</artifactId>
- <version>${project.parent.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.shardingsphere.elasticjob</groupId>
- <artifactId>elasticjob-dataflow-executor</artifactId>
- <version>${project.parent.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.shardingsphere.elasticjob</groupId>
- <artifactId>elasticjob-script-executor</artifactId>
- <version>${project.parent.version}</version>
- </dependency>
<dependency>
<groupId>org.projectlombok</groupId>
diff --git a/elasticjob-lite/elasticjob-lite-spring/pom.xml b/elasticjob-lite/elasticjob-lite-spring/pom.xml
index 270a2fb..77a65de 100644
--- a/elasticjob-lite/elasticjob-lite-spring/pom.xml
+++ b/elasticjob-lite/elasticjob-lite-spring/pom.xml
@@ -31,21 +31,6 @@
<artifactId>elasticjob-lite-core</artifactId>
<version>${project.parent.version}</version>
</dependency>
- <dependency>
- <groupId>org.apache.shardingsphere.elasticjob</groupId>
- <artifactId>elasticjob-simple-executor</artifactId>
- <version>${project.parent.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.shardingsphere.elasticjob</groupId>
- <artifactId>elasticjob-dataflow-executor</artifactId>
- <version>${project.parent.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.shardingsphere.elasticjob</groupId>
- <artifactId>elasticjob-script-executor</artifactId>
- <version>${project.parent.version}</version>
- </dependency>
<dependency>
<groupId>org.springframework</groupId>
diff --git a/examples/elasticjob-example-jobs/pom.xml b/examples/elasticjob-example-jobs/pom.xml
index 0ab70e1..781d4b2 100644
--- a/examples/elasticjob-example-jobs/pom.xml
+++ b/examples/elasticjob-example-jobs/pom.xml
@@ -31,20 +31,6 @@
<groupId>org.apache.shardingsphere.elasticjob</groupId>
<artifactId>elasticjob-lite-core</artifactId>
</dependency>
- <dependency>
- <groupId>org.apache.shardingsphere.elasticjob</groupId>
- <artifactId>elasticjob-simple-executor</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.shardingsphere.elasticjob</groupId>
- <artifactId>elasticjob-dataflow-executor</artifactId>
- <version>${project.parent.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.shardingsphere.elasticjob</groupId>
- <artifactId>elasticjob-script-executor</artifactId>
- <version>${project.parent.version}</version>
- </dependency>
<dependency>
<groupId>org.springframework</groupId>