You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by mt...@apache.org on 2016/02/06 19:52:33 UTC
[1/2] avro git commit: AVRO-1778. JavaScript: Add IPC/RPC support.
Repository: avro
Updated Branches:
refs/heads/master a50068f11 -> 9101a42ba
http://git-wip-us.apache.org/repos/asf/avro/blob/9101a42b/lang/js/test/test_protocols.js
----------------------------------------------------------------------
diff --git a/lang/js/test/test_protocols.js b/lang/js/test/test_protocols.js
new file mode 100644
index 0000000..08ef784
--- /dev/null
+++ b/lang/js/test/test_protocols.js
@@ -0,0 +1,1392 @@
+/* jshint node: true, mocha: true */
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+'use strict';
+
+var protocols = require('../lib/protocols'),
+ utils = require('../lib/utils'),
+ assert = require('assert'),
+ stream = require('stream'),
+ util = require('util');
+
+
+var HANDSHAKE_REQUEST_TYPE = protocols.HANDSHAKE_REQUEST_TYPE;
+var HANDSHAKE_RESPONSE_TYPE = protocols.HANDSHAKE_RESPONSE_TYPE;
+var createProtocol = protocols.createProtocol;
+
+
+suite('protocols', function () {
+
+ suite('Protocol', function () {
+
+ test('get name and types', function () {
+ var p = createProtocol({
+ namespace: 'foo',
+ protocol: 'HelloWorld',
+ types: [
+ {
+ name: 'Greeting',
+ type: 'record',
+ fields: [{name: 'message', type: 'string'}]
+ },
+ {
+ name: 'Curse',
+ type: 'error',
+ fields: [{name: 'message', type: 'string'}]
+ }
+ ],
+ messages: {
+ hello: {
+ request: [{name: 'greeting', type: 'Greeting'}],
+ response: 'Greeting',
+ errors: ['Curse']
+ },
+ hi: {
+ request: [{name: 'hey', type: 'string'}],
+ response: 'null',
+ 'one-way': true
+ }
+ }
+ });
+ assert.equal(p.getName(), 'foo.HelloWorld');
+ assert.equal(p.getType('foo.Greeting').getName(true), 'record');
+ });
+
+ test('missing message', function () {
+ var ptcl = createProtocol({namespace: 'com.acme', protocol: 'Hello'});
+ assert.throws(function () {
+ ptcl.on('add', function () {});
+ }, /unknown/);
+ });
+
+ test('missing name', function () {
+ assert.throws(function () {
+ createProtocol({namespace: 'com.acme', messages: {}});
+ });
+ });
+
+ test('missing type', function () {
+ assert.throws(function () {
+ createProtocol({
+ namespace: 'com.acme',
+ protocol: 'HelloWorld',
+ messages: {
+ hello: {
+ request: [{name: 'greeting', type: 'Greeting'}],
+ response: 'Greeting'
+ }
+ }
+ });
+ });
+ });
+
+ test('get messages', function () {
+ var ptcl;
+ ptcl = createProtocol({protocol: 'Empty'});
+ assert.deepEqual(ptcl.getMessages(), {});
+ ptcl = createProtocol({
+ protocol: 'Ping',
+ messages: {
+ ping: {
+ request: [],
+ response: 'string'
+ }
+ }
+ });
+ var messages = ptcl.getMessages();
+ assert.equal(Object.keys(messages).length, 1);
+ assert(messages.ping !== undefined);
+ });
+
+ test('create listener', function (done) {
+ var ptcl = createProtocol({protocol: 'Empty'});
+ var transport = new stream.PassThrough();
+ var ee = ptcl.createListener(transport, function (pending) {
+ assert.equal(pending, 0);
+ done();
+ });
+ ee.destroy();
+ });
+
+ test('subprotocol', function () {
+ var ptcl = createProtocol({namespace: 'com.acme', protocol: 'Hello'});
+ var subptcl = ptcl.subprotocol();
+ assert.strictEqual(subptcl._emitterResolvers, ptcl._emitterResolvers);
+ assert.strictEqual(subptcl._listenerResolvers, ptcl._listenerResolvers);
+ });
+
+ test('invalid emitter', function (done) {
+ var ptcl = createProtocol({protocol: 'Empty'});
+ ptcl.emit('hi', {}, null, function (err) {
+ assert(/invalid emitter/.test(err.string));
+ done();
+ });
+ });
+
+ test('inspect', function () {
+ var p = createProtocol({
+ namespace: 'hello',
+ protocol: 'World',
+ });
+ assert.equal(p.inspect(), '<Protocol "hello.World">');
+ });
+
+ });
+
+ suite('Message', function () {
+
+ var Message = protocols.Message;
+
+ test('empty errors', function () {
+ var m = new Message('Hi', {
+ request: [{name: 'greeting', type: 'string'}],
+ response: 'int'
+ });
+ assert.deepEqual(m.errorType.toString(), '["string"]');
+ });
+
+ test('missing response', function () {
+ assert.throws(function () {
+ new Message('Hi', {
+ request: [{name: 'greeting', type: 'string'}]
+ });
+ });
+ });
+
+ test('invalid one-way', function () {
+ // Non-null response.
+ assert.throws(function () {
+ new Message('Hi', {
+ request: [{name: 'greeting', type: 'string'}],
+ response: 'string',
+ 'one-way': true
+ });
+ });
+ // Non-empty errors.
+ assert.throws(function () {
+ new Message('Hi', {
+ request: [{name: 'greeting', type: 'string'}],
+ response: 'null',
+ errors: ['int'],
+ 'one-way': true
+ });
+ });
+ });
+
+ });
+
+ suite('MessageDecoder', function () {
+
+ var MessageDecoder = protocols.streams.MessageDecoder;
+
+ test('ok', function (done) {
+ var parts = [
+ new Buffer([0, 1]),
+ new Buffer([2]),
+ new Buffer([]),
+ new Buffer([3, 4, 5]),
+ new Buffer([])
+ ];
+ var messages = [];
+ var readable = createReadableStream(parts.map(frame), true);
+ var writable = createWritableStream(messages, true)
+ .on('finish', function () {
+ assert.deepEqual(
+ messages,
+ [new Buffer([0, 1, 2]), new Buffer([3, 4, 5])]
+ );
+ done();
+ });
+ readable.pipe(new MessageDecoder()).pipe(writable);
+ });
+
+ test('trailing data', function (done) {
+ var parts = [
+ new Buffer([0, 1]),
+ new Buffer([2]),
+ new Buffer([]),
+ new Buffer([3])
+ ];
+ var messages = [];
+ var readable = createReadableStream(parts.map(frame), true);
+ var writable = createWritableStream(messages, true);
+ readable
+ .pipe(new MessageDecoder())
+ .on('error', function () {
+ assert.deepEqual(messages, [new Buffer([0, 1, 2])]);
+ done();
+ })
+ .pipe(writable);
+ });
+
+ test('empty', function (done) {
+ var readable = createReadableStream([], true);
+ readable
+ .pipe(new MessageDecoder(true))
+ .on('error', function () { done(); });
+ });
+
+ });
+
+ suite('MessageEncoder', function () {
+
+ var MessageEncoder = protocols.streams.MessageEncoder;
+
+ test('invalid frame size', function () {
+ assert.throws(function () { new MessageEncoder(); });
+ });
+
+ test('ok', function (done) {
+ var messages = [
+ new Buffer([0, 1]),
+ new Buffer([2])
+ ];
+ var frames = [];
+ var readable = createReadableStream(messages, true);
+ var writable = createWritableStream(frames, true);
+ readable
+ .pipe(new MessageEncoder(64))
+ .pipe(writable)
+ .on('finish', function () {
+ assert.deepEqual(
+ frames,
+ [
+ new Buffer([0, 0, 0, 2, 0, 1, 0, 0, 0, 0]),
+ new Buffer([0, 0, 0, 1, 2, 0, 0, 0, 0])
+ ]
+ );
+ done();
+ });
+ });
+
+ test('all zeros', function (done) {
+ var messages = [new Buffer([0, 0, 0, 0])];
+ var frames = [];
+ var readable = createReadableStream(messages, true);
+ var writable = createWritableStream(frames, true);
+ readable
+ .pipe(new MessageEncoder(64))
+ .pipe(writable)
+ .on('finish', function () {
+ assert.deepEqual(
+ frames,
+ [new Buffer([0, 0, 0, 4, 0, 0, 0, 0, 0, 0, 0, 0])]
+ );
+ done();
+ });
+ });
+
+ test('short frame size', function (done) {
+ var messages = [
+ new Buffer([0, 1, 2]),
+ new Buffer([2])
+ ];
+ var frames = [];
+ var readable = createReadableStream(messages, true);
+ var writable = createWritableStream(frames, true);
+ readable
+ .pipe(new MessageEncoder(2))
+ .pipe(writable)
+ .on('finish', function () {
+ assert.deepEqual(
+ frames,
+ [
+ new Buffer([0, 0, 0, 2, 0, 1, 0, 0, 0, 1, 2, 0, 0, 0, 0]),
+ new Buffer([0, 0, 0, 1, 2, 0, 0, 0, 0])
+ ]
+ );
+ done();
+ });
+ });
+
+ });
+
+ suite('StatefulEmitter', function () {
+
+ test('ok handshake', function (done) {
+ var buf = HANDSHAKE_RESPONSE_TYPE.toBuffer({match: 'BOTH'});
+ var bufs = [];
+ var ptcl = createProtocol({protocol: 'Empty'});
+ var handshake = false;
+ ptcl.createEmitter(createTransport([buf], bufs))
+ .on('handshake', function (req, res) {
+ handshake = true;
+ assert(res.match === 'BOTH');
+ assert.deepEqual(
+ Buffer.concat(bufs),
+ HANDSHAKE_REQUEST_TYPE.toBuffer({
+ clientHash: new Buffer(ptcl._hashString, 'binary'),
+ serverHash: new Buffer(ptcl._hashString, 'binary')
+ })
+ );
+ this.destroy();
+ })
+ .on('eot', function () {
+ assert(handshake);
+ done();
+ });
+ });
+
+ test('no server match handshake', function (done) {
+ var ptcl = createProtocol({protocol: 'Empty'});
+ var resBufs = [
+ {
+ match: 'NONE',
+ serverHash: {'org.apache.avro.ipc.MD5': new Buffer(16)},
+ serverProtocol: {string: ptcl.toString()},
+ },
+ {match: 'BOTH'}
+ ].map(function (val) { return HANDSHAKE_RESPONSE_TYPE.toBuffer(val); });
+ var reqBufs = [];
+ var handshakes = 0;
+ ptcl.createEmitter(createTransport(resBufs, reqBufs))
+ .on('handshake', function (req, res) {
+ if (handshakes++) {
+ assert(res.match === 'BOTH');
+ this.destroy();
+ } else {
+ assert(res.match === 'NONE');
+ }
+ })
+ .on('eot', function () {
+ assert.equal(handshakes, 2);
+ done();
+ });
+ });
+
+ test('incompatible protocol', function (done) {
+ var ptcl = createProtocol({protocol: 'Empty'});
+ var hash = new Buffer(16); // Pretend the hash was different.
+ var resBufs = [
+ {
+ match: 'NONE',
+ serverHash: {'org.apache.avro.ipc.MD5': hash},
+ serverProtocol: {string: ptcl.toString()},
+ },
+ {
+ match: 'NONE',
+ serverHash: {'org.apache.avro.ipc.MD5': hash},
+ serverProtocol: {string: ptcl.toString()},
+ meta: {map: {error: new Buffer('abcd')}}
+ }
+ ].map(function (val) { return HANDSHAKE_RESPONSE_TYPE.toBuffer(val); });
+ var error = false;
+ ptcl.createEmitter(createTransport(resBufs, []))
+ .on('error', function (err) {
+ error = true;
+ assert.equal(err.message, 'abcd');
+ })
+ .on('eot', function () {
+ assert(error);
+ done();
+ });
+ });
+
+ test('handshake error', function (done) {
+ var resBufs = [
+ new Buffer([4, 0, 0]), // Invalid handshakes.
+ new Buffer([4, 0, 0])
+ ];
+ var ptcl = createProtocol({protocol: 'Empty'});
+ var error = false;
+ ptcl.createEmitter(createTransport(resBufs, []))
+ .on('error', function (err) {
+ error = true;
+ assert.equal(err.message, 'handshake error');
+ })
+ .on('eot', function () {
+ assert(error);
+ done();
+ });
+ });
+
+ test('orphan response', function (done) {
+ var ptcl = createProtocol({protocol: 'Empty'});
+ var idType = protocols.IdType.createMetadataType();
+ var resBufs = [
+ new Buffer([0, 0, 0]), // OK handshake.
+ idType.toBuffer(23)
+ ];
+ var error = false;
+ ptcl.createEmitter(createTransport(resBufs, []))
+ .on('error', function (err) {
+ error = true;
+ assert(/orphan response:/.test(err.message));
+ })
+ .on('eot', function () {
+ assert(error);
+ done();
+ });
+ });
+
+ test('ended readable', function (done) {
+ var bufs = [];
+ var ptcl = createProtocol({protocol: 'Empty'});
+ ptcl.createEmitter(createTransport([], bufs))
+ .on('eot', function () {
+ assert.equal(bufs.length, 1); // A single handshake was sent.
+ done();
+ });
+ });
+
+ test('interrupted', function (done) {
+ var ptcl = createProtocol({
+ protocol: 'Empty',
+ messages: {
+ id: {request: [{name: 'id', type: 'int'}], response: 'int'}
+ }
+ });
+ var resBufs = [
+ new Buffer([0, 0, 0]), // OK handshake.
+ ];
+ var interrupted = 0;
+ var transport = createTransport(resBufs, []);
+ var ee = ptcl.createEmitter(transport, function () {
+ assert.equal(interrupted, 2);
+ done();
+ });
+
+ ptcl.emit('id', {id: 123}, ee, cb);
+ ptcl.emit('id', {id: 123}, ee, cb);
+
+ function cb(err) {
+ assert.deepEqual(err, {string: 'interrupted'});
+ interrupted++;
+ }
+ });
+
+ test('missing client message', function (done) {
+ var ptcl1 = createProtocol({
+ protocol: 'Ping',
+ messages: {
+ ping: {request: [], response: 'string'}
+ }
+ });
+ var ptcl2 = createProtocol({
+ protocol: 'Ping',
+ messages: {
+ ping: {request: [], response: 'string'},
+ pong: {request: [], response: 'string'}
+ }
+ }).on('ping', function (req, ee, cb) { cb(null, 'ok'); });
+ var transports = createPassthroughTransports();
+ ptcl2.createListener(transports[1]);
+ var ee = ptcl1.createEmitter(transports[0]);
+ ptcl1.emit('ping', {}, ee, function (err, res) {
+ assert.equal(res, 'ok');
+ done();
+ });
+ });
+
+ test('missing server message', function (done) {
+ var ptcl1 = createProtocol({
+ protocol: 'Ping',
+ messages: {
+ ping: {request: [], response: 'string'}
+ }
+ });
+ var ptcl2 = createProtocol({protocol: 'Empty'});
+ var transports = createPassthroughTransports();
+ ptcl2.createListener(transports[1]);
+ ptcl1.createEmitter(transports[0])
+ .on('error', function (err) {
+ assert(/missing server message: ping/.test(err.message));
+ done();
+ });
+ });
+
+ test('trailing data', function (done) {
+ var ptcl = createProtocol({
+ protocol: 'Ping',
+ messages: {
+ ping: {request: [], response: 'string'}
+ }
+ });
+ var transports = createPassthroughTransports();
+ ptcl.createEmitter(transports[0])
+ .on('error', function (err) {
+ assert(/trailing data/.test(err.message));
+ done();
+ });
+ transports[0].readable.end(new Buffer([2, 3]));
+ });
+
+ test('invalid metadata', function (done) {
+ var ptcl = createProtocol({
+ protocol: 'Ping',
+ messages: {
+ ping: {request: [], response: 'string'}
+ }
+ });
+ var transports = createPassthroughTransports();
+ ptcl.createListener(transports[1]);
+ ptcl.createEmitter(transports[0])
+ .on('error', function (err) {
+ assert(/invalid metadata:/.test(err.message));
+ done();
+ })
+ .on('handshake', function () {
+ transports[0].readable.write(frame(new Buffer([2, 3])));
+ transports[0].readable.write(frame(new Buffer(0)));
+ });
+ });
+
+ test('invalid response', function (done) {
+ var ptcl = createProtocol({
+ protocol: 'Ping',
+ messages: {
+ ping: {request: [], response: 'string'}
+ }
+ });
+ var transports = createPassthroughTransports();
+ var ml = ptcl.createListener(transports[1]);
+ var me = ptcl.createEmitter(transports[0])
+ .on('handshake', function () {
+ ml.destroy();
+
+ ptcl.emit('ping', {}, me, function (err) {
+ assert(/invalid response:/.test(err.string));
+ done();
+ });
+
+ var idType = protocols.IdType.createMetadataType();
+ var bufs = [
+ idType.toBuffer(1), // Metadata.
+ new Buffer([3]) // Invalid response.
+ ];
+ transports[0].readable.write(frame(Buffer.concat(bufs)));
+ transports[0].readable.write(frame(new Buffer(0)));
+ });
+ });
+
+ test('one way', function (done) {
+ var beats = 0;
+ var ptcl = createProtocol({
+ protocol: 'Heartbeat',
+ messages: {
+ beat: {request: [], response: 'null', 'one-way': true}
+ }
+ }).on('beat', function (req, ee, cb) {
+ assert.strictEqual(cb, undefined);
+ if (++beats === 2) {
+ done();
+ }
+ });
+ var transports = createPassthroughTransports();
+ ptcl.createListener(transports[1]);
+ var ee = ptcl.createEmitter(transports[0]);
+ ptcl.emit('beat', {}, ee);
+ ptcl.emit('beat', {}, ee);
+ });
+
+ });
+
+ suite('StatelessEmitter', function () {
+
+ test('interrupted before response data', function (done) {
+ var ptcl = createProtocol({
+ protocol: 'Ping',
+ messages: {ping: {request: [], response: 'boolean'}}
+ });
+ var readable = stream.PassThrough()
+ .on('end', done);
+ var writable = createWritableStream([]);
+ var ee = ptcl.createEmitter(function (cb) {
+ cb(readable);
+ return writable;
+ });
+ ptcl.emit('ping', {}, ee, function (err) {
+ assert(/interrupted/.test(err.string));
+ readable.write(frame(new Buffer(2)));
+ readable.end(frame(new Buffer(0)));
+ });
+ ee.destroy(true);
+ });
+
+ });
+
+ suite('StatefulListener', function () {
+
+ test('end readable', function (done) {
+ var ptcl = createProtocol({protocol: 'Empty'});
+ var transports = createPassthroughTransports();
+ ptcl.createListener(transports[0])
+ .on('eot', function (pending) {
+ assert.equal(pending, 0);
+ done();
+ });
+ transports[0].readable.end();
+ });
+
+ test('finish writable', function (done) {
+ var ptcl = createProtocol({protocol: 'Empty'});
+ var transports = createPassthroughTransports();
+ ptcl.createListener(transports[0])
+ .on('eot', function (pending) {
+ assert.equal(pending, 0);
+ done();
+ });
+ transports[0].writable.end();
+ });
+
+ test('invalid handshake', function (done) {
+ var ptcl = createProtocol({protocol: 'Empty'});
+ var transport = createTransport(
+ [new Buffer([4])], // Invalid handshake.
+ []
+ );
+ ptcl.createListener(transport)
+ .on('handshake', function (req, res) {
+ assert(!req.$isValid());
+ assert.equal(res.match, 'NONE');
+ done();
+ });
+ });
+
+ test('missing server message', function (done) {
+ var ptcl1 = createProtocol({protocol: 'Empty'});
+ var ptcl2 = createProtocol({
+ protocol: 'Heartbeat',
+ messages: {beat: {request: [], response: 'boolean'}}
+ });
+ var hash = new Buffer(ptcl2._hashString, 'binary');
+ var req = {
+ clientHash: hash,
+ clientProtocol: {string: ptcl2.toString()},
+ serverHash: hash
+ };
+ var transport = createTransport(
+ [HANDSHAKE_REQUEST_TYPE.toBuffer(req)],
+ []
+ );
+ ptcl1.createListener(transport)
+ .on('handshake', function (req, res) {
+ assert(req.$isValid());
+ assert.equal(res.match, 'NONE');
+ var msg = res.meta.map.error.toString();
+ assert(/missing server message/.test(msg));
+ done();
+ });
+ });
+
+ test('invalid metadata', function (done) {
+ var ptcl = createProtocol({
+ protocol: 'Heartbeat',
+ messages: {beat: {request: [], response: 'boolean'}}
+ });
+ var transports = createPassthroughTransports();
+ ptcl.createListener(transports[1])
+ .on('error', function (err) {
+ assert(/invalid metadata/.test(err.message));
+ done();
+ });
+ ptcl.createEmitter(transports[0])
+ .on('handshake', function () {
+ // Handshake is complete now.
+ var writable = transports[0].writable;
+ writable.write(frame(new Buffer([0]))); // Empty metadata.
+ writable.write(frame(new Buffer(0)));
+ });
+ });
+
+ test('unknown message', function (done) {
+ var ptcl = createProtocol({
+ protocol: 'Heartbeat',
+ messages: {beat: {request: [], response: 'boolean'}}
+ });
+ var transports = createPassthroughTransports();
+ var ee = ptcl.createListener(transports[1])
+ .on('eot', function () {
+ transports[1].writable.end();
+ });
+ ptcl.createEmitter(transports[0])
+ .on('handshake', function () {
+ // Handshake is complete now.
+ this.destroy();
+ var idType = ee._idType;
+ var bufs = [];
+ transports[0].readable
+ .pipe(new protocols.streams.MessageDecoder())
+ .on('data', function (buf) { bufs.push(buf); })
+ .on('end', function () {
+ assert.equal(bufs.length, 1);
+ var tap = new utils.Tap(bufs[0]);
+ idType._read(tap);
+ assert(tap.buf[tap.pos++]); // Error byte.
+ tap.pos++; // Union marker.
+ assert(/unknown message/.test(tap.readString()));
+ done();
+ });
+ [
+ idType.toBuffer(-1),
+ new Buffer([4, 104, 105]), // `hi` message.
+ new Buffer(0) // End of frame.
+ ].forEach(function (buf) {
+ transports[0].writable.write(frame(buf));
+ });
+ transports[0].writable.end();
+ });
+ });
+
+ test('invalid request', function (done) {
+ var ptcl = createProtocol({
+ protocol: 'Heartbeat',
+ messages: {beat: {
+ request: [{name: 'id', type: 'string'}],
+ response: 'boolean'
+ }}
+ });
+ var transports = createPassthroughTransports();
+ var ee = ptcl.createListener(transports[1])
+ .on('eot', function () { transports[1].writable.end(); });
+ ptcl.createEmitter(transports[0])
+ .on('handshake', function () {
+ // Handshake is complete now.
+ this.destroy();
+ var idType = ee._idType;
+ var bufs = [];
+ transports[0].readable
+ .pipe(new protocols.streams.MessageDecoder())
+ .on('data', function (buf) { bufs.push(buf); })
+ .on('end', function () {
+ assert.equal(bufs.length, 1);
+ var tap = new utils.Tap(bufs[0]);
+ idType._read(tap);
+ assert.equal(tap.buf[tap.pos++], 1); // Error byte.
+ assert.equal(tap.buf[tap.pos++], 0); // Union marker.
+ assert(/invalid request/.test(tap.readString()));
+ done();
+ });
+ [
+ idType.toBuffer(-1),
+ new Buffer([8, 98, 101, 97, 116]), // `beat` message.
+ new Buffer([8]), // Invalid Avro string encoding.
+ new Buffer(0) // End of frame.
+ ].forEach(function (buf) {
+ transports[0].writable.write(frame(buf));
+ });
+ transports[0].writable.end();
+ });
+ });
+
+ test('destroy', function (done) {
+ var ptcl = createProtocol({
+ protocol: 'Heartbeat',
+ messages: {beat: {request: [], response: 'boolean'}}
+ }).on('beat', function (req, ee, cb) {
+ ee.destroy();
+ setTimeout(function () { cb(null, true); }, 10);
+ });
+ var transports = createPassthroughTransports();
+ var responded = false;
+ ptcl.createListener(transports[1])
+ .on('eot', function () {
+ assert(responded); // Works because the transport is sync.
+ done();
+ });
+ ptcl.emit('beat', {}, ptcl.createEmitter(transports[0]), function () {
+ responded = true;
+ });
+ });
+
+ });
+
+ suite('StatelessListener', function () {
+
+ // A stateless listener receiving a call for a message it does not know must
+ // answer with an error response whose string mentions "unknown message".
+ test('unknown message', function (done) {
+ var ptcl = createProtocol({
+ protocol: 'Heartbeat',
+ messages: {beat: {request: [], response: 'boolean'}}
+ });
+ var readable = new stream.PassThrough();
+ var writable = new stream.PassThrough();
+ // Stateless transport factory: hands the response stream to the listener
+ // and returns the stream requests are read from.
+ var ee = ptcl.createListener(function (cb) {
+ cb(writable);
+ return readable;
+ });
+ var bufs = [];
+ // Decode whatever the listener writes back and inspect the single response.
+ writable.pipe(new protocols.streams.MessageDecoder())
+ .on('data', function (buf) { bufs.push(buf); })
+ .on('end', function () {
+ assert.equal(bufs.length, 1);
+ var tap = new utils.Tap(bufs[0]);
+ tap.pos = 4; // Skip handshake response.
+ ee._idType._read(tap); // Skip metadata.
+ assert.equal(tap.buf[tap.pos++], 1); // Error.
+ assert.equal(tap.buf[tap.pos++], 0); // Union flag.
+ assert(/unknown message/.test(tap.readString()));
+ done();
+ });
+ var hash = new Buffer(ptcl._hashString, 'binary');
+ // Handshake request reusing the protocol's own hash for both sides.
+ var req = {
+ clientHash: hash,
+ clientProtocol: null,
+ serverHash: hash
+ };
+ var encoder = new protocols.streams.MessageEncoder(64);
+ encoder.pipe(readable);
+ // Send the handshake and the call together as one message.
+ encoder.end(Buffer.concat([
+ HANDSHAKE_REQUEST_TYPE.toBuffer(req),
+ new Buffer([0]), // Empty metadata.
+ new Buffer([4, 104, 105]) // `hi` message.
+ ]));
+ });
+
+ test('late writable', function (done) {
+ var ptcl = createProtocol({
+ protocol: 'Heartbeat',
+ messages: {beat: {request: [], response: 'boolean'}}
+ }).on('beat', function (req, ee, cb) {
+ cb(null, true);
+ });
+ var readable = new stream.PassThrough();
+ var writable = new stream.PassThrough();
+ ptcl.createListener(function (cb) {
+ setTimeout(function () { cb(readable); }, 10);
+ return writable;
+ });
+ var ee = ptcl.createEmitter(function (cb) {
+ cb(readable);
+ return writable;
+ });
+ ptcl.emit('beat', {}, ee, function (err, res) {
+ assert.strictEqual(err, null);
+ assert.equal(res, true);
+ done();
+ });
+ });
+
+ });
+
+ suite('emit', function () {
+
+ suite('stateful', function () {
+
+ run(function (emitterPtcl, listenerPtcl, cb) {
+ var pt1 = new stream.PassThrough();
+ var pt2 = new stream.PassThrough();
+ var opts = {bufferSize: 48};
+ cb(
+ emitterPtcl.createEmitter({readable: pt1, writable: pt2}, opts),
+ listenerPtcl.createListener({readable: pt2, writable: pt1}, opts)
+ );
+ });
+
+ });
+
+ suite('stateless', function () {
+
+ run(function (emitterPtcl, listenerPtcl, cb) {
+ cb(emitterPtcl.createEmitter(writableFactory));
+
+ function writableFactory(emitterCb) {
+ var reqPt = new stream.PassThrough()
+ .on('finish', function () {
+ listenerPtcl.createListener(function (listenerCb) {
+ var resPt = new stream.PassThrough()
+ .on('finish', function () { emitterCb(resPt); });
+ listenerCb(resPt);
+ return reqPt;
+ });
+ });
+ return reqPt;
+ }
+ });
+
+ });
+
+ function run(setupFn) {
+
+ test('single', function (done) {
+ var ptcl = createProtocol({
+ protocol: 'Math',
+ messages: {
+ negate: {
+ request: [{name: 'n', type: 'int'}],
+ response: 'int'
+ }
+ }
+ });
+ setupFn(ptcl, ptcl, function (ee) {
+ ee.on('eot', function () { done(); });
+ ptcl.on('negate', function (req, ee, cb) { cb(null, -req.n); });
+ ptcl.emit('negate', {n: 20}, ee, function (err, res) {
+ assert.equal(this, ptcl);
+ assert.strictEqual(err, null);
+ assert.equal(res, -20);
+ this.emit('negate', {n: 'hi'}, ee, function (err) {
+ assert(/invalid "int"/.test(err.string));
+ ee.destroy();
+ });
+ });
+ });
+ });
+
+ test('invalid request', function (done) {
+ var ptcl = createProtocol({
+ protocol: 'Math',
+ messages: {
+ negate: {
+ request: [{name: 'n', type: 'int'}],
+ response: 'int'
+ }
+ }
+ }).on('negate', function () { assert(false); });
+ setupFn(ptcl, ptcl, function (ee) {
+ ee.on('eot', function () { done(); });
+ ptcl.emit('negate', {n: 'a'}, ee, function (err) {
+ assert(/invalid "int"/.test(err.string), null);
+ ee.destroy();
+ });
+ });
+ });
+
+ test('error response', function (done) {
+ var msg = 'must be non-negative';
+ var ptcl = createProtocol({
+ protocol: 'Math',
+ messages: {
+ sqrt: {
+ request: [{name: 'n', type: 'float'}],
+ response: 'float'
+ }
+ }
+ }).on('sqrt', function (req, ee, cb) {
+ var n = req.n;
+ if (n < 0) {
+ cb({string: msg});
+ } else {
+ cb(null, Math.sqrt(n));
+ }
+ });
+ setupFn(ptcl, ptcl, function (ee) {
+ ptcl.emit('sqrt', {n: 100}, ee, function (err, res) {
+ assert(Math.abs(res - 10) < 1e-5);
+ ptcl.emit('sqrt', {n: - 10}, ee, function (err) {
+ assert.equal(this, ptcl);
+ assert.equal(err.string, msg);
+ done();
+ });
+ });
+ });
+ });
+
+ test('invalid response', function (done) {
+ var ptcl = createProtocol({
+ protocol: 'Math',
+ messages: {
+ sqrt: {
+ request: [{name: 'n', type: 'float'}],
+ response: 'float'
+ }
+ }
+ }).on('sqrt', function (req, ee, cb) {
+ var n = req.n;
+ if (n < 0) {
+ cb(null, 'complex'); // Invalid response.
+ } else {
+ cb(null, Math.sqrt(n));
+ }
+ });
+ setupFn(ptcl, ptcl, function (ee) {
+ ptcl.emit('sqrt', {n: - 10}, ee, function (err) {
+ // The server error message is propagated to the client.
+ assert(/invalid "float"/.test(err.string));
+ ptcl.emit('sqrt', {n: 100}, ee, function (err, res) {
+ // And the server doesn't die (we can make a new request).
+ assert(Math.abs(res - 10) < 1e-5);
+ done();
+ });
+ });
+ });
+ });
+
+ test('invalid error', function (done) {
+ var ptcl = createProtocol({
+ protocol: 'Math',
+ messages: {
+ sqrt: {
+ request: [{name: 'n', type: 'float'}],
+ response: 'float'
+ }
+ }
+ }).on('sqrt', function (req, ee, cb) {
+ var n = req.n;
+ if (n < 0) {
+ cb({error: 'complex'}); // Invalid error.
+ } else {
+ cb(null, Math.sqrt(n));
+ }
+ });
+ setupFn(ptcl, ptcl, function (ee) {
+ ptcl.emit('sqrt', {n: - 10}, ee, function (err) {
+ assert(/invalid \["string"\]/.test(err.string));
+ ptcl.emit('sqrt', {n: 100}, ee, function (err, res) {
+ // The server still doesn't die (we can make a new request).
+ assert(Math.abs(res - 10) < 1e-5);
+ done();
+ });
+ });
+ });
+ });
+
+ test('out of order', function (done) {
+ var ptcl = createProtocol({
+ protocol: 'Delay',
+ messages: {
+ wait: {
+ request: [
+ {name: 'ms', type: 'float'},
+ {name: 'id', type: 'string'}
+ ],
+ response: 'string'
+ }
+ }
+ }).on('wait', function (req, ee, cb) {
+ var delay = req.ms;
+ if (delay < 0) {
+ cb(new Error('delay must be non-negative'));
+ return;
+ }
+ setTimeout(function () { cb(null, req.id); }, delay);
+ });
+ var ids = [];
+ setupFn(ptcl, ptcl, function (ee) {
+ ee.on('eot', function (pending) {
+ assert.equal(pending, 0);
+ assert.deepEqual(ids, [null, 'b', 'a']);
+ done();
+ });
+ ptcl.emit('wait', {ms: 100, id: 'a'}, ee, function (err, res) {
+ assert.strictEqual(err, null);
+ ids.push(res);
+ });
+ ptcl.emit('wait', {ms: 10, id: 'b'}, ee, function (err, res) {
+ assert.strictEqual(err, null);
+ ids.push(res);
+ ee.destroy();
+ });
+ ptcl.emit('wait', {ms: -100, id: 'c'}, ee, function (err, res) {
+ assert(/non-negative/.test(err.string));
+ ids.push(res);
+ });
+ });
+ });
+
+ test('compatible protocols', function (done) {
+ var emitterPtcl = createProtocol({
+ protocol: 'emitterProtocol',
+ messages: {
+ age: {
+ request: [{name: 'name', type: 'string'}],
+ response: 'long'
+ }
+ }
+ });
+ var listenerPtcl = createProtocol({
+ protocol: 'serverProtocol',
+ messages: {
+ age: {
+ request: [
+ {name: 'name', type: 'string'},
+ {name: 'address', type: ['null', 'string'], 'default': null}
+ ],
+ response: 'int'
+ },
+ id: {
+ request: [{name: 'name', type: 'string'}],
+ response: 'long'
+ }
+ }
+ });
+ setupFn(
+ emitterPtcl,
+ listenerPtcl,
+ function (ee) {
+ listenerPtcl.on('age', function (req, ee, cb) {
+ assert.equal(req.name, 'Ann');
+ cb(null, 23);
+ });
+ emitterPtcl.emit('age', {name: 'Ann'}, ee, function (err, res) {
+ assert.strictEqual(err, null);
+ assert.equal(res, 23);
+ done();
+ });
+ }
+ );
+ });
+
+ test('cached compatible protocols', function (done) {
+ var ptcl1 = createProtocol({
+ protocol: 'emitterProtocol',
+ messages: {
+ age: {
+ request: [{name: 'name', type: 'string'}],
+ response: 'long'
+ }
+ }
+ });
+ var ptcl2 = createProtocol({
+ protocol: 'serverProtocol',
+ messages: {
+ age: {
+ request: [
+ {name: 'name', type: 'string'},
+ {name: 'address', type: ['null', 'string'], 'default': null}
+ ],
+ response: 'int'
+ },
+ id: {
+ request: [{name: 'name', type: 'string'}],
+ response: 'long'
+ }
+ }
+ }).on('age', function (req, ee, cb) { cb(null, 48); });
+ setupFn(
+ ptcl1,
+ ptcl2,
+ function (ee1) {
+ ptcl1.emit('age', {name: 'Ann'}, ee1, function (err, res) {
+ assert.equal(res, 48);
+ setupFn(
+ ptcl1,
+ ptcl2,
+ function (ee2) { // ee2 has the server's protocol.
+ ptcl1.emit('age', {name: 'Bob'}, ee2, function (err, res) {
+ assert.equal(res, 48);
+ done();
+ });
+ }
+ );
+ });
+ }
+ );
+ });
+
+ test('incompatible protocols', function (done) {
+ var emitterPtcl = createProtocol({
+ protocol: 'emitterProtocol',
+ messages: {
+ age: {request: [{name: 'name', type: 'string'}], response: 'long'}
+ }
+ });
+ var listenerPtcl = createProtocol({
+ protocol: 'serverProtocol',
+ messages: {
+ age: {request: [{name: 'name', type: 'int'}], response: 'long'}
+ }
+ }).on('age', function (req, ee, cb) { cb(null, 0); });
+ setupFn(
+ emitterPtcl,
+ listenerPtcl,
+ function (ee) {
+ ee.on('error', function () {}); // For stateful protocols.
+ emitterPtcl.emit('age', {name: 'Ann'}, ee, function (err) {
+ assert(err);
+ done();
+ });
+ }
+ );
+ });
+
+ test('unknown message', function (done) {
+ var ptcl = createProtocol({protocol: 'Empty'});
+ setupFn(ptcl, ptcl, function (ee) {
+ ptcl.emit('echo', {}, ee, function (err) {
+ assert(/unknown/.test(err.string));
+ done();
+ });
+ });
+ });
+
+ test('unsupported message', function (done) {
+ var ptcl = createProtocol({
+ protocol: 'Echo',
+ messages: {
+ echo: {
+ request: [{name: 'id', type: 'string'}],
+ response: 'string'
+ }
+ }
+ });
+ setupFn(ptcl, ptcl, function (ee) {
+ ptcl.emit('echo', {id: ''}, ee, function (err) {
+ assert(/unsupported/.test(err.string));
+ done();
+ });
+ });
+ });
+
+ test('destroy emitter noWait', function (done) {
+ var ptcl = createProtocol({
+ protocol: 'Delay',
+ messages: {
+ wait: {
+ request: [{name: 'ms', type: 'int'}],
+ response: 'string'
+ }
+ }
+ }).on('wait', function (req, ee, cb) {
+ setTimeout(function () { cb(null, 'ok'); }, req.ms);
+ });
+ var interrupted = 0;
+ var eoted = false;
+ setupFn(ptcl, ptcl, function (ee) {
+ ee.on('eot', function (pending) {
+ eoted = true;
+ assert.equal(interrupted, 2);
+ assert.equal(pending, 2);
+ done();
+ });
+ ptcl.emit('wait', {ms: 75}, ee, interruptedCb);
+ ptcl.emit('wait', {ms: 50}, ee, interruptedCb);
+ ptcl.emit('wait', {ms: 10}, ee, function (err, res) {
+ assert.equal(res, 'ok');
+ ee.destroy(true);
+ });
+
+ function interruptedCb(err) {
+ assert(/interrupted/.test(err.string));
+ interrupted++;
+ }
+ });
+ });
+
+ test('destroy emitter', function (done) {
+ var ptcl = createProtocol({
+ protocol: 'Math',
+ messages: {
+ negate: {
+ request: [{name: 'n', type: 'int'}],
+ response: 'int'
+ }
+ }
+ });
+ setupFn(ptcl, ptcl, function (ee) {
+ ptcl.on('negate', function (req, ee, cb) { cb(null, -req.n); });
+ ptcl.emit('negate', {n: 20}, ee, function (err, res) {
+ assert.strictEqual(err, null);
+ assert.equal(res, -20);
+ ee.destroy();
+ this.emit('negate', {n: 'hi'}, ee, function (err) {
+ assert(/destroyed/.test(err.string));
+ done();
+ });
+ });
+ });
+ });
+
+ }
+
+ });
+
+ test('throw error', function () {
+ assert(!tryCatch(null));
+ assert.equal(tryCatch(new Error('hi')), 'hi');
+ assert.equal(tryCatch('hi'), 'hi');
+ assert.equal(tryCatch({string: 'hi'}), 'hi');
+
+ function tryCatch(err) {
+ try {
+ protocols.throwError(err);
+ } catch (err_) {
+ return err_.message;
+ }
+ }
+ });
+
+});
+
+// Helpers.
+
+// Message framing.
+function frame(buf) {
+ var framed = new Buffer(buf.length + 4);
+ framed.writeInt32BE(buf.length);
+ buf.copy(framed, 4);
+ return framed;
+}
+
+function createReadableTransport(bufs, frameSize) {
+ return createReadableStream(bufs)
+ .pipe(new protocols.streams.MessageEncoder(frameSize || 64));
+}
+
+function createWritableTransport(bufs) {
+ var decoder = new protocols.streams.MessageDecoder();
+ decoder.pipe(createWritableStream(bufs));
+ return decoder;
+}
+
+function createTransport(readBufs, writeBufs) {
+ return toDuplex(
+ createReadableTransport(readBufs),
+ createWritableTransport(writeBufs)
+ );
+}
+
+function createPassthroughTransports() {
+ var pt1 = stream.PassThrough();
+ var pt2 = stream.PassThrough();
+ return [{readable: pt1, writable: pt2}, {readable: pt2, writable: pt1}];
+}
+
+// Simplified stream constructor API isn't available in earlier node versions.
+
+function createReadableStream(bufs) {
+ var n = 0;
+ function Stream() { stream.Readable.call(this); }
+ util.inherits(Stream, stream.Readable);
+ Stream.prototype._read = function () {
+ this.push(bufs[n++] || null);
+ };
+ var readable = new Stream();
+ return readable;
+}
+
+function createWritableStream(bufs) {
+ function Stream() { stream.Writable.call(this); }
+ util.inherits(Stream, stream.Writable);
+ Stream.prototype._write = function (buf, encoding, cb) {
+ bufs.push(buf);
+ cb();
+ };
+ return new Stream();
+}
+
+// Combine two (binary) streams into a single duplex one. This is very basic
+// and doesn't handle a lot of cases (e.g. where `_read` doesn't return
+// something).
+//
+// Reads are served by pulling from `readable` and writes are forwarded to
+// `writable`; when the duplex's writable side finishes, `writable` is ended
+// as well.
+function toDuplex(readable, writable) {
+ function Stream() {
+ stream.Duplex.call(this);
+ // Propagate the end of the writable side to the wrapped stream.
+ this.on('finish', function () { writable.end(); });
+ }
+ util.inherits(Stream, stream.Duplex);
+ Stream.prototype._read = function () {
+ // NOTE(review): `readable.read()` presumably yields `null` when nothing is
+ // buffered yet, which `push` would treat as end-of-stream -- confirm.
+ this.push(readable.read());
+ };
+ Stream.prototype._write = function (buf, encoding, cb) {
+ // The return value of `write` is ignored and `cb` is invoked right away.
+ writable.write(buf);
+ cb();
+ };
+ return new Stream();
+}
http://git-wip-us.apache.org/repos/asf/avro/blob/9101a42b/lang/js/test/test_schemas.js
----------------------------------------------------------------------
diff --git a/lang/js/test/test_schemas.js b/lang/js/test/test_schemas.js
index cd9a727..b6d197b 100644
--- a/lang/js/test/test_schemas.js
+++ b/lang/js/test/test_schemas.js
@@ -1926,6 +1926,17 @@ suite('types', function () {
assert.deepEqual(person.age, 12);
assert.deepEqual(person.time, date);
assert.throws(function () { derived.toBuffer({age: -1, date: date}); });
+
+ var invalid = {age: -1, time: date};
+ assert.throws(function () { derived.toBuffer(invalid); });
+ var hasError = false;
+ derived.isValid(invalid, {errorHook: function (path, any, type) {
+ hasError = true;
+ assert.deepEqual(path, ['age']);
+ assert.equal(any, -1);
+ assert(type instanceof AgeType);
+ }});
+ assert(hasError);
});
test('recursive', function () {
[2/2] avro git commit: AVRO-1778. JavaScript: Add IPC/RPC support.
Posted by mt...@apache.org.
AVRO-1778. JavaScript: Add IPC/RPC support.
This commit adds protocols to the JavaScript implementation.
The API was designed to:
+ Be simple and idiomatic. The `Protocol` class added here is heavily
inspired by node.js' core `EventEmitter` to keep things as familiar as
possible. Getting a client and server working is straightforward
and requires very few lines of code.
+ Support arbitrary transports, both stateful and stateless. Built-in
node.js streams are supported out of the box (e.g. TCP/UNIX sockets,
or even stdin/stdout). Exchanging messages over a custom transport
requires implementing a single simple function.
+ Work both server-side and in the browser!
Project: http://git-wip-us.apache.org/repos/asf/avro/repo
Commit: http://git-wip-us.apache.org/repos/asf/avro/commit/9101a42b
Tree: http://git-wip-us.apache.org/repos/asf/avro/tree/9101a42b
Diff: http://git-wip-us.apache.org/repos/asf/avro/diff/9101a42b
Branch: refs/heads/master
Commit: 9101a42badd32f0540caa28e27ca053a36e936e3
Parents: a50068f
Author: Matthieu Monsch <mt...@apache.org>
Authored: Sat Feb 6 10:48:34 2016 -0800
Committer: Matthieu Monsch <mt...@apache.org>
Committed: Sat Feb 6 10:51:21 2016 -0800
----------------------------------------------------------------------
CHANGES.txt | 2 +
lang/js/build.sh | 2 +-
lang/js/doc/API.md | 189 +++++
lang/js/doc/Advanced-usage.md | 124 ++++
lang/js/lib/files.js | 8 +-
lang/js/lib/index.js | 3 +
lang/js/lib/protocols.js | 1271 ++++++++++++++++++++++++++++++++
lang/js/lib/schemas.js | 62 +-
lang/js/lib/utils.js | 24 +
lang/js/package.json | 5 +-
lang/js/test/mocha.opts | 2 +
lang/js/test/test_files.js | 8 +-
lang/js/test/test_protocols.js | 1392 +++++++++++++++++++++++++++++++++++
lang/js/test/test_schemas.js | 11 +
14 files changed, 3073 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/avro/blob/9101a42b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 47d99bc..b23636e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -13,6 +13,8 @@ Trunk (not yet released)
AVRO-1793. Python2: Retain stack trace and original exception when failing
to parse schemas. Contributed by Jakob Homan (jghoman).
+ AVRO-1778: JavaScript: Add IPC/RPC support. (mtth)
+
BUG FIXES
http://git-wip-us.apache.org/repos/asf/avro/blob/9101a42b/lang/js/build.sh
----------------------------------------------------------------------
diff --git a/lang/js/build.sh b/lang/js/build.sh
index c551f9d..08823fc 100755
--- a/lang/js/build.sh
+++ b/lang/js/build.sh
@@ -22,7 +22,7 @@ cd `dirname "$0"`
case "$1" in
test)
npm install
- npm test
+ npm run cover
;;
dist)
npm pack
http://git-wip-us.apache.org/repos/asf/avro/blob/9101a42b/lang/js/doc/API.md
----------------------------------------------------------------------
diff --git a/lang/js/doc/API.md b/lang/js/doc/API.md
index a6dd63e..2639ea1 100644
--- a/lang/js/doc/API.md
+++ b/lang/js/doc/API.md
@@ -21,6 +21,7 @@ limitations under the License.
+ [Avro types](#avro-types)
+ [Records](#records)
+ [Files and streams](#files-and-streams)
++ [IPC & RPC](#ipc--rpc)
## Parsing schemas
@@ -758,9 +759,197 @@ The encoding equivalent of `RawDecoder`.
+ `data` {Buffer} Serialized bytes.
+# IPC & RPC
+
+Avro also defines a way of executing remote procedure calls. We expose this via
+an API modeled after node.js' core [`EventEmitter`][event-emitter].
+
+#### Class `Protocol`
+
+`Protocol` instances are obtained by [`parse`](#parseschema-opts)-ing a
+[protocol declaration][protocol-declaration] and provide a way of sending
+remote messages (for example to another machine, or another process on the same
+machine). For this reason, instances of this class are very similar to
+`EventEmitter`s, exposing both [`emit`](#protocolemitname-req-emitter-cb) and
+[`on`](#protocolonname-handler) methods.
+
+Being able to send remote messages (and to do so efficiently) introduces a few
+differences however:
+
++ The types used in each event (for both the emitted message and its response)
+ must be defined upfront in the protocol's declaration.
++ The arguments emitted with each event must match the ones defined in the
+ protocol. Similarly, handlers are guaranteed to be called only with these
+ matching arguments.
++ Events are one-to-one: they have exactly one response (unless they are
+ declared as one-way, in which case they have none).
+
+##### `protocol.emit(name, req, emitter, cb)`
+
++ `name` {String} Name of the message to emit. If this message is sent to a
+ `Protocol` instance with no handler defined for this name, an "unsupported
+ message" error will be returned.
++ `req` {Object} Request value, must correspond to the message's declared
+ request type.
++ `emitter` {MessageEmitter} Emitter used to send the message. See
+ [`createEmitter`](#protocolcreateemittertransport-opts-cb) for how to obtain
+ one.
++ `cb(err, res)` {Function} Function called with the remote call's response
+ (or error, if any) when available. This can be omitted when the message is
+ one way.
+
+Send a message. This is always done asynchronously.
+
+##### `protocol.on(name, handler)`
+
++ `name` {String} Message name to add the handler for. An error will be thrown
+ if this name isn't defined in the protocol. At most one handler can exist for
+ a given name (any previously defined handler will be overwritten).
++ `handler(req, listener, cb)` {Function} Handler, called each time a message
+ with matching name is received. The `listener` argument will be the
+ corresponding `MessageListener` instance. The final callback argument
+ `cb(err, res)` should be called to send the response back to the emitter
+ (except when the message is one way, in which case `cb` will be `undefined`).
+
+Add a handler for a given message.
+
+##### `protocol.createEmitter(transport, [opts,] [cb])`
+
++ `transport` {Duplex|Object|Function} The transport used to communicate with
+ the remote listener. Multiple argument types are supported, see below.
++ `opts` {Object} Options.
+ + `IdType` {LogicalType} Metadata logical type.
+ + `bufferSize` {Number} Internal serialization buffer size (in bytes).
+ Defaults to 2048.
+ + `frameSize` {Number} Size used when [framing messages][framing-messages].
+ Defaults to 2048.
++ `cb(pending)` {Function} End of transmission callback.
+
+Generate a [`MessageEmitter`](#class-messageemitter) for this protocol. This
+emitter can then be used to communicate with a remote server of compatible
+protocol.
+
+There are two major types of transports:
+
++ Stateful
+
+ A pair of binary streams `{readable, writable}`.
+
+ As a convenience passing a single duplex stream is also supported and
+ equivalent to passing `{readable: duplex, writable: duplex}`.
+
++ Stateless
+
+ Stream factory `fn(cb)` which should return a writable stream and call its
+ callback argument with a readable stream (when available).
+
+##### `protocol.createListener(transport, [opts,] [cb])`
+
++ `transport` {Duplex|Object|Function} Similar to [`createEmitter`](#protocolcreateemittertransport-opts-cb)'s
+ corresponding argument, except that readable and writable roles are reversed
+ for stateless transports.
++ `opts` {Object} Identical to `createEmitter`'s options.
+ + `IdType` {LogicalType} Metadata logical type.
+ + `bufferSize` {Number} Internal serialization buffer size (in bytes).
+ Defaults to 2048.
+ + `frameSize` {Number} Size used when [framing messages][framing-messages].
+ Defaults to 2048.
++ `cb(pending)` {Function} End of transmission callback.
+
+Generate a [`MessageListener`](#class-messagelistener) for this protocol. This
+listener can be used to respond to messages emitted from compatible protocols.
+
+##### `protocol.subprotocol()`
+
+Returns a copy of the original protocol, which inherits all its handlers.
+
+##### `protocol.getMessages()`
+
+Retrieve all the messages defined in the protocol. Each message is an object
+with the following (read-only) properties:
+
++ `name` {String}
++ `requestType` {Type}
++ `responseType` {Type}
++ `errorType` {Type}
++ `oneWay` {Boolean}
+
+##### `protocol.getName()`
+
+Returns the protocol's fully qualified name.
+
+##### `protocol.getType(name)`
+
++ `name` {String} A type's fully qualified name.
+
+Convenience function to retrieve a type defined inside this protocol. Returns
+`undefined` if no type exists for the given name.
+
+
+#### Class `MessageEmitter`
+
+Instances of this class are [`EventEmitter`s][event-emitter], with the following
+events:
+
+##### Event `'handshake'`
+
++ `request` {Object} Handshake request.
++ `response` {Object} Handshake response.
+
+Emitted when the server's handshake response is received.
+
+##### Event `'eot'`
+
++ `pending` {Number} Number of interrupted requests. This will always be zero,
+ unless the emitter was destroyed with `noWait` set.
+
+End of transmission event, emitted after the client is destroyed and there are
+no more pending requests.
+
+##### `emitter.destroy([noWait])`
+
++ `noWait` {Boolean} Cancel any pending requests. By default pending requests
+ will still be honored.
+
+Disable the emitter.
+
+
+#### Class `MessageListener`
+
+Listeners are the receiving-side equivalent of `MessageEmitter`s and are also
+[`EventEmitter`s][event-emitter], with the following events:
+
+##### Event `'handshake'`
+
++ `request` {Object} Handshake request.
++ `response` {Object} Handshake response.
+
+Emitted right before the server sends a handshake response.
+
+##### Event `'eot'`
+
++ `pending` {Number} Number of cancelled pending responses. This will always be
+ zero, unless the listener was destroyed with `noWait` set.
+
+End of transmission event, emitted after the listener is destroyed and there are
+no more responses to send.
+
+##### `listener.destroy([noWait])`
+
++ `noWait` {Boolean} Don't wait for all pending responses to have been sent.
+
+Disable this listener and release underlying streams. In general you shouldn't
+need to call this: stateless listeners will be destroyed automatically when a
+response is sent, and stateful listeners are best destroyed from the client's
+side.
+
+
[canonical-schema]: https://avro.apache.org/docs/current/spec.html#Parsing+Canonical+Form+for+Schemas
[schema-resolution]: https://avro.apache.org/docs/current/spec.html#Schema+Resolution
[sort-order]: https://avro.apache.org/docs/current/spec.html#order
[fingerprint]: https://avro.apache.org/docs/current/spec.html#Schema+Fingerprints
[custom-long]: Advanced-usage#custom-long-types
[logical-types]: Advanced-usage#logical-types
+[framing-messages]: https://avro.apache.org/docs/current/spec.html#Message+Framing
+[event-emitter]: https://nodejs.org/api/events.html#events_class_events_eventemitter
+[protocol-declaration]: https://avro.apache.org/docs/current/spec.html#Protocol+Declaration
http://git-wip-us.apache.org/repos/asf/avro/blob/9101a42b/lang/js/doc/Advanced-usage.md
----------------------------------------------------------------------
diff --git a/lang/js/doc/Advanced-usage.md b/lang/js/doc/Advanced-usage.md
index 23cab5f..4344bb4 100644
--- a/lang/js/doc/Advanced-usage.md
+++ b/lang/js/doc/Advanced-usage.md
@@ -20,6 +20,7 @@ limitations under the License.
+ [Schema evolution](#schema-evolution)
+ [Logical types](#logical-types)
+ [Custom long types](#custom-long-types)
++ [Remote procedure calls](#remote-procedure-calls)
## Schema evolution
@@ -352,8 +353,131 @@ and unpacking routine (for example when using a native C++ addon), we can
disable this behavior by setting `LongType.using`'s `noUnpack` argument to
`true`.
+
+# Remote procedure calls
+
+`avro-js` provides an efficient and "type-safe" API for communicating with
+remote node processes via [`Protocol`s](API#class-protocol).
+
+To enable this, we first declare the types involved inside an [Avro
+protocol][protocol-declaration]. For example, consider the following simple
+protocol which supports two calls (saved as `./math.avpr`):
+
+```json
+{
+ "protocol": "Math",
+ "doc": "A sample interface for performing math.",
+ "messages": {
+ "multiply": {
+ "doc": "A call for multiplying doubles.",
+ "request": [
+ {"name": "numbers", "type": {"type": "array", "items": "double"}}
+ ],
+ "response": "double"
+ },
+ "add": {
+ "doc": "A call which adds integers, optionally after some delay.",
+ "request": [
+ {"name": "numbers", "type": {"type": "array", "items": "int"}},
+ {"name": "delay", "type": "float", "default": 0}
+ ],
+ "response": "int"
+ }
+ }
+}
+```
+
+Servers and clients then share the same protocol and respectively:
+
++ Implement interface calls (servers):
+
+ ```javascript
+ var protocol = avro.parse('./math.avpr')
+ .on('add', function (req, ee, cb) {
+ var sum = req.numbers.reduce(function (agg, el) { return agg + el; }, 0);
+ setTimeout(function () { cb(null, sum); }, 1000 * req.delay);
+ })
+ .on('multiply', function (req, ee, cb) {
+ var prod = req.numbers.reduce(function (agg, el) { return agg * el; }, 1);
+ cb(null, prod);
+ });
+ ```
+
++ Call the interface (clients):
+
+ ```javascript
+ var protocol = avro.parse('./math.avpr');
+ var ee; // Message emitter, see below for various instantiation examples.
+
+ protocol.emit('add', {numbers: [1, 3, 5], delay: 2}, ee, function (err, res) {
+ console.log(res); // 9!
+ });
+ protocol.emit('multiply', {numbers: [4, 2]}, ee, function (err, res) {
+ console.log(res); // 8!
+ });
+ ```
+
+`avro-js` supports communication between any two node processes connected by
+binary streams. See below for a few different common use-cases.
+
+## Persistent streams
+
+E.g. UNIX sockets, TCP sockets, WebSockets, (and even stdin/stdout).
+
+### Client
+
+```javascript
+var net = require('net');
+
+var ee = protocol.createEmitter(net.createConnection({port: 8000}));
+```
+
+### Server
+
+```javascript
+var net = require('net');
+
+net.createServer()
+ .on('connection', function (con) { protocol.createListener(con); })
+ .listen(8000);
+```
+
+## Transient streams
+
+For example HTTP requests/responses.
+
+### Client
+
+```javascript
+var http = require('http');
+
+var ee = protocol.createEmitter(function (cb) {
+ return http.request({
+ port: 3000,
+ headers: {'content-type': 'avro/binary'},
+ method: 'POST'
+ }).on('response', function (res) { cb(res); });
+});
+```
+
+### Server
+
+Using [express](https://expressjs.com/) for example:
+
+```javascript
+var app = require('express')();
+
+app.post('/', function (req, res) {
+ protocol.createListener(function (cb) { cb(res); return req; });
+});
+
+app.listen(3000);
+```
+
+
[parse-api]: API#parseschema-opts
[create-resolver-api]: API#typecreateresolverwritertype
[logical-type-api]: API#class-logicaltypeattrs-opts-types
[decimal-type]: https://avro.apache.org/docs/current/spec.html#Decimal
[schema-resolution]: https://avro.apache.org/docs/current/spec.html#Schema+Resolution
+[protocol-declaration]: https://avro.apache.org/docs/current/spec.html#Protocol+Declaration
http://git-wip-us.apache.org/repos/asf/avro/blob/9101a42b/lang/js/lib/files.js
----------------------------------------------------------------------
diff --git a/lang/js/lib/files.js b/lang/js/lib/files.js
index b9c6da0..c96da68 100644
--- a/lang/js/lib/files.js
+++ b/lang/js/lib/files.js
@@ -21,7 +21,8 @@
'use strict';
-var schemas = require('./schemas'),
+var protocols = require('./protocols'),
+ schemas = require('./schemas'),
utils = require('./utils'),
fs = require('fs'),
stream = require('stream'),
@@ -66,7 +67,10 @@ var Tap = utils.Tap;
*
*/
function parse(schema, opts) {
- return schemas.createType(loadSchema(schema), opts);
+ var attrs = loadSchema(schema);
+ return attrs.protocol ?
+ protocols.createProtocol(attrs, opts) :
+ schemas.createType(attrs, opts);
}
http://git-wip-us.apache.org/repos/asf/avro/blob/9101a42b/lang/js/lib/index.js
----------------------------------------------------------------------
diff --git a/lang/js/lib/index.js b/lang/js/lib/index.js
index 0eab2ad..666dd56 100644
--- a/lang/js/lib/index.js
+++ b/lang/js/lib/index.js
@@ -29,11 +29,14 @@
*/
var files = require('./files'),
+ protocols = require('./protocols'),
schemas = require('./schemas'),
deprecated = require('../etc/deprecated/validator');
module.exports = {
+ Type: schemas.Type,
+ Protocol: protocols.Protocol,
parse: files.parse,
createFileDecoder: files.createFileDecoder,
createFileEncoder: files.createFileEncoder,
http://git-wip-us.apache.org/repos/asf/avro/blob/9101a42b/lang/js/lib/protocols.js
----------------------------------------------------------------------
diff --git a/lang/js/lib/protocols.js b/lang/js/lib/protocols.js
new file mode 100644
index 0000000..6344f47
--- /dev/null
+++ b/lang/js/lib/protocols.js
@@ -0,0 +1,1271 @@
+/* jshint node: true */
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+'use strict';
+
+/**
+ * This module implements Avro's IPC/RPC logic.
+ *
+ * This is done the Node.js way, mimicking the `EventEmitter` class.
+ *
+ */
+
+var schemas = require('./schemas'),
+ utils = require('./utils'),
+ events = require('events'),
+ stream = require('stream'),
+ util = require('util');
+
+
+var BOOLEAN_TYPE = schemas.createType('boolean');
+var STRING_TYPE = schemas.createType('string');
+var SYSTEM_ERROR_TYPE = schemas.createType(['string']);
+
+var HANDSHAKE_REQUEST_TYPE = schemas.createType({
+ namespace: 'org.apache.avro.ipc',
+ name: 'HandshakeRequest',
+ type: 'record',
+ fields: [
+ {name: 'clientHash', type: {name: 'MD5', type: 'fixed', size: 16}},
+ {name: 'clientProtocol', type: ['null', 'string'], 'default': null},
+ {name: 'serverHash', type: 'org.apache.avro.ipc.MD5'},
+ {
+ name: 'meta',
+ type: ['null', {type: 'map', values: 'bytes'}],
+ 'default': null
+ }
+ ]
+});
+
+var HANDSHAKE_RESPONSE_TYPE = schemas.createType({
+ namespace: 'org.apache.avro.ipc',
+ name: 'HandshakeResponse',
+ type: 'record',
+ fields: [
+ {
+ name: 'match',
+ type: {
+ name: 'HandshakeMatch',
+ type: 'enum',
+ symbols: ['BOTH', 'CLIENT', 'NONE']
+ }
+ },
+ {name: 'serverProtocol', type: ['null', 'string'], 'default': null},
+ {
+ name: 'serverHash',
+ type: ['null', {name: 'MD5', type: 'fixed', size: 16}],
+ 'default': null
+ },
+ {
+ name: 'meta',
+ type: ['null', {type: 'map', values: 'bytes'}],
+ 'default': null
+ }
+ ]
+});
+
+var HandshakeRequest = HANDSHAKE_REQUEST_TYPE.getRecordConstructor();
+var HandshakeResponse = HANDSHAKE_RESPONSE_TYPE.getRecordConstructor();
+var Tap = utils.Tap;
+var f = util.format;
+
+
+/**
+ * Build a `Protocol` from a parsed protocol declaration.
+ *
+ * Use this rather than calling the `Protocol` constructor directly: the
+ * constructor does no parsing so that protocols can be copied cheaply.
+ *
+ * `attrs` is the declaration (`protocol`, optional `namespace`, `types` and
+ * `messages`). `opts` is forwarded to type and message creation; note that
+ * its `namespace` is overwritten with the declaration's. Throws if the
+ * declaration has no protocol name.
+ *
+ */
+function createProtocol(attrs, opts) {
+  opts = opts || {};
+
+  var name = attrs.protocol;
+  if (!name) {
+    throw new Error('missing protocol name');
+  }
+
+  // Qualify the name with the namespace unless it already contains a dot.
+  opts.namespace = attrs.namespace;
+  if (opts.namespace && name.indexOf('.') === -1) {
+    name = f('%s.%s', opts.namespace, name);
+  }
+
+  // Types are created for their effect on `opts` (see `opts.registry` below).
+  (attrs.types || []).forEach(function (schema) {
+    schemas.createType(schema, opts);
+  });
+
+  var messages = {};
+  Object.keys(attrs.messages || {}).forEach(function (messageName) {
+    var messageAttrs = attrs.messages[messageName];
+    messages[messageName] = new Message(messageName, messageAttrs, opts);
+  });
+
+  return new Protocol(name, messages, opts.registry || {});
+}
+
+/**
+ * An Avro protocol.
+ *
+ * It contains a cache for all remote protocols encountered by its emitters and
+ * listeners (the emitter and listener resolver maps below). Note that a
+ * protocol can be attached to multiple listeners at a given time. This can be
+ * a mix of stateful or stateless listeners.
+ *
+ * Arguments: `name` is the protocol's name, `messages` maps message names to
+ * their definitions, `types` maps type names to types, and `ptcl` is an
+ * optional parent protocol (set when creating a subprotocol) whose handlers
+ * and resolver caches are reused.
+ *
+ */
+function Protocol(name, messages, types, ptcl) {
+ this._name = name;
+ this._messages = messages;
+ this._types = types;
+ this._parent = ptcl;
+
+ // Cache a string instead of the buffer to avoid retaining an entire slab.
+ // This must come after the fields above are set since `toString` reads them.
+ this._hashString = utils.getHash(this.toString()).toString('binary');
+
+ // Listener callbacks. Note the prototype used for handlers when this is a
+ // subprotocol. This lets us easily implement the desired fallback behavior:
+ // a lookup which misses on this protocol falls through to the parent's
+ // handlers.
+ var self = this;
+ this._handlers = Object.create(ptcl ? ptcl._handlers : null);
+ // Registered as the `'_call'` event handler on listeners, so `this` inside
+ // the function is the listener which received the call, while `self` is the
+ // protocol.
+ this._onListenerCall = function (name, req, cb) {
+ var handler = self._handlers[name];
+ if (!handler) {
+ cb(new Error(f('unsupported message: %s', name)));
+ } else {
+ handler.call(self, req, this, cb);
+ }
+ };
+
+ // Resolvers are split since we want emitters to still be able to talk to
+ // servers with more messages (which would be incompatible the other way).
+ // Subprotocols share their parent's caches rather than starting empty.
+ this._emitterResolvers = ptcl ? ptcl._emitterResolvers : {};
+ this._listenerResolvers = ptcl ? ptcl._listenerResolvers : {};
+}
+
+// Create a child protocol with the same name, messages and types, passing
+// this protocol as the child's parent.
+Protocol.prototype.subprotocol = function () {
+  var parent = this;
+  return new Protocol(parent._name, parent._messages, parent._types, parent);
+};
+
+// Send message `name` with request `req` through `emitter`.
+//
+// Validation failures (an emitter which isn't a `MessageEmitter` for a
+// protocol with the same hash, or an unknown message name) are reported
+// through `cb` rather than thrown synchronously.
+Protocol.prototype.emit = function (name, req, emitter, cb) {
+  cb = cb || throwError; // To provide a more helpful error message.
+
+  var emitterMatches = (
+    emitter instanceof MessageEmitter &&
+    emitter._ptcl._hashString === this._hashString
+  );
+  if (!emitterMatches) {
+    asyncAvroCb(this, cb, 'invalid emitter');
+    return;
+  }
+
+  var message = this._messages[name];
+  if (message) {
+    emitter._emit(message, req, cb);
+    return;
+  }
+  asyncAvroCb(this, cb, f('unknown message: %s', name));
+};
+
+// Create a message emitter for this protocol.
+//
+// `transport` is either a function (stateless emitter), a single stream used
+// for both reading and writing, or a `{readable, writable}` pair (stateful
+// emitter). `opts` may be omitted, in which case the second argument is taken
+// as `cb`. When given, `cb` is called once on the emitter's `'eot'` event.
+Protocol.prototype.createEmitter = function (transport, opts, cb) {
+  if (!cb && typeof opts === 'function') {
+    cb = opts;
+    opts = undefined;
+  }
+
+  var emitter;
+  if (typeof transport === 'function') {
+    emitter = new StatelessEmitter(this, transport, opts);
+  } else if (isStream(transport)) {
+    emitter = new StatefulEmitter(this, transport, transport, opts);
+  } else {
+    emitter = new StatefulEmitter(
+      this,
+      transport.readable,
+      transport.writable,
+      opts
+    );
+  }
+
+  if (cb) {
+    emitter.once('eot', cb);
+  }
+  return emitter;
+};
+
+// Set the handler for message `name`, replacing any previous one. Throws if
+// the protocol doesn't define a message with that name. Returns the protocol
+// to allow chaining.
+Protocol.prototype.on = function (name, handler) {
+  if (this._messages[name]) {
+    this._handlers[name] = handler;
+    return this;
+  }
+  throw new Error(f('unknown message: %s', name));
+};
+
+// Create a message listener for this protocol.
+//
+// `transport` is either a function (stateless listener), a single stream
+// used for both reading and writing, or a `{readable, writable}` pair
+// (stateful listener). `opts` may be omitted, in which case the second
+// argument is taken as `cb`. When given, `cb` is called once on the
+// listener's `'eot'` event. Incoming calls are routed to this protocol's
+// handlers via the listener's `'_call'` event.
+Protocol.prototype.createListener = function (transport, opts, cb) {
+  if (!cb && typeof opts === 'function') {
+    cb = opts;
+    opts = undefined;
+  }
+
+  var listener;
+  if (typeof transport === 'function') {
+    listener = new StatelessListener(this, transport, opts);
+  } else if (isStream(transport)) {
+    listener = new StatefulListener(this, transport, transport, opts);
+  } else {
+    listener = new StatefulListener(
+      this,
+      transport.readable,
+      transport.writable,
+      opts
+    );
+  }
+
+  if (cb) {
+    listener.once('eot', cb);
+  }
+  return listener.on('_call', this._onListenerCall);
+};
+
+// Look up a type by name; `undefined` when the protocol has no such type.
+Protocol.prototype.getType = function (name) {
+  var types = this._types;
+  return types[name];
+};
+
+// The protocol's name.
+Protocol.prototype.getName = function () {
+  var name = this._name;
+  return name;
+};
+
+// The protocol's messages, keyed by message name.
+Protocol.prototype.getMessages = function () {
+  var messages = this._messages;
+  return messages;
+};
+
+// Serialize the protocol's declaration. Only types which have a name are
+// listed, and the `types` attribute is left undefined when there are none.
+Protocol.prototype.toString = function () {
+  var types = this._types;
+  var namedTypes = Object.keys(types)
+    .map(function (key) { return types[key]; })
+    .filter(function (type) { return type.getName(); });
+
+  var attrs = {
+    protocol: this._name,
+    types: namedTypes.length ? namedTypes : undefined,
+    messages: this._messages
+  };
+  return schemas.stringify(attrs);
+};
+
+// Short description of the protocol, built from its name.
+Protocol.prototype.inspect = function () {
+  var name = this._name;
+  return f('<Protocol %j>', name);
+};
+
+/**
+ * Shared base class for message emitters.
+ *
+ * It holds the state common to both variants defined below: the owning
+ * protocol, its emitter-side resolver cache, the server hash currently in
+ * use, the metadata type, and the buffer and frame sizes (2048 each unless
+ * overridden in `opts`). `opts` must be an object.
+ *
+ */
+function MessageEmitter(ptcl, opts) {
+  events.EventEmitter.call(this);
+
+  var self = this;
+  self._ptcl = ptcl;
+  self._resolvers = ptcl._emitterResolvers;
+  self._serverHashString = ptcl._hashString;
+  self._idType = IdType.createMetadataType(opts.IdType);
+  self._bufferSize = opts.bufferSize ? opts.bufferSize : 2048;
+  self._frameSize = opts.frameSize ? opts.frameSize : 2048;
+
+  // Re-emit the first internal `'_eot'` as the public `'eot'` event.
+  self.once('_eot', function forwardEot(pending) {
+    self.emit('eot', pending);
+  });
+}
+util.inherits(MessageEmitter, events.EventEmitter);
+
+// Build and cache, under `hashString`, the response and error resolvers
+// needed to read answers from a server speaking `serverPtcl`. Throws (and
+// caches nothing) if the server lacks one of this emitter's messages.
+MessageEmitter.prototype._generateResolvers = function (
+  hashString, serverPtcl
+) {
+  var localMessages = this._ptcl._messages;
+  var remoteMessages = serverPtcl._messages;
+
+  var resolvers = Object.keys(localMessages).reduce(function (acc, name) {
+    var local = localMessages[name];
+    var remote = remoteMessages[name];
+    if (!remote) {
+      throw new Error(f('missing server message: %s', name));
+    }
+    acc[name] = {
+      responseType: local.responseType.createResolver(remote.responseType),
+      errorType: local.errorType.createResolver(remote.errorType)
+    };
+    return acc;
+  }, {});
+
+  this._resolvers[hashString] = resolvers;
+};
+
+// Build a handshake request targeting the server identified by `hashString`.
+// The emitter's own protocol is attached unless `noPtcl` is set.
+MessageEmitter.prototype._createHandshakeRequest = function (
+  hashString, noPtcl
+) {
+  var clientHash = getHash(this._ptcl);
+  var clientProtocol = noPtcl ? null : {string: this._ptcl.toString()};
+  var serverHash = new Buffer(hashString, 'binary');
+  return new HandshakeRequest(clientHash, clientProtocol, serverHash);
+};
+
+// Read the handshake response from `tap`, emit the `'handshake'` event with
+// the request and response, then work out which server hash applies.
+//
+// Throws when the request already carried the emitter's protocol and the
+// server still answered `NONE`. Otherwise returns an object with the
+// response's `match` value and the applicable `serverHashString`.
+MessageEmitter.prototype._finalizeHandshake = function (tap, handshakeReq) {
+ var res = HANDSHAKE_RESPONSE_TYPE._read(tap);
+ // Emitted before any validation, so listeners also see failed handshakes.
+ this.emit('handshake', handshakeReq, res);
+
+ if (handshakeReq.clientProtocol && res.match === 'NONE') {
+ // If the emitter's protocol was included in the original request, this is
+ // not a failure which a retry will fix.
+ var buf = res.meta && res.meta.map.error;
+ throw new Error(buf ? buf.toString() : 'handshake error');
+ }
+
+ var hashString;
+ if (res.serverHash && res.serverProtocol) {
+ // This means the request didn't include the correct server hash. Note that
+ // we use the handshake response's hash rather than our computed one in
+ // case the server computes it differently.
+ hashString = res.serverHash['org.apache.avro.ipc.MD5'].toString('binary');
+ if (!canResolve(this, hashString)) {
+ // First time this server protocol is seen: parse it and cache resolvers.
+ this._generateResolvers(
+ hashString,
+ createProtocol(JSON.parse(res.serverProtocol.string))
+ );
+ }
+ // Make this hash the new default.
+ this._serverHashString = hashString;
+ } else {
+ // The server sent no replacement, so the hash from the request stands.
+ hashString = handshakeReq.serverHash.toString('binary');
+ }
+
+ // We return the server's hash for stateless emitters. It might be that the
+ // default hash changes in between requests, in which case using the default
+ // one will fail.
+ return {match: res.match, serverHashString: hashString};
+};
+
+// Write the message's name followed by the request itself into `tap`.
+MessageEmitter.prototype._encodeRequest = function (tap, message, req) {
+  var messageName = message.name;
+  var requestType = message.requestType;
+  safeWrite(tap, STRING_TYPE, messageName);
+  safeWrite(tap, requestType, req);
+};
+
+// Decode a response from `tap` into callback arguments `[err, res]`.
+//
+// A leading boolean selects which of the two is present: when set, the
+// error is read and `res` stays `null`; otherwise the response is read and
+// `err` stays `null`. Throws if the tap is no longer valid afterwards.
+MessageEmitter.prototype._decodeArguments = function (
+  tap, hashString, message
+) {
+  var resolvers = getResolvers(this, hashString, message);
+  var err = null;
+  var res = null;
+
+  var isError = tap.readBoolean();
+  if (isError) {
+    err = resolvers.errorType._read(tap);
+  } else {
+    res = resolvers.responseType._read(tap);
+  }
+
+  if (tap.isValid()) {
+    return [err, res];
+  }
+  throw new Error('truncated message');
+};
+
+/**
+ * Factory-based emitter.
+ *
+ * No persistent connection to the server is kept: every emitted message gets
+ * its own writable/readable stream pair from `writableFactory` and is
+ * preceded by a handshake. A typical use is talking to an HTTP server, with
+ * the factory returning an HTTP request.
+ *
+ * Because each message has its own stream pair, responses can be matched to
+ * requests without relying on message metadata, which makes these emitters
+ * compatible with any server implementation.
+ *
+ */
+function StatelessEmitter(ptcl, writableFactory, opts) {
+  MessageEmitter.call(this, ptcl, opts || {});
+
+  this._writableFactory = writableFactory;
+  this._pending = {}; // Callbacks of in-flight requests, keyed by ID.
+  this._id = 1; // ID assigned to the next request.
+  this._interrupted = false;
+  this._destroyed = false;
+}
+util.inherits(StatelessEmitter, MessageEmitter);
+
+// Send `message` with request `req`, reporting the outcome through `cb`
+// (invoked with the protocol as `this`).
+//
+// Each attempt encodes a handshake request followed by the request itself,
+// obtains a writable stream from the factory and waits for a single response
+// message on the readable stream the factory provides. If the server answers
+// the handshake with `NONE`, the request is re-sent once more with the other
+// handshake variant (see `emit(true)` below).
+StatelessEmitter.prototype._emit = function (message, req, cb) {
+  // We enclose the server's hash inside this message's closure since the
+  // emitter might be emitting several messages concurrently and the hash might
+  // change before the response returns (unlikely but possible if the emitter
+  // talks to multiple servers at once or the server changes protocol).
+  var serverHashString = this._serverHashString;
+  var id = this._id++;
+  var self = this;
+
+  this._pending[id] = cb;
+  if (this._destroyed) {
+    asyncAvroCb(undefined, done, 'emitter destroyed');
+    return;
+  }
+  emit(false);
+
+  function emit(retry) {
+    var tap = new Tap(new Buffer(self._bufferSize));
+
+    var handshakeReq = self._createHandshakeRequest(serverHashString, !retry);
+    safeWrite(tap, HANDSHAKE_REQUEST_TYPE, handshakeReq);
+    try {
+      safeWrite(tap, self._idType, id);
+      self._encodeRequest(tap, message, req);
+    } catch (err) {
+      asyncAvroCb(undefined, done, err);
+      return;
+    }
+
+    var writable = self._writableFactory(function onReadable(readable) {
+      if (self._interrupted) {
+        // In case this function is called asynchronously (e.g. when sending
+        // HTTP requests), it might be that we have ended since.
+        return;
+      }
+
+      readable
+        .pipe(new MessageDecoder(true))
+        .on('error', done)
+        // This will happen when the readable stream ends before a single
+        // message has been decoded (e.g. on invalid response).
+        .on('data', function (buf) {
+          readable.unpipe(this); // Single message per readable stream.
+          if (self._interrupted) {
+            return;
+          }
+
+          var tap = new Tap(buf);
+          try {
+            var info = self._finalizeHandshake(tap, handshakeReq);
+            serverHashString = info.serverHashString;
+            if (info.match === 'NONE') {
+              emit(true); // Retry, attaching emitter protocol this time.
+              return;
+            }
+            self._idType._read(tap); // Skip metadata.
+            var args = self._decodeArguments(tap, serverHashString, message);
+          } catch (err) {
+            done(err);
+            return;
+          }
+          done.apply(undefined, args);
+        });
+    });
+
+    var encoder = new MessageEncoder(self._frameSize);
+    encoder.pipe(writable);
+    encoder.end(tap.getValue());
+  }
+
+  function done(err, res) {
+    if (!(id in self._pending)) {
+      // The request was already completed, or it was interrupted by
+      // `destroy(true)` which has called and removed its callback. Ignore
+      // late notifications (e.g. a decoder error after interruption) instead
+      // of failing on a missing callback.
+      return;
+    }
+    var cb = self._pending[id];
+    delete self._pending[id];
+    cb.call(self._ptcl, err, res);
+    if (self._destroyed) {
+      // Give a pending graceful destroy the chance to emit its `'_eot'` now
+      // that one fewer request is outstanding.
+      self.destroy();
+    }
+  }
+};
+
+// Disable the emitter.
+//
+// With `noWait` set, every pending request's callback is called immediately
+// with an `interrupted` error and forgotten. The internal `'_eot'` event is
+// emitted with the number of requests which were pending at the time of the
+// call, either right away (`noWait`) or only once none are left.
+StatelessEmitter.prototype.destroy = function (noWait) {
+  this._destroyed = true;
+
+  var pendingIds = Object.keys(this._pending);
+  if (noWait) {
+    this._interrupted = true;
+    pendingIds.forEach(function (id) {
+      // Call with the protocol as `this`, like callbacks of requests which
+      // complete normally, rather than leaking the internal pending map.
+      this._pending[id].call(this._ptcl, {string: 'interrupted'});
+      delete this._pending[id];
+    }, this);
+  }
+
+  if (noWait || !pendingIds.length) {
+    this.emit('_eot', pendingIds.length);
+  }
+};
+
+/**
+ * Multiplexing emitter.
+ *
+ * These emitters reuse the same streams (both readable and writable) for all
+ * messages. This avoids a lot of overhead (e.g. creating new connections,
+ * re-issuing handshakes) but requires the server to include compatible
+ * metadata in each response (namely forwarding each request's ID into its
+ * response).
+ *
+ * A custom metadata format can be specified via the `IdType` option. The
+ * default is compatible with this package's default server (i.e. listener)
+ * implementation.
+ *
+ */
+function StatefulEmitter(ptcl, readable, writable, opts) {
+ opts = opts || {};
+ MessageEmitter.call(this, ptcl, opts);
+
+ this._readable = readable;
+ this._writable = writable;
+ this._id = 1;
+ this._pending = {};
+ this._started = false;
+ this._destroyed = false;
+ this._ended = false; // Readable input ended.
+ this._decoder = new MessageDecoder();
+ this._encoder = new MessageEncoder(this._frameSize);
+
+ var handshakeReq = null;
+ var self = this;
+
+ process.nextTick(function () {
+ self._readable.pipe(self._decoder)
+ .on('error', function (err) { self.emit('error', err); })
+ .on('data', onHandshakeData)
+ .on('end', function () {
+ self._ended = true;
+ self.destroy();
+ });
+
+ self._encoder.pipe(self._writable);
+ emitHandshake(true);
+ });
+
+ function emitHandshake(noPtcl) {
+ handshakeReq = self._createHandshakeRequest(
+ self._serverHashString,
+ noPtcl
+ );
+ self._encoder.write(handshakeReq.$toBuffer());
+ }
+
+ function onHandshakeData(buf) {
+ var tap = new Tap(buf);
+ try {
+ var info = self._finalizeHandshake(tap, handshakeReq);
+ } catch (err) {
+ self.emit('error', err);
+ self.destroy(); // This isn't a recoverable error.
+ return;
+ }
+
+ if (info.match !== 'NONE') {
+ self._decoder
+ .removeListener('data', onHandshakeData)
+ .on('data', onMessageData);
+ self._started = true;
+ self.emit('_start'); // Send any pending messages.
+ } else {
+ emitHandshake(false);
+ }
+ }
+
+ function onMessageData(buf) {
+ var tap = new Tap(buf);
+ try {
+ var id = self._idType._read(tap);
+ if (!id) {
+ throw new Error('missing ID');
+ }
+ } catch (err) {
+ self.emit('error', new Error('invalid metadata: ' + err.message));
+ return;
+ }
+
+ var info = self._pending[id];
+ if (info === undefined) {
+ self.emit('error', new Error('orphan response: ' + id));
+ return;
+ }
+
+ try {
+ var args = self._decodeArguments(
+ tap,
+ self._serverHashString,
+ info.message
+ );
+ } catch (err) {
+ info.cb({string: 'invalid response: ' + err.message});
+ return;
+ }
+ delete self._pending[id];
+ info.cb.apply(self._ptcl, args);
+ if (self._destroyed) {
+ self.destroy();
+ }
+ }
+}
+util.inherits(StatefulEmitter, MessageEmitter);
+
+/**
+ * Encode a request and write it to the shared encoder.
+ *
+ * @param message {Message} The message being emitted.
+ * @param req {Object} The request's contents.
+ * @param cb {Function} Callback. It is stored until the matching response
+ * arrives, except for one-way messages for which nothing is stored.
+ *
+ * Calls made before the handshake completes are retried on `'_start'`. Calls
+ * made after the emitter was destroyed fail asynchronously.
+ *
+ */
+StatefulEmitter.prototype._emit = function (message, req, cb) {
+  var emitter = this;
+
+  if (emitter._destroyed) {
+    asyncAvroCb(emitter._ptcl, cb, 'emitter destroyed');
+    return;
+  }
+  if (!emitter._started) {
+    // Handshake still in flight.
+    emitter.once('_start', function () { emitter._emit(message, req, cb); });
+    return;
+  }
+
+  var out = new Tap(new Buffer(emitter._bufferSize));
+  var requestId = emitter._id++;
+  try {
+    // IDs are negated on the way out.
+    safeWrite(out, emitter._idType, -requestId);
+    emitter._encodeRequest(out, message, req);
+  } catch (err) {
+    asyncAvroCb(emitter._ptcl, cb, err);
+    return;
+  }
+
+  if (!message.oneWay) {
+    emitter._pending[requestId] = {message: message, cb: cb};
+  }
+  emitter._encoder.write(out.getValue());
+};
+
+/**
+ * Destroy the emitter, disconnecting it from its streams.
+ *
+ * @param noWait {Boolean} Don't wait for pending requests. Their callbacks
+ * are called with an `interrupted` error instead.
+ *
+ * Unless `noWait` is set or the readable side has ended, this returns early
+ * while requests are still pending. `'_eot'` is emitted once the streams are
+ * unpiped, with the number of interrupted requests.
+ *
+ */
+StatefulEmitter.prototype.destroy = function (noWait) {
+  this._destroyed = true;
+  if (!this._started) {
+    this.emit('_start'); // Error out any pending calls.
+  }
+
+  var ids = Object.keys(this._pending);
+  var mayWait = !noWait && !this._ended;
+  if (ids.length && mayWait) {
+    return; // Wait for pending requests.
+  }
+
+  var i, interrupt;
+  for (i = 0; i < ids.length; i++) {
+    interrupt = this._pending[ids[i]].cb;
+    delete this._pending[ids[i]];
+    interrupt({string: 'interrupted'});
+  }
+
+  this._readable.unpipe(this._decoder);
+  this._encoder.unpipe(this._writable);
+  this.emit('_eot', ids.length);
+};
+
+/**
+ * The server-side emitter equivalent.
+ *
+ * In particular it is responsible for handling handshakes appropriately.
+ *
+ * @param ptcl {Protocol} The listener's protocol.
+ * @param opts {Object} Options: `IdType` (metadata type constructor),
+ * `bufferSize` and `frameSize` (both defaulting to 2048).
+ *
+ * The internal `'_eot'` event is re-emitted, once, as `'eot'`.
+ *
+ */
+function MessageListener(ptcl, opts) {
+  events.EventEmitter.call(this);
+  var options = opts || {};
+
+  // Protocol state.
+  this._ptcl = ptcl;
+  this._resolvers = ptcl._listenerResolvers;
+  this._emitterHashString = null;
+
+  // Encoding settings and streams.
+  this._idType = IdType.createMetadataType(options.IdType);
+  this._bufferSize = options.bufferSize || 2048;
+  this._frameSize = options.frameSize || 2048;
+  this._decoder = new MessageDecoder();
+  this._encoder = new MessageEncoder(this._frameSize);
+
+  // Lifecycle.
+  this._destroyed = false;
+  this._pending = 0;
+
+  this.once('_eot', function onEot(pending) {
+    this.emit('eot', pending);
+  });
+}
+util.inherits(MessageListener, events.EventEmitter);
+
+/**
+ * Create and store request resolvers for an emitter's protocol.
+ *
+ * @param hashString {String} Key the resolvers are stored under.
+ * @param emitterPtcl {Protocol} The emitter's protocol. Each of its messages
+ * must also exist in the listener's protocol, otherwise an error is thrown
+ * and nothing is stored.
+ *
+ */
+MessageListener.prototype._generateResolvers = function (
+  hashString, emitterPtcl
+) {
+  var emitterMessages = emitterPtcl._messages;
+  var listenerMessages = this._ptcl._messages;
+  var names = Object.keys(emitterMessages);
+  var resolvers = {};
+  var i, name, listenerMessage;
+  for (i = 0; i < names.length; i++) {
+    name = names[i];
+    listenerMessage = listenerMessages[name];
+    if (!listenerMessage) {
+      throw new Error(f('missing server message: %s', name));
+    }
+    resolvers[name] = {
+      requestType: listenerMessage.requestType.createResolver(
+        emitterMessages[name].requestType
+      )
+    };
+  }
+  this._resolvers[hashString] = resolvers;
+};
+
+/**
+ * Process a handshake request and write the matching response.
+ *
+ * @param reqTap {Tap} Tap the handshake request is read from.
+ * @param resTap {Tap} Tap the handshake response is written to.
+ *
+ * Returns `true` when the handshake is valid (match `BOTH` or `CLIENT`),
+ * `false` otherwise (match `NONE`).
+ *
+ */
+MessageListener.prototype._validateHandshake = function (reqTap, resTap) {
+ // Reads the handshake request and writes the corresponding response out. If
+ // an error occurs when parsing the request, a response with match NONE will
+ // be sent. Also emits a 'handshake' event with both the request and the
+ // response.
+ var validationErr = null;
+ try {
+ var handshakeReq = HANDSHAKE_REQUEST_TYPE._read(reqTap);
+ var serverHashString = handshakeReq.serverHash.toString('binary');
+ } catch (err) {
+ validationErr = err;
+ }
+
+ if (!validationErr) {
+ // Note that the emitter's hash is remembered even if validation fails
+ // below.
+ this._emitterHashString = handshakeReq.clientHash.toString('binary');
+ if (!canResolve(this, this._emitterHashString)) {
+ // Unknown emitter protocol: resolvers can only be generated if the
+ // request includes the protocol itself.
+ var emitterPtclString = handshakeReq.clientProtocol;
+ if (emitterPtclString) {
+ try {
+ this._generateResolvers(
+ this._emitterHashString,
+ createProtocol(JSON.parse(emitterPtclString.string))
+ );
+ } catch (err) {
+ validationErr = err;
+ }
+ } else {
+ validationErr = new Error('unknown client protocol hash');
+ }
+ }
+ }
+
+ // We use the handshake response's meta field to transmit an eventual error
+ // to the client. This will let us display a more useful message later on.
+ // When the request couldn't be parsed, `serverHashString` is undefined, so
+ // `serverMatch` is false and our protocol and hash are included.
+ var serverMatch = serverHashString === this._ptcl._hashString;
+ var handshakeRes = new HandshakeResponse(
+ validationErr ? 'NONE' : serverMatch ? 'BOTH' : 'CLIENT',
+ serverMatch ? null : {string: this._ptcl.toString()},
+ serverMatch ? null : {'org.apache.avro.ipc.MD5': getHash(this._ptcl)},
+ validationErr ? {map: {error: new Buffer(validationErr.message)}} : null
+ );
+
+ // `handshakeReq` is undefined here if the request couldn't be parsed.
+ this.emit('handshake', handshakeReq, handshakeRes);
+ safeWrite(resTap, HANDSHAKE_RESPONSE_TYPE, handshakeRes);
+ return validationErr === null;
+};
+
+/**
+ * Decode a request, using the emitter's version of the message if needed.
+ *
+ * @param tap {Tap} Tap positioned at the start of the request.
+ * @param message {Message} The listener's version of the message.
+ *
+ * Throws if no resolver is available for the message (e.g. when it isn't part
+ * of the protocol the emitter advertised) or if the request is truncated.
+ *
+ */
+MessageListener.prototype._decodeRequest = function (tap, message) {
+  var resolvers = getResolvers(this, this._emitterHashString, message);
+  if (!resolvers) {
+    // Fail with an explicit error rather than a `TypeError` on `undefined`.
+    throw new Error('no resolver for message: ' + message.name);
+  }
+  var val = resolvers.requestType._read(tap);
+  if (!tap.isValid()) {
+    throw new Error('invalid request');
+  }
+  return val;
+};
+
+/**
+ * Write an error flag followed by a system error.
+ *
+ * @param tap {Tap} Tap written to.
+ * @param err {Error|String} The error, converted using `avroError`.
+ *
+ */
+MessageListener.prototype._encodeSystemError = function (tap, err) {
+  var hasError = true;
+  safeWrite(tap, BOOLEAN_TYPE, hasError);
+  var payload = avroError(err);
+  safeWrite(tap, SYSTEM_ERROR_TYPE, payload);
+};
+
+/**
+ * Write a handler's outcome (error flag, then response or error).
+ *
+ * @param tap {Tap} Tap written to.
+ * @param message {Message} The message being responded to.
+ * @param err {...} Error returned by the handler. `null` and `undefined` both
+ * mean success, so the usual `cb(undefined, res)` and `cb(null, res)` calls
+ * behave identically.
+ * @param res {...} Response returned by the handler.
+ *
+ * If the response (or error) can't be written using the message's types, the
+ * tap is rewound and a system error is written instead.
+ *
+ */
+MessageListener.prototype._encodeArguments = function (
+  tap, message, err, res
+) {
+  var noError = err === undefined || err === null;
+  var pos = tap.pos;
+  safeWrite(tap, BOOLEAN_TYPE, !noError);
+  try {
+    if (noError) {
+      safeWrite(tap, message.responseType, res);
+    } else {
+      if (err instanceof Error) {
+        // Convenience to allow emitter to use JS errors inside handlers.
+        err = avroError(err);
+      }
+      safeWrite(tap, message.errorType, err);
+    }
+  } catch (err) {
+    tap.pos = pos;
+    this._encodeSystemError(tap, err);
+  }
+};
+
+/**
+ * Stop listening and, once possible, disconnect from the writable stream.
+ *
+ * @param noWait {Boolean} Don't wait for pending responses to be sent.
+ *
+ * `'_eot'` is emitted with the number of pending responses when the encoder
+ * is unpiped.
+ *
+ */
+MessageListener.prototype.destroy = function (noWait) {
+  var firstCall = !this._destroyed;
+  if (firstCall) {
+    // Stop listening. This will also correctly push back any unused bytes into
+    // the readable stream (via `MessageDecoder`'s `unpipe` handler).
+    this._readable.unpipe(this._decoder);
+  }
+  this._destroyed = true;
+
+  if (!noWait && this._pending) {
+    return; // Responses are still on their way.
+  }
+  this._encoder.unpipe(this._writable);
+  this.emit('_eot', this._pending);
+};
+
+/**
+ * Listener for stateless transport.
+ *
+ * This listener expects a handshake to precede each message.
+ *
+ * @param ptcl {Protocol} The listener's protocol.
+ * @param readableFactory {Function} Called with a single function argument
+ * and expected to return the readable stream the request is read from. The
+ * function argument should in turn be called with the writable stream the
+ * response is to be written to.
+ * @param opts {Object} Options, forwarded to `MessageListener`.
+ *
+ */
+function StatelessListener(ptcl, readableFactory, opts) {
+  MessageListener.call(this, ptcl, opts);
+
+  this._tap = new Tap(new Buffer(this._bufferSize));
+  this._message = undefined;
+
+  var self = this;
+  this._readable = readableFactory(function (writable) {
+    // The encoder will buffer writes that happen before this function is
+    // called, so we don't need to do any special handling.
+    self._writable = self._encoder
+      .pipe(writable)
+      .on('finish', onEnd);
+  });
+
+  this._readable.pipe(this._decoder)
+    // Forward decoding errors (e.g. trailing data), as emitters do. Without a
+    // handler here, a malformed request would throw from inside the decoder
+    // with no way for users of this listener to catch it.
+    .on('error', function (err) { self.emit('error', err); })
+    .on('data', onRequestData)
+    .on('end', onEnd);
+
+  function onRequestData(buf) {
+    self._pending++;
+    self.destroy(); // Only one message per stateless listener.
+
+    var reqTap = new Tap(buf);
+    if (!self._validateHandshake(reqTap, self._tap)) {
+      onResponse(new Error('invalid handshake'));
+      return;
+    }
+
+    try {
+      self._idType._read(reqTap); // Skip metadata.
+      var name = STRING_TYPE._read(reqTap);
+      self._message = self._ptcl._messages[name];
+      if (!self._message) {
+        throw new Error(f('unknown message: %s', name));
+      }
+      var req = self._decodeRequest(reqTap, self._message);
+    } catch (err) {
+      onResponse(err);
+      return;
+    }
+
+    self.emit('_call', name, req, onResponse);
+  }
+
+  // Append the response to the handshake response and send both.
+  function onResponse(err, res) {
+    safeWrite(self._tap, self._idType, 0);
+    if (!self._message) {
+      // No message to get error types from, fall back to a system error.
+      self._encodeSystemError(self._tap, err);
+    } else {
+      self._encodeArguments(self._tap, self._message, err, res);
+    }
+    self._pending--;
+    self._encoder.end(self._tap.getValue());
+  }
+
+  function onEnd() { self.destroy(); }
+}
+util.inherits(StatelessListener, MessageListener);
+
+/**
+ * Stateful transport listener.
+ *
+ * A handshake is done when the listener is first opened, then all messages
+ * are sent without.
+ *
+ * @param ptcl {Protocol} The listener's protocol.
+ * @param readable {Stream} Stream requests are read from.
+ * @param writable {Stream} Stream responses are written to.
+ * @param opts {Object} Options, forwarded to `MessageListener`.
+ *
+ */
+function StatefulListener(ptcl, readable, writable, opts) {
+  MessageListener.call(this, ptcl, opts);
+
+  this._readable = readable;
+  this._writable = writable;
+
+  var self = this;
+
+  this._readable
+    .pipe(this._decoder)
+    // Forward decoding errors (e.g. trailing data), as emitters do. Without a
+    // handler here, a malformed request would throw from inside the decoder
+    // with no way for users of this listener to catch it.
+    .on('error', function (err) { self.emit('error', err); })
+    .on('data', onHandshakeData)
+    .on('end', function () { self.destroy(); });
+
+  this._encoder
+    .pipe(this._writable)
+    .on('finish', function () { self.destroy(); });
+
+  // Handle handshake requests until one of them is valid. A response is
+  // written in all cases.
+  function onHandshakeData(buf) {
+    var reqTap = new Tap(buf);
+    var resTap = new Tap(new Buffer(self._bufferSize));
+    if (self._validateHandshake(reqTap, resTap)) {
+      self._decoder
+        .removeListener('data', onHandshakeData)
+        .on('data', onRequestData);
+    }
+    self._encoder.write(resTap.getValue());
+  }
+
+  function onRequestData(buf) {
+    var reqTap = new Tap(buf);
+    var resTap = new Tap(new Buffer(self._bufferSize));
+    var id = 0;
+    try {
+      // Emitters negate the ID on the way out; it is sent back as is.
+      id = -self._idType._read(reqTap) | 0;
+      if (!id) {
+        throw new Error('missing ID');
+      }
+    } catch (err) {
+      self.emit('error', new Error('invalid metadata: ' + err.message));
+      return;
+    }
+
+    self._pending++;
+    try {
+      var name = STRING_TYPE._read(reqTap);
+      var message = self._ptcl._messages[name];
+      if (!message) {
+        throw new Error('unknown message: ' + name);
+      }
+      var req = self._decodeRequest(reqTap, message);
+    } catch (err) {
+      onResponse(err);
+      return;
+    }
+
+    if (message.oneWay) {
+      self.emit('_call', name, req);
+      self._pending--;
+    } else {
+      self.emit('_call', name, req, onResponse);
+    }
+
+    function onResponse(err, res) {
+      self._pending--;
+      safeWrite(resTap, self._idType, id);
+      if (!message) {
+        // Unknown message: no error types available.
+        self._encodeSystemError(resTap, err);
+      } else {
+        self._encodeArguments(resTap, message, err, res);
+      }
+      self._encoder.write(resTap.getValue(), undefined, function () {
+        if (!self._pending && self._destroyed) {
+          self.destroy(); // For real this time.
+        }
+      });
+    }
+  }
+}
+util.inherits(StatefulListener, MessageListener);
+
+// Helpers.
+
+/**
+ * An Avro message.
+ *
+ * @param name {String} The message's name.
+ * @param attrs {Object} The message's declaration (`request`, `response`,
+ * and optionally `errors` and `one-way`). It is not modified.
+ * @param opts {Object} Options, forwarded to `schemas.createType`.
+ *
+ * Throws if the response is missing or if a one-way message declares a
+ * non-null response or custom errors.
+ *
+ */
+function Message(name, attrs, opts) {
+  this.name = name;
+
+  this.requestType = schemas.createType({
+    name: name,
+    type: 'request',
+    fields: attrs.request
+  }, opts);
+
+  if (!attrs.response) {
+    throw new Error('missing response');
+  }
+  this.responseType = schemas.createType(attrs.response, opts);
+
+  // The implicit `string` branch is prepended to a copy. Mutating the
+  // caller's array would add another `string` each time the same declaration
+  // is used to create a message.
+  var errors = ['string'].concat(attrs.errors || []);
+  this.errorType = schemas.createType(errors, opts);
+
+  this.oneWay = !!attrs['one-way'];
+  if (this.oneWay) {
+    if (
+      !(this.responseType instanceof schemas.types.NullType) ||
+      errors.length > 1
+    ) {
+      throw new Error('unapplicable one-way parameter');
+    }
+  }
+}
+
+/**
+ * Returns the message's declaration, in a form suitable for serialization.
+ *
+ * The implicit `string` error branch is omitted, and `errors` is only present
+ * when custom errors were declared. The `one-way` flag is included when set,
+ * so that it survives a serialization round-trip.
+ *
+ */
+Message.prototype.toJSON = function () {
+  var obj = {
+    request: this.requestType.getFields(),
+    response: this.responseType
+  };
+  var errorTypes = this.errorType.getTypes();
+  if (errorTypes.length > 1) {
+    obj.errors = schemas.createType(errorTypes.slice(1));
+  }
+  if (this.oneWay) {
+    obj['one-way'] = true;
+  }
+  return obj;
+};
+
+/**
+ * "Framing" stream.
+ *
+ * @param frameSize {Number} (Maximum) size in bytes of each frame. The last
+ * frame might be shorter. The value is truncated to an integer and must be
+ * positive, otherwise an error is thrown.
+ *
+ */
+function MessageEncoder(frameSize) {
+  stream.Transform.call(this);
+  var size = frameSize | 0;
+  this._frameSize = size;
+  if (!(size > 0)) {
+    throw new Error('invalid frame size');
+  }
+}
+util.inherits(MessageEncoder, stream.Transform);
+
+/**
+ * Frame a message.
+ *
+ * Each chunk written is treated as a complete message: it is split into
+ * frames of at most `_frameSize` bytes, each preceded by its length, and
+ * followed by a zero-length frame which marks the end of the message.
+ *
+ * An empty chunk is encoded as the terminator alone. A zero-length frame
+ * followed by the terminator would be decoded as two messages.
+ *
+ */
+MessageEncoder.prototype._transform = function (buf, encoding, cb) {
+  var frames = [];
+  var length = buf.length;
+  var start = 0;
+  var end;
+  while (start < length) {
+    end = Math.min(start + this._frameSize, length);
+    frames.push(intBuffer(end - start));
+    frames.push(buf.slice(start, end));
+    start = end;
+  }
+  frames.push(intBuffer(0));
+  cb(null, Buffer.concat(frames));
+};
+
+/**
+ * "Un-framing" stream.
+ *
+ * @param noEmpty {Boolean} Emit an error if the decoder ends before emitting a
+ * single frame.
+ *
+ * This stream should only be used by being piped/unpiped to. Otherwise there
+ * is a risk that too many bytes get consumed from the source stream (i.e.
+ * data corresponding to a partial message might be lost).
+ *
+ */
+function MessageDecoder(noEmpty) {
+  stream.Transform.call(this);
+  this._buf = new Buffer(0); // Bytes which don't form a complete frame yet.
+  this._bufs = []; // Frames of the message being decoded.
+  this._length = 0; // Total size of these frames (-1 once invalid).
+  this._empty = Boolean(noEmpty);
+
+  this.on('finish', onFinish);
+  this.on('unpipe', onUnpipe);
+
+  function onFinish() {
+    this.push(null);
+  }
+
+  // Give any bytes not yet turned into a message back to the source.
+  function onUnpipe(src) {
+    if (!~this._length || src._readableState.ended) {
+      return;
+    }
+    // Not ideal to rely on this to check whether we can unshift, but the
+    // official documentation mentions it (in the context of the read
+    // buffers) so it should be stable. Alternatives are more complex,
+    // costly (e.g. attaching a handler on pipe), and not as fool-proof
+    // (the stream might have ended earlier).
+    this._bufs.push(this._buf);
+    src.unshift(Buffer.concat(this._bufs));
+  }
+}
+util.inherits(MessageDecoder, stream.Transform);
+
+/**
+ * Decode as many messages as possible from the bytes received so far.
+ *
+ * Frames are accumulated until a zero-length frame is found, at which point
+ * they are concatenated and pushed as a single message. Any incomplete frame
+ * is kept for the next call.
+ *
+ * A negative frame length can never be valid and is reported as an error.
+ * Left unchecked it would corrupt the decoder's state or, for a length of -4,
+ * loop forever since no bytes would be consumed.
+ *
+ */
+MessageDecoder.prototype._transform = function (buf, encoding, cb) {
+  buf = Buffer.concat([this._buf, buf]);
+  var frameLength;
+  while (
+    buf.length >= 4 &&
+    buf.length >= (frameLength = buf.readInt32BE(0)) + 4
+  ) {
+    if (frameLength < 0) {
+      this._buf = new Buffer(0);
+      this._bufs = [];
+      this._length = -1; // Don't unshift data on incoming unpipe.
+      cb(new Error('invalid frame length'));
+      return;
+    }
+    if (frameLength) {
+      this._bufs.push(buf.slice(4, frameLength + 4));
+      this._length += frameLength;
+    } else {
+      var frame = Buffer.concat(this._bufs, this._length);
+      this._empty = false;
+      this._length = 0;
+      this._bufs = [];
+      this.push(frame);
+    }
+    buf = buf.slice(frameLength + 4);
+  }
+  this._buf = buf;
+  cb();
+};
+
+/**
+ * Check that the input ended on a message boundary.
+ *
+ * @param cb {Function} Flush callback provided by the stream machinery. It
+ * must be called for the stream to complete: recent node versions don't emit
+ * `'finish'` (nor, consequently, `'end'`) until then.
+ *
+ * An error is emitted if a partial message is left over or, when the decoder
+ * was created with `noEmpty`, if no message was decoded at all.
+ *
+ */
+MessageDecoder.prototype._flush = function (cb) {
+  if (this._length || this._buf.length) {
+    this._length = -1; // Don't unshift data on incoming unpipe.
+    this.emit('error', new Error('trailing data'));
+  } else if (this._empty) {
+    this.emit('error', new Error('no message decoded'));
+  }
+  if (typeof cb == 'function') {
+    cb();
+  }
+};
+
+/**
+ * Default ID generator, using Avro messages' metadata field.
+ *
+ * This is required for stateful emitters to work and can be overridden to read
+ * or write arbitrary metadata. Note that the message contents are
+ * (intentionally) not available when updating this metadata.
+ *
+ */
+function IdType(attrs, opts) {
+  schemas.types.LogicalType.call(this, attrs, opts);
+}
+util.inherits(IdType, schemas.types.LogicalType);
+
+// Extract the ID from the metadata map. Anything other than a 4 byte `id`
+// entry yields 0.
+IdType.prototype._fromValue = function (val) {
+  var idBuf = val.id;
+  if (idBuf && idBuf.length === 4) {
+    return idBuf.readInt32BE(0);
+  }
+  return 0;
+};
+
+// Wrap an ID (coerced to a 32 bit integer) into a metadata map.
+IdType.prototype._toValue = function (any) {
+  var id = any | 0;
+  return {id: intBuffer(id)};
+};
+
+// Instantiate a metadata type (this one by default) over a map of bytes.
+IdType.createMetadataType = function (Type) {
+  var MetadataType = Type || IdType;
+  return new MetadataType({type: 'map', values: 'bytes'});
+};
+
+/**
+ * Returns a buffer containing an integer's big-endian representation.
+ *
+ * @param n {Number} Integer. It must fit in 32 bits (signed).
+ *
+ */
+function intBuffer(n) {
+  var buf = new Buffer(4);
+  // The offset is passed explicitly: it is a required argument in older node
+  // versions, which otherwise fail with a "missing offset" assertion.
+  buf.writeInt32BE(n, 0);
+  return buf;
+}
+
+/**
+ * Write and maybe resize.
+ *
+ * @param tap {Tap} Tap written to.
+ * @param type {Type} Avro type.
+ * @param val {...} Corresponding Avro value.
+ *
+ * If the value doesn't fit, the tap's buffer is replaced by one exactly large
+ * enough to hold it (previous contents are preserved) and the value is
+ * written again.
+ *
+ */
+function safeWrite(tap, type, val) {
+  var start = tap.pos;
+  type._write(tap, val);
+  if (tap.isValid()) {
+    return;
+  }
+
+  // The write overflowed: the tap's position is used as the new size.
+  var grown = new Buffer(tap.pos);
+  tap.buf.copy(grown, 0, 0, start);
+  tap.buf = grown;
+  tap.pos = start;
+  type._write(tap, val);
+}
+
+/**
+ * Default callback when not provided.
+ *
+ * Does nothing when called without an error. Otherwise throws it, after
+ * converting strings and Avro wrapped strings (`{string: ...}`) to `Error`
+ * instances.
+ *
+ */
+function throwError(err) {
+  if (!err) {
+    return;
+  }
+  var cause = err;
+  if (typeof cause === 'object' && cause.string) {
+    cause = cause.string; // Unwrap.
+  }
+  if (typeof cause === 'string') {
+    cause = new Error(cause);
+  }
+  throw cause;
+}
+
+/**
+ * Convert an error message into a format suitable for RPC.
+ *
+ * @param err {Error|String} Error message. `Error` instances are replaced by
+ * their message; the result is wrapped as an Avro `string` union branch.
+ *
+ */
+function avroError(err) {
+  var message = err instanceof Error ? err.message : err;
+  return {string: message};
+}
+
+/**
+ * Asynchronous error handling.
+ *
+ * @param ctx {...} Context (`this`) the callback is called with.
+ * @param cb {Function} Callback, called on the next tick.
+ * @param err {...} Error, passed as first argument to `cb`. If an `Error`
+ * instance or a string, it will be converted into valid format for Avro.
+ * @param res {...} Response. Passed as second argument to `cb`.
+ *
+ */
+function asyncAvroCb(ctx, cb, err, res) {
+  function invoke() {
+    cb.call(ctx, avroError(err), res);
+  }
+  process.nextTick(invoke);
+}
+
+/**
+ * Convenience function to get a protocol's hash.
+ *
+ * @param ptcl {Protocol} Any protocol. Its binary hash string is converted
+ * into a buffer.
+ *
+ */
+function getHash(ptcl) {
+  var hashString = ptcl._hashString;
+  return new Buffer(hashString, 'binary');
+}
+
+/**
+ * Whether an emitter or listener can resolve messages from a hash string.
+ *
+ * @param emitter {MessageEmitter|MessageListener}
+ * @param hashString {String}
+ *
+ * This is the case when resolvers exist for the hash, or when the hash is
+ * the hash of the emitter's (or listener's) own protocol.
+ *
+ */
+function canResolve(emitter, hashString) {
+  var known = emitter._resolvers[hashString];
+  if (known) {
+    return true;
+  }
+  return hashString === emitter._ptcl._hashString;
+}
+
+/**
+ * Retrieve resolvers for a given hash string.
+ *
+ * @param emitter {MessageEmitter|MessageListener}
+ * @param hashString {String}
+ * @param message {Message}
+ *
+ * When the hash is the one of the emitter's own protocol, no resolution is
+ * needed and the message itself is returned. Otherwise the resolvers stored
+ * for this hash and message name are returned, if any.
+ *
+ */
+function getResolvers(emitter, hashString, message) {
+  var sameProtocol = hashString === emitter._ptcl._hashString;
+  if (sameProtocol) {
+    return message;
+  }
+  var byName = emitter._resolvers[hashString];
+  return byName ? byName[message.name] : byName;
+}
+
+/**
+ * Check whether something is a stream.
+ *
+ * @param any {Object} Any object.
+ *
+ */
+function isStream(any) {
+  // Duck-typing on `pipe`: `instanceof Stream` checks can't be used since
+  // some libraries (e.g. websocket-stream) return streams which don't
+  // inherit from it.
+  return any.pipe ? true : false;
+}
+
+
+// Public API: handshake types, the protocol and message constructors, the
+// emitter and listener implementations (grouped by kind), the framing
+// streams, and the default callback.
+module.exports = {
+ HANDSHAKE_REQUEST_TYPE: HANDSHAKE_REQUEST_TYPE,
+ HANDSHAKE_RESPONSE_TYPE: HANDSHAKE_RESPONSE_TYPE,
+ IdType: IdType,
+ Message: Message,
+ Protocol: Protocol,
+ createProtocol: createProtocol,
+ emitters: {
+ StatefulEmitter: StatefulEmitter,
+ StatelessEmitter: StatelessEmitter
+ },
+ listeners: {
+ StatefulListener: StatefulListener,
+ StatelessListener: StatelessListener
+ },
+ streams: {
+ MessageDecoder: MessageDecoder,
+ MessageEncoder: MessageEncoder
+ },
+ throwError: throwError
+};
http://git-wip-us.apache.org/repos/asf/avro/blob/9101a42b/lang/js/lib/schemas.js
----------------------------------------------------------------------
diff --git a/lang/js/lib/schemas.js b/lang/js/lib/schemas.js
index 325d77a..429ace4 100644
--- a/lang/js/lib/schemas.js
+++ b/lang/js/lib/schemas.js
@@ -23,7 +23,6 @@
var utils = require('./utils'),
buffer = require('buffer'), // For `SlowBuffer`.
- crypto = require('crypto'),
util = require('util');
// Convenience imports.
@@ -45,6 +44,7 @@ var TYPES = {
'map': MapType,
'null': NullType,
'record': RecordType,
+ 'request': RecordType,
'string': StringType,
'union': UnionType
};
@@ -307,31 +307,11 @@ Type.prototype.getName = function (noRef) {
};
Type.prototype.getSchema = function (noDeref) {
- // Since JS objects are unordered, this implementation (unfortunately)
- // relies on engines returning properties in the same order that they are
- // inserted in. This is not in the JS spec, but can be "somewhat" safely
- // assumed (more here: http://stackoverflow.com/q/5525795/1062617).
- return (function (type, registry) {
- return JSON.stringify(type, function (key, value) {
- if (value instanceof Field) {
- return {name: value._name, type: value._type};
- } else if (value && value.name) {
- var name = value.name;
- if (noDeref || registry[name]) {
- return name;
- }
- registry[name] = true;
- }
- return value;
- });
- })(this, {});
+ return stringify(this, noDeref);
};
Type.prototype.getFingerprint = function (algorithm) {
- algorithm = algorithm || 'md5';
- var hash = crypto.createHash(algorithm);
- hash.end(this.getSchema());
- return hash.read();
+ return utils.getHash(this.getSchema(), algorithm);
};
Type.prototype.inspect = function () {
@@ -1434,7 +1414,9 @@ function RecordType(attrs, opts) {
var resolutions = resolveNames(attrs, opts.namespace);
this._name = resolutions.name;
this._aliases = resolutions.aliases;
- Type.call(this, opts.registry);
+ this._type = attrs.type;
+ // Requests shouldn't be registered since their name is only a placeholder.
+ Type.call(this, this._type === 'request' ? undefined : opts.registry);
if (!(attrs.fields instanceof Array)) {
throw new Error(f('non-array %s fields', this._name));
@@ -2156,6 +2138,37 @@ function readArraySize(tap) {
}
/**
+ * Correctly stringify an object which contains types.
+ *
+ * @param obj {Object} The object to stringify. Typically, a type itself or an
+ * object containing types. Any types inside will be expanded only once then
+ * referenced by name.
+ * @param noDeref {Boolean} Always reference types by name when possible,
+ * rather than expand it the first time it is encountered.
+ *
+ */
+function stringify(obj, noDeref) {
+  // Relies on engines preserving property insertion order (not guaranteed by
+  // the JS spec; see http://stackoverflow.com/q/5525795/1062617).
+  // No prototype: names such as `constructor` must not appear already seen.
+  var seen = Object.create(null);
+  return JSON.stringify(obj, function (key, value) {
+    if (value instanceof Field) {
+      return {name: value._name, type: value._type};
+    }
+    var name = value && value.name;
+    if (!name) {
+      return value;
+    }
+    if (noDeref || seen[name]) {
+      return name; // Reference by name.
+    }
+    seen[name] = true; // Expand this first occurrence only.
+    return value;
+  });
+}
+
+/**
* Check whether a long can be represented without precision loss.
*
* @param n {Number} The number.
@@ -2192,6 +2205,7 @@ function throwInvalidError(path, val, type) {
module.exports = {
createType: createType,
resolveNames: resolveNames, // Protocols use the same name resolution logic.
+ stringify: stringify,
types: (function () {
// Export the base types along with all concrete implementations.
var obj = {Type: Type, LogicalType: LogicalType};
http://git-wip-us.apache.org/repos/asf/avro/blob/9101a42b/lang/js/lib/utils.js
----------------------------------------------------------------------
diff --git a/lang/js/lib/utils.js b/lang/js/lib/utils.js
index 2627c1a..c453bdd 100644
--- a/lang/js/lib/utils.js
+++ b/lang/js/lib/utils.js
@@ -21,6 +21,9 @@
'use strict';
+var crypto = require('crypto');
+
+
/**
* Uppercase the first letter of a string.
*
@@ -39,6 +42,20 @@ function capitalize(s) { return s.charAt(0).toUpperCase() + s.slice(1); }
function compare(n1, n2) { return n1 === n2 ? 0 : (n1 < n2 ? -1 : 1); }
/**
+ * Compute a string's hash.
+ *
+ * @param str {String} The string to hash.
+ * @param algorithm {String} The algorithm used. Defaults to MD5.
+ *
+ */
+function getHash(str, algorithm) {
+  var hasher = crypto.createHash(algorithm || 'md5');
+  hasher.end(str);
+  var digest = hasher.read();
+  return digest;
+}
+
+/**
* Find index of value in array.
*
* @param arr {Array} Can also be a false-ish value.
@@ -278,6 +295,12 @@ function Tap(buf, pos) {
*/
Tap.prototype.isValid = function () { return this.pos <= this.buf.length; };
+/**
+ * Returns the contents of the tap up to the current position (`buf[0..pos)`).
+ * NOTE(review): with a node `Buffer` this is a view, not a copy - confirm.
+ */
+Tap.prototype.getValue = function () { return this.buf.slice(0, this.pos); };
+
// Read, skip, write methods.
//
// These should fail silently when the buffer overflows. Note this is only
@@ -623,6 +646,7 @@ module.exports = {
abstractFunction: abstractFunction,
capitalize: capitalize,
compare: compare,
+ getHash: getHash,
toMap: toMap,
singleIndexOf: singleIndexOf,
hasDuplicates: hasDuplicates,
http://git-wip-us.apache.org/repos/asf/avro/blob/9101a42b/lang/js/package.json
----------------------------------------------------------------------
diff --git a/lang/js/package.json b/lang/js/package.json
index fda5325..672faff 100644
--- a/lang/js/package.json
+++ b/lang/js/package.json
@@ -34,8 +34,9 @@
"crypto": "./etc/browser/crypto.js"
},
"scripts": {
- "test": "mocha --ui tdd --reporter dot",
- "clean": "rm -rf node_modules"
+ "cover": "istanbul cover _mocha",
+ "test": "mocha",
+ "clean": "rm -rf coverage node_modules"
},
"dependencies": {
"underscore": "*"
http://git-wip-us.apache.org/repos/asf/avro/blob/9101a42b/lang/js/test/mocha.opts
----------------------------------------------------------------------
diff --git a/lang/js/test/mocha.opts b/lang/js/test/mocha.opts
new file mode 100644
index 0000000..1ed06f8
--- /dev/null
+++ b/lang/js/test/mocha.opts
@@ -0,0 +1,2 @@
+--ui tdd
+--reporter dot
http://git-wip-us.apache.org/repos/asf/avro/blob/9101a42b/lang/js/test/test_files.js
----------------------------------------------------------------------
diff --git a/lang/js/test/test_files.js b/lang/js/test/test_files.js
index c0a334b..4f7c547 100644
--- a/lang/js/test/test_files.js
+++ b/lang/js/test/test_files.js
@@ -22,6 +22,7 @@
'use strict';
var files = require('../lib/files'),
+ protocols = require('../lib/protocols'),
schemas = require('../lib/schemas'),
assert = require('assert'),
fs = require('fs'),
@@ -43,7 +44,7 @@ suite('files', function () {
var parse = files.parse;
- test('object', function () {
+ test('type object', function () {
var obj = {
type: 'record',
name: 'Person',
@@ -52,6 +53,11 @@ suite('files', function () {
assert(parse(obj) instanceof types.RecordType);
});
+ test('protocol object', function () {
+ var obj = {protocol: 'Foo'};
+ assert(parse(obj) instanceof protocols.Protocol);
+ });
+
test('schema instance', function () {
var type = parse({
type: 'record',