Posted to commits@beam.apache.org by ec...@apache.org on 2019/01/07 08:40:09 UTC

[beam] branch spark-runner_structured-streaming updated: Fix checkstyle

This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/spark-runner_structured-streaming by this push:
     new 852058d  Fix checkstyle
852058d is described below

commit 852058d20b036432abc8fb8149f3ba40339cf7de
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Mon Jan 7 09:39:50 2019 +0100

    Fix checkstyle
---
 .../translation/batch/DatasetSourceBatch.java        |  2 +-
 .../translation/batch/ReadSourceTranslatorBatch.java |  4 ++--
 .../batch/mocks/DatasetSourceMockBatch.java          |  2 +-
 .../batch/mocks/ReadSourceTranslatorMockBatch.java   |  4 ++--
 .../translation/batch/mocks/package-info.java        | 20 ++++++++++++++++++++
 .../streaming/DatasetSourceStreaming.java            | 13 +------------
 .../streaming/ReadSourceTranslatorStreaming.java     |  6 ++----
 7 files changed, 29 insertions(+), 22 deletions(-)

diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
index 5d17e1b..1c8a888 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
@@ -133,7 +133,7 @@ public class DatasetSourceBatch<T> implements DataSourceV2, ReadSupport {
     }
   }
 
-  /** This class can be mapped to Beam {@link BoundedReader} */
+  /** This class can be mapped to Beam {@link BoundedReader}. */
   private class DatasetPartitionReader implements InputPartitionReader<InternalRow> {
 
     BoundedReader<T> reader;
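
(Editor's note: the one-character Javadoc edit above is the kind of fix checkstyle's
Javadoc-style checks ask for, assuming Beam's configuration enables them: the summary
sentence of a Javadoc comment must end with a period. A minimal sketch, with
hypothetical class names:

    /** Flagged: the Javadoc summary sentence has no terminating period */
    class Before {}

    /** Passes: the summary sentence ends with a period. */
    class After {}
)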
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
index 0da1c2a..68a9de9 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
@@ -40,7 +40,7 @@ import org.apache.spark.sql.SparkSession;
 class ReadSourceTranslatorBatch<T>
     implements TransformTranslator<PTransform<PBegin, PCollection<T>>> {
 
-  private String SOURCE_PROVIDER_CLASS = DatasetSourceBatch.class.getCanonicalName();
+  private String SOURCEPROVIDERCLASS = DatasetSourceBatch.class.getCanonicalName();
 
   @SuppressWarnings("unchecked")
   @Override
@@ -65,7 +65,7 @@ class ReadSourceTranslatorBatch<T>
         String.valueOf(context.getSparkSession().sparkContext().defaultParallelism()));
     datasetSourceOptions.put(DatasetSourceBatch.PIPELINE_OPTIONS,
         PipelineOptionsSerializationUtils.serializeToJson(context.getOptions()));
-    Dataset<Row> rowDataset = sparkSession.read().format(SOURCE_PROVIDER_CLASS).options(datasetSourceOptions)
+    Dataset<Row> rowDataset = sparkSession.read().format(SOURCEPROVIDERCLASS).options(datasetSourceOptions)
         .load();
 
     //TODO pass the source and the translation context serialized as string to the DatasetSource
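
(Editor's note: the rename above, SOURCE_PROVIDER_CLASS to SOURCEPROVIDERCLASS,
silences a member-naming check that, in the configuration this commit appears to
target, rejects underscores in non-constant instance fields. A hedged sketch of the
constraint, using a hypothetical NamingSketch class; the values are illustrative:

    class NamingSketch {
      // Assumed to be flagged: an instance field written in CONSTANT_CASE
      // private String SOURCE_PROVIDER_CLASS = Object.class.getCanonicalName();

      // Conventional alternative: a camelCase instance field
      private String sourceProviderClass = Object.class.getCanonicalName();

      // Conventional alternative: a true constant, declared static final,
      // which naming checks typically allow to keep CONSTANT_CASE
      private static final String SOURCE_PROVIDER_CLASS = "org.example.Provider";
    }
)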
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/mocks/DatasetSourceMockBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/mocks/DatasetSourceMockBatch.java
index 914eed0..5a93928 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/mocks/DatasetSourceMockBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/mocks/DatasetSourceMockBatch.java
@@ -62,7 +62,7 @@ public class DatasetSourceMockBatch implements DataSourceV2, ReadSupport {
     }
   }
 
-  /** This class is a mocked reader*/
+  /** This class is a mocked reader. */
   private static class DatasetPartitionReaderMock implements InputPartitionReader<InternalRow> {
 
     private ArrayList<Integer> values;
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/mocks/ReadSourceTranslatorMockBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/mocks/ReadSourceTranslatorMockBatch.java
index 17c7f62..c0d628b 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/mocks/ReadSourceTranslatorMockBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/mocks/ReadSourceTranslatorMockBatch.java
@@ -36,7 +36,7 @@ import org.apache.spark.sql.SparkSession;
 public class ReadSourceTranslatorMockBatch<T>
     implements TransformTranslator<PTransform<PBegin, PCollection<T>>> {
 
-  private String SOURCE_PROVIDER_CLASS = DatasetSourceMockBatch.class.getCanonicalName();
+  private String SOURCEPROVIDERCLASS = DatasetSourceMockBatch.class.getCanonicalName();
 
   @SuppressWarnings("unchecked")
   @Override
@@ -44,7 +44,7 @@ public class ReadSourceTranslatorMockBatch<T>
       PTransform<PBegin, PCollection<T>> transform, TranslationContext context) {
     SparkSession sparkSession = context.getSparkSession();
 
-    Dataset<Row> rowDataset = sparkSession.read().format(SOURCE_PROVIDER_CLASS).load();
+    Dataset<Row> rowDataset = sparkSession.read().format(SOURCEPROVIDERCLASS).load();
 
     MapFunction<Row, WindowedValue> func = new MapFunction<Row, WindowedValue>() {
       @Override public WindowedValue call(Row value) throws Exception {
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/mocks/package-info.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/mocks/package-info.java
new file mode 100644
index 0000000..3c00aaf
--- /dev/null
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/mocks/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Temporary source mocks, to be removed once the real source is implemented. */
+package org.apache.beam.runners.spark.structuredstreaming.translation.batch.mocks;
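
(Editor's note: the new file above satisfies checkstyle's JavadocPackage check,
assuming Beam enables it, which requires each package to carry a package-info.java
with a package-level Javadoc. License header aside, the minimal shape is, for a
hypothetical package:

    /** One-sentence description of the package, ending with a period. */
    package org.example.mocks;
)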
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/DatasetSourceStreaming.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/DatasetSourceStreaming.java
index fad68d3..3175aed 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/DatasetSourceStreaming.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/DatasetSourceStreaming.java
@@ -28,16 +28,7 @@ import org.apache.beam.runners.spark.structuredstreaming.SparkPipelineOptions;
 import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
-import org.apache.beam.sdk.io.Source;
 import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.spark.sql.AnalysisException;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.catalog.Catalog;
-import org.apache.spark.sql.catalog.Column;
-import org.apache.spark.sql.catalog.Database;
-import org.apache.spark.sql.catalog.Function;
-import org.apache.spark.sql.catalog.Table;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.sources.v2.ContinuousReadSupport;
 import org.apache.spark.sql.sources.v2.DataSourceOptions;
@@ -48,8 +39,6 @@ import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
 import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader;
 import org.apache.spark.sql.sources.v2.reader.streaming.Offset;
 import org.apache.spark.sql.types.StructType;
-import org.apache.spark.storage.StorageLevel;
-import scala.collection.immutable.Map;
 
 /**
  * This is a spark structured streaming {@link DataSourceV2} implementation. As Continuous streaming
@@ -157,7 +146,7 @@ public class DatasetSourceStreaming<T> implements DataSourceV2, MicroBatchReadSu
     }
   }
 
-  /** This class can be mapped to Beam {@link BoundedReader} */
+  /** This class can be mapped to Beam {@link BoundedReader}. */
   private class DatasetMicroBatchPartitionReader implements InputPartitionReader<InternalRow> {
 
     BoundedReader<T> reader;
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java
index 6066822..c58a688 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java
@@ -21,8 +21,6 @@ import java.io.IOException;
 import org.apache.beam.runners.core.construction.ReadTranslation;
 import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator;
 import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
-import org.apache.beam.runners.spark.structuredstreaming.translation.batch.DatasetSourceBatch;
-import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -38,7 +36,7 @@ import org.apache.spark.sql.SparkSession;
 class ReadSourceTranslatorStreaming<T>
     implements TransformTranslator<PTransform<PBegin, PCollection<T>>> {
 
-  private String SOURCE_PROVIDER_CLASS = DatasetSourceStreaming.class.getCanonicalName();
+  private String SOURCEPROVIDERCLASS = DatasetSourceStreaming.class.getCanonicalName();
 
   @SuppressWarnings("unchecked")
   @Override
@@ -57,7 +55,7 @@ class ReadSourceTranslatorStreaming<T>
     }
     SparkSession sparkSession = context.getSparkSession();
 
-    Dataset<Row> rowDataset = sparkSession.readStream().format(SOURCE_PROVIDER_CLASS).load();
+    Dataset<Row> rowDataset = sparkSession.readStream().format(SOURCEPROVIDERCLASS).load();
 
     //TODO pass the source and the translation context serialized as string to the DatasetSource
     MapFunction<Row, WindowedValue> func = new MapFunction<Row, WindowedValue>() {
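
(Editor's note: across the three translators touched by this commit, the wiring is
the same: the custom DataSourceV2 implementation is registered by its canonical class
name and loaded into a Dataset<Row> through the standard reader API. A hedged sketch
of that pattern, assuming a local Spark 2.x session; the session setup is illustrative
and not from the commit, and the real translators also pass the source options seen
above (DatasetSourceBatch.DEFAULT_PARALLELISM and DatasetSourceBatch.PIPELINE_OPTIONS),
which are omitted here:

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    class LoadSketch {
      public static void main(String[] args) {
        // Illustrative local session; the runner obtains its session
        // from the TranslationContext instead
        SparkSession sparkSession =
            SparkSession.builder().master("local[2]").appName("sketch").getOrCreate();

        // The DataSourceV2 provider is addressed by canonical class name, as in the diff
        String providerClass =
            "org.apache.beam.runners.spark.structuredstreaming"
                + ".translation.batch.DatasetSourceBatch";

        // Batch path uses read(); the streaming translator uses readStream() instead
        Dataset<Row> rowDataset = sparkSession.read().format(providerClass).load();
      }
    }
)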