You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2020/10/22 15:21:15 UTC
[shardingsphere-elasticjob] branch master updated: Add
SPIPostProcessor and use it in error handler (#1639)
This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere-elasticjob.git
The following commit(s) were added to refs/heads/master by this push:
new 7edcb38 Add SPIPostProcessor and use it in error handler (#1639)
7edcb38 is described below
commit 7edcb3849f57fd70285df85b4ebd89d5536efa41
Author: Liang Zhang <te...@163.com>
AuthorDate: Thu Oct 22 23:20:10 2020 +0800
Add SPIPostProcessor and use it in error handler (#1639)
* Add SPIPostProcessor
* Impl init method on dingtalk err handler
* Impl init method on wechat err handler
* Impl init method on email err handler
* refactor JobErrorHandler
* Fix compile error
---
.../elasticjob/error/handler/JobErrorHandler.java | 8 +-
.../handler/dingtalk/DingtalkConfiguration.java | 40 -------
.../handler/dingtalk/DingtalkJobErrorHandler.java | 56 +++++-----
.../dingtalk/DingtalkJobErrorHandlerTest.java | 24 ++--
.../error/handler/email/EmailJobErrorHandler.java | 124 +++++++++++----------
.../handler/email/EmailJobErrorHandlerTest.java | 12 +-
.../error/handler/JobErrorHandlerFactory.java | 8 +-
.../handler/general/IgnoreJobErrorHandler.java | 6 +-
.../error/handler/general/LogJobErrorHandler.java | 6 +-
.../handler/general/ThrowJobErrorHandler.java | 6 +-
.../error/handler/JobErrorHandlerFactoryTest.java | 8 +-
.../handler/general/IgnoreJobErrorHandlerTest.java | 7 +-
.../handler/general/LogJobErrorHandlerTest.java | 5 +-
.../handler/general/ThrowJobErrorHandlerTest.java | 7 +-
.../handler/wechat/WechatJobErrorHandler.java | 29 ++---
.../handler/wechat/WechatJobErrorHandlerTest.java | 20 ++--
.../elasticjob/executor/ElasticJobExecutor.java | 10 +-
.../infra/listener/ElasticJobListenerFactory.java | 3 +-
.../infra/spi/ElasticJobServiceLoader.java | 12 +-
.../elasticjob/infra/spi/SPIPostProcessor.java | 25 ++---
.../infra/spi/ElasticJobServiceLoaderTest.java | 6 +-
21 files changed, 207 insertions(+), 215 deletions(-)
diff --git a/elasticjob-error-handler/elasticjob-error-handler-spi/src/main/java/org/apache/shardingsphere/elasticjob/error/handler/JobErrorHandler.java b/elasticjob-error-handler/elasticjob-error-handler-spi/src/main/java/org/apache/shardingsphere/elasticjob/error/handler/JobErrorHandler.java
index 95757b4..fe87a19 100644
--- a/elasticjob-error-handler/elasticjob-error-handler-spi/src/main/java/org/apache/shardingsphere/elasticjob/error/handler/JobErrorHandler.java
+++ b/elasticjob-error-handler/elasticjob-error-handler-spi/src/main/java/org/apache/shardingsphere/elasticjob/error/handler/JobErrorHandler.java
@@ -17,21 +17,19 @@
package org.apache.shardingsphere.elasticjob.error.handler;
+import org.apache.shardingsphere.elasticjob.infra.spi.SPIPostProcessor;
import org.apache.shardingsphere.elasticjob.infra.spi.TypedSPI;
-import java.util.Properties;
-
/**
* Job error handler.
*/
-public interface JobErrorHandler extends TypedSPI {
+public interface JobErrorHandler extends TypedSPI, SPIPostProcessor {
/**
* Handle exception.
*
* @param jobName job name
- * @param props job properties
* @param cause failure cause
*/
- void handleException(String jobName, Properties props, Throwable cause);
+ void handleException(String jobName, Throwable cause);
}
diff --git a/elasticjob-error-handler/elasticjob-error-handler-type/elasticjob-error-handler-dingtalk/src/main/java/org/apache/shardingsphere/elasticjob/error/handler/dingtalk/DingtalkConfiguration.java b/elasticjob-error-handler/elasticjob-error-handler-type/elasticjob-error-handler-dingtalk/src/main/java/org/apache/shardingsphere/elasticjob/error/handler/dingtalk/DingtalkConfiguration.java
deleted file mode 100644
index 073d231..0000000
--- a/elasticjob-error-handler/elasticjob-error-handler-type/elasticjob-error-handler-dingtalk/src/main/java/org/apache/shardingsphere/elasticjob/error/handler/dingtalk/DingtalkConfiguration.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.elasticjob.error.handler.dingtalk;
-
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.elasticjob.error.handler.ErrorHandlerConfiguration;
-
-/**
- * Job error handler configuration for send error message via dingtalk.
- */
-@RequiredArgsConstructor
-@Getter
-public final class DingtalkConfiguration implements ErrorHandlerConfiguration {
-
- private final String webhook;
-
- private final String keyword;
-
- private final String secret;
-
- private final int connectTimeoutMilliseconds;
-
- private final int readTimeoutMilliseconds;
-}
diff --git a/elasticjob-error-handler/elasticjob-error-handler-type/elasticjob-error-handler-dingtalk/src/main/java/org/apache/shardingsphere/elasticjob/error/handler/dingtalk/DingtalkJobErrorHandler.java b/elasticjob-error-handler/elasticjob-error-handler-type/elasticjob-error-handler-dingtalk/src/main/java/org/apache/shardingsphere/elasticjob/error/handler/dingtalk/DingtalkJobErrorHandler.java
index dae1fa6..8ab1f76 100644
--- a/elasticjob-error-handler/elasticjob-error-handler-type/elasticjob-error-handler-dingtalk/src/main/java/org/apache/shardingsphere/elasticjob/error/handler/dingtalk/DingtalkJobErrorHandler.java
+++ b/elasticjob-error-handler/elasticjob-error-handler-type/elasticjob-error-handler-dingtalk/src/main/java/org/apache/shardingsphere/elasticjob/error/handler/dingtalk/DingtalkJobErrorHandler.java
@@ -55,7 +55,23 @@ public final class DingtalkJobErrorHandler implements JobErrorHandler {
private final CloseableHttpClient httpclient = HttpClients.createDefault();
- public DingtalkJobErrorHandler() {
+ private String webhook;
+
+ private String keyword;
+
+ private String secret;
+
+ private int connectTimeoutMilliseconds;
+
+ private int readTimeoutMilliseconds;
+
+ @Override
+ public void init(final Properties props) {
+ webhook = props.getProperty(DingtalkPropertiesConstants.WEBHOOK);
+ keyword = props.getProperty(DingtalkPropertiesConstants.KEYWORD);
+ secret = props.getProperty(DingtalkPropertiesConstants.SECRET);
+ connectTimeoutMilliseconds = Integer.parseInt(props.getProperty(DingtalkPropertiesConstants.CONNECT_TIMEOUT_MILLISECONDS, DingtalkPropertiesConstants.DEFAULT_CONNECT_TIMEOUT_MILLISECONDS));
+ readTimeoutMilliseconds = Integer.parseInt(props.getProperty(DingtalkPropertiesConstants.READ_TIMEOUT_MILLISECONDS, DingtalkPropertiesConstants.DEFAULT_READ_TIMEOUT_MILLISECONDS));
registerShutdownHook();
}
@@ -72,8 +88,8 @@ public final class DingtalkJobErrorHandler implements JobErrorHandler {
}
@Override
- public void handleException(final String jobName, final Properties props, final Throwable cause) {
- HttpPost httpPost = createHTTPPostMethod(jobName, cause, createConfiguration(props));
+ public void handleException(final String jobName, final Throwable cause) {
+ HttpPost httpPost = createHTTPPostMethod(jobName, cause);
try (CloseableHttpResponse response = httpclient.execute(httpPost)) {
int status = response.getStatusLine().getStatusCode();
if (HttpURLConnection.HTTP_OK == status) {
@@ -92,38 +108,28 @@ public final class DingtalkJobErrorHandler implements JobErrorHandler {
}
}
- private DingtalkConfiguration createConfiguration(final Properties props) {
- String webhook = props.getProperty(DingtalkPropertiesConstants.WEBHOOK);
- String keyword = props.getProperty(DingtalkPropertiesConstants.KEYWORD);
- String secret = props.getProperty(DingtalkPropertiesConstants.SECRET);
- int connectTimeoutMilliseconds = Integer.parseInt(
- props.getProperty(DingtalkPropertiesConstants.CONNECT_TIMEOUT_MILLISECONDS, DingtalkPropertiesConstants.DEFAULT_CONNECT_TIMEOUT_MILLISECONDS));
- int readTimeoutMilliseconds = Integer.parseInt(props.getProperty(DingtalkPropertiesConstants.READ_TIMEOUT_MILLISECONDS, DingtalkPropertiesConstants.DEFAULT_READ_TIMEOUT_MILLISECONDS));
- return new DingtalkConfiguration(webhook, keyword, secret, connectTimeoutMilliseconds, readTimeoutMilliseconds);
- }
-
- private HttpPost createHTTPPostMethod(final String jobName, final Throwable cause, final DingtalkConfiguration config) {
- HttpPost result = new HttpPost(getURL(config));
- RequestConfig requestConfig = RequestConfig.custom().setConnectTimeout(config.getConnectTimeoutMilliseconds()).setSocketTimeout(config.getReadTimeoutMilliseconds()).build();
+ private HttpPost createHTTPPostMethod(final String jobName, final Throwable cause) {
+ HttpPost result = new HttpPost(getURL());
+ RequestConfig requestConfig = RequestConfig.custom().setConnectTimeout(connectTimeoutMilliseconds).setSocketTimeout(readTimeoutMilliseconds).build();
result.setConfig(requestConfig);
- StringEntity entity = new StringEntity(getJsonParameter(getErrorMessage(jobName, config, cause)), StandardCharsets.UTF_8);
+ StringEntity entity = new StringEntity(getJsonParameter(getErrorMessage(jobName, cause)), StandardCharsets.UTF_8);
entity.setContentEncoding(StandardCharsets.UTF_8.name());
entity.setContentType("application/json");
result.setEntity(entity);
return result;
}
- private String getURL(final DingtalkConfiguration config) {
- return Strings.isNullOrEmpty(config.getSecret()) ? config.getWebhook() : getSignedURL(config);
+ private String getURL() {
+ return Strings.isNullOrEmpty(secret) ? webhook : getSignedURL();
}
- private String getSignedURL(final DingtalkConfiguration config) {
+ private String getSignedURL() {
long timestamp = System.currentTimeMillis();
- return String.format("%s×tamp=%s&sign=%s", config.getWebhook(), timestamp, generateSignature(timestamp, config.getSecret()));
+ return String.format("%s×tamp=%s&sign=%s", webhook, timestamp, generateSignature(timestamp));
}
@SneakyThrows({NoSuchAlgorithmException.class, UnsupportedEncodingException.class, InvalidKeyException.class})
- private String generateSignature(final long timestamp, final String secret) {
+ private String generateSignature(final long timestamp) {
String algorithmName = "HmacSHA256";
Mac mac = Mac.getInstance(algorithmName);
mac.init(new SecretKeySpec(secret.getBytes(StandardCharsets.UTF_8), algorithmName));
@@ -135,12 +141,12 @@ public final class DingtalkJobErrorHandler implements JobErrorHandler {
return GsonFactory.getGson().toJson(ImmutableMap.of("msgtype", "text", "text", Collections.singletonMap("content", message)));
}
- private String getErrorMessage(final String jobName, final DingtalkConfiguration config, final Throwable cause) {
+ private String getErrorMessage(final String jobName, final Throwable cause) {
StringWriter writer = new StringWriter();
cause.printStackTrace(new PrintWriter(writer, true));
String result = String.format("Job '%s' exception occur in job processing, caused by %s", jobName, writer.toString());
- if (!Strings.isNullOrEmpty(config.getKeyword())) {
- result = config.getKeyword().concat(result);
+ if (!Strings.isNullOrEmpty(keyword)) {
+ result = keyword.concat(result);
}
return result;
}
diff --git a/elasticjob-error-handler/elasticjob-error-handler-type/elasticjob-error-handler-dingtalk/src/test/java/org/apache/shardingsphere/elasticjob/error/handler/dingtalk/DingtalkJobErrorHandlerTest.java b/elasticjob-error-handler/elasticjob-error-handler-type/elasticjob-error-handler-dingtalk/src/test/java/org/apache/shardingsphere/elasticjob/error/handler/dingtalk/DingtalkJobErrorHandlerTest.java
index a889f0f..60b6501 100644
--- a/elasticjob-error-handler/elasticjob-error-handler-type/elasticjob-error-handler-dingtalk/src/test/java/org/apache/shardingsphere/elasticjob/error/handler/dingtalk/DingtalkJobErrorHandlerTest.java
+++ b/elasticjob-error-handler/elasticjob-error-handler-type/elasticjob-error-handler-dingtalk/src/test/java/org/apache/shardingsphere/elasticjob/error/handler/dingtalk/DingtalkJobErrorHandlerTest.java
@@ -68,51 +68,51 @@ public final class DingtalkJobErrorHandlerTest {
@Test
public void assertHandleExceptionWithNotifySuccessful() {
- DingtalkJobErrorHandler actual = getDingtalkJobErrorHandler();
+ DingtalkJobErrorHandler actual = getDingtalkJobErrorHandler(createConfigurationProperties("http://localhost:9875/send?access_token=mocked_token"));
setStaticFieldValue(actual);
Throwable cause = new RuntimeException("test");
- actual.handleException("test_job", createConfigurationProperties("http://localhost:9875/send?access_token=mocked_token"), cause);
+ actual.handleException("test_job", cause);
verify(log).info("An exception has occurred in Job '{}', Notification to Dingtalk was successful.", "test_job", cause);
}
@Test
public void assertHandleExceptionWithWrongToken() {
- DingtalkJobErrorHandler actual = getDingtalkJobErrorHandler();
+ DingtalkJobErrorHandler actual = getDingtalkJobErrorHandler(createConfigurationProperties("http://localhost:9875/send?access_token=wrong_token"));
setStaticFieldValue(actual);
Throwable cause = new RuntimeException("test");
- actual.handleException("test_job", createConfigurationProperties("http://localhost:9875/send?access_token=wrong_token"), cause);
+ actual.handleException("test_job", cause);
verify(log).info("An exception has occurred in Job '{}', But failed to send alert by Dingtalk because of: {}", "test_job", "token is not exist", cause);
}
@Test
public void assertHandleExceptionWithUrlIsNotFound() {
- DingtalkJobErrorHandler actual = getDingtalkJobErrorHandler();
+ DingtalkJobErrorHandler actual = getDingtalkJobErrorHandler(createConfigurationProperties("http://localhost:9875/404"));
setStaticFieldValue(actual);
Throwable cause = new RuntimeException("test");
- actual.handleException("test_job", createConfigurationProperties("http://localhost:9875/404"), cause);
+ actual.handleException("test_job", cause);
verify(log).error("An exception has occurred in Job '{}', But failed to send alert by Dingtalk because of: Unexpected response status: {}", "test_job", 404, cause);
}
@Test
public void assertHandleExceptionWithWrongUrl() {
- DingtalkJobErrorHandler actual = getDingtalkJobErrorHandler();
+ DingtalkJobErrorHandler actual = getDingtalkJobErrorHandler(createNoSignJobConfigurationProperties("http://wrongUrl"));
setStaticFieldValue(actual);
Throwable cause = new RuntimeException("test");
- actual.handleException("test_job", createNoSignJobConfigurationProperties("http://wrongUrl"), cause);
+ actual.handleException("test_job", cause);
verify(log).error("An exception has occurred in Job '{}', But failed to send alert by Dingtalk because of", "test_job", cause);
}
@Test
public void assertHandleExceptionWithNoSign() {
- DingtalkJobErrorHandler actual = getDingtalkJobErrorHandler();
+ DingtalkJobErrorHandler actual = getDingtalkJobErrorHandler(createNoSignJobConfigurationProperties("http://localhost:9875/send?access_token=mocked_token"));
setStaticFieldValue(actual);
Throwable cause = new RuntimeException("test");
- actual.handleException("test_job", createNoSignJobConfigurationProperties("http://localhost:9875/send?access_token=mocked_token"), cause);
+ actual.handleException("test_job", cause);
verify(log).info("An exception has occurred in Job '{}', Notification to Dingtalk was successful.", "test_job", cause);
}
- private DingtalkJobErrorHandler getDingtalkJobErrorHandler() {
- return (DingtalkJobErrorHandler) JobErrorHandlerFactory.createHandler("DINGTALK").orElseThrow(() -> new JobConfigurationException("DINGTALK error handler not found."));
+ private DingtalkJobErrorHandler getDingtalkJobErrorHandler(final Properties props) {
+ return (DingtalkJobErrorHandler) JobErrorHandlerFactory.createHandler("DINGTALK", props).orElseThrow(() -> new JobConfigurationException("DINGTALK error handler not found."));
}
@SneakyThrows
diff --git a/elasticjob-error-handler/elasticjob-error-handler-type/elasticjob-error-handler-email/src/main/java/org/apache/shardingsphere/elasticjob/error/handler/email/EmailJobErrorHandler.java b/elasticjob-error-handler/elasticjob-error-handler-type/elasticjob-error-handler-email/src/main/java/org/apache/shardingsphere/elasticjob/error/handler/email/EmailJobErrorHandler.java
index 7b3348d..02e3a11 100644
--- a/elasticjob-error-handler/elasticjob-error-handler-type/elasticjob-error-handler-email/src/main/java/org/apache/shardingsphere/elasticjob/error/handler/email/EmailJobErrorHandler.java
+++ b/elasticjob-error-handler/elasticjob-error-handler-type/elasticjob-error-handler-email/src/main/java/org/apache/shardingsphere/elasticjob/error/handler/email/EmailJobErrorHandler.java
@@ -17,6 +17,7 @@
package org.apache.shardingsphere.elasticjob.error.handler.email;
+import com.google.common.base.Strings;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.shardingsphere.elasticjob.error.handler.JobErrorHandler;
@@ -36,7 +37,6 @@ import javax.mail.internet.MimeMultipart;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Date;
-import java.util.Optional;
import java.util.Properties;
/**
@@ -47,12 +47,61 @@ public final class EmailJobErrorHandler implements JobErrorHandler {
private Session session;
+ private String subject;
+
+ private String from;
+
+ private String to;
+
+ private String cc;
+
+ private String bcc;
+
+ @Override
+ public void init(final Properties props) {
+ String host = props.getProperty(EmailPropertiesConstants.HOST);
+ int port = Integer.parseInt(props.getProperty(EmailPropertiesConstants.PORT));
+ boolean isUseSSL = Boolean.getBoolean(props.getProperty(EmailPropertiesConstants.IS_USE_SSL, EmailPropertiesConstants.DEFAULT_IS_USE_SSL));
+ boolean isDebug = Boolean.getBoolean(props.getProperty(EmailPropertiesConstants.IS_DEBUG, EmailPropertiesConstants.DEFAULT_IS_DEBUG));
+ String username = props.getProperty(EmailPropertiesConstants.USERNAME);
+ String password = props.getProperty(EmailPropertiesConstants.PASSWORD);
+ session = Session.getDefaultInstance(createSessionProperties(host, port, isUseSSL, isDebug), getSessionAuthenticator(username, password));
+ subject = props.getProperty(EmailPropertiesConstants.SUBJECT, EmailPropertiesConstants.DEFAULT_SUBJECT);
+ from = props.getProperty(EmailPropertiesConstants.FROM);
+ to = props.getProperty(EmailPropertiesConstants.TO);
+ cc = props.getProperty(EmailPropertiesConstants.CC);
+ bcc = props.getProperty(EmailPropertiesConstants.BCC);
+ }
+
+ private Properties createSessionProperties(final String host, final int port, final boolean isUseSSL, final boolean isDebug) {
+ Properties result = new Properties();
+ result.put("mail.smtp.host", host);
+ result.put("mail.smtp.port", port);
+ result.put("mail.smtp.auth", Boolean.TRUE.toString());
+ result.put("mail.transport.protocol", "smtp");
+ result.setProperty("mail.debug", Boolean.toString(isDebug));
+ if (isUseSSL) {
+ result.setProperty("mail.smtp.socketFactory.class", "javax.net.ssl.SSLSocketFactory");
+ result.setProperty("mail.smtp.socketFactory.fallback", Boolean.FALSE.toString());
+ }
+ return result;
+ }
+
+ private Authenticator getSessionAuthenticator(final String username, final String password) {
+ return new Authenticator() {
+
+ @Override
+ public PasswordAuthentication getPasswordAuthentication() {
+ return new PasswordAuthentication(username, password);
+ }
+ };
+ }
+
@Override
- public void handleException(final String jobName, final Properties props, final Throwable cause) {
+ public void handleException(final String jobName, final Throwable cause) {
String errorMessage = getErrorMessage(jobName, cause);
- EmailConfiguration config = createConfiguration(props);
try {
- sendMessage(createMessage(errorMessage, config), config);
+ sendMessage(createMessage(errorMessage));
log.error("An exception has occurred in Job '{}'. An email has been sent successfully.", jobName, cause);
} catch (final MessagingException ex) {
cause.addSuppressed(ex);
@@ -60,92 +109,45 @@ public final class EmailJobErrorHandler implements JobErrorHandler {
}
}
- private EmailConfiguration createConfiguration(final Properties props) {
- String host = props.getProperty(EmailPropertiesConstants.HOST);
- int port = Integer.parseInt(props.getProperty(EmailPropertiesConstants.PORT));
- String username = props.getProperty(EmailPropertiesConstants.USERNAME);
- String password = props.getProperty(EmailPropertiesConstants.PASSWORD);
- boolean isUseSSL = Boolean.parseBoolean(props.getProperty(EmailPropertiesConstants.IS_USE_SSL, EmailPropertiesConstants.DEFAULT_IS_USE_SSL));
- String subject = props.getProperty(EmailPropertiesConstants.SUBJECT, EmailPropertiesConstants.DEFAULT_SUBJECT);
- String from = props.getProperty(EmailPropertiesConstants.FROM);
- String to = props.getProperty(EmailPropertiesConstants.TO);
- String cc = props.getProperty(EmailPropertiesConstants.CC);
- String bcc = props.getProperty(EmailPropertiesConstants.BCC);
- boolean isDebug = Boolean.parseBoolean(props.getProperty(EmailPropertiesConstants.IS_DEBUG, EmailPropertiesConstants.DEFAULT_IS_DEBUG));
- return new EmailConfiguration(host, port, username, password, isUseSSL, subject, from, to, cc, bcc, isDebug);
- }
-
private String getErrorMessage(final String jobName, final Throwable cause) {
StringWriter writer = new StringWriter();
cause.printStackTrace(new PrintWriter(writer, true));
return String.format("Job '%s' exception occur in job processing, caused by %s", jobName, writer.toString());
}
- private Message createMessage(final String content, final EmailConfiguration config) throws MessagingException {
- MimeMessage result = new MimeMessage(Optional.ofNullable(session).orElseGet(() -> createSession(config)));
- result.setFrom(new InternetAddress(config.getFrom()));
- result.setSubject(config.getSubject());
+ private Message createMessage(final String content) throws MessagingException {
+ MimeMessage result = new MimeMessage(session);
+ result.setFrom(new InternetAddress(from));
+ result.setSubject(subject);
result.setSentDate(new Date());
Multipart multipart = new MimeMultipart();
BodyPart mailBody = new MimeBodyPart();
mailBody.setContent(content, "text/html; charset=utf-8");
multipart.addBodyPart(mailBody);
result.setContent(multipart);
- String to = config.getTo();
if (StringUtils.isNotBlank(to)) {
String[] tos = to.split(",");
for (String each : tos) {
result.addRecipient(Message.RecipientType.TO, new InternetAddress(each));
}
}
- if (StringUtils.isNotBlank(config.getCc())) {
- result.addRecipient(Message.RecipientType.CC, new InternetAddress(config.getCc()));
+ if (!Strings.isNullOrEmpty(cc)) {
+ result.addRecipient(Message.RecipientType.CC, new InternetAddress(cc));
}
- if (StringUtils.isNotBlank(config.getBcc())) {
- result.addRecipient(Message.RecipientType.BCC, new InternetAddress(config.getBcc()));
+ if (!Strings.isNullOrEmpty(bcc)) {
+ result.addRecipient(Message.RecipientType.BCC, new InternetAddress(bcc));
}
result.saveChanges();
return result;
}
- private void sendMessage(final Message message, final EmailConfiguration config) throws MessagingException {
- try (Transport transport = Optional.ofNullable(session).orElseGet(() -> createSession(config)).getTransport()) {
+ private void sendMessage(final Message message) throws MessagingException {
+ try (Transport transport = session.getTransport()) {
transport.connect();
transport.sendMessage(message, message.getAllRecipients());
}
}
- private synchronized Session createSession(final EmailConfiguration config) {
- if (null == session) {
- session = Session.getDefaultInstance(createSessionProperties(config), getSessionAuthenticator(config));
- }
- return session;
- }
-
- private Properties createSessionProperties(final EmailConfiguration config) {
- Properties result = new Properties();
- result.put("mail.smtp.host", config.getHost());
- result.put("mail.smtp.port", config.getPort());
- result.put("mail.smtp.auth", "true");
- result.put("mail.transport.protocol", "smtp");
- result.setProperty("mail.debug", Boolean.toString(config.isDebug()));
- if (config.isUseSsl()) {
- result.setProperty("mail.smtp.socketFactory.class", "javax.net.ssl.SSLSocketFactory");
- result.setProperty("mail.smtp.socketFactory.fallback", "false");
- }
- return result;
- }
-
- private Authenticator getSessionAuthenticator(final EmailConfiguration config) {
- return new Authenticator() {
-
- @Override
- public PasswordAuthentication getPasswordAuthentication() {
- return new PasswordAuthentication(config.getUsername(), config.getPassword());
- }
- };
- }
-
@Override
public String getType() {
return "EMAIL";
diff --git a/elasticjob-error-handler/elasticjob-error-handler-type/elasticjob-error-handler-email/src/test/java/org/apache/shardingsphere/elasticjob/error/handler/email/EmailJobErrorHandlerTest.java b/elasticjob-error-handler/elasticjob-error-handler-type/elasticjob-error-handler-email/src/test/java/org/apache/shardingsphere/elasticjob/error/handler/email/EmailJobErrorHandlerTest.java
index b02121b..8e6b8ba 100644
--- a/elasticjob-error-handler/elasticjob-error-handler-type/elasticjob-error-handler-email/src/test/java/org/apache/shardingsphere/elasticjob/error/handler/email/EmailJobErrorHandlerTest.java
+++ b/elasticjob-error-handler/elasticjob-error-handler-type/elasticjob-error-handler-email/src/test/java/org/apache/shardingsphere/elasticjob/error/handler/email/EmailJobErrorHandlerTest.java
@@ -52,31 +52,31 @@ public final class EmailJobErrorHandlerTest {
@Test
public void assertHandleExceptionWithMessagingException() {
- EmailJobErrorHandler emailJobErrorHandler = getEmailJobErrorHandler();
+ EmailJobErrorHandler emailJobErrorHandler = getEmailJobErrorHandler(createConfigurationProperties());
setStaticFieldValue(emailJobErrorHandler, "log", log);
Throwable cause = new RuntimeException("test");
String jobName = "test_job";
- emailJobErrorHandler.handleException(jobName, createConfigurationProperties(), cause);
+ emailJobErrorHandler.handleException(jobName, cause);
verify(log).error("An exception has occurred in Job '{}' but failed to send email because of", jobName, cause);
}
@Test
@SneakyThrows
public void assertHandleExceptionSucceedInSendingEmail() {
- EmailJobErrorHandler emailJobErrorHandler = getEmailJobErrorHandler();
+ EmailJobErrorHandler emailJobErrorHandler = getEmailJobErrorHandler(createConfigurationProperties());
setStaticFieldValue(emailJobErrorHandler, "log", log);
setUpMockSession(session);
setFieldValue(emailJobErrorHandler, "session", session);
Throwable cause = new RuntimeException("test");
String jobName = "test_job";
when(session.getTransport()).thenReturn(transport);
- emailJobErrorHandler.handleException(jobName, createConfigurationProperties(), cause);
+ emailJobErrorHandler.handleException(jobName, cause);
verify(transport).sendMessage(any(Message.class), any(Address[].class));
verify(log).error("An exception has occurred in Job '{}'. An email has been sent successfully.", jobName, cause);
}
- private EmailJobErrorHandler getEmailJobErrorHandler() {
- return (EmailJobErrorHandler) JobErrorHandlerFactory.createHandler("EMAIL").orElseThrow(() -> new JobConfigurationException("EMAIL error handler not found."));
+ private EmailJobErrorHandler getEmailJobErrorHandler(final Properties props) {
+ return (EmailJobErrorHandler) JobErrorHandlerFactory.createHandler("EMAIL", props).orElseThrow(() -> new JobConfigurationException("EMAIL error handler not found."));
}
private void setUpMockSession(final Session session) {
diff --git a/elasticjob-error-handler/elasticjob-error-handler-type/elasticjob-error-handler-general/src/main/java/org/apache/shardingsphere/elasticjob/error/handler/JobErrorHandlerFactory.java b/elasticjob-error-handler/elasticjob-error-handler-type/elasticjob-error-handler-general/src/main/java/org/apache/shardingsphere/elasticjob/error/handler/JobErrorHandlerFactory.java
index e023ec5..7b8951f 100644
--- a/elasticjob-error-handler/elasticjob-error-handler-type/elasticjob-error-handler-general/src/main/java/org/apache/shardingsphere/elasticjob/error/handler/JobErrorHandlerFactory.java
+++ b/elasticjob-error-handler/elasticjob-error-handler-type/elasticjob-error-handler-general/src/main/java/org/apache/shardingsphere/elasticjob/error/handler/JobErrorHandlerFactory.java
@@ -23,6 +23,7 @@ import lombok.NoArgsConstructor;
import org.apache.shardingsphere.elasticjob.infra.spi.ElasticJobServiceLoader;
import java.util.Optional;
+import java.util.Properties;
/**
* Job error handler factory.
@@ -40,12 +41,13 @@ public final class JobErrorHandlerFactory {
* Get job error handler.
*
* @param type job error handler type
+ * @param props job properties
* @return job error handler
*/
- public static Optional<JobErrorHandler> createHandler(final String type) {
+ public static Optional<JobErrorHandler> createHandler(final String type, final Properties props) {
if (Strings.isNullOrEmpty(type)) {
- return ElasticJobServiceLoader.newTypedServiceInstance(JobErrorHandler.class, DEFAULT_HANDLER);
+ return ElasticJobServiceLoader.newTypedServiceInstance(JobErrorHandler.class, DEFAULT_HANDLER, props);
}
- return ElasticJobServiceLoader.newTypedServiceInstance(JobErrorHandler.class, type);
+ return ElasticJobServiceLoader.newTypedServiceInstance(JobErrorHandler.class, type, props);
}
}
diff --git a/elasticjob-error-handler/elasticjob-error-handler-type/elasticjob-error-handler-general/src/main/java/org/apache/shardingsphere/elasticjob/error/handler/general/IgnoreJobErrorHandler.java b/elasticjob-error-handler/elasticjob-error-handler-type/elasticjob-error-handler-general/src/main/java/org/apache/shardingsphere/elasticjob/error/handler/general/IgnoreJobErrorHandler.java
index 6e8ad3d..d6d9521 100644
--- a/elasticjob-error-handler/elasticjob-error-handler-type/elasticjob-error-handler-general/src/main/java/org/apache/shardingsphere/elasticjob/error/handler/general/IgnoreJobErrorHandler.java
+++ b/elasticjob-error-handler/elasticjob-error-handler-type/elasticjob-error-handler-general/src/main/java/org/apache/shardingsphere/elasticjob/error/handler/general/IgnoreJobErrorHandler.java
@@ -27,7 +27,11 @@ import java.util.Properties;
public final class IgnoreJobErrorHandler implements JobErrorHandler {
@Override
- public void handleException(final String jobName, final Properties props, final Throwable cause) {
+ public void init(final Properties props) {
+ }
+
+ @Override
+ public void handleException(final String jobName, final Throwable cause) {
}
@Override
diff --git a/elasticjob-error-handler/elasticjob-error-handler-type/elasticjob-error-handler-general/src/main/java/org/apache/shardingsphere/elasticjob/error/handler/general/LogJobErrorHandler.java b/elasticjob-error-handler/elasticjob-error-handler-type/elasticjob-error-handler-general/src/main/java/org/apache/shardingsphere/elasticjob/error/handler/general/LogJobErrorHandler.java
index 0e83df1..2311b69 100644
--- a/elasticjob-error-handler/elasticjob-error-handler-type/elasticjob-error-handler-general/src/main/java/org/apache/shardingsphere/elasticjob/error/handler/general/LogJobErrorHandler.java
+++ b/elasticjob-error-handler/elasticjob-error-handler-type/elasticjob-error-handler-general/src/main/java/org/apache/shardingsphere/elasticjob/error/handler/general/LogJobErrorHandler.java
@@ -29,7 +29,11 @@ import java.util.Properties;
public final class LogJobErrorHandler implements JobErrorHandler {
@Override
- public void handleException(final String jobName, final Properties props, final Throwable cause) {
+ public void init(final Properties props) {
+ }
+
+ @Override
+ public void handleException(final String jobName, final Throwable cause) {
log.error(String.format("Job '%s' exception occur in job processing", jobName), cause);
}
diff --git a/elasticjob-error-handler/elasticjob-error-handler-type/elasticjob-error-handler-general/src/main/java/org/apache/shardingsphere/elasticjob/error/handler/general/ThrowJobErrorHandler.java b/elasticjob-error-handler/elasticjob-error-handler-type/elasticjob-error-handler-general/src/main/java/org/apache/shardingsphere/elasticjob/error/handler/general/ThrowJobErrorHandler.java
index f9f9e3e..7d76372 100644
--- a/elasticjob-error-handler/elasticjob-error-handler-type/elasticjob-error-handler-general/src/main/java/org/apache/shardingsphere/elasticjob/error/handler/general/ThrowJobErrorHandler.java
+++ b/elasticjob-error-handler/elasticjob-error-handler-type/elasticjob-error-handler-general/src/main/java/org/apache/shardingsphere/elasticjob/error/handler/general/ThrowJobErrorHandler.java
@@ -28,7 +28,11 @@ import java.util.Properties;
public final class ThrowJobErrorHandler implements JobErrorHandler {
@Override
- public void handleException(final String jobName, final Properties props, final Throwable cause) {
+ public void init(final Properties props) {
+ }
+
+ @Override
+ public void handleException(final String jobName, final Throwable cause) {
throw new JobSystemException(cause);
}
diff --git a/elasticjob-error-handler/elasticjob-error-handler-type/elasticjob-error-handler-general/src/test/java/org/apache/shardingsphere/elasticjob/error/handler/JobErrorHandlerFactoryTest.java b/elasticjob-error-handler/elasticjob-error-handler-type/elasticjob-error-handler-general/src/test/java/org/apache/shardingsphere/elasticjob/error/handler/JobErrorHandlerFactoryTest.java
index 23ca2e3..51f93eb 100644
--- a/elasticjob-error-handler/elasticjob-error-handler-type/elasticjob-error-handler-general/src/test/java/org/apache/shardingsphere/elasticjob/error/handler/JobErrorHandlerFactoryTest.java
+++ b/elasticjob-error-handler/elasticjob-error-handler-type/elasticjob-error-handler-general/src/test/java/org/apache/shardingsphere/elasticjob/error/handler/JobErrorHandlerFactoryTest.java
@@ -22,6 +22,8 @@ import org.apache.shardingsphere.elasticjob.error.handler.general.ThrowJobErrorH
import org.apache.shardingsphere.elasticjob.infra.exception.JobConfigurationException;
import org.junit.Test;
+import java.util.Properties;
+
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.junit.Assert.assertThat;
@@ -29,16 +31,16 @@ public final class JobErrorHandlerFactoryTest {
@Test
public void assertGetDefaultHandler() {
- assertThat(JobErrorHandlerFactory.createHandler("").orElse(null), instanceOf(LogJobErrorHandler.class));
+ assertThat(JobErrorHandlerFactory.createHandler("", new Properties()).orElse(null), instanceOf(LogJobErrorHandler.class));
}
@Test(expected = JobConfigurationException.class)
public void assertGetInvalidHandler() {
- JobErrorHandlerFactory.createHandler("INVALID").orElseThrow(() -> new JobConfigurationException(""));
+ JobErrorHandlerFactory.createHandler("INVALID", new Properties()).orElseThrow(() -> new JobConfigurationException(""));
}
@Test
public void assertGetHandler() {
- assertThat(JobErrorHandlerFactory.createHandler("THROW").orElse(null), instanceOf(ThrowJobErrorHandler.class));
+ assertThat(JobErrorHandlerFactory.createHandler("THROW", new Properties()).orElse(null), instanceOf(ThrowJobErrorHandler.class));
}
}
diff --git a/elasticjob-error-handler/elasticjob-error-handler-type/elasticjob-error-handler-general/src/test/java/org/apache/shardingsphere/elasticjob/error/handler/general/IgnoreJobErrorHandlerTest.java b/elasticjob-error-handler/elasticjob-error-handler-type/elasticjob-error-handler-general/src/test/java/org/apache/shardingsphere/elasticjob/error/handler/general/IgnoreJobErrorHandlerTest.java
index a820302..e82a6b7 100644
--- a/elasticjob-error-handler/elasticjob-error-handler-type/elasticjob-error-handler-general/src/test/java/org/apache/shardingsphere/elasticjob/error/handler/general/IgnoreJobErrorHandlerTest.java
+++ b/elasticjob-error-handler/elasticjob-error-handler-type/elasticjob-error-handler-general/src/test/java/org/apache/shardingsphere/elasticjob/error/handler/general/IgnoreJobErrorHandlerTest.java
@@ -21,12 +21,13 @@ import org.apache.shardingsphere.elasticjob.error.handler.JobErrorHandlerFactory
import org.apache.shardingsphere.elasticjob.infra.exception.JobConfigurationException;
import org.junit.Test;
+import java.util.Properties;
+
public final class IgnoreJobErrorHandlerTest {
- @SuppressWarnings("unchecked")
@Test
public void assertHandleException() {
- JobErrorHandlerFactory.createHandler("IGNORE").orElseThrow(
- () -> new JobConfigurationException("IGNORE error handler not found.")).handleException("test_job", null, new RuntimeException("test"));
+ JobErrorHandlerFactory.createHandler("IGNORE", new Properties()).orElseThrow(
+ () -> new JobConfigurationException("IGNORE error handler not found.")).handleException("test_job", new RuntimeException("test"));
}
}
diff --git a/elasticjob-error-handler/elasticjob-error-handler-type/elasticjob-error-handler-general/src/test/java/org/apache/shardingsphere/elasticjob/error/handler/general/LogJobErrorHandlerTest.java b/elasticjob-error-handler/elasticjob-error-handler-type/elasticjob-error-handler-general/src/test/java/org/apache/shardingsphere/elasticjob/error/handler/general/LogJobErrorHandlerTest.java
index b7d7e24..1d49a8c 100644
--- a/elasticjob-error-handler/elasticjob-error-handler-type/elasticjob-error-handler-general/src/test/java/org/apache/shardingsphere/elasticjob/error/handler/general/LogJobErrorHandlerTest.java
+++ b/elasticjob-error-handler/elasticjob-error-handler-type/elasticjob-error-handler-general/src/test/java/org/apache/shardingsphere/elasticjob/error/handler/general/LogJobErrorHandlerTest.java
@@ -28,6 +28,7 @@ import org.slf4j.Logger;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
+import java.util.Properties;
import static org.mockito.Mockito.verify;
@@ -39,10 +40,10 @@ public final class LogJobErrorHandlerTest {
@Test
public void assertHandleException() {
- LogJobErrorHandler actual = (LogJobErrorHandler) JobErrorHandlerFactory.createHandler("LOG").orElseThrow(() -> new JobConfigurationException("LOG error handler not found."));
+ LogJobErrorHandler actual = (LogJobErrorHandler) JobErrorHandlerFactory.createHandler("LOG", new Properties()).orElseThrow(() -> new JobConfigurationException("LOG error handler not found."));
setStaticFieldValue(actual);
Throwable cause = new RuntimeException("test");
- actual.handleException("test_job", null, cause);
+ actual.handleException("test_job", cause);
verify(log).error("Job 'test_job' exception occur in job processing", cause);
}
diff --git a/elasticjob-error-handler/elasticjob-error-handler-type/elasticjob-error-handler-general/src/test/java/org/apache/shardingsphere/elasticjob/error/handler/general/ThrowJobErrorHandlerTest.java b/elasticjob-error-handler/elasticjob-error-handler-type/elasticjob-error-handler-general/src/test/java/org/apache/shardingsphere/elasticjob/error/handler/general/ThrowJobErrorHandlerTest.java
index 6efc5ae..e0f8656 100644
--- a/elasticjob-error-handler/elasticjob-error-handler-type/elasticjob-error-handler-general/src/test/java/org/apache/shardingsphere/elasticjob/error/handler/general/ThrowJobErrorHandlerTest.java
+++ b/elasticjob-error-handler/elasticjob-error-handler-type/elasticjob-error-handler-general/src/test/java/org/apache/shardingsphere/elasticjob/error/handler/general/ThrowJobErrorHandlerTest.java
@@ -22,12 +22,13 @@ import org.apache.shardingsphere.elasticjob.infra.exception.JobConfigurationExce
import org.apache.shardingsphere.elasticjob.infra.exception.JobSystemException;
import org.junit.Test;
+import java.util.Properties;
+
public final class ThrowJobErrorHandlerTest {
- @SuppressWarnings("unchecked")
@Test(expected = JobSystemException.class)
public void assertHandleException() {
- JobErrorHandlerFactory.createHandler("THROW").orElseThrow(
- () -> new JobConfigurationException("THROW error handler not found.")).handleException("test_job", null, new RuntimeException("test"));
+ JobErrorHandlerFactory.createHandler("THROW", new Properties()).orElseThrow(
+ () -> new JobConfigurationException("THROW error handler not found.")).handleException("test_job", new RuntimeException("test"));
}
}
diff --git a/elasticjob-error-handler/elasticjob-error-handler-type/elasticjob-error-handler-wechat/src/main/java/org/apache/shardingsphere/elasticjob/error/handler/wechat/WechatJobErrorHandler.java b/elasticjob-error-handler/elasticjob-error-handler-type/elasticjob-error-handler-wechat/src/main/java/org/apache/shardingsphere/elasticjob/error/handler/wechat/WechatJobErrorHandler.java
index 4d9264f..84ac6a7 100644
--- a/elasticjob-error-handler/elasticjob-error-handler-type/elasticjob-error-handler-wechat/src/main/java/org/apache/shardingsphere/elasticjob/error/handler/wechat/WechatJobErrorHandler.java
+++ b/elasticjob-error-handler/elasticjob-error-handler-type/elasticjob-error-handler-wechat/src/main/java/org/apache/shardingsphere/elasticjob/error/handler/wechat/WechatJobErrorHandler.java
@@ -47,7 +47,17 @@ public final class WechatJobErrorHandler implements JobErrorHandler {
private final CloseableHttpClient httpclient = HttpClients.createDefault();
- public WechatJobErrorHandler() {
+ private String webhook;
+
+ private int connectTimeoutMilliseconds;
+
+ private int readTimeoutMilliseconds;
+
+ @Override
+ public void init(final Properties props) {
+ webhook = props.getProperty(WechatPropertiesConstants.WEBHOOK);
+ connectTimeoutMilliseconds = Integer.parseInt(props.getProperty(WechatPropertiesConstants.CONNECT_TIMEOUT_MILLISECONDS, WechatPropertiesConstants.DEFAULT_CONNECT_TIMEOUT_MILLISECONDS));
+ readTimeoutMilliseconds = Integer.parseInt(props.getProperty(WechatPropertiesConstants.READ_TIMEOUT_MILLISECONDS, WechatPropertiesConstants.DEFAULT_READ_TIMEOUT_MILLISECONDS));
registerShutdownHook();
}
@@ -64,8 +74,8 @@ public final class WechatJobErrorHandler implements JobErrorHandler {
}
@Override
- public void handleException(final String jobName, final Properties props, final Throwable cause) {
- HttpPost httpPost = createHTTPPostMethod(jobName, cause, createConfiguration(props));
+ public void handleException(final String jobName, final Throwable cause) {
+ HttpPost httpPost = createHTTPPostMethod(jobName, cause);
try (CloseableHttpResponse response = httpclient.execute(httpPost)) {
int status = response.getStatusLine().getStatusCode();
if (HttpURLConnection.HTTP_OK == status) {
@@ -84,16 +94,9 @@ public final class WechatJobErrorHandler implements JobErrorHandler {
}
}
- private WechatConfiguration createConfiguration(final Properties props) {
- String webhook = props.getProperty(WechatPropertiesConstants.WEBHOOK);
- int connectTimeoutMilliseconds = Integer.parseInt(props.getProperty(WechatPropertiesConstants.CONNECT_TIMEOUT_MILLISECONDS, WechatPropertiesConstants.DEFAULT_CONNECT_TIMEOUT_MILLISECONDS));
- int readTimeoutMilliseconds = Integer.parseInt(props.getProperty(WechatPropertiesConstants.READ_TIMEOUT_MILLISECONDS, WechatPropertiesConstants.DEFAULT_READ_TIMEOUT_MILLISECONDS));
- return new WechatConfiguration(webhook, connectTimeoutMilliseconds, readTimeoutMilliseconds);
- }
-
- private HttpPost createHTTPPostMethod(final String jobName, final Throwable cause, final WechatConfiguration config) {
- HttpPost result = new HttpPost(config.getWebhook());
- RequestConfig requestConfig = RequestConfig.custom().setConnectTimeout(config.getConnectTimeoutMilliseconds()).setSocketTimeout(config.getReadTimeoutMilliseconds()).build();
+ private HttpPost createHTTPPostMethod(final String jobName, final Throwable cause) {
+ HttpPost result = new HttpPost(webhook);
+ RequestConfig requestConfig = RequestConfig.custom().setConnectTimeout(connectTimeoutMilliseconds).setSocketTimeout(readTimeoutMilliseconds).build();
result.setConfig(requestConfig);
StringEntity entity = new StringEntity(getJsonParameter(getErrorMessage(jobName, cause)), StandardCharsets.UTF_8);
entity.setContentEncoding(StandardCharsets.UTF_8.name());
diff --git a/elasticjob-error-handler/elasticjob-error-handler-type/elasticjob-error-handler-wechat/src/test/java/org/apache/shardingsphere/elasticjob/error/handler/wechat/WechatJobErrorHandlerTest.java b/elasticjob-error-handler/elasticjob-error-handler-type/elasticjob-error-handler-wechat/src/test/java/org/apache/shardingsphere/elasticjob/error/handler/wechat/WechatJobErrorHandlerTest.java
index 6c5b1f8..a5a86a9 100644
--- a/elasticjob-error-handler/elasticjob-error-handler-type/elasticjob-error-handler-wechat/src/test/java/org/apache/shardingsphere/elasticjob/error/handler/wechat/WechatJobErrorHandlerTest.java
+++ b/elasticjob-error-handler/elasticjob-error-handler-type/elasticjob-error-handler-wechat/src/test/java/org/apache/shardingsphere/elasticjob/error/handler/wechat/WechatJobErrorHandlerTest.java
@@ -68,42 +68,42 @@ public final class WechatJobErrorHandlerTest {
@Test
public void assertHandleExceptionWithNotifySuccessful() {
- WechatJobErrorHandler actual = getWechatJobErrorHandler();
+ WechatJobErrorHandler actual = getWechatJobErrorHandler(createConfigurationProperties("http://localhost:9872/send?key=mocked_key"));
setStaticFieldValue(actual);
Throwable cause = new RuntimeException("test");
- actual.handleException("test_job", createConfigurationProperties("http://localhost:9872/send?key=mocked_key"), cause);
+ actual.handleException("test_job", cause);
verify(log).info("An exception has occurred in Job '{}', Notification to wechat was successful.", "test_job", cause);
}
@Test
public void assertHandleExceptionWithWrongToken() {
- WechatJobErrorHandler actual = getWechatJobErrorHandler();
+ WechatJobErrorHandler actual = getWechatJobErrorHandler(createConfigurationProperties("http://localhost:9872/send?key=wrong_key"));
setStaticFieldValue(actual);
Throwable cause = new RuntimeException("test");
- actual.handleException("test_job", createConfigurationProperties("http://localhost:9872/send?key=wrong_key"), cause);
+ actual.handleException("test_job", cause);
verify(log).info("An exception has occurred in Job '{}', But failed to send alert by wechat because of: {}", "test_job", "token is invalid", cause);
}
@Test
public void assertHandleExceptionWithWrongUrl() {
- WechatJobErrorHandler actual = getWechatJobErrorHandler();
+ WechatJobErrorHandler actual = getWechatJobErrorHandler(createConfigurationProperties("http://wrongUrl"));
setStaticFieldValue(actual);
Throwable cause = new RuntimeException("test");
- actual.handleException("test_job", createConfigurationProperties("http://wrongUrl"), cause);
+ actual.handleException("test_job", cause);
verify(log).error("An exception has occurred in Job '{}', But failed to send alert by wechat because of", "test_job", cause);
}
@Test
public void assertHandleExceptionWithUrlIsNotFound() {
- WechatJobErrorHandler actual = getWechatJobErrorHandler();
+ WechatJobErrorHandler actual = getWechatJobErrorHandler(createConfigurationProperties("http://localhost:9872/404"));
setStaticFieldValue(actual);
Throwable cause = new RuntimeException("test");
- actual.handleException("test_job", createConfigurationProperties("http://localhost:9872/404"), cause);
+ actual.handleException("test_job", cause);
verify(log).error("An exception has occurred in Job '{}', But failed to send alert by wechat because of: Unexpected response status: {}", "test_job", 404, cause);
}
- private WechatJobErrorHandler getWechatJobErrorHandler() {
- return (WechatJobErrorHandler) JobErrorHandlerFactory.createHandler("WECHAT").orElseThrow(() -> new JobConfigurationException("WECHAT error handler not found."));
+ private WechatJobErrorHandler getWechatJobErrorHandler(final Properties props) {
+ return (WechatJobErrorHandler) JobErrorHandlerFactory.createHandler("WECHAT", props).orElseThrow(() -> new JobConfigurationException("WECHAT error handler not found."));
}
@SneakyThrows
diff --git a/elasticjob-executor/elasticjob-executor-kernel/src/main/java/org/apache/shardingsphere/elasticjob/executor/ElasticJobExecutor.java b/elasticjob-executor/elasticjob-executor-kernel/src/main/java/org/apache/shardingsphere/elasticjob/executor/ElasticJobExecutor.java
index 1e9d712..448213f 100644
--- a/elasticjob-executor/elasticjob-executor-kernel/src/main/java/org/apache/shardingsphere/elasticjob/executor/ElasticJobExecutor.java
+++ b/elasticjob-executor/elasticjob-executor-kernel/src/main/java/org/apache/shardingsphere/elasticjob/executor/ElasticJobExecutor.java
@@ -74,7 +74,7 @@ public final class ElasticJobExecutor {
this.jobFacade = jobFacade;
this.jobItemExecutor = jobItemExecutor;
executorService = JobExecutorServiceHandlerFactory.getHandler(jobConfig.getJobExecutorServiceHandlerType()).createExecutorService(jobConfig.getJobName());
- jobErrorHandler = JobErrorHandlerFactory.createHandler(jobConfig.getJobErrorHandlerType())
+ jobErrorHandler = JobErrorHandlerFactory.createHandler(jobConfig.getJobErrorHandlerType(), jobConfig.getProps())
.orElseThrow(() -> new JobConfigurationException("Can not find job error handler type '%s'.", jobConfig.getJobErrorHandlerType()));
itemErrorMessages = new ConcurrentHashMap<>(jobConfig.getShardingTotalCount(), 1);
}
@@ -86,7 +86,7 @@ public final class ElasticJobExecutor {
try {
jobFacade.checkJobExecutionEnvironment();
} catch (final JobExecutionEnvironmentException cause) {
- jobErrorHandler.handleException(jobConfig.getJobName(), jobConfig.getProps(), cause);
+ jobErrorHandler.handleException(jobConfig.getJobName(), cause);
}
ShardingContexts shardingContexts = jobFacade.getShardingContexts();
jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_STAGING, String.format("Job '%s' execute begin.", jobConfig.getJobName()));
@@ -101,7 +101,7 @@ public final class ElasticJobExecutor {
//CHECKSTYLE:OFF
} catch (final Throwable cause) {
//CHECKSTYLE:ON
- jobErrorHandler.handleException(jobConfig.getJobName(), jobConfig.getProps(), cause);
+ jobErrorHandler.handleException(jobConfig.getJobName(), cause);
}
execute(shardingContexts, ExecutionSource.NORMAL_TRIGGER);
while (jobFacade.isExecuteMisfired(shardingContexts.getShardingItemParameters().keySet())) {
@@ -114,7 +114,7 @@ public final class ElasticJobExecutor {
//CHECKSTYLE:OFF
} catch (final Throwable cause) {
//CHECKSTYLE:ON
- jobErrorHandler.handleException(jobConfig.getJobName(), jobConfig.getProps(), cause);
+ jobErrorHandler.handleException(jobConfig.getJobName(), cause);
}
}
@@ -184,7 +184,7 @@ public final class ElasticJobExecutor {
completeEvent = startEvent.executionFailure(ExceptionUtils.transform(cause));
jobFacade.postJobExecutionEvent(completeEvent);
itemErrorMessages.put(item, ExceptionUtils.transform(cause));
- jobErrorHandler.handleException(jobConfig.getJobName(), jobConfig.getProps(), cause);
+ jobErrorHandler.handleException(jobConfig.getJobName(), cause);
}
}
diff --git a/elasticjob-infra/elasticjob-infra-common/src/main/java/org/apache/shardingsphere/elasticjob/infra/listener/ElasticJobListenerFactory.java b/elasticjob-infra/elasticjob-infra-common/src/main/java/org/apache/shardingsphere/elasticjob/infra/listener/ElasticJobListenerFactory.java
index f7ce3a1..f166e81 100644
--- a/elasticjob-infra/elasticjob-infra-common/src/main/java/org/apache/shardingsphere/elasticjob/infra/listener/ElasticJobListenerFactory.java
+++ b/elasticjob-infra/elasticjob-infra-common/src/main/java/org/apache/shardingsphere/elasticjob/infra/listener/ElasticJobListenerFactory.java
@@ -22,6 +22,7 @@ import lombok.NoArgsConstructor;
import org.apache.shardingsphere.elasticjob.infra.spi.ElasticJobServiceLoader;
import java.util.Optional;
+import java.util.Properties;
/**
* Job listener factory.
@@ -40,6 +41,6 @@ public final class ElasticJobListenerFactory {
* @return optional job listener instance
*/
public static Optional<ElasticJobListener> createListener(final String type) {
- return ElasticJobServiceLoader.newTypedServiceInstance(ElasticJobListener.class, type);
+ return ElasticJobServiceLoader.newTypedServiceInstance(ElasticJobListener.class, type, new Properties());
}
}
diff --git a/elasticjob-infra/elasticjob-infra-common/src/main/java/org/apache/shardingsphere/elasticjob/infra/spi/ElasticJobServiceLoader.java b/elasticjob-infra/elasticjob-infra-common/src/main/java/org/apache/shardingsphere/elasticjob/infra/spi/ElasticJobServiceLoader.java
index 849d225..6ba4cfa 100644
--- a/elasticjob-infra/elasticjob-infra-common/src/main/java/org/apache/shardingsphere/elasticjob/infra/spi/ElasticJobServiceLoader.java
+++ b/elasticjob-infra/elasticjob-infra-common/src/main/java/org/apache/shardingsphere/elasticjob/infra/spi/ElasticJobServiceLoader.java
@@ -23,6 +23,7 @@ import org.apache.shardingsphere.elasticjob.infra.spi.exception.ServiceLoaderIns
import java.lang.reflect.InvocationTargetException;
import java.util.Optional;
+import java.util.Properties;
import java.util.ServiceLoader;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -72,13 +73,16 @@ public final class ElasticJobServiceLoader {
*
* @param typedServiceInterface typed service interface
* @param type type
+ * @param props properties
* @param <T> class of service
* @return new typed service instance
*/
- public static <T extends TypedSPI> Optional<T> newTypedServiceInstance(final Class<T> typedServiceInterface, final String type) {
- return Optional.ofNullable(TYPED_SERVICE_CLASSES.get(typedServiceInterface))
- .map(serviceClasses -> serviceClasses.get(type))
- .map(clazz -> (T) newServiceInstance(clazz));
+ public static <T extends TypedSPI> Optional<T> newTypedServiceInstance(final Class<T> typedServiceInterface, final String type, final Properties props) {
+ Optional<T> result = Optional.ofNullable(TYPED_SERVICE_CLASSES.get(typedServiceInterface)).map(serviceClasses -> serviceClasses.get(type)).map(clazz -> (T) newServiceInstance(clazz));
+ if (result.isPresent() && result.get() instanceof SPIPostProcessor) {
+ ((SPIPostProcessor) result.get()).init(props);
+ }
+ return result;
}
private static Object newServiceInstance(final Class<?> clazz) {
diff --git a/elasticjob-error-handler/elasticjob-error-handler-type/elasticjob-error-handler-wechat/src/main/java/org/apache/shardingsphere/elasticjob/error/handler/wechat/WechatConfiguration.java b/elasticjob-infra/elasticjob-infra-common/src/main/java/org/apache/shardingsphere/elasticjob/infra/spi/SPIPostProcessor.java
similarity index 60%
rename from elasticjob-error-handler/elasticjob-error-handler-type/elasticjob-error-handler-wechat/src/main/java/org/apache/shardingsphere/elasticjob/error/handler/wechat/WechatConfiguration.java
rename to elasticjob-infra/elasticjob-infra-common/src/main/java/org/apache/shardingsphere/elasticjob/infra/spi/SPIPostProcessor.java
index 0e4c4eb..1c4d27e 100644
--- a/elasticjob-error-handler/elasticjob-error-handler-type/elasticjob-error-handler-wechat/src/main/java/org/apache/shardingsphere/elasticjob/error/handler/wechat/WechatConfiguration.java
+++ b/elasticjob-infra/elasticjob-infra-common/src/main/java/org/apache/shardingsphere/elasticjob/infra/spi/SPIPostProcessor.java
@@ -7,7 +7,7 @@
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -15,22 +15,19 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.elasticjob.error.handler.wechat;
+package org.apache.shardingsphere.elasticjob.infra.spi;
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.elasticjob.error.handler.ErrorHandlerConfiguration;
+import java.util.Properties;
/**
- * Job error handler configuration for send error message via wechat.
+ * SPI post processor.
*/
-@RequiredArgsConstructor
-@Getter
-public final class WechatConfiguration implements ErrorHandlerConfiguration {
-
- private final String webhook;
-
- private final int connectTimeoutMilliseconds;
+public interface SPIPostProcessor {
- private final int readTimeoutMilliseconds;
+ /**
+ * Initialize SPI instance.
+ *
+ * @param props properties
+ */
+ void init(Properties props);
}
diff --git a/elasticjob-infra/elasticjob-infra-common/src/test/java/org/apache/shardingsphere/elasticjob/infra/spi/ElasticJobServiceLoaderTest.java b/elasticjob-infra/elasticjob-infra-common/src/test/java/org/apache/shardingsphere/elasticjob/infra/spi/ElasticJobServiceLoaderTest.java
index 026afe2..ec1fea9 100644
--- a/elasticjob-infra/elasticjob-infra-common/src/test/java/org/apache/shardingsphere/elasticjob/infra/spi/ElasticJobServiceLoaderTest.java
+++ b/elasticjob-infra/elasticjob-infra-common/src/test/java/org/apache/shardingsphere/elasticjob/infra/spi/ElasticJobServiceLoaderTest.java
@@ -22,6 +22,8 @@ import org.apache.shardingsphere.elasticjob.infra.spi.fixture.UnRegisteredTypedF
import org.junit.BeforeClass;
import org.junit.Test;
+import java.util.Properties;
+
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.junit.Assert.assertThat;
@@ -54,11 +56,11 @@ public final class ElasticJobServiceLoaderTest {
@Test(expected = IllegalArgumentException.class)
public void assertNewTypedServiceInstanceFailureWithUnRegisteredServiceInterface() {
- ElasticJobServiceLoader.newTypedServiceInstance(UnRegisteredTypedFooService.class, "unRegisteredTypedFooServiceImpl").orElseThrow(IllegalArgumentException::new);
+ ElasticJobServiceLoader.newTypedServiceInstance(UnRegisteredTypedFooService.class, "unRegisteredTypedFooServiceImpl", new Properties()).orElseThrow(IllegalArgumentException::new);
}
@Test(expected = IllegalArgumentException.class)
public void assertNewTypedServiceInstanceFailureWithInvalidType() {
- ElasticJobServiceLoader.newTypedServiceInstance(TypedFooService.class, "INVALID").orElseThrow(IllegalArgumentException::new);
+ ElasticJobServiceLoader.newTypedServiceInstance(TypedFooService.class, "INVALID", new Properties()).orElseThrow(IllegalArgumentException::new);
}
}