You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ke...@apache.org on 2021/03/16 01:39:12 UTC
[skywalking-nodejs] branch master updated: Refactor and cleanups
(#38)
This is an automated email from the ASF dual-hosted git repository.
kezhenxu94 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking-nodejs.git
The following commit(s) were added to refs/heads/master by this push:
new 1fbb384 Refactor and cleanups (#38)
1fbb384 is described below
commit 1fbb3846f4b3855a84ab48f773c39f79db2a256d
Author: Tomasz Pytel <to...@gmail.com>
AuthorDate: Mon Mar 15 22:39:06 2021 -0300
Refactor and cleanups (#38)
---
src/annotations/index.ts | 4 +-
src/core/SwPlugin.ts | 57 +++++++++++++++
src/plugins/AMQPLibPlugin.ts | 7 +-
src/plugins/AxiosPlugin.ts | 64 +++++++++--------
src/plugins/ExpressPlugin.ts | 83 ++++++++++++----------
src/plugins/HttpPlugin.ts | 118 +++++++++++--------------------
src/plugins/MongoDBPlugin.ts | 98 ++++++++++++-------------
src/plugins/MySQLPlugin.ts | 38 +++-------
src/plugins/PgPlugin.ts | 54 ++++----------
src/trace/context/SpanContext.ts | 2 +-
src/trace/span/Span.ts | 14 ++--
src/trace/span/StackedSpan.ts | 6 +-
tests/plugins/axios/expected.data.yaml | 64 ++++++++---------
tests/plugins/express/expected.data.yaml | 8 +--
14 files changed, 298 insertions(+), 319 deletions(-)
diff --git a/src/annotations/index.ts b/src/annotations/index.ts
index e6fe0e1..de3b15d 100644
--- a/src/annotations/index.ts
+++ b/src/annotations/index.ts
@@ -28,7 +28,9 @@ export function trace(op?: string) {
const original = descriptor.value;
descriptor.value = function (...args: any[]) {
- const span = ContextManager.current.newLocalSpan(op || key).start();
+ const span = ContextManager.current.newLocalSpan(op || key);
+
+ span.start();
const result = original.apply(this, args);
diff --git a/src/core/SwPlugin.ts b/src/core/SwPlugin.ts
index 5dbbfe7..0c26ccc 100644
--- a/src/core/SwPlugin.ts
+++ b/src/core/SwPlugin.ts
@@ -18,6 +18,7 @@
*/
import PluginInstaller from './PluginInstaller';
+import Span from '../trace/span/Span';
export default interface SwPlugin {
readonly module: string;
@@ -25,3 +26,59 @@ export default interface SwPlugin {
install(installer: PluginInstaller): void;
}
+
+export const wrapEmit = (span: Span, ee: any, doError: boolean = true, endEvent: any = NaN) => { // because NaN !== NaN
+ const _emit = ee.emit;
+
+ Object.defineProperty(ee, 'emit', {configurable: true, writable: true, value: (function(this: any): any {
+ const event = arguments[0];
+
+ span.resync();
+
+ try {
+ if (doError && event === 'error')
+ span.error(arguments[1]);
+
+ return _emit.apply(this, arguments);
+
+ } catch (err) {
+ span.error(err);
+
+ throw err;
+
+ } finally {
+ if (event === endEvent)
+ span.stop();
+ else
+ span.async();
+ }
+ })});
+};
+
+export const wrapCallback = (span: Span, callback: any, idxError: any = false) => {
+ return function(this: any) {
+ if (idxError !== false && arguments[idxError])
+ span.error(arguments[idxError]);
+
+ span.stop();
+
+ return callback.apply(this, arguments);
+ }
+};
+
+export const wrapPromise = (span: Span, promise: any) => {
+ return promise.then(
+ (res: any) => {
+ span.stop();
+
+ return res;
+ },
+
+ (err: any) => {
+ span.error(err);
+ span.stop();
+
+ return Promise.reject(err);
+ }
+ );
+};
diff --git a/src/plugins/AMQPLibPlugin.ts b/src/plugins/AMQPLibPlugin.ts
index 794e620..079b0bc 100644
--- a/src/plugins/AMQPLibPlugin.ts
+++ b/src/plugins/AMQPLibPlugin.ts
@@ -43,8 +43,9 @@ class AMQPLibPlugin implements SwPlugin {
const topic = fields.exchange || '';
const queue = fields.routingKey || '';
const peer = `${this.connection.stream.remoteAddress}:${this.connection.stream.remotePort}`;
+ const span = ContextManager.current.newExitSpan('RabbitMQ/' + topic + '/' + queue + '/Producer', peer, Component.RABBITMQ_PRODUCER);
- const span = ContextManager.current.newExitSpan('RabbitMQ/' + topic + '/' + queue + '/Producer', peer, Component.RABBITMQ_PRODUCER).start();
+ span.start();
try {
span.inject().items.forEach((item) => {
@@ -85,7 +86,9 @@ class AMQPLibPlugin implements SwPlugin {
const topic = message?.fields?.exchange || '';
const queue = message?.fields?.routingKey || '';
const carrier = ContextCarrier.from(message?.properties?.headers || {});
- const span = ContextManager.current.newEntrySpan('RabbitMQ/' + topic + '/' + queue + '/Consumer', carrier).start();
+ const span = ContextManager.current.newEntrySpan('RabbitMQ/' + topic + '/' + queue + '/Consumer', carrier);
+
+ span.start();
try {
span.component = Component.RABBITMQ_CONSUMER;
diff --git a/src/plugins/AxiosPlugin.ts b/src/plugins/AxiosPlugin.ts
index 1491532..1684c99 100644
--- a/src/plugins/AxiosPlugin.ts
+++ b/src/plugins/AxiosPlugin.ts
@@ -17,7 +17,7 @@
*
*/
-import SwPlugin from '../core/SwPlugin';
+import SwPlugin, {wrapPromise} from '../core/SwPlugin';
import { URL } from 'url';
import ContextManager from '../trace/context/ContextManager';
import { Component } from '../trace/Component';
@@ -34,68 +34,74 @@ class AxiosPlugin implements SwPlugin {
}
private interceptClientRequest(installer: PluginInstaller): void {
- const defaults = installer.require('axios/lib/defaults');
- const defaultAdapter = defaults.adapter; // this will be http adapter
+ const Axios = installer.require('axios/lib/core/Axios');
+ const _request = Axios.prototype.request;
+
+ Axios.prototype.request = function(url?: any, config?: any) {
+ if (typeof url === 'string')
+ config = config ? {...config, url} : {url};
+ else
+ config = url ? {...url} : {};
- defaults.adapter = (config: any) => {
const {origin, host, pathname: operation} = new URL(config.url); // TODO: this may throw invalid URL
- const span = ContextManager.current.newExitSpan(operation, host, Component.AXIOS, Component.HTTP).start();
+ const span = ContextManager.current.newExitSpan(operation, host, Component.AXIOS, Component.HTTP);
- let ret: any;
+ span.start();
try {
+ config.headers = config.headers ? {...config.headers} : {};
+
span.component = Component.AXIOS;
span.layer = SpanLayer.HTTP;
span.peer = host;
+
span.tag(Tag.httpURL(origin + operation));
+ span.tag(Tag.httpMethod((config.method || 'GET').toUpperCase()));
- span.inject().items.forEach((item) => {
- config.headers[item.key] = item.value;
- });
+ span.inject().items.forEach((item) => config.headers[item.key] = item.value);
const copyStatus = (response: any) => {
if (response) {
if (response.status) {
span.tag(Tag.httpStatusCode(response.status));
- if (response.status >= 400) {
+
+ if (response.status >= 400)
span.errored = true;
- }
}
- if (response.statusText) {
+ if (response.statusText)
span.tag(Tag.httpStatusMsg(response.statusText));
- }
}
};
- ret = defaultAdapter(config).then(
- (response: any) => {
- copyStatus(response);
+ const ret = _request.call(this, config).then(
+ (res: any) => {
+ copyStatus(res);
span.stop();
- return response;
+ return res;
},
- (error: any) => {
- copyStatus(error.response);
- span.error(error);
+ (err: any) => {
+ copyStatus(err.response);
+ span.error(err);
span.stop();
- return Promise.reject(error);
+ return Promise.reject(err);
}
);
- } catch (e) {
- span.error(e);
- span.stop();
+ span.async();
- throw e;
- }
+ return ret;
- span.async();
+ } catch (err) {
+ span.error(err);
+ span.stop();
- return ret;
- }
+ throw err;
+ }
+ };
}
}
diff --git a/src/plugins/ExpressPlugin.ts b/src/plugins/ExpressPlugin.ts
index 9afd56f..90f86b7 100644
--- a/src/plugins/ExpressPlugin.ts
+++ b/src/plugins/ExpressPlugin.ts
@@ -22,6 +22,7 @@ import { IncomingMessage, ServerResponse } from 'http';
import ContextManager from '../trace/context/ContextManager';
import { Component } from '../trace/Component';
import Tag from '../Tag';
+import EntrySpan from '../trace/span/EntrySpan';
import { SpanLayer } from '../proto/language-agent/Tracing_pb';
import { ContextCarrier } from '../trace/context/ContextCarrier';
import PluginInstaller from '../core/PluginInstaller';
@@ -36,64 +37,68 @@ class ExpressPlugin implements SwPlugin {
private interceptServerRequest(installer: PluginInstaller) {
const router = installer.require('express/lib/router');
- const onFinished = installer.require('on-finished');
const _handle = router.handle;
- router.handle = function(req: IncomingMessage, res: ServerResponse, out: any) {
- const headers = req.rawHeaders || [];
- const headersMap: { [key: string]: string } = {};
+ router.handle = function(req: IncomingMessage, res: ServerResponse, next: any) {
+ const carrier = ContextCarrier.from((req as any).headers || {});
+ const operation = (req.url || '/').replace(/\?.*/g, '');
+ const span: EntrySpan = ContextManager.current.newEntrySpan(operation, carrier, Component.HTTP_SERVER) as EntrySpan;
- for (let i = 0; i < headers.length / 2; i += 2) {
- headersMap[headers[i]] = headers[i + 1];
- }
+ span.component = Component.EXPRESS;
- const carrier = ContextCarrier.from(headersMap);
- const operation = (req.url || '/').replace(/\?.*/g, '');
- const span = ContextManager.current.newEntrySpan(operation, carrier, Component.HTTP_SERVER).start();
-
- let stopped = 0;
- const stopIfNotStopped = (err: Error | null) => {
- if (!stopped++) {
- span.stop();
- span.tag(Tag.httpStatusCode(res.statusCode));
- if (res.statusCode && res.statusCode >= 400) {
- span.errored = true;
- }
- if (err instanceof Error) {
- span.error(err);
- }
- if (res.statusMessage) {
- span.tag(Tag.httpStatusMsg(res.statusMessage));
- }
- }
+ if (span.depth) // if we inherited from http then just change component ID and let http do the work
+ return _handle.apply(this, arguments);
+
+ // all the rest of this code is only needed to make express tracing work if the http plugin is disabled
+
+ let copyStatusErrorAndStopIfNotStopped = (err: Error | undefined) => {
+ copyStatusErrorAndStopIfNotStopped = () => undefined;
+
+ span.tag(Tag.httpStatusCode(res.statusCode));
+
+ if (res.statusCode && res.statusCode >= 400)
+ span.errored = true;
+
+ if (res.statusMessage)
+ span.tag(Tag.httpStatusMsg(res.statusMessage));
+
+ if (err instanceof Error)
+ span.error(err);
+
+ span.stop();
};
+ span.start();
+
try {
span.layer = SpanLayer.HTTP;
- span.component = Component.EXPRESS;
span.peer =
(typeof req.headers['x-forwarded-for'] === 'string' && req.headers['x-forwarded-for'].split(',').shift())
|| (req.connection.remoteFamily === 'IPv6'
? `[${req.connection.remoteAddress}]:${req.connection.remotePort}`
: `${req.connection.remoteAddress}:${req.connection.remotePort}`);
+
span.tag(Tag.httpMethod(req.method));
const ret = _handle.call(this, req, res, (err: Error) => {
- if (err) {
- span.error(err);
- } else {
- span.errored = true;
- }
- out.call(this, err);
- stopped -= 1; // skip first stop attempt, make sure stop executes once status code and message is set
- onFinished(res, stopIfNotStopped); // this must run after any handlers deferred in 'out'
+ span.error(err);
+ next.call(this, err);
});
- onFinished(res, stopIfNotStopped); // this must run after any handlers deferred in 'out'
+
+ if (process.version < 'v12')
+ req.on('end', copyStatusErrorAndStopIfNotStopped); // this instead of req or res.close because Node 10 doesn't emit those
+ else
+ res.on('close', copyStatusErrorAndStopIfNotStopped); // this works better
+
+ span.async();
return ret;
- } catch (e) {
- stopIfNotStopped(e);
- throw e;
+
+ } catch (err) {
+ copyStatusErrorAndStopIfNotStopped(err);
+
+ throw err;
+
} finally { // req.protocol is only possibly available after call to _handle()
span.tag(Tag.httpURL(((req as any).protocol ? (req as any).protocol + '://' : '') + (req.headers.host || '') + req.url));
}
diff --git a/src/plugins/HttpPlugin.ts b/src/plugins/HttpPlugin.ts
index be38904..2644906 100644
--- a/src/plugins/HttpPlugin.ts
+++ b/src/plugins/HttpPlugin.ts
@@ -17,7 +17,7 @@
*
*/
-import SwPlugin from '../core/SwPlugin';
+import SwPlugin, {wrapEmit} from '../core/SwPlugin';
import { URL } from 'url';
import { ClientRequest, IncomingMessage, RequestOptions, ServerResponse } from 'http';
import ContextManager from '../trace/context/ContextManager';
@@ -57,87 +57,42 @@ class HttpPlugin implements SwPlugin {
pathname: url.path || '/',
};
- const httpMethod = arguments[url instanceof URL || typeof url === 'string' ? 1 : 0]?.method || 'GET';
- const httpURL = protocol + '://' + host + pathname;
const operation = pathname.replace(/\?.*$/g, '');
+ const span: ExitSpan = ContextManager.current.newExitSpan(operation, host, Component.HTTP) as ExitSpan;
- const span: ExitSpan = ContextManager.current.newExitSpan(operation, host, Component.HTTP).start() as ExitSpan;
+ if (span.depth) // if we inherited from a higher level plugin then do nothing, higher level should do all the work and we don't duplicate here
+ return _request.apply(this, arguments);
- try {
- if (span.depth === 1) { // only set HTTP if this span is not overridden by a higher level one
- span.component = Component.HTTP;
- span.layer = SpanLayer.HTTP;
- }
-
- if (!span.peer)
- span.peer = host;
+ span.start();
- if (!span.hasTag(Tag.httpURLKey)) // only set if a higher level plugin with more info did not already set
- span.tag(Tag.httpURL(httpURL));
+ try {
+ span.component = Component.HTTP;
+ span.layer = SpanLayer.HTTP;
+ span.peer = host;
- if (!span.hasTag(Tag.httpMethodKey))
- span.tag(Tag.httpMethod(httpMethod));
+ span.tag(Tag.httpURL(protocol + '://' + host + pathname));
+ span.tag(Tag.httpMethod(arguments[url instanceof URL || typeof url === 'string' ? 1 : 0]?.method || 'GET'));
const req: ClientRequest = _request.apply(this, arguments);
span.inject().items.forEach((item) => req.setHeader(item.key, item.value));
+ wrapEmit(span, req, true, 'close');
+
req.on('timeout', () => span.log('Timeout', true));
req.on('abort', () => span.log('Abort', span.errored = true));
- req.on('error', (err) => span.error(err));
-
- const _emit = req.emit;
-
- req.emit = function(): any {
- const event = arguments[0];
-
- span.resync();
-
- try {
- if (event === 'response') {
- const res = arguments[1];
-
- span.tag(Tag.httpStatusCode(res.statusCode));
-
- if (res.statusCode && res.statusCode >= 400)
- span.errored = true;
-
- if (res.statusMessage)
- span.tag(Tag.httpStatusMsg(res.statusMessage));
-
- const _emitRes = res.emit;
- res.emit = function(): any {
- span.resync();
+ req.on('response', (res: any) => {
+ span.tag(Tag.httpStatusCode(res.statusCode));
- try {
- return _emitRes.apply(this, arguments);
+ if (res.statusCode && res.statusCode >= 400)
+ span.errored = true;
- } catch (err) {
- span.error(err);
+ if (res.statusMessage)
+ span.tag(Tag.httpStatusMsg(res.statusMessage));
- throw err;
-
- } finally {
- span.async();
- }
- }
- }
-
- return _emit.apply(this, arguments as any);
-
- } catch (err) {
- span.error(err);
-
- throw err;
-
- } finally {
- if (event === 'close')
- span.stop();
- else
- span.async();
- }
- };
+ wrapEmit(span, res, false);
+ });
span.async();
@@ -160,15 +115,11 @@ class HttpPlugin implements SwPlugin {
return _addListener.call(this, event, event === 'request' ? _sw_request : handler, ...addArgs);
function _sw_request(this: any, req: IncomingMessage, res: ServerResponse, ...reqArgs: any[]) {
- const headers = req.rawHeaders || [];
- const headersMap: { [key: string]: string } = {};
-
- for (let i = 0; i < headers.length / 2; i += 2)
- headersMap[headers[i]] = headers[i + 1];
-
- const carrier = ContextCarrier.from(headersMap);
+ const carrier = ContextCarrier.from((req as any).headers || {});
const operation = (req.url || '/').replace(/\?.*/g, '');
- const span = ContextManager.current.newEntrySpan(operation, carrier).start();
+ const span = ContextManager.current.newEntrySpan(operation, carrier);
+
+ span.start();
try {
span.component = Component.HTTP_SERVER;
@@ -198,9 +149,22 @@ class HttpPlugin implements SwPlugin {
span.stop();
};
- req.on('end', copyStatusAndStopIfNotStopped); // this insead of 'close' because Node 10 doesn't emit those
- res.on('abort', () => (span.errored = true, span.log('Abort', true), copyStatusAndStopIfNotStopped()));
- res.on('error', (err) => (span.error(err), copyStatusAndStopIfNotStopped()));
+ if (process.version < 'v12')
+ req.on('end', copyStatusAndStopIfNotStopped); // this instead of req or res.close because Node 10 doesn't emit those
+ else
+ res.on('close', copyStatusAndStopIfNotStopped); // this works better
+
+ res.on('abort', () => {
+ span.errored = true;
+
+ span.log('Abort', true);
+ copyStatusAndStopIfNotStopped();
+ });
+
+ res.on('error', (err) => {
+ span.error(err);
+ copyStatusAndStopIfNotStopped();
+ });
span.async();
diff --git a/src/plugins/MongoDBPlugin.ts b/src/plugins/MongoDBPlugin.ts
index c0de1ed..a4355ce 100644
--- a/src/plugins/MongoDBPlugin.ts
+++ b/src/plugins/MongoDBPlugin.ts
@@ -17,7 +17,7 @@
*
*/
-import SwPlugin from '../core/SwPlugin';
+import SwPlugin, {wrapPromise} from '../core/SwPlugin';
import ContextManager from '../trace/context/ContextManager';
import { Component } from '../trace/Component';
import ExitSpan from '../trace/span/ExitSpan';
@@ -36,13 +36,12 @@ class MongoDBPlugin implements SwPlugin {
// Problematic because other exit spans may be created during processing, for this reason we do not .resync() this
// span to the span list until it is closed. If the cursor is never closed then the span will not be sent.
- maybeHookCursor(span: any, cursor: any): boolean {
+ hookCursorMaybe(span: any, cursor: any): boolean {
if (!(cursor instanceof this.Cursor))
return false;
cursor.on('error', (err: any) => {
span.error(err);
- span.stop();
});
cursor.on('close', () => {
@@ -57,21 +56,20 @@ class MongoDBPlugin implements SwPlugin {
const Collection = installer.require('mongodb/lib/collection');
this.Cursor = installer.require('mongodb/lib/cursor');
- const wrapCallback = (span: any, args: any[], idx: number): boolean => {
+ const wrapCallbackWithCursorMaybe = (span: any, args: any[], idx: number): boolean => {
const callback = args.length > idx && typeof args[idx = args.length - 1] === 'function' ? args[idx] : null;
if (!callback)
return false;
- args[idx] = function(this: any, error: any, result: any) {
- if (error || !plugin.maybeHookCursor(span, result)) {
- if (error)
- span.error(error);
+ args[idx] = function(this: any) { // arguments = [error: any, result: any]
+ if (arguments[0])
+ span.error(arguments[0]);
+ if (arguments[0] || !plugin.hookCursorMaybe(span, arguments[1]))
span.stop();
- }
- return callback.call(this, error, result);
+ return callback.apply(this, arguments);
}
return true;
@@ -95,13 +93,13 @@ class MongoDBPlugin implements SwPlugin {
if (agentConfig.mongoTraceParameters)
span.tag(Tag.dbMongoParameters(stringify(args[0])));
- return wrapCallback(span, args, 1);
+ return wrapCallbackWithCursorMaybe(span, args, 1);
};
const deleteFunc = function(this: any, operation: string, span: any, args: any[]): boolean { // args = [filter, options, callback]
span.tag(Tag.dbStatement(`${this.s.namespace.collection}.${operation}(${stringify(args[0])})`));
- return wrapCallback(span, args, 1);
+ return wrapCallbackWithCursorMaybe(span, args, 1);
};
const updateFunc = function(this: any, operation: string, span: any, args: any[]): boolean { // args = [filter, update, options, callback]
@@ -110,19 +108,19 @@ class MongoDBPlugin implements SwPlugin {
if (agentConfig.mongoTraceParameters)
span.tag(Tag.dbMongoParameters(stringify(args[1])));
- return wrapCallback(span, args, 2);
+ return wrapCallbackWithCursorMaybe(span, args, 2);
};
const findOneFunc = function(this: any, operation: string, span: any, args: any[]): boolean { // args = [query, options, callback]
span.tag(Tag.dbStatement(`${this.s.namespace.collection}.${operation}(${typeof args[0] !== 'function' ? stringify(args[0]) : ''})`));
- return wrapCallback(span, args, 0);
+ return wrapCallbackWithCursorMaybe(span, args, 0);
};
const findAndRemoveFunc = function(this: any, operation: string, span: any, args: any[]): boolean { // args = [query, sort, options, callback]
span.tag(Tag.dbStatement(`${this.s.namespace.collection}.${operation}(${stringify(args[0])}${typeof args[1] !== 'function' && args[1] !== undefined ? ', ' + stringify(args[1]) : ''})`));
- return wrapCallback(span, args, 1);
+ return wrapCallbackWithCursorMaybe(span, args, 1);
};
const findAndModifyFunc = function(this: any, operation: string, span: any, args: any[]): boolean { // args = [query, sort, doc, options, callback]
@@ -139,19 +137,19 @@ class MongoDBPlugin implements SwPlugin {
span.tag(Tag.dbStatement(`${this.s.namespace.collection}.${operation}(${params})`));
- return wrapCallback(span, args, 1);
+ return wrapCallbackWithCursorMaybe(span, args, 1);
};
const mapReduceFunc = function(this: any, operation: string, span: any, args: any[]): boolean { // args = [map, reduce, options, callback]
span.tag(Tag.dbStatement(`${this.s.namespace.collection}.${operation}(${args[0]}, ${args[1]})`));
- return wrapCallback(span, args, 2);
+ return wrapCallbackWithCursorMaybe(span, args, 2);
};
const dropFunc = function(this: any, operation: string, span: any, args: any[]): boolean { // args = [options, callback]
span.tag(Tag.dbStatement(`${this.s.namespace.collection}.${operation}()`));
- return wrapCallback(span, args, 0);
+ return wrapCallbackWithCursorMaybe(span, args, 0);
};
this.interceptOperation(Collection, 'insert', insertFunc);
@@ -195,24 +193,28 @@ class MongoDBPlugin implements SwPlugin {
this.interceptOperation(Collection, 'indexExists', deleteFunc);
this.interceptOperation(Collection, 'indexInformation', dropFunc);
this.interceptOperation(Collection, 'listIndexes', dropFunc); // cursor
+ this.interceptOperation(Collection, 'stats', dropFunc);
this.interceptOperation(Collection, 'rename', deleteFunc);
this.interceptOperation(Collection, 'drop', dropFunc);
+ this.interceptOperation(Collection, 'options', dropFunc);
+ this.interceptOperation(Collection, 'isCapped', dropFunc);
+
+ // TODO
+
+ // DB functions
// TODO?
- // stats
- // options
- // isCapped
- // initializeUnorderedBulkOp
- // initializeOrderedBulkOp
- // watch
+ // group
// NODO:
- // group
+ // initializeUnorderedBulkOp
+ // initializeOrderedBulkOp
// parallelCollectionScan
// geoHaystackSearch
+ // watch
}
interceptOperation(Collection: any, operation: string, operationFunc: any): void {
@@ -226,10 +228,13 @@ class MongoDBPlugin implements SwPlugin {
const spans = ContextManager.spans;
let span = spans[spans.length - 1];
- if (span && span.component === Component.MONGODB && span instanceof ExitSpan) // mongodb has called into itself internally
+ // XXX: mongodb calls back into itself at this level in several places, for this reason we just do a normal call
+ // if this is detected instead of opening a new span. This should not affect secondary db calls being recorded
+ // from a cursor since this span is kept async until the cursor is closed, at which point it is stopped.
+
+ if (span?.component === Component.MONGODB) // mongodb has called into itself internally, span instanceof ExitSpan assumed
return _original.apply(this, args);
- let ret: any;
let host: string;
try {
@@ -238,7 +243,9 @@ class MongoDBPlugin implements SwPlugin {
host = '???';
}
- span = ContextManager.current.newExitSpan('MongoDB/' + operation, host, Component.MONGODB).start();
+ span = ContextManager.current.newExitSpan('MongoDB/' + operation, host, Component.MONGODB);
+
+ span.start();
try {
span.component = Component.MONGODB;
@@ -250,45 +257,32 @@ class MongoDBPlugin implements SwPlugin {
const hasCB = operationFunc.call(this, operation, span, args);
- ret = _original.apply(this, args);
+ let ret = _original.apply(this, args);
if (!hasCB) {
- if (plugin.maybeHookCursor(span, ret)) {
+ if (plugin.hookCursorMaybe(span, ret)) {
// NOOP
- } else if (!ret || typeof ret.then !== 'function') { // generic Promise check
- span.stop(); // no callback passed in and no Promise or Cursor returned, play it safe
-
- return ret;
-
- } else {
- ret = ret.then(
- (res: any) => {
- span.stop();
-
- return res;
- },
+ } else if (ret && typeof ret.then === 'function') { // generic Promise check
+ ret = wrapPromise(span, ret);
- (err: any) => {
- span.error(err);
- span.stop();
+ } else { // no callback passed in and no Promise or Cursor returned, play it safe
+ span.stop();
- return Promise.reject(err);
- }
- );
+ return ret;
}
}
+ span.async();
+
+ return ret;
+
} catch (e) {
span.error(e);
span.stop();
throw e;
}
-
- span.async();
-
- return ret;
};
}
}
diff --git a/src/plugins/MySQLPlugin.ts b/src/plugins/MySQLPlugin.ts
index b6f4826..0f67b04 100644
--- a/src/plugins/MySQLPlugin.ts
+++ b/src/plugins/MySQLPlugin.ts
@@ -17,7 +17,7 @@
*
*/
-import SwPlugin from '../core/SwPlugin';
+import SwPlugin, {wrapEmit, wrapCallback} from '../core/SwPlugin';
import ContextManager from '../trace/context/ContextManager';
import { Component } from '../trace/Component';
import Tag from '../Tag';
@@ -34,21 +34,12 @@ class MySQLPlugin implements SwPlugin {
const _query = Connection.prototype.query;
Connection.prototype.query = function(sql: any, values: any, cb: any) {
- const wrapCallback = (_cb: any) => {
- return function(this: any, error: any, results: any, fields: any) {
- if (error)
- span.error(error);
-
- span.stop();
-
- return _cb.call(this, error, results, fields);
- }
- };
-
let query: any;
const host = `${this.config.host}:${this.config.port}`;
- const span = ContextManager.current.newExitSpan('mysql/query', host, Component.MYSQL).start();
+ const span = ContextManager.current.newExitSpan('mysql/query', host, Component.MYSQL);
+
+ span.start();
try {
span.component = Component.MYSQL;
@@ -63,20 +54,20 @@ class MySQLPlugin implements SwPlugin {
let streaming: any;
if (typeof sql === 'function') {
- sql = wrapCallback(sql);
+ sql = wrapCallback(span, sql, 0);
} else if (typeof sql === 'object') {
_sql = sql.sql;
if (typeof values === 'function') {
- values = wrapCallback(values);
+ values = wrapCallback(span, values, 0);
_values = sql.values;
} else if (values !== undefined) {
_values = values;
if (typeof cb === 'function') {
- cb = wrapCallback(cb);
+ cb = wrapCallback(span, cb, 0);
} else {
streaming = true;
}
@@ -89,13 +80,13 @@ class MySQLPlugin implements SwPlugin {
_sql = sql;
if (typeof values === 'function') {
- values = wrapCallback(values);
+ values = wrapCallback(span, values, 0);
} else if (values !== undefined) {
_values = values;
if (typeof cb === 'function') {
- cb = wrapCallback(cb);
+ cb = wrapCallback(span, cb, 0);
} else {
streaming = true;
}
@@ -118,15 +109,8 @@ class MySQLPlugin implements SwPlugin {
query = _query.call(this, sql, values, cb);
- if (streaming) {
- query.on('error', (e: any) => {
- span.error(e);
- });
-
- query.on('end', () => {
- span.stop()
- });
- }
+ if (streaming)
+ wrapEmit(span, query, true, 'end');
} catch (e) {
span.error(e);
diff --git a/src/plugins/PgPlugin.ts b/src/plugins/PgPlugin.ts
index ed6415a..7129def 100644
--- a/src/plugins/PgPlugin.ts
+++ b/src/plugins/PgPlugin.ts
@@ -17,7 +17,7 @@
*
*/
-import SwPlugin from '../core/SwPlugin';
+import SwPlugin, {wrapEmit, wrapCallback, wrapPromise} from '../core/SwPlugin';
import ContextManager from '../trace/context/ContextManager';
import { Component } from '../trace/Component';
import Tag from '../Tag';
@@ -41,21 +41,12 @@ class MySQLPlugin implements SwPlugin {
const _query = Client.prototype.query;
Client.prototype.query = function(config: any, values: any, callback: any) {
- const wrapCallback = (_cb: any) => {
- return function(this: any, err: any, res: any) {
- if (err)
- span.error(err);
-
- span.stop();
-
- return _cb.call(this, err, res);
- }
- };
-
let query: any;
const host = `${this.host}:${this.port}`;
- const span = ContextManager.current.newExitSpan('pg/query', host, Component.POSTGRESQL).start();
+ const span = ContextManager.current.newExitSpan('pg/query', host, Component.POSTGRESQL);
+
+ span.start();
try {
span.component = Component.POSTGRESQL;
@@ -76,16 +67,16 @@ class MySQLPlugin implements SwPlugin {
_values = config.values;
if (typeof config.callback === 'function')
- config.callback = wrapCallback(config.callback);
+ config.callback = wrapCallback(span, config.callback, 0);
}
if (typeof values === 'function')
- values = wrapCallback(values);
+ values = wrapCallback(span, values, 0);
else if (_values !== undefined)
_values = values;
if (typeof callback === 'function')
- callback = wrapCallback(callback);
+ callback = wrapCallback(span, callback, 0);
span.tag(Tag.dbStatement(`${_sql}`));
@@ -101,32 +92,11 @@ class MySQLPlugin implements SwPlugin {
query = _query.call(this, config, values, callback);
if (query) {
- if (Cursor && query instanceof Cursor) {
- query.on('error', (err: any) => {
- span.error(err);
- span.stop();
- });
-
- query.on('end', () => {
- span.stop();
- });
-
- } else if (typeof query.then === 'function') { // generic Promise check
- query = query.then(
- (res: any) => {
- span.stop();
-
- return res;
- },
-
- (err: any) => {
- span.error(err);
- span.stop();
-
- return Promise.reject(err);
- }
- );
- } // else we assume there was a callback
+ if (Cursor && query instanceof Cursor)
+ wrapEmit(span, query, true, 'end');
+ else if (typeof query.then === 'function') // generic Promise check
+ query = wrapPromise(span, query);
+ // else we assume there was a callback
}
} catch (e) {
diff --git a/src/trace/context/SpanContext.ts b/src/trace/context/SpanContext.ts
index 0421d53..8c39072 100644
--- a/src/trace/context/SpanContext.ts
+++ b/src/trace/context/SpanContext.ts
@@ -224,7 +224,7 @@ export default class SpanContext implements Context {
nSpans: this.nSpans,
});
- if (!ContextManager.hasContext) {
+ if (!ContextManager.hasContext || !ContextManager.spans.length) {
ContextManager.restore(span.context, [span]);
} else if (ContextManager.spans.every((s) => s.id !== span.id)) {
ContextManager.spans.push(span);
diff --git a/src/trace/span/Span.ts b/src/trace/span/Span.ts
index a03808c..e6a1b56 100644
--- a/src/trace/span/Span.ts
+++ b/src/trace/span/Span.ts
@@ -72,25 +72,21 @@ export default abstract class Span {
if (options.component) this.component = options.component;
}
- start(): this {
+ start(): void {
this.startTime = new Date().getTime();
this.context.start(this);
- return this;
}
- stop(): this {
+ stop(): void {
this.context.stop(this);
- return this;
}
- async(): this {
+ async(): void {
this.context.async(this);
- return this;
}
- resync(): this {
+ resync(): void {
this.context.resync(this);
- return this;
}
finish(segment: Segment): boolean {
@@ -144,7 +140,7 @@ export default abstract class Span {
error(error: Error): this {
this.errored = true;
- this.log('Stack', error.stack || '')
+ this.log('Stack', error?.stack || '');
return this;
}
diff --git a/src/trace/span/StackedSpan.ts b/src/trace/span/StackedSpan.ts
index 3b1fc74..77e3243 100644
--- a/src/trace/span/StackedSpan.ts
+++ b/src/trace/span/StackedSpan.ts
@@ -27,17 +27,15 @@ export default class StackedSpan extends Span {
super(options);
}
- start(): this {
+ start(): void {
if (++this.depth === 1) {
super.start();
}
- return this;
}
- stop(): this {
+ stop(): void {
if (--this.depth === 0) {
super.stop();
}
- return this;
}
}
diff --git a/tests/plugins/axios/expected.data.yaml b/tests/plugins/axios/expected.data.yaml
index 4f0ac6e..c50deae 100644
--- a/tests/plugins/axios/expected.data.yaml
+++ b/tests/plugins/axios/expected.data.yaml
@@ -21,6 +21,26 @@ segmentItems:
segments:
- segmentId: not null
spans:
+ - operationName: /json
+ operationId: 0
+ parentSpanId: 0
+ spanId: 1
+ spanLayer: Http
+ startTime: gt 0
+ endTime: gt 0
+ componentId: 4005
+ spanType: Exit
+ peer: httpbin.org
+ skipAnalysis: false
+ tags:
+ - key: http.url
+ value: http://httpbin.org/json
+ - key: http.method
+ value: GET
+ - key: http.status.code
+ value: '200'
+ - key: http.status.msg
+ value: OK
- operationName: /axios
operationId: 0
parentSpanId: -1
@@ -50,26 +70,6 @@ segmentItems:
parentServiceInstance: not null
parentService: client
traceId: not null
- - operationName: /json
- operationId: 0
- parentSpanId: 0
- spanId: 1
- spanLayer: Http
- startTime: gt 0
- endTime: gt 0
- componentId: 4005
- spanType: Exit
- peer: httpbin.org
- skipAnalysis: false
- tags:
- - key: http.url
- value: http://httpbin.org/json
- - key: http.method
- value: GET
- - key: http.status.code
- value: '200'
- - key: http.status.msg
- value: OK
- serviceName: client
segmentSize: 1
segments:
@@ -77,12 +77,12 @@ segmentItems:
spans:
- operationName: /axios
operationId: 0
- parentSpanId: -1
- spanId: 0
+ parentSpanId: 0
+ spanId: 1
spanLayer: Http
tags:
- key: http.url
- value: http://localhost:5001/axios
+ value: http://server:5000/axios
- key: http.method
value: GET
- key: http.status.code
@@ -91,18 +91,18 @@ segmentItems:
value: OK
startTime: gt 0
endTime: gt 0
- componentId: 49
- spanType: Entry
- peer: not null
+ componentId: 4005
+ spanType: Exit
+ peer: server:5000
skipAnalysis: false
- operationName: /axios
operationId: 0
- parentSpanId: 0
- spanId: 1
+ parentSpanId: -1
+ spanId: 0
spanLayer: Http
tags:
- key: http.url
- value: http://server:5000/axios
+ value: http://localhost:5001/axios
- key: http.method
value: GET
- key: http.status.code
@@ -111,7 +111,7 @@ segmentItems:
value: OK
startTime: gt 0
endTime: gt 0
- componentId: 4005
- spanType: Exit
- peer: server:5000
+ componentId: 49
+ spanType: Entry
+ peer: not null
skipAnalysis: false
diff --git a/tests/plugins/express/expected.data.yaml b/tests/plugins/express/expected.data.yaml
index 118eda0..1595e5d 100644
--- a/tests/plugins/express/expected.data.yaml
+++ b/tests/plugins/express/expected.data.yaml
@@ -27,10 +27,10 @@ segmentItems:
spanId: 0
spanLayer: Http
tags:
- - key: http.method
- value: GET
- key: http.url
value: http://server:5000/express
+ - key: http.method
+ value: GET
- key: http.status.code
value: '200'
- key: http.status.msg
@@ -81,10 +81,10 @@ segmentItems:
spanId: 0
spanLayer: Http
tags:
- - key: http.method
- value: GET
- key: http.url
value: http://localhost:5001/express
+ - key: http.method
+ value: GET
- key: http.status.code
value: '200'
- key: http.status.msg