Posted to issues@beam.apache.org by "Mazlum Tosun (Jira)" <ji...@apache.org> on 2021/05/02 00:24:00 UTC
[jira] [Updated] (BEAM-12265) FlatMapElement Kotlin Beam non Serializable lambda
[ https://issues.apache.org/jira/browse/BEAM-12265?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Mazlum Tosun updated BEAM-12265:
--------------------------------
Description:
I have an existing Apache Beam project built with Java 8, Apache Beam 2.27.0, Maven, and Dagger 2.
I migrated this project to Kotlin (Kotlin 1.5.0, targeting JDK 8).
I chose Kotlin 1.5.0 because 1.4.30 had an issue with Beam and the Maven plugin (Could not read class: VirtualFile: [https://stackoverflow.com/questions/66170900/kotlin-1-4-30-apache-beam-compilation-error])
Everything works except the built-in MapElements and FlatMapElements transforms when they are used with a TypeDescriptor and a lambda expression.
Here is part of my pom.xml:
{{{code:xml}}}
<properties>
<beam.version>2.27.0</beam.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<kotlin.code.style>official</kotlin.code.style>
<kotlin.compiler.jvmTarget>1.8</kotlin.compiler.jvmTarget>
<kotlin.compiler.incremental>true</kotlin.compiler.incremental>
<kotlin.version>1.5.0</kotlin.version>
<serialization.version>1.2.0</serialization.version>
<java.version>1.8</java.version>
<dagger.version>2.35.1</dagger.version>
<maven-compiler-plugin.version>3.7.0</maven-compiler-plugin.version>
<maven-exec-plugin.version>1.6.0</maven-exec-plugin.version>
<maven-surefire-plugin.version>3.0.0-M5</maven-surefire-plugin.version>
</properties>
<dependencies>
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-stdlib-jdk8</artifactId>
<version>${kotlin.version}</version>
</dependency>
<dependency>
<groupId>org.jetbrains.kotlinx</groupId>
<artifactId>kotlinx-serialization-json</artifactId>
<version>${serialization.version}</version>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
<version>${beam.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-core</artifactId>
<version>${beam.version}</version>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
<version>${beam.version}</version>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-redis</artifactId>
<version>${beam.version}</version>
</dependency>
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-test-junit</artifactId>
<version>${kotlin.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-maven-plugin</artifactId>
<version>${kotlin.version}</version>
<executions>
<execution>
<id>kapt</id>
<goals>
<goal>kapt</goal>
</goals>
<configuration>
<sourceDirs>
<sourceDir>src/main/kotlin</sourceDir>
</sourceDirs>
<annotationProcessorPaths>
<annotationProcessorPath>
<groupId>com.google.dagger</groupId>
<artifactId>dagger-compiler</artifactId>
<version>${dagger.version}</version>
</annotationProcessorPath>
</annotationProcessorPaths>
</configuration>
</execution>
<execution>
<id>compile</id>
<phase>process-sources</phase>
<goals>
<goal>compile</goal>
</goals>
<configuration>
<sourceDirs>
<sourceDir>src/main/kotlin</sourceDir>
</sourceDirs>
</configuration>
</execution>
<execution>
<id>test-kapt</id>
<goals>
<goal>test-kapt</goal>
</goals>
<configuration>
<sourceDirs>
<sourceDir>src/test/kotlin</sourceDir>
</sourceDirs>
<annotationProcessorPaths>
<annotationProcessorPath>
<groupId>com.google.dagger</groupId>
<artifactId>dagger-compiler</artifactId>
<version>${dagger.version}</version>
</annotationProcessorPath>
</annotationProcessorPaths>
</configuration>
</execution>
<execution>
<id>test-compile</id>
<goals>
<goal>test-compile</goal>
</goals>
<configuration>
<sourceDirs>
<sourceDir>src/test/kotlin</sourceDir>
<sourceDir>target/generated-sources/kapt/test</sourceDir>
</sourceDirs>
</configuration>
</execution>
</executions>
<configuration>
<compilerPlugins>
<plugin>kotlinx-serialization</plugin>
</compilerPlugins>
</configuration>
<dependencies>
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-maven-serialization</artifactId>
<version>${kotlin.version}</version>
</dependency>
</dependencies>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>${maven-surefire-plugin.version}</version>
<dependencies>
<dependency>
<groupId>org.apache.maven.surefire</groupId>
<artifactId>surefire-junit47</artifactId>
<version>${maven-surefire-plugin.version}</version>
</dependency>
</dependencies>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>${maven-exec-plugin.version}</version>
<configuration>
<cleanupDaemonThreads>false</cleanupDaemonThreads>
</configuration>
</plugin>
</plugins>
</build>
{{{code}}}
The data object implements java.io.Serializable:
{{{code:kotlin}}}
import java.io.Serializable

data class MyObject(
    val field: String = ""
) : Serializable
{{{code}}}
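To check, independently of Beam, that MyObject itself survives Java serialization, a plain java.io round trip is enough. This is a minimal sketch: `roundTrip` is a hypothetical helper, and MyObject is repeated so the snippet stands alone.

```kotlin
import java.io.ByteArrayInputStream
import java.io.ByteArrayOutputStream
import java.io.ObjectInputStream
import java.io.ObjectOutputStream
import java.io.Serializable

data class MyObject(
    val field: String = ""
) : Serializable

// Hypothetical helper: writes a value with plain Java serialization and reads it back.
// If MyObject (or anything it references) were not Serializable, writeObject would
// throw java.io.NotSerializableException.
@Suppress("UNCHECKED_CAST")
fun <T : Serializable> roundTrip(value: T): T {
    val buffer = ByteArrayOutputStream()
    ObjectOutputStream(buffer).use { it.writeObject(value) }
    return ObjectInputStream(ByteArrayInputStream(buffer.toByteArray())).use { it.readObject() as T }
}
```

If this round trip succeeds, the data class is not what the "Non-serializable lambda" error is complaining about.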
I want to apply FlatMapElements with a TypeDescriptor and a lambda (a SerializableFunction behind the scenes):
{{{code:kotlin}}}
class MyTransform(private val redisConnectionConf: RedisConnectionConfiguration) :
    PTransform<PBegin, PCollection<MyObject>>() {

    override fun expand(input: PBegin): PCollection<MyObject> {
        return input
            .apply(RedisIO.read().withConnectionConfiguration(redisConnectionConf).withKeyPattern("my-pattern*"))
            .apply(
                FlatMapElements.into(of(MyObject::class.java))
                    .via(SerializableFunction<KV<String, String>, List<MyObject>> { toMyObjects(it) })
            )
    }

    fun toMyObjects(entry: KV<String, String>): List<MyObject> {
        val key = entry.key
        val value = entry.value
        val ref = object : TypeReference<List<MyObject>>() {}
        // OBJECT_MAPPER is a Jackson ObjectMapper defined elsewhere in the project
        return OBJECT_MAPPER.readValue(value, ref)
    }
}
{{{code}}}
I deliberately simplified the code and moved part of the logic into the "toMyObjects" method to give as much context as possible.
OBJECT_MAPPER is a Jackson ObjectMapper.
With Java 8 and Beam 2.27.0, this code works perfectly.
With Kotlin, the same code fails with the following error:
{{{code:text}}}
at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray (SerializableUtils.java:59)
at org.apache.beam.runners.core.construction.ParDoTranslation.translateDoFn (ParDoTranslation.java:692)
at org.apache.beam.runners.dataflow.PrimitiveParDoSingleFactory$PayloadTranslator$1.translateDoFn (PrimitiveParDoSingleFactory.java:218)
at org.apache.beam.runners.core.construction.ParDoTranslation.payloadForParDoLike (ParDoTranslation.java:814)
at org.apache.beam.runners.dataflow.PrimitiveParDoSingleFactory$PayloadTranslator.payloadForParDoSingle (PrimitiveParDoSingleFactory.java:214)
at org.apache.beam.runners.dataflow.PrimitiveParDoSingleFactory$PayloadTranslator.translate (PrimitiveParDoSingleFactory.java:163)
at org.apache.beam.runners.core.construction.PTransformTranslation$KnownTransformPayloadTranslator.translate (PTransformTranslation.java:429)
at org.apache.beam.runners.core.construction.PTransformTranslation.toProto (PTransformTranslation.java:239)
at org.apache.beam.runners.core.construction.SdkComponents.registerPTransform (SdkComponents.java:175)
at org.apache.beam.runners.core.construction.PipelineTranslation$1.visitPrimitiveTransform (PipelineTranslation.java:87)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:587)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$500 (TransformHierarchy.java:239)
at org.apache.beam.sdk.runners.TransformHierarchy.visit (TransformHierarchy.java:213)
at org.apache.beam.sdk.Pipeline.traverseTopologically (Pipeline.java:468)
at org.apache.beam.runners.core.construction.PipelineTranslation.toProto (PipelineTranslation.java:59)
at org.apache.beam.runners.dataflow.DataflowRunner.run (DataflowRunner.java:933)
at org.apache.beam.runners.dataflow.DataflowRunner.run (DataflowRunner.java:196)
at org.apache.beam.sdk.Pipeline.run (Pipeline.java:322)
at org.apache.beam.sdk.Pipeline.run (Pipeline.java:308)
at myPackage.MyApp.main (MyApp.kt:44)
at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke (Method.java:498)
at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:282)
at java.lang.Thread.run (Thread.java:748)
Caused by: java.io.NotSerializableException: Non-serializable lambda
at mypackage.MyTransform$$Lambda$783/1784079343.writeObject (Unknown Source)
[ERROR] Failed to execute goal org.codehaus.mojo:exec-maven-plugin:1.6.0:java (default-cli) on project my-project:
An exception occured while executing the Java class. unable to serialize
DoFnWithExecutionInformation{doFn=org.apache.beam.sdk.transforms.FlatMapElements$2@23402e70,
mainOutputTag=Tag<org.apache.beam.sdk.values.PCollection.<init>:402#6929f09b03d242ca>, sideInputMapping={}, schemaInformation=DoFnSchemaInformation{elementConverters=[]}}: Non-serializable lambda -> [Help 1]
{{{code}}}
Beam's SerializableUtils.serializeToByteArray method throws this error: java.io.NotSerializableException: Non-serializable lambda
MyObject is Serializable, and the lambda is wrapped in a Beam SerializableFunction (an interface that extends Serializable).
In this situation Beam normally falls back to a SerializableCoder for the Serializable type.
I don't understand why Beam considers the lambda non-serializable.
The same code does not behave this way in Java.
For the record, if I replace FlatMapElements/TypeDescriptor/lambda with ParDo.of(DoFn), it works fine, but in some cases I want to use the built-in MapElements and FlatMapElements with lambda expressions for concision and readability.
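The class name MyTransform$$Lambda$783 in the trace is the naming pattern the JVM uses for classes generated at runtime by LambdaMetafactory, and "Non-serializable lambda" is the exception such classes throw from writeObject when they were not generated as serializable, even if their interface extends Serializable. One plausible explanation, not confirmed in this report, is that Kotlin 1.5.0 compiles SAM conversions of Java interfaces with invokedynamic; if so, the compiler option -Xsam-conversions=class, which restores class-based SAM adapters, may avoid the error. The sketch below approximates Beam's check and shows why a DoFn subclass or an anonymous `object :` expression is unaffected: both compile to ordinary classes, so only their captured state must be serializable. `serializeToByteArray` here is a simplified stand-in (Beam's version also compresses the bytes), and `SerFn` and `Helper` are hypothetical.

```kotlin
import java.io.ByteArrayOutputStream
import java.io.IOException
import java.io.ObjectOutputStream
import java.io.Serializable

// Simplified stand-in for Beam's SerializableUtils.serializeToByteArray (no compression).
fun serializeToByteArray(value: Serializable): ByteArray =
    try {
        val buffer = ByteArrayOutputStream()
        ObjectOutputStream(buffer).use { it.writeObject(value) }
        buffer.toByteArray()
    } catch (e: IOException) {
        // NotSerializableException is an IOException; it is wrapped like Beam's "unable to serialize ..."
        throw IllegalArgumentException("unable to serialize $value", e)
    }

// Hypothetical stand-in for Beam's SerializableFunction.
fun interface SerFn<I, O> : Serializable {
    fun apply(input: I): O
}

// An anonymous object compiles to an ordinary class implementing SerFn,
// so it serializes as long as everything it captures is serializable.
fun incrementer(): SerFn<Int, Int> = object : SerFn<Int, Int> {
    override fun apply(input: Int): Int = input + 1
}

// Helper is NOT Serializable. The anonymous object below calls double(),
// so it captures this@Helper, and serializing it fails.
class Helper {
    fun double(x: Int): Int = x * 2

    fun doubler(): SerFn<Int, Int> = object : SerFn<Int, Int> {
        override fun apply(input: Int): Int = double(input)
    }
}
```

Applied to the transform above, the equivalent would be passing `object : SerializableFunction<KV<String, String>, List<MyObject>> { override fun apply(input: KV<String, String>): List<MyObject> = toMyObjects(input) }` to `.via(...)`; that object captures MyTransform, which serializes only if its fields (here the RedisConnectionConfiguration) do.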
Thanks in advance for your help.
> FlatMapElement Kotlin Beam non Serializable lambda
> --------------------------------------------------
>
> Key: BEAM-12265
> URL: https://issues.apache.org/jira/browse/BEAM-12265
> Project: Beam
> Issue Type: Bug
> Components: beam-community
> Reporter: Mazlum Tosun
> Priority: P0
> Fix For: 2.27.0
--
This message was sent by Atlassian Jira
(v8.3.4#803005)