Posted to issues@flink.apache.org by "Evgeny Kincharov (JIRA)" <ji...@apache.org> on 2017/01/13 15:19:26 UTC

[jira] [Commented] (FLINK-3133) Introduce collect()/count()/print() methods in DataStream API

    [ https://issues.apache.org/jira/browse/FLINK-3133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15821899#comment-15821899 ] 

Evgeny Kincharov commented on FLINK-3133:
-----------------------------------------

Hi [~mxm], I have assigned this issue to myself again. Let me explain how I implemented it and the problems I ran into.
1. I planned to add a new sink that only saves the records passing through it. That part is not hard.
2. We need to be able to execute the stream pipeline in non-blocking mode. You proposed adding the method executeWithControl() instead of execute(). I know this is described in the subtasks of FLINK-4272, but I implemented executeWithControl() in this issue only to debug the sink from point 1 (of course, I will remove it from here before the PR).
3. We need to insert this sink into the pipeline. This is the hardest part of the implementation. If I add the sink after calling executeWithControl(), addSink does not change the pipeline, because of some conversion of the transformations during executeWithControl().
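The effect described in point 3 can be pictured with a toy model. This is only a sketch in plain Java, not Flink code: it assumes the cause is that execution translates the recorded operators into a separate, frozen plan, so anything added afterwards never reaches the running job. The names ToyEnvironment, ToyJob and LateSinkDemo are hypothetical and exist only for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Toy model, not Flink code: the environment records operators,
// and execute() freezes them into the plan of a running job.
final class ToyEnvironment {
    private final List<String> operators = new ArrayList<>();

    ToyEnvironment add(String operatorName) {
        operators.add(operatorName);
        return this;
    }

    // Translation step: the job receives its own immutable copy of the plan.
    ToyJob execute() {
        return new ToyJob(Collections.unmodifiableList(new ArrayList<>(operators)));
    }
}

final class ToyJob {
    private final List<String> plan;

    ToyJob(List<String> plan) {
        this.plan = plan;
    }

    List<String> plan() {
        return plan;
    }
}

final class LateSinkDemo {
    public static void main(String[] args) {
        ToyEnvironment env = new ToyEnvironment().add("source").add("map");
        ToyJob job = env.execute();
        env.add("sampleSink"); // too late: the job was built from the earlier snapshot
        System.out.println(job.plan()); // prints [source, map]
    }
}
```

In this model the only way to get the sink into the job is to register it before execute() is called.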
My current state is here: https://github.com/kenmy/flink/tree/FLINK-3133_temp
The commit is here: https://github.com/kenmy/flink/commit/136d51c79008126ed586f2b50a997b03c84b21b3
As a result, I don't see a simple way to add a "sample" without changing the pipeline before execution starts, which may be a problem. Possible solutions (IMHO):
1. Change the design to
{code:java} 
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Integer> printSink = env.addSource(..).print(); 
ResultQueryable queryObject = env.executeWithResultQueryable(); 
List<Integer> sampled = queryObject.retrieve(printSink, Time.seconds(5)); 
{code}
The main idea is to change the pipeline before execution starts, not after.
{code:java} 
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Sampler sampler = new Sampler();
DataStream<DataType> streamData = env.addSource(..).map(..).sample(sampler);
JobClient jobClient = env.executeWithControl();
Iterable<DataType> sampled = sampler.sample(Time.seconds(5)); // method name "sample" is hypothetical; the original omitted it
{code}
2. Don't use a sink; use another mechanism to intercept the DataStream (for example, extending DataStream with a getSampler method that returns an object which can enable or disable storing of the transferred data for any DataStream). IMHO the "sink" approach looks more lightweight.
Which solution do you prefer? I recommend [1], but I may not know all the business needs. Or maybe you know a better solution?
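To make the two options easier to compare, here is a minimal sketch of a sampler that is wired into the pipeline before it starts and can be switched on and off later. It is plain Java, not Flink API: the Sampler class and its tap, start, stop and sample methods are hypothetical names chosen for illustration, and a simple thread stands in for the running job.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the proposed Sampler; plain Java, not Flink API.
final class Sampler<T> {
    private final List<T> buffer = new ArrayList<>();
    private boolean enabled = false;

    // Called for every record; the record always passes through unchanged.
    synchronized T tap(T record) {
        if (enabled) {
            buffer.add(record);
        }
        return record;
    }

    // Begin storing records, discarding anything kept from an earlier window.
    synchronized void start() {
        buffer.clear();
        enabled = true;
    }

    // Stop storing and return a copy of what was captured.
    synchronized List<T> stop() {
        enabled = false;
        return new ArrayList<>(buffer);
    }

    // Capture whatever passes through during the given window.
    List<T> sample(Duration window) throws InterruptedException {
        start();
        Thread.sleep(window.toMillis());
        return stop();
    }
}

final class SamplerDemo {
    public static void main(String[] args) throws InterruptedException {
        Sampler<Integer> sampler = new Sampler<>();

        // The "pipeline": a source thread whose records go through the tap,
        // which is wired in before the thread starts.
        Thread pipeline = new Thread(() -> {
            for (int i = 0; !Thread.currentThread().isInterrupted(); i++) {
                sampler.tap(i);
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    return;
                }
            }
        });
        pipeline.start();

        List<Integer> sampled = sampler.sample(Duration.ofMillis(200));
        pipeline.interrupt();
        pipeline.join();
        System.out.println("captured " + sampled.size() + " records");
    }
}
```

While sampling is disabled, the tap costs only a lock and a flag check per record, which is the sense in which an always-present sampling sink can stay lightweight.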
Thanks.

> Introduce collect()/count()/print() methods in DataStream API
> -------------------------------------------------------------
>
>                 Key: FLINK-3133
>                 URL: https://issues.apache.org/jira/browse/FLINK-3133
>             Project: Flink
>          Issue Type: Improvement
>          Components: DataStream API, Streaming
>    Affects Versions: 0.10.0, 1.0.0, 0.10.1
>            Reporter: Maximilian Michels
>            Assignee: Evgeny Kincharov
>             Fix For: 1.0.0
>
>
> The DataSet API's methods {{collect()}}, {{count()}}, and {{print()}} should be mirrored to the DataStream API. 
> The semantics of the calls are different. We need to be able to sample parts of a stream, e.g. by supplying a time period in the arguments to the methods. Users should use the {{JobClient}} to retrieve the results.
> {code:java}
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> DataStream<DataType> streamData = env.addSource(..).map(..);
> JobClient jobClient = env.executeWithControl();
> Iterable<DataType> sampled = jobClient.sampleStream(streamData, Time.seconds(5));
> {code}


