You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Luis Ángel Vicente Sánchez <la...@gmail.com> on 2014/02/02 16:18:42 UTC

Persisting RDD to Redis

I'm trying to create a simple twitter word counter with spark-streaming and
I would like to store the word counts in redis. The program looks like this:

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{ Seconds, StreamingContext }
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.twitter._
import org.sedis._
import redis.clients.jedis._

object TwitterWordCount {
  def main(args: Array[String]) {
    if (args.length < 1) {
      System.err.println("Usage: TwitterWordCount <master> [filter1]
[filter2] ... [filterN]")
      System.exit(1)
    }

    val (master, filters) = (args.head, args.tail)

    val pool = new Pool(new JedisPool(new JedisPoolConfig(), "localhost",
6379, 2000))
    val ssc = new StreamingContext(master, "TwitterWordCount", Seconds(5),
System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
    val stream = TwitterUtils.createStream(ssc, None, filters,
StorageLevel.MEMORY_ONLY_SER)

    val words = stream.flatMap(status => status.getText.toLowerCase.split("
")).map(word => (word, 1l))

    val cntWords = words.reduceByKey(_ + _)

    cntWords.foreach(rdd =>
      pool.withJedisClient { client =>
        val pipeline = client.pipelined()
        rdd.foreach {
          case (word, count) =>
            pipeline.incrBy(word, count)
        }
        pipeline.sync()
      }
    )

    ssc.start()
  }
}

Everytime I run this program, I get this error:

[error] 14/02/02 15:16:20 ERROR scheduler.JobScheduler: Error running job
streaming job 1391354180000 ms.0
[error] org.apache.spark.SparkException: Job aborted: Task not
serializable: java.io.NotSerializableException: redis.clients.jedis.Pipeline
[error] at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
[error] at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
[error] at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
[error] at
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
[error] at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
[error] at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:794)
[error] at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16$$anonfun$apply$2.apply$mcVI$sp(DAGScheduler.scala:915)
[error] at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16$$anonfun$apply$2.apply(DAGScheduler.scala:912)
[error] at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16$$anonfun$apply$2.apply(DAGScheduler.scala:912)
[error] at scala.Option.foreach(Option.scala:236)
[error] at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16.apply(DAGScheduler.scala:912)
[error] at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16.apply(DAGScheduler.scala:911)
[error] at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
[error] at
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
[error] at
org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:911)
[error] at
org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:616)
[error] at
org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
[error] at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
[error] at akka.actor.ActorCell.invoke(ActorCell.scala:456)
[error] at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
[error] at akka.dispatch.Mailbox.run(Mailbox.scala:219)
[error] at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
[error] at
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
[error] at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
[error] at
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
[error] at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

I have tried to not use redis pipelines and then I get the same error but
related to the Jedis client.

Have anybody done something similar?

Kind regards,

Luis

P.S. I have attached my build.sbt and the scala source code to this file.

Re: Persisting RDD to Redis

Posted by Luis Ángel Vicente Sánchez <la...@gmail.com>.
Yes, I found that after sending my response; the final program is:

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{ Seconds, StreamingContext }
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.twitter._
import redis.clients.jedis._

object TwitterWordCount {
  def main(args: Array[String]) {
    if (args.length < 1) {
      System.err.println("Usage: TwitterWordCount <master> [filter1]
[filter2] ... [filterN]")
      System.exit(1)
    }

    System.setProperty("spark.cleaner.ttl", "600")

    val (master, filters) = (args.head, args.tail)

    val ssc = new StreamingContext(master, "TwitterWordCount", Seconds(5),
System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
    val stream = TwitterUtils.createStream(ssc, None, filters,
StorageLevel.MEMORY_ONLY_SER)
    val words = stream.flatMap(status => status.getText.toLowerCase.split("
")).countByValue()
    val countWords = words.reduceByKey(_ + _)

    countWords.foreachRDD(rdd =>
      rdd.foreachPartition { iterator =>
        val client = new Jedis("localhost")
        val pipeline = client.pipelined()
        iterator.foreach {
          case (word, count) =>
            pipeline.incrBy(word, count)
        }
        pipeline.sync()
        client.quit()
      }
    )

    ssc.start()
  }
}



2014-02-02 Ewen Cheslack-Postava <me...@ewencp.org>:

> Ah, sorry, I read too quickly and missed that this was for Spark
> Streaming. In that case you will get RDDs from foreach, so I guess you want
> to use RDD.foreachPartition inside the call to DStream.foreach.
>
> -Ewen
>
>
> Luis Ángel Vicente Sánchez wrote:
>
> Thank Ewen! Now I understand why I was getting the error message. It seems
> that foreachPartition doesn't exists as part of the DStream class :-\ I
> will check API docs to find other alternatives.
>
>
>
>
> 2014-02-02 Ewen Cheslack-Postava <me...@ewencp.org>:
>
>> If you use anything created on the driver program within functions run on
>> workers, it needs to be serializable, but your pool of Redis connections is
>> not. Normally, the simple way to fix this is to use the *With methods of
>> RDD (mapWith, flatMapWith, filterWith, and in this case, foreachWith) to
>> instantiate a connection to Redis on a per-partition basis.
>>
>> But your logic for outputting the data doesn't look right.
>> cntWords.foreach's function parameter would be getting each (word,count)
>> element, *not the whole RDD*. You probably want to share a single Redis
>> pipeline for each RDD partition, which you can accomplish with
>> foreachPartition. It gives you an iterator over all elements in each
>> partition. It would look something like this:
>>
>> cntWords.foreachPartition(it =>
>>
>>   val pool = new Pool(new JedisPool(new JedisPoolConfig(), "localhost",
>> 6379, 2000))
>>   pool.withJedisClient { client =>
>>     val pipeline = client.pipeline()
>>     it.foreach { case (word,count) => pipeline.incrBy(word,count) }
>>     pipeline.sync()
>>   }
>> )
>>
>> Of course you wouldn't actually need the pool in that case, but I'm not
>> familiar with the Jedis library so I'm not sure how you'd create just a
>> single connection.
>>
>> -Ewen
>>
>>  Luis Ángel Vicente Sánchez <la...@gmail.com>
>>  February 2, 2014 at 7:18 AM
>> I'm trying to create a simple twitter word counter with spark-streaming
>> and I would like to store the word counts in redis. The program looks like
>> this:
>>
>> import org.apache.spark.storage.StorageLevel
>> import org.apache.spark.streaming.{ Seconds, StreamingContext }
>> import org.apache.spark.streaming.StreamingContext._
>> import org.apache.spark.streaming.twitter._
>> import org.sedis._
>> import redis.clients.jedis._
>>
>> object TwitterWordCount {
>>   def main(args: Array[String]) {
>>     if (args.length < 1) {
>>       System.err.println("Usage: TwitterWordCount <master> [filter1]
>> [filter2] ... [filterN]")
>>       System.exit(1)
>>     }
>>
>>     val (master, filters) = (args.head, args.tail)
>>
>>     val pool = new Pool(new JedisPool(new JedisPoolConfig(), "localhost",
>> 6379, 2000))
>>     val ssc = new StreamingContext(master, "TwitterWordCount",
>> Seconds(5), System.getenv("SPARK_HOME"),
>> StreamingContext.jarOfClass(this.getClass))
>>     val stream = TwitterUtils.createStream(ssc, None, filters,
>> StorageLevel.MEMORY_ONLY_SER)
>>
>>     val words = stream.flatMap(status =>
>> status.getText.toLowerCase.split(" ")).map(word => (word, 1l))
>>
>>     val cntWords = words.reduceByKey(_ + _)
>>
>>     cntWords.foreach(rdd =>
>>       pool.withJedisClient { client =>
>>         val pipeline = client.pipelined()
>>         rdd.foreach {
>>           case (word, count) =>
>>             pipeline.incrBy(word, count)
>>         }
>>         pipeline.sync()
>>       }
>>     )
>>
>>     ssc.start()
>>   }
>> }
>>
>> Everytime I run this program, I get this error:
>>
>> [error] 14/02/02 15:16:20 ERROR scheduler.JobScheduler: Error running job
>> streaming job 1391354180000 ms.0
>> [error] org.apache.spark.SparkException: Job aborted: Task not
>> serializable: java.io.NotSerializableException: redis.clients.jedis.Pipeline
>> [error] at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
>> [error] at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
>> [error] at
>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>> [error] at
>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>> [error] at org.apache.spark.scheduler.DAGScheduler.org
>> $apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
>> [error] at org.apache.spark.scheduler.DAGScheduler.org
>> $apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:794)
>> [error] at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16$$anonfun$apply$2.apply$mcVI$sp(DAGScheduler.scala:915)
>> [error] at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16$$anonfun$apply$2.apply(DAGScheduler.scala:912)
>> [error] at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16$$anonfun$apply$2.apply(DAGScheduler.scala:912)
>> [error] at scala.Option.foreach(Option.scala:236)
>> [error] at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16.apply(DAGScheduler.scala:912)
>> [error] at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16.apply(DAGScheduler.scala:911)
>> [error] at
>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>> [error] at
>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>> [error] at
>> org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:911)
>> [error] at
>> org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:616)
>> [error] at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
>> [error] at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>> [error] at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>> [error] at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>> [error] at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>> [error] at
>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>> [error] at
>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> [error] at
>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> [error] at
>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> [error] at
>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>
>> I have tried to not use redis pipelines and then I get the same error but
>> related to the Jedis client.
>>
>> Have anybody done something similar?
>>
>> Kind regards,
>>
>> Luis
>>
>> P.S. I have attached my build.sbt and the scala source code to this file.
>>
>>
>>
>

Re: Persisting RDD to Redis

Posted by Ewen Cheslack-Postava <me...@ewencp.org>.
Ah, sorry, I read too quickly and missed that this was for Spark 
Streaming. In that case you will get RDDs from foreach, so I guess you 
want to use RDD.foreachPartition inside the call to DStream.foreach.

-Ewen

Luis Ángel Vicente Sánchez wrote:
> Thank Ewen! Now I understand why I was getting the error message. It 
> seems that foreachPartition doesn't exists as part of the DStream 
> class :-\ I will check API docs to find other alternatives.
>
>
>
>
> 2014-02-02 Ewen Cheslack-Postava <me@ewencp.org <ma...@ewencp.org>>:
>
>     If you use anything created on the driver program within functions
>     run on workers, it needs to be serializable, but your pool of
>     Redis connections is not. Normally, the simple way to fix this is
>     to use the *With methods of RDD (mapWith, flatMapWith, filterWith,
>     and in this case, foreachWith) to instantiate a connection to
>     Redis on a per-partition basis.
>
>     But your logic for outputting the data doesn't look right.
>     cntWords.foreach's function parameter would be getting each
>     (word,count) element, *not the whole RDD*. You probably want to
>     share a single Redis pipeline for each RDD partition, which you
>     can accomplish with foreachPartition. It gives you an iterator
>     over all elements in each partition. It would look something like
>     this:
>
>     cntWords.foreachPartition(it =>
>
>       val pool = new Pool(new JedisPool(new JedisPoolConfig(),
>     "localhost", 6379, 2000))
>       pool.withJedisClient { client =>
>         val pipeline = client.pipeline()
>         it.foreach { case (word,count) => pipeline.incrBy(word,count) }
>         pipeline.sync()
>       }
>     )
>
>     Of course you wouldn't actually need the pool in that case, but
>     I'm not familiar with the Jedis library so I'm not sure how you'd
>     create just a single connection.
>
>     -Ewen
>>     Luis Ángel Vicente Sánchez <ma...@gmail.com>
>>     February 2, 2014 at 7:18 AM
>>     I'm trying to create a simple twitter word counter with
>>     spark-streaming and I would like to store the word counts in
>>     redis. The program looks like this:
>>
>>     import org.apache.spark.storage.StorageLevel
>>     import org.apache.spark.streaming.{ Seconds, StreamingContext }
>>     import org.apache.spark.streaming.StreamingContext._
>>     import org.apache.spark.streaming.twitter._
>>     import org.sedis._
>>     import redis.clients.jedis._
>>
>>     object TwitterWordCount {
>>       def main(args: Array[String]) {
>>         if (args.length < 1) {
>>           System.err.println("Usage: TwitterWordCount <master>
>>     [filter1] [filter2] ... [filterN]")
>>           System.exit(1)
>>         }
>>
>>         val (master, filters) = (args.head, args.tail)
>>
>>         val pool = new Pool(new JedisPool(new JedisPoolConfig(),
>>     "localhost", 6379, 2000))
>>         val ssc = new StreamingContext(master, "TwitterWordCount",
>>     Seconds(5), System.getenv("SPARK_HOME"),
>>     StreamingContext.jarOfClass(this.getClass))
>>         val stream = TwitterUtils.createStream(ssc, None, filters,
>>     StorageLevel.MEMORY_ONLY_SER)
>>
>>         val words = stream.flatMap(status ?
>>     status.getText.toLowerCase.split(" ")).map(word ? (word, 1l))
>>
>>         val cntWords = words.reduceByKey(_ + _)
>>
>>         cntWords.foreach(rdd ?
>>           pool.withJedisClient { client ?
>>             val pipeline = client.pipelined()
>>             rdd.foreach {
>>               case (word, count) ?
>>                 pipeline.incrBy(word, count)
>>             }
>>             pipeline.sync()
>>           }
>>         )
>>
>>         ssc.start()
>>       }
>>     }
>>
>>     Everytime I run this program, I get this error:
>>
>>     [error] 14/02/02 15:16:20 ERROR scheduler.JobScheduler: Error
>>     running job streaming job 1391354180000 ms.0
>>     [error] org.apache.spark.SparkException: Job aborted: Task not
>>     serializable: java.io.NotSerializableException:
>>     redis.clients.jedis.Pipeline
>>     [error] at
>>     org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
>>     [error] at
>>     org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
>>     [error] at
>>     scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>     [error] at
>>     scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>     [error] at org.apache.spark.scheduler.DAGScheduler.org
>>     <http://org.apache.spark.scheduler.DAGScheduler.org>$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
>>     [error] at org.apache.spark.scheduler.DAGScheduler.org
>>     <http://org.apache.spark.scheduler.DAGScheduler.org>$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:794)
>>     [error] at
>>     org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16$$anonfun$apply$2.apply$mcVI$sp(DAGScheduler.scala:915)
>>     [error] at
>>     org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16$$anonfun$apply$2.apply(DAGScheduler.scala:912)
>>     [error] at
>>     org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16$$anonfun$apply$2.apply(DAGScheduler.scala:912)
>>     [error] at scala.Option.foreach(Option.scala:236)
>>     [error] at
>>     org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16.apply(DAGScheduler.scala:912)
>>     [error] at
>>     org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16.apply(DAGScheduler.scala:911)
>>     [error] at
>>     scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>     [error] at
>>     scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>     [error] at
>>     org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:911)
>>     [error] at
>>     org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:616)
>>     [error] at
>>     org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
>>     [error] at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>>     [error] at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>>     [error] at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>>     [error] at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>>     [error] at
>>     akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>>     [error] at
>>     scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>     [error] at
>>     scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>     [error] at
>>     scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>     [error] at
>>     scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>
>>     I have tried to not use redis pipelines and then I get the same
>>     error but related to the Jedis client.
>>
>>     Have anybody done something similar?
>>
>>     Kind regards,
>>
>>     Luis
>>
>>     P.S. I have attached my build.sbt and the scala source code to
>>     this file.
>>
>>
>

Re: Persisting RDD to Redis

Posted by Luis Ángel Vicente Sánchez <la...@gmail.com>.
Thank Ewen! Now I understand why I was getting the error message. It seems
that foreachPartition doesn't exists as part of the DStream class :-\ I
will check API docs to find other alternatives.




2014-02-02 Ewen Cheslack-Postava <me...@ewencp.org>:

> If you use anything created on the driver program within functions run on
> workers, it needs to be serializable, but your pool of Redis connections is
> not. Normally, the simple way to fix this is to use the *With methods of
> RDD (mapWith, flatMapWith, filterWith, and in this case, foreachWith) to
> instantiate a connection to Redis on a per-partition basis.
>
> But your logic for outputting the data doesn't look right.
> cntWords.foreach's function parameter would be getting each (word,count)
> element, *not the whole RDD*. You probably want to share a single Redis
> pipeline for each RDD partition, which you can accomplish with
> foreachPartition. It gives you an iterator over all elements in each
> partition. It would look something like this:
>
> cntWords.foreachPartition(it =>
>
>   val pool = new Pool(new JedisPool(new JedisPoolConfig(), "localhost",
> 6379, 2000))
>   pool.withJedisClient { client =>
>     val pipeline = client.pipeline()
>     it.foreach { case (word,count) => pipeline.incrBy(word,count) }
>     pipeline.sync()
>   }
> )
>
> Of course you wouldn't actually need the pool in that case, but I'm not
> familiar with the Jedis library so I'm not sure how you'd create just a
> single connection.
>
> -Ewen
>
>   Luis Ángel Vicente Sánchez <la...@gmail.com>
>  February 2, 2014 at 7:18 AM
> I'm trying to create a simple twitter word counter with spark-streaming
> and I would like to store the word counts in redis. The program looks like
> this:
>
> import org.apache.spark.storage.StorageLevel
> import org.apache.spark.streaming.{ Seconds, StreamingContext }
> import org.apache.spark.streaming.StreamingContext._
> import org.apache.spark.streaming.twitter._
> import org.sedis._
> import redis.clients.jedis._
>
> object TwitterWordCount {
>   def main(args: Array[String]) {
>     if (args.length < 1) {
>       System.err.println("Usage: TwitterWordCount <master> [filter1]
> [filter2] ... [filterN]")
>       System.exit(1)
>     }
>
>     val (master, filters) = (args.head, args.tail)
>
>     val pool = new Pool(new JedisPool(new JedisPoolConfig(), "localhost",
> 6379, 2000))
>     val ssc = new StreamingContext(master, "TwitterWordCount", Seconds(5),
> System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
>     val stream = TwitterUtils.createStream(ssc, None, filters,
> StorageLevel.MEMORY_ONLY_SER)
>
>     val words = stream.flatMap(status => status.getText.toLowerCase.split("
> ")).map(word => (word, 1l))
>
>     val cntWords = words.reduceByKey(_ + _)
>
>     cntWords.foreach(rdd =>
>       pool.withJedisClient { client =>
>         val pipeline = client.pipelined()
>         rdd.foreach {
>           case (word, count) =>
>             pipeline.incrBy(word, count)
>         }
>         pipeline.sync()
>       }
>     )
>
>     ssc.start()
>   }
> }
>
> Everytime I run this program, I get this error:
>
> [error] 14/02/02 15:16:20 ERROR scheduler.JobScheduler: Error running job
> streaming job 1391354180000 ms.0
> [error] org.apache.spark.SparkException: Job aborted: Task not
> serializable: java.io.NotSerializableException: redis.clients.jedis.Pipeline
> [error] at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
> [error] at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
> [error] at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> [error] at
> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> [error] at org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
> [error] at org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:794)
> [error] at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16$$anonfun$apply$2.apply$mcVI$sp(DAGScheduler.scala:915)
> [error] at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16$$anonfun$apply$2.apply(DAGScheduler.scala:912)
> [error] at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16$$anonfun$apply$2.apply(DAGScheduler.scala:912)
> [error] at scala.Option.foreach(Option.scala:236)
> [error] at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16.apply(DAGScheduler.scala:912)
> [error] at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16.apply(DAGScheduler.scala:911)
> [error] at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> [error] at
> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> [error] at
> org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:911)
> [error] at
> org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:616)
> [error] at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
> [error] at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
> [error] at akka.actor.ActorCell.invoke(ActorCell.scala:456)
> [error] at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
> [error] at akka.dispatch.Mailbox.run(Mailbox.scala:219)
> [error] at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
> [error] at
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> [error] at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> [error] at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> [error] at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> I have tried to not use redis pipelines and then I get the same error but
> related to the Jedis client.
>
> Have anybody done something similar?
>
> Kind regards,
>
> Luis
>
> P.S. I have attached my build.sbt and the scala source code to this file.
>
>
>

Re: Persisting RDD to Redis

Posted by Ewen Cheslack-Postava <me...@ewencp.org>.
If you use anything created on the driver program within functions run 
on workers, it needs to be serializable, but your pool of Redis 
connections is not. Normally, the simple way to fix this is to use the 
*With methods of RDD (mapWith, flatMapWith, filterWith, and in this 
case, foreachWith) to instantiate a connection to Redis on a 
per-partition basis.

But your logic for outputting the data doesn't look right. 
cntWords.foreach's function parameter would be getting each (word,count) 
element, *not the whole RDD*. You probably want to share a single Redis 
pipeline for each RDD partition, which you can accomplish with 
foreachPartition. It gives you an iterator over all elements in each 
partition. It would look something like this:

cntWords.foreachPartition(it =>
   val pool = new Pool(new JedisPool(new JedisPoolConfig(), "localhost", 
6379, 2000))
   pool.withJedisClient { client =>
     val pipeline = client.pipeline()
     it.foreach { case (word,count) => pipeline.incrBy(word,count) }
     pipeline.sync()
   }
)

Of course you wouldn't actually need the pool in that case, but I'm not 
familiar with the Jedis library so I'm not sure how you'd create just a 
single connection.

-Ewen
> Luis Ángel Vicente Sánchez <ma...@gmail.com>
> February 2, 2014 at 7:18 AM
> I'm trying to create a simple twitter word counter with 
> spark-streaming and I would like to store the word counts in redis. 
> The program looks like this:
>
> import org.apache.spark.storage.StorageLevel
> import org.apache.spark.streaming.{ Seconds, StreamingContext }
> import org.apache.spark.streaming.StreamingContext._
> import org.apache.spark.streaming.twitter._
> import org.sedis._
> import redis.clients.jedis._
>
> object TwitterWordCount {
>   def main(args: Array[String]) {
>     if (args.length < 1) {
>       System.err.println("Usage: TwitterWordCount <master> [filter1] 
> [filter2] ... [filterN]")
>       System.exit(1)
>     }
>
>     val (master, filters) = (args.head, args.tail)
>
>     val pool = new Pool(new JedisPool(new JedisPoolConfig(), 
> "localhost", 6379, 2000))
>     val ssc = new StreamingContext(master, "TwitterWordCount", 
> Seconds(5), System.getenv("SPARK_HOME"), 
> StreamingContext.jarOfClass(this.getClass))
>     val stream = TwitterUtils.createStream(ssc, None, filters, 
> StorageLevel.MEMORY_ONLY_SER)
>
>     val words = stream.flatMap(status ? 
> status.getText.toLowerCase.split(" ")).map(word ? (word, 1l))
>
>     val cntWords = words.reduceByKey(_ + _)
>
>     cntWords.foreach(rdd ?
>       pool.withJedisClient { client ?
>         val pipeline = client.pipelined()
>         rdd.foreach {
>           case (word, count) ?
>             pipeline.incrBy(word, count)
>         }
>         pipeline.sync()
>       }
>     )
>
>     ssc.start()
>   }
> }
>
> Everytime I run this program, I get this error:
>
> [error] 14/02/02 15:16:20 ERROR scheduler.JobScheduler: Error running 
> job streaming job 1391354180000 ms.0
> [error] org.apache.spark.SparkException: Job aborted: Task not 
> serializable: java.io.NotSerializableException: 
> redis.clients.jedis.Pipeline
> [error] at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
> [error] at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
> [error] at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> [error] at 
> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> [error] at org.apache.spark.scheduler.DAGScheduler.org 
> <http://org.apache.spark.scheduler.DAGScheduler.org>$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
> [error] at org.apache.spark.scheduler.DAGScheduler.org 
> <http://org.apache.spark.scheduler.DAGScheduler.org>$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:794)
> [error] at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16$$anonfun$apply$2.apply$mcVI$sp(DAGScheduler.scala:915)
> [error] at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16$$anonfun$apply$2.apply(DAGScheduler.scala:912)
> [error] at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16$$anonfun$apply$2.apply(DAGScheduler.scala:912)
> [error] at scala.Option.foreach(Option.scala:236)
> [error] at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16.apply(DAGScheduler.scala:912)
> [error] at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16.apply(DAGScheduler.scala:911)
> [error] at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> [error] at 
> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> [error] at 
> org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:911)
> [error] at 
> org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:616)
> [error] at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
> [error] at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
> [error] at akka.actor.ActorCell.invoke(ActorCell.scala:456)
> [error] at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
> [error] at akka.dispatch.Mailbox.run(Mailbox.scala:219)
> [error] at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
> [error] at 
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> [error] at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> [error] at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> [error] at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> I have tried to not use redis pipelines and then I get the same error 
> but related to the Jedis client.
>
> Have anybody done something similar?
>
> Kind regards,
>
> Luis
>
> P.S. I have attached my build.sbt and the scala source code to this file.
>
>