You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by Leonidas Fegaras <fe...@cse.uta.edu> on 2014/07/30 23:14:49 UTC

Problem with groupBy over custom types

Hi,
I am trying to do a groupBy over a DataSet with a custom type (not a Tuple):

public class FData implements Serializable, Comparable<FData> {
     public ... data;
     public FData () { ... }
     @Override
     public int compareTo ( FData x ) { return data.compareTo(x.data); }
...
}

Methods map and flatMap work fine on DataSet<FData>. But I have a 
problem with the following groupBy code:

s.groupBy(new GroupbyKey()).reduceGroup(new GroupbyReducer());

where s is a DataSet<FData> and the classes are defined as follows:

public static final class GroupbyKey extends KeySelector<FData,FData> {
    @Override
    public FData getKey ( FData value ) { return value; }
}
public static final class GroupbyReducer extends 
GroupReduceFunction<FData,FData> {
    @Override
    public void reduce ( final Iterator<FData> values, Collector<FData> 
out ) {}
}

This gives me the following error:

org.apache.flink.compiler.CompilerException: Error translating node 
'GroupReduce "x.FlinkEvaluator$GroupbyReducer" : SORTED_GROUP_REDUCE [[ 
GlobalProperties [partitioning=RANDOM] ]] [[ LocalProperties 
[ordering=null, grouped=null, unique=null] ]]': Could not serialize 
comparator into the configuration.
     at 
org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.preVisit(NepheleJobGraphGenerator.java:346)
     at 
org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.preVisit(NepheleJobGraphGenerator.java:100)
     at 
org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:145)
     at 
org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:146)
     at 
org.apache.flink.compiler.plan.OptimizedPlan.accept(OptimizedPlan.java:165)
     at 
org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.compileJobGraph(NepheleJobGraphGenerator.java:170)
     at org.apache.flink.client.program.Client.getJobGraph(Client.java:214)
...

(I tried to make the example as simple as possible).
What is the problem here? Do I need to implement FData with a different 
interface?
Thanks
Leonidas Fegaras


Re: Problem with groupBy over custom types

Posted by Robert Metzger <rm...@apache.org>.
Hi Leonidas,
I'm very exited to hear that you are planning to add support for Flink to
MRQL.

Let us know if you need more help getting Flink to run on MRQL. We are very
interested in connecting to other Apache projects.
If you don't want to be blocked by our first Apache release (which seems to
take some time, as expected ;) ), you can also use the "release-0.6"
branch. We have merged all breaking API changes, so you can assume this
branch to be very similar to the 0.6 release.

Regards,
Robert


On Thu, Jul 31, 2014 at 4:54 PM, Leonidas Fegaras <fe...@cse.uta.edu>
wrote:

