Posted to dev@flume.apache.org by Santiago Mola <sm...@stratio.com> on 2014/11/28 10:13:22 UTC

Using a PositionTracker in EventDeserializers

Hi,

We have a recurring need for Flume deserializers that go beyond line or
blob. Some examples are XML deserialization where events are generated with
XPath/XQuery expressions, parsers for XLS, PDF, etc.

There is no proper solution in Flume for these use cases. A significant
number of our projects have required workarounds, such as an external
preprocessing or postprocessing step.

So we have explored the following solutions to the problem:

- Using BlobDeserializer and then using an interceptor (1 to N events) to
perform the transformation. This is currently not possible since an
interceptor must output 0 or 1 events for each input event. This was brought
up on this mailing list a long time ago [1], but it seems no one came up with
a viable solution.

- Implementing an EventDeserializer. We have done this in some cases with
varying degrees of success, for example with an XML deserializer based on
XPath [2]. The main limitation of this approach is the lack of a common
method for position tracking at the deserializer level. Currently, Flume's
core has a PositionTracker at the Source/InputStream level, which tracks
the input offset. LineDeserializer and BlobDeserializer rely on the
assumption that events can be mapped to an input offset (i.e. an event can
be created by reading only from a given input offset). This assumption is
not valid for more complex use cases (e.g. can't produce events without
reading file headers). This can be solved by using a second PositionTracker
at the deserializer level. Here's a commit with a possible implementation
of this approach [3].
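
To illustrate the second option, here is a minimal sketch of a deserializer
that keeps its own tracker. It is not the code from [3] and does not use
Flume's classes: RecordPositionTracker, InMemoryTracker and
HeaderedRowDeserializer are hypothetical names invented for this example, and
the tracker lives in memory instead of being persisted. The format is a header
line followed by rows, so no event can be built from a byte offset alone; the
tracker stores a row index instead.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a deserializer-level tracker (not Flume's interface).
interface RecordPositionTracker {
    void storePosition(long position);
    long getPosition();
}

// In-memory tracker for illustration only; a real one would persist its state.
class InMemoryTracker implements RecordPositionTracker {
    private long position;
    public void storePosition(long position) { this.position = position; }
    public long getPosition() { return position; }
}

// Rows are meaningless without the header, so the position is a row index,
// not an input offset.
class HeaderedRowDeserializer {
    private final String[] columns;
    private final List<String> rows = new ArrayList<>();
    private final RecordPositionTracker tracker;
    private long next; // index of the next row to emit

    HeaderedRowDeserializer(String content, RecordPositionTracker tracker) {
        String[] lines = content.split("\n");
        this.columns = lines[0].split(",");   // the header is always re-read
        for (int i = 1; i < lines.length; i++) {
            rows.add(lines[i]);
        }
        this.tracker = tracker;
        this.next = tracker.getPosition();    // resume from the committed index
    }

    // Returns the next event as "col=value;col=value", or null when exhausted.
    String readEvent() {
        if (next >= rows.size()) {
            return null;
        }
        String[] values = rows.get((int) next).split(",");
        next++;
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < columns.length && i < values.length; i++) {
            if (i > 0) {
                sb.append(';');
            }
            sb.append(columns[i]).append('=').append(values[i]);
        }
        return sb.toString();
    }

    // Commit: everything read so far has been delivered.
    void mark() { tracker.storePosition(next); }

    // Roll back to the last committed row index.
    void reset() { next = tracker.getPosition(); }
}
```

In this sketch mark() plays the role of a commit and reset() of a rollback, and
a new deserializer created with the same tracker picks up at the committed row
after re-reading the header.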

Do you think this is a problem worth solving in Flume? If yes, what would
be the best approach?


[1]
http://mail-archives.apache.org/mod_mbox/flume-dev/201208.mbox/%3CCABCB9rJ0-puRp1FfPfvyfO41wnMgUh=tiFCpGufwXbNYV_PGOQ@mail.gmail.com%3E
[2]
https://github.com/Stratio/flume-ingestion/tree/develop/stratio-deserializers/stratio-xmlxpath-deserializer
[3]
https://github.com/Stratio/flume/commit/a6fac7247b7fc48dec5dc3ab4c658ab4e5c0e753

Best,
-- 

Santiago M. Mola


<http://www.stratio.com/>
Avenida de Europa, 26. Ática 5. 3ª Planta
28224 Pozuelo de Alarcón, Madrid
Tel: +34 91 352 59 42 // *@stratiobd <https://twitter.com/StratioBD>*

Re: Using a PositionTracker in EventDeserializers

Posted by Santiago Mola <sm...@stratio.com>.
Hi,

2014-12-04 1:19 GMT+01:00 Hari Shreedharan <hs...@cloudera.com>:

> OK, I think I understand. Can you file a jira and post this info + a
> design doc if you have a design in mind?
>

Sure. I'll prepare a more elaborate proposal.

Best,

Re: Using a PositionTracker in EventDeserializers

Posted by Hari Shreedharan <hs...@cloudera.com>.
OK, I think I understand. Can you file a jira and post this info + a design doc if you have a design in mind?


Thanks,
Hari


Re: Using a PositionTracker in EventDeserializers

Posted by Santiago Mola <sm...@stratio.com>.
2014-12-02 7:04 GMT+01:00 Hari Shreedharan <hs...@cloudera.com>:

> Wouldn’t the mark and reset be enough? Do you really need access to the
> underlying offsets? The resettable stream already provides mark and reset
>

As far as I know, that is not enough. I'll explain some use cases and maybe
you can suggest a better approach:

XML deserialization
================

XPath and XQuery parsers require you to parse the whole input document
before traversing it. XPath can actually be streamed, but resuming the
stream from an arbitrary point is far from trivial. As far as I know,
streaming XQuery remains a research issue without any standard solution.

The solution we have implemented reads the whole document (calling .mark()
at the beginning of the resettable stream) and extracts all the events at
once. We have to track the index of the last event we returned, so we use a
PositionTracker that stores an event index rather than a stream position.
If the source is re-created after a crash, it starts reading the file
from the beginning, but resumes returning events from the last event
index stored in the PositionTracker.
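
A rough sketch of that resume logic, using only the JDK's XPath API. The class
name XPathEventReader and its methods are made up for this example, and the
stored index is passed in as a plain int instead of coming from a
PositionTracker, so this is an illustration of the idea and not our
deserializer from [2].

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathFactory;
import org.w3c.dom.Document;
import org.w3c.dom.NodeList;

// Hypothetical reader: parses the whole document, then skips already-delivered events.
class XPathEventReader {
    private final List<String> events = new ArrayList<>();
    private int next;

    XPathEventReader(String xml, String expression, int lastStoredIndex) {
        try {
            // The whole document must be parsed before XPath can be evaluated.
            Document doc = DocumentBuilderFactory.newInstance()
                    .newDocumentBuilder()
                    .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));
            NodeList nodes = (NodeList) XPathFactory.newInstance()
                    .newXPath()
                    .evaluate(expression, doc, XPathConstants.NODESET);
            for (int i = 0; i < nodes.getLength(); i++) {
                events.add(nodes.item(i).getTextContent());
            }
        } catch (Exception e) {
            throw new IllegalArgumentException("cannot evaluate XPath over input", e);
        }
        // Resume from the stored event index, not from a byte offset.
        next = Math.max(0, Math.min(lastStoredIndex, events.size()));
    }

    // Returns the next event body, or null when all events have been returned.
    String readEvent() {
        return next < events.size() ? events.get(next++) : null;
    }

    // The value a tracker would store when the caller marks its progress.
    int position() {
        return next;
    }
}
```

A reader re-created with the value of position() re-parses the file from the
start but hands out only the events that were not yet delivered.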

We're working with similar approaches for some internal projects such as
PDF deserialization.

Decompression
=============

Resuming decompression of a compressed stream at an arbitrary input offset
(the one stored by the resettable stream) is usually not possible. Also, there
is no way to map an arbitrary offset in the decompressed stream to an
offset in the compressed stream. So we apply the same mechanism as in the
previous case (but at the ResettableInputStream level): we use a
ResettableInputStream implementation that wraps another
ResettableInputStream. The DecompressInputStream marks the underlying
ResettableInputStream at 0 and starts decompressing, tracking the offset in
the decompressed stream. If resuming is needed, it starts decompressing
from the beginning, but skipping all input until the last offset tracked in
the decompressed stream. This approach is suboptimal, since it requires
using a buffer that is always as large as the maximum batch size in bytes,
but it works otherwise.
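
The skip-on-resume idea can be sketched with the JDK's gzip classes.
ResumableGzipReader is a hypothetical name, not our DecompressInputStream; it
reads from an in-memory byte array instead of wrapping a
ResettableInputStream, and it leaves out the batch buffer mentioned above.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical reader that tracks the offset in the DEcompressed stream.
class ResumableGzipReader {
    private final byte[] compressed;
    private InputStream in;
    private long offset; // current offset in the decompressed stream
    private long marked; // last committed decompressed offset

    ResumableGzipReader(byte[] compressed, long storedOffset) {
        this.compressed = compressed;
        this.marked = storedOffset;
        reset();
    }

    // Reads one decompressed byte, or -1 at end of stream.
    int read() {
        try {
            int b = in.read();
            if (b >= 0) {
                offset++;
            }
            return b;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    void mark() { marked = offset; }

    long markedOffset() { return marked; }

    // Restart decompression from compressed offset 0 and discard output
    // until the marked decompressed offset is reached.
    void reset() {
        try {
            in = new GZIPInputStream(new ByteArrayInputStream(compressed));
            offset = 0;
            while (offset < marked && in.read() >= 0) {
                offset++;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Helper for the example: gzip a byte array in memory.
    static byte[] gzip(byte[] data) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(out)) {
            gz.write(data);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }
}
```

The cost is visible in reset(): every resume decompresses the stream again from
the start, which is the price of not being able to map decompressed offsets
back to compressed ones.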

Best,

Re: Using a PositionTracker in EventDeserializers

Posted by Hari Shreedharan <hs...@cloudera.com>.
Wouldn’t the mark and reset be enough? Do you really need access to the underlying offsets? The resettable stream already provides mark and reset.


Thanks,
Hari
