Posted to user@flink.apache.org by Lorenzo Nicora <lo...@gmail.com> on 2023/02/18 08:58:45 UTC

KeyedProcessFunction within an iteration

Hi all,

I am trying to implement an iterative streaming job that processes the loop
with a KeyedProcessFunction.

I need a KeyedProcessFunction to use keyed state and to emit a side-output
(which, after further transformations, becomes the feedback).

The problem is that IterativeStream.process() only accepts a ProcessFunction,
not a KeyedProcessFunction.

The main and feedback streams have the same payload type, and I am keying
both before starting and closing the iteration.
I understand I cannot re-key after starting the iteration, as
IterativeStream does not support keyBy() and throws an
UnsupportedOperationException "Cannot change the input partitioning of an
iteration head directly. Apply the partitioning on the input and feedback
streams instead."

Is there any way of using keyed state within an iteration?
BTW, I am using Flink 1.15.2 and I am bound to that version.

Regards
Lorenzo

Re: KeyedProcessFunction within an iteration

Posted by Lorenzo Nicora <lo...@gmail.com>.
Hi Zhipeng

IterativeStreams does have keyBy() methods, but they all throw
UnsupportedOperationException [1]

For some context: the goal is to enrich messages with async I/O while
caching the enrichment data in state (with TTL).
I am using an iteration because RichAsyncFunction does not support state.
I didn't find a simpler way to do both async I/O and caching in state.

Here is a shortened version of the flow

-----8<-----8<-----8<-----8<-----8<-----8<----
// Measure, SensorData and EnrichedMeasure are POJOs. The key (sensorId) is an Integer.

/// flow....
DataStream<Measure> measures = env.addSource(new MeasuresSource());
IterativeStream.ConnectedIterativeStreams<Measure, SensorData> iteration = measures
            .iterate()
            .withFeedbackType(TypeInformation.of(SensorData.class));
ConnectedStreams<Measure, SensorData> measureAndSensorDataBySensorId = iteration
            // THIS THROWS UnsupportedOperationException
            // "Cannot change the input partitioning of an iteration head directly.
            // Apply the partitioning on the input and feedback streams instead."
            .keyBy(Measure::getSensorId, SensorData::getSensorId);

// CachedEnrichment extends KeyedCoProcessFunction<Integer, Measure, SensorData, EnrichedMeasure>
// It emits cache hit on main output and cache miss on a side-output
CachedEnrichment cachedEnrichment = new CachedEnrichment();
// Try enrichment from cache
SingleOutputStreamOperator<EnrichedMeasure> cacheHitEnrichedMeasures = measureAndSensorDataBySensorId
            .process(cachedEnrichment);
DataStream<Measure> cacheMissMeasures = cacheHitEnrichedMeasures
            .getSideOutput(cachedEnrichment.cacheMissOutputTag);

// On cache miss fetch SensorData with async IO
SingleOutputStreamOperator<Tuple2<EnrichedMeasure, SensorData>> enrichedMeasuresAndFetchedSensorData =
            AsyncDataStream.unorderedWait(
                    cacheMissMeasures,
                    // AsyncEnrich extends RichAsyncFunction<Measure, Tuple2<EnrichedMeasure, SensorData>>
                    new AsyncEnrich(),
                    ASYNC_CALL_TIMEOUT, TimeUnit.MILLISECONDS,
                    ASYNC_OPERATOR_CAPACITY);

// Close the loop with newly fetched SensorData
iteration.closeWith(enrichedMeasuresAndFetchedSensorData.map(t -> t.f1));

// Merge outputs
DataStream<EnrichedMeasure> allEnrichedMeasures = cacheHitEnrichedMeasures
            .union(enrichedMeasuresAndFetchedSensorData.map(t -> t.f0));
-----8<-----8<-----8<-----8<-----8<-----8<----
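
To make the intended behaviour of the cache step concrete, here is a minimal,
framework-free sketch in plain Java. It is not Flink code and not the actual
CachedEnrichment class: the class name, method names, and the use of String
for SensorData and EnrichedMeasure are assumptions made only for
illustration. It models one cache entry per sensorId with a TTL, sends cache
hits to a "main output" list and cache misses to a "side-output" list, and
lets fetched data arrive through a feedback call.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, framework-free model of the cache step. NOT Flink API.
class TtlEnrichmentCache {

    // One cached value per key, plus the time at which it was written.
    private static final class Entry {
        final String sensorData;
        final long writtenAtMillis;

        Entry(String sensorData, long writtenAtMillis) {
            this.sensorData = sensorData;
            this.writtenAtMillis = writtenAtMillis;
        }
    }

    private final Map<Integer, Entry> cache = new HashMap<>();
    private final long ttlMillis;

    // Stand-in for the main output: measures enriched from the cache.
    final List<String> enriched = new ArrayList<>();
    // Stand-in for the side-output: measures that still need an async fetch.
    final List<String> cacheMisses = new ArrayList<>();

    TtlEnrichmentCache(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    // Feedback path: store freshly fetched data for a key.
    void onSensorData(int sensorId, String sensorData, long nowMillis) {
        cache.put(sensorId, new Entry(sensorData, nowMillis));
    }

    // Main path: enrich from the cache, or route the measure to the misses.
    void onMeasure(int sensorId, double value, long nowMillis) {
        Entry entry = cache.get(sensorId);
        boolean fresh = entry != null && nowMillis - entry.writtenAtMillis < ttlMillis;
        if (fresh) {
            enriched.add(sensorId + ":" + value + ":" + entry.sensorData);
        } else {
            cache.remove(sensorId); // drop the expired entry, if there was one
            cacheMisses.add(sensorId + ":" + value);
        }
    }
}
```

In the real job the map would be keyed state (so it is scoped per sensorId
and checkpointed), which is exactly why a keyed function is needed inside
the loop.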

Also, the message of the UnsupportedOperationException thrown by
IterativeStreams.keyBy()
("...Apply the partitioning on the input and feedback streams instead")
does not look right.

I tried that (I made a loop with a single stream of
Either<Measure,SensorData>), but there seems to be no way to process an
IterativeStream with a KeyedProcessFunction, or to feed a KeyedStream
back into the loop.

-----8<-----8<-----8<-----8<-----8<-----8<----
....
KeyedStream<Either<Measure, SensorData>, Integer> measuresOrSensorDataBySensorId = measures
                .map(m -> Either.<Measure, SensorData>Left(m))
                .returns(new TypeHint<>() {})
                .keyBy(msd -> msd.isLeft() ? msd.left().getSensorId() : msd.right().getSensorId());
IterativeStream<Either<Measure, SensorData>> iteration = measuresOrSensorDataBySensorId.iterate();

CachedEnrichment cachedEnrichment = new CachedEnrichment();
// The following line DOES NOT COMPILE: IterativeStream.process() expects ProcessFunction, not KeyedProcessFunction
SingleOutputStreamOperator<EnrichedMeasure> cacheHitEnrichedMeasures =
                 iteration.<EnrichedMeasure>process(cachedEnrichment, TypeInformation.of(EnrichedMeasure.class));
....
KeyedStream<SensorData, Integer> fetchedSensorDataBySensorId = enrichedMeasureAndFetchedSensorData
                .map(t -> t.f1)
                .keyBy(SensorData::getSensorId);
// The following line DOES NOT COMPILE: closeWith() does not expect KeyedStream
iteration.closeWith(fetchedSensorDataBySensorId);
-----8<-----8<-----8<-----8<-----8<-----8<----

I will have a look at the iteration module you mentioned.
I wasn't aware of it.

Thanks
Lorenzo

[1]
https://github.com/apache/flink/blob/release-1.15.2/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/IterativeStream.java#L196


On Sat, 18 Feb 2023 at 12:38, Zhipeng Zhang <zh...@gmail.com> wrote:

> Hi Lorenzo,
>
> Could you provide a code example that reproduces the problem? As far as I
> understand, IterativeStream#keyBy is supported since it is a subclass
> of DataStream.
>
> Moreover, we have implemented a unified iteration module for Flink
> [1] in Flink ML [2], which relies on Flink 1.15.2. You may want to
> give it a try as well.
>
> [1]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=184615300
> [2]
> https://github.com/apache/flink-ml/blob/master/flink-ml-iteration/src/main/java/org/apache/flink/iteration/Iterations.java
>
> --
> best,
> Zhipeng
>

Re: KeyedProcessFunction within an iteration

Posted by Zhipeng Zhang <zh...@gmail.com>.
Hi Lorenzo,

Could you provide a code example that reproduces the problem? As far as I
understand, IterativeStream#keyBy is supported since it is a subclass
of DataStream.

Moreover, we have implemented a unified iteration module for Flink
[1] in Flink ML [2], which relies on Flink 1.15.2. You may want to
give it a try as well.

[1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=184615300
[2] https://github.com/apache/flink-ml/blob/master/flink-ml-iteration/src/main/java/org/apache/flink/iteration/Iterations.java


-- 
best,
Zhipeng