> Hi Stephan,
> Thank you for fixing this so fast (given that you are very busy preparing
> your first release).
> Maybe I should explain why I need to work on GenericTypes.
> I am trying to make Apache MRQL run on Flink. MRQL is a query processing
> and optimization system for large-scale, distributed data analysis, built
> on top of Apache Hadoop, Hama, and Spark. MRQL queries are SQL-like but not
> SQL. They can work on complex nested data (JSON, XML, etc) and can express
> complex queries (pagerank, matrix factorization, etc). Let me make this
> clear first: Flink doesn't really need a query language. Flink programs are
> like queries because operations are collected and optimized. This gives
> Flink an edge over Spark. The reason I want to port MRQL to Flink is for
> the benefit of our project only: we want our queries to run on multiple
> platforms so that users can play and experiment with these systems without
> having to learn their APIs and without changing the queries. So I am not
> interested in the Flink optimizations (which is a shame, I know) since our
> system has it's own optimizer (which is currently not cost-based). So, to
> make the story short, the MRQL data model is like AVRO since it must
> support complex types. So the getKey methods must map an AVRO-like object
> to another AVRO-like object (the key). It doesn't mean that the key is the
> same as the value. It's fully understandable (and expected) that I will not
> be able to use the benefits of the Flink optimizer much on GenericTypes.
> Anyway, I am in the process of learning Flink and I will probably bother
> you with more questions later (but I will wait for the first Flink release
> first, since this will keep you busy for a while).
> Thanks for your help
> Leonidas
>
>
>
>
> On 07/30/2014 08:09 PM, Stephan Ewen wrote:
>
>> Addendum: I made a quick fix for the POJOs to support the expression keys
>> for reducers. The example from the above mail works with the code in this
>> branch: https://github.com/StephanEwen/incubator-flink/commits/pojofix
>>
>>
>> On Thu, Jul 31, 2014 at 3:06 AM, Stephan Ewen <sewen@apache.org <mailto:
>> sewen@apache.org>> wrote:
>>
>>     Yep, there is a flaw in the POJO Code, it incorrectly replaces the
>>     GenericTypes:
>>
>>     @Leonidas: To explain what is going on:
>>
>>       - Flink determines the types of the functions via reflection and
>>     build up its own model of representing them at runtime. It handles
>>     basic types (int, String, double, ...), arrays, and tuples in a
>>     special way, the rest is treated in a generic fashion. The FData
>>     class is such a "generic" type in flink.
>>
>>       - We recently added experimental code to analyze those types and
>>     represent their contained fields in a transparent way. Those are
>>     the POJO types, the code at the bottom illustrates what they can do.
>>       They replace the "generic types" at several levels, but their
>>     implementation is incomplete currently
>>
>>
>>     A few remarks on how to best use the system:
>>       - Flink keeps data always in a serialized form. That allows it
>>     to operate very robust with respect to memory pressure, spilling, etc.
>>
>>       - A way in which we gain efficiency is to access in that binary
>>     data only what is really necessary (only the parts that make up
>>     the key, for example).
>>
>>       - If you define the entire object to be the key, you prevent the
>>     system from doing these optimizations. If you can actually define
>>     which part is the key, you allow it more efficient operation.
>>
>>
>>
>>     ===================================
>>
>>     Example of PoJos and Expression Fields
>>
>>     public static void main(String[] args) throws Exception {
>>     ExecutionEnvironment env =
>>     ExecutionEnvironment.getExecutionEnvironment();
>>     DataSet<FData> data = env.fromElements(new FData("some"), new
>>     FData("test"), new FData("POJOs"));
>>     data.groupBy("theString").reduceGroup(new GroupbyReducer()).print();
>>
>>     env.execute();
>>     }
>>
>>     public static class FData implements Serializable, Comparable<FData> {
>>     public String theString;
>>
>>     public FData () {
>>     theString = "";
>>     }
>>     public FData (String data) {
>>     this.theString = data;
>>     }
>>     @Override
>>     public int compareTo ( FData x ) { return
>>     theString.compareTo(x.theString); }
>>     @Override
>>     public String toString() {
>>     return theString;
>>     }
>>     }
>>
>>     public static final class GroupbyReducer extends
>>     GroupReduceFunction<FData,FData> {
>>     @Override
>>     public void reduce ( Iterator<FData> values, Collector<FData> out ) {
>>     while (values.hasNext()) {
>>     out.collect(values.next());
>>     }
>>     }
>>     }
>>
>>
>>
>

Re: Problem with groupBy over custom types

Posted by Leonidas Fegaras <fe...@cse.uta.edu>.
Hi Stephan,
Thank you for fixing this so fast (given that you are very busy 
preparing your first release).
Maybe I should explain why I need to work on GenericTypes.
I am trying to make Apache MRQL run on Flink. MRQL is a query processing 
and optimization system for large-scale, distributed data analysis, 
built on top of Apache Hadoop, Hama, and Spark. MRQL queries are 
SQL-like but not SQL. They can work on complex nested data (JSON, XML, 
etc) and can express complex queries (pagerank, matrix factorization, 
etc). Let me make this clear first: Flink doesn't really need a query 
language. Flink programs are like queries because operations are 
collected and optimized. This gives Flink an edge over Spark. The reason 
I want to port MRQL to Flink is for the benefit of our project only: we 
want our queries to run on multiple platforms so that users can play and 
experiment with these systems without having to learn their APIs and 
without changing the queries. So I am not interested in the Flink 
optimizations (which is a shame, I know) since our system has it's own 
optimizer (which is currently not cost-based). So, to make the story 
short, the MRQL data model is like AVRO since it must support complex 
types. So the getKey methods must map an AVRO-like object to another 
AVRO-like object (the key). It doesn't mean that the key is the same as 
the value. It's fully understandable (and expected) that I will not be 
able to use the benefits of the Flink optimizer much on GenericTypes. 
Anyway, I am in the process of learning Flink and I will probably bother 
you with more questions later (but I will wait for the first Flink 
release first, since this will keep you busy for a while).
Thanks for your help
Leonidas



