Posted to commits@spark.apache.org by li...@apache.org on 2016/10/23 03:43:53 UTC

spark git commit: [SPARK-18035][SQL] Introduce performant and memory efficient APIs to create ArrayBasedMapData

Repository: spark
Updated Branches:
  refs/heads/master bc167a2a5 -> eff4aed1a


[SPARK-18035][SQL] Introduce performant and memory efficient APIs to create ArrayBasedMapData

## What changes were proposed in this pull request?

Jira: https://issues.apache.org/jira/browse/SPARK-18035

In HiveInspectors, I saw that converting a Java map to Spark's `ArrayBasedMapData` spent quite some time in buffer copying: https://github.com/apache/spark/blob/master/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala#L658

The reason is that `map.toSeq` allocates a new buffer and copies the map entries into it: https://github.com/scala/scala/blob/2.11.x/src/library/scala/collection/MapLike.scala#L323

This copy is unnecessary: the buffer is discarded as soon as the key and value arrays have been extracted from it.

Here is the call trace:

```
org.apache.spark.sql.hive.HiveInspectors$$anonfun$unwrapperFor$41.apply(HiveInspectors.scala:664)
scala.collection.AbstractMap.toSeq(Map.scala:59)
scala.collection.MapLike$class.toSeq(MapLike.scala:323)
scala.collection.AbstractMap.toBuffer(Map.scala:59)
scala.collection.MapLike$class.toBuffer(MapLike.scala:326)
scala.collection.AbstractTraversable.copyToBuffer(Traversable.scala:104)
scala.collection.TraversableOnce$class.copyToBuffer(TraversableOnce.scala:275)
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
scala.collection.AbstractIterable.foreach(Iterable.scala:54)
scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
scala.collection.Iterator$class.foreach(Iterator.scala:893)
scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:59)
scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:59)
```
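
For illustration, here is a minimal sketch of the copying pattern the trace points at (hypothetical variable names, not the Spark source):

```scala
// Hypothetical illustration of the old pattern: `toSeq` copies every
// entry into an intermediate buffer that is discarded right after the
// key and value arrays are extracted from it.
val map: Map[String, Int] = Map("a" -> 1, "b" -> 2)

val entries = map.toSeq                 // extra buffer allocation + copy
val keys = entries.map(_._1).toArray    // second pass over the entries
val values = entries.map(_._2).toArray  // third pass over the entries
```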

Also, the earlier code populated the key and value arrays separately, iterating over the map twice. This PR avoids the double iteration and fills both arrays in a single pass, as sketched below.
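
A minimal sketch of the single-pass approach (illustrative, mirroring the `ArrayBasedMapData` helper in the diff below):

```scala
// Pre-allocate both arrays and fill them while walking the map once;
// no intermediate Seq is materialized.
val map: Map[String, Int] = Map("a" -> 1, "b" -> 2)

val keys = new Array[Any](map.size)
val values = new Array[Any](map.size)

var i = 0
for ((key, value) <- map) {
  keys(i) = key
  values(i) = value
  i += 1
}
```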

EDIT: During code review, several more places in the code were found to do a similar thing. This PR dedupes those instances and introduces convenient APIs that are performant and memory efficient.
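
For example, the new factory overloads on `ArrayBasedMapData` (see the diff below) take converter functions and build both backing arrays in one pass. A usage sketch, with illustrative inputs:

```scala
import org.apache.spark.sql.catalyst.util.ArrayBasedMapData

// The converters are applied to each key and value while the backing
// key/value arrays are filled in a single iteration over the map.
val mapData = ArrayBasedMapData(
  Map("a" -> 1, "b" -> 2),
  (key: Any) => key,       // keyConverter
  (value: Any) => value)   // valueConverter
```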

## Performance gains

The gain depends on the workload: how many map columns the query accesses and the average number of entries per map. For one of the queries I tried out, I saw a 3% end-to-end CPU saving.

## How was this patch tested?

This does not change the end result produced, so it relies on the existing tests.

Author: Tejas Patil <te...@fb.com>

Closes #15573 from tejasapatil/SPARK-18035_avoid_toSeq.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/eff4aed1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/eff4aed1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/eff4aed1

Branch: refs/heads/master
Commit: eff4aed1ac1e500d4aa40665dd06b527dffbc111
Parents: bc167a2
Author: Tejas Patil <te...@fb.com>
Authored: Sat Oct 22 20:43:43 2016 -0700
Committer: gatorsmile <ga...@gmail.com>
Committed: Sat Oct 22 20:43:43 2016 -0700

----------------------------------------------------------------------
 .../sql/catalyst/CatalystTypeConverters.scala   | 53 ++++---------
 .../expressions/complexTypeCreator.scala        | 32 +++++---
 .../sql/catalyst/util/ArrayBasedMapData.scala   | 81 +++++++++++++++++++-
 .../sql/execution/python/EvaluatePython.scala   | 10 +--
 .../apache/spark/sql/hive/HiveInspectors.scala  | 11 +--
 5 files changed, 119 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/eff4aed1/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
index f542f5c..5b91615 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
@@ -199,34 +199,14 @@ object CatalystTypeConverters {
     private[this] val keyConverter = getConverterForType(keyType)
     private[this] val valueConverter = getConverterForType(valueType)
 
-    override def toCatalystImpl(scalaValue: Any): MapData = scalaValue match {
-      case m: Map[_, _] =>
-        val length = m.size
-        val convertedKeys = new Array[Any](length)
-        val convertedValues = new Array[Any](length)
-
-        var i = 0
-        for ((key, value) <- m) {
-          convertedKeys(i) = keyConverter.toCatalyst(key)
-          convertedValues(i) = valueConverter.toCatalyst(value)
-          i += 1
-        }
-        ArrayBasedMapData(convertedKeys, convertedValues)
-
-      case jmap: JavaMap[_, _] =>
-        val length = jmap.size()
-        val convertedKeys = new Array[Any](length)
-        val convertedValues = new Array[Any](length)
-
-        var i = 0
-        val iter = jmap.entrySet.iterator
-        while (iter.hasNext) {
-          val entry = iter.next()
-          convertedKeys(i) = keyConverter.toCatalyst(entry.getKey)
-          convertedValues(i) = valueConverter.toCatalyst(entry.getValue)
-          i += 1
-        }
-        ArrayBasedMapData(convertedKeys, convertedValues)
+    override def toCatalystImpl(scalaValue: Any): MapData = {
+      val keyFunction = (k: Any) => keyConverter.toCatalyst(k)
+      val valueFunction = (k: Any) => valueConverter.toCatalyst(k)
+
+      scalaValue match {
+        case map: Map[_, _] => ArrayBasedMapData(map, keyFunction, valueFunction)
+        case javaMap: JavaMap[_, _] => ArrayBasedMapData(javaMap, keyFunction, valueFunction)
+      }
     }
 
     override def toScala(catalystValue: MapData): Map[Any, Any] = {
@@ -433,18 +413,11 @@ object CatalystTypeConverters {
     case seq: Seq[Any] => new GenericArrayData(seq.map(convertToCatalyst).toArray)
     case r: Row => InternalRow(r.toSeq.map(convertToCatalyst): _*)
     case arr: Array[Any] => new GenericArrayData(arr.map(convertToCatalyst))
-    case m: Map[_, _] =>
-      val length = m.size
-      val convertedKeys = new Array[Any](length)
-      val convertedValues = new Array[Any](length)
-
-      var i = 0
-      for ((key, value) <- m) {
-        convertedKeys(i) = convertToCatalyst(key)
-        convertedValues(i) = convertToCatalyst(value)
-        i += 1
-      }
-      ArrayBasedMapData(convertedKeys, convertedValues)
+    case map: Map[_, _] =>
+      ArrayBasedMapData(
+        map,
+        (key: Any) => convertToCatalyst(key),
+        (value: Any) => convertToCatalyst(value))
     case other => other
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/eff4aed1/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala
index 09e22aa..917aa08 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala
@@ -427,18 +427,28 @@ case class StringToMap(text: Expression, pairDelim: Expression, keyValueDelim: E
     }
   }
 
-  override def nullSafeEval(str: Any, delim1: Any, delim2: Any): Any = {
-    val array = str.asInstanceOf[UTF8String]
-      .split(delim1.asInstanceOf[UTF8String], -1)
-      .map { kv =>
-        val arr = kv.split(delim2.asInstanceOf[UTF8String], 2)
-        if (arr.length < 2) {
-          Array(arr(0), null)
-        } else {
-          arr
-        }
+  override def nullSafeEval(
+      inputString: Any,
+      stringDelimiter: Any,
+      keyValueDelimiter: Any): Any = {
+    val keyValues =
+      inputString.asInstanceOf[UTF8String].split(stringDelimiter.asInstanceOf[UTF8String], -1)
+
+    val iterator = new Iterator[(UTF8String, UTF8String)] {
+      var index = 0
+      val keyValueDelimiterUTF8String = keyValueDelimiter.asInstanceOf[UTF8String]
+
+      override def hasNext: Boolean = {
+        keyValues.length > index
       }
-    ArrayBasedMapData(array.map(_ (0)), array.map(_ (1)))
+
+      override def next(): (UTF8String, UTF8String) = {
+        val keyValueArray = keyValues(index).split(keyValueDelimiterUTF8String, 2)
+        index += 1
+        (keyValueArray(0), if (keyValueArray.length < 2) null else keyValueArray(1))
+      }
+    }
+    ArrayBasedMapData(iterator, keyValues.size, identity, identity)
   }
 
   override def prettyName: String = "str_to_map"

http://git-wip-us.apache.org/repos/asf/spark/blob/eff4aed1/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapData.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapData.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapData.scala
index 4449da1..91b3139 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapData.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapData.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.catalyst.util
 
+import java.util.{Map => JavaMap}
+
 class ArrayBasedMapData(val keyArray: ArrayData, val valueArray: ArrayData) extends MapData {
   require(keyArray.numElements() == valueArray.numElements())
 
@@ -30,12 +32,83 @@ class ArrayBasedMapData(val keyArray: ArrayData, val valueArray: ArrayData) exte
 }
 
 object ArrayBasedMapData {
-  def apply(map: Map[Any, Any]): ArrayBasedMapData = {
-    val array = map.toArray
-    ArrayBasedMapData(array.map(_._1), array.map(_._2))
+  /**
+   * Creates a [[ArrayBasedMapData]] by applying the given converters over
+   * each (key -> value) pair of the input [[java.util.Map]]
+   *
+   * @param javaMap Input map
+   * @param keyConverter This function is applied over all the keys of the input map to
+   *                     obtain the output map's keys
+   * @param valueConverter This function is applied over all the values of the input map to
+   *                       obtain the output map's values
+   */
+  def apply(
+      javaMap: JavaMap[_, _],
+      keyConverter: (Any) => Any,
+      valueConverter: (Any) => Any): ArrayBasedMapData = {
+    import scala.language.existentials
+
+    val keys: Array[Any] = new Array[Any](javaMap.size())
+    val values: Array[Any] = new Array[Any](javaMap.size())
+
+    var i: Int = 0
+    val iterator = javaMap.entrySet().iterator()
+    while (iterator.hasNext) {
+      val entry = iterator.next()
+      keys(i) = keyConverter(entry.getKey)
+      values(i) = valueConverter(entry.getValue)
+      i += 1
+    }
+    ArrayBasedMapData(keys, values)
+  }
+
+  /**
+   * Creates a [[ArrayBasedMapData]] by applying the given converters over
+   * each (key -> value) pair of the input map
+   *
+   * @param map Input map
+   * @param keyConverter This function is applied over all the keys of the input map to
+   *                     obtain the output map's keys
+   * @param valueConverter This function is applied over all the values of the input map to
+   *                       obtain the output map's values
+   */
+  def apply(
+      map: scala.collection.Map[_, _],
+      keyConverter: (Any) => Any = identity,
+      valueConverter: (Any) => Any = identity): ArrayBasedMapData = {
+    ArrayBasedMapData(map.iterator, map.size, keyConverter, valueConverter)
+  }
+
+  /**
+   * Creates a [[ArrayBasedMapData]] by applying the given converters over
+   * each (key -> value) pair from the given iterator
+   *
+   * @param iterator Input iterator
+   * @param size Number of elements
+   * @param keyConverter This function is applied over all the keys extracted from the
+   *                     given iterator to obtain the output map's keys
+   * @param valueConverter This function is applied over all the values extracted from the
+   *                       given iterator to obtain the output map's values
+   */
+  def apply(
+      iterator: Iterator[(_, _)],
+      size: Int,
+      keyConverter: (Any) => Any,
+      valueConverter: (Any) => Any): ArrayBasedMapData = {
+
+    val keys: Array[Any] = new Array[Any](size)
+    val values: Array[Any] = new Array[Any](size)
+
+    var i = 0
+    for ((key, value) <- iterator) {
+      keys(i) = keyConverter(key)
+      values(i) = valueConverter(value)
+      i += 1
+    }
+    ArrayBasedMapData(keys, values)
   }
 
-  def apply(keys: Array[Any], values: Array[Any]): ArrayBasedMapData = {
+  def apply(keys: Array[_], values: Array[_]): ArrayBasedMapData = {
     new ArrayBasedMapData(new GenericArrayData(keys), new GenericArrayData(values))
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/eff4aed1/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala
index 724025b..46fd54e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala
@@ -124,11 +124,11 @@ object EvaluatePython {
     case (c, ArrayType(elementType, _)) if c.getClass.isArray =>
       new GenericArrayData(c.asInstanceOf[Array[_]].map(e => fromJava(e, elementType)))
 
-    case (c: java.util.Map[_, _], MapType(keyType, valueType, _)) =>
-      val keyValues = c.asScala.toSeq
-      val keys = keyValues.map(kv => fromJava(kv._1, keyType)).toArray
-      val values = keyValues.map(kv => fromJava(kv._2, valueType)).toArray
-      ArrayBasedMapData(keys, values)
+    case (javaMap: java.util.Map[_, _], MapType(keyType, valueType, _)) =>
+      ArrayBasedMapData(
+        javaMap,
+        (key: Any) => fromJava(key, keyType),
+        (value: Any) => fromJava(value, valueType))
 
     case (c, StructType(fields)) if c.getClass.isArray =>
       val array = c.asInstanceOf[Array[_]]

http://git-wip-us.apache.org/repos/asf/spark/blob/eff4aed1/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
index 1625116..e303065 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
@@ -473,10 +473,8 @@ private[hive] trait HiveInspectors {
       case mi: StandardConstantMapObjectInspector =>
         val keyUnwrapper = unwrapperFor(mi.getMapKeyObjectInspector)
         val valueUnwrapper = unwrapperFor(mi.getMapValueObjectInspector)
-        val keyValues = mi.getWritableConstantValue.asScala.toSeq
-        val keys = keyValues.map(kv => keyUnwrapper(kv._1)).toArray
-        val values = keyValues.map(kv => valueUnwrapper(kv._2)).toArray
-        val constant = ArrayBasedMapData(keys, values)
+        val keyValues = mi.getWritableConstantValue
+        val constant = ArrayBasedMapData(keyValues, keyUnwrapper, valueUnwrapper)
         _ => constant
       case li: StandardConstantListObjectInspector =>
         val unwrapper = unwrapperFor(li.getListElementObjectInspector)
@@ -655,10 +653,7 @@ private[hive] trait HiveInspectors {
             if (map == null) {
               null
             } else {
-              val keyValues = map.asScala.toSeq
-              val keys = keyValues.map(kv => keyUnwrapper(kv._1)).toArray
-              val values = keyValues.map(kv => valueUnwrapper(kv._2)).toArray
-              ArrayBasedMapData(keys, values)
+              ArrayBasedMapData(map, keyUnwrapper, valueUnwrapper)
             }
           } else {
             null

