Posted to commits@beam.apache.org by ch...@apache.org on 2022/07/27 00:49:44 UTC
[beam] branch master updated: [CdapIO] Fixed necessary warnings (#22399)
This is an automated email from the ASF dual-hosted git repository.
chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 59b2edb7ff1 [CdapIO] Fixed necessary warnings (#22399)
59b2edb7ff1 is described below
commit 59b2edb7ff166a3ed91d67fe60a6040bf875e7a4
Author: akashorabek <70...@users.noreply.github.com>
AuthorDate: Wed Jul 27 06:49:37 2022 +0600
[CdapIO] Fixed necessary warnings (#22399)
* Fixed warnings
* Add read and write expanding tests
* Format code
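For context on the fix below: the class-wide @SuppressWarnings("nullness") had been hiding real checker findings, and the old validateTransform() only checked for null at runtime without telling the Checker Framework anything. Beam's checkArgumentNotNull both throws IllegalArgumentException on null and returns a reference typed as non-null, so the suppression can be removed. A minimal sketch of the pattern (class, method, and variable names here are illustrative, not Beam code):

    import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;

    import org.checkerframework.checker.nullness.qual.Nullable;

    public class NullCheckSketch {

      static @Nullable String lookup() {
        return System.getenv("SKETCH_VALUE"); // may legitimately be null
      }

      public static void main(String[] args) {
        // checkArgument(lookup() != null, ...) would validate at runtime, but the
        // Checker Framework would still type the value as @Nullable afterwards,
        // forcing a blanket @SuppressWarnings("nullness"). checkArgumentNotNull
        // throws IllegalArgumentException on null and returns a non-null-typed
        // reference, so the suppression can be dropped.
        String value = checkArgumentNotNull(lookup(), "SKETCH_VALUE is required");
        System.out.println(value.length()); // checker-verified non-null use
      }
    }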
---
.../java/org/apache/beam/sdk/io/cdap/CdapIO.java | 53 +++++++++-------------
.../java/org/apache/beam/sdk/io/cdap/Plugin.java | 1 -
.../io/cdap/PluginConfigInstantiationUtils.java | 4 +-
.../org/apache/beam/sdk/io/cdap/CdapIOTest.java | 15 ++++--
4 files changed, 34 insertions(+), 39 deletions(-)
diff --git a/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/CdapIO.java b/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/CdapIO.java
index f749acf3096..f2655507cf5 100644
--- a/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/CdapIO.java
+++ b/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/CdapIO.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.io.cdap;
+import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
import com.google.auto.value.AutoValue;
@@ -39,7 +40,6 @@ import org.checkerframework.checker.nullness.qual.Nullable;
* href="https://github.com/data-integrations">CDAP</a> plugins.
*/
@Experimental(Kind.SOURCE_SINK)
-@SuppressWarnings("nullness")
public class CdapIO {
public static <K, V> Read<K, V> read() {
@@ -54,7 +54,6 @@ public class CdapIO {
@AutoValue
@AutoValue.CopyAnnotations
public abstract static class Read<K, V> extends PTransform<PBegin, PCollection<KV<K, V>>> {
-
abstract @Nullable PluginConfig getPluginConfig();
abstract @Nullable Plugin getCdapPlugin();
@@ -108,30 +107,24 @@ public class CdapIO {
@Override
public PCollection<KV<K, V>> expand(PBegin input) {
- validateTransform();
+ Plugin plugin = checkArgumentNotNull(getCdapPlugin(), "withCdapPluginClass() is required");
+ PluginConfig pluginConfig =
+ checkArgumentNotNull(getPluginConfig(), "withPluginConfig() is required");
+ Class<K> keyClass = checkArgumentNotNull(getKeyClass(), "withKeyClass() is required");
+ Class<V> valueClass = checkArgumentNotNull(getValueClass(), "withValueClass() is required");
- getCdapPlugin()
- .withConfig(getPluginConfig())
- .withHadoopConfiguration(getKeyClass(), getValueClass())
- .prepareRun();
+ plugin.withConfig(pluginConfig).withHadoopConfiguration(keyClass, valueClass).prepareRun();
- if (getCdapPlugin().isUnbounded()) {
+ if (plugin.isUnbounded()) {
// TODO: implement SparkReceiverIO.<~>read()
throw new NotImplementedException("Support for unbounded plugins is not implemented!");
} else {
- Configuration hConf = getCdapPlugin().getHadoopConfiguration();
+ Configuration hConf = plugin.getHadoopConfiguration();
HadoopFormatIO.Read<K, V> readFromHadoop =
HadoopFormatIO.<K, V>read().withConfiguration(hConf);
return input.apply(readFromHadoop);
}
}
-
- public void validateTransform() {
- checkArgument(getCdapPlugin() != null, "withCdapPluginClass() is required");
- checkArgument(getPluginConfig() != null, "withPluginConfig() is required");
- checkArgument(getKeyClass() != null, "withKeyClass() is required");
- checkArgument(getValueClass() != null, "withValueClass() is required");
- }
}
/** A {@link PTransform} to read from CDAP source. */
@@ -201,32 +194,28 @@ public class CdapIO {
@Override
public PDone expand(PCollection<KV<K, V>> input) {
- validateTransform();
- getCdapPlugin()
- .withConfig(getPluginConfig())
- .withHadoopConfiguration(getKeyClass(), getValueClass())
- .prepareRun();
+      Plugin plugin = checkArgumentNotNull(getCdapPlugin(), "withCdapPluginClass() is required");
+      PluginConfig pluginConfig =
+          checkArgumentNotNull(getPluginConfig(), "withPluginConfig() is required");
+ Class<K> keyClass = checkArgumentNotNull(getKeyClass(), "withKeyClass() is required");
+ Class<V> valueClass = checkArgumentNotNull(getValueClass(), "withValueClass() is required");
+ String locksDirPath =
+ checkArgumentNotNull(getLocksDirPath(), "withLocksDirPath() is required");
- if (getCdapPlugin().isUnbounded()) {
+ plugin.withConfig(pluginConfig).withHadoopConfiguration(keyClass, valueClass).prepareRun();
+
+ if (plugin.isUnbounded()) {
// TODO: implement SparkReceiverIO.<~>write()
throw new NotImplementedException("Support for unbounded plugins is not implemented!");
} else {
- Configuration hConf = getCdapPlugin().getHadoopConfiguration();
+ Configuration hConf = plugin.getHadoopConfiguration();
HadoopFormatIO.Write<K, V> writeHadoop =
HadoopFormatIO.<K, V>write()
.withConfiguration(hConf)
.withPartitioning()
- .withExternalSynchronization(new HDFSSynchronization(getLocksDirPath()));
+ .withExternalSynchronization(new HDFSSynchronization(locksDirPath));
return input.apply(writeHadoop);
}
}
-
- public void validateTransform() {
- checkArgument(getCdapPlugin() != null, "withCdapPluginClass() is required");
- checkArgument(getPluginConfig() != null, "withPluginConfig() is required");
- checkArgument(getKeyClass() != null, "withKeyClass() is required");
- checkArgument(getValueClass() != null, "withValueClass() is required");
- checkArgument(getLocksDirPath() != null, "withLocksDirPath() is required");
- }
}
}
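A usage sketch for the reworked expand() validation in CdapIO.java. MyCdapPlugin and myPluginConfig are hypothetical stand-ins, not part of this commit; the with*() setters are the ones named in the error messages above:

    // Fragment, assuming a CDAP batch plugin class MyCdapPlugin and a matching
    // PluginConfig instance myPluginConfig (both hypothetical).
    Pipeline p = Pipeline.create();
    p.apply(
        CdapIO.<String, String>read()
            .withCdapPluginClass(MyCdapPlugin.class)
            .withPluginConfig(myPluginConfig)
            .withKeyClass(String.class)
            .withValueClass(String.class));
    // After this commit, omitting any required setter fails inside expand() with
    // IllegalArgumentException at graph-construction time, instead of surfacing
    // later as a NullPointerException.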
diff --git a/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/Plugin.java b/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/Plugin.java
index 88ed2f321b0..31deb9d258d 100644
--- a/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/Plugin.java
+++ b/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/Plugin.java
@@ -132,7 +132,6 @@ public abstract class Plugin {
/** Sets a plugin Hadoop configuration. */
public Plugin withHadoopConfiguration(Configuration hadoopConfiguration) {
this.hadoopConfiguration = hadoopConfiguration;
-
return this;
}
diff --git a/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/PluginConfigInstantiationUtils.java b/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/PluginConfigInstantiationUtils.java
index cfa6f0ed034..ced11201019 100644
--- a/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/PluginConfigInstantiationUtils.java
+++ b/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/PluginConfigInstantiationUtils.java
@@ -30,11 +30,11 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
+import org.checkerframework.checker.initialization.qual.Initialized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** Class for getting any filled {@link PluginConfig} configuration object. */
-@SuppressWarnings("nullness")
public class PluginConfigInstantiationUtils {
private static final Logger LOG = LoggerFactory.getLogger(PluginConfigInstantiationUtils.class);
@@ -66,7 +66,7 @@ public class PluginConfigInstantiationUtils {
}
InstantiatorFactory instantiatorFactory = new InstantiatorFactory(false);
- T config = instantiatorFactory.get(TypeToken.of(configClass)).create();
+ @Initialized T config = instantiatorFactory.get(TypeToken.of(configClass)).create();
if (config != null) {
for (Field field : allFields) {
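The @Initialized annotation above replaces this file's former blanket nullness suppression with a targeted type-use annotation: InstantiatorFactory builds the config reflectively, so the Initialization Checker cannot prove the result is fully constructed, and the annotation states that assumption on the one local that needs it. A compilable sketch of the same idiom (names are illustrative; only checker-qual is required on the classpath):

    import org.checkerframework.checker.initialization.qual.Initialized;

    public class InitializedSketch {
      public static void main(String[] args) {
        // A type-use annotation on a local: declares the value fully initialized
        // instead of suppressing nullness checking for the whole class.
        @Initialized String value = String.valueOf(42);
        System.out.println(value);
      }
    }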
diff --git a/sdks/java/io/cdap/src/test/java/org/apache/beam/sdk/io/cdap/CdapIOTest.java b/sdks/java/io/cdap/src/test/java/org/apache/beam/sdk/io/cdap/CdapIOTest.java
index 3cae6edb0b5..e978f5b8fca 100644
--- a/sdks/java/io/cdap/src/test/java/org/apache/beam/sdk/io/cdap/CdapIOTest.java
+++ b/sdks/java/io/cdap/src/test/java/org/apache/beam/sdk/io/cdap/CdapIOTest.java
@@ -27,12 +27,15 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.cdap.context.BatchSinkContextImpl;
import org.apache.beam.sdk.io.cdap.context.BatchSourceContextImpl;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.hadoop.mapreduce.OutputCommitter;
@@ -119,9 +122,10 @@ public class CdapIOTest {
}
@Test
- public void testReadValidationFailsMissingCdapPluginClass() {
+ public void testReadExpandingFailsMissingCdapPluginClass() {
+ PBegin testPBegin = PBegin.in(TestPipeline.create());
CdapIO.Read<String, String> read = CdapIO.read();
- assertThrows(IllegalArgumentException.class, read::validateTransform);
+ assertThrows(IllegalArgumentException.class, () -> read.expand(testPBegin));
}
@Test
@@ -221,9 +225,12 @@ public class CdapIOTest {
}
@Test
- public void testWriteValidationFailsMissingCdapPluginClass() {
+ public void testWriteExpandingFailsMissingCdapPluginClass() {
+ PBegin testPBegin = PBegin.in(TestPipeline.create());
+ PCollection<KV<String, String>> testPCollection =
+ Create.empty(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())).expand(testPBegin);
CdapIO.Write<String, String> write = CdapIO.write();
- assertThrows(IllegalArgumentException.class, write::validateTransform);
+ assertThrows(IllegalArgumentException.class, () -> write.expand(testPCollection));
}
@Test