Posted to dev@flink.apache.org by Svend Vanderveken <sv...@kelesia.com> on 2021/02/14 10:41:54 UTC

Failed to register Protobuf Kryo serialization

Hi all,

I'm failing to set up an example of wire serialization with Protobuf. Could
you help me figure out what I'm doing wrong?

I'm using a simple protobuf schema:

```

syntax = "proto3";

import "google/protobuf/wrappers.proto";
option java_multiple_files = true;

message DemoUserEvent {
  Metadata metadata = 1;
  oneof payload {
    Created created = 10;
    Updated updated = 11;
  }

  message Created {...}

  message Updated {...}

  ...

}

```


from which I'm generating Java code with this Gradle plugin:


```

plugins {
    id "com.google.protobuf" version "0.8.15"
}

```


I'm generating DemoUserEvent instances with a Java Iterator that looks like this:


```

public class UserEventGenerator implements Iterator<DemoUserEvent>, Serializable {
    transient public final static Faker faker = new Faker();
    ...
    @Override public DemoUserEvent next() {
        return randomCreatedEvent();

     }

     ...

```
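
The generator above implements Serializable, presumably so that the iterator can be shipped along with the job. As a minimal sketch of what that implies, the block below round-trips a serializable iterator through standard Java serialization. `CountingGenerator`, `PREFIX` and `roundTrip` are hypothetical names used only for illustration; they are not part of the project or of Flink. It also shows that static fields are never part of the serialized state, so marking a static field `transient` has no additional effect.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Iterator;

// Hypothetical stand-in for UserEventGenerator: an endless, serializable iterator.
class CountingGenerator implements Iterator<String>, Serializable {
    private static final long serialVersionUID = 1L;

    // Static state belongs to the class, not to an instance, so Java
    // serialization skips it whether or not it is marked transient.
    static final String PREFIX = "event-";

    // Instance state is written to the stream and restored on the other side.
    private int counter = 0;

    @Override public boolean hasNext() { return true; }

    @Override public String next() { return PREFIX + counter++; }
}

class SerializableIteratorDemo {
    // Serializes a value to a byte array and reads it back as a fresh copy.
    static <T extends Serializable> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                T copy = (T) in.readObject();
                return copy;
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("round trip failed", e);
        }
    }

    public static void main(String[] args) {
        CountingGenerator original = new CountingGenerator();
        original.next();                                   // consumes "event-0"
        CountingGenerator copy = roundTrip(original);      // copy resumes where the original stopped
        System.out.println(copy.next());
    }
}
```

The copy continues from the original's counter because only instance fields travel through the stream.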


I read these two pieces of documentation:
* https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/types_serialization.html
* https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/custom_serializers.html

And tried the demo app below:

```

import com.twitter.chill.protobuf.ProtobufSerializer;

...

public static void main(String[] args) {
    final StreamExecutionEnvironment flinkEnv = StreamExecutionEnvironment.getExecutionEnvironment();
    flinkEnv.getConfig().registerTypeWithKryoSerializer(DemoUserEvent.class, ProtobufSerializer.class);
    flinkEnv.fromCollection(new UserEventGenerator(), DemoUserEvent.class).print();
}

```

But the serialization mechanism still fails to handle my protobuf class:

11:22:45,822 INFO  org.apache.flink.api.java.typeutils.TypeExtractor [] - class live.schema.event.user.v1.DemoUserEvent does not contain a getter for field payloadCase_
11:22:45,822 INFO  org.apache.flink.api.java.typeutils.TypeExtractor [] - class live.schema.event.user.v1.DemoUserEvent does not contain a setter for field payloadCase_
11:22:45,822 INFO  org.apache.flink.api.java.typeutils.TypeExtractor [] - Class class live.schema.event.user.v1.DemoUserEvent cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
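
To illustrate what these messages are complaining about, here is a minimal sketch of a getter/setter check written with plain Java reflection. It is not Flink's TypeExtractor; the matching rules (names compared ignoring case and underscores, accessor types required to equal the field type) are simplifying assumptions loosely based on the POJO requirement in the "Data Types & Serialization" page that non-public fields be reachable through a getter and a setter. `FakeOneofMessage`, `PlainPojo` and `PojoFieldCheck` are hypothetical names, and `FakeOneofMessage` only imitates the shape of a generated oneof field.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

// Hypothetical imitation of generated code: the oneof discriminator is a private
// int, its accessor returns an enum instead of int, and there is no setter.
class FakeOneofMessage {
    enum PayloadCase { CREATED, UPDATED, PAYLOAD_NOT_SET }

    private int payloadCase_ = 0;

    public PayloadCase getPayloadCase() {
        return payloadCase_ == 10 ? PayloadCase.CREATED
             : payloadCase_ == 11 ? PayloadCase.UPDATED
             : PayloadCase.PAYLOAD_NOT_SET;
    }
}

// A class that follows the usual getter/setter convention.
class PlainPojo {
    private String name;

    public PlainPojo() {}
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
}

class PojoFieldCheck {
    // Assumed simplification: compare names in lower case with underscores removed.
    static String normalize(String s) {
        return s.toLowerCase(Locale.ROOT).replace("_", "");
    }

    // Lists every non-public instance field that lacks a matching getter or setter.
    static List<String> problems(Class<?> type) {
        List<String> out = new ArrayList<>();
        for (Field f : type.getDeclaredFields()) {
            int mod = f.getModifiers();
            if (f.isSynthetic() || Modifier.isStatic(mod)
                    || Modifier.isTransient(mod) || Modifier.isPublic(mod)) {
                continue; // only private/protected/package instance fields need accessors
            }
            String key = normalize(f.getName());
            boolean getter = false;
            boolean setter = false;
            for (Method m : type.getMethods()) {
                String name = normalize(m.getName());
                if (name.equals("get" + key) && m.getParameterCount() == 0
                        && m.getReturnType().equals(f.getType())) {
                    getter = true;
                }
                if (name.equals("set" + key) && m.getParameterCount() == 1
                        && m.getParameterTypes()[0].equals(f.getType())) {
                    setter = true;
                }
            }
            if (!getter) out.add("no getter for field " + f.getName());
            if (!setter) out.add("no setter for field " + f.getName());
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(problems(FakeOneofMessage.class)); // two problems reported
        System.out.println(problems(PlainPojo.class));        // empty list
    }
}
```

Under these assumed rules, the imitation message fails on both counts for `payloadCase_`, while the conventional class passes.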

I've also tried this, without success:

```

flinkEnv.getConfig().addDefaultKryoSerializer(DemoUserEvent.class, ProtobufSerializer.class);

```


I'm using these versions:

```

ext {
    javaVersion = '11'
    flinkVersion = '1.12.1'
    scalaBinaryVersion = '2.12'
}

dependencies {
    compileOnly "org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}"
    implementation ("com.twitter:chill-protobuf:0.9.5") {
        exclude group: 'com.esotericsoftware.kryo', module: 'kryo'
    }
    implementation "com.google.protobuf:protobuf-java:3.14.0"
    implementation 'com.github.javafaker:javafaker:1.0.2'
}

```


Any idea what I should try next?

Thanks in advance!

Re: Failed to register Protobuf Kryo serialization

Posted by Svend Vanderveken <sv...@kelesia.com>.
Sorry, I realize I posted this to the wrong list. Please ignore it; I'll
repost it to the flink-user list.


-- 
Svend Vanderveken
Kelesia SPRL - BE 0839 049 010
blog: https://svend.kelesia.com <http://svend.kelesia.com/>
Twitter: @sv3ndk <https://twitter.com/sv3ndk>