Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/08/02 20:34:52 UTC

[GitHub] [beam] ibzib commented on a change in pull request #15218: [BEAM-7093] Migrate spark structured streaming runner to spark 3

ibzib commented on a change in pull request #15218:
URL: https://github.com/apache/beam/pull/15218#discussion_r681244483



##########
File path: runners/spark/2/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
##########
@@ -17,246 +17,18 @@
  */
 package org.apache.beam.runners.spark.structuredstreaming.translation;
 
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
-import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
-import org.apache.beam.runners.core.construction.TransformInputs;
 import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingPipelineOptions;
-import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.VoidCoder;
-import org.apache.beam.sdk.runners.AppliedPTransform;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.function.ForeachFunction;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.ForeachWriter;
-import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.streaming.DataStreamWriter;
-import org.apache.spark.sql.streaming.StreamingQueryException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-/**
- * Base class that gives a context for {@link PTransform} translation: keeping track of the
- * datasets, the {@link SparkSession}, the current transform being translated.
- */
-@SuppressWarnings({
-  "rawtypes", // TODO(https://issues.apache.org/jira/browse/BEAM-10556)
-  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
-})
-public class TranslationContext {
-
-  private static final Logger LOG = LoggerFactory.getLogger(TranslationContext.class);
-
-  /** All the datasets of the DAG. */
-  private final Map<PValue, Dataset<?>> datasets;
-  /** datasets that are not used as input to other datasets (leaves of the DAG). */
-  private final Set<Dataset<?>> leaves;
-
-  private final SerializablePipelineOptions serializablePipelineOptions;
-
-  @SuppressFBWarnings("URF_UNREAD_FIELD") // make spotbugs happy
-  private AppliedPTransform<?, ?, ?> currentTransform;
-
-  @SuppressFBWarnings("URF_UNREAD_FIELD") // make spotbugs happy
-  private final SparkSession sparkSession;
-
-  private final Map<PCollectionView<?>, Dataset<?>> broadcastDataSets;
+/** Subclass of {@link AbstractTranslationContext} that address spark breaking changes. */
+public class TranslationContext extends AbstractTranslationContext {
 
   public TranslationContext(SparkStructuredStreamingPipelineOptions options) {
-    SparkConf sparkConf = new SparkConf();
-    sparkConf.setMaster(options.getSparkMaster());
-    sparkConf.setAppName(options.getAppName());
-    if (options.getFilesToStage() != null && !options.getFilesToStage().isEmpty()) {
-      sparkConf.setJars(options.getFilesToStage().toArray(new String[0]));
-    }
-
-    // By default, Spark defines 200 as a number of sql partitions. This seems too much for local
-    // mode, so try to align with value of "sparkMaster" option in this case.
-    // We should not overwrite this value (or any user-defined spark configuration value) if the
-    // user has already configured it.
-    String sparkMaster = options.getSparkMaster();
-    if (sparkMaster != null
-        && sparkMaster.startsWith("local[")
-        && System.getProperty("spark.sql.shuffle.partitions") == null) {
-      int numPartitions =
-          Integer.parseInt(sparkMaster.substring("local[".length(), sparkMaster.length() - 1));
-      if (numPartitions > 0) {
-        sparkConf.set("spark.sql.shuffle.partitions", String.valueOf(numPartitions));
-      }
-    }
-
-    this.sparkSession = SparkSession.builder().config(sparkConf).getOrCreate();
-    this.serializablePipelineOptions = new SerializablePipelineOptions(options);
-    this.datasets = new HashMap<>();
-    this.leaves = new HashSet<>();
-    this.broadcastDataSets = new HashMap<>();
-  }
-
-  public SparkSession getSparkSession() {
-    return sparkSession;
-  }
-
-  public SerializablePipelineOptions getSerializableOptions() {
-    return serializablePipelineOptions;
-  }
-
-  // --------------------------------------------------------------------------------------------
-  //  Transforms methods
-  // --------------------------------------------------------------------------------------------
-  public void setCurrentTransform(AppliedPTransform<?, ?, ?> currentTransform) {
-    this.currentTransform = currentTransform;
-  }
-
-  public AppliedPTransform<?, ?, ?> getCurrentTransform() {
-    return currentTransform;
-  }
-
-  // --------------------------------------------------------------------------------------------
-  //  Datasets methods
-  // --------------------------------------------------------------------------------------------
-  @SuppressWarnings("unchecked")
-  public <T> Dataset<T> emptyDataset() {
-    return (Dataset<T>) sparkSession.emptyDataset(EncoderHelpers.fromBeamCoder(VoidCoder.of()));
-  }
-
-  @SuppressWarnings("unchecked")
-  public <T> Dataset<WindowedValue<T>> getDataset(PValue value) {
-    Dataset<?> dataset = datasets.get(value);
-    // assume that the Dataset is used as an input if retrieved here. So it is not a leaf anymore
-    leaves.remove(dataset);
-    return (Dataset<WindowedValue<T>>) dataset;
+    super(options);
   }
 
-  /**
-   * TODO: All these 3 methods (putDataset*) are temporary and they are used only for generics type
-   * checking. We should unify them in the future.
-   */
-  public void putDatasetWildcard(PValue value, Dataset<WindowedValue<?>> dataset) {
-    if (!datasets.containsKey(value)) {
-      datasets.put(value, dataset);
-      leaves.add(dataset);
-    }
-  }
-
-  public <T> void putDataset(PValue value, Dataset<WindowedValue<T>> dataset) {
-    if (!datasets.containsKey(value)) {
-      datasets.put(value, dataset);
-      leaves.add(dataset);
-    }
-  }
-
-  public <ViewT, ElemT> void setSideInputDataset(
-      PCollectionView<ViewT> value, Dataset<WindowedValue<ElemT>> set) {
-    if (!broadcastDataSets.containsKey(value)) {
-      broadcastDataSets.put(value, set);
-    }
-  }
-
-  @SuppressWarnings("unchecked")
-  public <T> Dataset<T> getSideInputDataSet(PCollectionView<?> value) {
-    return (Dataset<T>) broadcastDataSets.get(value);
-  }
-
-  // --------------------------------------------------------------------------------------------
-  //  PCollections methods
-  // --------------------------------------------------------------------------------------------
-  public PValue getInput() {
-    return Iterables.getOnlyElement(TransformInputs.nonAdditionalInputs(currentTransform));
-  }
-
-  public Map<TupleTag<?>, PCollection<?>> getInputs() {
-    return currentTransform.getInputs();
-  }
-
-  public PValue getOutput() {
-    return Iterables.getOnlyElement(currentTransform.getOutputs().values());
-  }
-
-  public Map<TupleTag<?>, PCollection<?>> getOutputs() {
-    return currentTransform.getOutputs();
-  }
-
-  @SuppressWarnings("unchecked")
-  public Map<TupleTag<?>, Coder<?>> getOutputCoders() {
-    return currentTransform.getOutputs().entrySet().stream()
-        .filter(e -> e.getValue() instanceof PCollection)
-        .collect(Collectors.toMap(Map.Entry::getKey, e -> ((PCollection) e.getValue()).getCoder()));
-  }
-
-  // --------------------------------------------------------------------------------------------
-  //  Pipeline methods
-  // --------------------------------------------------------------------------------------------
-
-  /** Starts the pipeline. */
-  public void startPipeline() {
-    try {
-      SparkStructuredStreamingPipelineOptions options =
-          serializablePipelineOptions.get().as(SparkStructuredStreamingPipelineOptions.class);
-      int datasetIndex = 0;
-      for (Dataset<?> dataset : leaves) {
-        if (options.isStreaming()) {
-          // TODO: deal with Beam Discarding, Accumulating and Accumulating & Retracting	outputmodes
-          // with DatastreamWriter.outputMode
-          DataStreamWriter<?> dataStreamWriter = dataset.writeStream();
-          // spark sets a default checkpoint dir if not set.
-          if (options.getCheckpointDir() != null) {
-            dataStreamWriter =
-                dataStreamWriter.option("checkpointLocation", options.getCheckpointDir());
-          }
-          // TODO: Do not await termination here.
-          dataStreamWriter.foreach(new NoOpForeachWriter<>()).start().awaitTermination();
-        } else {
-          if (options.getTestMode()) {
-            LOG.debug("**** dataset {} catalyst execution plans ****", ++datasetIndex);
-            dataset.explain(true);
-          }
-          // apply a dummy fn just to apply foreach action that will trigger the pipeline run in
-          // spark
-          dataset.foreach((ForeachFunction) t -> {});
-        }
-      }
-    } catch (StreamingQueryException e) {
-      throw new RuntimeException("Pipeline execution failed: " + e);
-    }
-  }
-
-  public static void printDatasetContent(Dataset<WindowedValue> dataset) {
-    // cannot use dataset.show because dataset schema is binary so it will print binary
-    // code.
-    List<WindowedValue> windowedValues = dataset.collectAsList();
-    for (WindowedValue windowedValue : windowedValues) {
-      LOG.debug("**** dataset content {} ****", windowedValue.toString());
-    }
-  }
-
-  private static class NoOpForeachWriter<T> extends ForeachWriter<T> {
-
-    @Override
-    public boolean open(long partitionId, long epochId) {
-      return false;
-    }
-
-    @Override
-    public void process(T value) {
-      // do nothing
-    }
-
-    @Override
-    public void close(Throwable errorOrNull) {
-      // do nothing
-    }
+  @Override
+  public void launchStreaming(DataStreamWriter<?> dataStreamWriter) {
+    dataStreamWriter.start();

Review comment:
       As far as I can tell, the only difference between the Spark 2 and 3 translation contexts is the try/catch block. What happens if we catch the TimeoutException in Spark 2?
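       For reference, a minimal side-by-side sketch of the two overrides under discussion. If I read the
       APIs correctly, the Spark 3 `DataStreamWriter.start()` declares a checked
       `java.util.concurrent.TimeoutException` while the Spark 2 signature does not, which is presumably
       why the try/catch cannot simply be hoisted into `AbstractTranslationContext` without a compile
       error in one of the two modules:

       ```java
       // Spark 2 module: start() declares no checked exception, so there is nothing to catch.
       @Override
       public void launchStreaming(DataStreamWriter<?> dataStreamWriter) {
         dataStreamWriter.start();
       }

       // Spark 3 module: start() may throw java.util.concurrent.TimeoutException,
       // which this override wraps in a RuntimeException.
       @Override
       public void launchStreaming(DataStreamWriter<?> dataStreamWriter) {
         try {
           dataStreamWriter.start();
         } catch (TimeoutException e) {
           throw new RuntimeException("A timeout occurred when running the streaming pipeline", e);
         }
       }
       ```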

##########
File path: runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/DatasetSourceStreaming.java
##########
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.structuredstreaming.translation.streaming;
+
+/**
+ * Empty impl needed for compilation. Spark DataSourceV2 API was removed in Spark3. Need to code a

Review comment:
       Are we leaving this as a TODO for now? Is there a JIRA for it?

##########
File path: runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
##########
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
+
+import static org.apache.beam.runners.spark.structuredstreaming.Constants.BEAM_SOURCE_OPTION;
+import static org.apache.beam.runners.spark.structuredstreaming.Constants.DEFAULT_PARALLELISM;
+import static org.apache.beam.runners.spark.structuredstreaming.Constants.PIPELINE_OPTIONS;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
+import org.apache.beam.runners.core.serialization.Base64Serializer;
+import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.RowHelpers;
+import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.SchemaHelpers;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.parquet.Strings;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.SupportsRead;
+import org.apache.spark.sql.connector.catalog.Table;
+import org.apache.spark.sql.connector.catalog.TableCapability;
+import org.apache.spark.sql.connector.catalog.TableProvider;
+import org.apache.spark.sql.connector.expressions.Transform;
+import org.apache.spark.sql.connector.read.Batch;
+import org.apache.spark.sql.connector.read.InputPartition;
+import org.apache.spark.sql.connector.read.PartitionReader;
+import org.apache.spark.sql.connector.read.PartitionReaderFactory;
+import org.apache.spark.sql.connector.read.Scan;
+import org.apache.spark.sql.connector.read.ScanBuilder;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+/**
+ * Empty impl needed for compilation. Spark DataSourceV2 API was removed in Spark3. Need to code a

Review comment:
       This class is no longer empty; please update the Javadoc.
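       One possible rewording, sketched only from what this hunk shows (a table provider exposing a Beam
       bounded source for batch reads via the Spark 3 connector API); the exact wording is up to you:

       ```java
       /**
        * A Spark {@link TableProvider} that exposes a Beam {@link BoundedSource} as a Spark batch
        * {@link Table}, implemented against the new org.apache.spark.sql.connector API that replaced
        * the Spark 2 DataSourceV2 interfaces.
        */
       ```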

##########
File path: runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
##########
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
+
+import static org.apache.beam.runners.spark.structuredstreaming.Constants.BEAM_SOURCE_OPTION;
+import static org.apache.beam.runners.spark.structuredstreaming.Constants.DEFAULT_PARALLELISM;
+import static org.apache.beam.runners.spark.structuredstreaming.Constants.PIPELINE_OPTIONS;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
+import org.apache.beam.runners.core.serialization.Base64Serializer;
+import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.RowHelpers;
+import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.SchemaHelpers;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.parquet.Strings;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.SupportsRead;
+import org.apache.spark.sql.connector.catalog.Table;
+import org.apache.spark.sql.connector.catalog.TableCapability;
+import org.apache.spark.sql.connector.catalog.TableProvider;
+import org.apache.spark.sql.connector.expressions.Transform;
+import org.apache.spark.sql.connector.read.Batch;
+import org.apache.spark.sql.connector.read.InputPartition;
+import org.apache.spark.sql.connector.read.PartitionReader;
+import org.apache.spark.sql.connector.read.PartitionReaderFactory;
+import org.apache.spark.sql.connector.read.Scan;
+import org.apache.spark.sql.connector.read.ScanBuilder;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+/**
+ * Empty impl needed for compilation. Spark DataSourceV2 API was removed in Spark3. Need to code a
+ * Beam source using new spark 3 API.
+ */
+public class DatasetSourceBatch implements TableProvider {
+
+  private static final StructType BINARY_SCHEMA = SchemaHelpers.binarySchema();
+
+  public DatasetSourceBatch() {}
+
+  @Override
+  public StructType inferSchema(CaseInsensitiveStringMap options) {
+    return BINARY_SCHEMA;
+  }
+
+  @Override
+  public boolean supportsExternalMetadata() {
+    return true;
+  }
+
+  @Override
+  public Table getTable(
+      StructType schema, Transform[] partitioning, Map<String, String> properties) {
+    return new DatasetSourceBatchTable();
+  }
+
+  private static class DatasetSourceBatchTable implements SupportsRead {
+
+    @Override
+    public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
+      return new ScanBuilder() {
+
+        @Override
+        public Scan build() {
+          return new Scan() { // scan for Batch reading
+
+            @Override
+            public StructType readSchema() {
+              return BINARY_SCHEMA;
+            }
+
+            @Override
+            public Batch toBatch() {
+              return new BeamBatch<>(options);
+            }
+          };
+        }
+      };
+    }
+
+    @Override
+    public String name() {
+      return "BeamSource";
+    }
+
+    @Override
+    public StructType schema() {
+      return BINARY_SCHEMA;
+    }
+
+    @Override
+    public Set<TableCapability> capabilities() {
+      final HashSet<TableCapability> capabilities = new HashSet<>();

Review comment:
       Nit: can we make this an ImmutableSet and statically initialize it?
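       A possible shape for that nit, sketched under the assumption that the vendored Guava
       `ImmutableSet` is available in this module and that `BATCH_READ` is the only capability this
       source needs (the actual contents of the set are cut off in this hunk):

       ```java
       // Assumed import: org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet
       // Statically initialized, immutable capability set for this table.
       private static final Set<TableCapability> CAPABILITIES =
           ImmutableSet.of(TableCapability.BATCH_READ);

       @Override
       public Set<TableCapability> capabilities() {
         return CAPABILITIES;
       }
       ```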

##########
File path: runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.structuredstreaming.translation;
+
+import java.util.concurrent.TimeoutException;
+import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingPipelineOptions;
+import org.apache.spark.sql.streaming.DataStreamWriter;
+
+/**
+ * Subclass of {@link
+ * org.apache.beam.runners.spark.structuredstreaming.translation.AbstractTranslationContext} that
+ * address spark breaking changes.
+ */
+public class TranslationContext extends AbstractTranslationContext {
+
+  public TranslationContext(SparkStructuredStreamingPipelineOptions options) {
+    super(options);
+  }
+
+  @Override
+  public void launchStreaming(DataStreamWriter<?> dataStreamWriter) {
+    try {
+      dataStreamWriter.start();
+    } catch (TimeoutException e) {
+      throw new RuntimeException("A timeout occurrend when running the streaming pipeline", e);

Review comment:
       ```suggestion
         throw new RuntimeException("A timeout occurred when running the streaming pipeline", e);
       ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org