You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by hu...@apache.org on 2019/11/14 02:14:51 UTC

[dubbo-js] branch master updated: 闲时心跳

This is an automated email from the ASF dual-hosted git repository.

hufeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/dubbo-js.git


The following commit(s) were added to refs/heads/master by this push:
     new 08988ba  闲时心跳
     new 1a494c1  Merge pull request #156 from sunchuanleihit/master
08988ba is described below

commit 08988ba831aea37fc13ea47c63a6c848a693450d
Author: sunchuanleihit <su...@163.com>
AuthorDate: Sat Nov 9 14:07:05 2019 +0800

    闲时心跳
---
 packages/dubbo/src/registry/zookeeper.ts |  3 +++
 packages/dubbo/src/socket-worker.ts      | 32 +++++++++++++++++++++++---------
 2 files changed, 26 insertions(+), 9 deletions(-)

diff --git a/packages/dubbo/src/registry/zookeeper.ts b/packages/dubbo/src/registry/zookeeper.ts
index 53fdc90..18b0acf 100644
--- a/packages/dubbo/src/registry/zookeeper.ts
+++ b/packages/dubbo/src/registry/zookeeper.ts
@@ -183,6 +183,9 @@ export class ZkRegistry extends Registry<IZkClientProps & IDubboRegistryProps> {
     const {url: register, zkAuthInfo} = this._props;
     //debug log
     log(`connecting zkserver ${register}`);
+    if (this._client) {
+      this._client.removeAllListeners();
+    }
     //connect
     this._client = zookeeper.createClient(register, {
       retries: 10,
diff --git a/packages/dubbo/src/socket-worker.ts b/packages/dubbo/src/socket-worker.ts
index d1a9400..9fa1cf9 100644
--- a/packages/dubbo/src/socket-worker.ts
+++ b/packages/dubbo/src/socket-worker.ts
@@ -52,7 +52,6 @@ export default class SocketWorker implements IObservable<ISocketSubscriber> {
     this.host = host;
     this.port = port;
     this._retry = RETRY_NUM;
-    this._retryHeartBeat = RETRY_HEARD_BEAT_TIME;
     this._status = SOCKET_STATUS.PADDING;
 
     log('new SocketWorker#%d|> %s %s', pid, host + ':' + port, this._status);
@@ -80,12 +79,13 @@ export default class SocketWorker implements IObservable<ISocketSubscriber> {
 
   private _retry: number;
   private _retryTimeoutId: NodeJS.Timer;
-  private _retryHeartBeat: number;
   private _heartBeatTimer: NodeJS.Timer;
   private _socket: net.Socket;
   private _status: SOCKET_STATUS;
   private _decodeBuff: DecodeBuffer;
   private _subscriber: ISocketSubscriber;
+  private _lastReadTimestamp: number = -1;
+  private _lastWriteTimestamp: number = -1;
 
   //==================================public method==========================
 
@@ -110,6 +110,7 @@ export default class SocketWorker implements IObservable<ISocketSubscriber> {
     //when current worker close, fail dubbo request
     ctx.pid = this.pid;
     const encoder = new DubboEncoder(ctx);
+    this.setWriteTimestamp();
     this._socket.write(encoder.encode());
   }
 
@@ -188,7 +189,8 @@ export default class SocketWorker implements IObservable<ISocketSubscriber> {
 
     //reset retry number
     this._retry = RETRY_NUM;
-    this._retryHeartBeat = RETRY_HEARD_BEAT_TIME;
+    this.setReadTimestamp();
+    this.setWriteTimestamp();
 
     //notifiy subscriber, the socketworker was connected successfully
     this._subscriber.onConnect({
@@ -200,18 +202,20 @@ export default class SocketWorker implements IObservable<ISocketSubscriber> {
     //heartbeart
     //when network is close, the connection maybe not close, so check the heart beat times
     this._heartBeatTimer = setInterval(() => {
-      if (this._retryHeartBeat > 0) {
-        log('emit heartbeat');
-        this._retryHeartBeat--;
-        this._socket.write(HeartBeat.encode());
-      } else {
+      const now = Date.now(); // one clock sample shared by all idle checks in this tick
+      if (now - this._lastReadTimestamp > HEART_BEAT * RETRY_HEARD_BEAT_TIME) { // nothing read for RETRY_HEARD_BEAT_TIME heartbeat periods: hand over to _onClose
         this._onClose(false);
+      } else if ((now - this._lastWriteTimestamp > HEART_BEAT) ||  (now - this._lastReadTimestamp > HEART_BEAT)){ // idle in either direction for more than one period: send a heartbeat
+        log(`SocketWorker#${this.pid} emit heartbeat`); // template literal so the worker pid is actually interpolated
+        this.setWriteTimestamp();
+        this._socket.write(HeartBeat.encode());
       }
     }, HEART_BEAT);
   };
 
   private _onData = data => {
     log(`SocketWorker#${this.pid}  =receive data=> ${this.host}:${this.port}`);
+    this.setReadTimestamp(); // record the read time for every incoming chunk before it is passed to the decode buffer
     this._decodeBuff.receive(data);
   };
 
@@ -249,6 +253,8 @@ export default class SocketWorker implements IObservable<ISocketSubscriber> {
     //clear buffer
     this._decodeBuff.clearBuffer();
     clearInterval(this._heartBeatTimer);
+    this._lastReadTimestamp = -1;
+    this._lastWriteTimestamp = -1;
 
     if (this._retry > 0) {
       //set current status
@@ -274,11 +280,19 @@ export default class SocketWorker implements IObservable<ISocketSubscriber> {
   private _onSubscribeDecodeBuff = (data: Buffer) => {
     if (HeartBeat.isHeartBeat(data)) {
       log(`SocketWorker#${this.pid} <=receive= heartbeat data.`);
-      this._retryHeartBeat = RETRY_HEARD_BEAT_TIME; //reset heart beat times
     } else {
       const json = decode(data);
       log(`SocketWorker#${this.pid} <=received=> dubbo result %O`, json);
       this._subscriber.onData(json);
     }
   };
+
+  private setReadTimestamp() { // stores the current time (ms, Date.now()) as the last-read timestamp
+    this._lastReadTimestamp = Date.now();
+  }
+
+  private setWriteTimestamp() { // stores the current time (ms, Date.now()) as the last-write timestamp
+    this._lastWriteTimestamp = Date.now(); // must be the write field: touching _lastReadTimestamp here would let outgoing writes hide a silent peer from the read-idle check
+  }
+
 }