On 07/30/2014 08:09 PM, Stephan Ewen wrote:
> Addendum: I made a quick fix for the POJOs to support the expression 
> keys for reducers. The example from the above mail works with the code 
> in this branch: 
> https://github.com/StephanEwen/incubator-flink/commits/pojofix
>
>
> On Thu, Jul 31, 2014 at 3:06 AM, Stephan Ewen <sewen@apache.org 
> <ma...@apache.org>> wrote:
>
>     Yep, there is a flaw in the POJO Code, it incorrectly replaces the
>     GenericTypes:
>
>     @Leonidas: To explain what is going on:
>
>       - Flink determines the types of the functions via reflection and
>     build up its own model of representing them at runtime. It handles
>     basic types (int, String, double, ...), arrays, and tuples in a
>     special way, the rest is treated in a generic fashion. The FData
>     class is such a "generic" type in flink.
>
>       - We recently added experimental code to analyze those types and
>     represent their contained fields in a transparent way. Those are
>     the POJO types, the code at the bottom illustrates what they can do.
>       They replace the "generic types" at several levels, but their
>     implementation is incomplete currently
>
>
>     A few remarks on how to best use the system:
>       - Flink keeps data always in a serialized form. That allows it
>     to operate very robust with respect to memory pressure, spilling, etc.
>
>       - A way in which we gain efficiency is to access in that binary
>     data only what is really necessary (only the parts that make up
>     the key, for example).
>
>       - If you define the entire object to be the key, you prevent the
>     system from doing these optimizations. If you can actually define
>     which part is the key, you allow it more efficient operation.
>
>
>
>     ===================================
>
>     Example of PoJos and Expression Fields
>
>     public static void main(String[] args) throws Exception {
>     ExecutionEnvironment env =
>     ExecutionEnvironment.getExecutionEnvironment();
>     DataSet<FData> data = env.fromElements(new FData("some"), new
>     FData("test"), new FData("POJOs"));
>     data.groupBy("theString").reduceGroup(new GroupbyReducer()).print();
>
>     env.execute();
>     }
>
>     public static class FData implements Serializable, Comparable<FData> {
>     public String theString;
>
>     public FData () {
>     theString = "";
>     }
>     public FData (String data) {
>     this.theString = data;
>     }
>     @Override
>     public int compareTo ( FData x ) { return
>     theString.compareTo(x.theString); }
>     @Override
>     public String toString() {
>     return theString;
>     }
>     }
>
>     public static final class GroupbyReducer extends
>     GroupReduceFunction<FData,FData> {
>     @Override
>     public void reduce ( Iterator<FData> values, Collector<FData> out ) {
>     while (values.hasNext()) {
>     out.collect(values.next());
>     }
>     }
>     }
>
>


Re: Problem with groupBy over custom types

Posted by Stephan Ewen <se...@apache.org>.
Addendum: I made a quick fix for the POJOs to support the expression keys
for reducers. The example from the above mail works with the code in this
branch: https://github.com/StephanEwen/incubator-flink/commits/pojofix


On Thu, Jul 31, 2014 at 3:06 AM, Stephan Ewen <se...@apache.org> wrote:

