You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by hu...@apache.org on 2021/08/04 09:31:33 UTC
[dubbo-js] branch master updated: enhancement: implements cross
node process port reused
This is an automated email from the ASF dual-hosted git repository.
hufeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/dubbo-js.git
The following commit(s) were added to refs/heads/master by this push:
new 70939dd enhancement: implements cross node process port reused
70939dd is described below
commit 70939dd6be9681aeb5d89d719bcfc548914247dc
Author: hufeng <fe...@gmail.com>
AuthorDate: Wed Aug 4 17:31:09 2021 +0800
enhancement: implements cross node process port reused
---
.gitignore | 1 +
packages/dubbo-service/package.json | 2 +
packages/dubbo-service/src/__integration__/port.ts | 18 ++++
packages/dubbo-service/src/__tests__/port-test.ts | 19 +++-
packages/dubbo-service/src/dubbo-service.ts | 6 +-
packages/dubbo-service/src/port.ts | 108 ++++++++++++++++++---
6 files changed, 133 insertions(+), 21 deletions(-)
diff --git a/.gitignore b/.gitignore
index 23cd282..8a9083e 100644
--- a/.gitignore
+++ b/.gitignore
@@ -15,6 +15,7 @@
# limitations under the License.
yarn.lock
package-lock.json
+.dubbojs
.idea
.project
.settings
diff --git a/packages/dubbo-service/package.json b/packages/dubbo-service/package.json
index c5946ad..5afd37b 100644
--- a/packages/dubbo-service/package.json
+++ b/packages/dubbo-service/package.json
@@ -19,6 +19,8 @@
},
"dependencies": {
"ip": "^1.1.5",
+ "proper-lockfile": "^4.1.2",
+ "fs-extra": "^10.0.0",
"apache-dubbo-serialization": "^0.0.2",
"apache-dubbo-common": "^0.0.1",
"get-port": "^5.1.1"
diff --git a/packages/dubbo-service/src/__integration__/port.ts b/packages/dubbo-service/src/__integration__/port.ts
new file mode 100644
index 0000000..212e4a1
--- /dev/null
+++ b/packages/dubbo-service/src/__integration__/port.ts
@@ -0,0 +1,18 @@
+import cluster from 'cluster'
+import { portManager } from '../port'
+
+// Manual integration driver: the master forks a fixed number of workers and
+// forks a replacement whenever one exits; every worker asks the shared port
+// manager for a port and prints it.
+const WORKER_COUNT = 4
+
+// Worker entry point: log the pid, then the port handed out to this process.
+const bootWorker = async () => {
+  console.log('pid start---->', process.pid)
+  console.log(await portManager.getReusedPort())
+}
+
+if (!cluster.isMaster) {
+  bootWorker()
+} else {
+  Array.from({ length: WORKER_COUNT }).forEach(() => cluster.fork())
+  // keep the pool size constant so port hand-back and reuse can be observed
+  cluster.on('exit', () => {
+    console.log('fork')
+    cluster.fork()
+  })
+}
diff --git a/packages/dubbo-service/src/__tests__/port-test.ts b/packages/dubbo-service/src/__tests__/port-test.ts
index aeab76f..932b8e6 100644
--- a/packages/dubbo-service/src/__tests__/port-test.ts
+++ b/packages/dubbo-service/src/__tests__/port-test.ts
@@ -15,9 +15,20 @@
* limitations under the License.
*/
-import getPort from 'get-port'
+import cluster from 'cluster'
+import path from 'path'
+import fs from 'fs-extra'
+import { portManager } from '../port'
-it('test get port', async () => {
- const port = await getPort()
- expect(port).toBeTruthy()
+describe('port test suite', () => {
+  // The test runner process is not a forked worker, so the port manager takes
+  // its master path here.
+  it('test master process', async () => {
+    expect(portManager.isMasterProcess).toBeTruthy()
+    const port = await portManager.getReusedPort()
+    expect(port).toBeTruthy()
+    // The master only creates the lock target file `.dubbojs/dubbo`; it never
+    // acquires the lock, so no `dubbo.lock` entry exists to assert on.
+    expect(
+      fs.existsSync(path.join(process.cwd(), '.dubbojs/dubbo'))
+    ).toBeTruthy()
+  })
+
+  // TODO: placeholder - cluster mode is currently only exercised by the manual
+  // driver in src/__integration__/port.ts
+  it('test cluster mode', async () => {})
})
diff --git a/packages/dubbo-service/src/dubbo-service.ts b/packages/dubbo-service/src/dubbo-service.ts
index cbbb93e..1e313f7 100644
--- a/packages/dubbo-service/src/dubbo-service.ts
+++ b/packages/dubbo-service/src/dubbo-service.ts
@@ -30,7 +30,7 @@ import {
Request
} from 'apache-dubbo-serialization'
import Context from './context'
-import { randomPort } from './port'
+import { portManager } from './port'
import * as s from './dubbo-setting'
import {
DubboServiceClazzName,
@@ -127,7 +127,7 @@ export default class DubboService {
* start tcp server
*/
private listen = async () => {
- this.port = await randomPort()
+ this.port = await portManager.getReusedPort()
log(`init service with port: %d`, this.port)
this.server = net
@@ -209,7 +209,7 @@ export default class DubboService {
const method = service.methods[request.methodName]
ctx.status = DUBBO_RESPONSE_STATUS.OK
try {
- // FIXEDME waiting dubbo/dj
+ // FIXME waiting dubbo/dj
// check hessian type
// if (!util.checkRetValHessian(res)) {
// ctx.body.err = new Error(
diff --git a/packages/dubbo-service/src/port.ts b/packages/dubbo-service/src/port.ts
index 7ce1a44..3e8c207 100644
--- a/packages/dubbo-service/src/port.ts
+++ b/packages/dubbo-service/src/port.ts
@@ -15,24 +15,104 @@
* limitations under the License.
*/
+import path from 'path'
+import cluster from 'cluster'
import getPort from 'get-port'
import debug from 'debug'
+import fs from 'fs-extra'
+import lockfile from 'proper-lockfile'
const dlog = debug('dubbo-server:get-port')
+// Directory under the current working directory that holds the lock target
+// and the files recording which pid owns which port.
+const ROOT = path.join(process.cwd(), '.dubbojs')
+// File passed to proper-lockfile as the lock target.
+const LOCK_FILE = path.join(ROOT, 'dubbo')
-export async function randomPort() {
- // 本地空闲的端口
- // 在多进程同时启动的时候,端口的获取不是竞态的,所以可能导致不同的进程获取的端口是相同的
- // 这时候只有其中一个进程listen该端口,导致其他的进程,监听失败,导致进程启动失败
- //
- // 通过以下核心方式来解决
- // 获取一段的空闲端口,随机选择一个, 通过随机来降低端口冲突的概率
- // 如果冲突 再次获取
- const ports = []
- for (let i = 0; i < 10; i++) {
- const port = await getPort({ port: getPort.makeRange(20888, 30000) })
- ports.push(port)
+export class PortManager {
+ private port: number
+
+  /**
+   * Prepare the on-disk state shared by the processes started from the same
+   * working directory.
+   *
+   * The lock target is ensured in every process, not only in the master: a
+   * worker that starts before the master has created it would otherwise fail
+   * as soon as it tries to take the lock or scan the directory.
+   */
+  constructor() {
+    // idempotent: creates the directory and an empty file only when missing
+    fs.ensureFileSync(LOCK_FILE)
+    if (!this.isMasterProcess) {
+      // workers record their port on disk, so they must hand it back on exit
+      this.clearPidPort()
+    }
+  }
+
+  /**
+   * Resolve the port this process should listen on.
+   *
+   * The master simply takes a free port. A worker takes the cross-process
+   * file lock, then either re-claims a port file that a previous worker left
+   * empty or allocates a new port that no other worker has recorded, and
+   * writes its pid into that port file.
+   *
+   * @returns the port assigned to this process
+   * @throws when the lock cannot be acquired, the directory cannot be read or
+   *   written, or no free port is available
+   */
+  async getReusedPort(): Promise<number> {
+    if (this.isMasterProcess) {
+      this.port = await this.getFreePort()
+      return this.port
+    }
+
+    // serialise scan-and-claim across workers
+    const release = await lockfile.lock(LOCK_FILE, {
+      retries: { retries: 5, maxTimeout: 5000 }
+    })
+    try {
+      dlog('pid %d get lock', process.pid)
+      const entries = await fs.readdir(ROOT)
+      dlog('scan %s dir includes %O', ROOT, entries)
+
+      // port files are named after their port number; ignore the lock target,
+      // the lock itself and any stray entry that is not a plain number
+      const portFiles = entries.filter((name) => /^\d+$/.test(name))
+      const excludes: Array<number> = []
+      for (const portFile of portFiles) {
+        const owner = fs.readFileSync(path.join(ROOT, portFile)).toString()
+        if (owner === '') {
+          // claim the released port while the lock is still held, otherwise
+          // two workers could both see it as free
+          fs.writeFileSync(path.join(ROOT, portFile), String(process.pid))
+          this.port = Number(portFile)
+          return this.port
+        }
+        excludes.push(Number(portFile))
+      }
+
+      const port = await this.getFreePort(excludes)
+      if (typeof port !== 'number') {
+        throw new Error('no free port available for pid ' + process.pid)
+      }
+      fs.writeFileSync(path.join(ROOT, String(port)), String(process.pid))
+      this.port = port
+      return this.port
+    } finally {
+      // always give the lock back, even when scanning or claiming failed;
+      // a failed release must not mask the result of the claim itself
+      await release().catch((err: unknown) => {
+        dlog('pid %d release lock failed %O', process.pid, err)
+      })
+    }
+  }
+
+  /**
+   * Find a free local port, preferring the range produced by
+   * `getPort.makeRange(20888, 30000)`.
+   *
+   * @param exclude ports that must not be returned (already recorded by
+   *   other processes)
+   * @returns a free port that is not listed in `exclude`
+   * @throws Error when the only port that could be obtained is excluded
+   */
+  async getFreePort(exclude: Array<number> = []): Promise<number> {
+    const excluded = new Set(exclude)
+    // offer get-port only candidates nobody has recorded, so one probe is
+    // enough and the result can never be silently filtered away to undefined
+    const candidates = function* () {
+      for (const candidate of getPort.makeRange(20888, 30000)) {
+        if (!excluded.has(candidate)) {
+          yield candidate
+        }
+      }
+    }
+
+    const port = await getPort({ port: candidates() })
+    // get-port may fall back to an arbitrary port when no preferred one is free
+    if (excluded.has(port)) {
+      throw new Error(`free port ${port} is already taken by another process`)
+    }
+    dlog('get free port %d', port)
+    return port
+  }
+
+  /**
+   * Register process listeners that hand this process's port back by emptying
+   * its port file, so that a later worker can reuse the port.
+   *
+   * The listeners do not change how the process terminates: a signal is
+   * re-raised and an uncaught exception is re-thrown whenever nothing else is
+   * listening for it.
+   */
+  clearPidPort = () => {
+    const cleanup = () => {
+      // no port was claimed yet - there is no file to clear
+      if (this.port === undefined) {
+        return
+      }
+      const portFile = path.join(ROOT, String(this.port))
+      try {
+        // clear the claim only while it is still ours: after an earlier
+        // cleanup another process may already have taken the port over
+        if (fs.readFileSync(portFile).toString() !== String(process.pid)) {
+          return
+        }
+        dlog('clear port pid %d', process.pid)
+        fs.writeFileSync(portFile, '')
+      } catch (err) {
+        // best effort: the process is going down anyway
+        dlog('clear port file %s failed %O', portFile, err)
+      }
+    }
+
+    process.on('exit', cleanup)
+
+    const signals: Array<NodeJS.Signals> = [
+      'SIGINT',
+      'SIGUSR2',
+      'SIGUSR1',
+      'SIGTERM'
+    ]
+    signals.forEach((signal) => {
+      process.once(signal, () => {
+        cleanup()
+        // installing a listener replaces the default handling of the signal;
+        // re-raise it unless the application registered its own listener
+        if (process.listenerCount(signal) === 0) {
+          process.kill(process.pid, signal)
+        }
+      })
+    })
+
+    process.once('uncaughtException', (err) => {
+      cleanup()
+      // do not swallow the crash: without another listener, fail as usual
+      if (process.listenerCount('uncaughtException') === 0) {
+        throw err
+      }
+    })
+  }
+
+  /**
+   * Whether this process takes the master path: it is not a forked cluster
+   * worker, or `NODE_APP_INSTANCE` is `'0'`.
+   *
+   * Always returns a real boolean rather than leaking the raw value of the
+   * environment variable.
+   */
+  get isMasterProcess(): boolean {
+    // cluster.isMaster is also true for a plain process that was never forked
+    const isClusterMaster = cluster.isMaster
+    // NOTE(review): NODE_APP_INSTANCE is presumably the instance index set by
+    // pm2 - confirm against the pm2 documentation
+    const isFirstPm2Instance = process.env.NODE_APP_INSTANCE === '0'
+    return isClusterMaster || isFirstPm2Instance
+  }
- dlog(`get ports %s`, ports.join())
- return ports[Math.floor(Math.random() * 10)]
}
+
+// Shared PortManager instance; it is constructed when this module is first
+// imported, so the constructor's side effects happen at import time.
+export const portManager = new PortManager()