You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2020/11/28 04:24:22 UTC

[shardingsphere-elasticjob] branch master updated: Extract all reloadable objects from ElasticJobExecutor (#1735)

This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere-elasticjob.git


The following commit(s) were added to refs/heads/master by this push:
     new edaef2d  Extract all reloadable objects from ElasticJobExecutor (#1735)
edaef2d is described below

commit edaef2d69b50bc339e9dfea17a69616b47f7ebb2
Author: 吴伟杰 <ro...@me.com>
AuthorDate: Sat Nov 28 12:24:10 2020 +0800

    Extract all reloadable objects from ElasticJobExecutor (#1735)
    
    * Extract all reloadable objects from ElasticJobExecutor
    
    * Add debug log in implementations of Reloadable
    
    * Add testcases for implementations of Reloadable
    
    * Fix Checkstyle
    
    * Extract init method of Reloadable into ReloadablePostProcessor
---
 .../elasticjob/error/handler/JobErrorHandler.java  |   8 +-
 .../error/handler/JobErrorHandlerFactory.java      |   2 +-
 .../error/handler/JobErrorHandlerReloadable.java   |  82 ++++++++++++++++
 ...rdingsphere.elasticjob.infra.context.Reloadable |  18 ++++
 .../handler/JobErrorHandlerReloadableTest.java     | 104 +++++++++++++++++++++
 .../elasticjob/executor/ElasticJobExecutor.java    |  38 ++++----
 .../executor/context/ExecutorContext.java          |  89 ++++++++++++++++++
 .../executor/ElasticJobExecutorTest.java           |   2 +
 .../concurrent/ExecutorServiceReloadable.java      |  78 ++++++++++++++++
 .../elasticjob/infra/context/Reloadable.java       |  30 ++++--
 .../infra/context/ReloadablePostProcessor.java     |  20 ++--
 .../JobExecutorServiceHandlerFactory.java          |   2 +-
 ...rdingsphere.elasticjob.infra.context.Reloadable |  18 ++++
 .../concurrent/ExecutorServiceReloadableTest.java  | 101 ++++++++++++++++++++
 14 files changed, 547 insertions(+), 45 deletions(-)

diff --git a/elasticjob-ecosystem/elasticjob-error-handler/elasticjob-error-handler-spi/src/main/java/org/apache/shardingsphere/elasticjob/error/handler/JobErrorHandler.java b/elasticjob-ecosystem/elasticjob-error-handler/elasticjob-error-handler-spi/src/main/java/org/apache/shardingsphere/elasticjob/error/handler/JobErrorHandler.java
index fe87a19..95f85d9 100644
--- a/elasticjob-ecosystem/elasticjob-error-handler/elasticjob-error-handler-spi/src/main/java/org/apache/shardingsphere/elasticjob/error/handler/JobErrorHandler.java
+++ b/elasticjob-ecosystem/elasticjob-error-handler/elasticjob-error-handler-spi/src/main/java/org/apache/shardingsphere/elasticjob/error/handler/JobErrorHandler.java
@@ -20,10 +20,12 @@ package org.apache.shardingsphere.elasticjob.error.handler;
 import org.apache.shardingsphere.elasticjob.infra.spi.SPIPostProcessor;
 import org.apache.shardingsphere.elasticjob.infra.spi.TypedSPI;
 
+import java.io.Closeable;
+
 /**
  * Job error handler.
  */
-public interface JobErrorHandler extends TypedSPI, SPIPostProcessor {
+public interface JobErrorHandler extends TypedSPI, SPIPostProcessor, Closeable {
     
     /**
      * Handle exception.
@@ -32,4 +34,8 @@ public interface JobErrorHandler extends TypedSPI, SPIPostProcessor {
      * @param cause failure cause
      */
     void handleException(String jobName, Throwable cause);
+    
+    @Override
+    default void close() {
+    }
 }
diff --git a/elasticjob-ecosystem/elasticjob-error-handler/elasticjob-error-handler-type/elasticjob-error-handler-general/src/main/java/org/apache/shardingsphere/elasticjob/error/handler/JobErrorHandlerFactory.java b/elasticjob-ecosystem/elasticjob-error-handler/elasticjob-error-handler-type/elasticjob-error-handler-general/src/main/java/org/apache/shardingsphere/elasticjob/error/handler/JobErrorHandlerFactory.java
index 7b8951f..363573c 100644
--- a/elasticjob-ecosystem/elasticjob-error-handler/elasticjob-error-handler-type/elasticjob-error-handler-general/src/main/java/org/apache/shardingsphere/elasticjob/error/handler/JobErrorHandlerFactory.java
+++ b/elasticjob-ecosystem/elasticjob-error-handler/elasticjob-error-handler-type/elasticjob-error-handler-general/src/main/java/org/apache/shardingsphere/elasticjob/error/handler/JobErrorHandlerFactory.java
@@ -31,7 +31,7 @@ import java.util.Properties;
 @NoArgsConstructor(access = AccessLevel.PRIVATE)
 public final class JobErrorHandlerFactory {
     
-    private static final String DEFAULT_HANDLER = "LOG";
+    public static final String DEFAULT_HANDLER = "LOG";
     
     static {
         ElasticJobServiceLoader.registerTypedService(JobErrorHandler.class);
diff --git a/elasticjob-ecosystem/elasticjob-error-handler/elasticjob-error-handler-type/elasticjob-error-handler-general/src/main/java/org/apache/shardingsphere/elasticjob/error/handler/JobErrorHandlerReloadable.java b/elasticjob-ecosystem/elasticjob-error-handler/elasticjob-error-handler-type/elasticjob-error-handler-general/src/main/java/org/apache/shardingsphere/elasticjob/error/handler/JobErrorHandlerReloadable.java
new file mode 100644
index 0000000..1256561
--- /dev/null
+++ b/elasticjob-ecosystem/elasticjob-error-handler/elasticjob-error-handler-type/elasticjob-error-handler-general/src/main/java/org/apache/shardingsphere/elasticjob/error/handler/JobErrorHandlerReloadable.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.elasticjob.error.handler;
+
+import com.google.common.base.Strings;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
+import org.apache.shardingsphere.elasticjob.infra.context.Reloadable;
+import org.apache.shardingsphere.elasticjob.infra.context.ReloadablePostProcessor;
+import org.apache.shardingsphere.elasticjob.infra.exception.JobConfigurationException;
+
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * JobErrorHandler reloadable.
+ *
+ * <p>Holds the job error handler currently in use and replaces it when the configured handler type or properties change.</p>
+ */
+@Slf4j
+public final class JobErrorHandlerReloadable implements Reloadable<JobErrorHandler>, ReloadablePostProcessor {
+    
+    private String jobErrorHandlerType;
+    
+    private Properties props;
+    
+    private JobErrorHandler jobErrorHandler;
+    
+    /**
+     * Create the initial job error handler from the job configuration.
+     *
+     * @param jobConfig job configuration
+     */
+    @Override
+    public void init(final JobConfiguration jobConfig) {
+        String type = resolveType(jobConfig);
+        Properties copiedProps = (Properties) jobConfig.getProps().clone();
+        // Assign the fields only after the handler has been created, so a failed init leaves no half-initialized state.
+        jobErrorHandler = createJobErrorHandler(type, copiedProps);
+        jobErrorHandlerType = type;
+        props = copiedProps;
+    }
+    
+    /**
+     * Replace the job error handler when the configured handler type or properties differ from the ones in use.
+     *
+     * @param jobConfig latest job configuration
+     */
+    @Override
+    public synchronized void reloadIfNecessary(final JobConfiguration jobConfig) {
+        String newJobErrorHandlerType = resolveType(jobConfig);
+        if (newJobErrorHandlerType.equals(jobErrorHandlerType) && jobConfig.getProps().equals(props)) {
+            return;
+        }
+        log.debug("JobErrorHandler reload occurred in the job '{}'. Change from '{}' to '{}'.", jobConfig.getJobName(), jobErrorHandlerType, newJobErrorHandlerType);
+        reload(newJobErrorHandlerType, jobConfig.getProps());
+    }
+    
+    private void reload(final String jobErrorHandlerType, final Properties props) {
+        Properties copiedProps = (Properties) props.clone();
+        // Create the replacement before touching the current state: if the new type cannot be found, the exception
+        // propagates while the previous handler stays open and the cached type/props still differ, so the next reload is not skipped.
+        JobErrorHandler newJobErrorHandler = createJobErrorHandler(jobErrorHandlerType, copiedProps);
+        JobErrorHandler previousJobErrorHandler = jobErrorHandler;
+        this.jobErrorHandlerType = jobErrorHandlerType;
+        this.props = copiedProps;
+        jobErrorHandler = newJobErrorHandler;
+        Optional.ofNullable(previousJobErrorHandler).ifPresent(JobErrorHandler::close);
+    }
+    
+    private String resolveType(final JobConfiguration jobConfig) {
+        return Strings.isNullOrEmpty(jobConfig.getJobErrorHandlerType()) ? JobErrorHandlerFactory.DEFAULT_HANDLER : jobConfig.getJobErrorHandlerType();
+    }
+    
+    private JobErrorHandler createJobErrorHandler(final String type, final Properties handlerProps) {
+        return JobErrorHandlerFactory.createHandler(type, handlerProps)
+                .orElseThrow(() -> new JobConfigurationException("Cannot find job error handler type '%s'.", type));
+    }
+    
+    /**
+     * Get the job error handler currently in use.
+     *
+     * @return current job error handler, or null if not initialized yet
+     */
+    @Override
+    public JobErrorHandler getInstance() {
+        return jobErrorHandler;
+    }
+    
+    /**
+     * Get the type key of this reloadable.
+     *
+     * @return fully-qualified class name of JobErrorHandler
+     */
+    @Override
+    public String getType() {
+        return JobErrorHandler.class.getName();
+    }
+    
+    /**
+     * Close the current job error handler if one exists.
+     */
+    @Override
+    public void close() {
+        Optional.ofNullable(jobErrorHandler).ifPresent(JobErrorHandler::close);
+    }
+}
diff --git a/elasticjob-ecosystem/elasticjob-error-handler/elasticjob-error-handler-type/elasticjob-error-handler-general/src/main/resources/META-INF/services/org.apache.shardingsphere.elasticjob.infra.context.Reloadable b/elasticjob-ecosystem/elasticjob-error-handler/elasticjob-error-handler-type/elasticjob-error-handler-general/src/main/resources/META-INF/services/org.apache.shardingsphere.elasticjob.infra.context.Reloadable
new file mode 100644
index 0000000..0a00a1d
--- /dev/null
+++ b/elasticjob-ecosystem/elasticjob-error-handler/elasticjob-error-handler-type/elasticjob-error-handler-general/src/main/resources/META-INF/services/org.apache.shardingsphere.elasticjob.infra.context.Reloadable
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.shardingsphere.elasticjob.error.handler.JobErrorHandlerReloadable
diff --git a/elasticjob-ecosystem/elasticjob-error-handler/elasticjob-error-handler-type/elasticjob-error-handler-general/src/test/java/org/apache/shardingsphere/elasticjob/error/handler/JobErrorHandlerReloadableTest.java b/elasticjob-ecosystem/elasticjob-error-handler/elasticjob-error-handler-type/elasticjob-error-handler-general/src/test/java/org/apache/shardingsphere/elasticjob/error/handler/JobErrorHandlerReloadableTest.java
new file mode 100644
index 0000000..48a7559
--- /dev/null
+++ b/elasticjob-ecosystem/elasticjob-error-handler/elasticjob-error-handler-type/elasticjob-error-handler-general/src/test/java/org/apache/shardingsphere/elasticjob/error/handler/JobErrorHandlerReloadableTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.elasticjob.error.handler;
+
+import lombok.SneakyThrows;
+import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
+import org.apache.shardingsphere.elasticjob.error.handler.general.IgnoreJobErrorHandler;
+import org.apache.shardingsphere.elasticjob.error.handler.general.LogJobErrorHandler;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.lang.reflect.Field;
+import java.util.Properties;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.verify;
+
+@RunWith(MockitoJUnitRunner.class)
+public final class JobErrorHandlerReloadableTest {
+    
+    @Mock
+    private JobErrorHandler mockJobErrorHandler;
+    
+    // A freshly created reloadable holds no handler until init builds the configured one.
+    @Test
+    public void assertInitialize() {
+        JobErrorHandlerReloadable reloadable = new JobErrorHandlerReloadable();
+        assertNull(reloadable.getInstance());
+        reloadable.init(createJobConfiguration("IGNORE"));
+        JobErrorHandler actual = reloadable.getInstance();
+        assertNotNull(actual);
+        assertThat(actual.getType(), equalTo("IGNORE"));
+        assertTrue(actual instanceof IgnoreJobErrorHandler);
+    }
+    
+    // Changing the handler type closes the previous handler and installs one of the new type.
+    @Test
+    public void assertReload() {
+        JobErrorHandlerReloadable reloadable = new JobErrorHandlerReloadable();
+        setField(reloadable, "jobErrorHandler", mockJobErrorHandler);
+        setField(reloadable, "jobErrorHandlerType", "mock");
+        setField(reloadable, "props", new Properties());
+        reloadable.reloadIfNecessary(createJobConfiguration("LOG"));
+        verify(mockJobErrorHandler).close();
+        JobErrorHandler actual = reloadable.getInstance();
+        assertThat(actual.getType(), equalTo("LOG"));
+        assertTrue(actual instanceof LogJobErrorHandler);
+    }
+    
+    // Reloading with an unchanged configuration leaves the handler returned by getInstance unchanged.
+    @Test
+    public void assertUnnecessaryToReload() {
+        JobErrorHandlerReloadable reloadable = new JobErrorHandlerReloadable();
+        JobConfiguration jobConfig = createJobConfiguration("IGNORE");
+        reloadable.init(jobConfig);
+        JobErrorHandler expected = reloadable.getInstance();
+        reloadable.reloadIfNecessary(jobConfig);
+        assertThat(reloadable.getInstance(), is(expected));
+    }
+    
+    // Closing the reloadable closes the handler it holds.
+    @Test
+    public void assertShutdown() {
+        JobErrorHandlerReloadable reloadable = new JobErrorHandlerReloadable();
+        setField(reloadable, "jobErrorHandler", mockJobErrorHandler);
+        reloadable.close();
+        verify(mockJobErrorHandler).close();
+    }
+    
+    private JobConfiguration createJobConfiguration(final String jobErrorHandlerType) {
+        return JobConfiguration.newBuilder("job", 1).jobErrorHandlerType(jobErrorHandlerType).build();
+    }
+    
+    @SneakyThrows
+    private void setField(final Object target, final String fieldName, final Object value) {
+        Field field = target.getClass().getDeclaredField(fieldName);
+        boolean originAccessible = field.isAccessible();
+        field.setAccessible(true);
+        field.set(target, value);
+        field.setAccessible(originAccessible);
+    }
+}
diff --git a/elasticjob-ecosystem/elasticjob-executor/elasticjob-executor-kernel/src/main/java/org/apache/shardingsphere/elasticjob/executor/ElasticJobExecutor.java b/elasticjob-ecosystem/elasticjob-executor/elasticjob-executor-kernel/src/main/java/org/apache/shardingsphere/elasticjob/executor/ElasticJobExecutor.java
index 448213f..1b54eae 100644
--- a/elasticjob-ecosystem/elasticjob-executor/elasticjob-executor-kernel/src/main/java/org/apache/shardingsphere/elasticjob/executor/ElasticJobExecutor.java
+++ b/elasticjob-ecosystem/elasticjob-executor/elasticjob-executor-kernel/src/main/java/org/apache/shardingsphere/elasticjob/executor/ElasticJobExecutor.java
@@ -21,14 +21,12 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.elasticjob.api.ElasticJob;
 import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
 import org.apache.shardingsphere.elasticjob.error.handler.JobErrorHandler;
-import org.apache.shardingsphere.elasticjob.error.handler.JobErrorHandlerFactory;
+import org.apache.shardingsphere.elasticjob.executor.context.ExecutorContext;
 import org.apache.shardingsphere.elasticjob.executor.item.JobItemExecutor;
 import org.apache.shardingsphere.elasticjob.executor.item.JobItemExecutorFactory;
 import org.apache.shardingsphere.elasticjob.infra.env.IpUtils;
 import org.apache.shardingsphere.elasticjob.infra.exception.ExceptionUtils;
-import org.apache.shardingsphere.elasticjob.infra.exception.JobConfigurationException;
 import org.apache.shardingsphere.elasticjob.infra.exception.JobExecutionEnvironmentException;
-import org.apache.shardingsphere.elasticjob.infra.handler.threadpool.JobExecutorServiceHandlerFactory;
 import org.apache.shardingsphere.elasticjob.infra.listener.ShardingContexts;
 import org.apache.shardingsphere.elasticjob.tracing.event.JobExecutionEvent;
 import org.apache.shardingsphere.elasticjob.tracing.event.JobExecutionEvent.ExecutionSource;
@@ -48,15 +46,11 @@ public final class ElasticJobExecutor {
     
     private final ElasticJob elasticJob;
     
-    private final JobConfiguration jobConfig;
-    
     private final JobFacade jobFacade;
     
     private final JobItemExecutor jobItemExecutor;
     
-    private final ExecutorService executorService;
-    
-    private final JobErrorHandler jobErrorHandler;
+    private final ExecutorContext executorContext;
     
     private final Map<Integer, String> itemErrorMessages;
     
@@ -70,12 +64,9 @@ public final class ElasticJobExecutor {
     
     private ElasticJobExecutor(final ElasticJob elasticJob, final JobConfiguration jobConfig, final JobFacade jobFacade, final JobItemExecutor jobItemExecutor) {
         this.elasticJob = elasticJob;
-        this.jobConfig = jobConfig;
         this.jobFacade = jobFacade;
         this.jobItemExecutor = jobItemExecutor;
-        executorService = JobExecutorServiceHandlerFactory.getHandler(jobConfig.getJobExecutorServiceHandlerType()).createExecutorService(jobConfig.getJobName());
-        jobErrorHandler = JobErrorHandlerFactory.createHandler(jobConfig.getJobErrorHandlerType(), jobConfig.getProps())
-                .orElseThrow(() -> new JobConfigurationException("Can not find job error handler type '%s'.", jobConfig.getJobErrorHandlerType()));
+        executorContext = new ExecutorContext(jobFacade.loadJobConfiguration(true));
         itemErrorMessages = new ConcurrentHashMap<>(jobConfig.getShardingTotalCount(), 1);
     }
     
@@ -83,6 +74,9 @@ public final class ElasticJobExecutor {
      * Execute job.
      */
     public void execute() {
+        JobConfiguration jobConfig = jobFacade.loadJobConfiguration(true);
+        executorContext.reloadIfNecessary(jobConfig);
+        JobErrorHandler jobErrorHandler = executorContext.get(JobErrorHandler.class);
         try {
             jobFacade.checkJobExecutionEnvironment();
         } catch (final JobExecutionEnvironmentException cause) {
@@ -103,10 +97,10 @@ public final class ElasticJobExecutor {
             //CHECKSTYLE:ON
             jobErrorHandler.handleException(jobConfig.getJobName(), cause);
         }
-        execute(shardingContexts, ExecutionSource.NORMAL_TRIGGER);
+        execute(jobConfig, shardingContexts, ExecutionSource.NORMAL_TRIGGER);
         while (jobFacade.isExecuteMisfired(shardingContexts.getShardingItemParameters().keySet())) {
             jobFacade.clearMisfire(shardingContexts.getShardingItemParameters().keySet());
-            execute(shardingContexts, ExecutionSource.MISFIRE);
+            execute(jobConfig, shardingContexts, ExecutionSource.MISFIRE);
         }
         jobFacade.failoverIfNecessary();
         try {
@@ -118,7 +112,7 @@ public final class ElasticJobExecutor {
         }
     }
     
-    private void execute(final ShardingContexts shardingContexts, final ExecutionSource executionSource) {
+    private void execute(final JobConfiguration jobConfig, final ShardingContexts shardingContexts, final ExecutionSource executionSource) {
         if (shardingContexts.getShardingItemParameters().isEmpty()) {
             jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, String.format("Sharding item for job '%s' is empty.", jobConfig.getJobName()));
             return;
@@ -127,7 +121,7 @@ public final class ElasticJobExecutor {
         String taskId = shardingContexts.getTaskId();
         jobFacade.postJobStatusTraceEvent(taskId, State.TASK_RUNNING, "");
         try {
-            process(shardingContexts, executionSource);
+            process(jobConfig, shardingContexts, executionSource);
         } finally {
             // TODO Consider increasing the status of job failure, and how to handle the overall loop of job failure
             jobFacade.registerJobCompleted(shardingContexts);
@@ -139,23 +133,24 @@ public final class ElasticJobExecutor {
         }
     }
     
-    private void process(final ShardingContexts shardingContexts, final ExecutionSource executionSource) {
+    private void process(final JobConfiguration jobConfig, final ShardingContexts shardingContexts, final ExecutionSource executionSource) {
         Collection<Integer> items = shardingContexts.getShardingItemParameters().keySet();
         if (1 == items.size()) {
             int item = shardingContexts.getShardingItemParameters().keySet().iterator().next();
             JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(IpUtils.getHostName(), IpUtils.getIp(), shardingContexts.getTaskId(), jobConfig.getJobName(), executionSource, item);
-            process(shardingContexts, item, jobExecutionEvent);
+            process(jobConfig, shardingContexts, item, jobExecutionEvent);
             return;
         }
         CountDownLatch latch = new CountDownLatch(items.size());
         for (int each : items) {
             JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(IpUtils.getHostName(), IpUtils.getIp(), shardingContexts.getTaskId(), jobConfig.getJobName(), executionSource, each);
+            ExecutorService executorService = executorContext.get(ExecutorService.class);
             if (executorService.isShutdown()) {
                 return;
             }
             executorService.submit(() -> {
                 try {
-                    process(shardingContexts, each, jobExecutionEvent);
+                    process(jobConfig, shardingContexts, each, jobExecutionEvent);
                 } finally {
                     latch.countDown();
                 }
@@ -169,7 +164,7 @@ public final class ElasticJobExecutor {
     }
     
     @SuppressWarnings("unchecked")
-    private void process(final ShardingContexts shardingContexts, final int item, final JobExecutionEvent startEvent) {
+    private void process(final JobConfiguration jobConfig, final ShardingContexts shardingContexts, final int item, final JobExecutionEvent startEvent) {
         jobFacade.postJobExecutionEvent(startEvent);
         log.trace("Job '{}' executing, item is: '{}'.", jobConfig.getJobName(), item);
         JobExecutionEvent completeEvent;
@@ -184,6 +179,7 @@ public final class ElasticJobExecutor {
             completeEvent = startEvent.executionFailure(ExceptionUtils.transform(cause));
             jobFacade.postJobExecutionEvent(completeEvent);
             itemErrorMessages.put(item, ExceptionUtils.transform(cause));
+            JobErrorHandler jobErrorHandler = executorContext.get(JobErrorHandler.class);
             jobErrorHandler.handleException(jobConfig.getJobName(), cause);
         }
     }
@@ -192,6 +188,6 @@ public final class ElasticJobExecutor {
      * Shutdown executor.
      */
     public void shutdown() {
-        executorService.shutdown();
+        executorContext.shutdown();
     }
 }
diff --git a/elasticjob-ecosystem/elasticjob-executor/elasticjob-executor-kernel/src/main/java/org/apache/shardingsphere/elasticjob/executor/context/ExecutorContext.java b/elasticjob-ecosystem/elasticjob-executor/elasticjob-executor-kernel/src/main/java/org/apache/shardingsphere/elasticjob/executor/context/ExecutorContext.java
new file mode 100644
index 0000000..38786c6
--- /dev/null
+++ b/elasticjob-ecosystem/elasticjob-executor/elasticjob-executor-kernel/src/main/java/org/apache/shardingsphere/elasticjob/executor/context/ExecutorContext.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.elasticjob.executor.context;
+
+import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
+import org.apache.shardingsphere.elasticjob.infra.context.Reloadable;
+import org.apache.shardingsphere.elasticjob.infra.context.ReloadablePostProcessor;
+import org.apache.shardingsphere.elasticjob.infra.spi.ElasticJobServiceLoader;
+
+import java.io.IOException;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.ServiceLoader;
+
+/**
+ * Executor context.
+ *
+ * @see org.apache.shardingsphere.elasticjob.error.handler.JobErrorHandlerReloadable
+ * @see org.apache.shardingsphere.elasticjob.infra.concurrent.ExecutorServiceReloadable
+ */
+public final class ExecutorContext {
+    
+    static {
+        ElasticJobServiceLoader.registerTypedService(Reloadable.class);
+    }
+    
+    private final Map<String, Reloadable<?>> reloadableItems = new LinkedHashMap<>();
+    
+    /**
+     * Discover reloadable items through the service loader, keep one instance per type and initialize those that are {@link ReloadablePostProcessor}.
+     *
+     * @param jobConfig job configuration used to initialize the reloadable items
+     */
+    public ExecutorContext(final JobConfiguration jobConfig) {
+        ServiceLoader.load(Reloadable.class).forEach(each -> {
+            ElasticJobServiceLoader.newTypedServiceInstance(Reloadable.class, each.getType(), new Properties())
+                    .ifPresent(reloadable -> reloadableItems.put(reloadable.getType(), reloadable));
+        });
+        initReloadable(jobConfig);
+    }
+    
+    private void initReloadable(final JobConfiguration jobConfig) {
+        reloadableItems.values().stream().filter(each -> each instanceof ReloadablePostProcessor).forEach(each -> ((ReloadablePostProcessor) each).init(jobConfig));
+    }
+    
+    /**
+     * Reload all reloadable items if necessary.
+     *
+     * @param jobConfiguration job configuration
+     */
+    public void reloadIfNecessary(final JobConfiguration jobConfiguration) {
+        reloadableItems.values().forEach(each -> each.reloadIfNecessary(jobConfiguration));
+    }
+    
+    /**
+     * Get instance.
+     *
+     * @param targetClass target class
+     * @param <T>         target type
+     * @return instance
+     * @throws IllegalArgumentException if no reloadable item is registered for the target class
+     */
+    @SuppressWarnings("unchecked")
+    public <T> T get(final Class<T> targetClass) {
+        Reloadable<?> reloadable = reloadableItems.get(targetClass.getName());
+        if (null == reloadable) {
+            throw new IllegalArgumentException(String.format("Cannot find reloadable item of type '%s'.", targetClass.getName()));
+        }
+        return (T) reloadable.getInstance();
+    }
+    
+    /**
+     * Shutdown all closeable instances.
+     */
+    public void shutdown() {
+        for (Reloadable<?> each : reloadableItems.values()) {
+            try {
+                each.close();
+            } catch (final IOException ignored) {
+                // Intentionally ignored: an I/O failure while closing one item does not stop the remaining items from being closed.
+            }
+        }
+    }
+}
diff --git a/elasticjob-ecosystem/elasticjob-executor/elasticjob-executor-kernel/src/test/java/org/apache/shardingsphere/elasticjob/executor/ElasticJobExecutorTest.java b/elasticjob-ecosystem/elasticjob-executor/elasticjob-executor-kernel/src/test/java/org/apache/shardingsphere/elasticjob/executor/ElasticJobExecutorTest.java
index b328321..46082ab 100644
--- a/elasticjob-ecosystem/elasticjob-executor/elasticjob-executor-kernel/src/test/java/org/apache/shardingsphere/elasticjob/executor/ElasticJobExecutorTest.java
+++ b/elasticjob-ecosystem/elasticjob-executor/elasticjob-executor-kernel/src/test/java/org/apache/shardingsphere/elasticjob/executor/ElasticJobExecutorTest.java
@@ -37,6 +37,7 @@ import java.util.HashMap;
 import java.util.Map;
 
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.times;
@@ -62,6 +63,7 @@ public final class ElasticJobExecutorTest {
     @Before
     public void setUp() {
         jobConfig = createJobConfiguration();
+        when(jobFacade.loadJobConfiguration(anyBoolean())).thenReturn(jobConfig);
         elasticJobExecutor = new ElasticJobExecutor(fooJob, jobConfig, jobFacade);
         setJobItemExecutor();
     }
diff --git a/elasticjob-infra/elasticjob-infra-common/src/main/java/org/apache/shardingsphere/elasticjob/infra/concurrent/ExecutorServiceReloadable.java b/elasticjob-infra/elasticjob-infra-common/src/main/java/org/apache/shardingsphere/elasticjob/infra/concurrent/ExecutorServiceReloadable.java
new file mode 100644
index 0000000..2f411c4
--- /dev/null
+++ b/elasticjob-infra/elasticjob-infra-common/src/main/java/org/apache/shardingsphere/elasticjob/infra/concurrent/ExecutorServiceReloadable.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.elasticjob.infra.concurrent;
+
+import com.google.common.base.Strings;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
+import org.apache.shardingsphere.elasticjob.infra.context.Reloadable;
+import org.apache.shardingsphere.elasticjob.infra.context.ReloadablePostProcessor;
+import org.apache.shardingsphere.elasticjob.infra.handler.threadpool.JobExecutorServiceHandlerFactory;
+
+import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * Executor service reloadable.
+ *
+ * <p>Holds the executor service created for a job and replaces it when the configured handler type changes.</p>
+ */
+@Slf4j
+public final class ExecutorServiceReloadable implements Reloadable<ExecutorService>, ReloadablePostProcessor {
+    
+    private String jobExecutorServiceHandlerType;
+    
+    private ExecutorService executorService;
+    
+    @Override
+    public void init(final JobConfiguration jobConfig) {
+        jobExecutorServiceHandlerType = resolveHandlerType(jobConfig);
+        executorService = JobExecutorServiceHandlerFactory.getHandler(jobExecutorServiceHandlerType).createExecutorService(jobConfig.getJobName());
+    }
+    
+    @Override
+    public synchronized void reloadIfNecessary(final JobConfiguration jobConfig) {
+        String requestedType = resolveHandlerType(jobConfig);
+        if (!requestedType.equals(jobExecutorServiceHandlerType)) {
+            log.debug("JobExecutorServiceHandler reload occurred in the job '{}'. Change from '{}' to '{}'.", jobConfig.getJobName(), jobExecutorServiceHandlerType, requestedType);
+            // Shut down the outdated executor service first, then record the new type and build its replacement.
+            executorService.shutdown();
+            jobExecutorServiceHandlerType = requestedType;
+            executorService = JobExecutorServiceHandlerFactory.getHandler(requestedType).createExecutorService(jobConfig.getJobName());
+        }
+    }
+    
+    private static String resolveHandlerType(final JobConfiguration jobConfig) {
+        return Strings.isNullOrEmpty(jobConfig.getJobExecutorServiceHandlerType()) ? JobExecutorServiceHandlerFactory.DEFAULT_HANDLER : jobConfig.getJobExecutorServiceHandlerType();
+    }
+    
+    @Override
+    public ExecutorService getInstance() {
+        return executorService;
+    }
+    
+    @Override
+    public void close() {
+        Optional.ofNullable(executorService).ifPresent(ExecutorService::shutdown);
+    }
+    
+    @Override
+    public String getType() {
+        return ExecutorService.class.getName();
+    }
+}
diff --git a/elasticjob-ecosystem/elasticjob-error-handler/elasticjob-error-handler-spi/src/main/java/org/apache/shardingsphere/elasticjob/error/handler/JobErrorHandler.java b/elasticjob-infra/elasticjob-infra-common/src/main/java/org/apache/shardingsphere/elasticjob/infra/context/Reloadable.java
similarity index 63%
copy from elasticjob-ecosystem/elasticjob-error-handler/elasticjob-error-handler-spi/src/main/java/org/apache/shardingsphere/elasticjob/error/handler/JobErrorHandler.java
copy to elasticjob-infra/elasticjob-infra-common/src/main/java/org/apache/shardingsphere/elasticjob/infra/context/Reloadable.java
index fe87a19..22e9015 100644
--- a/elasticjob-ecosystem/elasticjob-error-handler/elasticjob-error-handler-spi/src/main/java/org/apache/shardingsphere/elasticjob/error/handler/JobErrorHandler.java
+++ b/elasticjob-infra/elasticjob-infra-common/src/main/java/org/apache/shardingsphere/elasticjob/infra/context/Reloadable.java
@@ -7,7 +7,7 @@
  * the License.  You may obtain a copy of the License at
  *
  *     http://www.apache.org/licenses/LICENSE-2.0
- *  
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -15,21 +15,31 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.elasticjob.error.handler;
+package org.apache.shardingsphere.elasticjob.infra.context;
 
-import org.apache.shardingsphere.elasticjob.infra.spi.SPIPostProcessor;
+import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
 import org.apache.shardingsphere.elasticjob.infra.spi.TypedSPI;
 
+import java.io.Closeable;
+
 /**
- * Job error handler.
+ * Reloadable.
+ *
+ * @param <T> reload target
  */
-public interface JobErrorHandler extends TypedSPI, SPIPostProcessor {
+public interface Reloadable<T> extends TypedSPI, Closeable {
+    
+    /**
+     * Reload if necessary.
+     *
+     * @param jobConfiguration job configuration
+     */
+    void reloadIfNecessary(JobConfiguration jobConfiguration);
     
     /**
-     * Handle exception.
-     * 
-     * @param jobName job name
-     * @param cause failure cause
+     * Get target instance.
+     *
+     * @return instance
      */
-    void handleException(String jobName, Throwable cause);
+    T getInstance();
 }
diff --git a/elasticjob-ecosystem/elasticjob-error-handler/elasticjob-error-handler-spi/src/main/java/org/apache/shardingsphere/elasticjob/error/handler/JobErrorHandler.java b/elasticjob-infra/elasticjob-infra-common/src/main/java/org/apache/shardingsphere/elasticjob/infra/context/ReloadablePostProcessor.java
similarity index 64%
copy from elasticjob-ecosystem/elasticjob-error-handler/elasticjob-error-handler-spi/src/main/java/org/apache/shardingsphere/elasticjob/error/handler/JobErrorHandler.java
copy to elasticjob-infra/elasticjob-infra-common/src/main/java/org/apache/shardingsphere/elasticjob/infra/context/ReloadablePostProcessor.java
index fe87a19..8dc86d2 100644
--- a/elasticjob-ecosystem/elasticjob-error-handler/elasticjob-error-handler-spi/src/main/java/org/apache/shardingsphere/elasticjob/error/handler/JobErrorHandler.java
+++ b/elasticjob-infra/elasticjob-infra-common/src/main/java/org/apache/shardingsphere/elasticjob/infra/context/ReloadablePostProcessor.java
@@ -7,7 +7,7 @@
  * the License.  You may obtain a copy of the License at
  *
  *     http://www.apache.org/licenses/LICENSE-2.0
- *  
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -15,21 +15,19 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.elasticjob.error.handler;
+package org.apache.shardingsphere.elasticjob.infra.context;
 
-import org.apache.shardingsphere.elasticjob.infra.spi.SPIPostProcessor;
-import org.apache.shardingsphere.elasticjob.infra.spi.TypedSPI;
+import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
 
 /**
- * Job error handler.
+ * Reloadable post processor.
  */
-public interface JobErrorHandler extends TypedSPI, SPIPostProcessor {
+public interface ReloadablePostProcessor {
     
     /**
-     * Handle exception.
-     * 
-     * @param jobName job name
-     * @param cause failure cause
+     * Initialize reloadable.
+     *
+     * @param jobConfig job configuration
      */
-    void handleException(String jobName, Throwable cause);
+    void init(JobConfiguration jobConfig);
 }
diff --git a/elasticjob-infra/elasticjob-infra-common/src/main/java/org/apache/shardingsphere/elasticjob/infra/handler/threadpool/JobExecutorServiceHandlerFactory.java b/elasticjob-infra/elasticjob-infra-common/src/main/java/org/apache/shardingsphere/elasticjob/infra/handler/threadpool/JobExecutorServiceHandlerFactory.java
index 6987a76..1cc1494 100644
--- a/elasticjob-infra/elasticjob-infra-common/src/main/java/org/apache/shardingsphere/elasticjob/infra/handler/threadpool/JobExecutorServiceHandlerFactory.java
+++ b/elasticjob-infra/elasticjob-infra-common/src/main/java/org/apache/shardingsphere/elasticjob/infra/handler/threadpool/JobExecutorServiceHandlerFactory.java
@@ -29,7 +29,7 @@ import org.apache.shardingsphere.elasticjob.infra.spi.ElasticJobServiceLoader;
 @NoArgsConstructor(access = AccessLevel.PRIVATE)
 public final class JobExecutorServiceHandlerFactory {
     
-    private static final String DEFAULT_HANDLER = "CPU";
+    public static final String DEFAULT_HANDLER = "CPU";
     
     static {
         ElasticJobServiceLoader.registerTypedService(JobExecutorServiceHandler.class);
diff --git a/elasticjob-infra/elasticjob-infra-common/src/main/resources/META-INF/services/org.apache.shardingsphere.elasticjob.infra.context.Reloadable b/elasticjob-infra/elasticjob-infra-common/src/main/resources/META-INF/services/org.apache.shardingsphere.elasticjob.infra.context.Reloadable
new file mode 100644
index 0000000..14d1efd
--- /dev/null
+++ b/elasticjob-infra/elasticjob-infra-common/src/main/resources/META-INF/services/org.apache.shardingsphere.elasticjob.infra.context.Reloadable
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.shardingsphere.elasticjob.infra.concurrent.ExecutorServiceReloadable
diff --git a/elasticjob-infra/elasticjob-infra-common/src/test/java/org/apache/shardingsphere/elasticjob/infra/concurrent/ExecutorServiceReloadableTest.java b/elasticjob-infra/elasticjob-infra-common/src/test/java/org/apache/shardingsphere/elasticjob/infra/concurrent/ExecutorServiceReloadableTest.java
new file mode 100644
index 0000000..3a64c83
--- /dev/null
+++ b/elasticjob-infra/elasticjob-infra-common/src/test/java/org/apache/shardingsphere/elasticjob/infra/concurrent/ExecutorServiceReloadableTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.elasticjob.infra.concurrent;
+
+import lombok.SneakyThrows;
+import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.lang.reflect.Field;
+import java.util.concurrent.ExecutorService;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.verify;
+
+@RunWith(MockitoJUnitRunner.class)
+public final class ExecutorServiceReloadableTest {
+    
+    @Mock
+    private ExecutorService mockExecutorService;
+    
+    @Test
+    public void assertInitialize() {
+        JobConfiguration jobConfig = JobConfiguration.newBuilder("job", 1).jobExecutorServiceHandlerType("SINGLE_THREAD").build();
+        ExecutorServiceReloadable reloadable = new ExecutorServiceReloadable();
+        // No executor service is available until init has been called.
+        assertNull(reloadable.getInstance());
+        reloadable.init(jobConfig);
+        ExecutorService created = reloadable.getInstance();
+        assertNotNull(created);
+        assertFalse(created.isShutdown());
+        assertFalse(created.isTerminated());
+        created.shutdown();
+    }
+    
+    @Test
+    public void assertReload() {
+        ExecutorServiceReloadable reloadable = new ExecutorServiceReloadable();
+        setField(reloadable, "jobExecutorServiceHandlerType", "mock");
+        setField(reloadable, "executorService", mockExecutorService);
+        // The configuration names no handler type, so it is expected to differ from "mock" and trigger a reload.
+        reloadable.reloadIfNecessary(JobConfiguration.newBuilder("job", 1).build());
+        verify(mockExecutorService).shutdown();
+        ExecutorService replacement = reloadable.getInstance();
+        assertFalse(replacement.isShutdown());
+        assertFalse(replacement.isTerminated());
+        replacement.shutdown();
+    }
+    
+    @Test
+    public void assertUnnecessaryToReload() {
+        JobConfiguration jobConfig = JobConfiguration.newBuilder("job", 1).build();
+        ExecutorServiceReloadable reloadable = new ExecutorServiceReloadable();
+        reloadable.init(jobConfig);
+        ExecutorService beforeReload = reloadable.getInstance();
+        reloadable.reloadIfNecessary(jobConfig);
+        ExecutorService afterReload = reloadable.getInstance();
+        assertThat(afterReload, is(beforeReload));
+        afterReload.shutdown();
+    }
+    
+    @Test
+    public void assertShutdown() {
+        ExecutorServiceReloadable reloadable = new ExecutorServiceReloadable();
+        setField(reloadable, "executorService", mockExecutorService);
+        reloadable.close();
+        verify(mockExecutorService).shutdown();
+    }
+    
+    @SneakyThrows
+    private void setField(final Object target, final String fieldName, final Object value) {
+        Field declaredField = target.getClass().getDeclaredField(fieldName);
+        boolean accessibleBefore = declaredField.isAccessible();
+        if (!accessibleBefore) {
+            declaredField.setAccessible(true);
+        }
+        declaredField.set(target, value);
+        declaredField.setAccessible(accessibleBefore);
+    }
+}