You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by va...@apache.org on 2015/10/09 18:05:59 UTC
[21/24] flink git commit: [FLINK-2833] [gelly] create a
flink-libraries module and move gelly there
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
new file mode 100755
index 0000000..b24f749
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -0,0 +1,1948 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.List;
+import java.util.Arrays;
+
+import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.graph.gsa.ApplyFunction;
+import org.apache.flink.graph.gsa.GSAConfiguration;
+import org.apache.flink.graph.gsa.GatherFunction;
+import org.apache.flink.graph.gsa.GatherSumApplyIteration;
+import org.apache.flink.graph.gsa.SumFunction;
+import org.apache.flink.graph.spargel.MessagingFunction;
+import org.apache.flink.graph.spargel.VertexCentricConfiguration;
+import org.apache.flink.graph.spargel.VertexCentricIteration;
+import org.apache.flink.graph.spargel.VertexUpdateFunction;
+import org.apache.flink.graph.utils.EdgeToTuple3Map;
+import org.apache.flink.graph.utils.Tuple2ToVertexMap;
+import org.apache.flink.graph.utils.Tuple3ToEdgeMap;
+import org.apache.flink.graph.utils.VertexToTuple2Map;
+import org.apache.flink.graph.validation.GraphValidator;
+import org.apache.flink.util.Collector;
+import org.apache.flink.types.NullValue;
+
+/**
+ * Represents a Graph consisting of {@link Edge edges} and {@link Vertex
+ * vertices}.
+ *
+ *
+ * @see org.apache.flink.graph.Edge
+ * @see org.apache.flink.graph.Vertex
+ *
+ * @param <K> the key type for edge and vertex identifiers
+ * @param <VV> the value type for vertices
+ * @param <EV> the value type for edges
+ */
+@SuppressWarnings("serial")
+public class Graph<K, VV, EV> {
+
+ /** The execution environment this graph was created with; returned by {@link #getContext()}. */
+ private final ExecutionEnvironment context;
+ /** The DataSet holding this graph's vertices; returned by {@link #getVertices()}. */
+ private final DataSet<Vertex<K, VV>> vertices;
+ /** The DataSet holding this graph's edges; returned by {@link #getEdges()}. */
+ private final DataSet<Edge<K, EV>> edges;
+
+	/**
+	 * Wraps an existing vertex DataSet and edge DataSet into a graph.
+	 * The constructor is private; graphs are obtained through the static factory methods.
+	 *
+	 * @param vertices the DataSet holding the vertices of the graph.
+	 * @param edges the DataSet holding the edges of the graph.
+	 * @param context the Flink execution environment of the graph.
+	 */
+	private Graph(DataSet<Vertex<K, VV>> vertices, DataSet<Edge<K, EV>> edges, ExecutionEnvironment context) {
+		this.context = context;
+		this.edges = edges;
+		this.vertices = vertices;
+	}
+
+	/**
+	 * Creates a graph from a Collection of vertices and a Collection of edges.
+	 * Both collections are turned into DataSets of the given execution environment.
+	 *
+	 * @param vertices a Collection of vertices.
+	 * @param edges a Collection of edges.
+	 * @param context the flink execution environment.
+	 * @return the newly created graph.
+	 */
+	public static <K, VV, EV> Graph<K, VV, EV> fromCollection(Collection<Vertex<K, VV>> vertices,
+			Collection<Edge<K, EV>> edges, ExecutionEnvironment context) {
+
+		DataSet<Vertex<K, VV>> vertexDataSet = context.fromCollection(vertices);
+		DataSet<Edge<K, EV>> edgeDataSet = context.fromCollection(edges);
+		return fromDataSet(vertexDataSet, edgeDataSet, context);
+	}
+
+	/**
+	 * Creates a graph from a Collection of edges; the vertices are induced from the
+	 * edges. Every distinct source or target id becomes a vertex whose value is set to
+	 * {@link NullValue}.
+	 *
+	 * @param edges a Collection of edges.
+	 * @param context the flink execution environment.
+	 * @return the newly created graph.
+	 */
+	public static <K, EV> Graph<K, NullValue, EV> fromCollection(Collection<Edge<K, EV>> edges,
+			ExecutionEnvironment context) {
+
+		return fromDataSet(context.fromCollection(edges), context);
+	}
+
+	/**
+	 * Creates a graph from a Collection of edges. The vertex set is induced from the
+	 * edges: every distinct id appearing as a source or target becomes a vertex, and
+	 * its value is computed by applying the given mapper to that id.
+	 *
+	 * @param edges a Collection of edges.
+	 * @param mapper the function computing a vertex value from a vertex id.
+	 * @param context the flink execution environment.
+	 * @return the newly created graph.
+	 */
+	public static <K, VV, EV> Graph<K, VV, EV> fromCollection(Collection<Edge<K, EV>> edges,
+			final MapFunction<K, VV> mapper, ExecutionEnvironment context) {
+
+		DataSet<Edge<K, EV>> edgeDataSet = context.fromCollection(edges);
+		return fromDataSet(edgeDataSet, mapper, context);
+	}
+
+	/**
+	 * Creates a graph whose vertices and edges are exactly the given DataSets.
+	 * No consistency check is performed here; use {@link #validate(GraphValidator)} for that.
+	 *
+	 * @param vertices a DataSet of vertices.
+	 * @param edges a DataSet of edges.
+	 * @param context the flink execution environment.
+	 * @return the newly created graph.
+	 */
+	public static <K, VV, EV> Graph<K, VV, EV> fromDataSet(DataSet<Vertex<K, VV>> vertices,
+			DataSet<Edge<K, EV>> edges, ExecutionEnvironment context) {
+
+		final Graph<K, VV, EV> graph = new Graph<K, VV, EV>(vertices, edges, context);
+		return graph;
+	}
+
+	/**
+	 * Creates a graph from a DataSet of edges; the vertices are induced from the edges.
+	 * Every distinct source or target id becomes a vertex with a {@link NullValue} value.
+	 *
+	 * @param edges a DataSet of edges.
+	 * @param context the flink execution environment.
+	 * @return the newly created graph.
+	 */
+	public static <K, EV> Graph<K, NullValue, EV> fromDataSet(
+			DataSet<Edge<K, EV>> edges, ExecutionEnvironment context) {
+
+		// one vertex per edge endpoint, de-duplicated
+		DataSet<Vertex<K, NullValue>> inducedVertices = edges
+				.flatMap(new EmitSrcAndTarget<K, EV>())
+				.distinct();
+
+		return new Graph<K, NullValue, EV>(inducedVertices, edges, context);
+	}
+
+	/**
+	 * Emits one vertex for the source and one vertex for the target of each edge,
+	 * both carrying {@link NullValue} as their value.
+	 */
+	private static final class EmitSrcAndTarget<K, EV> implements FlatMapFunction<
+			Edge<K, EV>, Vertex<K, NullValue>> {
+
+		@Override
+		public void flatMap(Edge<K, EV> edge, Collector<Vertex<K, NullValue>> out) {
+			final NullValue noValue = NullValue.getInstance();
+			out.collect(new Vertex<K, NullValue>(edge.f0, noValue));
+			out.collect(new Vertex<K, NullValue>(edge.f1, noValue));
+		}
+	}
+
+ /**
+ * Creates a graph from a DataSet of edges, vertices are induced from the
+ * edges and vertex values are calculated by a mapper function. Vertices are
+ * created automatically and their values are set by applying the provided
+ * map function to the vertex ids.
+ *
+ * @param edges a DataSet of edges.
+ * @param mapper the mapper function.
+ * @param context the flink execution environment.
+ * @return the newly created graph.
+ */
+ public static <K, VV, EV> Graph<K, VV, EV> fromDataSet(DataSet<Edge<K, EV>> edges,
+ final MapFunction<K, VV> mapper, ExecutionEnvironment context) {
+
+ TypeInformation<K> keyType = ((TupleTypeInfo<?>) edges.getType()).getTypeAt(0);
+
+ TypeInformation<VV> valueType = TypeExtractor.createTypeInfo(
+ MapFunction.class, mapper.getClass(), 1, null, null);
+
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ TypeInformation<Vertex<K, VV>> returnType = (TypeInformation<Vertex<K, VV>>) new TupleTypeInfo(
+ Vertex.class, keyType, valueType);
+
+ DataSet<Vertex<K, VV>> vertices = edges
+ .flatMap(new EmitSrcAndTargetAsTuple1<K, EV>()).distinct()
+ .map(new MapFunction<Tuple1<K>, Vertex<K, VV>>() {
+ public Vertex<K, VV> map(Tuple1<K> value) throws Exception {
+ return new Vertex<K, VV>(value.f0, mapper.map(value.f0));
+ }
+ }).returns(returnType).withForwardedFields("f0");
+
+ return new Graph<K, VV, EV>(vertices, edges, context);
+ }
+
+ private static final class EmitSrcAndTargetAsTuple1<K, EV> implements FlatMapFunction<
+ Edge<K, EV>, Tuple1<K>> {
+
+ public void flatMap(Edge<K, EV> edge, Collector<Tuple1<K>> out) {
+ out.collect(new Tuple1<K>(edge.f0));
+ out.collect(new Tuple1<K>(edge.f1));
+ }
+ }
+
+	/**
+	 * Creates a graph from DataSets of tuples: vertices (with values) are created from
+	 * {@link Tuple2} elements and edges (with values) from {@link Tuple3} elements.
+	 *
+	 * @param vertices a DataSet of Tuple2.
+	 * @param edges a DataSet of Tuple3.
+	 * @param context the flink execution environment.
+	 * @return the newly created graph.
+	 */
+	public static <K, VV, EV> Graph<K, VV, EV> fromTupleDataSet(DataSet<Tuple2<K, VV>> vertices,
+			DataSet<Tuple3<K, K, EV>> edges, ExecutionEnvironment context) {
+
+		return fromDataSet(
+				vertices.map(new Tuple2ToVertexMap<K, VV>()),
+				edges.map(new Tuple3ToEdgeMap<K, EV>()),
+				context);
+	}
+
+	/**
+	 * Creates a graph from a DataSet of {@link Tuple3} edges; the vertices are induced
+	 * from the edges and their values are set to {@link NullValue}.
+	 *
+	 * @param edges a DataSet of Tuple3.
+	 * @param context the flink execution environment.
+	 * @return the newly created graph.
+	 */
+	public static <K, EV> Graph<K, NullValue, EV> fromTupleDataSet(DataSet<Tuple3<K, K, EV>> edges,
+			ExecutionEnvironment context) {
+
+		return fromDataSet(edges.map(new Tuple3ToEdgeMap<K, EV>()), context);
+	}
+
+	/**
+	 * Creates a graph from a DataSet of {@link Tuple3} edges; the vertices are induced
+	 * from the edges and each vertex value is computed by applying the given mapper to
+	 * the vertex id.
+	 *
+	 * @param edges a DataSet of Tuple3.
+	 * @param mapper the function computing a vertex value from a vertex id.
+	 * @param context the flink execution environment.
+	 * @return the newly created graph.
+	 */
+	public static <K, VV, EV> Graph<K, VV, EV> fromTupleDataSet(DataSet<Tuple3<K, K, EV>> edges,
+			final MapFunction<K, VV> mapper, ExecutionEnvironment context) {
+
+		return fromDataSet(edges.map(new Tuple3ToEdgeMap<K, EV>()), mapper, context);
+	}
+
+	/**
+	 * Creates a Graph from a CSV file of vertices and a CSV file of edges.
+	 *
+	 * @param verticesPath path to a CSV file with the Vertex data.
+	 * @param edgesPath path to a CSV file with the Edge data.
+	 * @param context the Flink execution environment.
+	 * @return a {@link GraphCsvReader}; calling one of its type-specifying methods
+	 * (vertex id, vertex value and edge value types) returns the Graph.
+	 *
+	 * @see GraphCsvReader#types(Class, Class, Class)
+	 * @see GraphCsvReader#vertexTypes(Class, Class)
+	 * @see GraphCsvReader#edgeTypes(Class, Class)
+	 * @see GraphCsvReader#keyType(Class)
+	 */
+	public static GraphCsvReader fromCsvReader(String verticesPath, String edgesPath, ExecutionEnvironment context) {
+		final GraphCsvReader reader = new GraphCsvReader(verticesPath, edgesPath, context);
+		return reader;
+	}
+
+	/**
+	 * Creates a Graph from a CSV file of edges; the vertices are created automatically.
+	 *
+	 * @param edgesPath path to a CSV file with the Edge data.
+	 * @param context the execution environment.
+	 * @return a {@link GraphCsvReader}; calling one of its type-specifying methods
+	 * (vertex id, vertex value and edge value types) returns the Graph.
+	 *
+	 * @see GraphCsvReader#types(Class, Class, Class)
+	 * @see GraphCsvReader#vertexTypes(Class, Class)
+	 * @see GraphCsvReader#edgeTypes(Class, Class)
+	 * @see GraphCsvReader#keyType(Class)
+	 */
+	public static GraphCsvReader fromCsvReader(String edgesPath, ExecutionEnvironment context) {
+		final GraphCsvReader reader = new GraphCsvReader(edgesPath, context);
+		return reader;
+	}
+
+	/**
+	 * Creates a Graph from a CSV file of edges; the vertices are created automatically
+	 * and their values are set by the provided mapper.
+	 *
+	 * @param edgesPath path to a CSV file with the Edge data.
+	 * @param mapper the function computing a vertex value from a vertex id.
+	 * @param context the execution environment.
+	 * @return a {@link GraphCsvReader}; calling one of its type-specifying methods
+	 * (vertex id, vertex value and edge value types) returns the Graph.
+	 *
+	 * @see GraphCsvReader#types(Class, Class, Class)
+	 * @see GraphCsvReader#vertexTypes(Class, Class)
+	 * @see GraphCsvReader#edgeTypes(Class, Class)
+	 * @see GraphCsvReader#keyType(Class)
+	 */
+	public static <K, VV> GraphCsvReader fromCsvReader(String edgesPath,
+			final MapFunction<K, VV> mapper, ExecutionEnvironment context) {
+		final GraphCsvReader reader = new GraphCsvReader(edgesPath, mapper, context);
+		return reader;
+	}
+
+	/**
+	 * Returns the execution environment this graph was created with.
+	 *
+	 * @return the flink execution environment.
+	 */
+	public ExecutionEnvironment getContext() {
+		return context;
+	}
+
+	/**
+	 * Checks whether this graph is valid according to the given {@link GraphValidator}.
+	 *
+	 * @param validator the validator to check this graph against.
+	 * @return true if the validator accepts this graph.
+	 * @throws Exception propagated from the validator.
+	 */
+	public Boolean validate(GraphValidator<K, VV, EV> validator) throws Exception {
+		final Boolean isValid = validator.validate(this);
+		return isValid;
+	}
+
+	/**
+	 * Returns the vertices of this graph.
+	 *
+	 * @return the vertex DataSet.
+	 */
+	public DataSet<Vertex<K, VV>> getVertices() {
+		return this.vertices;
+	}
+
+	/**
+	 * Returns the edges of this graph.
+	 *
+	 * @return the edge DataSet.
+	 */
+	public DataSet<Edge<K, EV>> getEdges() {
+		return this.edges;
+	}
+
+	/**
+	 * Returns the vertices of this graph converted to plain {@link Tuple2} objects.
+	 *
+	 * @return the vertex DataSet as Tuple2.
+	 */
+	public DataSet<Tuple2<K, VV>> getVerticesAsTuple2() {
+		final DataSet<Tuple2<K, VV>> vertexTuples = this.vertices.map(new VertexToTuple2Map<K, VV>());
+		return vertexTuples;
+	}
+
+	/**
+	 * Returns the edges of this graph converted to plain {@link Tuple3} objects.
+	 *
+	 * @return the edge DataSet as Tuple3.
+	 */
+	public DataSet<Tuple3<K, K, EV>> getEdgesAsTuple3() {
+		final DataSet<Tuple3<K, K, EV>> edgeTuples = this.edges.map(new EdgeToTuple3Map<K, EV>());
+		return edgeTuples;
+	}
+
+	/**
+	 * Gives access to each edge value together with the values of its source and target vertices.
+	 * Both lookups are inner joins, so an edge whose source or target id has no matching
+	 * vertex does not appear in the result.
+	 *
+	 * @return a triplet DataSet consisting of (srcVertexId, trgVertexId, srcVertexValue, trgVertexValue, edgeValue)
+	 */
+	public DataSet<Triplet<K, VV, EV>> getTriplets() {
+		final DataSet<Vertex<K, VV>> allVertices = this.getVertices();
+
+		// first join: vertex id == edge source, attaches the source value to every edge
+		final DataSet<Tuple4<K, K, VV, EV>> edgesWithSrcValue = allVertices
+				.join(this.getEdges()).where(0).equalTo(0)
+				.with(new ProjectEdgeWithSrcValue<K, VV, EV>());
+
+		// second join: edge target == vertex id, attaches the target value
+		return edgesWithSrcValue
+				.join(allVertices).where(1).equalTo(0)
+				.with(new ProjectEdgeWithVertexValues<K, VV, EV>());
+	}
+
+	/**
+	 * Join function used by getTriplets(): combines a vertex with an edge joined on it and
+	 * emits (edge source, edge target, vertex value, edge value).
+	 */
+	@ForwardedFieldsFirst("f1->f2")
+	@ForwardedFieldsSecond("f0; f1; f2->f3")
+	private static final class ProjectEdgeWithSrcValue<K, VV, EV> implements
+			FlatJoinFunction<Vertex<K, VV>, Edge<K, EV>, Tuple4<K, K, VV, EV>> {
+
+		@Override
+		public void join(Vertex<K, VV> sourceVertex, Edge<K, EV> edge, Collector<Tuple4<K, K, VV, EV>> collector)
+				throws Exception {
+
+			final Tuple4<K, K, VV, EV> edgeWithSrcValue = new Tuple4<K, K, VV, EV>(
+					edge.getSource(), edge.getTarget(), sourceVertex.getValue(), edge.getValue());
+			collector.collect(edgeWithSrcValue);
+		}
+	}
+
+	/**
+	 * Join function used by getTriplets(): completes a (source, target, source value, edge value)
+	 * tuple with the value of the joined vertex and emits the resulting {@link Triplet}.
+	 */
+	@ForwardedFieldsFirst("f0; f1; f2; f3->f4")
+	@ForwardedFieldsSecond("f1->f3")
+	private static final class ProjectEdgeWithVertexValues<K, VV, EV> implements
+			FlatJoinFunction<Tuple4<K, K, VV, EV>, Vertex<K, VV>, Triplet<K, VV, EV>> {
+
+		@Override
+		public void join(Tuple4<K, K, VV, EV> tripletWithSrcValSet,
+				Vertex<K, VV> targetVertex, Collector<Triplet<K, VV, EV>> collector) throws Exception {
+
+			final K sourceId = tripletWithSrcValSet.f0;
+			final K targetId = tripletWithSrcValSet.f1;
+			final VV sourceValue = tripletWithSrcValSet.f2;
+			final EV edgeValue = tripletWithSrcValSet.f3;
+
+			collector.collect(new Triplet<K, VV, EV>(sourceId, targetId,
+					sourceValue, targetVertex.getValue(), edgeValue));
+		}
+	}
+
+ /**
+ * Apply a function to the attribute of each vertex in the graph.
+ *
+ * @param mapper the map function to apply.
+ * @return a new graph
+ */
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ public <NV> Graph<K, NV, EV> mapVertices(final MapFunction<Vertex<K, VV>, NV> mapper) {
+
+ TypeInformation<K> keyType = ((TupleTypeInfo<?>) vertices.getType()).getTypeAt(0);
+
+ TypeInformation<NV> valueType = TypeExtractor.createTypeInfo(MapFunction.class, mapper.getClass(), 1, null, null);
+
+ TypeInformation<Vertex<K, NV>> returnType = (TypeInformation<Vertex<K, NV>>) new TupleTypeInfo(
+ Vertex.class, keyType, valueType);
+
+ return mapVertices(mapper, returnType);
+ }
+
+	/**
+	 * Apply a function to the attribute of each vertex in the graph, using an explicitly
+	 * given result type. Vertex ids are kept; the edges are shared with this graph.
+	 *
+	 * @param mapper the map function to apply.
+	 * @param returnType the explicit return type.
+	 * @return a new graph
+	 */
+	public <NV> Graph<K, NV, EV> mapVertices(final MapFunction<Vertex<K, VV>, NV> mapper, TypeInformation<Vertex<K,NV>> returnType) {
+		MapFunction<Vertex<K, VV>, Vertex<K, NV>> vertexMapper = new MapFunction<Vertex<K, VV>, Vertex<K, NV>>() {
+			public Vertex<K, NV> map(Vertex<K, VV> vertex) throws Exception {
+				return new Vertex<K, NV>(vertex.f0, mapper.map(vertex));
+			}
+		};
+
+		DataSet<Vertex<K, NV>> mappedVertices = vertices
+				.map(vertexMapper)
+				.returns(returnType)
+				.withForwardedFields("f0");
+
+		return new Graph<K, NV, EV>(mappedVertices, this.edges, this.context);
+	}
+
+ /**
+ * Apply a function to the attribute of each edge in the graph.
+ *
+ * @param mapper the map function to apply.
+ * @return a new graph
+ */
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ public <NV> Graph<K, VV, NV> mapEdges(final MapFunction<Edge<K, EV>, NV> mapper) {
+
+ TypeInformation<K> keyType = ((TupleTypeInfo<?>) edges.getType()).getTypeAt(0);
+
+ TypeInformation<NV> valueType = TypeExtractor.createTypeInfo(MapFunction.class, mapper.getClass(), 1, null, null);
+
+ TypeInformation<Edge<K, NV>> returnType = (TypeInformation<Edge<K, NV>>) new TupleTypeInfo(
+ Edge.class, keyType, keyType, valueType);
+
+ return mapEdges(mapper, returnType);
+ }
+
+	/**
+	 * Apply a function to the attribute of each edge in the graph, using an explicitly
+	 * given result type. Source and target ids are kept; the vertices are shared with this graph.
+	 *
+	 * @param mapper the map function to apply.
+	 * @param returnType the explicit return type.
+	 * @return a new graph
+	 */
+	public <NV> Graph<K, VV, NV> mapEdges(final MapFunction<Edge<K, EV>, NV> mapper, TypeInformation<Edge<K,NV>> returnType) {
+		MapFunction<Edge<K, EV>, Edge<K, NV>> edgeMapper = new MapFunction<Edge<K, EV>, Edge<K, NV>>() {
+			public Edge<K, NV> map(Edge<K, EV> edge) throws Exception {
+				return new Edge<K, NV>(edge.f0, edge.f1, mapper.map(edge));
+			}
+		};
+
+		DataSet<Edge<K, NV>> mappedEdges = edges
+				.map(edgeMapper)
+				.returns(returnType)
+				.withForwardedFields("f0; f1");
+
+		return new Graph<K, VV, NV>(this.vertices, mappedEdges, this.context);
+	}
+
+	/**
+	 * Joins the vertex DataSet of this graph with an input DataSet on the vertex id and
+	 * applies a UDF to compute the new vertex values.
+	 *
+	 * For a vertex with a matching input tuple, the mapper receives a Tuple2 of
+	 * (current vertex value, input value) and its result becomes the new vertex value.
+	 * Vertices without a matching input tuple keep their value; input tuples that match
+	 * no vertex are ignored. Should the inputDataSet contain the same key more than once,
+	 * only the first value will be considered.
+	 *
+	 * @param inputDataSet the DataSet to join with.
+	 * @param mapper the UDF map function to apply.
+	 * @return a new graph where the vertex values have been updated.
+	 */
+	public <T> Graph<K, VV, EV> joinWithVertices(DataSet<Tuple2<K, T>> inputDataSet,
+			final MapFunction<Tuple2<VV, T>, VV> mapper) {
+
+		ApplyCoGroupToVertexValues<K, VV, T> updater = new ApplyCoGroupToVertexValues<K, VV, T>(mapper);
+		DataSet<Vertex<K, VV>> resultedVertices = getVertices()
+				.coGroup(inputDataSet)
+				.where(0).equalTo(0)
+				.with(updater);
+		return new Graph<K, VV, EV>(resultedVertices, edges, context);
+	}
+
+	/**
+	 * CoGroup function behind {@code joinWithVertices}. Per vertex id it emits the vertex with
+	 * its value updated from the first matching input tuple, emits the vertex unchanged when
+	 * there is no input tuple, and emits nothing for input ids without a vertex. Only the first
+	 * vertex and the first input tuple of each group are used.
+	 */
+	private static final class ApplyCoGroupToVertexValues<K, VV, T>
+			implements CoGroupFunction<Vertex<K, VV>, Tuple2<K, T>, Vertex<K, VV>> {
+
+		private final MapFunction<Tuple2<VV, T>, VV> mapper;
+
+		public ApplyCoGroupToVertexValues(MapFunction<Tuple2<VV, T>, VV> mapper) {
+			this.mapper = mapper;
+		}
+
+		@Override
+		public void coGroup(Iterable<Vertex<K, VV>> vertices,
+				Iterable<Tuple2<K, T>> input, Collector<Vertex<K, VV>> collector) throws Exception {
+
+			final Iterator<Vertex<K, VV>> vertexIterator = vertices.iterator();
+			final Iterator<Tuple2<K, T>> inputIterator = input.iterator();
+
+			if (!vertexIterator.hasNext()) {
+				// the input id does not belong to any vertex of the graph
+				return;
+			}
+
+			if (!inputIterator.hasNext()) {
+				// no new value for this vertex: keep it as it is
+				collector.collect(vertexIterator.next());
+				return;
+			}
+
+			final Tuple2<K, T> firstInput = inputIterator.next();
+			final VV currentValue = vertexIterator.next().f1;
+			final VV newValue = mapper.map(new Tuple2<VV, T>(currentValue, firstInput.f1));
+			collector.collect(new Vertex<K, VV>(firstInput.f0, newValue));
+		}
+	}
+
+	/**
+	 * Joins the edge DataSet with an input DataSet on the composite key (source, target)
+	 * and applies a UDF to compute the new edge values.
+	 *
+	 * For a matched edge, the mapper receives a Tuple2 of (current edge value, input value)
+	 * and its result becomes the new edge value. Edges without a matching input tuple keep
+	 * their value; input tuples that match no edge are ignored. Should the inputDataSet contain
+	 * the same (source, target) pair more than once, only the first value will be considered.
+	 *
+	 * @param inputDataSet the DataSet to join with.
+	 * @param mapper the UDF map function to apply.
+	 * @param <T> the type of the input values
+	 * @return a new graph where the edge values have been updated.
+	 */
+	public <T> Graph<K, VV, EV> joinWithEdges(DataSet<Tuple3<K, K, T>> inputDataSet,
+			final MapFunction<Tuple2<EV, T>, EV> mapper) {
+
+		ApplyCoGroupToEdgeValues<K, EV, T> updater = new ApplyCoGroupToEdgeValues<K, EV, T>(mapper);
+		DataSet<Edge<K, EV>> resultedEdges = getEdges()
+				.coGroup(inputDataSet)
+				.where(0, 1).equalTo(0, 1)
+				.with(updater);
+		return new Graph<K, VV, EV>(vertices, resultedEdges, context);
+	}
+
+	/**
+	 * CoGroup function behind {@code joinWithEdges}, grouping edges and input tuples on
+	 * (source, target). Every edge of a group is emitted, so parallel edges sharing the same
+	 * (source, target) pair are preserved. If the group has an input tuple, each edge value is
+	 * replaced by the mapper's result on (edge value, first input value). Otherwise the edges
+	 * are forwarded unchanged. Input tuples without a matching edge produce no output.
+	 */
+	private static final class ApplyCoGroupToEdgeValues<K, EV, T>
+			implements CoGroupFunction<Edge<K, EV>, Tuple3<K, K, T>, Edge<K, EV>> {
+
+		private final MapFunction<Tuple2<EV, T>, EV> mapper;
+
+		public ApplyCoGroupToEdgeValues(MapFunction<Tuple2<EV, T>, EV> mapper) {
+			this.mapper = mapper;
+		}
+
+		@Override
+		public void coGroup(Iterable<Edge<K, EV>> edges, Iterable<Tuple3<K, K, T>> input,
+				Collector<Edge<K, EV>> collector) throws Exception {
+
+			final Iterator<Edge<K, EV>> edgesIterator = edges.iterator();
+			final Iterator<Tuple3<K, K, T>> inputIterator = input.iterator();
+
+			if (inputIterator.hasNext()) {
+				// only the first input tuple for a (source, target) pair is considered
+				final Tuple3<K, K, T> inputNext = inputIterator.next();
+
+				// previously only the first edge of the group was emitted, silently dropping parallel edges
+				while (edgesIterator.hasNext()) {
+					final Edge<K, EV> edgesNext = edgesIterator.next();
+					collector.collect(new Edge<K, EV>(edgesNext.f0, edgesNext.f1,
+							mapper.map(new Tuple2<EV, T>(edgesNext.f2, inputNext.f2))));
+				}
+			} else {
+				while (edgesIterator.hasNext()) {
+					collector.collect(edgesIterator.next());
+				}
+			}
+		}
+	}
+
+	/**
+	 * Joins the edge DataSet with an input DataSet, matching the source id of each edge with
+	 * the first field of the input tuples, and applies a UDF to compute the new edge values.
+	 * Edges without a matching input tuple keep their value. In case the inputDataSet contains
+	 * the same key more than once, only the first value will be considered.
+	 *
+	 * @param inputDataSet the DataSet to join with.
+	 * @param mapper the UDF map function to apply.
+	 * @param <T> the type of the input values
+	 * @return a new graph where the edge values have been updated.
+	 */
+	public <T> Graph<K, VV, EV> joinWithEdgesOnSource(DataSet<Tuple2<K, T>> inputDataSet,
+			final MapFunction<Tuple2<EV, T>, EV> mapper) {
+
+		// edge field 0 is the source id
+		DataSet<Edge<K, EV>> resultedEdges = getEdges()
+				.coGroup(inputDataSet)
+				.where(0).equalTo(0)
+				.with(new ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget<K, EV, T>(mapper));
+
+		return new Graph<K, VV, EV>(vertices, resultedEdges, context);
+	}
+
+	/**
+	 * CoGroup function behind {@code joinWithEdgesOnSource} and {@code joinWithEdgesOnTarget}.
+	 * Every edge of a group is emitted. If the group has an input tuple, each edge value is
+	 * replaced by the mapper's result on (edge value, first input value); otherwise the edges
+	 * are forwarded unchanged.
+	 */
+	private static final class ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget<K, EV, T>
+			implements CoGroupFunction<Edge<K, EV>, Tuple2<K, T>, Edge<K, EV>> {
+
+		private final MapFunction<Tuple2<EV, T>, EV> mapper;
+
+		public ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget(
+				MapFunction<Tuple2<EV, T>, EV> mapper) {
+			this.mapper = mapper;
+		}
+
+		@Override
+		public void coGroup(Iterable<Edge<K, EV>> edges,
+				Iterable<Tuple2<K, T>> input, Collector<Edge<K, EV>> collector) throws Exception {
+
+			final Iterator<Edge<K, EV>> edgesIterator = edges.iterator();
+			final Iterator<Tuple2<K, T>> inputIterator = input.iterator();
+
+			// only the first input tuple of the group is ever used
+			final boolean hasMatch = inputIterator.hasNext();
+			final Tuple2<K, T> match = hasMatch ? inputIterator.next() : null;
+
+			while (edgesIterator.hasNext()) {
+				final Edge<K, EV> edge = edgesIterator.next();
+				if (!hasMatch) {
+					collector.collect(edge);
+				} else {
+					final EV newValue = mapper.map(new Tuple2<EV, T>(edge.f2, match.f1));
+					collector.collect(new Edge<K, EV>(edge.f0, edge.f1, newValue));
+				}
+			}
+		}
+	}
+
+	/**
+	 * Joins the edge DataSet with an input DataSet, matching the target id of each edge with
+	 * the first field of the input tuples, and applies a UDF to compute the new edge values.
+	 * Edges without a matching input tuple keep their value. Should the inputDataSet contain
+	 * the same key more than once, only the first value will be considered.
+	 *
+	 * @param inputDataSet the DataSet to join with.
+	 * @param mapper the UDF map function to apply.
+	 * @param <T> the type of the input values
+	 * @return a new graph where the edge values have been updated.
+	 */
+	public <T> Graph<K, VV, EV> joinWithEdgesOnTarget(DataSet<Tuple2<K, T>> inputDataSet,
+			final MapFunction<Tuple2<EV, T>, EV> mapper) {
+
+		// edge field 1 is the target id
+		DataSet<Edge<K, EV>> resultedEdges = getEdges()
+				.coGroup(inputDataSet)
+				.where(1).equalTo(0)
+				.with(new ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget<K, EV, T>(mapper));
+
+		return new Graph<K, VV, EV>(vertices, resultedEdges, context);
+	}
+
+	/**
+	 * Apply filtering functions to the graph and return a sub-graph that satisfies the
+	 * predicates for both vertices and edges. An edge is kept only if both its endpoints
+	 * pass the vertex filter and the edge itself passes the edge filter.
+	 *
+	 * @param vertexFilter the filter function for vertices.
+	 * @param edgeFilter the filter function for edges.
+	 * @return the resulting sub-graph.
+	 */
+	public Graph<K, VV, EV> subgraph(FilterFunction<Vertex<K, VV>> vertexFilter, FilterFunction<Edge<K, EV>> edgeFilter) {
+
+		DataSet<Vertex<K, VV>> filteredVertices = this.vertices.filter(vertexFilter);
+
+		// keep the edges whose source survived the vertex filter ...
+		DataSet<Edge<K, EV>> edgesWithKeptSource = this.edges.join(filteredVertices)
+				.where(0).equalTo(0)
+				.with(new ProjectEdge<K, VV, EV>());
+		// ... and whose target survived it as well
+		DataSet<Edge<K, EV>> remainingEdges = edgesWithKeptSource.join(filteredVertices)
+				.where(1).equalTo(0)
+				.with(new ProjectEdge<K, VV, EV>());
+
+		return new Graph<K, VV, EV>(filteredVertices, remainingEdges.filter(edgeFilter), this.context);
+	}
+
+	/**
+	 * Apply a filtering function to the vertices and return the induced sub-graph: the
+	 * vertices passing the filter and the edges whose source and target both pass it.
+	 *
+	 * @param vertexFilter the filter function for vertices.
+	 * @return the resulting sub-graph.
+	 */
+	public Graph<K, VV, EV> filterOnVertices(FilterFunction<Vertex<K, VV>> vertexFilter) {
+
+		DataSet<Vertex<K, VV>> filteredVertices = this.vertices.filter(vertexFilter);
+
+		DataSet<Edge<K, EV>> edgesWithKeptSource = this.edges.join(filteredVertices)
+				.where(0).equalTo(0)
+				.with(new ProjectEdge<K, VV, EV>());
+		DataSet<Edge<K, EV>> remainingEdges = edgesWithKeptSource.join(filteredVertices)
+				.where(1).equalTo(0)
+				.with(new ProjectEdge<K, VV, EV>());
+
+		return new Graph<K, VV, EV>(filteredVertices, remainingEdges, this.context);
+	}
+
+	/**
+	 * Apply a filtering function to the edges only. The vertex set is left unchanged,
+	 * so vertices that lose all their edges remain in the result.
+	 *
+	 * @param edgeFilter the filter function for edges.
+	 * @return the resulting sub-graph.
+	 */
+	public Graph<K, VV, EV> filterOnEdges(FilterFunction<Edge<K, EV>> edgeFilter) {
+		return new Graph<K, VV, EV>(this.vertices, this.edges.filter(edgeFilter), this.context);
+	}
+
+	/**
+	 * Forwards the edge of each (edge, vertex) join pair and discards the vertex; used to keep
+	 * only the edges whose join key matched a vertex.
+	 */
+	@ForwardedFieldsFirst("f0; f1; f2")
+	private static final class ProjectEdge<K, VV, EV> implements FlatJoinFunction<
+			Edge<K, EV>, Vertex<K, VV>, Edge<K, EV>> {
+
+		@Override
+		public void join(Edge<K, EV> edge, Vertex<K, VV> matchedVertex, Collector<Edge<K, EV>> out) {
+			out.collect(edge);
+		}
+	}
+
+	/**
+	 * Return the out-degree of all vertices in the graph, i.e. the number of edges whose
+	 * source is the vertex; vertices without outgoing edges get 0. The job fails with a
+	 * NoSuchElementException if an edge source does not match any vertex.
+	 *
+	 * @return A DataSet of {@code Tuple2<vertexId, outDegree>}
+	 */
+	public DataSet<Tuple2<K, Long>> outDegrees() {
+		// group the edges on their source id (field 0)
+		return vertices.coGroup(edges)
+				.where(0).equalTo(0)
+				.with(new CountNeighborsCoGroup<K, VV, EV>());
+	}
+
+	/**
+	 * Counts the edges co-grouped with a vertex and emits (vertex id, count). Throws a
+	 * NoSuchElementException when a group of edges has no matching vertex.
+	 */
+	private static final class CountNeighborsCoGroup<K, VV, EV>
+			implements CoGroupFunction<Vertex<K, VV>, Edge<K, EV>, Tuple2<K, Long>> {
+
+		@Override
+		public void coGroup(Iterable<Vertex<K, VV>> vertex, Iterable<Edge<K, EV>> outEdges,
+				Collector<Tuple2<K, Long>> out) {
+			long count = 0;
+			for (Iterator<Edge<K, EV>> edgeIterator = outEdges.iterator(); edgeIterator.hasNext(); edgeIterator.next()) {
+				count++;
+			}
+
+			Iterator<Vertex<K, VV>> vertexIterator = vertex.iterator();
+			if (!vertexIterator.hasNext()) {
+				throw new NoSuchElementException("The edge src/trg id could not be found within the vertexIds");
+			}
+			out.collect(new Tuple2<K, Long>(vertexIterator.next().f0, count));
+		}
+	}
+
+	/**
+	 * Return the in-degree of all vertices in the graph, i.e. the number of edges whose
+	 * target is the vertex; vertices without incoming edges get 0. The job fails with a
+	 * NoSuchElementException if an edge target does not match any vertex.
+	 *
+	 * @return A DataSet of {@code Tuple2<vertexId, inDegree>}
+	 */
+	public DataSet<Tuple2<K, Long>> inDegrees() {
+		// group the edges on their target id (field 1)
+		return vertices.coGroup(edges)
+				.where(0).equalTo(1)
+				.with(new CountNeighborsCoGroup<K, VV, EV>());
+	}
+
+ /**
+ * Return the degree of all vertices in the graph
+ *
+ * @return A DataSet of Tuple2<vertexId, degree>
+ */
+ public DataSet<Tuple2<K, Long>> getDegrees() {
+ return outDegrees().union(inDegrees()).groupBy(0).sum(1);
+ }
+
+	/**
+	 * This operation adds all inverse-direction edges to the graph; the vertices are unchanged.
+	 *
+	 * @return the undirected graph.
+	 */
+	public Graph<K, VV, EV> getUndirected() {
+		DataSet<Edge<K, EV>> edgesInBothDirections = this.edges.flatMap(new RegularAndReversedEdgesMap<K, EV>());
+		return new Graph<K, VV, EV>(this.vertices, edgesInBothDirections, this.context);
+	}
+
+	/**
+	 * Compute an aggregate over the edges of each vertex. The function applied
+	 * on the edges has access to the vertex value.
+	 *
+	 * @param edgesFunction the function to apply to the neighborhood
+	 * @param direction the edge direction (in-, out-, all-)
+	 * @param <T> the output type
+	 * @return a dataset of a T
+	 * @throws IllegalArgumentException if the direction is not IN, OUT or ALL
+	 */
+	public <T> DataSet<T> groupReduceOnEdges(EdgesFunctionWithVertexValue<K, VV, EV, T> edgesFunction,
+			EdgeDirection direction) throws IllegalArgumentException {
+
+		switch (direction) {
+			case IN:
+			case OUT: {
+				// IN groups the edges on their target id (field 1), OUT on their source id (field 0)
+				final int edgeKey = (direction == EdgeDirection.IN) ? 1 : 0;
+				return vertices.coGroup(edges).where(0).equalTo(edgeKey)
+						.with(new ApplyCoGroupFunction<K, VV, EV, T>(edgesFunction));
+			}
+			case ALL:
+				// the flat-mapped edges are keyed on their field 0
+				return vertices.coGroup(edges.flatMap(new EmitOneEdgePerNode<K, VV, EV>()))
+						.where(0).equalTo(0)
+						.with(new ApplyCoGroupFunctionOnAllEdges<K, VV, EV, T>(edgesFunction));
+			default:
+				throw new IllegalArgumentException("Illegal edge direction");
+		}
+	}
+
+	/**
+	 * Compute an aggregate over the edges of each vertex, with an explicitly given result
+	 * type. The function applied on the edges has access to the vertex value.
+	 *
+	 * @param edgesFunction the function to apply to the neighborhood
+	 * @param direction the edge direction (in-, out-, all-)
+	 * @param <T> the output type
+	 * @param typeInfo the explicit return type.
+	 * @return a dataset of a T
+	 * @throws IllegalArgumentException if the direction is not IN, OUT or ALL
+	 */
+	public <T> DataSet<T> groupReduceOnEdges(EdgesFunctionWithVertexValue<K, VV, EV, T> edgesFunction,
+			EdgeDirection direction, TypeInformation<T> typeInfo) throws IllegalArgumentException {
+
+		switch (direction) {
+			case IN:
+			case OUT: {
+				// IN groups the edges on their target id (field 1), OUT on their source id (field 0)
+				final int edgeKey = (direction == EdgeDirection.IN) ? 1 : 0;
+				return vertices.coGroup(edges).where(0).equalTo(edgeKey)
+						.with(new ApplyCoGroupFunction<K, VV, EV, T>(edgesFunction))
+						.returns(typeInfo);
+			}
+			case ALL:
+				// the flat-mapped edges are keyed on their field 0
+				return vertices.coGroup(edges.flatMap(new EmitOneEdgePerNode<K, VV, EV>()))
+						.where(0).equalTo(0)
+						.with(new ApplyCoGroupFunctionOnAllEdges<K, VV, EV, T>(edgesFunction))
+						.returns(typeInfo);
+			default:
+				throw new IllegalArgumentException("Illegal edge direction");
+		}
+	}
+
+ /**
+ * Compute an aggregate over the edges of each vertex. The function applied
+ * on the edges only has access to the vertex id (not the vertex value).
+ *
+ * @param edgesFunction
+ * the function to apply to the neighborhood
+ * @param direction
+ * the edge direction (in-, out-, all-)
+ * @param <T>
+ * the output type
+ * @return a dataset of T
+ * @throws IllegalArgumentException
+ */
+ public <T> DataSet<T> groupReduceOnEdges(EdgesFunction<K, EV, T> edgesFunction,
+ EdgeDirection direction) throws IllegalArgumentException {
+
+ switch (direction) {
+ case IN:
+ return edges.map(new ProjectVertexIdMap<K, EV>(1))
+ .withForwardedFields("f1->f0")
+ .groupBy(0).reduceGroup(new ApplyGroupReduceFunction<K, EV, T>(edgesFunction));
+ case OUT:
+ return edges.map(new ProjectVertexIdMap<K, EV>(0))
+ .withForwardedFields("f0")
+ .groupBy(0).reduceGroup(new ApplyGroupReduceFunction<K, EV, T>(edgesFunction));
+ case ALL:
+ return edges.flatMap(new EmitOneEdgePerNode<K, VV, EV>())
+ .groupBy(0).reduceGroup(new ApplyGroupReduceFunction<K, EV, T>(edgesFunction));
+ default:
+ throw new IllegalArgumentException("Illegal edge direction");
+ }
+ }
+
+ /**
+ * Computes an aggregate over the edges of every vertex, with an explicitly
+ * supplied result type. The supplied function receives (vertexId, edge) pairs,
+ * so it sees the vertex id but not the vertex value.
+ *
+ * @param edgesFunction
+ * the function applied to the edges of each vertex
+ * @param direction
+ * which edges belong to a vertex: IN (keyed by edge target),
+ * OUT (keyed by edge source) or ALL (keyed by both endpoints)
+ * @param <T>
+ * the output type
+ * @param typeInfo the explicit return type, set on the result via returns(...)
+ * @return a dataset of T
+ * @throws IllegalArgumentException if the direction is not IN, OUT or ALL
+ */
+ public <T> DataSet<T> groupReduceOnEdges(EdgesFunction<K, EV, T> edgesFunction,
+ EdgeDirection direction, TypeInformation<T> typeInfo) throws IllegalArgumentException {
+
+ DataSet<Tuple2<K, Edge<K, EV>>> keyedEdges;
+ switch (direction) {
+ case IN:
+ // key each edge by its target id
+ keyedEdges = edges.map(new ProjectVertexIdMap<K, EV>(1))
+ .withForwardedFields("f1->f0");
+ break;
+ case OUT:
+ // key each edge by its source id
+ keyedEdges = edges.map(new ProjectVertexIdMap<K, EV>(0))
+ .withForwardedFields("f0");
+ break;
+ case ALL:
+ // key each edge once by its source and once by its target
+ keyedEdges = edges.flatMap(new EmitOneEdgePerNode<K, VV, EV>());
+ break;
+ default:
+ throw new IllegalArgumentException("Illegal edge direction");
+ }
+ return keyedEdges.groupBy(0)
+ .reduceGroup(new ApplyGroupReduceFunction<K, EV, T>(edgesFunction)).returns(typeInfo);
+ }
+
+ private static final class ProjectVertexIdMap<K, EV> implements MapFunction<
+ Edge<K, EV>, Tuple2<K, Edge<K, EV>>> {
+
+ private int fieldPosition;
+
+ public ProjectVertexIdMap(int position) {
+ this.fieldPosition = position;
+ }
+
+ @SuppressWarnings("unchecked")
+ public Tuple2<K, Edge<K, EV>> map(Edge<K, EV> edge) {
+ return new Tuple2<K, Edge<K, EV>>((K) edge.getField(fieldPosition), edge);
+ }
+ }
+
+ private static final class ProjectVertexWithEdgeValueMap<K, EV> implements MapFunction<
+ Edge<K, EV>, Tuple2<K, EV>> {
+
+ private int fieldPosition;
+
+ public ProjectVertexWithEdgeValueMap(int position) {
+ this.fieldPosition = position;
+ }
+
+ @SuppressWarnings("unchecked")
+ public Tuple2<K, EV> map(Edge<K, EV> edge) {
+ return new Tuple2<K, EV>((K) edge.getField(fieldPosition), edge.getValue());
+ }
+ }
+
+ /**
+ * Group-reduce adapter that passes each group of (vertexId, edge) pairs to the
+ * user's EdgesFunction and reports the function's output type T.
+ */
+ private static final class ApplyGroupReduceFunction<K, EV, T> implements GroupReduceFunction<
+ Tuple2<K, Edge<K, EV>>, T>, ResultTypeQueryable<T> {
+
+ private final EdgesFunction<K, EV, T> function;
+
+ public ApplyGroupReduceFunction(EdgesFunction<K, EV, T> fun) {
+ function = fun;
+ }
+
+ @Override
+ public void reduce(Iterable<Tuple2<K, Edge<K, EV>>> edges, Collector<T> out) throws Exception {
+ function.iterateEdges(edges, out);
+ }
+
+ @Override
+ public TypeInformation<T> getProducedType() {
+ // T is type argument index 2 of EdgesFunction<K, EV, T>
+ return TypeExtractor.createTypeInfo(EdgesFunction.class, function.getClass(), 2, null, null);
+ }
+ }
+
+ /**
+ * Emits every edge twice: keyed once by its source id and once by its target id.
+ * The VV type parameter is not used by this function.
+ */
+ private static final class EmitOneEdgePerNode<K, VV, EV> implements FlatMapFunction<
+ Edge<K, EV>, Tuple2<K, Edge<K, EV>>> {
+
+ @Override
+ public void flatMap(Edge<K, EV> e, Collector<Tuple2<K, Edge<K, EV>>> out) {
+ out.collect(new Tuple2<K, Edge<K, EV>>(e.getSource(), e));
+ out.collect(new Tuple2<K, Edge<K, EV>>(e.getTarget(), e));
+ }
+ }
+
+ private static final class EmitOneVertexWithEdgeValuePerNode<K, EV> implements FlatMapFunction<
+ Edge<K, EV>, Tuple2<K, EV>> {
+
+ public void flatMap(Edge<K, EV> edge, Collector<Tuple2<K, EV>> out) {
+ out.collect(new Tuple2<K, EV>(edge.getSource(), edge.getValue()));
+ out.collect(new Tuple2<K, EV>(edge.getTarget(), edge.getValue()));
+ }
+ }
+
+ /**
+ * Emits (vertexId, neighborId, edge) for both endpoints of every edge: first
+ * (source, target, edge), then (target, source, edge).
+ */
+ private static final class EmitOneEdgeWithNeighborPerNode<K, EV> implements FlatMapFunction<
+ Edge<K, EV>, Tuple3<K, K, Edge<K, EV>>> {
+
+ @Override
+ public void flatMap(Edge<K, EV> e, Collector<Tuple3<K, K, Edge<K, EV>>> out) {
+ out.collect(new Tuple3<K, K, Edge<K, EV>>(e.getSource(), e.getTarget(), e));
+ out.collect(new Tuple3<K, K, Edge<K, EV>>(e.getTarget(), e.getSource(), e));
+ }
+ }
+
+ /**
+ * Co-group adapter that hands a vertex and its group of edges to the user's
+ * EdgesFunctionWithVertexValue. Throws NoSuchElementException when a group of
+ * edges has no matching vertex.
+ */
+ private static final class ApplyCoGroupFunction<K, VV, EV, T> implements CoGroupFunction<
+ Vertex<K, VV>, Edge<K, EV>, T>, ResultTypeQueryable<T> {
+
+ private final EdgesFunctionWithVertexValue<K, VV, EV, T> function;
+
+ public ApplyCoGroupFunction(EdgesFunctionWithVertexValue<K, VV, EV, T> fun) {
+ function = fun;
+ }
+
+ @Override
+ public void coGroup(Iterable<Vertex<K, VV>> vertex,
+ Iterable<Edge<K, EV>> edges, Collector<T> out) throws Exception {
+
+ Iterator<Vertex<K, VV>> candidates = vertex.iterator();
+ if (!candidates.hasNext()) {
+ // an edge references an id that is not present in the vertex set
+ throw new NoSuchElementException("The edge src/trg id could not be found within the vertexIds");
+ }
+ function.iterateEdges(candidates.next(), edges, out);
+ }
+
+ @Override
+ public TypeInformation<T> getProducedType() {
+ // T is type argument index 3 of EdgesFunctionWithVertexValue<K, VV, EV, T>
+ return TypeExtractor.createTypeInfo(EdgesFunctionWithVertexValue.class, function.getClass(), 3,
+ null, null);
+ }
+ }
+
+ /**
+ * Co-group adapter for the ALL direction. It strips the vertex-id key from each
+ * (vertexId, edge) pair and hands the vertex plus its bare edges to the user's
+ * EdgesFunctionWithVertexValue. The edge Iterable passed to the function returns
+ * the same iterator on every call, so it can be traversed only once.
+ */
+ private static final class ApplyCoGroupFunctionOnAllEdges<K, VV, EV, T>
+ implements CoGroupFunction<Vertex<K, VV>, Tuple2<K, Edge<K, EV>>, T>, ResultTypeQueryable<T> {
+
+ private final EdgesFunctionWithVertexValue<K, VV, EV, T> function;
+
+ public ApplyCoGroupFunctionOnAllEdges(EdgesFunctionWithVertexValue<K, VV, EV, T> fun) {
+ function = fun;
+ }
+
+ @Override
+ public void coGroup(Iterable<Vertex<K, VV>> vertex, final Iterable<Tuple2<K, Edge<K, EV>>> keysWithEdges,
+ Collector<T> out) throws Exception {
+
+ final Iterator<Tuple2<K, Edge<K, EV>>> keyedPairs = keysWithEdges.iterator();
+
+ // view over the pairs that yields only the edge component (f1)
+ final Iterator<Edge<K, EV>> edgeView = new Iterator<Edge<K, EV>>() {
+ @Override
+ public boolean hasNext() {
+ return keyedPairs.hasNext();
+ }
+
+ @Override
+ public Edge<K, EV> next() {
+ return keyedPairs.next().f1;
+ }
+
+ @Override
+ public void remove() {
+ keyedPairs.remove();
+ }
+ };
+
+ Iterable<Edge<K, EV>> edgesOnce = new Iterable<Edge<K, EV>>() {
+ @Override
+ public Iterator<Edge<K, EV>> iterator() {
+ return edgeView;
+ }
+ };
+
+ Iterator<Vertex<K, VV>> candidates = vertex.iterator();
+ if (!candidates.hasNext()) {
+ // an edge references an id that is not present in the vertex set
+ throw new NoSuchElementException("The edge src/trg id could not be found within the vertexIds");
+ }
+ function.iterateEdges(candidates.next(), edgesOnce, out);
+ }
+
+ @Override
+ public TypeInformation<T> getProducedType() {
+ // T is type argument index 3 of EdgesFunctionWithVertexValue<K, VV, EV, T>
+ return TypeExtractor.createTypeInfo(EdgesFunctionWithVertexValue.class, function.getClass(), 3,
+ null, null);
+ }
+ }
+
+ @ForwardedFields("f0->f1; f1->f0; f2")
+ private static final class ReverseEdgesMap<K, EV>
+ implements MapFunction<Edge<K, EV>, Edge<K, EV>> {
+
+ public Edge<K, EV> map(Edge<K, EV> value) {
+ return new Edge<K, EV>(value.f1, value.f0, value.f2);
+ }
+ }
+
+ private static final class RegularAndReversedEdgesMap<K, EV>
+ implements FlatMapFunction<Edge<K, EV>, Edge<K, EV>> {
+
+ @Override
+ public void flatMap(Edge<K, EV> edge, Collector<Edge<K, EV>> out) throws Exception {
+ out.collect(new Edge<K, EV>(edge.f0, edge.f1, edge.f2));
+ out.collect(new Edge<K, EV>(edge.f1, edge.f0, edge.f2));
+ }
+ }
+
+ /**
+ * Reverses the direction of every edge in the graph. The vertices are reused unchanged.
+ *
+ * @return a new graph with all edges reversed
+ * @throws UnsupportedOperationException declared by the signature
+ */
+ public Graph<K, VV, EV> reverse() throws UnsupportedOperationException {
+ ReverseEdgesMap<K, EV> swapEndpoints = new ReverseEdgesMap<K, EV>();
+ return new Graph<K, VV, EV>(this.vertices, this.edges.map(swapEndpoints), this.context);
+ }
+
+ /**
+ * Counts the vertices of the graph via DataSet#count(), which executes eagerly.
+ *
+ * @return the number of vertices as a long
+ */
+ public long numberOfVertices() throws Exception {
+ return this.vertices.count();
+ }
+
+ /**
+ * Counts the edges of the graph via DataSet#count(), which executes eagerly.
+ *
+ * @return the number of edges as a long
+ */
+ public long numberOfEdges() throws Exception {
+ return this.edges.count();
+ }
+
+ /**
+ * @return The IDs of the vertices as DataSet
+ */
+ public DataSet<K> getVertexIds() {
+ return vertices.map(new ExtractVertexIDMapper<K, VV>());
+ }
+
+ private static final class ExtractVertexIDMapper<K, VV>
+ implements MapFunction<Vertex<K, VV>, K> {
+ @Override
+ public K map(Vertex<K, VV> vertex) {
+ return vertex.f0;
+ }
+ }
+
+ /**
+ * @return The IDs of the edges as DataSet
+ */
+ public DataSet<Tuple2<K, K>> getEdgeIds() {
+ return edges.map(new ExtractEdgeIDsMapper<K, EV>());
+ }
+
+ @ForwardedFields("f0; f1")
+ private static final class ExtractEdgeIDsMapper<K, EV>
+ implements MapFunction<Edge<K, EV>, Tuple2<K, K>> {
+ @Override
+ public Tuple2<K, K> map(Edge<K, EV> edge) throws Exception {
+ return new Tuple2<K, K>(edge.f0, edge.f1);
+ }
+ }
+
+ /**
+ * Adds a single vertex to the graph by delegating to addVertices. Duplicates are
+ * removed with distinct() on the union of old and new vertices, so a vertex
+ * identical to an existing one is not added twice.
+ *
+ * @param vertex the vertex to be added
+ * @return the new graph containing the existing vertices as well as the one just added
+ */
+ public Graph<K, VV, EV> addVertex(final Vertex<K, VV> vertex) {
+ List<Vertex<K, VV>> singleVertex = new ArrayList<Vertex<K, VV>>(1);
+ singleVertex.add(vertex);
+ return this.addVertices(singleVertex);
+ }
+
+ /**
+ * Adds the given vertices to the graph. The union of existing and new vertices
+ * is passed through distinct(), so vertices already present are not duplicated.
+ * The edges are reused unchanged.
+ *
+ * @param verticesToAdd the list of vertices to add
+ * @return the new graph containing the existing and newly added vertices
+ */
+ public Graph<K, VV, EV> addVertices(List<Vertex<K, VV>> verticesToAdd) {
+ DataSet<Vertex<K, VV>> additions = this.context.fromCollection(verticesToAdd);
+ DataSet<Vertex<K, VV>> mergedVertices = this.vertices.union(additions).distinct();
+
+ return new Graph<K, VV, EV>(mergedVertices, this.edges, this.context);
+ }
+
+ /**
+ * Adds an edge between the given vertices. The two vertices and the new edge
+ * form a small graph that is merged into this one with union(), which also adds
+ * the endpoints if they are not yet present.
+ *
+ * @param source the source vertex of the edge
+ * @param target the target vertex of the edge
+ * @param edgeValue the edge value
+ * @return the new graph containing the existing vertices and edges plus the
+ * newly added edge
+ */
+ public Graph<K, VV, EV> addEdge(Vertex<K, VV> source, Vertex<K, VV> target, EV edgeValue) {
+ List<Vertex<K, VV>> endpoints = Arrays.asList(source, target);
+ List<Edge<K, EV>> singleEdge = Arrays.asList(new Edge<K, EV>(source.f0, target.f0, edgeValue));
+ Graph<K, VV, EV> partialGraph = fromCollection(endpoints, singleEdge, this.context);
+ return this.union(partialGraph);
+ }
+
+ /**
+ * Adds the given list of edges to the graph.
+ *
+ * An edge is added only if both its source and its target vertex already exist
+ * in the graph; an edge referring to a missing vertex is considered invalid and
+ * is silently ignored. Unlike {@link #addEdge}, no vertices are created.
+ *
+ * @param newEdges the list of edges to be added
+ * @return a new graph containing the existing edges plus the valid new edges
+ */
+ public Graph<K, VV, EV> addEdges(List<Edge<K, EV>> newEdges) {
+
+ DataSet<Edge<K,EV>> newEdgesDataSet = this.context.fromCollection(newEdges);
+
+ // keep only edges whose source (field 0) and target (field 1) both match an existing vertex id
+ DataSet<Edge<K,EV>> validNewEdges = this.getVertices().join(newEdgesDataSet)
+ .where(0).equalTo(0)
+ .with(new JoinVerticesWithEdgesOnSrc<K, VV, EV>())
+ .join(this.getVertices()).where(1).equalTo(0)
+ .with(new JoinWithVerticesOnTrg<K, VV, EV>());
+
+ return Graph.fromDataSet(this.vertices, this.edges.union(validNewEdges), this.context);
+ }
+
+ /** Join function that drops the matched vertex and forwards the edge unchanged. */
+ @ForwardedFieldsSecond("f0; f1; f2")
+ private static final class JoinVerticesWithEdgesOnSrc<K, VV, EV> implements
+ JoinFunction<Vertex<K, VV>, Edge<K, EV>, Edge<K, EV>> {
+
+ @Override
+ public Edge<K, EV> join(Vertex<K, VV> ignoredVertex, Edge<K, EV> matchedEdge) throws Exception {
+ return matchedEdge;
+ }
+ }
+
+ /** Join function that forwards the edge unchanged and drops the matched vertex. */
+ @ForwardedFieldsFirst("f0; f1; f2")
+ private static final class JoinWithVerticesOnTrg<K, VV, EV> implements
+ JoinFunction<Edge<K, EV>, Vertex<K, VV>, Edge<K, EV>> {
+
+ @Override
+ public Edge<K, EV> join(Edge<K, EV> matchedEdge, Vertex<K, VV> ignoredVertex) throws Exception {
+ return matchedEdge;
+ }
+ }
+
+ /**
+ * Removes a single vertex, together with every edge incident to it, by
+ * delegating to removeVertices.
+ *
+ * @param vertex the vertex to remove
+ * @return the new graph containing the existing vertices and edges without
+ * the removed vertex and its edges
+ */
+ public Graph<K, VV, EV> removeVertex(Vertex<K, VV> vertex) {
+ List<Vertex<K, VV>> singleVertex = new ArrayList<Vertex<K, VV>>(1);
+ singleVertex.add(vertex);
+ return this.removeVertices(singleVertex);
+ }
+
+ /**
+ * Removes the given vertices from the graph, together with every edge whose
+ * source or target is one of them.
+ *
+ * @param verticesToBeRemoved the list of vertices to be removed
+ * @return the resulting graph containing the initial vertices and edges minus the
+ * removed vertices and their incident edges
+ */
+ public Graph<K, VV, EV> removeVertices(List<Vertex<K, VV>> verticesToBeRemoved) {
+ return removeVertices(this.context.fromCollection(verticesToBeRemoved));
+ }
+
+ /**
+ * Removes the vertices contained in the given DataSet, together with every edge
+ * whose source or target is one of them. Vertices are matched by id (field 0).
+ *
+ * @param verticesToBeRemoved the DataSet of vertices to be removed
+ * @return the resulting graph containing the initial vertices and edges minus the
+ * removed vertices and their incident edges
+ */
+ private Graph<K, VV, EV> removeVertices(DataSet<Vertex<K, VV>> verticesToBeRemoved) {
+
+ // keep only the vertices whose id does not occur among verticesToBeRemoved
+ DataSet<Vertex<K, VV>> newVertices = getVertices().coGroup(verticesToBeRemoved).where(0).equalTo(0)
+ .with(new VerticesRemovalCoGroup<K, VV>());
+
+ // an edge survives only if its source (first join) and its target (second join)
+ // both still match a remaining vertex
+ DataSet<Edge<K, EV>> newEdges = newVertices.join(getEdges()).where(0).equalTo(0)
+ .with(new ProjectEdgeToBeRemoved<K, VV, EV>())
+ .join(newVertices).where(1).equalTo(0)
+ .with(new ProjectEdge<K, VV, EV>());
+
+ return new Graph<K, VV, EV>(newVertices, newEdges, context);
+ }
+
+ private static final class VerticesRemovalCoGroup<K, VV> implements CoGroupFunction<Vertex<K, VV>, Vertex<K, VV>, Vertex<K, VV>> {
+
+ @Override
+ public void coGroup(Iterable<Vertex<K, VV>> vertex, Iterable<Vertex<K, VV>> vertexToBeRemoved,
+ Collector<Vertex<K, VV>> out) throws Exception {
+
+ final Iterator<Vertex<K, VV>> vertexIterator = vertex.iterator();
+ final Iterator<Vertex<K, VV>> vertexToBeRemovedIterator = vertexToBeRemoved.iterator();
+ Vertex<K, VV> next;
+
+ if (vertexIterator.hasNext()) {
+ if (!vertexToBeRemovedIterator.hasNext()) {
+ next = vertexIterator.next();
+ out.collect(next);
+ }
+ }
+ }
+ }
+
+
+
+ /**
+ * Join function used in vertex removal: drops the matched (still present) vertex
+ * and forwards the edge unchanged. Despite the name, the edges it emits are the
+ * ones that are kept.
+ */
+ @ForwardedFieldsSecond("f0; f1; f2")
+ private static final class ProjectEdgeToBeRemoved<K,VV,EV> implements JoinFunction<Vertex<K, VV>, Edge<K, EV>, Edge<K, EV>> {
+
+ @Override
+ public Edge<K, EV> join(Vertex<K, VV> survivingVertex, Edge<K, EV> candidateEdge) throws Exception {
+ return candidateEdge;
+ }
+ }
+
+ /**
+ * Removes every edge whose source and target ids equal those of the given edge.
+ * Edge values are not compared. The vertices are reused unchanged.
+ *
+ * @param edge the edge to remove
+ * @return the new graph containing the existing vertices and edges without
+ * the removed edges
+ */
+ public Graph<K, VV, EV> removeEdge(Edge<K, EV> edge) {
+ EdgeRemovalEdgeFilter<K, EV> keepOthers = new EdgeRemovalEdgeFilter<K, EV>(edge);
+ DataSet<Edge<K, EV>> remainingEdges = getEdges().filter(keepOthers);
+ return new Graph<K, VV, EV>(this.vertices, remainingEdges, this.context);
+ }
+
+ /** Keeps every edge except those whose source and target both equal the configured edge's. */
+ private static final class EdgeRemovalEdgeFilter<K, EV>
+ implements FilterFunction<Edge<K, EV>> {
+
+ private final Edge<K, EV> edgeToRemove;
+
+ public EdgeRemovalEdgeFilter(Edge<K, EV> edge) {
+ this.edgeToRemove = edge;
+ }
+
+ @Override
+ public boolean filter(Edge<K, EV> edge) {
+ return !edge.f0.equals(edgeToRemove.f0) || !edge.f1.equals(edgeToRemove.f1);
+ }
+ }
+
+ /**
+ * Removes every edge whose (source id, target id) pair matches that of an edge
+ * in the given list. Edge values are not compared. The vertices remain unchanged.
+ *
+ * @param edgesToBeRemoved the list of edges to be removed
+ * @return a new graph where the matching edges have been removed and in which the vertices remained intact
+ */
+ public Graph<K, VV, EV> removeEdges(List<Edge<K, EV>> edgesToBeRemoved) {
+
+ DataSet<Edge<K, EV>> newEdges = getEdges().coGroup(this.context.fromCollection(edgesToBeRemoved))
+ .where(0,1).equalTo(0,1).with(new EdgeRemovalCoGroup<K, EV>());
+
+ return new Graph<K, VV, EV>(this.vertices, newEdges, context);
+ }
+
+ /**
+ * Co-group function for edge removal, applied per (source id, target id) key.
+ * If any edge to be removed has that key, the whole group is dropped. Otherwise
+ * every edge of the group is emitted, including parallel edges that share the
+ * same source and target; emitting only the first one would silently lose edges.
+ */
+ private static final class EdgeRemovalCoGroup<K,EV> implements CoGroupFunction<Edge<K, EV>, Edge<K, EV>, Edge<K, EV>> {
+
+ @Override
+ public void coGroup(Iterable<Edge<K, EV>> edge, Iterable<Edge<K, EV>> edgeToBeRemoved,
+ Collector<Edge<K, EV>> out) throws Exception {
+
+ if (edgeToBeRemoved.iterator().hasNext()) {
+ // at least one removal request matches this key: drop all edges of the group
+ return;
+ }
+ for (Edge<K, EV> next : edge) {
+ out.collect(next);
+ }
+ }
+ }
+
+ /**
+ * Unions the vertex and edge sets of this graph and the given graph. The vertex
+ * union is passed through distinct(), so duplicate vertices are removed. The edge
+ * union is kept as-is, so duplicate edges are preserved.
+ *
+ * @param graph the graph to perform union with
+ * @return a new graph
+ */
+ public Graph<K, VV, EV> union(Graph<K, VV, EV> graph) {
+
+ DataSet<Vertex<K, VV>> allVertices = graph.getVertices().union(this.getVertices());
+ DataSet<Edge<K, EV>> allEdges = graph.getEdges().union(this.getEdges());
+ return new Graph<K, VV, EV>(allVertices.distinct(), allEdges, this.context);
+ }
+
+ /**
+ * Computes the difference of this graph and the given graph. Every vertex of this
+ * graph whose id also appears among the vertices of {@code graph} is removed,
+ * together with all edges whose source or target is such a vertex. The edges of
+ * {@code graph} are not consulted, so an edge present in both graphs is kept as
+ * long as both of its endpoints survive.
+ *
+ * @param graph the graph to perform difference with
+ * @return a new graph without the common vertices and their incident edges
+ */
+ public Graph<K,VV,EV> difference(Graph<K,VV,EV> graph) {
+ DataSet<Vertex<K,VV>> removeVerticesData = graph.getVertices();
+ return this.removeVertices(removeVerticesData);
+ }
+
+ /**
+ * Runs a vertex-centric iteration on the graph without configuration options.
+ * This delegates to the configurable overload with {@code null} parameters.
+ *
+ * @param vertexUpdateFunction the vertex update function
+ * @param messagingFunction the messaging function
+ * @param maximumNumberOfIterations maximum number of iterations to perform
+ *
+ * @return the updated Graph after the vertex-centric iteration has converged or
+ * after maximumNumberOfIterations.
+ */
+ public <M> Graph<K, VV, EV> runVertexCentricIteration(
+ VertexUpdateFunction<K, VV, M> vertexUpdateFunction,
+ MessagingFunction<K, VV, M, EV> messagingFunction,
+ int maximumNumberOfIterations) {
+
+ VertexCentricConfiguration noConfiguration = null;
+ return this.runVertexCentricIteration(vertexUpdateFunction, messagingFunction,
+ maximumNumberOfIterations, noConfiguration);
+ }
+
+ /**
+ * Runs a vertex-centric iteration on the graph with configuration options.
+ * Only the vertex values are updated; the edge set is carried over unchanged.
+ *
+ * @param vertexUpdateFunction the vertex update function
+ * @param messagingFunction the messaging function
+ * @param maximumNumberOfIterations maximum number of iterations to perform
+ * @param parameters the iteration configuration parameters (the overload without
+ * parameters passes {@code null})
+ *
+ * @return the updated Graph after the vertex-centric iteration has converged or
+ * after maximumNumberOfIterations.
+ */
+ public <M> Graph<K, VV, EV> runVertexCentricIteration(
+ VertexUpdateFunction<K, VV, M> vertexUpdateFunction,
+ MessagingFunction<K, VV, M, EV> messagingFunction,
+ int maximumNumberOfIterations, VertexCentricConfiguration parameters) {
+
+ VertexCentricIteration<K, VV, M, EV> iteration = VertexCentricIteration.withEdges(
+ edges, vertexUpdateFunction, messagingFunction, maximumNumberOfIterations);
+ iteration.configure(parameters);
+
+ DataSet<Vertex<K, VV>> updatedVertices = this.getVertices().runOperation(iteration);
+ return new Graph<K, VV, EV>(updatedVertices, this.edges, this.context);
+ }
+
+ /**
+ * Runs a Gather-Sum-Apply iteration on the graph without configuration options.
+ * This delegates to the configurable overload with {@code null} parameters.
+ *
+ * @param gatherFunction the gather function collects information about adjacent vertices and edges
+ * @param sumFunction the sum function aggregates the gathered information
+ * @param applyFunction the apply function updates the vertex values with the aggregates
+ * @param maximumNumberOfIterations maximum number of iterations to perform
+ * @param <M> the intermediate type used between gather, sum and apply
+ *
+ * @return the updated Graph after the gather-sum-apply iteration has converged or
+ * after maximumNumberOfIterations.
+ */
+ public <M> Graph<K, VV, EV> runGatherSumApplyIteration(
+ GatherFunction<VV, EV, M> gatherFunction, SumFunction<VV, EV, M> sumFunction,
+ ApplyFunction<K, VV, M> applyFunction, int maximumNumberOfIterations) {
+
+ GSAConfiguration noConfiguration = null;
+ return this.runGatherSumApplyIteration(gatherFunction, sumFunction, applyFunction,
+ maximumNumberOfIterations, noConfiguration);
+ }
+
+ /**
+ * Runs a Gather-Sum-Apply iteration on the graph with configuration options.
+ * Only the vertex values are updated; the edge set is carried over unchanged.
+ *
+ * @param gatherFunction the gather function collects information about adjacent vertices and edges
+ * @param sumFunction the sum function aggregates the gathered information
+ * @param applyFunction the apply function updates the vertex values with the aggregates
+ * @param maximumNumberOfIterations maximum number of iterations to perform
+ * @param parameters the iteration configuration parameters (the overload without
+ * parameters passes {@code null})
+ * @param <M> the intermediate type used between gather, sum and apply
+ *
+ * @return the updated Graph after the gather-sum-apply iteration has converged or
+ * after maximumNumberOfIterations.
+ */
+ public <M> Graph<K, VV, EV> runGatherSumApplyIteration(
+ GatherFunction<VV, EV, M> gatherFunction, SumFunction<VV, EV, M> sumFunction,
+ ApplyFunction<K, VV, M> applyFunction, int maximumNumberOfIterations,
+ GSAConfiguration parameters) {
+
+ GatherSumApplyIteration<K, VV, EV, M> iteration = GatherSumApplyIteration.withEdges(
+ edges, gatherFunction, sumFunction, applyFunction, maximumNumberOfIterations);
+ iteration.configure(parameters);
+
+ DataSet<Vertex<K, VV>> updatedVertices = vertices.runOperation(iteration);
+ return new Graph<K, VV, EV>(updatedVertices, this.edges, this.context);
+ }
+
+ /**
+ * @param algorithm the algorithm to run on the Graph
+ * @param <T> the return type
+ * @return the result of the graph algorithm
+ * @throws Exception
+ */
+ public <T> T run(GraphAlgorithm<K, VV, EV, T> algorithm) throws Exception {
+ return algorithm.run(this);
+ }
+
+ /**
+ * Computes an aggregate over the neighborhood (edges together with the adjacent
+ * vertices) of every vertex. The supplied function also receives the vertex
+ * itself and therefore has access to its value.
+ *
+ * @param neighborsFunction the function applied to each vertex's neighborhood
+ * @param direction IN (neighbors are edge sources), OUT (neighbors are edge targets)
+ * or ALL (both)
+ * @param <T> the output type
+ * @return a dataset of T
+ * @throws IllegalArgumentException if the direction is not IN, OUT or ALL
+ */
+ public <T> DataSet<T> groupReduceOnNeighbors(NeighborsFunctionWithVertexValue<K, VV, EV, T> neighborsFunction,
+ EdgeDirection direction) throws IllegalArgumentException {
+ switch (direction) {
+ case IN: {
+ // (edge, sourceVertex) pairs, co-grouped with the vertices on the edge target
+ DataSet<Tuple2<Edge<K, EV>, Vertex<K, VV>>> inNeighbors =
+ edges.join(this.vertices).where(0).equalTo(0);
+ return vertices.coGroup(inNeighbors).where(0).equalTo("f0.f1")
+ .with(new ApplyNeighborCoGroupFunction<K, VV, EV, T>(neighborsFunction));
+ }
+ case OUT: {
+ // (edge, targetVertex) pairs, co-grouped with the vertices on the edge source
+ DataSet<Tuple2<Edge<K, EV>, Vertex<K, VV>>> outNeighbors =
+ edges.join(this.vertices).where(1).equalTo(0);
+ return vertices.coGroup(outNeighbors).where(0).equalTo("f0.f0")
+ .with(new ApplyNeighborCoGroupFunction<K, VV, EV, T>(neighborsFunction));
+ }
+ case ALL: {
+ // (vertexId, edge, neighborVertex) triples for both endpoints of every edge
+ DataSet<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> allNeighbors = edges
+ .flatMap(new EmitOneEdgeWithNeighborPerNode<K, EV>())
+ .join(this.vertices).where(1).equalTo(0)
+ .with(new ProjectEdgeWithNeighbor<K, VV, EV>());
+ return vertices.coGroup(allNeighbors).where(0).equalTo(0)
+ .with(new ApplyCoGroupFunctionOnAllNeighbors<K, VV, EV, T>(neighborsFunction));
+ }
+ default:
+ throw new IllegalArgumentException("Illegal edge direction");
+ }
+ }
+
+ /**
+ * Computes an aggregate over the neighborhood (edges together with the adjacent
+ * vertices) of every vertex, with an explicitly supplied result type. The supplied
+ * function also receives the vertex itself and therefore has access to its value.
+ *
+ * @param neighborsFunction the function applied to each vertex's neighborhood
+ * @param direction IN (neighbors are edge sources), OUT (neighbors are edge targets)
+ * or ALL (both)
+ * @param <T> the output type
+ * @param typeInfo the explicit return type, set on the result via returns(...)
+ * @return a dataset of T
+ * @throws IllegalArgumentException if the direction is not IN, OUT or ALL
+ */
+ public <T> DataSet<T> groupReduceOnNeighbors(NeighborsFunctionWithVertexValue<K, VV, EV, T> neighborsFunction,
+ EdgeDirection direction, TypeInformation<T> typeInfo) throws IllegalArgumentException {
+ switch (direction) {
+ case IN: {
+ // (edge, sourceVertex) pairs, co-grouped with the vertices on the edge target
+ DataSet<Tuple2<Edge<K, EV>, Vertex<K, VV>>> inNeighbors =
+ edges.join(this.vertices).where(0).equalTo(0);
+ return vertices.coGroup(inNeighbors).where(0).equalTo("f0.f1")
+ .with(new ApplyNeighborCoGroupFunction<K, VV, EV, T>(neighborsFunction)).returns(typeInfo);
+ }
+ case OUT: {
+ // (edge, targetVertex) pairs, co-grouped with the vertices on the edge source
+ DataSet<Tuple2<Edge<K, EV>, Vertex<K, VV>>> outNeighbors =
+ edges.join(this.vertices).where(1).equalTo(0);
+ return vertices.coGroup(outNeighbors).where(0).equalTo("f0.f0")
+ .with(new ApplyNeighborCoGroupFunction<K, VV, EV, T>(neighborsFunction)).returns(typeInfo);
+ }
+ case ALL: {
+ // (vertexId, edge, neighborVertex) triples for both endpoints of every edge
+ DataSet<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> allNeighbors = edges
+ .flatMap(new EmitOneEdgeWithNeighborPerNode<K, EV>())
+ .join(this.vertices).where(1).equalTo(0)
+ .with(new ProjectEdgeWithNeighbor<K, VV, EV>());
+ return vertices.coGroup(allNeighbors).where(0).equalTo(0)
+ .with(new ApplyCoGroupFunctionOnAllNeighbors<K, VV, EV, T>(neighborsFunction)).returns(typeInfo);
+ }
+ default:
+ throw new IllegalArgumentException("Illegal edge direction");
+ }
+ }
+
+
+ /**
+ * Computes an aggregate over the neighborhood (edges together with the adjacent
+ * vertices) of every vertex. The supplied function receives
+ * (vertexId, edge, neighborVertex) triples, so it sees the vertex id but not
+ * the vertex value.
+ *
+ * @param neighborsFunction the function applied to each vertex's neighborhood
+ * @param direction IN (neighbors are edge sources), OUT (neighbors are edge targets)
+ * or ALL (both)
+ * @param <T> the output type
+ * @return a dataset of T
+ * @throws IllegalArgumentException if the direction is not IN, OUT or ALL
+ */
+ public <T> DataSet<T> groupReduceOnNeighbors(NeighborsFunction<K, VV, EV, T> neighborsFunction,
+ EdgeDirection direction) throws IllegalArgumentException {
+ DataSet<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> neighborTriples;
+ switch (direction) {
+ case IN:
+ // (targetId, edge, sourceVertex)
+ neighborTriples = edges.join(this.vertices).where(0).equalTo(0)
+ .with(new ProjectVertexIdJoin<K, VV, EV>(1))
+ .withForwardedFieldsFirst("f1->f0");
+ break;
+ case OUT:
+ // (sourceId, edge, targetVertex)
+ neighborTriples = edges.join(this.vertices).where(1).equalTo(0)
+ .with(new ProjectVertexIdJoin<K, VV, EV>(0))
+ .withForwardedFieldsFirst("f0");
+ break;
+ case ALL:
+ // (vertexId, edge, neighborVertex) for both endpoints of every edge
+ neighborTriples = edges
+ .flatMap(new EmitOneEdgeWithNeighborPerNode<K, EV>())
+ .join(this.vertices).where(1).equalTo(0)
+ .with(new ProjectEdgeWithNeighbor<K, VV, EV>());
+ break;
+ default:
+ throw new IllegalArgumentException("Illegal edge direction");
+ }
+ return neighborTriples.groupBy(0).reduceGroup(
+ new ApplyNeighborGroupReduceFunction<K, VV, EV, T>(neighborsFunction));
+ }
+
+ /**
+ * Computes an aggregate over the neighborhood (edges together with the adjacent
+ * vertices) of every vertex, with an explicitly supplied result type. The supplied
+ * function receives (vertexId, edge, neighborVertex) triples, so it sees the
+ * vertex id but not the vertex value.
+ *
+ * @param neighborsFunction the function applied to each vertex's neighborhood
+ * @param direction IN (neighbors are edge sources), OUT (neighbors are edge targets)
+ * or ALL (both)
+ * @param <T> the output type
+ * @param typeInfo the explicit return type, set on the result via returns(...)
+ * @return a dataset of T
+ * @throws IllegalArgumentException if the direction is not IN, OUT or ALL
+ */
+ public <T> DataSet<T> groupReduceOnNeighbors(NeighborsFunction<K, VV, EV, T> neighborsFunction,
+ EdgeDirection direction, TypeInformation<T> typeInfo) throws IllegalArgumentException {
+ DataSet<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> neighborTriples;
+ switch (direction) {
+ case IN:
+ // (targetId, edge, sourceVertex)
+ neighborTriples = edges.join(this.vertices).where(0).equalTo(0)
+ .with(new ProjectVertexIdJoin<K, VV, EV>(1))
+ .withForwardedFieldsFirst("f1->f0");
+ break;
+ case OUT:
+ // (sourceId, edge, targetVertex)
+ neighborTriples = edges.join(this.vertices).where(1).equalTo(0)
+ .with(new ProjectVertexIdJoin<K, VV, EV>(0))
+ .withForwardedFieldsFirst("f0");
+ break;
+ case ALL:
+ // (vertexId, edge, neighborVertex) for both endpoints of every edge
+ neighborTriples = edges
+ .flatMap(new EmitOneEdgeWithNeighborPerNode<K, EV>())
+ .join(this.vertices).where(1).equalTo(0)
+ .with(new ProjectEdgeWithNeighbor<K, VV, EV>());
+ break;
+ default:
+ throw new IllegalArgumentException("Illegal edge direction");
+ }
+ return neighborTriples.groupBy(0).reduceGroup(
+ new ApplyNeighborGroupReduceFunction<K, VV, EV, T>(neighborsFunction)).returns(typeInfo);
+ }
+
+ /**
+ * Group-reduce adapter that passes each group of (vertexId, edge, neighbor)
+ * triples to the user's NeighborsFunction and reports its output type T.
+ */
+ private static final class ApplyNeighborGroupReduceFunction<K, VV, EV, T>
+ implements GroupReduceFunction<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>, T>, ResultTypeQueryable<T> {
+
+ private final NeighborsFunction<K, VV, EV, T> function;
+
+ public ApplyNeighborGroupReduceFunction(NeighborsFunction<K, VV, EV, T> fun) {
+ function = fun;
+ }
+
+ @Override
+ public void reduce(Iterable<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> edges, Collector<T> out) throws Exception {
+ function.iterateNeighbors(edges, out);
+ }
+
+ @Override
+ public TypeInformation<T> getProducedType() {
+ // T is type argument index 3 of NeighborsFunction<K, VV, EV, T>
+ return TypeExtractor.createTypeInfo(NeighborsFunction.class, function.getClass(), 3, null, null);
+ }
+ }
+
+ /**
+ * Produces (vertexId, neighborValue) pairs. The id is read from the configured
+ * edge field position and the value comes from the joined vertex.
+ */
+ @ForwardedFieldsSecond("f1")
+ private static final class ProjectVertexWithNeighborValueJoin<K, VV, EV>
+ implements FlatJoinFunction<Edge<K, EV>, Vertex<K, VV>, Tuple2<K, VV>> {
+
+ private final int fieldPosition;
+
+ public ProjectVertexWithNeighborValueJoin(int position) {
+ fieldPosition = position;
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public void join(Edge<K, EV> edge, Vertex<K, VV> otherVertex,
+ Collector<Tuple2<K, VV>> out) {
+ K vertexId = (K) edge.getField(fieldPosition);
+ out.collect(new Tuple2<K, VV>(vertexId, otherVertex.getValue()));
+ }
+ }
+
+ /**
+ * Produces (vertexId, edge, otherVertex) triples. The id is read from the
+ * configured edge field position (callers in this file pass 1 for IN and 0 for OUT).
+ */
+ private static final class ProjectVertexIdJoin<K, VV, EV> implements FlatJoinFunction<
+ Edge<K, EV>, Vertex<K, VV>, Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> {
+
+ private final int fieldPosition;
+
+ public ProjectVertexIdJoin(int position) {
+ fieldPosition = position;
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public void join(Edge<K, EV> edge, Vertex<K, VV> otherVertex,
+ Collector<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> out) {
+ K vertexId = (K) edge.getField(fieldPosition);
+ out.collect(new Tuple3<K, Edge<K, EV>, Vertex<K, VV>>(vertexId, edge, otherVertex));
+ }
+ }
+
+ @ForwardedFieldsFirst("f0")
+ @ForwardedFieldsSecond("f1")
+ private static final class ProjectNeighborValue<K, VV, EV> implements FlatJoinFunction<
+ Tuple3<K, K, Edge<K, EV>>, Vertex<K, VV>, Tuple2<K, VV>> {
+
+ public void join(Tuple3<K, K, Edge<K, EV>> keysWithEdge, Vertex<K, VV> neighbor,
+ Collector<Tuple2<K, VV>> out) {
+
+ out.collect(new Tuple2<K, VV>(keysWithEdge.f0, neighbor.getValue()));
+ }
+ }
+
+ @ForwardedFieldsFirst("f0; f2->f1")
+ @ForwardedFieldsSecond("*->f2")
+ private static final class ProjectEdgeWithNeighbor<K, VV, EV> implements FlatJoinFunction<
+ Tuple3<K, K, Edge<K, EV>>, Vertex<K, VV>, Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> {
+
+ public void join(Tuple3<K, K, Edge<K, EV>> keysWithEdge, Vertex<K, VV> neighbor,
+ Collector<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> out) {
+ out.collect(new Tuple3<K, Edge<K, EV>, Vertex<K, VV>>(keysWithEdge.f0, keysWithEdge.f2, neighbor));
+ }
+ }
+
+ private static final class ApplyNeighborCoGroupFunction<K, VV, EV, T> implements CoGroupFunction<
+ Vertex<K, VV>, Tuple2<Edge<K, EV>, Vertex<K, VV>>, T>, ResultTypeQueryable<T> {
+
+ private NeighborsFunctionWithVertexValue<K, VV, EV, T> function;
+
+ public ApplyNeighborCoGroupFunction(NeighborsFunctionWithVertexValue<K, VV, EV, T> fun) {
+ this.function = fun;
+ }
+
+ public void coGroup(Iterable<Vertex<K, VV>> vertex, Iterable<Tuple2<Edge<K, EV>, Vertex<K, VV>>> neighbors,
+ Collector<T> out) throws Exception {
+ function.iterateNeighbors(vertex.iterator().next(), neighbors, out);
+ }
+
+ @Override
+ public TypeInformation<T> getProducedType() {
+ return TypeExtractor.createTypeInfo(NeighborsFunctionWithVertexValue.class, function.getClass(), 3, null, null);
+ }
+ }
+
+ private static final class ApplyCoGroupFunctionOnAllNeighbors<K, VV, EV, T>
+ implements CoGroupFunction<Vertex<K, VV>, Tuple3<K, Edge<K, EV>, Vertex<K, VV>>, T>, ResultTypeQueryable<T> {
+
+ private NeighborsFunctionWithVertexValue<K, VV, EV, T> function;
+
+ public ApplyCoGroupFunctionOnAllNeighbors(NeighborsFunctionWithVertexValue<K, VV, EV, T> fun) {
+ this.function = fun;
+ }
+
+ public void coGroup(Iterable<Vertex<K, VV>> vertex,
+ final Iterable<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> keysWithNeighbors,
+ Collector<T> out) throws Exception {
+
+ final Iterator<Tuple2<Edge<K, EV>, Vertex<K, VV>>> neighborsIterator = new Iterator<Tuple2<Edge<K, EV>, Vertex<K, VV>>>() {
+
+ final Iterator<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> keysWithEdgesIterator = keysWithNeighbors.iterator();
+
+ @Override
+ public boolean hasNext() {
+ return keysWithEdgesIterator.hasNext();
+ }
+
+ @Override
+ public Tuple2<Edge<K, EV>, Vertex<K, VV>> next() {
+ Tuple3<K, Edge<K, EV>, Vertex<K, VV>> next = keysWithEdgesIterator.next();
+ return new Tuple2<Edge<K, EV>, Vertex<K, VV>>(next.f1, next.f2);
+ }
+
+ @Override
+ public void remove() {
+ keysWithEdgesIterator.remove();
+ }
+ };
+
+ Iterable<Tuple2<Edge<K, EV>, Vertex<K, VV>>> neighborsIterable = new Iterable<Tuple2<Edge<K, EV>, Vertex<K, VV>>>() {
+ public Iterator<Tuple2<Edge<K, EV>, Vertex<K, VV>>> iterator() {
+ return neighborsIterator;
+ }
+ };
+
+ Iterator<Vertex<K, VV>> vertexIterator = vertex.iterator();
+
+ if (vertexIterator.hasNext()) {
+ function.iterateNeighbors(vertexIterator.next(), neighborsIterable, out);
+ } else {
+ throw new NoSuchElementException("The edge src/trg id could not be found within the vertexIds");
+ }
+ }
+
+ @Override
+ public TypeInformation<T> getProducedType() {
+ return TypeExtractor.createTypeInfo(NeighborsFunctionWithVertexValue.class, function.getClass(), 3, null, null);
+ }
+ }
+
+ /**
+ * Compute an aggregate over the neighbor values of each
+ * vertex.
+ *
+ * @param reduceNeighborsFunction the function to apply to the neighborhood
+ * @param direction the edge direction (in-, out-, all-)
+ * @return a Dataset containing one value per vertex (vertex id, aggregate vertex value)
+ * @throws IllegalArgumentException
+ */
+ public DataSet<Tuple2<K, VV>> reduceOnNeighbors(ReduceNeighborsFunction<VV> reduceNeighborsFunction,
+ EdgeDirection direction) throws IllegalArgumentException {
+ switch (direction) {
+ case IN:
+ // create <vertex-source value> pairs
+ final DataSet<Tuple2<K, VV>> verticesWithSourceNeighborValues = edges
+ .join(this.vertices).where(0).equalTo(0)
+ .with(new ProjectVertexWithNeighborValueJoin<K, VV, EV>(1))
+ .withForwardedFieldsFirst("f1->f0");
+ return verticesWithSourceNeighborValues.groupBy(0).reduce(new ApplyNeighborReduceFunction<K, VV>(
+ reduceNeighborsFunction));
+ case OUT:
+ // create <vertex-target value> pairs
+ DataSet<Tuple2<K, VV>> verticesWithTargetNeighborValues = edges
+ .join(this.vertices).where(1).equalTo(0)
+ .with(new ProjectVertexWithNeighborValueJoin<K, VV, EV>(0))
+ .withForwardedFieldsFirst("f0");
+ return verticesWithTargetNeighborValues.groupBy(0).reduce(new ApplyNeighborReduceFunction<K, VV>(
+ reduceNeighborsFunction));
+ case ALL:
+ // create <vertex-neighbor value> pairs
+ DataSet<Tuple2<K, VV>> verticesWithNeighborValues = edges
+ .flatMap(new EmitOneEdgeWithNeighborPerNode<K, EV>())
+ .join(this.vertices).where(1).equalTo(0)
+ .with(new ProjectNeighborValue<K, VV, EV>());
+
+ return verticesWithNeighborValues.groupBy(0).reduce(new ApplyNeighborReduceFunction<K, VV>(
+ reduceNeighborsFunction));
+ default:
+ throw new IllegalArgumentException("Illegal edge direction");
+ }
+ }
+
+ @ForwardedFields("f0")
+ private static final class ApplyNeighborReduceFunction<K, VV> implements ReduceFunction<Tuple2<K, VV>> {
+
+ private ReduceNeighborsFunction<VV> function;
+
+ public ApplyNeighborReduceFunction(ReduceNeighborsFunction<VV> fun) {
+ this.function = fun;
+ }
+
+ @Override
+ public Tuple2<K, VV> reduce(Tuple2<K, VV> first, Tuple2<K, VV> second) throws Exception {
+ first.setField(function.reduceNeighbors(first.f1, second.f1), 1);
+ return first;
+ }
+ }
+
+ /**
+ * Compute an aggregate over the edge values of each vertex.
+ *
+ * @param reduceEdgesFunction
+ * the function to apply to the neighborhood
+ * @param direction
+ * the edge direction (in-, out-, all-)
+ * @return a Dataset containing one value per vertex(vertex key, aggregate edge value)
+ * @throws IllegalArgumentException
+ */
+ public DataSet<Tuple2<K, EV>> reduceOnEdges(ReduceEdgesFunction<EV> reduceEdgesFunction,
+ EdgeDirection direction) throws IllegalArgumentException {
+
+ switch (direction) {
+ case IN:
+ return edges.map(new ProjectVertexWithEdgeValueMap<K, EV>(1))
+ .withForwardedFields("f1->f0")
+ .groupBy(0).reduce(new ApplyReduceFunction<K, EV>(reduceEdgesFunction));
+ case OUT:
+ return edges.map(new ProjectVertexWithEdgeValueMap<K, EV>(0))
+ .withForwardedFields("f0->f0")
+ .groupBy(0).reduce(new ApplyReduceFunction<K, EV>(reduceEdgesFunction));
+ case ALL:
+ return edges.flatMap(new EmitOneVertexWithEdgeValuePerNode<K, EV>())
+ .withForwardedFields("f2->f1")
+ .groupBy(0).reduce(new ApplyReduceFunction<K, EV>(reduceEdgesFunction));
+ default:
+ throw new IllegalArgumentException("Illegal edge direction");
+ }
+ }
+
+ @ForwardedFields("f0")
+ private static final class ApplyReduceFunction<K, EV> implements ReduceFunction<Tuple2<K, EV>> {
+
+ private ReduceEdgesFunction<EV> function;
+
+ public ApplyReduceFunction(ReduceEdgesFunction<EV> fun) {
+ this.function = fun;
+ }
+
+ @Override
+ public Tuple2<K, EV> reduce(Tuple2<K, EV> first, Tuple2<K, EV> second) throws Exception {
+ first.setField(function.reduceEdges(first.f1, second.f1), 1);
+ return first;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphAlgorithm.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphAlgorithm.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphAlgorithm.java
new file mode 100644
index 0000000..08cf011
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphAlgorithm.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph;
+
+/**
+ * @param <K> key type
+ * @param <VV> vertex value type
+ * @param <EV> edge value type
+ * @param <T> the return type
+ */
+public interface GraphAlgorithm<K, VV, EV, T> {
+
+ public T run(Graph<K, VV, EV> input) throws Exception;
+}