You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "SaschaC (JIRA)" <ji...@apache.org> on 2017/03/25 17:52:42 UTC

[jira] [Commented] (SPARK-17670) Spark DataFrame/Dataset no longer supports Option[Map] in case classes

    [ https://issues.apache.org/jira/browse/SPARK-17670?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15941820#comment-15941820 ] 

SaschaC commented on SPARK-17670:
---------------------------------

I can confirm that this bug is real and that it is a major issue.

Depending on how I varied my case classes, I sometimes did NOT get a mismatch on Maps but on Lists instead. I used avrohugger to produce case classes from Avro schemas. I have case classes that extend SpecificRecordBase, and I varied whether the case classes would use Array or immutable.List, as well as Map, collections.Map or collections.immutable.Map. The collection types, both lists and maps, are optional; sometimes Spark complains about a mismatch on lists but accepts the maps, or vice versa, but I could never get Spark to accept my schema, regardless of which case class variant I tried.

> Spark DataFrame/Dataset no longer supports Option[Map] in case classes
> ----------------------------------------------------------------------
>
>                 Key: SPARK-17670
>                 URL: https://issues.apache.org/jira/browse/SPARK-17670
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.0.0
>            Reporter: Daniel Williams
>
> Upon upgrading to Spark 2.0 I discovered that previously supported case classes containing members of the type Option[Map] of any key/value binding, mutable or immutable, were no longer supported and produced an exception similar to the following.  Upon further testing I also noticed that Option was supported for Seq, case classes, and primitives.  Validating unit tests, written using spark-testing-base, are included below.
> {code}
> org.apache.spark.sql.AnalysisException: cannot resolve 'wrapoption(staticinvoke(class org.apache.spark.sql.catalyst.util.ArrayBasedMapData$, ObjectType(interface scala.collection.Map), toScalaMap, mapobjects(MapObjects_loopValue32, MapObjects_loopIsNull33, StringType, lambdavariable(MapObjects_loopValue32, MapObjects_loopIsNull33, StringType).toString, cast(lambdavariable(MapObjects_loopValue30, MapObjects_loopIsNull31, StructField(uuid,StringType,true), StructField(timestamp,TimestampType,true), StructField(sourceSystem,StringType,true), StructField(input,MapType(StringType,StringType,true),true)).input as map<string,string>).keyArray).array, mapobjects(MapObjects_loopValue34, MapObjects_loopIsNull35, StringType, lambdavariable(MapObjects_loopValue34, MapObjects_loopIsNull35, StringType).toString, cast(lambdavariable(MapObjects_loopValue30, MapObjects_loopIsNull31, StructField(uuid,StringType,true), StructField(timestamp,TimestampType,true), StructField(sourceSystem,StringType,true), StructField(input,MapType(StringType,StringType,true),true)).input as map<string,string>).valueArray).array, true), ObjectType(interface scala.collection.immutable.Map))' due to data type mismatch: argument 1 requires scala.collection.immutable.Map type, however, 'staticinvoke(class org.apache.spark.sql.catalyst.util.ArrayBasedMapData$, ObjectType(interface scala.collection.Map), toScalaMap, mapobjects(MapObjects_loopValue32, MapObjects_loopIsNull33, StringType, lambdavariable(MapObjects_loopValue32, MapObjects_loopIsNull33, StringType).toString, cast(lambdavariable(MapObjects_loopValue30, MapObjects_loopIsNull31, StructField(uuid,StringType,true), StructField(timestamp,TimestampType,true), StructField(sourceSystem,StringType,true), StructField(input,MapType(StringType,StringType,true),true)).input as map<string,string>).keyArray).array, mapobjects(MapObjects_loopValue34, MapObjects_loopIsNull35, StringType, lambdavariable(MapObjects_loopValue34, MapObjects_loopIsNull35, 
StringType).toString, cast(lambdavariable(MapObjects_loopValue30, MapObjects_loopIsNull31, StructField(uuid,StringType,true), StructField(timestamp,TimestampType,true), StructField(sourceSystem,StringType,true), StructField(input,MapType(StringType,StringType,true),true)).input as map<string,string>).valueArray).array, true)' is of scala.collection.Map type.;
> at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
> at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:82)
> at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:74)
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:301)
> {code}
> Unit tests:
> {code}
> import com.holdenkarau.spark.testing.{DataFrameSuiteBase, SharedSparkContext}
> import org.scalatest.{Matchers, FunSuite}
> import org.slf4j.LoggerFactory
> import scala.util.{Failure, Try, Success}
> case class ImmutableMapTest(data: Map[String, String])
> case class MapTest(data: scala.collection.mutable.Map[String, String])
> case class ImmtableWithOption(data: Option[Map[String, String]])
> case class MutableWithOption(data: Option[scala.collection.mutable.Map[String, String]])
> case class PrimWithOption(data: Option[String])
> case class ArrayWithOption(data: Option[Seq[String]])
> class TestOptionWithDataTypes
>   extends FunSuite
>     with Matchers
>     with SharedSparkContext
>     with DataFrameSuiteBase {
>   val logger = LoggerFactory.getLogger(classOf[TestOptionWithDataTypes])
>   test("test immutable map") {
>     import sqlContext.implicits._
>     val rdd = sc.parallelize(Seq(ImmutableMapTest(Map("1"->"2"))))
>     val result = Try {
>       rdd.toDF()
>     } match {
>       case Success(e) => Option(e)
>       case Failure(e) => {
>         logger.error(e.getMessage, e)
>         None
>       }
>     }
>     result should not be(None)
>     result.get.count() should be(1)
>     result.get.show()
>   }
>   test("test mutable Map") {
>     import sqlContext.implicits._
>     val rdd = sc.parallelize(Seq(MapTest(scala.collection.mutable.Map("1"->"2"))))
>     val result = Try {
>       rdd.toDF()
>     } match {
>       case Success(e) => Option(e)
>       case Failure(e) => {
>         logger.error(e.getMessage, e)
>         None
>       }
>     }
>     result should not be(None)
>     result.get.count() should be(1)
>     result.get.show()
>   }
>   test("test immutable option Map") {
>     import sqlContext.implicits._
>     val rdd = sc.parallelize(Seq(ImmtableWithOption(Option(Map("1"->"2")))))
>     val result = Try {
>       rdd.toDF()
>     } match {
>       case Success(e) => Option(e)
>       case Failure(e) => {
>         logger.error(e.getMessage, e)
>         None
>       }
>     }
>     result should not be(None)
>     result.get.count() should be(1)
>     result.get.show()
>   }
>   test("test mutable option Map") {
>     import sqlContext.implicits._
>     val rdd = sc.parallelize(Seq(MutableWithOption(Option(scala.collection.mutable.Map("1"->"2")))))
>     val result = Try {
>       rdd.toDF()
>     } match {
>       case Success(e) => Option(e)
>       case Failure(e) => {
>         logger.error(e.getMessage, e)
>         None
>       }
>     }
>     result should not be(None)
>     result.get.count() should be(1)
>     result.get.show()
>   }
>   test("test option with prim") {
>     import sqlContext.implicits._
>     val rdd = sc.parallelize(Seq(PrimWithOption(Option("foo"))))
>     val result = Try {
>       rdd.toDF()
>     } match {
>       case Success(e) => Option(e)
>       case Failure(e) => {
>         logger.error(e.getMessage, e)
>         None
>       }
>     }
>     result should not be(None)
>     result.get.count() should be(1)
>     result.get.show()
>   }
>   test("test option with array") {
>     import sqlContext.implicits._
>     val rdd = sc.parallelize(Seq(ArrayWithOption(Option(Seq("foo")))))
>     val result = Try {
>       rdd.toDF()
>     } match {
>       case Success(e) => Option(e)
>       case Failure(e) => {
>         logger.error(e.getMessage, e)
>         None
>       }
>     }
>     result should not be(None)
>     result.get.count() should be(1)
>     result.get.show()
>   }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org