Posted to issues@arrow.apache.org by "Victor Bonilla (Jira)" <ji...@apache.org> on 2022/06/01 09:39:00 UTC

[jira] [Created] (ARROW-16705) [JavaScript] TypeError: RecordBatchReader.from(...).toNodeStream is not a function

Victor Bonilla created ARROW-16705:
--------------------------------------

             Summary: [JavaScript] TypeError: RecordBatchReader.from(...).toNodeStream is not a function
                 Key: ARROW-16705
                 URL: https://issues.apache.org/jira/browse/ARROW-16705
             Project: Apache Arrow
          Issue Type: Bug
          Components: JavaScript
    Affects Versions: 8.0.0
         Environment: Nodejs v16.13.0
            Reporter: Victor Bonilla


While trying to stream an async iterable of objects in real time to a file in the IPC streaming format, I get a TypeError.

The idea is to write every message to the Arrow file as soon as it arrives, instead of waiting until the complete table has been built. To take advantage of stream event handling, I'm using the functional approach of the [node:stream|https://nodejs.org/docs/latest-v16.x/api/stream.html] module (Node.js v16.13.0).
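
For context, the node:stream pattern described above can be sketched without Arrow at all. This is only a minimal sketch under stated assumptions: the {{messages}} generator, the JSON-lines transform (a stand-in for the Arrow encoding step) and the in-memory sink (a stand-in for the file write stream) are hypothetical placeholders and not part of apache-arrow.

```javascript
const { Readable, Transform, Writable } = require('node:stream');
const { pipeline } = require('node:stream/promises');

// Hypothetical source: yields `count` message objects with a short pause between them.
async function* messages(count) {
    for (let i = 0; i < count; i++) {
        yield { id: String(i), product: 'foo', price: 1.5 * i, created: new Date() };
        // insert some asynchrony
        await new Promise((resolve) => setTimeout(resolve, 10));
    }
}

// Hypothetical stand-in for the Arrow encoding step: emits one JSON line per message.
function toJsonLines() {
    return new Transform({
        writableObjectMode: true, // objects in, text out
        transform(message, _encoding, callback) {
            callback(null, JSON.stringify(message) + '\n');
        },
    });
}

// In-memory sink used in place of fs.createWriteStream, so the sketch writes no files.
function collectInto(chunks) {
    return new Writable({
        write(chunk, _encoding, callback) {
            chunks.push(chunk.toString());
            callback();
        },
    });
}

// Each message is forwarded to the sink as soon as the source yields it.
async function streamMessages(source) {
    const chunks = [];
    await pipeline(Readable.from(source), toJsonLines(), collectInto(chunks));
    return chunks;
}
```

Calling {{streamMessages(messages(3))}} resolves once the source is exhausted and the sink has finished. In the real case the source never ends, so the pipeline would stay open until it is destroyed or fails.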

The async iterable yields messages that are JS objects with fields of different data types, for example:
{code:javascript}
{
    id: '6345',
    product: 'foo',
    price: 62.78,
    created: '2022-05-01T16:01:00.105Z',
}{code}
Code to reproduce the error:
{code:javascript}
const {
    Struct, Field, Utf8, Float32, TimestampMillisecond,
    RecordBatchReader, RecordBatchStreamWriter,
    builderThroughAsyncIterable,
} = require('apache-arrow')
const fs = require('node:fs');
const {pipeline} = require('node:stream');

const asyncIterable = {
    [Symbol.asyncIterator]: async function* () {
        while (true) {
            const obj = {
                id: Math.floor(Math.random() * 10).toString(),
                product: 'foo',
                price: Math.random() + Math.floor(Math.random() * 10),
                created: new Date(),
            }
            yield obj;
            // insert some asynchrony
            await new Promise((r) => setTimeout(r, 1000));
        }
    }
}

async function streamToArrow(messagesAsyncIterable) {
    const message_type = new Struct([
        new Field('id', new Utf8, false),
        new Field('product', new Utf8, false),
        new Field('price', new Float32, false),
        new Field('created', new TimestampMillisecond, false),
    ]);

    const builderOptions = {
        type: message_type,
        nullValues: [null, 'n/a', undefined],
        highWaterMark: 30,
        queueingStrategy: 'count',
    };

    const transform = builderThroughAsyncIterable(builderOptions);  
    let file_path = './ipc_stream.arrow';
    const fsWriter = fs.createWriteStream(file_path);

    pipeline(
        RecordBatchReader
            .from(transform(messagesAsyncIterable))
            .toNodeStream(),  // Throws TypeError: RecordBatchReader.from(...).toNodeStream is not a function         
        RecordBatchStreamWriter.throughNode(),
        fsWriter,
        (err, value) => {
            if (err) {
                console.error(err);
            } else {
                console.log(value, 'value returned');
            }
        }
    ).on('close', () => {
        console.log('Closed pipeline')
    });

}

streamToArrow(asyncIterable){code}
 


