Posted to user@flink.apache.org by Marios Trivyzas <ma...@gmail.com> on 2022/04/01 15:52:17 UTC

Re: Flink SQL and data shuffling (keyBy)

Hi!

I don't think there is a way to achieve that without resorting to the
DataStream API.
I don't know whether the PARTITIONED BY clause in the table's CREATE
statement can help to "balance" the data; see
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/create/#partitioned-by


On Thu, Mar 31, 2022 at 7:18 AM Yaroslav Tkachenko <ya...@goldsky.io>
wrote:

> Hey everyone,
>
> I'm trying to use Flink SQL to construct a set of transformations for my
> application. Let's say the topology just has three steps:
>
> - SQL Source
> - SQL SELECT statement
> - SQL Sink (via INSERT)
>
> The sink I'm using (JDBC) would really benefit from data partitioning (by
> PK ID) to avoid conflicting transactions and deadlocks. I can force Flink
> to partition the data by the PK ID before the INSERT by resorting to
> DataStream API and leveraging the keyBy method, then transforming
> DataStream back to the Table again...
>
> Is there a simpler way to do this? I understand that, for example, a GROUP
> BY statement will probably perform similar data shuffling, but what if I
> have a simple SELECT followed by INSERT?
>
> Thank you!
>


-- 
Marios

Re: Flink SQL and data shuffling (keyBy)

Posted by Marios Trivyzas <ma...@gmail.com>.
Happy to help,

Let us know if it helped in your use case.

On Tue, Apr 5, 2022 at 1:34 AM Yaroslav Tkachenko <ya...@goldsky.io>
wrote:

> Hi Marios,
>
> Thank you, this looks very promising!
>

-- 
Marios

Re: Flink SQL and data shuffling (keyBy)

Posted by Yaroslav Tkachenko <ya...@goldsky.io>.
Hi Marios,

Thank you, this looks very promising!

On Mon, Apr 4, 2022 at 2:42 AM Marios Trivyzas <ma...@gmail.com> wrote:

> Hi again,
>
> Maybe you can use the *table.exec.sink.keyed-shuffle* option
> (https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/config/#table-exec-sink-keyed-shuffle)
> and set it to *FORCE*, which will use the primary key column(s) to
> partition and distribute the data.
>
> Best,
> Marios
>

Re: Flink SQL and data shuffling (keyBy)

Posted by Marios Trivyzas <ma...@gmail.com>.
Hi again,

Maybe you can use the *table.exec.sink.keyed-shuffle* option
(https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/config/#table-exec-sink-keyed-shuffle)
and set it to *FORCE*, which will use the primary key column(s) to
partition and distribute the data.

Best,
Marios
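
For readers who want to try the table.exec.sink.keyed-shuffle suggestion
from this thread, here is a minimal sketch in Flink SQL, assuming a Flink
version that ships this option (check the configuration page for your
version). The table names, columns, JDBC URL and the datagen source are all
hypothetical and only stand in for the "source, SELECT, INSERT" topology
described in the original question. The sink declares a PRIMARY KEY because
that is the key the option is documented to shuffle on.

```sql
-- Ask the planner to add a keyed shuffle on the sink's primary key
-- before the sink operator. Documented values are NONE, AUTO and FORCE.
SET 'table.exec.sink.keyed-shuffle' = 'FORCE';

-- Hypothetical source; datagen is used only to keep the sketch self-contained.
CREATE TABLE orders_source (
  id BIGINT,
  amount DECIMAL(10, 2)
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '10'
);

-- Hypothetical JDBC sink. The primary key columns are what the keyed
-- shuffle partitions by, so rows with the same id reach the same sink subtask.
CREATE TABLE orders_sink (
  id BIGINT,
  amount DECIMAL(10, 2),
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:postgresql://localhost:5432/shop',  -- placeholder URL
  'table-name' = 'orders'                            -- placeholder table
);

-- Simple SELECT followed by INSERT, with no GROUP BY needed for the shuffle.
INSERT INTO orders_sink
SELECT id, amount
FROM orders_source;
```

Whether this removes the conflicting transactions and deadlocks mentioned in
the question depends on the job and the database. The documentation notes
that the added shuffle only helps when the upstream already preserves the
per-key order of records, so it is worth inspecting the resulting plan (for
example with EXPLAIN) to confirm that the exchange appears before the sink.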