Posted to dev@flink.apache.org by Martin Neumann <mn...@spotify.com> on 2014/10/21 14:03:58 UTC

how load/group with large csv files

Hej,

I have a CSV file with 54 columns, each of them a string (for now). I need
to group and sort the rows on field 15.

What's the best way to load the data into Flink?
There is no Tuple54 (and the <> would look awful anyway with 54 times
String in it).
My current idea is to write a Mapper that splits each line into an array of
Strings. Would grouping and sorting work on this?

So can I do something like this, or does that only work on tuples?
DataSet<String[]> ds;
ds.groupBy(15).sort(20, ANY)

cheers Martin

Re: how load/group with large csv files

Posted by Gyula Fóra <gy...@gmail.com>.
Motivated both by Martin and our recent use-case, I updated
<https://github.com/mbalassi/incubator-flink/commit/a6cd742958dace6ead896af189983933470d8eb1>
the groupBys and aggregations for the Streaming API to work also on arrays
by default.

I think it would probably make sense to do something similar for the batch
API too:

DataStream<double[]> ds = ... ; // e.g. arrays with 500 entries
ds.groupBy(100, 200).sum(10);

(Here we group by fields 100 and 200 and sum field 10.)

What do you think?

Gyula
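
Editorial sketch of the semantics proposed above, in plain Java rather than the Flink API: rows are double arrays, two positions form the grouping key, and a third position is summed per key. The class and method names (ArrayGroupSumSketch, groupAndSum) are hypothetical, and a finite list stands in for the stream, so this shows only the intended result, not how the Streaming API computes it.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Flink.
final class ArrayGroupSumSketch {
    // Groups rows by the values at positions keyA and keyB and sums the
    // value at position sumField within each group.
    static Map<List<Double>, Double> groupAndSum(
            List<double[]> rows, int keyA, int keyB, int sumField) {
        Map<List<Double>, Double> sums = new LinkedHashMap<>();
        for (double[] row : rows) {
            // A two-element list serves as a composite key with value equality.
            List<Double> key = Arrays.asList(row[keyA], row[keyB]);
            sums.merge(key, row[sumField], Double::sum);
        }
        return sums;
    }
}
```

Calling groupAndSum(rows, 100, 200, 10) on arrays with at least 201 entries would correspond to the groupBy(100, 200).sum(10) call in the message.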


Re: how load/group with large csv files

Posted by Martin Neumann <mn...@spotify.com>.
There was not enough time to clean it up and gold-plate it. He got
semi-horrible Java code for now, with some explanation of how it would look
in Scala. My colleague was asking for a quick (and dirty) job, so taking more
time on it would have defeated the purpose of the whole thing a bit.

In any case, thanks for the advice. Hopefully I have found us another Flink
supporter.


Re: how load/group with large csv files

Posted by Stephan Ewen <se...@apache.org>.
Hej,

Do you want to use Scala? You can use simple case classes there and use
fields directly as keys, it will look very elegant...

If you want to stick with Java, you can actually use POJOs (Robert just
corrected me: expression keys should be available there).

You can define a class

public class MyClass {
    public String id;
    public int someValue;
    public double anotherValue;
    ...
}

and then run a program like this:

DataSet<MyClass> data = env.readTextFile(...).map(new Parser());

data.groupBy("id").sortGroup("someValue", Order.ASCENDING)
    .reduceGroup(new GroupReduceFunction(...));


Feel free to post your program here so we can give you comments!

Greetings,
Stephan
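
Editorial sketch of what the Parser step and the group-then-sort step could look like, written in plain Java so it runs without Flink. MyRecord, parse, and groupAndSort are hypothetical names; the three-column layout and the unquoted comma delimiter are assumptions, and the in-memory TreeMap is only a local stand-in for groupBy("id") followed by a sort on someValue within each group.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical record type; the fields follow the MyClass example above.
final class MyRecord {
    public String id;
    public int someValue;
    public double anotherValue;

    // Parses one CSV line. Assumes exactly three comma-separated columns
    // and no quoted or escaped commas.
    static MyRecord parse(String line) {
        String[] cols = line.split(",", -1); // -1 keeps trailing empty columns
        if (cols.length != 3) {
            throw new IllegalArgumentException("expected 3 columns, got " + cols.length);
        }
        MyRecord r = new MyRecord();
        r.id = cols[0];
        r.someValue = Integer.parseInt(cols[1].trim());
        r.anotherValue = Double.parseDouble(cols[2].trim());
        return r;
    }
}

final class PojoGroupingSketch {
    // Groups parsed records by id, then sorts each group by someValue.
    static Map<String, List<MyRecord>> groupAndSort(List<String> lines) {
        Map<String, List<MyRecord>> groups = new TreeMap<>();
        for (String line : lines) {
            MyRecord r = MyRecord.parse(line);
            groups.computeIfAbsent(r.id, k -> new ArrayList<>()).add(r);
        }
        for (List<MyRecord> group : groups.values()) {
            group.sort(Comparator.comparingInt((MyRecord r) -> r.someValue));
        }
        return groups;
    }
}
```

For the 54-column file, the same idea applies with more fields (or only the fields the job needs), and the parse logic would move into the map function passed to the data set.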




Re: how load/group with large csv files

Posted by Martin Neumann <mn...@spotify.com>.
Nope,

but I can't filter out the unneeded data, since the program I'm comparing
against does not either. The point is to prove to one of my colleagues that
Flink > Spark.
The Spark program runs out of memory and crashes when just doing a simple
group and counting the number of items.

This is also one of the reasons I am asking what the best style is, so I can
get it as clean as possible for the comparison with Spark.

cheers Martin



Re: how load/group with large csv files

Posted by Stephan Ewen <se...@apache.org>.
The POJO support should allow you to have a custom type with that many
fields, and then point to the relevant sorting fields.

Unfortunately, the POJO expression keys are not available in group sorting
as of today. The next version will solve it more elegantly...


Re: how load/group with large csv files

Posted by Aljoscha Krettek <al...@apache.org>.
By the way, do you actually need all those 54 columns in your job?


Re: how load/group with large csv files

Posted by Martin Neumann <mn...@spotify.com>.
I will go with that workaround; however, I would have preferred to do it
directly with the API instead of going back to Map/Reduce-like key/value
tuples :-)

By the way, is there a simple function to count the number of items in a
reduce group? It feels stupid to write a GroupReduce that just iterates and
increments a counter.

cheers Martin
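
Editorial sketch of the counting logic described above, in plain Java: the body a hand-written counting GroupReduce would need is a single pass over the group's values. GroupCountSketch and count are hypothetical names, and the Iterable parameter is an assumption standing in for whatever iterator the reduce function receives.

```java
// Hypothetical helper, not part of Flink.
final class GroupCountSketch {
    // Counts the elements of one group by iterating over it once.
    static long count(Iterable<?> group) {
        long n = 0;
        for (Object ignored : group) {
            n++;
        }
        return n;
    }
}
```

The loop holds only the counter, never the group's elements, so memory use stays constant regardless of group size.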


Re: how load/group with large csv files

Posted by Robert Metzger <rm...@apache.org>.
Yes, for sorted groups, you need to use POJOs or Tuples.
I think you have to split the input lines manually, with a mapper.
How about using a TupleN<...> with only the fields you need (returned by
the mapper)?

If you need all fields, you could also use a Tuple2<String, String[]> where
the first position is the sort key.
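
Editorial sketch of the keyed-row idea above, in plain Java: each line is split once and the key column is pulled out in front of the full row, with Map.Entry standing in for Tuple2<String, String[]>. KeyedRowSketch, toKeyedRow, and groupByKey are hypothetical names, and the unquoted comma delimiter is an assumption about the file.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Flink.
final class KeyedRowSketch {
    // Splits a line and pairs the column at keyIndex with the whole row.
    static Map.Entry<String, String[]> toKeyedRow(String line, int keyIndex) {
        String[] fields = line.split(",", -1); // -1 keeps trailing empty columns
        if (keyIndex < 0 || keyIndex >= fields.length) {
            throw new IllegalArgumentException("no column " + keyIndex + " in: " + line);
        }
        return new SimpleImmutableEntry<>(fields[keyIndex], fields);
    }

    // Local stand-in for grouping on the first position of the pair.
    static Map<String, List<String[]>> groupByKey(List<String> lines, int keyIndex) {
        Map<String, List<String[]>> groups = new LinkedHashMap<>();
        for (String line : lines) {
            Map.Entry<String, String[]> row = toKeyedRow(line, keyIndex);
            groups.computeIfAbsent(row.getKey(), k -> new ArrayList<>()).add(row.getValue());
        }
        return groups;
    }
}
```

For the file in this thread, keyIndex would be 15 and every row would carry all 54 columns alongside its key.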




Re: how load/group with large csv files

Posted by Gyula Fora <gy...@apache.org>.
I am not sure how you should go about that; let’s wait for some feedback from the others.

Until then, you can always map the array to (array, keyfield) and use groupBy(1).
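
A rough sketch of that workaround in plain Java (standard library only, not the Flink API). The class name ArrayWithKey and the helper groupOnKey are hypothetical; the pair mirrors the (array, keyfield) layout, with the row in position 0 and a copy of the key column in position 1.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class ArrayWithKey {
    final String[] fields; // position 0: the full row
    final String key;      // position 1: copy of the grouping column

    ArrayWithKey(String[] fields, int keyIndex) {
        this.fields = fields;
        this.key = fields[keyIndex];
    }

    // Group the pairs on position 1 (the key), keeping first-seen key order.
    static Map<String, List<String[]>> groupOnKey(List<ArrayWithKey> pairs) {
        Map<String, List<String[]>> groups = new LinkedHashMap<>();
        for (ArrayWithKey p : pairs) {
            groups.computeIfAbsent(p.key, k -> new ArrayList<>()).add(p.fields);
        }
        return groups;
    }
}
```

The cost of this layout is that the key column is stored twice per row, once inside the array and once next to it.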




Re: how load/group with large csv files

Posted by Martin Neumann <mn...@spotify.com>.
Hej,

Unfortunately, .sort() cannot take a key extractor. Would I have to do the
sort myself then?

cheers Martin


Re: how load/group with large csv files

Posted by Gyula Fora <gy...@apache.org>.
Hey,

Using arrays is probably a convenient way to do so.

I think groupBy as you described it only works for tuples right now. To group on a field of the array, you would need to create a key extractor for it and pass that to groupBy.

Actually, we have some use cases like this for streaming, so we are thinking of writing a wrapper for array types that would behave as you described.
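
To make the key-extractor idea concrete, here is a small sketch in plain Java (standard library only, not the Flink API). The helper groupAndSort and the class name are hypothetical. The extractors are ordinary functions from a row to a key; each group is sorted by hand on a second extracted field, which is one way to do the sort yourself when the sort step does not accept an extractor.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;

class KeyExtractorSketch {
    // Group rows by whatever groupKey returns, then sort each group
    // by the value sortKey returns (natural String order, ascending).
    static Map<String, List<String[]>> groupAndSort(
            List<String[]> rows,
            Function<String[], String> groupKey,
            Function<String[], String> sortKey) {
        Map<String, List<String[]>> groups = new TreeMap<>();
        for (String[] row : rows) {
            groups.computeIfAbsent(groupKey.apply(row), k -> new ArrayList<>()).add(row);
        }
        for (List<String[]> group : groups.values()) {
            group.sort(Comparator.comparing(sortKey));
        }
        return groups;
    }
}
```

For the case in the question, the extractors would be along the lines of row -> row[15] for grouping and row -> row[20] for sorting.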

Regards,
Gyula
