You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ch...@apache.org on 2022/07/27 00:49:44 UTC

[beam] branch master updated: [CdapIO] Fixed necessary warnings (#22399)

This is an automated email from the ASF dual-hosted git repository.

chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 59b2edb7ff1 [CdapIO] Fixed necessary warnings (#22399)
59b2edb7ff1 is described below

commit 59b2edb7ff166a3ed91d67fe60a6040bf875e7a4
Author: akashorabek <70...@users.noreply.github.com>
AuthorDate: Wed Jul 27 06:49:37 2022 +0600

    [CdapIO] Fixed necessary warnings (#22399)
    
    * Fixed warnings
    
    * Add read and write expanding tests
    
    * Format code
---
 .../java/org/apache/beam/sdk/io/cdap/CdapIO.java   | 53 +++++++++-------------
 .../java/org/apache/beam/sdk/io/cdap/Plugin.java   |  1 -
 .../io/cdap/PluginConfigInstantiationUtils.java    |  4 +-
 .../org/apache/beam/sdk/io/cdap/CdapIOTest.java    | 15 ++++--
 4 files changed, 34 insertions(+), 39 deletions(-)

diff --git a/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/CdapIO.java b/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/CdapIO.java
index f749acf3096..f2655507cf5 100644
--- a/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/CdapIO.java
+++ b/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/CdapIO.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.io.cdap;
 
+import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
 import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
 
 import com.google.auto.value.AutoValue;
@@ -39,7 +40,6 @@ import org.checkerframework.checker.nullness.qual.Nullable;
  * href="https://github.com/data-integrations">CDAP</a> plugins.
  */
 @Experimental(Kind.SOURCE_SINK)
-@SuppressWarnings("nullness")
 public class CdapIO {
 
   public static <K, V> Read<K, V> read() {
@@ -54,7 +54,6 @@ public class CdapIO {
   @AutoValue
   @AutoValue.CopyAnnotations
   public abstract static class Read<K, V> extends PTransform<PBegin, PCollection<KV<K, V>>> {
-
     abstract @Nullable PluginConfig getPluginConfig();
 
     abstract @Nullable Plugin getCdapPlugin();
@@ -108,30 +107,24 @@ public class CdapIO {
 
     @Override
     public PCollection<KV<K, V>> expand(PBegin input) {
-      validateTransform();
+      Plugin plugin = checkArgumentNotNull(getCdapPlugin(), "withCdapPluginClass() is required");
+      PluginConfig pluginConfig =
+          checkArgumentNotNull(getPluginConfig(), "withPluginConfig() is required");
+      Class<K> keyClass = checkArgumentNotNull(getKeyClass(), "withKeyClass() is required");
+      Class<V> valueClass = checkArgumentNotNull(getValueClass(), "withValueClass() is required");
 
-      getCdapPlugin()
-          .withConfig(getPluginConfig())
-          .withHadoopConfiguration(getKeyClass(), getValueClass())
-          .prepareRun();
+      plugin.withConfig(pluginConfig).withHadoopConfiguration(keyClass, valueClass).prepareRun();
 
-      if (getCdapPlugin().isUnbounded()) {
+      if (plugin.isUnbounded()) {
         // TODO: implement SparkReceiverIO.<~>read()
         throw new NotImplementedException("Support for unbounded plugins is not implemented!");
       } else {
-        Configuration hConf = getCdapPlugin().getHadoopConfiguration();
+        Configuration hConf = plugin.getHadoopConfiguration();
         HadoopFormatIO.Read<K, V> readFromHadoop =
             HadoopFormatIO.<K, V>read().withConfiguration(hConf);
         return input.apply(readFromHadoop);
       }
     }
-
-    public void validateTransform() {
-      checkArgument(getCdapPlugin() != null, "withCdapPluginClass() is required");
-      checkArgument(getPluginConfig() != null, "withPluginConfig() is required");
-      checkArgument(getKeyClass() != null, "withKeyClass() is required");
-      checkArgument(getValueClass() != null, "withValueClass() is required");
-    }
   }
 
   /** A {@link PTransform} to read from CDAP source. */
@@ -201,32 +194,28 @@ public class CdapIO {
 
     @Override
     public PDone expand(PCollection<KV<K, V>> input) {
-      validateTransform();
-      getCdapPlugin()
-          .withConfig(getPluginConfig())
-          .withHadoopConfiguration(getKeyClass(), getValueClass())
-          .prepareRun();
+      Plugin plugin = checkArgumentNotNull(getCdapPlugin(), "withKeyClass() is required");
+      PluginConfig pluginConfig =
+          checkArgumentNotNull(getPluginConfig(), "withKeyClass() is required");
+      Class<K> keyClass = checkArgumentNotNull(getKeyClass(), "withKeyClass() is required");
+      Class<V> valueClass = checkArgumentNotNull(getValueClass(), "withValueClass() is required");
+      String locksDirPath =
+          checkArgumentNotNull(getLocksDirPath(), "withLocksDirPath() is required");
 
-      if (getCdapPlugin().isUnbounded()) {
+      plugin.withConfig(pluginConfig).withHadoopConfiguration(keyClass, valueClass).prepareRun();
+
+      if (plugin.isUnbounded()) {
         // TODO: implement SparkReceiverIO.<~>write()
         throw new NotImplementedException("Support for unbounded plugins is not implemented!");
       } else {
-        Configuration hConf = getCdapPlugin().getHadoopConfiguration();
+        Configuration hConf = plugin.getHadoopConfiguration();
         HadoopFormatIO.Write<K, V> writeHadoop =
             HadoopFormatIO.<K, V>write()
                 .withConfiguration(hConf)
                 .withPartitioning()
-                .withExternalSynchronization(new HDFSSynchronization(getLocksDirPath()));
+                .withExternalSynchronization(new HDFSSynchronization(locksDirPath));
         return input.apply(writeHadoop);
       }
     }
-
-    public void validateTransform() {
-      checkArgument(getCdapPlugin() != null, "withCdapPluginClass() is required");
-      checkArgument(getPluginConfig() != null, "withPluginConfig() is required");
-      checkArgument(getKeyClass() != null, "withKeyClass() is required");
-      checkArgument(getValueClass() != null, "withValueClass() is required");
-      checkArgument(getLocksDirPath() != null, "withLocksDirPath() is required");
-    }
   }
 }
diff --git a/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/Plugin.java b/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/Plugin.java
index 88ed2f321b0..31deb9d258d 100644
--- a/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/Plugin.java
+++ b/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/Plugin.java
@@ -132,7 +132,6 @@ public abstract class Plugin {
   /** Sets a plugin Hadoop configuration. */
   public Plugin withHadoopConfiguration(Configuration hadoopConfiguration) {
     this.hadoopConfiguration = hadoopConfiguration;
-
     return this;
   }
 
diff --git a/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/PluginConfigInstantiationUtils.java b/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/PluginConfigInstantiationUtils.java
index cfa6f0ed034..ced11201019 100644
--- a/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/PluginConfigInstantiationUtils.java
+++ b/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/PluginConfigInstantiationUtils.java
@@ -30,11 +30,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 import javax.annotation.Nullable;
+import org.checkerframework.checker.initialization.qual.Initialized;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /** Class for getting any filled {@link PluginConfig} configuration object. */
-@SuppressWarnings("nullness")
 public class PluginConfigInstantiationUtils {
 
   private static final Logger LOG = LoggerFactory.getLogger(PluginConfigInstantiationUtils.class);
@@ -66,7 +66,7 @@ public class PluginConfigInstantiationUtils {
     }
     InstantiatorFactory instantiatorFactory = new InstantiatorFactory(false);
 
-    T config = instantiatorFactory.get(TypeToken.of(configClass)).create();
+    @Initialized T config = instantiatorFactory.get(TypeToken.of(configClass)).create();
 
     if (config != null) {
       for (Field field : allFields) {
diff --git a/sdks/java/io/cdap/src/test/java/org/apache/beam/sdk/io/cdap/CdapIOTest.java b/sdks/java/io/cdap/src/test/java/org/apache/beam/sdk/io/cdap/CdapIOTest.java
index 3cae6edb0b5..e978f5b8fca 100644
--- a/sdks/java/io/cdap/src/test/java/org/apache/beam/sdk/io/cdap/CdapIOTest.java
+++ b/sdks/java/io/cdap/src/test/java/org/apache/beam/sdk/io/cdap/CdapIOTest.java
@@ -27,12 +27,15 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.cdap.context.BatchSinkContextImpl;
 import org.apache.beam.sdk.io.cdap.context.BatchSourceContextImpl;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
 import org.apache.hadoop.mapreduce.OutputCommitter;
@@ -119,9 +122,10 @@ public class CdapIOTest {
   }
 
   @Test
-  public void testReadValidationFailsMissingCdapPluginClass() {
+  public void testReadExpandingFailsMissingCdapPluginClass() {
+    PBegin testPBegin = PBegin.in(TestPipeline.create());
     CdapIO.Read<String, String> read = CdapIO.read();
-    assertThrows(IllegalArgumentException.class, read::validateTransform);
+    assertThrows(IllegalArgumentException.class, () -> read.expand(testPBegin));
   }
 
   @Test
@@ -221,9 +225,12 @@ public class CdapIOTest {
   }
 
   @Test
-  public void testWriteValidationFailsMissingCdapPluginClass() {
+  public void testWriteExpandingFailsMissingCdapPluginClass() {
+    PBegin testPBegin = PBegin.in(TestPipeline.create());
+    PCollection<KV<String, String>> testPCollection =
+        Create.empty(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())).expand(testPBegin);
     CdapIO.Write<String, String> write = CdapIO.write();
-    assertThrows(IllegalArgumentException.class, write::validateTransform);
+    assertThrows(IllegalArgumentException.class, () -> write.expand(testPCollection));
   }
 
   @Test