Posted to issues@beam.apache.org by "Ismaël Mejía (JIRA)" <ji...@apache.org> on 2019/01/11 16:05:00 UTC
[jira] [Resolved] (BEAM-6332) Avoid unnecessary serialization steps when executing combine transform
[ https://issues.apache.org/jira/browse/BEAM-6332?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ismaël Mejía resolved BEAM-6332.
--------------------------------
Resolution: Fixed
Fix Version/s: 2.11.0
> Avoid unnecessary serialization steps when executing combine transform
> ----------------------------------------------------------------------
>
> Key: BEAM-6332
> URL: https://issues.apache.org/jira/browse/BEAM-6332
> Project: Beam
> Issue Type: Improvement
> Components: runner-spark
> Reporter: Vaclav Plajt
> Assignee: Vaclav Plajt
> Priority: Major
> Fix For: 2.11.0
>
> Time Spent: 20m
> Remaining Estimate: 0h
>
> The Combine transform is translated to Spark's RDD API in the `combinePerKey` and `combineGlobally` methods of [GroupCombineFunctions|https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java]. Both methods use byte arrays as the intermediate aggregation state so that it can be transferred over the network. As a result, the intermediate aggregation value is serialized and deserialized every time a new element is added to the aggregation. That is unnecessary and should be avoided.
> _(copied from BEAM-6214 because it explains the approach):_
> We can do much better by letting accumulators work on user-defined Java types and serializing them only when we need to send them over the network.
> To do this, we need the following:
> * Accumulator wrapper -> contains a transient `T` value plus a byte payload that is filled in during serialization
> * JavaSerialization: the accumulator (Beam wrapper) needs to implement Serializable, provide custom writeObject and readObject methods, and use a Beam coder
> * KryoSerialization: we need a custom Kryo serializer for the accumulator wrapper
> This should be enough to hook into all possible Spark serialization interfaces.
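
The Java-serialization part of the approach above can be sketched roughly as follows. This is a minimal illustration, not the code merged for this issue: `AccumulatorWrapperSketch`, `SimpleCoder`, `LongCoder` and `roundTrip` are hypothetical names, and `SimpleCoder` is a simplified stand-in for a Beam coder rather than Beam's actual `Coder` API. The sketch also writes the encoded payload straight to the stream instead of keeping it in a field.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.ByteBuffer;

public class AccumulatorWrapperSketch {

  /** Hypothetical stand-in for a Beam coder (assumption, not Beam's Coder API). */
  interface SimpleCoder<T> extends Serializable {
    byte[] encode(T value);
    T decode(byte[] bytes);
  }

  /** Example coder for a Long accumulator (e.g. a running sum). */
  static final class LongCoder implements SimpleCoder<Long> {
    private static final long serialVersionUID = 1L;
    @Override public byte[] encode(Long value) {
      return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }
    @Override public Long decode(byte[] bytes) {
      return ByteBuffer.wrap(bytes).getLong();
    }
  }

  /** Holds the accumulator as a plain Java value; encodes it only when serialized. */
  static final class AccumulatorWrapper<T> implements Serializable {
    private static final long serialVersionUID = 1L;
    private final SimpleCoder<T> coder;
    private transient T value; // skipped by default serialization

    AccumulatorWrapper(T value, SimpleCoder<T> coder) {
      this.value = value;
      this.coder = coder;
    }

    T get() { return value; }
    void set(T value) { this.value = value; }

    // Called by Java serialization: write the coder, then the encoded value.
    private void writeObject(ObjectOutputStream out) throws IOException {
      out.defaultWriteObject();
      byte[] payload = coder.encode(value);
      out.writeInt(payload.length);
      out.write(payload);
    }

    // Called by Java deserialization: restore the coder, then decode the value.
    private void readObject(ObjectInputStream in)
        throws IOException, ClassNotFoundException {
      in.defaultReadObject();
      byte[] payload = new byte[in.readInt()];
      in.readFully(payload);
      value = coder.decode(payload);
    }
  }

  /** Serializes and deserializes an object, simulating a transfer over the network. */
  @SuppressWarnings("unchecked")
  static <T> T roundTrip(T obj) throws IOException, ClassNotFoundException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(obj);
    }
    try (ObjectInputStream in =
        new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      return (T) in.readObject();
    }
  }

  public static void main(String[] args) throws Exception {
    AccumulatorWrapper<Long> acc = new AccumulatorWrapper<>(0L, new LongCoder());
    for (long x = 1; x <= 4; x++) {
      acc.set(acc.get() + x); // adding elements touches only the Java value
    }
    AccumulatorWrapper<Long> copy = roundTrip(acc); // the coder runs only here
    System.out.println(copy.get()); // prints 10
  }
}
```

With this shape, adding an element never invokes the coder; encoding and decoding happen only inside `writeObject` and `readObject`. A Kryo setup would need a separate custom serializer doing the equivalent, which is not shown here.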