You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by va...@apache.org on 2015/10/09 18:05:45 UTC
[07/24] flink git commit: [FLINK-2833] [gelly] create a
flink-libraries module and move gelly there
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
deleted file mode 100755
index 23ccb68..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
+++ /dev/null
@@ -1,425 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.gsa;
-
-import org.apache.flink.api.common.aggregators.Aggregator;
-import org.apache.flink.api.common.functions.FlatJoinFunction;
-import org.apache.flink.api.common.functions.RichFlatJoinFunction;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.common.functions.RichReduceFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
-import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond;
-import org.apache.flink.api.java.operators.CustomUnaryOperation;
-import org.apache.flink.api.java.operators.DeltaIteration;
-import org.apache.flink.api.java.operators.JoinOperator;
-import org.apache.flink.api.java.operators.MapOperator;
-import org.apache.flink.api.java.operators.ReduceOperator;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.EdgeDirection;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.util.Collector;
-import java.util.Map;
-
-import com.google.common.base.Preconditions;
-
-/**
- * This class represents iterative graph computations, programmed in a gather-sum-apply perspective.
- *
- * @param <K> The type of the vertex key in the graph
- * @param <VV> The type of the vertex value in the graph
- * @param <EV> The type of the edge value in the graph
- * @param <M> The intermediate type used by the gather, sum and apply functions
- */
-public class GatherSumApplyIteration<K, VV, EV, M> implements CustomUnaryOperation<Vertex<K, VV>,
- Vertex<K, VV>> {
-
- private DataSet<Vertex<K, VV>> vertexDataSet;
- private DataSet<Edge<K, EV>> edgeDataSet;
-
- private final GatherFunction<VV, EV, M> gather;
- private final SumFunction<VV, EV, M> sum;
- private final ApplyFunction<K, VV, M> apply;
- private final int maximumNumberOfIterations;
- private EdgeDirection direction = EdgeDirection.OUT;
-
- private GSAConfiguration configuration;
-
- // ----------------------------------------------------------------------------------
-
- private GatherSumApplyIteration(GatherFunction<VV, EV, M> gather, SumFunction<VV, EV, M> sum,
- ApplyFunction<K, VV, M> apply, DataSet<Edge<K, EV>> edges, int maximumNumberOfIterations) {
-
- Preconditions.checkNotNull(gather);
- Preconditions.checkNotNull(sum);
- Preconditions.checkNotNull(apply);
- Preconditions.checkNotNull(edges);
- Preconditions.checkArgument(maximumNumberOfIterations > 0, "The maximum number of iterations must be at least one.");
-
- this.gather = gather;
- this.sum = sum;
- this.apply = apply;
- this.edgeDataSet = edges;
- this.maximumNumberOfIterations = maximumNumberOfIterations;
- }
-
- // --------------------------------------------------------------------------------------------
- // Custom Operator behavior
- // --------------------------------------------------------------------------------------------
-
- /**
- * Sets the input data set for this operator. In the case of this operator this input data set represents
- * the set of vertices with their initial state.
- *
- * @param dataSet The input data set, which in the case of this operator represents the set of
- * vertices with their initial state.
- */
- @Override
- public void setInput(DataSet<Vertex<K, VV>> dataSet) {
- this.vertexDataSet = dataSet;
- }
-
- /**
- * Computes the results of the gather-sum-apply iteration
- *
- * @return The resulting DataSet
- */
- @Override
- public DataSet<Vertex<K, VV>> createResult() {
- if (vertexDataSet == null) {
- throw new IllegalStateException("The input data set has not been set.");
- }
-
- // Prepare type information
- TypeInformation<K> keyType = ((TupleTypeInfo<?>) vertexDataSet.getType()).getTypeAt(0);
- TypeInformation<M> messageType = TypeExtractor.createTypeInfo(GatherFunction.class, gather.getClass(), 2, null, null);
- TypeInformation<Tuple2<K, M>> innerType = new TupleTypeInfo<Tuple2<K, M>>(keyType, messageType);
- TypeInformation<Vertex<K, VV>> outputType = vertexDataSet.getType();
-
- // create a graph
- Graph<K, VV, EV> graph =
- Graph.fromDataSet(vertexDataSet, edgeDataSet, vertexDataSet.getExecutionEnvironment());
-
- // check whether the numVertices option is set and, if so, compute the total number of vertices
- // and set it within the gather, sum and apply functions
- if (this.configuration != null && this.configuration.isOptNumVertices()) {
- try {
- long numberOfVertices = graph.numberOfVertices();
- gather.setNumberOfVertices(numberOfVertices);
- sum.setNumberOfVertices(numberOfVertices);
- apply.setNumberOfVertices(numberOfVertices);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- // Prepare UDFs
- GatherUdf<K, VV, EV, M> gatherUdf = new GatherUdf<K, VV, EV, M>(gather, innerType);
- SumUdf<K, VV, EV, M> sumUdf = new SumUdf<K, VV, EV, M>(sum, innerType);
- ApplyUdf<K, VV, EV, M> applyUdf = new ApplyUdf<K, VV, EV, M>(apply, outputType);
-
- final int[] zeroKeyPos = new int[] {0};
- final DeltaIteration<Vertex<K, VV>, Vertex<K, VV>> iteration =
- vertexDataSet.iterateDelta(vertexDataSet, maximumNumberOfIterations, zeroKeyPos);
-
- // set up the iteration operator
- if (this.configuration != null) {
-
- iteration.name(this.configuration.getName(
- "Gather-sum-apply iteration (" + gather + " | " + sum + " | " + apply + ")"));
- iteration.parallelism(this.configuration.getParallelism());
- iteration.setSolutionSetUnManaged(this.configuration.isSolutionSetUnmanagedMemory());
-
- // register all aggregators
- for (Map.Entry<String, Aggregator<?>> entry : this.configuration.getAggregators().entrySet()) {
- iteration.registerAggregator(entry.getKey(), entry.getValue());
- }
- }
- else {
- // no configuration provided; set default name
- iteration.name("Gather-sum-apply iteration (" + gather + " | " + sum + " | " + apply + ")");
- }
-
- // Prepare the neighbors
- if(this.configuration != null) {
- direction = this.configuration.getDirection();
- }
- DataSet<Tuple2<K, Neighbor<VV, EV>>> neighbors;
- switch(direction) {
- case OUT:
- neighbors = iteration
- .getWorkset().join(edgeDataSet)
- .where(0).equalTo(0).with(new ProjectKeyWithNeighborOUT<K, VV, EV>());
- break;
- case IN:
- neighbors = iteration
- .getWorkset().join(edgeDataSet)
- .where(0).equalTo(1).with(new ProjectKeyWithNeighborIN<K, VV, EV>());
- break;
- case ALL:
- neighbors = iteration
- .getWorkset().join(edgeDataSet)
- .where(0).equalTo(0).with(new ProjectKeyWithNeighborOUT<K, VV, EV>()).union(iteration
- .getWorkset().join(edgeDataSet)
- .where(0).equalTo(1).with(new ProjectKeyWithNeighborIN<K, VV, EV>()));
- break;
- default:
- neighbors = iteration
- .getWorkset().join(edgeDataSet)
- .where(0).equalTo(0).with(new ProjectKeyWithNeighborOUT<K, VV, EV>());
- break;
- }
-
- // Gather, sum and apply
- MapOperator<Tuple2<K, Neighbor<VV, EV>>, Tuple2<K, M>> gatherMapOperator = neighbors.map(gatherUdf);
-
- // configure map gather function with name and broadcast variables
- gatherMapOperator = gatherMapOperator.name("Gather");
-
- if (this.configuration != null) {
- for (Tuple2<String, DataSet<?>> e : this.configuration.getGatherBcastVars()) {
- gatherMapOperator = gatherMapOperator.withBroadcastSet(e.f1, e.f0);
- }
- }
- DataSet<Tuple2<K, M>> gatheredSet = gatherMapOperator;
-
- ReduceOperator<Tuple2<K, M>> sumReduceOperator = gatheredSet.groupBy(0).reduce(sumUdf);
-
- // configure reduce sum function with name and broadcast variables
- sumReduceOperator = sumReduceOperator.name("Sum");
-
- if (this.configuration != null) {
- for (Tuple2<String, DataSet<?>> e : this.configuration.getSumBcastVars()) {
- sumReduceOperator = sumReduceOperator.withBroadcastSet(e.f1, e.f0);
- }
- }
- DataSet<Tuple2<K, M>> summedSet = sumReduceOperator;
-
- JoinOperator<?, ?, Vertex<K, VV>> appliedSet = summedSet
- .join(iteration.getSolutionSet())
- .where(0)
- .equalTo(0)
- .with(applyUdf);
-
- // configure join apply function with name and broadcast variables
- appliedSet = appliedSet.name("Apply");
-
- if (this.configuration != null) {
- for (Tuple2<String, DataSet<?>> e : this.configuration.getApplyBcastVars()) {
- appliedSet = appliedSet.withBroadcastSet(e.f1, e.f0);
- }
- }
-
- // let the operator know that we preserve the key field
- appliedSet.withForwardedFieldsFirst("0").withForwardedFieldsSecond("0");
-
- return iteration.closeWith(appliedSet, appliedSet);
- }
-
- /**
- * Creates a new gather-sum-apply iteration operator for graphs
- *
- * @param edges The edge DataSet
- *
- * @param gather The gather function of the GSA iteration
- * @param sum The sum function of the GSA iteration
- * @param apply The apply function of the GSA iteration
- *
- * @param maximumNumberOfIterations The maximum number of iterations executed
- *
- * @param <K> The type of the vertex key in the graph
- * @param <VV> The type of the vertex value in the graph
- * @param <EV> The type of the edge value in the graph
- * @param <M> The intermediate type used by the gather, sum and apply functions
- *
- * @return An instance of the gather-sum-apply graph computation operator.
- */
- public static final <K, VV, EV, M> GatherSumApplyIteration<K, VV, EV, M>
- withEdges(DataSet<Edge<K, EV>> edges, GatherFunction<VV, EV, M> gather,
- SumFunction<VV, EV, M> sum, ApplyFunction<K, VV, M> apply, int maximumNumberOfIterations) {
-
- return new GatherSumApplyIteration<K, VV, EV, M>(gather, sum, apply, edges, maximumNumberOfIterations);
- }
-
- // --------------------------------------------------------------------------------------------
- // Wrapping UDFs
- // --------------------------------------------------------------------------------------------
-
- @SuppressWarnings("serial")
- @ForwardedFields("f0")
- private static final class GatherUdf<K, VV, EV, M> extends RichMapFunction<Tuple2<K, Neighbor<VV, EV>>,
- Tuple2<K, M>> implements ResultTypeQueryable<Tuple2<K, M>> {
-
- private final GatherFunction<VV, EV, M> gatherFunction;
- private transient TypeInformation<Tuple2<K, M>> resultType;
-
- private GatherUdf(GatherFunction<VV, EV, M> gatherFunction, TypeInformation<Tuple2<K, M>> resultType) {
- this.gatherFunction = gatherFunction;
- this.resultType = resultType;
- }
-
- @Override
- public Tuple2<K, M> map(Tuple2<K, Neighbor<VV, EV>> neighborTuple) {
- M result = this.gatherFunction.gather(neighborTuple.f1);
- return new Tuple2<K, M>(neighborTuple.f0, result);
- }
-
- @Override
- public void open(Configuration parameters) throws Exception {
- if (getIterationRuntimeContext().getSuperstepNumber() == 1) {
- this.gatherFunction.init(getIterationRuntimeContext());
- }
- this.gatherFunction.preSuperstep();
- }
-
- @Override
- public void close() throws Exception {
- this.gatherFunction.postSuperstep();
- }
-
- @Override
- public TypeInformation<Tuple2<K, M>> getProducedType() {
- return this.resultType;
- }
- }
-
- @SuppressWarnings("serial")
- private static final class SumUdf<K, VV, EV, M> extends RichReduceFunction<Tuple2<K, M>>
- implements ResultTypeQueryable<Tuple2<K, M>>{
-
- private final SumFunction<VV, EV, M> sumFunction;
- private transient TypeInformation<Tuple2<K, M>> resultType;
-
- private SumUdf(SumFunction<VV, EV, M> sumFunction, TypeInformation<Tuple2<K, M>> resultType) {
- this.sumFunction = sumFunction;
- this.resultType = resultType;
- }
-
- @Override
- public Tuple2<K, M> reduce(Tuple2<K, M> arg0, Tuple2<K, M> arg1) throws Exception {
- K key = arg0.f0;
- M result = this.sumFunction.sum(arg0.f1, arg1.f1);
- return new Tuple2<K, M>(key, result);
- }
-
- @Override
- public void open(Configuration parameters) throws Exception {
- if (getIterationRuntimeContext().getSuperstepNumber() == 1) {
- this.sumFunction.init(getIterationRuntimeContext());
- }
- this.sumFunction.preSuperstep();
- }
-
- @Override
- public void close() throws Exception {
- this.sumFunction.postSuperstep();
- }
-
- @Override
- public TypeInformation<Tuple2<K, M>> getProducedType() {
- return this.resultType;
- }
- }
-
- @SuppressWarnings("serial")
- private static final class ApplyUdf<K, VV, EV, M> extends RichFlatJoinFunction<Tuple2<K, M>,
- Vertex<K, VV>, Vertex<K, VV>> implements ResultTypeQueryable<Vertex<K, VV>> {
-
- private final ApplyFunction<K, VV, M> applyFunction;
- private transient TypeInformation<Vertex<K, VV>> resultType;
-
- private ApplyUdf(ApplyFunction<K, VV, M> applyFunction, TypeInformation<Vertex<K, VV>> resultType) {
- this.applyFunction = applyFunction;
- this.resultType = resultType;
- }
-
- @Override
- public void join(Tuple2<K, M> newValue, final Vertex<K, VV> currentValue, final Collector<Vertex<K, VV>> out) throws Exception {
-
- this.applyFunction.setOutput(currentValue, out);
- this.applyFunction.apply(newValue.f1, currentValue.getValue());
- }
-
- @Override
- public void open(Configuration parameters) throws Exception {
- if (getIterationRuntimeContext().getSuperstepNumber() == 1) {
- this.applyFunction.init(getIterationRuntimeContext());
- }
- this.applyFunction.preSuperstep();
- }
-
- @Override
- public void close() throws Exception {
- this.applyFunction.postSuperstep();
- }
-
- @Override
- public TypeInformation<Vertex<K, VV>> getProducedType() {
- return this.resultType;
- }
- }
-
- @SuppressWarnings("serial")
- @ForwardedFieldsSecond("f1->f0")
- private static final class ProjectKeyWithNeighborOUT<K, VV, EV> implements FlatJoinFunction<
- Vertex<K, VV>, Edge<K, EV>, Tuple2<K, Neighbor<VV, EV>>> {
-
- public void join(Vertex<K, VV> vertex, Edge<K, EV> edge, Collector<Tuple2<K, Neighbor<VV, EV>>> out) {
- out.collect(new Tuple2<K, Neighbor<VV, EV>>(
- edge.getTarget(), new Neighbor<VV, EV>(vertex.getValue(), edge.getValue())));
- }
- }
-
- @SuppressWarnings("serial")
- @ForwardedFieldsSecond({"f0"})
- private static final class ProjectKeyWithNeighborIN<K, VV, EV> implements FlatJoinFunction<
- Vertex<K, VV>, Edge<K, EV>, Tuple2<K, Neighbor<VV, EV>>> {
-
- public void join(Vertex<K, VV> vertex, Edge<K, EV> edge, Collector<Tuple2<K, Neighbor<VV, EV>>> out) {
- out.collect(new Tuple2<K, Neighbor<VV, EV>>(
- edge.getSource(), new Neighbor<VV, EV>(vertex.getValue(), edge.getValue())));
- }
- }
-
-
-
-
- /**
- * Configures this gather-sum-apply iteration with the provided parameters.
- *
- * @param parameters the configuration parameters
- */
- public void configure(GSAConfiguration parameters) {
- this.configuration = parameters;
- }
-
- /**
- * @return the configuration parameters of this gather-sum-apply iteration
- */
- public GSAConfiguration getIterationConfiguration() {
- return this.configuration;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/Neighbor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/Neighbor.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/Neighbor.java
deleted file mode 100755
index 7fa1ed2..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/Neighbor.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.gsa;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-
-/**
- * This class represents a {@code <sourceVertex, edge>} pair.
- * This is a wrapper around {@code Tuple2<VV, EV>} for convenience in the GatherFunction.
- * @param <VV> the vertex value type
- * @param <EV> the edge value type
- */
-@SuppressWarnings("serial")
-public class Neighbor<VV, EV> extends Tuple2<VV, EV> {
-
- public Neighbor() {}
-
- public Neighbor(VV neighborValue, EV edgeValue) {
- super(neighborValue, edgeValue);
- }
-
- public VV getNeighborValue() {
- return this.f0;
- }
-
- public EV getEdgeValue() {
- return this.f1;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java
deleted file mode 100755
index f27e275..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.gsa;
-
-import org.apache.flink.api.common.aggregators.Aggregator;
-import org.apache.flink.api.common.functions.IterationRuntimeContext;
-import org.apache.flink.types.Value;
-
-import java.io.Serializable;
-import java.util.Collection;
-
-/**
- * The base class for the second step of a {@link GatherSumApplyIteration}.
- *
- * @param <VV> the vertex value type
- * @param <EV> the edge value type
- * @param <M> the output type
- */
-@SuppressWarnings("serial")
-public abstract class SumFunction<VV, EV, M> implements Serializable {
-
- // --------------------------------------------------------------------------------------------
- // Attribute that allows access to the total number of vertices inside an iteration.
- // --------------------------------------------------------------------------------------------
-
- private long numberOfVertices = -1L;
-
- /**
- * Retrieves the number of vertices in the graph.
- * @return the number of vertices if the {@link IterationConfiguration#setOptNumVertices(boolean)}
- * option has been set; -1 otherwise.
- */
- public long getNumberOfVertices() {
- return numberOfVertices;
- }
-
- void setNumberOfVertices(long numberOfVertices) {
- this.numberOfVertices = numberOfVertices;
- }
-
- //---------------------------------------------------------------------------------------------
- /**
- * This method is invoked once per superstep, after the {@link GatherFunction}
- * in a {@link GatherSumApplyIteration}.
- * It combines the partial values produced by {@link GatherFunction#gather(Neighbor)}
- * in pairs, until a single value has been computed.
- *
- * @param arg0 the first partial value.
- * @param arg1 the second partial value.
- * @return the combined value.
- */
- public abstract M sum(M arg0, M arg1);
-
- /**
- * This method is executed once per superstep before the sum function is invoked.
- *
- * @throws Exception Exceptions in the pre-superstep phase cause the superstep to fail.
- */
- public void preSuperstep() {}
-
- /**
- * This method is executed once per superstep after the sum function has been invoked.
- *
- * @throws Exception Exceptions in the post-superstep phase cause the superstep to fail.
- */
- public void postSuperstep() {}
-
- /**
- * Gets the number of the superstep, starting at <tt>1</tt>.
- *
- * @return The number of the current superstep.
- */
- public int getSuperstepNumber() {
- return this.runtimeContext.getSuperstepNumber();
- }
-
- /**
- * Gets the iteration aggregator registered under the given name. The iteration aggregator combines
- * all aggregates globally once per superstep and makes them available in the next superstep.
- *
- * @param name The name of the aggregator.
- * @return The aggregator registered under this name, or null, if no aggregator was registered.
- */
- public <T extends Aggregator<?>> T getIterationAggregator(String name) {
- return this.runtimeContext.<T>getIterationAggregator(name);
- }
-
- /**
- * Get the aggregated value that an aggregator computed in the previous iteration.
- *
- * @param name The name of the aggregator.
- * @return The aggregated value of the previous iteration.
- */
- public <T extends Value> T getPreviousIterationAggregate(String name) {
- return this.runtimeContext.<T>getPreviousIterationAggregate(name);
- }
-
- /**
- * Gets the broadcast data set registered under the given name. Broadcast data sets
- * are available on all parallel instances of a function.
- *
- * @param name The name under which the broadcast set is registered.
- * @return The broadcast data set.
- */
- public <T> Collection<T> getBroadcastSet(String name) {
- return this.runtimeContext.<T>getBroadcastVariable(name);
- }
-
- // --------------------------------------------------------------------------------------------
- // Internal methods
- // --------------------------------------------------------------------------------------------
-
- private IterationRuntimeContext runtimeContext;
-
- public void init(IterationRuntimeContext iterationRuntimeContext) {
- this.runtimeContext = iterationRuntimeContext;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/CommunityDetection.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/CommunityDetection.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/CommunityDetection.java
deleted file mode 100644
index 0dd39fc..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/CommunityDetection.java
+++ /dev/null
@@ -1,183 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.library;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.GraphAlgorithm;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.spargel.MessageIterator;
-import org.apache.flink.graph.spargel.MessagingFunction;
-import org.apache.flink.graph.spargel.VertexUpdateFunction;
-
-import java.util.Map;
-import java.util.TreeMap;
-
-/**
- * Community Detection Algorithm.
- *
- * This implementation expects Long Vertex values and labels. The Vertex values of the input Graph provide the initial label assignments.
- *
- * Initially, each vertex is assigned a tuple formed of its own initial value along with a score equal to 1.0.
- * The vertices propagate their labels and max scores in iterations, each time adopting the label with the
- * highest score from the list of received messages. The chosen label is afterwards re-scored using the fraction
- * delta/the superstep number. Delta is passed as a parameter and has 0.5 as a default value.
- *
- * The algorithm converges when vertices no longer update their value or when the maximum number of iterations
- * is reached.
- *
- * @param <K> the Vertex ID type
- *
- * @see <a href="http://arxiv.org/pdf/0808.2633.pdf">article explaining the algorithm in detail</a>
- */
-public class CommunityDetection<K> implements GraphAlgorithm<K, Long, Double, Graph<K, Long, Double>> {
-
- private Integer maxIterations;
-
- private Double delta;
-
- public CommunityDetection(Integer maxIterations, Double delta) {
-
- this.maxIterations = maxIterations;
- this.delta = delta;
- }
-
- @Override
- public Graph<K, Long, Double> run(Graph<K, Long, Double> graph) {
-
- DataSet<Vertex<K, Tuple2<Long, Double>>> initializedVertices = graph.getVertices()
- .map(new AddScoreToVertexValuesMapper<K>());
-
- Graph<K, Tuple2<Long, Double>, Double> graphWithScoredVertices =
- Graph.fromDataSet(initializedVertices, graph.getEdges(), graph.getContext()).getUndirected();
-
- return graphWithScoredVertices.runVertexCentricIteration(new VertexLabelUpdater<K>(delta),
- new LabelMessenger<K>(), maxIterations)
- .mapVertices(new RemoveScoreFromVertexValuesMapper<K>());
- }
-
- @SuppressWarnings("serial")
- public static final class VertexLabelUpdater<K> extends VertexUpdateFunction<
- K, Tuple2<Long, Double>, Tuple2<Long, Double>> {
-
- private Double delta;
-
- public VertexLabelUpdater(Double delta) {
- this.delta = delta;
- }
-
- @Override
- public void updateVertex(Vertex<K, Tuple2<Long, Double>> vertex,
- MessageIterator<Tuple2<Long, Double>> inMessages) throws Exception {
-
- // we would like these two maps to be ordered
- Map<Long, Double> receivedLabelsWithScores = new TreeMap<Long, Double>();
- Map<Long, Double> labelsWithHighestScore = new TreeMap<Long, Double>();
-
- for (Tuple2<Long, Double> message : inMessages) {
- // split the message into received label and score
- Long receivedLabel = message.f0;
- Double receivedScore = message.f1;
-
- // if the label was received before
- if (receivedLabelsWithScores.containsKey(receivedLabel)) {
- Double newScore = receivedScore + receivedLabelsWithScores.get(receivedLabel);
- receivedLabelsWithScores.put(receivedLabel, newScore);
- } else {
- // first time we see the label
- receivedLabelsWithScores.put(receivedLabel, receivedScore);
- }
-
- // store the labels with the highest scores
- if (labelsWithHighestScore.containsKey(receivedLabel)) {
- Double currentScore = labelsWithHighestScore.get(receivedLabel);
- if (currentScore < receivedScore) {
- // record the highest score
- labelsWithHighestScore.put(receivedLabel, receivedScore);
- }
- } else {
- // first time we see this label
- labelsWithHighestScore.put(receivedLabel, receivedScore);
- }
- }
-
- if(receivedLabelsWithScores.size() > 0) {
- // find the label with the highest score from the ones received
- Double maxScore = -Double.MAX_VALUE;
- Long maxScoreLabel = vertex.getValue().f0;
- for (Long curLabel : receivedLabelsWithScores.keySet()) {
-
- if (receivedLabelsWithScores.get(curLabel) > maxScore) {
- maxScore = receivedLabelsWithScores.get(curLabel);
- maxScoreLabel = curLabel;
- }
- }
-
- // find the highest score of maxScoreLabel
- Double highestScore = labelsWithHighestScore.get(maxScoreLabel);
- // re-score the new label
- if (maxScoreLabel != vertex.getValue().f0) {
- highestScore -= delta / getSuperstepNumber();
- }
- // else delta = 0
- // update own label
- setNewVertexValue(new Tuple2<Long, Double>(maxScoreLabel, highestScore));
- }
- }
- }
-
- @SuppressWarnings("serial")
- public static final class LabelMessenger<K> extends MessagingFunction<K, Tuple2<Long, Double>,
- Tuple2<Long, Double>, Double> {
-
- @Override
- public void sendMessages(Vertex<K, Tuple2<Long, Double>> vertex) throws Exception {
-
- for(Edge<K, Double> edge : getEdges()) {
- sendMessageTo(edge.getTarget(), new Tuple2<Long, Double>(vertex.getValue().f0,
- vertex.getValue().f1 * edge.getValue()));
- }
- }
- }
-
- @SuppressWarnings("serial")
- @ForwardedFields("f0")
- public static final class AddScoreToVertexValuesMapper<K> implements MapFunction<
- Vertex<K, Long>, Vertex<K, Tuple2<Long, Double>>> {
-
- public Vertex<K, Tuple2<Long, Double>> map(Vertex<K, Long> vertex) {
- return new Vertex<K, Tuple2<Long, Double>>(
- vertex.getId(), new Tuple2<Long, Double>(vertex.getValue(), 1.0));
- }
- }
-
- @SuppressWarnings("serial")
- public static final class RemoveScoreFromVertexValuesMapper<K> implements MapFunction<
- Vertex<K, Tuple2<Long, Double>>, Long> {
-
- @Override
- public Long map(Vertex<K, Tuple2<Long, Double>> vertex) throws Exception {
- return vertex.getValue().f0;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java
deleted file mode 100644
index ed853fe..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.library;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.GraphAlgorithm;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.spargel.MessageIterator;
-import org.apache.flink.graph.spargel.MessagingFunction;
-import org.apache.flink.graph.spargel.VertexUpdateFunction;
-import org.apache.flink.graph.utils.NullValueEdgeMapper;
-import org.apache.flink.types.NullValue;
-
-/**
- * A vertex-centric implementation of the Connected Components algorithm.
- *
- * This implementation assumes that the vertices of the input Graph are initialized with unique, Long component IDs.
- * The vertices propagate their current component ID in iterations, each time adopting a new value from the received neighbor IDs,
- * provided that the value is less than the current minimum.
- *
- * The algorithm converges when vertices no longer update their value or when the maximum number of iterations
- * is reached.
- *
- * The result is a DataSet of vertices, where the vertex value corresponds to the assigned component ID.
- *
- * @see org.apache.flink.graph.library.GSAConnectedComponents
- */
-@SuppressWarnings("serial")
-public class ConnectedComponents<K, EV> implements GraphAlgorithm<K, Long, EV, DataSet<Vertex<K, Long>>> {
-
- private Integer maxIterations;
-
- public ConnectedComponents(Integer maxIterations) {
- this.maxIterations = maxIterations;
- }
-
- @Override
- public DataSet<Vertex<K, Long>> run(Graph<K, Long, EV> graph) throws Exception {
-
- Graph<K, Long, NullValue> undirectedGraph = graph.mapEdges(new NullValueEdgeMapper<K, EV>())
- .getUndirected();
-
- // run the vertex-centric iteration on the undirected graph
- return undirectedGraph.runVertexCentricIteration(
- new CCUpdater<K>(), new CCMessenger<K>(), maxIterations)
- .getVertices();
- }
-
- /**
- * Updates the value of a vertex by picking the minimum neighbor ID out of all the incoming messages.
- */
- public static final class CCUpdater<K> extends VertexUpdateFunction<K, Long, Long> {
-
- @Override
- public void updateVertex(Vertex<K, Long> vertex, MessageIterator<Long> messages) throws Exception {
- long min = Long.MAX_VALUE;
-
- for (long msg : messages) {
- min = Math.min(min, msg);
- }
-
- // update vertex value, if new minimum
- if (min < vertex.getValue()) {
- setNewVertexValue(min);
- }
- }
- }
-
- /**
- * Distributes the minimum ID associated with a given vertex among all the target vertices.
- */
- public static final class CCMessenger<K> extends MessagingFunction<K, Long, Long, NullValue> {
-
- @Override
- public void sendMessages(Vertex<K, Long> vertex) throws Exception {
- // send current minimum to neighbors
- sendMessageToAllNeighbors(vertex.getValue());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java
deleted file mode 100755
index 77bc2cf..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.library;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.GraphAlgorithm;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.gsa.ApplyFunction;
-import org.apache.flink.graph.gsa.GatherFunction;
-import org.apache.flink.graph.gsa.SumFunction;
-import org.apache.flink.graph.gsa.Neighbor;
-import org.apache.flink.graph.utils.NullValueEdgeMapper;
-import org.apache.flink.types.NullValue;
-
-/**
- * This is an implementation of the Connected Components algorithm, using a gather-sum-apply iteration.
- * This implementation assumes that the vertices of the input Graph are initialized with unique, Long component IDs.
- * The result is a DataSet of vertices, where the vertex value corresponds to the assigned component ID.
- *
- * @see org.apache.flink.graph.library.ConnectedComponents
- */
-public class GSAConnectedComponents<K, EV> implements GraphAlgorithm<K, Long, EV, DataSet<Vertex<K, Long>>> {
-
- private Integer maxIterations;
-
- public GSAConnectedComponents(Integer maxIterations) {
- this.maxIterations = maxIterations;
- }
-
- @Override
- public DataSet<Vertex<K, Long>> run(Graph<K, Long, EV> graph) throws Exception {
-
- Graph<K, Long, NullValue> undirectedGraph = graph.mapEdges(new NullValueEdgeMapper<K, EV>())
- .getUndirected();
-
- // run the gather-sum-apply iteration on the undirected graph
- return undirectedGraph.runGatherSumApplyIteration(
- new GatherNeighborIds(), new SelectMinId(), new UpdateComponentId<K>(),
- maxIterations).getVertices();
- }
-
- // --------------------------------------------------------------------------------------------
- // Connected Components UDFs
- // --------------------------------------------------------------------------------------------
-
- @SuppressWarnings("serial")
- private static final class GatherNeighborIds extends GatherFunction<Long, NullValue, Long> {
-
- public Long gather(Neighbor<Long, NullValue> neighbor) {
- return neighbor.getNeighborValue();
- }
- };
-
- @SuppressWarnings("serial")
- private static final class SelectMinId extends SumFunction<Long, NullValue, Long> {
-
- public Long sum(Long newValue, Long currentValue) {
- return Math.min(newValue, currentValue);
- }
- };
-
- @SuppressWarnings("serial")
- private static final class UpdateComponentId<K> extends ApplyFunction<K, Long, Long> {
-
- public void apply(Long summedValue, Long origValue) {
- if (summedValue < origValue) {
- setResult(summedValue);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java
deleted file mode 100644
index df3e89a..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.library;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.GraphAlgorithm;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.gsa.ApplyFunction;
-import org.apache.flink.graph.gsa.GatherFunction;
-import org.apache.flink.graph.gsa.Neighbor;
-import org.apache.flink.graph.gsa.SumFunction;
-
-/**
- * This is an implementation of a simple PageRank algorithm, using a gather-sum-apply iteration.
- * The user can define the damping factor and the maximum number of iterations.
- * If the number of vertices of the input graph is known, it should be provided as a parameter
- * to speed up computation. Otherwise, the algorithm will first execute a job to count the vertices.
- *
- * The implementation assumes that each page has at least one incoming and one outgoing link.
- */
-public class GSAPageRank<K> implements GraphAlgorithm<K, Double, Double, DataSet<Vertex<K, Double>>> {
-
- private double beta;
- private int maxIterations;
- private long numberOfVertices;
-
- /**
- * @param beta the damping factor
- * @param maxIterations the maximum number of iterations
- */
- public GSAPageRank(double beta, int maxIterations) {
- this.beta = beta;
- this.maxIterations = maxIterations;
- }
-
- public GSAPageRank(double beta, long numVertices, int maxIterations) {
- this.beta = beta;
- this.numberOfVertices = numVertices;
- this.maxIterations = maxIterations;
- }
-
- @Override
- public DataSet<Vertex<K, Double>> run(Graph<K, Double, Double> network) throws Exception {
-
- if (numberOfVertices == 0) {
- numberOfVertices = network.numberOfVertices();
- }
-
- DataSet<Tuple2<K, Long>> vertexOutDegrees = network.outDegrees();
-
- Graph<K, Double, Double> networkWithWeights = network
- .joinWithEdgesOnSource(vertexOutDegrees, new InitWeightsMapper());
-
- return networkWithWeights.runGatherSumApplyIteration(new GatherRanks(numberOfVertices), new SumRanks(),
- new UpdateRanks<K>(beta, numberOfVertices), maxIterations)
- .getVertices();
- }
-
- // --------------------------------------------------------------------------------------------
- // Page Rank UDFs
- // --------------------------------------------------------------------------------------------
-
- @SuppressWarnings("serial")
- private static final class GatherRanks extends GatherFunction<Double, Double, Double> {
-
- long numberOfVertices;
-
- public GatherRanks(long numberOfVertices) {
- this.numberOfVertices = numberOfVertices;
- }
-
- @Override
- public Double gather(Neighbor<Double, Double> neighbor) {
- double neighborRank = neighbor.getNeighborValue();
-
- if(getSuperstepNumber() == 1) {
- neighborRank = 1.0 / numberOfVertices;
- }
-
- return neighborRank * neighbor.getEdgeValue();
- }
- }
-
- @SuppressWarnings("serial")
- private static final class SumRanks extends SumFunction<Double, Double, Double> {
-
- @Override
- public Double sum(Double newValue, Double currentValue) {
- return newValue + currentValue;
- }
- }
-
- @SuppressWarnings("serial")
- private static final class UpdateRanks<K> extends ApplyFunction<K, Double, Double> {
-
- private final double beta;
- private final long numVertices;
-
- public UpdateRanks(double beta, long numberOfVertices) {
- this.beta = beta;
- this.numVertices = numberOfVertices;
- }
-
- @Override
- public void apply(Double rankSum, Double currentValue) {
- setResult((1-beta)/numVertices + beta * rankSum);
- }
- }
-
- @SuppressWarnings("serial")
- private static final class InitWeightsMapper implements MapFunction<Tuple2<Double, Long>, Double> {
- public Double map(Tuple2<Double, Long> value) {
- return value.f0 / value.f1;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSASingleSourceShortestPaths.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSASingleSourceShortestPaths.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSASingleSourceShortestPaths.java
deleted file mode 100755
index 5a76072..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSASingleSourceShortestPaths.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.library;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.GraphAlgorithm;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.gsa.ApplyFunction;
-import org.apache.flink.graph.gsa.GatherFunction;
-import org.apache.flink.graph.gsa.SumFunction;
-import org.apache.flink.graph.gsa.Neighbor;
-
-/**
- * This is an implementation of the Single Source Shortest Paths algorithm, using a gather-sum-apply iteration.
- */
-public class GSASingleSourceShortestPaths<K> implements
- GraphAlgorithm<K, Double, Double, DataSet<Vertex<K, Double>>> {
-
- private final K srcVertexId;
- private final Integer maxIterations;
-
- public GSASingleSourceShortestPaths(K srcVertexId, Integer maxIterations) {
- this.srcVertexId = srcVertexId;
- this.maxIterations = maxIterations;
- }
-
- @Override
- public DataSet<Vertex<K, Double>> run(Graph<K, Double, Double> input) {
-
- return input.mapVertices(new InitVerticesMapper<K>(srcVertexId))
- .runGatherSumApplyIteration(new CalculateDistances(), new ChooseMinDistance(),
- new UpdateDistance<K>(), maxIterations)
- .getVertices();
- }
-
- @SuppressWarnings("serial")
- public static final class InitVerticesMapper<K> implements MapFunction<Vertex<K, Double>, Double> {
-
- private K srcVertexId;
-
- public InitVerticesMapper(K srcId) {
- this.srcVertexId = srcId;
- }
-
- public Double map(Vertex<K, Double> value) {
- if (value.f0.equals(srcVertexId)) {
- return 0.0;
- } else {
- return Double.MAX_VALUE;
- }
- }
- }
-
- // --------------------------------------------------------------------------------------------
- // Single Source Shortest Path UDFs
- // --------------------------------------------------------------------------------------------
-
- @SuppressWarnings("serial")
- private static final class CalculateDistances extends GatherFunction<Double, Double, Double> {
-
- public Double gather(Neighbor<Double, Double> neighbor) {
- return neighbor.getNeighborValue() + neighbor.getEdgeValue();
- }
- };
-
- @SuppressWarnings("serial")
- private static final class ChooseMinDistance extends SumFunction<Double, Double, Double> {
-
- public Double sum(Double newValue, Double currentValue) {
- return Math.min(newValue, currentValue);
- }
- };
-
- @SuppressWarnings("serial")
- private static final class UpdateDistance<K> extends ApplyFunction<K, Double, Double> {
-
- public void apply(Double newDistance, Double oldDistance) {
- if (newDistance < oldDistance) {
- setResult(newDistance);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSATriangleCount.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSATriangleCount.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSATriangleCount.java
deleted file mode 100644
index 76d170d..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSATriangleCount.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.library;
-
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.graph.GraphAlgorithm;
-import org.apache.flink.graph.ReduceNeighborsFunction;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Triplet;
-import org.apache.flink.graph.EdgeDirection;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.types.NullValue;
-
-import java.util.TreeMap;
-
-/**
- * Triangle Count Algorithm.
- *
- * This algorithm operates in three phases. First, vertices select neighbors with id greater than theirs
- * and send messages to them. Each received message is then propagated to neighbors with higher id.
- * Finally, if a node encounters the target id in the list of received messages, it increments the number
- * of triangles found.
- *
- * This implementation is non-iterative.
- *
- * The algorithm takes an undirected, unweighted graph as input and outputs a DataSet
- * which contains a single integer representing the number of triangles.
- */
-public class GSATriangleCount<K extends Comparable<K>, VV, EV> implements
- GraphAlgorithm<K, VV, EV, DataSet<Integer>> {
-
- @SuppressWarnings("serial")
- @Override
- public DataSet<Integer> run(Graph<K, VV, EV> input) throws Exception {
-
- ExecutionEnvironment env = input.getContext();
-
- // order the edges so that src is always higher than trg
- DataSet<Edge<K, NullValue>> edges = input.getEdges().map(new OrderEdges<K, EV>()).distinct();
-
- Graph<K, TreeMap<K, Integer>, NullValue> graph = Graph.fromDataSet(edges,
- new VertexInitializer<K>(), env);
-
- // select neighbors with ids higher than the current vertex id
- // Gather: a no-op in this case
- // Sum: create the set of neighbors
- DataSet<Tuple2<K, TreeMap<K, Integer>>> higherIdNeighbors =
- graph.reduceOnNeighbors(new GatherHigherIdNeighbors<K>(), EdgeDirection.IN);
-
- Graph<K, TreeMap<K, Integer>, NullValue> graphWithReinitializedVertexValues =
- graph.mapVertices(new VertexInitializerEmptyTreeMap<K>());
-
- // Apply: attach the computed values to the vertices
- // joinWithVertices to update the node values
- DataSet<Vertex<K, TreeMap<K, Integer>>> verticesWithHigherIdNeighbors =
- graphWithReinitializedVertexValues.joinWithVertices(higherIdNeighbors, new AttachValues<K>()).getVertices();
-
- Graph<K, TreeMap<K,Integer>, NullValue> graphWithNeighbors = Graph.fromDataSet(verticesWithHigherIdNeighbors,
- edges, env);
-
- // propagate each received value to neighbors with higher id
- // Gather: a no-op in this case
- // Sum: propagate values
- DataSet<Tuple2<K, TreeMap<K, Integer>>> propagatedValues = graphWithNeighbors
- .reduceOnNeighbors(new GatherHigherIdNeighbors<K>(), EdgeDirection.IN);
-
- // Apply: attach propagated values to vertices
- DataSet<Vertex<K, TreeMap<K, Integer>>> verticesWithPropagatedValues =
- graphWithReinitializedVertexValues.joinWithVertices(propagatedValues, new AttachValues<K>()).getVertices();
-
- Graph<K, TreeMap<K, Integer>, NullValue> graphWithPropagatedNeighbors =
- Graph.fromDataSet(verticesWithPropagatedValues, graphWithNeighbors.getEdges(), env);
-
- // Scatter: compute the number of triangles
- DataSet<Integer> numberOfTriangles = graphWithPropagatedNeighbors.getTriplets()
- .map(new ComputeTriangles<K>()).reduce(new ReduceFunction<Integer>() {
-
- @Override
- public Integer reduce(Integer first, Integer second) throws Exception {
- return first + second;
- }
- });
-
- return numberOfTriangles;
- }
-
- @SuppressWarnings("serial")
- private static final class OrderEdges<K extends Comparable<K>, EV> implements
- MapFunction<Edge<K, EV>, Edge<K, NullValue>> {
-
- @Override
- public Edge<K, NullValue> map(Edge<K, EV> edge) throws Exception {
- if (edge.getSource().compareTo(edge.getTarget()) < 0) {
- return new Edge<K, NullValue>(edge.getTarget(), edge.getSource(), NullValue.getInstance());
- } else {
- return new Edge<K, NullValue>(edge.getSource(), edge.getTarget(), NullValue.getInstance());
- }
- }
- }
-
- @SuppressWarnings("serial")
- private static final class VertexInitializer<K> implements MapFunction<K, TreeMap<K, Integer>> {
-
- @Override
- public TreeMap<K, Integer> map(K value) throws Exception {
- TreeMap<K, Integer> neighbors = new TreeMap<K, Integer>();
- neighbors.put(value, 1);
-
- return neighbors;
- }
- }
-
- @SuppressWarnings("serial")
- private static final class VertexInitializerEmptyTreeMap<K> implements
- MapFunction<Vertex<K, TreeMap<K, Integer>>, TreeMap<K, Integer>> {
-
- @Override
- public TreeMap<K, Integer> map(Vertex<K, TreeMap<K, Integer>> vertex) throws Exception {
- return new TreeMap<K, Integer>();
- }
- }
-
- @SuppressWarnings("serial")
- private static final class AttachValues<K> implements MapFunction<Tuple2<TreeMap<K, Integer>,
- TreeMap<K, Integer>>, TreeMap<K, Integer>> {
-
- @Override
- public TreeMap<K, Integer> map(Tuple2<TreeMap<K, Integer>, TreeMap<K, Integer>> tuple2) throws Exception {
- return tuple2.f1;
- }
- }
-
- @SuppressWarnings("serial")
- private static final class GatherHigherIdNeighbors<K> implements
- ReduceNeighborsFunction<TreeMap<K,Integer>> {
-
- @Override
- public TreeMap<K, Integer> reduceNeighbors(TreeMap<K,Integer> first, TreeMap<K,Integer> second) {
- for (K key : second.keySet()) {
- Integer value = first.get(key);
- if (value != null) {
- first.put(key, value + second.get(key));
- } else {
- first.put(key, second.get(key));
- }
- }
- return first;
- }
- }
-
- @SuppressWarnings("serial")
- private static final class ComputeTriangles<K> implements MapFunction<Triplet<K, TreeMap<K, Integer>, NullValue>,
- Integer> {
-
- @Override
- public Integer map(Triplet<K, TreeMap<K, Integer>, NullValue> triplet) throws Exception {
-
- Vertex<K, TreeMap<K, Integer>> srcVertex = triplet.getSrcVertex();
- Vertex<K, TreeMap<K, Integer>> trgVertex = triplet.getTrgVertex();
- int triangles = 0;
-
- if(trgVertex.getValue().get(srcVertex.getId()) != null) {
- triangles = trgVertex.getValue().get(srcVertex.getId());
- }
- return triangles;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
deleted file mode 100644
index 82dfee7..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.library;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.GraphAlgorithm;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.spargel.MessageIterator;
-import org.apache.flink.graph.spargel.MessagingFunction;
-import org.apache.flink.graph.spargel.VertexUpdateFunction;
-import org.apache.flink.graph.utils.NullValueEdgeMapper;
-import org.apache.flink.types.NullValue;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-
-/**
- * An implementation of the label propagation algorithm. The iterative algorithm
- * detects communities by propagating labels. In each iteration, a vertex adopts
- * the label that is most frequent among its neighbors' labels. Labels are
- * represented by Longs and we assume a total ordering among them, in order to
- * break ties. The algorithm converges when no vertex changes its value or the
- * maximum number of iterations has been reached. Note that different
- * initializations might lead to different results.
- *
- */
-@SuppressWarnings("serial")
-
-public class LabelPropagation<K extends Comparable<K>, EV> implements GraphAlgorithm<K, Long, EV,
- DataSet<Vertex<K, Long>>> {
-
- private final int maxIterations;
-
- public LabelPropagation(int maxIterations) {
- this.maxIterations = maxIterations;
- }
-
- @Override
- public DataSet<Vertex<K, Long>> run(Graph<K, Long, EV> input) {
-
- // iteratively adopt the most frequent label among the neighbors
- // of each vertex
- return input.mapEdges(new NullValueEdgeMapper<K, EV>()).runVertexCentricIteration(new UpdateVertexLabel<K>(), new SendNewLabelToNeighbors<K>(),
- maxIterations).getVertices();
- }
-
- /**
- * Function that updates the value of a vertex by adopting the most frequent
- * label among its in-neighbors
- */
- public static final class UpdateVertexLabel<K> extends VertexUpdateFunction<K, Long, Long> {
-
- public void updateVertex(Vertex<K, Long> vertex,
- MessageIterator<Long> inMessages) {
- Map<Long, Long> labelsWithFrequencies = new HashMap<Long, Long>();
-
- long maxFrequency = 1;
- long mostFrequentLabel = vertex.getValue();
-
- // store the labels with their frequencies
- for (Long msg : inMessages) {
- if (labelsWithFrequencies.containsKey(msg)) {
- long currentFreq = labelsWithFrequencies.get(msg);
- labelsWithFrequencies.put(msg, currentFreq + 1);
- } else {
- labelsWithFrequencies.put(msg, 1L);
- }
- }
- // select the most frequent label: if two or more labels have the
- // same frequency,
- // the node adopts the label with the highest value
- for (Entry<Long, Long> entry : labelsWithFrequencies.entrySet()) {
- if (entry.getValue() == maxFrequency) {
- // check the label value to break ties
- if (entry.getKey() > mostFrequentLabel) {
- mostFrequentLabel = entry.getKey();
- }
- } else if (entry.getValue() > maxFrequency) {
- maxFrequency = entry.getValue();
- mostFrequentLabel = entry.getKey();
- }
- }
-
- // set the new vertex value
- setNewVertexValue(mostFrequentLabel);
- }
- }
-
- /**
- * Sends the vertex label to all out-neighbors
- */
- public static final class SendNewLabelToNeighbors<K> extends MessagingFunction<K, Long, Long, NullValue> {
-
- public void sendMessages(Vertex<K, Long> vertex) {
- sendMessageToAllNeighbors(vertex.getValue());
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
deleted file mode 100644
index 8193dba..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.library;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.GraphAlgorithm;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.spargel.MessageIterator;
-import org.apache.flink.graph.spargel.MessagingFunction;
-import org.apache.flink.graph.spargel.VertexUpdateFunction;
-
-/**
- * This is an implementation of a simple PageRank algorithm, using a vertex-centric iteration.
- * The user can define the damping factor and the maximum number of iterations.
- * If the number of vertices of the input graph is known, it should be provided as a parameter
- * to speed up computation. Otherwise, the algorithm will first execute a job to count the vertices.
- *
- * The implementation assumes that each page has at least one incoming and one outgoing link.
- *
- * @param <K> the type of the vertex identifiers
- */
-public class PageRank<K> implements GraphAlgorithm<K, Double, Double, DataSet<Vertex<K, Double>>> {
-
-    private final double beta;
-    private final int maxIterations;
-
-    /** Vertex count supplied by the caller; {@code 0} means "unknown, count the input graph". */
-    private final long numberOfVertices;
-
-    /**
-     * Creates a PageRank instance that counts the vertices of each input graph itself.
-     *
-     * @param beta the damping factor
-     * @param maxIterations the maximum number of iterations
-     */
-    public PageRank(double beta, int maxIterations) {
-        this(beta, 0L, maxIterations);
-    }
-
-    /**
-     * Creates a PageRank instance for an input graph of known size.
-     *
-     * @param beta the damping factor
-     * @param numVertices the number of vertices in the input; {@code 0} is treated as unknown
-     * @param maxIterations the maximum number of iterations
-     */
-    public PageRank(double beta, long numVertices, int maxIterations) {
-        this.beta = beta;
-        this.maxIterations = maxIterations;
-        this.numberOfVertices = numVertices;
-    }
-
-    /**
-     * Weights every edge by the inverse out-degree of its source and runs the
-     * vertex-centric iteration that computes the ranks.
-     *
-     * @param network the input graph
-     * @return the vertices of the graph, carrying their computed rank as value
-     * @throws Exception if counting the vertices of the input graph fails
-     */
-    @Override
-    public DataSet<Vertex<K, Double>> run(Graph<K, Double, Double> network) throws Exception {
-
-        // Resolve the count into a local instead of caching it in the instance:
-        // a count cached from one graph must not leak into a later run on another graph.
-        final long vertexCount = (numberOfVertices == 0)
-                ? network.numberOfVertices()
-                : numberOfVertices;
-
-        DataSet<Tuple2<K, Long>> vertexOutDegrees = network.outDegrees();
-
-        Graph<K, Double, Double> networkWithWeights = network
-                .joinWithEdgesOnSource(vertexOutDegrees, new InitWeightsMapper());
-
-        return networkWithWeights.runVertexCentricIteration(new VertexRankUpdater<K>(beta, vertexCount),
-                new RankMessenger<K>(vertexCount), maxIterations)
-                .getVertices();
-    }
-
-    /**
-     * Function that updates the rank of a vertex by summing up the partial
-     * ranks from all incoming messages and then applying the damping formula.
-     */
-    @SuppressWarnings("serial")
-    public static final class VertexRankUpdater<K> extends VertexUpdateFunction<K, Double, Double> {
-
-        private final double beta;
-        private final long numVertices;
-
-        /**
-         * @param beta the damping factor
-         * @param numberOfVertices the number of vertices in the graph
-         */
-        public VertexRankUpdater(double beta, long numberOfVertices) {
-            this.beta = beta;
-            this.numVertices = numberOfVertices;
-        }
-
-        /**
-         * Sets the vertex value to {@code beta * sum(messages) + (1 - beta) / numVertices}.
-         *
-         * @param vertex the vertex being updated
-         * @param inMessages the partial ranks received in this superstep
-         */
-        @Override
-        public void updateVertex(Vertex<K, Double> vertex, MessageIterator<Double> inMessages) {
-            double rankSum = 0.0;
-            for (double msg : inMessages) {
-                rankSum += msg;
-            }
-
-            // apply the damping factor / random jump
-            double newRank = (beta * rankSum) + (1 - beta) / numVertices;
-            setNewVertexValue(newRank);
-        }
-    }
-
-    /**
-     * Distributes the rank of a vertex among all target vertices according to
-     * the transition probability, which is associated with an edge as the edge
-     * value.
-     */
-    @SuppressWarnings("serial")
-    public static final class RankMessenger<K> extends MessagingFunction<K, Double, Double, Double> {
-
-        private final long numVertices;
-
-        /**
-         * @param numberOfVertices the number of vertices in the graph
-         */
-        public RankMessenger(long numberOfVertices) {
-            this.numVertices = numberOfVertices;
-        }
-
-        /**
-         * Sends {@code rank * edgeValue} along every edge of the vertex. In the first
-         * superstep the rank is initialized to {@code 1 / numVertices} beforehand.
-         *
-         * @param vertex the vertex whose rank is distributed
-         */
-        @Override
-        public void sendMessages(Vertex<K, Double> vertex) {
-            if (getSuperstepNumber() == 1) {
-                // initialize vertex ranks
-                vertex.setValue(1.0 / numVertices);
-            }
-
-            for (Edge<K, Double> edge : getEdges()) {
-                sendMessageTo(edge.getTarget(), vertex.getValue() * edge.getValue());
-            }
-        }
-    }
-
-    /**
-     * Computes the new edge value as the current edge value ({@code f0}) divided by the
-     * out-degree of the edge's source ({@code f1}).
-     */
-    @SuppressWarnings("serial")
-    private static final class InitWeightsMapper implements MapFunction<Tuple2<Double, Long>, Double> {
-        @Override
-        public Double map(Tuple2<Double, Long> value) {
-            return value.f0 / value.f1;
-        }
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
deleted file mode 100644
index 60c4c17..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.library;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.GraphAlgorithm;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.spargel.MessageIterator;
-import org.apache.flink.graph.spargel.MessagingFunction;
-import org.apache.flink.graph.spargel.VertexUpdateFunction;
-
-/**
- * This is an implementation of the Single-Source-Shortest Paths algorithm, using a vertex-centric iteration.
- */
-@SuppressWarnings("serial")
-public class SingleSourceShortestPaths<K> implements GraphAlgorithm<K, Double, Double, DataSet<Vertex<K, Double>>> {
-
- private final K srcVertexId;
- private final Integer maxIterations;
-
- public SingleSourceShortestPaths(K srcVertexId, Integer maxIterations) {
- this.srcVertexId = srcVertexId;
- this.maxIterations = maxIterations;
- }
-
- @Override
- public DataSet<Vertex<K, Double>> run(Graph<K, Double, Double> input) {
-
- return input.mapVertices(new InitVerticesMapper<K>(srcVertexId))
- .runVertexCentricIteration(new VertexDistanceUpdater<K>(), new MinDistanceMessenger<K>(),
- maxIterations).getVertices();
- }
-
- public static final class InitVerticesMapper<K> implements MapFunction<Vertex<K, Double>, Double> {
-
- private K srcVertexId;
-
- public InitVerticesMapper(K srcId) {
- this.srcVertexId = srcId;
- }
-
- public Double map(Vertex<K, Double> value) {
- if (value.f0.equals(srcVertexId)) {
- return 0.0;
- } else {
- return Double.MAX_VALUE;
- }
- }
- }
-
- /**
- * Function that updates the value of a vertex by picking the minimum
- * distance from all incoming messages.
- *
- * @param <K>
- */
- public static final class VertexDistanceUpdater<K> extends VertexUpdateFunction<K, Double, Double> {
-
- @Override
- public void updateVertex(Vertex<K, Double> vertex,
- MessageIterator<Double> inMessages) {
-
- Double minDistance = Double.MAX_VALUE;
-
- for (double msg : inMessages) {
- if (msg < minDistance) {
- minDistance = msg;
- }
- }
-
- if (vertex.getValue() > minDistance) {
- setNewVertexValue(minDistance);
- }
- }
- }
-
- /**
- * Distributes the minimum distance associated with a given vertex among all
- * the target vertices summed up with the edge's value.
- *
- * @param <K>
- */
- public static final class MinDistanceMessenger<K> extends MessagingFunction<K, Double, Double, Double> {
-
- @Override
- public void sendMessages(Vertex<K, Double> vertex)
- throws Exception {
- for (Edge<K, Double> edge : getEdges()) {
- sendMessageTo(edge.getTarget(), vertex.getValue() + edge.getValue());
- }
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessageIterator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessageIterator.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessageIterator.java
deleted file mode 100644
index d6fdc8a..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessageIterator.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.spargel;
-
-import java.util.Iterator;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-
-/**
- * Iterator over the messages delivered to a vertex. The class implements
- * {@link java.lang.Iterable} as well (returning itself), so that it can be used directly
- * in a <i>foreach</i> loop.
- *
- * The iterator wraps a source of {@code (key, message)} tuples and exposes only the
- * message part. Removal is not supported.
- */
-public final class MessageIterator<Message> implements Iterator<Message>, Iterable<Message>, java.io.Serializable {
-    private static final long serialVersionUID = 1L;
-
-    /** The wrapped tuple source; transient, so it has to be supplied through setSource. */
-    private transient Iterator<Tuple2<?, Message>> source;
-
-
-    /**
-     * Installs the tuple iterator that backs this message iterator.
-     */
-    final void setSource(Iterator<Tuple2<?, Message>> messages) {
-        source = messages;
-    }
-
-    /**
-     * Returns this very object; the messages can therefore be traversed only once.
-     */
-    @Override
-    public Iterator<Message> iterator() {
-        return this;
-    }
-
-    @Override
-    public final boolean hasNext() {
-        return source.hasNext();
-    }
-
-    /**
-     * Advances the wrapped source and returns the message field ({@code f1}) of the tuple.
-     */
-    @Override
-    public final Message next() {
-        final Tuple2<?, Message> envelope = source.next();
-        return envelope.f1;
-    }
-
-    /**
-     * Not supported.
-     *
-     * @throws UnsupportedOperationException always
-     */
-    @Override
-    public final void remove() {
-        throw new UnsupportedOperationException();
-    }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java
deleted file mode 100644
index 4245c24..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java
+++ /dev/null
@@ -1,303 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.spargel;
-
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.Iterator;
-
-import org.apache.flink.api.common.aggregators.Aggregator;
-import org.apache.flink.api.common.functions.IterationRuntimeContext;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.EdgeDirection;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.types.Value;
-import org.apache.flink.util.Collector;
-
-/**
- * The base class for functions that produce messages between vertices as a part of a {@link VertexCentricIteration}.
- *
- * @param <K> The type of the vertex key (the vertex identifier).
- * @param <VV> The type of the vertex value (the state of the vertex).
- * @param <Message> The type of the message sent between vertices along the edges.
- * @param <EV> The type of the values that are associated with the edges.
- */
-public abstract class MessagingFunction<K, VV, Message, EV> implements Serializable {
-
-    private static final long serialVersionUID = 1L;
-
-    /** Raised when the edges of the current vertex are requested a second time. */
-    private static final String EDGES_ALREADY_USED_MESSAGE =
-            "Can use either 'getEdges()' or 'sendMessageToAllNeighbors()' exactly once.";
-
-    // --------------------------------------------------------------------------------------------
-    //  Attributes that allow vertices to access their in/out degrees and the total number of vertices
-    //  inside an iteration.
-    // --------------------------------------------------------------------------------------------
-
-    private long numberOfVertices = -1L;
-
-    /**
-     * Retrieves the number of vertices in the graph.
-     * @return the number of vertices if the {@link IterationConfiguration#setOptNumVertices(boolean)}
-     * option has been set; -1 otherwise.
-     */
-    public long getNumberOfVertices() {
-        return numberOfVertices;
-    }
-
-    void setNumberOfVertices(long numberOfVertices) {
-        this.numberOfVertices = numberOfVertices;
-    }
-
-    // --------------------------------------------------------------------------------------------
-    //  Attribute that allows the user to choose the neighborhood type(in/out/all) on which to run
-    //  the vertex centric iteration.
-    // --------------------------------------------------------------------------------------------
-
-    private EdgeDirection direction;
-
-    /**
-     * Retrieves the edge direction in which messages are propagated in the vertex-centric iteration.
-     * @return the messaging {@link EdgeDirection}
-     */
-    public EdgeDirection getDirection() {
-        return direction;
-    }
-
-    void setDirection(EdgeDirection direction) {
-        this.direction = direction;
-    }
-
-    // --------------------------------------------------------------------------------------------
-    //  Public API Methods
-    // --------------------------------------------------------------------------------------------
-
-    /**
-     * This method is invoked once per superstep for each vertex that was changed in that superstep.
-     * It needs to produce the messages that will be received by vertices in the next superstep.
-     *
-     * @param vertex The vertex that was changed.
-     *
-     * @throws Exception The computation may throw exceptions, which causes the superstep to fail.
-     */
-    public abstract void sendMessages(Vertex<K, VV> vertex) throws Exception;
-
-    /**
-     * This method is executed once per superstep before the messaging function is invoked for each vertex.
-     * The default implementation does nothing.
-     *
-     * @throws Exception Exceptions in the pre-superstep phase cause the superstep to fail.
-     */
-    public void preSuperstep() throws Exception {}
-
-    /**
-     * This method is executed once per superstep after the messaging function has been invoked for each vertex.
-     * The default implementation does nothing.
-     *
-     * @throws Exception Exceptions in the post-superstep phase cause the superstep to fail.
-     */
-    public void postSuperstep() throws Exception {}
-
-
-    /**
-     * Gets an {@link java.lang.Iterable} with all edges. This method is mutually exclusive with
-     * {@link #sendMessageToAllNeighbors(Object)} and may be called only once.
-     *
-     * The returned iterable hands out one and the same {@link Edge} object on every step and
-     * merely overwrites its fields, so an edge must not be retained across iteration steps.
-     *
-     * @return An iterable over the edges supplied for the current vertex.
-     * @throws IllegalStateException if the edges of the current vertex have already been used.
-     */
-    @SuppressWarnings("unchecked")
-    public Iterable<Edge<K, EV>> getEdges() {
-        claimEdges();
-        this.edgeIterator.set((Iterator<Edge<K, EV>>) edges);
-        return this.edgeIterator;
-    }
-
-    /**
-     * Sends the given message to the vertices at the other end of every edge supplied for the changed vertex.
-     * This method is mutually exclusive to the method {@link #getEdges()} and may be called only once.
-     *
-     * @param m The message to send.
-     * @throws IllegalStateException if the edges of the current vertex have already been used.
-     */
-    public void sendMessageToAllNeighbors(Message m) {
-        claimEdges();
-
-        outValue.f1 = m;
-
-        while (edges.hasNext()) {
-            Tuple next = (Tuple) edges.next();
-            // field 1 is the position that EdgesIterator maps to the edge target
-            K k = next.getField(1);
-            outValue.f0 = k;
-            out.collect(outValue);
-        }
-    }
-
-    /**
-     * Sends the given message to the vertex identified by the given key. If the target vertex does not exist,
-     * the next superstep will cause an exception due to a non-deliverable message.
-     *
-     * @param target The key (id) of the target vertex to message.
-     * @param m The message.
-     */
-    public void sendMessageTo(K target, Message m) {
-        outValue.f0 = target;
-        outValue.f1 = m;
-        out.collect(outValue);
-    }
-
-    // --------------------------------------------------------------------------------------------
-
-    /**
-     * Gets the number of the superstep, starting at <tt>1</tt>.
-     *
-     * @return The number of the current superstep.
-     */
-    public int getSuperstepNumber() {
-        return this.runtimeContext.getSuperstepNumber();
-    }
-
-    /**
-     * Gets the iteration aggregator registered under the given name. The iteration aggregator combines
-     * all aggregates globally once per superstep and makes them available in the next superstep.
-     *
-     * @param name The name of the aggregator.
-     * @return The aggregator registered under this name, or null, if no aggregator was registered.
-     */
-    public <T extends Aggregator<?>> T getIterationAggregator(String name) {
-        return this.runtimeContext.<T>getIterationAggregator(name);
-    }
-
-    /**
-     * Get the aggregated value that an aggregator computed in the previous iteration.
-     *
-     * @param name The name of the aggregator.
-     * @return The aggregated value of the previous iteration.
-     */
-    public <T extends Value> T getPreviousIterationAggregate(String name) {
-        return this.runtimeContext.<T>getPreviousIterationAggregate(name);
-    }
-
-    /**
-     * Gets the broadcast data set registered under the given name. Broadcast data sets
-     * are available on all parallel instances of a function. They can be registered via
-     * {@link org.apache.flink.graph.spargel.VertexCentricConfiguration#addBroadcastSetForMessagingFunction(String, org.apache.flink.api.java.DataSet)}.
-     *
-     * @param name The name under which the broadcast set is registered.
-     * @return The broadcast data set.
-     */
-    public <T> Collection<T> getBroadcastSet(String name) {
-        return this.runtimeContext.<T>getBroadcastVariable(name);
-    }
-
-    // --------------------------------------------------------------------------------------------
-    //  internal methods and state
-    // --------------------------------------------------------------------------------------------
-
-    /** Reusable output record; its fields are overwritten for every message that is sent. */
-    private Tuple2<K, Message> outValue;
-
-    private IterationRuntimeContext runtimeContext;
-
-    /** Edges of the vertex currently being processed, as handed in through set(). */
-    private Iterator<?> edges;
-
-    private Collector<Tuple2<K, Message>> out;
-
-    private EdgesIterator<K, EV> edgeIterator;
-
-    /** Whether the edges of the current vertex have already been handed out or consumed. */
-    private boolean edgesUsed;
-
-    private long inDegree = -1;
-
-    private long outDegree = -1;
-
-    /**
-     * Stores the runtime context and creates the reusable output record and edge iterator.
-     */
-    void init(IterationRuntimeContext context) {
-        this.runtimeContext = context;
-        this.outValue = new Tuple2<K, Message>();
-        this.edgeIterator = new EdgesIterator<K, EV>();
-    }
-
-    /**
-     * Installs the edges and the output collector for the next vertex and re-arms the
-     * "edges may be used exactly once" check.
-     */
-    void set(Iterator<?> edges, Collector<Tuple2<K, Message>> out) {
-        this.edges = edges;
-        this.out = out;
-        this.edgesUsed = false;
-    }
-
-    /**
-     * Marks the edges of the current vertex as used.
-     *
-     * @throws IllegalStateException if they have been used before since the last call to set().
-     */
-    private void claimEdges() {
-        if (edgesUsed) {
-            throw new IllegalStateException(EDGES_ALREADY_USED_MESSAGE);
-        }
-        edgesUsed = true;
-    }
-
-    /**
-     * Adapter that exposes the underlying edge iterator as an {@link Iterable}. Every call to
-     * {@link #next()} copies the fields of the next input edge into a single reused
-     * {@link Edge} instance and returns that instance.
-     */
-    private static final class EdgesIterator<K, EV>
-        implements Iterator<Edge<K, EV>>, Iterable<Edge<K, EV>>
-    {
-        private Iterator<Edge<K, EV>> input;
-
-        private Edge<K, EV> edge = new Edge<K, EV>();
-
-        void set(Iterator<Edge<K, EV>> input) {
-            this.input = input;
-        }
-
-        @Override
-        public boolean hasNext() {
-            return input.hasNext();
-        }
-
-        @Override
-        public Edge<K, EV> next() {
-            Edge<K, EV> next = input.next();
-            edge.setSource(next.f0);
-            edge.setTarget(next.f1);
-            edge.setValue(next.f2);
-            return edge;
-        }
-
-        @Override
-        public void remove() {
-            throw new UnsupportedOperationException();
-        }
-
-        @Override
-        public Iterator<Edge<K, EV>> iterator() {
-            return this;
-        }
-    }
-
-    /**
-     * Retrieves the vertex in-degree (number of in-coming edges).
-     * @return The in-degree of this vertex if the {@link IterationConfiguration#setOptDegrees(boolean)}
-     * option has been set; -1 otherwise.
-     */
-    public long getInDegree() {
-        return inDegree;
-    }
-
-    void setInDegree(long inDegree) {
-        this.inDegree = inDegree;
-    }
-
-    /**
-     * Retrieve the vertex out-degree (number of out-going edges).
-     * @return The out-degree of this vertex if the {@link IterationConfiguration#setOptDegrees(boolean)}
-     * option has been set; -1 otherwise.
-     */
-    public long getOutDegree() {
-        return outDegree;
-    }
-
-    void setOutDegree(long outDegree) {
-        this.outDegree = outDegree;
-    }
-}