Posted to issues@flink.apache.org by gyfora <gi...@git.apache.org> on 2014/12/21 02:17:43 UTC

[GitHub] incubator-flink pull request: Streaming Scala API

GitHub user gyfora opened a pull request:

    https://github.com/apache/incubator-flink/pull/275

    Streaming Scala API

    This PR contains the commits for the Scala API for Flink Streaming. Most functionality is already implemented, and I think it is ready to merge after perhaps some slight refactoring.
    
    I would appreciate it if someone with more Scala experience could look through it; the code works, but it might not be very pretty everywhere.
    
    Also, could anyone please run a Scala formatter on the code? It seems impossible to get the line-wrapping settings right in Eclipse, so I had to break the lines manually to make Travis pass.
    
    What's missing:
    - groupBy("field") operator for case classes. The streaming API currently doesn't use the comparators; instead, it extracts the keys from elements. I need to figure this out for case classes.
    - Connected data stream functionality. I will add this soon; it is trivial.
    - Proper Scala code-style formatting, see above.
    - I am not sure about the package structure. Any comments on how I did it?
    
    Feel free to play around and test the features, I am sure that there will be bugs to fix :)


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/mbalassi/incubator-flink scala-api

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-flink/pull/275.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #275
    
----
commit 4c65fcb45a3c23b6b67af82522c6fc43b27b9c87
Author: Gyula Fora <gy...@apache.org>
Date:   2014-12-11T14:22:03Z

    [scala] [streaming] Base functionality added for streaming scala api

commit 11a08ca6660ee426931f3bcfcc38f4ce1258c6b4
Author: Gyula Fora <gy...@apache.org>
Date:   2014-12-11T23:12:49Z

    [scala] [streaming] Extended scala data stream functionality to include simple operators

commit cf63e578d33732f417309f32e161bf33b393e145
Author: Gyula Fora <gy...@apache.org>
Date:   2014-12-13T00:08:08Z

    [scala] [streaming] Finished scala StreamExecutionEnvrionment functionality + DataStream sinks + docs

commit f09a6d9754f90fac27c954c9ce7281b31e0c69c2
Author: mbalassi <mb...@apache.org>
Date:   2014-12-15T13:15:35Z

    [scala] [streaming] Added SplitDataStream functionality
    
    Conflicts:
    	flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala

commit 40ec285ffac7659f6d2a6be4fe7868ee48899f1b
Author: Gyula Fora <gy...@apache.org>
Date:   2014-12-15T15:21:00Z

    [scala] [streaming] Windowing functionality added to scala api

commit fc8118dae8b6eebdebb7bb4125c04fb76ab5e282
Author: Gyula Fora <gy...@apache.org>
Date:   2014-12-18T21:46:00Z

    [scala] [streaming] Added support for iterative streams for scala api

commit 14ef43140391c0d2c467d4aa81c4e79d7c08b4dc
Author: Gyula Fora <gy...@apache.org>
Date:   2014-12-19T20:05:01Z

    [streaming] Temporal join and cross rework for consistence and extended features

commit 9448335378341b8ddc940bdf15075f20ede1e386
Author: Gyula Fora <gy...@apache.org>
Date:   2014-12-20T17:12:19Z

    [scala] [streaming] Temporal join operator added

commit fe3f9a97e1665decee5f5029e90adafe61aa7c27
Author: Gyula Fora <gy...@apache.org>
Date:   2014-12-20T18:34:55Z

    [scala] [streaming] Temporal cross operator added

commit 4dfd4a1c5efa20e75f166390c76fddf73cbba42b
Author: Gyula Fora <gy...@apache.org>
Date:   2014-12-20T22:46:35Z

    [scala] [streaming] Modified aggregations to work on scala tuples

commit 91fe7d8630676a71b0d1d9dd12d96fa9075362e3
Author: Gyula Fora <gy...@apache.org>
Date:   2014-12-21T00:05:40Z

    [scala] [streaming] Fixed scala formatting

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: Streaming Scala API

Posted by gyfora <gi...@git.apache.org>.
Github user gyfora commented on the pull request:

    https://github.com/apache/incubator-flink/pull/275#issuecomment-67830412
  
    You are right, I also use the comparators for POJO types. I didn't use comparators for everything else because it was trivial to extract keys from tuples and arrays. But I guess it's better to have the Keys approach; I will implement it.



[GitHub] incubator-flink pull request: Streaming Scala API

Posted by gyfora <gi...@git.apache.org>.
Github user gyfora commented on the pull request:

    https://github.com/apache/incubator-flink/pull/275#issuecomment-67823274
  
    What I don't get is how to use the Keys to get the actual key (like I do in the selectors); maybe I am missing something trivial here. If I knew, I would have used them, trust me :)



[GitHub] incubator-flink pull request: Streaming Scala API

Posted by senorcarbone <gi...@git.apache.org>.
Github user senorcarbone commented on the pull request:

    https://github.com/apache/incubator-flink/pull/275#issuecomment-67777220
  
    +1
    Nice job!
    @Marton I can also help with the scala examples, will have some free time
    during holidays.
    
    On Sun, Dec 21, 2014 at 5:45 PM, Márton Balassi <no...@github.com>
    wrote:
    
    > Looks great, @gyfora <https://github.com/gyfora>. The windowed join on
    > named fields of case classes is simply beautiful.
    >
    > I like the project structure, adding the streaming scala code under
    > flink-scala. Another somewhat viable option could be to open a
    > flink-streaming-scala project (and rename flink-streaming-core to f-s-java,
    > or even have separate f-s-core and f-s-java projects).
    >
    > Would like to merge it soon, because it not only includes the streaming
    > scala API but some much needed refactor of our java API.
    >
    > Also would like to try out the scala API by implementing the examples with
    > a couple of volunteers. The streaming examples are fairly short and
    > straightforward compared to the batch ones, so a handful of people is more
    > than enough in that department.



[GitHub] incubator-flink pull request: Streaming Scala API

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the pull request:

    https://github.com/apache/incubator-flink/pull/275#issuecomment-67820873
  
    Why does Streaming use its own classes for field selectors, i.e. FieldsKeySelector and CaseClassKeySelector? Streaming also has its own selector classes in the Java API.



[GitHub] incubator-flink pull request: Streaming Scala API

Posted by gyfora <gi...@git.apache.org>.
Github user gyfora commented on the pull request:

    https://github.com/apache/incubator-flink/pull/275#issuecomment-67821986
  
    It should be possible to merge the two Scala selectors into a single ScalaFieldsKeySelector, but I think we need them.



[GitHub] incubator-flink pull request: Streaming Scala API

Posted by gyfora <gi...@git.apache.org>.
Github user gyfora commented on the pull request:

    https://github.com/apache/incubator-flink/pull/275#issuecomment-67849811
  
    Okay I will fix this when I have some time. Today or tomorrow :)



[GitHub] incubator-flink pull request: Streaming Scala API

Posted by gyfora <gi...@git.apache.org>.
Github user gyfora commented on the pull request:

    https://github.com/apache/incubator-flink/pull/275#issuecomment-67997069
  
    Okay, as @aljoscha and @rmetzger suggested, I reworked our grouping: I removed the extra classes and replaced them with the Keys from the batch API. It works well and looks good :)
    
    The only thing I left in is the ArrayKeySelector, which we use in the Streaming API to allow grouping and aggregating on array types as well.
    
    Let's merge this soon, because I am also pushing out a rework of the streaming sources for better parallelism handling, and of the connectors to other systems, both of which build on this PR.



[GitHub] incubator-flink pull request: Streaming Scala API

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the pull request:

    https://github.com/apache/incubator-flink/pull/275#issuecomment-67822151
  
    But in the batch API we don't have special KeySelectors, everything is handled uniformly in Keys.java. The field keys and expression keys support Java Tuples, Scala Tuples, Pojos and Case Classes.



[GitHub] incubator-flink pull request: Streaming Scala API

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the pull request:

    https://github.com/apache/incubator-flink/pull/275#issuecomment-67822719
  
    I like the work, overall. It's just some nitpicking here and there. :smile: 



[GitHub] incubator-flink pull request: Streaming Scala API

Posted by gyfora <gi...@git.apache.org>.
Github user gyfora commented on the pull request:

    https://github.com/apache/incubator-flink/pull/275#issuecomment-67821435
  
    The Java API FieldsKeySelector works on array, Java tuple and POJO types as well, but it doesn't work on Scala tuples.
    
    The Scala FieldsKeySelector works on Java and Scala tuples, and the CaseClassKeySelector works on case classes.



[GitHub] incubator-flink pull request: Streaming Scala API

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the pull request:

    https://github.com/apache/incubator-flink/pull/275#issuecomment-67822657
  
    Also, as @mbalassi mentioned, we should really think about the directory structure now. Having Java Streaming in addons, Scala Streaming in flink-scala and the examples also scattered all over the place does not make things very manageable.



[GitHub] incubator-flink pull request: Streaming Scala API

Posted by mbalassi <gi...@git.apache.org>.
Github user mbalassi commented on the pull request:

    https://github.com/apache/incubator-flink/pull/275#issuecomment-67776196
  
    Looks great, @gyfora. The windowed join on named fields of case classes is simply beautiful.
    
    I like the project structure, adding the streaming scala code under flink-scala. Another somewhat viable option could be to open a flink-streaming-scala project (and rename flink-streaming-core to f-s-java, or even have separate f-s-core and f-s-java projects). 
    
    Would like to merge it soon, because it not only includes the streaming scala API but some much needed refactor of our java API.
    
    Also would like to try out the scala API by implementing the examples with a couple of volunteers. The streaming examples are fairly short and straightforward compared to the batch ones, so a handful of people is more than enough in that department. 



[GitHub] incubator-flink pull request: Streaming Scala API

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/incubator-flink/pull/275



[GitHub] incubator-flink pull request: Streaming Scala API

Posted by mbalassi <gi...@git.apache.org>.
Github user mbalassi commented on the pull request:

    https://github.com/apache/incubator-flink/pull/275#issuecomment-67864400
  
    @aljoscha and @fhueske on the package structure:
    
    We hope that the point in time when streaming can "graduate" from the flink-addons project is near, but to be honest some work is still needed there. If prioritized, that can be achieved for the 0.9 release; if not, it is safe to say that it can be done for 1.0.



[GitHub] incubator-flink pull request: Streaming Scala API

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/incubator-flink/pull/275#issuecomment-67841095
  
    +1 for having a clear code structure. IMO, we should either move the whole flink-streaming module to the root folder or keep everything under flink-addons.
    Having it as a module of the root folder would mark it as reasonably stable (API & runtime-wise).
    I haven't touched the streaming part yet, so I cannot tell but would of course be fine with moving it if the streaming contributors feel it is ready ;-)



[GitHub] incubator-flink pull request: Streaming Scala API

Posted by gyfora <gi...@git.apache.org>.
Github user gyfora commented on the pull request:

    https://github.com/apache/incubator-flink/pull/275#issuecomment-67791071
  
    I suggest merging this ASAP once the package structure and code formatting are sorted out; we can add features as we go. As Marton said, I also refactored some parts of the Java API, and we would like to build on top of that.
    
    @StephanEwen or @aljoscha can you guys please help me with this?



[GitHub] incubator-flink pull request: Streaming Scala API

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the pull request:

    https://github.com/apache/incubator-flink/pull/275#issuecomment-67827360
  
    Yes, @rmetzger added some magic when he unified Pojo and Tuple keys. TypeComparator has a method extractKeys() that does exactly that. It extracts the key fields of a tuple or object and stores them in an array. I think all the infrastructure is there. You can just use Keys.java and create TypeComparators, those you can use for everything else. Maybe @rmetzger can have a look and make some suggestions for how this could best be implemented in the Streaming API.
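
As a rough illustration of the idea above (extract the key fields of a record into an array, then use that array for grouping), here is a minimal plain-Java sketch. It deliberately does not use Flink's TypeComparator or Keys classes and makes no claim about their real signatures; `KeyExtractionSketch`, its `extractKeys` helper and `groupBy` are hypothetical names invented for this example, and records are modeled as plain `Object[]` rows.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; not Flink's TypeComparator or Keys API.
final class KeyExtractionSketch {

    /** Copies the fields at the given positions into a new key array. */
    static Object[] extractKeys(Object[] record, int[] keyPositions) {
        Object[] keys = new Object[keyPositions.length];
        for (int i = 0; i < keyPositions.length; i++) {
            keys[i] = record[keyPositions[i]];
        }
        return keys;
    }

    /** Groups records by their extracted key fields, keeping arrival order. */
    static Map<List<Object>, List<Object[]>> groupBy(List<Object[]> records, int[] keyPositions) {
        Map<List<Object>, List<Object[]>> groups = new LinkedHashMap<>();
        for (Object[] record : records) {
            // Wrap the key array in a List so equals/hashCode compare by value.
            List<Object> key = Arrays.asList(extractKeys(record, keyPositions));
            groups.computeIfAbsent(key, k -> new ArrayList<>()).add(record);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<Object[]> records = Arrays.asList(
                new Object[] {"a", 1, 10},
                new Object[] {"b", 2, 20},
                new Object[] {"a", 1, 30});
        // Group on fields 0 and 1; the first and third records share a key.
        System.out.println(groupBy(records, new int[] {0, 1}).size()); // 2
    }
}
```

The key array is wrapped in a `List` because Java arrays compare by identity, which would put every record into its own group.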



Re: [GitHub] incubator-flink pull request: Streaming Scala API

Posted by Ufuk Celebi <uc...@apache.org>.
On Wed, Dec 24, 2014 at 1:38 PM, StephanEwen <gi...@git.apache.org> wrote:

>     Concerning the project structure: For the batch part, there are discussions
>     to combine the "flink-core", "flink-java" projects, possibly also the
>     "flink-scala" project. We are starting to see too many interdependencies.
>
>     May be good to keep in mind when you decide on a project structure.
>
>     We could, in the next version, go for something like
>
>     - flink-core (core and batch, java & scala)
>     - flink-streaming (java & scala)
>     - flink-runtime
>


+1 I like that idea. Maybe you should open a separate thread or issue for
it. :-)

[GitHub] incubator-flink pull request: Streaming Scala API

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/incubator-flink/pull/275#issuecomment-68049822
  
    Concerning the project structure: For the batch part, there are discussions
    to combine the "flink-core", "flink-java" projects, possibly also the
    "flink-scala" project. We are starting to see too many interdependencies.
    
    May be good to keep in mind when you decide on a project structure.
    
    We could, in the next version, go for something like
    
    - flink-core (core and batch, java & scala)
    - flink-streaming (java & scala)
    - flink-runtime
    - ...
    On 23.12.2014 at 21:52, "Gyula Fora" <no...@github.com> wrote:
    
    > Okay as @aljoscha <https://github.com/aljoscha> and @rmetzger
    > <https://github.com/rmetzger> suggested I reworked our grouping and
    > removed the extra classes and substituted them with the use of Keys from
    > the batch API, works well and looks good :)
    >
    > The only thing I left in is the ArrayKeySelector which we use in the
    > Streaming api to allow grouping and aggregating on array types as well.
    >
    > Let's merge this soon because I am also pushing out a rework of the
    > streaming sources for better parallelism handling and also on the
    > connectors to other systems which builds on this PR.



[GitHub] incubator-flink pull request: Streaming Scala API

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the pull request:

    https://github.com/apache/incubator-flink/pull/275#issuecomment-67846346
  
    I would suggest using the Keys class for Tuples as well.
    If you have a Tuple3<Tuple2<String, Int>, Int, Int> and you select key 0 (which is a Tuple2), we internally handle this as keys[] = {0,1}.
    If you really want to select only the String field of the Tuple2, you have to use the string-based field selection "f0.f0" and you'll get 0.
    
    If you just use the "trivial" extraction (= directly using the user input), you might cause unexpected behavior.
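
To make the flattening described above concrete, here is a minimal, self-contained Java sketch. It is not Flink's Keys implementation: `NestedKeySketch`, `Type`, `ATOM`, `tuple` and `flatKeys` are hypothetical names made up for this illustration. Only the "f0.f0" notation and the expected positions ({0,1} for key 0, and 0 for "f0.f0") are taken from the comment. The sketch models a tuple type as a tree and resolves a field expression to positions in the flattened field list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only; this is not Flink's Keys implementation.
final class NestedKeySketch {

    /** A type is either atomic (no nested fields) or a tuple of nested types. */
    static final class Type {
        final List<Type> fields;

        Type(Type... fields) {
            this.fields = Arrays.asList(fields);
        }

        /** Number of atomic fields once all nesting is flattened. */
        int flatArity() {
            if (fields.isEmpty()) {
                return 1;
            }
            int total = 0;
            for (Type field : fields) {
                total += field.flatArity();
            }
            return total;
        }
    }

    static final Type ATOM = new Type();

    static Type tuple(Type... fields) {
        return new Type(fields);
    }

    /** Resolves an expression such as "f0" or "f0.f0" to flat field positions. */
    static List<Integer> flatKeys(Type root, String expression) {
        Type current = root;
        int offset = 0;
        for (String part : expression.split("\\.")) {
            int pos = Integer.parseInt(part.substring(1)); // "f1" -> 1
            // Skip all flattened fields that come before the selected one.
            for (int i = 0; i < pos; i++) {
                offset += current.fields.get(i).flatArity();
            }
            current = current.fields.get(pos);
        }
        List<Integer> keys = new ArrayList<>();
        for (int i = 0; i < current.flatArity(); i++) {
            keys.add(offset + i);
        }
        return keys;
    }

    public static void main(String[] args) {
        // Models Tuple3<Tuple2<String, Int>, Int, Int>.
        Type tuple3 = tuple(tuple(ATOM, ATOM), ATOM, ATOM);
        System.out.println(flatKeys(tuple3, "f0"));    // [0, 1]
        System.out.println(flatKeys(tuple3, "f0.f0")); // [0]
        System.out.println(flatKeys(tuple3, "f1"));    // [2]
    }
}
```

Note how "f1" resolves to flat position 2 rather than 1: this is the kind of mismatch that using the raw user input directly as a field index would miss.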



[GitHub] incubator-flink pull request: Streaming Scala API

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the pull request:

    https://github.com/apache/incubator-flink/pull/275#issuecomment-67834220
  
    Plus, you get all the support for nesting tuples, pojos and case classes that is already there. :smile_cat: 

