Posted to user@flink.apache.org by Adrienne Kole <ad...@gmail.com> on 2019/03/29 13:09:20 UTC

Source reinterpretAsKeyedStream

Dear community,

I have a use case where the sources are already keyed.
For example, there is a source function with parallelism 10, and each
instance has its own key.
I used reinterpretAsKeyedStream to convert the source DataStream into a
KeyedStream; however, I get an IllegalArgumentException.
Can reinterpretAsKeyedStream be used with source operators as well, or
does the input stream already have to be partitioned (by keyBy(..))?

Thanks,
Adrienne

Re: Source reinterpretAsKeyedStream

Posted by Fabian Hueske <fh...@gmail.com>.
Hi,

Konstantin is right.
reinterpretAsKeyedStream only works if you call it on a DataStream that
was keyBy'ed before (with the same parallelism).
Flink cannot reuse the partitioning of another system like Kafka.
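The parallelism requirement follows from how Flink assigns keyed state to subtasks. As we understand Flink's internal `KeyGroupRangeAssignment`, every key is hashed into one of `maxParallelism` key groups, and each parallel subtask owns a contiguous range of key groups whose boundaries depend on the parallelism. The plain-Java sketch below reproduces only that range arithmetic. The formulas are our reading of Flink's code, not something stated in this thread, and the class name `KeyGroupRanges` and the max parallelism of 128 (which we believe is Flink's default for a parallelism of 10) are assumptions.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch of how key groups are split across parallel subtasks.
 * Assumption: these formulas mirror Flink's internal KeyGroupRangeAssignment.
 */
final class KeyGroupRanges {

    // Index of the subtask that owns the given key group.
    static int operatorIndexForKeyGroup(int keyGroup, int parallelism, int maxParallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    // Inclusive [start, end] range of key groups owned by one subtask.
    static int[] keyGroupRange(int operatorIndex, int parallelism, int maxParallelism) {
        int start = (operatorIndex * maxParallelism + parallelism - 1) / parallelism;
        int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
        return new int[] {start, end};
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // assumed default for parallelism 10
        for (int parallelism : new int[] {10, 5}) {
            List<String> ranges = new ArrayList<>();
            for (int i = 0; i < parallelism; i++) {
                int[] r = keyGroupRange(i, parallelism, maxParallelism);
                ranges.add(i + ":[" + r[0] + "," + r[1] + "]");
            }
            System.out.println("parallelism " + parallelism + " -> " + ranges);
        }
        // Key group 13 is owned by subtask 1 at parallelism 10,
        // but by subtask 0 at parallelism 5.
        System.out.println(operatorIndexForKeyGroup(13, 10, maxParallelism));
        System.out.println(operatorIndexForKeyGroup(13, 5, maxParallelism));
    }
}
```

Because the owner of a key group changes with the parallelism (key group 13 moves from subtask 1 to subtask 0 above), a stream keyed at one parallelism cannot simply be reinterpreted as keyed at another.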

Best, Fabian


Adrienne Kole <ad...@gmail.com> wrote on Thu, 4 Apr 2019, 14:33:


Re: Source reinterpretAsKeyedStream

Posted by Adrienne Kole <ad...@gmail.com>.
Thanks a lot for the replies.

Here is my code:


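    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.DataStreamUtils;
    import org.apache.flink.streaming.api.datastream.KeyedStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
    import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;

    // Hypothetical wrapper class; the original mail only showed fragments.
    public class ReinterpretSourceTest {

        // User-defined record type (not Flink's abstract
        // org.apache.flink.api.java.tuple.Tuple); assumed to be a simple POJO
        // carrying the ID of the source instance that emitted it.
        public static class Tuple {
            private int sourceID;
            public Tuple() {}
            public Tuple(int sourceID) { this.sourceID = sourceID; }
            public int getSourceID() { return sourceID; }
            public void setSourceID(int sourceID) { this.sourceID = sourceID; }
        }

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();
            DataStreamSource<Tuple> source = env.addSource(new MySource());
            // This is the call that fails at runtime: each source instance
            // emits its own key, but the records were never partitioned by a
            // Flink keyBy, so some keys end up outside the key-group range of
            // the subtask that processes them.
            KeyedStream<Tuple, Integer> keyedStream =
                    DataStreamUtils.reinterpretAsKeyedStream(
                            source, new DummyKeySelector(),
                            TypeInformation.of(Integer.class));
            keyedStream.timeWindow(Time.seconds(1)).apply(
                    new WindowFunction<Tuple, Object, Integer, TimeWindow>() {
                        @Override
                        public void apply(Integer key, TimeWindow window,
                                Iterable<Tuple> input, Collector<Object> out) {
                            out.collect(1);
                        }
                    });
            env.execute("Test");
        }

        // Uses the ID of the emitting source instance as the key.
        static class DummyKeySelector implements KeySelector<Tuple, Integer> {
            @Override
            public Integer getKey(Tuple value) {
                return value.getSourceID();
            }
        }

        static class MySource extends RichParallelSourceFunction<Tuple> {
            private int sourceID;
            private volatile boolean running = true;

            @Override
            public void open(Configuration parameters) throws Exception {
                // Each parallel instance uses its subtask index as its key.
                sourceID = getRuntimeContext().getIndexOfThisSubtask();
            }

            @Override
            public void run(SourceContext<Tuple> ctx) throws Exception {
                while (running) {
                    // Emit under the checkpoint lock, the usual SourceFunction pattern.
                    synchronized (ctx.getCheckpointLock()) {
                        ctx.collect(new Tuple(sourceID));
                    }
                }
            }

            @Override
            public void cancel() {
                // Stop the emit loop in run().
                running = false;
            }
        }
    }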


Whatever I do, I get:
Caused by: java.lang.IllegalArgumentException
    at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:123)

When I checked the details in the source code, it seems that some keys are
not within the key-group range assigned to the subtask that processes them,
which is why Flink throws the exception.
In this case, as Konstantin said, it is not possible to interpret the source
as keyed.
Please correct me if I am wrong.
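The key-range check described above can be reproduced outside Flink. A `keyBy` hashes `key.hashCode()` into a key group and then maps that group to a subtask, so the key that source subtask i happens to emit is usually not a key Flink would route to subtask i. This plain-Java sketch assumes that Flink's hash is the MurmurHash3-style mix in its internal `MathUtils.murmurHash` (our transcription, not checked against a specific release) and a max parallelism of 128. `SourceKeyRouting` and its methods are hypothetical names, and Flink's actual runtime check may differ in detail.

```java
/**
 * Hypothetical sketch: source subtask i emits the Integer key i (as MySource
 * does); compute the subtask that a Flink keyBy would route that key to.
 * Assumes murmurHash matches Flink's internal MathUtils.murmurHash.
 */
final class SourceKeyRouting {

    // MurmurHash3-style mix of a single int, made non-negative.
    static int murmurHash(int code) {
        code *= 0xcc9e2d51;
        code = Integer.rotateLeft(code, 15);
        code *= 0x1b873593;
        code = Integer.rotateLeft(code, 13);
        code = code * 5 + 0xe6546b64;
        code ^= 4;
        // final avalanche ("fmix") step
        code ^= code >>> 16;
        code *= 0x85ebca6b;
        code ^= code >>> 13;
        code *= 0xc2b2ae35;
        code ^= code >>> 16;
        if (code >= 0) {
            return code;
        }
        return code != Integer.MIN_VALUE ? -code : 0;
    }

    // key -> key group -> index of the subtask that owns that key group
    static int targetSubtask(Object key, int parallelism, int maxParallelism) {
        int keyGroup = murmurHash(key.hashCode()) % maxParallelism;
        return keyGroup * parallelism / maxParallelism;
    }

    // Number of source subtasks whose own key keyBy would send elsewhere.
    static int countMisplacedKeys(int parallelism, int maxParallelism) {
        int misplaced = 0;
        for (int subtask = 0; subtask < parallelism; subtask++) {
            if (targetSubtask(subtask, parallelism, maxParallelism) != subtask) {
                misplaced++;
            }
        }
        return misplaced;
    }

    public static void main(String[] args) {
        int parallelism = 10;
        int maxParallelism = 128; // assumed default for parallelism 10
        for (int subtask = 0; subtask < parallelism; subtask++) {
            System.out.println("source subtask " + subtask + " emits key " + subtask
                    + ", keyBy would send it to subtask "
                    + targetSubtask(subtask, parallelism, maxParallelism));
        }
        System.out.println("misplaced keys: "
                + countMisplacedKeys(parallelism, maxParallelism));
    }
}
```

Whenever the printed target differs from the emitting instance, the reinterpreted stream keeps the record on a subtask that does not own its key group, which is consistent with the `Preconditions.checkArgument` failure above.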


Thanks,
Adrienne

On Wed, Apr 3, 2019 at 8:08 PM Konstantin Knauf <ko...@ververica.com> wrote:


Re: Source reinterpretAsKeyedStream

Posted by Konstantin Knauf <ko...@ververica.com>.
Hi Adrienne,

you can only use DataStreamUtils#reinterpretAsKeyedStream on a stream that
has previously been keyed/partitioned by Flink with exactly the same
KeySelector as the one given to reinterpretAsKeyedStream. It does not work
with a stream that has been key-partitioned by any other process.

Best,

Konstantin

On Fri, Mar 29, 2019 at 11:47 PM Rong Rong <wa...@gmail.com> wrote:


-- 

Konstantin Knauf | Solutions Architect

+49 160 91394525

<https://www.ververica.com/>

Follow us @VervericaData

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Data Artisans GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen

Re: Source reinterpretAsKeyedStream

Posted by Rong Rong <wa...@gmail.com>.
Hi Adrienne,

I think you should be able to call reinterpretAsKeyedStream on a
DataStreamSource, based on the ITCase example [1].
Can you share the full code and error logs, or at least the stack trace of
the IllegalArgumentException (IAE)?

--
Rong

[1]
https://github.com/apache/flink/blob/release-1.7.2/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/ReinterpretDataStreamAsKeyedStreamITCase.java#L98
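As far as we can tell, the linked ITCase works because its data was partitioned by a `keyBy` in an earlier Flink job and written out one file per partition; the second job's source instances then each read back the file matching their own subtask index, at the same parallelism. The plain-Java sketch below mimics that bookkeeping: it buckets keys by the subtask that `keyBy` would pick, so that source subtask i emits only bucket i. It assumes that Flink's internal `MathUtils.murmurHash` is the MurmurHash3-style mix transcribed here and a max parallelism of 128; `PrePartitionedSource` and `partition` are hypothetical names. Reproducing Flink's assignment outside Flink ties a source to internal details and to a fixed parallelism, which is why the other replies treat reinterpretAsKeyedStream as safe only on streams partitioned by Flink itself.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch: pre-partition keys the way a Flink keyBy would, so
 * that source subtask i only emits keys that keyBy assigns to subtask i.
 * Assumes murmurHash matches Flink's internal MathUtils.murmurHash.
 */
final class PrePartitionedSource {

    // MurmurHash3-style mix of a single int, made non-negative.
    static int murmurHash(int code) {
        code *= 0xcc9e2d51;
        code = Integer.rotateLeft(code, 15);
        code *= 0x1b873593;
        code = Integer.rotateLeft(code, 13);
        code = code * 5 + 0xe6546b64;
        code ^= 4;
        code ^= code >>> 16;
        code *= 0x85ebca6b;
        code ^= code >>> 13;
        code *= 0xc2b2ae35;
        code ^= code >>> 16;
        if (code >= 0) {
            return code;
        }
        return code != Integer.MIN_VALUE ? -code : 0;
    }

    // key -> key group -> index of the subtask that owns that key group
    static int targetSubtask(Object key, int parallelism, int maxParallelism) {
        int keyGroup = murmurHash(key.hashCode()) % maxParallelism;
        return keyGroup * parallelism / maxParallelism;
    }

    // buckets.get(i) holds the keys that keyBy would route to subtask i
    static List<List<Integer>> partition(List<Integer> keys, int parallelism,
                                         int maxParallelism) {
        List<List<Integer>> buckets = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            buckets.add(new ArrayList<>());
        }
        for (Integer key : keys) {
            buckets.get(targetSubtask(key, parallelism, maxParallelism)).add(key);
        }
        return buckets;
    }

    public static void main(String[] args) {
        List<Integer> keys = new ArrayList<>();
        for (int k = 0; k < 50; k++) {
            keys.add(k);
        }
        List<List<Integer>> buckets = partition(keys, 10, 128);
        for (int i = 0; i < buckets.size(); i++) {
            System.out.println("source subtask " + i + " emits keys " + buckets.get(i));
        }
    }
}
```

In a real job, the equivalent of bucket i would have to be produced by Flink itself (as in the ITCase) rather than recomputed like this.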

On Fri, Mar 29, 2019 at 6:09 AM Adrienne Kole <ad...@gmail.com> wrote:
