You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by va...@apache.org on 2015/10/09 18:05:44 UTC

[06/24] flink git commit: [FLINK-2833] [gelly] create a flink-libraries module and move gelly there

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricConfiguration.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricConfiguration.java
deleted file mode 100644
index afd4ffd..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricConfiguration.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.spargel;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.graph.EdgeDirection;
-import org.apache.flink.graph.IterationConfiguration;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * A VertexCentricConfiguration object can be used to set the iteration name and
- * degree of parallelism, to register aggregators and use broadcast sets in
- * the {@link org.apache.flink.graph.spargel.VertexUpdateFunction} and {@link org.apache.flink.graph.spargel.MessagingFunction}
- *
- * The VertexCentricConfiguration object is passed as an argument to
- * {@link org.apache.flink.graph.Graph#runVertexCentricIteration (
- * org.apache.flink.graph.spargel.VertexUpdateFunction, org.apache.flink.graph.spargel.MessagingFunction, int,
- * VertexCentricConfiguration)}.
- */
-public class VertexCentricConfiguration extends IterationConfiguration {
-
-	/** the broadcast variables for the update function **/
-	private List<Tuple2<String, DataSet<?>>> bcVarsUpdate = new ArrayList<Tuple2<String,DataSet<?>>>();
-
-	/** the broadcast variables for the messaging function **/
-	private List<Tuple2<String, DataSet<?>>> bcVarsMessaging = new ArrayList<Tuple2<String,DataSet<?>>>();
-
-	/** flag that defines whether the degrees option is set **/
-	private boolean optDegrees = false;
-
-	/** the direction in which the messages should be sent **/
-	private EdgeDirection direction = EdgeDirection.OUT;
-
-	public VertexCentricConfiguration() {}
-
-	/**
-	 * Adds a data set as a broadcast set to the messaging function.
-	 *
-	 * @param name The name under which the broadcast data is available in the messaging function.
-	 * @param data The data set to be broadcasted.
-	 */
-	public void addBroadcastSetForMessagingFunction(String name, DataSet<?> data) {
-		this.bcVarsMessaging.add(new Tuple2<String, DataSet<?>>(name, data));
-	}
-
-	/**
-	 * Adds a data set as a broadcast set to the vertex update function.
-	 *
-	 * @param name The name under which the broadcast data is available in the vertex update function.
-	 * @param data The data set to be broadcasted.
-	 */
-	public void addBroadcastSetForUpdateFunction(String name, DataSet<?> data) {
-		this.bcVarsUpdate.add(new Tuple2<String, DataSet<?>>(name, data));
-	}
-
-	/**
-	 * Get the broadcast variables of the VertexUpdateFunction.
-	 *
-	 * @return a List of Tuple2, where the first field is the broadcast variable name
-	 * and the second field is the broadcast DataSet.
-	 */
-	public List<Tuple2<String, DataSet<?>>> getUpdateBcastVars() {
-		return this.bcVarsUpdate;
-	}
-
-	/**
-	 * Get the broadcast variables of the MessagingFunction.
-	 *
-	 * @return a List of Tuple2, where the first field is the broadcast variable name
-	 * and the second field is the broadcast DataSet.
-	 */
-	public List<Tuple2<String, DataSet<?>>> getMessagingBcastVars() {
-		return this.bcVarsMessaging;
-	}
-
-	/**
-	 * Gets whether the degrees option is set.
-	 * By default, the degrees option is not set.
-	 *
-	 * @return True, if the degree option is set, false otherwise.
-	 */
-	public boolean isOptDegrees() {
-		return optDegrees;
-	}
-
-	/**
-	 * Sets the degree option.
-	 * By default, the degrees option is not set.
-	 *
-	 * @param optDegrees True, to set this option, false otherwise.
-	 */
-	public void setOptDegrees(boolean optDegrees) {
-		this.optDegrees = optDegrees;
-	}
-
-	/**
-	 * Gets the direction in which messages are sent in the MessagingFunction.
-	 * By default the messaging direction is OUT.
-	 *
-	 * @return an EdgeDirection, which can be either IN, OUT or ALL.
-	 */
-	public EdgeDirection getDirection() {
-		return direction;
-	}
-
-	/**
-	 * Sets the direction in which messages are sent in the MessagingFunction.
-	 * By default the messaging direction is OUT.
-	 *
-	 * @param direction - IN, OUT or ALL
-	 */
-	public void setDirection(EdgeDirection direction) {
-		this.direction = direction;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java
deleted file mode 100644
index b3a470e..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java
+++ /dev/null
@@ -1,686 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.spargel;
-
-import java.util.Iterator;
-import java.util.Map;
-
-import org.apache.flink.api.common.aggregators.Aggregator;
-import org.apache.flink.api.common.functions.FlatJoinFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.operators.DeltaIteration;
-import org.apache.flink.api.common.functions.RichCoGroupFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.operators.CoGroupOperator;
-import org.apache.flink.api.java.operators.CustomUnaryOperation;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.EdgeDirection;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.util.Collector;
-
-import com.google.common.base.Preconditions;
-
-/**
- * This class represents iterative graph computations, programmed in a vertex-centric perspective.
- * It is a special case of <i>Bulk Synchronous Parallel<i> computation. The paradigm has also been
- * implemented by Google's <i>Pregel</i> system and by <i>Apache Giraph</i>.
- * <p>
- * Vertex centric algorithms operate on graphs, which are defined through vertices and edges. The 
- * algorithms send messages along the edges and update the state of vertices based on
- * the old state and the incoming messages. All vertices have an initial state.
- * The computation terminates once no vertex updates it state any more.
- * Additionally, a maximum number of iterations (supersteps) may be specified.
- * <p>
- * The computation is here represented by two functions:
- * <ul>
- *   <li>The {@link VertexUpdateFunction} receives incoming messages and may updates the state for
- *   the vertex. If a state is updated, messages are sent from this vertex. Initially, all vertices are
- *   considered updated.</li>
- *   <li>The {@link MessagingFunction} takes the new vertex state and sends messages along the outgoing
- *   edges of the vertex. The outgoing edges may optionally have an associated value, such as a weight.</li>
- * </ul>
- * <p>
- *
- * Vertex-centric graph iterations are are run by calling
- * {@link Graph#runVertexCentricIteration(VertexUpdateFunction, MessagingFunction, int)}.
- *
- * @param <K> The type of the vertex key (the vertex identifier).
- * @param <VV> The type of the vertex value (the state of the vertex).
- * @param <Message> The type of the message sent between vertices along the edges.
- * @param <EV> The type of the values that are associated with the edges.
- */
-public class VertexCentricIteration<K, VV, Message, EV> 
-	implements CustomUnaryOperation<Vertex<K, VV>, Vertex<K, VV>>
-{
-	private final VertexUpdateFunction<K, VV, Message> updateFunction;
-
-	private final MessagingFunction<K, VV, Message, EV> messagingFunction;
-	
-	private final DataSet<Edge<K, EV>> edgesWithValue;
-	
-	private final int maximumNumberOfIterations;
-	
-	private final TypeInformation<Message> messageType;
-	
-	private DataSet<Vertex<K, VV>> initialVertices;
-
-	private VertexCentricConfiguration configuration;
-
-	// ----------------------------------------------------------------------------------
-	
-	private VertexCentricIteration(VertexUpdateFunction<K, VV, Message> uf,
-			MessagingFunction<K, VV, Message, EV> mf,
-			DataSet<Edge<K, EV>> edgesWithValue, 
-			int maximumNumberOfIterations)
-	{
-		Preconditions.checkNotNull(uf);
-		Preconditions.checkNotNull(mf);
-		Preconditions.checkNotNull(edgesWithValue);
-		Preconditions.checkArgument(maximumNumberOfIterations > 0, "The maximum number of iterations must be at least one.");
-
-		this.updateFunction = uf;
-		this.messagingFunction = mf;
-		this.edgesWithValue = edgesWithValue;
-		this.maximumNumberOfIterations = maximumNumberOfIterations;		
-		this.messageType = getMessageType(mf);
-	}
-	
-	private TypeInformation<Message> getMessageType(MessagingFunction<K, VV, Message, EV> mf) {
-		return TypeExtractor.createTypeInfo(MessagingFunction.class, mf.getClass(), 2, null, null);
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	//  Custom Operator behavior
-	// --------------------------------------------------------------------------------------------
-	
-	/**
-	 * Sets the input data set for this operator. In the case of this operator this input data set represents
-	 * the set of vertices with their initial state.
-	 * 
-	 * @param inputData The input data set, which in the case of this operator represents the set of
-	 *                  vertices with their initial state.
-	 * 
-	 * @see org.apache.flink.api.java.operators.CustomUnaryOperation#setInput(org.apache.flink.api.java.DataSet)
-	 */
-	@Override
-	public void setInput(DataSet<Vertex<K, VV>> inputData) {
-		this.initialVertices = inputData;
-	}
-	
-	/**
-	 * Creates the operator that represents this vertex-centric graph computation.
-	 * 
-	 * @return The operator that represents this vertex-centric graph computation.
-	 */
-	@Override
-	public DataSet<Vertex<K, VV>> createResult() {
-		if (this.initialVertices == null) {
-			throw new IllegalStateException("The input data set has not been set.");
-		}
-
-		// prepare some type information
-		TypeInformation<K> keyType = ((TupleTypeInfo<?>) initialVertices.getType()).getTypeAt(0);
-		TypeInformation<Tuple2<K, Message>> messageTypeInfo = new TupleTypeInfo<Tuple2<K,Message>>(keyType, messageType);
-
-		// create a graph
-		Graph<K, VV, EV> graph =
-				Graph.fromDataSet(initialVertices, edgesWithValue, initialVertices.getExecutionEnvironment());
-
-		// check whether the numVertices option is set and, if so, compute the total number of vertices
-		// and set it within the messaging and update functions
-
-		if (this.configuration != null && this.configuration.isOptNumVertices()) {
-			try {
-				long numberOfVertices = graph.numberOfVertices();
-				messagingFunction.setNumberOfVertices(numberOfVertices);
-				updateFunction.setNumberOfVertices(numberOfVertices);
-			} catch (Exception e) {
-				e.printStackTrace();
-			}
-		}
-
-		if(this.configuration != null) {
-			messagingFunction.setDirection(this.configuration.getDirection());
-		} else {
-			messagingFunction.setDirection(EdgeDirection.OUT);
-		}
-
-		// retrieve the direction in which the updates are made and in which the messages are sent
-		EdgeDirection messagingDirection = messagingFunction.getDirection();
-
-		// check whether the degrees option is set and, if so, compute the in and the out degrees and
-		// add them to the vertex value
-		if(this.configuration != null && this.configuration.isOptDegrees()) {
-			return createResultVerticesWithDegrees(graph, messagingDirection, messageTypeInfo);
-		} else {
-			return createResultSimpleVertex(messagingDirection, messageTypeInfo);
-		}
-	}
-
-	/**
-	 * Creates a new vertex-centric iteration operator for graphs where the edges are associated with a value (such as
-	 * a weight or distance).
-	 * 
-	 * @param edgesWithValue The data set containing edges.
-	 * @param uf The function that updates the state of the vertices from the incoming messages.
-	 * @param mf The function that turns changed vertex states into messages along the edges.
-	 * 
-	 * @param <K> The type of the vertex key (the vertex identifier).
-	 * @param <VV> The type of the vertex value (the state of the vertex).
-	 * @param <Message> The type of the message sent between vertices along the edges.
-	 * @param <EV> The type of the values that are associated with the edges.
-	 * 
-	 * @return An in stance of the vertex-centric graph computation operator.
-	 */
-	public static final <K, VV, Message, EV>
-			VertexCentricIteration<K, VV, Message, EV> withEdges(
-					DataSet<Edge<K, EV>> edgesWithValue,
-					VertexUpdateFunction<K, VV, Message> uf,
-					MessagingFunction<K, VV, Message, EV> mf,
-					int maximumNumberOfIterations)
-	{
-		return new VertexCentricIteration<K, VV, Message, EV>(uf, mf, edgesWithValue, maximumNumberOfIterations);
-	}
-
-	/**
-	 * Configures this vertex-centric iteration with the provided parameters.
-	 *
-	 * @param parameters the configuration parameters
-	 */
-	public void configure(VertexCentricConfiguration parameters) {
-		this.configuration = parameters;
-	}
-
-	/**
-	 * @return the configuration parameters of this vertex-centric iteration
-	 */
-	public VertexCentricConfiguration getIterationConfiguration() {
-		return this.configuration;
-	}
-
-	// --------------------------------------------------------------------------------------------
-	//  Wrapping UDFs
-	// --------------------------------------------------------------------------------------------
-
-	private static abstract class VertexUpdateUdf<K, VVWithDegrees, Message> extends RichCoGroupFunction<
-		Tuple2<K, Message>, Vertex<K, VVWithDegrees>, Vertex<K, VVWithDegrees>>
-		implements ResultTypeQueryable<Vertex<K, VVWithDegrees>>
-	{
-		private static final long serialVersionUID = 1L;
-		
-		final VertexUpdateFunction<K, VVWithDegrees, Message> vertexUpdateFunction;
-
-		final MessageIterator<Message> messageIter = new MessageIterator<Message>();
-		
-		private transient TypeInformation<Vertex<K, VVWithDegrees>> resultType;
-		
-		
-		private VertexUpdateUdf(VertexUpdateFunction<K, VVWithDegrees, Message> vertexUpdateFunction,
-				TypeInformation<Vertex<K, VVWithDegrees>> resultType)
-		{
-			this.vertexUpdateFunction = vertexUpdateFunction;
-			this.resultType = resultType;
-		}
-
-		@Override
-		public void open(Configuration parameters) throws Exception {
-			if (getIterationRuntimeContext().getSuperstepNumber() == 1) {
-				this.vertexUpdateFunction.init(getIterationRuntimeContext());
-			}
-			this.vertexUpdateFunction.preSuperstep();
-		}
-		
-		@Override
-		public void close() throws Exception {
-			this.vertexUpdateFunction.postSuperstep();
-		}
-
-		@Override
-		public TypeInformation<Vertex<K, VVWithDegrees>> getProducedType() {
-			return this.resultType;
-		}
-	}
-
-	@SuppressWarnings("serial")
-	private static final class VertexUpdateUdfSimpleVV<K, VV, Message> extends VertexUpdateUdf<K, VV, Message> {
-
-		private VertexUpdateUdfSimpleVV(VertexUpdateFunction<K, VV, Message> vertexUpdateFunction, TypeInformation<Vertex<K, VV>> resultType) {
-			super(vertexUpdateFunction, resultType);
-		}
-
-		@Override
-		public void coGroup(Iterable<Tuple2<K, Message>> messages,
-							Iterable<Vertex<K, VV>> vertex,
-							Collector<Vertex<K, VV>> out) throws Exception {
-			final Iterator<Vertex<K, VV>> vertexIter = vertex.iterator();
-
-			if (vertexIter.hasNext()) {
-				Vertex<K, VV> vertexState = vertexIter.next();
-
-				@SuppressWarnings("unchecked")
-				Iterator<Tuple2<?, Message>> downcastIter = (Iterator<Tuple2<?, Message>>) (Iterator<?>) messages.iterator();
-				messageIter.setSource(downcastIter);
-
-				vertexUpdateFunction.setOutput(vertexState, out);
-				vertexUpdateFunction.updateVertex(vertexState, messageIter);
-			}
-			else {
-				final Iterator<Tuple2<K, Message>> messageIter = messages.iterator();
-				if (messageIter.hasNext()) {
-					String message = "Target vertex does not exist!.";
-					try {
-						Tuple2<K, Message> next = messageIter.next();
-						message = "Target vertex '" + next.f0 + "' does not exist!.";
-					} catch (Throwable t) {}
-					throw new Exception(message);
-				} else {
-					throw new Exception();
-				}
-			}
-		}
-	}
-
-	@SuppressWarnings("serial")
-	private static final class VertexUpdateUdfVVWithDegrees<K, VV, Message> extends VertexUpdateUdf<K, Tuple3<VV, Long, Long>, Message> {
-
-		private VertexUpdateUdfVVWithDegrees(VertexUpdateFunction<K, Tuple3<VV, Long, Long>, Message> vertexUpdateFunction,
-				TypeInformation<Vertex<K, Tuple3<VV, Long, Long>>> resultType) {
-			super(vertexUpdateFunction, resultType);
-		}
-		
-		@Override
-		public void coGroup(Iterable<Tuple2<K, Message>> messages, Iterable<Vertex<K, Tuple3<VV, Long, Long>>> vertex,
-							Collector<Vertex<K, Tuple3<VV, Long, Long>>> out) throws Exception {
-
-			final Iterator<Vertex<K, Tuple3<VV, Long, Long>>> vertexIter = vertex.iterator();
-		
-			if (vertexIter.hasNext()) {
-				Vertex<K, Tuple3<VV, Long, Long>> vertexWithDegrees = vertexIter.next();
-		
-				@SuppressWarnings("unchecked")
-				Iterator<Tuple2<?, Message>> downcastIter = (Iterator<Tuple2<?, Message>>) (Iterator<?>) messages.iterator();
-				messageIter.setSource(downcastIter);
-
-				vertexUpdateFunction.setInDegree(vertexWithDegrees.f1.f1);
-				vertexUpdateFunction.setOutDegree(vertexWithDegrees.f1.f2);
-
-				vertexUpdateFunction.setOutputWithDegrees(vertexWithDegrees, out);
-				vertexUpdateFunction.updateVertexFromVertexCentricIteration(vertexWithDegrees, messageIter);
-			}
-			else {
-				final Iterator<Tuple2<K, Message>> messageIter = messages.iterator();
-				if (messageIter.hasNext()) {
-					String message = "Target vertex does not exist!.";
-					try {
-						Tuple2<K, Message> next = messageIter.next();
-						message = "Target vertex '" + next.f0 + "' does not exist!.";
-					} catch (Throwable t) {}
-					throw new Exception(message);
-				} else {
-					throw new Exception();
-				}
-			}
-		}
-	}
-
-	/*
-	 * UDF that encapsulates the message sending function for graphs where the edges have an associated value.
-	 */
-	private static abstract class MessagingUdfWithEdgeValues<K, VVWithDegrees, VV, Message, EV>
-		extends RichCoGroupFunction<Edge<K, EV>, Vertex<K, VVWithDegrees>, Tuple2<K, Message>>
-		implements ResultTypeQueryable<Tuple2<K, Message>>
-	{
-		private static final long serialVersionUID = 1L;
-		
-		final MessagingFunction<K, VV, Message, EV> messagingFunction;
-		
-		private transient TypeInformation<Tuple2<K, Message>> resultType;
-	
-	
-		private MessagingUdfWithEdgeValues(MessagingFunction<K, VV, Message, EV> messagingFunction,
-				TypeInformation<Tuple2<K, Message>> resultType)
-		{
-			this.messagingFunction = messagingFunction;
-			this.resultType = resultType;
-		}
-		
-		@Override
-		public void open(Configuration parameters) throws Exception {
-			if (getIterationRuntimeContext().getSuperstepNumber() == 1) {
-				this.messagingFunction.init(getIterationRuntimeContext());
-			}
-			
-			this.messagingFunction.preSuperstep();
-		}
-		
-		@Override
-		public void close() throws Exception {
-			this.messagingFunction.postSuperstep();
-		}
-		
-		@Override
-		public TypeInformation<Tuple2<K, Message>> getProducedType() {
-			return this.resultType;
-		}
-	}
-
-	@SuppressWarnings("serial")
-	private static final class MessagingUdfWithEVsSimpleVV<K, VV, Message, EV>
-		extends MessagingUdfWithEdgeValues<K, VV, VV, Message, EV> {
-
-		private MessagingUdfWithEVsSimpleVV(MessagingFunction<K, VV, Message, EV> messagingFunction,
-			TypeInformation<Tuple2<K, Message>> resultType) {
-			super(messagingFunction, resultType);
-		}
-
-		@Override
-		public void coGroup(Iterable<Edge<K, EV>> edges,
-							Iterable<Vertex<K, VV>> state,
-							Collector<Tuple2<K, Message>> out) throws Exception {
-			final Iterator<Vertex<K, VV>> stateIter = state.iterator();
-		
-			if (stateIter.hasNext()) {
-				Vertex<K, VV> newVertexState = stateIter.next();
-				messagingFunction.set((Iterator<?>) edges.iterator(), out);
-				messagingFunction.sendMessages(newVertexState);
-			}
-		}
-	}
-
-	@SuppressWarnings("serial")
-	private static final class MessagingUdfWithEVsVVWithDegrees<K, VV, Message, EV>
-		extends MessagingUdfWithEdgeValues<K, Tuple3<VV, Long, Long>, VV, Message, EV> {
-
-		private Vertex<K, VV> nextVertex = new Vertex<K, VV>();
-
-		private MessagingUdfWithEVsVVWithDegrees(MessagingFunction<K, VV, Message, EV> messagingFunction,
-				TypeInformation<Tuple2<K, Message>> resultType) {
-			super(messagingFunction, resultType);
-		}
-
-		@Override
-		public void coGroup(Iterable<Edge<K, EV>> edges, Iterable<Vertex<K, Tuple3<VV, Long, Long>>> state,
-				Collector<Tuple2<K, Message>> out) throws Exception {
-
-			final Iterator<Vertex<K, Tuple3<VV, Long, Long>>> stateIter = state.iterator();
-		
-			if (stateIter.hasNext()) {
-				Vertex<K, Tuple3<VV, Long, Long>> vertexWithDegrees = stateIter.next();
-
-				nextVertex.setField(vertexWithDegrees.f0, 0);
-				nextVertex.setField(vertexWithDegrees.f1.f0, 1);
-
-				messagingFunction.setInDegree(vertexWithDegrees.f1.f1);
-				messagingFunction.setOutDegree(vertexWithDegrees.f1.f2);
-
-				messagingFunction.set((Iterator<?>) edges.iterator(), out);
-				messagingFunction.sendMessages(nextVertex);
-			}
-		}
-	}
-
-
-	// --------------------------------------------------------------------------------------------
-	//  UTIL methods
-	// --------------------------------------------------------------------------------------------
-
-	/**
-	 * Method that builds the messaging function using a coGroup operator for a simple vertex(without
-	 * degrees).
-	 * It afterwards configures the function with a custom name and broadcast variables.
-	 *
-	 * @param iteration
-	 * @param messageTypeInfo
-	 * @param whereArg the argument for the where within the coGroup
-	 * @param equalToArg the argument for the equalTo within the coGroup
-	 * @return the messaging function
-	 */
-	private CoGroupOperator<?, ?, Tuple2<K, Message>> buildMessagingFunction(
-			DeltaIteration<Vertex<K, VV>, Vertex<K, VV>> iteration,
-			TypeInformation<Tuple2<K, Message>> messageTypeInfo, int whereArg, int equalToArg) {
-
-		// build the messaging function (co group)
-		CoGroupOperator<?, ?, Tuple2<K, Message>> messages;
-		MessagingUdfWithEdgeValues<K, VV, VV, Message, EV> messenger =
-				new MessagingUdfWithEVsSimpleVV<K, VV, Message, EV>(messagingFunction, messageTypeInfo);
-
-		messages = this.edgesWithValue.coGroup(iteration.getWorkset()).where(whereArg)
-				.equalTo(equalToArg).with(messenger);
-
-		// configure coGroup message function with name and broadcast variables
-		messages = messages.name("Messaging");
-		if(this.configuration != null) {
-			for (Tuple2<String, DataSet<?>> e : this.configuration.getMessagingBcastVars()) {
-				messages = messages.withBroadcastSet(e.f1, e.f0);
-			}
-		}
-
-		return messages;
-	}
-
-	/**
-	 * Method that builds the messaging function using a coGroup operator for a vertex
-	 * containing degree information.
-	 * It afterwards configures the function with a custom name and broadcast variables.
-	 *
-	 * @param iteration
-	 * @param messageTypeInfo
-	 * @param whereArg the argument for the where within the coGroup
-	 * @param equalToArg the argument for the equalTo within the coGroup
-	 * @return the messaging function
-	 */
-	private CoGroupOperator<?, ?, Tuple2<K, Message>> buildMessagingFunctionVerticesWithDegrees(
-			DeltaIteration<Vertex<K, Tuple3<VV, Long, Long>>, Vertex<K, Tuple3<VV, Long, Long>>> iteration,
-			TypeInformation<Tuple2<K, Message>> messageTypeInfo, int whereArg, int equalToArg) {
-
-		// build the messaging function (co group)
-		CoGroupOperator<?, ?, Tuple2<K, Message>> messages;
-		MessagingUdfWithEdgeValues<K, Tuple3<VV, Long, Long>, VV, Message, EV> messenger =
-				new MessagingUdfWithEVsVVWithDegrees<K, VV, Message, EV>(messagingFunction, messageTypeInfo);
-
-		messages = this.edgesWithValue.coGroup(iteration.getWorkset()).where(whereArg)
-				.equalTo(equalToArg).with(messenger);
-
-		// configure coGroup message function with name and broadcast variables
-		messages = messages.name("Messaging");
-
-		if (this.configuration != null) {
-			for (Tuple2<String, DataSet<?>> e : this.configuration.getMessagingBcastVars()) {
-				messages = messages.withBroadcastSet(e.f1, e.f0);
-			}
-		}
-
-		return messages;
-	}
-
-	/**
-	 * Helper method which sets up an iteration with the given vertex value(either simple or with degrees)
-	 *
-	 * @param iteration
-	 */
-
-	private void setUpIteration(DeltaIteration<?, ?> iteration) {
-
-		// set up the iteration operator
-		if (this.configuration != null) {
-
-			iteration.name(this.configuration.getName("Vertex-centric iteration (" + updateFunction + " | " + messagingFunction + ")"));
-			iteration.parallelism(this.configuration.getParallelism());
-			iteration.setSolutionSetUnManaged(this.configuration.isSolutionSetUnmanagedMemory());
-
-			// register all aggregators
-			for (Map.Entry<String, Aggregator<?>> entry : this.configuration.getAggregators().entrySet()) {
-				iteration.registerAggregator(entry.getKey(), entry.getValue());
-			}
-		}
-		else {
-			// no configuration provided; set default name
-			iteration.name("Vertex-centric iteration (" + updateFunction + " | " + messagingFunction + ")");
-		}
-	}
-
-	/**
-	 * Creates the operator that represents this vertex centric graph computation for a simple vertex.
-	 *
-	 * @param messagingDirection
-	 * @param messageTypeInfo
-	 * @return the operator
-	 */
-	private DataSet<Vertex<K, VV>> createResultSimpleVertex(EdgeDirection messagingDirection,
-		TypeInformation<Tuple2<K, Message>> messageTypeInfo) {
-
-		DataSet<Tuple2<K, Message>> messages;
-
-		TypeInformation<Vertex<K, VV>> vertexTypes = initialVertices.getType();
-
-		final DeltaIteration<Vertex<K, VV>,	Vertex<K, VV>> iteration =
-				initialVertices.iterateDelta(initialVertices, this.maximumNumberOfIterations, 0);
-				setUpIteration(iteration);
-
-		switch (messagingDirection) {
-			case IN:
-				messages = buildMessagingFunction(iteration, messageTypeInfo, 1, 0);
-				break;
-			case OUT:
-				messages = buildMessagingFunction(iteration, messageTypeInfo, 0, 0);
-				break;
-			case ALL:
-				messages = buildMessagingFunction(iteration, messageTypeInfo, 1, 0)
-						.union(buildMessagingFunction(iteration, messageTypeInfo, 0, 0)) ;
-				break;
-			default:
-				throw new IllegalArgumentException("Illegal edge direction");
-		}
-
-		VertexUpdateUdf<K, VV, Message> updateUdf =
-				new VertexUpdateUdfSimpleVV<K, VV, Message>(updateFunction, vertexTypes);
-
-		// build the update function (co group)
-		CoGroupOperator<?, ?, Vertex<K, VV>> updates =
-				messages.coGroup(iteration.getSolutionSet()).where(0).equalTo(0).with(updateUdf);
-
-		configureUpdateFunction(updates);
-
-		return iteration.closeWith(updates, updates);
-	}
-
-	/**
-	 * Creates the operator that represents this vertex centric graph computation for a vertex with in
-	 * and out degrees added to the vertex value.
-	 *
-	 * @param graph
-	 * @param messagingDirection
-	 * @param messageTypeInfo
-	 * @return the operator
-	 */
-	@SuppressWarnings("serial")
-	private DataSet<Vertex<K, VV>> createResultVerticesWithDegrees(Graph<K, VV, EV> graph, EdgeDirection messagingDirection,
-			TypeInformation<Tuple2<K, Message>> messageTypeInfo) {
-
-		DataSet<Tuple2<K, Message>> messages;
-
-		this.updateFunction.setOptDegrees(this.configuration.isOptDegrees());
-
-		DataSet<Tuple2<K, Long>> inDegrees = graph.inDegrees();
-		DataSet<Tuple2<K, Long>> outDegrees = graph.outDegrees();
-
-		DataSet<Tuple3<K, Long, Long>> degrees = inDegrees.join(outDegrees).where(0).equalTo(0)
-				.with(new FlatJoinFunction<Tuple2<K, Long>, Tuple2<K, Long>, Tuple3<K, Long, Long>>() {
-
-					@Override
-					public void join(Tuple2<K, Long> first, Tuple2<K, Long> second,	Collector<Tuple3<K, Long, Long>> out) {
-						out.collect(new Tuple3<K, Long, Long>(first.f0, first.f1, second.f1));
-					}
-				}).withForwardedFieldsFirst("f0;f1").withForwardedFieldsSecond("f1");
-
-		DataSet<Vertex<K, Tuple3<VV, Long, Long>>> verticesWithDegrees = initialVertices
-				.join(degrees).where(0).equalTo(0)
-				.with(new FlatJoinFunction<Vertex<K,VV>, Tuple3<K,Long,Long>, Vertex<K, Tuple3<VV, Long, Long>>>() {
-					@Override
-					public void join(Vertex<K, VV> vertex, Tuple3<K, Long, Long> degrees,
-									Collector<Vertex<K, Tuple3<VV, Long, Long>>> out) throws Exception {
-
-						out.collect(new Vertex<K, Tuple3<VV, Long, Long>>(vertex.getId(),
-								new Tuple3<VV, Long, Long>(vertex.getValue(), degrees.f1, degrees.f2)));
-					}
-				}).withForwardedFieldsFirst("f0");
-
-		// add type info
-		TypeInformation<Vertex<K, Tuple3<VV, Long, Long>>> vertexTypes = verticesWithDegrees.getType();
-
-		final DeltaIteration<Vertex<K, Tuple3<VV, Long, Long>>,	Vertex<K, Tuple3<VV, Long, Long>>> iteration =
-				verticesWithDegrees.iterateDelta(verticesWithDegrees, this.maximumNumberOfIterations, 0);
-				setUpIteration(iteration);
-
-		switch (messagingDirection) {
-			case IN:
-				messages = buildMessagingFunctionVerticesWithDegrees(iteration, messageTypeInfo, 1, 0);
-				break;
-			case OUT:
-				messages = buildMessagingFunctionVerticesWithDegrees(iteration, messageTypeInfo, 0, 0);
-				break;
-			case ALL:
-				messages = buildMessagingFunctionVerticesWithDegrees(iteration, messageTypeInfo, 1, 0)
-						.union(buildMessagingFunctionVerticesWithDegrees(iteration, messageTypeInfo, 0, 0)) ;
-				break;
-			default:
-				throw new IllegalArgumentException("Illegal edge direction");
-		}
-
-		@SuppressWarnings({ "unchecked", "rawtypes" })
-		VertexUpdateUdf<K, Tuple3<VV, Long, Long>, Message> updateUdf =
-				new VertexUpdateUdfVVWithDegrees(updateFunction, vertexTypes);
-
-		// build the update function (co group)
-		CoGroupOperator<?, ?, Vertex<K, Tuple3<VV, Long, Long>>> updates =
-				messages.coGroup(iteration.getSolutionSet()).where(0).equalTo(0).with(updateUdf);
-
-		configureUpdateFunction(updates);
-
-		return iteration.closeWith(updates, updates).map(
-				new MapFunction<Vertex<K, Tuple3<VV, Long, Long>>, Vertex<K, VV>>() {
-
-					public Vertex<K, VV> map(Vertex<K, Tuple3<VV, Long, Long>> vertex) {
-						return new Vertex<K, VV>(vertex.getId(), vertex.getValue().f0);
-					}
-				});
-	}
-
-	private <VVWithDegree> void configureUpdateFunction(CoGroupOperator<?, ?, Vertex<K, VVWithDegree>> updates) {
-
-		// configure coGroup update function with name and broadcast variables
-		updates = updates.name("Vertex State Updates");
-		if (this.configuration != null) {
-			for (Tuple2<String, DataSet<?>> e : this.configuration.getUpdateBcastVars()) {
-				updates = updates.withBroadcastSet(e.f1, e.f0);
-			}
-		}
-
-		// let the operator know that we preserve the key field
-		updates.withForwardedFieldsFirst("0").withForwardedFieldsSecond("0");
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java
deleted file mode 100644
index 248925b..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java
+++ /dev/null
@@ -1,253 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.spargel;
-
-import java.io.Serializable;
-import java.util.Collection;
-
-import org.apache.flink.api.common.aggregators.Aggregator;
-import org.apache.flink.api.common.functions.IterationRuntimeContext;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.types.Value;
-import org.apache.flink.util.Collector;
-
-/**
- * This class must be extended by functions that compute the state of the vertex depending on the old state and the
- * incoming messages. The central method is {@link #updateVertex(Comparable, Object, MessageIterator)}, which is
- * invoked once per vertex per superstep.
- * 
- * <K> The vertex key type.
- * <VV> The vertex value type.
- * <Message> The message type.
- */
-public abstract class VertexUpdateFunction<K, VV, Message> implements Serializable {
-
-	private static final long serialVersionUID = 1L;
-
-	// --------------------------------------------------------------------------------------------
-	//  Attributes that allow vertices to access their in/out degrees and the total number of vertices
-	//  inside an iteration.
-	// --------------------------------------------------------------------------------------------
-
-	private long numberOfVertices = -1L;
-
-	/**
-	 * Retrieves the number of vertices in the graph.
-	 * @return the number of vertices if the {@link IterationConfiguration#setOptNumVertices(boolean)}
-	 * option has been set; -1 otherwise.
-	 */
-	public long getNumberOfVertices() {
-		return numberOfVertices;
-	}
-
-	void setNumberOfVertices(long numberOfVertices) {
-		this.numberOfVertices = numberOfVertices;
-	}
-
-	//---------------------------------------------------------------------------------------------
-
-	private boolean optDegrees;
-
-	boolean isOptDegrees() {
-		return optDegrees;
-	}
-
-	void setOptDegrees(boolean optDegrees) {
-		this.optDegrees = optDegrees;
-	}
-
-	// --------------------------------------------------------------------------------------------
-	//  Public API Methods
-	// --------------------------------------------------------------------------------------------
-	
-	/**
-	 * This method is invoked once per vertex per superstep. It receives the current state of the vertex, as well as
-	 * the incoming messages. It may set a new vertex state via {@link #setNewVertexValue(Object)}. If the vertex
-	 * state is changed, it will trigger the sending of messages via the {@link MessagingFunction}.
-	 * 
-	 * @param vertex The vertex.
-	 * @param inMessages The incoming messages to this vertex.
-	 * 
-	 * @throws Exception The computation may throw exceptions, which causes the superstep to fail.
-	 */
-	public abstract void updateVertex(Vertex<K, VV> vertex, MessageIterator<Message> inMessages) throws Exception;
-	
-	/**
-	 * This method is executed one per superstep before the vertex update function is invoked for each vertex.
-	 * 
-	 * @throws Exception Exceptions in the pre-superstep phase cause the superstep to fail.
-	 */
-	public void preSuperstep() throws Exception {}
-	
-	/**
-	 * This method is executed one per superstep after the vertex update function has been invoked for each vertex.
-	 * 
-	 * @throws Exception Exceptions in the post-superstep phase cause the superstep to fail.
-	 */
-	public void postSuperstep() throws Exception {}
-	
-	/**
-	 * Sets the new value of this vertex. Setting a new value triggers the sending of outgoing messages from this vertex.
-	 *
-	 * This should be called at most once per updateVertex.
-	 * 
-	 * @param newValue The new vertex value.
-	 */
-	public void setNewVertexValue(VV newValue) {
-		if(setNewVertexValueCalled) {
-			throw new IllegalStateException("setNewVertexValue should only be called at most once per updateVertex");
-		}
-		setNewVertexValueCalled = true;
-		if(isOptDegrees()) {
-			outValWithDegrees.f1.f0 = newValue;
-			outWithDegrees.collect(outValWithDegrees);
-		} else {
-			outVal.setValue(newValue);
-			out.collect(outVal);
-		}
-	}
-	
-	/**
-	 * Gets the number of the superstep, starting at <tt>1</tt>.
-	 * 
-	 * @return The number of the current superstep.
-	 */
-	public int getSuperstepNumber() {
-		return this.runtimeContext.getSuperstepNumber();
-	}
-	
-	/**
-	 * Gets the iteration aggregator registered under the given name. The iteration aggregator combines
-	 * all aggregates globally once per superstep and makes them available in the next superstep.
-	 * 
-	 * @param name The name of the aggregator.
-	 * @return The aggregator registered under this name, or null, if no aggregator was registered.
-	 */
-	public <T extends Aggregator<?>> T getIterationAggregator(String name) {
-		return this.runtimeContext.<T>getIterationAggregator(name);
-	}
-	
-	/**
-	 * Get the aggregated value that an aggregator computed in the previous iteration.
-	 * 
-	 * @param name The name of the aggregator.
-	 * @return The aggregated value of the previous iteration.
-	 */
-	public <T extends Value> T getPreviousIterationAggregate(String name) {
-		return this.runtimeContext.<T>getPreviousIterationAggregate(name);
-	}
-	
-	/**
-	 * Gets the broadcast data set registered under the given name. Broadcast data sets
-	 * are available on all parallel instances of a function. They can be registered via
-	 * {@link org.apache.flink.graph.spargel.VertexCentricConfiguration#addBroadcastSetForUpdateFunction(String, org.apache.flink.api.java.DataSet)}.
-	 * 
-	 * @param name The name under which the broadcast set is registered.
-	 * @return The broadcast data set.
-	 */
-	public <T> Collection<T> getBroadcastSet(String name) {
-		return this.runtimeContext.<T>getBroadcastVariable(name);
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	//  internal methods
-	// --------------------------------------------------------------------------------------------
-	
-	private IterationRuntimeContext runtimeContext;
-
-	private Collector<Vertex<K, VV>> out;
-	
-	private Collector<Vertex<K, Tuple3<VV, Long, Long>>> outWithDegrees;
-
-	private Vertex<K, VV> outVal;
-
-	private Vertex<K, Tuple3<VV, Long, Long>> outValWithDegrees;
-
-	private long inDegree = -1;
-
-	private long outDegree = -1;
-
-	private boolean setNewVertexValueCalled;
-
-	void init(IterationRuntimeContext context) {
-		this.runtimeContext = context;
-	}
-
-	void setOutput(Vertex<K, VV> outVal, Collector<Vertex<K, VV>> out) {
-		this.outVal = outVal;
-		this.out = out;
-		setNewVertexValueCalled = false;
-	}
-
-	@SuppressWarnings({ "unchecked", "rawtypes" })
-	<ValueWithDegree> void setOutputWithDegrees(Vertex<K, ValueWithDegree> outVal,
-			Collector out) {
-		this.outValWithDegrees = (Vertex<K, Tuple3<VV, Long, Long>>) outVal;
-		this.outWithDegrees = out;
-		setNewVertexValueCalled = false;
-	}
-
-	/**
-	 * Retrieves the vertex in-degree (number of in-coming edges).
-	 * @return The in-degree of this vertex if the {@link IterationConfiguration#setOptDegrees(boolean)}
-	 * option has been set; -1 otherwise. 
-	 */
-	public long getInDegree() {
-		return inDegree;
-	}
-
-	void setInDegree(long inDegree) {
-		this.inDegree = inDegree;
-	}
-
-	/**
-	 * Retrieve the vertex out-degree (number of out-going edges).
-	 * @return The out-degree of this vertex if the {@link IterationConfiguration#setOptDegrees(boolean)}
-	 * option has been set; -1 otherwise. 
-	 */
-	public long getOutDegree() {
-		return outDegree;
-	}
-
-	void setOutDegree(long outDegree) {
-		this.outDegree = outDegree;
-	}
-
-	/**
-	 * In order to hide the Tuple3(actualValue, inDegree, OutDegree) vertex value from the user,
-	 * another function will be called from {@link org.apache.flink.graph.spargel.VertexCentricIteration}.
-	 *
-	 * This function will retrieve the vertex from the vertexState and will set its degrees, afterwards calling
-	 * the regular updateVertex function.
-	 *
-	 * @param vertexState
-	 * @param inMessages
-	 * @throws Exception
-	 */
-	@SuppressWarnings("unchecked")
-	<VertexWithDegree> void updateVertexFromVertexCentricIteration(Vertex<K, VertexWithDegree> vertexState,
-												MessageIterator<Message> inMessages) throws Exception {
-
-		Vertex<K, VV> vertex = new Vertex<K, VV>(vertexState.f0,
-				((Tuple3<VV, Long, Long>)vertexState.getValue()).f0);
-
-		updateVertex(vertex, inMessages);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/EdgeToTuple3Map.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/EdgeToTuple3Map.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/EdgeToTuple3Map.java
deleted file mode 100644
index 0e085b4..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/EdgeToTuple3Map.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.utils;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.graph.Edge;
-
-@ForwardedFields("f0; f1; f2")
-public class EdgeToTuple3Map<K, EV> implements MapFunction<Edge<K, EV>, Tuple3<K, K, EV>> {
-
-	private static final long serialVersionUID = 1L;
-
-	public Tuple3<K, K, EV> map(Edge<K, EV> edge) {
-		return new Tuple3<K, K, EV>(edge.f0, edge.f1, edge.f2);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/NullValueEdgeMapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/NullValueEdgeMapper.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/NullValueEdgeMapper.java
deleted file mode 100644
index 2bd4719..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/NullValueEdgeMapper.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.utils;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.types.NullValue;
-
-public class NullValueEdgeMapper<K, EV> implements	MapFunction<Edge<K, EV>, NullValue> {
-
-	private static final long serialVersionUID = 1L;
-
-	public NullValue map(Edge<K, EV> edge) {
-		return NullValue.getInstance();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple2ToVertexMap.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple2ToVertexMap.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple2ToVertexMap.java
deleted file mode 100644
index e51362b..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple2ToVertexMap.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.utils;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
-
-@ForwardedFields("f0; f1")
-public class Tuple2ToVertexMap<K, VV> implements MapFunction<Tuple2<K, VV>, Vertex<K, VV>> {
-
-	private static final long serialVersionUID = 1L;
-
-	public Vertex<K, VV> map(Tuple2<K, VV> vertex) {
-		return new Vertex<K, VV>(vertex.f0, vertex.f1);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple3ToEdgeMap.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple3ToEdgeMap.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple3ToEdgeMap.java
deleted file mode 100644
index 0db9a51..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple3ToEdgeMap.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.utils;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
-
-/**
- * create an Edge DataSetfrom a Tuple3 dataset
- *
- * @param <K>
- * @param <EV>
- */
-@ForwardedFields("f0; f1; f2")
-public class Tuple3ToEdgeMap<K, EV> implements MapFunction<Tuple3<K, K, EV>, Edge<K, EV>> {
-
-	private static final long serialVersionUID = 1L;
-
-	public Edge<K, EV> map(Tuple3<K, K, EV> tuple) {
-		return new Edge<K, EV>(tuple.f0, tuple.f1, tuple.f2);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/VertexToTuple2Map.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/VertexToTuple2Map.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/VertexToTuple2Map.java
deleted file mode 100644
index 04d1f47..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/VertexToTuple2Map.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.utils;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
-
-@ForwardedFields("f0; f1")
-public class VertexToTuple2Map<K, VV> implements MapFunction<Vertex<K, VV>, Tuple2<K, VV>> {
-
-	private static final long serialVersionUID = 1L;
-
-	public Tuple2<K, VV> map(Vertex<K, VV> vertex) {
-		return new Tuple2<K, VV>(vertex.f0, vertex.f1);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/GraphValidator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/GraphValidator.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/GraphValidator.java
deleted file mode 100644
index 75b672c..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/GraphValidator.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.validation;
-
-import java.io.Serializable;
-
-import org.apache.flink.graph.Graph;
-
-/**
- * A utility for defining validation criteria for different types of Graphs.
- * 
- * @param <K> the vertex key type
- * @param <VV> the vertex value type
- * @param <EV> the edge value type
- */
-@SuppressWarnings("serial")
-public abstract class GraphValidator<K, VV, EV>	implements Serializable {
-
-	public abstract boolean validate(Graph<K, VV, EV> graph) throws Exception;
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java
deleted file mode 100644
index 33d469b..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.validation;
-
-import org.apache.flink.api.common.functions.CoGroupFunction;
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.util.Collector;
-
-@SuppressWarnings("serial")
-public class InvalidVertexIdsValidator<K, VV, EV> extends GraphValidator<K, VV, EV> {
-
-	/**
-	 * Checks that the edge set input contains valid vertex Ids, i.e. that they
-	 * also exist in the vertex input set.
-	 * 
-	 * @return a boolean stating whether a graph is valid
-	 *         with respect to its vertex ids.
-	 */
-	@Override
-	public boolean validate(Graph<K, VV, EV> graph) throws Exception {
-		DataSet<Tuple1<K>> edgeIds = graph.getEdges()
-				.flatMap(new MapEdgeIds<K, EV>()).distinct();
-		DataSet<K> invalidIds = graph.getVertices().coGroup(edgeIds).where(0)
-				.equalTo(0).with(new GroupInvalidIds<K, VV>()).first(1);
-
-		return invalidIds.map(new KToTupleMap<K>()).count() == 0;
-	}
-
-	private static final class MapEdgeIds<K, EV> implements FlatMapFunction<Edge<K, EV>, Tuple1<K>> {
-		public void flatMap(Edge<K, EV> edge, Collector<Tuple1<K>> out) {
-			out.collect(new Tuple1<K>(edge.f0));
-			out.collect(new Tuple1<K>(edge.f1));
-		}
-	}
-
-	private static final class GroupInvalidIds<K, VV> implements CoGroupFunction<Vertex<K, VV>, Tuple1<K>, K> {
-		public void coGroup(Iterable<Vertex<K, VV>> vertexId,
-				Iterable<Tuple1<K>> edgeId, Collector<K> out) {
-			if (!(vertexId.iterator().hasNext())) {
-				// found an id that doesn't exist in the vertex set
-				out.collect(edgeId.iterator().next().f0);
-			}
-		}
-	}
-
-	private static final class KToTupleMap<K> implements MapFunction<K, Tuple1<K>> {
-		public Tuple1<K> map(K key) throws Exception {
-			return new Tuple1<K>(key);
-		}
-	}
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/gsa/GSACompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/gsa/GSACompilerTest.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/gsa/GSACompilerTest.java
deleted file mode 100644
index 2ad203f..0000000
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/gsa/GSACompilerTest.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.gsa;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.DiscardingOutputFormat;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.utils.Tuple3ToEdgeMap;
-import org.apache.flink.optimizer.dataproperties.PartitioningProperty;
-import org.apache.flink.optimizer.plan.DualInputPlanNode;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plan.PlanNode;
-import org.apache.flink.optimizer.plan.SingleInputPlanNode;
-import org.apache.flink.optimizer.plan.SinkPlanNode;
-import org.apache.flink.optimizer.plan.WorksetIterationPlanNode;
-import org.apache.flink.optimizer.util.CompilerTestBase;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.apache.flink.types.NullValue;
-import org.junit.Test;
-
-public class GSACompilerTest extends CompilerTestBase {
-
-	private static final long serialVersionUID = 1L;
-
-	@Test
-	public void testGSACompiler() {
-		try {
-			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			env.setParallelism(DEFAULT_PARALLELISM);
-			// compose test program
-			{
-				DataSet<Edge<Long, NullValue>> edges = env.fromElements(new Tuple3<Long, Long, NullValue>(
-						1L, 2L, NullValue.getInstance())).map(new Tuple3ToEdgeMap<Long, NullValue>());
-
-				Graph<Long, Long, NullValue> graph = Graph.fromDataSet(edges, new InitVertices(), env);
-
-				DataSet<Vertex<Long, Long>> result = graph.runGatherSumApplyIteration(
-						new GatherNeighborIds(), new SelectMinId(),
-						new UpdateComponentId(), 100).getVertices();
-				
-				result.output(new DiscardingOutputFormat<Vertex<Long, Long>>());
-			}
-			
-			Plan p = env.createProgramPlan("GSA Connected Components");
-			OptimizedPlan op = compileNoStats(p);
-			
-			// check the sink
-			SinkPlanNode sink = op.getDataSinks().iterator().next();
-			assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
-			assertEquals(DEFAULT_PARALLELISM, sink.getParallelism());
-			assertEquals(PartitioningProperty.HASH_PARTITIONED, sink.getGlobalProperties().getPartitioning());
-			
-			// check the iteration
-			WorksetIterationPlanNode iteration = (WorksetIterationPlanNode) sink.getInput().getSource();
-			assertEquals(DEFAULT_PARALLELISM, iteration.getParallelism());
-			
-			// check the solution set join and the delta
-			PlanNode ssDelta = iteration.getSolutionSetDeltaPlanNode();
-			assertTrue(ssDelta instanceof DualInputPlanNode); // this is only true if the update function preserves the partitioning
-			
-			DualInputPlanNode ssJoin = (DualInputPlanNode) ssDelta;
-			assertEquals(DEFAULT_PARALLELISM, ssJoin.getParallelism());
-			assertEquals(ShipStrategyType.PARTITION_HASH, ssJoin.getInput1().getShipStrategy());
-			assertEquals(new FieldList(0), ssJoin.getInput1().getShipStrategyKeys());
-			
-			// check the workset set join
-			SingleInputPlanNode sumReducer = (SingleInputPlanNode) ssJoin.getInput1().getSource();
-			SingleInputPlanNode gatherMapper = (SingleInputPlanNode) sumReducer.getInput().getSource();
-			DualInputPlanNode edgeJoin = (DualInputPlanNode) gatherMapper.getInput().getSource(); 
-			assertEquals(DEFAULT_PARALLELISM, edgeJoin.getParallelism());
-			// input1 is the workset
-			assertEquals(ShipStrategyType.FORWARD, edgeJoin.getInput1().getShipStrategy());
-			// input2 is the edges
-			assertEquals(ShipStrategyType.PARTITION_HASH, edgeJoin.getInput2().getShipStrategy());
-			assertTrue(edgeJoin.getInput2().getTempMode().isCached());
-
-			assertEquals(new FieldList(0), edgeJoin.getInput2().getShipStrategyKeys());
-		}
-		catch (Exception e) {
-			System.err.println(e.getMessage());
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	@SuppressWarnings("serial")
-	private static final class InitVertices	implements MapFunction<Long, Long> {
-
-		public Long map(Long vertexId) {
-			return vertexId;
-		}
-	}
-
-	@SuppressWarnings("serial")
-	private static final class GatherNeighborIds extends GatherFunction<Long, NullValue, Long> {
-
-		public Long gather(Neighbor<Long, NullValue> neighbor) {
-			return neighbor.getNeighborValue();
-		}
-	};
-
-	@SuppressWarnings("serial")
-	private static final class SelectMinId extends SumFunction<Long, NullValue, Long> {
-
-		public Long sum(Long newValue, Long currentValue) {
-			return Math.min(newValue, currentValue);
-		}
-	};
-
-	@SuppressWarnings("serial")
-	private static final class UpdateComponentId extends ApplyFunction<Long, Long, Long> {
-
-		public void apply(Long summedValue, Long origValue) {
-			if (summedValue < origValue) {
-				setResult(summedValue);
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/gsa/GSATranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/gsa/GSATranslationTest.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/gsa/GSATranslationTest.java
deleted file mode 100644
index ced7508..0000000
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/gsa/GSATranslationTest.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.gsa;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import org.apache.flink.api.common.aggregators.LongSumAggregator;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.DiscardingOutputFormat;
-import org.apache.flink.api.java.operators.DeltaIteration;
-import org.apache.flink.api.java.operators.DeltaIterationResultSet;
-import org.apache.flink.api.java.operators.SingleInputUdfOperator;
-import org.apache.flink.api.java.operators.TwoInputUdfOperator;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.utils.Tuple3ToEdgeMap;
-import org.apache.flink.types.NullValue;
-import org.junit.Test;
-
-public class GSATranslationTest {
-
-	@Test
-	public void testTranslation() {
-		try {
-			final String ITERATION_NAME = "Test Name";
-			
-			final String AGGREGATOR_NAME = "AggregatorName";
-			
-			final String BC_SET_GATHER_NAME = "gather messages";
-			
-			final String BC_SET_SUM_NAME = "sum updates";
-
-			final String BC_SET_APLLY_NAME = "apply updates";
-
-			final int NUM_ITERATIONS = 13;
-			
-			final int ITERATION_parallelism = 77;
-			
-			
-			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			
-			DataSet<Long> bcGather = env.fromElements(1L);
-			DataSet<Long> bcSum = env.fromElements(1L);
-			DataSet<Long> bcApply = env.fromElements(1L);
-
-			DataSet<Vertex<Long, Long>> result;
-
-			// ------------ construct the test program ------------------
-			{
-
-				DataSet<Edge<Long, NullValue>> edges = env.fromElements(new Tuple3<Long, Long, NullValue>(
-						1L, 2L, NullValue.getInstance())).map(new Tuple3ToEdgeMap<Long, NullValue>());
-
-				Graph<Long, Long, NullValue> graph = Graph.fromDataSet(edges, new InitVertices(), env);
-
-				GSAConfiguration parameters = new GSAConfiguration();
-
-				parameters.registerAggregator(AGGREGATOR_NAME, new LongSumAggregator());
-				parameters.setName(ITERATION_NAME);
-				parameters.setParallelism(ITERATION_parallelism);
-				parameters.addBroadcastSetForGatherFunction(BC_SET_GATHER_NAME, bcGather);
-				parameters.addBroadcastSetForSumFunction(BC_SET_SUM_NAME, bcSum);
-				parameters.addBroadcastSetForApplyFunction(BC_SET_APLLY_NAME, bcApply);
-
-				result = graph.runGatherSumApplyIteration(
-						new GatherNeighborIds(), new SelectMinId(),
-						new UpdateComponentId(), NUM_ITERATIONS, parameters).getVertices();
-				
-				result.output(new DiscardingOutputFormat<Vertex<Long, Long>>());
-			}
-			
-			
-			// ------------- validate the java program ----------------
-			
-			assertTrue(result instanceof DeltaIterationResultSet);
-			
-			DeltaIterationResultSet<?, ?> resultSet = (DeltaIterationResultSet<?, ?>) result;
-			DeltaIteration<?, ?> iteration = (DeltaIteration<?, ?>) resultSet.getIterationHead();
-			
-			// check the basic iteration properties
-			assertEquals(NUM_ITERATIONS, resultSet.getMaxIterations());
-			assertArrayEquals(new int[] {0}, resultSet.getKeyPositions());
-			assertEquals(ITERATION_parallelism, iteration.getParallelism());
-			assertEquals(ITERATION_NAME, iteration.getName());
-			
-			assertEquals(AGGREGATOR_NAME, iteration.getAggregators().getAllRegisteredAggregators().iterator().next().getName());
-			
-			// validate that the semantic properties are set as they should
-			TwoInputUdfOperator<?, ?, ?, ?> solutionSetJoin = (TwoInputUdfOperator<?, ?, ?, ?>) resultSet.getNextWorkset();
-			assertTrue(solutionSetJoin.getSemanticProperties().getForwardingTargetFields(0, 0).contains(0));
-			assertTrue(solutionSetJoin.getSemanticProperties().getForwardingTargetFields(1, 0).contains(0));
-
-			SingleInputUdfOperator<?, ?, ?> sumReduce = (SingleInputUdfOperator<?, ?, ?>) solutionSetJoin.getInput1();
-			SingleInputUdfOperator<?, ?, ?> gatherMap = (SingleInputUdfOperator<?, ?, ?>) sumReduce.getInput();
-
-			// validate that the broadcast sets are forwarded
-			assertEquals(bcGather, gatherMap.getBroadcastSets().get(BC_SET_GATHER_NAME));
-			assertEquals(bcSum, sumReduce.getBroadcastSets().get(BC_SET_SUM_NAME));
-			assertEquals(bcApply, solutionSetJoin.getBroadcastSets().get(BC_SET_APLLY_NAME));
-		}
-		catch (Exception e) {
-			System.err.println(e.getMessage());
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	@SuppressWarnings("serial")
-	private static final class InitVertices	implements MapFunction<Long, Long> {
-
-		public Long map(Long vertexId) {
-			return vertexId;
-		}
-	}
-
-	@SuppressWarnings("serial")
-	private static final class GatherNeighborIds extends GatherFunction<Long, NullValue, Long> {
-
-		public Long gather(Neighbor<Long, NullValue> neighbor) {
-			return neighbor.getNeighborValue();
-		}
-	};
-
-	@SuppressWarnings("serial")
-	private static final class SelectMinId extends SumFunction<Long, NullValue, Long> {
-
-		public Long sum(Long newValue, Long currentValue) {
-			return Math.min(newValue, currentValue);
-		}
-	};
-
-	@SuppressWarnings("serial")
-	private static final class UpdateComponentId extends ApplyFunction<Long, Long, Long> {
-
-		public void apply(Long summedValue, Long origValue) {
-			if (summedValue < origValue) {
-				setResult(summedValue);
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelCompilerTest.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelCompilerTest.java
deleted file mode 100644
index 7a8143a..0000000
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelCompilerTest.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.graph.spargel;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.api.java.io.DiscardingOutputFormat;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.optimizer.util.CompilerTestBase;
-import org.junit.Test;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.optimizer.plan.DualInputPlanNode;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plan.PlanNode;
-import org.apache.flink.optimizer.plan.SinkPlanNode;
-import org.apache.flink.optimizer.plan.WorksetIterationPlanNode;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.apache.flink.runtime.operators.util.LocalStrategy;
-import org.apache.flink.types.NullValue;
-import org.apache.flink.graph.library.ConnectedComponents;
-import org.apache.flink.graph.utils.Tuple2ToVertexMap;
-
-
-public class SpargelCompilerTest extends CompilerTestBase {
-
-	private static final long serialVersionUID = 1L;
-
-	@SuppressWarnings("serial")
-	@Test
-	public void testSpargelCompiler() {
-		try {
-			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			env.setParallelism(DEFAULT_PARALLELISM);
-			// compose test program
-			{
-
-				DataSet<Vertex<Long, Long>> initialVertices = env.fromElements(
-						new Tuple2<Long, Long>(1L, 1L), new Tuple2<Long, Long>(2L, 2L))
-						.map(new Tuple2ToVertexMap<Long, Long>());
-
-				DataSet<Edge<Long, NullValue>> edges = env.fromElements(new Tuple2<Long, Long>(1L, 2L))
-					.map(new MapFunction<Tuple2<Long,Long>, Edge<Long, NullValue>>() {
-
-						public Edge<Long, NullValue> map(Tuple2<Long, Long> edge) {
-							return new Edge<Long, NullValue>(edge.f0, edge.f1, NullValue.getInstance());
-						}
-				});
-
-				Graph<Long, Long, NullValue> graph = Graph.fromDataSet(initialVertices, edges, env);
-				
-				DataSet<Vertex<Long, Long>> result = graph.runVertexCentricIteration(
-						new ConnectedComponents.CCUpdater<Long>(),
-						new ConnectedComponents.CCMessenger<Long>(), 100)
-						.getVertices();
-				
-				result.output(new DiscardingOutputFormat<Vertex<Long, Long>>());
-			}
-			
-			Plan p = env.createProgramPlan("Spargel Connected Components");
-			OptimizedPlan op = compileNoStats(p);
-			
-			// check the sink
-			SinkPlanNode sink = op.getDataSinks().iterator().next();
-			assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
-			assertEquals(DEFAULT_PARALLELISM, sink.getParallelism());
-			
-			// check the iteration
-			WorksetIterationPlanNode iteration = (WorksetIterationPlanNode) sink.getInput().getSource();
-			assertEquals(DEFAULT_PARALLELISM, iteration.getParallelism());
-			
-			// check the solution set join and the delta
-			PlanNode ssDelta = iteration.getSolutionSetDeltaPlanNode();
-			assertTrue(ssDelta instanceof DualInputPlanNode); // this is only true if the update functions preserves the partitioning
-			
-			DualInputPlanNode ssJoin = (DualInputPlanNode) ssDelta;
-			assertEquals(DEFAULT_PARALLELISM, ssJoin.getParallelism());
-			assertEquals(ShipStrategyType.PARTITION_HASH, ssJoin.getInput1().getShipStrategy());
-			assertEquals(new FieldList(0), ssJoin.getInput1().getShipStrategyKeys());
-			
-			// check the workset set join
-			DualInputPlanNode edgeJoin = (DualInputPlanNode) ssJoin.getInput1().getSource();
-			assertEquals(DEFAULT_PARALLELISM, edgeJoin.getParallelism());
-			assertEquals(ShipStrategyType.PARTITION_HASH, edgeJoin.getInput1().getShipStrategy());
-			assertEquals(ShipStrategyType.FORWARD, edgeJoin.getInput2().getShipStrategy());
-			assertTrue(edgeJoin.getInput1().getTempMode().isCached());
-			
-			assertEquals(new FieldList(0), edgeJoin.getInput1().getShipStrategyKeys());
-			
-			// check that the initial partitioning is pushed out of the loop
-			assertEquals(ShipStrategyType.PARTITION_HASH, iteration.getInput1().getShipStrategy());
-			assertEquals(ShipStrategyType.PARTITION_HASH, iteration.getInput2().getShipStrategy());
-			assertEquals(new FieldList(0), iteration.getInput1().getShipStrategyKeys());
-			assertEquals(new FieldList(0), iteration.getInput2().getShipStrategyKeys());
-			
-			// check that the initial workset sort is outside the loop
-			assertEquals(LocalStrategy.SORT, iteration.getInput2().getLocalStrategy());
-			assertEquals(new FieldList(0), iteration.getInput2().getLocalStrategyKeys());
-		}
-		catch (Exception e) {
-			System.err.println(e.getMessage());
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	@SuppressWarnings("serial")
-	@Test
-	public void testSpargelCompilerWithBroadcastVariable() {
-		try {
-			final String BC_VAR_NAME = "borat variable";
-			
-			
-			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			env.setParallelism(DEFAULT_PARALLELISM);
-			// compose test program
-			{
-				DataSet<Long> bcVar = env.fromElements(1L);
-
-				DataSet<Vertex<Long, Long>> initialVertices = env.fromElements(
-						new Tuple2<Long, Long>(1L, 1L), new Tuple2<Long, Long>(2L, 2L))
-						.map(new Tuple2ToVertexMap<Long, Long>());
-
-				DataSet<Edge<Long, NullValue>> edges = env.fromElements(new Tuple2<Long, Long>(1L, 2L))
-						.map(new MapFunction<Tuple2<Long,Long>, Edge<Long, NullValue>>() {
-
-							public Edge<Long, NullValue> map(Tuple2<Long, Long> edge) {
-								return new Edge<Long, NullValue>(edge.f0, edge.f1, NullValue.getInstance());
-							}
-					});
-
-				Graph<Long, Long, NullValue> graph = Graph.fromDataSet(initialVertices, edges, env);
-
-				VertexCentricConfiguration parameters = new VertexCentricConfiguration();
-				parameters.addBroadcastSetForMessagingFunction(BC_VAR_NAME, bcVar);
-				parameters.addBroadcastSetForUpdateFunction(BC_VAR_NAME, bcVar);
-
-				DataSet<Vertex<Long, Long>> result = graph.runVertexCentricIteration(
-						new ConnectedComponents.CCUpdater<Long>(),
-						new ConnectedComponents.CCMessenger<Long>(), 100)
-						.getVertices();
-					
-				result.output(new DiscardingOutputFormat<Vertex<Long, Long>>());
-
-			}
-			
-			Plan p = env.createProgramPlan("Spargel Connected Components");
-			OptimizedPlan op = compileNoStats(p);
-			
-			// check the sink
-			SinkPlanNode sink = op.getDataSinks().iterator().next();
-			assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
-			assertEquals(DEFAULT_PARALLELISM, sink.getParallelism());
-			
-			// check the iteration
-			WorksetIterationPlanNode iteration = (WorksetIterationPlanNode) sink.getInput().getSource();
-			assertEquals(DEFAULT_PARALLELISM, iteration.getParallelism());
-			
-			// check the solution set join and the delta
-			PlanNode ssDelta = iteration.getSolutionSetDeltaPlanNode();
-			assertTrue(ssDelta instanceof DualInputPlanNode); // this is only true if the update functions preserves the partitioning
-			
-			DualInputPlanNode ssJoin = (DualInputPlanNode) ssDelta;
-			assertEquals(DEFAULT_PARALLELISM, ssJoin.getParallelism());
-			assertEquals(ShipStrategyType.PARTITION_HASH, ssJoin.getInput1().getShipStrategy());
-			assertEquals(new FieldList(0), ssJoin.getInput1().getShipStrategyKeys());
-			
-			// check the workset set join
-			DualInputPlanNode edgeJoin = (DualInputPlanNode) ssJoin.getInput1().getSource();
-			assertEquals(DEFAULT_PARALLELISM, edgeJoin.getParallelism());
-			assertEquals(ShipStrategyType.PARTITION_HASH, edgeJoin.getInput1().getShipStrategy());
-			assertEquals(ShipStrategyType.FORWARD, edgeJoin.getInput2().getShipStrategy());
-			assertTrue(edgeJoin.getInput1().getTempMode().isCached());
-			
-			assertEquals(new FieldList(0), edgeJoin.getInput1().getShipStrategyKeys());
-			
-			// check that the initial partitioning is pushed out of the loop
-			assertEquals(ShipStrategyType.PARTITION_HASH, iteration.getInput1().getShipStrategy());
-			assertEquals(ShipStrategyType.PARTITION_HASH, iteration.getInput2().getShipStrategy());
-			assertEquals(new FieldList(0), iteration.getInput1().getShipStrategyKeys());
-			assertEquals(new FieldList(0), iteration.getInput2().getShipStrategyKeys());
-		}
-		catch (Exception e) {
-			System.err.println(e.getMessage());
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-}