You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/07/18 07:53:33 UTC

[GitHub] [beam] echauchot commented on a diff in pull request #22157: Fixes #22156: Fix Spark3 runner to compile against Spark 3.2/3.3

echauchot commented on code in PR #22157:
URL: https://github.com/apache/beam/pull/22157#discussion_r916864607


##########
runners/spark/3/build.gradle:
##########
@@ -28,3 +28,37 @@ project.ext {
 
 // Load the main build script which contains all build logic.
 apply from: "$basePath/spark_runner.gradle"
+

Review Comment:
   I like this encapsulation and dynamic testing style



##########
runners/spark/2/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderFactory.java:
##########
@@ -17,38 +17,35 @@
  */
 package org.apache.beam.runners.spark.structuredstreaming.translation.helpers;
 
-import static org.apache.spark.sql.types.DataTypes.BinaryType;
-
-import java.util.Collections;
-import java.util.List;
-import org.apache.beam.sdk.coders.Coder;
 import org.apache.spark.sql.Encoder;
-import org.apache.spark.sql.catalyst.analysis.GetColumnByOrdinal;
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
-import org.apache.spark.sql.catalyst.expressions.BoundReference;
-import org.apache.spark.sql.catalyst.expressions.Cast;
 import org.apache.spark.sql.catalyst.expressions.Expression;
-import org.apache.spark.sql.types.ObjectType;
-import scala.collection.JavaConversions;
-import scala.reflect.ClassTag;
+import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke;
+import org.apache.spark.sql.types.DataType;
+import scala.collection.Seq;
+import scala.collection.immutable.List;
+import scala.collection.immutable.Nil$;
+import scala.collection.mutable.WrappedArray;
 import scala.reflect.ClassTag$;
 
 public class EncoderFactory {
 
-  public static <T> Encoder<T> fromBeamCoder(Coder<T> coder) {
-    Class<? super T> clazz = coder.getEncodedTypeDescriptor().getRawType();
-    ClassTag<T> classTag = ClassTag$.MODULE$.apply(clazz);
-    List<Expression> serializers =
-        Collections.singletonList(
-            new EncoderHelpers.EncodeUsingBeamCoder<>(
-                new BoundReference(0, new ObjectType(clazz), true), coder));
-
+  static <T> Encoder<T> create(
+      Expression serializer, Expression deserializer, Class<? super T> clazz) {
+    List<Expression> serializers = Nil$.MODULE$.$colon$colon(serializer);
     return new ExpressionEncoder<>(
         SchemaHelpers.binarySchema(),
         false,
-        JavaConversions.collectionAsScalaIterable(serializers).toSeq(),
-        new EncoderHelpers.DecodeUsingBeamCoder<>(
-            new Cast(new GetColumnByOrdinal(0, BinaryType), BinaryType), classTag, coder),
-        classTag);
+        serializers,
+        deserializer,
+        ClassTag$.MODULE$.apply(clazz));
+  }
+
+  static Expression invokeIfNotNull(Class<?> cls, String fun, DataType type, Expression... args) {

Review Comment:
   Missing the javadoc as in the other `EncoderFactory` class.



##########
runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/SparkSessionRule.java:
##########
@@ -37,23 +39,43 @@ public SparkSessionRule(String sparkMaster, Map<String, String> sparkConfig) {
     builder = SparkSession.builder();
     sparkConfig.forEach(builder::config);
     builder.master(sparkMaster);
+    builder.config("spark.sql.shuffle.partitions", numDriverCores(sparkMaster));
   }
 
   public SparkSessionRule(KV<String, String>... sparkConfig) {
-    this("local", sparkConfig);
+    this("local[2]", sparkConfig);
   }
 
   public SparkSessionRule(String sparkMaster, KV<String, String>... sparkConfig) {
     this(sparkMaster, Arrays.stream(sparkConfig).collect(toMap(KV::getKey, KV::getValue)));
   }
 
+  private static int numDriverCores(String master) {
+    return master.startsWith("local[")
+        ? Integer.parseInt(master.substring("local[".length(), master.length() - 1))

Review Comment:
   Duplicated code: please extract it into a utility method and deduplicate with `AbstractTranslationContext.java`.



##########
runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/SparkSessionRule.java:
##########
@@ -37,23 +39,43 @@ public SparkSessionRule(String sparkMaster, Map<String, String> sparkConfig) {
     builder = SparkSession.builder();
     sparkConfig.forEach(builder::config);
     builder.master(sparkMaster);
+    builder.config("spark.sql.shuffle.partitions", numDriverCores(sparkMaster));

Review Comment:
   yes, better to be in sync with what is configured for a prod session



##########
runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpersTest.java:
##########
@@ -35,13 +45,52 @@ public class EncoderHelpersTest {
 
   @ClassRule public static SparkSessionRule sessionRule = new SparkSessionRule();
 
+  private <T> Dataset<T> createDataset(List<T> data, Encoder<T> encoder) {
+    Dataset<T> ds = sessionRule.getSession().createDataset(data, encoder);
+    ds.printSchema();
+    return ds;
+  }
+
   @Test
   public void beamCoderToSparkEncoderTest() {
     List<Integer> data = Arrays.asList(1, 2, 3);
-    Dataset<Integer> dataset =
-        sessionRule
-            .getSession()
-            .createDataset(data, EncoderHelpers.fromBeamCoder(VarIntCoder.of()));
+    Dataset<Integer> dataset = createDataset(data, EncoderHelpers.fromBeamCoder(VarIntCoder.of()));
     assertEquals(data, dataset.collectAsList());
   }
+
+  @Test
+  public void testBeamEncoderOfPrivateType() {

Review Comment:
   yeah, cf the comment on using OBJECT_TYPE in the prod code.
   
   what was the problem with private types ?



##########
runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpersTest.java:
##########
@@ -35,13 +45,52 @@ public class EncoderHelpersTest {
 
   @ClassRule public static SparkSessionRule sessionRule = new SparkSessionRule();
 
+  private <T> Dataset<T> createDataset(List<T> data, Encoder<T> encoder) {
+    Dataset<T> ds = sessionRule.getSession().createDataset(data, encoder);
+    ds.printSchema();
+    return ds;
+  }
+
   @Test
   public void beamCoderToSparkEncoderTest() {
     List<Integer> data = Arrays.asList(1, 2, 3);
-    Dataset<Integer> dataset =
-        sessionRule
-            .getSession()
-            .createDataset(data, EncoderHelpers.fromBeamCoder(VarIntCoder.of()));
+    Dataset<Integer> dataset = createDataset(data, EncoderHelpers.fromBeamCoder(VarIntCoder.of()));
     assertEquals(data, dataset.collectAsList());
   }
+
+  @Test
+  public void testBeamEncoderOfPrivateType() {
+    List<PrivateString> data = asList(new PrivateString("1"), new PrivateString("2"));
+    Dataset<PrivateString> dataset = createDataset(data, fromBeamCoder(PrivateString.coder));
+    assertThat(dataset.collect(), equalTo(data.toArray()));
+  }
+
+  private static class PrivateString {
+    private static Coder<PrivateString> coder =

Review Comment:
   final



##########
.test-infra/jenkins/job_PreCommit_Java_Spark3_Versions.groovy:
##########
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import PrecommitJobBuilder
+
+PrecommitJobBuilder builder = new PrecommitJobBuilder(
+    scope: this,
+    nameBase: 'Java_Spark3_Versions',
+    gradleTask: ':runners:spark:3:sparkVersionsTest',
+    gradleSwitches: [
+      '-PdisableSpotlessCheck=true'

Review Comment:
   we should not disable spotless check



##########
runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderFactory.java:
##########
@@ -17,33 +17,48 @@
  */
 package org.apache.beam.runners.spark.structuredstreaming.translation.helpers;
 
-import static org.apache.spark.sql.types.DataTypes.BinaryType;
-
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.spark.sql.Encoder;
-import org.apache.spark.sql.catalyst.analysis.GetColumnByOrdinal;
+import java.lang.reflect.Constructor;
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
-import org.apache.spark.sql.catalyst.expressions.BoundReference;
-import org.apache.spark.sql.catalyst.expressions.Cast;
 import org.apache.spark.sql.catalyst.expressions.Expression;
-import org.apache.spark.sql.types.ObjectType;
+import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke;
+import org.apache.spark.sql.types.DataType;
+import scala.collection.immutable.Nil$;
+import scala.collection.mutable.WrappedArray;
 import scala.reflect.ClassTag;
-import scala.reflect.ClassTag$;
 
 public class EncoderFactory {
+  // default constructor to reflectively create static invoke expressions
+  private static final Constructor<StaticInvoke> STATIC_INVOKE_CONSTRUCTOR =
+      (Constructor<StaticInvoke>) StaticInvoke.class.getConstructors()[0];
+
+  static <T> ExpressionEncoder<T> create(
+      Expression serializer, Expression deserializer, Class<? super T> clazz) {
+    return new ExpressionEncoder<>(serializer, deserializer, ClassTag.apply(clazz));
+  }
 
-  public static <T> Encoder<T> fromBeamCoder(Coder<T> coder) {
-    Class<? super T> clazz = coder.getEncodedTypeDescriptor().getRawType();
-    ClassTag<T> classTag = ClassTag$.MODULE$.apply(clazz);
-    Expression serializer =
-        new EncoderHelpers.EncodeUsingBeamCoder<>(
-            new BoundReference(0, new ObjectType(clazz), true), coder);
-    Expression deserializer =
-        new EncoderHelpers.DecodeUsingBeamCoder<>(
-            new Cast(
-                new GetColumnByOrdinal(0, BinaryType), BinaryType, scala.Option.<String>empty()),
-            classTag,
-            coder);
-    return new ExpressionEncoder<>(serializer, deserializer, classTag);
+  /**
+   * Invoke method {@code fun} on Class {@code cls}, immediately propagating {@code null} if any
+   * input arg is {@code null}.
+   *
+   * <p>To address breaking interfaces between various version of Spark 3 these are created

Review Comment:
   You could also have passed the Spark version in (from the Spark session) and called the correct constructor depending on the version. That way you would have avoided using the reflection API and also guessing the version based on the number of parameters of the constructor.



##########
runners/spark/3/build.gradle:
##########
@@ -28,3 +28,37 @@ project.ext {
 
 // Load the main build script which contains all build logic.
 apply from: "$basePath/spark_runner.gradle"
+
+
+def sparkVersions = [
+    "330": "3.3.0",
+    "321": "3.2.1"
+]
+
+sparkVersions.each { kv ->

Review Comment:
   ok, you test on all supported versions, but what version is the resulting spark-3-runner artifact built against ?



##########
runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/SparkSessionRule.java:
##########
@@ -37,23 +39,43 @@ public SparkSessionRule(String sparkMaster, Map<String, String> sparkConfig) {
     builder = SparkSession.builder();
     sparkConfig.forEach(builder::config);
     builder.master(sparkMaster);
+    builder.config("spark.sql.shuffle.partitions", numDriverCores(sparkMaster));
   }
 
   public SparkSessionRule(KV<String, String>... sparkConfig) {
-    this("local", sparkConfig);
+    this("local[2]", sparkConfig);

Review Comment:
   Defaulting to 4 is the value people are used to. Why change it, or why not let it detect the number of CPU cores of the system?



##########
runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingRunner.java:
##########
@@ -146,10 +146,12 @@ public SparkStructuredStreamingPipelineResult run(final Pipeline pipeline) {
             });
     executorService.shutdown();
 
-    // TODO: Streaming.
+    Runnable onTerminalState =
+        options.getUseActiveSparkSession()
+            ? () -> {}
+            : () -> translationContext.getSparkSession().stop();

Review Comment:
   Compared to what the old `SparkStructuredStreamingPipelineResult#stop()` did, this callback fails to set the pipeline state: `if the pipeline was running, then set its state to State.STOPPED`



##########
runners/spark/2/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderFactory.java:
##########
@@ -17,38 +17,35 @@
  */
 package org.apache.beam.runners.spark.structuredstreaming.translation.helpers;
 
-import static org.apache.spark.sql.types.DataTypes.BinaryType;
-
-import java.util.Collections;
-import java.util.List;
-import org.apache.beam.sdk.coders.Coder;
 import org.apache.spark.sql.Encoder;
-import org.apache.spark.sql.catalyst.analysis.GetColumnByOrdinal;
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
-import org.apache.spark.sql.catalyst.expressions.BoundReference;
-import org.apache.spark.sql.catalyst.expressions.Cast;
 import org.apache.spark.sql.catalyst.expressions.Expression;
-import org.apache.spark.sql.types.ObjectType;
-import scala.collection.JavaConversions;
-import scala.reflect.ClassTag;
+import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke;
+import org.apache.spark.sql.types.DataType;
+import scala.collection.Seq;
+import scala.collection.immutable.List;
+import scala.collection.immutable.Nil$;
+import scala.collection.mutable.WrappedArray;
 import scala.reflect.ClassTag$;
 
 public class EncoderFactory {
 
-  public static <T> Encoder<T> fromBeamCoder(Coder<T> coder) {
-    Class<? super T> clazz = coder.getEncodedTypeDescriptor().getRawType();
-    ClassTag<T> classTag = ClassTag$.MODULE$.apply(clazz);
-    List<Expression> serializers =
-        Collections.singletonList(
-            new EncoderHelpers.EncodeUsingBeamCoder<>(
-                new BoundReference(0, new ObjectType(clazz), true), coder));
-
+  static <T> Encoder<T> create(
+      Expression serializer, Expression deserializer, Class<? super T> clazz) {
+    List<Expression> serializers = Nil$.MODULE$.$colon$colon(serializer);

Review Comment:
   I'm sorry but I find this line unreadable (mainly because of scala integration in java). This is only to add an element at the beginning of an empty list. I would prefer `JavaConversions.collectionAsScalaIterable()` and make the collection in java. Remember that there is no scala in Beam so we should reduce scala to the strict minimum.



##########
runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/SparkSessionRule.java:
##########
@@ -37,23 +39,43 @@ public SparkSessionRule(String sparkMaster, Map<String, String> sparkConfig) {
     builder = SparkSession.builder();
     sparkConfig.forEach(builder::config);
     builder.master(sparkMaster);
+    builder.config("spark.sql.shuffle.partitions", numDriverCores(sparkMaster));
   }
 
   public SparkSessionRule(KV<String, String>... sparkConfig) {
-    this("local", sparkConfig);
+    this("local[2]", sparkConfig);
   }
 
   public SparkSessionRule(String sparkMaster, KV<String, String>... sparkConfig) {
     this(sparkMaster, Arrays.stream(sparkConfig).collect(toMap(KV::getKey, KV::getValue)));
   }
 
+  private static int numDriverCores(String master) {

Review Comment:
   please rename into `extractNumWorkers`.



##########
runners/spark/2/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderFactory.java:
##########
@@ -17,38 +17,35 @@
  */
 package org.apache.beam.runners.spark.structuredstreaming.translation.helpers;
 
-import static org.apache.spark.sql.types.DataTypes.BinaryType;
-
-import java.util.Collections;
-import java.util.List;
-import org.apache.beam.sdk.coders.Coder;
 import org.apache.spark.sql.Encoder;
-import org.apache.spark.sql.catalyst.analysis.GetColumnByOrdinal;
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
-import org.apache.spark.sql.catalyst.expressions.BoundReference;
-import org.apache.spark.sql.catalyst.expressions.Cast;
 import org.apache.spark.sql.catalyst.expressions.Expression;
-import org.apache.spark.sql.types.ObjectType;
-import scala.collection.JavaConversions;
-import scala.reflect.ClassTag;
+import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke;
+import org.apache.spark.sql.types.DataType;
+import scala.collection.Seq;
+import scala.collection.immutable.List;
+import scala.collection.immutable.Nil$;
+import scala.collection.mutable.WrappedArray;
 import scala.reflect.ClassTag$;
 
 public class EncoderFactory {
 
-  public static <T> Encoder<T> fromBeamCoder(Coder<T> coder) {
-    Class<? super T> clazz = coder.getEncodedTypeDescriptor().getRawType();
-    ClassTag<T> classTag = ClassTag$.MODULE$.apply(clazz);
-    List<Expression> serializers =
-        Collections.singletonList(
-            new EncoderHelpers.EncodeUsingBeamCoder<>(
-                new BoundReference(0, new ObjectType(clazz), true), coder));
-
+  static <T> Encoder<T> create(
+      Expression serializer, Expression deserializer, Class<? super T> clazz) {
+    List<Expression> serializers = Nil$.MODULE$.$colon$colon(serializer);
     return new ExpressionEncoder<>(
         SchemaHelpers.binarySchema(),
         false,
-        JavaConversions.collectionAsScalaIterable(serializers).toSeq(),
-        new EncoderHelpers.DecodeUsingBeamCoder<>(
-            new Cast(new GetColumnByOrdinal(0, BinaryType), BinaryType), classTag, coder),
-        classTag);
+        serializers,
+        deserializer,
+        ClassTag$.MODULE$.apply(clazz));
+  }
+
+  static Expression invokeIfNotNull(Class<?> cls, String fun, DataType type, Expression... args) {
+    return new StaticInvoke(cls, type, fun, seqOf(args), true, true);
+  }
+
+  private static Seq<Expression> seqOf(Expression... args) {

Review Comment:
   inline like in the other `EncoderFactory` class



##########
runners/spark/2/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderFactory.java:
##########
@@ -17,38 +17,35 @@
  */
 package org.apache.beam.runners.spark.structuredstreaming.translation.helpers;
 
-import static org.apache.spark.sql.types.DataTypes.BinaryType;
-
-import java.util.Collections;
-import java.util.List;
-import org.apache.beam.sdk.coders.Coder;
 import org.apache.spark.sql.Encoder;
-import org.apache.spark.sql.catalyst.analysis.GetColumnByOrdinal;
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
-import org.apache.spark.sql.catalyst.expressions.BoundReference;
-import org.apache.spark.sql.catalyst.expressions.Cast;
 import org.apache.spark.sql.catalyst.expressions.Expression;
-import org.apache.spark.sql.types.ObjectType;
-import scala.collection.JavaConversions;
-import scala.reflect.ClassTag;
+import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke;
+import org.apache.spark.sql.types.DataType;
+import scala.collection.Seq;
+import scala.collection.immutable.List;
+import scala.collection.immutable.Nil$;
+import scala.collection.mutable.WrappedArray;
 import scala.reflect.ClassTag$;
 
 public class EncoderFactory {
 
-  public static <T> Encoder<T> fromBeamCoder(Coder<T> coder) {
-    Class<? super T> clazz = coder.getEncodedTypeDescriptor().getRawType();
-    ClassTag<T> classTag = ClassTag$.MODULE$.apply(clazz);
-    List<Expression> serializers =
-        Collections.singletonList(
-            new EncoderHelpers.EncodeUsingBeamCoder<>(
-                new BoundReference(0, new ObjectType(clazz), true), coder));
-
+  static <T> Encoder<T> create(
+      Expression serializer, Expression deserializer, Class<? super T> clazz) {
+    List<Expression> serializers = Nil$.MODULE$.$colon$colon(serializer);
     return new ExpressionEncoder<>(
         SchemaHelpers.binarySchema(),
         false,
-        JavaConversions.collectionAsScalaIterable(serializers).toSeq(),
-        new EncoderHelpers.DecodeUsingBeamCoder<>(
-            new Cast(new GetColumnByOrdinal(0, BinaryType), BinaryType), classTag, coder),
-        classTag);
+        serializers,
+        deserializer,
+        ClassTag$.MODULE$.apply(clazz));

Review Comment:
   Can't you just use `ClassTag.apply(clazz)` as in the other EncoderFactory class ?



##########
runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java:
##########
@@ -19,256 +19,53 @@
 
 import static org.apache.spark.sql.types.DataTypes.BinaryType;
 
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Objects;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.spark.sql.Encoder;
-import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.catalyst.analysis.GetColumnByOrdinal;
+import org.apache.spark.sql.catalyst.expressions.BoundReference;
 import org.apache.spark.sql.catalyst.expressions.Expression;
-import org.apache.spark.sql.catalyst.expressions.NonSQLExpression;
-import org.apache.spark.sql.catalyst.expressions.UnaryExpression;
-import org.apache.spark.sql.catalyst.expressions.codegen.Block;
-import org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator;
-import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext;
-import org.apache.spark.sql.catalyst.expressions.codegen.ExprCode;
+import org.apache.spark.sql.catalyst.expressions.Literal;
 import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.ObjectType;
-import org.checkerframework.checker.nullness.qual.Nullable;
-import scala.StringContext;
-import scala.collection.JavaConversions;
-import scala.reflect.ClassTag;
+import org.checkerframework.checker.nullness.qual.NonNull;
 
-/** {@link Encoders} utility class. */
-@SuppressWarnings({
-  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
-})
 public class EncoderHelpers {
+  private static final DataType OBJECT_TYPE = new ObjectType(Object.class);
+
   /**
    * Wrap a Beam coder into a Spark Encoder using Catalyst Expression Encoders (which uses java code
    * generation).
    */
   public static <T> Encoder<T> fromBeamCoder(Coder<T> coder) {
-    return EncoderFactory.fromBeamCoder(coder);
+    Class<? super T> clazz = coder.getEncodedTypeDescriptor().getRawType();
+    // Class T could be private, therefore use OBJECT_TYPE to not risk an IllegalAccessError
+    return EncoderFactory.create(
+        beamSerializer(rootRef(OBJECT_TYPE, true), coder),
+        beamDeserializer(rootCol(BinaryType), coder),

Review Comment:
   Thanks for that ! It is way more maintainable than relying on code generation ! What I wonder is why spark bundled Encoders like Kryo rely on code generation. Have you seen a perf drop of not using code generation in Beam ExpressionEncoders ?



##########
runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderFactory.java:
##########
@@ -17,33 +17,48 @@
  */
 package org.apache.beam.runners.spark.structuredstreaming.translation.helpers;
 
-import static org.apache.spark.sql.types.DataTypes.BinaryType;
-
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.spark.sql.Encoder;
-import org.apache.spark.sql.catalyst.analysis.GetColumnByOrdinal;
+import java.lang.reflect.Constructor;
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
-import org.apache.spark.sql.catalyst.expressions.BoundReference;
-import org.apache.spark.sql.catalyst.expressions.Cast;
 import org.apache.spark.sql.catalyst.expressions.Expression;
-import org.apache.spark.sql.types.ObjectType;
+import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke;
+import org.apache.spark.sql.types.DataType;
+import scala.collection.immutable.Nil$;
+import scala.collection.mutable.WrappedArray;
 import scala.reflect.ClassTag;
-import scala.reflect.ClassTag$;
 
 public class EncoderFactory {
+  // default constructor to reflectively create static invoke expressions
+  private static final Constructor<StaticInvoke> STATIC_INVOKE_CONSTRUCTOR =
+      (Constructor<StaticInvoke>) StaticInvoke.class.getConstructors()[0];
+
+  static <T> ExpressionEncoder<T> create(
+      Expression serializer, Expression deserializer, Class<? super T> clazz) {
+    return new ExpressionEncoder<>(serializer, deserializer, ClassTag.apply(clazz));

Review Comment:
   It is a more logical API than what was in v2, where there was a list of serializers but a single deserializer.



##########
runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/sink/CsvSink.java:
##########
@@ -18,22 +18,64 @@
 package org.apache.beam.runners.spark.metrics.sink;
 
 import com.codahale.metrics.MetricRegistry;
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.lang.reflect.Constructor;
 import java.util.Properties;
 import org.apache.beam.runners.spark.metrics.AggregatorMetric;
 import org.apache.beam.runners.spark.metrics.WithMetricsSupport;
 import org.apache.spark.metrics.sink.Sink;
 
 /**
- * A Spark {@link Sink} that is tailored to report {@link AggregatorMetric} metrics to a CSV file.
+ * A {@link Sink} for <a href="https://spark.apache.org/docs/latest/monitoring.html#metrics">Spark's
+ * metric system</a> that is tailored to report {@link AggregatorMetric}s to a CSV file.
+ *
+ * <p>The sink is configured using Spark configuration parameters, for example:
+ *
+ * <pre>{@code
+ * "spark.metrics.conf.*.sink.csv.class"="org.apache.beam.runners.spark.metrics.sink.CsvSink"
+ * "spark.metrics.conf.*.sink.csv.directory"="<output_directory>"
+ * "spark.metrics.conf.*.sink.csv.period"=10
+ * "spark.metrics.conf.*.sink.csv.unit"=seconds
+ * }</pre>
  */
-// Intentionally overriding parent name because inheritors should replace the parent.
-@SuppressFBWarnings("NM_SAME_SIMPLE_NAME_AS_SUPERCLASS")
-public class CsvSink extends org.apache.spark.metrics.sink.CsvSink {
+public class CsvSink implements Sink {
+
+  // Initialized reflectively as done by Spark's MetricsSystem
+  private final org.apache.spark.metrics.sink.CsvSink delegate;
+
+  /** Constructor for Spark 3.1.x. */
   public CsvSink(
       final Properties properties,
       final MetricRegistry metricRegistry,
       final org.apache.spark.SecurityManager securityMgr) {
-    super(properties, WithMetricsSupport.forRegistry(metricRegistry), securityMgr);
+    delegate = newDelegate(properties, WithMetricsSupport.forRegistry(metricRegistry), securityMgr);
+  }
+
+  /** Constructor for Spark 3.2.x and later. */
+  public CsvSink(final Properties properties, final MetricRegistry metricRegistry) {
+    delegate = newDelegate(properties, WithMetricsSupport.forRegistry(metricRegistry));
+  }
+
+  @Override
+  public void start() {
+    delegate.start();
+  }
+
+  @Override
+  public void stop() {
+    delegate.stop();
+  }
+
+  @Override
+  public void report() {
+    delegate.report();
+  }
+
+  private static org.apache.spark.metrics.sink.CsvSink newDelegate(Object... params) {
+    try {
+      Constructor<?> constructor = org.apache.spark.metrics.sink.CsvSink.class.getConstructors()[0];

Review Comment:
   I'm not a big fan of using the reflection API. I know why you do it: it is to avoid having 2 Maven modules for Spark 3.1 and Spark 3.2+. Since you don't know which Spark version you are running against, you need to use the reflection API. But I would prefer that you use an explicit call to the correct constructor, like below, rather than using varargs and `getConstructors()[0]`, which could change in the future without a compilation error.
   ```
    delegate = org.apache.spark.metrics.sink.CsvSink.class.getConstructor(Properties.class,
         MetricRegistry.class, org.apache.spark.SecurityManager.class).newInstance(properties, metricRegistry, securityMgr);
   ```



##########
runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/metrics/sink/CodahaleCsvSink.java:
##########
@@ -18,19 +18,64 @@
 package org.apache.beam.runners.spark.structuredstreaming.metrics.sink;
 
 import com.codahale.metrics.MetricRegistry;
+import java.lang.reflect.Constructor;
 import java.util.Properties;
 import org.apache.beam.runners.spark.structuredstreaming.metrics.AggregatorMetric;
 import org.apache.beam.runners.spark.structuredstreaming.metrics.WithMetricsSupport;
 import org.apache.spark.metrics.sink.Sink;
 
 /**
- * A Spark {@link Sink} that is tailored to report {@link AggregatorMetric} metrics to a CSV file.
+ * A {@link Sink} for <a href="https://spark.apache.org/docs/latest/monitoring.html#metrics">Spark's
+ * metric system</a> that is tailored to report {@link AggregatorMetric}s to a CSV file.
+ *
+ * <p>The sink is configured using Spark configuration parameters, for example:
+ *
+ * <pre>{@code
+ * "spark.metrics.conf.*.sink.csv.class"="org.apache.beam.runners.spark.structuredstreaming.metrics.sink.CodahaleCsvSink"
+ * "spark.metrics.conf.*.sink.csv.directory"="<output_directory>"
+ * "spark.metrics.conf.*.sink.csv.period"=10
+ * "spark.metrics.conf.*.sink.csv.unit"=seconds
+ * }</pre>
  */
-public class CodahaleCsvSink extends org.apache.spark.metrics.sink.CsvSink {
+public class CodahaleCsvSink implements Sink {
+
+  // Initialized reflectively as done by Spark's MetricsSystem
+  private final org.apache.spark.metrics.sink.CsvSink delegate;
+
+  /** Constructor for Spark 3.1.x. */
   public CodahaleCsvSink(
       final Properties properties,
       final MetricRegistry metricRegistry,
       final org.apache.spark.SecurityManager securityMgr) {
-    super(properties, WithMetricsSupport.forRegistry(metricRegistry), securityMgr);
+    delegate = newDelegate(properties, WithMetricsSupport.forRegistry(metricRegistry), securityMgr);
+  }
+
+  /** Constructor for Spark 3.2.x and later. */
+  public CodahaleCsvSink(final Properties properties, final MetricRegistry metricRegistry) {
+    delegate = newDelegate(properties, WithMetricsSupport.forRegistry(metricRegistry));
+  }
+
+  @Override
+  public void start() {
+    delegate.start();
+  }
+
+  @Override
+  public void stop() {
+    delegate.stop();
+  }
+
+  @Override
+  public void report() {
+    delegate.report();
+  }
+
+  private static org.apache.spark.metrics.sink.CsvSink newDelegate(Object... params) {
+    try {
+      Constructor<?> constructor = org.apache.spark.metrics.sink.CsvSink.class.getConstructors()[0];

Review Comment:
   ditto



##########
runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/sink/GraphiteSink.java:
##########
@@ -18,20 +18,68 @@
 package org.apache.beam.runners.spark.metrics.sink;
 
 import com.codahale.metrics.MetricRegistry;
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.lang.reflect.Constructor;
 import java.util.Properties;
 import org.apache.beam.runners.spark.metrics.AggregatorMetric;
 import org.apache.beam.runners.spark.metrics.WithMetricsSupport;
 import org.apache.spark.metrics.sink.Sink;
 
-/** A Spark {@link Sink} that is tailored to report {@link AggregatorMetric} metrics to Graphite. */
-// Intentionally overriding parent name because inheritors should replace the parent.
-@SuppressFBWarnings("NM_SAME_SIMPLE_NAME_AS_SUPERCLASS")
-public class GraphiteSink extends org.apache.spark.metrics.sink.GraphiteSink {
+/**
+ * A {@link Sink} for <a href="https://spark.apache.org/docs/latest/monitoring.html#metrics">Spark's
+ * metric system</a> that is tailored to report {@link AggregatorMetric}s to Graphite.
+ *
+ * <p>The sink is configured using Spark configuration parameters, for example:
+ *
+ * <pre>{@code
+ * "spark.metrics.conf.*.sink.graphite.class"="org.apache.beam.runners.spark.metrics.sink.GraphiteSink"
+ * "spark.metrics.conf.*.sink.graphite.host"="<graphite_hostname>"
+ * "spark.metrics.conf.*.sink.graphite.port"=<graphite_listening_port>
+ * "spark.metrics.conf.*.sink.graphite.period"=10
+ * "spark.metrics.conf.*.sink.graphite.unit"=seconds
+ * "spark.metrics.conf.*.sink.graphite.prefix"="<optional_prefix>"
+ * "spark.metrics.conf.*.sink.graphite.regex"="<optional_regex_to_send_matching_metrics>"
+ * }</pre>
+ */
+public class GraphiteSink implements Sink {
+
+  // Initialized reflectively as done by Spark's MetricsSystem
+  private final org.apache.spark.metrics.sink.GraphiteSink delegate;
+
+  /** Constructor for Spark 3.1.x. */
   public GraphiteSink(
       final Properties properties,
       final MetricRegistry metricRegistry,
       final org.apache.spark.SecurityManager securityMgr) {
-    super(properties, WithMetricsSupport.forRegistry(metricRegistry), securityMgr);
+    delegate = newDelegate(properties, WithMetricsSupport.forRegistry(metricRegistry), securityMgr);
+  }
+
+  /** Constructor for Spark 3.2.x and later. */
+  public GraphiteSink(final Properties properties, final MetricRegistry metricRegistry) {
+    delegate = newDelegate(properties, WithMetricsSupport.forRegistry(metricRegistry));
+  }
+
+  @Override
+  public void start() {
+    delegate.start();
+  }
+
+  @Override
+  public void stop() {
+    delegate.stop();
+  }
+
+  @Override
+  public void report() {
+    delegate.report();
+  }
+
+  private static org.apache.spark.metrics.sink.GraphiteSink newDelegate(Object... params) {
+    try {
+      Constructor<?> constructor =
+          org.apache.spark.metrics.sink.GraphiteSink.class.getConstructors()[0];

Review Comment:
   same comment as for CsvSink



##########
runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/metrics/sink/CodahaleGraphiteSink.java:
##########
@@ -18,17 +18,68 @@
 package org.apache.beam.runners.spark.structuredstreaming.metrics.sink;
 
 import com.codahale.metrics.MetricRegistry;
+import java.lang.reflect.Constructor;
 import java.util.Properties;
 import org.apache.beam.runners.spark.structuredstreaming.metrics.AggregatorMetric;
 import org.apache.beam.runners.spark.structuredstreaming.metrics.WithMetricsSupport;
 import org.apache.spark.metrics.sink.Sink;
 
-/** A Spark {@link Sink} that is tailored to report {@link AggregatorMetric} metrics to Graphite. */
-public class CodahaleGraphiteSink extends org.apache.spark.metrics.sink.GraphiteSink {
+/**
+ * A {@link Sink} for <a href="https://spark.apache.org/docs/latest/monitoring.html#metrics">Spark's
+ * metric system</a> that is tailored to report {@link AggregatorMetric}s to Graphite.
+ *
+ * <p>The sink is configured using Spark configuration parameters, for example:
+ *
+ * <pre>{@code
+ * "spark.metrics.conf.*.sink.graphite.class"="org.apache.beam.runners.spark.structuredstreaming.metrics.sink.CodahaleGraphiteSink"
+ * "spark.metrics.conf.*.sink.graphite.host"="<graphite_hostname>"
+ * "spark.metrics.conf.*.sink.graphite.port"=<graphite_listening_port>
+ * "spark.metrics.conf.*.sink.graphite.period"=10
+ * "spark.metrics.conf.*.sink.graphite.unit"=seconds
+ * "spark.metrics.conf.*.sink.graphite.prefix"="<optional_prefix>"
+ * "spark.metrics.conf.*.sink.graphite.regex"="<optional_regex_to_send_matching_metrics>"
+ * }</pre>
+ */
+public class CodahaleGraphiteSink implements Sink {
+
+  // Initialized reflectively as done by Spark's MetricsSystem
+  private final org.apache.spark.metrics.sink.GraphiteSink delegate;
+
+  /** Constructor for Spark 3.1.x. */
   public CodahaleGraphiteSink(
       final Properties properties,
       final MetricRegistry metricRegistry,
       final org.apache.spark.SecurityManager securityMgr) {
-    super(properties, WithMetricsSupport.forRegistry(metricRegistry), securityMgr);
+    delegate = newDelegate(properties, WithMetricsSupport.forRegistry(metricRegistry), securityMgr);
+  }
+
+  /** Constructor for Spark 3.2.x and later. */
+  public CodahaleGraphiteSink(final Properties properties, final MetricRegistry metricRegistry) {
+    delegate = newDelegate(properties, WithMetricsSupport.forRegistry(metricRegistry));
+  }
+
+  @Override
+  public void start() {
+    delegate.start();
+  }
+
+  @Override
+  public void stop() {
+    delegate.stop();
+  }
+
+  @Override
+  public void report() {
+    delegate.report();
+  }
+
+  private static org.apache.spark.metrics.sink.GraphiteSink newDelegate(Object... params) {
+    try {
+      Constructor<?> constructor =
+          org.apache.spark.metrics.sink.GraphiteSink.class.getConstructors()[0];

Review Comment:
   Ditto: same comment as for CsvSink.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org