You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ec...@apache.org on 2019/01/07 08:40:09 UTC
[beam] branch spark-runner_structured-streaming updated: Fix checkstyle
This is an automated email from the ASF dual-hosted git repository.
echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/spark-runner_structured-streaming by this push:
new 852058d Fix checkstyle
852058d is described below
commit 852058d20b036432abc8fb8149f3ba40339cf7de
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Mon Jan 7 09:39:50 2019 +0100
Fix checkstyle
---
.../translation/batch/DatasetSourceBatch.java | 2 +-
.../translation/batch/ReadSourceTranslatorBatch.java | 4 ++--
.../batch/mocks/DatasetSourceMockBatch.java | 2 +-
.../batch/mocks/ReadSourceTranslatorMockBatch.java | 4 ++--
.../translation/batch/mocks/package-info.java | 20 ++++++++++++++++++++
.../streaming/DatasetSourceStreaming.java | 13 +------------
.../streaming/ReadSourceTranslatorStreaming.java | 6 ++----
7 files changed, 29 insertions(+), 22 deletions(-)
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
index 5d17e1b..1c8a888 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
@@ -133,7 +133,7 @@ public class DatasetSourceBatch<T> implements DataSourceV2, ReadSupport {
}
}
- /** This class can be mapped to Beam {@link BoundedReader} */
+ /** This class can be mapped to Beam {@link BoundedReader}. */
private class DatasetPartitionReader implements InputPartitionReader<InternalRow> {
BoundedReader<T> reader;
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
index 0da1c2a..68a9de9 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
@@ -40,7 +40,7 @@ import org.apache.spark.sql.SparkSession;
class ReadSourceTranslatorBatch<T>
implements TransformTranslator<PTransform<PBegin, PCollection<T>>> {
- private String SOURCE_PROVIDER_CLASS = DatasetSourceBatch.class.getCanonicalName();
+ private String SOURCEPROVIDERCLASS = DatasetSourceBatch.class.getCanonicalName();
@SuppressWarnings("unchecked")
@Override
@@ -65,7 +65,7 @@ class ReadSourceTranslatorBatch<T>
String.valueOf(context.getSparkSession().sparkContext().defaultParallelism()));
datasetSourceOptions.put(DatasetSourceBatch.PIPELINE_OPTIONS,
PipelineOptionsSerializationUtils.serializeToJson(context.getOptions()));
- Dataset<Row> rowDataset = sparkSession.read().format(SOURCE_PROVIDER_CLASS).options(datasetSourceOptions)
+ Dataset<Row> rowDataset = sparkSession.read().format(SOURCEPROVIDERCLASS).options(datasetSourceOptions)
.load();
//TODO pass the source and the translation context serialized as string to the DatasetSource
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/mocks/DatasetSourceMockBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/mocks/DatasetSourceMockBatch.java
index 914eed0..5a93928 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/mocks/DatasetSourceMockBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/mocks/DatasetSourceMockBatch.java
@@ -62,7 +62,7 @@ public class DatasetSourceMockBatch implements DataSourceV2, ReadSupport {
}
}
- /** This class is a mocked reader*/
+ /** This class is a mocked reader. */
private static class DatasetPartitionReaderMock implements InputPartitionReader<InternalRow> {
private ArrayList<Integer> values;
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/mocks/ReadSourceTranslatorMockBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/mocks/ReadSourceTranslatorMockBatch.java
index 17c7f62..c0d628b 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/mocks/ReadSourceTranslatorMockBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/mocks/ReadSourceTranslatorMockBatch.java
@@ -36,7 +36,7 @@ import org.apache.spark.sql.SparkSession;
public class ReadSourceTranslatorMockBatch<T>
implements TransformTranslator<PTransform<PBegin, PCollection<T>>> {
- private String SOURCE_PROVIDER_CLASS = DatasetSourceMockBatch.class.getCanonicalName();
+ private String SOURCEPROVIDERCLASS = DatasetSourceMockBatch.class.getCanonicalName();
@SuppressWarnings("unchecked")
@Override
@@ -44,7 +44,7 @@ public class ReadSourceTranslatorMockBatch<T>
PTransform<PBegin, PCollection<T>> transform, TranslationContext context) {
SparkSession sparkSession = context.getSparkSession();
- Dataset<Row> rowDataset = sparkSession.read().format(SOURCE_PROVIDER_CLASS).load();
+ Dataset<Row> rowDataset = sparkSession.read().format(SOURCEPROVIDERCLASS).load();
MapFunction<Row, WindowedValue> func = new MapFunction<Row, WindowedValue>() {
@Override public WindowedValue call(Row value) throws Exception {
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/mocks/package-info.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/mocks/package-info.java
new file mode 100644
index 0000000..3c00aaf
--- /dev/null
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/mocks/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Source mocks, only temporary waiting for the proper source to be done. */
+package org.apache.beam.runners.spark.structuredstreaming.translation.batch.mocks;
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/DatasetSourceStreaming.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/DatasetSourceStreaming.java
index fad68d3..3175aed 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/DatasetSourceStreaming.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/DatasetSourceStreaming.java
@@ -28,16 +28,7 @@ import org.apache.beam.runners.spark.structuredstreaming.SparkPipelineOptions;
import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
-import org.apache.beam.sdk.io.Source;
import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.spark.sql.AnalysisException;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.catalog.Catalog;
-import org.apache.spark.sql.catalog.Column;
-import org.apache.spark.sql.catalog.Database;
-import org.apache.spark.sql.catalog.Function;
-import org.apache.spark.sql.catalog.Table;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.sources.v2.ContinuousReadSupport;
import org.apache.spark.sql.sources.v2.DataSourceOptions;
@@ -48,8 +39,6 @@ import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader;
import org.apache.spark.sql.sources.v2.reader.streaming.Offset;
import org.apache.spark.sql.types.StructType;
-import org.apache.spark.storage.StorageLevel;
-import scala.collection.immutable.Map;
/**
* This is a spark structured streaming {@link DataSourceV2} implementation. As Continuous streaming
@@ -157,7 +146,7 @@ public class DatasetSourceStreaming<T> implements DataSourceV2, MicroBatchReadSu
}
}
- /** This class can be mapped to Beam {@link BoundedReader} */
+ /** This class can be mapped to Beam {@link BoundedReader}. */
private class DatasetMicroBatchPartitionReader implements InputPartitionReader<InternalRow> {
BoundedReader<T> reader;
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java
index 6066822..c58a688 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java
@@ -21,8 +21,6 @@ import java.io.IOException;
import org.apache.beam.runners.core.construction.ReadTranslation;
import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator;
import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
-import org.apache.beam.runners.spark.structuredstreaming.translation.batch.DatasetSourceBatch;
-import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.transforms.PTransform;
@@ -38,7 +36,7 @@ import org.apache.spark.sql.SparkSession;
class ReadSourceTranslatorStreaming<T>
implements TransformTranslator<PTransform<PBegin, PCollection<T>>> {
- private String SOURCE_PROVIDER_CLASS = DatasetSourceStreaming.class.getCanonicalName();
+ private String SOURCEPROVIDERCLASS = DatasetSourceStreaming.class.getCanonicalName();
@SuppressWarnings("unchecked")
@Override
@@ -57,7 +55,7 @@ class ReadSourceTranslatorStreaming<T>
}
SparkSession sparkSession = context.getSparkSession();
- Dataset<Row> rowDataset = sparkSession.readStream().format(SOURCE_PROVIDER_CLASS).load();
+ Dataset<Row> rowDataset = sparkSession.readStream().format(SOURCEPROVIDERCLASS).load();
//TODO pass the source and the translation context serialized as string to the DatasetSource
MapFunction<Row, WindowedValue> func = new MapFunction<Row, WindowedValue>() {