You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by va...@apache.org on 2015/10/09 18:05:56 UTC
[18/24] flink git commit: [FLINK-2833] [gelly] create a
flink-libraries module and move gelly there
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
new file mode 100755
index 0000000..23ccb68
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
@@ -0,0 +1,425 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.gsa;
+
+import org.apache.flink.api.common.aggregators.Aggregator;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.functions.RichFlatJoinFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.functions.RichReduceFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond;
+import org.apache.flink.api.java.operators.CustomUnaryOperation;
+import org.apache.flink.api.java.operators.DeltaIteration;
+import org.apache.flink.api.java.operators.JoinOperator;
+import org.apache.flink.api.java.operators.MapOperator;
+import org.apache.flink.api.java.operators.ReduceOperator;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.EdgeDirection;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.util.Collector;
+import java.util.Map;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * This class represents iterative graph computations, programmed in a gather-sum-apply perspective.
+ *
+ * @param <K> The type of the vertex key in the graph
+ * @param <VV> The type of the vertex value in the graph
+ * @param <EV> The type of the edge value in the graph
+ * @param <M> The intermediate type used by the gather, sum and apply functions
+ */
+public class GatherSumApplyIteration<K, VV, EV, M> implements CustomUnaryOperation<Vertex<K, VV>,
+ Vertex<K, VV>> {
+
+ private DataSet<Vertex<K, VV>> vertexDataSet;
+ private DataSet<Edge<K, EV>> edgeDataSet;
+
+ private final GatherFunction<VV, EV, M> gather;
+ private final SumFunction<VV, EV, M> sum;
+ private final ApplyFunction<K, VV, M> apply;
+ private final int maximumNumberOfIterations;
+ private EdgeDirection direction = EdgeDirection.OUT;
+
+ private GSAConfiguration configuration;
+
+ // ----------------------------------------------------------------------------------
+
+ private GatherSumApplyIteration(GatherFunction<VV, EV, M> gather, SumFunction<VV, EV, M> sum,
+ ApplyFunction<K, VV, M> apply, DataSet<Edge<K, EV>> edges, int maximumNumberOfIterations) {
+
+ Preconditions.checkNotNull(gather);
+ Preconditions.checkNotNull(sum);
+ Preconditions.checkNotNull(apply);
+ Preconditions.checkNotNull(edges);
+ Preconditions.checkArgument(maximumNumberOfIterations > 0, "The maximum number of iterations must be at least one.");
+
+ this.gather = gather;
+ this.sum = sum;
+ this.apply = apply;
+ this.edgeDataSet = edges;
+ this.maximumNumberOfIterations = maximumNumberOfIterations;
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Custom Operator behavior
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Sets the input data set for this operator. In the case of this operator this input data set represents
+ * the set of vertices with their initial state.
+ *
+ * @param dataSet The input data set, which in the case of this operator represents the set of
+ * vertices with their initial state.
+ */
+ @Override
+ public void setInput(DataSet<Vertex<K, VV>> dataSet) {
+ this.vertexDataSet = dataSet;
+ }
+
+ /**
+ * Computes the results of the gather-sum-apply iteration.
+ * @return The resulting DataSet of vertices
+ * @throws RuntimeException if no input data set was set (as an IllegalStateException) or the number of vertices cannot be computed
+ */
+ @Override
+ public DataSet<Vertex<K, VV>> createResult() {
+ if (vertexDataSet == null) {
+ throw new IllegalStateException("The input data set has not been set.");
+ }
+
+ // Prepare type information
+ TypeInformation<K> keyType = ((TupleTypeInfo<?>) vertexDataSet.getType()).getTypeAt(0);
+ TypeInformation<M> messageType = TypeExtractor.createTypeInfo(GatherFunction.class, gather.getClass(), 2, null, null);
+ TypeInformation<Tuple2<K, M>> innerType = new TupleTypeInfo<Tuple2<K, M>>(keyType, messageType);
+ TypeInformation<Vertex<K, VV>> outputType = vertexDataSet.getType();
+
+ // create a graph
+ Graph<K, VV, EV> graph =
+ Graph.fromDataSet(vertexDataSet, edgeDataSet, vertexDataSet.getExecutionEnvironment());
+
+ // check whether the numVertices option is set and, if so, compute the total number of vertices
+ // and set it within the gather, sum and apply functions
+ if (this.configuration != null && this.configuration.isOptNumVertices()) {
+ try {
+ long numberOfVertices = graph.numberOfVertices();
+ gather.setNumberOfVertices(numberOfVertices);
+ sum.setNumberOfVertices(numberOfVertices);
+ apply.setNumberOfVertices(numberOfVertices);
+ } catch (Exception e) {
+ throw new RuntimeException("Could not compute the number of vertices of the input graph.", e); // fail fast instead of iterating with the -1 placeholder
+ }
+ }
+
+ // Prepare UDFs
+ GatherUdf<K, VV, EV, M> gatherUdf = new GatherUdf<K, VV, EV, M>(gather, innerType);
+ SumUdf<K, VV, EV, M> sumUdf = new SumUdf<K, VV, EV, M>(sum, innerType);
+ ApplyUdf<K, VV, EV, M> applyUdf = new ApplyUdf<K, VV, EV, M>(apply, outputType);
+
+ final int[] zeroKeyPos = new int[] {0};
+ final DeltaIteration<Vertex<K, VV>, Vertex<K, VV>> iteration =
+ vertexDataSet.iterateDelta(vertexDataSet, maximumNumberOfIterations, zeroKeyPos);
+
+ // set up the iteration operator
+ if (this.configuration != null) {
+
+ iteration.name(this.configuration.getName(
+ "Gather-sum-apply iteration (" + gather + " | " + sum + " | " + apply + ")"));
+ iteration.parallelism(this.configuration.getParallelism());
+ iteration.setSolutionSetUnManaged(this.configuration.isSolutionSetUnmanagedMemory());
+
+ // register all aggregators
+ for (Map.Entry<String, Aggregator<?>> entry : this.configuration.getAggregators().entrySet()) {
+ iteration.registerAggregator(entry.getKey(), entry.getValue());
+ }
+ }
+ else {
+ // no configuration provided; set default name
+ iteration.name("Gather-sum-apply iteration (" + gather + " | " + sum + " | " + apply + ")");
+ }
+
+ // Prepare the neighbors
+ if(this.configuration != null) {
+ direction = this.configuration.getDirection();
+ }
+ DataSet<Tuple2<K, Neighbor<VV, EV>>> neighbors;
+ switch(direction) {
+ case OUT:
+ neighbors = iteration
+ .getWorkset().join(edgeDataSet)
+ .where(0).equalTo(0).with(new ProjectKeyWithNeighborOUT<K, VV, EV>());
+ break;
+ case IN:
+ neighbors = iteration
+ .getWorkset().join(edgeDataSet)
+ .where(0).equalTo(1).with(new ProjectKeyWithNeighborIN<K, VV, EV>());
+ break;
+ case ALL:
+ neighbors = iteration
+ .getWorkset().join(edgeDataSet)
+ .where(0).equalTo(0).with(new ProjectKeyWithNeighborOUT<K, VV, EV>()).union(iteration
+ .getWorkset().join(edgeDataSet)
+ .where(0).equalTo(1).with(new ProjectKeyWithNeighborIN<K, VV, EV>()));
+ break;
+ default:
+ neighbors = iteration
+ .getWorkset().join(edgeDataSet)
+ .where(0).equalTo(0).with(new ProjectKeyWithNeighborOUT<K, VV, EV>());
+ break;
+ }
+
+ // Gather, sum and apply
+ MapOperator<Tuple2<K, Neighbor<VV, EV>>, Tuple2<K, M>> gatherMapOperator = neighbors.map(gatherUdf);
+
+ // configure map gather function with name and broadcast variables
+ gatherMapOperator = gatherMapOperator.name("Gather");
+
+ if (this.configuration != null) {
+ for (Tuple2<String, DataSet<?>> e : this.configuration.getGatherBcastVars()) {
+ gatherMapOperator = gatherMapOperator.withBroadcastSet(e.f1, e.f0);
+ }
+ }
+ DataSet<Tuple2<K, M>> gatheredSet = gatherMapOperator;
+
+ ReduceOperator<Tuple2<K, M>> sumReduceOperator = gatheredSet.groupBy(0).reduce(sumUdf);
+
+ // configure reduce sum function with name and broadcast variables
+ sumReduceOperator = sumReduceOperator.name("Sum");
+
+ if (this.configuration != null) {
+ for (Tuple2<String, DataSet<?>> e : this.configuration.getSumBcastVars()) {
+ sumReduceOperator = sumReduceOperator.withBroadcastSet(e.f1, e.f0);
+ }
+ }
+ DataSet<Tuple2<K, M>> summedSet = sumReduceOperator;
+
+ JoinOperator<?, ?, Vertex<K, VV>> appliedSet = summedSet
+ .join(iteration.getSolutionSet())
+ .where(0)
+ .equalTo(0)
+ .with(applyUdf);
+
+ // configure join apply function with name and broadcast variables
+ appliedSet = appliedSet.name("Apply");
+
+ if (this.configuration != null) {
+ for (Tuple2<String, DataSet<?>> e : this.configuration.getApplyBcastVars()) {
+ appliedSet = appliedSet.withBroadcastSet(e.f1, e.f0);
+ }
+ }
+
+ // let the operator know that we preserve the key field
+ appliedSet.withForwardedFieldsFirst("0").withForwardedFieldsSecond("0");
+
+ return iteration.closeWith(appliedSet, appliedSet); // the applied vertices are passed as both arguments of closeWith
+ }
+
+ /**
+ * Creates a new gather-sum-apply iteration operator for graphs.
+ *
+ * @param edges The edge DataSet
+ *
+ * @param gather The gather function of the GSA iteration
+ * @param sum The sum function of the GSA iteration
+ * @param apply The apply function of the GSA iteration
+ *
+ * @param maximumNumberOfIterations The maximum number of iterations executed; must be at least one
+ *
+ * @param <K> The type of the vertex key in the graph
+ * @param <VV> The type of the vertex value in the graph
+ * @param <EV> The type of the edge value in the graph
+ * @param <M> The intermediate type used by the gather, sum and apply functions
+ *
+ * @return An instance of the gather-sum-apply graph computation operator.
+ */
+ public static final <K, VV, EV, M> GatherSumApplyIteration<K, VV, EV, M>
+ withEdges(DataSet<Edge<K, EV>> edges, GatherFunction<VV, EV, M> gather,
+ SumFunction<VV, EV, M> sum, ApplyFunction<K, VV, M> apply, int maximumNumberOfIterations) {
+
+ return new GatherSumApplyIteration<K, VV, EV, M>(gather, sum, apply, edges, maximumNumberOfIterations);
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Wrapping UDFs
+ // --------------------------------------------------------------------------------------------
+
+ @SuppressWarnings("serial") // adapter that runs the user's GatherFunction as a map over (key, neighbor) pairs
+ @ForwardedFields("f0")
+ private static final class GatherUdf<K, VV, EV, M> extends RichMapFunction<Tuple2<K, Neighbor<VV, EV>>,
+ Tuple2<K, M>> implements ResultTypeQueryable<Tuple2<K, M>> {
+
+ private final GatherFunction<VV, EV, M> gatherFunction;
+ private transient TypeInformation<Tuple2<K, M>> resultType; // returned by getProducedType()
+
+ private GatherUdf(GatherFunction<VV, EV, M> gatherFunction, TypeInformation<Tuple2<K, M>> resultType) {
+ this.gatherFunction = gatherFunction;
+ this.resultType = resultType;
+ }
+
+ @Override
+ public Tuple2<K, M> map(Tuple2<K, Neighbor<VV, EV>> neighborTuple) {
+ M result = this.gatherFunction.gather(neighborTuple.f1); // f1 is the Neighbor handed to the user function
+ return new Tuple2<K, M>(neighborTuple.f0, result); // the key (f0) is passed through unchanged
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ if (getIterationRuntimeContext().getSuperstepNumber() == 1) { // hand over the runtime context only in the first superstep
+ this.gatherFunction.init(getIterationRuntimeContext());
+ }
+ this.gatherFunction.preSuperstep();
+ }
+
+ @Override
+ public void close() throws Exception {
+ this.gatherFunction.postSuperstep();
+ }
+
+ @Override
+ public TypeInformation<Tuple2<K, M>> getProducedType() {
+ return this.resultType;
+ }
+ }
+
+ @SuppressWarnings("serial") // adapter that runs the user's SumFunction as a reduce over (key, value) pairs
+ private static final class SumUdf<K, VV, EV, M> extends RichReduceFunction<Tuple2<K, M>>
+ implements ResultTypeQueryable<Tuple2<K, M>>{
+
+ private final SumFunction<VV, EV, M> sumFunction;
+ private transient TypeInformation<Tuple2<K, M>> resultType; // returned by getProducedType()
+
+ private SumUdf(SumFunction<VV, EV, M> sumFunction, TypeInformation<Tuple2<K, M>> resultType) {
+ this.sumFunction = sumFunction;
+ this.resultType = resultType;
+ }
+
+ @Override
+ public Tuple2<K, M> reduce(Tuple2<K, M> arg0, Tuple2<K, M> arg1) throws Exception {
+ K key = arg0.f0; // the result keeps the key of the first argument
+ M result = this.sumFunction.sum(arg0.f1, arg1.f1);
+ return new Tuple2<K, M>(key, result);
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ if (getIterationRuntimeContext().getSuperstepNumber() == 1) { // hand over the runtime context only in the first superstep
+ this.sumFunction.init(getIterationRuntimeContext());
+ }
+ this.sumFunction.preSuperstep();
+ }
+
+ @Override
+ public void close() throws Exception {
+ this.sumFunction.postSuperstep();
+ }
+
+ @Override
+ public TypeInformation<Tuple2<K, M>> getProducedType() {
+ return this.resultType;
+ }
+ }
+
+ @SuppressWarnings("serial") // adapter that runs the user's ApplyFunction as a join of summed values with current vertices
+ private static final class ApplyUdf<K, VV, EV, M> extends RichFlatJoinFunction<Tuple2<K, M>,
+ Vertex<K, VV>, Vertex<K, VV>> implements ResultTypeQueryable<Vertex<K, VV>> {
+
+ private final ApplyFunction<K, VV, M> applyFunction;
+ private transient TypeInformation<Vertex<K, VV>> resultType; // returned by getProducedType()
+
+ private ApplyUdf(ApplyFunction<K, VV, M> applyFunction, TypeInformation<Vertex<K, VV>> resultType) {
+ this.applyFunction = applyFunction;
+ this.resultType = resultType;
+ }
+
+ @Override
+ public void join(Tuple2<K, M> newValue, final Vertex<K, VV> currentValue, final Collector<Vertex<K, VV>> out) throws Exception {
+
+ this.applyFunction.setOutput(currentValue, out); // pass the current vertex and the collector to the user function first
+ this.applyFunction.apply(newValue.f1, currentValue.getValue()); // arguments: summed value, current vertex value
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ if (getIterationRuntimeContext().getSuperstepNumber() == 1) { // hand over the runtime context only in the first superstep
+ this.applyFunction.init(getIterationRuntimeContext());
+ }
+ this.applyFunction.preSuperstep();
+ }
+
+ @Override
+ public void close() throws Exception {
+ this.applyFunction.postSuperstep();
+ }
+
+ @Override
+ public TypeInformation<Vertex<K, VV>> getProducedType() {
+ return this.resultType;
+ }
+ }
+
+ @SuppressWarnings("serial") // join function used for the OUT direction (and as one half of ALL)
+ @ForwardedFieldsSecond("f1->f0")
+ private static final class ProjectKeyWithNeighborOUT<K, VV, EV> implements FlatJoinFunction<
+ Vertex<K, VV>, Edge<K, EV>, Tuple2<K, Neighbor<VV, EV>>> {
+
+ public void join(Vertex<K, VV> vertex, Edge<K, EV> edge, Collector<Tuple2<K, Neighbor<VV, EV>>> out) {
+ out.collect(new Tuple2<K, Neighbor<VV, EV>>( // emitted record is keyed by the edge's target id
+ edge.getTarget(), new Neighbor<VV, EV>(vertex.getValue(), edge.getValue())));
+ }
+ }
+
+ @SuppressWarnings("serial") // join function used for the IN direction (and as one half of ALL)
+ @ForwardedFieldsSecond({"f0"})
+ private static final class ProjectKeyWithNeighborIN<K, VV, EV> implements FlatJoinFunction<
+ Vertex<K, VV>, Edge<K, EV>, Tuple2<K, Neighbor<VV, EV>>> {
+
+ public void join(Vertex<K, VV> vertex, Edge<K, EV> edge, Collector<Tuple2<K, Neighbor<VV, EV>>> out) {
+ out.collect(new Tuple2<K, Neighbor<VV, EV>>( // emitted record is keyed by the edge's source id
+ edge.getSource(), new Neighbor<VV, EV>(vertex.getValue(), edge.getValue())));
+ }
+ }
+
+
+
+
+ /**
+ * Configures this gather-sum-apply iteration with the provided parameters.
+ *
+ * @param parameters the configuration parameters
+ */
+ public void configure(GSAConfiguration parameters) {
+ this.configuration = parameters;
+ }
+
+ /**
+ * @return the configuration parameters of this gather-sum-apply iteration
+ */
+ public GSAConfiguration getIterationConfiguration() {
+ return this.configuration;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/Neighbor.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/Neighbor.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/Neighbor.java
new file mode 100755
index 0000000..7fa1ed2
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/Neighbor.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.gsa;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+
+/**
+ * This class represents a {@code <sourceVertex, edge>} pair.
+ * This is a wrapper around {@code Tuple2<VV, EV>} for convenience in the GatherFunction.
+ * @param <VV> the vertex value type
+ * @param <EV> the edge value type
+ */
+@SuppressWarnings("serial")
+public class Neighbor<VV, EV> extends Tuple2<VV, EV> {
+
+ public Neighbor() {}
+
+ public Neighbor(VV neighborValue, EV edgeValue) {
+ super(neighborValue, edgeValue);
+ }
+
+ public VV getNeighborValue() {
+ return this.f0; // first tuple field, set from neighborValue in the constructor
+ }
+
+ public EV getEdgeValue() {
+ return this.f1; // second tuple field, set from edgeValue in the constructor
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java
new file mode 100755
index 0000000..f27e275
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.gsa;
+
+import org.apache.flink.api.common.aggregators.Aggregator;
+import org.apache.flink.api.common.functions.IterationRuntimeContext;
+import org.apache.flink.types.Value;
+
+import java.io.Serializable;
+import java.util.Collection;
+
+/**
+ * The base class for the second step of a {@link GatherSumApplyIteration}.
+ *
+ * @param <VV> the vertex value type
+ * @param <EV> the edge value type
+ * @param <M> the output type
+ */
+@SuppressWarnings("serial")
+public abstract class SumFunction<VV, EV, M> implements Serializable {
+
+ // --------------------------------------------------------------------------------------------
+ // Attribute that allows access to the total number of vertices inside an iteration.
+ // --------------------------------------------------------------------------------------------
+
+ private long numberOfVertices = -1L;
+
+ /**
+ * Retrieves the number of vertices in the graph.
+ * @return the number of vertices if the {@link IterationConfiguration#setOptNumVertices(boolean)}
+ * option has been set; -1 otherwise.
+ */
+ public long getNumberOfVertices() {
+ return numberOfVertices;
+ }
+
+ void setNumberOfVertices(long numberOfVertices) {
+ this.numberOfVertices = numberOfVertices;
+ }
+
+ //---------------------------------------------------------------------------------------------
+ /**
+ * This method is invoked once per superstep, after the {@link GatherFunction}
+ * in a {@link GatherSumApplyIteration}.
+ * It combines the partial values produced by {@link GatherFunction#gather(Neighbor)}
+ * in pairs, until a single value has been computed.
+ *
+ * @param arg0 the first partial value.
+ * @param arg1 the second partial value.
+ * @return the combined value.
+ */
+ public abstract M sum(M arg0, M arg1);
+
+ /**
+ * This method is executed once per superstep before the sum function is invoked.
+ *
+ * @throws Exception Exceptions in the pre-superstep phase cause the superstep to fail.
+ */
+ public void preSuperstep() {}
+
+ /**
+ * This method is executed once per superstep after the sum function has been invoked.
+ *
+ * @throws Exception Exceptions in the post-superstep phase cause the superstep to fail.
+ */
+ public void postSuperstep() {}
+
+ /**
+ * Gets the number of the superstep, starting at <tt>1</tt>.
+ *
+ * @return The number of the current superstep.
+ */
+ public int getSuperstepNumber() {
+ return this.runtimeContext.getSuperstepNumber();
+ }
+
+ /**
+ * Gets the iteration aggregator registered under the given name. The iteration aggregator combines
+ * all aggregates globally once per superstep and makes them available in the next superstep.
+ *
+ * @param name The name of the aggregator.
+ * @return The aggregator registered under this name, or null, if no aggregator was registered.
+ */
+ public <T extends Aggregator<?>> T getIterationAggregator(String name) {
+ return this.runtimeContext.<T>getIterationAggregator(name);
+ }
+
+ /**
+ * Get the aggregated value that an aggregator computed in the previous iteration.
+ *
+ * @param name The name of the aggregator.
+ * @return The aggregated value of the previous iteration.
+ */
+ public <T extends Value> T getPreviousIterationAggregate(String name) {
+ return this.runtimeContext.<T>getPreviousIterationAggregate(name);
+ }
+
+ /**
+ * Gets the broadcast data set registered under the given name. Broadcast data sets
+ * are available on all parallel instances of a function.
+ *
+ * @param name The name under which the broadcast set is registered.
+ * @return The broadcast data set.
+ */
+ public <T> Collection<T> getBroadcastSet(String name) {
+ return this.runtimeContext.<T>getBroadcastVariable(name);
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Internal methods
+ // --------------------------------------------------------------------------------------------
+
+ private IterationRuntimeContext runtimeContext; // set by init(); backs the accessor methods above
+
+ public void init(IterationRuntimeContext iterationRuntimeContext) {
+ this.runtimeContext = iterationRuntimeContext;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/CommunityDetection.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/CommunityDetection.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/CommunityDetection.java
new file mode 100644
index 0000000..0dd39fc
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/CommunityDetection.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.spargel.MessageIterator;
+import org.apache.flink.graph.spargel.MessagingFunction;
+import org.apache.flink.graph.spargel.VertexUpdateFunction;
+
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * Community Detection Algorithm.
+ *
+ * This implementation expects Long Vertex values and labels. The Vertex values of the input Graph provide the initial label assignments.
+ *
+ * Initially, each vertex is assigned a tuple formed of its own initial value along with a score equal to 1.0.
+ * The vertices propagate their labels and max scores in iterations, each time adopting the label with the
+ * highest score from the list of received messages. The chosen label is afterwards re-scored using the fraction
+ * delta/the superstep number. Delta is passed to the constructor; the 0.5 default mentioned here is not defined by this class and must be supplied by the caller.
+ *
+ * The algorithm converges when vertices no longer update their value or when the maximum number of iterations
+ * is reached.
+ *
+ * @param <K> the Vertex ID type
+ *
+ * @see <a href="http://arxiv.org/pdf/0808.2633.pdf">article explaining the algorithm in detail</a>
+ */
+public class CommunityDetection<K> implements GraphAlgorithm<K, Long, Double, Graph<K, Long, Double>> {
+
+ private Integer maxIterations;
+
+ private Double delta;
+
+ public CommunityDetection(Integer maxIterations, Double delta) {
+
+ this.maxIterations = maxIterations;
+ this.delta = delta;
+ }
+
+ @Override
+ public Graph<K, Long, Double> run(Graph<K, Long, Double> graph) {
+
+ DataSet<Vertex<K, Tuple2<Long, Double>>> initializedVertices = graph.getVertices()
+ .map(new AddScoreToVertexValuesMapper<K>());
+
+ Graph<K, Tuple2<Long, Double>, Double> graphWithScoredVertices =
+ Graph.fromDataSet(initializedVertices, graph.getEdges(), graph.getContext()).getUndirected();
+
+ return graphWithScoredVertices.runVertexCentricIteration(new VertexLabelUpdater<K>(delta),
+ new LabelMessenger<K>(), maxIterations)
+ .mapVertices(new RemoveScoreFromVertexValuesMapper<K>());
+ }
+
+ @SuppressWarnings("serial")
+ public static final class VertexLabelUpdater<K> extends VertexUpdateFunction<
+ K, Tuple2<Long, Double>, Tuple2<Long, Double>> {
+
+ private Double delta;
+
+ public VertexLabelUpdater(Double delta) {
+ this.delta = delta;
+ }
+
+ @Override
+ public void updateVertex(Vertex<K, Tuple2<Long, Double>> vertex,
+ MessageIterator<Tuple2<Long, Double>> inMessages) throws Exception {
+
+ // TreeMaps keep the labels sorted, so score ties are resolved deterministically (the lowest label wins)
+ Map<Long, Double> receivedLabelsWithScores = new TreeMap<Long, Double>();
+ Map<Long, Double> labelsWithHighestScore = new TreeMap<Long, Double>();
+
+ for (Tuple2<Long, Double> message : inMessages) {
+ // split the message into received label and score
+ Long receivedLabel = message.f0;
+ Double receivedScore = message.f1;
+
+ // if the label was received before, accumulate its score
+ if (receivedLabelsWithScores.containsKey(receivedLabel)) {
+ Double newScore = receivedScore + receivedLabelsWithScores.get(receivedLabel);
+ receivedLabelsWithScores.put(receivedLabel, newScore);
+ } else {
+ // first time we see the label
+ receivedLabelsWithScores.put(receivedLabel, receivedScore);
+ }
+
+ // store the highest single score received for each label
+ if (labelsWithHighestScore.containsKey(receivedLabel)) {
+ Double currentScore = labelsWithHighestScore.get(receivedLabel);
+ if (currentScore < receivedScore) {
+ // record the highest score
+ labelsWithHighestScore.put(receivedLabel, receivedScore);
+ }
+ } else {
+ // first time we see this label
+ labelsWithHighestScore.put(receivedLabel, receivedScore);
+ }
+ }
+
+ if(receivedLabelsWithScores.size() > 0) {
+ // find the label with the highest accumulated score from the ones received
+ Double maxScore = -Double.MAX_VALUE;
+ Long maxScoreLabel = vertex.getValue().f0;
+ for (Long curLabel : receivedLabelsWithScores.keySet()) {
+
+ if (receivedLabelsWithScores.get(curLabel) > maxScore) {
+ maxScore = receivedLabelsWithScores.get(curLabel);
+ maxScoreLabel = curLabel;
+ }
+ }
+
+ // find the highest score of maxScoreLabel
+ Double highestScore = labelsWithHighestScore.get(maxScoreLabel);
+ // re-score the new label; boxed Longs must be compared by value, not by reference
+ if (!maxScoreLabel.equals(vertex.getValue().f0)) {
+ highestScore -= delta / getSuperstepNumber();
+ }
+ // else delta = 0
+ // update own label
+ setNewVertexValue(new Tuple2<Long, Double>(maxScoreLabel, highestScore));
+ }
+ }
+ }
+
+ @SuppressWarnings("serial") // sends this vertex's (label, weighted score) to the target of every edge from getEdges()
+ public static final class LabelMessenger<K> extends MessagingFunction<K, Tuple2<Long, Double>,
+ Tuple2<Long, Double>, Double> {
+
+ @Override
+ public void sendMessages(Vertex<K, Tuple2<Long, Double>> vertex) throws Exception {
+
+ for(Edge<K, Double> edge : getEdges()) {
+ sendMessageTo(edge.getTarget(), new Tuple2<Long, Double>(vertex.getValue().f0, // f0 = the vertex's current label
+ vertex.getValue().f1 * edge.getValue())); // the sent score is the vertex's score multiplied by the edge value
+ }
+ }
+ }
+
+ @SuppressWarnings("serial") // turns each Long vertex value into a (label, score) pair
+ @ForwardedFields("f0")
+ public static final class AddScoreToVertexValuesMapper<K> implements MapFunction<
+ Vertex<K, Long>, Vertex<K, Tuple2<Long, Double>>> {
+
+ public Vertex<K, Tuple2<Long, Double>> map(Vertex<K, Long> vertex) {
+ return new Vertex<K, Tuple2<Long, Double>>(
+ vertex.getId(), new Tuple2<Long, Double>(vertex.getValue(), 1.0)); // every vertex starts with score 1.0
+ }
+ }
+
+ @SuppressWarnings("serial") // reduces each (label, score) vertex value back to the label alone
+ public static final class RemoveScoreFromVertexValuesMapper<K> implements MapFunction<
+ Vertex<K, Tuple2<Long, Double>>, Long> {
+
+ @Override
+ public Long map(Vertex<K, Tuple2<Long, Double>> vertex) throws Exception {
+ return vertex.getValue().f0; // f0 is the label; the score in f1 is dropped
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java
new file mode 100644
index 0000000..ed853fe
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.spargel.MessageIterator;
+import org.apache.flink.graph.spargel.MessagingFunction;
+import org.apache.flink.graph.spargel.VertexUpdateFunction;
+import org.apache.flink.graph.utils.NullValueEdgeMapper;
+import org.apache.flink.types.NullValue;
+
+/**
+ * A vertex-centric implementation of the Connected Components algorithm.
+ *
+ * This implementation assumes that the vertices of the input Graph are initialized with unique, Long component IDs.
+ * The vertices propagate their current component ID in iterations, each time adopting a new value from the received neighbor IDs,
+ * provided that the value is less than the current minimum.
+ *
+ * The algorithm converges when vertices no longer update their value or when the maximum number of iterations
+ * is reached.
+ *
+ * The result is a DataSet of vertices, where the vertex value corresponds to the assigned component ID.
+ *
+ * @see {@link org.apache.flink.graph.library.GSAConnectedComponents}
+ */
+@SuppressWarnings("serial")
+public class ConnectedComponents<K, EV> implements GraphAlgorithm<K, Long, EV, DataSet<Vertex<K, Long>>> {
+
+ private Integer maxIterations;
+
+ public ConnectedComponents(Integer maxIterations) {
+ this.maxIterations = maxIterations;
+ }
+
+ @Override
+ public DataSet<Vertex<K, Long>> run(Graph<K, Long, EV> graph) throws Exception {
+
+ Graph<K, Long, NullValue> undirectedGraph = graph.mapEdges(new NullValueEdgeMapper<K, EV>())
+ .getUndirected();
+
+ // initialize vertex values and run the Vertex Centric Iteration
+ return undirectedGraph.runVertexCentricIteration(
+ new CCUpdater<K>(), new CCMessenger<K>(), maxIterations)
+ .getVertices();
+ }
+
+ /**
+ * Updates the value of a vertex by picking the minimum neighbor ID out of all the incoming messages.
+ */
+ public static final class CCUpdater<K> extends VertexUpdateFunction<K, Long, Long> {
+
+ @Override
+ public void updateVertex(Vertex<K, Long> vertex, MessageIterator<Long> messages) throws Exception {
+ long min = Long.MAX_VALUE;
+
+ for (long msg : messages) {
+ min = Math.min(min, msg);
+ }
+
+ // update vertex value, if new minimum
+ if (min < vertex.getValue()) {
+ setNewVertexValue(min);
+ }
+ }
+ }
+
+ /**
+ * Distributes the minimum ID associated with a given vertex among all the target vertices.
+ */
+ public static final class CCMessenger<K> extends MessagingFunction<K, Long, Long, NullValue> {
+
+ @Override
+ public void sendMessages(Vertex<K, Long> vertex) throws Exception {
+ // send current minimum to neighbors
+ sendMessageToAllNeighbors(vertex.getValue());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java
new file mode 100755
index 0000000..77bc2cf
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.gsa.ApplyFunction;
+import org.apache.flink.graph.gsa.GatherFunction;
+import org.apache.flink.graph.gsa.SumFunction;
+import org.apache.flink.graph.gsa.Neighbor;
+import org.apache.flink.graph.utils.NullValueEdgeMapper;
+import org.apache.flink.types.NullValue;
+
+/**
+ * This is an implementation of the Connected Components algorithm, using a gather-sum-apply iteration.
+ * This implementation assumes that the vertices of the input Graph are initialized with unique, Long component IDs.
+ * The result is a DataSet of vertices, where the vertex value corresponds to the assigned component ID.
+ *
+ * @see {@link org.apache.flink.graph.library.ConnectedComponents}
+ */
+public class GSAConnectedComponents<K, EV> implements GraphAlgorithm<K, Long, EV, DataSet<Vertex<K, Long>>> {
+
+ private Integer maxIterations;
+
+ public GSAConnectedComponents(Integer maxIterations) {
+ this.maxIterations = maxIterations;
+ }
+
+ @Override
+ public DataSet<Vertex<K, Long>> run(Graph<K, Long, EV> graph) throws Exception {
+
+ Graph<K, Long, NullValue> undirectedGraph = graph.mapEdges(new NullValueEdgeMapper<K, EV>())
+ .getUndirected();
+
+ // initialize vertex values and run the Vertex Centric Iteration
+ return undirectedGraph.runGatherSumApplyIteration(
+ new GatherNeighborIds(), new SelectMinId(), new UpdateComponentId<K>(),
+ maxIterations).getVertices();
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Connected Components UDFs
+ // --------------------------------------------------------------------------------------------
+
+ @SuppressWarnings("serial")
+ private static final class GatherNeighborIds extends GatherFunction<Long, NullValue, Long> {
+
+ public Long gather(Neighbor<Long, NullValue> neighbor) {
+ return neighbor.getNeighborValue();
+ }
+ };
+
+ @SuppressWarnings("serial")
+ private static final class SelectMinId extends SumFunction<Long, NullValue, Long> {
+
+ public Long sum(Long newValue, Long currentValue) {
+ return Math.min(newValue, currentValue);
+ }
+ };
+
+ @SuppressWarnings("serial")
+ private static final class UpdateComponentId<K> extends ApplyFunction<K, Long, Long> {
+
+ public void apply(Long summedValue, Long origValue) {
+ if (summedValue < origValue) {
+ setResult(summedValue);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java
new file mode 100644
index 0000000..df3e89a
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.gsa.ApplyFunction;
+import org.apache.flink.graph.gsa.GatherFunction;
+import org.apache.flink.graph.gsa.Neighbor;
+import org.apache.flink.graph.gsa.SumFunction;
+
+/**
+ * This is an implementation of a simple PageRank algorithm, using a gather-sum-apply iteration.
+ * The user can define the damping factor and the maximum number of iterations.
+ * If the number of vertices of the input graph is known, it should be provided as a parameter
+ * to speed up computation. Otherwise, the algorithm will first execute a job to count the vertices.
+ *
+ * The implementation assumes that each page has at least one incoming and one outgoing link.
+ */
+ public class GSAPageRank<K> implements GraphAlgorithm<K, Double, Double, DataSet<Vertex<K, Double>>> {
+
+     private final double beta;
+     private final int maxIterations;
+     // 0 means "not provided": run() then counts the vertices of the graph it is given
+     private final long numberOfVertices;
+
+     /**
+      * Creates the algorithm without a known vertex count; the count is computed in {@code run}.
+      *
+      * @param beta the damping factor
+      * @param maxIterations the maximum number of iterations
+      */
+     public GSAPageRank(double beta, int maxIterations) {
+         this(beta, 0L, maxIterations);
+     }
+
+     /**
+      * @param beta the damping factor
+      * @param numVertices the number of vertices in the input graph; 0 makes {@code run} count them
+      * @param maxIterations the maximum number of iterations
+      */
+     public GSAPageRank(double beta, long numVertices, int maxIterations) {
+         this.beta = beta;
+         this.numberOfVertices = numVertices;
+         this.maxIterations = maxIterations;
+     }
+
+     /**
+      * Divides every edge value by the out-degree of its source vertex and runs the
+      * gather-sum-apply iteration on the resulting graph.
+      *
+      * @param network the input graph
+      * @return the vertices of the graph after the iteration
+      */
+     @Override
+     public DataSet<Vertex<K, Double>> run(Graph<K, Double, Double> network) throws Exception {
+
+         // Resolve the vertex count per invocation rather than storing it in the field, so that
+         // reusing this instance on a different graph does not silently apply a stale count.
+         final long vertexCount = (numberOfVertices == 0) ? network.numberOfVertices() : numberOfVertices;
+
+         DataSet<Tuple2<K, Long>> vertexOutDegrees = network.outDegrees();
+
+         Graph<K, Double, Double> networkWithWeights = network
+                 .joinWithEdgesOnSource(vertexOutDegrees, new InitWeightsMapper());
+
+         return networkWithWeights.runGatherSumApplyIteration(new GatherRanks(vertexCount), new SumRanks(),
+                 new UpdateRanks<K>(beta, vertexCount), maxIterations)
+                 .getVertices();
+     }
+
+     // --------------------------------------------------------------------------------------------
+     // Page Rank UDFs
+     // --------------------------------------------------------------------------------------------
+
+     /**
+      * Gather: the neighbor's rank multiplied by the edge value. In the first superstep the
+      * neighbor's stored value is ignored and 1 / numberOfVertices is used instead.
+      */
+     @SuppressWarnings("serial")
+     private static final class GatherRanks extends GatherFunction<Double, Double, Double> {
+
+         private final long numberOfVertices;
+
+         public GatherRanks(long numberOfVertices) {
+             this.numberOfVertices = numberOfVertices;
+         }
+
+         @Override
+         public Double gather(Neighbor<Double, Double> neighbor) {
+             double neighborRank = neighbor.getNeighborValue();
+
+             if (getSuperstepNumber() == 1) {
+                 neighborRank = 1.0 / numberOfVertices;
+             }
+
+             return neighborRank * neighbor.getEdgeValue();
+         }
+     }
+
+     /**
+      * Sum: adds two gathered partial ranks.
+      */
+     @SuppressWarnings("serial")
+     private static final class SumRanks extends SumFunction<Double, Double, Double> {
+
+         @Override
+         public Double sum(Double newValue, Double currentValue) {
+             return newValue + currentValue;
+         }
+     }
+
+     /**
+      * Apply: sets the new rank to (1 - beta) / numVertices + beta * rankSum; the current
+      * vertex value is not used.
+      */
+     @SuppressWarnings("serial")
+     private static final class UpdateRanks<K> extends ApplyFunction<K, Double, Double> {
+
+         private final double beta;
+         private final long numVertices;
+
+         public UpdateRanks(double beta, long numberOfVertices) {
+             this.beta = beta;
+             this.numVertices = numberOfVertices;
+         }
+
+         @Override
+         public void apply(Double rankSum, Double currentValue) {
+             setResult((1 - beta) / numVertices + beta * rankSum);
+         }
+     }
+
+     /**
+      * Divides field f0 of the joined tuple by field f1. As used in {@code run}, f1 comes from
+      * the out-degree data set; f0 is presumably the current edge value (confirm against
+      * {@code joinWithEdgesOnSource}).
+      */
+     @SuppressWarnings("serial")
+     private static final class InitWeightsMapper implements MapFunction<Tuple2<Double, Long>, Double> {
+
+         @Override
+         public Double map(Tuple2<Double, Long> value) {
+             return value.f0 / value.f1;
+         }
+     }
+ }
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSASingleSourceShortestPaths.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSASingleSourceShortestPaths.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSASingleSourceShortestPaths.java
new file mode 100755
index 0000000..5a76072
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSASingleSourceShortestPaths.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.gsa.ApplyFunction;
+import org.apache.flink.graph.gsa.GatherFunction;
+import org.apache.flink.graph.gsa.SumFunction;
+import org.apache.flink.graph.gsa.Neighbor;
+
+/**
+ * This is an implementation of the Single Source Shortest Paths algorithm, using a gather-sum-apply iteration
+ */
+public class GSASingleSourceShortestPaths<K> implements
+ GraphAlgorithm<K, Double, Double, DataSet<Vertex<K, Double>>> {
+
+ private final K srcVertexId;
+ private final Integer maxIterations;
+
+ public GSASingleSourceShortestPaths(K srcVertexId, Integer maxIterations) {
+ this.srcVertexId = srcVertexId;
+ this.maxIterations = maxIterations;
+ }
+
+ @Override
+ public DataSet<Vertex<K, Double>> run(Graph<K, Double, Double> input) {
+
+ return input.mapVertices(new InitVerticesMapper<K>(srcVertexId))
+ .runGatherSumApplyIteration(new CalculateDistances(), new ChooseMinDistance(),
+ new UpdateDistance<K>(), maxIterations)
+ .getVertices();
+ }
+
+ @SuppressWarnings("serial")
+ public static final class InitVerticesMapper<K> implements MapFunction<Vertex<K, Double>, Double> {
+
+ private K srcVertexId;
+
+ public InitVerticesMapper(K srcId) {
+ this.srcVertexId = srcId;
+ }
+
+ public Double map(Vertex<K, Double> value) {
+ if (value.f0.equals(srcVertexId)) {
+ return 0.0;
+ } else {
+ return Double.MAX_VALUE;
+ }
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Single Source Shortest Path UDFs
+ // --------------------------------------------------------------------------------------------
+
+ @SuppressWarnings("serial")
+ private static final class CalculateDistances extends GatherFunction<Double, Double, Double> {
+
+ public Double gather(Neighbor<Double, Double> neighbor) {
+ return neighbor.getNeighborValue() + neighbor.getEdgeValue();
+ }
+ };
+
+ @SuppressWarnings("serial")
+ private static final class ChooseMinDistance extends SumFunction<Double, Double, Double> {
+
+ public Double sum(Double newValue, Double currentValue) {
+ return Math.min(newValue, currentValue);
+ }
+ };
+
+ @SuppressWarnings("serial")
+ private static final class UpdateDistance<K> extends ApplyFunction<K, Double, Double> {
+
+ public void apply(Double newDistance, Double oldDistance) {
+ if (newDistance < oldDistance) {
+ setResult(newDistance);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSATriangleCount.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSATriangleCount.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSATriangleCount.java
new file mode 100644
index 0000000..76d170d
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSATriangleCount.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library;
+
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.ReduceNeighborsFunction;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Triplet;
+import org.apache.flink.graph.EdgeDirection;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.types.NullValue;
+
+import java.util.TreeMap;
+
+/**
+ * Triangle Count Algorithm.
+ *
+ * This algorithm operates in three phases. First, vertices select neighbors with id greater than theirs
+ * and send messages to them. Each received message is then propagated to neighbors with higher id.
+ * Finally, if a node encounters the target id in the list of received messages, it increments the number
+ * of triangles found.
+ *
+ * This implementation is non-iterative.
+ *
+ * The algorithm takes an undirected, unweighted graph as input and outputs a DataSet
+ * which contains a single integer representing the number of triangles.
+ */
+ public class GSATriangleCount<K extends Comparable<K>, VV, EV> implements
+ GraphAlgorithm<K, VV, EV, DataSet<Integer>> {
+
+ /**
+ * Builds the triangle-counting dataflow: re-orients and de-duplicates the edges, merges
+ * neighbor maps over two rounds of reduceOnNeighbors, and sums a per-triplet count.
+ *
+ * @param input the input graph; only its edges and execution context are read here
+ * @return a DataSet holding the sum of the per-triplet counts
+ */
+ @SuppressWarnings("serial")
+ @Override
+ public DataSet<Integer> run(Graph<K, VV, EV> input) throws Exception {
+
+ ExecutionEnvironment env = input.getContext();
+
+ // order the edges so that src is always higher than trg
+ // distinct() then removes edges that became identical after re-orientation
+ DataSet<Edge<K, NullValue>> edges = input.getEdges().map(new OrderEdges<K, EV>()).distinct();
+
+ // every vertex value starts as a map containing only the vertex's own id with count 1
+ Graph<K, TreeMap<K, Integer>, NullValue> graph = Graph.fromDataSet(edges,
+ new VertexInitializer<K>(), env);
+
+ // select neighbors with ids higher than the current vertex id
+ // Gather: a no-op in this case
+ // Sum: create the set of neighbors
+ DataSet<Tuple2<K, TreeMap<K, Integer>>> higherIdNeighbors =
+ graph.reduceOnNeighbors(new GatherHigherIdNeighbors<K>(), EdgeDirection.IN);
+
+ // same graph with all vertex values replaced by empty maps; used as the join base twice below
+ Graph<K, TreeMap<K, Integer>, NullValue> graphWithReinitializedVertexValues =
+ graph.mapVertices(new VertexInitializerEmptyTreeMap<K>());
+
+ // Apply: attach the computed values to the vertices
+ // joinWithVertices to update the node values
+ DataSet<Vertex<K, TreeMap<K, Integer>>> verticesWithHigherIdNeighbors =
+ graphWithReinitializedVertexValues.joinWithVertices(higherIdNeighbors, new AttachValues<K>()).getVertices();
+
+ Graph<K, TreeMap<K,Integer>, NullValue> graphWithNeighbors = Graph.fromDataSet(verticesWithHigherIdNeighbors,
+ edges, env);
+
+ // propagate each received value to neighbors with higher id
+ // Gather: a no-op in this case
+ // Sum: propagate values
+ DataSet<Tuple2<K, TreeMap<K, Integer>>> propagatedValues = graphWithNeighbors
+ .reduceOnNeighbors(new GatherHigherIdNeighbors<K>(), EdgeDirection.IN);
+
+ // Apply: attach propagated values to vertices
+ DataSet<Vertex<K, TreeMap<K, Integer>>> verticesWithPropagatedValues =
+ graphWithReinitializedVertexValues.joinWithVertices(propagatedValues, new AttachValues<K>()).getVertices();
+
+ Graph<K, TreeMap<K, Integer>, NullValue> graphWithPropagatedNeighbors =
+ Graph.fromDataSet(verticesWithPropagatedValues, graphWithNeighbors.getEdges(), env);
+
+ // Scatter: compute the number of triangles
+ // each triplet contributes the count found by ComputeTriangles; the reduce adds them up
+ DataSet<Integer> numberOfTriangles = graphWithPropagatedNeighbors.getTriplets()
+ .map(new ComputeTriangles<K>()).reduce(new ReduceFunction<Integer>() {
+
+ @Override
+ public Integer reduce(Integer first, Integer second) throws Exception {
+ return first + second;
+ }
+ });
+
+ return numberOfTriangles;
+ }
+
+ /**
+ * Re-orients an edge so that the endpoint that compares greater becomes the source, and
+ * replaces the edge value with NullValue. Edges whose source already compares greater than
+ * or equal to the target keep their direction.
+ */
+ @SuppressWarnings("serial")
+ private static final class OrderEdges<K extends Comparable<K>, EV> implements
+ MapFunction<Edge<K, EV>, Edge<K, NullValue>> {
+
+ @Override
+ public Edge<K, NullValue> map(Edge<K, EV> edge) throws Exception {
+ if (edge.getSource().compareTo(edge.getTarget()) < 0) {
+ // source is the smaller endpoint: swap the two ids
+ return new Edge<K, NullValue>(edge.getTarget(), edge.getSource(), NullValue.getInstance());
+ } else {
+ return new Edge<K, NullValue>(edge.getSource(), edge.getTarget(), NullValue.getInstance());
+ }
+ }
+ }
+
+ /**
+ * Maps a vertex id to a new TreeMap that contains that id with the count 1.
+ */
+ @SuppressWarnings("serial")
+ private static final class VertexInitializer<K> implements MapFunction<K, TreeMap<K, Integer>> {
+
+ @Override
+ public TreeMap<K, Integer> map(K value) throws Exception {
+ TreeMap<K, Integer> neighbors = new TreeMap<K, Integer>();
+ neighbors.put(value, 1);
+
+ return neighbors;
+ }
+ }
+
+ /**
+ * Replaces any vertex value with a new, empty TreeMap; the input vertex is not read.
+ */
+ @SuppressWarnings("serial")
+ private static final class VertexInitializerEmptyTreeMap<K> implements
+ MapFunction<Vertex<K, TreeMap<K, Integer>>, TreeMap<K, Integer>> {
+
+ @Override
+ public TreeMap<K, Integer> map(Vertex<K, TreeMap<K, Integer>> vertex) throws Exception {
+ return new TreeMap<K, Integer>();
+ }
+ }
+
+ /**
+ * Returns field f1 of the joined tuple and ignores f0.
+ * NOTE(review): presumably f0 is the current vertex value and f1 the value from the joined
+ * data set - confirm against Graph.joinWithVertices.
+ */
+ @SuppressWarnings("serial")
+ private static final class AttachValues<K> implements MapFunction<Tuple2<TreeMap<K, Integer>,
+ TreeMap<K, Integer>>, TreeMap<K, Integer>> {
+
+ @Override
+ public TreeMap<K, Integer> map(Tuple2<TreeMap<K, Integer>, TreeMap<K, Integer>> tuple2) throws Exception {
+ return tuple2.f1;
+ }
+ }
+
+ /**
+ * Merges the second map into the first: counts of keys present in both maps are added,
+ * keys present only in the second map are copied. The first map is modified in place and
+ * returned.
+ */
+ @SuppressWarnings("serial")
+ private static final class GatherHigherIdNeighbors<K> implements
+ ReduceNeighborsFunction<TreeMap<K,Integer>> {
+
+ @Override
+ public TreeMap<K, Integer> reduceNeighbors(TreeMap<K,Integer> first, TreeMap<K,Integer> second) {
+ for (K key : second.keySet()) {
+ Integer value = first.get(key);
+ if (value != null) {
+ // key known on both sides: accumulate the counts
+ first.put(key, value + second.get(key));
+ } else {
+ first.put(key, second.get(key));
+ }
+ }
+ return first;
+ }
+ }
+
+ /**
+ * Returns the count stored in the target vertex's map under the source vertex's id, or 0
+ * when the target's map has no entry for that id.
+ */
+ @SuppressWarnings("serial")
+ private static final class ComputeTriangles<K> implements MapFunction<Triplet<K, TreeMap<K, Integer>, NullValue>,
+ Integer> {
+
+ @Override
+ public Integer map(Triplet<K, TreeMap<K, Integer>, NullValue> triplet) throws Exception {
+
+ Vertex<K, TreeMap<K, Integer>> srcVertex = triplet.getSrcVertex();
+ Vertex<K, TreeMap<K, Integer>> trgVertex = triplet.getTrgVertex();
+ int triangles = 0;
+
+ if(trgVertex.getValue().get(srcVertex.getId()) != null) {
+ triangles = trgVertex.getValue().get(srcVertex.getId());
+ }
+ return triangles;
+ }
+ }
+ }
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
new file mode 100644
index 0000000..82dfee7
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.spargel.MessageIterator;
+import org.apache.flink.graph.spargel.MessagingFunction;
+import org.apache.flink.graph.spargel.VertexUpdateFunction;
+import org.apache.flink.graph.utils.NullValueEdgeMapper;
+import org.apache.flink.types.NullValue;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/**
+ * An implementation of the label propagation algorithm. The iterative algorithm
+ * detects communities by propagating labels. In each iteration, a vertex adopts
+ * the label that is most frequent among its neighbors' labels. Labels are
+ * represented by Longs and we assume a total ordering among them, in order to
+ * break ties. The algorithm converges when no vertex changes its value or the
+ * maximum number of iterations have been reached. Note that different
+ * initializations might lead to different results.
+ *
+ */
+ @SuppressWarnings("serial")
+ public class LabelPropagation<K extends Comparable<K>, EV> implements GraphAlgorithm<K, Long, EV,
+         DataSet<Vertex<K, Long>>> {
+
+     private final int maxIterations;
+
+     /**
+      * @param maxIterations the maximum number of iterations to run
+      */
+     public LabelPropagation(int maxIterations) {
+         this.maxIterations = maxIterations;
+     }
+
+     /**
+      * Replaces the edge values through {@code NullValueEdgeMapper} and runs the vertex-centric
+      * iteration.
+      *
+      * @param input the input graph; vertex values are the initial labels
+      * @return the vertices of the graph after the iteration
+      */
+     @Override
+     public DataSet<Vertex<K, Long>> run(Graph<K, Long, EV> input) {
+
+         // iteratively adopt the most frequent label among the neighbors
+         // of each vertex
+         return input.mapEdges(new NullValueEdgeMapper<K, EV>())
+                 .runVertexCentricIteration(new UpdateVertexLabel<K>(), new SendNewLabelToNeighbors<K>(),
+                         maxIterations)
+                 .getVertices();
+     }
+
+     /**
+      * Function that updates the value of a vertex by adopting the most frequent
+      * label among its in-neighbors
+      */
+     public static final class UpdateVertexLabel<K> extends VertexUpdateFunction<K, Long, Long> {
+
+         @Override
+         public void updateVertex(Vertex<K, Long> vertex,
+                 MessageIterator<Long> inMessages) {
+             Map<Long, Long> labelsWithFrequencies = new HashMap<Long, Long>();
+
+             // the vertex's own label is the starting candidate and competes as if seen once
+             long maxFrequency = 1;
+             long mostFrequentLabel = vertex.getValue();
+
+             // store the labels with their frequencies (one lookup and one write per message)
+             for (Long msg : inMessages) {
+                 Long seenSoFar = labelsWithFrequencies.get(msg);
+                 labelsWithFrequencies.put(msg, seenSoFar == null ? 1L : seenSoFar + 1);
+             }
+
+             // select the most frequent label: if two or more labels have the
+             // same frequency, the node adopts the label with the highest value
+             for (Entry<Long, Long> entry : labelsWithFrequencies.entrySet()) {
+                 long label = entry.getKey();
+                 long frequency = entry.getValue();
+
+                 if (frequency == maxFrequency) {
+                     // check the label value to break ties
+                     if (label > mostFrequentLabel) {
+                         mostFrequentLabel = label;
+                     }
+                 } else if (frequency > maxFrequency) {
+                     maxFrequency = frequency;
+                     mostFrequentLabel = label;
+                 }
+             }
+
+             // set the new vertex value
+             setNewVertexValue(mostFrequentLabel);
+         }
+     }
+
+     /**
+      * Sends the vertex label to all out-neighbors
+      */
+     public static final class SendNewLabelToNeighbors<K> extends MessagingFunction<K, Long, Long, NullValue> {
+
+         @Override
+         public void sendMessages(Vertex<K, Long> vertex) {
+             sendMessageToAllNeighbors(vertex.getValue());
+         }
+     }
+ }
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
new file mode 100644
index 0000000..8193dba
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.spargel.MessageIterator;
+import org.apache.flink.graph.spargel.MessagingFunction;
+import org.apache.flink.graph.spargel.VertexUpdateFunction;
+
+/**
+ * This is an implementation of a simple PageRank algorithm, using a vertex-centric iteration.
+ * The user can define the damping factor and the maximum number of iterations.
+ * If the number of vertices of the input graph is known, it should be provided as a parameter
+ * to speed up computation. Otherwise, the algorithm will first execute a job to count the vertices.
+ *
+ * The implementation assumes that each page has at least one incoming and one outgoing link.
+ */
+public class PageRank<K> implements GraphAlgorithm<K, Double, Double, DataSet<Vertex<K, Double>>> {
+
+ private double beta;
+ private int maxIterations;
+ private long numberOfVertices;
+
+ /**
+ * @param beta the damping factor
+ * @param maxIterations the maximum number of iterations
+ */
+ public PageRank(double beta, int maxIterations) {
+ this.beta = beta;
+ this.maxIterations = maxIterations;
+ this.numberOfVertices = 0;
+ }
+
+ /**
+ * @param beta the damping factor
+ * @param maxIterations the maximum number of iterations
+ * @param numVertices the number of vertices in the input
+ */
+ public PageRank(double beta, long numVertices, int maxIterations) {
+ this.beta = beta;
+ this.maxIterations = maxIterations;
+ this.numberOfVertices = numVertices;
+ }
+
+ @Override
+ public DataSet<Vertex<K, Double>> run(Graph<K, Double, Double> network) throws Exception {
+
+ if (numberOfVertices == 0) {
+ numberOfVertices = network.numberOfVertices();
+ }
+
+ DataSet<Tuple2<K, Long>> vertexOutDegrees = network.outDegrees();
+
+ Graph<K, Double, Double> networkWithWeights = network
+ .joinWithEdgesOnSource(vertexOutDegrees, new InitWeightsMapper());
+
+ return networkWithWeights.runVertexCentricIteration(new VertexRankUpdater<K>(beta, numberOfVertices),
+ new RankMessenger<K>(numberOfVertices), maxIterations)
+ .getVertices();
+ }
+
+ /**
+ * Function that updates the rank of a vertex by summing up the partial
+ * ranks from all incoming messages and then applying the dampening formula.
+ */
+ @SuppressWarnings("serial")
+ public static final class VertexRankUpdater<K> extends VertexUpdateFunction<K, Double, Double> {
+
+ private final double beta;
+ private final long numVertices;
+
+ public VertexRankUpdater(double beta, long numberOfVertices) {
+ this.beta = beta;
+ this.numVertices = numberOfVertices;
+ }
+
+ @Override
+ public void updateVertex(Vertex<K, Double> vertex, MessageIterator<Double> inMessages) {
+ double rankSum = 0.0;
+ for (double msg : inMessages) {
+ rankSum += msg;
+ }
+
+ // apply the dampening factor / random jump
+ double newRank = (beta * rankSum) + (1 - beta) / numVertices;
+ setNewVertexValue(newRank);
+ }
+ }
+
+ /**
+ * Distributes the rank of a vertex among all target vertices according to
+ * the transition probability, which is associated with an edge as the edge
+ * value.
+ */
+ @SuppressWarnings("serial")
+ public static final class RankMessenger<K> extends MessagingFunction<K, Double, Double, Double> {
+
+ private final long numVertices;
+
+ public RankMessenger(long numberOfVertices) {
+ this.numVertices = numberOfVertices;
+ }
+
+ @Override
+ public void sendMessages(Vertex<K, Double> vertex) {
+ if (getSuperstepNumber() == 1) {
+ // initialize vertex ranks
+ vertex.setValue(new Double(1.0 / numVertices));
+ }
+
+ for (Edge<K, Double> edge : getEdges()) {
+ sendMessageTo(edge.getTarget(), vertex.getValue() * edge.getValue());
+ }
+ }
+ }
+
+ @SuppressWarnings("serial")
+ private static final class InitWeightsMapper implements MapFunction<Tuple2<Double, Long>, Double> {
+ public Double map(Tuple2<Double, Long> value) {
+ return value.f0 / value.f1;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
new file mode 100644
index 0000000..60c4c17
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.spargel.MessageIterator;
+import org.apache.flink.graph.spargel.MessagingFunction;
+import org.apache.flink.graph.spargel.VertexUpdateFunction;
+
+/**
+ * This is an implementation of the Single-Source-Shortest Paths algorithm, using a vertex-centric iteration.
+ */
+@SuppressWarnings("serial")
+public class SingleSourceShortestPaths<K> implements GraphAlgorithm<K, Double, Double, DataSet<Vertex<K, Double>>> {
+
+ private final K srcVertexId;
+ private final Integer maxIterations;
+
+ public SingleSourceShortestPaths(K srcVertexId, Integer maxIterations) {
+ this.srcVertexId = srcVertexId;
+ this.maxIterations = maxIterations;
+ }
+
+ @Override
+ public DataSet<Vertex<K, Double>> run(Graph<K, Double, Double> input) {
+
+ return input.mapVertices(new InitVerticesMapper<K>(srcVertexId))
+ .runVertexCentricIteration(new VertexDistanceUpdater<K>(), new MinDistanceMessenger<K>(),
+ maxIterations).getVertices();
+ }
+
+ public static final class InitVerticesMapper<K> implements MapFunction<Vertex<K, Double>, Double> {
+
+ private K srcVertexId;
+
+ public InitVerticesMapper(K srcId) {
+ this.srcVertexId = srcId;
+ }
+
+ public Double map(Vertex<K, Double> value) {
+ if (value.f0.equals(srcVertexId)) {
+ return 0.0;
+ } else {
+ return Double.MAX_VALUE;
+ }
+ }
+ }
+
+ /**
+ * Function that updates the value of a vertex by picking the minimum
+ * distance from all incoming messages.
+ *
+ * @param <K>
+ */
+ public static final class VertexDistanceUpdater<K> extends VertexUpdateFunction<K, Double, Double> {
+
+ @Override
+ public void updateVertex(Vertex<K, Double> vertex,
+ MessageIterator<Double> inMessages) {
+
+ Double minDistance = Double.MAX_VALUE;
+
+ for (double msg : inMessages) {
+ if (msg < minDistance) {
+ minDistance = msg;
+ }
+ }
+
+ if (vertex.getValue() > minDistance) {
+ setNewVertexValue(minDistance);
+ }
+ }
+ }
+
+ /**
+ * Distributes the minimum distance associated with a given vertex among all
+ * the target vertices summed up with the edge's value.
+ *
+ * @param <K>
+ */
+ public static final class MinDistanceMessenger<K> extends MessagingFunction<K, Double, Double, Double> {
+
+ @Override
+ public void sendMessages(Vertex<K, Double> vertex)
+ throws Exception {
+ for (Edge<K, Double> edge : getEdges()) {
+ sendMessageTo(edge.getTarget(), vertex.getValue() + edge.getValue());
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessageIterator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessageIterator.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessageIterator.java
new file mode 100644
index 0000000..d6fdc8a
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessageIterator.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.spargel;
+
+import java.util.Iterator;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+
+/**
+ * An iterator that returns messages. The iterator is {@link java.lang.Iterable} at the same time to support
+ * the <i>foreach</i> syntax.
+ */
+public final class MessageIterator<Message> implements Iterator<Message>, Iterable<Message>, java.io.Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private transient Iterator<Tuple2<?, Message>> source;
+
+
+ final void setSource(Iterator<Tuple2<?, Message>> source) {
+ this.source = source;
+ }
+
+ @Override
+ public final boolean hasNext() {
+ return this.source.hasNext();
+ }
+
+ @Override
+ public final Message next() {
+ return this.source.next().f1;
+ }
+
+ @Override
+ public final void remove() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Iterator<Message> iterator() {
+ return this;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java
new file mode 100644
index 0000000..4245c24
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.spargel;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Iterator;
+
+import org.apache.flink.api.common.aggregators.Aggregator;
+import org.apache.flink.api.common.functions.IterationRuntimeContext;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.EdgeDirection;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.types.Value;
+import org.apache.flink.util.Collector;
+
+/**
+ * The base class for functions that produce messages between vertices as a part of a {@link VertexCentricIteration}.
+ *
+ * @param <K> The type of the vertex key (the vertex identifier).
+ * @param <VV> The type of the vertex value (the state of the vertex).
+ * @param <Message> The type of the message sent between vertices along the edges.
+ * @param <EV> The type of the values that are associated with the edges.
+ */
+public abstract class MessagingFunction<K, VV, Message, EV> implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ // --------------------------------------------------------------------------------------------
+ // Attributes that allow vertices to access their in/out degrees and the total number of vertices
+ // inside an iteration.
+ // --------------------------------------------------------------------------------------------
+
+ private long numberOfVertices = -1L;
+
+ /**
+ * Retrieves the number of vertices in the graph.
+ * @return the number of vertices if the {@link IterationConfiguration#setOptNumVertices(boolean)}
+ * option has been set; -1 otherwise.
+ */
+ public long getNumberOfVertices() {
+ return numberOfVertices;
+ }
+
+ void setNumberOfVertices(long numberOfVertices) {
+ this.numberOfVertices = numberOfVertices;
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Attribute that allows the user to choose the neighborhood type(in/out/all) on which to run
+ // the vertex centric iteration.
+ // --------------------------------------------------------------------------------------------
+
+ private EdgeDirection direction;
+
+ /**
+ * Retrieves the edge direction in which messages are propagated in the vertex-centric iteration.
+ * @return the messaging {@link EdgeDirection}
+ */
+ public EdgeDirection getDirection() {
+ return direction;
+ }
+
+ void setDirection(EdgeDirection direction) {
+ this.direction = direction;
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Public API Methods
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * This method is invoked once per superstep for each vertex that was changed in that superstep.
+ * It needs to produce the messages that will be received by vertices in the next superstep.
+ *
+ * @param vertex The vertex that was changed.
+ *
+ * @throws Exception The computation may throw exceptions, which causes the superstep to fail.
+ */
+ public abstract void sendMessages(Vertex<K, VV> vertex) throws Exception;
+
+ /**
+ * This method is executed one per superstep before the vertex update function is invoked for each vertex.
+ *
+ * @throws Exception Exceptions in the pre-superstep phase cause the superstep to fail.
+ */
+ public void preSuperstep() throws Exception {}
+
+ /**
+ * This method is executed one per superstep after the vertex update function has been invoked for each vertex.
+ *
+ * @throws Exception Exceptions in the post-superstep phase cause the superstep to fail.
+ */
+ public void postSuperstep() throws Exception {}
+
+
+ /**
+ * Gets an {@link java.lang.Iterable} with all edges. This method is mutually exclusive with
+ * {@link #sendMessageToAllNeighbors(Object)} and may be called only once.
+ *
+ * @return An iterator with all outgoing edges.
+ */
+ @SuppressWarnings("unchecked")
+ public Iterable<Edge<K, EV>> getEdges() {
+ if (edgesUsed) {
+ throw new IllegalStateException("Can use either 'getEdges()' or 'sendMessageToAllTargets()' exactly once.");
+ }
+ edgesUsed = true;
+ this.edgeIterator.set((Iterator<Edge<K, EV>>) edges);
+ return this.edgeIterator;
+ }
+
+ /**
+ * Sends the given message to all vertices that are targets of an outgoing edge of the changed vertex.
+ * This method is mutually exclusive to the method {@link #getEdges()} and may be called only once.
+ *
+ * @param m The message to send.
+ */
+ public void sendMessageToAllNeighbors(Message m) {
+ if (edgesUsed) {
+ throw new IllegalStateException("Can use either 'getEdges()' or 'sendMessageToAllTargets()' exactly once.");
+ }
+
+ edgesUsed = true;
+
+ outValue.f1 = m;
+
+ while (edges.hasNext()) {
+ Tuple next = (Tuple) edges.next();
+ K k = next.getField(1);
+ outValue.f0 = k;
+ out.collect(outValue);
+ }
+ }
+
+ /**
+ * Sends the given message to the vertex identified by the given key. If the target vertex does not exist,
+ * the next superstep will cause an exception due to a non-deliverable message.
+ *
+ * @param target The key (id) of the target vertex to message.
+ * @param m The message.
+ */
+ public void sendMessageTo(K target, Message m) {
+ outValue.f0 = target;
+ outValue.f1 = m;
+ out.collect(outValue);
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Gets the number of the superstep, starting at <tt>1</tt>.
+ *
+ * @return The number of the current superstep.
+ */
+ public int getSuperstepNumber() {
+ return this.runtimeContext.getSuperstepNumber();
+ }
+
+ /**
+ * Gets the iteration aggregator registered under the given name. The iteration aggregator combines
+ * all aggregates globally once per superstep and makes them available in the next superstep.
+ *
+ * @param name The name of the aggregator.
+ * @return The aggregator registered under this name, or null, if no aggregator was registered.
+ */
+ public <T extends Aggregator<?>> T getIterationAggregator(String name) {
+ return this.runtimeContext.<T>getIterationAggregator(name);
+ }
+
+ /**
+ * Get the aggregated value that an aggregator computed in the previous iteration.
+ *
+ * @param name The name of the aggregator.
+ * @return The aggregated value of the previous iteration.
+ */
+ public <T extends Value> T getPreviousIterationAggregate(String name) {
+ return this.runtimeContext.<T>getPreviousIterationAggregate(name);
+ }
+
+ /**
+ * Gets the broadcast data set registered under the given name. Broadcast data sets
+ * are available on all parallel instances of a function. They can be registered via
+ * {@link org.apache.flink.graph.spargel.VertexCentricConfiguration#addBroadcastSetForMessagingFunction(String, org.apache.flink.api.java.DataSet)}.
+ *
+ * @param name The name under which the broadcast set is registered.
+ * @return The broadcast data set.
+ */
+ public <T> Collection<T> getBroadcastSet(String name) {
+ return this.runtimeContext.<T>getBroadcastVariable(name);
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // internal methods and state
+ // --------------------------------------------------------------------------------------------
+
+ private Tuple2<K, Message> outValue;
+
+ private IterationRuntimeContext runtimeContext;
+
+ private Iterator<?> edges;
+
+ private Collector<Tuple2<K, Message>> out;
+
+ private EdgesIterator<K, EV> edgeIterator;
+
+ private boolean edgesUsed;
+
+ private long inDegree = -1;
+
+ private long outDegree = -1;
+
+ void init(IterationRuntimeContext context) {
+ this.runtimeContext = context;
+ this.outValue = new Tuple2<K, Message>();
+ this.edgeIterator = new EdgesIterator<K, EV>();
+ }
+
+ void set(Iterator<?> edges, Collector<Tuple2<K, Message>> out) {
+ this.edges = edges;
+ this.out = out;
+ this.edgesUsed = false;
+ }
+
+ private static final class EdgesIterator<K, EV>
+ implements Iterator<Edge<K, EV>>, Iterable<Edge<K, EV>>
+ {
+ private Iterator<Edge<K, EV>> input;
+
+ private Edge<K, EV> edge = new Edge<K, EV>();
+
+ void set(Iterator<Edge<K, EV>> input) {
+ this.input = input;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return input.hasNext();
+ }
+
+ @Override
+ public Edge<K, EV> next() {
+ Edge<K, EV> next = input.next();
+ edge.setSource(next.f0);
+ edge.setTarget(next.f1);
+ edge.setValue(next.f2);
+ return edge;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ @Override
+ public Iterator<Edge<K, EV>> iterator() {
+ return this;
+ }
+ }
+
+ /**
+ * Retrieves the vertex in-degree (number of in-coming edges).
+ * @return The in-degree of this vertex if the {@link IterationConfiguration#setOptDegrees(boolean)}
+ * option has been set; -1 otherwise.
+ */
+ public long getInDegree() {
+ return inDegree;
+ }
+
+ void setInDegree(long inDegree) {
+ this.inDegree = inDegree;
+ }
+
+ /**
+ * Retrieve the vertex out-degree (number of out-going edges).
+ * @return The out-degree of this vertex if the {@link IterationConfiguration#setOptDegrees(boolean)}
+ * option has been set; -1 otherwise.
+ */
+ public long getOutDegree() {
+ return outDegree;
+ }
+
+ void setOutDegree(long outDegree) {
+ this.outDegree = outDegree;
+ }
+}