You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text rendering.
Posted to commits@beam.apache.org by ec...@apache.org on 2019/01/08 11:11:52 UTC
[beam] branch spark-runner_structured-streaming updated: Checkstyle and Findbugs
This is an automated email from the ASF dual-hosted git repository.
echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/spark-runner_structured-streaming by this push:
new c494dfc Checkstyle and Findbugs
c494dfc is described below
commit c494dfc89fb59c85186d77f70ac77416e2613825
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Tue Jan 8 12:11:34 2019 +0100
Checkstyle and Findbugs
---
.../structuredstreaming/translation/batch/DatasetSourceBatch.java | 2 +-
.../translation/batch/ReadSourceTranslatorBatch.java | 4 ++--
.../translation/batch/mocks/ReadSourceTranslatorMockBatch.java | 4 ++--
.../translation/streaming/ReadSourceTranslatorStreaming.java | 4 ++--
.../org/apache/beam/runners/spark/structuredstreaming/SourceTest.java | 3 +++
5 files changed, 10 insertions(+), 7 deletions(-)
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
index c1a93e3..df88e8e 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
@@ -68,7 +68,7 @@ public class DatasetSourceBatch<T> implements DataSourceV2, ReadSupport {
if (!options.get(DEFAULT_PARALLELISM).isPresent()){
throw new RuntimeException("Spark default parallelism was not set in DataSource options");
}
- int numPartitions = Integer.valueOf(options.get(DEFAULT_PARALLELISM).get());
+ int numPartitions = Integer.parseInt(options.get(DEFAULT_PARALLELISM).get());
checkArgument(numPartitions > 0, "Number of partitions must be greater than zero.");
if (!options.get(PIPELINE_OPTIONS).isPresent()){
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
index 48d1646..ebd79ac 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
@@ -40,7 +40,7 @@ import org.apache.spark.sql.SparkSession;
class ReadSourceTranslatorBatch<T>
implements TransformTranslator<PTransform<PBegin, PCollection<T>>> {
- private String SOURCEPROVIDERCLASS = DatasetSourceBatch.class.getCanonicalName();
+ private static String sourceProviderClass = DatasetSourceBatch.class.getCanonicalName();
@SuppressWarnings("unchecked")
@Override
@@ -65,7 +65,7 @@ class ReadSourceTranslatorBatch<T>
String.valueOf(context.getSparkSession().sparkContext().defaultParallelism()));
datasetSourceOptions.put(DatasetSourceBatch.PIPELINE_OPTIONS,
PipelineOptionsSerializationUtils.serializeToJson(context.getOptions()));
- Dataset<Row> rowDataset = sparkSession.read().format(SOURCEPROVIDERCLASS).options(datasetSourceOptions)
+ Dataset<Row> rowDataset = sparkSession.read().format(sourceProviderClass).options(datasetSourceOptions)
.load();
MapFunction<Row, WindowedValue> func = new MapFunction<Row, WindowedValue>() {
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/mocks/ReadSourceTranslatorMockBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/mocks/ReadSourceTranslatorMockBatch.java
index c0d628b..5cfb755 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/mocks/ReadSourceTranslatorMockBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/mocks/ReadSourceTranslatorMockBatch.java
@@ -36,7 +36,7 @@ import org.apache.spark.sql.SparkSession;
public class ReadSourceTranslatorMockBatch<T>
implements TransformTranslator<PTransform<PBegin, PCollection<T>>> {
- private String SOURCEPROVIDERCLASS = DatasetSourceMockBatch.class.getCanonicalName();
+ private static String sourceProviderClass = DatasetSourceMockBatch.class.getCanonicalName();
@SuppressWarnings("unchecked")
@Override
@@ -44,7 +44,7 @@ public class ReadSourceTranslatorMockBatch<T>
PTransform<PBegin, PCollection<T>> transform, TranslationContext context) {
SparkSession sparkSession = context.getSparkSession();
- Dataset<Row> rowDataset = sparkSession.read().format(SOURCEPROVIDERCLASS).load();
+ Dataset<Row> rowDataset = sparkSession.read().format(sourceProviderClass).load();
MapFunction<Row, WindowedValue> func = new MapFunction<Row, WindowedValue>() {
@Override public WindowedValue call(Row value) throws Exception {
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java
index c58a688..9a6022a 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java
@@ -36,7 +36,7 @@ import org.apache.spark.sql.SparkSession;
class ReadSourceTranslatorStreaming<T>
implements TransformTranslator<PTransform<PBegin, PCollection<T>>> {
- private String SOURCEPROVIDERCLASS = DatasetSourceStreaming.class.getCanonicalName();
+ private static String sourceProviderClass = DatasetSourceStreaming.class.getCanonicalName();
@SuppressWarnings("unchecked")
@Override
@@ -55,7 +55,7 @@ class ReadSourceTranslatorStreaming<T>
}
SparkSession sparkSession = context.getSparkSession();
- Dataset<Row> rowDataset = sparkSession.readStream().format(SOURCEPROVIDERCLASS).load();
+ Dataset<Row> rowDataset = sparkSession.readStream().format(sourceProviderClass).load();
//TODO pass the source and the translation context serialized as string to the DatasetSource
MapFunction<Row, WindowedValue> func = new MapFunction<Row, WindowedValue>() {
diff --git a/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/SourceTest.java b/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/SourceTest.java
index 79a85f3..5b4957d 100644
--- a/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/SourceTest.java
+++ b/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/SourceTest.java
@@ -5,6 +5,9 @@ import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
+/**
+ * Test class for beam to spark source translation.
+ */
public class SourceTest {
public static void main(String[] args) {
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();