You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2015/05/27 03:18:04 UTC

spark git commit: [SPARK-7637] [SQL] O(N) merge implementation for StructType merge

Repository: spark
Updated Branches:
  refs/heads/master 0463428b6 -> 03668348e


[SPARK-7637] [SQL] O(N) merge implementation for StructType merge

Contribution is my original work and I license the work to the project under the project's open source license.

Author: rowan <ro...@googlemail.com>

Closes #6259 from rowan000/SPARK-7637 and squashes the following commits:

c479df4 [rowan] SPARK-7637: rename mapFields to fieldsMap as per comments on github.
8d2e419 [rowan] SPARK-7637: fix up whitespace changes
0e9d662 [rowan] SPARK-7637: O(N) merge implementation for StructType merge


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/03668348
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/03668348
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/03668348

Branch: refs/heads/master
Commit: 03668348e29eb52c1a7d57a1e0ed7fca6c323890
Parents: 0463428
Author: rowan <ro...@googlemail.com>
Authored: Tue May 26 18:17:16 2015 -0700
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Tue May 26 18:17:16 2015 -0700

----------------------------------------------------------------------
 .../org/apache/spark/sql/types/StructType.scala | 12 +++-
 .../apache/spark/sql/types/DataTypeSuite.scala  | 73 +++++++++++++++++++-
 2 files changed, 81 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/03668348/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
index 7e00a27..a4f30c8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
@@ -230,10 +230,10 @@ object StructType {
       case (StructType(leftFields), StructType(rightFields)) =>
         val newFields = ArrayBuffer.empty[StructField]
 
+        val rightMapped = fieldsMap(rightFields)
         leftFields.foreach {
           case leftField @ StructField(leftName, leftType, leftNullable, _) =>
-            rightFields
-              .find(_.name == leftName)
+            rightMapped.get(leftName)
               .map { case rightField @ StructField(_, rightType, rightNullable, _) =>
               leftField.copy(
                 dataType = merge(leftType, rightType),
@@ -243,8 +243,9 @@ object StructType {
               .foreach(newFields += _)
         }
 
+        val leftMapped = fieldsMap(leftFields)
         rightFields
-          .filterNot(f => leftFields.map(_.name).contains(f.name))
+          .filterNot(f => leftMapped.get(f.name).nonEmpty)
           .foreach(newFields += _)
 
         StructType(newFields)
@@ -264,4 +265,9 @@ object StructType {
       case _ =>
         throw new SparkException(s"Failed to merge incompatible data types $left and $right")
     }
+  
+  private[sql] def fieldsMap(fields: Array[StructField]): Map[String, StructField] = {
+    import scala.collection.breakOut
+    fields.map(s => (s.name, s))(breakOut)
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/03668348/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeSuite.scala
index d797510..a73317c 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeSuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.types
 
+import org.apache.spark.SparkException
 import org.scalatest.FunSuite
 
 class DataTypeSuite extends FunSuite {
@@ -69,6 +70,76 @@ class DataTypeSuite extends FunSuite {
     }
   }
 
+  test("fieldsMap returns map of name to StructField") {
+    val struct = StructType(
+      StructField("a", LongType) :: 
+      StructField("b", FloatType) :: Nil)
+
+    val mapped = StructType.fieldsMap(struct.fields)
+
+    val expected = Map(
+      "a" -> StructField("a", LongType),
+      "b" -> StructField("b", FloatType))
+
+    assert(mapped === expected)
+  }
+
+  test("merge where right is empty") {
+    val left = StructType(
+      StructField("a", LongType) ::
+      StructField("b", FloatType) :: Nil)
+
+    val right = StructType(List())
+    val merged = left.merge(right)
+    
+    assert(merged === left)
+  }
+
+  test("merge where left is empty") {
+
+    val left = StructType(List())
+
+    val right = StructType(
+      StructField("a", LongType) ::
+      StructField("b", FloatType) :: Nil)
+
+    val merged = left.merge(right)
+
+    assert(right === merged)
+
+  }
+
+  test("merge where both are non-empty") {
+    val left = StructType(
+      StructField("a", LongType) ::
+      StructField("b", FloatType) :: Nil)
+
+    val right = StructType(
+      StructField("c", LongType) :: Nil)
+
+    val expected = StructType(
+      StructField("a", LongType) ::
+      StructField("b", FloatType) ::
+      StructField("c", LongType) :: Nil)
+
+    val merged = left.merge(right)
+
+    assert(merged === expected)
+  }
+
+  test("merge where right contains type conflict") {
+    val left = StructType(
+      StructField("a", LongType) ::
+      StructField("b", FloatType) :: Nil)
+
+    val right = StructType(
+      StructField("b", LongType) :: Nil)
+    
+    intercept[SparkException] {
+      left.merge(right)
+    }
+  }
+
   def checkDataTypeJsonRepr(dataType: DataType): Unit = {
     test(s"JSON - $dataType") {
       assert(DataType.fromJson(dataType.json) === dataType)
@@ -120,7 +191,7 @@ class DataTypeSuite extends FunSuite {
   checkDefaultSize(DecimalType(10, 5), 4096)
   checkDefaultSize(DecimalType.Unlimited, 4096)
   checkDefaultSize(DateType, 4)
-  checkDefaultSize(TimestampType,12)
+  checkDefaultSize(TimestampType, 12)
   checkDefaultSize(StringType, 4096)
   checkDefaultSize(BinaryType, 4096)
   checkDefaultSize(ArrayType(DoubleType, true), 800)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org