Posted to commits@beam.apache.org by ec...@apache.org on 2018/12/11 14:22:16 UTC
[beam] 03/04: Apply spotless
This is an automated email from the ASF dual-hosted git repository.
echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git
commit 8e08c5856adf16e45dbd51cb2284593439d54714
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Fri Dec 7 12:08:51 2018 +0100
Apply spotless
---
.../translation/TranslationContext.java | 7 --
.../batch/ReadSourceTranslatorBatch.java | 4 -
.../translation/io/DatasetSource.java | 109 +++++++++++++--------
3 files changed, 68 insertions(+), 52 deletions(-)
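For context: Beam's Gradle build enforces google-java-format through the
Spotless plugin, so a run of ./gradlew spotlessApply produces exactly the
mechanical churn in the diffs below: annotations moved onto their own lines,
long signatures and ternaries reflowed, unused imports dropped (which is all
that changes in TranslationContext), and stray blank lines trimmed. A minimal
before/after illustration of the dominant rewrite, mirroring the
getStartOffset hunk in DatasetSource.java further down:

    // Before spotless: annotation and signature share a line.
    @Override public Offset getStartOffset() {
      return null;
    }

    // After spotless (google-java-format): annotation on its own line.
    @Override
    public Offset getStartOffset() {
      return null;
    }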
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
index 52ed11f..0f2493d 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
@@ -33,18 +33,11 @@ import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.ForeachWriter;
-import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.execution.datasources.DataSource;
-import org.apache.spark.sql.execution.streaming.Source;
-import org.apache.spark.sql.sources.v2.DataSourceOptions;
-import org.apache.spark.sql.sources.v2.ReadSupport;
-import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
import org.apache.spark.sql.streaming.StreamingQueryException;
/**
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
index 05dc374..63f2fdf 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
@@ -19,14 +19,12 @@ package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
import java.io.IOException;
import org.apache.beam.runners.core.construction.ReadTranslation;
-import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator;
import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
import org.apache.beam.runners.spark.structuredstreaming.translation.io.DatasetSource;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.spark.sql.Dataset;
@@ -57,6 +55,4 @@ class ReadSourceTranslatorBatch<T>
context.putDataset(output, dataset);
}
-
-
}
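The hunk above only shows the tail of the translator body: the translator
obtains a Dataset backed by DatasetSource (defined in the next file) and
registers it under the transform's output PValue so downstream translators can
look it up. A hedged sketch of that pattern; everything here except
putDataset(output, dataset) and getSparkSession(), which do appear in this
commit, is an assumption rather than code from this branch:

    // Sketch of the registration pattern, not code from this commit:
    Dataset<Row> rows =
        context
            .getSparkSession()
            .read()
            .format(DatasetSource.class.getCanonicalName()) // resolve source by class name
            .load();
    // ... convert rows into a Dataset<WindowedValue<T>> named "dataset", then:
    context.putDataset(output, dataset); // register under the output PValue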
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSource.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSource.java
index 60bdab6..f230a70 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSource.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSource.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.beam.runners.spark.structuredstreaming.translation.io;
import static com.google.common.base.Preconditions.checkArgument;
@@ -25,8 +42,8 @@ import org.apache.spark.sql.types.StructType;
/**
* This is a Spark structured streaming {@link DataSourceV2} implementation. As Continuous streaming
- * is tagged experimental in Spark, this class does not implement {@link ContinuousReadSupport}.
- * This class is just a mix-in.
+ * is tagged experimental in Spark, this class does not implement {@link ContinuousReadSupport}. This
+ * class is just a mix-in.
*/
public class DatasetSource<T> implements DataSourceV2, MicroBatchReadSupport {
@@ -41,79 +58,87 @@ public class DatasetSource<T> implements DataSourceV2, MicroBatchReadSupport {
this.numPartitions = context.getSparkSession().sparkContext().defaultParallelism();
checkArgument(this.numPartitions > 0, "Number of partitions must be greater than zero.");
this.bundleSize = context.getOptions().getBundleSize();
-
}
- @Override public MicroBatchReader createMicroBatchReader(Optional<StructType> schema,
- String checkpointLocation, DataSourceOptions options) {
+ @Override
+ public MicroBatchReader createMicroBatchReader(
+ Optional<StructType> schema, String checkpointLocation, DataSourceOptions options) {
return new DatasetMicroBatchReader(schema, checkpointLocation, options);
}
- /**
- * This class can be mapped to Beam {@link BoundedSource}.
- */
+ /** This class can be mapped to Beam {@link BoundedSource}. */
private class DatasetMicroBatchReader implements MicroBatchReader {
private Optional<StructType> schema;
private String checkpointLocation;
private DataSourceOptions options;
- private DatasetMicroBatchReader(Optional<StructType> schema, String checkpointLocation,
- DataSourceOptions options) {
+ private DatasetMicroBatchReader(
+ Optional<StructType> schema, String checkpointLocation, DataSourceOptions options) {
//TODO deal with schema and options
}
- @Override public void setOffsetRange(Optional<Offset> start, Optional<Offset> end) {
+ @Override
+ public void setOffsetRange(Optional<Offset> start, Optional<Offset> end) {
//TODO extension point for SDF
}
- @Override public Offset getStartOffset() {
+ @Override
+ public Offset getStartOffset() {
//TODO extension point for SDF
return null;
}
- @Override public Offset getEndOffset() {
+ @Override
+ public Offset getEndOffset() {
//TODO extension point for SDF
return null;
}
- @Override public Offset deserializeOffset(String json) {
+ @Override
+ public Offset deserializeOffset(String json) {
//TODO extension point for SDF
return null;
}
- @Override public void commit(Offset end) {
+ @Override
+ public void commit(Offset end) {
//TODO no more to read after end Offset
}
- @Override public void stop() {
- }
+ @Override
+ public void stop() {}
- @Override public StructType readSchema() {
+ @Override
+ public StructType readSchema() {
return null;
}
- @Override public List<InputPartition<InternalRow>> planInputPartitions() {
+ @Override
+ public List<InputPartition<InternalRow>> planInputPartitions() {
List<InputPartition<InternalRow>> result = new ArrayList<>();
long desiredSizeBytes;
SparkPipelineOptions options = context.getOptions();
try {
- desiredSizeBytes = (bundleSize == null) ?
- source.getEstimatedSizeBytes(options) / numPartitions :
- bundleSize;
+ desiredSizeBytes =
+ (bundleSize == null)
+ ? source.getEstimatedSizeBytes(options) / numPartitions
+ : bundleSize;
List<? extends BoundedSource<T>> sources = source.split(desiredSizeBytes, options);
for (BoundedSource<T> source : sources) {
- result.add(new InputPartition<InternalRow>() {
-
- @Override public InputPartitionReader<InternalRow> createPartitionReader() {
- BoundedReader<T> reader = null;
- try {
- reader = source.createReader(options);
- } catch (IOException e) {
- }
- return new DatasetMicroBatchPartitionReader(reader);
- }
- });
+ result.add(
+ new InputPartition<InternalRow>() {
+
+ @Override
+ public InputPartitionReader<InternalRow> createPartitionReader() {
+ BoundedReader<T> reader = null;
+ try {
+ reader = source.createReader(options);
+ } catch (IOException e) {
+ }
+ return new DatasetMicroBatchPartitionReader(reader);
+ }
+ });
}
return result;
@@ -122,12 +147,9 @@ public class DatasetSource<T> implements DataSourceV2, MicroBatchReadSupport {
}
return result;
}
-
}
- /**
- * This class can be mapped to Beam {@link BoundedReader}.
- */
+ /** This class can be mapped to Beam {@link BoundedReader}. */
private class DatasetMicroBatchPartitionReader implements InputPartitionReader<InternalRow> {
BoundedReader<T> reader;
@@ -140,7 +162,8 @@ public class DatasetSource<T> implements DataSourceV2, MicroBatchReadSupport {
this.closed = false;
}
- @Override public boolean next() throws IOException {
+ @Override
+ public boolean next() throws IOException {
if (!started) {
started = true;
return reader.start();
@@ -149,13 +172,17 @@ public class DatasetSource<T> implements DataSourceV2, MicroBatchReadSupport {
}
}
- @Override public InternalRow get() {
+ @Override
+ public InternalRow get() {
List<Object> list = new ArrayList<>();
- list.add(WindowedValue.timestampedValueInGlobalWindow(reader.getCurrent(), reader.getCurrentTimestamp()));
+ list.add(
+ WindowedValue.timestampedValueInGlobalWindow(
+ reader.getCurrent(), reader.getCurrentTimestamp()));
return InternalRow.apply(asScalaBuffer(list).toList());
}
- @Override public void close() throws IOException {
+ @Override
+ public void close() throws IOException {
closed = true;
reader.close();
}
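For readers new to the DataSourceV2 API: Spark resolves such a source
reflectively from its fully qualified class name, and because DatasetSource
implements MicroBatchReadSupport it receives the createMicroBatchReader(...)
call, after which the planInputPartitions() / InputPartitionReader lifecycle
shown above (next() covering BoundedReader.start()/advance(), then get(), then
close()) drives the actual read. A minimal wiring sketch, assuming Spark 2.4.x
and a reflectively constructible source; this early revision still pulls its
state from the translation context, so the sketch is illustrative only:

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class DatasetSourceWiringSketch {
      public static void main(String[] args) {
        SparkSession spark =
            SparkSession.builder().master("local[2]").appName("sketch").getOrCreate();

        // Spark looks the class up by name; implementing MicroBatchReadSupport
        // makes it usable as a micro-batch streaming source for readStream().
        Dataset<Row> rows =
            spark
                .readStream()
                .format(
                    "org.apache.beam.runners.spark.structuredstreaming"
                        + ".translation.io.DatasetSource")
                .load();

        // Nothing executes until a sink is attached via rows.writeStream().
        // Each emitted row carries a single WindowedValue, packed in get()
        // above via WindowedValue.timestampedValueInGlobalWindow(...).
      }
    }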