Posted to user@spark.apache.org by ravidspark <ra...@gmail.com> on 2018/05/11 21:25:47 UTC

ordered ingestion not guaranteed

Hi All,

I am using Spark 2.2.0 and I have the below use case:

*Reading from Kafka using Spark Streaming and updating (not just
inserting) the records into a downstream database*

I understand that Spark does not read messages from Kafka in the order of
the timestamps stored in the partitions; it reads them in the order of the
partition offsets. So suppose there are two messages in Kafka with the same
key, where the message carrying the newer timestamp sits at the smaller
offset and the message carrying the older timestamp sits at the larger
offset. Since Spark reads from the smaller offset to the larger one, the
newer timestamp is processed first and the older timestamp after it,
resulting in unordered ingestion into the DB (the stale record overwrites
the fresh one).

If both of these messages fall into the same RDD, then by applying a reduce
function we can drop the message with the older timestamp and keep only the
one with the latest timestamp. But I am not quite sure how to handle the
case where these messages fall into different RDDs in the stream. One
approach I was trying is to hit the DB, retrieve the timestamp stored for
that key, compare it, and skip the incoming record if its timestamp is
older. But this is not efficient when handling millions of messages, as a
per-record DB lookup is expensive. A rough sketch of the within-batch
reduce I mean is below.
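
The sketch assumes the stream has already been mapped to
(key, (timestamp, value)) pairs; the println is just a placeholder for the
real downstream write:

  // DStream[(String, (Long, String))] assumed: (key, (timestamp, value))
  stream.foreachRDD { rdd =>
    val latestPerKey = rdd.reduceByKey { (a, b) =>
      // keep whichever record carries the newer timestamp
      if (a._1 >= b._1) a else b
    }
    latestPerKey.foreachPartition { records =>
      records.foreach { case (key, (ts, value)) =>
        // placeholder for the real write, e.g. an upsert into the downstream DB
        println(s"upsert key=$key ts=$ts value=$value")
      }
    }
  }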

Is there a better way of solving this problem?






Re: ordered ingestion not guaranteed

Posted by ravidspark <ra...@gmail.com>.
Jorn,

Thanks for the response. My downstream database is Kudu.

1. Yes. As you suggested, I have been using a central caching mechanism
that caches the RDD results and compares them with the next batch to keep
the latest timestamps and ignore the older ones. But I find that handling
this is neither easy nor efficient.
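
For reference, keeping that state inside Spark Streaming itself (instead of
an external cache) would look roughly like this with mapWithState; here
keyedStream is an assumed DStream[(String, (Long, String))] of
(key, (timestamp, value)) pairs:

  import org.apache.spark.streaming.{State, StateSpec}

  // Remember the newest timestamp seen per key; emit a record only if it is
  // newer than anything processed so far for that key.
  def keepLatest(key: String,
                 incoming: Option[(Long, String)],
                 state: State[Long]): Option[(String, (Long, String))] = {
    incoming match {
      case Some((ts, value)) if !state.exists() || ts > state.get() =>
        state.update(ts)
        Some((key, (ts, value)))
      case _ =>
        None   // older duplicate (or no data): drop it
    }
  }

  val latestStream = keyedStream
    .mapWithState(StateSpec.function(keepLatest _))
    .flatMap(opt => opt)   // keep only the emitted (key, (ts, value)) records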

2. My main objective is to update the record with the latest timestamp. If
I define the timestamp as the primary key, then all I will be doing is a
plain insert, since the timestamp will (almost certainly) be unique - in my
case it has nanosecond granularity.

I am looking for some functionality within Spark to achieve this. I have
been reading about windowing and watermarking, but I am doubtful, since
they seem to be used only for aggregations and I am not sure whether they
apply to this scenario. Any suggestions are appreciated.
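
For reference, this is roughly the kind of thing I was looking at in
Structured Streaming; the column names (key, eventTime, payload) are made
up and I am not sure this is the intended use of these APIs:

  import org.apache.spark.sql.functions._

  // Express "latest record per key" as a streaming aggregation: max over a
  // struct compares eventTime first, so the newest record wins.
  val latest = events   // assumed streaming DataFrame with key, eventTime, payload
    .groupBy(col("key"))
    .agg(max(struct(col("eventTime"), col("payload"))).as("latest"))
    .select(col("key"), col("latest.eventTime").as("eventTime"),
            col("latest.payload").as("payload"))

  latest.writeStream
    .outputMode("update")    // emit only the keys whose latest record changed
    .format("console")       // placeholder sink; the real job would upsert into Kudu
    .start()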


Thanks,
Ravi






Re: ordered ingestion not guaranteed

Posted by Jörn Franke <jo...@gmail.com>.
What DB do you have? 

You have some options, such as
1) use a key-value store (they can be accessed very efficiently) to check whether a newer record for that key has already been processed - if yes, ignore the incoming value; if no, insert it into the database (see the sketch after this list)
2) redesign the key to include the timestamp and look up the latest one when querying the database
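
For illustration, option 1 could look roughly like the following, assuming
the stream carries (key, (timestamp, value)) pairs; KvClient, getLatestTs,
putLatestTs and db.upsert are made-up placeholders for whatever key-value
store and database client you use:

  stream.foreachRDD { rdd =>
    rdd.foreachPartition { records =>
      val kv = KvClient.connect()              // hypothetical key-value store client
      records.foreach { case (key, (ts, value)) =>
        val newestSeen = kv.getLatestTs(key)   // Option[Long]: newest timestamp processed so far
        if (newestSeen.forall(_ < ts)) {       // nothing newer processed yet
          db.upsert(key, ts, value)            // hypothetical write into the downstream DB
          kv.putLatestTs(key, ts)
        }
        // otherwise a newer record for this key was already processed: skip it
      }
      kv.close()
    }
  }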

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org