Posted to issues@flink.apache.org by "Mans Singh (Jira)" <ji...@apache.org> on 2022/04/26 13:50:00 UTC
[jira] [Commented] (FLINK-22584) Use protobuf-shaded in StateFun core.
[ https://issues.apache.org/jira/browse/FLINK-22584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17528183#comment-17528183 ]
Mans Singh commented on FLINK-22584:
------------------------------------
[~igal] -
I am getting the error mentioned [here|https://lists.apache.org/thread/tcbx58oqn7kw32kl2q4cskoojwn7yrfx] (java.lang.NoSuchMethodError: org.apache.flink.statefun.sdk.reqreply.generated.TypedValue$Builder.setValue(Lcom/google/protobuf/ByteString;)Lorg/apache/flink/statefun/sdk/reqreply/generated/TypedValue$Builder) and need to use the StateFun Java SDK (v3.2.0).
Can you please advise how to use the ByteBuddy workaround code that you mentioned above?
Thanks
> Use protobuf-shaded in StateFun core.
> -------------------------------------
>
> Key: FLINK-22584
> URL: https://issues.apache.org/jira/browse/FLINK-22584
> Project: Flink
> Issue Type: Improvement
> Components: Stateful Functions
> Reporter: Igal Shilman
> Priority: Not a Priority
> Labels: auto-deprioritized-major, auto-deprioritized-minor, developer-experience
>
> We have a *statefun-protobuf-shaded* module that was introduced for the remote Java SDK.
> We can use it to shade Protobuf internally and reduce the dependency surface.
> The major hurdle we need to overcome is that, in embedded functions, we have to be able to accept instances of Protobuf-generated messages supplied by the user.
> For example:
> {code:java}
> UserProfile userProfile = UserProfile.newBuilder().build();
> context.send(..., userProfile) {code}
> If we simply use the shaded Protobuf version, we will immediately get a ClassCastException, because the user's generated classes extend the unshaded Protobuf types.
> One way to overcome this is to use reflection to find the well-known methods on the generated classes and call toByteArray() / parseFrom() reflectively.
> This, however, will cause a significant slowdown, even when using MethodHandles.
> A small experiment that I've previously done with ByteBuddy mitigates this by generating
> accessors in pre-flight:
> {code:java}
> package org.apache.flink.statefun.flink.common.protobuf.serde;
>
> import static net.bytebuddy.matcher.ElementMatchers.named;
>
> import java.io.InputStream;
> import java.io.OutputStream;
> import java.lang.reflect.InvocationTargetException;
> import java.lang.reflect.Method;
> import net.bytebuddy.ByteBuddy;
> import net.bytebuddy.dynamic.DynamicType;
> import net.bytebuddy.implementation.FixedValue;
> import net.bytebuddy.implementation.MethodCall;
> import net.bytebuddy.implementation.bytecode.assign.Assigner;
>
> final class ReflectiveProtobufSerde {
>
>   @SuppressWarnings({"unchecked", "rawtypes"})
>   static <M> ProtobufSerde<M> ofProtobufGeneratedType(Class<M> type) {
>     try {
>       DynamicType.Unloaded<ProtobufSerde> unloaded = configureByteBuddy(type);
>       // Load the generated class next to the user's message class.
>       Class<? extends ProtobufSerde> writer = unloaded.load(type.getClassLoader()).getLoaded();
>       return (ProtobufSerde<M>) writer.getDeclaredConstructor().newInstance();
>     } catch (Throwable e) {
>       // Keep the cause so that failures during class generation stay diagnosable.
>       throw new IllegalArgumentException("Unable to generate a serde for " + type.getName(), e);
>     }
>   }
>
>   @SuppressWarnings("rawtypes")
>   private static DynamicType.Unloaded<ProtobufSerde> configureByteBuddy(Class<?> type)
>       throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
>     // Look up the well-known methods of a Protobuf-generated message once, up front.
>     Method writeToMethod = type.getMethod("writeTo", OutputStream.class);
>     Method parseFromMethod = type.getMethod("parseFrom", InputStream.class);
>     Method getSerializedSizeMethod = type.getMethod("getSerializedSize");
>
>     // get the message full name
>     Method getDescriptorMethod = type.getMethod("getDescriptor");
>     Object descriptor = getDescriptorMethod.invoke(null);
>     Method getFullNameMethod = descriptor.getClass().getMethod("getFullName");
>     String messageFullName = (String) getFullNameMethod.invoke(descriptor);
>
>     // Generate a ProtobufSerde subclass whose methods call the message's methods directly.
>     return new ByteBuddy()
>         .subclass(ProtobufSerde.class)
>         .typeVariable("M", type)
>         .method(named("writeTo"))
>         .intercept(
>             MethodCall.invoke(writeToMethod)
>                 .onArgument(0)
>                 .withArgument(1)
>                 .withAssigner(Assigner.DEFAULT, Assigner.Typing.DYNAMIC))
>         .method(named("parseFrom"))
>         .intercept(MethodCall.invoke(parseFromMethod).withArgument(0))
>         .method(named("getSerializedSize"))
>         .intercept(
>             MethodCall.invoke(getSerializedSizeMethod)
>                 .onArgument(0)
>                 .withAssigner(Assigner.DEFAULT, Assigner.Typing.DYNAMIC))
>         .method(named("getMessageFullName"))
>         .intercept(FixedValue.value(messageFullName))
>         .make();
>   }
> }
> {code}
>
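The ProtobufSerde type that the quoted snippet subclasses is not shown in the issue. As a rough sketch only, the shape below is inferred from the intercepted method names and argument positions (writeTo, parseFrom, getSerializedSize, getMessageFullName) and may not match the real StateFun type. It also shows the plain-reflection approach that the description calls slow. ReflectiveSerdeSketch, reflective(...) and FakeMessage are hypothetical names; FakeMessage is a stand-in for a Protobuf-generated class, not real Protobuf, and checked exceptions are wrapped for brevity.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;

public class ReflectiveSerdeSketch {

  /** ASSUMED shape of ProtobufSerde, inferred from the quoted snippet. */
  public interface ProtobufSerde<M> {
    void writeTo(M message, OutputStream out);

    M parseFrom(InputStream in);

    int getSerializedSize(M message);

    String getMessageFullName();
  }

  /** HYPOTHETICAL stand-in for a Protobuf-generated message class. */
  public static final class FakeMessage {
    private final byte[] payload;

    public FakeMessage(String text) {
      this.payload = text.getBytes(StandardCharsets.UTF_8);
    }

    public void writeTo(OutputStream out) throws IOException {
      out.write(payload);
    }

    public int getSerializedSize() {
      return payload.length;
    }

    public static FakeMessage parseFrom(InputStream in) throws IOException {
      return new FakeMessage(new String(in.readAllBytes(), StandardCharsets.UTF_8));
    }

    public String text() {
      return new String(payload, StandardCharsets.UTF_8);
    }
  }

  /** Builds a serde that dispatches through java.lang.reflect on every call. */
  public static <M> ProtobufSerde<M> reflective(Class<M> type, String fullName) {
    final Method writeTo;
    final Method parseFrom;
    final Method size;
    try {
      // Same well-known method lookups as in the quoted ByteBuddy snippet.
      writeTo = type.getMethod("writeTo", OutputStream.class);
      parseFrom = type.getMethod("parseFrom", InputStream.class);
      size = type.getMethod("getSerializedSize");
    } catch (NoSuchMethodException e) {
      throw new IllegalArgumentException(type.getName() + " lacks the expected methods", e);
    }
    return new ProtobufSerde<M>() {
      @Override
      public void writeTo(M message, OutputStream out) {
        try {
          writeTo.invoke(message, out);
        } catch (ReflectiveOperationException e) {
          throw new IllegalStateException(e);
        }
      }

      @Override
      public M parseFrom(InputStream in) {
        try {
          // parseFrom is static, so the receiver is null.
          return type.cast(parseFrom.invoke(null, in));
        } catch (ReflectiveOperationException e) {
          throw new IllegalStateException(e);
        }
      }

      @Override
      public int getSerializedSize(M message) {
        try {
          return (Integer) size.invoke(message);
        } catch (ReflectiveOperationException e) {
          throw new IllegalStateException(e);
        }
      }

      @Override
      public String getMessageFullName() {
        return fullName;
      }
    };
  }

  public static void main(String[] args) {
    ProtobufSerde<FakeMessage> serde = reflective(FakeMessage.class, "example.FakeMessage");
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    serde.writeTo(new FakeMessage("hello"), out);
    FakeMessage copy = serde.parseFrom(new ByteArrayInputStream(out.toByteArray()));
    System.out.println(copy.text() + " " + serde.getSerializedSize(copy));
  }
}
```

Under the same assumption about ProtobufSerde, the ByteBuddy variant from the description would be obtained once per message type via ReflectiveProtobufSerde.ofProtobufGeneratedType(UserProfile.class) and then used through the same four methods, with the generated subclass calling the message's methods directly instead of going through Method.invoke.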