You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by hu...@apache.org on 2021/08/04 09:31:33 UTC

[dubbo-js] branch master updated: enhancement: implements cross node process port reused

This is an automated email from the ASF dual-hosted git repository.

hufeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/dubbo-js.git


The following commit(s) were added to refs/heads/master by this push:
     new 70939dd  enhancement: implements cross node process port reused
70939dd is described below

commit 70939dd6be9681aeb5d89d719bcfc548914247dc
Author: hufeng <fe...@gmail.com>
AuthorDate: Wed Aug 4 17:31:09 2021 +0800

    enhancement: implements cross node process port reused
---
 .gitignore                                         |   1 +
 packages/dubbo-service/package.json                |   2 +
 packages/dubbo-service/src/__integration__/port.ts |  18 ++++
 packages/dubbo-service/src/__tests__/port-test.ts  |  19 +++-
 packages/dubbo-service/src/dubbo-service.ts        |   6 +-
 packages/dubbo-service/src/port.ts                 | 108 ++++++++++++++++++---
 6 files changed, 133 insertions(+), 21 deletions(-)

diff --git a/.gitignore b/.gitignore
index 23cd282..8a9083e 100644
--- a/.gitignore
+++ b/.gitignore
@@ -15,6 +15,7 @@
 #  limitations under the License.
 yarn.lock
 package-lock.json
+.dubbojs
 .idea
 .project
 .settings
