Posted to user@flink.apache.org by 杨力 <bi...@gmail.com> on 2018/03/09 10:00:11 UTC

Extremely large job serialization produced by union operator

I wrote a Flink SQL app with the following topology.

KafkaJsonTableSource -> SQL -> toAppendStream -> Map -> JDBCAppendTableSink
KafkaJsonTableSource -> SQL -> toAppendStream -> Map -> JDBCAppendTableSink
...
KafkaJsonTableSource -> SQL -> toAppendStream -> Map -> JDBCAppendTableSink

I have a dozen TableSources and tens of SQL queries. As a result, the number of
JDBCAppendTableSink instances times the parallelism, that is, the number of
concurrent connections to the database, is too large for the database server to
handle. So I tried to union the DataStreams before connecting them to the TableSink.

KafkaJsonTableSource -> SQL -> toAppendStream -> Map --\
KafkaJsonTableSource -> SQL -> toAppendStream -> Map --+-- union -> JDBCAppendTableSink
...                                                    |
KafkaJsonTableSource -> SQL -> toAppendStream -> Map --/

With this strategy, job submission failed with an OversizedPayloadException
of 104 MB. Increasing akka.framesize helps to avoid this exception, but job
submission hangs and times out.

I can't understand why a simple union operator would serialize to such a
large message. Can I avoid this problem?
Or can I change some configuration to fix the submission timeout?

Regards,
Bill
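
For illustration, the connection arithmetic described above can be written out as a small Scala sketch. The figures (30 sinks, parallelism 8) are hypothetical and are not taken from the actual job; the sketch only assumes that every parallel sink subtask holds its own database connection.

```scala
object ConnectionCount {
  // Hypothetical figures, chosen only to illustrate the multiplication.
  val sinkCount = 30   // one JDBCAppendTableSink per SQL query
  val parallelism = 8  // parallel subtasks per sink

  // Assumption: every sink subtask opens its own database connection.
  def connections(sinks: Int, parallelism: Int): Int = sinks * parallelism

  val withoutUnion: Int = connections(sinkCount, parallelism) // one sink per query
  val withUnion: Int = connections(1, parallelism)            // a single shared sink
}
```

Under these assumptions, funnelling every query into one sink caps the connection count at the parallelism of that sink, which is the motivation for the union.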

Re: Extremely large job serialization produced by union operator

Posted by Fabian Hueske <fh...@gmail.com>.
Can you share the operator plan
(StreamExecutionEnvironment.getExecutionPlan()) for both cases?

Thanks, Fabian
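
A minimal sketch of how the plan could be dumped, assuming the Scala DataStream API. The pipeline in the sketch is a stand-in rather than the job from this thread, and the object and method names are made up for the example.

```scala
import org.apache.flink.streaming.api.scala._

object PlanSketch {
  /** Builds a placeholder pipeline and returns its plan as a JSON string. */
  def planJson(): String = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Stand-in topology; in the real job this is where the tables,
    // queries and sinks would be wired up.
    env.fromElements(1, 2, 3).map(_ * 2).print()
    // Must be called before env.execute(); it only describes the topology.
    env.getExecutionPlan
  }

  def main(args: Array[String]): Unit = println(planJson())
}
```

Printing the plan once for the per-query sinks and once for the union variant would give two JSON documents that can be compared side by side.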

2018-03-14 9:08 GMT+01:00 杨力 <bi...@gmail.com>:

> I understand that complex SQL queries are translated into large DAGs.
> However, the submission succeeds in my case if I don't use the union operator.
> It might be a bug related to it. For example, the following code
> submits successfully with the default akka.framesize limit.
>
> val sqls: Seq[String] = ...
> val sink: JDBCAppendTableSink = ...
>
> sqls foreach {
>   sql =>
>     val table = tEnv.sqlQuery(sql)
>     val outputStream = tEnv.toAppendStream[Row](table) map {
>       ...
>     }
>     tEnv.fromDataStream(outputStream).writeToSink(sink)
> }
>
> If I union these outputStreams and send the result to a single sink, the size
> of the serialized job is 100 MB.
>
> val outputStream = sqls map {
>   sql =>
>     val table = tEnv.sqlQuery(sql)
>     tEnv.toAppendStream[Row](table) map {
>       ...
>     }
> } reduce {
>   (a, b) => a union b
> }
> tEnv.fromDataStream(outputStream).writeToSink(sink)
>
> I failed to reproduce it without the actual table schemas and SQL
> queries used in production. In the end I wrote my own JDBC sink with
> connection pooling to mitigate this problem. Maybe someone familiar with the
> implementation of the union operator can figure out what's going wrong.

Re: Extremely large job serialization produced by union operator

Posted by 杨力 <bi...@gmail.com>.
I understand that complex SQL queries are translated into large DAGs.
However, the submission succeeds in my case if I don't use the union operator.
It might be a bug related to it. For example, the following code
submits successfully with the default akka.framesize limit.

val sqls: Seq[String] = ...
val sink: JDBCAppendTableSink = ...

sqls foreach {
  sql =>
    val table = tEnv.sqlQuery(sql)
    val outputStream = tEnv.toAppendStream[Row](table) map {
      ...
    }
    tEnv.fromDataStream(outputStream).writeToSink(sink)
}

If I union these outputStreams and send the result to a single sink, the size
of the serialized job is 100 MB.

val outputStream = sqls map {
  sql =>
    val table = tEnv.sqlQuery(sql)
    tEnv.toAppendStream[Row](table) map {
      ...
    }
} reduce {
  (a, b) => a union b
}
tEnv.fromDataStream(outputStream).writeToSink(sink)

I failed to reproduce it without the actual table schemas and SQL
queries used in production. In the end I wrote my own JDBC sink with
connection pooling to mitigate this problem. Maybe someone familiar with the
implementation of the union operator can figure out what's going wrong.
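
The custom sink itself is not shown in this thread. As a rough idea of what bounded connection pooling can look like, here is a generic sketch in plain Scala. BoundedPool and withResource are hypothetical names, and this is not the implementation described above.

```scala
import java.util.concurrent.ArrayBlockingQueue

/** Fixed-size pool; `create` is called `size` times when the pool is built. */
final class BoundedPool[A](size: Int, create: () => A) {
  private val idle = new ArrayBlockingQueue[A](size)
  (1 to size).foreach(_ => idle.put(create()))

  /** Borrows a resource, blocking while all are in use, and always returns it. */
  def withResource[B](f: A => B): B = {
    val resource = idle.take()
    try f(resource) finally idle.put(resource)
  }
}
```

For JDBC, A would be java.sql.Connection and create would wrap DriverManager.getConnection with the real connection URL. If the pool lives in a Scala object, all sink subtasks running in the same JVM share it, so the number of connections is bounded per TaskManager rather than per sink subtask.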

Fabian Hueske <fh...@gmail.com> wrote on Tue, Mar 13, 2018 at 11:42 PM:

> Hi Bill,
>
> The size of the program depends on the number and complexity of the SQL
> queries that you are submitting.
> Each query might be translated into a sequence of multiple operators. Each
> operator has a string with generated code that will be compiled on the
> worker nodes. The size of the code depends on the number of fields in the
> schema.
> Operators and code are not shared across queries.
>
> Best, Fabian

Re: Extremely large job serialization produced by union operator

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Bill,

The size of the program depends on the number and complexity of the SQL
queries that you are submitting.
Each query might be translated into a sequence of multiple operators. Each
operator has a string with generated code that will be compiled on the
worker nodes. The size of the code depends on the number of fields in the
schema.
Operators and code are not shared across queries.

Best, Fabian
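
A back-of-the-envelope sketch of the scaling described above, in Scala. It assumes the serialized size is dominated by the generated code strings and grows linearly because nothing is shared across queries. The function name and parameters are illustrative only; real values would have to be measured on the actual job.

```scala
object PlanSizeEstimate {
  /** Rough linear estimate: no sharing across queries, so the sizes simply add up. */
  def estimatedBytes(queries: Int, operatorsPerQuery: Int, codeBytesPerOperator: Long): Long =
    queries.toLong * operatorsPerQuery * codeBytesPerOperator
}
```

Under this model, doubling either the number of queries or the number of schema fields (and with it the generated code per operator) roughly doubles the job size.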

2018-03-09 23:36 GMT+01:00 杨力 <bi...@gmail.com>:

> Thank you for your response. It occurs both in a standalone cluster and in a
> YARN cluster. I am trying to remove the business code and reproduce it with a
> minimal demo.

Re: Extremely large job serialization produced by union operator

Posted by 杨力 <bi...@gmail.com>.
Thank you for your response. It occurs both in a standalone cluster and in a
YARN cluster. I am trying to remove the business code and reproduce it with a
minimal demo.

On Sat, Mar 10, 2018 at 2:27 AM Piotr Nowojski <pi...@data-artisans.com>
wrote:

> Hi,
>
> Could you provide more details about your queries and setup? Logs could be
> helpful as well.
>
> Piotrek

Re: Extremely large job serialization produced by union operator

Posted by Piotr Nowojski <pi...@data-artisans.com>.
Hi,

Could you provide more details about your queries and setup? Logs could be helpful as well.

Piotrek