> Yep, there is a flaw in the POJO Code, it incorrectly replaces the
> GenericTypes:
>
> @Leonidas: To explain what is going on:
>
>   - Flink determines the types of the functions via reflection and build
> up its own model of representing them at runtime. It handles basic types
> (int, String, double, ...), arrays, and tuples in a special way, the rest
> is treated in a generic fashion. The FData class is such a "generic" type
> in flink.
>
>   - We recently added experimental code to analyze those types and
> represent their contained fields in a transparent way. Those are the POJO
> types, the code at the bottom illustrates what they can do.
>   They replace the "generic types" at several levels, but their
> implementation is incomplete currently
>
>
> A few remarks on how to best use the system:
>   - Flink keeps data always in a serialized form. That allows it to
> operate very robust with respect to memory pressure, spilling, etc.
>
>   - A way in which we gain efficiency is to access in that binary data
> only what is really necessary (only the parts that make up the key, for
> example).
>
>   - If you define the entire object to be the key, you prevent the system
> from doing these optimizations. If you can actually define which part is
> the key, you allow it more efficient operation.
>
>
>
> ===================================
>
> Example of PoJos and Expression Fields
>
> public static void main(String[] args) throws Exception {
>  ExecutionEnvironment env =
> ExecutionEnvironment.getExecutionEnvironment();
>  DataSet<FData> data = env.fromElements(new FData("some"), new
> FData("test"), new FData("POJOs"));
>  data.groupBy("theString").reduceGroup(new GroupbyReducer()).print();
>
> env.execute();
> }
>
> public static class FData implements Serializable, Comparable<FData> {
>  public String theString;
>
>  public FData () {
> theString = "";
> }
>  public FData (String data) {
> this.theString = data;
>  }
>  @Override
>  public int compareTo ( FData x ) { return
> theString.compareTo(x.theString); }
>  @Override
> public String toString() {
> return theString;
>  }
> }
>
> public static final class GroupbyReducer extends
> GroupReduceFunction<FData,FData> {
>  @Override
> public void reduce ( Iterator<FData> values, Collector<FData> out ) {
>  while (values.hasNext()) {
> out.collect(values.next());
>  }
> }
> }
>

Re: Problem with groupBy over custom types

Posted by Stephan Ewen <se...@apache.org>.
Yep, there is a flaw in the POJO Code, it incorrectly replaces the
GenericTypes:

@Leonidas: To explain what is going on:

  - Flink determines the types of the functions via reflection and build up
its own model of representing them at runtime. It handles basic types (int,
String, double, ...), arrays, and tuples in a special way, the rest is
treated in a generic fashion. The FData class is such a "generic" type in
flink.

  - We recently added experimental code to analyze those types and
represent their contained fields in a transparent way. Those are the POJO
types, the code at the bottom illustrates what they can do.
  They replace the "generic types" at several levels, but their
implementation is incomplete currently


A few remarks on how to best use the system:
  - Flink keeps data always in a serialized form. That allows it to operate
very robust with respect to memory pressure, spilling, etc.

  - A way in which we gain efficiency is to access in that binary data only
what is really necessary (only the parts that make up the key, for example).

  - If you define the entire object to be the key, you prevent the system
from doing these optimizations. If you can actually define which part is
the key, you allow it more efficient operation.



===================================

Example of PoJos and Expression Fields

public static void main(String[] args) throws Exception {
 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 DataSet<FData> data = env.fromElements(new FData("some"), new
FData("test"), new FData("POJOs"));
 data.groupBy("theString").reduceGroup(new GroupbyReducer()).print();

env.execute();
}

public static class FData implements Serializable, Comparable<FData> {
 public String theString;

public FData () {
theString = "";
}
 public FData (String data) {
this.theString = data;
}
 @Override
public int compareTo ( FData x ) { return theString.compareTo(x.theString);
}
 @Override
public String toString() {
return theString;
}
}

public static final class GroupbyReducer extends
GroupReduceFunction<FData,FData> {
@Override
public void reduce ( Iterator<FData> values, Collector<FData> out ) {
while (values.hasNext()) {
out.collect(values.next());
}
}
}

Re: Problem with groupBy over custom types

Posted by Stephan Ewen <se...@apache.org>.
Before the POJO code, the "GenericType" was the catch-all type. In that
sense, it is correct that FData is generic.

Might be that the POJO additions conflict with the GenericType on the
lowest level. I am also looking into this.

Re: Problem with groupBy over custom types

Posted by Ufuk Celebi <u....@fu-berlin.de>.
I got the following exception when trying something along those lines as Leonidas:

