You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by ga...@apache.org on 2015/10/14 12:09:29 UTC
[32/52] [partial] couchdb-nmo git commit: prepare for release
http://git-wip-us.apache.org/repos/asf/couchdb-nmo/blob/753f1767/node_modules/couchbulkimporter/node_modules/mongodb/node_modules/mongodb-core/lib/topologies/server.js
----------------------------------------------------------------------
diff --git a/node_modules/couchbulkimporter/node_modules/mongodb/node_modules/mongodb-core/lib/topologies/server.js b/node_modules/couchbulkimporter/node_modules/mongodb/node_modules/mongodb-core/lib/topologies/server.js
new file mode 100644
index 0000000..0fae9ea
--- /dev/null
+++ b/node_modules/couchbulkimporter/node_modules/mongodb/node_modules/mongodb-core/lib/topologies/server.js
@@ -0,0 +1,1230 @@
+ "use strict";
+
+var inherits = require('util').inherits
+ , f = require('util').format
+ , bindToCurrentDomain = require('../connection/utils').bindToCurrentDomain
+ , EventEmitter = require('events').EventEmitter
+ , Pool = require('../connection/pool')
+ , b = require('bson')
+ , Query = require('../connection/commands').Query
+ , MongoError = require('../error')
+ , ReadPreference = require('./read_preference')
+ , BasicCursor = require('../cursor')
+ , CommandResult = require('./command_result')
+ , getSingleProperty = require('../connection/utils').getSingleProperty
+ , getProperty = require('../connection/utils').getProperty
+ , debugOptions = require('../connection/utils').debugOptions
+ , BSON = require('bson').native().BSON
+ , PreTwoSixWireProtocolSupport = require('../wireprotocol/2_4_support')
+ , TwoSixWireProtocolSupport = require('../wireprotocol/2_6_support')
+ , ThreeTwoWireProtocolSupport = require('../wireprotocol/3_2_support')
+ , Session = require('./session')
+ , Logger = require('../connection/logger')
+ , MongoCR = require('../auth/mongocr')
+ , X509 = require('../auth/x509')
+ , Plain = require('../auth/plain')
+ , GSSAPI = require('../auth/gssapi')
+ , SSPI = require('../auth/sspi')
+ , ScramSHA1 = require('../auth/scram');
+
+/**
+ * @fileOverview The **Server** class is a class that represents a single server topology and is
+ * used to construct connections.
+ *
+ * @example
+ * var Server = require('mongodb-core').Server
+ * , ReadPreference = require('mongodb-core').ReadPreference
+ * , assert = require('assert');
+ *
+ * var server = new Server({host: 'localhost', port: 27017});
+ * // Wait for the connection event
+ * server.on('connect', function(server) {
+ * server.destroy();
+ * });
+ *
+ * // Start connecting
+ * server.connect();
+ */
+
+// BSON type constructors handed to every BSON parser this module creates
+var bsonTypes = [b.Long, b.ObjectID, b.Binary, b.Code, b.DBRef, b.Symbol, b.Double, b.Timestamp, b.MaxKey, b.MinKey];
+// Shared BSON parser; created by the first Server constructed and reused afterwards
+var bsonInstance = null;
+// Counter used to give each Server instance its own id
+var serverId = 0;
+// Counter used to give each Callbacks store its own id
+var callbackId = 0;
+
+// Single store for all callbacks
+var Callbacks = function() {
+ // EventEmitter.call(this);
+ var self = this;
+ // Callbacks
+ this.callbacks = {};
+ // Set the callbacks id
+ this.id = callbackId++;
+ // Set the type to server
+ this.type = 'server';
+}
+
+//
+// Clone the options
+var cloneOptions = function(options) {
+ var opts = {};
+ for(var name in options) {
+ opts[name] = options[name];
+ }
+ return opts;
+}
+
+//
+// Flush all callbacks
+Callbacks.prototype.flush = function(err) {
+ for(var id in this.callbacks) {
+ if(!isNaN(parseInt(id, 10))) {
+ var callback = this.callbacks[id];
+ delete this.callbacks[id];
+ callback(err, null);
+ }
+ }
+}
+
+Callbacks.prototype.emit = function(id, err, value) {
+ var callback = this.callbacks[id];
+ delete this.callbacks[id];
+ callback(err, value);
+}
+
+Callbacks.prototype.raw = function(id) {
+ if(this.callbacks[id] == null) return false;
+ return this.callbacks[id].raw == true ? true : false
+}
+
+Callbacks.prototype.documentsReturnedIn = function(id) {
+ if(this.callbacks[id] == null) return false;
+ return typeof this.callbacks[id].documentsReturnedIn == 'string' ? this.callbacks[id].documentsReturnedIn : null;
+}
+
+Callbacks.prototype.unregister = function(id) {
+ delete this.callbacks[id];
+}
+
+Callbacks.prototype.register = function(id, callback) {
+ this.callbacks[id] = bindToCurrentDomain(callback);
+}
+
+/**
+ * @ignore
+ */
+var bindToCurrentDomain = function(callback) {
+ var domain = process.domain;
+ if(domain == null || callback == null) return callback;
+ return domain.bind(callback);
+}
+
+var DISCONNECTED = 'disconnected'; // initial state; also set by the error/timeout/close handlers
+var CONNECTING = 'connecting'; // set by connect() and by each reconnect attempt
+var CONNECTED = 'connected'; // set once the ismaster handshake or a reconnect succeeded
+var DESTROYED = 'destroyed'; // set by destroy(); operations are refused in this state
+
+// Supports server
+var supportsServer = function(_s) {
+ return _s.ismaster && typeof _s.ismaster.minWireVersion == 'number';
+}
+
+//
+// createWireProtocolHandler
+var createWireProtocolHandler = function(result) {
+ // 3.2 wire protocol handler
+ if(result && result.maxWireVersion >= 4) {
+ return new ThreeTwoWireProtocolSupport(new TwoSixWireProtocolSupport());
+ }
+
+ // 2.6 wire protocol handler
+ if(result && result.maxWireVersion >= 2) {
+ return new TwoSixWireProtocolSupport();
+ }
+
+ // 2.4 or earlier wire protocol handler
+ return new PreTwoSixWireProtocolSupport();
+}
+
+//
+// Reconnect server
+var reconnectServer = function(self, state) {
+ // If the current reconnect retries is 0 stop attempting to reconnect
+ if(state.currentReconnectRetry == 0) {
+ return self.destroy(true, true);
+ }
+
+ // Adjust the number of retries
+ state.currentReconnectRetry = state.currentReconnectRetry - 1;
+
+ // Set status to connecting
+ state.state = CONNECTING;
+ // Create a new Pool
+ state.pool = new Pool(state.options);
+ // error handler
+ var reconnectErrorHandler = function(err) {
+ state.state = DISCONNECTED;
+ // Destroy the pool
+ state.pool.destroy();
+ // Adjust the number of retries
+ state.currentReconnectRetry = state.currentReconnectRetry - 1;
+ // No more retries
+ if(state.currentReconnectRetry <= 0) {
+ self.state = DESTROYED;
+ self.emit('error', f('failed to connect to %s:%s after %s retries', state.options.host, state.options.port, state.reconnectTries));
+ } else {
+ setTimeout(function() {
+ reconnectServer(self, state);
+ }, state.reconnectInterval);
+ }
+ }
+
+ //
+ // Attempt to connect
+ state.pool.once('connect', function() {
+ // Reset retries
+ state.currentReconnectRetry = state.reconnectTries;
+
+ // Remove any non used handlers
+ var events = ['error', 'close', 'timeout', 'parseError'];
+ events.forEach(function(e) {
+ state.pool.removeAllListeners(e);
+ });
+
+ // Set connected state
+ state.state = CONNECTED;
+
+ // Add proper handlers
+ state.pool.once('error', reconnectErrorHandler);
+ state.pool.once('close', closeHandler(self, state));
+ state.pool.once('timeout', timeoutHandler(self, state));
+ state.pool.once('parseError', fatalErrorHandler(self, state));
+
+ // We need to ensure we have re-authenticated
+ var keys = Object.keys(state.authProviders);
+ if(keys.length == 0) return self.emit('reconnect', self);
+
+ // Execute all providers
+ var count = keys.length;
+ // Iterate over keys
+ for(var i = 0; i < keys.length; i++) {
+ state.authProviders[keys[i]].reauthenticate(self, state.pool, function(err, r) {
+ count = count - 1;
+ // We are done, emit reconnect event
+ if(count == 0) {
+ return self.emit('reconnect', self);
+ }
+ });
+ }
+ });
+
+ //
+ // Handle connection failure
+ state.pool.once('error', errorHandler(self, state));
+ state.pool.once('close', errorHandler(self, state));
+ state.pool.once('timeout', errorHandler(self, state));
+ state.pool.once('parseError', errorHandler(self, state));
+
+ // Connect pool
+ state.pool.connect();
+}
+
+//
+// Handlers
+var messageHandler = function(self, state) {
+ return function(response, connection) {
+ try {
+ // Parse the message
+ response.parse({raw: state.callbacks.raw(response.responseTo), documentsReturnedIn: state.callbacks.documentsReturnedIn(response.responseTo)});
+ if(state.logger.isDebug()) state.logger.debug(f('message [%s] received from %s', response.raw.toString('hex'), self.name));
+ state.callbacks.emit(response.responseTo, null, response);
+ } catch (err) {
+ state.callbacks.flush(new MongoError(err));
+ self.destroy();
+ }
+ }
+}
+
+var errorHandler = function(self, state) { // builds the pool 'error' listener; reconnectServer also uses it for every failure event while connecting
+ return function(err, connection) {
+ if(state.state == DISCONNECTED || state.state == DESTROYED) return; // already handled, ignore repeated events
+ // Set disconnected state
+ state.state = DISCONNECTED;
+ if(state.readPreferenceStrategies != null) notifyStrategies(self, self.s, 'error', [self]);
+ if(state.logger.isInfo()) state.logger.info(f('server %s errored out with %s', self.name, JSON.stringify(err)));
+ // Fail every pending callback with a MongoError describing the error
+ if(state.callbacks) state.callbacks.flush(new MongoError(f("server %s received an error %s", self.name, JSON.stringify(err))));
+ // Destroy all connections (here this happens before the event is emitted)
+ self.destroy();
+ // Emit error event, only when emitError is set and somebody listens for it
+ if(state.emitError && self.listeners('error').length > 0) self.emit('error', err, self);
+ // If we specified the driver to reconnect, schedule an attempt after reconnectInterval ms
+ if(state.reconnect) setTimeout(function() {
+ // state.currentReconnectRetry = state.reconnectTries,
+ reconnectServer(self, state)
+ }, state.reconnectInterval);
+ }
+}
+
+var fatalErrorHandler = function(self, state) { // builds the pool 'parseError' listener
+ return function(err, connection) {
+ if(state.state == DISCONNECTED || state.state == DESTROYED) return; // already handled, ignore repeated events
+ // Set disconnected state
+ state.state = DISCONNECTED;
+
+ if(state.readPreferenceStrategies != null) notifyStrategies(self, self.s, 'error', [self]);
+ if(state.logger.isInfo()) state.logger.info(f('server %s errored out with %s', self.name, JSON.stringify(err)));
+ // Fail every pending callback with a MongoError describing the error
+ if(state.callbacks) state.callbacks.flush(new MongoError(f("server %s received an error %s", self.name, JSON.stringify(err))));
+ // Emit error event whenever somebody listens for it (emitError is not consulted here)
+ if(self.listeners('error').length > 0) self.emit('error', err, self);
+ // If we specified the driver to reconnect, schedule an attempt after reconnectInterval ms
+ if(state.reconnect) setTimeout(function() {
+ // state.currentReconnectRetry = state.reconnectTries,
+ reconnectServer(self, state)
+ }, state.reconnectInterval);
+ // Destroy all connections (after the event and after scheduling the reconnect)
+ self.destroy();
+ }
+}
+
+var timeoutHandler = function(self, state) { // builds the pool 'timeout' listener
+ return function(err, connection) {
+ if(state.state == DISCONNECTED || state.state == DESTROYED) return; // already handled, ignore repeated events
+ // Set disconnected state
+ state.state = DISCONNECTED;
+
+ if(state.readPreferenceStrategies != null) notifyStrategies(self, self.s, 'timeout', [self]);
+ if(state.logger.isInfo()) state.logger.info(f('server %s timed out', self.name));
+ // Fail every pending callback with a timeout MongoError
+ if(state.callbacks) state.callbacks.flush(new MongoError(f("server %s timed out", self.name)));
+ // Emit timeout event (unconditionally)
+ self.emit('timeout', err, self);
+ // If we specified the driver to reconnect, schedule an attempt after reconnectInterval ms
+ if(state.reconnect) setTimeout(function() {
+ // state.currentReconnectRetry = state.reconnectTries,
+ reconnectServer(self, state)
+ }, state.reconnectInterval);
+ // Destroy all connections (after the event and after scheduling the reconnect)
+ self.destroy();
+ }
+}
+
+var closeHandler = function(self, state) { // builds the pool 'close' listener
+ return function(err, connection) {
+ if(state.state == DISCONNECTED || state.state == DESTROYED) return; // already handled, ignore repeated events
+ // Set disconnected state
+ state.state = DISCONNECTED;
+
+ if(state.readPreferenceStrategies != null) notifyStrategies(self, self.s, 'close', [self]);
+ if(state.logger.isInfo()) state.logger.info(f('server %s closed', self.name));
+ // Fail every pending callback with a "sockets closed" MongoError
+ if(state.callbacks) state.callbacks.flush(new MongoError(f("server %s sockets closed", self.name)));
+ // Emit close event (unconditionally)
+ self.emit('close', err, self);
+ // If we specified the driver to reconnect, schedule an attempt after reconnectInterval ms
+ if(state.reconnect) setTimeout(function() {
+ // state.currentReconnectRetry = state.reconnectTries,
+ reconnectServer(self, state)
+ }, state.reconnectInterval);
+ // Destroy all connections (after the event and after scheduling the reconnect)
+ self.destroy();
+ }
+}
+
+var connectHandler = function(self, state) { // builds the pool 'connect' listener: reauthenticate, run ismaster, pick the wire protocol, emit 'connect'
+ // Apply all stored authentications, then call callback(null, null)
+ var applyAuthentications = function(callback) {
+ // We need to ensure we have re-authenticated
+ var keys = Object.keys(state.authProviders);
+ if(keys.length == 0) return callback(null, null);
+
+ // Execute all providers
+ var count = keys.length;
+ // Iterate over keys; errors reported by a provider are not inspected
+ for(var i = 0; i < keys.length; i++) {
+ state.authProviders[keys[i]].reauthenticate(self, state.pool, function(err, r) {
+ count = count - 1;
+ // Every provider has answered, continue
+ if(count == 0) {
+ return callback(null, null);
+ }
+ });
+ }
+ }
+
+ return function(connection) {
+ // Apply any applyAuthentications
+ applyAuthentications(function() {
+
+ // Execute an ismaster
+ self.command('system.$cmd', {ismaster:true}, function(err, r) {
+ if(err) {
+ state.state = DISCONNECTED;
+ return self.emit('close', err, self); // a failed ismaster is reported as 'close'
+ }
+
+ // Set the current ismaster (err is always falsy at this point)
+ if(!err) {
+ state.ismaster = r.result;
+ }
+
+ // Emit the ismaster
+ self.emit('ismaster', r.result, self);
+
+ // Determine the wire protocol handler
+ state.wireProtocolHandler = createWireProtocolHandler(state.ismaster);
+
+ // Set the wireProtocolHandler
+ state.options.wireProtocolHandler = state.wireProtocolHandler;
+
+ // Log the ismaster if available
+ if(state.logger.isInfo()) state.logger.info(f('server %s connected with ismaster [%s]', self.name, JSON.stringify(r.result)));
+
+ // Validate that it's a server we can connect to
+ // NOTE(review): createWireProtocolHandler always returns a handler, so this branch looks unreachable - confirm
+ if(!supportsServer(state) && state.wireProtocolHandler == null) {
+ state.state = DISCONNECTED
+ return self.emit('error', new MongoError("non supported server version"), self);
+ }
+
+ // Use the name the server reports for itself (ismaster.me) when present
+ if(state.ismaster && state.ismaster.me) state.serverDetails.name = state.ismaster.me;
+
+ // No read preference strategies just emit connect
+ if(state.readPreferenceStrategies == null) {
+ state.state = CONNECTED;
+ return self.emit('connect', self);
+ }
+
+ // Signal connect to all readPreferences, emit 'connect' once they have called back
+ notifyStrategies(self, self.s, 'connect', [self], function(err, result) {
+ state.state = CONNECTED;
+ return self.emit('connect', self);
+ });
+ });
+ });
+ }
+}
+
+var slaveOk = function(r) {
+ if(r) return r.slaveOk()
+ return false;
+}
+
+//
+// Execute readPreference Strategies
+var notifyStrategies = function(self, state, op, params, callback) {
+ if(typeof callback != 'function') {
+ // Notify query start to any read Preference strategies
+ for(var name in state.readPreferenceStrategies) {
+ if(state.readPreferenceStrategies[name][op]) {
+ var strat = state.readPreferenceStrategies[name];
+ strat[op].apply(strat, params);
+ }
+ }
+ // Finish up
+ return;
+ }
+
+ // Execute the async callbacks
+ var nPreferences = Object.keys(state.readPreferenceStrategies).length;
+ if(nPreferences == 0) return callback(null, null);
+ for(var name in state.readPreferenceStrategies) {
+ if(state.readPreferenceStrategies[name][op]) {
+ var strat = state.readPreferenceStrategies[name];
+ // Add a callback to params
+ var cParams = params.slice(0);
+ cParams.push(function(err, r) {
+ nPreferences = nPreferences - 1;
+ if(nPreferences == 0) {
+ callback(null, null);
+ }
+ })
+ // Execute the readPreference
+ strat[op].apply(strat, cParams);
+ }
+ }
+}
+
+var debugFields = ['reconnect', 'reconnectTries', 'reconnectInterval', 'emitError', 'cursorFactory', 'host'
+ , 'port', 'size', 'keepAlive', 'keepAliveInitialDelay', 'noDelay', 'connectionTimeout'
+ , 'socketTimeout', 'singleBufferSerializtion', 'ssl', 'ca', 'cert', 'key', 'rejectUnauthorized', 'promoteLongs']; // option names handed to debugOptions() when command() writes its debug log
+
+/**
+ * Creates a new Server instance
+ *
+ * The options object is stored as-is on the instance and a `bson` property is
+ * written onto it. Six auth providers (mongocr, x509, plain, gssapi, sspi,
+ * scram-sha-1) are registered through addAuthProvider.
+ * @class
+ * @param {object} options Server settings
+ * @param {boolean} [options.reconnect=true] Server will attempt to reconnect on loss of connection
+ * @param {number} [options.reconnectTries=30] Server attempt to reconnect #times
+ * @param {number} [options.reconnectInterval=1000] Server will wait # milliseconds between retries
+ * @param {boolean} [options.emitError=false] Server will emit errors events
+ * @param {Cursor} [options.cursorFactory=Cursor] The cursor factory class used for all query cursors
+ * @param {string} options.host The server host
+ * @param {number} options.port The server port
+ * @param {number} [options.size=5] Server connection pool size
+ * @param {boolean} [options.keepAlive=true] TCP Connection keep alive enabled
+ * @param {number} [options.keepAliveInitialDelay=0] Initial delay before TCP keep alive enabled
+ * @param {boolean} [options.noDelay=true] TCP Connection no delay
+ * @param {number} [options.connectionTimeout=0] TCP Connection timeout setting
+ * @param {number} [options.socketTimeout=0] TCP Socket timeout setting
+ * @param {boolean} [options.ssl=false] Use SSL for connection
+ * @param {Buffer} [options.ca] SSL Certificate store binary buffer
+ * @param {Buffer} [options.cert] SSL Certificate binary buffer
+ * @param {Buffer} [options.key] SSL Key file binary buffer
+ * @param {string} [options.passphrase] SSL Certificate pass phrase
+ * @param {boolean} [options.rejectUnauthorized=true] Reject unauthorized server certificates
+ * @param {boolean} [options.promoteLongs=true] Convert Long values from the db into Numbers if they fit into 53 bits
+ * @param {object} [options.readPreferenceStrategies] Strategies notified about connection and operation events
+ * @param {object} [options.authProviders={}] Initial auth providers keyed by name
+ * @param {object} [options.disconnectHandler] Store that receives operations issued while not connected
+ * @param {object} [options.wireProtocolHandler] Wire protocol implementation used until the ismaster result is known
+ * @param {object} [options.bson] BSON parser to use instead of the shared instance
+ * @param {*} [options.tag] Grouping tag used for debugging purposes
+ * @return {Server} A server instance
+ * @fires Server#connect
+ * @fires Server#close
+ * @fires Server#error
+ * @fires Server#timeout
+ * @fires Server#parseError
+ * @fires Server#reconnect
+ */
+var Server = function(options) {
+ var self = this;
+
+ // Add event listener
+ EventEmitter.call(this);
+
+ // BSON Parser, ensure we have a single instance
+ if(bsonInstance == null) {
+ bsonInstance = new BSON(bsonTypes);
+ }
+
+ // Reconnect retries
+ var reconnectTries = options.reconnectTries || 30; // NOTE(review): a value of 0 also falls back to 30
+
+ // Keeps all the internal state of the server
+ this.s = {
+ // Options
+ options: options
+ // Contains all the callbacks
+ , callbacks: new Callbacks()
+ // Logger
+ , logger: Logger('Server', options)
+ // Server state
+ , state: DISCONNECTED
+ // Reconnect option
+ , reconnect: typeof options.reconnect == 'boolean' ? options.reconnect : true
+ , reconnectTries: reconnectTries
+ , reconnectInterval: options.reconnectInterval || 1000
+ // Swallow or emit errors
+ , emitError: typeof options.emitError == 'boolean' ? options.emitError : false
+ // Reconnect attempts still available
+ , currentReconnectRetry: reconnectTries
+ // Contains the ismaster
+ , ismaster: null
+ // Contains any alternate strategies for picking
+ , readPreferenceStrategies: options.readPreferenceStrategies
+ // Auth providers
+ , authProviders: options.authProviders || {}
+ // Server instance id
+ , id: serverId++
+ // Grouping tag used for debugging purposes
+ , tag: options.tag
+ // Do we have a not connected handler
+ , disconnectHandler: options.disconnectHandler
+ // wireProtocolHandler methods
+ , wireProtocolHandler: options.wireProtocolHandler || new PreTwoSixWireProtocolSupport()
+ // Factory overrides
+ , Cursor: options.cursorFactory || BasicCursor
+ // BSON Parser, ensure we have a single instance
+ , bsonInstance: bsonInstance
+ // Pick the right bson parser
+ , bson: options.bson ? options.bson : bsonInstance
+ // Internal connection pool, created by connect()
+ , pool: null
+ // Server details
+ , serverDetails: {
+ host: options.host
+ , port: options.port
+ , name: options.port ? f("%s:%s", options.host, options.port) : options.host
+ }
+ }
+
+ // Reference state
+ var s = this.s;
+
+ // Add bson parser to options (this writes to the caller's options object)
+ options.bson = s.bson;
+
+ // Expose name, bson, wireProtocolHandler and id as properties of the server
+ getProperty(this, 'name', 'name', s.serverDetails, {});
+ getProperty(this, 'bson', 'bson', s.options, {});
+ getProperty(this, 'wireProtocolHandler', 'wireProtocolHandler', s.options, {});
+ getSingleProperty(this, 'id', s.id);
+
+ // Add auth providers
+ this.addAuthProvider('mongocr', new MongoCR());
+ this.addAuthProvider('x509', new X509());
+ this.addAuthProvider('plain', new Plain());
+ this.addAuthProvider('gssapi', new GSSAPI());
+ this.addAuthProvider('sspi', new SSPI());
+ this.addAuthProvider('scram-sha-1', new ScramSHA1());
+}
+
+inherits(Server, EventEmitter);
+
+/**
+ * Execute a command
+ * @method
+ * @param {string} type Type of BSON parser to use (c++ or js)
+ */
+Server.prototype.setBSONParserType = function(type) {
+ var nBSON = null;
+
+ if(type == 'c++') {
+ nBSON = require('bson').native().BSON;
+ } else if(type == 'js') {
+ nBSON = require('bson').pure().BSON;
+ } else {
+ throw new MongoError(f("% parser not supported", type));
+ }
+
+ this.s.options.bson = new nBSON(bsonTypes);
+}
+
+/**
+ * Returns the last known ismaster document for this server
+ * @method
+ * @return {object}
+ */
+Server.prototype.lastIsMaster = function() {
+ return this.s.ismaster;
+}
+
+/**
+ * Initiate server connect
+ * @method
+ */
+Server.prototype.connect = function(_options) {
+ var self = this;
+ // Set server specific settings
+ _options = _options || {}
+ // Set the promotion
+ if(typeof _options.promoteLongs == 'boolean') {
+ self.s.options.promoteLongs = _options.promoteLongs;
+ }
+
+ // Destroy existing pool
+ if(self.s.pool) {
+ self.s.pool.destroy();
+ self.s.pool = null;
+ }
+
+ // Set the state to connection
+ self.s.state = CONNECTING;
+ // Create a new connection pool
+ if(!self.s.pool) {
+ self.s.options.messageHandler = messageHandler(self, self.s);
+ self.s.pool = new Pool(self.s.options);
+ }
+
+ // Add all the event handlers
+ self.s.pool.once('timeout', timeoutHandler(self, self.s));
+ self.s.pool.once('close', closeHandler(self, self.s));
+ self.s.pool.once('error', errorHandler(self, self.s));
+ self.s.pool.once('connect', connectHandler(self, self.s));
+ self.s.pool.once('parseError', fatalErrorHandler(self, self.s));
+
+ // Connect the pool
+ self.s.pool.connect();
+}
+
+/**
+ * Destroy the server connection
+ * @method
+ */
+Server.prototype.destroy = function(emitClose, emitDestroy) {
+ var self = this;
+ if(self.s.logger.isDebug()) self.s.logger.debug(f('destroy called on server %s', self.name));
+ // Emit close
+ if(emitClose && self.listeners('close').length > 0) self.emit('close', self);
+
+ // Emit destroy event
+ if(emitDestroy) self.emit('destroy', self);
+ // Set state as destroyed
+ self.s.state = DESTROYED;
+ // Close the pool
+ self.s.pool.destroy();
+ // Flush out all the callbacks
+ if(self.s.callbacks) self.s.callbacks.flush(new MongoError(f("server %s sockets closed", self.name)));
+}
+
+/**
+ * Figure out if the server is connected
+ * @method
+ * @return {boolean}
+ */
+Server.prototype.isConnected = function() {
+ var self = this;
+ if(self.s.pool) return self.s.pool.isConnected();
+ return false;
+}
+
+/**
+ * Figure out if the server instance was destroyed by calling destroy
+ * @method
+ * @return {boolean}
+ */
+Server.prototype.isDestroyed = function() {
+ return this.s.state == DESTROYED;
+}
+
+var executeSingleOperation = function(self, ns, cmd, queryOptions, options, onAll, callback) {
+ // Create a query instance
+ var query = new Query(self.s.bson, ns, cmd, queryOptions);
+
+ // Set slave OK
+ query.slaveOk = slaveOk(options.readPreference);
+
+ // Notify query start to any read Preference strategies
+ if(self.s.readPreferenceStrategies != null)
+ notifyStrategies(self, self.s, 'startOperation', [self, query, new Date()]);
+
+ // Get a connection (either passed or from the pool)
+ var connection = options.connection || self.s.pool.get();
+
+ // Double check if we have a valid connection
+ if(!connection.isConnected()) {
+ return callback(new MongoError(f("no connection available to server %s", self.name)));
+ }
+
+ // Print cmd and execution connection if in debug mode for logging
+ if(self.s.logger.isDebug()) {
+ var json = connection.toJSON();
+ self.s.logger.debug(f('cmd [%s] about to be executed on connection with id %s at %s:%s', JSON.stringify(cmd), json.id, json.host, json.port));
+ }
+
+ // Execute multiple queries
+ if(onAll) {
+ var connections = self.s.pool.getAll();
+ var total = connections.length;
+ // We have an error
+ var error = null;
+ // Execute on all connections
+ for(var i = 0; i < connections.length; i++) {
+ try {
+ query.incRequestId();
+ connections[i].write(query.toBin());
+ } catch(err) {
+ total = total - 1;
+ if(total == 0) return callback(MongoError.create(err));
+ }
+
+ // Register the callback
+ self.s.callbacks.register(query.requestId, function(err, result) {
+ if(err) error = err;
+ total = total - 1;
+
+ // Done
+ if(total == 0) {
+ // Notify end of command
+ notifyStrategies(self, self.s, 'endOperation', [self, error, result, new Date()]);
+ if(error) return callback(MongoError.create(error));
+ // Execute callback, catch and rethrow if needed
+ try { callback(null, new CommandResult(result.documents[0], connections)); }
+ catch(err) { process.nextTick(function() { throw err}); }
+ }
+ });
+ }
+
+ return;
+ }
+
+ // Execute a single command query
+ try {
+ connection.write(query.toBin());
+ } catch(err) {
+ return callback(MongoError.create(err));
+ }
+
+ // Register the callback
+ self.s.callbacks.register(query.requestId, function(err, result) {
+ // Notify end of command
+ notifyStrategies(self, self.s, 'endOperation', [self, err, result, new Date()]);
+ if(err) return callback(err);
+ if(result.documents[0]['$err']
+ || result.documents[0]['errmsg']
+ || result.documents[0]['err']
+ || result.documents[0]['code']) return callback(MongoError.create(result.documents[0]));
+ // Execute callback, catch and rethrow if needed
+ try { callback(null, new CommandResult(result.documents[0], connection)); }
+ catch(err) { process.nextTick(function() { throw err}); }
+ });
+}
+
+/**
+ * Execute a command
+ * @method
+ * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
+ * @param {object} cmd The command hash
+ * @param {ReadPreference} [options.readPreference] Specify read preference if command supports it
+ * @param {Connection} [options.connection] Specify connection object to execute command against
+ * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized.
+ * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields.
+ * @param {opResultCallback} callback A callback function
+ */
+Server.prototype.command = function(ns, cmd, options, callback) {
+ if(typeof options == 'function') callback = options, options = {};
+ var self = this;
+ if(this.s.state == DESTROYED) return callback(new MongoError(f('topology was destroyed')));
+ // Ensure we have no options
+ options = options || {};
+ // Do we have a read Preference it need to be of type ReadPreference
+ if(options.readPreference && !(options.readPreference instanceof ReadPreference)) {
+ throw new Error("readPreference must be an instance of ReadPreference");
+ }
+
+ // Debug log
+ if(self.s.logger.isDebug()) self.s.logger.debug(f('executing command [%s] against %s', JSON.stringify({
+ ns: ns, cmd: cmd, options: debugOptions(debugFields, options)
+ }), self.name));
+
+ // Topology is not connected, save the call in the provided store to be
+ // Executed at some point when the handler deems it's reconnected
+ if(!self.isConnected() && self.s.disconnectHandler != null) {
+ callback = bindToCurrentDomain(callback);
+ return self.s.disconnectHandler.add('command', ns, cmd, options, callback);
+ }
+
+ // If we have no connection error
+ if(!self.s.pool.isConnected()) return callback(new MongoError(f("no connection available to server %s", self.name)));
+
+ // Execute on all connections
+ var onAll = typeof options.onAll == 'boolean' ? options.onAll : false;
+
+ // Check keys
+ var checkKeys = typeof options.checkKeys == 'boolean' ? options.checkKeys: false;
+
+ // Serialize function
+ var serializeFunctions = typeof options.serializeFunctions == 'boolean' ? options.serializeFunctions : false;
+
+ // Ignore undefined values
+ var ignoreUndefined = typeof options.ignoreUndefined == 'boolean' ? options.ignoreUndefined : false;
+
+ // Query options
+ var queryOptions = {
+ numberToSkip: 0, numberToReturn: -1, checkKeys: checkKeys
+ };
+
+ // Set up the serialize functions and ignore undefined
+ if(serializeFunctions) queryOptions.serializeFunctions = serializeFunctions;
+ if(ignoreUndefined) queryOptions.ignoreUndefined = ignoreUndefined;
+
+ // Single operation execution
+ if(!Array.isArray(cmd)) {
+ return executeSingleOperation(self, ns, cmd, queryOptions, options, onAll, callback);
+ }
+
+ // Build commands for each of the instances
+ var queries = new Array(cmd.length);
+ for(var i = 0; i < cmd.length; i++) {
+ queries[i] = new Query(self.s.bson, ns, cmd[i], queryOptions);
+ queries[i].slaveOk = slaveOk(options.readPreference);
+ }
+
+ // Notify query start to any read Preference strategies
+ if(self.s.readPreferenceStrategies != null)
+ notifyStrategies(self, self.s, 'startOperation', [self, queries, new Date()]);
+
+ // Get a connection (either passed or from the pool)
+ var connection = options.connection || self.s.pool.get();
+
+ // Double check if we have a valid connection
+ if(!connection.isConnected()) {
+ return callback(new MongoError(f("no connection available to server %s", self.name)));
+ }
+
+ // Print cmd and execution connection if in debug mode for logging
+ if(self.s.logger.isDebug()) {
+ var json = connection.toJSON();
+ self.s.logger.debug(f('cmd [%s] about to be executed on connection with id %s at %s:%s', JSON.stringify(queries), json.id, json.host, json.port));
+ }
+
+ // Canceled operations
+ var canceled = false;
+ // Number of operations left
+ var operationsLeft = queries.length;
+ // Results
+ var results = [];
+
+ // We need to nest the callbacks
+ for(var i = 0; i < queries.length; i++) {
+ // Get the query object
+ var query = queries[i];
+
+ // Execute a single command query
+ try {
+ connection.write(query.toBin());
+ } catch(err) {
+ return callback(MongoError.create(err));
+ }
+
+ // Register the callback
+ self.s.callbacks.register(query.requestId, function(err, result) {
+ // If it's canceled ignore the operation
+ if(canceled) return;
+ // Update the current index
+ operationsLeft = operationsLeft - 1;
+
+ // If we have an error cancel the operation
+ if(err) {
+ canceled = true;
+ return callback(err);
+ }
+
+ // Return the result
+ if(result.documents[0]['$err']
+ || result.documents[0]['errmsg']
+ || result.documents[0]['err']
+ || result.documents[0]['code']) {
+
+ // Set to canceled
+ canceled = true;
+ // Return the error
+ return callback(MongoError.create(result.documents[0]));
+ }
+
+ // Push results
+ results.push(result.documents[0]);
+
+ // We are done, return the result
+ if(operationsLeft == 0) {
+ // Notify end of command
+ notifyStrategies(self, self.s, 'endOperation', [self, err, result, new Date()]);
+
+ // Turn into command results
+ var commandResults = new Array(results.length);
+ for(var i = 0; i < results.length; i++) {
+ commandResults[i] = new CommandResult(results[i], connection);
+ }
+
+ // Execute callback, catch and rethrow if needed
+ try { callback(null, commandResults); }
+ catch(err) { process.nextTick(function() { throw err}); }
+ }
+ });
+ }
+}
+
+/**
+ * Insert one or more documents
+ * @method
+ * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
+ * @param {array} ops An array of documents to insert
+ * @param {boolean} [options.ordered=true] Execute in order or out of order
+ * @param {object} [options.writeConcern={}] Write concern for the operation
+ * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized.
+ * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields.
+ * @param {opResultCallback} callback A callback function
+ */
+Server.prototype.insert = function(ns, ops, options, callback) {
+ if(typeof options == 'function') callback = options, options = {};
+ var self = this;
+ if(this.s.state == DESTROYED) return callback(new MongoError(f('topology was destroyed')));
+ // Topology is not connected, save the call in the provided store to be
+ // Executed at some point when the handler deems it's reconnected
+ if(!self.isConnected() && self.s.disconnectHandler != null) {
+ callback = bindToCurrentDomain(callback);
+ return self.s.disconnectHandler.add('insert', ns, ops, options, callback);
+ }
+
+ // Setup the docs as an array
+ ops = Array.isArray(ops) ? ops : [ops];
+ // Execute write
+ return self.s.wireProtocolHandler.insert(self, self.s.ismaster, ns, self.s.bson, self.s.pool, self.s.callbacks, ops, options, callback);
+}
+
+/**
+ * Perform one or more update operations
+ * @method
+ * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
+ * @param {array} ops An array of updates
+ * @param {boolean} [options.ordered=true] Execute in order or out of order
+ * @param {object} [options.writeConcern={}] Write concern for the operation
+ * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized.
+ * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields.
+ * @param {opResultCallback} callback A callback function
+ */
+Server.prototype.update = function(ns, ops, options, callback) {
+ if(typeof options == 'function') callback = options, options = {};
+ var self = this;
+ if(this.s.state == DESTROYED) return callback(new MongoError(f('topology was destroyed')));
+ // Topology is not connected, save the call in the provided store to be
+ // Executed at some point when the handler deems it's reconnected
+ if(!self.isConnected() && self.s.disconnectHandler != null) {
+ callback = bindToCurrentDomain(callback);
+ return self.s.disconnectHandler.add('update', ns, ops, options, callback);
+ }
+
+ // Setup the docs as an array
+ ops = Array.isArray(ops) ? ops : [ops];
+ // Execute write
+ return self.s.wireProtocolHandler.update(self, self.s.ismaster, ns, self.s.bson, self.s.pool, self.s.callbacks, ops, options, callback);
+}
+
+/**
+ * Perform one or more remove operations
+ * @method
+ * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
+ * @param {array} ops An array of removes
+ * @param {boolean} [options.ordered=true] Execute in order or out of order
+ * @param {object} [options.writeConcern={}] Write concern for the operation
+ * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized.
+ * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields.
+ * @param {opResultCallback} callback A callback function
+ */
+Server.prototype.remove = function(ns, ops, options, callback) {
+ if(typeof options == 'function') callback = options, options = {};
+ var self = this;
+ if(this.s.state == DESTROYED) return callback(new MongoError(f('topology was destroyed')));
+ // Topology is not connected, save the call in the provided store to be
+ // Executed at some point when the handler deems it's reconnected
+ if(!self.isConnected() && self.s.disconnectHandler != null) {
+ callback = bindToCurrentDomain(callback);
+ return self.s.disconnectHandler.add('remove', ns, ops, options, callback);
+ }
+
+ // Setup the docs as an array
+ ops = Array.isArray(ops) ? ops : [ops];
+ // Execute write
+ return self.s.wireProtocolHandler.remove(self, self.s.ismaster, ns, self.s.bson, self.s.pool, self.s.callbacks, ops, options, callback);
+}
+
+/**
+ * Authenticate using a specified mechanism
+ * @method
+ * @param {string} mechanism The Auth mechanism we are invoking
+ * @param {string} db The db we are invoking the mechanism against
+ * @param {...object} param Parameters for the specific mechanism
+ * @param {authResultCallback} callback A callback function
+ */
+Server.prototype.auth = function(mechanism, db) {
+ var self = this;
+ var args = Array.prototype.slice.call(arguments, 2);
+ var callback = args.pop();
+
+ // If we don't have the mechanism fail
+ if(self.s.authProviders[mechanism] == null && mechanism != 'default')
+ throw new MongoError(f("auth provider %s does not exist", mechanism));
+
+ // If we have the default mechanism we pick mechanism based on the wire
+ // protocol max version. If it's >= 3 then scram-sha1 otherwise mongodb-cr
+ if(mechanism == 'default' && self.s.ismaster && self.s.ismaster.maxWireVersion >= 3) {
+ mechanism = 'scram-sha-1';
+ } else if(mechanism == 'default') {
+ mechanism = 'mongocr';
+ }
+
+ // Actual arguments
+ var finalArguments = [self, self.s.pool, db].concat(args.slice(0)).concat([function(err, r) {
+ if(err) return callback(err);
+ if(!r) return callback(new MongoError('could not authenticate'));
+ callback(null, new Session({}, self));
+ }]);
+
+ // Let's invoke the auth mechanism
+ self.s.authProviders[mechanism].auth.apply(self.s.authProviders[mechanism], finalArguments);
+}
+
+//
+// Plugin methods
+//
+
+/**
+ * Add custom read preference strategy
+ * @method
+ * @param {string} name Name of the read preference strategy
+ * @param {object} strategy Strategy object instance
+ */
+Server.prototype.addReadPreferenceStrategy = function(name, strategy) {
+ var self = this;
+ if(self.s.readPreferenceStrategies == null) self.s.readPreferenceStrategies = {};
+ self.s.readPreferenceStrategies[name] = strategy;
+}
+
+/**
+ * Add custom authentication mechanism
+ * @method
+ * @param {string} name Name of the authentication mechanism
+ * @param {object} provider Authentication object instance
+ */
+Server.prototype.addAuthProvider = function(name, provider) {
+ var self = this;
+ self.s.authProviders[name] = provider;
+}
+
+/**
+ * Compare two server instances
+ * @method
+ * @param {Server} server Server to compare equality against
+ * @return {boolean}
+ */
+Server.prototype.equals = function(server) {
+ if(typeof server == 'string') return server == this.name;
+ return server.name == this.name;
+}
+
+/**
+ * All raw connections
+ * @method
+ * @return {Connection[]}
+ */
+Server.prototype.connections = function() {
+ return this.s.pool.getAll();
+}
+
+/**
+ * Get server
+ * @method
+ * @param {ReadPreference} [options.readPreference] Specify read preference if command supports it
+ * @return {Server}
+ */
+Server.prototype.getServer = function(options) {
+ return this;
+}
+
+/**
+ * Get connection
+ * @method
+ * @param {ReadPreference} [options.readPreference] Specify read preference if command supports it
+ * @return {Connection}
+ */
+Server.prototype.getConnection = function(options) {
+ return this.s.pool.get();
+}
+
+/**
+ * Get callbacks object
+ * @method
+ * @return {Callbacks}
+ */
+Server.prototype.getCallbacks = function() {
+ return this.s.callbacks;
+}
+
+/**
+ * Name of BSON parser currently used
+ * @method
+ * @return {string}
+ */
+Server.prototype.parserType = function() {
+ var s = this.s;
+ if(s.options.bson.serialize.toString().indexOf('[native code]') != -1)
+ return 'c++';
+ return 'js';
+}
+
+// // Command
+// {
+// find: ns
+// , query: <object>
+// , limit: <n>
+// , fields: <object>
+// , skip: <n>
+// , hint: <string>
+// , explain: <boolean>
+// , snapshot: <boolean>
+// , batchSize: <n>
+// , returnKey: <boolean>
+// , maxScan: <n>
+// , min: <n>
+// , max: <n>
+// , showDiskLoc: <boolean>
+// , comment: <string>
+// , maxTimeMS: <n>
+// , raw: <boolean>
+// , readPreference: <ReadPreference>
+// , tailable: <boolean>
+// , oplogReplay: <boolean>
+// , noCursorTimeout: <boolean>
+// , awaitdata: <boolean>
+// , exhaust: <boolean>
+// , partial: <boolean>
+// }
+
+/**
+ * Create a cursor for a command or an existing cursor id
+ * @method
+ * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
+ * @param {{object}|{Long}} cmd Can be either a command returning a cursor or a cursorId
+ * @param {object} [options.batchSize=0] Batchsize for the operation
+ * @param {array} [options.documents=[]] Initial documents list for cursor
+ * @param {ReadPreference} [options.readPreference] Specify read preference if command supports it
+ * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized.
+ * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields.
+ * @param {opResultCallback} callback A callback function
+ */
+Server.prototype.cursor = function(ns, cmd, cursorOptions) {
+ var s = this.s;
+ cursorOptions = cursorOptions || {};
+ // Set up final cursor type
+ var FinalCursor = cursorOptions.cursorFactory || s.Cursor;
+ // Return the cursor
+ return new FinalCursor(s.bson, ns, cmd, cursorOptions, this, s.options);
+}
+
+/**
+ * A server connect event, used to verify that the connection is up and running
+ *
+ * @event Server#connect
+ * @type {Server}
+ */
+
+/**
+ * The server connection closed, all pool connections closed
+ *
+ * @event Server#close
+ * @type {Server}
+ */
+
+/**
+ * The server connection caused an error, all pool connections closed
+ *
+ * @event Server#error
+ * @type {Server}
+ */
+
+/**
+ * The server connection timed out, all pool connections closed
+ *
+ * @event Server#timeout
+ * @type {Server}
+ */
+
+/**
+ * The driver experienced an invalid message, all pool connections closed
+ *
+ * @event Server#parseError
+ * @type {Server}
+ */
+
+/**
+ * The server reestablished the connection
+ *
+ * @event Server#reconnect
+ * @type {Server}
+ */
+
+/**
+ * This is an insert result callback
+ *
+ * @callback opResultCallback
+ * @param {error} error An error object. Set to null if no error present
+ * @param {CommandResult} command result
+ */
+
+/**
+ * This is an authentication result callback
+ *
+ * @callback authResultCallback
+ * @param {error} error An error object. Set to null if no error present
+ * @param {Session} an authenticated session
+ */
+
+module.exports = Server;
http://git-wip-us.apache.org/repos/asf/couchdb-nmo/blob/753f1767/node_modules/couchbulkimporter/node_modules/mongodb/node_modules/mongodb-core/lib/topologies/session.js
----------------------------------------------------------------------
diff --git a/node_modules/couchbulkimporter/node_modules/mongodb/node_modules/mongodb-core/lib/topologies/session.js b/node_modules/couchbulkimporter/node_modules/mongodb/node_modules/mongodb-core/lib/topologies/session.js
new file mode 100644
index 0000000..032c3c5
--- /dev/null
+++ b/node_modules/couchbulkimporter/node_modules/mongodb/node_modules/mongodb-core/lib/topologies/session.js
@@ -0,0 +1,93 @@
+"use strict";
+
+var inherits = require('util').inherits
+ , f = require('util').format
+ , EventEmitter = require('events').EventEmitter;
+
+/**
+ * Creates a new Authentication Session
+ * @class
+ * @param {object} [options] Options for the session
+ * @param {{Server}|{ReplSet}|{Mongos}} topology The topology instance underpinning the session
+ */
+var Session = function(options, topology) {
+ this.options = options;
+ this.topology = topology;
+
+ // Add event listener
+ EventEmitter.call(this);
+}
+
+inherits(Session, EventEmitter);
+
+/**
+ * Execute a command
+ * @method
+ * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
+ * @param {object} cmd The command hash
+ * @param {object} [options.readPreference] Specify read preference if command supports it
+ * @param {object} [options.connection] Specify connection object to execute command against
+ * @param {opResultCallback} callback A callback function
+ */
+Session.prototype.command = function(ns, cmd, options, callback) {
+ this.topology.command(ns, cmd, options, callback);
+}
+
+/**
+ * Insert one or more documents
+ * @method
+ * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
+ * @param {array} ops An array of documents to insert
+ * @param {boolean} [options.ordered=true] Execute in order or out of order
+ * @param {object} [options.writeConcern={}] Write concern for the operation
+ * @param {opResultCallback} callback A callback function
+ */
+Session.prototype.insert = function(ns, ops, options, callback) {
+ this.topology.insert(ns, ops, options, callback);
+}
+
+/**
+ * Perform one or more update operations
+ * @method
+ * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
+ * @param {array} ops An array of updates
+ * @param {boolean} [options.ordered=true] Execute in order or out of order
+ * @param {object} [options.writeConcern={}] Write concern for the operation
+ * @param {opResultCallback} callback A callback function
+ */
+Session.prototype.update = function(ns, ops, options, callback) {
+ this.topology.update(ns, ops, options, callback);
+}
+
+/**
+ * Perform one or more remove operations
+ * @method
+ * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
+ * @param {array} ops An array of removes
+ * @param {boolean} [options.ordered=true] Execute in order or out of order
+ * @param {object} [options.writeConcern={}] Write concern for the operation
+ * @param {opResultCallback} callback A callback function
+ */
+Session.prototype.remove = function(ns, ops, options, callback) {
+ this.topology.remove(ns, ops, options, callback);
+}
+
+/**
+ * Create a cursor for a command or an existing cursor id
+ * @method
+ * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
+ * @param {{object}|{Long}} cmd Can be either a command returning a cursor or a cursorId
+ * @param {object} [options.batchSize=0] Batchsize for the operation
+ * @param {array} [options.documents=[]] Initial documents list for cursor
+ * @param {boolean} [options.tailable=false] Tailable flag set
+ * @param {boolean} [options.oplogReplay=false] oplogReplay flag set
+ * @param {boolean} [options.awaitdata=false] awaitdata flag set
+ * @param {boolean} [options.exhaust=false] exhaust flag set
+ * @param {boolean} [options.partial=false] partial flag set
+ * @param {opResultCallback} callback A callback function
+ */
+Session.prototype.cursor = function(ns, cmd, options) {
+ return this.topology.cursor(ns, cmd, options);
+}
+
+module.exports = Session;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/couchdb-nmo/blob/753f1767/node_modules/couchbulkimporter/node_modules/mongodb/node_modules/mongodb-core/lib/topologies/strategies/ping.js
----------------------------------------------------------------------
diff --git a/node_modules/couchbulkimporter/node_modules/mongodb/node_modules/mongodb-core/lib/topologies/strategies/ping.js b/node_modules/couchbulkimporter/node_modules/mongodb/node_modules/mongodb-core/lib/topologies/strategies/ping.js
new file mode 100644
index 0000000..3a7b460
--- /dev/null
+++ b/node_modules/couchbulkimporter/node_modules/mongodb/node_modules/mongodb-core/lib/topologies/strategies/ping.js
@@ -0,0 +1,276 @@
+"use strict";
+
+var Logger = require('../../connection/logger')
+ , EventEmitter = require('events').EventEmitter
+ , inherits = require('util').inherits
+ , f = require('util').format;
+
+/**
+ * Creates a new Ping read preference strategy instance
+ * @class
+ * @param {number} [options.pingInterval=10000] Ping interval to check the response time to the different servers (in milliseconds)
+ * @param {number} [options.acceptableLatency=15] Acceptable latency for selecting a server for reading (in milliseconds)
+ * @return {Ping} A Ping strategy instance
+ */
+var Ping = function(options) {
+ // Add event listener
+ EventEmitter.call(this);
+
+ // Contains the ping state
+ this.s = {
+ // Contains all the ping data
+ pings: {}
+ // Set no options if none provided
+ , options: options || {}
+ // Logger
+ , logger: Logger('Ping', options)
+ // Ping interval
+ , pingInterval: options.pingInterval || 10000
+ , acceptableLatency: options.acceptableLatency || 15
+ // Debug options
+ , debug: typeof options.debug == 'boolean' ? options.debug : false
+ // Index
+ , index: 0
+ // Current ping time
+ , lastPing: null
+
+ }
+
+ // Log the options set
+ if(this.s.logger.isDebug()) this.s.logger.debug(f('ping strategy interval [%s], acceptableLatency [%s]', this.s.pingInterval, this.s.acceptableLatency));
+
+ // If we have enabled debug
+ if(this.s.debug) {
+ // Add access to the read Preference Strategies
+ Object.defineProperty(this, 'data', {
+ enumerable: true, get: function() { return this.s.pings; }
+ });
+ }
+}
+
+inherits(Ping, EventEmitter);
+
+/**
+ * @ignore
+ */
+var filterByTags = function(readPreference, servers) {
+ if(readPreference.tags == null) return servers;
+ var filteredServers = [];
+ var tags = readPreference.tags;
+
+ // Iterate over all the servers
+ for(var i = 0; i < servers.length; i++) {
+ var serverTag = servers[i].lastIsMaster().tags || {};
+ // Did we find the a matching server
+ var found = true;
+ // Check if the server is valid
+ for(var name in tags) {
+ if(serverTag[name] != tags[name]) found = false;
+ }
+
+ // Add to candidate list
+ if(found) filteredServers.push(servers[i]);
+ }
+
+ // Returned filtered servers
+ return filteredServers;
+}
+
+/**
+ * Pick a server
+ * @method
+ * @param {State} set The current replicaset state object
+ * @param {ReadPreference} readPreference The current readPreference object
+ * @param {readPreferenceResultCallback} callback The callback to return the result from the function
+ * @return {object}
+ */
+Ping.prototype.pickServer = function(set, readPreference) {
+ var self = this;
+ // Only get primary and secondaries as seeds
+ var seeds = {};
+ var servers = [];
+ if(set.primary) {
+ servers.push(set.primary);
+ }
+
+ for(var i = 0; i < set.secondaries.length; i++) {
+ servers.push(set.secondaries[i]);
+ }
+
+ // Filter by tags
+ servers = filterByTags(readPreference, servers);
+
+ // Transform the list
+ var serverList = [];
+ // for(var name in seeds) {
+ for(var i = 0; i < servers.length; i++) {
+ serverList.push({name: servers[i].name, time: self.s.pings[servers[i].name] || 0});
+ }
+
+ // Sort by time
+ serverList.sort(function(a, b) {
+ return a.time > b.time;
+ });
+
+ // Locate lowest time (picked servers are lowest time + acceptable Latency margin)
+ var lowest = serverList.length > 0 ? serverList[0].time : 0;
+
+ // Filter by latency
+ serverList = serverList.filter(function(s) {
+ return s.time <= lowest + self.s.acceptableLatency;
+ });
+
+ // No servers, default to primary
+ if(serverList.length == 0 && set.primary) {
+ if(self.s.logger.isInfo()) self.s.logger.info(f('picked primary server [%s]', set.primary.name));
+ return set.primary;
+ } else if(serverList.length == 0) {
+ return null
+ }
+
+ // We picked first server
+ if(self.s.logger.isInfo()) self.s.logger.info(f('picked server [%s] with ping latency [%s]', serverList[0].name, serverList[0].time));
+
+ // Add to the index
+ self.s.index = self.s.index + 1;
+ // Select the index
+ self.s.index = self.s.index % serverList.length;
+ // Return the first server of the sorted and filtered list
+ return set.get(serverList[self.s.index].name);
+}
+
+/**
+ * Start of an operation
+ * @method
+ * @param {Server} server The server the operation is running against
+ * @param {object} query The operation running
+ * @param {Date} date The start time of the operation
+ * @return {object}
+ */
+Ping.prototype.startOperation = function(server, query, date) {
+}
+
+/**
+ * End of an operation
+ * @method
+ * @param {Server} server The server the operation is running against
+ * @param {error} err An error from the operation
+ * @param {object} result The result from the operation
+ * @param {Date} date The start time of the operation
+ * @return {object}
+ */
+Ping.prototype.endOperation = function(server, err, result, date) {
+}
+
+/**
+ * High availability process running
+ * @method
+ * @param {object} topology The topology instance the 'ping' event is emitted on
+ * @param {State} state The current replicaset state object
+ * @param {resultCallback} callback The callback to return the result from the function
+ * @return {object}
+ */
+Ping.prototype.ha = function(topology, state, callback) {
+ var self = this;
+ var servers = state.getAll();
+ var count = servers.length;
+
+ // No servers return
+ if(servers.length == 0) return callback(null, null);
+
+ // Return if we have not yet reached the ping interval
+ if(self.s.lastPing != null) {
+ var diff = new Date().getTime() - self.s.lastPing.getTime();
+ if(diff < self.s.pingInterval) return callback(null, null);
+ }
+
+ // Execute operation
+ var operation = function(_server) {
+ var start = new Date();
+ // Execute ping against server
+ _server.command('system.$cmd', {ismaster:1}, function(err, r) {
+ count = count - 1;
+ var time = new Date().getTime() - start.getTime();
+ self.s.pings[_server.name] = time;
+ // Log info for debug
+ if(self.s.logger.isDebug()) self.s.logger.debug(f('ha latency for server [%s] is [%s] ms', _server.name, time));
+ // We are done with all the servers
+ if(count == 0) {
+ // Emit ping event
+ topology.emit('ping', err, r ? r.result : null);
+ // Update the last ping time
+ self.s.lastPing = new Date();
+ // Return
+ callback(null, null);
+ }
+ });
+ }
+
+ // Let's ping all servers
+ while(servers.length > 0) {
+ operation(servers.shift());
+ }
+}
+
+var removeServer = function(self, server) {
+ delete self.s.pings[server.name];
+}
+
+/**
+ * Server connection closed
+ * @method
+ * @param {Server} server The server that closed
+ */
+Ping.prototype.close = function(server) {
+ removeServer(this, server);
+}
+
+/**
+ * Server connection errored out
+ * @method
+ * @param {Server} server The server that errored out
+ */
+Ping.prototype.error = function(server) {
+  // An errored server no longer has a ping time on record
+  var strategy = this;
+  removeServer(strategy, server);
+}
+
+/**
+ * Server connection timeout
+ * @method
+ * @param {Server} server The server that timed out
+ */
+Ping.prototype.timeout = function(server) {
+ removeServer(this, server);
+}
+
+/**
+ * Server connection happened
+ * @method
+ * @param {Server} server The server that connected
+ * @param {resultCallback} callback The callback to return the result from the function
+ */
+Ping.prototype.connect = function(server, callback) {
+ var self = this;
+ // Get the command start date
+ var start = new Date();
+ // Execute ping against server
+ server.command('system.$cmd', {ismaster:1}, function(err, r) {
+ var time = new Date().getTime() - start.getTime();
+ self.s.pings[server.name] = time;
+ // Log info for debug
+ if(self.s.logger.isDebug()) self.s.logger.debug(f('connect latency for server [%s] is [%s] ms', server.name, time));
+ // Set last ping
+ self.s.lastPing = new Date();
+ // Done, return
+ callback(null, null);
+ });
+}
+
+/**
+ * This is a result from a readPreference strategy
+ *
+ * @callback readPreferenceResultCallback
+ * @param {error} error An error object. Set to null if no error present
+ * @param {Server} server The server picked by the strategy
+ */
+
+module.exports = Ping;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/couchdb-nmo/blob/753f1767/node_modules/couchbulkimporter/node_modules/mongodb/node_modules/mongodb-core/lib/wireprotocol/2_4_support.js
----------------------------------------------------------------------
diff --git a/node_modules/couchbulkimporter/node_modules/mongodb/node_modules/mongodb-core/lib/wireprotocol/2_4_support.js b/node_modules/couchbulkimporter/node_modules/mongodb/node_modules/mongodb-core/lib/wireprotocol/2_4_support.js
new file mode 100644
index 0000000..e2e6a67
--- /dev/null
+++ b/node_modules/couchbulkimporter/node_modules/mongodb/node_modules/mongodb-core/lib/wireprotocol/2_4_support.js
@@ -0,0 +1,559 @@
+"use strict";
+
+var Insert = require('./commands').Insert
+ , Update = require('./commands').Update
+ , Remove = require('./commands').Remove
+ , Query = require('../connection/commands').Query
+ , copy = require('../connection/utils').copy
+ , KillCursor = require('../connection/commands').KillCursor
+ , GetMore = require('../connection/commands').GetMore
+ , Query = require('../connection/commands').Query
+ , ReadPreference = require('../topologies/read_preference')
+ , f = require('util').format
+ , CommandResult = require('../topologies/command_result')
+ , MongoError = require('../error')
+ , Long = require('bson').Long;
+
+// Write concern fields
+var writeConcernFields = ['w', 'wtimeout', 'j', 'fsync'];
+
+var WireProtocol = function() {}
+
+//
+// Needs to support legacy mass insert as well as ordered/unordered legacy
+// emulation
+//
+WireProtocol.prototype.insert = function(topology, ismaster, ns, bson, pool, callbacks, ops, options, callback) {
+ options = options || {};
+ // Default is ordered execution
+ var ordered = typeof options.ordered == 'boolean' ? options.ordered : true;
+ var legacy = typeof options.legacy == 'boolean' ? options.legacy : false;
+ ops = Array.isArray(ops) ? ops :[ops];
+
+ // If we have more than a 1000 ops fails
+ if(ops.length > 1000) return callback(new MongoError("exceeded maximum write batch size of 1000"));
+
+ // Write concern
+ var writeConcern = options.writeConcern || {w:1};
+
+ // We are unordered
+ if(!ordered || writeConcern.w == 0) {
+ return executeUnordered('insert', Insert, ismaster, ns, bson, pool, callbacks, ops, options, callback);
+ }
+
+ return executeOrdered('insert', Insert, ismaster, ns, bson, pool, callbacks, ops, options, callback);
+}
+
+WireProtocol.prototype.update = function(topology, ismaster, ns, bson, pool, callbacks, ops, options, callback) {
+ options = options || {};
+ // Default is ordered execution
+ var ordered = typeof options.ordered == 'boolean' ? options.ordered : true;
+ ops = Array.isArray(ops) ? ops :[ops];
+
+ // Write concern
+ var writeConcern = options.writeConcern || {w:1};
+
+ // We are unordered
+ if(!ordered || writeConcern.w == 0) {
+ return executeUnordered('update', Update, ismaster, ns, bson, pool, callbacks, ops, options, callback);
+ }
+
+ return executeOrdered('update', Update, ismaster, ns, bson, pool, callbacks, ops, options, callback);
+}
+
+WireProtocol.prototype.remove = function(topology, ismaster, ns, bson, pool, callbacks, ops, options, callback) {
+ options = options || {};
+ // Default is ordered execution
+ var ordered = typeof options.ordered == 'boolean' ? options.ordered : true;
+ ops = Array.isArray(ops) ? ops :[ops];
+
+ // Write concern
+ var writeConcern = options.writeConcern || {w:1};
+
+ // We are unordered
+ if(!ordered || writeConcern.w == 0) {
+ return executeUnordered('remove', Remove, ismaster, ns, bson, pool, callbacks, ops, options, callback);
+ }
+
+ return executeOrdered('remove', Remove, ismaster, ns, bson, pool, callbacks, ops, options, callback);
+}
+
+WireProtocol.prototype.killCursor = function(bson, ns, cursorId, connection, callbacks, callback) {
+ // Create a kill cursor command
+ var killCursor = new KillCursor(bson, [cursorId]);
+ // Execute the kill cursor command
+ if(connection && connection.isConnected()) connection.write(killCursor.toBin());
+ // Set cursor to 0
+ cursorId = Long.ZERO;
+ // Return to caller
+ if(callback) callback(null, null);
+}
+
+WireProtocol.prototype.getMore = function(bson, ns, cursorState, batchSize, raw, connection, callbacks, options, callback) {
+ // Create getMore command
+ var getMore = new GetMore(bson, ns, cursorState.cursorId, {numberToReturn: batchSize});
+
+ // Query callback
+ var queryCallback = function(err, r) {
+ if(err) return callback(err);
+
+ // If we have a timed out query or a cursor that was killed
+ if((r.responseFlags & (1 << 0)) != 0) {
+ return callback(new MongoError("cursor killed or timed out"), null);
+ }
+
+ // Ensure we have a Long valie cursor id
+ var cursorId = typeof r.cursorId == 'number'
+ ? Long.fromNumber(r.cursorId)
+ : r.cursorId;
+
+ // Set all the values
+ cursorState.documents = r.documents;
+ cursorState.cursorId = cursorId;
+
+ // Return
+ callback(null);
+ }
+
+ // If we have a raw query decorate the function
+ if(raw) {
+ queryCallback.raw = raw;
+ }
+
+ // Register a callback
+ callbacks.register(getMore.requestId, queryCallback);
+ // Write out the getMore command
+ connection.write(getMore.toBin());
+}
+
+WireProtocol.prototype.command = function(bson, ns, cmd, cursorState, topology, options) {
+ // Establish type of command
+ if(cmd.find) {
+ return setupClassicFind(bson, ns, cmd, cursorState, topology, options)
+ } else if(cursorState.cursorId != null) {
+ } else if(cmd) {
+ return setupCommand(bson, ns, cmd, cursorState, topology, options);
+ } else {
+ throw new MongoError(f("command %s does not return a cursor", JSON.stringify(cmd)));
+ }
+}
+
+//
+// Execute a find command
+var setupClassicFind = function(bson, ns, cmd, cursorState, topology, options) {
+ var readPreference = options.readPreference || new ReadPreference('primary');
+ if(typeof readPreference == 'string') readPreference = new ReadPreference(readPreference);
+ if(!(readPreference instanceof ReadPreference)) throw new MongoError('readPreference must be a ReadPreference instance');
+
+ // Ensure we have at least some options
+ options = options || {};
+ // Set the optional batchSize
+ cursorState.batchSize = cmd.batchSize || cursorState.batchSize;
+ var numberToReturn = 0;
+
+ // Unpack the limit and batchSize values
+ if(cursorState.limit == 0) {
+ numberToReturn = cursorState.batchSize;
+ } else if(cursorState.limit < 0 || cursorState.limit < cursorState.batchSize || (cursorState.limit > 0 && cursorState.batchSize == 0)) {
+ numberToReturn = cursorState.limit;
+ } else {
+ numberToReturn = cursorState.batchSize;
+ }
+
+ var numberToSkip = cursorState.skip || 0;
+ // Build actual find command
+ var findCmd = {};
+ // Using special modifier
+ var usesSpecialModifier = false;
+
+ // We have a Mongos topology, check if we need to add a readPreference
+ if(topology.type == 'mongos' && readPreference) {
+ findCmd['$readPreference'] = readPreference.toJSON();
+ usesSpecialModifier = true;
+ }
+
+ // Add special modifiers to the query
+ if(cmd.sort) findCmd['orderby'] = cmd.sort, usesSpecialModifier = true;
+ if(cmd.hint) findCmd['$hint'] = cmd.hint, usesSpecialModifier = true;
+ if(cmd.snapshot) findCmd['$snapshot'] = cmd.snapshot, usesSpecialModifier = true;
+ if(cmd.returnKey) findCmd['$returnKey'] = cmd.returnKey, usesSpecialModifier = true;
+ if(cmd.maxScan) findCmd['$maxScan'] = cmd.maxScan, usesSpecialModifier = true;
+ if(cmd.min) findCmd['$min'] = cmd.min, usesSpecialModifier = true;
+ if(cmd.max) findCmd['$max'] = cmd.max, usesSpecialModifier = true;
+ if(cmd.showDiskLoc) findCmd['$showDiskLoc'] = cmd.showDiskLoc, usesSpecialModifier = true;
+ if(cmd.comment) findCmd['$comment'] = cmd.comment, usesSpecialModifier = true;
+ if(cmd.maxTimeMS) findCmd['$maxTimeMS'] = cmd.maxTimeMS, usesSpecialModifier = true;
+
+ // If we have explain, return a single document and close cursor
+ if(cmd.explain) {
+ numberToReturn = -1;
+ usesSpecialModifier = true;
+ findCmd['$explain'] = true;
+ }
+
+ // If we have a special modifier
+ if(usesSpecialModifier) {
+ findCmd['$query'] = cmd.query;
+ } else {
+ findCmd = cmd.query;
+ }
+
+ // Throw on majority readConcern passed in
+ if(cmd.readConcern && cmd.readConcern.level != 'local') {
+ throw new MongoError(f('server find command does not support a readConcern level of %s', cmd.readConcern.level));
+ }
+
+ // Remove readConcern, ensure no failing commands
+ if(cmd.readConcern) {
+ cmd = copy(cmd);
+ delete cmd['readConcern'];
+ }
+
+ // Set up the serialize and ignoreUndefined fields
+ var serializeFunctions = typeof options.serializeFunctions == 'boolean'
+ ? options.serializeFunctions : false;
+ var ignoreUndefined = typeof options.ignoreUndefined == 'boolean'
+ ? options.ignoreUndefined : false;
+
+ // Build Query object
+ var query = new Query(bson, ns, findCmd, {
+ numberToSkip: numberToSkip, numberToReturn: numberToReturn
+ , checkKeys: false, returnFieldSelector: cmd.fields
+ , serializeFunctions: serializeFunctions, ignoreUndefined: ignoreUndefined
+ });
+
+ // Set query flags
+ query.slaveOk = readPreference.slaveOk();
+
+ // Set up the option bits for wire protocol
+ if(typeof cmd.tailable == 'boolean') query.tailable = cmd.tailable;
+ if(typeof cmd.oplogReplay == 'boolean') query.oplogReplay = cmd.oplogReplay;
+ if(typeof cmd.noCursorTimeout == 'boolean') query.noCursorTimeout = cmd.noCursorTimeout;
+ if(typeof cmd.awaitData == 'boolean') query.awaitData = cmd.awaitData;
+ if(typeof cmd.exhaust == 'boolean') query.exhaust = cmd.exhaust;
+ if(typeof cmd.partial == 'boolean') query.partial = cmd.partial;
+ // Return the query
+ return query;
+}
+
+/**
+ * Build the legacy query message used to run a command against `<db>.$cmd`.
+ *
+ * @param {object} bson Serializer handed to the Query constructor.
+ * @param {string} ns Namespace; only the part before the first '.' (the db name) is used.
+ * @param {object} cmd Command document. It is shallow-copied and left unmodified.
+ * @param {object} cursorState Not used when building a command.
+ * @param {object} topology When `topology.type == 'mongos'` a `$readPreference` field is added.
+ * @param {object} [options] May carry `readPreference`, `serializeFunctions` and `ignoreUndefined`.
+ * @return {Query} Query with numberToSkip 0, numberToReturn -1 and the slaveOk flag set.
+ * @throws {MongoError} If readPreference is neither a string nor a ReadPreference,
+ *   or if `cmd.readConcern.level` is anything other than 'local'.
+ */
+var setupCommand = function(bson, ns, cmd, cursorState, topology, options) {
+  // Default the options first so that a missing options argument cannot throw below
+  options = options || {};
+
+  var readPreference = options.readPreference || new ReadPreference('primary');
+  if(typeof readPreference == 'string') readPreference = new ReadPreference(readPreference);
+  if(!(readPreference instanceof ReadPreference)) throw new MongoError('readPreference must be a ReadPreference instance');
+
+  // Throw on majority readConcern passed in
+  if(cmd.readConcern && cmd.readConcern.level != 'local') {
+    throw new MongoError(f('server %s command does not support a readConcern level of %s', JSON.stringify(cmd), cmd.readConcern.level));
+  }
+
+  // Final query, a shallow copy of the command
+  var finalCmd = {};
+  for(var name in cmd) {
+    finalCmd[name] = cmd[name];
+  }
+
+  // Remove readConcern from the document that is actually sent, ensure no failing
+  // commands. The caller's cmd object is deliberately left untouched.
+  if(cmd.readConcern) delete finalCmd['readConcern'];
+
+  // We have a Mongos topology, check if we need to add a readPreference
+  if(topology.type == 'mongos' && readPreference) {
+    finalCmd['$readPreference'] = readPreference.toJSON();
+  }
+
+  // Set up the serialize and ignoreUndefined fields
+  var serializeFunctions = typeof options.serializeFunctions == 'boolean'
+    ? options.serializeFunctions : false;
+  var ignoreUndefined = typeof options.ignoreUndefined == 'boolean'
+    ? options.ignoreUndefined : false;
+
+  // Commands always target the $cmd collection of the database named in ns
+  var dbName = ns.split(/\./).shift();
+
+  // Build Query object
+  var query = new Query(bson, f('%s.$cmd', dbName), finalCmd, {
+      numberToSkip: 0, numberToReturn: -1
+    , checkKeys: false, serializeFunctions: serializeFunctions
+    , ignoreUndefined: ignoreUndefined
+  });
+
+  // Set query flags
+  query.slaveOk = readPreference.slaveOk();
+
+  // Return the query
+  return query;
+}
+
+/**
+ * Bind a callback to the domain that is active at call time.
+ * When there is no active domain, or no callback, the argument is returned as is.
+ * @ignore
+ */
+var bindToCurrentDomain = function(callback) {
+  var activeDomain = process.domain;
+  var canBind = activeDomain != null && callback != null;
+  return canBind ? activeDomain.bind(callback) : callback;
+}
+
+//
+// Decide whether a write needs a getLastError follow-up.
+// True when w or wtimeout is truthy, when j or fsync loosely equals true,
+// or when the write concern object has no own enumerable keys at all.
+//
+var hasWriteConcern = function(writeConcern) {
+  if(writeConcern.w) return true;
+  if(writeConcern.wtimeout) return true;
+  if(writeConcern.j == true) return true;
+  if(writeConcern.fsync == true) return true;
+  // An empty write concern counts as acknowledged
+  return Object.keys(writeConcern).length == 0;
+}
+
+//
+// Copy the w, wtimeout, j and fsync fields of a write concern into a new object.
+// Fields that are null or undefined are left out; any other field is ignored.
+//
+var cloneWriteConcern = function(writeConcern) {
+  var fields = ['w', 'wtimeout', 'j', 'fsync'];
+  var copied = {};
+  for(var idx = 0; idx < fields.length; idx++) {
+    var field = fields[idx];
+    if(writeConcern[field] != null) copied[field] = writeConcern[field];
+  }
+  return copied;
+}
+
+//
+// Aggregate up all the results
+//
+// Folds the per-operation result documents in `results` into one summary document
+// and returns it wrapped in a CommandResult together with `connection`.
+//   opType     - operation type; only the value 'insert' is treated specially here
+//   ops        - the operations, index-aligned with results (ops[i].q._id is read for
+//                upserts that supplied their own _id)
+//   results    - one result document per operation
+//   connection - passed through to the CommandResult constructor
+// The summary starts as { ok: 1, n: 0 } and may gain upserted, writeErrors,
+// writeConcernError, code, errmsg and lastOp.
+//
+var aggregateWriteOperationResults = function(opType, ops, results, connection) {
+ var finalResult = { ok: 1, n: 0 }
+
+ // Map all the results coming back
+ for(var i = 0; i < results.length; i++) {
+ var result = results[i];
+ var op = ops[i];
+
+ // Create the upserted list lazily, the first time an upsert is seen
+ // NOTE(review): result is dereferenced here although later lines guard with
+ // result != null; a missing entry would throw on this line first
+ if((result.upserted || (result.updatedExisting == false)) && finalResult.upserted == null) {
+ finalResult.upserted = [];
+ }
+
+ // Push the upserted document to the list of upserted values
+ if(result.upserted) {
+ finalResult.upserted.push({index: i, _id: result.upserted});
+ }
+
+ // We have an upsert where we passed in a _id
+ if(result.updatedExisting == false && result.n == 1 && result.upserted == null) {
+ finalResult.upserted.push({index: i, _id: op.q._id});
+ }
+
+ // We have an insert command
+ // NOTE(review): only result.err is checked, so an entry carrying just errmsg
+ // (such as the serialization-error entries built in executeOrdered and
+ // executeUnordered) is still counted in n
+ if(result.ok == 1 && opType == 'insert' && result.err == null) {
+ finalResult.n = finalResult.n + 1;
+ }
+
+ // We have a command error
+ // NOTE(review): && binds tighter than ||, so the null guard covers only the
+ // result.ok == 0 test and not result.err / result.errmsg
+ if(result != null && result.ok == 0 || result.err || result.errmsg) {
+ if(result.ok == 0) finalResult.ok = 0;
+ finalResult.code = result.code;
+ finalResult.errmsg = result.errmsg || result.err || result.errMsg;
+
+ // Check if we have a write error
+ // The codes listed below are reported per operation in writeErrors; every other
+ // code is stored as the single writeConcernError (a later one overwrites an earlier one)
+ if(result.code == 11000
+ || result.code == 11001
+ || result.code == 12582
+ || result.code == 16544
+ || result.code == 16538
+ || result.code == 16542
+ || result.code == 14
+ || result.code == 13511) {
+ if(finalResult.writeErrors == null) finalResult.writeErrors = [];
+ finalResult.writeErrors.push({
+ index: i
+ , code: result.code
+ , errmsg: result.errmsg || result.err || result.errMsg
+ });
+ } else {
+ finalResult.writeConcernError = {
+ code: result.code
+ , errmsg: result.errmsg || result.err || result.errMsg
+ }
+ }
+ } else if(typeof result.n == 'number') {
+ // No error: add the count reported for this operation
+ finalResult.n += result.n;
+ } else {
+ // No error and no numeric count: count the operation as one
+ finalResult.n += 1;
+ }
+
+ // Result as expected
+ // Keep the lastOp of the most recent result that carries one
+ if(result != null && result.lastOp) finalResult.lastOp = result.lastOp;
+ }
+
+ // Return finalResult aggregated results
+ return new CommandResult(finalResult, connection);
+}
+
+//
+// Execute all operations in an ordered manner
+//
+// Each operation is written on its own, followed by a getLastError command; the next
+// operation is only started from the getLastError reply, and the run stops at the
+// first reply that reports an error.
+//   opType    - operation type passed on to aggregateWriteOperationResults
+//   command   - constructor used to build the wire message for a single operation
+//   ops       - operations to run; a copy is consumed so the caller's array is untouched
+//   options   - may carry writeConcern (defaults to {w:1})
+//   callback  - receives (err) or (null, aggregated CommandResult)
+// ismaster, ns, bson are handed to the command constructor; pool supplies connections
+// and callbacks is used to register the getLastError reply handlers.
+//
+var executeOrdered = function(opType ,command, ismaster, ns, bson, pool, callbacks, ops, options, callback) {
+ // Work on a copy because executeOp removes entries with shift()
+ var _ops = ops.slice(0);
+ // Bind to current domain
+ callback = bindToCurrentDomain(callback);
+ // Collect all the getLastErrors
+ var getLastErrors = [];
+
+ // Execute an operation
+ // Takes the first entry off list, writes it and continues with the rest of the
+ // list once its getLastError reply has arrived
+ var executeOp = function(list, _callback) {
+ // Get a pool connection
+ var connection = pool.get();
+ // No more items in the list
+ if(list.length == 0) return _callback(null, aggregateWriteOperationResults(opType, ops, getLastErrors, connection));
+
+ // Get the first operation
+ var doc = list.shift();
+
+ // Create the command for this single operation
+ var op = new command(Query.getRequestId(), ismaster, bson, ns, [doc], options);
+ // Write concern
+ var optionWriteConcern = options.writeConcern || {w:1};
+ // Final write concern
+ var writeConcern = cloneWriteConcern(optionWriteConcern);
+
+ // Get the db name
+ var db = ns.split('.').shift();
+
+ // Error out if no connection available
+ if(connection == null)
+ return _callback(new MongoError("no connection available"));
+
+ try {
+ // Execute the operation
+ connection.write(op.toBin());
+
+ // If write concern 0 don't fire getLastError
+ // NOTE(review): when this test is false nothing continues the list and the
+ // callback is never invoked from here - confirm callers never pass w:0
+ if(hasWriteConcern(writeConcern)) {
+ var getLastErrorCmd = {getlasterror: 1};
+ // Merge all the fields
+ // writeConcernFields is not defined in this block; it comes from elsewhere in the file
+ for(var i = 0; i < writeConcernFields.length; i++) {
+ if(writeConcern[writeConcernFields[i]] != null)
+ getLastErrorCmd[writeConcernFields[i]] = writeConcern[writeConcernFields[i]];
+ }
+
+ // Create a getLastError command
+ var getLastErrorOp = new Query(bson, f("%s.$cmd", db), getLastErrorCmd, {numberToReturn: -1});
+ // Write the lastError message
+ connection.write(getLastErrorOp.toBin());
+ // Register the callback
+ callbacks.register(getLastErrorOp.requestId, function(err, result) {
+ if(err) return callback(err);
+ // Get the document
+ var doc = result.documents[0];
+ // Save the getLastError document
+ getLastErrors.push(doc);
+ // If we have an error terminate
+ if(doc.ok == 0 || doc.err || doc.errmsg) return callback(null, aggregateWriteOperationResults(opType, ops, getLastErrors, connection));
+ // Execute the next op in the list
+ executeOp(list, callback);
+ });
+ }
+ } catch(err) {
+ if(typeof err == 'string') err = new MongoError(err);
+ // We have a serialization error, rewrite as a write error to have same behavior as modern
+ // write commands
+ getLastErrors.push({ ok: 1, errmsg: err.message, code: 14 });
+ // Return due to an error
+ return callback(null, aggregateWriteOperationResults(opType, ops, getLastErrors, connection));
+ }
+ }
+
+ // Execute the operations
+ executeOp(_ops, callback);
+}
+
+/**
+ * Write every operation straight away, without waiting for earlier ones, and follow
+ * each with a getLastError command when the write concern asks for acknowledgement.
+ *
+ * @param {string} opType Operation type passed on to aggregateWriteOperationResults.
+ * @param {function} command Constructor used to build the wire message for one operation.
+ * @param {object} ismaster Handed to the command constructor.
+ * @param {string} ns Namespace; the part before the first '.' is used as the db name.
+ * @param {object} bson Serializer handed to the command and Query constructors.
+ * @param {object} pool Connection pool; `pool.get()` is called once per operation.
+ * @param {object} callbacks Store used to register the getLastError reply handlers.
+ * @param {object[]} ops Operations to execute.
+ * @param {object} options May carry `writeConcern`; defaults to {w:1}.
+ * @param {function} callback Receives (err) or (null, result); for w:0 the result is null.
+ */
+var executeUnordered = function(opType, command, ismaster, ns, bson, pool, callbacks, ops, options, callback) {
+  // Bind to current domain
+  callback = bindToCurrentDomain(callback);
+  // Total operations to write
+  var totalOps = ops.length;
+  // Collect all the getLastErrors
+  var getLastErrors = [];
+  // Write concern
+  var optionWriteConcern = options.writeConcern || {w:1};
+  // Final write concern
+  var writeConcern = cloneWriteConcern(optionWriteConcern);
+  // Driver level error
+  var error;
+  // Set once the aggregated result has been handed to the callback, so that the
+  // w:0 return at the end cannot invoke the callback a second time
+  var resultDelivered = false;
+  // Get db name (the same for every operation)
+  var db = ns.split('.').shift();
+
+  // Execute all the operations
+  for(var i = 0; i < ops.length; i++) {
+    // Create the command for this single operation
+    var op = new command(Query.getRequestId(), ismaster, bson, ns, [ops[i]], options);
+
+    // Get a pool connection
+    var connection = pool.get();
+
+    // Error out if no connection available. This used to call an undefined
+    // _callback, which raised a ReferenceError instead of reporting the problem.
+    if(connection == null)
+      return callback(new MongoError("no connection available"));
+
+    try {
+      // Execute the operation
+      connection.write(op.toBin());
+      // If write concern 0 don't fire getLastError
+      if(hasWriteConcern(writeConcern)) {
+        var getLastErrorCmd = {getlasterror: 1};
+        // Merge all the fields
+        for(var j = 0; j < writeConcernFields.length; j++) {
+          if(writeConcern[writeConcernFields[j]] != null)
+            getLastErrorCmd[writeConcernFields[j]] = writeConcern[writeConcernFields[j]];
+        }
+
+        // Create a getLastError command
+        var getLastErrorOp = new Query(bson, f("%s.$cmd", db), getLastErrorCmd, {numberToReturn: -1});
+        // Write the lastError message
+        connection.write(getLastErrorOp.toBin());
+
+        // Give the result from getLastError the right index
+        var callbackOp = function(_index) {
+          return function(err, result) {
+            if(err) error = err;
+            // Update the number of operations executed
+            totalOps = totalOps - 1;
+            // Save the getLastError document
+            if(!err) getLastErrors[_index] = result.documents[0];
+            // Check if we are done
+            if(totalOps == 0) {
+              if(error) return callback(error);
+              callback(null, aggregateWriteOperationResults(opType, ops, getLastErrors, connection));
+            }
+          }
+        }
+
+        // Register the callback
+        callbacks.register(getLastErrorOp.requestId, callbackOp(i));
+      }
+    } catch(err) {
+      if(typeof err == 'string') err = new MongoError(err);
+      // Update the number of operations executed
+      totalOps = totalOps - 1;
+      // We have a serialization error, rewrite as a write error to have same behavior as modern
+      // write commands
+      getLastErrors[i] = { ok: 1, errmsg: err.message, code: 14 };
+      // Check if we are done
+      if(totalOps == 0) {
+        resultDelivered = true;
+        callback(null, aggregateWriteOperationResults(opType, ops, getLastErrors, connection));
+      }
+    }
+  }
+
+  // Empty w:0 return, unless a result has already been delivered above
+  if(writeConcern
+    && writeConcern.w == 0 && callback && !resultDelivered) {
+    callback(null, null);
+  }
+}
+
+module.exports = WireProtocol;
http://git-wip-us.apache.org/repos/asf/couchdb-nmo/blob/753f1767/node_modules/couchbulkimporter/node_modules/mongodb/node_modules/mongodb-core/lib/wireprotocol/2_6_support.js
----------------------------------------------------------------------
diff --git a/node_modules/couchbulkimporter/node_modules/mongodb/node_modules/mongodb-core/lib/wireprotocol/2_6_support.js b/node_modules/couchbulkimporter/node_modules/mongodb/node_modules/mongodb-core/lib/wireprotocol/2_6_support.js
new file mode 100644
index 0000000..b1d1d46
--- /dev/null
+++ b/node_modules/couchbulkimporter/node_modules/mongodb/node_modules/mongodb-core/lib/wireprotocol/2_6_support.js
@@ -0,0 +1,291 @@
+"use strict";
+
+var Insert = require('./commands').Insert
+ , Update = require('./commands').Update
+ , Remove = require('./commands').Remove
+ , Query = require('../connection/commands').Query
+ , copy = require('../connection/utils').copy
+ , KillCursor = require('../connection/commands').KillCursor
+ , GetMore = require('../connection/commands').GetMore
+ , Query = require('../connection/commands').Query
+ , ReadPreference = require('../topologies/read_preference')
+ , f = require('util').format
+ , CommandResult = require('../topologies/command_result')
+ , MongoError = require('../error')
+ , Long = require('bson').Long;
+
+// Constructor for the wire protocol handler defined in this file (2_6_support.js).
+// It sets no instance state; the operations are attached to its prototype.
+var WireProtocol = function() {}
+
+/**
+ * Execute a write operation as a write command against `<db>.$cmd`.
+ *
+ * @param {object} topology Object whose `command(ns, cmd, opts, callback)` runs the command.
+ * @param {string} type Command name ('insert', 'update' or 'delete' from the callers in this file).
+ * @param {string} opsField Name of the command field that receives the operations.
+ * @param {string} ns Namespace; split on '.' into the db name and the collection name.
+ * @param {object[]} ops Operations to send; must not be empty.
+ * @param {object|function} [options] Options, or the callback when no options are given.
+ *   Reads `ordered` (default true), `writeConcern` (default {}), `serializeFunctions`
+ *   and `ignoreUndefined`.
+ * @param {function} [callback] Passed through to `topology.command`.
+ * @throws {MongoError} If `ops` is empty.
+ */
+var executeWrite = function(topology, type, opsField, ns, ops, options, callback) {
+  if(ops.length == 0) throw new MongoError("insert must contain at least one document");
+  if(typeof options == 'function') {
+    callback = options;
+    options = {};
+  }
+  // Tolerate a missing options argument instead of failing on the reads below
+  options = options || {};
+
+  // Split the ns up to get db and collection
+  var p = ns.split(".");
+  var d = p.shift();
+  // Options
+  var ordered = typeof options.ordered == 'boolean' ? options.ordered : true;
+  var writeConcern = options.writeConcern || {};
+  // return skeleton
+  var writeCommand = {};
+  // The collection name is everything after the db name and may itself contain dots
+  writeCommand[type] = p.join('.');
+  writeCommand[opsField] = ops;
+  writeCommand.ordered = ordered;
+  writeCommand.writeConcern = writeConcern;
+
+  // Options object
+  var opts = {};
+  // Key checking is only requested for inserts
+  if(type == 'insert') opts.checkKeys = true;
+  // Ensure we support serialization of functions
+  if(options.serializeFunctions) opts.serializeFunctions = options.serializeFunctions;
+  if(options.ignoreUndefined) opts.ignoreUndefined = options.ignoreUndefined;
+  // Execute command
+  topology.command(f("%s.$cmd", d), writeCommand, opts, callback);
+}
+
+//
+// Needs to support legacy mass insert as well as ordered/unordered legacy
+// emulation
+//
+// Insert documents by delegating to executeWrite with the 'insert' command and the
+// 'documents' field. ismaster, bson, pool and callbacks are accepted but not used here.
+//
+WireProtocol.prototype.insert = function(topology, ismaster, ns, bson, pool, callbacks, ops, options, callback) {
+ executeWrite(topology, 'insert', 'documents', ns, ops, options, callback);
+}
+
+// Run updates by delegating to executeWrite with the 'update' command and the
+// 'updates' field. ismaster, bson, pool and callbacks are accepted but not used here.
+WireProtocol.prototype.update = function(topology, ismaster, ns, bson, pool, callbacks, ops, options, callback) {
+ executeWrite(topology, 'update', 'updates', ns, ops, options, callback);
+}
+
+// Run removes by delegating to executeWrite with the 'delete' command and the
+// 'deletes' field. ismaster, bson, pool and callbacks are accepted but not used here.
+WireProtocol.prototype.remove = function(topology, ismaster, ns, bson, pool, callbacks, ops, options, callback) {
+ executeWrite(topology, 'delete', 'deletes', ns, ops, options, callback);
+}
+
+// Kill a cursor on the server.
+// Builds a KillCursor message for cursorId and writes it only when a connection is
+// supplied and reports itself as connected; no reply handler is registered.
+// callback, when given, is invoked right away with (null, null).
+// ns and callbacks are accepted but not used here.
+WireProtocol.prototype.killCursor = function(bson, ns, cursorId, connection, callbacks, callback) {
+ // Create a kill cursor command
+ var killCursor = new KillCursor(bson, [cursorId]);
+ // Execute the kill cursor command
+ if(connection && connection.isConnected()) connection.write(killCursor.toBin());
+ // Set cursor to 0
+ // NOTE(review): this reassigns the local parameter only and nothing reads it
+ // afterwards, so the caller's cursor id is not changed by this line
+ cursorId = Long.ZERO;
+ // Return to caller
+ if(callback) callback(null, null);
+}
+
+/**
+ * Request the next batch of documents for an open cursor.
+ *
+ * Registers a reply handler under the request id of the getMore message, then writes
+ * the message to the connection. On a successful reply `cursorState.documents` and
+ * `cursorState.cursorId` are replaced and `callback(null)` is invoked.
+ *
+ * @param {object} bson Serializer handed to the GetMore constructor.
+ * @param {string} ns Namespace of the cursor.
+ * @param {object} cursorState Cursor state; `cursorId` is read, `documents` and `cursorId` are written.
+ * @param {number} batchSize Sent as numberToReturn.
+ * @param {*} raw When truthy it is attached to the reply handler as its `raw` property.
+ * @param {object} connection Connection the message is written to.
+ * @param {object} callbacks Store used to register the reply handler.
+ * @param {object} options Not used here.
+ * @param {function} callback Receives (err) on failure or (null) on success.
+ */
+WireProtocol.prototype.getMore = function(bson, ns, cursorState, batchSize, raw, connection, callbacks, options, callback) {
+  // Build the getMore message for the cursor's current id
+  var request = new GetMore(bson, ns, cursorState.cursorId, {numberToReturn: batchSize});
+
+  // Handles the reply to the getMore
+  var onReply = function(err, reply) {
+    if(err) return callback(err);
+
+    // The first response flag bit marks a timed out query or a killed cursor
+    var cursorGone = (reply.responseFlags & (1 << 0)) != 0;
+    if(cursorGone) return callback(new MongoError("cursor killed or timed out"), null);
+
+    // Make sure the cursor id is a Long value
+    var nextCursorId = reply.cursorId;
+    if(typeof nextCursorId == 'number') nextCursorId = Long.fromNumber(nextCursorId);
+
+    // Publish the new batch and cursor id on the cursor state
+    cursorState.documents = reply.documents;
+    cursorState.cursorId = nextCursorId;
+
+    callback(null);
+  }
+
+  // A raw query decorates the handler
+  if(raw) onReply.raw = raw;
+
+  // The handler is registered before the message is written out
+  callbacks.register(request.requestId, onReply);
+  connection.write(request.toBin());
+}
+
+/**
+ * Build the query message for a cursor, depending on the kind of command.
+ *
+ * @return {Query|undefined} The query built by setupClassicFind (when `cmd.find` is
+ *   truthy) or by setupCommand; undefined when the cursor state already has a cursor id.
+ * @throws {MongoError} If `cmd` is falsy and none of the other cases applied.
+ */
+WireProtocol.prototype.command = function(bson, ns, cmd, cursorState, topology, options) {
+  // A find is translated into a classic query
+  if(cmd.find) return setupClassicFind(bson, ns, cmd, cursorState, topology, options);
+
+  // A cursor id is already present: nothing is built
+  if(cursorState.cursorId != null) return;
+
+  // Any other command document
+  if(cmd) return setupCommand(bson, ns, cmd, cursorState, topology, options);
+
+  throw new MongoError(f("command %s does not return a cursor", JSON.stringify(cmd)));
+}
+
+/**
+ * Build the legacy query message for a find command.
+ *
+ * @param {object} bson Serializer handed to the Query constructor.
+ * @param {string} ns Namespace the query is sent to.
+ * @param {object} cmd Find command; reads query, fields, batchSize, sort, hint, snapshot,
+ *   returnKey, maxScan, min, max, showDiskLoc, comment, maxTimeMS, explain, readConcern
+ *   and the boolean flags tailable, oplogReplay, noCursorTimeout, awaitData, exhaust, partial.
+ * @param {object} cursorState Cursor state; limit and skip are read, batchSize is read and updated.
+ * @param {object} topology When `topology.type == 'mongos'` a `$readPreference` field is added.
+ * @param {object} [options] May carry `readPreference`, `serializeFunctions` and `ignoreUndefined`.
+ * @return {Query} The query, with slaveOk and the wire protocol option flags set.
+ * @throws {MongoError} If readPreference is neither a string nor a ReadPreference,
+ *   or if `cmd.readConcern.level` is anything other than 'local'.
+ */
+var setupClassicFind = function(bson, ns, cmd, cursorState, topology, options) {
+  // Ensure we have at least some options before anything is read from them
+  options = options || {};
+
+  var readPreference = options.readPreference || new ReadPreference('primary');
+  if(typeof readPreference == 'string') readPreference = new ReadPreference(readPreference);
+  if(!(readPreference instanceof ReadPreference)) throw new MongoError('readPreference must be a ReadPreference instance');
+
+  // Set the optional batchSize
+  cursorState.batchSize = cmd.batchSize || cursorState.batchSize;
+  var numberToReturn = 0;
+
+  // Unpack the limit and batchSize values
+  if(cursorState.limit == 0) {
+    numberToReturn = cursorState.batchSize;
+  } else if(cursorState.limit < 0 || cursorState.limit < cursorState.batchSize || (cursorState.limit > 0 && cursorState.batchSize == 0)) {
+    numberToReturn = cursorState.limit;
+  } else {
+    numberToReturn = cursorState.batchSize;
+  }
+
+  var numberToSkip = cursorState.skip || 0;
+  // Build actual find command
+  var findCmd = {};
+  // Using special modifier
+  var usesSpecialModifier = false;
+
+  // We have a Mongos topology, check if we need to add a readPreference
+  if(topology.type == 'mongos' && readPreference) {
+    findCmd['$readPreference'] = readPreference.toJSON();
+    usesSpecialModifier = true;
+  }
+
+  // Add special modifiers to the query. Each pair maps a command field to the
+  // modifier key it is sent under; the order fixes the key order of findCmd.
+  var modifiers = [
+      ['sort', 'orderby'], ['hint', '$hint'], ['snapshot', '$snapshot']
+    , ['returnKey', '$returnKey'], ['maxScan', '$maxScan'], ['min', '$min']
+    , ['max', '$max'], ['showDiskLoc', '$showDiskLoc'], ['comment', '$comment']
+    , ['maxTimeMS', '$maxTimeMS']
+  ];
+  for(var i = 0; i < modifiers.length; i++) {
+    if(cmd[modifiers[i][0]]) {
+      findCmd[modifiers[i][1]] = cmd[modifiers[i][0]];
+      usesSpecialModifier = true;
+    }
+  }
+
+  // If we have explain, return a single document and close cursor
+  if(cmd.explain) {
+    numberToReturn = -1;
+    usesSpecialModifier = true;
+    findCmd['$explain'] = true;
+  }
+
+  // If we have a special modifier the filter is nested under $query,
+  // otherwise the filter itself is the query document
+  if(usesSpecialModifier) {
+    findCmd['$query'] = cmd.query;
+  } else {
+    findCmd = cmd.query;
+  }
+
+  // Throw on majority readConcern passed in
+  if(cmd.readConcern && cmd.readConcern.level != 'local') {
+    throw new MongoError(f('server find command does not support a readConcern level of %s', cmd.readConcern.level));
+  }
+
+  // Remove readConcern, ensure no failing commands. Work on a copy so the
+  // caller's command object is not modified.
+  if(cmd.readConcern) {
+    cmd = copy(cmd);
+    delete cmd['readConcern'];
+  }
+
+  // Serialize functions
+  var serializeFunctions = typeof options.serializeFunctions == 'boolean'
+    ? options.serializeFunctions : false;
+  var ignoreUndefined = typeof options.ignoreUndefined == 'boolean'
+    ? options.ignoreUndefined : false;
+
+  // Build Query object
+  var query = new Query(bson, ns, findCmd, {
+      numberToSkip: numberToSkip, numberToReturn: numberToReturn
+    , checkKeys: false, returnFieldSelector: cmd.fields
+    , serializeFunctions: serializeFunctions
+    , ignoreUndefined: ignoreUndefined
+  });
+
+  // Set query flags
+  query.slaveOk = readPreference.slaveOk();
+
+  // Set up the option bits for wire protocol
+  if(typeof cmd.tailable == 'boolean') query.tailable = cmd.tailable;
+  if(typeof cmd.oplogReplay == 'boolean') query.oplogReplay = cmd.oplogReplay;
+  if(typeof cmd.noCursorTimeout == 'boolean') query.noCursorTimeout = cmd.noCursorTimeout;
+  if(typeof cmd.awaitData == 'boolean') query.awaitData = cmd.awaitData;
+  if(typeof cmd.exhaust == 'boolean') query.exhaust = cmd.exhaust;
+  if(typeof cmd.partial == 'boolean') query.partial = cmd.partial;
+  // Return the query
+  return query;
+}
+
+/**
+ * Build the legacy query message used to run a command against `<db>.$cmd`.
+ *
+ * @param {object} bson Serializer handed to the Query constructor.
+ * @param {string} ns Namespace; only the part before the first '.' (the db name) is used.
+ * @param {object} cmd Command document. It is shallow-copied and left unmodified.
+ * @param {object} cursorState Not used when building a command.
+ * @param {object} topology When `topology.type == 'mongos'` a `$readPreference` field is added.
+ * @param {object} [options] May carry `readPreference`, `serializeFunctions` and `ignoreUndefined`.
+ * @return {Query} Query with numberToSkip 0, numberToReturn -1 and the slaveOk flag set.
+ * @throws {MongoError} If readPreference is neither a string nor a ReadPreference,
+ *   or if `cmd.readConcern.level` is anything other than 'local'.
+ */
+var setupCommand = function(bson, ns, cmd, cursorState, topology, options) {
+  // Default the options first so that a missing options argument cannot throw below
+  options = options || {};
+
+  var readPreference = options.readPreference || new ReadPreference('primary');
+  if(typeof readPreference == 'string') readPreference = new ReadPreference(readPreference);
+  if(!(readPreference instanceof ReadPreference)) throw new MongoError('readPreference must be a ReadPreference instance');
+
+  // Throw on majority readConcern passed in
+  if(cmd.readConcern && cmd.readConcern.level != 'local') {
+    throw new MongoError(f('server %s command does not support a readConcern level of %s', JSON.stringify(cmd), cmd.readConcern.level));
+  }
+
+  // Final query, a shallow copy of the command
+  var finalCmd = {};
+  for(var name in cmd) {
+    finalCmd[name] = cmd[name];
+  }
+
+  // Remove readConcern from the document that is actually sent, ensure no failing
+  // commands. The caller's cmd object is deliberately left untouched.
+  if(cmd.readConcern) delete finalCmd['readConcern'];
+
+  // We have a Mongos topology, check if we need to add a readPreference
+  if(topology.type == 'mongos' && readPreference) {
+    finalCmd['$readPreference'] = readPreference.toJSON();
+  }
+
+  // Serialize functions
+  var serializeFunctions = typeof options.serializeFunctions == 'boolean'
+    ? options.serializeFunctions : false;
+  var ignoreUndefined = typeof options.ignoreUndefined == 'boolean'
+    ? options.ignoreUndefined : false;
+
+  // Commands always target the $cmd collection of the database named in ns
+  var dbName = ns.split(/\./).shift();
+
+  // Build Query object
+  var query = new Query(bson, f('%s.$cmd', dbName), finalCmd, {
+      numberToSkip: 0, numberToReturn: -1
+    , checkKeys: false, serializeFunctions: serializeFunctions
+    , ignoreUndefined: ignoreUndefined
+  });
+
+  // Set query flags
+  query.slaveOk = readPreference.slaveOk();
+
+  // Return the query
+  return query;
+}
+
+/**
+ * Bind a callback to the domain that is active at call time.
+ * When there is no active domain, or no callback, the argument is returned as is.
+ * @ignore
+ */
+var bindToCurrentDomain = function(callback) {
+  var activeDomain = process.domain;
+  var canBind = activeDomain != null && callback != null;
+  return canBind ? activeDomain.bind(callback) : callback;
+}
+
+module.exports = WireProtocol;