Posted to dev@arrow.apache.org by John Muehlhausen <jg...@jgm.org> on 2022/12/12 14:47:52 UTC
[Java] VectorSchemaRoot? batches->table
Hello,
pyarrow.Table
from_batches(batches, Schema schema=None)
Construct a Table from a sequence or iterator of Arrow RecordBatches.
What is the equivalent of this in Java? What is the relationship between
VectorSchemaRoot, Table and RecordBatch in Java? It all seems a bit
different...
Specifically, I seem to be able to construct a VectorSchemaRoot from an
Arrow IPC stream, but I'm now wondering about the appropriate way to
combine these batches.
Thanks,
John
Re: [Java] VectorSchemaRoot? batches->table
Posted by Larry White <lj...@gmail.com>.
Hi John,
The different Arrow language libraries have different approaches to working
with the data as you discovered.
For processing individual batches, you should use a VectorSchemaRoot. Table
is intended for non-batch operations, especially where you want stronger
guarantees of immutability. RecordBatch is a low-level Java representation
of the underlying Arrow format, so you probably don't want to work with it
directly unless you really need that level of control.
If you have a large amount of data and it's possible to process the batches
individually, that is probably best. Given enough data it may be your only
option; I will come back to that under the limitations below.
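As a sketch of that batch-at-a-time approach (untested; the InputStream source and the per-batch work are placeholders), ArrowStreamReader refills a single VectorSchemaRoot on each call to loadNextBatch(), so each batch can be processed without ever materializing the whole stream:

```java
import java.io.InputStream;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowStreamReader;

public class BatchProcessor {

  // 'in' is any InputStream carrying Arrow IPC stream data (placeholder).
  public static void processStream(InputStream in) throws Exception {
    try (BufferAllocator allocator = new RootAllocator();
         ArrowStreamReader reader = new ArrowStreamReader(in, allocator)) {
      // The same root is reused: its vectors are overwritten by each
      // subsequent loadNextBatch() call.
      VectorSchemaRoot root = reader.getVectorSchemaRoot();
      while (reader.loadNextBatch()) {
        // Do the per-batch work here (placeholder).
        System.out.println("rows in batch: " + root.getRowCount());
      }
    }
  }
}
```

Because the root is reused, anything you want to keep past the current batch has to be copied (or transferred) out before the next loadNextBatch() call.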
To combine data from different batches (or VSRs), you would need to combine
the vectors in each batch, then create a VSR or Table from the results.
With the caveat that it isn't tested, the code below should do
approximately what you want, producing a single VSR from multiple VSRs
with the same schema:
  /**
   * Concatenates a number of VectorSchemaRoots that share the same schema.
   *
   * @param allocator         the allocator to use for the new vectors
   * @param vectorSchemaRoots the VSRs to concatenate
   */
  public static VectorSchemaRoot concatenate(BufferAllocator allocator,
      List<VectorSchemaRoot> vectorSchemaRoots) {
    assert !vectorSchemaRoots.isEmpty();
    Schema firstSchema = vectorSchemaRoots.get(0).getSchema();
    int totalRowCount = 0;
    for (VectorSchemaRoot root : vectorSchemaRoots) {
      if (!root.getSchema().equals(firstSchema)) {
        throw new IllegalArgumentException("All roots must have the same schema");
      }
      totalRowCount += root.getRowCount();
    }
    final int finalTotalRowCount = totalRowCount;
    // Create one empty destination vector per field, sized for all the rows.
    FieldVector[] newVectors = vectorSchemaRoots.get(0).getFieldVectors().stream()
        .map(vec -> {
          FieldVector newVector = (FieldVector) vec.getTransferPair(allocator).getTo();
          newVector.setInitialCapacity(finalTotalRowCount);
          newVector.allocateNew();
          newVector.setValueCount(finalTotalRowCount);
          return newVector;
        })
        .toArray(FieldVector[]::new);
    // Copy each root's vectors into the destination at a running row offset.
    int offset = 0;
    for (VectorSchemaRoot root : vectorSchemaRoots) {
      int rowCount = root.getRowCount();
      for (int i = 0; i < newVectors.length; i++) {
        FieldVector oldVector = root.getVector(i);
        retryCopyFrom(newVectors[i], oldVector, 0, rowCount, offset);
      }
      offset += rowCount;
    }
    return VectorSchemaRoot.of(newVectors);
  }

  /**
   * Copies a range of values from one vector to another.
   * Instead of using copyFromSafe, which checks available memory on each write,
   * this tries to copy the entire range, reallocating and retrying on failure.
   *
   * @param newVector the vector to copy to
   * @param oldVector the vector to copy from
   * @param oldStart  the starting index in the old vector (inclusive)
   * @param oldEnd    the ending index in the old vector (exclusive)
   * @param newStart  the starting index in the new vector
   */
  public static void retryCopyFrom(ValueVector newVector, ValueVector oldVector,
      int oldStart, int oldEnd, int newStart) {
    while (true) {
      try {
        for (int i = oldStart; i < oldEnd; i++) {
          newVector.copyFrom(i, i - oldStart + newStart, oldVector);
        }
        break;
      } catch (IndexOutOfBoundsException err) {
        newVector.reAlloc();
      }
    }
  }
You could modify the code above to return a Table by changing the last
line.
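For example (untested, and assuming the org.apache.arrow.vector.table.Table constructor that takes a VectorSchemaRoot and transfers its data), the change could look like this:

```java
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.table.Table;

public class ToTable {

  // Wraps the concatenated vectors in an immutable Table instead of a VSR,
  // i.e. concatenate(...) above would end with:
  //   return new Table(VectorSchemaRoot.of(newVectors));
  // The Table constructor transfers the data out of the root, so the
  // intermediate root should not be used afterwards.
  public static Table asTable(FieldVector[] newVectors) {
    return new Table(VectorSchemaRoot.of(newVectors));
  }
}
```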
Limitations: While Vectors can be larger, a VectorSchemaRoot is limited to
Integer.MAX_VALUE rows. The same limitation is present in Table currently,
although the plan is to remove that limitation in the future, and provide
built-in support for concatenating tables so that the API would be more
like the pyarrow operation mentioned above.
Hope this is helpful.
larry
On Mon, Dec 12, 2022 at 9:48 AM John Muehlhausen <jg...@jgm.org> wrote:
> Hello,
>
> pyarrow.Table
> from_batches(batches, Schema schema=None)
> Construct a Table from a sequence or iterator of Arrow RecordBatches.
>
> What is the equivalent of this in Java? What is the relationship between
> VectorSchemaRoot, Table and RecordBatch in Java? It all seems a bit
> different...
>
> Specifically, I seem to be able to construct a VectorSchemaRoot from an
> Arrow IPC stream, but I'm now wondering about the appropriate way to
> combine these batches.
>
> Thanks,
> John
>