Posted to commits@arrow.apache.org by we...@apache.org on 2018/09/27 09:45:54 UTC

[arrow] branch master updated: ARROW-3256, 3304: [JS] fix file footer inconsistency, yield all messages from the stream reader

This is an automated email from the ASF dual-hosted git repository.

wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new c1cf985  ARROW-3256,3304: [JS] fix file footer inconsistency, yield all messages from the stream reader
c1cf985 is described below

commit c1cf98594fbd1287022ad6636f2624cd32813bd6
Author: ptaylor <pa...@me.com>
AuthorDate: Thu Sep 27 05:45:43 2018 -0400

    ARROW-3256,3304: [JS] fix file footer inconsistency, yield all messages from the stream reader
    
    Yield all messages from the stream reader so that multiple tables can be read from the same source stream. A stream-reader refactor should support this out of the box; until that refactor is done, this PR adds the ability to master. I added [an integration test](https://github.com/trxcllnt/arrow/blob/6c58f4aea84c7eebdd807f67d066d93d9348b110/js/test/integration/validate-tests.ts#L195) that shows what such a reader looks like today (a minimal end-to-end sketch also follows the commit details below).
    
    Author: ptaylor <pa...@me.com>
    
    Closes #2616 from trxcllnt/js-stream-reader-fixes-rebased and squashes the following commits:
    
    2095e4ebf <ptaylor> write correct bodyLength into FileBlock
    126a2171c <ptaylor> add exports for closure-compiler
    6c58f4aea <ptaylor> emit all messages from the stream reader to support reading multiple tables from the same source stream
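
For context, here is a minimal end-to-end sketch of the async readers this commit exports. It is an illustration, not the integration test: the file path is a placeholder, and the 'apache-arrow' import specifier is assumed (inside this repo the tests import from '../Arrow').

    import * as fs from 'fs';
    import { Table, fromReadableStream, readBuffersAsync, readRecordBatchesAsync } from 'apache-arrow';

    async function readTable(path: string): Promise<Table> {
        // fromReadableStream re-frames raw stream chunks into whole IPC messages
        const messages = readBuffersAsync(fromReadableStream(fs.createReadStream(path)));
        const batches = [];
        // readRecordBatchesAsync turns each message/loader pair into a RecordBatch
        for await (const batch of readRecordBatchesAsync(messages)) {
            batches.push(batch);
        }
        return new Table(batches);
    }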
---
 js/src/Arrow.ts                       |  4 ++
 js/src/ipc/metadata.ts                |  3 +-
 js/src/ipc/reader/binary.ts           | 12 ++---
 js/src/ipc/reader/node.ts             |  6 ++-
 js/src/ipc/writer/binary.ts           | 12 ++---
 js/src/util/node.ts                   |  2 +-
 js/test/integration/validate-tests.ts | 82 +++++++++++++++++++++++++++++++++++
 7 files changed, 105 insertions(+), 16 deletions(-)

diff --git a/js/src/Arrow.ts b/js/src/Arrow.ts
index 52b9b25..61556c4 100644
--- a/js/src/Arrow.ts
+++ b/js/src/Arrow.ts
@@ -30,6 +30,7 @@ import { Schema, Field, Type } from './type';
 import { Table, DataFrame, NextFunc, BindFunc, CountByResult } from './table';
 import { fromReadableStream } from './ipc/reader/node';
 import { read, readAsync, readStream } from './ipc/reader/arrow';
+import { readBuffersAsync, readRecordBatchesAsync } from './ipc/reader/arrow';
 import { serializeFile, serializeStream } from './ipc/writer/binary';
 
 export import View = vector_.View;
@@ -41,6 +42,7 @@ export import TypedArrayConstructor = type_.TypedArrayConstructor;
 
 export { fromReadableStream };
 export { read, readAsync, readStream };
+export { readBuffersAsync, readRecordBatchesAsync };
 export { serializeFile, serializeStream };
 export { Table, DataFrame, NextFunc, BindFunc, CountByResult };
 export { Field, Schema, RecordBatch, Vector, Type };
@@ -213,6 +215,8 @@ try {
         Arrow['readAsync'] = readAsync;
         Arrow['readStream'] = readStream;
         Arrow['fromReadableStream'] = fromReadableStream;
+        Arrow['readBuffersAsync'] = readBuffersAsync;
+        Arrow['readRecordBatchesAsync'] = readRecordBatchesAsync;
 
         Arrow['serializeFile'] = serializeFile;
         Arrow['serializeStream'] = serializeStream;
diff --git a/js/src/ipc/metadata.ts b/js/src/ipc/metadata.ts
index 25b94b1..025b051 100644
--- a/js/src/ipc/metadata.ts
+++ b/js/src/ipc/metadata.ts
@@ -17,7 +17,6 @@
 
 /* tslint:disable:class-name */
 
-import { align } from '../util/bit';
 import { Schema, Long, MessageHeader, MetadataVersion } from '../type';
 
 export class Footer {
@@ -53,7 +52,7 @@ export class RecordBatchMetadata extends Message {
     public buffers: BufferMetadata[];
     constructor(version: MetadataVersion, length: Long | number, nodes: FieldMetadata[], buffers: BufferMetadata[], bodyLength?: Long | number) {
         if (bodyLength === void(0)) {
-            bodyLength = buffers.reduce((s, b) => align(s + b.length + (b.offset - s), 8), 0);
+            bodyLength = buffers.reduce((bodyLength, buffer) => bodyLength + buffer.length, 0);
         }
         super(version, bodyLength, MessageHeader.RecordBatch);
         this.nodes = nodes;
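
The new default above changes how bodyLength is derived when the caller does not supply one. A hedged worked example with made-up BufferMetadata values ({ offset, length }), assuming align(x, 8) rounds x up to the next multiple of 8:

    const buffers = [{ offset: 0, length: 5 }, { offset: 8, length: 12 }];
    // before: buffers.reduce((s, b) => align(s + b.length + (b.offset - s), 8), 0)
    //   step 1: align(0 + 5 + (0 - 0), 8)  -> 8
    //   step 2: align(8 + 12 + (8 - 8), 8) -> 24
    // after: buffers.reduce((bodyLength, buffer) => bodyLength + buffer.length, 0)
    //   5 + 12 -> 17, i.e. the sum of the declared buffer lengths with no padding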
diff --git a/js/src/ipc/reader/binary.ts b/js/src/ipc/reader/binary.ts
index 65dc91a..988ce60 100644
--- a/js/src/ipc/reader/binary.ts
+++ b/js/src/ipc/reader/binary.ts
@@ -39,7 +39,7 @@ import {
 
 import ByteBuffer = flatbuffers.ByteBuffer;
 
-type MessageReader = (bb: ByteBuffer) => IterableIterator<RecordBatchMetadata | DictionaryBatch>;
+type MessageReader = (bb: ByteBuffer) => IterableIterator<Message>;
 
 export function* readBuffers<T extends Uint8Array | Buffer | string>(sources: Iterable<T> | Uint8Array | Buffer | string) {
     let schema: Schema | null = null;
@@ -56,8 +56,8 @@ export function* readBuffers<T extends Uint8Array | Buffer | string>(sources: It
                     schema, message,
                     loader: new BinaryDataLoader(
                         bb,
-                        arrayIterator(message.nodes),
-                        arrayIterator(message.buffers),
+                        arrayIterator((message as any).nodes || []),
+                        arrayIterator((message as any).buffers || []),
                         dictionaries
                     )
                 };
@@ -78,8 +78,8 @@ export async function* readBuffersAsync<T extends Uint8Array | Buffer | string>(
                     schema, message,
                     loader: new BinaryDataLoader(
                         bb,
-                        arrayIterator(message.nodes),
-                        arrayIterator(message.buffers),
+                        arrayIterator((message as any).nodes || []),
+                        arrayIterator((message as any).buffers || []),
                         dictionaries
                     )
                 };
@@ -148,7 +148,7 @@ function* readStreamMessages(bb: ByteBuffer) {
         } else if (Message.isDictionaryBatch(message)) {
             yield message;
         } else {
-            continue;
+            yield message;
         }
         // position the buffer after the body to read the next message
         bb.setPosition(bb.position() + message.bodyLength);
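
With the final else branch yielding instead of skipping, the internal readStreamMessages generator now passes every message through, including the Schema message that starts each new table in a shared stream. A hedged sketch of the consumer-side branching this enables (Message.isDictionaryBatch appears in the hunk above; isSchema and isRecordBatch are assumed to be its sibling guards):

    for (const message of readStreamMessages(bb)) {
        if (Message.isSchema(message)) {
            // a second Schema message marks the start of the next table
        } else if (Message.isRecordBatch(message) || Message.isDictionaryBatch(message)) {
            // hand the message's nodes and buffers to the data loader
        }
    }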
diff --git a/js/src/ipc/reader/node.ts b/js/src/ipc/reader/node.ts
index 8a455e9..24295c8 100644
--- a/js/src/ipc/reader/node.ts
+++ b/js/src/ipc/reader/node.ts
@@ -29,6 +29,10 @@ export async function* fromReadableStream(stream: NodeJS.ReadableStream) {
 
     for await (let chunk of (stream as any as AsyncIterable<Uint8Array | Buffer | string>)) {
 
+        if (chunk == null) {
+            continue;
+        }
+
         const grown = new Uint8Array(bytes.byteLength + chunk.length);
 
         if (typeof chunk !== 'string') {
@@ -54,7 +58,7 @@ export async function* fromReadableStream(stream: NodeJS.ReadableStream) {
             messageLength = new DataView(bytes.buffer).getInt32(0, true);
         }
 
-        while (messageLength < bytes.byteLength) {
+        while (messageLength > 0 && messageLength <= bytes.byteLength) {
             if (!message) {
                 (bb = new ByteBuffer(bytes)).setPosition(4);
                 if (message = _Message.getRootAsMessage(bb)) {
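
The tightened loop condition reflects the IPC stream framing: each message is a 4-byte little-endian metadata length, then the flatbuffer metadata, then the body, and a length of 0 marks end-of-stream. A hedged sketch of reading that prefix (variable names are illustrative):

    const metadataLength = new DataView(bytes.buffer).getInt32(0, true);
    if (metadataLength <= 0) {
        // 0 is the end-of-stream sentinel; stop instead of spinning
    } else if (metadataLength <= bytes.byteLength) {
        // enough bytes are buffered to parse the metadata flatbuffer
    } // otherwise keep accumulating chunks until the message is complete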
diff --git a/js/src/ipc/writer/binary.ts b/js/src/ipc/writer/binary.ts
index 2bab428..4be3dc7 100644
--- a/js/src/ipc/writer/binary.ts
+++ b/js/src/ipc/writer/binary.ts
@@ -54,7 +54,7 @@ export function* serializeFile(table: Table) {
 
     // First yield the magic string (aligned)
     let buffer = new Uint8Array(align(magicLength, 8));
-    let metadataLength, byteLength = buffer.byteLength;
+    let metadataLength, bodyLength, byteLength = buffer.byteLength;
     buffer.set(MAGIC, 0);
     yield buffer;
 
@@ -66,15 +66,15 @@ export function* serializeFile(table: Table) {
     for (const [id, field] of table.schema.dictionaries) {
         const vec = table.getColumn(field.name) as DictionaryVector;
         if (vec && vec.dictionary) {
-            ({ metadataLength, buffer } = serializeDictionaryBatch(vec.dictionary, id));
-            dictionaryBatches.push(new FileBlock(metadataLength, buffer.byteLength, byteLength));
+            ({ metadataLength, bodyLength, buffer } = serializeDictionaryBatch(vec.dictionary, id));
+            dictionaryBatches.push(new FileBlock(metadataLength, bodyLength, byteLength));
             byteLength += buffer.byteLength;
             yield buffer;
         }
     }
     for (const recordBatch of table.batches) {
-        ({ metadataLength, buffer } = serializeRecordBatch(recordBatch));
-        recordBatches.push(new FileBlock(metadataLength, buffer.byteLength, byteLength));
+        ({ metadataLength, bodyLength, buffer } = serializeRecordBatch(recordBatch));
+        recordBatches.push(new FileBlock(metadataLength, bodyLength, byteLength));
         byteLength += buffer.byteLength;
         yield buffer;
     }
@@ -127,7 +127,7 @@ export function serializeMessage(message: Message, data?: Uint8Array) {
     (data && dataByteLength > 0) && messageBytes.set(data, metadataLength);
     // if (messageBytes.byteLength % 8 !== 0) { debugger; }
     // Return the metadata length because we need to write it into each FileBlock also
-    return { metadataLength, buffer: messageBytes };
+    return { metadataLength, bodyLength: message.bodyLength, buffer: messageBytes };
 }
 
 export function serializeFooter(footer: Footer) {
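
This is the file-footer inconsistency from the title: each FileBlock previously recorded the whole serialized message (metadata + body) in its bodyLength slot. A hedged before/after sketch (names follow the diff, numbers omitted):

    const { metadataLength, bodyLength, buffer } = serializeRecordBatch(recordBatch);
    // before: new FileBlock(metadataLength, buffer.byteLength, byteLength)
    //         stored metadata + body bytes where the footer expects body bytes only
    // after:  new FileBlock(metadataLength, bodyLength, byteLength)
    //         stores the message's declared bodyLength, matching the footer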
diff --git a/js/src/util/node.ts b/js/src/util/node.ts
index 8e58f6b..e5c5066 100644
--- a/js/src/util/node.ts
+++ b/js/src/util/node.ts
@@ -88,6 +88,6 @@ function wait(stream: NodeJS.WritableStream, done: boolean, write: (x?: any) =>
         stream['once']('error', write);
         stream['once']('drain', write);
     } else if (!(!p || stream === p.stdout) && !(stream as any)['isTTY']) {
-        stream['end'](<any> null);
+        stream['end']();
     }
 }
diff --git a/js/test/integration/validate-tests.ts b/js/test/integration/validate-tests.ts
index c301d65..5d0d3ff 100644
--- a/js/test/integration/validate-tests.ts
+++ b/js/test/integration/validate-tests.ts
@@ -22,10 +22,15 @@ import Arrow from '../Arrow';
 import { zip } from 'ix/iterable/zip';
 import { toArray } from 'ix/iterable/toarray';
 
+import { AsyncIterableX } from 'ix/asynciterable/asynciterablex';
+import { zip as zipAsync } from 'ix/asynciterable/zip';
+import { toArray as toArrayAsync } from 'ix/asynciterable/toarray';
+
 /* tslint:disable */
 const { parse: bignumJSONParse } = require('json-bignum');
 
 const { Table, read } = Arrow;
+const { fromReadableStream, readBuffersAsync, readRecordBatchesAsync } = Arrow;
 
 if (!process.env.JSON_PATHS || !process.env.ARROW_PATHS) {
     throw new Error('Integration tests need paths to both json and arrow files');
@@ -125,6 +130,7 @@ describe(`Integration`, () => {
             testTableToBuffersIntegration('binary', 'stream')(json, arrowBuffer);
         });
     }
+    testReadingMultipleTablesFromTheSameStream();
 });
 
 function testReaderIntegration(jsonData: any, arrowBuffer: Uint8Array) {
@@ -183,3 +189,79 @@ function testTableToBuffersIntegration(srcFormat: 'json' | 'binary', arrowFormat
         });
     }
 }
+
+function testReadingMultipleTablesFromTheSameStream() {
+
+    test('Can read multiple tables from the same stream with a special stream reader', async () => {
+
+        async function* allTablesReadableStream() {
+            for (const [, arrowPath] of jsonAndArrowPaths) {
+                for await (const buffer of fs.createReadStream(arrowPath)) {
+                    yield buffer as Uint8Array;
+                }
+            }
+        }
+
+        const pathsAsync = AsyncIterableX.from(jsonAndArrowPaths);
+        const batchesAsync = readBatches(allTablesReadableStream());
+        const pathsAndBatches = zipAsync(pathsAsync, batchesAsync);
+
+        for await (const [[jsonFilePath, arrowFilePath], batches] of pathsAndBatches) {
+
+            const streamTable = new Table(await toArrayAsync(batches));
+            const binaryTable = Table.from(getOrReadFileBuffer(arrowFilePath) as Uint8Array);
+            const jsonTable = Table.from(bignumJSONParse(getOrReadFileBuffer(jsonFilePath, 'utf8')));
+
+            expect(streamTable.length).toEqual(jsonTable.length);
+            expect(streamTable.length).toEqual(binaryTable.length);
+            expect(streamTable.numCols).toEqual(jsonTable.numCols);
+            expect(streamTable.numCols).toEqual(binaryTable.numCols);
+            for (let i = -1, n = streamTable.numCols; ++i < n;) {
+                const v1 = streamTable.getColumnAt(i);
+                const v2 = jsonTable.getColumnAt(i);
+                const v3 = binaryTable.getColumnAt(i);
+                const name = streamTable.schema.fields[i].name;
+                (expect([v1, `stream`, name]) as any).toEqualVector([v2, `json`]);
+                (expect([v1, `stream`, name]) as any).toEqualVector([v3, `binary`]);
+            }
+        }
+    });
+
+    async function* readBatches(stream: AsyncIterable<Uint8Array>) {
+
+        let message: any, done = false, broke = false;
+        let source = buffers(fromReadableStream(stream as any));
+    
+        do {
+            yield readRecordBatchesAsync(messages({
+                next(x: any) { return source.next(x); },
+                throw(x: any) { return source.throw!(x); },
+                [Symbol.asyncIterator]() { return this; },
+            }));
+        } while (!done || (message = null));
+    
+        source.return && (await source.return());
+    
+        async function* messages(source: AsyncIterableIterator<Uint8Array>) {
+            for await (message of readBuffersAsync(source)) {
+                if (broke = message.message.headerType === 1) {
+                    break;
+                }
+                yield message;
+                message = null;
+            }
+            done = done || !broke;
+            broke = false;
+        }
+    
+        async function* buffers(source: AsyncIterableIterator<Uint8Array>) {
+            while (!done) {
+                message && (yield message.loader.bytes);
+                const next = await source.next();
+                if (!(done = next.done)) {
+                    yield next.value;
+                }
+            }
+        }
+    }
+}