You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by va...@apache.org on 2015/05/19 23:03:36 UTC
[07/10] flink git commit: [FLINK-1523] [gelly] added getIn/Out
degrees methods in update and messaging functions;
deleted VertexWithValue type; deleted InaccessibleMethodException;
if the options are not set, -1 is returned; added missing javadocs
[FLINK-1523] [gelly] added getIn/Out degrees methods in update and messaging functions;
deleted VertexWithValue type;
deleted InaccessibleMethodException; if the options are not set, -1 is returned;
added missing javadocs;
added tests;
renamed type parameters VertexKey -> K, VertexValue -> VV, EdgeValue -> EV.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e2f28d47
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e2f28d47
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e2f28d47
Branch: refs/heads/master
Commit: e2f28d47f6108c49c02ccba8ded7c70d39bab4bb
Parents: e172067
Author: vasia <va...@apache.org>
Authored: Mon May 11 15:21:53 2015 +0200
Committer: vasia <va...@apache.org>
Committed: Tue May 19 22:38:04 2015 +0200
----------------------------------------------------------------------
.../graph/InaccessibleMethodException.java | 32 --
.../apache/flink/graph/VertexWithDegrees.java | 72 ----
.../graph/example/IncrementalSSSPExample.java | 7 +-
.../library/ConnectedComponentsAlgorithm.java | 9 +-
.../flink/graph/spargel/MessagingFunction.java | 107 +++---
.../graph/spargel/VertexCentricIteration.java | 330 +++++++++----------
.../graph/spargel/VertexUpdateFunction.java | 87 +++--
.../test/CollectionModeSuperstepITCase.java | 1 -
.../test/GatherSumApplyConfigurationITCase.java | 1 -
.../test/VertexCentricConfigurationITCase.java | 285 ++++++++++------
...CentricConfigurationWithExceptionITCase.java | 190 -----------
...ctedComponentsWithRandomisedEdgesITCase.java | 1 +
.../test/example/IncrementalSSSPITCase.java | 4 +-
13 files changed, 466 insertions(+), 660 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/e2f28d47/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/InaccessibleMethodException.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/InaccessibleMethodException.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/InaccessibleMethodException.java
deleted file mode 100644
index de018de..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/InaccessibleMethodException.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph;
-
-/**
- * The exception that gets thrown when the degree option or the number of vertices
- * option in {@link org.apache.flink.graph.spargel.IterationConfiguration} was not set.
- */
-public class InaccessibleMethodException extends Exception {
-
- public InaccessibleMethodException() {}
-
- public InaccessibleMethodException(String text) {
- super(text);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/e2f28d47/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/VertexWithDegrees.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/VertexWithDegrees.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/VertexWithDegrees.java
deleted file mode 100644
index b692475..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/VertexWithDegrees.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph;
-
-import java.io.Serializable;
-
-/**
- * Represents the graph's nodes. It carries an ID and a value as well as the vertex inDegree and outDegree.
- * For vertices with no value, use {@link org.apache.flink.types.NullValue} as the value type.
- *
- * @param <K>
- * @param <V>
- */
-public class VertexWithDegrees<K extends Comparable<K> & Serializable, V extends Serializable>
- extends Vertex<K, V> {
-
- private long inDegree;
-
- private long outDegree;
-
- public VertexWithDegrees() {
- super();
- inDegree = -1l;
- outDegree = -1l;
- }
-
- public VertexWithDegrees(K k, V v) {
- super(k,v);
- inDegree = 0l;
- outDegree = 0l;
- }
-
- public Long getInDegree() throws Exception{
- if(inDegree == -1) {
- throw new InaccessibleMethodException("The degree option was not set. To access the degrees, " +
- "call iterationConfiguration.setOptDegrees(true).");
- }
- return inDegree;
- }
-
- public void setInDegree(Long inDegree) {
- this.inDegree = inDegree;
- }
-
- public Long getOutDegree() throws Exception{
- if(outDegree == -1) {
- throw new InaccessibleMethodException("The degree option was not set. To access the degrees, " +
- "call iterationConfiguration.setOptDegrees(true).");
- }
- return outDegree;
- }
-
- public void setOutDegree(Long outDegree) {
- this.outDegree = outDegree;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/e2f28d47/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/IncrementalSSSPExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/IncrementalSSSPExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/IncrementalSSSPExample.java
index ec43207..365518c 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/IncrementalSSSPExample.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/IncrementalSSSPExample.java
@@ -26,11 +26,10 @@ import org.apache.flink.graph.Edge;
import org.apache.flink.graph.EdgeDirection;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.VertexWithDegrees;
import org.apache.flink.graph.example.utils.IncrementalSSSPData;
-import org.apache.flink.graph.spargel.IterationConfiguration;
import org.apache.flink.graph.spargel.MessageIterator;
import org.apache.flink.graph.spargel.MessagingFunction;
+import org.apache.flink.graph.spargel.VertexCentricConfiguration;
import org.apache.flink.graph.spargel.VertexUpdateFunction;
import org.apache.flink.graph.utils.Tuple2ToVertexMap;
import org.apache.flink.graph.utils.Tuple3ToEdgeMap;
@@ -95,7 +94,7 @@ public class IncrementalSSSPExample implements ProgramDescription {
graph.removeEdge(edgeToBeRemoved);
// configure the iteration
- IterationConfiguration parameters = new IterationConfiguration();
+ VertexCentricConfiguration parameters = new VertexCentricConfiguration();
if(isInSSSP(edgeToBeRemoved, edgesInSSSP)) {
@@ -160,7 +159,7 @@ public class IncrementalSSSPExample implements ProgramDescription {
@Override
public void updateVertex(Vertex<Long, Double> vertex, MessageIterator<Double> inMessages) throws Exception {
if (inMessages.hasNext()) {
- Long outDegree = ((VertexWithDegrees)vertex).getOutDegree() - 1;
+ Long outDegree = getOutDegree() - 1;
// check if the vertex has another SP-Edge
if (outDegree > 0) {
// there is another shortest path from the source to this vertex
http://git-wip-us.apache.org/repos/asf/flink/blob/e2f28d47/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponentsAlgorithm.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponentsAlgorithm.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponentsAlgorithm.java
index a2ba2ac..7b536e5 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponentsAlgorithm.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponentsAlgorithm.java
@@ -20,6 +20,7 @@ package org.apache.flink.graph.library;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.spargel.MessageIterator;
import org.apache.flink.graph.spargel.MessagingFunction;
import org.apache.flink.graph.spargel.VertexUpdateFunction;
@@ -60,7 +61,7 @@ public class ConnectedComponentsAlgorithm implements GraphAlgorithm<Long, Long,
public static final class CCUpdater extends VertexUpdateFunction<Long, Long, Long> {
@Override
- public void updateVertex(Long id, Long currentMin, MessageIterator<Long> messages) throws Exception {
+ public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> messages) throws Exception {
long min = Long.MAX_VALUE;
for (long msg : messages) {
@@ -68,7 +69,7 @@ public class ConnectedComponentsAlgorithm implements GraphAlgorithm<Long, Long,
}
// update vertex value, if new minimum
- if (min < currentMin) {
+ if (min < vertex.getValue()) {
setNewVertexValue(min);
}
}
@@ -80,9 +81,9 @@ public class ConnectedComponentsAlgorithm implements GraphAlgorithm<Long, Long,
public static final class CCMessenger extends MessagingFunction<Long, Long, Long, NullValue> {
@Override
- public void sendMessages(Long id, Long currentMin) throws Exception {
+ public void sendMessages(Vertex<Long, Long> vertex) throws Exception {
// send current minimum to neighbors
- sendMessageToAllNeighbors(currentMin);
+ sendMessageToAllNeighbors(vertex.getValue());
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e2f28d47/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java
index b3092fd..4245c24 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java
@@ -26,24 +26,21 @@ import org.apache.flink.api.common.aggregators.Aggregator;
import org.apache.flink.api.common.functions.IterationRuntimeContext;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.EdgeDirection;
-import org.apache.flink.graph.InaccessibleMethodException;
import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.VertexWithDegrees;
import org.apache.flink.types.Value;
import org.apache.flink.util.Collector;
/**
* The base class for functions that produce messages between vertices as a part of a {@link VertexCentricIteration}.
*
- * @param <VertexKey> The type of the vertex key (the vertex identifier).
- * @param <VertexValue> The type of the vertex value (the state of the vertex).
+ * @param <K> The type of the vertex key (the vertex identifier).
+ * @param <VV> The type of the vertex value (the state of the vertex).
* @param <Message> The type of the message sent between vertices along the edges.
- * @param <EdgeValue> The type of the values that are associated with the edges.
+ * @param <EV> The type of the values that are associated with the edges.
*/
-public abstract class MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> implements Serializable {
+public abstract class MessagingFunction<K, VV, Message, EV> implements Serializable {
private static final long serialVersionUID = 1L;
@@ -54,11 +51,12 @@ public abstract class MessagingFunction<VertexKey, VertexValue, Message, EdgeVal
private long numberOfVertices = -1L;
- public long getNumberOfVertices() throws Exception{
- if (numberOfVertices == -1) {
- throw new InaccessibleMethodException("The number of vertices option is not set. " +
- "To access the number of vertices, call iterationConfiguration.setOptNumVertices(true).");
- }
+ /**
+ * Retrieves the number of vertices in the graph.
+ * @return the number of vertices if the {@link IterationConfiguration#setOptNumVertices(boolean)}
+ * option has been set; -1 otherwise.
+ */
+ public long getNumberOfVertices() {
return numberOfVertices;
}
@@ -73,11 +71,15 @@ public abstract class MessagingFunction<VertexKey, VertexValue, Message, EdgeVal
private EdgeDirection direction;
+ /**
+ * Retrieves the edge direction in which messages are propagated in the vertex-centric iteration.
+ * @return the messaging {@link EdgeDirection}
+ */
public EdgeDirection getDirection() {
return direction;
}
- public void setDirection(EdgeDirection direction) {
+ void setDirection(EdgeDirection direction) {
this.direction = direction;
}
@@ -93,7 +95,7 @@ public abstract class MessagingFunction<VertexKey, VertexValue, Message, EdgeVal
*
* @throws Exception The computation may throw exceptions, which causes the superstep to fail.
*/
- public abstract void sendMessages(Vertex<VertexKey, VertexValue> vertex) throws Exception;
+ public abstract void sendMessages(Vertex<K, VV> vertex) throws Exception;
/**
* This method is executed one per superstep before the vertex update function is invoked for each vertex.
@@ -117,12 +119,12 @@ public abstract class MessagingFunction<VertexKey, VertexValue, Message, EdgeVal
* @return An iterator with all outgoing edges.
*/
@SuppressWarnings("unchecked")
- public Iterable<Edge<VertexKey, EdgeValue>> getEdges() {
+ public Iterable<Edge<K, EV>> getEdges() {
if (edgesUsed) {
throw new IllegalStateException("Can use either 'getEdges()' or 'sendMessageToAllTargets()' exactly once.");
}
edgesUsed = true;
- this.edgeIterator.set((Iterator<Edge<VertexKey, EdgeValue>>) edges);
+ this.edgeIterator.set((Iterator<Edge<K, EV>>) edges);
return this.edgeIterator;
}
@@ -143,7 +145,7 @@ public abstract class MessagingFunction<VertexKey, VertexValue, Message, EdgeVal
while (edges.hasNext()) {
Tuple next = (Tuple) edges.next();
- VertexKey k = next.getField(1);
+ K k = next.getField(1);
outValue.f0 = k;
out.collect(outValue);
}
@@ -156,7 +158,7 @@ public abstract class MessagingFunction<VertexKey, VertexValue, Message, EdgeVal
* @param target The key (id) of the target vertex to message.
* @param m The message.
*/
- public void sendMessageTo(VertexKey target, Message m) {
+ public void sendMessageTo(K target, Message m) {
outValue.f0 = target;
outValue.f1 = m;
out.collect(outValue);
@@ -210,39 +212,42 @@ public abstract class MessagingFunction<VertexKey, VertexValue, Message, EdgeVal
// internal methods and state
// --------------------------------------------------------------------------------------------
- private Tuple2<VertexKey, Message> outValue;
+ private Tuple2<K, Message> outValue;
private IterationRuntimeContext runtimeContext;
private Iterator<?> edges;
- private Collector<Tuple2<VertexKey, Message>> out;
+ private Collector<Tuple2<K, Message>> out;
- private EdgesIterator<VertexKey, EdgeValue> edgeIterator;
+ private EdgesIterator<K, EV> edgeIterator;
private boolean edgesUsed;
-
+
+ private long inDegree = -1;
+
+ private long outDegree = -1;
void init(IterationRuntimeContext context) {
this.runtimeContext = context;
- this.outValue = new Tuple2<VertexKey, Message>();
- this.edgeIterator = new EdgesIterator<VertexKey, EdgeValue>();
+ this.outValue = new Tuple2<K, Message>();
+ this.edgeIterator = new EdgesIterator<K, EV>();
}
- void set(Iterator<?> edges, Collector<Tuple2<VertexKey, Message>> out) {
+ void set(Iterator<?> edges, Collector<Tuple2<K, Message>> out) {
this.edges = edges;
this.out = out;
this.edgesUsed = false;
}
- private static final class EdgesIterator<VertexKey, EdgeValue>
- implements Iterator<Edge<VertexKey, EdgeValue>>, Iterable<Edge<VertexKey, EdgeValue>>
+ private static final class EdgesIterator<K, EV>
+ implements Iterator<Edge<K, EV>>, Iterable<Edge<K, EV>>
{
- private Iterator<Edge<VertexKey, EdgeValue>> input;
+ private Iterator<Edge<K, EV>> input;
- private Edge<VertexKey, EdgeValue> edge = new Edge<VertexKey, EdgeValue>();
+ private Edge<K, EV> edge = new Edge<K, EV>();
- void set(Iterator<Edge<VertexKey, EdgeValue>> input) {
+ void set(Iterator<Edge<K, EV>> input) {
this.input = input;
}
@@ -252,8 +257,8 @@ public abstract class MessagingFunction<VertexKey, VertexValue, Message, EdgeVal
}
@Override
- public Edge<VertexKey, EdgeValue> next() {
- Edge<VertexKey, EdgeValue> next = input.next();
+ public Edge<K, EV> next() {
+ Edge<K, EV> next = input.next();
edge.setSource(next.f0);
edge.setTarget(next.f1);
edge.setValue(next.f2);
@@ -265,28 +270,34 @@ public abstract class MessagingFunction<VertexKey, VertexValue, Message, EdgeVal
throw new UnsupportedOperationException();
}
@Override
- public Iterator<Edge<VertexKey, EdgeValue>> iterator() {
+ public Iterator<Edge<K, EV>> iterator() {
return this;
}
}
/**
- * In order to hide the Tuple3(actualValue, inDegree, outDegree) vertex value from the user,
- * another function will be called from {@link org.apache.flink.graph.spargel.VertexCentricIteration}.
- *
- * This function will retrieve the vertex from the vertexState and will set its degrees, afterwards calling
- * the regular sendMessages function.
- *
- * @param newVertexState
- * @throws Exception
+ * Retrieves the vertex in-degree (number of in-coming edges).
+ * @return The in-degree of this vertex if the {@link IterationConfiguration#setOptDegrees(boolean)}
+ * option has been set; -1 otherwise.
*/
- void sendMessagesFromVertexCentricIteration(Vertex<VertexKey, Tuple3<VertexValue, Long, Long>> newVertexState)
- throws Exception {
- VertexWithDegrees<VertexKey, VertexValue> vertex = new VertexWithDegrees<VertexKey, VertexValue>(newVertexState.getId(),
- newVertexState.getValue().f0);
- vertex.setInDegree(newVertexState.getValue().f1);
- vertex.setOutDegree(newVertexState.getValue().f2);
+ public long getInDegree() {
+ return inDegree;
+ }
+
+ void setInDegree(long inDegree) {
+ this.inDegree = inDegree;
+ }
+
+ /**
+ * Retrieves the vertex out-degree (number of out-going edges).
+ * @return The out-degree of this vertex if the {@link IterationConfiguration#setOptDegrees(boolean)}
+ * option has been set; -1 otherwise.
+ */
+ public long getOutDegree() {
+ return outDegree;
+ }
- sendMessages(vertex);
+ void setOutDegree(long outDegree) {
+ this.outDegree = outDegree;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e2f28d47/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java
index 915a3ee..f8e64b6 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java
@@ -29,10 +29,8 @@ import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.common.functions.RichCoGroupFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.operators.CoGroupOperator;
import org.apache.flink.api.java.operators.CustomUnaryOperation;
-import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
@@ -43,7 +41,6 @@ import org.apache.flink.graph.Edge;
import org.apache.flink.graph.EdgeDirection;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.VertexWithDegrees;
import org.apache.flink.util.Collector;
import com.google.common.base.Preconditions;
@@ -72,34 +69,33 @@ import com.google.common.base.Preconditions;
* Vertex-centric graph iterations are are run by calling
* {@link Graph#runVertexCentricIteration(VertexUpdateFunction, MessagingFunction, int)}.
*
- * @param <VertexKey> The type of the vertex key (the vertex identifier).
- * @param <VertexValue> The type of the vertex value (the state of the vertex).
+ * @param <K> The type of the vertex key (the vertex identifier).
+ * @param <VV> The type of the vertex value (the state of the vertex).
* @param <Message> The type of the message sent between vertices along the edges.
- * @param <EdgeValue> The type of the values that are associated with the edges.
+ * @param <EV> The type of the values that are associated with the edges.
*/
-public class VertexCentricIteration<VertexKey, VertexValue, Message, EdgeValue>
- implements CustomUnaryOperation<Vertex<VertexKey, VertexValue>, Vertex<VertexKey, VertexValue>>
+public class VertexCentricIteration<K, VV, Message, EV>
+ implements CustomUnaryOperation<Vertex<K, VV>, Vertex<K, VV>>
{
- private final VertexUpdateFunction<VertexKey, VertexValue, Message> updateFunction;
+ private final VertexUpdateFunction<K, VV, Message> updateFunction;
- private final MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> messagingFunction;
+ private final MessagingFunction<K, VV, Message, EV> messagingFunction;
- private final DataSet<Edge<VertexKey, EdgeValue>> edgesWithValue;
+ private final DataSet<Edge<K, EV>> edgesWithValue;
private final int maximumNumberOfIterations;
private final TypeInformation<Message> messageType;
- private DataSet<Vertex<VertexKey, VertexValue>> initialVertices;
+ private DataSet<Vertex<K, VV>> initialVertices;
private VertexCentricConfiguration configuration;
- private DataSet<Vertex<VertexKey, Tuple3<VertexValue, Long, Long>>> verticesWithDegrees;
// ----------------------------------------------------------------------------------
- private VertexCentricIteration(VertexUpdateFunction<VertexKey, VertexValue, Message> uf,
- MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> mf,
- DataSet<Edge<VertexKey, EdgeValue>> edgesWithValue,
+ private VertexCentricIteration(VertexUpdateFunction<K, VV, Message> uf,
+ MessagingFunction<K, VV, Message, EV> mf,
+ DataSet<Edge<K, EV>> edgesWithValue,
int maximumNumberOfIterations)
{
Preconditions.checkNotNull(uf);
@@ -114,7 +110,7 @@ public class VertexCentricIteration<VertexKey, VertexValue, Message, EdgeValue>
this.messageType = getMessageType(mf);
}
- private TypeInformation<Message> getMessageType(MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> mf) {
+ private TypeInformation<Message> getMessageType(MessagingFunction<K, VV, Message, EV> mf) {
return TypeExtractor.createTypeInfo(MessagingFunction.class, mf.getClass(), 2, null, null);
}
@@ -132,7 +128,7 @@ public class VertexCentricIteration<VertexKey, VertexValue, Message, EdgeValue>
* @see org.apache.flink.api.java.operators.CustomUnaryOperation#setInput(org.apache.flink.api.java.DataSet)
*/
@Override
- public void setInput(DataSet<Vertex<VertexKey, VertexValue>> inputData) {
+ public void setInput(DataSet<Vertex<K, VV>> inputData) {
this.initialVertices = inputData;
}
@@ -142,17 +138,17 @@ public class VertexCentricIteration<VertexKey, VertexValue, Message, EdgeValue>
* @return The operator that represents this vertex-centric graph computation.
*/
@Override
- public DataSet<Vertex<VertexKey, VertexValue>> createResult() {
+ public DataSet<Vertex<K, VV>> createResult() {
if (this.initialVertices == null) {
throw new IllegalStateException("The input data set has not been set.");
}
// prepare some type information
- TypeInformation<VertexKey> keyType = ((TupleTypeInfo<?>) initialVertices.getType()).getTypeAt(0);
- TypeInformation<Tuple2<VertexKey, Message>> messageTypeInfo = new TupleTypeInfo<Tuple2<VertexKey,Message>>(keyType, messageType);
+ TypeInformation<K> keyType = ((TupleTypeInfo<?>) initialVertices.getType()).getTypeAt(0);
+ TypeInformation<Tuple2<K, Message>> messageTypeInfo = new TupleTypeInfo<Tuple2<K,Message>>(keyType, messageType);
// create a graph
- Graph<VertexKey, VertexValue, EdgeValue> graph =
+ Graph<K, VV, EV> graph =
Graph.fromDataSet(initialVertices, edgesWithValue, ExecutionEnvironment.getExecutionEnvironment());
// check whether the numVertices option is set and, if so, compute the total number of vertices
@@ -194,21 +190,21 @@ public class VertexCentricIteration<VertexKey, VertexValue, Message, EdgeValue>
* @param uf The function that updates the state of the vertices from the incoming messages.
* @param mf The function that turns changed vertex states into messages along the edges.
*
- * @param <VertexKey> The type of the vertex key (the vertex identifier).
- * @param <VertexValue> The type of the vertex value (the state of the vertex).
+ * @param <K> The type of the vertex key (the vertex identifier).
+ * @param <VV> The type of the vertex value (the state of the vertex).
* @param <Message> The type of the message sent between vertices along the edges.
- * @param <EdgeValue> The type of the values that are associated with the edges.
+ * @param <EV> The type of the values that are associated with the edges.
*
* @return An in stance of the vertex-centric graph computation operator.
*/
- public static final <VertexKey, VertexValue, Message, EdgeValue>
- VertexCentricIteration<VertexKey, VertexValue, Message, EdgeValue> withEdges(
- DataSet<Edge<VertexKey, EdgeValue>> edgesWithValue,
- VertexUpdateFunction<VertexKey, VertexValue, Message> uf,
- MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> mf,
+ public static final <K, VV, Message, EV>
+ VertexCentricIteration<K, VV, Message, EV> withEdges(
+ DataSet<Edge<K, EV>> edgesWithValue,
+ VertexUpdateFunction<K, VV, Message> uf,
+ MessagingFunction<K, VV, Message, EV> mf,
int maximumNumberOfIterations)
{
- return new VertexCentricIteration<VertexKey, VertexValue, Message, EdgeValue>(uf, mf, edgesWithValue, maximumNumberOfIterations);
+ return new VertexCentricIteration<K, VV, Message, EV>(uf, mf, edgesWithValue, maximumNumberOfIterations);
}
/**
@@ -231,21 +227,21 @@ public class VertexCentricIteration<VertexKey, VertexValue, Message, EdgeValue>
// Wrapping UDFs
// --------------------------------------------------------------------------------------------
- private static final class VertexUpdateUdf<VertexKey, VertexValue, Message>
- extends RichCoGroupFunction<Tuple2<VertexKey, Message>, Vertex<VertexKey, VertexValue>, Vertex<VertexKey, VertexValue>>
- implements ResultTypeQueryable<Vertex<VertexKey, VertexValue>>
+ private static abstract class VertexUpdateUdf<K, VVWithDegrees, Message> extends RichCoGroupFunction<
+ Tuple2<K, Message>, Vertex<K, VVWithDegrees>, Vertex<K, VVWithDegrees>>
+ implements ResultTypeQueryable<Vertex<K, VVWithDegrees>>
{
private static final long serialVersionUID = 1L;
- final VertexUpdateFunction<VertexKey, VertexValue, Message> vertexUpdateFunction;
+ final VertexUpdateFunction<K, VVWithDegrees, Message> vertexUpdateFunction;
final MessageIterator<Message> messageIter = new MessageIterator<Message>();
- private transient TypeInformation<Vertex<VertexKey, VV>> resultType;
+ private transient TypeInformation<Vertex<K, VVWithDegrees>> resultType;
- private VertexUpdateUdf(VertexUpdateFunction<VertexKey, VertexValue, Message> vertexUpdateFunction,
- TypeInformation<Vertex<VertexKey, VV>> resultType)
+ private VertexUpdateUdf(VertexUpdateFunction<K, VVWithDegrees, Message> vertexUpdateFunction,
+ TypeInformation<Vertex<K, VVWithDegrees>> resultType)
{
this.vertexUpdateFunction = vertexUpdateFunction;
this.resultType = resultType;
@@ -265,27 +261,26 @@ public class VertexCentricIteration<VertexKey, VertexValue, Message, EdgeValue>
}
@Override
- public TypeInformation<Vertex<VertexKey, VV>> getProducedType() {
+ public TypeInformation<Vertex<K, VVWithDegrees>> getProducedType() {
return this.resultType;
}
}
- private static final class VertexUpdateUdfSimpleVertexValue<VertexKey, VertexValue, Message>
- extends VertexUpdateUdf<VertexKey, VertexValue, Message> {
+ @SuppressWarnings("serial")
+ private static final class VertexUpdateUdfSimpleVV<K, VV, Message> extends VertexUpdateUdf<K, VV, Message> {
-
- private VertexUpdateUdfSimpleVertexValue(VertexUpdateFunction<VertexKey, VertexValue, Message> vertexUpdateFunction, TypeInformation<Vertex<VertexKey, VertexValue>> resultType) {
+ private VertexUpdateUdfSimpleVV(VertexUpdateFunction<K, VV, Message> vertexUpdateFunction, TypeInformation<Vertex<K, VV>> resultType) {
super(vertexUpdateFunction, resultType);
}
@Override
- public void coGroup(Iterable<Tuple2<VertexKey, Message>> messages,
- Iterable<Vertex<VertexKey, VertexValue>> vertex,
- Collector<Vertex<VertexKey, VertexValue>> out) throws Exception {
- final Iterator<Vertex<VertexKey, VertexValue>> vertexIter = vertex.iterator();
+ public void coGroup(Iterable<Tuple2<K, Message>> messages,
+ Iterable<Vertex<K, VV>> vertex,
+ Collector<Vertex<K, VV>> out) throws Exception {
+ final Iterator<Vertex<K, VV>> vertexIter = vertex.iterator();
if (vertexIter.hasNext()) {
- Vertex<VertexKey, VertexValue> vertexState = vertexIter.next();
+ Vertex<K, VV> vertexState = vertexIter.next();
@SuppressWarnings("unchecked")
Iterator<Tuple2<?, Message>> downcastIter = (Iterator<Tuple2<?, Message>>) (Iterator<?>) messages.iterator();
@@ -295,11 +290,11 @@ public class VertexCentricIteration<VertexKey, VertexValue, Message, EdgeValue>
vertexUpdateFunction.updateVertex(vertexState, messageIter);
}
else {
- final Iterator<Tuple2<VertexKey, Message>> messageIter = messages.iterator();
+ final Iterator<Tuple2<K, Message>> messageIter = messages.iterator();
if (messageIter.hasNext()) {
String message = "Target vertex does not exist!.";
try {
- Tuple2<VertexKey, Message> next = messageIter.next();
+ Tuple2<K, Message> next = messageIter.next();
message = "Target vertex '" + next.f0 + "' does not exist!.";
} catch (Throwable t) {}
throw new Exception(message);
@@ -310,36 +305,39 @@ public class VertexCentricIteration<VertexKey, VertexValue, Message, EdgeValue>
}
}
- private static final class VertexUpdateUdfVertexValueWithDegrees<VertexKey, VertexValue, Message> extends VertexUpdateUdf<VertexKey,
- Tuple3<VertexValue, Long, Long>, VertexValue, Message> {
-
+ @SuppressWarnings("serial")
+ private static final class VertexUpdateUdfVVWithDegrees<K, VV, Message> extends VertexUpdateUdf<K, Tuple3<VV, Long, Long>, Message> {
- private VertexUpdateUdfVertexValueWithDegrees(VertexUpdateFunction<VertexKey, VertexValue, Message> vertexUpdateFunction, TypeInformation<Vertex<VertexKey, Tuple3<VertexValue, Long, Long>>> resultType) {
+ private VertexUpdateUdfVVWithDegrees(VertexUpdateFunction<K, Tuple3<VV, Long, Long>, Message> vertexUpdateFunction,
+ TypeInformation<Vertex<K, Tuple3<VV, Long, Long>>> resultType) {
super(vertexUpdateFunction, resultType);
}
-
+
@Override
- public void coGroup(Iterable<Tuple2<VertexKey, Message>> messages,
- Iterable<Vertex<VertexKey, Tuple3<VertexValue, Long, Long>>> vertex,
- Collector<Vertex<VertexKey, Tuple3<VertexValue, Long, Long>>> out) throws Exception {
- final Iterator<Vertex<VertexKey, Tuple3<VertexValue, Long, Long>>> vertexIter = vertex.iterator();
+ public void coGroup(Iterable<Tuple2<K, Message>> messages, Iterable<Vertex<K, Tuple3<VV, Long, Long>>> vertex,
+ Collector<Vertex<K, Tuple3<VV, Long, Long>>> out) throws Exception {
+ final Iterator<Vertex<K, Tuple3<VV, Long, Long>>> vertexIter = vertex.iterator();
+
if (vertexIter.hasNext()) {
- Vertex<VertexKey, Tuple3<VertexValue, Long, Long>> vertexState = vertexIter.next();
-
+ Vertex<K, Tuple3<VV, Long, Long>> vertexWithDegrees = vertexIter.next();
+
@SuppressWarnings("unchecked")
Iterator<Tuple2<?, Message>> downcastIter = (Iterator<Tuple2<?, Message>>) (Iterator<?>) messages.iterator();
messageIter.setSource(downcastIter);
- vertexUpdateFunction.setOutputWithDegrees(vertexState, out);
- vertexUpdateFunction.updateVertexFromVertexCentricIteration(vertexState, messageIter);
+ vertexUpdateFunction.setInDegree(vertexWithDegrees.f1.f1);
+ vertexUpdateFunction.setOutDegree(vertexWithDegrees.f1.f2);
+
+ vertexUpdateFunction.setOutputWithDegrees(vertexWithDegrees, out);
+ vertexUpdateFunction.updateVertexFromVertexCentricIteration(vertexWithDegrees, messageIter);
}
else {
- final Iterator<Tuple2<VertexKey, Message>> messageIter = messages.iterator();
+ final Iterator<Tuple2<K, Message>> messageIter = messages.iterator();
if (messageIter.hasNext()) {
String message = "Target vertex does not exist!.";
try {
- Tuple2<VertexKey, Message> next = messageIter.next();
+ Tuple2<K, Message> next = messageIter.next();
message = "Target vertex '" + next.f0 + "' does not exist!.";
} catch (Throwable t) {}
throw new Exception(message);
@@ -353,19 +351,19 @@ public class VertexCentricIteration<VertexKey, VertexValue, Message, EdgeValue>
/*
* UDF that encapsulates the message sending function for graphs where the edges have an associated value.
*/
- private static final class MessagingUdfWithEdgeValues<VertexKey, VertexValue, Message, EdgeValue>
- extends RichCoGroupFunction<Edge<VertexKey, EdgeValue>, Vertex<VertexKey, VertexValue>, Tuple2<VertexKey, Message>>
- implements ResultTypeQueryable<Tuple2<VertexKey, Message>>
+ private static abstract class MessagingUdfWithEdgeValues<K, VVWithDegrees, VV, Message, EV>
+ extends RichCoGroupFunction<Edge<K, EV>, Vertex<K, VVWithDegrees>, Tuple2<K, Message>>
+ implements ResultTypeQueryable<Tuple2<K, Message>>
{
private static final long serialVersionUID = 1L;
- final MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> messagingFunction;
+ final MessagingFunction<K, VV, Message, EV> messagingFunction;
- private transient TypeInformation<Tuple2<VertexKey, Message>> resultType;
-
-
- private MessagingUdfWithEdgeValues(MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> messagingFunction,
- TypeInformation<Tuple2<VertexKey, Message>> resultType)
+ private transient TypeInformation<Tuple2<K, Message>> resultType;
+
+
+ private MessagingUdfWithEdgeValues(MessagingFunction<K, VV, Message, EV> messagingFunction,
+ TypeInformation<Tuple2<K, Message>> resultType)
{
this.messagingFunction = messagingFunction;
this.resultType = resultType;
@@ -386,54 +384,62 @@ public class VertexCentricIteration<VertexKey, VertexValue, Message, EdgeValue>
}
@Override
- public TypeInformation<Tuple2<VertexKey, Message>> getProducedType() {
+ public TypeInformation<Tuple2<K, Message>> getProducedType() {
return this.resultType;
}
}
- private static final class MessagingUdfWithEdgeValuesSimpleVertexValue<VertexKey, VertexValue, Message, EdgeValue>
- extends MessagingUdfWithEdgeValues<VertexKey, VertexValue, VertexValue, Message, EdgeValue> {
+ @SuppressWarnings("serial")
+ private static final class MessagingUdfWithEVsSimpleVV<K, VV, Message, EV>
+ extends MessagingUdfWithEdgeValues<K, VV, VV, Message, EV> {
- private MessagingUdfWithEdgeValuesSimpleVertexValue(MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> messagingFunction,
- TypeInformation<Tuple2<VertexKey, Message>> resultType) {
+ private MessagingUdfWithEVsSimpleVV(MessagingFunction<K, VV, Message, EV> messagingFunction,
+ TypeInformation<Tuple2<K, Message>> resultType) {
super(messagingFunction, resultType);
}
@Override
- public void coGroup(Iterable<Edge<VertexKey, EdgeValue>> edges,
- Iterable<Vertex<VertexKey, VertexValue>> state,
- Collector<Tuple2<VertexKey, Message>> out) throws Exception {
- final Iterator<Vertex<VertexKey, VertexValue>> stateIter = state.iterator();
-
+ public void coGroup(Iterable<Edge<K, EV>> edges,
+ Iterable<Vertex<K, VV>> state,
+ Collector<Tuple2<K, Message>> out) throws Exception {
+ final Iterator<Vertex<K, VV>> stateIter = state.iterator();
+
if (stateIter.hasNext()) {
- Vertex<VertexKey, VertexValue> newVertexState = stateIter.next();
+ Vertex<K, VV> newVertexState = stateIter.next();
messagingFunction.set((Iterator<?>) edges.iterator(), out);
messagingFunction.sendMessages(newVertexState);
}
}
}
- private static final class MessagingUdfWithEdgeValuesVertexValueWithDegrees<VertexKey, VertexValue, Message, EdgeValue>
- extends MessagingUdfWithEdgeValues<VertexKey, Tuple3<VertexValue, Long, Long>, VertexValue, Message, EdgeValue> {
+ @SuppressWarnings("serial")
+ private static final class MessagingUdfWithEVsVVWithDegrees<K, VV, Message, EV>
+ extends MessagingUdfWithEdgeValues<K, Tuple3<VV, Long, Long>, VV, Message, EV> {
+ private Vertex<K, VV> nextVertex = new Vertex<K, VV>();
- private MessagingUdfWithEdgeValuesVertexValueWithDegrees
- (MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> messagingFunction,
- TypeInformation<Tuple2<VertexKey, Message>> resultType) {
+ private MessagingUdfWithEVsVVWithDegrees(MessagingFunction<K, VV, Message, EV> messagingFunction,
+ TypeInformation<Tuple2<K, Message>> resultType) {
super(messagingFunction, resultType);
}
@Override
- public void coGroup(Iterable<Edge<VertexKey, EdgeValue>> edges,
- Iterable<Vertex<VertexKey, Tuple3<VertexValue, Long, Long>>> state,
- Collector<Tuple2<VertexKey, Message>> out) throws Exception {
-
- final Iterator<Vertex<VertexKey, Tuple3<VertexValue, Long, Long>>> stateIter = state.iterator();
+ public void coGroup(Iterable<Edge<K, EV>> edges, Iterable<Vertex<K, Tuple3<VV, Long, Long>>> state,
+ Collector<Tuple2<K, Message>> out) throws Exception {
+ final Iterator<Vertex<K, Tuple3<VV, Long, Long>>> stateIter = state.iterator();
+
if (stateIter.hasNext()) {
- Vertex<VertexKey, Tuple3<VertexValue, Long, Long>> newVertexState = stateIter.next();
+ Vertex<K, Tuple3<VV, Long, Long>> vertexWithDegrees = stateIter.next();
+
+ nextVertex.setField(vertexWithDegrees.f0, 0);
+ nextVertex.setField(vertexWithDegrees.f1.f0, 1);
+
+ messagingFunction.setInDegree(vertexWithDegrees.f1.f1);
+ messagingFunction.setOutDegree(vertexWithDegrees.f1.f2);
+
messagingFunction.set((Iterator<?>) edges.iterator(), out);
- messagingFunction.sendMessagesFromVertexCentricIteration(newVertexState);
+ messagingFunction.sendMessages(nextVertex);
}
}
}
@@ -454,15 +460,14 @@ public class VertexCentricIteration<VertexKey, VertexValue, Message, EdgeValue>
* @param equalToArg the argument for the equalTo within the coGroup
* @return the messaging function
*/
- private CoGroupOperator<?, ?, Tuple2<VertexKey, Message>> buildMessagingFunction(
- DeltaIteration<Vertex<VertexKey, VertexValue>, Vertex<VertexKey, VertexValue>> iteration,
- TypeInformation<Tuple2<VertexKey, Message>> messageTypeInfo, int whereArg, int equalToArg) {
+ private CoGroupOperator<?, ?, Tuple2<K, Message>> buildMessagingFunction(
+ DeltaIteration<Vertex<K, VV>, Vertex<K, VV>> iteration,
+ TypeInformation<Tuple2<K, Message>> messageTypeInfo, int whereArg, int equalToArg) {
// build the messaging function (co group)
- CoGroupOperator<?, ?, Tuple2<VertexKey, Message>> messages;
- MessagingUdfWithEdgeValues<VertexKey, VertexValue, VertexValue, Message, EdgeValue> messenger =
- new MessagingUdfWithEdgeValuesSimpleVertexValue<VertexKey, VertexValue, Message, EdgeValue>(
- messagingFunction, messageTypeInfo);
+ CoGroupOperator<?, ?, Tuple2<K, Message>> messages;
+ MessagingUdfWithEdgeValues<K, VV, VV, Message, EV> messenger =
+ new MessagingUdfWithEVsSimpleVV<K, VV, Message, EV>(messagingFunction, messageTypeInfo);
messages = this.edgesWithValue.coGroup(iteration.getWorkset()).where(whereArg)
.equalTo(equalToArg).with(messenger);
@@ -489,16 +494,14 @@ public class VertexCentricIteration<VertexKey, VertexValue, Message, EdgeValue>
* @param equalToArg the argument for the equalTo within the coGroup
* @return the messaging function
*/
- private CoGroupOperator<?, ?, Tuple2<VertexKey, Message>> buildMessagingFunctionVerticesWithDegrees(
- DeltaIteration<Vertex<VertexKey, Tuple3<VertexValue, Long, Long>>,
- Vertex<VertexKey, Tuple3<VertexValue, Long, Long>>> iteration,
- TypeInformation<Tuple2<VertexKey, Message>> messageTypeInfo, int whereArg, int equalToArg) {
+ private CoGroupOperator<?, ?, Tuple2<K, Message>> buildMessagingFunctionVerticesWithDegrees(
+ DeltaIteration<Vertex<K, Tuple3<VV, Long, Long>>, Vertex<K, Tuple3<VV, Long, Long>>> iteration,
+ TypeInformation<Tuple2<K, Message>> messageTypeInfo, int whereArg, int equalToArg) {
// build the messaging function (co group)
- CoGroupOperator<?, ?, Tuple2<VertexKey, Message>> messages;
- MessagingUdfWithEdgeValues<VertexKey, Tuple3<VertexValue, Long, Long>, VertexValue, Message, EdgeValue> messenger =
- new MessagingUdfWithEdgeValuesVertexValueWithDegrees<VertexKey, VertexValue, Message, EdgeValue>(
- messagingFunction, messageTypeInfo);
+ CoGroupOperator<?, ?, Tuple2<K, Message>> messages;
+ MessagingUdfWithEdgeValues<K, Tuple3<VV, Long, Long>, VV, Message, EV> messenger =
+ new MessagingUdfWithEVsVVWithDegrees<K, VV, Message, EV>(messagingFunction, messageTypeInfo);
messages = this.edgesWithValue.coGroup(iteration.getWorkset()).where(whereArg)
.equalTo(equalToArg).with(messenger);
@@ -518,17 +521,10 @@ public class VertexCentricIteration<VertexKey, VertexValue, Message, EdgeValue>
/**
* Helper method which sets up an iteration with the given vertex value (either simple or with degrees)
*
- * @param vertices
- * @param <VV>
+ * @param iteration the delta iteration operator to configure
*/
- private <VV> DeltaIteration<Vertex<VertexKey, VV>, Vertex<VertexKey, VV>> setUpIteration(
- DataSet<Vertex<VertexKey, VV>> vertices) {
-
- final int[] zeroKeyPos = new int[] {0};
-
- final DeltaIteration<Vertex<VertexKey, VV>, Vertex<VertexKey, VV>> iteration =
- vertices.iterateDelta(vertices, this.maximumNumberOfIterations, zeroKeyPos);
+ private void setUpIteration(DeltaIteration<?, ?> iteration) {
// set up the iteration operator
if (this.configuration != null) {
@@ -546,8 +542,6 @@ public class VertexCentricIteration<VertexKey, VertexValue, Message, EdgeValue>
// no configuration provided; set default name
iteration.name("Vertex-centric iteration (" + updateFunction + " | " + messagingFunction + ")");
}
-
- return iteration;
}
/**
@@ -557,14 +551,16 @@ public class VertexCentricIteration<VertexKey, VertexValue, Message, EdgeValue>
* @param messageTypeInfo
* @return the operator
*/
- private DataSet<Vertex<VertexKey, VertexValue>> createResultSimpleVertex(EdgeDirection messagingDirection,
- TypeInformation<Tuple2<VertexKey, Message>> messageTypeInfo) {
- DataSet<Tuple2<VertexKey, Message>> messages;
+ private DataSet<Vertex<K, VV>> createResultSimpleVertex(EdgeDirection messagingDirection,
+ TypeInformation<Tuple2<K, Message>> messageTypeInfo) {
+
+ DataSet<Tuple2<K, Message>> messages;
- TypeInformation<Vertex<VertexKey, VertexValue>> vertexTypes = initialVertices.getType();
+ TypeInformation<Vertex<K, VV>> vertexTypes = initialVertices.getType();
- final DeltaIteration<Vertex<VertexKey, VertexValue>, Vertex<VertexKey, VertexValue>> iteration =
- setUpIteration(this.initialVertices);
+ final DeltaIteration<Vertex<K, VV>, Vertex<K, VV>> iteration =
+ initialVertices.iterateDelta(initialVertices, this.maximumNumberOfIterations, 0);
+ setUpIteration(iteration);
switch (messagingDirection) {
case IN:
@@ -581,11 +577,11 @@ public class VertexCentricIteration<VertexKey, VertexValue, Message, EdgeValue>
throw new IllegalArgumentException("Illegal edge direction");
}
- VertexUpdateUdf<VertexKey, VertexValue, VertexValue, Message> updateUdf =
- new VertexUpdateUdfSimpleVertexValue<VertexKey, VertexValue, Message>(updateFunction, vertexTypes);
+ VertexUpdateUdf<K, VV, Message> updateUdf =
+ new VertexUpdateUdfSimpleVV<K, VV, Message>(updateFunction, vertexTypes);
// build the update function (co group)
- CoGroupOperator<?, ?, Vertex<VertexKey, VertexValue>> updates =
+ CoGroupOperator<?, ?, Vertex<K, VV>> updates =
messages.coGroup(iteration.getSolutionSet()).where(0).equalTo(0).with(updateUdf);
configureUpdateFunction(updates);
@@ -602,46 +598,44 @@ public class VertexCentricIteration<VertexKey, VertexValue, Message, EdgeValue>
* @param messageTypeInfo
* @return the operator
*/
- private DataSet<Vertex<VertexKey, VertexValue>> createResultVerticesWithDegrees(
- Graph<VertexKey, VertexValue, EdgeValue> graph,
- EdgeDirection messagingDirection,
- TypeInformation<Tuple2<VertexKey, Message>> messageTypeInfo) {
+ @SuppressWarnings("serial")
+ private DataSet<Vertex<K, VV>> createResultVerticesWithDegrees(Graph<K, VV, EV> graph, EdgeDirection messagingDirection,
+ TypeInformation<Tuple2<K, Message>> messageTypeInfo) {
- DataSet<Tuple2<VertexKey, Message>> messages;
+ DataSet<Tuple2<K, Message>> messages;
this.updateFunction.setOptDegrees(this.configuration.isOptDegrees());
- DataSet<Tuple2<VertexKey, Long>> inDegrees = graph.inDegrees();
- DataSet<Tuple2<VertexKey, Long>> outDegrees = graph.outDegrees();
+ DataSet<Tuple2<K, Long>> inDegrees = graph.inDegrees();
+ DataSet<Tuple2<K, Long>> outDegrees = graph.outDegrees();
- DataSet<Tuple3<VertexKey, Long, Long>> degrees = inDegrees.join(outDegrees).where(0).equalTo(0)
- .with(new FlatJoinFunction<Tuple2<VertexKey, Long>, Tuple2<VertexKey, Long>, Tuple3<VertexKey, Long, Long>>() {
+ DataSet<Tuple3<K, Long, Long>> degrees = inDegrees.join(outDegrees).where(0).equalTo(0)
+ .with(new FlatJoinFunction<Tuple2<K, Long>, Tuple2<K, Long>, Tuple3<K, Long, Long>>() {
@Override
- public void join(Tuple2<VertexKey, Long> first, Tuple2<VertexKey, Long> second, Collector<Tuple3<VertexKey, Long, Long>> out) throws Exception {
- out.collect(new Tuple3<VertexKey, Long, Long>(first.f0, first.f1, second.f1));
+ public void join(Tuple2<K, Long> first, Tuple2<K, Long> second, Collector<Tuple3<K, Long, Long>> out) {
+ out.collect(new Tuple3<K, Long, Long>(first.f0, first.f1, second.f1));
}
- });
+ }).withForwardedFieldsFirst("f0;f1").withForwardedFieldsSecond("f1");
- DataSet<Vertex<VertexKey, Tuple3<VertexValue, Long, Long>>> verticesWithDegrees= initialVertices
+ DataSet<Vertex<K, Tuple3<VV, Long, Long>>> verticesWithDegrees = initialVertices
.join(degrees).where(0).equalTo(0)
- .with(new FlatJoinFunction<Vertex<VertexKey,VertexValue>, Tuple3<VertexKey,Long,Long>, Vertex<VertexKey, Tuple3<VertexValue, Long, Long>>>() {
+ .with(new FlatJoinFunction<Vertex<K,VV>, Tuple3<K,Long,Long>, Vertex<K, Tuple3<VV, Long, Long>>>() {
@Override
- public void join(Vertex<VertexKey, VertexValue> vertex,
- Tuple3<VertexKey, Long, Long> degrees,
- Collector<Vertex<VertexKey, Tuple3<VertexValue, Long, Long>>> out) throws Exception {
+ public void join(Vertex<K, VV> vertex, Tuple3<K, Long, Long> degrees,
+ Collector<Vertex<K, Tuple3<VV, Long, Long>>> out) throws Exception {
- out.collect(new VertexWithDegrees<VertexKey, Tuple3<VertexValue, Long, Long>>(vertex.getId(),
- new Tuple3<VertexValue, Long, Long>(vertex.getValue(), degrees.f1, degrees.f2)));
+ out.collect(new Vertex<K, Tuple3<VV, Long, Long>>(vertex.getId(),
+ new Tuple3<VV, Long, Long>(vertex.getValue(), degrees.f1, degrees.f2)));
}
- });
+ }).withForwardedFieldsFirst("f0");
// add type info
- TypeInformation<Vertex<VertexKey, Tuple3<VertexValue, Long, Long>>> vertexTypes = verticesWithDegrees.getType();
+ TypeInformation<Vertex<K, Tuple3<VV, Long, Long>>> vertexTypes = verticesWithDegrees.getType();
- final DeltaIteration<Vertex<VertexKey, Tuple3<VertexValue, Long, Long>>,
- Vertex<VertexKey, Tuple3<VertexValue, Long, Long>>> iteration =
- setUpIteration(verticesWithDegrees);
+ final DeltaIteration<Vertex<K, Tuple3<VV, Long, Long>>, Vertex<K, Tuple3<VV, Long, Long>>> iteration =
+ verticesWithDegrees.iterateDelta(verticesWithDegrees, this.maximumNumberOfIterations, 0);
+ setUpIteration(iteration);
switch (messagingDirection) {
case IN:
@@ -658,24 +652,26 @@ public class VertexCentricIteration<VertexKey, VertexValue, Message, EdgeValue>
throw new IllegalArgumentException("Illegal edge direction");
}
- VertexUpdateUdf<VertexKey, Tuple3<VertexValue, Long, Long>, VertexValue, Message> updateUdf =
- new VertexUpdateUdfVertexValueWithDegrees<VertexKey, VertexValue, Message>(updateFunction, vertexTypes);
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ VertexUpdateUdf<K, Tuple3<VV, Long, Long>, Message> updateUdf =
+ new VertexUpdateUdfVVWithDegrees(updateFunction, vertexTypes);
// build the update function (co group)
- CoGroupOperator<?, ?, Vertex<VertexKey, Tuple3<VertexValue, Long, Long>>> updates =
+ CoGroupOperator<?, ?, Vertex<K, Tuple3<VV, Long, Long>>> updates =
messages.coGroup(iteration.getSolutionSet()).where(0).equalTo(0).with(updateUdf);
configureUpdateFunction(updates);
- return iteration.closeWith(updates, updates).map(new MapFunction<Vertex<VertexKey, Tuple3<VertexValue, Long, Long>>, Vertex<VertexKey, VertexValue>>() {
- @Override
- public Vertex<VertexKey, VertexValue> map(Vertex<VertexKey, Tuple3<VertexValue, Long, Long>> vertex) throws Exception {
- return new Vertex<VertexKey, VertexValue>(vertex.getId(), vertex.getValue().f0);
- }
- });
+ return iteration.closeWith(updates, updates).map(
+ new MapFunction<Vertex<K, Tuple3<VV, Long, Long>>, Vertex<K, VV>>() {
+
+ public Vertex<K, VV> map(Vertex<K, Tuple3<VV, Long, Long>> vertex) {
+ return new Vertex<K, VV>(vertex.getId(), vertex.getValue().f0);
+ }
+ });
}
- private <VV> void configureUpdateFunction(CoGroupOperator<?, ?, Vertex<VertexKey, VV>> updates) {
+ private <VVWithDegree> void configureUpdateFunction(CoGroupOperator<?, ?, Vertex<K, VVWithDegree>> updates) {
// configure coGroup update function with name and broadcast variables
updates = updates.name("Vertex State Updates");
@@ -688,4 +684,4 @@ public class VertexCentricIteration<VertexKey, VertexValue, Message, EdgeValue>
// let the operator know that we preserve the key field
updates.withForwardedFieldsFirst("0").withForwardedFieldsSecond("0");
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/e2f28d47/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java
index bc2e857..9930b50 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java
@@ -24,9 +24,7 @@ import java.util.Collection;
import org.apache.flink.api.common.aggregators.Aggregator;
import org.apache.flink.api.common.functions.IterationRuntimeContext;
import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.graph.InaccessibleMethodException;
import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.VertexWithDegrees;
import org.apache.flink.types.Value;
import org.apache.flink.util.Collector;
@@ -35,11 +33,11 @@ import org.apache.flink.util.Collector;
* incoming messages. The central method is {@link #updateVertex(Vertex, MessageIterator)}, which is
* invoked once per vertex per superstep.
*
- * <VertexKey> The vertex key type.
- * <VertexValue> The vertex value type.
+ * <K> The vertex key type.
+ * <VV> The vertex value type.
* <Message> The message type.
*/
-public abstract class VertexUpdateFunction<VertexKey, VertexValue, Message> implements Serializable {
+public abstract class VertexUpdateFunction<K, VV, Message> implements Serializable {
private static final long serialVersionUID = 1L;
@@ -50,11 +48,12 @@ public abstract class VertexUpdateFunction<VertexKey, VertexValue, Message> impl
private long numberOfVertices = -1L;
- public long getNumberOfVertices() throws Exception{
- if (numberOfVertices == -1) {
- throw new InaccessibleMethodException("The number of vertices option is not set. " +
- "To access the number of vertices, call iterationConfiguration.setOptNumVertices(true).");
- }
+ /**
+ * Retrieves the number of vertices in the graph.
+ * @return the number of vertices if the {@link IterationConfiguration#setOptNumVertices(boolean)}
+ * option has been set; -1 otherwise.
+ */
+ public long getNumberOfVertices() {
return numberOfVertices;
}
@@ -66,7 +65,7 @@ public abstract class VertexUpdateFunction<VertexKey, VertexValue, Message> impl
private boolean optDegrees;
- public boolean isOptDegrees() {
+ boolean isOptDegrees() {
return optDegrees;
}
@@ -80,7 +79,7 @@ public abstract class VertexUpdateFunction<VertexKey, VertexValue, Message> impl
/**
* This method is invoked once per vertex per superstep. It receives the current state of the vertex, as well as
- * the incoming messages. It may set a new vertex state via {@link #setNewVertexValue(Object)}. If the vertex
+ * the incoming messages. It may set a new vertex state via {@link #setNewVertexValue(Object)}. If the vertex
* state is changed, it will trigger the sending of messages via the {@link MessagingFunction}.
*
* @param vertex The vertex.
@@ -88,7 +87,7 @@ public abstract class VertexUpdateFunction<VertexKey, VertexValue, Message> impl
*
* @throws Exception The computation may throw exceptions, which causes the superstep to fail.
*/
- public abstract void updateVertex(Vertex<VertexKey, VertexValue> vertex, MessageIterator<Message> inMessages) throws Exception;
+ public abstract void updateVertex(Vertex<K, VV> vertex, MessageIterator<Message> inMessages) throws Exception;
/**
* This method is executed once per superstep before the vertex update function is invoked for each vertex.
@@ -109,7 +108,7 @@ public abstract class VertexUpdateFunction<VertexKey, VertexValue, Message> impl
*
* @param newValue The new vertex value.
*/
- public void setNewVertexValue(VertexValue newValue) {
+ public void setNewVertexValue(VV newValue) {
if(isOptDegrees()) {
outValWithDegrees.f1.f0 = newValue;
outWithDegrees.collect(outValWithDegrees);
@@ -167,30 +166,58 @@ public abstract class VertexUpdateFunction<VertexKey, VertexValue, Message> impl
private IterationRuntimeContext runtimeContext;
- private Collector<Vertex<VertexKey, VertexValue>> out;
+ private Collector<Vertex<K, VV>> out;
- private Collector<Vertex<VertexKey, Tuple3<VertexValue, Long, Long>>> outWithDegrees;
+ private Collector<Vertex<K, Tuple3<VV, Long, Long>>> outWithDegrees;
- private Vertex<VertexKey, VertexValue> outVal;
+ private Vertex<K, VV> outVal;
- private Vertex<VertexKey, Tuple3<VertexValue, Long, Long>> outValWithDegrees;
+ private Vertex<K, Tuple3<VV, Long, Long>> outValWithDegrees;
+ private long inDegree = -1;
+
+ private long outDegree = -1;
void init(IterationRuntimeContext context) {
this.runtimeContext = context;
}
+ void setOutput(Vertex<K, VV> outVal, Collector<Vertex<K, VV>> out) {
+ this.outVal = outVal;
+ this.out = out;
+ }
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ <ValueWithDegree> void setOutputWithDegrees(Vertex<K, ValueWithDegree> outVal,
+ Collector out) {
+ this.outValWithDegrees = (Vertex<K, Tuple3<VV, Long, Long>>) outVal;
+ this.outWithDegrees = out;
+ }
- void setOutputWithDegrees(Vertex<VertexKey, Tuple3<VertexValue, Long, Long>> outValWithDegrees,
- Collector<Vertex<VertexKey, Tuple3<VertexValue, Long, Long>>> outWithDegrees) {
- this.outValWithDegrees = outValWithDegrees;
- this.outWithDegrees = outWithDegrees;
+ /**
+ * Retrieves the vertex in-degree (number of in-coming edges).
+ * @return The in-degree of this vertex if the {@link IterationConfiguration#setOptDegrees(boolean)}
+ * option has been set; -1 otherwise.
+ */
+ public long getInDegree() {
+ return inDegree;
}
- void setOutput(Vertex<VertexKey, VertexValue> outVal, Collector<Vertex<VertexKey, VertexValue>> out) {
- this.outVal = outVal;
- this.out = out;
+ void setInDegree(long inDegree) {
+ this.inDegree = inDegree;
+ }
+
+ /**
+ * Retrieves the vertex out-degree (number of out-going edges).
+ * @return The out-degree of this vertex if the {@link IterationConfiguration#setOptDegrees(boolean)}
+ * option has been set; -1 otherwise.
+ */
+ public long getOutDegree() {
+ return outDegree;
+ }
+
+ void setOutDegree(long outDegree) {
+ this.outDegree = outDegree;
}
/**
@@ -204,12 +231,12 @@ public abstract class VertexUpdateFunction<VertexKey, VertexValue, Message> impl
* @param inMessages
* @throws Exception
*/
- void updateVertexFromVertexCentricIteration(Vertex<VertexKey, Tuple3<VertexValue, Long, Long>> vertexState,
+ @SuppressWarnings("unchecked")
+ <VertexWithDegree> void updateVertexFromVertexCentricIteration(Vertex<K, VertexWithDegree> vertexState,
MessageIterator<Message> inMessages) throws Exception {
- VertexWithDegrees<VertexKey, VertexValue> vertex = new VertexWithDegrees<VertexKey, VertexValue>(vertexState.getId(),
- vertexState.getValue().f0);
- vertex.setInDegree(vertexState.getValue().f1);
- vertex.setOutDegree(vertexState.getValue().f2);
+
+ Vertex<K, VV> vertex = new Vertex<K, VV>(vertexState.f0,
+ ((Tuple3<VV, Long, Long>)vertexState.getValue()).f0);
updateVertex(vertex, inMessages);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e2f28d47/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/CollectionModeSuperstepITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/CollectionModeSuperstepITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/CollectionModeSuperstepITCase.java
index e93f581..3fbd0bc 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/CollectionModeSuperstepITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/CollectionModeSuperstepITCase.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.spargel.MessageIterator;
http://git-wip-us.apache.org/repos/asf/flink/blob/e2f28d47/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java
index 5f5f8b2..ca5d5d9 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java
@@ -31,7 +31,6 @@ import org.apache.flink.graph.gsa.GatherFunction;
import org.apache.flink.graph.gsa.GatherSumApplyIteration;
import org.apache.flink.graph.gsa.Neighbor;
import org.apache.flink.graph.gsa.SumFunction;
-import org.apache.flink.graph.IterationConfiguration;
import org.apache.flink.test.util.MultipleProgramsTestBase;
import org.apache.flink.types.LongValue;
import org.junit.After;
http://git-wip-us.apache.org/repos/asf/flink/blob/e2f28d47/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConfigurationITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConfigurationITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConfigurationITCase.java
index 1c36906..567c015 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConfigurationITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConfigurationITCase.java
@@ -26,13 +26,10 @@ import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.EdgeDirection;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.IterationConfiguration;
import org.apache.flink.graph.spargel.MessageIterator;
import org.apache.flink.graph.spargel.MessagingFunction;
import org.apache.flink.graph.spargel.VertexCentricConfiguration;
@@ -48,6 +45,7 @@ import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
+import org.apache.flink.graph.utils.VertexToTuple2Map;
@RunWith(Parameterized.class)
public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
@@ -88,6 +86,7 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
parameters.addBroadcastSetForUpdateFunction("updateBcastSet", env.fromElements(1, 2, 3));
parameters.addBroadcastSetForMessagingFunction("messagingBcastSet", env.fromElements(4, 5, 6));
parameters.registerAggregator("superstepAggregator", new LongSumAggregator());
+ parameters.setOptNumVertices(true);
Graph<Long, Long, Long> result = graph.runVertexCentricIteration(
new UpdateFunction(), new MessageFunction(), 10, parameters);
@@ -136,6 +135,29 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
}
@Test
+ public void testDefaultConfiguration() throws Exception {
+ /*
+ * Runs Graph's runVertexCentricIteration without a configuration object. The update and messaging
+ * functions used here assert that numVertices and both degrees are -1; EdgeDirection will be OUT.
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ Graph<Long, Long, Long> graph = Graph.fromCollection(TestGraphUtils.getLongLongVertices(),
+ TestGraphUtils.getLongLongEdges(), env).mapVertices(new AssignOneMapper());
+
+ Graph<Long, Long, Long> result = graph.runVertexCentricIteration(
+ new UpdateFunctionDefault(), new MessageFunctionDefault(), 5);
+
+ result.getVertices().map(new VertexToTuple2Map<Long, Long>()).writeAsCsv(resultPath, "\n", "\t");
+ env.execute();
+ expectedResult = "1 6\n" + // UpdateFunctionDefault adds 1 per update over 5 supersteps; presumably AssignOneMapper starts every vertex at 1 — confirm
+ "2 6\n" +
+ "3 6\n" +
+ "4 6\n" +
+ "5 6";
+ }
+
+ @Test
public void testIterationDefaultDirection() throws Exception {
/*
@@ -176,7 +198,7 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
.mapVertices(new InitialiseHashSetMapper());
// configure the iteration
- IterationConfiguration parameters = new IterationConfiguration();
+ VertexCentricConfiguration parameters = new VertexCentricConfiguration();
parameters.setDirection(EdgeDirection.IN);
@@ -208,7 +230,7 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
.mapVertices(new InitialiseHashSetMapper());
// configure the iteration
- IterationConfiguration parameters = new IterationConfiguration();
+ VertexCentricConfiguration parameters = new VertexCentricConfiguration();
parameters.setDirection(EdgeDirection.ALL);
@@ -227,11 +249,36 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
}
@Test
+ public void testNumVerticesNotSet() throws Exception {
+
+ /*
+ * No configuration is passed, so the number of vertices option is never enabled: every vertex value must become the -1 that getNumberOfVertices() returns.
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ Graph<Long, Long, Long> graph = Graph.fromCollection(TestGraphUtils.getLongLongVertices(),
+ TestGraphUtils.getLongLongEdges(), env);
+
+ DataSet<Vertex<Long, Long>> verticesWithNumVertices = graph.runVertexCentricIteration(new UpdateFunctionNumVertices(),
+ new DummyMessageFunction(), 2).getVertices();
+
+ verticesWithNumVertices.writeAsCsv(resultPath, "\n", "\t");
+ env.execute();
+
+ expectedResult = "1 -1\n" +
+ "2 -1\n" +
+ "3 -1\n" +
+ "4 -1\n" +
+ "5 -1";
+ }
+
+ @Test
public void testInDegreesSet() throws Exception {
/*
- * Test that if the degrees are set, the in degrees can be accessed in every superstep and the value
- * is correctly computed.
+ * Test that if the degrees are set, they can be accessed in every superstep
+ * inside the update function and the value
+ * is correctly computed for degrees in the messaging function.
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@@ -239,14 +286,14 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
TestGraphUtils.getLongLongEdges(), env);
// configure the iteration
- IterationConfiguration parameters = new IterationConfiguration();
+ VertexCentricConfiguration parameters = new VertexCentricConfiguration();
parameters.setOptDegrees(true);
- DataSet<Vertex<Long, Long>> verticesWithInDegree = graph.runVertexCentricIteration(new UpdateFunctionInDegree(),
- new DummyMessageFunction(), 5, parameters).getVertices();
+ DataSet<Vertex<Long, Long>> verticesWithDegrees = graph.runVertexCentricIteration(
+ new UpdateFunctionInDegrees(), new DegreesMessageFunction(), 5, parameters).getVertices();
- verticesWithInDegree.writeAsCsv(resultPath, "\n", "\t");
+ verticesWithDegrees.writeAsCsv(resultPath, "\n", "\t");
env.execute();
expectedResult = "1 1\n" +
@@ -257,41 +304,36 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
}
@Test
- public void testOutDegreesSet() throws Exception {
+ public void testInDegreesNotSet() throws Exception {
/*
- * Test that if the degrees are set, the out degrees can be accessed in every superstep and the value
- * is correctly computed.
+ * Test that if the degrees option is not set, then -1 is returned as a value for in-degree.
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Graph<Long, Long, Long> graph = Graph.fromCollection(TestGraphUtils.getLongLongVertices(),
TestGraphUtils.getLongLongEdges(), env);
- // configure the iteration
- IterationConfiguration parameters = new IterationConfiguration();
-
- parameters.setOptDegrees(true);
+ DataSet<Vertex<Long, Long>> verticesWithDegrees = graph.runVertexCentricIteration(
+ new UpdateFunctionInDegrees(), new DummyMessageFunction(), 2).getVertices();
- DataSet<Vertex<Long, Long>> verticesWithOutDegree = graph.runVertexCentricIteration(new UpdateFunctionOutDegree(),
- new DummyMessageFunction(), 5, parameters).getVertices();
-
- verticesWithOutDegree.writeAsCsv(resultPath, "\n", "\t");
+ verticesWithDegrees.writeAsCsv(resultPath, "\n", "\t");
env.execute();
- expectedResult = "1 2\n" +
- "2 1\n" +
- "3 2\n" +
- "4 1\n" +
- "5 1";
+ expectedResult = "1 -1\n" +
+ "2 -1\n" +
+ "3 -1\n" +
+ "4 -1\n" +
+ "5 -1";
}
@Test
- public void testNumVerticesSet() throws Exception {
+ public void testOutDegreesSet() throws Exception {
/*
- * Test that if the number of vertices option is set, it can be accessed in every superstep and the value
- * is correctly computed.
+ * Test that if the degrees are set, they can be accessed in every superstep
+ * inside the update function and the value
+ * is correctly computed for degrees in the messaging function.
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@@ -299,56 +341,45 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
TestGraphUtils.getLongLongEdges(), env);
// configure the iteration
- IterationConfiguration parameters = new IterationConfiguration();
+ VertexCentricConfiguration parameters = new VertexCentricConfiguration();
- parameters.setOptNumVertices(true);
+ parameters.setOptDegrees(true);
- DataSet<Vertex<Long, Long>> verticesWithNumVertices = graph.runVertexCentricIteration(new UpdateFunctionNumVertices(),
- new DummyMessageFunction(), 5, parameters).getVertices();
+ DataSet<Vertex<Long, Long>> verticesWithDegrees = graph.runVertexCentricIteration(
+ new UpdateFunctionOutDegrees(), new DegreesMessageFunction(), 5, parameters).getVertices();
- verticesWithNumVertices.writeAsCsv(resultPath, "\n", "\t");
+ verticesWithDegrees.writeAsCsv(resultPath, "\n", "\t");
env.execute();
- expectedResult = "1 5\n" +
- "2 5\n" +
- "3 5\n" +
- "4 5\n" +
- "5 5";
+ expectedResult = "1 2\n" +
+ "2 1\n" +
+ "3 2\n" +
+ "4 1\n" +
+ "5 1";
}
@Test
- public void testDegrees() throws Exception {
+ public void testOutDegreesNotSet() throws Exception {
/*
- * Test that if the degrees are set, they can be accessed in every superstep and the value
- * is correctly computed for both in and out degrees.
+ * Test that if the degrees option is not set, then -1 is returned as a value for out-degree.
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- Graph<Long, Tuple3<Long, Long, Boolean>, Long> graph = Graph.fromCollection(TestGraphUtils.getLongVerticesWithDegrees(),
+ Graph<Long, Long, Long> graph = Graph.fromCollection(TestGraphUtils.getLongLongVertices(),
TestGraphUtils.getLongLongEdges(), env);
- // configure the iteration
- IterationConfiguration parameters = new IterationConfiguration();
-
- parameters.setOptDegrees(true);
-
- DataSet<Vertex<Long, Tuple3<Long, Long, Boolean>>> verticesWithDegrees = graph.runVertexCentricIteration(
- new UpdateFunctionDegrees(), new DegreeMessageFunction(), 5, parameters).getVertices();
+ DataSet<Vertex<Long, Long>> verticesWithDegrees = graph.runVertexCentricIteration(
+ new UpdateFunctionOutDegrees(), new DummyMessageFunction(), 2).getVertices(); // must read getOutDegree(), not getInDegree()
- verticesWithDegrees.map(new MapFunction<Vertex<Long,Tuple3<Long,Long,Boolean>>, Tuple2<Long, Boolean>>() {
- @Override
- public Tuple2<Long, Boolean> map(Vertex<Long, Tuple3<Long, Long, Boolean>> vertex) throws Exception {
- return new Tuple2<Long, Boolean>(vertex.getId(), vertex.getValue().f2);
- }
- }).writeAsCsv(resultPath, "\n", "\t");
+ verticesWithDegrees.writeAsCsv(resultPath, "\n", "\t");
env.execute();
- expectedResult = "1 true\n" +
- "2 true\n" +
- "3 true\n" +
- "4 true\n" +
- "5 true";
+ expectedResult = "1 -1\n" +
+ "2 -1\n" +
+ "3 -1\n" +
+ "4 -1\n" +
+ "5 -1";
}
@Test
@@ -364,7 +395,7 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
TestGraphUtils.getLongLongEdges(), env);
// configure the iteration
- IterationConfiguration parameters = new IterationConfiguration();
+ VertexCentricConfiguration parameters = new VertexCentricConfiguration();
parameters.setOptDegrees(true);
parameters.setDirection(EdgeDirection.ALL);
@@ -399,12 +430,36 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
// test aggregator
aggregator = getIterationAggregator("superstepAggregator");
+
+ // test number of vertices
+ Assert.assertEquals(5, getNumberOfVertices());
+
}
@Override
public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> inMessages) {
long superstep = getSuperstepNumber();
aggregator.aggregate(superstep);
+
+ setNewVertexValue(vertex.getValue() + 1);
+ }
+ }
+
+ @SuppressWarnings("serial")
+ public static final class UpdateFunctionDefault extends VertexUpdateFunction<Long, Long, Long> {
+ // Update function for testDefaultConfiguration: asserts the "option not set" values, then increments the vertex value.
+ LongSumAggregator aggregator = new LongSumAggregator(); // NOTE(review): never referenced inside this class — looks like a leftover; confirm and remove
+
+ @Override
+ public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> inMessages) {
+
+ // the number of vertices must read as -1 here
+ Assert.assertEquals(-1, getNumberOfVertices());
+
+ // both degrees must read as -1 here
+ Assert.assertEquals(-1, getInDegree());
+ Assert.assertEquals(-1, getOutDegree());
+
setNewVertexValue(vertex.getValue() + 1);
}
}
@@ -421,6 +476,9 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
Assert.assertEquals(4, bcastSet.get(0));
Assert.assertEquals(5, bcastSet.get(1));
Assert.assertEquals(6, bcastSet.get(2));
+
+ // test number of vertices
+ Assert.assertEquals(5, getNumberOfVertices());
// test aggregator
if (getSuperstepNumber() == 2) {
@@ -437,28 +495,18 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
}
@SuppressWarnings("serial")
- public static final class UpdateFunctionInDegree extends VertexUpdateFunction<Long, Long, Long> {
+ public static final class MessageFunctionDefault extends MessagingFunction<Long, Long, Long, Long> {
@Override
- public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> inMessages) {
- try {
- setNewVertexValue(((VertexWithDegrees) vertex).getInDegree());
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
-
- @SuppressWarnings("serial")
- public static final class UpdateFunctionOutDegree extends VertexUpdateFunction<Long, Long, Long> {
+ public void sendMessages(Vertex<Long, Long> vertex) {
+ // test number of vertices
+ Assert.assertEquals(-1, getNumberOfVertices());
- @Override
- public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> inMessages) {
- try {
- setNewVertexValue(((VertexWithDegrees) vertex).getOutDegree());
- } catch (Exception e) {
- e.printStackTrace();
- }
+ // test degrees
+ Assert.assertEquals(-1, getInDegree());
+ Assert.assertEquals(-1, getOutDegree());
+ //send message to keep vertices active
+ sendMessageToAllNeighbors(vertex.getValue());
}
}
@@ -467,11 +515,7 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
@Override
public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> inMessages) {
- try {
setNewVertexValue(getNumberOfVertices());
- } catch (Exception e) {
- e.printStackTrace();
- }
}
}
@@ -495,8 +539,25 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
}
@SuppressWarnings("serial")
- public static final class VertexUpdateDirection extends VertexUpdateFunction<Long, HashSet<Long>,
- Long> {
+ public static final class DegreesMessageFunction extends MessagingFunction<Long, Long, Long, Long> {
+ // Checks the degrees visible inside the messaging function for vertices 1 and 3, then messages all neighbors.
+ @Override
+ public void sendMessages(Vertex<Long, Long> vertex) {
+ if (vertex.getId().equals(1L)) { // long literal: Long.equals(Integer) is always false
+ Assert.assertEquals(2, getOutDegree());
+ Assert.assertEquals(1, getInDegree());
+ }
+ else if(vertex.getId().equals(3L)) { // long literal for the same reason
+ Assert.assertEquals(2, getOutDegree());
+ Assert.assertEquals(2, getInDegree());
+ }
+ //send message to keep vertices active
+ sendMessageToAllNeighbors(vertex.getValue());
+ }
+ }
+
+ @SuppressWarnings("serial")
+ public static final class VertexUpdateDirection extends VertexUpdateFunction<Long, HashSet<Long>, Long> {
@Override
public void updateVertex(Vertex<Long, HashSet<Long>> vertex, MessageIterator<Long> messages) throws Exception {
@@ -511,6 +572,26 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
}
@SuppressWarnings("serial")
+ public static final class UpdateFunctionInDegrees extends VertexUpdateFunction<Long, Long, Long> {
+ // Replaces each vertex value with whatever getInDegree() reports for that vertex.
+ @Override
+ public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> inMessages) {
+ long inDegree = getInDegree(); // testInDegreesNotSet expects -1 here when no configuration is passed
+ setNewVertexValue(inDegree);
+ }
+ }
+
+ @SuppressWarnings("serial")
+ public static final class UpdateFunctionOutDegrees extends VertexUpdateFunction<Long, Long, Long> {
+ // Replaces each vertex value with whatever getOutDegree() reports for that vertex.
+ @Override
+ public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> inMessages) {
+ long outDegree = getOutDegree(); // testOutDegreesSet expects the real out-degree here once setOptDegrees(true) is configured
+ setNewVertexValue(outDegree);
+ }
+ }
+
+ @SuppressWarnings("serial")
public static final class VertexUpdateNumNeighbors extends VertexUpdateFunction<Long, Boolean,
Long> {
@@ -519,25 +600,21 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
long count = 0;
- for(long msg : messages) {
+ for(@SuppressWarnings("unused") long msg : messages) {
count++;
}
-
- setNewVertexValue(count == (((VertexWithDegrees)vertex).getInDegree() + ((VertexWithDegrees)vertex).getOutDegree()));
+ setNewVertexValue(count == (getInDegree() + getOutDegree()));
}
}
@SuppressWarnings("serial")
- public static final class UpdateFunctionDegrees extends VertexUpdateFunction<Long, Tuple3<Long, Long, Boolean>, Long> {
+ public static final class UpdateFunctionDegrees extends VertexUpdateFunction<Long, Long, Long> {
@Override
- public void updateVertex(Vertex<Long, Tuple3<Long, Long, Boolean>> vertex, MessageIterator<Long> inMessages) {
- try {
- setNewVertexValue(new Tuple3(vertex.getValue().f0, vertex.getValue().f1, (((VertexWithDegrees)vertex).getInDegree() == vertex.getValue().f0)
- && (((VertexWithDegrees)vertex).getOutDegree() == vertex.getValue().f1) && vertex.getValue().f2));
- } catch (Exception e) {
- e.printStackTrace();
- }
+ public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> inMessages) {
+ long inDegree = getInDegree();
+ long outDegree = getOutDegree();
+ setNewVertexValue(inDegree + outDegree);
}
}
@@ -594,16 +671,6 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
}
@SuppressWarnings("serial")
- public static final class DegreeMessageFunction extends MessagingFunction<Long, Tuple3<Long, Long, Boolean>, Long, Long> {
-
- @Override
- public void sendMessages(Vertex<Long, Tuple3<Long, Long, Boolean>> vertex) {
- //send message to keep vertices active
- sendMessageToAllNeighbors(vertex.getValue().f0);
- }
- }
-
- @SuppressWarnings("serial")
public static final class AssignOneMapper implements MapFunction<Vertex<Long, Long>, Long> {
public Long map(Vertex<Long, Long> value) {
http://git-wip-us.apache.org/repos/asf/flink/blob/e2f28d47/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConfigurationWithExceptionITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConfigurationWithExceptionITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConfigurationWithExceptionITCase.java
deleted file mode 100644
index 5c57f47..0000000
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConfigurationWithExceptionITCase.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.test;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.DiscardingOutputFormat;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.VertexWithDegrees;
-import org.apache.flink.graph.spargel.MessageIterator;
-import org.apache.flink.graph.spargel.MessagingFunction;
-import org.apache.flink.graph.spargel.VertexUpdateFunction;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import static org.junit.Assert.fail;
-
-public class VertexCentricConfigurationWithExceptionITCase {
-
- private static final int PARALLELISM = 4;
-
- private static ForkableFlinkMiniCluster cluster;
-
-
- @BeforeClass
- public static void setupCluster() {
- try {
- Configuration config = new Configuration();
- config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM);
- cluster = new ForkableFlinkMiniCluster(config, false);
- }
- catch (Exception e) {
- e.printStackTrace();
- fail("Error starting test cluster: " + e.getMessage());
- }
- }
-
- @AfterClass
- public static void tearDownCluster() {
- try {
- cluster.stop();
- }
- catch (Throwable t) {
- t.printStackTrace();
- fail("Cluster shutdown caused an exception: " + t.getMessage());
- }
- }
-
- @Test
- public void testOutDegreesNotSet() throws Exception {
-
- /*
- * Test that if the degrees are not set, the out degrees cannot be accessed - an
- * exception is thrown.
- */
- final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
- "localhost", cluster.getJobManagerRPCPort());
- env.setParallelism(PARALLELISM);
- env.getConfig().disableSysoutLogging();
-
- Graph<Long, Long, Long> graph = Graph.fromCollection(TestGraphUtils.getLongLongVertices(),
- TestGraphUtils.getLongLongEdges(), env);
-
- try {
- DataSet<Vertex<Long, Long>> verticesWithOutDegrees = graph.runVertexCentricIteration(new UpdateFunctionOutDegree(),
- new DummyMessageFunction(), 5).getVertices();
-
- verticesWithOutDegrees.output(new DiscardingOutputFormat<Vertex<Long, Long>>());
- env.execute();
-
- fail("The degree option not set test did not fail");
- } catch (Exception e) {
- // We expect the job to fail with an exception
- }
- }
-
- @Test
- public void testInDegreesNotSet() throws Exception {
-
- /*
- * Test that if the degrees are not set, the in degrees cannot be accessed - an
- * exception is thrown.
- */
- final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
- "localhost", cluster.getJobManagerRPCPort());
- env.setParallelism(PARALLELISM);
- env.getConfig().disableSysoutLogging();
-
- Graph<Long, Long, Long> graph = Graph.fromCollection(TestGraphUtils.getLongLongVertices(),
- TestGraphUtils.getLongLongEdges(), env);
-
- try {
- DataSet<Vertex<Long, Long>> verticesWithInDegrees = graph.runVertexCentricIteration(new UpdateFunctionInDegree(),
- new DummyMessageFunction(), 5).getVertices();
-
- verticesWithInDegrees.output(new DiscardingOutputFormat<Vertex<Long, Long>>());
- env.execute();
-
- fail("The degree option not set test did not fail");
- } catch (Exception e) {
- // We expect the job to fail with an exception
- }
- }
-
- @Test
- public void testNumVerticesNotSet() throws Exception {
-
- /*
- * Test that if the number of vertices option is not set, this number cannot be accessed -
- * an exception id thrown.
- */
- final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
- "localhost", cluster.getJobManagerRPCPort());
- env.setParallelism(PARALLELISM);
- env.getConfig().disableSysoutLogging();
-
- Graph<Long, Long, Long> graph = Graph.fromCollection(TestGraphUtils.getLongLongVertices(),
- TestGraphUtils.getLongLongEdges(), env);
-
- try {
- DataSet<Vertex<Long, Long>> verticesWithNumVertices = graph.runVertexCentricIteration(new UpdateFunctionNumVertices(),
- new DummyMessageFunction(), 5).getVertices();
-
- verticesWithNumVertices.output(new DiscardingOutputFormat<Vertex<Long, Long>>());
- env.execute();
-
- fail("The num vertices option not set test did not fail");
- } catch (Exception e) {
- // We expect the job to fail with an exception
- }
- }
-
- @SuppressWarnings("serial")
- public static final class UpdateFunctionInDegree extends VertexUpdateFunction<Long, Long, Long> {
-
- @Override
- public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> inMessages) throws Exception {
- setNewVertexValue(((VertexWithDegrees) vertex).getInDegree());
- }
- }
-
- @SuppressWarnings("serial")
- public static final class UpdateFunctionOutDegree extends VertexUpdateFunction<Long, Long, Long> {
-
- @Override
- public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> inMessages) throws Exception {
- setNewVertexValue(((VertexWithDegrees)vertex).getOutDegree());
- }
- }
-
- @SuppressWarnings("serial")
- public static final class UpdateFunctionNumVertices extends VertexUpdateFunction<Long, Long, Long> {
-
- @Override
- public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> inMessages) throws Exception {
- setNewVertexValue(getNumberOfVertices());
- }
- }
-
- @SuppressWarnings("serial")
- public static final class DummyMessageFunction extends MessagingFunction<Long, Long, Long, Long> {
-
- @Override
- public void sendMessages(Vertex<Long, Long> vertex) {
- //send message to keep vertices active
- sendMessageToAllNeighbors(vertex.getValue());
- }
- }
-}