You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by do...@apache.org on 2022/06/01 18:32:52 UTC

[arrow] branch master updated: ARROW-16704: [JS] Handle case where `tableFromIPC` input is an async `RecordBatchReader` (#13278)

This is an automated email from the ASF dual-hosted git repository.

domoritz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 5611f2bd0d ARROW-16704: [JS] Handle case where `tableFromIPC` input is an async `RecordBatchReader` (#13278)
5611f2bd0d is described below

commit 5611f2bd0d6136b005d137a84b50709fc5c813bb
Author: Paul Taylor <pa...@me.com>
AuthorDate: Wed Jun 1 11:32:46 2022 -0700

    ARROW-16704: [JS] Handle case where `tableFromIPC` input is an async `RecordBatchReader` (#13278)
    
    Authored-by: ptaylor <pa...@me.com>
    Signed-off-by: Dominik Moritz <do...@gmail.com>
---
 js/src/ipc/reader.ts                    |  2 +-
 js/src/ipc/serialization.ts             | 26 ++++++++++++++++++++------
 js/test/unit/ipc/serialization-tests.ts | 32 ++++++++++++++++++++++++++++----
 3 files changed, 49 insertions(+), 11 deletions(-)

diff --git a/js/src/ipc/reader.ts b/js/src/ipc/reader.ts
index 5b949322d9..77496e799b 100644
--- a/js/src/ipc/reader.ts
+++ b/js/src/ipc/reader.ts
@@ -147,7 +147,7 @@ export class RecordBatchReader<T extends TypeMap = any> extends ReadableInterop<
     public static from<T extends TypeMap = any>(source: FromArg1): Promise<RecordBatchStreamReader<T>>;
     public static from<T extends TypeMap = any>(source: FromArg2): RecordBatchFileReader<T> | RecordBatchStreamReader<T>;
     public static from<T extends TypeMap = any>(source: FromArg3): Promise<RecordBatchFileReader<T> | RecordBatchStreamReader<T>>;
-    public static from<T extends TypeMap = any>(source: FromArg4): Promise<RecordBatchFileReader<T> | AsyncRecordBatchReaders<T>>;
+    public static from<T extends TypeMap = any>(source: FromArg4): Promise<AsyncRecordBatchFileReader<T> | AsyncRecordBatchStreamReader<T>>;
     public static from<T extends TypeMap = any>(source: FromArg5): Promise<AsyncRecordBatchFileReader<T> | AsyncRecordBatchStreamReader<T>>;
     /** @nocollapse */
     public static from<T extends TypeMap = any>(source: any) {
diff --git a/js/src/ipc/serialization.ts b/js/src/ipc/serialization.ts
index 680babd7b2..aee4676213 100644
--- a/js/src/ipc/serialization.ts
+++ b/js/src/ipc/serialization.ts
@@ -18,22 +18,36 @@
 import { Table } from '../table.js';
 import { TypeMap } from '../type.js';
 import { isPromise } from '../util/compat.js';
-import { FromArg0, FromArg1, FromArg2, FromArg3, FromArg4, FromArg5, RecordBatchReader } from './reader.js';
+import {
+    FromArg0, FromArg1, FromArg2, FromArg3, FromArg4, FromArg5,
+    RecordBatchReader,
+    RecordBatchFileReader, RecordBatchStreamReader,
+    AsyncRecordBatchFileReader, AsyncRecordBatchStreamReader
+} from './reader.js';
 import { RecordBatchFileWriter, RecordBatchStreamWriter } from './writer.js';
 
+type RecordBatchReaders<T extends TypeMap = any> = RecordBatchFileReader<T> | RecordBatchStreamReader<T>;
+type AsyncRecordBatchReaders<T extends TypeMap = any> = AsyncRecordBatchFileReader<T> | AsyncRecordBatchStreamReader<T>;
+
 /**
  * Deserialize the IPC format into a {@link Table}. This function is a
  * convenience wrapper for {@link RecordBatchReader}. Opposite of {@link tableToIPC}.
  */
 export function tableFromIPC<T extends TypeMap = any>(source: FromArg0 | FromArg2): Table<T>;
 export function tableFromIPC<T extends TypeMap = any>(source: FromArg1): Promise<Table<T>>;
-export function tableFromIPC<T extends TypeMap = any>(source: FromArg3 | FromArg4 | FromArg5): Promise<Table<T>> | Table<T>;
+export function tableFromIPC<T extends TypeMap = any>(source: FromArg3 | FromArg4 | FromArg5): Promise<Table<T>>;
+export function tableFromIPC<T extends TypeMap = any>(source: RecordBatchReaders<T>): Table<T>;
+export function tableFromIPC<T extends TypeMap = any>(source: AsyncRecordBatchReaders<T>): Promise<Table<T>>;
+export function tableFromIPC<T extends TypeMap = any>(source: RecordBatchReader<T>): Table<T> | Promise<Table<T>>;
 export function tableFromIPC<T extends TypeMap = any>(input: any): Table<T> | Promise<Table<T>> {
-    const reader = RecordBatchReader.from<T>(input);
-    if (isPromise(reader)) {
-        return (async () => new Table(await (await reader).readAll()))();
+    const reader = RecordBatchReader.from<T>(input) as RecordBatchReader<T> | Promise<RecordBatchReader<T>>;
+    if (isPromise<RecordBatchReader<T>>(reader)) {
+        return reader.then((reader) => tableFromIPC(reader)) as Promise<Table<T>>;
+    }
+    if (reader.isAsync()) {
+        return (reader as AsyncRecordBatchReaders<T>).readAll().then((xs) => new Table(xs));
     }
-    return new Table(reader.readAll());
+    return new Table((reader as RecordBatchReaders<T>).readAll());
 }
 
 /**
diff --git a/js/test/unit/ipc/serialization-tests.ts b/js/test/unit/ipc/serialization-tests.ts
index 375b5159e6..f31a242096 100644
--- a/js/test/unit/ipc/serialization-tests.ts
+++ b/js/test/unit/ipc/serialization-tests.ts
@@ -19,9 +19,9 @@ import '../../jest-extensions.js';
 import * as generate from '../../generate-test-data.js';
 
 import {
-    Table, Schema, Field, DataType, TypeMap, Dictionary, Int32, Float32, Utf8, Null,
+    Table, Schema, Field, DataType, TypeMap, Dictionary, Int32, Float32, Uint8, Utf8, Null,
     makeVector,
-    tableFromIPC, tableToIPC
+    tableFromIPC, tableToIPC, RecordBatchReader, RecordBatchStreamWriter
 } from 'apache-arrow';
 
 const deepCopy = (t: Table) => tableFromIPC(tableToIPC(t));
@@ -37,6 +37,30 @@ function createTable<T extends TypeMap = any>(schema: Schema<T>, chunkLengths: n
     return generate.table(chunkLengths, schema).table;
 }
 
+describe('tableFromIPC', () => {
+    test('handles AsyncRecordBatchReader input', async () => {
+        type T = { a: Uint8 };
+
+        const sources = [
+            new Table({ a: makeVector(new Uint8Array([1, 2, 3])) }),
+            new Table({ a: makeVector(new Uint8Array([4, 5, 6])) }),
+        ];
+
+        const writer = sources.reduce(
+            (writer, source) => writer.writeAll(source),
+            new RecordBatchStreamWriter<T>({ autoDestroy: false })
+        );
+
+        writer.close();
+
+        let index = 0;
+
+        for await (const reader of RecordBatchReader.readAll<T>(writer[Symbol.asyncIterator]())) {
+            expect(await tableFromIPC(reader)).toEqualTable(sources[index++]);
+        }
+    });
+});
+
 describe('tableToIPC()', () => {
 
     test(`to file format`, () => {
@@ -45,7 +69,7 @@ describe('tableToIPC()', () => {
         });
         const buffer = tableToIPC(source, 'file');
         const result = tableFromIPC(buffer);
-        expect(source).toEqualTable(result);
+        expect(result).toEqualTable(source);
     });
 
     test(`to stream format`, () => {
@@ -54,7 +78,7 @@ describe('tableToIPC()', () => {
         });
         const buffer = tableToIPC(source, 'stream');
         const result = tableFromIPC(buffer);
-        expect(source).toEqualTable(result);
+        expect(result).toEqualTable(source);
     });
 
     test(`doesn't swap the order of buffers that share the same underlying ArrayBuffer but are in a different order`, () => {