Posted to dev@flink.apache.org by Martin Junghanns <m....@mailbox.org> on 2015/10/28 13:15:00 UTC
Fast strategy for intersect
Hi all!
While working on FLINK-2905, I was wondering what a good (and fast) way
to compute the intersection of two data sets (Gelly vertices in my
case) of unknown size would be.
I came up with three ways to solve this:
Consider two sets:
DataSet<Vertex<K, VV>> verticesLeft = this.getVertices();
DataSet<Vertex<K, VV>> verticesRight = graph.getVertices();
Way 1 (join)
intersectVertices = verticesLeft.join(verticesRight)
    .where(0)
    .equalTo(0)
    .with(new JoinFunction<Vertex<K, VV>, /* .. */>() {
        @Override
        public Vertex<K, VV> join(Vertex<K, VV> first, Vertex<K, VV> second)
                throws Exception {
            return first;
        }
    });
Way 2 (coGroup)
intersectVertices = verticesLeft.coGroup(verticesRight)
    .where(0)
    .equalTo(0)
    .with(new CoGroupFunction<Vertex<K, VV>, /* .. */ >() {
        @Override
        public void coGroup(Iterable<Vertex<K, VV>> first,
                Iterable<Vertex<K, VV>> second,
                Collector<Vertex<K, VV>> out) throws Exception {
            Iterator<Vertex<K, VV>> leftIt = first.iterator();
            Iterator<Vertex<K, VV>> rightIt = second.iterator();
            // emit the left vertex only if the id occurs on both sides
            if (leftIt.hasNext() && rightIt.hasNext()) {
                out.collect(leftIt.next());
            }
        }
    });
Way 3 (union + groupBy + aggregate)
intersectVertices = verticesLeft.union(verticesRight)
    // attach a count of 1 to every vertex
    .map(new MapFunction<Vertex<K, VV>, Tuple3<K, VV, Integer>>() {
        @Override
        public Tuple3<K, VV, Integer> map(Vertex<K, VV> vertex)
                throws Exception {
            return new Tuple3<>(vertex.f0, vertex.f1, 1);
        }
    }).withForwardedFields("f0;f1")
    .groupBy(0) // vertex id
    .aggregate(Aggregations.SUM, 2)
    // a count of 2 means the id occurred in both inputs
    .filter(new FilterFunction<Tuple3<K, VV, Integer>>() {
        @Override
        public boolean filter(Tuple3<K, VV, Integer> value) {
            return value.f2 == 2;
        }
    })
    // drop the count again
    .map(new MapFunction<Tuple3<K, VV, Integer>, Vertex<K, VV>>() {
        @Override
        public Vertex<K, VV> map(Tuple3<K, VV, Integer> vertexWithAggregate) {
            return new Vertex<>(vertexWithAggregate.f0, vertexWithAggregate.f1);
        }
    }).withForwardedFields("f0;f1");
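For illustration, the counting idea behind Way 3 can be sketched on plain Java collections, without Flink. The class and method names (CountingIntersect, intersectByCount) are made up for this sketch, and vertex ids are assumed to be Long values. It also shows that a count of 2 only means "present on both sides" if each id occurs at most once per input.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper, not part of Flink or Gelly.
final class CountingIntersect {

    // Union both inputs, count occurrences per id, keep ids counted exactly twice.
    static Set<Long> intersectByCount(List<Long> left, List<Long> right) {
        Map<Long, Integer> counts = new HashMap<>();
        for (Long id : left) {
            counts.merge(id, 1, Integer::sum);
        }
        for (Long id : right) {
            counts.merge(id, 1, Integer::sum);
        }
        Set<Long> result = new TreeSet<>();
        for (Map.Entry<Long, Integer> entry : counts.entrySet()) {
            if (entry.getValue() == 2) {
                result.add(entry.getKey());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Unique ids per side: ids 2 and 3 occur in both inputs.
        System.out.println(intersectByCount(
                Arrays.asList(1L, 2L, 3L), Arrays.asList(2L, 3L, 4L)));

        // Assumption violated: id 7 occurs twice on the left and never on the
        // right, yet it still reaches a count of 2 and is reported.
        System.out.println(intersectByCount(
                Arrays.asList(7L, 7L), Collections.<Long>emptyList()));
    }
}
```

So the sketch only matches the join and coGroup variants as long as the ids are unique within each input, which should hold for the vertex sets of a graph.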
Thanks for your input.
Best,
Martin
Re: Fast strategy for intersect
Posted by Fabian Hueske <fh...@gmail.com>.
I would go for the first solution with the join.
This gives the engine the highest degree of freedom:
- shipping strategy: repartition vs. broadcast-forward
- local strategy: sort-merge vs. hash-join
Best, Fabian
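As a rough single-machine illustration of the sort-merge vs. hash-join choice mentioned above, the following sketch intersects two arrays of ids both ways. It is not Flink's implementation; the class and method names are made up, ids are assumed to be unique within each input, and the repartition vs. broadcast choice (how data is shipped between workers) is not modelled at all.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical toy versions of the two local strategies.
final class LocalJoinStrategies {

    // Hash strategy: build a hash table from one side, probe it with the other.
    // Usually attractive when the build side is small enough to fit in memory.
    static List<Long> hashIntersect(long[] build, long[] probe) {
        Set<Long> table = new HashSet<>();
        for (long id : build) {
            table.add(id);
        }
        List<Long> out = new ArrayList<>();
        for (long id : probe) {
            if (table.contains(id)) {
                out.add(id);
            }
        }
        return out;
    }

    // Sort-merge strategy: sort both sides, then advance two cursors in lockstep.
    static List<Long> sortMergeIntersect(long[] left, long[] right) {
        long[] l = left.clone();
        long[] r = right.clone();
        Arrays.sort(l);
        Arrays.sort(r);
        List<Long> out = new ArrayList<>();
        int i = 0;
        int j = 0;
        while (i < l.length && j < r.length) {
            if (l[i] < r[j]) {
                i++;
            } else if (l[i] > r[j]) {
                j++;
            } else {
                out.add(l[i]);
                i++;
                j++;
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long[] left = {5L, 1L, 3L, 9L};
        long[] right = {3L, 4L, 5L, 6L};
        System.out.println(hashIntersect(left, right));      // ids in probe order
        System.out.println(sortMergeIntersect(left, right)); // ids in sorted order
    }
}
```

Both methods return the same ids. Which one is cheaper depends on input sizes and on whether an input is already sorted, and with a plain join that decision can be left to the engine.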
2015-10-28 18:45 GMT+01:00 Vasiliki Kalavri <va...@gmail.com>:
> Hi Martin,
>
> isn't finding the intersection of edges enough in this case?
> And assuming there are no duplicate edges, I believe a join should do the
> trick.
>
> Cheers,
> -Vasia.
Re: Fast strategy for intersect
Posted by Vasiliki Kalavri <va...@gmail.com>.
Hi Martin,
isn't finding the intersection of edges enough in this case?
And assuming there are no duplicate edges, I believe a join should do the
trick.
Cheers,
-Vasia.
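To make the "no duplicate edges" assumption concrete: an equi-join emits one record per matching pair, so duplicates on both sides multiply in the output. The sketch below shows this with a naive nested-loop join on plain Java lists. The names are made up for illustration and edges are encoded as simple strings, which is not how Gelly represents them.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of Flink or Gelly.
final class JoinDuplicates {

    // Naive equi-join: emit the left element once for every equal right element.
    static List<String> equiJoin(List<String> left, List<String> right) {
        List<String> out = new ArrayList<>();
        for (String l : left) {
            for (String r : right) {
                if (l.equals(r)) {
                    out.add(l);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // No duplicates: the join result is exactly the intersection.
        System.out.println(equiJoin(
                Arrays.asList("1->2", "2->3"), Arrays.asList("2->3", "3->4")));

        // Edge "2->3" twice on each side: 2 x 2 = 4 output records.
        System.out.println(equiJoin(
                Arrays.asList("2->3", "2->3"), Arrays.asList("2->3", "2->3")).size());
    }
}
```

If duplicates can occur, the join result would need an extra deduplication step, or a coGroup that emits at most one record per key.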