You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ke...@apache.org on 2020/07/26 03:51:27 UTC
[skywalking-nodejs] 02/23: Fix a bug and refactor some codes
This is an automated email from the ASF dual-hosted git repository.
kezhenxu94 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking-nodejs.git
commit cc2415120e42c1802f146a43edf74c0e1629bed2
Author: kezhenxu94 <ke...@163.com>
AuthorDate: Sat Jun 13 23:16:50 2020 +0800
Fix a bug and refactor some codes
---
package.json | 6 +-
src/agent/Buffer.ts | 16 +++-
src/agent/protocol/Protocol.ts | 3 +
src/agent/protocol/grpc/GrpcProtocol.ts | 90 ++--------------------
src/agent/protocol/grpc/SegmentObjectAdapter.ts | 3 +
.../agent/protocol/grpc/clients/Client.ts | 15 +---
.../HeartbeatClient.ts} | 65 +++-------------
.../TraceReportClient.ts} | 64 +++------------
src/config/AgentConfig.ts | 2 +
src/trace/context/ContextManager.ts | 5 ++
src/trace/context/SpanContext.ts | 5 +-
src/trace/span/LocalSpan.ts | 5 +-
src/trace/span/Span.ts | 6 ++
src/trace/span/StackedSpan.ts | 2 +-
14 files changed, 75 insertions(+), 212 deletions(-)
diff --git a/package.json b/package.json
index 0fcb9a0..43c164f 100644
--- a/package.json
+++ b/package.json
@@ -1,7 +1,7 @@
{
"name": "skywalking-nodejs",
"version": "1.0.0",
- "description": "",
+ "description": "The NodeJS agent for Apache SkyWalking",
"main": "lib/index.js",
"typings": "lib/index.d.ts",
"scripts": {
@@ -19,8 +19,8 @@
"lib/**/*"
],
"keywords": [],
- "author": "",
- "license": "ISC",
+ "author": "kezhenxu94",
+ "license": "Apache 2.0",
"devDependencies": {
"@types/google-protobuf": "^3.7.2",
"@types/node": "^14.0.11",
diff --git a/src/agent/Buffer.ts b/src/agent/Buffer.ts
index 755848a..63c28da 100644
--- a/src/agent/Buffer.ts
+++ b/src/agent/Buffer.ts
@@ -19,12 +19,18 @@
import { createLogger } from '@/logging';
import Segment from '@/trace/context/Segment';
+import config from '@/config/AgentConfig';
const logger = createLogger('Buffer');
class Buffer {
- maxSize = 1000;
- buffer: Segment[] = [];
+ maxSize: number;
+ buffer: Segment[];
+
+ constructor(maxSize: number = 1000) {
+ this.maxSize = maxSize;
+ this.buffer = [];
+ }
get length(): number {
return this.buffer.length;
@@ -41,4 +47,8 @@ class Buffer {
}
}
-export default new Buffer();
+export default new Buffer(
+ Number.isSafeInteger(config.maxBufferSize)
+ ? Number.parseInt(config.maxBufferSize, 10)
+ : 1000,
+);
diff --git a/src/agent/protocol/Protocol.ts b/src/agent/protocol/Protocol.ts
index 45085d4..7f87c6d 100644
--- a/src/agent/protocol/Protocol.ts
+++ b/src/agent/protocol/Protocol.ts
@@ -17,6 +17,9 @@
*
*/
+/**
+ * The transport protocol between the agent and the backend (OAP).
+ */
export default interface Protocol {
isConnected: boolean;
diff --git a/src/agent/protocol/grpc/GrpcProtocol.ts b/src/agent/protocol/grpc/GrpcProtocol.ts
index 6e04232..1597491 100644
--- a/src/agent/protocol/grpc/GrpcProtocol.ts
+++ b/src/agent/protocol/grpc/GrpcProtocol.ts
@@ -18,101 +18,21 @@
*/
import Protocol from '@/agent/protocol/Protocol';
-import * as grpc from 'grpc';
-import { connectivityState } from 'grpc';
-import config from '@/config/AgentConfig';
-import { ManagementServiceClient } from '@/proto/management/Management_grpc_pb';
-import { InstancePingPkg } from '@/proto/management/Management_pb';
-import { createLogger } from '@/logging';
-import AuthInterceptor from '@/agent/protocol/grpc/AuthInterceptor';
-import { TraceSegmentReportServiceClient } from '@/proto/language-agent/Tracing_grpc_pb';
-import buffer from '@/agent/Buffer';
-import SegmentObjectAdapter from '@/agent/protocol/grpc/SegmentObjectAdapter';
-
-const logger = createLogger('GrpcProtocol');
+import HeartbeatClient from '@/agent/protocol/grpc/clients/HeartbeatClient';
+import TraceReportClient from '@/agent/protocol/grpc/clients/TraceReportClient';
export default class GrpcProtocol implements Protocol {
- heartbeatClient: ManagementServiceClient;
- heartbeatTimer?: NodeJS.Timeout;
-
- reporterClient: TraceSegmentReportServiceClient;
- reportTimer?: NodeJS.Timeout;
-
- constructor() {
- this.heartbeatClient = new ManagementServiceClient(
- config.collectorAddress,
- grpc.credentials.createInsecure(),
- { interceptors: [AuthInterceptor] },
- );
-
- this.reporterClient = new TraceSegmentReportServiceClient(
- config.collectorAddress,
- grpc.credentials.createInsecure(),
- { interceptors: [AuthInterceptor] },
- );
- }
get isConnected(): boolean {
- return (this.heartbeatClient.getChannel().getConnectivityState(true) === connectivityState.READY)
- && (this.reporterClient.getChannel().getConnectivityState(true) === connectivityState.READY);
+ return HeartbeatClient.isConnected && TraceReportClient.isConnected;
}
heartbeat() {
- if (this.heartbeatTimer) {
- logger.warn(`
- The heartbeat timer has already been scheduled,
- this may be a potential bug, please consider reporting
- this to https://github.com/apache/skywalking/issues/new
- `);
- return;
- }
-
- const keepAlivePkg = new InstancePingPkg()
- .setService(config.serviceName)
- .setServiceinstance(config.serviceInstance);
-
- this.heartbeatTimer = setInterval(() => {
- this.heartbeatClient.keepAlive(
- keepAlivePkg,
-
- (error, _) => {
- if (error) {
- logger.error('Failed to send heartbeat', error);
- }
- },
- );
- }, 3000,
- ).unref();
+ HeartbeatClient.start();
}
report() {
- const reportFunction = () => {
- try {
- if (buffer.length === 0) {
- return;
- }
-
- const stream = this.reporterClient.collect((error, _) => {
- if (error) {
- logger.error('Failed to report trace data', error);
- }
- });
-
- while (buffer.buffer.length > 0) {
- const segment = buffer.buffer.pop();
- if (segment) {
- logger.info('Sending segment', { segment });
- stream.write(new SegmentObjectAdapter(segment));
- }
- }
-
- stream.end();
- } finally {
- this.reportTimer = setTimeout(reportFunction, 1000).unref();
- }
- };
-
- this.reportTimer = setTimeout(reportFunction, 1000).unref();
+ TraceReportClient.start();
}
};
diff --git a/src/agent/protocol/grpc/SegmentObjectAdapter.ts b/src/agent/protocol/grpc/SegmentObjectAdapter.ts
index e6c336b..b5cb16c 100644
--- a/src/agent/protocol/grpc/SegmentObjectAdapter.ts
+++ b/src/agent/protocol/grpc/SegmentObjectAdapter.ts
@@ -22,6 +22,9 @@ import config from '@/config/AgentConfig';
import { KeyStringValuePair } from '@/proto/common/Common_pb';
import Segment from '@/trace/context/Segment';
+/**
+ * An adapter that adapts {@link Segment} objects to gRPC object {@link SegmentObject}.
+ */
export default class SegmentObjectAdapter extends SegmentObject {
constructor(segment: Segment) {
super();
diff --git a/typings/environment.d.ts b/src/agent/protocol/grpc/clients/Client.ts
similarity index 79%
rename from typings/environment.d.ts
rename to src/agent/protocol/grpc/clients/Client.ts
index 7c8f1fd..bbd4361 100644
--- a/typings/environment.d.ts
+++ b/src/agent/protocol/grpc/clients/Client.ts
@@ -17,15 +17,8 @@
*
*/
-declare global {
- namespace NodeJS {
- interface ProcessEnv {
- AUTHORIZATION?: string;
- COLLECTOR_ADDRESS?: string;
- SERVICE_INSTANCE?: string;
- SERVICE_NAME?: string;
- }
- }
-}
+export default interface Client {
+ readonly isConnected: boolean;
-export {};
+ start(): void;
+}
diff --git a/src/agent/protocol/grpc/GrpcProtocol.ts b/src/agent/protocol/grpc/clients/HeartbeatClient.ts
similarity index 57%
copy from src/agent/protocol/grpc/GrpcProtocol.ts
copy to src/agent/protocol/grpc/clients/HeartbeatClient.ts
index 6e04232..b1d21d1 100644
--- a/src/agent/protocol/grpc/GrpcProtocol.ts
+++ b/src/agent/protocol/grpc/clients/HeartbeatClient.ts
@@ -17,47 +17,35 @@
*
*/
-import Protocol from '@/agent/protocol/Protocol';
+
+import { ManagementServiceClient } from '@/proto/management/Management_grpc_pb';
import * as grpc from 'grpc';
import { connectivityState } from 'grpc';
-import config from '@/config/AgentConfig';
-import { ManagementServiceClient } from '@/proto/management/Management_grpc_pb';
-import { InstancePingPkg } from '@/proto/management/Management_pb';
import { createLogger } from '@/logging';
import AuthInterceptor from '@/agent/protocol/grpc/AuthInterceptor';
-import { TraceSegmentReportServiceClient } from '@/proto/language-agent/Tracing_grpc_pb';
-import buffer from '@/agent/Buffer';
-import SegmentObjectAdapter from '@/agent/protocol/grpc/SegmentObjectAdapter';
+import { InstancePingPkg } from '@/proto/management/Management_pb';
+import config from '@/config/AgentConfig';
+import Client from '@/agent/protocol/grpc/clients/Client';
-const logger = createLogger('GrpcProtocol');
+const logger = createLogger('HeartbeatTask');
-export default class GrpcProtocol implements Protocol {
+class HeartbeatClient implements Client {
heartbeatClient: ManagementServiceClient;
heartbeatTimer?: NodeJS.Timeout;
- reporterClient: TraceSegmentReportServiceClient;
- reportTimer?: NodeJS.Timeout;
-
constructor() {
this.heartbeatClient = new ManagementServiceClient(
config.collectorAddress,
grpc.credentials.createInsecure(),
{ interceptors: [AuthInterceptor] },
);
-
- this.reporterClient = new TraceSegmentReportServiceClient(
- config.collectorAddress,
- grpc.credentials.createInsecure(),
- { interceptors: [AuthInterceptor] },
- );
}
get isConnected(): boolean {
- return (this.heartbeatClient.getChannel().getConnectivityState(true) === connectivityState.READY)
- && (this.reporterClient.getChannel().getConnectivityState(true) === connectivityState.READY);
+ return this.heartbeatClient.getChannel().getConnectivityState(true) === connectivityState.READY;
}
- heartbeat() {
+ start() {
if (this.heartbeatTimer) {
logger.warn(`
The heartbeat timer has already been scheduled,
@@ -81,38 +69,9 @@ export default class GrpcProtocol implements Protocol {
}
},
);
- }, 3000,
+ }, 20000,
).unref();
}
+}
- report() {
- const reportFunction = () => {
- try {
- if (buffer.length === 0) {
- return;
- }
-
- const stream = this.reporterClient.collect((error, _) => {
- if (error) {
- logger.error('Failed to report trace data', error);
- }
- });
-
- while (buffer.buffer.length > 0) {
- const segment = buffer.buffer.pop();
- if (segment) {
- logger.info('Sending segment', { segment });
- stream.write(new SegmentObjectAdapter(segment));
- }
- }
-
- stream.end();
- } finally {
- this.reportTimer = setTimeout(reportFunction, 1000).unref();
- }
- };
-
- this.reportTimer = setTimeout(reportFunction, 1000).unref();
- }
-
-};
+export default new HeartbeatClient();
diff --git a/src/agent/protocol/grpc/GrpcProtocol.ts b/src/agent/protocol/grpc/clients/TraceReportClient.ts
similarity index 56%
copy from src/agent/protocol/grpc/GrpcProtocol.ts
copy to src/agent/protocol/grpc/clients/TraceReportClient.ts
index 6e04232..0d70b83 100644
--- a/src/agent/protocol/grpc/GrpcProtocol.ts
+++ b/src/agent/protocol/grpc/clients/TraceReportClient.ts
@@ -17,34 +17,22 @@
*
*/
-import Protocol from '@/agent/protocol/Protocol';
+import { TraceSegmentReportServiceClient } from '@/proto/language-agent/Tracing_grpc_pb';
+import config from '@/config/AgentConfig';
import * as grpc from 'grpc';
import { connectivityState } from 'grpc';
-import config from '@/config/AgentConfig';
-import { ManagementServiceClient } from '@/proto/management/Management_grpc_pb';
-import { InstancePingPkg } from '@/proto/management/Management_pb';
-import { createLogger } from '@/logging';
import AuthInterceptor from '@/agent/protocol/grpc/AuthInterceptor';
-import { TraceSegmentReportServiceClient } from '@/proto/language-agent/Tracing_grpc_pb';
import buffer from '@/agent/Buffer';
import SegmentObjectAdapter from '@/agent/protocol/grpc/SegmentObjectAdapter';
+import { createLogger } from '@/logging';
+import Client from '@/agent/protocol/grpc/clients/Client';
-const logger = createLogger('GrpcProtocol');
-
-export default class GrpcProtocol implements Protocol {
- heartbeatClient: ManagementServiceClient;
- heartbeatTimer?: NodeJS.Timeout;
+const logger = createLogger('TraceReportClient');
+class TraceReportClient implements Client {
reporterClient: TraceSegmentReportServiceClient;
- reportTimer?: NodeJS.Timeout;
constructor() {
- this.heartbeatClient = new ManagementServiceClient(
- config.collectorAddress,
- grpc.credentials.createInsecure(),
- { interceptors: [AuthInterceptor] },
- );
-
this.reporterClient = new TraceSegmentReportServiceClient(
config.collectorAddress,
grpc.credentials.createInsecure(),
@@ -53,39 +41,10 @@ export default class GrpcProtocol implements Protocol {
}
get isConnected(): boolean {
- return (this.heartbeatClient.getChannel().getConnectivityState(true) === connectivityState.READY)
- && (this.reporterClient.getChannel().getConnectivityState(true) === connectivityState.READY);
- }
-
- heartbeat() {
- if (this.heartbeatTimer) {
- logger.warn(`
- The heartbeat timer has already been scheduled,
- this may be a potential bug, please consider reporting
- this to https://github.com/apache/skywalking/issues/new
- `);
- return;
- }
-
- const keepAlivePkg = new InstancePingPkg()
- .setService(config.serviceName)
- .setServiceinstance(config.serviceInstance);
-
- this.heartbeatTimer = setInterval(() => {
- this.heartbeatClient.keepAlive(
- keepAlivePkg,
-
- (error, _) => {
- if (error) {
- logger.error('Failed to send heartbeat', error);
- }
- },
- );
- }, 3000,
- ).unref();
+ return this.reporterClient.getChannel().getConnectivityState(true) === connectivityState.READY;
}
- report() {
+ start() {
const reportFunction = () => {
try {
if (buffer.length === 0) {
@@ -108,11 +67,12 @@ export default class GrpcProtocol implements Protocol {
stream.end();
} finally {
- this.reportTimer = setTimeout(reportFunction, 1000).unref();
+ setTimeout(reportFunction, 1000).unref();
}
};
- this.reportTimer = setTimeout(reportFunction, 1000).unref();
+ setTimeout(reportFunction, 1000).unref();
}
+}
-};
+export default new TraceReportClient();
diff --git a/src/config/AgentConfig.ts b/src/config/AgentConfig.ts
index 6270e1b..56f010b 100644
--- a/src/config/AgentConfig.ts
+++ b/src/config/AgentConfig.ts
@@ -22,6 +22,7 @@ export type AgentConfig = {
serviceInstance?: string;
collectorAddress?: string;
authorization?: string;
+ maxBufferSize?: number;
}
export default {
@@ -29,4 +30,5 @@ export default {
serviceInstance: process.env.SERVICE_INSTANCE || 'your-node-js-instance',
collectorAddress: process.env.COLLECTOR_ADDRESS || '127.0.0.1:11800',
authorization: process.env.AUTHORIZATION,
+ maxBufferSize: process.env.MAX_BUFFER_SIZE || '1000',
};
diff --git a/src/trace/context/ContextManager.ts b/src/trace/context/ContextManager.ts
index 77fc7f6..2b5ce35 100644
--- a/src/trace/context/ContextManager.ts
+++ b/src/trace/context/ContextManager.ts
@@ -28,8 +28,13 @@ class ContextManager {
constructor() {
this.scopeContext = new Map<number, Context>();
+ this.scopeContext.set(1, new SpanContext());
+
this.hooks = createHook({
init: (asyncId: number, type: string, triggerAsyncId: number, resource: object) => {
+ if (type === 'TIMERWRAP') {
+ return;
+ }
const context = this.scopeContext.get(triggerAsyncId) || new SpanContext();
this.scopeContext.set(asyncId, context);
},
diff --git a/src/trace/context/SpanContext.ts b/src/trace/context/SpanContext.ts
index f4addeb..3f92da3 100644
--- a/src/trace/context/SpanContext.ts
+++ b/src/trace/context/SpanContext.ts
@@ -27,6 +27,7 @@ import LocalSpan from '@/trace/span/LocalSpan';
import * as assert from 'assert';
import buffer from '@/agent/Buffer';
import { createLogger } from '@/logging';
+import { executionAsyncId } from 'async_hooks';
const logger = createLogger('SpanContext');
@@ -47,7 +48,7 @@ export default class SpanContext implements Context {
}
newEntrySpan(operation: string, carrier?: ContextCarrier): Span {
- logger.debug('Creating entry span', { parentId: this.parentId });
+ logger.debug('Creating entry span', { parentId: this.parentId, executionAsyncId: executionAsyncId() });
return new EntrySpan({
id: this.spanId++,
parentId: this.parentId,
@@ -57,6 +58,7 @@ export default class SpanContext implements Context {
}
newExitSpan(operation: string, peer: string, carrier?: ContextCarrier): Span {
+ logger.debug('Creating exit span', { parentId: this.parentId, executionAsyncId: executionAsyncId() });
return new ExitSpan({
id: this.spanId++,
parentId: this.parentId,
@@ -66,6 +68,7 @@ export default class SpanContext implements Context {
}
newLocalSpan(operation: string): Span {
+ logger.debug('Creating local span', { parentId: this.parentId, executionAsyncId: executionAsyncId() });
return new LocalSpan({
id: this.spanId++,
parentId: this.parentId,
diff --git a/src/trace/span/LocalSpan.ts b/src/trace/span/LocalSpan.ts
index 85969dc..c20e4da 100644
--- a/src/trace/span/LocalSpan.ts
+++ b/src/trace/span/LocalSpan.ts
@@ -17,11 +17,10 @@
*
*/
-import StackedSpan from '@/trace/span/StackedSpan';
-import { SpanCtorOptions } from '@/trace/span/Span';
+import Span, { SpanCtorOptions } from '@/trace/span/Span';
import { SpanType } from '@/proto/language-agent/Tracing_pb';
-export default class LocalSpan extends StackedSpan {
+export default class LocalSpan extends Span {
constructor(options: SpanCtorOptions) {
super(Object.assign(options, {
type: SpanType.LOCAL,
diff --git a/src/trace/span/Span.ts b/src/trace/span/Span.ts
index bb690c8..388e194 100644
--- a/src/trace/span/Span.ts
+++ b/src/trace/span/Span.ts
@@ -26,6 +26,7 @@ import { ContextCarrier } from '@/trace/context/Carrier';
import ID from '@/trace/ID';
import SegmentRef from '@/trace/context/SegmentRef';
import { SpanLayer, SpanType } from '@/proto/language-agent/Tracing_pb';
+import { createLogger } from '@/logging';
export type SpanCtorOptions = {
context: Context;
@@ -37,6 +38,8 @@ export type SpanCtorOptions = {
component?: Component;
};
+const logger = createLogger('Span');
+
export default abstract class Span {
readonly context: Context;
readonly type: SpanType;
@@ -69,17 +72,20 @@ export default abstract class Span {
}
start(): this {
+ logger.debug('Starting span', this);
this.startTime = new Date().getTime();
this.context.start(this);
return this;
}
stop(): this {
+ logger.debug('Stopping span', this);
this.context.stop(this);
return this;
}
finish(segment: Segment): boolean {
+ logger.debug('Finishing span', this);
this.endTime = new Date().getTime();
segment.archive(this);
return true;
diff --git a/src/trace/span/StackedSpan.ts b/src/trace/span/StackedSpan.ts
index 196c7c1..357350b 100644
--- a/src/trace/span/StackedSpan.ts
+++ b/src/trace/span/StackedSpan.ts
@@ -32,7 +32,7 @@ export default class StackedSpan extends Span {
}
finish(segment: Segment): boolean {
- logger.debug('Finishing', { depth: this.depth });
+ logger.debug('Finishing span', this);
return --this.depth === 0 && super.finish(segment);
}
}