You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2014/05/12 19:49:17 UTC
git commit: Revert "SPARK-1786: Edge Partition Serialization"
Repository: spark
Updated Branches:
refs/heads/master a6b02fb74 -> af15c82bf
Revert "SPARK-1786: Edge Partition Serialization"
This reverts commit a6b02fb7486356493474c7f42bb714c9cce215ca.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/af15c82b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/af15c82b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/af15c82b
Branch: refs/heads/master
Commit: af15c82bfe2c3f73142b8f310784a0e85841539d
Parents: a6b02fb
Author: Patrick Wendell <pw...@gmail.com>
Authored: Mon May 12 10:49:03 2014 -0700
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Mon May 12 10:49:03 2014 -0700
----------------------------------------------------------------------
.../spark/graphx/GraphKryoRegistrator.scala | 9 +-
.../spark/graphx/impl/EdgePartition.scala | 14 +-
.../graphx/impl/EdgePartitionBuilder.scala | 4 +-
.../spark/graphx/impl/EdgeTripletIterator.scala | 2 +-
.../graphx/impl/RoutingTablePartition.scala | 4 +-
.../graphx/impl/ShippableVertexPartition.scala | 2 +-
.../spark/graphx/impl/VertexPartition.scala | 2 +-
.../spark/graphx/impl/VertexPartitionBase.scala | 6 +-
.../graphx/impl/VertexPartitionBaseOps.scala | 4 +-
.../GraphXPrimitiveKeyOpenHashMap.scala | 153 -------------------
.../collection/PrimitiveKeyOpenHashMap.scala | 153 +++++++++++++++++++
.../spark/graphx/impl/EdgePartitionSuite.scala | 18 ---
12 files changed, 175 insertions(+), 196 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/af15c82b/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala
index f97f329..d295d01 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala
@@ -24,9 +24,6 @@ import org.apache.spark.util.BoundedPriorityQueue
import org.apache.spark.util.collection.BitSet
import org.apache.spark.graphx.impl._
-import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap
-import org.apache.spark.util.collection.OpenHashSet
-
/**
* Registers GraphX classes with Kryo for improved performance.
@@ -46,8 +43,8 @@ class GraphKryoRegistrator extends KryoRegistrator {
kryo.register(classOf[PartitionStrategy])
kryo.register(classOf[BoundedPriorityQueue[Object]])
kryo.register(classOf[EdgeDirection])
- kryo.register(classOf[GraphXPrimitiveKeyOpenHashMap[VertexId, Int]])
- kryo.register(classOf[OpenHashSet[Int]])
- kryo.register(classOf[OpenHashSet[Long]])
+
+ // This avoids a large number of hash table lookups.
+ kryo.setReferences(false)
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/af15c82b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala
index a5c9cd1..871e81f 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala
@@ -20,7 +20,7 @@ package org.apache.spark.graphx.impl
import scala.reflect.{classTag, ClassTag}
import org.apache.spark.graphx._
-import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap
+import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
/**
* A collection of edges stored in columnar format, along with any vertex attributes referenced. The
@@ -42,12 +42,12 @@ import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap
private[graphx]
class EdgePartition[
@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassTag, VD: ClassTag](
- val srcIds: Array[VertexId] = null,
- val dstIds: Array[VertexId] = null,
- val data: Array[ED] = null,
- val index: GraphXPrimitiveKeyOpenHashMap[VertexId, Int] = null,
- val vertices: VertexPartition[VD] = null,
- val activeSet: Option[VertexSet] = None
+ @transient val srcIds: Array[VertexId],
+ @transient val dstIds: Array[VertexId],
+ @transient val data: Array[ED],
+ @transient val index: PrimitiveKeyOpenHashMap[VertexId, Int],
+ @transient val vertices: VertexPartition[VD],
+ @transient val activeSet: Option[VertexSet] = None
) extends Serializable {
/** Return a new `EdgePartition` with the specified edge data. */
http://git-wip-us.apache.org/repos/asf/spark/blob/af15c82b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala
index 4520beb..ecb49be 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala
@@ -23,7 +23,7 @@ import scala.util.Sorting
import org.apache.spark.util.collection.{BitSet, OpenHashSet, PrimitiveVector}
import org.apache.spark.graphx._
-import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap
+import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
private[graphx]
class EdgePartitionBuilder[@specialized(Long, Int, Double) ED: ClassTag, VD: ClassTag](
@@ -41,7 +41,7 @@ class EdgePartitionBuilder[@specialized(Long, Int, Double) ED: ClassTag, VD: Cla
val srcIds = new Array[VertexId](edgeArray.size)
val dstIds = new Array[VertexId](edgeArray.size)
val data = new Array[ED](edgeArray.size)
- val index = new GraphXPrimitiveKeyOpenHashMap[VertexId, Int]
+ val index = new PrimitiveKeyOpenHashMap[VertexId, Int]
// Copy edges into columnar structures, tracking the beginnings of source vertex id clusters and
// adding them to the index
if (edgeArray.length > 0) {
http://git-wip-us.apache.org/repos/asf/spark/blob/af15c82b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala
index 56f79a7..ebb0b94 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala
@@ -20,7 +20,7 @@ package org.apache.spark.graphx.impl
import scala.reflect.ClassTag
import org.apache.spark.graphx._
-import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap
+import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
/**
* The Iterator type returned when constructing edge triplets. This could be an anonymous class in
http://git-wip-us.apache.org/repos/asf/spark/blob/af15c82b/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTablePartition.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTablePartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTablePartition.scala
index d02e923..927e32a 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTablePartition.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTablePartition.scala
@@ -25,7 +25,7 @@ import org.apache.spark.rdd.ShuffledRDD
import org.apache.spark.util.collection.{BitSet, PrimitiveVector}
import org.apache.spark.graphx._
-import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap
+import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
/**
* A message from the edge partition `pid` to the vertex partition containing `vid` specifying that
@@ -69,7 +69,7 @@ object RoutingTablePartition {
: Iterator[RoutingTableMessage] = {
// Determine which positions each vertex id appears in using a map where the low 2 bits
// represent src and dst
- val map = new GraphXPrimitiveKeyOpenHashMap[VertexId, Byte]
+ val map = new PrimitiveKeyOpenHashMap[VertexId, Byte]
edgePartition.srcIds.iterator.foreach { srcId =>
map.changeValue(srcId, 0x1, (b: Byte) => (b | 0x1).toByte)
}
http://git-wip-us.apache.org/repos/asf/spark/blob/af15c82b/graphx/src/main/scala/org/apache/spark/graphx/impl/ShippableVertexPartition.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/ShippableVertexPartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/ShippableVertexPartition.scala
index dca54b8..f4e221d 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/ShippableVertexPartition.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/ShippableVertexPartition.scala
@@ -22,7 +22,7 @@ import scala.reflect.ClassTag
import org.apache.spark.util.collection.{BitSet, PrimitiveVector}
import org.apache.spark.graphx._
-import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap
+import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
/** Stores vertex attributes to ship to an edge partition. */
private[graphx]
http://git-wip-us.apache.org/repos/asf/spark/blob/af15c82b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartition.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartition.scala
index 55c7a19..f1d1747 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartition.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartition.scala
@@ -22,7 +22,7 @@ import scala.reflect.ClassTag
import org.apache.spark.util.collection.BitSet
import org.apache.spark.graphx._
-import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap
+import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
private[graphx] object VertexPartition {
/** Construct a `VertexPartition` from the given vertices. */
http://git-wip-us.apache.org/repos/asf/spark/blob/af15c82b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBase.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBase.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBase.scala
index 34939b2..8d9e020 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBase.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBase.scala
@@ -23,7 +23,7 @@ import scala.reflect.ClassTag
import org.apache.spark.util.collection.BitSet
import org.apache.spark.graphx._
-import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap
+import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
private[graphx] object VertexPartitionBase {
/**
@@ -32,7 +32,7 @@ private[graphx] object VertexPartitionBase {
*/
def initFrom[VD: ClassTag](iter: Iterator[(VertexId, VD)])
: (VertexIdToIndexMap, Array[VD], BitSet) = {
- val map = new GraphXPrimitiveKeyOpenHashMap[VertexId, VD]
+ val map = new PrimitiveKeyOpenHashMap[VertexId, VD]
iter.foreach { pair =>
map(pair._1) = pair._2
}
@@ -45,7 +45,7 @@ private[graphx] object VertexPartitionBase {
*/
def initFrom[VD: ClassTag](iter: Iterator[(VertexId, VD)], mergeFunc: (VD, VD) => VD)
: (VertexIdToIndexMap, Array[VD], BitSet) = {
- val map = new GraphXPrimitiveKeyOpenHashMap[VertexId, VD]
+ val map = new PrimitiveKeyOpenHashMap[VertexId, VD]
iter.foreach { pair =>
map.setMerge(pair._1, pair._2, mergeFunc)
}
http://git-wip-us.apache.org/repos/asf/spark/blob/af15c82b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBaseOps.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBaseOps.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBaseOps.scala
index a4f769b..21ff615 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBaseOps.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBaseOps.scala
@@ -25,7 +25,7 @@ import org.apache.spark.Logging
import org.apache.spark.util.collection.BitSet
import org.apache.spark.graphx._
-import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap
+import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
/**
* An class containing additional operations for subclasses of VertexPartitionBase that provide
@@ -224,7 +224,7 @@ private[graphx] abstract class VertexPartitionBaseOps
* Construct a new VertexPartition whose index contains only the vertices in the mask.
*/
def reindex(): Self[VD] = {
- val hashMap = new GraphXPrimitiveKeyOpenHashMap[VertexId, VD]
+ val hashMap = new PrimitiveKeyOpenHashMap[VertexId, VD]
val arbitraryMerge = (a: VD, b: VD) => a
for ((k, v) <- self.iterator) {
hashMap.setMerge(k, v, arbitraryMerge)
http://git-wip-us.apache.org/repos/asf/spark/blob/af15c82b/graphx/src/main/scala/org/apache/spark/graphx/util/collection/GraphXPrimitiveKeyOpenHashMap.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/util/collection/GraphXPrimitiveKeyOpenHashMap.scala b/graphx/src/main/scala/org/apache/spark/graphx/util/collection/GraphXPrimitiveKeyOpenHashMap.scala
deleted file mode 100644
index 57b01b6..0000000
--- a/graphx/src/main/scala/org/apache/spark/graphx/util/collection/GraphXPrimitiveKeyOpenHashMap.scala
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.graphx.util.collection
-
-import org.apache.spark.util.collection.OpenHashSet
-
-import scala.reflect._
-
-/**
- * A fast hash map implementation for primitive, non-null keys. This hash map supports
- * insertions and updates, but not deletions. This map is about an order of magnitude
- * faster than java.util.HashMap, while using much less space overhead.
- *
- * Under the hood, it uses our OpenHashSet implementation.
- */
-private[graphx]
-class GraphXPrimitiveKeyOpenHashMap[@specialized(Long, Int) K: ClassTag,
- @specialized(Long, Int, Double) V: ClassTag](
- val keySet: OpenHashSet[K], var _values: Array[V])
- extends Iterable[(K, V)]
- with Serializable {
-
- /**
- * Allocate an OpenHashMap with a fixed initial capacity
- */
- def this(initialCapacity: Int) =
- this(new OpenHashSet[K](initialCapacity), new Array[V](initialCapacity))
-
- /**
- * Allocate an OpenHashMap with a default initial capacity, providing a true
- * no-argument constructor.
- */
- def this() = this(64)
-
- /**
- * Allocate an OpenHashMap with a fixed initial capacity
- */
- def this(keySet: OpenHashSet[K]) = this(keySet, new Array[V](keySet.capacity))
-
- require(classTag[K] == classTag[Long] || classTag[K] == classTag[Int])
-
- private var _oldValues: Array[V] = null
-
- override def size = keySet.size
-
- /** Get the value for a given key */
- def apply(k: K): V = {
- val pos = keySet.getPos(k)
- _values(pos)
- }
-
- /** Get the value for a given key, or returns elseValue if it doesn't exist. */
- def getOrElse(k: K, elseValue: V): V = {
- val pos = keySet.getPos(k)
- if (pos >= 0) _values(pos) else elseValue
- }
-
- /** Set the value for a key */
- def update(k: K, v: V) {
- val pos = keySet.addWithoutResize(k) & OpenHashSet.POSITION_MASK
- _values(pos) = v
- keySet.rehashIfNeeded(k, grow, move)
- _oldValues = null
- }
-
-
- /** Set the value for a key */
- def setMerge(k: K, v: V, mergeF: (V, V) => V) {
- val pos = keySet.addWithoutResize(k)
- val ind = pos & OpenHashSet.POSITION_MASK
- if ((pos & OpenHashSet.NONEXISTENCE_MASK) != 0) { // if first add
- _values(ind) = v
- } else {
- _values(ind) = mergeF(_values(ind), v)
- }
- keySet.rehashIfNeeded(k, grow, move)
- _oldValues = null
- }
-
-
- /**
- * If the key doesn't exist yet in the hash map, set its value to defaultValue; otherwise,
- * set its value to mergeValue(oldValue).
- *
- * @return the newly updated value.
- */
- def changeValue(k: K, defaultValue: => V, mergeValue: (V) => V): V = {
- val pos = keySet.addWithoutResize(k)
- if ((pos & OpenHashSet.NONEXISTENCE_MASK) != 0) {
- val newValue = defaultValue
- _values(pos & OpenHashSet.POSITION_MASK) = newValue
- keySet.rehashIfNeeded(k, grow, move)
- newValue
- } else {
- _values(pos) = mergeValue(_values(pos))
- _values(pos)
- }
- }
-
- override def iterator = new Iterator[(K, V)] {
- var pos = 0
- var nextPair: (K, V) = computeNextPair()
-
- /** Get the next value we should return from next(), or null if we're finished iterating */
- def computeNextPair(): (K, V) = {
- pos = keySet.nextPos(pos)
- if (pos >= 0) {
- val ret = (keySet.getValue(pos), _values(pos))
- pos += 1
- ret
- } else {
- null
- }
- }
-
- def hasNext = nextPair != null
-
- def next() = {
- val pair = nextPair
- nextPair = computeNextPair()
- pair
- }
- }
-
- // The following member variables are declared as protected instead of private for the
- // specialization to work (specialized class extends the unspecialized one and needs access
- // to the "private" variables).
- // They also should have been val's. We use var's because there is a Scala compiler bug that
- // would throw illegal access error at runtime if they are declared as val's.
- protected var grow = (newCapacity: Int) => {
- _oldValues = _values
- _values = new Array[V](newCapacity)
- }
-
- protected var move = (oldPos: Int, newPos: Int) => {
- _values(newPos) = _oldValues(oldPos)
- }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/af15c82b/graphx/src/main/scala/org/apache/spark/graphx/util/collection/PrimitiveKeyOpenHashMap.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/util/collection/PrimitiveKeyOpenHashMap.scala b/graphx/src/main/scala/org/apache/spark/graphx/util/collection/PrimitiveKeyOpenHashMap.scala
new file mode 100644
index 0000000..7b02e2e
--- /dev/null
+++ b/graphx/src/main/scala/org/apache/spark/graphx/util/collection/PrimitiveKeyOpenHashMap.scala
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.graphx.util.collection
+
+import org.apache.spark.util.collection.OpenHashSet
+
+import scala.reflect._
+
+/**
+ * A fast hash map implementation for primitive, non-null keys. This hash map supports
+ * insertions and updates, but not deletions. This map is about an order of magnitude
+ * faster than java.util.HashMap, while using much less space overhead.
+ *
+ * Under the hood, it uses our OpenHashSet implementation.
+ */
+private[graphx]
+class PrimitiveKeyOpenHashMap[@specialized(Long, Int) K: ClassTag,
+ @specialized(Long, Int, Double) V: ClassTag](
+ val keySet: OpenHashSet[K], var _values: Array[V])
+ extends Iterable[(K, V)]
+ with Serializable {
+
+ /**
+ * Allocate an OpenHashMap with a fixed initial capacity
+ */
+ def this(initialCapacity: Int) =
+ this(new OpenHashSet[K](initialCapacity), new Array[V](initialCapacity))
+
+ /**
+ * Allocate an OpenHashMap with a default initial capacity, providing a true
+ * no-argument constructor.
+ */
+ def this() = this(64)
+
+ /**
+ * Allocate an OpenHashMap over an existing key set, sizing the values array to the set's capacity
+ */
+ def this(keySet: OpenHashSet[K]) = this(keySet, new Array[V](keySet.capacity))
+
+ require(classTag[K] == classTag[Long] || classTag[K] == classTag[Int])
+
+ private var _oldValues: Array[V] = null
+
+ override def size = keySet.size
+
+ /** Get the value for a given key */
+ def apply(k: K): V = {
+ val pos = keySet.getPos(k)
+ _values(pos)
+ }
+
+ /** Get the value for a given key, or returns elseValue if it doesn't exist. */
+ def getOrElse(k: K, elseValue: V): V = {
+ val pos = keySet.getPos(k)
+ if (pos >= 0) _values(pos) else elseValue
+ }
+
+ /** Set the value for a key */
+ def update(k: K, v: V) {
+ val pos = keySet.addWithoutResize(k) & OpenHashSet.POSITION_MASK
+ _values(pos) = v
+ keySet.rehashIfNeeded(k, grow, move)
+ _oldValues = null
+ }
+
+
+ /** Set the value for a key */
+ def setMerge(k: K, v: V, mergeF: (V, V) => V) {
+ val pos = keySet.addWithoutResize(k)
+ val ind = pos & OpenHashSet.POSITION_MASK
+ if ((pos & OpenHashSet.NONEXISTENCE_MASK) != 0) { // if first add
+ _values(ind) = v
+ } else {
+ _values(ind) = mergeF(_values(ind), v)
+ }
+ keySet.rehashIfNeeded(k, grow, move)
+ _oldValues = null
+ }
+
+
+ /**
+ * If the key doesn't exist yet in the hash map, set its value to defaultValue; otherwise,
+ * set its value to mergeValue(oldValue).
+ *
+ * @return the newly updated value.
+ */
+ def changeValue(k: K, defaultValue: => V, mergeValue: (V) => V): V = {
+ val pos = keySet.addWithoutResize(k)
+ if ((pos & OpenHashSet.NONEXISTENCE_MASK) != 0) {
+ val newValue = defaultValue
+ _values(pos & OpenHashSet.POSITION_MASK) = newValue
+ keySet.rehashIfNeeded(k, grow, move)
+ newValue
+ } else {
+ _values(pos) = mergeValue(_values(pos))
+ _values(pos)
+ }
+ }
+
+ override def iterator = new Iterator[(K, V)] {
+ var pos = 0
+ var nextPair: (K, V) = computeNextPair()
+
+ /** Get the next value we should return from next(), or null if we're finished iterating */
+ def computeNextPair(): (K, V) = {
+ pos = keySet.nextPos(pos)
+ if (pos >= 0) {
+ val ret = (keySet.getValue(pos), _values(pos))
+ pos += 1
+ ret
+ } else {
+ null
+ }
+ }
+
+ def hasNext = nextPair != null
+
+ def next() = {
+ val pair = nextPair
+ nextPair = computeNextPair()
+ pair
+ }
+ }
+
+ // The following member variables are declared as protected instead of private for the
+ // specialization to work (specialized class extends the unspecialized one and needs access
+ // to the "private" variables).
+ // They also should have been val's. We use var's because there is a Scala compiler bug that
+ // would throw illegal access error at runtime if they are declared as val's.
+ protected var grow = (newCapacity: Int) => {
+ _oldValues = _values
+ _values = new Array[V](newCapacity)
+ }
+
+ protected var move = (oldPos: Int, newPos: Int) => {
+ _values(newPos) = _oldValues(oldPos)
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/af15c82b/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala
----------------------------------------------------------------------
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala
index 28fd112..d2e0c01 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala
@@ -22,9 +22,6 @@ import scala.util.Random
import org.scalatest.FunSuite
-import org.apache.spark.SparkConf
-import org.apache.spark.serializer.KryoSerializer
-
import org.apache.spark.graphx._
class EdgePartitionSuite extends FunSuite {
@@ -123,19 +120,4 @@ class EdgePartitionSuite extends FunSuite {
assert(!ep.isActive(-1))
assert(ep.numActives == Some(2))
}
-
- test("Kryo serialization") {
- val aList = List((0, 1, 0), (1, 0, 0), (1, 2, 0), (5, 4, 0), (5, 5, 0))
- val a: EdgePartition[Int, Int] = makeEdgePartition(aList)
- val conf = new SparkConf()
- .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
- .set("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator")
- val s = new KryoSerializer(conf).newInstance()
- val aSer: EdgePartition[Int, Int] = s.deserialize(s.serialize(a))
- assert(aSer.srcIds.toList === a.srcIds.toList)
- assert(aSer.dstIds.toList === a.dstIds.toList)
- assert(aSer.data.toList === a.data.toList)
- assert(aSer.index != null)
- assert(aSer.vertices.iterator.toSet === a.vertices.iterator.toSet)
- }
}