Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/09/23 08:06:52 UTC

[GitHub] [beam] Amar3tto opened a new pull request, #23338: [CdapIO] Add CdapIO and SparkReceiverIO to the connectors table in documentation

Amar3tto opened a new pull request, #23338:
URL: https://github.com/apache/beam/pull/23338

   **Please** add a meaningful description for your change here
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
    - [ ] Mention the appropriate issue in your description (for example: `addresses #123`), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment `fixes #<ISSUE NUMBER>` instead.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make the review process smoother](https://beam.apache.org/contribute/get-started-contributing/#make-the-reviewers-job-easier).
   
   To check the build health, please visit [https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md](https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md)
   
   GitHub Actions Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   [![Build python source distribution and wheels](https://github.com/apache/beam/workflows/Build%20python%20source%20distribution%20and%20wheels/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Build+python+source+distribution+and+wheels%22+branch%3Amaster+event%3Aschedule)
   [![Python tests](https://github.com/apache/beam/workflows/Python%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Python+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Java tests](https://github.com/apache/beam/workflows/Java%20Tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Java+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Go tests](https://github.com/apache/beam/workflows/Go%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Go+tests%22+branch%3Amaster+event%3Aschedule)
   
   See [CI.md](https://github.com/apache/beam/blob/master/CI.md) for more information about GitHub Actions CI.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] aromanenko-dev merged pull request #23338: [CdapIO] Add CdapIO and SparkReceiverIO documentation in website

Posted by GitBox <gi...@apache.org>.
aromanenko-dev merged PR #23338:
URL: https://github.com/apache/beam/pull/23338




[GitHub] [beam] github-actions[bot] commented on pull request #23338: [CdapIO] Add CdapIO and SparkReceiverIO documentation in website

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #23338:
URL: https://github.com/apache/beam/pull/23338#issuecomment-1332467994

   Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control




[GitHub] [beam] Amar3tto commented on a diff in pull request #23338: [CdapIO] Add CdapIO and SparkReceiverIO documentation in website

Posted by GitBox <gi...@apache.org>.
Amar3tto commented on code in PR #23338:
URL: https://github.com/apache/beam/pull/23338#discussion_r1041036827


##########
website/www/site/content/en/documentation/io/built-in/cdap.md:
##########
@@ -0,0 +1,392 @@
+---
+title: "Cdap IO"
+---
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# Cdap IO
+
+`CdapIO` is a transform for reading data from a source CDAP plugin or writing data to a sink CDAP plugin.
+
+## Batch plugins support
+
+`CdapIO` currently supports the following CDAP Batch plugins by referencing the CDAP plugin class name:
+- [Hubspot Batch Source](https://github.com/data-integrations/hubspot/blob/develop/src/main/java/io/cdap/plugin/hubspot/source/batch/HubspotBatchSource.java)
+- [Hubspot Batch Sink](https://github.com/data-integrations/hubspot/blob/develop/src/main/java/io/cdap/plugin/hubspot/sink/batch/HubspotBatchSink.java)
+- [Salesforce Batch Source](https://github.com/data-integrations/salesforce/blob/develop/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceBatchSource.java)
+- [Salesforce Batch Sink](https://github.com/data-integrations/salesforce/blob/develop/src/main/java/io/cdap/plugin/salesforce/plugin/sink/batch/SalesforceBatchSink.java)
+- [ServiceNow Batch Source](https://github.com/data-integrations/servicenow-plugins/blob/develop/src/main/java/io/cdap/plugin/servicenow/source/ServiceNowSource.java)
+- [Zendesk Batch Source](https://github.com/data-integrations/zendesk/blob/develop/src/main/java/io/cdap/plugin/zendesk/source/batch/ZendeskBatchSource.java)
+
+Also, any other CDAP Batch plugin based on Hadoop's `InputFormat` or `OutputFormat` can be used. Such plugins can easily be added to the list of plugins supported by class name; for more details, please see the [CdapIO readme](https://github.com/apache/beam/blob/master/sdks/java/io/cdap/README.md).
+
+## Streaming plugins support
+
+`CdapIO` currently supports CDAP Streaming plugins based on [Apache Spark Receiver](https://spark.apache.org/docs/2.4.0/streaming-custom-receivers.html).
+
+Requirements for CDAP Streaming plugins:
+- The CDAP Streaming plugin should be based on a `Spark Receiver` (Spark 2.4).
+- The CDAP Streaming plugin should support working with offsets.
+- The corresponding Spark Receiver should implement the [HasOffset](https://github.com/apache/beam/blob/master/sdks/java/io/sparkreceiver/src/main/java/org/apache/beam/sdk/io/sparkreceiver/HasOffset.java) interface.
+- Records should have a numeric field that represents the record offset.
+
+## Batch reading using CdapIO
+
+In order to read from a CDAP plugin, you will need to pass:
+- `Key` and `Value` classes. You will need to check if these classes have a Beam Coder available.
+- A `PluginConfig` object with parameters for the specific CDAP plugin.
+
+You can easily build a `PluginConfig` object using the `ConfigWrapper` class by specifying:
+
+- Class of the needed `PluginConfig`.
+- `Map<String, Object>` parameters map for the corresponding CDAP plugin.
+
+For example:
+
+{{< highlight java >}}
+MyPluginConfig pluginConfig =
+new ConfigWrapper<>(MyPluginConfig.class).withParams(pluginConfigParams).build();

Review Comment:
   Fixed
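
For context, the snippet above leaves `pluginConfigParams` undefined (a reviewer asks about it further down in this thread). It is the `Map<String, Object>` mentioned earlier; a minimal sketch with hypothetical keys (the real parameter names are dictated by the plugin's `PluginConfig` class) could look like:

    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical parameter names; check the target plugin's PluginConfig
    // class for the keys it actually requires.
    Map<String, Object> pluginConfigParams = new HashMap<>();
    pluginConfigParams.put("referenceName", "myPluginSource");
    pluginConfigParams.put("apiKey", "my-api-key");

    MyPluginConfig pluginConfig =
        new ConfigWrapper<>(MyPluginConfig.class).withParams(pluginConfigParams).build();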





[GitHub] [beam] Amar3tto commented on a diff in pull request #23338: [CdapIO] Add CdapIO and SparkReceiverIO documentation in website

Posted by GitBox <gi...@apache.org>.
Amar3tto commented on code in PR #23338:
URL: https://github.com/apache/beam/pull/23338#discussion_r1041036544


##########
website/www/site/content/en/documentation/io/built-in/sparkreceiver.md:
##########
@@ -0,0 +1,101 @@
+---
+title: "SparkReceiver IO"
+---
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# SparkReceiver IO
+
+SparkReceiverIO is a transform for reading data from an Apache Spark Receiver as an unbounded source.
+
+## Spark Receivers support
+
+`SparkReceiverIO` currently supports [Apache Spark Receiver](https://spark.apache.org/docs/2.4.0/streaming-custom-receivers.html).
+
+Requirements for `Spark Receiver`:
+- Version of Spark should be 2.4.
+- `Spark Receiver` should support working with offsets.
+- `Spark Receiver` should implement the [HasOffset](https://github.com/apache/beam/blob/master/sdks/java/io/sparkreceiver/src/main/java/org/apache/beam/sdk/io/sparkreceiver/HasOffset.java) interface.
+- Records should have a numeric field that represents the record offset.
+
+For more details, please see the [SparkReceiverIO readme](https://github.com/apache/beam/blob/master/sdks/java/io/sparkreceiver/2/README.md).
+
+## Streaming reading using SparkReceiverIO
+
+In order to read from a `Spark Receiver`, you will need to pass:
+
+- `getOffsetFn`, which is a `SerializableFunction` that defines how to get a `Long` record offset from a record.
+- `receiverBuilder`, which is needed for building instances of `Spark Receiver` that use Apache Beam mechanisms instead of the Spark environment.
+
+You can easily create a `receiverBuilder` object by passing the following parameters:
+
+- Class of your `Spark Receiver`.
+- Constructor arguments needed to create an instance of your `Spark Receiver`.
+
+For example:
+
+{{< highlight java >}}
+Object[] myConstructorArgs = new Object [] {myConfiguration};

Review Comment:
   Fixed
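
To make the snippet above concrete, a sketch of how the builder and the read transform could fit together, assuming a custom receiver class `MyReceiver` whose records are `String`s with the offset in the first CSV field (`myConfiguration`, `MyReceiver`, and the offset format are hypothetical):

    import org.apache.beam.sdk.io.sparkreceiver.ReceiverBuilder;
    import org.apache.beam.sdk.io.sparkreceiver.SparkReceiverIO;

    // Build receiver instances reflectively from the class and constructor args.
    Object[] myConstructorArgs = new Object[] {myConfiguration};
    ReceiverBuilder<String, MyReceiver> receiverBuilder =
        new ReceiverBuilder<>(MyReceiver.class).withConstructorArgs(myConstructorArgs);

    // Read as an unbounded source; getOffsetFn extracts the numeric offset field.
    SparkReceiverIO.Read<String> readTransform =
        SparkReceiverIO.<String>read()
            .withGetOffsetFn(record -> Long.parseLong(record.split(",")[0]))
            .withSparkReceiverBuilder(receiverBuilder);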



##########
website/www/site/content/en/documentation/io/built-in/sparkreceiver.md:
##########
@@ -0,0 +1,101 @@
+---
+title: "SparkReceiver IO"
+---
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# SparkReceiver IO
+
+SparkReceiverIO is a transform for reading data from an Apache Spark Receiver as an unbounded source.
+
+## Spark Receivers support
+
+`SparkReceiverIO` currently supports [Apache Spark Receiver](https://spark.apache.org/docs/2.4.0/streaming-custom-receivers.html).
+
+Requirements for `Spark Receiver`:
+- Version of Spark should be 2.4.

Review Comment:
   Fixed
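
As background for the `HasOffset` requirement quoted earlier in this thread, a conforming receiver could be shaped roughly like this (a sketch only; `MyConfig` and the polling logic are hypothetical):

    import org.apache.beam.sdk.io.sparkreceiver.HasOffset;
    import org.apache.spark.storage.StorageLevel;
    import org.apache.spark.streaming.receiver.Receiver;

    // A custom Spark receiver that also implements Beam's HasOffset, so that
    // SparkReceiverIO can tell it where to start reading from.
    public class MyReceiver extends Receiver<String> implements HasOffset {
      private Long startOffset = 0L;

      public MyReceiver(MyConfig config) {  // MyConfig is a hypothetical config type
        super(StorageLevel.MEMORY_AND_DISK_2());
      }

      @Override public void setStartOffset(Long startOffset) {
        this.startOffset = startOffset;
      }

      @Override public Long getEndOffset() {
        return Long.MAX_VALUE;  // effectively unbounded
      }

      @Override public void onStart() {
        // Poll the underlying source beginning at startOffset and hand each
        // record to store(...) here.
      }

      @Override public void onStop() {}
    }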





[GitHub] [beam] Amar3tto commented on a diff in pull request #23338: [CdapIO] Add CdapIO and SparkReceiverIO documentation in website

Posted by GitBox <gi...@apache.org>.
Amar3tto commented on code in PR #23338:
URL: https://github.com/apache/beam/pull/23338#discussion_r1041037660


##########
website/www/site/content/en/documentation/io/connectors.md:
##########
@@ -775,6 +775,32 @@ This table provides a consolidated, at-a-glance overview of the available built-
     <td class="present">✔</td>
     <td class="present">✔</td>
   </tr>
+  <tr>
+    <td>SparkReceiverIO</td>
+    <td class="present">✔</td>
+    <td class="present">✘</td>
+    <td class="present">
+      ✔
+      native
+    </td>
+    <td>Not available</td>
+    <td>Not available</td>
+    <td class="present">✘</td>
+    <td class="present">✔</td>
+  </tr>
+  <tr>
+    <td>CdapIO</td>

Review Comment:
   Done



##########
website/www/site/content/en/documentation/io/connectors.md:
##########
@@ -775,6 +775,32 @@ This table provides a consolidated, at-a-glance overview of the available built-
     <td class="present">✔</td>
     <td class="present">✔</td>
   </tr>
+  <tr>
+    <td>SparkReceiverIO</td>

Review Comment:
   Done





[GitHub] [beam] aromanenko-dev commented on a diff in pull request #23338: [CdapIO] Add CdapIO and SparkReceiverIO documentation in website

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on code in PR #23338:
URL: https://github.com/apache/beam/pull/23338#discussion_r1038104635


##########
website/www/site/content/en/documentation/io/connectors.md:
##########
@@ -775,6 +775,32 @@ This table provides a consolidated, at-a-glance overview of the available built-
     <td class="present">✔</td>
     <td class="present">✔</td>
   </tr>
+  <tr>
+    <td>SparkReceiverIO</td>

Review Comment:
   Please, add a link to the guide page, like it's done for ParquetIO, for example.



##########
website/www/site/content/en/documentation/io/built-in/sparkreceiver.md:
##########
@@ -0,0 +1,101 @@
+---

Review Comment:
   Please, add a link to this page on `I/O connector guides` sub-menu.



##########
website/www/site/content/en/documentation/io/built-in/cdap.md:
##########
@@ -0,0 +1,392 @@
+---
+title: "Cdap IO"
+---
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# Cdap IO
+
+`CdapIO` is a transform for reading data from a source CDAP plugin or writing data to a sink CDAP plugin.
+
+## Batch plugins support
+
+`CdapIO` currently supports the following CDAP Batch plugins by referencing the CDAP plugin class name:
+- [Hubspot Batch Source](https://github.com/data-integrations/hubspot/blob/develop/src/main/java/io/cdap/plugin/hubspot/source/batch/HubspotBatchSource.java)
+- [Hubspot Batch Sink](https://github.com/data-integrations/hubspot/blob/develop/src/main/java/io/cdap/plugin/hubspot/sink/batch/HubspotBatchSink.java)
+- [Salesforce Batch Source](https://github.com/data-integrations/salesforce/blob/develop/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceBatchSource.java)
+- [Salesforce Batch Sink](https://github.com/data-integrations/salesforce/blob/develop/src/main/java/io/cdap/plugin/salesforce/plugin/sink/batch/SalesforceBatchSink.java)
+- [ServiceNow Batch Source](https://github.com/data-integrations/servicenow-plugins/blob/develop/src/main/java/io/cdap/plugin/servicenow/source/ServiceNowSource.java)
+- [Zendesk Batch Source](https://github.com/data-integrations/zendesk/blob/develop/src/main/java/io/cdap/plugin/zendesk/source/batch/ZendeskBatchSource.java)
+
+Also, any other CDAP Batch plugin based on Hadoop's `InputFormat` or `OutputFormat` can be used. Such plugins can easily be added to the list of plugins supported by class name; for more details, please see the [CdapIO readme](https://github.com/apache/beam/blob/master/sdks/java/io/cdap/README.md).
+
+## Streaming plugins support
+
+`CdapIO` currently supports CDAP Streaming plugins based on [Apache Spark Receiver](https://spark.apache.org/docs/2.4.0/streaming-custom-receivers.html).
+
+Requirements for CDAP Streaming plugins:
+- The CDAP Streaming plugin should be based on a `Spark Receiver` (Spark 2.4).
+- The CDAP Streaming plugin should support working with offsets.
+- The corresponding Spark Receiver should implement the [HasOffset](https://github.com/apache/beam/blob/master/sdks/java/io/sparkreceiver/src/main/java/org/apache/beam/sdk/io/sparkreceiver/HasOffset.java) interface.
+- Records should have a numeric field that represents the record offset.
+
+## Batch reading using CdapIO
+
+In order to read from a CDAP plugin, you will need to pass:
+- `Key` and `Value` classes. You will need to check if these classes have a Beam Coder available.
+- A `PluginConfig` object with parameters for the specific CDAP plugin.
+
+You can easily build a `PluginConfig` object using the `ConfigWrapper` class by specifying:
+
+- Class of the needed `PluginConfig`.
+- `Map<String, Object>` parameters map for the corresponding CDAP plugin.
+
+For example:
+
+{{< highlight java >}}
+MyPluginConfig pluginConfig =
+new ConfigWrapper<>(MyPluginConfig.class).withParams(pluginConfigParams).build();
+{{< /highlight >}}
+
+### Read data by plugin class name
+
+Some CDAP plugins are already supported and can be used just by plugin class name.
+
+For example:
+
+{{< highlight java >}}
+CdapIO.Read<NullWritable, JsonElement> readTransform =

Review Comment:
   Please, format all code snippets of this PR with the same padding/code style as other Beam code used to be done.
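
For reference, the Beam code style the reviewer is asking for would presumably look roughly like this for the first read snippet (continuation lines indented under the assignment):

    CdapIO.Read<NullWritable, JsonElement> readTransform =
        CdapIO.<NullWritable, JsonElement>read()
            .withCdapPluginClass(HubspotBatchSource.class)
            .withPluginConfig(pluginConfig)
            .withKeyClass(NullWritable.class)
            .withValueClass(JsonElement.class);
    p.apply("read", readTransform);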



##########
website/www/site/content/en/documentation/io/built-in/cdap.md:
##########
@@ -0,0 +1,392 @@
+---
+title: "Cdap IO"
+---
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# Cdap IO
+
+`CdapIO` is a transform for reading data from a source CDAP plugin or writing data to a sink CDAP plugin.
+
+## Batch plugins support
+
+`CdapIO` currently supports the following CDAP Batch plugins by referencing the CDAP plugin class name:
+- [Hubspot Batch Source](https://github.com/data-integrations/hubspot/blob/develop/src/main/java/io/cdap/plugin/hubspot/source/batch/HubspotBatchSource.java)
+- [Hubspot Batch Sink](https://github.com/data-integrations/hubspot/blob/develop/src/main/java/io/cdap/plugin/hubspot/sink/batch/HubspotBatchSink.java)
+- [Salesforce Batch Source](https://github.com/data-integrations/salesforce/blob/develop/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceBatchSource.java)
+- [Salesforce Batch Sink](https://github.com/data-integrations/salesforce/blob/develop/src/main/java/io/cdap/plugin/salesforce/plugin/sink/batch/SalesforceBatchSink.java)
+- [ServiceNow Batch Source](https://github.com/data-integrations/servicenow-plugins/blob/develop/src/main/java/io/cdap/plugin/servicenow/source/ServiceNowSource.java)
+- [Zendesk Batch Source](https://github.com/data-integrations/zendesk/blob/develop/src/main/java/io/cdap/plugin/zendesk/source/batch/ZendeskBatchSource.java)
+
+Also, any other CDAP Batch plugin based on Hadoop's `InputFormat` or `OutputFormat` can be used. Such plugins can easily be added to the list of plugins supported by class name; for more details, please see the [CdapIO readme](https://github.com/apache/beam/blob/master/sdks/java/io/cdap/README.md).
+
+## Streaming plugins support
+
+`CdapIO` currently supports CDAP Streaming plugins based on [Apache Spark Receiver](https://spark.apache.org/docs/2.4.0/streaming-custom-receivers.html).
+
+Requirements for CDAP Streaming plugins:
+- The CDAP Streaming plugin should be based on a `Spark Receiver` (Spark 2.4).
+- The CDAP Streaming plugin should support working with offsets.
+- The corresponding Spark Receiver should implement the [HasOffset](https://github.com/apache/beam/blob/master/sdks/java/io/sparkreceiver/src/main/java/org/apache/beam/sdk/io/sparkreceiver/HasOffset.java) interface.
+- Records should have a numeric field that represents the record offset.
+
+## Batch reading using CdapIO
+
+In order to read from a CDAP plugin, you will need to pass:
+- `Key` and `Value` classes. You will need to check if these classes have a Beam Coder available.
+- A `PluginConfig` object with parameters for the specific CDAP plugin.
+
+You can easily build a `PluginConfig` object using the `ConfigWrapper` class by specifying:
+
+- Class of the needed `PluginConfig`.
+- `Map<String, Object>` parameters map for the corresponding CDAP plugin.
+
+For example:
+
+{{< highlight java >}}
+MyPluginConfig pluginConfig =
+new ConfigWrapper<>(MyPluginConfig.class).withParams(pluginConfigParams).build();
+{{< /highlight >}}
+
+### Read data by plugin class name
+
+Some CDAP plugins are already supported and can be used just by plugin class name.
+
+For example:
+
+{{< highlight java >}}
+CdapIO.Read<NullWritable, JsonElement> readTransform =
+CdapIO.<NullWritable, JsonElement>read()
+    .withCdapPluginClass(HubspotBatchSource.class)
+    .withPluginConfig(pluginConfig)
+    .withKeyClass(NullWritable.class)
+    .withValueClass(JsonElement.class);
+p.apply("read", readTransform);
+{{< /highlight >}}
+
+### Read data by building a Batch Plugin
+
+If a CDAP plugin is not supported by plugin class name, you can easily build a `Plugin` object by passing the following parameters:
+
+- Class of CDAP Batch plugin.
+- The `InputFormat` class used to connect to your CDAP plugin of choice.
+- The `InputFormatProvider` class used to provide `InputFormat`.
+
+Then you will be able to pass this `Plugin` object to `CdapIO`.
+
+For example:
+
+{{< highlight java >}}
+CdapIO.Read<String, String> readTransform =
+CdapIO.<String, String>read()
+    .withCdapPlugin(
+        Plugin.createBatch(
+            MyCdapPlugin.class,
+            MyInputFormat.class,
+            MyInputFormatProvider.class))
+    .withPluginConfig(pluginConfig)
+    .withKeyClass(String.class)
+    .withValueClass(String.class);
+p.apply("read", readTransform);
+{{< /highlight >}}
+
+### Examples for specific CDAP plugins
+
+#### CDAP Hubspot Batch Source plugin
+
+{{< highlight java >}}
+SourceHubspotConfig pluginConfig =
+    new ConfigWrapper<>(SourceHubspotConfig.class).withParams(pluginConfigParams).build();
+CdapIO.Read<NullWritable, JsonElement> readTransform =
+CdapIO.<NullWritable, JsonElement>read()
+    .withCdapPluginClass(HubspotBatchSource.class)
+    .withPluginConfig(pluginConfig)
+    .withKeyClass(NullWritable.class)
+    .withValueClass(JsonElement.class);
+p.apply("readFromHubspotPlugin", readTransform);
+{{< /highlight >}}
+
+#### CDAP Salesforce Batch Source plugin
+
+{{< highlight java >}}
+SalesforceSourceConfig pluginConfig =
+    new ConfigWrapper<>(SalesforceSourceConfig.class).withParams(pluginConfigParams).build();
+CdapIO.Read<Schema, LinkedHashMap> readTransform =
+CdapIO.<Schema, LinkedHashMap>read()
+    .withCdapPluginClass(SalesforceBatchSource.class)
+    .withPluginConfig(pluginConfig)
+    .withKeyClass(Schema.class)

Review Comment:
   `Schema.class` used as a key? Which Schema class is actually used?
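
(For context: the key type here is presumably CDAP's own schema class rather than Beam's, i.e.:

    import io.cdap.cdap.api.data.schema.Schema;        // CDAP's schema class
    // not: import org.apache.beam.sdk.schemas.Schema; // Beam's schema class

though the diff as quoted does not show the import.)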



##########
website/www/site/content/en/documentation/io/built-in/cdap.md:
##########
@@ -0,0 +1,392 @@
+---
+title: "Cdap IO"
+---
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# Cdap IO
+
+`CdapIO` is a transform for reading data from a source CDAP plugin or writing data to a sink CDAP plugin.
+
+## Batch plugins support
+
+`CdapIO` currently supports the following CDAP Batch plugins by referencing the CDAP plugin class name:
+- [Hubspot Batch Source](https://github.com/data-integrations/hubspot/blob/develop/src/main/java/io/cdap/plugin/hubspot/source/batch/HubspotBatchSource.java)
+- [Hubspot Batch Sink](https://github.com/data-integrations/hubspot/blob/develop/src/main/java/io/cdap/plugin/hubspot/sink/batch/HubspotBatchSink.java)
+- [Salesforce Batch Source](https://github.com/data-integrations/salesforce/blob/develop/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceBatchSource.java)
+- [Salesforce Batch Sink](https://github.com/data-integrations/salesforce/blob/develop/src/main/java/io/cdap/plugin/salesforce/plugin/sink/batch/SalesforceBatchSink.java)
+- [ServiceNow Batch Source](https://github.com/data-integrations/servicenow-plugins/blob/develop/src/main/java/io/cdap/plugin/servicenow/source/ServiceNowSource.java)
+- [Zendesk Batch Source](https://github.com/data-integrations/zendesk/blob/develop/src/main/java/io/cdap/plugin/zendesk/source/batch/ZendeskBatchSource.java)
+
+Also, any other CDAP Batch plugin based on Hadoop's `InputFormat` or `OutputFormat` can be used. Such plugins can easily be added to the list of plugins supported by class name; for more details, please see the [CdapIO readme](https://github.com/apache/beam/blob/master/sdks/java/io/cdap/README.md).
+
+## Streaming plugins support
+
+`CdapIO` currently supports CDAP Streaming plugins based on [Apache Spark Receiver](https://spark.apache.org/docs/2.4.0/streaming-custom-receivers.html).
+
+Requirements for CDAP Streaming plugins:
+- The CDAP Streaming plugin should be based on a `Spark Receiver` (Spark 2.4).
+- The CDAP Streaming plugin should support working with offsets.
+- The corresponding Spark Receiver should implement the [HasOffset](https://github.com/apache/beam/blob/master/sdks/java/io/sparkreceiver/src/main/java/org/apache/beam/sdk/io/sparkreceiver/HasOffset.java) interface.
+- Records should have a numeric field that represents the record offset.
+
+## Batch reading using CdapIO
+
+In order to read from a CDAP plugin, you will need to pass:
+- `Key` and `Value` classes. You will need to check if these classes have a Beam Coder available.
+- A `PluginConfig` object with parameters for the specific CDAP plugin.
+
+You can easily build a `PluginConfig` object using the `ConfigWrapper` class by specifying:
+
+- Class of the needed `PluginConfig`.
+- `Map<String, Object>` parameters map for the corresponding CDAP plugin.
+
+For example:
+
+{{< highlight java >}}
+MyPluginConfig pluginConfig =
+new ConfigWrapper<>(MyPluginConfig.class).withParams(pluginConfigParams).build();

Review Comment:
   What is `pluginConfigParams` and how it's defined?



##########
website/www/site/content/en/documentation/io/built-in/sparkreceiver.md:
##########
@@ -0,0 +1,101 @@
+---
+title: "SparkReceiver IO"
+---
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# SparkReceiver IO
+
+SparkReceiverIO is a transform for reading data from an Apache Spark Receiver as an unbounded source.
+
+## Spark Receivers support
+
+`SparkReceiverIO` currently supports [Apache Spark Receiver](https://spark.apache.org/docs/2.4.0/streaming-custom-receivers.html).
+
+Requirements for `Spark Receiver`:
+- Version of Spark should be 2.4.
+- `Spark Receiver` should support working with offsets.
+- `Spark Receiver` should implement the [HasOffset](https://github.com/apache/beam/blob/master/sdks/java/io/sparkreceiver/src/main/java/org/apache/beam/sdk/io/sparkreceiver/HasOffset.java) interface.
+- Records should have a numeric field that represents the record offset.
+
+For more details, please see the [SparkReceiverIO readme](https://github.com/apache/beam/blob/master/sdks/java/io/sparkreceiver/2/README.md).
+
+## Streaming reading using SparkReceiverIO
+
+In order to read from a `Spark Receiver`, you will need to pass:
+
+- `getOffsetFn`, which is a `SerializableFunction` that defines how to get a `Long` record offset from a record.
+- `receiverBuilder`, which is needed for building instances of `Spark Receiver` that use Apache Beam mechanisms instead of the Spark environment.
+
+You can easily create a `receiverBuilder` object by passing the following parameters:
+
+- Class of your `Spark Receiver`.
+- Constructor arguments needed to create an instance of your `Spark Receiver`.
+
+For example:
+
+{{< highlight java >}}
+Object[] myConstructorArgs = new Object [] {myConfiguration};

Review Comment:
   I think it's not clear from this example how to properly configure a `ReceiverBuilder` and with which arguments.



##########
website/www/site/content/en/documentation/io/connectors.md:
##########
@@ -775,6 +775,32 @@ This table provides a consolidated, at-a-glance overview of the available built-
     <td class="present">✔</td>
     <td class="present">✔</td>
   </tr>
+  <tr>
+    <td>SparkReceiverIO</td>
+    <td class="present">✔</td>
+    <td class="present">✘</td>
+    <td class="present">
+      ✔
+      native
+    </td>
+    <td>Not available</td>
+    <td>Not available</td>
+    <td class="present">✘</td>
+    <td class="present">✔</td>
+  </tr>
+  <tr>
+    <td>CdapIO</td>

Review Comment:
   Please, add a link to the guide page, like it's done for ParquetIO, for example.



##########
website/www/site/content/en/documentation/io/built-in/cdap.md:
##########
@@ -0,0 +1,392 @@
+---

Review Comment:
   Please, add a link to this page on `I/O connector guides` sub-menu.



##########
website/www/site/content/en/documentation/io/built-in/sparkreceiver.md:
##########
@@ -0,0 +1,101 @@
+---
+title: "SparkReceiver IO"
+---
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# SparkReceiver IO
+
+SparkReceiverIO is a transform for reading data from an Apache Spark Receiver as an unbounded source.
+
+## Spark Receivers support
+
+`SparkReceiverIO` currently supports [Apache Spark Receiver](https://spark.apache.org/docs/2.4.0/streaming-custom-receivers.html).
+
+Requirements for `Spark Receiver`:
+- Version of Spark should be 2.4.

Review Comment:
   nit: `2.4.*` ?



##########
website/www/site/content/en/documentation/io/built-in/cdap.md:
##########
@@ -0,0 +1,392 @@
+---
+title: "Cdap IO"
+---
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# Cdap IO
+
+`CdapIO` is a transform for reading data from a source CDAP plugin or writing data to a sink CDAP plugin.
+
+## Batch plugins support
+
+`CdapIO` currently supports the following CDAP Batch plugins by referencing the CDAP plugin class name:
+- [Hubspot Batch Source](https://github.com/data-integrations/hubspot/blob/develop/src/main/java/io/cdap/plugin/hubspot/source/batch/HubspotBatchSource.java)
+- [Hubspot Batch Sink](https://github.com/data-integrations/hubspot/blob/develop/src/main/java/io/cdap/plugin/hubspot/sink/batch/HubspotBatchSink.java)
+- [Salesforce Batch Source](https://github.com/data-integrations/salesforce/blob/develop/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceBatchSource.java)
+- [Salesforce Batch Sink](https://github.com/data-integrations/salesforce/blob/develop/src/main/java/io/cdap/plugin/salesforce/plugin/sink/batch/SalesforceBatchSink.java)
+- [ServiceNow Batch Source](https://github.com/data-integrations/servicenow-plugins/blob/develop/src/main/java/io/cdap/plugin/servicenow/source/ServiceNowSource.java)
+- [Zendesk Batch Source](https://github.com/data-integrations/zendesk/blob/develop/src/main/java/io/cdap/plugin/zendesk/source/batch/ZendeskBatchSource.java)
+
+Also, any other CDAP Batch plugin based on Hadoop's `InputFormat` or `OutputFormat` can be used. Such plugins can easily be added to the list of plugins supported by class name; for more details, please see the [CdapIO readme](https://github.com/apache/beam/blob/master/sdks/java/io/cdap/README.md).
+
+## Streaming plugins support
+
+`CdapIO` currently supports CDAP Streaming plugins based on [Apache Spark Receiver](https://spark.apache.org/docs/2.4.0/streaming-custom-receivers.html).
+
+Requirements for CDAP Streaming plugins:
+- The CDAP Streaming plugin should be based on a `Spark Receiver` (Spark 2.4).
+- The CDAP Streaming plugin should support working with offsets.
+- The corresponding Spark Receiver should implement the [HasOffset](https://github.com/apache/beam/blob/master/sdks/java/io/sparkreceiver/src/main/java/org/apache/beam/sdk/io/sparkreceiver/HasOffset.java) interface.
+- Records should have a numeric field that represents the record offset.
+
+## Batch reading using CdapIO
+
+In order to read from a CDAP plugin, you will need to pass:
+- `Key` and `Value` classes. You will need to check if these classes have a Beam Coder available.
+- A `PluginConfig` object with parameters for the specific CDAP plugin.
+
+You can easily build a `PluginConfig` object using the `ConfigWrapper` class by specifying:
+
+- Class of the needed `PluginConfig`.
+- `Map<String, Object>` parameters map for the corresponding CDAP plugin.
+
+For example:
+
+{{< highlight java >}}
+MyPluginConfig pluginConfig =
+new ConfigWrapper<>(MyPluginConfig.class).withParams(pluginConfigParams).build();
+{{< /highlight >}}
+
+### Read data by plugin class name
+
+Some CDAP plugins are already supported and can be used just by plugin class name.
+
+For example:
+
+{{< highlight java >}}
+CdapIO.Read<NullWritable, JsonElement> readTransform =
+CdapIO.<NullWritable, JsonElement>read()
+    .withCdapPluginClass(HubspotBatchSource.class)
+    .withPluginConfig(pluginConfig)
+    .withKeyClass(NullWritable.class)
+    .withValueClass(JsonElement.class);
+p.apply("read", readTransform);
+{{< /highlight >}}
+
+### Read data by building a Batch Plugin
+
+If a CDAP plugin is not supported by plugin class name, you can easily build a `Plugin` object by passing the following parameters:
+
+- Class of CDAP Batch plugin.
+- The `InputFormat` class used to connect to your CDAP plugin of choice.
+- The `InputFormatProvider` class used to provide `InputFormat`.
+
+Then you will be able to pass this `Plugin` object to `CdapIO`.
+
+For example:
+
+{{< highlight java >}}
+CdapIO.Read<String, String> readTransform =
+CdapIO.<String, String>read()
+    .withCdapPlugin(
+        Plugin.createBatch(
+            MyCdapPlugin.class,
+            MyInputFormat.class,
+            MyInputFormatProvider.class))
+    .withPluginConfig(pluginConfig)
+    .withKeyClass(String.class)
+    .withValueClass(String.class);
+p.apply("read", readTransform);
+{{< /highlight >}}
+
+### Examples for specific CDAP plugins
+
+#### CDAP Hubspot Batch Source plugin
+
+{{< highlight java >}}
+SourceHubspotConfig pluginConfig =
+    new ConfigWrapper<>(SourceHubspotConfig.class).withParams(pluginConfigParams).build();
+CdapIO.Read<NullWritable, JsonElement> readTransform =
+CdapIO.<NullWritable, JsonElement>read()
+    .withCdapPluginClass(HubspotBatchSource.class)
+    .withPluginConfig(pluginConfig)
+    .withKeyClass(NullWritable.class)
+    .withValueClass(JsonElement.class);
+p.apply("readFromHubspotPlugin", readTransform);
+{{< /highlight >}}
+
+#### CDAP Salesforce Batch Source plugin
+
+{{< highlight java >}}
+SalesforceSourceConfig pluginConfig =
+    new ConfigWrapper<>(SalesforceSourceConfig.class).withParams(pluginConfigParams).build();
+CdapIO.Read<Schema, LinkedHashMap> readTransform =
+CdapIO.<Schema, LinkedHashMap>read()
+    .withCdapPluginClass(SalesforceBatchSource.class)
+    .withPluginConfig(pluginConfig)
+    .withKeyClass(Schema.class)
+    .withValueClass(LinkedHashMap.class);
+p.apply("readFromSalesforcePlugin", readTransform);
+{{< /highlight >}}
+
+#### CDAP ServiceNow Batch Source plugin
+
+{{< highlight java >}}
+ServiceNowSourceConfig pluginConfig =
+    new ConfigWrapper<>(ServiceNowSourceConfig.class).withParams(pluginConfigParams).build();
+CdapIO.Read<NullWritable, StructuredRecord> readTransform =
+CdapIO.<NullWritable, StructuredRecord>read()
+    .withCdapPluginClass(ServiceNowSource.class)
+    .withPluginConfig(pluginConfig)
+    .withKeyClass(NullWritable.class)
+    .withValueClass(StructuredRecord.class);
+p.apply("readFromServiceNowPlugin", readTransform);
+{{< /highlight >}}
+
+#### CDAP Zendesk Batch Source plugin
+
+{{< highlight java >}}
+ZendeskBatchSourceConfig pluginConfig =
+    new ConfigWrapper<>(ZendeskBatchSourceConfig.class).withParams(pluginConfigParams).build();
+CdapIO.Read<NullWritable, StructuredRecord> readTransform =
+CdapIO.<NullWritable, StructuredRecord>read()
+    .withCdapPluginClass(ZendeskBatchSource.class)
+    .withPluginConfig(pluginConfig)
+    .withKeyClass(NullWritable.class)
+    .withValueClass(StructuredRecord.class);
+p.apply("readFromZendeskPlugin", readTransform);
+{{< /highlight >}}
+
+To learn more, please check out the [complete examples](https://github.com/apache/beam/tree/master/examples/java/cdap/src/main/java/org/apache/beam/examples/complete/cdap).
+
+## Batch writing using CdapIO
+
+In order to write to a CDAP plugin, you will need to pass:
+- `Key` and `Value` classes. You will need to check if these classes have a Beam Coder available.
+- `locksDirPath`, which is the path to the directory where locks will be stored. This parameter is needed for Hadoop External Synchronization (a mechanism for acquiring locks related to the write job).
+- A `PluginConfig` object with parameters for the specific CDAP plugin.
+
+You can easily build a `PluginConfig` object using the `ConfigWrapper` class by specifying:
+
+- Class of the needed `PluginConfig`.
+- `Map<String, Object>` parameters map for the corresponding CDAP plugin.
+
+For example:
+
+{{< highlight java >}}
+MyPluginConfig pluginConfig =
+new ConfigWrapper<>(MyPluginConfig.class).withParams(pluginConfigParams).build();
+{{< /highlight >}}
+
+### Write data by plugin class name
+
+Some CDAP plugins are already supported and can be used just by plugin class name.
+
+For example:
+
+{{< highlight java >}}
+CdapIO.Write<NullWritable, String> writeTransform =
+CdapIO.<NullWritable, String>write()
+    .withCdapPluginClass(HubspotBatchSink.class)
+    .withPluginConfig(pluginConfig)
+    .withKeyClass(NullWritable.class)
+    .withValueClass(String.class)
+    .withLocksDirPath(locksDirPath);
+p.apply("write", writeTransform);
+{{< /highlight >}}
+
+### Write data by building a Batch Plugin
+
+If a CDAP plugin is not supported by plugin class name, you can easily build a `Plugin` object by passing the following parameters:
+
+- Class of CDAP plugin.
+- The `OutputFormat` class used to connect to your CDAP plugin of choice.
+- The `OutputFormatProvider` class used to provide `OutputFormat`.
+
+Then you will be able to pass this `Plugin` object to `CdapIO`.
+
+For example:
+
+{{< highlight java >}}
+CdapIO.Write<String, String> writeTransform =
+CdapIO.<String, String>write()
+    .withCdapPlugin(
+        Plugin.createBatch(
+            MyCdapPlugin.class,
+            MyOutputFormat.class,
+            MyOutputFormatProvider.class))
+    .withPluginConfig(pluginConfig)
+    .withKeyClass(String.class)
+    .withValueClass(String.class)
+    .withLocksDirPath(locksDirPath);
+p.apply("write", writeTransform);
+{{< /highlight >}}
+
+### Examples for specific CDAP plugins
+
+#### CDAP Hubspot Batch Sink plugin
+
+{{< highlight java >}}
+SinkHubspotConfig pluginConfig =
+    new ConfigWrapper<>(SinkHubspotConfig.class).withParams(pluginConfigParams).build();
+CdapIO.Write<NullWritable, String> writeTransform =
+CdapIO.<NullWritable, String>write()
+    .withCdapPluginClass(pluginClass)
+    .withPluginConfig(pluginConfig)
+    .withKeyClass(NullWritable.class)
+    .withValueClass(String.class)
+    .withLocksDirPath(locksDirPath);
+p.apply("writeToHubspotPlugin", writeTransform);
+{{< /highlight >}}
+
+#### CDAP Salesforce Batch Sink plugin
+
+{{< highlight java >}}
+SalesforceSinkConfig pluginConfig =
+    new ConfigWrapper<>(SalesforceSinkConfig.class).withParams(pluginConfigParams).build();
+CdapIO.Write<NullWritable, CSVRecord> writeTransform =
+CdapIO.<NullWritable, CSVRecord>write()
+    .withCdapPluginClass(pluginClass)
+    .withPluginConfig(pluginConfig)
+    .withKeyClass(NullWritable.class)
+    .withValueClass(CSVRecord.class)
+    .withLocksDirPath(locksDirPath);
+p.apply("writeToSalesforcePlugin", writeTransform);
+{{< /highlight >}}
+
+To learn more, please check out the [complete examples](https://github.com/apache/beam/tree/master/examples/java/cdap/src/main/java/org/apache/beam/examples/complete/cdap).
+
+## Streaming reading using CdapIO
+
+In order to read from a CDAP plugin, you will need to pass:
+- `Key` and `Value` classes. You will need to check if these classes have a Beam Coder available.
+- A `PluginConfig` object with parameters for the specific CDAP plugin.
+
+You can easily build a `PluginConfig` object using the `ConfigWrapper` class by specifying:
+
+- Class of the needed `PluginConfig`.
+- `Map<String, Object>` parameters map for the corresponding CDAP plugin.
+
+For example:
+
+{{< highlight java >}}
+MyPluginConfig pluginConfig =
+new ConfigWrapper<>(MyPluginConfig.class).withParams(pluginConfigParams).build();
+{{< /highlight >}}
+
+### Read data by plugin class name
+
+Some CDAP plugins are already supported and can be used just by plugin class name.
+
+For example:
+
+{{< highlight java >}}
+CdapIO.Read<NullWritable, String> readTransform =
+CdapIO.<NullWritable, String>read()
+    .withCdapPluginClass(MyStreamingPlugin.class)
+    .withPluginConfig(pluginConfig)
+    .withKeyClass(NullWritable.class)
+    .withValueClass(String.class);
+p.apply("read", readTransform);
+{{< /highlight >}}
+
+### Read data by building a Streaming Plugin
+
+If a CDAP plugin is not supported by plugin class name, you can easily build a `Plugin` object by passing the following parameters:
+
+- Class of CDAP Streaming plugin.
+- `getOffsetFn`, which is a `SerializableFunction` that defines how to get a `Long` record offset from a record.
+- `receiverClass`, which is the Spark (v2.4) `Receiver` class associated with the CDAP plugin.
+- (Optionally) `getReceiverArgsFromConfigFn`, which is a `SerializableFunction` that defines how to get constructor arguments for the Spark `Receiver` using the `PluginConfig` object.
+
+Then you will be able to pass this `Plugin` object to `CdapIO`.
+
+For example:
+
+{{< highlight java >}}
+CdapIO.Read<NullWritable, String> readTransform =
+CdapIO.<NullWritable, String>read()
+    .withCdapPlugin(
+        Plugin.createStreaming(
+            MyStreamingPlugin.class,
+            myGetOffsetFn,
+            MyReceiver.class,
+            myGetReceiverArgsFromConfigFn))
+    .withPluginConfig(pluginConfig)
+    .withKeyClass(NullWritable.class)
+    .withValueClass(String.class);
+p.apply("read", readTransform);
+{{< /highlight >}}
+
+### Read data with optional parameters
+
+You can also pass the following optional parameters:
+
+- `pullFrequencySec`, which is the delay in seconds between polls for new record updates.
+- `startOffset`, which is the inclusive start offset from which reading should start.
+
+For example:
+
+{{< highlight java >}}
+CdapIO.Read<NullWritable, String> readTransform =
+CdapIO.<NullWritable, String>read()
+    .withCdapPluginClass(MyStreamingPlugin.class)
+    .withPluginConfig(pluginConfig)
+    .withKeyClass(NullWritable.class)
+    .withValueClass(String.class)
+    .withPullFrequencySec(1L)
+    .withStartOffset(1L);
+p.apply("read", readTransform);
+{{< /highlight >}}
+
+### Examples for specific CDAP plugins
+
+#### CDAP Hubspot Streaming Source plugin
+
+{{< highlight java >}}
+HubspotStreamingSourceConfig pluginConfig =
+    new ConfigWrapper<>(HubspotStreamingSourceConfig.class)
+        .withParams(pluginConfigParams).build();
+CdapIO.Read<NullWritable, String> readTransform =
+CdapIO.<NullWritable, String>read()
+    .withCdapPlugin(
+        Plugin.createStreaming(
+        HubspotStreamingSource.class,
+        GetOffsetUtils.getOffsetFnForHubspot(),
+        HubspotReceiver.class))
+    .withPluginConfig(pluginConfig)
+    .withKeyClass(NullWritable.class)
+    .withValueClass(String.class);
+p.apply("readFromHubspotPlugin", readTransform);
+{{< /highlight >}}
+
+#### CDAP Salesforce Streaming Source plugin
+
+{{< highlight java >}}
+SalesforceStreamingSourceConfig pluginConfig =
+    new ConfigWrapper<>(SalesforceStreamingSourceConfig.class)
+        .withParams(pluginConfigParams).build();
+CdapIO.Read<NullWritable, String> readTransform =
+CdapIO.<NullWritable, String>read()
+    .withCdapPlugin(
+        Plugin.createStreaming(
+            SalesforceStreamingSource.class,
+            GetOffsetUtils.getOffsetFnForSalesforce(),
+            SalesforceReceiver.class,
+            config -> {
+                SalesforceStreamingSourceConfig salesforceConfig =
+                    (SalesforceStreamingSourceConfig) config;
+                return new Object[] {
+                    salesforceConfig.getAuthenticatorCredentials(),
+                    salesforceConfig.getPushTopicName()
+                };
+            }))
+    .withPluginConfig(pluginConfig)
+    .withKeyClass(NullWritable.class)
+    .withValueClass(String.class);
+p.apply("readFromSalesforcePlugin", readTransform);
+{{< /highlight >}}
+
+To learn more, please check out the [complete examples](https://github.com/apache/beam/tree/master/examples/java/cdap/src/main/java/org/apache/beam/examples/complete/cdap).

Review Comment:
   I assume that this URL is not available since examples PR is not yet merged.





[GitHub] [beam] Amar3tto commented on pull request #23338: [CdapIO] Add CdapIO and SparkReceiverIO to the connectors table in documentation

Posted by GitBox <gi...@apache.org>.
Amar3tto commented on PR #23338:
URL: https://github.com/apache/beam/pull/23338#issuecomment-1326211195

   Retest this please




[GitHub] [beam] Amar3tto commented on a diff in pull request #23338: [CdapIO] Add CdapIO and SparkReceiverIO documentation in website

Posted by GitBox <gi...@apache.org>.
Amar3tto commented on code in PR #23338:
URL: https://github.com/apache/beam/pull/23338#discussion_r1041037294


##########
website/www/site/content/en/documentation/io/built-in/cdap.md:
##########
@@ -0,0 +1,392 @@
+---
+title: "Cdap IO"
+---
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# Cdap IO
+
+`CdapIO` is a transform for reading data from a source CDAP plugin or writing data to a sink CDAP plugin.
+
+## Batch plugins support
+
+`CdapIO` currently supports the following CDAP Batch plugins by referencing the CDAP plugin class name:
+- [Hubspot Batch Source](https://github.com/data-integrations/hubspot/blob/develop/src/main/java/io/cdap/plugin/hubspot/source/batch/HubspotBatchSource.java)
+- [Hubspot Batch Sink](https://github.com/data-integrations/hubspot/blob/develop/src/main/java/io/cdap/plugin/hubspot/sink/batch/HubspotBatchSink.java)
+- [Salesforce Batch Source](https://github.com/data-integrations/salesforce/blob/develop/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceBatchSource.java)
+- [Salesforce Batch Sink](https://github.com/data-integrations/salesforce/blob/develop/src/main/java/io/cdap/plugin/salesforce/plugin/sink/batch/SalesforceBatchSink.java)
+- [ServiceNow Batch Source](https://github.com/data-integrations/servicenow-plugins/blob/develop/src/main/java/io/cdap/plugin/servicenow/source/ServiceNowSource.java)
+- [Zendesk Batch Source](https://github.com/data-integrations/zendesk/blob/develop/src/main/java/io/cdap/plugin/zendesk/source/batch/ZendeskBatchSource.java)
+
+Also, any other CDAP Batch plugin based on Hadoop's `InputFormat` or `OutputFormat` can be used. Such plugins can easily be added to the list of plugins supported by class name; for more details, please see the [CdapIO readme](https://github.com/apache/beam/blob/master/sdks/java/io/cdap/README.md).
+
+## Streaming plugins support
+
+`CdapIO` currently supports CDAP Streaming plugins based on [Apache Spark Receiver](https://spark.apache.org/docs/2.4.0/streaming-custom-receivers.html).
+
+Requirements for CDAP Streaming plugins:
+- The CDAP Streaming plugin should be based on a `Spark Receiver` (Spark 2.4).
+- The CDAP Streaming plugin should support working with offsets.
+- The corresponding Spark Receiver should implement the [HasOffset](https://github.com/apache/beam/blob/master/sdks/java/io/sparkreceiver/src/main/java/org/apache/beam/sdk/io/sparkreceiver/HasOffset.java) interface.
+- Records should have a numeric field that represents the record offset.
+
+## Batch reading using CdapIO
+
+In order to read from a CDAP plugin, you will need to pass:
+- `Key` and `Value` classes. You will need to check if these classes have a Beam Coder available.
+- A `PluginConfig` object with parameters for the specific CDAP plugin.
+
+You can easily build a `PluginConfig` object using the `ConfigWrapper` class by specifying:
+
+- Class of the needed `PluginConfig`.
+- `Map<String, Object>` parameters map for the corresponding CDAP plugin.
+
+For example:
+
+{{< highlight java >}}
+MyPluginConfig pluginConfig =
+new ConfigWrapper<>(MyPluginConfig.class).withParams(pluginConfigParams).build();
+{{< /highlight >}}
+
+### Read data by plugin class name
+
+Some CDAP plugins are already supported and can be used just by plugin class name.
+
+For example:
+
+{{< highlight java >}}
+CdapIO.Read<NullWritable, JsonElement> readTransform =
+CdapIO.<NullWritable, JsonElement>read()
+    .withCdapPluginClass(HubspotBatchSource.class)
+    .withPluginConfig(pluginConfig)
+    .withKeyClass(NullWritable.class)
+    .withValueClass(JsonElement.class);
+p.apply("read", readTransform);
+{{< /highlight >}}
+
+### Read data by building a Batch Plugin
+
+If a CDAP plugin is not supported by plugin class name, you can easily build a `Plugin` object by passing the following parameters:
+
+- Class of CDAP Batch plugin.
+- The `InputFormat` class used to connect to your CDAP plugin of choice.
+- The `InputFormatProvider` class used to provide `InputFormat`.
+
+Then you will be able to pass this `Plugin` object to `CdapIO`.
+
+For example:
+
+{{< highlight java >}}
+CdapIO.Read<String, String> readTransform =
+CdapIO.<String, String>read()
+    .withCdapPlugin(
+        Plugin.createBatch(
+            MyCdapPlugin.class,
+            MyInputFormat.class,
+            MyInputFormatProvider.class))
+    .withPluginConfig(pluginConfig)
+    .withKeyClass(String.class)
+    .withValueClass(String.class);
+p.apply("read", readTransform);
+{{< /highlight >}}
+
+### Examples for specific CDAP plugins
+
+#### CDAP Hubspot Batch Source plugin
+
+{{< highlight java >}}
+SourceHubspotConfig pluginConfig =
+    new ConfigWrapper<>(SourceHubspotConfig.class).withParams(pluginConfigParams).build();
+CdapIO.Read<NullWritable, JsonElement> readTransform =
+CdapIO.<NullWritable, JsonElement>read()
+    .withCdapPluginClass(HubspotBatchSource.class)
+    .withPluginConfig(pluginConfig)
+    .withKeyClass(NullWritable.class)
+    .withValueClass(JsonElement.class);
+p.apply("readFromHubspotPlugin", readTransform);
+{{< /highlight >}}
+
+#### CDAP Salesforce Batch Source plugin
+
+{{< highlight java >}}
+SalesforceSourceConfig pluginConfig =
+    new ConfigWrapper<>(SalesforceSourceConfig.class).withParams(pluginConfigParams).build();
+CdapIO.Read<Schema, LinkedHashMap> readTransform =
+CdapIO.<Schema, LinkedHashMap>read()
+    .withCdapPluginClass(SalesforceBatchSource.class)
+    .withPluginConfig(pluginConfig)
+    .withKeyClass(Schema.class)
+    .withValueClass(LinkedHashMap.class);
+p.apply("readFromSalesforcePlugin", readTransform);
+{{< /highlight >}}
+
+#### CDAP ServiceNow Batch Source plugin
+
+{{< highlight java >}}
+ServiceNowSourceConfig pluginConfig =
+    new ConfigWrapper<>(ServiceNowSourceConfig.class).withParams(pluginConfigParams).build();
+CdapIO.Read<NullWritable, StructuredRecord> readTransform =
+CdapIO.<NullWritable, StructuredRecord>read()
+    .withCdapPluginClass(ServiceNowSource.class)
+    .withPluginConfig(pluginConfig)
+    .withKeyClass(NullWritable.class)
+    .withValueClass(StructuredRecord.class);
+p.apply("readFromServiceNowPlugin", readTransform);
+{{< /highlight >}}
+
+#### CDAP Zendesk Batch Source plugin
+
+{{< highlight java >}}
+ZendeskBatchSourceConfig pluginConfig =
+    new ConfigWrapper<>(ZendeskBatchSourceConfig.class).withParams(pluginConfigParams).build();
+CdapIO.Read<NullWritable, StructuredRecord> readTransform =
+CdapIO.<NullWritable, StructuredRecord>read()
+    .withCdapPluginClass(ZendeskBatchSource.class)
+    .withPluginConfig(pluginConfig)
+    .withKeyClass(NullWritable.class)
+    .withValueClass(StructuredRecord.class);
+p.apply("readFromZendeskPlugin", readTransform);
+{{< /highlight >}}
+
+To learn more, please check out the [complete examples](https://github.com/apache/beam/tree/master/examples/java/cdap/src/main/java/org/apache/beam/examples/complete/cdap).
+
+## Batch writing using CdapIO
+
+In order to write to a CDAP plugin, you will need to pass:
+- `Key` and `Value` classes. You will need to check if these classes have a Beam Coder available (see the sketch after this list).
+- `locksDirPath`, the path to the directory where locks will be stored. This parameter is needed for Hadoop External Synchronization (a mechanism for acquiring locks related to the write job).
+- A `PluginConfig` object with parameters for the specific CDAP plugin.
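+
+One way to perform the coder check is to ask the pipeline's `CoderRegistry` up front. This is a minimal sketch, assuming a `Pipeline` object `p` has already been created:
+
+{{< highlight java >}}
+try {
+  // Throws if no Beam Coder can be inferred for the given class.
+  p.getCoderRegistry().getCoder(String.class);
+} catch (CannotProvideCoderException e) {
+  // No default coder is available: register a custom Coder for this class.
+}
+{{< /highlight >}}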
+
+You can easily build a `PluginConfig` object using the `ConfigWrapper` class by specifying:
+
+- The class of the needed `PluginConfig`.
+- A `Map<String, Object>` of parameters for the corresponding CDAP plugin.
+
+For example:
+
+{{< highlight java >}}
+MyPluginConfig pluginConfig =
+new ConfigWrapper<>(MyPluginConfig.class).withParams(pluginConfigParams).build();
+{{< /highlight >}}
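+
+The `pluginConfigParams` map holds the plugin's configuration keys and values. The keys below are hypothetical; the real ones are defined by the plugin's `PluginConfig` class:
+
+{{< highlight java >}}
+Map<String, Object> pluginConfigParams = new HashMap<>();
+pluginConfigParams.put("referenceName", "myPluginReference"); // hypothetical keys
+pluginConfigParams.put("apiKey", "my-api-key");
+{{< /highlight >}}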
+
+### Write data by plugin class name
+
+Some CDAP plugins are already supported and can be used just by plugin class name.
+
+For example:
+
+{{< highlight java >}}
+CdapIO.Write<NullWritable, String> writeTransform =
+CdapIO.<NullWritable, String>write()
+    .withCdapPluginClass(HubspotBatchSink.class)
+    .withPluginConfig(pluginConfig)
+    .withKeyClass(NullWritable.class)
+    .withValueClass(String.class)
+    .withLocksDirPath(locksDirPath);
+p.apply("write", writeTransform);
+{{< /highlight >}}
+
+### Write data with building Batch Plugin
+
+If a CDAP plugin is not supported by plugin class name, you can easily build a `Plugin` object by passing the following parameters:
+
+- The class of the CDAP Batch plugin.
+- The `OutputFormat` class used to connect to your CDAP plugin of choice.
+- The `OutputFormatProvider` class used to provide `OutputFormat`.
+
+Then you will be able to pass this `Plugin` object to `CdapIO`.
+
+For example:
+
+{{< highlight java >}}
+CdapIO.Write<String, String> writeTransform =
+CdapIO.<String, String>write()
+    .withCdapPlugin(
+        Plugin.createBatch(
+            MyCdapPlugin.class,
+            MyOutputFormat.class,
+            MyOutputFormatProvider.class))
+    .withPluginConfig(pluginConfig)
+    .withKeyClass(String.class)
+    .withValueClass(String.class)
+    .withLocksDirPath(locksDirPath);
+p.apply("write", writeTransform);
+{{< /highlight >}}
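+
+`MyOutputFormatProvider` mirrors the `InputFormatProvider` sketch shown earlier for the read side: it returns the `OutputFormat` class name and the plugin-specific configuration map that CDAP needs to instantiate it.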
+
+### Examples for specific CDAP plugins
+
+#### CDAP Hubspot Batch Sink plugin
+
+{{< highlight java >}}
+SinkHubspotConfig pluginConfig =
+    new ConfigWrapper<>(SinkHubspotConfig.class).withParams(pluginConfigParams).build();
+CdapIO.Write<NullWritable, String> writeTransform =
+CdapIO.<NullWritable, String>write()
+    .withCdapPluginClass(HubspotBatchSink.class)
+    .withPluginConfig(pluginConfig)
+    .withKeyClass(NullWritable.class)
+    .withValueClass(String.class)
+    .withLocksDirPath(locksDirPath);
+p.apply("writeToHubspotPlugin", writeTransform);
+{{< /highlight >}}
+
+#### CDAP Salesforce Batch Sink plugin
+
+{{< highlight java >}}
+SalesforceSinkConfig pluginConfig =
+    new ConfigWrapper<>(SalesforceSinkConfig.class).withParams(pluginConfigParams).build();
+CdapIO.Write<NullWritable, CSVRecord> writeTransform =
+CdapIO.<NullWritable, CSVRecord>write()
+    .withCdapPluginClass(SalesforceBatchSink.class)
+    .withPluginConfig(pluginConfig)
+    .withKeyClass(NullWritable.class)
+    .withValueClass(CSVRecord.class)
+    .withLocksDirPath(locksDirPath);
+p.apply("writeToSalesforcePlugin", writeTransform);
+{{< /highlight >}}
+
+To learn more, please check out [complete examples](https://github.com/apache/beam/tree/master/examples/java/cdap/src/main/java/org/apache/beam/examples/complete/cdap).
+
+## Streaming reading using CdapIO
+
+In order to read from a CDAP plugin, you will need to pass:
+- `Key` and `Value` classes. You will need to check if these classes have a Beam Coder available.
+- A `PluginConfig` object with parameters for the specific CDAP plugin.
+
+You can easily build a `PluginConfig` object using the `ConfigWrapper` class by specifying:
+
+- The class of the needed `PluginConfig`.
+- A `Map<String, Object>` of parameters for the corresponding CDAP plugin.
+
+For example:
+
+{{< highlight java >}}
+MyPluginConfig pluginConfig =
+new ConfigWrapper<>(MyPluginConfig.class).withParams(pluginConfigParams).build();
+{{< /highlight >}}
+
+### Read data by plugin class name
+
+Some CDAP plugins are already supported and can be used just by plugin class name.
+
+For example:
+
+{{< highlight java >}}
+CdapIO.Read<NullWritable, String> readTransform =
+CdapIO.<NullWritable, String>read()
+    .withCdapPluginClass(MyStreamingPlugin.class)
+    .withPluginConfig(pluginConfig)
+    .withKeyClass(NullWritable.class)
+    .withValueClass(String.class);
+p.apply("read", readTransform);
+{{< /highlight >}}
+
+### Read data with building Streaming Plugin
+
+If a CDAP plugin is not supported by plugin class name, you can easily build a `Plugin` object by passing the following parameters:
+
+- The class of the CDAP Streaming plugin.
+- `getOffsetFn`, a `SerializableFunction` that defines how to get a `Long` offset from a record.
+- `receiverClass`, the Spark (v2.4) `Receiver` class associated with the CDAP plugin.
+- (Optional) `getReceiverArgsFromConfigFn`, a `SerializableFunction` that defines how to get constructor arguments for the Spark `Receiver` from the `PluginConfig` object.
+
+Then you will be able to pass this `Plugin` object to `CdapIO`.
+
+For example:
+
+{{< highlight java >}}
+CdapIO.Read<NullWritable, String> readTransform =
+CdapIO.<NullWritable, String>read()
+    .withCdapPlugin(
+        Plugin.createStreaming(
+            MyStreamingPlugin.class,
+            myGetOffsetFn,
+            MyReceiver.class,
+            myGetReceiverArgsFromConfigFn))
+    .withPluginConfig(pluginConfig)
+    .withKeyClass(NullWritable.class)
+    .withValueClass(String.class);
+p.apply("read", readTransform);
+{{< /highlight >}}
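+
+Here `myGetOffsetFn` is an arbitrary `SerializableFunction<V, Long>`. As a minimal sketch, assuming records arrive as JSON strings with a numeric `id` field (a hypothetical schema), it could look like:
+
+{{< highlight java >}}
+SerializableFunction<String, Long> myGetOffsetFn =
+    record -> {
+      // Extract the numeric field that represents the record offset.
+      return JsonParser.parseString(record).getAsJsonObject().get("id").getAsLong();
+    };
+{{< /highlight >}}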
+
+### Read data with optional parameters
+
+Optionally, you can pass the following parameters:
+
+- `pullFrequencySec`, the delay in seconds between polls for new records.
+- `startOffset`, the inclusive start offset from which reading should start.
+
+For example:
+
+{{< highlight java >}}
+CdapIO.Read<NullWritable, String> readTransform =
+CdapIO.<NullWritable, String>read()
+    .withCdapPluginClass(MyStreamingPlugin.class)
+    .withPluginConfig(pluginConfig)
+    .withKeyClass(NullWritable.class)
+    .withValueClass(String.class)
+    .withPullFrequencySec(1L)
+    .withStartOffset(1L);
+p.apply("read", readTransform);
+{{< /highlight >}}
+
+### Examples for specific CDAP plugins
+
+#### CDAP Hubspot Streaming Source plugin
+
+{{< highlight java >}}
+HubspotStreamingSourceConfig pluginConfig =
+    new ConfigWrapper<>(HubspotStreamingSourceConfig.class)
+        .withParams(pluginConfigParams).build();
+CdapIO.Read<NullWritable, String> readTransform =
+CdapIO.<NullWritable, String>read()
+    .withCdapPlugin(
+        Plugin.createStreaming(
+            HubspotStreamingSource.class,
+            GetOffsetUtils.getOffsetFnForHubspot(),
+            HubspotReceiver.class))
+    .withPluginConfig(pluginConfig)
+    .withKeyClass(NullWritable.class)
+    .withValueClass(String.class);
+p.apply("readFromHubspotPlugin", readTransform);
+{{< /highlight >}}
+
+#### CDAP Salesforce Streaming Source plugin
+
+{{< highlight java >}}
+SalesforceStreamingSourceConfig pluginConfig =
+    new ConfigWrapper<>(SalesforceStreamingSourceConfig.class)
+        .withParams(pluginConfigParams).build();
+CdapIO.Read<NullWritable, String> readTransform =
+CdapIO.<NullWritable, String>read()
+    .withCdapPlugin(
+        Plugin.createStreaming(
+            SalesforceStreamingSource.class,
+            GetOffsetUtils.getOffsetFnForSalesforce(),
+            SalesforceReceiver.class,
+            config -> {
+                SalesforceStreamingSourceConfig salesforceConfig =
+                    (SalesforceStreamingSourceConfig) config;
+                return new Object[] {
+                    salesforceConfig.getAuthenticatorCredentials(),
+                    salesforceConfig.getPushTopicName()
+                };
+            }))
+    .withPluginConfig(pluginConfig)
+    .withKeyClass(NullWritable.class)
+    .withValueClass(String.class);
+p.apply("readFromSalesforcePlugin", readTransform);
+{{< /highlight >}}
+
+To learn more, please check out [complete examples](https://github.com/apache/beam/tree/master/examples/java/cdap/src/main/java/org/apache/beam/examples/complete/cdap).

Review Comment:
   Yes, we will merge this PR after the examples PR, so the link will be working.



##########
website/www/site/content/en/documentation/io/built-in/sparkreceiver.md:
##########
@@ -0,0 +1,101 @@
+---

Review Comment:
   Done



##########
website/www/site/content/en/documentation/io/built-in/cdap.md:
##########
@@ -0,0 +1,392 @@
+---

Review Comment:
   Done



##########
website/www/site/content/en/documentation/io/built-in/cdap.md:
##########
@@ -0,0 +1,392 @@
+---
+title: "Cdap IO"
+---
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# Cdap IO
+
+`CdapIO` is a transform for reading data from a source CDAP plugin or writing data to a sink CDAP plugin.
+
+## Batch plugins support
+
+`CdapIO` currently supports the following CDAP Batch plugins by referencing the CDAP plugin class name:
+- [Hubspot Batch Source](https://github.com/data-integrations/hubspot/blob/develop/src/main/java/io/cdap/plugin/hubspot/source/batch/HubspotBatchSource.java)
+- [Hubspot Batch Sink](https://github.com/data-integrations/hubspot/blob/develop/src/main/java/io/cdap/plugin/hubspot/sink/batch/HubspotBatchSink.java)
+- [Salesforce Batch Source](https://github.com/data-integrations/salesforce/blob/develop/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceBatchSource.java)
+- [Salesforce Batch Sink](https://github.com/data-integrations/salesforce/blob/develop/src/main/java/io/cdap/plugin/salesforce/plugin/sink/batch/SalesforceBatchSink.java)
+- [ServiceNow Batch Source](https://github.com/data-integrations/servicenow-plugins/blob/develop/src/main/java/io/cdap/plugin/servicenow/source/ServiceNowSource.java)
+- [Zendesk Batch Source](https://github.com/data-integrations/zendesk/blob/develop/src/main/java/io/cdap/plugin/zendesk/source/batch/ZendeskBatchSource.java)
+
+Also, any other CDAP Batch plugin based on Hadoop's `InputFormat` or `OutputFormat` can be used. Such plugins can easily be added to the list of plugins supported by class name; for more details, please see the [CdapIO readme](https://github.com/apache/beam/blob/master/sdks/java/io/cdap/README.md).
+
+## Streaming plugins support
+
+`CdapIO` currently supports CDAP Streaming plugins based on [Apache Spark Receiver](https://spark.apache.org/docs/2.4.0/streaming-custom-receivers.html).
+
+Requirements for CDAP Streaming plugins:
+- The CDAP Streaming plugin should be based on a Spark `Receiver` (Spark 2.4).
+- The CDAP Streaming plugin should support working with offsets.
+- The corresponding Spark `Receiver` should implement the [HasOffset](https://github.com/apache/beam/blob/master/sdks/java/io/sparkreceiver/src/main/java/org/apache/beam/sdk/io/sparkreceiver/HasOffset.java) interface (a minimal receiver sketch follows this list).
+- Records should have a numeric field that represents the record offset.
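+
+As a rough sketch, such a receiver could look like the following. The `setStartOffset`/`getEndOffset` method names follow the `HasOffset` interface linked above (double-check the current contract in the Beam sources), and the record-producing loop is a placeholder for your actual data source:
+
+{{< highlight java >}}
+public class MyReceiver extends Receiver<String> implements HasOffset {
+
+  private Long startOffset = 0L;
+
+  public MyReceiver() {
+    super(StorageLevel.MEMORY_AND_DISK_2());
+  }
+
+  public void setStartOffset(Long startOffset) {
+    // CdapIO tells the receiver the inclusive offset to (re)start reading from.
+    if (startOffset != null) {
+      this.startOffset = startOffset;
+    }
+  }
+
+  public Long getEndOffset() {
+    // Upper bound for reading; Long.MAX_VALUE means "no bound".
+    return Long.MAX_VALUE;
+  }
+
+  @Override
+  public void onStart() {
+    // Start a thread that reads records from startOffset onward
+    // and passes each one to store().
+    new Thread(
+            () -> {
+              long offset = startOffset;
+              while (!isStopped()) {
+                store(String.valueOf(offset++)); // replace with real records
+              }
+            })
+        .start();
+  }
+
+  @Override
+  public void onStop() {
+    // Stop reading and release resources (the loop above checks isStopped()).
+  }
+}
+{{< /highlight >}}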
+
+## Batch reading using CdapIO
+
+In order to read from a CDAP plugin, you will need to pass:
+- `Key` and `Value` classes. You will need to check if these classes have a Beam Coder available.
+- A `PluginConfig` object with parameters for the specific CDAP plugin.
+
+You can easily build a `PluginConfig` object using the `ConfigWrapper` class by specifying:
+
+- The class of the needed `PluginConfig`.
+- A `Map<String, Object>` of parameters for the corresponding CDAP plugin.
+
+For example:
+
+{{< highlight java >}}
+MyPluginConfig pluginConfig =
+new ConfigWrapper<>(MyPluginConfig.class).withParams(pluginConfigParams).build();
+{{< /highlight >}}
+
+### Read data by plugin class name
+
+Some CDAP plugins are already supported and can be used just by plugin class name.
+
+For example:
+
+{{< highlight java >}}
+CdapIO.Read<NullWritable, JsonElement> readTransform =

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] Lizzfox commented on pull request #23338: [CdapIO] Add CdapIO and SparkReceiverIO documentation in website

Posted by GitBox <gi...@apache.org>.
Lizzfox commented on PR #23338:
URL: https://github.com/apache/beam/pull/23338#issuecomment-1332466614

   R: @johnjcasey
   R: @chamikaramj
   R: @aromanenko-dev
   
   We've published the Apache Beam website documentation PR for CDAP and SparkReceiver IOs. It's ready for review.
   
   Thank you for your attention!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] Amar3tto commented on a diff in pull request #23338: [CdapIO] Add CdapIO and SparkReceiverIO documentation in website

Posted by GitBox <gi...@apache.org>.
Amar3tto commented on code in PR #23338:
URL: https://github.com/apache/beam/pull/23338#discussion_r1040836067


##########
website/www/site/content/en/documentation/io/built-in/cdap.md:
##########
@@ -0,0 +1,392 @@
+---
+title: "Cdap IO"
+---
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# Cdap IO
+
+`CdapIO` is a transform for reading data from a source CDAP plugin or writing data to a sink CDAP plugin.
+
+## Batch plugins support
+
+`CdapIO` currently supports the following CDAP Batch plugins by referencing the CDAP plugin class name:
+- [Hubspot Batch Source](https://github.com/data-integrations/hubspot/blob/develop/src/main/java/io/cdap/plugin/hubspot/source/batch/HubspotBatchSource.java)
+- [Hubspot Batch Sink](https://github.com/data-integrations/hubspot/blob/develop/src/main/java/io/cdap/plugin/hubspot/sink/batch/HubspotBatchSink.java)
+- [Salesforce Batch Source](https://github.com/data-integrations/salesforce/blob/develop/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceBatchSource.java)
+- [Salesforce Batch Sink](https://github.com/data-integrations/salesforce/blob/develop/src/main/java/io/cdap/plugin/salesforce/plugin/sink/batch/SalesforceBatchSink.java)
+- [ServiceNow Batch Source](https://github.com/data-integrations/servicenow-plugins/blob/develop/src/main/java/io/cdap/plugin/servicenow/source/ServiceNowSource.java)
+- [Zendesk Batch Source](https://github.com/data-integrations/zendesk/blob/develop/src/main/java/io/cdap/plugin/zendesk/source/batch/ZendeskBatchSource.java)
+
+Also, any other CDAP Batch plugin based on Hadoop's `InputFormat` or `OutputFormat` can be used. Such plugins can easily be added to the list of plugins supported by class name; for more details, please see the [CdapIO readme](https://github.com/apache/beam/blob/master/sdks/java/io/cdap/README.md).
+
+## Streaming plugins support
+
+`CdapIO` currently supports CDAP Streaming plugins based on [Apache Spark Receiver](https://spark.apache.org/docs/2.4.0/streaming-custom-receivers.html).
+
+Requirements for CDAP Streaming plugins:
+- The CDAP Streaming plugin should be based on a Spark `Receiver` (Spark 2.4).
+- The CDAP Streaming plugin should support working with offsets.
+- The corresponding Spark `Receiver` should implement the [HasOffset](https://github.com/apache/beam/blob/master/sdks/java/io/sparkreceiver/src/main/java/org/apache/beam/sdk/io/sparkreceiver/HasOffset.java) interface.
+- Records should have a numeric field that represents the record offset.
+
+## Batch reading using CdapIO
+
+In order to read from a CDAP plugin, you will need to pass:
+- `Key` and `Value` classes. You will need to check if these classes have a Beam Coder available.
+- A `PluginConfig` object with parameters for the specific CDAP plugin.
+
+You can easily build a `PluginConfig` object using the `ConfigWrapper` class by specifying:
+
+- The class of the needed `PluginConfig`.
+- A `Map<String, Object>` of parameters for the corresponding CDAP plugin.
+
+For example:
+
+{{< highlight java >}}
+MyPluginConfig pluginConfig =
+new ConfigWrapper<>(MyPluginConfig.class).withParams(pluginConfigParams).build();
+{{< /highlight >}}
+
+### Read data by plugin class name
+
+Some CDAP plugins are already supported and can be used just by plugin class name.
+
+For example:
+
+{{< highlight java >}}
+CdapIO.Read<NullWritable, JsonElement> readTransform =
+CdapIO.<NullWritable, JsonElement>read()
+    .withCdapPluginClass(HubspotBatchSource.class)
+    .withPluginConfig(pluginConfig)
+    .withKeyClass(NullWritable.class)
+    .withValueClass(JsonElement.class);
+p.apply("read", readTransform);
+{{< /highlight >}}
+
+### Read data with building Batch Plugin
+
+If a CDAP plugin is not supported by plugin class name, you can easily build a `Plugin` object by passing the following parameters:
+
+- The class of the CDAP Batch plugin.
+- The `InputFormat` class used to connect to your CDAP plugin of choice.
+- The `InputFormatProvider` class used to provide `InputFormat`.
+
+Then you will be able to pass this `Plugin` object to `CdapIO`.
+
+For example:
+
+{{< highlight java >}}
+CdapIO.Read<String, String> readTransform =
+CdapIO.<String, String>read()
+    .withCdapPlugin(
+        Plugin.createBatch(
+            MyCdapPlugin.class,
+            MyInputFormat.class,
+            MyInputFormatProvider.class))
+    .withPluginConfig(pluginConfig)
+    .withKeyClass(String.class)
+    .withValueClass(String.class);
+p.apply("read", readTransform);
+{{< /highlight >}}
+
+### Examples for specific CDAP plugins
+
+#### CDAP Hubspot Batch Source plugin
+
+{{< highlight java >}}
+SourceHubspotConfig pluginConfig =
+    new ConfigWrapper<>(SourceHubspotConfig.class).withParams(pluginConfigParams).build();
+CdapIO.Read<NullWritable, JsonElement> readTransform =
+CdapIO.<NullWritable, JsonElement>read()
+    .withCdapPluginClass(HubspotBatchSource.class)
+    .withPluginConfig(pluginConfig)
+    .withKeyClass(NullWritable.class)
+    .withValueClass(JsonElement.class);
+p.apply("readFromHubspotPlugin", readTransform);
+{{< /highlight >}}
+
+#### CDAP Salesforce Batch Source plugin
+
+{{< highlight java >}}
+SalesforceSourceConfig pluginConfig =
+    new ConfigWrapper<>(SalesforceSourceConfig.class).withParams(pluginConfigParams).build();
+CdapIO.Read<Schema, LinkedHashMap> readTransform =
+CdapIO.<Schema, LinkedHashMap>read()
+    .withCdapPluginClass(SalesforceBatchSource.class)
+    .withPluginConfig(pluginConfig)
+    .withKeyClass(Schema.class)

Review Comment:
   `SalesforceBatchSource` [plugin](https://github.com/data-integrations/salesforce/blob/2c6916c733f11767b32b9b95bfdf46ec96129a3a/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceBatchSource.java#L65) uses `io.cdap.cdap.api.data.schema.Schema` as a key.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org