Posted to dev@flink.apache.org by Robert Metzger <rm...@apache.org> on 2014/08/06 15:54:34 UTC

Re: Problem with groupBy over custom types

Hi Leonidas,
I'm very excited to hear that you are planning to add support for Flink to
MRQL.

Let us know if you need more help getting MRQL to run on Flink. We are very
interested in connecting to other Apache projects.
If you don't want to be blocked by our first Apache release (which seems to
be taking some time, as expected ;) ), you can also use the "release-0.6"
branch. We have merged all breaking API changes, so you can expect this
branch to be very similar to the 0.6 release.

Regards,
Robert


On Thu, Jul 31, 2014 at 4:54 PM, Leonidas Fegaras <fe...@cse.uta.edu>
wrote:

> Hi Stephan,
> Thank you for fixing this so fast (given that you are very busy preparing
> your first release).
> Maybe I should explain why I need to work on GenericTypes.
> I am trying to make Apache MRQL run on Flink. MRQL is a query processing
> and optimization system for large-scale, distributed data analysis, built
> on top of Apache Hadoop, Hama, and Spark. MRQL queries are SQL-like but not
> SQL. They can work on complex nested data (JSON, XML, etc.) and can express
> complex queries (PageRank, matrix factorization, etc.). Let me make this
> clear first: Flink doesn't really need a query language. Flink programs are
> like queries because operations are collected and optimized. This gives
> Flink an edge over Spark. The reason I want to port MRQL to Flink is for
> the benefit of our project only: we want our queries to run on multiple
> platforms so that users can play and experiment with these systems without
> having to learn their APIs and without changing the queries. So I am not
> interested in the Flink optimizations (which is a shame, I know) since our
> system has its own optimizer (which is currently not cost-based). To make
> a long story short, the MRQL data model is like Avro's, since it must
> support complex types. So the getKey methods must map an Avro-like object
> to another Avro-like object (the key); the key is not necessarily the same
> as the value (a sketch of such a key selector follows below). It is fully
> understandable (and expected) that I will not be able to benefit much from
> the Flink optimizer on GenericTypes.
> Anyway, I am in the process of learning Flink and I will probably bother
> you with more questions later (but I will wait for the first Flink release
> first, since this will keep you busy for a while).
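>
> To illustrate, here is a minimal sketch of such a getKey mapping using
> Flink's KeySelector. The MRQLValue class and its fields are illustrative
> stand-ins, not the actual MRQL classes:
>
>     import java.io.Serializable;
>     import org.apache.flink.api.java.functions.KeySelector;
>
>     // Illustrative stand-in for an Avro-like MRQL value.
>     public class MRQLValue implements Serializable, Comparable<MRQLValue> {
>         public String tag;      // the part that will serve as the key
>         public String payload;  // the rest of the (possibly nested) data
>
>         public MRQLValue() {}
>
>         public MRQLValue(String tag, String payload) {
>             this.tag = tag;
>             this.payload = payload;
>         }
>
>         // Generic types used as keys must be Comparable in Flink.
>         @Override
>         public int compareTo(MRQLValue other) {
>             return tag.compareTo(other.tag);
>         }
>     }
>
>     // Maps each Avro-like value to another Avro-like object (the key),
>     // which is a projection of the value rather than the value itself.
>     public class MRQLKeySelector implements KeySelector<MRQLValue, MRQLValue> {
>         @Override
>         public MRQLValue getKey(MRQLValue value) {
>             return new MRQLValue(value.tag, "");
>         }
>     }
>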
> Thanks for your help
> Leonidas
>
>
>
>
> On 07/30/2014 08:09 PM, Stephan Ewen wrote:
>
>> Addendum: I made a quick fix for the POJOs to support the expression keys
>> for reducers. The example from the above mail works with the code in this
>> branch: https://github.com/StephanEwen/incubator-flink/commits/pojofix
>>
>>
>> On Thu, Jul 31, 2014 at 3:06 AM, Stephan Ewen <sewen@apache.org> wrote:
>>
>>     Yep, there is a flaw in the POJO code: it incorrectly replaces the
>>     GenericTypes.
>>
>>     @Leonidas: To explain what is going on:
>>
>>       - Flink determines the types of the functions via reflection and
>>     builds up its own model for representing them at runtime. It handles
>>     basic types (int, String, double, ...), arrays, and tuples in a
>>     special way; the rest is treated in a generic fashion. The FData
>>     class is such a "generic" type in Flink.
>>
>>       - We recently added experimental code to analyze those types and
>>     represent their contained fields in a transparent way. Those are
>>     the POJO types; the code at the bottom illustrates what they can do.
>>     They replace the "generic types" at several levels, but their
>>     implementation is currently incomplete.
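>>
>>     As a small illustration of that classification, one can ask Flink's
>>     type analysis how it sees a class. This is only a sketch using the
>>     TypeExtractor utility (exact class locations may differ across Flink
>>     versions); it assumes the FData class from the example at the bottom:
>>
>>     import org.apache.flink.api.common.typeinfo.TypeInformation;
>>     import org.apache.flink.api.java.typeutils.TypeExtractor;
>>
>>     public class TypeCheck {
>>         public static void main(String[] args) {
>>             // String is one of the specially handled basic types.
>>             TypeInformation<String> stringInfo =
>>                 TypeExtractor.getForClass(String.class);
>>             System.out.println(stringInfo.isBasicType());  // true
>>
>>             // FData is neither a basic nor a tuple type; depending on
>>             // the Flink version it is analyzed as a POJO type or falls
>>             // back to the generic handling.
>>             TypeInformation<FData> fdataInfo =
>>                 TypeExtractor.getForClass(FData.class);
>>             System.out.println(fdataInfo.isBasicType());   // false
>>             System.out.println(fdataInfo.isTupleType());   // false
>>         }
>>     }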
>>
>>
>>     A few remarks on how best to use the system:
>>       - Flink always keeps data in serialized form. That allows it to
>>     operate very robustly with respect to memory pressure, spilling, etc.
>>
>>       - One way we gain efficiency is by accessing only what is really
>>     necessary in that binary data (only the parts that make up the key,
>>     for example).
>>
>>       - If you define the entire object to be the key, you prevent the
>>     system from applying these optimizations. If you can define which
>>     part is the key, you allow it to operate more efficiently (see the
>>     sketch of the contrast below).
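>>
>>     A minimal sketch of that contrast, reusing the FData class and
>>     GroupbyReducer from the example below ("data" is the DataSet<FData>
>>     from there, and KeySelector comes from
>>     org.apache.flink.api.java.functions):
>>
>>     // Grouping on a declared field: Flink can compare keys by reading
>>     // only the "theString" part of the serialized records.
>>     data.groupBy("theString").reduceGroup(new GroupbyReducer()).print();
>>
>>     // Grouping on the whole object via a key selector: every record
>>     // must be fully deserialized and compared, which gives up those
>>     // optimizations (FData must be Comparable for this to work).
>>     data.groupBy(new KeySelector<FData, FData>() {
>>         @Override
>>         public FData getKey(FData value) {
>>             return value;
>>         }
>>     }).reduceGroup(new GroupbyReducer()).print();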
>>
>>
>>
>>     ===================================
>>
>>     Example of POJOs and Expression Keys
>>
>>     public static void main(String[] args) throws Exception {
>>         ExecutionEnvironment env =
>>             ExecutionEnvironment.getExecutionEnvironment();
>>         DataSet<FData> data = env.fromElements(
>>             new FData("some"), new FData("test"), new FData("POJOs"));
>>
>>         // Group by the POJO field "theString" using an expression key.
>>         data.groupBy("theString").reduceGroup(new GroupbyReducer()).print();
>>
>>         env.execute();
>>     }
>>
>>     public static class FData implements Serializable, Comparable<FData> {
>>         public String theString;
>>
>>         public FData() {
>>             theString = "";
>>         }
>>
>>         public FData(String data) {
>>             this.theString = data;
>>         }
>>
>>         @Override
>>         public int compareTo(FData x) {
>>             return theString.compareTo(x.theString);
>>         }
>>
>>         @Override
>>         public String toString() {
>>             return theString;
>>         }
>>     }
>>
>>     public static final class GroupbyReducer extends
>>             GroupReduceFunction<FData, FData> {
>>         @Override
>>         public void reduce(Iterator<FData> values, Collector<FData> out) {
>>             // Forward every element of the group unchanged.
>>             while (values.hasNext()) {
>>                 out.collect(values.next());
>>             }
>>         }
>>     }
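>>
>>     With the fix, running the main method above groups the three FData
>>     elements by their theString values ("some", "test", "POJOs"), so the
>>     reducer sees three singleton groups and prints each element once.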
>>
>>
>>
>