You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ec...@apache.org on 2019/09/05 14:48:04 UTC

[beam] 11/24: Fix call to scala Function1 in coder lazy init

This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 2c94eef2f1d2677bf64851a8c22520c44557170f
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Wed Sep 4 14:54:59 2019 +0200

    Fix call to scala Function1 in coder lazy init
---
 runners/spark/build.gradle                         |  1 +
 .../translation/helpers/EncoderHelpers.java        | 28 +++++++++++-----------
 2 files changed, 15 insertions(+), 14 deletions(-)

diff --git a/runners/spark/build.gradle b/runners/spark/build.gradle
index 73a710b..a948ef1 100644
--- a/runners/spark/build.gradle
+++ b/runners/spark/build.gradle
@@ -77,6 +77,7 @@ dependencies {
   provided "com.esotericsoftware.kryo:kryo:2.21"
   runtimeOnly library.java.jackson_module_scala
   runtimeOnly "org.scala-lang:scala-library:2.11.8"
+  compile "org.scala-lang.modules:scala-java8-compat_2.11:0.9.0"
   testCompile project(":sdks:java:io:kafka")
   testCompile project(path: ":sdks:java:core", configuration: "shadowTest")
   // SparkStateInternalsTest extends abstract StateInternalsTest
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java
index f7706cc..694bc24 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java
@@ -18,9 +18,9 @@
 package org.apache.beam.runners.spark.structuredstreaming.translation.helpers;
 
 import static org.apache.spark.sql.types.DataTypes.BinaryType;
+import static scala.compat.java8.JFunction.func;
 
 import java.io.ByteArrayInputStream;
-import java.io.IOException;
 import java.lang.reflect.Array;
 import java.util.ArrayList;
 import java.util.List;
@@ -370,23 +370,23 @@ case class DecodeUsingSerializer[T](child: Expression, tag: ClassTag[T], kryo: B
 
   private static <T> String lazyInitBeamCoder(CodegenContext ctx, Class<Coder<T>> coderClass) {
     String beamCoderInstance = "beamCoder";
-    ctx.addImmutableStateIfNotExists(coderClass.getName(), beamCoderInstance, v -> {
+    ctx.addImmutableStateIfNotExists(coderClass.getName(), beamCoderInstance, func(v1 -> {
       /*
     CODE GENERATED
     v = (coderClass) coderClass.newInstance();
      */
-      List<String> parts = new ArrayList<>();
-      parts.add("");
-      parts.add(" = (");
-      parts.add(") ");
-      parts.add(".newInstance();");
-      StringContext sc = new StringContext(JavaConversions.collectionAsScalaIterable(parts).toSeq());
-      List<Object> args = new ArrayList<>();
-      args.add(v);
-      args.add(coderClass.getName());
-      args.add(coderClass.getName());
-      return sc.s(JavaConversions.collectionAsScalaIterable(args).toSeq());
-    });
+        List<String> parts = new ArrayList<>();
+        parts.add("");
+        parts.add(" = (");
+        parts.add(") ");
+        parts.add(".newInstance();");
+        StringContext sc = new StringContext(JavaConversions.collectionAsScalaIterable(parts).toSeq());
+        List<Object> args = new ArrayList<>();
+        args.add(v1);
+        args.add(coderClass.getName());
+        args.add(coderClass.getName());
+        return sc.s(JavaConversions.collectionAsScalaIterable(args).toSeq());
+      }));
     return beamCoderInstance;
   }