You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@arrow.apache.org by ks...@apache.org on 2019/03/24 18:54:17 UTC
[arrow] branch master updated: ARROW-4976: [JS] Invalidate RecordBatchReader node/dom streams on reset()
This is an automated email from the ASF dual-hosted git repository.
kszucs pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 9884b9e ARROW-4976: [JS] Invalidate RecordBatchReader node/dom streams on reset()
9884b9e is described below
commit 9884b9ea20398ba89846460e2c7ab5f2bd2164de
Author: ptaylor <pa...@me.com>
AuthorDate: Sun Mar 24 19:53:56 2019 +0100
ARROW-4976: [JS] Invalidate RecordBatchReader node/dom streams on reset()
Invalidates RecordBatchReader's cached internal Node or DOM stream when reset() is called, so that each table read by the reader can be piped to a separate writable stream.
Closes https://issues.apache.org/jira/browse/ARROW-4976.
This fixes the EPIPE error in `arrow2csv` when piping to other processes that may close before consuming all of the output:
![readtwotables](https://user-images.githubusercontent.com/178183/54714693-ca780780-4b0e-11e9-9083-2e5b04e7a226.gif)
Author: ptaylor <pa...@me.com>
Closes #3991 from trxcllnt/js/reset-reader-streams and squashes the following commits:
6b8ccd58 <ptaylor> regenerate package-lock
7ecb00c3 <ptaylor> reset reader's internal node/DOM streams, fix EPIPE regression in arrow2csv
1fe1c17a <ptaylor> update to Ix with DOM streams, remove workaround for DOM stream writer tests
---
js/package-lock.json | 99 +++++++++++++++++----------
js/package.json | 4 +-
js/src/bin/arrow2csv.ts | 98 ++++++++++++++------------
js/src/io/interfaces.ts | 4 +-
js/src/ipc/reader.ts | 2 +
js/test/unit/ipc/reader/streams-dom-tests.ts | 41 ++++++++++-
js/test/unit/ipc/reader/streams-node-tests.ts | 44 ++++++++++--
js/test/unit/ipc/writer/streams-dom-tests.ts | 17 ++---
8 files changed, 205 insertions(+), 104 deletions(-)
diff --git a/js/package-lock.json b/js/package-lock.json
index 1dc65df..7ce4165 100644
--- a/js/package-lock.json
+++ b/js/package-lock.json
@@ -1,8 +1,7 @@
{
"name": "apache-arrow",
- "version": "0.3.0",
- "lockfileVersion": 1,
"requires": true,
+ "lockfileVersion": 1,
"dependencies": {
"@babel/code-frame": {
"version": "7.0.0",
@@ -4931,7 +4930,8 @@
"ansi-regex": {
"version": "2.1.1",
"bundled": true,
- "dev": true
+ "dev": true,
+ "optional": true
},
"aproba": {
"version": "1.2.0",
@@ -4952,12 +4952,14 @@
"balanced-match": {
"version": "1.0.0",
"bundled": true,
- "dev": true
+ "dev": true,
+ "optional": true
},
"brace-expansion": {
"version": "1.1.11",
"bundled": true,
"dev": true,
+ "optional": true,
"requires": {
"balanced-match": "^1.0.0",
"concat-map": "0.0.1"
@@ -4972,17 +4974,20 @@
"code-point-at": {
"version": "1.1.0",
"bundled": true,
- "dev": true
+ "dev": true,
+ "optional": true
},
"concat-map": {
"version": "0.0.1",
"bundled": true,
- "dev": true
+ "dev": true,
+ "optional": true
},
"console-control-strings": {
"version": "1.1.0",
"bundled": true,
- "dev": true
+ "dev": true,
+ "optional": true
},
"core-util-is": {
"version": "1.0.2",
@@ -5099,7 +5104,8 @@
"inherits": {
"version": "2.0.3",
"bundled": true,
- "dev": true
+ "dev": true,
+ "optional": true
},
"ini": {
"version": "1.3.5",
@@ -5111,6 +5117,7 @@
"version": "1.0.0",
"bundled": true,
"dev": true,
+ "optional": true,
"requires": {
"number-is-nan": "^1.0.0"
}
@@ -5125,6 +5132,7 @@
"version": "3.0.4",
"bundled": true,
"dev": true,
+ "optional": true,
"requires": {
"brace-expansion": "^1.1.7"
}
@@ -5132,12 +5140,14 @@
"minimist": {
"version": "0.0.8",
"bundled": true,
- "dev": true
+ "dev": true,
+ "optional": true
},
"minipass": {
"version": "2.3.5",
"bundled": true,
"dev": true,
+ "optional": true,
"requires": {
"safe-buffer": "^5.1.2",
"yallist": "^3.0.0"
@@ -5156,6 +5166,7 @@
"version": "0.5.1",
"bundled": true,
"dev": true,
+ "optional": true,
"requires": {
"minimist": "0.0.8"
}
@@ -5236,7 +5247,8 @@
"number-is-nan": {
"version": "1.0.1",
"bundled": true,
- "dev": true
+ "dev": true,
+ "optional": true
},
"object-assign": {
"version": "4.1.1",
@@ -5248,6 +5260,7 @@
"version": "1.4.0",
"bundled": true,
"dev": true,
+ "optional": true,
"requires": {
"wrappy": "1"
}
@@ -5333,7 +5346,8 @@
"safe-buffer": {
"version": "5.1.2",
"bundled": true,
- "dev": true
+ "dev": true,
+ "optional": true
},
"safer-buffer": {
"version": "2.1.2",
@@ -5369,6 +5383,7 @@
"version": "1.0.2",
"bundled": true,
"dev": true,
+ "optional": true,
"requires": {
"code-point-at": "^1.0.0",
"is-fullwidth-code-point": "^1.0.0",
@@ -5388,6 +5403,7 @@
"version": "3.0.1",
"bundled": true,
"dev": true,
+ "optional": true,
"requires": {
"ansi-regex": "^2.0.0"
}
@@ -5431,12 +5447,14 @@
"wrappy": {
"version": "1.0.2",
"bundled": true,
- "dev": true
+ "dev": true,
+ "optional": true
},
"yallist": {
"version": "3.0.3",
"bundled": true,
- "dev": true
+ "dev": true,
+ "optional": true
}
}
},
@@ -5844,16 +5862,16 @@
}
},
"google-closure-compiler": {
- "version": "20190121.0.0",
- "resolved": "https://registry.npmjs.org/google-closure-compiler/-/google-closure-compiler-20190121.0.0.tgz",
- "integrity": "sha512-FIp3+KxjtDwykDTr1WsFo0QexEopAC4bDXXZfnEdgHECF7hCeFAAsLUPxMmj9Wx+O39eFCXGAzY7w0k5aU9qjg==",
+ "version": "20190301.0.0",
+ "resolved": "https://registry.npmjs.org/google-closure-compiler/-/google-closure-compiler-20190301.0.0.tgz",
+ "integrity": "sha512-FCtg6VsC9BhvbDLh+idMP4F3gka60KLEW0Oqw7M/vhBZnP2/aB4zzxuUDo5LOxuR+RyVqB4VyGOFnM9Z/14iVw==",
"dev": true,
"requires": {
"chalk": "^1.0.0",
- "google-closure-compiler-java": "^20190121.0.0",
- "google-closure-compiler-js": "^20190121.0.0",
- "google-closure-compiler-linux": "^20190121.0.0",
- "google-closure-compiler-osx": "^20190121.0.0",
+ "google-closure-compiler-java": "^20190301.0.0",
+ "google-closure-compiler-js": "^20190301.0.0",
+ "google-closure-compiler-linux": "^20190301.0.0",
+ "google-closure-compiler-osx": "^20190301.0.0",
"minimist": "^1.2.0",
"vinyl": "^2.0.1",
"vinyl-sourcemaps-apply": "^0.2.0"
@@ -5887,28 +5905,28 @@
}
},
"google-closure-compiler-java": {
- "version": "20190121.0.0",
- "resolved": "https://registry.npmjs.org/google-closure-compiler-java/-/google-closure-compiler-java-20190121.0.0.tgz",
- "integrity": "sha512-UCQ7ZXOlk/g101DS4TqyW+SaoR+4GVq7NKrwebH4gnESY76Xuz7FRrKWwfAXwltmiYAUVZCVI4qpoEz48V+VjA==",
+ "version": "20190301.0.0",
+ "resolved": "https://registry.npmjs.org/google-closure-compiler-java/-/google-closure-compiler-java-20190301.0.0.tgz",
+ "integrity": "sha512-IMv77Mu1chPjSaJC1PWyKSNIvm19nSjx4oXvf67ZBLRkuPKHb3S1ECD3l71pfxNZ2+2tAXnxkEcWcREJ8ph4Tg==",
"dev": true
},
"google-closure-compiler-js": {
- "version": "20190121.0.0",
- "resolved": "https://registry.npmjs.org/google-closure-compiler-js/-/google-closure-compiler-js-20190121.0.0.tgz",
- "integrity": "sha512-PgY0Fy+fXZnjir6aPz/FVJPXuwZf5pKJ9n7Hf1HL4x1lhqVIf3i+u3Ed6ZWCXa+YiEhvwH5RTQr/iPP/D3gDRg==",
+ "version": "20190301.0.0",
+ "resolved": "https://registry.npmjs.org/google-closure-compiler-js/-/google-closure-compiler-js-20190301.0.0.tgz",
+ "integrity": "sha512-J0HVHwpGf3o5MwyifrYhfhNpD7Zznn+fktcKKmwhguKqaNbgCr1AfnaGEarej3Lx1W9CouJEm5OTRTZRJgvRHQ==",
"dev": true
},
"google-closure-compiler-linux": {
- "version": "20190121.0.0",
- "resolved": "https://registry.npmjs.org/google-closure-compiler-linux/-/google-closure-compiler-linux-20190121.0.0.tgz",
- "integrity": "sha512-cw4qr9TuB2gB53l/oYadZLuw+zOi2yggYFtnNA5jvTLTqY8m2VZAL5DGL6gmCtZovbQ0bv9ANqjT8NxEtcSzfw==",
+ "version": "20190301.0.0",
+ "resolved": "https://registry.npmjs.org/google-closure-compiler-linux/-/google-closure-compiler-linux-20190301.0.0.tgz",
+ "integrity": "sha512-r+47izRha1ZOHP8E5wq7YsjatzJVD0yn/7dnZA/jSJmTxoFDfEaV78PYGAgCpL8kslHHApPDFEn9Ozx2eSH2gg==",
"dev": true,
"optional": true
},
"google-closure-compiler-osx": {
- "version": "20190121.0.0",
- "resolved": "https://registry.npmjs.org/google-closure-compiler-osx/-/google-closure-compiler-osx-20190121.0.0.tgz",
- "integrity": "sha512-6OqyUcgojPCqCuzdyKLwmIkBhfoWF3cVzaX8vaJvQ3SYwlITBT3aepMEZiWFRVvvml+ojs1AJcZvQIqFke8X1w==",
+ "version": "20190301.0.0",
+ "resolved": "https://registry.npmjs.org/google-closure-compiler-osx/-/google-closure-compiler-osx-20190301.0.0.tgz",
+ "integrity": "sha512-W/Mub4k7oKcd1XYIae0NrJysNvpiAjXhq0DCoTJaTZzkc8dGVqcvrQ/YqYNwLkUULqL1dsrYyt3jv1X6l9OqZw==",
"dev": true,
"optional": true
},
@@ -7037,14 +7055,21 @@
}
},
"ix": {
- "version": "2.5.1",
- "resolved": "https://registry.npmjs.org/ix/-/ix-2.5.1.tgz",
- "integrity": "sha512-YPX759NbhmIynoCYsxcpKBCQDFkeVup4xGaAylnIRaM+md7qrLyoW7kow0iqx4cJr8PUG85/cfwfjylqehg8bQ==",
+ "version": "2.5.2",
+ "resolved": "https://registry.npmjs.org/ix/-/ix-2.5.2.tgz",
+ "integrity": "sha512-RY0NlNdA5frPtUDx9Bctw/L8OTMUgb7OqOa+Z2r9XqiWnUAjLM6xZ4hj9M+/NNI1NLAsXju5WUFyZC9l2kqnMQ==",
"dev": true,
"requires": {
- "@types/node": "^10.12.18",
- "is-stream": "1.1.0",
+ "@types/node": "^11.10.4",
"tslib": "^1.9.3"
+ },
+ "dependencies": {
+ "@types/node": {
+ "version": "11.11.4",
+ "resolved": "https://registry.npmjs.org/@types/node/-/node-11.11.4.tgz",
+ "integrity": "sha512-02tIL+QIi/RW4E5xILdoAMjeJ9kYq5t5S2vciUdFPXv/ikFTb0zK8q9vXkg4+WAJuYXGiVT1H28AkD2C+IkXVw==",
+ "dev": true
+ }
}
},
"jest": {
diff --git a/js/package.json b/js/package.json
index e6ec2ed..6a29003 100644
--- a/js/package.json
+++ b/js/package.json
@@ -78,13 +78,13 @@
"del": "3.0.0",
"esm": "3.1.4",
"glob": "7.1.3",
- "google-closure-compiler": "20190121.0.0",
+ "google-closure-compiler": "20190301.0.0",
"gulp": "4.0.0",
"gulp-json-transform": "0.4.6",
"gulp-rename": "1.4.0",
"gulp-sourcemaps": "2.6.4",
"gulp-typescript": "5.0.0",
- "ix": "2.5.1",
+ "ix": "2.5.2",
"jest": "23.6.0",
"jest-environment-node-debug": "2.0.0",
"jest-silent-reporter": "0.1.1",
diff --git a/js/src/bin/arrow2csv.ts b/js/src/bin/arrow2csv.ts
index b86f852..1cceb0f 100644
--- a/js/src/bin/arrow2csv.ts
+++ b/js/src/bin/arrow2csv.ts
@@ -27,7 +27,6 @@ import { Schema } from '../schema';
const padLeft = require('pad-left');
const bignumJSONParse = require('json-bignum').parse;
-const pipeline = require('util').promisify(stream.pipeline);
const argv = require(`command-line-args`)(cliOpts(), { partial: true });
const files = argv.help ? [] : [...(argv.file || []), ...(argv._unknown || [])].filter(Boolean);
@@ -56,11 +55,11 @@ type ToStringState = {
if (state.closed) { break; }
for await (reader of recordBatchReaders(source)) {
hasReaders = true;
- const source = reader.toNodeStream();
- const xform = batchesToString(state, reader.schema);
- const sink = new stream.PassThrough();
- sink.pipe(process.stdout, { end: false });
- await pipeline(source, xform, sink).catch(() => state.closed = true);
+ const transformToString = batchesToString(state, reader.schema);
+ await pipeTo(
+ reader.pipe(transformToString),
+ process.stdout, { end: false }
+ ).catch(() => state.closed = true); // Handle EPIPE errors
}
if (state.closed) { break; }
}
@@ -74,6 +73,21 @@ type ToStringState = {
return process.exitCode || 1;
}).then((code) => process.exit(code));
+function pipeTo(source: NodeJS.ReadableStream, sink: NodeJS.WritableStream, opts?: { end: boolean }) {
+ return new Promise((resolve, reject) => {
+
+ source.on('end', onEnd).pipe(sink, opts).on('error', onErr);
+
+ function onEnd() { done(undefined, resolve); }
+ function onErr(err:any) { done(err, reject); }
+ function done(e: any, cb: (e?: any) => void) {
+ source.removeListener('end', onEnd);
+ sink.removeListener('error', onErr);
+ cb(e);
+ }
+ });
+}
+
async function *recordBatchReaders(createSourceStream: () => NodeJS.ReadableStream) {
let json = new AsyncByteQueue();
@@ -117,11 +131,10 @@ function batchesToString(state: ToStringState, schema: Schema) {
state.maxColWidths = header.map((x, i) => Math.max(maxColWidths[i] || 0, x.length));
return new stream.Transform({
- transform,
encoding: 'utf8',
writableObjectMode: true,
readableObjectMode: false,
- final(this: stream.Transform, cb: (error?: Error | null) => void) {
+ final(cb: (error?: Error | null) => void) {
// if there were no batches, then print the Schema, and metadata
if (batchId === -1) {
this.push(`${horizontalRule(state.maxColWidths, hr, sep)}\n\n`);
@@ -132,46 +145,45 @@ function batchesToString(state: ToStringState, schema: Schema) {
}
this.push(`${horizontalRule(state.maxColWidths, hr, sep)}\n\n`);
cb();
- }
- });
-
- function transform(this: stream.Transform, batch: RecordBatch, _enc: string, cb: (error?: Error, data?: any) => void) {
-
- batch = !(state.schema && state.schema.length) ? batch : batch.select(...state.schema);
-
- if (state.closed) { return cb(undefined, null); }
-
- // Pass one to convert to strings and count max column widths
- state.maxColWidths = measureColumnWidths(rowId, batch, header.map((x, i) => Math.max(maxColWidths[i] || 0, x.length)));
-
- // If this is the first batch in a stream, print a top horizontal rule, schema metadata, and
- if (++batchId === 0) {
- this.push(`${horizontalRule(state.maxColWidths, hr, sep)}\n`);
- if (state.metadata && batch.schema.metadata.size > 0) {
- this.push(`metadata:\n${formatMetadata(batch.schema.metadata)}\n`);
+ },
+ transform(batch: RecordBatch, _enc: string, cb: (error?: Error, data?: any) => void) {
+
+ batch = !(state.schema && state.schema.length) ? batch : batch.select(...state.schema);
+
+ if (state.closed) { return cb(undefined, null); }
+
+ // Pass one to convert to strings and count max column widths
+ state.maxColWidths = measureColumnWidths(rowId, batch, header.map((x, i) => Math.max(maxColWidths[i] || 0, x.length)));
+
+ // If this is the first batch in a stream, print a top horizontal rule, schema metadata, and
+ if (++batchId === 0) {
this.push(`${horizontalRule(state.maxColWidths, hr, sep)}\n`);
+ if (state.metadata && batch.schema.metadata.size > 0) {
+ this.push(`metadata:\n${formatMetadata(batch.schema.metadata)}\n`);
+ this.push(`${horizontalRule(state.maxColWidths, hr, sep)}\n`);
+ }
+ if (batch.length <= 0 || batch.numCols <= 0) {
+ this.push(`${formatRow(header, maxColWidths = state.maxColWidths, sep)}\n`);
+ }
}
- if (batch.length <= 0 || batch.numCols <= 0) {
- this.push(`${formatRow(header, maxColWidths = state.maxColWidths, sep)}\n`);
- }
- }
-
- if (batch.length > 0 && batch.numCols > 0) {
- // If any of the column widths changed, print the header again
- if (rowId % 350 !== 0 && JSON.stringify(state.maxColWidths) !== JSON.stringify(maxColWidths)) {
- this.push(`${formatRow(header, state.maxColWidths, sep)}\n`);
- }
- maxColWidths = state.maxColWidths;
- for (const row of batch) {
- if (state.closed) { break; } else if (!row) { continue; }
- if (rowId++ % 350 === 0) {
- this.push(`${formatRow(header, maxColWidths, sep)}\n`);
+
+ if (batch.length > 0 && batch.numCols > 0) {
+ // If any of the column widths changed, print the header again
+ if (rowId % 350 !== 0 && JSON.stringify(state.maxColWidths) !== JSON.stringify(maxColWidths)) {
+ this.push(`${formatRow(header, state.maxColWidths, sep)}\n`);
+ }
+ maxColWidths = state.maxColWidths;
+ for (const row of batch) {
+ if (state.closed) { break; } else if (!row) { continue; }
+ if (rowId++ % 350 === 0) {
+ this.push(`${formatRow(header, maxColWidths, sep)}\n`);
+ }
+ this.push(`${formatRow([rowId, ...row].map(valueToString), maxColWidths, sep)}\n`);
}
- this.push(`${formatRow([rowId, ...row].map(valueToString), maxColWidths, sep)}\n`);
}
+ cb();
}
- cb();
- }
+ });
}
function horizontalRule(maxColWidths: number[], hr = '-', sep = ' |') {
diff --git a/js/src/io/interfaces.ts b/js/src/io/interfaces.ts
index 9892562..e057c2d 100644
--- a/js/src/io/interfaces.ts
+++ b/js/src/io/interfaces.ts
@@ -81,12 +81,12 @@ export abstract class ReadableInterop<T> {
return this._getDOMStream().pipeThrough(duplex, options);
}
- private _DOMStream?: ReadableStream<T>;
+ protected _DOMStream?: ReadableStream<T>;
private _getDOMStream() {
return this._DOMStream || (this._DOMStream = this.toDOMStream());
}
- private _nodeStream?: import('stream').Readable;
+ protected _nodeStream?: import('stream').Readable;
private _getNodeStream() {
return this._nodeStream || (this._nodeStream = this.toNodeStream());
}
diff --git a/js/src/ipc/reader.ts b/js/src/ipc/reader.ts
index 6ef9dc4..78c4036 100644
--- a/js/src/ipc/reader.ts
+++ b/js/src/ipc/reader.ts
@@ -95,6 +95,8 @@ export class RecordBatchReader<T extends { [key: string]: DataType } = any> exte
}
public reset(schema?: Schema<T> | null): this {
this._impl.reset(schema);
+ this._DOMStream = undefined;
+ this._nodeStream = undefined;
return this;
}
public open(options?: OpenOptions) {
diff --git a/js/test/unit/ipc/reader/streams-dom-tests.ts b/js/test/unit/ipc/reader/streams-dom-tests.ts
index e471867..27aaee9 100644
--- a/js/test/unit/ipc/reader/streams-dom-tests.ts
+++ b/js/test/unit/ipc/reader/streams-dom-tests.ts
@@ -110,6 +110,43 @@ import { ArrowIOTestHelper, readableDOMStreamToAsyncIterator } from '../helpers'
});
}
+ it('readAll() should pipe to separate WhatWG WritableStreams', async () => {
+
+ expect.hasAssertions();
+
+ const tables = [...generateRandomTables([10, 20, 30])];
+
+ const stream = concatStream(tables.map((table, i) =>
+ RecordBatchStreamWriter.writeAll(table).toDOMStream({
+ // Alternate between bytes mode and regular mode because code coverage
+ type: i % 2 === 0 ? 'bytes' : undefined
+ })
+ )) as ReadableStream<Uint8Array>;
+
+ let tableIndex = -1;
+ let reader: RecordBatchReader | undefined;
+
+ for await (reader of RecordBatchReader.readAll(stream)) {
+
+ validateStreamState(reader, stream, false);
+
+ const output = reader
+ .pipeThrough(RecordBatchStreamWriter.throughDOM())
+ .pipeThrough(new TransformStream());
+
+ validateStreamState(reader, output, false, false);
+
+ const sourceTable = tables[++tableIndex];
+ const streamTable = await Table.from(output);
+ expect(streamTable).toEqualTable(sourceTable);
+ expect(output.locked).toBe(false);
+ }
+
+ expect(reader).toBeDefined();
+ validateStreamState(reader!, stream, true);
+ expect(tableIndex).toBe(tables.length - 1);
+ });
+
it('should not close the underlying WhatWG ReadableStream when reading multiple tables to completion', async () => {
expect.hasAssertions();
@@ -183,7 +220,7 @@ import { ArrowIOTestHelper, readableDOMStreamToAsyncIterator } from '../helpers'
});
})();
-function validateStreamState(reader: RecordBatchReader, stream: ReadableStream, closed: boolean) {
+function validateStreamState(reader: RecordBatchReader, stream: ReadableStream, closed: boolean, locked = !closed) {
expect(reader.closed).toBe(closed);
- expect(stream.locked).toBe(!closed);
+ expect(stream.locked).toBe(locked);
}
diff --git a/js/test/unit/ipc/reader/streams-node-tests.ts b/js/test/unit/ipc/reader/streams-node-tests.ts
index 62c947a..b03b68c 100644
--- a/js/test/unit/ipc/reader/streams-node-tests.ts
+++ b/js/test/unit/ipc/reader/streams-node-tests.ts
@@ -36,12 +36,12 @@ import { validateRecordBatchAsyncIterator } from '../validate';
}
/* tslint:disable */
- const stream = require('stream');
+ const { Readable, PassThrough } = require('stream');
/* tslint:disable */
const { parse: bignumJSONParse } = require('json-bignum');
/* tslint:disable */
const concatStream = ((multistream) => (...xs: any[]) =>
- new stream.Readable().wrap(multistream(...xs))
+ new Readable().wrap(multistream(...xs))
)(require('multistream'));
for (const table of generateRandomTables([10, 20, 30])) {
@@ -113,6 +113,40 @@ import { validateRecordBatchAsyncIterator } from '../validate';
});
}
+ it('readAll() should pipe to separate NodeJS WritableStreams', async () => {
+
+ expect.hasAssertions();
+
+ const tables = [...generateRandomTables([10, 20, 30])];
+
+ const stream = concatStream(tables.map((table) =>
+ () => RecordBatchStreamWriter.writeAll(table).toNodeStream()
+ )) as NodeJS.ReadableStream;
+
+ let tableIndex = -1;
+ let reader: RecordBatchReader | undefined;
+
+ for await (reader of RecordBatchReader.readAll(stream)) {
+
+ validateStreamState(reader, stream, false);
+
+ const output = reader
+ .pipe(RecordBatchStreamWriter.throughNode())
+ .pipe(new PassThrough());
+
+ validateStreamState(reader, output, false);
+
+ const sourceTable = tables[++tableIndex];
+ const streamTable = await Table.from(output);
+ expect(streamTable).toEqualTable(sourceTable);
+ expect(Boolean(output.readableFlowing)).toBe(false);
+ }
+
+ expect(reader).toBeDefined();
+ validateStreamState(reader!, stream, true);
+ expect(tableIndex).toBe(tables.length - 1);
+ });
+
it('should not close the underlying NodeJS ReadableStream when reading multiple tables to completion', async () => {
expect.hasAssertions();
@@ -182,7 +216,7 @@ import { validateRecordBatchAsyncIterator } from '../validate';
function validateStreamState(reader: RecordBatchReader, stream: NodeJS.ReadableStream, closed: boolean, readable = !closed) {
expect(reader.closed).toBe(closed);
- expect(stream.readable).toBe(readable);
- expect((stream as any).destroyed).toBe(closed);
- expect((stream as any).readableFlowing).toBe(false);
+ expect(Boolean(stream.readable)).toBe(readable);
+ expect(Boolean((stream as any).destroyed)).toBe(closed);
+ expect(Boolean((stream as any).readableFlowing)).toBe(false);
}
diff --git a/js/test/unit/ipc/writer/streams-dom-tests.ts b/js/test/unit/ipc/writer/streams-dom-tests.ts
index cc6e961..ba3f0a4 100644
--- a/js/test/unit/ipc/writer/streams-dom-tests.ts
+++ b/js/test/unit/ipc/writer/streams-dom-tests.ts
@@ -32,7 +32,6 @@ import {
} from '../../../Arrow';
import {
- nodeToDOMStream,
ArrowIOTestHelper,
concatBuffersAsync,
readableDOMStreamToAsyncIterator
@@ -51,9 +50,6 @@ import {
}
/* tslint:disable */
- const { PassThrough } = require('stream');
-
- /* tslint:disable */
const { parse: bignumJSONParse } = require('json-bignum');
for (const table of generateRandomTables([10, 20, 30])) {
@@ -233,19 +229,16 @@ import {
describe(`RecordBatchStreamWriter.throughDOM`, () => {
- const psOpts = { objectMode: true };
const opts = { autoDestroy: false };
const sleep = (n: number) => new Promise((r) => setTimeout(r, n));
it(`should write a stream of tables to the same output stream`, async () => {
const tables = [] as Table[];
- const stream = (AsyncIterable.from(generateRandomTables([10, 20, 30]))
+ const stream = AsyncIterable.from(generateRandomTables([10, 20, 30]))
// insert some asynchrony
.tap({ async next(table) { tables.push(table); await sleep(1); } })
- // have to bail out to `any` until Ix supports DOM streams
- .pipe((xs: any) => <any> nodeToDOMStream(xs.pipe(new PassThrough(psOpts)))) as any)
- .pipeThrough(RecordBatchStreamWriter.throughDOM(opts)) as ReadableStream<Uint8Array>;
+ .pipeThrough(RecordBatchStreamWriter.throughDOM(opts));
for await (const reader of RecordBatchReader.readAll(stream)) {
const sourceTable = tables.shift()!;
@@ -260,14 +253,12 @@ import {
it(`should write a stream of record batches to the same output stream`, async () => {
const tables = [] as Table[];
- const stream = (AsyncIterable.from(generateRandomTables([10, 20, 30]))
+ const stream = AsyncIterable.from(generateRandomTables([10, 20, 30]))
// insert some asynchrony
.tap({ async next(table) { tables.push(table); await sleep(1); } })
// flatMap from Table -> RecordBatches[]
.flatMap((table) => AsyncIterable.as(table.chunks))
- // have to bail out to `any` until Ix supports DOM streams
- .pipe((xs: any) => <any> nodeToDOMStream(xs.pipe(new PassThrough(psOpts)))) as any)
- .pipeThrough(RecordBatchStreamWriter.throughDOM(opts)) as ReadableStream<Uint8Array>;
+ .pipeThrough(RecordBatchStreamWriter.throughDOM(opts));
for await (const reader of RecordBatchReader.readAll(stream)) {
const sourceTable = tables.shift()!;