You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by hu...@apache.org on 2019/11/14 02:14:51 UTC
[dubbo-js] branch master updated: 闲时心跳
This is an automated email from the ASF dual-hosted git repository.
hufeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/dubbo-js.git
The following commit(s) were added to refs/heads/master by this push:
new 08988ba 闲时心跳
new 1a494c1 Merge pull request #156 from sunchuanleihit/master
08988ba is described below
commit 08988ba831aea37fc13ea47c63a6c848a693450d
Author: sunchuanleihit <su...@163.com>
AuthorDate: Sat Nov 9 14:07:05 2019 +0800
闲时心跳
---
packages/dubbo/src/registry/zookeeper.ts | 3 +++
packages/dubbo/src/socket-worker.ts | 32 +++++++++++++++++++++++---------
2 files changed, 26 insertions(+), 9 deletions(-)
diff --git a/packages/dubbo/src/registry/zookeeper.ts b/packages/dubbo/src/registry/zookeeper.ts
index 53fdc90..18b0acf 100644
--- a/packages/dubbo/src/registry/zookeeper.ts
+++ b/packages/dubbo/src/registry/zookeeper.ts
@@ -183,6 +183,9 @@ export class ZkRegistry extends Registry<IZkClientProps & IDubboRegistryProps> {
const {url: register, zkAuthInfo} = this._props;
//debug log
log(`connecting zkserver ${register}`);
+ if (this._client) {
+ this._client.removeAllListeners();
+ }
//connect
this._client = zookeeper.createClient(register, {
retries: 10,
diff --git a/packages/dubbo/src/socket-worker.ts b/packages/dubbo/src/socket-worker.ts
index d1a9400..9fa1cf9 100644
--- a/packages/dubbo/src/socket-worker.ts
+++ b/packages/dubbo/src/socket-worker.ts
@@ -52,7 +52,6 @@ export default class SocketWorker implements IObservable<ISocketSubscriber> {
this.host = host;
this.port = port;
this._retry = RETRY_NUM;
- this._retryHeartBeat = RETRY_HEARD_BEAT_TIME;
this._status = SOCKET_STATUS.PADDING;
log('new SocketWorker#%d|> %s %s', pid, host + ':' + port, this._status);
@@ -80,12 +79,13 @@ export default class SocketWorker implements IObservable<ISocketSubscriber> {
private _retry: number;
private _retryTimeoutId: NodeJS.Timer;
- private _retryHeartBeat: number;
private _heartBeatTimer: NodeJS.Timer;
private _socket: net.Socket;
private _status: SOCKET_STATUS;
private _decodeBuff: DecodeBuffer;
private _subscriber: ISocketSubscriber;
+ private _lastReadTimestamp: number = -1;
+ private _lastWriteTimestamp: number = -1;
//==================================public method==========================
@@ -110,6 +110,7 @@ export default class SocketWorker implements IObservable<ISocketSubscriber> {
//when current worker close, fail dubbo request
ctx.pid = this.pid;
const encoder = new DubboEncoder(ctx);
+ this.setWriteTimestamp();
this._socket.write(encoder.encode());
}
@@ -188,7 +189,8 @@ export default class SocketWorker implements IObservable<ISocketSubscriber> {
//reset retry number
this._retry = RETRY_NUM;
- this._retryHeartBeat = RETRY_HEARD_BEAT_TIME;
+ this.setReadTimestamp();
+ this.setWriteTimestamp();
//notifiy subscriber, the socketworker was connected successfully
this._subscriber.onConnect({
@@ -200,18 +202,20 @@ export default class SocketWorker implements IObservable<ISocketSubscriber> {
//heartbeart
//when network is close, the connection maybe not close, so check the heart beat times
this._heartBeatTimer = setInterval(() => {
- if (this._retryHeartBeat > 0) {
- log('emit heartbeat');
- this._retryHeartBeat--;
- this._socket.write(HeartBeat.encode());
- } else {
+ const now = Date.now(); // sampled once so both idle checks below compare against the same instant
+ if (now - this._lastReadTimestamp > HEART_BEAT * RETRY_HEARD_BEAT_TIME) { // nothing read for RETRY_HEARD_BEAT_TIME heartbeat periods: close the worker
this._onClose(false);
+ } else if ((now - this._lastWriteTimestamp > HEART_BEAT) || (now - this._lastReadTimestamp > HEART_BEAT)){ // idle in either direction for more than one period: send a heartbeat
+ log(`SocketWorker#${this.pid} emit heartbeat`); // template literal so the pid is actually interpolated
+ this.setWriteTimestamp();
+ this._socket.write(HeartBeat.encode());
}
}, HEART_BEAT);
};
private _onData = data => {
log(`SocketWorker#${this.pid} =receive data=> ${this.host}:${this.port}`);
+ this.setReadTimestamp(); // any inbound data refreshes the read timestamp that the heartbeat timer compares against
this._decodeBuff.receive(data);
};
@@ -249,6 +253,8 @@ export default class SocketWorker implements IObservable<ISocketSubscriber> {
//clear buffer
this._decodeBuff.clearBuffer();
clearInterval(this._heartBeatTimer);
+ this._lastReadTimestamp = -1;
+ this._lastWriteTimestamp = -1;
if (this._retry > 0) {
//set current status
@@ -274,11 +280,19 @@ export default class SocketWorker implements IObservable<ISocketSubscriber> {
private _onSubscribeDecodeBuff = (data: Buffer) => {
if (HeartBeat.isHeartBeat(data)) {
log(`SocketWorker#${this.pid} <=receive= heartbeat data.`);
- this._retryHeartBeat = RETRY_HEARD_BEAT_TIME; //reset heart beat times
} else {
const json = decode(data);
log(`SocketWorker#${this.pid} <=received=> dubbo result %O`, json);
this._subscriber.onData(json);
}
};
+
+ private setReadTimestamp() {
+ this._lastReadTimestamp = Date.now(); // wall-clock ms of the latest inbound data or successful connect
+ }
+
+ private setWriteTimestamp() {
+ this._lastWriteTimestamp = Date.now(); // wall-clock ms of the latest outbound write; must not touch the read timestamp
+ }
+
}