Posted to user@spark.apache.org by kpeng1 <kp...@gmail.com> on 2014/09/03 23:05:06 UTC

Spark Streaming into HBase

I have been trying to understand how Spark Streaming and HBase connect, but
have not been successful. Given a Spark stream, I want to process it and
store the results in an HBase table. So far this is what I have:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.storage.StorageLevel
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{HBaseAdmin,HTable,Put,Get}
import org.apache.hadoop.hbase.util.Bytes

def blah(row: Array[String]) {
  val hConf = new HBaseConfiguration()
  val hTable = new HTable(hConf, "table")
  val thePut = new Put(Bytes.toBytes(row(0)))
  thePut.add(Bytes.toBytes("cf"), Bytes.toBytes(row(0)),
    Bytes.toBytes(row(0)))
  hTable.put(thePut)
}

val ssc = new StreamingContext(sc, Seconds(1))
val lines = ssc.socketTextStream("localhost", 9999,
  StorageLevel.MEMORY_AND_DISK_SER)
val words = lines.map(_.split(","))
val store = words.foreachRDD(rdd => rdd.foreach(blah))
ssc.start()

I am currently running the above code in spark-shell. I am not sure what I
am doing wrong.



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-into-HBase-tp13378.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: Spark Streaming into HBase

Posted by kpeng1 <kp...@gmail.com>.
Sean,

I create a streaming context near the bottom of the code (ssc) and apply
foreachRDD on the resulting DStream to get access to the underlying RDD. On
that RDD I in turn call foreach, passing in my function, which applies the
storing logic.

Is there a different approach I should be using?

Thanks for the help.


On Wed, Sep 3, 2014 at 2:43 PM, Sean Owen-2 [via Apache Spark User List] <
ml-node+s1001560n13385h70@n3.nabble.com> wrote:

> This doesn't seem to have to do with HBase per se. Some function is
> getting the StreamingContext into its closure, and that won't work. Is
> this exactly the code? It doesn't reference a StreamingContext inside a
> function, but is there maybe a different version in reality that tries
> to use StreamingContext inside a function?
>
> On Wed, Sep 3, 2014 at 10:36 PM, Ted Yu wrote:
>
> > Adding back user@
> >
> > I am not familiar with the NotSerializableException. Can you show the
> > full stack trace ?
> >
> > See SPARK-1297 for changes you need to make so that Spark works with
> > hbase 0.98
> >
> > Cheers
> >
> >
> > On Wed, Sep 3, 2014 at 2:33 PM, Kevin Peng wrote:
> >>
> >> Ted,
> >>
> >> The hbase-site.xml is in the classpath (had worse issues before...
> >> until I figured that it wasn't in the path).
> >>
> >> I get the following error in the spark-shell:
> >> org.apache.spark.SparkException: Job aborted due to stage failure: Task
> >> not serializable: java.io.NotSerializableException:
> >> org.apache.spark.streaming.StreamingContext
> >>         at
> >> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.sc
> >> ...
> >>
> >> I also double checked the hbase table, just in case, and nothing new is
> >> written in there.
> >>
> >> I am using hbase version: 0.98.1-cdh5.1.0 the default one with the
> >> CDH5.1.0 distro.
> >>
> >> Thank you for the help.
> >>
> >>
> >> On Wed, Sep 3, 2014 at 2:09 PM, Ted Yu wrote:
> >>>
> >>> Is hbase-site.xml in the classpath ?
> >>> Do you observe any exception from the code below or in region server
> >>> log ?
> >>>
> >>> Which hbase release are you using ?

Re: Spark Streaming into HBase

Posted by Tathagata Das <ta...@gmail.com>.
Hmm, even I don't understand why TheMain needs to be serializable. It might
be cleaner (as in, it avoids such mysterious closure issues) to create a
separate sbt/Maven project (instead of using the shell) and run the
streaming application from there.
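
For reference, a standalone application along these lines might look like
the sketch below. It is only a minimal outline: the object name
StreamingToHBaseApp, the helper buildConf, and the println placeholder are
made up for illustration, the host and port are taken from the code earlier
in the thread, and the master is assumed to be supplied by spark-submit.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Hypothetical entry point for an sbt/Maven project (not from the thread).
object StreamingToHBaseApp {

  // The app name is an arbitrary choice; the master comes from spark-submit.
  def buildConf(): SparkConf = new SparkConf().setAppName("StreamingToHBaseApp")

  def main(args: Array[String]): Unit = {
    // In a compiled application the StreamingContext is a local value of
    // main, not a member of a shell session.
    val ssc = new StreamingContext(buildConf(), Seconds(1))
    val lines = ssc.socketTextStream("localhost", 9999,
      StorageLevel.MEMORY_AND_DISK_SER)
    val words = lines.map(_.split(","))

    // Placeholder sink: prints each row on the executor.
    // Replace this with the actual HBase write.
    words.foreachRDD(rdd => rdd.foreach(row => println(row.mkString(","))))

    ssc.start()
    ssc.awaitTermination() // block until the streaming job is stopped
  }
}
```

Such an application would typically be packaged as a jar and launched with
spark-submit, with the HBase client jars and hbase-site.xml on the classpath.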

TD



Re: Spark Streaming into HBase

Posted by kpeng1 <kp...@gmail.com>.
Tathagata,

Thanks for all the help. It looks like the blah method doesn't need to be
wrapped in a serializable object, but the main streaming calls do. I am
currently running everything from spark-shell, so I did not have a main
function and object to wrap the streaming, map, and foreach calls. After
wrapping those calls in an object and making that object Serializable,
everything seems to be working.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.storage.StorageLevel
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{HBaseAdmin,HTable,Put,Get}
import org.apache.hadoop.hbase.util.Bytes

object Blaher {
  def blah(row: Array[String]) {
    val hConf = new HBaseConfiguration()
    val hTable = new HTable(hConf, "table")
    val thePut = new Put(Bytes.toBytes(row(0)))
    thePut.add(Bytes.toBytes("cf"), Bytes.toBytes(row(0)),
      Bytes.toBytes(row(0)))
    hTable.put(thePut)
  }
}

object TheMain extends Serializable {
  def run() {
    val ssc = new StreamingContext(sc, Seconds(1))
    val lines = ssc.socketTextStream("localhost", 9977,
      StorageLevel.MEMORY_AND_DISK_SER)
    val words = lines.map(_.split(","))
    val store = words.foreachRDD(rdd => rdd.foreach(Blaher.blah))
    ssc.start()
  }
}

TheMain.run()

Though I don't really understand why the TheMain object needs to be
Serializable when the Blaher object doesn't.





Re: Spark Streaming into HBase

Posted by Tathagata Das <ta...@gmail.com>.
This is an issue with how Scala computes closures. Here, because of the
function blah, it is trying to serialize the whole function that this code
is part of. Can you define the function blah outside the main function? In
fact, you can try putting the function in a serializable object.

object BlahFunction extends Serializable {
  def blah(row: Array[String]) { .... }
}
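
To illustrate the capture problem without Spark, here is a small plain-Scala
sketch. FakeContext stands in for StreamingContext, and Session stands in
for whatever enclosing scope ends up holding both the function and the
context; both names are invented for this example, and the way spark-shell
actually wraps its lines may differ in detail.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Stand-in for StreamingContext: deliberately NOT Serializable.
class FakeContext

// Hypothetical enclosing scope that owns both the context and the function.
class Session extends Serializable {
  val ctx = new FakeContext                  // non-serializable field
  def blah(row: Array[String]): Unit = ()    // instance method
  // Calling an instance method makes the function value hold on to `this`.
  def task: Array[String] => Unit = (row: Array[String]) => blah(row)
}

// A method on a standalone object needs no enclosing instance.
object Blaher {
  def blah(row: Array[String]): Unit = ()
}

object ClosureDemo {
  // Refers to Blaher only through its static module, so nothing is captured.
  val standaloneTask: Array[String] => Unit =
    (row: Array[String]) => Blaher.blah(row)

  // True if plain Java serialization of `f` succeeds.
  def serializes(f: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream).writeObject(f)
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    // Fails: the function drags in Session, and with it FakeContext.
    println(serializes(new Session().task))
    // Succeeds: no outer instance travels with the function.
    println(serializes(standaloneTask))
  }
}
```

The first function cannot be serialized because it carries the Session
instance, and through it the non-serializable context, even though blah
itself never touches the context.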

On a related note, opening a connection for every record in the RDD is
pretty inefficient. Use rdd.foreachPartition instead: open the connection,
write the whole partition, and then close the connection.
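
A minimal sketch of that per-partition pattern, assuming the HBase 0.98
client API used elsewhere in this thread (HTable, Put.add) and the same
"table" and "cf" names. The object HBaseSink and its methods toPut,
writePartition, and save are hypothetical names chosen for illustration.
Newer HBase clients replace these calls with ConnectionFactory, Table, and
Put.addColumn.

```scala
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{HTable, Put}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.streaming.dstream.DStream

// Hypothetical helper object (not from the thread).
object HBaseSink extends Serializable {

  // Builds the same Put as the original blah: row(0) is used as the row
  // key, the qualifier, and the value.
  def toPut(row: Array[String]): Put = {
    val put = new Put(Bytes.toBytes(row(0)))
    put.add(Bytes.toBytes("cf"), Bytes.toBytes(row(0)), Bytes.toBytes(row(0)))
    put
  }

  // Opens one HTable per partition instead of one per record.
  def writePartition(rows: Iterator[Array[String]]): Unit = {
    val hConf = HBaseConfiguration.create() // reads hbase-site.xml from the classpath
    val hTable = new HTable(hConf, "table")
    try {
      rows.foreach(row => hTable.put(toPut(row)))
    } finally {
      hTable.close() // release the table even if a put fails
    }
  }

  // Attaches the per-partition writer to a stream of split lines.
  def save(words: DStream[Array[String]]): Unit =
    words.foreachRDD(rdd => rdd.foreachPartition(writePartition))
}
```

With this in place, the foreachRDD call in the earlier code would become
HBaseSink.save(words). The table is created inside writePartition, so it is
constructed on the executor and never has to be serialized.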

TD


On Wed, Sep 3, 2014 at 4:24 PM, Kevin Peng <kp...@gmail.com> wrote:

> Ted,
>
> Here is the full stack trace coming from spark-shell:
>
> 14/09/03 16:21:03 ERROR scheduler.JobScheduler: Error running job
> streaming job 1409786463000 ms.0
> org.apache.spark.SparkException: Job aborted due to stage failure: Task
> not serializable: java.io.NotSerializableException:
> org.apache.spark.streaming.StreamingContext
>   at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
>   at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>   at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
>   at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:770)
>   at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:713)
>   at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:697)
>   at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1176)
>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>   at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>   at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>   at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
>
> Basically, on the terminal where I run nc -lk, I type in words separated
> by commas and hit enter, e.g. "bill,ted".
>
>

Re: Spark Streaming into HBase

Posted by Kevin Peng <kp...@gmail.com>.
Ted,

Here is the full stack trace coming from spark-shell:

14/09/03 16:21:03 ERROR scheduler.JobScheduler: Error running job streaming job 1409786463000 ms.0
org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:770)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:713)
        at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:697)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1176)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
        at akka.actor.ActorCell.invoke(ActorCell.scala:456)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
        at akka.dispatch.Mailbox.run(Mailbox.scala:219)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


Basically, on the terminal where I run nc -lk, I type in words separated by
commas and hit enter, e.g. "bill,ted".
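
For reference, this is roughly what happens to such a line in the split
step of the posted code. It is a plain Scala sketch with no Spark or HBase
involved; ParseLine and parse are names made up here for illustration.

```scala
object ParseLine {
  // Same transformation as `lines.map(_.split(","))` in the posted code,
  // applied to a single line of input. `parse` is a hypothetical helper.
  def parse(line: String): Array[String] = line.split(",")

  def main(args: Array[String]): Unit = {
    val row = parse("bill,ted")
    // The posted blah(row) only ever reads row(0): it is used as the row key,
    // the column qualifier and the cell value.
    println(row.mkString("[", ", ", "]")) // prints [bill, ted]
    println(row(0))                       // prints bill
  }
}
```

So for "bill,ted" only "bill" would end up in the table; "ted" is split off
but never used.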


On Wed, Sep 3, 2014 at 2:36 PM, Ted Yu <yu...@gmail.com> wrote:

> Adding back user@
>
> I am not familiar with the NotSerializableException. Can you show the
> full stack trace ?
>
> See SPARK-1297 for changes you need to make so that Spark works with
> hbase 0.98
>
> Cheers
>
>
> On Wed, Sep 3, 2014 at 2:33 PM, Kevin Peng <kp...@gmail.com> wrote:
>
>> Ted,
>>
>> The hbase-site.xml is in the classpath (had worse issues before... until
>> I figured that it wasn't in the path).
>>
>> I get the following error in the spark-shell:
>> org.apache.spark.SparkException: Job aborted due to stage failure: Task
>> not serializable: java.io.NotSerializableException:
>> org.apache.spark.streaming.StreamingContext
>>         at org.apache.spark.scheduler.DAGScheduler.org
>> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.sc
>> ...
>>
>> I also double checked the hbase table, just in case, and nothing new is
>> written in there.
>>
>> I am using hbase version: 0.98.1-cdh5.1.0 the default one with the
>> CDH5.1.0 distro.
>>
>> Thank you for the help.
>>
>>
>> On Wed, Sep 3, 2014 at 2:09 PM, Ted Yu <yu...@gmail.com> wrote:
>>
>>> Is hbase-site.xml in the classpath ?
>>> Do you observe any exception from the code below or in region server log
>>> ?
>>>
>>> Which hbase release are you using ?
>>>
>>>
>>> On Wed, Sep 3, 2014 at 2:05 PM, kpeng1 <kp...@gmail.com> wrote:
>>>
>>>> I have been trying to understand how spark streaming and hbase connect,
>>>> but
>>>> have not been successful. What I am trying to do is given a spark
>>>> stream,
>>>> process that stream and store the results in an hbase table. So far
>>>> this is
>>>> what I have:
>>>>
>>>> import org.apache.spark.SparkConf
>>>> import org.apache.spark.streaming.{Seconds, StreamingContext}
>>>> import org.apache.spark.streaming.StreamingContext._
>>>> import org.apache.spark.storage.StorageLevel
>>>> import org.apache.hadoop.hbase.HBaseConfiguration
>>>> import org.apache.hadoop.hbase.client.{HBaseAdmin,HTable,Put,Get}
>>>> import org.apache.hadoop.hbase.util.Bytes
>>>>
>>>> def blah(row: Array[String]) {
>>>>   val hConf = new HBaseConfiguration()
>>>>   val hTable = new HTable(hConf, "table")
>>>>   val thePut = new Put(Bytes.toBytes(row(0)))
>>>>   thePut.add(Bytes.toBytes("cf"), Bytes.toBytes(row(0)),
>>>> Bytes.toBytes(row(0)))
>>>>   hTable.put(thePut)
>>>> }
>>>>
>>>> val ssc = new StreamingContext(sc, Seconds(1))
>>>> val lines = ssc.socketTextStream("localhost", 9999,
>>>> StorageLevel.MEMORY_AND_DISK_SER)
>>>> val words = lines.map(_.split(","))
>>>> val store = words.foreachRDD(rdd => rdd.foreach(blah))
>>>> ssc.start()
>>>>
>>>> I am currently running the above code in spark-shell. I am not sure
>>>> what I
>>>> am doing wrong.
>>>>
>>>>
>>>>
>>>> --
>>>> View this message in context:
>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-into-HBase-tp13378.html
>>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>>
>>>> ---------------------------------------------------------------------
>>>> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>>>> For additional commands, e-mail: user-help@spark.apache.org
>>>>
>>>>
>>>
>>
>

Re: Spark Streaming into HBase

Posted by Sean Owen <so...@cloudera.com>.
This doesn't seem to have to do with HBase per se. Some function is
getting the StreamingContext into the closure, and that won't work. Is
this exactly the code? I ask because it doesn't reference a
StreamingContext inside a function; is there maybe a different version in
reality that tries to use the StreamingContext inside one?
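
To illustrate what "getting the StreamingContext into the closure" can look
like, here is a minimal sketch in plain Scala that needs no Spark or HBase.
Everything in it is made up for illustration: FakeContext stands in for
StreamingContext, ShellLine imitates (as an assumption, not something
confirmed in this thread) a shell session that keeps a val and a def on the
same wrapper object, and serializationError is a hypothetical helper that
attempts the kind of Java serialization a task closure goes through.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Stand-in for StreamingContext: deliberately NOT Serializable.
class FakeContext { val name = "ctx" }

// Assumption: imitates a wrapper that holds both the context and the function.
class ShellLine extends Serializable {
  val ctx = new FakeContext
  def blah(row: Array[String]): Unit = ()
  // blah is a method of this wrapper, so the closure captures `this`,
  // and with it the non-serializable ctx field.
  def capturing: Array[String] => Unit = row => blah(row)
}

// Same wrapper, but the context field is excluded from serialization.
class TransientLine extends Serializable {
  @transient val ctx = new FakeContext
  def blah(row: Array[String]): Unit = ()
  def capturing: Array[String] => Unit = row => blah(row)
}

// A function that lives in its own object, away from any context.
object Standalone {
  def blah(row: Array[String]): Unit = ()
}

object ClosureCheck {
  // Returns the exception message if the closure cannot be serialized.
  def serializationError(closure: AnyRef): Option[String] = {
    val oos = new ObjectOutputStream(new ByteArrayOutputStream())
    try { oos.writeObject(closure); None }
    catch { case e: NotSerializableException => Some(e.getMessage) }
    finally oos.close()
  }

  // Captures nothing: Standalone is reached statically.
  def standalone: Array[String] => Unit = row => Standalone.blah(row)

  def main(args: Array[String]): Unit = {
    println(serializationError(new ShellLine().capturing))     // fails on FakeContext
    println(serializationError(new TransientLine().capturing)) // succeeds
    println(serializationError(standalone))                    // succeeds
  }
}
```

If that assumption matches the spark-shell session here, moving the function
away from the context or marking the context @transient would be two things
to try; neither is verified in this thread.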




Re: Spark Streaming into HBase

Posted by Ted Yu <yu...@gmail.com>.
Adding back user@

I am not familiar with the NotSerializableException. Can you show the full
stack trace?

See SPARK-1297 for the changes you need to make so that Spark works with
HBase 0.98.

Cheers


On Wed, Sep 3, 2014 at 2:33 PM, Kevin Peng <kp...@gmail.com> wrote:

> Ted,
>
> The hbase-site.xml is in the classpath (had worse issues before... until I
> figured that it wasn't in the path).
>
> I get the following error in the spark-shell:
> org.apache.spark.SparkException: Job aborted due to stage failure: Task
> not serializable: java.io.NotSerializableException:
> org.apache.spark.streaming.StreamingContext
>         at org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.sc
> ...
>
> I also double checked the hbase table, just in case, and nothing new is
> written in there.
>
> I am using hbase version: 0.98.1-cdh5.1.0 the default one with the
> CDH5.1.0 distro.
>
> Thank you for the help.
>
>
> On Wed, Sep 3, 2014 at 2:09 PM, Ted Yu <yu...@gmail.com> wrote:
>
>> Is hbase-site.xml in the classpath ?
>> Do you observe any exception from the code below or in region server log ?
>>
>> Which hbase release are you using ?
>>
>>
>> On Wed, Sep 3, 2014 at 2:05 PM, kpeng1 <kp...@gmail.com> wrote:
>>
>>> I have been trying to understand how spark streaming and hbase connect,
>>> but
>>> have not been successful. What I am trying to do is given a spark stream,
>>> process that stream and store the results in an hbase table. So far this
>>> is
>>> what I have:
>>>
>>> import org.apache.spark.SparkConf
>>> import org.apache.spark.streaming.{Seconds, StreamingContext}
>>> import org.apache.spark.streaming.StreamingContext._
>>> import org.apache.spark.storage.StorageLevel
>>> import org.apache.hadoop.hbase.HBaseConfiguration
>>> import org.apache.hadoop.hbase.client.{HBaseAdmin,HTable,Put,Get}
>>> import org.apache.hadoop.hbase.util.Bytes
>>>
>>> def blah(row: Array[String]) {
>>>   val hConf = new HBaseConfiguration()
>>>   val hTable = new HTable(hConf, "table")
>>>   val thePut = new Put(Bytes.toBytes(row(0)))
>>>   thePut.add(Bytes.toBytes("cf"), Bytes.toBytes(row(0)),
>>> Bytes.toBytes(row(0)))
>>>   hTable.put(thePut)
>>> }
>>>
>>> val ssc = new StreamingContext(sc, Seconds(1))
>>> val lines = ssc.socketTextStream("localhost", 9999,
>>> StorageLevel.MEMORY_AND_DISK_SER)
>>> val words = lines.map(_.split(","))
>>> val store = words.foreachRDD(rdd => rdd.foreach(blah))
>>> ssc.start()
>>>
>>> I am currently running the above code in spark-shell. I am not sure what
>>> I
>>> am doing wrong.
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-into-HBase-tp13378.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> ---------------------------------------------------------------------
>>> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>>> For additional commands, e-mail: user-help@spark.apache.org
>>>
>>>
>>
>