Posted to dev@arrow.apache.org by Brian Hulette <br...@ccri.com> on 2017/10/24 19:44:22 UTC

[DISCUSS] Allow "delta" dictionary batches

One issue we've struggled with when adding an Arrow interface to Geomesa 
is the requirement to send all dictionary batches before record batches 
in the IPC formats. Sometimes we have pre-computed "top-k" stats that we 
can use to assemble a dictionary beforehand, but those don't always 
exist, and even when they do they aren't complete by definition, so we 
could end up hiding valuable data in an "Other" category. So in practice 
we often have to wait to collect all the data before we can start 
streaming anything.

I'd like to propose a couple of modifications to the Arrow IPC formats 
that could help alleviate this problem:
1) Allow multiple dictionary batches to use the same id. The vectors in 
all dictionary batches with the same id can be concatenated together to 
represent the full dictionary with that id.
2) Allow dictionary batches and record batches to be interleaved. For 
the streaming format, there could be an additional requirement that any 
dictionary key used in a record batch must have been defined in a 
previously sent dictionary batch.

These changes would allow producers to send "delta" dictionary batches 
in an Arrow stream to define new keys that will be used in future record 
batches. Here's an example stream with one column of city names, to help 
illustrate the idea:

<SCHEMA>
<DICTIONARY id=0>
(0) "New York"
(1) "Seattle"
(2) "Washington, DC"

<RECORD BATCH 0>
0
1
2
1

<DICTIONARY id=0>
(3) "Chicago"
(4) "San Francisco"

<RECORD BATCH 1>
3
2
4
0
EOS


Decoded Data:
-------------
New York
Seattle
Washington, DC
Seattle
Chicago
Washington, DC
San Francisco
New York
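
To make the consumer side concrete, here is a rough Python sketch of a reader that merges dictionary batches by id and decodes record batches against the accumulated dictionary. The message tuples and names are just illustrative stand-ins, not the real IPC reader API:

def decode_stream(messages):
    dictionaries = {}   # dictionary id -> values accumulated so far
    decoded = []
    for kind, payload in messages:
        if kind == "DICTIONARY":
            dict_id, values = payload
            # Delta semantics: concatenate onto any existing dictionary with this id
            dictionaries.setdefault(dict_id, []).extend(values)
        elif kind == "RECORD_BATCH":
            dict_id, indices = payload
            # Every key must already be defined by a previously sent dictionary batch
            decoded.extend(dictionaries[dict_id][i] for i in indices)
    return decoded

stream = [
    ("DICTIONARY", (0, ["New York", "Seattle", "Washington, DC"])),
    ("RECORD_BATCH", (0, [0, 1, 2, 1])),
    ("DICTIONARY", (0, ["Chicago", "San Francisco"])),  # delta adding keys 3 and 4
    ("RECORD_BATCH", (0, [3, 2, 4, 0])),
]

print(decode_stream(stream))  # produces the decoded data above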


I also think it can be valuable if the requirement mentioned in #2 
applies only to the streaming format, so that the random-access format 
would support dictionary batches following record batches. That way 
producers creating random-access files could start writing record 
batches before all the data for the dictionaries has been assembled.

I need to give Paul Taylor credit for this idea - he actually already 
wrote the JS arrow reader to combine dictionaries with the same id 
(https://github.com/apache/arrow/blob/master/js/src/reader/arrow.ts#L59), 
and it occurred to me that that could be a solution for us.

Thanks
Brian


Re: [DISCUSS] Allow "delta" dictionary batches

Posted by Wes McKinney <we...@gmail.com>.
I'd be OK with invoking YAGNI here and only adding delta batches -- we
can either do this with a "dictionary batch type" enum or an isDelta
boolean flag. If a stream had multiple dictionaries, it might be that
only a single dictionary batch needs to get redefined. This is
probably an esoteric use case so I don't think we should complicate
the stream reader implementations any more than needed.

We actually will need some work in C++ to be able to compare whether
two dictionary-encoded arrays are "compatible" for analytics (or
whether they need a "conform" step to ensure the dictionary indices
refer to the same values). That is, if one has a larger dictionary than
the other, but they are equal up to the end of the shorter one, then
the shorter dictionary can be dropped.
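
Roughly, that compatibility check amounts to a prefix comparison. A trivial sketch in Python (just the idea, not the actual C++ API):

def dictionaries_compatible(dict_a, dict_b):
    # True if one dictionary is a prefix of the other, so indices encoded
    # against the shorter dictionary are also valid against the longer one.
    shorter, longer = sorted((dict_a, dict_b), key=len)
    return longer[:len(shorter)] == shorter

dictionaries_compatible(["New York", "Seattle"], ["New York", "Seattle", "Chicago"])   # True
dictionaries_compatible(["New York", "Seattle"], ["Seattle", "New York", "Chicago"])   # False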




Re: [DISCUSS] Allow "delta" dictionary batches

Posted by Jacques Nadeau <ja...@apache.org>.
Why not just close the existing stream and start a new one if there is a
redefine? Just trying to understand the difference in the redefinition
case.


Re: [DISCUSS] Allow "delta" dictionary batches

Posted by Brian Hulette <br...@ccri.com>.
My initial thinking was just appending to the dictionary, but it could 
be useful to have the ability to redefine it as Wes suggested.

Redefining does add some extra burden on stream consumers though since a 
dictionary batch would no longer apply globally - consumers would have 
to determine the appropriate dictionary batch(es) to apply to a given 
record batch when looking back at data earlier in the stream.
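
For instance, a consumer that wants to re-decode earlier record batches would need to snapshot the dictionary state as it reads. A rough Python sketch of that bookkeeping (hypothetical message kinds, a single dictionary column for simplicity):

def read_with_snapshots(messages):
    current = {}    # dictionary id -> values currently in effect
    batches = []    # (indices, snapshot of the dictionary used by this batch)
    for kind, payload in messages:
        if kind == "DICTIONARY_NEW":
            dict_id, values = payload
            current[dict_id] = list(values)                   # replace
        elif kind == "DICTIONARY_DELTA":
            dict_id, values = payload
            current.setdefault(dict_id, []).extend(values)    # append
        elif kind == "RECORD_BATCH":
            dict_id, indices = payload
            # Snapshot so this batch can still be decoded after a later redefinition
            batches.append((indices, list(current[dict_id])))
    return batches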

That's not that difficult to implement, but it's a complication worth 
considering.

Brian




Re: [DISCUSS] Allow "delta" dictionary batches

Posted by Wes McKinney <we...@gmail.com>.
What I'd proposed was to add metadata to indicate either an append
(DELTA) or a replacement (NEW).


Re: [DISCUSS] Allow "delta" dictionary batches

Posted by Jacques Nadeau <ja...@apache.org>.
Is the proposal to only append to the dictionary or to redefine it?



Re: [DISCUSS] Allow "delta" dictionary batches

Posted by Wes McKinney <we...@gmail.com>.
Opened https://issues.apache.org/jira/browse/ARROW-1727


Re: [DISCUSS] Allow "delta" dictionary batches

Posted by Wes McKinney <we...@gmail.com>.
hi Brian,

Thanks for bringing this up. I'm +1 on having a mechanism to enable
dictionaries to grow or change mid-stream. I figured that this would
eventually come up and the current design for the stream does not
preclude having dictionaries show up mid-stream. As an example, a
service streaming data from Parquet files might send
dictionary-encoded versions of some columns, and it would not be
practical to have to scan all of the Parquet files of interest to find
the global dictionary. The Apache CarbonData format built some
Spark-based infrastructure around this exact problem, but we cannot
assume that it will be cheap or practical to find the global
dictionary up front.

I think having dictionary messages occur after the first record
batches is a reasonable strategy. I would suggest we add a "type"
field to the DictionaryBatch message type ([1]) so that we can either
indicate that the message is a NEW dictionary (i.e. the existing one
should be dropped) or a DELTA (additions) to an existing dictionary. I
don't think it will be difficult to accommodate this in the C++
implementation, for example (though we will need to finally implement
"concatenate" for all supported types to make it work).

Thanks,
Wes

[1]: https://github.com/apache/arrow/blob/master/format/Message.fbs#L86
