Posted to dev@spark.apache.org by Dibyendu Bhattacharya <di...@gmail.com> on 2015/09/26 08:49:25 UTC

Re: Spark Streaming with Tachyon : Data Loss on Receiver Failure due to WAL error

Hi,

Recently I was working on a PR to use Tachyon as OFF_HEAP store for Spark
Streaming and make sure Spark Streaming can recover from Driver failure and
recover the blocks form Tachyon.

The motivation for this PR is:

If a Streaming application stores its blocks OFF_HEAP, it may not need any
WAL-like feature to recover from Driver failure. As long as the writing of
blocks to Tachyon from the Streaming receiver is durable, the blocks should be
recoverable directly from Tachyon on Driver failure.
This can avoid the expensive WAL write and the duplication of blocks in both
MEMORY and the WAL, while still guaranteeing an end-to-end no-data-loss
channel using the OFF_HEAP store.

https://github.com/apache/spark/pull/8817

This PR is still under review, but having done various failover tests in my
environment, I have seen it work fine without any data loss.
Let's see what TD and others have to say on this PR.

Below is the configuration I used to test this PR:


Spark : 1.6 from Master
Tachyon : 0.7.1

SparkConfiguration Details :

SparkConf conf = new SparkConf().setAppName("TestTachyon")
        .set("spark.streaming.unpersist", "true")
        .set("spark.local.dir", "/mnt1/spark/tincan")
        .set("tachyon.zookeeper.address", "10.252.5.113:2182")
        .set("tachyon.usezookeeper", "true")
        .set("spark.externalBlockStore.url", "tachyon-ft://ip-10-252-5-113.asskickery.us:19998")
        .set("spark.externalBlockStore.baseDir", "/sparkstreaming")
        .set("spark.externalBlockStore.folderName", "pearson")
        .set("spark.externalBlockStore.dirId", "subpub")
        .set("spark.externalBlockStore.keepExternalBlockStoreDirOnShutdown", "true");

JavaStreamingContext jsc = new JavaStreamingContext(conf, new Duration(10000));

String checkpointDirectory = "hdfs://10.252.5.113:9000/user/hadoop/spark/wal";

jsc.checkpoint(checkpointDirectory);

// I am using my Receiver-based consumer
// (https://github.com/dibbhatt/kafka-spark-consumer), but
// KafkaUtils.createStream will also work.

JavaDStream<MessageAndMetadata> unionStreams = ReceiverLauncher.launch(
        jsc, props, numberOfReceivers, StorageLevel.OFF_HEAP());
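
For the Driver-failure runs, the context is rebuilt from the checkpoint on
restart. Below is only a minimal sketch of that recovery wiring, assuming the
standard Spark 1.x JavaStreamingContextFactory / JavaStreamingContext.getOrCreate
API (from org.apache.spark.streaming.api.java); the factory body is
illustrative, not the exact code I ran:

JavaStreamingContextFactory contextFactory = new JavaStreamingContextFactory() {
    @Override
    public JavaStreamingContext create() {
        // Build a fresh context and DStream graph on the very first start;
        // this reuses the conf and checkpointDirectory defined above.
        JavaStreamingContext newJsc = new JavaStreamingContext(conf, new Duration(10000));
        // ... set up ReceiverLauncher.launch(...), transformations and output operations here ...
        newJsc.checkpoint(checkpointDirectory);
        return newJsc;
    }
};

// After a Driver failure this restores the context (and the DStream graph)
// from the checkpoint metadata instead of calling create() again.
JavaStreamingContext recoveredJsc =
        JavaStreamingContext.getOrCreate(checkpointDirectory, contextFactory);
recoveredJsc.start();
recoveredJsc.awaitTermination();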




Regards,
Dibyendu

On Sat, Sep 26, 2015 at 11:59 AM, N B <nb...@gmail.com> wrote:

> Hi Dibyendu,
>
> How does one go about configuring spark streaming to use tachyon as its
> place for storing checkpoints? Also, can one do this with tachyon running
> on a completely different node than where spark processes are running?
>
> Thanks
> Nikunj
>
>
> On Thu, May 21, 2015 at 8:35 PM, Dibyendu Bhattacharya <
> dibyendu.bhattachary@gmail.com> wrote:
>
>> Hi Tathagata,
>>
>> Thanks for looking into this. Investigating further, I found that the
>> issue is that Tachyon does not support file append. The streaming receiver
>> that writes to the WAL, once it has failed and restarted, is not able to
>> append to the same WAL file after the restart.
>>
>> I raised this with the Tachyon user group, and Haoyuan said that Tachyon
>> file append will be ready within about three months. I will revisit this
>> issue then.
>>
>> Regards,
>> Dibyendu
>>
>>
>> On Fri, May 22, 2015 at 12:24 AM, Tathagata Das <td...@databricks.com>
>> wrote:
>>
>>> Looks like the file size reported by the FSInputDStream over Tachyon's
>>> FileSystem interface is somehow zero.
>>>
>>> On Mon, May 11, 2015 at 4:38 AM, Dibyendu Bhattacharya <
>>> dibyendu.bhattachary@gmail.com> wrote:
>>>
>>>> Just to follow up this thread further .
>>>>
>>>> I was doing some fault-tolerance testing of Spark Streaming with Tachyon
>>>> as the OFF_HEAP block store. As I said in an earlier email, I was able to
>>>> solve the BlockNotFound exception when I used Tachyon's Hierarchical
>>>> Storage, which is good.
>>>>
>>>> I continued testing with the Spark Streaming WAL and checkpoint files also
>>>> stored in Tachyon. Here are a few findings:
>>>>
>>>>
>>>> When I store the Spark Streaming checkpoint location in Tachyon, the
>>>> throughput is much higher. I tested the Driver and Receiver failure cases,
>>>> and Spark Streaming is able to recover without any data loss on Driver
>>>> failure.
>>>>
>>>> *But on Receiver failure, Spark Streaming loses data*, as I see an
>>>> exception while reading the WAL file from the Tachyon "receivedData"
>>>> location for the same Receiver id which just failed.
>>>>
>>>> If I change the checkpoint location back to HDFS, Spark Streaming can
>>>> recover from both Driver and Receiver failure.
>>>>
>>>> Here are the log details from when the Spark Streaming receiver failed. I
>>>> raised a JIRA for the same issue:
>>>> https://issues.apache.org/jira/browse/SPARK-7525
>>>>
>>>>
>>>>
>>>> INFO : org.apache.spark.scheduler.DAGScheduler - *Executor lost: 2
>>>> (epoch 1)*
>>>> INFO : org.apache.spark.storage.BlockManagerMasterEndpoint - Trying to
>>>> remove executor 2 from BlockManagerMaster.
>>>> INFO : org.apache.spark.storage.BlockManagerMasterEndpoint - Removing
>>>> block manager BlockManagerId(2, 10.252.5.54, 45789)
>>>> INFO : org.apache.spark.storage.BlockManagerMaster - Removed 2
>>>> successfully in removeExecutor
>>>> INFO : org.apache.spark.streaming.scheduler.ReceiverTracker - *Registered
>>>> receiver for stream 2 from 10.252.5.62*:47255
>>>> WARN : org.apache.spark.scheduler.TaskSetManager - Lost task 2.1 in
>>>> stage 103.0 (TID 421, 10.252.5.62): org.apache.spark.SparkException: *Could
>>>> not read data from write ahead log record
>>>> FileBasedWriteAheadLogSegment(tachyon-ft://10.252.5.113:19998/tachyon/checkpoint/receivedData/2/log-1431341091711-1431341151711,645603894,10891919
>>>> <http://10.252.5.113:19998/tachyon/checkpoint/receivedData/2/log-1431341091711-1431341151711,645603894,10891919>)*
>>>> at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org
>>>> $apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:144)
>>>> at
>>>> org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:168)
>>>> at
>>>> org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:168)
>>>> at scala.Option.getOrElse(Option.scala:120)
>>>> at
>>>> org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.compute(WriteAheadLogBackedBlockRDD.scala:168)
>>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>> at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
>>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
>>>> at org.apache.spark.scheduler.Task.run(Task.scala:70)
>>>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>>>> at
>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>> at
>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>> at java.lang.Thread.run(Thread.java:744)
>>>> Caused by: java.lang.IllegalArgumentException:* Seek position is past
>>>> EOF: 645603894, fileSize = 0*
>>>> at tachyon.hadoop.HdfsFileInputStream.seek(HdfsFileInputStream.java:239)
>>>> at
>>>> org.apache.hadoop.fs.FSDataInputStream.seek(FSDataInputStream.java:37)
>>>> at
>>>> org.apache.spark.streaming.util.FileBasedWriteAheadLogRandomReader.read(FileBasedWriteAheadLogRandomReader.scala:37)
>>>> at
>>>> org.apache.spark.streaming.util.FileBasedWriteAheadLog.read(FileBasedWriteAheadLog.scala:104)
>>>> at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org
>>>> $apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:141)
>>>> ... 15 more
>>>>
>>>> INFO : org.apache.spark.scheduler.TaskSetManager - Starting task 2.2 in
>>>> stage 103.0 (TID 422, 10.252.5.61, ANY, 1909 bytes)
>>>> INFO : org.apache.spark.scheduler.TaskSetManager - Lost task 2.2 in
>>>> stage 103.0 (TID 422) on executor 10.252.5.61:
>>>> org.apache.spark.SparkException (Could not read data from write ahead log
>>>> record FileBasedWriteAheadLogSegment(tachyon-ft://
>>>> 10.252.5.113:19998/tachyon/checkpoint/receivedData/2/log-1431341091711-1431341151711,645603894,10891919))
>>>> [duplicate 1]
>>>> INFO : org.apache.spark.scheduler.TaskSetManager - Starting task 2.3 in
>>>> stage 103.0 (TID 423, 10.252.5.62, ANY, 1909 bytes)
>>>> INFO : org.apache.spark.deploy.client.AppClient$ClientActor - Executor
>>>> updated: app-20150511104442-0048/2 is now LOST (worker lost)
>>>> INFO : org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend -
>>>> Executor app-20150511104442-0048/2 removed: worker lost
>>>> ERROR: org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend -
>>>> Asked to remove non-existent executor 2
>>>> INFO : org.apache.spark.scheduler.TaskSetManager - Lost task 2.3 in
>>>> stage 103.0 (TID 423) on executor 10.252.5.62:
>>>> org.apache.spark.SparkException (Could not read data from write ahead log
>>>> record FileBasedWriteAheadLogSegment(tachyon-ft://
>>>> 10.252.5.113:19998/tachyon/checkpoint/receivedData/2/log-1431341091711-1431341151711,645603894,10891919))
>>>> [duplicate 2]
>>>> ERROR: org.apache.spark.scheduler.TaskSetManager - Task 2 in stage
>>>> 103.0 failed 4 times; aborting job
>>>> INFO : org.apache.spark.scheduler.TaskSchedulerImpl - Removed TaskSet
>>>> 103.0, whose tasks have all completed, from pool
>>>> INFO : org.apache.spark.scheduler.TaskSchedulerImpl - Cancelling stage
>>>> 103
>>>> INFO : org.apache.spark.scheduler.DAGScheduler - ResultStage 103
>>>> (foreachRDD at Consumer.java:92) failed in 0.943 s
>>>> INFO : org.apache.spark.scheduler.DAGScheduler - Job 120 failed:
>>>> foreachRDD at Consumer.java:92, took 0.953482 s
>>>> ERROR: org.apache.spark.streaming.scheduler.JobScheduler - Error
>>>> running job streaming job 1431341145000 ms.0
>>>> org.apache.spark.SparkException: Job aborted due to stage failure: Task
>>>> 2 in stage 103.0 failed 4 times, most recent failure: Lost task 2.3 in
>>>> stage 103.0 (TID 423, 10.252.5.62): org.apache.spark.SparkException: Could
>>>> not read data from write ahead log record
>>>> FileBasedWriteAheadLogSegment(tachyon-ft://
>>>> 10.252.5.113:19998/tachyon/checkpoint/receivedData/2/log-1431341091711-1431341151711,645603894,10891919
>>>> )
>>>> at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org
>>>> $apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:144)
>>>> at
>>>> org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:168)
>>>> at
>>>> org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:168)
>>>> at scala.Option.getOrElse(Option.scala:120)
>>>> at
>>>> org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.compute(WriteAheadLogBackedBlockRDD.scala:168)
>>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>> at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
>>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
>>>> at org.apache.spark.scheduler.Task.run(Task.scala:70)
>>>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>>>> at
>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>> at
>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>> at java.lang.Thread.run(Thread.java:744)
>>>> Caused by: java.lang.IllegalArgumentException: Seek position is past
>>>> EOF: 645603894, fileSize = 0
>>>> at tachyon.hadoop.HdfsFileInputStream.seek(HdfsFileInputStream.java:239)
>>>> at
>>>> org.apache.hadoop.fs.FSDataInputStream.seek(FSDataInputStream.java:37)
>>>> at
>>>> org.apache.spark.streaming.util.FileBasedWriteAheadLogRandomReader.read(FileBasedWriteAheadLogRandomReader.scala:37)
>>>> at
>>>> org.apache.spark.streaming.util.FileBasedWriteAheadLog.read(FileBasedWriteAheadLog.scala:104)
>>>> at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org
>>>> $apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:141)
>>>> ... 15 more
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Fri, May 8, 2015 at 11:03 PM, Haoyuan Li <ha...@gmail.com>
>>>> wrote:
>>>>
>>>>> Thanks for the updates!
>>>>>
>>>>> Best,
>>>>>
>>>>> Haoyuan
>>>>>
>>>>> On Fri, May 8, 2015 at 8:40 AM, Dibyendu Bhattacharya <
>>>>> dibyendu.bhattachary@gmail.com> wrote:
>>>>>
>>>>>> Just a follow-up on this thread.
>>>>>>
>>>>>> I tried Hierarchical Storage on Tachyon
>>>>>> (http://tachyon-project.org/Hierarchy-Storage-on-Tachyon.html), and that
>>>>>> seems to have worked: I did not see any Spark job fail due to
>>>>>> BlockNotFoundException.
>>>>>> Below are my Hierarchical Storage settings:
>>>>>>
>>>>>>   -Dtachyon.worker.hierarchystore.level.max=2
>>>>>>   -Dtachyon.worker.hierarchystore.level0.alias=MEM
>>>>>>   -Dtachyon.worker.hierarchystore.level0.dirs.path=$TACHYON_RAM_FOLDER
>>>>>>
>>>>>>
>>>>>> -Dtachyon.worker.hierarchystore.level0.dirs.quota=$TACHYON_WORKER_MEMORY_SIZE
>>>>>>   -Dtachyon.worker.hierarchystore.level1.alias=HDD
>>>>>>   -Dtachyon.worker.hierarchystore.level1.dirs.path=/mnt/tachyon
>>>>>>   -Dtachyon.worker.hierarchystore.level1.dirs.quota=50GB
>>>>>>   -Dtachyon.worker.allocate.strategy=MAX_FREE
>>>>>>   -Dtachyon.worker.evict.strategy=LRU
>>>>>>
>>>>>> Regards,
>>>>>> Dibyendu
>>>>>>
>>>>>> On Thu, May 7, 2015 at 1:46 PM, Dibyendu Bhattacharya <
>>>>>> dibyendu.bhattachary@gmail.com> wrote:
>>>>>>
>>>>>> > Dear All ,
>>>>>> >
>>>>>> > I have been playing with Spark Streaming on Tachyon as the OFF_HEAP
>>>>>> > block store. The primary reason for evaluating Tachyon is to find out
>>>>>> > whether Tachyon can solve the Spark BlockNotFoundException.
>>>>>> >
>>>>>> > With the traditional MEMORY_ONLY StorageLevel, jobs fail with a
>>>>>> > block-not-found exception when blocks are evicted, and storing blocks in
>>>>>> > MEMORY_AND_DISK is not a good option either, as it impacts the
>>>>>> > throughput a lot.
>>>>>> >
>>>>>> >
>>>>>> > To test how Tachyon behaves, I took the latest Spark 1.4 from master,
>>>>>> > used Tachyon 0.6.4, and configured Tachyon in Fault Tolerant Mode.
>>>>>> > Tachyon is running in a 3-node AWS x-large cluster and Spark is running
>>>>>> > in a 3-node AWS x-large cluster.
>>>>>> >
>>>>>> > I have used the low-level Receiver-based Kafka consumer
>>>>>> > (https://github.com/dibbhatt/kafka-spark-consumer), which I have written,
>>>>>> > to pull from Kafka and write blocks to Tachyon.
>>>>>> >
>>>>>> >
>>>>>> > I found a similar improvement in throughput (as in the MEMORY_ONLY case)
>>>>>> > but very good overall memory utilization (as it is an off-heap store).
>>>>>> >
>>>>>> >
>>>>>> > But I found one issue on which I need clarification.
>>>>>> >
>>>>>> >
>>>>>> > In the Tachyon case I also see BlockNotFoundException, but for a
>>>>>> > different reason. What I see is that TachyonBlockManager.scala puts the
>>>>>> > blocks with the WriteType.TRY_CACHE configuration. Because of this,
>>>>>> > blocks are evicted from the Tachyon cache, and when Spark tries to find
>>>>>> > a block it throws BlockNotFoundException.
>>>>>> >
>>>>>> > I see a pull request which discusses the same:
>>>>>> >
>>>>>> > https://github.com/apache/spark/pull/158#discussion_r11195271
>>>>>> >
>>>>>> >
>>>>>> > When I modified the WriteType to CACHE_THROUGH, the BlockDropException
>>>>>> > was gone, but it again impacts the throughput.
>>>>>> >
>>>>>> >
>>>>>> > Just curious to know: does Tachyon have any setting which can handle the
>>>>>> > block eviction from cache to disk, other than explicitly setting
>>>>>> > CACHE_THROUGH?
>>>>>> >
>>>>>> > Regards,
>>>>>> > Dibyendu
>>>>>> >
>>>>>> >
>>>>>> >
>>>>>>
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Haoyuan Li
>>>>> CEO, Tachyon Nexus <http://www.tachyonnexus.com/>
>>>>> AMPLab, EECS, UC Berkeley http://www.cs.berkeley.edu/~haoyuan/
>>>>>
>>>>
>>>>
>>>
>>
>

Re: Spark Streaming with Tachyon : Data Loss on Receiver Failure due to WAL error

Posted by N B <nb...@gmail.com>.
I wanted to add that we are not configuring the WAL in our scenario.

Thanks again,
Nikunj


On Sat, Sep 26, 2015 at 11:35 AM, N B <nb...@gmail.com> wrote:

> Hi Dibyendu,
>
> Thanks. I believe I understand why it has been an issue using S3 for
> checkpoints based on your explanation. But does this limitation apply only
> if recovery is needed in case of driver failure?
>
> What if we are not interested in recovery after a driver failure. However,
> just for the purposes of running streaming pipelines that do
> reduceByKeyAndWindow() and updateStateByKey() operations it needs to have a
> checkpoint directory configured.
>
> Do you think this usage will also run into issues if an S3 location is
> provided for the checkpoint directory. We will not use it to do any
> explicit recovery like I stated above.
>
> Thanks
> Nikunj
>
>
>
> On Sat, Sep 26, 2015 at 3:13 AM, Dibyendu Bhattacharya <
> dibyendu.bhattachary@gmail.com> wrote:
>
>> In Spark Streaming , Checkpoint Directory is used for two purpose
>>
>> 1. Metadata checkpointing
>>
>> 2. Data checkpointing
>>
>> If you enable WAL to recover from Driver failure, Spark Streaming will
>> also write the Received Blocks in WAL which stored in checkpoint directory.
>>
>> For streaming solution to recover from any failure without any data loss
>> , you need to enable Meta Data Check pointing and WAL.  You do not need to
>> enable Data Check pointing.
>>
>> From my experiments and the PR I mentioned , I configured the Meta Data
>> Check Pointing in HDFS , and stored the Received Blocks OFF_HEAP.  And I
>> did not use any WAL . The PR I proposed would recover from Driver fail-over
>> without using any WAL like feature because Blocks are already available in
>> Tachyon. The Meta Data Checkpoint helps to recover the meta data about past
>> received blocks.
>>
>> Now the question is , can I configure Tachyon as my Metadata Checkpoint
>> location ? I tried that , and Streaming application writes the
>> receivedBlockMeataData to Tachyon, but on driver failure, it can not
>> recover the received block meta data from Tachyon. I sometime see Zero size
>> files in Tachyon checkpoint location , and it can not recover past events .
>> I need to understand what is the issue of storing meta data in Tachyon .
>> That needs a different JIRA I guess.
>>
>> Let me know I am able to explain the current scenario around Spark
>> Streaming and Tachyon .
>>
>> Regards,
>> Dibyendu
>>
>>
>>
>>
>> On Sat, Sep 26, 2015 at 1:04 PM, N B <nb...@gmail.com> wrote:
>>
>>> Hi Dibyendu,
>>>
>>> I am not sure I understand completely. But are you suggesting that
>>> currently there is no way to enable Checkpoint directory to be in Tachyon?
>>>
>>> Thanks
>>> Nikunj
>>>
>>>
>>> On Fri, Sep 25, 2015 at 11:49 PM, Dibyendu Bhattacharya <
>>> dibyendu.bhattachary@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> Recently I was working on a PR to use Tachyon as OFF_HEAP store for
>>>> Spark Streaming and make sure Spark Streaming can recover from Driver
>>>> failure and recover the blocks form Tachyon.
>>>>
>>>> The The Motivation for this PR is  :
>>>>
>>>> If Streaming application stores the blocks OFF_HEAP, it may not need
>>>> any WAL like feature to recover from Driver failure. As long as the writing
>>>> of blocks to Tachyon from Streaming receiver is durable, it should be
>>>> recoverable from Tachyon directly on Driver failure.
>>>> This can solve the issue of expensive WAL write and duplicating the
>>>> blocks both in MEMORY and also WAL and also guarantee end to end
>>>> No-Data-Loss channel using OFF_HEAP store.
>>>>
>>>> https://github.com/apache/spark/pull/8817
>>>>
>>>> This PR still under review . But having done various fail over testing
>>>> in my environment , I see this PR worked perfectly fine without any data
>>>> loss . Let see what TD and other have to say on this PR .
>>>>
>>>> Below is the configuration I used to test this PR ..
>>>>
>>>>
>>>> Spark : 1.6 from Master
>>>> Tachyon : 0.7.1
>>>>
>>>> SparkConfiguration Details :
>>>>
>>>> SparkConf conf = new SparkConf().setAppName("TestTachyon")
>>>> .set("spark.streaming.unpersist", "true")
>>>> .set("spark.local.dir", "/mnt1/spark/tincan")
>>>> .set("tachyon.zookeeper.address","10.252.5.113:2182")
>>>> .set("tachyon.usezookeeper","true")
>>>> .set("spark.externalBlockStore.url", "tachyon-ft://
>>>> ip-10-252-5-113.asskickery.us:19998")
>>>>         .set("spark.externalBlockStore.baseDir", "/sparkstreaming")
>>>>         .set("spark.externalBlockStore.folderName","pearson")
>>>>         .set("spark.externalBlockStore.dirId", "subpub")
>>>>
>>>> .set("spark.externalBlockStore.keepExternalBlockStoreDirOnShutdown","true");
>>>>
>>>> JavaStreamingContext jsc = new JavaStreamingContext(conf, new Duration(
>>>> 10000));
>>>>
>>>> String checkpointDirectory = "hdfs://
>>>> 10.252.5.113:9000/user/hadoop/spark/wal";
>>>>
>>>> jsc.checkpoint(checkpointDirectory);
>>>>
>>>>
>>>> //I am using the My Receiver Based Consumer (
>>>> https://github.com/dibbhatt/kafka-spark-consumer) . But
>>>> KafkaUtil.CreateStream will also work
>>>>
>>>> JavaDStream<MessageAndMetadata> unionStreams = ReceiverLauncher.launch(
>>>> jsc, props, numberOfReceivers, StorageLevel.OFF_HEAP());
>>>>
>>>>
>>>>
>>>>
>>>> Regards,
>>>> Dibyendu
>>>>
>>>> On Sat, Sep 26, 2015 at 11:59 AM, N B <nb...@gmail.com> wrote:
>>>>
>>>>> Hi Dibyendu,
>>>>>
>>>>> How does one go about configuring spark streaming to use tachyon as
>>>>> its place for storing checkpoints? Also, can one do this with tachyon
>>>>> running on a completely different node than where spark processes are
>>>>> running?
>>>>>
>>>>> Thanks
>>>>> Nikunj
>>>>>
>>>>>
>>>>> On Thu, May 21, 2015 at 8:35 PM, Dibyendu Bhattacharya <
>>>>> dibyendu.bhattachary@gmail.com> wrote:
>>>>>
>>>>>> Hi Tathagata,
>>>>>>
>>>>>> Thanks for looking into this. Further investigating I found that the
>>>>>> issue is with Tachyon does not support File Append. The streaming receiver
>>>>>> which writes to WAL when failed, and again restarted, not able to append to
>>>>>> same WAL file after restart.
>>>>>>
>>>>>> I raised this with Tachyon user group, and Haoyuan told that within 3
>>>>>> months time Tachyon file append will be ready. Will revisit this issue
>>>>>> again then .
>>>>>>
>>>>>> Regards,
>>>>>> Dibyendu
>>>>>>
>>>>>>
>>>>>> On Fri, May 22, 2015 at 12:24 AM, Tathagata Das <td...@databricks.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Looks like somehow the file size reported by the FSInputDStream of
>>>>>>> Tachyon's FileSystem interface, is returning zero.
>>>>>>>
>>>>>>> On Mon, May 11, 2015 at 4:38 AM, Dibyendu Bhattacharya <
>>>>>>> dibyendu.bhattachary@gmail.com> wrote:
>>>>>>>
>>>>>>>> Just to follow up this thread further .
>>>>>>>>
>>>>>>>> I was doing some fault tolerant testing of Spark Streaming with
>>>>>>>> Tachyon as OFF_HEAP block store. As I said in earlier email, I could able
>>>>>>>> to solve the BlockNotFound exception when I used Hierarchical Storage
>>>>>>>> of Tachyon ,  which is good.
>>>>>>>>
>>>>>>>> I continue doing some testing around storing the Spark Streaming
>>>>>>>> WAL and CheckPoint files also in Tachyon . Here is few finding ..
>>>>>>>>
>>>>>>>>
>>>>>>>> When I store the Spark Streaming Checkpoint location in Tachyon ,
>>>>>>>> the throughput is much higher . I tested the Driver and Receiver failure
>>>>>>>> cases , and Spark Streaming is able to recover without any Data Loss on
>>>>>>>> Driver failure.
>>>>>>>>
>>>>>>>> *But on Receiver failure , Spark Streaming looses data* as I see
>>>>>>>> Exception while reading the WAL file from Tachyon "receivedData" location
>>>>>>>>  for the same Receiver id which just failed.
>>>>>>>>
>>>>>>>> If I change the Checkpoint location back to HDFS , Spark Streaming
>>>>>>>> can recover from both Driver and Receiver failure .
>>>>>>>>
>>>>>>>> Here is the Log details when Spark Streaming receiver failed ...I
>>>>>>>> raised a JIRA for the same issue :
>>>>>>>> https://issues.apache.org/jira/browse/SPARK-7525
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> INFO : org.apache.spark.scheduler.DAGScheduler - *Executor lost: 2
>>>>>>>> (epoch 1)*
>>>>>>>> INFO : org.apache.spark.storage.BlockManagerMasterEndpoint - Trying
>>>>>>>> to remove executor 2 from BlockManagerMaster.
>>>>>>>> INFO : org.apache.spark.storage.BlockManagerMasterEndpoint -
>>>>>>>> Removing block manager BlockManagerId(2, 10.252.5.54, 45789)
>>>>>>>> INFO : org.apache.spark.storage.BlockManagerMaster - Removed 2
>>>>>>>> successfully in removeExecutor
>>>>>>>> INFO : org.apache.spark.streaming.scheduler.ReceiverTracker - *Registered
>>>>>>>> receiver for stream 2 from 10.252.5.62*:47255
>>>>>>>> WARN : org.apache.spark.scheduler.TaskSetManager - Lost task 2.1 in
>>>>>>>> stage 103.0 (TID 421, 10.252.5.62): org.apache.spark.SparkException: *Could
>>>>>>>> not read data from write ahead log record
>>>>>>>> FileBasedWriteAheadLogSegment(tachyon-ft://10.252.5.113:19998/tachyon/checkpoint/receivedData/2/log-1431341091711-1431341151711,645603894,10891919
>>>>>>>> <http://10.252.5.113:19998/tachyon/checkpoint/receivedData/2/log-1431341091711-1431341151711,645603894,10891919>)*
>>>>>>>> at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org
>>>>>>>> $apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:144)
>>>>>>>> at
>>>>>>>> org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:168)
>>>>>>>> at
>>>>>>>> org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:168)
>>>>>>>> at scala.Option.getOrElse(Option.scala:120)
>>>>>>>> at
>>>>>>>> org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.compute(WriteAheadLogBackedBlockRDD.scala:168)
>>>>>>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>>>>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>>>>>> at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
>>>>>>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>>>>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>>>>>> at
>>>>>>>> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
>>>>>>>> at org.apache.spark.scheduler.Task.run(Task.scala:70)
>>>>>>>> at
>>>>>>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>>>>>>>> at
>>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>>>>> at
>>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>>>>> at java.lang.Thread.run(Thread.java:744)
>>>>>>>> Caused by: java.lang.IllegalArgumentException:* Seek position is
>>>>>>>> past EOF: 645603894, fileSize = 0*
>>>>>>>> at
>>>>>>>> tachyon.hadoop.HdfsFileInputStream.seek(HdfsFileInputStream.java:239)
>>>>>>>> at
>>>>>>>> org.apache.hadoop.fs.FSDataInputStream.seek(FSDataInputStream.java:37)
>>>>>>>> at
>>>>>>>> org.apache.spark.streaming.util.FileBasedWriteAheadLogRandomReader.read(FileBasedWriteAheadLogRandomReader.scala:37)
>>>>>>>> at
>>>>>>>> org.apache.spark.streaming.util.FileBasedWriteAheadLog.read(FileBasedWriteAheadLog.scala:104)
>>>>>>>> at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org
>>>>>>>> $apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:141)
>>>>>>>> ... 15 more
>>>>>>>>
>>>>>>>> INFO : org.apache.spark.scheduler.TaskSetManager - Starting task
>>>>>>>> 2.2 in stage 103.0 (TID 422, 10.252.5.61, ANY, 1909 bytes)
>>>>>>>> INFO : org.apache.spark.scheduler.TaskSetManager - Lost task 2.2 in
>>>>>>>> stage 103.0 (TID 422) on executor 10.252.5.61:
>>>>>>>> org.apache.spark.SparkException (Could not read data from write ahead log
>>>>>>>> record FileBasedWriteAheadLogSegment(tachyon-ft://
>>>>>>>> 10.252.5.113:19998/tachyon/checkpoint/receivedData/2/log-1431341091711-1431341151711,645603894,10891919))
>>>>>>>> [duplicate 1]
>>>>>>>> INFO : org.apache.spark.scheduler.TaskSetManager - Starting task
>>>>>>>> 2.3 in stage 103.0 (TID 423, 10.252.5.62, ANY, 1909 bytes)
>>>>>>>> INFO : org.apache.spark.deploy.client.AppClient$ClientActor -
>>>>>>>> Executor updated: app-20150511104442-0048/2 is now LOST (worker lost)
>>>>>>>> INFO :
>>>>>>>> org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend - Executor
>>>>>>>> app-20150511104442-0048/2 removed: worker lost
>>>>>>>> ERROR:
>>>>>>>> org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend - Asked to
>>>>>>>> remove non-existent executor 2
>>>>>>>> INFO : org.apache.spark.scheduler.TaskSetManager - Lost task 2.3 in
>>>>>>>> stage 103.0 (TID 423) on executor 10.252.5.62:
>>>>>>>> org.apache.spark.SparkException (Could not read data from write ahead log
>>>>>>>> record FileBasedWriteAheadLogSegment(tachyon-ft://
>>>>>>>> 10.252.5.113:19998/tachyon/checkpoint/receivedData/2/log-1431341091711-1431341151711,645603894,10891919))
>>>>>>>> [duplicate 2]
>>>>>>>> ERROR: org.apache.spark.scheduler.TaskSetManager - Task 2 in stage
>>>>>>>> 103.0 failed 4 times; aborting job
>>>>>>>> INFO : org.apache.spark.scheduler.TaskSchedulerImpl - Removed
>>>>>>>> TaskSet 103.0, whose tasks have all completed, from pool
>>>>>>>> INFO : org.apache.spark.scheduler.TaskSchedulerImpl - Cancelling
>>>>>>>> stage 103
>>>>>>>> INFO : org.apache.spark.scheduler.DAGScheduler - ResultStage 103
>>>>>>>> (foreachRDD at Consumer.java:92) failed in 0.943 s
>>>>>>>> INFO : org.apache.spark.scheduler.DAGScheduler - Job 120 failed:
>>>>>>>> foreachRDD at Consumer.java:92, took 0.953482 s
>>>>>>>> ERROR: org.apache.spark.streaming.scheduler.JobScheduler - Error
>>>>>>>> running job streaming job 1431341145000 ms.0
>>>>>>>> org.apache.spark.SparkException: Job aborted due to stage failure:
>>>>>>>> Task 2 in stage 103.0 failed 4 times, most recent failure: Lost task 2.3 in
>>>>>>>> stage 103.0 (TID 423, 10.252.5.62): org.apache.spark.SparkException: Could
>>>>>>>> not read data from write ahead log record
>>>>>>>> FileBasedWriteAheadLogSegment(tachyon-ft://
>>>>>>>> 10.252.5.113:19998/tachyon/checkpoint/receivedData/2/log-1431341091711-1431341151711,645603894,10891919
>>>>>>>> )
>>>>>>>> at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org
>>>>>>>> $apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:144)
>>>>>>>> at
>>>>>>>> org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:168)
>>>>>>>> at
>>>>>>>> org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:168)
>>>>>>>> at scala.Option.getOrElse(Option.scala:120)
>>>>>>>> at
>>>>>>>> org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.compute(WriteAheadLogBackedBlockRDD.scala:168)
>>>>>>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>>>>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>>>>>> at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
>>>>>>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>>>>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>>>>>> at
>>>>>>>> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
>>>>>>>> at org.apache.spark.scheduler.Task.run(Task.scala:70)
>>>>>>>> at
>>>>>>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>>>>>>>> at
>>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>>>>> at
>>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>>>>> at java.lang.Thread.run(Thread.java:744)
>>>>>>>> Caused by: java.lang.IllegalArgumentException: Seek position is
>>>>>>>> past EOF: 645603894, fileSize = 0
>>>>>>>> at
>>>>>>>> tachyon.hadoop.HdfsFileInputStream.seek(HdfsFileInputStream.java:239)
>>>>>>>> at
>>>>>>>> org.apache.hadoop.fs.FSDataInputStream.seek(FSDataInputStream.java:37)
>>>>>>>> at
>>>>>>>> org.apache.spark.streaming.util.FileBasedWriteAheadLogRandomReader.read(FileBasedWriteAheadLogRandomReader.scala:37)
>>>>>>>> at
>>>>>>>> org.apache.spark.streaming.util.FileBasedWriteAheadLog.read(FileBasedWriteAheadLog.scala:104)
>>>>>>>> at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org
>>>>>>>> $apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:141)
>>>>>>>> ... 15 more
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, May 8, 2015 at 11:03 PM, Haoyuan Li <ha...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Thanks for the updates!
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>>
>>>>>>>>> Haoyuan
>>>>>>>>>
>>>>>>>>> On Fri, May 8, 2015 at 8:40 AM, Dibyendu Bhattacharya <
>>>>>>>>> dibyendu.bhattachary@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Just a followup on this Thread .
>>>>>>>>>>
>>>>>>>>>> I tried Hierarchical Storage on Tachyon (
>>>>>>>>>> http://tachyon-project.org/Hierarchy-Storage-on-Tachyon.html ) ,
>>>>>>>>>> and that
>>>>>>>>>> seems to have worked and I did not see any any Spark Job failed
>>>>>>>>>> due to
>>>>>>>>>> BlockNotFoundException.
>>>>>>>>>> below is my  Hierarchical Storage settings..
>>>>>>>>>>
>>>>>>>>>>   -Dtachyon.worker.hierarchystore.level.max=2
>>>>>>>>>>   -Dtachyon.worker.hierarchystore.level0.alias=MEM
>>>>>>>>>>
>>>>>>>>>> -Dtachyon.worker.hierarchystore.level0.dirs.path=$TACHYON_RAM_FOLDER
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> -Dtachyon.worker.hierarchystore.level0.dirs.quota=$TACHYON_WORKER_MEMORY_SIZE
>>>>>>>>>>   -Dtachyon.worker.hierarchystore.level1.alias=HDD
>>>>>>>>>>   -Dtachyon.worker.hierarchystore.level1.dirs.path=/mnt/tachyon
>>>>>>>>>>   -Dtachyon.worker.hierarchystore.level1.dirs.quota=50GB
>>>>>>>>>>   -Dtachyon.worker.allocate.strategy=MAX_FREE
>>>>>>>>>>   -Dtachyon.worker.evict.strategy=LRU
>>>>>>>>>>
>>>>>>>>>> Regards,
>>>>>>>>>> Dibyendu
>>>>>>>>>>
>>>>>>>>>> On Thu, May 7, 2015 at 1:46 PM, Dibyendu Bhattacharya <
>>>>>>>>>> dibyendu.bhattachary@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>> > Dear All ,
>>>>>>>>>> >
>>>>>>>>>> > I have been playing with Spark Streaming on Tachyon as the
>>>>>>>>>> OFF_HEAP block
>>>>>>>>>> > store  . Primary reason for evaluating Tachyon is to find if
>>>>>>>>>> Tachyon can
>>>>>>>>>> > solve the Spark BlockNotFoundException .
>>>>>>>>>> >
>>>>>>>>>> > In traditional MEMORY_ONLY StorageLevel, when blocks are
>>>>>>>>>> evicted , jobs
>>>>>>>>>> > failed due to block not found exception and storing blocks in
>>>>>>>>>> > MEMORY_AND_DISK is not a good option either as it impact the
>>>>>>>>>> throughput a
>>>>>>>>>> > lot .
>>>>>>>>>> >
>>>>>>>>>> >
>>>>>>>>>> > To test how Tachyon behave , I took the latest spark 1.4 from
>>>>>>>>>> master , and
>>>>>>>>>> > used Tachyon 0.6.4 and configured Tachyon in Fault Tolerant
>>>>>>>>>> Mode . Tachyon
>>>>>>>>>> > is running in 3 Node AWS x-large cluster and Spark is running
>>>>>>>>>> in 3 node AWS
>>>>>>>>>> > x-large cluster.
>>>>>>>>>> >
>>>>>>>>>> > I have used the low level Receiver based Kafka consumer (
>>>>>>>>>> > https://github.com/dibbhatt/kafka-spark-consumer)  which I
>>>>>>>>>> have written
>>>>>>>>>> > to pull from Kafka and write Blocks to Tachyon
>>>>>>>>>> >
>>>>>>>>>> >
>>>>>>>>>> > I found there is similar improvement in throughput (as
>>>>>>>>>> MEMORY_ONLY case )
>>>>>>>>>> > but very good overall memory utilization (as it is off heap
>>>>>>>>>> store) .
>>>>>>>>>> >
>>>>>>>>>> >
>>>>>>>>>> > But I found one issue on which I need to clarification .
>>>>>>>>>> >
>>>>>>>>>> >
>>>>>>>>>> > In Tachyon case also , I find  BlockNotFoundException  , but
>>>>>>>>>> due to a
>>>>>>>>>> > different reason .  What I see TachyonBlockManager.scala put
>>>>>>>>>> the blocks in
>>>>>>>>>> > WriteType.TRY_CACHE configuration . And because of this Blocks
>>>>>>>>>> ate evicted
>>>>>>>>>> > from Tachyon Cache and when Spark try to find the block it
>>>>>>>>>> throws
>>>>>>>>>> >  BlockNotFoundException .
>>>>>>>>>> >
>>>>>>>>>> > I see a pull request which discuss the same ..
>>>>>>>>>> >
>>>>>>>>>> > https://github.com/apache/spark/pull/158#discussion_r11195271
>>>>>>>>>> >
>>>>>>>>>> >
>>>>>>>>>> > When I modified the WriteType to CACHE_THROUGH ,
>>>>>>>>>> BlockDropException is
>>>>>>>>>> > gone , but it again impact the throughput ..
>>>>>>>>>> >
>>>>>>>>>> >
>>>>>>>>>> > Just curious to know , if Tachyon has any settings which can
>>>>>>>>>> solve the
>>>>>>>>>> > Block Eviction from Cache to Disk, other than explicitly setting
>>>>>>>>>> > CACHE_THROUGH  ?
>>>>>>>>>> >
>>>>>>>>>> > Regards,
>>>>>>>>>> > Dibyendu
>>>>>>>>>> >
>>>>>>>>>> >
>>>>>>>>>> >
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Haoyuan Li
>>>>>>>>> CEO, Tachyon Nexus <http://www.tachyonnexus.com/>
>>>>>>>>> AMPLab, EECS, UC Berkeley http://www.cs.berkeley.edu/~haoyuan/
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: Spark Streaming with Tachyon : Data Loss on Receiver Failure due to WAL error

Posted by N B <nb...@gmail.com>.
Hi Dibyendu,

Thanks. Based on your explanation, I believe I understand why using S3 for
checkpoints has been an issue. But does this limitation apply only if recovery
is needed in case of driver failure?

What if we are not interested in recovery after a driver failure? Even then,
a streaming pipeline that uses reduceByKeyAndWindow() and updateStateByKey()
operations needs to have a checkpoint directory configured.

Do you think this usage will also run into issues if an S3 location is
provided for the checkpoint directory? We would not use it to do any explicit
recovery, as I stated above.
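
Purely as a hypothetical sketch of the kind of pipeline I mean (assuming the
Spark 1.x Java API, where Optional is com.google.common.base.Optional, List is
java.util.List, Function2 is org.apache.spark.api.java.function.Function2, and
"jsc", "pairs" and the s3a:// bucket path are placeholders, not our actual code):

// updateStateByKey requires a checkpoint directory because its state RDDs are
// periodically checkpointed, even if we never perform driver recovery.
jsc.checkpoint("s3a://my-bucket/spark/checkpoints");   // placeholder path

Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateCounts =
    new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
        @Override
        public Optional<Integer> call(List<Integer> newValues, Optional<Integer> state) {
            int sum = state.or(0);        // previous count, or 0 for a new key
            for (Integer v : newValues) {
                sum += v;
            }
            return Optional.of(sum);      // new running count for this key
        }
    };

JavaPairDStream<String, Integer> runningCounts = pairs.updateStateByKey(updateCounts);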

Thanks
Nikunj



On Sat, Sep 26, 2015 at 3:13 AM, Dibyendu Bhattacharya <
dibyendu.bhattachary@gmail.com> wrote:

> In Spark Streaming , Checkpoint Directory is used for two purpose
>
> 1. Metadata checkpointing
>
> 2. Data checkpointing
>
> If you enable WAL to recover from Driver failure, Spark Streaming will
> also write the Received Blocks in WAL which stored in checkpoint directory.
>
> For streaming solution to recover from any failure without any data loss ,
> you need to enable Meta Data Check pointing and WAL.  You do not need to
> enable Data Check pointing.
>
> From my experiments and the PR I mentioned , I configured the Meta Data
> Check Pointing in HDFS , and stored the Received Blocks OFF_HEAP.  And I
> did not use any WAL . The PR I proposed would recover from Driver fail-over
> without using any WAL like feature because Blocks are already available in
> Tachyon. The Meta Data Checkpoint helps to recover the meta data about past
> received blocks.
>
> Now the question is , can I configure Tachyon as my Metadata Checkpoint
> location ? I tried that , and Streaming application writes the
> receivedBlockMeataData to Tachyon, but on driver failure, it can not
> recover the received block meta data from Tachyon. I sometime see Zero size
> files in Tachyon checkpoint location , and it can not recover past events .
> I need to understand what is the issue of storing meta data in Tachyon .
> That needs a different JIRA I guess.
>
> Let me know I am able to explain the current scenario around Spark
> Streaming and Tachyon .
>
> Regards,
> Dibyendu
>
>
>
>
> On Sat, Sep 26, 2015 at 1:04 PM, N B <nb...@gmail.com> wrote:
>
>> Hi Dibyendu,
>>
>> I am not sure I understand completely. But are you suggesting that
>> currently there is no way to enable Checkpoint directory to be in Tachyon?
>>
>> Thanks
>> Nikunj
>>
>>
>> On Fri, Sep 25, 2015 at 11:49 PM, Dibyendu Bhattacharya <
>> dibyendu.bhattachary@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Recently I was working on a PR to use Tachyon as OFF_HEAP store for
>>> Spark Streaming and make sure Spark Streaming can recover from Driver
>>> failure and recover the blocks form Tachyon.
>>>
>>> The The Motivation for this PR is  :
>>>
>>> If Streaming application stores the blocks OFF_HEAP, it may not need any
>>> WAL like feature to recover from Driver failure. As long as the writing of
>>> blocks to Tachyon from Streaming receiver is durable, it should be
>>> recoverable from Tachyon directly on Driver failure.
>>> This can solve the issue of expensive WAL write and duplicating the
>>> blocks both in MEMORY and also WAL and also guarantee end to end
>>> No-Data-Loss channel using OFF_HEAP store.
>>>
>>> https://github.com/apache/spark/pull/8817
>>>
>>> This PR still under review . But having done various fail over testing
>>> in my environment , I see this PR worked perfectly fine without any data
>>> loss . Let see what TD and other have to say on this PR .
>>>
>>> Below is the configuration I used to test this PR ..
>>>
>>>
>>> Spark : 1.6 from Master
>>> Tachyon : 0.7.1
>>>
>>> SparkConfiguration Details :
>>>
>>> SparkConf conf = new SparkConf().setAppName("TestTachyon")
>>> .set("spark.streaming.unpersist", "true")
>>> .set("spark.local.dir", "/mnt1/spark/tincan")
>>> .set("tachyon.zookeeper.address","10.252.5.113:2182")
>>> .set("tachyon.usezookeeper","true")
>>> .set("spark.externalBlockStore.url", "tachyon-ft://
>>> ip-10-252-5-113.asskickery.us:19998")
>>>         .set("spark.externalBlockStore.baseDir", "/sparkstreaming")
>>>         .set("spark.externalBlockStore.folderName","pearson")
>>>         .set("spark.externalBlockStore.dirId", "subpub")
>>>
>>> .set("spark.externalBlockStore.keepExternalBlockStoreDirOnShutdown","true");
>>>
>>> JavaStreamingContext jsc = new JavaStreamingContext(conf, new Duration(
>>> 10000));
>>>
>>> String checkpointDirectory = "hdfs://
>>> 10.252.5.113:9000/user/hadoop/spark/wal";
>>>
>>> jsc.checkpoint(checkpointDirectory);
>>>
>>>
>>> //I am using the My Receiver Based Consumer (
>>> https://github.com/dibbhatt/kafka-spark-consumer) . But
>>> KafkaUtil.CreateStream will also work
>>>
>>> JavaDStream<MessageAndMetadata> unionStreams = ReceiverLauncher.launch(
>>> jsc, props, numberOfReceivers, StorageLevel.OFF_HEAP());
>>>
>>>
>>>
>>>
>>> Regards,
>>> Dibyendu
>>>
>>> On Sat, Sep 26, 2015 at 11:59 AM, N B <nb...@gmail.com> wrote:
>>>
>>>> Hi Dibyendu,
>>>>
>>>> How does one go about configuring spark streaming to use tachyon as its
>>>> place for storing checkpoints? Also, can one do this with tachyon running
>>>> on a completely different node than where spark processes are running?
>>>>
>>>> Thanks
>>>> Nikunj
>>>>
>>>>
>>>> On Thu, May 21, 2015 at 8:35 PM, Dibyendu Bhattacharya <
>>>> dibyendu.bhattachary@gmail.com> wrote:
>>>>
>>>>> Hi Tathagata,
>>>>>
>>>>> Thanks for looking into this. Further investigating I found that the
>>>>> issue is with Tachyon does not support File Append. The streaming receiver
>>>>> which writes to WAL when failed, and again restarted, not able to append to
>>>>> same WAL file after restart.
>>>>>
>>>>> I raised this with Tachyon user group, and Haoyuan told that within 3
>>>>> months time Tachyon file append will be ready. Will revisit this issue
>>>>> again then .
>>>>>
>>>>> Regards,
>>>>> Dibyendu
>>>>>
>>>>>
>>>>> On Fri, May 22, 2015 at 12:24 AM, Tathagata Das <td...@databricks.com>
>>>>> wrote:
>>>>>
>>>>>> Looks like somehow the file size reported by the FSInputDStream of
>>>>>> Tachyon's FileSystem interface, is returning zero.
>>>>>>
>>>>>> On Mon, May 11, 2015 at 4:38 AM, Dibyendu Bhattacharya <
>>>>>> dibyendu.bhattachary@gmail.com> wrote:
>>>>>>
>>>>>>> Just to follow up this thread further .
>>>>>>>
>>>>>>> I was doing some fault tolerant testing of Spark Streaming with
>>>>>>> Tachyon as OFF_HEAP block store. As I said in earlier email, I could able
>>>>>>> to solve the BlockNotFound exception when I used Hierarchical Storage
>>>>>>> of Tachyon ,  which is good.
>>>>>>>
>>>>>>> I continue doing some testing around storing the Spark Streaming WAL
>>>>>>> and CheckPoint files also in Tachyon . Here is few finding ..
>>>>>>>
>>>>>>>
>>>>>>> When I store the Spark Streaming Checkpoint location in Tachyon ,
>>>>>>> the throughput is much higher . I tested the Driver and Receiver failure
>>>>>>> cases , and Spark Streaming is able to recover without any Data Loss on
>>>>>>> Driver failure.
>>>>>>>
>>>>>>> *But on Receiver failure , Spark Streaming looses data* as I see
>>>>>>> Exception while reading the WAL file from Tachyon "receivedData" location
>>>>>>>  for the same Receiver id which just failed.
>>>>>>>
>>>>>>> If I change the Checkpoint location back to HDFS , Spark Streaming
>>>>>>> can recover from both Driver and Receiver failure .
>>>>>>>
>>>>>>> Here is the Log details when Spark Streaming receiver failed ...I
>>>>>>> raised a JIRA for the same issue :
>>>>>>> https://issues.apache.org/jira/browse/SPARK-7525
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> INFO : org.apache.spark.scheduler.DAGScheduler - *Executor lost: 2
>>>>>>> (epoch 1)*
>>>>>>> INFO : org.apache.spark.storage.BlockManagerMasterEndpoint - Trying
>>>>>>> to remove executor 2 from BlockManagerMaster.
>>>>>>> INFO : org.apache.spark.storage.BlockManagerMasterEndpoint -
>>>>>>> Removing block manager BlockManagerId(2, 10.252.5.54, 45789)
>>>>>>> INFO : org.apache.spark.storage.BlockManagerMaster - Removed 2
>>>>>>> successfully in removeExecutor
>>>>>>> INFO : org.apache.spark.streaming.scheduler.ReceiverTracker - *Registered
>>>>>>> receiver for stream 2 from 10.252.5.62*:47255
>>>>>>> WARN : org.apache.spark.scheduler.TaskSetManager - Lost task 2.1 in
>>>>>>> stage 103.0 (TID 421, 10.252.5.62): org.apache.spark.SparkException: *Could
>>>>>>> not read data from write ahead log record
>>>>>>> FileBasedWriteAheadLogSegment(tachyon-ft://10.252.5.113:19998/tachyon/checkpoint/receivedData/2/log-1431341091711-1431341151711,645603894,10891919
>>>>>>> <http://10.252.5.113:19998/tachyon/checkpoint/receivedData/2/log-1431341091711-1431341151711,645603894,10891919>)*
>>>>>>> at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org
>>>>>>> $apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:144)
>>>>>>> at
>>>>>>> org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:168)
>>>>>>> at
>>>>>>> org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:168)
>>>>>>> at scala.Option.getOrElse(Option.scala:120)
>>>>>>> at
>>>>>>> org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.compute(WriteAheadLogBackedBlockRDD.scala:168)
>>>>>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>>>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>>>>> at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
>>>>>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>>>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>>>>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
>>>>>>> at org.apache.spark.scheduler.Task.run(Task.scala:70)
>>>>>>> at
>>>>>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>>>>>>> at
>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>>>> at
>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>>>> at java.lang.Thread.run(Thread.java:744)
>>>>>>> Caused by: java.lang.IllegalArgumentException:* Seek position is
>>>>>>> past EOF: 645603894, fileSize = 0*
>>>>>>> at
>>>>>>> tachyon.hadoop.HdfsFileInputStream.seek(HdfsFileInputStream.java:239)
>>>>>>> at
>>>>>>> org.apache.hadoop.fs.FSDataInputStream.seek(FSDataInputStream.java:37)
>>>>>>> at
>>>>>>> org.apache.spark.streaming.util.FileBasedWriteAheadLogRandomReader.read(FileBasedWriteAheadLogRandomReader.scala:37)
>>>>>>> at
>>>>>>> org.apache.spark.streaming.util.FileBasedWriteAheadLog.read(FileBasedWriteAheadLog.scala:104)
>>>>>>> at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org
>>>>>>> $apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:141)
>>>>>>> ... 15 more
>>>>>>>
>>>>>>> INFO : org.apache.spark.scheduler.TaskSetManager - Starting task 2.2
>>>>>>> in stage 103.0 (TID 422, 10.252.5.61, ANY, 1909 bytes)
>>>>>>> INFO : org.apache.spark.scheduler.TaskSetManager - Lost task 2.2 in
>>>>>>> stage 103.0 (TID 422) on executor 10.252.5.61:
>>>>>>> org.apache.spark.SparkException (Could not read data from write ahead log
>>>>>>> record FileBasedWriteAheadLogSegment(tachyon-ft://
>>>>>>> 10.252.5.113:19998/tachyon/checkpoint/receivedData/2/log-1431341091711-1431341151711,645603894,10891919))
>>>>>>> [duplicate 1]
>>>>>>> INFO : org.apache.spark.scheduler.TaskSetManager - Starting task 2.3
>>>>>>> in stage 103.0 (TID 423, 10.252.5.62, ANY, 1909 bytes)
>>>>>>> INFO : org.apache.spark.deploy.client.AppClient$ClientActor -
>>>>>>> Executor updated: app-20150511104442-0048/2 is now LOST (worker lost)
>>>>>>> INFO :
>>>>>>> org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend - Executor
>>>>>>> app-20150511104442-0048/2 removed: worker lost
>>>>>>> ERROR:
>>>>>>> org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend - Asked to
>>>>>>> remove non-existent executor 2
>>>>>>> INFO : org.apache.spark.scheduler.TaskSetManager - Lost task 2.3 in
>>>>>>> stage 103.0 (TID 423) on executor 10.252.5.62:
>>>>>>> org.apache.spark.SparkException (Could not read data from write ahead log
>>>>>>> record FileBasedWriteAheadLogSegment(tachyon-ft://
>>>>>>> 10.252.5.113:19998/tachyon/checkpoint/receivedData/2/log-1431341091711-1431341151711,645603894,10891919))
>>>>>>> [duplicate 2]
>>>>>>> ERROR: org.apache.spark.scheduler.TaskSetManager - Task 2 in stage
>>>>>>> 103.0 failed 4 times; aborting job
>>>>>>> INFO : org.apache.spark.scheduler.TaskSchedulerImpl - Removed
>>>>>>> TaskSet 103.0, whose tasks have all completed, from pool
>>>>>>> INFO : org.apache.spark.scheduler.TaskSchedulerImpl - Cancelling
>>>>>>> stage 103
>>>>>>> INFO : org.apache.spark.scheduler.DAGScheduler - ResultStage 103
>>>>>>> (foreachRDD at Consumer.java:92) failed in 0.943 s
>>>>>>> INFO : org.apache.spark.scheduler.DAGScheduler - Job 120 failed:
>>>>>>> foreachRDD at Consumer.java:92, took 0.953482 s
>>>>>>> ERROR: org.apache.spark.streaming.scheduler.JobScheduler - Error
>>>>>>> running job streaming job 1431341145000 ms.0
>>>>>>> org.apache.spark.SparkException: Job aborted due to stage failure:
>>>>>>> Task 2 in stage 103.0 failed 4 times, most recent failure: Lost task 2.3 in
>>>>>>> stage 103.0 (TID 423, 10.252.5.62): org.apache.spark.SparkException: Could
>>>>>>> not read data from write ahead log record
>>>>>>> FileBasedWriteAheadLogSegment(tachyon-ft://
>>>>>>> 10.252.5.113:19998/tachyon/checkpoint/receivedData/2/log-1431341091711-1431341151711,645603894,10891919
>>>>>>> )
>>>>>>> at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org
>>>>>>> $apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:144)
>>>>>>> at
>>>>>>> org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:168)
>>>>>>> at
>>>>>>> org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:168)
>>>>>>> at scala.Option.getOrElse(Option.scala:120)
>>>>>>> at
>>>>>>> org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.compute(WriteAheadLogBackedBlockRDD.scala:168)
>>>>>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>>>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>>>>> at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
>>>>>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>>>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>>>>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
>>>>>>> at org.apache.spark.scheduler.Task.run(Task.scala:70)
>>>>>>> at
>>>>>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>>>>>>> at
>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>>>> at
>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>>>> at java.lang.Thread.run(Thread.java:744)
>>>>>>> Caused by: java.lang.IllegalArgumentException: Seek position is past
>>>>>>> EOF: 645603894, fileSize = 0
>>>>>>> at
>>>>>>> tachyon.hadoop.HdfsFileInputStream.seek(HdfsFileInputStream.java:239)
>>>>>>> at
>>>>>>> org.apache.hadoop.fs.FSDataInputStream.seek(FSDataInputStream.java:37)
>>>>>>> at
>>>>>>> org.apache.spark.streaming.util.FileBasedWriteAheadLogRandomReader.read(FileBasedWriteAheadLogRandomReader.scala:37)
>>>>>>> at
>>>>>>> org.apache.spark.streaming.util.FileBasedWriteAheadLog.read(FileBasedWriteAheadLog.scala:104)
>>>>>>> at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org
>>>>>>> $apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:141)
>>>>>>> ... 15 more
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Fri, May 8, 2015 at 11:03 PM, Haoyuan Li <ha...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Thanks for the updates!
>>>>>>>>
>>>>>>>> Best,
>>>>>>>>
>>>>>>>> Haoyuan
>>>>>>>>
>>>>>>>> On Fri, May 8, 2015 at 8:40 AM, Dibyendu Bhattacharya <
>>>>>>>> dibyendu.bhattachary@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Just a followup on this Thread .
>>>>>>>>>
>>>>>>>>> I tried Hierarchical Storage on Tachyon (
>>>>>>>>> http://tachyon-project.org/Hierarchy-Storage-on-Tachyon.html ) ,
>>>>>>>>> and that
>>>>>>>>> seems to have worked and I did not see any any Spark Job failed
>>>>>>>>> due to
>>>>>>>>> BlockNotFoundException.
>>>>>>>>> below is my  Hierarchical Storage settings..
>>>>>>>>>
>>>>>>>>>   -Dtachyon.worker.hierarchystore.level.max=2
>>>>>>>>>   -Dtachyon.worker.hierarchystore.level0.alias=MEM
>>>>>>>>>
>>>>>>>>> -Dtachyon.worker.hierarchystore.level0.dirs.path=$TACHYON_RAM_FOLDER
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> -Dtachyon.worker.hierarchystore.level0.dirs.quota=$TACHYON_WORKER_MEMORY_SIZE
>>>>>>>>>   -Dtachyon.worker.hierarchystore.level1.alias=HDD
>>>>>>>>>   -Dtachyon.worker.hierarchystore.level1.dirs.path=/mnt/tachyon
>>>>>>>>>   -Dtachyon.worker.hierarchystore.level1.dirs.quota=50GB
>>>>>>>>>   -Dtachyon.worker.allocate.strategy=MAX_FREE
>>>>>>>>>   -Dtachyon.worker.evict.strategy=LRU
>>>>>>>>>
>>>>>>>>> Regards,
>>>>>>>>> Dibyendu
>>>>>>>>>
>>>>>>>>> On Thu, May 7, 2015 at 1:46 PM, Dibyendu Bhattacharya <
>>>>>>>>> dibyendu.bhattachary@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>> > Dear All ,
>>>>>>>>> >
>>>>>>>>> > I have been playing with Spark Streaming on Tachyon as the
>>>>>>>>> OFF_HEAP block
>>>>>>>>> > store  . Primary reason for evaluating Tachyon is to find if
>>>>>>>>> Tachyon can
>>>>>>>>> > solve the Spark BlockNotFoundException .
>>>>>>>>> >
>>>>>>>>> > In traditional MEMORY_ONLY StorageLevel, when blocks are evicted
>>>>>>>>> , jobs
>>>>>>>>> > failed due to block not found exception and storing blocks in
>>>>>>>>> > MEMORY_AND_DISK is not a good option either as it impact the
>>>>>>>>> throughput a
>>>>>>>>> > lot .
>>>>>>>>> >
>>>>>>>>> >
>>>>>>>>> > To test how Tachyon behave , I took the latest spark 1.4 from
>>>>>>>>> master , and
>>>>>>>>> > used Tachyon 0.6.4 and configured Tachyon in Fault Tolerant Mode
>>>>>>>>> . Tachyon
>>>>>>>>> > is running in 3 Node AWS x-large cluster and Spark is running in
>>>>>>>>> 3 node AWS
>>>>>>>>> > x-large cluster.
>>>>>>>>> >
>>>>>>>>> > I have used the low level Receiver based Kafka consumer (
>>>>>>>>> > https://github.com/dibbhatt/kafka-spark-consumer)  which I have
>>>>>>>>> written
>>>>>>>>> > to pull from Kafka and write Blocks to Tachyon
>>>>>>>>> >
>>>>>>>>> >
>>>>>>>>> > I found there is similar improvement in throughput (as
>>>>>>>>> MEMORY_ONLY case )
>>>>>>>>> > but very good overall memory utilization (as it is off heap
>>>>>>>>> store) .
>>>>>>>>> >
>>>>>>>>> >
>>>>>>>>> > But I found one issue on which I need to clarification .
>>>>>>>>> >
>>>>>>>>> >
>>>>>>>>> > In Tachyon case also , I find  BlockNotFoundException  , but due
>>>>>>>>> to a
>>>>>>>>> > different reason .  What I see TachyonBlockManager.scala put the
>>>>>>>>> blocks in
>>>>>>>>> > WriteType.TRY_CACHE configuration . And because of this Blocks
>>>>>>>>> ate evicted
>>>>>>>>> > from Tachyon Cache and when Spark try to find the block it throws
>>>>>>>>> >  BlockNotFoundException .
>>>>>>>>> >
>>>>>>>>> > I see a pull request which discuss the same ..
>>>>>>>>> >
>>>>>>>>> > https://github.com/apache/spark/pull/158#discussion_r11195271
>>>>>>>>> >
>>>>>>>>> >
>>>>>>>>> > When I modified the WriteType to CACHE_THROUGH ,
>>>>>>>>> BlockDropException is
>>>>>>>>> > gone , but it again impact the throughput ..
>>>>>>>>> >
>>>>>>>>> >
>>>>>>>>> > Just curious to know , if Tachyon has any settings which can
>>>>>>>>> solve the
>>>>>>>>> > Block Eviction from Cache to Disk, other than explicitly setting
>>>>>>>>> > CACHE_THROUGH  ?
>>>>>>>>> >
>>>>>>>>> > Regards,
>>>>>>>>> > Dibyendu
>>>>>>>>> >
>>>>>>>>> >
>>>>>>>>> >
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Haoyuan Li
>>>>>>>> CEO, Tachyon Nexus <http://www.tachyonnexus.com/>
>>>>>>>> AMPLab, EECS, UC Berkeley http://www.cs.berkeley.edu/~haoyuan/
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: Spark Streaming with Tachyon : Data Loss on Receiver Failure due to WAL error

Posted by Dibyendu Bhattacharya <di...@gmail.com>.
In Spark Streaming, the checkpoint directory is used for two purposes:

1. Metadata checkpointing

2. Data checkpointing

If you enable the WAL to recover from Driver failure, Spark Streaming will also
write the received blocks to the WAL, which is stored in the checkpoint directory.

For a streaming solution to recover from any failure without data loss, you
need to enable metadata checkpointing and the WAL. You do not need to enable
data checkpointing.
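
As a minimal sketch of that combination (this is not from the PR; the
checkpoint path and batch duration below are placeholders):

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

SparkConf conf = new SparkConf().setAppName("WALRecovery")
    // write received blocks to the WAL under the checkpoint directory
    .set("spark.streaming.receiver.writeAheadLog.enable", "true");

JavaStreamingContext jsc = new JavaStreamingContext(conf, new Duration(10000));

// metadata checkpointing: DStream operations + received block metadata
jsc.checkpoint("hdfs://namenode:9000/user/hadoop/spark/checkpoint");

On restart the driver should be built with
JavaStreamingContext.getOrCreate(checkpointDirectory, factory) so it can
rebuild the context from the metadata checkpoint.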

From my experiments and the PR I mentioned, I configured metadata checkpointing
in HDFS and stored the received blocks OFF_HEAP, without any WAL. The PR I
proposed recovers from Driver fail-over without any WAL-like feature because
the blocks are already available in Tachyon; the metadata checkpoint helps
recover the metadata about past received blocks.
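
Condensed, the PR's approach amounts to the following (same jsc and consumer
as in my quoted configuration below; paths are placeholders):

// received blocks go to the external (Tachyon) block store, not to a WAL
JavaDStream<MessageAndMetadata> unionStreams = ReceiverLauncher.launch(
    jsc, props, numberOfReceivers, StorageLevel.OFF_HEAP());

// only metadata checkpointing is enabled, on HDFS
jsc.checkpoint("hdfs://namenode:9000/user/hadoop/spark/checkpoint");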

Now the question is: can I configure Tachyon as my metadata checkpoint
location? I tried that, and the streaming application writes the
receivedBlockMetadata to Tachyon, but on driver failure it cannot recover the
received block metadata from Tachyon. I sometimes see zero-size files in the
Tachyon checkpoint location, and it cannot recover past events. I need to
understand what the issue is with storing metadata in Tachyon; that probably
needs a separate JIRA.
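
For completeness, the variant that failed is the same metadata checkpoint
pointed at Tachyon instead of HDFS (host and path here are placeholders):

// on driver restart the checkpoint files in Tachyon sometimes have size 0,
// so past received-block metadata cannot be recovered
jsc.checkpoint("tachyon-ft://tachyon-master:19998/sparkstreaming/checkpoint");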

Let me know if I was able to explain the current scenario around Spark
Streaming and Tachyon.

Regards,
Dibyendu




On Sat, Sep 26, 2015 at 1:04 PM, N B <nb...@gmail.com> wrote:

> Hi Dibyendu,
>
> I am not sure I understand completely. But are you suggesting that
> currently there is no way to enable Checkpoint directory to be in Tachyon?
>
> Thanks
> Nikunj
>
>
> On Fri, Sep 25, 2015 at 11:49 PM, Dibyendu Bhattacharya <
> dibyendu.bhattachary@gmail.com> wrote:
>
>> Hi,
>>
>> Recently I was working on a PR to use Tachyon as OFF_HEAP store for Spark
>> Streaming and make sure Spark Streaming can recover from Driver failure and
>> recover the blocks form Tachyon.
>>
>> The The Motivation for this PR is  :
>>
>> If Streaming application stores the blocks OFF_HEAP, it may not need any
>> WAL like feature to recover from Driver failure. As long as the writing of
>> blocks to Tachyon from Streaming receiver is durable, it should be
>> recoverable from Tachyon directly on Driver failure.
>> This can solve the issue of expensive WAL write and duplicating the
>> blocks both in MEMORY and also WAL and also guarantee end to end
>> No-Data-Loss channel using OFF_HEAP store.
>>
>> https://github.com/apache/spark/pull/8817
>>
>> This PR still under review . But having done various fail over testing in
>> my environment , I see this PR worked perfectly fine without any data loss
>> . Let see what TD and other have to say on this PR .
>>
>> Below is the configuration I used to test this PR ..
>>
>>
>> Spark : 1.6 from Master
>> Tachyon : 0.7.1
>>
>> SparkConfiguration Details :
>>
>> SparkConf conf = new SparkConf().setAppName("TestTachyon")
>> .set("spark.streaming.unpersist", "true")
>> .set("spark.local.dir", "/mnt1/spark/tincan")
>> .set("tachyon.zookeeper.address","10.252.5.113:2182")
>> .set("tachyon.usezookeeper","true")
>> .set("spark.externalBlockStore.url", "tachyon-ft://
>> ip-10-252-5-113.asskickery.us:19998")
>>         .set("spark.externalBlockStore.baseDir", "/sparkstreaming")
>>         .set("spark.externalBlockStore.folderName","pearson")
>>         .set("spark.externalBlockStore.dirId", "subpub")
>>
>> .set("spark.externalBlockStore.keepExternalBlockStoreDirOnShutdown","true");
>>
>> JavaStreamingContext jsc = new JavaStreamingContext(conf, new Duration(
>> 10000));
>>
>> String checkpointDirectory = "hdfs://
>> 10.252.5.113:9000/user/hadoop/spark/wal";
>>
>> jsc.checkpoint(checkpointDirectory);
>>
>>
>> //I am using the My Receiver Based Consumer (
>> https://github.com/dibbhatt/kafka-spark-consumer) . But
>> KafkaUtil.CreateStream will also work
>>
>> JavaDStream<MessageAndMetadata> unionStreams = ReceiverLauncher.launch(
>> jsc, props, numberOfReceivers, StorageLevel.OFF_HEAP());
>>
>>
>>
>>
>> Regards,
>> Dibyendu
>>
>> On Sat, Sep 26, 2015 at 11:59 AM, N B <nb...@gmail.com> wrote:
>>
>>> Hi Dibyendu,
>>>
>>> How does one go about configuring spark streaming to use tachyon as its
>>> place for storing checkpoints? Also, can one do this with tachyon running
>>> on a completely different node than where spark processes are running?
>>>
>>> Thanks
>>> Nikunj
>>>
>>>
>>> On Thu, May 21, 2015 at 8:35 PM, Dibyendu Bhattacharya <
>>> dibyendu.bhattachary@gmail.com> wrote:
>>>
>>>> Hi Tathagata,
>>>>
>>>> Thanks for looking into this. Further investigating I found that the
>>>> issue is with Tachyon does not support File Append. The streaming receiver
>>>> which writes to WAL when failed, and again restarted, not able to append to
>>>> same WAL file after restart.
>>>>
>>>> I raised this with Tachyon user group, and Haoyuan told that within 3
>>>> months time Tachyon file append will be ready. Will revisit this issue
>>>> again then .
>>>>
>>>> Regards,
>>>> Dibyendu
>>>>
>>>>
>>>> On Fri, May 22, 2015 at 12:24 AM, Tathagata Das <td...@databricks.com>
>>>> wrote:
>>>>
>>>>> Looks like somehow the file size reported by the FSInputDStream of
>>>>> Tachyon's FileSystem interface, is returning zero.
>>>>>
>>>>> On Mon, May 11, 2015 at 4:38 AM, Dibyendu Bhattacharya <
>>>>> dibyendu.bhattachary@gmail.com> wrote:
>>>>>
>>>>>> Just to follow up this thread further .
>>>>>>
>>>>>> I was doing some fault tolerant testing of Spark Streaming with
>>>>>> Tachyon as OFF_HEAP block store. As I said in earlier email, I could able
>>>>>> to solve the BlockNotFound exception when I used Hierarchical Storage
>>>>>> of Tachyon ,  which is good.
>>>>>>
>>>>>> I continue doing some testing around storing the Spark Streaming WAL
>>>>>> and CheckPoint files also in Tachyon . Here is few finding ..
>>>>>>
>>>>>>
>>>>>> When I store the Spark Streaming Checkpoint location in Tachyon , the
>>>>>> throughput is much higher . I tested the Driver and Receiver failure cases
>>>>>> , and Spark Streaming is able to recover without any Data Loss on Driver
>>>>>> failure.
>>>>>>
>>>>>> *But on Receiver failure , Spark Streaming looses data* as I see
>>>>>> Exception while reading the WAL file from Tachyon "receivedData" location
>>>>>>  for the same Receiver id which just failed.
>>>>>>
>>>>>> If I change the Checkpoint location back to HDFS , Spark Streaming
>>>>>> can recover from both Driver and Receiver failure .
>>>>>>
>>>>>> Here is the Log details when Spark Streaming receiver failed ...I
>>>>>> raised a JIRA for the same issue :
>>>>>> https://issues.apache.org/jira/browse/SPARK-7525
>>>>>>
>>>>>>
>>>>>>
>>>>>> INFO : org.apache.spark.scheduler.DAGScheduler - *Executor lost: 2
>>>>>> (epoch 1)*
>>>>>> INFO : org.apache.spark.storage.BlockManagerMasterEndpoint - Trying
>>>>>> to remove executor 2 from BlockManagerMaster.
>>>>>> INFO : org.apache.spark.storage.BlockManagerMasterEndpoint - Removing
>>>>>> block manager BlockManagerId(2, 10.252.5.54, 45789)
>>>>>> INFO : org.apache.spark.storage.BlockManagerMaster - Removed 2
>>>>>> successfully in removeExecutor
>>>>>> INFO : org.apache.spark.streaming.scheduler.ReceiverTracker - *Registered
>>>>>> receiver for stream 2 from 10.252.5.62*:47255
>>>>>> WARN : org.apache.spark.scheduler.TaskSetManager - Lost task 2.1 in
>>>>>> stage 103.0 (TID 421, 10.252.5.62): org.apache.spark.SparkException: *Could
>>>>>> not read data from write ahead log record
>>>>>> FileBasedWriteAheadLogSegment(tachyon-ft://10.252.5.113:19998/tachyon/checkpoint/receivedData/2/log-1431341091711-1431341151711,645603894,10891919
>>>>>> <http://10.252.5.113:19998/tachyon/checkpoint/receivedData/2/log-1431341091711-1431341151711,645603894,10891919>)*
>>>>>> at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org
>>>>>> $apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:144)
>>>>>> at
>>>>>> org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:168)
>>>>>> at
>>>>>> org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:168)
>>>>>> at scala.Option.getOrElse(Option.scala:120)
>>>>>> at
>>>>>> org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.compute(WriteAheadLogBackedBlockRDD.scala:168)
>>>>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>>>> at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
>>>>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>>>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
>>>>>> at org.apache.spark.scheduler.Task.run(Task.scala:70)
>>>>>> at
>>>>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>>>>>> at
>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>>> at
>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>>> at java.lang.Thread.run(Thread.java:744)
>>>>>> Caused by: java.lang.IllegalArgumentException:* Seek position is
>>>>>> past EOF: 645603894, fileSize = 0*
>>>>>> at
>>>>>> tachyon.hadoop.HdfsFileInputStream.seek(HdfsFileInputStream.java:239)
>>>>>> at
>>>>>> org.apache.hadoop.fs.FSDataInputStream.seek(FSDataInputStream.java:37)
>>>>>> at
>>>>>> org.apache.spark.streaming.util.FileBasedWriteAheadLogRandomReader.read(FileBasedWriteAheadLogRandomReader.scala:37)
>>>>>> at
>>>>>> org.apache.spark.streaming.util.FileBasedWriteAheadLog.read(FileBasedWriteAheadLog.scala:104)
>>>>>> at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org
>>>>>> $apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:141)
>>>>>> ... 15 more
>>>>>>
>>>>>> INFO : org.apache.spark.scheduler.TaskSetManager - Starting task 2.2
>>>>>> in stage 103.0 (TID 422, 10.252.5.61, ANY, 1909 bytes)
>>>>>> INFO : org.apache.spark.scheduler.TaskSetManager - Lost task 2.2 in
>>>>>> stage 103.0 (TID 422) on executor 10.252.5.61:
>>>>>> org.apache.spark.SparkException (Could not read data from write ahead log
>>>>>> record FileBasedWriteAheadLogSegment(tachyon-ft://
>>>>>> 10.252.5.113:19998/tachyon/checkpoint/receivedData/2/log-1431341091711-1431341151711,645603894,10891919))
>>>>>> [duplicate 1]
>>>>>> INFO : org.apache.spark.scheduler.TaskSetManager - Starting task 2.3
>>>>>> in stage 103.0 (TID 423, 10.252.5.62, ANY, 1909 bytes)
>>>>>> INFO : org.apache.spark.deploy.client.AppClient$ClientActor -
>>>>>> Executor updated: app-20150511104442-0048/2 is now LOST (worker lost)
>>>>>> INFO : org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend
>>>>>> - Executor app-20150511104442-0048/2 removed: worker lost
>>>>>> ERROR: org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend
>>>>>> - Asked to remove non-existent executor 2
>>>>>> INFO : org.apache.spark.scheduler.TaskSetManager - Lost task 2.3 in
>>>>>> stage 103.0 (TID 423) on executor 10.252.5.62:
>>>>>> org.apache.spark.SparkException (Could not read data from write ahead log
>>>>>> record FileBasedWriteAheadLogSegment(tachyon-ft://
>>>>>> 10.252.5.113:19998/tachyon/checkpoint/receivedData/2/log-1431341091711-1431341151711,645603894,10891919))
>>>>>> [duplicate 2]
>>>>>> ERROR: org.apache.spark.scheduler.TaskSetManager - Task 2 in stage
>>>>>> 103.0 failed 4 times; aborting job
>>>>>> INFO : org.apache.spark.scheduler.TaskSchedulerImpl - Removed TaskSet
>>>>>> 103.0, whose tasks have all completed, from pool
>>>>>> INFO : org.apache.spark.scheduler.TaskSchedulerImpl - Cancelling
>>>>>> stage 103
>>>>>> INFO : org.apache.spark.scheduler.DAGScheduler - ResultStage 103
>>>>>> (foreachRDD at Consumer.java:92) failed in 0.943 s
>>>>>> INFO : org.apache.spark.scheduler.DAGScheduler - Job 120 failed:
>>>>>> foreachRDD at Consumer.java:92, took 0.953482 s
>>>>>> ERROR: org.apache.spark.streaming.scheduler.JobScheduler - Error
>>>>>> running job streaming job 1431341145000 ms.0
>>>>>> org.apache.spark.SparkException: Job aborted due to stage failure:
>>>>>> Task 2 in stage 103.0 failed 4 times, most recent failure: Lost task 2.3 in
>>>>>> stage 103.0 (TID 423, 10.252.5.62): org.apache.spark.SparkException: Could
>>>>>> not read data from write ahead log record
>>>>>> FileBasedWriteAheadLogSegment(tachyon-ft://
>>>>>> 10.252.5.113:19998/tachyon/checkpoint/receivedData/2/log-1431341091711-1431341151711,645603894,10891919
>>>>>> )
>>>>>> at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org
>>>>>> $apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:144)
>>>>>> at
>>>>>> org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:168)
>>>>>> at
>>>>>> org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:168)
>>>>>> at scala.Option.getOrElse(Option.scala:120)
>>>>>> at
>>>>>> org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.compute(WriteAheadLogBackedBlockRDD.scala:168)
>>>>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>>>> at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
>>>>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>>>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
>>>>>> at org.apache.spark.scheduler.Task.run(Task.scala:70)
>>>>>> at
>>>>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>>>>>> at
>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>>> at
>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>>> at java.lang.Thread.run(Thread.java:744)
>>>>>> Caused by: java.lang.IllegalArgumentException: Seek position is past
>>>>>> EOF: 645603894, fileSize = 0
>>>>>> at
>>>>>> tachyon.hadoop.HdfsFileInputStream.seek(HdfsFileInputStream.java:239)
>>>>>> at
>>>>>> org.apache.hadoop.fs.FSDataInputStream.seek(FSDataInputStream.java:37)
>>>>>> at
>>>>>> org.apache.spark.streaming.util.FileBasedWriteAheadLogRandomReader.read(FileBasedWriteAheadLogRandomReader.scala:37)
>>>>>> at
>>>>>> org.apache.spark.streaming.util.FileBasedWriteAheadLog.read(FileBasedWriteAheadLog.scala:104)
>>>>>> at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org
>>>>>> $apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:141)
>>>>>> ... 15 more
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Fri, May 8, 2015 at 11:03 PM, Haoyuan Li <ha...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Thanks for the updates!
>>>>>>>
>>>>>>> Best,
>>>>>>>
>>>>>>> Haoyuan
>>>>>>>
>>>>>>> On Fri, May 8, 2015 at 8:40 AM, Dibyendu Bhattacharya <
>>>>>>> dibyendu.bhattachary@gmail.com> wrote:
>>>>>>>
>>>>>>>> Just a followup on this Thread .
>>>>>>>>
>>>>>>>> I tried Hierarchical Storage on Tachyon (
>>>>>>>> http://tachyon-project.org/Hierarchy-Storage-on-Tachyon.html ) ,
>>>>>>>> and that
>>>>>>>> seems to have worked and I did not see any any Spark Job failed due
>>>>>>>> to
>>>>>>>> BlockNotFoundException.
>>>>>>>> below is my  Hierarchical Storage settings..
>>>>>>>>
>>>>>>>>   -Dtachyon.worker.hierarchystore.level.max=2
>>>>>>>>   -Dtachyon.worker.hierarchystore.level0.alias=MEM
>>>>>>>>
>>>>>>>> -Dtachyon.worker.hierarchystore.level0.dirs.path=$TACHYON_RAM_FOLDER
>>>>>>>>
>>>>>>>>
>>>>>>>> -Dtachyon.worker.hierarchystore.level0.dirs.quota=$TACHYON_WORKER_MEMORY_SIZE
>>>>>>>>   -Dtachyon.worker.hierarchystore.level1.alias=HDD
>>>>>>>>   -Dtachyon.worker.hierarchystore.level1.dirs.path=/mnt/tachyon
>>>>>>>>   -Dtachyon.worker.hierarchystore.level1.dirs.quota=50GB
>>>>>>>>   -Dtachyon.worker.allocate.strategy=MAX_FREE
>>>>>>>>   -Dtachyon.worker.evict.strategy=LRU
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Dibyendu
>>>>>>>>
>>>>>>>> On Thu, May 7, 2015 at 1:46 PM, Dibyendu Bhattacharya <
>>>>>>>> dibyendu.bhattachary@gmail.com> wrote:
>>>>>>>>
>>>>>>>> > Dear All ,
>>>>>>>> >
>>>>>>>> > I have been playing with Spark Streaming on Tachyon as the
>>>>>>>> OFF_HEAP block
>>>>>>>> > store  . Primary reason for evaluating Tachyon is to find if
>>>>>>>> Tachyon can
>>>>>>>> > solve the Spark BlockNotFoundException .
>>>>>>>> >
>>>>>>>> > In traditional MEMORY_ONLY StorageLevel, when blocks are evicted
>>>>>>>> , jobs
>>>>>>>> > failed due to block not found exception and storing blocks in
>>>>>>>> > MEMORY_AND_DISK is not a good option either as it impact the
>>>>>>>> throughput a
>>>>>>>> > lot .
>>>>>>>> >
>>>>>>>> >
>>>>>>>> > To test how Tachyon behave , I took the latest spark 1.4 from
>>>>>>>> master , and
>>>>>>>> > used Tachyon 0.6.4 and configured Tachyon in Fault Tolerant Mode
>>>>>>>> . Tachyon
>>>>>>>> > is running in 3 Node AWS x-large cluster and Spark is running in
>>>>>>>> 3 node AWS
>>>>>>>> > x-large cluster.
>>>>>>>> >
>>>>>>>> > I have used the low level Receiver based Kafka consumer (
>>>>>>>> > https://github.com/dibbhatt/kafka-spark-consumer)  which I have
>>>>>>>> written
>>>>>>>> > to pull from Kafka and write Blocks to Tachyon
>>>>>>>> >
>>>>>>>> >
>>>>>>>> > I found there is similar improvement in throughput (as
>>>>>>>> MEMORY_ONLY case )
>>>>>>>> > but very good overall memory utilization (as it is off heap
>>>>>>>> store) .
>>>>>>>> >
>>>>>>>> >
>>>>>>>> > But I found one issue on which I need to clarification .
>>>>>>>> >
>>>>>>>> >
>>>>>>>> > In Tachyon case also , I find  BlockNotFoundException  , but due
>>>>>>>> to a
>>>>>>>> > different reason .  What I see TachyonBlockManager.scala put the
>>>>>>>> blocks in
>>>>>>>> > WriteType.TRY_CACHE configuration . And because of this Blocks
>>>>>>>> ate evicted
>>>>>>>> > from Tachyon Cache and when Spark try to find the block it throws
>>>>>>>> >  BlockNotFoundException .
>>>>>>>> >
>>>>>>>> > I see a pull request which discuss the same ..
>>>>>>>> >
>>>>>>>> > https://github.com/apache/spark/pull/158#discussion_r11195271
>>>>>>>> >
>>>>>>>> >
>>>>>>>> > When I modified the WriteType to CACHE_THROUGH ,
>>>>>>>> BlockDropException is
>>>>>>>> > gone , but it again impact the throughput ..
>>>>>>>> >
>>>>>>>> >
>>>>>>>> > Just curious to know , if Tachyon has any settings which can
>>>>>>>> solve the
>>>>>>>> > Block Eviction from Cache to Disk, other than explicitly setting
>>>>>>>> > CACHE_THROUGH  ?
>>>>>>>> >
>>>>>>>> > Regards,
>>>>>>>> > Dibyendu
>>>>>>>> >
>>>>>>>> >
>>>>>>>> >
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Haoyuan Li
>>>>>>> CEO, Tachyon Nexus <http://www.tachyonnexus.com/>
>>>>>>> AMPLab, EECS, UC Berkeley http://www.cs.berkeley.edu/~haoyuan/
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: Spark Streaming with Tachyon : Data Loss on Receiver Failure due to WAL error

Posted by N B <nb...@gmail.com>.
Hi Dibyendu,

I am not sure I understand completely, but are you suggesting that there is
currently no way to have the checkpoint directory in Tachyon?

Thanks
Nikunj


On Fri, Sep 25, 2015 at 11:49 PM, Dibyendu Bhattacharya <
dibyendu.bhattachary@gmail.com> wrote:

> Hi,
>
> Recently I was working on a PR to use Tachyon as OFF_HEAP store for Spark
> Streaming and make sure Spark Streaming can recover from Driver failure and
> recover the blocks form Tachyon.
>
> The The Motivation for this PR is  :
>
> If Streaming application stores the blocks OFF_HEAP, it may not need any
> WAL like feature to recover from Driver failure. As long as the writing of
> blocks to Tachyon from Streaming receiver is durable, it should be
> recoverable from Tachyon directly on Driver failure.
> This can solve the issue of expensive WAL write and duplicating the blocks
> both in MEMORY and also WAL and also guarantee end to end No-Data-Loss
> channel using OFF_HEAP store.
>
> https://github.com/apache/spark/pull/8817
>
> This PR still under review . But having done various fail over testing in
> my environment , I see this PR worked perfectly fine without any data loss
> . Let see what TD and other have to say on this PR .
>
> Below is the configuration I used to test this PR ..
>
>
> Spark : 1.6 from Master
> Tachyon : 0.7.1
>
> SparkConfiguration Details :
>
> SparkConf conf = new SparkConf().setAppName("TestTachyon")
> .set("spark.streaming.unpersist", "true")
> .set("spark.local.dir", "/mnt1/spark/tincan")
> .set("tachyon.zookeeper.address","10.252.5.113:2182")
> .set("tachyon.usezookeeper","true")
> .set("spark.externalBlockStore.url", "tachyon-ft://
> ip-10-252-5-113.asskickery.us:19998")
>         .set("spark.externalBlockStore.baseDir", "/sparkstreaming")
>         .set("spark.externalBlockStore.folderName","pearson")
>         .set("spark.externalBlockStore.dirId", "subpub")
>
> .set("spark.externalBlockStore.keepExternalBlockStoreDirOnShutdown","true");
>
> JavaStreamingContext jsc = new JavaStreamingContext(conf, new Duration(
> 10000));
>
> String checkpointDirectory = "hdfs://
> 10.252.5.113:9000/user/hadoop/spark/wal";
>
> jsc.checkpoint(checkpointDirectory);
>
>
> //I am using the My Receiver Based Consumer (
> https://github.com/dibbhatt/kafka-spark-consumer) . But
> KafkaUtil.CreateStream will also work
>
> JavaDStream<MessageAndMetadata> unionStreams = ReceiverLauncher.launch(
> jsc, props, numberOfReceivers, StorageLevel.OFF_HEAP());
>
>
>
>
> Regards,
> Dibyendu
>
> On Sat, Sep 26, 2015 at 11:59 AM, N B <nb...@gmail.com> wrote:
>
>> Hi Dibyendu,
>>
>> How does one go about configuring spark streaming to use tachyon as its
>> place for storing checkpoints? Also, can one do this with tachyon running
>> on a completely different node than where spark processes are running?
>>
>> Thanks
>> Nikunj
>>
>>
>> On Thu, May 21, 2015 at 8:35 PM, Dibyendu Bhattacharya <
>> dibyendu.bhattachary@gmail.com> wrote:
>>
>>> Hi Tathagata,
>>>
>>> Thanks for looking into this. Further investigating I found that the
>>> issue is with Tachyon does not support File Append. The streaming receiver
>>> which writes to WAL when failed, and again restarted, not able to append to
>>> same WAL file after restart.
>>>
>>> I raised this with Tachyon user group, and Haoyuan told that within 3
>>> months time Tachyon file append will be ready. Will revisit this issue
>>> again then .
>>>
>>> Regards,
>>> Dibyendu
>>>
>>>
>>> On Fri, May 22, 2015 at 12:24 AM, Tathagata Das <td...@databricks.com>
>>> wrote:
>>>
>>>> Looks like somehow the file size reported by the FSInputDStream of
>>>> Tachyon's FileSystem interface, is returning zero.
>>>>
>>>> On Mon, May 11, 2015 at 4:38 AM, Dibyendu Bhattacharya <
>>>> dibyendu.bhattachary@gmail.com> wrote:
>>>>
>>>>> Just to follow up this thread further .
>>>>>
>>>>> I was doing some fault tolerant testing of Spark Streaming with
>>>>> Tachyon as OFF_HEAP block store. As I said in earlier email, I could able
>>>>> to solve the BlockNotFound exception when I used Hierarchical Storage
>>>>> of Tachyon ,  which is good.
>>>>>
>>>>> I continue doing some testing around storing the Spark Streaming WAL
>>>>> and CheckPoint files also in Tachyon . Here is few finding ..
>>>>>
>>>>>
>>>>> When I store the Spark Streaming Checkpoint location in Tachyon , the
>>>>> throughput is much higher . I tested the Driver and Receiver failure cases
>>>>> , and Spark Streaming is able to recover without any Data Loss on Driver
>>>>> failure.
>>>>>
>>>>> *But on Receiver failure , Spark Streaming looses data* as I see
>>>>> Exception while reading the WAL file from Tachyon "receivedData" location
>>>>>  for the same Receiver id which just failed.
>>>>>
>>>>> If I change the Checkpoint location back to HDFS , Spark Streaming can
>>>>> recover from both Driver and Receiver failure .
>>>>>
>>>>> Here is the Log details when Spark Streaming receiver failed ...I
>>>>> raised a JIRA for the same issue :
>>>>> https://issues.apache.org/jira/browse/SPARK-7525
>>>>>
>>>>>
>>>>>
>>>>> INFO : org.apache.spark.scheduler.DAGScheduler - *Executor lost: 2
>>>>> (epoch 1)*
>>>>> INFO : org.apache.spark.storage.BlockManagerMasterEndpoint - Trying to
>>>>> remove executor 2 from BlockManagerMaster.
>>>>> INFO : org.apache.spark.storage.BlockManagerMasterEndpoint - Removing
>>>>> block manager BlockManagerId(2, 10.252.5.54, 45789)
>>>>> INFO : org.apache.spark.storage.BlockManagerMaster - Removed 2
>>>>> successfully in removeExecutor
>>>>> INFO : org.apache.spark.streaming.scheduler.ReceiverTracker - *Registered
>>>>> receiver for stream 2 from 10.252.5.62*:47255
>>>>> WARN : org.apache.spark.scheduler.TaskSetManager - Lost task 2.1 in
>>>>> stage 103.0 (TID 421, 10.252.5.62): org.apache.spark.SparkException: *Could
>>>>> not read data from write ahead log record
>>>>> FileBasedWriteAheadLogSegment(tachyon-ft://10.252.5.113:19998/tachyon/checkpoint/receivedData/2/log-1431341091711-1431341151711,645603894,10891919
>>>>> <http://10.252.5.113:19998/tachyon/checkpoint/receivedData/2/log-1431341091711-1431341151711,645603894,10891919>)*
>>>>> at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org
>>>>> $apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:144)
>>>>> at
>>>>> org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:168)
>>>>> at
>>>>> org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:168)
>>>>> at scala.Option.getOrElse(Option.scala:120)
>>>>> at
>>>>> org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.compute(WriteAheadLogBackedBlockRDD.scala:168)
>>>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>>> at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
>>>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
>>>>> at org.apache.spark.scheduler.Task.run(Task.scala:70)
>>>>> at
>>>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>>>>> at
>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>> at
>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>> at java.lang.Thread.run(Thread.java:744)
>>>>> Caused by: java.lang.IllegalArgumentException:* Seek position is past
>>>>> EOF: 645603894, fileSize = 0*
>>>>> at
>>>>> tachyon.hadoop.HdfsFileInputStream.seek(HdfsFileInputStream.java:239)
>>>>> at
>>>>> org.apache.hadoop.fs.FSDataInputStream.seek(FSDataInputStream.java:37)
>>>>> at
>>>>> org.apache.spark.streaming.util.FileBasedWriteAheadLogRandomReader.read(FileBasedWriteAheadLogRandomReader.scala:37)
>>>>> at
>>>>> org.apache.spark.streaming.util.FileBasedWriteAheadLog.read(FileBasedWriteAheadLog.scala:104)
>>>>> at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org
>>>>> $apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:141)
>>>>> ... 15 more
>>>>>
>>>>> INFO : org.apache.spark.scheduler.TaskSetManager - Starting task 2.2
>>>>> in stage 103.0 (TID 422, 10.252.5.61, ANY, 1909 bytes)
>>>>> INFO : org.apache.spark.scheduler.TaskSetManager - Lost task 2.2 in
>>>>> stage 103.0 (TID 422) on executor 10.252.5.61:
>>>>> org.apache.spark.SparkException (Could not read data from write ahead log
>>>>> record FileBasedWriteAheadLogSegment(tachyon-ft://
>>>>> 10.252.5.113:19998/tachyon/checkpoint/receivedData/2/log-1431341091711-1431341151711,645603894,10891919))
>>>>> [duplicate 1]
>>>>> INFO : org.apache.spark.scheduler.TaskSetManager - Starting task 2.3
>>>>> in stage 103.0 (TID 423, 10.252.5.62, ANY, 1909 bytes)
>>>>> INFO : org.apache.spark.deploy.client.AppClient$ClientActor - Executor
>>>>> updated: app-20150511104442-0048/2 is now LOST (worker lost)
>>>>> INFO : org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend
>>>>> - Executor app-20150511104442-0048/2 removed: worker lost
>>>>> ERROR: org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend
>>>>> - Asked to remove non-existent executor 2
>>>>> INFO : org.apache.spark.scheduler.TaskSetManager - Lost task 2.3 in
>>>>> stage 103.0 (TID 423) on executor 10.252.5.62:
>>>>> org.apache.spark.SparkException (Could not read data from write ahead log
>>>>> record FileBasedWriteAheadLogSegment(tachyon-ft://
>>>>> 10.252.5.113:19998/tachyon/checkpoint/receivedData/2/log-1431341091711-1431341151711,645603894,10891919))
>>>>> [duplicate 2]
>>>>> ERROR: org.apache.spark.scheduler.TaskSetManager - Task 2 in stage
>>>>> 103.0 failed 4 times; aborting job
>>>>> INFO : org.apache.spark.scheduler.TaskSchedulerImpl - Removed TaskSet
>>>>> 103.0, whose tasks have all completed, from pool
>>>>> INFO : org.apache.spark.scheduler.TaskSchedulerImpl - Cancelling stage
>>>>> 103
>>>>> INFO : org.apache.spark.scheduler.DAGScheduler - ResultStage 103
>>>>> (foreachRDD at Consumer.java:92) failed in 0.943 s
>>>>> INFO : org.apache.spark.scheduler.DAGScheduler - Job 120 failed:
>>>>> foreachRDD at Consumer.java:92, took 0.953482 s
>>>>> ERROR: org.apache.spark.streaming.scheduler.JobScheduler - Error
>>>>> running job streaming job 1431341145000 ms.0
>>>>> org.apache.spark.SparkException: Job aborted due to stage failure:
>>>>> Task 2 in stage 103.0 failed 4 times, most recent failure: Lost task 2.3 in
>>>>> stage 103.0 (TID 423, 10.252.5.62): org.apache.spark.SparkException: Could
>>>>> not read data from write ahead log record
>>>>> FileBasedWriteAheadLogSegment(tachyon-ft://
>>>>> 10.252.5.113:19998/tachyon/checkpoint/receivedData/2/log-1431341091711-1431341151711,645603894,10891919
>>>>> )
>>>>> at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org
>>>>> $apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:144)
>>>>> at
>>>>> org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:168)
>>>>> at
>>>>> org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:168)
>>>>> at scala.Option.getOrElse(Option.scala:120)
>>>>> at
>>>>> org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.compute(WriteAheadLogBackedBlockRDD.scala:168)
>>>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>>> at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
>>>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
>>>>> at org.apache.spark.scheduler.Task.run(Task.scala:70)
>>>>> at
>>>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>>>>> at
>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>> at
>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>> at java.lang.Thread.run(Thread.java:744)
>>>>> Caused by: java.lang.IllegalArgumentException: Seek position is past
>>>>> EOF: 645603894, fileSize = 0
>>>>> at
>>>>> tachyon.hadoop.HdfsFileInputStream.seek(HdfsFileInputStream.java:239)
>>>>> at
>>>>> org.apache.hadoop.fs.FSDataInputStream.seek(FSDataInputStream.java:37)
>>>>> at
>>>>> org.apache.spark.streaming.util.FileBasedWriteAheadLogRandomReader.read(FileBasedWriteAheadLogRandomReader.scala:37)
>>>>> at
>>>>> org.apache.spark.streaming.util.FileBasedWriteAheadLog.read(FileBasedWriteAheadLog.scala:104)
>>>>> at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org
>>>>> $apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:141)
>>>>> ... 15 more
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Fri, May 8, 2015 at 11:03 PM, Haoyuan Li <ha...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Thanks for the updates!
>>>>>>
>>>>>> Best,
>>>>>>
>>>>>> Haoyuan
>>>>>>
>>>>>> On Fri, May 8, 2015 at 8:40 AM, Dibyendu Bhattacharya <
>>>>>> dibyendu.bhattachary@gmail.com> wrote:
>>>>>>
>>>>>>> Just a followup on this Thread .
>>>>>>>
>>>>>>> I tried Hierarchical Storage on Tachyon (
>>>>>>> http://tachyon-project.org/Hierarchy-Storage-on-Tachyon.html ) ,
>>>>>>> and that
>>>>>>> seems to have worked and I did not see any any Spark Job failed due
>>>>>>> to
>>>>>>> BlockNotFoundException.
>>>>>>> below is my  Hierarchical Storage settings..
>>>>>>>
>>>>>>>   -Dtachyon.worker.hierarchystore.level.max=2
>>>>>>>   -Dtachyon.worker.hierarchystore.level0.alias=MEM
>>>>>>>
>>>>>>> -Dtachyon.worker.hierarchystore.level0.dirs.path=$TACHYON_RAM_FOLDER
>>>>>>>
>>>>>>>
>>>>>>> -Dtachyon.worker.hierarchystore.level0.dirs.quota=$TACHYON_WORKER_MEMORY_SIZE
>>>>>>>   -Dtachyon.worker.hierarchystore.level1.alias=HDD
>>>>>>>   -Dtachyon.worker.hierarchystore.level1.dirs.path=/mnt/tachyon
>>>>>>>   -Dtachyon.worker.hierarchystore.level1.dirs.quota=50GB
>>>>>>>   -Dtachyon.worker.allocate.strategy=MAX_FREE
>>>>>>>   -Dtachyon.worker.evict.strategy=LRU
>>>>>>>
>>>>>>> Regards,
>>>>>>> Dibyendu
>>>>>>>
>>>>>>> On Thu, May 7, 2015 at 1:46 PM, Dibyendu Bhattacharya <
>>>>>>> dibyendu.bhattachary@gmail.com> wrote:
>>>>>>>
>>>>>>> > Dear All ,
>>>>>>> >
>>>>>>> > I have been playing with Spark Streaming on Tachyon as the
>>>>>>> OFF_HEAP block
>>>>>>> > store  . Primary reason for evaluating Tachyon is to find if
>>>>>>> Tachyon can
>>>>>>> > solve the Spark BlockNotFoundException .
>>>>>>> >
>>>>>>> > In traditional MEMORY_ONLY StorageLevel, when blocks are evicted ,
>>>>>>> jobs
>>>>>>> > failed due to block not found exception and storing blocks in
>>>>>>> > MEMORY_AND_DISK is not a good option either as it impact the
>>>>>>> throughput a
>>>>>>> > lot .
>>>>>>> >
>>>>>>> >
>>>>>>> > To test how Tachyon behave , I took the latest spark 1.4 from
>>>>>>> master , and
>>>>>>> > used Tachyon 0.6.4 and configured Tachyon in Fault Tolerant Mode .
>>>>>>> Tachyon
>>>>>>> > is running in 3 Node AWS x-large cluster and Spark is running in 3
>>>>>>> node AWS
>>>>>>> > x-large cluster.
>>>>>>> >
>>>>>>> > I have used the low level Receiver based Kafka consumer (
>>>>>>> > https://github.com/dibbhatt/kafka-spark-consumer)  which I have
>>>>>>> written
>>>>>>> > to pull from Kafka and write Blocks to Tachyon
>>>>>>> >
>>>>>>> >
>>>>>>> > I found there is similar improvement in throughput (as MEMORY_ONLY
>>>>>>> case )
>>>>>>> > but very good overall memory utilization (as it is off heap store)
>>>>>>> .
>>>>>>> >
>>>>>>> >
>>>>>>> > But I found one issue on which I need to clarification .
>>>>>>> >
>>>>>>> >
>>>>>>> > In Tachyon case also , I find  BlockNotFoundException  , but due
>>>>>>> to a
>>>>>>> > different reason .  What I see TachyonBlockManager.scala put the
>>>>>>> blocks in
>>>>>>> > WriteType.TRY_CACHE configuration . And because of this Blocks ate
>>>>>>> evicted
>>>>>>> > from Tachyon Cache and when Spark try to find the block it throws
>>>>>>> >  BlockNotFoundException .
>>>>>>> >
>>>>>>> > I see a pull request which discuss the same ..
>>>>>>> >
>>>>>>> > https://github.com/apache/spark/pull/158#discussion_r11195271
>>>>>>> >
>>>>>>> >
>>>>>>> > When I modified the WriteType to CACHE_THROUGH ,
>>>>>>> BlockDropException is
>>>>>>> > gone , but it again impact the throughput ..
>>>>>>> >
>>>>>>> >
>>>>>>> > Just curious to know , if Tachyon has any settings which can solve
>>>>>>> the
>>>>>>> > Block Eviction from Cache to Disk, other than explicitly setting
>>>>>>> > CACHE_THROUGH  ?
>>>>>>> >
>>>>>>> > Regards,
>>>>>>> > Dibyendu
>>>>>>> >
>>>>>>> >
>>>>>>> >
>>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Haoyuan Li
>>>>>> CEO, Tachyon Nexus <http://www.tachyonnexus.com/>
>>>>>> AMPLab, EECS, UC Berkeley http://www.cs.berkeley.edu/~haoyuan/
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>