You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2014/11/26 09:55:36 UTC
spark git commit: Removing confusing TripletFields
Repository: spark
Updated Branches:
refs/heads/master e7f4d2534 -> 288ce583b
Removing confusing TripletFields
After additional discussion with rxin, I think having all the possible `TripletFields` options is confusing. This pull request reduces the triplet fields to:
```java
/**
* None of the triplet fields are exposed.
*/
public static final TripletFields None = new TripletFields(false, false, false);
/**
* Expose only the edge field and not the source or destination field.
*/
public static final TripletFields EdgeOnly = new TripletFields(false, false, true);
/**
* Expose the source and edge fields but not the destination field.
*/
public static final TripletFields Src = new TripletFields(true, false, true);
/**
* Expose the destination and edge fields but not the source field.
*/
public static final TripletFields Dst = new TripletFields(false, true, true);
/**
* Expose all the fields (source, edge, and destination).
*/
public static final TripletFields All = new TripletFields(true, true, true);
```
Author: Joseph E. Gonzalez <jo...@gmail.com>
Closes #3472 from jegonzal/SimplifyTripletFields and squashes the following commits:
91796b5 [Joseph E. Gonzalez] removing confusing triplet fields
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/288ce583
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/288ce583
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/288ce583
Branch: refs/heads/master
Commit: 288ce583b05004a8c71dcd836fab23caff5d4ba7
Parents: e7f4d25
Author: Joseph E. Gonzalez <jo...@gmail.com>
Authored: Wed Nov 26 00:55:28 2014 -0800
Committer: Reynold Xin <rx...@databricks.com>
Committed: Wed Nov 26 00:55:28 2014 -0800
----------------------------------------------------------------------
.../org/apache/spark/graphx/GraphOps.scala | 6 ++--
.../org/apache/spark/graphx/TripletFields.java | 29 ++------------------
.../org/apache/spark/graphx/lib/PageRank.scala | 4 +--
.../org/apache/spark/graphx/GraphSuite.scala | 2 +-
4 files changed, 8 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/288ce583/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
index d515038..116d1ea 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
@@ -129,15 +129,15 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali
ctx.sendToSrc(Array((ctx.dstId, ctx.dstAttr)))
ctx.sendToDst(Array((ctx.srcId, ctx.srcAttr)))
},
- (a, b) => a ++ b, TripletFields.SrcDstOnly)
+ (a, b) => a ++ b, TripletFields.All)
case EdgeDirection.In =>
graph.aggregateMessages[Array[(VertexId,VD)]](
ctx => ctx.sendToDst(Array((ctx.srcId, ctx.srcAttr))),
- (a, b) => a ++ b, TripletFields.SrcOnly)
+ (a, b) => a ++ b, TripletFields.Src)
case EdgeDirection.Out =>
graph.aggregateMessages[Array[(VertexId,VD)]](
ctx => ctx.sendToSrc(Array((ctx.dstId, ctx.dstAttr))),
- (a, b) => a ++ b, TripletFields.DstOnly)
+ (a, b) => a ++ b, TripletFields.Dst)
case EdgeDirection.Both =>
throw new SparkException("collectEdges does not support EdgeDirection.Both. Use" +
"EdgeDirection.Either instead.")
http://git-wip-us.apache.org/repos/asf/spark/blob/288ce583/graphx/src/main/scala/org/apache/spark/graphx/TripletFields.java
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/TripletFields.java b/graphx/src/main/scala/org/apache/spark/graphx/TripletFields.java
index 8dfccfe..7eb4ae0 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/TripletFields.java
+++ b/graphx/src/main/scala/org/apache/spark/graphx/TripletFields.java
@@ -56,39 +56,14 @@ public class TripletFields implements Serializable {
public static final TripletFields EdgeOnly = new TripletFields(false, false, true);
/**
- * Expose only the source field and not the edge or destination field.
- */
- public static final TripletFields SrcOnly = new TripletFields(true, false, false);
-
- /**
- * Expose only the destination field and not the edge or source field.
- */
- public static final TripletFields DstOnly = new TripletFields(false, true, false);
-
- /**
- * Expose the source and destination fields but not the edge field.
- */
- public static final TripletFields SrcDstOnly = new TripletFields(true, true, false);
-
- /**
* Expose the source and edge fields but not the destination field. (Same as Src)
*/
- public static final TripletFields SrcAndEdge = new TripletFields(true, false, true);
-
- /**
- * Expose the source and edge fields but not the destination field. (Same as SrcAndEdge)
- */
- public static final TripletFields Src = SrcAndEdge;
+ public static final TripletFields Src = new TripletFields(true, false, true);
/**
* Expose the destination and edge fields but not the source field. (Same as Dst)
*/
- public static final TripletFields DstAndEdge = new TripletFields(false, true, true);
-
- /**
- * Expose the destination and edge fields but not the source field. (Same as DstAndEdge)
- */
- public static final TripletFields Dst = DstAndEdge;
+ public static final TripletFields Dst = new TripletFields(false, true, true);
/**
* Expose all the fields (source, edge, and destination).
http://git-wip-us.apache.org/repos/asf/spark/blob/288ce583/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
index e40ae0d..e139959 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
@@ -85,7 +85,7 @@ object PageRank extends Logging {
// Associate the degree with each vertex
.outerJoinVertices(graph.outDegrees) { (vid, vdata, deg) => deg.getOrElse(0) }
// Set the weight on the edges based on the degree
- .mapTriplets( e => 1.0 / e.srcAttr, TripletFields.SrcOnly )
+ .mapTriplets( e => 1.0 / e.srcAttr, TripletFields.Src )
// Set the vertex attributes to the initial pagerank values
.mapVertices( (id, attr) => resetProb )
@@ -97,7 +97,7 @@ object PageRank extends Logging {
// Compute the outgoing rank contributions of each vertex, perform local preaggregation, and
// do the final aggregation at the receiving vertices. Requires a shuffle for aggregation.
val rankUpdates = rankGraph.aggregateMessages[Double](
- ctx => ctx.sendToDst(ctx.srcAttr * ctx.attr), _ + _, TripletFields.SrcAndEdge)
+ ctx => ctx.sendToDst(ctx.srcAttr * ctx.attr), _ + _, TripletFields.Src)
// Apply the final rank updates to get the new ranks, using join to preserve ranks of vertices
// that didn't receive a message. Requires a shuffle for broadcasting updated ranks to the
http://git-wip-us.apache.org/repos/asf/spark/blob/288ce583/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
----------------------------------------------------------------------
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
index df773db..a05d1dd 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
@@ -328,7 +328,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
"expected ctx.dstAttr to be null due to TripletFields, but it was " + ctx.dstAttr)
}
ctx.sendToDst(ctx.srcAttr)
- }, _ + _, TripletFields.SrcOnly)
+ }, _ + _, TripletFields.Src)
assert(agg.collect().toSet === (1 to n).map(x => (x: VertexId, "v")).toSet)
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org