You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by va...@apache.org on 2015/10/09 18:05:44 UTC
[06/24] flink git commit: [FLINK-2833] [gelly] create a
flink-libraries module and move gelly there
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricConfiguration.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricConfiguration.java
deleted file mode 100644
index afd4ffd..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricConfiguration.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.spargel;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.graph.EdgeDirection;
-import org.apache.flink.graph.IterationConfiguration;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * A VertexCentricConfiguration object can be used to set the iteration name and
- * degree of parallelism, to register aggregators and use broadcast sets in
- * the {@link org.apache.flink.graph.spargel.VertexUpdateFunction} and {@link org.apache.flink.graph.spargel.MessagingFunction}.
- *
- * The VertexCentricConfiguration object is passed as an argument to
- * {@link org.apache.flink.graph.Graph#runVertexCentricIteration(
- * org.apache.flink.graph.spargel.VertexUpdateFunction, org.apache.flink.graph.spargel.MessagingFunction, int,
- * VertexCentricConfiguration)}.
- */
-public class VertexCentricConfiguration extends IterationConfiguration {
-
- /** the broadcast variables for the update function **/
- private List<Tuple2<String, DataSet<?>>> bcVarsUpdate = new ArrayList<Tuple2<String,DataSet<?>>>();
-
- /** the broadcast variables for the messaging function **/
- private List<Tuple2<String, DataSet<?>>> bcVarsMessaging = new ArrayList<Tuple2<String,DataSet<?>>>();
-
- /** flag that defines whether the degrees option is set **/
- private boolean optDegrees = false;
-
- /** the direction in which the messages should be sent **/
- private EdgeDirection direction = EdgeDirection.OUT;
-
- public VertexCentricConfiguration() {}
-
- /**
- * Adds a data set as a broadcast set to the messaging function.
- *
- * @param name The name under which the broadcast data is available in the messaging function.
- * @param data The data set to be broadcasted.
- */
- public void addBroadcastSetForMessagingFunction(String name, DataSet<?> data) {
- this.bcVarsMessaging.add(new Tuple2<String, DataSet<?>>(name, data));
- }
-
- /**
- * Adds a data set as a broadcast set to the vertex update function.
- *
- * @param name The name under which the broadcast data is available in the vertex update function.
- * @param data The data set to be broadcasted.
- */
- public void addBroadcastSetForUpdateFunction(String name, DataSet<?> data) {
- this.bcVarsUpdate.add(new Tuple2<String, DataSet<?>>(name, data));
- }
-
- /**
- * Get the broadcast variables of the VertexUpdateFunction.
- *
- * @return a List of Tuple2, where the first field is the broadcast variable name
- * and the second field is the broadcast DataSet.
- */
- public List<Tuple2<String, DataSet<?>>> getUpdateBcastVars() {
- return this.bcVarsUpdate;
- }
-
- /**
- * Get the broadcast variables of the MessagingFunction.
- *
- * @return a List of Tuple2, where the first field is the broadcast variable name
- * and the second field is the broadcast DataSet.
- */
- public List<Tuple2<String, DataSet<?>>> getMessagingBcastVars() {
- return this.bcVarsMessaging;
- }
-
- /**
- * Gets whether the degrees option is set.
- * By default, the degrees option is not set.
- *
- * @return True, if the degree option is set, false otherwise.
- */
- public boolean isOptDegrees() {
- return optDegrees;
- }
-
- /**
- * Sets the degree option.
- * By default, the degrees option is not set.
- *
- * @param optDegrees True, to set this option, false otherwise.
- */
- public void setOptDegrees(boolean optDegrees) {
- this.optDegrees = optDegrees;
- }
-
- /**
- * Gets the direction in which messages are sent in the MessagingFunction.
- * By default the messaging direction is OUT.
- *
- * @return an EdgeDirection, which can be either IN, OUT or ALL.
- */
- public EdgeDirection getDirection() {
- return direction;
- }
-
- /**
- * Sets the direction in which messages are sent in the MessagingFunction.
- * By default the messaging direction is OUT.
- *
- * @param direction - IN, OUT or ALL
- */
- public void setDirection(EdgeDirection direction) {
- this.direction = direction;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java
deleted file mode 100644
index b3a470e..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java
+++ /dev/null
@@ -1,686 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.spargel;
-
-import java.util.Iterator;
-import java.util.Map;
-
-import org.apache.flink.api.common.aggregators.Aggregator;
-import org.apache.flink.api.common.functions.FlatJoinFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.operators.DeltaIteration;
-import org.apache.flink.api.common.functions.RichCoGroupFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.operators.CoGroupOperator;
-import org.apache.flink.api.java.operators.CustomUnaryOperation;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.EdgeDirection;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.util.Collector;
-
-import com.google.common.base.Preconditions;
-
-/**
- * This class represents iterative graph computations, programmed in a vertex-centric perspective.
- * It is a special case of <i>Bulk Synchronous Parallel</i> computation. The paradigm has also been
- * implemented by Google's <i>Pregel</i> system and by <i>Apache Giraph</i>.
- * <p>
- * Vertex centric algorithms operate on graphs, which are defined through vertices and edges. The
- * algorithms send messages along the edges and update the state of vertices based on
- * the old state and the incoming messages. All vertices have an initial state.
- * The computation terminates once no vertex updates its state any more.
- * Additionally, a maximum number of iterations (supersteps) may be specified.
- * <p>
- * The computation is here represented by two functions:
- * <ul>
- * <li>The {@link VertexUpdateFunction} receives incoming messages and may update the state for
- * the vertex. If a state is updated, messages are sent from this vertex. Initially, all vertices are
- * considered updated.</li>
- * <li>The {@link MessagingFunction} takes the new vertex state and sends messages along the outgoing
- * edges of the vertex. The outgoing edges may optionally have an associated value, such as a weight.</li>
- * </ul>
- * <p>
- *
- * Vertex-centric graph iterations are run by calling
- * {@link Graph#runVertexCentricIteration(VertexUpdateFunction, MessagingFunction, int)}.
- *
- * @param <K> The type of the vertex key (the vertex identifier).
- * @param <VV> The type of the vertex value (the state of the vertex).
- * @param <Message> The type of the message sent between vertices along the edges.
- * @param <EV> The type of the values that are associated with the edges.
- */
-public class VertexCentricIteration<K, VV, Message, EV>
- implements CustomUnaryOperation<Vertex<K, VV>, Vertex<K, VV>>
-{
- private final VertexUpdateFunction<K, VV, Message> updateFunction;
-
- private final MessagingFunction<K, VV, Message, EV> messagingFunction;
-
- private final DataSet<Edge<K, EV>> edgesWithValue;
-
- private final int maximumNumberOfIterations;
-
- private final TypeInformation<Message> messageType;
-
- private DataSet<Vertex<K, VV>> initialVertices;
-
- private VertexCentricConfiguration configuration;
-
- // ----------------------------------------------------------------------------------
-
- private VertexCentricIteration(VertexUpdateFunction<K, VV, Message> uf,
- MessagingFunction<K, VV, Message, EV> mf,
- DataSet<Edge<K, EV>> edgesWithValue,
- int maximumNumberOfIterations)
- {
- Preconditions.checkNotNull(uf);
- Preconditions.checkNotNull(mf);
- Preconditions.checkNotNull(edgesWithValue);
- Preconditions.checkArgument(maximumNumberOfIterations > 0, "The maximum number of iterations must be at least one.");
-
- this.updateFunction = uf;
- this.messagingFunction = mf;
- this.edgesWithValue = edgesWithValue;
- this.maximumNumberOfIterations = maximumNumberOfIterations;
- this.messageType = getMessageType(mf);
- }
-
- private TypeInformation<Message> getMessageType(MessagingFunction<K, VV, Message, EV> mf) {
- return TypeExtractor.createTypeInfo(MessagingFunction.class, mf.getClass(), 2, null, null);
- }
-
- // --------------------------------------------------------------------------------------------
- // Custom Operator behavior
- // --------------------------------------------------------------------------------------------
-
- /**
- * Sets the input data set for this operator. In the case of this operator this input data set represents
- * the set of vertices with their initial state.
- *
- * @param inputData The input data set, which in the case of this operator represents the set of
- * vertices with their initial state.
- *
- * @see org.apache.flink.api.java.operators.CustomUnaryOperation#setInput(org.apache.flink.api.java.DataSet)
- */
- @Override
- public void setInput(DataSet<Vertex<K, VV>> inputData) {
- this.initialVertices = inputData;
- }
-
- /**
- * Creates the operator that represents this vertex-centric graph computation.
- *
- * @return The operator that represents this vertex-centric graph computation.
- */
- @Override
- public DataSet<Vertex<K, VV>> createResult() {
- if (this.initialVertices == null) {
- throw new IllegalStateException("The input data set has not been set.");
- }
-
- // prepare some type information
- TypeInformation<K> keyType = ((TupleTypeInfo<?>) initialVertices.getType()).getTypeAt(0);
- TypeInformation<Tuple2<K, Message>> messageTypeInfo = new TupleTypeInfo<Tuple2<K,Message>>(keyType, messageType);
-
- // create a graph
- Graph<K, VV, EV> graph =
- Graph.fromDataSet(initialVertices, edgesWithValue, initialVertices.getExecutionEnvironment());
-
- // check whether the numVertices option is set and, if so, compute the total number of vertices
- // and set it within the messaging and update functions
-
- if (this.configuration != null && this.configuration.isOptNumVertices()) {
- try {
- long numberOfVertices = graph.numberOfVertices();
- messagingFunction.setNumberOfVertices(numberOfVertices);
- updateFunction.setNumberOfVertices(numberOfVertices);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- if(this.configuration != null) {
- messagingFunction.setDirection(this.configuration.getDirection());
- } else {
- messagingFunction.setDirection(EdgeDirection.OUT);
- }
-
- // retrieve the direction in which the updates are made and in which the messages are sent
- EdgeDirection messagingDirection = messagingFunction.getDirection();
-
- // check whether the degrees option is set and, if so, compute the in and the out degrees and
- // add them to the vertex value
- if(this.configuration != null && this.configuration.isOptDegrees()) {
- return createResultVerticesWithDegrees(graph, messagingDirection, messageTypeInfo);
- } else {
- return createResultSimpleVertex(messagingDirection, messageTypeInfo);
- }
- }
-
- /**
- * Creates a new vertex-centric iteration operator for graphs where the edges are associated with a value (such as
- * a weight or distance).
- *
- * @param edgesWithValue The data set containing edges.
- * @param uf The function that updates the state of the vertices from the incoming messages.
- * @param mf The function that turns changed vertex states into messages along the edges.
- *
- * @param <K> The type of the vertex key (the vertex identifier).
- * @param <VV> The type of the vertex value (the state of the vertex).
- * @param <Message> The type of the message sent between vertices along the edges.
- * @param <EV> The type of the values that are associated with the edges.
- *
- * @return An instance of the vertex-centric graph computation operator.
- */
- public static final <K, VV, Message, EV>
- VertexCentricIteration<K, VV, Message, EV> withEdges(
- DataSet<Edge<K, EV>> edgesWithValue,
- VertexUpdateFunction<K, VV, Message> uf,
- MessagingFunction<K, VV, Message, EV> mf,
- int maximumNumberOfIterations)
- {
- return new VertexCentricIteration<K, VV, Message, EV>(uf, mf, edgesWithValue, maximumNumberOfIterations);
- }
-
- /**
- * Configures this vertex-centric iteration with the provided parameters.
- *
- * @param parameters the configuration parameters
- */
- public void configure(VertexCentricConfiguration parameters) {
- this.configuration = parameters;
- }
-
- /**
- * @return the configuration parameters of this vertex-centric iteration
- */
- public VertexCentricConfiguration getIterationConfiguration() {
- return this.configuration;
- }
-
- // --------------------------------------------------------------------------------------------
- // Wrapping UDFs
- // --------------------------------------------------------------------------------------------
-
- private static abstract class VertexUpdateUdf<K, VVWithDegrees, Message> extends RichCoGroupFunction<
- Tuple2<K, Message>, Vertex<K, VVWithDegrees>, Vertex<K, VVWithDegrees>>
- implements ResultTypeQueryable<Vertex<K, VVWithDegrees>>
- {
- private static final long serialVersionUID = 1L;
-
- final VertexUpdateFunction<K, VVWithDegrees, Message> vertexUpdateFunction;
-
- final MessageIterator<Message> messageIter = new MessageIterator<Message>();
-
- private transient TypeInformation<Vertex<K, VVWithDegrees>> resultType;
-
-
- private VertexUpdateUdf(VertexUpdateFunction<K, VVWithDegrees, Message> vertexUpdateFunction,
- TypeInformation<Vertex<K, VVWithDegrees>> resultType)
- {
- this.vertexUpdateFunction = vertexUpdateFunction;
- this.resultType = resultType;
- }
-
- @Override
- public void open(Configuration parameters) throws Exception {
- if (getIterationRuntimeContext().getSuperstepNumber() == 1) {
- this.vertexUpdateFunction.init(getIterationRuntimeContext());
- }
- this.vertexUpdateFunction.preSuperstep();
- }
-
- @Override
- public void close() throws Exception {
- this.vertexUpdateFunction.postSuperstep();
- }
-
- @Override
- public TypeInformation<Vertex<K, VVWithDegrees>> getProducedType() {
- return this.resultType;
- }
- }
-
- @SuppressWarnings("serial")
- private static final class VertexUpdateUdfSimpleVV<K, VV, Message> extends VertexUpdateUdf<K, VV, Message> {
-
- private VertexUpdateUdfSimpleVV(VertexUpdateFunction<K, VV, Message> vertexUpdateFunction, TypeInformation<Vertex<K, VV>> resultType) {
- super(vertexUpdateFunction, resultType);
- }
-
- @Override
- public void coGroup(Iterable<Tuple2<K, Message>> messages,
- Iterable<Vertex<K, VV>> vertex,
- Collector<Vertex<K, VV>> out) throws Exception {
- final Iterator<Vertex<K, VV>> vertexIter = vertex.iterator();
-
- if (vertexIter.hasNext()) {
- Vertex<K, VV> vertexState = vertexIter.next();
-
- @SuppressWarnings("unchecked")
- Iterator<Tuple2<?, Message>> downcastIter = (Iterator<Tuple2<?, Message>>) (Iterator<?>) messages.iterator();
- messageIter.setSource(downcastIter);
-
- vertexUpdateFunction.setOutput(vertexState, out);
- vertexUpdateFunction.updateVertex(vertexState, messageIter);
- }
- else {
- final Iterator<Tuple2<K, Message>> messageIter = messages.iterator();
- if (messageIter.hasNext()) {
- String message = "Target vertex does not exist!.";
- try {
- Tuple2<K, Message> next = messageIter.next();
- message = "Target vertex '" + next.f0 + "' does not exist!.";
- } catch (Throwable t) {}
- throw new Exception(message);
- } else {
- throw new Exception();
- }
- }
- }
- }
-
- @SuppressWarnings("serial")
- private static final class VertexUpdateUdfVVWithDegrees<K, VV, Message> extends VertexUpdateUdf<K, Tuple3<VV, Long, Long>, Message> {
-
- private VertexUpdateUdfVVWithDegrees(VertexUpdateFunction<K, Tuple3<VV, Long, Long>, Message> vertexUpdateFunction,
- TypeInformation<Vertex<K, Tuple3<VV, Long, Long>>> resultType) {
- super(vertexUpdateFunction, resultType);
- }
-
- @Override
- public void coGroup(Iterable<Tuple2<K, Message>> messages, Iterable<Vertex<K, Tuple3<VV, Long, Long>>> vertex,
- Collector<Vertex<K, Tuple3<VV, Long, Long>>> out) throws Exception {
-
- final Iterator<Vertex<K, Tuple3<VV, Long, Long>>> vertexIter = vertex.iterator();
-
- if (vertexIter.hasNext()) {
- Vertex<K, Tuple3<VV, Long, Long>> vertexWithDegrees = vertexIter.next();
-
- @SuppressWarnings("unchecked")
- Iterator<Tuple2<?, Message>> downcastIter = (Iterator<Tuple2<?, Message>>) (Iterator<?>) messages.iterator();
- messageIter.setSource(downcastIter);
-
- vertexUpdateFunction.setInDegree(vertexWithDegrees.f1.f1);
- vertexUpdateFunction.setOutDegree(vertexWithDegrees.f1.f2);
-
- vertexUpdateFunction.setOutputWithDegrees(vertexWithDegrees, out);
- vertexUpdateFunction.updateVertexFromVertexCentricIteration(vertexWithDegrees, messageIter);
- }
- else {
- final Iterator<Tuple2<K, Message>> messageIter = messages.iterator();
- if (messageIter.hasNext()) {
- String message = "Target vertex does not exist!.";
- try {
- Tuple2<K, Message> next = messageIter.next();
- message = "Target vertex '" + next.f0 + "' does not exist!.";
- } catch (Throwable t) {}
- throw new Exception(message);
- } else {
- throw new Exception();
- }
- }
- }
- }
-
- /*
- * UDF that encapsulates the message sending function for graphs where the edges have an associated value.
- */
- private static abstract class MessagingUdfWithEdgeValues<K, VVWithDegrees, VV, Message, EV>
- extends RichCoGroupFunction<Edge<K, EV>, Vertex<K, VVWithDegrees>, Tuple2<K, Message>>
- implements ResultTypeQueryable<Tuple2<K, Message>>
- {
- private static final long serialVersionUID = 1L;
-
- final MessagingFunction<K, VV, Message, EV> messagingFunction;
-
- private transient TypeInformation<Tuple2<K, Message>> resultType;
-
-
- private MessagingUdfWithEdgeValues(MessagingFunction<K, VV, Message, EV> messagingFunction,
- TypeInformation<Tuple2<K, Message>> resultType)
- {
- this.messagingFunction = messagingFunction;
- this.resultType = resultType;
- }
-
- @Override
- public void open(Configuration parameters) throws Exception {
- if (getIterationRuntimeContext().getSuperstepNumber() == 1) {
- this.messagingFunction.init(getIterationRuntimeContext());
- }
-
- this.messagingFunction.preSuperstep();
- }
-
- @Override
- public void close() throws Exception {
- this.messagingFunction.postSuperstep();
- }
-
- @Override
- public TypeInformation<Tuple2<K, Message>> getProducedType() {
- return this.resultType;
- }
- }
-
- @SuppressWarnings("serial")
- private static final class MessagingUdfWithEVsSimpleVV<K, VV, Message, EV>
- extends MessagingUdfWithEdgeValues<K, VV, VV, Message, EV> {
-
- private MessagingUdfWithEVsSimpleVV(MessagingFunction<K, VV, Message, EV> messagingFunction,
- TypeInformation<Tuple2<K, Message>> resultType) {
- super(messagingFunction, resultType);
- }
-
- @Override
- public void coGroup(Iterable<Edge<K, EV>> edges,
- Iterable<Vertex<K, VV>> state,
- Collector<Tuple2<K, Message>> out) throws Exception {
- final Iterator<Vertex<K, VV>> stateIter = state.iterator();
-
- if (stateIter.hasNext()) {
- Vertex<K, VV> newVertexState = stateIter.next();
- messagingFunction.set((Iterator<?>) edges.iterator(), out);
- messagingFunction.sendMessages(newVertexState);
- }
- }
- }
-
- @SuppressWarnings("serial")
- private static final class MessagingUdfWithEVsVVWithDegrees<K, VV, Message, EV>
- extends MessagingUdfWithEdgeValues<K, Tuple3<VV, Long, Long>, VV, Message, EV> {
-
- private Vertex<K, VV> nextVertex = new Vertex<K, VV>();
-
- private MessagingUdfWithEVsVVWithDegrees(MessagingFunction<K, VV, Message, EV> messagingFunction,
- TypeInformation<Tuple2<K, Message>> resultType) {
- super(messagingFunction, resultType);
- }
-
- @Override
- public void coGroup(Iterable<Edge<K, EV>> edges, Iterable<Vertex<K, Tuple3<VV, Long, Long>>> state,
- Collector<Tuple2<K, Message>> out) throws Exception {
-
- final Iterator<Vertex<K, Tuple3<VV, Long, Long>>> stateIter = state.iterator();
-
- if (stateIter.hasNext()) {
- Vertex<K, Tuple3<VV, Long, Long>> vertexWithDegrees = stateIter.next();
-
- nextVertex.setField(vertexWithDegrees.f0, 0);
- nextVertex.setField(vertexWithDegrees.f1.f0, 1);
-
- messagingFunction.setInDegree(vertexWithDegrees.f1.f1);
- messagingFunction.setOutDegree(vertexWithDegrees.f1.f2);
-
- messagingFunction.set((Iterator<?>) edges.iterator(), out);
- messagingFunction.sendMessages(nextVertex);
- }
- }
- }
-
-
- // --------------------------------------------------------------------------------------------
- // UTIL methods
- // --------------------------------------------------------------------------------------------
-
- /**
- * Method that builds the messaging function using a coGroup operator for a simple vertex (without
- * degrees).
- * It afterwards configures the function with a custom name and broadcast variables.
- *
- * @param iteration
- * @param messageTypeInfo
- * @param whereArg the argument for the where within the coGroup
- * @param equalToArg the argument for the equalTo within the coGroup
- * @return the messaging function
- */
- private CoGroupOperator<?, ?, Tuple2<K, Message>> buildMessagingFunction(
- DeltaIteration<Vertex<K, VV>, Vertex<K, VV>> iteration,
- TypeInformation<Tuple2<K, Message>> messageTypeInfo, int whereArg, int equalToArg) {
-
- // build the messaging function (co group)
- CoGroupOperator<?, ?, Tuple2<K, Message>> messages;
- MessagingUdfWithEdgeValues<K, VV, VV, Message, EV> messenger =
- new MessagingUdfWithEVsSimpleVV<K, VV, Message, EV>(messagingFunction, messageTypeInfo);
-
- messages = this.edgesWithValue.coGroup(iteration.getWorkset()).where(whereArg)
- .equalTo(equalToArg).with(messenger);
-
- // configure coGroup message function with name and broadcast variables
- messages = messages.name("Messaging");
- if(this.configuration != null) {
- for (Tuple2<String, DataSet<?>> e : this.configuration.getMessagingBcastVars()) {
- messages = messages.withBroadcastSet(e.f1, e.f0);
- }
- }
-
- return messages;
- }
-
- /**
- * Method that builds the messaging function using a coGroup operator for a vertex
- * containing degree information.
- * It afterwards configures the function with a custom name and broadcast variables.
- *
- * @param iteration
- * @param messageTypeInfo
- * @param whereArg the argument for the where within the coGroup
- * @param equalToArg the argument for the equalTo within the coGroup
- * @return the messaging function
- */
- private CoGroupOperator<?, ?, Tuple2<K, Message>> buildMessagingFunctionVerticesWithDegrees(
- DeltaIteration<Vertex<K, Tuple3<VV, Long, Long>>, Vertex<K, Tuple3<VV, Long, Long>>> iteration,
- TypeInformation<Tuple2<K, Message>> messageTypeInfo, int whereArg, int equalToArg) {
-
- // build the messaging function (co group)
- CoGroupOperator<?, ?, Tuple2<K, Message>> messages;
- MessagingUdfWithEdgeValues<K, Tuple3<VV, Long, Long>, VV, Message, EV> messenger =
- new MessagingUdfWithEVsVVWithDegrees<K, VV, Message, EV>(messagingFunction, messageTypeInfo);
-
- messages = this.edgesWithValue.coGroup(iteration.getWorkset()).where(whereArg)
- .equalTo(equalToArg).with(messenger);
-
- // configure coGroup message function with name and broadcast variables
- messages = messages.name("Messaging");
-
- if (this.configuration != null) {
- for (Tuple2<String, DataSet<?>> e : this.configuration.getMessagingBcastVars()) {
- messages = messages.withBroadcastSet(e.f1, e.f0);
- }
- }
-
- return messages;
- }
-
- /**
- * Helper method which sets up an iteration with the given vertex value (either simple or with degrees).
- *
- * @param iteration
- */
-
- private void setUpIteration(DeltaIteration<?, ?> iteration) {
-
- // set up the iteration operator
- if (this.configuration != null) {
-
- iteration.name(this.configuration.getName("Vertex-centric iteration (" + updateFunction + " | " + messagingFunction + ")"));
- iteration.parallelism(this.configuration.getParallelism());
- iteration.setSolutionSetUnManaged(this.configuration.isSolutionSetUnmanagedMemory());
-
- // register all aggregators
- for (Map.Entry<String, Aggregator<?>> entry : this.configuration.getAggregators().entrySet()) {
- iteration.registerAggregator(entry.getKey(), entry.getValue());
- }
- }
- else {
- // no configuration provided; set default name
- iteration.name("Vertex-centric iteration (" + updateFunction + " | " + messagingFunction + ")");
- }
- }
-
- /**
- * Creates the operator that represents this vertex centric graph computation for a simple vertex.
- *
- * @param messagingDirection
- * @param messageTypeInfo
- * @return the operator
- */
- private DataSet<Vertex<K, VV>> createResultSimpleVertex(EdgeDirection messagingDirection,
- TypeInformation<Tuple2<K, Message>> messageTypeInfo) {
-
- DataSet<Tuple2<K, Message>> messages;
-
- TypeInformation<Vertex<K, VV>> vertexTypes = initialVertices.getType();
-
- final DeltaIteration<Vertex<K, VV>, Vertex<K, VV>> iteration =
- initialVertices.iterateDelta(initialVertices, this.maximumNumberOfIterations, 0);
- setUpIteration(iteration);
-
- switch (messagingDirection) {
- case IN:
- messages = buildMessagingFunction(iteration, messageTypeInfo, 1, 0);
- break;
- case OUT:
- messages = buildMessagingFunction(iteration, messageTypeInfo, 0, 0);
- break;
- case ALL:
- messages = buildMessagingFunction(iteration, messageTypeInfo, 1, 0)
- .union(buildMessagingFunction(iteration, messageTypeInfo, 0, 0)) ;
- break;
- default:
- throw new IllegalArgumentException("Illegal edge direction");
- }
-
- VertexUpdateUdf<K, VV, Message> updateUdf =
- new VertexUpdateUdfSimpleVV<K, VV, Message>(updateFunction, vertexTypes);
-
- // build the update function (co group)
- CoGroupOperator<?, ?, Vertex<K, VV>> updates =
- messages.coGroup(iteration.getSolutionSet()).where(0).equalTo(0).with(updateUdf);
-
- configureUpdateFunction(updates);
-
- return iteration.closeWith(updates, updates);
- }
-
- /**
- * Creates the operator that represents this vertex centric graph computation for a vertex with in
- * and out degrees added to the vertex value.
- *
- * @param graph
- * @param messagingDirection
- * @param messageTypeInfo
- * @return the operator
- */
- @SuppressWarnings("serial")
- private DataSet<Vertex<K, VV>> createResultVerticesWithDegrees(Graph<K, VV, EV> graph, EdgeDirection messagingDirection,
- TypeInformation<Tuple2<K, Message>> messageTypeInfo) {
-
- DataSet<Tuple2<K, Message>> messages;
-
- this.updateFunction.setOptDegrees(this.configuration.isOptDegrees());
-
- DataSet<Tuple2<K, Long>> inDegrees = graph.inDegrees();
- DataSet<Tuple2<K, Long>> outDegrees = graph.outDegrees();
-
- DataSet<Tuple3<K, Long, Long>> degrees = inDegrees.join(outDegrees).where(0).equalTo(0)
- .with(new FlatJoinFunction<Tuple2<K, Long>, Tuple2<K, Long>, Tuple3<K, Long, Long>>() {
-
- @Override
- public void join(Tuple2<K, Long> first, Tuple2<K, Long> second, Collector<Tuple3<K, Long, Long>> out) {
- out.collect(new Tuple3<K, Long, Long>(first.f0, first.f1, second.f1));
- }
- }).withForwardedFieldsFirst("f0;f1").withForwardedFieldsSecond("f1");
-
- DataSet<Vertex<K, Tuple3<VV, Long, Long>>> verticesWithDegrees = initialVertices
- .join(degrees).where(0).equalTo(0)
- .with(new FlatJoinFunction<Vertex<K,VV>, Tuple3<K,Long,Long>, Vertex<K, Tuple3<VV, Long, Long>>>() {
- @Override
- public void join(Vertex<K, VV> vertex, Tuple3<K, Long, Long> degrees,
- Collector<Vertex<K, Tuple3<VV, Long, Long>>> out) throws Exception {
-
- out.collect(new Vertex<K, Tuple3<VV, Long, Long>>(vertex.getId(),
- new Tuple3<VV, Long, Long>(vertex.getValue(), degrees.f1, degrees.f2)));
- }
- }).withForwardedFieldsFirst("f0");
-
- // add type info
- TypeInformation<Vertex<K, Tuple3<VV, Long, Long>>> vertexTypes = verticesWithDegrees.getType();
-
- final DeltaIteration<Vertex<K, Tuple3<VV, Long, Long>>, Vertex<K, Tuple3<VV, Long, Long>>> iteration =
- verticesWithDegrees.iterateDelta(verticesWithDegrees, this.maximumNumberOfIterations, 0);
- setUpIteration(iteration);
-
- switch (messagingDirection) {
- case IN:
- messages = buildMessagingFunctionVerticesWithDegrees(iteration, messageTypeInfo, 1, 0);
- break;
- case OUT:
- messages = buildMessagingFunctionVerticesWithDegrees(iteration, messageTypeInfo, 0, 0);
- break;
- case ALL:
- messages = buildMessagingFunctionVerticesWithDegrees(iteration, messageTypeInfo, 1, 0)
- .union(buildMessagingFunctionVerticesWithDegrees(iteration, messageTypeInfo, 0, 0)) ;
- break;
- default:
- throw new IllegalArgumentException("Illegal edge direction");
- }
-
- @SuppressWarnings({ "unchecked", "rawtypes" })
- VertexUpdateUdf<K, Tuple3<VV, Long, Long>, Message> updateUdf =
- new VertexUpdateUdfVVWithDegrees(updateFunction, vertexTypes);
-
- // build the update function (co group)
- CoGroupOperator<?, ?, Vertex<K, Tuple3<VV, Long, Long>>> updates =
- messages.coGroup(iteration.getSolutionSet()).where(0).equalTo(0).with(updateUdf);
-
- configureUpdateFunction(updates);
-
- return iteration.closeWith(updates, updates).map(
- new MapFunction<Vertex<K, Tuple3<VV, Long, Long>>, Vertex<K, VV>>() {
-
- public Vertex<K, VV> map(Vertex<K, Tuple3<VV, Long, Long>> vertex) {
- return new Vertex<K, VV>(vertex.getId(), vertex.getValue().f0);
- }
- });
- }
-
- private <VVWithDegree> void configureUpdateFunction(CoGroupOperator<?, ?, Vertex<K, VVWithDegree>> updates) {
-
- // configure coGroup update function with name and broadcast variables
- updates = updates.name("Vertex State Updates");
- if (this.configuration != null) {
- for (Tuple2<String, DataSet<?>> e : this.configuration.getUpdateBcastVars()) {
- updates = updates.withBroadcastSet(e.f1, e.f0);
- }
- }
-
- // let the operator know that we preserve the key field
- updates.withForwardedFieldsFirst("0").withForwardedFieldsSecond("0");
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java
deleted file mode 100644
index 248925b..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java
+++ /dev/null
@@ -1,253 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.spargel;
-
-import java.io.Serializable;
-import java.util.Collection;
-
-import org.apache.flink.api.common.aggregators.Aggregator;
-import org.apache.flink.api.common.functions.IterationRuntimeContext;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.types.Value;
-import org.apache.flink.util.Collector;
-
-/**
- * This class must be extended by functions that compute the state of the vertex depending on the old state and the
- * incoming messages. The central method is {@link #updateVertex(Vertex, MessageIterator)}, which is
- * invoked once per vertex per superstep.
- *
- * @param <K> The vertex key type.
- * @param <VV> The vertex value type.
- * @param <Message> The message type.
- */
-public abstract class VertexUpdateFunction<K, VV, Message> implements Serializable {
-
- private static final long serialVersionUID = 1L;
-
- // --------------------------------------------------------------------------------------------
- // Attributes that allow vertices to access their in/out degrees and the total number of vertices
- // inside an iteration.
- // --------------------------------------------------------------------------------------------
-
- private long numberOfVertices = -1L;
-
- /**
- * Retrieves the number of vertices in the graph.
- * @return the number of vertices if the {@link IterationConfiguration#setOptNumVertices(boolean)}
- * option has been set; -1 otherwise.
- */
- public long getNumberOfVertices() {
- return numberOfVertices;
- }
-
- void setNumberOfVertices(long numberOfVertices) {
- this.numberOfVertices = numberOfVertices;
- }
-
- //---------------------------------------------------------------------------------------------
-
- private boolean optDegrees;
-
- boolean isOptDegrees() {
- return optDegrees;
- }
-
- void setOptDegrees(boolean optDegrees) {
- this.optDegrees = optDegrees;
- }
-
- // --------------------------------------------------------------------------------------------
- // Public API Methods
- // --------------------------------------------------------------------------------------------
-
- /**
- * This method is invoked once per vertex per superstep. It receives the current state of the vertex, as well as
- * the incoming messages. It may set a new vertex state via {@link #setNewVertexValue(Object)}. If the vertex
- * state is changed, it will trigger the sending of messages via the {@link MessagingFunction}.
- *
- * @param vertex The vertex.
- * @param inMessages The incoming messages to this vertex.
- *
- * @throws Exception The computation may throw exceptions, which causes the superstep to fail.
- */
- public abstract void updateVertex(Vertex<K, VV> vertex, MessageIterator<Message> inMessages) throws Exception;
-
- /**
- * This method is executed once per superstep before the vertex update function is invoked for each vertex.
- *
- * @throws Exception Exceptions in the pre-superstep phase cause the superstep to fail.
- */
- public void preSuperstep() throws Exception {}
-
- /**
- * This method is executed once per superstep after the vertex update function has been invoked for each vertex.
- *
- * @throws Exception Exceptions in the post-superstep phase cause the superstep to fail.
- */
- public void postSuperstep() throws Exception {}
-
- /**
- * Sets the new value of this vertex. Setting a new value triggers the sending of outgoing messages from this vertex.
- *
- * This should be called at most once per updateVertex.
- *
- * @param newValue The new vertex value.
- */
- public void setNewVertexValue(VV newValue) {
- if(setNewVertexValueCalled) {
- throw new IllegalStateException("setNewVertexValue should only be called at most once per updateVertex");
- }
- setNewVertexValueCalled = true;
- if(isOptDegrees()) {
- outValWithDegrees.f1.f0 = newValue;
- outWithDegrees.collect(outValWithDegrees);
- } else {
- outVal.setValue(newValue);
- out.collect(outVal);
- }
- }
-
- /**
- * Gets the number of the superstep, starting at <tt>1</tt>.
- *
- * @return The number of the current superstep.
- */
- public int getSuperstepNumber() {
- return this.runtimeContext.getSuperstepNumber();
- }
-
- /**
- * Gets the iteration aggregator registered under the given name. The iteration aggregator combines
- * all aggregates globally once per superstep and makes them available in the next superstep.
- *
- * @param name The name of the aggregator.
- * @return The aggregator registered under this name, or null, if no aggregator was registered.
- */
- public <T extends Aggregator<?>> T getIterationAggregator(String name) {
- return this.runtimeContext.<T>getIterationAggregator(name);
- }
-
- /**
- * Get the aggregated value that an aggregator computed in the previous iteration.
- *
- * @param name The name of the aggregator.
- * @return The aggregated value of the previous iteration.
- */
- public <T extends Value> T getPreviousIterationAggregate(String name) {
- return this.runtimeContext.<T>getPreviousIterationAggregate(name);
- }
-
- /**
- * Gets the broadcast data set registered under the given name. Broadcast data sets
- * are available on all parallel instances of a function. They can be registered via
- * {@link org.apache.flink.graph.spargel.VertexCentricConfiguration#addBroadcastSetForUpdateFunction(String, org.apache.flink.api.java.DataSet)}.
- *
- * @param name The name under which the broadcast set is registered.
- * @return The broadcast data set.
- */
- public <T> Collection<T> getBroadcastSet(String name) {
- return this.runtimeContext.<T>getBroadcastVariable(name);
- }
-
- // --------------------------------------------------------------------------------------------
- // internal methods
- // --------------------------------------------------------------------------------------------
-
- private IterationRuntimeContext runtimeContext;
-
- private Collector<Vertex<K, VV>> out;
-
- private Collector<Vertex<K, Tuple3<VV, Long, Long>>> outWithDegrees;
-
- private Vertex<K, VV> outVal;
-
- private Vertex<K, Tuple3<VV, Long, Long>> outValWithDegrees;
-
- private long inDegree = -1;
-
- private long outDegree = -1;
-
- private boolean setNewVertexValueCalled;
-
- void init(IterationRuntimeContext context) {
- this.runtimeContext = context;
- }
-
- void setOutput(Vertex<K, VV> outVal, Collector<Vertex<K, VV>> out) {
- this.outVal = outVal;
- this.out = out;
- setNewVertexValueCalled = false;
- }
-
- @SuppressWarnings({ "unchecked", "rawtypes" })
- <ValueWithDegree> void setOutputWithDegrees(Vertex<K, ValueWithDegree> outVal,
- Collector out) {
- this.outValWithDegrees = (Vertex<K, Tuple3<VV, Long, Long>>) outVal;
- this.outWithDegrees = out;
- setNewVertexValueCalled = false;
- }
-
- /**
- * Retrieves the vertex in-degree (number of in-coming edges).
- * @return The in-degree of this vertex if the {@link IterationConfiguration#setOptDegrees(boolean)}
- * option has been set; -1 otherwise.
- */
- public long getInDegree() {
- return inDegree;
- }
-
- void setInDegree(long inDegree) {
- this.inDegree = inDegree;
- }
-
- /**
- * Retrieve the vertex out-degree (number of out-going edges).
- * @return The out-degree of this vertex if the {@link IterationConfiguration#setOptDegrees(boolean)}
- * option has been set; -1 otherwise.
- */
- public long getOutDegree() {
- return outDegree;
- }
-
- void setOutDegree(long outDegree) {
- this.outDegree = outDegree;
- }
-
- /**
- * In order to hide the Tuple3(actualValue, inDegree, OutDegree) vertex value from the user,
- * another function will be called from {@link org.apache.flink.graph.spargel.VertexCentricIteration}.
- *
- * This function unwraps the actual vertex value (field f0 of the Tuple3) from the vertexState and then calls
- * the regular updateVertex function with a vertex that carries only that value.
- *
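- * @param vertexState the vertex whose value is the Tuple3 wrapping the actual vertex value
- * @param inMessages the incoming messages, passed on unchanged to updateVertex
- * @throws Exception if the invoked updateVertex implementation throws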
- */
- @SuppressWarnings("unchecked")
- <VertexWithDegree> void updateVertexFromVertexCentricIteration(Vertex<K, VertexWithDegree> vertexState,
- MessageIterator<Message> inMessages) throws Exception {
-
- Vertex<K, VV> vertex = new Vertex<K, VV>(vertexState.f0,
- ((Tuple3<VV, Long, Long>)vertexState.getValue()).f0);
-
- updateVertex(vertex, inMessages);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/EdgeToTuple3Map.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/EdgeToTuple3Map.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/EdgeToTuple3Map.java
deleted file mode 100644
index 0e085b4..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/EdgeToTuple3Map.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.utils;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.graph.Edge;
-
-@ForwardedFields("f0; f1; f2")
-public class EdgeToTuple3Map<K, EV> implements MapFunction<Edge<K, EV>, Tuple3<K, K, EV>> {
-
- private static final long serialVersionUID = 1L;
-
- public Tuple3<K, K, EV> map(Edge<K, EV> edge) {
- return new Tuple3<K, K, EV>(edge.f0, edge.f1, edge.f2);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/NullValueEdgeMapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/NullValueEdgeMapper.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/NullValueEdgeMapper.java
deleted file mode 100644
index 2bd4719..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/NullValueEdgeMapper.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.utils;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.types.NullValue;
-
-public class NullValueEdgeMapper<K, EV> implements MapFunction<Edge<K, EV>, NullValue> {
-
- private static final long serialVersionUID = 1L;
-
- public NullValue map(Edge<K, EV> edge) {
- return NullValue.getInstance();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple2ToVertexMap.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple2ToVertexMap.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple2ToVertexMap.java
deleted file mode 100644
index e51362b..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple2ToVertexMap.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.utils;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
-
-@ForwardedFields("f0; f1")
-public class Tuple2ToVertexMap<K, VV> implements MapFunction<Tuple2<K, VV>, Vertex<K, VV>> {
-
- private static final long serialVersionUID = 1L;
-
- public Vertex<K, VV> map(Tuple2<K, VV> vertex) {
- return new Vertex<K, VV>(vertex.f0, vertex.f1);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple3ToEdgeMap.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple3ToEdgeMap.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple3ToEdgeMap.java
deleted file mode 100644
index 0db9a51..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple3ToEdgeMap.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.utils;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
-
-/**
- * Maps a Tuple3 to an Edge; used to create an Edge DataSet from a Tuple3 DataSet.
- *
- * @param <K>
- * @param <EV>
- */
-@ForwardedFields("f0; f1; f2")
-public class Tuple3ToEdgeMap<K, EV> implements MapFunction<Tuple3<K, K, EV>, Edge<K, EV>> {
-
- private static final long serialVersionUID = 1L;
-
- public Edge<K, EV> map(Tuple3<K, K, EV> tuple) {
- return new Edge<K, EV>(tuple.f0, tuple.f1, tuple.f2);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/VertexToTuple2Map.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/VertexToTuple2Map.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/VertexToTuple2Map.java
deleted file mode 100644
index 04d1f47..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/VertexToTuple2Map.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.utils;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
-
-@ForwardedFields("f0; f1")
-public class VertexToTuple2Map<K, VV> implements MapFunction<Vertex<K, VV>, Tuple2<K, VV>> {
-
- private static final long serialVersionUID = 1L;
-
- public Tuple2<K, VV> map(Vertex<K, VV> vertex) {
- return new Tuple2<K, VV>(vertex.f0, vertex.f1);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/GraphValidator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/GraphValidator.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/GraphValidator.java
deleted file mode 100644
index 75b672c..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/GraphValidator.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.validation;
-
-import java.io.Serializable;
-
-import org.apache.flink.graph.Graph;
-
-/**
- * A utility for defining validation criteria for different types of Graphs.
- *
- * @param <K> the vertex key type
- * @param <VV> the vertex value type
- * @param <EV> the edge value type
- */
-@SuppressWarnings("serial")
-public abstract class GraphValidator<K, VV, EV> implements Serializable {
-
- public abstract boolean validate(Graph<K, VV, EV> graph) throws Exception;
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java
deleted file mode 100644
index 33d469b..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.validation;
-
-import org.apache.flink.api.common.functions.CoGroupFunction;
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.util.Collector;
-
-@SuppressWarnings("serial")
-public class InvalidVertexIdsValidator<K, VV, EV> extends GraphValidator<K, VV, EV> {
-
- /**
- * Checks that the edge set input contains valid vertex Ids, i.e. that they
- * also exist in the vertex input set.
- *
- * @return a boolean stating whether a graph is valid
- * with respect to its vertex ids.
- */
- @Override
- public boolean validate(Graph<K, VV, EV> graph) throws Exception {
- DataSet<Tuple1<K>> edgeIds = graph.getEdges()
- .flatMap(new MapEdgeIds<K, EV>()).distinct();
- DataSet<K> invalidIds = graph.getVertices().coGroup(edgeIds).where(0)
- .equalTo(0).with(new GroupInvalidIds<K, VV>()).first(1);
-
- return invalidIds.map(new KToTupleMap<K>()).count() == 0;
- }
-
- private static final class MapEdgeIds<K, EV> implements FlatMapFunction<Edge<K, EV>, Tuple1<K>> {
- public void flatMap(Edge<K, EV> edge, Collector<Tuple1<K>> out) {
- out.collect(new Tuple1<K>(edge.f0));
- out.collect(new Tuple1<K>(edge.f1));
- }
- }
-
- private static final class GroupInvalidIds<K, VV> implements CoGroupFunction<Vertex<K, VV>, Tuple1<K>, K> {
- public void coGroup(Iterable<Vertex<K, VV>> vertexId,
- Iterable<Tuple1<K>> edgeId, Collector<K> out) {
- if (!(vertexId.iterator().hasNext())) {
- // found an id that doesn't exist in the vertex set
- out.collect(edgeId.iterator().next().f0);
- }
- }
- }
-
- private static final class KToTupleMap<K> implements MapFunction<K, Tuple1<K>> {
- public Tuple1<K> map(K key) throws Exception {
- return new Tuple1<K>(key);
- }
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/gsa/GSACompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/gsa/GSACompilerTest.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/gsa/GSACompilerTest.java
deleted file mode 100644
index 2ad203f..0000000
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/gsa/GSACompilerTest.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.gsa;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.DiscardingOutputFormat;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.utils.Tuple3ToEdgeMap;
-import org.apache.flink.optimizer.dataproperties.PartitioningProperty;
-import org.apache.flink.optimizer.plan.DualInputPlanNode;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plan.PlanNode;
-import org.apache.flink.optimizer.plan.SingleInputPlanNode;
-import org.apache.flink.optimizer.plan.SinkPlanNode;
-import org.apache.flink.optimizer.plan.WorksetIterationPlanNode;
-import org.apache.flink.optimizer.util.CompilerTestBase;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.apache.flink.types.NullValue;
-import org.junit.Test;
-
-public class GSACompilerTest extends CompilerTestBase {
-
- private static final long serialVersionUID = 1L;
-
- @Test
- public void testGSACompiler() {
- try {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(DEFAULT_PARALLELISM);
- // compose test program
- {
- DataSet<Edge<Long, NullValue>> edges = env.fromElements(new Tuple3<Long, Long, NullValue>(
- 1L, 2L, NullValue.getInstance())).map(new Tuple3ToEdgeMap<Long, NullValue>());
-
- Graph<Long, Long, NullValue> graph = Graph.fromDataSet(edges, new InitVertices(), env);
-
- DataSet<Vertex<Long, Long>> result = graph.runGatherSumApplyIteration(
- new GatherNeighborIds(), new SelectMinId(),
- new UpdateComponentId(), 100).getVertices();
-
- result.output(new DiscardingOutputFormat<Vertex<Long, Long>>());
- }
-
- Plan p = env.createProgramPlan("GSA Connected Components");
- OptimizedPlan op = compileNoStats(p);
-
- // check the sink
- SinkPlanNode sink = op.getDataSinks().iterator().next();
- assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
- assertEquals(DEFAULT_PARALLELISM, sink.getParallelism());
- assertEquals(PartitioningProperty.HASH_PARTITIONED, sink.getGlobalProperties().getPartitioning());
-
- // check the iteration
- WorksetIterationPlanNode iteration = (WorksetIterationPlanNode) sink.getInput().getSource();
- assertEquals(DEFAULT_PARALLELISM, iteration.getParallelism());
-
- // check the solution set join and the delta
- PlanNode ssDelta = iteration.getSolutionSetDeltaPlanNode();
- assertTrue(ssDelta instanceof DualInputPlanNode); // this is only true if the update function preserves the partitioning
-
- DualInputPlanNode ssJoin = (DualInputPlanNode) ssDelta;
- assertEquals(DEFAULT_PARALLELISM, ssJoin.getParallelism());
- assertEquals(ShipStrategyType.PARTITION_HASH, ssJoin.getInput1().getShipStrategy());
- assertEquals(new FieldList(0), ssJoin.getInput1().getShipStrategyKeys());
-
- // check the workset set join
- SingleInputPlanNode sumReducer = (SingleInputPlanNode) ssJoin.getInput1().getSource();
- SingleInputPlanNode gatherMapper = (SingleInputPlanNode) sumReducer.getInput().getSource();
- DualInputPlanNode edgeJoin = (DualInputPlanNode) gatherMapper.getInput().getSource();
- assertEquals(DEFAULT_PARALLELISM, edgeJoin.getParallelism());
- // input1 is the workset
- assertEquals(ShipStrategyType.FORWARD, edgeJoin.getInput1().getShipStrategy());
- // input2 is the edges
- assertEquals(ShipStrategyType.PARTITION_HASH, edgeJoin.getInput2().getShipStrategy());
- assertTrue(edgeJoin.getInput2().getTempMode().isCached());
-
- assertEquals(new FieldList(0), edgeJoin.getInput2().getShipStrategyKeys());
- }
- catch (Exception e) {
- System.err.println(e.getMessage());
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @SuppressWarnings("serial")
- private static final class InitVertices implements MapFunction<Long, Long> {
-
- public Long map(Long vertexId) {
- return vertexId;
- }
- }
-
- @SuppressWarnings("serial")
- private static final class GatherNeighborIds extends GatherFunction<Long, NullValue, Long> {
-
- public Long gather(Neighbor<Long, NullValue> neighbor) {
- return neighbor.getNeighborValue();
- }
- };
-
- @SuppressWarnings("serial")
- private static final class SelectMinId extends SumFunction<Long, NullValue, Long> {
-
- public Long sum(Long newValue, Long currentValue) {
- return Math.min(newValue, currentValue);
- }
- };
-
- @SuppressWarnings("serial")
- private static final class UpdateComponentId extends ApplyFunction<Long, Long, Long> {
-
- public void apply(Long summedValue, Long origValue) {
- if (summedValue < origValue) {
- setResult(summedValue);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/gsa/GSATranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/gsa/GSATranslationTest.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/gsa/GSATranslationTest.java
deleted file mode 100644
index ced7508..0000000
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/gsa/GSATranslationTest.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.gsa;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import org.apache.flink.api.common.aggregators.LongSumAggregator;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.DiscardingOutputFormat;
-import org.apache.flink.api.java.operators.DeltaIteration;
-import org.apache.flink.api.java.operators.DeltaIterationResultSet;
-import org.apache.flink.api.java.operators.SingleInputUdfOperator;
-import org.apache.flink.api.java.operators.TwoInputUdfOperator;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.utils.Tuple3ToEdgeMap;
-import org.apache.flink.types.NullValue;
-import org.junit.Test;
-
-public class GSATranslationTest {
-
- @Test
- public void testTranslation() {
- try {
- final String ITERATION_NAME = "Test Name";
-
- final String AGGREGATOR_NAME = "AggregatorName";
-
- final String BC_SET_GATHER_NAME = "gather messages";
-
- final String BC_SET_SUM_NAME = "sum updates";
-
- final String BC_SET_APLLY_NAME = "apply updates";
-
- final int NUM_ITERATIONS = 13;
-
- final int ITERATION_parallelism = 77;
-
-
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Long> bcGather = env.fromElements(1L);
- DataSet<Long> bcSum = env.fromElements(1L);
- DataSet<Long> bcApply = env.fromElements(1L);
-
- DataSet<Vertex<Long, Long>> result;
-
- // ------------ construct the test program ------------------
- {
-
- DataSet<Edge<Long, NullValue>> edges = env.fromElements(new Tuple3<Long, Long, NullValue>(
- 1L, 2L, NullValue.getInstance())).map(new Tuple3ToEdgeMap<Long, NullValue>());
-
- Graph<Long, Long, NullValue> graph = Graph.fromDataSet(edges, new InitVertices(), env);
-
- GSAConfiguration parameters = new GSAConfiguration();
-
- parameters.registerAggregator(AGGREGATOR_NAME, new LongSumAggregator());
- parameters.setName(ITERATION_NAME);
- parameters.setParallelism(ITERATION_parallelism);
- parameters.addBroadcastSetForGatherFunction(BC_SET_GATHER_NAME, bcGather);
- parameters.addBroadcastSetForSumFunction(BC_SET_SUM_NAME, bcSum);
- parameters.addBroadcastSetForApplyFunction(BC_SET_APLLY_NAME, bcApply);
-
- result = graph.runGatherSumApplyIteration(
- new GatherNeighborIds(), new SelectMinId(),
- new UpdateComponentId(), NUM_ITERATIONS, parameters).getVertices();
-
- result.output(new DiscardingOutputFormat<Vertex<Long, Long>>());
- }
-
-
- // ------------- validate the java program ----------------
-
- assertTrue(result instanceof DeltaIterationResultSet);
-
- DeltaIterationResultSet<?, ?> resultSet = (DeltaIterationResultSet<?, ?>) result;
- DeltaIteration<?, ?> iteration = (DeltaIteration<?, ?>) resultSet.getIterationHead();
-
- // check the basic iteration properties
- assertEquals(NUM_ITERATIONS, resultSet.getMaxIterations());
- assertArrayEquals(new int[] {0}, resultSet.getKeyPositions());
- assertEquals(ITERATION_parallelism, iteration.getParallelism());
- assertEquals(ITERATION_NAME, iteration.getName());
-
- assertEquals(AGGREGATOR_NAME, iteration.getAggregators().getAllRegisteredAggregators().iterator().next().getName());
-
- // validate that the semantic properties are set as they should
- TwoInputUdfOperator<?, ?, ?, ?> solutionSetJoin = (TwoInputUdfOperator<?, ?, ?, ?>) resultSet.getNextWorkset();
- assertTrue(solutionSetJoin.getSemanticProperties().getForwardingTargetFields(0, 0).contains(0));
- assertTrue(solutionSetJoin.getSemanticProperties().getForwardingTargetFields(1, 0).contains(0));
-
- SingleInputUdfOperator<?, ?, ?> sumReduce = (SingleInputUdfOperator<?, ?, ?>) solutionSetJoin.getInput1();
- SingleInputUdfOperator<?, ?, ?> gatherMap = (SingleInputUdfOperator<?, ?, ?>) sumReduce.getInput();
-
- // validate that the broadcast sets are forwarded
- assertEquals(bcGather, gatherMap.getBroadcastSets().get(BC_SET_GATHER_NAME));
- assertEquals(bcSum, sumReduce.getBroadcastSets().get(BC_SET_SUM_NAME));
- assertEquals(bcApply, solutionSetJoin.getBroadcastSets().get(BC_SET_APLLY_NAME));
- }
- catch (Exception e) {
- System.err.println(e.getMessage());
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @SuppressWarnings("serial")
- private static final class InitVertices implements MapFunction<Long, Long> {
-
- public Long map(Long vertexId) {
- return vertexId;
- }
- }
-
- @SuppressWarnings("serial")
- private static final class GatherNeighborIds extends GatherFunction<Long, NullValue, Long> {
-
- public Long gather(Neighbor<Long, NullValue> neighbor) {
- return neighbor.getNeighborValue();
- }
- };
-
- @SuppressWarnings("serial")
- private static final class SelectMinId extends SumFunction<Long, NullValue, Long> {
-
- public Long sum(Long newValue, Long currentValue) {
- return Math.min(newValue, currentValue);
- }
- };
-
- @SuppressWarnings("serial")
- private static final class UpdateComponentId extends ApplyFunction<Long, Long, Long> {
-
- public void apply(Long summedValue, Long origValue) {
- if (summedValue < origValue) {
- setResult(summedValue);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelCompilerTest.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelCompilerTest.java
deleted file mode 100644
index 7a8143a..0000000
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelCompilerTest.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.graph.spargel;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.api.java.io.DiscardingOutputFormat;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.optimizer.util.CompilerTestBase;
-import org.junit.Test;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.optimizer.plan.DualInputPlanNode;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plan.PlanNode;
-import org.apache.flink.optimizer.plan.SinkPlanNode;
-import org.apache.flink.optimizer.plan.WorksetIterationPlanNode;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.apache.flink.runtime.operators.util.LocalStrategy;
-import org.apache.flink.types.NullValue;
-import org.apache.flink.graph.library.ConnectedComponents;
-import org.apache.flink.graph.utils.Tuple2ToVertexMap;
-
-
-/**
- * Compiles a vertex-centric connected-components program with the optimizer and checks
- * the ship strategies, partitioning keys and caching chosen for the resulting plan.
- */
-public class SpargelCompilerTest extends CompilerTestBase {
-
- private static final long serialVersionUID = 1L;
-
- /**
-  * Compiles the iteration without any extra configuration and additionally checks
-  * that the initial workset is sorted outside the loop.
-  */
- @Test
- public void testSpargelCompiler() {
-     try {
-         ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-         env.setParallelism(DEFAULT_PARALLELISM);
-
-         // compose test program
-         {
-             Graph<Long, Long, NullValue> graph = createTestGraph(env);
-
-             DataSet<Vertex<Long, Long>> result = graph.runVertexCentricIteration(
-                     new ConnectedComponents.CCUpdater<Long>(),
-                     new ConnectedComponents.CCMessenger<Long>(), 100)
-                     .getVertices();
-
-             result.output(new DiscardingOutputFormat<Vertex<Long, Long>>());
-         }
-
-         Plan p = env.createProgramPlan("Spargel Connected Components");
-         OptimizedPlan op = compileNoStats(p);
-
-         WorksetIterationPlanNode iteration = checkConnectedComponentsPlan(op);
-
-         // check that the initial workset sort is outside the loop
-         assertEquals(LocalStrategy.SORT, iteration.getInput2().getLocalStrategy());
-         assertEquals(new FieldList(0), iteration.getInput2().getLocalStrategyKeys());
-     }
-     catch (Exception e) {
-         System.err.println(e.getMessage());
-         e.printStackTrace();
-         fail(e.getMessage());
-     }
- }
-
- /**
-  * Compiles the same iteration with one broadcast data set registered for both the
-  * messaging and the update function.
-  */
- @Test
- public void testSpargelCompilerWithBroadcastVariable() {
-     try {
-         final String BC_VAR_NAME = "borat variable";
-
-         ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-         env.setParallelism(DEFAULT_PARALLELISM);
-
-         // compose test program
-         {
-             DataSet<Long> bcVar = env.fromElements(1L);
-
-             Graph<Long, Long, NullValue> graph = createTestGraph(env);
-
-             VertexCentricConfiguration parameters = new VertexCentricConfiguration();
-             parameters.addBroadcastSetForMessagingFunction(BC_VAR_NAME, bcVar);
-             parameters.addBroadcastSetForUpdateFunction(BC_VAR_NAME, bcVar);
-
-             // The configuration must be handed to the iteration. Previously it was built
-             // but never passed, so this test compiled exactly the same program as
-             // testSpargelCompiler() and no broadcast set was involved at all.
-             DataSet<Vertex<Long, Long>> result = graph.runVertexCentricIteration(
-                     new ConnectedComponents.CCUpdater<Long>(),
-                     new ConnectedComponents.CCMessenger<Long>(), 100, parameters)
-                     .getVertices();
-
-             result.output(new DiscardingOutputFormat<Vertex<Long, Long>>());
-         }
-
-         Plan p = env.createProgramPlan("Spargel Connected Components");
-         OptimizedPlan op = compileNoStats(p);
-
-         checkConnectedComponentsPlan(op);
-     }
-     catch (Exception e) {
-         System.err.println(e.getMessage());
-         e.printStackTrace();
-         fail(e.getMessage());
-     }
- }
-
- /**
-  * Creates the two-vertex, one-edge graph (1 -> 2) both tests run on. Each vertex
-  * starts with its own ID as value.
-  *
-  * @param env the environment the source data sets are created in
-  * @return the graph built from those data sets
-  */
- private static Graph<Long, Long, NullValue> createTestGraph(ExecutionEnvironment env) {
-     DataSet<Vertex<Long, Long>> initialVertices = env.fromElements(
-             new Tuple2<Long, Long>(1L, 1L), new Tuple2<Long, Long>(2L, 2L))
-             .map(new Tuple2ToVertexMap<Long, Long>());
-
-     DataSet<Edge<Long, NullValue>> edges = env.fromElements(new Tuple2<Long, Long>(1L, 2L))
-             .map(new TupleToEdge());
-
-     return Graph.fromDataSet(initialVertices, edges, env);
- }
-
- /**
-  * Runs the plan assertions shared by both tests: sink, iteration, solution set join,
-  * workset join, and the partitioning of the iteration inputs.
-  *
-  * @param op the optimized plan to inspect
-  * @return the workset iteration node feeding the sink, for test-specific checks
-  */
- private WorksetIterationPlanNode checkConnectedComponentsPlan(OptimizedPlan op) {
-     // check the sink
-     SinkPlanNode sink = op.getDataSinks().iterator().next();
-     assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
-     assertEquals(DEFAULT_PARALLELISM, sink.getParallelism());
-
-     // check the iteration
-     WorksetIterationPlanNode iteration = (WorksetIterationPlanNode) sink.getInput().getSource();
-     assertEquals(DEFAULT_PARALLELISM, iteration.getParallelism());
-
-     // check the solution set join and the delta
-     PlanNode ssDelta = iteration.getSolutionSetDeltaPlanNode();
-     // this is only true if the update function preserves the partitioning
-     assertTrue(ssDelta instanceof DualInputPlanNode);
-
-     DualInputPlanNode ssJoin = (DualInputPlanNode) ssDelta;
-     assertEquals(DEFAULT_PARALLELISM, ssJoin.getParallelism());
-     assertEquals(ShipStrategyType.PARTITION_HASH, ssJoin.getInput1().getShipStrategy());
-     assertEquals(new FieldList(0), ssJoin.getInput1().getShipStrategyKeys());
-
-     // check the workset set join
-     DualInputPlanNode edgeJoin = (DualInputPlanNode) ssJoin.getInput1().getSource();
-     assertEquals(DEFAULT_PARALLELISM, edgeJoin.getParallelism());
-     assertEquals(ShipStrategyType.PARTITION_HASH, edgeJoin.getInput1().getShipStrategy());
-     assertEquals(ShipStrategyType.FORWARD, edgeJoin.getInput2().getShipStrategy());
-     assertTrue(edgeJoin.getInput1().getTempMode().isCached());
-
-     assertEquals(new FieldList(0), edgeJoin.getInput1().getShipStrategyKeys());
-
-     // check that the initial partitioning is pushed out of the loop
-     assertEquals(ShipStrategyType.PARTITION_HASH, iteration.getInput1().getShipStrategy());
-     assertEquals(ShipStrategyType.PARTITION_HASH, iteration.getInput2().getShipStrategy());
-     assertEquals(new FieldList(0), iteration.getInput1().getShipStrategyKeys());
-     assertEquals(new FieldList(0), iteration.getInput2().getShipStrategyKeys());
-
-     return iteration;
- }
-
- /** Turns a (source, target) ID pair into an edge carrying a NullValue. */
- @SuppressWarnings("serial")
- private static final class TupleToEdge implements MapFunction<Tuple2<Long, Long>, Edge<Long, NullValue>> {
-
-     @Override
-     public Edge<Long, NullValue> map(Tuple2<Long, Long> edge) {
-         return new Edge<Long, NullValue>(edge.f0, edge.f1, NullValue.getInstance());
-     }
- }
-}