You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2020/10/02 14:52:01 UTC

[shardingsphere-elasticjob] branch master updated: Bind lifecycle of JobErrorHandler and ElasticJobListener to job (#1535)

This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere-elasticjob.git


The following commit(s) were added to refs/heads/master by this push:
     new 35afcbe  Bind lifecycle of JobErrorHandler and ElasticJobListener to job (#1535)
35afcbe is described below

commit 35afcbece7d010b98117594a124960529a7f3283
Author: 吴伟杰 <ro...@me.com>
AuthorDate: Fri Oct 2 22:47:56 2020 +0800

    Bind lifecycle of JobErrorHandler and ElasticJobListener to job (#1535)
---
 .../error/handler/JobErrorHandlerFactory.java      | 27 +++-----
 .../error/handler/JobErrorHandlerFactoryTest.java  |  6 +-
 .../handler/general/IgnoreJobErrorHandlerTest.java |  3 +-
 .../handler/general/LogJobErrorHandlerTest.java    |  3 +-
 .../handler/general/ThrowJobErrorHandlerTest.java  |  3 +-
 .../elasticjob/executor/ElasticJobExecutor.java    |  4 +-
 .../infra/listener/ElasticJobListenerFactory.java  | 25 +++----
 .../infra/spi/ElasticJobServiceLoader.java         | 79 ++++++++++++++++++++++
 .../ServiceLoaderInstantiationException.java       | 19 +++---
 .../lite/internal/schedule/JobScheduler.java       |  7 +-
 10 files changed, 126 insertions(+), 50 deletions(-)

diff --git a/elasticjob-error-handler/elasticjob-error-handler-impl/elasticjob-error-handler-general/src/main/java/org/apache/shardingsphere/elasticjob/error/handler/JobErrorHandlerFactory.java b/elasticjob-error-handler/elasticjob-error-handler-impl/elasticjob-error-handler-general/src/main/java/org/apache/shardingsphere/elasticjob/error/handler/JobErrorHandlerFactory.java
index e401d5b..6bc84cb 100644
--- a/elasticjob-error-handler/elasticjob-error-handler-impl/elasticjob-error-handler-general/src/main/java/org/apache/shardingsphere/elasticjob/error/handler/JobErrorHandlerFactory.java
+++ b/elasticjob-error-handler/elasticjob-error-handler-impl/elasticjob-error-handler-general/src/main/java/org/apache/shardingsphere/elasticjob/error/handler/JobErrorHandlerFactory.java
@@ -20,11 +20,9 @@ package org.apache.shardingsphere.elasticjob.error.handler;
 import com.google.common.base.Strings;
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
-import org.apache.shardingsphere.elasticjob.infra.exception.JobConfigurationException;
+import org.apache.shardingsphere.elasticjob.infra.spi.ElasticJobServiceLoader;
 
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.ServiceLoader;
+import java.util.Optional;
 
 /**
  * Job error handler factory.
@@ -32,14 +30,10 @@ import java.util.ServiceLoader;
 @NoArgsConstructor(access = AccessLevel.PRIVATE)
 public final class JobErrorHandlerFactory {
     
-    private static final Map<String, JobErrorHandler> HANDLERS = new LinkedHashMap<>();
-    
     private static final String DEFAULT_HANDLER = "LOG";
     
     static {
-        for (JobErrorHandler each : ServiceLoader.load(JobErrorHandler.class)) {
-            HANDLERS.put(each.getType(), each);
-        }
+        ElasticJobServiceLoader.register(JobErrorHandler.class);
     }
     
     /**
@@ -48,13 +42,14 @@ public final class JobErrorHandlerFactory {
      * @param type job error handler type
      * @return job error handler
      */
-    public static JobErrorHandler getHandler(final String type) {
+    public static Optional<JobErrorHandler> createHandler(final String type) {
         if (Strings.isNullOrEmpty(type)) {
-            return HANDLERS.get(DEFAULT_HANDLER);
-        }
-        if (!HANDLERS.containsKey(type)) {
-            throw new JobConfigurationException("Can not find job error handler type '%s'.", type);
+            return newHandlerInstance(DEFAULT_HANDLER);
         }
-        return HANDLERS.get(type);
-    } 
+        return newHandlerInstance(type);
+    }
+    
+    private static Optional<JobErrorHandler> newHandlerInstance(final String type) {
+        return ElasticJobServiceLoader.newServiceInstances(JobErrorHandler.class).stream().filter(handler -> handler.getType().equalsIgnoreCase(type)).findFirst();
+    }
 }
diff --git a/elasticjob-error-handler/elasticjob-error-handler-impl/elasticjob-error-handler-general/src/test/java/org/apache/shardingsphere/elasticjob/error/handler/JobErrorHandlerFactoryTest.java b/elasticjob-error-handler/elasticjob-error-handler-impl/elasticjob-error-handler-general/src/test/java/org/apache/shardingsphere/elasticjob/error/handler/JobErrorHandlerFactoryTest.java
index e1f38d2..23ca2e3 100644
--- a/elasticjob-error-handler/elasticjob-error-handler-impl/elasticjob-error-handler-general/src/test/java/org/apache/shardingsphere/elasticjob/error/handler/JobErrorHandlerFactoryTest.java
+++ b/elasticjob-error-handler/elasticjob-error-handler-impl/elasticjob-error-handler-general/src/test/java/org/apache/shardingsphere/elasticjob/error/handler/JobErrorHandlerFactoryTest.java
@@ -29,16 +29,16 @@ public final class JobErrorHandlerFactoryTest {
     
     @Test
     public void assertGetDefaultHandler() {
-        assertThat(JobErrorHandlerFactory.getHandler(""), instanceOf(LogJobErrorHandler.class));
+        assertThat(JobErrorHandlerFactory.createHandler("").orElse(null), instanceOf(LogJobErrorHandler.class));
     }
     
     @Test(expected = JobConfigurationException.class)
     public void assertGetInvalidHandler() {
-        JobErrorHandlerFactory.getHandler("INVALID");
+        JobErrorHandlerFactory.createHandler("INVALID").orElseThrow(() -> new JobConfigurationException(""));
     }
     
     @Test
     public void assertGetHandler() {
-        assertThat(JobErrorHandlerFactory.getHandler("THROW"), instanceOf(ThrowJobErrorHandler.class));
+        assertThat(JobErrorHandlerFactory.createHandler("THROW").orElse(null), instanceOf(ThrowJobErrorHandler.class));
     }
 }
diff --git a/elasticjob-error-handler/elasticjob-error-handler-impl/elasticjob-error-handler-general/src/test/java/org/apache/shardingsphere/elasticjob/error/handler/general/IgnoreJobErrorHandlerTest.java b/elasticjob-error-handler/elasticjob-error-handler-impl/elasticjob-error-handler-general/src/test/java/org/apache/shardingsphere/elasticjob/error/handler/general/IgnoreJobErrorHandlerTest.java
index baf4e47..5c3dd61 100644
--- a/elasticjob-error-handler/elasticjob-error-handler-impl/elasticjob-error-handler-general/src/test/java/org/apache/shardingsphere/elasticjob/error/handler/general/IgnoreJobErrorHandlerTest.java
+++ b/elasticjob-error-handler/elasticjob-error-handler-impl/elasticjob-error-handler-general/src/test/java/org/apache/shardingsphere/elasticjob/error/handler/general/IgnoreJobErrorHandlerTest.java
@@ -18,12 +18,13 @@
 package org.apache.shardingsphere.elasticjob.error.handler.general;
 
 import org.apache.shardingsphere.elasticjob.error.handler.JobErrorHandlerFactory;
+import org.apache.shardingsphere.elasticjob.infra.exception.JobConfigurationException;
 import org.junit.Test;
 
 public final class IgnoreJobErrorHandlerTest {
     
     @Test
     public void assertHandleException() {
-        JobErrorHandlerFactory.getHandler("IGNORE").handleException("test_job", new RuntimeException("test"));
+        JobErrorHandlerFactory.createHandler("IGNORE").orElseThrow(() -> new JobConfigurationException("IGNORE error handler not found.")).handleException("test_job", new RuntimeException("test"));
     }
 }
diff --git a/elasticjob-error-handler/elasticjob-error-handler-impl/elasticjob-error-handler-general/src/test/java/org/apache/shardingsphere/elasticjob/error/handler/general/LogJobErrorHandlerTest.java b/elasticjob-error-handler/elasticjob-error-handler-impl/elasticjob-error-handler-general/src/test/java/org/apache/shardingsphere/elasticjob/error/handler/general/LogJobErrorHandlerTest.java
index 4b6a134..e566da1 100644
--- a/elasticjob-error-handler/elasticjob-error-handler-impl/elasticjob-error-handler-general/src/test/java/org/apache/shardingsphere/elasticjob/error/handler/general/LogJobErrorHandlerTest.java
+++ b/elasticjob-error-handler/elasticjob-error-handler-impl/elasticjob-error-handler-general/src/test/java/org/apache/shardingsphere/elasticjob/error/handler/general/LogJobErrorHandlerTest.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.elasticjob.error.handler.general;
 
 import lombok.SneakyThrows;
 import org.apache.shardingsphere.elasticjob.error.handler.JobErrorHandlerFactory;
+import org.apache.shardingsphere.elasticjob.infra.exception.JobConfigurationException;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Mock;
@@ -38,7 +39,7 @@ public final class LogJobErrorHandlerTest {
     
     @Test
     public void assertHandleException() {
-        LogJobErrorHandler actual = (LogJobErrorHandler) JobErrorHandlerFactory.getHandler("LOG");
+        LogJobErrorHandler actual = (LogJobErrorHandler) JobErrorHandlerFactory.createHandler("LOG").orElseThrow(() -> new JobConfigurationException("LOG error handler not found."));
         setStaticFieldValue(actual);
         Throwable cause = new RuntimeException("test");
         actual.handleException("test_job", cause);
diff --git a/elasticjob-error-handler/elasticjob-error-handler-impl/elasticjob-error-handler-general/src/test/java/org/apache/shardingsphere/elasticjob/error/handler/general/ThrowJobErrorHandlerTest.java b/elasticjob-error-handler/elasticjob-error-handler-impl/elasticjob-error-handler-general/src/test/java/org/apache/shardingsphere/elasticjob/error/handler/general/ThrowJobErrorHandlerTest.java
index a4c13f8..6b48807 100644
--- a/elasticjob-error-handler/elasticjob-error-handler-impl/elasticjob-error-handler-general/src/test/java/org/apache/shardingsphere/elasticjob/error/handler/general/ThrowJobErrorHandlerTest.java
+++ b/elasticjob-error-handler/elasticjob-error-handler-impl/elasticjob-error-handler-general/src/test/java/org/apache/shardingsphere/elasticjob/error/handler/general/ThrowJobErrorHandlerTest.java
@@ -18,6 +18,7 @@
 package org.apache.shardingsphere.elasticjob.error.handler.general;
 
 import org.apache.shardingsphere.elasticjob.error.handler.JobErrorHandlerFactory;
+import org.apache.shardingsphere.elasticjob.infra.exception.JobConfigurationException;
 import org.apache.shardingsphere.elasticjob.infra.exception.JobSystemException;
 import org.junit.Test;
 
@@ -25,6 +26,6 @@ public final class ThrowJobErrorHandlerTest {
     
     @Test(expected = JobSystemException.class)
     public void assertHandleException() {
-        JobErrorHandlerFactory.getHandler("THROW").handleException("test_job", new RuntimeException("test"));
+        JobErrorHandlerFactory.createHandler("THROW").orElseThrow(() -> new JobConfigurationException("THROW error handler not found.")).handleException("test_job", new RuntimeException("test"));
     }
 }
diff --git a/elasticjob-executor/elasticjob-executor-kernel/src/main/java/org/apache/shardingsphere/elasticjob/executor/ElasticJobExecutor.java b/elasticjob-executor/elasticjob-executor-kernel/src/main/java/org/apache/shardingsphere/elasticjob/executor/ElasticJobExecutor.java
index b098dde..7ac74af 100644
--- a/elasticjob-executor/elasticjob-executor-kernel/src/main/java/org/apache/shardingsphere/elasticjob/executor/ElasticJobExecutor.java
+++ b/elasticjob-executor/elasticjob-executor-kernel/src/main/java/org/apache/shardingsphere/elasticjob/executor/ElasticJobExecutor.java
@@ -24,6 +24,7 @@ import org.apache.shardingsphere.elasticjob.error.handler.JobErrorHandler;
 import org.apache.shardingsphere.elasticjob.error.handler.JobErrorHandlerFactory;
 import org.apache.shardingsphere.elasticjob.infra.env.IpUtils;
 import org.apache.shardingsphere.elasticjob.infra.exception.ExceptionUtils;
+import org.apache.shardingsphere.elasticjob.infra.exception.JobConfigurationException;
 import org.apache.shardingsphere.elasticjob.infra.exception.JobExecutionEnvironmentException;
 import org.apache.shardingsphere.elasticjob.infra.handler.threadpool.JobExecutorServiceHandlerFactory;
 import org.apache.shardingsphere.elasticjob.executor.item.JobItemExecutor;
@@ -73,7 +74,8 @@ public final class ElasticJobExecutor {
         this.jobFacade = jobFacade;
         this.jobItemExecutor = jobItemExecutor;
         executorService = JobExecutorServiceHandlerFactory.getHandler(jobConfig.getJobExecutorServiceHandlerType()).createExecutorService(jobConfig.getJobName());
-        jobErrorHandler = JobErrorHandlerFactory.getHandler(jobConfig.getJobErrorHandlerType());
+        jobErrorHandler = JobErrorHandlerFactory.createHandler(jobConfig.getJobErrorHandlerType())
+                .orElseThrow(() -> new JobConfigurationException("Can not find job error handler type '%s'.", jobConfig.getJobErrorHandlerType()));
         itemErrorMessages = new ConcurrentHashMap<>(jobConfig.getShardingTotalCount(), 1);
     }
     
diff --git a/elasticjob-infra/elasticjob-infra-common/src/main/java/org/apache/shardingsphere/elasticjob/infra/listener/ElasticJobListenerFactory.java b/elasticjob-infra/elasticjob-infra-common/src/main/java/org/apache/shardingsphere/elasticjob/infra/listener/ElasticJobListenerFactory.java
index 97083e6..c5f7cfc 100644
--- a/elasticjob-infra/elasticjob-infra-common/src/main/java/org/apache/shardingsphere/elasticjob/infra/listener/ElasticJobListenerFactory.java
+++ b/elasticjob-infra/elasticjob-infra-common/src/main/java/org/apache/shardingsphere/elasticjob/infra/listener/ElasticJobListenerFactory.java
@@ -19,10 +19,9 @@ package org.apache.shardingsphere.elasticjob.infra.listener;
 
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
+import org.apache.shardingsphere.elasticjob.infra.spi.ElasticJobServiceLoader;
 
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.ServiceLoader;
+import java.util.Optional;
 
 /**
  * Job listener factory.
@@ -30,24 +29,18 @@ import java.util.ServiceLoader;
 @NoArgsConstructor(access = AccessLevel.PRIVATE)
 public final class ElasticJobListenerFactory {
     
-    private static final Map<String, ElasticJobListener> HANDLERS = new LinkedHashMap<>();
-    
     static {
-        for (ElasticJobListener each : ServiceLoader.load(ElasticJobListener.class)) {
-            HANDLERS.put(each.getType(), each);
-        }
+        ElasticJobServiceLoader.register(ElasticJobListener.class);
     }
     
     /**
-     * Get job listener.
+     * Create a job listener instance.
      *
      * @param type job listener type
-     * @return job listener
+     * @return optional job listener instance
      */
-    public static ElasticJobListener getListener(final String type) {
-        if (!HANDLERS.containsKey(type)) {
-            throw new IllegalArgumentException(String.format("Can not find job listener type '%s'.", type));
-        }
-        return HANDLERS.get(type);
-    } 
+    public static Optional<ElasticJobListener> createListener(final String type) {
+        return ElasticJobServiceLoader.newServiceInstances(ElasticJobListener.class)
+                .stream().filter(listener -> listener.getType().equalsIgnoreCase(type)).findFirst();
+    }
 }
diff --git a/elasticjob-infra/elasticjob-infra-common/src/main/java/org/apache/shardingsphere/elasticjob/infra/spi/ElasticJobServiceLoader.java b/elasticjob-infra/elasticjob-infra-common/src/main/java/org/apache/shardingsphere/elasticjob/infra/spi/ElasticJobServiceLoader.java
new file mode 100644
index 0000000..a7e9e6f
--- /dev/null
+++ b/elasticjob-infra/elasticjob-infra-common/src/main/java/org/apache/shardingsphere/elasticjob/infra/spi/ElasticJobServiceLoader.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.elasticjob.infra.spi;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import org.apache.shardingsphere.elasticjob.infra.spi.exception.ServiceLoaderInstantiationException;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.ServiceLoader;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.stream.Collectors;
+
+/**
+ * ElasticJob service loader.
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public final class ElasticJobServiceLoader {
+    
+    private static final ConcurrentMap<Class<?>, Collection<Class<?>>> SERVICE_MAP = new ConcurrentHashMap<>();
+    
+    /**
+     * Register SPI service into map for new instance.
+     *
+     * @param service service type
+     * @param <T>     type of service
+     */
+    public static <T> void register(final Class<T> service) {
+        if (SERVICE_MAP.containsKey(service)) {
+            return;
+        }
+        ServiceLoader.load(service).forEach(each -> registerServiceClass(service, each));
+    }
+    
+    private static <T> void registerServiceClass(final Class<T> service, final T instance) {
+        SERVICE_MAP.computeIfAbsent(service, unused -> new LinkedHashSet<>()).add(instance.getClass());
+    }
+    
+    /**
+     * New service instances.
+     *
+     * @param service service class
+     * @param <T>     type of service
+     * @return service instances
+     */
+    @SuppressWarnings("unchecked")
+    public static <T> Collection<T> newServiceInstances(final Class<T> service) {
+        return SERVICE_MAP.containsKey(service) ? SERVICE_MAP.get(service).stream().map(each -> (T) newServiceInstance(each)).collect(Collectors.toList()) : Collections.emptyList();
+    }
+    
+    private static Object newServiceInstance(final Class<?> clazz) {
+        try {
+            return clazz.getConstructor().newInstance();
+        } catch (final InstantiationException | NoSuchMethodException | IllegalAccessException ex) {
+            throw new ServiceLoaderInstantiationException(clazz, ex);
+        } catch (final InvocationTargetException ex) {
+            throw new ServiceLoaderInstantiationException(clazz, ex.getCause());
+        }
+    }
+}
diff --git a/elasticjob-error-handler/elasticjob-error-handler-impl/elasticjob-error-handler-general/src/test/java/org/apache/shardingsphere/elasticjob/error/handler/general/IgnoreJobErrorHandlerTest.java b/elasticjob-infra/elasticjob-infra-common/src/main/java/org/apache/shardingsphere/elasticjob/infra/spi/exception/ServiceLoaderInstantiationException.java
similarity index 62%
copy from elasticjob-error-handler/elasticjob-error-handler-impl/elasticjob-error-handler-general/src/test/java/org/apache/shardingsphere/elasticjob/error/handler/general/IgnoreJobErrorHandlerTest.java
copy to elasticjob-infra/elasticjob-infra-common/src/main/java/org/apache/shardingsphere/elasticjob/infra/spi/exception/ServiceLoaderInstantiationException.java
index baf4e47..37ba5f0 100644
--- a/elasticjob-error-handler/elasticjob-error-handler-impl/elasticjob-error-handler-general/src/test/java/org/apache/shardingsphere/elasticjob/error/handler/general/IgnoreJobErrorHandlerTest.java
+++ b/elasticjob-infra/elasticjob-infra-common/src/main/java/org/apache/shardingsphere/elasticjob/infra/spi/exception/ServiceLoaderInstantiationException.java
@@ -7,7 +7,7 @@
  * the License.  You may obtain a copy of the License at
  *
  *     http://www.apache.org/licenses/LICENSE-2.0
- *  
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -15,15 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.elasticjob.error.handler.general;
-
-import org.apache.shardingsphere.elasticjob.error.handler.JobErrorHandlerFactory;
-import org.junit.Test;
+package org.apache.shardingsphere.elasticjob.infra.spi.exception;
 
-public final class IgnoreJobErrorHandlerTest {
+/**
+ * Service loader instantiation exception.
+ */
+public final class ServiceLoaderInstantiationException extends RuntimeException {
+    
+    private static final long serialVersionUID = -2949903598320994076L;
     
-    @Test
-    public void assertHandleException() {
-        JobErrorHandlerFactory.getHandler("IGNORE").handleException("test_job", new RuntimeException("test"));
+    public ServiceLoaderInstantiationException(final Class<?> clazz, final Throwable cause) {
+        super(String.format("Can not find public no args constructor for SPI class `%s`", clazz.getName()), cause);
     }
 }
diff --git a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/JobScheduler.java b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/JobScheduler.java
index ab52c6c..24a371d 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/JobScheduler.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/JobScheduler.java
@@ -76,7 +76,9 @@ public final class JobScheduler {
         this.regCenter = regCenter;
         elasticJobType = null;
         final Collection<ElasticJobListener> elasticJobListeners = jobConfig.getJobListenerTypes().stream()
-                .map(ElasticJobListenerFactory::getListener).collect(Collectors.toList());
+                .map(type -> ElasticJobListenerFactory.createListener(type)
+                        .orElseThrow(() -> new IllegalArgumentException(String.format("Can not find job listener type '%s'.", type))))
+                .collect(Collectors.toList());
         setUpFacade = new SetUpFacade(regCenter, jobConfig.getJobName(), elasticJobListeners);
         schedulerFacade = new SchedulerFacade(regCenter, jobConfig.getJobName());
         jobFacade = new LiteJobFacade(regCenter, jobConfig.getJobName(), elasticJobListeners, tracingConfig);
@@ -95,7 +97,8 @@ public final class JobScheduler {
         this.regCenter = regCenter;
         this.elasticJobType = elasticJobType;
         final Collection<ElasticJobListener> elasticJobListeners = jobConfig.getJobListenerTypes().stream()
-                .map(ElasticJobListenerFactory::getListener).collect(Collectors.toList());
+                .map(type -> ElasticJobListenerFactory.createListener(type).orElseThrow(() -> new IllegalArgumentException(String.format("Can not find job listener type '%s'.", type))))
+                .collect(Collectors.toList());
         setUpFacade = new SetUpFacade(regCenter, jobConfig.getJobName(), elasticJobListeners);
         schedulerFacade = new SchedulerFacade(regCenter, jobConfig.getJobName());
         jobFacade = new LiteJobFacade(regCenter, jobConfig.getJobName(), elasticJobListeners, tracingConfig);