Posted to issues@flink.apache.org by "Aljoscha Krettek (Jira)" <ji...@apache.org> on 2020/08/03 09:27:00 UTC

[jira] [Commented] (FLINK-18770) Emitting element fails in KryoSerializer

    [ https://issues.apache.org/jira/browse/FLINK-18770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17169849#comment-17169849 ] 

Aljoscha Krettek commented on FLINK-18770:
------------------------------------------

Thanks for the analysis! Regarding object reuse: in a nutshell, your intuition is correct. Your code needs to be more defensive and must not keep references to input objects.
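A minimal, Flink-free sketch of that hazard follows. It assumes a runtime that, as can happen when object reuse is enabled via `ExecutionConfig#enableObjectReuse()`, hands the same mutable instance to user code for successive records. `ObjectReuseDemo` and `run` are hypothetical names invented for this illustration; only the aliasing behavior is the point.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simulation only: ObjectReuseDemo is a hypothetical stand-in for a runtime that
// reuses one mutable instance for every record (as object reuse may allow).
final class ObjectReuseDemo {

    // Feeds each input through a single reused buffer and returns what the
    // "user function" decided to keep. Inputs are assumed to be 4 bytes long.
    static List<byte[]> run(List<byte[]> inputs, boolean defensiveCopy) {
        byte[] reused = new byte[4];           // one instance, overwritten per record
        List<byte[]> kept = new ArrayList<>(); // state held across records
        for (byte[] in : inputs) {
            if (in.length != reused.length) {
                throw new IllegalArgumentException("expected 4-byte records");
            }
            System.arraycopy(in, 0, reused, 0, reused.length); // runtime overwrites in place
            // Unsafe: keeping `reused` stores a handle the runtime will mutate later.
            // Defensive: keep a private copy instead.
            kept.add(defensiveCopy ? reused.clone() : reused);
        }
        return kept;
    }

    public static void main(String[] args) {
        List<byte[]> inputs = List.of(new byte[] {1, 1, 1, 1}, new byte[] {2, 2, 2, 2});
        for (byte[] b : run(inputs, false)) {
            System.out.println(Arrays.toString(b)); // [2, 2, 2, 2] twice: first record lost
        }
        for (byte[] b : run(inputs, true)) {
            System.out.println(Arrays.toString(b)); // [1, 1, 1, 1] then [2, 2, 2, 2]
        }
    }
}
```

Copying only what you actually keep across records is the defensive pattern. Records that are processed and forwarded immediately do not need a copy, which is why object reuse stays cheap.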

To answer your question: there should be no overhead from enabling object reuse. In fact, the opposite should be true, because we potentially copy fewer objects.

Btw, I'll be on vacation for the next three weeks, so I probably won't reply here during that time.

> Emitting element fails in KryoSerializer
> ----------------------------------------
>
>                 Key: FLINK-18770
>                 URL: https://issues.apache.org/jira/browse/FLINK-18770
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Type Serialization System
>    Affects Versions: 1.11.1
>         Environment: Flink 1.11.1, Linux
>            Reporter: Leonid Ilyevsky
>            Priority: Major
>         Attachments: AppMain.java, FlinkTest.scala, KryoException.txt, SolaceSource.java, run_command.txt
>
>
> I wrote a simple Flink connector for Solace (see the attached Java file). It works fine in a local execution environment. However, when I deployed it to a real Flink cluster, it failed with a Kryo exception (see attached).
> After a few hours of searching and debugging, I can now see what is going on.
> The data I want to emit from this source is a simple byte array. In the exception stack trace you can see that when I call 'collect' on the context, it goes into OperatorChain.java:715 and then into KryoSerializer, where it ultimately fails. I haven't had a chance to learn what KryoSerializer is and why it would not know how to handle byte[], but that is beside the point for now.
> Then I used the debugger in my local test to figure out how it manages to work there. I saw that after OperatorChain.java:715 it goes into BytePrimitiveArraySerializer, and then everything works as expected. Obviously, BytePrimitiveArraySerializer makes sense for byte[] data.
> The question is: how can I configure the execution environment on the cluster so that it serializes the same way as the local one? I looked at [https://ci.apache.org/projects/flink/flink-docs-stable/dev/execution_configuration.html] and considered calling disableForceKryo, but according to that page, forcing Kryo is already disabled by default.
>
> Another question: why does the cluster execution environment have different default settings from the local one? This makes it difficult to rely on local tests.
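To illustrate why a dedicated serializer is the natural fit for byte[] data, here is a stand-alone sketch of length-prefixed byte[] serialization: a 4-byte length followed by the raw bytes. As far as I understand, this is roughly the wire format Flink's BytePrimitiveArraySerializer uses, but `ByteArrayCodec` is a hypothetical class written for this illustration, not Flink's code, and it says nothing about why the cluster fell back to Kryo.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;

// Illustration only: a hypothetical byte[] codec using a 4-byte big-endian
// length prefix followed by the raw payload. Not Flink's actual serializer.
final class ByteArrayCodec {

    static byte[] serialize(byte[] record) {
        if (record == null) {
            throw new IllegalArgumentException("record must not be null");
        }
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(buffer);
            out.writeInt(record.length); // length prefix
            out.write(record);           // payload, no per-element type information
            out.flush();
            return buffer.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static byte[] deserialize(byte[] bytes) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
            byte[] result = new byte[in.readInt()];
            in.readFully(result); // fails with EOFException (wrapped) if truncated
            return result;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Copying a byte[] record is a plain clone; no reflection or class registration.
    static byte[] copy(byte[] record) {
        return record.clone();
    }

    public static void main(String[] args) {
        byte[] msg = {10, 20, 30};
        byte[] wire = serialize(msg);
        System.out.println(wire.length);                           // 7 (4-byte length + 3 bytes)
        System.out.println(Arrays.equals(msg, deserialize(wire))); // true
    }
}
```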


