Posted to user@flink.apache.org by Chen-Che Huang <ac...@gmail.com> on 2022/03/21 06:26:31 UTC

Is it possible to make SideOutput back to input stream?

Hi all,

We have an application where the operations on some keys depend on the
results of related keys. Assume there are two related keys, k1 and k2: our
application must not send the value for key k1 to the data sink if the
value for key k2 has already been sent to the sink. To achieve this, we
would like our Flink application to emit some information about the value
for key k2 to a side output, and to feed that side output back into the
original stream as input (see below).

dataSource1
  .union(dataSource2)
  .iterate(
    inStream => {
      val outStream = inStream
        .keyBy(_.key)
        .connect(relationshipSource)
        .process(new CustomOperator())

      // (feedback, output): the side output is meant to be fed back as input
      (outStream.getSideOutput(CustomOperator.Result), outStream)
    }
  )
  .disableChaining()
  .name(OperatorKey.Name).uid(OperatorKey.Name)

However, although our Flink application writes the value information to
the side output successfully, the records in the side output are never fed
back into the input stream. Is this scenario achievable with Flink? If so,
how should we modify our code to achieve it? Many thanks for any comments.

Best regards,
Chen-Che Huang
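For reference, the suppression rule described in the question can be pinned down with a small Scala sketch. It is a toy, single-threaded model with no Flink dependency; `Record`, `RelatedKeyFilter`, `filterRelated`, and `related` are hypothetical names, and we assume `related` maps each key to the keys whose earlier emission suppresses it. In the real job, the state for k1 and k2 may live on different parallel instances after `keyBy`, which is why information about k2 has to travel back through the iteration; this sketch only captures the intended semantics.

```scala
import scala.collection.mutable

// Hypothetical record type: one value per key.
final case class Record(key: String, value: Int)

object RelatedKeyFilter {
  /** Returns the records that would reach the sink, in arrival order.
    * `related(k)` is the set of keys whose earlier emission suppresses k.
    * A suppressed record is not marked as sent. */
  def filterRelated(records: Seq[Record], related: Map[String, Set[String]]): Seq[Record] = {
    val sentKeys = mutable.Set.empty[String] // keys already sent to the sink
    val out = Seq.newBuilder[Record]
    for (r <- records) {
      val blockers = related.getOrElse(r.key, Set.empty[String])
      if (!blockers.exists(k => sentKeys.contains(k))) {
        out += r
        sentKeys += r.key
      }
    }
    out.result()
  }
}
```

With `related = Map("k1" -> Set("k2"))`, the arrival order k2, k1, k3 lets only k2 and k3 through, while the order k1, k2 lets both through, because k2 had not been sent yet when k1 arrived.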

Re: Is it possible to make SideOutput back to input stream?

Posted by Chen-Che Huang <ac...@gmail.com>.
Hi Guowei,

I will check out the docs. Thanks for your help.

Best regards,
Chen-Che

On Mon, Mar 21, 2022 at 4:05 PM Guowei Ma <gu...@gmail.com> wrote:

> Hi, Huang
> From the documentation [1], it seems that you need to close the iteration,
> e.g. with `iteration.closeWith(feedback);`.
> BTW, you can also find a detailed iteration example here [2].
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/operators/overview/#iterate
> [2]
> https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
>
> Best,
> Guowei

Re: Is it possible to make SideOutput back to input stream?

Posted by Guowei Ma <gu...@gmail.com>.
Hi, Huang
From the documentation [1], it seems that you need to close the iteration,
e.g. with `iteration.closeWith(feedback);`.
BTW, you can also find a detailed iteration example here [2].

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/operators/overview/#iterate
[2]
https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java

Best,
Guowei
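To illustrate what closing the iteration means, here is a toy, single-threaded model in plain Scala. It is not Flink code, and `FeedbackLoop`, `run`, `closeLoop`, and `maxSteps` are hypothetical names. With the Java `IterativeStream`, the feedback stream is passed to `closeWith(feedback)` explicitly, as in [1]. In the Scala DataStream API, the `iterate` step function returns a `(feedback, output)` tuple, and, as far as we can tell from the Scala wrapper, the first element is passed to `closeWith` internally. The model mimics that tuple order.

```scala
import scala.collection.mutable

// Toy model of a streaming iteration; NOT Flink code (hypothetical names).
object FeedbackLoop {
  /** Processes `input` record by record. `step` returns (feedback, output),
    * the same order as the Scala `iterate` step function. When `closeLoop`
    * is true, feedback records are appended to the input queue (the role
    * `closeWith` plays); otherwise they are dropped. `maxSteps` guards
    * against feedback that never dies out. */
  def run[T, R](input: Seq[T], closeLoop: Boolean, maxSteps: Int = 1000)(
      step: T => (Seq[T], Seq[R])): Seq[R] = {
    val queue = mutable.Queue.empty[T] ++= input
    val out = Seq.newBuilder[R]
    var steps = 0
    while (queue.nonEmpty && steps < maxSteps) {
      val (feedback, output) = step(queue.dequeue())
      if (closeLoop) queue ++= feedback // feedback edge: records re-enter the input
      out ++= output
      steps += 1
    }
    out.result()
  }
}
```

With a countdown step that feeds `n - 1` back while `n > 0`, starting from `Seq(3)` yields 3, 2, 1, 0 when the loop is closed, but only 3 when it is not, because the feedback records never re-enter the input. This resembles the symptom described in the question.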

