Posted to commits@beam.apache.org by "Jean-Baptiste Onofré (JIRA)" <ji...@apache.org> on 2017/01/04 17:45:58 UTC

[jira] [Commented] (BEAM-649) Pipeline "actions" should use foreachRDD via ParDo.

    [ https://issues.apache.org/jira/browse/BEAM-649?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15798856#comment-15798856 ] 

Jean-Baptiste Onofré commented on BEAM-649:
-------------------------------------------

Basically, in the {{TransformTranslator}}, if we add caching on the {{ParDo}} evaluator, it works fine:

{code}
        context.putDataset(transform,
            new BoundedDataset<>(inRDD.mapPartitions(new DoFnFunction<>(accum, doFn,
                context.getRuntimeContext(), sideInputs, windowingStrategy)).cache()));
{code}

However, this results in "over-caching": every RDD/DStream is cached, even if it is used only once. Moreover, over-caching could be an issue for the Spark optimizer.

So we have to do something smarter: basically, tell the {{ParDo}} evaluator whether or not it has to cache the RDD/DStream.

One way to determine whether caching is required is to traverse the pipeline. We can hook into the {{TranslationModeDetector}} pipeline visitor to determine where caching is needed and flag it for the evaluator.
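
To illustrate the traversal idea, here is a minimal sketch on a toy model of a pipeline. It does not use the Beam or Spark APIs: {{CachingCandidates}}, {{Step}} and {{find}} are hypothetical names, and the rule "cache a dataset when more than one transform reads it" is my assumption of what the visitor would check. A real implementation would collect the same counts while visiting the pipeline.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Toy model (not Beam API): finds datasets that are read more than once. */
public class CachingCandidates {

    /** Hypothetical stand-in for a transform: reads named datasets, produces one. */
    public static final class Step {
        final String name;
        final List<String> inputs;
        final String output;

        public Step(String name, List<String> inputs, String output) {
            this.name = name;
            this.inputs = inputs;
            this.output = output;
        }
    }

    /**
     * Walks all steps once, counts how many times each dataset is read,
     * and returns the datasets read more than once. Only these would be
     * flagged for caching; datasets read a single time are left alone.
     */
    public static Set<String> find(List<Step> steps) {
        Map<String, Integer> reads = new LinkedHashMap<>();
        for (Step step : steps) {
            for (String input : step.inputs) {
                reads.merge(input, 1, Integer::sum);
            }
        }
        Set<String> toCache = new LinkedHashSet<>();
        for (Map.Entry<String, Integer> entry : reads.entrySet()) {
            if (entry.getValue() > 1) {
                toCache.add(entry.getKey());
            }
        }
        return toCache;
    }

    public static void main(String[] args) {
        // read -> parse -> (count, write): only parse.out has two readers.
        List<Step> steps = Arrays.asList(
            new Step("parse", Arrays.asList("read.out"), "parse.out"),
            new Step("count", Arrays.asList("parse.out"), "count.out"),
            new Step("write", Arrays.asList("parse.out"), "write.out"));
        System.out.println(find(steps)); // prints [parse.out]
    }
}
```

With such a set available, the {{ParDo}} evaluator could call {{cache()}} only when its output is in the set, instead of caching unconditionally as in the snippet above.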

> Pipeline "actions" should use foreachRDD via ParDo.
> ---------------------------------------------------
>
>                 Key: BEAM-649
>                 URL: https://issues.apache.org/jira/browse/BEAM-649
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-spark
>            Reporter: Amit Sela
>            Assignee: Jean-Baptiste Onofré
>
> Spark will execute a pipeline ONLY if it's triggered by an action (batch) / output operation (streaming) - http://spark.apache.org/docs/1.6.2/streaming-programming-guide.html#output-operations-on-dstreams.
> Currently, such actions in Beam are mostly implemented via ParDo, and translated by the runner as a Map transformation (via mapPartitions).
> The runner overcomes this by "forcing" actions on untranslated leaves.
> While this is OK, it would be better in some cases, e.g., Sinks, to apply the same ParDo translation but with foreach/foreachRDD instead of foreachPartition/mapPartitions.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)