Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/06/09 12:27:55 UTC

[GitHub] [beam] Amar3tto opened a new pull request, #21765: [BEAM-14073] [CdapIO] CDAP IO for batch plugins: Read, Write. Unit tests included

Amar3tto opened a new pull request, #21765:
URL: https://github.com/apache/beam/pull/21765

   The Cdap package contains an I/O connector for [CDAP](https://cdap.atlassian.net/wiki/home) sources and sinks.
   `CdapIO` is the primary class used when working with the connector in a pipeline. It contains static `PTransform` classes that represent reads and writes within the I/O (e.g. `Read`, `Write`).
   
   ![package drawio (1)](https://user-images.githubusercontent.com/17278361/157455025-1a58eb9d-0930-4a83-aaa2-9c226fe0281d.png)
   
   The `CdapIO.<~>read()` method starts constructing a `CdapIO.Read` transform. It receives type parameters `<~>` that specify the output type of the CDAP source.
   The `Read` class follows the builder pattern and provides simple `withX()` methods for passing the configuration and other settings needed before reading from a CDAP source (see the usage sketch after this list):
   - `withCdapPluginClass()` - pass a source/sink plugin class marked with the CDAP `@Plugin` annotation (e.g. `ZendeskBatchSource.class`)
   - `withPluginConfig()` - pass a corresponding `PluginConfig` object (e.g. a `ZendeskBatchSourceConfig` object)
   - `withKeyClass()` - specify the key class for the output (e.g. `NullWritable.class`)
   - `withValueClass()` - specify the value class for the output (e.g. `StructuredRecord.class`)
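
   A minimal usage sketch of the `Read` builder described above. The pipeline and class names (`CdapReadExample`, `buildZendeskConfig`) are illustrative and not part of this PR, and it is assumed here that `ZendeskBatchSourceConfig` extends `PluginConfig` and lives alongside `ZendeskBatchSource`:

   ```java
   import io.cdap.cdap.api.data.format.StructuredRecord;
   import io.cdap.plugin.zendesk.source.batch.ZendeskBatchSource;
   import io.cdap.plugin.zendesk.source.batch.ZendeskBatchSourceConfig;
   import org.apache.beam.sdk.Pipeline;
   import org.apache.beam.sdk.io.cdap.CdapIO;
   import org.apache.beam.sdk.values.KV;
   import org.apache.beam.sdk.values.PCollection;
   import org.apache.hadoop.io.NullWritable;

   public class CdapReadExample {
     public static void main(String[] args) {
       Pipeline p = Pipeline.create();

       // Hypothetical helper that returns an already-populated plugin config;
       // how the config gets built is outside the scope of this PR description.
       ZendeskBatchSourceConfig zendeskConfig = buildZendeskConfig();

       PCollection<KV<NullWritable, StructuredRecord>> rows =
           p.apply(
               CdapIO.<NullWritable, StructuredRecord>read()
                   .withCdapPluginClass(ZendeskBatchSource.class)
                   .withPluginConfig(zendeskConfig)
                   .withKeyClass(NullWritable.class)
                   .withValueClass(StructuredRecord.class));

       p.run().waitUntilFinish();
     }

     private static ZendeskBatchSourceConfig buildZendeskConfig() {
       throw new UnsupportedOperationException("Provide a real ZendeskBatchSourceConfig here");
     }
   }
   ```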
   
   Inside the `withCdapPluginClass()` method, a `CdapPlugin` object is constructed. The `CdapPlugin` object contains the following information about the CDAP plugin:
   - Bounded/Unbounded
   - Input/Output
   - `BatchContextImpl` object for stubbing some CDAP-specific behavior
   - Format / Key / Value classes needed for filling Hadoop `Configuration` object
   - `T cdapPluginObj` for performing `prepareRun()` call before reading
   
   The following steps are performed in the `Read.expand()` method:
   - Validate that all configs and other required fields are set
   - Call the `CdapPlugin.prepareRun()` method before reading (see its Javadoc)
   - Construct the needed `Read` transform depending on whether the `CdapPlugin` is bounded or unbounded:
     - For bounded sources, the `HadoopFormatIO.<K, V>read()` method is called with the filled Hadoop `Configuration` object.
     - For unbounded sources, the `SparkReceiverIO.<~>read()` method is called.
   
   ![156-seq drawio(2)](https://user-images.githubusercontent.com/17278361/155947282-342fa263-66c4-45f9-9ac1-2863f563138e.png)
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
    - [ ] Add a link to the appropriate issue in your description, if applicable. This will automatically link the pull request to the issue.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   To check the build health, please visit [https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md](https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md)
   
   GitHub Actions Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   [![Build python source distribution and wheels](https://github.com/apache/beam/workflows/Build%20python%20source%20distribution%20and%20wheels/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Build+python+source+distribution+and+wheels%22+branch%3Amaster+event%3Aschedule)
   [![Python tests](https://github.com/apache/beam/workflows/Python%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Python+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Java tests](https://github.com/apache/beam/workflows/Java%20Tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Java+Tests%22+branch%3Amaster+event%3Aschedule)
   
   See [CI.md](https://github.com/apache/beam/blob/master/CI.md) for more information about GitHub Actions CI.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] Amar3tto commented on pull request #21765: [BEAM-14073] [CdapIO] CDAP IO for batch plugins: Read, Write. Unit tests included

Posted by GitBox <gi...@apache.org>.
Amar3tto commented on PR #21765:
URL: https://github.com/apache/beam/pull/21765#issuecomment-1171035501

   Run Python PreCommit




[GitHub] [beam] Amar3tto commented on pull request #21765: [BEAM-14073] [CdapIO] CDAP IO for batch plugins: Read, Write. Unit tests included

Posted by GitBox <gi...@apache.org>.
Amar3tto commented on PR #21765:
URL: https://github.com/apache/beam/pull/21765#issuecomment-1165536257

   Run Java PreCommit




[GitHub] [beam] Amar3tto commented on a diff in pull request #21765: [BEAM-14073] [CdapIO] CDAP IO for batch plugins: Read, Write. Unit tests included

Posted by GitBox <gi...@apache.org>.
Amar3tto commented on code in PR #21765:
URL: https://github.com/apache/beam/pull/21765#discussion_r897896655


##########
sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/CdapIO.java:
##########
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.cdap;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import io.cdap.cdap.api.plugin.PluginConfig;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.io.hadoop.format.HDFSSynchronization;
+import org.apache.beam.sdk.io.hadoop.format.HadoopFormatIO;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.hadoop.conf.Configuration;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * An unbounded/bounded sources and sinks from <a
+ * href="https://github.com/data-integrations">CDAP</a> plugins.
+ */
+@SuppressWarnings({
+  "argument.type.incompatible",
+  "return.type.incompatible",
+  "dereference.of.nullable",
+  "UnnecessaryParentheses"
+})
+public class CdapIO {
+
+  public static <K, V> Read<K, V> read() {
+    return new AutoValue_CdapIO_Read.Builder<K, V>().build();
+  }
+
+  public static <K, V> Write<K, V> write() {
+    return new AutoValue_CdapIO_Write.Builder<K, V>().build();
+  }
+
+  /** A {@link PTransform} to read from CDAP source. */
+  @AutoValue
+  @AutoValue.CopyAnnotations
+  public abstract static class Read<K, V> extends PTransform<PBegin, PCollection<KV<K, V>>> {
+
+    abstract @Nullable PluginConfig getPluginConfig();
+
+    abstract @Nullable Plugin getCdapPlugin();
+
+    abstract @Nullable Class<K> getKeyClass();
+
+    abstract @Nullable Class<V> getValueClass();
+
+    abstract Builder<K, V> toBuilder();
+
+    @Experimental(Experimental.Kind.PORTABILITY)
+    @AutoValue.Builder
+    abstract static class Builder<K, V> {
+
+      abstract Builder<K, V> setPluginConfig(PluginConfig config);
+
+      abstract Builder<K, V> setCdapPlugin(Plugin plugin);
+
+      abstract Builder<K, V> setKeyClass(Class<K> keyClass);
+
+      abstract Builder<K, V> setValueClass(Class<V> valueClass);
+
+      abstract Read<K, V> build();
+    }
+
+    public Read<K, V> withCdapPlugin(Plugin plugin) {
+      checkArgument(plugin != null, "Cdap plugin can not be null");
+      return toBuilder().setCdapPlugin(plugin).build();
+    }
+
+    public Read<K, V> withCdapPluginClass(Class<?> cdapPluginClass) {
+      checkArgument(cdapPluginClass != null, "Cdap plugin class can not be null");
+      Plugin plugin = MappingUtils.getPluginByClass(cdapPluginClass);
+      checkArgument(plugin != null, "Can not instantiate Plugin object from provided plugin class");
+      return toBuilder().setCdapPlugin(plugin).build();
+    }
+
+    public Read<K, V> withPluginConfig(PluginConfig pluginConfig) {
+      checkArgument(pluginConfig != null, "Plugin config can not be null");
+      return toBuilder().setPluginConfig(pluginConfig).build();
+    }
+
+    public Read<K, V> withKeyClass(Class<K> keyClass) {
+      checkArgument(keyClass != null, "Key class can not be null");
+      return toBuilder().setKeyClass(keyClass).build();
+    }
+
+    public Read<K, V> withValueClass(Class<V> valueClass) {
+      checkArgument(valueClass != null, "Value class can not be null");
+      return toBuilder().setValueClass(valueClass).build();
+    }
+
+    @Override
+    public PCollection<KV<K, V>> expand(PBegin input) {
+      validateTransform();
+
+      getCdapPlugin()
+          .withConfig(getPluginConfig())
+          .withHadoopConfiguration(getKeyClass(), getValueClass())
+          .prepareRun();
+
+      if (getCdapPlugin().isUnbounded()) {
+        // TODO: implement SparkReceiverIO.<~>read()
+        return null;
+      } else {
+        Configuration hConf = getCdapPlugin().getHadoopConfiguration();
+        HadoopFormatIO.Read<K, V> readFromHadoop =
+            HadoopFormatIO.<K, V>read().withConfiguration(hConf);
+        return input.apply(readFromHadoop);
+      }
+    }
+
+    public void validateTransform() {
+      checkArgument(getCdapPlugin() != null, "withCdapPluginClass() is required");
+      checkArgument(getPluginConfig() != null, "withPluginConfig() is required");
+      checkArgument(getKeyClass() != null, "withKeyClass() is required");
+      checkArgument(getValueClass() != null, "withValueClass() is required");
+    }
+  }
+
+  /** A {@link PTransform} to read from CDAP source. */
+  @AutoValue
+  @AutoValue.CopyAnnotations
+  public abstract static class Write<K, V> extends PTransform<PCollection<KV<K, V>>, PDone> {
+
+    abstract @Nullable PluginConfig getPluginConfig();
+
+    abstract @Nullable Plugin getCdapPlugin();
+
+    abstract @Nullable Class<K> getKeyClass();
+
+    abstract @Nullable Class<V> getValueClass();
+
+    abstract @Nullable String getLocksDirPath();
+
+    abstract Builder<K, V> toBuilder();
+
+    @Experimental(Experimental.Kind.PORTABILITY)
+    @AutoValue.Builder
+    abstract static class Builder<K, V> {
+
+      abstract Builder<K, V> setPluginConfig(PluginConfig config);
+
+      abstract Builder<K, V> setCdapPlugin(Plugin plugin);
+
+      abstract Builder<K, V> setKeyClass(Class<K> keyClass);
+
+      abstract Builder<K, V> setValueClass(Class<V> valueClass);
+
+      abstract Builder<K, V> setLocksDirPath(String path);
+
+      abstract Write<K, V> build();
+    }
+
+    public Write<K, V> withCdapPlugin(Plugin plugin) {
+      checkArgument(plugin != null, "Cdap plugin can not be null");

Review Comment:
   Done





[GitHub] [beam] Amar3tto commented on pull request #21765: [BEAM-14073] [CdapIO] CDAP IO for batch plugins: Read, Write. Unit tests included

Posted by GitBox <gi...@apache.org>.
Amar3tto commented on PR #21765:
URL: https://github.com/apache/beam/pull/21765#issuecomment-1165290020

   Run Java PreCommit




[GitHub] [beam] Amar3tto commented on a diff in pull request #21765: [BEAM-14073] [CdapIO] CDAP IO for batch plugins: Read, Write. Unit tests included

Posted by GitBox <gi...@apache.org>.
Amar3tto commented on code in PR #21765:
URL: https://github.com/apache/beam/pull/21765#discussion_r897900118


##########
sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/MappingUtils.java:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.cdap;
+
+import io.cdap.plugin.common.SourceInputFormatProvider;
+import io.cdap.plugin.hubspot.sink.batch.HubspotBatchSink;
+import io.cdap.plugin.hubspot.sink.batch.HubspotOutputFormat;
+import io.cdap.plugin.hubspot.source.batch.HubspotBatchSource;
+import io.cdap.plugin.hubspot.source.batch.HubspotInputFormat;
+import io.cdap.plugin.hubspot.source.batch.HubspotInputFormatProvider;
+import io.cdap.plugin.salesforce.plugin.source.batch.SalesforceBatchSource;
+import io.cdap.plugin.salesforce.plugin.source.batch.SalesforceInputFormat;
+import io.cdap.plugin.salesforce.plugin.source.batch.SalesforceInputFormatProvider;
+import io.cdap.plugin.servicenow.source.ServiceNowInputFormat;
+import io.cdap.plugin.servicenow.source.ServiceNowSource;
+import io.cdap.plugin.zendesk.source.batch.ZendeskBatchSource;
+import io.cdap.plugin.zendesk.source.batch.ZendeskInputFormat;
+import io.cdap.plugin.zendesk.source.batch.ZendeskInputFormatProvider;
+import javax.annotation.Nullable;
+
+public class MappingUtils {
+
+  public static @Nullable Plugin getPluginByClass(Class<?> pluginClass) {
+    if (pluginClass.equals(SalesforceBatchSource.class)) {

Review Comment:
   Good question,
   Currently we have `withCdapPlugin(Plugin plugin)` and `withCdapPluginClass(Class<?> cdapPluginClass)` methods.
   So this static method is needed only for the second case.





[GitHub] [beam] Amar3tto commented on a diff in pull request #21765: [BEAM-14073] [CdapIO] CDAP IO for batch plugins: Read, Write. Unit tests included

Posted by GitBox <gi...@apache.org>.
Amar3tto commented on code in PR #21765:
URL: https://github.com/apache/beam/pull/21765#discussion_r905394971


##########
sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/context/BatchSinkContextImpl.java:
##########
@@ -24,7 +24,10 @@
 public class BatchSinkContextImpl extends BatchContextImpl implements BatchSinkContext {
 
   @Override
-  public void addOutput(Output output) {}
+  public void addOutput(Output output) {

Review Comment:
   Done





[GitHub] [beam] asf-ci commented on pull request #21765: [BEAM-14073] [CdapIO] CDAP IO for batch plugins: Read, Write. Unit tests included

Posted by GitBox <gi...@apache.org>.
asf-ci commented on PR #21765:
URL: https://github.com/apache/beam/pull/21765#issuecomment-1151058089

   Can one of the admins verify this patch?




[GitHub] [beam] Amar3tto commented on a diff in pull request #21765: [BEAM-14073] [CdapIO] CDAP IO for batch plugins: Read, Write. Unit tests included

Posted by GitBox <gi...@apache.org>.
Amar3tto commented on code in PR #21765:
URL: https://github.com/apache/beam/pull/21765#discussion_r897862777


##########
sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/CdapIO.java:
##########
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.cdap;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import io.cdap.cdap.api.plugin.PluginConfig;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.io.hadoop.format.HDFSSynchronization;
+import org.apache.beam.sdk.io.hadoop.format.HadoopFormatIO;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.hadoop.conf.Configuration;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * An unbounded/bounded sources and sinks from <a
+ * href="https://github.com/data-integrations">CDAP</a> plugins.
+ */
+@SuppressWarnings({
+  "argument.type.incompatible",
+  "return.type.incompatible",
+  "dereference.of.nullable",
+  "UnnecessaryParentheses"
+})
+public class CdapIO {
+
+  public static <K, V> Read<K, V> read() {
+    return new AutoValue_CdapIO_Read.Builder<K, V>().build();
+  }
+
+  public static <K, V> Write<K, V> write() {
+    return new AutoValue_CdapIO_Write.Builder<K, V>().build();
+  }
+
+  /** A {@link PTransform} to read from CDAP source. */
+  @AutoValue
+  @AutoValue.CopyAnnotations
+  public abstract static class Read<K, V> extends PTransform<PBegin, PCollection<KV<K, V>>> {
+
+    abstract @Nullable PluginConfig getPluginConfig();
+
+    abstract @Nullable Plugin getCdapPlugin();
+
+    abstract @Nullable Class<K> getKeyClass();
+
+    abstract @Nullable Class<V> getValueClass();
+
+    abstract Builder<K, V> toBuilder();
+
+    @Experimental(Experimental.Kind.PORTABILITY)
+    @AutoValue.Builder
+    abstract static class Builder<K, V> {
+
+      abstract Builder<K, V> setPluginConfig(PluginConfig config);
+
+      abstract Builder<K, V> setCdapPlugin(Plugin plugin);
+
+      abstract Builder<K, V> setKeyClass(Class<K> keyClass);
+
+      abstract Builder<K, V> setValueClass(Class<V> valueClass);
+
+      abstract Read<K, V> build();
+    }
+
+    public Read<K, V> withCdapPlugin(Plugin plugin) {
+      checkArgument(plugin != null, "Cdap plugin can not be null");

Review Comment:
   This was written the same way as in HadoopFormatIO, JdbcIO and other IOs.
   Do you think we should change it everywhere, only here, or leave it as it is?





[GitHub] [beam] Amar3tto commented on pull request #21765: [BEAM-14073] [CdapIO] CDAP IO for batch plugins: Read, Write. Unit tests included

Posted by GitBox <gi...@apache.org>.
Amar3tto commented on PR #21765:
URL: https://github.com/apache/beam/pull/21765#issuecomment-1158765751

   Run Java PreCommit




[GitHub] [beam] Amar3tto commented on a diff in pull request #21765: [BEAM-14073] [CdapIO] CDAP IO for batch plugins: Read, Write. Unit tests included

Posted by GitBox <gi...@apache.org>.
Amar3tto commented on code in PR #21765:
URL: https://github.com/apache/beam/pull/21765#discussion_r905394698


##########
sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/MappingUtils.java:
##########
@@ -49,6 +52,6 @@ public class MappingUtils {
       return Plugin.create(
           pluginClass, ServiceNowInputFormat.class, SourceInputFormatProvider.class);
     }
-    return null;
+    throw new UnsupportedOperationException("Given plugin class is not supported!");

Review Comment:
   Done





[GitHub] [beam] Amar3tto commented on pull request #21765: [BEAM-14073] [CdapIO] CDAP IO for batch plugins: Read, Write. Unit tests included

Posted by GitBox <gi...@apache.org>.
Amar3tto commented on PR #21765:
URL: https://github.com/apache/beam/pull/21765#issuecomment-1165231487

   Run Java PreCommit




[GitHub] [beam] Amar3tto commented on pull request #21765: [BEAM-14073] [CdapIO] CDAP IO for batch plugins: Read, Write. Unit tests included

Posted by GitBox <gi...@apache.org>.
Amar3tto commented on PR #21765:
URL: https://github.com/apache/beam/pull/21765#issuecomment-1165669004

   Run Java PreCommit




[GitHub] [beam] pabloem commented on pull request #21765: [BEAM-14073] [CdapIO] CDAP IO for batch plugins: Read, Write. Unit tests included

Posted by GitBox <gi...@apache.org>.
pabloem commented on PR #21765:
URL: https://github.com/apache/beam/pull/21765#issuecomment-1170135135

   uggg I don't know why the PR got closed by me just now




[GitHub] [beam] johnjcasey commented on a diff in pull request #21765: [BEAM-14073] [CdapIO] CDAP IO for batch plugins: Read, Write. Unit tests included

Posted by GitBox <gi...@apache.org>.
johnjcasey commented on code in PR #21765:
URL: https://github.com/apache/beam/pull/21765#discussion_r896908439


##########
buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy:
##########
@@ -630,6 +631,7 @@ class BeamModulePlugin implements Plugin<Project> {
         guava_testlib                               : "com.google.guava:guava-testlib:$guava_version",
         hadoop_client                               : "org.apache.hadoop:hadoop-client:$hadoop_version",
         hadoop_common                               : "org.apache.hadoop:hadoop-common:$hadoop_version",
+        hadoop_mapred                               : "org.apache.hadoop:hadoop-mapred:0.22.0",

Review Comment:
   should this be $hadoop_version instead of a hardcoded value?



##########
sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/CdapIO.java:
##########
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.cdap;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import io.cdap.cdap.api.plugin.PluginConfig;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.io.hadoop.format.HDFSSynchronization;
+import org.apache.beam.sdk.io.hadoop.format.HadoopFormatIO;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.hadoop.conf.Configuration;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * An unbounded/bounded sources and sinks from <a
+ * href="https://github.com/data-integrations">CDAP</a> plugins.
+ */
+@SuppressWarnings({
+  "argument.type.incompatible",
+  "return.type.incompatible",
+  "dereference.of.nullable",
+  "UnnecessaryParentheses"
+})
+public class CdapIO {
+
+  public static <K, V> Read<K, V> read() {
+    return new AutoValue_CdapIO_Read.Builder<K, V>().build();
+  }
+
+  public static <K, V> Write<K, V> write() {
+    return new AutoValue_CdapIO_Write.Builder<K, V>().build();
+  }
+
+  /** A {@link PTransform} to read from CDAP source. */
+  @AutoValue
+  @AutoValue.CopyAnnotations
+  public abstract static class Read<K, V> extends PTransform<PBegin, PCollection<KV<K, V>>> {
+
+    abstract @Nullable PluginConfig getPluginConfig();
+
+    abstract @Nullable Plugin getCdapPlugin();
+
+    abstract @Nullable Class<K> getKeyClass();
+
+    abstract @Nullable Class<V> getValueClass();
+
+    abstract Builder<K, V> toBuilder();
+
+    @Experimental(Experimental.Kind.PORTABILITY)
+    @AutoValue.Builder
+    abstract static class Builder<K, V> {
+
+      abstract Builder<K, V> setPluginConfig(PluginConfig config);
+
+      abstract Builder<K, V> setCdapPlugin(Plugin plugin);
+
+      abstract Builder<K, V> setKeyClass(Class<K> keyClass);
+
+      abstract Builder<K, V> setValueClass(Class<V> valueClass);
+
+      abstract Read<K, V> build();
+    }
+
+    public Read<K, V> withCdapPlugin(Plugin plugin) {
+      checkArgument(plugin != null, "Cdap plugin can not be null");

Review Comment:
   by convention, we are using Preconditions.checkNotNull for these types of checks
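
   For reference, the suggested convention would look roughly like the fragment below; this is a sketch only, assuming a static import of the vendored Guava `Preconditions.checkNotNull` (the exact `Preconditions` class to standardize on is left to the reviewers):

   ```java
   public Read<K, V> withCdapPlugin(Plugin plugin) {
     // Throws NullPointerException with the given message if the plugin is null,
     // instead of the IllegalArgumentException thrown by checkArgument.
     checkNotNull(plugin, "Cdap plugin can not be null");
     return toBuilder().setCdapPlugin(plugin).build();
   }
   ```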



##########
sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/CdapIO.java:
##########
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.cdap;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import io.cdap.cdap.api.plugin.PluginConfig;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.io.hadoop.format.HDFSSynchronization;
+import org.apache.beam.sdk.io.hadoop.format.HadoopFormatIO;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.hadoop.conf.Configuration;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * An unbounded/bounded sources and sinks from <a
+ * href="https://github.com/data-integrations">CDAP</a> plugins.
+ */
+@SuppressWarnings({
+  "argument.type.incompatible",
+  "return.type.incompatible",
+  "dereference.of.nullable",
+  "UnnecessaryParentheses"
+})
+public class CdapIO {
+
+  public static <K, V> Read<K, V> read() {
+    return new AutoValue_CdapIO_Read.Builder<K, V>().build();
+  }
+
+  public static <K, V> Write<K, V> write() {
+    return new AutoValue_CdapIO_Write.Builder<K, V>().build();
+  }
+
+  /** A {@link PTransform} to read from CDAP source. */
+  @AutoValue
+  @AutoValue.CopyAnnotations
+  public abstract static class Read<K, V> extends PTransform<PBegin, PCollection<KV<K, V>>> {
+
+    abstract @Nullable PluginConfig getPluginConfig();
+
+    abstract @Nullable Plugin getCdapPlugin();
+
+    abstract @Nullable Class<K> getKeyClass();
+
+    abstract @Nullable Class<V> getValueClass();
+
+    abstract Builder<K, V> toBuilder();
+
+    @Experimental(Experimental.Kind.PORTABILITY)
+    @AutoValue.Builder
+    abstract static class Builder<K, V> {
+
+      abstract Builder<K, V> setPluginConfig(PluginConfig config);
+
+      abstract Builder<K, V> setCdapPlugin(Plugin plugin);
+
+      abstract Builder<K, V> setKeyClass(Class<K> keyClass);
+
+      abstract Builder<K, V> setValueClass(Class<V> valueClass);
+
+      abstract Read<K, V> build();
+    }
+
+    public Read<K, V> withCdapPlugin(Plugin plugin) {
+      checkArgument(plugin != null, "Cdap plugin can not be null");
+      return toBuilder().setCdapPlugin(plugin).build();
+    }
+
+    public Read<K, V> withCdapPluginClass(Class<?> cdapPluginClass) {
+      checkArgument(cdapPluginClass != null, "Cdap plugin class can not be null");
+      Plugin plugin = MappingUtils.getPluginByClass(cdapPluginClass);
+      checkArgument(plugin != null, "Can not instantiate Plugin object from provided plugin class");
+      return toBuilder().setCdapPlugin(plugin).build();
+    }
+
+    public Read<K, V> withPluginConfig(PluginConfig pluginConfig) {
+      checkArgument(pluginConfig != null, "Plugin config can not be null");
+      return toBuilder().setPluginConfig(pluginConfig).build();
+    }
+
+    public Read<K, V> withKeyClass(Class<K> keyClass) {
+      checkArgument(keyClass != null, "Key class can not be null");
+      return toBuilder().setKeyClass(keyClass).build();
+    }
+
+    public Read<K, V> withValueClass(Class<V> valueClass) {
+      checkArgument(valueClass != null, "Value class can not be null");
+      return toBuilder().setValueClass(valueClass).build();
+    }
+
+    @Override
+    public PCollection<KV<K, V>> expand(PBegin input) {
+      validateTransform();
+
+      getCdapPlugin()
+          .withConfig(getPluginConfig())
+          .withHadoopConfiguration(getKeyClass(), getValueClass())
+          .prepareRun();
+
+      if (getCdapPlugin().isUnbounded()) {
+        // TODO: implement SparkReceiverIO.<~>read()

Review Comment:
   This should probably throw an exception for now, to prevent confusion
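
   A small sketch of that change in `Read.expand()`; the exception message is illustrative:

   ```java
   if (getCdapPlugin().isUnbounded()) {
     // Fail fast until SparkReceiverIO-based reading is implemented,
     // rather than silently returning null.
     throw new UnsupportedOperationException(
         "Reading from unbounded CDAP plugins is not supported yet");
   }
   ```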



##########
sdks/java/io/cdap/build.gradle:
##########
@@ -38,26 +38,35 @@ interface for integration with CDAP plugins."""
  */
 
 dependencies {
-    implementation library.java.avro
     implementation library.java.cdap_api
     implementation library.java.cdap_api_commons
     implementation (library.java.cdap_common) {
-        exclude module: "log4j-over-slf4j"
+        exclude module: 'log4j-over-slf4j'
     }
     implementation library.java.cdap_etl_api
     implementation library.java.cdap_etl_api_spark
+    implementation library.java.cdap_hydrator_common
+    implementation library.java.guava
     implementation library.java.hadoop_common
-    implementation library.java.hadoop_mapreduce_client_core
+    implementation library.java.hadoop_mapred
     implementation library.java.jackson_core
     implementation library.java.jackson_databind
-    implementation library.java.guava
     implementation library.java.slf4j_api
     implementation library.java.tephra
+    implementation library.java.vendored_guava_26_0_jre
     implementation project(path: ":sdks:java:core", configuration: "shadow")
+    implementation project(":sdks:java:io:hadoop-format")
+    implementation "io.cdap.plugin:salesforce-plugins:1.4.0"

Review Comment:
   should these be moved up to the beam module plugin?



##########
sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/CdapIO.java:
##########
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.cdap;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import io.cdap.cdap.api.plugin.PluginConfig;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.io.hadoop.format.HDFSSynchronization;
+import org.apache.beam.sdk.io.hadoop.format.HadoopFormatIO;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.hadoop.conf.Configuration;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * An unbounded/bounded sources and sinks from <a
+ * href="https://github.com/data-integrations">CDAP</a> plugins.
+ */
+@SuppressWarnings({
+  "argument.type.incompatible",
+  "return.type.incompatible",
+  "dereference.of.nullable",
+  "UnnecessaryParentheses"
+})
+public class CdapIO {
+
+  public static <K, V> Read<K, V> read() {
+    return new AutoValue_CdapIO_Read.Builder<K, V>().build();
+  }
+
+  public static <K, V> Write<K, V> write() {
+    return new AutoValue_CdapIO_Write.Builder<K, V>().build();
+  }
+
+  /** A {@link PTransform} to read from CDAP source. */
+  @AutoValue
+  @AutoValue.CopyAnnotations
+  public abstract static class Read<K, V> extends PTransform<PBegin, PCollection<KV<K, V>>> {
+
+    abstract @Nullable PluginConfig getPluginConfig();
+
+    abstract @Nullable Plugin getCdapPlugin();
+
+    abstract @Nullable Class<K> getKeyClass();
+
+    abstract @Nullable Class<V> getValueClass();
+
+    abstract Builder<K, V> toBuilder();
+
+    @Experimental(Experimental.Kind.PORTABILITY)
+    @AutoValue.Builder
+    abstract static class Builder<K, V> {
+
+      abstract Builder<K, V> setPluginConfig(PluginConfig config);
+
+      abstract Builder<K, V> setCdapPlugin(Plugin plugin);
+
+      abstract Builder<K, V> setKeyClass(Class<K> keyClass);
+
+      abstract Builder<K, V> setValueClass(Class<V> valueClass);
+
+      abstract Read<K, V> build();
+    }
+
+    public Read<K, V> withCdapPlugin(Plugin plugin) {
+      checkArgument(plugin != null, "Cdap plugin can not be null");
+      return toBuilder().setCdapPlugin(plugin).build();
+    }
+
+    public Read<K, V> withCdapPluginClass(Class<?> cdapPluginClass) {
+      checkArgument(cdapPluginClass != null, "Cdap plugin class can not be null");
+      Plugin plugin = MappingUtils.getPluginByClass(cdapPluginClass);
+      checkArgument(plugin != null, "Can not instantiate Plugin object from provided plugin class");
+      return toBuilder().setCdapPlugin(plugin).build();
+    }
+
+    public Read<K, V> withPluginConfig(PluginConfig pluginConfig) {
+      checkArgument(pluginConfig != null, "Plugin config can not be null");
+      return toBuilder().setPluginConfig(pluginConfig).build();
+    }
+
+    public Read<K, V> withKeyClass(Class<K> keyClass) {
+      checkArgument(keyClass != null, "Key class can not be null");
+      return toBuilder().setKeyClass(keyClass).build();
+    }
+
+    public Read<K, V> withValueClass(Class<V> valueClass) {
+      checkArgument(valueClass != null, "Value class can not be null");
+      return toBuilder().setValueClass(valueClass).build();
+    }
+
+    @Override
+    public PCollection<KV<K, V>> expand(PBegin input) {
+      validateTransform();
+
+      getCdapPlugin()
+          .withConfig(getPluginConfig())
+          .withHadoopConfiguration(getKeyClass(), getValueClass())
+          .prepareRun();
+
+      if (getCdapPlugin().isUnbounded()) {
+        // TODO: implement SparkReceiverIO.<~>read()
+        return null;
+      } else {
+        Configuration hConf = getCdapPlugin().getHadoopConfiguration();
+        HadoopFormatIO.Read<K, V> readFromHadoop =
+            HadoopFormatIO.<K, V>read().withConfiguration(hConf);
+        return input.apply(readFromHadoop);
+      }
+    }
+
+    public void validateTransform() {
+      checkArgument(getCdapPlugin() != null, "withCdapPluginClass() is required");
+      checkArgument(getPluginConfig() != null, "withPluginConfig() is required");
+      checkArgument(getKeyClass() != null, "withKeyClass() is required");
+      checkArgument(getValueClass() != null, "withValueClass() is required");
+    }
+  }
+
+  /** A {@link PTransform} to read from CDAP source. */
+  @AutoValue
+  @AutoValue.CopyAnnotations
+  public abstract static class Write<K, V> extends PTransform<PCollection<KV<K, V>>, PDone> {
+
+    abstract @Nullable PluginConfig getPluginConfig();
+
+    abstract @Nullable Plugin getCdapPlugin();
+
+    abstract @Nullable Class<K> getKeyClass();
+
+    abstract @Nullable Class<V> getValueClass();
+
+    abstract @Nullable String getLocksDirPath();
+
+    abstract Builder<K, V> toBuilder();
+
+    @Experimental(Experimental.Kind.PORTABILITY)
+    @AutoValue.Builder
+    abstract static class Builder<K, V> {
+
+      abstract Builder<K, V> setPluginConfig(PluginConfig config);
+
+      abstract Builder<K, V> setCdapPlugin(Plugin plugin);
+
+      abstract Builder<K, V> setKeyClass(Class<K> keyClass);
+
+      abstract Builder<K, V> setValueClass(Class<V> valueClass);
+
+      abstract Builder<K, V> setLocksDirPath(String path);
+
+      abstract Write<K, V> build();
+    }
+
+    public Write<K, V> withCdapPlugin(Plugin plugin) {
+      checkArgument(plugin != null, "Cdap plugin can not be null");
+      return toBuilder().setCdapPlugin(plugin).build();
+    }
+
+    public Write<K, V> withCdapPluginClass(Class<?> cdapPluginClass) {
+      checkArgument(cdapPluginClass != null, "Cdap plugin class can not be null");
+      Plugin plugin = MappingUtils.getPluginByClass(cdapPluginClass);
+      checkArgument(plugin != null, "Can not instantiate Plugin object from provided plugin class");
+      return toBuilder().setCdapPlugin(plugin).build();
+    }
+
+    public Write<K, V> withPluginConfig(PluginConfig pluginConfig) {
+      checkArgument(pluginConfig != null, "Plugin config can not be null");
+      return toBuilder().setPluginConfig(pluginConfig).build();
+    }
+
+    public Write<K, V> withKeyClass(Class<K> keyClass) {
+      checkArgument(keyClass != null, "Key class can not be null");
+      return toBuilder().setKeyClass(keyClass).build();
+    }
+
+    public Write<K, V> withLocksDirPath(String locksDirPath) {
+      checkArgument(locksDirPath != null, "Locks dir path can not be null");
+      return toBuilder().setLocksDirPath(locksDirPath).build();
+    }
+
+    public Write<K, V> withValueClass(Class<V> valueClass) {
+      checkArgument(valueClass != null, "Value class can not be null");
+      return toBuilder().setValueClass(valueClass).build();
+    }
+
+    @Override
+    public PDone expand(PCollection<KV<K, V>> input) {
+      getCdapPlugin()
+          .withConfig(getPluginConfig())
+          .withHadoopConfiguration(getKeyClass(), getValueClass())
+          .prepareRun();
+
+      if (getCdapPlugin().isUnbounded()) {
+        // TODO: implement SparkReceiverIO.<~>write()

Review Comment:
   same as above



##########
sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/context/BatchSinkContextImpl.java:
##########
@@ -24,7 +24,10 @@
 public class BatchSinkContextImpl extends BatchContextImpl implements BatchSinkContext {
 
   @Override
-  public void addOutput(Output output) {}
+  public void addOutput(Output output) {

Review Comment:
   this probably should be setOutput if it can only be one element
   
   if it is addOutput, it should be some form of a collection of elements
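
   A minimal sketch of the collection-based variant for `BatchSinkContextImpl` (the field name is illustrative and not taken from the PR; assumes the usual `java.util.List`/`ArrayList` imports):

   ```java
   // Accumulates every Output passed to addOutput(), since the method may be called more than once.
   private final List<Output> outputs = new ArrayList<>();

   @Override
   public void addOutput(Output output) {
     outputs.add(output);
   }
   ```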



##########
sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/CdapIO.java:
##########
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.cdap;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import io.cdap.cdap.api.plugin.PluginConfig;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.io.hadoop.format.HDFSSynchronization;
+import org.apache.beam.sdk.io.hadoop.format.HadoopFormatIO;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.hadoop.conf.Configuration;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * An unbounded/bounded sources and sinks from <a
+ * href="https://github.com/data-integrations">CDAP</a> plugins.
+ */
+@SuppressWarnings({
+  "argument.type.incompatible",
+  "return.type.incompatible",
+  "dereference.of.nullable",
+  "UnnecessaryParentheses"
+})
+public class CdapIO {
+
+  public static <K, V> Read<K, V> read() {
+    return new AutoValue_CdapIO_Read.Builder<K, V>().build();
+  }
+
+  public static <K, V> Write<K, V> write() {
+    return new AutoValue_CdapIO_Write.Builder<K, V>().build();
+  }
+
+  /** A {@link PTransform} to read from CDAP source. */
+  @AutoValue
+  @AutoValue.CopyAnnotations
+  public abstract static class Read<K, V> extends PTransform<PBegin, PCollection<KV<K, V>>> {
+
+    abstract @Nullable PluginConfig getPluginConfig();
+
+    abstract @Nullable Plugin getCdapPlugin();
+
+    abstract @Nullable Class<K> getKeyClass();
+
+    abstract @Nullable Class<V> getValueClass();
+
+    abstract Builder<K, V> toBuilder();
+
+    @Experimental(Experimental.Kind.PORTABILITY)
+    @AutoValue.Builder
+    abstract static class Builder<K, V> {
+
+      abstract Builder<K, V> setPluginConfig(PluginConfig config);
+
+      abstract Builder<K, V> setCdapPlugin(Plugin plugin);
+
+      abstract Builder<K, V> setKeyClass(Class<K> keyClass);
+
+      abstract Builder<K, V> setValueClass(Class<V> valueClass);
+
+      abstract Read<K, V> build();
+    }
+
+    public Read<K, V> withCdapPlugin(Plugin plugin) {
+      checkArgument(plugin != null, "Cdap plugin can not be null");
+      return toBuilder().setCdapPlugin(plugin).build();
+    }
+
+    public Read<K, V> withCdapPluginClass(Class<?> cdapPluginClass) {
+      checkArgument(cdapPluginClass != null, "Cdap plugin class can not be null");
+      Plugin plugin = MappingUtils.getPluginByClass(cdapPluginClass);
+      checkArgument(plugin != null, "Can not instantiate Plugin object from provided plugin class");
+      return toBuilder().setCdapPlugin(plugin).build();
+    }
+
+    public Read<K, V> withPluginConfig(PluginConfig pluginConfig) {
+      checkArgument(pluginConfig != null, "Plugin config can not be null");
+      return toBuilder().setPluginConfig(pluginConfig).build();
+    }
+
+    public Read<K, V> withKeyClass(Class<K> keyClass) {
+      checkArgument(keyClass != null, "Key class can not be null");
+      return toBuilder().setKeyClass(keyClass).build();
+    }
+
+    public Read<K, V> withValueClass(Class<V> valueClass) {
+      checkArgument(valueClass != null, "Value class can not be null");
+      return toBuilder().setValueClass(valueClass).build();
+    }
+
+    @Override
+    public PCollection<KV<K, V>> expand(PBegin input) {
+      validateTransform();
+
+      getCdapPlugin()
+          .withConfig(getPluginConfig())
+          .withHadoopConfiguration(getKeyClass(), getValueClass())
+          .prepareRun();
+
+      if (getCdapPlugin().isUnbounded()) {
+        // TODO: implement SparkReceiverIO.<~>read()
+        return null;
+      } else {
+        Configuration hConf = getCdapPlugin().getHadoopConfiguration();
+        HadoopFormatIO.Read<K, V> readFromHadoop =
+            HadoopFormatIO.<K, V>read().withConfiguration(hConf);
+        return input.apply(readFromHadoop);
+      }
+    }
+
+    public void validateTransform() {
+      checkArgument(getCdapPlugin() != null, "withCdapPluginClass() is required");
+      checkArgument(getPluginConfig() != null, "withPluginConfig() is required");
+      checkArgument(getKeyClass() != null, "withKeyClass() is required");
+      checkArgument(getValueClass() != null, "withValueClass() is required");
+    }
+  }
+
+  /** A {@link PTransform} to read from CDAP source. */
+  @AutoValue
+  @AutoValue.CopyAnnotations
+  public abstract static class Write<K, V> extends PTransform<PCollection<KV<K, V>>, PDone> {
+
+    abstract @Nullable PluginConfig getPluginConfig();
+
+    abstract @Nullable Plugin getCdapPlugin();
+
+    abstract @Nullable Class<K> getKeyClass();
+
+    abstract @Nullable Class<V> getValueClass();
+
+    abstract @Nullable String getLocksDirPath();
+
+    abstract Builder<K, V> toBuilder();
+
+    @Experimental(Experimental.Kind.PORTABILITY)
+    @AutoValue.Builder
+    abstract static class Builder<K, V> {
+
+      abstract Builder<K, V> setPluginConfig(PluginConfig config);
+
+      abstract Builder<K, V> setCdapPlugin(Plugin plugin);
+
+      abstract Builder<K, V> setKeyClass(Class<K> keyClass);
+
+      abstract Builder<K, V> setValueClass(Class<V> valueClass);
+
+      abstract Builder<K, V> setLocksDirPath(String path);
+
+      abstract Write<K, V> build();
+    }
+
+    public Write<K, V> withCdapPlugin(Plugin plugin) {
+      checkArgument(plugin != null, "Cdap plugin can not be null");

Review Comment:
   same as above



##########
sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/Plugin.java:
##########
@@ -84,16 +88,34 @@ public PluginConfig getPluginConfig() {
    * validating connection to the CDAP sink/source and performing initial tuning.
    */
   public void prepareRun() {
+    PluginConfig pluginConfig = getPluginConfig();
+    if (pluginConfig == null) {

Review Comment:
   we should use preconditions.checkNotNull here as well



##########
sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/MappingUtils.java:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.cdap;
+
+import io.cdap.plugin.common.SourceInputFormatProvider;
+import io.cdap.plugin.hubspot.sink.batch.HubspotBatchSink;
+import io.cdap.plugin.hubspot.sink.batch.HubspotOutputFormat;
+import io.cdap.plugin.hubspot.source.batch.HubspotBatchSource;
+import io.cdap.plugin.hubspot.source.batch.HubspotInputFormat;
+import io.cdap.plugin.hubspot.source.batch.HubspotInputFormatProvider;
+import io.cdap.plugin.salesforce.plugin.source.batch.SalesforceBatchSource;
+import io.cdap.plugin.salesforce.plugin.source.batch.SalesforceInputFormat;
+import io.cdap.plugin.salesforce.plugin.source.batch.SalesforceInputFormatProvider;
+import io.cdap.plugin.servicenow.source.ServiceNowInputFormat;
+import io.cdap.plugin.servicenow.source.ServiceNowSource;
+import io.cdap.plugin.zendesk.source.batch.ZendeskBatchSource;
+import io.cdap.plugin.zendesk.source.batch.ZendeskInputFormat;
+import io.cdap.plugin.zendesk.source.batch.ZendeskInputFormatProvider;
+import javax.annotation.Nullable;
+
+public class MappingUtils {
+
+  public static @Nullable Plugin getPluginByClass(Class<?> pluginClass) {
+    if (pluginClass.equals(SalesforceBatchSource.class)) {
+      return Plugin.create(
+          pluginClass, SalesforceInputFormat.class, SalesforceInputFormatProvider.class);
+    } else if (pluginClass.equals(HubspotBatchSource.class)) {
+      return Plugin.create(pluginClass, HubspotInputFormat.class, HubspotInputFormatProvider.class);
+    } else if (pluginClass.equals(ZendeskBatchSource.class)) {
+      return Plugin.create(pluginClass, ZendeskInputFormat.class, ZendeskInputFormatProvider.class);
+    } else if (pluginClass.equals(HubspotBatchSink.class)) {
+      return Plugin.create(pluginClass, HubspotOutputFormat.class, SourceInputFormatProvider.class);
+    } else if (pluginClass.equals(ServiceNowSource.class)) {
+      return Plugin.create(
+          pluginClass, ServiceNowInputFormat.class, SourceInputFormatProvider.class);
+    }
+    return null;

Review Comment:
   this should throw an exception, not return null



##########
sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/MappingUtils.java:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.cdap;
+
+import io.cdap.plugin.common.SourceInputFormatProvider;
+import io.cdap.plugin.hubspot.sink.batch.HubspotBatchSink;
+import io.cdap.plugin.hubspot.sink.batch.HubspotOutputFormat;
+import io.cdap.plugin.hubspot.source.batch.HubspotBatchSource;
+import io.cdap.plugin.hubspot.source.batch.HubspotInputFormat;
+import io.cdap.plugin.hubspot.source.batch.HubspotInputFormatProvider;
+import io.cdap.plugin.salesforce.plugin.source.batch.SalesforceBatchSource;
+import io.cdap.plugin.salesforce.plugin.source.batch.SalesforceInputFormat;
+import io.cdap.plugin.salesforce.plugin.source.batch.SalesforceInputFormatProvider;
+import io.cdap.plugin.servicenow.source.ServiceNowInputFormat;
+import io.cdap.plugin.servicenow.source.ServiceNowSource;
+import io.cdap.plugin.zendesk.source.batch.ZendeskBatchSource;
+import io.cdap.plugin.zendesk.source.batch.ZendeskInputFormat;
+import io.cdap.plugin.zendesk.source.batch.ZendeskInputFormatProvider;
+import javax.annotation.Nullable;
+
+public class MappingUtils {
+
+  public static @Nullable Plugin getPluginByClass(Class<?> pluginClass) {
+    if (pluginClass.equals(SalesforceBatchSource.class)) {

Review Comment:
   Should this be a static method, or is there a way we could expose a method for registering new plugins?
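   One possible shape for that, as a sketch only (the registry field and `registerPlugin` method are hypothetical names, not part of this PR), would be to back the lookup with a map that callers can extend:
   
   ```java
   import java.util.Map;
   import java.util.concurrent.ConcurrentHashMap;
   
   public class MappingUtils {
   
     // Built-in plugins (Salesforce, Hubspot, Zendesk, ServiceNow) could be pre-registered here.
     private static final Map<Class<?>, Plugin> REGISTRY = new ConcurrentHashMap<>();
   
     /** Hypothetical hook allowing users to map additional CDAP plugin classes. */
     public static void registerPlugin(Class<?> pluginClass, Plugin plugin) {
       REGISTRY.put(pluginClass, plugin);
     }
   
     public static Plugin getPluginByClass(Class<?> pluginClass) {
       Plugin plugin = REGISTRY.get(pluginClass);
       if (plugin == null) {
         throw new UnsupportedOperationException("Given plugin class is not supported!");
       }
       return plugin;
     }
   }
   ```
   
   A static method would still work for the built-in mappings; a registry just avoids having to touch `MappingUtils` for every new plugin.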



##########
sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/Plugin.java:
##########
@@ -84,16 +88,34 @@ public PluginConfig getPluginConfig() {
    * validating connection to the CDAP sink/source and performing initial tuning.
    */
   public void prepareRun() {
+    PluginConfig pluginConfig = getPluginConfig();
+    if (pluginConfig == null) {
+      throw new IllegalArgumentException("PluginConfig should be not null!");
+    }
     if (cdapPluginObj == null) {
-      InstantiatorFactory instantiatorFactory = new InstantiatorFactory(false);
-      cdapPluginObj =
-          (SubmitterLifecycle) instantiatorFactory.get(TypeToken.of(getPluginClass())).create();
+      for (Constructor<?> constructor : getPluginClass().getDeclaredConstructors()) {

Review Comment:
   This is odd to me. It seems like we shouldn't be iterating over all of the declared constructors, but only selecting the single constructor that accepts the plugin config.
   
   If the plugin class has multiple constructors, one of them will certainly hit the exception block, even when another constructor is perfectly valid (see the sketch below).
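   A sketch of the suggested alternative, i.e. selecting only the constructor whose single parameter accepts the `PluginConfig` (names follow the diff above; the reflection error handling is illustrative, not the final implementation):
   
   ```java
   Constructor<?> targetConstructor = null;
   for (Constructor<?> constructor : getPluginClass().getDeclaredConstructors()) {
     Class<?>[] parameters = constructor.getParameterTypes();
     // Pick the constructor that takes exactly one argument compatible with the PluginConfig.
     if (parameters.length == 1 && parameters[0].isAssignableFrom(pluginConfig.getClass())) {
       targetConstructor = constructor;
       break;
     }
   }
   if (targetConstructor == null) {
     throw new IllegalStateException(
         "Plugin class " + getPluginClass().getName()
             + " has no constructor accepting " + pluginConfig.getClass().getName());
   }
   try {
     targetConstructor.setAccessible(true);
     cdapPluginObj = (SubmitterLifecycle) targetConstructor.newInstance(pluginConfig);
   } catch (ReflectiveOperationException e) {
     throw new IllegalStateException("Can not instantiate CDAP plugin class", e);
   }
   ```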



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] Amar3tto commented on pull request #21765: [BEAM-14073] [CdapIO] CDAP IO for batch plugins: Read, Write. Unit tests included

Posted by GitBox <gi...@apache.org>.
Amar3tto commented on PR #21765:
URL: https://github.com/apache/beam/pull/21765#issuecomment-1174733527

   Run Java PreCommit


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] johnjcasey commented on a diff in pull request #21765: [BEAM-14073] [CdapIO] CDAP IO for batch plugins: Read, Write. Unit tests included

Posted by GitBox <gi...@apache.org>.
johnjcasey commented on code in PR #21765:
URL: https://github.com/apache/beam/pull/21765#discussion_r899374514


##########
sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/context/BatchSinkContextImpl.java:
##########
@@ -24,7 +24,10 @@
 public class BatchSinkContextImpl extends BatchContextImpl implements BatchSinkContext {
 
   @Override
-  public void addOutput(Output output) {}
+  public void addOutput(Output output) {

Review Comment:
   Hmm, it's probably worth adding a comment here to help mitigate future confusion.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] Amar3tto commented on pull request #21765: [BEAM-14073] [CdapIO] CDAP IO for batch plugins: Read, Write. Unit tests included

Posted by GitBox <gi...@apache.org>.
Amar3tto commented on PR #21765:
URL: https://github.com/apache/beam/pull/21765#issuecomment-1158674936

   Run Java PreCommit


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] Amar3tto commented on pull request #21765: [BEAM-14073] [CdapIO] CDAP IO for batch plugins: Read, Write. Unit tests included

Posted by GitBox <gi...@apache.org>.
Amar3tto commented on PR #21765:
URL: https://github.com/apache/beam/pull/21765#issuecomment-1158675190

   retest this please


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] Amar3tto commented on pull request #21765: [BEAM-14073] [CdapIO] CDAP IO for batch plugins: Read, Write. Unit tests included

Posted by GitBox <gi...@apache.org>.
Amar3tto commented on PR #21765:
URL: https://github.com/apache/beam/pull/21765#issuecomment-1165407871

   Run Java PreCommit


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] Amar3tto commented on pull request #21765: [BEAM-14073] [CdapIO] CDAP IO for batch plugins: Read, Write. Unit tests included

Posted by GitBox <gi...@apache.org>.
Amar3tto commented on PR #21765:
URL: https://github.com/apache/beam/pull/21765#issuecomment-1165447654

   Run Java PreCommit


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] Lizzfox commented on pull request #21765: [BEAM-14073] [CdapIO] CDAP IO for batch plugins: Read, Write. Unit tests included

Posted by GitBox <gi...@apache.org>.
Lizzfox commented on PR #21765:
URL: https://github.com/apache/beam/pull/21765#issuecomment-1165990899

   Run Java PreCommit


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] Amar3tto commented on pull request #21765: [BEAM-14073] [CdapIO] CDAP IO for batch plugins: Read, Write. Unit tests included

Posted by GitBox <gi...@apache.org>.
Amar3tto commented on PR #21765:
URL: https://github.com/apache/beam/pull/21765#issuecomment-1165787879

   Run Java PreCommit


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] Lizzfox commented on pull request #21765: [BEAM-14073] [CdapIO] CDAP IO for batch plugins: Read, Write. Unit tests included

Posted by GitBox <gi...@apache.org>.
Lizzfox commented on PR #21765:
URL: https://github.com/apache/beam/pull/21765#issuecomment-1166057683

   Run Java PreCommit


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] johnjcasey commented on pull request #21765: [BEAM-14073] [CdapIO] CDAP IO for batch plugins: Read, Write. Unit tests included

Posted by GitBox <gi...@apache.org>.
johnjcasey commented on PR #21765:
URL: https://github.com/apache/beam/pull/21765#issuecomment-1170103592

   This LGTM, but I am not a committer. @chamikaramj @pabloem for a review


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] Amar3tto commented on pull request #21765: [BEAM-14073] [CdapIO] CDAP IO for batch plugins: Read, Write. Unit tests included

Posted by GitBox <gi...@apache.org>.
Amar3tto commented on PR #21765:
URL: https://github.com/apache/beam/pull/21765#issuecomment-1184032545

   Run Spotless PreCommit


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] Amar3tto commented on a diff in pull request #21765: [BEAM-14073] [CdapIO] CDAP IO for batch plugins: Read, Write. Unit tests included

Posted by GitBox <gi...@apache.org>.
Amar3tto commented on code in PR #21765:
URL: https://github.com/apache/beam/pull/21765#discussion_r897867972


##########
sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/context/BatchSinkContextImpl.java:
##########
@@ -24,7 +24,10 @@
 public class BatchSinkContextImpl extends BatchContextImpl implements BatchSinkContext {
 
   @Override
-  public void addOutput(Output output) {}
+  public void addOutput(Output output) {

Review Comment:
   This class implements the CDAP `BatchSinkContext` interface (`io.cdap.cdap.etl.api.batch`), so we cannot change the method name.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] Amar3tto commented on a diff in pull request #21765: [BEAM-14073] [CdapIO] CDAP IO for batch plugins: Read, Write. Unit tests included

Posted by GitBox <gi...@apache.org>.
Amar3tto commented on code in PR #21765:
URL: https://github.com/apache/beam/pull/21765#discussion_r897895615


##########
buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy:
##########
@@ -630,6 +631,7 @@ class BeamModulePlugin implements Plugin<Project> {
         guava_testlib                               : "com.google.guava:guava-testlib:$guava_version",
         hadoop_client                               : "org.apache.hadoop:hadoop-client:$hadoop_version",
         hadoop_common                               : "org.apache.hadoop:hadoop-common:$hadoop_version",
+        hadoop_mapred                               : "org.apache.hadoop:hadoop-mapred:0.22.0",

Review Comment:
   Thanks for noticing, I removed that dependency.
   We can use `hadoop_mapreduce_client_core` instead.



##########
sdks/java/io/cdap/build.gradle:
##########
@@ -38,26 +38,35 @@ interface for integration with CDAP plugins."""
  */
 
 dependencies {
-    implementation library.java.avro
     implementation library.java.cdap_api
     implementation library.java.cdap_api_commons
     implementation (library.java.cdap_common) {
-        exclude module: "log4j-over-slf4j"
+        exclude module: 'log4j-over-slf4j'
     }
     implementation library.java.cdap_etl_api
     implementation library.java.cdap_etl_api_spark
+    implementation library.java.cdap_hydrator_common
+    implementation library.java.guava
     implementation library.java.hadoop_common
-    implementation library.java.hadoop_mapreduce_client_core
+    implementation library.java.hadoop_mapred
     implementation library.java.jackson_core
     implementation library.java.jackson_databind
-    implementation library.java.guava
     implementation library.java.slf4j_api
     implementation library.java.tephra
+    implementation library.java.vendored_guava_26_0_jre
     implementation project(path: ":sdks:java:core", configuration: "shadow")
+    implementation project(":sdks:java:io:hadoop-format")
+    implementation "io.cdap.plugin:salesforce-plugins:1.4.0"

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] Amar3tto commented on pull request #21765: [BEAM-14073] [CdapIO] CDAP IO for batch plugins: Read, Write. Unit tests included

Posted by GitBox <gi...@apache.org>.
Amar3tto commented on PR #21765:
URL: https://github.com/apache/beam/pull/21765#issuecomment-1173381811

   Run Python PreCommit


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] Amar3tto commented on pull request #21765: [BEAM-14073] [CdapIO] CDAP IO for batch plugins: Read, Write. Unit tests included

Posted by GitBox <gi...@apache.org>.
Amar3tto commented on PR #21765:
URL: https://github.com/apache/beam/pull/21765#issuecomment-1165460755

   Run Java PreCommit


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] Lizzfox commented on pull request #21765: [BEAM-14073] [CdapIO] CDAP IO for batch plugins: Read, Write. Unit tests included

Posted by GitBox <gi...@apache.org>.
Lizzfox commented on PR #21765:
URL: https://github.com/apache/beam/pull/21765#issuecomment-1165733004

   Run Java PreCommit


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] Lizzfox commented on pull request #21765: [BEAM-14073] [CdapIO] CDAP IO for batch plugins: Read, Write. Unit tests included

Posted by GitBox <gi...@apache.org>.
Lizzfox commented on PR #21765:
URL: https://github.com/apache/beam/pull/21765#issuecomment-1165822405

   Run Java PreCommit


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] pabloem closed pull request #21765: [BEAM-14073] [CdapIO] CDAP IO for batch plugins: Read, Write. Unit tests included

Posted by GitBox <gi...@apache.org>.
pabloem closed pull request #21765: [BEAM-14073] [CdapIO] CDAP IO for batch plugins: Read, Write. Unit tests included
URL: https://github.com/apache/beam/pull/21765


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] Amar3tto commented on pull request #21765: [BEAM-14073] [CdapIO] CDAP IO for batch plugins: Read, Write. Unit tests included

Posted by GitBox <gi...@apache.org>.
Amar3tto commented on PR #21765:
URL: https://github.com/apache/beam/pull/21765#issuecomment-1174636514

   Run GoPortable PreCommit


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] Amar3tto commented on a diff in pull request #21765: [BEAM-14073] [CdapIO] CDAP IO for batch plugins: Read, Write. Unit tests included

Posted by GitBox <gi...@apache.org>.
Amar3tto commented on code in PR #21765:
URL: https://github.com/apache/beam/pull/21765#discussion_r897896281


##########
sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/CdapIO.java:
##########
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.cdap;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import io.cdap.cdap.api.plugin.PluginConfig;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.io.hadoop.format.HDFSSynchronization;
+import org.apache.beam.sdk.io.hadoop.format.HadoopFormatIO;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.hadoop.conf.Configuration;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * An unbounded/bounded sources and sinks from <a
+ * href="https://github.com/data-integrations">CDAP</a> plugins.
+ */
+@SuppressWarnings({
+  "argument.type.incompatible",
+  "return.type.incompatible",
+  "dereference.of.nullable",
+  "UnnecessaryParentheses"
+})
+public class CdapIO {
+
+  public static <K, V> Read<K, V> read() {
+    return new AutoValue_CdapIO_Read.Builder<K, V>().build();
+  }
+
+  public static <K, V> Write<K, V> write() {
+    return new AutoValue_CdapIO_Write.Builder<K, V>().build();
+  }
+
+  /** A {@link PTransform} to read from CDAP source. */
+  @AutoValue
+  @AutoValue.CopyAnnotations
+  public abstract static class Read<K, V> extends PTransform<PBegin, PCollection<KV<K, V>>> {
+
+    abstract @Nullable PluginConfig getPluginConfig();
+
+    abstract @Nullable Plugin getCdapPlugin();
+
+    abstract @Nullable Class<K> getKeyClass();
+
+    abstract @Nullable Class<V> getValueClass();
+
+    abstract Builder<K, V> toBuilder();
+
+    @Experimental(Experimental.Kind.PORTABILITY)
+    @AutoValue.Builder
+    abstract static class Builder<K, V> {
+
+      abstract Builder<K, V> setPluginConfig(PluginConfig config);
+
+      abstract Builder<K, V> setCdapPlugin(Plugin plugin);
+
+      abstract Builder<K, V> setKeyClass(Class<K> keyClass);
+
+      abstract Builder<K, V> setValueClass(Class<V> valueClass);
+
+      abstract Read<K, V> build();
+    }
+
+    public Read<K, V> withCdapPlugin(Plugin plugin) {
+      checkArgument(plugin != null, "Cdap plugin can not be null");
+      return toBuilder().setCdapPlugin(plugin).build();
+    }
+
+    public Read<K, V> withCdapPluginClass(Class<?> cdapPluginClass) {
+      checkArgument(cdapPluginClass != null, "Cdap plugin class can not be null");
+      Plugin plugin = MappingUtils.getPluginByClass(cdapPluginClass);
+      checkArgument(plugin != null, "Can not instantiate Plugin object from provided plugin class");
+      return toBuilder().setCdapPlugin(plugin).build();
+    }
+
+    public Read<K, V> withPluginConfig(PluginConfig pluginConfig) {
+      checkArgument(pluginConfig != null, "Plugin config can not be null");
+      return toBuilder().setPluginConfig(pluginConfig).build();
+    }
+
+    public Read<K, V> withKeyClass(Class<K> keyClass) {
+      checkArgument(keyClass != null, "Key class can not be null");
+      return toBuilder().setKeyClass(keyClass).build();
+    }
+
+    public Read<K, V> withValueClass(Class<V> valueClass) {
+      checkArgument(valueClass != null, "Value class can not be null");
+      return toBuilder().setValueClass(valueClass).build();
+    }
+
+    @Override
+    public PCollection<KV<K, V>> expand(PBegin input) {
+      validateTransform();
+
+      getCdapPlugin()
+          .withConfig(getPluginConfig())
+          .withHadoopConfiguration(getKeyClass(), getValueClass())
+          .prepareRun();
+
+      if (getCdapPlugin().isUnbounded()) {
+        // TODO: implement SparkReceiverIO.<~>read()
+        return null;
+      } else {
+        Configuration hConf = getCdapPlugin().getHadoopConfiguration();
+        HadoopFormatIO.Read<K, V> readFromHadoop =
+            HadoopFormatIO.<K, V>read().withConfiguration(hConf);
+        return input.apply(readFromHadoop);
+      }
+    }
+
+    public void validateTransform() {
+      checkArgument(getCdapPlugin() != null, "withCdapPluginClass() is required");
+      checkArgument(getPluginConfig() != null, "withPluginConfig() is required");
+      checkArgument(getKeyClass() != null, "withKeyClass() is required");
+      checkArgument(getValueClass() != null, "withValueClass() is required");
+    }
+  }
+
+  /** A {@link PTransform} to read from CDAP source. */
+  @AutoValue
+  @AutoValue.CopyAnnotations
+  public abstract static class Write<K, V> extends PTransform<PCollection<KV<K, V>>, PDone> {
+
+    abstract @Nullable PluginConfig getPluginConfig();
+
+    abstract @Nullable Plugin getCdapPlugin();
+
+    abstract @Nullable Class<K> getKeyClass();
+
+    abstract @Nullable Class<V> getValueClass();
+
+    abstract @Nullable String getLocksDirPath();
+
+    abstract Builder<K, V> toBuilder();
+
+    @Experimental(Experimental.Kind.PORTABILITY)
+    @AutoValue.Builder
+    abstract static class Builder<K, V> {
+
+      abstract Builder<K, V> setPluginConfig(PluginConfig config);
+
+      abstract Builder<K, V> setCdapPlugin(Plugin plugin);
+
+      abstract Builder<K, V> setKeyClass(Class<K> keyClass);
+
+      abstract Builder<K, V> setValueClass(Class<V> valueClass);
+
+      abstract Builder<K, V> setLocksDirPath(String path);
+
+      abstract Write<K, V> build();
+    }
+
+    public Write<K, V> withCdapPlugin(Plugin plugin) {
+      checkArgument(plugin != null, "Cdap plugin can not be null");
+      return toBuilder().setCdapPlugin(plugin).build();
+    }
+
+    public Write<K, V> withCdapPluginClass(Class<?> cdapPluginClass) {
+      checkArgument(cdapPluginClass != null, "Cdap plugin class can not be null");
+      Plugin plugin = MappingUtils.getPluginByClass(cdapPluginClass);
+      checkArgument(plugin != null, "Can not instantiate Plugin object from provided plugin class");
+      return toBuilder().setCdapPlugin(plugin).build();
+    }
+
+    public Write<K, V> withPluginConfig(PluginConfig pluginConfig) {
+      checkArgument(pluginConfig != null, "Plugin config can not be null");
+      return toBuilder().setPluginConfig(pluginConfig).build();
+    }
+
+    public Write<K, V> withKeyClass(Class<K> keyClass) {
+      checkArgument(keyClass != null, "Key class can not be null");
+      return toBuilder().setKeyClass(keyClass).build();
+    }
+
+    public Write<K, V> withLocksDirPath(String locksDirPath) {
+      checkArgument(locksDirPath != null, "Locks dir path can not be null");
+      return toBuilder().setLocksDirPath(locksDirPath).build();
+    }
+
+    public Write<K, V> withValueClass(Class<V> valueClass) {
+      checkArgument(valueClass != null, "Value class can not be null");
+      return toBuilder().setValueClass(valueClass).build();
+    }
+
+    @Override
+    public PDone expand(PCollection<KV<K, V>> input) {
+      getCdapPlugin()
+          .withConfig(getPluginConfig())
+          .withHadoopConfiguration(getKeyClass(), getValueClass())
+          .prepareRun();
+
+      if (getCdapPlugin().isUnbounded()) {
+        // TODO: implement SparkReceiverIO.<~>write()

Review Comment:
   Done



##########
sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/MappingUtils.java:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.cdap;
+
+import io.cdap.plugin.common.SourceInputFormatProvider;
+import io.cdap.plugin.hubspot.sink.batch.HubspotBatchSink;
+import io.cdap.plugin.hubspot.sink.batch.HubspotOutputFormat;
+import io.cdap.plugin.hubspot.source.batch.HubspotBatchSource;
+import io.cdap.plugin.hubspot.source.batch.HubspotInputFormat;
+import io.cdap.plugin.hubspot.source.batch.HubspotInputFormatProvider;
+import io.cdap.plugin.salesforce.plugin.source.batch.SalesforceBatchSource;
+import io.cdap.plugin.salesforce.plugin.source.batch.SalesforceInputFormat;
+import io.cdap.plugin.salesforce.plugin.source.batch.SalesforceInputFormatProvider;
+import io.cdap.plugin.servicenow.source.ServiceNowInputFormat;
+import io.cdap.plugin.servicenow.source.ServiceNowSource;
+import io.cdap.plugin.zendesk.source.batch.ZendeskBatchSource;
+import io.cdap.plugin.zendesk.source.batch.ZendeskInputFormat;
+import io.cdap.plugin.zendesk.source.batch.ZendeskInputFormatProvider;
+import javax.annotation.Nullable;
+
+public class MappingUtils {
+
+  public static @Nullable Plugin getPluginByClass(Class<?> pluginClass) {
+    if (pluginClass.equals(SalesforceBatchSource.class)) {
+      return Plugin.create(
+          pluginClass, SalesforceInputFormat.class, SalesforceInputFormatProvider.class);
+    } else if (pluginClass.equals(HubspotBatchSource.class)) {
+      return Plugin.create(pluginClass, HubspotInputFormat.class, HubspotInputFormatProvider.class);
+    } else if (pluginClass.equals(ZendeskBatchSource.class)) {
+      return Plugin.create(pluginClass, ZendeskInputFormat.class, ZendeskInputFormatProvider.class);
+    } else if (pluginClass.equals(HubspotBatchSink.class)) {
+      return Plugin.create(pluginClass, HubspotOutputFormat.class, SourceInputFormatProvider.class);
+    } else if (pluginClass.equals(ServiceNowSource.class)) {
+      return Plugin.create(
+          pluginClass, ServiceNowInputFormat.class, SourceInputFormatProvider.class);
+    }
+    return null;

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] codecov[bot] commented on pull request #21765: [BEAM-14073] [CdapIO] CDAP IO for batch plugins: Read, Write. Unit tests included

Posted by GitBox <gi...@apache.org>.
codecov[bot] commented on PR #21765:
URL: https://github.com/apache/beam/pull/21765#issuecomment-1151082172

   # [Codecov](https://codecov.io/gh/apache/beam/pull/21765?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#21765](https://codecov.io/gh/apache/beam/pull/21765?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (f31338b) into [master](https://codecov.io/gh/apache/beam/commit/4ffeae4d2b800f2df36d2ea2eab549f2204d5691?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (4ffeae4) will **decrease** coverage by `0.00%`.
   > The diff coverage is `n/a`.
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #21765      +/-   ##
   ==========================================
   - Coverage   74.03%   74.02%   -0.01%     
   ==========================================
     Files         698      698              
     Lines       92192    92192              
   ==========================================
   - Hits        68255    68247       -8     
   - Misses      22686    22694       +8     
     Partials     1251     1251              
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | python | `83.60% <ø> (-0.02%)` | :arrow_down: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/21765?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [.../apache\_beam/runners/interactive/dataproc/types.py](https://codecov.io/gh/apache/beam/pull/21765/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9pbnRlcmFjdGl2ZS9kYXRhcHJvYy90eXBlcy5weQ==) | `93.10% <0.00%> (-3.45%)` | :arrow_down: |
   | [...eam/runners/portability/fn\_api\_runner/execution.py](https://codecov.io/gh/apache/beam/pull/21765/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9wb3J0YWJpbGl0eS9mbl9hcGlfcnVubmVyL2V4ZWN1dGlvbi5weQ==) | `92.44% <0.00%> (-0.65%)` | :arrow_down: |
   | [...hon/apache\_beam/runners/worker/bundle\_processor.py](https://codecov.io/gh/apache/beam/pull/21765/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvYnVuZGxlX3Byb2Nlc3Nvci5weQ==) | `93.30% <0.00%> (-0.38%)` | :arrow_down: |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/beam/pull/21765?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/beam/pull/21765?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [4ffeae4...f31338b](https://codecov.io/gh/apache/beam/pull/21765?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] Amar3tto commented on pull request #21765: [BEAM-14073] [CdapIO] CDAP IO for batch plugins: Read, Write. Unit tests included

Posted by GitBox <gi...@apache.org>.
Amar3tto commented on PR #21765:
URL: https://github.com/apache/beam/pull/21765#issuecomment-1165342973

   Run Java_Kafka_IO_Direct PreCommit


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] Amar3tto commented on pull request #21765: [BEAM-14073] [CdapIO] CDAP IO for batch plugins: Read, Write. Unit tests included

Posted by GitBox <gi...@apache.org>.
Amar3tto commented on PR #21765:
URL: https://github.com/apache/beam/pull/21765#issuecomment-1165364837

   Run Java PreCommit


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] Amar3tto commented on a diff in pull request #21765: [BEAM-14073] [CdapIO] CDAP IO for batch plugins: Read, Write. Unit tests included

Posted by GitBox <gi...@apache.org>.
Amar3tto commented on code in PR #21765:
URL: https://github.com/apache/beam/pull/21765#discussion_r897897046


##########
sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/Plugin.java:
##########
@@ -84,16 +88,34 @@ public PluginConfig getPluginConfig() {
    * validating connection to the CDAP sink/source and performing initial tuning.
    */
   public void prepareRun() {
+    PluginConfig pluginConfig = getPluginConfig();
+    if (pluginConfig == null) {

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] Amar3tto commented on pull request #21765: [BEAM-14073] [CdapIO] CDAP IO for batch plugins: Read, Write. Unit tests included

Posted by GitBox <gi...@apache.org>.
Amar3tto commented on PR #21765:
URL: https://github.com/apache/beam/pull/21765#issuecomment-1158786767

   Run Java PreCommit


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] Lizzfox commented on pull request #21765: [BEAM-14073] [CdapIO] CDAP IO for batch plugins: Read, Write. Unit tests included

Posted by GitBox <gi...@apache.org>.
Lizzfox commented on PR #21765:
URL: https://github.com/apache/beam/pull/21765#issuecomment-1166261679

   @johnjcasey The PR is ready to be merged.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] chamikaramj commented on a diff in pull request #21765: [BEAM-14073] [CdapIO] CDAP IO for batch plugins: Read, Write. Unit tests included

Posted by GitBox <gi...@apache.org>.
chamikaramj commented on code in PR #21765:
URL: https://github.com/apache/beam/pull/21765#discussion_r920642038


##########
sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/CdapIO.java:
##########
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.cdap;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import io.cdap.cdap.api.plugin.PluginConfig;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.io.hadoop.format.HDFSSynchronization;
+import org.apache.beam.sdk.io.hadoop.format.HadoopFormatIO;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.hadoop.conf.Configuration;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * An unbounded/bounded sources and sinks from <a
+ * href="https://github.com/data-integrations">CDAP</a> plugins.
+ */
+@SuppressWarnings("nullness")

Review Comment:
   Please add an `@Experimental` annotation here so that this API can be changed in the future if needed.
   Example: https://github.com/apache/beam/blob/f2e41be4880ef47d86d975c38f353896f70365bd/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java#L369
   
   We can remove the annotation once we are confident about the stability of the API.
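   A sketch of the requested annotation on the class (the exact `Kind` value is a judgement call; `SOURCE_SINK` is shown only as one plausible choice):
   
   ```java
   import org.apache.beam.sdk.annotations.Experimental;
   import org.apache.beam.sdk.annotations.Experimental.Kind;
   
   /** ... existing class Javadoc ... */
   @Experimental(Kind.SOURCE_SINK)
   @SuppressWarnings("nullness")
   public class CdapIO {
     // ... existing read()/write() factory methods and transforms ...
   }
   ```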



##########
sdks/java/io/cdap/src/test/java/org/apache/beam/sdk/io/cdap/EmployeeBatchSource.java:
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software

Review Comment:
   In the future, consider moving Employee* classes to a Beam example (not a blocker for this PR).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] chamikaramj merged pull request #21765: [BEAM-14073] [CdapIO] CDAP IO for batch plugins: Read, Write. Unit tests included

Posted by GitBox <gi...@apache.org>.
chamikaramj merged PR #21765:
URL: https://github.com/apache/beam/pull/21765


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] Amar3tto commented on pull request #21765: [BEAM-14073] [CdapIO] CDAP IO for batch plugins: Read, Write. Unit tests included

Posted by GitBox <gi...@apache.org>.
Amar3tto commented on PR #21765:
URL: https://github.com/apache/beam/pull/21765#issuecomment-1158805446

   Run Python PreCommit


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] Amar3tto commented on pull request #21765: [BEAM-14073] [CdapIO] CDAP IO for batch plugins: Read, Write. Unit tests included

Posted by GitBox <gi...@apache.org>.
Amar3tto commented on PR #21765:
URL: https://github.com/apache/beam/pull/21765#issuecomment-1173481328

   Run GoPortable PreCommit


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] asf-ci commented on pull request #21765: [BEAM-14073] [CdapIO] CDAP IO for batch plugins: Read, Write. Unit tests included

Posted by GitBox <gi...@apache.org>.
asf-ci commented on PR #21765:
URL: https://github.com/apache/beam/pull/21765#issuecomment-1151058093

   Can one of the admins verify this patch?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] Amar3tto commented on pull request #21765: [BEAM-14073] [CdapIO] CDAP IO for batch plugins: Read, Write. Unit tests included

Posted by GitBox <gi...@apache.org>.
Amar3tto commented on PR #21765:
URL: https://github.com/apache/beam/pull/21765#issuecomment-1157433669

   Run Java PreCommit


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] Amar3tto commented on a diff in pull request #21765: [BEAM-14073] [CdapIO] CDAP IO for batch plugins: Read, Write. Unit tests included

Posted by GitBox <gi...@apache.org>.
Amar3tto commented on code in PR #21765:
URL: https://github.com/apache/beam/pull/21765#discussion_r920792366


##########
sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/CdapIO.java:
##########
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.cdap;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import io.cdap.cdap.api.plugin.PluginConfig;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.io.hadoop.format.HDFSSynchronization;
+import org.apache.beam.sdk.io.hadoop.format.HadoopFormatIO;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.hadoop.conf.Configuration;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * An unbounded/bounded sources and sinks from <a
+ * href="https://github.com/data-integrations">CDAP</a> plugins.
+ */
+@SuppressWarnings("nullness")

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] Amar3tto commented on pull request #21765: [BEAM-14073] [CdapIO] CDAP IO for batch plugins: Read, Write. Unit tests included

Posted by GitBox <gi...@apache.org>.
Amar3tto commented on PR #21765:
URL: https://github.com/apache/beam/pull/21765#issuecomment-1165255004

   Run Java PreCommit


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] johnjcasey commented on a diff in pull request #21765: [BEAM-14073] [CdapIO] CDAP IO for batch plugins: Read, Write. Unit tests included

Posted by GitBox <gi...@apache.org>.
johnjcasey commented on code in PR #21765:
URL: https://github.com/apache/beam/pull/21765#discussion_r904027669


##########
sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/MappingUtils.java:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.cdap;
+
+import io.cdap.plugin.common.SourceInputFormatProvider;
+import io.cdap.plugin.hubspot.sink.batch.HubspotBatchSink;
+import io.cdap.plugin.hubspot.sink.batch.HubspotOutputFormat;
+import io.cdap.plugin.hubspot.source.batch.HubspotBatchSource;
+import io.cdap.plugin.hubspot.source.batch.HubspotInputFormat;
+import io.cdap.plugin.hubspot.source.batch.HubspotInputFormatProvider;
+import io.cdap.plugin.salesforce.plugin.source.batch.SalesforceBatchSource;
+import io.cdap.plugin.salesforce.plugin.source.batch.SalesforceInputFormat;
+import io.cdap.plugin.salesforce.plugin.source.batch.SalesforceInputFormatProvider;
+import io.cdap.plugin.servicenow.source.ServiceNowInputFormat;
+import io.cdap.plugin.servicenow.source.ServiceNowSource;
+import io.cdap.plugin.zendesk.source.batch.ZendeskBatchSource;
+import io.cdap.plugin.zendesk.source.batch.ZendeskInputFormat;
+import io.cdap.plugin.zendesk.source.batch.ZendeskInputFormatProvider;
+import javax.annotation.Nullable;
+
+public class MappingUtils {
+
+  public static @Nullable Plugin getPluginByClass(Class<?> pluginClass) {
+    if (pluginClass.equals(SalesforceBatchSource.class)) {

Review Comment:
   Sounds good



##########
sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/CdapIO.java:
##########
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.cdap;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import io.cdap.cdap.api.plugin.PluginConfig;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.io.hadoop.format.HDFSSynchronization;
+import org.apache.beam.sdk.io.hadoop.format.HadoopFormatIO;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.hadoop.conf.Configuration;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * An unbounded/bounded sources and sinks from <a
+ * href="https://github.com/data-integrations">CDAP</a> plugins.
+ */
+@SuppressWarnings({
+  "argument.type.incompatible",
+  "return.type.incompatible",
+  "dereference.of.nullable",
+  "UnnecessaryParentheses"
+})
+public class CdapIO {
+
+  public static <K, V> Read<K, V> read() {
+    return new AutoValue_CdapIO_Read.Builder<K, V>().build();
+  }
+
+  public static <K, V> Write<K, V> write() {
+    return new AutoValue_CdapIO_Write.Builder<K, V>().build();
+  }
+
+  /** A {@link PTransform} to read from CDAP source. */
+  @AutoValue
+  @AutoValue.CopyAnnotations
+  public abstract static class Read<K, V> extends PTransform<PBegin, PCollection<KV<K, V>>> {
+
+    abstract @Nullable PluginConfig getPluginConfig();
+
+    abstract @Nullable Plugin getCdapPlugin();
+
+    abstract @Nullable Class<K> getKeyClass();
+
+    abstract @Nullable Class<V> getValueClass();
+
+    abstract Builder<K, V> toBuilder();
+
+    @Experimental(Experimental.Kind.PORTABILITY)
+    @AutoValue.Builder
+    abstract static class Builder<K, V> {
+
+      abstract Builder<K, V> setPluginConfig(PluginConfig config);
+
+      abstract Builder<K, V> setCdapPlugin(Plugin plugin);
+
+      abstract Builder<K, V> setKeyClass(Class<K> keyClass);
+
+      abstract Builder<K, V> setValueClass(Class<V> valueClass);
+
+      abstract Read<K, V> build();
+    }
+
+    public Read<K, V> withCdapPlugin(Plugin plugin) {
+      checkArgument(plugin != null, "Cdap plugin can not be null");

Review Comment:
   This looks fine to me



##########
sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/MappingUtils.java:
##########
@@ -49,6 +52,6 @@ public class MappingUtils {
       return Plugin.create(
           pluginClass, ServiceNowInputFormat.class, SourceInputFormatProvider.class);
     }
-    return null;
+    throw new UnsupportedOperationException("Given plugin class is not supported!");

Review Comment:
   We should include the plugin class it was given in the exception message, to make debugging easier.
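   For example, a small sketch of that change (only the message construction differs):
   
   ```java
   throw new UnsupportedOperationException(
       String.format("Given plugin class '%s' is not supported!", pluginClass.getName()));
   ```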



##########
sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/context/BatchSinkContextImpl.java:
##########
@@ -24,7 +24,10 @@
 public class BatchSinkContextImpl extends BatchContextImpl implements BatchSinkContext {
 
   @Override
-  public void addOutput(Output output) {}
+  public void addOutput(Output output) {

Review Comment:
   Please add the comment here



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] Lizzfox commented on pull request #21765: [BEAM-14073] [CdapIO] CDAP IO for batch plugins: Read, Write. Unit tests included

Posted by GitBox <gi...@apache.org>.
Lizzfox commented on PR #21765:
URL: https://github.com/apache/beam/pull/21765#issuecomment-1165942028

   Run Java PreCommit


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] Amar3tto commented on a diff in pull request #21765: [BEAM-14073] [CdapIO] CDAP IO for batch plugins: Read, Write. Unit tests included

Posted by GitBox <gi...@apache.org>.
Amar3tto commented on code in PR #21765:
URL: https://github.com/apache/beam/pull/21765#discussion_r897895901


##########
sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/CdapIO.java:
##########
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.cdap;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import io.cdap.cdap.api.plugin.PluginConfig;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.io.hadoop.format.HDFSSynchronization;
+import org.apache.beam.sdk.io.hadoop.format.HadoopFormatIO;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.hadoop.conf.Configuration;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * An unbounded/bounded sources and sinks from <a
+ * href="https://github.com/data-integrations">CDAP</a> plugins.
+ */
+@SuppressWarnings({
+  "argument.type.incompatible",
+  "return.type.incompatible",
+  "dereference.of.nullable",
+  "UnnecessaryParentheses"
+})
+public class CdapIO {
+
+  public static <K, V> Read<K, V> read() {
+    return new AutoValue_CdapIO_Read.Builder<K, V>().build();
+  }
+
+  public static <K, V> Write<K, V> write() {
+    return new AutoValue_CdapIO_Write.Builder<K, V>().build();
+  }
+
+  /** A {@link PTransform} to read from CDAP source. */
+  @AutoValue
+  @AutoValue.CopyAnnotations
+  public abstract static class Read<K, V> extends PTransform<PBegin, PCollection<KV<K, V>>> {
+
+    abstract @Nullable PluginConfig getPluginConfig();
+
+    abstract @Nullable Plugin getCdapPlugin();
+
+    abstract @Nullable Class<K> getKeyClass();
+
+    abstract @Nullable Class<V> getValueClass();
+
+    abstract Builder<K, V> toBuilder();
+
+    @Experimental(Experimental.Kind.PORTABILITY)
+    @AutoValue.Builder
+    abstract static class Builder<K, V> {
+
+      abstract Builder<K, V> setPluginConfig(PluginConfig config);
+
+      abstract Builder<K, V> setCdapPlugin(Plugin plugin);
+
+      abstract Builder<K, V> setKeyClass(Class<K> keyClass);
+
+      abstract Builder<K, V> setValueClass(Class<V> valueClass);
+
+      abstract Read<K, V> build();
+    }
+
+    public Read<K, V> withCdapPlugin(Plugin plugin) {
+      checkArgument(plugin != null, "Cdap plugin can not be null");
+      return toBuilder().setCdapPlugin(plugin).build();
+    }
+
+    public Read<K, V> withCdapPluginClass(Class<?> cdapPluginClass) {
+      checkArgument(cdapPluginClass != null, "Cdap plugin class can not be null");
+      Plugin plugin = MappingUtils.getPluginByClass(cdapPluginClass);
+      checkArgument(plugin != null, "Can not instantiate Plugin object from provided plugin class");
+      return toBuilder().setCdapPlugin(plugin).build();
+    }
+
+    public Read<K, V> withPluginConfig(PluginConfig pluginConfig) {
+      checkArgument(pluginConfig != null, "Plugin config can not be null");
+      return toBuilder().setPluginConfig(pluginConfig).build();
+    }
+
+    public Read<K, V> withKeyClass(Class<K> keyClass) {
+      checkArgument(keyClass != null, "Key class can not be null");
+      return toBuilder().setKeyClass(keyClass).build();
+    }
+
+    public Read<K, V> withValueClass(Class<V> valueClass) {
+      checkArgument(valueClass != null, "Value class can not be null");
+      return toBuilder().setValueClass(valueClass).build();
+    }
+
+    @Override
+    public PCollection<KV<K, V>> expand(PBegin input) {
+      validateTransform();
+
+      getCdapPlugin()
+          .withConfig(getPluginConfig())
+          .withHadoopConfiguration(getKeyClass(), getValueClass())
+          .prepareRun();
+
+      if (getCdapPlugin().isUnbounded()) {
+        // TODO: implement SparkReceiverIO.<~>read()

Review Comment:
   Done
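
For readers following the diff above, here is a minimal usage sketch of the quoted Read builder. It is an illustration only: MyBatchSource and MyBatchSourceConfig are hypothetical stand-ins for a CDAP batch plugin class and its PluginConfig, and the only CdapIO API assumed is the withX() methods visible in the diff.

```java
import io.cdap.cdap.api.data.format.StructuredRecord;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.cdap.CdapIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.hadoop.io.NullWritable;

public class CdapReadSketch {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Hypothetical config object; a real run would populate the plugin's own settings.
    MyBatchSourceConfig config = new MyBatchSourceConfig();

    PCollection<KV<NullWritable, StructuredRecord>> records =
        pipeline.apply(
            "ReadFromCdapPlugin",
            CdapIO.<NullWritable, StructuredRecord>read()
                .withCdapPluginClass(MyBatchSource.class) // class annotated with CDAP @Plugin
                .withPluginConfig(config)                 // matching PluginConfig subclass
                .withKeyClass(NullWritable.class)         // key type of the resulting KVs
                .withValueClass(StructuredRecord.class)); // value type of the resulting KVs

    pipeline.run().waitUntilFinish();
  }
}
```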



[GitHub] [beam] Amar3tto commented on a diff in pull request #21765: [BEAM-14073] [CdapIO] CDAP IO for batch plugins: Read, Write. Unit tests included

Posted by GitBox <gi...@apache.org>.
Amar3tto commented on code in PR #21765:
URL: https://github.com/apache/beam/pull/21765#discussion_r897894449


##########
sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/Plugin.java:
##########
@@ -84,16 +88,34 @@ public PluginConfig getPluginConfig() {
    * validating connection to the CDAP sink/source and performing initial tuning.
    */
   public void prepareRun() {
+    PluginConfig pluginConfig = getPluginConfig();
+    if (pluginConfig == null) {
+      throw new IllegalArgumentException("PluginConfig should not be null!");
+    }
     if (cdapPluginObj == null) {
-      InstantiatorFactory instantiatorFactory = new InstantiatorFactory(false);
-      cdapPluginObj =
-          (SubmitterLifecycle) instantiatorFactory.get(TypeToken.of(getPluginClass())).create();
+      for (Constructor<?> constructor : getPluginClass().getDeclaredConstructors()) {

Review Comment:
   You're right, there is no need to grab all the constructors here.
   Fixed.
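
To make the change discussed above concrete, the sketch below shows one way the narrowed instantiation could look: it resolves only the constructor that accepts the plugin's PluginConfig instead of iterating over all declared constructors. This is an assumption about the shape of the fix, not the exact code merged in the PR.

```java
import io.cdap.cdap.api.plugin.PluginConfig;
import java.lang.reflect.Constructor;

final class PluginInstantiationSketch {

  /** Instantiates a CDAP plugin object via its config-typed constructor (sketch only). */
  static Object instantiate(Class<?> pluginClass, PluginConfig pluginConfig) {
    try {
      // Resolve the single constructor that takes the concrete PluginConfig type,
      // instead of scanning getDeclaredConstructors().
      Constructor<?> constructor = pluginClass.getDeclaredConstructor(pluginConfig.getClass());
      constructor.setAccessible(true);
      return constructor.newInstance(pluginConfig);
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException(
          "Can not instantiate an object of class " + pluginClass.getName(), e);
    }
  }
}
```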



[GitHub] [beam] johnjcasey commented on a diff in pull request #21765: [BEAM-14073] [CdapIO] CDAP IO for batch plugins: Read, Write. Unit tests included

Posted by GitBox <gi...@apache.org>.
johnjcasey commented on code in PR #21765:
URL: https://github.com/apache/beam/pull/21765#discussion_r899375075


##########
sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/CdapIO.java:
##########
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.cdap;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import io.cdap.cdap.api.plugin.PluginConfig;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.io.hadoop.format.HDFSSynchronization;
+import org.apache.beam.sdk.io.hadoop.format.HadoopFormatIO;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.hadoop.conf.Configuration;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * Bounded and unbounded sources and sinks from <a
+ * href="https://github.com/data-integrations">CDAP</a> plugins.
+ */
+@SuppressWarnings({
+  "argument.type.incompatible",
+  "return.type.incompatible",
+  "dereference.of.nullable",
+  "UnnecessaryParentheses"
+})
+public class CdapIO {
+
+  public static <K, V> Read<K, V> read() {
+    return new AutoValue_CdapIO_Read.Builder<K, V>().build();
+  }
+
+  public static <K, V> Write<K, V> write() {
+    return new AutoValue_CdapIO_Write.Builder<K, V>().build();
+  }
+
+  /** A {@link PTransform} to read from CDAP source. */
+  @AutoValue
+  @AutoValue.CopyAnnotations
+  public abstract static class Read<K, V> extends PTransform<PBegin, PCollection<KV<K, V>>> {
+
+    abstract @Nullable PluginConfig getPluginConfig();
+
+    abstract @Nullable Plugin getCdapPlugin();
+
+    abstract @Nullable Class<K> getKeyClass();
+
+    abstract @Nullable Class<V> getValueClass();
+
+    abstract Builder<K, V> toBuilder();
+
+    @Experimental(Experimental.Kind.PORTABILITY)
+    @AutoValue.Builder
+    abstract static class Builder<K, V> {
+
+      abstract Builder<K, V> setPluginConfig(PluginConfig config);
+
+      abstract Builder<K, V> setCdapPlugin(Plugin plugin);
+
+      abstract Builder<K, V> setKeyClass(Class<K> keyClass);
+
+      abstract Builder<K, V> setValueClass(Class<V> valueClass);
+
+      abstract Read<K, V> build();
+    }
+
+    public Read<K, V> withCdapPlugin(Plugin plugin) {
+      checkArgument(plugin != null, "Cdap plugin can not be null");

Review Comment:
   I'll bring this up with the broader team
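
The underlying review question is not quoted in this thread; purely as an illustration of one alternative to the vendored Guava checkArgument guard, a JDK-only version of the same null check might look like the snippet below. It is not a proposed change to the PR.

```java
// Same precondition written with java.util.Objects; note it throws NullPointerException
// rather than the IllegalArgumentException produced by checkArgument.
public Read<K, V> withCdapPlugin(Plugin plugin) {
  java.util.Objects.requireNonNull(plugin, "Cdap plugin can not be null");
  return toBuilder().setCdapPlugin(plugin).build();
}
```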



[GitHub] [beam] pabloem commented on pull request #21765: [BEAM-14073] [CdapIO] CDAP IO for batch plugins: Read, Write. Unit tests included

Posted by GitBox <gi...@apache.org>.
pabloem commented on PR #21765:
URL: https://github.com/apache/beam/pull/21765#issuecomment-1170134566

   I'd prefer @chamikaramj to review this as he should be more familiar. Cham PTAL

