Posted to user@spark.apache.org by Arijit <Ar...@live.com> on 2016/11/07 22:04:41 UTC

Spark Streaming Data loss on failure to write BlockAdditionEvent failure to WAL

Hello All,


We are using Spark 1.6.2 with WAL enabled and encountering data loss when the following exception/warning happens. We are using HDFS as our checkpoint directory.


Questions are:


1. Is this a bug in Spark or an issue with our configuration? The source looks like the following. Which file already exists, and who is supposed to set the hdfs.append.support configuration? Why doesn't it happen all the time?


package org.apache.spark.streaming.util

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FSDataOutputStream, Path, RawLocalFileSystem}

private[streaming] object HdfsUtils {

  def getOutputStream(path: String, conf: Configuration): FSDataOutputStream = {
    val dfsPath = new Path(path)
    val dfs = getFileSystemForPath(dfsPath, conf)
    // If the file exists and we have append support, append instead of creating a new file
    val stream: FSDataOutputStream = {
      if (dfs.isFile(dfsPath)) {
        if (conf.getBoolean("hdfs.append.support", false) || dfs.isInstanceOf[RawLocalFileSystem]) {
          dfs.append(dfsPath)
        } else {
          throw new IllegalStateException("File exists and there is no append support!")
        }
      } else {
        dfs.create(dfsPath)
      }
    }
    stream
  }

  // getFileSystemForPath and the other helpers of this object are omitted here
}
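
To make the intermittent behaviour easier to reason about, the branch in getOutputStream can be reduced to a small truth table. The following is a minimal, dependency-free sketch and not Spark code: WalOpenModel, Action and decide are hypothetical names, and the three booleans stand in for dfs.isFile(dfsPath), the hdfs.append.support flag and the RawLocalFileSystem check.

```scala
// Simplified model of the branch in HdfsUtils.getOutputStream.
// All names here are hypothetical; nothing below is part of Spark.
object WalOpenModel {
  sealed trait Action
  case object Create extends Action // dfs.create(dfsPath)
  case object Append extends Action // dfs.append(dfsPath)
  case object Fail extends Action   // IllegalStateException

  def decide(fileExists: Boolean, appendFlag: Boolean, isRawLocalFs: Boolean): Action =
    if (!fileExists) Create
    else if (appendFlag || isRawLocalFs) Append
    else Fail

  def main(args: Array[String]): Unit = {
    // New log file: always created, regardless of the flag.
    println(decide(fileExists = false, appendFlag = false, isRawLocalFs = false))
    // Existing log file with the flag set: appended to.
    println(decide(fileExists = true, appendFlag = true, isRawLocalFs = false))
    // Existing log file, flag unset, not a local file system: the failing case.
    println(decide(fileExists = true, appendFlag = false, isRawLocalFs = false))
  }
}
```

Only the combination "file already exists" and "flag unset on a non-local file system" reaches the exception, which would be consistent with the error showing up only when an existing log file is reopened.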


2. Why does the job not retry and eventually fail when this error occurs? The job skips processing exactly the number of events dumped in the log. In this particular example, 987 + 4686 events were not processed and are lost forever (they are not recovered even on restart).



16/11/07 21:23:39 ERROR WriteAheadLogManager  for Thread: Failed to write to write ahead log after 3 failures
16/11/07 21:23:39 WARN BatchedWriteAheadLog: BatchedWriteAheadLog Writer failed to write ArrayBuffer(Record(java.nio.HeapByteBuffer[pos=1212 lim=1212 cap=1212],1478553818985,scala.concurrent.impl.Promise$DefaultPromise@5ce88cb6), Record(java.nio.HeapByteBuffer[pos=1212 lim=1212 cap=1212],1478553818985,scala.concurrent.impl.Promise$DefaultPromise@6d8f1feb))
java.lang.IllegalStateException: File exists and there is no append support!
        at org.apache.spark.streaming.util.HdfsUtils$.getOutputStream(HdfsUtils.scala:35)
        at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream$lzycompute(FileBasedWriteAheadLogWriter.scala:33)
        at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream(FileBasedWriteAheadLogWriter.scala:33)
        at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.<init>(FileBasedWriteAheadLogWriter.scala:41)
        at org.apache.spark.streaming.util.FileBasedWriteAheadLog.getLogWriter(FileBasedWriteAheadLog.scala:217)
        at org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:86)
        at org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:48)
        at org.apache.spark.streaming.util.BatchedWriteAheadLog.org$apache$spark$streaming$util$BatchedWriteAheadLog$$flushRecords(BatchedWriteAheadLog.scala:173)
        at org.apache.spark.streaming.util.BatchedWriteAheadLog$$anon$1.run(BatchedWriteAheadLog.scala:140)
        at java.lang.Thread.run(Thread.java:745)
16/11/07 21:23:39 WARN ReceivedBlockTracker: Exception thrown while writing record: BlockAdditionEvent(ReceivedBlockInfo(2,Some(987),None,WriteAheadLogBasedStoreResult(input-2-1478553647101,Some(987),FileBasedWriteAheadLogSegment(hdfs://mycluster/EventCheckpoint-30-8-16-3/receivedData/2/log-1478553818621-1478553878621,0,41597)))) to the WriteAheadLog.
java.lang.IllegalStateException: File exists and there is no append support!
        at org.apache.spark.streaming.util.HdfsUtils$.getOutputStream(HdfsUtils.scala:35)
        at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream$lzycompute(FileBasedWriteAheadLogWriter.scala:33)
        at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream(FileBasedWriteAheadLogWriter.scala:33)
        at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.<init>(FileBasedWriteAheadLogWriter.scala:41)
        at org.apache.spark.streaming.util.FileBasedWriteAheadLog.getLogWriter(FileBasedWriteAheadLog.scala:217)
        at org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:86)
        at org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:48)
        at org.apache.spark.streaming.util.BatchedWriteAheadLog.org$apache$spark$streaming$util$BatchedWriteAheadLog$$flushRecords(BatchedWriteAheadLog.scala:173)
        at org.apache.spark.streaming.util.BatchedWriteAheadLog$$anon$1.run(BatchedWriteAheadLog.scala:140)
        at java.lang.Thread.run(Thread.java:745)
16/11/07 21:23:39 WARN ReceivedBlockTracker: Exception thrown while writing record: BlockAdditionEvent(ReceivedBlockInfo(3,Some(4686),None,WriteAheadLogBasedStoreResult(input-3-1478553647042,Some(4686),FileBasedWriteAheadLogSegment(hdfs://mycluster/EventCheckpoint-30-8-16-3/receivedData/3/log-1478553818624-1478553878624,0,197473)))) to the WriteAheadLog.
java.lang.IllegalStateException: File exists and there is no append support!
        at org.apache.spark.streaming.util.HdfsUtils$.getOutputStream(HdfsUtils.scala:35)
        at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream$lzycompute(FileBasedWriteAheadLogWriter.scala:33)
        at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream(FileBasedWriteAheadLogWriter.scala:33)
        at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.<init>(FileBasedWriteAheadLogWriter.scala:41)
        at org.apache.spark.streaming.util.FileBasedWriteAheadLog.getLogWriter(FileBasedWriteAheadLog.scala:217)
        at org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:86)
        at org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:48)
        at org.apache.spark.streaming.util.BatchedWriteAheadLog.org$apache$spark$streaming$util$BatchedWriteAheadLog$$flushRecords(BatchedWriteAheadLog.scala:173)
        at org.apache.spark.streaming.util.BatchedWriteAheadLog$$anon$1.run(BatchedWriteAheadLog.scala:140)
        at java.lang.Thread.run(Thread.java:745)
16/11/07 21:23:39 INFO BlockManagerInfo: Added input-3-1478553647043 in memory on 10.0.0.11:42316 (size: 283.1 KB, free: 2.6 GB)
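
The record counts in the warnings above can be totalled mechanically. This is a rough sketch, assuming each log entry sits on a single line; LostRecordCount and lostRecords are hypothetical names, and the regular expression is inferred only from the two ReceivedBlockTracker warnings shown here, so it may not match other Spark versions.

```scala
// Sums the record counts of BlockAdditionEvents that failed to reach the WAL.
// Hypothetical helper; the pattern is derived from the two warnings in this post.
object LostRecordCount {
  // Captures <count> in "ReceivedBlockInfo(<streamId>,Some(<count>)"
  private val Count = """ReceivedBlockInfo\(\d+,Some\((\d+)\)""".r

  def lostRecords(logLines: Seq[String]): Long =
    logLines
      .filter(_.contains("Exception thrown while writing record: BlockAdditionEvent"))
      .flatMap(line => Count.findFirstMatchIn(line).map(_.group(1).toLong))
      .sum

  def main(args: Array[String]): Unit = {
    // The tails of the two warnings are cut off here ("...") to keep the sample short.
    val sample = Seq(
      "16/11/07 21:23:39 WARN ReceivedBlockTracker: Exception thrown while writing record: BlockAdditionEvent(ReceivedBlockInfo(2,Some(987),None,...",
      "16/11/07 21:23:39 WARN ReceivedBlockTracker: Exception thrown while writing record: BlockAdditionEvent(ReceivedBlockInfo(3,Some(4686),None,...",
      "16/11/07 21:23:39 INFO BlockManagerInfo: Added input-3-1478553647043 in memory"
    )
    println(lostRecords(sample)) // 987 + 4686 = 5673
  }
}
```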

I am sure Spark Streaming is not expected to lose data when WAL is enabled. So what are we doing wrong here?

Thanks, Arijit


Re: Spark Streaming Data loss on failure to write BlockAdditionEvent failure to WAL

Posted by Dirceu Semighini Filho <di...@gmail.com>.
Nice, thank you. I'll test this property to see if the error stops.


2016-11-17 14:48 GMT-02:00 Arijit <Ar...@live.com>:

> Hi Dirceu,
>
>
> For the append issue we are setting "hdfs.append.support" (from Spark code
> which reads HDFS configuration) to "true" in hdfs-site.xml and that seemed
> to have solved the issue. Of course we are using HDFS which does support
> append. I think the actual configuration Spark should check is
> "dfs.support.append".
>
>
> I believe failure is intermittent since in most cases a new file is
> created to store the block addition event. I need to look into the code
> again to see when these files are created new and when they are appended.
>
>
> Thanks, Arijit
>
>
> ------------------------------
> *From:* Dirceu Semighini Filho <di...@gmail.com>
> *Sent:* Thursday, November 17, 2016 6:50:28 AM
> *To:* Arijit
> *Cc:* Tathagata Das; user@spark.apache.org
>
> *Subject:* Re: Spark Streaming Data loss on failure to write
> BlockAdditionEvent failure to WAL
>
> Hi Arijit,
> Have you found a solution for this? I'm facing the same problem in Spark
> 1.6.1, but here the error happens only occasionally, so our HDFS does
> support append.
> This is what I can see in the logs:
> 2016-11-17 13:43:20,012 ERROR [BatchedWriteAheadLog Writer]
> WriteAheadLogManager  for Thread: Failed to write to write ahead log after
> 3 failures
>
>
>
>
> 2016-11-08 14:47 GMT-02:00 Arijit <Ar...@live.com>:
>
>> Thanks TD.
>>
>>
>> Is "hdfs.append.support" a standard configuration? I see a seemingly
>> equivalent configuration "dfs.support.append" that is used in our
>> version of HDFS.
>>
>>
>> In case we want to use a pseudo file system (like S3) that does not
>> support append, what are our options? I am not familiar with the code yet,
>> but is it possible to generate a new file whenever a conflict of this sort
>> happens?
>>
>>
>> Thanks again, Arijit
>> ------------------------------
>> *From:* Tathagata Das <ta...@gmail.com>
>> *Sent:* Monday, November 7, 2016 7:59:06 PM
>> *To:* Arijit
>> *Cc:* user@spark.apache.org
>> *Subject:* Re: Spark Streaming Data loss on failure to write
>> BlockAdditionEvent failure to WAL
>>
>> For WAL in Spark to work with HDFS, the HDFS version you are running must
>> support file appends. Contact your HDFS package/installation provider to
>> figure out whether this is supported by your HDFS installation.

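On the question raised earlier in the thread about file systems without append support (such as S3), the idea of "generate a new file whenever a conflict happens" could look roughly like the sketch below. It is only an illustration on the local file system via java.nio: FreshLogFile and firstFreePath are hypothetical names, this is not what Spark does, and Spark's own recovery code may not recognise log files with such suffixed names.

```scala
import java.nio.file.{Files, Path}

// Hypothetical helper: pick a file name that does not exist yet instead of appending.
object FreshLogFile {
  // Returns dir/baseName if it is free, otherwise the first free one of
  // dir/baseName.1, dir/baseName.2, ...
  def firstFreePath(dir: Path, baseName: String): Path = {
    val candidates = Iterator(baseName) ++ Iterator.from(1).map(i => s"$baseName.$i")
    candidates.map(name => dir.resolve(name)).find(p => !Files.exists(p)).get
  }

  def main(args: Array[String]): Unit = {
    val dir = Files.createTempDirectory("wal-sketch")
    val first = firstFreePath(dir, "log-1478553818621-1478553878621")
    Files.createFile(first)
    // The base name is now taken, so the next call yields the ".1" variant.
    println(firstFreePath(dir, "log-1478553818621-1478553878621").getFileName)
  }
}
```

Note that the exists-then-create sequence is not atomic, so two concurrent writers could still pick the same name; a real implementation would need to handle that case.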
Re: Spark Streaming Data loss on failure to write BlockAdditionEvent failure to WAL

Posted by Arijit <Ar...@live.com>.
Hi Dirceu,


For the append issue, we set "hdfs.append.support" (the key that the Spark code reads from the HDFS configuration) to "true" in hdfs-site.xml, and that seems to have solved the issue. Of course, we are using HDFS, which does support append. I think the configuration Spark should actually check is "dfs.support.append".
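
As an alternative to editing hdfs-site.xml, the same key can probably be passed from the application side. The sketch below assumes that Spark copies properties prefixed with "spark.hadoop." into the Hadoop Configuration it builds, and that the WAL code reads that configuration; WalAppendConf and build are hypothetical names. It should only be used when the underlying file system really supports append.

```scala
import org.apache.spark.SparkConf

// Hypothetical helper that builds a SparkConf with the receiver WAL enabled
// and the append flag that HdfsUtils.getOutputStream checks.
object WalAppendConf {
  def build(appName: String): SparkConf =
    new SparkConf()
      .setAppName(appName)
      // Enable the write ahead log for receivers.
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")
      // Assumption: the "spark.hadoop." prefix forwards this key to the Hadoop Configuration.
      .set("spark.hadoop.hdfs.append.support", "true")
}
```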


I believe the failure is intermittent because in most cases a new file is created to store the block addition event. I need to look into the code again to see when these files are newly created and when they are appended to.


Thanks, Arijit



Re: Spark Streaming Data loss on failure to write BlockAdditionEvent failure to WAL

Posted by Dirceu Semighini Filho <di...@gmail.com>.
Hi Arijit,
Have you found a solution for this? I'm facing the same problem in Spark
1.6.1, but here the error happens only occasionally, so our HDFS does
support append.
This is what I can see in the logs:
2016-11-17 13:43:20,012 ERROR [BatchedWriteAheadLog Writer]
WriteAheadLogManager  for Thread: Failed to write to write ahead log after
3 failures




>> .org$apache$spark$streaming$util$FileBasedWriteAheadLogWrite
>> r$$stream(FileBasedWriteAheadLogWriter.scala:33)
>>         at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter
>> .<init>(FileBasedWriteAheadLogWriter.scala:41)
>>         at org.apache.spark.streaming.util.FileBasedWriteAheadLog.getLo
>> gWriter(FileBasedWriteAheadLog.scala:217)
>>         at org.apache.spark.streaming.util.FileBasedWriteAheadLog.write
>> (FileBasedWriteAheadLog.scala:86)
>>         at org.apache.spark.streaming.util.FileBasedWriteAheadLog.write
>> (FileBasedWriteAheadLog.scala:48)
>>         at org.apache.spark.streaming.util.BatchedWriteAheadLog.org$apa
>> che$spark$streaming$util$BatchedWriteAheadLog$$flushRecords(
>> BatchedWriteAheadLog.scala:173)
>>         at org.apache.spark.streaming.util.BatchedWriteAheadLog$$anon$
>> 1.run(BatchedWriteAheadLog.scala:140)
>>         at java.lang.Thread.run(Thread.java:745)
>> 16/11/07 21:23:39 INFO BlockManagerInfo: Added input-3-1478553647043 in
>> memory on 10.0.0.11:42316 (size: 283.1 KB, free: 2.6 GB)
>>
>> I am sure Spark Streaming is not expected to lose data when WAL is
>> enabled. So what are we doing wrong here?
>>
>> Thanks, Arijit
>>
>>
>

Re: Spark Streaming Data loss on failure to write BlockAdditionEvent failure to WAL

Posted by Arijit <Ar...@live.com>.
Thanks TD.


Is "hdfs.append.support" a standard configuration? I see a seemingly equivalent configuration "dfs.support.append" that is used in our version of HDFS.
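
On the key name: the quoted HdfsUtils.getOutputStream reads "hdfs.append.support" from the Hadoop Configuration it is handed, with a default of false, so setting "dfs.support.append" alone would not satisfy that particular check. Below is a minimal sketch of one way to set the key from the application side. It assumes the WAL code is given the SparkContext's Hadoop configuration, and that the HDFS installation really does support append; the object name, app name and local master are placeholders, not anything from this thread.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object WalAppendConfSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("wal-append-conf-sketch")      // placeholder name
      .setIfMissing("spark.master", "local[2]")  // placeholder, only for a local try-out
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")
      // Spark copies "spark.hadoop.*" entries into the Hadoop Configuration
      // with the "spark.hadoop." prefix removed.
      .set("spark.hadoop.hdfs.append.support", "true")

    val sc = new SparkContext(conf)
    try {
      // Read the value back the same way HdfsUtils does (getBoolean, default false).
      val seen = sc.hadoopConfiguration.getBoolean("hdfs.append.support", false)
      println(s"hdfs.append.support as seen by Spark: $seen")
    } finally {
      sc.stop()
    }
  }
}
```

Setting the flag only tells Spark to call append; it does not make the underlying file system support it.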


If we want to use a pseudo file system (like S3) that does not support append, what are our options? I am not familiar with the code yet, but is it possible to generate a new file whenever a conflict of this sort happens?
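
To make the "generate a new file on conflict" idea concrete, here is a small sketch using only java.nio on a local directory. It is not what Spark does: FreshLogPath and firstUnused are hypothetical names, the ".1", ".2" suffix scheme is an assumption, and any real change would also need the WAL recovery code to recognise the suffixed file names.

```scala
import java.nio.file.{Files, Path}

object FreshLogPath {
  // Hypothetical helper: return `base` if nothing exists there yet,
  // otherwise the first of base.1, base.2, ... that does not exist.
  // Note: check-then-create is racy if several writers share a directory.
  def firstUnused(base: Path): Path = {
    if (!Files.exists(base)) base
    else Iterator.from(1)
      .map(i => base.resolveSibling(s"${base.getFileName}.$i"))
      .find(p => !Files.exists(p))
      .get
  }

  def main(args: Array[String]): Unit = {
    val dir = Files.createTempDirectory("wal-sketch")
    val base = dir.resolve("log-1478553818621-1478553878621")
    Files.createFile(base)          // simulate the log file that already exists
    val next = firstUnused(base)    // pick a fresh name instead of appending
    Files.createFile(next)
    println(s"writing to ${next.getFileName} instead of ${base.getFileName}")
  }
}
```

Separately, the Spark configuration documentation lists spark.streaming.driver.writeAheadLog.closeFileAfterWrite and spark.streaming.receiver.writeAheadLog.closeFileAfterWrite for file systems such as S3 that do not support flushing. Whether they avoid this particular exception was not confirmed in this thread.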


Thanks again, Arijit

________________________________
From: Tathagata Das <ta...@gmail.com>
Sent: Monday, November 7, 2016 7:59:06 PM
To: Arijit
Cc: user@spark.apache.org
Subject: Re: Spark Streaming Data loss on failure to write BlockAdditionEvent failure to WAL

For WAL in Spark to work with HDFS, the HDFS version you are running must support file appends. Contact your HDFS package/installation provider to figure out whether this is supported by your HDFS installation.


Re: Spark Streaming Data loss on failure to write BlockAdditionEvent failure to WAL

Posted by Tathagata Das <ta...@gmail.com>.
For WAL in Spark to work with HDFS, the HDFS version you are running must
support file appends. Contact your HDFS package/installation provider to
figure out whether this is supported by your HDFS installation.
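
One way to check append support empirically, in addition to asking the provider, is to try an append against a scratch file. The sketch below uses the Hadoop FileSystem API for that. AppendProbe and supportsAppend are hypothetical names, the default "/tmp" directory is a placeholder, and it assumes the cluster's Hadoop configuration files are on the classpath (otherwise pass a full hdfs:// URI as the argument).

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import scala.util.Try

object AppendProbe {
  // Create a scratch file, try to append one byte to it, then clean up.
  // Returns true only if the append call and the write both succeed.
  def supportsAppend(fs: FileSystem, dir: Path): Boolean = {
    val probe = new Path(dir, s"append-probe-${System.nanoTime()}")
    fs.create(probe).close()
    try {
      Try {
        val out = fs.append(probe)
        try out.write(1) finally out.close()
      }.isSuccess
    } finally {
      fs.delete(probe, false) // non-recursive delete of the scratch file
    }
  }

  def main(args: Array[String]): Unit = {
    val dir = new Path(args.headOption.getOrElse("/tmp")) // placeholder default
    val fs = dir.getFileSystem(new Configuration())
    println(s"append works on ${fs.getUri}: ${supportsAppend(fs, dir)}")
  }
}
```

A successful probe only shows that the file system accepts appends; the quoted HdfsUtils code still checks the "hdfs.append.support" flag before it attempts one.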
