Posted to issues@spark.apache.org by "Hyukjin Kwon (JIRA)" <ji...@apache.org> on 2019/05/21 04:38:00 UTC

[jira] [Resolved] (SPARK-12097) How to do a cached, batched JDBC-lookup in Spark Streaming?

     [ https://issues.apache.org/jira/browse/SPARK-12097?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hyukjin Kwon resolved SPARK-12097.
----------------------------------
    Resolution: Incomplete

> How to do a cached, batched JDBC-lookup in Spark Streaming?
> -----------------------------------------------------------
>
>                 Key: SPARK-12097
>                 URL: https://issues.apache.org/jira/browse/SPARK-12097
>             Project: Spark
>          Issue Type: Brainstorming
>          Components: DStreams
>            Reporter: Christian Kurz
>            Priority: Major
>              Labels: bulk-closed
>
> h3. Use-case
> I need to enrich incoming Kafka data with data from a lookup table (or query) on a relational database. Lookup data changes slowly over time (so caching it for a certain retention time is okay). Lookup data is potentially huge (so loading all of it upfront is not an option).
> h3. Problem
> The overall design idea is to implement a cached and batched JDBC lookup. That is, for any lookup keys that are missing from the lookup cache, a JDBC lookup is done to retrieve the missing lookup data. JDBC lookups are rather expensive (connection overhead, number of round-trips) and therefore must be done in batches, e.g. one JDBC lookup per 100 missing keys.
> So the high-level logic might look something like this:
> # For every Kafka RDD we extract all lookup keys
> # For all lookup keys we check whether the lookup data is already available in the cache and whether the cached information has not yet expired.
> # For any lookup keys not found in the cache (or expired), we send batched prepared JDBC statements such as
>     {{SELECT c1, c2, c3 FROM ... WHERE k1 in (?,?,?,...)}}
> to the database to fetch the missing lookup data and minimize the number of JDBC round-trips (see the sketch after this list).
> # At this point we have up-to-date lookup data for all lookup keys and can perform the actual lookup operation.
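> To make step 3 concrete, here is a minimal JDBC sketch of one batched prepared statement per 100 missing keys. Only {{k1}}, {{c1}}, {{c2}} and {{c3}} come from the example query above; the table name {{lookup_table}}, the String key/column types and the connection handling are made-up placeholders:
> {code:borderStyle=solid}
> import java.sql.Connection
>
> // Sketch only: table name "lookup_table" and String-typed keys/columns are assumed.
> def fetchMissing(conn: Connection, missingKeys: Seq[String]): Map[String, (String, String, String)] = {
>   val batchSize = 100
>   missingKeys.grouped(batchSize).flatMap { batch =>
>     val placeholders = Seq.fill(batch.size)("?").mkString(",")
>     val stmt = conn.prepareStatement(
>       s"SELECT k1, c1, c2, c3 FROM lookup_table WHERE k1 IN ($placeholders)")
>     try {
>       batch.zipWithIndex.foreach { case (key, i) => stmt.setString(i + 1, key) }
>       val rs = stmt.executeQuery()
>       Iterator.continually(rs).takeWhile(_.next()).map { r =>
>         r.getString("k1") -> (r.getString("c1"), r.getString("c2"), r.getString("c3"))
>       }.toList
>     } finally {
>       stmt.close()
>     }
>   }.toMap
> }
> {code}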
> Does this approach make sense on Spark? Would Spark State DStreams be the right way to go? Or other design approaches?
> Assuming Spark State DStreams are the right direction, the low-level question is how to do the batching.
> Would this particular signature of {{DStream.updateStateByKey}} (iterator -> iterator):
> {code:borderStyle=solid}
> def updateStateByKey[S: ClassTag](
>       updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],
>       partitioner: Partitioner,
>       rememberPartitioner: Boolean,
>       initialRDD: RDD[(K, S)]
>     )
> {code}
> be the right way to batch multiple incoming keys into a single JDBC-lookup query?
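> If so, a rough sketch of what I have in mind, assuming String keys and values, a made-up {{Lookup}} case class for a cached row, a one-hour retention time and the hypothetical {{fetchMissing}} helper sketched above:
> {code:borderStyle=solid}
> import java.sql.DriverManager
>
> // Sketch only: Lookup, the JDBC URL and fetchMissing are placeholders, not real APIs.
> case class Lookup(c1: String, c2: String, c3: String, fetchedAt: Long)
>
> def updateFunc(it: Iterator[(String, Seq[String], Option[Lookup])]): Iterator[(String, Lookup)] = {
>   val entries = it.toSeq
>   val now     = System.currentTimeMillis()
>   val ttlMs   = 60 * 60 * 1000L   // assumed retention time
>
>   // Keys whose cached state is missing or expired.
>   val missing = entries.collect {
>     case (k, _, state) if state.forall(s => now - s.fetchedAt > ttlMs) => k
>   }
>
>   val fresh: Map[String, (String, String, String)] =
>     if (missing.isEmpty) Map.empty
>     else {
>       // Placeholder connection; a real job would reuse a pooled connection per executor.
>       val conn = DriverManager.getConnection("jdbc:...")
>       try fetchMissing(conn, missing) finally conn.close()
>     }
>
>   // Freshly fetched rows win; otherwise keep the still-valid cached state.
>   entries.iterator.flatMap { case (k, _, state) =>
>     fresh.get(k).map { case (c1, c2, c3) => k -> Lookup(c1, c2, c3, now) }
>       .orElse(state.map(s => k -> s))
>   }
> }
> {code}
> This {{updateFunc}} would then be passed to the overload above together with a {{Partitioner}}, the {{rememberPartitioner}} flag and an {{initialRDD}}.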
> Would the new {{DStream.trackStateByKey()}} be a better approach?
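> As far as I understand the new API (the {{StateSpec}} / {{State}} types), its update function is invoked per input record rather than per partition, so the cross-key batching would still have to happen somewhere else. A minimal sketch, reusing the {{Lookup}} class from above and a hypothetical single-key {{lookupOneKey}} helper (expiry handling omitted for brevity):
> {code:borderStyle=solid}
> import org.apache.spark.streaming.{State, StateSpec}
>
> // Sketch only: lookupOneKey is a hypothetical single-key JDBC lookup.
> def lookupOneKey(key: String): Lookup = ???
>
> val spec = StateSpec.function { (key: String, value: Option[String], state: State[Lookup]) =>
>   val lookup =
>     if (state.exists) state.get
>     else {
>       val fresh = lookupOneKey(key)   // one JDBC round-trip per missing key
>       state.update(fresh)
>       fresh
>     }
>   (key, value, lookup)
> }
> // val enriched = kafkaPairs.mapWithState(spec)   // trackStateByKey in the 1.6 preview
> {code}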
> The second, more high-level question: is there a way to chain multiple state operations on the same state object?
> Going with the above design approach, the entire lookup logic would be handcrafted into a single Java/Scala/Python {{updateFunc}}. This function would go over all incoming keys, check which ones are missing from the cache, batch the missing ones, run the JDBC queries and union the returned lookup data with the existing cache from the State object.
> The reason all of this must be handcrafted into a single function seems to be that, at a high level, Spark's state processing logic works like this:
> {code:borderStyle=solid}
> input: prevStateRdd, inputRdd
> output: updateStateFunc( prevStateRdd, inputRdd )
> {code}
> So there is only a single {{updateStateFunc}}, operating on {{prevStateRdd}} and {{inputRdd}} in one go. Once it has run, there is no way to further refine the state as part of the current micro-batch.
> The multi-step processing required here sounds like a typical use-case for a DStream: apply multiple operations one after the other on some incoming data. So I wonder whether there is a way to extend the concept of state processing (maybe it has already been extended?) to do something like:
> {code:borderStyle=solid}
> input: prevStateRdd, inputRdd
> missingKeys     = inputRdd.filter( <not exists in prevStateRdd> )  
> foundKeys       = inputRdd.filter( <exists in prevStateRdd> )  
> newLookupData   = lookupKeysUsingJdbcDataFrameRead( missingKeys.collect() )
> newStateRdd     = newLookupData.union( foundKeys).union( prevStateRdd )
> output: newStateRdd
> {code}
> This would nicely leverage all the power and richness of Spark. The only missing bit, and the reason why this approach does not work today (based on my naive understanding of Spark), is that {{newStateRdd}} cannot be declared to be the {{prevStateRdd}} of the next micro-batch.
> If Spark had a way of declaring an RDD (or DStream) to be the parent for the next batch run, even complex (chained) state operations would be easy to describe and would not require hand-written Java/Python/Scala update functions.
> Thanks a lot for taking the time to read all of this!!!
> Any thoughts/pointers are much appreciated.