diff --git a/packages/dubbo-service/package.json b/packages/dubbo-service/package.json
index c5946ad..5afd37b 100644
--- a/packages/dubbo-service/package.json
+++ b/packages/dubbo-service/package.json
@@ -19,6 +19,8 @@
   },
   "dependencies": {
     "ip": "^1.1.5",
+    "proper-lockfile": "^4.1.2",
+    "fs-extra": "^10.0.0",
     "apache-dubbo-serialization": "^0.0.2",
     "apache-dubbo-common": "^0.0.1",
     "get-port": "^5.1.1"
diff --git a/packages/dubbo-service/src/__integration__/port.ts b/packages/dubbo-service/src/__integration__/port.ts
new file mode 100644
index 0000000..212e4a1
--- /dev/null
+++ b/packages/dubbo-service/src/__integration__/port.ts
@@ -0,0 +1,18 @@
+import cluster from 'cluster'
+import { portManager } from '../port'
+
+if (cluster.isMaster) {
+  for (let i = 0; i < 4; i++) {
+    cluster.fork()
+  }
+  cluster.on('exit', () => {
+    console.log('fork')
+    cluster.fork()
+  })
+} else {
+  ;(async () => {
+    console.log('pid start---->', process.pid)
+    const port = await portManager.getReusedPort()
+    console.log(port)
+  })()
+}
diff --git a/packages/dubbo-service/src/__tests__/port-test.ts b/packages/dubbo-service/src/__tests__/port-test.ts
index aeab76f..932b8e6 100644
--- a/packages/dubbo-service/src/__tests__/port-test.ts
+++ b/packages/dubbo-service/src/__tests__/port-test.ts
@@ -15,9 +15,20 @@
  * limitations under the License.
  */
 
-import getPort from 'get-port'
+import cluster from 'cluster'
+import path from 'path'
+import fs from 'fs-extra'
+import { portManager } from '../port'
 
-it('test get port', async () => {
-  const port = await getPort()
-  expect(port).toBeTruthy()
+describe('port test suite', () => {
+  it('test master process', async () => {
+    expect(portManager.isMasterProcess).toBeTruthy()
+    const port = await portManager.getReusedPort()
+    expect(port).toBeTruthy()
+    expect(
+      fs.existsSync(path.join(process.cwd(), '.dubbojs/dubbo.lock'))
+    ).toBeTruthy()
+  })
+
+  it('test cluster mode', async () => {})
 })
diff --git a/packages/dubbo-service/src/dubbo-service.ts b/packages/dubbo-service/src/dubbo-service.ts
index cbbb93e..1e313f7 100644
--- a/packages/dubbo-service/src/dubbo-service.ts
+++ b/packages/dubbo-service/src/dubbo-service.ts
@@ -30,7 +30,7 @@ import {
   Request
 } from 'apache-dubbo-serialization'
 import Context from './context'
-import { randomPort } from './port'
+import { portManager } from './port'
 import * as s from './dubbo-setting'
 import {
   DubboServiceClazzName,
@@ -127,7 +127,7 @@ export default class DubboService {
    * start tcp server
    */
   private listen = async () => {
-    this.port = await randomPort()
+    this.port = await portManager.getReusedPort()
     log(`init service with port: %d`, this.port)
 
     this.server = net
@@ -209,7 +209,7 @@ export default class DubboService {
         const method = service.methods[request.methodName]
         ctx.status = DUBBO_RESPONSE_STATUS.OK
         try {
-          // FIXEDME waiting dubbo/dj
+          // FIXME waiting dubbo/dj
           // check hessian type
           // if (!util.checkRetValHessian(res)) {
           //   ctx.body.err = new Error(
diff --git a/packages/dubbo-service/src/port.ts b/packages/dubbo-service/src/port.ts
index 7ce1a44..3e8c207 100644
--- a/packages/dubbo-service/src/port.ts
+++ b/packages/dubbo-service/src/port.ts
@@ -15,24 +15,104 @@
  * limitations under the License.
  */
 
+import path from 'path'
+import cluster from 'cluster'
 import getPort from 'get-port'
 import debug from 'debug'
+import fs from 'fs-extra'
+import lockfile from 'proper-lockfile'
 
 const dlog = debug('dubbo-server:get-port')
+const ROOT = path.join(process.cwd(), '.dubbojs')
+const LOCK_FILE = path.join(ROOT, 'dubbo')
 
-export async function randomPort() {
-  // 本地空闲的端口
-  // 在多进程同时启动的时候,端口的获取不是竞态的,所以可能导致不同的进程获取的端口是相同的
-  // 这时候只有其中一个进程listen该端口,导致其他的进程,监听失败,导致进程启动失败
-  //
-  // 通过以下核心方式来解决
-  // 获取一段的空闲端口,随机选择一个, 通过随机来降低端口冲突的概率
-  // 如果冲突 再次获取
-  const ports = []
-  for (let i = 0; i < 10; i++) {
-    const port = await getPort({ port: getPort.makeRange(20888, 30000) })
-    ports.push(port)
+export class PortManager {
+  private port: number
+
+  constructor() {
+    if (this.isMasterProcess) {
+      // create dubbo lock file
+      fs.ensureFileSync(LOCK_FILE)
+    } else {
+      this.clearPidPort()
+    }
+  }
+
+  async getReusedPort(): Promise<number> {
+    if (this.isMasterProcess) {
+      this.port = await this.getFreePort()
+      return this.port
+    }
+
+    try {
+      // set file lock
+      const release = await lockfile.lock(LOCK_FILE, {
+        retries: { retries: 5, maxTimeout: 5000 }
+      })
+      dlog('pid %d get lock', process.pid)
+      // find available reused port
+      const dirs = await fs.readdir(ROOT)
+      dlog('scan %s dir includes %O', ROOT, dirs)
+      const excludes = []
+      const portPidFiles = dirs.filter((dir) => !dir.startsWith('dubbo'))
+      for (let portPid of portPidFiles) {
+        const file = fs.readFileSync(path.join(ROOT, portPid)).toString()
+        if (file === '') {
+          release()
+          fs.writeFileSync(path.join(ROOT, portPid), String(process.pid))
+          this.port = Number(portPid)
+          return this.port
+        } else {
+          excludes.push(Number(portPid))
+        }
+      }
+
+      this.port = await this.getFreePort(excludes)
+      fs.writeFileSync(path.join(ROOT, String(this.port)), String(process.pid))
+      release()
+      return this.port
+    } catch (err) {
+      throw err
+    }
+  }
+
+  async getFreePort(exclude: Array<number> = []) {
+    const ports = []
+    for (let i = 0; i < 10; i++) {
+      // gen new port
+      const port = await getPort({ port: getPort.makeRange(20888, 30000) })
+      ports.push(port)
+    }
+
+    const availablePort = ports.filter((port) => !exclude.includes(port))[0]
+    dlog('get random port %d in master mode', availablePort)
+    return availablePort
+  }
+
+  clearPidPort = () => {
+    const cleanup = () => {
+      const pid = process.pid
+      dlog('clear port pid %d', pid)
+      fs.writeFileSync(path.join(ROOT, String(this.port)), '')
+    }
+    ;[
+      'exit',
+      'SIGINT',
+      'SIGUSR2',
+      'SIGUSR1',
+      'SIGTERM',
+      'uncaughtException'
+    ].forEach((event) => {
+      process.on(event, cleanup)
+    })
+  }
+
+  get isMasterProcess() {
+    const isClusterMode = cluster.isMaster
+    const isPm2MasterMode =
+      process.env.NODE_APP_INSTANCE && process.env.NODE_APP_INSTANCE === '0'
+    return isClusterMode || isPm2MasterMode
   }
-  dlog(`get ports %s`, ports.join())
-  return ports[Math.floor(Math.random() * 10)]
 }
+
+export const portManager = new PortManager()