Posted to user@spark.apache.org by Sunita Arvind <su...@gmail.com> on 2014/12/08 22:43:54 UTC

Transform SchemaRDDs into new SchemaRDDs

Hi,

I need to generate some flags based on certain columns and add them back to
the schemaRDD for further operations. Do I have to use a case class (via
reflection or programmatically)? I am using parquet files, so the schema is
derived automatically. That is a great feature, thanks to the Spark
developers; however, if a subsequent createSchemaRDD doesn't work, the
feature seems unusable for more advanced use cases. I hope there is some way
of doing this and I am just missing something. Below is what I am
attempting. Appreciate your help.


Here is my code block:

    import sqlContext.createSchemaRDD

    // Derive a "YR" flag from two date columns and carry the original Row
    // along as a (Row, String) tuple -- this is the part that fails at
    // runtime (see the stack trace below).
    val YR_indv_purchase = createSchemaRDD(
      indv_purchase.map { row =>
        val v_YR = scala.math.ceil(
          monthsBetween(row.getString(2), row.getString(5))(dateFormats.YYYYMMDD))
        val YR = "YR" + v_YR.toString
        (row, YR)
      }
    ).registerTempTable("YRIndvPurchase")
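
For reference, the case-class (reflection) route I was hoping to avoid would
look something like the sketch below. The column names and types here are
made up, since in reality the schema comes from the parquet files:

    // Hypothetical case class spelling out the parquet columns plus the flag;
    // the real column names/types would all have to be written out by hand.
    case class PurchaseWithYR(custId: String, purchaseDt: String,
                              expiryDt: String, yr: String)

    val withYR = indv_purchase.map { row =>
      val v_YR = monthsBetween(row.getString(2), row.getString(5))(dateFormats.YYYYMMDD)
      PurchaseWithYR(row.getString(0), row.getString(2), row.getString(5), "YR" + v_YR)
    }
    // Reflection works here because the case class is a Product of supported types
    createSchemaRDD(withYR).registerTempTable("YRIndvPurchase")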


-------------------------------------------------------------------------
------- just for completeness, here are the functions being used -------
import com.github.nscala_time.time.Imports._
import org.joda.time.format.DateTimeFormat
import org.joda.time.format.DateTimeFormatter
import org.joda.time.Months

object dateFormats {
  // Joda-Time patterns are case-sensitive: yyyy = year, MM = month,
  // dd = day of month. The original "YYYYMMDD" would parse DD as day-of-year.
  val YYYYMMDD = DateTimeFormat.forPattern("yyyyMMdd")
}

def toDateTime(dtString: String)(implicit fmt: DateTimeFormatter): DateTime =
  fmt.parseDateTime(dtString)

def monthsBetween(fromDT: String, toDT: String)(implicit fmt: DateTimeFormatter): Int =
  Months.monthsBetween(toDateTime(fromDT)(fmt), toDateTime(toDT)(fmt)).getMonths
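
As a quick sanity check of how these compose (made-up dates, and assuming
the corrected yyyyMMdd pattern above):

val m = monthsBetween("20140115", "20141215")(dateFormats.YYYYMMDD)  // 11
val flag = "YR" + scala.math.ceil(m).toString  // "YR11.0" -- note the ".0" from ceil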

-------------------------------------------------------------------------

This compiles fine but throws the following exception at runtime:

Exception in thread "main" scala.MatchError: org.apache.spark.sql.Row (of class scala.reflect.internal.Types$TypeRef$$anon$3)
  at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:72)
  at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:64)
  at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:62)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  at scala.collection.immutable.List.foreach(List.scala:318)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
  at scala.collection.AbstractTraversable.map(Traversable.scala:105)
  at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:62)
  at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:50)
  at org.apache.spark.sql.catalyst.ScalaReflection$.attributesFor(ScalaReflection.scala:44)
  at org.apache.spark.sql.execution.ExistingRdd$.fromProductRdd(basicOperators.scala:229)
  at org.apache.spark.sql.SQLContext.createSchemaRDD(SQLContext.scala:94)
  at croevss.StageJoin$.vsswf(StageJoin.scala:162)
  at croevss.StageJoin$.main(StageJoin.scala:41)
  at croevss.StageJoin.main(StageJoin.scala)
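
In case it clarifies the question: the programmatic route I was considering
looks roughly like the sketch below. It is untested, and it assumes the
Spark 1.1/1.2 behavior that a SchemaRDD exposes its StructType via .schema,
that Row still extends Seq[Any], and that applySchema accepts an RDD[Row]
plus a StructType:

import org.apache.spark.sql._

// New schema = the parquet-derived schema plus the extra flag column
val schemaWithYR =
  StructType(indv_purchase.schema.fields :+ StructField("YR", StringType, nullable = true))

// Copy each row's values and append the flag (Row extends Seq[Any] in these releases)
val rowsWithYR = indv_purchase.map { row =>
  val v_YR = monthsBetween(row.getString(2), row.getString(5))(dateFormats.YYYYMMDD)
  Row((row :+ ("YR" + v_YR)): _*)
}

// applySchema rebuilds a SchemaRDD without needing a case class
sqlContext.applySchema(rowsWithYR, schemaWithYR).registerTempTable("YRIndvPurchase")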


Regards,
Sunita Koppar