Posted to commits@beam.apache.org by ec...@apache.org on 2018/12/11 14:22:16 UTC

[beam] 03/04: Apply spotless

This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 8e08c5856adf16e45dbd51cb2284593439d54714
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Fri Dec 7 12:08:51 2018 +0100

    Apply spotless
---
 .../translation/TranslationContext.java            |   7 --
 .../batch/ReadSourceTranslatorBatch.java           |   4 -
 .../translation/io/DatasetSource.java              | 109 +++++++++++++--------
 3 files changed, 68 insertions(+), 52 deletions(-)

diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
index 52ed11f..0f2493d 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
@@ -33,18 +33,11 @@ import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.spark.SparkConf;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.ForeachWriter;
-import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.execution.datasources.DataSource;
-import org.apache.spark.sql.execution.streaming.Source;
-import org.apache.spark.sql.sources.v2.DataSourceOptions;
-import org.apache.spark.sql.sources.v2.ReadSupport;
-import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
 import org.apache.spark.sql.streaming.StreamingQueryException;
 
 /**
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
index 05dc374..63f2fdf 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
@@ -19,14 +19,12 @@ package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
 
 import java.io.IOException;
 import org.apache.beam.runners.core.construction.ReadTranslation;
-import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
 import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator;
 import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
 import org.apache.beam.runners.spark.structuredstreaming.translation.io.DatasetSource;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.spark.sql.Dataset;
@@ -57,6 +55,4 @@ class ReadSourceTranslatorBatch<T>
 
     context.putDataset(output, dataset);
   }
-
-
 }
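
As a side note on how the pieces above fit together: a translator like ReadSourceTranslatorBatch typically hands the fully qualified class name of DatasetSource to the Spark session as the data source format. Below is a minimal, hypothetical sketch, assuming Spark 2.x DataSourceV2 resolution by class name; the actual wiring inside translateTransform is not shown in this diff.

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    class DatasetSourceWiringSketch {
      // Hypothetical helper, not taken from this patch: loads rows through the
      // DatasetSource class, which Spark resolves as a DataSourceV2 provider
      // when its fully qualified name is passed as the format.
      static Dataset<Row> readViaDatasetSource(SparkSession sparkSession) {
        return sparkSession
            .readStream()
            .format(
                "org.apache.beam.runners.spark.structuredstreaming.translation.io.DatasetSource")
            .load();
      }
    }
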
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSource.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSource.java
index 60bdab6..f230a70 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSource.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSource.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.beam.runners.spark.structuredstreaming.translation.io;
 
 import static com.google.common.base.Preconditions.checkArgument;
@@ -25,8 +42,8 @@ import org.apache.spark.sql.types.StructType;
 
 /**
  * This is a spark structured streaming {@link DataSourceV2} implementation. As Continuous streaming
- * is tagged experimental in spark, this class does not implement {@link ContinuousReadSupport}.
- * This class is just a mix-in.
+ * is tagged experimental in spark, this class does not implement {@link ContinuousReadSupport}. This
+ * class is just a mix-in.
  */
 public class DatasetSource<T> implements DataSourceV2, MicroBatchReadSupport {
 
@@ -41,79 +58,87 @@ public class DatasetSource<T> implements DataSourceV2, MicroBatchReadSupport {
     this.numPartitions = context.getSparkSession().sparkContext().defaultParallelism();
     checkArgument(this.numPartitions > 0, "Number of partitions must be greater than zero.");
     this.bundleSize = context.getOptions().getBundleSize();
-
   }
 
-  @Override public MicroBatchReader createMicroBatchReader(Optional<StructType> schema,
-      String checkpointLocation, DataSourceOptions options) {
+  @Override
+  public MicroBatchReader createMicroBatchReader(
+      Optional<StructType> schema, String checkpointLocation, DataSourceOptions options) {
     return new DatasetMicroBatchReader(schema, checkpointLocation, options);
   }
 
-  /**
-   * This class can be mapped to Beam {@link BoundedSource}.
-   */
+  /** This class can be mapped to Beam {@link BoundedSource}. */
   private class DatasetMicroBatchReader implements MicroBatchReader {
 
     private Optional<StructType> schema;
     private String checkpointLocation;
     private DataSourceOptions options;
 
-    private DatasetMicroBatchReader(Optional<StructType> schema, String checkpointLocation,
-        DataSourceOptions options) {
+    private DatasetMicroBatchReader(
+        Optional<StructType> schema, String checkpointLocation, DataSourceOptions options) {
       //TODO deal with schema and options
     }
 
-    @Override public void setOffsetRange(Optional<Offset> start, Optional<Offset> end) {
+    @Override
+    public void setOffsetRange(Optional<Offset> start, Optional<Offset> end) {
       //TODO extension point for SDF
     }
 
-    @Override public Offset getStartOffset() {
+    @Override
+    public Offset getStartOffset() {
       //TODO extension point for SDF
       return null;
     }
 
-    @Override public Offset getEndOffset() {
+    @Override
+    public Offset getEndOffset() {
       //TODO extension point for SDF
       return null;
     }
 
-    @Override public Offset deserializeOffset(String json) {
+    @Override
+    public Offset deserializeOffset(String json) {
       //TODO extension point for SDF
       return null;
     }
 
-    @Override public void commit(Offset end) {
+    @Override
+    public void commit(Offset end) {
       //TODO no more to read after end Offset
     }
 
-    @Override public void stop() {
-    }
+    @Override
+    public void stop() {}
 
-    @Override public StructType readSchema() {
+    @Override
+    public StructType readSchema() {
       return null;
     }
 
-    @Override public List<InputPartition<InternalRow>> planInputPartitions() {
+    @Override
+    public List<InputPartition<InternalRow>> planInputPartitions() {
       List<InputPartition<InternalRow>> result = new ArrayList<>();
       long desiredSizeBytes;
       SparkPipelineOptions options = context.getOptions();
       try {
-        desiredSizeBytes = (bundleSize == null) ?
-            source.getEstimatedSizeBytes(options) / numPartitions :
-            bundleSize;
+        desiredSizeBytes =
+            (bundleSize == null)
+                ? source.getEstimatedSizeBytes(options) / numPartitions
+                : bundleSize;
         List<? extends BoundedSource<T>> sources = source.split(desiredSizeBytes, options);
         for (BoundedSource<T> source : sources) {
-          result.add(new InputPartition<InternalRow>() {
-
-            @Override public InputPartitionReader<InternalRow> createPartitionReader() {
-              BoundedReader<T> reader = null;
-              try {
-                reader = source.createReader(options);
-              } catch (IOException e) {
-              }
-              return new DatasetMicroBatchPartitionReader(reader);
-            }
-          });
+          result.add(
+              new InputPartition<InternalRow>() {
+
+                @Override
+                public InputPartitionReader<InternalRow> createPartitionReader() {
+                  BoundedReader<T> reader = null;
+                  try {
+                    reader = source.createReader(options);
+                  } catch (IOException e) {
+                  }
+                  return new DatasetMicroBatchPartitionReader(reader);
+                }
+              });
         }
         return result;
 
@@ -122,12 +147,9 @@ public class DatasetSource<T> implements DataSourceV2, MicroBatchReadSupport {
       }
       return result;
     }
-
   }
 
-  /**
-   * This class can be mapped to Beam {@link BoundedReader}.
-   */
+  /** This class can be mapped to Beam {@link BoundedReader}. */
   private class DatasetMicroBatchPartitionReader implements InputPartitionReader<InternalRow> {
 
     BoundedReader<T> reader;
@@ -140,7 +162,8 @@ public class DatasetSource<T> implements DataSourceV2, MicroBatchReadSupport {
       this.closed = false;
     }
 
-    @Override public boolean next() throws IOException {
+    @Override
+    public boolean next() throws IOException {
       if (!started) {
         started = true;
         return reader.start();
@@ -149,13 +172,17 @@ public class DatasetSource<T> implements DataSourceV2, MicroBatchReadSupport {
       }
     }
 
-    @Override public InternalRow get() {
+    @Override
+    public InternalRow get() {
       List<Object> list = new ArrayList<>();
-      list.add(WindowedValue.timestampedValueInGlobalWindow(reader.getCurrent(), reader.getCurrentTimestamp()));
+      list.add(
+          WindowedValue.timestampedValueInGlobalWindow(
+              reader.getCurrent(), reader.getCurrentTimestamp()));
       return InternalRow.apply(asScalaBuffer(list).toList());
     }
 
-    @Override public void close() throws IOException {
+    @Override
+    public void close() throws IOException {
       closed = true;
       reader.close();
     }
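
For readers unfamiliar with the DataSourceV2 reader contract that DatasetMicroBatchPartitionReader implements: Spark creates one InputPartitionReader per planned partition and drives it with next()/get() until the partition is exhausted, then closes it. A simplified, illustrative driver loop (not the actual Spark internals) might look like this:

    import java.io.IOException;
    import java.util.List;
    import org.apache.spark.sql.catalyst.InternalRow;
    import org.apache.spark.sql.sources.v2.reader.InputPartition;
    import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;

    class PartitionReaderContractSketch {
      // Simplified sketch of how a micro-batch is consumed: each planned
      // partition yields a reader that is iterated with next()/get() and closed.
      static void consume(List<InputPartition<InternalRow>> partitions) throws IOException {
        for (InputPartition<InternalRow> partition : partitions) {
          try (InputPartitionReader<InternalRow> reader = partition.createPartitionReader()) {
            while (reader.next()) {
              InternalRow row = reader.get();
              // Each row wraps a Beam WindowedValue, as produced in get() above.
            }
          }
        }
      }
    }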