Posted to commits@arrow.apache.org by we...@apache.org on 2019/05/31 21:02:33 UTC

[arrow] branch master updated: ARROW-5396: [JS] Support files and streams with no record batches

This is an automated email from the ASF dual-hosted git repository.

wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 17e0198  ARROW-5396: [JS] Support files and streams with no record batches
17e0198 is described below

commit 17e0198253f1b528fe7da494100f2861648688c2
Author: ptaylor <pa...@me.com>
AuthorDate: Fri May 31 16:02:20 2019 -0500

    ARROW-5396: [JS] Support files and streams with no record batches
    
    Re: #3871, [ARROW-2119](https://issues.apache.org/jira/browse/ARROW-2119), and closes [ARROW-5396](https://issues.apache.org/jira/browse/ARROW-5396).
    
    This PR updates the JS Readers and Writers to support files and streams with no RecordBatches. The approach here is two-fold:
    
    1. If a Reader's source message stream ends after the Schema message (i.e. it contains no RecordBatch messages), the Reader yields a single zero-length placeholder RecordBatch carrying that schema.
    2. The Writer still writes the Schema message when it receives any RecordBatch, but skips writing the RecordBatch message itself when the batch is the zero-length placeholder.
    
    This is necessary because the reader and writer don't know about each other when they communicate via the Node and DOM stream I/O primitives; they only know about the values pushed through the streams. Since the RecordBatchReader and RecordBatchWriter don't yield the Schema message as a standalone value, we pump the stream with a zero-length RecordBatch that carries the schema instead.
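    
    For illustration, here is a minimal sketch (not part of this commit) of the round trip this enables, using the public apache-arrow TypeScript API. The reset(sink, schema)/finish() sequence as a way to produce a schema-only stream is an assumption about the writer's existing behavior, not something this patch adds:
    
        import { Field, Int32, RecordBatchReader, RecordBatchStreamWriter, Schema } from 'apache-arrow';
        
        // A schema with a single Int32 column and no record batches.
        const schema = new Schema([new Field('i32', new Int32())]);
        
        // Assumption: resetting the writer with a schema and finishing immediately
        // emits a valid stream containing only the Schema message (plus EOS).
        const writer = new RecordBatchStreamWriter();
        writer.reset(undefined, schema);
        writer.finish();
        const bytes = writer.toUint8Array(true);
        
        // With this patch, reading that stream yields one zero-length placeholder
        // RecordBatch carrying the schema, instead of yielding no batches at all.
        const reader = RecordBatchReader.from(bytes);
        for (const batch of reader) {
            console.log(batch.schema.fields[0].name); // 'i32'
            console.log(batch.length);                // 0
        }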
    
    Author: ptaylor <pa...@me.com>
    Author: Wes McKinney <we...@apache.org>
    
    Closes #4373 from trxcllnt/js/fix-no-record-batches and squashes the following commits:
    
    c86069634 <Wes McKinney> Run no-batches integration test for JS also
    86d192d5b <ptaylor> define an _InternalEmptyRecordBatch class to signal that the reader source stream has no RecordBatches
    193b08de5 <ptaylor> ensure reader and writer support the case where a stream or file has a schema but no recordbatches
---
 integration/integration_test.py |  1 -
 js/src/ipc/message.ts           | 10 ++++------
 js/src/ipc/reader.ts            | 10 +++++++++-
 js/src/ipc/writer.ts            |  6 ++++--
 js/src/recordbatch.ts           | 11 +++++++++++
 5 files changed, 28 insertions(+), 10 deletions(-)

diff --git a/integration/integration_test.py b/integration/integration_test.py
index 0a508e8..0c5aee4 100644
--- a/integration/integration_test.py
+++ b/integration/integration_test.py
@@ -1031,7 +1031,6 @@ def get_generated_json_files(tempdir=None, flight=False):
 
     file_objs = [
         (generate_primitive_case([], name='primitive_no_batches')
-         .skip_category('JS')
          .skip_category('Java')),
         generate_primitive_case([17, 20], name='primitive'),
         generate_primitive_case([0, 0, 0], name='primitive_zerolength'),
diff --git a/js/src/ipc/message.ts b/js/src/ipc/message.ts
index 194e4ac..c8d3b76 100644
--- a/js/src/ipc/message.ts
+++ b/js/src/ipc/message.ts
@@ -165,21 +165,19 @@ export class JSONMessageReader extends MessageReader {
         this._json = source instanceof ArrowJSON ? source : new ArrowJSON(source);
     }
     public next() {
-        const { _json, _batchIndex, _dictionaryIndex } = this;
-        const numBatches = _json.batches.length;
-        const numDictionaries = _json.dictionaries.length;
+        const { _json } = this;
         if (!this._schema) {
             this._schema = true;
             const message = Message.fromJSON(_json.schema, MessageHeader.Schema);
-            return { value: message, done: _batchIndex >= numBatches && _dictionaryIndex >= numDictionaries };
+            return { done: false, value: message };
         }
-        if (_dictionaryIndex < numDictionaries) {
+        if (this._dictionaryIndex < _json.dictionaries.length) {
             const batch = _json.dictionaries[this._dictionaryIndex++];
             this._body = batch['data']['columns'];
             const message = Message.fromJSON(batch, MessageHeader.DictionaryBatch);
             return { done: false, value: message };
         }
-        if (_batchIndex < numBatches) {
+        if (this._batchIndex < _json.batches.length) {
             const batch = _json.batches[this._batchIndex++];
             this._body = batch['columns'];
             const message = Message.fromJSON(batch, MessageHeader.RecordBatch);
diff --git a/js/src/ipc/reader.ts b/js/src/ipc/reader.ts
index 78c4036..6778c50 100644
--- a/js/src/ipc/reader.ts
+++ b/js/src/ipc/reader.ts
@@ -22,12 +22,12 @@ import { Footer } from './metadata/file';
 import { Schema, Field } from '../schema';
 import streamAdapters from '../io/adapters';
 import { Message } from './metadata/message';
-import { RecordBatch } from '../recordbatch';
 import * as metadata from './metadata/message';
 import { ArrayBufferViewInput } from '../util/buffer';
 import { ByteStream, AsyncByteStream } from '../io/stream';
 import { RandomAccessFile, AsyncRandomAccessFile } from '../io/file';
 import { VectorLoader, JSONVectorLoader } from '../visitor/vectorloader';
+import { RecordBatch, _InternalEmptyPlaceholderRecordBatch } from '../recordbatch';
 import {
     FileHandle,
     ArrowJSONLike,
@@ -438,6 +438,10 @@ class RecordBatchStreamReaderImpl<T extends { [key: string]: DataType } = any> e
                 this.dictionaries.set(header.id, vector);
             }
         }
+        if (this.schema && this._recordBatchIndex === 0) {
+            this._recordBatchIndex++;
+            return { done: false, value: new _InternalEmptyPlaceholderRecordBatch<T>(this.schema) };
+        }
         return this.return();
     }
     protected _readNextMessageAndValidate<T extends MessageHeader>(type?: T | null) {
@@ -508,6 +512,10 @@ class AsyncRecordBatchStreamReaderImpl<T extends { [key: string]: DataType } = a
                 this.dictionaries.set(header.id, vector);
             }
         }
+        if (this.schema && this._recordBatchIndex === 0) {
+            this._recordBatchIndex++;
+            return { done: false, value: new _InternalEmptyPlaceholderRecordBatch<T>(this.schema) };
+        }
         return await this.return();
     }
     protected async _readNextMessageAndValidate<T extends MessageHeader>(type?: T | null) {
diff --git a/js/src/ipc/writer.ts b/js/src/ipc/writer.ts
index d6394d1..803799f 100644
--- a/js/src/ipc/writer.ts
+++ b/js/src/ipc/writer.ts
@@ -22,7 +22,6 @@ import { Column } from '../column';
 import { Schema, Field } from '../schema';
 import { Chunked } from '../vector/chunked';
 import { Message } from './metadata/message';
-import { RecordBatch } from '../recordbatch';
 import * as metadata from './metadata/message';
 import { DataType, Dictionary } from '../type';
 import { FileBlock, Footer } from './metadata/file';
@@ -32,6 +31,7 @@ import { VectorAssembler } from '../visitor/vectorassembler';
 import { JSONTypeAssembler } from '../visitor/jsontypeassembler';
 import { JSONVectorAssembler } from '../visitor/jsonvectorassembler';
 import { ArrayBufferViewInput, toUint8Array } from '../util/buffer';
+import { RecordBatch, _InternalEmptyPlaceholderRecordBatch } from '../recordbatch';
 import { Writable, ReadableInterop, ReadableDOMStreamOptions } from '../io/interfaces';
 import { isPromise, isAsyncIterable, isWritableDOMStream, isWritableNodeStream, isIterable } from '../util/compat';
 
@@ -162,7 +162,9 @@ export class RecordBatchWriter<T extends { [key: string]: DataType } = any> exte
         }
 
         if (payload instanceof RecordBatch) {
-            this._writeRecordBatch(payload);
+            if (!(payload instanceof _InternalEmptyPlaceholderRecordBatch)) {
+                this._writeRecordBatch(payload);
+            }
         } else if (payload instanceof Table) {
             this.writeAll(payload.chunks);
         } else if (isIterable(payload)) {
diff --git a/js/src/recordbatch.ts b/js/src/recordbatch.ts
index 9c8169b..052d5ca 100644
--- a/js/src/recordbatch.ts
+++ b/js/src/recordbatch.ts
@@ -99,3 +99,14 @@ export class RecordBatch<T extends { [key: string]: DataType } = any>
         return new RecordBatch<{ [key: string]: K }>(schema, this.length, childData);
     }
 }
+
+/**
+ * @ignore
+ * @private
+ */
+/* tslint:disable:class-name */
+export class _InternalEmptyPlaceholderRecordBatch<T extends { [key: string]: DataType } = any> extends RecordBatch<T> {
+    constructor(schema: Schema<T>) {
+        super(schema, 0, schema.fields.map((f) => Data.new(f.type, 0, 0, 0)));
+    }
+}