You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by va...@apache.org on 2015/10/09 18:05:55 UTC
[17/24] flink git commit: [FLINK-2833] [gelly] create a
flink-libraries module and move gelly there
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricConfiguration.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricConfiguration.java
new file mode 100644
index 0000000..afd4ffd
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricConfiguration.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.spargel;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.EdgeDirection;
+import org.apache.flink.graph.IterationConfiguration;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A VertexCentricConfiguration object can be used to set the iteration name and
+ * degree of parallelism, to register aggregators and to use broadcast sets in
+ * the {@link org.apache.flink.graph.spargel.VertexUpdateFunction} and {@link org.apache.flink.graph.spargel.MessagingFunction}.
+ * <p>
+ * The VertexCentricConfiguration object is passed as an argument to
+ * {@link org.apache.flink.graph.Graph#runVertexCentricIteration(
+ * org.apache.flink.graph.spargel.VertexUpdateFunction, org.apache.flink.graph.spargel.MessagingFunction, int,
+ * VertexCentricConfiguration)}.
+ */
+public class VertexCentricConfiguration extends IterationConfiguration {
+
+    /** the broadcast variables for the update function, stored as (name, data set) pairs **/
+    private final List<Tuple2<String, DataSet<?>>> bcVarsUpdate = new ArrayList<Tuple2<String, DataSet<?>>>();
+
+    /** the broadcast variables for the messaging function, stored as (name, data set) pairs **/
+    private final List<Tuple2<String, DataSet<?>>> bcVarsMessaging = new ArrayList<Tuple2<String, DataSet<?>>>();
+
+    /** flag that defines whether the degrees option is set **/
+    private boolean optDegrees = false;
+
+    /** the direction in which the messages should be sent **/
+    private EdgeDirection direction = EdgeDirection.OUT;
+
+    public VertexCentricConfiguration() {}
+
+    /**
+     * Adds a data set as a broadcast set to the messaging function.
+     *
+     * @param name The name under which the broadcast data is available in the messaging function.
+     * @param data The data set to be broadcasted.
+     */
+    public void addBroadcastSetForMessagingFunction(String name, DataSet<?> data) {
+        this.bcVarsMessaging.add(new Tuple2<String, DataSet<?>>(name, data));
+    }
+
+    /**
+     * Adds a data set as a broadcast set to the vertex update function.
+     *
+     * @param name The name under which the broadcast data is available in the vertex update function.
+     * @param data The data set to be broadcasted.
+     */
+    public void addBroadcastSetForUpdateFunction(String name, DataSet<?> data) {
+        this.bcVarsUpdate.add(new Tuple2<String, DataSet<?>>(name, data));
+    }
+
+    /**
+     * Get the broadcast variables of the VertexUpdateFunction.
+     *
+     * @return the internal (not copied) List of Tuple2, where f0 is the broadcast variable name and f1 the broadcast DataSet.
+     */
+    public List<Tuple2<String, DataSet<?>>> getUpdateBcastVars() {
+        return this.bcVarsUpdate;
+    }
+
+    /**
+     * Get the broadcast variables of the MessagingFunction.
+     *
+     * @return the internal (not copied) List of Tuple2, where f0 is the broadcast variable name and f1 the broadcast DataSet.
+     */
+    public List<Tuple2<String, DataSet<?>>> getMessagingBcastVars() {
+        return this.bcVarsMessaging;
+    }
+
+    /**
+     * Gets whether the degrees option is set. By default, the degrees option is not set.
+     *
+     * @return True, if the degree option is set, false otherwise.
+     */
+    public boolean isOptDegrees() {
+        return optDegrees;
+    }
+
+    /**
+     * Sets the degree option.
+     * By default, the degrees option is not set.
+     *
+     * @param optDegrees True, to set this option, false otherwise.
+     */
+    public void setOptDegrees(boolean optDegrees) {
+        this.optDegrees = optDegrees;
+    }
+
+    /**
+     * Gets the direction in which messages are sent in the MessagingFunction.
+     * By default the messaging direction is OUT.
+     *
+     * @return an EdgeDirection, which can be either IN, OUT or ALL.
+     */
+    public EdgeDirection getDirection() {
+        return direction;
+    }
+
+    /**
+     * Sets the direction in which messages are sent in the MessagingFunction.
+     * By default the messaging direction is OUT.
+     *
+     * @param direction - IN, OUT or ALL
+     * @throws IllegalArgumentException if the direction is null
+     */
+    public void setDirection(EdgeDirection direction) {
+        if (direction == null) {
+            throw new IllegalArgumentException("The messaging direction must not be null.");
+        }
+        this.direction = direction;
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java
new file mode 100644
index 0000000..b3a470e
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java
@@ -0,0 +1,686 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.spargel;
+
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.flink.api.common.aggregators.Aggregator;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.operators.DeltaIteration;
+import org.apache.flink.api.common.functions.RichCoGroupFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.operators.CoGroupOperator;
+import org.apache.flink.api.java.operators.CustomUnaryOperation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.EdgeDirection;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.util.Collector;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * This class represents iterative graph computations, programmed in a vertex-centric perspective.
+ * It is a special case of <i>Bulk Synchronous Parallel</i> computation. The paradigm has also been
+ * implemented by Google's <i>Pregel</i> system and by <i>Apache Giraph</i>.
+ * <p>
+ * Vertex centric algorithms operate on graphs, which are defined through vertices and edges. The
+ * algorithms send messages along the edges and update the state of vertices based on
+ * the old state and the incoming messages. All vertices have an initial state.
+ * The computation terminates once no vertex updates its state any more.
+ * Additionally, a maximum number of iterations (supersteps) may be specified.
+ * <p>
+ * The computation is here represented by two functions:
+ * <ul>
+ * <li>The {@link VertexUpdateFunction} receives incoming messages and may update the state for
+ * the vertex. If a state is updated, messages are sent from this vertex. Initially, all vertices are
+ * considered updated.</li>
+ * <li>The {@link MessagingFunction} takes the new vertex state and sends messages along the outgoing
+ * edges of the vertex. The outgoing edges may optionally have an associated value, such as a weight.</li>
+ * </ul>
+ * <p>
+ *
+ * Vertex-centric graph iterations are run by calling
+ * {@link Graph#runVertexCentricIteration(VertexUpdateFunction, MessagingFunction, int)}.
+ *
+ * @param <K> The type of the vertex key (the vertex identifier).
+ * @param <VV> The type of the vertex value (the state of the vertex).
+ * @param <Message> The type of the message sent between vertices along the edges.
+ * @param <EV> The type of the values that are associated with the edges.
+ */
+public class VertexCentricIteration<K, VV, Message, EV>
+ implements CustomUnaryOperation<Vertex<K, VV>, Vertex<K, VV>>
+{
+ private final VertexUpdateFunction<K, VV, Message> updateFunction; // user function that updates the vertex state
+ // user function that produces the messages; the message type is derived from its class
+ private final MessagingFunction<K, VV, Message, EV> messagingFunction;
+ // the edges of the graph the iteration runs on
+ private final DataSet<Edge<K, EV>> edgesWithValue;
+ // upper bound on the number of iterations; the constructor requires it to be at least one
+ private final int maximumNumberOfIterations;
+ // type information of the messages, extracted from the messaging function in the constructor
+ private final TypeInformation<Message> messageType;
+ // the vertices with their initial state; null until setInput is called
+ private DataSet<Vertex<K, VV>> initialVertices;
+ // optional settings; stays null unless configure is called
+ private VertexCentricConfiguration configuration;
+
+ // ----------------------------------------------------------------------------------
+
+ /**
+  * Validates the arguments and derives the message type from the class of the messaging function.
+  */
+ private VertexCentricIteration(VertexUpdateFunction<K, VV, Message> uf,
+         MessagingFunction<K, VV, Message, EV> mf, DataSet<Edge<K, EV>> edgesWithValue,
+         int maximumNumberOfIterations) {
+     this.updateFunction = Preconditions.checkNotNull(uf);
+     this.messagingFunction = Preconditions.checkNotNull(mf);
+     this.edgesWithValue = Preconditions.checkNotNull(edgesWithValue);
+
+     Preconditions.checkArgument(maximumNumberOfIterations > 0,
+             "The maximum number of iterations must be at least one.");
+     this.maximumNumberOfIterations = maximumNumberOfIterations;
+
+     this.messageType = getMessageType(this.messagingFunction);
+ }
+
+ private TypeInformation<Message> getMessageType(MessagingFunction<K, VV, Message, EV> mf) {
+ return TypeExtractor.createTypeInfo(MessagingFunction.class, mf.getClass(), 2, null, null); // 2: presumably the 0-based position of Message among MessagingFunction's type parameters - TODO confirm
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Custom Operator behavior
+ // --------------------------------------------------------------------------------------------
+
+ /**
+  * Sets the input data set for this operator, which here is the set of vertices with their
+  * initial state. It has to be set before {@link #createResult()} is called.
+  *
+  * @param inputData The input data set, which in the case of this operator represents the set of
+  *                  vertices with their initial state.
+  *
+  * @see org.apache.flink.api.java.operators.CustomUnaryOperation#setInput(org.apache.flink.api.java.DataSet)
+  */
+ @Override
+ public void setInput(DataSet<Vertex<K, VV>> inputData) {
+     initialVertices = inputData;
+ }
+
+ /**
+  * Creates the operator that represents this vertex-centric graph computation.
+  *
+  * @return The operator that represents this vertex-centric graph computation.
+  * @throws IllegalStateException if the input data set has not been set
+  * @throws RuntimeException if the numVertices option is set and the vertices cannot be counted
+  */
+ @Override
+ public DataSet<Vertex<K, VV>> createResult() {
+     if (this.initialVertices == null) {
+         throw new IllegalStateException("The input data set has not been set.");
+     }
+
+     // prepare some type information
+     TypeInformation<K> keyType = ((TupleTypeInfo<?>) initialVertices.getType()).getTypeAt(0);
+     TypeInformation<Tuple2<K, Message>> messageTypeInfo = new TupleTypeInfo<Tuple2<K,Message>>(keyType, messageType);
+
+     // create a graph
+     Graph<K, VV, EV> graph = Graph.fromDataSet(initialVertices, edgesWithValue, initialVertices.getExecutionEnvironment());
+
+     // check whether the numVertices option is set and, if so, compute the total number of vertices
+     // and set it within the messaging and update functions
+     if (this.configuration != null && this.configuration.isOptNumVertices()) {
+         try {
+             long numberOfVertices = graph.numberOfVertices();
+             messagingFunction.setNumberOfVertices(numberOfVertices);
+             updateFunction.setNumberOfVertices(numberOfVertices);
+         } catch (Exception e) {
+             // do not swallow the failure: the functions would otherwise run without the vertex count
+             throw new RuntimeException("Could not compute the number of vertices of the graph.", e);
+         }
+     }
+
+     if (this.configuration != null) {
+         messagingFunction.setDirection(this.configuration.getDirection());
+     } else {
+         messagingFunction.setDirection(EdgeDirection.OUT);
+     }
+
+     // retrieve the direction in which the updates are made and in which the messages are sent
+     EdgeDirection messagingDirection = messagingFunction.getDirection();
+
+     // if the degrees option is set, the in and the out degrees are computed and added to the vertex value
+     if (this.configuration != null && this.configuration.isOptDegrees()) {
+         return createResultVerticesWithDegrees(graph, messagingDirection, messageTypeInfo);
+     } else {
+         return createResultSimpleVertex(messagingDirection, messageTypeInfo);
+     }
+ }
+
+ /**
+  * Creates a new vertex-centric iteration operator for graphs where the edges are associated with a value (such as
+  * a weight or distance).
+  *
+  * @param edgesWithValue The data set containing edges.
+  * @param uf The function that updates the state of the vertices from the incoming messages.
+  * @param mf The function that turns changed vertex states into messages along the edges.
+  * @param maximumNumberOfIterations The maximum number of iterations; must be at least one.
+  *
+  * @param <K> The type of the vertex key (the vertex identifier).
+  * @param <VV> The type of the vertex value (the state of the vertex).
+  * @param <Message> The type of the message sent between vertices along the edges.
+  * @param <EV> The type of the values that are associated with the edges.
+  *
+  * @return An instance of the vertex-centric graph computation operator.
+  */
+ public static final <K, VV, Message, EV> VertexCentricIteration<K, VV, Message, EV> withEdges(
+         DataSet<Edge<K, EV>> edgesWithValue,
+         VertexUpdateFunction<K, VV, Message> uf,
+         MessagingFunction<K, VV, Message, EV> mf,
+         int maximumNumberOfIterations) {
+     return new VertexCentricIteration<K, VV, Message, EV>(
+             uf, mf, edgesWithValue, maximumNumberOfIterations);
+ }
+
+ /**
+  * Sets the configuration of this vertex-centric iteration; it replaces any earlier one.
+  *
+  * @param parameters the configuration parameters
+  */
+ public void configure(VertexCentricConfiguration parameters) {
+     configuration = parameters;
+ }
+
+ /**
+  * @return the configuration set via {@link #configure(VertexCentricConfiguration)}, or null if none was set
+  */
+ public VertexCentricConfiguration getIterationConfiguration() {
+     return configuration;
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Wrapping UDFs
+ // --------------------------------------------------------------------------------------------
+
+ /*
+  * Base co-group UDF wrapping the user's VertexUpdateFunction; drives its init/preSuperstep/postSuperstep hooks.
+  */
+ private static abstract class VertexUpdateUdf<K, VVWithDegrees, Message>
+         extends RichCoGroupFunction<Tuple2<K, Message>, Vertex<K, VVWithDegrees>, Vertex<K, VVWithDegrees>>
+         implements ResultTypeQueryable<Vertex<K, VVWithDegrees>> {
+     private static final long serialVersionUID = 1L;
+
+     final VertexUpdateFunction<K, VVWithDegrees, Message> vertexUpdateFunction;
+     final MessageIterator<Message> messageIter = new MessageIterator<Message>();
+     private transient TypeInformation<Vertex<K, VVWithDegrees>> resultType;
+
+     private VertexUpdateUdf(VertexUpdateFunction<K, VVWithDegrees, Message> updateFunction,
+             TypeInformation<Vertex<K, VVWithDegrees>> producedType) {
+         this.vertexUpdateFunction = updateFunction;
+         this.resultType = producedType;
+     }
+
+     @Override
+     public void open(Configuration parameters) throws Exception {
+         // init is only called when the superstep number is 1
+         final boolean firstSuperstep = getIterationRuntimeContext().getSuperstepNumber() == 1;
+         if (firstSuperstep) {
+             this.vertexUpdateFunction.init(getIterationRuntimeContext());
+         }
+         this.vertexUpdateFunction.preSuperstep();
+     }
+
+     @Override
+     public void close() throws Exception {
+         this.vertexUpdateFunction.postSuperstep();
+     }
+
+     @Override
+     public TypeInformation<Vertex<K, VVWithDegrees>> getProducedType() {
+         return this.resultType;
+     }
+ }
+
+ /* Update UDF for plain vertex values: passes each vertex and its messages to the update function; fails if messages target a vertex that does not exist. */
+ @SuppressWarnings("serial")
+ private static final class VertexUpdateUdfSimpleVV<K, VV, Message> extends VertexUpdateUdf<K, VV, Message> {
+
+     private VertexUpdateUdfSimpleVV(VertexUpdateFunction<K, VV, Message> vertexUpdateFunction, TypeInformation<Vertex<K, VV>> resultType) {
+         super(vertexUpdateFunction, resultType);
+     }
+
+     @Override
+     public void coGroup(Iterable<Tuple2<K, Message>> messages, Iterable<Vertex<K, VV>> vertex,
+             Collector<Vertex<K, VV>> out) throws Exception {
+         final Iterator<Vertex<K, VV>> vertexIter = vertex.iterator();
+
+         if (vertexIter.hasNext()) {
+             Vertex<K, VV> vertexState = vertexIter.next();
+
+             @SuppressWarnings("unchecked")
+             Iterator<Tuple2<?, Message>> downcastIter = (Iterator<Tuple2<?, Message>>) (Iterator<?>) messages.iterator();
+             messageIter.setSource(downcastIter);
+
+             vertexUpdateFunction.setOutput(vertexState, out);
+             vertexUpdateFunction.updateVertex(vertexState, messageIter);
+         } else {
+             // no vertex for this key; the local is named so that it does not shadow the messageIter field
+             final Iterator<Tuple2<K, Message>> orphanedMessages = messages.iterator();
+             if (!orphanedMessages.hasNext()) {
+                 throw new Exception("Vertex update was invoked with neither a vertex nor messages.");
+             }
+             String message = "Target vertex does not exist!.";
+             try {
+                 message = "Target vertex '" + orphanedMessages.next().f0 + "' does not exist!.";
+             } catch (Exception ignored) {
+                 // best effort only: keep the generic message if the key cannot be read or rendered
+             }
+             throw new Exception(message);
+         }
+     }
+ }
+
+ /* Update UDF for vertex values carrying degrees: also passes the in/out degree to the update function; fails if messages target a vertex that does not exist. */
+ @SuppressWarnings("serial")
+ private static final class VertexUpdateUdfVVWithDegrees<K, VV, Message> extends VertexUpdateUdf<K, Tuple3<VV, Long, Long>, Message> {
+     private VertexUpdateUdfVVWithDegrees(VertexUpdateFunction<K, Tuple3<VV, Long, Long>, Message> vertexUpdateFunction,
+             TypeInformation<Vertex<K, Tuple3<VV, Long, Long>>> resultType) {
+         super(vertexUpdateFunction, resultType);
+     }
+
+     @Override
+     public void coGroup(Iterable<Tuple2<K, Message>> messages, Iterable<Vertex<K, Tuple3<VV, Long, Long>>> vertex,
+             Collector<Vertex<K, Tuple3<VV, Long, Long>>> out) throws Exception {
+         final Iterator<Vertex<K, Tuple3<VV, Long, Long>>> vertexIter = vertex.iterator();
+
+         if (vertexIter.hasNext()) {
+             Vertex<K, Tuple3<VV, Long, Long>> vertexWithDegrees = vertexIter.next();
+
+             @SuppressWarnings("unchecked")
+             Iterator<Tuple2<?, Message>> downcastIter = (Iterator<Tuple2<?, Message>>) (Iterator<?>) messages.iterator();
+             messageIter.setSource(downcastIter);
+
+             // field f1.f1 is passed on as the in-degree, field f1.f2 as the out-degree
+             vertexUpdateFunction.setInDegree(vertexWithDegrees.f1.f1);
+             vertexUpdateFunction.setOutDegree(vertexWithDegrees.f1.f2);
+
+             vertexUpdateFunction.setOutputWithDegrees(vertexWithDegrees, out);
+             vertexUpdateFunction.updateVertexFromVertexCentricIteration(vertexWithDegrees, messageIter);
+         } else {
+             // no vertex for this key; the local is named so that it does not shadow the messageIter field
+             final Iterator<Tuple2<K, Message>> orphanedMessages = messages.iterator();
+             if (!orphanedMessages.hasNext()) {
+                 throw new Exception("Vertex update was invoked with neither a vertex nor messages.");
+             }
+             String message = "Target vertex does not exist!.";
+             try {
+                 message = "Target vertex '" + orphanedMessages.next().f0 + "' does not exist!.";
+             } catch (Exception ignored) {
+                 // best effort only: keep the generic message if the key cannot be read or rendered
+             }
+             throw new Exception(message);
+         }
+     }
+ }
+
+ /*
+  * UDF that encapsulates the message sending function for graphs where the edges have an associated value.
+  * It drives the init, preSuperstep and postSuperstep hooks of the wrapped MessagingFunction.
+  */
+ private static abstract class MessagingUdfWithEdgeValues<K, VVWithDegrees, VV, Message, EV>
+         extends RichCoGroupFunction<Edge<K, EV>, Vertex<K, VVWithDegrees>, Tuple2<K, Message>>
+         implements ResultTypeQueryable<Tuple2<K, Message>> {
+
+     private static final long serialVersionUID = 1L;
+
+     final MessagingFunction<K, VV, Message, EV> messagingFunction;
+
+     private transient TypeInformation<Tuple2<K, Message>> resultType;
+
+     private MessagingUdfWithEdgeValues(MessagingFunction<K, VV, Message, EV> messenger,
+             TypeInformation<Tuple2<K, Message>> producedType) {
+         this.messagingFunction = messenger;
+         this.resultType = producedType;
+     }
+
+     @Override
+     public void open(Configuration parameters) throws Exception {
+         // init is only called when the superstep number is 1
+         final boolean firstSuperstep = getIterationRuntimeContext().getSuperstepNumber() == 1;
+         if (firstSuperstep) {
+             this.messagingFunction.init(getIterationRuntimeContext());
+         }
+         this.messagingFunction.preSuperstep();
+     }
+
+     @Override
+     public void close() throws Exception {
+         this.messagingFunction.postSuperstep();
+     }
+
+     @Override
+     public TypeInformation<Tuple2<K, Message>> getProducedType() {
+         return this.resultType;
+     }
+ }
+
+ /* Messaging UDF for plain vertex values: lets the messaging function send messages for each vertex state it receives. */
+ @SuppressWarnings("serial")
+ private static final class MessagingUdfWithEVsSimpleVV<K, VV, Message, EV>
+         extends MessagingUdfWithEdgeValues<K, VV, VV, Message, EV> {
+
+     private MessagingUdfWithEVsSimpleVV(MessagingFunction<K, VV, Message, EV> messagingFunction,
+             TypeInformation<Tuple2<K, Message>> resultType) {
+         super(messagingFunction, resultType);
+     }
+
+     @Override
+     public void coGroup(Iterable<Edge<K, EV>> edges, Iterable<Vertex<K, VV>> state,
+             Collector<Tuple2<K, Message>> out) throws Exception {
+         final Iterator<Vertex<K, VV>> vertices = state.iterator();
+         if (!vertices.hasNext()) {
+             return; // no vertex state for this key, so nothing is sent
+         }
+         final Vertex<K, VV> current = vertices.next();
+         messagingFunction.set((Iterator<?>) edges.iterator(), out);
+         messagingFunction.sendMessages(current);
+     }
+ }
+
+ /* Messaging UDF for vertex values carrying degrees: passes the degrees to the messaging function and sends with the plain vertex. */
+ @SuppressWarnings("serial")
+ private static final class MessagingUdfWithEVsVVWithDegrees<K, VV, Message, EV>
+         extends MessagingUdfWithEdgeValues<K, Tuple3<VV, Long, Long>, VV, Message, EV> {
+
+     // reused on every call; holds the vertex id and the vertex value without the degrees
+     private Vertex<K, VV> nextVertex = new Vertex<K, VV>();
+
+     private MessagingUdfWithEVsVVWithDegrees(MessagingFunction<K, VV, Message, EV> messagingFunction,
+             TypeInformation<Tuple2<K, Message>> resultType) {
+         super(messagingFunction, resultType);
+     }
+
+     @Override
+     public void coGroup(Iterable<Edge<K, EV>> edges, Iterable<Vertex<K, Tuple3<VV, Long, Long>>> state,
+             Collector<Tuple2<K, Message>> out) throws Exception {
+         final Iterator<Vertex<K, Tuple3<VV, Long, Long>>> vertices = state.iterator();
+         if (!vertices.hasNext()) {
+             return;
+         }
+         final Vertex<K, Tuple3<VV, Long, Long>> withDegrees = vertices.next();
+
+         // copy the id (field 0) and the first component of the value (field 1) into the reused vertex
+         nextVertex.setField(withDegrees.f0, 0);
+         nextVertex.setField(withDegrees.f1.f0, 1);
+         messagingFunction.setInDegree(withDegrees.f1.f1);
+         messagingFunction.setOutDegree(withDegrees.f1.f2);
+
+         messagingFunction.set((Iterator<?>) edges.iterator(), out);
+         messagingFunction.sendMessages(nextVertex);
+     }
+ }
+
+
+ // --------------------------------------------------------------------------------------------
+ // UTIL methods
+ // --------------------------------------------------------------------------------------------
+
+ /**
+  * Method that builds the messaging function using a coGroup operator for a simple vertex (without
+  * degrees).
+  * It afterwards configures the function with a custom name and broadcast variables.
+  *
+  * @param iteration the delta iteration whose workset is co-grouped with the edges
+  * @param messageTypeInfo the type information of the (vertex key, message) tuples
+  * @param whereArg the argument for the where within the coGroup
+  * @param equalToArg the argument for the equalTo within the coGroup
+  * @return the messaging function
+  */
+ private CoGroupOperator<?, ?, Tuple2<K, Message>> buildMessagingFunction(
+         DeltaIteration<Vertex<K, VV>, Vertex<K, VV>> iteration,
+         TypeInformation<Tuple2<K, Message>> messageTypeInfo, int whereArg, int equalToArg) {
+
+     // wrap the user function into the co-group UDF
+     final MessagingUdfWithEdgeValues<K, VV, VV, Message, EV> messenger =
+             new MessagingUdfWithEVsSimpleVV<K, VV, Message, EV>(messagingFunction, messageTypeInfo);
+
+     // build the messaging function (co group) and give it its name
+     CoGroupOperator<?, ?, Tuple2<K, Message>> messageOperator = this.edgesWithValue
+             .coGroup(iteration.getWorkset()).where(whereArg).equalTo(equalToArg).with(messenger);
+     messageOperator = messageOperator.name("Messaging");
+
+     // without a configuration there are no broadcast variables to attach
+     if (this.configuration == null) {
+         return messageOperator;
+     }
+     for (Tuple2<String, DataSet<?>> broadcast : this.configuration.getMessagingBcastVars()) {
+         messageOperator = messageOperator.withBroadcastSet(broadcast.f1, broadcast.f0);
+     }
+     return messageOperator;
+ }
+
+ /**
+  * Method that builds the messaging function using a coGroup operator for a vertex
+  * containing degree information.
+  * It afterwards configures the function with a custom name and broadcast variables.
+  *
+  * @param iteration the delta iteration whose workset is co-grouped with the edges
+  * @param messageTypeInfo the type information of the (vertex key, message) tuples
+  * @param whereArg the argument for the where within the coGroup
+  * @param equalToArg the argument for the equalTo within the coGroup
+  * @return the messaging function
+  */
+ private CoGroupOperator<?, ?, Tuple2<K, Message>> buildMessagingFunctionVerticesWithDegrees(
+         DeltaIteration<Vertex<K, Tuple3<VV, Long, Long>>, Vertex<K, Tuple3<VV, Long, Long>>> iteration,
+         TypeInformation<Tuple2<K, Message>> messageTypeInfo, int whereArg, int equalToArg) {
+
+     // wrap the user function into the co-group UDF for vertices with degrees
+     final MessagingUdfWithEdgeValues<K, Tuple3<VV, Long, Long>, VV, Message, EV> messenger =
+             new MessagingUdfWithEVsVVWithDegrees<K, VV, Message, EV>(messagingFunction, messageTypeInfo);
+
+     // build the messaging function (co group) and give it its name
+     CoGroupOperator<?, ?, Tuple2<K, Message>> messageOperator = this.edgesWithValue
+             .coGroup(iteration.getWorkset()).where(whereArg).equalTo(equalToArg).with(messenger);
+     messageOperator = messageOperator.name("Messaging");
+
+     // without a configuration there are no broadcast variables to attach
+     if (this.configuration == null) {
+         return messageOperator;
+     }
+
+     for (Tuple2<String, DataSet<?>> broadcast : this.configuration.getMessagingBcastVars()) {
+         messageOperator = messageOperator.withBroadcastSet(broadcast.f1, broadcast.f0);
+     }
+     return messageOperator;
+ }
+
+ /**
+  * Helper method which sets up an iteration with the given vertex value (either simple or with degrees).
+  * Name, parallelism, solution set memory mode and aggregators are taken from the configuration, if any.
+  *
+  * @param iteration the delta iteration to set up
+  */
+ private void setUpIteration(DeltaIteration<?, ?> iteration) {
+     final String defaultName = "Vertex-centric iteration (" + updateFunction + " | " + messagingFunction + ")";
+
+     if (this.configuration == null) {
+         // no configuration provided; set default name
+         iteration.name(defaultName);
+         return;
+     }
+
+     // set up the iteration operator
+     iteration.name(this.configuration.getName(defaultName));
+     iteration.parallelism(this.configuration.getParallelism());
+     iteration.setSolutionSetUnManaged(this.configuration.isSolutionSetUnmanagedMemory());
+
+     // register all aggregators
+     for (Map.Entry<String, Aggregator<?>> aggregator : this.configuration.getAggregators().entrySet()) {
+         iteration.registerAggregator(aggregator.getKey(), aggregator.getValue());
+     }
+ }
+
+ /**
+  * Creates the operator that represents this vertex centric graph computation for a simple vertex.
+  *
+  * @param messagingDirection the direction (IN, OUT or ALL) in which the messages are sent
+  * @param messageTypeInfo the type information of the (vertex key, message) tuples
+  * @return the operator
+  */
+ private DataSet<Vertex<K, VV>> createResultSimpleVertex(EdgeDirection messagingDirection,
+         TypeInformation<Tuple2<K, Message>> messageTypeInfo) {
+
+     final TypeInformation<Vertex<K, VV>> vertexTypes = initialVertices.getType();
+
+     // the delta iteration is started from the initial vertices, keyed on field 0
+     final DeltaIteration<Vertex<K, VV>, Vertex<K, VV>> iteration =
+             initialVertices.iterateDelta(initialVertices, this.maximumNumberOfIterations, 0);
+     setUpIteration(iteration);
+
+     // the edges are co-grouped on field 1 for IN and on field 0 for OUT; ALL is the union of both
+     final DataSet<Tuple2<K, Message>> messages;
+     switch (messagingDirection) {
+         case IN:
+             messages = buildMessagingFunction(iteration, messageTypeInfo, 1, 0);
+             break;
+         case OUT:
+             messages = buildMessagingFunction(iteration, messageTypeInfo, 0, 0);
+             break;
+         case ALL:
+             messages = buildMessagingFunction(iteration, messageTypeInfo, 1, 0)
+                     .union(buildMessagingFunction(iteration, messageTypeInfo, 0, 0));
+             break;
+         default:
+             throw new IllegalArgumentException("Illegal edge direction");
+     }
+
+     // build the update function (co group)
+     final VertexUpdateUdf<K, VV, Message> updateUdf =
+             new VertexUpdateUdfSimpleVV<K, VV, Message>(updateFunction, vertexTypes);
+     final CoGroupOperator<?, ?, Vertex<K, VV>> updates =
+             messages.coGroup(iteration.getSolutionSet()).where(0).equalTo(0).with(updateUdf);
+
+     configureUpdateFunction(updates);
+
+     return iteration.closeWith(updates, updates);
+ }
+
+ /**
+  * Creates the operator that represents this vertex centric graph computation for a vertex with in
+  * and out degrees added to the vertex value.
+  *
+  * @param graph the graph from which the in and out degrees are computed
+  * @param messagingDirection the direction (IN, OUT or ALL) in which the messages are sent
+  * @param messageTypeInfo the type information of the (vertex key, message) tuples
+  * @return the operator
+  */
+ @SuppressWarnings("serial")
+ private DataSet<Vertex<K, VV>> createResultVerticesWithDegrees(Graph<K, VV, EV> graph, EdgeDirection messagingDirection,
+         TypeInformation<Tuple2<K, Message>> messageTypeInfo) {
+
+     DataSet<Tuple2<K, Message>> messages;
+
+     this.updateFunction.setOptDegrees(this.configuration.isOptDegrees());
+
+     DataSet<Tuple2<K, Long>> inDegrees = graph.inDegrees();
+     DataSet<Tuple2<K, Long>> outDegrees = graph.outDegrees();
+
+     // the out-degree moves from field f1 of the second input to field f2 of the result, hence "f1->f2"
+     DataSet<Tuple3<K, Long, Long>> degrees = inDegrees.join(outDegrees).where(0).equalTo(0)
+             .with(new FlatJoinFunction<Tuple2<K, Long>, Tuple2<K, Long>, Tuple3<K, Long, Long>>() {
+                 @Override
+                 public void join(Tuple2<K, Long> first, Tuple2<K, Long> second, Collector<Tuple3<K, Long, Long>> out) {
+                     out.collect(new Tuple3<K, Long, Long>(first.f0, first.f1, second.f1));
+                 }
+             }).withForwardedFieldsFirst("f0;f1").withForwardedFieldsSecond("f1->f2");
+
+     // attach the degrees to the vertex value: (value) becomes (value, inDegree, outDegree)
+     DataSet<Vertex<K, Tuple3<VV, Long, Long>>> verticesWithDegrees = initialVertices
+             .join(degrees).where(0).equalTo(0)
+             .with(new FlatJoinFunction<Vertex<K,VV>, Tuple3<K,Long,Long>, Vertex<K, Tuple3<VV, Long, Long>>>() {
+                 @Override
+                 public void join(Vertex<K, VV> vertex, Tuple3<K, Long, Long> degrees,
+                         Collector<Vertex<K, Tuple3<VV, Long, Long>>> out) throws Exception {
+                     out.collect(new Vertex<K, Tuple3<VV, Long, Long>>(vertex.getId(),
+                             new Tuple3<VV, Long, Long>(vertex.getValue(), degrees.f1, degrees.f2)));
+                 }
+             }).withForwardedFieldsFirst("f0");
+
+     // add type info
+     TypeInformation<Vertex<K, Tuple3<VV, Long, Long>>> vertexTypes = verticesWithDegrees.getType();
+
+     final DeltaIteration<Vertex<K, Tuple3<VV, Long, Long>>, Vertex<K, Tuple3<VV, Long, Long>>> iteration =
+             verticesWithDegrees.iterateDelta(verticesWithDegrees, this.maximumNumberOfIterations, 0);
+     setUpIteration(iteration);
+
+     switch (messagingDirection) {
+         case IN:
+             messages = buildMessagingFunctionVerticesWithDegrees(iteration, messageTypeInfo, 1, 0);
+             break;
+         case OUT:
+             messages = buildMessagingFunctionVerticesWithDegrees(iteration, messageTypeInfo, 0, 0);
+             break;
+         case ALL:
+             messages = buildMessagingFunctionVerticesWithDegrees(iteration, messageTypeInfo, 1, 0)
+                     .union(buildMessagingFunctionVerticesWithDegrees(iteration, messageTypeInfo, 0, 0));
+             break;
+         default:
+             throw new IllegalArgumentException("Illegal edge direction");
+     }
+
+     @SuppressWarnings({ "unchecked", "rawtypes" })
+     VertexUpdateUdf<K, Tuple3<VV, Long, Long>, Message> updateUdf =
+             new VertexUpdateUdfVVWithDegrees(updateFunction, vertexTypes);
+
+     // build the update function (co group)
+     CoGroupOperator<?, ?, Vertex<K, Tuple3<VV, Long, Long>>> updates =
+             messages.coGroup(iteration.getSolutionSet()).where(0).equalTo(0).with(updateUdf);
+
+     configureUpdateFunction(updates);
+
+     return iteration.closeWith(updates, updates).map(
+             new MapFunction<Vertex<K, Tuple3<VV, Long, Long>>, Vertex<K, VV>>() {
+                 @Override
+                 public Vertex<K, VV> map(Vertex<K, Tuple3<VV, Long, Long>> vertex) {
+                     return new Vertex<K, VV>(vertex.getId(), vertex.getValue().f0);
+                 }
+             });
+ }
+
+ /* Names the update co-group, attaches the update broadcast variables and declares the forwarded key field. */
+ private <VVWithDegree> void configureUpdateFunction(CoGroupOperator<?, ?, Vertex<K, VVWithDegree>> updates) {
+     // configure coGroup update function with name and broadcast variables
+     CoGroupOperator<?, ?, Vertex<K, VVWithDegree>> configured = updates.name("Vertex State Updates");
+     if (this.configuration != null) {
+         for (Tuple2<String, DataSet<?>> broadcast : this.configuration.getUpdateBcastVars()) {
+             configured = configured.withBroadcastSet(broadcast.f1, broadcast.f0);
+         }
+     }
+
+     // let the operator know that we preserve the key field
+     configured.withForwardedFieldsFirst("0").withForwardedFieldsSecond("0");
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java
new file mode 100644
index 0000000..248925b
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.spargel;
+
+import java.io.Serializable;
+import java.util.Collection;
+
+import org.apache.flink.api.common.aggregators.Aggregator;
+import org.apache.flink.api.common.functions.IterationRuntimeContext;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.types.Value;
+import org.apache.flink.util.Collector;
+
+/**
+ * This class must be extended by functions that compute the state of the vertex depending on the old state and the
+ * incoming messages. The central method is {@link #updateVertex(Vertex, MessageIterator)}, which is
+ * invoked once per vertex per superstep.
+ *
+ * @param <K> The vertex key type.
+ * @param <VV> The vertex value type.
+ * @param <Message> The message type.
+ */
+public abstract class VertexUpdateFunction<K, VV, Message> implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ // --------------------------------------------------------------------------------------------
+ // Attributes that allow vertices to access their in/out degrees and the total number of vertices
+ // inside an iteration.
+ // --------------------------------------------------------------------------------------------
+
+ // -1 until a value is supplied through setNumberOfVertices
+ private long numberOfVertices = -1L;
+
+ /**
+ * Retrieves the number of vertices in the graph.
+ * @return the number of vertices if the
+ * {@link org.apache.flink.graph.IterationConfiguration#setOptNumVertices(boolean)}
+ * option has been set; -1 otherwise.
+ */
+ public long getNumberOfVertices() {
+ return numberOfVertices;
+ }
+
+ // package-private setter for the value returned by getNumberOfVertices
+ void setNumberOfVertices(long numberOfVertices) {
+ this.numberOfVertices = numberOfVertices;
+ }
+
+ //---------------------------------------------------------------------------------------------
+
+ // selects which output (plain or with degrees) setNewVertexValue writes to
+ private boolean optDegrees;
+
+ boolean isOptDegrees() {
+ return optDegrees;
+ }
+
+ void setOptDegrees(boolean optDegrees) {
+ this.optDegrees = optDegrees;
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Public API Methods
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * This method is invoked once per vertex per superstep. It receives the current state of the vertex, as well as
+ * the incoming messages. It may set a new vertex state via {@link #setNewVertexValue(Object)}. If the vertex
+ * state is changed, it will trigger the sending of messages via the {@link MessagingFunction}.
+ *
+ * @param vertex The vertex.
+ * @param inMessages The incoming messages to this vertex.
+ *
+ * @throws Exception The computation may throw exceptions, which causes the superstep to fail.
+ */
+ public abstract void updateVertex(Vertex<K, VV> vertex, MessageIterator<Message> inMessages) throws Exception;
+
+ /**
+ * This method is executed once per superstep before the vertex update function is invoked for each vertex.
+ *
+ * @throws Exception Exceptions in the pre-superstep phase cause the superstep to fail.
+ */
+ public void preSuperstep() throws Exception {}
+
+ /**
+ * This method is executed once per superstep after the vertex update function has been invoked for each vertex.
+ *
+ * @throws Exception Exceptions in the post-superstep phase cause the superstep to fail.
+ */
+ public void postSuperstep() throws Exception {}
+
+ /**
+ * Sets the new value of this vertex. Setting a new value triggers the sending of outgoing messages from this vertex.
+ *
+ * This should be called at most once per updateVertex.
+ *
+ * @param newValue The new vertex value.
+ * @throws IllegalStateException If this method was already called since the output was last set.
+ */
+ public void setNewVertexValue(VV newValue) {
+ // the flag is reset to false by setOutput / setOutputWithDegrees
+ if(setNewVertexValueCalled) {
+ throw new IllegalStateException("setNewVertexValue should only be called at most once per updateVertex");
+ }
+ setNewVertexValueCalled = true;
+ if(isOptDegrees()) {
+ // only the first field of the Tuple3 value is replaced; the other two fields are emitted unchanged
+ outValWithDegrees.f1.f0 = newValue;
+ outWithDegrees.collect(outValWithDegrees);
+ } else {
+ outVal.setValue(newValue);
+ out.collect(outVal);
+ }
+ }
+
+ /**
+ * Gets the number of the superstep, starting at <tt>1</tt>.
+ *
+ * @return The number of the current superstep.
+ */
+ public int getSuperstepNumber() {
+ return this.runtimeContext.getSuperstepNumber();
+ }
+
+ /**
+ * Gets the iteration aggregator registered under the given name. The iteration aggregator combines
+ * all aggregates globally once per superstep and makes them available in the next superstep.
+ *
+ * @param name The name of the aggregator.
+ * @return The aggregator registered under this name, or null, if no aggregator was registered.
+ */
+ public <T extends Aggregator<?>> T getIterationAggregator(String name) {
+ return this.runtimeContext.<T>getIterationAggregator(name);
+ }
+
+ /**
+ * Get the aggregated value that an aggregator computed in the previous iteration.
+ *
+ * @param name The name of the aggregator.
+ * @return The aggregated value of the previous iteration.
+ */
+ public <T extends Value> T getPreviousIterationAggregate(String name) {
+ return this.runtimeContext.<T>getPreviousIterationAggregate(name);
+ }
+
+ /**
+ * Gets the broadcast data set registered under the given name. Broadcast data sets
+ * are available on all parallel instances of a function. They can be registered via
+ * {@link org.apache.flink.graph.spargel.VertexCentricConfiguration#addBroadcastSetForUpdateFunction(String, org.apache.flink.api.java.DataSet)}.
+ *
+ * @param name The name under which the broadcast set is registered.
+ * @return The broadcast data set.
+ */
+ public <T> Collection<T> getBroadcastSet(String name) {
+ return this.runtimeContext.<T>getBroadcastVariable(name);
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // internal methods
+ // --------------------------------------------------------------------------------------------
+
+ // runtime context handed in through init; used by the superstep, aggregator and broadcast accessors
+ private IterationRuntimeContext runtimeContext;
+
+ // collector and reusable output vertex used when isOptDegrees() is false
+ private Collector<Vertex<K, VV>> out;
+
+ // collector and reusable output vertex used when isOptDegrees() is true
+ private Collector<Vertex<K, Tuple3<VV, Long, Long>>> outWithDegrees;
+
+ private Vertex<K, VV> outVal;
+
+ private Vertex<K, Tuple3<VV, Long, Long>> outValWithDegrees;
+
+ // -1 until a value is supplied through setInDegree
+ private long inDegree = -1;
+
+ // -1 until a value is supplied through setOutDegree
+ private long outDegree = -1;
+
+ // true once setNewVertexValue has been called for the currently set output
+ private boolean setNewVertexValueCalled;
+
+ void init(IterationRuntimeContext context) {
+ this.runtimeContext = context;
+ }
+
+ // sets the plain output vertex and collector and re-arms the at-most-once check of setNewVertexValue
+ void setOutput(Vertex<K, VV> outVal, Collector<Vertex<K, VV>> out) {
+ this.outVal = outVal;
+ this.out = out;
+ setNewVertexValueCalled = false;
+ }
+
+ // sets the output vertex (whose value is cast to a Tuple3) and the raw collector used in the degrees case,
+ // and re-arms the at-most-once check of setNewVertexValue
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ <ValueWithDegree> void setOutputWithDegrees(Vertex<K, ValueWithDegree> outVal,
+ Collector out) {
+ this.outValWithDegrees = (Vertex<K, Tuple3<VV, Long, Long>>) outVal;
+ this.outWithDegrees = out;
+ setNewVertexValueCalled = false;
+ }
+
+ /**
+ * Retrieves the vertex in-degree (number of in-coming edges).
+ * @return The in-degree of this vertex if the
+ * {@link org.apache.flink.graph.IterationConfiguration#setOptDegrees(boolean)}
+ * option has been set; -1 otherwise.
+ */
+ public long getInDegree() {
+ return inDegree;
+ }
+
+ void setInDegree(long inDegree) {
+ this.inDegree = inDegree;
+ }
+
+ /**
+ * Retrieve the vertex out-degree (number of out-going edges).
+ * @return The out-degree of this vertex if the
+ * {@link org.apache.flink.graph.IterationConfiguration#setOptDegrees(boolean)}
+ * option has been set; -1 otherwise.
+ */
+ public long getOutDegree() {
+ return outDegree;
+ }
+
+ void setOutDegree(long outDegree) {
+ this.outDegree = outDegree;
+ }
+
+ /**
+ * In order to hide the Tuple3(actualValue, inDegree, OutDegree) vertex value from the user,
+ * this function will be called from {@link org.apache.flink.graph.spargel.VertexCentricIteration}
+ * instead of the regular updateVertex function.
+ *
+ * This function builds a plain vertex from the id and the first field (the actual value) of the vertexState's
+ * Tuple3 value and then calls the regular {@link #updateVertex(Vertex, MessageIterator)} function.
+ * The degrees are not set here; presumably the caller sets them through setInDegree / setOutDegree
+ * beforehand (TODO confirm against VertexCentricIteration).
+ *
+ * @param vertexState The vertex whose value is cast to a Tuple3(actualValue, inDegree, outDegree).
+ * @param inMessages The incoming messages to this vertex.
+ * @throws Exception Any exception thrown by the updateVertex function.
+ */
+ @SuppressWarnings("unchecked")
+ <VertexWithDegree> void updateVertexFromVertexCentricIteration(Vertex<K, VertexWithDegree> vertexState,
+ MessageIterator<Message> inMessages) throws Exception {
+
+ Vertex<K, VV> vertex = new Vertex<K, VV>(vertexState.f0,
+ ((Tuple3<VV, Long, Long>)vertexState.getValue()).f0);
+
+ updateVertex(vertex, inMessages);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/EdgeToTuple3Map.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/EdgeToTuple3Map.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/EdgeToTuple3Map.java
new file mode 100644
index 0000000..0e085b4
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/EdgeToTuple3Map.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.utils;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.graph.Edge;
+
+/**
+ * Map function that turns an {@link Edge} into a {@link Tuple3} holding the edge's three fields
+ * in the same order.
+ *
+ * @param <K> the type of the edge's first and second field
+ * @param <EV> the type of the edge's third field
+ */
+@ForwardedFields("f0; f1; f2")
+public class EdgeToTuple3Map<K, EV> implements MapFunction<Edge<K, EV>, Tuple3<K, K, EV>> {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Copies the fields f0, f1 and f2 of the edge into a new tuple.
+     *
+     * @param edge the edge to convert
+     * @return a new tuple with the edge's three fields
+     */
+    @Override
+    public Tuple3<K, K, EV> map(Edge<K, EV> edge) {
+        final K first = edge.f0;
+        final K second = edge.f1;
+        final EV third = edge.f2;
+        return new Tuple3<K, K, EV>(first, second, third);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/NullValueEdgeMapper.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/NullValueEdgeMapper.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/NullValueEdgeMapper.java
new file mode 100644
index 0000000..2bd4719
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/NullValueEdgeMapper.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.utils;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.types.NullValue;
+
+/**
+ * Map function that maps every {@link Edge} to the {@link NullValue} instance, ignoring the edge's content.
+ *
+ * @param <K> the first type parameter of the input edge
+ * @param <EV> the second type parameter of the input edge
+ */
+public class NullValueEdgeMapper<K, EV> implements MapFunction<Edge<K, EV>, NullValue> {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Returns the result of {@link NullValue#getInstance()} regardless of the input.
+     *
+     * @param edge the input edge (not inspected)
+     * @return the NullValue instance
+     */
+    @Override
+    public NullValue map(Edge<K, EV> edge) {
+        final NullValue nothing = NullValue.getInstance();
+        return nothing;
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple2ToVertexMap.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple2ToVertexMap.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple2ToVertexMap.java
new file mode 100644
index 0000000..e51362b
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple2ToVertexMap.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.utils;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
+
+/**
+ * Map function that creates a {@link Vertex} from a {@link Tuple2}, keeping both fields in the same order.
+ *
+ * @param <K> the type of the tuple's first field
+ * @param <VV> the type of the tuple's second field
+ */
+@ForwardedFields("f0; f1")
+public class Tuple2ToVertexMap<K, VV> implements MapFunction<Tuple2<K, VV>, Vertex<K, VV>> {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Builds a new vertex from the fields f0 and f1 of the tuple.
+     *
+     * @param vertex the tuple to convert
+     * @return a new vertex constructed from the tuple's two fields
+     */
+    @Override
+    public Vertex<K, VV> map(Tuple2<K, VV> vertex) {
+        final K first = vertex.f0;
+        final VV second = vertex.f1;
+        return new Vertex<K, VV>(first, second);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple3ToEdgeMap.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple3ToEdgeMap.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple3ToEdgeMap.java
new file mode 100644
index 0000000..0db9a51
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple3ToEdgeMap.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.utils;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
+
+/**
+ * Map function that creates an {@link Edge} from a {@link Tuple3}, keeping the three fields in the same order.
+ * It can be used to create an Edge DataSet from a Tuple3 DataSet.
+ *
+ * @param <K> the type of the tuple's first and second field
+ * @param <EV> the type of the tuple's third field
+ */
+@ForwardedFields("f0; f1; f2")
+public class Tuple3ToEdgeMap<K, EV> implements MapFunction<Tuple3<K, K, EV>, Edge<K, EV>> {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Builds a new edge from the fields f0, f1 and f2 of the tuple.
+     *
+     * @param tuple the tuple to convert
+     * @return a new edge constructed from the tuple's three fields
+     */
+    @Override
+    public Edge<K, EV> map(Tuple3<K, K, EV> tuple) {
+        final K first = tuple.f0;
+        final K second = tuple.f1;
+        final EV third = tuple.f2;
+        return new Edge<K, EV>(first, second, third);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/VertexToTuple2Map.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/VertexToTuple2Map.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/VertexToTuple2Map.java
new file mode 100644
index 0000000..04d1f47
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/VertexToTuple2Map.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.utils;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
+
+/**
+ * Map function that turns a {@link Vertex} into a {@link Tuple2} holding the vertex's two fields
+ * in the same order.
+ *
+ * @param <K> the type of the vertex's first field
+ * @param <VV> the type of the vertex's second field
+ */
+@ForwardedFields("f0; f1")
+public class VertexToTuple2Map<K, VV> implements MapFunction<Vertex<K, VV>, Tuple2<K, VV>> {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Copies the fields f0 and f1 of the vertex into a new tuple.
+     *
+     * @param vertex the vertex to convert
+     * @return a new tuple with the vertex's two fields
+     */
+    @Override
+    public Tuple2<K, VV> map(Vertex<K, VV> vertex) {
+        final K first = vertex.f0;
+        final VV second = vertex.f1;
+        return new Tuple2<K, VV>(first, second);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/validation/GraphValidator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/validation/GraphValidator.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/validation/GraphValidator.java
new file mode 100644
index 0000000..75b672c
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/validation/GraphValidator.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.validation;
+
+import java.io.Serializable;
+
+import org.apache.flink.graph.Graph;
+
+/**
+ * A utility for defining validation criteria for different types of Graphs.
+ * Concrete validators extend this class and implement {@link #validate(Graph)}.
+ *
+ * @param <K> the vertex key type
+ * @param <VV> the vertex value type
+ * @param <EV> the edge value type
+ */
+@SuppressWarnings("serial")
+public abstract class GraphValidator<K, VV, EV> implements Serializable {
+
+ /**
+ * Checks the given graph against the criteria of this validator.
+ *
+ * @param graph the graph to check
+ * @return a boolean stating whether the graph is valid with respect to this validator's criteria
+ * @throws Exception if the implementation fails while checking the graph
+ */
+ public abstract boolean validate(Graph<K, VV, EV> graph) throws Exception;
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java
new file mode 100644
index 0000000..33d469b
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.validation;
+
+import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.util.Collector;
+
+@SuppressWarnings("serial")
+public class InvalidVertexIdsValidator<K, VV, EV> extends GraphValidator<K, VV, EV> {
+
+ /**
+ * Checks that the edge set input contains valid vertex Ids, i.e. that they
+ * also exist in the vertex input set.
+ *
+ * @return a boolean stating whether a graph is valid
+ * with respect to its vertex ids.
+ */
+ @Override
+ public boolean validate(Graph<K, VV, EV> graph) throws Exception {
+ DataSet<Tuple1<K>> edgeIds = graph.getEdges()
+ .flatMap(new MapEdgeIds<K, EV>()).distinct();
+ DataSet<K> invalidIds = graph.getVertices().coGroup(edgeIds).where(0)
+ .equalTo(0).with(new GroupInvalidIds<K, VV>()).first(1);
+
+ return invalidIds.map(new KToTupleMap<K>()).count() == 0;
+ }
+
+ private static final class MapEdgeIds<K, EV> implements FlatMapFunction<Edge<K, EV>, Tuple1<K>> {
+ public void flatMap(Edge<K, EV> edge, Collector<Tuple1<K>> out) {
+ out.collect(new Tuple1<K>(edge.f0));
+ out.collect(new Tuple1<K>(edge.f1));
+ }
+ }
+
+ private static final class GroupInvalidIds<K, VV> implements CoGroupFunction<Vertex<K, VV>, Tuple1<K>, K> {
+ public void coGroup(Iterable<Vertex<K, VV>> vertexId,
+ Iterable<Tuple1<K>> edgeId, Collector<K> out) {
+ if (!(vertexId.iterator().hasNext())) {
+ // found an id that doesn't exist in the vertex set
+ out.collect(edgeId.iterator().next().f0);
+ }
+ }
+ }
+
+ private static final class KToTupleMap<K> implements MapFunction<K, Tuple1<K>> {
+ public Tuple1<K> map(K key) throws Exception {
+ return new Tuple1<K>(key);
+ }
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/gsa/GSACompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/gsa/GSACompilerTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/gsa/GSACompilerTest.java
new file mode 100644
index 0000000..2ad203f
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/gsa/GSACompilerTest.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.gsa;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.utils.Tuple3ToEdgeMap;
+import org.apache.flink.optimizer.dataproperties.PartitioningProperty;
+import org.apache.flink.optimizer.plan.DualInputPlanNode;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.PlanNode;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.optimizer.plan.SinkPlanNode;
+import org.apache.flink.optimizer.plan.WorksetIterationPlanNode;
+import org.apache.flink.optimizer.util.CompilerTestBase;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.types.NullValue;
+import org.junit.Test;
+
+/**
+ * Compiles a small gather-sum-apply program (minimum-id propagation over a single edge) and checks
+ * the ship strategies, parallelism and partitioning of the resulting optimized plan.
+ */
+public class GSACompilerTest extends CompilerTestBase {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Builds the program, compiles it without statistics and asserts properties of the sink,
+     * the workset iteration, the solution set delta and the edge join.
+     */
+    @Test
+    public void testGSACompiler() {
+        try {
+            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+            env.setParallelism(DEFAULT_PARALLELISM);
+            // compose test program
+            {
+                DataSet<Edge<Long, NullValue>> edges = env.fromElements(new Tuple3<Long, Long, NullValue>(
+                        1L, 2L, NullValue.getInstance())).map(new Tuple3ToEdgeMap<Long, NullValue>());
+
+                Graph<Long, Long, NullValue> graph = Graph.fromDataSet(edges, new InitVertices(), env);
+
+                DataSet<Vertex<Long, Long>> result = graph.runGatherSumApplyIteration(
+                        new GatherNeighborIds(), new SelectMinId(),
+                        new UpdateComponentId(), 100).getVertices();
+
+                result.output(new DiscardingOutputFormat<Vertex<Long, Long>>());
+            }
+
+            Plan p = env.createProgramPlan("GSA Connected Components");
+            OptimizedPlan op = compileNoStats(p);
+
+            // check the sink
+            SinkPlanNode sink = op.getDataSinks().iterator().next();
+            assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
+            assertEquals(DEFAULT_PARALLELISM, sink.getParallelism());
+            assertEquals(PartitioningProperty.HASH_PARTITIONED, sink.getGlobalProperties().getPartitioning());
+
+            // check the iteration
+            WorksetIterationPlanNode iteration = (WorksetIterationPlanNode) sink.getInput().getSource();
+            assertEquals(DEFAULT_PARALLELISM, iteration.getParallelism());
+
+            // check the solution set join and the delta
+            PlanNode ssDelta = iteration.getSolutionSetDeltaPlanNode();
+            assertTrue(ssDelta instanceof DualInputPlanNode); // this is only true if the update function preserves the partitioning
+
+            DualInputPlanNode ssJoin = (DualInputPlanNode) ssDelta;
+            assertEquals(DEFAULT_PARALLELISM, ssJoin.getParallelism());
+            assertEquals(ShipStrategyType.PARTITION_HASH, ssJoin.getInput1().getShipStrategy());
+            assertEquals(new FieldList(0), ssJoin.getInput1().getShipStrategyKeys());
+
+            // check the workset set join
+            SingleInputPlanNode sumReducer = (SingleInputPlanNode) ssJoin.getInput1().getSource();
+            SingleInputPlanNode gatherMapper = (SingleInputPlanNode) sumReducer.getInput().getSource();
+            DualInputPlanNode edgeJoin = (DualInputPlanNode) gatherMapper.getInput().getSource();
+            assertEquals(DEFAULT_PARALLELISM, edgeJoin.getParallelism());
+            // input1 is the workset
+            assertEquals(ShipStrategyType.FORWARD, edgeJoin.getInput1().getShipStrategy());
+            // input2 is the edges
+            assertEquals(ShipStrategyType.PARTITION_HASH, edgeJoin.getInput2().getShipStrategy());
+            assertTrue(edgeJoin.getInput2().getTempMode().isCached());
+
+            assertEquals(new FieldList(0), edgeJoin.getInput2().getShipStrategyKeys());
+        }
+        catch (Exception e) {
+            // report any exception raised while building or compiling the plan as a test failure
+            System.err.println(e.getMessage());
+            e.printStackTrace();
+            fail(e.getMessage());
+        }
+    }
+
+    /** Initializes each vertex value with the vertex id. */
+    @SuppressWarnings("serial")
+    private static final class InitVertices implements MapFunction<Long, Long> {
+
+        @Override
+        public Long map(Long vertexId) {
+            return vertexId;
+        }
+    }
+
+    /** Gather step: returns the value of the neighbor. */
+    @SuppressWarnings("serial")
+    private static final class GatherNeighborIds extends GatherFunction<Long, NullValue, Long> {
+
+        @Override
+        public Long gather(Neighbor<Long, NullValue> neighbor) {
+            return neighbor.getNeighborValue();
+        }
+    }
+
+    /** Sum step: keeps the smaller of the two values. */
+    @SuppressWarnings("serial")
+    private static final class SelectMinId extends SumFunction<Long, NullValue, Long> {
+
+        @Override
+        public Long sum(Long newValue, Long currentValue) {
+            return Math.min(newValue, currentValue);
+        }
+    }
+
+    /** Apply step: sets the summed value as result only when it is smaller than the original value. */
+    @SuppressWarnings("serial")
+    private static final class UpdateComponentId extends ApplyFunction<Long, Long, Long> {
+
+        @Override
+        public void apply(Long summedValue, Long origValue) {
+            if (summedValue < origValue) {
+                setResult(summedValue);
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/gsa/GSATranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/gsa/GSATranslationTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/gsa/GSATranslationTest.java
new file mode 100644
index 0000000..ced7508
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/gsa/GSATranslationTest.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.gsa;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import org.apache.flink.api.common.aggregators.LongSumAggregator;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.operators.DeltaIteration;
+import org.apache.flink.api.java.operators.DeltaIterationResultSet;
+import org.apache.flink.api.java.operators.SingleInputUdfOperator;
+import org.apache.flink.api.java.operators.TwoInputUdfOperator;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.utils.Tuple3ToEdgeMap;
+import org.apache.flink.types.NullValue;
+import org.junit.Test;
+
+/**
+ * Checks that a gather-sum-apply iteration configured through
+ * {@link GSAConfiguration} is translated into a delta iteration that carries
+ * the configured name, parallelism, aggregator and broadcast sets.
+ */
+public class GSATranslationTest {
+
+ private static final String ITERATION_NAME = "Test Name";
+
+ private static final String AGGREGATOR_NAME = "AggregatorName";
+
+ private static final String BC_SET_GATHER_NAME = "gather messages";
+
+ private static final String BC_SET_SUM_NAME = "sum updates";
+
+ private static final String BC_SET_APPLY_NAME = "apply updates";
+
+ private static final int NUM_ITERATIONS = 13;
+
+ private static final int ITERATION_PARALLELISM = 77;
+
+ /**
+  * Builds a single-edge graph, runs a gather-sum-apply iteration on it with a
+  * fully populated {@link GSAConfiguration}, and validates the operators of
+  * the resulting program.
+  *
+  * @throws Exception any failure while building the program; it is propagated
+  *         so that the test report shows the original exception and stack trace
+  */
+ @Test
+ public void testTranslation() throws Exception {
+     ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+     DataSet<Long> bcGather = env.fromElements(1L);
+     DataSet<Long> bcSum = env.fromElements(1L);
+     DataSet<Long> bcApply = env.fromElements(1L);
+
+     DataSet<Vertex<Long, Long>> result;
+
+     // ------------ construct the test program ------------------
+     {
+         DataSet<Edge<Long, NullValue>> edges = env.fromElements(new Tuple3<Long, Long, NullValue>(
+                 1L, 2L, NullValue.getInstance())).map(new Tuple3ToEdgeMap<Long, NullValue>());
+
+         Graph<Long, Long, NullValue> graph = Graph.fromDataSet(edges, new InitVertices(), env);
+
+         GSAConfiguration parameters = new GSAConfiguration();
+
+         parameters.registerAggregator(AGGREGATOR_NAME, new LongSumAggregator());
+         parameters.setName(ITERATION_NAME);
+         parameters.setParallelism(ITERATION_PARALLELISM);
+         parameters.addBroadcastSetForGatherFunction(BC_SET_GATHER_NAME, bcGather);
+         parameters.addBroadcastSetForSumFunction(BC_SET_SUM_NAME, bcSum);
+         parameters.addBroadcastSetForApplyFunction(BC_SET_APPLY_NAME, bcApply);
+
+         result = graph.runGatherSumApplyIteration(
+                 new GatherNeighborIds(), new SelectMinId(),
+                 new UpdateComponentId(), NUM_ITERATIONS, parameters).getVertices();
+
+         result.output(new DiscardingOutputFormat<Vertex<Long, Long>>());
+     }
+
+     // ------------- validate the java program ----------------
+
+     assertTrue(result instanceof DeltaIterationResultSet);
+
+     DeltaIterationResultSet<?, ?> resultSet = (DeltaIterationResultSet<?, ?>) result;
+     DeltaIteration<?, ?> iteration = (DeltaIteration<?, ?>) resultSet.getIterationHead();
+
+     // check the basic iteration properties
+     assertEquals(NUM_ITERATIONS, resultSet.getMaxIterations());
+     assertArrayEquals(new int[] {0}, resultSet.getKeyPositions());
+     assertEquals(ITERATION_PARALLELISM, iteration.getParallelism());
+     assertEquals(ITERATION_NAME, iteration.getName());
+
+     assertEquals(AGGREGATOR_NAME, iteration.getAggregators().getAllRegisteredAggregators().iterator().next().getName());
+
+     // validate that the semantic properties are set as they should
+     TwoInputUdfOperator<?, ?, ?, ?> solutionSetJoin = (TwoInputUdfOperator<?, ?, ?, ?>) resultSet.getNextWorkset();
+     assertTrue(solutionSetJoin.getSemanticProperties().getForwardingTargetFields(0, 0).contains(0));
+     assertTrue(solutionSetJoin.getSemanticProperties().getForwardingTargetFields(1, 0).contains(0));
+
+     // walk backwards from the solution set join: its first input is the sum step,
+     // whose input in turn is the gather step
+     SingleInputUdfOperator<?, ?, ?> sumReduce = (SingleInputUdfOperator<?, ?, ?>) solutionSetJoin.getInput1();
+     SingleInputUdfOperator<?, ?, ?> gatherMap = (SingleInputUdfOperator<?, ?, ?>) sumReduce.getInput();
+
+     // validate that the broadcast sets are forwarded
+     assertEquals(bcGather, gatherMap.getBroadcastSets().get(BC_SET_GATHER_NAME));
+     assertEquals(bcSum, sumReduce.getBroadcastSets().get(BC_SET_SUM_NAME));
+     assertEquals(bcApply, solutionSetJoin.getBroadcastSets().get(BC_SET_APPLY_NAME));
+ }
+
+ /** Initialises each vertex value with the vertex id itself. */
+ @SuppressWarnings("serial")
+ private static final class InitVertices implements MapFunction<Long, Long> {
+
+     @Override
+     public Long map(Long vertexId) {
+         return vertexId;
+     }
+ }
+
+ /** Gather step: emits the value of the neighbor unchanged. */
+ @SuppressWarnings("serial")
+ private static final class GatherNeighborIds extends GatherFunction<Long, NullValue, Long> {
+
+     @Override
+     public Long gather(Neighbor<Long, NullValue> neighbor) {
+         return neighbor.getNeighborValue();
+     }
+ }
+
+ /** Sum step: keeps the smaller of two values. */
+ @SuppressWarnings("serial")
+ private static final class SelectMinId extends SumFunction<Long, NullValue, Long> {
+
+     @Override
+     public Long sum(Long newValue, Long currentValue) {
+         return Math.min(newValue, currentValue);
+     }
+ }
+
+ /** Apply step: sets a new result only if the summed value is strictly smaller than the original. */
+ @SuppressWarnings("serial")
+ private static final class UpdateComponentId extends ApplyFunction<Long, Long, Long> {
+
+     @Override
+     public void apply(Long summedValue, Long origValue) {
+         if (summedValue < origValue) {
+             setResult(summedValue);
+         }
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelCompilerTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelCompilerTest.java
new file mode 100644
index 0000000..7a8143a
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelCompilerTest.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.graph.spargel;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.optimizer.util.CompilerTestBase;
+import org.junit.Test;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.optimizer.plan.DualInputPlanNode;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.PlanNode;
+import org.apache.flink.optimizer.plan.SinkPlanNode;
+import org.apache.flink.optimizer.plan.WorksetIterationPlanNode;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.runtime.operators.util.LocalStrategy;
+import org.apache.flink.types.NullValue;
+import org.apache.flink.graph.library.ConnectedComponents;
+import org.apache.flink.graph.utils.Tuple2ToVertexMap;
+
+
+/**
+ * Compiles a vertex-centric connected-components program with the optimizer
+ * and checks the ship and local strategies chosen for the resulting plan,
+ * once without and once with broadcast sets attached to the iteration.
+ */
+public class SpargelCompilerTest extends CompilerTestBase {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final String JOB_NAME = "Spargel Connected Components";
+
+ private static final String BC_VAR_NAME = "borat variable";
+
+ private static final int MAX_ITERATIONS = 100;
+
+ /**
+  * Compiles the plain vertex-centric iteration and additionally checks the
+  * local strategy of the iteration's second input.
+  *
+  * @throws Exception any failure while building or compiling the program; it is
+  *         propagated so the test report shows the original stack trace
+  */
+ @Test
+ public void testSpargelCompiler() throws Exception {
+     ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+     env.setParallelism(DEFAULT_PARALLELISM);
+
+     // compose test program
+     {
+         Graph<Long, Long, NullValue> graph = createTestGraph(env);
+
+         DataSet<Vertex<Long, Long>> result = graph.runVertexCentricIteration(
+                 new ConnectedComponents.CCUpdater<Long>(),
+                 new ConnectedComponents.CCMessenger<Long>(), MAX_ITERATIONS)
+             .getVertices();
+
+         result.output(new DiscardingOutputFormat<Vertex<Long, Long>>());
+     }
+
+     WorksetIterationPlanNode iteration = compileAndCheckPlan(env);
+
+     // check that the initial workset sort is outside the loop
+     assertEquals(LocalStrategy.SORT, iteration.getInput2().getLocalStrategy());
+     assertEquals(new FieldList(0), iteration.getInput2().getLocalStrategyKeys());
+ }
+
+ /**
+  * Compiles the vertex-centric iteration with the same broadcast set registered
+  * for both the messaging and the update function.
+  *
+  * @throws Exception any failure while building or compiling the program; it is
+  *         propagated so the test report shows the original stack trace
+  */
+ @Test
+ public void testSpargelCompilerWithBroadcastVariable() throws Exception {
+     ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+     env.setParallelism(DEFAULT_PARALLELISM);
+
+     // compose test program
+     {
+         DataSet<Long> bcVar = env.fromElements(1L);
+
+         Graph<Long, Long, NullValue> graph = createTestGraph(env);
+
+         VertexCentricConfiguration parameters = new VertexCentricConfiguration();
+         parameters.addBroadcastSetForMessagingFunction(BC_VAR_NAME, bcVar);
+         parameters.addBroadcastSetForUpdateFunction(BC_VAR_NAME, bcVar);
+
+         // the configuration must be handed to the iteration; without it the
+         // broadcast sets registered above never become part of the program
+         DataSet<Vertex<Long, Long>> result = graph.runVertexCentricIteration(
+                 new ConnectedComponents.CCUpdater<Long>(),
+                 new ConnectedComponents.CCMessenger<Long>(), MAX_ITERATIONS, parameters)
+             .getVertices();
+
+         result.output(new DiscardingOutputFormat<Vertex<Long, Long>>());
+     }
+
+     compileAndCheckPlan(env);
+ }
+
+ /**
+  * Creates the two-vertex, one-edge graph used by both tests: vertices
+  * (1, 1) and (2, 2) and a single edge 1 -> 2 with a {@link NullValue} edge value.
+  *
+  * @param env the environment in which the data sets are created
+  * @return the graph built from those vertices and that edge
+  * @throws Exception if graph construction fails
+  */
+ @SuppressWarnings("serial")
+ private static Graph<Long, Long, NullValue> createTestGraph(ExecutionEnvironment env) throws Exception {
+     DataSet<Vertex<Long, Long>> initialVertices = env.fromElements(
+             new Tuple2<Long, Long>(1L, 1L), new Tuple2<Long, Long>(2L, 2L))
+         .map(new Tuple2ToVertexMap<Long, Long>());
+
+     DataSet<Edge<Long, NullValue>> edges = env.fromElements(new Tuple2<Long, Long>(1L, 2L))
+         .map(new MapFunction<Tuple2<Long, Long>, Edge<Long, NullValue>>() {
+
+             @Override
+             public Edge<Long, NullValue> map(Tuple2<Long, Long> edge) {
+                 return new Edge<Long, NullValue>(edge.f0, edge.f1, NullValue.getInstance());
+             }
+         });
+
+     return Graph.fromDataSet(initialVertices, edges, env);
+ }
+
+ /**
+  * Creates and compiles the program plan of {@code env} and runs the plan
+  * assertions shared by both tests: sink, iteration, solution set join,
+  * workset join and the partitioning of the iteration inputs.
+  *
+  * @param env the environment holding the fully composed test program
+  * @return the workset iteration node of the optimized plan, for further checks
+  * @throws Exception if plan creation or compilation fails
+  */
+ private WorksetIterationPlanNode compileAndCheckPlan(ExecutionEnvironment env) throws Exception {
+     Plan p = env.createProgramPlan(JOB_NAME);
+     OptimizedPlan op = compileNoStats(p);
+
+     // check the sink
+     SinkPlanNode sink = op.getDataSinks().iterator().next();
+     assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
+     assertEquals(DEFAULT_PARALLELISM, sink.getParallelism());
+
+     // check the iteration
+     WorksetIterationPlanNode iteration = (WorksetIterationPlanNode) sink.getInput().getSource();
+     assertEquals(DEFAULT_PARALLELISM, iteration.getParallelism());
+
+     // check the solution set join and the delta
+     PlanNode ssDelta = iteration.getSolutionSetDeltaPlanNode();
+     assertTrue(ssDelta instanceof DualInputPlanNode); // this is only true if the update function preserves the partitioning
+
+     DualInputPlanNode ssJoin = (DualInputPlanNode) ssDelta;
+     assertEquals(DEFAULT_PARALLELISM, ssJoin.getParallelism());
+     assertEquals(ShipStrategyType.PARTITION_HASH, ssJoin.getInput1().getShipStrategy());
+     assertEquals(new FieldList(0), ssJoin.getInput1().getShipStrategyKeys());
+
+     // check the workset join
+     DualInputPlanNode edgeJoin = (DualInputPlanNode) ssJoin.getInput1().getSource();
+     assertEquals(DEFAULT_PARALLELISM, edgeJoin.getParallelism());
+     assertEquals(ShipStrategyType.PARTITION_HASH, edgeJoin.getInput1().getShipStrategy());
+     assertEquals(ShipStrategyType.FORWARD, edgeJoin.getInput2().getShipStrategy());
+     assertTrue(edgeJoin.getInput1().getTempMode().isCached());
+
+     assertEquals(new FieldList(0), edgeJoin.getInput1().getShipStrategyKeys());
+
+     // check that the initial partitioning is pushed out of the loop
+     assertEquals(ShipStrategyType.PARTITION_HASH, iteration.getInput1().getShipStrategy());
+     assertEquals(ShipStrategyType.PARTITION_HASH, iteration.getInput2().getShipStrategy());
+     assertEquals(new FieldList(0), iteration.getInput1().getShipStrategyKeys());
+     assertEquals(new FieldList(0), iteration.getInput2().getShipStrategyKeys());
+
+     return iteration;
+ }
+}