Posted to dev@flink.apache.org by Martin Junghanns <m....@mailbox.org> on 2015/10/28 13:15:00 UTC

Fast strategy for intersect

Hi all!

While working on FLINK-2905, I was wondering what a good (and fast) way
to compute the intersection of two data sets (Gelly vertices in my
case) of unknown size would be.

I came up with three ways to solve this:

Consider two sets:

DataSet<Vertex<K, VV>> verticesLeft =  this.getVertices();
DataSet<Vertex<K, VV>> verticesRight = graph.getVertices();

Way 1 (join)

intersectVertices = verticesLeft.join(verticesRight)
  .where(0)   // vertex id of the left input
  .equalTo(0) // vertex id of the right input
  .with(new JoinFunction<Vertex<K, VV>, /* .. */>() {
    @Override
    public Vertex<K, VV> join(Vertex<K, VV> first, Vertex<K, VV> second)
        throws Exception {
      // Both inputs contain this id, so keep the left vertex.
      return first;
    }
  });

Way 2 (coGroup)

intersectVertices = verticesLeft.coGroup(verticesRight)
  .where(0)
  .equalTo(0)
  .with(new CoGroupFunction<Vertex<K, VV>, /* .. */>() {
    @Override
    public void coGroup(Iterable<Vertex<K, VV>> first,
        Iterable<Vertex<K, VV>> second,
        Collector<Vertex<K, VV>> out) throws Exception {
      Iterator<Vertex<K, VV>> leftIt = first.iterator();
      Iterator<Vertex<K, VV>> rightIt = second.iterator();
      // Emit one vertex per id that occurs in both inputs.
      if (leftIt.hasNext() && rightIt.hasNext()) {
        out.collect(leftIt.next());
      }
    }
  });

Way 3 (union + groupBy + aggregate)

intersectVertices = verticesLeft.union(verticesRight)
  // Attach a count of 1 to every vertex.
  .map(new MapFunction<Vertex<K, VV>, Tuple3<K, VV, Integer>>() {
    @Override
    public Tuple3<K, VV, Integer> map(Vertex<K, VV> vertex)
        throws Exception {
      return new Tuple3<>(vertex.f0, vertex.f1, 1);
    }
  }).withForwardedFields("f0;f1")
  .groupBy(0) // vertex id
  .aggregate(Aggregations.SUM, 2)
  // A sum of 2 marks an id seen in both inputs, provided ids are
  // unique within each input.
  .filter(new FilterFunction<Tuple3<K, VV, Integer>>() {
    @Override
    public boolean filter(Tuple3<K, VV, Integer> value) {
      return value.f2 == 2;
    }
  })
  .map(new MapFunction<Tuple3<K, VV, Integer>, Vertex<K, VV>>() {
    @Override
    public Vertex<K, VV> map(Tuple3<K, VV, Integer> vertexWithAggregate) {
      return new Vertex<>(vertexWithAggregate.f0, vertexWithAggregate.f1);
    }
  }).withForwardedFields("f0;f1");

Thanks for your input.

Best,

Martin
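
The counting idea behind Way 3 can be tried out without a cluster. The
following is a minimal plain-Java sketch, not Flink code: the class
UnionCountSketch and the method intersectByCount are hypothetical names
introduced only for illustration, and vertices are reduced to their
Long ids. It counts how often each id occurs across both inputs and
keeps the ids whose count is exactly 2.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, for illustration only (not part of Flink or Gelly).
final class UnionCountSketch {

    /**
     * Mirrors union + groupBy + SUM + filter(count == 2) on in-memory ids.
     * Assumption: the result is only a true intersection if ids are
     * unique within each input list.
     */
    static List<Long> intersectByCount(List<Long> left, List<Long> right) {
        // TreeMap keeps the ids sorted so the output order is deterministic.
        Map<Long, Integer> counts = new TreeMap<>();
        for (Long id : left) {
            counts.merge(id, 1, Integer::sum);
        }
        for (Long id : right) {
            counts.merge(id, 1, Integer::sum);
        }
        List<Long> result = new ArrayList<>();
        for (Map.Entry<Long, Integer> entry : counts.entrySet()) {
            if (entry.getValue() == 2) {
                result.add(entry.getKey());
            }
        }
        return result;
    }
}
```

For the inputs [1, 2, 3] and [2, 3, 4] this returns [2, 3]. For [1, 1]
and [2] it returns [1], although 1 is missing from the second input,
which is why the count-based approach depends on unique ids per input.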




Re: Fast strategy for intersect

Posted by Fabian Hueske <fh...@gmail.com>.
I would go for the first solution with the join.

This gives the engine the highest degree of freedom in choosing an
execution strategy:
- shipping strategy: repartition vs. broadcast-forward
- local strategy: sort-merge vs. hash join

Best, Fabian
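
To make the sort-merge vs. hash-join choice mentioned above concrete,
here is a minimal plain-Java sketch of both local strategies on
in-memory id lists. It is not how Flink implements them, it ignores the
repartition vs. broadcast question entirely, and the class and method
names (LocalJoinStrategySketch, hashIntersect, sortMergeIntersect) are
hypothetical. It assumes ids are unique within each input.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper, for illustration only (not Flink's implementation).
final class LocalJoinStrategySketch {

    /** Hash strategy: build a hash table on one side, probe it with the other. */
    static List<Long> hashIntersect(List<Long> build, List<Long> probe) {
        Set<Long> table = new HashSet<>(build);
        List<Long> out = new ArrayList<>();
        for (Long id : probe) {
            if (table.contains(id)) {
                out.add(id); // emitted in probe-side order
            }
        }
        return out;
    }

    /** Sort-merge strategy: sort both sides, then advance two cursors together. */
    static List<Long> sortMergeIntersect(List<Long> left, List<Long> right) {
        List<Long> l = new ArrayList<>(left);
        List<Long> r = new ArrayList<>(right);
        Collections.sort(l);
        Collections.sort(r);
        List<Long> out = new ArrayList<>();
        int i = 0;
        int j = 0;
        while (i < l.size() && j < r.size()) {
            int cmp = l.get(i).compareTo(r.get(j));
            if (cmp < 0) {
                i++;            // left id is smaller, no partner on the right
            } else if (cmp > 0) {
                j++;            // right id is smaller, no partner on the left
            } else {
                out.add(l.get(i)); // id present on both sides
                i++;
                j++;
            }
        }
        return out;
    }
}
```

Hashing tends to pay off when one side is small enough to fit in
memory, while sort-merge can reuse an existing sort order; a join
leaves that decision open, which is the freedom referred to above.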

2015-10-28 18:45 GMT+01:00 Vasiliki Kalavri <va...@gmail.com>:

> Hi Martin,
>
> isn't finding the intersection of edges enough in this case?
> And assuming there are no duplicate edges, I believe a join should do the
> trick.
>
> Cheers,
> -Vasia.

Re: Fast strategy for intersect

Posted by Vasiliki Kalavri <va...@gmail.com>.
Hi Martin,

isn't finding the intersection of edges enough in this case?
And assuming there are no duplicate edges, I believe a join should do the
trick.

Cheers,
-Vasia.
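
The "no duplicate edges" assumption matters because an equi-join emits
one result per matching pair. The following plain-Java sketch shows the
effect on in-memory keys; it is not Flink code, the names
JoinDuplicatesSketch, joinKeepLeft and joinDistinct are hypothetical,
and edges are represented as strings such as "a->b" purely for
illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;

// Hypothetical helper, for illustration only (not part of Flink or Gelly).
final class JoinDuplicatesSketch {

    /**
     * Naive equi-join that returns the left element once per matching
     * pair, similar to a join function that returns its first argument.
     */
    static <T> List<T> joinKeepLeft(List<T> left, List<T> right) {
        List<T> out = new ArrayList<>();
        for (T l : left) {
            for (T r : right) {
                if (l.equals(r)) {
                    out.add(l);
                }
            }
        }
        return out;
    }

    /** Same join, followed by a de-duplication step that keeps first-seen order. */
    static <T> List<T> joinDistinct(List<T> left, List<T> right) {
        return new ArrayList<>(new LinkedHashSet<>(joinKeepLeft(left, right)));
    }
}
```

With duplicate-free inputs both methods return the same list. If
"a->b" occurs twice on each side, joinKeepLeft returns it four times,
whereas joinDistinct returns it once.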
