Posted to jira@arrow.apache.org by "Alec Buckenheimer (Jira)" <ji...@apache.org> on 2021/05/25 13:24:00 UTC

[jira] [Created] (ARROW-12872) `pyarrow._plasma.PlasmaClient.delete` behavior undocumented

Alec Buckenheimer created ARROW-12872:
-----------------------------------------

             Summary: `pyarrow._plasma.PlasmaClient.delete` behavior undocumented
                 Key: ARROW-12872
                 URL: https://issues.apache.org/jira/browse/ARROW-12872
             Project: Apache Arrow
          Issue Type: Improvement
            Reporter: Alec Buckenheimer


Hi all,

I've been using plasma to speed up some multiprocessing I'm doing, and I have had issues where my plasma server runs out of memory even though my logs show that I should have plenty of space, accounting for `size_created - size_deleted`. The documentation for plasma is [a bit scarce|#using-arrow-and-pandas-with-plasma] and does not mention how to use the `.delete` method, but I'd expect that running `.delete([oid])` would free up that space immediately, which does not seem to be the case. The tests for this are [actually commented out|https://github.com/apache/arrow/blob/b4f2c1c72f745acf12e8a6d2d031750745ab2de2/python/pyarrow/tests/test_plasma.py#L592], and from some testing of my own I've found that whether a delete actually takes effect is implicitly tied to a reference count on the buffer. This kind of makes sense from a zero-copy perspective, but it makes it really difficult to keep a lid on the amount of data in the plasma store, since it's not immediately clear which references are hanging around where. Should I be explicitly copying buffer data after pulling it from plasma to make these deletes happen (and if so, how do I do that)?
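To make the behavior described above concrete, here is a minimal pure-Python toy model. It is hypothetical: `ToyStore` and its methods are invented for illustration and are not Plasma's API or implementation. In it, `delete()` on an object that a client still references only marks the object, and the memory is freed once the last reference is released. It also shows why `size_created - size_deleted` accounting can overstate free space, and why an explicit copy (here `bytes(view)`) decouples the caller's data from the stored buffer.

```python
from typing import Dict, Set


class ToyStore:
    """Hypothetical toy store (NOT Plasma's implementation): delete() on an
    object with outstanding client references is deferred until the last
    reference is released."""

    def __init__(self) -> None:
        self._data: Dict[str, bytearray] = {}
        self._refs: Dict[str, int] = {}
        self._pending_delete: Set[str] = set()

    def put(self, oid: str, payload: bytes) -> None:
        self._data[oid] = bytearray(payload)
        self._refs[oid] = 0

    def get(self, oid: str) -> memoryview:
        # Zero-copy access: the caller receives a view, so count a reference.
        self._refs[oid] += 1
        return memoryview(self._data[oid])

    def release(self, oid: str) -> None:
        # Drop one reference; finish a deferred delete on the last one.
        self._refs[oid] -= 1
        if self._refs[oid] == 0 and oid in self._pending_delete:
            self._free(oid)

    def delete(self, oid: str) -> None:
        if self._refs[oid] > 0:
            self._pending_delete.add(oid)  # still in use: only mark it
        else:
            self._free(oid)

    def contains(self, oid: str) -> bool:
        return oid in self._data

    def used_bytes(self) -> int:
        return sum(len(buf) for buf in self._data.values())

    def _free(self, oid: str) -> None:
        del self._data[oid]
        del self._refs[oid]
        self._pending_delete.discard(oid)


store = ToyStore()
store.put("a", b"x" * 100)
view = store.get("a")
copied = bytes(view)                       # explicit copy, independent of the store
store.delete("a")
in_use_after_delete = store.contains("a")  # still present: deletion deferred
bytes_after_delete = store.used_bytes()    # space not reclaimed yet
view.release()                             # drop the zero-copy view...
store.release("a")                         # ...and tell the store about it
freed = not store.contains("a")
```

In this model the copy costs extra memory up front, but it lets the stored object be reclaimed independently of how long the caller keeps its data around.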

For what it's worth, I've captured this behavior inline below. If someone could just tell me whether this is expected and whether there's an easy workaround, I'd really appreciate it. I'm sure the docs could use a bit of love too.

{code:python}
import gc

import pyarrow as pa
import pyarrow.plasma as pl


def table_to_plasma(
    table: pa.Table,
    cli: pl.PlasmaClient,
) -> pl.ObjectID:
    batches = table.to_batches()
    size = sum(b.nbytes for b in batches)
    # the actual buffer space needed is slightly more than the size of the
    # batches, so add some wiggle room
    size = int(max(size * 1.01, size + 512))
    oid = pl.ObjectID.from_random()
    buf = cli.create(oid, size)
    # serialize the batches as an Arrow IPC stream directly into plasma memory
    writer = pa.ipc.new_stream(
        pa.FixedSizeBufferWriter(buf), batches[0].schema
    )
    for b in batches:
        writer.write_batch(b)
    writer.close()
    cli.seal(oid)
    return oid


def table_from_plasma(
    oid: pl.ObjectID,
    cli: pl.PlasmaClient,
) -> pa.Table:
    buf = cli.get_buffers([oid])
    assert len(buf) == 1
    buf = buf[0]
    stream = pa.ipc.open_stream(buf)
    # the returned table is zero-copy: it still references the plasma buffer
    return stream.read_all()


def test():
    t = pa.table([pa.array([1])], schema=pa.schema([pa.field('a', pa.int64())]))
    with pl.start_plasma_store(int(1e8)) as (pl_name, pl_proc):
        cli = pl.connect(pl_name)
        oid = table_to_plasma(t, cli)
        t2 = table_from_plasma(oid, cli)
        assert len(t2) == len(t)
        cli.delete([oid])
        assert not cli.contains(oid)  # this unexpectedly fails
        del t2
        gc.collect()
        assert not cli.contains(oid)  # this succeeds
{code}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)