You are viewing a plain-text version of this content. (The link to the canonical version was lost in the plain-text conversion.)
Posted to commits@flink.apache.org by gr...@apache.org on 2016/06/30 18:57:50 UTC
[2/4] flink git commit: [FLINK-3618] [gelly] Rename abstract UDF classes in Scatter-Gather implementation
http://git-wip-us.apache.org/repos/asf/flink/blob/918e5d0c/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
index 29183e9..4ff4e79 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
@@ -24,9 +24,9 @@ import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.GraphAlgorithm;
import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.spargel.GatherFunction;
import org.apache.flink.graph.spargel.MessageIterator;
-import org.apache.flink.graph.spargel.MessagingFunction;
-import org.apache.flink.graph.spargel.VertexUpdateFunction;
+import org.apache.flink.graph.spargel.ScatterFunction;
/**
* This is an implementation of the Single-Source-Shortest Paths algorithm, using a scatter-gather iteration.
@@ -52,7 +52,7 @@ public class SingleSourceShortestPaths<K> implements GraphAlgorithm<K, Double, D
public DataSet<Vertex<K, Double>> run(Graph<K, Double, Double> input) {
return input.mapVertices(new InitVerticesMapper<K>(srcVertexId))
- .runScatterGatherIteration(new VertexDistanceUpdater<K>(), new MinDistanceMessenger<K>(),
+ .runScatterGatherIteration(new MinDistanceMessenger<K>(), new VertexDistanceUpdater<K>(),
maxIterations).getVertices();
}
@@ -74,12 +74,30 @@ public class SingleSourceShortestPaths<K> implements GraphAlgorithm<K, Double, D
}
/**
+ * Distributes the minimum distance associated with a given vertex among all
+ * the target vertices summed up with the edge's value.
+ *
+ * @param <K>
+ */
+ public static final class MinDistanceMessenger<K> extends ScatterFunction<K, Double, Double, Double> {
+
+ @Override
+ public void sendMessages(Vertex<K, Double> vertex) {
+ if (vertex.getValue() < Double.POSITIVE_INFINITY) {
+ for (Edge<K, Double> edge : getEdges()) {
+ sendMessageTo(edge.getTarget(), vertex.getValue() + edge.getValue());
+ }
+ }
+ }
+ }
+
+ /**
* Function that updates the value of a vertex by picking the minimum
* distance from all incoming messages.
*
* @param <K>
*/
- public static final class VertexDistanceUpdater<K> extends VertexUpdateFunction<K, Double, Double> {
+ public static final class VertexDistanceUpdater<K> extends GatherFunction<K, Double, Double> {
@Override
public void updateVertex(Vertex<K, Double> vertex,
@@ -98,22 +116,4 @@ public class SingleSourceShortestPaths<K> implements GraphAlgorithm<K, Double, D
}
}
}
-
- /**
- * Distributes the minimum distance associated with a given vertex among all
- * the target vertices summed up with the edge's value.
- *
- * @param <K>
- */
- public static final class MinDistanceMessenger<K> extends MessagingFunction<K, Double, Double, Double> {
-
- @Override
- public void sendMessages(Vertex<K, Double> vertex) {
- if (vertex.getValue() < Double.POSITIVE_INFINITY) {
- for (Edge<K, Double> edge : getEdges()) {
- sendMessageTo(edge.getTarget(), vertex.getValue() + edge.getValue());
- }
- }
- }
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/918e5d0c/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java
index 8272d8f..681d060 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java
@@ -18,11 +18,11 @@
package org.apache.flink.graph.library;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
import org.apache.flink.api.java.DataSet;
http://git-wip-us.apache.org/repos/asf/flink/blob/918e5d0c/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/GatherFunction.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/GatherFunction.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/GatherFunction.java
new file mode 100644
index 0000000..d56c0da
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/GatherFunction.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.spargel;
+
+import org.apache.flink.api.common.aggregators.Aggregator;
+import org.apache.flink.api.common.functions.IterationRuntimeContext;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.types.Value;
+import org.apache.flink.util.Collector;
+
+import java.io.Serializable;
+import java.util.Collection;
+
+/**
+ * This class must be extended by functions that compute the state of the vertex depending on the old state and the
+ * incoming messages. The central method is {@link #updateVertex(Vertex, MessageIterator)}, which is
+ * invoked once per vertex per superstep.
+ *
+ * {@code <K>} The vertex key type.
+ * {@code <VV>} The vertex value type.
+ * {@code <Message>} The message type.
+ */
+public abstract class GatherFunction<K, VV, Message> implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ // --------------------------------------------------------------------------------------------
+ // Attributes that allow vertices to access their in/out degrees and the total number of vertices
+ // inside an iteration.
+ // --------------------------------------------------------------------------------------------
+
+ private long numberOfVertices = -1L;
+
+ /**
+ * Retrieves the number of vertices in the graph.
+ * @return the number of vertices if the {@link org.apache.flink.graph.IterationConfiguration#setOptNumVertices(boolean)}
+ * option has been set; -1 otherwise.
+ */
+ public long getNumberOfVertices() {
+ return numberOfVertices;
+ }
+
+ void setNumberOfVertices(long numberOfVertices) {
+ this.numberOfVertices = numberOfVertices;
+ }
+
+ //---------------------------------------------------------------------------------------------
+
+ private boolean optDegrees;
+
+ boolean isOptDegrees() {
+ return optDegrees;
+ }
+
+ void setOptDegrees(boolean optDegrees) {
+ this.optDegrees = optDegrees;
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Public API Methods
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * This method is invoked once per vertex per superstep. It receives the current state of the vertex, as well as
+ * the incoming messages. It may set a new vertex state via {@link #setNewVertexValue(Object)}. If the vertex
+ * state is changed, it will trigger the sending of messages via the {@link ScatterFunction}.
+ *
+ * @param vertex The vertex.
+ * @param inMessages The incoming messages to this vertex.
+ *
+ * @throws Exception The computation may throw exceptions, which causes the superstep to fail.
+ */
+ public abstract void updateVertex(Vertex<K, VV> vertex, MessageIterator<Message> inMessages) throws Exception;
+
+ /**
+ * This method is executed once per superstep before the gather function is invoked for each vertex.
+ *
+ * @throws Exception Exceptions in the pre-superstep phase cause the superstep to fail.
+ */
+ public void preSuperstep() throws Exception {}
+
+ /**
+ * This method is executed once per superstep after the gather function has been invoked for each vertex.
+ *
+ * @throws Exception Exceptions in the post-superstep phase cause the superstep to fail.
+ */
+ public void postSuperstep() throws Exception {}
+
+ /**
+ * Sets the new value of this vertex. Setting a new value triggers the sending of outgoing messages from this vertex.
+ *
+ * This should be called at most once per updateVertex.
+ *
+ * @param newValue The new vertex value.
+ */
+ public void setNewVertexValue(VV newValue) {
+ if(setNewVertexValueCalled) {
+ throw new IllegalStateException("setNewVertexValue should only be called at most once per updateVertex");
+ }
+ setNewVertexValueCalled = true;
+ if(isOptDegrees()) {
+ outValWithDegrees.f1.f0 = newValue;
+ outWithDegrees.collect(outValWithDegrees);
+ } else {
+ outVal.setValue(newValue);
+ out.collect(outVal);
+ }
+ }
+
+ /**
+ * Gets the number of the superstep, starting at <tt>1</tt>.
+ *
+ * @return The number of the current superstep.
+ */
+ public int getSuperstepNumber() {
+ return this.runtimeContext.getSuperstepNumber();
+ }
+
+ /**
+ * Gets the iteration aggregator registered under the given name. The iteration aggregator combines
+ * all aggregates globally once per superstep and makes them available in the next superstep.
+ *
+ * @param name The name of the aggregator.
+ * @return The aggregator registered under this name, or null, if no aggregator was registered.
+ */
+ public <T extends Aggregator<?>> T getIterationAggregator(String name) {
+ return this.runtimeContext.<T>getIterationAggregator(name);
+ }
+
+ /**
+ * Get the aggregated value that an aggregator computed in the previous iteration.
+ *
+ * @param name The name of the aggregator.
+ * @return The aggregated value of the previous iteration.
+ */
+ public <T extends Value> T getPreviousIterationAggregate(String name) {
+ return this.runtimeContext.<T>getPreviousIterationAggregate(name);
+ }
+
+ /**
+ * Gets the broadcast data set registered under the given name. Broadcast data sets
+ * are available on all parallel instances of a function. They can be registered via
+ * {@link org.apache.flink.graph.spargel.ScatterGatherConfiguration#addBroadcastSetForGatherFunction(String, org.apache.flink.api.java.DataSet)}.
+ *
+ * @param name The name under which the broadcast set is registered.
+ * @return The broadcast data set.
+ */
+ public <T> Collection<T> getBroadcastSet(String name) {
+ return this.runtimeContext.<T>getBroadcastVariable(name);
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // internal methods
+ // --------------------------------------------------------------------------------------------
+
+ private IterationRuntimeContext runtimeContext;
+
+ private Collector<Vertex<K, VV>> out;
+
+ private Collector<Vertex<K, Tuple3<VV, Long, Long>>> outWithDegrees;
+
+ private Vertex<K, VV> outVal;
+
+ private Vertex<K, Tuple3<VV, Long, Long>> outValWithDegrees;
+
+ private long inDegree = -1;
+
+ private long outDegree = -1;
+
+ private boolean setNewVertexValueCalled;
+
+ void init(IterationRuntimeContext context) {
+ this.runtimeContext = context;
+ }
+
+ void setOutput(Vertex<K, VV> outVal, Collector<Vertex<K, VV>> out) {
+ this.outVal = outVal;
+ this.out = out;
+ setNewVertexValueCalled = false;
+ }
+
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ <ValueWithDegree> void setOutputWithDegrees(Vertex<K, ValueWithDegree> outVal,
+ Collector out) {
+ this.outValWithDegrees = (Vertex<K, Tuple3<VV, Long, Long>>) outVal;
+ this.outWithDegrees = out;
+ setNewVertexValueCalled = false;
+ }
+
+ /**
+ * Retrieves the vertex in-degree (number of in-coming edges).
+ * @return The in-degree of this vertex
+ */
+ public long getInDegree() {
+ return inDegree;
+ }
+
+ void setInDegree(long inDegree) {
+ this.inDegree = inDegree;
+ }
+
+ /**
+ * Retrieve the vertex out-degree (number of out-going edges).
+ * @return The out-degree of this vertex
+ */
+ public long getOutDegree() {
+ return outDegree;
+ }
+
+ void setOutDegree(long outDegree) {
+ this.outDegree = outDegree;
+ }
+
+ /**
+ * In order to hide the Tuple3(actualValue, inDegree, OutDegree) vertex value from the user,
+ * another function will be called from {@link org.apache.flink.graph.spargel.ScatterGatherIteration}.
+ *
+ * This function will retrieve the vertex from the vertexState and will set its degrees, afterwards calling
+ * the regular updateVertex function.
+ *
+ * @param vertexState
+ * @param inMessages
+ * @throws Exception
+ */
+ @SuppressWarnings("unchecked")
+ <VertexWithDegree> void updateVertexFromScatterGatherIteration(Vertex<K, VertexWithDegree> vertexState,
+ MessageIterator<Message> inMessages) throws Exception {
+
+ Vertex<K, VV> vertex = new Vertex<K, VV>(vertexState.f0,
+ ((Tuple3<VV, Long, Long>)vertexState.getValue()).f0);
+
+ updateVertex(vertex, inMessages);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/918e5d0c/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java
deleted file mode 100644
index e12d779..0000000
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java
+++ /dev/null
@@ -1,338 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.spargel;
-
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.Iterator;
-
-import org.apache.flink.api.common.aggregators.Aggregator;
-import org.apache.flink.api.common.functions.IterationRuntimeContext;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.EdgeDirection;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.types.Value;
-import org.apache.flink.util.Collector;
-
-/**
- * The base class for functions that produce messages between vertices as a part of a {@link ScatterGatherIteration}.
- *
- * @param <K> The type of the vertex key (the vertex identifier).
- * @param <VV> The type of the vertex value (the state of the vertex).
- * @param <Message> The type of the message sent between vertices along the edges.
- * @param <EV> The type of the values that are associated with the edges.
- */
-public abstract class MessagingFunction<K, VV, Message, EV> implements Serializable {
-
- private static final long serialVersionUID = 1L;
-
- // --------------------------------------------------------------------------------------------
- // Attributes that allow vertices to access their in/out degrees and the total number of vertices
- // inside an iteration.
- // --------------------------------------------------------------------------------------------
-
- private long numberOfVertices = -1L;
-
- /**
- * Retrieves the number of vertices in the graph.
- * @return the number of vertices if the {@link org.apache.flink.graph.IterationConfiguration#setOptNumVertices(boolean)}
- * option has been set; -1 otherwise.
- */
- public long getNumberOfVertices() {
- return numberOfVertices;
- }
-
- void setNumberOfVertices(long numberOfVertices) {
- this.numberOfVertices = numberOfVertices;
- }
-
- // --------------------------------------------------------------------------------------------
- // Attribute that allows the user to choose the neighborhood type(in/out/all) on which to run
- // the scatter gather iteration.
- // --------------------------------------------------------------------------------------------
-
- private EdgeDirection direction;
-
- /**
- * Retrieves the edge direction in which messages are propagated in the scatter-gather iteration.
- * @return the messaging {@link EdgeDirection}
- */
- public EdgeDirection getDirection() {
- return direction;
- }
-
- void setDirection(EdgeDirection direction) {
- this.direction = direction;
- }
-
- // --------------------------------------------------------------------------------------------
- // Public API Methods
- // --------------------------------------------------------------------------------------------
-
- /**
- * This method is invoked once per superstep for each vertex that was changed in that superstep.
- * It needs to produce the messages that will be received by vertices in the next superstep.
- *
- * @param vertex The vertex that was changed.
- *
- * @throws Exception The computation may throw exceptions, which causes the superstep to fail.
- */
- public abstract void sendMessages(Vertex<K, VV> vertex) throws Exception;
-
- /**
- * This method is executed once per superstep before the vertex update function is invoked for each vertex.
- *
- * @throws Exception Exceptions in the pre-superstep phase cause the superstep to fail.
- */
- public void preSuperstep() throws Exception {}
-
- /**
- * This method is executed once per superstep after the vertex update function has been invoked for each vertex.
- *
- * @throws Exception Exceptions in the post-superstep phase cause the superstep to fail.
- */
- public void postSuperstep() throws Exception {}
-
-
- /**
- * Gets an {@link java.lang.Iterable} with all edges. This method is mutually exclusive with
- * {@link #sendMessageToAllNeighbors(Object)} and may be called only once.
- * <p>
- * If the {@link EdgeDirection} is OUT (default), then this iterator contains outgoing edges.
- * If the {@link EdgeDirection} is IN, then this iterator contains incoming edges.
- * If the {@link EdgeDirection} is ALL, then this iterator contains both outgoing and incoming edges.
- *
- * @return An iterator with all edges.
- */
- @SuppressWarnings("unchecked")
- public Iterable<Edge<K, EV>> getEdges() {
- if (edgesUsed) {
- throw new IllegalStateException("Can use either 'getEdges()' or 'sendMessageToAllNeighbors()' exactly once.");
- }
- edgesUsed = true;
- this.edgeIterator.set((Iterator<Edge<K, EV>>) edges);
- return this.edgeIterator;
- }
-
- /**
- * Sends the given message to all vertices that are targets of an edge of the changed vertex.
- * This method is mutually exclusive to the method {@link #getEdges()} and may be called only once.
- * <p>
- * If the {@link EdgeDirection} is OUT (default), the message will be sent to out-neighbors.
- * If the {@link EdgeDirection} is IN, the message will be sent to in-neighbors.
- * If the {@link EdgeDirection} is ALL, the message will be sent to all neighbors.
- *
- * @param m The message to send.
- */
- public void sendMessageToAllNeighbors(Message m) {
- if (edgesUsed) {
- throw new IllegalStateException("Can use either 'getEdges()' or 'sendMessageToAllNeighbors()'"
- + "exactly once.");
- }
-
- edgesUsed = true;
- outValue.f1 = m;
-
- while (edges.hasNext()) {
- Tuple next = (Tuple) edges.next();
-
- /*
- * When EdgeDirection is OUT, the edges iterator only has the out-edges
- * of the vertex, i.e. the ones where this vertex is src.
- * next.getField(1) gives the neighbor of the vertex running this MessagingFunction.
- */
- if (getDirection().equals(EdgeDirection.OUT)) {
- outValue.f0 = next.getField(1);
- }
- /*
- * When EdgeDirection is IN, the edges iterator only has the in-edges
- * of the vertex, i.e. the ones where this vertex is trg.
- * next.getField(10) gives the neighbor of the vertex running this MessagingFunction.
- */
- else if (getDirection().equals(EdgeDirection.IN)) {
- outValue.f0 = next.getField(0);
- }
- // When EdgeDirection is ALL, the edges iterator contains both in- and out- edges
- if (getDirection().equals(EdgeDirection.ALL)) {
- if (next.getField(0).equals(vertexId)) {
- // send msg to the trg
- outValue.f0 = next.getField(1);
- }
- else {
- // send msg to the src
- outValue.f0 = next.getField(0);
- }
- }
- out.collect(outValue);
- }
- }
-
- /**
- * Sends the given message to the vertex identified by the given key. If the target vertex does not exist,
- * the next superstep will cause an exception due to a non-deliverable message.
- *
- * @param target The key (id) of the target vertex to message.
- * @param m The message.
- */
- public void sendMessageTo(K target, Message m) {
- outValue.f0 = target;
- outValue.f1 = m;
- out.collect(outValue);
- }
-
- // --------------------------------------------------------------------------------------------
-
- /**
- * Gets the number of the superstep, starting at <tt>1</tt>.
- *
- * @return The number of the current superstep.
- */
- public int getSuperstepNumber() {
- return this.runtimeContext.getSuperstepNumber();
- }
-
- /**
- * Gets the iteration aggregator registered under the given name. The iteration aggregator combines
- * all aggregates globally once per superstep and makes them available in the next superstep.
- *
- * @param name The name of the aggregator.
- * @return The aggregator registered under this name, or null, if no aggregator was registered.
- */
- public <T extends Aggregator<?>> T getIterationAggregator(String name) {
- return this.runtimeContext.<T>getIterationAggregator(name);
- }
-
- /**
- * Get the aggregated value that an aggregator computed in the previous iteration.
- *
- * @param name The name of the aggregator.
- * @return The aggregated value of the previous iteration.
- */
- public <T extends Value> T getPreviousIterationAggregate(String name) {
- return this.runtimeContext.<T>getPreviousIterationAggregate(name);
- }
-
- /**
- * Gets the broadcast data set registered under the given name. Broadcast data sets
- * are available on all parallel instances of a function. They can be registered via
- * {@link org.apache.flink.graph.spargel.ScatterGatherConfiguration#addBroadcastSetForMessagingFunction(String, org.apache.flink.api.java.DataSet)}.
- *
- * @param name The name under which the broadcast set is registered.
- * @return The broadcast data set.
- */
- public <T> Collection<T> getBroadcastSet(String name) {
- return this.runtimeContext.<T>getBroadcastVariable(name);
- }
-
- // --------------------------------------------------------------------------------------------
- // internal methods and state
- // --------------------------------------------------------------------------------------------
-
- private Tuple2<K, Message> outValue;
-
- private IterationRuntimeContext runtimeContext;
-
- private Iterator<?> edges;
-
- private Collector<Tuple2<K, Message>> out;
-
- private K vertexId;
-
- private EdgesIterator<K, EV> edgeIterator;
-
- private boolean edgesUsed;
-
- private long inDegree = -1;
-
- private long outDegree = -1;
-
- void init(IterationRuntimeContext context) {
- this.runtimeContext = context;
- this.outValue = new Tuple2<K, Message>();
- this.edgeIterator = new EdgesIterator<K, EV>();
- }
-
- void set(Iterator<?> edges, Collector<Tuple2<K, Message>> out, K id) {
- this.edges = edges;
- this.out = out;
- this.vertexId = id;
- this.edgesUsed = false;
- }
-
- private static final class EdgesIterator<K, EV>
- implements Iterator<Edge<K, EV>>, Iterable<Edge<K, EV>>
- {
- private Iterator<Edge<K, EV>> input;
-
- private Edge<K, EV> edge = new Edge<K, EV>();
-
- void set(Iterator<Edge<K, EV>> input) {
- this.input = input;
- }
-
- @Override
- public boolean hasNext() {
- return input.hasNext();
- }
-
- @Override
- public Edge<K, EV> next() {
- Edge<K, EV> next = input.next();
- edge.setSource(next.f0);
- edge.setTarget(next.f1);
- edge.setValue(next.f2);
- return edge;
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
- @Override
- public Iterator<Edge<K, EV>> iterator() {
- return this;
- }
- }
-
- /**
- * Retrieves the vertex in-degree (number of in-coming edges).
- * @return The in-degree of this vertex
- */
- public long getInDegree() {
- return inDegree;
- }
-
- void setInDegree(long inDegree) {
- this.inDegree = inDegree;
- }
-
- /**
- * Retrieve the vertex out-degree (number of out-going edges).
- * @return The out-degree of this vertex
- */
- public long getOutDegree() {
- return outDegree;
- }
-
- void setOutDegree(long outDegree) {
- this.outDegree = outDegree;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/918e5d0c/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterFunction.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterFunction.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterFunction.java
new file mode 100644
index 0000000..336e73d
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterFunction.java
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.spargel;
+
+import org.apache.flink.api.common.aggregators.Aggregator;
+import org.apache.flink.api.common.functions.IterationRuntimeContext;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.EdgeDirection;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.types.Value;
+import org.apache.flink.util.Collector;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Iterator;
+
+/**
+ * The base class for functions that produce messages between vertices as a part of a {@link ScatterGatherIteration}.
+ *
+ * @param <K> The type of the vertex key (the vertex identifier).
+ * @param <VV> The type of the vertex value (the state of the vertex).
+ * @param <Message> The type of the message sent between vertices along the edges.
+ * @param <EV> The type of the values that are associated with the edges.
+ */
+public abstract class ScatterFunction<K, VV, Message, EV> implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ // --------------------------------------------------------------------------------------------
+ // Attributes that allow vertices to access their in/out degrees and the total number of vertices
+ // inside an iteration.
+ // --------------------------------------------------------------------------------------------
+
+ private long numberOfVertices = -1L;
+
+ /**
+ * Retrieves the number of vertices in the graph.
+ * @return the number of vertices if the {@link org.apache.flink.graph.IterationConfiguration#setOptNumVertices(boolean)}
+ * option has been set; -1 otherwise.
+ */
+ public long getNumberOfVertices() {
+ return numberOfVertices;
+ }
+
+ void setNumberOfVertices(long numberOfVertices) {
+ this.numberOfVertices = numberOfVertices;
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Attribute that allows the user to choose the neighborhood type(in/out/all) on which to run
+ // the scatter gather iteration.
+ // --------------------------------------------------------------------------------------------
+
+ private EdgeDirection direction;
+
+ /**
+ * Retrieves the edge direction in which messages are propagated in the scatter-gather iteration.
+ * @return the messaging {@link EdgeDirection}
+ */
+ public EdgeDirection getDirection() {
+ return direction;
+ }
+
+ void setDirection(EdgeDirection direction) {
+ this.direction = direction;
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Public API Methods
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * This method is invoked once per superstep for each vertex that was changed in that superstep.
+ * It needs to produce the messages that will be received by vertices in the next superstep.
+ *
+ * @param vertex The vertex that was changed.
+ *
+ * @throws Exception The computation may throw exceptions, which causes the superstep to fail.
+ */
+ public abstract void sendMessages(Vertex<K, VV> vertex) throws Exception;
+
+ /**
+ * This method is executed once per superstep before the scatter function is invoked for each vertex.
+ *
+ * @throws Exception Exceptions in the pre-superstep phase cause the superstep to fail.
+ */
+ public void preSuperstep() throws Exception {}
+
+ /**
+ * This method is executed once per superstep after the scatter function has been invoked for each vertex.
+ *
+ * @throws Exception Exceptions in the post-superstep phase cause the superstep to fail.
+ */
+ public void postSuperstep() throws Exception {}
+
+
+ /**
+ * Gets an {@link java.lang.Iterable} with all edges. This method is mutually exclusive with
+ * {@link #sendMessageToAllNeighbors(Object)} and may be called only once.
+ * <p>
+ * If the {@link EdgeDirection} is OUT (default), then this iterator contains outgoing edges.
+ * If the {@link EdgeDirection} is IN, then this iterator contains incoming edges.
+ * If the {@link EdgeDirection} is ALL, then this iterator contains both outgoing and incoming edges.
+ *
+ * @return An iterable over all edges.
+ */
+ @SuppressWarnings("unchecked")
+ public Iterable<Edge<K, EV>> getEdges() {
+ if (edgesUsed) {
+ throw new IllegalStateException("Can use either 'getEdges()' or 'sendMessageToAllNeighbors()' exactly once.");
+ }
+ edgesUsed = true;
+ this.edgeIterator.set((Iterator<Edge<K, EV>>) edges);
+ return this.edgeIterator;
+ }
+
+ /**
+ * Sends the given message to all vertices that are targets of an edge of the changed vertex.
+ * This method is mutually exclusive to the method {@link #getEdges()} and may be called only once.
+ * <p>
+ * If the {@link EdgeDirection} is OUT (default), the message will be sent to out-neighbors.
+ * If the {@link EdgeDirection} is IN, the message will be sent to in-neighbors.
+ * If the {@link EdgeDirection} is ALL, the message will be sent to all neighbors.
+ *
+ * @param m The message to send.
+ */
+ public void sendMessageToAllNeighbors(Message m) {
+ if (edgesUsed) {
+ throw new IllegalStateException("Can use either 'getEdges()' or 'sendMessageToAllNeighbors()'"
+ + " exactly once.");
+ }
+
+ edgesUsed = true;
+ outValue.f1 = m;
+
+ while (edges.hasNext()) {
+ Tuple next = (Tuple) edges.next();
+
+ /*
+ * When EdgeDirection is OUT, the edges iterator only has the out-edges
+ * of the vertex, i.e. the ones where this vertex is the source.
+ * next.getField(1) gives the neighbor of the vertex running this ScatterFunction.
+ */
+ if (getDirection().equals(EdgeDirection.OUT)) {
+ outValue.f0 = next.getField(1);
+ }
+ /*
+ * When EdgeDirection is IN, the edges iterator only has the in-edges
+ * of the vertex, i.e. the ones where this vertex is the target.
+ * next.getField(0) gives the neighbor of the vertex running this ScatterFunction.
+ */
+ else if (getDirection().equals(EdgeDirection.IN)) {
+ outValue.f0 = next.getField(0);
+ }
+ // When EdgeDirection is ALL, the edges iterator contains both in- and out-edges
+ else if (getDirection().equals(EdgeDirection.ALL)) {
+ if (next.getField(0).equals(vertexId)) {
+ // this vertex is the source: send the message to the target
+ outValue.f0 = next.getField(1);
+ }
+ else {
+ // this vertex is the target: send the message to the source
+ outValue.f0 = next.getField(0);
+ }
+ }
+ out.collect(outValue);
+ }
+ }
+
+ /**
+ * Sends the given message to the vertex identified by the given key. If the target vertex does not exist,
+ * the next superstep will cause an exception due to a non-deliverable message.
+ *
+ * @param target The key (id) of the target vertex to message.
+ * @param m The message.
+ */
+ public void sendMessageTo(K target, Message m) {
+ outValue.f0 = target;
+ outValue.f1 = m;
+ out.collect(outValue);
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Gets the number of the superstep, starting at <tt>1</tt>.
+ *
+ * @return The number of the current superstep.
+ */
+ public int getSuperstepNumber() {
+ return this.runtimeContext.getSuperstepNumber();
+ }
+
+ /**
+ * Gets the iteration aggregator registered under the given name. The iteration aggregator combines
+ * all aggregates globally once per superstep and makes them available in the next superstep.
+ *
+ * @param name The name of the aggregator.
+ * @return The aggregator registered under this name, or null, if no aggregator was registered.
+ */
+ public <T extends Aggregator<?>> T getIterationAggregator(String name) {
+ return this.runtimeContext.<T>getIterationAggregator(name);
+ }
+
+ /**
+ * Get the aggregated value that an aggregator computed in the previous iteration.
+ *
+ * @param name The name of the aggregator.
+ * @return The aggregated value of the previous iteration.
+ */
+ public <T extends Value> T getPreviousIterationAggregate(String name) {
+ return this.runtimeContext.<T>getPreviousIterationAggregate(name);
+ }
+
+ /**
+ * Gets the broadcast data set registered under the given name. Broadcast data sets
+ * are available on all parallel instances of a function. They can be registered via
+ * {@link org.apache.flink.graph.spargel.ScatterGatherConfiguration#addBroadcastSetForScatterFunction(String, org.apache.flink.api.java.DataSet)}.
+ *
+ * @param name The name under which the broadcast set is registered.
+ * @return The broadcast data set.
+ */
+ public <T> Collection<T> getBroadcastSet(String name) {
+ return this.runtimeContext.<T>getBroadcastVariable(name);
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // internal methods and state
+ // --------------------------------------------------------------------------------------------
+
+ private Tuple2<K, Message> outValue;
+
+ private IterationRuntimeContext runtimeContext;
+
+ private Iterator<?> edges;
+
+ private Collector<Tuple2<K, Message>> out;
+
+ private K vertexId;
+
+ private EdgesIterator<K, EV> edgeIterator;
+
+ private boolean edgesUsed;
+
+ private long inDegree = -1;
+
+ private long outDegree = -1;
+
+ void init(IterationRuntimeContext context) {
+ this.runtimeContext = context;
+ this.outValue = new Tuple2<K, Message>();
+ this.edgeIterator = new EdgesIterator<K, EV>();
+ }
+
+ void set(Iterator<?> edges, Collector<Tuple2<K, Message>> out, K id) {
+ this.edges = edges;
+ this.out = out;
+ this.vertexId = id;
+ this.edgesUsed = false;
+ }
+
+ private static final class EdgesIterator<K, EV>
+ implements Iterator<Edge<K, EV>>, Iterable<Edge<K, EV>>
+ {
+ private Iterator<Edge<K, EV>> input;
+
+ private Edge<K, EV> edge = new Edge<K, EV>();
+
+ void set(Iterator<Edge<K, EV>> input) {
+ this.input = input;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return input.hasNext();
+ }
+
+ @Override
+ public Edge<K, EV> next() {
+ Edge<K, EV> next = input.next();
+ edge.setSource(next.f0);
+ edge.setTarget(next.f1);
+ edge.setValue(next.f2);
+ return edge;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ @Override
+ public Iterator<Edge<K, EV>> iterator() {
+ return this;
+ }
+ }
+
+ /**
+ * Retrieves the vertex in-degree (number of in-coming edges).
+ * @return The in-degree of this vertex
+ */
+ public long getInDegree() {
+ return inDegree;
+ }
+
+ void setInDegree(long inDegree) {
+ this.inDegree = inDegree;
+ }
+
+ /**
+ * Retrieve the vertex out-degree (number of out-going edges).
+ * @return The out-degree of this vertex
+ */
+ public long getOutDegree() {
+ return outDegree;
+ }
+
+ void setOutDegree(long outDegree) {
+ this.outDegree = outDegree;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/918e5d0c/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherConfiguration.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherConfiguration.java
index 3a3de64..4ac1ae1 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherConfiguration.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherConfiguration.java
@@ -29,20 +29,20 @@ import java.util.List;
/**
* A ScatterGatherConfiguration object can be used to set the iteration name and
* degree of parallelism, to register aggregators and use broadcast sets in
- * the {@link org.apache.flink.graph.spargel.VertexUpdateFunction} and {@link org.apache.flink.graph.spargel.MessagingFunction}
+ * the {@link ScatterFunction} and {@link GatherFunction}.
*
* The VertexCentricConfiguration object is passed as an argument to
* {@link org.apache.flink.graph.Graph#runScatterGatherIteration (
- * org.apache.flink.graph.spargel.VertexUpdateFunction, org.apache.flink.graph.spargel.MessagingFunction, int,
+ * org.apache.flink.graph.spargel.GatherFunction, org.apache.flink.graph.spargel.ScatterFunction, int,
* ScatterGatherConfiguration)}.
*/
public class ScatterGatherConfiguration extends IterationConfiguration {
- /** the broadcast variables for the update function **/
- private List<Tuple2<String, DataSet<?>>> bcVarsUpdate = new ArrayList<Tuple2<String,DataSet<?>>>();
+ /** the broadcast variables for the scatter function **/
+ private List<Tuple2<String, DataSet<?>>> bcVarsScatter = new ArrayList<>();
- /** the broadcast variables for the messaging function **/
- private List<Tuple2<String, DataSet<?>>> bcVarsMessaging = new ArrayList<Tuple2<String,DataSet<?>>>();
+ /** the broadcast variables for the gather function **/
+ private List<Tuple2<String, DataSet<?>>> bcVarsGather = new ArrayList<>();
/** flag that defines whether the degrees option is set **/
private boolean optDegrees = false;
@@ -53,43 +53,43 @@ public class ScatterGatherConfiguration extends IterationConfiguration {
public ScatterGatherConfiguration() {}
/**
- * Adds a data set as a broadcast set to the messaging function.
+ * Adds a data set as a broadcast set to the scatter function.
*
- * @param name The name under which the broadcast data is available in the messaging function.
+ * @param name The name under which the broadcast data is available in the scatter function.
* @param data The data set to be broadcasted.
*/
- public void addBroadcastSetForMessagingFunction(String name, DataSet<?> data) {
- this.bcVarsMessaging.add(new Tuple2<String, DataSet<?>>(name, data));
+ public void addBroadcastSetForScatterFunction(String name, DataSet<?> data) {
+ this.bcVarsScatter.add(new Tuple2<String, DataSet<?>>(name, data));
}
/**
- * Adds a data set as a broadcast set to the vertex update function.
+ * Adds a data set as a broadcast set to the gather function.
*
- * @param name The name under which the broadcast data is available in the vertex update function.
+ * @param name The name under which the broadcast data is available in the gather function.
* @param data The data set to be broadcasted.
*/
- public void addBroadcastSetForUpdateFunction(String name, DataSet<?> data) {
- this.bcVarsUpdate.add(new Tuple2<String, DataSet<?>>(name, data));
+ public void addBroadcastSetForGatherFunction(String name, DataSet<?> data) {
+ this.bcVarsGather.add(new Tuple2<String, DataSet<?>>(name, data));
}
/**
- * Get the broadcast variables of the VertexUpdateFunction.
+ * Get the broadcast variables of the ScatterFunction.
*
* @return a List of Tuple2, where the first field is the broadcast variable name
* and the second field is the broadcast DataSet.
*/
- public List<Tuple2<String, DataSet<?>>> getUpdateBcastVars() {
- return this.bcVarsUpdate;
+ public List<Tuple2<String, DataSet<?>>> getScatterBcastVars() {
+ return this.bcVarsScatter;
}
/**
- * Get the broadcast variables of the MessagingFunction.
+ * Get the broadcast variables of the GatherFunction.
*
* @return a List of Tuple2, where the first field is the broadcast variable name
* and the second field is the broadcast DataSet.
*/
- public List<Tuple2<String, DataSet<?>>> getMessagingBcastVars() {
- return this.bcVarsMessaging;
+ public List<Tuple2<String, DataSet<?>>> getGatherBcastVars() {
+ return this.bcVarsGather;
}
/**
@@ -113,7 +113,7 @@ public class ScatterGatherConfiguration extends IterationConfiguration {
}
/**
- * Gets the direction in which messages are sent in the MessagingFunction.
+ * Gets the direction in which messages are sent in the ScatterFunction.
* By default the messaging direction is OUT.
*
* @return an EdgeDirection, which can be either IN, OUT or ALL.
@@ -123,7 +123,7 @@ public class ScatterGatherConfiguration extends IterationConfiguration {
}
/**
- * Sets the direction in which messages are sent in the MessagingFunction.
+ * Sets the direction in which messages are sent in the ScatterFunction.
* By default the messaging direction is OUT.
*
* @param direction - IN, OUT or ALL
http://git-wip-us.apache.org/repos/asf/flink/blob/918e5d0c/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherIteration.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherIteration.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherIteration.java
index fc5c210..fde305f 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherIteration.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherIteration.java
@@ -53,21 +53,21 @@ import java.util.Map;
* Scatter-Gather algorithms operate on graphs, which are defined through vertices and edges. The
* algorithms send messages along the edges and update the state of vertices based on
* the old state and the incoming messages. All vertices have an initial state.
- * The computation terminates once no vertex updates it state any more.
+ * The computation terminates once no vertex updates its state any more.
* Additionally, a maximum number of iterations (supersteps) may be specified.
* <p>
* The computation is here represented by two functions:
* <ul>
- * <li>The {@link VertexUpdateFunction} receives incoming messages and may updates the state for
+ * <li>The {@link GatherFunction} receives incoming messages and may update the state for
* the vertex. If a state is updated, messages are sent from this vertex. Initially, all vertices are
* considered updated.</li>
- * <li>The {@link MessagingFunction} takes the new vertex state and sends messages along the outgoing
+ * <li>The {@link ScatterFunction} takes the new vertex state and sends messages along the outgoing
* edges of the vertex. The outgoing edges may optionally have an associated value, such as a weight.</li>
* </ul>
* <p>
*
* Scatter-Gather graph iterations are are run by calling
- * {@link Graph#runScatterGatherIteration(VertexUpdateFunction, MessagingFunction, int)}.
+ * {@link Graph#runScatterGatherIteration(ScatterFunction, GatherFunction, int)}.
*
* @param <K> The type of the vertex key (the vertex identifier).
* @param <VV> The type of the vertex value (the state of the vertex).
@@ -77,47 +77,47 @@ import java.util.Map;
public class ScatterGatherIteration<K, VV, Message, EV>
implements CustomUnaryOperation<Vertex<K, VV>, Vertex<K, VV>>
{
- private final VertexUpdateFunction<K, VV, Message> updateFunction;
+ private final ScatterFunction<K, VV, Message, EV> scatterFunction;
+
+ private final GatherFunction<K, VV, Message> gatherFunction;
- private final MessagingFunction<K, VV, Message, EV> messagingFunction;
-
private final DataSet<Edge<K, EV>> edgesWithValue;
-
+
private final int maximumNumberOfIterations;
-
+
private final TypeInformation<Message> messageType;
-
+
private DataSet<Vertex<K, VV>> initialVertices;
private ScatterGatherConfiguration configuration;
// ----------------------------------------------------------------------------------
-
- private ScatterGatherIteration(VertexUpdateFunction<K, VV, Message> uf,
- MessagingFunction<K, VV, Message, EV> mf,
+
+ private ScatterGatherIteration(ScatterFunction<K, VV, Message, EV> sf,
+ GatherFunction<K, VV, Message> gf,
DataSet<Edge<K, EV>> edgesWithValue,
int maximumNumberOfIterations)
{
- Preconditions.checkNotNull(uf);
- Preconditions.checkNotNull(mf);
+ Preconditions.checkNotNull(sf);
+ Preconditions.checkNotNull(gf);
Preconditions.checkNotNull(edgesWithValue);
Preconditions.checkArgument(maximumNumberOfIterations > 0, "The maximum number of iterations must be at least one.");
- this.updateFunction = uf;
- this.messagingFunction = mf;
+ this.scatterFunction = sf;
+ this.gatherFunction = gf;
this.edgesWithValue = edgesWithValue;
- this.maximumNumberOfIterations = maximumNumberOfIterations;
- this.messageType = getMessageType(mf);
+ this.maximumNumberOfIterations = maximumNumberOfIterations;
+ this.messageType = getMessageType(sf);
}
-
- private TypeInformation<Message> getMessageType(MessagingFunction<K, VV, Message, EV> mf) {
- return TypeExtractor.createTypeInfo(mf, MessagingFunction.class, mf.getClass(), 2);
+
+ private TypeInformation<Message> getMessageType(ScatterFunction<K, VV, Message, EV> sf) {
+ return TypeExtractor.createTypeInfo(sf, ScatterFunction.class, sf.getClass(), 2);
}
-
+
// --------------------------------------------------------------------------------------------
// Custom Operator behavior
// --------------------------------------------------------------------------------------------
-
+
/**
* Sets the input data set for this operator. In the case of this operator this input data set represents
* the set of vertices with their initial state.
@@ -131,7 +131,7 @@ public class ScatterGatherIteration<K, VV, Message, EV>
public void setInput(DataSet<Vertex<K, VV>> inputData) {
this.initialVertices = inputData;
}
-
+
/**
* Creates the operator that represents this scatter-gather graph computation.
*
@@ -145,14 +145,14 @@ public class ScatterGatherIteration<K, VV, Message, EV>
// prepare some type information
TypeInformation<K> keyType = ((TupleTypeInfo<?>) initialVertices.getType()).getTypeAt(0);
- TypeInformation<Tuple2<K, Message>> messageTypeInfo = new TupleTypeInfo<Tuple2<K,Message>>(keyType, messageType);
+ TypeInformation<Tuple2<K, Message>> messageTypeInfo = new TupleTypeInfo<>(keyType, messageType);
// create a graph
Graph<K, VV, EV> graph =
Graph.fromDataSet(initialVertices, edgesWithValue, initialVertices.getExecutionEnvironment());
// check whether the numVertices option is set and, if so, compute the total number of vertices
- // and set it within the messaging and update functions
+ // and set it within the scatter and gather functions
DataSet<LongValue> numberOfVertices = null;
if (this.configuration != null && this.configuration.isOptNumVertices()) {
@@ -164,13 +164,13 @@ public class ScatterGatherIteration<K, VV, Message, EV>
}
if(this.configuration != null) {
- messagingFunction.setDirection(this.configuration.getDirection());
+ scatterFunction.setDirection(this.configuration.getDirection());
} else {
- messagingFunction.setDirection(EdgeDirection.OUT);
+ scatterFunction.setDirection(EdgeDirection.OUT);
}
// retrieve the direction in which the updates are made and in which the messages are sent
- EdgeDirection messagingDirection = messagingFunction.getDirection();
+ EdgeDirection messagingDirection = scatterFunction.getDirection();
// check whether the degrees option is set and, if so, compute the in and the out degrees and
// add them to the vertex value
@@ -186,9 +186,9 @@ public class ScatterGatherIteration<K, VV, Message, EV>
* a weight or distance).
*
* @param edgesWithValue The data set containing edges.
- * @param uf The function that updates the state of the vertices from the incoming messages.
- * @param mf The function that turns changed vertex states into messages along the edges.
- *
+ * @param sf The function that turns changed vertex states into messages along the edges.
+ * @param gf The function that updates the state of the vertices from the incoming messages.
+ *
* @param <K> The type of the vertex key (the vertex identifier).
* @param <VV> The type of the vertex value (the state of the vertex).
* @param <Message> The type of the message sent between vertices along the edges.
@@ -196,14 +196,11 @@ public class ScatterGatherIteration<K, VV, Message, EV>
*
* @return An in stance of the scatter-gather graph computation operator.
*/
- public static final <K, VV, Message, EV>
- ScatterGatherIteration<K, VV, Message, EV> withEdges(
- DataSet<Edge<K, EV>> edgesWithValue,
- VertexUpdateFunction<K, VV, Message> uf,
- MessagingFunction<K, VV, Message, EV> mf,
- int maximumNumberOfIterations)
+ public static final <K, VV, Message, EV> ScatterGatherIteration<K, VV, Message, EV> withEdges(
+ DataSet<Edge<K, EV>> edgesWithValue, ScatterFunction<K, VV, Message, EV> sf,
+ GatherFunction<K, VV, Message> gf, int maximumNumberOfIterations)
{
- return new ScatterGatherIteration<K, VV, Message, EV>(uf, mf, edgesWithValue, maximumNumberOfIterations);
+ return new ScatterGatherIteration<>(sf, gf, edgesWithValue, maximumNumberOfIterations);
}
/**
@@ -226,23 +223,122 @@ public class ScatterGatherIteration<K, VV, Message, EV>
// Wrapping UDFs
// --------------------------------------------------------------------------------------------
- private static abstract class VertexUpdateUdf<K, VVWithDegrees, Message> extends RichCoGroupFunction<
+ /*
+ * UDF that encapsulates the message sending function for graphs where the edges have an associated value.
+ */
+ private static abstract class ScatterUdfWithEdgeValues<K, VVWithDegrees, VV, Message, EV>
+ extends RichCoGroupFunction<Edge<K, EV>, Vertex<K, VVWithDegrees>, Tuple2<K, Message>>
+ implements ResultTypeQueryable<Tuple2<K, Message>>
+ {
+ private static final long serialVersionUID = 1L;
+
+ final ScatterFunction<K, VV, Message, EV> scatterFunction;
+
+ private transient TypeInformation<Tuple2<K, Message>> resultType;
+
+
+ private ScatterUdfWithEdgeValues(ScatterFunction<K, VV, Message, EV> scatterFunction,
+ TypeInformation<Tuple2<K, Message>> resultType)
+ {
+ this.scatterFunction = scatterFunction;
+ this.resultType = resultType;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ if (getRuntimeContext().hasBroadcastVariable("number of vertices")) {
+ Collection<LongValue> numberOfVertices = getRuntimeContext().getBroadcastVariable("number of vertices");
+ this.scatterFunction.setNumberOfVertices(numberOfVertices.iterator().next().getValue());
+ }
+ if (getIterationRuntimeContext().getSuperstepNumber() == 1) {
+ this.scatterFunction.init(getIterationRuntimeContext());
+ }
+ this.scatterFunction.preSuperstep();
+ }
+
+ @Override
+ public void close() throws Exception {
+ this.scatterFunction.postSuperstep();
+ }
+
+ @Override
+ public TypeInformation<Tuple2<K, Message>> getProducedType() {
+ return this.resultType;
+ }
+ }
+
+ @SuppressWarnings("serial")
+ private static final class ScatterUdfWithEVsSimpleVV<K, VV, Message, EV>
+ extends ScatterUdfWithEdgeValues<K, VV, VV, Message, EV> {
+
+ private ScatterUdfWithEVsSimpleVV(ScatterFunction<K, VV, Message, EV> scatterFunction,
+ TypeInformation<Tuple2<K, Message>> resultType) {
+ super(scatterFunction, resultType);
+ }
+
+ @Override
+ public void coGroup(Iterable<Edge<K, EV>> edges,
+ Iterable<Vertex<K, VV>> state,
+ Collector<Tuple2<K, Message>> out) throws Exception {
+ final Iterator<Vertex<K, VV>> stateIter = state.iterator();
+
+ if (stateIter.hasNext()) {
+ Vertex<K, VV> newVertexState = stateIter.next();
+ scatterFunction.set(edges.iterator(), out, newVertexState.getId());
+ scatterFunction.sendMessages(newVertexState);
+ }
+ }
+ }
+
+ @SuppressWarnings("serial")
+ private static final class ScatterUdfWithEVsVVWithDegrees<K, VV, Message, EV>
+ extends ScatterUdfWithEdgeValues<K, Tuple3<VV, LongValue, LongValue>, VV, Message, EV> {
+
+ private Vertex<K, VV> nextVertex = new Vertex<>();
+
+ private ScatterUdfWithEVsVVWithDegrees(ScatterFunction<K, VV, Message, EV> scatterFunction,
+ TypeInformation<Tuple2<K, Message>> resultType) {
+ super(scatterFunction, resultType);
+ }
+
+ @Override
+ public void coGroup(Iterable<Edge<K, EV>> edges, Iterable<Vertex<K, Tuple3<VV, LongValue, LongValue>>> state,
+ Collector<Tuple2<K, Message>> out) throws Exception {
+
+ final Iterator<Vertex<K, Tuple3<VV, LongValue, LongValue>>> stateIter = state.iterator();
+
+ if (stateIter.hasNext()) {
+ Vertex<K, Tuple3<VV, LongValue, LongValue>> vertexWithDegrees = stateIter.next();
+
+ nextVertex.setField(vertexWithDegrees.f0, 0);
+ nextVertex.setField(vertexWithDegrees.f1.f0, 1);
+
+ scatterFunction.setInDegree(vertexWithDegrees.f1.f1.getValue());
+ scatterFunction.setOutDegree(vertexWithDegrees.f1.f2.getValue());
+
+ scatterFunction.set(edges.iterator(), out, vertexWithDegrees.getId());
+ scatterFunction.sendMessages(nextVertex);
+ }
+ }
+ }
+
+ private static abstract class GatherUdf<K, VVWithDegrees, Message> extends RichCoGroupFunction<
Tuple2<K, Message>, Vertex<K, VVWithDegrees>, Vertex<K, VVWithDegrees>>
implements ResultTypeQueryable<Vertex<K, VVWithDegrees>>
{
private static final long serialVersionUID = 1L;
-
- final VertexUpdateFunction<K, VVWithDegrees, Message> vertexUpdateFunction;
- final MessageIterator<Message> messageIter = new MessageIterator<Message>();
-
+ final GatherFunction<K, VVWithDegrees, Message> gatherFunction;
+
+ final MessageIterator<Message> messageIter = new MessageIterator<>();
+
private transient TypeInformation<Vertex<K, VVWithDegrees>> resultType;
-
-
- private VertexUpdateUdf(VertexUpdateFunction<K, VVWithDegrees, Message> vertexUpdateFunction,
+
+
+ private GatherUdf(GatherFunction<K, VVWithDegrees, Message> gatherFunction,
TypeInformation<Vertex<K, VVWithDegrees>> resultType)
{
- this.vertexUpdateFunction = vertexUpdateFunction;
+ this.gatherFunction = gatherFunction;
this.resultType = resultType;
}
@@ -250,17 +346,17 @@ public class ScatterGatherIteration<K, VV, Message, EV>
public void open(Configuration parameters) throws Exception {
if (getRuntimeContext().hasBroadcastVariable("number of vertices")) {
Collection<LongValue> numberOfVertices = getRuntimeContext().getBroadcastVariable("number of vertices");
- this.vertexUpdateFunction.setNumberOfVertices(numberOfVertices.iterator().next().getValue());
+ this.gatherFunction.setNumberOfVertices(numberOfVertices.iterator().next().getValue());
}
if (getIterationRuntimeContext().getSuperstepNumber() == 1) {
- this.vertexUpdateFunction.init(getIterationRuntimeContext());
+ this.gatherFunction.init(getIterationRuntimeContext());
}
- this.vertexUpdateFunction.preSuperstep();
+ this.gatherFunction.preSuperstep();
}
-
+
@Override
public void close() throws Exception {
- this.vertexUpdateFunction.postSuperstep();
+ this.gatherFunction.postSuperstep();
}
@Override
@@ -270,10 +366,10 @@ public class ScatterGatherIteration<K, VV, Message, EV>
}
@SuppressWarnings("serial")
- private static final class VertexUpdateUdfSimpleVV<K, VV, Message> extends VertexUpdateUdf<K, VV, Message> {
+ private static final class GatherUdfSimpleVV<K, VV, Message> extends GatherUdf<K, VV, Message> {
- private VertexUpdateUdfSimpleVV(VertexUpdateFunction<K, VV, Message> vertexUpdateFunction, TypeInformation<Vertex<K, VV>> resultType) {
- super(vertexUpdateFunction, resultType);
+ private GatherUdfSimpleVV(GatherFunction<K, VV, Message> gatherFunction, TypeInformation<Vertex<K, VV>> resultType) {
+ super(gatherFunction, resultType);
}
@Override
@@ -289,8 +385,8 @@ public class ScatterGatherIteration<K, VV, Message, EV>
Iterator<Tuple2<?, Message>> downcastIter = (Iterator<Tuple2<?, Message>>) (Iterator<?>) messages.iterator();
messageIter.setSource(downcastIter);
- vertexUpdateFunction.setOutput(vertexState, out);
- vertexUpdateFunction.updateVertex(vertexState, messageIter);
+ gatherFunction.setOutput(vertexState, out);
+ gatherFunction.updateVertex(vertexState, messageIter);
}
else {
final Iterator<Tuple2<K, Message>> messageIter = messages.iterator();
@@ -299,7 +395,7 @@ public class ScatterGatherIteration<K, VV, Message, EV>
try {
Tuple2<K, Message> next = messageIter.next();
message = "Target vertex '" + next.f0 + "' does not exist!.";
- } catch (Throwable t) {}
+ } catch (Throwable ignored) {}
throw new Exception(message);
} else {
throw new Exception();
@@ -309,31 +405,31 @@ public class ScatterGatherIteration<K, VV, Message, EV>
}
@SuppressWarnings("serial")
- private static final class VertexUpdateUdfVVWithDegrees<K, VV, Message> extends VertexUpdateUdf<K, Tuple3<VV, LongValue, LongValue>, Message> {
+ private static final class GatherUdfVVWithDegrees<K, VV, Message> extends GatherUdf<K, Tuple3<VV, LongValue, LongValue>, Message> {
- private VertexUpdateUdfVVWithDegrees(VertexUpdateFunction<K, Tuple3<VV, LongValue, LongValue>, Message> vertexUpdateFunction,
+ private GatherUdfVVWithDegrees(GatherFunction<K, Tuple3<VV, LongValue, LongValue>, Message> gatherFunction,
TypeInformation<Vertex<K, Tuple3<VV, LongValue, LongValue>>> resultType) {
- super(vertexUpdateFunction, resultType);
+ super(gatherFunction, resultType);
}
-
+
@Override
public void coGroup(Iterable<Tuple2<K, Message>> messages, Iterable<Vertex<K, Tuple3<VV, LongValue, LongValue>>> vertex,
Collector<Vertex<K, Tuple3<VV, LongValue, LongValue>>> out) throws Exception {
final Iterator<Vertex<K, Tuple3<VV, LongValue, LongValue>>> vertexIter = vertex.iterator();
-
+
if (vertexIter.hasNext()) {
Vertex<K, Tuple3<VV, LongValue, LongValue>> vertexWithDegrees = vertexIter.next();
-
+
@SuppressWarnings("unchecked")
Iterator<Tuple2<?, Message>> downcastIter = (Iterator<Tuple2<?, Message>>) (Iterator<?>) messages.iterator();
messageIter.setSource(downcastIter);
- vertexUpdateFunction.setInDegree(vertexWithDegrees.f1.f1.getValue());
- vertexUpdateFunction.setOutDegree(vertexWithDegrees.f1.f2.getValue());
+ gatherFunction.setInDegree(vertexWithDegrees.f1.f1.getValue());
+ gatherFunction.setOutDegree(vertexWithDegrees.f1.f2.getValue());
- vertexUpdateFunction.setOutputWithDegrees(vertexWithDegrees, out);
- vertexUpdateFunction.updateVertexFromScatterGatherIteration(vertexWithDegrees, messageIter);
+ gatherFunction.setOutputWithDegrees(vertexWithDegrees, out);
+ gatherFunction.updateVertexFromScatterGatherIteration(vertexWithDegrees, messageIter);
}
else {
final Iterator<Tuple2<K, Message>> messageIter = messages.iterator();
@@ -342,7 +438,7 @@ public class ScatterGatherIteration<K, VV, Message, EV>
try {
Tuple2<K, Message> next = messageIter.next();
message = "Target vertex '" + next.f0 + "' does not exist!.";
- } catch (Throwable t) {}
+ } catch (Throwable ignored) {}
throw new Exception(message);
} else {
throw new Exception();
@@ -351,112 +447,12 @@ public class ScatterGatherIteration<K, VV, Message, EV>
}
}
- /*
- * UDF that encapsulates the message sending function for graphs where the edges have an associated value.
- */
- private static abstract class MessagingUdfWithEdgeValues<K, VVWithDegrees, VV, Message, EV>
- extends RichCoGroupFunction<Edge<K, EV>, Vertex<K, VVWithDegrees>, Tuple2<K, Message>>
- implements ResultTypeQueryable<Tuple2<K, Message>>
- {
- private static final long serialVersionUID = 1L;
-
- final MessagingFunction<K, VV, Message, EV> messagingFunction;
-
- private transient TypeInformation<Tuple2<K, Message>> resultType;
-
-
- private MessagingUdfWithEdgeValues(MessagingFunction<K, VV, Message, EV> messagingFunction,
- TypeInformation<Tuple2<K, Message>> resultType)
- {
- this.messagingFunction = messagingFunction;
- this.resultType = resultType;
- }
-
- @Override
- public void open(Configuration parameters) throws Exception {
- if (getRuntimeContext().hasBroadcastVariable("number of vertices")) {
- Collection<LongValue> numberOfVertices = getRuntimeContext().getBroadcastVariable("number of vertices");
- this.messagingFunction.setNumberOfVertices(numberOfVertices.iterator().next().getValue());
- }
- if (getIterationRuntimeContext().getSuperstepNumber() == 1) {
- this.messagingFunction.init(getIterationRuntimeContext());
- }
- this.messagingFunction.preSuperstep();
- }
-
- @Override
- public void close() throws Exception {
- this.messagingFunction.postSuperstep();
- }
-
- @Override
- public TypeInformation<Tuple2<K, Message>> getProducedType() {
- return this.resultType;
- }
- }
-
- @SuppressWarnings("serial")
- private static final class MessagingUdfWithEVsSimpleVV<K, VV, Message, EV>
- extends MessagingUdfWithEdgeValues<K, VV, VV, Message, EV> {
-
- private MessagingUdfWithEVsSimpleVV(MessagingFunction<K, VV, Message, EV> messagingFunction,
- TypeInformation<Tuple2<K, Message>> resultType) {
- super(messagingFunction, resultType);
- }
-
- @Override
- public void coGroup(Iterable<Edge<K, EV>> edges,
- Iterable<Vertex<K, VV>> state,
- Collector<Tuple2<K, Message>> out) throws Exception {
- final Iterator<Vertex<K, VV>> stateIter = state.iterator();
-
- if (stateIter.hasNext()) {
- Vertex<K, VV> newVertexState = stateIter.next();
- messagingFunction.set((Iterator<?>) edges.iterator(), out, newVertexState.getId());
- messagingFunction.sendMessages(newVertexState);
- }
- }
- }
-
- @SuppressWarnings("serial")
- private static final class MessagingUdfWithEVsVVWithDegrees<K, VV, Message, EV>
- extends MessagingUdfWithEdgeValues<K, Tuple3<VV, LongValue, LongValue>, VV, Message, EV> {
-
- private Vertex<K, VV> nextVertex = new Vertex<K, VV>();
-
- private MessagingUdfWithEVsVVWithDegrees(MessagingFunction<K, VV, Message, EV> messagingFunction,
- TypeInformation<Tuple2<K, Message>> resultType) {
- super(messagingFunction, resultType);
- }
-
- @Override
- public void coGroup(Iterable<Edge<K, EV>> edges, Iterable<Vertex<K, Tuple3<VV, LongValue, LongValue>>> state,
- Collector<Tuple2<K, Message>> out) throws Exception {
-
- final Iterator<Vertex<K, Tuple3<VV, LongValue, LongValue>>> stateIter = state.iterator();
-
- if (stateIter.hasNext()) {
- Vertex<K, Tuple3<VV, LongValue, LongValue>> vertexWithDegrees = stateIter.next();
-
- nextVertex.setField(vertexWithDegrees.f0, 0);
- nextVertex.setField(vertexWithDegrees.f1.f0, 1);
-
- messagingFunction.setInDegree(vertexWithDegrees.f1.f1.getValue());
- messagingFunction.setOutDegree(vertexWithDegrees.f1.f2.getValue());
-
- messagingFunction.set((Iterator<?>) edges.iterator(), out, vertexWithDegrees.getId());
- messagingFunction.sendMessages(nextVertex);
- }
- }
- }
-
-
// --------------------------------------------------------------------------------------------
// UTIL methods
// --------------------------------------------------------------------------------------------
/**
- * Method that builds the messaging function using a coGroup operator for a simple vertex(without
+ * Method that builds the scatter function using a coGroup operator for a simple vertex (without
* degrees).
* It afterwards configures the function with a custom name and broadcast variables.
*
@@ -464,17 +460,17 @@ public class ScatterGatherIteration<K, VV, Message, EV>
* @param messageTypeInfo
* @param whereArg the argument for the where within the coGroup
* @param equalToArg the argument for the equalTo within the coGroup
- * @return the messaging function
+ * @return the scatter function
*/
- private CoGroupOperator<?, ?, Tuple2<K, Message>> buildMessagingFunction(
+ private CoGroupOperator<?, ?, Tuple2<K, Message>> buildScatterFunction(
DeltaIteration<Vertex<K, VV>, Vertex<K, VV>> iteration,
TypeInformation<Tuple2<K, Message>> messageTypeInfo, int whereArg, int equalToArg,
DataSet<LongValue> numberOfVertices) {
- // build the messaging function (co group)
+ // build the scatter function (co group)
CoGroupOperator<?, ?, Tuple2<K, Message>> messages;
- MessagingUdfWithEdgeValues<K, VV, VV, Message, EV> messenger =
- new MessagingUdfWithEVsSimpleVV<K, VV, Message, EV>(messagingFunction, messageTypeInfo);
+ ScatterUdfWithEdgeValues<K, VV, VV, Message, EV> messenger =
+ new ScatterUdfWithEVsSimpleVV<>(scatterFunction, messageTypeInfo);
messages = this.edgesWithValue.coGroup(iteration.getWorkset()).where(whereArg)
.equalTo(equalToArg).with(messenger);
@@ -482,7 +478,7 @@ public class ScatterGatherIteration<K, VV, Message, EV>
// configure coGroup message function with name and broadcast variables
messages = messages.name("Messaging");
if(this.configuration != null) {
- for (Tuple2<String, DataSet<?>> e : this.configuration.getMessagingBcastVars()) {
+ for (Tuple2<String, DataSet<?>> e : this.configuration.getScatterBcastVars()) {
messages = messages.withBroadcastSet(e.f1, e.f0);
}
if (this.configuration.isOptNumVertices()) {
@@ -494,7 +490,7 @@ public class ScatterGatherIteration<K, VV, Message, EV>
}
/**
- * Method that builds the messaging function using a coGroup operator for a vertex
+ * Method that builds the scatter function using a coGroup operator for a vertex
* containing degree information.
* It afterwards configures the function with a custom name and broadcast variables.
*
@@ -502,17 +498,17 @@ public class ScatterGatherIteration<K, VV, Message, EV>
* @param messageTypeInfo
* @param whereArg the argument for the where within the coGroup
* @param equalToArg the argument for the equalTo within the coGroup
- * @return the messaging function
+ * @return the scatter function
*/
- private CoGroupOperator<?, ?, Tuple2<K, Message>> buildMessagingFunctionVerticesWithDegrees(
+ private CoGroupOperator<?, ?, Tuple2<K, Message>> buildScatterFunctionVerticesWithDegrees(
DeltaIteration<Vertex<K, Tuple3<VV, LongValue, LongValue>>, Vertex<K, Tuple3<VV, LongValue, LongValue>>> iteration,
TypeInformation<Tuple2<K, Message>> messageTypeInfo, int whereArg, int equalToArg,
DataSet<LongValue> numberOfVertices) {
- // build the messaging function (co group)
+ // build the scatter function (co group)
CoGroupOperator<?, ?, Tuple2<K, Message>> messages;
- MessagingUdfWithEdgeValues<K, Tuple3<VV, LongValue, LongValue>, VV, Message, EV> messenger =
- new MessagingUdfWithEVsVVWithDegrees<K, VV, Message, EV>(messagingFunction, messageTypeInfo);
+ ScatterUdfWithEdgeValues<K, Tuple3<VV, LongValue, LongValue>, VV, Message, EV> messenger =
+ new ScatterUdfWithEVsVVWithDegrees<>(scatterFunction, messageTypeInfo);
messages = this.edgesWithValue.coGroup(iteration.getWorkset()).where(whereArg)
.equalTo(equalToArg).with(messenger);
@@ -521,7 +517,7 @@ public class ScatterGatherIteration<K, VV, Message, EV>
messages = messages.name("Messaging");
if (this.configuration != null) {
- for (Tuple2<String, DataSet<?>> e : this.configuration.getMessagingBcastVars()) {
+ for (Tuple2<String, DataSet<?>> e : this.configuration.getScatterBcastVars()) {
messages = messages.withBroadcastSet(e.f1, e.f0);
}
if (this.configuration.isOptNumVertices()) {
@@ -543,7 +539,7 @@ public class ScatterGatherIteration<K, VV, Message, EV>
// set up the iteration operator
if (this.configuration != null) {
- iteration.name(this.configuration.getName("Scatter-gather iteration (" + updateFunction + " | " + messagingFunction + ")"));
+ iteration.name(this.configuration.getName("Scatter-gather iteration (" + gatherFunction + " | " + scatterFunction + ")"));
iteration.parallelism(this.configuration.getParallelism());
iteration.setSolutionSetUnManaged(this.configuration.isSolutionSetUnmanagedMemory());
@@ -554,7 +550,7 @@ public class ScatterGatherIteration<K, VV, Message, EV>
}
else {
// no configuration provided; set default name
- iteration.name("Scatter-gather iteration (" + updateFunction + " | " + messagingFunction + ")");
+ iteration.name("Scatter-gather iteration (" + gatherFunction + " | " + scatterFunction + ")");
}
}
@@ -579,21 +575,21 @@ public class ScatterGatherIteration<K, VV, Message, EV>
switch (messagingDirection) {
case IN:
- messages = buildMessagingFunction(iteration, messageTypeInfo, 1, 0, numberOfVertices);
+ messages = buildScatterFunction(iteration, messageTypeInfo, 1, 0, numberOfVertices);
break;
case OUT:
- messages = buildMessagingFunction(iteration, messageTypeInfo, 0, 0, numberOfVertices);
+ messages = buildScatterFunction(iteration, messageTypeInfo, 0, 0, numberOfVertices);
break;
case ALL:
- messages = buildMessagingFunction(iteration, messageTypeInfo, 1, 0, numberOfVertices)
- .union(buildMessagingFunction(iteration, messageTypeInfo, 0, 0, numberOfVertices)) ;
+ messages = buildScatterFunction(iteration, messageTypeInfo, 1, 0, numberOfVertices)
+ .union(buildScatterFunction(iteration, messageTypeInfo, 0, 0, numberOfVertices)) ;
break;
default:
throw new IllegalArgumentException("Illegal edge direction");
}
- VertexUpdateUdf<K, VV, Message> updateUdf =
- new VertexUpdateUdfSimpleVV<K, VV, Message>(updateFunction, vertexTypes);
+ GatherUdf<K, VV, Message> updateUdf =
+ new GatherUdfSimpleVV<K, VV, Message>(gatherFunction, vertexTypes);
// build the update function (co group)
CoGroupOperator<?, ?, Vertex<K, VV>> updates =
@@ -624,7 +620,7 @@ public class ScatterGatherIteration<K, VV, Message, EV>
DataSet<Tuple2<K, Message>> messages;
- this.updateFunction.setOptDegrees(this.configuration.isOptDegrees());
+ this.gatherFunction.setOptDegrees(this.configuration.isOptDegrees());
DataSet<Tuple2<K, LongValue>> inDegrees = graph.inDegrees();
DataSet<Tuple2<K, LongValue>> outDegrees = graph.outDegrees();
@@ -634,7 +630,7 @@ public class ScatterGatherIteration<K, VV, Message, EV>
@Override
public void join(Tuple2<K, LongValue> first, Tuple2<K, LongValue> second, Collector<Tuple3<K, LongValue, LongValue>> out) {
- out.collect(new Tuple3<K, LongValue, LongValue>(first.f0, first.f1, second.f1));
+ out.collect(new Tuple3<>(first.f0, first.f1, second.f1));
}
}).withForwardedFieldsFirst("f0;f1").withForwardedFieldsSecond("f1");
@@ -644,9 +640,8 @@ public class ScatterGatherIteration<K, VV, Message, EV>
@Override
public void join(Vertex<K, VV> vertex, Tuple3<K, LongValue, LongValue> degrees,
Collector<Vertex<K, Tuple3<VV, LongValue, LongValue>>> out) throws Exception {
-
- out.collect(new Vertex<K, Tuple3<VV, LongValue, LongValue>>(vertex.getId(),
- new Tuple3<VV, LongValue, LongValue>(vertex.getValue(), degrees.f1, degrees.f2)));
+ out.collect(new Vertex<>(vertex.getId(),
+ new Tuple3<>(vertex.getValue(), degrees.f1, degrees.f2)));
}
}).withForwardedFieldsFirst("f0");
@@ -659,22 +654,22 @@ public class ScatterGatherIteration<K, VV, Message, EV>
switch (messagingDirection) {
case IN:
- messages = buildMessagingFunctionVerticesWithDegrees(iteration, messageTypeInfo, 1, 0, numberOfVertices);
+ messages = buildScatterFunctionVerticesWithDegrees(iteration, messageTypeInfo, 1, 0, numberOfVertices);
break;
case OUT:
- messages = buildMessagingFunctionVerticesWithDegrees(iteration, messageTypeInfo, 0, 0, numberOfVertices);
+ messages = buildScatterFunctionVerticesWithDegrees(iteration, messageTypeInfo, 0, 0, numberOfVertices);
break;
case ALL:
- messages = buildMessagingFunctionVerticesWithDegrees(iteration, messageTypeInfo, 1, 0, numberOfVertices)
- .union(buildMessagingFunctionVerticesWithDegrees(iteration, messageTypeInfo, 0, 0, numberOfVertices)) ;
+ messages = buildScatterFunctionVerticesWithDegrees(iteration, messageTypeInfo, 1, 0, numberOfVertices)
+ .union(buildScatterFunctionVerticesWithDegrees(iteration, messageTypeInfo, 0, 0, numberOfVertices)) ;
break;
default:
throw new IllegalArgumentException("Illegal edge direction");
}
@SuppressWarnings({ "unchecked", "rawtypes" })
- VertexUpdateUdf<K, Tuple3<VV, LongValue, LongValue>, Message> updateUdf =
- new VertexUpdateUdfVVWithDegrees(updateFunction, vertexTypes);
+ GatherUdf<K, Tuple3<VV, LongValue, LongValue>, Message> updateUdf =
+ new GatherUdfVVWithDegrees(gatherFunction, vertexTypes);
// build the update function (co group)
CoGroupOperator<?, ?, Vertex<K, Tuple3<VV, LongValue, LongValue>>> updates =
@@ -690,7 +685,7 @@ public class ScatterGatherIteration<K, VV, Message, EV>
new MapFunction<Vertex<K, Tuple3<VV, LongValue, LongValue>>, Vertex<K, VV>>() {
public Vertex<K, VV> map(Vertex<K, Tuple3<VV, LongValue, LongValue>> vertex) {
- return new Vertex<K, VV>(vertex.getId(), vertex.getValue().f0);
+ return new Vertex<>(vertex.getId(), vertex.getValue().f0);
}
});
}
@@ -700,7 +695,7 @@ public class ScatterGatherIteration<K, VV, Message, EV>
// configure coGroup update function with name and broadcast variables
updates = updates.name("Vertex State Updates");
if (this.configuration != null) {
- for (Tuple2<String, DataSet<?>> e : this.configuration.getUpdateBcastVars()) {
+ for (Tuple2<String, DataSet<?>> e : this.configuration.getGatherBcastVars()) {
updates = updates.withBroadcastSet(e.f1, e.f0);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/918e5d0c/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java
deleted file mode 100644
index 9085432..0000000
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java
+++ /dev/null
@@ -1,251 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.spargel;
-
-import java.io.Serializable;
-import java.util.Collection;
-
-import org.apache.flink.api.common.aggregators.Aggregator;
-import org.apache.flink.api.common.functions.IterationRuntimeContext;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.types.Value;
-import org.apache.flink.util.Collector;
-
-/**
- * This class must be extended by functions that compute the state of the vertex depending on the old state and the
- * incoming messages. The central method is {@link #updateVertex(Vertex, MessageIterator)}, which is
- * invoked once per vertex per superstep.
- *
- * {@code <K>} The vertex key type.
- * {@code <VV>} The vertex value type.
- * {@code <Message>} The message type.
- */
-public abstract class VertexUpdateFunction<K, VV, Message> implements Serializable {
-
- private static final long serialVersionUID = 1L;
-
- // --------------------------------------------------------------------------------------------
- // Attributes that allow vertices to access their in/out degrees and the total number of vertices
- // inside an iteration.
- // --------------------------------------------------------------------------------------------
-
- private long numberOfVertices = -1L;
-
- /**
- * Retrieves the number of vertices in the graph.
- * @return the number of vertices if the {@link org.apache.flink.graph.IterationConfiguration#setOptNumVertices(boolean)}
- * option has been set; -1 otherwise.
- */
- public long getNumberOfVertices() {
- return numberOfVertices;
- }
-
- void setNumberOfVertices(long numberOfVertices) {
- this.numberOfVertices = numberOfVertices;
- }
-
- //---------------------------------------------------------------------------------------------
-
- private boolean optDegrees;
-
- boolean isOptDegrees() {
- return optDegrees;
- }
-
- void setOptDegrees(boolean optDegrees) {
- this.optDegrees = optDegrees;
- }
-
- // --------------------------------------------------------------------------------------------
- // Public API Methods
- // --------------------------------------------------------------------------------------------
-
- /**
- * This method is invoked once per vertex per superstep. It receives the current state of the vertex, as well as
- * the incoming messages. It may set a new vertex state via {@link #setNewVertexValue(Object)}. If the vertex
- * state is changed, it will trigger the sending of messages via the {@link MessagingFunction}.
- *
- * @param vertex The vertex.
- * @param inMessages The incoming messages to this vertex.
- *
- * @throws Exception The computation may throw exceptions, which causes the superstep to fail.
- */
- public abstract void updateVertex(Vertex<K, VV> vertex, MessageIterator<Message> inMessages) throws Exception;
-
- /**
- * This method is executed one per superstep before the vertex update function is invoked for each vertex.
- *
- * @throws Exception Exceptions in the pre-superstep phase cause the superstep to fail.
- */
- public void preSuperstep() throws Exception {}
-
- /**
- * This method is executed one per superstep after the vertex update function has been invoked for each vertex.
- *
- * @throws Exception Exceptions in the post-superstep phase cause the superstep to fail.
- */
- public void postSuperstep() throws Exception {}
-
- /**
- * Sets the new value of this vertex. Setting a new value triggers the sending of outgoing messages from this vertex.
- *
- * This should be called at most once per updateVertex.
- *
- * @param newValue The new vertex value.
- */
- public void setNewVertexValue(VV newValue) {
- if(setNewVertexValueCalled) {
- throw new IllegalStateException("setNewVertexValue should only be called at most once per updateVertex");
- }
- setNewVertexValueCalled = true;
- if(isOptDegrees()) {
- outValWithDegrees.f1.f0 = newValue;
- outWithDegrees.collect(outValWithDegrees);
- } else {
- outVal.setValue(newValue);
- out.collect(outVal);
- }
- }
-
- /**
- * Gets the number of the superstep, starting at <tt>1</tt>.
- *
- * @return The number of the current superstep.
- */
- public int getSuperstepNumber() {
- return this.runtimeContext.getSuperstepNumber();
- }
-
- /**
- * Gets the iteration aggregator registered under the given name. The iteration aggregator combines
- * all aggregates globally once per superstep and makes them available in the next superstep.
- *
- * @param name The name of the aggregator.
- * @return The aggregator registered under this name, or null, if no aggregator was registered.
- */
- public <T extends Aggregator<?>> T getIterationAggregator(String name) {
- return this.runtimeContext.<T>getIterationAggregator(name);
- }
-
- /**
- * Get the aggregated value that an aggregator computed in the previous iteration.
- *
- * @param name The name of the aggregator.
- * @return The aggregated value of the previous iteration.
- */
- public <T extends Value> T getPreviousIterationAggregate(String name) {
- return this.runtimeContext.<T>getPreviousIterationAggregate(name);
- }
-
- /**
- * Gets the broadcast data set registered under the given name. Broadcast data sets
- * are available on all parallel instances of a function. They can be registered via
- * {@link org.apache.flink.graph.spargel.ScatterGatherConfiguration#addBroadcastSetForUpdateFunction(String, org.apache.flink.api.java.DataSet)}.
- *
- * @param name The name under which the broadcast set is registered.
- * @return The broadcast data set.
- */
- public <T> Collection<T> getBroadcastSet(String name) {
- return this.runtimeContext.<T>getBroadcastVariable(name);
- }
-
- // --------------------------------------------------------------------------------------------
- // internal methods
- // --------------------------------------------------------------------------------------------
-
- private IterationRuntimeContext runtimeContext;
-
- private Collector<Vertex<K, VV>> out;
-
- private Collector<Vertex<K, Tuple3<VV, Long, Long>>> outWithDegrees;
-
- private Vertex<K, VV> outVal;
-
- private Vertex<K, Tuple3<VV, Long, Long>> outValWithDegrees;
-
- private long inDegree = -1;
-
- private long outDegree = -1;
-
- private boolean setNewVertexValueCalled;
-
- void init(IterationRuntimeContext context) {
- this.runtimeContext = context;
- }
-
- void setOutput(Vertex<K, VV> outVal, Collector<Vertex<K, VV>> out) {
- this.outVal = outVal;
- this.out = out;
- setNewVertexValueCalled = false;
- }
-
- @SuppressWarnings({ "unchecked", "rawtypes" })
- <ValueWithDegree> void setOutputWithDegrees(Vertex<K, ValueWithDegree> outVal,
- Collector out) {
- this.outValWithDegrees = (Vertex<K, Tuple3<VV, Long, Long>>) outVal;
- this.outWithDegrees = out;
- setNewVertexValueCalled = false;
- }
-
- /**
- * Retrieves the vertex in-degree (number of in-coming edges).
- * @return The in-degree of this vertex
- */
- public long getInDegree() {
- return inDegree;
- }
-
- void setInDegree(long inDegree) {
- this.inDegree = inDegree;
- }
-
- /**
- * Retrieve the vertex out-degree (number of out-going edges).
- * @return The out-degree of this vertex
- */
- public long getOutDegree() {
- return outDegree;
- }
-
- void setOutDegree(long outDegree) {
- this.outDegree = outDegree;
- }
-
- /**
- * In order to hide the Tuple3(actualValue, inDegree, OutDegree) vertex value from the user,
- * another function will be called from {@link org.apache.flink.graph.spargel.ScatterGatherIteration}.
- *
- * This function will retrieve the vertex from the vertexState and will set its degrees, afterwards calling
- * the regular updateVertex function.
- *
- * @param vertexState
- * @param inMessages
- * @throws Exception
- */
- @SuppressWarnings("unchecked")
- <VertexWithDegree> void updateVertexFromScatterGatherIteration(Vertex<K, VertexWithDegree> vertexState,
- MessageIterator<Message> inMessages) throws Exception {
-
- Vertex<K, VV> vertex = new Vertex<K, VV>(vertexState.f0,
- ((Tuple3<VV, Long, Long>)vertexState.getValue()).f0);
-
- updateVertex(vertex, inMessages);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/918e5d0c/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelCompilerTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelCompilerTest.java
index 3a750af..14c2fb4 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelCompilerTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelCompilerTest.java
@@ -75,8 +75,8 @@ public class SpargelCompilerTest extends CompilerTestBase {
Graph<Long, Long, NullValue> graph = Graph.fromDataSet(initialVertices, edges, env);
DataSet<Vertex<Long, Long>> result = graph.runScatterGatherIteration(
- new ConnectedComponents.CCUpdater<Long, Long>(),
- new ConnectedComponents.CCMessenger<Long, Long>(BasicTypeInfo.LONG_TYPE_INFO), 100)
+ new ConnectedComponents.CCMessenger<Long, Long>(BasicTypeInfo.LONG_TYPE_INFO),
+ new ConnectedComponents.CCUpdater<Long, Long>(), 100)
.getVertices();
result.output(new DiscardingOutputFormat<Vertex<Long, Long>>());
@@ -157,12 +157,12 @@ public class SpargelCompilerTest extends CompilerTestBase {
Graph<Long, Long, NullValue> graph = Graph.fromDataSet(initialVertices, edges, env);
ScatterGatherConfiguration parameters = new ScatterGatherConfiguration();
- parameters.addBroadcastSetForMessagingFunction(BC_VAR_NAME, bcVar);
- parameters.addBroadcastSetForUpdateFunction(BC_VAR_NAME, bcVar);
+ parameters.addBroadcastSetForScatterFunction(BC_VAR_NAME, bcVar);
+ parameters.addBroadcastSetForGatherFunction(BC_VAR_NAME, bcVar);
DataSet<Vertex<Long, Long>> result = graph.runScatterGatherIteration(
- new ConnectedComponents.CCUpdater<Long, Long>(),
- new ConnectedComponents.CCMessenger<Long, Long>(BasicTypeInfo.LONG_TYPE_INFO), 100)
+ new ConnectedComponents.CCMessenger<Long, Long>(BasicTypeInfo.LONG_TYPE_INFO),
+ new ConnectedComponents.CCUpdater<Long, Long>(), 100)
.getVertices();
result.output(new DiscardingOutputFormat<Vertex<Long, Long>>());