You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by mxm <gi...@git.apache.org> on 2014/11/17 19:04:34 UTC

[GitHub] incubator-flink pull request: Implement the convenience methods co...

GitHub user mxm opened a pull request:

    https://github.com/apache/incubator-flink/pull/210

    Implement the convenience methods count and collect in DataSet

    These methods provide convenience for the API user to get intermediate results into the program.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/mxm/incubator-flink count/collect

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-flink/pull/210.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #210
    
----
commit 3f65a895a2b6a12bd73806f85cad60022ba56c83
Author: Maximilian Michels <ma...@data-artisans.com>
Date:   2014-11-17T17:37:35Z

    [java-api] convenience method to get the number of elements in a DataSet

commit 800d29e6388c33cd7552304b2b9249a9d575b6d1
Author: Maximilian Michels <ma...@data-artisans.com>
Date:   2014-11-17T17:39:48Z

    [java-api] convenience method to get the elements of a DataSet

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: Implement the convenience methods co...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on the pull request:

    https://github.com/apache/incubator-flink/pull/210#issuecomment-63456473
  
    Very cool! I think this is a great addition and we will improve on the performance soon :)
    
    @zentol, as @StephanEwen said it makes sense to execute right away. You can think of this as "actions" from Spark, which get results back to the user. He can't use a map function, because he doesn't emit any records, but just adds them to the accumulator.
    
    I've tried this out locally and it works, but you have to add the random IDs as you run into problem with multiple actions in the same program otherwise.
    
    With this change, we also should allow to have prorams **without** sinks. Currently, you get a warning if you only have "actions" like collect/count without a sink.
    
    Before we can merge this, you should also add test cases for both operators and make sure that it works with multiple actions per program.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: Implement the convenience methods co...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the pull request:

    https://github.com/apache/incubator-flink/pull/210#issuecomment-63365442
  
    well nevermind, just saw that you're calling `this.getExecutionEnvironment().execute();` within these methods, so when using it, `count()` or `collect()` would be the last method called. I find that very odd behaviour, i can't think of another method that executes than right away.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: Implement the convenience methods co...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the pull request:

    https://github.com/apache/incubator-flink/pull/210#issuecomment-63363046
  
    Does this work when using `count()` or `collect()` multiple times in the same plan? 
    
    you use `this.getExecutionEnvironment().getIdString()` to identify the accumulators, and as far as i see this value is the same for all operators in a plan. As such, multiple usages of `count()` will return the sum count of all datasets, and collect() will throw an exception since the same key is used when calling `getRuntimeContext().addAccumulator(id, accumulator)`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: Implement the convenience methods co...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the pull request:

    https://github.com/apache/incubator-flink/pull/210#issuecomment-63467505
  
    hmm...alright, i can see the point. 
    
    Doesn't executing right away carries the risk of it being inefficient when using them multiple times though? Since it effectively means executing multiple jobs within the same program (i *think* ... ), any common part of the jobs are done an extra time. (if I'm wrong here skip the rest)
    
    example: 
    ```
    List l1 = A.map(X).map(Y).collect();
    List l2 = A.map(X).map(Z).collect();
    
    <some user code using l1 & l2>
    ```
    this would result in 2 jobs being executed, with map(X) being executed twice. whereas
    ```
    B = A.map(X);
    B.map(Y).collect("c1");
    B.map(Z).collect("c2");
    
    JobExecutionResult jre = env.execute()
    List l1 = jre.getAccumulatorResult("c1");
    List l2 = jre.getAccumulatorResult("c2);
    
    <some user code using l1 & l2>
    ```
    would only be 1 job, with map(X) done only once. it is not as pretty (by a fair margin i admit), but in line with the current API.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: Implement the convenience methods co...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the pull request:

    https://github.com/apache/incubator-flink/pull/210#issuecomment-63365912
  
    Why didn't you implement these as an identity-(map)function and save the information to unique accumulators (maybe identified by a user-supplied ID? you could insert them into any plan at any point any number of times, i can see that being useful.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: Implement the convenience methods co...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/incubator-flink/pull/210#issuecomment-63452925
  
    I think executing right away makes sense, because we need the `long` value or the `List<T>` immediately. This is a new type of methods, where you fetch back data to the driver program.
    
    As for IDs, I agree that a random generated ID per method makes sense.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: Implement the convenience methods co...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on the pull request:

    https://github.com/apache/incubator-flink/pull/210#issuecomment-63476823
  
    I changed the code to generate a unique identifier for each call of count or collect. 
    
    @zentol Executing in a lazy fashion could be implemented but would require additional changes to the API. The methods should then return something like a LocalDataSet which lets the user retrieve the accumulator result once the job has been executed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: Implement the convenience methods co...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on the pull request:

    https://github.com/apache/incubator-flink/pull/210#issuecomment-63449724
  
    >Why didn't you implement these as an identity-(map)function and save the information to unique accumulators (maybe identified by a user-supplied ID?
    
    Thank you for your feedback. The idea is to have intermediate results available to the user program within the execution of a Flink program. Currently, this is only possible through RichFunction which provide access to the RuntimeContext.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: Implement the convenience methods co...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the pull request:

    https://github.com/apache/incubator-flink/pull/210#issuecomment-63479651
  
    why additional changes? you can already retrieve the accumulators by using the JobExecutionResult. 
    
    the only necessary change i see is removing the `execute()` within `collect()` and `count()`, and now that you generate the ID's yourself returning the ID, so the accumulators can be accessed later.
    
    ```
    B = A.map(X);
    String id1 = B.map(Y).collect();
    String id2 = B.map(Z).collect();
    
    JobExecutionResult jre = env.execute()
    List l1 = jre.getAccumulatorResult(id1);
    List l2 = jre.getAccumulatorResult(id2);
    
    <some user code using l1 & l2>
    ```
    
    why would this not be sufficient?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---