You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ec...@apache.org on 2019/10/24 10:08:38 UTC

[beam] 14/37: Fix beam coder lazy init using reflection: use .class + try catch + cast

This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 0cf2c8759a64c81c1d4f83f74a759ae3dafd1f83
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Thu Sep 5 10:07:32 2019 +0200

    Fix beam coder lazy init using reflection: use .class + try catch + cast
---
 .../translation/helpers/EncoderHelpers.java           | 19 ++++++++++++-------
 1 file changed, 12 insertions(+), 7 deletions(-)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java
index a452da0..0751c4c 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java
@@ -26,6 +26,7 @@ import java.util.List;
 import java.util.Objects;
 import org.apache.beam.runners.spark.structuredstreaming.translation.SchemaHelpers;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 import org.apache.spark.sql.Encoder;
@@ -388,18 +389,22 @@ case class DecodeUsingSerializer[T](child: Expression, tag: ClassTag[T], kryo: B
     ctx.addImmutableStateIfNotExists(coderClass.getName(), beamCoderInstance, func(v1 -> {
       /*
     CODE GENERATED
-    v = (coderClass) coderClass.getDeclaredConstructor().newInstance();
+    try {
+    v1 = (coderClass) coderClass.class.getDeclaredConstructor().newInstance();
+    } catch (Exception e) {
+      throw new RuntimeException(org.apache.beam.sdk.util.UserCodeException.wrap(e));
+    }
      */
-        List<String> parts = new ArrayList<>();
-        parts.add("");
+      List<String> parts = new ArrayList<>();
+        parts.add("try {");
         parts.add(" = (");
-        parts.add(") ");
-        parts.add(".getDeclaredConstructor().newInstance();");
+      parts.add(") ");
+      parts.add(".class.getDeclaredConstructor().newInstance();} catch (Exception e) {throw new RuntimeException(org.apache.beam.sdk.util.UserCodeException.wrap(e));}");
         StringContext sc = new StringContext(JavaConversions.collectionAsScalaIterable(parts).toSeq());
         List<Object> args = new ArrayList<>();
         args.add(v1);
-        args.add(coderClass.getName());
-        args.add(coderClass.getName());
+      args.add(coderClass.getName());
+      args.add(coderClass.getName());
         return sc.s(JavaConversions.collectionAsScalaIterable(args).toSeq());
       }));
     return beamCoderInstance;