Posted to issues@spark.apache.org by "Harry Brundage (JIRA)" <ji...@apache.org> on 2015/01/23 20:19:35 UTC

[jira] [Commented] (SPARK-2688) Need a way to run multiple data pipeline concurrently

    [ https://issues.apache.org/jira/browse/SPARK-2688?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14289771#comment-14289771 ] 

Harry Brundage commented on SPARK-2688:
---------------------------------------

I respectfully disagree :)

Persist is one way to implement this functionality, in the same way that intermediate HDFS writes between MapReduce jobs are one way to implement Hive queries: it is the slow path, and it isn't necessary. Even if your dataset fits in memory, it's still slower to cache and reuse it than to just fly through HDFS read -> Spark job -> HDFS write as fast as possible, and if the data doesn't fit in memory, you are now forced to pay a disk write penalty for persist(MEMORY_AND_DISK) that isn't strictly necessary. From what I understand, that's explicitly Spark's purpose: to avoid intermediate persist steps that slow everything down needlessly when you can stream through memory, or iterate on data in memory, nice and fast. I'd also like to note that Matei posted about doing this once on the mailing list here: http://apache-spark-user-list.1001560.n3.nabble.com/Is-deferred-execution-of-multiple-RDDs-ever-coming-td10310.html
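
For concreteness, here is a minimal sketch of what that persist-based workaround looks like today in a spark-shell session. parse, isGood, isBad, and the paths are made up for illustration; without the persist call the second action recomputes rdd2 from scratch, and with it you pay the memory-and-disk cost up front:

{code}
// Minimal sketch of the persist-based workaround, assuming a spark-shell
// session (sc in scope). parse/isGood/isBad are made-up stand-ins.
import org.apache.spark.storage.StorageLevel

def parse(line: String): String = line.trim          // hypothetical shared transformation
def isGood(rec: String): Boolean = rec.nonEmpty      // hypothetical predicates
def isBad(rec: String): Boolean = rec.isEmpty

val rdd1 = sc.textFile("hdfs:///tmp/input")          // illustrative path
val rdd2 = rdd1.map(parse).persist(StorageLevel.MEMORY_AND_DISK)

rdd2.filter(isGood).saveAsTextFile("hdfs:///tmp/good")  // action #1 materializes rdd2
rdd2.filter(isBad).saveAsTextFile("hdfs:///tmp/bad")    // action #2 reuses the cache
rdd2.unpersist()
{code}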

I've spoken briefly with both Josh Rosen and Sandy about this, and from what I understand, this is really hard. The main problem is rooted throughout the whole project: RDDs pull data from their parent stages, as opposed to having data pushed at them. If data were pushed, the double-output thing would be decidedly easier, but since it is pulled, the parent stage needs some buffer/persistence mechanism to keep intermediate data around until all the downstream dependents are ready to ask for it. I am not really sure how changing the model from pull to push would affect the R part of RDD, but I think it's definitely a consideration.
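
To make the pull-vs-push distinction concrete: today each RDD exposes a pull-style compute method, and a downstream stage asks its parent for an iterator over a partition. A push model would instead hand the parent a set of downstream sinks to emit into. The trait below is purely hypothetical, just to illustrate the shape of the change, not anything that exists in Spark:

{code}
// Today (pull): a child pulls records from its parent's iterator.
// This is the actual shape of the method on org.apache.spark.rdd.RDD:
//   def compute(split: Partition, context: TaskContext): Iterator[T]
//
// Hypothetical push-style alternative, purely for illustration: the parent
// would emit each record to every registered downstream consumer, so two
// outputs of rdd2 could be fed in a single pass without persisting it.
trait Sink[-T] {
  def push(record: T): Unit
  def close(): Unit
}

trait PushRDD[T] {
  def run(sinks: Seq[Sink[T]]): Unit   // the parent drives the data flow
}
{code}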

To root this discussion in reality a bit more, I have an example use case I think is valid. We very often materialize large volumes of JSON logs from raw sources like a cloud-based syslog sink, or aggregated nginx logs, or what have you. In the vast majority of circumstances all the JSON lines parse just fine, but a select few are broken. We don't want to tear down the whole Spark job because one line is invalid, but these things seem inevitable, especially when dealing with stuff from the web as most Hadoop users are. So, we rescue JSON parse errors, but we want to log that it happened somehow, and hopefully leave the data around for manual inspection. Ralph Kimball calls this a "reject bin": you stick the malformed stuff in a secondary location and ignore it if the volume is really low, but always have it around to go back to in case you've been systematically ignoring something you shouldn't, or the volume of rejects gets too high for you to be confident they really are just randomly cosmic-ray broken.

For us, we don't want to .persist the entire mass of JSON data immediately after lifting it up off the disk, as we are probably going to filter it down or manipulate it very soon after; but unless we persist it, we have no way to "fork" the RDD into the good-quality parsed JSON data for the rest of the job and the bad-quality JSON data we want to write to some secondary reject location (see the sketch below). Feel free to suggest a different persistence mechanism for the rejects (map over the data and log it to some other service or whatever), but I don't really like this solution. We would need to implement some external state store when we have HDFS handy, we would need to resolve duplicates if a Spark task dies for whatever reason (YARN preemption in our case) and then re-executes and re-logs broken data, and now we have a whole other operational component. For the time being, we're just logging broken JSON in the executor logs, but these get reaped frequently and are far from user-friendly to query.
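
Here is a rough sketch of the reject-bin pattern, with parseJson as a naive stand-in for a real JSON parser and illustrative paths. The point is that without a persist on `parsed`, each of the two save actions re-reads and re-parses the raw input from HDFS:

{code}
import scala.util.{Try, Success, Failure}

// Naive stand-in for a real JSON parser, purely for illustration:
// it just throws on lines that don't look like JSON objects.
def parseJson(line: String): String = {
  require(line.trim.startsWith("{") && line.trim.endsWith("}"), "malformed")
  line
}

val raw = sc.textFile("hdfs:///logs/raw-json")            // illustrative path

// Tag each line: Left = reject, Right = good record.
val parsed = raw.map { line =>
  Try(parseJson(line)) match {
    case Success(rec) => Right(rec)
    case Failure(_)   => Left(line)
  }
}

// Two separate actions; without persisting `parsed`, both recompute it.
parsed.collect { case Left(bad) => bad }.saveAsTextFile("hdfs:///logs/rejects")
parsed.collect { case Right(good) => good }.saveAsTextFile("hdfs:///logs/parsed")
{code}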

So, I suggest keeping this ticket open to evaluate whether switching to pushing data through the DAG is acceptable. If you folks don't want to do it, that's totally fine, but I would like to make sure that is the case.

> Need a way to run multiple data pipeline concurrently
> -----------------------------------------------------
>
>                 Key: SPARK-2688
>                 URL: https://issues.apache.org/jira/browse/SPARK-2688
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core
>    Affects Versions: 1.0.1
>            Reporter: Xuefu Zhang
>
> Suppose we want to do the following data processing: 
> {code}
> rdd1 -> rdd2 -> rdd3
>            | -> rdd4
>            | -> rdd5
>            \ -> rdd6
> {code}
> where -> represents a transformation. rdd3 through rdd6 are all derived from an intermediate rdd2. We use foreach(fn) with a dummy function to trigger the execution. However, rdd.foreach(fn) only triggers the pipeline rdd1 -> rdd2 -> rdd3. To make things worse, when we call rdd4.foreach(), rdd2 will be recomputed. This is very inefficient. Ideally, we should be able to trigger the execution of the whole graph and reuse rdd2, but there doesn't seem to be a way of doing so. Tez has already realized the importance of this (TEZ-391), so I think Spark should provide this too.
> This is required for Hive to support multi-insert queries. HIVE-7292.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org