You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ec...@apache.org on 2019/10/24 10:08:33 UTC
[beam] 09/37: Fix code generation in Beam coder wrapper
This is an automated email from the ASF dual-hosted git repository.
echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git
commit d5645ff60aa99608a9ee3b8a5be6c58f9ac3903b
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Mon Sep 2 15:45:24 2019 +0200
Fix code generation in Beam coder wrapper
---
.../translation/helpers/EncoderHelpers.java | 93 ++++++++++++----------
.../structuredstreaming/utils/EncodersTest.java | 4 +-
2 files changed, 55 insertions(+), 42 deletions(-)
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java
index 0765c78..cc862cd 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java
@@ -42,15 +42,13 @@ import org.apache.spark.sql.catalyst.expressions.codegen.Block;
import org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator;
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext;
import org.apache.spark.sql.catalyst.expressions.codegen.ExprCode;
-import org.apache.spark.sql.catalyst.expressions.codegen.ExprValue;
-import org.apache.spark.sql.catalyst.expressions.codegen.SimpleExprValue;
import org.apache.spark.sql.catalyst.expressions.codegen.VariableValue;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.ObjectType;
+import scala.Function1;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.JavaConversions;
-import scala.collection.Seq;
import scala.reflect.ClassTag;
import scala.reflect.ClassTag$;
@@ -143,29 +141,33 @@ public class EncoderHelpers {
@Override public ExprCode doGenCode(CodegenContext ctx, ExprCode ev) {
// Code to serialize.
ExprCode input = child.genCode(ctx);
- String javaType = CodeGenerator.javaType(dataType());
- String outputStream = "ByteArrayOutputStream baos = new ByteArrayOutputStream();";
-
- String serialize = outputStream + "$beamCoder.encode(${input.value}, baos); baos.toByteArray();";
-
- String outside = "final $javaType output = ${input.isNull} ? ${CodeGenerator.defaultValue(dataType)} : $serialize;";
- List<String> instructions = new ArrayList<>();
- instructions.add(outside);
- Seq<String> parts = JavaConversions.collectionAsScalaIterable(instructions).toSeq();
+ /*
+ CODE GENERATED
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ final bytes[] output;
+ if ({input.isNull})
+ output = null;
+ else
+ output = $beamCoder.encode(${input.value}, baos); baos.toByteArray();
+ */
+ List<String> parts = new ArrayList<>();
+ parts.add("ByteArrayOutputStream baos = new ByteArrayOutputStream(); final bytes[] output; if (");
+ parts.add(") output = null; else output =");
+ parts.add(".encode(");
+ parts.add(", baos); baos.toByteArray();");
+
+ StringContext sc = new StringContext(JavaConversions.collectionAsScalaIterable(parts).toSeq());
- StringContext stringContext = new StringContext(parts);
- Block.BlockHelper blockHelper = new Block.BlockHelper(stringContext);
List<Object> args = new ArrayList<>();
- args.add(new VariableValue("beamCoder", Coder.class));
- args.add(new SimpleExprValue("input.value", ExprValue.class));
- args.add(new VariableValue("javaType", String.class));
- args.add(new SimpleExprValue("input.isNull", Boolean.class));
- args.add(new SimpleExprValue("CodeGenerator.defaultValue(dataType)", String.class));
- args.add(new VariableValue("serialize", String.class));
- Block code = blockHelper.code(JavaConversions.collectionAsScalaIterable(args).toSeq());
-
- return ev.copy(input.code().$plus(code), input.isNull(), new VariableValue("output", Array.class));
+ args.add(input.isNull());
+ args.add(beamCoder);
+ args.add(input.value());
+ Block code = (new Block.BlockHelper(sc))
+ .code(JavaConversions.collectionAsScalaIterable(args).toSeq());
+
+ return ev.copy(input.code().$plus(code), input.isNull(),
+ new VariableValue("output", Array.class));
}
@Override public DataType dataType() {
@@ -252,27 +254,38 @@ public class EncoderHelpers {
ExprCode input = child.genCode(ctx);
String javaType = CodeGenerator.javaType(dataType());
- String inputStream = "ByteArrayInputStream bais = new ByteArrayInputStream(${input.value});";
- String deserialize = inputStream + "($javaType) $beamCoder.decode(bais);";
+/*
+ CODE GENERATED:
+ final $javaType output =
+ ${input.isNull} ?
+ ${CodeGenerator.defaultValue(dataType)} :
+ ($javaType) $beamCoder.decode(new ByteArrayInputStream(${input.value}));
+*/
- String outside = "final $javaType output = ${input.isNull} ? ${CodeGenerator.defaultValue(dataType)} : $deserialize;";
+ List<String> parts = new ArrayList<>();
+ parts.add("final ");
+ parts.add(" output =");
+ parts.add("?");
+ parts.add(":");
+ parts.add("(");
+ parts.add(") ");
+ parts.add(".decode(new ByteArrayInputStream(");
+ parts.add("));");
- List<String> instructions = new ArrayList<>();
- instructions.add(outside);
- Seq<String> parts = JavaConversions.collectionAsScalaIterable(instructions).toSeq();
+ StringContext sc = new StringContext(JavaConversions.collectionAsScalaIterable(parts).toSeq());
- StringContext stringContext = new StringContext(parts);
- Block.BlockHelper blockHelper = new Block.BlockHelper(stringContext);
List<Object> args = new ArrayList<>();
- args.add(new SimpleExprValue("input.value", ExprValue.class));
- args.add(new VariableValue("javaType", String.class));
- args.add(new VariableValue("beamCoder", Coder.class));
- args.add(new SimpleExprValue("input.isNull", Boolean.class));
- args.add(new SimpleExprValue("CodeGenerator.defaultValue(dataType)", String.class));
- args.add(new VariableValue("deserialize", String.class));
- Block code = blockHelper.code(JavaConversions.collectionAsScalaIterable(args).toSeq());
-
- return ev.copy(input.code().$plus(code), input.isNull(), new VariableValue("output", classTag.runtimeClass()));
+ args.add(javaType);
+ args.add(input.isNull());
+ args.add(CodeGenerator.defaultValue(dataType(), false));
+ args.add(javaType);
+ args.add(beamCoder);
+ args.add(input.value());
+ Block code = (new Block.BlockHelper(sc))
+ .code(JavaConversions.collectionAsScalaIterable(args).toSeq());
+
+ return ev.copy(input.code().$plus(code), input.isNull(),
+ new VariableValue("output", classTag.runtimeClass()));
}
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/utils/EncodersTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/utils/EncodersTest.java
index 490e3dc..7078b0c 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/utils/EncodersTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/utils/EncodersTest.java
@@ -23,7 +23,7 @@ public class EncodersTest {
data.add(1);
data.add(2);
data.add(3);
-// sparkSession.createDataset(data, EncoderHelpers.fromBeamCoder(VarIntCoder.of()));
- sparkSession.createDataset(data, EncoderHelpers.genericEncoder());
+ sparkSession.createDataset(data, EncoderHelpers.fromBeamCoder(VarIntCoder.of()));
+// sparkSession.createDataset(data, EncoderHelpers.genericEncoder());
}
}