You are viewing a plain-text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@arrow.apache.org by do...@apache.org on 2022/06/01 18:32:52 UTC
[arrow] branch master updated: ARROW-16704: [JS] Handle case where `tableFromIPC` input is an async `RecordBatchReader` (#13278)
This is an automated email from the ASF dual-hosted git repository.
domoritz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 5611f2bd0d ARROW-16704: [JS] Handle case where `tableFromIPC` input is an async `RecordBatchReader` (#13278)
5611f2bd0d is described below
commit 5611f2bd0d6136b005d137a84b50709fc5c813bb
Author: Paul Taylor <pa...@me.com>
AuthorDate: Wed Jun 1 11:32:46 2022 -0700
ARROW-16704: [JS] Handle case where `tableFromIPC` input is an async `RecordBatchReader` (#13278)
Authored-by: ptaylor <pa...@me.com>
Signed-off-by: Dominik Moritz <do...@gmail.com>
---
js/src/ipc/reader.ts | 2 +-
js/src/ipc/serialization.ts | 26 ++++++++++++++++++++------
js/test/unit/ipc/serialization-tests.ts | 32 ++++++++++++++++++++++++++++----
3 files changed, 49 insertions(+), 11 deletions(-)
diff --git a/js/src/ipc/reader.ts b/js/src/ipc/reader.ts
index 5b949322d9..77496e799b 100644
--- a/js/src/ipc/reader.ts
+++ b/js/src/ipc/reader.ts
@@ -147,7 +147,7 @@ export class RecordBatchReader<T extends TypeMap = any> extends ReadableInterop<
public static from<T extends TypeMap = any>(source: FromArg1): Promise<RecordBatchStreamReader<T>>;
public static from<T extends TypeMap = any>(source: FromArg2): RecordBatchFileReader<T> | RecordBatchStreamReader<T>;
public static from<T extends TypeMap = any>(source: FromArg3): Promise<RecordBatchFileReader<T> | RecordBatchStreamReader<T>>;
- public static from<T extends TypeMap = any>(source: FromArg4): Promise<RecordBatchFileReader<T> | AsyncRecordBatchReaders<T>>;
+ public static from<T extends TypeMap = any>(source: FromArg4): Promise<AsyncRecordBatchFileReader<T> | AsyncRecordBatchStreamReader<T>>;
public static from<T extends TypeMap = any>(source: FromArg5): Promise<AsyncRecordBatchFileReader<T> | AsyncRecordBatchStreamReader<T>>;
/** @nocollapse */
public static from<T extends TypeMap = any>(source: any) {
diff --git a/js/src/ipc/serialization.ts b/js/src/ipc/serialization.ts
index 680babd7b2..aee4676213 100644
--- a/js/src/ipc/serialization.ts
+++ b/js/src/ipc/serialization.ts
@@ -18,22 +18,36 @@
import { Table } from '../table.js';
import { TypeMap } from '../type.js';
import { isPromise } from '../util/compat.js';
-import { FromArg0, FromArg1, FromArg2, FromArg3, FromArg4, FromArg5, RecordBatchReader } from './reader.js';
+import {
+ FromArg0, FromArg1, FromArg2, FromArg3, FromArg4, FromArg5,
+ RecordBatchReader,
+ RecordBatchFileReader, RecordBatchStreamReader,
+ AsyncRecordBatchFileReader, AsyncRecordBatchStreamReader
+} from './reader.js';
import { RecordBatchFileWriter, RecordBatchStreamWriter } from './writer.js';
+type RecordBatchReaders<T extends TypeMap = any> = RecordBatchFileReader<T> | RecordBatchStreamReader<T>;
+type AsyncRecordBatchReaders<T extends TypeMap = any> = AsyncRecordBatchFileReader<T> | AsyncRecordBatchStreamReader<T>;
+
/**
* Deserialize the IPC format into a {@link Table}. This function is a
* convenience wrapper for {@link RecordBatchReader}. Opposite of {@link tableToIPC}.
*/
export function tableFromIPC<T extends TypeMap = any>(source: FromArg0 | FromArg2): Table<T>;
export function tableFromIPC<T extends TypeMap = any>(source: FromArg1): Promise<Table<T>>;
-export function tableFromIPC<T extends TypeMap = any>(source: FromArg3 | FromArg4 | FromArg5): Promise<Table<T>> | Table<T>;
+export function tableFromIPC<T extends TypeMap = any>(source: FromArg3 | FromArg4 | FromArg5): Promise<Table<T>>;
+export function tableFromIPC<T extends TypeMap = any>(source: RecordBatchReaders<T>): Table<T>;
+export function tableFromIPC<T extends TypeMap = any>(source: AsyncRecordBatchReaders<T>): Promise<Table<T>>;
+export function tableFromIPC<T extends TypeMap = any>(source: RecordBatchReader<T>): Table<T> | Promise<Table<T>>;
export function tableFromIPC<T extends TypeMap = any>(input: any): Table<T> | Promise<Table<T>> {
- const reader = RecordBatchReader.from<T>(input);
- if (isPromise(reader)) {
- return (async () => new Table(await (await reader).readAll()))();
+ const reader = RecordBatchReader.from<T>(input) as RecordBatchReader<T> | Promise<RecordBatchReader<T>>;
+ if (isPromise<RecordBatchReader<T>>(reader)) {
+ return reader.then((reader) => tableFromIPC(reader)) as Promise<Table<T>>;
+ }
+ if (reader.isAsync()) {
+ return (reader as AsyncRecordBatchReaders<T>).readAll().then((xs) => new Table(xs));
}
- return new Table(reader.readAll());
+ return new Table((reader as RecordBatchReaders<T>).readAll());
}
/**
diff --git a/js/test/unit/ipc/serialization-tests.ts b/js/test/unit/ipc/serialization-tests.ts
index 375b5159e6..f31a242096 100644
--- a/js/test/unit/ipc/serialization-tests.ts
+++ b/js/test/unit/ipc/serialization-tests.ts
@@ -19,9 +19,9 @@ import '../../jest-extensions.js';
import * as generate from '../../generate-test-data.js';
import {
- Table, Schema, Field, DataType, TypeMap, Dictionary, Int32, Float32, Utf8, Null,
+ Table, Schema, Field, DataType, TypeMap, Dictionary, Int32, Float32, Uint8, Utf8, Null,
makeVector,
- tableFromIPC, tableToIPC
+ tableFromIPC, tableToIPC, RecordBatchReader, RecordBatchStreamWriter
} from 'apache-arrow';
const deepCopy = (t: Table) => tableFromIPC(tableToIPC(t));
@@ -37,6 +37,30 @@ function createTable<T extends TypeMap = any>(schema: Schema<T>, chunkLengths: n
return generate.table(chunkLengths, schema).table;
}
+describe('tableFromIPC', () => {
+ test('handles AsyncRecordBatchReader input', async () => {
+ type T = { a: Uint8 };
+
+ const sources = [
+ new Table({ a: makeVector(new Uint8Array([1, 2, 3])) }),
+ new Table({ a: makeVector(new Uint8Array([4, 5, 6])) }),
+ ];
+
+ const writer = sources.reduce(
+ (writer, source) => writer.writeAll(source),
+ new RecordBatchStreamWriter<T>({ autoDestroy: false })
+ );
+
+ writer.close();
+
+ let index = 0;
+
+ for await (const reader of RecordBatchReader.readAll<T>(writer[Symbol.asyncIterator]())) {
+ expect(await tableFromIPC(reader)).toEqualTable(sources[index++]);
+ }
+ });
+});
+
describe('tableToIPC()', () => {
test(`to file format`, () => {
@@ -45,7 +69,7 @@ describe('tableToIPC()', () => {
});
const buffer = tableToIPC(source, 'file');
const result = tableFromIPC(buffer);
- expect(source).toEqualTable(result);
+ expect(result).toEqualTable(source);
});
test(`to stream format`, () => {
@@ -54,7 +78,7 @@ describe('tableToIPC()', () => {
});
const buffer = tableToIPC(source, 'stream');
const result = tableFromIPC(buffer);
- expect(source).toEqualTable(result);
+ expect(result).toEqualTable(source);
});
test(`doesn't swap the order of buffers that share the same underlying ArrayBuffer but are in a different order`, () => {