Posted to dev@arrow.apache.org by John Muehlhausen <jg...@jgm.org> on 2022/12/12 14:47:52 UTC

[Java] VectorSchemaRoot? batches->table

Hello,

pyarrow.Table
from_batches(batches, Schema schema=None)
Construct a Table from a sequence or iterator of Arrow RecordBatches.

What is the equivalent of this in Java?  What is the relationship between
VectorSchemaRoot, Table and RecordBatch in Java?  It all seems a bit
different...

Specifically, I seem to be able to construct a VectorSchemaRoot from an
Arrow IPC stream, but I'm now wondering about the appropriate way to
combine these batches.

Thanks,
John

Re: [Java] VectorSchemaRoot? batches->table

Posted by Larry White <lj...@gmail.com>.
Hi John,

As you've discovered, the different Arrow language libraries take different
approaches to working with the data.

For processing individual batches, you should use a VectorSchemaRoot. Table
is intended for non-batch operations, especially where you want stronger
guarantees of immutability. RecordBatch is a low-level Java representation
of the underlying Arrow format, so you probably don't want to work with it
directly unless you really do need that level of control.
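
Since you mentioned reading an IPC stream: the usual batch-at-a-time pattern
is to let the reader load each batch into a single VectorSchemaRoot that it
reuses. A minimal sketch (untested; "batches.arrow" is a placeholder path):

```java
import java.io.FileInputStream;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowStreamReader;

public class ReadBatches {
  public static void main(String[] args) throws Exception {
    try (RootAllocator allocator = new RootAllocator();
         ArrowStreamReader reader = new ArrowStreamReader(
             new FileInputStream("batches.arrow"), allocator)) {
      // the reader owns one VSR and refills it on every loadNextBatch() call
      VectorSchemaRoot root = reader.getVectorSchemaRoot();
      while (reader.loadNextBatch()) {
        // process the current batch here; its contents are overwritten
        // by the next loadNextBatch() call
        System.out.println("rows in this batch: " + root.getRowCount());
      }
    }
  }
}
```

Note that because the root is reused, anything you want to keep across
batches has to be copied out before the next load.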

If you have a large amount of data and it's possible to process the batches
individually, that is probably best; given enough data it may be your only
option. I'll come back to this under the limitations noted below.

To combine data from different batches (or VSRs), you would need to combine
the vectors in each batch, then create a VSR or Table from the results.
With the caveat that it isn't tested, the code below should do
approximately what you want and produce a single VSR from multiple VSRs
with the same schema:

  // imports needed (Arrow Java):
  import java.util.List;
  import org.apache.arrow.memory.BufferAllocator;
  import org.apache.arrow.vector.FieldVector;
  import org.apache.arrow.vector.ValueVector;
  import org.apache.arrow.vector.VectorSchemaRoot;
  import org.apache.arrow.vector.types.pojo.Schema;

  /**
   * Concatenates a number of VectorSchemaRoots with the same schema.
   *
   * @param allocator         the allocator used for the combined vectors
   * @param vectorSchemaRoots the VSRs to concatenate
   * @return a single VSR containing the rows of all the inputs
   */
  public static VectorSchemaRoot concatenate(BufferAllocator allocator,
      List<VectorSchemaRoot> vectorSchemaRoots)
  {
    assert !vectorSchemaRoots.isEmpty();
    Schema firstSchema = vectorSchemaRoots.get(0).getSchema();
    int totalRowCount = 0;
    for (VectorSchemaRoot root : vectorSchemaRoots)
    {
      if (!root.getSchema().equals(firstSchema))
        throw new IllegalArgumentException("All roots must have the same schema");
      totalRowCount += root.getRowCount();
    }

    final int finalTotalRowCount = totalRowCount;
    FieldVector[] newVectors = vectorSchemaRoots.get(0).getFieldVectors().stream()
        .map(vec -> {
          // getTransferPair gives us an empty vector of the same type and field
          FieldVector newVector = (FieldVector) vec.getTransferPair(allocator).getTo();
          newVector.setInitialCapacity(finalTotalRowCount);
          newVector.allocateNew();
          newVector.setValueCount(finalTotalRowCount);
          return newVector;
        })
        .toArray(FieldVector[]::new);

    int offset = 0;
    for (VectorSchemaRoot root : vectorSchemaRoots)
    {
      int rowCount = root.getRowCount();
      for (int i = 0; i < newVectors.length; i++)
      {
        FieldVector oldVector = root.getVector(i);
        retryCopyFrom(newVectors[i], oldVector, 0, rowCount, offset);
      }
      offset += rowCount;
    }
    return VectorSchemaRoot.of(newVectors);
  }

  /**
   * Copies a range of values from one vector into another.
   * Instead of using copyFromSafe, which checks available memory on each
   * write, this tries to copy over the entire range and retries after
   * reallocating if the destination runs out of room.
   *
   * @param newVector the vector to copy to
   * @param oldVector the vector to copy from
   * @param oldStart  the starting index in the old vector (inclusive)
   * @param oldEnd    the ending index in the old vector (exclusive)
   * @param newStart  the starting index in the new vector
   */
  public static void retryCopyFrom(ValueVector newVector, ValueVector oldVector,
      int oldStart, int oldEnd, int newStart)
  {
    while (true)
    {
      try
      {
        for (int i = oldStart; i < oldEnd; i++)
          newVector.copyFrom(i, i - oldStart + newStart, oldVector);
        break;
      }
      catch (IndexOutOfBoundsException err)
      {
        newVector.reAlloc();
      }
    }
  }
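
A hypothetical usage sketch (untested; builds two single-column roots in
memory and concatenates them, assuming the concatenate method above is in
scope in the same class):

```java
import java.util.List;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.VectorSchemaRoot;

public class ConcatenateExample {
  public static void main(String[] args) {
    try (BufferAllocator allocator = new RootAllocator()) {
      // first root: two rows in a single int column "x"
      IntVector v1 = new IntVector("x", allocator);
      v1.allocateNew(2);
      v1.set(0, 1);
      v1.set(1, 2);
      v1.setValueCount(2);

      // second root: one row, same schema
      IntVector v2 = new IntVector("x", allocator);
      v2.allocateNew(1);
      v2.set(0, 3);
      v2.setValueCount(1);

      try (VectorSchemaRoot r1 = VectorSchemaRoot.of(v1);
           VectorSchemaRoot r2 = VectorSchemaRoot.of(v2);
           VectorSchemaRoot combined = concatenate(allocator, List.of(r1, r2))) {
        // the combined root should hold all three rows
        System.out.println("combined rows: " + combined.getRowCount());
      }
    }
  }
}
```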

You could modify the code above to return a Table by changing the last
line.
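
For example (untested sketch, assuming the Table constructor in the
org.apache.arrow.vector.table package that takes ownership of a root's
data):

```java
// instead of: return VectorSchemaRoot.of(newVectors);
// (Table lives in org.apache.arrow.vector.table; the constructor
//  transfers the data out of the root, leaving it empty)
return new Table(VectorSchemaRoot.of(newVectors));
```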

Limitations: While Vectors can be larger, a VectorSchemaRoot is limited to
Integer.MAX_VALUE rows. The same limitation is present in Table currently,
although the plan is to remove that limitation in the future, and provide
built-in support for concatenating tables so that the API would be more
like the pyarrow operation mentioned above.

Hope this is helpful.

larry


On Mon, Dec 12, 2022 at 9:48 AM John Muehlhausen <jg...@jgm.org> wrote:

> Hello,
>
> pyarrow.Table
> from_batches(batches, Schema schema=None)
> Construct a Table from a sequence or iterator of Arrow RecordBatches.
>
> What is the equivalent of this in Java?  What is the relationship between
> VectorSchemaRoot, Table and RecordBatch in Java?  It all seems a bit
> different...
>
> Specifically, I seem to be able to construct a VectorSchemaRoot from an
> Arrow IPC stream, but I'm now wondering about the appropriate way to
> combine these batches.
>
> Thanks,
> John
>