You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by bh...@apache.org on 2018/07/05 19:19:56 UTC
[arrow] branch master updated: ARROW-2779: [JS] stream reader fixes
This is an automated email from the ASF dual-hosted git repository.
bhulette pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new d5d39f7 ARROW-2779: [JS] stream reader fixes
d5d39f7 is described below
commit d5d39f770047d671e4879369dd680c69afc370c3
Author: ptaylor <pa...@me.com>
AuthorDate: Thu Jul 5 12:19:59 2018 -0700
ARROW-2779: [JS] stream reader fixes
This is a stop-gap fix for node-stream compatibility. Some APIs (like node's `http` module) still only accept Buffer instances, so this PR emits those if we're in node. Also includes a small fix in `fromReadableStream` to ensure it doesn't try to read the length if there aren't enough bytes.
Author: ptaylor <pa...@me.com>
Closes #2201 from trxcllnt/js-stream-reader-fixes and squashes the following commits:
d229a558 <ptaylor> Merge branch 'master' into js-stream-reader-fixes
2c2e6949 <ptaylor> emit native Buffers in node, else Uint8Arrays
ab02407f <ptaylor> silence node's experimental async-iterator warnings
f4835672 <ptaylor> Merge branch 'js-stream-reader-fixes' of github.com:trxcllnt/arrow into js-stream-reader-fixes
34251043 <ptaylor> Update dependencies
481d15aa <ptaylor> guard against reading data from 0-length buffer
d4118712 <ptaylor> guard against reading data from 0-length buffer
9b199c39 <ptaylor> finish implementing unions in the JS reader
953232c0 <ptaylor> write schema metadata
---
integration/integration_test.py | 6 +++---
js/lerna.json | 2 +-
js/package.json | 48 ++++++++++++++++++++---------------------
js/src/data.ts | 30 +++++++++++++-------------
js/src/ipc/reader/node.ts | 4 ++--
js/src/util/node.ts | 19 +++++++++++-----
js/src/vector.ts | 46 +++++++++++++++++++--------------------
js/src/vector/chunked.ts | 4 ++--
js/src/vector/dictionary.ts | 2 +-
js/src/vector/list.ts | 4 ++--
js/test/Arrow.ts | 2 ++
11 files changed, 89 insertions(+), 78 deletions(-)
diff --git a/integration/integration_test.py b/integration/integration_test.py
index 173fe54..8914580 100644
--- a/integration/integration_test.py
+++ b/integration/integration_test.py
@@ -1123,21 +1123,21 @@ class JSTester(Tester):
return self._run(self.VALIDATE, arrow_path, json_path, 'VALIDATE')
def json_to_file(self, json_path, arrow_path):
- cmd = ['node', self.JSON_TO_ARROW, '-a', arrow_path, '-j', json_path]
+ cmd = ['node', '--no-warnings', self.JSON_TO_ARROW, '-a', arrow_path, '-j', json_path]
cmd = ' '.join(cmd)
if self.debug:
print(cmd)
os.system(cmd)
def stream_to_file(self, stream_path, file_path):
- cmd = ['cat', stream_path, '|', 'node', self.STREAM_TO_FILE, '>', file_path]
+ cmd = ['cat', stream_path, '|', 'node', '--no-warnings', self.STREAM_TO_FILE, '>', file_path]
cmd = ' '.join(cmd)
if self.debug:
print(cmd)
os.system(cmd)
def file_to_stream(self, file_path, stream_path):
- cmd = ['cat', file_path, '|', 'node', self.FILE_TO_STREAM, '>', stream_path]
+ cmd = ['cat', file_path, '|', 'node', '--no-warnings', self.FILE_TO_STREAM, '>', stream_path]
cmd = ' '.join(cmd)
if self.debug:
print(cmd)
diff --git a/js/lerna.json b/js/lerna.json
index 0bf16fd..c8e1051 100644
--- a/js/lerna.json
+++ b/js/lerna.json
@@ -1,5 +1,5 @@
{
- "lerna": "2.0.0",
+ "lerna": "2.11.0",
"version": "0.1.1",
"packages": [
"targets/ts",
diff --git a/js/package.json b/js/package.json
index 1586939..60129e3 100644
--- a/js/package.json
+++ b/js/package.json
@@ -53,15 +53,15 @@
"npm-release.sh"
],
"dependencies": {
- "@types/flatbuffers": "1.6.5",
- "@types/node": "10.0.8",
+ "@types/flatbuffers": "1.9.0",
+ "@types/node": "10.5.1",
"@types/text-encoding-utf-8": "1.0.1",
- "command-line-args": "5.0.1",
- "command-line-usage": "4.1.0",
- "flatbuffers": "trxcllnt/flatbuffers-esm",
+ "command-line-args": "5.0.2",
+ "command-line-usage": "5.0.5",
+ "flatbuffers": "1.9.0",
"json-bignum": "0.0.3",
"text-encoding-utf-8": "1.0.2",
- "tslib": "1.9.0"
+ "tslib": "1.9.3"
},
"devDependencies": {
"@std/esm": "0.26.0",
@@ -69,37 +69,37 @@
"@types/jest": "22.2.3",
"babel-jest": "22.4.3",
"benchmark": "2.1.4",
- "coveralls": "3.0.0",
+ "coveralls": "3.0.2",
"del": "3.0.0",
"glob": "7.1.2",
- "google-closure-compiler": "20180506.0.0",
+ "google-closure-compiler": "20180610.0.2",
"gulp": "github:gulpjs/gulp#6d71a658c61edb3090221579d8f97dbe086ba2ed",
"gulp-json-transform": "0.4.5",
- "gulp-rename": "1.2.2",
- "gulp-sourcemaps": "2.6.3",
- "gulp-typescript": "3.2.4",
- "ix": "2.3.4",
+ "gulp-rename": "1.3.0",
+ "gulp-sourcemaps": "2.6.4",
+ "gulp-typescript": "4.0.2",
+ "ix": "2.3.5",
"jest": "22.4.3",
"jest-environment-node-debug": "2.0.0",
"json": "9.0.6",
- "lerna": "2.7.1",
- "lint-staged": "6.0.1",
- "merge2": "1.2.1",
+ "lerna": "2.11.0",
+ "lint-staged": "7.2.0",
+ "merge2": "1.2.2",
"mkdirp": "0.5.1",
- "npm-run-all": "4.1.2",
- "pump": "1.0.2",
+ "npm-run-all": "4.1.3",
+ "pump": "3.0.0",
"rimraf": "2.6.2",
"rxjs": "5.5.6",
- "shx": "0.2.2",
+ "shx": "0.3.1",
"source-map-loader": "0.2.3",
- "trash": "4.2.1",
+ "trash": "4.3.0",
"ts-jest": "22.4.6",
- "ts-node": "6.0.3",
- "tslint": "5.9.1",
- "typedoc": "0.10.0",
- "typescript": "2.7.1",
+ "ts-node": "7.0.0",
+ "tslint": "5.10.0",
+ "typedoc": "0.11.1",
+ "typescript": "2.9.2",
"uglifyjs-webpack-plugin": "1.1.6",
- "webpack": "3.10.0",
+ "webpack": "4.14.0",
"xml2js": "0.4.19"
},
"@std/esm": {
diff --git a/js/src/data.ts b/js/src/data.ts
index 2c79151..5a11759 100644
--- a/js/src/data.ts
+++ b/js/src/data.ts
@@ -86,8 +86,8 @@ export class BaseData<T extends DataType = DataType> implements VectorLike {
}
return nullCount;
}
- public clone<R extends T>(type: R, length = this.length, offset = this.offset, nullCount = this._nullCount) {
- return new BaseData(type, length, offset, nullCount);
+ public clone<R extends T>(type: R, length = this.length, offset = this.offset, nullCount = this._nullCount): Data<R> {
+ return new BaseData(type, length, offset, nullCount) as any;
}
public slice(offset: number, length: number) {
return length <= 0 ? this : this.sliceInternal(this.clone(
@@ -180,8 +180,8 @@ export class NestedData<T extends NestedType = NestedType> extends BaseData<T> {
this.childData = childData;
this[VectorType.VALIDITY] = toTypedArray(Uint8Array, nullBitmap);
}
- public clone<R extends T>(type: R, length = this.length, offset = this.offset, nullCount = this._nullCount) {
- return new NestedData<R>(type, length, this[VectorType.VALIDITY], this.childData, offset, nullCount);
+ public clone<R extends T>(type: R, length = this.length, offset = this.offset, nullCount = this._nullCount): Data<R> {
+ return new NestedData<R>(type, length, this[VectorType.VALIDITY], this.childData, offset, nullCount) as any;
}
protected sliceInternal(clone: this, offset: number, length: number) {
if (!this[VectorType.OFFSET]) {
@@ -208,8 +208,8 @@ export class ListData<T extends ListType> extends SingleNestedData<T> {
super(type, length, nullBitmap, valueChildData, offset, nullCount);
this[VectorType.OFFSET] = toTypedArray(Int32Array, valueOffsets);
}
- public clone<R extends T>(type: R, length = this.length, offset = this.offset, nullCount = this._nullCount) {
- return new ListData<R>(type, length, this[VectorType.VALIDITY], this[VectorType.OFFSET], this._valuesData as any, offset, nullCount);
+ public clone<R extends T>(type: R, length = this.length, offset = this.offset, nullCount = this._nullCount): Data<R> {
+ return new ListData(type, length, this[VectorType.VALIDITY], this[VectorType.OFFSET], this._valuesData as any, offset, nullCount) as any;
}
}
@@ -224,8 +224,8 @@ export class UnionData<T extends (DenseUnion | SparseUnion) = any> extends Neste
return (typeIdToChildIndex[typeId] = idx) && typeIdToChildIndex || typeIdToChildIndex;
}, Object.create(null) as { [key: number]: number });
}
- public clone<R extends T>(type: R, length = this.length, offset = this.offset, nullCount = this._nullCount) {
- return new UnionData<R>(type, length, this[VectorType.VALIDITY], this[VectorType.TYPE], this.childData, offset, nullCount);
+ public clone<R extends T>(type: R, length = this.length, offset = this.offset, nullCount = this._nullCount): Data<R> {
+ return new UnionData<R>(type, length, this[VectorType.VALIDITY], this[VectorType.TYPE], this.childData, offset, nullCount) as any;
}
}
@@ -233,7 +233,7 @@ export class SparseUnionData extends UnionData<SparseUnion> {
constructor(type: SparseUnion, length: number, nullBitmap: Uint8Array | null | undefined, typeIds: Iterable<number>, childData: Data<any>[], offset?: number, nullCount?: number) {
super(type, length, nullBitmap, typeIds, childData, offset, nullCount);
}
- public clone<R extends SparseUnion>(type: R, length = this.length, offset = this.offset, nullCount = this._nullCount) {
+ public clone<R extends SparseUnion>(type: R, length = this.length, offset = this.offset, nullCount = this._nullCount): Data<R> {
return new SparseUnionData(
type,
length,
@@ -241,7 +241,7 @@ export class SparseUnionData extends UnionData<SparseUnion> {
this[VectorType.TYPE],
this.childData,
offset, nullCount
- ) as any as UnionData<R>;
+ ) as any;
}
}
@@ -252,7 +252,7 @@ export class DenseUnionData extends UnionData<DenseUnion> {
super(type, length, nullBitmap, typeIds, childData, offset, nullCount);
this[VectorType.OFFSET] = toTypedArray(Int32Array, valueOffsets);
}
- public clone<R extends DenseUnion>(type: R, length = this.length, offset = this.offset, nullCount = this._nullCount) {
+ public clone<R extends DenseUnion>(type: R, length = this.length, offset = this.offset, nullCount = this._nullCount): Data<R> {
return new DenseUnionData(
type,
length,
@@ -261,7 +261,7 @@ export class DenseUnionData extends UnionData<DenseUnion> {
this[VectorType.OFFSET],
this.childData,
offset, nullCount
- ) as any as UnionData<R>;
+ ) as any;
}
}
@@ -288,12 +288,12 @@ export class ChunkedData<T extends DataType> extends BaseData<T> {
}
return nullCount;
}
- public clone<R extends T>(type: R, length = this.length, offset = this.offset, nullCount = this._nullCount) {
- return new ChunkedData<R>(
+ public clone<R extends T>(type: R, length = this.length, offset = this.offset, nullCount = this._nullCount): Data<R> {
+ return new ChunkedData(
type, length,
this._chunkVectors.map((vec) => vec.clone(vec.data.clone(type))) as any,
offset, nullCount, this._chunkOffsets
- );
+ ) as any;
}
protected sliceInternal(clone: this, offset: number, length: number) {
const chunks = this._chunkVectors;
diff --git a/js/src/ipc/reader/node.ts b/js/src/ipc/reader/node.ts
index 7fbd7bf..8a455e9 100644
--- a/js/src/ipc/reader/node.ts
+++ b/js/src/ipc/reader/node.ts
@@ -50,7 +50,7 @@ export async function* fromReadableStream(stream: NodeJS.ReadableStream) {
return yield bytes;
}
- if (messageLength <= 0) {
+ if (bytes.byteLength > 0 && messageLength <= 0) {
messageLength = new DataView(bytes.buffer).getInt32(0, true);
}
@@ -66,7 +66,7 @@ export async function* fromReadableStream(stream: NodeJS.ReadableStream) {
bytesRead += messageLength + PADDING;
yield bytes.subarray(0, messageLength + PADDING);
bytes = bytes.subarray(messageLength + PADDING);
- messageLength = bytes.byteLength <= 0 ? 0 :
+ messageLength = bytes.byteLength < 4 ? 0 :
new DataView(bytes.buffer).getInt32(bytes.byteOffset, true);
message = null;
}
diff --git a/js/src/util/node.ts b/js/src/util/node.ts
index 857765c..e588cb7 100644
--- a/js/src/util/node.ts
+++ b/js/src/util/node.ts
@@ -27,7 +27,7 @@ export class PipeIterator<T> implements IterableIterator<T> {
if ((res = this.next()).done) break;
} while (emit(stream, encoding, res.value));
}
- return wait(stream, encoding, res && res.done, write);
+ return wait(stream, res && res.done, write);
};
write();
return stream;
@@ -62,23 +62,32 @@ export class AsyncPipeIterator<T> implements AsyncIterableIterator<T> {
if ((res = await this.next()).done) break;
} while (emit(stream, encoding, res.value));
}
- return wait(stream, encoding, res && res.done, write);
+ return wait(stream, res && res.done, write);
};
write();
return stream;
}
}
+const toBufferOrUint8Array = (() => {
+ // If in node, convert Uint8Arrays to Buffer instances. This is necessary
+ // because some node APIs ('http' etc.) don't work unless you give them Buffers.
+ // This eval also defeats closure-compiler, which doesn't recognize the Buffer constructor.
+ const BufferCtor = eval('typeof Buffer !== "undefined" ? Buffer : null');
+ return !BufferCtor ? (arr: Uint8Array) => arr :
+ (arr: Uint8Array) => BufferCtor.from(arr.buffer, arr.byteOffset, arr.byteLength);
+})();
+
function emit(stream: NodeJS.WritableStream, encoding: string, value: any) {
- return stream['write']((encoding === 'utf8' ? value + '\n' : value) as any, encoding);
+ return stream['write']((encoding === 'utf8' ? value + '\n' : toBufferOrUint8Array(value)) as any, encoding);
}
-function wait(stream: NodeJS.WritableStream, encoding: string, done: boolean, write: (x?: any) => void) {
+function wait(stream: NodeJS.WritableStream, done: boolean, write: (x?: any) => void) {
const p = eval('process'); // defeat closure compiler
if (!done) {
stream['once']('error', write);
stream['once']('drain', write);
} else if (!(!p || stream === p.stdout) && !(stream as any)['isTTY']) {
- stream['end'](<any> (encoding === 'utf8' ? '\n' : new Uint8Array(0)));
+ stream['end'](<any> null);
}
}
diff --git a/js/src/vector.ts b/js/src/vector.ts
index 40d8faa..8eb591b 100644
--- a/js/src/vector.ts
+++ b/js/src/vector.ts
@@ -49,7 +49,7 @@ export class Vector<T extends DataType = any> implements VectorLike, View<T>, Vi
this.length = data.length;
let nulls: Uint8Array;
if ((<any> data instanceof ChunkedData) && !(view instanceof ChunkedView)) {
- this.view = new ChunkedView(data);
+ this.view = new ChunkedView(data as any) as any;
} else if (!(view instanceof ValidityView) && (nulls = data.nullBitmap!) && nulls.length > 0 && data.nullCount > 0) {
this.view = new ValidityView(data, view);
} else {
@@ -159,12 +159,12 @@ export abstract class NestedVector<T extends NestedType> extends Vector<T> {
return data as Data<any>[];
} else if (!(<any> (data = this.data) instanceof ChunkedData)) {
// If data isn't chunked, cache and return NestedData's childData
- return this._childData = (data as NestedData<T>).childData;
+ return this._childData = data.childData;
}
// Otherwise if the data is chunked, concatenate the childVectors from each chunk
// to construct a single chunked Vector for each column. Then return the ChunkedData
// instance from each unified chunked column as the childData of a chunked NestedVector
- const chunks = ((data as ChunkedData<T>).chunkVectors as NestedVector<T>[]);
+ const chunks = ((data as any as ChunkedData<T>).chunkVectors as NestedVector<T>[]);
return this._childData = chunks
.reduce<(Vector<T> | null)[][]>((cols, chunk) => chunk.childData
.reduce<(Vector<T> | null)[][]>((cols, _, i) => (
@@ -197,7 +197,7 @@ export class NullVector extends Vector<Null> {
export class BoolVector extends Vector<Bool> {
public static from(data: IterableArrayLike<boolean>) {
- return new BoolVector(new BoolData(new Bool(), data.length, null, packBools(data)));
+ return new BoolVector(new BoolData(new Bool(), data.length, null, packBools(data)) as Data<Bool>);
}
public get values() { return this.data.values; }
constructor(data: Data<Bool>, view: View<Bool> = new BoolView(data)) {
@@ -360,7 +360,7 @@ export class Utf8Vector extends ListVectorBase<Utf8> {
export class ListVector<T extends DataType = DataType> extends ListVectorBase<List<T>> {
// @ts-ignore
public readonly view: ListView<T>;
- constructor(data: Data<T>, view: View<List<T>> = new ListView(data)) {
+ constructor(data: Data<List<T>>, view: ListView<T> = new ListView<T>(data as any)) {
super(data, view);
}
public getChildAt(index: number): Vector<T> | null {
@@ -439,22 +439,22 @@ export const createVector = ((VectorLoader: new <T extends DataType>(data: Data<
<T extends DataType>(data: Data<T>) => TypeVisitor.visitTypeInline(new VectorLoader(data), data.type) as Vector<T>
))(class VectorLoader<T extends DataType> extends TypeVisitor {
constructor(private data: Data<T>) { super(); }
- visitNull (_type: Null) { return new NullVector(this.data); }
- visitInt (_type: Int) { return new IntVector(this.data); }
- visitFloat (_type: Float) { return new FloatVector(this.data); }
- visitBinary (_type: Binary) { return new BinaryVector(this.data); }
- visitUtf8 (_type: Utf8) { return new Utf8Vector(this.data); }
- visitBool (_type: Bool) { return new BoolVector(this.data); }
- visitDecimal (_type: Decimal) { return new DecimalVector(this.data); }
- visitDate (_type: Date_) { return new DateVector(this.data); }
- visitTime (_type: Time) { return new TimeVector(this.data); }
- visitTimestamp (_type: Timestamp) { return new TimestampVector(this.data); }
- visitInterval (_type: Interval) { return new IntervalVector(this.data); }
- visitList (_type: List) { return new ListVector(this.data); }
- visitStruct (_type: Struct) { return new StructVector(this.data); }
- visitUnion (_type: Union) { return new UnionVector(this.data); }
- visitFixedSizeBinary(_type: FixedSizeBinary) { return new FixedSizeBinaryVector(this.data); }
- visitFixedSizeList (_type: FixedSizeList) { return new FixedSizeListVector(this.data); }
- visitMap (_type: Map_) { return new MapVector(this.data); }
- visitDictionary (_type: Dictionary) { return new DictionaryVector(this.data); }
+ visitNull (_type: Null) { return new NullVector(<any> this.data); }
+ visitInt (_type: Int) { return new IntVector(<any> this.data); }
+ visitFloat (_type: Float) { return new FloatVector(<any> this.data); }
+ visitBinary (_type: Binary) { return new BinaryVector(<any> this.data); }
+ visitUtf8 (_type: Utf8) { return new Utf8Vector(<any> this.data); }
+ visitBool (_type: Bool) { return new BoolVector(<any> this.data); }
+ visitDecimal (_type: Decimal) { return new DecimalVector(<any> this.data); }
+ visitDate (_type: Date_) { return new DateVector(<any> this.data); }
+ visitTime (_type: Time) { return new TimeVector(<any> this.data); }
+ visitTimestamp (_type: Timestamp) { return new TimestampVector(<any> this.data); }
+ visitInterval (_type: Interval) { return new IntervalVector(<any> this.data); }
+ visitList (_type: List) { return new ListVector(<any> this.data); }
+ visitStruct (_type: Struct) { return new StructVector(<any> this.data); }
+ visitUnion (_type: Union) { return new UnionVector(<any> this.data); }
+ visitFixedSizeBinary(_type: FixedSizeBinary) { return new FixedSizeBinaryVector(<any> this.data); }
+ visitFixedSizeList (_type: FixedSizeList) { return new FixedSizeListVector(<any> this.data); }
+ visitMap (_type: Map_) { return new MapVector(<any> this.data); }
+ visitDictionary (_type: Dictionary) { return new DictionaryVector(<any> this.data); }
});
diff --git a/js/src/vector/chunked.ts b/js/src/vector/chunked.ts
index 7876bba..8e96d34 100644
--- a/js/src/vector/chunked.ts
+++ b/js/src/vector/chunked.ts
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-import { ChunkedData } from '../data';
+import { ChunkedData, Data } from '../data';
import { View, Vector, NestedVector } from '../vector';
import { DataType, TypedArray, IterableArrayLike } from '../type';
@@ -28,7 +28,7 @@ export class ChunkedView<T extends DataType> implements View<T> {
this.chunkVectors = data.chunkVectors;
this.chunkOffsets = data.chunkOffsets;
}
- public clone(data: ChunkedData<T>): this {
+ public clone(data: ChunkedData<T> & Data<T>): this {
return new ChunkedView(data) as this;
}
public *[Symbol.iterator](): IterableIterator<T['TValue'] | null> {
diff --git a/js/src/vector/dictionary.ts b/js/src/vector/dictionary.ts
index 21f9bac..7014cda 100644
--- a/js/src/vector/dictionary.ts
+++ b/js/src/vector/dictionary.ts
@@ -26,7 +26,7 @@ export class DictionaryView<T extends DataType> implements View<T> {
this.indices = indices;
this.dictionary = dictionary;
}
- public clone(data: Data<Dictionary<T>>): this {
+ public clone(data: Data<Dictionary<T>> & Data<T>): this {
return new DictionaryView(data.dictionary, this.indices.clone(data.indices)) as this;
}
public isValid(index: number): boolean {
diff --git a/js/src/vector/list.ts b/js/src/vector/list.ts
index f1283f4..8e7560e 100644
--- a/js/src/vector/list.ts
+++ b/js/src/vector/list.ts
@@ -83,8 +83,8 @@ export abstract class VariableListViewBase<T extends (ListType | FlatListType)>
export class ListView<T extends DataType> extends VariableListViewBase<List<T>> {
public values: Vector<T>;
constructor(data: Data<T>) {
- super(data);
- this.values = createVector(data.values);
+ super(data as any);
+ this.values = createVector((data as any).values);
}
public getChildAt<R extends T = T>(index: number): Vector<R> | null {
return index === 0 ? (this.values as Vector<R>) : null;
diff --git a/js/test/Arrow.ts b/js/test/Arrow.ts
index 36f98d4..4aac952 100644
--- a/js/test/Arrow.ts
+++ b/js/test/Arrow.ts
@@ -18,6 +18,8 @@
/* tslint:disable */
// Dynamically load an Arrow target build based on command line arguments
+(<any> global).window = (<any> global).window || global;
+
// Fix for Jest in node v10.x
Object.defineProperty(ArrayBuffer, Symbol.hasInstance, {
writable: true,