Posted to issues@flink.apache.org by "Gábor Hermann (JIRA)" <ji...@apache.org> on 2017/08/11 12:09:00 UTC

[jira] [Commented] (FLINK-3133) Introduce collect()/count()/print() methods in DataStream API

    [ https://issues.apache.org/jira/browse/FLINK-3133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16123262#comment-16123262 ] 

Gábor Hermann commented on FLINK-3133:
--------------------------------------

What's the status of this? We already have collect() implemented in [FLINK-1670|https://issues.apache.org/jira/browse/FLINK-1670], but it starts the execution itself, so we can only use it for one DataStream. For testing, it would be great to be able to get the output of multiple DataStreams, as in the design [~kenmy] proposed.

Another design I can imagine:
{code:java}
// Proposed API sketch: collect() would return a Future instead of starting the execution.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Integer> printSink = env.addSource(..).print();
DataStream<String> otherSink = env.addSource(..).map(..).filter(..).print();
Future<List<Integer>> printSinkResults = printSink.collect();
Future<List<String>> otherSinkResults = otherSink.collect();
env.execute();
{code}
Here the Futures would complete once the execution has finished. The downside is that users would need to know how to work with Futures.
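
To make the deferred-collect idea concrete, here is a minimal, Flink-free sketch in plain Java. Everything in it is hypothetical and only for illustration: the class DeferredCollectSketch stands in for the environment, and a Supplier stands in for a pipeline. It is not Flink's API and not a proposed implementation; it only shows collect() handing out a Future that stays incomplete until a single execute() call runs all registered pipelines.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

/** Hypothetical stand-in for an execution environment; NOT Flink's API. */
public class DeferredCollectSketch {

    /** Pairs a pending result with the work that will eventually produce it. */
    private static final class PendingSink<T> {
        final Supplier<List<T>> pipeline;
        final CompletableFuture<List<T>> result = new CompletableFuture<>();

        PendingSink(Supplier<List<T>> pipeline) {
            this.pipeline = pipeline;
        }

        void run() {
            try {
                result.complete(pipeline.get());
            } catch (RuntimeException e) {
                // Surface pipeline failures through the Future.
                result.completeExceptionally(e);
            }
        }
    }

    private final List<PendingSink<?>> sinks = new ArrayList<>();

    /** Registers a pipeline and returns its Future; nothing runs yet. */
    public <T> CompletableFuture<List<T>> collect(Supplier<List<T>> pipeline) {
        PendingSink<T> sink = new PendingSink<>(pipeline);
        sinks.add(sink);
        return sink.result;
    }

    /** Runs every registered pipeline once, completing all Futures. */
    public void execute() {
        for (PendingSink<?> sink : sinks) {
            sink.run();
        }
        sinks.clear();
    }

    public static void main(String[] args) {
        DeferredCollectSketch env = new DeferredCollectSketch();
        CompletableFuture<List<Integer>> numbers =
                env.collect(() -> Arrays.asList(1, 2, 3));
        CompletableFuture<List<String>> words =
                env.collect(() -> Arrays.asList("a", "b"));

        System.out.println(numbers.isDone()); // false: execution has not started
        env.execute();
        System.out.println(numbers.join());   // [1, 2, 3]
        System.out.println(words.join());     // [a, b]
    }
}
```

In a test, one would register all outputs first, call execute() once, and then assert on each Future's result, which is what allows several outputs to be checked from a single execution.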

Do you know of any other effort for making testing easier (apart from external libraries such as https://github.com/ottogroup/flink-spector)?

I see that [~kenmy] has already put effort into this, but I'd also be happy to take up this issue if nobody else is willing to work on it.

> Introduce collect()/count()/print() methods in DataStream API
> -------------------------------------------------------------
>
>                 Key: FLINK-3133
>                 URL: https://issues.apache.org/jira/browse/FLINK-3133
>             Project: Flink
>          Issue Type: Improvement
>          Components: DataStream API
>    Affects Versions: 0.10.0, 1.0.0, 0.10.1
>            Reporter: Maximilian Michels
>            Assignee: Evgeny Kincharov
>             Fix For: 1.0.0
>
>
> The DataSet API's methods {{collect()}}, {{count()}}, and {{print()}} should be mirrored to the DataStream API. 
> The semantics of the calls are different. We need to be able to sample parts of a stream, e.g. by supplying a time period in the arguments to the methods. Users should use the {{JobClient}} to retrieve the results.
> {code:java}
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> DataStream<DataType> streamData = env.addSource(..).map(..);
> JobClient jobClient = env.executeWithControl();
> Iterable<DataType> sampled = jobClient.sampleStream(streamData, Time.seconds(5));
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)