You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2020/07/06 04:20:55 UTC

[shardingsphere-elasticjob-lite] branch master updated: Add job properties (#960)

This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere-elasticjob-lite.git


The following commit(s) were added to refs/heads/master by this push:
     new 042e8e6  Add job properties (#960)
042e8e6 is described below

commit 042e8e6ab27dab4a0454fe1c9c9b480f5231fe74
Author: Liang Zhang <te...@163.com>
AuthorDate: Mon Jul 6 12:20:49 2020 +0800

    Add job properties (#960)
    
    * Add DataflowJobProperties
    
    * Add ScriptJobProperties
---
 .../dataflow/executor/DataflowJobExecutor.java     |  9 ++++----
 .../lite/dataflow/props/DataflowJobProperties.java | 24 ++++++++--------------
 .../lite/script/executor/ScriptJobExecutor.java    |  7 +++----
 .../lite/script/props/ScriptJobProperties.java     | 24 ++++++++--------------
 .../settings/JobConfigurationAPIImplTest.java      | 10 ++++-----
 .../dataflow/DataflowJobBeanDefinitionParser.java  |  4 ++--
 .../script/ScriptJobBeanDefinitionParser.java      |  4 ++--
 .../elasticjob/lite/example/JavaMain.java          |  8 ++++----
 .../lite/example/config/DataflowJobConfig.java     |  6 +++---
 9 files changed, 39 insertions(+), 57 deletions(-)

diff --git a/elastic-job-lite-executor/elastic-job-lite-dataflow-executor/src/main/java/org/apache/shardingsphere/elasticjob/lite/dataflow/executor/DataflowJobExecutor.java b/elastic-job-lite-executor/elastic-job-lite-dataflow-executor/src/main/java/org/apache/shardingsphere/elasticjob/lite/dataflow/executor/DataflowJobExecutor.java
index 2c41692..ae197c2 100644
--- a/elastic-job-lite-executor/elastic-job-lite-dataflow-executor/src/main/java/org/apache/shardingsphere/elasticjob/lite/dataflow/executor/DataflowJobExecutor.java
+++ b/elastic-job-lite-executor/elastic-job-lite-dataflow-executor/src/main/java/org/apache/shardingsphere/elasticjob/lite/dataflow/executor/DataflowJobExecutor.java
@@ -17,9 +17,10 @@
 
 package org.apache.shardingsphere.elasticjob.lite.dataflow.executor;
 
-import org.apache.shardingsphere.elasticjob.lite.api.job.ShardingContext;
 import org.apache.shardingsphere.elasticjob.lite.api.job.JobConfiguration;
+import org.apache.shardingsphere.elasticjob.lite.api.job.ShardingContext;
 import org.apache.shardingsphere.elasticjob.lite.dataflow.job.DataflowJob;
+import org.apache.shardingsphere.elasticjob.lite.dataflow.props.DataflowJobProperties;
 import org.apache.shardingsphere.elasticjob.lite.executor.JobFacade;
 import org.apache.shardingsphere.elasticjob.lite.executor.item.impl.ClassedJobItemExecutor;
 
@@ -30,11 +31,9 @@ import java.util.List;
  */
 public final class DataflowJobExecutor implements ClassedJobItemExecutor<DataflowJob> {
     
-    public static final String STREAM_PROCESS_KEY = "streaming.process";
-    
     @Override
     public void process(final DataflowJob elasticJob, final JobConfiguration jobConfig, final JobFacade jobFacade, final ShardingContext shardingContext) {
-        if (Boolean.parseBoolean(jobConfig.getProps().getOrDefault(STREAM_PROCESS_KEY, false).toString())) {
+        if (Boolean.parseBoolean(jobConfig.getProps().getOrDefault(DataflowJobProperties.STREAM_PROCESS_KEY, false).toString())) {
             streamingExecute(elasticJob, jobConfig, jobFacade, shardingContext);
         } else {
             oneOffExecute(elasticJob, shardingContext);
@@ -53,7 +52,7 @@ public final class DataflowJobExecutor implements ClassedJobItemExecutor<Dataflo
     }
     
     private boolean isEligibleForJobRunning(final JobConfiguration jobConfig, final JobFacade jobFacade) {
-        return !jobFacade.isNeedSharding() && Boolean.parseBoolean(jobConfig.getProps().getOrDefault(DataflowJobExecutor.STREAM_PROCESS_KEY, false).toString());
+        return !jobFacade.isNeedSharding() && Boolean.parseBoolean(jobConfig.getProps().getOrDefault(DataflowJobProperties.STREAM_PROCESS_KEY, false).toString());
     }
     
     private void oneOffExecute(final DataflowJob elasticJob, final ShardingContext shardingContext) {
diff --git a/elastic-job-lite-spring/src/main/java/org/apache/shardingsphere/elasticjob/lite/spring/job/parser/script/ScriptJobBeanDefinitionParser.java b/elastic-job-lite-executor/elastic-job-lite-dataflow-executor/src/main/java/org/apache/shardingsphere/elasticjob/lite/dataflow/props/DataflowJobProperties.java
similarity index 52%
copy from elastic-job-lite-spring/src/main/java/org/apache/shardingsphere/elasticjob/lite/spring/job/parser/script/ScriptJobBeanDefinitionParser.java
copy to elastic-job-lite-executor/elastic-job-lite-dataflow-executor/src/main/java/org/apache/shardingsphere/elasticjob/lite/dataflow/props/DataflowJobProperties.java
index 8ee6af9..3580212 100644
--- a/elastic-job-lite-spring/src/main/java/org/apache/shardingsphere/elasticjob/lite/spring/job/parser/script/ScriptJobBeanDefinitionParser.java
+++ b/elastic-job-lite-executor/elastic-job-lite-dataflow-executor/src/main/java/org/apache/shardingsphere/elasticjob/lite/dataflow/props/DataflowJobProperties.java
@@ -7,7 +7,7 @@
  * the License.  You may obtain a copy of the License at
  *
  *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ *  
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -15,23 +15,15 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.elasticjob.lite.spring.job.parser.script;
-
-import org.apache.shardingsphere.elasticjob.lite.script.executor.ScriptJobExecutor;
-import org.apache.shardingsphere.elasticjob.lite.spring.job.parser.common.AbstractJobBeanDefinitionParser;
-import org.w3c.dom.Element;
-
-import java.util.Properties;
+package org.apache.shardingsphere.elasticjob.lite.dataflow.props;
 
 /**
- * Script job bean definition parser.
+ * Dataflow job properties.
  */
-public final class ScriptJobBeanDefinitionParser extends AbstractJobBeanDefinitionParser {
+public final class DataflowJobProperties {
     
-    @Override
-    protected Properties getProps(final Element element) {
-        Properties result = new Properties();
-        result.setProperty(ScriptJobExecutor.SCRIPT_KEY, element.getAttribute(ScriptJobBeanDefinitionParserTag.SCRIPT_COMMAND_LINE_ATTRIBUTE));
-        return result;
-    }
+    /**
+     * Whether use stream mode to process dataflow job.
+     */
+    public static final String STREAM_PROCESS_KEY = "streaming.process";
 }
diff --git a/elastic-job-lite-executor/elastic-job-lite-script-executor/src/main/java/org/apache/shardingsphere/elasticjob/lite/script/executor/ScriptJobExecutor.java b/elastic-job-lite-executor/elastic-job-lite-script-executor/src/main/java/org/apache/shardingsphere/elasticjob/lite/script/executor/ScriptJobExecutor.java
index cebf56b..b59ef65 100644
--- a/elastic-job-lite-executor/elastic-job-lite-script-executor/src/main/java/org/apache/shardingsphere/elasticjob/lite/script/executor/ScriptJobExecutor.java
+++ b/elastic-job-lite-executor/elastic-job-lite-script-executor/src/main/java/org/apache/shardingsphere/elasticjob/lite/script/executor/ScriptJobExecutor.java
@@ -21,12 +21,13 @@ import com.google.common.base.Strings;
 import org.apache.commons.exec.CommandLine;
 import org.apache.commons.exec.DefaultExecutor;
 import org.apache.shardingsphere.elasticjob.lite.api.job.ElasticJob;
-import org.apache.shardingsphere.elasticjob.lite.api.job.ShardingContext;
 import org.apache.shardingsphere.elasticjob.lite.api.job.JobConfiguration;
+import org.apache.shardingsphere.elasticjob.lite.api.job.ShardingContext;
 import org.apache.shardingsphere.elasticjob.lite.exception.JobConfigurationException;
 import org.apache.shardingsphere.elasticjob.lite.exception.JobSystemException;
 import org.apache.shardingsphere.elasticjob.lite.executor.JobFacade;
 import org.apache.shardingsphere.elasticjob.lite.executor.item.impl.TypedJobItemExecutor;
+import org.apache.shardingsphere.elasticjob.lite.script.props.ScriptJobProperties;
 import org.apache.shardingsphere.elasticjob.lite.util.json.GsonFactory;
 
 import java.io.IOException;
@@ -37,8 +38,6 @@ import java.util.Properties;
  */
 public final class ScriptJobExecutor implements TypedJobItemExecutor {
     
-    public static final String SCRIPT_KEY = "script.command.line";
-    
     @Override
     public void process(final ElasticJob elasticJob, final JobConfiguration jobConfig, final JobFacade jobFacade, final ShardingContext shardingContext) {
         CommandLine commandLine = CommandLine.parse(getScriptCommandLine(jobConfig.getProps()));
@@ -51,7 +50,7 @@ public final class ScriptJobExecutor implements TypedJobItemExecutor {
     }
     
     private String getScriptCommandLine(final Properties props) {
-        String result = props.getProperty(SCRIPT_KEY);
+        String result = props.getProperty(ScriptJobProperties.SCRIPT_KEY);
         if (Strings.isNullOrEmpty(result)) {
             throw new JobConfigurationException("Cannot find script command line, job is not executed.");
         }
diff --git a/elastic-job-lite-spring/src/main/java/org/apache/shardingsphere/elasticjob/lite/spring/job/parser/script/ScriptJobBeanDefinitionParser.java b/elastic-job-lite-executor/elastic-job-lite-script-executor/src/main/java/org/apache/shardingsphere/elasticjob/lite/script/props/ScriptJobProperties.java
similarity index 52%
copy from elastic-job-lite-spring/src/main/java/org/apache/shardingsphere/elasticjob/lite/spring/job/parser/script/ScriptJobBeanDefinitionParser.java
copy to elastic-job-lite-executor/elastic-job-lite-script-executor/src/main/java/org/apache/shardingsphere/elasticjob/lite/script/props/ScriptJobProperties.java
index 8ee6af9..c6eff3a 100644
--- a/elastic-job-lite-spring/src/main/java/org/apache/shardingsphere/elasticjob/lite/spring/job/parser/script/ScriptJobBeanDefinitionParser.java
+++ b/elastic-job-lite-executor/elastic-job-lite-script-executor/src/main/java/org/apache/shardingsphere/elasticjob/lite/script/props/ScriptJobProperties.java
@@ -7,7 +7,7 @@
  * the License.  You may obtain a copy of the License at
  *
  *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ *  
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -15,23 +15,15 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.elasticjob.lite.spring.job.parser.script;
-
-import org.apache.shardingsphere.elasticjob.lite.script.executor.ScriptJobExecutor;
-import org.apache.shardingsphere.elasticjob.lite.spring.job.parser.common.AbstractJobBeanDefinitionParser;
-import org.w3c.dom.Element;
-
-import java.util.Properties;
+package org.apache.shardingsphere.elasticjob.lite.script.props;
 
 /**
- * Script job bean definition parser.
+ * Script job properties.
  */
-public final class ScriptJobBeanDefinitionParser extends AbstractJobBeanDefinitionParser {
+public final class ScriptJobProperties {
     
-    @Override
-    protected Properties getProps(final Element element) {
-        Properties result = new Properties();
-        result.setProperty(ScriptJobExecutor.SCRIPT_KEY, element.getAttribute(ScriptJobBeanDefinitionParserTag.SCRIPT_COMMAND_LINE_ATTRIBUTE));
-        return result;
-    }
+    /**
+     * Script command line to be executed.
+     */
+    public static final String SCRIPT_KEY = "script.command.line";
 }
diff --git a/elastic-job-lite-lifecycle/src/test/java/org/apache/shardingsphere/elasticjob/lite/lifecycle/internal/settings/JobConfigurationAPIImplTest.java b/elastic-job-lite-lifecycle/src/test/java/org/apache/shardingsphere/elasticjob/lite/lifecycle/internal/settings/JobConfigurationAPIImplTest.java
index 4f30d5d..f777dd0 100644
--- a/elastic-job-lite-lifecycle/src/test/java/org/apache/shardingsphere/elasticjob/lite/lifecycle/internal/settings/JobConfigurationAPIImplTest.java
+++ b/elastic-job-lite-lifecycle/src/test/java/org/apache/shardingsphere/elasticjob/lite/lifecycle/internal/settings/JobConfigurationAPIImplTest.java
@@ -17,12 +17,12 @@
 
 package org.apache.shardingsphere.elasticjob.lite.lifecycle.internal.settings;
 
-import org.apache.shardingsphere.elasticjob.lite.dataflow.executor.DataflowJobExecutor;
-import org.apache.shardingsphere.elasticjob.lite.script.executor.ScriptJobExecutor;
+import org.apache.shardingsphere.elasticjob.lite.dataflow.props.DataflowJobProperties;
 import org.apache.shardingsphere.elasticjob.lite.internal.config.yaml.YamlJobConfiguration;
 import org.apache.shardingsphere.elasticjob.lite.lifecycle.api.JobConfigurationAPI;
 import org.apache.shardingsphere.elasticjob.lite.lifecycle.fixture.LifecycleYamlConstants;
 import org.apache.shardingsphere.elasticjob.lite.reg.base.CoordinatorRegistryCenter;
+import org.apache.shardingsphere.elasticjob.lite.script.props.ScriptJobProperties;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -55,7 +55,7 @@ public final class JobConfigurationAPIImplTest {
         when(regCenter.get("/test_job/config")).thenReturn(LifecycleYamlConstants.getDataflowJobYaml());
         YamlJobConfiguration actual = jobConfigAPI.getJobConfiguration("test_job");
         assertJobConfig(actual);
-        assertThat(actual.getProps().getProperty(DataflowJobExecutor.STREAM_PROCESS_KEY), is("true"));
+        assertThat(actual.getProps().getProperty(DataflowJobProperties.STREAM_PROCESS_KEY), is("true"));
         verify(regCenter).get("/test_job/config");
     }
     
@@ -64,7 +64,7 @@ public final class JobConfigurationAPIImplTest {
         when(regCenter.get("/test_job/config")).thenReturn(LifecycleYamlConstants.getScriptJobYaml());
         YamlJobConfiguration actual = jobConfigAPI.getJobConfiguration("test_job");
         assertJobConfig(actual);
-        assertThat(actual.getProps().getProperty(ScriptJobExecutor.SCRIPT_KEY), is("echo"));
+        assertThat(actual.getProps().getProperty(ScriptJobProperties.SCRIPT_KEY), is("echo"));
         verify(regCenter).get("/test_job/config");
     }
     
@@ -96,7 +96,7 @@ public final class JobConfigurationAPIImplTest {
         jobConfiguration.setMaxTimeDiffSeconds(-1);
         jobConfiguration.setReconcileIntervalMinutes(10);
         jobConfiguration.setDescription("");
-        jobConfiguration.getProps().setProperty(DataflowJobExecutor.STREAM_PROCESS_KEY, "true");
+        jobConfiguration.getProps().setProperty(DataflowJobProperties.STREAM_PROCESS_KEY, "true");
         jobConfigAPI.updateJobConfiguration(jobConfiguration);
         verify(regCenter).update("/test_job/config", LifecycleYamlConstants.getDataflowJobYaml());
     }
diff --git a/elastic-job-lite-spring/src/main/java/org/apache/shardingsphere/elasticjob/lite/spring/job/parser/dataflow/DataflowJobBeanDefinitionParser.java b/elastic-job-lite-spring/src/main/java/org/apache/shardingsphere/elasticjob/lite/spring/job/parser/dataflow/DataflowJobBeanDefinitionParser.java
index 7bc5f0a..af55d1d 100644
--- a/elastic-job-lite-spring/src/main/java/org/apache/shardingsphere/elasticjob/lite/spring/job/parser/dataflow/DataflowJobBeanDefinitionParser.java
+++ b/elastic-job-lite-spring/src/main/java/org/apache/shardingsphere/elasticjob/lite/spring/job/parser/dataflow/DataflowJobBeanDefinitionParser.java
@@ -17,7 +17,7 @@
 
 package org.apache.shardingsphere.elasticjob.lite.spring.job.parser.dataflow;
 
-import org.apache.shardingsphere.elasticjob.lite.dataflow.executor.DataflowJobExecutor;
+import org.apache.shardingsphere.elasticjob.lite.dataflow.props.DataflowJobProperties;
 import org.apache.shardingsphere.elasticjob.lite.spring.job.parser.common.AbstractJobBeanDefinitionParser;
 import org.w3c.dom.Element;
 
@@ -31,7 +31,7 @@ public final class DataflowJobBeanDefinitionParser extends AbstractJobBeanDefini
     @Override
     protected Properties getProps(final Element element) {
         Properties result = new Properties();
-        result.setProperty(DataflowJobExecutor.STREAM_PROCESS_KEY, element.getAttribute(DataflowJobBeanDefinitionParserTag.STREAMING_PROCESS_ATTRIBUTE));
+        result.setProperty(DataflowJobProperties.STREAM_PROCESS_KEY, element.getAttribute(DataflowJobBeanDefinitionParserTag.STREAMING_PROCESS_ATTRIBUTE));
         return result;
     }
 }
diff --git a/elastic-job-lite-spring/src/main/java/org/apache/shardingsphere/elasticjob/lite/spring/job/parser/script/ScriptJobBeanDefinitionParser.java b/elastic-job-lite-spring/src/main/java/org/apache/shardingsphere/elasticjob/lite/spring/job/parser/script/ScriptJobBeanDefinitionParser.java
index 8ee6af9..4dbfbc5 100644
--- a/elastic-job-lite-spring/src/main/java/org/apache/shardingsphere/elasticjob/lite/spring/job/parser/script/ScriptJobBeanDefinitionParser.java
+++ b/elastic-job-lite-spring/src/main/java/org/apache/shardingsphere/elasticjob/lite/spring/job/parser/script/ScriptJobBeanDefinitionParser.java
@@ -17,7 +17,7 @@
 
 package org.apache.shardingsphere.elasticjob.lite.spring.job.parser.script;
 
-import org.apache.shardingsphere.elasticjob.lite.script.executor.ScriptJobExecutor;
+import org.apache.shardingsphere.elasticjob.lite.script.props.ScriptJobProperties;
 import org.apache.shardingsphere.elasticjob.lite.spring.job.parser.common.AbstractJobBeanDefinitionParser;
 import org.w3c.dom.Element;
 
@@ -31,7 +31,7 @@ public final class ScriptJobBeanDefinitionParser extends AbstractJobBeanDefiniti
     @Override
     protected Properties getProps(final Element element) {
         Properties result = new Properties();
-        result.setProperty(ScriptJobExecutor.SCRIPT_KEY, element.getAttribute(ScriptJobBeanDefinitionParserTag.SCRIPT_COMMAND_LINE_ATTRIBUTE));
+        result.setProperty(ScriptJobProperties.SCRIPT_KEY, element.getAttribute(ScriptJobBeanDefinitionParserTag.SCRIPT_COMMAND_LINE_ATTRIBUTE));
         return result;
     }
 }
diff --git a/examples/elastic-job-example-lite-java/src/main/java/org/apache/shardingsphere/elasticjob/lite/example/JavaMain.java b/examples/elastic-job-example-lite-java/src/main/java/org/apache/shardingsphere/elasticjob/lite/example/JavaMain.java
index 069c89c..1f4339e 100644
--- a/examples/elastic-job-example-lite-java/src/main/java/org/apache/shardingsphere/elasticjob/lite/example/JavaMain.java
+++ b/examples/elastic-job-example-lite-java/src/main/java/org/apache/shardingsphere/elasticjob/lite/example/JavaMain.java
@@ -20,13 +20,13 @@ package org.apache.shardingsphere.elasticjob.lite.example;
 import org.apache.commons.dbcp.BasicDataSource;
 import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.ScheduleJobBootstrap;
 import org.apache.shardingsphere.elasticjob.lite.api.job.JobConfiguration;
+import org.apache.shardingsphere.elasticjob.lite.dataflow.props.DataflowJobProperties;
 import org.apache.shardingsphere.elasticjob.lite.example.job.dataflow.JavaDataflowJob;
 import org.apache.shardingsphere.elasticjob.lite.example.job.simple.JavaSimpleJob;
-import org.apache.shardingsphere.elasticjob.lite.dataflow.executor.DataflowJobExecutor;
-import org.apache.shardingsphere.elasticjob.lite.script.executor.ScriptJobExecutor;
 import org.apache.shardingsphere.elasticjob.lite.reg.base.CoordinatorRegistryCenter;
 import org.apache.shardingsphere.elasticjob.lite.reg.zookeeper.ZookeeperConfiguration;
 import org.apache.shardingsphere.elasticjob.lite.reg.zookeeper.ZookeeperRegistryCenter;
+import org.apache.shardingsphere.elasticjob.lite.script.props.ScriptJobProperties;
 import org.apache.shardingsphere.elasticjob.lite.tracing.api.TracingConfiguration;
 
 import javax.sql.DataSource;
@@ -91,12 +91,12 @@ public final class JavaMain {
     private static void setUpDataflowJob(final CoordinatorRegistryCenter regCenter, final TracingConfiguration tracingConfig) {
         new ScheduleJobBootstrap(regCenter, new JavaDataflowJob(), JobConfiguration.newBuilder("javaDataflowElasticJob", 3)
                 .cron("0/5 * * * * ?").shardingItemParameters("0=Beijing,1=Shanghai,2=Guangzhou")
-                .setProperty(DataflowJobExecutor.STREAM_PROCESS_KEY, Boolean.TRUE.toString()).build(), tracingConfig).schedule();
+                .setProperty(DataflowJobProperties.STREAM_PROCESS_KEY, Boolean.TRUE.toString()).build(), tracingConfig).schedule();
     }
     
     private static void setUpScriptJob(final CoordinatorRegistryCenter regCenter, final TracingConfiguration tracingConfig) throws IOException {
         new ScheduleJobBootstrap(regCenter, "SCRIPT", JobConfiguration.newBuilder("scriptElasticJob", 3)
-                .cron("0/5 * * * * ?").setProperty(ScriptJobExecutor.SCRIPT_KEY, buildScriptCommandLine()).build(), tracingConfig).schedule();
+                .cron("0/5 * * * * ?").setProperty(ScriptJobProperties.SCRIPT_KEY, buildScriptCommandLine()).build(), tracingConfig).schedule();
     }
     
     private static String buildScriptCommandLine() throws IOException {
diff --git a/examples/elastic-job-example-lite-springboot/src/main/java/org/apache/shardingsphere/elasticjob/lite/example/config/DataflowJobConfig.java b/examples/elastic-job-example-lite-springboot/src/main/java/org/apache/shardingsphere/elasticjob/lite/example/config/DataflowJobConfig.java
index 4e9395b..791cc3c 100644
--- a/examples/elastic-job-example-lite-springboot/src/main/java/org/apache/shardingsphere/elasticjob/lite/example/config/DataflowJobConfig.java
+++ b/examples/elastic-job-example-lite-springboot/src/main/java/org/apache/shardingsphere/elasticjob/lite/example/config/DataflowJobConfig.java
@@ -18,10 +18,10 @@
 package org.apache.shardingsphere.elasticjob.lite.example.config;
 
 import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.ScheduleJobBootstrap;
-import org.apache.shardingsphere.elasticjob.lite.dataflow.job.DataflowJob;
 import org.apache.shardingsphere.elasticjob.lite.api.job.JobConfiguration;
+import org.apache.shardingsphere.elasticjob.lite.dataflow.job.DataflowJob;
+import org.apache.shardingsphere.elasticjob.lite.dataflow.props.DataflowJobProperties;
 import org.apache.shardingsphere.elasticjob.lite.example.job.dataflow.SpringDataflowJob;
-import org.apache.shardingsphere.elasticjob.lite.dataflow.executor.DataflowJobExecutor;
 import org.apache.shardingsphere.elasticjob.lite.reg.zookeeper.ZookeeperRegistryCenter;
 import org.apache.shardingsphere.elasticjob.lite.tracing.api.TracingConfiguration;
 import org.springframework.beans.factory.annotation.Value;
@@ -53,6 +53,6 @@ public class DataflowJobConfig {
     
     private JobConfiguration getJobConfiguration(final Class<? extends DataflowJob> jobClass, final String cron, final int shardingTotalCount, final String shardingItemParameters) {
         return JobConfiguration.newBuilder(jobClass.getName(), shardingTotalCount)
-                .cron(cron).shardingItemParameters(shardingItemParameters).setProperty(DataflowJobExecutor.STREAM_PROCESS_KEY, Boolean.TRUE.toString()).overwrite(true).build();
+                .cron(cron).shardingItemParameters(shardingItemParameters).setProperty(DataflowJobProperties.STREAM_PROCESS_KEY, Boolean.TRUE.toString()).overwrite(true).build();
     }
 }