You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ec...@apache.org on 2019/09/05 14:48:07 UTC

[beam] 14/24: Fix getting the output value in code generation

This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 959664f8092e5bb06c7a78dae78a12ac2559d413
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Wed Sep 4 16:50:17 2019 +0200

    Fix getting the output value in code generation
---
 .../translation/helpers/EncoderHelpers.java        | 37 +++++++++++++---------
 1 file changed, 22 insertions(+), 15 deletions(-)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java
index dff308a..a452da0 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java
@@ -21,7 +21,6 @@ import static org.apache.spark.sql.types.DataTypes.BinaryType;
 import static scala.compat.java8.JFunction.func;
 
 import java.io.ByteArrayInputStream;
-import java.lang.reflect.Array;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
@@ -42,7 +41,6 @@ import org.apache.spark.sql.catalyst.expressions.codegen.Block;
 import org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator;
 import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext;
 import org.apache.spark.sql.catalyst.expressions.codegen.ExprCode;
-import org.apache.spark.sql.catalyst.expressions.codegen.VariableValue;
 import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.ObjectType;
 import scala.StringContext;
@@ -144,34 +142,42 @@ public class EncoderHelpers {
 
       /*
         CODE GENERATED
+        byte[] ${ev.value};
        try {
         java.io.ByteArrayOutputStream baos = new java.io.ByteArrayOutputStream();
-        final byte[] output;
         if ({input.isNull})
-            output = null;
-        else
-            output = $beamCoder.encode(${input.value}, baos); baos.toByteArray();
+            ${ev.value} = null;
+        else{
+            $beamCoder.encode(${input.value}, baos);
+            ${ev.value} =  baos.toByteArray();
+        }
         } catch (Exception e) {
           throw new RuntimeException(org.apache.beam.sdk.util.UserCodeException.wrap(e));
         }
       */
       List<String> parts = new ArrayList<>();
-      parts.add("try {java.io.ByteArrayOutputStream baos = new java.io.ByteArrayOutputStream(); final byte[] output; if (");
-      parts.add(") output = null; else output =");
+      parts.add("byte[] ");
+      parts.add(";try { java.io.ByteArrayOutputStream baos = new java.io.ByteArrayOutputStream(); if (");
+      parts.add(") ");
+      parts.add(" = null; else{");
       parts.add(".encode(");
-      parts.add(", baos); baos.toByteArray();} catch (Exception e) {throw new RuntimeException(org.apache.beam.sdk.util.UserCodeException.wrap(e));}");
+      parts.add(", baos); ");
+      parts.add(" = baos.toByteArray();}} catch (Exception e) {throw new RuntimeException(org.apache.beam.sdk.util.UserCodeException.wrap(e));}");
 
       StringContext sc = new StringContext(JavaConversions.collectionAsScalaIterable(parts).toSeq());
 
       List<Object> args = new ArrayList<>();
+
+      args.add(ev.value());
       args.add(input.isNull());
+      args.add(ev.value());
       args.add(beamCoder);
       args.add(input.value());
+      args.add(ev.value());
       Block code = (new Block.BlockHelper(sc))
           .code(JavaConversions.collectionAsScalaIterable(args).toSeq());
 
-      return ev.copy(input.code().$plus(code), input.isNull(),
-          new VariableValue("output", Array.class));
+      return ev.copy(input.code().$plus(code), input.isNull(),ev.value());
     }
 
 
@@ -263,7 +269,7 @@ public class EncoderHelpers {
 /*
      CODE GENERATED:
      try {
-      final $javaType output =
+      final $javaType ${ev.value} =
       ${input.isNull} ?
       ${CodeGenerator.defaultValue(dataType)} :
       ($javaType) $beamCoder.decode(new java.io.ByteArrayInputStream(${input.value}));
@@ -274,7 +280,8 @@ public class EncoderHelpers {
 
       List<String> parts = new ArrayList<>();
       parts.add("try { final ");
-      parts.add(" output =");
+      parts.add(" ");
+      parts.add(" =");
       parts.add("?");
       parts.add(":");
       parts.add("(");
@@ -286,6 +293,7 @@ public class EncoderHelpers {
 
       List<Object> args = new ArrayList<>();
       args.add(javaType);
+      args.add(ev.value());
       args.add(input.isNull());
       args.add(CodeGenerator.defaultValue(dataType(), false));
       args.add(javaType);
@@ -294,8 +302,7 @@ public class EncoderHelpers {
       Block code = (new Block.BlockHelper(sc))
           .code(JavaConversions.collectionAsScalaIterable(args).toSeq());
 
-      return ev.copy(input.code().$plus(code), input.isNull(),
-          new VariableValue("output", classTag.runtimeClass()));
+      return ev.copy(input.code().$plus(code), input.isNull(), ev.value());
 
     }