Posted to commits@beam.apache.org by ec...@apache.org on 2019/01/14 13:24:09 UTC

[beam] branch spark-runner_structured-streaming updated (c6618c5 -> af80e19)

This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a change to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git.


 discard c6618c5  First attempt for ParDo primitive implementation
     new bab9027  Fix serialization issues
     new 5c9fcd3  Add SerializationDebugger
     new 43c737b  Fix SerializationDebugger
     new 47c20c2  Add serialization test
     new 002f0b4  Move SourceTest to same package as tested class
     new 3be7f2d  Fix SourceTest
     new 9fad3d4  Simplify beam reader creation as it is created once the source has already been partitioned
     new a7d2328  Put all transform translators Serializable
     new 2acdf67  Enable test mode
     new d5f235d  Enable gradle build scan
     new af80e19  Add flatten test

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (c6618c5)
            \
             N -- N -- N   refs/heads/spark-runner_structured-streaming (af80e19)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 11 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 runners/spark-structured-streaming/build.gradle    |   4 +
 .../spark/structuredstreaming/SparkRunner.java     |   2 +-
 .../translation/TransformTranslator.java           |   3 +-
 .../translation/TranslationContext.java            |  23 +--
 .../translation/batch/DatasetSourceBatch.java      |  80 +++++-----
 .../translation/batch/DoFnFunction.java            | 137 ----------------
 .../translation/batch/ParDoTranslatorBatch.java    | 174 +--------------------
 .../translation/batch/SparkProcessContext.java     | 149 ------------------
 .../batch/functions/SparkNoOpStepContext.java      |  36 -----
 .../batch/functions/SparkSideInputReader.java      |  62 --------
 .../spark/structuredstreaming/SourceTest.java      |  29 ----
 .../translation/batch/FlattenTest.java             |  42 +++++
 .../translation/batch/SourceTest.java              |  79 ++++++++++
 .../utils/SerializationDebugger.java               | 131 ++++++++++++++++
 .../structuredstreaming/utils}/package-info.java   |   4 +-
 15 files changed, 311 insertions(+), 644 deletions(-)
 delete mode 100644 runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnFunction.java
 delete mode 100644 runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/SparkProcessContext.java
 delete mode 100644 runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/functions/SparkNoOpStepContext.java
 delete mode 100644 runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/functions/SparkSideInputReader.java
 delete mode 100644 runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/SourceTest.java
 create mode 100644 runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/FlattenTest.java
 create mode 100644 runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/SourceTest.java
 create mode 100644 runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/utils/SerializationDebugger.java
 copy runners/{spark/src/main/java/org/apache/beam/runners/spark/translation => spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/utils}/package-info.java (86%)


[beam] 07/11: Simplify beam reader creation as it is created once the source has already been partitioned

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 9fad3d438d13326e88f74a08d8370eeed8288935
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Fri Jan 11 11:23:30 2019 +0100

    Simplify beam reader creation as it is created once the source has already been partitioned
---
 .../translation/batch/DatasetSourceBatch.java       | 21 +++++++++------------
 1 file changed, 9 insertions(+), 12 deletions(-)

diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
index c35f62e..d9e1722 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
@@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkArgument;
 import static scala.collection.JavaConversions.asScalaBuffer;
 
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.beam.runners.core.construction.PipelineOptionsSerializationUtils;
@@ -63,7 +64,7 @@ public class DatasetSourceBatch implements DataSourceV2, ReadSupport {
   }
 
   /** This class is mapped to Beam {@link BoundedSource}. */
-  private static class DatasetReader<T> implements DataSourceReader {
+  private static class DatasetReader<T> implements DataSourceReader, Serializable {
 
     private int numPartitions;
     private BoundedSource<T> source;
@@ -135,26 +136,22 @@ public class DatasetSourceBatch implements DataSourceV2, ReadSupport {
     private boolean started;
     private boolean closed;
     private BoundedReader<T> reader;
-    private BoundedSource<T> source;
-    private SerializablePipelineOptions serializablePipelineOptions;
 
     DatasetPartitionReader(BoundedSource<T> source, SerializablePipelineOptions serializablePipelineOptions) {
-      this.source = source;
-      this.serializablePipelineOptions = serializablePipelineOptions;
       this.started = false;
       this.closed = false;
+      // reader is not serializable so create it when the partition reader is constructed
+      try {
+        reader = source
+            .createReader(serializablePipelineOptions.get().as(SparkPipelineOptions.class));
+      } catch (IOException e) {
+        throw new RuntimeException("Error creating BoundedReader ", e);
+      }
     }
 
     @Override
     public boolean next() throws IOException {
       if (!started) {
-        // reader is not serializable so lazy initialize it
-        try {
-          reader = source
-              .createReader(serializablePipelineOptions.get().as(SparkPipelineOptions.class));
-        } catch (IOException e) {
-          throw new RuntimeException("Error creating BoundedReader ", e);
-        }
         started = true;
         return reader.start();
       } else {
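
The hunk above moves reader creation out of next() and into the constructor, so next() no longer needs an initialization branch. A minimal sketch of that shape follows. It is an illustration under assumptions, not the runner's code: EagerPartitionReader is a hypothetical name, and a java.util.Iterator stands in for Beam's BoundedReader.

```java
import java.util.Iterator;
import java.util.List;

/** Hypothetical stand-in for a partition reader; not Beam's or Spark's API. */
final class EagerPartitionReader<T> {
  // Created once in the constructor, so next() needs no "already initialized?" check.
  private final Iterator<T> reader;
  private T current;

  EagerPartitionReader(List<T> partition) {
    this.reader = partition.iterator();
  }

  /** Advances to the next element; returns false when the partition is exhausted. */
  boolean next() {
    if (!reader.hasNext()) {
      return false;
    }
    current = reader.next();
    return true;
  }

  /** Returns the element selected by the last successful call to next(). */
  T get() {
    return current;
  }

  /** Convenience for the demo: sums an Integer partition by draining a reader. */
  static int sum(List<Integer> partition) {
    EagerPartitionReader<Integer> r = new EagerPartitionReader<>(partition);
    int total = 0;
    while (r.next()) {
      total += r.get();
    }
    return total;
  }
}
```

The trade-off in this sketch is that any failure to open the underlying reader surfaces at construction time rather than on the first call to next().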


[beam] 11/11: Add flatten test

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit af80e19c16fd9a32dda0b1006da16cc3dd806c2e
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Mon Jan 14 09:27:41 2019 +0100

    Add flatten test
---
 .../translation/batch/FlattenTest.java             | 42 ++++++++++++++++++++++
 1 file changed, 42 insertions(+)

diff --git a/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/FlattenTest.java b/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/FlattenTest.java
new file mode 100644
index 0000000..ec22d14
--- /dev/null
+++ b/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/FlattenTest.java
@@ -0,0 +1,42 @@
+package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
+
+import java.io.Serializable;
+import org.apache.beam.runners.spark.structuredstreaming.SparkPipelineOptions;
+import org.apache.beam.runners.spark.structuredstreaming.SparkRunner;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Test class for beam to spark flatten translation.
+ */
+@RunWith(JUnit4.class)
+public class FlattenTest implements Serializable {
+  private static Pipeline pipeline;
+
+  @BeforeClass
+  public static void beforeClass(){
+    PipelineOptions options = PipelineOptionsFactory.create().as(SparkPipelineOptions.class);
+    options.setRunner(SparkRunner.class);
+    pipeline = Pipeline.create(options);
+  }
+
+
+  @Test
+  public void testFlatted(){
+    PCollection<Integer> input1 = pipeline.apply(Create.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
+    PCollection<Integer> input2 = pipeline.apply(Create.of(11, 12, 13, 14, 15, 16, 17, 18, 19, 20));
+    PCollectionList<Integer> pcs = PCollectionList.of(input1).and(input2);
+    PCollection<Integer> merged = pcs.apply(Flatten.<Integer>pCollections());
+    pipeline.run();
+  }
+
+}


[beam] 09/11: Enable test mode

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 2acdf672cbbde78e4fb9cd5053b44109c8a07a0f
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Fri Jan 11 12:06:09 2019 +0100

    Enable test mode
---
 .../beam/runners/spark/structuredstreaming/SparkRunner.java   |  2 +-
 .../structuredstreaming/translation/TranslationContext.java   | 11 ++++++++---
 2 files changed, 9 insertions(+), 4 deletions(-)

diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkRunner.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkRunner.java
index 8e0cf25..97aa4d8 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkRunner.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkRunner.java
@@ -114,7 +114,7 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
   public SparkPipelineResult run(final Pipeline pipeline) {
     translationContext = translatePipeline(pipeline);
     //TODO initialise other services: checkpointing, metrics system, listeners, ...
-    translationContext.startPipeline();
+    translationContext.startPipeline(true);
     return new SparkPipelineResult();
   }
 
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
index e40bb85..9a3330a 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
@@ -159,7 +159,7 @@ public class TranslationContext {
   //  Pipeline methods
   // --------------------------------------------------------------------------------------------
 
-  public void startPipeline() {
+  public void startPipeline(boolean testMode) {
     try {
       // to start a pipeline we need a DatastreamWriter to start
       for (Dataset<?> dataset : leaves) {
@@ -167,8 +167,13 @@ public class TranslationContext {
         if (options.isStreaming()) {
           dataset.writeStream().foreach(new NoOpForeachWriter<>()).start().awaitTermination();
         } else {
-          // apply a dummy fn just to apply forech action that will trigger the pipeline run in spark
-          dataset.foreachPartition(t -> {});
+          if (testMode){
+            dataset.show();
+          } else {
+            // apply a dummy fn just to apply a foreach action that will trigger the pipeline run in spark
+            dataset.foreachPartition(t -> {
+            });
+          }
         }
       }
     } catch (StreamingQueryException e) {


[beam] 03/11: Fix SerializationDebugger

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 43c737b2f4b0e22c41a41fb5a0196acce92627d0
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Fri Jan 11 10:17:35 2019 +0100

    Fix SerializationDebugger
---
 .../structuredstreaming/utils/SerializationDebugger.java      | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)

diff --git a/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/utils/SerializationDebugger.java b/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/utils/SerializationDebugger.java
index 0e47969..a0ef7cc 100644
--- a/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/utils/SerializationDebugger.java
+++ b/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/utils/SerializationDebugger.java
@@ -19,6 +19,8 @@
 /** Testing utils for spark structured streaming runner. */
 package org.apache.beam.runners.spark.structuredstreaming.utils;
 
+import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.ObjectOutputStream;
 import java.io.OutputStream;
@@ -28,9 +30,9 @@ import java.util.List;
 
 public class SerializationDebugger {
 
-  public static void testSerialization(Object object) throws IOException {
+  public static void testSerialization(Object object, File to) throws IOException {
     DebuggingObjectOutputStream out =
-        new DebuggingObjectOutputStream();
+        new DebuggingObjectOutputStream(new FileOutputStream(to));
     try {
       out.writeObject(object);
     } catch (Exception e) {
@@ -42,8 +44,6 @@ public class SerializationDebugger {
 
   private static class DebuggingObjectOutputStream extends ObjectOutputStream {
 
-    public DebuggingObjectOutputStream() throws IOException, SecurityException {
-    }
 
     private static final Field DEPTH_FIELD;
 
@@ -56,7 +56,7 @@ public class SerializationDebugger {
       }
     }
 
-    final List<Object> stack = new ArrayList<Object>();
+    final List<Object> stack = new ArrayList<>();
 
     /**
      * Indicates whether or not OOS has tried to
@@ -75,6 +75,7 @@ public class SerializationDebugger {
      * Abuse {@code replaceObject()} as a hook to
      * maintain our stack.
      */
+    @Override
     protected Object replaceObject(Object o) {
       // ObjectOutputStream writes serialization
       // exceptions to the stream. Ignore


[beam] 08/11: Put all transform translators Serializable

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit a7d2328d697319be6d5ebea196cfe9389b5cda0f
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Fri Jan 11 11:24:39 2019 +0100

    Put all transform translators Serializable
---
 .../spark/structuredstreaming/translation/TransformTranslator.java     | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TransformTranslator.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TransformTranslator.java
index f9558c9..097013b 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TransformTranslator.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TransformTranslator.java
@@ -17,10 +17,11 @@
  */
 package org.apache.beam.runners.spark.structuredstreaming.translation;
 
+import java.io.Serializable;
 import org.apache.beam.sdk.transforms.PTransform;
 
 /** Supports translation between a Beam transform, and Spark's operations on Datasets. */
-public interface TransformTranslator<TransformT extends PTransform> {
+public interface TransformTranslator<TransformT extends PTransform> extends Serializable {
 
   /** Base class for translators of {@link PTransform}. */
   void translateTransform(TransformT transform, TranslationContext context);
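
Declaring Serializable on the interface, as the hunk above does, makes every implementation serializable by type, including lambdas that target it. A small sketch of that effect follows, using hypothetical names (DemoTranslator, SerializableTranslatorDemo) that are not part of the runner:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

/** Hypothetical translator interface; extending Serializable covers every implementation. */
interface DemoTranslator extends Serializable {
  String translate(String transformName);
}

final class SerializableTranslatorDemo {
  /** Writes the value with Java serialization and reads it back. */
  @SuppressWarnings("unchecked")
  static <T extends Serializable> T roundTrip(T value) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(value);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (T) in.readObject();
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    } catch (ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }

  /** A lambda targeting DemoTranslator is serializable because the interface is. */
  static DemoTranslator prefixing() {
    return name -> "spark:" + name;
  }
}
```

Note that the marker interface only makes serialization permitted; an implementation that holds a non-serializable field would still fail at write time.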


[beam] 06/11: Fix SourceTest

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 3be7f2db80a88d723302848d35b183c5a3032062
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Fri Jan 11 11:01:51 2019 +0100

    Fix SourceTest
---
 .../translation/batch/SourceTest.java                 | 19 +++++++++----------
 1 file changed, 9 insertions(+), 10 deletions(-)

diff --git a/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/SourceTest.java b/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/SourceTest.java
index 6ef41b8..c3ec2ec 100644
--- a/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/SourceTest.java
+++ b/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/SourceTest.java
@@ -10,7 +10,6 @@ import org.apache.beam.runners.core.construction.PipelineOptionsSerializationUti
 import org.apache.beam.runners.core.serialization.Base64Serializer;
 import org.apache.beam.runners.spark.structuredstreaming.SparkPipelineOptions;
 import org.apache.beam.runners.spark.structuredstreaming.SparkRunner;
-import org.apache.beam.runners.spark.structuredstreaming.translation.batch.DatasetSourceBatch;
 import org.apache.beam.runners.spark.structuredstreaming.utils.SerializationDebugger;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.BoundedSource;
@@ -20,7 +19,7 @@ import org.apache.beam.sdk.transforms.Create;
 import org.apache.spark.sql.sources.v2.DataSourceOptions;
 import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
 import org.junit.BeforeClass;
-import org.junit.Rule;
+import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
@@ -32,8 +31,8 @@ import org.junit.runners.JUnit4;
 @RunWith(JUnit4.class)
 public class SourceTest implements Serializable {
   private static Pipeline pipeline;
-  @Rule
-  public TemporaryFolder temporaryFolder;
+  @ClassRule
+  public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
 
   @BeforeClass
   public static void beforeClass(){
@@ -44,7 +43,6 @@ public class SourceTest implements Serializable {
 
   @Test
   public void testSerialization() throws IOException{
-    Map<String, String> datasetSourceOptions = new HashMap<>();
     BoundedSource<Integer> source = new BoundedSource<Integer>() {
 
       @Override public List<? extends BoundedSource<Integer>> split(long desiredBundleSizeBytes,
@@ -62,13 +60,14 @@ public class SourceTest implements Serializable {
       }
     };
     String serializedSource = Base64Serializer.serializeUnchecked(source);
-    datasetSourceOptions.put("source", serializedSource);
-    datasetSourceOptions.put("defaultParallelism", "4");
-    datasetSourceOptions.put("pipelineOptions",
+    Map<String, String> dataSourceOptions = new HashMap<>();
+    dataSourceOptions.put(DatasetSourceBatch.BEAM_SOURCE_OPTION, serializedSource);
+    dataSourceOptions.put(DatasetSourceBatch.DEFAULT_PARALLELISM, "4");
+    dataSourceOptions.put(DatasetSourceBatch.PIPELINE_OPTIONS,
         PipelineOptionsSerializationUtils.serializeToJson(pipeline.getOptions()));
     DataSourceReader objectToTest = new DatasetSourceBatch()
-        .createReader(new DataSourceOptions(datasetSourceOptions));
-    SerializationDebugger.testSerialization(objectToTest, temporaryFolder.getRoot());
+        .createReader(new DataSourceOptions(dataSourceOptions));
+    SerializationDebugger.testSerialization(objectToTest, TEMPORARY_FOLDER.newFile());
   }
 
   @Test


[beam] 02/11: Add SerializationDebugger

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 5c9fcd34b7b092c1d2c0935aee06ae1fd412ed30
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Fri Jan 11 10:07:23 2019 +0100

    Add SerializationDebugger
---
 .../utils/SerializationDebugger.java               | 130 +++++++++++++++++++++
 .../structuredstreaming/utils/package-info.java    |  20 ++++
 2 files changed, 150 insertions(+)

diff --git a/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/utils/SerializationDebugger.java b/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/utils/SerializationDebugger.java
new file mode 100644
index 0000000..0e47969
--- /dev/null
+++ b/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/utils/SerializationDebugger.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Testing utils for spark structured streaming runner. */
+package org.apache.beam.runners.spark.structuredstreaming.utils;
+
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.io.OutputStream;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.List;
+
+public class SerializationDebugger {
+
+  public static void testSerialization(Object object) throws IOException {
+    DebuggingObjectOutputStream out =
+        new DebuggingObjectOutputStream();
+    try {
+      out.writeObject(object);
+    } catch (Exception e) {
+      throw new RuntimeException(
+          "Serialization error. Path to bad object: "
+              + out.getStack(), e);
+    }
+  }
+
+  private static class DebuggingObjectOutputStream extends ObjectOutputStream {
+
+    public DebuggingObjectOutputStream() throws IOException, SecurityException {
+    }
+
+    private static final Field DEPTH_FIELD;
+
+    static {
+      try {
+        DEPTH_FIELD = ObjectOutputStream.class.getDeclaredField("depth");
+        DEPTH_FIELD.setAccessible(true);
+      } catch (NoSuchFieldException e) {
+        throw new AssertionError(e);
+      }
+    }
+
+    final List<Object> stack = new ArrayList<Object>();
+
+    /**
+     * Indicates whether or not OOS has tried to
+     * write an IOException (presumably as the
+     * result of a serialization error) to the
+     * stream.
+     */
+    boolean broken = false;
+
+    public DebuggingObjectOutputStream(OutputStream out) throws IOException {
+      super(out);
+      enableReplaceObject(true);
+    }
+
+    /**
+     * Abuse {@code replaceObject()} as a hook to
+     * maintain our stack.
+     */
+    protected Object replaceObject(Object o) {
+      // ObjectOutputStream writes serialization
+      // exceptions to the stream. Ignore
+      // everything after that so we don't lose
+      // the path to a non-serializable object. So
+      // long as the user doesn't write an
+      // IOException as the root object, we're OK.
+      int currentDepth = currentDepth();
+      if (o instanceof IOException && currentDepth == 0) {
+        broken = true;
+      }
+      if (!broken) {
+        truncate(currentDepth);
+        stack.add(o);
+      }
+      return o;
+    }
+
+    private void truncate(int depth) {
+      while (stack.size() > depth) {
+        pop();
+      }
+    }
+
+    private Object pop() {
+      return stack.remove(stack.size() - 1);
+    }
+
+    /**
+     * Returns a 0-based depth within the object
+     * graph of the current object being
+     * serialized.
+     */
+    private int currentDepth() {
+      try {
+        Integer oneBased = ((Integer) DEPTH_FIELD.get(this));
+        return oneBased - 1;
+      } catch (IllegalAccessException e) {
+        throw new AssertionError(e);
+      }
+    }
+
+    /**
+     * Returns the path to the last object
+     * serialized. If an exception occurred, this
+     * should be the path to the non-serializable
+     * object.
+     */
+    public List<Object> getStack() {
+      return stack;
+    }
+  }
+}
\ No newline at end of file
diff --git a/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/utils/package-info.java b/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/utils/package-info.java
new file mode 100644
index 0000000..3d7da11
--- /dev/null
+++ b/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/utils/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Testing utils for spark structured streaming runner. */
+package org.apache.beam.runners.spark.structuredstreaming.utils;


[beam] 10/11: Enable gradle build scan

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit d5f235d6e807af871d73251fcd449f6d8a12a490
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Fri Jan 11 16:26:07 2019 +0100

    Enable gradle build scan
---
 runners/spark-structured-streaming/build.gradle | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/runners/spark-structured-streaming/build.gradle b/runners/spark-structured-streaming/build.gradle
index 3154fca..06143b2 100644
--- a/runners/spark-structured-streaming/build.gradle
+++ b/runners/spark-structured-streaming/build.gradle
@@ -45,6 +45,10 @@ configurations.all {
     forcedModules = []
   }
 }
+buildScan {
+  termsOfServiceUrl = 'https://gradle.com/terms-of-service'
+  termsOfServiceAgree = 'yes'
+}
 test {
   systemProperty "spark.ui.enabled", "false"
   systemProperty "spark.ui.showConsoleProgress", "false"


[beam] 01/11: Fix serialization issues

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit bab9027f9fd6d7bf38134518b98d942fc75ec16a
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Thu Jan 10 17:22:13 2019 +0100

    Fix serialization issues
---
 .../translation/batch/DatasetSourceBatch.java      | 83 +++++++++++-----------
 1 file changed, 42 insertions(+), 41 deletions(-)

diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
index 421a3f9..c35f62e 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
@@ -23,8 +23,8 @@ import static scala.collection.JavaConversions.asScalaBuffer;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Optional;
 import org.apache.beam.runners.core.construction.PipelineOptionsSerializationUtils;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
 import org.apache.beam.runners.core.serialization.Base64Serializer;
 import org.apache.beam.runners.spark.structuredstreaming.SparkPipelineOptions;
 import org.apache.beam.sdk.io.BoundedSource;
@@ -47,7 +47,7 @@ import org.apache.spark.sql.types.StructType;
  * This is a spark structured streaming {@link DataSourceV2} implementation. As Continuous streaming
  * is tagged experimental in spark, this class does not implement {@link ContinuousReadSupport}.
  */
-public class DatasetSourceBatch<T> implements DataSourceV2, ReadSupport {
+public class DatasetSourceBatch implements DataSourceV2, ReadSupport {
 
   static final String BEAM_SOURCE_OPTION = "beam-source";
   static final String DEFAULT_PARALLELISM = "default-parallelism";
@@ -59,38 +59,35 @@ public class DatasetSourceBatch<T> implements DataSourceV2, ReadSupport {
   @SuppressWarnings("unchecked")
   @Override
   public DataSourceReader createReader(DataSourceOptions options) {
-    if (!options.get(BEAM_SOURCE_OPTION).isPresent()){
-      throw new RuntimeException("Beam source was not set in DataSource options");
-    }
-    BoundedSource<T> source = Base64Serializer
-        .deserializeUnchecked(options.get(BEAM_SOURCE_OPTION).get(), BoundedSource.class);
-
-    if (!options.get(DEFAULT_PARALLELISM).isPresent()){
-      throw new RuntimeException("Spark default parallelism was not set in DataSource options");
-    }
-    int numPartitions = Integer.parseInt(options.get(DEFAULT_PARALLELISM).get());
-    checkArgument(numPartitions > 0, "Number of partitions must be greater than zero.");
-
-    if (!options.get(PIPELINE_OPTIONS).isPresent()){
-      throw new RuntimeException("Beam pipelineOptions were not set in DataSource options");
-    }
-    SparkPipelineOptions sparkPipelineOptions = PipelineOptionsSerializationUtils
-        .deserializeFromJson(options.get(PIPELINE_OPTIONS).get()).as(SparkPipelineOptions.class);
-    return new DatasetReader(numPartitions, source, sparkPipelineOptions);
+    return new DatasetReader<>(options);
   }
 
-  /** This class can be mapped to Beam {@link BoundedSource}. */
-  private class DatasetReader implements DataSourceReader {
+  /** This class is mapped to Beam {@link BoundedSource}. */
+  private static class DatasetReader<T> implements DataSourceReader {
 
     private int numPartitions;
     private BoundedSource<T> source;
-    private SparkPipelineOptions sparkPipelineOptions;
+    private SerializablePipelineOptions serializablePipelineOptions;
 
-    private DatasetReader(int numPartitions, BoundedSource<T> source,
-        SparkPipelineOptions sparkPipelineOptions) {
-      this.numPartitions = numPartitions;
-      this.source = source;
-      this.sparkPipelineOptions = sparkPipelineOptions;
+    private DatasetReader(DataSourceOptions options) {
+      if (!options.get(BEAM_SOURCE_OPTION).isPresent()){
+        throw new RuntimeException("Beam source was not set in DataSource options");
+      }
+      this.source = Base64Serializer
+          .deserializeUnchecked(options.get(BEAM_SOURCE_OPTION).get(), BoundedSource.class);
+
+      if (!options.get(DEFAULT_PARALLELISM).isPresent()){
+        throw new RuntimeException("Spark default parallelism was not set in DataSource options");
+      }
+      this.numPartitions = Integer.parseInt(options.get(DEFAULT_PARALLELISM).get());
+      checkArgument(numPartitions > 0, "Number of partitions must be greater than zero.");
+
+      if (!options.get(PIPELINE_OPTIONS).isPresent()){
+        throw new RuntimeException("Beam pipelineOptions were not set in DataSource options");
+      }
+      SparkPipelineOptions sparkPipelineOptions = PipelineOptionsSerializationUtils
+          .deserializeFromJson(options.get(PIPELINE_OPTIONS).get()).as(SparkPipelineOptions.class);
+      this.serializablePipelineOptions = new SerializablePipelineOptions(sparkPipelineOptions);
     }
 
     @Override
@@ -104,6 +101,8 @@ public class DatasetSourceBatch<T> implements DataSourceV2, ReadSupport {
 
     @Override
     public List<InputPartition<InternalRow>> planInputPartitions() {
+      SparkPipelineOptions sparkPipelineOptions = serializablePipelineOptions.get()
+          .as(SparkPipelineOptions.class);
       List<InputPartition<InternalRow>> result = new ArrayList<>();
       long desiredSizeBytes;
       try {
@@ -118,14 +117,7 @@ public class DatasetSourceBatch<T> implements DataSourceV2, ReadSupport {
 
                 @Override
                 public InputPartitionReader<InternalRow> createPartitionReader() {
-                  BoundedReader<T> reader = null;
-                  try {
-                    reader = split.createReader(sparkPipelineOptions);
-                  } catch (IOException e) {
-                    throw new RuntimeException(
-                        "Error creating BoundedReader " + reader.getClass().getCanonicalName(), e);
-                  }
-                  return new DatasetPartitionReader(reader);
+                  return new DatasetPartitionReader<>(source, serializablePipelineOptions);
                 }
               });
         }
@@ -139,14 +131,16 @@ public class DatasetSourceBatch<T> implements DataSourceV2, ReadSupport {
   }
 
   /** This class can be mapped to Beam {@link BoundedReader}. */
-  private class DatasetPartitionReader implements InputPartitionReader<InternalRow> {
-
-    BoundedReader<T> reader;
+  private static class DatasetPartitionReader<T> implements InputPartitionReader<InternalRow> {
     private boolean started;
     private boolean closed;
+    private BoundedReader<T> reader;
+    private BoundedSource<T> source;
+    private SerializablePipelineOptions serializablePipelineOptions;
 
-    DatasetPartitionReader(BoundedReader<T> reader) {
-      this.reader = reader;
+    DatasetPartitionReader(BoundedSource<T> source, SerializablePipelineOptions serializablePipelineOptions) {
+      this.source = source;
+      this.serializablePipelineOptions = serializablePipelineOptions;
       this.started = false;
       this.closed = false;
     }
@@ -154,6 +148,13 @@ public class DatasetSourceBatch<T> implements DataSourceV2, ReadSupport {
     @Override
     public boolean next() throws IOException {
       if (!started) {
+        // reader is not serializable so lazy initialize it
+        try {
+          reader = source
+              .createReader(serializablePipelineOptions.get().as(SparkPipelineOptions.class));
+        } catch (IOException e) {
+          throw new RuntimeException("Error creating BoundedReader ", e);
+        }
         started = true;
         return reader.start();
       } else {
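
The change above stops building the BoundedReader while partitions are planned; it keeps the serializable source and options and creates the reader on the first call to next(). A minimal, self-contained sketch of that lazy-initialization pattern follows. It uses only the JDK; LazyReaderSketch, ListSource and PartitionReader are hypothetical stand-ins rather than Beam or Spark classes, and marking the reader field transient is a choice made for this sketch (the commit itself leaves the field non-transient and simply null until next() runs).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/** Hypothetical demo of lazy reader creation; not code from the Beam repository. */
class LazyReaderSketch {

  /** Serializable description of the data, standing in for a BoundedSource. */
  static class ListSource implements Serializable {
    private final List<Integer> values;

    ListSource(List<Integer> values) {
      this.values = new ArrayList<>(values);
    }

    /** The returned iterator plays the role of the non-serializable reader. */
    Iterator<Integer> createReader() {
      return values.iterator();
    }
  }

  /** Holds only serializable state until the first call to next(). */
  static class PartitionReader implements Serializable {
    private final ListSource source;
    // Assumption of this sketch: transient, so the reader is never written out.
    private transient Iterator<Integer> reader;
    private Integer current;

    PartitionReader(ListSource source) {
      this.source = source;
    }

    boolean next() {
      if (reader == null) {
        // The reader is not serializable, so it is created lazily after shipping.
        reader = source.createReader();
      }
      if (!reader.hasNext()) {
        return false;
      }
      current = reader.next();
      return true;
    }

    Integer get() {
      return current;
    }
  }

  /** Serializes and deserializes an object in memory, as a cluster would do over the wire. */
  @SuppressWarnings("unchecked")
  static <T> T roundTrip(T obj) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(obj);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (T) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new RuntimeException("Round trip failed", e);
    }
  }

  /** Ships a fresh reader through a round trip, then reads every element and sums them. */
  static int sumAfterRoundTrip() {
    PartitionReader shipped =
        roundTrip(new PartitionReader(new ListSource(Arrays.asList(1, 2, 3))));
    int sum = 0;
    while (shipped.next()) {
      sum += shipped.get();
    }
    return sum;
  }

  public static void main(String[] args) {
    System.out.println(sumAfterRoundTrip()); // prints 6
  }
}
```

The point of the design is that everything crossing the serialization boundary is plain data; the live reader exists only on the side that actually iterates.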


[beam] 04/11: Add serialization test

Posted by ec...@apache.org.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 47c20c27b261afcd7da9b807b040c78bd7db2495
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Fri Jan 11 10:39:10 2019 +0100

    Add serialization test
---
 .../spark/structuredstreaming/SourceTest.java      | 51 +++++++++++++++++++++-
 1 file changed, 50 insertions(+), 1 deletion(-)

diff --git a/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/SourceTest.java b/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/SourceTest.java
index 8263718..c348ed5 100644
--- a/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/SourceTest.java
+++ b/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/SourceTest.java
@@ -1,17 +1,37 @@
 package org.apache.beam.runners.spark.structuredstreaming;
 
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.runners.core.construction.PipelineOptionsSerializationUtils;
+import org.apache.beam.runners.core.serialization.Base64Serializer;
+import org.apache.beam.runners.spark.structuredstreaming.translation.batch.DatasetSourceBatch;
+import org.apache.beam.runners.spark.structuredstreaming.utils.SerializationDebugger;
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
+import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
 import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
 
 /**
  * Test class for beam to spark source translation.
  */
-public class SourceTest {
+@RunWith(JUnit4.class)
+public class SourceTest implements Serializable {
   private static Pipeline pipeline;
+  @Rule
+  public TemporaryFolder temporaryFolder;
 
   @BeforeClass
   public static void beforeClass(){
@@ -21,6 +41,35 @@ public class SourceTest {
   }
 
   @Test
+  public void testSerialization() throws IOException{
+    Map<String, String> datasetSourceOptions = new HashMap<>();
+    BoundedSource<Integer> source = new BoundedSource<Integer>() {
+
+      @Override public List<? extends BoundedSource<Integer>> split(long desiredBundleSizeBytes,
+          PipelineOptions options) throws Exception {
+        return new ArrayList<>();
+      }
+
+      @Override public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
+        return 0;
+      }
+
+      @Override public BoundedReader<Integer> createReader(PipelineOptions options)
+          throws IOException {
+        return null;
+      }
+    };
+    String serializedSource = Base64Serializer.serializeUnchecked(source);
+    datasetSourceOptions.put("source", serializedSource);
+    datasetSourceOptions.put("defaultParallelism", "4");
+    datasetSourceOptions.put("pipelineOptions",
+        PipelineOptionsSerializationUtils.serializeToJson(pipeline.getOptions()));
+    DataSourceReader objectToTest = new DatasetSourceBatch()
+        .createReader(new DataSourceOptions(datasetSourceOptions));
+    SerializationDebugger.testSerialization(objectToTest, temporaryFolder.getRoot());
+  }
+
+  @Test
   public void testBoundedSource(){
     pipeline.apply(Create.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
     pipeline.run();
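
The test added above hands the DataSourceReader to SerializationDebugger.testSerialization, whose implementation is not shown in this email. As a rough illustration of what such a check can catch, the JDK-only sketch below attempts to serialize two objects in memory. All names in it (NestedClassSerializationSketch, Factory, InnerReader, StaticReader, isSerializable) are hypothetical. It also shows one likely reason the first commit in this series turned DatasetReader and DatasetPartitionReader into static nested classes: a non-static inner class that uses its enclosing instance drags that instance into the serialized graph.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Hypothetical demo; none of these names come from Beam or Spark. */
class NestedClassSerializationSketch {

  /** Stands in for a factory object that is NOT Serializable. */
  static class Factory {
    int defaultParallelism = 4;

    /** Inner class: reads a field of the enclosing Factory, so it keeps a reference to it. */
    class InnerReader implements Serializable {
      int numPartitions = defaultParallelism;
    }

    /** Static nested class: no enclosing instance, only its own fields are written. */
    static class StaticReader implements Serializable {
      int numPartitions;

      StaticReader(int numPartitions) {
        this.numPartitions = numPartitions;
      }
    }

    InnerReader newInner() {
      return new InnerReader();
    }

    StaticReader newStatic() {
      return new StaticReader(defaultParallelism);
    }
  }

  /** Returns true if Java serialization of the object graph succeeds. */
  static boolean isSerializable(Object o) {
    try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
      out.writeObject(o);
      return true;
    } catch (NotSerializableException e) {
      // Some object reachable from o does not implement Serializable.
      return false;
    } catch (IOException e) {
      throw new RuntimeException("Unexpected I/O failure", e);
    }
  }

  public static void main(String[] args) {
    Factory factory = new Factory();
    System.out.println(isSerializable(factory.newInner()));  // prints false
    System.out.println(isSerializable(factory.newStatic())); // prints true
  }
}
```

A check of this kind fails fast in a unit test instead of surfacing later as a NotSerializableException when the object is shipped to an executor.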


[beam] 05/11: Move SourceTest to same package as tested class

Posted by ec...@apache.org.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 002f0b4913f13d45792ada45145f486059d81b28
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Fri Jan 11 10:53:12 2019 +0100

    Move SourceTest to same package as tested class
---
 .../spark/structuredstreaming/{ => translation/batch}/SourceTest.java | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/SourceTest.java b/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/SourceTest.java
similarity index 92%
rename from runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/SourceTest.java
rename to runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/SourceTest.java
index c348ed5..6ef41b8 100644
--- a/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/SourceTest.java
+++ b/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/SourceTest.java
@@ -1,4 +1,4 @@
-package org.apache.beam.runners.spark.structuredstreaming;
+package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -8,6 +8,8 @@ import java.util.List;
 import java.util.Map;
 import org.apache.beam.runners.core.construction.PipelineOptionsSerializationUtils;
 import org.apache.beam.runners.core.serialization.Base64Serializer;
+import org.apache.beam.runners.spark.structuredstreaming.SparkPipelineOptions;
+import org.apache.beam.runners.spark.structuredstreaming.SparkRunner;
 import org.apache.beam.runners.spark.structuredstreaming.translation.batch.DatasetSourceBatch;
 import org.apache.beam.runners.spark.structuredstreaming.utils.SerializationDebugger;
 import org.apache.beam.sdk.Pipeline;