Posted to user@spark.apache.org by Anthony Brew <at...@gmail.com> on 2016/02/22 15:42:48 UTC

Option[Long] parameter in case class parsed from JSON DataFrame failing when key not present in JSON

Hi,
     I'm trying to parse JSON data into a case class using the
DataFrame.as[] function, but I am hitting an unusual error and the interweb
isn't solving my pain, so I thought I would reach out for help. I've truncated
my code a little here to make it readable, but the error is included in full.
My case class looks like....

case class CustomerEvent(
                          customer_id: String,
                          product_id: Option[Long] = None
                        )


My passing test looks like

"A Full CustomerEvent JSON Object" should "Parse Correctly" in {
  val jsonStr = """ {
                     "customer_id": "3ee066ab571e03dd5f3c443a6c34417a",
                     "product_id": 3,
                        }
                 """
   // apparently deprecation is not an issue
   val rdd = sc.parallelize(Seq(jsonStr))

   import sqlContext.implicits._
   val customers: Dataset[CustomerEvent] =
     sqlContext.read.json(rdd).as[CustomerEvent]

   val ce: CustomerEvent = customers.first()
   ce.customer_id should be ("3ee066ab571e03dd5f3c443a6c34417a")
   ce.product_id.get should be (3)
 }

My issue is that when the product_id is not part of the JSON, I get an
encoding error, i.e. the following:

  "A Partial CustomerEvent JSON Object" should " should Parse Correctly" in {
    val jsonStr = """ {
                       "customer_id": "3ee066ab571e03dd5f3c443a6c34417a"
                      }
                  """
    // apparently deprecation is not an issue
    val rdd = sc.parallelize(Seq(jsonStr))

    import sqlContext.implicits._
    val customers: Dataset[CustomerEvent] =
      sqlContext.read.json(rdd).as[CustomerEvent]

    val ce: CustomerEvent = customers.first()
    ce.customer_id should be ("3ee066ab571e03dd5f3c443a6c34417a")
    ce.product_id.isDefined should be (false)

  }



My error looks like

Error while decoding: java.lang.UnsupportedOperationException: Cannot
evaluate expression: upcast('product_id,DoubleType,- field (class:
"scala.Option", name: "product_id"),- root class: "data.CustomerEvent")
newinstance(class data.CustomerEvent,invoke(input[3,
StringType],toString,ObjectType(class java.lang.String)),input[0,
LongType],input[9, LongType],invoke(input[5,
StringType],toString,ObjectType(class java.lang.String)),invoke(input[6,
StringType],toString,ObjectType(class java.lang.String)),input[7,
LongType],invoke(input[1, StringType],toString,ObjectType(class
java.lang.String)),wrapoption(input[8,
LongType]),wrapoption(upcast('product_id,DoubleType,- field (class:
"scala.Option", name: "product_id"),- root class:
"data.CustomerEvent")),wrapoption(input[4,
DoubleType]),wrapoption(invoke(input[2,
StringType],toString,ObjectType(class
java.lang.String))),false,ObjectType(class data.CustomerEvent),None)
:- invoke(input[3, StringType],toString,ObjectType(class java.lang.String))
:  +- input[3, StringType]
:- input[0, LongType]
:- input[9, LongType]
:- invoke(input[5, StringType],toString,ObjectType(class java.lang.String))
:  +- input[5, StringType]
:- invoke(input[6, StringType],toString,ObjectType(class java.lang.String))
:  +- input[6, StringType]
:- input[7, LongType]
:- invoke(input[1, StringType],toString,ObjectType(class java.lang.String))
:  +- input[1, StringType]
:- wrapoption(input[8, LongType])
:  +- input[8, LongType]
:- wrapoption(upcast('product_id,DoubleType,- field (class: "scala.Option",
name: "product_id"),- root class: "data.CustomerEvent"))
:  +- upcast('product_id,DoubleType,- field (class: "scala.Option", name:
"product_id"),- root class: "data.CustomerEvent")
:     +- 'product_id
:- wrapoption(input[4, DoubleType])
:  +- input[4, DoubleType]
+- wrapoption(invoke(input[2, StringType],toString,ObjectType(class
java.lang.String)))
   +- invoke(input[2, StringType],toString,ObjectType(class
java.lang.String))
      +- input[2, StringType]


  at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.fromRow(ExpressionEncoder.scala:224)
  at org.apache.spark.sql.Dataset$$anonfun$collect$2.apply(Dataset.scala:668)
  at org.apache.spark.sql.Dataset$$anonfun$collect$2.apply(Dataset.scala:668)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
  at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
  at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
  at org.apache.spark.sql.Dataset.collect(Dataset.scala:668)
  at org.apache.spark.sql.Dataset.take(Dataset.scala:689)
  at org.apache.spark.sql.Dataset.first(Dataset.scala:654)
  at data.TestLoadingCustomerEventFromJSON$$anonfun$2.apply$mcV$sp(TestLoadingCustomerEventFromJSON.scala:70)
  at data.TestLoadingCustomerEventFromJSON$$anonfun$2.apply(TestLoadingCustomerEventFromJSON.scala:50)
  at data.TestLoadingCustomerEventFromJSON$$anonfun$2.apply(TestLoadingCustomerEventFromJSON.scala:50)
  at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
  at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
  at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
  at org.scalatest.Transformer.apply(Transformer.scala:22)
  at org.scalatest.Transformer.apply(Transformer.scala:20)
  at org.scalatest.FlatSpecLike$$anon$1.apply(FlatSpecLike.scala:1647)
  at org.scalatest.Suite$class.withFixture(Suite.scala:1122)
  at org.scalatest.FlatSpec.withFixture(FlatSpec.scala:1683)
  at org.scalatest.FlatSpecLike$class.invokeWithFixture$1(FlatSpecLike.scala:1644)
  at org.scalatest.FlatSpecLike$$anonfun$runTest$1.apply(FlatSpecLike.scala:1656)
  at org.scalatest.FlatSpecLike$$anonfun$runTest$1.apply(FlatSpecLike.scala:1656)
  at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
  at org.scalatest.FlatSpecLike$class.runTest(FlatSpecLike.scala:1656)



Any pointers on what I am doing wrong would be gratefully accepted!

Thanks a Million,
Anthony

Re: Option[Long] parameter in case class parsed from JSON DataFrame failing when key not present in JSON

Posted by Anthony Brew <at...@gmail.com>.
Thank you Jakob, you were bang on the money. Jorge, apologies, my snippet
was partial and I hadn't made it equivalent to my failing test.

For reference, and for all who pass this way, here is a working solution with
passing tests that does not need an explicitly specified schema; it was the
second test that had been failing prior to Jakob's pointer.


import org.apache.spark.sql.Dataset
import org.scalatest.Matchers

case class Sample(time: Long, opt: Option[Long] = None)

class SampleTest extends SparkSimpleContextConfigurator with Matchers {

  "A JSON Object" should "Parse Correctly" in {
    val jsonStr = """ {"time": 2,
                        "opt": 1
                      }
                  """

    val rdd = sc.parallelize(Seq(jsonStr))

    import sqlContext.implicits._
    val samples: Dataset[Sample] = sqlContext.read.json(rdd).as[Sample]

    val sample: Sample = samples.first()

    sample.time should be (2)
    sample.opt.isDefined should be (true)
    sample.opt.get should be (1)
  }

  "A Partial JSON Object" should "Parse Correctly" in {
    val json = Seq(
      """ {"time": 2} """,
      """ {"time": 10, "opt": 10} """
    )

    val rdd = sc.parallelize(json)

    import sqlContext.implicits._
    val samples: Dataset[Sample] = sqlContext.read.json(rdd).as[Sample]
    val sample: Sample = samples.first()

    sample.time should be (2)
    sample.opt.isDefined should be (false)
  }

}
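
For completeness, the same partial case should also work with Jakob's
explicit-schema approach. The following is only an untested sketch against the
Spark 1.6-era StructType API, reusing the rdd and sqlContext from the partial
test above (the sampleSchema name is just for illustration):

import org.apache.spark.sql.types.{LongType, StructField, StructType}

// Schema matching Sample: "time" is required, "opt" may be absent.
val sampleSchema = StructType(Seq(
  StructField("time", LongType, nullable = false),
  StructField("opt", LongType, nullable = true)))

// With the schema supplied up front, the "opt" column exists even when no
// record in the input carries it, so Sample.opt decodes to None.
val samples: Dataset[Sample] =
  sqlContext.read.schema(sampleSchema).json(rdd).as[Sample]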








Phone: 087 - 9179799
Quidquid latine dictum sit, altum sonatur


Re: Option[Long] parameter in case class parsed from JSON DataFrame failing when key not present in JSON

Posted by Jakob Odersky <ja...@odersky.com>.
I think the issue is that the `read.json` function has no idea of the
underlying schema; in fact the documentation
(http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameReader)
says:

> Unless the schema is specified using schema function, this function goes through the input once to determine the input schema.

so since your test data does not contain a record with a product_id,
`read.json` infers a schema that does not contain it. Only after
determining this (incomplete) schema do you treat it as a Dataset of
CustomerEvent, which then fails.
Try creating a schema (StructType) manually for your CustomerEvent
case class and pass it to the reader's `schema` function before calling
`json`, i.e. something like:

import org.apache.spark.sql.types._

val sch = StructType(Seq(
  StructField("customer_id", StringType, nullable = false),
  StructField("product_id", LongType, nullable = true)))
// there's probably a better way to derive the schema from the case class
val customers: Dataset[CustomerEvent] =
  sqlContext.read.schema(sch).json(rdd).as[CustomerEvent]

just a pointer, I haven't tested this.
regards,
--Jakob
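
A possible way to avoid writing the StructType by hand, again untested and
relying on ExpressionEncoder (a Catalyst-internal API that may change between
versions), is to derive the schema from the case class itself:

import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

// Derive the schema for CustomerEvent from its encoder instead of
// spelling out each StructField manually.
val sch = ExpressionEncoder[CustomerEvent]().schema

val customers: Dataset[CustomerEvent] =
  sqlContext.read.schema(sch).json(rdd).as[CustomerEvent]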




Re: Option[Long] parameter in case class parsed from JSON DataFrame failing when key not present in JSON

Posted by Jorge Machado <jo...@me.com>.
Hi Anthony,

I tried the code myself. I think the problem is in the jsonStr:

I do it with: val jsonStr = """{"customer_id": "3ee066ab571e03dd5f3c443a6c34417a","product_id": 3}"""

Or is it the "," after your 3, or the "\n"?

Regards