Exception in thread "main" java.lang.IllegalArgumentException: The field at position 0 (PojoType<org.apache.flink.example.java.wordcount.WordCountPOJO.WC, fields = [count: Integer, other: GenericType<org.apache.flink.example.java.wordcount.WordCountPOJO.WC>, word: String]>) is no atomic key type.
	at org.apache.flink.api.java.typeutils.TupleTypeInfo.createLeadingFieldComparator(TupleTypeInfo.java:232)
	at org.apache.flink.api.java.typeutils.TupleTypeInfo.createComparator(TupleTypeInfo.java:124)
	at org.apache.flink.compiler.postpass.JavaApiPostPass.createComparator(JavaApiPostPass.java:296)
	at org.apache.flink.compiler.postpass.JavaApiPostPass.traverse(JavaApiPostPass.java:165)
	at org.apache.flink.compiler.postpass.JavaApiPostPass.traverseChannel(JavaApiPostPass.java:264)
	at org.apache.flink.compiler.postpass.JavaApiPostPass.traverse(JavaApiPostPass.java:170)
	at org.apache.flink.compiler.postpass.JavaApiPostPass.traverseChannel(JavaApiPostPass.java:264)
	at org.apache.flink.compiler.postpass.JavaApiPostPass.traverse(JavaApiPostPass.java:93)
	at org.apache.flink.compiler.postpass.JavaApiPostPass.postPass(JavaApiPostPass.java:77)
	at org.apache.flink.compiler.PactCompiler.compile(PactCompiler.java:584)
	at org.apache.flink.compiler.PactCompiler.compile(PactCompiler.java:461)
	at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:239)
	at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:58)
	at org.apache.flink.example.java.wordcount.WordCountPOJO.main(WordCountPOJO.java:88)

On 31 Jul 2014, at 00:48, Stephan Ewen <se...@apache.org> wrote:

> Hi Leonidas!
> 
> What you are doing should actually be supported. Do you have more of the
> stack-trace?
> 
> It seems that there is some non-serializable part somewhere in the
> GenericTypeComparator..

Shouldn't FData be of PojoType and not GenericType in Leonidas example and hence suffer from the same problem as I did?

Re: Problem with groupBy over custom types

Posted by Stephan Ewen <se...@apache.org>.
Hi Leonidas!

What you are doing should actually be supported. Do you have more of the
stack-trace?

It seems that there is some non-serializable part somewhere in the
GenericTypeComparator..

Stephan



On Wed, Jul 30, 2014 at 11:39 PM, Leonidas Fegaras <fe...@cse.uta.edu>
wrote:

