Posted to user@flink.apache.org by kant kodali <ka...@gmail.com> on 2020/01/28 21:43:00 UTC

is streaming outer join sending unnecessary traffic?

Hi All,

I am doing a streaming outer join over four Kafka topics; let's call them
sample1, sample2, sample3, and sample4. Each of these test topics has just one
column, which is of type tuple string. My query is:

SELECT *
FROM sample1
FULL OUTER JOIN sample2 ON sample1.f0 = sample2.f0
FULL OUTER JOIN sample3 ON sample2.f0 = sample3.f0
FULL OUTER JOIN sample4 ON sample3.f0 = sample4.f0


And here is how I send messages to those Kafka topics at various times.

At time t1 Send a message "flink" to test-topic1

(true,flink,null,null,null) // Looks good

At time t2 Send a message "flink" to test-topic4

(true,null,null,null,flink) // Looks good

At time t3 Send a message "flink" to test-topic3

(false,null,null,null,flink) // Looks good
(true,null,null,flink,flink) //Looks good

At time t4 Send a message "flink" to test-topic2

(false,flink,null,null,null) // Looks good
(false,null,null,flink,flink) // Looks good
*(true,null,null,null,flink) // Redundant?*
*(false,null,null,null,flink) // Redundant?*
(true,flink,flink,flink,flink) //Looks good

Those two rows above seem redundant to me, although the end result is
correct. I don't see the same behavior if I join two topics. These redundant
messages can lead to a lot of database operations underneath, so is there any
way to optimize this? I am using Flink 1.9, so I'm not sure whether this is
already fixed in 1.10.

Attached the code as well.

Thanks!
kant

Re: is streaming outer join sending unnecessary traffic?

Posted by Jark Wu <im...@gmail.com>.
Hi Kant,

Benchao explained the reason in depth, thanks Benchao!

In short, the results are as expected. All streaming operators process
records one at a time and are only eventually consistent, so users may see
transient intermediate values. An outer join generates additional
retractions, and nesting outer joins amplifies the number of intermediate
values.

Currently, the blink planner provides a mini-batch optimization that reduces
the visibility of intermediate results. However, mini-batch is only available
for aggregate operators [1]. Mini-batch optimization for streaming joins is
on the roadmap, but may not make it into 1.11.

A workaround for now is to customize your sink function: buffer the
retraction and upsert messages in memory and reduce them before flushing to
the external system.
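
To make the buffering idea concrete, here is a minimal sketch in plain
Python. It is not a Flink sink and does not use any Flink API: the class
name BufferingSink, the write callback and the max_buffered threshold are
all assumptions made for illustration. It keeps a net count per row (+1 per
add, -1 per retract) and only writes rows whose net count is non-zero, using
the five messages from the t4 burst in the original post as input.

```python
class BufferingSink:
    """Hypothetical sink wrapper (illustration only, not a Flink class)."""

    def __init__(self, write, max_buffered=1000):
        self.write = write                # callback that talks to the external system
        self.max_buffered = max_buffered  # assumed flush threshold
        self.net = {}                     # row -> net count (+1 per add, -1 per retract)
        self.pending = 0                  # messages buffered since the last flush

    def invoke(self, is_add, row):
        self.net[row] = self.net.get(row, 0) + (1 if is_add else -1)
        self.pending += 1
        if self.pending >= self.max_buffered:
            self.flush()

    def flush(self):
        # Rows whose adds and retracts cancel out are dropped entirely.
        items = [(row, count) for row, count in self.net.items() if count != 0]
        # Write the surviving retractions first, then the surviving additions.
        for is_add in (False, True):
            for row, count in items:
                if (count > 0) == is_add:
                    for _ in range(abs(count)):
                        self.write(is_add, row)
        self.net.clear()
        self.pending = 0


out = []
sink = BufferingSink(lambda is_add, row: out.append((is_add, row)))
burst = [
    (False, ("flink", None, None, None)),
    (False, (None, None, "flink", "flink")),
    (True, (None, None, None, "flink")),    # cancelled by the next message
    (False, (None, None, None, "flink")),
    (True, ("flink", "flink", "flink", "flink")),
]
for is_add, row in burst:
    sink.invoke(is_add, row)
sink.flush()
print(out)
```

With this input the two messages marked "Redundant?" cancel each other, so
three writes reach the external system instead of five. A real sink would
also need to flush on a timer or on checkpoints so that buffered messages
are not held back indefinitely or lost on failure.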

Best,
Jark

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/tuning/streaming_aggregation_optimization.html#minibatch-aggregation



Re: is streaming outer join sending unnecessary traffic?

Posted by Benchao Li <li...@gmail.com>.
Hi kant,

Thanks for reporting the issue. I'd like to share some thoughts after
digging into the source code [1] of the blink planner; the logic is the same
as in the legacy planner [2].

The main logic of FULL OUTER JOIN is:

if input record is accumulate
|  if input side is outer
|  |  if there are no matched rows on the other side, send +[record+null], state.add(record, 0)
|  |  if there are matched rows on the other side
|  |  |  if other side is outer
|  |  |  |  if the matched num in the matched rows == 0, send -[null+other]
|  |  |  |  if the matched num in the matched rows > 0, skip
|  |  |  |  otherState.update(other, old + 1)
|  |  |  endif
|  |  |  send +[record+other]s, state.add(record, other.size)
|  |  endif
|  endif
|  if input side not outer
|  |  state.add(record)
|  |  if there are no matched rows on the other side, skip
|  |  if there are matched rows on the other side
|  |  |  if other side is outer
|  |  |  |  if the matched num in the matched rows == 0, send -[null+other]
|  |  |  |  if the matched num in the matched rows > 0, skip
|  |  |  |  otherState.update(other, old + 1)
|  |  |  endif
|  |  |  send +[record+other]s
|  |  endif
|  endif
endif

if input record is retract
|  state.retract(record)
|  if there are no matched rows on the other side
|  |  if input side is outer, send -[record+null]
|  endif
|  if there are matched rows on the other side, send -[record+other]s
|  |  if other side is outer
|  |  |  if the matched num in the matched rows == 0, this should never happen!
|  |  |  if the matched num in the matched rows == 1, send +[null+other]
|  |  |  if the matched num in the matched rows > 1, skip
|  |  |  otherState.update(other, old - 1)
|  |  endif
|  endif
endif


For just one Join Operator, the logic above is correct, and deals with all
corner cases.
However, for your query, there are three Join Operators:
        Join3
        /   \
     Join2   T4
     /   \
  Join1   T3
  /   \
 T1    T2

At t4, after sending "flink" to test-topic2:

Join1 first retracts -[flink, null, null, null], then sends
  +[flink, flink, null, null].
Join2 receives -[flink, null, null, null] and sends
  -[flink, null, null, null].
Join2 receives +[flink, flink, null, null] and sends
  -[null, null, flink, null] and +[flink, flink, flink, null].
Join3 receives -[flink, null, null, null] and sends
  -[flink, null, null, null].
Join3 receives -[null, null, flink, null] and sends
  -[null, null, flink, flink] and +[null, null, null, flink].
Join3 receives +[flink, flink, flink, null] and sends
  -[null, null, null, flink] and +[flink, flink, flink, flink].
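
The logic and the trace above can be checked with a small toy model in
plain Python. This is only a sketch under stated assumptions, not Flink's
StreamingJoinOperator: the class FullOuterJoin, the send helper and the
tuple layout are hypothetical names made up for illustration. It assumes
both sides of every join are outer (so the "not outer" branches are
omitted), represents null as None, treats a null join key as never
matching, and keeps intermediate rows at their natural width instead of
padding them to four columns as in the trace.

```python
class FullOuterJoin:
    """Toy model of one streaming FULL OUTER JOIN operator (both sides outer)."""

    def __init__(self, widths, keys):
        self.widths = widths      # (left arity, right arity), used for null padding
        self.keys = keys          # (left key function, right key function)
        self.state = ({}, {})     # per side: key -> list of [row, matched_count]

    def process(self, side, is_add, row):
        """side 0 = left input, 1 = right input; returns emitted (is_add, row) pairs."""
        other = 1 - side
        key = self.keys[side](row)
        mine = self.state[side].setdefault(key, [])
        # Assumed SQL semantics: a NULL join key never matches anything.
        matches = [] if key is None else self.state[other].get(key, [])

        def pair(rec, oth):       # always lay out columns as left + right
            return rec + oth if side == 0 else oth + rec

        null_mine = (None,) * self.widths[side]
        null_other = (None,) * self.widths[other]
        out = []
        if is_add:
            if not matches:
                out.append((True, pair(row, null_other)))
            for m in matches:
                if m[1] == 0:     # other row was only visible null-padded so far
                    out.append((False, pair(null_mine, m[0])))
                m[1] += 1
            out.extend((True, pair(row, m[0])) for m in matches)
            mine.append([row, len(matches)])
        else:
            mine.remove(next(e for e in mine if e[0] == row))
            if not matches:
                out.append((False, pair(row, null_other)))
            for m in matches:
                out.append((False, pair(row, m[0])))
                m[1] -= 1
                if m[1] == 0:     # other row lost its last match: null-pad it again
                    out.append((True, pair(null_mine, m[0])))
        return out


first = lambda r: r[0]
joins = [
    FullOuterJoin((1, 1), (first, first)),           # Join1: T1.f0 = T2.f0
    FullOuterJoin((2, 1), (lambda r: r[1], first)),  # Join2: T2.f0 = T3.f0
    FullOuterJoin((3, 1), (lambda r: r[2], first)),  # Join3: T3.f0 = T4.f0
]


def send(topic, value):
    """Feed one record into topic 1..4 and return what Join3 emits."""
    entry = {1: 0, 2: 0, 3: 1, 4: 2}[topic]   # index of the first join that sees it
    side = 0 if topic == 1 else 1
    msgs = [(True, (value,))]
    for join in joins[entry:]:
        msgs = [m for is_add, row in msgs for m in join.process(side, is_add, row)]
        side = 0                              # a join's output feeds the next left input
    return msgs


order = [("t1", 1), ("t2", 4), ("t3", 3), ("t4", 2)]
results = {t: send(topic, "flink") for t, topic in order}
for t, msgs in results.items():
    print(t, msgs)
```

Under these assumptions the model emits one message at t1 and t2, two at
t3, and the same five messages at t4 that kant reported, including the
add/retract pair for (null, null, null, flink). The pair appears because
Join3 handles the retraction of [null, null, flink] and the addition of
[flink, flink, flink] as two independent records.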

In my opinion, this is normal behavior for the current design, and I
can't find an easy way to eliminate the duplication.
Let's wait for Jark's and Timo's opinions.

[1]
https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/stream/StreamingJoinOperator.java
[2]
https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/join/NonWindowFullJoin.scala


-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenchao@gmail.com; libenchao@pku.edu.cn

Re: is streaming outer join sending unnecessary traffic?

Posted by kant kodali <ka...@gmail.com>.
Wondering if anyone had a chance to look through this or should I create
the JIRA?




Re: is streaming outer join sending unnecessary traffic?

Posted by Till Rohrmann <tr...@apache.org>.
Hi Kant,

I am not an expert on Flink's SQL implementation. Hence, I'm pulling in
Timo and Jark who might help you with your question.

Cheers,
Till


Re: is streaming outer join sending unnecessary traffic?

Posted by kant kodali <ka...@gmail.com>.
Sorry. fixed some typos.

I am doing a streaming outer join over four Kafka topics; let's call them
sample1, sample2, sample3, and sample4. Each of these test topics has just one
column, which is of type tuple string. My query is:

SELECT *
FROM sample1
FULL OUTER JOIN sample2 ON sample1.f0 = sample2.f0
FULL OUTER JOIN sample3 ON sample2.f0 = sample3.f0
FULL OUTER JOIN sample4 ON sample3.f0 = sample4.f0


And here is how I send messages to those Kafka topics at various times.

At time t1 Send a message "flink" to test-topic1

(true,flink,null,null,null) // Looks good

At time t2 Send a message "flink" to test-topic4

(true,null,null,null,flink) // Looks good

At time t3 Send a message "flink" to test-topic3

(false,null,null,null,flink) // Looks good
(true,null,null,flink,flink) //Looks good

At time t4 Send a message "flink" to test-topic2

(false,flink,null,null,null) // Looks good
(false,null,null,flink,flink) // Looks good
*(true,null,null,null,flink) // Redundant?*
*(false,null,null,null,flink) // Redundant?*
(true,flink,flink,flink,flink) //Looks good

Assume t1<t2<t3<t4

Those two rows above seem redundant to me, although the end result is
correct. I don't see the same behavior if I join two topics. These redundant
messages can lead to a lot of database operations underneath, so is there any
way to optimize this? I am using Flink 1.9, so I'm not sure whether this is
already fixed in 1.10.

Attached the code as well.

Thanks!
kant

