You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by va...@apache.org on 2015/04/26 14:01:12 UTC

[9/9] flink git commit: [FLINK-1514] [gelly] Removed edge value type from ApplyFunction; Reuse the output vertex

[FLINK-1514] [gelly] Removed edge value type from ApplyFunction; Reuse the output vertex

This closes #408


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6e24879b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6e24879b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6e24879b

Branch: refs/heads/master
Commit: 6e24879b96bcd5eee9f1b4af504eced11a901123
Parents: 40f5f3a
Author: vasia <va...@apache.org>
Authored: Fri Apr 24 00:16:37 2015 +0200
Committer: vasia <va...@apache.org>
Committed: Sun Apr 26 13:26:02 2015 +0200

----------------------------------------------------------------------
 flink-staging/flink-gelly/pom.xml               |  0
 .../main/java/org/apache/flink/graph/Graph.java |  4 +--
 .../example/GSAConnectedComponentsExample.java  |  2 +-
 .../GSASingleSourceShortestPathsExample.java    |  2 +-
 .../apache/flink/graph/gsa/ApplyFunction.java   | 16 ++++++----
 .../graph/gsa/GatherSumApplyIteration.java      | 31 ++++++--------------
 6 files changed, 24 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6e24879b/flink-staging/flink-gelly/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/pom.xml b/flink-staging/flink-gelly/pom.xml
old mode 100755
new mode 100644

http://git-wip-us.apache.org/repos/asf/flink/blob/6e24879b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index 330c951..f843827 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -214,7 +214,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
 					public Vertex<K, VV> map(Tuple1<K> value) throws Exception {
 						return new Vertex<K, VV>(value.f0, mapper.map(value.f0));
 					}
-				}).returns(returnType);
+				}).returns(returnType).withForwardedFields("f0");
 
 		return new Graph<K, VV, EV>(vertices, edges, context);
 	}
