You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by va...@apache.org on 2015/04/26 14:01:12 UTC
[9/9] flink git commit: [FLINK-1514] [gelly] Removed edge value type from ApplyFunction; Reuse the output vertex
[FLINK-1514] [gelly] Removed edge value type from ApplyFunction; Reuse the output vertex
This closes #408
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6e24879b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6e24879b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6e24879b
Branch: refs/heads/master
Commit: 6e24879b96bcd5eee9f1b4af504eced11a901123
Parents: 40f5f3a
Author: vasia <va...@apache.org>
Authored: Fri Apr 24 00:16:37 2015 +0200
Committer: vasia <va...@apache.org>
Committed: Sun Apr 26 13:26:02 2015 +0200
----------------------------------------------------------------------
flink-staging/flink-gelly/pom.xml | 0
.../main/java/org/apache/flink/graph/Graph.java | 4 +--
.../example/GSAConnectedComponentsExample.java | 2 +-
.../GSASingleSourceShortestPathsExample.java | 2 +-
.../apache/flink/graph/gsa/ApplyFunction.java | 16 ++++++----
.../graph/gsa/GatherSumApplyIteration.java | 31 ++++++--------------
6 files changed, 24 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/6e24879b/flink-staging/flink-gelly/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/pom.xml b/flink-staging/flink-gelly/pom.xml
old mode 100755
new mode 100644
http://git-wip-us.apache.org/repos/asf/flink/blob/6e24879b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index 330c951..f843827 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -214,7 +214,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
public Vertex<K, VV> map(Tuple1<K> value) throws Exception {
return new Vertex<K, VV>(value.f0, mapper.map(value.f0));
}
- }).returns(returnType);
+ }).returns(returnType).withForwardedFields("f0");
return new Graph<K, VV, EV>(vertices, edges, context);
}
@@ -1213,7 +1213,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
*/
public <M> Graph<K, VV, EV> runGatherSumApplyIteration(
GatherFunction<VV, EV, M> gatherFunction, SumFunction<VV, EV, M> sumFunction,
- ApplyFunction<VV, EV, M> applyFunction, int maximumNumberOfIterations) {
+ ApplyFunction<K, VV, M> applyFunction, int maximumNumberOfIterations) {
GatherSumApplyIteration<K, VV, EV, M> iteration = GatherSumApplyIteration.withEdges(
edges, gatherFunction, sumFunction, applyFunction, maximumNumberOfIterations);
http://git-wip-us.apache.org/repos/asf/flink/blob/6e24879b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java
index dba28cd..7c39123 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java
@@ -102,7 +102,7 @@ public class GSAConnectedComponentsExample implements ProgramDescription {
};
@SuppressWarnings("serial")
- private static final class UpdateComponentId extends ApplyFunction<Long, NullValue, Long> {
+ private static final class UpdateComponentId extends ApplyFunction<Long, Long, Long> {
public void apply(Long summedValue, Long origValue) {
if (summedValue < origValue) {
http://git-wip-us.apache.org/repos/asf/flink/blob/6e24879b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java
index 166a3d3..75cbd78 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java
@@ -111,7 +111,7 @@ public class GSASingleSourceShortestPathsExample implements ProgramDescription {
};
@SuppressWarnings("serial")
- private static final class UpdateDistance extends ApplyFunction<Double, Double, Double> {
+ private static final class UpdateDistance extends ApplyFunction<Long, Double, Double> {
public void apply(Double newDistance, Double oldDistance) {
if (newDistance < oldDistance) {
http://git-wip-us.apache.org/repos/asf/flink/blob/6e24879b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java
index ab64c92..75f64f9 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java
@@ -19,14 +19,16 @@
package org.apache.flink.graph.gsa;
import org.apache.flink.api.common.functions.IterationRuntimeContext;
+import org.apache.flink.graph.Vertex;
import org.apache.flink.util.Collector;
import java.io.Serializable;
@SuppressWarnings("serial")
-public abstract class ApplyFunction<VV extends Serializable, EV extends Serializable, M> implements Serializable {
+public abstract class ApplyFunction<K extends Comparable<K> & Serializable, VV extends Serializable, M>
+ implements Serializable {
- public abstract void apply(M message, VV vertexValue);
+ public abstract void apply(M newValue, VV currentValue);
/**
* Sets the result for the apply function
@@ -34,7 +36,8 @@ public abstract class ApplyFunction<VV extends Serializable, EV extends Serializ
* @param result the result of the apply phase
*/
public void setResult(VV result) {
- out.collect(result);
+ outVal.f1 = result;
+ out.collect(outVal);
}
/**
@@ -58,14 +61,17 @@ public abstract class ApplyFunction<VV extends Serializable, EV extends Serializ
@SuppressWarnings("unused")
private IterationRuntimeContext runtimeContext;
- private Collector<VV> out;
+ private Collector<Vertex<K, VV>> out;
+
+ private Vertex<K, VV> outVal;
public void init(IterationRuntimeContext iterationRuntimeContext) {
this.runtimeContext = iterationRuntimeContext;
};
- public void setOutput(Collector<VV> out) {
+ public void setOutput(Vertex<K, VV> vertex, Collector<Vertex<K, VV>> out) {
this.out = out;
+ this.outVal = vertex;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6e24879b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
index 7fcd427..22be591 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
@@ -58,13 +58,13 @@ public class GatherSumApplyIteration<K extends Comparable<K> & Serializable,
private final GatherFunction<VV, EV, M> gather;
private final SumFunction<VV, EV, M> sum;
- private final ApplyFunction<VV, EV, M> apply;
+ private final ApplyFunction<K, VV, M> apply;
private final int maximumNumberOfIterations;
// ----------------------------------------------------------------------------------
private GatherSumApplyIteration(GatherFunction<VV, EV, M> gather, SumFunction<VV, EV, M> sum,
- ApplyFunction<VV, EV, M> apply, DataSet<Edge<K, EV>> edges, int maximumNumberOfIterations) {
+ ApplyFunction<K, VV, M> apply, DataSet<Edge<K, EV>> edges, int maximumNumberOfIterations) {
Validate.notNull(gather);
Validate.notNull(sum);
@@ -161,7 +161,7 @@ public class GatherSumApplyIteration<K extends Comparable<K> & Serializable,
*/
public static final <K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable, M>
GatherSumApplyIteration<K, VV, EV, M> withEdges(DataSet<Edge<K, EV>> edges,
- GatherFunction<VV, EV, M> gather, SumFunction<VV, EV, M> sum, ApplyFunction<VV, EV, M> apply,
+ GatherFunction<VV, EV, M> gather, SumFunction<VV, EV, M> sum, ApplyFunction<K, VV, M> apply,
int maximumNumberOfIterations) {
return new GatherSumApplyIteration<K, VV, EV, M>(gather, sum, apply, edges, maximumNumberOfIterations);
}
@@ -253,32 +253,19 @@ public class GatherSumApplyIteration<K extends Comparable<K> & Serializable,
VV extends Serializable, EV extends Serializable, M> extends RichFlatJoinFunction<Tuple2<K, M>,
Vertex<K, VV>, Vertex<K, VV>> implements ResultTypeQueryable<Vertex<K, VV>> {
- private final ApplyFunction<VV, EV, M> applyFunction;
+ private final ApplyFunction<K, VV, M> applyFunction;
private transient TypeInformation<Vertex<K, VV>> resultType;
- private ApplyUdf(ApplyFunction<VV, EV, M> applyFunction, TypeInformation<Vertex<K, VV>> resultType) {
+ private ApplyUdf(ApplyFunction<K, VV, M> applyFunction, TypeInformation<Vertex<K, VV>> resultType) {
this.applyFunction = applyFunction;
this.resultType = resultType;
}
@Override
- public void join(Tuple2<K, M> arg0, Vertex<K, VV> arg1, final Collector<Vertex<K, VV>> out) throws Exception {
-
- final K key = arg1.getId();
- Collector<VV> userOut = new Collector<VV>() {
- @Override
- public void collect(VV record) {
- out.collect(new Vertex<K, VV>(key, record));
- }
-
- @Override
- public void close() {
- out.close();
- }
- };
-
- this.applyFunction.setOutput(userOut);
- this.applyFunction.apply(arg0.f1, arg1.getValue());
+ public void join(Tuple2<K, M> newValue, final Vertex<K, VV> currentValue, final Collector<Vertex<K, VV>> out) throws Exception {
+
+ this.applyFunction.setOutput(currentValue, out);
+ this.applyFunction.apply(newValue.f1, currentValue.getValue());
}
@Override