You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by va...@apache.org on 2015/10/09 18:05:48 UTC
[10/24] flink git commit: [FLINK-2833] [gelly] create a
flink-libraries module and move gelly there
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
deleted file mode 100755
index b24f749..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ /dev/null
@@ -1,1948 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-import java.util.List;
-import java.util.Arrays;
-
-import org.apache.flink.api.common.functions.CoGroupFunction;
-import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.api.common.functions.FlatJoinFunction;
-import org.apache.flink.api.common.functions.JoinFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.GroupReduceFunction;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
-import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
-import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple4;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.graph.gsa.ApplyFunction;
-import org.apache.flink.graph.gsa.GSAConfiguration;
-import org.apache.flink.graph.gsa.GatherFunction;
-import org.apache.flink.graph.gsa.GatherSumApplyIteration;
-import org.apache.flink.graph.gsa.SumFunction;
-import org.apache.flink.graph.spargel.MessagingFunction;
-import org.apache.flink.graph.spargel.VertexCentricConfiguration;
-import org.apache.flink.graph.spargel.VertexCentricIteration;
-import org.apache.flink.graph.spargel.VertexUpdateFunction;
-import org.apache.flink.graph.utils.EdgeToTuple3Map;
-import org.apache.flink.graph.utils.Tuple2ToVertexMap;
-import org.apache.flink.graph.utils.Tuple3ToEdgeMap;
-import org.apache.flink.graph.utils.VertexToTuple2Map;
-import org.apache.flink.graph.validation.GraphValidator;
-import org.apache.flink.util.Collector;
-import org.apache.flink.types.NullValue;
-
-/**
- * Represents a Graph consisting of {@link Edge edges} and {@link Vertex
- * vertices}.
- *
- *
- * @see org.apache.flink.graph.Edge
- * @see org.apache.flink.graph.Vertex
- *
- * @param <K> the key type for edge and vertex identifiers
- * @param <VV> the value type for vertices
- * @param <EV> the value type for edges
- */
-@SuppressWarnings("serial")
-public class Graph<K, VV, EV> {
-
- private final ExecutionEnvironment context;
- private final DataSet<Vertex<K, VV>> vertices;
- private final DataSet<Edge<K, EV>> edges;
-
- /**
- * Creates a graph from two DataSets: vertices and edges
- *
- * @param vertices a DataSet of vertices.
- * @param edges a DataSet of edges.
- * @param context the flink execution environment.
- */
- private Graph(DataSet<Vertex<K, VV>> vertices, DataSet<Edge<K, EV>> edges, ExecutionEnvironment context) {
- this.vertices = vertices;
- this.edges = edges;
- this.context = context;
- }
-
- /**
- * Creates a graph from a Collection of vertices and a Collection of edges.
- *
- * @param vertices a Collection of vertices.
- * @param edges a Collection of edges.
- * @param context the flink execution environment.
- * @return the newly created graph.
- */
- public static <K, VV, EV> Graph<K, VV, EV> fromCollection(Collection<Vertex<K, VV>> vertices,
- Collection<Edge<K, EV>> edges, ExecutionEnvironment context) {
-
- return fromDataSet(context.fromCollection(vertices),
- context.fromCollection(edges), context);
- }
-
- /**
- * Creates a graph from a Collection of edges; the vertices are induced from
- * the edges. Vertices are created automatically and their values are set to
- * NullValue.
- *
- * @param edges a Collection of edges.
- * @param context the flink execution environment.
- * @return the newly created graph.
- */
- public static <K, EV> Graph<K, NullValue, EV> fromCollection(Collection<Edge<K, EV>> edges,
- ExecutionEnvironment context) {
-
- return fromDataSet(context.fromCollection(edges), context);
- }
-
- /**
- * Creates a graph from a Collection of edges, vertices are induced from the
- * edges and vertex values are calculated by a mapper function. Vertices are
- * created automatically and their values are set by applying the provided
- * map function to the vertex ids.
- *
- * @param edges a Collection of edges.
- * @param mapper the mapper function.
- * @param context the flink execution environment.
- * @return the newly created graph.
- */
- public static <K, VV, EV> Graph<K, VV, EV> fromCollection(Collection<Edge<K, EV>> edges,
- final MapFunction<K, VV> mapper, ExecutionEnvironment context) {
-
- return fromDataSet(context.fromCollection(edges), mapper, context);
- }
-
- /**
- * Creates a graph from a DataSet of vertices and a DataSet of edges.
- *
- * @param vertices a DataSet of vertices.
- * @param edges a DataSet of edges.
- * @param context the flink execution environment.
- * @return the newly created graph.
- */
- public static <K, VV, EV> Graph<K, VV, EV> fromDataSet(DataSet<Vertex<K, VV>> vertices,
- DataSet<Edge<K, EV>> edges, ExecutionEnvironment context) {
-
- return new Graph<K, VV, EV>(vertices, edges, context);
- }
-
- /**
- * Creates a graph from a DataSet of edges, vertices are induced from the
- * edges. Vertices are created automatically and their values are set to
- * NullValue.
- *
- * @param edges a DataSet of edges.
- * @param context the flink execution environment.
- * @return the newly created graph.
- */
- public static <K, EV> Graph<K, NullValue, EV> fromDataSet(
- DataSet<Edge<K, EV>> edges, ExecutionEnvironment context) {
-
- DataSet<Vertex<K, NullValue>> vertices = edges.flatMap(new EmitSrcAndTarget<K, EV>()).distinct();
-
- return new Graph<K, NullValue, EV>(vertices, edges, context);
- }
-
- private static final class EmitSrcAndTarget<K, EV> implements FlatMapFunction<
- Edge<K, EV>, Vertex<K, NullValue>> {
-
- public void flatMap(Edge<K, EV> edge, Collector<Vertex<K, NullValue>> out) {
- out.collect(new Vertex<K, NullValue>(edge.f0, NullValue.getInstance()));
- out.collect(new Vertex<K, NullValue>(edge.f1, NullValue.getInstance()));
- }
- }
-
- /**
- * Creates a graph from a DataSet of edges, vertices are induced from the
- * edges and vertex values are calculated by a mapper function. Vertices are
- * created automatically and their values are set by applying the provided
- * map function to the vertex ids.
- *
- * @param edges a DataSet of edges.
- * @param mapper the mapper function.
- * @param context the flink execution environment.
- * @return the newly created graph.
- */
- public static <K, VV, EV> Graph<K, VV, EV> fromDataSet(DataSet<Edge<K, EV>> edges,
- final MapFunction<K, VV> mapper, ExecutionEnvironment context) {
-
- TypeInformation<K> keyType = ((TupleTypeInfo<?>) edges.getType()).getTypeAt(0);
-
- TypeInformation<VV> valueType = TypeExtractor.createTypeInfo(
- MapFunction.class, mapper.getClass(), 1, null, null);
-
- @SuppressWarnings({ "unchecked", "rawtypes" })
- TypeInformation<Vertex<K, VV>> returnType = (TypeInformation<Vertex<K, VV>>) new TupleTypeInfo(
- Vertex.class, keyType, valueType);
-
- DataSet<Vertex<K, VV>> vertices = edges
- .flatMap(new EmitSrcAndTargetAsTuple1<K, EV>()).distinct()
- .map(new MapFunction<Tuple1<K>, Vertex<K, VV>>() {
- public Vertex<K, VV> map(Tuple1<K> value) throws Exception {
- return new Vertex<K, VV>(value.f0, mapper.map(value.f0));
- }
- }).returns(returnType).withForwardedFields("f0");
-
- return new Graph<K, VV, EV>(vertices, edges, context);
- }
-
- private static final class EmitSrcAndTargetAsTuple1<K, EV> implements FlatMapFunction<
- Edge<K, EV>, Tuple1<K>> {
-
- public void flatMap(Edge<K, EV> edge, Collector<Tuple1<K>> out) {
- out.collect(new Tuple1<K>(edge.f0));
- out.collect(new Tuple1<K>(edge.f1));
- }
- }
-
- /**
- * Creates a graph from a DataSet of Tuple objects for vertices and edges.
- *
- * Vertices with value are created from Tuple2, Edges with value are created
- * from Tuple3.
- *
- * @param vertices a DataSet of Tuple2.
- * @param edges a DataSet of Tuple3.
- * @param context the flink execution environment.
- * @return the newly created graph.
- */
- public static <K, VV, EV> Graph<K, VV, EV> fromTupleDataSet(DataSet<Tuple2<K, VV>> vertices,
- DataSet<Tuple3<K, K, EV>> edges, ExecutionEnvironment context) {
-
- DataSet<Vertex<K, VV>> vertexDataSet = vertices.map(new Tuple2ToVertexMap<K, VV>());
- DataSet<Edge<K, EV>> edgeDataSet = edges.map(new Tuple3ToEdgeMap<K, EV>());
- return fromDataSet(vertexDataSet, edgeDataSet, context);
- }
-
- /**
- * Creates a graph from a DataSet of Tuple objects for edges, vertices are
- * induced from the edges.
- *
- * Edges with value are created from Tuple3. Vertices are created
- * automatically and their values are set to NullValue.
- *
- * @param edges a DataSet of Tuple3.
- * @param context the flink execution environment.
- * @return the newly created graph.
- */
- public static <K, EV> Graph<K, NullValue, EV> fromTupleDataSet(DataSet<Tuple3<K, K, EV>> edges,
- ExecutionEnvironment context) {
-
- DataSet<Edge<K, EV>> edgeDataSet = edges.map(new Tuple3ToEdgeMap<K, EV>());
- return fromDataSet(edgeDataSet, context);
- }
-
- /**
- * Creates a graph from a DataSet of Tuple objects for edges, vertices are
- * induced from the edges and vertex values are calculated by a mapper
- * function. Edges with value are created from Tuple3. Vertices are created
- * automatically and their values are set by applying the provided map
- * function to the vertex ids.
- *
- * @param edges a DataSet of Tuple3.
- * @param mapper the mapper function.
- * @param context the flink execution environment.
- * @return the newly created graph.
- */
- public static <K, VV, EV> Graph<K, VV, EV> fromTupleDataSet(DataSet<Tuple3<K, K, EV>> edges,
- final MapFunction<K, VV> mapper, ExecutionEnvironment context) {
-
- DataSet<Edge<K, EV>> edgeDataSet = edges.map(new Tuple3ToEdgeMap<K, EV>());
- return fromDataSet(edgeDataSet, mapper, context);
- }
-
- /**
- * Creates a Graph from a CSV file of vertices and a CSV file of edges.
- *
- * @param verticesPath path to a CSV file with the Vertex data.
- * @param edgesPath path to a CSV file with the Edge data.
- * @param context the Flink execution environment.
- * @return a {@link org.apache.flink.graph.GraphCsvReader}; calling one of its methods that
- * specify the types of the Vertex ID, Vertex value and Edge value returns a Graph.
- *
- * @see org.apache.flink.graph.GraphCsvReader#types(Class, Class, Class)
- * @see org.apache.flink.graph.GraphCsvReader#vertexTypes(Class, Class)
- * @see org.apache.flink.graph.GraphCsvReader#edgeTypes(Class, Class)
- * @see org.apache.flink.graph.GraphCsvReader#keyType(Class)
- */
- public static GraphCsvReader fromCsvReader(String verticesPath, String edgesPath, ExecutionEnvironment context) {
- return new GraphCsvReader(verticesPath, edgesPath, context);
- }
-
- /**
- * Creates a graph from a CSV file of edges. Vertices will be created automatically.
- *
- * @param edgesPath a path to a CSV file with the Edge data.
- * @param context the execution environment.
- * @return a {@link org.apache.flink.graph.GraphCsvReader}; calling one of its methods that
- * specify the types of the Vertex ID, Vertex value and Edge value returns a Graph.
- *
- * @see org.apache.flink.graph.GraphCsvReader#types(Class, Class, Class)
- * @see org.apache.flink.graph.GraphCsvReader#vertexTypes(Class, Class)
- * @see org.apache.flink.graph.GraphCsvReader#edgeTypes(Class, Class)
- * @see org.apache.flink.graph.GraphCsvReader#keyType(Class)
- */
- public static GraphCsvReader fromCsvReader(String edgesPath, ExecutionEnvironment context) {
- return new GraphCsvReader(edgesPath, context);
- }
-
- /**
- * Creates a graph from a CSV file of edges. Vertices will be created automatically and
- * Vertex values are set by the provided mapper.
- *
- * @param edgesPath a path to a CSV file with the Edge data.
- * @param mapper the mapper function.
- * @param context the execution environment.
- * @return a {@link org.apache.flink.graph.GraphCsvReader}; calling one of its methods that
- * specify the types of the Vertex ID, Vertex value and Edge value returns a Graph.
- *
- * @see org.apache.flink.graph.GraphCsvReader#types(Class, Class, Class)
- * @see org.apache.flink.graph.GraphCsvReader#vertexTypes(Class, Class)
- * @see org.apache.flink.graph.GraphCsvReader#edgeTypes(Class, Class)
- * @see org.apache.flink.graph.GraphCsvReader#keyType(Class)
- */
- public static <K, VV> GraphCsvReader fromCsvReader(String edgesPath,
- final MapFunction<K, VV> mapper, ExecutionEnvironment context) {
- return new GraphCsvReader(edgesPath, mapper, context);
- }
-
- /**
- * @return the flink execution environment.
- */
- public ExecutionEnvironment getContext() {
- return this.context;
- }
-
- /**
- * Checks whether this Graph is valid, as defined by the given {@link GraphValidator}.
- *
- * @param validator the validator that is applied to this graph.
- * @return true if the Graph is valid.
- */
- public Boolean validate(GraphValidator<K, VV, EV> validator) throws Exception {
- return validator.validate(this);
- }
-
- /**
- * @return the vertex DataSet.
- */
- public DataSet<Vertex<K, VV>> getVertices() {
- return vertices;
- }
-
- /**
- * @return the edge DataSet.
- */
- public DataSet<Edge<K, EV>> getEdges() {
- return edges;
- }
-
- /**
- * @return the vertex DataSet as Tuple2.
- */
- public DataSet<Tuple2<K, VV>> getVerticesAsTuple2() {
- return vertices.map(new VertexToTuple2Map<K, VV>());
- }
-
- /**
- * @return the edge DataSet as Tuple3.
- */
- public DataSet<Tuple3<K, K, EV>> getEdgesAsTuple3() {
- return edges.map(new EdgeToTuple3Map<K, EV>());
- }
-
- /**
- * This method allows access to the graph's edge values along with its source and target vertex values.
- *
- * @return a triplet DataSet consisting of (srcVertexId, trgVertexId, srcVertexValue, trgVertexValue, edgeValue)
- */
- public DataSet<Triplet<K, VV, EV>> getTriplets() {
- return this.getVertices().join(this.getEdges()).where(0).equalTo(0)
- .with(new ProjectEdgeWithSrcValue<K, VV, EV>())
- .join(this.getVertices()).where(1).equalTo(0)
- .with(new ProjectEdgeWithVertexValues<K, VV, EV>());
- }
-
- @ForwardedFieldsFirst("f1->f2")
- @ForwardedFieldsSecond("f0; f1; f2->f3")
- private static final class ProjectEdgeWithSrcValue<K, VV, EV> implements
- FlatJoinFunction<Vertex<K, VV>, Edge<K, EV>, Tuple4<K, K, VV, EV>> {
-
- @Override
- public void join(Vertex<K, VV> vertex, Edge<K, EV> edge, Collector<Tuple4<K, K, VV, EV>> collector)
- throws Exception {
-
- collector.collect(new Tuple4<K, K, VV, EV>(edge.getSource(), edge.getTarget(), vertex.getValue(),
- edge.getValue()));
- }
- }
-
- @ForwardedFieldsFirst("f0; f1; f2; f3->f4")
- @ForwardedFieldsSecond("f1->f3")
- private static final class ProjectEdgeWithVertexValues<K, VV, EV> implements
- FlatJoinFunction<Tuple4<K, K, VV, EV>, Vertex<K, VV>, Triplet<K, VV, EV>> {
-
- @Override
- public void join(Tuple4<K, K, VV, EV> tripletWithSrcValSet,
- Vertex<K, VV> vertex, Collector<Triplet<K, VV, EV>> collector) throws Exception {
-
- collector.collect(new Triplet<K, VV, EV>(tripletWithSrcValSet.f0, tripletWithSrcValSet.f1,
- tripletWithSrcValSet.f2, vertex.getValue(), tripletWithSrcValSet.f3));
- }
- }
-
- /**
- * Apply a function to the attribute of each vertex in the graph.
- *
- * @param mapper the map function to apply.
- * @return a new graph
- */
- @SuppressWarnings({ "unchecked", "rawtypes" })
- public <NV> Graph<K, NV, EV> mapVertices(final MapFunction<Vertex<K, VV>, NV> mapper) {
-
- TypeInformation<K> keyType = ((TupleTypeInfo<?>) vertices.getType()).getTypeAt(0);
-
- TypeInformation<NV> valueType = TypeExtractor.createTypeInfo(MapFunction.class, mapper.getClass(), 1, null, null);
-
- TypeInformation<Vertex<K, NV>> returnType = (TypeInformation<Vertex<K, NV>>) new TupleTypeInfo(
- Vertex.class, keyType, valueType);
-
- return mapVertices(mapper, returnType);
- }
-
- /**
- * Apply a function to the attribute of each vertex in the graph.
- *
- * @param mapper the map function to apply.
- * @param returnType the explicit return type.
- * @return a new graph
- */
- public <NV> Graph<K, NV, EV> mapVertices(final MapFunction<Vertex<K, VV>, NV> mapper, TypeInformation<Vertex<K,NV>> returnType) {
- DataSet<Vertex<K, NV>> mappedVertices = vertices.map(
- new MapFunction<Vertex<K, VV>, Vertex<K, NV>>() {
- public Vertex<K, NV> map(Vertex<K, VV> value) throws Exception {
- return new Vertex<K, NV>(value.f0, mapper.map(value));
- }
- })
- .returns(returnType)
- .withForwardedFields("f0");
-
- return new Graph<K, NV, EV>(mappedVertices, this.edges, this.context);
- }
-
- /**
- * Apply a function to the attribute of each edge in the graph.
- *
- * @param mapper the map function to apply.
- * @return a new graph
- */
- @SuppressWarnings({ "unchecked", "rawtypes" })
- public <NV> Graph<K, VV, NV> mapEdges(final MapFunction<Edge<K, EV>, NV> mapper) {
-
- TypeInformation<K> keyType = ((TupleTypeInfo<?>) edges.getType()).getTypeAt(0);
-
- TypeInformation<NV> valueType = TypeExtractor.createTypeInfo(MapFunction.class, mapper.getClass(), 1, null, null);
-
- TypeInformation<Edge<K, NV>> returnType = (TypeInformation<Edge<K, NV>>) new TupleTypeInfo(
- Edge.class, keyType, keyType, valueType);
-
- return mapEdges(mapper, returnType);
- }
-
- /**
- * Apply a function to the attribute of each edge in the graph.
- *
- * @param mapper the map function to apply.
- * @param returnType the explicit return type.
- * @return a new graph
- */
- public <NV> Graph<K, VV, NV> mapEdges(final MapFunction<Edge<K, EV>, NV> mapper, TypeInformation<Edge<K,NV>> returnType) {
- DataSet<Edge<K, NV>> mappedEdges = edges.map(
- new MapFunction<Edge<K, EV>, Edge<K, NV>>() {
- public Edge<K, NV> map(Edge<K, EV> value) throws Exception {
- return new Edge<K, NV>(value.f0, value.f1, mapper
- .map(value));
- }
- })
- .returns(returnType)
- .withForwardedFields("f0; f1");
-
- return new Graph<K, VV, NV>(this.vertices, mappedEdges, this.context);
- }
-
- /**
- * Joins the vertex DataSet of this graph with an input DataSet on field 0 of
- * both and applies a UDF to each (vertex value, input value) pair.
- *
- * @param inputDataSet the DataSet to join with; its field 0 is matched against field 0 of the vertices.
- * @param mapper the UDF map function to apply; it receives a Tuple2 of (vertex value, input value).
- * @return a new graph where the vertex values have been updated.
- */
- public <T> Graph<K, VV, EV> joinWithVertices(DataSet<Tuple2<K, T>> inputDataSet,
- final MapFunction<Tuple2<VV, T>, VV> mapper) {
-
- DataSet<Vertex<K, VV>> resultedVertices = this.getVertices()
- .coGroup(inputDataSet).where(0).equalTo(0)
- .with(new ApplyCoGroupToVertexValues<K, VV, T>(mapper));
- return new Graph<K, VV, EV>(resultedVertices, this.edges, this.context);
- }
-
- private static final class ApplyCoGroupToVertexValues<K, VV, T>
- implements CoGroupFunction<Vertex<K, VV>, Tuple2<K, T>, Vertex<K, VV>> {
-
- private MapFunction<Tuple2<VV, T>, VV> mapper;
-
- public ApplyCoGroupToVertexValues(MapFunction<Tuple2<VV, T>, VV> mapper) {
- this.mapper = mapper;
- }
-
- @Override
- public void coGroup(Iterable<Vertex<K, VV>> vertices,
- Iterable<Tuple2<K, T>> input, Collector<Vertex<K, VV>> collector) throws Exception {
-
- final Iterator<Vertex<K, VV>> vertexIterator = vertices.iterator();
- final Iterator<Tuple2<K, T>> inputIterator = input.iterator();
-
- if (vertexIterator.hasNext()) {
- if (inputIterator.hasNext()) {
- final Tuple2<K, T> inputNext = inputIterator.next();
-
- collector.collect(new Vertex<K, VV>(inputNext.f0, mapper
- .map(new Tuple2<VV, T>(vertexIterator.next().f1,
- inputNext.f1))));
- } else {
- collector.collect(vertexIterator.next());
- }
-
- }
- }
- }
-
- /**
- * Joins the edge DataSet with an input DataSet on a composite key of both
- * source and target and applies a UDF on the resulting values.
- *
- * @param inputDataSet the DataSet to join with.
- * @param mapper the UDF map function to apply.
- * @param <T> the type of the third field of the input DataSet's tuples
- * @return a new graph where the edge values have been updated.
- */
- public <T> Graph<K, VV, EV> joinWithEdges(DataSet<Tuple3<K, K, T>> inputDataSet,
- final MapFunction<Tuple2<EV, T>, EV> mapper) {
-
- DataSet<Edge<K, EV>> resultedEdges = this.getEdges()
- .coGroup(inputDataSet).where(0, 1).equalTo(0, 1)
- .with(new ApplyCoGroupToEdgeValues<K, EV, T>(mapper));
- return new Graph<K, VV, EV>(this.vertices, resultedEdges, this.context);
- }
-
- private static final class ApplyCoGroupToEdgeValues<K, EV, T>
- implements CoGroupFunction<Edge<K, EV>, Tuple3<K, K, T>, Edge<K, EV>> {
-
- private MapFunction<Tuple2<EV, T>, EV> mapper;
-
- public ApplyCoGroupToEdgeValues(MapFunction<Tuple2<EV, T>, EV> mapper) {
- this.mapper = mapper;
- }
-
- @Override
- public void coGroup(Iterable<Edge<K, EV>> edges, Iterable<Tuple3<K, K, T>> input,
- Collector<Edge<K, EV>> collector) throws Exception {
-
- final Iterator<Edge<K, EV>> edgesIterator = edges.iterator();
- final Iterator<Tuple3<K, K, T>> inputIterator = input.iterator();
-
- if (edgesIterator.hasNext()) {
- if (inputIterator.hasNext()) {
- final Tuple3<K, K, T> inputNext = inputIterator.next();
-
- collector.collect(new Edge<K, EV>(inputNext.f0,
- inputNext.f1, mapper.map(new Tuple2<EV, T>(
- edgesIterator.next().f2, inputNext.f2))));
- } else {
- collector.collect(edgesIterator.next());
- }
- }
- }
- }
-
- /**
- * Joins the edge DataSet with an input DataSet on the source key of the
- * edges and the first attribute of the input DataSet and applies a UDF on
- * the resulting values. In case the inputDataSet contains the same key more
- * than once, only the first value will be considered.
- *
- * @param inputDataSet the DataSet to join with.
- * @param mapper the UDF map function to apply.
- * @param <T> the type of the second field of the input DataSet's tuples
- * @return a new graph where the edge values have been updated.
- */
- public <T> Graph<K, VV, EV> joinWithEdgesOnSource(DataSet<Tuple2<K, T>> inputDataSet,
- final MapFunction<Tuple2<EV, T>, EV> mapper) {
-
- DataSet<Edge<K, EV>> resultedEdges = this.getEdges()
- .coGroup(inputDataSet).where(0).equalTo(0)
- .with(new ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget<K, EV, T>(mapper));
-
- return new Graph<K, VV, EV>(this.vertices, resultedEdges, this.context);
- }
-
- private static final class ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget<K, EV, T>
- implements CoGroupFunction<Edge<K, EV>, Tuple2<K, T>, Edge<K, EV>> {
-
- private MapFunction<Tuple2<EV, T>, EV> mapper;
-
- public ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget(
- MapFunction<Tuple2<EV, T>, EV> mapper) {
- this.mapper = mapper;
- }
-
- @Override
- public void coGroup(Iterable<Edge<K, EV>> edges,
- Iterable<Tuple2<K, T>> input, Collector<Edge<K, EV>> collector) throws Exception {
-
- final Iterator<Edge<K, EV>> edgesIterator = edges.iterator();
- final Iterator<Tuple2<K, T>> inputIterator = input.iterator();
-
- if (inputIterator.hasNext()) {
- final Tuple2<K, T> inputNext = inputIterator.next();
-
- while (edgesIterator.hasNext()) {
- Edge<K, EV> edgesNext = edgesIterator.next();
-
- collector.collect(new Edge<K, EV>(edgesNext.f0,
- edgesNext.f1, mapper.map(new Tuple2<EV, T>(
- edgesNext.f2, inputNext.f1))));
- }
-
- } else {
- while (edgesIterator.hasNext()) {
- collector.collect(edgesIterator.next());
- }
- }
- }
- }
-
- /**
- * Joins the edge DataSet with an input DataSet on the target key of the
- * edges and the first attribute of the input DataSet and applies a UDF on
- * the resulting values. Should the inputDataSet contain the same key more
- * than once, only the first value will be considered.
- *
- * @param inputDataSet the DataSet to join with.
- * @param mapper the UDF map function to apply.
- * @param <T> the type of the second field of the input DataSet's tuples
- * @return a new graph where the edge values have been updated.
- */
- public <T> Graph<K, VV, EV> joinWithEdgesOnTarget(DataSet<Tuple2<K, T>> inputDataSet,
- final MapFunction<Tuple2<EV, T>, EV> mapper) {
-
- DataSet<Edge<K, EV>> resultedEdges = this.getEdges()
- .coGroup(inputDataSet).where(1).equalTo(0)
- .with(new ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget<K, EV, T>(mapper));
-
- return new Graph<K, VV, EV>(this.vertices, resultedEdges, this.context);
- }
-
- /**
- * Apply filtering functions to the graph and return a sub-graph that
- * satisfies the predicates for both vertices and edges.
- *
- * @param vertexFilter the filter function for vertices.
- * @param edgeFilter the filter function for edges.
- * @return the resulting sub-graph.
- */
- public Graph<K, VV, EV> subgraph(FilterFunction<Vertex<K, VV>> vertexFilter, FilterFunction<Edge<K, EV>> edgeFilter) {
-
- DataSet<Vertex<K, VV>> filteredVertices = this.vertices.filter(vertexFilter);
-
- DataSet<Edge<K, EV>> remainingEdges = this.edges.join(filteredVertices)
- .where(0).equalTo(0).with(new ProjectEdge<K, VV, EV>())
- .join(filteredVertices).where(1).equalTo(0)
- .with(new ProjectEdge<K, VV, EV>());
-
- DataSet<Edge<K, EV>> filteredEdges = remainingEdges.filter(edgeFilter);
-
- return new Graph<K, VV, EV>(filteredVertices, filteredEdges,
- this.context);
- }
-
- /**
- * Apply a filtering function to the graph and return a sub-graph that
- * satisfies the predicates only for the vertices.
- *
- * @param vertexFilter the filter function for vertices.
- * @return the resulting sub-graph.
- */
- public Graph<K, VV, EV> filterOnVertices(FilterFunction<Vertex<K, VV>> vertexFilter) {
-
- DataSet<Vertex<K, VV>> filteredVertices = this.vertices.filter(vertexFilter);
-
- DataSet<Edge<K, EV>> remainingEdges = this.edges.join(filteredVertices)
- .where(0).equalTo(0).with(new ProjectEdge<K, VV, EV>())
- .join(filteredVertices).where(1).equalTo(0)
- .with(new ProjectEdge<K, VV, EV>());
-
- return new Graph<K, VV, EV>(filteredVertices, remainingEdges, this.context);
- }
-
- /**
- * Apply a filtering function to the graph and return a sub-graph that
- * satisfies the predicates only for the edges.
- *
- * @param edgeFilter the filter function for edges.
- * @return the resulting sub-graph.
- */
- public Graph<K, VV, EV> filterOnEdges(FilterFunction<Edge<K, EV>> edgeFilter) {
- DataSet<Edge<K, EV>> filteredEdges = this.edges.filter(edgeFilter);
-
- return new Graph<K, VV, EV>(this.vertices, filteredEdges, this.context);
- }
-
- @ForwardedFieldsFirst("f0; f1; f2")
- private static final class ProjectEdge<K, VV, EV> implements FlatJoinFunction<
- Edge<K, EV>, Vertex<K, VV>, Edge<K, EV>> {
- public void join(Edge<K, EV> first, Vertex<K, VV> second, Collector<Edge<K, EV>> out) {
- out.collect(first);
- }
- }
-
- /**
- * Returns the out-degree of all vertices in the graph.
- *
- * @return A DataSet of {@code Tuple2<vertexId, outDegree>}
- */
- public DataSet<Tuple2<K, Long>> outDegrees() {
-
- return vertices.coGroup(edges).where(0).equalTo(0).with(new CountNeighborsCoGroup<K, VV, EV>());
- }
-
- private static final class CountNeighborsCoGroup<K, VV, EV>
- implements CoGroupFunction<Vertex<K, VV>, Edge<K, EV>, Tuple2<K, Long>> {
- @SuppressWarnings("unused")
- public void coGroup(Iterable<Vertex<K, VV>> vertex, Iterable<Edge<K, EV>> outEdges,
- Collector<Tuple2<K, Long>> out) {
- long count = 0;
- for (Edge<K, EV> edge : outEdges) {
- count++;
- }
-
- Iterator<Vertex<K, VV>> vertexIterator = vertex.iterator();
-
- if(vertexIterator.hasNext()) {
- out.collect(new Tuple2<K, Long>(vertexIterator.next().f0, count));
- } else {
- throw new NoSuchElementException("The edge src/trg id could not be found within the vertexIds");
- }
- }
- }
-
- /**
- * Returns the in-degree of all vertices in the graph.
- *
- * @return A DataSet of {@code Tuple2<vertexId, inDegree>}
- */
- public DataSet<Tuple2<K, Long>> inDegrees() {
-
- return vertices.coGroup(edges).where(0).equalTo(1).with(new CountNeighborsCoGroup<K, VV, EV>());
- }
-
- /**
- * Returns the degree (out-degree plus in-degree) of all vertices in the graph.
- *
- * @return A DataSet of {@code Tuple2<vertexId, degree>}
- */
- public DataSet<Tuple2<K, Long>> getDegrees() {
- return outDegrees().union(inDegrees()).groupBy(0).sum(1);
- }
-
- /**
- * This operation adds all inverse-direction edges to the graph.
- *
- * @return the undirected graph.
- */
- public Graph<K, VV, EV> getUndirected() {
-
- DataSet<Edge<K, EV>> undirectedEdges = edges.flatMap(new RegularAndReversedEdgesMap<K, EV>());
- return new Graph<K, VV, EV>(vertices, undirectedEdges, this.context);
- }
-
- /**
- * Compute an aggregate over the edges of each vertex. The function applied
- * on the edges has access to the vertex value.
- *
- * @param edgesFunction
- * the function to apply to the neighborhood
- * @param direction
- * the edge direction (in-, out-, all-)
- * @param <T>
- * the output type
- * @return a dataset of T
- * @throws IllegalArgumentException if the direction is not IN, OUT or ALL
- */
- public <T> DataSet<T> groupReduceOnEdges(EdgesFunctionWithVertexValue<K, VV, EV, T> edgesFunction,
- EdgeDirection direction) throws IllegalArgumentException {
-
- switch (direction) {
- case IN:
- return vertices.coGroup(edges).where(0).equalTo(1)
- .with(new ApplyCoGroupFunction<K, VV, EV, T>(edgesFunction));
- case OUT:
- return vertices.coGroup(edges).where(0).equalTo(0)
- .with(new ApplyCoGroupFunction<K, VV, EV, T>(edgesFunction));
- case ALL:
- return vertices.coGroup(edges.flatMap(new EmitOneEdgePerNode<K, VV, EV>()))
- .where(0).equalTo(0).with(new ApplyCoGroupFunctionOnAllEdges<K, VV, EV, T>(edgesFunction));
- default:
- throw new IllegalArgumentException("Illegal edge direction");
- }
- }
-
- /**
- * Compute an aggregate over the edges of each vertex. The function applied
- * on the edges has access to the vertex value.
- *
- * @param edgesFunction
- * the function to apply to the neighborhood
- * @param direction
- * the edge direction (in-, out-, all-)
- * @param <T>
- * the output type
- * @param typeInfo the explicit return type.
- * @return a dataset of T
- * @throws IllegalArgumentException if the direction is not IN, OUT or ALL
- */
- public <T> DataSet<T> groupReduceOnEdges(EdgesFunctionWithVertexValue<K, VV, EV, T> edgesFunction,
- EdgeDirection direction, TypeInformation<T> typeInfo) throws IllegalArgumentException {
-
- switch (direction) {
- case IN:
- return vertices.coGroup(edges).where(0).equalTo(1)
- .with(new ApplyCoGroupFunction<K, VV, EV, T>(edgesFunction)).returns(typeInfo);
- case OUT:
- return vertices.coGroup(edges).where(0).equalTo(0)
- .with(new ApplyCoGroupFunction<K, VV, EV, T>(edgesFunction)).returns(typeInfo);
- case ALL:
- return vertices.coGroup(edges.flatMap(new EmitOneEdgePerNode<K, VV, EV>()))
- .where(0).equalTo(0).with(new ApplyCoGroupFunctionOnAllEdges<K, VV, EV, T>(edgesFunction)).returns(typeInfo);
- default:
- throw new IllegalArgumentException("Illegal edge direction");
- }
- }
-
- /**
- * Compute an aggregate over the edges of each vertex. The function applied
- * on the edges only has access to the vertex id (not the vertex value).
- *
- * @param edgesFunction
- * the function to apply to the neighborhood
- * @param direction
- * the edge direction (in-, out-, all-)
- * @param <T>
- * the output type
- * @return a dataset of T
- * @throws IllegalArgumentException
- */
- public <T> DataSet<T> groupReduceOnEdges(EdgesFunction<K, EV, T> edgesFunction,
- EdgeDirection direction) throws IllegalArgumentException {
-
- switch (direction) {
- case IN:
- return edges.map(new ProjectVertexIdMap<K, EV>(1))
- .withForwardedFields("f1->f0")
- .groupBy(0).reduceGroup(new ApplyGroupReduceFunction<K, EV, T>(edgesFunction));
- case OUT:
- return edges.map(new ProjectVertexIdMap<K, EV>(0))
- .withForwardedFields("f0")
- .groupBy(0).reduceGroup(new ApplyGroupReduceFunction<K, EV, T>(edgesFunction));
- case ALL:
- return edges.flatMap(new EmitOneEdgePerNode<K, VV, EV>())
- .groupBy(0).reduceGroup(new ApplyGroupReduceFunction<K, EV, T>(edgesFunction));
- default:
- throw new IllegalArgumentException("Illegal edge direction");
- }
- }
-
- /**
-  * Groups the edges by vertex id according to the given direction, applies
-  * a user function to each group and declares the result type explicitly.
-  * The function sees (vertex id, edge) pairs only; vertex values are not
-  * available to it.
-  *
-  * @param edgesFunction
-  *            the function invoked once per group of (vertex id, edge) pairs
-  * @param direction
-  *            which endpoint(s) an edge is grouped under: target (IN),
-  *            source (OUT) or both (ALL)
-  * @param <T>
-  *            the type of the records produced by the function
-  * @param typeInfo
-  *            the type information to attach to the result
-  * @return the records emitted by the function, as a DataSet
-  * @throws IllegalArgumentException
-  *             if the direction is none of IN, OUT or ALL
-  */
- public <T> DataSet<T> groupReduceOnEdges(EdgesFunction<K, EV, T> edgesFunction,
-         EdgeDirection direction, TypeInformation<T> typeInfo) throws IllegalArgumentException {
-
-     // the same reducer wrapper serves all three directions
-     ApplyGroupReduceFunction<K, EV, T> reducer = new ApplyGroupReduceFunction<K, EV, T>(edgesFunction);
-
-     switch (direction) {
-     case IN:
-         // key every edge by its target id
-         return edges.map(new ProjectVertexIdMap<K, EV>(1))
-                 .withForwardedFields("f1->f0")
-                 .groupBy(0)
-                 .reduceGroup(reducer)
-                 .returns(typeInfo);
-     case OUT:
-         // key every edge by its source id
-         return edges.map(new ProjectVertexIdMap<K, EV>(0))
-                 .withForwardedFields("f0")
-                 .groupBy(0)
-                 .reduceGroup(reducer)
-                 .returns(typeInfo);
-     case ALL:
-         // key every edge by both of its endpoints
-         return edges.flatMap(new EmitOneEdgePerNode<K, VV, EV>())
-                 .groupBy(0)
-                 .reduceGroup(reducer)
-                 .returns(typeInfo);
-     default:
-         throw new IllegalArgumentException("Illegal edge direction");
-     }
- }
-
- private static final class ProjectVertexIdMap<K, EV> implements MapFunction<
- Edge<K, EV>, Tuple2<K, Edge<K, EV>>> {
-
- private int fieldPosition;
-
- public ProjectVertexIdMap(int position) {
- this.fieldPosition = position;
- }
-
- @SuppressWarnings("unchecked")
- public Tuple2<K, Edge<K, EV>> map(Edge<K, EV> edge) {
- return new Tuple2<K, Edge<K, EV>>((K) edge.getField(fieldPosition), edge);
- }
- }
-
- private static final class ProjectVertexWithEdgeValueMap<K, EV> implements MapFunction<
- Edge<K, EV>, Tuple2<K, EV>> {
-
- private int fieldPosition;
-
- public ProjectVertexWithEdgeValueMap(int position) {
- this.fieldPosition = position;
- }
-
- @SuppressWarnings("unchecked")
- public Tuple2<K, EV> map(Edge<K, EV> edge) {
- return new Tuple2<K, EV>((K) edge.getField(fieldPosition), edge.getValue());
- }
- }
-
- private static final class ApplyGroupReduceFunction<K, EV, T> implements GroupReduceFunction<
- Tuple2<K, Edge<K, EV>>, T>, ResultTypeQueryable<T> {
-
- private EdgesFunction<K, EV, T> function;
-
- public ApplyGroupReduceFunction(EdgesFunction<K, EV, T> fun) {
- this.function = fun;
- }
-
- public void reduce(Iterable<Tuple2<K, Edge<K, EV>>> edges, Collector<T> out) throws Exception {
- function.iterateEdges(edges, out);
- }
-
- @Override
- public TypeInformation<T> getProducedType() {
- return TypeExtractor.createTypeInfo(EdgesFunction.class, function.getClass(), 2, null, null);
- }
- }
-
- private static final class EmitOneEdgePerNode<K, VV, EV> implements FlatMapFunction<
- Edge<K, EV>, Tuple2<K, Edge<K, EV>>> {
-
- public void flatMap(Edge<K, EV> edge, Collector<Tuple2<K, Edge<K, EV>>> out) {
- out.collect(new Tuple2<K, Edge<K, EV>>(edge.getSource(), edge));
- out.collect(new Tuple2<K, Edge<K, EV>>(edge.getTarget(), edge));
- }
- }
-
- private static final class EmitOneVertexWithEdgeValuePerNode<K, EV> implements FlatMapFunction<
- Edge<K, EV>, Tuple2<K, EV>> {
-
- public void flatMap(Edge<K, EV> edge, Collector<Tuple2<K, EV>> out) {
- out.collect(new Tuple2<K, EV>(edge.getSource(), edge.getValue()));
- out.collect(new Tuple2<K, EV>(edge.getTarget(), edge.getValue()));
- }
- }
-
- private static final class EmitOneEdgeWithNeighborPerNode<K, EV> implements FlatMapFunction<
- Edge<K, EV>, Tuple3<K, K, Edge<K, EV>>> {
-
- public void flatMap(Edge<K, EV> edge, Collector<Tuple3<K, K, Edge<K, EV>>> out) {
- out.collect(new Tuple3<K, K, Edge<K, EV>>(edge.getSource(), edge.getTarget(), edge));
- out.collect(new Tuple3<K, K, Edge<K, EV>>(edge.getTarget(), edge.getSource(), edge));
- }
- }
-
- private static final class ApplyCoGroupFunction<K, VV, EV, T> implements CoGroupFunction<
- Vertex<K, VV>, Edge<K, EV>, T>, ResultTypeQueryable<T> {
-
- private EdgesFunctionWithVertexValue<K, VV, EV, T> function;
-
- public ApplyCoGroupFunction(EdgesFunctionWithVertexValue<K, VV, EV, T> fun) {
- this.function = fun;
- }
-
- public void coGroup(Iterable<Vertex<K, VV>> vertex,
- Iterable<Edge<K, EV>> edges, Collector<T> out) throws Exception {
-
- Iterator<Vertex<K, VV>> vertexIterator = vertex.iterator();
-
- if(vertexIterator.hasNext()) {
- function.iterateEdges(vertexIterator.next(), edges, out);
- } else {
- throw new NoSuchElementException("The edge src/trg id could not be found within the vertexIds");
- }
- }
-
- @Override
- public TypeInformation<T> getProducedType() {
- return TypeExtractor.createTypeInfo(EdgesFunctionWithVertexValue.class, function.getClass(), 3,
- null, null);
- }
- }
-
- private static final class ApplyCoGroupFunctionOnAllEdges<K, VV, EV, T>
- implements CoGroupFunction<Vertex<K, VV>, Tuple2<K, Edge<K, EV>>, T>, ResultTypeQueryable<T> {
-
- private EdgesFunctionWithVertexValue<K, VV, EV, T> function;
-
- public ApplyCoGroupFunctionOnAllEdges(EdgesFunctionWithVertexValue<K, VV, EV, T> fun) {
- this.function = fun;
- }
-
- public void coGroup(Iterable<Vertex<K, VV>> vertex, final Iterable<Tuple2<K, Edge<K, EV>>> keysWithEdges,
- Collector<T> out) throws Exception {
-
- final Iterator<Edge<K, EV>> edgesIterator = new Iterator<Edge<K, EV>>() {
-
- final Iterator<Tuple2<K, Edge<K, EV>>> keysWithEdgesIterator = keysWithEdges.iterator();
-
- @Override
- public boolean hasNext() {
- return keysWithEdgesIterator.hasNext();
- }
-
- @Override
- public Edge<K, EV> next() {
- return keysWithEdgesIterator.next().f1;
- }
-
- @Override
- public void remove() {
- keysWithEdgesIterator.remove();
- }
- };
-
- Iterable<Edge<K, EV>> edgesIterable = new Iterable<Edge<K, EV>>() {
- public Iterator<Edge<K, EV>> iterator() {
- return edgesIterator;
- }
- };
-
- Iterator<Vertex<K, VV>> vertexIterator = vertex.iterator();
-
- if(vertexIterator.hasNext()) {
- function.iterateEdges(vertexIterator.next(), edgesIterable, out);
- } else {
- throw new NoSuchElementException("The edge src/trg id could not be found within the vertexIds");
- }
- }
-
- @Override
- public TypeInformation<T> getProducedType() {
- return TypeExtractor.createTypeInfo(EdgesFunctionWithVertexValue.class, function.getClass(), 3,
- null, null);
- }
- }
-
- @ForwardedFields("f0->f1; f1->f0; f2")
- private static final class ReverseEdgesMap<K, EV>
- implements MapFunction<Edge<K, EV>, Edge<K, EV>> {
-
- public Edge<K, EV> map(Edge<K, EV> value) {
- return new Edge<K, EV>(value.f1, value.f0, value.f2);
- }
- }
-
- private static final class RegularAndReversedEdgesMap<K, EV>
- implements FlatMapFunction<Edge<K, EV>, Edge<K, EV>> {
-
- @Override
- public void flatMap(Edge<K, EV> edge, Collector<Edge<K, EV>> out) throws Exception {
- out.collect(new Edge<K, EV>(edge.f0, edge.f1, edge.f2));
- out.collect(new Edge<K, EV>(edge.f1, edge.f0, edge.f2));
- }
- }
-
- /**
-  * Builds a graph with the same vertices in which every edge points the
-  * other way round.
-  *
-  * @return a new graph with all edges reversed
-  * @throws UnsupportedOperationException
-  *             declared in the signature; nothing in this method's own
-  *             body throws it
-  */
- public Graph<K, VV, EV> reverse() throws UnsupportedOperationException {
-     ReverseEdgesMap<K, EV> swapEndpoints = new ReverseEdgesMap<K, EV>();
-     DataSet<Edge<K, EV>> flippedEdges = this.edges.map(swapEndpoints);
-
-     return new Graph<K, VV, EV>(this.vertices, flippedEdges, this.context);
- }
-
- /**
-  * Counts the vertices of the graph.
-  *
-  * @return the number of records in the vertex DataSet
-  * @throws Exception if counting the vertex DataSet fails
-  */
- public long numberOfVertices() throws Exception {
-     final long vertexCount = this.vertices.count();
-     return vertexCount;
- }
-
- /**
-  * Counts the edges of the graph.
-  *
-  * @return the number of records in the edge DataSet
-  * @throws Exception if counting the edge DataSet fails
-  */
- public long numberOfEdges() throws Exception {
-     final long edgeCount = this.edges.count();
-     return edgeCount;
- }
-
- /**
-  * Projects every vertex onto its id.
-  *
-  * @return a DataSet holding the id of each vertex
-  */
- public DataSet<K> getVertexIds() {
-     DataSet<K> ids = this.vertices.map(new ExtractVertexIDMapper<K, VV>());
-     return ids;
- }
-
- /**
-  * Maps a vertex to its first field, the vertex id.
-  */
- private static final class ExtractVertexIDMapper<K, VV>
-         implements MapFunction<Vertex<K, VV>, K> {
-
-     @Override
-     public K map(Vertex<K, VV> vertex) {
-         final K id = vertex.f0;
-         return id;
-     }
- }
-
- /**
- * @return The IDs of the edges as DataSet
- */
- public DataSet<Tuple2<K, K>> getEdgeIds() {
- return edges.map(new ExtractEdgeIDsMapper<K, EV>());
- }
-
- /**
-  * Maps an edge to a pair made of its first two fields; the edge value is dropped.
-  */
- @ForwardedFields("f0; f1")
- private static final class ExtractEdgeIDsMapper<K, EV>
-         implements MapFunction<Edge<K, EV>, Tuple2<K, K>> {
-
-     @Override
-     public Tuple2<K, K> map(Edge<K, EV> edge) throws Exception {
-         final K sourceId = edge.f0;
-         final K targetId = edge.f1;
-         return new Tuple2<K, K>(sourceId, targetId);
-     }
- }
-
- /**
-  * Adds a single vertex to the graph by delegating to
-  * {@link #addVertices(List)} with a one-element list.
-  *
-  * @param vertex the vertex to be added
-  * @return the new graph containing the existing vertices as well as the one just added
-  */
- public Graph<K, VV, EV> addVertex(final Vertex<K, VV> vertex) {
-     // wrap the single vertex so that the list-based variant can be reused
-     final List<Vertex<K, VV>> singleVertex = new ArrayList<Vertex<K, VV>>(1);
-     singleVertex.add(vertex);
-
-     return this.addVertices(singleVertex);
- }
-
- /**
- * Adds the list of vertices, passed as input, to the graph.
- * If the vertices already exist in the graph, they will not be added once more.
- *
- * @param verticesToAdd the list of vertices to add
- * @return the new graph containing the existing and newly added vertices
- */
- public Graph<K, VV, EV> addVertices(List<Vertex<K, VV>> verticesToAdd) {
- // Add the vertices
- DataSet<Vertex<K, VV>> newVertices = this.vertices.union(this.context.fromCollection(verticesToAdd)).distinct();
-
- return new Graph<K, VV, EV>(newVertices, this.edges, this.context);
- }
-
- /**
-  * Adds one edge, together with its two endpoint vertices, to the graph.
-  * A small graph consisting of the two vertices and the new edge is built
-  * and then merged into this graph with {@link #union(Graph)}.
-  *
-  * @param source the source vertex of the edge
-  * @param target the target vertex of the edge
-  * @param edgeValue the edge value
-  * @return the new graph containing the existing vertices and edges plus the
-  *         newly added edge
-  */
- public Graph<K, VV, EV> addEdge(Vertex<K, VV> source, Vertex<K, VV> target, EV edgeValue) {
-     List<Vertex<K, VV>> endpoints = Arrays.asList(source, target);
-     List<Edge<K, EV>> singleEdge = Arrays.asList(new Edge<K, EV>(source.f0, target.f0, edgeValue));
-
-     Graph<K, VV, EV> edgeOnlyGraph = fromCollection(endpoints, singleEdge, this.context);
-
-     return this.union(edgeOnlyGraph);
- }
-
- /**
-  * Adds the given list of edges to the graph.
-  *
-  * An edge is only added if both its source and its target match a vertex
-  * of this graph; all other edges are considered invalid and ignored.
-  *
-  * @param newEdges the list of edges to be added
-  * @return a new graph containing the existing edges plus the newly added edges.
-  */
- public Graph<K, VV, EV> addEdges(List<Edge<K, EV>> newEdges) {
-
-     DataSet<Edge<K, EV>> candidates = this.context.fromCollection(newEdges);
-
-     // first join: keep the candidates whose source id matches a vertex
-     DataSet<Edge<K, EV>> withKnownSource = this.getVertices().join(candidates)
-             .where(0).equalTo(0)
-             .with(new JoinVerticesWithEdgesOnSrc<K, VV, EV>());
-
-     // second join: of those, keep the ones whose target id matches a vertex
-     DataSet<Edge<K, EV>> validNewEdges = withKnownSource.join(this.getVertices())
-             .where(1).equalTo(0)
-             .with(new JoinWithVerticesOnTrg<K, VV, EV>());
-
-     DataSet<Edge<K, EV>> allEdges = this.edges.union(validNewEdges);
-
-     return Graph.fromDataSet(this.vertices, allEdges, this.context);
- }
-
- /**
-  * Join function for (vertex, edge) matches that passes the edge through
-  * unchanged and discards the vertex.
-  */
- @ForwardedFieldsSecond("f0; f1; f2")
- private static final class JoinVerticesWithEdgesOnSrc<K, VV, EV> implements
-         JoinFunction<Vertex<K, VV>, Edge<K, EV>, Edge<K, EV>> {
-
-     @Override
-     public Edge<K, EV> join(Vertex<K, VV> vertex, Edge<K, EV> edge) throws Exception {
-         // the vertex is only needed to establish the match
-         final Edge<K, EV> matched = edge;
-         return matched;
-     }
- }
-
- /**
-  * Join function for (edge, vertex) matches that passes the edge through
-  * unchanged and discards the vertex.
-  */
- @ForwardedFieldsFirst("f0; f1; f2")
- private static final class JoinWithVerticesOnTrg<K, VV, EV> implements
-         JoinFunction<Edge<K, EV>, Vertex<K, VV>, Edge<K, EV>> {
-
-     @Override
-     public Edge<K, EV> join(Edge<K, EV> edge, Vertex<K, VV> vertex) throws Exception {
-         // the vertex is only needed to establish the match
-         final Edge<K, EV> matched = edge;
-         return matched;
-     }
- }
-
- /**
-  * Removes a single vertex and its edges from the graph by delegating to
-  * {@link #removeVertices(List)} with a one-element list.
-  *
-  * @param vertex the vertex to remove
-  * @return the new graph containing the existing vertices and edges without
-  *         the removed vertex and its edges
-  */
- public Graph<K, VV, EV> removeVertex(Vertex<K, VV> vertex) {
-     // wrap the single vertex so that the list-based variant can be reused
-     final List<Vertex<K, VV>> singleVertex = new ArrayList<Vertex<K, VV>>(1);
-     singleVertex.add(vertex);
-
-     return this.removeVertices(singleVertex);
- }
- /**
-  * Removes the given list of vertices and their edges from the graph.
-  * The list is turned into a DataSet and handed to the DataSet-based variant.
-  *
-  * @param verticesToBeRemoved the list of vertices to be removed
-  * @return the resulting graph containing the initial vertices and edges minus the vertices
-  *         and edges removed.
-  */
- public Graph<K, VV, EV> removeVertices(List<Vertex<K, VV>> verticesToBeRemoved) {
-     DataSet<Vertex<K, VV>> removalSet = this.context.fromCollection(verticesToBeRemoved);
-     return removeVertices(removalSet);
- }
-
- /**
-  * Removes the given vertices and their edges from the graph.
-  * Vertices are matched on their id; an edge is kept only if both of its
-  * endpoints are still present afterwards.
-  *
-  * @param verticesToBeRemoved the DataSet of vertices to be removed
-  * @return the resulting graph containing the initial vertices and edges minus the vertices
-  *         and edges removed.
-  */
- private Graph<K, VV, EV> removeVertices(DataSet<Vertex<K, VV>> verticesToBeRemoved) {
-
-     // vertices whose id has no counterpart in the removal set
-     DataSet<Vertex<K, VV>> newVertices = getVertices().coGroup(verticesToBeRemoved)
-             .where(0).equalTo(0)
-             .with(new VerticesRemovalCoGroup<K, VV>());
-
-     // if the edge source was removed, the edge will also be removed
-     DataSet<Edge<K, EV>> edgesWithLiveSource = newVertices.join(getEdges())
-             .where(0).equalTo(0)
-             .with(new ProjectEdgeToBeRemoved<K, VV, EV>());
-
-     // if the edge target was removed, the edge will also be removed
-     DataSet<Edge<K, EV>> newEdges = edgesWithLiveSource.join(newVertices)
-             .where(1).equalTo(0)
-             .with(new ProjectEdge<K, VV, EV>());
-
-     return new Graph<K, VV, EV>(newVertices, newEdges, context);
- }
-
- private static final class VerticesRemovalCoGroup<K, VV> implements CoGroupFunction<Vertex<K, VV>, Vertex<K, VV>, Vertex<K, VV>> {
-
- @Override
- public void coGroup(Iterable<Vertex<K, VV>> vertex, Iterable<Vertex<K, VV>> vertexToBeRemoved,
- Collector<Vertex<K, VV>> out) throws Exception {
-
- final Iterator<Vertex<K, VV>> vertexIterator = vertex.iterator();
- final Iterator<Vertex<K, VV>> vertexToBeRemovedIterator = vertexToBeRemoved.iterator();
- Vertex<K, VV> next;
-
- if (vertexIterator.hasNext()) {
- if (!vertexToBeRemovedIterator.hasNext()) {
- next = vertexIterator.next();
- out.collect(next);
- }
- }
- }
- }
-
-
-
- @ForwardedFieldsSecond("f0; f1; f2")
- private static final class ProjectEdgeToBeRemoved<K,VV,EV> implements JoinFunction<Vertex<K, VV>, Edge<K, EV>, Edge<K, EV>> {
- @Override
- public Edge<K, EV> join(Vertex<K, VV> vertex, Edge<K, EV> edge) throws Exception {
- return edge;
- }
- }
-
- /**
-  * Removes every edge that has the same source and target as the given
-  * edge. The vertices are left untouched.
-  *
-  * @param edge the edge to remove
-  * @return the new graph containing the existing vertices and edges without
-  *         the removed edges
-  */
- public Graph<K, VV, EV> removeEdge(Edge<K, EV> edge) {
-     EdgeRemovalEdgeFilter<K, EV> keepOthers = new EdgeRemovalEdgeFilter<K, EV>(edge);
-     DataSet<Edge<K, EV>> remainingEdges = getEdges().filter(keepOthers);
-
-     return new Graph<K, VV, EV>(this.vertices, remainingEdges, this.context);
- }
-
- /**
-  * Filter that rejects every edge whose first two fields both equal those
-  * of a reference edge; the edge value is not compared.
-  */
- private static final class EdgeRemovalEdgeFilter<K, EV>
-         implements FilterFunction<Edge<K, EV>> {
-
-     /** The reference edge whose endpoints identify the edges to drop. */
-     private final Edge<K, EV> edgeToRemove;
-
-     public EdgeRemovalEdgeFilter(Edge<K, EV> edge) {
-         this.edgeToRemove = edge;
-     }
-
-     @Override
-     public boolean filter(Edge<K, EV> edge) {
-         // keep the edge as soon as one of the two endpoints differs
-         return !edge.f0.equals(edgeToRemove.f0)
-                 || !edge.f1.equals(edgeToRemove.f1);
-     }
- }
-
- /**
-  * Removes from the graph all edges that match an edge of the given list.
-  * Edges are matched on their first two fields (source and target).
-  *
-  * @param edgesToBeRemoved the list of edges to be removed
-  * @return a new graph where the edges have been removed and in which the vertices remained intact
-  */
- public Graph<K, VV, EV> removeEdges(List<Edge<K, EV>> edgesToBeRemoved) {
-
-     DataSet<Edge<K, EV>> removalSet = this.context.fromCollection(edgesToBeRemoved);
-
-     DataSet<Edge<K, EV>> remainingEdges = getEdges().coGroup(removalSet)
-             .where(0, 1).equalTo(0, 1)
-             .with(new EdgeRemovalCoGroup<K, EV>());
-
-     return new Graph<K, VV, EV>(this.vertices, remainingEdges, context);
- }
-
- private static final class EdgeRemovalCoGroup<K,EV> implements CoGroupFunction<Edge<K, EV>, Edge<K, EV>, Edge<K, EV>> {
-
- @Override
- public void coGroup(Iterable<Edge<K, EV>> edge, Iterable<Edge<K, EV>> edgeToBeRemoved,
- Collector<Edge<K, EV>> out) throws Exception {
-
- final Iterator<Edge<K, EV>> edgeIterator = edge.iterator();
- final Iterator<Edge<K, EV>> edgeToBeRemovedIterator = edgeToBeRemoved.iterator();
- Edge<K, EV> next;
-
- if (edgeIterator.hasNext()) {
- if (!edgeToBeRemovedIterator.hasNext()) {
- next = edgeIterator.next();
- out.collect(next);
- }
- }
- }
- }
-
- /**
-  * Merges this graph with another one. The vertex sets are united and
-  * de-duplicated with {@code distinct()}; the edge sets are united without
-  * de-duplication, so duplicate edges are retained.
-  *
-  * @param graph the graph to perform union with
-  * @return a new graph
-  */
- public Graph<K, VV, EV> union(Graph<K, VV, EV> graph) {
-
-     DataSet<Vertex<K, VV>> mergedVertices = graph.getVertices()
-             .union(this.getVertices())
-             .distinct();
-
-     DataSet<Edge<K, EV>> mergedEdges = graph.getEdges().union(this.getEdges());
-
-     return new Graph<K, VV, EV>(mergedVertices, mergedEdges, this.context);
- }
-
- /**
-  * Computes the difference between this graph and the given graph by
-  * removing the given graph's vertices from this graph. The removal also
-  * drops every edge whose source or target vertex is removed.
-  *
-  * @param graph the graph to perform difference with
-  * @return a new graph where the common vertices and edges have been removed
-  */
- public Graph<K, VV, EV> difference(Graph<K, VV, EV> graph) {
-     // the other graph's vertices are exactly the ones to take out of this graph
-     return this.removeVertices(graph.getVertices());
- }
-
- /**
-  * Runs a Vertex-Centric iteration on the graph without any configuration.
-  * Delegates to the configurable variant, passing {@code null} as the
-  * configuration.
-  *
-  * @param vertexUpdateFunction the vertex update function
-  * @param messagingFunction the messaging function
-  * @param maximumNumberOfIterations maximum number of iterations to perform
-  *
-  * @return the updated Graph after the vertex-centric iteration has converged or
-  *         after maximumNumberOfIterations.
-  */
- public <M> Graph<K, VV, EV> runVertexCentricIteration(
-         VertexUpdateFunction<K, VV, M> vertexUpdateFunction,
-         MessagingFunction<K, VV, M, EV> messagingFunction,
-         int maximumNumberOfIterations) {
-
-     // null stands for "no configuration parameters"
-     final VertexCentricConfiguration noParameters = null;
-
-     return this.runVertexCentricIteration(vertexUpdateFunction, messagingFunction,
-             maximumNumberOfIterations, noParameters);
- }
-
- /**
-  * Runs a Vertex-Centric iteration on the graph with configuration options.
-  * The iteration is built over this graph's edges, configured, and then
-  * run on this graph's vertices; the edges of the result are unchanged.
-  *
-  * @param vertexUpdateFunction the vertex update function
-  * @param messagingFunction the messaging function
-  * @param maximumNumberOfIterations maximum number of iterations to perform
-  * @param parameters the iteration configuration parameters
-  *
-  * @return the updated Graph after the vertex-centric iteration has converged or
-  *         after maximumNumberOfIterations.
-  */
- public <M> Graph<K, VV, EV> runVertexCentricIteration(
-         VertexUpdateFunction<K, VV, M> vertexUpdateFunction,
-         MessagingFunction<K, VV, M, EV> messagingFunction,
-         int maximumNumberOfIterations, VertexCentricConfiguration parameters) {
-
-     VertexCentricIteration<K, VV, M, EV> vertexCentric = VertexCentricIteration.withEdges(
-             edges, vertexUpdateFunction, messagingFunction, maximumNumberOfIterations);
-
-     // the configuration has to be applied before the operation is run
-     vertexCentric.configure(parameters);
-
-     DataSet<Vertex<K, VV>> updatedVertices = this.getVertices().runOperation(vertexCentric);
-
-     return new Graph<K, VV, EV>(updatedVertices, this.edges, this.context);
- }
-
- /**
-  * Runs a Gather-Sum-Apply iteration on the graph without any
-  * configuration. Delegates to the configurable variant, passing
-  * {@code null} as the configuration.
-  *
-  * @param gatherFunction the gather function collects information about adjacent vertices and edges
-  * @param sumFunction the sum function aggregates the gathered information
-  * @param applyFunction the apply function updates the vertex values with the aggregates
-  * @param maximumNumberOfIterations maximum number of iterations to perform
-  * @param <M> the intermediate type used between gather, sum and apply
-  *
-  * @return the updated Graph after the gather-sum-apply iteration has converged or
-  *         after maximumNumberOfIterations.
-  */
- public <M> Graph<K, VV, EV> runGatherSumApplyIteration(
-         GatherFunction<VV, EV, M> gatherFunction, SumFunction<VV, EV, M> sumFunction,
-         ApplyFunction<K, VV, M> applyFunction, int maximumNumberOfIterations) {
-
-     // null stands for "no configuration parameters"
-     final GSAConfiguration noParameters = null;
-
-     return this.runGatherSumApplyIteration(gatherFunction, sumFunction, applyFunction,
-             maximumNumberOfIterations, noParameters);
- }
-
- /**
-  * Runs a Gather-Sum-Apply iteration on the graph with configuration options.
-  * The iteration is built over this graph's edges, configured, and then
-  * run on this graph's vertices; the edges of the result are unchanged.
-  *
-  * @param gatherFunction the gather function collects information about adjacent vertices and edges
-  * @param sumFunction the sum function aggregates the gathered information
-  * @param applyFunction the apply function updates the vertex values with the aggregates
-  * @param maximumNumberOfIterations maximum number of iterations to perform
-  * @param parameters the iteration configuration parameters
-  * @param <M> the intermediate type used between gather, sum and apply
-  *
-  * @return the updated Graph after the gather-sum-apply iteration has converged or
-  *         after maximumNumberOfIterations.
-  */
- public <M> Graph<K, VV, EV> runGatherSumApplyIteration(
-         GatherFunction<VV, EV, M> gatherFunction, SumFunction<VV, EV, M> sumFunction,
-         ApplyFunction<K, VV, M> applyFunction, int maximumNumberOfIterations,
-         GSAConfiguration parameters) {
-
-     GatherSumApplyIteration<K, VV, EV, M> gatherSumApply = GatherSumApplyIteration.withEdges(
-             edges, gatherFunction, sumFunction, applyFunction, maximumNumberOfIterations);
-
-     // the configuration has to be applied before the operation is run
-     gatherSumApply.configure(parameters);
-
-     DataSet<Vertex<K, VV>> updatedVertices = vertices.runOperation(gatherSumApply);
-
-     return new Graph<K, VV, EV>(updatedVertices, this.edges, this.context);
- }
-
- /**
-  * Runs the given algorithm on this graph by passing the graph to the
-  * algorithm's {@code run} method.
-  *
-  * @param algorithm the algorithm to run on the Graph
-  * @param <T> the return type
-  * @return the result of the graph algorithm
-  * @throws Exception if the algorithm throws one
-  */
- public <T> T run(GraphAlgorithm<K, VV, EV, T> algorithm) throws Exception {
-     final T outcome = algorithm.run(this);
-     return outcome;
- }
-
- /**
-  * Applies a user function to every vertex together with its neighborhood,
-  * i.e. the edges selected by the given direction paired with the vertex
-  * at the other end of each edge. The user function receives the full
-  * vertex (id and value).
-  *
-  * @param neighborsFunction the function to apply to the neighborhood
-  * @param direction the edge direction (in-, out-, all-)
-  * @param <T> the output type
-  * @return the records emitted by the function, as a DataSet
-  * @throws IllegalArgumentException if the direction is none of IN, OUT or ALL
-  */
- public <T> DataSet<T> groupReduceOnNeighbors(NeighborsFunctionWithVertexValue<K, VV, EV, T> neighborsFunction,
-         EdgeDirection direction) throws IllegalArgumentException {
-
-     final DataSet<T> result;
-
-     switch (direction) {
-     case IN:
-         // pair every edge with its source vertex ...
-         DataSet<Tuple2<Edge<K, EV>, Vertex<K, VV>>> inbound = edges
-                 .join(this.vertices).where(0).equalTo(0);
-         // ... and group the pairs under the edge's target id
-         result = vertices.coGroup(inbound)
-                 .where(0).equalTo("f0.f1")
-                 .with(new ApplyNeighborCoGroupFunction<K, VV, EV, T>(neighborsFunction));
-         break;
-     case OUT:
-         // pair every edge with its target vertex ...
-         DataSet<Tuple2<Edge<K, EV>, Vertex<K, VV>>> outbound = edges
-                 .join(this.vertices).where(1).equalTo(0);
-         // ... and group the pairs under the edge's source id
-         result = vertices.coGroup(outbound)
-                 .where(0).equalTo("f0.f0")
-                 .with(new ApplyNeighborCoGroupFunction<K, VV, EV, T>(neighborsFunction));
-         break;
-     case ALL:
-         // emit each edge once per endpoint and attach the opposite vertex
-         DataSet<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> bothWays = edges
-                 .flatMap(new EmitOneEdgeWithNeighborPerNode<K, EV>())
-                 .join(this.vertices).where(1).equalTo(0)
-                 .with(new ProjectEdgeWithNeighbor<K, VV, EV>());
-
-         result = vertices.coGroup(bothWays)
-                 .where(0).equalTo(0)
-                 .with(new ApplyCoGroupFunctionOnAllNeighbors<K, VV, EV, T>(neighborsFunction));
-         break;
-     default:
-         throw new IllegalArgumentException("Illegal edge direction");
-     }
-
-     return result;
- }
-
- /**
-  * Applies a user function to every vertex together with its neighborhood,
-  * i.e. the edges selected by the given direction paired with the vertex
-  * at the other end of each edge, and declares the result type explicitly.
-  * The user function receives the full vertex (id and value).
-  *
-  * @param neighborsFunction the function to apply to the neighborhood
-  * @param direction the edge direction (in-, out-, all-)
-  * @param <T> the output type
-  * @param typeInfo the explicit return type.
-  * @return the records emitted by the function, as a DataSet
-  * @throws IllegalArgumentException if the direction is none of IN, OUT or ALL
-  */
- public <T> DataSet<T> groupReduceOnNeighbors(NeighborsFunctionWithVertexValue<K, VV, EV, T> neighborsFunction,
-         EdgeDirection direction, TypeInformation<T> typeInfo) throws IllegalArgumentException {
-
-     final DataSet<T> typedResult;
-
-     switch (direction) {
-     case IN:
-         // pair every edge with its source vertex ...
-         DataSet<Tuple2<Edge<K, EV>, Vertex<K, VV>>> inbound = edges
-                 .join(this.vertices).where(0).equalTo(0);
-         // ... and group the pairs under the edge's target id
-         typedResult = vertices.coGroup(inbound)
-                 .where(0).equalTo("f0.f1")
-                 .with(new ApplyNeighborCoGroupFunction<K, VV, EV, T>(neighborsFunction))
-                 .returns(typeInfo);
-         break;
-     case OUT:
-         // pair every edge with its target vertex ...
-         DataSet<Tuple2<Edge<K, EV>, Vertex<K, VV>>> outbound = edges
-                 .join(this.vertices).where(1).equalTo(0);
-         // ... and group the pairs under the edge's source id
-         typedResult = vertices.coGroup(outbound)
-                 .where(0).equalTo("f0.f0")
-                 .with(new ApplyNeighborCoGroupFunction<K, VV, EV, T>(neighborsFunction))
-                 .returns(typeInfo);
-         break;
-     case ALL:
-         // emit each edge once per endpoint and attach the opposite vertex
-         DataSet<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> bothWays = edges
-                 .flatMap(new EmitOneEdgeWithNeighborPerNode<K, EV>())
-                 .join(this.vertices).where(1).equalTo(0)
-                 .with(new ProjectEdgeWithNeighbor<K, VV, EV>());
-
-         typedResult = vertices.coGroup(bothWays)
-                 .where(0).equalTo(0)
-                 .with(new ApplyCoGroupFunctionOnAllNeighbors<K, VV, EV, T>(neighborsFunction))
-                 .returns(typeInfo);
-         break;
-     default:
-         throw new IllegalArgumentException("Illegal edge direction");
-     }
-
-     return typedResult;
- }
-
-
- /**
- * Compute an aggregate over the neighbors (edges and vertices) of each
- * vertex. The function applied on the neighbors only has access to the
- * vertex id (not the vertex value).
- *
- * @param neighborsFunction the function to apply to the neighborhood
- * @param direction the edge direction (in-, out-, all-)
- * @param <T> the output type
- * @return a dataset of a T
- * @throws IllegalArgumentException
- */
- public <T> DataSet<T> groupReduceOnNeighbors(NeighborsFunction<K, VV, EV, T> neighborsFunction,
- EdgeDirection direction) throws IllegalArgumentException {
- switch (direction) {
- case IN:
- // create <edge-sourceVertex> pairs
- DataSet<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> edgesWithSources = edges
- .join(this.vertices).where(0).equalTo(0)
- .with(new ProjectVertexIdJoin<K, VV, EV>(1))
- .withForwardedFieldsFirst("f1->f0");
- return edgesWithSources.groupBy(0).reduceGroup(
- new ApplyNeighborGroupReduceFunction<K, VV, EV, T>(neighborsFunction));
- case OUT:
- // create <edge-targetVertex> pairs
- DataSet<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> edgesWithTargets = edges
- .join(this.vertices).where(1).equalTo(0)
- .with(new ProjectVertexIdJoin<K, VV, EV>(0))
- .withForwardedFieldsFirst("f0");
- return edgesWithTargets.groupBy(0).reduceGroup(
- new ApplyNeighborGroupReduceFunction<K, VV, EV, T>(neighborsFunction));
- case ALL:
- // create <edge-sourceOrTargetVertex> pairs
- DataSet<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> edgesWithNeighbors = edges
- .flatMap(new EmitOneEdgeWithNeighborPerNode<K, EV>())
- .join(this.vertices).where(1).equalTo(0)
- .with(new ProjectEdgeWithNeighbor<K, VV, EV>());
-
- return edgesWithNeighbors.groupBy(0).reduceGroup(
- new ApplyNeighborGroupReduceFunction<K, VV, EV, T>(neighborsFunction));
- default:
- throw new IllegalArgumentException("Illegal edge direction");
- }
- }
-
- /**
-  * Groups (vertex id, edge, neighbor vertex) triples by vertex id according
-  * to the given direction, applies a user function to each group and
-  * declares the result type explicitly. The function only sees the id of
-  * the grouped vertex, not its value.
-  *
-  * @param neighborsFunction the function to apply to the neighborhood
-  * @param direction the edge direction (in-, out-, all-)
-  * @param <T> the output type
-  * @param typeInfo the explicit return type.
-  * @return the records emitted by the function, as a DataSet
-  * @throws IllegalArgumentException if the direction is none of IN, OUT or ALL
-  */
- public <T> DataSet<T> groupReduceOnNeighbors(NeighborsFunction<K, VV, EV, T> neighborsFunction,
-         EdgeDirection direction, TypeInformation<T> typeInfo) throws IllegalArgumentException {
-
-     // the same reducer wrapper serves all three directions
-     ApplyNeighborGroupReduceFunction<K, VV, EV, T> reducer =
-             new ApplyNeighborGroupReduceFunction<K, VV, EV, T>(neighborsFunction);
-
-     switch (direction) {
-     case IN:
-         // attach the source vertex to each edge and key the triple by the target id
-         DataSet<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> inbound = edges
-                 .join(this.vertices).where(0).equalTo(0)
-                 .with(new ProjectVertexIdJoin<K, VV, EV>(1))
-                 .withForwardedFieldsFirst("f1->f0");
-         return inbound.groupBy(0).reduceGroup(reducer).returns(typeInfo);
-     case OUT:
-         // attach the target vertex to each edge and key the triple by the source id
-         DataSet<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> outbound = edges
-                 .join(this.vertices).where(1).equalTo(0)
-                 .with(new ProjectVertexIdJoin<K, VV, EV>(0))
-                 .withForwardedFieldsFirst("f0");
-         return outbound.groupBy(0).reduceGroup(reducer).returns(typeInfo);
-     case ALL:
-         // emit each edge once per endpoint and attach the opposite vertex
-         DataSet<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> bothWays = edges
-                 .flatMap(new EmitOneEdgeWithNeighborPerNode<K, EV>())
-                 .join(this.vertices).where(1).equalTo(0)
-                 .with(new ProjectEdgeWithNeighbor<K, VV, EV>());
-
-         return bothWays.groupBy(0).reduceGroup(reducer).returns(typeInfo);
-     default:
-         throw new IllegalArgumentException("Illegal edge direction");
-     }
- }
-
- private static final class ApplyNeighborGroupReduceFunction<K, VV, EV, T>
- implements GroupReduceFunction<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>, T>, ResultTypeQueryable<T> {
-
- private NeighborsFunction<K, VV, EV, T> function;
-
- public ApplyNeighborGroupReduceFunction(NeighborsFunction<K, VV, EV, T> fun) {
- this.function = fun;
- }
-
- public void reduce(Iterable<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> edges, Collector<T> out) throws Exception {
- function.iterateNeighbors(edges, out);
- }
-
- @Override
- public TypeInformation<T> getProducedType() {
- return TypeExtractor.createTypeInfo(NeighborsFunction.class, function.getClass(), 3, null, null);
- }
- }
-
- /**
-  * Joins an edge with a vertex and emits a pair made of one edge field,
-  * selected by position, and the value of the joined vertex.
-  */
- @ForwardedFieldsSecond("f1")
- private static final class ProjectVertexWithNeighborValueJoin<K, VV, EV>
-         implements FlatJoinFunction<Edge<K, EV>, Vertex<K, VV>, Tuple2<K, VV>> {
-
-     /** Position of the edge field that becomes the key of the output pair. */
-     private final int fieldPosition;
-
-     public ProjectVertexWithNeighborValueJoin(int position) {
-         this.fieldPosition = position;
-     }
-
-     @SuppressWarnings("unchecked")
-     public void join(Edge<K, EV> edge, Vertex<K, VV> otherVertex,
-             Collector<Tuple2<K, VV>> out) {
-         K key = (K) edge.getField(fieldPosition);
-         VV neighborValue = otherVertex.getValue();
-         out.collect(new Tuple2<K, VV>(key, neighborValue));
-     }
- }
-
- /**
-  * Joins an edge with a vertex and emits a triple made of one edge field,
-  * selected by position, the edge itself and the joined vertex.
-  */
- private static final class ProjectVertexIdJoin<K, VV, EV> implements FlatJoinFunction<
-         Edge<K, EV>, Vertex<K, VV>, Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> {
-
-     /** Position of the edge field that becomes the key of the output triple. */
-     private final int fieldPosition;
-
-     public ProjectVertexIdJoin(int position) {
-         this.fieldPosition = position;
-     }
-
-     @SuppressWarnings("unchecked")
-     public void join(Edge<K, EV> edge, Vertex<K, VV> otherVertex,
-             Collector<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> out) {
-         K key = (K) edge.getField(fieldPosition);
-         out.collect(new Tuple3<K, Edge<K, EV>, Vertex<K, VV>>(key, edge, otherVertex));
-     }
- }
-
- @ForwardedFieldsFirst("f0")
- @ForwardedFieldsSecond("f1")
- private static final class ProjectNeighborValue<K, VV, EV> implements FlatJoinFunction<
- Tuple3<K, K, Edge<K, EV>>, Vertex<K, VV>, Tuple2<K, VV>> {
-
- public void join(Tuple3<K, K, Edge<K, EV>> keysWithEdge, Vertex<K, VV> neighbor,
- Collector<Tuple2<K, VV>> out) {
-
- out.collect(new Tuple2<K, VV>(keysWithEdge.f0, neighbor.getValue()));
- }
- }
-
- @ForwardedFieldsFirst("f0; f2->f1")
- @ForwardedFieldsSecond("*->f2")
- private static final class ProjectEdgeWithNeighbor<K, VV, EV> implements FlatJoinFunction<
- Tuple3<K, K, Edge<K, EV>>, Vertex<K, VV>, Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> {
-
- public void join(Tuple3<K, K, Edge<K, EV>> keysWithEdge, Vertex<K, VV> neighbor,
- Collector<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> out) {
- out.collect(new Tuple3<K, Edge<K, EV>, Vertex<K, VV>>(keysWithEdge.f0, keysWithEdge.f2, neighbor));
- }
- }
-
- /**
- * Co-group function that applies a {@link NeighborsFunctionWithVertexValue} to a vertex
- * and its co-grouped (edge, neighbor vertex) pairs.
- *
- * @param <K> the key type
- * @param <VV> the vertex value type
- * @param <EV> the edge value type
- * @param <T> the type of the records emitted by the wrapped function
- */
- private static final class ApplyNeighborCoGroupFunction<K, VV, EV, T> implements CoGroupFunction<
- Vertex<K, VV>, Tuple2<Edge<K, EV>, Vertex<K, VV>>, T>, ResultTypeQueryable<T> {
-
- private NeighborsFunctionWithVertexValue<K, VV, EV, T> function;
-
- public ApplyNeighborCoGroupFunction(NeighborsFunctionWithVertexValue<K, VV, EV, T> fun) {
- this.function = fun;
- }
-
- /**
- * Passes the first vertex of the group, together with all of its neighbor pairs,
- * to the wrapped function.
- *
- * @param vertex the vertex side of the co-group; only its first element is used
- * @param neighbors the (edge, neighbor vertex) pairs co-grouped with the vertex
- * @param out the collector handed to the wrapped function
- * @throws NoSuchElementException if the vertex side of the co-group is empty
- */
- @Override
- public void coGroup(Iterable<Vertex<K, VV>> vertex, Iterable<Tuple2<Edge<K, EV>, Vertex<K, VV>>> neighbors,
- Collector<T> out) throws Exception {
- Iterator<Vertex<K, VV>> vertexIterator = vertex.iterator();
-
- // Fail with the same descriptive message as ApplyCoGroupFunctionOnAllNeighbors
- // instead of the message-less exception thrown by a bare next() call.
- if (!vertexIterator.hasNext()) {
- throw new NoSuchElementException("The edge src/trg id could not be found within the vertexIds");
- }
- function.iterateNeighbors(vertexIterator.next(), neighbors, out);
- }
-
- /** Derives the produced type from the concrete class of the wrapped function. */
- @Override
- public TypeInformation<T> getProducedType() {
- return TypeExtractor.createTypeInfo(NeighborsFunctionWithVertexValue.class, function.getClass(), 3, null, null);
- }
- }
-
- /**
- * Co-group function that applies a {@link NeighborsFunctionWithVertexValue} to a vertex
- * and its co-grouped (key, edge, neighbor vertex) triples. The key is stripped from each
- * triple before it is handed to the wrapped function.
- */
- private static final class ApplyCoGroupFunctionOnAllNeighbors<K, VV, EV, T>
- implements CoGroupFunction<Vertex<K, VV>, Tuple3<K, Edge<K, EV>, Vertex<K, VV>>, T>, ResultTypeQueryable<T> {
-
- private NeighborsFunctionWithVertexValue<K, VV, EV, T> function;
-
- public ApplyCoGroupFunctionOnAllNeighbors(NeighborsFunctionWithVertexValue<K, VV, EV, T> fun) {
- this.function = fun;
- }
-
- /**
- * Calls the wrapped function with the first vertex of the group and a view of the
- * triples reduced to (edge, neighbor vertex) pairs.
- *
- * @param vertex the vertex side of the co-group; only its first element is used
- * @param keysWithNeighbors the (key, edge, neighbor vertex) triples of the co-group
- * @param out the collector handed to the wrapped function
- * @throws NoSuchElementException if the vertex side of the co-group is empty
- */
- public void coGroup(Iterable<Vertex<K, VV>> vertex,
- final Iterable<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> keysWithNeighbors,
- Collector<T> out) throws Exception {
-
- // adapter that maps each triple to an (edge, neighbor vertex) pair on the fly
- final Iterator<Tuple2<Edge<K, EV>, Vertex<K, VV>>> neighborsIterator = new Iterator<Tuple2<Edge<K, EV>, Vertex<K, VV>>>() {
-
- // obtained once, when the adapter is created
- final Iterator<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> keysWithEdgesIterator = keysWithNeighbors.iterator();
-
- @Override
- public boolean hasNext() {
- return keysWithEdgesIterator.hasNext();
- }
-
- @Override
- public Tuple2<Edge<K, EV>, Vertex<K, VV>> next() {
- Tuple3<K, Edge<K, EV>, Vertex<K, VV>> next = keysWithEdgesIterator.next();
- // drop the key (f0); keep the edge (f1) and the neighbor vertex (f2)
- return new Tuple2<Edge<K, EV>, Vertex<K, VV>>(next.f1, next.f2);
- }
-
- @Override
- public void remove() {
- keysWithEdgesIterator.remove();
- }
- };
-
- // NOTE: every call to iterator() returns the same adapter instance, so the
- // neighbors can only be traversed once
- Iterable<Tuple2<Edge<K, EV>, Vertex<K, VV>>> neighborsIterable = new Iterable<Tuple2<Edge<K, EV>, Vertex<K, VV>>>() {
- public Iterator<Tuple2<Edge<K, EV>, Vertex<K, VV>>> iterator() {
- return neighborsIterator;
- }
- };
-
- Iterator<Vertex<K, VV>> vertexIterator = vertex.iterator();
-
- if (vertexIterator.hasNext()) {
- function.iterateNeighbors(vertexIterator.next(), neighborsIterable, out);
- } else {
- // the group has triples but no vertex to attach them to
- throw new NoSuchElementException("The edge src/trg id could not be found within the vertexIds");
- }
- }
-
- /** Derives the produced type from the concrete class of the wrapped function. */
- @Override
- public TypeInformation<T> getProducedType() {
- return TypeExtractor.createTypeInfo(NeighborsFunctionWithVertexValue.class, function.getClass(), 3, null, null);
- }
- }
-
- /**
- * Aggregates the values of the neighbors of each vertex with the given
- * reduce function.
- *
- * @param reduceNeighborsFunction the function that combines two neighbor values into one
- * @param direction the edge direction (in-, out-, all-)
- * @return a DataSet of (vertex id, aggregated neighbor value) pairs
- * @throws IllegalArgumentException if the edge direction is not one of the handled cases
- */
- public DataSet<Tuple2<K, VV>> reduceOnNeighbors(ReduceNeighborsFunction<VV> reduceNeighborsFunction,
- EdgeDirection direction) throws IllegalArgumentException {
- final ApplyNeighborReduceFunction<K, VV> reducer =
- new ApplyNeighborReduceFunction<K, VV>(reduceNeighborsFunction);
- final DataSet<Tuple2<K, VV>> idsWithNeighborValues;
-
- switch (direction) {
- case IN:
- // join every edge with its source vertex and key the source value by the edge target
- idsWithNeighborValues = edges
- .join(this.vertices).where(0).equalTo(0)
- .with(new ProjectVertexWithNeighborValueJoin<K, VV, EV>(1))
- .withForwardedFieldsFirst("f1->f0");
- break;
- case OUT:
- // join every edge with its target vertex and key the target value by the edge source
- idsWithNeighborValues = edges
- .join(this.vertices).where(1).equalTo(0)
- .with(new ProjectVertexWithNeighborValueJoin<K, VV, EV>(0))
- .withForwardedFieldsFirst("f0");
- break;
- case ALL:
- // create <vertex-neighbor value> pairs
- idsWithNeighborValues = edges
- .flatMap(new EmitOneEdgeWithNeighborPerNode<K, EV>())
- .join(this.vertices).where(1).equalTo(0)
- .with(new ProjectNeighborValue<K, VV, EV>());
- break;
- default:
- throw new IllegalArgumentException("Illegal edge direction");
- }
-
- // collapse all neighbor values that share a vertex id into a single value
- return idsWithNeighborValues.groupBy(0).reduce(reducer);
- }
-
- /**
- * Reduce function over (vertex id, neighbor value) pairs that combines the two values
- * with a user-supplied {@link ReduceNeighborsFunction} and leaves the id untouched.
- */
- @ForwardedFields("f0")
- private static final class ApplyNeighborReduceFunction<K, VV> implements ReduceFunction<Tuple2<K, VV>> {
-
- private ReduceNeighborsFunction<VV> function;
-
- public ApplyNeighborReduceFunction(ReduceNeighborsFunction<VV> wrapped) {
- function = wrapped;
- }
-
- /** Stores the combined value in {@code first} and returns that same tuple. */
- @Override
- public Tuple2<K, VV> reduce(Tuple2<K, VV> first, Tuple2<K, VV> second) throws Exception {
- final VV combined = function.reduceNeighbors(first.f1, second.f1);
- first.f1 = combined;
- return first;
- }
- }
-
- /**
- * Aggregates the values of the edges of each vertex with the given
- * reduce function.
- *
- * @param reduceEdgesFunction the function that combines two edge values into one
- * @param direction the edge direction (in-, out-, all-)
- * @return a DataSet of (vertex key, aggregated edge value) pairs
- * @throws IllegalArgumentException if the edge direction is not one of the handled cases
- */
- public DataSet<Tuple2<K, EV>> reduceOnEdges(ReduceEdgesFunction<EV> reduceEdgesFunction,
- EdgeDirection direction) throws IllegalArgumentException {
-
- final ApplyReduceFunction<K, EV> reducer = new ApplyReduceFunction<K, EV>(reduceEdgesFunction);
- final DataSet<Tuple2<K, EV>> idsWithEdgeValues;
-
- switch (direction) {
- case IN:
- // key each edge value by edge field 1
- idsWithEdgeValues = edges
- .map(new ProjectVertexWithEdgeValueMap<K, EV>(1))
- .withForwardedFields("f1->f0");
- break;
- case OUT:
- // key each edge value by edge field 0
- idsWithEdgeValues = edges
- .map(new ProjectVertexWithEdgeValueMap<K, EV>(0))
- .withForwardedFields("f0->f0");
- break;
- case ALL:
- idsWithEdgeValues = edges
- .flatMap(new EmitOneVertexWithEdgeValuePerNode<K, EV>())
- .withForwardedFields("f2->f1");
- break;
- default:
- throw new IllegalArgumentException("Illegal edge direction");
- }
-
- // collapse all edge values that share a vertex key into a single value
- return idsWithEdgeValues.groupBy(0).reduce(reducer);
- }
-
- /**
- * Reduce function over (vertex key, edge value) pairs that combines the two values
- * with a user-supplied {@link ReduceEdgesFunction} and leaves the key untouched.
- */
- @ForwardedFields("f0")
- private static final class ApplyReduceFunction<K, EV> implements ReduceFunction<Tuple2<K, EV>> {
-
- private ReduceEdgesFunction<EV> function;
-
- public ApplyReduceFunction(ReduceEdgesFunction<EV> wrapped) {
- function = wrapped;
- }
-
- /** Stores the combined value in {@code first} and returns that same tuple. */
- @Override
- public Tuple2<K, EV> reduce(Tuple2<K, EV> first, Tuple2<K, EV> second) throws Exception {
- final EV combined = function.reduceEdges(first.f1, second.f1);
- first.f1 = combined;
- return first;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphAlgorithm.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphAlgorithm.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphAlgorithm.java
deleted file mode 100644
index 08cf011..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphAlgorithm.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph;
-
-/**
- * Contract for an algorithm that is executed on a {@link Graph} and yields a result.
- *
- * @param <K> key type
- * @param <VV> vertex value type
- * @param <EV> edge value type
- * @param <T> the return type
- */
-public interface GraphAlgorithm<K, VV, EV, T> {
-
- /**
- * Runs the algorithm on the given graph.
- *
- * @param input the graph to run on
- * @return the result of the algorithm
- * @throws Exception if the run fails
- */
- T run(Graph<K, VV, EV> input) throws Exception;
-}