Posted to commits@beam.apache.org by ec...@apache.org on 2019/01/08 11:11:52 UTC

[beam] branch spark-runner_structured-streaming updated: Checkstyle and Findbugs

This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/spark-runner_structured-streaming by this push:
     new c494dfc  Checkstyle and Findbugs
c494dfc is described below

commit c494dfc89fb59c85186d77f70ac77416e2613825
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Tue Jan 8 12:11:34 2019 +0100

    Checkstyle and Findbugs
---
 .../structuredstreaming/translation/batch/DatasetSourceBatch.java     | 2 +-
 .../translation/batch/ReadSourceTranslatorBatch.java                  | 4 ++--
 .../translation/batch/mocks/ReadSourceTranslatorMockBatch.java        | 4 ++--
 .../translation/streaming/ReadSourceTranslatorStreaming.java          | 4 ++--
 .../org/apache/beam/runners/spark/structuredstreaming/SourceTest.java | 3 +++
 5 files changed, 10 insertions(+), 7 deletions(-)

diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
index c1a93e3..df88e8e 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
@@ -68,7 +68,7 @@ public class DatasetSourceBatch<T> implements DataSourceV2, ReadSupport {
     if (!options.get(DEFAULT_PARALLELISM).isPresent()){
       throw new RuntimeException("Spark default parallelism was not set in DataSource options");
     }
-    int numPartitions = Integer.valueOf(options.get(DEFAULT_PARALLELISM).get());
+    int numPartitions = Integer.parseInt(options.get(DEFAULT_PARALLELISM).get());
     checkArgument(numPartitions > 0, "Number of partitions must be greater than zero.");
 
     if (!options.get(PIPELINE_OPTIONS).isPresent()){
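
The change above swaps Integer.valueOf for Integer.parseInt. Both accept the same strings, but valueOf answers with a boxed Integer (freshly allocated outside the -128..127 cache) that the int assignment immediately auto-unboxes, while parseInt returns the primitive directly; this needless box is the kind of issue Findbugs flags. A minimal standalone sketch of the difference (class name hypothetical, not part of the commit):

    public class ParseIntSketch {
      public static void main(String[] args) {
        String raw = "8"; // e.g. Spark's default parallelism arriving as a string option
        int viaValueOf = Integer.valueOf(raw);   // parses to an Integer box, then auto-unboxes
        int viaParseInt = Integer.parseInt(raw); // parses straight to the primitive
        System.out.println(viaValueOf == viaParseInt); // true: same value, no intermediate box
      }
    }
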
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
index 48d1646..ebd79ac 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
@@ -40,7 +40,7 @@ import org.apache.spark.sql.SparkSession;
 class ReadSourceTranslatorBatch<T>
     implements TransformTranslator<PTransform<PBegin, PCollection<T>>> {
 
-  private String SOURCEPROVIDERCLASS = DatasetSourceBatch.class.getCanonicalName();
+  private static String sourceProviderClass = DatasetSourceBatch.class.getCanonicalName();
 
   @SuppressWarnings("unchecked")
   @Override
@@ -65,7 +65,7 @@ class ReadSourceTranslatorBatch<T>
         String.valueOf(context.getSparkSession().sparkContext().defaultParallelism()));
     datasetSourceOptions.put(DatasetSourceBatch.PIPELINE_OPTIONS,
         PipelineOptionsSerializationUtils.serializeToJson(context.getOptions()));
-    Dataset<Row> rowDataset = sparkSession.read().format(SOURCEPROVIDERCLASS).options(datasetSourceOptions)
+    Dataset<Row> rowDataset = sparkSession.read().format(sourceProviderClass).options(datasetSourceOptions)
         .load();
 
     MapFunction<Row, WindowedValue> func = new MapFunction<Row, WindowedValue>() {
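
The rename above (and the matching ones in the next two files) follows the standard Java naming convention that Checkstyle enforces: UPPER_SNAKE_CASE is reserved for static final constants, while non-final fields, static or instance, use lowerCamelCase. A compact sketch of the two forms (names and values hypothetical):

    class NamingConventionSketch {
      // static final constant: UPPER_SNAKE_CASE is expected
      static final String SOURCE_PROVIDER_CLASS = "com.example.Provider";
      // mutable static field: lowerCamelCase, as in the rename above
      private static String sourceProviderClass = "com.example.Provider";
    }

Since the field is only ever read, declaring it static final and keeping a constant-style name would also have satisfied the check; the commit opts for the camelCase rename instead.
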
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/mocks/ReadSourceTranslatorMockBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/mocks/ReadSourceTranslatorMockBatch.java
index c0d628b..5cfb755 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/mocks/ReadSourceTranslatorMockBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/mocks/ReadSourceTranslatorMockBatch.java
@@ -36,7 +36,7 @@ import org.apache.spark.sql.SparkSession;
 public class ReadSourceTranslatorMockBatch<T>
     implements TransformTranslator<PTransform<PBegin, PCollection<T>>> {
 
-  private String SOURCEPROVIDERCLASS = DatasetSourceMockBatch.class.getCanonicalName();
+  private static String sourceProviderClass = DatasetSourceMockBatch.class.getCanonicalName();
 
   @SuppressWarnings("unchecked")
   @Override
@@ -44,7 +44,7 @@ public class ReadSourceTranslatorMockBatch<T>
       PTransform<PBegin, PCollection<T>> transform, TranslationContext context) {
     SparkSession sparkSession = context.getSparkSession();
 
-    Dataset<Row> rowDataset = sparkSession.read().format(SOURCEPROVIDERCLASS).load();
+    Dataset<Row> rowDataset = sparkSession.read().format(sourceProviderClass).load();
 
     MapFunction<Row, WindowedValue> func = new MapFunction<Row, WindowedValue>() {
       @Override public WindowedValue call(Row value) throws Exception {
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java
index c58a688..9a6022a 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java
@@ -36,7 +36,7 @@ import org.apache.spark.sql.SparkSession;
 class ReadSourceTranslatorStreaming<T>
     implements TransformTranslator<PTransform<PBegin, PCollection<T>>> {
 
-  private String SOURCEPROVIDERCLASS = DatasetSourceStreaming.class.getCanonicalName();
+  private static String sourceProviderClass = DatasetSourceStreaming.class.getCanonicalName();
 
   @SuppressWarnings("unchecked")
   @Override
@@ -55,7 +55,7 @@ class ReadSourceTranslatorStreaming<T>
     }
     SparkSession sparkSession = context.getSparkSession();
 
-    Dataset<Row> rowDataset = sparkSession.readStream().format(SOURCEPROVIDERCLASS).load();
+    Dataset<Row> rowDataset = sparkSession.readStream().format(sourceProviderClass).load();
 
     //TODO pass the source and the translation context serialized as string to the DatasetSource
     MapFunction<Row, WindowedValue> func = new MapFunction<Row, WindowedValue>() {
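
The streaming translator mirrors its batch counterpart; the visible difference is readStream() in place of read(), which asks Spark for an unbounded Dataset from the same DataSourceV2 provider class. A hedged sketch of the contrast (the provider name is a placeholder, so this compiles but would not resolve a real source at runtime):

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class ReadVsReadStreamSketch {
      public static void main(String[] args) {
        SparkSession spark =
            SparkSession.builder().appName("sketch").master("local[2]").getOrCreate();
        String provider = "com.example.CustomDataSourceV2"; // placeholder class name
        Dataset<Row> bounded = spark.read().format(provider).load();         // batch: bounded Dataset
        Dataset<Row> unbounded = spark.readStream().format(provider).load(); // streaming: unbounded Dataset
        spark.stop();
      }
    }
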
diff --git a/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/SourceTest.java b/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/SourceTest.java
index 79a85f3..5b4957d 100644
--- a/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/SourceTest.java
+++ b/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/SourceTest.java
@@ -5,6 +5,9 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Create;
 
+/**
+ * Test class for beam to spark source translation.
+ */
 public class SourceTest {
   public static void main(String[] args) {
     PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
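
The SourceTest hunk adds only the class-level Javadoc shown above; Checkstyle configurations commonly require a Javadoc comment on every top-level type, which is presumably the check being satisfied here. The minimal shape of a compliant class (hypothetical example, not from the commit):

    /** Demonstrates the class-level Javadoc that type-level Javadoc checks require. */
    public class DocumentedClass {
      public static void main(String[] args) {}
    }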