Posted to user@arrow.apache.org by Sam Davis <Sa...@nanoporetech.com> on 2021/07/23 13:14:50 UTC

[PyArrow] DictionaryArray isDelta Support

Hi,

We want to write out RecordBatches of data in which one or more `pa.string()` columns are encoded as `pa.dictionary(pa.intX(), pa.string())`, since each such column only contains a handful of unique values.

However, PyArrow seems to lack support for writing these batches to either the streaming or the (non-streaming) file format.

When attempting to write two distinct batches, the following error message is triggered:

> ArrowInvalid: Dictionary replacement detected when writing IPC file format. Arrow IPC files only support a single dictionary for a given field across all batches.

I believe this message is false and that support is possible based on reading the spec:

> Dictionaries are written in the stream and file formats as a sequence of record batches...
> ...
> The dictionary isDelta flag allows existing dictionaries to be expanded for future record batch materializations. A dictionary batch with isDelta set indicates that its vector should be concatenated with those of any previous batches with the same id. In a stream which encodes one column, the list of strings ["A", "B", "C", "B", "D", "C", "E", "A"], with a delta dictionary batch could take the form:

```
<SCHEMA>
<DICTIONARY 0>
(0) "A"
(1) "B"
(2) "C"

<RECORD BATCH 0>
0
1
2
1

<DICTIONARY 0 DELTA>
(3) "D"
(4) "E"

<RECORD BATCH 1>
3
2
4
0
EOS
```
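To see where the indices and the delta come from, the following pure-Python sketch reproduces the example above by growing a single dictionary and recording the values each batch appends. The function name is illustrative, not part of PyArrow; the first entry it returns corresponds to the initial (non-delta) dictionary batch.

```python
def encode_with_deltas(batches):
    """Encode batches of strings against one growing dictionary.

    Returns one (new_values, indices) pair per batch: new_values are the
    dictionary entries first needed by that batch, and indices point into
    the concatenation of all entries emitted so far.
    """
    dictionary = []  # every value emitted so far, in emission order
    positions = {}   # value -> its index in `dictionary`
    messages = []
    for batch in batches:
        new_values = []
        for value in batch:
            if value not in positions:
                positions[value] = len(dictionary)
                dictionary.append(value)
                new_values.append(value)
        messages.append((new_values, [positions[value] for value in batch]))
    return messages


for new_values, indices in encode_with_deltas([["A", "B", "C", "B"], ["D", "C", "E", "A"]]):
    print(new_values, indices)
# ['A', 'B', 'C'] [0, 1, 2, 1]   (initial dictionary, RECORD BATCH 0)
# ['D', 'E'] [3, 2, 4, 0]        (DICTIONARY 0 DELTA, RECORD BATCH 1)
```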

> Alternatively, if isDelta is set to false, then the dictionary replaces the existing dictionary for the same ID. Using the same example as above, an alternate encoding could be:

```
<SCHEMA>
<DICTIONARY 0>
(0) "A"
(1) "B"
(2) "C"

<RECORD BATCH 0>
0
1
2
1

<DICTIONARY 0>
(0) "A"
(1) "C"
(2) "D"
(3) "E"

<RECORD BATCH 1>
2
1
3
0
EOS
```
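The replacement encoding can be sketched the same way. Here each batch gets a fresh dictionary built from its own unique values; sorting them is an assumption that happens to reproduce the example above, not something the spec requires.

```python
def encode_with_replacement(batch):
    """Build a fresh dictionary from one batch's unique values (sorted, which
    is an assumption, not a spec requirement) and encode the batch against it."""
    dictionary = sorted(set(batch))
    positions = {value: i for i, value in enumerate(dictionary)}
    return dictionary, [positions[value] for value in batch]


print(encode_with_replacement(["A", "B", "C", "B"]))  # (['A', 'B', 'C'], [0, 1, 2, 1])
print(encode_with_replacement(["D", "C", "E", "A"]))  # (['A', 'C', 'D', 'E'], [2, 1, 3, 0])
```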

It also specifies in the IPC File Format (non-streaming) section:

> In the file format, there is no requirement that dictionary keys should be defined in a DictionaryBatch before they are used in a RecordBatch, as long as the keys are defined somewhere in the file. Further more, it is invalid to have more than one non-delta dictionary batch per dictionary ID (i.e. dictionary replacement is not supported). Delta dictionaries are applied in the order they appear in the file footer.

So, for the non-streaming format, multiple non-delta dictionaries are not supported, but one non-delta dictionary followed by delta dictionaries should be.
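The file-format rule can be expressed as a check over the sequence of dictionary batches. A minimal sketch, assuming each batch is summarised as a hypothetical `(dictionary_id, is_delta)` pair; it only checks the one-non-delta-per-ID rule and ignores where the batches sit relative to record batches.

```python
def valid_in_file_format(dictionary_batches):
    """Return True if no dictionary ID has more than one non-delta batch.

    `dictionary_batches` is a sequence of (dictionary_id, is_delta) pairs in
    file order. Only the replacement rule is checked here.
    """
    seen_non_delta = set()
    for dictionary_id, is_delta in dictionary_batches:
        if is_delta:
            continue  # deltas may appear any number of times
        if dictionary_id in seen_non_delta:
            return False  # a second non-delta batch would be a replacement
        seen_non_delta.add(dictionary_id)
    return True


print(valid_in_file_format([(0, False), (0, True), (0, True)]))  # True
print(valid_in_file_format([(0, False), (0, False)]))            # False
```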

Is it possible to do this in PyArrow? If so, how? If not, how easy would it be to add? Is it currently possible via C++ and therefore can I write a Cython or similar extension that will let me do this now without waiting for a release?

Best,

Sam
IMPORTANT NOTICE: The information transmitted is intended only for the person or entity to which it is addressed and may contain confidential and/or privileged material. Any review, re-transmission, dissemination or other use of, or taking of any action in reliance upon, this information by persons or entities other than the intended recipient is prohibited. If you received this in error, please contact the sender and delete the material from any computer. Although we routinely screen for viruses, addressees should check this e-mail and any attachment for viruses. We make no warranty as to absence of viruses in this e-mail or any attachments.

Re: [PyArrow] DictionaryArray isDelta Support

Posted by Sam Davis <Sa...@nanoporetech.com>.
Awesome, thanks Weston.

I had assumed that setting the emit-deltas option would let you write arbitrary dictionaries and have the deltas computed automatically, even if the categories are not a strict extension of those previously seen (e.g. the third case in the Python example). From a user perspective, this seemed much nicer to me than having to grow the categories manually.

However, on reflection I can now see that this is semantically difficult, because it would require changing the indices.
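To make the index problem concrete, here is a small pure-Python sketch (the helper `remap_indices` is hypothetical, not part of PyArrow). Record batches that were already written hold indices into the old dictionary, so a new dictionary can only be sent as a delta if translating those indices leaves them unchanged.

```python
def remap_indices(old_dictionary, new_dictionary, indices):
    """Translate indices encoded against old_dictionary into positions in
    new_dictionary (every old value is assumed to appear in the new one)."""
    new_positions = {value: i for i, value in enumerate(new_dictionary)}
    return [new_positions[old_dictionary[i]] for i in indices]


old = ["a", "b"]
indices = [0, 1, 1]  # encodes ["a", "b", "b"]
print(remap_indices(old, ["a", "b", "c"], indices))  # [0, 1, 1]: unchanged, a delta works
print(remap_indices(old, ["b", "a", "c"], indices))  # [1, 0, 0]: written indices would change
```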

Is it worth documenting what a delta is/what the support for deltas means to make this clear?

Sam
________________________________
From: Weston Pace <we...@gmail.com>
Sent: 15 January 2022 01:46
To: user@arrow.apache.org <us...@arrow.apache.org>
Subject: Re: [PyArrow] DictionaryArray isDelta Support

I've been working with IPC files lately so I took another look at this and it was much easier than I expected.

> This suggests the addition of a new data structure within the
> WriteDictionaries loop, likely as an attribute of IpcFormatWriter
> like `last_dictionaries_` that contains a set of values output for
> each dictionary ID to enable fast lookups? (i.e. avoid O(N^2)).

> Does this like a plausible implementation? If not, is there
> something I have missed/a better suggestion? If so, would
> anyone be willing to help with this?

I'm a little confused by this comment.  last_dictionaries_ is *already* a structure that contains the set of values output for each dictionary ID.  The rest of your reasoning / explanation matches my understanding as well.

To fix the file writer to output delta dictionaries all we need to do is move the check around a little[1].  This now matches the spec:

> Further more, it is invalid to have more than one non-delta dictionary batch per dictionary ID

For dictionary reading we, again, already have a snazzy DictionaryMemo which knows how to append delta dictionaries so all we need to do is change the check around a little[2].

The following simple python example works for me[3].

It's probably too late for the RC0 cutoff but I'll add some test cases on Monday and will try and get it included should we need to build another RC.

[1] https://github.com/apache/arrow/pull/12160/files#diff-1b1d9dca9fdea7624e22f017b8762c4919edf57c2cf43c15d59b8a5e8e1b38a5R1092
[2] https://github.com/apache/arrow/pull/12160/files#diff-e992169684aea9845ac776ada4cbb2b5dc711b49e5a3fbc6046c92299e1aefceR1057
[3] https://gist.github.com/westonpace/31b4e99f94ba03491644174a0c92a620

On Fri, Jan 7, 2022 at 1:18 AM Sam Davis <Sa...@nanoporetech.com> wrote:
Following up on this, I've tried to get up to speed on the IPC File Format writing of dictionaries and I think the following would work to make the code conformant to the IPC File Format specification:

In the WriteDictionaries [1] inner loop, if a dictionary has already been output and emit_dictionary_deltas is True, then it should work out which values in the new Array (containing the current dictionary values) have already been output and emit only those that haven't (perhaps by creating a new Array containing them to pass to payload creation and writing?).

This suggests the addition of a new data structure within the WriteDictionaries loop, likely as an attribute of IpcFormatWriter like `last_dictionaries_` that contains a set of values output for each dictionary ID to enable fast lookups? (i.e. avoid O(N^2)).

Does this look like a plausible implementation? If not, is there something I have missed/a better suggestion? If so, would anyone be willing to help with this?

[1] https://github.com/apache/arrow/blob/91e3ac53e2e21736ce6291d73fc37da6fa21259d/cpp/src/arrow/ipc/writer.cc#L1060
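A pure-Python sketch of the set-based bookkeeping proposed above (the helper name `values_to_emit` is hypothetical). Set membership tests are O(1) on average, so finding the unseen values is linear in the dictionary length rather than quadratic. As discussed later in the thread, emitting only the unseen values is a valid delta only if the values already written keep their positions.

```python
def values_to_emit(seen, dictionary):
    """Return the entries of `dictionary` not yet emitted for this dictionary
    ID, and record them in the `seen` set (mutated in place)."""
    new_values = [value for value in dictionary if value not in seen]
    seen.update(new_values)
    return new_values


seen = set()
print(values_to_emit(seen, ["a", "b"]))       # ['a', 'b']
print(values_to_emit(seen, ["a", "b", "c"]))  # ['c']
print(values_to_emit(seen, ["b", "a", "c"]))  # []
```

The last call finds nothing new even though the order changed, which is exactly the situation a delta cannot express.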

---

If it helps others, my reasoning to get to this point is as follows:

The IpcFormatWriter handles writing record batches to the IPC format
(non-streaming)

https://github.com/apache/arrow/blob/91e3ac53e2e21736ce6291d73fc37da6fa21259d/cpp/src/arrow/ipc/writer.cc#L982

```
class ARROW_EXPORT IpcFormatWriter : public RecordBatchWriter {
 public:
  // A RecordBatchWriter implementation that writes to a IpcPayloadWriter.
```

Writing out a table is - ignoring unified dictionaries - iterating over
RecordBatches and writing them out using `WriteRecordBatch`.

https://github.com/apache/arrow/blob/91e3ac53e2e21736ce6291d73fc37da6fa21259d/cpp/src/arrow/ipc/writer.cc#L1002

```
  Status WriteRecordBatch(const RecordBatch& batch) override {
```

One of the first things this does is to write out the dictionaries contained
within the batch:

https://github.com/apache/arrow/blob/91e3ac53e2e21736ce6291d73fc37da6fa21259d/cpp/src/arrow/ipc/writer.cc#L1009

```
    RETURN_NOT_OK(WriteDictionaries(batch));
```

This is what we will need to edit to ensure that dictionary writing is handled
correctly.

Recall that a RecordBatch is a "collection of equal-length arrays matching a
particular Schema. A record batch is a table-like data structure that is
semantically a sequence of fields, each a contiguous Arrow array".

Each of the fields can be of dictionary type (which can also be nested), so
there may be more than one dictionary to write for a batch.

WriteDictionaries is defined further down:

https://github.com/apache/arrow/blob/91e3ac53e2e21736ce6291d73fc37da6fa21259d/cpp/src/arrow/ipc/writer.cc#L1056

```
  Status WriteDictionaries(const RecordBatch& batch) {
```

The first thing it does is call CollectDictionaries:

```
    ARROW_ASSIGN_OR_RAISE(const auto dictionaries, CollectDictionaries(batch, mapper_));
```

`mapper_` is a DictionaryFieldMapper (recall: maps FieldPaths to dictionary
IDs)

CollectDictionaries is declared in the header here:

https://github.com/apache/arrow/blob/6b0248d5b5502a1cf76d5853d15be9844b84522c/cpp/src/arrow/ipc/dictionary.h#L150

```
// For writing: collect dictionary entries to write to the IPC stream, in order
// (i.e. inner dictionaries before dependent outer dictionaries).
ARROW_EXPORT
Result<DictionaryVector> CollectDictionaries(const RecordBatch& batch,
                                             const DictionaryFieldMapper& mapper);
```

and defined here:

https://github.com/apache/arrow/blob/6b0248d5b5502a1cf76d5853d15be9844b84522c/cpp/src/arrow/ipc/dictionary.cc#L383

```
Result<DictionaryVector> CollectDictionaries(const RecordBatch& batch,
                                             const DictionaryFieldMapper& mapper) {
  DictionaryCollector collector{mapper, {}};
  RETURN_NOT_OK(collector.Collect(batch));
  return std::move(collector.dictionaries_);
}
```

It creates a DictionaryCollector, passing in the mapper, then calls Collect
and returns the result.

The DictionaryCollector is defined here:

https://github.com/apache/arrow/blob/6b0248d5b5502a1cf76d5853d15be9844b84522c/cpp/src/arrow/ipc/dictionary.cc#L297

```
struct DictionaryCollector {
  const DictionaryFieldMapper& mapper_;
  DictionaryVector dictionaries_;
```

with `Collect` defined here:

https://github.com/apache/arrow/blob/6b0248d5b5502a1cf76d5853d15be9844b84522c/cpp/src/arrow/ipc/dictionary.cc#L336

```
  Status Collect(const RecordBatch& batch) {
    FieldPosition position;
    const Schema& schema = *batch.schema();
    dictionaries_.reserve(mapper_.num_fields());

    for (int i = 0; i < schema.num_fields(); ++i) {
      RETURN_NOT_OK(Visit(position.child(i), schema.field(i),
                          batch.column(i).get()));
    }
    return Status::OK();
  }
```

FieldPosition is, it seems, a reverse LinkedList (each node stores a pointer to
its parent) where the nodes record their index (like FieldPath) and depth. It
has a method to walk the list and construct a vector containing the indices.
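A minimal Python analogue of the structure just described, assuming only what the paragraph states: each node stores a parent pointer, its own index and its depth, and a method walks back to the root to rebuild the index path. This is an illustration, not Arrow's C++ class.

```python
class FieldPosition:
    """Parent-pointer path node: parent, index among siblings, depth."""

    def __init__(self, parent=None, index=-1, depth=0):
        self.parent = parent
        self.index = index
        self.depth = depth

    def child(self, index):
        # A child sits one level deeper and points back at this node.
        return FieldPosition(self, index, self.depth + 1)

    def path(self):
        # Walk up to the root, filling the index list from the back.
        indices = [0] * self.depth
        node = self
        while node.parent is not None:
            indices[node.depth - 1] = node.index
            node = node.parent
        return indices


root = FieldPosition()
print(root.child(2).child(0).path())  # [2, 0]
```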

It then grabs a reference to the schema and reserves space in `dictionaries_`
(recall: a vector of (id, Array) pairs) for `mapper_.num_fields()` entries
(an optimisation to reduce reallocations?).

Then, for each field in the schema, it calls `Visit` with a new FieldPosition
(via the call to `position.child(i)`), the field at position i in the schema,
and the corresponding column of the RecordBatch.

Visit is defined here:

https://github.com/apache/arrow/blob/6b0248d5b5502a1cf76d5853d15be9844b84522c/cpp/src/arrow/ipc/dictionary.cc#L310

```
  Status Visit(const FieldPosition& position, const std::shared_ptr<Field>& field, const Array* array) {
```

The result of this is that `dictionaries_` contains all the (id, Array)
pairs, where each Array contains the dictionary values (e.g. strings).

Going right back up the stack to `WriteDictionaries` we now iterate over each
of these dictionaries:

https://github.com/apache/arrow/blob/91e3ac53e2e21736ce6291d73fc37da6fa21259d/cpp/src/arrow/ipc/writer.cc#L1060

```
    for (const auto& pair : dictionaries) {
      int64_t dictionary_id = pair.first;
      const auto& dictionary = pair.second;
```

If a dictionary with this ID has already been output, then the code checks
whether it is the same, to avoid writing the same dictionary twice.

```
      // If a dictionary with this id was already emitted, check if it was the same.
      auto* last_dictionary = &last_dictionaries_[dictionary_id];
      const bool dictionary_exists = (*last_dictionary != nullptr);
      int64_t delta_start = 0;
      if (dictionary_exists) {
        if ((*last_dictionary)->data() == dictionary->data()) {
          // Fast shortcut for a common case.
          // Same dictionary data by pointer => no need to emit it again
          continue;
        }
        const int64_t last_length = (*last_dictionary)->length();
        const int64_t new_length = dictionary->length();
        if (new_length == last_length &&
            ((*last_dictionary)->Equals(dictionary, equal_options))) {
          // Same dictionary by value => no need to emit it again
          // (while this can have a CPU cost, this code path is required
          //  for the IPC file format)
          continue;
        }
```

If either of these equality checks succeeds, the loop moves on to the next
dictionary.

Otherwise (the dictionary was emitted before but has changed), we may be able
to write either a replacement or a delta.

Currently the code throws an error in both of these cases if we're writing to
the IPC file format. For the delta case this goes against the spec, which only
forbids replacements in the file format.

If the current dictionary is "just" an extension of the previous dictionary and
deltas are enabled, then it records where the new values start (`delta_start`).

Given this, it creates a payload (delta or not), writes it, and updates the
stats data structure.

```
      IpcPayload payload;
      if (delta_start) {
        RETURN_NOT_OK(GetDictionaryPayload(dictionary_id, /*is_delta=*/true,
                                           dictionary->Slice(delta_start), options_,
                                           &payload));
      } else {
        RETURN_NOT_OK(
            GetDictionaryPayload(dictionary_id, dictionary, options_, &payload));
      }
      RETURN_NOT_OK(WritePayload(payload));
      ++stats_.num_dictionary_batches;
      if (dictionary_exists) {
        if (delta_start) {
          ++stats_.num_dictionary_deltas;
        } else {
          ++stats_.num_replaced_dictionaries;
        }
      }
```
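The payload and statistics logic above can be modelled in a few lines of Python. This is a simplified sketch: the hypothetical `DictionaryStats` only mirrors the three counters touched in the snippet, and `emit_dictionary` returns the values that would go into the payload instead of serializing them.

```python
from dataclasses import dataclass


@dataclass
class DictionaryStats:
    """Counters mirroring the ones incremented in the C++ snippet above."""
    num_dictionary_batches: int = 0
    num_dictionary_deltas: int = 0
    num_replaced_dictionaries: int = 0


def emit_dictionary(stats, dictionary, dictionary_exists, delta_start):
    """Return (is_delta, values_to_write) and update `stats`.

    delta_start == 0 means "not a delta", as in the C++ code.
    """
    if delta_start:
        payload = (True, dictionary[delta_start:])  # like dictionary->Slice(delta_start)
    else:
        payload = (False, dictionary)
    stats.num_dictionary_batches += 1
    if dictionary_exists:
        if delta_start:
            stats.num_dictionary_deltas += 1
        else:
            stats.num_replaced_dictionaries += 1
    return payload


stats = DictionaryStats()
print(emit_dictionary(stats, ["a", "b"], False, 0))      # (False, ['a', 'b'])
print(emit_dictionary(stats, ["a", "b", "c"], True, 2))  # (True, ['c'])
```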


________________________________
From: Weston Pace <we...@gmail.com>
Sent: 27 July 2021 19:52
To: user@arrow.apache.org <us...@arrow.apache.org>
Subject: Re: [PyArrow] DictionaryArray isDelta Support

I'm able to verify what Sam is seeing. It appears Arrow does not
support dictionary deltas in the file format. However, from my
reading of the spec it does indeed seem proper deltas should be
allowed. A small patch[1] allowed me to write delta dictionaries in
the file format but then reading fails at [2] which seems more
explicit that dictionary deltas of any kind were not originally
supported with the file format. I think the fix for read will
probably be a bit more involved and require some good testing. I've
opened ARROW-13467[3] for further discussion.

[1] https://github.com/apache/arrow/commit/54df8581a9d825664785fc406950204e345a5b3b
[2] https://github.com/apache/arrow/blob/81ff679c47754692224f655dab32cc0936bb5f55/cpp/src/arrow/ipc/reader.cc#L1062
[3] https://issues.apache.org/jira/browse/ARROW-13467

On Sun, Jul 25, 2021 at 10:12 PM Sam Davis <Sa...@nanoporetech.com> wrote:
>
> Hi Wes,
>
> Yes, that is exactly it. For the file format, the spec dictates that it should be possible to output deltas but currently this is not possible. An `ArrowInvalid` error is thrown.
>
> Example code:
>
> ```
> import pandas as pd
> import pyarrow as pa
>
> print(pa.__version__)
>
> schema = pa.schema([
>     ("foo", pa.dictionary(pa.int16(), pa.string()))
> ])
>
> pd1 = pd.DataFrame({"foo": pd.Categorical(["a"], categories=["a", "b"])})
> b1 = pa.RecordBatch.from_pandas(pd1, schema=schema)
>
> pd2 = pd.DataFrame({"foo": pd.Categorical(["a", "bbbb"], categories=["a", "b", "bbbb"])})
> b2 = pa.RecordBatch.from_pandas(pd2, schema=schema)
>
> options = pa.ipc.IpcWriteOptions(emit_dictionary_deltas=True)
>
> with pa.ipc.new_file("/tmp/sdavis_tmp.arrow", schema=schema, options=options) as writer:
>     writer.write(b1)
>
>     writer.write(b2)
> ```
>
> Best,
>
> Sam
> ________________________________
> From: Wes McKinney <we...@gmail.com>
> Sent: 24 July 2021 01:43
> To: user@arrow.apache.org <us...@arrow.apache.org>
> Subject: Re: [PyArrow] DictionaryArray isDelta Support
>
> If I'm interpreting you correctly, the issue is that every dictionary
> must be a prefix of a common dictionary for the delta logic to work.
> So if the first batch has
>
> "a", "b"
>
> then in the next batch
>
> "a", "b", "c" is OK and will emit a delta
> "b", "a", "c" is not and will trigger this error
>
> If we wanted to allow for deltas coming from unordered dictionaries as
> an option, that could be implemented in theory, but it is not super
> trivial.
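Wes's rule can be stated as a small check: a delta is possible only when the previously emitted dictionary is a prefix of the new one. A pure-Python sketch (the function `classify_update` is illustrative, not the C++ implementation):

```python
def classify_update(last, new, emit_deltas=True):
    """Classify how `new` could be emitted after `last` for the same
    dictionary ID: ("unchanged", []), ("delta", appended_values) or
    ("replacement", new)."""
    if new == last:
        return "unchanged", []
    if emit_deltas and len(new) > len(last) and new[:len(last)] == last:
        return "delta", new[len(last):]
    return "replacement", new


print(classify_update(["a", "b"], ["a", "b", "c"]))  # ('delta', ['c'])
print(classify_update(["a", "b"], ["b", "a", "c"]))  # ('replacement', ['b', 'a', 'c'])
```

In the file format, the "replacement" outcome is the one that has to be rejected.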
>
> On Fri, Jul 23, 2021 at 9:25 AM Sam Davis <Sa...@nanoporetech.com> wrote:
> >
> > For reference, I think this check in the C++ code triggers regardless of whether the delta option is turned on:
> >
> > https://github.com/apache/arrow/blob/e0401123736c85283e527797a113a3c38c0915f2/cpp/src/arrow/ipc/writer.cc#L1066
> > ________________________________
> > From: Sam Davis <Sa...@nanoporetech.com>
> > Sent: 23 July 2021 14:43
> > To: user@arrow.apache.org <us...@arrow.apache.org>
> > Subject: Re: [PyArrow] DictionaryArray isDelta Support
> >
> > Yes, I know this, as quoted in the spec. What I am wondering is: for the file format, how can I write deltas out using PyArrow?
> >
> > The previous example was a trivial version of reality.
> >
> > More concretely, say I want to write 100e6 rows out in multiple RecordBatches to a non-streaming file format using PyArrow. I do not want to do a complete pass ahead of time to compute the full set of strings for the relevant columns and would therefore like to dump out deltas when new strings appear in a given column. Is this possible?
> >
> > Ideally, in the example code, this would "just" add a delta containing the difference between this batch's dictionary and those of the previous batches. I'm happy as a user to maintain the full set of categories seen thus far and tell PyArrow what the delta is if necessary.
> > ________________________________
> > From: Wes McKinney <we...@gmail.com>
> > Sent: 23 July 2021 14:36
> > To: user@arrow.apache.org <us...@arrow.apache.org>
> > Subject: Re: [PyArrow] DictionaryArray isDelta Support
> >
> > Dictionary replacements aren't supported in the file format, only
> > deltas. Your use case is a replacement, not a delta. You could use the
> > stream format instead.
> >
> > On Fri, Jul 23, 2021 at 8:32 AM Sam Davis <Sa...@nanoporetech.com> wrote:
> > >
> > > Hey Wes,
> > >
> > > Thanks, I had not spotted this before! It doesn't seem to change the behaviour with `pa.ipc.new_file` however. Maybe I'm using it incorrectly?
> > >
> > > ```
> > > import pandas as pd
> > > import pyarrow as pa
> > >
> > > print(pa.__version__)
> > >
> > > schema = pa.schema([
> > >     ("foo", pa.dictionary(pa.int16(), pa.string()))
> > > ])
> > >
> > > pd1 = pd.DataFrame({"foo": pd.Categorical(["aaaa"], categories=["a"*i for i in range(64)])})
> > > b1 = pa.RecordBatch.from_pandas(pd1, schema=schema)
> > >
> > > pd2 = pd.DataFrame({"foo": pd.Categorical(["aaaa"], categories=["b"*i for i in range(64)])})
> > > b2 = pa.RecordBatch.from_pandas(pd2, schema=schema)
> > >
> > > options = pa.ipc.IpcWriteOptions(emit_dictionary_deltas=True)
> > >
> > > with pa.ipc.new_file("/tmp/sdavis_tmp.arrow", schema=b1.schema, options=options) as writer:
> > >     writer.write(b1)
> > >     writer.write(b2)
> > > ```
> > >
> > > Version printed: 4.0.1
> > >
> > > Sam
> > > ________________________________
> > > From: Wes McKinney <we...@gmail.com>
> > > Sent: 23 July 2021 14:24
> > > To: user@arrow.apache.org <us...@arrow.apache.org>
> > > Subject: Re: [PyArrow] DictionaryArray isDelta Support
> > >
> > > hi Sam
> > >
> > > On Fri, Jul 23, 2021 at 8:15 AM Sam Davis <Sa...@nanoporetech.com> wrote:
> > >
> > > In pyarrow (3.0.0 or later), you need to opt into emitting dictionary
> > > deltas using pyarrow.ipc.IpcWriteOptions. Can you show your code?
> > >
> > > https://github.com/apache/arrow/commit/8d76312dd397ebe07b71531f6d23b8caa76703dc

Re: [PyArrow] DictionaryArray isDelta Support

Posted by Weston Pace <we...@gmail.com>.
I've been working with IPC files lately so I took another look at this and
it was much easier than I expected.

> This suggests the addition of a new data structure within the
> WriteDictionaries loop, likely as an attribute of IpcFormatWriter
> like `last_dictionaries_` that contains a set of values output for
> each dictionary ID to enable fast lookups? (i.e. avoid O(N^2)).

> Does this look like a plausible implementation? If not, is there
> something I have missed/a better suggestion? If so, would
> anyone be willing to help with this?

I'm a little confused by this comment. last_dictionaries_ is *already* a
structure that contains the set of values output for each dictionary ID.
The rest of your reasoning / explanation matches my understanding as well.

To fix the file writer to output delta dictionaries all we need to do is
move the check around a little[1].  This now matches the spec:

> Further more, it is invalid to have more than one non-delta dictionary
> batch per dictionary ID

For dictionary reading we, again, already have a snazzy DictionaryMemo
which knows how to append delta dictionaries so all we need to do is change
the check around a little[2].

The following simple python example works for me[3].

It's probably too late for the RC0 cutoff but I'll add some test cases on
Monday and will try and get it included should we need to build another RC.

[1]
https://github.com/apache/arrow/pull/12160/files#diff-1b1d9dca9fdea7624e22f017b8762c4919edf57c2cf43c15d59b8a5e8e1b38a5R1092
[2]
https://github.com/apache/arrow/pull/12160/files#diff-e992169684aea9845ac776ada4cbb2b5dc711b49e5a3fbc6046c92299e1aefceR1057
[3] https://gist.github.com/westonpace/31b4e99f94ba03491644174a0c92a620

On Fri, Jan 7, 2022 at 1:18 AM Sam Davis <Sa...@nanoporetech.com> wrote:

> Following up on this, I've tried to get up to speed on the IPC File Format
> writing of dictionaries and I think the following would work to make the
> code conformant to the IPC File Format specification:
>
> In the WriteDictionaries [1] inner loop, if a dictionary has already been
> output and emit_dictionary_deltas is True then it should work out which
> values in the new Array - containing the current dictionary values - have
> already been output and only emit those that haven't (by creating a new
> Array containing them to pass to the payload creation and writing?).
>
> This suggests the addition of a new data structure within the
> WriteDictionaries loop, likely as an attribute of IpcFormatWriter like
> `last_dictionaries_` that contains a set of values output for each
> dictionary ID to enable fast lookups? (i.e. avoid O(N^2)).
>
> Does this look like a plausible implementation? If not, is there something I
> have missed/a better suggestion? If so, would anyone be willing to help
> with this?
>
> [1]
> https://github.com/apache/arrow/blob/91e3ac53e2e21736ce6291d73fc37da6fa21259d/cpp/src/arrow/ipc/writer.cc#L1060
>
> ---
>
> If it helps others, my reasoning to get to this point is as follows:
>
> The IpcFormatWriter handles writing record batches to the IPC format
> (non-streaming)
>
>
> https://github.com/apache/arrow/blob/91e3ac53e2e21736ce6291d73fc37da6fa21259d/cpp/src/arrow/ipc/writer.cc#L982
>
> ```
> class ARROW_EXPORT IpcFormatWriter : public RecordBatchWriter {
>  public:
>   // A RecordBatchWriter implementation that writes to a IpcPayloadWriter.
> ```
>
> Writing out a table is - ignoring unified dictionaries - iterating over
> RecordBatches and writing them out using `WriteRecordBatch`.
>
>
> https://github.com/apache/arrow/blob/91e3ac53e2e21736ce6291d73fc37da6fa21259d/cpp/src/arrow/ipc/writer.cc#L1002
>
> ```
>   Status WriteRecordBatch(const RecordBatch& batch) override {
> ```
>
> One of the first things this does is to write out the dictionaries contained
> within the batch:
>
>
> https://github.com/apache/arrow/blob/91e3ac53e2e21736ce6291d73fc37da6fa21259d/cpp/src/arrow/ipc/writer.cc#L1009
>
> ```
>     RETURN_NOT_OK(WriteDictionaries(batch));
> ```
>
> This is what we will need to edit to ensure that dictionary writing is
> handled correctly.
>
> Recall that a RecordBatch is a "collection of equal-length arrays matching a
> particular Schema. A record batch is a table-like data structure that is
> semantically a sequence of fields, each a contiguous Arrow array".
>
> Each of the fields can be of dictionary type (which can also be nested), so
> there may be more than one dictionary to write for a batch.
>
> WriteDictionaries is defined further down:
>
>
> https://github.com/apache/arrow/blob/91e3ac53e2e21736ce6291d73fc37da6fa21259d/cpp/src/arrow/ipc/writer.cc#L1056
>
> ```
>   Status WriteDictionaries(const RecordBatch& batch) {
> ```
>
> The first thing it does is call CollectDictionaries:
>
> ```
>     ARROW_ASSIGN_OR_RAISE(const auto dictionaries,
>                           CollectDictionaries(batch, mapper_));
> ```
>
> `mapper_` is a DictionaryFieldMapper (recall: maps FieldPaths to dictionary
> IDs).
>
> CollectDictionaries is declared in the header here:
>
>
> https://github.com/apache/arrow/blob/6b0248d5b5502a1cf76d5853d15be9844b84522c/cpp/src/arrow/ipc/dictionary.h#L150
>
> ```
> // For writing: collect dictionary entries to write to the IPC stream, in
> // order (i.e. inner dictionaries before dependent outer dictionaries).
> ARROW_EXPORT
> Result<DictionaryVector> CollectDictionaries(const RecordBatch& batch,
>                                              const DictionaryFieldMapper& mapper);
> ```
>
> and defined here:
>
>
> https://github.com/apache/arrow/blob/6b0248d5b5502a1cf76d5853d15be9844b84522c/cpp/src/arrow/ipc/dictionary.cc#L383
>
> ```
> Result<DictionaryVector> CollectDictionaries(const RecordBatch& batch,
>                                              const DictionaryFieldMapper& mapper) {
>   DictionaryCollector collector{mapper, {}};
>   RETURN_NOT_OK(collector.Collect(batch));
>   return std::move(collector.dictionaries_);
> }
> ```
>
> It creates a DictionaryCollector, passing in the mapper, then calls Collect
> and returns the result.
>
> The DictionaryCollector is defined here:
>
>
> https://github.com/apache/arrow/blob/6b0248d5b5502a1cf76d5853d15be9844b84522c/cpp/src/arrow/ipc/dictionary.cc#L297
>
> ```
> struct DictionaryCollector {
>   const DictionaryFieldMapper& mapper_;
>   DictionaryVector dictionaries_;
> ```
>
> with `Collect` defined here:
>
>
> https://github.com/apache/arrow/blob/6b0248d5b5502a1cf76d5853d15be9844b84522c/cpp/src/arrow/ipc/dictionary.cc#L336
>
> ```
>   Status Collect(const RecordBatch& batch) {
>     FieldPosition position;
>     const Schema& schema = *batch.schema();
>     dictionaries_.reserve(mapper_.num_fields());
>
>     for (int i = 0; i < schema.num_fields(); ++i) {
>       RETURN_NOT_OK(Visit(position.child(i), schema.field(i),
>                           batch.column(i).get()));
>     }
>     return Status::OK();
>   }
> ```
>
> FieldPosition is, it seems, a reverse linked list (each node stores a
> pointer to its parent) where the nodes record their index (like FieldPath)
> and depth. It has a method to walk the list and construct a vector
> containing the indices.
>
> It then grabs a reference to the schema and reserves enough space in
> `dictionaries_` - recall: a vector of (id, Array) pairs - for every
> dictionary field known to the mapper (an optimisation to reduce
> allocations?).
>
> Then, for each field in the schema, it calls `Visit` with a new
> FieldPosition (via the call to `position.child(i)`), the field at position
> i in the schema, and the corresponding column of the RecordBatch.
>
> Visit is defined here:
>
>
> https://github.com/apache/arrow/blob/6b0248d5b5502a1cf76d5853d15be9844b84522c/cpp/src/arrow/ipc/dictionary.cc#L310
>
> ```
>   Status Visit(const FieldPosition& position,
>                const std::shared_ptr<Field>& field, const Array* array) {
> ```
>
> The result of this is that `dictionaries_` contains all the (id, Array)
> pairs, where each array contains the dictionary values (e.g. strings).
>
> Going right back up the stack to `WriteDictionaries`, we now iterate over
> each of these dictionaries:
>
>
> https://github.com/apache/arrow/blob/91e3ac53e2e21736ce6291d73fc37da6fa21259d/cpp/src/arrow/ipc/writer.cc#L1060
>
> ```
>     for (const auto& pair : dictionaries) {
>       int64_t dictionary_id = pair.first;
>       const auto& dictionary = pair.second;
> ```
>
> If a dictionary with this ID has already been output, the code checks
> whether it is the same, to avoid writing it twice.
>
> ```
>       // If a dictionary with this id was already emitted, check if it was the same.
>       auto* last_dictionary = &last_dictionaries_[dictionary_id];
>       const bool dictionary_exists = (*last_dictionary != nullptr);
>       int64_t delta_start = 0;
>       if (dictionary_exists) {
>         if ((*last_dictionary)->data() == dictionary->data()) {
>           // Fast shortcut for a common case.
>           // Same dictionary data by pointer => no need to emit it again
>           continue;
>         }
>         const int64_t last_length = (*last_dictionary)->length();
>         const int64_t new_length = dictionary->length();
>         if (new_length == last_length &&
>             ((*last_dictionary)->Equals(dictionary, equal_options))) {
>           // Same dictionary by value => no need to emit it again
>           // (while this can have a CPU cost, this code path is required
>           //  for the IPC file format)
>           continue;
>         }
> ```
>
> If either of these equality conditions triggers, we continue on to check
> the next dictionary.
>
> Otherwise the dictionary has been output before but has changed, so we
> might be able to write either a replacement or a delta.
>
> Currently the code returns an error in both of these cases if we're writing
> the IPC file format. For deltas this goes against the spec, which only
> forbids replacements.
>
> If the current dictionary is "just" an extension of the previous dictionary
> and deltas are enabled, then it marks where the new starting position is.
>
> Given this, it creates a payload - delta or not - and writes it, making
> sure to update the stats data structure.
>
> ```
>       IpcPayload payload;
>       if (delta_start) {
>         RETURN_NOT_OK(GetDictionaryPayload(dictionary_id, /*is_delta=*/true,
>                                            dictionary->Slice(delta_start), options_,
>                                            &payload));
>       } else {
>         RETURN_NOT_OK(
>             GetDictionaryPayload(dictionary_id, dictionary, options_, &payload));
>       }
>       RETURN_NOT_OK(WritePayload(payload));
>       ++stats_.num_dictionary_batches;
>       if (dictionary_exists) {
>         if (delta_start) {
>           ++stats_.num_dictionary_deltas;
>         } else {
>           ++stats_.num_replaced_dictionaries;
>         }
>       }
> ```
>
>
> ------------------------------
> *From:* Weston Pace <we...@gmail.com>
> *Sent:* 27 July 2021 19:52
> *To:* user@arrow.apache.org <us...@arrow.apache.org>
> *Subject:* Re: [PyArrow] DictionaryArray isDelta Support
>
> I'm able to verify what Sam is seeing. It appears Arrow does not
> support dictionary deltas in the file format. However, from my
> reading of the spec it does indeed seem proper deltas should be
> allowed. A small patch[1] allowed me to write delta dictionaries in
> the file format but then reading fails at [2] which seems more
> explicit that dictionary deltas of any kind were not originally
> supported with the file format. I think the fix for read will
> probably be a bit more involved and require some good testing. I've
> opened ARROW-13467[3] for further discussion.
>
> [1]
> https://github.com/apache/arrow/commit/54df8581a9d825664785fc406950204e345a5b3b
> [2]
> https://github.com/apache/arrow/blob/81ff679c47754692224f655dab32cc0936bb5f55/cpp/src/arrow/ipc/reader.cc#L1062
> [3] https://issues.apache.org/jira/browse/ARROW-13467
>
> On Sun, Jul 25, 2021 at 10:12 PM Sam Davis <Sa...@nanoporetech.com>
> wrote:
> >
> > Hi Wes,
> >
> > Yes, that is exactly it. For the file format, the spec dictates that it
> should be possible to output deltas but currently this is not possible. An
> `ArrowInvalid` error is thrown.
> >
> > Example code:
> >
> > ```
> > import pandas as pd
> > import pyarrow as pa
> >
> > print(pa.__version__)
> >
> > schema = pa.schema([
> >     ("foo", pa.dictionary(pa.int16(), pa.string()))
> > ])
> >
> > pd1 = pd.DataFrame({"foo": pd.Categorical(["a"], categories=["a", "b"])})
> > b1 = pa.RecordBatch.from_pandas(pd1, schema=schema)
> >
> > pd2 = pd.DataFrame({"foo": pd.Categorical(["a", "bbbb"],
> >                                           categories=["a", "b", "bbbb"])})
> > b2 = pa.RecordBatch.from_pandas(pd2, schema=schema)
> >
> > options = pa.ipc.IpcWriteOptions(emit_dictionary_deltas=True)
> >
> > with pa.ipc.new_file("/tmp/sdavis_tmp.arrow", schema=schema,
> >                      options=options) as writer:
> >     writer.write(b1)
> >     # In pyarrow 4.0.1 this raises ArrowInvalid, even though b2's
> >     # dictionary only extends b1's (i.e. it could be written as a delta).
> >     writer.write(b2)
> > ```
> >
> > Best,
> >
> > Sam
> > ________________________________
> > From: Wes McKinney <we...@gmail.com>
> > Sent: 24 July 2021 01:43
> > To: user@arrow.apache.org <us...@arrow.apache.org>
> > Subject: Re: [PyArrow] DictionaryArray isDelta Support
> >
> > If I'm interpreting you correctly, the issue is that every dictionary
> > must be a prefix of a common dictionary for the delta logic to work.
> > So if the first batch has
> >
> > "a", "b"
> >
> > then in the next batch
> >
> > "a", "b", "c" is OK and will emit a delta
> > "b", "a", "c" is not and will trigger this error
> >
> > If we wanted to allow for deltas coming from unordered dictionaries as
> > an option, that could be implemented in theory but it's not super
> > trivial.
> >
> > On Fri, Jul 23, 2021 at 9:25 AM Sam Davis <Sa...@nanoporetech.com>
> wrote:
> > >
> > > For reference, I think this check in the C++ code triggers regardless
> of whether the delta option is turned on:
> > >
> > >
> https://github.com/apache/arrow/blob/e0401123736c85283e527797a113a3c38c0915f2/cpp/src/arrow/ipc/writer.cc#L1066
> > > ________________________________
> > > From: Sam Davis <Sa...@nanoporetech.com>
> > > Sent: 23 July 2021 14:43
> > > To: user@arrow.apache.org <us...@arrow.apache.org>
> > > Subject: Re: [PyArrow] DictionaryArray isDelta Support
> > >
> > > Yes I know this as quoted in the spec. What I am wondering is for the
> file format how can I write deltas out using PyArrow?
> > >
> > > The previous example was a trivial version of reality.
> > >
> > > More concretely, say I want to write 100e6 rows out in multiple
> RecordBatches to a non-streaming file format using PyArrow. I do not want
> to do a complete pass ahead of time to compute the full set of strings for
> the relevant columns and would therefore like to dump out deltas when new
> strings appear in a given column. Is this possible?
> > >
> > > In the example code ideally this would "just" add on the delta
> containing the dictionary difference of it and the previous batches. I'm
> happy as a user to maintain the full set of categories seen thus far and
> tell PyArrow what the delta is if necessary.
> > > ________________________________
> > > From: Wes McKinney <we...@gmail.com>
> > > Sent: 23 July 2021 14:36
> > > To: user@arrow.apache.org <us...@arrow.apache.org>
> > > Subject: Re: [PyArrow] DictionaryArray isDelta Support
> > >
> > > Dictionary replacements aren't supported in the file format, only
> > > deltas. Your use case is a replacement, not a delta. You could use the
> > > stream format instead.
> > >
> > > On Fri, Jul 23, 2021 at 8:32 AM Sam Davis <Sa...@nanoporetech.com>
> wrote:
> > > >
> > > > Hey Wes,
> > > >
> > > > Thanks, I had not spotted this before! It doesn't seem to change the
> behaviour with `pa.ipc.new_file` however. Maybe I'm using it incorrectly?
> > > >
> > > > ```
> > > > import pandas as pd
> > > > import pyarrow as pa
> > > >
> > > > print(pa.__version__)
> > > >
> > > > schema = pa.schema([
> > > >     ("foo", pa.dictionary(pa.int16(), pa.string()))
> > > > ])
> > > >
> > > > pd1 = pd.DataFrame({"foo": pd.Categorical(["aaaa"],
> > > >                                           categories=["a"*i for i in range(64)])})
> > > > b1 = pa.RecordBatch.from_pandas(pd1, schema=schema)
> > > >
> > > > pd2 = pd.DataFrame({"foo": pd.Categorical(["aaaa"],
> > > >                                           categories=["b"*i for i in range(64)])})
> > > > b2 = pa.RecordBatch.from_pandas(pd2, schema=schema)
> > > >
> > > > options = pa.ipc.IpcWriteOptions(emit_dictionary_deltas=True)
> > > >
> > > > with pa.ipc.new_file("/tmp/sdavis_tmp.arrow", schema=b1.schema,
> > > >                      options=options) as writer:
> > > >     writer.write(b1)
> > > >     # b2's dictionary does not extend b1's, so this is a replacement.
> > > >     writer.write(b2)
> > > > ```
> > > >
> > > > Version printed: 4.0.1
> > > >
> > > > Sam
> > > > ________________________________
> > > > From: Wes McKinney <we...@gmail.com>
> > > > Sent: 23 July 2021 14:24
> > > > To: user@arrow.apache.org <us...@arrow.apache.org>
> > > > Subject: Re: [PyArrow] DictionaryArray isDelta Support
> > > >
> > > > hi Sam
> > > >
> > > > On Fri, Jul 23, 2021 at 8:15 AM Sam Davis <
> Sam.Davis@nanoporetech.com> wrote:
> > > > >
> > > > > Hi,
> > > > >
> > > > > We want to write out RecordBatches of data, where one or more `pa.string()` columns in a batch are encoded as `pa.dictionary(pa.intX(), pa.string())`, as each column only contains a handful of unique values.
> > > > >
> > > > > However, PyArrow seems to lack support for writing these batches out to either the streaming or the (non-streaming) file format.
> > > > >
> > > > > When attempting to write two distinct batches, the following error message is triggered:
> > > > >
> > > > > > ArrowInvalid: Dictionary replacement detected when writing IPC file format. Arrow IPC files only support a single dictionary for a given field across all batches.
> > > > >
> > > > > I believe this message is false and that support is possible based on reading the spec:
> > > > >
> > > > > > Dictionaries are written in the stream and file formats as a sequence of record batches...
> > > > > > ...
> > > > > > The dictionary isDelta flag allows existing dictionaries to be expanded for future record batch materializations. A dictionary batch with isDelta set indicates that its vector should be concatenated with those of any previous batches with the same id. In a stream which encodes one column, the list of strings ["A", "B", "C", "B", "D", "C", "E", "A"], with a delta dictionary batch could take the form:
> > > > >
> > > > > ```
> > > > > <SCHEMA>
> > > > > <DICTIONARY 0>
> > > > > (0) "A"
> > > > > (1) "B"
> > > > > (2) "C"
> > > > >
> > > > > <RECORD BATCH 0>
> > > > > 0
> > > > > 1
> > > > > 2
> > > > > 1
> > > > >
> > > > > <DICTIONARY 0 DELTA>
> > > > > (3) "D"
> > > > > (4) "E"
> > > > >
> > > > > <RECORD BATCH 1>
> > > > > 3
> > > > > 2
> > > > > 4
> > > > > 0
> > > > > EOS
> > > > > ```
> > > > >
> > > > > > Alternatively, if isDelta is set to false, then the dictionary replaces the existing dictionary for the same ID. Using the same example as above, an alternate encoding could be:
> > > > >
> > > > > ```
> > > > > <SCHEMA>
> > > > > <DICTIONARY 0>
> > > > > (0) "A"
> > > > > (1) "B"
> > > > > (2) "C"
> > > > >
> > > > > <RECORD BATCH 0>
> > > > > 0
> > > > > 1
> > > > > 2
> > > > > 1
> > > > >
> > > > > <DICTIONARY 0>
> > > > > (0) "A"
> > > > > (1) "C"
> > > > > (2) "D"
> > > > > (3) "E"
> > > > >
> > > > > <RECORD BATCH 1>
> > > > > 2
> > > > > 1
> > > > > 3
> > > > > 0
> > > > > EOS
> > > > > ```
> > > > >
> > > > > It also specifies in the IPC File Format (non-streaming) section:
> > > > >
> > > > > > In the file format, there is no requirement that dictionary keys should be defined in a DictionaryBatch before they are used in a RecordBatch, as long as the keys are defined somewhere in the file. Further more, it is invalid to have more than one non-delta dictionary batch per dictionary ID (i.e. dictionary replacement is not supported). Delta dictionaries are applied in the order they appear in the file footer.
> > > > >
> > > > > So for the non-streaming format multiple non-delta dictionaries are not supported, but one non-delta followed by delta dictionaries should be.
> > > > >
> > > > > Is it possible to do this in PyArrow? If so, how? If not, how easy would it be to add? Is it currently possible via C++, and could I therefore write a Cython or similar extension that lets me do this now without waiting for a release?
> > > > >
> > > >
> > > > In pyarrow (3.0.0 or later), you need to opt into emitting dictionary
> > > > deltas using pyarrow.ipc.IpcWriteOptions. Can you show your code?
> > > >
> > > >
> https://github.com/apache/arrow/commit/8d76312dd397ebe07b71531f6d23b8caa76703dc
> > > >
> > > > > Best,
> > > > >
> > > > > Sam
>

Re: [PyArrow] DictionaryArray isDelta Support

Posted by Sam Davis <Sa...@nanoporetech.com>.
Following up on this, I've tried to get up to speed on the IPC File Format writing of dictionaries and I think the following would work to make the code conformant to the IPC File Format specification:

In the WriteDictionaries [1] inner loop, if a dictionary has already been output and emit_dictionary_deltas is True then it should work out which values in the new Array - containing the current dictionary values - have already been output and only emit those that haven't (by creating a new Array containing them to pass to the payload creation and writing?).

This suggests the addition of a new data structure within the WriteDictionaries loop, likely as an attribute of IpcFormatWriter like `last_dictionaries_` that contains a set of values output for each dictionary ID to enable fast lookups? (i.e. avoid O(N^2)).
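
A minimal sketch of the proposed per-id state, in pure Python with hypothetical names: a list of everything emitted so far plus a dict from value to position, so membership checks are O(1) rather than a linear scan. One assumption is made explicit here: emitting only the unseen values is not enough on its own, because a batch's indices point into that batch's own dictionary. The sketch therefore also remaps them onto the concatenated dictionary, which is what lifts the prefix requirement Wes describes in this thread. Nulls are ignored for brevity.

```python
from typing import Dict, List, Tuple


class DeltaTracker:
    """Hypothetical per-dictionary-id writer state (not part of Arrow)."""

    def __init__(self) -> None:
        self.emitted: List[str] = []        # concatenation of every batch written so far
        self.position: Dict[str, int] = {}  # value -> index in `emitted`, O(1) lookup

    def update(self, dictionary: List[str], indices: List[int]) -> Tuple[List[str], List[int]]:
        """Return (values to write as a delta, indices remapped to `emitted`).

        `dictionary` and `indices` are one batch's local dictionary and codes.
        On the first call the returned values form the initial (non-delta)
        dictionary batch.
        """
        delta: List[str] = []
        for value in dictionary:
            if value not in self.position:
                # Unseen value: it goes at the end of the concatenated dictionary.
                self.position[value] = len(self.emitted)
                self.emitted.append(value)
                delta.append(value)
        # Local codes point into `dictionary`; translate them to global positions.
        remapped = [self.position[dictionary[i]] for i in indices]
        return delta, remapped


tracker = DeltaTracker()
print(tracker.update(["a", "b"], [0]))             # (['a', 'b'], [0])
print(tracker.update(["a", "b", "bbbb"], [0, 2]))  # (['bbbb'], [0, 2])
print(tracker.update(["b", "a", "c"], [2, 1, 0]))  # (['c'], [3, 0, 1])
```

The third call is the non-prefix case (`"b", "a", "c"`): it still yields a delta, but only because the batch's indices are rewritten, so the record batch actually written differs from the one passed in.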

Does this look like a plausible implementation? If not, is there something I have missed/a better suggestion? If so, would anyone be willing to help with this?

[1] https://github.com/apache/arrow/blob/91e3ac53e2e21736ce6291d73fc37da6fa21259d/cpp/src/arrow/ipc/writer.cc#L1060

---

If it helps others, my reasoning to get to this point is as follows:

The IpcFormatWriter handles writing record batches to the IPC format
(non-streaming)

https://github.com/apache/arrow/blob/91e3ac53e2e21736ce6291d73fc37da6fa21259d/cpp/src/arrow/ipc/writer.cc#L982

```
class ARROW_EXPORT IpcFormatWriter : public RecordBatchWriter {
 public:
  // A RecordBatchWriter implementation that writes to a IpcPayloadWriter.
```

Writing out a table is - ignoring unified dictionaries - iterating over
RecordBatches and writing them out using `WriteRecordBatch`.

https://github.com/apache/arrow/blob/91e3ac53e2e21736ce6291d73fc37da6fa21259d/cpp/src/arrow/ipc/writer.cc#L1002

```
  Status WriteRecordBatch(const RecordBatch& batch) override {
```

One of the first things this does is to write out the dictionaries contained
within the batch:

https://github.com/apache/arrow/blob/91e3ac53e2e21736ce6291d73fc37da6fa21259d/cpp/src/arrow/ipc/writer.cc#L1009

```
    RETURN_NOT_OK(WriteDictionaries(batch));
```

This is what we will need to edit to ensure that dictionary writing is handled
correctly.

Recall that a RecordBatch is a "collection of equal-length arrays matching a
particular Schema. A record batch is a table-like data structure that is
semantically a sequence of fields, each a contiguous Arrow array".

Each of the fields can be of dictionary type (which can also be nested), so
there may be more than one dictionary to write for a batch.

WriteDictionaries is defined further down:

https://github.com/apache/arrow/blob/91e3ac53e2e21736ce6291d73fc37da6fa21259d/cpp/src/arrow/ipc/writer.cc#L1056

```
  Status WriteDictionaries(const RecordBatch& batch) {
```

The first thing it does is call CollectDictionaries:

```
    ARROW_ASSIGN_OR_RAISE(const auto dictionaries, CollectDictionaries(batch, mapper_));
```

`mapper_` is a DictionaryFieldMapper (recall: maps FieldPaths to dictionary
IDs)

CollectDictionaries is declared in the header here:

https://github.com/apache/arrow/blob/6b0248d5b5502a1cf76d5853d15be9844b84522c/cpp/src/arrow/ipc/dictionary.h#L150

```
// For writing: collect dictionary entries to write to the IPC stream, in order
// (i.e. inner dictionaries before dependent outer dictionaries).
ARROW_EXPORT
Result<DictionaryVector> CollectDictionaries(const RecordBatch& batch,
                                             const DictionaryFieldMapper& mapper);
```

and defined here:

https://github.com/apache/arrow/blob/6b0248d5b5502a1cf76d5853d15be9844b84522c/cpp/src/arrow/ipc/dictionary.cc#L383

```
Result<DictionaryVector> CollectDictionaries(const RecordBatch& batch,
                                             const DictionaryFieldMapper& mapper) {
  DictionaryCollector collector{mapper, {}};
  RETURN_NOT_OK(collector.Collect(batch));
  return std::move(collector.dictionaries_);
}
```

It creates a DictionaryCollector, passing in the mapper, then calls Collect
and returns the result.

The DictionaryCollector is defined here:

https://github.com/apache/arrow/blob/6b0248d5b5502a1cf76d5853d15be9844b84522c/cpp/src/arrow/ipc/dictionary.cc#L297

```
struct DictionaryCollector {
  const DictionaryFieldMapper& mapper_;
  DictionaryVector dictionaries_;
```

with `Collect` defined here:

https://github.com/apache/arrow/blob/6b0248d5b5502a1cf76d5853d15be9844b84522c/cpp/src/arrow/ipc/dictionary.cc#L336

```
  Status Collect(const RecordBatch& batch) {
    FieldPosition position;
    const Schema& schema = *batch.schema();
    dictionaries_.reserve(mapper_.num_fields());

    for (int i = 0; i < schema.num_fields(); ++i) {
      RETURN_NOT_OK(Visit(position.child(i), schema.field(i),
                          batch.column(i).get()));
    }
    return Status::OK();
  }
```

FieldPosition is, it seems, a reverse LinkedList (each node stores a pointer to
its parent) where the nodes record their index (like FieldPath) and depth. It
has a method to walk the list and construct a vector containing the indices.
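
The "reverse linked list" reading of FieldPosition can be modelled in a few lines. This is an illustrative Python sketch of the structure as described here (parent pointer, index, depth, and a walk that rebuilds the index path), not a binding of Arrow's internal C++ class.

```python
from typing import List, Optional


class FieldPosition:
    """Toy parent-pointer path node (illustrative, not Arrow's C++ class)."""

    def __init__(self, parent: Optional["FieldPosition"] = None, index: int = -1) -> None:
        self.parent = parent
        self.index = index
        self.depth = 0 if parent is None else parent.depth + 1

    def child(self, index: int) -> "FieldPosition":
        # Creating a child is O(1): it only stores a pointer back to this node.
        return FieldPosition(self, index)

    def path(self) -> List[int]:
        # Walk parent pointers up to the root, filling indices from the back.
        indices = [0] * self.depth
        node = self
        while node.parent is not None:
            indices[node.depth - 1] = node.index
            node = node.parent
        return indices


root = FieldPosition()
pos = root.child(2).child(0)   # field 2 of the schema, then its child 0
print(pos.depth, pos.path())   # 2 [2, 0]
```

Creating a child is constant time and shares the parent chain, which is presumably why the collector can cheaply pass `position.child(i)` down while recursing.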

It then grabs a reference to the schema and reserves enough space in
`dictionaries_` - recall: a vector of (id, Array) pairs - for every dictionary
field known to the mapper (an optimisation to reduce allocations?).

Then, for each field in the schema, it calls `Visit` with a new FieldPosition
(via the call to `position.child(i)`), the field at position i in the schema,
and the corresponding column of the RecordBatch.

Visit is defined here:

https://github.com/apache/arrow/blob/6b0248d5b5502a1cf76d5853d15be9844b84522c/cpp/src/arrow/ipc/dictionary.cc#L310

```
  Status Visit(const FieldPosition& position, const std::shared_ptr<Field>& field, const Array* array) {
```

The result of this is that `dictionaries_` contains all the tuples of (id,
Array) where array contains the dictionary values (e.g. strings).
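
The "inner dictionaries before dependent outer dictionaries" ordering from the header comment can be illustrated with a toy traversal. Everything here is hypothetical: `ToyArray` stands in for an Arrow array, and ids are attached directly instead of being looked up through a DictionaryFieldMapper. The point is only that visiting a dictionary's values before recording the dictionary itself yields the required order.

```python
from dataclasses import dataclass, field
from typing import List, Optional, Tuple


@dataclass
class ToyArray:
    """Hypothetical stand-in for an Arrow array; just enough to show visit order."""
    children: List["ToyArray"] = field(default_factory=list)  # struct-like children
    dict_id: Optional[int] = None                              # set if dictionary-encoded
    dictionary: Optional["ToyArray"] = None                    # the dictionary's values
    label: str = ""                                            # name for the output only


def collect_dictionaries(columns: List[ToyArray]) -> List[Tuple[int, str]]:
    """Return (dict_id, label) pairs with inner dictionaries before outer ones."""
    out: List[Tuple[int, str]] = []

    def visit(array: ToyArray) -> None:
        for child in array.children:
            visit(child)
        if array.dict_id is not None and array.dictionary is not None:
            # Visit the dictionary's values first, so any dictionaries nested
            # inside them are recorded before the dictionary that depends on them.
            visit(array.dictionary)
            out.append((array.dict_id, array.dictionary.label))

    for column in columns:
        visit(column)
    return out


inner = ToyArray(dict_id=1, dictionary=ToyArray(label="inner values"))
outer_values = ToyArray(children=[inner], label="outer values")
columns = [
    ToyArray(dict_id=0, dictionary=outer_values),  # its values contain a dictionary
    ToyArray(dict_id=2, dictionary=ToyArray(label="plain values")),
]
print(collect_dictionaries(columns))
# [(1, 'inner values'), (0, 'outer values'), (2, 'plain values')]
```
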

Going right back up the stack to `WriteDictionaries` we now iterate over each
of these dictionaries:

https://github.com/apache/arrow/blob/91e3ac53e2e21736ce6291d73fc37da6fa21259d/cpp/src/arrow/ipc/writer.cc#L1060

```
    for (const auto& pair : dictionaries) {
      int64_t dictionary_id = pair.first;
      const auto& dictionary = pair.second;
```

If a dictionary with this ID has already been output, the code checks
whether it is the same, to avoid writing it twice.

```
      // If a dictionary with this id was already emitted, check if it was the same.
      auto* last_dictionary = &last_dictionaries_[dictionary_id];
      const bool dictionary_exists = (*last_dictionary != nullptr);
      int64_t delta_start = 0;
      if (dictionary_exists) {
        if ((*last_dictionary)->data() == dictionary->data()) {
          // Fast shortcut for a common case.
          // Same dictionary data by pointer => no need to emit it again
          continue;
        }
        const int64_t last_length = (*last_dictionary)->length();
        const int64_t new_length = dictionary->length();
        if (new_length == last_length &&
            ((*last_dictionary)->Equals(dictionary, equal_options))) {
          // Same dictionary by value => no need to emit it again
          // (while this can have a CPU cost, this code path is required
          //  for the IPC file format)
          continue;
        }
```

If either of these equality conditions triggers, we continue on to check
the next dictionary.

Otherwise the dictionary has been output before but has changed, so we
might be able to write either a replacement or a delta.

Currently the code returns an error in both of these cases if we're writing
the IPC file format. For deltas this goes against the spec, which only
forbids replacements.

If the current dictionary is "just" an extension of the previous dictionary and
deltas are enabled then it marks where the new starting position is.
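
The per-id decision this loop makes can be summarised as a small function. This is a hypothetical pure-Python sketch operating on lists of values. It follows the spec rules quoted in this thread (deltas allowed in both formats, replacements rejected in the file format) rather than the writer as it stood in 2021, which rejected both in files. As Wes explains, a delta is only possible when the old dictionary is a prefix of the new one.

```python
from typing import List, Optional, Tuple


def plan_dictionary_write(last: Optional[List[str]], new: List[str],
                          emit_deltas: bool, file_format: bool) -> Tuple[str, int]:
    """Return (action, delta_start) for one dictionary id (hypothetical helper).

    action is one of "full", "skip", "delta" or "replacement".
    """
    if last is None:
        return "full", 0                 # first dictionary for this id
    if new == last:
        return "skip", 0                 # unchanged: nothing to write
    extends_last = len(new) > len(last) and new[:len(last)] == last
    if emit_deltas and extends_last:
        return "delta", len(last)        # write only new[len(last):]
    if file_format:
        # At most one non-delta dictionary batch per id is allowed in a file.
        raise ValueError("dictionary replacement is not supported in the IPC file format")
    return "replacement", 0              # streams may replace the dictionary


print(plan_dictionary_write(["a", "b"], ["a", "b", "c"], True, True))  # ('delta', 2)
```
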

Given this, it creates a payload - delta or not - and writes it, making sure
to update the stats data structure.

```
      IpcPayload payload;
      if (delta_start) {
        RETURN_NOT_OK(GetDictionaryPayload(dictionary_id, /*is_delta=*/true,
                                           dictionary->Slice(delta_start), options_,
                                           &payload));
      } else {
        RETURN_NOT_OK(
            GetDictionaryPayload(dictionary_id, dictionary, options_, &payload));
      }
      RETURN_NOT_OK(WritePayload(payload));
      ++stats_.num_dictionary_batches;
      if (dictionary_exists) {
        if (delta_start) {
          ++stats_.num_dictionary_deltas;
        } else {
          ++stats_.num_replaced_dictionaries;
        }
      }
```

________________________________
From: Weston Pace <we...@gmail.com>
Sent: 27 July 2021 19:52
To: user@arrow.apache.org <us...@arrow.apache.org>
Subject: Re: [PyArrow] DictionaryArray isDelta Support

I'm able to verify what Sam is seeing. It appears Arrow does not
support dictionary deltas in the file format. However, from my
reading of the spec it does indeed seem proper deltas should be
allowed. A small patch[1] allowed me to write delta dictionaries in
the file format but then reading fails at [2] which seems more
explicit that dictionary deltas of any kind were not originally
supported with the file format. I think the fix for read will
probably be a bit more involved and require some good testing. I've
opened ARROW-13467 [3] for further discussion.

[1] https://github.com/apache/arrow/commit/54df8581a9d825664785fc406950204e345a5b3b
[2] https://github.com/apache/arrow/blob/81ff679c47754692224f655dab32cc0936bb5f55/cpp/src/arrow/ipc/reader.cc#L1062
[3] https://issues.apache.org/jira/browse/ARROW-13467

On Sun, Jul 25, 2021 at 10:12 PM Sam Davis <Sa...@nanoporetech.com> wrote:
>
> Hi Wes,
>
> Yes, that is exactly it. For the file format, the spec dictates that it should be possible to output deltas but currently this is not possible. An `ArrowInvalid` error is thrown.
>
> Example code:
>
> ```
> import pandas as pd
> import pyarrow as pa
>
> print(pa.__version__)
>
> schema = pa.schema([
>     ("foo", pa.dictionary(pa.int16(), pa.string()))
> ])
>
> pd1 = pd.DataFrame({"foo": pd.Categorical(["a"], categories=["a", "b"])})
> b1 = pa.RecordBatch.from_pandas(pd1, schema=schema)
>
> pd2 = pd.DataFrame({"foo": pd.Categorical(["a", "bbbb"], categories=["a", "b", "bbbb"])})
> b2 = pa.RecordBatch.from_pandas(pd2, schema=schema)
>
> options = pa.ipc.IpcWriteOptions(emit_dictionary_deltas=True)
>
> with pa.ipc.new_file("/tmp/sdavis_tmp.arrow", schema=schema, options=options) as writer:
>     writer.write(b1)
>
>     writer.write(b2)
> ```
>
> Best,
>
> Sam
> ________________________________
> From: Wes McKinney <we...@gmail.com>
> Sent: 24 July 2021 01:43
> To: user@arrow.apache.org <us...@arrow.apache.org>
> Subject: Re: [PyArrow] DictionaryArray isDelta Support
>
> If I'm interpreting you correctly, the issue is that every dictionary
> must be a prefix of a common dictionary for the delta logic to work.
> So if the first batch has
>
> "a", "b"
>
> then in the next batch
>
> "a", "b", "c" is OK and will emit a delta
> "b", "a", "c" is not and will trigger this error
>
> If we wanted to allow for deltas coming from unordered dictionaries as
> an option, that could be implemented in theory but it's not super
> trivial
>
> On Fri, Jul 23, 2021 at 9:25 AM Sam Davis <Sa...@nanoporetech.com> wrote:
> >
> > For reference, I think this check in the C++ code triggers regardless of whether the delta option is turned on:
> >
> > https://github.com/apache/arrow/blob/e0401123736c85283e527797a113a3c38c0915f2/cpp/src/arrow/ipc/writer.cc#L1066
> > ________________________________
> > From: Sam Davis <Sa...@nanoporetech.com>
> > Sent: 23 July 2021 14:43
> > To: user@arrow.apache.org <us...@arrow.apache.org>
> > Subject: Re: [PyArrow] DictionaryArray isDelta Support
> >
> > Yes, I know this, as quoted in the spec. What I am wondering is how I can write deltas out to the file format using PyArrow.
> >
> > The previous example was a trivial version of reality.
> >
> > More concretely, say I want to write 100e6 rows out in multiple RecordBatches to a non-streaming file format using PyArrow. I do not want to do a complete pass ahead of time to compute the full set of strings for the relevant columns and would therefore like to dump out deltas when new strings appear in a given column. Is this possible?
> >
> > In the example code ideally this would "just" add on the delta containing the dictionary difference of it and the previous batches. I'm happy as a user to maintain the full set of categories seen thus far and tell PyArrow what the delta is if necessary.
> > ________________________________
> > From: Wes McKinney <we...@gmail.com>
> > Sent: 23 July 2021 14:36
> > To: user@arrow.apache.org <us...@arrow.apache.org>
> > Subject: Re: [PyArrow] DictionaryArray isDelta Support
> >
> > Dictionary replacements aren't supported in the file format, only
> > deltas. Your use case is a replacement, not a delta. You could use the
> > stream format instead.
> >
> > On Fri, Jul 23, 2021 at 8:32 AM Sam Davis <Sa...@nanoporetech.com> wrote:
> > >
> > > Hey Wes,
> > >
> > > Thanks, I had not spotted this before! It doesn't seem to change the behaviour with `pa.ipc.new_file` however. Maybe I'm using it incorrectly?
> > >
> > > ```
> > > import pandas as pd
> > > import pyarrow as pa
> > >
> > > print(pa.__version__)
> > >
> > > schema = pa.schema([
> > >     ("foo", pa.dictionary(pa.int16(), pa.string()))
> > > ])
> > >
> > > pd1 = pd.DataFrame({"foo": pd.Categorical(["aaaa"], categories=["a"*i for i in range(64)])})
> > > b1 = pa.RecordBatch.from_pandas(pd1, schema=schema)
> > >
> > > pd2 = pd.DataFrame({"foo": pd.Categorical(["aaaa"], categories=["b"*i for i in range(64)])})
> > > b2 = pa.RecordBatch.from_pandas(pd2, schema=schema)
> > >
> > > options = pa.ipc.IpcWriteOptions(emit_dictionary_deltas=True)
> > >
> > > with pa.ipc.new_file("/tmp/sdavis_tmp.arrow", schema=b1.schema, options=options) as writer:
> > >     writer.write(b1)
> > >     writer.write(b2)
> > > ```
> > >
> > > Version printed: 4.0.1
> > >
> > > Sam
> > > ________________________________
> > > From: Wes McKinney <we...@gmail.com>
> > > Sent: 23 July 2021 14:24
> > > To: user@arrow.apache.org <us...@arrow.apache.org>
> > > Subject: Re: [PyArrow] DictionaryArray isDelta Support
> > >
> > > hi Sam
> > >
> > > On Fri, Jul 23, 2021 at 8:15 AM Sam Davis <Sa...@nanoporetech.com> wrote:
> > > >
> > > > Hi,
> > > >
> > > > We want to write out RecordBatches of data, where one or more `pa.string()` columns in a batch are encoded as `pa.dictionary(pa.intX(), pa.string())`, since each column only contains a handful of unique values.
> > > >
> > > > However, PyArrow seems to lack support for writing these batches out to either the streaming or (non-streaming) file format.
> > > >
> > > > When attempting to write two distinct batches the following error message is triggered:
> > > >
> > > > > ArrowInvalid: Dictionary replacement detected when writing IPC file format. Arrow IPC files only support a single dictionary for a given field across all batches.
> > > >
> > > > I believe this message is false and that support is possible based on reading the spec:
> > > >
> > > > > Dictionaries are written in the stream and file formats as a sequence of record batches...
> > > > > ...
> > > > > The dictionary isDelta flag allows existing dictionaries to be expanded for future record batch materializations. A dictionary batch with isDelta set indicates that its vector should be concatenated with those of any previous batches with the same id. In a stream which encodes one column, the list of strings ["A", "B", "C", "B", "D", "C", "E", "A"], with a delta dictionary batch could take the form:
> > > >
> > > > ```
> > > > <SCHEMA>
> > > > <DICTIONARY 0>
> > > > (0) "A"
> > > > (1) "B"
> > > > (2) "C"
> > > >
> > > > <RECORD BATCH 0>
> > > > 0
> > > > 1
> > > > 2
> > > > 1
> > > >
> > > > <DICTIONARY 0 DELTA>
> > > > (3) "D"
> > > > (4) "E"
> > > >
> > > > <RECORD BATCH 1>
> > > > 3
> > > > 2
> > > > 4
> > > > 0
> > > > EOS
> > > > ```
> > > >
> > > > > Alternatively, if isDelta is set to false, then the dictionary replaces the existing dictionary for the same ID. Using the same example as above, an alternate encoding could be:
> > > >
> > > > ```
> > > > <SCHEMA>
> > > > <DICTIONARY 0>
> > > > (0) "A"
> > > > (1) "B"
> > > > (2) "C"
> > > >
> > > > <RECORD BATCH 0>
> > > > 0
> > > > 1
> > > > 2
> > > > 1
> > > >
> > > > <DICTIONARY 0>
> > > > (0) "A"
> > > > (1) "C"
> > > > (2) "D"
> > > > (3) "E"
> > > >
> > > > <RECORD BATCH 1>
> > > > 2
> > > > 1
> > > > 3
> > > > 0
> > > > EOS
> > > > ```
> > > >
> > > > It also specifies in the IPC File Format (non-streaming) section:
> > > >
> > > > > In the file format, there is no requirement that dictionary keys should be defined in a DictionaryBatch before they are used in a RecordBatch, as long as the keys are defined somewhere in the file. Further more, it is invalid to have more than one non-delta dictionary batch per dictionary ID (i.e. dictionary replacement is not supported). Delta dictionaries are applied in the order they appear in the file footer.
> > > >
> > > > So for the non-streaming format, multiple non-delta dictionaries are not supported, but one non-delta dictionary followed by delta dictionaries should be.
> > > >
> > > > Is it possible to do this in PyArrow? If so, how? If not, how easy would it be to add? Is it currently possible via C++ and therefore can I write a Cython or similar extension that will let me do this now without waiting for a release?
> > > >
> > >
> > > In pyarrow (3.0.0 or later), you need to opt into emitting dictionary
> > > deltas using pyarrow.ipc.IpcWriteOptions. Can you show your code?
> > >
> > > https://github.com/apache/arrow/commit/8d76312dd397ebe07b71531f6d23b8caa76703dc
> > >
> > > > Best,
> > > >
> > > > Sam
> > > > IMPORTANT NOTICE: The information transmitted is intended only for the person or entity to which it is addressed and may contain confidential and/or privileged material. Any review, re-transmission, dissemination or other use of, or taking of any action in reliance upon, this information by persons or entities other than the intended recipient is prohibited. If you received this in error, please contact the sender and delete the material from any computer. Although we routinely screen for viruses, addressees should check this e-mail and any attachment for viruses. We make no warranty as to absence of viruses in this e-mail or any attachments.

Re: [PyArrow] DictionaryArray isDelta Support

Posted by Weston Pace <we...@gmail.com>.
I'm able to verify what Sam is seeing.  It appears Arrow does not
support dictionary deltas in the file format.  However, from my
reading of the spec it does indeed seem proper deltas should be
allowed.  A small patch[1] allowed me to write delta dictionaries in
the file format but then reading fails at [2] which seems more
explicit that dictionary deltas of any kind were not originally
supported with the file format.  I think the fix for read will
probably be a bit more involved and require some good testing.  I've
opened ARROW-13467 [3] for further discussion.

[1] https://github.com/apache/arrow/commit/54df8581a9d825664785fc406950204e345a5b3b
[2] https://github.com/apache/arrow/blob/81ff679c47754692224f655dab32cc0936bb5f55/cpp/src/arrow/ipc/reader.cc#L1062
[3] https://issues.apache.org/jira/browse/ARROW-13467
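To make the reading side concrete, here is a toy model of how the spec says dictionary batches are applied. It is an illustrative sketch only: `DictionaryMsg`, `RecordBatchMsg`, `replay_stream` and `file_dictionaries` are hypothetical names, messages are plain Python objects rather than IPC bytes, and it does not reflect how Arrow's C++ reader is structured. In a stream, a non-delta dictionary replaces the current one and a delta is concatenated onto it; in a file, at most one non-delta dictionary per id is allowed and deltas are appended in footer order.

```python
from dataclasses import dataclass
from typing import Dict, List, Union


@dataclass
class DictionaryMsg:
    """Hypothetical stand-in for an IPC DictionaryBatch message."""
    dict_id: int
    values: List[str]
    is_delta: bool = False


@dataclass
class RecordBatchMsg:
    """Hypothetical stand-in for a record batch with one dictionary-encoded column."""
    dict_id: int
    indices: List[int]


def replay_stream(messages: List[Union[DictionaryMsg, RecordBatchMsg]]) -> List[str]:
    """Decode a stream: each record batch uses the dictionary state at that point."""
    dictionaries: Dict[int, List[str]] = {}
    decoded: List[str] = []
    for msg in messages:
        if isinstance(msg, DictionaryMsg):
            if msg.is_delta:
                # isDelta: concatenate onto the existing dictionary with this id.
                dictionaries[msg.dict_id] = dictionaries[msg.dict_id] + msg.values
            else:
                # Non-delta: define or replace the dictionary for this id.
                dictionaries[msg.dict_id] = list(msg.values)
        else:
            current = dictionaries[msg.dict_id]
            decoded.extend(current[i] for i in msg.indices)
    return decoded


def file_dictionaries(batches: List[DictionaryMsg]) -> Dict[int, List[str]]:
    """Build a file's final dictionaries: one non-delta per id, then deltas in order."""
    result: Dict[int, List[str]] = {}
    for batch in batches:
        if not batch.is_delta:
            if batch.dict_id in result:
                raise ValueError(f"dictionary {batch.dict_id} replaced in file format")
            result[batch.dict_id] = list(batch.values)
    for batch in batches:
        if batch.is_delta:
            # Deltas are applied in the order they are listed (the footer order).
            result[batch.dict_id] = result[batch.dict_id] + batch.values
    return result
```

Feeding either of the spec's two example encodings (delta or replacement) through `replay_stream` yields `["A", "B", "C", "B", "D", "C", "E", "A"]`, while `file_dictionaries` accepts the delta form and rejects a second non-delta dictionary for the same id.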


Re: [PyArrow] DictionaryArray isDelta Support

Posted by Sam Davis <Sa...@nanoporetech.com>.
Hi Wes,

Yes, that is exactly it. For the file format, the spec dictates that it should be possible to output deltas but currently this is not possible. An `ArrowInvalid` error is thrown.

Example code:

```
import pandas as pd
import pyarrow as pa

print(pa.__version__)

schema = pa.schema([
    ("foo", pa.dictionary(pa.int16(), pa.string()))
])

pd1 = pd.DataFrame({"foo": pd.Categorical(["a"], categories=["a", "b"])})
b1 = pa.RecordBatch.from_pandas(pd1, schema=schema)

pd2 = pd.DataFrame({"foo": pd.Categorical(["a", "bbbb"], categories=["a", "b", "bbbb"])})
b2 = pa.RecordBatch.from_pandas(pd2, schema=schema)

options = pa.ipc.IpcWriteOptions(emit_dictionary_deltas=True)

with pa.ipc.new_file("/tmp/sdavis_tmp.arrow", schema=schema, options=options) as writer:
    writer.write(b1)

    writer.write(b2)
```

Best,

Sam
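Earlier in the thread Sam offered to track the categories seen so far himself, which lines up with the prefix rule Wes describes: if categories are only ever appended, each batch's dictionary extends the previous one. A minimal sketch of that caller-side bookkeeping in plain Python; `CategoryAccumulator` is a hypothetical helper, not part of PyArrow.

```python
from typing import Dict, List, Tuple


class CategoryAccumulator:
    """Hypothetical helper: keeps an append-only list of categories so that each
    batch's dictionary is a prefix-extension of the previous batch's dictionary."""

    def __init__(self) -> None:
        self.categories: List[str] = []
        self._index: Dict[str, int] = {}

    def encode(self, values: List[str]) -> Tuple[List[int], List[str]]:
        """Return (indices, delta), where delta holds categories first seen here."""
        indices: List[int] = []
        delta: List[str] = []
        for value in values:
            code = self._index.get(value)
            if code is None:
                # New category: append it, so earlier codes never change.
                code = len(self.categories)
                self._index[value] = code
                self.categories.append(value)
                delta.append(value)
            indices.append(code)
        return indices, delta
```

Each batch's `indices` together with a copy of `acc.categories` at that point could then be passed to `pa.DictionaryArray.from_arrays`, so successive dictionaries are prefix-extensions. Whether they are written as deltas still depends on the writer: per this thread, deltas require `emit_dictionary_deltas=True`, and PyArrow 4.0.1 rejects them in the file format, so the stream format is needed for now.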
________________________________
From: Wes McKinney <we...@gmail.com>
Sent: 24 July 2021 01:43
To: user@arrow.apache.org <us...@arrow.apache.org>
Subject: Re: [PyArrow] DictionaryArray isDelta Support

If I'm interpreting you correctly, the issue is that every dictionary
must be a prefix of a common dictionary for the delta logic to work.
So if the first batch has

"a", "b"

then in the next batch

"a", "b", "c" is OK and will emit a delta
"b", "a", "c" is not and will trigger this error

If we wanted to allow for deltas coming from unordered dictionaries as
an option, that could be implemented in theory but it not super
trivial

On Fri, Jul 23, 2021 at 9:25 AM Sam Davis <Sa...@nanoporetech.com> wrote:
>
> For reference, I think this check in the C++ code triggers regardless of whether the delta option is turned on:
>
> https://github.com/apache/arrow/blob/e0401123736c85283e527797a113a3c38c0915f2/cpp/src/arrow/ipc/writer.cc#L1066<https://github.com/apache/arrow/blob/e0401123736c85283e527797a113a3c38c0915f2/cpp/src/arrow/ipc/writer.cc#L1066>
> ________________________________
> From: Sam Davis <Sa...@nanoporetech.com>
> Sent: 23 July 2021 14:43
> To: user@arrow.apache.org <us...@arrow.apache.org>
> Subject: Re: [PyArrow] DictionaryArray isDelta Support
>
> Yes I know this as quoted in the spec. What I am wondering is for the file format how can I write deltas out using PyArrow?
>
> The previous example was a trivial version of reality.
>
> More concretely, say I want to write 100e6 rows out in multiple RecordBatches to a non-streaming file format using PyArrow. I do not want to do a complete pass ahead of time to compute the full set of strings for the relevant columns and would therefore like to dump out deltas when new strings appear in a given column. Is this possible?
>
> In the example code ideally this would "just" add on the delta containing the dictionary difference of it and the previous batches. I'm happy as a user to maintain the full set of categories seen thus far and tell PyArrow what the delta is if necessary.
> ________________________________
> From: Wes McKinney <we...@gmail.com>
> Sent: 23 July 2021 14:36
> To: user@arrow.apache.org <us...@arrow.apache.org>
> Subject: Re: [PyArrow] DictionaryArray isDelta Support
>
> Dictionary replacements aren't supported in the file format, only
> deltas. Your use case is a replacement, not a delta. You could use the
> stream format instead.
>
> On Fri, Jul 23, 2021 at 8:32 AM Sam Davis <Sa...@nanoporetech.com> wrote:
> >
> > Hey Wes,
> >
> > Thanks, I had not spotted this before! It doesn't seem to change the behaviour with `pa.ipc.new_file` however. Maybe I'm using it incorrectly?
> >
> > ```
> > import pandas as pd
> > import pyarrow as pa
> >
> > print(pa.__version__)
> >
> > schema = pa.schema([
> > ("foo", pa.dictionary(pa.int16(), pa.string()))
> > ])
> >
> > pd1 = pd.DataFrame({"foo": pd.Categorical(["aaaa"], categories=["a"*i for i in range(64)])})
> > b1 = pa.RecordBatch.from_pandas(pd1, schema=schema)
> >
> > pd2 = pd.DataFrame({"foo": pd.Categorical(["aaaa"], categories=["b"*i for i in range(64)])})
> > b2 = pa.RecordBatch.from_pandas(pd2, schema=schema)
> >
> > options = pa.ipc.IpcWriteOptions(emit_dictionary_deltas=True)
> >
> > with pa.ipc.new_file("/tmp/sdavis_tmp.arrow", schema=b1.schema, options=options) as writer:
> > writer.write(b1)
> > writer.write(b2)
> > ```
> >
> > Version printed: 4.0.1
> >
> > Sam
> > ________________________________
> > From: Wes McKinney <we...@gmail.com>
> > Sent: 23 July 2021 14:24
> > To: user@arrow.apache.org <us...@arrow.apache.org>
> > Subject: Re: [PyArrow] DictionaryArray isDelta Support
> >
> > hi Sam
> >
> > On Fri, Jul 23, 2021 at 8:15 AM Sam Davis <Sa...@nanoporetech.com> wrote:
> > >
> > > Hi,
> > >
> > > We want to write out RecordBatches of data, where one or more columns in a batch has a `pa.string()` column encoded as a `pa.dictionary(pa.intX(), pa.string()` as the column only contains a handful of unique values.
> > >
> > > However, PyArrow seems to lack support for writing these batches out to either the streaming or (non-streaming) file format.
> > >
> > > When attempting to write two distinct batches the following error message is triggered:
> > >
> > > > ArrowInvalid: Dictionary replacement detected when writing IPC file format. Arrow IPC files only support a single dictionary for a given field across all batches.
> > >
> > > I believe this message is false and that support is possible based on reading the spec:
> > >
> > > > Dictionaries are written in the stream and file formats as a sequence of record batches...
> > > > ...
> > > > The dictionary isDelta flag allows existing dictionaries to be expanded for future record batch materializations. A dictionary batch with isDelta set indicates that its vector should be concatenated with those of any previous batches with the same id. In a stream which encodes one column, the list of strings ["A", "B", "C", "B", "D", "C", "E", "A"], with a delta dictionary batch could take the form:
> > >
> > > ```
> > > <SCHEMA>
> > > <DICTIONARY 0>
> > > (0) "A"
> > > (1) "B"
> > > (2) "C"
> > >
> > > <RECORD BATCH 0>
> > > 0
> > > 1
> > > 2
> > > 1
> > >
> > > <DICTIONARY 0 DELTA>
> > > (3) "D"
> > > (4) "E"
> > >
> > > <RECORD BATCH 1>
> > > 3
> > > 2
> > > 4
> > > 0
> > > EOS
> > > ```
> > >
> > > > Alternatively, if isDelta is set to false, then the dictionary replaces the existing dictionary for the same ID. Using the same example as above, an alternate encoding could be:
> > >
> > > ```
> > > <SCHEMA>
> > > <DICTIONARY 0>
> > > (0) "A"
> > > (1) "B"
> > > (2) "C"
> > >
> > > <RECORD BATCH 0>
> > > 0
> > > 1
> > > 2
> > > 1
> > >
> > > <DICTIONARY 0>
> > > (0) "A"
> > > (1) "C"
> > > (2) "D"
> > > (3) "E"
> > >
> > > <RECORD BATCH 1>
> > > 2
> > > 1
> > > 3
> > > 0
> > > EOS
> > > ```
> > >
> > > It also specifies in the IPC File Format (non-streaming) section:
> > >
> > > > In the file format, there is no requirement that dictionary keys should be defined in a DictionaryBatch before they are used in a RecordBatch, as long as the keys are defined somewhere in the file. Further more, it is invalid to have more than one non-delta dictionary batch per dictionary ID (i.e. dictionary replacement is not supported). Delta dictionaries are applied in the order they appear in the file footer.
> > >
> > > So, for the non-streaming format, multiple non-delta dictionaries are not supported, but one non-delta dictionary followed by delta dictionaries should be.
> > >
> > > Is it possible to do this in PyArrow? If so, how? If not, how easy would it be to add? Is it currently possible via C++, and if so, could I write a Cython or similar extension that lets me do this now without waiting for a release?
> > >
> >
> > In pyarrow (3.0.0 or later), you need to opt into emitting dictionary
> > deltas using pyarrow.ipc.IpcWriteOptions. Can you show your code?
> >
> > https://github.com/apache/arrow/commit/8d76312dd397ebe07b71531f6d23b8caa76703dc
> >
> > > Best,
> > >
> > > Sam
> > > IMPORTANT NOTICE: The information transmitted is intended only for the person or entity to which it is addressed and may contain confidential and/or privileged material. Any review, re-transmission, dissemination or other use of, or taking of any action in reliance upon, this information by persons or entities other than the intended recipient is prohibited. If you received this in error, please contact the sender and delete the material from any computer. Although we routinely screen for viruses, addressees should check this e-mail and any attachment for viruses. We make no warranty as to absence of viruses in this e-mail or any attachments.
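The delta and replacement semantics in the quoted spec excerpt can be modelled without Arrow at all. This is a minimal sketch, not PyArrow or Arrow C++ code: `decode_stream` and the `("dict", ...)` / `("batch", ...)` tuples are hypothetical stand-ins for the IPC messages of a single dictionary id. It replays both of the spec's example encodings and shows that they decode to the same strings.

```python
from typing import List, Sequence, Tuple

# Hypothetical message encoding for one dictionary id:
#   ("dict", values, is_delta)  -> a dictionary batch
#   ("batch", indices)          -> a record batch holding dictionary indices


def decode_stream(messages: Sequence[Tuple]) -> List[str]:
    """Replay dictionary and record batch messages for a single column."""
    dictionary: List[str] = []
    decoded: List[str] = []
    for message in messages:
        if message[0] == "dict":
            _, values, is_delta = message
            if is_delta:
                # isDelta = true: concatenate onto the existing dictionary
                dictionary = dictionary + list(values)
            else:
                # isDelta = false: replace the existing dictionary for this id
                dictionary = list(values)
        else:
            _, indices = message
            # Indices refer to whichever dictionary is in effect at this point
            decoded.extend(dictionary[i] for i in indices)
    return decoded


# The two encodings quoted from the spec
delta_encoding = [
    ("dict", ["A", "B", "C"], False),
    ("batch", [0, 1, 2, 1]),
    ("dict", ["D", "E"], True),
    ("batch", [3, 2, 4, 0]),
]
replacement_encoding = [
    ("dict", ["A", "B", "C"], False),
    ("batch", [0, 1, 2, 1]),
    ("dict", ["A", "C", "D", "E"], False),
    ("batch", [2, 1, 3, 0]),
]

print(decode_stream(delta_encoding))
print(decode_stream(replacement_encoding))
```

Both calls print `['A', 'B', 'C', 'B', 'D', 'C', 'E', 'A']`. Per the quoted file-format section, only the first kind of sequence (one non-delta dictionary followed by deltas) is valid in an IPC file.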

Re: [PyArrow] DictionaryArray isDelta Support

Posted by Wes McKinney <we...@gmail.com>.
If I'm interpreting you correctly, the issue is that every dictionary
must be a prefix of a common dictionary for the delta logic to work.
So if the first batch has

"a", "b"

then in the next batch

"a", "b", "c" is OK and will emit a delta
"b", "a", "c" is not and will trigger this error

If we wanted to allow for deltas coming from unordered dictionaries as
an option, that could be implemented in theory, but it's not super
trivial.
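The prefix condition above can be written as a small check. The helper below is hypothetical (it is not a PyArrow function) and uses plain Python equality rather than Arrow's array comparison: it returns the entries a delta would carry, or `None` when the new dictionary could only be sent as a replacement.

```python
from typing import List, Optional, Sequence


def dictionary_delta(previous: Sequence[str], current: Sequence[str]) -> Optional[List[str]]:
    """Return the new tail entries if `previous` is a prefix of `current`, else None.

    An empty list means the dictionary is unchanged, so nothing needs emitting.
    """
    if len(current) < len(previous):
        return None  # a shrinking dictionary can never be a delta
    if list(current[: len(previous)]) != list(previous):
        return None  # existing entries moved or changed: replacement
    return list(current[len(previous):])


print(dictionary_delta(["a", "b"], ["a", "b", "c"]))  # ['c']
print(dictionary_delta(["a", "b"], ["b", "a", "c"]))  # None
```

Applied to the categories in Sam's example (`["a"*i for i in range(64)]` versus `["b"*i for i in range(64)]`), it returns `None`: both lists have 64 entries and already differ at index 1.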

On Fri, Jul 23, 2021 at 9:25 AM Sam Davis <Sa...@nanoporetech.com> wrote:
>
> For reference, I think this check in the C++ code triggers regardless of whether the delta option is turned on:
>
> https://github.com/apache/arrow/blob/e0401123736c85283e527797a113a3c38c0915f2/cpp/src/arrow/ipc/writer.cc#L1066
> ________________________________
> From: Sam Davis <Sa...@nanoporetech.com>
> Sent: 23 July 2021 14:43
> To: user@arrow.apache.org <us...@arrow.apache.org>
> Subject: Re: [PyArrow] DictionaryArray isDelta Support
>
> Yes, I know this, as quoted in the spec. What I am wondering is: for the file format, how can I write deltas out using PyArrow?
>
> The previous example was a trivial version of reality.
>
> More concretely, say I want to write 100e6 rows out in multiple RecordBatches to a non-streaming file format using PyArrow. I do not want to do a complete pass ahead of time to compute the full set of strings for the relevant columns and would therefore like to dump out deltas when new strings appear in a given column. Is this possible?
>
> Ideally, in the example code this would "just" append a delta containing the difference between the new dictionary and those of the previous batches. I'm happy as a user to maintain the full set of categories seen thus far and tell PyArrow what the delta is if necessary.
> ________________________________
> From: Wes McKinney <we...@gmail.com>
> Sent: 23 July 2021 14:36
> To: user@arrow.apache.org <us...@arrow.apache.org>
> Subject: Re: [PyArrow] DictionaryArray isDelta Support
>
> Dictionary replacements aren't supported in the file format, only
> deltas. Your use case is a replacement, not a delta. You could use the
> stream format instead.
>
> On Fri, Jul 23, 2021 at 8:32 AM Sam Davis <Sa...@nanoporetech.com> wrote:
> >
> > Hey Wes,
> >
> > Thanks, I had not spotted this before! It doesn't seem to change the behaviour with `pa.ipc.new_file` however. Maybe I'm using it incorrectly?
> >
> > ```
> > import pandas as pd
> > import pyarrow as pa
> >
> > print(pa.__version__)
> >
> > schema = pa.schema([
> >     ("foo", pa.dictionary(pa.int16(), pa.string()))
> > ])
> >
> > pd1 = pd.DataFrame({"foo": pd.Categorical(["aaaa"], categories=["a"*i for i in range(64)])})
> > b1 = pa.RecordBatch.from_pandas(pd1, schema=schema)
> >
> > pd2 = pd.DataFrame({"foo": pd.Categorical(["aaaa"], categories=["b"*i for i in range(64)])})
> > b2 = pa.RecordBatch.from_pandas(pd2, schema=schema)
> >
> > options = pa.ipc.IpcWriteOptions(emit_dictionary_deltas=True)
> >
> > with pa.ipc.new_file("/tmp/sdavis_tmp.arrow", schema=b1.schema, options=options) as writer:
> >     writer.write(b1)
> >     writer.write(b2)
> > ```
> >
> > Version printed: 4.0.1
> >
> > Sam
> > ________________________________
> > From: Wes McKinney <we...@gmail.com>
> > Sent: 23 July 2021 14:24
> > To: user@arrow.apache.org <us...@arrow.apache.org>
> > Subject: Re: [PyArrow] DictionaryArray isDelta Support
> >
> > hi Sam
> >
> > On Fri, Jul 23, 2021 at 8:15 AM Sam Davis <Sa...@nanoporetech.com> wrote:
> > >
> > > Hi,
> > >
> > > We want to write out RecordBatches of data where one or more `pa.string()` columns in a batch are encoded as `pa.dictionary(pa.intX(), pa.string())`, since each such column only contains a handful of unique values.
> > >
> > > However, PyArrow seems to lack support for writing these batches out to either the streaming or (non-streaming) file format.
> > >
> > > When attempting to write two distinct batches the following error message is triggered:
> > >
> > > > ArrowInvalid: Dictionary replacement detected when writing IPC file format. Arrow IPC files only support a single dictionary for a given field across all batches.
> > >
> > > I believe this message is false and that support is possible based on reading the spec:
> > >
> > > > Dictionaries are written in the stream and file formats as a sequence of record batches...
> > > > ...
> > > > The dictionary isDelta flag allows existing dictionaries to be expanded for future record batch materializations. A dictionary batch with isDelta set indicates that its vector should be concatenated with those of any previous batches with the same id. In a stream which encodes one column, the list of strings ["A", "B", "C", "B", "D", "C", "E", "A"], with a delta dictionary batch could take the form:
> > >
> > > ```
> > > <SCHEMA>
> > > <DICTIONARY 0>
> > > (0) "A"
> > > (1) "B"
> > > (2) "C"
> > >
> > > <RECORD BATCH 0>
> > > 0
> > > 1
> > > 2
> > > 1
> > >
> > > <DICTIONARY 0 DELTA>
> > > (3) "D"
> > > (4) "E"
> > >
> > > <RECORD BATCH 1>
> > > 3
> > > 2
> > > 4
> > > 0
> > > EOS
> > > ```
> > >
> > > > Alternatively, if isDelta is set to false, then the dictionary replaces the existing dictionary for the same ID. Using the same example as above, an alternate encoding could be:
> > >
> > > ```
> > > <SCHEMA>
> > > <DICTIONARY 0>
> > > (0) "A"
> > > (1) "B"
> > > (2) "C"
> > >
> > > <RECORD BATCH 0>
> > > 0
> > > 1
> > > 2
> > > 1
> > >
> > > <DICTIONARY 0>
> > > (0) "A"
> > > (1) "C"
> > > (2) "D"
> > > (3) "E"
> > >
> > > <RECORD BATCH 1>
> > > 2
> > > 1
> > > 3
> > > 0
> > > EOS
> > > ```
> > >
> > > It also specifies in the IPC File Format (non-streaming) section:
> > >
> > > > In the file format, there is no requirement that dictionary keys should be defined in a DictionaryBatch before they are used in a RecordBatch, as long as the keys are defined somewhere in the file. Furthermore, it is invalid to have more than one non-delta dictionary batch per dictionary ID (i.e. dictionary replacement is not supported). Delta dictionaries are applied in the order they appear in the file footer.
> > >
> > > So, for the non-streaming format, multiple non-delta dictionaries are not supported, but one non-delta dictionary followed by delta dictionaries should be.
> > >
> > > Is it possible to do this in PyArrow? If so, how? If not, how easy would it be to add? Is it currently possible via C++, and if so, could I write a Cython or similar extension that lets me do this now without waiting for a release?
> > >
> >
> > In pyarrow (3.0.0 or later), you need to opt into emitting dictionary
> > deltas using pyarrow.ipc.IpcWriteOptions. Can you show your code?
> >
> > https://github.com/apache/arrow/commit/8d76312dd397ebe07b71531f6d23b8caa76703dc
> >
> > > Best,
> > >
> > > Sam

Re: [PyArrow] DictionaryArray isDelta Support

Posted by Sam Davis <Sa...@nanoporetech.com>.
For reference, I think this check in the C++ code triggers regardless of whether the delta option is turned on:

https://github.com/apache/arrow/blob/e0401123736c85283e527797a113a3c38c0915f2/cpp/src/arrow/ipc/writer.cc#L1066
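Sam's observation can be illustrated with a simplified model of the writer's per-batch decision. This is a hypothetical Python paraphrase, not the actual C++ code: it assumes the order of checks in the linked `writer.cc` is as Sam describes (the file-format rejection runs before `emit_dictionary_deltas` is consulted) and ignores nested dictionaries and Arrow-specific equality rules. Later pyarrow releases may behave differently.

```python
from typing import Optional, Sequence


def writer_action(
    previous: Optional[Sequence[str]],
    current: Sequence[str],
    *,
    file_format: bool,
    emit_dictionary_deltas: bool,
) -> str:
    """Hypothetical model: what the IPC writer does with a batch's dictionary."""
    if previous is None:
        return "initial"  # first dictionary for this id is always written
    if list(previous) == list(current):
        return "unchanged"  # identical dictionary: nothing is re-emitted
    if file_format:
        # Assumed ordering: reached even when `current` only extends `previous`
        return "error"
    extends = len(current) > len(previous) and list(current[: len(previous)]) == list(previous)
    if emit_dictionary_deltas and extends:
        return "delta"  # only the new tail entries would be written
    return "replacement"  # the whole dictionary is written again


old, new = ["a", "b"], ["a", "b", "c"]
print(writer_action(old, new, file_format=True, emit_dictionary_deltas=True))    # error
print(writer_action(old, new, file_format=False, emit_dictionary_deltas=True))   # delta
print(writer_action(old, new, file_format=False, emit_dictionary_deltas=False))  # replacement
```

Under this model, turning on `emit_dictionary_deltas` only changes the outcome for the stream format, which matches the behaviour Sam reports with `pa.ipc.new_file`.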

Re: [PyArrow] DictionaryArray isDelta Support

Posted by Sam Davis <Sa...@nanoporetech.com>.
Yes, I know this, as quoted in the spec. What I am wondering is: for the file format, how can I write deltas out using PyArrow?

The previous example was a trivial version of reality.

More concretely, say I want to write 100e6 rows out in multiple RecordBatches to a non-streaming file format using PyArrow. I do not want to do a complete pass ahead of time to compute the full set of strings for the relevant columns and would therefore like to dump out deltas when new strings appear in a given column. Is this possible?

Ideally, in the example code this would "just" append a delta containing the difference between the new dictionary and those of the previous batches. I'm happy as a user to maintain the full set of categories seen thus far and tell PyArrow what the delta is if necessary.
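Maintaining the category set on the user side, as described above, amounts to encoding every batch against one dictionary that only grows in first-seen order; each batch's dictionary is then a prefix of the next one, which is the condition for deltas discussed in this thread. A minimal sketch, assuming non-null string values; `CumulativeDictionary` is a hypothetical helper, not a PyArrow class.

```python
from typing import Dict, List, Sequence, Tuple


class CumulativeDictionary:
    """Hypothetical helper: a first-seen-order dictionary that only ever grows."""

    def __init__(self) -> None:
        self.values: List[str] = []
        self._index: Dict[str, int] = {}

    def encode(self, batch: Sequence[str]) -> Tuple[List[str], List[int], List[str]]:
        """Return (dictionary snapshot, indices, values newly added by this batch)."""
        start = len(self.values)
        indices: List[int] = []
        for value in batch:
            if value not in self._index:
                # Unseen values are appended, so earlier indices never change
                self._index[value] = len(self.values)
                self.values.append(value)
            indices.append(self._index[value])
        return list(self.values), indices, self.values[start:]


encoder = CumulativeDictionary()
print(encoder.encode(["A", "B", "C", "B"]))  # (['A', 'B', 'C'], [0, 1, 2, 1], ['A', 'B', 'C'])
print(encoder.encode(["D", "C", "E", "A"]))  # (['A', 'B', 'C', 'D', 'E'], [3, 2, 4, 0], ['D', 'E'])
```

The second call reproduces the spec's delta example. Each snapshot and its indices could be wrapped with `pa.DictionaryArray.from_arrays(pa.array(indices, type=pa.int16()), pa.array(dictionary))` and written with `pa.ipc.new_stream(..., options=pa.ipc.IpcWriteOptions(emit_dictionary_deltas=True))`; whether `pa.ipc.new_file` accepts the resulting deltas depends on the writer check Sam points to.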

Re: [PyArrow] DictionaryArray isDelta Support

Posted by Wes McKinney <we...@gmail.com>.
Dictionary replacements aren't supported in the file format, only
deltas. Your use case is a replacement, not a delta. You could use the
stream format instead.

On Fri, Jul 23, 2021 at 8:32 AM Sam Davis <Sa...@nanoporetech.com> wrote:
>
> Hey Wes,
>
> Thanks, I had not spotted this before! It doesn't seem to change the behaviour with `pa.ipc.new_file` however. Maybe I'm using it incorrectly?
>
> ```
> import pandas as pd
> import pyarrow as pa
>
> print(pa.__version__)
>
> schema = pa.schema([
>     ("foo", pa.dictionary(pa.int16(), pa.string()))
> ])
>
> pd1 = pd.DataFrame({"foo": pd.Categorical(["aaaa"], categories=["a"*i for i in range(64)])})
> b1 = pa.RecordBatch.from_pandas(pd1, schema=schema)
>
> pd2 = pd.DataFrame({"foo": pd.Categorical(["aaaa"], categories=["b"*i for i in range(64)])})
> b2 = pa.RecordBatch.from_pandas(pd2, schema=schema)
>
> options = pa.ipc.IpcWriteOptions(emit_dictionary_deltas=True)
>
> with pa.ipc.new_file("/tmp/sdavis_tmp.arrow", schema=b1.schema, options=options) as writer:
>     writer.write(b1)
>     writer.write(b2)
> ```
>
> Version printed: 4.0.1
>
> Sam
> ________________________________
> From: Wes McKinney <we...@gmail.com>
> Sent: 23 July 2021 14:24
> To: user@arrow.apache.org <us...@arrow.apache.org>
> Subject: Re: [PyArrow] DictionaryArray isDelta Support
>
> hi Sam
>
> On Fri, Jul 23, 2021 at 8:15 AM Sam Davis <Sa...@nanoporetech.com> wrote:
> >
> > Hi,
> >
> > We want to write out RecordBatches of data, where one or more columns in a batch has a `pa.string()` column encoded as a `pa.dictionary(pa.intX(), pa.string()` as the column only contains a handful of unique values.
> >
> > However, PyArrow seems to lack support for writing these batches out to either the streaming or (non-streaming) file format.
> >
> > When attempting to write two distinct batches the following error message is triggered:
> >
> > > ArrowInvalid: Dictionary replacement detected when writing IPC file format. Arrow IPC files only support a single dictionary for a given field across all batches.
> >
> > I believe this message is false and that support is possible based on reading the spec:
> >
> > > Dictionaries are written in the stream and file formats as a sequence of record batches...
> > > ...
> > > The dictionary isDelta flag allows existing dictionaries to be expanded for future record batch materializations. A dictionary batch with isDelta set indicates that its vector should be concatenated with those of any previous batches with the same id. In a stream which encodes one column, the list of strings ["A", "B", "C", "B", "D", "C", "E", "A"], with a delta dictionary batch could take the form:
> >
> > ```
> > <SCHEMA>
> > <DICTIONARY 0>
> > (0) "A"
> > (1) "B"
> > (2) "C"
> >
> > <RECORD BATCH 0>
> > 0
> > 1
> > 2
> > 1
> >
> > <DICTIONARY 0 DELTA>
> > (3) "D"
> > (4) "E"
> >
> > <RECORD BATCH 1>
> > 3
> > 2
> > 4
> > 0
> > EOS
> > ```
> >
> > > Alternatively, if isDelta is set to false, then the dictionary replaces the existing dictionary for the same ID. Using the same example as above, an alternate encoding could be:
> >
> > ```
> > <SCHEMA>
> > <DICTIONARY 0>
> > (0) "A"
> > (1) "B"
> > (2) "C"
> >
> > <RECORD BATCH 0>
> > 0
> > 1
> > 2
> > 1
> >
> > <DICTIONARY 0>
> > (0) "A"
> > (1) "C"
> > (2) "D"
> > (3) "E"
> >
> > <RECORD BATCH 1>
> > 2
> > 1
> > 3
> > 0
> > EOS
> > ```
> >
> > It also specifies in the IPC File Format (non-streaming) section:
> >
> > > In the file format, there is no requirement that dictionary keys should be defined in a DictionaryBatch before they are used in a RecordBatch, as long as the keys are defined somewhere in the file. Further more, it is invalid to have more than one non-delta dictionary batch per dictionary ID (i.e. dictionary replacement is not supported). Delta dictionaries are applied in the order they appear in the file footer.
> >
> > So for the non-streaming format multiple non-delta dictionaries are not supported but one non-delta followed by delta dictionaries should be.
> >
> > Is it possible to do this in PyArrow? If so, how? If not, how easy would it be to add? Is it currently possible via C++ and therefore can I write a Cython or similar extension that will let me do this now without waiting for a release?
> >
>
> In pyarrow (3.0.0 or later), you need to opt into emitting dictionary
> deltas using pyarrow.ipc.IpcWriteOptions. Can you show your code?
>
> https://github.com/apache/arrow/commit/8d76312dd397ebe07b71531f6d23b8caa76703dc
>
> > Best,
> >
> > Sam
> > IMPORTANT NOTICE: The information transmitted is intended only for the person or entity to which it is addressed and may contain confidential and/or privileged material. Any review, re-transmission, dissemination or other use of, or taking of any action in reliance upon, this information by persons or entities other than the intended recipient is prohibited. If you received this in error, please contact the sender and delete the material from any computer. Although we routinely screen for viruses, addressees should check this e-mail and any attachment for viruses. We make no warranty as to absence of viruses in this e-mail or any attachments.

Re: [PyArrow] DictionaryArray isDelta Support

Posted by Sam Davis <Sa...@nanoporetech.com>.
Hey Wes,

Thanks, I had not spotted this before! It doesn't seem to change the behaviour with `pa.ipc.new_file` however. Maybe I'm using it incorrectly?

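```
import pandas as pd
import pyarrow as pa

print(pa.__version__)

schema = pa.schema([
    ("foo", pa.dictionary(pa.int16(), pa.string()))
])

# Batch 1: dictionary is ["", "a", "aa", ..., "a" * 63]; "aaaa" is index 4.
pd1 = pd.DataFrame({"foo": pd.Categorical(["aaaa"], categories=["a"*i for i in range(64)])})
b1 = pa.RecordBatch.from_pandas(pd1, schema=schema)

# Batch 2: dictionary is ["", "b", "bb", ..., "b" * 63]. "aaaa" is not one of
# these categories, so pandas stores it as a missing value (null).
# This dictionary does not start with batch 1's entries, so it is a
# replacement rather than a delta (a delta can only append entries).
pd2 = pd.DataFrame({"foo": pd.Categorical(["aaaa"], categories=["b"*i for i in range(64)])})
b2 = pa.RecordBatch.from_pandas(pd2, schema=schema)

options = pa.ipc.IpcWriteOptions(emit_dictionary_deltas=True)

with pa.ipc.new_file("/tmp/sdavis_tmp.arrow", schema=b1.schema, options=options) as writer:
    writer.write(b1)
    writer.write(b2)
```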

Version printed: 4.0.1

Sam
________________________________
From: Wes McKinney <we...@gmail.com>
Sent: 23 July 2021 14:24
To: user@arrow.apache.org <us...@arrow.apache.org>
Subject: Re: [PyArrow] DictionaryArray isDelta Support


Re: [PyArrow] DictionaryArray isDelta Support

Posted by Wes McKinney <we...@gmail.com>.
hi Sam

On Fri, Jul 23, 2021 at 8:15 AM Sam Davis <Sa...@nanoporetech.com> wrote:
>
> Hi,
>
> We want to write out RecordBatches of data, where one or more columns in a batch has a `pa.string()` column encoded as a `pa.dictionary(pa.intX(), pa.string())` as the column only contains a handful of unique values.
>
> However, PyArrow seems to lack support for writing these batches out to either the streaming or the (non-streaming) file format.
>
> When attempting to write two distinct batches the following error message is triggered:
>
> > ArrowInvalid: Dictionary replacement detected when writing IPC file format. Arrow IPC files only support a single dictionary for a given field across all batches.
>
> I believe this message is false and that support is possible based on reading the spec:
>
> > Dictionaries are written in the stream and file formats as a sequence of record batches...
> > ...
> > The dictionary isDelta flag allows existing dictionaries to be expanded for future record batch materializations. A dictionary batch with isDelta set indicates that its vector should be concatenated with those of any previous batches with the same id. In a stream which encodes one column, the list of strings ["A", "B", "C", "B", "D", "C", "E", "A"], with a delta dictionary batch could take the form:
>
> ```
> <SCHEMA>
> <DICTIONARY 0>
> (0) "A"
> (1) "B"
> (2) "C"
>
> <RECORD BATCH 0>
> 0
> 1
> 2
> 1
>
> <DICTIONARY 0 DELTA>
> (3) "D"
> (4) "E"
>
> <RECORD BATCH 1>
> 3
> 2
> 4
> 0
> EOS
> ```
>
> > Alternatively, if isDelta is set to false, then the dictionary replaces the existing dictionary for the same ID. Using the same example as above, an alternate encoding could be:
>
> ```
> <SCHEMA>
> <DICTIONARY 0>
> (0) "A"
> (1) "B"
> (2) "C"
>
> <RECORD BATCH 0>
> 0
> 1
> 2
> 1
>
> <DICTIONARY 0>
> (0) "A"
> (1) "C"
> (2) "D"
> (3) "E"
>
> <RECORD BATCH 1>
> 2
> 1
> 3
> 0
> EOS
> ```
>
> It also specifies in the IPC File Format (non-streaming) section:
>
> > In the file format, there is no requirement that dictionary keys should be defined in a DictionaryBatch before they are used in a RecordBatch, as long as the keys are defined somewhere in the file. Further more, it is invalid to have more than one non-delta dictionary batch per dictionary ID (i.e. dictionary replacement is not supported). Delta dictionaries are applied in the order they appear in the file footer.
>
> So for the non-streaming format multiple non-delta dictionaries are not supported but one non-delta followed by delta dictionaries should be.
>
> Is it possible to do this in PyArrow? If so, how? If not, how easy would it be to add? Is it currently possible via C++ and therefore can I write a Cython or similar extension that will let me do this now without waiting for a release?
>

In pyarrow (3.0.0 or later), you need to opt into emitting dictionary
deltas using pyarrow.ipc.IpcWriteOptions. Can you show your code?

https://github.com/apache/arrow/commit/8d76312dd397ebe07b71531f6d23b8caa76703dc

> Best,
>
> Sam
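The two example encodings from the quoted spec can be replayed with a small pure-Python model. This is a hypothetical illustration, not an Arrow API: the message tuples and the `decode` helper are invented here, and the model applies messages in stream order, which simplifies the file format's footer-based resolution. Both encodings should decode to ["A", "B", "C", "B", "D", "C", "E", "A"] split across two batches, and the `file_format=True` check mirrors the rule that a file may hold at most one non-delta dictionary batch per id.

```python
from typing import Dict, List, Set

# Hypothetical message encoding, for illustration only:
#   ("dict", dict_id, is_delta, values)   -- a dictionary batch
#   ("batch", dict_id, indices)           -- a record batch of dictionary indices


def decode(messages: list, file_format: bool = False) -> List[List[str]]:
    """Replay messages in order, resolving indices against the current dictionary."""
    dictionaries: Dict[int, List[str]] = {}
    non_delta_ids: Set[int] = set()
    decoded = []
    for msg in messages:
        if msg[0] == "dict":
            _, dict_id, is_delta, values = msg
            if is_delta:
                # isDelta set: append the new entries to the existing dictionary.
                dictionaries[dict_id] = dictionaries[dict_id] + list(values)
            else:
                # File format: only one non-delta dictionary batch per id.
                if file_format and dict_id in non_delta_ids:
                    raise ValueError(f"dictionary replacement for id {dict_id}")
                non_delta_ids.add(dict_id)
                # isDelta unset: replace the dictionary for this id.
                dictionaries[dict_id] = list(values)
        else:
            _, dict_id, indices = msg
            decoded.append([dictionaries[dict_id][i] for i in indices])
    return decoded


# The two encodings from the spec excerpt.
delta = [
    ("dict", 0, False, ["A", "B", "C"]),
    ("batch", 0, [0, 1, 2, 1]),
    ("dict", 0, True, ["D", "E"]),
    ("batch", 0, [3, 2, 4, 0]),
]
replacement = [
    ("dict", 0, False, ["A", "B", "C"]),
    ("batch", 0, [0, 1, 2, 1]),
    ("dict", 0, False, ["A", "C", "D", "E"]),
    ("batch", 0, [2, 1, 3, 0]),
]

print(decode(delta))        # [['A', 'B', 'C', 'B'], ['D', 'C', 'E', 'A']]
print(decode(replacement))  # [['A', 'B', 'C', 'B'], ['D', 'C', 'E', 'A']]

try:
    decode(replacement, file_format=True)
except ValueError as exc:
    print(exc)  # dictionary replacement for id 0
```

The delta encoding passes the file-format check because it has only one non-delta dictionary batch; the replacement encoding does not.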