You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by an...@apache.org on 2014/09/04 08:50:05 UTC
git commit: [HOTFIX] [SPARK-3400] Revert 9b225ac "fix GraphX EdgeRDD
zipPartitions"
Repository: spark
Updated Branches:
refs/heads/master 1bed0a386 -> 00362dac9
[HOTFIX] [SPARK-3400] Revert 9b225ac "fix GraphX EdgeRDD zipPartitions"
9b225ac3072de522b40b46aba6df1f1c231f13ef has been causing GraphX tests
to fail nondeterministically, which is blocking development for others.
Author: Ankur Dave <an...@gmail.com>
Closes #2271 from ankurdave/SPARK-3400 and squashes the following commits:
10c2a97 [Ankur Dave] [HOTFIX] [SPARK-3400] Revert 9b225ac "fix GraphX EdgeRDD zipPartitions"
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/00362dac
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/00362dac
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/00362dac
Branch: refs/heads/master
Commit: 00362dac976cd05b06638deb11d990d612429e0b
Parents: 1bed0a3
Author: Ankur Dave <an...@gmail.com>
Authored: Wed Sep 3 23:49:47 2014 -0700
Committer: Ankur Dave <an...@gmail.com>
Committed: Wed Sep 3 23:49:47 2014 -0700
----------------------------------------------------------------------
.../scala/org/apache/spark/graphx/EdgeRDD.scala | 4 ++--
.../scala/org/apache/spark/graphx/GraphSuite.scala | 16 ----------------
2 files changed, 2 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/00362dac/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
index 35fbd47..5bcb96b 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
@@ -19,7 +19,7 @@ package org.apache.spark.graphx
import scala.reflect.{classTag, ClassTag}
-import org.apache.spark._
+import org.apache.spark.{OneToOneDependency, Partition, Partitioner, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
@@ -55,7 +55,7 @@ class EdgeRDD[@specialized ED: ClassTag, VD: ClassTag](
* partitioner that allows co-partitioning with `partitionsRDD`.
*/
override val partitioner =
- partitionsRDD.partitioner.orElse(Some(new HashPartitioner(partitionsRDD.partitions.size)))
+ partitionsRDD.partitioner.orElse(Some(Partitioner.defaultPartitioner(partitionsRDD)))
override def compute(part: Partition, context: TaskContext): Iterator[Edge[ED]] = {
val p = firstParent[(PartitionID, EdgePartition[ED, VD])].iterator(part, context)
http://git-wip-us.apache.org/repos/asf/spark/blob/00362dac/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
----------------------------------------------------------------------
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
index eaaa449..6506bac 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
@@ -19,7 +19,6 @@ package org.apache.spark.graphx
import org.scalatest.FunSuite
-import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.graphx.Graph._
import org.apache.spark.graphx.PartitionStrategy._
@@ -351,19 +350,4 @@ class GraphSuite extends FunSuite with LocalSparkContext {
}
}
- test("non-default number of edge partitions") {
- val n = 10
- val defaultParallelism = 3
- val numEdgePartitions = 4
- assert(defaultParallelism != numEdgePartitions)
- val conf = new SparkConf()
- .set("spark.default.parallelism", defaultParallelism.toString)
- val sc = new SparkContext("local", "test", conf)
- val edges = sc.parallelize((1 to n).map(x => (x: VertexId, 0: VertexId)),
- numEdgePartitions)
- val graph = Graph.fromEdgeTuples(edges, 1)
- val neighborAttrSums = graph.mapReduceTriplets[Int](
- et => Iterator((et.dstId, et.srcAttr)), _ + _)
- assert(neighborAttrSums.collect.toSet === Set((0: VertexId, n)))
- }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org