You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/09/17 14:02:47 UTC

[pulsar] branch master updated: [Doc] Update *Develop Connectors Guide* (#5181)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new f3d37d0  [Doc] Update *Develop Connectors Guide* (#5181)
f3d37d0 is described below

commit f3d37d0bf9f01011e0e1ccc306e61c8202da2073
Author: Anonymitaet <50...@users.noreply.github.com>
AuthorDate: Tue Sep 17 22:02:41 2019 +0800

    [Doc] Update *Develop Connectors Guide* (#5181)
---
 site2/docs/io-develop.md | 278 ++++++++++++++++++++++++++---------------------
 1 file changed, 156 insertions(+), 122 deletions(-)

diff --git a/site2/docs/io-develop.md b/site2/docs/io-develop.md
index ff15e26..27aa677 100644
--- a/site2/docs/io-develop.md
+++ b/site2/docs/io-develop.md
@@ -1,156 +1,188 @@
 ---
 id: io-develop
-title: Develop Connectors
-sidebar_label: Developing Connectors
+title: How to develop Pulsar connectors
+sidebar_label: Develop
 ---
 
-This guide describes how developers can write new connectors for Pulsar IO to move data
-between Pulsar and other systems. It describes how to create a Pulsar IO connector.
+This guide describes how to develop Pulsar connectors to move data
+between Pulsar and other systems. 
 
-Pulsar IO connectors are specialized [Pulsar Functions](functions-overview.md). So writing
-a Pulsar IO connector is as simple as writing a Pulsar function. Pulsar IO connectors come
-in two flavors: {@inject: github:`Source`:/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Source.java},
-which import data from another system, and {@inject: github:`Sink`:/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Sink.java},
-which export data to another system. For example, [KinesisSink](io-kinesis.md) would export
-the messages of a Pulsar topic to a Kinesis stream, and [RabbitmqSource](io-rabbitmq.md) would import
-the messages of a RabbitMQ queue to a Pulsar topic.
+Pulsar connectors are special [Pulsar Functions](functions-overview.md), so creating
+a Pulsar connector is similar to creating a Pulsar function. 
 
-### Developing
+Pulsar connectors come in two types: 
 
-#### Develop a source connector
+| Type | Description | Example
+|---|---|---
+{@inject: github:`Source`:/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Source.java}|Import data from another system to Pulsar.|[RabbitMQ source connector](io-rabbitmq.md) imports the messages of a RabbitMQ queue to a Pulsar topic.
+{@inject: github:`Sink`:/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Sink.java}|Export data from Pulsar to another system.|[Kinesis sink connector](io-kinesis.md) exports the messages of a Pulsar topic to a Kinesis stream.
 
-What you need to develop a source connector is to implement {@inject: github:`Source`:/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Source.java}
-interface.
+## Develop
 
-First, you need to implement the {@inject: github:`open`:/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Source.java#L33} method. This method will be called once when the source connector
-is initialized. In this method, you can retrieve all the connector specific settings through
-the passed `config` parameter, and initialize all the necessary resourcess. For example, a Kafka
-connector can create the Kafka client in this `open` method.
+You can develop Pulsar source connectors and sink connectors.
 
-Beside the passed-in `config` object, the Pulsar runtime also provides a `SourceContext` for the
-connector to access runtime resources for tasks like collecting metrics. The implementation can
-save the `SourceContext` for futher usage.
+### Source
 
-```java
+Developing a source connector is to implement the {@inject: github:`Source`:/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Source.java}
+interface, which means you need to implement the {@inject: github:`open`:/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Source.java#L33} method and the {@inject: github:`record`:/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Record.java#L28} method.
+
+1. Implement the {@inject: github:`open`:/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Source.java#L33} method. 
+
+    ```java
     /**
-     * Open connector with configuration
-     *
-     * @param config initialization config
-     * @param sourceContext
-     * @throws Exception IO type exceptions when opening a connector
-     */
+    * Open connector with configuration
+    *
+    * @param config initialization config
+    * @param sourceContext
+    * @throws Exception IO type exceptions when opening a connector
+    */
     void open(final Map<String, Object> config, SourceContext sourceContext) throws Exception;
-```
+    ```
 
-The main task for a Source implementor is to implement {@inject: github:`read`:/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Source.java#L41}
-method.
+    This method is called when the source connector is initialized. 
 
-```java
-    /**
-     * Reads the next message from source.
-     * If source does not have any new messages, this call should block.
-     * @return next message from source.  The return result should never be null
-     * @throws Exception
-     */
-    Record<T> read() throws Exception;
-```
+    In this method, you can retrieve all connector specific settings through the passed-in `config` parameter and initialize all necessary resources. 
+    
+    For example, a Kafka connector can create a Kafka client in this `open` method.
 
-The implementation should be blocking on this method if nothing to return. It should never return
-`null`. The returned {@inject: github:`Record`:/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Record.java#L28} should encapsulates the information that is needed by
-Pulsar IO runtime.
+    Besides, Pulsar runtime also provides a `SourceContext` for the 
+    connector to access runtime resources for tasks like collecting metrics. The implementation can save the `SourceContext` for future use.
 
-These information includes:
+2. Implement the {@inject: github:`read`:/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Source.java#L41} method.
 
-- *Topic Name*: _Optional_. If the record is originated from a Pulsar topic, it should be the Pulsar topic name.
-- *Key*: _Optional_. If the record has a key associated with it.
-- *Value*: _Required_. The actual data of this record.
-- *Partition Id*: _Optional_. If the record is originated from a partitioned source,
-  return its partition id. The partition id will be used as part of the unique identifier
-  by Pulsar IO runtime to do message deduplication and achieve exactly-once processing guarantee.
-- *Record Sequence*: _Optional_. If the record is originated from a sequential source,
-  return its record sequence. The record sequence will be used as part of the unique identifier
-  by Pulsar IO runtime to do message deduplication and achieve exactly-once processing guarantee.
-- *Properties*: _Optional_. If the record carries user-defined properties, return those properties.
+    ```java
+        /**
+        * Reads the next message from source.
+        * If source does not have any new messages, this call should block.
+        * @return next message from source.  The return result should never be null
+        * @throws Exception
+        */
+        Record<T> read() throws Exception;
+    ```
 
-Additionally, the implemention of the record should provide two methods: `ack` and `fail`. These
-two methods will be used by Pulsar IO connector to acknowledge the records that it has done
-processing and fail the records that it has failed to process.
+    If nothing to return, the implementation should be blocking rather than returning `null`. 
 
-{@inject: github:`KafkaSource`:/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java} is a good example to follow.
+    The returned {@inject: github:`Record`:/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Record.java#L28} should encapsulate the following information, which is needed by Pulsar IO runtime. 
 
-#### Develop a sink connector
+    * {@inject: github:`Record`:/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Record.java#L28} should provide the following variables:
 
-Developing a sink connector is as easy as developing a source connector. You just need to
-implement {@inject: github:`Sink`:/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Sink.java} interface.
+      |Variable|Required|Description
+      |---|---|---
+      `TopicName`|No|Pulsar topic name from which the record is originated from.
+      `Key`|No| Messages can optionally be tagged with keys.<br/><br/>For more information, see [Routing modes](concepts-messaging.md#routing-modes).|
+      `Value`|Yes|Actual data of the record.
+      `EventTime`|No|Event time of the record from the source.
+      `PartitionId`|No| If the record is originated from a partitioned source, it returns its `PartitionId`. <br/><br/>`PartitionId` is used as a part of the unique identifier by Pulsar IO runtime to deduplicate messages and achieve exactly-once processing guarantee.
+      `RecordSequence`|No|If the record is originated from a sequential source, it returns its `RecordSequence`.<br/><br/>`RecordSequence` is used as a part of the unique identifier by Pulsar IO runtime to deduplicate messages and achieve exactly-once processing guarantee.
+      `Properties` |No| If the record carries user-defined properties, it returns those properties.
+      `DestinationTopic`|No|Topic to which message should be written.
+      `Message`|No|A class which carries data sent by users.<br/><br/>For more information, see [Message.java](https://github.com/apache/pulsar/blob/master/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Message.java).|
 
-Similarly, you first need to implement the {@inject: github:`open`:/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Sink.java#L36} method to initialize all the necessary resources
-before implementing the {@inject: github:`write`:/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Sink.java#L44} method.
+     * {@inject: github:`Record`:/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Record.java#L28} should provide the following methods:
 
-```java
-    /**
-     * Open connector with configuration
-     *
-     * @param config initialization config
-     * @param sinkContext
-     * @throws Exception IO type exceptions when opening a connector
-     */
-    void open(final Map<String, Object> config, SinkContext sinkContext) throws Exception;
-```
+        Method|Description
+        |---|---
+        `ack` |Acknowledge that the record is fully processed.
+        `fail`|Indicate that the record fails to be processed.
 
-The main task for a Sink implementor is to implement {@inject: github:`write`:/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Sink.java#L44} method.
+> #### Tip
+>
+> For more information about **how to create a source connector**, see {@inject: github:`KafkaSource`:/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java}.
 
-```java
-    /**
-     * Write a message to Sink
-     * @param inputRecordContext Context of input record from the source
-     * @param record record to write to sink
-     * @throws Exception
-     */
-    void write(Record<T> record) throws Exception;
-```
+### Sink
+
+Developing a sink connector **is similar to** developing a source connector, that is, you need to implement the {@inject: github:`Sink`:/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Sink.java} interface, which means implementing the {@inject: github:`open`:/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Sink.java#L36} method and the {@inject: github:`write`:/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Sink.java#L44} method.
+
+1. Implement the {@inject: github:`open`:/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Sink.java#L36} method.
+
+    ```java
+        /**
+        * Open connector with configuration
+        *
+        * @param config initialization config
+        * @param sinkContext
+        * @throws Exception IO type exceptions when opening a connector
+        */
+        void open(final Map<String, Object> config, SinkContext sinkContext) throws Exception;
+    ```
+
+2. Implement the {@inject: github:`write`:/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Sink.java#L44} method.
+
+    ```java
+        /**
+        * Write a message to Sink
+        * @param inputRecordContext Context of input record from the source
+        * @param record record to write to sink
+        * @throws Exception
+        */
+        void write(Record<T> record) throws Exception;
+    ```
+
+    During the implementation, you can decide how to write the `Value` and
+    the `Key` to the actual source, and leverage all the provided information such as
+    `PartitionId` and `RecordSequence` to achieve different processing guarantees. 
 
-In the implemention of `write` method, the implementor can decide how to write the value and
-the optional key to the actual source, and leverage all the provided information such as
-`Partition Id`, `Record Sequence` for achieving different processing guarantees. The implementor
-is also responsible for acknowledging records if it has successfully written them or failing
-records if has failed to write them.
+    You also need to ack records (if messages are sent successfully) or fail records (if messages fail to send). 
 
-### Testing
+## Test
 
 Testing connectors can be challenging because Pulsar IO connectors interact with two systems
-that may be difficult to mock - Pulsar and the system the connector is connecting to. It is
-recommended to write very specificially test the functionalities of the connector classes
-while mocking the external services.
-
-Once you have written sufficient unit tests for your connector, we also recommend adding
-separate integration tests to verify end-to-end functionality. In Pulsar, we are using
-[testcontainers](https://www.testcontainers.org/) for all Pulsar integration tests. Pulsar IO
-{@inject: github:`IntegrationTests`:/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io} are good examples to follow on integration testing your connectors.
-
-### Packaging
-
-Once you've developed and tested your connector, you must package it so that it can be submitted
-to a [Pulsar Functions](functions-overview.md) cluster. There are two approaches described
-here work with Pulsar Functions' runtime.
-
-If you plan to package and distribute your connector for others to use, you are obligated to
-properly license and copyright your own code and to adhere to the licensing and copyrights of
-all libraries your code uses and that you include in your distribution. If you are using the
-approach described in ["Creating a NAR package"](#creating-a-nar-package), the NAR plugin will
-automatically create a `DEPENDENCIES` file in the generated NAR package, including the proper
+that may be difficult to mock—Pulsar and the system to which the connector is connecting. 
+
+It is
+recommended writing special tests to test the connector functionalities as below
+while mocking the external service. 
+
+### Unit test
+
+You can create unit tests for your connector.
+
+### Integration test
+
+Once you have written sufficient unit tests, you can add
+separate integration tests to verify end-to-end functionality. 
+
+Pulsar uses
+[testcontainers](https://www.testcontainers.org/) **for all integration tests**. 
+
+> #### Tip
+>
+>For more information about **how to create integration tests for Pulsar connectors**, see {@inject: github:`IntegrationTests`:/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io}.
+
+## Package
+
+Once you've developed and tested your connector, you need to package it so that it can be submitted
+to a [Pulsar Functions](functions-overview.md) cluster. 
+
+There are two methods to
+work with Pulsar Functions' runtime, that is, [NAR](#nar) and [uber JAR](#uber-jar).
+
+> #### Note
+> 
+> If you plan to package and distribute your connector for others to use, you are obligated to
+license and copyright your own code properly. Remember to add the license and copyright to
+all libraries your code uses and to your distribution. 
+>
+> If you use the [NAR](#nar) method, the NAR plugin 
+automatically creates a `DEPENDENCIES` file in the generated NAR package, including the proper
 licensing and copyrights of all libraries of your connector.
 
-#### Creating a NAR package
+### NAR 
 
-The easiest approach to packaging a Pulsar IO connector is to create a NAR package using
-[nifi-nar-maven-plugin](https://mvnrepository.com/artifact/org.apache.nifi/nifi-nar-maven-plugin).
+**NAR** stands for NiFi Archive, which is a custom packaging mechanism used by Apache NiFi, to provide
+a bit of Java ClassLoader isolation. 
 
-NAR stands for NiFi Archive. It is a custom packaging mechanism used by Apache NiFi, to provide
-a bit of Java ClassLoader isolation. For more details, you can read this
-[blog post](https://medium.com/hashmapinc/nifi-nar-files-explained-14113f7796fd) to understand
-how NAR works. Pulsar uses the same mechanism for packaging all the [builtin connectors](io-connectors).
+> #### Tip
+> 
+> For more information about **how NAR works**, see
+> [here](https://medium.com/hashmapinc/nifi-nar-files-explained-14113f7796fd). 
+
+Pulsar uses the same mechanism for packaging **all** [built-in connectors](io-connectors). 
+
+The easiest approach to package a Pulsar connector is to create a NAR package using
+[nifi-nar-maven-plugin](https://mvnrepository.com/artifact/org.apache.nifi/nifi-nar-maven-plugin).
 
-All what you need is to include this [nifi-nar-maven-plugin](https://mvnrepository.com/artifact/org.apache.nifi/nifi-nar-maven-plugin) in your maven project for your connector. For example:
+All you need to do is to include this [nifi-nar-maven-plugin](https://mvnrepository.com/artifact/org.apache.nifi/nifi-nar-maven-plugin) in your maven project for your connector as below. 
 
 ```xml
 <plugins>
@@ -162,14 +194,16 @@ All what you need is to include this [nifi-nar-maven-plugin](https://mvnreposito
 </plugins>
 ```
 
-The {@inject: github:`TwitterFirehose`:/pulsar-io/twitter} connector is a good example to follow.
+> #### Tip
+> 
+> For more information about an **how to use NAR for Pulsar connectors**, see {@inject: github:`TwitterFirehose`:/pulsar-io/twitter/pom.xml#L79}.
 
-#### Creating an Uber JAR
+### Uber JAR
 
-An alternative approach is to create an _uber JAR_ that contains all of the connector's JAR files
+An alternative approach is to create an **uber JAR** that contains all of the connector's JAR files
 and other resource files. No directory internal structure is necessary.
 
-You can use [maven-shade-plugin](https://maven.apache.org/plugins/maven-shade-plugin/examples/includes-excludes.html) to create a Uber JAR. For example:
+You can use [maven-shade-plugin](https://maven.apache.org/plugins/maven-shade-plugin/examples/includes-excludes.html) to create a uber JAR as below:
 
 ```xml
 <plugin>