Posted to jira@arrow.apache.org by "Victor Bonilla (Jira)" <ji...@apache.org> on 2022/06/03 07:59:00 UTC

[jira] [Closed] (ARROW-16705) [JavaScript] TypeError: RecordBatchReader.from(...).toNodeStream is not a function

     [ https://issues.apache.org/jira/browse/ARROW-16705?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Victor Bonilla closed ARROW-16705.
----------------------------------

> [JavaScript] TypeError: RecordBatchReader.from(...).toNodeStream is not a function
> ----------------------------------------------------------------------------------
>
>                 Key: ARROW-16705
>                 URL: https://issues.apache.org/jira/browse/ARROW-16705
>             Project: Apache Arrow
>          Issue Type: Bug
>          Components: JavaScript
>    Affects Versions: 8.0.0
>         Environment: Nodejs v16.13.0
>            Reporter: Victor Bonilla
>            Priority: Major
>              Labels: async, ipc, javascript, stream
>
> While trying to write a real-time stream from an async iterable of objects to a file in the Arrow IPC streaming format, I'm getting a TypeError.
> The idea is to write each message to the Arrow file as soon as it arrives, without waiting to build the complete table first. To take advantage of stream event handling, I'm using the functional approach of the [node:stream|https://nodejs.org/docs/latest-v16.x/api/stream.html] module (Node.js v16.13.0).
> The async iterable yields messages that are JS objects with fields of different data types, for example:
> {code:javascript}
> {
>     id: '6345',
>     product: 'foo',
>     price: 62.78,
>     created: '2022-05-01T16:01:00.105Z',
> }{code}
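The incremental approach described above pushes each message downstream as it arrives instead of materializing a full table first. It can be sketched with `stream.pipeline` and async generator stages alone. This is a minimal, Arrow-free sketch under stated assumptions. `messages`, `batchesOf` and the in-memory sink are hypothetical stand-ins, and the batch size of 2 is arbitrary. A real version would swap `batchesOf` and the sink for Arrow's builder and IPC writer.

```javascript
const { pipeline, Writable } = require('node:stream');

// Hypothetical finite source shaped like the messages in the report.
async function* messages(n) {
  for (let i = 0; i < n; i++) {
    yield { id: String(i), product: 'foo', price: i + 0.5, created: new Date(0) };
    // Insert some asynchrony, as in the original repro.
    await new Promise((r) => setTimeout(r, 10));
  }
}

// Hypothetical transform stage: groups messages into batches of `size`
// and flushes a final partial batch when the source ends.
function batchesOf(size) {
  return async function* (source) {
    let batch = [];
    for await (const msg of source) {
      batch.push(msg);
      if (batch.length === size) {
        yield batch;
        batch = [];
      }
    }
    if (batch.length > 0) yield batch;
  };
}

// In-memory sink (hypothetical) that records the ids of each batch it receives.
const received = [];
const sink = new Writable({
  objectMode: true,
  write(batch, _encoding, callback) {
    received.push(batch.map((m) => m.id));
    callback();
  },
});

// Resolve with the recorded batches once the whole pipeline has finished.
const done = new Promise((resolve, reject) => {
  pipeline(messages(5), batchesOf(2), sink, (err) => (err ? reject(err) : resolve(received)));
});
```

Each stage pulls from the previous one with `for await`, so a batch reaches the sink as soon as it is complete rather than after the source ends.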
> Code to replicate the error:
> {code:javascript}
> const {
>     Struct, Field, Utf8, Float32, TimestampMillisecond,
>     RecordBatchReader, RecordBatchStreamWriter,
>     builderThroughAsyncIterable,
> } = require('apache-arrow')
> const fs = require("fs");
> const path = require("path");
> const {pipeline} = require('node:stream');
> const asyncIterable = {
>     [Symbol.asyncIterator]: async function* () {
>         while (true) {
>             const obj = {
>                 id: Math.floor(Math.random() * 10).toString(),
>                 product: 'foo',
>                 price: Math.random() + Math.floor(Math.random() * 10),
>                 created: new Date(),
>             }
>             yield obj;
>             // insert some asynchrony
>             await new Promise((r) => setTimeout(r, 1000));
>         }
>     }
> }
> async function streamToArrow(messagesAsyncIterable) {
>     const message_type = new Struct([
>         new Field('id', new Utf8, false),
>         new Field('product', new Utf8, false),
>         new Field('price', new Float32, false),
>         new Field('created', new TimestampMillisecond, false),
>     ]);
>     const builderOptions = {
>         type: message_type,
>         nullValues: [null, 'n/a', undefined],
>         highWaterMark: 30,
>         queueingStrategy: 'count',
>     };
>     const transform = builderThroughAsyncIterable(builderOptions);
>     let file_path = './ipc_stream.arrow';
>     const fsWriter = fs.createWriteStream(file_path);
>     pipeline(
>         RecordBatchReader
>             .from(transform(messagesAsyncIterable))
>             .toNodeStream(),  // Throws TypeError: RecordBatchReader.from(...).toNodeStream is not a function
>         RecordBatchStreamWriter.throughNode(),
>         fsWriter,
>         (err, value) => {
>             if (err) {
>                 console.error(err);
>             } else {
>                 console.log(value, 'value returned');
>             }
>         }
>     ).on('close', () => {
>         console.log('Closed pipeline')
>     });
> }
> streamToArrow(asyncIterable){code}
>  
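One plausible explanation is offered here as an assumption, not a confirmed diagnosis. When given an async source, such as the async iterable returned by `transform(...)`, `RecordBatchReader.from()` in apache-arrow appears to return a Promise of a reader rather than the reader itself. A Promise has no `toNodeStream` method, which would produce exactly this TypeError. The plain-JS sketch below reproduces that failure mode without Arrow. `FakeReader` and `fromSource` are hypothetical stand-ins, not Arrow APIs.

```javascript
const { Readable } = require('node:stream');

// Hypothetical reader that exposes its data as a Node.js Readable.
class FakeReader {
  constructor(source) {
    this.source = source;
  }
  toNodeStream() {
    return Readable.from(this.source);
  }
}

// Hypothetical factory: because it is async, it returns a Promise
// of a FakeReader, not a FakeReader.
async function fromSource(source) {
  return new FakeReader(source);
}

async function* numbers() {
  yield 1;
  yield 2;
  yield 3;
}

// Calling the method directly on the Promise reproduces the error shape.
let errorMessage = '';
try {
  fromSource(numbers()).toNodeStream();
} catch (err) {
  errorMessage = `${err.constructor.name}: ${err.message}`;
}

// Awaiting the Promise first yields a real reader whose stream can be consumed.
async function readAll() {
  const reader = await fromSource(numbers());
  const out = [];
  for await (const n of reader.toNodeStream()) out.push(n);
  return out;
}
const result = readAll();
```

If this explanation holds, the analogous change in `streamToArrow` would be to `await RecordBatchReader.from(...)` before calling `toNodeStream()`. Whether the rest of the pipeline then works with apache-arrow 8.0.0 is not verified here.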



--
This message was sent by Atlassian Jira
(v8.20.7#820007)