Posted to user@spark.apache.org by Patrick McGloin <mc...@gmail.com> on 2016/01/21 12:32:27 UTC
Spark Streaming Write Ahead Log (WAL) not replaying data after restart
Hi all,
To test the Spark Streaming Write Ahead Log (WAL) in a simple way, I
created a very simple custom input receiver that generates strings and
stores them:
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class InMemoryStringReceiver extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER) {
  val batchID = System.currentTimeMillis()
  def onStart() {
    new Thread("InMemoryStringReceiver") {
      override def run(): Unit = {
        var i = 0
        while(true) {
          // http://spark.apache.org/docs/latest/streaming-custom-receivers.html
          // To implement a reliable receiver, you have to use store(multiple-records) to store data.
          store(ArrayBuffer(s"$batchID-$i"))
          println(s"Stored => [$batchID-$i)]")
          Thread.sleep(1000L)
          i = i + 1
        }
      }
    }.start()
  }
  def onStop() {}
}
I then created a simple Application which will use the Custom Receiver to
stream the data and process it:
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.ReceiverInputDStream

object DStreamResilienceTest extends App {
  val conf = new SparkConf()
    .setMaster("local[*]")
    .setAppName("DStreamResilienceTest")
    .set("spark.streaming.receiver.writeAheadLog.enable", "true")
  val ssc = new StreamingContext(conf, Seconds(1))
  ssc.checkpoint("hdfs://myhdfsserver/user/spark/checkpoint/DStreamResilienceTest")
  val customReceiverStream: ReceiverInputDStream[String] = ssc.receiverStream(new InMemoryStringReceiver())
  customReceiverStream.foreachRDD { (rdd: RDD[String]) =>
    println(s"processed => [${rdd.collect().toList}]")
    Thread.sleep(2000L)
  }
  ssc.start()
  ssc.awaitTermination()
}
As you can see, the processing of each received RDD has a sleep of 2 seconds
while the strings are stored every second. This creates a backlog: the new
strings pile up and should be stored in the WAL. Indeed, I can see the
files in the checkpoint dirs getting updated. Running the app I get output
like this:
[info] Stored => [1453374654941-0)]
[info] processed => [List(1453374654941-0)]
[info] Stored => [1453374654941-1)]
[info] Stored => [1453374654941-2)]
[info] processed => [List(1453374654941-1)]
[info] Stored => [1453374654941-3)]
[info] Stored => [1453374654941-4)]
[info] processed => [List(1453374654941-2)]
[info] Stored => [1453374654941-5)]
[info] Stored => [1453374654941-6)]
[info] processed => [List(1453374654941-3)]
[info] Stored => [1453374654941-7)]
[info] Stored => [1453374654941-8)]
[info] processed => [List(1453374654941-4)]
[info] Stored => [1453374654941-9)]
[info] Stored => [1453374654941-10)]
As you would expect, the storing is outpacing the processing. So I kill
the application and restart it. This time I commented out the sleep in the
foreachRDD so that the processing can clear any backlog:
[info] Stored => [1453374753946-0)]
[info] processed => [List(1453374753946-0)]
[info] Stored => [1453374753946-1)]
[info] processed => [List(1453374753946-1)]
[info] Stored => [1453374753946-2)]
[info] processed => [List(1453374753946-2)]
[info] Stored => [1453374753946-3)]
[info] processed => [List(1453374753946-3)]
[info] Stored => [1453374753946-4)]
[info] processed => [List(1453374753946-4)]
As you can see, the new events are processed but none from the previous
batch. The old WAL logs are cleared and I see log messages like this, but
the old data does not get processed.
INFO WriteAheadLogManager : Recovered 1 write ahead log files from hdfs://myhdfsserver/user/spark/checkpoint/DStreamResilienceTest/receivedData/0
What am I doing wrong? I am using Spark 1.5.2.
Best regards,
Patrick
Re: Spark Streaming Write Ahead Log (WAL) not replaying data after restart
Posted by Patrick McGloin <mc...@gmail.com>.
Thank you Shixiong, that is what I was missing.
On 26 January 2016 at 00:27, Shixiong(Ryan) Zhu <sh...@databricks.com>
wrote:
> You need to define a create function and use StreamingContext.getOrCreate.
> See the example here:
> http://spark.apache.org/docs/latest/streaming-programming-guide.html#how-to-configure-checkpointing
Re: Spark Streaming Write Ahead Log (WAL) not replaying data after restart
Posted by "Shixiong(Ryan) Zhu" <sh...@databricks.com>.
You need to define a create function and use StreamingContext.getOrCreate.
See the example here:
http://spark.apache.org/docs/latest/streaming-programming-guide.html#how-to-configure-checkpointing
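For readers following this thread, here is a minimal sketch of what that suggestion might look like when applied to the application from the original post. It is not the poster's final code. The names createContext and checkpointDir are made up for illustration, a plain main method is used instead of extends App, and the receiver loop checks isStopped() rather than looping forever. The idea is that the whole DStream setup moves into a function that StreamingContext.getOrCreate only calls when no checkpoint exists; otherwise the context is rebuilt from the checkpoint directory.

```scala
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.receiver.Receiver

// Receiver from the original post, repeated so the sketch is self-contained.
// Assumption: the loop stops once Spark has stopped the receiver.
class InMemoryStringReceiver extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER) {
  val batchID = System.currentTimeMillis()

  def onStart(): Unit = {
    new Thread("InMemoryStringReceiver") {
      override def run(): Unit = {
        var i = 0
        while (!isStopped()) {
          // store(multiple-records) is the variant used in the original post
          store(ArrayBuffer(s"$batchID-$i"))
          println(s"Stored => [$batchID-$i)]")
          Thread.sleep(1000L)
          i += 1
        }
      }
    }.start()
  }

  def onStop(): Unit = {}
}

object DStreamResilienceTest {

  // Hypothetical name; the path is the one from the original post.
  val checkpointDir = "hdfs://myhdfsserver/user/spark/checkpoint/DStreamResilienceTest"

  // Hypothetical helper: builds a brand-new context and the full DStream graph.
  // getOrCreate calls it only when no usable checkpoint is found.
  def createContext(): StreamingContext = {
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("DStreamResilienceTest")
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")
    val ssc = new StreamingContext(conf, Seconds(1))
    ssc.checkpoint(checkpointDir)
    ssc.receiverStream(new InMemoryStringReceiver()).foreachRDD { rdd =>
      println(s"processed => [${rdd.collect().toList}]")
    }
    ssc
  }

  def main(args: Array[String]): Unit = {
    // Recover from the checkpoint if one exists, otherwise create a fresh context.
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}
```

One caveat, as far as I understand the checkpointing section of the programming guide: the checkpoint contains the serialized DStream graph, so restarting with modified application code (for example, with the sleep commented out) may fail to deserialize or may not pick up the change. For a test like the one described, it is probably safer to restart the same build.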