Posted to commits@flink.apache.org by va...@apache.org on 2015/10/06 12:14:25 UTC

[1/2] flink git commit: [FLINK-2786] Remove Spargel code and docs; Port Spargel tests to Gelly; Remove Beta badge from Gelly

Repository: flink
Updated Branches:
  refs/heads/master 2b81b17d5 -> 39115abe4
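
Since this commit removes the standalone flink-spargel module in favour of Gelly, the sketch below shows roughly how the removed SpargelConnectedComponents example maps onto the Gelly graph API. It is an approximation only: the class name GellyConnectedComponentsSketch is made up, the org.apache.flink.graph.spargel package and Graph.runVertexCentricIteration call are assumed to mirror the removed Spargel classes, and exact signatures (for instance whether updateVertex receives the key and value separately or a Vertex object) vary between Flink releases.

// Approximate Gelly counterpart of the removed SpargelConnectedComponents example.
// User-function signatures are copied from the removed Spargel classes and may not
// match the Gelly release exactly.
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.spargel.MessageIterator;
import org.apache.flink.graph.spargel.MessagingFunction;
import org.apache.flink.graph.spargel.VertexUpdateFunction;
import org.apache.flink.types.NullValue;

public class GellyConnectedComponentsSketch {

	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		// each vertex starts with its own id as the component id
		DataSet<Vertex<Long, Long>> vertices = env.generateSequence(0, 10)
			.map(new MapFunction<Long, Vertex<Long, Long>>() {
				public Vertex<Long, Long> map(Long id) {
					return new Vertex<Long, Long>(id, id);
				}
			});

		// the same sample edges as in the removed example, with no edge value
		DataSet<Edge<Long, NullValue>> edges = env.fromElements(
			new Edge<Long, NullValue>(0L, 2L, NullValue.getInstance()),
			new Edge<Long, NullValue>(2L, 4L, NullValue.getInstance()),
			new Edge<Long, NullValue>(4L, 8L, NullValue.getInstance()),
			new Edge<Long, NullValue>(1L, 5L, NullValue.getInstance()),
			new Edge<Long, NullValue>(3L, 7L, NullValue.getInstance()),
			new Edge<Long, NullValue>(3L, 9L, NullValue.getInstance()));

		Graph<Long, Long, NullValue> graph = Graph.fromDataSet(vertices, edges, env);

		// the vertex-centric iteration now runs on the Graph itself
		Graph<Long, Long, NullValue> result =
			graph.runVertexCentricIteration(new CCUpdater(), new CCMessager(), 100);

		result.getVertices().print();
	}

	// propagate the minimum component id, as in the removed CCUpdater
	public static final class CCUpdater extends VertexUpdateFunction<Long, Long, Long> {
		@Override
		public void updateVertex(Long vertexKey, Long vertexValue, MessageIterator<Long> inMessages) {
			long min = vertexValue;
			for (long msg : inMessages) {
				min = Math.min(min, msg);
			}
			if (min < vertexValue) {
				setNewVertexValue(min);
			}
		}
	}

	public static final class CCMessager extends MessagingFunction<Long, Long, Long, NullValue> {
		@Override
		public void sendMessages(Long vertexId, Long componentId) {
			sendMessageToAllNeighbors(componentId);
		}
	}
}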


http://git-wip-us.apache.org/repos/asf/flink/blob/39115abe/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexUpdateFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexUpdateFunction.java b/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexUpdateFunction.java
deleted file mode 100644
index 9a494b1..0000000
--- a/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexUpdateFunction.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.spargel.java;
-
-import java.io.Serializable;
-import java.util.Collection;
-
-import org.apache.flink.api.common.aggregators.Aggregator;
-import org.apache.flink.api.common.functions.IterationRuntimeContext;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.types.Value;
-import org.apache.flink.util.Collector;
-
-/**
- * This class must be extended by functions that compute the state of the vertex depending on the old state and the
- * incoming messages. The central method is {@link #updateVertex(Comparable, Object, MessageIterator)}, which is
- * invoked once per vertex per superstep.
- * 
- * <VertexKey> The vertex key type.
- * <VertexValue> The vertex value type.
- * <Message> The message type.
- */
-@Deprecated
-public abstract class VertexUpdateFunction<VertexKey extends Comparable<VertexKey>, VertexValue, Message> implements Serializable {
-
-	private static final long serialVersionUID = 1L;
-	
-	// --------------------------------------------------------------------------------------------
-	//  Public API Methods
-	// --------------------------------------------------------------------------------------------
-	
-	/**
-	 * This method is invoked once per vertex per superstep. It receives the current state of the vertex, as well as
-	 * the incoming messages. It may set a new vertex state via {@link #setNewVertexValue(Object)}. If the vertex
-	 * state is changed, it will trigger the sending of messages via the {@link MessagingFunction}.
-	 * 
-	 * @param vertexKey The key (identifier) of the vertex.
-	 * @param vertexValue The value (state) of the vertex.
-	 * @param inMessages The incoming messages to this vertex.
-	 * 
-	 * @throws Exception The computation may throw exceptions, which cause the superstep to fail.
-	 */
-	@Deprecated
-	public abstract void updateVertex(VertexKey vertexKey, VertexValue vertexValue, MessageIterator<Message> inMessages) throws Exception;
-	
-	/**
-	 * This method is executed once per superstep before the vertex update function is invoked for each vertex.
-	 * 
-	 * @throws Exception Exceptions in the pre-superstep phase cause the superstep to fail.
-	 */
-	@Deprecated
-	public void preSuperstep() throws Exception {}
-	
-	/**
-	 * This method is executed once per superstep after the vertex update function has been invoked for each vertex.
-	 * 
-	 * @throws Exception Exceptions in the post-superstep phase cause the superstep to fail.
-	 */
-	@Deprecated
-	public void postSuperstep() throws Exception {}
-	
-	/**
-	 * Sets the new value of this vertex. Setting a new value triggers the sending of outgoing messages from this vertex.
-	 * 
-	 * @param newValue The new vertex value.
-	 */
-	@Deprecated
-	public void setNewVertexValue(VertexValue newValue) {
-		outVal.f1 = newValue;
-		out.collect(outVal);
-	}
-	
-	/**
-	 * Gets the number of the superstep, starting at <tt>1</tt>.
-	 * 
-	 * @return The number of the current superstep.
-	 */
-	@Deprecated
-	public int getSuperstepNumber() {
-		return this.runtimeContext.getSuperstepNumber();
-	}
-	
-	/**
-	 * Gets the iteration aggregator registered under the given name. The iteration aggregator combines
-	 * all aggregates globally once per superstep and makes them available in the next superstep.
-	 * 
-	 * @param name The name of the aggregator.
-	 * @return The aggregator registered under this name, or null, if no aggregator was registered.
-	 */
-	@Deprecated
-	public <T extends Aggregator<?>> T getIterationAggregator(String name) {
-		return this.runtimeContext.<T>getIterationAggregator(name);
-	}
-	
-	/**
-	 * Gets the aggregated value that an aggregator computed in the previous iteration.
-	 * 
-	 * @param name The name of the aggregator.
-	 * @return The aggregated value of the previous iteration.
-	 */
-	@Deprecated
-	public <T extends Value> T getPreviousIterationAggregate(String name) {
-		return this.runtimeContext.<T>getPreviousIterationAggregate(name);
-	}
-	
-	/**
-	 * Gets the broadcast data set registered under the given name. Broadcast data sets
-	 * are available on all parallel instances of a function. They can be registered via
-	 * {@link VertexCentricIteration#addBroadcastSetForUpdateFunction(String, org.apache.flink.api.java.DataSet)}.
-	 * 
-	 * @param name The name under which the broadcast set is registered.
-	 * @return The broadcast data set.
-	 */
-	@Deprecated
-	public <T> Collection<T> getBroadcastSet(String name) {
-		return this.runtimeContext.<T>getBroadcastVariable(name);
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	//  internal methods
-	// --------------------------------------------------------------------------------------------
-	
-	private IterationRuntimeContext runtimeContext;
-	
-	private Collector<Tuple2<VertexKey, VertexValue>> out;
-	
-	private Tuple2<VertexKey, VertexValue> outVal;
-	
-	
-	void init(IterationRuntimeContext context) {
-		this.runtimeContext = context;
-	}
-	
-	void setOutput(Tuple2<VertexKey, VertexValue> val, Collector<Tuple2<VertexKey, VertexValue>> out) {
-		this.out = out;
-		this.outVal = val;
-	}
-}
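
The aggregator and broadcast-set hooks documented above (getIterationAggregator, getPreviousIterationAggregate, getBroadcastSet, preSuperstep) are easiest to see in a small subclass. The sketch below uses only methods defined in the removed class; the subclass name, the aggregator name "updated.count", and the broadcast-set name "component.whitelist" are made up for illustration, and the aggregator/broadcast set are assumed to be registered on the enclosing VertexCentricIteration (registration itself is not shown).

// Illustrative sketch only: a Spargel update function exercising the aggregator
// and broadcast-set hooks of the VertexUpdateFunction removed above.
import java.util.Collection;

import org.apache.flink.api.common.aggregators.LongSumAggregator;
import org.apache.flink.spargel.java.MessageIterator;
import org.apache.flink.spargel.java.VertexUpdateFunction;
import org.apache.flink.types.LongValue;

public class MinLabelUpdater extends VertexUpdateFunction<Long, Long, Long> {

	private static final long serialVersionUID = 1L;

	private LongSumAggregator updatedCount;
	private Collection<Long> whitelist;

	@Override
	public void preSuperstep() {
		// aggregator assumed to be registered with the iteration under "updated.count"
		updatedCount = getIterationAggregator("updated.count");
		// broadcast set assumed to be registered via addBroadcastSetForUpdateFunction(...)
		whitelist = getBroadcastSet("component.whitelist");

		// the value the aggregator produced in the previous superstep (may be null in superstep 1)
		LongValue previous = getPreviousIterationAggregate("updated.count");
		if (previous != null) {
			System.out.println("vertices updated in superstep " + (getSuperstepNumber() - 1)
					+ ": " + previous.getValue());
		}
	}

	@Override
	public void updateVertex(Long vertexKey, Long vertexValue, MessageIterator<Long> inMessages) {
		long min = vertexValue;
		for (long msg : inMessages) {
			min = Math.min(min, msg);
		}
		// only adopt labels from the broadcast whitelist, and count actual updates
		if (min < vertexValue && whitelist.contains(min)) {
			updatedCount.aggregate(1L);
			setNewVertexValue(min);
		}
	}
}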

http://git-wip-us.apache.org/repos/asf/flink/blob/39115abe/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelConnectedComponents.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelConnectedComponents.java b/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelConnectedComponents.java
deleted file mode 100644
index 90439a6..0000000
--- a/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelConnectedComponents.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.spargel.java.examples;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.spargel.java.MessageIterator;
-import org.apache.flink.spargel.java.MessagingFunction;
-import org.apache.flink.spargel.java.VertexCentricIteration;
-import org.apache.flink.spargel.java.VertexUpdateFunction;
-import org.apache.flink.types.NullValue;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-
-@SuppressWarnings({"serial", "unchecked"})
-public class SpargelConnectedComponents {
-
-	public static void main(String[] args) throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		
-		DataSet<Long> vertexIds = env.generateSequence(0, 10);
-		DataSet<Tuple2<Long, Long>> edges = env.fromElements(new Tuple2<Long, Long>(0L, 2L), new Tuple2<Long, Long>(2L, 4L), new Tuple2<Long, Long>(4L, 8L),
-															new Tuple2<Long, Long>(1L, 5L), new Tuple2<Long, Long>(3L, 7L), new Tuple2<Long, Long>(3L, 9L));
-		
-		DataSet<Tuple2<Long, Long>> initialVertices = vertexIds.map(new IdAssigner());
-		
-		DataSet<Tuple2<Long, Long>> result = initialVertices.runOperation(VertexCentricIteration.withPlainEdges(edges, new CCUpdater(), new CCMessager(), 100));
-		
-		result.print();
-		env.execute("Spargel Connected Components");
-	}
-	
-	public static final class CCUpdater extends VertexUpdateFunction<Long, Long, Long> {
-		@Override
-		public void updateVertex(Long vertexKey, Long vertexValue, MessageIterator<Long> inMessages) {
-			long min = Long.MAX_VALUE;
-			for (long msg : inMessages) {
-				min = Math.min(min, msg);
-			}
-			if (min < vertexValue) {
-				setNewVertexValue(min);
-			}
-		}
-	}
-	
-	public static final class CCMessager extends MessagingFunction<Long, Long, Long, NullValue> {
-		@Override
-		public void sendMessages(Long vertexId, Long componentId) {
-			sendMessageToAllNeighbors(componentId);
-		}
-	}
-	
-	/**
-	 * A map function that takes a Long value and creates a 2-tuple out of it:
-	 * <pre>(Long value) -> (value, value)</pre>
-	 */
-	@Deprecated
-	public static final class IdAssigner implements MapFunction<Long, Tuple2<Long, Long>> {
-		@Override
-		public Tuple2<Long, Long> map(Long value) {
-			return new Tuple2<Long, Long>(value, value);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/39115abe/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRank.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRank.java b/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRank.java
deleted file mode 100644
index fccb195..0000000
--- a/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRank.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.spargel.java.examples;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.spargel.java.MessageIterator;
-import org.apache.flink.spargel.java.MessagingFunction;
-import org.apache.flink.spargel.java.OutgoingEdge;
-import org.apache.flink.spargel.java.VertexCentricIteration;
-import org.apache.flink.spargel.java.VertexUpdateFunction;
-import org.apache.flink.util.Collector;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-
-/**
- * An implementation of the basic PageRank algorithm in the vertex-centric API (spargel).
- * In this implementation, the edges carry a weight (the transition probability).
- */
-@SuppressWarnings("serial")
-public class SpargelPageRank {
-	
-	private static final double BETA = 0.85;
-
-	
-	public static void main(String[] args) throws Exception {
-		final int numVertices = 100;
-		
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		
-		// enumerate some sample edges and assign an initial uniform probability (rank)
-		DataSet<Tuple2<Long, Double>> intialRanks = env.generateSequence(1, numVertices)
-								.map(new MapFunction<Long, Tuple2<Long, Double>>() {
-									public Tuple2<Long, Double> map(Long value) {
-										return new Tuple2<Long, Double>(value, 1.0/numVertices);
-									}
-								});
-		
-		// generate some random edges. the transition probability on each edge is 1/num-out-edges of the source vertex
-		DataSet<Tuple3<Long, Long, Double>> edgesWithProbability = env.generateSequence(1, numVertices)
-								.flatMap(new FlatMapFunction<Long, Tuple3<Long, Long, Double>>() {
-									public void flatMap(Long value, Collector<Tuple3<Long, Long, Double>> out) {
-										int numOutEdges = (int) (Math.random() * (numVertices / 2));
-										for (int i = 0; i < numOutEdges; i++) {
-											long target = (long) (Math.random() * numVertices) + 1;
-											out.collect(new Tuple3<Long, Long, Double>(value, target, 1.0/numOutEdges));
-										}
-									}
-								});
-		
-		DataSet<Tuple2<Long, Double>> result = intialRanks.runOperation(
-			VertexCentricIteration.withValuedEdges(edgesWithProbability,
-						new VertexRankUpdater(numVertices, BETA), new RankMessenger(), 20));
-		
-		result.print();
-		env.execute("Spargel PageRank");
-	}
-	
-	/**
-	 * Function that updates the rank of a vertex by summing up the partial ranks from all incoming messages
-	 * and then applying the dampening formula.
-	 */
-	public static final class VertexRankUpdater extends VertexUpdateFunction<Long, Double, Double> {
-		
-		private final long numVertices;
-		private final double beta;
-		
-		public VertexRankUpdater(long numVertices, double beta) {
-			this.numVertices = numVertices;
-			this.beta = beta;
-		}
-
-		@Override
-		public void updateVertex(Long vertexKey, Double vertexValue, MessageIterator<Double> inMessages) {
-			double rankSum = 0.0;
-			for (double msg : inMessages) {
-				rankSum += msg;
-			}
-			
-			// apply the dampening factor / random jump
-			double newRank = (beta * rankSum) + (1-BETA)/numVertices;
-			setNewVertexValue(newRank);
-		}
-	}
-	
-	/**
-	 * Distributes the rank of a vertex among all target vertices according to the transition probability,
-	 * which is associated with an edge as the edge value.
-	 */
-	public static final class RankMessenger extends MessagingFunction<Long, Double, Double, Double> {
-		
-		@Override
-		public void sendMessages(Long vertexId, Double newRank) {
-			for (OutgoingEdge<Long, Double> edge : getOutgoingEdges()) {
-				sendMessageTo(edge.target(), newRank * edge.edgeValue());
-			}
-		}
-	}
-}
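
For reference, the update computed in VertexRankUpdater above is the usual dampened PageRank step. With damping factor beta (the BETA constant, 0.85), N the number of vertices (numVertices), and p_{u,v} the transition probability stored as the edge value (the messages sent by RankMessenger carry p_{u,v} * r_u):

    r_v^{(t+1)} = \beta \sum_{(u,v) \in E} p_{u,v} \, r_u^{(t)} + \frac{1 - \beta}{N}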

http://git-wip-us.apache.org/repos/asf/flink/blob/39115abe/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRankCountingVertices.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRankCountingVertices.java b/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRankCountingVertices.java
deleted file mode 100644
index ae4ee95..0000000
--- a/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRankCountingVertices.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.spargel.java.examples;
-
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.spargel.java.MessageIterator;
-import org.apache.flink.spargel.java.MessagingFunction;
-import org.apache.flink.spargel.java.OutgoingEdge;
-import org.apache.flink.spargel.java.VertexCentricIteration;
-import org.apache.flink.spargel.java.VertexUpdateFunction;
-import org.apache.flink.util.Collector;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-
-/**
- * An implementation of the basic PageRank algorithm in the vertex-centric API (spargel).
- * In this implementation, the edges carry a weight (the transition probability). The number of vertices is computed at runtime and distributed to the update function via a broadcast set.
- */
-@SuppressWarnings("serial")
-public class SpargelPageRankCountingVertices {
-	
-	private static final double BETA = 0.85;
-
-	
-	public static void main(String[] args) throws Exception {
-		final int NUM_VERTICES = 100;
-		
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		
-		// a list of vertices
-		DataSet<Long> vertices = env.generateSequence(1, NUM_VERTICES);
-		
-		// generate some random edges. the transition probability on each edge is 1/num-out-edges of the source vertex
-		DataSet<Tuple3<Long, Long, Double>> edgesWithProbability = env.generateSequence(1, NUM_VERTICES)
-								.flatMap(new FlatMapFunction<Long, Tuple3<Long, Long, Double>>() {
-									public void flatMap(Long value, Collector<Tuple3<Long, Long, Double>> out) {
-										int numOutEdges = (int) (Math.random() * (NUM_VERTICES / 2));
-										for (int i = 0; i < numOutEdges; i++) {
-											long target = (long) (Math.random() * NUM_VERTICES) + 1;
-											out.collect(new Tuple3<Long, Long, Double>(value, target, 1.0/numOutEdges));
-										}
-									}
-								});
-		
-		// ---------- start of the algorithm ---------------
-		
-		// count the number of vertices
-		DataSet<Long> count = vertices
-			.map(new MapFunction<Long, Long>() {
-				public Long map(Long value) {
-					return 1L;
-				}
-			})
-			.reduce(new ReduceFunction<Long>() {
-				public Long reduce(Long value1, Long value2) {
-					return value1 + value2;
-				}
-			});
-		
-		// enumerate some sample edges and assign an initial uniform probability (rank)
-		DataSet<Tuple2<Long, Double>> intialRanks = vertices
-			.map(new RichMapFunction<Long, Tuple2<Long, Double>>() {
-				
-				private long numVertices;
-				
-				@Override
-				public void open(Configuration parameters) {
-					numVertices = getRuntimeContext().<Long>getBroadcastVariable("count").iterator().next();
-				}
-				
-				public Tuple2<Long, Double> map(Long value) {
-					return new Tuple2<Long, Double>(value, 1.0/numVertices);
-				}
-			}).withBroadcastSet(count, "count");
-		
-
-		VertexCentricIteration<Long, Double, Double, Double> iteration = VertexCentricIteration.withValuedEdges(edgesWithProbability,
-				new VertexRankUpdater(BETA), new RankMessenger(), 20);
-		iteration.addBroadcastSetForUpdateFunction("count", count);
-		
-		
-		DataSet<Tuple2<Long, Double>> result = intialRanks.runOperation(iteration);
-		
-		result.print();
-		env.execute("Spargel PageRank");
-	}
-	
-	/**
-	 * Function that updates the rank of a vertex by summing up the partial ranks from all incoming messages
-	 * and then applying the dampening formula.
-	 */
-	public static final class VertexRankUpdater extends VertexUpdateFunction<Long, Double, Double> {
-		
-		private final double beta;
-		private long numVertices;
-		
-		public VertexRankUpdater(double beta) {
-			this.beta = beta;
-		}
-		
-		@Override
-		public void preSuperstep() {
-			numVertices = this.<Long>getBroadcastSet("count").iterator().next();
-		}
-
-		@Override
-		public void updateVertex(Long vertexKey, Double vertexValue, MessageIterator<Double> inMessages) {
-			double rankSum = 0.0;
-			for (double msg : inMessages) {
-				rankSum += msg;
-			}
-			
-			// apply the dampening factor / random jump
-			double newRank = (beta * rankSum) + (1-BETA)/numVertices;
-			setNewVertexValue(newRank);
-		}
-	}
-	
-	/**
-	 * Distributes the rank of a vertex among all target vertices according to the transition probability,
-	 * which is associated with an edge as the edge value.
-	 */
-	public static final class RankMessenger extends MessagingFunction<Long, Double, Double, Double> {
-		
-		@Override
-		public void sendMessages(Long vertexId, Double newRank) {
-			for (OutgoingEdge<Long, Double> edge : getOutgoingEdges()) {
-				sendMessageTo(edge.target(), newRank * edge.edgeValue());
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/39115abe/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/Edge.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/Edge.java b/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/Edge.java
deleted file mode 100644
index 0c39688..0000000
--- a/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/Edge.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.spargel.java.record;
-
-
-import org.apache.flink.types.Key;
-import org.apache.flink.types.Value;
-
-
-public final class Edge<VertexKey extends Key<VertexKey>, EdgeValue extends Value> {
-	
-	private VertexKey target;
-	private EdgeValue edgeValue;
-	
-	void set(VertexKey target, EdgeValue edgeValue) {
-		this.target = target;
-		this.edgeValue = edgeValue;
-	}
-	
-	public VertexKey target() {
-		return target;
-	}
-	
-	public EdgeValue edgeValue() {
-		return edgeValue;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/39115abe/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/MessageIterator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/MessageIterator.java b/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/MessageIterator.java
deleted file mode 100644
index 03022d2..0000000
--- a/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/MessageIterator.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.spargel.java.record;
-
-import java.util.Iterator;
-
-import org.apache.flink.types.Record;
-import org.apache.flink.types.Value;
-
-public final class MessageIterator<Message extends Value> implements Iterator<Message>, Iterable<Message> {
-
-	private final Message instance;
-	private Iterator<Record> source;
-	
-	public MessageIterator(Message instance) {
-		this.instance = instance;
-	}
-	
-	public final void setSource(Iterator<Record> source) {
-		this.source = source;
-	}
-	
-	@Override
-	public final boolean hasNext() {
-		return this.source.hasNext();
-	}
-	
-	@Override
-	public final Message next() {
-		this.source.next().getFieldInto(1, this.instance);
-		return this.instance;
-	}
-
-	@Override
-	public final void remove() {
-		throw new UnsupportedOperationException();
-	}
-
-	@Override
-	public Iterator<Message> iterator() {
-		return this;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/39115abe/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/MessagingFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/MessagingFunction.java b/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/MessagingFunction.java
deleted file mode 100644
index 794dce5..0000000
--- a/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/MessagingFunction.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.spargel.java.record;
-
-import java.io.Serializable;
-import java.util.Iterator;
-
-import org.apache.flink.api.common.aggregators.Aggregator;
-import org.apache.flink.api.common.functions.IterationRuntimeContext;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.types.Key;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.Value;
-import org.apache.flink.util.Collector;
-
-public abstract class MessagingFunction<VertexKey extends Key<VertexKey>, VertexValue extends Value, Message extends Value, EdgeValue extends Value> implements Serializable {
-
-	// --------------------------------------------------------------------------------------------
-	//  Public API Methods
-	// --------------------------------------------------------------------------------------------
-	
-	public abstract void sendMessages(VertexKey vertexKey, VertexValue vertexValue) throws Exception;
-	
-	public void setup(Configuration config) throws Exception {}
-	
-	public void preSuperstep() throws Exception {}
-	
-	public void postSuperstep() throws Exception {}
-	
-	
-	public Iterator<Edge<VertexKey, EdgeValue>> getOutgoingEdges() {
-		if (edgesUsed) {
-			throw new IllegalStateException("Can use either 'getOutgoingEdges()' or 'sendMessageToAllNeighbors()'.");
-		}
-		
-		edgesUsed = true;
-		edgeIter.set(edges);
-		return edgeIter;
-	}
-	
-	public void sendMessageToAllNeighbors(Message m) {
-		if (edgesUsed) {
-			throw new IllegalStateException("Can use either 'getOutgoingEdges()' or 'sendMessageToAllNeighbors()'.");
-		}
-		
-		edgesUsed = true;
-		while (edges.hasNext()) {
-			Record next = edges.next();
-			VertexKey k = next.getField(1, this.keyClass);
-			outValue.setField(0, k);
-			outValue.setField(1, m);
-			out.collect(outValue);
-		}
-	}
-	
-	public void sendMessageTo(VertexKey target, Message m) {
-		outValue.setField(0, target);
-		outValue.setField(1, m);
-		out.collect(outValue);
-	}
-
-	// --------------------------------------------------------------------------------------------
-	
-	public int getSuperstep() {
-		return this.runtimeContext.getSuperstepNumber();
-	}
-	
-	public <T extends Aggregator<?>> T getIterationAggregator(String name) {
-		return this.runtimeContext.<T>getIterationAggregator(name);
-	}
-	
-	public <T extends Value> T getPreviousIterationAggregate(String name) {
-		return this.runtimeContext.<T>getPreviousIterationAggregate(name);
-	}
-
-	// --------------------------------------------------------------------------------------------
-	//  internal methods and state
-	// --------------------------------------------------------------------------------------------
-	
-	private Record outValue;
-	
-	private IterationRuntimeContext runtimeContext;
-	
-	private Iterator<Record> edges;
-	
-	private Collector<Record> out;
-	
-	private EdgesIterator<VertexKey, EdgeValue> edgeIter;
-	
-	private Class<VertexKey> keyClass;
-	
-	private boolean edgesUsed;
-	
-	
-	@SuppressWarnings("unchecked")
-	void init(IterationRuntimeContext context, VertexKey keyHolder, EdgeValue edgeValueHolder) {
-		this.runtimeContext = context;
-		this.edgeIter = new EdgesIterator<VertexKey, EdgeValue>(keyHolder, edgeValueHolder);
-		this.outValue = new Record();
-		this.keyClass = (Class<VertexKey>) keyHolder.getClass();
-	}
-	
-	void set(Iterator<Record> edges, Collector<Record> out) {
-		this.edges = edges;
-		this.out = out;
-		this.edgesUsed = false;
-	}
-	
-	private static final long serialVersionUID = 1L;
-	
-	private static final class EdgesIterator<VertexKey extends Key<VertexKey>, EdgeValue extends Value> implements Iterator<Edge<VertexKey, EdgeValue>> {
-
-		private Iterator<Record> input;
-		private VertexKey keyHolder;
-		private EdgeValue edgeValueHolder;
-		
-		private Edge<VertexKey, EdgeValue> edge = new Edge<VertexKey, EdgeValue>();
-		
-		EdgesIterator(VertexKey keyHolder, EdgeValue edgeValueHolder) {
-			this.keyHolder = keyHolder;
-			this.edgeValueHolder = edgeValueHolder;
-		}
-		
-		void set(Iterator<Record> input) {
-			this.input = input;
-		}
-		
-		@Override
-		public boolean hasNext() {
-			return input.hasNext();
-		}
-
-		@Override
-		public Edge<VertexKey, EdgeValue> next() {
-			Record next = input.next();
-			next.getFieldInto(0, keyHolder);
-			next.getFieldInto(1, edgeValueHolder);
-			edge.set(keyHolder, edgeValueHolder);
-			return edge;
-		}
-
-		@Override
-		public void remove() {
-			throw new UnsupportedOperationException();
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/39115abe/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/SpargelIteration.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/SpargelIteration.java b/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/SpargelIteration.java
deleted file mode 100644
index 7c137c9..0000000
--- a/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/SpargelIteration.java
+++ /dev/null
@@ -1,289 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.spargel.java.record;
-
-import java.io.IOException;
-import java.util.Iterator;
-
-import org.apache.flink.api.common.aggregators.AggregatorRegistry;
-import org.apache.flink.api.common.operators.Operator;
-import org.apache.flink.api.java.record.functions.CoGroupFunction;
-import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFieldsFirst;
-import org.apache.flink.api.java.record.operators.CoGroupOperator;
-import org.apache.flink.api.java.record.operators.DeltaIteration;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.types.Key;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.Value;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.InstantiationUtil;
-import org.apache.flink.util.ReflectionUtil;
-
-@SuppressWarnings("deprecation")
-public class SpargelIteration {
-	
-	private static final String DEFAULT_NAME = "<unnamed vertex-centric iteration>";
-	
-	private final DeltaIteration iteration;
-	
-	private final Class<? extends Key<?>> vertexKey;
-	private final Class<? extends Value> vertexValue;
-	private final Class<? extends Value> messageType;
-	private final Class<? extends Value> edgeValue;
-	
-	private final CoGroupOperator vertexUpdater;
-	private final CoGroupOperator messager;
-	
-	
-	// ----------------------------------------------------------------------------------
-	
-	public <VertexKey extends Key<VertexKey>, VertexValue extends Value, Message extends Value, EdgeValue extends Value>
-			SpargelIteration(MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> mf,
-			VertexUpdateFunction<VertexKey, VertexValue, Message> uf)
-	{
-		this(mf, uf, DEFAULT_NAME);
-	}
-	
-	public <VertexKey extends Key<VertexKey>, VertexValue extends Value, Message extends Value, EdgeValue extends Value> SpargelIteration(
-			MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> mf, VertexUpdateFunction<VertexKey, VertexValue, Message> uf,
-			String name)
-	{
-		// get the types
-		this.vertexKey = ReflectionUtil.getTemplateType1(mf.getClass());
-		this.vertexValue = ReflectionUtil.getTemplateType2(mf.getClass());
-		this.messageType = ReflectionUtil.getTemplateType3(mf.getClass());
-		this.edgeValue = ReflectionUtil.getTemplateType4(mf.getClass());
-		
-		if (vertexKey == null || vertexValue == null || messageType == null || edgeValue == null) {
-			throw new RuntimeException();
-		}
-	
-		// instantiate the data flow
-		this.iteration = new DeltaIteration(0, name);
-		
-		this.messager = CoGroupOperator.builder(MessagingDriver.class, vertexKey, 0, 0)
-			.input2(iteration.getWorkset())
-			.name("Message Sender")
-			.build();
-		this.vertexUpdater = CoGroupOperator.builder(VertexUpdateDriver.class, vertexKey, 0, 0)
-			.input1(messager)
-			.input2(iteration.getSolutionSet())
-			.name("Vertex Updater")
-			.build();
-		
-		iteration.setNextWorkset(vertexUpdater);
-		iteration.setSolutionSetDelta(vertexUpdater);
-		
-		// parameterize the data flow
-		try {
-			Configuration vertexUdfParams = vertexUpdater.getParameters();
-			InstantiationUtil.writeObjectToConfig(uf, vertexUdfParams, VertexUpdateDriver.UDF_PARAM);
-			vertexUdfParams.setClass(VertexUpdateDriver.KEY_PARAM, vertexKey);
-			vertexUdfParams.setClass(VertexUpdateDriver.VALUE_PARAM, vertexValue);
-			vertexUdfParams.setClass(VertexUpdateDriver.MESSAGE_PARAM, messageType);
-			
-			Configuration messageUdfParams = messager.getParameters();
-			InstantiationUtil.writeObjectToConfig(mf, messageUdfParams, MessagingDriver.UDF_PARAM);
-			messageUdfParams.setClass(MessagingDriver.KEY_PARAM, vertexKey);
-			messageUdfParams.setClass(MessagingDriver.VALUE_PARAM, vertexValue);
-			messageUdfParams.setClass(MessagingDriver.MESSAGE_PARAM, messageType);
-			messageUdfParams.setClass(MessagingDriver.EDGE_PARAM, edgeValue);
-		}
-		catch (IOException e) {
-			throw new RuntimeException("Could not serialize the UDFs for distribution" + 
-					(e.getMessage() == null ? '.' : ": " + e.getMessage()), e);
-		}
-	}
-	
-	// ----------------------------------------------------------------------------------
-	//  inputs and outputs
-	// ----------------------------------------------------------------------------------
-	
-	public void setVertexInput(Operator<Record> c) {
-		this.iteration.setInitialSolutionSet(c);
-		this.iteration.setInitialWorkset(c);
-	}
-	
-	public void setEdgesInput(Operator<Record> c) {
-		this.messager.setFirstInput(c);
-	}
-	
-	public Operator<?> getOutput() {
-		return this.iteration;
-	}
-
-	public void setParallelism(int parallelism) {
-		this.iteration.setParallelism(parallelism);
-	}
-	
-	public void setNumberOfIterations(int iterations) {
-		this.iteration.setMaximumNumberOfIterations(iterations);
-	}
-	
-	public AggregatorRegistry getAggregators() {
-		return this.iteration.getAggregators();
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	//  Wrapping UDFs
-	// --------------------------------------------------------------------------------------------
-	
-	@ConstantFieldsFirst(0)
-	public static final class VertexUpdateDriver<K extends Key<K>, V extends Value, M extends Value> extends CoGroupFunction {
-		
-		private static final long serialVersionUID = 1L;
-		
-		private static final String UDF_PARAM = "spargel.udf";
-		private static final String KEY_PARAM = "spargel.key-type";
-		private static final String VALUE_PARAM = "spargel.value-type";
-		private static final String MESSAGE_PARAM = "spargel.message-type";
-		
-		private VertexUpdateFunction<K, V, M> vertexUpdateFunction;
-		
-		private K vertexKey;
-		private V vertexValue;
-		private MessageIterator<M> messageIter;
-
-		@Override
-		public void coGroup(Iterator<Record> messages, Iterator<Record> vertex, Collector<Record> out) throws Exception {
-			
-			if (vertex.hasNext()) {
-				Record first = vertex.next();
-				first.getFieldInto(0, vertexKey);
-				first.getFieldInto(1, vertexValue);
-				messageIter.setSource(messages);
-				vertexUpdateFunction.setOutput(first, out);
-				vertexUpdateFunction.updateVertex(vertexKey, vertexValue, messageIter);
-			} else {
-				if (messages.hasNext()) {
-					String message = "Target vertex does not exist!";
-					try {
-						Record next = messages.next();
-						next.getFieldInto(0, vertexKey);
-						message = "Target vertex '" + vertexKey + "' does not exist!";
-					} catch (Throwable t) {}
-					throw new Exception(message);
-				} else {
-					throw new Exception();
-				}
-			}
-		}
-		
-		@SuppressWarnings("unchecked")
-		@Override
-		public void open(Configuration parameters) throws Exception {
-			// instantiate only the first time
-			if (vertexUpdateFunction == null) {
-				ClassLoader cl = getRuntimeContext().getUserCodeClassLoader();
-				
-				Class<K> vertexKeyClass = parameters.getClass(KEY_PARAM, null, cl);
-				Class<V> vertexValueClass = parameters.getClass(VALUE_PARAM, null, cl);
-				Class<M> messageClass = parameters.getClass(MESSAGE_PARAM, null, cl);
-				
-				vertexKey = InstantiationUtil.instantiate(vertexKeyClass, Key.class);
-				vertexValue = InstantiationUtil.instantiate(vertexValueClass, Value.class);
-				messageIter = new MessageIterator<M>(InstantiationUtil.instantiate(messageClass, Value.class));
-				
-				ClassLoader ucl = getRuntimeContext().getUserCodeClassLoader();
-				
-				try {
-					this.vertexUpdateFunction = (VertexUpdateFunction<K, V, M>) InstantiationUtil.readObjectFromConfig(parameters, UDF_PARAM, ucl);
-				} catch (Exception e) {
-					String message = e.getMessage() == null ? "." : ": " + e.getMessage();
-					throw new Exception("Could not instantiate VertexUpdateFunction" + message, e);
-				}
-				
-				this.vertexUpdateFunction.init(getIterationRuntimeContext());
-				this.vertexUpdateFunction.setup(parameters);
-			}
-			this.vertexUpdateFunction.preSuperstep();
-		}
-		
-		@Override
-		public void close() throws Exception {
-			this.vertexUpdateFunction.postSuperstep();
-		}
-	}
-	
-	public static final class MessagingDriver<K extends Key<K>, V extends Value, M extends Value, E extends Value> extends CoGroupFunction {
-
-		private static final long serialVersionUID = 1L;
-		
-		private static final String UDF_PARAM = "spargel.udf";
-		private static final String KEY_PARAM = "spargel.key-type";
-		private static final String VALUE_PARAM = "spargel.value-type";
-		private static final String MESSAGE_PARAM = "spargel.message-type";
-		private static final String EDGE_PARAM = "spargel.edge-value";
-		
-		
-		private MessagingFunction<K, V, M, E> messagingFunction;
-		
-		private K vertexKey;
-		private V vertexValue;
-		
-		@Override
-		public void coGroup(Iterator<Record> edges, Iterator<Record> state, Collector<Record> out) throws Exception {
-			if (state.hasNext()) {
-				Record first = state.next();
-				first.getFieldInto(0, vertexKey);
-				first.getFieldInto(1, vertexValue);
-				messagingFunction.set(edges, out);
-				messagingFunction.sendMessages(vertexKey, vertexValue);
-			}
-		}
-		
-		@SuppressWarnings("unchecked")
-		@Override
-		public void open(Configuration parameters) throws Exception {
-			// instantiate only the first time
-			if (messagingFunction == null) {
-				ClassLoader cl = getRuntimeContext().getUserCodeClassLoader();
-				
-				Class<K> vertexKeyClass = parameters.getClass(KEY_PARAM, null, cl);
-				Class<V> vertexValueClass = parameters.getClass(VALUE_PARAM, null, cl);
-//				Class<M> messageClass = parameters.getClass(MESSAGE_PARAM, null, Value.class);
-				Class<E> edgeClass = parameters.getClass(EDGE_PARAM, null, cl);
-				
-				vertexKey = InstantiationUtil.instantiate(vertexKeyClass, Key.class);
-				vertexValue = InstantiationUtil.instantiate(vertexValueClass, Value.class);
-				
-				K edgeKeyHolder = InstantiationUtil.instantiate(vertexKeyClass, Key.class);
-				E edgeValueHolder = InstantiationUtil.instantiate(edgeClass, Value.class);
-				
-				ClassLoader ucl = getRuntimeContext().getUserCodeClassLoader();
-				
-				try {
-					this.messagingFunction = (MessagingFunction<K, V, M, E>) InstantiationUtil.readObjectFromConfig(parameters, UDF_PARAM, ucl);
-				} catch (Exception e) {
-					String message = e.getMessage() == null ? "." : ": " + e.getMessage();
-					throw new Exception("Could not instantiate MessagingFunction" + message, e);
-				}
-				
-				this.messagingFunction.init(getIterationRuntimeContext(), edgeKeyHolder, edgeValueHolder);
-				this.messagingFunction.setup(parameters);
-			}
-			this.messagingFunction.preSuperstep();
-		}
-		
-		@Override
-		public void close() throws Exception {
-			this.messagingFunction.postSuperstep();
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/39115abe/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/VertexUpdateFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/VertexUpdateFunction.java b/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/VertexUpdateFunction.java
deleted file mode 100644
index 299b898..0000000
--- a/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/VertexUpdateFunction.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.spargel.java.record;
-
-import java.io.Serializable;
-
-import org.apache.flink.api.common.aggregators.Aggregator;
-import org.apache.flink.api.common.functions.IterationRuntimeContext;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.types.Key;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.Value;
-import org.apache.flink.util.Collector;
-
-/**
- * 
- * <VertexKey> The vertex key type.
- * <VertexValue> The vertex value type.
- * <Message> The message type.
- */
-public abstract class VertexUpdateFunction<VertexKey extends Key<VertexKey>, VertexValue extends Value, Message extends Value> implements Serializable {
-
-	// --------------------------------------------------------------------------------------------
-	//  Public API Methods
-	// --------------------------------------------------------------------------------------------
-	
-	public abstract void updateVertex(VertexKey vertexKey, VertexValue vertexValue, MessageIterator<Message> inMessages) throws Exception;
-	
-	public void setup(Configuration config) throws Exception {}
-	
-	public void preSuperstep() throws Exception {}
-	
-	public void postSuperstep() throws Exception {}
-	
-	public void setNewVertexValue(VertexValue newValue) {
-		outVal.setField(1, newValue);
-		out.collect(outVal);
-	}
-	
-	public int getSuperstep() {
-		return this.runtimeContext.getSuperstepNumber();
-	}
-	
-	public <T extends Aggregator<?>> T getIterationAggregator(String name) {
-		return this.runtimeContext.<T>getIterationAggregator(name);
-	}
-	
-	public <T extends Value> T getPreviousIterationAggregate(String name) {
-		return this.runtimeContext.<T>getPreviousIterationAggregate(name);
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	//  internal methods
-	// --------------------------------------------------------------------------------------------
-	
-	private IterationRuntimeContext runtimeContext;
-	
-	private Collector<Record> out;
-	
-	private Record outVal;
-	
-	
-	void init(IterationRuntimeContext context) {
-		this.runtimeContext = context;
-	}
-	
-	void setOutput(Record val, Collector<Record> out) {
-		this.out = out;
-		this.outVal = val;
-	}
-	
-	// serializability
-	private static final long serialVersionUID = 1L;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/39115abe/flink-staging/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelCompilerTest.java b/flink-staging/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelCompilerTest.java
deleted file mode 100644
index 018daf8..0000000
--- a/flink-staging/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelCompilerTest.java
+++ /dev/null
@@ -1,183 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.spargel.java;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.api.java.io.DiscardingOutputFormat;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.optimizer.util.CompilerTestBase;
-import org.junit.Test;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.optimizer.plan.DualInputPlanNode;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plan.PlanNode;
-import org.apache.flink.optimizer.plan.SinkPlanNode;
-import org.apache.flink.optimizer.plan.WorksetIterationPlanNode;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.apache.flink.runtime.operators.util.LocalStrategy;
-import org.apache.flink.spargel.java.examples.SpargelConnectedComponents.CCMessager;
-import org.apache.flink.spargel.java.examples.SpargelConnectedComponents.CCUpdater;
-import org.apache.flink.spargel.java.examples.SpargelConnectedComponents.IdAssigner;
-
-
-public class SpargelCompilerTest extends CompilerTestBase {
-
-	@Test
-	public void testSpargelCompiler() {
-		try {
-			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			env.setParallelism(DEFAULT_PARALLELISM);
-			// compose test program
-			{
-				DataSet<Long> vertexIds = env.generateSequence(1, 2);
-				
-				@SuppressWarnings("unchecked")
-				DataSet<Tuple2<Long, Long>> edges = env.fromElements(new Tuple2<Long, Long>(1L, 2L));
-				
-				DataSet<Tuple2<Long, Long>> initialVertices = vertexIds.map(new IdAssigner());
-				DataSet<Tuple2<Long, Long>> result = initialVertices.runOperation(VertexCentricIteration.withPlainEdges(edges, new CCUpdater(), new CCMessager(), 100));
-				
-				result.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
-			}
-			
-			Plan p = env.createProgramPlan("Spargel Connected Components");
-			OptimizedPlan op = compileNoStats(p);
-			
-			// check the sink
-			SinkPlanNode sink = op.getDataSinks().iterator().next();
-			assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
-			assertEquals(DEFAULT_PARALLELISM, sink.getParallelism());
-			
-			// check the iteration
-			WorksetIterationPlanNode iteration = (WorksetIterationPlanNode) sink.getInput().getSource();
-			assertEquals(DEFAULT_PARALLELISM, iteration.getParallelism());
-			
-			// check the solution set join and the delta
-			PlanNode ssDelta = iteration.getSolutionSetDeltaPlanNode();
-			assertTrue(ssDelta instanceof DualInputPlanNode); // this is only true if the update function preserves the partitioning
-			
-			DualInputPlanNode ssJoin = (DualInputPlanNode) ssDelta;
-			assertEquals(DEFAULT_PARALLELISM, ssJoin.getParallelism());
-			assertEquals(ShipStrategyType.PARTITION_HASH, ssJoin.getInput1().getShipStrategy());
-			assertEquals(new FieldList(0), ssJoin.getInput1().getShipStrategyKeys());
-			
-			// check the workset set join
-			DualInputPlanNode edgeJoin = (DualInputPlanNode) ssJoin.getInput1().getSource();
-			assertEquals(DEFAULT_PARALLELISM, edgeJoin.getParallelism());
-			assertEquals(ShipStrategyType.PARTITION_HASH, edgeJoin.getInput1().getShipStrategy());
-			assertEquals(ShipStrategyType.FORWARD, edgeJoin.getInput2().getShipStrategy());
-			assertTrue(edgeJoin.getInput1().getTempMode().isCached());
-			
-			assertEquals(new FieldList(0), edgeJoin.getInput1().getShipStrategyKeys());
-			
-			// check that the initial partitioning is pushed out of the loop
-			assertEquals(ShipStrategyType.PARTITION_HASH, iteration.getInput1().getShipStrategy());
-			assertEquals(ShipStrategyType.PARTITION_HASH, iteration.getInput2().getShipStrategy());
-			assertEquals(new FieldList(0), iteration.getInput1().getShipStrategyKeys());
-			assertEquals(new FieldList(0), iteration.getInput2().getShipStrategyKeys());
-			
-			// check that the initial workset sort is outside the loop
-			assertEquals(LocalStrategy.SORT, iteration.getInput2().getLocalStrategy());
-			assertEquals(new FieldList(0), iteration.getInput2().getLocalStrategyKeys());
-		}
-		catch (Exception e) {
-			System.err.println(e.getMessage());
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testSpargelCompilerWithBroadcastVariable() {
-		try {
-			final String BC_VAR_NAME = "borat variable";
-			
-			
-			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			env.setParallelism(DEFAULT_PARALLELISM);
-			// compose test program
-			{
-				DataSet<Long> bcVar = env.fromElements(1L);
-				
-				DataSet<Long> vertexIds = env.generateSequence(1, 2);
-				
-				@SuppressWarnings("unchecked")
-				DataSet<Tuple2<Long, Long>> edges = env.fromElements(new Tuple2<Long, Long>(1L, 2L));
-				
-				DataSet<Tuple2<Long, Long>> initialVertices = vertexIds.map(new IdAssigner());
-				
-				VertexCentricIteration<Long, Long, Long, ?> vcIter = VertexCentricIteration.withPlainEdges(edges, new CCUpdater(), new CCMessager(), 100);
-				vcIter.addBroadcastSetForMessagingFunction(BC_VAR_NAME, bcVar);
-				vcIter.addBroadcastSetForUpdateFunction(BC_VAR_NAME, bcVar);
-				
-				DataSet<Tuple2<Long, Long>> result = initialVertices.runOperation(vcIter);
-				
-				result.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
-			}
-			
-			Plan p = env.createProgramPlan("Spargel Connected Components");
-			OptimizedPlan op = compileNoStats(p);
-			
-			// check the sink
-			SinkPlanNode sink = op.getDataSinks().iterator().next();
-			assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
-			assertEquals(DEFAULT_PARALLELISM, sink.getParallelism());
-			
-			// check the iteration
-			WorksetIterationPlanNode iteration = (WorksetIterationPlanNode) sink.getInput().getSource();
-			assertEquals(DEFAULT_PARALLELISM, iteration.getParallelism());
-			
-			// check the solution set join and the delta
-			PlanNode ssDelta = iteration.getSolutionSetDeltaPlanNode();
-			assertTrue(ssDelta instanceof DualInputPlanNode); // this is only true if the update function preserves the partitioning
-			
-			DualInputPlanNode ssJoin = (DualInputPlanNode) ssDelta;
-			assertEquals(DEFAULT_PARALLELISM, ssJoin.getParallelism());
-			assertEquals(ShipStrategyType.PARTITION_HASH, ssJoin.getInput1().getShipStrategy());
-			assertEquals(new FieldList(0), ssJoin.getInput1().getShipStrategyKeys());
-			
-			// check the workset set join
-			DualInputPlanNode edgeJoin = (DualInputPlanNode) ssJoin.getInput1().getSource();
-			assertEquals(DEFAULT_PARALLELISM, edgeJoin.getParallelism());
-			assertEquals(ShipStrategyType.PARTITION_HASH, edgeJoin.getInput1().getShipStrategy());
-			assertEquals(ShipStrategyType.FORWARD, edgeJoin.getInput2().getShipStrategy());
-			assertTrue(edgeJoin.getInput1().getTempMode().isCached());
-			
-			assertEquals(new FieldList(0), edgeJoin.getInput1().getShipStrategyKeys());
-			
-			// check that the initial partitioning is pushed out of the loop
-			assertEquals(ShipStrategyType.PARTITION_HASH, iteration.getInput1().getShipStrategy());
-			assertEquals(ShipStrategyType.PARTITION_HASH, iteration.getInput2().getShipStrategy());
-			assertEquals(new FieldList(0), iteration.getInput1().getShipStrategyKeys());
-			assertEquals(new FieldList(0), iteration.getInput2().getShipStrategyKeys());
-		}
-		catch (Exception e) {
-			System.err.println(e.getMessage());
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/39115abe/flink-staging/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelTranslationTest.java b/flink-staging/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelTranslationTest.java
deleted file mode 100644
index cd48dcd..0000000
--- a/flink-staging/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelTranslationTest.java
+++ /dev/null
@@ -1,211 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.spargel.java;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import org.junit.Test;
-import org.apache.flink.api.common.aggregators.LongSumAggregator;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.operators.DeltaIteration;
-import org.apache.flink.api.java.operators.DeltaIterationResultSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.operators.TwoInputUdfOperator;
-import org.apache.flink.api.java.tuple.Tuple2;
-
-@SuppressWarnings("serial")
-public class SpargelTranslationTest {
-
-	@Test
-	public void testTranslationPlainEdges() {
-		try {
-			final String ITERATION_NAME = "Test Name";
-			
-			final String AGGREGATOR_NAME = "AggregatorName";
-			
-			final String BC_SET_MESSAGES_NAME = "borat messages";
-			
-			final String BC_SET_UPDATES_NAME = "borat updates";
-			;
-			final int NUM_ITERATIONS = 13;
-			
-			final int ITERATION_parallelism = 77;
-			
-			
-			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			
-			DataSet<Long> bcMessaging = env.fromElements(1L);
-			DataSet<Long> bcUpdate = env.fromElements(1L);
-			
-			DataSet<Tuple2<String, Double>> result;
-			
-			// ------------ construct the test program ------------------
-			{
-				
-				@SuppressWarnings("unchecked")
-				DataSet<Tuple2<String, Double>> initialVertices = env.fromElements(new Tuple2<String, Double>("abc", 3.44));
-	
-				@SuppressWarnings("unchecked")
-				DataSet<Tuple2<String, String>> edges = env.fromElements(new Tuple2<String, String>("a", "c"));
-				
-				
-				VertexCentricIteration<String, Double, Long, ?> vertexIteration = 
-						VertexCentricIteration.withPlainEdges(edges, new UpdateFunction(), new MessageFunctionNoEdgeValue(), NUM_ITERATIONS);
-				vertexIteration.addBroadcastSetForMessagingFunction(BC_SET_MESSAGES_NAME, bcMessaging);
-				vertexIteration.addBroadcastSetForUpdateFunction(BC_SET_UPDATES_NAME, bcUpdate);
-				
-				vertexIteration.setName(ITERATION_NAME);
-				vertexIteration.setParallelism(ITERATION_parallelism);
-				
-				vertexIteration.registerAggregator(AGGREGATOR_NAME, new LongSumAggregator());
-				
-				result = initialVertices.runOperation(vertexIteration);
-			}
-			
-			
-			// ------------- validate the java program ----------------
-			
-			assertTrue(result instanceof DeltaIterationResultSet);
-			
-			DeltaIterationResultSet<?, ?> resultSet = (DeltaIterationResultSet<?, ?>) result;
-			DeltaIteration<?, ?> iteration = (DeltaIteration<?, ?>) resultSet.getIterationHead();
-			
-			// check the basic iteration properties
-			assertEquals(NUM_ITERATIONS, resultSet.getMaxIterations());
-			assertArrayEquals(new int[] {0}, resultSet.getKeyPositions());
-			assertEquals(ITERATION_parallelism, iteration.getParallelism());
-			assertEquals(ITERATION_NAME, iteration.getName());
-			
-			assertEquals(AGGREGATOR_NAME, iteration.getAggregators().getAllRegisteredAggregators().iterator().next().getName());
-			
-			// validate that the semantic properties are set as they should
-			TwoInputUdfOperator<?, ?, ?, ?> solutionSetJoin = (TwoInputUdfOperator<?, ?, ?, ?>) resultSet.getNextWorkset();
-			assertTrue(solutionSetJoin.getSemanticProperties().getForwardingTargetFields(0, 0).contains(0));
-			assertTrue(solutionSetJoin.getSemanticProperties().getForwardingTargetFields(1, 0).contains(0));
-			
-			TwoInputUdfOperator<?, ?, ?, ?> edgesJoin = (TwoInputUdfOperator<?, ?, ?, ?>) solutionSetJoin.getInput1();
-			
-			// validate that the broadcast sets are forwarded
-			assertEquals(bcUpdate, solutionSetJoin.getBroadcastSets().get(BC_SET_UPDATES_NAME));
-			assertEquals(bcMessaging, edgesJoin.getBroadcastSets().get(BC_SET_MESSAGES_NAME));
-		}
-		catch (Exception e) {
-			System.err.println(e.getMessage());
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testTranslationPlainEdgesWithForkedBroadcastVariable() {
-		try {
-			final String ITERATION_NAME = "Test Name";
-			
-			final String AGGREGATOR_NAME = "AggregatorName";
-			
-			final String BC_SET_MESSAGES_NAME = "borat messages";
-			
-			final String BC_SET_UPDATES_NAME = "borat updates";
-			;
-			final int NUM_ITERATIONS = 13;
-			
-			final int ITERATION_parallelism = 77;
-			
-			
-			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			
-			DataSet<Long> bcVar = env.fromElements(1L);
-			
-			DataSet<Tuple2<String, Double>> result;
-			
-			// ------------ construct the test program ------------------
-			{
-				
-				@SuppressWarnings("unchecked")
-				DataSet<Tuple2<String, Double>> initialVertices = env.fromElements(new Tuple2<String, Double>("abc", 3.44));
-	
-				@SuppressWarnings("unchecked")
-				DataSet<Tuple2<String, String>> edges = env.fromElements(new Tuple2<String, String>("a", "c"));
-				
-				
-				VertexCentricIteration<String, Double, Long, ?> vertexIteration = 
-						VertexCentricIteration.withPlainEdges(edges, new UpdateFunction(), new MessageFunctionNoEdgeValue(), NUM_ITERATIONS);
-				vertexIteration.addBroadcastSetForMessagingFunction(BC_SET_MESSAGES_NAME, bcVar);
-				vertexIteration.addBroadcastSetForUpdateFunction(BC_SET_UPDATES_NAME, bcVar);
-				
-				vertexIteration.setName(ITERATION_NAME);
-				vertexIteration.setParallelism(ITERATION_parallelism);
-				
-				vertexIteration.registerAggregator(AGGREGATOR_NAME, new LongSumAggregator());
-				
-				result = initialVertices.runOperation(vertexIteration);
-			}
-			
-			
-			// ------------- validate the java program ----------------
-			
-			assertTrue(result instanceof DeltaIterationResultSet);
-			
-			DeltaIterationResultSet<?, ?> resultSet = (DeltaIterationResultSet<?, ?>) result;
-			DeltaIteration<?, ?> iteration = (DeltaIteration<?, ?>) resultSet.getIterationHead();
-			
-			// check the basic iteration properties
-			assertEquals(NUM_ITERATIONS, resultSet.getMaxIterations());
-			assertArrayEquals(new int[] {0}, resultSet.getKeyPositions());
-			assertEquals(ITERATION_parallelism, iteration.getParallelism());
-			assertEquals(ITERATION_NAME, iteration.getName());
-			
-			assertEquals(AGGREGATOR_NAME, iteration.getAggregators().getAllRegisteredAggregators().iterator().next().getName());
-			
-			// validate that the semantic properties are set as they should
-			TwoInputUdfOperator<?, ?, ?, ?> solutionSetJoin = (TwoInputUdfOperator<?, ?, ?, ?>) resultSet.getNextWorkset();
-			assertTrue(solutionSetJoin.getSemanticProperties().getForwardingTargetFields(0, 0).contains(0));
-			assertTrue(solutionSetJoin.getSemanticProperties().getForwardingTargetFields(1, 0).contains(0));
-			
-			TwoInputUdfOperator<?, ?, ?, ?> edgesJoin = (TwoInputUdfOperator<?, ?, ?, ?>) solutionSetJoin.getInput1();
-			
-			// validate that the broadcast sets are forwarded
-			assertEquals(bcVar, solutionSetJoin.getBroadcastSets().get(BC_SET_UPDATES_NAME));
-			assertEquals(bcVar, edgesJoin.getBroadcastSets().get(BC_SET_MESSAGES_NAME));
-		}
-		catch (Exception e) {
-			System.err.println(e.getMessage());
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	public static class UpdateFunction extends VertexUpdateFunction<String, Double, Long> {
-
-		@Override
-		public void updateVertex(String vertexKey, Double vertexValue, MessageIterator<Long> inMessages) {}
-	}
-	
-	public static class MessageFunctionNoEdgeValue extends MessagingFunction<String, Double, Long, Object> {
-
-		@Override
-		public void sendMessages(String vertexKey, Double vertexValue) {}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/39115abe/flink-staging/flink-spargel/src/test/java/org/apache/flink/test/spargel/SpargelConnectedComponentsITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-spargel/src/test/java/org/apache/flink/test/spargel/SpargelConnectedComponentsITCase.java b/flink-staging/flink-spargel/src/test/java/org/apache/flink/test/spargel/SpargelConnectedComponentsITCase.java
deleted file mode 100644
index 166f7a2..0000000
--- a/flink-staging/flink-spargel/src/test/java/org/apache/flink/test/spargel/SpargelConnectedComponentsITCase.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.spargel;
-
-import java.io.BufferedReader;
-
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.spargel.java.VertexCentricIteration;
-import org.apache.flink.spargel.java.examples.SpargelConnectedComponents.CCMessager;
-import org.apache.flink.spargel.java.examples.SpargelConnectedComponents.CCUpdater;
-import org.apache.flink.spargel.java.examples.SpargelConnectedComponents.IdAssigner;
-import org.apache.flink.test.testdata.ConnectedComponentsData;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-@SuppressWarnings("serial")
-public class SpargelConnectedComponentsITCase extends JavaProgramTestBase {
-
-	private static final long SEED = 9487520347802987L;
-	
-	private static final int NUM_VERTICES = 1000;
-	
-	private static final int NUM_EDGES = 10000;
-
-	private String resultPath;
-	
-	
-	@Override
-	protected void preSubmit() throws Exception {
-		resultPath = getTempFilePath("results");
-	}
-	
-	@Override
-	protected void testProgram() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		
-		DataSet<Long> vertexIds = env.generateSequence(1, NUM_VERTICES);
-		DataSet<String> edgeString = env.fromElements(ConnectedComponentsData.getRandomOddEvenEdges(NUM_EDGES, NUM_VERTICES, SEED).split("\n"));
-		
-		DataSet<Tuple2<Long, Long>> edges = edgeString.map(new EdgeParser());
-		
-		DataSet<Tuple2<Long, Long>> initialVertices = vertexIds.map(new IdAssigner());
-		DataSet<Tuple2<Long, Long>> result = initialVertices.runOperation(VertexCentricIteration.withPlainEdges(edges, new CCUpdater(), new CCMessager(), 100));
-		
-		result.writeAsCsv(resultPath, "\n", " ");
-		env.execute("Spargel Connected Components");
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		for (BufferedReader reader : getResultReader(resultPath)) {
-			ConnectedComponentsData.checkOddEvenResult(reader);
-		}
-	}
-	
-	public static final class EdgeParser extends RichMapFunction<String, Tuple2<Long, Long>> {
-		public Tuple2<Long, Long> map(String value) {
-			String[] nums = value.split(" ");
-			return new Tuple2<Long, Long>(Long.parseLong(nums[0]), Long.parseLong(nums[1]));
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/39115abe/flink-staging/flink-spargel/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-staging/flink-spargel/src/test/resources/log4j-test.properties b/flink-staging/flink-spargel/src/test/resources/log4j-test.properties
deleted file mode 100644
index 2fb9345..0000000
--- a/flink-staging/flink-spargel/src/test/resources/log4j-test.properties
+++ /dev/null
@@ -1,19 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-log4j.rootLogger=OFF
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/39115abe/flink-staging/flink-spargel/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-spargel/src/test/resources/logback-test.xml b/flink-staging/flink-spargel/src/test/resources/logback-test.xml
deleted file mode 100644
index 8b3bb27..0000000
--- a/flink-staging/flink-spargel/src/test/resources/logback-test.xml
+++ /dev/null
@@ -1,29 +0,0 @@
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one
-  ~ or more contributor license agreements.  See the NOTICE file
-  ~ distributed with this work for additional information
-  ~ regarding copyright ownership.  The ASF licenses this file
-  ~ to you under the Apache License, Version 2.0 (the
-  ~ "License"); you may not use this file except in compliance
-  ~ with the License.  You may obtain a copy of the License at
-  ~
-  ~     http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-
-<configuration>
-    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
-        <encoder>
-            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
-        </encoder>
-    </appender>
-
-    <root level="WARN">
-        <appender-ref ref="STDOUT"/>
-    </root>
-</configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/39115abe/flink-staging/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/pom.xml b/flink-staging/pom.xml
index a0cda67..271d26c 100644
--- a/flink-staging/pom.xml
+++ b/flink-staging/pom.xml
@@ -37,7 +37,6 @@ under the License.
 	<modules>
 		<module>flink-avro</module>
 		<module>flink-jdbc</module>
-		<module>flink-spargel</module>
 		<module>flink-hadoop-compatibility</module>
 		<module>flink-streaming</module>
 		<module>flink-hbase</module>


[2/2] flink git commit: [FLINK-2786] Remove Spargel code and docs; Port Spargel tests to Gelly; Remove Beta badge from Gelly

Posted by va...@apache.org.
[FLINK-2786] Remove Spargel code and docs; Port Spargel tests to Gelly; Remove Beta badge from Gelly

This closes #1229


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/39115abe
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/39115abe
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/39115abe

Branch: refs/heads/master
Commit: 39115abe43e8a0233ed6954006fbe7102238bd68
Parents: 2b81b17
Author: vasia <va...@apache.org>
Authored: Mon Oct 5 20:08:55 2015 +0200
Committer: vasia <va...@apache.org>
Committed: Tue Oct 6 11:47:32 2015 +0200

----------------------------------------------------------------------
 docs/_includes/navbar.html                      |   3 +-
 docs/libs/fig/spargel_example.png               | Bin 199032 -> 0 bytes
 docs/libs/fig/spargel_example_input.png         | Bin 113478 -> 0 bytes
 docs/libs/gelly_guide.md                        | 114 +---
 docs/libs/spargel_guide.md                      | 133 ----
 .../graph/spargel/SpargelCompilerTest.java      | 214 +++++++
 .../graph/spargel/SpargelTranslationTest.java   | 231 +++++++
 flink-staging/flink-spargel/pom.xml             |  72 ---
 .../flink/spargel/java/MessageIterator.java     |  58 --
 .../flink/spargel/java/MessagingFunction.java   | 288 ---------
 .../apache/flink/spargel/java/OutgoingEdge.java |  67 --
 .../spargel/java/VertexCentricIteration.java    | 612 -------------------
 .../spargel/java/VertexUpdateFunction.java      | 154 -----
 .../examples/SpargelConnectedComponents.java    |  80 ---
 .../spargel/java/examples/SpargelPageRank.java  | 117 ----
 .../SpargelPageRankCountingVertices.java        | 154 -----
 .../apache/flink/spargel/java/record/Edge.java  |  43 --
 .../spargel/java/record/MessageIterator.java    |  59 --
 .../spargel/java/record/MessagingFunction.java  | 163 -----
 .../spargel/java/record/SpargelIteration.java   | 289 ---------
 .../java/record/VertexUpdateFunction.java       |  90 ---
 .../flink/spargel/java/SpargelCompilerTest.java | 183 ------
 .../spargel/java/SpargelTranslationTest.java    | 211 -------
 .../SpargelConnectedComponentsITCase.java       |  81 ---
 .../src/test/resources/log4j-test.properties    |  19 -
 .../src/test/resources/logback-test.xml         |  29 -
 flink-staging/pom.xml                           |   1 -
 27 files changed, 447 insertions(+), 3018 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/39115abe/docs/_includes/navbar.html
----------------------------------------------------------------------
diff --git a/docs/_includes/navbar.html b/docs/_includes/navbar.html
index e36e259..1ea3916 100644
--- a/docs/_includes/navbar.html
+++ b/docs/_includes/navbar.html
@@ -102,8 +102,7 @@ under the License.
             <li class="dropdown{% if page.url contains '/libs/' %} active{% endif %}">
               <a href="{{ libs }}" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false">Libraries <span class="caret"></span></a>
                 <ul class="dropdown-menu" role="menu">
-                  <li><a href="{{ libs }}/spargel_guide.html">Graphs: Spargel</a></li>
-                  <li><a href="{{ libs }}/gelly_guide.html">Graphs: Gelly <span class="badge">Beta</span></a></li>
+                  <li><a href="{{ libs }}/gelly_guide.html">Graphs: Gelly</a></li>
                   <li><a href="{{ libs }}/ml/">Machine Learning <span class="badge">Beta</span></a></li>
                   <li><a href="{{ libs }}/table.html">Relational: Table <span class="badge">Beta</span></a></li>
               </ul>

http://git-wip-us.apache.org/repos/asf/flink/blob/39115abe/docs/libs/fig/spargel_example.png
----------------------------------------------------------------------
diff --git a/docs/libs/fig/spargel_example.png b/docs/libs/fig/spargel_example.png
deleted file mode 100644
index 21d20f8..0000000
Binary files a/docs/libs/fig/spargel_example.png and /dev/null differ

http://git-wip-us.apache.org/repos/asf/flink/blob/39115abe/docs/libs/fig/spargel_example_input.png
----------------------------------------------------------------------
diff --git a/docs/libs/fig/spargel_example_input.png b/docs/libs/fig/spargel_example_input.png
deleted file mode 100644
index 01ead46..0000000
Binary files a/docs/libs/fig/spargel_example_input.png and /dev/null differ

http://git-wip-us.apache.org/repos/asf/flink/blob/39115abe/docs/libs/gelly_guide.md
----------------------------------------------------------------------
diff --git a/docs/libs/gelly_guide.md b/docs/libs/gelly_guide.md
index 0c3748b..e08cf32 100644
--- a/docs/libs/gelly_guide.md
+++ b/docs/libs/gelly_guide.md
@@ -1,6 +1,5 @@
 ---
 title: "Gelly: Flink Graph API"
-is_beta: true
 ---
 <!--
 Licensed to the Apache Software Foundation (ASF) under one
@@ -677,7 +676,7 @@ The vertex-centric model, also known as "think like a vertex" model, expresses c
 * <strong>Messaging</strong>:  produce the messages that a vertex will send to other vertices.
 * <strong>Value Update</strong>: update the vertex value using the received messages.
 
-Gelly wraps Flink's [Spargel API](spargel_guide.html) to provide methods for vertex-centric iterations. The user only needs to implement two functions, corresponding to the phases above: a `VertexUpdateFunction`, which defines how a vertex will update its value based on the received messages and a `MessagingFunction`, which allows a vertex to send out messages for the next superstep.
+Gelly provides methods for vertex-centric iterations. The user only needs to implement two functions, corresponding to the phases above: a `VertexUpdateFunction`, which defines how a vertex will update its value based on the received messages, and a `MessagingFunction`, which allows a vertex to send out messages for the next superstep.
 These functions and the maximum number of iterations to run are given as parameters to Gelly's `runVertexCentricIteration`. This method will execute the vertex-centric iteration on the input Graph and return a new Graph, with updated vertex values.
 
 A vertex-centric iteration can be extended with information such as the total number of vertices, the in degree and out degree.
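
For reference, a minimal, self-contained sketch of the vertex-centric API described above (illustration only, not part of this commit): the class and method names `VertexCentricSketch`, `MinIdUpdater` and `MinIdMessenger` are made up, while the calls mirror the ConnectedComponents updater and messenger used in the Gelly tests added by this commit.

~~~java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.spargel.MessageIterator;
import org.apache.flink.graph.spargel.MessagingFunction;
import org.apache.flink.graph.spargel.VertexUpdateFunction;
import org.apache.flink.types.NullValue;

public class VertexCentricSketch {

	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		// tiny input graph: every vertex starts with its own id as component id
		DataSet<Vertex<Long, Long>> vertices = env.fromElements(
				new Vertex<Long, Long>(1L, 1L), new Vertex<Long, Long>(2L, 2L));
		DataSet<Edge<Long, NullValue>> edges = env.fromElements(
				new Edge<Long, NullValue>(1L, 2L, NullValue.getInstance()));

		Graph<Long, Long, NullValue> graph = Graph.fromDataSet(vertices, edges, env);

		// run at most 100 supersteps and keep the updated vertices
		DataSet<Vertex<Long, Long>> components = graph
				.runVertexCentricIteration(new MinIdUpdater(), new MinIdMessenger(), 100)
				.getVertices();

		// print() collects the result and triggers execution
		components.print();
	}

	// value update phase: adopt the smallest component id received so far
	public static final class MinIdUpdater extends VertexUpdateFunction<Long, Long, Long> {
		@Override
		public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> messages) {
			long min = vertex.getValue();
			for (long msg : messages) {
				min = Math.min(min, msg);
			}
			if (min < vertex.getValue()) {
				setNewVertexValue(min);
			}
		}
	}

	// messaging phase: propagate the current component id to all neighbors
	public static final class MinIdMessenger extends MessagingFunction<Long, Long, Long, NullValue> {
		@Override
		public void sendMessages(Vertex<Long, Long> vertex) {
			sendMessageToAllNeighbors(vertex.getValue());
		}
	}
}
~~~

The update function only sets a new value when a smaller component id arrives, so vertices whose value does not change stop sending messages in the following superstep.
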
@@ -1430,114 +1429,3 @@ env.execute
 </div>
 
 [Back to top](#top)
-
-Migrating Spargel Code to Gelly
------------
-
-Gelly provides the old Spargel API functionality through its vertex-centric iteration methods. Applications can be easily migrated from one API to the other, using the following
-general guidelines:
-
-* <strong>Vertex and Edge Types</strong>: In Spargel, vertices and edges are defined using tuples (`Tuple2` for vertices, `Tuple2` for edges without values, `Tuple3` for edges with values). Gelly has a more intuitive [graph representation](#graph-representation) by introducing the `Vertex` and `Edge` types.
-
-* <strong>Methods for Plain Edges and for Valued Edges</strong>: In Spargel, there are separate methods for edges with values and edges without values when running the vertex-centric iteration (i.e. `withValuedEdges()`, `withPlainEdges()`). In Gelly, this distinction is no longer needed because an edge with no value will simply have a `NullValue` type.
-
-* <strong>OutgoingEdge</strong>: Spargel's `OutgoingEdge` is replaced by `Edge` in Gelly.
-
-* <strong>Running a Vertex Centric Iteration</strong>: In Spargel, an iteration is run by calling the `runOperation()` method on a `VertexCentricIteration`. The edge type (plain or valued) dictates the method to be called. The arguments are: a data set of edges, the vertex update function, the messaging function and the maximum number of iterations. The result is a `DataSet<Tuple2<vertexId, vertexValue>>` representing the updated vertices.
-In Gelly, an iteration is run by calling `runVertexCentricIteration()` on a `Graph`. The parameters given to this method are the vertex update function, the messaging function and the maximum number of iterations. The result is a new `Graph` with updated vertex values.
-
-* <strong>Configuring a Vertex Centric Iteration</strong>: In Spargel, an iteration is configured by directly setting a set of parameters on the `VertexCentricIteration` instance (e.g. `iteration.setName("Spargel Iteration")`). In Gelly, a vertex-centric iteration is configured using the `IterationConfiguration` object (e.g. `iterationConfiguration.setName("Gelly Iteration")`). An instance of this object is then passed as a final parameter to the `runVertexCentricIteration()` method.
-
-* <strong>Record API</strong>: Spargel's Record API was completely removed from Gelly.
-
-In the following section, we present a step-by-step example for porting the Connected Components algorithm from Spargel to Gelly.
-
-In Spargel, the edges and vertices are defined by a `DataSet<Tuple2<IdType, EdgeValue>>` and a `DataSet<Tuple2<IdType, VertexValue>>` respectively:
-
-{% highlight java %}
-// Spargel API
-ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-DataSet<Tuple2<Long, Long>> edges = ...
-DataSet<Tuple2<Long, Long>> initialVertices = vertexIds.map(new IdAssigner());
-
-DataSet<Tuple2<Long, Long>> result = initialVertices.runOperation(
-				VertexCentricIteration.withPlainEdges(edges, new CCUpdater(), new CCMessenger(),
-				maxIterations));
-
-result.print();
-env.execute("Spargel Connected Components");
-{% endhighlight %}
-
-In this algorithm, initially, each vertex has its own ID as a value (is in its own component).
-Hence, the need for `IdAssigner()`, which is used to initialize the vertex values.
-
-<p class="text-center">
-    <img alt="Spargel Example Input" width="75%" src="fig/spargel_example_input.png" />
-</p>
-
-In Gelly, the edges and vertices have a more intuitive definition: they are represented by separate types `Edge`, `Vertex`.
-After defining the edge data set, we can create a `Graph` from it.
-
-{% highlight java %}
-// Gelly API
-ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-DataSet<Edge<Long, NullValue>> edges = ...
-
-Graph<Long, Long, NullValue> graph = Graph.fromDataSet(edges, new MapFunction<Long, Long>() {
-			@Override
-			public Long map(Long value) throws Exception {
-				return value;
-			}
-		}, env);
-
-DataSet<Vertex<Long, Long>> result = graph.runVertexCentricIteration(new CCUpdater(), new CCMessenger(), maxIterations)
-					.getVertices();
-
-result.print();
-env.execute("Gelly Connected Components");
-{% endhighlight %}
-
-Notice that when assigning the initial vertex IDs, there is no need to perform a separate map operation. The value is specified directly in the `fromDataSet()` method.
-Instead of calling `runOperation()` on the set of vertices, `runVertexCentricIteration()` is called on the `Graph` instance.
-As previously stated, `runVertexCentricIteration` returns a new `Graph` with the updated vertex values. In order to retrieve the result (since for this algorithm we are only interested in the vertex ids and their corresponding values), we will call the `getVertices()` method.
-
-The user-defined `VertexUpdateFunction` and `MessagingFunction` remain unchanged in Gelly, so you can reuse them without any changes.
-
-In the Connected Components algorithm, the vertices propagate their current component ID in iterations, each time adopting a new value from the received neighbor IDs, provided that the value is smaller than the current minimum.
-To this end, we iterate over all received messages and update the vertex value, if necessary:
-
-{% highlight java %}
-public static final class CCUpdater extends VertexUpdateFunction<Long, Long, Long> {
-	@Override
-	public void updateVertex(Long vertexKey, Long vertexValue, MessageIterator<Long> inMessages) {
-		long min = Long.MAX_VALUE;
-		for (long msg : inMessages) {
-			min = Math.min(min, msg);
-		}
-		if (min < vertexValue) {
-			setNewVertexValue(min);
-		}
-	}
-}
-{% endhighlight %}
-
-The **messages in each superstep** consist of the **current component ID** seen by the vertex:
-{% highlight java %}
-public static final class CCMessenger extends MessagingFunction<Long, Long, Long, NullValue> {
-	@Override
-	public void sendMessages(Long vertexId, Long componentId) {
-		sendMessageToAllNeighbors(componentId);
-	}
-}
-{% endhighlight %}
-
-
-Similarly to Spargel, if the value of a vertex does not change during a superstep, it will **not send** any messages in the superstep. This makes it possible to apply incremental updates to the **hot (changing) parts** of the graph, while leaving the **cold (steady) parts** untouched.
-
-The computation **terminates** after a specified *maximum number of supersteps* **-OR-** when the *vertex states stop changing*.
-
-<p class="text-center">
-    <img alt="Spargel Example" width="75%" src="fig/spargel_example.png" />
-</p>
-
-[Back to top](#top)
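
For reference, the configuration step mentioned in the migration notes above can be sketched as follows; the configuration object appears in the tests added by this commit as `VertexCentricConfiguration`. This is an illustration only: the class name, method name, and string identifiers are placeholders, and it assumes the ConnectedComponents library functions, a graph with Long vertex values, and a broadcast DataSet; the configuration calls mirror those in the new SpargelTranslationTest.

~~~java
import org.apache.flink.api.common.aggregators.LongSumAggregator;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.library.ConnectedComponents;
import org.apache.flink.graph.spargel.VertexCentricConfiguration;
import org.apache.flink.types.NullValue;

public class IterationConfigSketch {

	// Bundle all iteration settings in a configuration object and pass it as the
	// last argument of runVertexCentricIteration(); the string names used here
	// are placeholders for this illustration.
	public static Graph<Long, Long, NullValue> runConfigured(
			Graph<Long, Long, NullValue> graph, DataSet<Long> bcVar, int maxIterations) {

		VertexCentricConfiguration parameters = new VertexCentricConfiguration();
		parameters.setName("Gelly Connected Components");
		parameters.setParallelism(16);
		parameters.registerAggregator("componentAggregator", new LongSumAggregator());
		parameters.addBroadcastSetForUpdateFunction("updateBcSet", bcVar);
		parameters.addBroadcastSetForMessagingFunction("messagingBcSet", bcVar);

		return graph.runVertexCentricIteration(
				new ConnectedComponents.CCUpdater<Long>(),
				new ConnectedComponents.CCMessenger<Long>(),
				maxIterations, parameters);
	}
}
~~~

Passing the configuration as the last argument is what attaches the name, parallelism, aggregator, and broadcast sets to the iteration.
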

http://git-wip-us.apache.org/repos/asf/flink/blob/39115abe/docs/libs/spargel_guide.md
----------------------------------------------------------------------
diff --git a/docs/libs/spargel_guide.md b/docs/libs/spargel_guide.md
deleted file mode 100644
index 127df38..0000000
--- a/docs/libs/spargel_guide.md
+++ /dev/null
@@ -1,133 +0,0 @@
----
-title: "Spargel Graph Processing API - DEPRECATED"
----
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-Spargel is our [Giraph](http://giraph.apache.org)-like **graph processing** Java API. It supports basic graph computations, which are run as a sequence of [supersteps]({{site.baseurl}}/apis/iterations.html#supersteps). Spargel and Giraph both implement the [Bulk Synchronous Parallel (BSP)](https://en.wikipedia.org/wiki/Bulk_Synchronous_Parallel) programming model, proposed by Google's [Pregel](http://googleresearch.blogspot.de/2009/06/large-scale-graph-computing-at-google.html).
-
-The API provides a **vertex-centric** view on graph processing with two basic operations per superstep:
-
-  1. **Send messages** to other vertices, and
-  2. **Receive messages** from other vertices and **update own vertex state**.
-
-This vertex-centric view makes it easy to express a large class of graph problems efficiently. We will list all *relevant interfaces* of the **Spargel API** to implement and walk through an **example Spargel program**.
-
-* This will be replaced by the TOC
-{:toc}
-
-Spargel API - DEPRECATED
------------
-The Spargel API is Deprecated. Please check out the new [Gelly API](gelly_guide.html) for graph processing with Apache Flink. If you want to port your Spargel code into Gelly,
-please check the [migration guide](gelly_guide.html#migrating-spargel-code-to-gelly).
-
-The Spargel API is part of the *addons* Maven project. All relevant classes are located in the *org.apache.flink.spargel.java* package.
-
-Add the following dependency to your `pom.xml` to use Spargel.
-
-~~~xml
-<dependency>
-	<groupId>org.apache.flink</groupId>
-	<artifactId>flink-spargel</artifactId>
-	<version>{{site.version}}</version>
-</dependency>
-~~~
-
-Extend **VertexUpdateFunction&lt;***VertexKeyType*, *VertexValueType*, *MessageType***&gt;** to implement your *custom vertex update logic*.
-
-Extend **MessagingFunction&lt;***VertexKeyType*, *VertexValueType*, *MessageType*, *EdgeValueType***&gt;** to implement your *custom message logic*.
-
-Create a **SpargelIteration** operator to include Spargel in your data flow.
-
-Example: Propagate Minimum Vertex ID in Graph
----------------------------------------------
-
-The Spargel operator **SpargelIteration** includes Spargel graph processing into your data flow. As usual, it can be combined with other operators like *map*, *reduce*, *join*, etc.
-
-~~~java
-FileDataSource vertices = new FileDataSource(...);
-FileDataSource edges = new FileDataSource(...);
-
-SpargelIteration iteration = new SpargelIteration(new MinMessager(), new MinNeighborUpdater());
-iteration.setVertexInput(vertices);
-iteration.setEdgesInput(edges);
-iteration.setNumberOfIterations(maxIterations);
-
-FileDataSink result = new FileDataSink(...);
-result.setInput(iteration.getOutput());
-
-new Plan(result);
-~~~
-
-Besides the **program logic** of vertex updates in *MinNeighborUpdater* and messages in *MinMessager*, you have to specify the **initial vertex** and **edge input**. Every vertex has a **key** and **value**. In each superstep, it **receives messages** from other vertices and updates its state:
-
-  - **Vertex** input: **(id**: *VertexKeyType*, **value**: *VertexValueType***)**
-  - **Edge** input: **(source**: *VertexKeyType*, **target**: *VertexKeyType*[, **value**: *EdgeValueType*])
-
-For our example, we set the vertex ID as both *id and value* (initial minimum) and *leave out the edge values* as we don't need them:
-
-<p class="text-center">
-    <img alt="Spargel Example Input" width="75%" src="fig/spargel_example_input.png" />
-</p>
-
-In order to **propagate the minimum vertex ID**, we iterate over all received messages (which contain the neighboring IDs) and update our value, if we found a new minimum:
-
-~~~java
-public class MinNeighborUpdater extends VertexUpdateFunction<IntValue, IntValue, IntValue> {
-	
-	@Override
-	public void updateVertex(IntValue id, IntValue currentMin, Iterator<IntValue> messages) {
-		int min = Integer.MAX_VALUE;
-
-		// iterate over all received messages
-		while (messages.hasNext()) {
-			int next = messages.next().getValue();
-			min = next < min ? next : min;
-		}
-
-		// update vertex value, if new minimum
-		if (min < currentMin.getValue()) {
-			setNewVertexValue(new IntValue(min));
-		}
-	}
-}
-~~~
-
-The **messages in each superstep** consist of the **current minimum ID** seen by the vertex:
-
-~~~java
-public class MinMessager extends MessagingFunction<IntValue, IntValue, IntValue, NullValue> {
-	
-	@Override
-	public void sendMessages(IntValue id, IntValue currentMin) {
-		// send current minimum to neighbors
-		sendMessageToAllNeighbors(currentMin);
-    }
-}
-~~~
-
-The **API-provided method** `sendMessageToAllNeighbors(MessageType)` sends the message to all neighboring vertices. It is also possible to address specific vertices with `sendMessageTo(VertexKeyType, MessageType)`.
-
-If the value of a vertex does not change during a superstep, it will **not send** any messages in the superstep. This makes it possible to apply incremental updates to the **hot (changing) parts** of the graph, while leaving the **cold (steady) parts** untouched.
-
-The computation **terminates** after a specified *maximum number of supersteps* **-OR-** the *vertex states stop changing*.
-
-<p class="text-center">
-    <img alt="Spargel Example" width="75%" src="fig/spargel_example.png" />
-</p>

http://git-wip-us.apache.org/repos/asf/flink/blob/39115abe/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelCompilerTest.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelCompilerTest.java
new file mode 100644
index 0000000..7a8143a
--- /dev/null
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelCompilerTest.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.graph.spargel;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.optimizer.util.CompilerTestBase;
+import org.junit.Test;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.optimizer.plan.DualInputPlanNode;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.PlanNode;
+import org.apache.flink.optimizer.plan.SinkPlanNode;
+import org.apache.flink.optimizer.plan.WorksetIterationPlanNode;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.runtime.operators.util.LocalStrategy;
+import org.apache.flink.types.NullValue;
+import org.apache.flink.graph.library.ConnectedComponents;
+import org.apache.flink.graph.utils.Tuple2ToVertexMap;
+
+
+public class SpargelCompilerTest extends CompilerTestBase {
+
+	private static final long serialVersionUID = 1L;
+
+	@SuppressWarnings("serial")
+	@Test
+	public void testSpargelCompiler() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			env.setParallelism(DEFAULT_PARALLELISM);
+			// compose test program
+			{
+
+				DataSet<Vertex<Long, Long>> initialVertices = env.fromElements(
+						new Tuple2<Long, Long>(1L, 1L), new Tuple2<Long, Long>(2L, 2L))
+						.map(new Tuple2ToVertexMap<Long, Long>());
+
+				DataSet<Edge<Long, NullValue>> edges = env.fromElements(new Tuple2<Long, Long>(1L, 2L))
+					.map(new MapFunction<Tuple2<Long,Long>, Edge<Long, NullValue>>() {
+
+						public Edge<Long, NullValue> map(Tuple2<Long, Long> edge) {
+							return new Edge<Long, NullValue>(edge.f0, edge.f1, NullValue.getInstance());
+						}
+				});
+
+				Graph<Long, Long, NullValue> graph = Graph.fromDataSet(initialVertices, edges, env);
+				
+				DataSet<Vertex<Long, Long>> result = graph.runVertexCentricIteration(
+						new ConnectedComponents.CCUpdater<Long>(),
+						new ConnectedComponents.CCMessenger<Long>(), 100)
+						.getVertices();
+				
+				result.output(new DiscardingOutputFormat<Vertex<Long, Long>>());
+			}
+			
+			Plan p = env.createProgramPlan("Spargel Connected Components");
+			OptimizedPlan op = compileNoStats(p);
+			
+			// check the sink
+			SinkPlanNode sink = op.getDataSinks().iterator().next();
+			assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
+			assertEquals(DEFAULT_PARALLELISM, sink.getParallelism());
+			
+			// check the iteration
+			WorksetIterationPlanNode iteration = (WorksetIterationPlanNode) sink.getInput().getSource();
+			assertEquals(DEFAULT_PARALLELISM, iteration.getParallelism());
+			
+			// check the solution set join and the delta
+			PlanNode ssDelta = iteration.getSolutionSetDeltaPlanNode();
+			assertTrue(ssDelta instanceof DualInputPlanNode); // this is only true if the update function preserves the partitioning
+			
+			DualInputPlanNode ssJoin = (DualInputPlanNode) ssDelta;
+			assertEquals(DEFAULT_PARALLELISM, ssJoin.getParallelism());
+			assertEquals(ShipStrategyType.PARTITION_HASH, ssJoin.getInput1().getShipStrategy());
+			assertEquals(new FieldList(0), ssJoin.getInput1().getShipStrategyKeys());
+			
+			// check the workset set join
+			DualInputPlanNode edgeJoin = (DualInputPlanNode) ssJoin.getInput1().getSource();
+			assertEquals(DEFAULT_PARALLELISM, edgeJoin.getParallelism());
+			assertEquals(ShipStrategyType.PARTITION_HASH, edgeJoin.getInput1().getShipStrategy());
+			assertEquals(ShipStrategyType.FORWARD, edgeJoin.getInput2().getShipStrategy());
+			assertTrue(edgeJoin.getInput1().getTempMode().isCached());
+			
+			assertEquals(new FieldList(0), edgeJoin.getInput1().getShipStrategyKeys());
+			
+			// check that the initial partitioning is pushed out of the loop
+			assertEquals(ShipStrategyType.PARTITION_HASH, iteration.getInput1().getShipStrategy());
+			assertEquals(ShipStrategyType.PARTITION_HASH, iteration.getInput2().getShipStrategy());
+			assertEquals(new FieldList(0), iteration.getInput1().getShipStrategyKeys());
+			assertEquals(new FieldList(0), iteration.getInput2().getShipStrategyKeys());
+			
+			// check that the initial workset sort is outside the loop
+			assertEquals(LocalStrategy.SORT, iteration.getInput2().getLocalStrategy());
+			assertEquals(new FieldList(0), iteration.getInput2().getLocalStrategyKeys());
+		}
+		catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@SuppressWarnings("serial")
+	@Test
+	public void testSpargelCompilerWithBroadcastVariable() {
+		try {
+			final String BC_VAR_NAME = "borat variable";
+			
+			
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			env.setParallelism(DEFAULT_PARALLELISM);
+			// compose test program
+			{
+				DataSet<Long> bcVar = env.fromElements(1L);
+
+				DataSet<Vertex<Long, Long>> initialVertices = env.fromElements(
+						new Tuple2<Long, Long>(1L, 1L), new Tuple2<Long, Long>(2L, 2L))
+						.map(new Tuple2ToVertexMap<Long, Long>());
+
+				DataSet<Edge<Long, NullValue>> edges = env.fromElements(new Tuple2<Long, Long>(1L, 2L))
+						.map(new MapFunction<Tuple2<Long,Long>, Edge<Long, NullValue>>() {
+
+							public Edge<Long, NullValue> map(Tuple2<Long, Long> edge) {
+								return new Edge<Long, NullValue>(edge.f0, edge.f1, NullValue.getInstance());
+							}
+					});
+
+				Graph<Long, Long, NullValue> graph = Graph.fromDataSet(initialVertices, edges, env);
+
+				VertexCentricConfiguration parameters = new VertexCentricConfiguration();
+				parameters.addBroadcastSetForMessagingFunction(BC_VAR_NAME, bcVar);
+				parameters.addBroadcastSetForUpdateFunction(BC_VAR_NAME, bcVar);
+
+				DataSet<Vertex<Long, Long>> result = graph.runVertexCentricIteration(
+						new ConnectedComponents.CCUpdater<Long>(),
+						new ConnectedComponents.CCMessenger<Long>(), 100, parameters)
+						.getVertices();
+					
+				result.output(new DiscardingOutputFormat<Vertex<Long, Long>>());
+
+			}
+			
+			Plan p = env.createProgramPlan("Spargel Connected Components");
+			OptimizedPlan op = compileNoStats(p);
+			
+			// check the sink
+			SinkPlanNode sink = op.getDataSinks().iterator().next();
+			assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
+			assertEquals(DEFAULT_PARALLELISM, sink.getParallelism());
+			
+			// check the iteration
+			WorksetIterationPlanNode iteration = (WorksetIterationPlanNode) sink.getInput().getSource();
+			assertEquals(DEFAULT_PARALLELISM, iteration.getParallelism());
+			
+			// check the solution set join and the delta
+			PlanNode ssDelta = iteration.getSolutionSetDeltaPlanNode();
+			assertTrue(ssDelta instanceof DualInputPlanNode); // this is only true if the update function preserves the partitioning
+			
+			DualInputPlanNode ssJoin = (DualInputPlanNode) ssDelta;
+			assertEquals(DEFAULT_PARALLELISM, ssJoin.getParallelism());
+			assertEquals(ShipStrategyType.PARTITION_HASH, ssJoin.getInput1().getShipStrategy());
+			assertEquals(new FieldList(0), ssJoin.getInput1().getShipStrategyKeys());
+			
+			// check the workset set join
+			DualInputPlanNode edgeJoin = (DualInputPlanNode) ssJoin.getInput1().getSource();
+			assertEquals(DEFAULT_PARALLELISM, edgeJoin.getParallelism());
+			assertEquals(ShipStrategyType.PARTITION_HASH, edgeJoin.getInput1().getShipStrategy());
+			assertEquals(ShipStrategyType.FORWARD, edgeJoin.getInput2().getShipStrategy());
+			assertTrue(edgeJoin.getInput1().getTempMode().isCached());
+			
+			assertEquals(new FieldList(0), edgeJoin.getInput1().getShipStrategyKeys());
+			
+			// check that the initial partitioning is pushed out of the loop
+			assertEquals(ShipStrategyType.PARTITION_HASH, iteration.getInput1().getShipStrategy());
+			assertEquals(ShipStrategyType.PARTITION_HASH, iteration.getInput2().getShipStrategy());
+			assertEquals(new FieldList(0), iteration.getInput1().getShipStrategyKeys());
+			assertEquals(new FieldList(0), iteration.getInput2().getShipStrategyKeys());
+		}
+		catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/39115abe/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelTranslationTest.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelTranslationTest.java
new file mode 100644
index 0000000..bb3a131
--- /dev/null
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelTranslationTest.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.graph.spargel;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import org.junit.Test;
+import org.apache.flink.api.common.aggregators.LongSumAggregator;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.operators.DeltaIteration;
+import org.apache.flink.api.java.operators.DeltaIterationResultSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.TwoInputUdfOperator;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.types.NullValue;
+
+@SuppressWarnings("serial")
+public class SpargelTranslationTest {
+
+	@Test
+	public void testTranslationPlainEdges() {
+		try {
+			final String ITERATION_NAME = "Test Name";
+			
+			final String AGGREGATOR_NAME = "AggregatorName";
+			
+			final String BC_SET_MESSAGES_NAME = "borat messages";
+			
+			final String BC_SET_UPDATES_NAME = "borat updates";
+			final int NUM_ITERATIONS = 13;
+			
+			final int ITERATION_parallelism = 77;
+			
+			
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			
+			DataSet<Long> bcMessaging = env.fromElements(1L);
+			DataSet<Long> bcUpdate = env.fromElements(1L);
+			
+			DataSet<Vertex<String, Double>> result;
+			
+			// ------------ construct the test program ------------------
+			{
+				
+				DataSet<Tuple2<String, Double>> initialVertices = env.fromElements(new Tuple2<String, Double>("abc", 3.44));
+
+				DataSet<Tuple2<String, String>> edges = env.fromElements(new Tuple2<String, String>("a", "c"));
+
+				Graph<String, Double, NullValue> graph = Graph.fromTupleDataSet(initialVertices,
+						edges.map(new MapFunction<Tuple2<String,String>, Tuple3<String, String, NullValue>>() {
+
+							public Tuple3<String, String, NullValue> map(
+									Tuple2<String, String> edge) {
+								return new Tuple3<String, String, NullValue>(edge.f0, edge.f1, NullValue.getInstance());
+							}
+						}), env);
+
+				VertexCentricConfiguration parameters = new VertexCentricConfiguration();
+
+				parameters.addBroadcastSetForMessagingFunction(BC_SET_MESSAGES_NAME, bcMessaging);
+				parameters.addBroadcastSetForUpdateFunction(BC_SET_UPDATES_NAME, bcUpdate);
+				parameters.setName(ITERATION_NAME);
+				parameters.setParallelism(ITERATION_parallelism);
+				parameters.registerAggregator(AGGREGATOR_NAME, new LongSumAggregator());
+
+				result = graph.runVertexCentricIteration(new UpdateFunction(), new MessageFunctionNoEdgeValue(),
+						NUM_ITERATIONS, parameters).getVertices();
+
+				result.output(new DiscardingOutputFormat<Vertex<String, Double>>());
+			}
+			
+			
+			// ------------- validate the java program ----------------
+			
+			assertTrue(result instanceof DeltaIterationResultSet);
+			
+			DeltaIterationResultSet<?, ?> resultSet = (DeltaIterationResultSet<?, ?>) result;
+			DeltaIteration<?, ?> iteration = (DeltaIteration<?, ?>) resultSet.getIterationHead();
+			
+			// check the basic iteration properties
+			assertEquals(NUM_ITERATIONS, resultSet.getMaxIterations());
+			assertArrayEquals(new int[] {0}, resultSet.getKeyPositions());
+			assertEquals(ITERATION_parallelism, iteration.getParallelism());
+			assertEquals(ITERATION_NAME, iteration.getName());
+			
+			assertEquals(AGGREGATOR_NAME, iteration.getAggregators().getAllRegisteredAggregators().iterator().next().getName());
+			
+			// validate that the semantic properties are set as they should
+			TwoInputUdfOperator<?, ?, ?, ?> solutionSetJoin = (TwoInputUdfOperator<?, ?, ?, ?>) resultSet.getNextWorkset();
+			assertTrue(solutionSetJoin.getSemanticProperties().getForwardingTargetFields(0, 0).contains(0));
+			assertTrue(solutionSetJoin.getSemanticProperties().getForwardingTargetFields(1, 0).contains(0));
+			
+			TwoInputUdfOperator<?, ?, ?, ?> edgesJoin = (TwoInputUdfOperator<?, ?, ?, ?>) solutionSetJoin.getInput1();
+			
+			// validate that the broadcast sets are forwarded
+			assertEquals(bcUpdate, solutionSetJoin.getBroadcastSets().get(BC_SET_UPDATES_NAME));
+			assertEquals(bcMessaging, edgesJoin.getBroadcastSets().get(BC_SET_MESSAGES_NAME));
+		}
+		catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testTranslationPlainEdgesWithForkedBroadcastVariable() {
+		try {
+			final String ITERATION_NAME = "Test Name";
+			
+			final String AGGREGATOR_NAME = "AggregatorName";
+			
+			final String BC_SET_MESSAGES_NAME = "borat messages";
+			
+			final String BC_SET_UPDATES_NAME = "borat updates";
+			final int NUM_ITERATIONS = 13;
+			
+			final int ITERATION_parallelism = 77;
+			
+			
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			
+			DataSet<Long> bcVar = env.fromElements(1L);
+			
+			DataSet<Vertex<String, Double>> result;
+			
+			// ------------ construct the test program ------------------
+			{
+
+				DataSet<Tuple2<String, Double>> initialVertices = env.fromElements(new Tuple2<String, Double>("abc", 3.44));
+
+				DataSet<Tuple2<String, String>> edges = env.fromElements(new Tuple2<String, String>("a", "c"));
+
+				Graph<String, Double, NullValue> graph = Graph.fromTupleDataSet(initialVertices,
+						edges.map(new MapFunction<Tuple2<String,String>, Tuple3<String, String, NullValue>>() {
+
+							public Tuple3<String, String, NullValue> map(
+									Tuple2<String, String> edge) {
+								return new Tuple3<String, String, NullValue>(edge.f0, edge.f1, NullValue.getInstance());
+							}
+						}), env);
+
+				VertexCentricConfiguration parameters = new VertexCentricConfiguration();
+
+				parameters.addBroadcastSetForMessagingFunction(BC_SET_MESSAGES_NAME, bcVar);
+				parameters.addBroadcastSetForUpdateFunction(BC_SET_UPDATES_NAME, bcVar);
+				parameters.setName(ITERATION_NAME);
+				parameters.setParallelism(ITERATION_parallelism);
+				parameters.registerAggregator(AGGREGATOR_NAME, new LongSumAggregator());
+				
+				result = graph.runVertexCentricIteration(new UpdateFunction(), new MessageFunctionNoEdgeValue(),
+						NUM_ITERATIONS, parameters).getVertices();
+
+				result.output(new DiscardingOutputFormat<Vertex<String, Double>>());
+			}
+			
+			
+			// ------------- validate the java program ----------------
+			
+			assertTrue(result instanceof DeltaIterationResultSet);
+			
+			DeltaIterationResultSet<?, ?> resultSet = (DeltaIterationResultSet<?, ?>) result;
+			DeltaIteration<?, ?> iteration = (DeltaIteration<?, ?>) resultSet.getIterationHead();
+			
+			// check the basic iteration properties
+			assertEquals(NUM_ITERATIONS, resultSet.getMaxIterations());
+			assertArrayEquals(new int[] {0}, resultSet.getKeyPositions());
+			assertEquals(ITERATION_parallelism, iteration.getParallelism());
+			assertEquals(ITERATION_NAME, iteration.getName());
+			
+			assertEquals(AGGREGATOR_NAME, iteration.getAggregators().getAllRegisteredAggregators().iterator().next().getName());
+			
+			// validate that the semantic properties are set as they should
+			TwoInputUdfOperator<?, ?, ?, ?> solutionSetJoin = (TwoInputUdfOperator<?, ?, ?, ?>) resultSet.getNextWorkset();
+			assertTrue(solutionSetJoin.getSemanticProperties().getForwardingTargetFields(0, 0).contains(0));
+			assertTrue(solutionSetJoin.getSemanticProperties().getForwardingTargetFields(1, 0).contains(0));
+			
+			TwoInputUdfOperator<?, ?, ?, ?> edgesJoin = (TwoInputUdfOperator<?, ?, ?, ?>) solutionSetJoin.getInput1();
+			
+			// validate that the broadcast sets are forwarded
+			assertEquals(bcVar, solutionSetJoin.getBroadcastSets().get(BC_SET_UPDATES_NAME));
+			assertEquals(bcVar, edgesJoin.getBroadcastSets().get(BC_SET_MESSAGES_NAME));
+		}
+		catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	public static class UpdateFunction extends VertexUpdateFunction<String, Double, Long> {
+
+		@Override
+		public void updateVertex(Vertex<String, Double> vertex, MessageIterator<Long> inMessages) {}
+	}
+	
+	public static class MessageFunctionNoEdgeValue extends MessagingFunction<String, Double, Long, NullValue> {
+
+		@Override
+		public void sendMessages(Vertex<String, Double> vertex) {}
+	}
+}
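
For illustration, the translation test above only asserts how the plan is wired; a user program exercising the same Gelly configuration could look roughly like the sketch below. This is an illustrative sketch, not code from this commit: the org.apache.flink.graph.spargel package names and the setNewVertexValue() / sendMessageToAllNeighbors() calls are assumed to mirror the Spargel API removed further down in this diff.

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.spargel.MessageIterator;
import org.apache.flink.graph.spargel.MessagingFunction;
import org.apache.flink.graph.spargel.VertexCentricConfiguration;
import org.apache.flink.graph.spargel.VertexUpdateFunction;
import org.apache.flink.types.NullValue;

public class GellyBroadcastSketch {

	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		DataSet<Tuple2<String, Double>> vertices = env.fromElements(new Tuple2<String, Double>("a", 1.0));
		DataSet<Tuple3<String, String, NullValue>> edges =
				env.fromElements(new Tuple3<String, String, NullValue>("a", "a", NullValue.getInstance()));

		Graph<String, Double, NullValue> graph = Graph.fromTupleDataSet(vertices, edges, env);

		// register the same broadcast set for both phases, as the tests above verify
		DataSet<Long> bcVar = env.fromElements(1L);
		VertexCentricConfiguration parameters = new VertexCentricConfiguration();
		parameters.addBroadcastSetForUpdateFunction("updates", bcVar);
		parameters.addBroadcastSetForMessagingFunction("messages", bcVar);
		parameters.setName("Broadcast sketch");

		DataSet<Vertex<String, Double>> result = graph
				.runVertexCentricIteration(new Updater(), new Messenger(), 10, parameters)
				.getVertices();

		result.print();
	}

	public static final class Updater extends VertexUpdateFunction<String, Double, Long> {
		private static final long serialVersionUID = 1L;

		@Override
		public void updateVertex(Vertex<String, Double> vertex, MessageIterator<Long> inMessages) {
			// assumed Gelly API: re-setting the value marks the vertex as updated
			setNewVertexValue(vertex.getValue());
		}
	}

	public static final class Messenger extends MessagingFunction<String, Double, Long, NullValue> {
		private static final long serialVersionUID = 1L;

		@Override
		public void sendMessages(Vertex<String, Double> vertex) {
			// assumed Gelly API: one message per outgoing edge
			sendMessageToAllNeighbors(1L);
		}
	}
}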

http://git-wip-us.apache.org/repos/asf/flink/blob/39115abe/flink-staging/flink-spargel/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-spargel/pom.xml b/flink-staging/flink-spargel/pom.xml
deleted file mode 100644
index 662412a..0000000
--- a/flink-staging/flink-spargel/pom.xml
+++ /dev/null
@@ -1,72 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
-	<modelVersion>4.0.0</modelVersion>
-
-	<parent>
-		<groupId>org.apache.flink</groupId>
-		<artifactId>flink-staging</artifactId>
-		<version>0.10-SNAPSHOT</version>
-		<relativePath>..</relativePath>
-	</parent>
-	
-	<artifactId>flink-spargel</artifactId>
-	<name>flink-spargel</name>
-
-	<packaging>jar</packaging>
-
-	<dependencies>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-core</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-java</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-clients</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-test-utils</artifactId>
-			<version>${project.version}</version>
-			<scope>test</scope>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-optimizer</artifactId>
-			<version>${project.version}</version>
-			<type>test-jar</type>
-			<scope>test</scope>
-		</dependency>
-		<dependency>
-			<groupId>com.google.guava</groupId>
-			<artifactId>guava</artifactId>
-			<version>${guava.version}</version>
-		</dependency>
-	</dependencies>
-</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/39115abe/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/MessageIterator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/MessageIterator.java b/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/MessageIterator.java
deleted file mode 100644
index 535732d..0000000
--- a/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/MessageIterator.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.spargel.java;
-
-import java.util.Iterator;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-
-/**
- * An iterator that returns messages. The iterator is {@link java.lang.Iterable} at the same time to support
- * the <i>foreach</i> syntax.
- */
-public final class MessageIterator<Message> implements Iterator<Message>, Iterable<Message>, java.io.Serializable {
-	private static final long serialVersionUID = 1L;
-
-	private transient Iterator<Tuple2<?, Message>> source;
-	
-	
-	final void setSource(Iterator<Tuple2<?, Message>> source) {
-		this.source = source;
-	}
-	
-	@Override
-	public final boolean hasNext() {
-		return this.source.hasNext();
-	}
-	
-	@Override
-	public final Message next() {
-		return this.source.next().f1;
-	}
-
-	@Override
-	public final void remove() {
-		throw new UnsupportedOperationException();
-	}
-
-	@Override
-	public Iterator<Message> iterator() {
-		return this;
-	}
-}
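
Because the MessageIterator above implements both Iterator and Iterable, update functions could consume their messages with the enhanced for loop. A hypothetical update function illustrating this (the SumUpdater name and the summing logic are not from this commit):

import org.apache.flink.spargel.java.MessageIterator;
import org.apache.flink.spargel.java.VertexUpdateFunction;

public class SumUpdater extends VertexUpdateFunction<Long, Double, Double> {
	private static final long serialVersionUID = 1L;

	@Override
	public void updateVertex(Long vertexKey, Double vertexValue, MessageIterator<Double> inMessages) {
		double sum = 0.0;
		// foreach works because the iterator is its own Iterable
		for (Double message : inMessages) {
			sum += message;
		}
		// a real implementation would compare 'sum' with the old value and call setNewVertexValue(sum)
	}
}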

http://git-wip-us.apache.org/repos/asf/flink/blob/39115abe/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/MessagingFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/MessagingFunction.java b/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/MessagingFunction.java
deleted file mode 100644
index f6056e2..0000000
--- a/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/MessagingFunction.java
+++ /dev/null
@@ -1,288 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.spargel.java;
-
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.Iterator;
-
-import org.apache.flink.api.common.aggregators.Aggregator;
-import org.apache.flink.api.common.functions.IterationRuntimeContext;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.spargel.java.OutgoingEdge;
-import org.apache.flink.types.Value;
-import org.apache.flink.util.Collector;
-
-/**
- * The base class for functions that produce messages between vertices as a part of a {@link VertexCentricIteration}.
- * 
- * @param <VertexKey> The type of the vertex key (the vertex identifier).
- * @param <VertexValue> The type of the vertex value (the state of the vertex).
- * @param <Message> The type of the message sent between vertices along the edges.
- * @param <EdgeValue> The type of the values that are associated with the edges.
- */
-@Deprecated
-public abstract class MessagingFunction<VertexKey extends Comparable<VertexKey>, VertexValue, Message, EdgeValue> implements Serializable {
-
-	private static final long serialVersionUID = 1L;
-	
-	// --------------------------------------------------------------------------------------------
-	//  Public API Methods
-	// --------------------------------------------------------------------------------------------
-	
-	/**
-	 * This method is invoked once per superstep for each vertex that was changed in that superstep.
-	 * It needs to produce the messages that will be received by vertices in the next superstep.
-	 * 
-	 * @param vertexKey The key of the vertex that was changed.
-	 * @param vertexValue The value (state) of the vertex that was changed.
-	 * 
-	 * @throws Exception The computation may throw exceptions, which causes the superstep to fail.
-	 */
-	@Deprecated
-	public abstract void sendMessages(VertexKey vertexKey, VertexValue vertexValue) throws Exception;
-	
-	/**
-	 * This method is executed once per superstep before the vertex update function is invoked for each vertex.
-	 * 
-	 * @throws Exception Exceptions in the pre-superstep phase cause the superstep to fail.
-	 */
-	@Deprecated
-	public void preSuperstep() throws Exception {}
-	
-	/**
-	 * This method is executed once per superstep after the vertex update function has been invoked for each vertex.
-	 * 
-	 * @throws Exception Exceptions in the post-superstep phase cause the superstep to fail.
-	 */
-	@Deprecated
-	public void postSuperstep() throws Exception {}
-	
-	
-	/**
-	 * Gets an {@link java.lang.Iterable} with all outgoing edges. This method is mutually exclusive with
-	 * {@link #sendMessageToAllNeighbors(Object)} and may be called only once.
-	 * 
-	 * @return An iterable over all outgoing edges.
-	 */
-	@SuppressWarnings("unchecked")
-	public Iterable<OutgoingEdge<VertexKey, EdgeValue>> getOutgoingEdges() {
-		if (edgesUsed) {
-			throw new IllegalStateException("Can use either 'getOutgoingEdges()' or 'sendMessageToAllNeighbors()' exactly once.");
-		}
-		edgesUsed = true;
-		
-		if (this.edgeWithValueIter != null) {
-			this.edgeWithValueIter.set((Iterator<Tuple3<VertexKey, VertexKey, EdgeValue>>) edges);
-			return this.edgeWithValueIter;
-		} else {
-			this.edgeNoValueIter.set((Iterator<Tuple2<VertexKey, VertexKey>>) edges);
-			return this.edgeNoValueIter;
-		}
-	}
-	
-	/**
-	 * Sends the given message to all vertices that are targets of an outgoing edge of the changed vertex.
-	 * This method is mutually exclusive to the method {@link #getOutgoingEdges()} and may be called only once.
-	 * 
-	 * @param m The message to send.
-	 */
-	public void sendMessageToAllNeighbors(Message m) {
-		if (edgesUsed) {
-			throw new IllegalStateException("Can use either 'getOutgoingEdges()' or 'sendMessageToAllNeighbors()' exactly once.");
-		}
-		
-		edgesUsed = true;
-		
-		outValue.f1 = m;
-		
-		while (edges.hasNext()) {
-			Tuple next = (Tuple) edges.next();
-			VertexKey k = next.getField(1);
-			outValue.f0 = k;
-			out.collect(outValue);
-		}
-	}
-	
-	/**
-	 * Sends the given message to the vertex identified by the given key. If the target vertex does not exist,
-	 * the next superstep will cause an exception due to a non-deliverable message.
-	 * 
-	 * @param target The key (id) of the target vertex to message.
-	 * @param m The message.
-	 */
-	public void sendMessageTo(VertexKey target, Message m) {
-		outValue.f0 = target;
-		outValue.f1 = m;
-		out.collect(outValue);
-	}
-
-	// --------------------------------------------------------------------------------------------
-	
-	/**
-	 * Gets the number of the superstep, starting at <tt>1</tt>.
-	 * 
-	 * @return The number of the current superstep.
-	 */
-	public int getSuperstepNumber() {
-		return this.runtimeContext.getSuperstepNumber();
-	}
-	
-	/**
-	 * Gets the iteration aggregator registered under the given name. The iteration aggregator combines
-	 * all aggregates globally once per superstep and makes them available in the next superstep.
-	 * 
-	 * @param name The name of the aggregator.
-	 * @return The aggregator registered under this name, or null, if no aggregator was registered.
-	 */
-	public <T extends Aggregator<?>> T getIterationAggregator(String name) {
-		return this.runtimeContext.<T>getIterationAggregator(name);
-	}
-	
-	/**
-	 * Get the aggregated value that an aggregator computed in the previous iteration.
-	 * 
-	 * @param name The name of the aggregator.
-	 * @return The aggregated value of the previous iteration.
-	 */
-	public <T extends Value> T getPreviousIterationAggregate(String name) {
-		return this.runtimeContext.<T>getPreviousIterationAggregate(name);
-	}
-	
-	/**
-	 * Gets the broadcast data set registered under the given name. Broadcast data sets
-	 * are available on all parallel instances of a function. They can be registered via
-	 * {@link VertexCentricIteration#addBroadcastSetForMessagingFunction(String, org.apache.flink.api.java.DataSet)}.
-	 * 
-	 * @param name The name under which the broadcast set is registered.
-	 * @return The broadcast data set.
-	 */
-	public <T> Collection<T> getBroadcastSet(String name) {
-		return this.runtimeContext.<T>getBroadcastVariable(name);
-	}
-
-	// --------------------------------------------------------------------------------------------
-	//  internal methods and state
-	// --------------------------------------------------------------------------------------------
-	
-	private Tuple2<VertexKey, Message> outValue;
-	
-	private IterationRuntimeContext runtimeContext;
-	
-	private Iterator<?> edges;
-	
-	private Collector<Tuple2<VertexKey, Message>> out;
-	
-	private EdgesIteratorNoEdgeValue<VertexKey, EdgeValue> edgeNoValueIter;
-	
-	private EdgesIteratorWithEdgeValue<VertexKey, EdgeValue> edgeWithValueIter;
-	
-	private boolean edgesUsed;
-	
-	
-	void init(IterationRuntimeContext context, boolean hasEdgeValue) {
-		this.runtimeContext = context;
-		this.outValue = new Tuple2<VertexKey, Message>();
-		
-		if (hasEdgeValue) {
-			this.edgeWithValueIter = new EdgesIteratorWithEdgeValue<VertexKey, EdgeValue>();
-		} else {
-			this.edgeNoValueIter = new EdgesIteratorNoEdgeValue<VertexKey, EdgeValue>();
-		}
-	}
-	
-	void set(Iterator<?> edges, Collector<Tuple2<VertexKey, Message>> out) {
-		this.edges = edges;
-		this.out = out;
-		this.edgesUsed = false;
-	}
-	
-	
-	
-	private static final class EdgesIteratorNoEdgeValue<VertexKey extends Comparable<VertexKey>, EdgeValue> 
-		implements Iterator<OutgoingEdge<VertexKey, EdgeValue>>, Iterable<OutgoingEdge<VertexKey, EdgeValue>>
-	{
-		private Iterator<Tuple2<VertexKey, VertexKey>> input;
-		
-		private OutgoingEdge<VertexKey, EdgeValue> edge = new OutgoingEdge<VertexKey, EdgeValue>();
-		
-		
-		void set(Iterator<Tuple2<VertexKey, VertexKey>> input) {
-			this.input = input;
-		}
-		
-		@Override
-		public boolean hasNext() {
-			return input.hasNext();
-		}
-
-		@Override
-		public OutgoingEdge<VertexKey, EdgeValue> next() {
-			Tuple2<VertexKey, VertexKey> next = input.next();
-			edge.set(next.f1, null);
-			return edge;
-		}
-
-		@Override
-		public void remove() {
-			throw new UnsupportedOperationException();
-		}
-
-		@Override
-		public Iterator<OutgoingEdge<VertexKey, EdgeValue>> iterator() {
-			return this;
-		}
-	}
-	
-	
-	private static final class EdgesIteratorWithEdgeValue<VertexKey extends Comparable<VertexKey>, EdgeValue> 
-		implements Iterator<OutgoingEdge<VertexKey, EdgeValue>>, Iterable<OutgoingEdge<VertexKey, EdgeValue>>
-	{
-		private Iterator<Tuple3<VertexKey, VertexKey, EdgeValue>> input;
-		
-		private OutgoingEdge<VertexKey, EdgeValue> edge = new OutgoingEdge<VertexKey, EdgeValue>();
-		
-		void set(Iterator<Tuple3<VertexKey, VertexKey, EdgeValue>> input) {
-			this.input = input;
-		}
-		
-		@Override
-		public boolean hasNext() {
-			return input.hasNext();
-		}
-
-		@Override
-		public OutgoingEdge<VertexKey, EdgeValue> next() {
-			Tuple3<VertexKey, VertexKey, EdgeValue> next = input.next();
-			edge.set(next.f1, next.f2);
-			return edge;
-		}
-
-		@Override
-		public void remove() {
-			throw new UnsupportedOperationException();
-		}
-		@Override
-		public Iterator<OutgoingEdge<VertexKey, EdgeValue>> iterator() {
-			return this;
-		}
-	}
-}
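
A hypothetical messaging function built on the API above, which simply forwards the updated vertex value to every out-neighbor (the class name and types are illustrative only):

import org.apache.flink.spargel.java.MessagingFunction;

public class ForwardValueMessenger extends MessagingFunction<Long, Double, Double, Object> {
	private static final long serialVersionUID = 1L;

	@Override
	public void sendMessages(Long vertexKey, Double vertexValue) {
		// one message per outgoing edge; mutually exclusive with getOutgoingEdges()
		sendMessageToAllNeighbors(vertexValue);
	}
}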

http://git-wip-us.apache.org/repos/asf/flink/blob/39115abe/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/OutgoingEdge.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/OutgoingEdge.java b/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/OutgoingEdge.java
deleted file mode 100644
index f3ca627..0000000
--- a/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/OutgoingEdge.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.spargel.java;
-
-/**
- * <tt>Edge</tt> objects represent edges between vertices. Edges are defined by their source and target
- * vertex id. Edges may have an associated value (for example a weight or a distance), if the
- * graph algorithm was initialized with the
- * {@link VertexCentricIteration#withValuedEdges(org.apache.flink.api.java.DataSet, VertexUpdateFunction, MessagingFunction, int)}
- * method.
- *
- * @param <VertexKey> The type of the vertex key.
- * @param <EdgeValue> The type of the value associated with the edge. For scenarios where the edges do not hold
- *                    value, this type may be arbitrary.
- */
-@Deprecated
-public final class OutgoingEdge<VertexKey extends Comparable<VertexKey>, EdgeValue> implements java.io.Serializable {
-	
-	private static final long serialVersionUID = 1L;
-	
-	private VertexKey target;
-	
-	private EdgeValue edgeValue;
-	
-	void set(VertexKey target, EdgeValue edgeValue) {
-		this.target = target;
-		this.edgeValue = edgeValue;
-	}
-	
-	/**
-	 * Gets the target vertex id.
-	 * 
-	 * @return The target vertex id.
-	 */
-	@Deprecated
-	public VertexKey target() {
-		return target;
-	}
-	
-	/**
-	 * Gets the value associated with the edge. The value may be null if the iteration was initialized with
-	 * an edge data set without edge values.
-	 * Typical examples of edge values are weights or distances of the path represented by the edge.
-	 *  
-	 * @return The value associated with the edge.
-	 */
-	@Deprecated
-	public EdgeValue edgeValue() {
-		return edgeValue;
-	}
-}
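
A hypothetical sketch of the alternative to sendMessageToAllNeighbors(): iterating the OutgoingEdge objects defined above and weighting each message by the edge value (illustrative names and types):

import org.apache.flink.spargel.java.MessagingFunction;
import org.apache.flink.spargel.java.OutgoingEdge;

public class WeightedMessenger extends MessagingFunction<Long, Double, Double, Double> {
	private static final long serialVersionUID = 1L;

	@Override
	public void sendMessages(Long vertexKey, Double vertexValue) {
		// getOutgoingEdges() may be used only once and not together with sendMessageToAllNeighbors()
		for (OutgoingEdge<Long, Double> edge : getOutgoingEdges()) {
			sendMessageTo(edge.target(), vertexValue * edge.edgeValue());
		}
	}
}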

http://git-wip-us.apache.org/repos/asf/flink/blob/39115abe/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java b/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java
deleted file mode 100644
index 839ee46..0000000
--- a/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java
+++ /dev/null
@@ -1,612 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.spargel.java;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.flink.api.common.aggregators.Aggregator;
-import org.apache.flink.api.common.functions.RichCoGroupFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.operators.CoGroupOperator;
-import org.apache.flink.api.java.operators.CustomUnaryOperation;
-import org.apache.flink.api.java.operators.DeltaIteration;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.util.Collector;
-
-import com.google.common.base.Preconditions;
-
-/**
- * This class represents iterative graph computations, programmed in a vertex-centric perspective.
- * It is a special case of <i>Bulk Synchronous Parallel</i> computation. The paradigm has also been
- * implemented by Google's <i>Pregel</i> system and by <i>Apache Giraph</i>.
- * <p>
- * Vertex centric algorithms operate on graphs, which are defined through vertices and edges. The 
- * algorithms send messages along the edges and update the state of vertices based on
- * the old state and the incoming messages. All vertices have an initial state.
- * The computation terminates once no vertex updates its state any more.
- * Additionally, a maximum number of iterations (supersteps) may be specified.
- * <p>
- * The computation is here represented by two functions:
- * <ul>
- *   <li>The {@link VertexUpdateFunction} receives incoming messages and may update the state for
- *   the vertex. If a state is updated, messages are sent from this vertex. Initially, all vertices are
- *   considered updated.</li>
- *   <li>The {@link MessagingFunction} takes the new vertex state and sends messages along the outgoing
- *   edges of the vertex. The outgoing edges may optionally have an associated value, such as a weight.</li>
- * </ul>
- * <p>
- * Vertex-centric graph iterations are instantiated by the
- * {@link #withPlainEdges(DataSet, VertexUpdateFunction, MessagingFunction, int)} method, or the
- * {@link #withValuedEdges(DataSet, VertexUpdateFunction, MessagingFunction, int)} method, depending on whether
- * the graph's edges are carrying values.
- *
- * @param <VertexKey> The type of the vertex key (the vertex identifier).
- * @param <VertexValue> The type of the vertex value (the state of the vertex).
- * @param <Message> The type of the message sent between vertices along the edges.
- * @param <EdgeValue> The type of the values that are associated with the edges.
- */
-@Deprecated
-public class VertexCentricIteration<VertexKey extends Comparable<VertexKey>, VertexValue, Message, EdgeValue> 
-	implements CustomUnaryOperation<Tuple2<VertexKey, VertexValue>, Tuple2<VertexKey, VertexValue>>
-{
-	private final VertexUpdateFunction<VertexKey, VertexValue, Message> updateFunction;
-	
-	private final MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> messagingFunction;
-	
-	private final DataSet<Tuple2<VertexKey, VertexKey>> edgesWithoutValue;
-	
-	private final DataSet<Tuple3<VertexKey, VertexKey, EdgeValue>> edgesWithValue;
-	
-	private final Map<String, Aggregator<?>> aggregators;
-	
-	private final int maximumNumberOfIterations;
-	
-	private final List<Tuple2<String, DataSet<?>>> bcVarsUpdate = new ArrayList<Tuple2<String,DataSet<?>>>(4);
-	
-	private final List<Tuple2<String, DataSet<?>>> bcVarsMessaging = new ArrayList<Tuple2<String,DataSet<?>>>(4);
-	
-	private final TypeInformation<Message> messageType;
-	
-	private DataSet<Tuple2<VertexKey, VertexValue>> initialVertices;
-	
-	private String name;
-	
-	private int parallelism = -1;
-	
-	private boolean unmanagedSolutionSet;
-	
-	// ----------------------------------------------------------------------------------
-	
-	private  VertexCentricIteration(VertexUpdateFunction<VertexKey, VertexValue, Message> uf,
-			MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> mf,
-			DataSet<Tuple2<VertexKey, VertexKey>> edgesWithoutValue,
-			int maximumNumberOfIterations)
-	{
-		Preconditions.checkNotNull(uf);
-		Preconditions.checkNotNull(mf);
-		Preconditions.checkNotNull(edgesWithoutValue);
-		Preconditions.checkArgument(maximumNumberOfIterations > 0, "The maximum number of iterations must be at least one.");
-		
-		// check that the edges are actually a valid tuple set of vertex key types
-		TypeInformation<Tuple2<VertexKey, VertexKey>> edgesType = edgesWithoutValue.getType();
-		Preconditions.checkArgument(edgesType.isTupleType() && edgesType.getArity() == 2, "The edges data set (for edges without edge values) must consist of 2-tuples.");
-		
-		TupleTypeInfo<?> tupleInfo = (TupleTypeInfo<?>) edgesType;
-		Preconditions.checkArgument(tupleInfo.getTypeAt(0).equals(tupleInfo.getTypeAt(1))
-			&& Comparable.class.isAssignableFrom(tupleInfo.getTypeAt(0).getTypeClass()),
-			"Both tuple fields (source and target vertex id) must be of the data type that represents the vertex key and implement the java.lang.Comparable interface.");
-		
-		this.updateFunction = uf;
-		this.messagingFunction = mf;
-		this.edgesWithoutValue = edgesWithoutValue;
-		this.edgesWithValue = null;
-		this.maximumNumberOfIterations = maximumNumberOfIterations;
-		this.aggregators = new HashMap<String, Aggregator<?>>();
-		
-		this.messageType = getMessageType(mf);
-	}
-	
-	private VertexCentricIteration(VertexUpdateFunction<VertexKey, VertexValue, Message> uf,
-			MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> mf,
-			DataSet<Tuple3<VertexKey, VertexKey, EdgeValue>> edgesWithValue, 
-			int maximumNumberOfIterations,
-			boolean edgeHasValueMarker)
-	{
-		Preconditions.checkNotNull(uf);
-		Preconditions.checkNotNull(mf);
-		Preconditions.checkNotNull(edgesWithValue);
-		Preconditions.checkArgument(maximumNumberOfIterations > 0, "The maximum number of iterations must be at least one.");
-		
-		// check that the edges are actually a valid tuple set of vertex key types
-		TypeInformation<Tuple3<VertexKey, VertexKey, EdgeValue>> edgesType = edgesWithValue.getType();
-		Preconditions.checkArgument(edgesType.isTupleType() && edgesType.getArity() == 3, "The edges data set (for edges with edge values) must consist of 3-tuples.");
-		
-		TupleTypeInfo<?> tupleInfo = (TupleTypeInfo<?>) edgesType;
-		Preconditions.checkArgument(tupleInfo.getTypeAt(0).equals(tupleInfo.getTypeAt(1))
-			&& Comparable.class.isAssignableFrom(tupleInfo.getTypeAt(0).getTypeClass()),
-			"The first two tuple fields (source and target vertex id) must be of the data type that represents the vertex key and implement the java.lang.Comparable interface.");
-		
-		Preconditions.checkArgument(maximumNumberOfIterations > 0, "The maximum number of iterations must be at least one.");
-		
-		this.updateFunction = uf;
-		this.messagingFunction = mf;
-		this.edgesWithoutValue = null;
-		this.edgesWithValue = edgesWithValue;
-		this.maximumNumberOfIterations = maximumNumberOfIterations;
-		this.aggregators = new HashMap<String, Aggregator<?>>();
-		
-		this.messageType = getMessageType(mf);
-	}
-	
-	private TypeInformation<Message> getMessageType(MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> mf) {
-		return TypeExtractor.createTypeInfo(MessagingFunction.class, mf.getClass(), 2, null, null);
-	}
-	
-	/**
-	 * Registers a new aggregator. Aggregators registered here are available during the execution of the vertex updates
-	 * via {@link VertexUpdateFunction#getIterationAggregator(String)} and
-	 * {@link VertexUpdateFunction#getPreviousIterationAggregate(String)}.
-	 * 
-	 * @param name The name of the aggregator, used to retrieve it and its aggregates during execution. 
-	 * @param aggregator The aggregator.
-	 */
-	@Deprecated
-	public void registerAggregator(String name, Aggregator<?> aggregator) {
-		this.aggregators.put(name, aggregator);
-	}
-	
-	/**
-	 * Adds a data set as a broadcast set to the messaging function.
-	 * 
-	 * @param name The name under which the broadcast data is available in the messaging function.
-	 * @param data The data set to be broadcasted.
-	 */
-	@Deprecated
-	public void addBroadcastSetForMessagingFunction(String name, DataSet<?> data) {
-		this.bcVarsMessaging.add(new Tuple2<String, DataSet<?>>(name, data));
-	}
-
-	/**
-	 * Adds a data set as a broadcast set to the vertex update function.
-	 * 
-	 * @param name The name under which the broadcast data is available in the vertex update function.
-	 * @param data The data set to be broadcasted.
-	 */
-	@Deprecated
-	public void addBroadcastSetForUpdateFunction(String name, DataSet<?> data) {
-		this.bcVarsUpdate.add(new Tuple2<String, DataSet<?>>(name, data));
-	}
-	
-	/**
-	 * Sets the name for the vertex-centric iteration. The name is displayed in logs and messages.
-	 * 
-	 * @param name The name for the iteration.
-	 */
-	@Deprecated
-	public void setName(String name) {
-		this.name = name;
-	}
-	
-	/**
-	 * Gets the name from this vertex-centric iteration.
-	 * 
-	 * @return The name of the iteration.
-	 */
-	@Deprecated
-	public String getName() {
-		return name;
-	}
-	
-	/**
-	 * Sets the parallelism for the iteration.
-	 * 
-	 * @param parallelism The parallelism.
-	 */
-	@Deprecated
-	public void setParallelism(int parallelism) {
-		Preconditions.checkArgument(parallelism > 0 || parallelism == -1, "The parallelism must be positive, or -1 (use default).");
-		this.parallelism = parallelism;
-	}
-	
-	/**
-	 * Gets the iteration's parallelism.
-	 * 
-	 * @return The iteration's parallelism, or -1, if not set.
-	 */
-	@Deprecated
-	public int getParallelism() {
-		return parallelism;
-	}
-	
-	/**
-	 * Defines whether the solution set is kept in managed memory (Flink's internal way of keeping objects
-	 * in serialized form) or as a simple object map.
-	 * By default, the solution set runs in managed memory.
-	 * 
-	 * @param unmanaged True, to keep the solution set in unmanaged memory, false otherwise.
-	 */
-	@Deprecated
-	public void setSolutionSetUnmanagedMemory(boolean unmanaged) {
-		this.unmanagedSolutionSet = unmanaged;
-	}
-	
-	/**
-	 * Gets whether the solution set is kept in managed memory (Flink's internal way of keeping objects
-	 * in serialized form) or as a simple object map.
-	 * By default, the solution set runs in managed memory.
-	 * 
-	 * @return True, if the solution set is in unmanaged memory, false otherwise.
-	 */
-	@Deprecated
-	public boolean isSolutionSetUnmanagedMemory() {
-		return this.unmanagedSolutionSet;
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	//  Custom Operator behavior
-	// --------------------------------------------------------------------------------------------
-	
-	/**
-	 * Sets the input data set for this operator. In the case of this operator this input data set represents
-	 * the set of vertices with their initial state.
-	 * 
-	 * @param inputData The input data set, which in the case of this operator represents the set of
-	 *                  vertices with their initial state.
-	 * 
-	 * @see org.apache.flink.api.java.operators.CustomUnaryOperation#setInput(org.apache.flink.api.java.DataSet)
-	 */
-	@Override
-	public void setInput(DataSet<Tuple2<VertexKey, VertexValue>> inputData) {
-		// sanity check that we really have two tuples
-		TypeInformation<Tuple2<VertexKey, VertexValue>> inputType = inputData.getType();
-		Preconditions.checkArgument(inputType.isTupleType() && inputType.getArity() == 2, "The input data set (the initial vertices) must consist of 2-tuples.");
-
-		// check that the key type here is the same as for the edges
-		TypeInformation<VertexKey> keyType = ((TupleTypeInfo<?>) inputType).getTypeAt(0);
-		TypeInformation<?> edgeType = edgesWithoutValue != null ? edgesWithoutValue.getType() : edgesWithValue.getType();
-		TypeInformation<VertexKey> edgeKeyType = ((TupleTypeInfo<?>) edgeType).getTypeAt(0);
-		
-		Preconditions.checkArgument(keyType.equals(edgeKeyType), "The first tuple field (the vertex id) of the input data set (the initial vertices) " +
-				"must be the same data type as the first fields of the edge data set (the source vertex id). " +
-				"Here, the key type for the vertex ids is '%s' and the key type  for the edges is '%s'.", keyType, edgeKeyType);
-
-		this.initialVertices = inputData;
-	}
-	
-	/**
-	 * Creates the operator that represents this vertex-centric graph computation.
-	 * 
-	 * @return The operator that represents this vertex-centric graph computation.
-	 */
-	@Override
-	public DataSet<Tuple2<VertexKey, VertexValue>> createResult() {
-		if (this.initialVertices == null) {
-			throw new IllegalStateException("The input data set has not been set.");
-		}
-		
-		// prepare some type information
-		TypeInformation<Tuple2<VertexKey, VertexValue>> vertexTypes = initialVertices.getType();
-		TypeInformation<VertexKey> keyType = ((TupleTypeInfo<?>) initialVertices.getType()).getTypeAt(0);
-		TypeInformation<Tuple2<VertexKey, Message>> messageTypeInfo = new TupleTypeInfo<Tuple2<VertexKey,Message>>(keyType, messageType);		
-		
-		// set up the iteration operator
-		final String name = (this.name != null) ? this.name :
-			"Vertex-centric iteration (" + updateFunction + " | " + messagingFunction + ")";
-		final int[] zeroKeyPos = new int[] {0};
-	
-		final DeltaIteration<Tuple2<VertexKey, VertexValue>, Tuple2<VertexKey, VertexValue>> iteration =
-			this.initialVertices.iterateDelta(this.initialVertices, this.maximumNumberOfIterations, zeroKeyPos);
-		iteration.name(name);
-		iteration.parallelism(parallelism);
-		iteration.setSolutionSetUnManaged(unmanagedSolutionSet);
-		
-		// register all aggregators
-		for (Map.Entry<String, Aggregator<?>> entry : this.aggregators.entrySet()) {
-			iteration.registerAggregator(entry.getKey(), entry.getValue());
-		}
-		
-		// build the messaging function (co group)
-		CoGroupOperator<?, ?, Tuple2<VertexKey, Message>> messages;
-		if (edgesWithoutValue != null) {
-			MessagingUdfNoEdgeValues<VertexKey, VertexValue, Message> messenger = new MessagingUdfNoEdgeValues<VertexKey, VertexValue, Message>(messagingFunction, messageTypeInfo);
-			messages = this.edgesWithoutValue.coGroup(iteration.getWorkset()).where(0).equalTo(0).with(messenger);
-		}
-		else {
-			MessagingUdfWithEdgeValues<VertexKey, VertexValue, Message, EdgeValue> messenger = new MessagingUdfWithEdgeValues<VertexKey, VertexValue, Message, EdgeValue>(messagingFunction, messageTypeInfo);
-			messages = this.edgesWithValue.coGroup(iteration.getWorkset()).where(0).equalTo(0).with(messenger);
-		}
-		
-		// configure coGroup message function with name and broadcast variables
-		messages = messages.name("Messaging");
-		for (Tuple2<String, DataSet<?>> e : this.bcVarsMessaging) {
-			messages = messages.withBroadcastSet(e.f1, e.f0);
-		}
-		
-		VertexUpdateUdf<VertexKey, VertexValue, Message> updateUdf = new VertexUpdateUdf<VertexKey, VertexValue, Message>(updateFunction, vertexTypes);
-		
-		// build the update function (co group)
-		CoGroupOperator<?, ?, Tuple2<VertexKey, VertexValue>> updates =
-				messages.coGroup(iteration.getSolutionSet()).where(0).equalTo(0).with(updateUdf);
-		
-		// configure coGroup update function with name and broadcast variables
-		updates = updates.name("Vertex State Updates");
-		for (Tuple2<String, DataSet<?>> e : this.bcVarsUpdate) {
-			updates = updates.withBroadcastSet(e.f1, e.f0);
-		}
-
-		// let the operator know that we preserve the key field
-		updates.withForwardedFieldsFirst("0").withForwardedFieldsSecond("0");
-		
-		return iteration.closeWith(updates, updates);
-		
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	// Constructor builders to avoid signature conflicts with generic type erasure
-	// --------------------------------------------------------------------------------------------
-	
-	/**
-	 * Creates a new vertex-centric iteration operator for graphs where the edges are not associated with a value.
-	 * 
-	 * @param edgesWithoutValue The data set containing edges. Edges are represented as 2-tuples: (source-id, target-id)
-	 * @param vertexUpdateFunction The function that updates the state of the vertices from the incoming messages.
-	 * @param messagingFunction The function that turns changed vertex states into messages along the edges.
-	 * 
-	 * @param <VertexKey> The type of the vertex key (the vertex identifier).
-	 * @param <VertexValue> The type of the vertex value (the state of the vertex).
-	 * @param <Message> The type of the message sent between vertices along the edges.
-	 * 
-	 * @return An instance of the vertex-centric graph computation operator.
-	 */
-	@Deprecated
-	public static final <VertexKey extends Comparable<VertexKey>, VertexValue, Message>
-			VertexCentricIteration<VertexKey, VertexValue, Message, ?> withPlainEdges(
-					DataSet<Tuple2<VertexKey, VertexKey>> edgesWithoutValue,
-						VertexUpdateFunction<VertexKey, VertexValue, Message> vertexUpdateFunction,
-						MessagingFunction<VertexKey, VertexValue, Message, ?> messagingFunction,
-						int maximumNumberOfIterations)
-	{
-		@SuppressWarnings("unchecked")
-		MessagingFunction<VertexKey, VertexValue, Message, Object> tmf = 
-								(MessagingFunction<VertexKey, VertexValue, Message, Object>) messagingFunction;
-		
-		return new VertexCentricIteration<VertexKey, VertexValue, Message, Object>(vertexUpdateFunction, tmf, edgesWithoutValue, maximumNumberOfIterations);
-	}
-	
-	/**
-	 * Creates a new vertex-centric iteration operator for graphs where the edges are associated with a value (such as
-	 * a weight or distance).
-	 * 
-	 * @param edgesWithValue The data set containing edges. Edges are represented as 3-tuples: (source-id, target-id, edge-value)
-	 * @param uf The function that updates the state of the vertices from the incoming messages.
-	 * @param mf The function that turns changed vertex states into messages along the edges.
-	 * 
-	 * @param <VertexKey> The type of the vertex key (the vertex identifier).
-	 * @param <VertexValue> The type of the vertex value (the state of the vertex).
-	 * @param <Message> The type of the message sent between vertices along the edges.
-	 * @param <EdgeValue> The type of the values that are associated with the edges.
-	 * 
-	 * @return An instance of the vertex-centric graph computation operator.
-	 */
-	@Deprecated
-	public static final <VertexKey extends Comparable<VertexKey>, VertexValue, Message, EdgeValue>
-			VertexCentricIteration<VertexKey, VertexValue, Message, EdgeValue> withValuedEdges(
-					DataSet<Tuple3<VertexKey, VertexKey, EdgeValue>> edgesWithValue,
-					VertexUpdateFunction<VertexKey, VertexValue, Message> uf,
-					MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> mf,
-					int maximumNumberOfIterations)
-	{
-		return new VertexCentricIteration<VertexKey, VertexValue, Message, EdgeValue>(uf, mf, edgesWithValue, maximumNumberOfIterations, true);
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	//  Wrapping UDFs
-	// --------------------------------------------------------------------------------------------
-	
-	private static final class VertexUpdateUdf<VertexKey extends Comparable<VertexKey>, VertexValue, Message> 
-		extends RichCoGroupFunction<Tuple2<VertexKey, Message>, Tuple2<VertexKey, VertexValue>, Tuple2<VertexKey, VertexValue>>
-		implements ResultTypeQueryable<Tuple2<VertexKey, VertexValue>>
-	{
-		private static final long serialVersionUID = 1L;
-		
-		private final VertexUpdateFunction<VertexKey, VertexValue, Message> vertexUpdateFunction;
-
-		private final MessageIterator<Message> messageIter = new MessageIterator<Message>();
-		
-		private transient TypeInformation<Tuple2<VertexKey, VertexValue>> resultType;
-		
-		
-		private VertexUpdateUdf(VertexUpdateFunction<VertexKey, VertexValue, Message> vertexUpdateFunction,
-				TypeInformation<Tuple2<VertexKey, VertexValue>> resultType)
-		{
-			this.vertexUpdateFunction = vertexUpdateFunction;
-			this.resultType = resultType;
-		}
-
-		@Override
-		public void coGroup(Iterable<Tuple2<VertexKey, Message>> messages, Iterable<Tuple2<VertexKey, VertexValue>> vertex,
-				Collector<Tuple2<VertexKey, VertexValue>> out)
-			throws Exception
-		{
-			final Iterator<Tuple2<VertexKey, VertexValue>> vertexIter = vertex.iterator();
-			
-			if (vertexIter.hasNext()) {
-				Tuple2<VertexKey, VertexValue> vertexState = vertexIter.next();
-				
-				@SuppressWarnings("unchecked")
-				Iterator<Tuple2<?, Message>> downcastIter = (Iterator<Tuple2<?, Message>>) (Iterator<?>) messages.iterator();
-				messageIter.setSource(downcastIter);
-				
-				vertexUpdateFunction.setOutput(vertexState, out);
-				vertexUpdateFunction.updateVertex(vertexState.f0, vertexState.f1, messageIter);
-			}
-			else {
-				final Iterator<Tuple2<VertexKey, Message>> messageIter = messages.iterator();
-				if (messageIter.hasNext()) {
-					String message = "Target vertex does not exist!.";
-					try {
-						Tuple2<VertexKey, Message> next = messageIter.next();
-						message = "Target vertex '" + next.f0 + "' does not exist!.";
-					} catch (Throwable t) {}
-					throw new Exception(message);
-				} else {
-					throw new Exception();
-				}
-			}
-		}
-		
-		@Override
-		public void open(Configuration parameters) throws Exception {
-			if (getIterationRuntimeContext().getSuperstepNumber() == 1) {
-				this.vertexUpdateFunction.init(getIterationRuntimeContext());
-			}
-			this.vertexUpdateFunction.preSuperstep();
-		}
-		
-		@Override
-		public void close() throws Exception {
-			this.vertexUpdateFunction.postSuperstep();
-		}
-
-		@Override
-		public TypeInformation<Tuple2<VertexKey, VertexValue>> getProducedType() {
-			return this.resultType;
-		}
-	}
-	
-	/*
-	 * UDF that encapsulates the message sending function for graphs where the edges have no associated values.
-	 */
-	private static final class MessagingUdfNoEdgeValues<VertexKey extends Comparable<VertexKey>, VertexValue, Message> 
-		extends RichCoGroupFunction<Tuple2<VertexKey, VertexKey>, Tuple2<VertexKey, VertexValue>, Tuple2<VertexKey, Message>>
-		implements ResultTypeQueryable<Tuple2<VertexKey, Message>>
-	{
-		private static final long serialVersionUID = 1L;
-		
-		private final MessagingFunction<VertexKey, VertexValue, Message, ?> messagingFunction;
-		
-		private transient TypeInformation<Tuple2<VertexKey, Message>> resultType;
-		
-		
-		private MessagingUdfNoEdgeValues(MessagingFunction<VertexKey, VertexValue, Message, ?> messagingFunction,
-				TypeInformation<Tuple2<VertexKey, Message>> resultType)
-		{
-			this.messagingFunction = messagingFunction;
-			this.resultType = resultType;
-		}
-		
-		@Override
-		public void coGroup(Iterable<Tuple2<VertexKey, VertexKey>> edges,
-				Iterable<Tuple2<VertexKey, VertexValue>> state, Collector<Tuple2<VertexKey, Message>> out)
-			throws Exception
-		{
-			final Iterator<Tuple2<VertexKey, VertexValue>> stateIter = state.iterator();
-			
-			if (stateIter.hasNext()) {
-				Tuple2<VertexKey, VertexValue> newVertexState = stateIter.next();
-				messagingFunction.set((Iterator<?>) edges.iterator(), out);
-				messagingFunction.sendMessages(newVertexState.f0, newVertexState.f1);
-			}
-		}
-		
-		@Override
-		public void open(Configuration parameters) throws Exception {
-			if (getIterationRuntimeContext().getSuperstepNumber() == 1) {
-				this.messagingFunction.init(getIterationRuntimeContext(), false);
-			}
-			
-			this.messagingFunction.preSuperstep();
-		}
-		
-		@Override
-		public void close() throws Exception {
-			this.messagingFunction.postSuperstep();
-		}
-
-		@Override
-		public TypeInformation<Tuple2<VertexKey, Message>> getProducedType() {
-			return this.resultType;
-		}
-	}
-	
-	/*
-	 * UDF that encapsulates the message sending function for graphs where the edges have an associated value.
-	 */
-	private static final class MessagingUdfWithEdgeValues<VertexKey extends Comparable<VertexKey>, VertexValue, Message, EdgeValue> 
-		extends RichCoGroupFunction<Tuple3<VertexKey, VertexKey, EdgeValue>, Tuple2<VertexKey, VertexValue>, Tuple2<VertexKey, Message>>
-		implements ResultTypeQueryable<Tuple2<VertexKey, Message>>
-	{
-		private static final long serialVersionUID = 1L;
-		
-		private final MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> messagingFunction;
-		
-		private transient TypeInformation<Tuple2<VertexKey, Message>> resultType;
-		
-		
-		private MessagingUdfWithEdgeValues(MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> messagingFunction,
-				TypeInformation<Tuple2<VertexKey, Message>> resultType)
-		{
-			this.messagingFunction = messagingFunction;
-			this.resultType = resultType;
-		}
-
-		@Override
-		public void coGroup(Iterable<Tuple3<VertexKey, VertexKey, EdgeValue>> edges,
-				Iterable<Tuple2<VertexKey, VertexValue>> state, Collector<Tuple2<VertexKey, Message>> out)
-			throws Exception
-		{
-			final Iterator<Tuple2<VertexKey, VertexValue>> stateIter = state.iterator();
-			
-			if (stateIter.hasNext()) {
-				Tuple2<VertexKey, VertexValue> newVertexState = stateIter.next();
-				messagingFunction.set((Iterator<?>) edges.iterator(), out);
-				messagingFunction.sendMessages(newVertexState.f0, newVertexState.f1);
-			}
-		}
-		
-		@Override
-		public void open(Configuration parameters) throws Exception {
-			if (getIterationRuntimeContext().getSuperstepNumber() == 1) {
-				this.messagingFunction.init(getIterationRuntimeContext(), true);
-			}
-			
-			this.messagingFunction.preSuperstep();
-		}
-		
-		@Override
-		public void close() throws Exception {
-			this.messagingFunction.postSuperstep();
-		}
-		
-		@Override
-		public TypeInformation<Tuple2<VertexKey, Message>> getProducedType() {
-			return this.resultType;
-		}
-	}
-}
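
Putting the removed pieces together, a Spargel program was assembled roughly as sketched below. This is an illustrative sketch: the empty function bodies follow the tests above, and DataSet#runOperation(...) is assumed here as the way a CustomUnaryOperation such as this iteration is applied to a DataSet.

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.spargel.java.MessageIterator;
import org.apache.flink.spargel.java.MessagingFunction;
import org.apache.flink.spargel.java.VertexCentricIteration;
import org.apache.flink.spargel.java.VertexUpdateFunction;

public class SpargelJobSketch {

	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		DataSet<Tuple2<Long, Double>> vertices = env.fromElements(new Tuple2<Long, Double>(1L, 0.0));
		DataSet<Tuple2<Long, Long>> edges = env.fromElements(new Tuple2<Long, Long>(1L, 1L));

		VertexCentricIteration<Long, Double, Double, ?> iteration =
				VertexCentricIteration.withPlainEdges(edges, new Updater(), new Messenger(), 10);
		iteration.setName("Spargel sketch");

		// assumed: runOperation(...) applies the CustomUnaryOperation to the vertex data set
		DataSet<Tuple2<Long, Double>> result = vertices.runOperation(iteration);
		result.print();
	}

	public static final class Updater extends VertexUpdateFunction<Long, Double, Double> {
		private static final long serialVersionUID = 1L;

		@Override
		public void updateVertex(Long vertexKey, Double vertexValue, MessageIterator<Double> inMessages) {}
	}

	public static final class Messenger extends MessagingFunction<Long, Double, Double, Object> {
		private static final long serialVersionUID = 1L;

		@Override
		public void sendMessages(Long vertexKey, Double vertexValue) {}
	}
}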