> Hi Ufuk,
> Your getKey returns a String, so it's very simple. Mine must return a
> custom type (FData). So my getKey gets an FData and returns a different
> FData. I just made it identical to show you the error.
> So my question now is this: can getKey return a Comparable custom type or
> it must always be a simple type, such as String?
> Thanks
> Leonidas
> PS. Should your WC class be Serializable?
>
>
>
> On 07/30/2014 04:26 PM, Ufuk Celebi wrote:
>
>> Hey Leonidas,
>>
>> I think the problem is with the KeySelector. The key selector should
>> specify which field of your custom type should be used to do the grouping,
>> but you are currently just returning the same object.
>>
>> So you would have to think about which fields define the separate groups.
>> For example with a custom type for word counts, where you want to group on
>> distinct words:
>>
>> public class WC {
>>      public String word;
>>      public int count;
>>      // [...]
>> }
>>
>> input.groupBy(new KeySelector<WC, String>() {
>>      public String getKey(WC wc) {
>>          return wc.word;
>>      }
>> }).reduce(...);
>>
>> Does this help? Feel free to get back if you have further questions! :-)
>>
>> Ufuk
>>
>> On 30 Jul 2014, at 23:14, Leonidas Fegaras <fe...@cse.uta.edu> wrote:
>>
>>  Hi,
>>> I am trying to do a groupBy over a DataSet with a custom type (not a
>>> Tuple):
>>>
>>> public class FData implements Serializable, Comparable<FData> {
>>>     public ... data;
>>>     public FData () { ... }
>>>     @Override
>>>     public int compareTo ( FData x ) { return data.compareTo(x.data); }
>>> ...
>>> }
>>>
>>> Methods map and flatMap work fine on DataSet<FData>. But I have a
>>> problem with the following groupBy code:
>>>
>>> s.groupBy(new GroupbyKey()).reduceGroup(new GroupbyReducer());
>>>
>>> where s is a DataSet<FData> and the classes are defined as follows:
>>>
>>> public static final class GroupbyKey extends KeySelector<FData,FData> {
>>>    @Override
>>>    public FData getKey ( FData value ) { return value; }
>>> }
>>> public static final class GroupbyReducer extends
>>> GroupReduceFunction<FData,FData> {
>>>    @Override
>>>    public void reduce ( final Iterator<FData> values, Collector<FData>
>>> out ) {}
>>> }
>>>
>>> This gives me the following error:
>>>
>>> org.apache.flink.compiler.CompilerException: Error translating node
>>> 'GroupReduce "x.FlinkEvaluator$GroupbyReducer" : SORTED_GROUP_REDUCE [[
>>> GlobalProperties [partitioning=RANDOM] ]] [[ LocalProperties
>>> [ordering=null, grouped=null, unique=null] ]]': Could not serialize
>>> comparator into the configuration.
>>>     at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.
>>> preVisit(NepheleJobGraphGenerator.java:346)
>>>     at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.
>>> preVisit(NepheleJobGraphGenerator.java:100)
>>>     at org.apache.flink.compiler.plan.SingleInputPlanNode.
>>> accept(SingleInputPlanNode.java:145)
>>>     at org.apache.flink.compiler.plan.SingleInputPlanNode.
>>> accept(SingleInputPlanNode.java:146)
>>>     at org.apache.flink.compiler.plan.OptimizedPlan.accept(
>>> OptimizedPlan.java:165)
>>>     at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.
>>> compileJobGraph(NepheleJobGraphGenerator.java:170)
>>>     at org.apache.flink.client.program.Client.getJobGraph(
>>> Client.java:214)
>>> ...
>>>
>>> (I tried to make the example as simple as possible).
>>> What is the problem here? Do I need to implement FData with a different
>>> interface?
>>> Thanks
>>> Leonidas Fegaras
>>>
>> .
>>
>>
>

Re: Problem with groupBy over custom types

Posted by Leonidas Fegaras <fe...@cse.uta.edu>.
Hi Ufuk,
Your getKey returns a String, so it's very simple. Mine must return a 
custom type (FData). So my getKey gets an FData and returns a different 
FData. I just made it identical to show you the error.
So my question now is this: can getKey return a Comparable custom type 
or it must always be a simple type, such as String?
Thanks
Leonidas
PS. Should your WC class be Serializable?


On 07/30/2014 04:26 PM, Ufuk Celebi wrote:
> Hey Leonidas,
>
> I think the problem is with the KeySelector. The key selector should specify which field of your custom type should be used to do the grouping, but you are currently just returning the same object.
>
> So you would have to think about which fields define the separate groups. For example with a custom type for word counts, where you want to group on distinct words:
>
> public class WC {
>      public String word;
>      public int count;
>      // [...]
> }
>
> input.groupBy(new KeySelector<WC, String>() {
>      public String getKey(WC wc) {
>          return wc.word;
>      }
> }).reduce(...);
>
> Does this help? Feel free to get back if you have further questions! :-)
>
> Ufuk
>
> On 30 Jul 2014, at 23:14, Leonidas Fegaras <fe...@cse.uta.edu> wrote:
>
>> Hi,
>> I am trying to do a groupBy over a DataSet with a custom type (not a Tuple):
>>
>> public class FData implements Serializable, Comparable<FData> {
>>     public ... data;
>>     public FData () { ... }
>>     @Override
>>     public int compareTo ( FData x ) { return data.compareTo(x.data); }
>> ...
>> }
>>
>> Methods map and flatMap work fine on DataSet<FData>. But I have a problem with the following groupBy code:
>>
>> s.groupBy(new GroupbyKey()).reduceGroup(new GroupbyReducer());
>>
>> where s is a DataSet<FData> and the classes are defined as follows:
>>
>> public static final class GroupbyKey extends KeySelector<FData,FData> {
>>    @Override
>>    public FData getKey ( FData value ) { return value; }
>> }
>> public static final class GroupbyReducer extends GroupReduceFunction<FData,FData> {
>>    @Override
>>    public void reduce ( final Iterator<FData> values, Collector<FData> out ) {}
>> }
>>
>> This gives me the following error:
>>
>> org.apache.flink.compiler.CompilerException: Error translating node 'GroupReduce "x.FlinkEvaluator$GroupbyReducer" : SORTED_GROUP_REDUCE [[ GlobalProperties [partitioning=RANDOM] ]] [[ LocalProperties [ordering=null, grouped=null, unique=null] ]]': Could not serialize comparator into the configuration.
>>     at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.preVisit(NepheleJobGraphGenerator.java:346)
>>     at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.preVisit(NepheleJobGraphGenerator.java:100)
>>     at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:145)
>>     at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:146)
>>     at org.apache.flink.compiler.plan.OptimizedPlan.accept(OptimizedPlan.java:165)
>>     at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.compileJobGraph(NepheleJobGraphGenerator.java:170)
>>     at org.apache.flink.client.program.Client.getJobGraph(Client.java:214)
>> ...
>>
>> (I tried to make the example as simple as possible).
>> What is the problem here? Do I need to implement FData with a different interface?
>> Thanks
>> Leonidas Fegaras
> .
>


