Posted to user@flink.apache.org by Ashwin Jayaprakash <as...@gmail.com> on 2015/08/17 15:30:48 UTC

Self-join with filter

Hi, I'm trying to evaluate Flink to see if it can do efficient semi-joins
or self-joins with a filter.

Problem description:
I have one stream that can contain "near duplicate" records. The records
share a "family name", so many records can have the same family name.
But each record also has a unique id.

Query:
1) I would like to run a query such that I get the ids and names of some
records based on a criterion.

2) I then want to fetch all the records that match the set of ids fetched
in step 1, but this time including the duplicates that share the same name.

Question 1:
I've pasted the sample code below, but I'm trying to see if I can push all
the ids that came out of the first filter step "into" the connector,
partition by partition and then pull out only the matching records in step
2. Is there a way to do this?

Question 2:
I haven't seen many examples of non-Hadoop connectors. Is there any plan
for those?

(If you are curious, I was looking at Presto and had to post the question
there also -
https://groups.google.com/forum/#!topic/presto-users/Ns0q4pvHwfo)

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.util.Collector;

public class JoinExample {
    static final int AGE_ADD_CONSTANT = 100;

    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Record> allRecords = env.fromElements(
                new Record("aa", 1), new Record("bb", 2), new Record("ee", 3), new Record("ww", 23),
                new Record("cc", 3),
                new Record("dd", 1), new Record("bb", 12), new Record("aa", 6), new Record("aa", 33),
                new Record("ff", 87));
        allRecords.print();
        System.out.println();

        // Step 1: filter by a criterion, then merge records that share an id.
        DataSet<Record> newerRecords = allRecords
                .filter(new FilterFunction<Record>() {
                    @Override
                    public boolean filter(Record user) throws Exception {
                        return user.age < 10;
                    }
                })
                .groupBy(new KeySelector<Record, String>() {
                    @Override
                    public String getKey(Record user) throws Exception {
                        return user.id;
                    }
                })
                .combineGroup(new GroupCombineFunction<Record, Record>() {
                    @Override
                    public void combine(Iterable<Record> iterable, Collector<Record> collector) throws Exception {
                        int age = 0;
                        String id = null;
                        for (Record user : iterable) {
                            if (id == null) {
                                id = user.id;
                            }
                            //Just some way to fake a merge.
                            age = (age * AGE_ADD_CONSTANT) + user.age;
                        }
                        collector.collect(new Record(id, age));
                    }
                });
        newerRecords.print();
        System.out.println();

        // Step 2: self-join the filtered ids back against all records.
        DataSet<Record> multiAgeRecords = allRecords
                /*
                If this were another data set, would it have been possible to push all newerRecord ids
                partition by partition as multiple filters, without having to do an explicit join?
                 */
                .join(newerRecords)
                .where(new KeySelector<Record, String>() {
                    @Override
                    public String getKey(Record user) throws Exception {
                        return user.id;
                    }
                }).equalTo(new KeySelector<Record, String>() {
                    @Override
                    public String getKey(Record user) throws Exception {
                        return user.id;
                    }
                })
                .with(new FlatJoinFunction<Record, Record, Record>() {
                    @Override
                    public void join(Record user, Record user2, Collector<Record> collector) throws Exception {
                        collector.collect(new Record(user.id, user.age));
                    }
                })
                .groupBy(new KeySelector<Record, String>() {
                    @Override
                    public String getKey(Record user) throws Exception {
                        return user.id;
                    }
                })
                .combineGroup(new GroupCombineFunction<Record, Record>() {
                    @Override
                    public void combine(Iterable<Record> iterable, Collector<Record> collector) throws Exception {
                        int age = 0;
                        String id = null;
                        for (Record user : iterable) {
                            if (id == null) {
                                id = user.id;
                            }
                            //Just some way to fake a merge.
                            age = (age * AGE_ADD_CONSTANT) + user.age;
                        }
                        collector.collect(new Record(id, age));
                    }
                });

        multiAgeRecords.print();
    }

    static class Record {
        String id;
        int age;

        Record(String id, int age) {
            this.id = id;
            this.age = age;
        }

        @Override
        public String toString() {
            return "{" + "id='" + id + '\'' + ", age=" + age + '}';
        }
    }
}



Thanks.

Re: Self-join with filter

Posted by Stephan Ewen <se...@apache.org>.
Hi!

I am not 100% sure that I understand your question completely, but I'll
give it my best shot.

If you want to push IDs into the connector, I assume you mean that you use
some form of connector that can filter by ID directly in the low-level data
access paths, in order to read as little data as possible. In some sense,
this is what databases do with an indexed fetch.

I think there are two ways to go about this:

(1) If the set of IDs is very small, just collect it at the client and pass
it as a parameter of the second data access:

DataSet<Integer> ids = ... // your code to extract the IDs from newerRecords
List<Integer> idList = ids.collect();

env.createInput(new MyFilteringParquetFormat(tableSpec, idList)); // this lets Parquet search for the IDs
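
For illustration, here is a rough sketch of what such a filtering format
could look like, built on Flink's GenericInputFormat. Everything named here
(IdFilteringInputFormat, openUnderlyingReader) is hypothetical, and I use
String ids to match the Record class from your example; a real
MyFilteringParquetFormat would follow the same pattern but push the
predicate into the Parquet reader itself instead of filtering after
deserialization:

import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

import org.apache.flink.api.common.io.GenericInputFormat;
import org.apache.flink.core.io.GenericInputSplit;

// Hypothetical: an input format parameterized with the collected IDs that
// emits only the matching records ("Record" is the class from the example).
public class IdFilteringInputFormat extends GenericInputFormat<Record> {

    private final Set<String> ids;
    private transient Iterator<Record> source;
    private transient Record nextMatch;

    public IdFilteringInputFormat(List<String> ids) {
        this.ids = new HashSet<>(ids);
    }

    @Override
    public void open(GenericInputSplit split) throws IOException {
        super.open(split);
        source = openUnderlyingReader(split); // connector-specific, hypothetical
        advance();
    }

    @Override
    public boolean reachedEnd() {
        return nextMatch == null;
    }

    @Override
    public Record nextRecord(Record reuse) {
        Record current = nextMatch;
        advance();
        return current;
    }

    // Skip forward to the next record whose id is in the pushed-down set.
    private void advance() {
        nextMatch = null;
        while (source.hasNext()) {
            Record candidate = source.next();
            if (ids.contains(candidate.id)) {
                nextMatch = candidate;
                return;
            }
        }
    }

    private Iterator<Record> openUnderlyingReader(GenericInputSplit split) {
        // Placeholder: open the actual storage behind this split.
        throw new UnsupportedOperationException("connector-specific");
    }
}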


(2) If the set is not so small, you can custom-partition it and run custom
fetch code. Something like this:

DataSet<Integer> ids = ... // your code to get the IDs
ids
    .partitionCustom(new MyPartitionerThatIsAwareOfTheStoragePartitioning(),
            new KeySelector<Integer, Integer>() {
                @Override
                public Integer getKey(Integer id) {
                    return id;
                }
            })
    .mapPartition(new RichMapPartitionFunction<Integer, Record>() {

        @Override
        public void mapPartition(Iterable<Integer> values, Collector<Record> out) {
            // Materialize the IDs that were routed to this partition.
            List<Integer> localIds = new ArrayList<>();
            for (Integer id : values) {
                localIds.add(id);
            }
            int partition = getRuntimeContext().getIndexOfThisSubtask();

            // Reader / MyReader stand for your connector-specific fetch code.
            Reader readerForPartition = new MyReader(partition);
            for (Record r : readerForPartition.query(localIds)) {
                out.collect(r);
            }
        }
    })
    .print();
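
The partitioner itself only has to mirror how the storage system assigns
IDs to its partitions. A minimal, hypothetical sketch, assuming the store
partitions by a hash of the ID:

import org.apache.flink.api.common.functions.Partitioner;

// Hypothetical: route each ID to the subtask responsible for the storage
// partition that holds it. The formula must mirror the storage system's
// own partitioning scheme, whatever that is.
public class MyPartitionerThatIsAwareOfTheStoragePartitioning implements Partitioner<Integer> {

    @Override
    public int partition(Integer key, int numPartitions) {
        // Mask the sign bit so the result is always non-negative.
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}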

Hope that helps!

Concerning your question (2) about non-Hadoop connectors, Flink packages
the following by default:
  - HBase
  - JDBC
  - Java Collections / Iterators
  - HCatalog

  - Kafka (streaming)
  - RabbitMQ (streaming)
  - Flume (streaming)

  - Also, all Hadoop connectors can be used (via the Hadoop compatibility
functions), giving you access to Hadoop's MongoDB, Cassandra, etc.
connectors. A sketch follows below.
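
As a rough sketch of that Hadoop compatibility route (the format and path
here are stand-ins for illustration, e.g. for a MongoDB or Cassandra
InputFormat), a Hadoop MapReduce InputFormat can be wrapped like this:

import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

// Wrap a Hadoop MapReduce InputFormat so Flink can read through it.
Job job = Job.getInstance();
FileInputFormat.addInputPath(job, new Path("hdfs:///path/to/data")); // hypothetical path
HadoopInputFormat<LongWritable, Text> hadoopIF =
        new HadoopInputFormat<>(new TextInputFormat(), LongWritable.class, Text.class, job);
DataSet<Tuple2<LongWritable, Text>> input = env.createInput(hadoopIF);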

Was that what you were referring to, or did I misunderstand your question?



Some other comments concerning your code:

  - The "combineGroup" function does not group all elements with that key
together, but only those of the current partition. You probably want the
"reduceGroup()" function.

  - If you make your type "Record" a POJO (public constructor, public
fields or public accessors), then you can use field names for the keys.

  - Tuples are still the fastest type in Flink, so if you care about
performance big time, make your "Record" class a subclass of Tuple2, and
use tuple positions as keys
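
For the first two points, here is a sketch of what step 1 of your program
could look like (this assumes Record is turned into a public POJO as
described above; the merge logic is your original "fake merge"):

import org.apache.flink.api.common.functions.GroupReduceFunction;

// reduceGroup() sees ALL records with the same key in one call, whereas
// combineGroup() only pre-aggregates the records of the current partition.
DataSet<Record> merged = allRecords
        .groupBy("id") // field name instead of a KeySelector, possible for POJOs
        .reduceGroup(new GroupReduceFunction<Record, Record>() {
            @Override
            public void reduce(Iterable<Record> records, Collector<Record> out) throws Exception {
                int age = 0;
                String id = null;
                for (Record r : records) {
                    if (id == null) {
                        id = r.id;
                    }
                    // Just some way to fake a merge (as in the original).
                    age = (age * 100) + r.age;
                }
                out.collect(new Record(id, age));
            }
        });

And if you go the Tuple2 route instead, Record would extend
Tuple2<String, Integer> and the grouping becomes groupBy(0).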


Greetings,
Stephan


Re: Self-join with filter

Posted by Ashwin Jayaprakash <as...@gmail.com>.
Stephan, this is exactly what I was looking for :) Thanks, will try it out.

I knew the combineGroup() really called for a reduceGroup() too; I was just
trying out the APIs.

I did not realize that the streaming connectors were already available. I
will have a look.

Thanks again.


