You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ec...@apache.org on 2019/09/05 14:48:07 UTC
[beam] 14/24: Fix getting the output value in code generation
This is an automated email from the ASF dual-hosted git repository.
echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git
commit 959664f8092e5bb06c7a78dae78a12ac2559d413
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Wed Sep 4 16:50:17 2019 +0200
Fix getting the output value in code generation
---
.../translation/helpers/EncoderHelpers.java | 37 +++++++++++++---------
1 file changed, 22 insertions(+), 15 deletions(-)
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java
index dff308a..a452da0 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java
@@ -21,7 +21,6 @@ import static org.apache.spark.sql.types.DataTypes.BinaryType;
import static scala.compat.java8.JFunction.func;
import java.io.ByteArrayInputStream;
-import java.lang.reflect.Array;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
@@ -42,7 +41,6 @@ import org.apache.spark.sql.catalyst.expressions.codegen.Block;
import org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator;
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext;
import org.apache.spark.sql.catalyst.expressions.codegen.ExprCode;
-import org.apache.spark.sql.catalyst.expressions.codegen.VariableValue;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.ObjectType;
import scala.StringContext;
@@ -144,34 +142,42 @@ public class EncoderHelpers {
/*
CODE GENERATED
+ byte[] ${ev.value};
try {
java.io.ByteArrayOutputStream baos = new java.io.ByteArrayOutputStream();
- final byte[] output;
if (${input.isNull})
- output = null;
- else
- output = $beamCoder.encode(${input.value}, baos); baos.toByteArray();
+ ${ev.value} = null;
+ else{
+ $beamCoder.encode(${input.value}, baos);
+ ${ev.value} = baos.toByteArray();
+ }
} catch (Exception e) {
throw new RuntimeException(org.apache.beam.sdk.util.UserCodeException.wrap(e));
}
*/
List<String> parts = new ArrayList<>();
- parts.add("try {java.io.ByteArrayOutputStream baos = new java.io.ByteArrayOutputStream(); final byte[] output; if (");
- parts.add(") output = null; else output =");
+ parts.add("byte[] ");
+ parts.add(";try { java.io.ByteArrayOutputStream baos = new java.io.ByteArrayOutputStream(); if (");
+ parts.add(") ");
+ parts.add(" = null; else{");
parts.add(".encode(");
- parts.add(", baos); baos.toByteArray();} catch (Exception e) {throw new RuntimeException(org.apache.beam.sdk.util.UserCodeException.wrap(e));}");
+ parts.add(", baos); ");
+ parts.add(" = baos.toByteArray();}} catch (Exception e) {throw new RuntimeException(org.apache.beam.sdk.util.UserCodeException.wrap(e));}");
StringContext sc = new StringContext(JavaConversions.collectionAsScalaIterable(parts).toSeq());
List<Object> args = new ArrayList<>();
+
+ args.add(ev.value());
args.add(input.isNull());
+ args.add(ev.value());
args.add(beamCoder);
args.add(input.value());
+ args.add(ev.value());
Block code = (new Block.BlockHelper(sc))
.code(JavaConversions.collectionAsScalaIterable(args).toSeq());
- return ev.copy(input.code().$plus(code), input.isNull(),
- new VariableValue("output", Array.class));
+ return ev.copy(input.code().$plus(code), input.isNull(),ev.value());
}
@@ -263,7 +269,7 @@ public class EncoderHelpers {
/*
CODE GENERATED:
try {
- final $javaType output =
+ final $javaType ${ev.value} =
${input.isNull} ?
${CodeGenerator.defaultValue(dataType)} :
($javaType) $beamCoder.decode(new java.io.ByteArrayInputStream(${input.value}));
@@ -274,7 +280,8 @@ public class EncoderHelpers {
List<String> parts = new ArrayList<>();
parts.add("try { final ");
- parts.add(" output =");
+ parts.add(" ");
+ parts.add(" =");
parts.add("?");
parts.add(":");
parts.add("(");
@@ -286,6 +293,7 @@ public class EncoderHelpers {
List<Object> args = new ArrayList<>();
args.add(javaType);
+ args.add(ev.value());
args.add(input.isNull());
args.add(CodeGenerator.defaultValue(dataType(), false));
args.add(javaType);
@@ -294,8 +302,7 @@ public class EncoderHelpers {
Block code = (new Block.BlockHelper(sc))
.code(JavaConversions.collectionAsScalaIterable(args).toSeq());
- return ev.copy(input.code().$plus(code), input.isNull(),
- new VariableValue("output", classTag.runtimeClass()));
+ return ev.copy(input.code().$plus(code), input.isNull(), ev.value());
}