Posted to dev@beam.apache.org by Reuven Lax <re...@google.com> on 2018/06/01 11:54:50 UTC

Re: parquet/beam

This is an interesting direction. I had looked at Arrow as a default Coder
replacement for schema PCollections, and that didn't seem fruitful, as we
would need to create Arrow batches of size 1. However, using Arrow to encode
batches over the FnAPI might indeed be an interesting approach. Streaming
small batches instead of streaming individual elements may also prove more
efficient.
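
To make the "small batches instead of individual elements" idea concrete,
here is a minimal sketch in plain Python. It is not Beam or FnAPI code: the
function name batch_elements and the max_batch_size parameter are
hypothetical, and it only shows how a stream of elements could be grouped
into small batches before each batch is encoded (for example as one Arrow
record batch) and sent.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def batch_elements(elements: Iterable[T], max_batch_size: int) -> Iterator[List[T]]:
    """Lazily group a stream of elements into lists of at most max_batch_size.

    Hypothetical helper for illustration only; it is not part of Beam.
    """
    if max_batch_size < 1:
        raise ValueError("max_batch_size must be at least 1")
    iterator = iter(elements)
    while True:
        # Pull at most max_batch_size elements without materializing the
        # whole stream, so arbitrarily long inputs can still be streamed.
        batch = list(islice(iterator, max_batch_size))
        if not batch:
            return
        yield batch


if __name__ == "__main__":
    # Each yielded list stands in for one batch that would be encoded and
    # transmitted as a unit instead of element by element.
    for batch in batch_elements(range(5), 2):
        print(batch)
```

With max_batch_size set to 1 this degenerates to the per-element case that
made Arrow unattractive as a default Coder replacement; larger values
amortize the per-batch overhead across more elements.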

On Thu, May 31, 2018 at 8:55 PM Lukasz Cwik <lc...@google.com> wrote:

> It really needs someone to take a deep dive and look into whether Arrow is
> a good fit now, considering all the use cases that Apache Beam has. I took a
> look about a year ago when designing the Fn Data API and concluded at that
> point in time that it wasn't great for several reasons, mainly because
> our systems seem to be about pushing/streaming bytes from one
> place to the next. It could become a better fit as Apache Beam improves.
> For example, migrating PCollections to have schemas would be a big push
> towards using something like Arrow as a transport as they would better
> represent each other.
>
> Anyone should feel free to write up a doc and/or prototype/develop an
> Apache Arrow transport.
>
>
> On Thu, May 31, 2018 at 10:43 AM Lukasz Cwik <lc...@google.com> wrote:
>
>> Kenn, it can be done but requires explicit flow control communication
>> between the Runner -> SDK and SDK -> Runner to be developed to support
>> sub-bundle groupings.
>>
>> Transports and in-memory layouts are related, but improving our coders to
>> use in-memory layouts would give us most of the benefit, for example by
>> using flatbuffers or an equivalent technology to do that work for us. That
>> leads us to rethinking whether our coders, which are modelled as encoding
>> and decoding streams of bytes, would be better suited as something else.
>>
>> On Thu, May 31, 2018 at 9:16 AM Kenneth Knowles <kl...@google.com> wrote:
>>
>>> For the latter, can we have the Fn API data plane transmit sub-bundle
>>> groupings to benefit from the memory layout? On input the runner controls,
>>> on output the SDK controls (spilling)? Just random thoughts.
>>>
>>> Kenn
>>>
>>> On Thu, May 31, 2018 at 8:21 AM Lukasz Cwik <lc...@google.com> wrote:
>>>
>>>> Tyler and I had reached out to Arrow folks[1] asking how we could
>>>> support KV<Key, Iterable<Values>> when the iterable of values is beyond
>>>> memory size limits. There is an open JIRA about adding support for large
>>>> byte[] and strings and list types in ARROW-750[2]. Robert had pointed out
>>>> that we could do the same thing we are planning to do when using the Beam
>>>> Fn Data API when handling really large values over the Beam Fn State API as
>>>> described here[3].
>>>>
>>>> The other issue that hasn't yet been discussed is that Arrow
>>>> materializes and stores the data in memory (or on disk) while the Beam Fn Data
>>>> API is more about "streaming" data between two actors. This allows us to
>>>> process very large bundles and also allows for arbitrary blow up in output
>>>> from a single element (a runner can effectively control how large a bundle
>>>> is that is sent to an SDK harness but can't guarantee that the SDK will not
>>>> take a single element and produce lots and lots of data from it).
>>>>
>>>> 1:
>>>> https://lists.apache.org/thread.html/ce36c311e34af8bea230c89e7ada38923e6845d6bc875ccfbc003cfe@%3Cdev.arrow.apache.org%3E
>>>> 2: https://issues.apache.org/jira/browse/ARROW-750
>>>> 3:
>>>> https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.y6e78jyiwn50
>>>>
>>>> On Thu, May 31, 2018 at 7:56 AM Reuven Lax <re...@google.com> wrote:
>>>>
>>>>> I've looked at arrow, and there's some trickiness. Beam has a record
>>>>> model and arrow works best with large batches of records. We could do per
>>>>> record encoding, but that might be inefficient in arrow.
>>>>>
>>>>> On Thu, May 31, 2018, 5:50 PM Ismaël Mejía <ie...@gmail.com> wrote:
>>>>>
>>>>>> If I understand correctly Arrow allows a common multi language
>>>>>> in-memory data representation, so basically it is a columnar data
>>>>>> format that you can use to transfer data between libraries in python
>>>>>> (pandas, numpy, etc), Java and other languages. This avoids the
>>>>>> round-trip to disk to do so. So we should maybe take a look at it
>>>>>> because it could be a pretty efficient way to transfer data in
>>>>>> multi-language pipelines (useful for portability). They even seem to
>>>>>> be working on a full platform based on it with streaming capabilities:
>>>>>> https://blog.rstudio.com/2018/04/19/arrow-and-beyond/
>>>>>>
>>>>>> There is also a serialized version of it called feather. I suppose
>>>>>> that an extension to support this format can make sense.
>>>>>> https://github.com/wesm/feather
>>>>>>
>>>>>> Maybe Holden can give some other ideas on possible valid uses on Beam
>>>>>> (or correct me if I say something incorrect) because this seems to be
>>>>>> important in the python on Spark world at this moment.
>>>>>> On Thu, May 31, 2018 at 2:01 AM Chamikara Jayalath <
>>>>>> chamikara@google.com> wrote:
>>>>>> >
>>>>>> >
>>>>>> >
>>>>>> > On Wed, May 30, 2018 at 4:43 PM Lukasz Cwik <lc...@google.com>
>>>>>> wrote:
>>>>>> >>
>>>>>> >> For Python Parquet support, hopefully we can have cross language
>>>>>> pipelines solve this so we only need to implement it once. If it is really
>>>>>> popular, having it implemented more than once may be worthwhile.
>>>>>> >
>>>>>> >
>>>>>> > I'd say Parquet format is popular enough to warrant a Python
>>>>>> implementation :). Not sure if there are good candidate client libraries
>>>>>> for Python though.
>>>>>> >
>>>>>> >>
>>>>>> >> Would the point of Arrow be to treat it as an IO connector similar
>>>>>> to ParquetIO or JdbcIO (I was wondering what the purpose of the Arrow
>>>>>> integration is)?
>>>>>> >>
>>>>>> >> Every C library adds some difficulty for users to test out their
>>>>>> pipelines locally unless the C library was cross compiled for several
>>>>>> distributions. Using C libraries increases the need for using a container
>>>>>> like Docker for execution.
>>>>>> >
>>>>>> >
>>>>>> > Usually we've preferred libraries that can be directly installed
>>>>>> from PyPI over libraries that have more complicated deployment models
>>>>>> (native compilation, Conda etc). This will make the connector easily
>>>>>> available for various runner/user deployments.
>>>>>> >
>>>>>> >>
>>>>>> >>
>>>>>> >>
>>>>>> >> On Wed, May 30, 2018 at 1:56 PM Austin Bennett <
>>>>>> whatwouldaustindo@gmail.com> wrote:
>>>>>> >>>
>>>>>> >>> I can see great use cases with s3/Parquet - so that's a great
>>>>>> addition (which JB is addressing, for Java)!
>>>>>> >>>
>>>>>> >>> It would be even more ideal for the use cases I find myself
>>>>>> around for there to be python parquet support, so perhaps for this next
>>>>>> release:
>>>>>> >>> Would it make sense to be exploring: https://arrow.apache.org ?
>>>>>> I'd be happy to explore proper procedure for design/feature proposal and
>>>>>> documentation for Beam, how to scope and develop it.
>>>>>> >>>
>>>>>> >>> Also, from the little I've looked at actual implementation, it
>>>>>> appears that (py)arrow relies on underlying C binaries, which was listed as
>>>>>> a problem or at least a point against choice of package with the developing
>>>>>> python/kafka source.  How big an issue is that -- what else should I be
>>>>>> considering?  Guidance absolutely welcomed!
>>>>>>
>>>>>