You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2014/06/03 04:20:35 UTC
git commit: Avoid dynamic dispatching when unwrapping Hive data.
Repository: spark
Updated Branches:
refs/heads/master ec8be274a -> 862283e9c
Avoid dynamic dispatching when unwrapping Hive data.
This is a follow-up to PR #758.
The `unwrapHiveData` function is now composed statically, based on each field's object inspector, before any rows are scanned. This avoids the cost of dynamic dispatching on every value.
According to the same micro-benchmark used in PR #758, this simple change brings a slight performance boost: about 2.5% for the CSV table and 1% for the RCFile table.
```
Optimized version:
CSV: 6870 ms, RCFile: 5687 ms
CSV: 6832 ms, RCFile: 5800 ms
CSV: 6822 ms, RCFile: 5679 ms
CSV: 6704 ms, RCFile: 5758 ms
CSV: 6819 ms, RCFile: 5725 ms
Original version:
CSV: 7042 ms, RCFile: 5667 ms
CSV: 6883 ms, RCFile: 5703 ms
CSV: 7115 ms, RCFile: 5665 ms
CSV: 7020 ms, RCFile: 5981 ms
CSV: 6871 ms, RCFile: 5906 ms
```
Author: Cheng Lian <li...@gmail.com>
Closes #935 from liancheng/staticUnwrapping and squashes the following commits:
c49c70c [Cheng Lian] Avoid dynamic dispatching when unwrapping Hive data.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/862283e9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/862283e9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/862283e9
Branch: refs/heads/master
Commit: 862283e9ccace6824880aa4e161723fb3248d438
Parents: ec8be27
Author: Cheng Lian <li...@gmail.com>
Authored: Mon Jun 2 19:20:23 2014 -0700
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Mon Jun 2 19:20:23 2014 -0700
----------------------------------------------------------------------
.../apache/spark/sql/hive/hiveOperators.scala | 28 +++++++++++---------
.../sql/hive/execution/HiveComparisonTest.scala | 5 ++--
2 files changed, 18 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/862283e9/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala
index d263c31..29b4b9b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala
@@ -26,8 +26,7 @@ import org.apache.hadoop.hive.ql.plan.{TableDesc, FileSinkDesc}
import org.apache.hadoop.hive.serde.serdeConstants
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
import org.apache.hadoop.hive.serde2.objectinspector._
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaHiveDecimalObjectInspector
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaHiveVarcharObjectInspector
+import org.apache.hadoop.hive.serde2.objectinspector.primitive._
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils
import org.apache.hadoop.hive.serde2.{ColumnProjectionUtils, Serializer}
import org.apache.hadoop.io.Writable
@@ -95,29 +94,34 @@ case class HiveTableScan(
attributes.map { a =>
val ordinal = relation.partitionKeys.indexOf(a)
if (ordinal >= 0) {
+ val dataType = relation.partitionKeys(ordinal).dataType
(_: Any, partitionKeys: Array[String]) => {
- val value = partitionKeys(ordinal)
- val dataType = relation.partitionKeys(ordinal).dataType
- unwrapHiveData(castFromString(value, dataType))
+ castFromString(partitionKeys(ordinal), dataType)
}
} else {
val ref = objectInspector.getAllStructFieldRefs
.find(_.getFieldName == a.name)
.getOrElse(sys.error(s"Can't find attribute $a"))
+ val fieldObjectInspector = ref.getFieldObjectInspector
+
+ val unwrapHiveData = fieldObjectInspector match {
+ case _: HiveVarcharObjectInspector =>
+ (value: Any) => value.asInstanceOf[HiveVarchar].getValue
+ case _: HiveDecimalObjectInspector =>
+ (value: Any) => BigDecimal(value.asInstanceOf[HiveDecimal].bigDecimalValue())
+ case _ =>
+ identity[Any] _
+ }
+
(row: Any, _: Array[String]) => {
val data = objectInspector.getStructFieldData(row, ref)
- unwrapHiveData(unwrapData(data, ref.getFieldObjectInspector))
+ val hiveData = unwrapData(data, fieldObjectInspector)
+ if (hiveData != null) unwrapHiveData(hiveData) else null
}
}
}
}
- private def unwrapHiveData(value: Any) = value match {
- case varchar: HiveVarchar => varchar.getValue
- case decimal: HiveDecimal => BigDecimal(decimal.bigDecimalValue)
- case other => other
- }
-
private def castFromString(value: String, dataType: DataType) = {
Cast(Literal(value), dataType).eval(null)
}
http://git-wip-us.apache.org/repos/asf/spark/blob/862283e9/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
index 1b5a132..0f95410 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
@@ -133,15 +133,14 @@ abstract class HiveComparisonTest
def isSorted(plan: LogicalPlan): Boolean = plan match {
case _: Join | _: Aggregate | _: BaseRelation | _: Generate | _: Sample | _: Distinct => false
case PhysicalOperation(_, _, Sort(_, _)) => true
- case _ => plan.children.iterator.map(isSorted).exists(_ == true)
+ case _ => plan.children.iterator.exists(isSorted)
}
val orderedAnswer = hiveQuery.logical match {
// Clean out non-deterministic time schema info.
case _: NativeCommand => answer.filterNot(nonDeterministicLine).filterNot(_ == "")
case _: ExplainCommand => answer
- case plan if isSorted(plan) => answer
- case _ => answer.sorted
+ case plan => if (isSorted(plan)) answer else answer.sorted
}
orderedAnswer.map(cleanPaths)
}