You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ec...@apache.org on 2019/09/05 14:48:04 UTC
[beam] 11/24: Fix call to scala Function1 in coder lazy init
This is an automated email from the ASF dual-hosted git repository.
echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git
commit 2c94eef2f1d2677bf64851a8c22520c44557170f
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Wed Sep 4 14:54:59 2019 +0200
Fix call to scala Function1 in coder lazy init
---
runners/spark/build.gradle | 1 +
.../translation/helpers/EncoderHelpers.java | 28 +++++++++++-----------
2 files changed, 15 insertions(+), 14 deletions(-)
diff --git a/runners/spark/build.gradle b/runners/spark/build.gradle
index 73a710b..a948ef1 100644
--- a/runners/spark/build.gradle
+++ b/runners/spark/build.gradle
@@ -77,6 +77,7 @@ dependencies {
provided "com.esotericsoftware.kryo:kryo:2.21"
runtimeOnly library.java.jackson_module_scala
runtimeOnly "org.scala-lang:scala-library:2.11.8"
+ compile "org.scala-lang.modules:scala-java8-compat_2.11:0.9.0"
testCompile project(":sdks:java:io:kafka")
testCompile project(path: ":sdks:java:core", configuration: "shadowTest")
// SparkStateInternalsTest extends abstract StateInternalsTest
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java
index f7706cc..694bc24 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java
@@ -18,9 +18,9 @@
package org.apache.beam.runners.spark.structuredstreaming.translation.helpers;
import static org.apache.spark.sql.types.DataTypes.BinaryType;
+import static scala.compat.java8.JFunction.func;
import java.io.ByteArrayInputStream;
-import java.io.IOException;
import java.lang.reflect.Array;
import java.util.ArrayList;
import java.util.List;
@@ -370,23 +370,23 @@ case class DecodeUsingSerializer[T](child: Expression, tag: ClassTag[T], kryo: B
private static <T> String lazyInitBeamCoder(CodegenContext ctx, Class<Coder<T>> coderClass) {
String beamCoderInstance = "beamCoder";
- ctx.addImmutableStateIfNotExists(coderClass.getName(), beamCoderInstance, v -> {
+ ctx.addImmutableStateIfNotExists(coderClass.getName(), beamCoderInstance, func(v1 -> {
/*
CODE GENERATED
v = (coderClass) coderClass.newInstance();
*/
- List<String> parts = new ArrayList<>();
- parts.add("");
- parts.add(" = (");
- parts.add(") ");
- parts.add(".newInstance();");
- StringContext sc = new StringContext(JavaConversions.collectionAsScalaIterable(parts).toSeq());
- List<Object> args = new ArrayList<>();
- args.add(v);
- args.add(coderClass.getName());
- args.add(coderClass.getName());
- return sc.s(JavaConversions.collectionAsScalaIterable(args).toSeq());
- });
+ List<String> parts = new ArrayList<>();
+ parts.add("");
+ parts.add(" = (");
+ parts.add(") ");
+ parts.add(".newInstance();");
+ StringContext sc = new StringContext(JavaConversions.collectionAsScalaIterable(parts).toSeq());
+ List<Object> args = new ArrayList<>();
+ args.add(v1);
+ args.add(coderClass.getName());
+ args.add(coderClass.getName());
+ return sc.s(JavaConversions.collectionAsScalaIterable(args).toSeq());
+ }));
return beamCoderInstance;
}