Posted to issues@arrow.apache.org by "Fokko (via GitHub)" <gi...@apache.org> on 2023/05/24 21:30:49 UTC

[GitHub] [arrow] Fokko opened a new issue, #35748: Implement efficient merging of chunked arrays

Fokko opened a new issue, #35748:
URL: https://github.com/apache/arrow/issues/35748

   ### Describe the enhancement requested
   
   At PyIceberg we rely heavily on Arrow for reading data. This includes Iceberg positional deletes. A positional delete file tells which row indices are deleted in a Parquet file. There can be one or more arrays listing the deleted indices. The arrays are already sorted, and ideally the merge also deduplicates the numbers:
   
   ```python
   import pyarrow as pa
   
   a1 = pa.chunked_array([[1, 2, 3], [6, 7]])
   a2 = pa.chunked_array([[4, 7, 8]])
   
   # Proposed API (does not exist in PyArrow today); expected result:
   # [1, 2, 3, 4, 6, 7, 8]
   pa.merge_arrays(a1, a2)
   ```
   
   The result can be an array or a chunked array; it's up to the implementer to choose whichever is most efficient.
   
   Currently we use:
   ```
   In [2]: from heapq import merge
      ...: import pyarrow as pa
      ...: 
      ...: a1 = pa.chunked_array([[1, 2, 3], [6, 7]])
      ...: a2 = pa.chunked_array([[4, 7, 8]])
      ...: 
      ...: list(merge(*[a1, a2], key=lambda e: e.as_py()))
   Out[2]: 
   [<pyarrow.Int64Scalar: 1>,
    <pyarrow.Int64Scalar: 2>,
    <pyarrow.Int64Scalar: 3>,
    <pyarrow.Int64Scalar: 4>,
    <pyarrow.Int64Scalar: 6>,
    <pyarrow.Int64Scalar: 7>,
    <pyarrow.Int64Scalar: 7>,
    <pyarrow.Int64Scalar: 8>]
   ```
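
A minimal sketch of the merge-and-deduplicate behavior requested above, in pure Python: `heapq.merge` lazily merges the already-sorted inputs and `itertools.groupby` collapses runs of equal values. The helper name `merge_sorted_unique` is hypothetical and not part of PyArrow, and plain lists stand in for the chunked arrays.

```python
from heapq import merge
from itertools import groupby


def merge_sorted_unique(*sorted_inputs):
    """Yield the sorted union of already-sorted iterables, dropping duplicates.

    Hypothetical helper for illustration; assumes each input is sorted ascending.
    """
    # merge() is lazy, so nothing is materialized up front.
    for value, _run in groupby(merge(*sorted_inputs)):
        yield value


a1 = [1, 2, 3, 6, 7]  # values of a1's chunks, flattened
a2 = [4, 7, 8]
result = list(merge_sorted_unique(a1, a2))
print(result)  # [1, 2, 3, 4, 6, 7, 8]
```

This still walks the values one at a time in Python, so it illustrates the semantics rather than a fast implementation.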
   
   ### Component(s)
   
   Python


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] jorisvandenbossche commented on issue #35748: [Python] Implement efficient merging of chunked arrays

Posted by "jorisvandenbossche (via GitHub)" <gi...@apache.org>.
jorisvandenbossche commented on issue #35748:
URL: https://github.com/apache/arrow/issues/35748#issuecomment-1570330670

   We already have `pa.concat_arrays`, which gives a single (non-chunked) array. However, it expects a list of Arrays and doesn't work with ChunkedArrays. So to use it to concatenate a list of chunked arrays into a single array, we currently need some more gymnastics to flatten the chunks:
   
   ```python
   >>> merged = pa.concat_arrays([chunk for arr in [a1, a2] for chunk in arr.chunks])
   >>> merged
   <pyarrow.lib.Int64Array object at 0x7fe2e69710c0>
   [
     1,
     2,
     3,
     6,
     7,
     4,
     7,
     8
   ]
   ```
   
   We should maybe update `pa.concat_arrays` to also accept ChunkedArrays.
   
   Of course, that doesn't make them unique. You can then get the unique values of the merged array:
   
   ```python
   >>> merged.unique()
   <pyarrow.lib.Int64Array object at 0x7fe2e6bd12a0>
   [
     1,
     2,
     3,
     6,
     7,
     4,
     8
   ]
   ```
   
   But for larger arrays, it might be more efficient to get the uniques before actually concatenating, since we can also calculate the unique values directly for a ChunkedArray. We can convert the list of chunked arrays into one chunked array (which is zero-copy) and then get the uniques of that:
   
   ```python
   >>> merged_chunked = pa.chunked_array([chunk for arr in [a1, a2] for chunk in arr.chunks])
   >>> merged_chunked.unique()
   <pyarrow.lib.Int64Array object at 0x7fe2e692f880>
   [
     1,
     2,
     3,
     6,
     7,
     4,
     8
   ]
   ```




Re: [I] [Python] Implement efficient merging of chunked arrays [arrow]

Posted by "mapleFU (via GitHub)" <gi...@apache.org>.
mapleFU commented on issue #35748:
URL: https://github.com/apache/arrow/issues/35748#issuecomment-1765981963

   Thanks a lot! This is much cleaner than my code!




[GitHub] [arrow] Fokko commented on issue #35748: [Python] Implement efficient merging of chunked arrays

Posted by "Fokko (via GitHub)" <gi...@apache.org>.
Fokko commented on issue #35748:
URL: https://github.com/apache/arrow/issues/35748#issuecomment-1591178372

   Thanks for the input, I really appreciate it. I went with the following:
   
   ![image](https://github.com/apache/arrow/assets/1134248/bc963ae2-33c8-44a4-8410-bc04340cdd1c)
   
   As expected, it is an order of magnitude faster than my fancy generator. It turns out that [numpy.setdiff1d](https://numpy.org/doc/stable/reference/generated/numpy.setdiff1d.html) does what I was looking for. As expected, sorting the array upfront does not help performance. I'm going to close this one now since the performance looks satisfactory. Again, thanks for the pointers!
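
The screenshot is not reproduced in this plain-text archive. As a rough illustration only (this is not the code from the image), `numpy.setdiff1d` can turn a set of deleted positions into the positions to keep. The row count and the variable names below are assumptions.

```python
import numpy as np

num_rows = 10  # assumed number of rows in the data file
# Deleted positions collected from several delete arrays; duplicates and
# arbitrary order are fine because setdiff1d returns sorted, unique values.
deleted = np.array([7, 1, 2, 3, 6, 7, 4, 8])

# Positions in 0..num_rows-1 that are not listed in `deleted`.
keep = np.setdiff1d(np.arange(num_rows), deleted)
print(keep)  # [0 5 9]
```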




[GitHub] [arrow] Fokko commented on issue #35748: [Python] Implement efficient merging of chunked arrays

Posted by "Fokko (via GitHub)" <gi...@apache.org>.
Fokko commented on issue #35748:
URL: https://github.com/apache/arrow/issues/35748#issuecomment-1577578808

   @jorisvandenbossche Sorry for not being clear. It is not about the concat operation but about the merge operation. Both of the input arrays are already unique and sorted. The input arrays can overlap with one another, but the chunks within a single array don't.
   
   The expected output would be:
   ```
   >>> merged_chunked = pa.chunked_array([chunk for arr in [a1, a2] for chunk in arr.chunks])
   >>> merged_chunked.unique()
   <pyarrow.lib.Int64Array object at 0x7fe2e692f880>
   [
     1,
     2,
     3,
     4, <-- 4 comes after 3 and before 6
     6,
     7, <-- 7 is present in both arrays, but we are interested in the value just once.
     8
   ]
   ```




Re: [I] [Python] Implement efficient merging of chunked arrays [arrow]

Posted by "mapleFU (via GitHub)" <gi...@apache.org>.
mapleFU commented on issue #35748:
URL: https://github.com/apache/arrow/issues/35748#issuecomment-1765741400

   I have another question. If we have a `RecordBatch` with a schema like `struct<a:int, b:string>` and we only need to merge the sequence by `a`, how can we easily achieve this?




[GitHub] [arrow] jorisvandenbossche commented on issue #35748: [Python] Implement efficient merging of chunked arrays

Posted by "jorisvandenbossche (via GitHub)" <gi...@apache.org>.
jorisvandenbossche commented on issue #35748:
URL: https://github.com/apache/arrow/issues/35748#issuecomment-1589187240

   > So you can get part of the way there by combining all the chunks and using `pyarrow.compute.sort_indices`. This still means the individual chunks will be "resorted".
   
   If you want unique values in the end, in general I expect it to be more efficient to first get the uniques, and only sort afterwards (that also avoids having to materialize the full sorted array).  
   Maybe if you already have mostly sorted / mostly unique chunks to start with, that might be different.
   
   ---
   
   Note that we nowadays have a `sort()` helper method that does the `sort_indices`+`take` under the hood for you, so you can write Weston's version as:
   
   ```
   all_chunks.sort().unique()
   ```
   
   (while my suggestion was `all_chunks.unique().sort()`, i.e. just a different order of calling sort vs unique)




Re: [I] [Python] Implement efficient merging of chunked arrays [arrow]

Posted by "jorisvandenbossche (via GitHub)" <gi...@apache.org>.
jorisvandenbossche commented on issue #35748:
URL: https://github.com/apache/arrow/issues/35748#issuecomment-1765898863

   Can you give an example of input data and the expected result, or explain in a bit more detail what you are trying to achieve? (I don't fully understand what you mean by "merge" in this case if you have multiple fields, e.g. what to do with the other field?)




[GitHub] [arrow] jorisvandenbossche commented on issue #35748: [Python] Implement efficient merging of chunked arrays

Posted by "jorisvandenbossche (via GitHub)" <gi...@apache.org>.
jorisvandenbossche commented on issue #35748:
URL: https://github.com/apache/arrow/issues/35748#issuecomment-1578291747

   Yes, but what you call "merge" is essentially a "concat (or combine into chunked array) + uniques + sort"?
   
   ```
   >>> merged_chunked = pa.chunked_array([chunk for arr in [a1, a2] for chunk in arr.chunks])
   >>> merged_chunked.unique().sort()
   <pyarrow.lib.Int64Array object at 0x7f79b66b3520>
   [
     1,
     2,
     3,
     4,
     6,
     7,
     8
   ]
   ```
    
   And the question is then if this can be done more efficiently than the combination of those existing kernels, if you know the input is already unique and/or is already sorted? (or if it is worth adding a helper function that does this combination for you, as convenience)
   
   As I mentioned in the Iceberg PR, numpy has a set of "set operation" functions for arrays (union, intersection, difference). I _think_ this case of "merging" arrays is a union set operation. In numpy, this is `np.union1d` (but just for 2 input arrays), for which the docstring says "Return the unique, sorted array of values that are in either of the two input arrays". But to note, in numpy this function is actually just implemented as `np.unique(np.concatenate((arr1, arr2), axis=None))` under the hood (the `np.unique` already sorts).
   
   One of the other set operations, `np.intersect1d`, has a keyword `assume_unique=False/True` with which you can indicate that you know the input arrays are already unique, so that a faster implementation can be used (i.e. one that avoids calling `unique` on the input arrays).
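
A short sketch of the numpy set operations mentioned above, run on the same values as the example arrays in this thread (the variable names are just for illustration):

```python
import numpy as np

arr1 = np.array([1, 2, 3, 6, 7])
arr2 = np.array([4, 7, 8])

# Sorted union of the two inputs, duplicates removed.
union = np.union1d(arr1, arr2)
print(union)  # [1 2 3 4 6 7 8]

# Equivalent spelling using unique + concatenate.
same = np.unique(np.concatenate((arr1, arr2), axis=None))

# Both inputs are already unique, so intersect1d can skip its own dedup pass.
common = np.intersect1d(arr1, arr2, assume_unique=True)
print(common)  # [7]
```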




Re: [I] [Python] Implement efficient merging of chunked arrays [arrow]

Posted by "mapleFU (via GitHub)" <gi...@apache.org>.
mapleFU commented on issue #35748:
URL: https://github.com/apache/arrow/issues/35748#issuecomment-1765922917

   Oh, my bad. I've written some code for this myself, since it's my own logic.
   
   ```
   a: [(1, "1"), (2, "2")]
   b: [(1, "3"), (3, "4")]
   ```
   
   Concatenate and deduplicate by column 0:
   
   ```
   a: [(1, "3"), (2, "2"), (3, "4")]
   ```
   
   Since it contains merging logic, I wrote the code myself.
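
The keep-the-last-row-per-key behavior in this example can be sketched in plain Python as follows. The helper `merge_keep_last` is hypothetical, and tuples stand in for the struct rows.

```python
def merge_keep_last(*batches):
    """Concatenate (key, value) rows and keep the last value seen per key.

    Hypothetical helper; rows are plain tuples standing in for struct rows.
    """
    latest = {}
    for batch in batches:
        for key, value in batch:
            latest[key] = value  # later occurrences overwrite earlier ones
    # Sort by key so the output is ordered like the example above.
    return sorted(latest.items())


a = [(1, "1"), (2, "2")]
b = [(1, "3"), (3, "4")]
merged = merge_keep_last(a, b)
print(merged)  # [(1, '3'), (2, '2'), (3, '4')]
```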




[GitHub] [arrow] westonpace commented on issue #35748: [Python] Implement efficient merging of chunked arrays

Posted by "westonpace (via GitHub)" <gi...@apache.org>.
westonpace commented on issue #35748:
URL: https://github.com/apache/arrow/issues/35748#issuecomment-1585236382

   I don't think we document this anywhere, but the `sort_indices` function for chunked arrays works by first sorting the individual arrays and then merging. So you can get part of the way there by combining all the chunks and using `pyarrow.compute.sort_indices`. This still means the individual chunks will be "resorted". At the end of the day we use `std::stable_sort` for the sorting. I don't know if it is optimized to run quickly on already-sorted arrays, but Stack Overflow seems to suggest it's [not terrible](https://stackoverflow.com/questions/6567326/does-stdsort-check-if-a-vector-is-already-sorted) (though I'm not sure if `std::stable_sort` will behave the same).
   
   Unfortunately, I don't know of any compute function or clever trick to handle the uniqueness part.
   
   It would be nice to combine all of these into a single compute vector function that works on chunked arrays if someone wanted to.
   
   ```
   import pyarrow as pa
   import pyarrow.compute as pc
   
   a1 = pa.chunked_array([[1, 2, 3], [6, 7]])
   a2 = pa.chunked_array([[4, 7, 8]])
   
   all_chunks = pa.chunked_array(a1.chunks + a2.chunks) # Cheap
   sort_indices = pc.sort_indices(all_chunks) # Could be cheaper, but not terrible, probably not worse than O(N*log(N))
   sorted_arr = pc.take(all_chunks, sort_indices) # Should be O(N), probably as fast or faster than concat
   sorted_uniq_arr = pc.unique(sorted_arr) # Unfortunate we have to do this step
   
   print(sorted_uniq_arr)
   ```




[GitHub] [arrow] Fokko closed issue #35748: [Python] Implement efficient merging of chunked arrays

Posted by "Fokko (via GitHub)" <gi...@apache.org>.
Fokko closed issue #35748: [Python] Implement efficient merging of chunked arrays
URL: https://github.com/apache/arrow/issues/35748




Re: [I] [Python] Implement efficient merging of chunked arrays [arrow]

Posted by "jorisvandenbossche (via GitHub)" <gi...@apache.org>.
jorisvandenbossche commented on issue #35748:
URL: https://github.com/apache/arrow/issues/35748#issuecomment-1765974158

   So whenever there is a duplicate in the first field, you take the last occurrence of that one (i.e. use the value of the second field of the last occurrence)?
   
   In pandas this can be done with the `DataFrame.drop_duplicates()` method, where you can specify which subset of columns to consider when determining duplicate rows, and which duplicate to keep in the result (first/last).
   
   We have an issue requesting this feature, see https://github.com/apache/arrow/issues/30950. There is some discussion there about potential workarounds. 
   
   I think one of the ideas mentioned there actually works nowadays: group by the key(s) you want to deduplicate on, and then aggregate the remaining columns with the "last" aggregation.
   
   ```
   In [14]: batch1 = pa.RecordBatch.from_struct_array(pa.array([(1, "1"), (2, "2")], pa.struct([("a", "int64"), ("b", "string")])))
   
   In [15]: batch2 = pa.RecordBatch.from_struct_array(pa.array([(1, "3"), (3, "4")], pa.struct([("a", "int64"), ("b", "string")])))
   
   In [16]: table = pa.Table.from_batches([batch1, batch2])
   
   In [17]: table
   Out[17]: 
   pyarrow.Table
   a: int64
   b: string
   ----
   a: [[1,2],[1,3]]
   b: [["1","2"],["3","4"]]
   
   In [18]: table.group_by("a", use_threads=False).aggregate([("b", "last")])
   Out[18]: 
   pyarrow.Table
   a: int64
   b_last: string
   ----
   a: [[1,2,3]]
   b_last: [["3","2","4"]]
   ```
   

