Posted to user@arrow.apache.org by 鲁立君 <lu...@icloud.com> on 2021/02/23 08:45:48 UTC
[Java] Is it possible to put a list of ArrowRecordBatch into Plasma with one single id?
Hi,
I am a beginner with Arrow and Plasma, and I am trying to use them in my project to improve performance.
In my project I slice the original VectorSchemaRoot into many smaller VectorSchemaRoots, store each of them in Plasma, and the resulting object ids are the output of my task.
Now I have a problem. For example, if I need to sort multiple VectorSchemaRoots globally and output them in order, my solution is to build a secondary index, sort it, slice the raw data accordingly, and then output it. That leaves me with many VectorSchemaRoots and many object ids, which puts a lot of pressure on my metadata store.
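The secondary-index step can be sketched in plain Java. This is a minimal sketch under stated assumptions: an int[] stands in for the key column of each VectorSchemaRoot, and SecondaryIndexSketch and its Entry type are hypothetical names, not Arrow APIs. Each entry records which batch and row a key came from, and the whole index is sorted once by key, which yields the global order in which rows would be sliced and output.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/**
 * Hypothetical sketch of a global secondary index over several batches.
 * Each int[] stands in for the key column of one VectorSchemaRoot.
 */
public final class SecondaryIndexSketch {

    /** One index entry: source batch, row inside that batch, and its sort key. */
    public static final class Entry {
        public final int batch;
        public final int row;
        public final int key;

        Entry(int batch, int row, int key) {
            this.batch = batch;
            this.row = row;
            this.key = key;
        }
    }

    /** Builds (batch, row, key) entries for every row and sorts them by key. */
    public static List<Entry> sortedIndex(List<int[]> keyColumns) {
        List<Entry> index = new ArrayList<>();
        for (int b = 0; b < keyColumns.size(); b++) {
            int[] keys = keyColumns.get(b);
            for (int r = 0; r < keys.length; r++) {
                index.add(new Entry(b, r, keys[r]));
            }
        }
        // List.sort is stable, so equal keys keep their batch/row order.
        index.sort(Comparator.comparingInt(e -> e.key));
        return index;
    }

    public static void main(String[] args) {
        List<int[]> keys = List.of(new int[] {5, 1}, new int[] {3});
        for (Entry e : sortedIndex(keys)) {
            System.out.println("batch " + e.batch + ", row " + e.row + ", key " + e.key);
        }
    }
}
```

Because the sort is stable, rows with equal keys come out in their original batch and row order, which keeps the output deterministic.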
My preliminary understanding is that storing an ArrowRecordBatch in Plasma means serializing it on write and deserializing it after reading it back. However, the serialization and deserialization utility classes currently provided by Arrow only allow storing or reading a single ArrowRecordBatch.
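One generic workaround for a one-batch-per-object layout is to frame several serialized byte arrays into a single blob with length prefixes, so that one object id covers all of them. The sketch below assumes an ad hoc layout (a message count, then a length and the bytes for each message) that is not an Arrow format; FramedBlob, pack and unpack are hypothetical names. On read, each unpacked array could be passed to per-batch deserialization code unchanged.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical framing: [int count] then, per message, [int length][bytes].
 * This is an ad hoc layout, not an Arrow IPC format.
 */
public final class FramedBlob {

    /** Concatenates messages into one byte[], each preceded by its length. */
    public static byte[] pack(List<byte[]> messages) {
        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
             DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(messages.size());
            for (byte[] message : messages) {
                out.writeInt(message.length); // length prefix
                out.write(message);
            }
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Splits a blob produced by pack() back into the original messages. */
    public static List<byte[]> unpack(byte[] blob) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(blob))) {
            int count = in.readInt();
            List<byte[]> messages = new ArrayList<>(count);
            for (int i = 0; i < count; i++) {
                byte[] message = new byte[in.readInt()];
                in.readFully(message);
                messages.add(message);
            }
            return messages;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        List<byte[]> parts = List.of(new byte[] {1, 2, 3}, new byte[0], new byte[] {4});
        byte[] blob = pack(parts);
        System.out.println(unpack(blob).size() + " messages, " + blob.length + " bytes");
    }
}
```

The cost of this design is that every message in the blob still repeats its own schema, and the reader must copy each slice out before deserializing it.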
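Here is my sample code:
// Imports assumed by the two methods below. They also rely on a PlasmaClient
// field named plasmaClient and on the project's own CIMemStoreException.
import io.netty.buffer.ByteBufUtil;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.ipc.ReadChannel;
import org.apache.arrow.vector.ipc.WriteChannel;
import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
import org.apache.arrow.vector.ipc.message.MessageSerializer;
import org.apache.arrow.vector.types.pojo.Schema;

// Store a list of record batches and return one object id per batch.
public List<String> storeRecordBatch(Schema schema, List<ArrowRecordBatch> recordBatchList) throws IOException {
    List<String> result = new ArrayList<>();
    Random random = new Random(); // reuse one generator instead of creating one per id
    for (ArrowRecordBatch recordBatch : recordBatchList) {
        // Serialize the schema followed by this single batch into a byte array.
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        WritableByteChannel writableByteChannel = Channels.newChannel(byteArrayOutputStream);
        WriteChannel writeChannel = new WriteChannel(writableByteChannel);
        MessageSerializer.serialize(writeChannel, schema);
        MessageSerializer.serialize(writeChannel, recordBatch);

        // Plasma object ids are 20 bytes; draw random ids until an unused one is found.
        byte[] id = new byte[20];
        do {
            random.nextBytes(id);
        } while (plasmaClient.contains(id));

        plasmaClient.put(id, byteArrayOutputStream.toByteArray(), null);
        assert plasmaClient.contains(id);
        result.add(ByteBufUtil.hexDump(id));
    }
    return result;
}

// Get a list of record batches for a list of object ids.
public List<ArrowRecordBatch> getRecordBatches(List<String> blockIdList, BufferAllocator allocator) throws IOException {
    List<ArrowRecordBatch> recordBatchList = new ArrayList<>();
    for (String blockIdStr : blockIdList) {
        byte[] blockId = ByteBufUtil.decodeHexDump(blockIdStr);
        if (!plasmaClient.contains(blockId)) {
            throw new CIMemStoreException(CiMmeStoreResultEnum.BlockNotFoundException);
        }
        // Copy the object out of the store, then release it.
        ByteBuffer buffer = plasmaClient.getObjAsByteBuffer(blockId, 0, false);
        byte[] bytes = new byte[buffer.remaining()];
        buffer.get(bytes);
        plasmaClient.release(blockId);

        // Each object holds one schema message followed by exactly one batch.
        ReadableByteChannel readableByteChannel = Channels.newChannel(new ByteArrayInputStream(bytes));
        ReadChannel readChannel = new ReadChannel(readableByteChannel);
        MessageSerializer.deserializeSchema(readChannel); // skip past the schema message
        recordBatchList.add(MessageSerializer.deserializeRecordBatch(readChannel, allocator));
    }
    return recordBatchList;
}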
Is there a way to put a list of ArrowRecordBatch into Plasma with one single id?
Re: [Java] Is it possible to put a list of ArrowRecordBatch into Plasma with one single id?
Posted by Micah Kornfield <em...@gmail.com>.
I think Plasma is a generic object store, so you could potentially
serialize all the data to an IPC stream file and store that.
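A minimal sketch of that suggestion, assuming Arrow Java's IPC streaming classes ArrowStreamWriter and ArrowStreamReader: the schema is written once, followed by every batch and an end-of-stream marker, so the whole list becomes one byte[] that could be stored under a single Plasma id. The class IpcStreamSketch, the helpers makeBatch and roundTripRowCount, and the one-column schema are hypothetical and only for illustration; the Plasma call appears in a comment and is not executed.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.VectorLoader;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.VectorUnloader;
import org.apache.arrow.vector.ipc.ArrowStreamReader;
import org.apache.arrow.vector.ipc.ArrowStreamWriter;
import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;

public final class IpcStreamSketch {

    /** Writes the schema once, then every batch, then end-of-stream, into one byte[]. */
    public static byte[] toStream(Schema schema, List<ArrowRecordBatch> batches,
                                  BufferAllocator allocator) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator);
             // null DictionaryProvider: this schema has no dictionary-encoded fields
             ArrowStreamWriter writer = new ArrowStreamWriter(root, null, out)) {
            VectorLoader loader = new VectorLoader(root);
            writer.start();
            for (ArrowRecordBatch batch : batches) {
                loader.load(batch); // point the root's vectors at this batch's buffers
                writer.writeBatch();
            }
            writer.end();
        }
        return out.toByteArray();
    }

    /** Reads every batch of a stream back and returns the total row count. */
    public static long countRows(byte[] stream, BufferAllocator allocator) throws IOException {
        long rows = 0;
        try (ArrowStreamReader reader =
                 new ArrowStreamReader(new ByteArrayInputStream(stream), allocator)) {
            VectorSchemaRoot root = reader.getVectorSchemaRoot(); // refilled for each batch
            while (reader.loadNextBatch()) {
                rows += root.getRowCount();
            }
        }
        return rows;
    }

    /** Hypothetical helper: a batch with one int column "x" holding 0..n-1. */
    static ArrowRecordBatch makeBatch(Schema schema, int n, BufferAllocator allocator) {
        try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
            IntVector x = (IntVector) root.getVector("x");
            x.allocateNew(n);
            for (int i = 0; i < n; i++) {
                x.set(i, i);
            }
            root.setRowCount(n);
            // The batch retains its own buffer references, so closing root is safe.
            return new VectorUnloader(root).getRecordBatch();
        }
    }

    /** Builds batches of the given sizes, packs them into one stream, reads it back. */
    public static long roundTripRowCount(int... batchSizes) {
        Schema schema = new Schema(Collections.singletonList(
                Field.nullable("x", new ArrowType.Int(32, true))));
        try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE)) {
            List<ArrowRecordBatch> batches = new ArrayList<>();
            try {
                for (int n : batchSizes) {
                    batches.add(makeBatch(schema, n, allocator));
                }
                byte[] blob = toStream(schema, batches, allocator);
                // blob is the single value that could go to plasmaClient.put(id, blob, null).
                return countRows(blob, allocator);
            } finally {
                for (ArrowRecordBatch batch : batches) {
                    batch.close();
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTripRowCount(3, 4, 5) + " rows");
    }
}
```

On the read side a single VectorSchemaRoot is refilled by each loadNextBatch() call, so code that needs the batches independently should unload or copy each one before advancing. The schema is also stored only once per object, rather than once per batch as in the per-id approach.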