Posted to dev@spark.apache.org by Hemant Bhanawat <he...@gmail.com> on 2018/04/13 06:12:30 UTC

Sorting on a streaming dataframe

Hi Guys,

Why is sorting on streaming DataFrames not supported (unless it is complete
mode)? My downstream needs me to sort the streaming DataFrame.

Hemant

Re: Sorting on a streaming dataframe

Posted by Hemant Bhanawat <he...@gmail.com>.
Opened an issue. https://issues.apache.org/jira/browse/SPARK-24144

Since it is a major issue for us, I have marked it as Major. Feel free to
change the priority if that is not the case from Spark's perspective.

On Tue, May 1, 2018 at 4:34 AM, Michael Armbrust <mi...@databricks.com>
wrote:

> Please open a JIRA then!

Re: Sorting on a streaming dataframe

Posted by Michael Armbrust <mi...@databricks.com>.
Please open a JIRA then!

On Fri, Apr 27, 2018 at 3:59 AM Hemant Bhanawat <he...@gmail.com>
wrote:

> I see.
>
> Support for monotonically_increasing_id on streaming DataFrames would be
> really helpful to me, and I believe to many other users. Adding this
> functionality to Spark would be more efficient than implementing it inside
> each application.
>
> Hemant

Re: Sorting on a streaming dataframe

Posted by Hemant Bhanawat <he...@gmail.com>.
I see.

Support for monotonically_increasing_id on streaming DataFrames would be
really helpful to me, and I believe to many other users. Adding this
functionality to Spark would be more efficient than implementing it inside
each application.

Hemant

On Thu, Apr 26, 2018 at 11:59 PM, Michael Armbrust <mi...@databricks.com>
wrote:

> The basic tenet of Structured Streaming is that a query should return the
> same answer in streaming or batch mode. We support sorting in complete mode
> because we have all the data and can sort it correctly and return the full
> answer. In update or append mode, sorting would only return a correct
> answer if we could promise that records arrive in sorted order, i.e. that
> no record arriving later sorts before one already emitted (and we can't).
> Therefore, it is disallowed.
>
> If you are just looking for a unique, stable ID and you are already using
> Kafka as the source, you could just combine the partition ID and the
> offset. The Structured Streaming connector for Kafka
> <https://spark.apache.org/docs/2.2.0/structured-streaming-kafka-integration.html>
> exposes both of these in the schema of the streaming DataFrame. (Similarly,
> for Kinesis you can use the shard ID and sequence number.)
>
> If you need the IDs to be contiguous, then this is a somewhat
> fundamentally hard problem. I think the best we could do is add support
> for monotonically_increasing_id() in streaming DataFrames.

Re: Sorting on a streaming dataframe

Posted by Michael Armbrust <mi...@databricks.com>.
The basic tenet of Structured Streaming is that a query should return the
same answer in streaming or batch mode. We support sorting in complete mode
because we have all the data and can sort it correctly and return the full
answer. In update or append mode, sorting would only return a correct
answer if we could promise that records arrive in sorted order, i.e. that
no record arriving later sorts before one already emitted (and we can't).
Therefore, it is disallowed.
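A toy model in plain Python (not Spark code; the function names and the sample batches are made up) of why a per-batch sort cannot stand in for the batch answer: once a record that sorts lower arrives in a later micro-batch, the rows already appended stay out of order, whereas complete mode re-sorts everything seen so far and therefore agrees with the batch query.

```python
def append_mode_sorted(batches):
    """Sort each micro-batch on its own and append it to the output.

    Rows already emitted can never be revisited, which is the situation
    in append mode.
    """
    output = []
    for batch in batches:
        output.extend(sorted(batch))
    return output


def complete_mode_sorted(batches):
    """Keep all rows seen so far and re-emit the fully sorted table per batch."""
    seen = []
    result = []
    for batch in batches:
        seen.extend(batch)
        result = sorted(seen)  # the whole result table is rewritten each time
    return result


def batch_sorted(batches):
    """The answer a batch query over the same data would return."""
    return sorted(row for batch in batches for row in batch)


batches = [[5, 3, 8], [1, 9], [4]]  # hypothetical arrival order
print(append_mode_sorted(batches))    # [3, 5, 8, 1, 9, 4]
print(complete_mode_sorted(batches))  # [1, 3, 4, 5, 8, 9]
print(batch_sorted(batches))          # [1, 3, 4, 5, 8, 9]
```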

If you are just looking for a unique, stable ID and you are already using
Kafka as the source, you could just combine the partition ID and the
offset. The Structured Streaming connector for Kafka
<https://spark.apache.org/docs/2.2.0/structured-streaming-kafka-integration.html>
exposes both of these in the schema of the streaming DataFrame. (Similarly,
for Kinesis you can use the shard ID and sequence number.)
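A minimal plain-Python sketch of the partition-plus-offset idea. The `KafkaRecord` class and `stable_id` helper are hypothetical, and including the topic name is an added assumption for queries that read several topics. Because the ID depends only on where the record is stored in Kafka, not on processing order, a replay after a failure reproduces the same IDs.

```python
from typing import NamedTuple


class KafkaRecord(NamedTuple):
    """Hypothetical stand-in for one row read from the Kafka source."""
    topic: str
    partition: int
    offset: int
    value: str


def stable_id(rec: KafkaRecord) -> str:
    """Build an ID from where the record is stored in Kafka.

    An offset is unique within a partition of a topic, so (topic, partition,
    offset) identifies a record no matter when or in what order it is read.
    """
    return f"{rec.topic}-{rec.partition}-{rec.offset}"


first_run = [
    KafkaRecord("events", 0, 41, "a"),
    KafkaRecord("events", 1, 7, "b"),
    KafkaRecord("events", 0, 42, "c"),
]
# After a failure the same offsets are read again, possibly in another order.
replay = list(reversed(first_run))

ids_first = {r.value: stable_id(r) for r in first_run}
ids_replay = {r.value: stable_id(r) for r in replay}
print(ids_first == ids_replay)  # True
```

Such IDs are unique and replay-stable but not a single contiguous counter. In an actual streaming query the equivalent would be a column expression over the `topic`, `partition` and `offset` columns of the Kafka source (for example with `concat_ws` from `pyspark.sql.functions`); that part is a pointer, not tested code.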

If you need the IDs to be contiguous, then this is a somewhat fundamentally
hard problem. I think the best we could do is add support
for monotonically_increasing_id() in streaming DataFrames.
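For the contiguous case, a small Python model of the bit layout that the Spark API documentation gives for monotonically_increasing_id() (the helper name and the partition sizes are hypothetical; this models the batch function, not any streaming implementation). Each partition numbers its own rows without coordinating with the others, so the IDs are unique and increasing but jump at every partition boundary; making them contiguous would require knowing how many rows precede each partition.

```python
def monotonic_id(partition_index: int, row_in_partition: int) -> int:
    """Model of the documented layout of monotonically_increasing_id():
    partition index in the upper 31 bits, row number within the partition
    in the lower 33 bits."""
    if not (0 <= partition_index < 2**31 and 0 <= row_in_partition < 2**33):
        raise ValueError("index out of range for the 31/33-bit layout")
    return (partition_index << 33) | row_in_partition


# Hypothetical batch: three partitions holding 2, 3 and 1 rows.
rows_per_partition = [2, 3, 1]
ids = [
    monotonic_id(p, r)
    for p, n in enumerate(rows_per_partition)
    for r in range(n)
]
print(ids)  # [0, 1, 8589934592, 8589934593, 8589934594, 17179869184]
```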

On Tue, Apr 24, 2018 at 1:38 PM, Chayapan Khannabha <ch...@gmail.com>
wrote:

> Perhaps your use case is a better fit for Apache Kafka.
>
> More info at:
> https://kafka.apache.org/documentation/streams/
>
> Everything really comes down to the architecture design and algorithm
> spec. However, from my experience with Spark, there are many good reasons
> why this requirement is not supported ;)
>
> Best,
>
> Chayapan (A)

Re: Sorting on a streaming dataframe

Posted by Chayapan Khannabha <ch...@gmail.com>.
Perhaps your use case is a better fit for Apache Kafka.

More info at:
https://kafka.apache.org/documentation/streams/

Everything really comes down to the architecture design and algorithm spec. However, from my experience with Spark, there are many good reasons why this requirement is not supported ;)

Best,

Chayapan (A)


> On Apr 24, 2018, at 2:18 PM, Hemant Bhanawat <he...@gmail.com> wrote:
> 
> Thanks Chris. There are many ways to solve this problem, but they are cumbersome. The easiest way would have been to sort the streaming DataFrame. I asked this question because I could not find a reason why sorting a streaming DataFrame is disallowed.
> 
> Hemant


Re: Sorting on a streaming dataframe

Posted by Arun Mahadevan <ar...@apache.org>.
I guess sorting would make sense only when you have the complete data set. In streaming, you don’t know which record is coming next, so it doesn’t make sense to sort (except in the aggregated complete output mode, where the entire result table is emitted each time and the results can be sorted).

Thanks,
Arun

From:  Hemant Bhanawat <he...@gmail.com>
Date:  Tuesday, April 24, 2018 at 12:18 AM
To:  "Bowden, Chris" <ch...@microfocus.com>
Cc:  Reynold Xin <rx...@databricks.com>, dev <de...@spark.apache.org>
Subject:  Re: Sorting on a streaming dataframe

Thanks Chris. There are many ways to solve this problem, but they are cumbersome. The easiest way would have been to sort the streaming DataFrame. I asked this question because I could not find a reason why sorting a streaming DataFrame is disallowed.

Hemant


Re: Sorting on a streaming dataframe

Posted by Hemant Bhanawat <he...@gmail.com>.
Thanks Chris. There are many ways to solve this problem, but they are
cumbersome. The easiest way would have been to sort the streaming
DataFrame. I asked this question because I could not find a reason why
sorting a streaming DataFrame is disallowed.

Hemant

On Mon, Apr 16, 2018 at 6:09 PM, Bowden, Chris <ch...@microfocus.com>
wrote:

> You can happily sort the underlying RDD of InternalRow(s) inside a sink,
> assuming you are willing to implement and maintain your own sink(s). That
> is, just grabbing the Parquet sink, etc. isn’t going to work out of the
> box. Alternatively, map/flatMapGroupsWithState is probably sufficient and
> requires less working knowledge of Spark internals to use effectively.
> Just group by foo, then sort accordingly and assign IDs. The ID counter can
> be stateful per group. Sometimes this problem may not need to be solved at
> all. For example, if you are using Kafka, a proper partitioning scheme and
> message offsets may be “good enough”.
> ------------------------------
> *From:* Hemant Bhanawat <he...@gmail.com>
> *Sent:* Thursday, April 12, 2018 11:42:59 PM
> *To:* Reynold Xin
> *Cc:* dev
> *Subject:* Re: Sorting on a streaming dataframe
>
> Well, we want to assign snapshot IDs (incrementing counters) to the
> incoming records. For that, we are zipping the streaming RDDs with that
> counter using a modified version of ZippedWithIndexRDD. We are OK if the
> records in the streaming DataFrame get counters in random order, but the
> counter should always be incrementing.
>
> This works fine until we have a failure. When we have a failure, we
> reassign snapshot IDs to the records, and this time the same snapshot ID can
> get assigned to a different record. This is a problem because the primary
> key in our storage engine is <recordid, snapshotid>. So we want to sort the
> DataFrame so that the records always get the same snapshot ID.
>
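A plain-Python simulation of the grouping approach Chris describes (all names are hypothetical; this is not the Spark API). The key plays the role of his `foo`, and the `state` dict stands in for the per-group state that map/flatMapGroupsWithState would keep. It assumes a replayed micro-batch contains exactly the same records and that state is rolled back to its last committed value; under those assumptions the IDs no longer depend on arrival order.

```python
from collections import defaultdict


def assign_ids(batch, state):
    """Process one micro-batch of (key, record_id) pairs.

    Records are grouped by key, each group is sorted by record_id (a
    deterministic field), and IDs are handed out from a per-key counter kept
    in `state`, a plain dict standing in for per-group streaming state.
    """
    groups = defaultdict(list)
    for key, record_id in batch:
        groups[key].append(record_id)

    out = []
    for key in sorted(groups):
        counter = state.get(key, 0)
        for record_id in sorted(groups[key]):
            out.append((key, record_id, counter))
            counter += 1
        state[key] = counter  # carried over to the next micro-batch
    return out


committed_state = {}
batch1 = [("foo", "r2"), ("bar", "r9"), ("foo", "r1")]
out1 = assign_ids(batch1, committed_state)

state_before_batch2 = dict(committed_state)  # snapshot of committed state
batch2 = [("foo", "r3"), ("foo", "r0")]
out2 = assign_ids(batch2, committed_state)

# Simulated failure: batch 2 is re-run from the last committed state, and its
# records happen to arrive in a different order.
replayed = assign_ids(list(reversed(batch2)), dict(state_before_batch2))
print(out2)              # [('foo', 'r0', 2), ('foo', 'r3', 3)]
print(replayed == out2)  # True
```

The sort inside each group has to be on a field that is stable across replays; sorting on arrival time would bring the original problem back.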

Re: Sorting on a streaming dataframe

Posted by Hemant Bhanawat <he...@gmail.com>.
Well, we want to assign snapshot IDs (incrementing counters) to the
incoming records. For that, we are zipping the streaming RDDs with that
counter using a modified version of ZippedWithIndexRDD. We are OK if the
records in the streaming DataFrame get counters in random order, but the
counter should always be incrementing.

This works fine until we have a failure. When we have a failure, we
reassign snapshot IDs to the records, and this time the same snapshot ID can
get assigned to a different record. This is a problem because the primary
key in our storage engine is <recordid, snapshotid>. So we want to sort the
DataFrame so that the records always get the same snapshot ID.
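A toy Python model of the failure described above (the record names and the counter start are made up, and this is not the ZippedWithIndexRDD code): numbering by position is only reproducible if the input order is, so sorting on a stable key before numbering makes the replay deterministic, provided the replayed batch holds exactly the same records and the counter restarts from the same value.

```python
def zip_with_counter(records, start):
    """Number records by position, like zipping with an index.

    The ID a record receives depends only on where it lands in the sequence.
    """
    return {rec: start + i for i, rec in enumerate(records)}


first_attempt = ["rec-b", "rec-a", "rec-c"]  # order seen the first time
after_failure = ["rec-c", "rec-b", "rec-a"]  # same records, new order on replay

first = zip_with_counter(first_attempt, start=100)
second = zip_with_counter(after_failure, start=100)
print(first == second)  # False: snapshot ID 100 now belongs to another record

# Sorting on a stable key first removes the dependence on arrival order.
first_sorted = zip_with_counter(sorted(first_attempt), start=100)
second_sorted = zip_with_counter(sorted(after_failure), start=100)
print(first_sorted == second_sorted)  # True
```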



On Fri, Apr 13, 2018 at 11:43 AM, Reynold Xin <rx...@databricks.com> wrote:

> Can you describe your use case more?

Re: Sorting on a streaming dataframe

Posted by Reynold Xin <rx...@databricks.com>.
Can you describe your use case more?

On Thu, Apr 12, 2018 at 11:12 PM Hemant Bhanawat <he...@gmail.com>
wrote:

> Hi Guys,
>
> Why is sorting on streaming DataFrames not supported (unless it is complete
> mode)? My downstream needs me to sort the streaming dataframe.
>
> Hemant
>