Posted to commits@arrow.apache.org by ks...@apache.org on 2019/03/24 18:54:17 UTC

[arrow] branch master updated: ARROW-4976: [JS] Invalidate RecordBatchReader node/dom streams on reset()

This is an automated email from the ASF dual-hosted git repository.

kszucs pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 9884b9e  ARROW-4976: [JS] Invalidate RecordBatchReader node/dom streams on reset()
9884b9e is described below

commit 9884b9ea20398ba89846460e2c7ab5f2bd2164de
Author: ptaylor <pa...@me.com>
AuthorDate: Sun Mar 24 19:53:56 2019 +0100

    ARROW-4976: [JS] Invalidate RecordBatchReader node/dom streams on reset()
    
    Invalidates the RecordBatchReader's cached internal Node and DOM streams on reset(), so the reader can be piped to a separate writable stream each time it is reset.
    
    Closes https://issues.apache.org/jira/browse/ARROW-4976.
    
    This allows us to fix the EPIPE error in `arrow2csv` when piping to other processes that may close before consuming all of the output:
    ![readtwotables](https://user-images.githubusercontent.com/178183/54714693-ca780780-4b0e-11e9-9083-2e5b04e7a226.gif)
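    
    For illustration, a minimal sketch of the pattern this unblocks (only the `apache-arrow` names are real; the consuming function is hypothetical). Because reset() now drops the cached stream, each reader yielded by readAll() can be piped to its own sink:
    
    ```ts
    import { RecordBatchReader } from 'apache-arrow';
    
    async function pipeEachTable(source: NodeJS.ReadableStream) {
        // readAll() yields one RecordBatchReader per table in the byte stream;
        // between tables the reader is reset(), which now also clears the
        // cached node/DOM streams so toNodeStream() returns a fresh stream.
        for await (const reader of RecordBatchReader.readAll(source)) {
            await new Promise((resolve, reject) => {
                reader.toNodeStream()
                    .on('end', resolve)
                    .pipe(process.stdout, { end: false })
                    .on('error', reject); // e.g. EPIPE if the consumer exits early
            });
        }
    }
    ```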
    
    Author: ptaylor <pa...@me.com>
    
    Closes #3991 from trxcllnt/js/reset-reader-streams and squashes the following commits:
    
    6b8ccd58 <ptaylor> regenerate package-lock
    7ecb00c3 <ptaylor> reset reader's internal node/DOM streams, fix EPIPE regression in arrow2csv
    1fe1c17a <ptaylor> update to Ix with DOM streams, remove workaround for DOM stream writer tests
---
 js/package-lock.json                          | 99 +++++++++++++++++----------
 js/package.json                               |  4 +-
 js/src/bin/arrow2csv.ts                       | 98 ++++++++++++++------------
 js/src/io/interfaces.ts                       |  4 +-
 js/src/ipc/reader.ts                          |  2 +
 js/test/unit/ipc/reader/streams-dom-tests.ts  | 41 ++++++++++-
 js/test/unit/ipc/reader/streams-node-tests.ts | 44 ++++++++++--
 js/test/unit/ipc/writer/streams-dom-tests.ts  | 17 ++---
 8 files changed, 205 insertions(+), 104 deletions(-)

diff --git a/js/package-lock.json b/js/package-lock.json
index 1dc65df..7ce4165 100644
--- a/js/package-lock.json
+++ b/js/package-lock.json
@@ -1,8 +1,7 @@
 {
   "name": "apache-arrow",
-  "version": "0.3.0",
-  "lockfileVersion": 1,
   "requires": true,
+  "lockfileVersion": 1,
   "dependencies": {
     "@babel/code-frame": {
       "version": "7.0.0",
@@ -4931,7 +4930,8 @@
         "ansi-regex": {
           "version": "2.1.1",
           "bundled": true,
-          "dev": true
+          "dev": true,
+          "optional": true
         },
         "aproba": {
           "version": "1.2.0",
@@ -4952,12 +4952,14 @@
         "balanced-match": {
           "version": "1.0.0",
           "bundled": true,
-          "dev": true
+          "dev": true,
+          "optional": true
         },
         "brace-expansion": {
           "version": "1.1.11",
           "bundled": true,
           "dev": true,
+          "optional": true,
           "requires": {
             "balanced-match": "^1.0.0",
             "concat-map": "0.0.1"
@@ -4972,17 +4974,20 @@
         "code-point-at": {
           "version": "1.1.0",
           "bundled": true,
-          "dev": true
+          "dev": true,
+          "optional": true
         },
         "concat-map": {
           "version": "0.0.1",
           "bundled": true,
-          "dev": true
+          "dev": true,
+          "optional": true
         },
         "console-control-strings": {
           "version": "1.1.0",
           "bundled": true,
-          "dev": true
+          "dev": true,
+          "optional": true
         },
         "core-util-is": {
           "version": "1.0.2",
@@ -5099,7 +5104,8 @@
         "inherits": {
           "version": "2.0.3",
           "bundled": true,
-          "dev": true
+          "dev": true,
+          "optional": true
         },
         "ini": {
           "version": "1.3.5",
@@ -5111,6 +5117,7 @@
           "version": "1.0.0",
           "bundled": true,
           "dev": true,
+          "optional": true,
           "requires": {
             "number-is-nan": "^1.0.0"
           }
@@ -5125,6 +5132,7 @@
           "version": "3.0.4",
           "bundled": true,
           "dev": true,
+          "optional": true,
           "requires": {
             "brace-expansion": "^1.1.7"
           }
@@ -5132,12 +5140,14 @@
         "minimist": {
           "version": "0.0.8",
           "bundled": true,
-          "dev": true
+          "dev": true,
+          "optional": true
         },
         "minipass": {
           "version": "2.3.5",
           "bundled": true,
           "dev": true,
+          "optional": true,
           "requires": {
             "safe-buffer": "^5.1.2",
             "yallist": "^3.0.0"
@@ -5156,6 +5166,7 @@
           "version": "0.5.1",
           "bundled": true,
           "dev": true,
+          "optional": true,
           "requires": {
             "minimist": "0.0.8"
           }
@@ -5236,7 +5247,8 @@
         "number-is-nan": {
           "version": "1.0.1",
           "bundled": true,
-          "dev": true
+          "dev": true,
+          "optional": true
         },
         "object-assign": {
           "version": "4.1.1",
@@ -5248,6 +5260,7 @@
           "version": "1.4.0",
           "bundled": true,
           "dev": true,
+          "optional": true,
           "requires": {
             "wrappy": "1"
           }
@@ -5333,7 +5346,8 @@
         "safe-buffer": {
           "version": "5.1.2",
           "bundled": true,
-          "dev": true
+          "dev": true,
+          "optional": true
         },
         "safer-buffer": {
           "version": "2.1.2",
@@ -5369,6 +5383,7 @@
           "version": "1.0.2",
           "bundled": true,
           "dev": true,
+          "optional": true,
           "requires": {
             "code-point-at": "^1.0.0",
             "is-fullwidth-code-point": "^1.0.0",
@@ -5388,6 +5403,7 @@
           "version": "3.0.1",
           "bundled": true,
           "dev": true,
+          "optional": true,
           "requires": {
             "ansi-regex": "^2.0.0"
           }
@@ -5431,12 +5447,14 @@
         "wrappy": {
           "version": "1.0.2",
           "bundled": true,
-          "dev": true
+          "dev": true,
+          "optional": true
         },
         "yallist": {
           "version": "3.0.3",
           "bundled": true,
-          "dev": true
+          "dev": true,
+          "optional": true
         }
       }
     },
@@ -5844,16 +5862,16 @@
       }
     },
     "google-closure-compiler": {
-      "version": "20190121.0.0",
-      "resolved": "https://registry.npmjs.org/google-closure-compiler/-/google-closure-compiler-20190121.0.0.tgz",
-      "integrity": "sha512-FIp3+KxjtDwykDTr1WsFo0QexEopAC4bDXXZfnEdgHECF7hCeFAAsLUPxMmj9Wx+O39eFCXGAzY7w0k5aU9qjg==",
+      "version": "20190301.0.0",
+      "resolved": "https://registry.npmjs.org/google-closure-compiler/-/google-closure-compiler-20190301.0.0.tgz",
+      "integrity": "sha512-FCtg6VsC9BhvbDLh+idMP4F3gka60KLEW0Oqw7M/vhBZnP2/aB4zzxuUDo5LOxuR+RyVqB4VyGOFnM9Z/14iVw==",
       "dev": true,
       "requires": {
         "chalk": "^1.0.0",
-        "google-closure-compiler-java": "^20190121.0.0",
-        "google-closure-compiler-js": "^20190121.0.0",
-        "google-closure-compiler-linux": "^20190121.0.0",
-        "google-closure-compiler-osx": "^20190121.0.0",
+        "google-closure-compiler-java": "^20190301.0.0",
+        "google-closure-compiler-js": "^20190301.0.0",
+        "google-closure-compiler-linux": "^20190301.0.0",
+        "google-closure-compiler-osx": "^20190301.0.0",
         "minimist": "^1.2.0",
         "vinyl": "^2.0.1",
         "vinyl-sourcemaps-apply": "^0.2.0"
@@ -5887,28 +5905,28 @@
       }
     },
     "google-closure-compiler-java": {
-      "version": "20190121.0.0",
-      "resolved": "https://registry.npmjs.org/google-closure-compiler-java/-/google-closure-compiler-java-20190121.0.0.tgz",
-      "integrity": "sha512-UCQ7ZXOlk/g101DS4TqyW+SaoR+4GVq7NKrwebH4gnESY76Xuz7FRrKWwfAXwltmiYAUVZCVI4qpoEz48V+VjA==",
+      "version": "20190301.0.0",
+      "resolved": "https://registry.npmjs.org/google-closure-compiler-java/-/google-closure-compiler-java-20190301.0.0.tgz",
+      "integrity": "sha512-IMv77Mu1chPjSaJC1PWyKSNIvm19nSjx4oXvf67ZBLRkuPKHb3S1ECD3l71pfxNZ2+2tAXnxkEcWcREJ8ph4Tg==",
       "dev": true
     },
     "google-closure-compiler-js": {
-      "version": "20190121.0.0",
-      "resolved": "https://registry.npmjs.org/google-closure-compiler-js/-/google-closure-compiler-js-20190121.0.0.tgz",
-      "integrity": "sha512-PgY0Fy+fXZnjir6aPz/FVJPXuwZf5pKJ9n7Hf1HL4x1lhqVIf3i+u3Ed6ZWCXa+YiEhvwH5RTQr/iPP/D3gDRg==",
+      "version": "20190301.0.0",
+      "resolved": "https://registry.npmjs.org/google-closure-compiler-js/-/google-closure-compiler-js-20190301.0.0.tgz",
+      "integrity": "sha512-J0HVHwpGf3o5MwyifrYhfhNpD7Zznn+fktcKKmwhguKqaNbgCr1AfnaGEarej3Lx1W9CouJEm5OTRTZRJgvRHQ==",
       "dev": true
     },
     "google-closure-compiler-linux": {
-      "version": "20190121.0.0",
-      "resolved": "https://registry.npmjs.org/google-closure-compiler-linux/-/google-closure-compiler-linux-20190121.0.0.tgz",
-      "integrity": "sha512-cw4qr9TuB2gB53l/oYadZLuw+zOi2yggYFtnNA5jvTLTqY8m2VZAL5DGL6gmCtZovbQ0bv9ANqjT8NxEtcSzfw==",
+      "version": "20190301.0.0",
+      "resolved": "https://registry.npmjs.org/google-closure-compiler-linux/-/google-closure-compiler-linux-20190301.0.0.tgz",
+      "integrity": "sha512-r+47izRha1ZOHP8E5wq7YsjatzJVD0yn/7dnZA/jSJmTxoFDfEaV78PYGAgCpL8kslHHApPDFEn9Ozx2eSH2gg==",
       "dev": true,
       "optional": true
     },
     "google-closure-compiler-osx": {
-      "version": "20190121.0.0",
-      "resolved": "https://registry.npmjs.org/google-closure-compiler-osx/-/google-closure-compiler-osx-20190121.0.0.tgz",
-      "integrity": "sha512-6OqyUcgojPCqCuzdyKLwmIkBhfoWF3cVzaX8vaJvQ3SYwlITBT3aepMEZiWFRVvvml+ojs1AJcZvQIqFke8X1w==",
+      "version": "20190301.0.0",
+      "resolved": "https://registry.npmjs.org/google-closure-compiler-osx/-/google-closure-compiler-osx-20190301.0.0.tgz",
+      "integrity": "sha512-W/Mub4k7oKcd1XYIae0NrJysNvpiAjXhq0DCoTJaTZzkc8dGVqcvrQ/YqYNwLkUULqL1dsrYyt3jv1X6l9OqZw==",
       "dev": true,
       "optional": true
     },
@@ -7037,14 +7055,21 @@
       }
     },
     "ix": {
-      "version": "2.5.1",
-      "resolved": "https://registry.npmjs.org/ix/-/ix-2.5.1.tgz",
-      "integrity": "sha512-YPX759NbhmIynoCYsxcpKBCQDFkeVup4xGaAylnIRaM+md7qrLyoW7kow0iqx4cJr8PUG85/cfwfjylqehg8bQ==",
+      "version": "2.5.2",
+      "resolved": "https://registry.npmjs.org/ix/-/ix-2.5.2.tgz",
+      "integrity": "sha512-RY0NlNdA5frPtUDx9Bctw/L8OTMUgb7OqOa+Z2r9XqiWnUAjLM6xZ4hj9M+/NNI1NLAsXju5WUFyZC9l2kqnMQ==",
       "dev": true,
       "requires": {
-        "@types/node": "^10.12.18",
-        "is-stream": "1.1.0",
+        "@types/node": "^11.10.4",
         "tslib": "^1.9.3"
+      },
+      "dependencies": {
+        "@types/node": {
+          "version": "11.11.4",
+          "resolved": "https://registry.npmjs.org/@types/node/-/node-11.11.4.tgz",
+          "integrity": "sha512-02tIL+QIi/RW4E5xILdoAMjeJ9kYq5t5S2vciUdFPXv/ikFTb0zK8q9vXkg4+WAJuYXGiVT1H28AkD2C+IkXVw==",
+          "dev": true
+        }
       }
     },
     "jest": {
diff --git a/js/package.json b/js/package.json
index e6ec2ed..6a29003 100644
--- a/js/package.json
+++ b/js/package.json
@@ -78,13 +78,13 @@
     "del": "3.0.0",
     "esm": "3.1.4",
     "glob": "7.1.3",
-    "google-closure-compiler": "20190121.0.0",
+    "google-closure-compiler": "20190301.0.0",
     "gulp": "4.0.0",
     "gulp-json-transform": "0.4.6",
     "gulp-rename": "1.4.0",
     "gulp-sourcemaps": "2.6.4",
     "gulp-typescript": "5.0.0",
-    "ix": "2.5.1",
+    "ix": "2.5.2",
     "jest": "23.6.0",
     "jest-environment-node-debug": "2.0.0",
     "jest-silent-reporter": "0.1.1",
diff --git a/js/src/bin/arrow2csv.ts b/js/src/bin/arrow2csv.ts
index b86f852..1cceb0f 100644
--- a/js/src/bin/arrow2csv.ts
+++ b/js/src/bin/arrow2csv.ts
@@ -27,7 +27,6 @@ import { Schema } from '../schema';
 
 const padLeft = require('pad-left');
 const bignumJSONParse = require('json-bignum').parse;
-const pipeline = require('util').promisify(stream.pipeline);
 const argv = require(`command-line-args`)(cliOpts(), { partial: true });
 const files = argv.help ? [] : [...(argv.file || []), ...(argv._unknown || [])].filter(Boolean);
 
@@ -56,11 +55,11 @@ type ToStringState = {
         if (state.closed) { break; }
         for await (reader of recordBatchReaders(source)) {
             hasReaders = true;
-            const source = reader.toNodeStream();
-            const xform = batchesToString(state, reader.schema);
-            const sink = new stream.PassThrough();
-            sink.pipe(process.stdout, { end: false });
-            await pipeline(source, xform, sink).catch(() => state.closed = true);
+            const transformToString = batchesToString(state, reader.schema);
+            await pipeTo(
+                reader.pipe(transformToString),
+                process.stdout, { end: false }
+            ).catch(() => state.closed = true); // Handle EPIPE errors
         }
         if (state.closed) { break; }
     }
@@ -74,6 +73,21 @@ type ToStringState = {
     return process.exitCode || 1;
 }).then((code) => process.exit(code));
 
+function pipeTo(source: NodeJS.ReadableStream, sink: NodeJS.WritableStream, opts?: { end: boolean }) {
+    return new Promise((resolve, reject) => {
+
+        source.on('end', onEnd).pipe(sink, opts).on('error', onErr);
+
+        function onEnd() { done(undefined, resolve); }
+        function onErr(err:any) { done(err, reject); }
+        function done(e: any, cb: (e?: any) => void) {
+            source.removeListener('end', onEnd);
+            sink.removeListener('error', onErr);
+            cb(e);
+        }
+    });
+}
+
 async function *recordBatchReaders(createSourceStream: () => NodeJS.ReadableStream) {
 
     let json = new AsyncByteQueue();
@@ -117,11 +131,10 @@ function batchesToString(state: ToStringState, schema: Schema) {
     state.maxColWidths = header.map((x, i) => Math.max(maxColWidths[i] || 0, x.length));
 
     return new stream.Transform({
-        transform,
         encoding: 'utf8',
         writableObjectMode: true,
         readableObjectMode: false,
-        final(this: stream.Transform, cb: (error?: Error | null) => void) {
+        final(cb: (error?: Error | null) => void) {
             // if there were no batches, then print the Schema, and metadata
             if (batchId === -1) {
                 this.push(`${horizontalRule(state.maxColWidths, hr, sep)}\n\n`);
@@ -132,46 +145,45 @@ function batchesToString(state: ToStringState, schema: Schema) {
             }
             this.push(`${horizontalRule(state.maxColWidths, hr, sep)}\n\n`);
             cb();
-        }
-    });
-
-    function transform(this: stream.Transform, batch: RecordBatch, _enc: string, cb: (error?: Error, data?: any) => void) {
-
-        batch = !(state.schema && state.schema.length) ? batch : batch.select(...state.schema);
-
-        if (state.closed) { return cb(undefined, null); }
-
-        // Pass one to convert to strings and count max column widths
-        state.maxColWidths = measureColumnWidths(rowId, batch, header.map((x, i) => Math.max(maxColWidths[i] || 0, x.length)));
-
-        // If this is the first batch in a stream, print a top horizontal rule, schema metadata, and 
-        if (++batchId === 0) {
-            this.push(`${horizontalRule(state.maxColWidths, hr, sep)}\n`);
-            if (state.metadata && batch.schema.metadata.size > 0) {
-                this.push(`metadata:\n${formatMetadata(batch.schema.metadata)}\n`);
+        },
+        transform(batch: RecordBatch, _enc: string, cb: (error?: Error, data?: any) => void) {
+    
+            batch = !(state.schema && state.schema.length) ? batch : batch.select(...state.schema);
+    
+            if (state.closed) { return cb(undefined, null); }
+    
+            // Pass one to convert to strings and count max column widths
+            state.maxColWidths = measureColumnWidths(rowId, batch, header.map((x, i) => Math.max(maxColWidths[i] || 0, x.length)));
+    
+            // If this is the first batch in a stream, print a top horizontal rule, schema metadata, and 
+            if (++batchId === 0) {
                 this.push(`${horizontalRule(state.maxColWidths, hr, sep)}\n`);
+                if (state.metadata && batch.schema.metadata.size > 0) {
+                    this.push(`metadata:\n${formatMetadata(batch.schema.metadata)}\n`);
+                    this.push(`${horizontalRule(state.maxColWidths, hr, sep)}\n`);
+                }
+                if (batch.length <= 0 || batch.numCols <= 0) {
+                    this.push(`${formatRow(header, maxColWidths = state.maxColWidths, sep)}\n`);
+                }
             }
-            if (batch.length <= 0 || batch.numCols <= 0) {
-                this.push(`${formatRow(header, maxColWidths = state.maxColWidths, sep)}\n`);
-            }
-        }
-
-        if (batch.length > 0 && batch.numCols > 0) {
-            // If any of the column widths changed, print the header again
-            if (rowId % 350 !== 0 && JSON.stringify(state.maxColWidths) !== JSON.stringify(maxColWidths)) {
-                this.push(`${formatRow(header, state.maxColWidths, sep)}\n`);
-            }
-            maxColWidths = state.maxColWidths;
-            for (const row of batch) {
-                if (state.closed) { break; } else if (!row) { continue; }
-                if (rowId++ % 350 === 0) {
-                    this.push(`${formatRow(header, maxColWidths, sep)}\n`);
+    
+            if (batch.length > 0 && batch.numCols > 0) {
+                // If any of the column widths changed, print the header again
+                if (rowId % 350 !== 0 && JSON.stringify(state.maxColWidths) !== JSON.stringify(maxColWidths)) {
+                    this.push(`${formatRow(header, state.maxColWidths, sep)}\n`);
+                }
+                maxColWidths = state.maxColWidths;
+                for (const row of batch) {
+                    if (state.closed) { break; } else if (!row) { continue; }
+                    if (rowId++ % 350 === 0) {
+                        this.push(`${formatRow(header, maxColWidths, sep)}\n`);
+                    }
+                    this.push(`${formatRow([rowId, ...row].map(valueToString), maxColWidths, sep)}\n`);
                 }
-                this.push(`${formatRow([rowId, ...row].map(valueToString), maxColWidths, sep)}\n`);
             }
+            cb();
         }
-        cb();
-    }
+    });
 }
 
 function horizontalRule(maxColWidths: number[], hr = '-', sep = ' |') {
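
A hedged reading of why the `pipeTo` helper above replaces `util.promisify(stream.pipeline)`: on error, `stream.pipeline` destroys every stream in the chain, including the shared source, so a broken pipe on stdout would also tear down the stream feeding subsequent readers. A minimal sketch of that failure mode (the `source` and `xform` arguments are hypothetical stand-ins):

```ts
import * as stream from 'stream';
import { promisify } from 'util';

const pipelineAsync = promisify(stream.pipeline);

async function demo(source: stream.Readable, xform: stream.Transform) {
    const sink = new stream.PassThrough();
    sink.destroy(new Error('EPIPE')); // simulate the consumer going away
    await pipelineAsync(source, xform, sink).catch(() => { /* swallowed */ });
    // pipeline() has now destroyed `source` as well, so a second
    // RecordBatchReader over the same bytes would find nothing to read;
    // pipeTo() above avoids this by only detaching its own listeners.
    console.log(source.destroyed); // true
}
```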
diff --git a/js/src/io/interfaces.ts b/js/src/io/interfaces.ts
index 9892562..e057c2d 100644
--- a/js/src/io/interfaces.ts
+++ b/js/src/io/interfaces.ts
@@ -81,12 +81,12 @@ export abstract class ReadableInterop<T> {
         return this._getDOMStream().pipeThrough(duplex, options);
     }
 
-    private _DOMStream?: ReadableStream<T>;
+    protected _DOMStream?: ReadableStream<T>;
     private _getDOMStream() {
         return this._DOMStream || (this._DOMStream = this.toDOMStream());
     }
 
-    private _nodeStream?: import('stream').Readable;
+    protected _nodeStream?: import('stream').Readable;
     private _getNodeStream() {
         return this._nodeStream || (this._nodeStream = this.toNodeStream());
     }
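
These fields are widened from `private` to `protected` so that `RecordBatchReader.reset()` (next file) can clear them. A condensed sketch of the caching pattern involved, assuming a simplified `ReadableInterop`:

```ts
import { Readable } from 'stream';

abstract class ReadableInteropSketch {
    protected _nodeStream?: Readable;
    public abstract toNodeStream(): Readable;

    // pipe() lazily creates and then reuses a single Node stream...
    public pipe<W extends NodeJS.WritableStream>(writable: W) {
        return (this._nodeStream || (this._nodeStream = this.toNodeStream()))
            .pipe(writable);
    }

    // ...so reset() must drop the cache, otherwise a pipe() after reset()
    // would reuse a stream that has already ended or been unpiped.
    public reset(): this {
        this._nodeStream = undefined;
        return this;
    }
}
```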
diff --git a/js/src/ipc/reader.ts b/js/src/ipc/reader.ts
index 6ef9dc4..78c4036 100644
--- a/js/src/ipc/reader.ts
+++ b/js/src/ipc/reader.ts
@@ -95,6 +95,8 @@ export class RecordBatchReader<T extends { [key: string]: DataType } = any> exte
     }
     public reset(schema?: Schema<T> | null): this {
         this._impl.reset(schema);
+        this._DOMStream = undefined;
+        this._nodeStream = undefined;
         return this;
     }
     public open(options?: OpenOptions) {
diff --git a/js/test/unit/ipc/reader/streams-dom-tests.ts b/js/test/unit/ipc/reader/streams-dom-tests.ts
index e471867..27aaee9 100644
--- a/js/test/unit/ipc/reader/streams-dom-tests.ts
+++ b/js/test/unit/ipc/reader/streams-dom-tests.ts
@@ -110,6 +110,43 @@ import { ArrowIOTestHelper, readableDOMStreamToAsyncIterator } from '../helpers'
         });
     }
 
+    it('readAll() should pipe to separate WhatWG WritableStreams', async () => {
+
+        expect.hasAssertions();
+
+        const tables = [...generateRandomTables([10, 20, 30])];
+
+        const stream = concatStream(tables.map((table, i) =>
+            RecordBatchStreamWriter.writeAll(table).toDOMStream({
+                // Alternate between bytes mode and regular mode because code coverage
+                type: i % 2 === 0 ? 'bytes' : undefined
+            })
+        )) as ReadableStream<Uint8Array>;
+
+        let tableIndex = -1;
+        let reader: RecordBatchReader | undefined;
+
+        for await (reader of RecordBatchReader.readAll(stream)) {
+
+            validateStreamState(reader, stream, false);
+
+            const output = reader
+                .pipeThrough(RecordBatchStreamWriter.throughDOM())
+                .pipeThrough(new TransformStream());
+
+            validateStreamState(reader, output, false, false);
+
+            const sourceTable = tables[++tableIndex];
+            const streamTable = await Table.from(output);
+            expect(streamTable).toEqualTable(sourceTable);
+            expect(output.locked).toBe(false);
+        }
+
+        expect(reader).toBeDefined();
+        validateStreamState(reader!, stream, true);
+        expect(tableIndex).toBe(tables.length - 1);
+    });
+
     it('should not close the underlying WhatWG ReadableStream when reading multiple tables to completion', async () => {
 
         expect.hasAssertions();
@@ -183,7 +220,7 @@ import { ArrowIOTestHelper, readableDOMStreamToAsyncIterator } from '../helpers'
     });
 })();
 
-function validateStreamState(reader: RecordBatchReader, stream: ReadableStream, closed: boolean) {
+function validateStreamState(reader: RecordBatchReader, stream: ReadableStream, closed: boolean, locked = !closed) {
     expect(reader.closed).toBe(closed);
-    expect(stream.locked).toBe(!closed);
+    expect(stream.locked).toBe(locked);
 }
diff --git a/js/test/unit/ipc/reader/streams-node-tests.ts b/js/test/unit/ipc/reader/streams-node-tests.ts
index 62c947a..b03b68c 100644
--- a/js/test/unit/ipc/reader/streams-node-tests.ts
+++ b/js/test/unit/ipc/reader/streams-node-tests.ts
@@ -36,12 +36,12 @@ import { validateRecordBatchAsyncIterator } from '../validate';
     }
 
     /* tslint:disable */
-    const stream = require('stream');
+    const { Readable, PassThrough } = require('stream');
     /* tslint:disable */
     const { parse: bignumJSONParse } = require('json-bignum');
     /* tslint:disable */
     const concatStream = ((multistream) => (...xs: any[]) =>
-        new stream.Readable().wrap(multistream(...xs))
+        new Readable().wrap(multistream(...xs))
     )(require('multistream'));
 
     for (const table of generateRandomTables([10, 20, 30])) {
@@ -113,6 +113,40 @@ import { validateRecordBatchAsyncIterator } from '../validate';
         });
     }
 
+    it('readAll() should pipe to separate NodeJS WritableStreams', async () => {
+
+        expect.hasAssertions();
+
+        const tables = [...generateRandomTables([10, 20, 30])];
+
+        const stream = concatStream(tables.map((table) =>
+            () => RecordBatchStreamWriter.writeAll(table).toNodeStream()
+        )) as NodeJS.ReadableStream;
+
+        let tableIndex = -1;
+        let reader: RecordBatchReader | undefined;
+
+        for await (reader of RecordBatchReader.readAll(stream)) {
+
+            validateStreamState(reader, stream, false);
+
+            const output = reader
+                .pipe(RecordBatchStreamWriter.throughNode())
+                .pipe(new PassThrough());
+
+            validateStreamState(reader, output, false);
+
+            const sourceTable = tables[++tableIndex];
+            const streamTable = await Table.from(output);
+            expect(streamTable).toEqualTable(sourceTable);
+            expect(Boolean(output.readableFlowing)).toBe(false);
+        }
+
+        expect(reader).toBeDefined();
+        validateStreamState(reader!, stream, true);
+        expect(tableIndex).toBe(tables.length - 1);
+    });
+
     it('should not close the underlying NodeJS ReadableStream when reading multiple tables to completion', async () => {
 
         expect.hasAssertions();
@@ -182,7 +216,7 @@ import { validateRecordBatchAsyncIterator } from '../validate';
 
 function validateStreamState(reader: RecordBatchReader, stream: NodeJS.ReadableStream, closed: boolean, readable = !closed) {
     expect(reader.closed).toBe(closed);
-    expect(stream.readable).toBe(readable);
-    expect((stream as any).destroyed).toBe(closed);
-    expect((stream as any).readableFlowing).toBe(false);
+    expect(Boolean(stream.readable)).toBe(readable);
+    expect(Boolean((stream as any).destroyed)).toBe(closed);
+    expect(Boolean((stream as any).readableFlowing)).toBe(false);
 }
diff --git a/js/test/unit/ipc/writer/streams-dom-tests.ts b/js/test/unit/ipc/writer/streams-dom-tests.ts
index cc6e961..ba3f0a4 100644
--- a/js/test/unit/ipc/writer/streams-dom-tests.ts
+++ b/js/test/unit/ipc/writer/streams-dom-tests.ts
@@ -32,7 +32,6 @@ import {
 } from '../../../Arrow';
 
 import {
-    nodeToDOMStream,
     ArrowIOTestHelper,
     concatBuffersAsync,
     readableDOMStreamToAsyncIterator
@@ -51,9 +50,6 @@ import {
     }
 
     /* tslint:disable */
-    const { PassThrough } = require('stream');
-
-    /* tslint:disable */
     const { parse: bignumJSONParse } = require('json-bignum');
 
     for (const table of generateRandomTables([10, 20, 30])) {
@@ -233,19 +229,16 @@ import {
 
     describe(`RecordBatchStreamWriter.throughDOM`, () => {
 
-        const psOpts = { objectMode: true };
         const opts = { autoDestroy: false };
         const sleep = (n: number) => new Promise((r) => setTimeout(r, n));
 
         it(`should write a stream of tables to the same output stream`, async () => {
 
             const tables = [] as Table[];
-            const stream = (AsyncIterable.from(generateRandomTables([10, 20, 30]))
+            const stream = AsyncIterable.from(generateRandomTables([10, 20, 30]))
                 // insert some asynchrony
                 .tap({ async next(table) { tables.push(table); await sleep(1); } })
-                // have to bail out to `any` until Ix supports DOM streams
-                .pipe((xs: any) => <any> nodeToDOMStream(xs.pipe(new PassThrough(psOpts)))) as any)
-                .pipeThrough(RecordBatchStreamWriter.throughDOM(opts)) as ReadableStream<Uint8Array>;
+                .pipeThrough(RecordBatchStreamWriter.throughDOM(opts));
 
             for await (const reader of RecordBatchReader.readAll(stream)) {
                 const sourceTable = tables.shift()!;
@@ -260,14 +253,12 @@ import {
         it(`should write a stream of record batches to the same output stream`, async () => {
 
             const tables = [] as Table[];
-            const stream = (AsyncIterable.from(generateRandomTables([10, 20, 30]))
+            const stream = AsyncIterable.from(generateRandomTables([10, 20, 30]))
                 // insert some asynchrony
                 .tap({ async next(table) { tables.push(table); await sleep(1); } })
                 // flatMap from Table -> RecordBatches[]
                 .flatMap((table) => AsyncIterable.as(table.chunks))
-                // have to bail out to `any` until Ix supports DOM streams
-                .pipe((xs: any) => <any> nodeToDOMStream(xs.pipe(new PassThrough(psOpts)))) as any)
-                .pipeThrough(RecordBatchStreamWriter.throughDOM(opts)) as ReadableStream<Uint8Array>;
+                .pipeThrough(RecordBatchStreamWriter.throughDOM(opts));
     
             for await (const reader of RecordBatchReader.readAll(stream)) {
                 const sourceTable = tables.shift()!;