You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2020/11/28 04:24:22 UTC
[shardingsphere-elasticjob] branch master updated: Extract all reloadable objects from ElasticJobExecutor (#1735)
This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere-elasticjob.git
The following commit(s) were added to refs/heads/master by this push:
new edaef2d Extract all reloadable objects from ElasticJobExecutor (#1735)
edaef2d is described below
commit edaef2d69b50bc339e9dfea17a69616b47f7ebb2
Author: 吴伟杰 <ro...@me.com>
AuthorDate: Sat Nov 28 12:24:10 2020 +0800
Extract all reloadable objects from ElasticJobExecutor (#1735)
* Extract all reloadable objects from ElasticJobExecutor
* Add debug log in implementations of Reloadable
* Add testcases for implementations of Reloadable
* Fix Checkstyle
* Extract init method of Reloadable into ReloadablePostProcessor
---
.../elasticjob/error/handler/JobErrorHandler.java | 8 +-
.../error/handler/JobErrorHandlerFactory.java | 2 +-
.../error/handler/JobErrorHandlerReloadable.java | 82 ++++++++++++++++
...rdingsphere.elasticjob.infra.context.Reloadable | 18 ++++
.../handler/JobErrorHandlerReloadableTest.java | 104 +++++++++++++++++++++
.../elasticjob/executor/ElasticJobExecutor.java | 38 ++++----
.../executor/context/ExecutorContext.java | 89 ++++++++++++++++++
.../executor/ElasticJobExecutorTest.java | 2 +
.../concurrent/ExecutorServiceReloadable.java | 78 ++++++++++++++++
.../elasticjob/infra/context/Reloadable.java | 30 ++++--
.../infra/context/ReloadablePostProcessor.java | 20 ++--
.../JobExecutorServiceHandlerFactory.java | 2 +-
...rdingsphere.elasticjob.infra.context.Reloadable | 18 ++++
.../concurrent/ExecutorServiceReloadableTest.java | 101 ++++++++++++++++++++
14 files changed, 547 insertions(+), 45 deletions(-)
diff --git a/elasticjob-ecosystem/elasticjob-error-handler/elasticjob-error-handler-spi/src/main/java/org/apache/shardingsphere/elasticjob/error/handler/JobErrorHandler.java b/elasticjob-ecosystem/elasticjob-error-handler/elasticjob-error-handler-spi/src/main/java/org/apache/shardingsphere/elasticjob/error/handler/JobErrorHandler.java
index fe87a19..95f85d9 100644
--- a/elasticjob-ecosystem/elasticjob-error-handler/elasticjob-error-handler-spi/src/main/java/org/apache/shardingsphere/elasticjob/error/handler/JobErrorHandler.java
+++ b/elasticjob-ecosystem/elasticjob-error-handler/elasticjob-error-handler-spi/src/main/java/org/apache/shardingsphere/elasticjob/error/handler/JobErrorHandler.java
@@ -20,10 +20,12 @@ package org.apache.shardingsphere.elasticjob.error.handler;
import org.apache.shardingsphere.elasticjob.infra.spi.SPIPostProcessor;
import org.apache.shardingsphere.elasticjob.infra.spi.TypedSPI;
+import java.io.Closeable;
+
/**
* Job error handler.
*/
-public interface JobErrorHandler extends TypedSPI, SPIPostProcessor {
+public interface JobErrorHandler extends TypedSPI, SPIPostProcessor, Closeable {
/**
* Handle exception.
@@ -32,4 +34,8 @@ public interface JobErrorHandler extends TypedSPI, SPIPostProcessor {
* @param cause failure cause
*/
void handleException(String jobName, Throwable cause);
+
+ @Override
+ default void close() {
+ }
}
diff --git a/elasticjob-ecosystem/elasticjob-error-handler/elasticjob-error-handler-type/elasticjob-error-handler-general/src/main/java/org/apache/shardingsphere/elasticjob/error/handler/JobErrorHandlerFactory.java b/elasticjob-ecosystem/elasticjob-error-handler/elasticjob-error-handler-type/elasticjob-error-handler-general/src/main/java/org/apache/shardingsphere/elasticjob/error/handler/JobErrorHandlerFactory.java
index 7b8951f..363573c 100644
--- a/elasticjob-ecosystem/elasticjob-error-handler/elasticjob-error-handler-type/elasticjob-error-handler-general/src/main/java/org/apache/shardingsphere/elasticjob/error/handler/JobErrorHandlerFactory.java
+++ b/elasticjob-ecosystem/elasticjob-error-handler/elasticjob-error-handler-type/elasticjob-error-handler-general/src/main/java/org/apache/shardingsphere/elasticjob/error/handler/JobErrorHandlerFactory.java
@@ -31,7 +31,7 @@ import java.util.Properties;
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class JobErrorHandlerFactory {
- private static final String DEFAULT_HANDLER = "LOG";
+ public static final String DEFAULT_HANDLER = "LOG";
static {
ElasticJobServiceLoader.registerTypedService(JobErrorHandler.class);
diff --git a/elasticjob-ecosystem/elasticjob-error-handler/elasticjob-error-handler-type/elasticjob-error-handler-general/src/main/java/org/apache/shardingsphere/elasticjob/error/handler/JobErrorHandlerReloadable.java b/elasticjob-ecosystem/elasticjob-error-handler/elasticjob-error-handler-type/elasticjob-error-handler-general/src/main/java/org/apache/shardingsphere/elasticjob/error/handler/JobErrorHandlerReloadable.java
new file mode 100644
index 0000000..1256561
--- /dev/null
+++ b/elasticjob-ecosystem/elasticjob-error-handler/elasticjob-error-handler-type/elasticjob-error-handler-general/src/main/java/org/apache/shardingsphere/elasticjob/error/handler/JobErrorHandlerReloadable.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.elasticjob.error.handler;
+
+import com.google.common.base.Strings;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
+import org.apache.shardingsphere.elasticjob.infra.context.Reloadable;
+import org.apache.shardingsphere.elasticjob.infra.context.ReloadablePostProcessor;
+import org.apache.shardingsphere.elasticjob.infra.exception.JobConfigurationException;
+
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * JobErrorHandler reloadable.
+ */
+@Slf4j
+public final class JobErrorHandlerReloadable implements Reloadable<JobErrorHandler>, ReloadablePostProcessor {
+
+ private String jobErrorHandlerType;
+
+ private Properties props;
+
+ private JobErrorHandler jobErrorHandler;
+
+ @Override
+ public void init(final JobConfiguration jobConfig) {
+ jobErrorHandlerType = Strings.isNullOrEmpty(jobConfig.getJobErrorHandlerType()) ? JobErrorHandlerFactory.DEFAULT_HANDLER : jobConfig.getJobErrorHandlerType();
+ props = (Properties) jobConfig.getProps().clone();
+ jobErrorHandler = JobErrorHandlerFactory.createHandler(jobErrorHandlerType, props)
+ .orElseThrow(() -> new JobConfigurationException("Cannot find job error handler type '%s'.", jobErrorHandlerType));
+ }
+
+ @Override
+ public synchronized void reloadIfNecessary(final JobConfiguration jobConfig) {
+ String newJobErrorHandlerType = Strings.isNullOrEmpty(jobConfig.getJobErrorHandlerType()) ? JobErrorHandlerFactory.DEFAULT_HANDLER : jobConfig.getJobErrorHandlerType();
+ if (newJobErrorHandlerType.equals(jobErrorHandlerType) && props.equals(jobConfig.getProps())) {
+ return;
+ }
+ log.debug("JobErrorHandler reload occurred in the job '{}'. Change from '{}' to '{}'.", jobConfig.getJobName(), jobErrorHandlerType, newJobErrorHandlerType);
+ reload(newJobErrorHandlerType, jobConfig.getProps());
+ }
+
+ private void reload(final String jobErrorHandlerType, final Properties props) {
+ jobErrorHandler.close();
+ this.jobErrorHandlerType = jobErrorHandlerType;
+ this.props = (Properties) props.clone();
+ jobErrorHandler = JobErrorHandlerFactory.createHandler(jobErrorHandlerType, props)
+ .orElseThrow(() -> new JobConfigurationException("Cannot find job error handler type '%s'.", jobErrorHandlerType));
+ }
+
+ @Override
+ public JobErrorHandler getInstance() {
+ return jobErrorHandler;
+ }
+
+ @Override
+ public String getType() {
+ return JobErrorHandler.class.getName();
+ }
+
+ @Override
+ public void close() {
+ Optional.ofNullable(jobErrorHandler).ifPresent(JobErrorHandler::close);
+ }
+}
diff --git a/elasticjob-ecosystem/elasticjob-error-handler/elasticjob-error-handler-type/elasticjob-error-handler-general/src/main/resources/META-INF/services/org.apache.shardingsphere.elasticjob.infra.context.Reloadable b/elasticjob-ecosystem/elasticjob-error-handler/elasticjob-error-handler-type/elasticjob-error-handler-general/src/main/resources/META-INF/services/org.apache.shardingsphere.elasticjob.infra.context.Reloadable
new file mode 100644
index 0000000..0a00a1d
--- /dev/null
+++ b/elasticjob-ecosystem/elasticjob-error-handler/elasticjob-error-handler-type/elasticjob-error-handler-general/src/main/resources/META-INF/services/org.apache.shardingsphere.elasticjob.infra.context.Reloadable
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.shardingsphere.elasticjob.error.handler.JobErrorHandlerReloadable
diff --git a/elasticjob-ecosystem/elasticjob-error-handler/elasticjob-error-handler-type/elasticjob-error-handler-general/src/test/java/org/apache/shardingsphere/elasticjob/error/handler/JobErrorHandlerReloadableTest.java b/elasticjob-ecosystem/elasticjob-error-handler/elasticjob-error-handler-type/elasticjob-error-handler-general/src/test/java/org/apache/shardingsphere/elasticjob/error/handler/JobErrorHandlerReloadableTest.java
new file mode 100644
index 0000000..48a7559
--- /dev/null
+++ b/elasticjob-ecosystem/elasticjob-error-handler/elasticjob-error-handler-type/elasticjob-error-handler-general/src/test/java/org/apache/shardingsphere/elasticjob/error/handler/JobErrorHandlerReloadableTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.elasticjob.error.handler;
+
+import lombok.SneakyThrows;
+import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
+import org.apache.shardingsphere.elasticjob.error.handler.general.IgnoreJobErrorHandler;
+import org.apache.shardingsphere.elasticjob.error.handler.general.LogJobErrorHandler;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.lang.reflect.Field;
+import java.util.Properties;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.verify;
+
+@RunWith(MockitoJUnitRunner.class)
+public final class JobErrorHandlerReloadableTest {
+
+ @Mock
+ private JobErrorHandler mockJobErrorHandler;
+
+ @Test
+ public void assertInitialize() {
+ JobErrorHandlerReloadable jobErrorHandlerReloadable = new JobErrorHandlerReloadable();
+ String jobErrorHandlerType = "IGNORE";
+ JobConfiguration jobConfig = JobConfiguration.newBuilder("job", 1).jobErrorHandlerType(jobErrorHandlerType).build();
+ assertNull(jobErrorHandlerReloadable.getInstance());
+ jobErrorHandlerReloadable.init(jobConfig);
+ JobErrorHandler actual = jobErrorHandlerReloadable.getInstance();
+ assertNotNull(actual);
+ assertThat(actual.getType(), equalTo(jobErrorHandlerType));
+ assertTrue(actual instanceof IgnoreJobErrorHandler);
+ }
+
+ @Test
+ public void assertReload() {
+ JobErrorHandlerReloadable jobErrorHandlerReloadable = new JobErrorHandlerReloadable();
+ setField(jobErrorHandlerReloadable, "jobErrorHandler", mockJobErrorHandler);
+ setField(jobErrorHandlerReloadable, "jobErrorHandlerType", "mock");
+ setField(jobErrorHandlerReloadable, "props", new Properties());
+ String newJobErrorHandlerType = "LOG";
+ JobConfiguration newJobConfig = JobConfiguration.newBuilder("job", 1).jobErrorHandlerType(newJobErrorHandlerType).build();
+ jobErrorHandlerReloadable.reloadIfNecessary(newJobConfig);
+ verify(mockJobErrorHandler).close();
+ JobErrorHandler actual = jobErrorHandlerReloadable.getInstance();
+ assertThat(actual.getType(), equalTo(newJobErrorHandlerType));
+ assertTrue(actual instanceof LogJobErrorHandler);
+ }
+
+ @Test
+ public void assertUnnecessaryToReload() {
+ JobErrorHandlerReloadable jobErrorHandlerReloadable = new JobErrorHandlerReloadable();
+ String jobErrorHandlerType = "IGNORE";
+ JobConfiguration jobConfig = JobConfiguration.newBuilder("job", 1).jobErrorHandlerType(jobErrorHandlerType).build();
+ jobErrorHandlerReloadable.init(jobConfig);
+ JobErrorHandler expected = jobErrorHandlerReloadable.getInstance();
+ jobErrorHandlerReloadable.reloadIfNecessary(jobConfig);
+ JobErrorHandler actual = jobErrorHandlerReloadable.getInstance();
+ assertThat(actual, is(expected));
+ }
+
+ @Test
+ public void assertShutdown() {
+ JobErrorHandlerReloadable jobErrorHandlerReloadable = new JobErrorHandlerReloadable();
+ setField(jobErrorHandlerReloadable, "jobErrorHandler", mockJobErrorHandler);
+ jobErrorHandlerReloadable.close();
+ verify(mockJobErrorHandler).close();
+ }
+
+ @SneakyThrows
+ private void setField(final Object target, final String fieldName, final Object value) {
+ Field field = target.getClass().getDeclaredField(fieldName);
+ boolean originAccessible = field.isAccessible();
+ if (!originAccessible) {
+ field.setAccessible(true);
+ }
+ field.set(target, value);
+ field.setAccessible(originAccessible);
+ }
+}
diff --git a/elasticjob-ecosystem/elasticjob-executor/elasticjob-executor-kernel/src/main/java/org/apache/shardingsphere/elasticjob/executor/ElasticJobExecutor.java b/elasticjob-ecosystem/elasticjob-executor/elasticjob-executor-kernel/src/main/java/org/apache/shardingsphere/elasticjob/executor/ElasticJobExecutor.java
index 448213f..1b54eae 100644
--- a/elasticjob-ecosystem/elasticjob-executor/elasticjob-executor-kernel/src/main/java/org/apache/shardingsphere/elasticjob/executor/ElasticJobExecutor.java
+++ b/elasticjob-ecosystem/elasticjob-executor/elasticjob-executor-kernel/src/main/java/org/apache/shardingsphere/elasticjob/executor/ElasticJobExecutor.java
@@ -21,14 +21,12 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.elasticjob.api.ElasticJob;
import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import org.apache.shardingsphere.elasticjob.error.handler.JobErrorHandler;
-import org.apache.shardingsphere.elasticjob.error.handler.JobErrorHandlerFactory;
+import org.apache.shardingsphere.elasticjob.executor.context.ExecutorContext;
import org.apache.shardingsphere.elasticjob.executor.item.JobItemExecutor;
import org.apache.shardingsphere.elasticjob.executor.item.JobItemExecutorFactory;
import org.apache.shardingsphere.elasticjob.infra.env.IpUtils;
import org.apache.shardingsphere.elasticjob.infra.exception.ExceptionUtils;
-import org.apache.shardingsphere.elasticjob.infra.exception.JobConfigurationException;
import org.apache.shardingsphere.elasticjob.infra.exception.JobExecutionEnvironmentException;
-import org.apache.shardingsphere.elasticjob.infra.handler.threadpool.JobExecutorServiceHandlerFactory;
import org.apache.shardingsphere.elasticjob.infra.listener.ShardingContexts;
import org.apache.shardingsphere.elasticjob.tracing.event.JobExecutionEvent;
import org.apache.shardingsphere.elasticjob.tracing.event.JobExecutionEvent.ExecutionSource;
@@ -48,15 +46,11 @@ public final class ElasticJobExecutor {
private final ElasticJob elasticJob;
- private final JobConfiguration jobConfig;
-
private final JobFacade jobFacade;
private final JobItemExecutor jobItemExecutor;
- private final ExecutorService executorService;
-
- private final JobErrorHandler jobErrorHandler;
+ private final ExecutorContext executorContext;
private final Map<Integer, String> itemErrorMessages;
@@ -70,12 +64,9 @@ public final class ElasticJobExecutor {
private ElasticJobExecutor(final ElasticJob elasticJob, final JobConfiguration jobConfig, final JobFacade jobFacade, final JobItemExecutor jobItemExecutor) {
this.elasticJob = elasticJob;
- this.jobConfig = jobConfig;
this.jobFacade = jobFacade;
this.jobItemExecutor = jobItemExecutor;
- executorService = JobExecutorServiceHandlerFactory.getHandler(jobConfig.getJobExecutorServiceHandlerType()).createExecutorService(jobConfig.getJobName());
- jobErrorHandler = JobErrorHandlerFactory.createHandler(jobConfig.getJobErrorHandlerType(), jobConfig.getProps())
- .orElseThrow(() -> new JobConfigurationException("Can not find job error handler type '%s'.", jobConfig.getJobErrorHandlerType()));
+ executorContext = new ExecutorContext(jobFacade.loadJobConfiguration(true));
itemErrorMessages = new ConcurrentHashMap<>(jobConfig.getShardingTotalCount(), 1);
}
@@ -83,6 +74,9 @@ public final class ElasticJobExecutor {
* Execute job.
*/
public void execute() {
+ JobConfiguration jobConfig = jobFacade.loadJobConfiguration(true);
+ executorContext.reloadIfNecessary(jobConfig);
+ JobErrorHandler jobErrorHandler = executorContext.get(JobErrorHandler.class);
try {
jobFacade.checkJobExecutionEnvironment();
} catch (final JobExecutionEnvironmentException cause) {
@@ -103,10 +97,10 @@ public final class ElasticJobExecutor {
//CHECKSTYLE:ON
jobErrorHandler.handleException(jobConfig.getJobName(), cause);
}
- execute(shardingContexts, ExecutionSource.NORMAL_TRIGGER);
+ execute(jobConfig, shardingContexts, ExecutionSource.NORMAL_TRIGGER);
while (jobFacade.isExecuteMisfired(shardingContexts.getShardingItemParameters().keySet())) {
jobFacade.clearMisfire(shardingContexts.getShardingItemParameters().keySet());
- execute(shardingContexts, ExecutionSource.MISFIRE);
+ execute(jobConfig, shardingContexts, ExecutionSource.MISFIRE);
}
jobFacade.failoverIfNecessary();
try {
@@ -118,7 +112,7 @@ public final class ElasticJobExecutor {
}
}
- private void execute(final ShardingContexts shardingContexts, final ExecutionSource executionSource) {
+ private void execute(final JobConfiguration jobConfig, final ShardingContexts shardingContexts, final ExecutionSource executionSource) {
if (shardingContexts.getShardingItemParameters().isEmpty()) {
jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, String.format("Sharding item for job '%s' is empty.", jobConfig.getJobName()));
return;
@@ -127,7 +121,7 @@ public final class ElasticJobExecutor {
String taskId = shardingContexts.getTaskId();
jobFacade.postJobStatusTraceEvent(taskId, State.TASK_RUNNING, "");
try {
- process(shardingContexts, executionSource);
+ process(jobConfig, shardingContexts, executionSource);
} finally {
// TODO Consider increasing the status of job failure, and how to handle the overall loop of job failure
jobFacade.registerJobCompleted(shardingContexts);
@@ -139,23 +133,24 @@ public final class ElasticJobExecutor {
}
}
- private void process(final ShardingContexts shardingContexts, final ExecutionSource executionSource) {
+ private void process(final JobConfiguration jobConfig, final ShardingContexts shardingContexts, final ExecutionSource executionSource) {
Collection<Integer> items = shardingContexts.getShardingItemParameters().keySet();
if (1 == items.size()) {
int item = shardingContexts.getShardingItemParameters().keySet().iterator().next();
JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(IpUtils.getHostName(), IpUtils.getIp(), shardingContexts.getTaskId(), jobConfig.getJobName(), executionSource, item);
- process(shardingContexts, item, jobExecutionEvent);
+ process(jobConfig, shardingContexts, item, jobExecutionEvent);
return;
}
CountDownLatch latch = new CountDownLatch(items.size());
for (int each : items) {
JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(IpUtils.getHostName(), IpUtils.getIp(), shardingContexts.getTaskId(), jobConfig.getJobName(), executionSource, each);
+ ExecutorService executorService = executorContext.get(ExecutorService.class);
if (executorService.isShutdown()) {
return;
}
executorService.submit(() -> {
try {
- process(shardingContexts, each, jobExecutionEvent);
+ process(jobConfig, shardingContexts, each, jobExecutionEvent);
} finally {
latch.countDown();
}
@@ -169,7 +164,7 @@ public final class ElasticJobExecutor {
}
@SuppressWarnings("unchecked")
- private void process(final ShardingContexts shardingContexts, final int item, final JobExecutionEvent startEvent) {
+ private void process(final JobConfiguration jobConfig, final ShardingContexts shardingContexts, final int item, final JobExecutionEvent startEvent) {
jobFacade.postJobExecutionEvent(startEvent);
log.trace("Job '{}' executing, item is: '{}'.", jobConfig.getJobName(), item);
JobExecutionEvent completeEvent;
@@ -184,6 +179,7 @@ public final class ElasticJobExecutor {
completeEvent = startEvent.executionFailure(ExceptionUtils.transform(cause));
jobFacade.postJobExecutionEvent(completeEvent);
itemErrorMessages.put(item, ExceptionUtils.transform(cause));
+ JobErrorHandler jobErrorHandler = executorContext.get(JobErrorHandler.class);
jobErrorHandler.handleException(jobConfig.getJobName(), cause);
}
}
@@ -192,6 +188,6 @@ public final class ElasticJobExecutor {
* Shutdown executor.
*/
public void shutdown() {
- executorService.shutdown();
+ executorContext.shutdown();
}
}
diff --git a/elasticjob-ecosystem/elasticjob-executor/elasticjob-executor-kernel/src/main/java/org/apache/shardingsphere/elasticjob/executor/context/ExecutorContext.java b/elasticjob-ecosystem/elasticjob-executor/elasticjob-executor-kernel/src/main/java/org/apache/shardingsphere/elasticjob/executor/context/ExecutorContext.java
new file mode 100644
index 0000000..38786c6
--- /dev/null
+++ b/elasticjob-ecosystem/elasticjob-executor/elasticjob-executor-kernel/src/main/java/org/apache/shardingsphere/elasticjob/executor/context/ExecutorContext.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.elasticjob.executor.context;
+
+import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
+import org.apache.shardingsphere.elasticjob.infra.context.Reloadable;
+import org.apache.shardingsphere.elasticjob.infra.context.ReloadablePostProcessor;
+import org.apache.shardingsphere.elasticjob.infra.spi.ElasticJobServiceLoader;
+
+import java.io.IOException;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.ServiceLoader;
+
+/**
+ * Executor context.
+ *
+ * @see org.apache.shardingsphere.elasticjob.error.handler.JobErrorHandlerReloadable
+ * @see org.apache.shardingsphere.elasticjob.infra.concurrent.ExecutorServiceReloadable
+ */
+public final class ExecutorContext {
+
+ static {
+ ElasticJobServiceLoader.registerTypedService(Reloadable.class);
+ }
+
+ private final Map<String, Reloadable<?>> reloadableItems = new LinkedHashMap<>();
+
+ public ExecutorContext(final JobConfiguration jobConfig) {
+ ServiceLoader.load(Reloadable.class).forEach(each -> {
+ ElasticJobServiceLoader.newTypedServiceInstance(Reloadable.class, each.getType(), new Properties())
+ .ifPresent(reloadable -> reloadableItems.put(reloadable.getType(), reloadable));
+ });
+ initReloadable(jobConfig);
+ }
+
+ private void initReloadable(final JobConfiguration jobConfig) {
+ reloadableItems.values().stream().filter(each -> each instanceof ReloadablePostProcessor).forEach(each -> ((ReloadablePostProcessor) each).init(jobConfig));
+ }
+
+ /**
+ * Reload all reloadable item if necessary.
+ *
+ * @param jobConfiguration job configuration
+ */
+ public void reloadIfNecessary(final JobConfiguration jobConfiguration) {
+ reloadableItems.values().forEach(each -> each.reloadIfNecessary(jobConfiguration));
+ }
+
+ /**
+ * Get instance.
+ *
+ * @param targetClass target class
+ * @param <T> target type
+ * @return instance
+ */
+ @SuppressWarnings("unchecked")
+ public <T> T get(final Class<T> targetClass) {
+ return (T) reloadableItems.get(targetClass.getName()).getInstance();
+ }
+
+ /**
+ * Shutdown all closeable instances.
+ */
+ public void shutdown() {
+ for (Reloadable<?> each : reloadableItems.values()) {
+ try {
+ each.close();
+ } catch (final IOException ignored) {
+ }
+ }
+ }
+}
diff --git a/elasticjob-ecosystem/elasticjob-executor/elasticjob-executor-kernel/src/test/java/org/apache/shardingsphere/elasticjob/executor/ElasticJobExecutorTest.java b/elasticjob-ecosystem/elasticjob-executor/elasticjob-executor-kernel/src/test/java/org/apache/shardingsphere/elasticjob/executor/ElasticJobExecutorTest.java
index b328321..46082ab 100644
--- a/elasticjob-ecosystem/elasticjob-executor/elasticjob-executor-kernel/src/test/java/org/apache/shardingsphere/elasticjob/executor/ElasticJobExecutorTest.java
+++ b/elasticjob-ecosystem/elasticjob-executor/elasticjob-executor-kernel/src/test/java/org/apache/shardingsphere/elasticjob/executor/ElasticJobExecutorTest.java
@@ -37,6 +37,7 @@ import java.util.HashMap;
import java.util.Map;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.times;
@@ -62,6 +63,7 @@ public final class ElasticJobExecutorTest {
@Before
public void setUp() {
jobConfig = createJobConfiguration();
+ when(jobFacade.loadJobConfiguration(anyBoolean())).thenReturn(jobConfig);
elasticJobExecutor = new ElasticJobExecutor(fooJob, jobConfig, jobFacade);
setJobItemExecutor();
}
diff --git a/elasticjob-infra/elasticjob-infra-common/src/main/java/org/apache/shardingsphere/elasticjob/infra/concurrent/ExecutorServiceReloadable.java b/elasticjob-infra/elasticjob-infra-common/src/main/java/org/apache/shardingsphere/elasticjob/infra/concurrent/ExecutorServiceReloadable.java
new file mode 100644
index 0000000..2f411c4
--- /dev/null
+++ b/elasticjob-infra/elasticjob-infra-common/src/main/java/org/apache/shardingsphere/elasticjob/infra/concurrent/ExecutorServiceReloadable.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.elasticjob.infra.concurrent;
+
+import com.google.common.base.Strings;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
+import org.apache.shardingsphere.elasticjob.infra.context.Reloadable;
+import org.apache.shardingsphere.elasticjob.infra.context.ReloadablePostProcessor;
+import org.apache.shardingsphere.elasticjob.infra.handler.threadpool.JobExecutorServiceHandlerFactory;
+
+import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * Executor service reloadable.
+ */
+@Slf4j
+public final class ExecutorServiceReloadable implements Reloadable<ExecutorService>, ReloadablePostProcessor {
+
+ private String jobExecutorServiceHandlerType;
+
+ private ExecutorService executorService;
+
+ @Override
+ public void init(final JobConfiguration jobConfig) {
+ jobExecutorServiceHandlerType = Strings.isNullOrEmpty(jobConfig.getJobExecutorServiceHandlerType())
+ ? JobExecutorServiceHandlerFactory.DEFAULT_HANDLER : jobConfig.getJobExecutorServiceHandlerType();
+ executorService = JobExecutorServiceHandlerFactory.getHandler(jobExecutorServiceHandlerType).createExecutorService(jobConfig.getJobName());
+ }
+
+ @Override
+ public synchronized void reloadIfNecessary(final JobConfiguration jobConfig) {
+ String newJobExecutorServiceHandlerType = Strings.isNullOrEmpty(jobConfig.getJobExecutorServiceHandlerType())
+ ? JobExecutorServiceHandlerFactory.DEFAULT_HANDLER : jobConfig.getJobExecutorServiceHandlerType();
+ if (newJobExecutorServiceHandlerType.equals(jobExecutorServiceHandlerType)) {
+ return;
+ }
+ log.debug("JobExecutorServiceHandler reload occurred in the job '{}'. Change from '{}' to '{}'.", jobConfig.getJobName(), jobExecutorServiceHandlerType, newJobExecutorServiceHandlerType);
+ reload(newJobExecutorServiceHandlerType, jobConfig.getJobName());
+ }
+
+ private void reload(final String jobExecutorServiceHandlerType, final String jobName) {
+ executorService.shutdown();
+ this.jobExecutorServiceHandlerType = jobExecutorServiceHandlerType;
+ executorService = JobExecutorServiceHandlerFactory.getHandler(jobExecutorServiceHandlerType).createExecutorService(jobName);
+ }
+
+ @Override
+ public ExecutorService getInstance() {
+ return executorService;
+ }
+
+ @Override
+ public void close() {
+ Optional.ofNullable(executorService).ifPresent(ExecutorService::shutdown);
+ }
+
+ @Override
+ public String getType() {
+ return ExecutorService.class.getName();
+ }
+}
diff --git a/elasticjob-ecosystem/elasticjob-error-handler/elasticjob-error-handler-spi/src/main/java/org/apache/shardingsphere/elasticjob/error/handler/JobErrorHandler.java b/elasticjob-infra/elasticjob-infra-common/src/main/java/org/apache/shardingsphere/elasticjob/infra/context/Reloadable.java
similarity index 63%
copy from elasticjob-ecosystem/elasticjob-error-handler/elasticjob-error-handler-spi/src/main/java/org/apache/shardingsphere/elasticjob/error/handler/JobErrorHandler.java
copy to elasticjob-infra/elasticjob-infra-common/src/main/java/org/apache/shardingsphere/elasticjob/infra/context/Reloadable.java
index fe87a19..22e9015 100644
--- a/elasticjob-ecosystem/elasticjob-error-handler/elasticjob-error-handler-spi/src/main/java/org/apache/shardingsphere/elasticjob/error/handler/JobErrorHandler.java
+++ b/elasticjob-infra/elasticjob-infra-common/src/main/java/org/apache/shardingsphere/elasticjob/infra/context/Reloadable.java
@@ -7,7 +7,7 @@
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -15,21 +15,31 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.elasticjob.error.handler;
+package org.apache.shardingsphere.elasticjob.infra.context;
-import org.apache.shardingsphere.elasticjob.infra.spi.SPIPostProcessor;
+import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import org.apache.shardingsphere.elasticjob.infra.spi.TypedSPI;
+import java.io.Closeable;
+
/**
- * Job error handler.
+ * Reloadable.
+ *
+ * @param <T> reload target
*/
-public interface JobErrorHandler extends TypedSPI, SPIPostProcessor {
+public interface Reloadable<T> extends TypedSPI, Closeable {
+
+ /**
+ * Reload if necessary.
+ *
+ * @param jobConfiguration job configuration
+ */
+ void reloadIfNecessary(JobConfiguration jobConfiguration);
/**
- * Handle exception.
- *
- * @param jobName job name
- * @param cause failure cause
+ * Get target instance.
+ *
+ * @return instance
*/
- void handleException(String jobName, Throwable cause);
+ T getInstance();
}
diff --git a/elasticjob-ecosystem/elasticjob-error-handler/elasticjob-error-handler-spi/src/main/java/org/apache/shardingsphere/elasticjob/error/handler/JobErrorHandler.java b/elasticjob-infra/elasticjob-infra-common/src/main/java/org/apache/shardingsphere/elasticjob/infra/context/ReloadablePostProcessor.java
similarity index 64%
copy from elasticjob-ecosystem/elasticjob-error-handler/elasticjob-error-handler-spi/src/main/java/org/apache/shardingsphere/elasticjob/error/handler/JobErrorHandler.java
copy to elasticjob-infra/elasticjob-infra-common/src/main/java/org/apache/shardingsphere/elasticjob/infra/context/ReloadablePostProcessor.java
index fe87a19..8dc86d2 100644
--- a/elasticjob-ecosystem/elasticjob-error-handler/elasticjob-error-handler-spi/src/main/java/org/apache/shardingsphere/elasticjob/error/handler/JobErrorHandler.java
+++ b/elasticjob-infra/elasticjob-infra-common/src/main/java/org/apache/shardingsphere/elasticjob/infra/context/ReloadablePostProcessor.java
@@ -7,7 +7,7 @@
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -15,21 +15,19 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.elasticjob.error.handler;
+package org.apache.shardingsphere.elasticjob.infra.context;
-import org.apache.shardingsphere.elasticjob.infra.spi.SPIPostProcessor;
-import org.apache.shardingsphere.elasticjob.infra.spi.TypedSPI;
+import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
/**
- * Job error handler.
+ * Reloadable post processor.
*/
-public interface JobErrorHandler extends TypedSPI, SPIPostProcessor {
+public interface ReloadablePostProcessor {
/**
- * Handle exception.
- *
- * @param jobName job name
- * @param cause failure cause
+ * Initialize reloadable.
+ *
+ * @param jobConfig job configuration
*/
- void handleException(String jobName, Throwable cause);
+ void init(JobConfiguration jobConfig);
}
diff --git a/elasticjob-infra/elasticjob-infra-common/src/main/java/org/apache/shardingsphere/elasticjob/infra/handler/threadpool/JobExecutorServiceHandlerFactory.java b/elasticjob-infra/elasticjob-infra-common/src/main/java/org/apache/shardingsphere/elasticjob/infra/handler/threadpool/JobExecutorServiceHandlerFactory.java
index 6987a76..1cc1494 100644
--- a/elasticjob-infra/elasticjob-infra-common/src/main/java/org/apache/shardingsphere/elasticjob/infra/handler/threadpool/JobExecutorServiceHandlerFactory.java
+++ b/elasticjob-infra/elasticjob-infra-common/src/main/java/org/apache/shardingsphere/elasticjob/infra/handler/threadpool/JobExecutorServiceHandlerFactory.java
@@ -29,7 +29,7 @@ import org.apache.shardingsphere.elasticjob.infra.spi.ElasticJobServiceLoader;
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class JobExecutorServiceHandlerFactory {
- private static final String DEFAULT_HANDLER = "CPU";
+ public static final String DEFAULT_HANDLER = "CPU";
static {
ElasticJobServiceLoader.registerTypedService(JobExecutorServiceHandler.class);
diff --git a/elasticjob-infra/elasticjob-infra-common/src/main/resources/META-INF/services/org.apache.shardingsphere.elasticjob.infra.context.Reloadable b/elasticjob-infra/elasticjob-infra-common/src/main/resources/META-INF/services/org.apache.shardingsphere.elasticjob.infra.context.Reloadable
new file mode 100644
index 0000000..14d1efd
--- /dev/null
+++ b/elasticjob-infra/elasticjob-infra-common/src/main/resources/META-INF/services/org.apache.shardingsphere.elasticjob.infra.context.Reloadable
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.shardingsphere.elasticjob.infra.concurrent.ExecutorServiceReloadable
diff --git a/elasticjob-infra/elasticjob-infra-common/src/test/java/org/apache/shardingsphere/elasticjob/infra/concurrent/ExecutorServiceReloadableTest.java b/elasticjob-infra/elasticjob-infra-common/src/test/java/org/apache/shardingsphere/elasticjob/infra/concurrent/ExecutorServiceReloadableTest.java
new file mode 100644
index 0000000..3a64c83
--- /dev/null
+++ b/elasticjob-infra/elasticjob-infra-common/src/test/java/org/apache/shardingsphere/elasticjob/infra/concurrent/ExecutorServiceReloadableTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.elasticjob.infra.concurrent;
+
+import lombok.SneakyThrows;
+import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.lang.reflect.Field;
+import java.util.concurrent.ExecutorService;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Tests for {@link ExecutorServiceReloadable}.
+ *
+ * <p>Every test that may create a real executor service releases it in a {@code finally} block,
+ * so a failing assertion does not leave pool threads behind.</p>
+ */
+@RunWith(MockitoJUnitRunner.class)
+public final class ExecutorServiceReloadableTest {
+    
+    @Mock
+    private ExecutorService mockExecutorService;
+    
+    /**
+     * Before init there is no instance; after init a live executor service is available.
+     */
+    @Test
+    public void assertInitialize() {
+        ExecutorServiceReloadable executorServiceReloadable = new ExecutorServiceReloadable();
+        String jobExecutorServiceHandlerType = "SINGLE_THREAD";
+        JobConfiguration jobConfig = JobConfiguration.newBuilder("job", 1).jobExecutorServiceHandlerType(jobExecutorServiceHandlerType).build();
+        assertNull(executorServiceReloadable.getInstance());
+        executorServiceReloadable.init(jobConfig);
+        try {
+            ExecutorService actual = executorServiceReloadable.getInstance();
+            assertNotNull(actual);
+            assertFalse(actual.isShutdown());
+            assertFalse(actual.isTerminated());
+        } finally {
+            // close() is null-safe, so cleanup works even if the assertions above fail.
+            executorServiceReloadable.close();
+        }
+    }
+    
+    /**
+     * A changed handler type shuts down the previous executor service and installs a live one.
+     */
+    @Test
+    public void assertReload() {
+        ExecutorServiceReloadable executorServiceReloadable = new ExecutorServiceReloadable();
+        setField(executorServiceReloadable, "jobExecutorServiceHandlerType", "mock");
+        setField(executorServiceReloadable, "executorService", mockExecutorService);
+        JobConfiguration jobConfig = JobConfiguration.newBuilder("job", 1).build();
+        executorServiceReloadable.reloadIfNecessary(jobConfig);
+        try {
+            verify(mockExecutorService).shutdown();
+            ExecutorService actual = executorServiceReloadable.getInstance();
+            assertNotNull(actual);
+            assertFalse(actual.isShutdown());
+            assertFalse(actual.isTerminated());
+        } finally {
+            executorServiceReloadable.close();
+        }
+    }
+    
+    /**
+     * Reloading with an unchanged configuration keeps the same, still running, executor service.
+     */
+    @Test
+    public void assertUnnecessaryToReload() {
+        ExecutorServiceReloadable executorServiceReloadable = new ExecutorServiceReloadable();
+        JobConfiguration jobConfig = JobConfiguration.newBuilder("job", 1).build();
+        executorServiceReloadable.init(jobConfig);
+        try {
+            ExecutorService expected = executorServiceReloadable.getInstance();
+            executorServiceReloadable.reloadIfNecessary(jobConfig);
+            ExecutorService actual = executorServiceReloadable.getInstance();
+            assertThat(actual, is(expected));
+            assertFalse(actual.isShutdown());
+        } finally {
+            executorServiceReloadable.close();
+        }
+    }
+    
+    /**
+     * Closing the reloadable shuts down the held executor service.
+     */
+    @Test
+    public void assertShutdown() {
+        ExecutorServiceReloadable executorServiceReloadable = new ExecutorServiceReloadable();
+        setField(executorServiceReloadable, "executorService", mockExecutorService);
+        executorServiceReloadable.close();
+        verify(mockExecutorService).shutdown();
+    }
+    
+    /**
+     * Set a declared field of the target reflectively, restoring its original accessibility afterwards.
+     *
+     * @param target object whose field is written
+     * @param fieldName name of a field declared on the target's class
+     * @param value value to assign
+     */
+    @SneakyThrows
+    private void setField(final Object target, final String fieldName, final Object value) {
+        Field field = target.getClass().getDeclaredField(fieldName);
+        boolean originAccessible = field.isAccessible();
+        if (!originAccessible) {
+            field.setAccessible(true);
+        }
+        field.set(target, value);
+        field.setAccessible(originAccessible);
+    }
+}