Posted to commits@dolphinscheduler.apache.org by "weixiaonan1 (via GitHub)" <gi...@apache.org> on 2023/08/30 05:39:53 UTC

[GitHub] [dolphinscheduler] weixiaonan1 opened a new pull request, #14833: [Feature-14832][Listener]Implementation of Listener Mechanism

weixiaonan1 opened a new pull request, #14833:
URL: https://github.com/apache/dolphinscheduler/pull/14833

   
   ## Purpose of the pull request
   
   This PR adds a listener mechanism for monitoring and tracking workflow/task state through plugins, without restarting the service.
   1. Users can build custom **listener plugins** by packaging a JAR that implements the methods of `ListenerPlugin` and registering it in the `Security module`.
   2. Users can create **listener instances** from a listener plugin and choose which **listener events** to subscribe to in the `Security module`.
   3. The **listener events** monitored by a **listener instance** are pushed to the specified destination (message queues, external system interfaces, etc.).
   
   ## Brief change log
   
   Add new module: `dolphinscheduler-listener`.
   
   ## Verify this pull request
   
   This change has been verified as follows:
   Manually verified by testing locally; unit tests will be added later.
   
   ## Code Structure
   `dolphinscheduler-listener`
   
   - `dolphinscheduler-listener-common`: contains the interface definition of **`ListenerPlugin`**, as well as the definitions of the various listener events. Users only need to depend on this module to create their own listener plugins.
   - `dolphinscheduler-listener-plugin`: example listener plugins
       - dolphinscheduler-listener-logger: writes listener events to a log file, which is convenient for testing the listener mechanism
       - dolphinscheduler-listener-kafka: sends listener events to Kafka
   - `dolphinscheduler-listener-service`: core code of the listener mechanism
       - `ListenerPluginService`: manages the listener plugins along with the listener instances
       - `ListenerEventProducer`: interface definition for storing listener events
           - JdbcListenerEventProducer: saves listener events to the database
       - `ListenerEventConsumer`: interface definition for querying/updating/deleting listener events
           - JdbcListenerEventConsumer: queries/updates/deletes listener events in the database
       - `ListenerEventPublishService`: saves listener events asynchronously
       - `ListenerInstancePostService`: queries listener events and processes them
   
   ## Listener Plugin & Listener Instance
   1. create custom listener plugin
       1. add dolphinscheduler-listener-common as dependency
       2. implement methods in ListenerPlugin
       3. package into a JAR file
   ![create custom listener plugin](https://github.com/apache/dolphinscheduler/assets/33857431/6a7584d6-d5f8-4096-8c55-cd8fcd86357f)
   2. install listener plugin 
       upload plugin JAR file and input the full class name of ListenerPlugin in `Security module`.
   ![install listener plugin ](https://github.com/apache/dolphinscheduler/assets/33857431/59ece66c-0cfa-40af-a062-3f1ce4d091e8)
   3. update listener plugin
       edit listener plugin and upload new plugin JAR file in `Security module`.
   ![update listener plugin](https://github.com/apache/dolphinscheduler/assets/33857431/88def74b-3d84-4084-8835-4b592fffcfc5)
   4. uninstall listener plugin
       uninstall the listener plugin in `Security module` (all related listener instances must be removed first).
   5. create listener instance (take LoggerPlugin as example)
       input instance name, check event types to subscribe to, select listener plugin and input plugin params in `Security module`.
   ![create listener instance ](https://github.com/apache/dolphinscheduler/assets/33857431/4f1e7cc8-b192-4f5d-bfae-068efb471d43)
   6. update listener instance
       update instance name, event types and plugin params in `Security module`.
   ![update listener instance](https://github.com/apache/dolphinscheduler/assets/33857431/35e422fd-1fa2-405d-8e74-831500a75934)
   7. delete listener instance
       delete listener instance in `Security module`.
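
As a rough illustration of step 1 above (depend on the common module, implement the plugin methods, package a JAR), the sketch below shows the general shape of a custom plugin. The interface `SketchListenerPlugin` and the event class `SketchWorkflowStartEvent` are hypothetical stand-ins so that the example compiles on its own; a real plugin would instead implement `ListenerPlugin` from `dolphinscheduler-listener-common`, which declares more callbacks (for example `params()` and one method per event type).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for an event class from dolphinscheduler-listener-common.
class SketchWorkflowStartEvent {
    private final String workflowName;
    private final Map<String, String> listenerInstanceParams;

    SketchWorkflowStartEvent(String workflowName, Map<String, String> listenerInstanceParams) {
        this.workflowName = workflowName;
        this.listenerInstanceParams = listenerInstanceParams;
    }

    String getWorkflowName() {
        return workflowName;
    }

    // Params entered by the user when creating the listener instance.
    Map<String, String> getListenerInstanceParams() {
        return listenerInstanceParams;
    }
}

// Hypothetical, trimmed-down stand-in for ListenerPlugin (assumption: the real
// interface has more methods than the two shown here).
interface SketchListenerPlugin {
    String name();

    void onWorkflowStart(SketchWorkflowStartEvent event);
}

// A toy plugin that keeps received events in memory instead of pushing them
// to an external system.
class InMemoryListener implements SketchListenerPlugin {
    private final List<String> received = new ArrayList<>();

    @Override
    public String name() {
        return "InMemoryListener";
    }

    @Override
    public void onWorkflowStart(SketchWorkflowStartEvent event) {
        // "prefix" is an illustrative instance param, not one defined by the PR.
        String prefix = event.getListenerInstanceParams().getOrDefault("prefix", "");
        received.add(prefix + "WorkflowStart:" + event.getWorkflowName());
    }

    List<String> received() {
        return Collections.unmodifiableList(received);
    }
}
```

The full class name of such a plugin is what would be entered when installing the JAR in step 2.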
   
   
   When listener plugins are installed/updated/uninstalled or listener instances are created/updated/deleted, the `Api Server` sends an RPC request to the `Alert Server` for processing.
   ![Untitled 5](https://github.com/apache/dolphinscheduler/assets/33857431/bbbb3e04-287c-489e-94a0-3f60e555f364)
   ListenerPluginService injected into AlertServer: 
   ![injected in AlertServer](https://github.com/apache/dolphinscheduler/assets/33857431/c22b7e61-2489-45c5-bffe-0a6a7bfa8176)
   
   ## Listener Event:
   ![Listener Event](https://github.com/apache/dolphinscheduler/assets/33857431/f086bc5e-a436-4456-85a1-ce3a8a0d4d97)
   Prerequisite: configure the listener type in application.yml (default: jdbc)
   ![listener type](https://github.com/apache/dolphinscheduler/assets/33857431/e7e24ff8-536a-456a-b42e-80fcc1508dce)
   1. ListenerEvent: there are 13 kinds of `ListenerEvent` defined in `dolphinscheduler-listener-common`.
   2. `Api Server` and `Master Server` generate listener events and publish them to `ListenerEventPublishService`, which saves the events asynchronously. (Listener events are stored persistently to prevent loss and reordering.)
   3. Listener events are saved in the database by the default `JdbcListenerEventProducer`. Users can implement their own producer to save events in Redis or elsewhere.
   4. Each listener instance has a corresponding `ListenerInstancePostService`, which takes the listener events handled by that instance and processes them. The events are retrieved from the database by the default `JdbcListenerEventConsumer`. Users can implement their own consumer to process events from Redis or elsewhere.
   5. `ListenerInstancePostService` performs the following actions periodically:
       1. take events in chronological order,
       2. process them sequentially:
           1. If processing succeeds, invoke `ListenerEventConsumer.delete` to remove the event.
           2. Otherwise, invoke `ListenerEventConsumer.update` to record the failure reason, and stop processing subsequent events.
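
The periodic loop in item 5 can be sketched roughly as follows. This is a minimal in-memory illustration, not the code in this PR: `SketchEvent`, `InMemoryEventConsumer` and `PostServiceSketch` are hypothetical names, and the real `JdbcListenerEventConsumer` reads from `t_ds_listener_event` rather than from a list.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical event row; only the fields needed for the sketch.
class SketchEvent {
    final long id;
    final long createTime;
    final String content;
    String log; // failure reason, set when processing fails

    SketchEvent(long id, long createTime, String content) {
        this.id = id;
        this.createTime = createTime;
        this.content = content;
    }
}

// Hypothetical in-memory stand-in for a ListenerEventConsumer implementation.
class InMemoryEventConsumer {
    private final List<SketchEvent> events = new ArrayList<>();

    void add(SketchEvent event) {
        events.add(event);
    }

    // Returns a copy of the pending events, oldest first (ties broken by id).
    List<SketchEvent> takeInOrder() {
        List<SketchEvent> copy = new ArrayList<>(events);
        copy.sort(Comparator.comparingLong((SketchEvent e) -> e.createTime)
                .thenComparingLong(e -> e.id));
        return copy;
    }

    void delete(SketchEvent event) {
        events.remove(event);
    }

    void update(SketchEvent event, String failureReason) {
        event.log = failureReason;
    }

    int size() {
        return events.size();
    }
}

class PostServiceSketch {
    // One polling round: deliver events in order, delete each delivered event,
    // and stop at the first failure so that later events are not reordered.
    // Returns the number of events delivered in this round.
    static int drainOnce(InMemoryEventConsumer consumer, Consumer<SketchEvent> handler) {
        int delivered = 0;
        for (SketchEvent event : consumer.takeInOrder()) {
            try {
                handler.accept(event);
                consumer.delete(event);
                delivered++;
            } catch (RuntimeException ex) {
                consumer.update(event, ex.getMessage());
                break;
            }
        }
        return delivered;
    }
}
```

Stopping at the first failure keeps the failed event at the head of the queue, so the next round retries it before any later event is sent.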
   
   ## Database Design:
   1. (modify) t_ds_plugin_define
   
      id, plugin_name, plugin_type, plugin_params, plugin_location (new column), plugin_class_name (new column), create_time, update_time
   
   2. (added) t_ds_listener_plugin_instance
   
      id, instance_name, plugin_define_id, plugin_instance_params, listener_event_types, create_time, update_time
   
   3. (added) t_ds_listener_event
   
      id, content, post_status, event_type, log, plugin_instance_id, create_time, update_time
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@dolphinscheduler.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [dolphinscheduler] weixiaonan1 closed pull request #14833: [Feature-14832][Listener]Implementation of Listener Mechanism

Posted by "weixiaonan1 (via GitHub)" <gi...@apache.org>.
weixiaonan1 closed pull request #14833: [Feature-14832][Listener]Implementation of Listener Mechanism
URL: https://github.com/apache/dolphinscheduler/pull/14833




[GitHub] [dolphinscheduler] ruanwenjun commented on a diff in pull request #14833: [Feature-14832][Listener]Implementation of Listener Mechanism

Posted by "ruanwenjun (via GitHub)" <gi...@apache.org>.
ruanwenjun commented on code in PR #14833:
URL: https://github.com/apache/dolphinscheduler/pull/14833#discussion_r1310271296


##########
dolphinscheduler-master/src/main/resources/application.yaml:
##########
@@ -83,6 +83,9 @@ registry:
     block-until-connected: 600ms
     digest: ~
 
+listener:

Review Comment:
   This config will cause confusion.



##########
dolphinscheduler-listener/dolphinscheduler-listener-plugin/dolphinscheduler-listener-kafka/src/main/java/org/apache/dolphinscheduler/listener/KafkaListener.java:
##########
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.listener;
+
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.listener.event.ServerDownListenerEvent;
+import org.apache.dolphinscheduler.listener.event.TaskCreateListenerEvent;
+import org.apache.dolphinscheduler.listener.event.TaskEndListenerEvent;
+import org.apache.dolphinscheduler.listener.event.TaskFailListenerEvent;
+import org.apache.dolphinscheduler.listener.event.TaskRemoveListenerEvent;
+import org.apache.dolphinscheduler.listener.event.TaskStartListenerEvent;
+import org.apache.dolphinscheduler.listener.event.TaskUpdateListenerEvent;
+import org.apache.dolphinscheduler.listener.event.WorkflowCreateListenerEvent;
+import org.apache.dolphinscheduler.listener.event.WorkflowEndListenerEvent;
+import org.apache.dolphinscheduler.listener.event.WorkflowFailListenerEvent;
+import org.apache.dolphinscheduler.listener.event.WorkflowRemoveListenerEvent;
+import org.apache.dolphinscheduler.listener.event.WorkflowStartListenerEvent;
+import org.apache.dolphinscheduler.listener.event.WorkflowUpdateListenerEvent;
+import org.apache.dolphinscheduler.listener.plugin.ListenerPlugin;
+import org.apache.dolphinscheduler.spi.params.base.PluginParams;
+import org.apache.dolphinscheduler.spi.params.base.Validate;
+import org.apache.dolphinscheduler.spi.params.input.InputParam;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class KafkaListener implements ListenerPlugin {
+
+    private final Map<String, KafkaProducer<String, String>> kafkaProducers = new HashMap<>();
+
+    @Override
+    public String name() {
+        return "KafkaListener";
+    }
+
+    @Override
+    public List<PluginParams> params() {
+        List<PluginParams> paramsList = new ArrayList<>();
+        InputParam hostParam = InputParam.newBuilder("servers", "bootstrap.servers")
+                .setPlaceholder("please input servers")
+                .addValidate(Validate.newBuilder()
+                        .setRequired(true)
+                        .build())
+                .build();
+        InputParam topicParam = InputParam.newBuilder("topic", "topic")
+                .setPlaceholder("please input topic")
+                .addValidate(Validate.newBuilder()
+                        .setRequired(true)
+                        .build())
+                .build();
+        InputParam usernameParam = InputParam.newBuilder("username", "username")
+                .setPlaceholder("please input username")
+                .addValidate(Validate.newBuilder()
+                        .setRequired(false)
+                        .build())
+                .build();
+        InputParam passwordParam = InputParam.newBuilder("password", "password")
+                .setPlaceholder("please input password")
+                .addValidate(Validate.newBuilder()
+                        .setRequired(false)
+                        .build())
+                .build();
+        paramsList.add(hostParam);
+        paramsList.add(topicParam);
+        paramsList.add(usernameParam);
+        paramsList.add(passwordParam);
+        return paramsList;
+    }
+
+    @Override
+    public void onServerDown(ServerDownListenerEvent serverDownListenerEvent) {
+        sendEvent(serverDownListenerEvent.getListenerInstanceParams(), ServerDownListenerEvent.class.getSimpleName(),
+                JSONUtils.toJsonString(serverDownListenerEvent));
+    }
+
+    @Override
+    public void onWorkflowAdded(WorkflowCreateListenerEvent workflowCreateEvent) {
+        sendEvent(workflowCreateEvent.getListenerInstanceParams(), WorkflowCreateListenerEvent.class.getSimpleName(),
+                JSONUtils.toJsonString(workflowCreateEvent));
+    }
+
+    @Override
+    public void onWorkflowUpdate(WorkflowUpdateListenerEvent workflowUpdateEvent) {
+        sendEvent(workflowUpdateEvent.getListenerInstanceParams(), WorkflowUpdateListenerEvent.class.getSimpleName(),
+                JSONUtils.toJsonString(workflowUpdateEvent));
+    }
+
+    @Override
+    public void onWorkflowRemoved(WorkflowRemoveListenerEvent workflowRemovedEvent) {
+        sendEvent(workflowRemovedEvent.getListenerInstanceParams(), WorkflowRemoveListenerEvent.class.getSimpleName(),
+                JSONUtils.toJsonString(workflowRemovedEvent));
+    }
+
+    @Override
+    public void onWorkflowStart(WorkflowStartListenerEvent workflowStartEvent) {
+        sendEvent(workflowStartEvent.getListenerInstanceParams(), WorkflowStartListenerEvent.class.getSimpleName(),
+                JSONUtils.toJsonString(workflowStartEvent));
+
+    }
+
+    @Override
+    public void onWorkflowEnd(WorkflowEndListenerEvent workflowEndEvent) {
+        sendEvent(workflowEndEvent.getListenerInstanceParams(), WorkflowEndListenerEvent.class.getSimpleName(),
+                JSONUtils.toJsonString(workflowEndEvent));
+    }
+
+    @Override
+    public void onWorkflowFail(WorkflowFailListenerEvent workflowErrorEvent) {
+        sendEvent(workflowErrorEvent.getListenerInstanceParams(), WorkflowFailListenerEvent.class.getSimpleName(),
+                JSONUtils.toJsonString(workflowErrorEvent));
+    }
+
+    @Override
+    public void onTaskAdded(TaskCreateListenerEvent taskAddedEvent) {
+        sendEvent(taskAddedEvent.getListenerInstanceParams(), TaskCreateListenerEvent.class.getSimpleName(),
+                JSONUtils.toJsonString(taskAddedEvent));
+    }
+
+    @Override
+    public void onTaskUpdate(TaskUpdateListenerEvent taskUpdateEvent) {
+        sendEvent(taskUpdateEvent.getListenerInstanceParams(), TaskUpdateListenerEvent.class.getSimpleName(),
+                JSONUtils.toJsonString(taskUpdateEvent));
+    }
+
+    @Override
+    public void onTaskRemoved(TaskRemoveListenerEvent taskRemovedEvent) {
+        sendEvent(taskRemovedEvent.getListenerInstanceParams(), TaskRemoveListenerEvent.class.getSimpleName(),
+                JSONUtils.toJsonString(taskRemovedEvent));
+    }
+
+    @Override
+    public void onTaskStart(TaskStartListenerEvent taskStartEvent) {
+        sendEvent(taskStartEvent.getListenerInstanceParams(), TaskStartListenerEvent.class.getSimpleName(),
+                JSONUtils.toJsonString(taskStartEvent));
+    }
+
+    @Override
+    public void onTaskEnd(TaskEndListenerEvent taskEndEvent) {
+        sendEvent(taskEndEvent.getListenerInstanceParams(), TaskEndListenerEvent.class.getSimpleName(),
+                JSONUtils.toJsonString(taskEndEvent));
+    }
+
+    @Override
+    public void onTaskFail(TaskFailListenerEvent taskErrorEvent) {
+        sendEvent(taskErrorEvent.getListenerInstanceParams(), TaskFailListenerEvent.class.getSimpleName(),
+                JSONUtils.toJsonString(taskErrorEvent));
+    }
+
+    private void sendEvent(Map<String, String> listenerInstanceParams, String key, String value) {
+        String uniqueId = uniqueId(listenerInstanceParams);
+        if (!kafkaProducers.containsKey(uniqueId)) {
+            String kafkaBroker = listenerInstanceParams.get("servers");
+            String username = listenerInstanceParams.get("username");
+            String password = listenerInstanceParams.get("password");
+            Map<String, Object> configurations = new HashMap<>();
+            // TODO: when use username/password, throws exception: Unable to find LoginModule class:
+            // org.apache.kafka.common.security.plain.PlainLoginModule
+            configurations.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBroker);
+            configurations.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+            configurations.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+            if (StringUtils.isNotEmpty(username) && StringUtils.isNotEmpty(password)) {
+                configurations.put("sasl.jaas.config", String.format(
+                        "org.apache.kafka.common.security.plain.PlainLoginModule required username='%s' password='%s';",
+                        username, password));
+                configurations.put("security.protocol", "SASL_PLAINTEXT");
+                configurations.put("sasl.mechanism", "PLAIN");
+            }
+            KafkaProducer<String, String> producer = new KafkaProducer<>(configurations);
+            kafkaProducers.put(uniqueId, producer);
+
+        }
+        KafkaProducer<String, String> producer = kafkaProducers.get(uniqueId);
+        String topic = listenerInstanceParams.get("topic");
+        producer.send(new ProducerRecord<>(topic, key, value), (recordMetadata, e) -> {
+            if (e != null) {
+                throw new RuntimeException(e);
+            }
+        });
+    }
+
+    private String uniqueId(Map<String, String> listenerInstanceParams) {
+        String kafkaBroker = listenerInstanceParams.get("servers");
+        String topic = listenerInstanceParams.get("topic");
+        String username = listenerInstanceParams.getOrDefault("username", "foo");
+        String password = listenerInstanceParams.getOrDefault("password", "foo");

Review Comment:
   We don't need to add default username and password.
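
One possible way to follow this suggestion is sketched below, under assumptions: the rest of `uniqueId` is not visible in this hunk, so the class `ProducerKeySketch` and the `|` separator are hypothetical. The key is built from the required `servers` and `topic` params, and credentials are appended only when both are actually provided, so no placeholder values are needed.

```java
import java.util.Map;
import java.util.Objects;
import java.util.StringJoiner;

// Hypothetical helper; not part of the PR.
class ProducerKeySketch {

    // Builds an in-memory cache key for a producer from the instance params.
    static String uniqueId(Map<String, String> listenerInstanceParams) {
        StringJoiner joiner = new StringJoiner("|");
        joiner.add(Objects.toString(listenerInstanceParams.get("servers"), ""));
        joiner.add(Objects.toString(listenerInstanceParams.get("topic"), ""));
        String username = listenerInstanceParams.get("username");
        String password = listenerInstanceParams.get("password");
        // Only include credentials when both are set, mirroring the check
        // used in sendEvent before enabling SASL.
        if (username != null && !username.isEmpty()
                && password != null && !password.isEmpty()) {
            joiner.add(username);
            joiner.add(password);
        }
        return joiner.toString();
    }
}
```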



##########
dolphinscheduler-listener/dolphinscheduler-listener-service/src/main/java/org/apache/dolphinscheduler/listener/service/jdbc/mapper/ListenerEventMapper.java:
##########
@@ -0,0 +1,46 @@
+/*
+ *
+ *  * Licensed to Apache Software Foundation (ASF) under one or more contributor
+ *  * license agreements. See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright
+ *  * ownership. Apache Software Foundation (ASF) licenses this file to you under
+ *  * the Apache License, Version 2.0 (the "License"); you may
+ *  * not use this file except in compliance with the License.
+ *  * You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ *
+ */
+
+package org.apache.dolphinscheduler.listener.service.jdbc.mapper;
+
+import org.apache.dolphinscheduler.listener.service.jdbc.JdbcListenerEvent;
+
+import org.apache.ibatis.annotations.Insert;
+import org.apache.ibatis.annotations.Param;
+
+import java.util.List;
+
+import com.baomidou.mybatisplus.core.mapper.BaseMapper;
+
+public interface ListenerEventMapper extends BaseMapper<JdbcListenerEvent> {
+
+    @Insert({"<script>",
+            "        insert into t_ds_listener_event ( content, post_status, event_type, log, plugin_instance_id, create_time, update_time)",
+            "        values",
+            "        <foreach collection='jdbcListenerEvents' item='jdbcListenerEvent' separator=','>",
+            "            (#{jdbcListenerEvent.content},#{jdbcListenerEvent.postStatus}," +
+                    "            #{jdbcListenerEvent.eventType},#{jdbcListenerEvent.log},#{jdbcListenerEvent.pluginInstanceId}, #{jdbcListenerEvent.createTime}, #{jdbcListenerEvent.updateTime})"
+                    +
+                    "        </foreach>",
+            "</script>"})

Review Comment:
   Please don't use `@Annotation` and xml at the same time.



##########
dolphinscheduler-listener/dolphinscheduler-listener-plugin/dolphinscheduler-listener-kafka/src/main/java/org/apache/dolphinscheduler/listener/KafkaListener.java:
##########
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.listener;
+
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.listener.event.ServerDownListenerEvent;
+import org.apache.dolphinscheduler.listener.event.TaskCreateListenerEvent;
+import org.apache.dolphinscheduler.listener.event.TaskEndListenerEvent;
+import org.apache.dolphinscheduler.listener.event.TaskFailListenerEvent;
+import org.apache.dolphinscheduler.listener.event.TaskRemoveListenerEvent;
+import org.apache.dolphinscheduler.listener.event.TaskStartListenerEvent;
+import org.apache.dolphinscheduler.listener.event.TaskUpdateListenerEvent;
+import org.apache.dolphinscheduler.listener.event.WorkflowCreateListenerEvent;
+import org.apache.dolphinscheduler.listener.event.WorkflowEndListenerEvent;
+import org.apache.dolphinscheduler.listener.event.WorkflowFailListenerEvent;
+import org.apache.dolphinscheduler.listener.event.WorkflowRemoveListenerEvent;
+import org.apache.dolphinscheduler.listener.event.WorkflowStartListenerEvent;
+import org.apache.dolphinscheduler.listener.event.WorkflowUpdateListenerEvent;
+import org.apache.dolphinscheduler.listener.plugin.ListenerPlugin;
+import org.apache.dolphinscheduler.spi.params.base.PluginParams;
+import org.apache.dolphinscheduler.spi.params.base.Validate;
+import org.apache.dolphinscheduler.spi.params.input.InputParam;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class KafkaListener implements ListenerPlugin {
+
+    private final Map<String, KafkaProducer<String, String>> kafkaProducers = new HashMap<>();
+
+    @Override
+    public String name() {
+        return "KafkaListener";
+    }
+
+    @Override
+    public List<PluginParams> params() {
+        List<PluginParams> paramsList = new ArrayList<>();
+        InputParam hostParam = InputParam.newBuilder("servers", "bootstrap.servers")
+                .setPlaceholder("please input servers")
+                .addValidate(Validate.newBuilder()
+                        .setRequired(true)
+                        .build())
+                .build();
+        InputParam topicParam = InputParam.newBuilder("topic", "topic")
+                .setPlaceholder("please input topic")
+                .addValidate(Validate.newBuilder()
+                        .setRequired(true)
+                        .build())
+                .build();
+        InputParam usernameParam = InputParam.newBuilder("username", "username")
+                .setPlaceholder("please input username")
+                .addValidate(Validate.newBuilder()
+                        .setRequired(false)
+                        .build())
+                .build();
+        InputParam passwordParam = InputParam.newBuilder("password", "password")
+                .setPlaceholder("please input password")
+                .addValidate(Validate.newBuilder()
+                        .setRequired(false)
+                        .build())
+                .build();
+        paramsList.add(hostParam);
+        paramsList.add(topicParam);
+        paramsList.add(usernameParam);
+        paramsList.add(passwordParam);
+        return paramsList;
+    }
+
+    @Override
+    public void onServerDown(ServerDownListenerEvent serverDownListenerEvent) {
+        sendEvent(serverDownListenerEvent.getListenerInstanceParams(), ServerDownListenerEvent.class.getSimpleName(),
+                JSONUtils.toJsonString(serverDownListenerEvent));
+    }
+
+    @Override
+    public void onWorkflowAdded(WorkflowCreateListenerEvent workflowCreateEvent) {
+        sendEvent(workflowCreateEvent.getListenerInstanceParams(), WorkflowCreateListenerEvent.class.getSimpleName(),
+                JSONUtils.toJsonString(workflowCreateEvent));
+    }
+
+    @Override
+    public void onWorkflowUpdate(WorkflowUpdateListenerEvent workflowUpdateEvent) {
+        sendEvent(workflowUpdateEvent.getListenerInstanceParams(), WorkflowUpdateListenerEvent.class.getSimpleName(),
+                JSONUtils.toJsonString(workflowUpdateEvent));
+    }
+
+    @Override
+    public void onWorkflowRemoved(WorkflowRemoveListenerEvent workflowRemovedEvent) {
+        sendEvent(workflowRemovedEvent.getListenerInstanceParams(), WorkflowRemoveListenerEvent.class.getSimpleName(),
+                JSONUtils.toJsonString(workflowRemovedEvent));
+    }
+
+    @Override
+    public void onWorkflowStart(WorkflowStartListenerEvent workflowStartEvent) {
+        sendEvent(workflowStartEvent.getListenerInstanceParams(), WorkflowStartListenerEvent.class.getSimpleName(),
+                JSONUtils.toJsonString(workflowStartEvent));
+
+    }
+
+    @Override
+    public void onWorkflowEnd(WorkflowEndListenerEvent workflowEndEvent) {
+        sendEvent(workflowEndEvent.getListenerInstanceParams(), WorkflowEndListenerEvent.class.getSimpleName(),
+                JSONUtils.toJsonString(workflowEndEvent));
+    }
+
+    @Override
+    public void onWorkflowFail(WorkflowFailListenerEvent workflowErrorEvent) {
+        sendEvent(workflowErrorEvent.getListenerInstanceParams(), WorkflowFailListenerEvent.class.getSimpleName(),
+                JSONUtils.toJsonString(workflowErrorEvent));
+    }
+
+    @Override
+    public void onTaskAdded(TaskCreateListenerEvent taskAddedEvent) {
+        sendEvent(taskAddedEvent.getListenerInstanceParams(), TaskCreateListenerEvent.class.getSimpleName(),
+                JSONUtils.toJsonString(taskAddedEvent));
+    }
+
+    @Override
+    public void onTaskUpdate(TaskUpdateListenerEvent taskUpdateEvent) {
+        sendEvent(taskUpdateEvent.getListenerInstanceParams(), TaskUpdateListenerEvent.class.getSimpleName(),
+                JSONUtils.toJsonString(taskUpdateEvent));
+    }
+
+    @Override
+    public void onTaskRemoved(TaskRemoveListenerEvent taskRemovedEvent) {
+        sendEvent(taskRemovedEvent.getListenerInstanceParams(), TaskRemoveListenerEvent.class.getSimpleName(),
+                JSONUtils.toJsonString(taskRemovedEvent));
+    }
+
+    @Override
+    public void onTaskStart(TaskStartListenerEvent taskStartEvent) {
+        sendEvent(taskStartEvent.getListenerInstanceParams(), TaskStartListenerEvent.class.getSimpleName(),
+                JSONUtils.toJsonString(taskStartEvent));
+    }
+
+    @Override
+    public void onTaskEnd(TaskEndListenerEvent taskEndEvent) {
+        sendEvent(taskEndEvent.getListenerInstanceParams(), TaskEndListenerEvent.class.getSimpleName(),
+                JSONUtils.toJsonString(taskEndEvent));
+    }
+
+    @Override
+    public void onTaskFail(TaskFailListenerEvent taskErrorEvent) {
+        sendEvent(taskErrorEvent.getListenerInstanceParams(), TaskFailListenerEvent.class.getSimpleName(),
+                JSONUtils.toJsonString(taskErrorEvent));
+    }
+
+    private void sendEvent(Map<String, String> listenerInstanceParams, String key, String value) {
+        String uniqueId = uniqueId(listenerInstanceParams);
+        if (!kafkaProducers.containsKey(uniqueId)) {
+            String kafkaBroker = listenerInstanceParams.get("servers");
+            String username = listenerInstanceParams.get("username");
+            String password = listenerInstanceParams.get("password");
+            Map<String, Object> configurations = new HashMap<>();
+            // TODO: when use username/password, throws exception: Unable to find LoginModule class:
+            // org.apache.kafka.common.security.plain.PlainLoginModule
+            configurations.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBroker);
+            configurations.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+            configurations.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+            if (StringUtils.isNotEmpty(username) && StringUtils.isNotEmpty(password)) {
+                configurations.put("sasl.jaas.config", String.format(
+                        "org.apache.kafka.common.security.plain.PlainLoginModule required username='%s' password='%s';",
+                        username, password));
+                configurations.put("security.protocol", "SASL_PLAINTEXT");
+                configurations.put("sasl.mechanism", "PLAIN");

Review Comment:
   Why not expose this config?



##########
dolphinscheduler-listener/dolphinscheduler-listener-plugin/dolphinscheduler-listener-kafka/src/main/java/org/apache/dolphinscheduler/listener/KafkaListener.java:
##########
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.listener;
+
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.listener.event.ServerDownListenerEvent;
+import org.apache.dolphinscheduler.listener.event.TaskCreateListenerEvent;
+import org.apache.dolphinscheduler.listener.event.TaskEndListenerEvent;
+import org.apache.dolphinscheduler.listener.event.TaskFailListenerEvent;
+import org.apache.dolphinscheduler.listener.event.TaskRemoveListenerEvent;
+import org.apache.dolphinscheduler.listener.event.TaskStartListenerEvent;
+import org.apache.dolphinscheduler.listener.event.TaskUpdateListenerEvent;
+import org.apache.dolphinscheduler.listener.event.WorkflowCreateListenerEvent;
+import org.apache.dolphinscheduler.listener.event.WorkflowEndListenerEvent;
+import org.apache.dolphinscheduler.listener.event.WorkflowFailListenerEvent;
+import org.apache.dolphinscheduler.listener.event.WorkflowRemoveListenerEvent;
+import org.apache.dolphinscheduler.listener.event.WorkflowStartListenerEvent;
+import org.apache.dolphinscheduler.listener.event.WorkflowUpdateListenerEvent;
+import org.apache.dolphinscheduler.listener.plugin.ListenerPlugin;
+import org.apache.dolphinscheduler.spi.params.base.PluginParams;
+import org.apache.dolphinscheduler.spi.params.base.Validate;
+import org.apache.dolphinscheduler.spi.params.input.InputParam;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class KafkaListener implements ListenerPlugin {
+
+    private final Map<String, KafkaProducer<String, String>> kafkaProducers = new HashMap<>();
+
+    @Override
+    public String name() {
+        return "KafkaListener";
+    }
+
+    @Override
+    public List<PluginParams> params() {
+        List<PluginParams> paramsList = new ArrayList<>();
+        InputParam hostParam = InputParam.newBuilder("servers", "bootstrap.servers")
+                .setPlaceholder("please input servers")
+                .addValidate(Validate.newBuilder()
+                        .setRequired(true)
+                        .build())
+                .build();
+        InputParam topicParam = InputParam.newBuilder("topic", "topic")
+                .setPlaceholder("please input topic")
+                .addValidate(Validate.newBuilder()
+                        .setRequired(true)
+                        .build())
+                .build();
+        InputParam usernameParam = InputParam.newBuilder("username", "username")
+                .setPlaceholder("please input username")
+                .addValidate(Validate.newBuilder()
+                        .setRequired(false)
+                        .build())
+                .build();
+        InputParam passwordParam = InputParam.newBuilder("password", "password")
+                .setPlaceholder("please input password")
+                .addValidate(Validate.newBuilder()
+                        .setRequired(false)
+                        .build())
+                .build();
+        paramsList.add(hostParam);
+        paramsList.add(topicParam);
+        paramsList.add(usernameParam);
+        paramsList.add(passwordParam);
+        return paramsList;
+    }
+
+    @Override
+    public void onServerDown(ServerDownListenerEvent serverDownListenerEvent) {
+        sendEvent(serverDownListenerEvent.getListenerInstanceParams(), ServerDownListenerEvent.class.getSimpleName(),
+                JSONUtils.toJsonString(serverDownListenerEvent));
+    }
+
+    @Override
+    public void onWorkflowAdded(WorkflowCreateListenerEvent workflowCreateEvent) {
+        sendEvent(workflowCreateEvent.getListenerInstanceParams(), WorkflowCreateListenerEvent.class.getSimpleName(),
+                JSONUtils.toJsonString(workflowCreateEvent));
+    }
+
+    @Override
+    public void onWorkflowUpdate(WorkflowUpdateListenerEvent workflowUpdateEvent) {
+        sendEvent(workflowUpdateEvent.getListenerInstanceParams(), WorkflowUpdateListenerEvent.class.getSimpleName(),
+                JSONUtils.toJsonString(workflowUpdateEvent));
+    }
+
+    @Override
+    public void onWorkflowRemoved(WorkflowRemoveListenerEvent workflowRemovedEvent) {
+        sendEvent(workflowRemovedEvent.getListenerInstanceParams(), WorkflowRemoveListenerEvent.class.getSimpleName(),
+                JSONUtils.toJsonString(workflowRemovedEvent));
+    }
+
+    @Override
+    public void onWorkflowStart(WorkflowStartListenerEvent workflowStartEvent) {
+        sendEvent(workflowStartEvent.getListenerInstanceParams(), WorkflowStartListenerEvent.class.getSimpleName(),
+                JSONUtils.toJsonString(workflowStartEvent));
+
+    }
+
+    @Override
+    public void onWorkflowEnd(WorkflowEndListenerEvent workflowEndEvent) {
+        sendEvent(workflowEndEvent.getListenerInstanceParams(), WorkflowEndListenerEvent.class.getSimpleName(),
+                JSONUtils.toJsonString(workflowEndEvent));
+    }
+
+    @Override
+    public void onWorkflowFail(WorkflowFailListenerEvent workflowErrorEvent) {
+        sendEvent(workflowErrorEvent.getListenerInstanceParams(), WorkflowFailListenerEvent.class.getSimpleName(),
+                JSONUtils.toJsonString(workflowErrorEvent));
+    }
+
+    @Override
+    public void onTaskAdded(TaskCreateListenerEvent taskAddedEvent) {
+        sendEvent(taskAddedEvent.getListenerInstanceParams(), TaskCreateListenerEvent.class.getSimpleName(),
+                JSONUtils.toJsonString(taskAddedEvent));
+    }
+
+    @Override
+    public void onTaskUpdate(TaskUpdateListenerEvent taskUpdateEvent) {
+        sendEvent(taskUpdateEvent.getListenerInstanceParams(), TaskUpdateListenerEvent.class.getSimpleName(),
+                JSONUtils.toJsonString(taskUpdateEvent));
+    }
+
+    @Override
+    public void onTaskRemoved(TaskRemoveListenerEvent taskRemovedEvent) {
+        sendEvent(taskRemovedEvent.getListenerInstanceParams(), TaskRemoveListenerEvent.class.getSimpleName(),
+                JSONUtils.toJsonString(taskRemovedEvent));
+    }
+
+    @Override
+    public void onTaskStart(TaskStartListenerEvent taskStartEvent) {
+        sendEvent(taskStartEvent.getListenerInstanceParams(), TaskStartListenerEvent.class.getSimpleName(),
+                JSONUtils.toJsonString(taskStartEvent));
+    }
+
+    @Override
+    public void onTaskEnd(TaskEndListenerEvent taskEndEvent) {
+        sendEvent(taskEndEvent.getListenerInstanceParams(), TaskEndListenerEvent.class.getSimpleName(),
+                JSONUtils.toJsonString(taskEndEvent));
+    }
+
+    @Override
+    public void onTaskFail(TaskFailListenerEvent taskErrorEvent) {
+        sendEvent(taskErrorEvent.getListenerInstanceParams(), TaskFailListenerEvent.class.getSimpleName(),
+                JSONUtils.toJsonString(taskErrorEvent));
+    }
+
+    private void sendEvent(Map<String, String> listenerInstanceParams, String key, String value) {
+        String uniqueId = uniqueId(listenerInstanceParams);
+        if (!kafkaProducers.containsKey(uniqueId)) {
+            String kafkaBroker = listenerInstanceParams.get("servers");
+            String username = listenerInstanceParams.get("username");
+            String password = listenerInstanceParams.get("password");
+            Map<String, Object> configurations = new HashMap<>();
+            // TODO: when use username/password, throws exception: Unable to find LoginModule class:
+            // org.apache.kafka.common.security.plain.PlainLoginModule
+            configurations.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBroker);
+            configurations.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+            configurations.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+            if (StringUtils.isNotEmpty(username) && StringUtils.isNotEmpty(password)) {
+                configurations.put("sasl.jaas.config", String.format(
+                        "org.apache.kafka.common.security.plain.PlainLoginModule required username='%s' password='%s';",
+                        username, password));
+                configurations.put("security.protocol", "SASL_PLAINTEXT");
+                configurations.put("sasl.mechanism", "PLAIN");
+            }
+            KafkaProducer<String, String> producer = new KafkaProducer<>(configurations);
+            kafkaProducers.put(uniqueId, producer);
+
+        }
+        KafkaProducer<String, String> producer = kafkaProducers.get(uniqueId);
+        String topic = listenerInstanceParams.get("topic");
+        producer.send(new ProducerRecord<>(topic, key, value), (recordMetadata, e) -> {

Review Comment:
   You said this `ensures that messages are neither lost nor unordered`, but this is an asynchronous send. How can you ensure that messages are not lost?
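
   For reference, a minimal sketch of producer settings commonly used when records must not be lost or reordered. The keys are standard Kafka producer configs written as plain strings so the block needs only the JDK; the class name and the chosen values are assumptions, not what the PR uses. Even with these settings `send` stays asynchronous: it returns a `Future`, so a failure only becomes visible in the callback or by blocking on `get()`.

```java
import java.util.Properties;

public class ReliableProducerPropsSketch {

    /** Builds producer settings aimed at durable, ordered delivery (illustrative values). */
    public static Properties reliableProps(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Wait for all in-sync replicas to acknowledge each record.
        props.setProperty("acks", "all");
        // Idempotence lets the broker discard duplicates caused by retries
        // and keeps per-partition ordering.
        props.setProperty("enable.idempotence", "true");
        // Must be 5 or lower when idempotence is enabled.
        props.setProperty("max.in.flight.requests.per.connection", "5");
        // Upper bound on how long a send, including retries, may take before it is reported as failed.
        props.setProperty("delivery.timeout.ms", "120000");
        return props;
    }
}
```

   Passing these properties to the `KafkaProducer` constructor and calling `producer.send(record).get()` would make each send effectively synchronous, at the cost of throughput.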



##########
dolphinscheduler-listener/dolphinscheduler-listener-service/src/main/java/org/apache/dolphinscheduler/listener/service/ListenerEventPublishService.java:
##########
@@ -0,0 +1,273 @@
+/*
+ *
+ *  * Licensed to Apache Software Foundation (ASF) under one or more contributor
+ *  * license agreements. See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright
+ *  * ownership. Apache Software Foundation (ASF) licenses this file to you under
+ *  * the Apache License, Version 2.0 (the "License"); you may
+ *  * not use this file except in compliance with the License.
+ *  * You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ *
+ */
+
+package org.apache.dolphinscheduler.listener.service;
+
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
+import org.apache.dolphinscheduler.dao.entity.Project;
+import org.apache.dolphinscheduler.dao.entity.ProjectUser;
+import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
+import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.listener.enums.ListenerEventType;
+import org.apache.dolphinscheduler.listener.event.ListenerEvent;
+import org.apache.dolphinscheduler.listener.event.TaskCreateListenerEvent;
+import org.apache.dolphinscheduler.listener.event.TaskEndListenerEvent;
+import org.apache.dolphinscheduler.listener.event.TaskFailListenerEvent;
+import org.apache.dolphinscheduler.listener.event.TaskRemoveListenerEvent;
+import org.apache.dolphinscheduler.listener.event.TaskStartListenerEvent;
+import org.apache.dolphinscheduler.listener.event.TaskUpdateListenerEvent;
+import org.apache.dolphinscheduler.listener.event.WorkflowCreateListenerEvent;
+import org.apache.dolphinscheduler.listener.event.WorkflowEndListenerEvent;
+import org.apache.dolphinscheduler.listener.event.WorkflowFailListenerEvent;
+import org.apache.dolphinscheduler.listener.event.WorkflowRemoveListenerEvent;
+import org.apache.dolphinscheduler.listener.event.WorkflowStartListenerEvent;
+import org.apache.dolphinscheduler.listener.event.WorkflowUpdateListenerEvent;
+
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import javax.annotation.PostConstruct;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+@Component
+@Slf4j
+public class ListenerEventPublishService {
+
+    private final BlockingQueue<ListenerEvent> listenerEventQueue = new LinkedBlockingQueue<>();
+
+    @Autowired
+    private ListenerEventProducer producer;
+
+    /**
+     * create a daemon thread to process the listener event queue
+     */
+    @PostConstruct
+    private void init() {
+        Thread thread = new Thread(this::doPublish);
+        thread.setDaemon(true);
+        thread.setName("Listener-Event-Produce-Thread");
+        thread.start();
+    }
+
+    public void publish(ListenerEvent listenerEvent) {
+        if (!listenerEventQueue.offer(listenerEvent)) {
+            log.error("Publish listener event failed, message:{}", listenerEvent);
+        }
+    }

Review Comment:
   If the downstream producer crashes, this will cause the master to OOM, which is not reasonable. DolphinScheduler is used to schedule and trigger workflows, so why should `event` handling affect its availability?
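
   One way to keep event handling from threatening the master, sketched under assumptions: give the queue a fixed capacity so `offer` fails fast when the consumer falls behind, and count the dropped events. `new LinkedBlockingQueue<>()` has a capacity of `Integer.MAX_VALUE`, so in the PR's code `offer` practically never returns false. The class name, the capacity and the drop policy below are illustrative only.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

public class BoundedEventBufferSketch<E> {

    private final BlockingQueue<E> queue;
    private final AtomicLong dropped = new AtomicLong();

    public BoundedEventBufferSketch(int capacity) {
        // A bounded queue caps the memory that pending events can occupy.
        this.queue = new LinkedBlockingQueue<>(capacity);
    }

    /** Non-blocking publish: returns false and counts a drop when the buffer is full. */
    public boolean publish(E event) {
        if (queue.offer(event)) {
            return true;
        }
        dropped.incrementAndGet();
        return false;
    }

    /** Blocks until an event is available; intended for the consumer thread. */
    public E take() throws InterruptedException {
        return queue.take();
    }

    public long droppedCount() {
        return dropped.get();
    }

    public int size() {
        return queue.size();
    }
}
```

   Dropping is only one possible policy; persisting overflow events and replaying them later would trade memory pressure for extra storage I/O.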



##########
dolphinscheduler-listener/dolphinscheduler-listener-plugin/dolphinscheduler-listener-logger/src/main/java/org/apache/dolphinscheduler/listener/LoggerListener.java:
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.listener;
+
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.listener.event.ServerDownListenerEvent;
+import org.apache.dolphinscheduler.listener.event.TaskCreateListenerEvent;
+import org.apache.dolphinscheduler.listener.event.TaskEndListenerEvent;
+import org.apache.dolphinscheduler.listener.event.TaskFailListenerEvent;
+import org.apache.dolphinscheduler.listener.event.TaskRemoveListenerEvent;
+import org.apache.dolphinscheduler.listener.event.TaskStartListenerEvent;
+import org.apache.dolphinscheduler.listener.event.TaskUpdateListenerEvent;
+import org.apache.dolphinscheduler.listener.event.WorkflowCreateListenerEvent;
+import org.apache.dolphinscheduler.listener.event.WorkflowEndListenerEvent;
+import org.apache.dolphinscheduler.listener.event.WorkflowFailListenerEvent;
+import org.apache.dolphinscheduler.listener.event.WorkflowRemoveListenerEvent;
+import org.apache.dolphinscheduler.listener.event.WorkflowStartListenerEvent;
+import org.apache.dolphinscheduler.listener.event.WorkflowUpdateListenerEvent;
+import org.apache.dolphinscheduler.listener.plugin.ListenerPlugin;
+import org.apache.dolphinscheduler.spi.params.base.PluginParams;
+import org.apache.dolphinscheduler.spi.params.base.Validate;
+import org.apache.dolphinscheduler.spi.params.input.InputParam;
+
+import java.io.BufferedWriter;
+import java.io.FileWriter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class LoggerListener implements ListenerPlugin {
+
+    @Override
+    public String name() {
+        return "LoggerListener";
+    }
+
+    @Override
+    public List<PluginParams> params() {
+        List<PluginParams> paramsList = new ArrayList<>();
+        InputParam param1 = InputParam.newBuilder("logFile", "log_file")
+                .setPlaceholder("please input log file")
+                .addValidate(Validate.newBuilder()
+                        .setRequired(true)
+                        .build())
+                .build();
+        paramsList.add(param1);
+        return paramsList;
+    }
+
+    @Override
+    public void onServerDown(ServerDownListenerEvent serverDownListenerEvent) {
+        printLogIntoFile(serverDownListenerEvent.getListenerInstanceParams(),
+                JSONUtils.toJsonString(serverDownListenerEvent));
+    }
+
+    @Override
+    public void onWorkflowAdded(WorkflowCreateListenerEvent workflowCreateEvent) {
+        printLogIntoFile(workflowCreateEvent.getListenerInstanceParams(), JSONUtils.toJsonString(workflowCreateEvent));
+    }
+
+    @Override
+    public void onWorkflowUpdate(WorkflowUpdateListenerEvent workflowUpdateEvent) {
+        printLogIntoFile(workflowUpdateEvent.getListenerInstanceParams(), JSONUtils.toJsonString(workflowUpdateEvent));
+    }
+
+    @Override
+    public void onWorkflowRemoved(WorkflowRemoveListenerEvent workflowRemovedEvent) {
+        printLogIntoFile(workflowRemovedEvent.getListenerInstanceParams(),
+                JSONUtils.toJsonString(workflowRemovedEvent));
+    }
+
+    @Override
+    public void onWorkflowStart(WorkflowStartListenerEvent workflowStartEvent) {
+        printLogIntoFile(workflowStartEvent.getListenerInstanceParams(), JSONUtils.toJsonString(workflowStartEvent));
+    }
+
+    @Override
+    public void onWorkflowEnd(WorkflowEndListenerEvent workflowEndEvent) {
+        printLogIntoFile(workflowEndEvent.getListenerInstanceParams(), JSONUtils.toJsonString(workflowEndEvent));
+    }
+
+    @Override
+    public void onWorkflowFail(WorkflowFailListenerEvent workflowErrorEvent) {
+        printLogIntoFile(workflowErrorEvent.getListenerInstanceParams(), JSONUtils.toJsonString(workflowErrorEvent));
+    }
+
+    @Override
+    public void onTaskAdded(TaskCreateListenerEvent taskAddedEvent) {
+        printLogIntoFile(taskAddedEvent.getListenerInstanceParams(), JSONUtils.toJsonString(taskAddedEvent));
+    }
+
+    @Override
+    public void onTaskUpdate(TaskUpdateListenerEvent taskUpdateEvent) {
+        printLogIntoFile(taskUpdateEvent.getListenerInstanceParams(), JSONUtils.toJsonString(taskUpdateEvent));
+    }
+
+    @Override
+    public void onTaskRemoved(TaskRemoveListenerEvent taskRemovedEvent) {
+        printLogIntoFile(taskRemovedEvent.getListenerInstanceParams(), JSONUtils.toJsonString(taskRemovedEvent));
+    }
+
+    @Override
+    public void onTaskStart(TaskStartListenerEvent taskStartEvent) {
+        printLogIntoFile(taskStartEvent.getListenerInstanceParams(), JSONUtils.toJsonString(taskStartEvent));
+    }
+
+    @Override
+    public void onTaskEnd(TaskEndListenerEvent taskEndEvent) {
+        printLogIntoFile(taskEndEvent.getListenerInstanceParams(), JSONUtils.toJsonString(taskEndEvent));
+    }
+
+    @Override
+    public void onTaskFail(TaskFailListenerEvent taskErrorEvent) {
+        printLogIntoFile(taskErrorEvent.getListenerInstanceParams(), JSONUtils.toJsonString(taskErrorEvent));
+    }
+
+    private void printLogIntoFile(Map<String, String> listenerInstanceParams, String content) {
+        String logFile = listenerInstanceParams.get("logFile");
+        try (BufferedWriter writer = new BufferedWriter(new FileWriter(logFile, true))) {
+            writer.write(content);
+            writer.newLine();
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }

Review Comment:
   Have you tested the performance? AFAIK, this is very slow.
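
   If opening and closing the file for every event turns out to be the bottleneck, one option is to keep a writer open per log file and reuse it. A minimal sketch using only the JDK; the class name is hypothetical and it has not been benchmarked against the PR's version.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.HashMap;
import java.util.Map;

public class CachedLogWriterSketch implements AutoCloseable {

    // One open writer per log file, created lazily on first use.
    private final Map<String, BufferedWriter> writers = new HashMap<>();

    /** Appends one line to the given file, reusing an already open writer when possible. */
    public synchronized void append(String logFile, String content) {
        try {
            BufferedWriter writer = writers.get(logFile);
            if (writer == null) {
                writer = Files.newBufferedWriter(Paths.get(logFile), StandardCharsets.UTF_8,
                        StandardOpenOption.CREATE, StandardOpenOption.APPEND);
                writers.put(logFile, writer);
            }
            writer.write(content);
            writer.newLine();
            // Flush per event so the line is visible immediately; batching flushes would be faster still.
            writer.flush();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Closes every cached writer. */
    @Override
    public synchronized void close() {
        for (BufferedWriter writer : writers.values()) {
            try {
                writer.close();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
        writers.clear();
    }
}
```

   The cache would need to be closed when the listener instance is removed or the plugin is unloaded, otherwise file handles leak.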





[GitHub] [dolphinscheduler] songjianet commented on a diff in pull request #14833: [Feature-14832][Listener]Implementation of Listener Mechanism

Posted by "songjianet (via GitHub)" <gi...@apache.org>.
songjianet commented on code in PR #14833:
URL: https://github.com/apache/dolphinscheduler/pull/14833#discussion_r1311115183


##########
dolphinscheduler-ui/src/views/security/listener-instance-manage/use-form.ts:
##########
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { reactive, ref, Ref } from 'vue'
+import { useI18n } from 'vue-i18n'
+import {
+  queryUiPluginsByType,
+  queryUiPluginDetailById
+} from '@/service/modules/ui-plugins'
+import type {
+  IPluginId,
+  IPlugin,
+  IFormRules,
+  IMeta,
+  IJsonItem,
+  IRecord
+} from './types'
+export function useForm() {
+  const { t } = useI18n()
+
+  const eventTypes = {
+    SERVER_DOWN: {
+      value: "SERVER_DOWN",

Review Comment:
   Please use single quotes.



##########
dolphinscheduler-ui/src/views/security/listener-instance-manage/detail.tsx:
##########
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import {
+  defineComponent,
+  toRefs,
+  watch,
+  onMounted,
+  ref,
+  getCurrentInstance
+} from 'vue'
+import { NSelect, NInput, NCheckboxGroup, NSpace, NCheckbox } from 'naive-ui'
+import { isFunction } from 'lodash'
+import { useI18n } from 'vue-i18n'
+import { useForm } from './use-form'
+import { useDetail } from './use-detail'
+import Modal from '@/components/modal'
+import Form from '@/components/form'
+import getElementByJson from '@/components/form/get-elements-by-json'
+import type { IRecord, IFormRules, IFormItem } from './types'
+import type { PropType, Ref } from 'vue'
+import { stateType } from '@/common/common'
+
+interface IElements extends Omit<Ref, 'value'> {
+  value: IFormItem[]
+}
+
+const props = {
+  show: {
+    type: Boolean as PropType<boolean>,
+    default: false
+  },
+  currentRecord: {
+    type: Object as PropType<IRecord>,
+    default: {}
+  }
+}
+const DetailModal = defineComponent({
+  name: 'DetailModal',
+  props,
+  emits: ['cancel', 'update'],
+  setup(props, ctx) {
+    const { t } = useI18n()
+
+    const rules = ref<IFormRules>({})
+    const elements = ref<IFormItem[]>([]) as IElements
+    const {
+      meta,
+      state,
+      eventTypes,
+      setDetail,
+      initForm,
+      resetForm,
+      getFormValues,
+      changePlugin
+    } = useForm()
+
+    const { status, createOrUpdate } = useDetail(getFormValues)
+
+    const onCancel = () => {
+      resetForm()
+      rules.value = {}
+      elements.value = []
+      ctx.emit('cancel')
+    }
+
+    const onSubmit = async () => {
+      await state.detailFormRef.validate()
+      const res = await createOrUpdate(props.currentRecord, state.json)
+      if (res) {
+        onCancel()
+        ctx.emit('update')
+      }
+    }
+    const onChangePlugin = changePlugin
+
+    const trim = getCurrentInstance()?.appContext.config.globalProperties.trim
+
+    watch(
+      () => props.show,
+      async () => {
+        props.show && props.currentRecord && setDetail(props.currentRecord)
+      }
+    )
+    watch(
+      () => state.json,
+      () => {
+        if (!state.json?.length) return
+        state.json.forEach((item) => {
+          const mergedItem = isFunction(item) ? item() : item
+          mergedItem.name = mergedItem.title
+        })
+        const { rules: fieldsRules, elements: fieldsElements } =
+          getElementByJson(state.json, state.detailForm)
+        rules.value = fieldsRules
+        elements.value = fieldsElements
+      }
+    )
+
+    onMounted(() => {
+      initForm()
+    })
+
+    return {
+      t,
+      ...toRefs(state),
+      ...toRefs(status),
+      meta,
+      rules,
+      elements,
+      eventTypes,
+      onChangePlugin,
+      onSubmit,
+      onCancel,
+      trim
+    }
+  },
+  render(props: { currentRecord: IRecord }) {
+    const {
+      show,
+      t,
+      meta,
+      rules,
+      elements,
+      detailForm,
+      uiPlugins,
+      eventTypes,
+      pluginsLoading,
+      loading,
+      saving,
+      onChangePlugin,
+      onCancel,
+      onSubmit
+    } = this
+    const { currentRecord } = props
+    return (
+      <Modal
+        show={show}
+        title={t(
+          currentRecord?.id
+            ? 'security.listener_instance.edit_listener_instance'
+            : 'security.listener_instance.create_listener_instance'
+        )}
+        onConfirm={onSubmit}
+        confirmLoading={saving || loading}
+        onCancel={onCancel}
+      >
+        {{
+          default: () => (
+            <Form
+              ref='detailFormRef'
+              loading={loading || pluginsLoading}
+              meta={{
+                ...meta,
+                rules: {
+                  ...meta.rules,
+                  ...rules
+                },
+                elements: [
+                  {
+                    path: 'instanceName',
+                    label: t('security.listener_instance.instance_name'),
+                    widget: (
+                      <NInput
+                        allowInput={this.trim}
+                        v-model={[detailForm.instanceName, 'value']}
+                        placeholder={t(
+                          'security.listener_instance.instance_name_tips'
+                        )}
+                      />
+                    )
+                  },
+                  {
+                    path: 'listenerEventTypes',
+                    label: t('security.listener_instance.listener_event_types'),
+                    widget: (
+                      <NCheckboxGroup 
+                        disabled={!this.trim}
+                        v-model={[detailForm.listenerEventTypes, 'value']}>
+                        <NSpace style="display: flex;">
+                          {
+                            Object.values(
+                              eventTypes
+                            ).map((item)=>{
+                              return <NCheckbox value={item.value} label={item.label} defaultChecked={true}/>
+                            })

Review Comment:
   `map((item) => <NCheckbox value={item.value} label={item.label} defaultChecked={true} />)`



##########
dolphinscheduler-ui/src/views/security/listener-plugin-manage/components/listener-plugin-modal.tsx:
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import {
+  defineComponent,
+  getCurrentInstance,
+  PropType,

Review Comment:
   Use a type-only import here: `type PropType`.



##########
dolphinscheduler-ui/src/views/security/listener-plugin-manage/components/use-modalData.ts:
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { reactive, ref, SetupContext } from 'vue'

Review Comment:
   Use a type-only import here: `type SetupContext`.



##########
dolphinscheduler-ui/src/views/security/listener-plugin-manage/components/listener-plugin-modal.tsx:
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import {
+  defineComponent,
+  getCurrentInstance,
+  PropType,
+  toRefs,
+  watch
+} from 'vue'
+import Modal from '@/components/modal'
+import { NButton, NForm, NFormItem, NInput, NSelect, NUpload } from 'naive-ui'
+import { useModalData } from './use-modalData'
+import { useI18n } from 'vue-i18n'
+
+const ListenerPluginModal = defineComponent({
+  name: 'tenant-modal',
+  props: {
+    showModalRef: {
+      type: Boolean as PropType<boolean>,
+      default: false
+    },
+    statusRef: {
+      type: Number as PropType<number>,
+      default: 0
+    },
+    row: {
+      type: Object as PropType<any>,
+      default: {}
+    }
+  },
+  emits: ['cancelModal', 'confirmModal'],
+  setup(props, ctx) {
+    const { variables, handleValidate } = useModalData(props, ctx)
+    const { t } = useI18n()
+
+    const cancelModal = () => {
+      if (props.statusRef === 0) {
+        variables.model.classPath = ''
+        variables.model.pluginJar = ''
+      }
+      ctx.emit('cancelModal', props.showModalRef)
+    }
+
+    const confirmModal = () => {
+      handleValidate(props.statusRef)
+    }
+
+    const trim = getCurrentInstance()?.appContext.config.globalProperties.trim
+
+    const customRequest = ({ file }: any) => {
+      variables.model.pluginJar = file.file
+      variables.listenerPluginFormRef.validate()
+    }
+  
+    // watch(
+    //   () => props.showModalRef,
+    //   () => {
+    //     props.showModalRef && getListData(props.statusRef)
+    //   }
+    // )

Review Comment:
   Delete this commented-out `watch` block.



##########
dolphinscheduler-ui/src/views/security/listener-instance-manage/detail.tsx:
##########
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import {
+  defineComponent,
+  toRefs,
+  watch,
+  onMounted,
+  ref,
+  getCurrentInstance
+} from 'vue'
+import { NSelect, NInput, NCheckboxGroup, NSpace, NCheckbox } from 'naive-ui'
+import { isFunction } from 'lodash'
+import { useI18n } from 'vue-i18n'
+import { useForm } from './use-form'
+import { useDetail } from './use-detail'
+import Modal from '@/components/modal'
+import Form from '@/components/form'
+import getElementByJson from '@/components/form/get-elements-by-json'
+import type { IRecord, IFormRules, IFormItem } from './types'
+import type { PropType, Ref } from 'vue'
+import { stateType } from '@/common/common'
+
+interface IElements extends Omit<Ref, 'value'> {
+  value: IFormItem[]
+}
+
+const props = {
+  show: {
+    type: Boolean as PropType<boolean>,
+    default: false
+  },
+  currentRecord: {
+    type: Object as PropType<IRecord>,
+    default: {}
+  }
+}
+const DetailModal = defineComponent({
+  name: 'DetailModal',
+  props,
+  emits: ['cancel', 'update'],
+  setup(props, ctx) {
+    const { t } = useI18n()
+
+    const rules = ref<IFormRules>({})
+    const elements = ref<IFormItem[]>([]) as IElements
+    const {
+      meta,
+      state,
+      eventTypes,
+      setDetail,
+      initForm,
+      resetForm,
+      getFormValues,
+      changePlugin
+    } = useForm()
+
+    const { status, createOrUpdate } = useDetail(getFormValues)
+
+    const onCancel = () => {
+      resetForm()
+      rules.value = {}
+      elements.value = []
+      ctx.emit('cancel')
+    }
+
+    const onSubmit = async () => {
+      await state.detailFormRef.validate()
+      const res = await createOrUpdate(props.currentRecord, state.json)
+      if (res) {
+        onCancel()
+        ctx.emit('update')
+      }

Review Comment:
   if (!res) return false
   onCancel()
   ctx.emit('update')
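
The guard-clause shape suggested above can be sketched in isolation as follows. This is a minimal illustration, not code from the PR: `finishSubmit` is a hypothetical helper, its callbacks stand in for the component's `onCancel` and `ctx.emit`, and returning `true` on success is an assumption (the suggestion only shows the `false` path).

```typescript
// Hypothetical helper: only the control flow mirrors the review suggestion.
function finishSubmit(
  res: unknown,
  onCancel: () => void,
  emit: (event: 'update') => void
): boolean {
  // Guard clause: stop early when the create/update call returned nothing truthy.
  if (!res) return false
  // Success path stays unindented: close the modal, then notify the parent.
  onCancel()
  emit('update')
  return true
}
```

Compared with wrapping the success path in `if (res) { ... }`, the early return keeps the main flow at one indentation level and gives the caller an explicit result.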



##########
dolphinscheduler-ui/src/views/security/listener-instance-manage/use-table.ts:
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { reactive } from 'vue'
+import {
+  queryListenerInstanceListPaging,
+  deleteListenerInstanceById
+} from '@/service/modules/listener-instance'
+import { format } from 'date-fns'
+import { parseTime } from '@/common/common'
+import type { IRecord } from './types'
+
+export function useTable() {
+  const data = reactive({
+    page: 1,
+    pageSize: 10,
+    itemCount: 0,
+    searchVal: '',
+    list: [],
+    loading: false
+  })
+
+  const getList = async () => {
+    if (data.loading) return
+    data.loading = true
+
+    const { totalList, total } = await queryListenerInstanceListPaging({
+      pageNo: data.page,
+      pageSize: data.pageSize,
+      searchVal: data.searchVal
+    })
+    data.loading = false
+    if (!totalList) throw Error()
+    data.list = totalList.map((record: IRecord) => {
+      record.createTime = record.createTime
+        ? format(parseTime(record.createTime), 'yyyy-MM-dd HH:mm:ss')
+        : ''

Review Comment:
   `record.createTime && (record.createTime = format(parseTime(record.createTime), 'yyyy-MM-dd HH:mm:ss'))`
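
A short-circuit assignment of this kind needs parentheses around the assignment to parse, because `&&` binds tighter than `=`. It also behaves slightly differently from the ternary in the diff: a missing `createTime` stays `undefined` instead of becoming `''`. The sketch below shows both forms side by side. `Rec` and `formatTime` are hypothetical stand-ins (the latter for `format(parseTime(...), 'yyyy-MM-dd HH:mm:ss')`), and the input timestamp shape is assumed.

```typescript
// Hypothetical record shape; the real IRecord lives in './types'.
interface Rec {
  createTime?: string
}

// Hypothetical stand-in for format(parseTime(raw), 'yyyy-MM-dd HH:mm:ss').
// Assumes an ISO-like input such as '2023-08-30T05:39:53.000+0000'.
const formatTime = (raw: string): string => raw.replace('T', ' ').slice(0, 19)

// Form used in the diff: a missing value is normalised to ''.
function withTernary(record: Rec): Rec {
  record.createTime = record.createTime ? formatTime(record.createTime) : ''
  return record
}

// Form suggested in the review: a missing value is left untouched.
function withGuard(record: Rec): Rec {
  // The parentheses are required; `a && a = b` is a syntax error.
  record.createTime && (record.createTime = formatTime(record.createTime))
  return record
}
```

Which form fits depends on whether the table column should render an empty string or tolerate `undefined` for records without a timestamp.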



##########
dolphinscheduler-ui/src/views/security/listener-instance-manage/use-table.ts:
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { reactive } from 'vue'
+import {
+  queryListenerInstanceListPaging,
+  deleteListenerInstanceById
+} from '@/service/modules/listener-instance'
+import { format } from 'date-fns'
+import { parseTime } from '@/common/common'
+import type { IRecord } from './types'
+
+export function useTable() {
+  const data = reactive({
+    page: 1,
+    pageSize: 10,
+    itemCount: 0,
+    searchVal: '',
+    list: [],
+    loading: false
+  })
+
+  const getList = async () => {
+    if (data.loading) return
+    data.loading = true
+
+    const { totalList, total } = await queryListenerInstanceListPaging({
+      pageNo: data.page,
+      pageSize: data.pageSize,
+      searchVal: data.searchVal
+    })
+    data.loading = false
+    if (!totalList) throw Error()
+    data.list = totalList.map((record: IRecord) => {
+      record.createTime = record.createTime
+        ? format(parseTime(record.createTime), 'yyyy-MM-dd HH:mm:ss')
+        : ''
+      record.updateTime = record.updateTime
+        ? format(parseTime(record.updateTime), 'yyyy-MM-dd HH:mm:ss')
+        : ''

Review Comment:
   Ditto for `updateTime`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
