You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gr...@apache.org on 2016/06/28 19:44:37 UTC
flink git commit: [FLINK-3277] [gelly] Use Value types in Gelly API
Repository: flink
Updated Branches:
refs/heads/master 10898a90f -> 40749ddcd
[FLINK-3277] [gelly] Use Value types in Gelly API
This closes #1671
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/40749ddc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/40749ddc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/40749ddc
Branch: refs/heads/master
Commit: 40749ddcd73c4634d81c2153f64e8934d519be3d
Parents: 10898a9
Author: Greg Hogan <co...@greghogan.com>
Authored: Thu Feb 18 10:40:39 2016 -0500
Committer: Greg Hogan <co...@greghogan.com>
Committed: Tue Jun 28 15:04:16 2016 -0400
----------------------------------------------------------------------
docs/apis/batch/libs/gelly.md | 24 +++----
.../flink/graph/examples/GraphMetrics.java | 15 +++--
.../graph/scala/examples/GraphMetrics.scala | 2 +-
.../org/apache/flink/graph/scala/Graph.scala | 22 +++---
.../main/java/org/apache/flink/graph/Graph.java | 19 ++++--
.../apache/flink/graph/library/GSAPageRank.java | 10 +--
.../apache/flink/graph/library/PageRank.java | 10 +--
.../graph/spargel/ScatterGatherIteration.java | 70 ++++++++++----------
.../graph/generator/CompleteGraphTest.java | 8 +--
.../flink/graph/generator/CycleGraphTest.java | 8 +--
.../flink/graph/generator/EmptyGraphTest.java | 4 +-
.../flink/graph/generator/GridGraphTest.java | 8 +--
.../graph/generator/HypercubeGraphTest.java | 8 +--
.../flink/graph/generator/PathGraphTest.java | 8 +--
.../graph/generator/SingletonEdgeGraphTest.java | 8 +--
.../flink/graph/generator/StarGraphTest.java | 8 +--
.../graph/test/operations/DegreesITCase.java | 25 +++----
.../operations/DegreesWithExceptionITCase.java | 11 +--
18 files changed, 138 insertions(+), 130 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/40749ddc/docs/apis/batch/libs/gelly.md
----------------------------------------------------------------------
diff --git a/docs/apis/batch/libs/gelly.md b/docs/apis/batch/libs/gelly.md
index b4b78cc..d8692d6 100644
--- a/docs/apis/batch/libs/gelly.md
+++ b/docs/apis/batch/libs/gelly.md
@@ -346,13 +346,13 @@ DataSet<K> getVertexIds()
DataSet<Tuple2<K, K>> getEdgeIds()
// get a DataSet of <vertex ID, in-degree> pairs for all vertices
-DataSet<Tuple2<K, Long>> inDegrees()
+DataSet<Tuple2<K, LongValue>> inDegrees()
// get a DataSet of <vertex ID, out-degree> pairs for all vertices
-DataSet<Tuple2<K, Long>> outDegrees()
+DataSet<Tuple2<K, LongValue>> outDegrees()
// get a DataSet of <vertex ID, degree> pairs for all vertices, where degree is the sum of in- and out- degrees
-DataSet<Tuple2<K, Long>> getDegrees()
+DataSet<Tuple2<K, LongValue>> getDegrees()
// get the number of vertices
long numberOfVertices()
@@ -381,13 +381,13 @@ getVertexIds: DataSet[K]
getEdgeIds: DataSet[(K, K)]
// get a DataSet of <vertex ID, in-degree> pairs for all vertices
-inDegrees: DataSet[(K, Long)]
+inDegrees: DataSet[(K, LongValue)]
// get a DataSet of <vertex ID, out-degree> pairs for all vertices
-outDegrees: DataSet[(K, Long)]
+outDegrees: DataSet[(K, LongValue)]
// get a DataSet of <vertex ID, degree> pairs for all vertices, where degree is the sum of in- and out- degrees
-getDegrees: DataSet[(K, Long)]
+getDegrees: DataSet[(K, LongValue)]
// get the number of vertices
numberOfVertices: Long
@@ -519,13 +519,13 @@ Note that if the input dataset contains a key multiple times, all Gelly join met
{% highlight java %}
Graph<Long, Double, Double> network = ...
-DataSet<Tuple2<Long, Long>> vertexOutDegrees = network.outDegrees();
+DataSet<Tuple2<Long, LongValue>> vertexOutDegrees = network.outDegrees();
// assign the transition probabilities as the edge weights
Graph<Long, Double, Double> networkWithWeights = network.joinWithEdgesOnSource(vertexOutDegrees,
- new VertexJoinFunction<Double, Long>() {
- public Double vertexJoin(Double vertexValue, Long inputValue) {
- return vertexValue / inputValue;
+ new VertexJoinFunction<Double, LongValue>() {
+ public Double vertexJoin(Double vertexValue, LongValue inputValue) {
+ return vertexValue / inputValue.getValue();
}
});
{% endhighlight %}
@@ -535,10 +535,10 @@ Graph<Long, Double, Double> networkWithWeights = network.joinWithEdgesOnSource(v
{% highlight scala %}
val network: Graph[Long, Double, Double] = ...
-val vertexOutDegrees: DataSet[(Long, Long)] = network.outDegrees
+val vertexOutDegrees: DataSet[(Long, LongValue)] = network.outDegrees
// assign the transition probabilities as the edge weights
-val networkWithWeights = network.joinWithEdgesOnSource(vertexOutDegrees, (v1: Double, v2: Long) => v1 / v2)
+val networkWithWeights = network.joinWithEdgesOnSource(vertexOutDegrees, (v1: Double, v2: LongValue) => v1 / v2.getValue)
{% endhighlight %}
</div>
</div>
http://git-wip-us.apache.org/repos/asf/flink/blob/40749ddc/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/GraphMetrics.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/GraphMetrics.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/GraphMetrics.java
index 9058538..e7b47bf 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/GraphMetrics.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/GraphMetrics.java
@@ -18,7 +18,6 @@
package org.apache.flink.graph.examples;
-import org.apache.flink.graph.examples.utils.ExampleUtils;
import org.apache.flink.api.common.ProgramDescription;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
@@ -27,6 +26,8 @@ import org.apache.flink.api.java.aggregation.Aggregations;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.examples.utils.ExampleUtils;
+import org.apache.flink.types.LongValue;
import org.apache.flink.types.NullValue;
/**
@@ -66,7 +67,7 @@ public class GraphMetrics implements ProgramDescription {
long numEdges = graph.numberOfEdges();
/** compute the average node degree **/
- DataSet<Tuple2<Long, Long>> verticesWithDegrees = graph.getDegrees();
+ DataSet<Tuple2<Long, LongValue>> verticesWithDegrees = graph.getDegrees();
DataSet<Double> avgNodeDegree = verticesWithDegrees
.aggregate(Aggregations.SUM, 1).map(new AvgNodeDegreeMapper(numVertices));
@@ -96,7 +97,7 @@ public class GraphMetrics implements ProgramDescription {
}
@SuppressWarnings("serial")
- private static final class AvgNodeDegreeMapper implements MapFunction<Tuple2<Long, Long>, Double> {
+ private static final class AvgNodeDegreeMapper implements MapFunction<Tuple2<Long, LongValue>, Double> {
private long numberOfVertices;
@@ -104,14 +105,14 @@ public class GraphMetrics implements ProgramDescription {
this.numberOfVertices = numberOfVertices;
}
- public Double map(Tuple2<Long, Long> sumTuple) {
- return (double) (sumTuple.f1 / numberOfVertices) ;
+ public Double map(Tuple2<Long, LongValue> sumTuple) {
+ return (double) (sumTuple.f1.getValue() / numberOfVertices) ;
}
}
@SuppressWarnings("serial")
- private static final class ProjectVertexId implements MapFunction<Tuple2<Long,Long>, Long> {
- public Long map(Tuple2<Long, Long> value) { return value.f0; }
+ private static final class ProjectVertexId implements MapFunction<Tuple2<Long, LongValue>, Long> {
+ public Long map(Tuple2<Long, LongValue> value) { return value.f0; }
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/40749ddc/flink-libraries/flink-gelly-examples/src/main/scala/org/apache/flink/graph/scala/examples/GraphMetrics.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/scala/org/apache/flink/graph/scala/examples/GraphMetrics.scala b/flink-libraries/flink-gelly-examples/src/main/scala/org/apache/flink/graph/scala/examples/GraphMetrics.scala
index f9fa82d..ebf43d4 100644
--- a/flink-libraries/flink-gelly-examples/src/main/scala/org/apache/flink/graph/scala/examples/GraphMetrics.scala
+++ b/flink-libraries/flink-gelly-examples/src/main/scala/org/apache/flink/graph/scala/examples/GraphMetrics.scala
@@ -61,7 +61,7 @@ object GraphMetrics {
/** compute the average node degree **/
val verticesWithDegrees = graph.getDegrees
- val avgDegree = verticesWithDegrees.sum(1).map(in => (in._2 / numVertices).toDouble)
+ val avgDegree = verticesWithDegrees.sum(1).map(in => (in._2.getValue / numVertices).toDouble)
/** find the vertex with the maximum in-degree **/
val maxInDegreeVertex = graph.inDegrees.max(1).map(in => in._1)
http://git-wip-us.apache.org/repos/asf/flink/blob/40749ddc/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
index 165f6c2..f7e13ba 100644
--- a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
+++ b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
@@ -18,24 +18,22 @@
package org.apache.flink.graph.scala
-import org.apache.flink.util.Preconditions
import org.apache.flink.api.common.functions.{FilterFunction, MapFunction}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.{tuple => jtuple}
import org.apache.flink.api.scala._
import org.apache.flink.graph._
import org.apache.flink.graph.asm.translate.TranslateFunction
-import org.apache.flink.graph.validation.GraphValidator
import org.apache.flink.graph.gsa.{ApplyFunction, GSAConfiguration, GatherFunction, SumFunction}
+import org.apache.flink.graph.pregel.{ComputeFunction, MessageCombiner, VertexCentricConfiguration}
import org.apache.flink.graph.spargel.{MessagingFunction, ScatterGatherConfiguration, VertexUpdateFunction}
+import org.apache.flink.graph.validation.GraphValidator
+import org.apache.flink.types.{LongValue, NullValue}
+import org.apache.flink.util.Preconditions
import org.apache.flink.{graph => jg}
import _root_.scala.collection.JavaConverters._
import _root_.scala.reflect.ClassTag
-import org.apache.flink.types.NullValue
-import org.apache.flink.graph.pregel.ComputeFunction
-import org.apache.flink.graph.pregel.MessageCombiner
-import org.apache.flink.graph.pregel.VertexCentricConfiguration
object Graph {
@@ -803,8 +801,8 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
*
* @return A DataSet of Tuple2<vertexId, inDegree>
*/
- def inDegrees(): DataSet[(K, Long)] = {
- wrap(jgraph.inDegrees).map(javatuple => (javatuple.f0, javatuple.f1.longValue()))
+ def inDegrees(): DataSet[(K, LongValue)] = {
+ wrap(jgraph.inDegrees).map(javatuple => (javatuple.f0, javatuple.f1))
}
/**
@@ -812,8 +810,8 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
*
* @return A DataSet of Tuple2<vertexId, outDegree>
*/
- def outDegrees(): DataSet[(K, Long)] = {
- wrap(jgraph.outDegrees).map(javatuple => (javatuple.f0, javatuple.f1.longValue()))
+ def outDegrees(): DataSet[(K, LongValue)] = {
+ wrap(jgraph.outDegrees).map(javatuple => (javatuple.f0, javatuple.f1))
}
/**
@@ -821,8 +819,8 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
*
* @return A DataSet of Tuple2<vertexId, degree>
*/
- def getDegrees(): DataSet[(K, Long)] = {
- wrap(jgraph.getDegrees).map(javatuple => (javatuple.f0, javatuple.f1.longValue()))
+ def getDegrees(): DataSet[(K, LongValue)] = {
+ wrap(jgraph.getDegrees).map(javatuple => (javatuple.f0, javatuple.f1))
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/40749ddc/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index 3dbb9c4..fe59283 100755
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -61,6 +61,7 @@ import org.apache.flink.graph.utils.Tuple2ToVertexMap;
import org.apache.flink.graph.utils.Tuple3ToEdgeMap;
import org.apache.flink.graph.utils.VertexToTuple2Map;
import org.apache.flink.graph.validation.GraphValidator;
+import org.apache.flink.types.LongValue;
import org.apache.flink.types.NullValue;
import org.apache.flink.util.Collector;
@@ -867,25 +868,31 @@ public class Graph<K, VV, EV> {
*
* @return A DataSet of {@code Tuple2<vertexId, outDegree>}
*/
- public DataSet<Tuple2<K, Long>> outDegrees() {
+ public DataSet<Tuple2<K, LongValue>> outDegrees() {
return vertices.coGroup(edges).where(0).equalTo(0).with(new CountNeighborsCoGroup<K, VV, EV>());
}
private static final class CountNeighborsCoGroup<K, VV, EV>
- implements CoGroupFunction<Vertex<K, VV>, Edge<K, EV>, Tuple2<K, Long>> {
+ implements CoGroupFunction<Vertex<K, VV>, Edge<K, EV>, Tuple2<K, LongValue>> {
+ private LongValue degree = new LongValue();
+
+ private Tuple2<K, LongValue> vertexDegree = new Tuple2<>(null, degree);
+
@SuppressWarnings("unused")
public void coGroup(Iterable<Vertex<K, VV>> vertex, Iterable<Edge<K, EV>> outEdges,
- Collector<Tuple2<K, Long>> out) {
+ Collector<Tuple2<K, LongValue>> out) {
long count = 0;
for (Edge<K, EV> edge : outEdges) {
count++;
}
+ degree.setValue(count);
Iterator<Vertex<K, VV>> vertexIterator = vertex.iterator();
if(vertexIterator.hasNext()) {
- out.collect(new Tuple2<K, Long>(vertexIterator.next().f0, count));
+ vertexDegree.f0 = vertexIterator.next().f0;
+ out.collect(vertexDegree);
} else {
throw new NoSuchElementException("The edge src/trg id could not be found within the vertexIds");
}
@@ -897,7 +904,7 @@ public class Graph<K, VV, EV> {
*
* @return A DataSet of {@code Tuple2<vertexId, inDegree>}
*/
- public DataSet<Tuple2<K, Long>> inDegrees() {
+ public DataSet<Tuple2<K, LongValue>> inDegrees() {
return vertices.coGroup(edges).where(0).equalTo(1).with(new CountNeighborsCoGroup<K, VV, EV>());
}
@@ -907,7 +914,7 @@ public class Graph<K, VV, EV> {
*
* @return A DataSet of {@code Tuple2<vertexId, degree>}
*/
- public DataSet<Tuple2<K, Long>> getDegrees() {
+ public DataSet<Tuple2<K, LongValue>> getDegrees() {
return outDegrees().union(inDegrees()).groupBy(0).sum(1);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/40749ddc/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java
index 324f9c3..ef39395 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java
@@ -29,6 +29,7 @@ import org.apache.flink.graph.gsa.GSAConfiguration;
import org.apache.flink.graph.gsa.GatherFunction;
import org.apache.flink.graph.gsa.Neighbor;
import org.apache.flink.graph.gsa.SumFunction;
+import org.apache.flink.types.LongValue;
/**
* This is an implementation of a simple PageRank algorithm, using a gather-sum-apply iteration.
@@ -56,8 +57,7 @@ public class GSAPageRank<K> implements GraphAlgorithm<K, Double, Double, DataSet
@Override
public DataSet<Vertex<K, Double>> run(Graph<K, Double, Double> network) throws Exception {
-
- DataSet<Tuple2<K, Long>> vertexOutDegrees = network.outDegrees();
+ DataSet<Tuple2<K, LongValue>> vertexOutDegrees = network.outDegrees();
Graph<K, Double, Double> networkWithWeights = network
.joinWithEdgesOnSource(vertexOutDegrees, new InitWeights());
@@ -114,10 +114,10 @@ public class GSAPageRank<K> implements GraphAlgorithm<K, Double, Double, DataSet
}
@SuppressWarnings("serial")
- private static final class InitWeights implements EdgeJoinFunction<Double, Long> {
+ private static final class InitWeights implements EdgeJoinFunction<Double, LongValue> {
- public Double edgeJoin(Double edgeValue, Long inputValue) {
- return edgeValue / (double) inputValue;
+ public Double edgeJoin(Double edgeValue, LongValue inputValue) {
+ return edgeValue / (double) inputValue.getValue();
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/40749ddc/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
index f83b05b..2f1b03b 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
@@ -29,6 +29,7 @@ import org.apache.flink.graph.spargel.MessageIterator;
import org.apache.flink.graph.spargel.MessagingFunction;
import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
import org.apache.flink.graph.spargel.VertexUpdateFunction;
+import org.apache.flink.types.LongValue;
/**
* This is an implementation of a simple PageRank algorithm, using a scatter-gather iteration.
@@ -56,8 +57,7 @@ public class PageRank<K> implements GraphAlgorithm<K, Double, Double, DataSet<Ve
@Override
public DataSet<Vertex<K, Double>> run(Graph<K, Double, Double> network) throws Exception {
-
- DataSet<Tuple2<K, Long>> vertexOutDegrees = network.outDegrees();
+ DataSet<Tuple2<K, LongValue>> vertexOutDegrees = network.outDegrees();
Graph<K, Double, Double> networkWithWeights = network
.joinWithEdgesOnSource(vertexOutDegrees, new InitWeights());
@@ -118,10 +118,10 @@ public class PageRank<K> implements GraphAlgorithm<K, Double, Double, DataSet<Ve
}
@SuppressWarnings("serial")
- private static final class InitWeights implements EdgeJoinFunction<Double, Long> {
+ private static final class InitWeights implements EdgeJoinFunction<Double, LongValue> {
- public Double edgeJoin(Double edgeValue, Long inputValue) {
- return edgeValue / (double) inputValue;
+ public Double edgeJoin(Double edgeValue, LongValue inputValue) {
+ return edgeValue / (double) inputValue.getValue();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/40749ddc/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherIteration.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherIteration.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherIteration.java
index 165ef1e..fc5c210 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherIteration.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherIteration.java
@@ -309,28 +309,28 @@ public class ScatterGatherIteration<K, VV, Message, EV>
}
@SuppressWarnings("serial")
- private static final class VertexUpdateUdfVVWithDegrees<K, VV, Message> extends VertexUpdateUdf<K, Tuple3<VV, Long, Long>, Message> {
+ private static final class VertexUpdateUdfVVWithDegrees<K, VV, Message> extends VertexUpdateUdf<K, Tuple3<VV, LongValue, LongValue>, Message> {
- private VertexUpdateUdfVVWithDegrees(VertexUpdateFunction<K, Tuple3<VV, Long, Long>, Message> vertexUpdateFunction,
- TypeInformation<Vertex<K, Tuple3<VV, Long, Long>>> resultType) {
+ private VertexUpdateUdfVVWithDegrees(VertexUpdateFunction<K, Tuple3<VV, LongValue, LongValue>, Message> vertexUpdateFunction,
+ TypeInformation<Vertex<K, Tuple3<VV, LongValue, LongValue>>> resultType) {
super(vertexUpdateFunction, resultType);
}
@Override
- public void coGroup(Iterable<Tuple2<K, Message>> messages, Iterable<Vertex<K, Tuple3<VV, Long, Long>>> vertex,
- Collector<Vertex<K, Tuple3<VV, Long, Long>>> out) throws Exception {
+ public void coGroup(Iterable<Tuple2<K, Message>> messages, Iterable<Vertex<K, Tuple3<VV, LongValue, LongValue>>> vertex,
+ Collector<Vertex<K, Tuple3<VV, LongValue, LongValue>>> out) throws Exception {
- final Iterator<Vertex<K, Tuple3<VV, Long, Long>>> vertexIter = vertex.iterator();
+ final Iterator<Vertex<K, Tuple3<VV, LongValue, LongValue>>> vertexIter = vertex.iterator();
if (vertexIter.hasNext()) {
- Vertex<K, Tuple3<VV, Long, Long>> vertexWithDegrees = vertexIter.next();
+ Vertex<K, Tuple3<VV, LongValue, LongValue>> vertexWithDegrees = vertexIter.next();
@SuppressWarnings("unchecked")
Iterator<Tuple2<?, Message>> downcastIter = (Iterator<Tuple2<?, Message>>) (Iterator<?>) messages.iterator();
messageIter.setSource(downcastIter);
- vertexUpdateFunction.setInDegree(vertexWithDegrees.f1.f1);
- vertexUpdateFunction.setOutDegree(vertexWithDegrees.f1.f2);
+ vertexUpdateFunction.setInDegree(vertexWithDegrees.f1.f1.getValue());
+ vertexUpdateFunction.setOutDegree(vertexWithDegrees.f1.f2.getValue());
vertexUpdateFunction.setOutputWithDegrees(vertexWithDegrees, out);
vertexUpdateFunction.updateVertexFromScatterGatherIteration(vertexWithDegrees, messageIter);
@@ -420,7 +420,7 @@ public class ScatterGatherIteration<K, VV, Message, EV>
@SuppressWarnings("serial")
private static final class MessagingUdfWithEVsVVWithDegrees<K, VV, Message, EV>
- extends MessagingUdfWithEdgeValues<K, Tuple3<VV, Long, Long>, VV, Message, EV> {
+ extends MessagingUdfWithEdgeValues<K, Tuple3<VV, LongValue, LongValue>, VV, Message, EV> {
private Vertex<K, VV> nextVertex = new Vertex<K, VV>();
@@ -430,19 +430,19 @@ public class ScatterGatherIteration<K, VV, Message, EV>
}
@Override
- public void coGroup(Iterable<Edge<K, EV>> edges, Iterable<Vertex<K, Tuple3<VV, Long, Long>>> state,
+ public void coGroup(Iterable<Edge<K, EV>> edges, Iterable<Vertex<K, Tuple3<VV, LongValue, LongValue>>> state,
Collector<Tuple2<K, Message>> out) throws Exception {
- final Iterator<Vertex<K, Tuple3<VV, Long, Long>>> stateIter = state.iterator();
+ final Iterator<Vertex<K, Tuple3<VV, LongValue, LongValue>>> stateIter = state.iterator();
if (stateIter.hasNext()) {
- Vertex<K, Tuple3<VV, Long, Long>> vertexWithDegrees = stateIter.next();
+ Vertex<K, Tuple3<VV, LongValue, LongValue>> vertexWithDegrees = stateIter.next();
nextVertex.setField(vertexWithDegrees.f0, 0);
nextVertex.setField(vertexWithDegrees.f1.f0, 1);
- messagingFunction.setInDegree(vertexWithDegrees.f1.f1);
- messagingFunction.setOutDegree(vertexWithDegrees.f1.f2);
+ messagingFunction.setInDegree(vertexWithDegrees.f1.f1.getValue());
+ messagingFunction.setOutDegree(vertexWithDegrees.f1.f2.getValue());
messagingFunction.set((Iterator<?>) edges.iterator(), out, vertexWithDegrees.getId());
messagingFunction.sendMessages(nextVertex);
@@ -505,13 +505,13 @@ public class ScatterGatherIteration<K, VV, Message, EV>
* @return the messaging function
*/
private CoGroupOperator<?, ?, Tuple2<K, Message>> buildMessagingFunctionVerticesWithDegrees(
- DeltaIteration<Vertex<K, Tuple3<VV, Long, Long>>, Vertex<K, Tuple3<VV, Long, Long>>> iteration,
+ DeltaIteration<Vertex<K, Tuple3<VV, LongValue, LongValue>>, Vertex<K, Tuple3<VV, LongValue, LongValue>>> iteration,
TypeInformation<Tuple2<K, Message>> messageTypeInfo, int whereArg, int equalToArg,
DataSet<LongValue> numberOfVertices) {
// build the messaging function (co group)
CoGroupOperator<?, ?, Tuple2<K, Message>> messages;
- MessagingUdfWithEdgeValues<K, Tuple3<VV, Long, Long>, VV, Message, EV> messenger =
+ MessagingUdfWithEdgeValues<K, Tuple3<VV, LongValue, LongValue>, VV, Message, EV> messenger =
new MessagingUdfWithEVsVVWithDegrees<K, VV, Message, EV>(messagingFunction, messageTypeInfo);
messages = this.edgesWithValue.coGroup(iteration.getWorkset()).where(whereArg)
@@ -626,34 +626,34 @@ public class ScatterGatherIteration<K, VV, Message, EV>
this.updateFunction.setOptDegrees(this.configuration.isOptDegrees());
- DataSet<Tuple2<K, Long>> inDegrees = graph.inDegrees();
- DataSet<Tuple2<K, Long>> outDegrees = graph.outDegrees();
+ DataSet<Tuple2<K, LongValue>> inDegrees = graph.inDegrees();
+ DataSet<Tuple2<K, LongValue>> outDegrees = graph.outDegrees();
- DataSet<Tuple3<K, Long, Long>> degrees = inDegrees.join(outDegrees).where(0).equalTo(0)
- .with(new FlatJoinFunction<Tuple2<K, Long>, Tuple2<K, Long>, Tuple3<K, Long, Long>>() {
+ DataSet<Tuple3<K, LongValue, LongValue>> degrees = inDegrees.join(outDegrees).where(0).equalTo(0)
+ .with(new FlatJoinFunction<Tuple2<K, LongValue>, Tuple2<K, LongValue>, Tuple3<K, LongValue, LongValue>>() {
@Override
- public void join(Tuple2<K, Long> first, Tuple2<K, Long> second, Collector<Tuple3<K, Long, Long>> out) {
- out.collect(new Tuple3<K, Long, Long>(first.f0, first.f1, second.f1));
+ public void join(Tuple2<K, LongValue> first, Tuple2<K, LongValue> second, Collector<Tuple3<K, LongValue, LongValue>> out) {
+ out.collect(new Tuple3<K, LongValue, LongValue>(first.f0, first.f1, second.f1));
}
}).withForwardedFieldsFirst("f0;f1").withForwardedFieldsSecond("f1");
- DataSet<Vertex<K, Tuple3<VV, Long, Long>>> verticesWithDegrees = initialVertices
+ DataSet<Vertex<K, Tuple3<VV, LongValue, LongValue>>> verticesWithDegrees = initialVertices
.join(degrees).where(0).equalTo(0)
- .with(new FlatJoinFunction<Vertex<K,VV>, Tuple3<K,Long,Long>, Vertex<K, Tuple3<VV, Long, Long>>>() {
+ .with(new FlatJoinFunction<Vertex<K,VV>, Tuple3<K, LongValue, LongValue>, Vertex<K, Tuple3<VV, LongValue, LongValue>>>() {
@Override
- public void join(Vertex<K, VV> vertex, Tuple3<K, Long, Long> degrees,
- Collector<Vertex<K, Tuple3<VV, Long, Long>>> out) throws Exception {
+ public void join(Vertex<K, VV> vertex, Tuple3<K, LongValue, LongValue> degrees,
+ Collector<Vertex<K, Tuple3<VV, LongValue, LongValue>>> out) throws Exception {
- out.collect(new Vertex<K, Tuple3<VV, Long, Long>>(vertex.getId(),
- new Tuple3<VV, Long, Long>(vertex.getValue(), degrees.f1, degrees.f2)));
+ out.collect(new Vertex<K, Tuple3<VV, LongValue, LongValue>>(vertex.getId(),
+ new Tuple3<VV, LongValue, LongValue>(vertex.getValue(), degrees.f1, degrees.f2)));
}
}).withForwardedFieldsFirst("f0");
// add type info
- TypeInformation<Vertex<K, Tuple3<VV, Long, Long>>> vertexTypes = verticesWithDegrees.getType();
+ TypeInformation<Vertex<K, Tuple3<VV, LongValue, LongValue>>> vertexTypes = verticesWithDegrees.getType();
- final DeltaIteration<Vertex<K, Tuple3<VV, Long, Long>>, Vertex<K, Tuple3<VV, Long, Long>>> iteration =
+ final DeltaIteration<Vertex<K, Tuple3<VV, LongValue, LongValue>>, Vertex<K, Tuple3<VV, LongValue, LongValue>>> iteration =
verticesWithDegrees.iterateDelta(verticesWithDegrees, this.maximumNumberOfIterations, 0);
setUpIteration(iteration);
@@ -673,11 +673,11 @@ public class ScatterGatherIteration<K, VV, Message, EV>
}
@SuppressWarnings({ "unchecked", "rawtypes" })
- VertexUpdateUdf<K, Tuple3<VV, Long, Long>, Message> updateUdf =
+ VertexUpdateUdf<K, Tuple3<VV, LongValue, LongValue>, Message> updateUdf =
new VertexUpdateUdfVVWithDegrees(updateFunction, vertexTypes);
// build the update function (co group)
- CoGroupOperator<?, ?, Vertex<K, Tuple3<VV, Long, Long>>> updates =
+ CoGroupOperator<?, ?, Vertex<K, Tuple3<VV, LongValue, LongValue>>> updates =
messages.coGroup(iteration.getSolutionSet()).where(0).equalTo(0).with(updateUdf);
if (this.configuration != null && this.configuration.isOptNumVertices()) {
@@ -687,9 +687,9 @@ public class ScatterGatherIteration<K, VV, Message, EV>
configureUpdateFunction(updates);
return iteration.closeWith(updates, updates).map(
- new MapFunction<Vertex<K, Tuple3<VV, Long, Long>>, Vertex<K, VV>>() {
+ new MapFunction<Vertex<K, Tuple3<VV, LongValue, LongValue>>, Vertex<K, VV>>() {
- public Vertex<K, VV> map(Vertex<K, Tuple3<VV, Long, Long>> vertex) {
+ public Vertex<K, VV> map(Vertex<K, Tuple3<VV, LongValue, LongValue>> vertex) {
return new Vertex<K, VV>(vertex.getId(), vertex.getValue().f0);
}
});
http://git-wip-us.apache.org/repos/asf/flink/blob/40749ddc/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CompleteGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CompleteGraphTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CompleteGraphTest.java
index af47fdc..6c0e094 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CompleteGraphTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CompleteGraphTest.java
@@ -56,10 +56,10 @@ extends AbstractGraphTest {
assertEquals(vertexCount, graph.numberOfVertices());
assertEquals(vertexCount*(vertexCount-1), graph.numberOfEdges());
- long minInDegree = graph.inDegrees().min(1).collect().get(0).f1;
- long minOutDegree = graph.outDegrees().min(1).collect().get(0).f1;
- long maxInDegree = graph.inDegrees().max(1).collect().get(0).f1;
- long maxOutDegree = graph.outDegrees().max(1).collect().get(0).f1;
+ long minInDegree = graph.inDegrees().min(1).collect().get(0).f1.getValue();
+ long minOutDegree = graph.outDegrees().min(1).collect().get(0).f1.getValue();
+ long maxInDegree = graph.inDegrees().max(1).collect().get(0).f1.getValue();
+ long maxOutDegree = graph.outDegrees().max(1).collect().get(0).f1.getValue();
assertEquals(vertexCount - 1, minInDegree);
assertEquals(vertexCount - 1, minOutDegree);
http://git-wip-us.apache.org/repos/asf/flink/blob/40749ddc/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CycleGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CycleGraphTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CycleGraphTest.java
index fb6799b..ec36aa7 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CycleGraphTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CycleGraphTest.java
@@ -55,10 +55,10 @@ extends AbstractGraphTest {
assertEquals(vertexCount, graph.numberOfVertices());
assertEquals(2 * vertexCount, graph.numberOfEdges());
- long minInDegree = graph.inDegrees().min(1).collect().get(0).f1;
- long minOutDegree = graph.outDegrees().min(1).collect().get(0).f1;
- long maxInDegree = graph.inDegrees().max(1).collect().get(0).f1;
- long maxOutDegree = graph.outDegrees().max(1).collect().get(0).f1;
+ long minInDegree = graph.inDegrees().min(1).collect().get(0).f1.getValue();
+ long minOutDegree = graph.outDegrees().min(1).collect().get(0).f1.getValue();
+ long maxInDegree = graph.inDegrees().max(1).collect().get(0).f1.getValue();
+ long maxOutDegree = graph.outDegrees().max(1).collect().get(0).f1.getValue();
assertEquals(2, minInDegree);
assertEquals(2, minOutDegree);
http://git-wip-us.apache.org/repos/asf/flink/blob/40749ddc/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/EmptyGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/EmptyGraphTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/EmptyGraphTest.java
index bc1ef77..d4a524f 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/EmptyGraphTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/EmptyGraphTest.java
@@ -54,8 +54,8 @@ extends AbstractGraphTest {
assertEquals(vertexCount, graph.numberOfVertices());
assertEquals(0, graph.numberOfEdges());
- long maxInDegree = graph.inDegrees().max(1).collect().get(0).f1;
- long maxOutDegree = graph.outDegrees().max(1).collect().get(0).f1;
+ long maxInDegree = graph.inDegrees().max(1).collect().get(0).f1.getValue();
+ long maxOutDegree = graph.outDegrees().max(1).collect().get(0).f1.getValue();
assertEquals(0, maxInDegree);
assertEquals(0, maxOutDegree);
http://git-wip-us.apache.org/repos/asf/flink/blob/40749ddc/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/GridGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/GridGraphTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/GridGraphTest.java
index f3fa7db..9606d1a 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/GridGraphTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/GridGraphTest.java
@@ -63,10 +63,10 @@ extends AbstractGraphTest {
assertEquals(2*3*5*7, graph.numberOfVertices());
assertEquals(7 * 2*3*5*7, graph.numberOfEdges());
- long minInDegree = graph.inDegrees().min(1).collect().get(0).f1;
- long minOutDegree = graph.outDegrees().min(1).collect().get(0).f1;
- long maxInDegree = graph.inDegrees().max(1).collect().get(0).f1;
- long maxOutDegree = graph.outDegrees().max(1).collect().get(0).f1;
+ long minInDegree = graph.inDegrees().min(1).collect().get(0).f1.getValue();
+ long minOutDegree = graph.outDegrees().min(1).collect().get(0).f1.getValue();
+ long maxInDegree = graph.inDegrees().max(1).collect().get(0).f1.getValue();
+ long maxOutDegree = graph.outDegrees().max(1).collect().get(0).f1.getValue();
assertEquals(7, minInDegree);
assertEquals(7, minOutDegree);
http://git-wip-us.apache.org/repos/asf/flink/blob/40749ddc/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/HypercubeGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/HypercubeGraphTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/HypercubeGraphTest.java
index 12024be..d723ecb 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/HypercubeGraphTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/HypercubeGraphTest.java
@@ -57,10 +57,10 @@ extends AbstractGraphTest {
assertEquals(1L << dimensions, graph.numberOfVertices());
assertEquals(dimensions * (1L << dimensions), graph.numberOfEdges());
- long minInDegree = graph.inDegrees().min(1).collect().get(0).f1;
- long minOutDegree = graph.outDegrees().min(1).collect().get(0).f1;
- long maxInDegree = graph.inDegrees().max(1).collect().get(0).f1;
- long maxOutDegree = graph.outDegrees().max(1).collect().get(0).f1;
+ long minInDegree = graph.inDegrees().min(1).collect().get(0).f1.getValue();
+ long minOutDegree = graph.outDegrees().min(1).collect().get(0).f1.getValue();
+ long maxInDegree = graph.inDegrees().max(1).collect().get(0).f1.getValue();
+ long maxOutDegree = graph.outDegrees().max(1).collect().get(0).f1.getValue();
assertEquals(dimensions, minInDegree);
assertEquals(dimensions, minOutDegree);
http://git-wip-us.apache.org/repos/asf/flink/blob/40749ddc/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/PathGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/PathGraphTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/PathGraphTest.java
index b8a409f..3c3ce8c 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/PathGraphTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/PathGraphTest.java
@@ -55,10 +55,10 @@ extends AbstractGraphTest {
assertEquals(vertexCount, graph.numberOfVertices());
assertEquals(2 * (vertexCount - 1), graph.numberOfEdges());
- long minInDegree = graph.inDegrees().min(1).collect().get(0).f1;
- long minOutDegree = graph.outDegrees().min(1).collect().get(0).f1;
- long maxInDegree = graph.inDegrees().max(1).collect().get(0).f1;
- long maxOutDegree = graph.outDegrees().max(1).collect().get(0).f1;
+ long minInDegree = graph.inDegrees().min(1).collect().get(0).f1.getValue();
+ long minOutDegree = graph.outDegrees().min(1).collect().get(0).f1.getValue();
+ long maxInDegree = graph.inDegrees().max(1).collect().get(0).f1.getValue();
+ long maxOutDegree = graph.outDegrees().max(1).collect().get(0).f1.getValue();
assertEquals(1, minInDegree);
assertEquals(1, minOutDegree);
http://git-wip-us.apache.org/repos/asf/flink/blob/40749ddc/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/SingletonEdgeGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/SingletonEdgeGraphTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/SingletonEdgeGraphTest.java
index 3877717..44a4d99 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/SingletonEdgeGraphTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/SingletonEdgeGraphTest.java
@@ -56,10 +56,10 @@ extends AbstractGraphTest {
assertEquals(2 * vertexPairCount, graph.numberOfVertices());
assertEquals(2 * vertexPairCount, graph.numberOfEdges());
- long minInDegree = graph.inDegrees().min(1).collect().get(0).f1;
- long minOutDegree = graph.outDegrees().min(1).collect().get(0).f1;
- long maxInDegree = graph.inDegrees().max(1).collect().get(0).f1;
- long maxOutDegree = graph.outDegrees().max(1).collect().get(0).f1;
+ long minInDegree = graph.inDegrees().min(1).collect().get(0).f1.getValue();
+ long minOutDegree = graph.outDegrees().min(1).collect().get(0).f1.getValue();
+ long maxInDegree = graph.inDegrees().max(1).collect().get(0).f1.getValue();
+ long maxOutDegree = graph.outDegrees().max(1).collect().get(0).f1.getValue();
assertEquals(1, minInDegree);
assertEquals(1, minOutDegree);
http://git-wip-us.apache.org/repos/asf/flink/blob/40749ddc/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/StarGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/StarGraphTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/StarGraphTest.java
index 2b090db..c656cfb 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/StarGraphTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/StarGraphTest.java
@@ -57,10 +57,10 @@ extends AbstractGraphTest {
assertEquals(vertexCount, graph.numberOfVertices());
assertEquals(2 * (vertexCount - 1), graph.numberOfEdges());
- long minInDegree = graph.inDegrees().min(1).collect().get(0).f1;
- long minOutDegree = graph.outDegrees().min(1).collect().get(0).f1;
- long maxInDegree = graph.inDegrees().max(1).collect().get(0).f1;
- long maxOutDegree = graph.outDegrees().max(1).collect().get(0).f1;
+ long minInDegree = graph.inDegrees().min(1).collect().get(0).f1.getValue();
+ long minOutDegree = graph.outDegrees().min(1).collect().get(0).f1.getValue();
+ long maxInDegree = graph.inDegrees().max(1).collect().get(0).f1.getValue();
+ long maxOutDegree = graph.outDegrees().max(1).collect().get(0).f1.getValue();
assertEquals(1, minInDegree);
assertEquals(1, minOutDegree);
http://git-wip-us.apache.org/repos/asf/flink/blob/40749ddc/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesITCase.java
index b2744f9..db2ca0d 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesITCase.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesITCase.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.test.TestGraphUtils;
import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.types.LongValue;
import org.apache.flink.types.NullValue;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -50,8 +51,8 @@ public class DegreesITCase extends MultipleProgramsTestBase {
Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
TestGraphUtils.getLongLongEdgeData(env), env);
- DataSet<Tuple2<Long,Long>> data =graph.outDegrees();
- List<Tuple2<Long,Long>> result= data.collect();
+ DataSet<Tuple2<Long, LongValue>> data = graph.outDegrees();
+ List<Tuple2<Long, LongValue>> result = data.collect();
expectedResult = "1,2\n" +
@@ -76,8 +77,8 @@ public class DegreesITCase extends MultipleProgramsTestBase {
- DataSet<Tuple2<Long,Long>> data =graph.outDegrees();
- List<Tuple2<Long,Long>> result= data.collect();
+ DataSet<Tuple2<Long, LongValue>> data = graph.outDegrees();
+ List<Tuple2<Long, LongValue>> result = data.collect();
expectedResult = "1,3\n" +
"2,1\n" +
@@ -99,8 +100,8 @@ public class DegreesITCase extends MultipleProgramsTestBase {
TestGraphUtils.getLongLongEdgeData(env), env);
- DataSet<Tuple2<Long,Long>> data =graph.inDegrees();
- List<Tuple2<Long,Long>> result= data.collect();
+ DataSet<Tuple2<Long, LongValue>> data = graph.inDegrees();
+ List<Tuple2<Long, LongValue>> result = data.collect();
expectedResult = "1,1\n" +
"2,1\n" +
@@ -120,8 +121,8 @@ public class DegreesITCase extends MultipleProgramsTestBase {
Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
TestGraphUtils.getLongLongEdgeDataWithZeroDegree(env), env);
- DataSet<Tuple2<Long,Long>> data =graph.inDegrees();
- List<Tuple2<Long,Long>> result= data.collect();
+ DataSet<Tuple2<Long, LongValue>> data = graph.inDegrees();
+ List<Tuple2<Long, LongValue>> result = data.collect();
expectedResult = "1,0\n" +
"2,1\n" +
@@ -142,8 +143,8 @@ public class DegreesITCase extends MultipleProgramsTestBase {
Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
TestGraphUtils.getLongLongEdgeData(env), env);
- DataSet<Tuple2<Long,Long>> data =graph.getDegrees();
- List<Tuple2<Long,Long>> result= data.collect();
+ DataSet<Tuple2<Long, LongValue>> data = graph.getDegrees();
+ List<Tuple2<Long, LongValue>> result = data.collect();
expectedResult = "1,3\n" +
"2,2\n" +
@@ -164,8 +165,8 @@ public class DegreesITCase extends MultipleProgramsTestBase {
Graph<Long, NullValue, Long> graph =
Graph.fromDataSet(TestGraphUtils.getDisconnectedLongLongEdgeData(env), env);
- DataSet<Tuple2<Long,Long>> data =graph.outDegrees();
- List<Tuple2<Long,Long>> result= data.collect();
+ DataSet<Tuple2<Long, LongValue>> data = graph.outDegrees();
+ List<Tuple2<Long, LongValue>> result = data.collect();
expectedResult = "1,2\n" +
"2,1\n" +
http://git-wip-us.apache.org/repos/asf/flink/blob/40749ddc/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesWithExceptionITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesWithExceptionITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesWithExceptionITCase.java
index d79768f..551a97b 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesWithExceptionITCase.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesWithExceptionITCase.java
@@ -28,6 +28,7 @@ import org.apache.flink.graph.Graph;
import org.apache.flink.graph.test.TestGraphUtils;
import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.types.LongValue;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -81,7 +82,7 @@ public class DegreesWithExceptionITCase {
TestGraphUtils.getLongLongEdgeInvalidSrcData(env), env);
try {
- graph.outDegrees().output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
+ graph.outDegrees().output(new DiscardingOutputFormat<Tuple2<Long, LongValue>>());
env.execute();
fail("graph.outDegrees() did not fail.");
@@ -105,7 +106,7 @@ public class DegreesWithExceptionITCase {
TestGraphUtils.getLongLongEdgeInvalidTrgData(env), env);
try {
- graph.inDegrees().output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
+ graph.inDegrees().output(new DiscardingOutputFormat<Tuple2<Long, LongValue>>());
env.execute();
fail("graph.inDegrees() did not fail.");
@@ -129,7 +130,7 @@ public class DegreesWithExceptionITCase {
TestGraphUtils.getLongLongEdgeInvalidTrgData(env), env);
try {
- graph.getDegrees().output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
+ graph.getDegrees().output(new DiscardingOutputFormat<Tuple2<Long, LongValue>>());
env.execute();
fail("graph.getDegrees() did not fail.");
@@ -153,7 +154,7 @@ public class DegreesWithExceptionITCase {
TestGraphUtils.getLongLongEdgeInvalidSrcData(env), env);
try {
- graph.getDegrees().output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
+ graph.getDegrees().output(new DiscardingOutputFormat<Tuple2<Long, LongValue>>());
env.execute();
fail("graph.getDegrees() did not fail.");
@@ -177,7 +178,7 @@ public class DegreesWithExceptionITCase {
TestGraphUtils.getLongLongEdgeInvalidSrcTrgData(env), env);
try {
- graph.getDegrees().output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
+ graph.getDegrees().output(new DiscardingOutputFormat<Tuple2<Long, LongValue>>());
env.execute();
fail("graph.getDegrees() did not fail.");