Posted to issues@flink.apache.org by "Mans Singh (Jira)" <ji...@apache.org> on 2022/04/26 13:50:00 UTC

[jira] [Commented] (FLINK-22584) Use protobuf-shaded in StateFun core.

    [ https://issues.apache.org/jira/browse/FLINK-22584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17528183#comment-17528183 ] 

Mans Singh commented on FLINK-22584:
------------------------------------

[~igal] -

I am getting the error mentioned [here|https://lists.apache.org/thread/tcbx58oqn7kw32kl2q4cskoojwn7yrfx] (java.lang.NoSuchMethodError: org.apache.flink.statefun.sdk.reqreply.generated.TypedValue$Builder.setValue(Lcom/google/protobuf/ByteString;)Lorg/apache/flink/statefun/sdk/reqreply/generated/TypedValue$Builder) and need to use the StateFun Java SDK (v3.2.0).

Can you please advise how to use the workaround code segment using Byte Buddy that you mentioned above?

Thanks

> Use protobuf-shaded in StateFun core.
> -------------------------------------
>
>                 Key: FLINK-22584
>                 URL: https://issues.apache.org/jira/browse/FLINK-22584
>             Project: Flink
>          Issue Type: Improvement
>          Components: Stateful Functions
>            Reporter: Igal Shilman
>            Priority: Not a Priority
>              Labels: auto-deprioritized-major, auto-deprioritized-minor, developer-experience
>
> We have a *statefun-protobuf-shaded* module that was introduced for the remote Java SDK.
> We can use it to shade Protobuf internally, to reduce the dependency surface.
> The major hurdle we need to overcome is that, in embedded functions, we have to be able to accept instances of Protobuf-generated messages supplied by the user.
> For example:
> {code:java}
> UserProfile userProfile = UserProfile.newBuilder().build();
> context.send(..., userProfile) {code}
> If we simply use the shaded Protobuf version, we will immediately get a class cast exception, because the user's message classes extend the unshaded Protobuf types.
> One way to overcome this is to use reflection to find the well-known methods on the generated classes and call toByteArray() / parseFrom() reflectively.
> This, however, will cause a significant slowdown, even when using MethodHandles.
> A small experiment that I've previously done with ByteBuddy mitigates this by generating
> accessors in pre-flight:
> {code:java}
> package org.apache.flink.statefun.flink.common.protobuf.serde;
> import static net.bytebuddy.matcher.ElementMatchers.named;
>
> import java.io.InputStream;
> import java.io.OutputStream;
> import java.lang.reflect.InvocationTargetException;
> import java.lang.reflect.Method;
> import net.bytebuddy.ByteBuddy;
> import net.bytebuddy.dynamic.DynamicType;
> import net.bytebuddy.implementation.FixedValue;
> import net.bytebuddy.implementation.MethodCall;
> import net.bytebuddy.implementation.bytecode.assign.Assigner;
>
> final class ReflectiveProtobufSerde {
>
>   @SuppressWarnings({"unchecked", "rawtypes"})
>   static <M> ProtobufSerde<M> ofProtobufGeneratedType(Class<M> type) {
>     try {
>       DynamicType.Unloaded<ProtobufSerde> unloaded = configureByteBuddy(type);
>       Class<? extends ProtobufSerde> writer = unloaded.load(type.getClassLoader()).getLoaded();
>       return (ProtobufSerde<M>) writer.getDeclaredConstructor().newInstance();
>     } catch (Throwable e) {
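>       // keep the original failure as the cause instead of discarding it
>       throw new IllegalArgumentException("Unable to generate a ProtobufSerde for " + type.getName(), e);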
>     }
>   }
>
>   @SuppressWarnings("rawtypes")
>   private static DynamicType.Unloaded<ProtobufSerde> configureByteBuddy(Class<?> type)
>       throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
>     Method writeToMethod = type.getMethod("writeTo", OutputStream.class);
>     Method parseFromMethod = type.getMethod("parseFrom", InputStream.class);
>     Method getSerializedSizeMethod = type.getMethod("getSerializedSize");
>
>     // get the message full name
>     Method getDescriptorMethod = type.getMethod("getDescriptor");
>     Object descriptor = getDescriptorMethod.invoke(null);
>     Method getFullNameMethod = descriptor.getClass().getMethod("getFullName");
>     String messageFullName = (String) getFullNameMethod.invoke(descriptor);
>
>     return new ByteBuddy()
>         .subclass(ProtobufSerde.class)
>         .typeVariable("M", type)
>         .method(named("writeTo"))
>         .intercept(
>             MethodCall.invoke(writeToMethod)
>                 .onArgument(0)
>                 .withArgument(1)
>                 .withAssigner(Assigner.DEFAULT, Assigner.Typing.DYNAMIC))
>         .method(named("parseFrom"))
>         .intercept(MethodCall.invoke(parseFromMethod).withArgument(0))
>         .method(named("getSerializedSize"))
>         .intercept(
>             MethodCall.invoke(getSerializedSizeMethod)
>                 .onArgument(0)
>                 .withAssigner(Assigner.DEFAULT, Assigner.Typing.DYNAMIC))
>         .method(named("getMessageFullName"))
>         .intercept(FixedValue.value(messageFullName))
>         .make();
>   }
> }
>  {code}
>  
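
For comparison, the plain reflective route that the description mentions (looking up the well-known methods and calling them through MethodHandles) might be sketched roughly as below. This is not the ByteBuddy approach and not code from StateFun: SketchSerde is a hypothetical stand-in whose shape is only inferred from the methods the quoted ByteBuddy code intercepts, and FakeMessage is a hypothetical stand-in for a generated message so that the example runs without a Protobuf dependency. It uses only the JDK (Java 9+ for readAllBytes).

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.nio.charset.StandardCharsets;

/** Hypothetical interface; its shape is inferred from the quoted code, not taken from StateFun. */
interface SketchSerde<M> {
  void writeTo(M message, OutputStream out) throws IOException;

  M parseFrom(InputStream in) throws IOException;

  int getSerializedSize(M message);
}

/** Hypothetical stand-in for a generated message; it only mimics the three method names. */
final class FakeMessage {
  private final byte[] payload;

  FakeMessage(String text) {
    this.payload = text.getBytes(StandardCharsets.UTF_8);
  }

  public void writeTo(OutputStream out) throws IOException {
    out.write(payload);
  }

  public int getSerializedSize() {
    return payload.length;
  }

  public static FakeMessage parseFrom(InputStream in) throws IOException {
    return new FakeMessage(new String(in.readAllBytes(), StandardCharsets.UTF_8));
  }

  String text() {
    return new String(payload, StandardCharsets.UTF_8);
  }
}

/** Resolves the three methods once per type, then dispatches through MethodHandles. */
final class MethodHandleSerde<M> implements SketchSerde<M> {
  private final Class<M> type;
  private final MethodHandle writeToHandle;
  private final MethodHandle parseFromHandle;
  private final MethodHandle sizeHandle;

  private MethodHandleSerde(Class<M> type) throws ReflectiveOperationException {
    MethodHandles.Lookup lookup = MethodHandles.lookup();
    this.type = type;
    this.writeToHandle =
        lookup.findVirtual(type, "writeTo", MethodType.methodType(void.class, OutputStream.class));
    this.parseFromHandle =
        lookup.findStatic(type, "parseFrom", MethodType.methodType(type, InputStream.class));
    this.sizeHandle =
        lookup.findVirtual(type, "getSerializedSize", MethodType.methodType(int.class));
  }

  static <M> SketchSerde<M> of(Class<M> type) {
    try {
      return new MethodHandleSerde<>(type);
    } catch (ReflectiveOperationException e) {
      throw new IllegalArgumentException("Not a protobuf-like type: " + type.getName(), e);
    }
  }

  @Override
  public void writeTo(M message, OutputStream out) throws IOException {
    try {
      writeToHandle.invoke(message, out);
    } catch (IOException | RuntimeException | Error e) {
      throw e;
    } catch (Throwable t) {
      throw new IllegalStateException(t);
    }
  }

  @Override
  public M parseFrom(InputStream in) throws IOException {
    try {
      return type.cast(parseFromHandle.invoke(in));
    } catch (IOException | RuntimeException | Error e) {
      throw e;
    } catch (Throwable t) {
      throw new IllegalStateException(t);
    }
  }

  @Override
  public int getSerializedSize(M message) {
    try {
      return (int) sizeHandle.invoke(message);
    } catch (RuntimeException | Error e) {
      throw e;
    } catch (Throwable t) {
      throw new IllegalStateException(t);
    }
  }
}
```

Calling MethodHandleSerde.of(FakeMessage.class) yields a serde that writes a message to a stream and reads it back. The handles are resolved once per type, but each call still goes through a generic invoke. That per-call overhead is presumably what the generated accessors in the quoted code are meant to avoid.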



--
This message was sent by Atlassian Jira
(v8.20.7#820007)