You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/01/17 11:29:58 UTC

[incubator-inlong-website] branch master updated: [INLONG-2138][Agent] add InLong-Agent Plugin Development Guide (#253)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong-website.git


The following commit(s) were added to refs/heads/master by this push:
     new 5442c00  [INLONG-2138][Agent] add InLong-Agent Plugin Development Guide (#253)
5442c00 is described below

commit 5442c00b4ddd826f810baff0afb6500885185aa0
Author: ziruipeng <zp...@connect.ust.hk>
AuthorDate: Mon Jan 17 19:29:53 2022 +0800

    [INLONG-2138][Agent] add InLong-Agent Plugin Development Guide (#253)
    
    Co-authored-by: EMsnap <st...@tencent.com>
---
 .../how_to_write_plugin_agent.md                   | 205 ++++++++++++++++++++
 docs/design_and_concept/img/Agent_Flow.png         | Bin 0 -> 28915 bytes
 .../how_to_write_plugin_agent.md                   | 206 +++++++++++++++++++++
 .../current/design_and_concept/img/Agent_Flow.png  | Bin 0 -> 28915 bytes
 4 files changed, 411 insertions(+)

diff --git a/docs/design_and_concept/how_to_write_plugin_agent.md b/docs/design_and_concept/how_to_write_plugin_agent.md
new file mode 100644
index 0000000..9e13152
--- /dev/null
+++ b/docs/design_and_concept/how_to_write_plugin_agent.md
@@ -0,0 +1,205 @@
+---
+title: Agent Plugin
+sidebar_position: 1
+---
+
+# Overview
+
+This article is aimed at InLong-Agent plug-in developers, trying to explain the process of developing an Agent plug-in as comprehensively as possible, and strive to eliminate the confusion of developers and make plug-in development easier.
+
+## Before Development
+
+InLong Agent itself, as a data collection framework, is constructed with a Job + Task architecture. And abstract data source reading and writing into Reader/Sink plug-ins, which are incorporated into the entire framework.
+
+Developers need to be clear about the concepts of Job and Task:
+
+- `Job`: `Job` is used by Agent to describe the synchronization job from a source to a destination, and is the smallest business unit of Agent data synchronization. For example: read all files in a file directory
+- `Task`: `Task` is the smallest execution unit obtained by splitting `Job`. For example, if there are multiple files in the folder that need to be read, then a job will be split into multiple tasks, and each task will read the corresponding file
+
+A Task contains the following components:
+
+- Reader: Reader is a data collection module, which is responsible for collecting data from the data source and sending the data to the channel.
+- Sink: Sink is a data writing module, responsible for continuously fetching data from the channel and writing the data to the destination.
+- Channel: Channel is used to connect reader and sink, as a data transmission channel for both, and plays a role in monitoring data writing and reading
+
+As a developer, you only need to develop specific Source, Reader and Sink. If the data needs to be persisted to the local disk, use the persistent Channel, otherwise use the memory Channel
+
+## Demonstration
+
+The Job\Task\Reader\Sink\Channel concept introduced above can be represented by the following figure:
+![](img/Agent_Flow.png)
+
+1. The user submits a Job (via the manager or via curl), and the Job defines the Source, Channel, and Sink that need to be used (defined by the fully qualified name of the class)
+2. The framework starts the Job and creates the Source through the reflection mechanism
+3. The framework starts the Source and calls the Split interface of the Source to generate one or more Tasks
+4. When a Task is generated, a Reader (a type of Source will generate a corresponding reader), a User-configured Channel and a User-configured Sink are generated at the same time
+5. Task starts to execute, Reader starts to read data to Channel, Sink fetches data from Channel and sends it
+6. All the information needed for Job and Task execution is encapsulated in the JobProfile
+
+## Reference Demo
+
+Please understand the above process by reading the Job class, Task class, TextFileSource class, TextFileReader class, and ProxySink class in the Agent source
+
+## Development Process
+
+1. First develop Source, implement split logic, and return ReaderList
+2. The developed Reader implements the logic of reading data and writing to Channel
+3. The sink under development implements the logic of fetching data from the channel and writing it to the specified sink
+
+## Programming must know interface
+
+Some of the plug-ins that will be developed below, the classes and interfaces that need to be known are as follows:
+
+### Reader
+```java
+private class ReaderImpl implements Reader {
+
+    private int count = 0;
+
+    @Override
+    public Message read() {
+        count += 1;
+        return new DefaultMessage("".getBytes(StandardCharsets.UTF_8));
+    }
+
+    @Override
+    public boolean isFinished() {
+        return count > 99999;
+    }
+
+    @Override
+    public String getReadSource() {
+        return null;
+    }
+
+    @Override
+    public void setReadTimeout(long mill) {
+
+    }
+}
+```
+
+The `Reader` interface functions as follows:
+- `read`: Called by a single Task, and returns a read message after the call, and the message inside the Agent is encapsulated by Message
+- `isFinished`: judge whether the reading is completed, for example: if it is an SQL task, judge whether all the contents in the ResultSet have been read; if it is a file task, judge whether there is still data written after the waiting time set by the user
+- `getReadSource`: Get the acquisition source, for example: if it is a file task, it will return the file name currently read
+- `setReadTimeout`: set read timeout
+
+### Sink
+
+```java
+public interface Sink extends Stage {
+
+    /**
+     * Write data into data center
+     *
+     * @param message - message
+     */
+    void write(Message message);
+
+    /**
+     * set source file name where the message is generated
+     * @param sourceName
+     */
+    void setSourceName(String sourceName);
+
+    /**
+     * every sink should include a message filter to filter out stream id
+     */
+    MessageFilter initMessageFilter(JobProfile jobConf);
+}
+
+```
+
+The `Sink` interface functions as follows:
+- `write`: called by a single Task, reads a message from the Channel in the Task and writes it to a specific storage medium. Taking PulsarSink as an example, it needs to be sent to Pulsar through PulsarSender
+- `setSourceName`: set the data source name, if it is a file, the file name
+- `initMessageFilter`: Initialize MessageFilter , the user can create a message filter to filter each message by setting agent.message.filter.classname in the Job configuration file. For details, please refer to the MessageFilter interface
+
+### Source
+
+```java
+/**
+ * Source can be split into multiple reader.
+ */
+public interface Source {
+
+    /**
+     * Split source into a list of readers.
+     *
+     * @param conf job conf
+     * @return - list of reader
+     */
+    List<Reader> split(JobProfile conf);
+}
+
+```
+
+The `Source` interface functions as follows:
+- `split`: Called by a single job to generate multiple Readers, for example: a read file task, matching multiple files in a folder, when the job starts, it will specify TextFileSource as the Source entry,
+  After calling the split function, TextFileSource will detect how many paths match the path matching expression in the folder set by the user, and generate TextFileReader to read
+
+
+## Job Definition
+
+The code is written, have you ever wondered how the framework finds the entry class of the plugin? How does the framework load plugins?
+
+When submitting a task, you will find information about the plugin defined in the task, including the entry class. For example:
+
+```json
+{
+"job": {
+"name": "fileAgentTest",
+"source": "org.apache.inlong.agent.plugin.sources.TextFileSource",
+"sink": "org.apache.inlong.agent.plugin.sinks.ProxySink",
+"channel": "org.apache.inlong.agent.plugin.channel.MemoryChannel"
+}
+}
+```
+
+- `source`: The fully qualified name of the Source class, the instance of which the framework imports the plugin through reflection.
+- `sink`: The fully qualified name of the Sink class, the instance of which the framework imports through the reflection plugin.
+- `channel`: The name of the Channel class used by the framework, the instance of the entry class of the plugin through reflection.
+
+## Message
+
+Like the general `producer-consumer` model, the `Reader` plugin and the `Sink` plugin also use `channel` to achieve data transmission.
+`channel` can be in-memory or persistent, plugins don't have to care. Plugins write data to `channel` through `RecordSender` and read data from `channel` through `RecordReceiver`.
+
+A piece of data in `channel` is a `Message` object, `Message` contains a byte array and attribute data represented by a Map
+
+`Message` has the following methods:
+
+```java
+public interface Message {
+
+    /**
+     * Data content of message.
+     *
+     * @return bytes body
+     */
+    byte[] getBody();
+
+    /**
+     * Data attribute of message
+     *
+     * @return map header
+     */
+    Map<String, String> getHeader();
+}
+```
+
+Developers can expand customized Message according to this interface. For example, ProxyMessage contains InLongGroupId, InLongStreamId and other attributes
+
+
+## Last but not Least
+
+All new plugins must have a document in the `InLong` official wiki. The document needs to include but not limited to the following:
+
+1. **Quick introduction**: Introduce the usage scenarios and features of the plug-in.
+2. **Implementation principle**: Introduce the underlying principle of plug-in implementation, such as `sqlReader` to read data in the database by executing Sql query
+3. **Configuration Instructions**
+    - Give the json configuration file of synchronization tasks in typical scenarios.
+    - Introduce the meaning of each parameter, whether it is required, default value, value range and other constraints.
+4. **Restrictions**: Are there other restrictions on use.
+5. **FAQ**: Frequently asked questions by users.
diff --git a/docs/design_and_concept/img/Agent_Flow.png b/docs/design_and_concept/img/Agent_Flow.png
new file mode 100644
index 0000000..4915d06
Binary files /dev/null and b/docs/design_and_concept/img/Agent_Flow.png differ
diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/design_and_concept/how_to_write_plugin_agent.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/design_and_concept/how_to_write_plugin_agent.md
new file mode 100644
index 0000000..27c5a68
--- /dev/null
+++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/design_and_concept/how_to_write_plugin_agent.md
@@ -0,0 +1,206 @@
+---
+title: Agent 插件
+sidebar_position: 1
+---
+
+# 总览
+
+本文面向 InLong-Agent 插件开发人员,尝试尽可能全面地阐述开发一个 Agent 插件所经过的历程,力求消除开发者的困惑,让插件开发变得简单。
+
+## 开发之前
+
+InLong Agent 本身作为数据采集框架,采用 Job + Task 架构构建。并将数据源读取和写入抽象成为 Reader/Sink 插件,纳入到整个框架中。
+
+开发人员需要明确 Job 以及 Task 的概念:
+
+- `Job`: `Job`是 Agent 用以描述从一个源头到一个目的端的同步作业,是 Agent 数据同步的最小业务单元。比如:读取一个文件目录下的所有文件
+- `Task`: `Task`是把`Job`拆分得到的最小执行单元。比如:文件夹下有多个文件需要被读取,那么一个 job 会被拆分成为多个 task ,每个 task 读取对应的文件
+
+一个Task包含以下各个组件:
+
+- Reader:Reader 为数据采集模块,负责采集数据源的数据,将数据发送给 channel。
+- Sink: Sink 为数据写入模块,负责不断向 channel 取数据,并将数据写入到目的端。
+- Channel:Channel 用于连接 reader 和 sink,作为两者的数据传输通道,并起到了数据的写入读取监控作用
+
+作为开发人员,实际上只需要开发特定的 Source、Reader 以及 Sink 即可,数据如果需要持久化到本地磁盘,使用持久化 Channel ,如果否则使用内存 Channel
+
+## 流程图示
+
+上述介绍的 Job \ Task \ Reader \ Sink \ Channel 概念可以用下图表示:
+![](img/Agent_Flow.png)
+
+1. 用户提交 Job(通过 manager 或者通过 curl 方式提交),Job 中定义了需要使用的 Source, Channel, Sink(通过类的全限定名定义)
+2. 框架启动 Job,通过反射机制创建出 Source
+3. 框架启动 Source,并调用 Source 的 Split 接口,生成一个或者多个 Task
+4. 生成一个 Task 时,同时生成 Reader(一种类型的 Source 会生成对应的 reader),用户配置的 Channel 以及用户配置的 Sink
+5. Task 开始执行,Reader 开始读取数据到 Channel,Sink 从 Channel 中取数进行发送
+6. Job 和 Task 执行时所需要的所有信息都封装在 JobProfile 中
+
+
+## 参考 Demo
+
+请开发人员通过阅读 Agent 源码中的 Job 类、Task 类、TextFileSource 类、TextFileReader 类、以及 ProxySink 类来弄懂上述流程
+
+## 开发流程
+
+1、首先开发 Source , 实现 Split 逻辑,返回 Reader 列表
+2、开发对应的 Reader ,实现读取数据并写入到 Channel 的逻辑
+3、开发对应的 Sink , 实现从 Channel 中取数并写入到指定 Sink 中的逻辑
+
+## 编程必知接口
+
+下面将介绍开发一款插件需要知道的类与接口,如下:
+
+### Reader
+```java
+private class ReaderImpl implements Reader {
+
+    private int count = 0;
+
+    @Override
+    public Message read() {
+        count += 1;
+        return new DefaultMessage("".getBytes(StandardCharsets.UTF_8));
+    }
+
+    @Override
+    public boolean isFinished() {
+        return count > 99999;
+    }
+
+    @Override
+    public String getReadSource() {
+        return null;
+    }
+
+    @Override
+    public void setReadTimeout(long mill) {
+
+    }
+}
+```
+
+`Reader`接口功能如下:
+- `read`: 被单个 Task 调用,调用后返回读取的一条消息,Agent 内部的消息使用 Message 封装
+- `isFinished`: 判断是否读取完成,举例:如果是 SQL 任务,则判断是否读取完了 ResultSet 中的所有内容,如果是文件任务,则判断超过用户设置的等待时间后是否还有数据写入
+- `getReadSource`: 获取采集源,举例:如果是文件任务,则返回当前读取的文件名
+- `setReadTimeout`: 设置读取超时时间
+
+### Sink
+
+```java
+public interface Sink extends Stage {
+
+    /**
+     * Write data into data center
+     *
+     * @param message - message
+     */
+    void write(Message message);
+
+    /**
+     * set source file name where the message is generated
+     * @param sourceName
+     */
+    void setSourceName(String sourceName);
+
+    /**
+     * every sink should include a message filter to filter out stream id
+     */
+    MessageFilter initMessageFilter(JobProfile jobConf);
+}
+
+```
+
+`Sink`接口功能如下:
+- `write`: 被单个 Task 调用,从 Task 中的 Channel 读取一条消息,并写入到特定的存储介质中,以 PulsarSink 为例,则需要通过 PulsarSender 发送到 Pulsar
+- `setSourceName`: 设置数据源名称,如果是文件,则是文件名
+- `initMessageFilter`: 初始化 MessageFilter , 用户可以在Job配置文件中通过设置 agent.message.filter.classname 来创建一个消息过滤器来过滤每一条消息,详情可以参考 MessageFilter 接口
+
+
+### Source
+
+```java
+/**
+ * Source can be split into multiple reader.
+ */
+public interface Source {
+
+    /**
+     * Split source into a list of readers.
+     *
+     * @param conf job conf
+     * @return - list of reader
+     */
+    List<Reader> split(JobProfile conf);
+}
+
+```
+
+`Source`接口功能如下:
+- `split`: 被单个 Job 调用,产生多个 Reader,举例:一个读取文件任务,匹配文件夹内的多个文件,在 job 启动时,会指定 TextFileSource 作为 Source 入口,
+  调用 split 函数后,TextFileSource 会检测用户设置的文件夹内有多少符合路径匹配表达式的路径,并生成 TextFileReader 读取
+  
+
+## 任务配置
+
+代码写好了,有没有想过框架是怎么找到插件的入口类的?框架是如何加载插件的呢?
+
+在提交任务时,会发现任务中定义了插件的相关信息,包括入口类。例如:
+
+```json
+{
+"job": {
+"name": "fileAgentTest",
+"source": "org.apache.inlong.agent.plugin.sources.TextFileSource",
+"sink": "org.apache.inlong.agent.plugin.sinks.ProxySink",
+"channel": "org.apache.inlong.agent.plugin.channel.MemoryChannel"
+}
+}
+```
+
+- `source`: Source 类的全限定名称,框架通过反射插件入口类的实例。
+- `sink`: Sink 类的全限定名称,框架通过反射插件入口类的实例。
+- `channel`: 使用的 Channel 类名,框架通过反射插件入口类的实例。
+
+## Message
+
+跟一般的`生产者-消费者`模式一样,`Reader`插件和`Sink`插件之间也是通过`channel`来实现数据的传输的。
+`channel`可以是内存的,也可能是持久化的,插件不必关心。插件通过`RecordSender`往`channel`写入数据,通过`RecordReceiver`从`channel`读取数据。
+
+`channel`中的一条数据为一个`Message`的对象,`Message`中包含一个字节数组以及一个Map表示的属性数据
+
+`Message`有如下方法:
+
+```java
+public interface Message {
+
+    /**
+     * Data content of message.
+     *
+     * @return bytes body
+     */
+    byte[] getBody();
+
+    /**
+     * Data attribute of message
+     *
+     * @return map header
+     */
+    Map<String, String> getHeader();
+}
+```
+
+开发人员可以根据该接口拓展定制化的 Message ,比如 ProxyMessage 中,就包含了 InLongGroupId, InLongStreamId 等属性
+
+## Last but not Least
+
+新增插件都必须在`InLong`官方wiki中有一篇文档,文档需要包括但不限于以下内容:
+
+1. **快速介绍**:介绍插件的使用场景,特点等。
+2. **实现原理**:介绍插件实现的底层原理,比如`sqlReader`通过执行Sql查询来读取数据库中的数据
+3. **配置说明**
+    - 给出典型场景下的同步任务的json配置文件。
+    - 介绍每个参数的含义、是否必选、默认值、取值范围和其他约束。
+4. **约束限制**:是否存在其他的使用限制条件。
+5. **FAQ**:用户经常会遇到的问题。
diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/design_and_concept/img/Agent_Flow.png b/i18n/zh-CN/docusaurus-plugin-content-docs/current/design_and_concept/img/Agent_Flow.png
new file mode 100644
index 0000000..4915d06
Binary files /dev/null and b/i18n/zh-CN/docusaurus-plugin-content-docs/current/design_and_concept/img/Agent_Flow.png differ