Re: Problem with groupBy over custom types

Posted by Ufuk Celebi <u....@fu-berlin.de>.
Hey Leonidas,

I think the problem is with the KeySelector. The key selector should specify which field of your custom type should be used to do the grouping, but you are currently just returning the same object.

So you would have to think about which fields define the separate groups. For example with a custom type for word counts, where you want to group on distinct words:

public class WC {
    public String word;
    public int count;
    // [...]
}

input.groupBy(new KeySelector<WC, String>() {
    public String getKey(WC wc) {
        return wc.word;
    }
}).reduce(...);

Does this help? Feel free to get back if you have further questions! :-)

Ufuk

On 30 Jul 2014, at 23:14, Leonidas Fegaras <fe...@cse.uta.edu> wrote:

> Hi,
> I am trying to do a groupBy over a DataSet with a custom type (not a Tuple):
> 
> public class FData implements Serializable, Comparable<FData> {
>    public ... data;
>    public FData () { ... }
>    @Override
>    public int compareTo ( FData x ) { return data.compareTo(x.data); }
> ...
> }
> 
> Methods map and flatMap work fine on DataSet<FData>. But I have a problem with the following groupBy code:
> 
> s.groupBy(new GroupbyKey()).reduceGroup(new GroupbyReducer());
> 
> where s is a DataSet<FData> and the classes are defined as follows:
> 
> public static final class GroupbyKey extends KeySelector<FData,FData> {
>   @Override
>   public FData getKey ( FData value ) { return value; }
> }
> public static final class GroupbyReducer extends GroupReduceFunction<FData,FData> {
>   @Override
>   public void reduce ( final Iterator<FData> values, Collector<FData> out ) {}
> }
> 
> This gives me the following error:
> 
> org.apache.flink.compiler.CompilerException: Error translating node 'GroupReduce "x.FlinkEvaluator$GroupbyReducer" : SORTED_GROUP_REDUCE [[ GlobalProperties [partitioning=RANDOM] ]] [[ LocalProperties [ordering=null, grouped=null, unique=null] ]]': Could not serialize comparator into the configuration.
>    at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.preVisit(NepheleJobGraphGenerator.java:346)
>    at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.preVisit(NepheleJobGraphGenerator.java:100)
>    at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:145)
>    at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:146)
>    at org.apache.flink.compiler.plan.OptimizedPlan.accept(OptimizedPlan.java:165)
>    at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.compileJobGraph(NepheleJobGraphGenerator.java:170)
>    at org.apache.flink.client.program.Client.getJobGraph(Client.java:214)
> ...
> 
> (I tried to make the example as simple as possible).
> What is the problem here? Do I need to implement FData with a different interface?
> Thanks
> Leonidas Fegaras