Posted to jira@arrow.apache.org by "Alec Buckenheimer (Jira)" <ji...@apache.org> on 2021/05/25 13:26:00 UTC

[jira] [Updated] (ARROW-12872) `pyarrow._plasma.PlasmaClient.delete` behavior undocumented

     [ https://issues.apache.org/jira/browse/ARROW-12872?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Alec Buckenheimer updated ARROW-12872:
--------------------------------------
    Component/s: Python
                 C++ - Plasma

> `pyarrow._plasma.PlasmaClient.delete` behavior undocumented
> -----------------------------------------------------------
>
>                 Key: ARROW-12872
>                 URL: https://issues.apache.org/jira/browse/ARROW-12872
>             Project: Apache Arrow
>          Issue Type: Improvement
>          Components: C++ - Plasma, Python
>    Affects Versions: 4.0.0
>            Reporter: Alec Buckenheimer
>            Priority: Major
>
> Hi all,
> I've been using Plasma to speed up some multiprocessing I'm doing and have had issues where my plasma server runs out of memory, even though my logs show that I should have plenty of space, accounting for `size_created - size_deleted`. The documentation for plasma is [a bit scarce|#using-arrow-and-pandas-with-plasma] and does not mention how to use the `.delete` method, but I'd expect that running `.delete([oid])` would free up that space immediately, which does not seem to be the case. The tests for this are [actually commented out|https://github.com/apache/arrow/blob/b4f2c1c72f745acf12e8a6d2d031750745ab2de2/python/pyarrow/tests/test_plasma.py#L592], and from some testing of my own I've found that whether delete actually works is implicitly linked to a reference counter on the buffer. This makes sense from the zero-copy perspective, but it makes it really difficult to keep a lid on the amount of data in the plasma store, as it's not immediately clear which refs are hanging around where. Should I be explicitly copying buffer data after pulling from plasma to make these deletes happen (if so, how do I do that)?
> For what it's worth, I've captured this behavior inline below. If someone could just tell me whether this is expected and whether there's an easy workaround, I'd really appreciate it. I'm sure the docs could use a bit of love too.
>  
>  
> {code:python}
> import pyarrow as pa
> import pyarrow.plasma as pl
> def table_to_plasma(
>     table: pa.Table,
>     cli: pl.PlasmaClient,
> ) -> pl.ObjectID:
>     batches = table.to_batches()
>     size = sum(b.nbytes for b in batches)
>     # actual buffer space is a tiny bit more than the size of the tables so add
>     # some wiggle room
>     size = int(max(size * 1.01, size + 512))
>     oid = pl.ObjectID.from_random()
>     buf = cli.create(oid, size)
>     writer = pa.ipc.new_stream(
>         pa.FixedSizeBufferWriter(buf), batches[0].schema
>     )
>     for b in batches:
>         writer.write_batch(b)
>     writer.close()
>     cli.seal(oid)
>     return oid
> def table_from_plasma(
>     oid: pl.ObjectID,
>     cli: pl.PlasmaClient,
> ) -> pa.Table:
>     buf = cli.get_buffers([oid])
>     assert len(buf) == 1
>     buf = buf[0]
>     stream = pa.ipc.open_stream(buf)
>     return stream.read_all()
> def test():
>     t = pa.table([pa.array([1])], schema=pa.schema([pa.field('a', pa.int64())]))
>     with pl.start_plasma_store(int(1e8)) as (pl_name, pl_proc):
>         cli = pl.connect(pl_name)
>         oid = table_to_plasma(t, cli)
>         t2 = table_from_plasma(oid, cli)
>         assert len(t2) == len(t)
>         cli.delete([oid])
>         assert not cli.contains(oid)  # this unexpectedly fails
>         del t2
>         import gc
>         gc.collect()
>         assert not cli.contains(oid)  # this succeeds
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)