Posted to user@spark.apache.org by Alonso Isidoro Roman <al...@gmail.com> on 2016/05/24 16:15:08 UTC

About a runtime error when trying to recover a tuple from a Kafka topic using Spark Streaming and Scala

Hi everyone, I am trying to access a field within a Tuple2, and when I push a JSON message
with an Amazon rating, a runtime error happens. The software pushes a case class into a
Kafka topic; then I want to recover it using Spark Streaming so I can feed a machine
learning algorithm (ALS) and save the results in a MongoDB instance.

This is the GitHub project:

https://github.com/alonsoir/awesome-recommendation-engine/tree/develop

Kafka Producer

package example.producer

import play.api.libs.json._
import example.utils._
import example.model.{AmazonProductAndRating, AmazonProduct, AmazonRating}
import example.utils.AmazonPageParser
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future

/**
 * args(0) : productId
 * args(1) : userId
 * args(2) : rating
 *
 * Usage: ./amazon-producer-example 0981531679 someUserId 3.0
 */
object AmazonProducerExample {

  def main(args: Array[String]): Unit = {

    val productId = args(0).toString
    val userId = args(1).toString
    val rating = args(2).toDouble
    val topicName = "amazonRatingsTopic"

    val producer = Producer[String](topicName)

    //0981531679 is Scala Puzzlers...
    AmazonPageParser.parse(productId, userId, rating).onSuccess {
      case amazonRating =>
        //Is this the correct way? The best performance? Possibly not. What about
        //using Avro or Parquet? How can I push data in Avro or Parquet format?
        //(see the Avro sketch right after this block)
        //You can see that I am pushing a JSON String to the Kafka topic, not a
        //raw String, but is there any difference?
        producer.send(Json.toJson(amazonRating).toString)
        println("amazon product with rating sent to kafka cluster..." + amazonRating.toString)
        System.exit(0)
    }
  }
}
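
About my own comment above asking how to push Avro instead of JSON, this is the kind of encoding I have in mind. It is only a sketch with a hand-written schema, assuming the plain org.apache.avro library is on the classpath; I have not wired it into the producer, which would also need a variant that sends Array[Byte] instead of String:

import java.io.ByteArrayOutputStream
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.EncoderFactory

// schema mirroring the AmazonRating case class
val schema = new Schema.Parser().parse(
  """{"type":"record","name":"AmazonRating","fields":[
    |  {"name":"userId","type":"string"},
    |  {"name":"productId","type":"string"},
    |  {"name":"rating","type":"double"}
    |]}""".stripMargin)

// fill a generic record from one rating
val record: GenericRecord = new GenericData.Record(schema)
record.put("userId", "someUserId")
record.put("productId", "0981531679")
record.put("rating", 3.0)

// binary-encode it; these bytes would be the Kafka message payload
val out = new ByteArrayOutputStream()
val encoder = EncoderFactory.get().binaryEncoder(out, null)
new GenericDatumWriter[GenericRecord](schema).write(record, encoder)
encoder.flush()
val avroBytes: Array[Byte] = out.toByteArray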

This is the definition of the necessary case classes; the file is named models.scala:

package example.model

import play.api.libs.json.Json
import reactivemongo.bson.Macros

case class AmazonProduct(itemId: String, title: String, url: String, img: String, description: String)
case class AmazonRating(userId: String, productId: String, rating: Double)

case class AmazonProductAndRating(product: AmazonProduct, rating: AmazonRating)

// For MongoDB
object AmazonRating {
  implicit val amazonRatingHandler = Macros.handler[AmazonRating]
  implicit val amazonRatingFormat = Json.format[AmazonRating]
  lazy val empty: AmazonRating = AmazonRating("-1", "-1", -1d)
}

This is the full code of the Spark Streaming process:

object AmazonKafkaConnector {

  private var numAmazonProductCollected = 0L
  private var partNum = 0
  private val numAmazonProductToCollect = 10000000

  //these settings must be in reference.conf
  private val Database = "alonsodb"
  private val ratingCollection = "amazonRatings"
  private val MongoHost = "127.0.0.1"
  private val MongoPort = 27017
  private val MongoProvider = "com.stratio.datasource.mongodb"

  private val jsonParser = new JsonParser()
  private val gson = new GsonBuilder().setPrettyPrinting().create()

  private def prepareMongoEnvironment(): MongoClient = {
    val mongoClient = MongoClient(MongoHost, MongoPort)
    mongoClient
  }

  private def closeMongoEnviroment(mongoClient: MongoClient) = {
    mongoClient.close()
    println("mongoclient closed!")
  }

  private def cleanMongoEnvironment(mongoClient: MongoClient) = {
    cleanMongoData(mongoClient)
    mongoClient.close()
  }

  private def cleanMongoData(client: MongoClient): Unit = {
    val collection = client(Database)(ratingCollection)
    collection.dropCollection()
  }

  def main(args: Array[String]) {
    // Process program arguments and set properties
    if (args.length < 2) {
      System.err.println("Usage: " + this.getClass.getSimpleName + " <brokers> <topics>")
      System.exit(1)
    }

    val Array(brokers, topics) = args

    println("Initializing Streaming Spark Context and kafka connector...")
    // Create context with 2 second batch interval
    val sparkConf = new SparkConf().setAppName("AmazonKafkaConnector")
                                   .setMaster("local[4]")
                                   .set("spark.driver.allowMultipleContexts", "true")

    val sc = new SparkContext(sparkConf)
    sc.addJar("target/scala-2.10/blog-spark-recommendation_2.10-1.0-SNAPSHOT.jar")
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // Create direct kafka stream with brokers and topics
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
    println("Initialized Streaming Spark Context and kafka connector...")

    //create recommendation module
    println("Creating rating recommender module...")
    val ratingFile = "ratings.csv"
    val recommender = new Recommender(sc, ratingFile)
    println("Initialized rating recommender module...")

    //I have to convert messages, which is an InputDStream, into a Seq...
    //Tuesday, 24 of May: compiling fine, but it does not work yet, almost there! A runtime error happens!
    try {
      messages.foreachRDD(rdd => {
        val count = rdd.count()
        if (count > 0) {
          //someMessages should be AmazonRating...
          val someMessages = rdd.take(count.toInt)
          println("<------>")
          println("someMessages is " + someMessages)
          someMessages.foreach(println)
          println("<------>")
          println("<---POSSIBLE SOLUTION--->")
          messages
            .map { case (_, jsonRating) =>
              val jsValue = Json.parse(jsonRating)
              AmazonRating.amazonRatingFormat.reads(jsValue) match {
                case JsSuccess(rating, _) => rating
                case JsError(_) => AmazonRating.empty
              }
            }
            .filter(_ != AmazonRating.empty)
            //UPDATED with @Yuval's tip, now compiling fine, but a runtime error happens!
            .foreachRDD(_.foreachPartition(it => recommender.predictWithALS(it.toSeq)))

          println("<---POSSIBLE SOLUTION--->")
        }
      })
    } catch {
      case e: IllegalArgumentException => println("illegal arg. exception")
      case e: IllegalStateException    => println("illegal state exception")
      case e: ClassCastException       => println("ClassCastException")
      case e: Exception                => println(" Generic Exception")
    } finally {
      println("Finished taking data from kafka topic...")
    }

    ssc.start()
    ssc.awaitTermination()

    println("Finished!")
  } //def main
} //object AmazonKafkaConnector

The output of the exception is:

Tuesday, 24 of May...
The exception is too long; here is the pastebin link:
<http://pastebin.com/AgjcvnaV>

I noticed that the stack trace says something about this line:

16/05/24 16:55:44 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (KafkaRDD[74] at createDirectStream at AmazonKafkaConnectorWithMongo.scala:100)
The mentioned line is:

val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
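
To see what that stream actually delivers before any parsing, I am thinking of adding a small trace right after that line, before ssc.start(). This is only a debugging sketch:

// print a few (key, value) Tuple2 elements delivered by the direct stream in each batch
messages.foreachRDD { rdd =>
  rdd.take(10).foreach { case (key, jsonValue) =>
    println(s"kafka key=$key value=$jsonValue")
  }
}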

Another problem is that the Spark Streaming process stops abruptly...
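
This is the alternative shape I am considering for the foreachRDD block: instead of mapping over the messages DStream again inside foreachRDD, take the JSON string from the second field of each Tuple2 in the RDD itself and reuse the same Play JSON parsing. It is just a sketch with the same imports as the code above; I have not verified it against the runtime error yet:

messages.foreachRDD { rdd =>
  if (rdd.count() > 0) {
    // each element is a (key, value) Tuple2; the JSON payload is the value
    val ratings = rdd.collect().map { case (_, jsonRating) =>
      AmazonRating.amazonRatingFormat.reads(Json.parse(jsonRating)) match {
        case JsSuccess(rating, _) => rating
        case JsError(_)           => AmazonRating.empty
      }
    }.filter(_ != AmazonRating.empty).toSeq

    // collect() brings everything to the driver, like rdd.take(count.toInt) above
    if (ratings.nonEmpty) recommender.predictWithALS(ratings)
  }
}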

The Recommender.predict method looks like this:

def predict(ratings: Seq[AmazonRating]) = {
  // train model
  val myRatings = ratings.map(toSparkRating)
  val myRatingRDD = sc.parallelize(myRatings)

  val startAls = DateTime.now
  val model = ALS.train((sparkRatings ++ myRatingRDD).repartition(NumPartitions), 10, 20, 0.01)

  val myProducts = myRatings.map(_.product).toSet
  val candidates = sc.parallelize((0 until productDict.size).filterNot(myProducts.contains))

  // get ratings of all products not in my history, ordered by rating (higher first),
  // and only keep the first NumRecommendations
  val myUserId = userDict.getIndex(MyUsername)
  val recommendations = model.predict(candidates.map((myUserId, _))).collect
  val endAls = DateTime.now
  val result = recommendations.sortBy(-_.rating).take(NumRecommendations).map(toAmazonRating)
  val alsTime = Seconds.secondsBetween(startAls, endAls).getSeconds

  println(s"ALS Time: $alsTime seconds")
  result
}
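
For completeness, this is how I exercise it outside the streaming job. It is a trivial sketch; in the real flow the ratings come from the parsed Kafka messages:

// one hand-written rating with the same shape the Kafka topic should deliver
val testRating = AmazonRating("someUserId", "0981531679", 3.0)
val recommendations = recommender.predict(Seq(testRating))
recommendations.foreach(println)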

Thank you for reading this far, and I hope you can help me; I am still learning this
language and the technology. This question is already posted on Stack Overflow
<http://stackoverflow.com/questions/37303202/about-an-error-accessing-a-field-inside-tuple2>
with an open bounty.

Alonso Isidoro Roman
about.me/alonso.isidoro.roman