Posted to dev@spark.apache.org by Sam Elamin <hu...@gmail.com> on 2017/02/07 16:35:08 UTC

Structured Streaming. Dropping Duplicates

Hi All

When I try to read a stream off S3 and drop duplicates, I get the
following error:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Append
output mode not supported when there are streaming aggregations on
streaming DataFrames/DataSets;;


What's strange is that if I use the batch reader, "spark.read.json", it works.

Can I assume you can't drop duplicates in Structured Streaming?

Regards
Sam
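
On a stream, each micro-batch contains only newly arrived rows. Dropping duplicates therefore means remembering every key seen in earlier batches, which makes it a stateful operation. The error message suggests this Spark version treats it as a streaming aggregation. The plain-Scala sketch below shows the state such an operator has to carry. It does not use Spark; `Deduplicator` and its key function are hypothetical names for illustration only.

```scala
import scala.collection.mutable

// Hypothetical illustration, not Spark's implementation: deduplicating a
// stream means carrying the set of already-seen keys from one micro-batch
// to the next, so the operator is stateful.
final class Deduplicator[K, R](key: R => K) {
  private val seen = mutable.HashSet.empty[K]

  // Keeps only records whose key has not appeared in this batch or any
  // earlier one (set.add returns false when the key is already present).
  def process(batch: Seq[R]): Seq[R] =
    batch.filter(r => seen.add(key(r)))

  // Number of keys held in state. With nothing to bound it (such as a
  // watermark), this grows for as long as the stream runs.
  def stateSize: Int = seen.size
}
```

As far as I know, SPARK-19497 added streaming support for `dropDuplicates` in Spark 2.2. In that version, pairing it with `withWatermark` lets Spark discard keys older than the watermark, so this state does not grow forever. Check the Structured Streaming guide for your version before relying on this.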

Re: Structured Streaming. Dropping Duplicates

Posted by Sam Elamin <hu...@gmail.com>.
Ignore me; after a bit more digging I was able to find the file sink source:
<https://github.com/apache/spark/blob/1ae4652b7e1f77a984b8459c778cb06c814192c5/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala>

Following that pattern worked a treat!

Thanks again Michael :)
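
As I read FileStreamSink, the pattern works like this:

1. Find the latest batch ID in the sink's own log.
2. Treat any `addBatch` call at or below that ID as a replay and skip it.
3. Otherwise, write the output first and append the batch to the log last, so the log entry is the commit point.

Here is a plain-Scala sketch of that idea. `LogCommittedSink` and its in-memory log are hypothetical stand-ins, not Spark's `Sink` trait or its metadata log classes.

```scala
import scala.collection.mutable

// Hypothetical sketch of a log-committed sink. The log entry written at the
// end of addBatch is the commit point. A real sink would keep this log on
// durable storage so that it survives a restart.
final class LogCommittedSink {
  // batchId -> names of the output files committed for that batch
  private val commitLog = mutable.TreeMap.empty[Long, Seq[String]]
  private val written = mutable.ArrayBuffer.empty[String]

  def latestCommitted: Long =
    if (commitLog.isEmpty) -1L else commitLog.lastKey

  // Returns true if the batch was written, false if it was skipped as a replay.
  def addBatch(batchId: Long, rows: Seq[String]): Boolean = {
    if (batchId <= latestCommitted) {
      false // already committed before the restart: do nothing
    } else {
      val file = s"part-$batchId"
      written ++= rows.map(r => s"$file:$r") // write the data out first...
      commitLog(batchId) = Seq(file)         // ...then record the commit
      true
    }
  }

  def committedFiles: Seq[String] = commitLog.values.flatten.toSeq
  def rowsWritten: Seq[String] = written.toSeq
}
```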

Re: Structured Streaming. Dropping Duplicates

Posted by Sam Elamin <hu...@gmail.com>.
Sorry, those are methods I wrote, so you can ignore them :)

So just adding a path parameter tells Spark that's where the update log is?

Do I check for the unique ID there and identify which batches were written and
which weren't?

Are there any examples of this out there? There aren't many connectors in
the wild that I can reimplement, are there?
Should I look at how the file sink is set up and follow that pattern?


Regards
Sam

Re: Structured Streaming. Dropping Duplicates

Posted by Michael Armbrust <mi...@databricks.com>.
The JSON log is only used by the file sink (which it doesn't seem like you
are using).  Though, I'm not sure exactly what is going on inside of
setUpGoogle or how tableReferenceSource is used.

Typically you would run df.writeStream.option("path", "/my/path")... and
then the transaction log would go into /my/path/_spark_metadata.

There is no requirement that a sink uses this kind of update log.  This
is just how we get better transactional semantics than HDFS provides.
If your sink supports transactions natively, you should just use those
instead.  We pass a unique ID to the sink method addBatch so that you can
make sure you don't commit the same transaction more than once.
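
This is a minimal sketch of the "native transactions" option. It assumes a store that can atomically write rows together with a small metadata record. The sink commits a batch's rows and its batch ID in the same transaction, and checks the stored ID before writing. `TransactionalStore`, `Snapshot`, and `TxSink` are hypothetical. The in-memory "transaction" simply swaps in a new immutable snapshot under a lock.

```scala
// Hypothetical store state: table rows plus the last committed batch ID,
// always replaced together so the two can never disagree.
final case class Snapshot(rows: Vector[String], lastBatchId: Long)

final class TransactionalStore {
  @volatile private var current = Snapshot(Vector.empty, -1L)

  def read: Snapshot = current

  // Applies f to the current snapshot and installs the result in one step,
  // standing in for BEGIN ... COMMIT in a real database.
  def transaction(f: Snapshot => Snapshot): Unit = synchronized {
    current = f(current)
  }
}

final class TxSink(store: TransactionalStore) {
  // Rows and batch ID are committed together; a replayed batch ID is a no-op.
  def addBatch(batchId: Long, rows: Seq[String]): Unit =
    store.transaction { snap =>
      if (batchId <= snap.lastBatchId) snap
      else Snapshot(snap.rows ++ rows, batchId)
    }
}
```

A replayed batch arrives with the same ID, finds that ID already committed, and leaves the table unchanged.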

Re: Structured Streaming. Dropping Duplicates

Posted by Sam Elamin <hu...@gmail.com>.
Hi Michael,

If that's the case, for the example below, where should I be reading these
JSON log files first? I'm assuming at some point between df and query?


val df = spark
  .readStream
  .option("tableReferenceSource", tableName)
  .load()

// setUpGoogle is a user-defined helper, not a Spark API
setUpGoogle(spark.sqlContext)

val query = df
  .writeStream
  .option("tableReferenceSink", tableName2)
  .option("checkpointLocation", "checkpoint")
  .start()


Re: Structured Streaming. Dropping Duplicates

Posted by Michael Armbrust <mi...@databricks.com>.
Read the JSON log of files that is in `/your/path/_spark_metadata` and only
read files that are present in that log (ignore anything else).
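
A consumer outside Spark could apply this rule roughly as follows. The sketch assumes each file under `_spark_metadata` starts with a version line, followed by one JSON object per line that carries a `path` field. That is my understanding of the file sink's log format, so verify it against your Spark version. The regex keeps the sketch dependency-free; a real consumer should use a proper JSON parser. `MetadataLogFilter` is a hypothetical name.

```scala
// Hypothetical sketch: assumes each _spark_metadata log file is a version
// header line followed by one JSON object per line with a "path" field.
object MetadataLogFilter {
  private val PathField = "\"path\"\\s*:\\s*\"([^\"]+)\"".r

  // Collects every path mentioned in the given log lines. Lines without a
  // path field (such as the version header) are ignored.
  def committedPaths(logLines: Seq[String]): Set[String] =
    logLines.flatMap(line => PathField.findFirstMatchIn(line).map(_.group(1))).toSet

  // Keeps only listed files that the log records as committed. Anything else
  // in the directory (e.g. output of a failed attempt) is skipped.
  def validFiles(listing: Seq[String], logLines: Seq[String]): Seq[String] = {
    val committed = committedPaths(logLines)
    listing.filter(committed.contains)
  }
}
```

This sketch does not handle log entries that mark files as deleted.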

Re: Structured Streaming. Dropping Duplicates

Posted by Sam Elamin <hu...@gmail.com>.
Ah, I see. OK, so it's probably the retry that's causing it.

So when you say I'll have to take this into account, how do I best do that?
My sink will have to know which file was the extra one. And I was under the
impression Spark would automagically know this because of the checkpoint
directory set when you create the writeStream.

If that's not the case, how would I go about ensuring no duplicates?


Thanks again for the awesome support!

Regards
Sam
On Tue, 7 Feb 2017 at 18:05, Michael Armbrust <mi...@databricks.com>
wrote:

> Sorry, I think I was a little unclear.  There are two things at play here.
>
>  - Exactly-once semantics with file output: Spark writes out extra
> metadata on which files are valid to ensure that failures don't cause us to
> "double count" any of the input.  Spark 2.0+ detects this info
> automatically when you use the DataFrame reader (spark.read...). There may be
> extra files, but they will be ignored. If you are consuming the output with
> another system, you'll have to take this into account.
>  - Retries: right now we always retry the last batch when restarting.
> This is safe/correct because of the above, but we could also optimize this
> away by tracking more information about batch progress.
>
> On Tue, Feb 7, 2017 at 12:25 PM, Sam Elamin <hu...@gmail.com>
> wrote:
>
> Hmm, OK, I understand that, but the job runs for a good few minutes before
> I kill it, so there should not be any jobs left: I can see in the log
> that it's now polling for new changes and the latest offset is the right one.
>
> After I kill it and relaunch, it picks up that same file?
>
>
> Sorry if I misunderstood you
>
> On Tue, Feb 7, 2017 at 5:20 PM, Michael Armbrust <mi...@databricks.com>
> wrote:
>
> It is always possible that there will be extra jobs from failed batches.
> However, for the file sink, only one set of files will make it into
> _spark_metadata directory log.  This is how we get atomic commits even when
> there are files in more than one directory.  When reading the files with
> Spark, we'll detect this directory and use it instead of listStatus to find
> the list of valid files.
>
> On Tue, Feb 7, 2017 at 9:05 AM, Sam Elamin <hu...@gmail.com>
> wrote:
>
> On another note, when it comes to checkpointing in Structured Streaming:
>
> I noticed that if I have a stream running off S3 and I kill the process,
> the next time the process starts running it duplicates the last record
> inserted. Is that normal?
>
> So say I have streaming enabled on one folder "test" which only has two
> files, "update1" and "update 2", and I kill the Spark job using Ctrl+C.
> When I rerun the stream, it picks up "update 2" again.
>
> Is this normal? Isn't Ctrl+C a failure?
>
> I would expect checkpointing to know that update 2 was already processed.
>
> Regards
> Sam
>
> On Tue, Feb 7, 2017 at 4:58 PM, Sam Elamin <hu...@gmail.com>
> wrote:
>
> Thanks Michael!
>
>
>
> On Tue, Feb 7, 2017 at 4:49 PM, Michael Armbrust <mi...@databricks.com>
> wrote:
>
> Here's a JIRA: https://issues.apache.org/jira/browse/SPARK-19497
>
> We should add this soon.

Re: Structured Streaming. Dropping Duplicates

Posted by Michael Armbrust <mi...@databricks.com>.
Sorry, I think I was a little unclear.  There are two things at play here.

 - Exactly-once semantics with file output: Spark writes out extra metadata
on which files are valid to ensure that failures don't cause us to "double
count" any of the input.  Spark 2.0+ detects this info automatically when
you use the DataFrame reader (spark.read...). There may be extra files, but
they will be ignored. If you are consuming the output with another system,
you'll have to take this into account.
 - Retries: right now we always retry the last batch when restarting.  This
is safe/correct because of the above, but we could also optimize this away
by tracking more information about batch progress.
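
To make the retry point concrete, here is a toy Python model of why replaying the last batch is harmless for a sink that remembers which batch IDs it has committed. The classes and the `run` driver are hypothetical, not Spark's API; the sink is only loosely modeled on the file sink's metadata log, and the file names mirror Sam's "update1"/"update 2" example. A custom sink that keeps no such log would need an equivalent check of its own, otherwise the retried batch shows up twice in its output.

```python
# Toy model (hypothetical classes, not Spark's API) of why retrying the
# last batch on restart does not duplicate output in an idempotent sink.


class ToyFileSink:
    """Records committed batches, like a metadata log keyed by batch ID."""

    def __init__(self):
        self.committed = {}  # batch_id -> records written for that batch
        self.writes = 0  # number of batches actually written

    def latest_committed(self):
        return max(self.committed, default=-1)

    def add_batch(self, batch_id, records):
        # A retried batch that was already committed is skipped.
        if batch_id <= self.latest_committed():
            return False
        self.committed[batch_id] = list(records)
        self.writes += 1
        return True

    def output(self):
        # Readers see committed data only, in batch order.
        return [r for b in sorted(self.committed) for r in self.committed[b]]


def run(sink, planned_batches, start_batch):
    """Hypothetical driver: (re)execute planned batches from start_batch."""
    for batch_id in range(start_batch, len(planned_batches)):
        sink.add_batch(batch_id, planned_batches[batch_id])


planned = [["update1"], ["update 2"]]  # batch 0 and batch 1
sink = ToyFileSink()
run(sink, planned, start_batch=0)
# Simulated restart after Ctrl+C: the engine retries the last batch.
run(sink, planned, start_batch=len(planned) - 1)

print(sink.output())  # ['update1', 'update 2']
print(sink.writes)  # 2
```

The source still re-reads "update 2" on restart; it is the sink-side check that keeps the output from growing.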

Re: Structured Streaming. Dropping Duplicates

Posted by Sam Elamin <hu...@gmail.com>.
Hmm, OK, I understand that, but the job runs for a good few minutes before
I kill it, so there should not be any jobs left: I can see in the log that
it's now polling for new changes, and the latest offset is the right one.

After I kill it and relaunch it, it picks up that same file?

Sorry if I misunderstood you.

Re: Structured Streaming. Dropping Duplicates

Posted by Michael Armbrust <mi...@databricks.com>.
It is always possible that there will be extra jobs from failed batches.
However, for the file sink, only one set of files will make it into the
_spark_metadata directory log.  This is how we get atomic commits even when
there are files in more than one directory.  When reading the files with
Spark, we'll detect this directory and use it instead of listStatus to find
the list of valid files.
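
A minimal sketch of the read side, under stated assumptions: the directory layout, file names, and JSON log format below are hypothetical and only borrow the `_spark_metadata` name from this thread. A failed attempt leaves an orphan data file behind, but a reader that trusts the log instead of a plain directory listing (the role `listStatus` would otherwise play) sees only committed files. Each log entry is written under a temporary name and renamed with `os.replace`, so it appears all at once.

```python
# Toy model (hypothetical layout and log format, not Spark's on-disk format):
# readers trust a per-batch log instead of listing every file in the directory.
import json
import os
import tempfile

LOG_DIR = "_spark_metadata"


def write_batch(out_dir, batch_id, attempt, rows, commit=True):
    """Write a data file; log it only if this attempt commits."""
    name = f"part-{batch_id}-{attempt}.json"
    with open(os.path.join(out_dir, name), "w") as f:
        json.dump(rows, f)
    if commit:
        log_dir = os.path.join(out_dir, LOG_DIR)
        os.makedirs(log_dir, exist_ok=True)
        tmp = os.path.join(log_dir, f".{batch_id}.tmp")
        with open(tmp, "w") as f:
            json.dump([name], f)
        # Rename so the log entry for this batch appears all at once.
        os.replace(tmp, os.path.join(log_dir, str(batch_id)))


def read_valid_files(out_dir):
    """Return committed files from the log, or fall back to a plain listing."""
    log_dir = os.path.join(out_dir, LOG_DIR)
    if not os.path.isdir(log_dir):
        return sorted(n for n in os.listdir(out_dir) if n.endswith(".json"))
    names = []
    entries = [e for e in os.listdir(log_dir) if e.isdigit()]
    for entry in sorted(entries, key=int):
        with open(os.path.join(log_dir, entry)) as f:
            names.extend(json.load(f))
    return names


with tempfile.TemporaryDirectory() as d:
    write_batch(d, 0, attempt=0, rows=[1, 2])
    write_batch(d, 1, attempt=0, rows=[3], commit=False)  # failed attempt
    write_batch(d, 1, attempt=1, rows=[3])  # retry commits
    all_files = sorted(os.listdir(d))
    valid = read_valid_files(d)

print(all_files)  # ['_spark_metadata', 'part-0-0.json', 'part-1-0.json', 'part-1-1.json']
print(valid)  # ['part-0-0.json', 'part-1-1.json']
```

Another system reading the same directory with a plain listing would pick up `part-1-0.json` as well, which is why consumers outside Spark have to honor the log themselves.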

Re: Structured Streaming. Dropping Duplicates

Posted by Sam Elamin <hu...@gmail.com>.
On another note, when it comes to checkpointing in Structured Streaming:

I noticed that if I have a stream running off S3 and I kill the process,
the next time the process starts running it duplicates the last record
inserted. Is that normal?

So say I have streaming enabled on one folder "test" which only has two
files, "update1" and "update 2", and I kill the Spark job using Ctrl+C.
When I rerun the stream, it picks up "update 2" again.

Is this normal? Isn't Ctrl+C a failure?

I would expect checkpointing to know that "update 2" was already processed.

Regards
Sam

Re: Structured Streaming. Dropping Duplicates

Posted by Sam Elamin <hu...@gmail.com>.
Thanks Michael!

Re: Structured Streaming. Dropping Duplicates

Posted by Michael Armbrust <mi...@databricks.com>.
Here's a JIRA: https://issues.apache.org/jira/browse/SPARK-19497

We should add this soon.
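
For context, the error message suggests that `dropDuplicates` on a streaming DataFrame was being planned as a streaming aggregation, which append mode rejects. A streaming deduplication has to remember the keys it has already emitted across micro-batches, and something like an event-time watermark is needed to keep that state bounded. The toy Python model below (a hypothetical class, not Spark's implementation) sketches the idea; later Spark versions support `dropDuplicates` on streaming DataFrames, optionally combined with `withWatermark`.

```python
# Toy model (hypothetical class, not Spark's implementation) of
# deduplicating a stream across micro-batches with bounded state.


class StreamingDeduplicator:
    def __init__(self, delay):
        self.delay = delay  # how late (in event-time seconds) data may arrive
        self.max_event_time = float("-inf")
        self.seen = {}  # key -> event time of first occurrence (the state)

    def watermark(self):
        return self.max_event_time - self.delay

    def process_batch(self, records):
        """records: list of (key, event_time); returns the rows to emit."""
        wm = self.watermark()
        out = []
        for key, ts in records:
            if ts < wm:
                continue  # too late: its key may have been evicted, so drop it
            if key in self.seen:
                continue  # duplicate of a key still held in state
            self.seen[key] = ts
            out.append((key, ts))
        # Advance the watermark after the batch, then evict expired keys.
        for _, ts in records:
            self.max_event_time = max(self.max_event_time, ts)
        wm = self.watermark()
        self.seen = {k: t for k, t in self.seen.items() if t >= wm}
        return out


dedup = StreamingDeduplicator(delay=10)
b1 = dedup.process_batch([("a", 100), ("b", 101), ("a", 102)])
b2 = dedup.process_batch([("a", 105), ("c", 120)])
b3 = dedup.process_batch([("a", 95), ("d", 115)])
print(b1)  # [('a', 100), ('b', 101)]
print(b2)  # [('c', 120)]
print(b3)  # [('d', 115)]
print(sorted(dedup.seen))  # ['c', 'd']
```

The `delay` trades memory for tolerance to late data: a larger value keeps more keys in state but catches duplicates that arrive further apart.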