@@ -1213,7 +1213,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
 	 */
 	public <M> Graph<K, VV, EV> runGatherSumApplyIteration(
 			GatherFunction<VV, EV, M> gatherFunction, SumFunction<VV, EV, M> sumFunction,
-			ApplyFunction<VV, EV, M> applyFunction, int maximumNumberOfIterations) {
+			ApplyFunction<K, VV, M> applyFunction, int maximumNumberOfIterations) {
 
 		GatherSumApplyIteration<K, VV, EV, M> iteration = GatherSumApplyIteration.withEdges(
 				edges, gatherFunction, sumFunction, applyFunction, maximumNumberOfIterations);

http://git-wip-us.apache.org/repos/asf/flink/blob/6e24879b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java
index dba28cd..7c39123 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java
@@ -102,7 +102,7 @@ public class GSAConnectedComponentsExample implements ProgramDescription {
 	};
 
 	@SuppressWarnings("serial")
-	private static final class UpdateComponentId extends ApplyFunction<Long, NullValue, Long> {
+	private static final class UpdateComponentId extends ApplyFunction<Long, Long, Long> {
 
 		public void apply(Long summedValue, Long origValue) {
 			if (summedValue < origValue) {

http://git-wip-us.apache.org/repos/asf/flink/blob/6e24879b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java
index 166a3d3..75cbd78 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java
@@ -111,7 +111,7 @@ public class GSASingleSourceShortestPathsExample implements ProgramDescription {
 	};
 
 	@SuppressWarnings("serial")
-	private static final class UpdateDistance extends ApplyFunction<Double, Double, Double> {
+	private static final class UpdateDistance extends ApplyFunction<Long, Double, Double> {
 
 		public void apply(Double newDistance, Double oldDistance) {
 			if (newDistance < oldDistance) {

http://git-wip-us.apache.org/repos/asf/flink/blob/6e24879b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java
index ab64c92..75f64f9 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java
@@ -19,14 +19,16 @@
 package org.apache.flink.graph.gsa;
 
 import org.apache.flink.api.common.functions.IterationRuntimeContext;
+import org.apache.flink.graph.Vertex;
 import org.apache.flink.util.Collector;
 
 import java.io.Serializable;
 
 @SuppressWarnings("serial")
-public abstract class ApplyFunction<VV extends Serializable, EV extends Serializable, M> implements Serializable {
+public abstract class ApplyFunction<K extends Comparable<K> & Serializable, VV extends Serializable, M>
+	implements Serializable {
 
-	public abstract void apply(M message, VV vertexValue);
+	public abstract void apply(M newValue, VV currentValue);
 
 	/**
 	 * Sets the result for the apply function
@@ -34,7 +36,8 @@ public abstract class ApplyFunction<VV extends Serializable, EV extends Serializ
 	 * @param result the result of the apply phase
 	 */
 	public void setResult(VV result) {
-		out.collect(result);
+		outVal.f1 = result;
+		out.collect(outVal);
 	}
 
 	/**
@@ -58,14 +61,17 @@ public abstract class ApplyFunction<VV extends Serializable, EV extends Serializ
 	@SuppressWarnings("unused")
 	private IterationRuntimeContext runtimeContext;
 
-	private Collector<VV> out;
+	private Collector<Vertex<K, VV>> out;
+
+	private Vertex<K, VV> outVal;
 
 	public void init(IterationRuntimeContext iterationRuntimeContext) {
 		this.runtimeContext = iterationRuntimeContext;
 	};
 
-	public void setOutput(Collector<VV> out) {
+	public void setOutput(Vertex<K, VV> vertex, Collector<Vertex<K, VV>> out) {
 		this.out = out;
+		this.outVal = vertex;
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6e24879b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
index 7fcd427..22be591 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
@@ -58,13 +58,13 @@ public class GatherSumApplyIteration<K extends Comparable<K> & Serializable,
 
 	private final GatherFunction<VV, EV, M> gather;
 	private final SumFunction<VV, EV, M> sum;
-	private final ApplyFunction<VV, EV, M> apply;
+	private final ApplyFunction<K, VV, M> apply;
 	private final int maximumNumberOfIterations;
 
 	// ----------------------------------------------------------------------------------
 
 	private GatherSumApplyIteration(GatherFunction<VV, EV, M> gather, SumFunction<VV, EV, M> sum,
-			ApplyFunction<VV, EV, M> apply, DataSet<Edge<K, EV>> edges, int maximumNumberOfIterations) {
+			ApplyFunction<K, VV, M> apply, DataSet<Edge<K, EV>> edges, int maximumNumberOfIterations) {
 
 		Validate.notNull(gather);
 		Validate.notNull(sum);
@@ -161,7 +161,7 @@ public class GatherSumApplyIteration<K extends Comparable<K> & Serializable,
 	 */
 	public static final <K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable, M>
 			GatherSumApplyIteration<K, VV, EV, M> withEdges(DataSet<Edge<K, EV>> edges,
-			GatherFunction<VV, EV, M> gather, SumFunction<VV, EV, M> sum, ApplyFunction<VV, EV, M> apply,
+			GatherFunction<VV, EV, M> gather, SumFunction<VV, EV, M> sum, ApplyFunction<K, VV, M> apply,
 			int maximumNumberOfIterations) {
 		return new GatherSumApplyIteration<K, VV, EV, M>(gather, sum, apply, edges, maximumNumberOfIterations);
 	}
@@ -253,32 +253,19 @@ public class GatherSumApplyIteration<K extends Comparable<K> & Serializable,
 			VV extends Serializable, EV extends Serializable, M> extends RichFlatJoinFunction<Tuple2<K, M>,
 			Vertex<K, VV>, Vertex<K, VV>> implements ResultTypeQueryable<Vertex<K, VV>> {
 
-		private final ApplyFunction<VV, EV, M> applyFunction;
+		private final ApplyFunction<K, VV, M> applyFunction;
 		private transient TypeInformation<Vertex<K, VV>> resultType;
 
-		private ApplyUdf(ApplyFunction<VV, EV, M> applyFunction, TypeInformation<Vertex<K, VV>> resultType) {
+		private ApplyUdf(ApplyFunction<K, VV, M> applyFunction, TypeInformation<Vertex<K, VV>> resultType) {
 			this.applyFunction = applyFunction;
 			this.resultType = resultType;
 		}
 
 		@Override
-		public void join(Tuple2<K, M> arg0, Vertex<K, VV> arg1, final Collector<Vertex<K, VV>> out) throws Exception {
-
-			final K key = arg1.getId();
-			Collector<VV> userOut = new Collector<VV>() {
-				@Override
-				public void collect(VV record) {
-					out.collect(new Vertex<K, VV>(key, record));
-				}
-
-				@Override
-				public void close() {
-					out.close();
-				}
-			};
-
-			this.applyFunction.setOutput(userOut);
-			this.applyFunction.apply(arg0.f1, arg1.getValue());
+		public void join(Tuple2<K, M> newValue, final Vertex<K, VV> currentValue, final Collector<Vertex<K, VV>> out) throws Exception {
+
+			this.applyFunction.setOutput(currentValue, out);
+			this.applyFunction.apply(newValue.f1, currentValue.getValue());
 		}
 
 		@Override