You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by if...@apache.org on 2021/03/27 18:38:35 UTC
[rocketmq-client-nodejs] 01/01: refactor: powered by N-API and
rocketmq-client-cpp@re_dev
This is an automated email from the ASF dual-hosted git repository.
ifplusor pushed a commit to branch n-api
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-nodejs.git
commit d7d8287a5882b9f06035fb3b84017aea079b5db7
Author: James Yin <yw...@hotmail.com>
AuthorDate: Fri Mar 26 12:30:05 2021 +0800
refactor: powered by N-API and rocketmq-client-cpp@re_dev
---
.clang-format | 111 ++++++
.gitignore | 6 +-
.gitmodules | 5 +-
README.md | 10 +-
binding.gyp | 28 +-
deps/rocketmq | 2 +-
lib/common.js | 28 --
lib/env_init.js | 32 --
lib/producer.js | 6 +-
lib/push_consumer.js | 6 +-
package.json | 22 +-
script/download_lib.js | 131 ------
script/get_linux_distro_route.js | 102 -----
src/consumer_ack.cpp | 86 ++--
src/consumer_ack.h | 51 +--
src/consumer_ack_inner.cpp | 77 ----
src/consumer_ack_inner.h | 42 --
src/producer.cpp | 440 +++++++++++----------
src/producer.h | 48 +--
src/push_consumer.cpp | 546 ++++++++++----------------
src/push_consumer.h | 66 ++--
src/rocketmq.cpp | 32 +-
src/workers/producer/send_message.h | 80 ----
src/workers/producer/start_or_shutdown.h | 88 -----
src/workers/push_consumer/start_or_shutdown.h | 88 -----
25 files changed, 691 insertions(+), 1442 deletions(-)
diff --git a/.clang-format b/.clang-format
new file mode 100644
index 0000000..4aad29c
--- /dev/null
+++ b/.clang-format
@@ -0,0 +1,111 @@
+---
+Language: Cpp
+# BasedOnStyle: Google
+AccessModifierOffset: -1
+AlignAfterOpenBracket: Align
+AlignConsecutiveAssignments: false
+AlignConsecutiveDeclarations: false
+AlignEscapedNewlines: Right
+AlignOperands: true
+AlignTrailingComments: true
+AllowAllParametersOfDeclarationOnNextLine: true
+AllowShortBlocksOnASingleLine: false
+AllowShortCaseLabelsOnASingleLine: false
+AllowShortFunctionsOnASingleLine: Inline
+AllowShortIfStatementsOnASingleLine: true
+AllowShortLoopsOnASingleLine: true
+AlwaysBreakAfterDefinitionReturnType: None
+AlwaysBreakAfterReturnType: None
+AlwaysBreakBeforeMultilineStrings: false
+AlwaysBreakTemplateDeclarations: true
+BinPackArguments: false
+BinPackParameters: false
+BraceWrapping:
+ AfterClass: false
+ AfterControlStatement: false
+ AfterEnum: false
+ AfterFunction: false
+ AfterNamespace: false
+ AfterObjCDeclaration: false
+ AfterStruct: false
+ AfterUnion: false
+ AfterExternBlock: false
+ BeforeCatch: false
+ BeforeElse: false
+ IndentBraces: false
+ SplitEmptyFunction: true
+ SplitEmptyRecord: true
+ SplitEmptyNamespace: true
+BreakBeforeBinaryOperators: None
+BreakBeforeBraces: Attach
+BreakBeforeInheritanceComma: false
+BreakBeforeTernaryOperators: true
+BreakConstructorInitializersBeforeComma: false
+BreakConstructorInitializers: BeforeColon
+BreakAfterJavaFieldAnnotations: false
+BreakStringLiterals: true
+ColumnLimit: 80
+CommentPragmas: '^ IWYU pragma:'
+CompactNamespaces: false
+ConstructorInitializerAllOnOneLineOrOnePerLine: true
+ConstructorInitializerIndentWidth: 4
+ContinuationIndentWidth: 4
+Cpp11BracedListStyle: true
+DerivePointerAlignment: false
+DisableFormat: false
+ExperimentalAutoDetectBinPacking: false
+FixNamespaceComments: true
+ForEachMacros:
+ - foreach
+ - Q_FOREACH
+ - BOOST_FOREACH
+IncludeBlocks: Preserve
+IncludeCategories:
+ - Regex: '^<ext/.*\.h>'
+ Priority: 2
+ - Regex: '^<.*\.h>'
+ Priority: 1
+ - Regex: '^<.*'
+ Priority: 2
+ - Regex: '.*'
+ Priority: 3
+IncludeIsMainRegex: '([-_](test|unittest))?$'
+IndentCaseLabels: true
+IndentPPDirectives: None
+IndentWidth: 2
+IndentWrappedFunctionNames: false
+JavaScriptQuotes: Leave
+JavaScriptWrapImports: true
+KeepEmptyLinesAtTheStartOfBlocks: false
+MacroBlockBegin: ''
+MacroBlockEnd: ''
+MaxEmptyLinesToKeep: 1
+NamespaceIndentation: None
+ObjCBlockIndentWidth: 2
+ObjCSpaceAfterProperty: false
+ObjCSpaceBeforeProtocolList: false
+PenaltyBreakAssignment: 2
+PenaltyBreakBeforeFirstCallParameter: 1
+PenaltyBreakComment: 300
+PenaltyBreakFirstLessLess: 120
+PenaltyBreakString: 1000
+PenaltyExcessCharacter: 1000000
+PenaltyReturnTypeOnItsOwnLine: 200
+PointerAlignment: Left
+ReflowComments: true
+SortIncludes: true
+SortUsingDeclarations: true
+SpaceAfterCStyleCast: false
+SpaceAfterTemplateKeyword: true
+SpaceBeforeAssignmentOperators: true
+SpaceBeforeParens: ControlStatements
+SpaceInEmptyParentheses: false
+SpacesBeforeTrailingComments: 2
+SpacesInAngles: false
+SpacesInContainerLiterals: true
+SpacesInCStyleCastParentheses: false
+SpacesInParentheses: false
+SpacesInSquareBrackets: false
+Standard: Auto
+TabWidth: 8
+UseTab: Never
diff --git a/.gitignore b/.gitignore
index a50fd0b..6978ed0 100644
--- a/.gitignore
+++ b/.gitignore
@@ -197,6 +197,8 @@ $RECYCLE.BIN/
# Windows shortcuts
*.lnk
-build
package-lock.json
-.vscode
+.vscode/
+build/
+Debug/
+Release/
diff --git a/.gitmodules b/.gitmodules
index c9fc461..51ac186 100644
--- a/.gitmodules
+++ b/.gitmodules
@@ -1,3 +1,4 @@
[submodule "deps/rocketmq"]
-path = deps/rocketmq
-url = https://github.com/apache/rocketmq-client-cpp.git
+ path = deps/rocketmq
+ url = https://github.com/apache/rocketmq-client-cpp.git
+ branch = re_dev
diff --git a/README.md b/README.md
index cdb8f65..dd15ba7 100644
--- a/README.md
+++ b/README.md
@@ -6,12 +6,12 @@
[![TravisCI](https://travis-ci.org/apache/rocketmq-client-nodejs.svg)](https://travis-ci.org/apache/rocketmq-client-nodejs)
[![Dependency](https://david-dm.org/apache/rocketmq-client-nodejs.svg)](https://david-dm.org/apache/rocketmq-client-nodejs)
-This official Node.js client is a lightweight wrapper around [rocketmq-client-cpp](https://github.com/apache/rocketmq-client-cpp), a finely tuned CPP client.
+This official Node.js client is a lightweight wrapper around the [re_dev branch of rocketmq-client-cpp](https://github.com/apache/rocketmq-client-cpp/tree/re_dev), a finely tuned C++ client.
> **Notice 1:** This client is still in `dev` version. Use it cautiously in production.
-> **Notice 2:** This SDK is now only support macOS and Ubuntu **14.04**. Ubuntu 16+ is not supported and CentOS is not tested yet.
+> **Notice 2:** This SDK is currently tested only on macOS; Ubuntu and CentOS have not been tested yet.
## Installation
@@ -50,7 +50,8 @@ new Producer(groupId[, instanceName][, options]);
- `compressLevel`: the compress level (0-9) of this producer, default to `5` where `0` is fastest and `9` is most compressed;
- `sendMessageTimeout`: send message timeout millisecond, default to `3000` and suggestion is 2000 - 3000ms;
- `maxMessageSize`: max message size with unit (B), default to `1024 * 128` which means 128K;
- - `logFileNum`: C++ core logic log file number, default to 3 and log file path is `$HOME/logs/rocketmq-cpp`;
+ - `logDir`: the directory where the C++ core logic stores its log files, default to `$HOME/logs/rocketmq-cpp`;
+ - `logFileNum`: C++ core logic log file number, default to 3;
- `logFileSize`: size of each C++ core logic log file with unit (B);
- `logLevel`: C++ core logic log level in `"fatal"`, `"error"`, `"warn"`, `"info"`, `"debug"`, `"trace"` and `"num"`.
@@ -177,7 +178,8 @@ new PushConsumer(groupId[, instanceName][, options]);
- `nameServer`: the name server of RocketMQ;
- `threadCount`: the thread number of underlying C++ logic;
- `maxBatchSize`: message max batch size;
- - `logFileNum`: C++ core logic log file number, default to 3 and log file path is `$HOME/logs/rocketmq-cpp`;
+ - `logDir`: the directory where the C++ core logic stores its log files, default to `$HOME/logs/rocketmq-cpp`;
+ - `logFileNum`: C++ core logic log file number, default to 3;
- `logFileSize`: size of each C++ core logic log file with unit (B);
- `logLevel`: C++ core logic log level in `"fatal"`, `"error"`, `"warn"`, `"info"`, `"debug"`, `"trace"` and `"num"`.
diff --git a/binding.gyp b/binding.gyp
index 8f4a46e..c7c1d71 100644
--- a/binding.gyp
+++ b/binding.gyp
@@ -21,21 +21,21 @@
"src/rocketmq.cpp",
"src/producer.cpp",
"src/push_consumer.cpp",
- "src/consumer_ack.cpp",
- "src/consumer_ack_inner.cpp"
+ "src/consumer_ack.cpp"
],
"include_dirs": [
"deps/rocketmq/include",
- "<!(node -e \"require('nan')\")"
+ "<!@(node -p \"require('node-addon-api').include\")"
+ ],
+ "library_dirs": [
+ "<(module_root_dir)/deps/rocketmq/bin"
],
"conditions": [
["OS==\"linux\"", {
- "libraries": [
- "<(module_root_dir)/deps/lib/librocketmq.a"
- ],
- "cflags_cc!": [ "-fno-exceptions", "-pthread", "-Wl,--no-as-needed", "-ldl" ],
- "cflags_cc": [ "-Wno-ignored-qualifiers" ],
- "cflags": [ "-std=c++11", "-g" ]
+ "libraries": [ "-lrocketmq" ],
+ "cflags_cc!": [ "-fno-exceptions", "-fno-rtti", "-pthread", "-Wl,--no-as-needed", "-ldl" ],
+ "cflags_cc": [ "-Wall", "-std=c++11" ],
+ "cflags": [ "-g" ]
}],
["OS==\"win\"", {
"libraries": [
@@ -49,13 +49,13 @@
]
}],
["OS==\"mac\"", {
+ "libraries": [ "-lrocketmq" ],
"xcode_settings": {
- "GCC_ENABLE_CPP_EXCEPTIONS": "YES"
+ "GCC_ENABLE_CPP_EXCEPTIONS": "YES",
+ 'GCC_ENABLE_CPP_RTTI': 'YES'
},
- "cflags!": [ "-fno-exceptions" ],
- "cflags_cc!": [ "-fno-exceptions", "-pthread", "-Wl,--no-as-needed", "-ldl" ],
- "cflags_cc": [ "-Wno-ignored-qualifiers" ],
- "cflags": [ "-std=c++11", "-stdlib=libc++" ]
+ "cflags_cc!": [ "-fno-exceptions", "-fno-rtti", "-pthread", "-Wl,--no-as-needed", "-ldl" ],
+ "cflags_cc": [ "-Wall", "-std=c++11", "-stdlib=libc++" ]
}]
]
}
diff --git a/deps/rocketmq b/deps/rocketmq
index d5887b6..db0e209 160000
--- a/deps/rocketmq
+++ b/deps/rocketmq
@@ -1 +1 @@
-Subproject commit d5887b63ddbba16fec562f64dca7f77ce9ca0bb1
+Subproject commit db0e20972a78d70930acce5eaff1456408b72c73
diff --git a/lib/common.js b/lib/common.js
deleted file mode 100644
index 2386605..0000000
--- a/lib/common.js
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-"use strict";
-
-exports.requireBinding = function(name) {
- let mod;
- try {
- mod = require(`../build/Debug/${name}`);
- } catch(e) {
- mod = require(`../build/Release/${name}`);
- }
-
- return mod;
-};
diff --git a/lib/env_init.js b/lib/env_init.js
deleted file mode 100644
index 0c26a1d..0000000
--- a/lib/env_init.js
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-"use strict";
-
-const os = require("os");
-const path = require("path");
-
-const common = require("./common");
-
-const binding = common.requireBinding("rocketmq");
-
-switch(os.platform()) {
-case "darwin":
- process.env.EVENT_NOKQUEUE = "1";
- binding.macosDLOpen(path.join(__dirname, "../deps/lib/librocketmq.dylib"));
- break;
-default: break;
-}
diff --git a/lib/producer.js b/lib/producer.js
index f4f5c93..02b777e 100644
--- a/lib/producer.js
+++ b/lib/producer.js
@@ -16,13 +16,9 @@
*/
"use strict";
-require("./env_init");
-
const assert = require("assert");
-const common = require("./common");
-
-const binding = common.requireBinding("rocketmq");
+const binding = require("bindings")("rocketmq");
const START_OR_SHUTDOWN = Symbol("RocketMQProducer#startOrShutdown");
diff --git a/lib/push_consumer.js b/lib/push_consumer.js
index 3bb51de..7cf97ab 100644
--- a/lib/push_consumer.js
+++ b/lib/push_consumer.js
@@ -16,14 +16,10 @@
*/
"use strict";
-require("./env_init");
-
const assert = require("assert");
const EventEmitter = require("events").EventEmitter;
-const common = require("./common");
-
-const binding = common.requireBinding("rocketmq");
+const binding = require("bindings")("rocketmq");
const START_OR_SHUTDOWN = Symbol("RocketMQPushConsumer#startOrShutdown");
const START_STATUS = {
diff --git a/package.json b/package.json
index 074198d..5e38e40 100644
--- a/package.json
+++ b/package.json
@@ -1,25 +1,23 @@
{
"name": "apache-rocketmq",
- "version": "1.0.0-rc1",
- "cppSDKVersion": "1.2.0",
+ "version": "2.0.0-rc1",
+ "cppSDKVersion": "3.0.0",
"description": "RocketMQ binding for Node.js",
+ "license": "Apache-2.0",
+ "author": "James Yin <if...@apache.org>",
"main": "index.js",
"scripts": {
+ "preinstall": "git submodule update --init --recommend-shallow",
+ "install": "deps/rocketmq/build.sh && node-gyp rebuild",
"test": "npm run lint && echo 'temp example test' && node example/producer.js && node example/push_consumer.js",
- "lint": "eslint .",
- "install": "node ./script/download_lib.js && node-gyp rebuild"
+ "lint": "eslint ."
},
- "author": "XadillaX <i...@2333.moe>",
- "license": "Apache-2.0",
"dependencies": {
- "co": "^4.6.0",
- "destroy": "^1.0.4",
- "getos": "^3.1.1",
- "mkdirp": "^0.5.1",
- "nan": "^2.11.1",
- "urllib": "^2.31.3"
+ "bindings": "^1.5.0",
+ "node-addon-api": "^3.1.0"
},
"devDependencies": {
+ "co": "^4.6.0",
"eslint": "^5.9.0",
"eslint-config-rocketmq-style": "^1.0.0"
}
diff --git a/script/download_lib.js b/script/download_lib.js
deleted file mode 100644
index 5cfebfd..0000000
--- a/script/download_lib.js
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-"use strict";
-
-const fs = require("fs");
-const os = require("os");
-const path = require("path");
-
-const co = require("co");
-const destroy = require("destroy");
-const _mkdirp = require("mkdirp");
-const urllib = require("urllib");
-
-const getLinuxDistroRoute = require("./get_linux_distro_route");
-const pkg = require("../package");
-
-let REGISTRY_MIRROR =
- process.env.NODE_ROCKETMQ_REGISTRY ||
- "https://opensource-rocketmq-client.oss-cn-hangzhou.aliyuncs.com";
-if(!REGISTRY_MIRROR.endsWith("/")) REGISTRY_MIRROR += "/";
-
-const CPP_SDK_VERSION = pkg.cppSDKVersion;
-const LIB_DIR = path.join(__dirname, "..", "deps", "lib");
-const URL_ROOT = `${REGISTRY_MIRROR}cpp-client`;
-
-function mkdirp(dir) {
- return new Promise((resolve, reject) => {
- _mkdirp(dir, null, err => {
- if(err) reject(err);
- else resolve();
- });
- });
-}
-
-function *getUrlArray() {
- const platform = os.platform();
- const ret = [];
- let distro;
-
- switch(platform) {
- case "win32":
- ret.push(`${URL_ROOT}/windows/${CPP_SDK_VERSION}/rocketmq-client-cpp.dll`);
- ret.push(`${URL_ROOT}/windows/${CPP_SDK_VERSION}/rocketmq-client-cpp.lib`);
- break;
-
- case "darwin":
- ret.push(`${URL_ROOT}/mac/${CPP_SDK_VERSION}/librocketmq.dylib`);
- break;
-
- case "linux":
- distro = yield getLinuxDistroRoute();
- ret.push(`${URL_ROOT}/linux/${CPP_SDK_VERSION}/${distro}/librocketmq.a`);
- break;
-
- default: throw new Error(`Unsupported platform ${platform}`);
- }
-
- return ret;
-}
-
-co(function *() {
- let urls;
- try {
- urls = yield getUrlArray();
- } catch(e) {
- console.error(`[rocketmq sdk] [error] ${e.message}`);
- process.exit(4);
- }
-
- yield mkdirp(LIB_DIR);
- let writeTimes = 0;
- for(const url of urls) {
- console.log(`[rocketmq sdk] [info] downloading [${url}]...`);
-
- const resp = yield urllib.request(url, {
- timeout: 60000 * 5,
- followRedirect: true,
- streaming: true
- });
-
- if(resp.status !== 200) {
- destroy(resp.res);
- console.error(`[rocketmq sdk] [error] error status ${resp.status} while downloading [${url}].`);
- process.exit(4);
- }
-
- const readStream = resp.res;
- const filename = path.join(LIB_DIR, path.basename(url));
- const writeStream = fs.createWriteStream(filename, {
- encoding: "binary"
- });
-
- // eslint-disable-next-line
- function handleDownladCallback(err) {
- if(err) {
- console.error(`[rocketmq sdk] [error] error occurred while downloading [${url}] to [${filename}].`);
- console.error(err.stack);
- process.exit(4);
- }
-
- writeTimes++;
- destroy(resp.res);
-
- console.log(`[rocketmq sdk] [info] downloaded library [${url}].`);
- if(writeTimes === urls.length) {
- console.log("[rocketmq sdk] [info] all libraries have been written to disk.");
- process.exit(0);
- }
- }
-
- readStream.on("error", handleDownladCallback);
- writeStream.on("error", handleDownladCallback);
- writeStream.on("finish", handleDownladCallback);
-
- readStream.pipe(writeStream);
- }
-});
diff --git a/script/get_linux_distro_route.js b/script/get_linux_distro_route.js
deleted file mode 100644
index f03c827..0000000
--- a/script/get_linux_distro_route.js
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-"use strict";
-
-const assert = require("assert");
-
-const getos = require("getos");
-
-const RHEL_MAP_ARRAY = [
- "Centos",
- "Red Hat Linux",
- "RHEL",
- "Scientific Linux",
- "ScientificSL",
- "ScientificCERNSLC",
- "ScientificFermiLTS",
- "ScientificSLF"
-];
-const NORMAL_MAP_ARRAY = [
- "Alpine Linux",
- "Amazon Linux",
- "Arch Linux",
- "Chakra",
- "Debian",
- "elementary OS",
- "IYCC",
- "Linux Mint",
- "Manjaro Linux",
- "Ubuntu Linux"
-];
-
-function realGetLinuxDistroRoute(dist, release) {
- let major;
- if(release) {
- major = Number(release.split(".")[0]);
- }
-
- // RHEL Distros
- if(RHEL_MAP_ARRAY.includes(dist)) {
- assert(major >= 5 && major <= 7, `Only support ${dist} 5-7.`);
- return `RHEL${major}.x`;
- }
-
- // Fedora
- if("Fedora" === dist) {
- if(major <= 18) {
- return "RHEL6.x";
- } else if(major === 19) {
- return "RHEL7.x";
- }
- }
-
- if("Ubuntu Linux" === dist) {
- assert([ 14, 16, 18 ].includes(major));
- return `UBUNTU/${major}.04`;
- }
-
- if("Debian" === dist) {
- assert(major >= 8 && major <= 10);
- return `UBUNTU/${(major + 2) / 2}.04`;
- }
-
- // Ubuntu Distros
- if(!NORMAL_MAP_ARRAY.includes(dist)) {
- console.error(`[rocketmq sdk] [warn] ${dist} may not supported, fallback to use Ubuntu library.`);
- }
-
- return "UBUNTU/14.04";
-}
-
-function getLinuxDistroRoute() {
- return new Promise((resolve, reject) => {
- getos(function(err, ret) {
- if(err) return reject(err);
-
- let route;
- try {
- route = realGetLinuxDistroRoute(ret.dist, ret.release);
- } catch(e) {
- return reject(e);
- }
-
- resolve(route);
- });
- });
-}
-
-module.exports = getLinuxDistroRoute;
diff --git a/src/consumer_ack.cpp b/src/consumer_ack.cpp
index e2f5c09..8de4aaf 100644
--- a/src/consumer_ack.cpp
+++ b/src/consumer_ack.cpp
@@ -15,76 +15,52 @@
* limitations under the License.
*/
#include "consumer_ack.h"
+#include <exception>
+#include "napi.h"
namespace __node_rocketmq__ {
-Nan::Persistent<Function> ConsumerAck::constructor;
+Napi::Object ConsumerAck::Init(Napi::Env env, Napi::Object exports) {
+ Napi::Function func = DefineClass(
+ env, "ConsumerAck", {InstanceMethod<&ConsumerAck::Done>("done")});
-ConsumerAck::ConsumerAck() :
- inner(NULL)
-{
-}
+ Napi::FunctionReference* constructor = new Napi::FunctionReference();
+ *constructor = Napi::Persistent(func);
+ env.SetInstanceData<Napi::FunctionReference>(constructor);
-ConsumerAck::~ConsumerAck()
-{
- inner = NULL;
+ exports.Set("ConsumerAck", func);
+ return exports;
}
-NAN_MODULE_INIT(ConsumerAck::Init)
-{
- Local<FunctionTemplate> tpl = Nan::New<FunctionTemplate>(New);
- tpl->SetClassName(Nan::New("ConsumerAck").ToLocalChecked());
- tpl->InstanceTemplate()->SetInternalFieldCount(1);
-
- Nan::SetPrototypeMethod(tpl, "done", Done);
-
- constructor.Reset(tpl->GetFunction());
- Nan::Set(target, Nan::New("ConsumerAck").ToLocalChecked(), tpl->GetFunction());
+Napi::Object ConsumerAck::NewInstance(Napi::Env env) {  // Builds a JS object by invoking the constructor reference held in env's instance data.
+ Napi::Object obj = env.GetInstanceData<Napi::FunctionReference>()->New({});  // NOTE(review): RocketMQProducer::Init also calls env.SetInstanceData<Napi::FunctionReference>; Node-API documents that a later call overwrites the earlier data, so confirm this slot still holds the ConsumerAck constructor.
+ return obj;
 }
-NAN_METHOD(ConsumerAck::New)
-{
- Isolate* isolate = info.GetIsolate();
- Local<Context> context = Context::New(isolate);
-
- if(!info.IsConstructCall())
- {
- Local<Function> _constructor = Nan::New<Function>(constructor);
- info.GetReturnValue().Set(_constructor->NewInstance(context, 0, 0).ToLocalChecked());
- return;
- }
+ConsumerAck::ConsumerAck(const Napi::CallbackInfo& info)
+ : Napi::ObjectWrap<ConsumerAck>(info) {}
- ConsumerAck* producer = new ConsumerAck();
- producer->Wrap(info.This());
- info.GetReturnValue().Set(info.This());
+void ConsumerAck::SetPromise(std::promise<bool>&& promise) {
+ promise_ = std::move(promise);
}
-NAN_METHOD(ConsumerAck::Done)
-{
- ConsumerAck* ack = ObjectWrap::Unwrap<ConsumerAck>(info.Holder());
- bool succ = true;
-
- if(info.Length() >= 1)
- {
- succ = (info[0]->IsUndefined() || info[0]->IsNull() || Nan::To<bool>(info[0]).FromJust());
- }
+void ConsumerAck::Done(bool ack) {
+ promise_.set_value(ack);
+}
- // call inner ack's `Ack` function to emit the true `Acker`'s `Ack` function
- // and finish waiting of consume thread
- CConsumeStatus status = succ ?
- CConsumeStatus::E_CONSUME_SUCCESS :
- CConsumeStatus::E_RECONSUME_LATER;
- ack->Ack(status);
+void ConsumerAck::Done(std::exception_ptr exception) {
+ promise_.set_exception(exception);
}
-void ConsumerAck::Ack(CConsumeStatus status)
-{
- if(inner)
- {
- // call inner ack in the main event loop
- inner->Ack(status);
- inner = NULL;
+Napi::Value ConsumerAck::Done(const Napi::CallbackInfo& info) {
+ bool ack = true;  // JS done([ack]): only an explicit boolean `false` reports failure; anything else (or no argument) acks.
+ if (info.Length() >= 1) {
+ if (info[0].IsBoolean() && !info[0].ToBoolean()) {
+ ack = false;
 }
+ }
+ Done(ack);  // Settle promise_ exactly once: the old code fell through from Done(false) into Done(true), and a second set_value throws std::future_error.
+ return info.Env().Undefined();
 }
-}
+} // namespace __node_rocketmq__
diff --git a/src/consumer_ack.h b/src/consumer_ack.h
index 682d85e..ff0ebd1 100644
--- a/src/consumer_ack.h
+++ b/src/consumer_ack.h
@@ -17,50 +17,31 @@
#ifndef __ROCKETMQ_CONSUMER_ACK_H__
#define __ROCKETMQ_CONSUMER_ACK_H__
-#include "consumer_ack_inner.h"
-#include <nan.h>
+#include <future>
-namespace __node_rocketmq__ {
-
-using v8::Context;
-using v8::Function;
-using v8::FunctionTemplate;
-using v8::Isolate;
-using v8::Local;
-using v8::Object;
-using v8::String;
-using v8::Value;
+#include <napi.h>
-class ConsumerAck : public Nan::ObjectWrap {
-public:
- static NAN_MODULE_INIT(Init);
-
-private:
- explicit ConsumerAck();
- ~ConsumerAck();
+namespace __node_rocketmq__ {
- static NAN_METHOD(New);
- static NAN_METHOD(Done);
+class ConsumerAck : public Napi::ObjectWrap<ConsumerAck> {
+ public:
+ static Napi::Object Init(Napi::Env env, Napi::Object exports);
+ static Napi::Object NewInstance(Napi::Env env);
- void Ack(CConsumeStatus status);
+ ConsumerAck(const Napi::CallbackInfo& info);
- static Nan::Persistent<v8::Function> constructor;
+ void SetPromise(std::promise<bool>&& promise);
-public:
- void SetInner(ConsumerAckInner* _inner)
- {
- inner = _inner;
- }
+ void Done(bool ack);
+ void Done(std::exception_ptr exception);
- static Nan::Persistent<v8::Function>& GetConstructor()
- {
- return constructor;
- }
+ private:
+ Napi::Value Done(const Napi::CallbackInfo& info);
-private:
- ConsumerAckInner* inner;
+ private:
+ std::promise<bool> promise_;
};
-}
+} // namespace __node_rocketmq__
#endif
diff --git a/src/consumer_ack_inner.cpp b/src/consumer_ack_inner.cpp
deleted file mode 100644
index 47b1a07..0000000
--- a/src/consumer_ack_inner.cpp
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "consumer_ack_inner.h"
-
-namespace __node_rocketmq__ {
-
-ConsumerAckInner::ConsumerAckInner() :
- acked(false)
-{
- uv_cond_init(&cond);
- uv_mutex_init(&mutex);
-}
-
-ConsumerAckInner::~ConsumerAckInner()
-{
- uv_mutex_destroy(&mutex);
- uv_cond_destroy(&cond);
-}
-
-void ConsumerAckInner::Ack(CConsumeStatus _status)
-{
- uv_mutex_lock(&mutex);
- bool _acked = acked;
-
- if(_acked)
- {
- uv_mutex_unlock(&mutex);
- return;
- }
-
- status = _status;
- acked = true;
-
- // tell `this->WaitResult()` to continue
- uv_cond_signal(&cond);
- uv_mutex_unlock(&mutex);
-}
-
-CConsumeStatus ConsumerAckInner::WaitResult()
-{
- uv_mutex_lock(&mutex);
-
- // if `cond signal` send before `WaitResult()`,
- // `uv_cond_wait` will be blocked and never continue
- //
- // so we have to return result directly without `uv_cond_wait`
- if(acked)
- {
- CConsumeStatus _status = status;
- uv_mutex_unlock(&mutex);
- return _status;
- }
-
- // wait for `this->Ack()` and that will emit `uv_cond_signal` to let it stop wait
- uv_cond_wait(&cond, &mutex);
-
- CConsumeStatus _status = status;
- uv_mutex_unlock(&mutex);
-
- return _status;
-}
-
-}
diff --git a/src/consumer_ack_inner.h b/src/consumer_ack_inner.h
deleted file mode 100644
index 9caa105..0000000
--- a/src/consumer_ack_inner.h
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef __ROCKETMQ_CONSUMER_ACK_INNER_H__
-#define __ROCKETMQ_CONSUMER_ACK_INNER_H__
-
-#include <uv.h>
-#include <CPushConsumer.h>
-
-namespace __node_rocketmq__ {
-
-class ConsumerAckInner {
-public:
- ConsumerAckInner();
- ~ConsumerAckInner();
-
- void Ack(CConsumeStatus _status);
- CConsumeStatus WaitResult();
-
-private:
- bool acked;
- CConsumeStatus status;
- uv_mutex_t mutex;
- uv_cond_t cond;
-};
-
-}
-
-#endif
diff --git a/src/producer.cpp b/src/producer.cpp
index 3ae4047..45dfe1f 100644
--- a/src/producer.cpp
+++ b/src/producer.cpp
@@ -15,253 +15,277 @@
* limitations under the License.
*/
#include "producer.h"
-#include "workers/producer/send_message.h"
-#include "workers/producer/start_or_shutdown.h"
-#include <MQClientException.h>
+#include <cstddef>
+#include <exception>
#include <string>
-using namespace std;
-namespace __node_rocketmq__ {
-
-#define NAN_GET_CPRODUCER() \
- RocketMQProducer* _v8_producer = ObjectWrap::Unwrap<RocketMQProducer>(info.Holder()); \
- CProducer* producer_ptr = _v8_producer->GetProducer();
-
-Nan::Persistent<Function> RocketMQProducer::constructor;
+#include <napi.h>
-RocketMQProducer::RocketMQProducer(const char* group_id, const char* instance_name)
-{
- producer_ptr = CreateProducer(group_id);
- if(instance_name)
- {
- SetProducerInstanceName(producer_ptr, instance_name);
- }
-}
+#include <ClientRPCHook.h>
+#include <LoggerConfig.h>
+#include <MQException.h>
+#include <MQMessage.h>
+#include <SendCallback.h>
-RocketMQProducer::~RocketMQProducer()
-{
- try
- {
- ShutdownProducer(producer_ptr);
- }
- catch (...)
- {
- //
- }
+namespace __node_rocketmq__ {
- DestroyProducer(producer_ptr);
+// Defines the JavaScript class "RocketMQProducer" with the instance methods
+// start, shutdown, send and setSessionCredentials, and publishes it on
+// `exports` under the key "Producer". Returns `exports`.
+Napi::Object RocketMQProducer::Init(Napi::Env env, Napi::Object exports) {
+ Napi::Function func =
+ DefineClass(env,
+ "RocketMQProducer",
+ {
+ InstanceMethod<&RocketMQProducer::Start>("start"),
+ InstanceMethod<&RocketMQProducer::Shutdown>("shutdown"),
+ InstanceMethod<&RocketMQProducer::Send>("send"),
+ InstanceMethod<&RocketMQProducer::SetSessionCredentials>(
+ "setSessionCredentials"),
+ });
+
+ // Keep a persistent reference to the constructor as the env's instance data.
+ // NOTE(review): RocketMQPushConsumer::Init makes the same SetInstanceData
+ // call with its own constructor; presumably only one pointer is kept per
+ // env - confirm which one is intended to survive.
+ Napi::FunctionReference* constructor = new Napi::FunctionReference();
+ *constructor = Napi::Persistent(func);
+ env.SetInstanceData<Napi::FunctionReference>(constructor);
+
+ exports.Set("Producer", func);
+ return exports;
}
-void RocketMQProducer::SetOptions(Local<Object> options)
-{
- // set name server
- Local<Value> _name_server_v = Nan::Get(options, Nan::New<String>("nameServer").ToLocalChecked()).ToLocalChecked();
- if(_name_server_v->IsString())
- {
- Nan::Utf8String namesrv(_name_server_v);
- SetProducerNameServerAddress(producer_ptr, *namesrv);
- }
-
- // set group name
- Local<Value> _group_name_v = Nan::Get(options, Nan::New<String>("groupName").ToLocalChecked()).ToLocalChecked();
- if(_group_name_v->IsString())
- {
- Nan::Utf8String group_name(_group_name_v);
- SetProducerGroupName(producer_ptr, *group_name);
- }
+RocketMQProducer::RocketMQProducer(const Napi::CallbackInfo& info)
+ : Napi::ObjectWrap<RocketMQProducer>(info), producer_("") {
+ const Napi::Value group_name = info[0];
+ if (group_name.IsString()) {
+ producer_.set_group_name(group_name.ToString());
+ }
- // set log num & single log size
- int file_num = 3;
- int64 file_size = 104857600;
- Local<Value> _log_file_num_v = Nan::Get(options, Nan::New<String>("logFileNum").ToLocalChecked()).ToLocalChecked();
- Local<Value> _log_file_size_v = Nan::Get(options, Nan::New<String>("logFileSize").ToLocalChecked()).ToLocalChecked();
- if(_log_file_num_v->IsNumber())
- {
- file_num = _log_file_num_v->Int32Value();
- }
- if(_log_file_size_v->IsNumber())
- {
- file_size = _log_file_size_v->Int32Value();
- }
- SetProducerLogFileNumAndSize(producer_ptr, file_num, file_size);
-
- // set log level
- Local<Value> _log_level_v = Nan::Get(options, Nan::New<String>("logLevel").ToLocalChecked()).ToLocalChecked();
- if(_log_level_v->IsNumber())
- {
- int level = _log_level_v->Int32Value();
- SetProducerLogLevel(producer_ptr, (CLogLevel)level);
- }
+ const Napi::Value instance_name = info[1];
+ if (instance_name.IsString()) {
+ producer_.set_instance_name(instance_name.ToString());
+ }
- // set compress level
- Local<Value> _compress_level_v = Nan::Get(options, Nan::New<String>("compressLevel").ToLocalChecked()).ToLocalChecked();
- if(_compress_level_v->IsNumber()) {
- int level = _compress_level_v->Int32Value();
- SetProducerCompressLevel(producer_ptr, level);
- }
+ const Napi::Value options = info[2];
+ if (options.IsObject()) {
+ // try to set options
+ SetOptions(options.ToObject());
+ }
+}
- // set send message timeout
- Local<Value> _send_message_timeout_v = Nan::Get(options, Nan::New<String>("sendMessageTimeout").ToLocalChecked()).ToLocalChecked();
- if(_send_message_timeout_v->IsNumber())
- {
- int timeout = _send_message_timeout_v->Int32Value();
- SetProducerSendMsgTimeout(producer_ptr, timeout);
- }
+RocketMQProducer::~RocketMQProducer() {
+ producer_.shutdown();
+}
- // set max message size
- Local<Value> _max_message_size_v = Nan::Get(options, Nan::New<String>("maxMessageSize").ToLocalChecked()).ToLocalChecked();
- if(_max_message_size_v->IsNumber())
- {
- int size = _max_message_size_v->Int32Value();
- SetProducerMaxMessageSize(producer_ptr, size);
+// Applies the recognised keys of a JavaScript options object. nameServer,
+// groupName, maxMessageSize, compressLevel and sendMessageTimeout are set on
+// producer_; logLevel, logDir, logFileSize and logFileNum are set on the
+// logger config returned by rocketmq::GetDefaultLoggerConfig(). A key whose
+// value does not have the expected type (string or number) is skipped.
+void RocketMQProducer::SetOptions(const Napi::Object& options) {
+ // set name server
+ Napi::Value name_server = options.Get("nameServer");
+ if (name_server.IsString()) {
+ producer_.set_namesrv_addr(name_server.ToString());
+ }
+
+ // set group name
+ Napi::Value group_name = options.Get("groupName");
+ if (group_name.IsString()) {
+ producer_.set_group_name(group_name.ToString());
+ }
+
+ // set max message size
+ Napi::Value max_message_size = options.Get("maxMessageSize");
+ if (max_message_size.IsNumber()) {
+ producer_.set_max_message_size(max_message_size.ToNumber());
+ }
+
+ // set compress level
+ Napi::Value compress_level = options.Get("compressLevel");
+ if (compress_level.IsNumber()) {
+ producer_.set_compress_level(compress_level.ToNumber());
+ }
+
+ // set send message timeout
+ Napi::Value send_message_timeout = options.Get("sendMessageTimeout");
+ if (send_message_timeout.IsNumber()) {
+ producer_.set_send_msg_timeout(send_message_timeout.ToNumber());
+ }
+
+ // set log level; values outside [0, LOG_LEVEL_LEVEL_NUM) are ignored
+ Napi::Value log_level = options.Get("logLevel");
+ if (log_level.IsNumber()) {
+ int32_t level = log_level.ToNumber();
+ if (level >= 0 && level < rocketmq::LogLevel::LOG_LEVEL_LEVEL_NUM) {
+ rocketmq::GetDefaultLoggerConfig().set_level(
+ static_cast<rocketmq::LogLevel>(level));
}
+ }
+
+ // set log directory
+ Napi::Value log_dir = options.Get("logDir");
+ if (log_dir.IsString()) {
+ rocketmq::GetDefaultLoggerConfig().set_path(log_dir.ToString());
+ }
+
+ // set log file size (set_file_size, not set_file_count: the count is
+ // configured from logFileNum below)
+ Napi::Value log_file_size = options.Get("logFileSize");
+ if (log_file_size.IsNumber()) {
+ rocketmq::GetDefaultLoggerConfig().set_file_size(log_file_size.ToNumber());
+ }
+
+ // set log file num
+ Napi::Value log_file_num = options.Get("logFileNum");
+ if (log_file_num.IsNumber()) {
+ rocketmq::GetDefaultLoggerConfig().set_file_count(log_file_num.ToNumber());
+ }
}
-NAN_MODULE_INIT(RocketMQProducer::Init)
-{
- Local<FunctionTemplate> tpl = Nan::New<FunctionTemplate>(New);
- tpl->SetClassName(Nan::New("RocketMQProducer").ToLocalChecked());
- tpl->InstanceTemplate()->SetInternalFieldCount(1);
+Napi::Value RocketMQProducer::SetSessionCredentials(
+ const Napi::CallbackInfo& info) {
+ Napi::String access_key = info[0].As<Napi::String>();
+ Napi::String secret_key = info[1].As<Napi::String>();
+ Napi::String ons_channel = info[2].As<Napi::String>();
- Nan::SetPrototypeMethod(tpl, "start", Start);
- Nan::SetPrototypeMethod(tpl, "shutdown", Shutdown);
- Nan::SetPrototypeMethod(tpl, "send", Send);
- Nan::SetPrototypeMethod(tpl, "setSessionCredentials", SetSessionCredentials);
+ auto rpc_hook = std::make_shared<rocketmq::ClientRPCHook>(
+ rocketmq::SessionCredentials(access_key, secret_key, ons_channel));
+ producer_.setRPCHook(rpc_hook);
- constructor.Reset(tpl->GetFunction());
- Nan::Set(target, Nan::New("Producer").ToLocalChecked(), tpl->GetFunction());
+ return info.Env().Undefined();
}
-NAN_METHOD(RocketMQProducer::New)
-{
- Isolate* isolate = info.GetIsolate();
- Local<Context> context = Context::New(isolate);
-
- if(!info.IsConstructCall())
- {
- const int argc = 3;
- Local<Value> argv[argc] = { info[0], info[1], info[2] };
- Local<Function> _constructor = Nan::New<v8::Function>(constructor);
- info.GetReturnValue().Set(_constructor->NewInstance(context, argc, argv).ToLocalChecked());
- return;
- }
+class ProducerStartWorker : public Napi::AsyncWorker {
+ public:
+ ProducerStartWorker(const Napi::Function& callback,
+ const rocketmq::DefaultMQProducer& producer)
+ : Napi::AsyncWorker(callback), producer_(producer) {}
- Nan::Utf8String group_id(info[0]);
- Nan::Utf8String instance_name(info[1]);
- Local<Object> options = Nan::To<Object>(info[2]).ToLocalChecked();
- RocketMQProducer* producer = new RocketMQProducer(*group_id, info[1]->IsNull() ? NULL : *instance_name);
+ void Execute() override { producer_.start(); }
- producer->Wrap(info.This());
+ private:
+ rocketmq::DefaultMQProducer producer_;
+};
- // try to set options
- try
- {
- producer->SetOptions(options);
- }
- catch (runtime_error e)
- {
- Nan::ThrowError(e.what());
- return;
- }
-
- info.GetReturnValue().Set(info.This());
+// JS `start(callback)`: queues a ProducerStartWorker built from the callback
+// in info[0] and producer_, then returns undefined.
+Napi::Value RocketMQProducer::Start(const Napi::CallbackInfo& info) {
+ // info[0] is converted with As<Napi::Function>(); no IsFunction() check is
+ // made here.
+ Napi::Function callback = info[0].As<Napi::Function>();
+ // NOTE(review): the worker is allocated with new and not deleted in this
+ // function; presumably Napi::AsyncWorker frees itself once done - confirm.
+ auto* worker = new ProducerStartWorker(callback, producer_);
+ worker->Queue();
+ return info.Env().Undefined();
}
-NAN_METHOD(RocketMQProducer::Start)
-{
- NAN_GET_CPRODUCER();
-
- Nan::Callback* callback = (info[0]->IsFunction()) ?
- new Nan::Callback(Nan::To<Function>(info[0]).ToLocalChecked()) :
- NULL;
-
- Nan::AsyncQueueWorker(new ProducerStartOrShutdownWorker(callback, producer_ptr, ProducerWorkerType::START_PRODUCER));
-}
+class ProducerShutdownWorker : public Napi::AsyncWorker {
+ public:
+ ProducerShutdownWorker(const Napi::Function& callback,
+ const rocketmq::DefaultMQProducer& producer)
+ : Napi::AsyncWorker(callback), producer_(producer) {}
-NAN_METHOD(RocketMQProducer::Shutdown)
-{
- NAN_GET_CPRODUCER();
+ void Execute() override { producer_.shutdown(); }
- Nan::Callback* callback = (info[0]->IsFunction()) ?
- new Nan::Callback(Nan::To<Function>(info[0]).ToLocalChecked()) :
- NULL;
+ private:
+ rocketmq::DefaultMQProducer producer_;
+};
- Nan::AsyncQueueWorker(new ProducerStartOrShutdownWorker(callback, producer_ptr, ProducerWorkerType::SHUTDOWN_PRODUCER));
+// JS `shutdown(callback)`: queues a ProducerShutdownWorker built from the
+// callback in info[0] and producer_, then returns undefined.
+Napi::Value RocketMQProducer::Shutdown(const Napi::CallbackInfo& info) {
+ // info[0] is converted with As<Napi::Function>(); no IsFunction() check is
+ // made here.
+ Napi::Function callback = info[0].As<Napi::Function>();
+ // NOTE(review): the worker is allocated with new and not deleted in this
+ // function; presumably Napi::AsyncWorker frees itself once done - confirm.
+ auto* worker = new ProducerShutdownWorker(callback, producer_);
+ worker->Queue();
+ return info.Env().Undefined();
}
-NAN_METHOD(RocketMQProducer::SetSessionCredentials)
-{
- NAN_GET_CPRODUCER();
-
- Nan::Utf8String access_key(info[0]);
- Nan::Utf8String secret_key(info[1]);
- Nan::Utf8String ons_channel(info[2]);
-
- int ret;
- try
- {
- ret = SetProducerSessionCredentials(producer_ptr, *access_key, *secret_key, *ons_channel);
+class ProducerSendCallback : public rocketmq::AutoDeleteSendCallback {
+ private:
+ struct ResultOrException {
+ std::unique_ptr<rocketmq::SendResult> result;
+ std::exception_ptr exception;
+ };
+
+ public:
+ ProducerSendCallback(Napi::Env&& env, Napi::Function&& callback)
+ : callback_(
+ Callback::New(env, callback, "RocketMQ Send Callback", 0, 1)) {}
+
+ ~ProducerSendCallback() { callback_.Release(); }
+
+ void onSuccess(rocketmq::SendResult& send_result) override {
+ auto* data =
+ new ResultOrException{std::unique_ptr<rocketmq::SendResult>(
+ new rocketmq::SendResult(send_result)),
+ nullptr};
+ napi_status status = callback_.BlockingCall(data);
+ if (status != napi_ok) {
+ // TODO: Handle error
+ std::exit(-1);
}
- catch(runtime_error e)
- {
- Nan::ThrowError(e.what());
+ }
+
+ void onException(rocketmq::MQException& exception) noexcept override {
+ auto* data =
+ new ResultOrException{nullptr, std::make_exception_ptr(exception)};
+ napi_status status = callback_.BlockingCall(data);
+ if (status != napi_ok) {
+ // TODO: Handle error
+ std::exit(-1);
}
- catch(std::exception& e)
- {
- Nan::ThrowError(e.what());
+ }
+
+ static void CallJs(Napi::Env env,
+ Napi::Function callback,
+ std::nullptr_t*,
+ ResultOrException* data) {
+ std::unique_ptr<ResultOrException> data_guard(data);
+ if (env != nullptr) {
+ if (callback != nullptr) {
+ if (data->exception) {
+ try {
+ std::rethrow_exception(data->exception);
+ } catch (const std::exception& e) {
+ callback.Call(Napi::Object::New(callback.Env()),
+ {Napi::Error::New(env, e.what()).Value()});
+ }
+ } else {
+ callback.Call(Napi::Object::New(callback.Env()),
+ {env.Undefined(),
+ Napi::Number::New(env, data->result->send_status()),
+ Napi::String::New(env, data->result->msg_id()),
+ Napi::Number::New(env, data->result->queue_offset())});
+ }
+ }
}
- catch(rocketmq::MQException& e)
- {
- Nan::ThrowError(e.what());
+ }
+
+ private:
+ using Callback = Napi::TypedThreadSafeFunction<std::nullptr_t,
+ ResultOrException,
+ &ProducerSendCallback::CallJs>;
+
+ Callback callback_;
+};
+
+Napi::Value RocketMQProducer::Send(const Napi::CallbackInfo& info) {
+ rocketmq::MQMessage message = [&]() {
+ Napi::String topic = info[0].As<Napi::String>();
+ Napi::Value body = info[1];
+ if (body.IsString()) {
+ return rocketmq::MQMessage(topic, body.ToString());
+ } else {
+ Napi::Buffer<char> buffer = body.As<Napi::Buffer<char>>();
+ return rocketmq::MQMessage(topic,
+ std::string(buffer.Data(), buffer.Length()));
}
- info.GetReturnValue().Set(ret);
-}
-
-NAN_METHOD(RocketMQProducer::Send)
-{
- Nan::Utf8String topic(info[0]);
- Local<Object> options = Nan::To<Object>(info[2]).ToLocalChecked();
-
- CMessage* msg = CreateMessage(*topic);
+ }();
- Local<Value> _tags_to_be_checked = Nan::Get(options, Nan::New<String>("tags").ToLocalChecked()).ToLocalChecked();
- Local<Value> _keys_to_be_checked = Nan::Get(options, Nan::New<String>("keys").ToLocalChecked()).ToLocalChecked();
+ const Napi::Value options_v = info[2];
+ if (options_v.IsObject()) {
+ const Napi::Object options = options_v.ToObject();
- if(_tags_to_be_checked->IsString())
- {
- Nan::Utf8String tags(_tags_to_be_checked);
- SetMessageTags(msg, *tags);
+ Napi::Value tags = options.Get("tags");
+ if (tags.IsString()) {
+ message.set_tags(tags.ToString());
}
- if(_keys_to_be_checked->IsString())
- {
- Nan::Utf8String keys(_keys_to_be_checked);
- SetMessageKeys(msg, *keys);
+ Napi::Value keys = options.Get("keys");
+ if (keys.IsString()) {
+ message.set_keys(keys.ToString());
}
+ }
- // set message body:
- // 1. if it's a string, call `SetMessageBody`;
- // 2. if it's a buffer, call `SetByteMessageBody`.
- if(info[1]->IsString())
- {
- Nan::Utf8String body(info[1]);
- SetMessageBody(msg, *body);
- }
- else
- {
- Local<Object> node_buff_object = Nan::To<Object>(info[1]).ToLocalChecked();
- unsigned int length = node::Buffer::Length(node_buff_object);
- const char* buff = node::Buffer::Data(node_buff_object);
- SetByteMessageBody(msg, buff, length);
- }
+ auto* send_callback =
+ new ProducerSendCallback(info.Env(), info[3].As<Napi::Function>());
+ producer_.send(message, send_callback);
- Nan::Callback* callback = (info[3]->IsFunction()) ?
- new Nan::Callback(Nan::To<Function>(info[3]).ToLocalChecked()) :
- NULL;
-
- RocketMQProducer* producer = ObjectWrap::Unwrap<RocketMQProducer>(info.Holder());
- Nan::AsyncQueueWorker(new ProducerSendMessageWorker(callback, producer, msg));
+ return info.Env().Undefined();
}
-}
+} // namespace __node_rocketmq__
diff --git a/src/producer.h b/src/producer.h
index d20cd29..780efa4 100644
--- a/src/producer.h
+++ b/src/producer.h
@@ -17,46 +17,34 @@
#ifndef __ROCKETMQ_PRODUCER_H__
#define __ROCKETMQ_PRODUCER_H__
-#include <CProducer.h>
-#include <nan.h>
+#include <napi.h>
-namespace __node_rocketmq__ {
+#include <DefaultMQProducer.h>
-using v8::Context;
-using v8::Function;
-using v8::FunctionTemplate;
-using v8::Isolate;
-using v8::Local;
-using v8::Object;
-using v8::String;
-using v8::Value;
+namespace __node_rocketmq__ {
-class RocketMQProducer : public Nan::ObjectWrap {
-public:
- static NAN_MODULE_INIT(Init);
+class RocketMQProducer : public Napi::ObjectWrap<RocketMQProducer> {
+ public:
+ static Napi::Object Init(Napi::Env env, Napi::Object exports);
-public:
- CProducer* GetProducer() { return producer_ptr; }
+ RocketMQProducer(const Napi::CallbackInfo& info);
+ ~RocketMQProducer();
-private:
- explicit RocketMQProducer(const char* group_id, const char* instance_name);
- ~RocketMQProducer();
+ private:
+ Napi::Value SetSessionCredentials(const Napi::CallbackInfo& info);
- static NAN_METHOD(New);
- static NAN_METHOD(Start);
- static NAN_METHOD(Shutdown);
- static NAN_METHOD(Send);
- static NAN_METHOD(SetSessionCredentials);
+ Napi::Value Start(const Napi::CallbackInfo& info);
+ Napi::Value Shutdown(const Napi::CallbackInfo& info);
- static Nan::Persistent<Function> constructor;
+ Napi::Value Send(const Napi::CallbackInfo& info);
-private:
- void SetOptions(Local<Object> options);
+ private:
+ void SetOptions(const Napi::Object& options);
-private:
- CProducer* producer_ptr;
+ private:
+ rocketmq::DefaultMQProducer producer_;
};
-}
+} // namespace __node_rocketmq__
#endif
diff --git a/src/push_consumer.cpp b/src/push_consumer.cpp
index 074a64d..63a786f 100644
--- a/src/push_consumer.cpp
+++ b/src/push_consumer.cpp
@@ -14,381 +14,259 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-#include <map>
#include "push_consumer.h"
-#include "consumer_ack.h"
-#include "workers/push_consumer/start_or_shutdown.h"
-
-using namespace std;
-
-namespace __node_rocketmq__ {
-struct MessageHandlerParam
-{
- RocketMQPushConsumer* consumer;
- ConsumerAckInner* ack;
- CMessageExt* msg;
-};
-char message_handler_param_keys[5][8] = { "topic", "tags", "keys", "body", "msgId" };
-
-uv_mutex_t _get_msg_ext_column_lock;
-
-map<CPushConsumer*, RocketMQPushConsumer*> _push_consumer_map;
+#include <exception>
+#include <future>
-#define NAN_GET_CPUSHCONSUMER() \
- RocketMQPushConsumer* _v8_consumer = ObjectWrap::Unwrap<RocketMQPushConsumer>(info.Holder()); \
- CPushConsumer* consumer_ptr = _v8_consumer->GetConsumer();
+#include <napi.h>
-Nan::Persistent<Function> RocketMQPushConsumer::constructor;
+#include <ClientRPCHook.h>
+#include <LoggerConfig.h>
+#include <MQMessageListener.h>
-RocketMQPushConsumer::RocketMQPushConsumer(const char* group_id, const char* instance_name) :
- consumer_ptr(NULL)
-{
- consumer_ptr = CreatePushConsumer(group_id);
+#include "consumer_ack.h"
- if(instance_name)
- {
- SetPushConsumerInstanceName(consumer_ptr, instance_name);
- }
+using namespace std;
- _push_consumer_map[consumer_ptr] = this;
+namespace __node_rocketmq__ {
- RegisterMessageCallback(consumer_ptr, RocketMQPushConsumer::OnMessage);
+// Defines the JavaScript class "RocketMQPushConsumer" with the instance
+// methods start, shutdown, subscribe, setListener and setSessionCredentials,
+// and publishes it on `exports` under the key "PushConsumer". Returns
+// `exports`.
+Napi::Object RocketMQPushConsumer::Init(Napi::Env env, Napi::Object exports) {
+ Napi::Function func = DefineClass(
+ env,
+ "RocketMQPushConsumer",
+ {
+ InstanceMethod<&RocketMQPushConsumer::Start>("start"),
+ InstanceMethod<&RocketMQPushConsumer::Shutdown>("shutdown"),
+ InstanceMethod<&RocketMQPushConsumer::Subscribe>("subscribe"),
+ InstanceMethod<&RocketMQPushConsumer::SetListener>("setListener"),
+ InstanceMethod<&RocketMQPushConsumer::SetSessionCredentials>(
+ "setSessionCredentials"),
+ });
+
+ // Keep a persistent reference to the constructor as the env's instance data.
+ // NOTE(review): RocketMQProducer::Init makes the same SetInstanceData call
+ // with its own constructor; presumably only one pointer is kept per env -
+ // confirm which one is intended to survive.
+ Napi::FunctionReference* constructor = new Napi::FunctionReference();
+ *constructor = Napi::Persistent(func);
+ env.SetInstanceData<Napi::FunctionReference>(constructor);
+
+ exports.Set("PushConsumer", func);
+ return exports;
}
-RocketMQPushConsumer::~RocketMQPushConsumer()
-{
- try
- {
- ShutdownPushConsumer(consumer_ptr);
- auto it = _push_consumer_map.find(consumer_ptr);
- if(it != _push_consumer_map.end())
- {
- _push_consumer_map.erase(consumer_ptr);
- }
- }
- catch(...)
- {
- //
- }
+RocketMQPushConsumer::RocketMQPushConsumer(const Napi::CallbackInfo& info)
+ : Napi::ObjectWrap<RocketMQPushConsumer>(info), consumer_("") {
+ const Napi::Value group_name = info[0];
+ if (group_name.IsString()) {
+ consumer_.set_group_name(group_name.ToString());
+ }
- DestroyPushConsumer(consumer_ptr);
- consumer_ptr = NULL;
-}
-
-void RocketMQPushConsumer::SetOptions(Local<Object> options)
-{
- // set name server
- Local<Value> _name_server_v = Nan::Get(options, Nan::New<String>("nameServer").ToLocalChecked()).ToLocalChecked();
- if(_name_server_v->IsString())
- {
- Nan::Utf8String namesrv(_name_server_v);
- SetPushConsumerNameServerAddress(consumer_ptr, *namesrv);
- }
+ const Napi::Value instance_name = info[1];
+ if (instance_name.IsString()) {
+ consumer_.set_instance_name(instance_name.ToString());
+ }
- // set thread count
- Local<Value> _thread_count_v = Nan::Get(options, Nan::New<String>("threadCount").ToLocalChecked()).ToLocalChecked();
- if(_thread_count_v->IsNumber())
- {
- int thread_count = Nan::To<int32_t>(_thread_count_v).FromJust();
- if(thread_count > 0)
- {
- SetPushConsumerThreadCount(consumer_ptr, thread_count);
- }
- }
-
- // set message batch max size
- Local<Value> _max_batch_size_v = Nan::Get(options, Nan::New<String>("maxBatchSize").ToLocalChecked()).ToLocalChecked();
- if(_max_batch_size_v->IsNumber())
- {
- int max_batch_size = Nan::To<int32_t>(_max_batch_size_v).FromJust();
- if(max_batch_size > 0)
- {
- SetPushConsumerMessageBatchMaxSize(consumer_ptr, max_batch_size);
- }
- }
-
- // set log num & single log size
- int file_num = 3;
- int64 file_size = 104857600;
- Local<Value> _log_file_num_v = Nan::Get(options, Nan::New<String>("logFileNum").ToLocalChecked()).ToLocalChecked();
- Local<Value> _log_file_size_v = Nan::Get(options, Nan::New<String>("logFileSize").ToLocalChecked()).ToLocalChecked();
- if(_log_file_num_v->IsNumber())
- {
- file_num = _log_file_num_v->Int32Value();
- }
- if(_log_file_size_v->IsNumber())
- {
- file_size = _log_file_size_v->Int32Value();
- }
- SetPushConsumerLogFileNumAndSize(consumer_ptr, file_num, file_size);
-
- // set log level
- Local<Value> _log_level_v = Nan::Get(options, Nan::New<String>("logLevel").ToLocalChecked()).ToLocalChecked();
- if(_log_level_v->IsNumber())
- {
- int level = _log_level_v->Int32Value();
- SetPushConsumerLogLevel(consumer_ptr, (CLogLevel) level);
- }
+ const Napi::Value options = info[2];
+ if (options.IsObject()) {
+ // try to set options
+ SetOptions(options.ToObject());
+ }
}
-NAN_MODULE_INIT(RocketMQPushConsumer::Init)
-{
- uv_mutex_init(&_get_msg_ext_column_lock);
- Local<FunctionTemplate> tpl = Nan::New<FunctionTemplate>(New);
- tpl->SetClassName(Nan::New("RocketMQPushConsumer").ToLocalChecked());
- tpl->InstanceTemplate()->SetInternalFieldCount(1);
-
- Nan::SetPrototypeMethod(tpl, "start", Start);
- Nan::SetPrototypeMethod(tpl, "shutdown", Shutdown);
- Nan::SetPrototypeMethod(tpl, "subscribe", Subscribe);
- Nan::SetPrototypeMethod(tpl, "setListener", SetListener);
- Nan::SetPrototypeMethod(tpl, "setSessionCredentials", SetSessionCredentials);
-
- constructor.Reset(tpl->GetFunction());
- Nan::Set(target, Nan::New("PushConsumer").ToLocalChecked(), tpl->GetFunction());
+RocketMQPushConsumer::~RocketMQPushConsumer() {
+ consumer_.shutdown();
}
-NAN_METHOD(RocketMQPushConsumer::New)
-{
- Isolate* isolate = info.GetIsolate();
- Local<Context> context = Context::New(isolate);
-
- if(!info.IsConstructCall())
- {
- const int argc = 3;
- Local<Value> argv[argc] = { info[0], info[1], info[2] };
- Local<Function> _constructor = Nan::New<v8::Function>(constructor);
- info.GetReturnValue().Set(_constructor->NewInstance(context, argc, argv).ToLocalChecked());
- return;
- }
-
- Nan::Utf8String v8_group_id(info[0]);
- Nan::Utf8String v8_instance_name(info[1]);
- string group_id = *v8_group_id;
- string instance_name = *v8_instance_name;
- Local<Object> options = Nan::To<Object>(info[2]).ToLocalChecked();
- RocketMQPushConsumer* consumer = new RocketMQPushConsumer(group_id.c_str(), info[1]->IsNull() ? NULL : instance_name.c_str());
-
- consumer->Wrap(info.This());
-
- // try to set options
- try
- {
- consumer->SetOptions(options);
- }
- catch(const runtime_error e)
- {
- Nan::ThrowError(e.what());
- return;
- }
- catch(const std::exception& e)
- {
- Nan::ThrowError(e.what());
- return;
+// Applies the recognised keys of a JavaScript options object. nameServer,
+// groupName, threadCount and maxBatchSize are set on consumer_; logLevel,
+// logDir, logFileSize and logFileNum are set on the logger config returned by
+// rocketmq::GetDefaultLoggerConfig(). A key whose value does not have the
+// expected type (string or number) is skipped.
+void RocketMQPushConsumer::SetOptions(const Napi::Object& options) {
+ // set name server
+ Napi::Value name_server = options.Get("nameServer");
+ if (name_server.IsString()) {
+ consumer_.set_namesrv_addr(name_server.ToString());
+ }
+
+ // set group name
+ Napi::Value group_name = options.Get("groupName");
+ if (group_name.IsString()) {
+ consumer_.set_group_name(group_name.ToString());
+ }
+
+ // set thread count
+ Napi::Value thread_count = options.Get("threadCount");
+ if (thread_count.IsNumber()) {
+ consumer_.set_consume_thread_nums(thread_count.ToNumber());
+ }
+
+ // set message batch max size
+ Napi::Value max_batch_size = options.Get("maxBatchSize");
+ if (max_batch_size.IsNumber()) {
+ consumer_.set_consume_message_batch_max_size(max_batch_size.ToNumber());
+ }
+
+ // set log level; values outside [0, LOG_LEVEL_LEVEL_NUM) are ignored
+ Napi::Value log_level = options.Get("logLevel");
+ if (log_level.IsNumber()) {
+ int32_t level = log_level.ToNumber();
+ if (level >= 0 && level < rocketmq::LogLevel::LOG_LEVEL_LEVEL_NUM) {
+ rocketmq::GetDefaultLoggerConfig().set_level(
+ static_cast<rocketmq::LogLevel>(level));
}
-
- info.GetReturnValue().Set(info.This());
-}
-
-NAN_METHOD(RocketMQPushConsumer::Start)
-{
- NAN_GET_CPUSHCONSUMER();
-
- Nan::Callback* callback = (info[0]->IsFunction()) ?
- new Nan::Callback(Nan::To<Function>(info[0]).ToLocalChecked()) :
- NULL;
-
- Nan::AsyncQueueWorker(new PushConsumerStartOrShutdownWorker(callback, consumer_ptr, PushConsumerWorkerType::START_PUSH_CONSUMER));
+ }
+
+ // set log directory
+ Napi::Value log_dir = options.Get("logDir");
+ if (log_dir.IsString()) {
+ rocketmq::GetDefaultLoggerConfig().set_path(log_dir.ToString());
+ }
+
+ // set log file size (set_file_size, not set_file_count: the count is
+ // configured from logFileNum below)
+ Napi::Value log_file_size = options.Get("logFileSize");
+ if (log_file_size.IsNumber()) {
+ rocketmq::GetDefaultLoggerConfig().set_file_size(log_file_size.ToNumber());
+ }
+
+ // set log file num
+ Napi::Value log_file_num = options.Get("logFileNum");
+ if (log_file_num.IsNumber()) {
+ rocketmq::GetDefaultLoggerConfig().set_file_count(log_file_num.ToNumber());
+ }
}
-NAN_METHOD(RocketMQPushConsumer::Shutdown)
-{
- NAN_GET_CPUSHCONSUMER();
+Napi::Value RocketMQPushConsumer::SetSessionCredentials(
+ const Napi::CallbackInfo& info) {
+ Napi::String access_key = info[0].As<Napi::String>();
+ Napi::String secret_key = info[1].As<Napi::String>();
+ Napi::String ons_channel = info[2].As<Napi::String>();
- Nan::Callback* callback = (info[0]->IsFunction()) ?
- new Nan::Callback(Nan::To<Function>(info[0]).ToLocalChecked()) :
- NULL;
+ auto rpc_hook = std::make_shared<rocketmq::ClientRPCHook>(
+ rocketmq::SessionCredentials(access_key, secret_key, ons_channel));
+ consumer_.setRPCHook(rpc_hook);
- Nan::AsyncQueueWorker(new PushConsumerStartOrShutdownWorker(callback, consumer_ptr, PushConsumerWorkerType::SHUTDOWN_PUSH_CONSUMER));
+ return info.Env().Undefined();
}
-NAN_METHOD(RocketMQPushConsumer::Subscribe)
-{
- NAN_GET_CPUSHCONSUMER();
+class ConsumerStartWorker : public Napi::AsyncWorker {
+ public:
+ ConsumerStartWorker(const Napi::Function& callback,
+ const rocketmq::DefaultMQPushConsumer& consumer)
+ : Napi::AsyncWorker(callback), consumer_(consumer) {}
- Nan::Utf8String v8_topic(info[0]);
- Nan::Utf8String v8_expression(info[1]);
- string topic = *v8_topic;
- string expression = *v8_expression;
+ void Execute() override { consumer_.start(); }
- int ret;
- try
- {
- ret = ::Subscribe(consumer_ptr, topic.c_str(), expression.c_str());
- }
- catch(const runtime_error e)
- {
- Nan::ThrowError(e.what());
- return;
- }
- catch(const std::exception& e)
- {
- Nan::ThrowError(e.what());
- return;
- }
-
- info.GetReturnValue().Set(ret);
-}
-
-NAN_METHOD(RocketMQPushConsumer::SetListener)
-{
- RocketMQPushConsumer* consumer = ObjectWrap::Unwrap<RocketMQPushConsumer>(info.Holder());
- if(!consumer->listener_func.IsEmpty())
- {
- consumer->listener_func.Reset();
- }
+ private:
+ rocketmq::DefaultMQPushConsumer consumer_;
+};
- consumer->listener_func.Reset(Nan::To<Function>(info[0]).ToLocalChecked());
+// JS `start(callback)`: queues a ConsumerStartWorker built from the callback
+// in info[0] and consumer_, then returns undefined.
+Napi::Value RocketMQPushConsumer::Start(const Napi::CallbackInfo& info) {
+ // info[0] is converted with As<Napi::Function>(); no IsFunction() check is
+ // made here.
+ Napi::Function callback = info[0].As<Napi::Function>();
+ // NOTE(review): the worker is allocated with new and not deleted in this
+ // function; presumably Napi::AsyncWorker frees itself once done - confirm.
+ auto* worker = new ConsumerStartWorker(callback, consumer_);
+ worker->Queue();
+ return info.Env().Undefined();
}
-NAN_METHOD(RocketMQPushConsumer::SetSessionCredentials)
-{
- NAN_GET_CPUSHCONSUMER();
+class ConsumerShutdownWorker : public Napi::AsyncWorker {
+ public:
+ ConsumerShutdownWorker(const Napi::Function& callback,
+ const rocketmq::DefaultMQPushConsumer& consumer)
+ : Napi::AsyncWorker(callback), consumer_(consumer) {}
- Nan::Utf8String access_key(info[0]);
- Nan::Utf8String secret_key(info[1]);
- Nan::Utf8String ons_channel(info[2]);
+ void Execute() override { consumer_.shutdown(); }
- int ret;
- try
- {
- ret = SetPushConsumerSessionCredentials(consumer_ptr, *access_key, *secret_key, *ons_channel);
- }
- catch(const runtime_error e)
- {
- Nan::ThrowError(e.what());
- return;
- }
- catch(const std::exception& e)
- {
- Nan::ThrowError(e.what());
- return;
- }
+ private:
+ rocketmq::DefaultMQPushConsumer consumer_;
+};
- info.GetReturnValue().Set(ret);
+// JS `shutdown(callback)`: queues a ConsumerShutdownWorker built from the
+// callback in info[0] and consumer_, then returns undefined.
+Napi::Value RocketMQPushConsumer::Shutdown(const Napi::CallbackInfo& info) {
+ // info[0] is converted with As<Napi::Function>(); no IsFunction() check is
+ // made here.
+ Napi::Function callback = info[0].As<Napi::Function>();
+ // NOTE(review): the worker is allocated with new and not deleted in this
+ // function; presumably Napi::AsyncWorker frees itself once done - confirm.
+ auto* worker = new ConsumerShutdownWorker(callback, consumer_);
+ worker->Queue();
+ return info.Env().Undefined();
}
-string RocketMQPushConsumer::GetMessageColumn(char* name, CMessageExt* msg)
-{
- const char* orig = NULL;
-
- uv_mutex_lock(&_get_msg_ext_column_lock);
- switch(name[0])
- {
- // topic / tags
- case 't':
- orig = name[1] == 'o' ? GetMessageTopic(msg) : GetMessageTags(msg);
- break;
-
- // keys
- case 'k':
- orig = GetMessageKeys(msg);
- break;
-
- // body
- case 'b':
- orig = GetMessageBody(msg);
- break;
-
- // msgId
- case 'm':
- orig = GetMessageId(msg);
- break;
-
- default:
- orig = NULL;
- break;
- }
+Napi::Value RocketMQPushConsumer::Subscribe(const Napi::CallbackInfo& info) {
+ Napi::String topic = info[0].As<Napi::String>();
+ Napi::String expression = info[1].As<Napi::String>();
- uv_mutex_unlock(&_get_msg_ext_column_lock);
+ consumer_.subscribe(topic, expression);
- if(!orig) return "";
- return orig;
+ return info.Env().Undefined();
}
-void close_async_done(uv_handle_t* handle)
-{
- free(handle);
-}
-
-void RocketMQPushConsumer::HandleMessageInEventLoop(uv_async_t* async)
-{
- Nan::HandleScope scope;
-
- Isolate* isolate = Isolate::GetCurrent();
- Local<Context> context = isolate->GetCurrentContext();
-
- MessageHandlerParam* param = (MessageHandlerParam*)(async->data);
- RocketMQPushConsumer* consumer = param->consumer;
- ConsumerAckInner* ack_inner = param->ack;
- CMessageExt* msg = param->msg;
-
- // create the JavaScript ack object and then set inner ack object
- Local<Function> cons = Nan::New<Function>(ConsumerAck::GetConstructor());
- Local<Object> ack_obj = cons->NewInstance(context, 0, 0).ToLocalChecked();
- ConsumerAck* ack = ObjectWrap::Unwrap<ConsumerAck>(ack_obj);
- ack->SetInner(ack_inner);
-
- // TODO: const char *GetMessageProperty(CMessageExt *msgExt, const char *key);
- Local<Object> result = Nan::New<Object>();
- for(int i = 0; i < 5; i++)
- {
- Nan::Set(
- result,
- Nan::New(message_handler_param_keys[i]).ToLocalChecked(),
- Nan::New(RocketMQPushConsumer::GetMessageColumn(message_handler_param_keys[i], msg)).ToLocalChecked());
+class ConsumerMessageListener : public rocketmq::MessageListenerConcurrently {
+ private:
+ struct MessageAndPromise {
+ rocketmq::MQMessageExt message;
+ std::promise<bool> promise;
+ };
+
+ public:
+ ConsumerMessageListener(Napi::Env& env, Napi::Function&& callback)
+ : listener_(
+ Listener::New(env, callback, "RocketMQ Message Listener", 0, 1)) {}
+
+ ~ConsumerMessageListener() { listener_.Release(); }
+
+ // Hands each message to the JavaScript listener through listener_ and waits
+ // on the future paired with the promise passed along in `data`. Returns
+ // RECONSUME_LATER as soon as one message cannot be queued, its future yields
+ // false, or waiting on it throws a std::exception; returns CONSUME_SUCCESS
+ // when every message in `msgs` was acknowledged with true.
+ rocketmq::ConsumeStatus consumeMessage(
+ std::vector<rocketmq::MQMessageExt>& msgs) override {
+ for (auto& msg : msgs) {
+ MessageAndPromise data{msg, std::promise<bool>()};
+ auto future = data.promise.get_future();
+ // If the call is not queued, CallJs never runs and the promise is never
+ // fulfilled, so waiting on the future would block this thread forever.
+ if (listener_.BlockingCall(&data) != napi_ok) {
+ return rocketmq::ConsumeStatus::RECONSUME_LATER;
+ }
+ try {
+ if (!future.get()) {
+ return rocketmq::ConsumeStatus::RECONSUME_LATER;
+ }
+ } catch (const std::exception& e) {
+ return rocketmq::ConsumeStatus::RECONSUME_LATER;
+ }
}
-
- Local<Value> argv[2] = {
- result,
- ack_obj
- };
- Nan::Callback* callback = consumer->GetListenerFunction();
- callback->Call(2, argv);
-
- uv_close((uv_handle_t*)async, close_async_done);
-}
-
-int RocketMQPushConsumer::OnMessage(CPushConsumer* consumer_ptr, CMessageExt* msg_ext)
-{
- RocketMQPushConsumer* consumer = _push_consumer_map[consumer_ptr];
- if (!consumer)
- {
- // TODO: error handle
- return CConsumeStatus::E_RECONSUME_LATER;
+ return rocketmq::ConsumeStatus::CONSUME_SUCCESS;
+ };
+
+ static void CallJs(Napi::Env env,
+ Napi::Function listener,
+ std::nullptr_t*,
+ MessageAndPromise* data) {
+ if (env != nullptr) {
+ if (listener != nullptr) {
+ Napi::Object message = Napi::Object::New(env);
+ message.Set("topic", data->message.topic());
+ message.Set("tags", data->message.tags());
+ message.Set("keys", data->message.keys());
+ message.Set("body", data->message.body());
+ message.Set("msgId", data->message.msg_id());
+
+ Napi::Object ack = ConsumerAck::NewInstance(env);
+ ConsumerAck* consumer_ack = Napi::ObjectWrap<ConsumerAck>::Unwrap(ack);
+ consumer_ack->SetPromise(std::move(data->promise));
+
+ try {
+ listener.Call(Napi::Object::New(listener.Env()), {message, ack});
+ } catch (const Napi::Error& e) {
+ try {
+ consumer_ack->Done(std::current_exception());
+ } catch (const std::future_error&) {
+ // ignore
+ }
+ }
+ return;
+ }
}
+ data->promise.set_value(false);
+ }
- ConsumerAckInner ack_inner;
-
- // create async parameter
- MessageHandlerParam param;
- param.consumer = consumer;
- param.ack = &ack_inner;
- param.msg = msg_ext;
+ private:
+ using Listener =
+ Napi::TypedThreadSafeFunction<std::nullptr_t,
+ MessageAndPromise,
+ &ConsumerMessageListener::CallJs>;
- // create a new async handler and bind with `RocketMQPushConsumer::HandleMessageInEventLoop`
- uv_async_t* async = (uv_async_t*)malloc(sizeof(uv_async_t));
- uv_async_init(uv_default_loop(), async, RocketMQPushConsumer::HandleMessageInEventLoop);
- async->data = (void*)(&param);
-
- // send async handler
- uv_async_send(async);
-
- // wait for result
- CConsumeStatus status = ack_inner.WaitResult();
+ Listener listener_;
+};
- return status;
+// JS `setListener(fn)`: wraps the function in info[0] in a new
+// ConsumerMessageListener, stores it in listener_ and registers it with
+// consumer_. Returns undefined.
+Napi::Value RocketMQPushConsumer::SetListener(const Napi::CallbackInfo& info) {
+ Napi::Env env = info.Env();
+ // reset() replaces whatever listener_ held before.
+ // NOTE(review): consumer_ is given the raw pointer from listener_.get();
+ // presumably listener_ is an owning smart pointer that must outlive the
+ // registration - confirm against the member declaration in the header.
+ listener_.reset(
+ new ConsumerMessageListener(env, info[0].As<Napi::Function>()));
+ consumer_.registerMessageListener(listener_.get());
+ return env.Undefined();
}
-}
+} // namespace __node_rocketmq__
diff --git a/src/push_consumer.h b/src/push_consumer.h
index 6250d4e..47b3d14 100644
--- a/src/push_consumer.h
+++ b/src/push_consumer.h
@@ -17,62 +17,40 @@
#ifndef __ROCKETMQ_PUSH_CONSUMER_H__
#define __ROCKETMQ_PUSH_CONSUMER_H__
-#include <CPushConsumer.h>
-#include <uv.h>
-#include <nan.h>
#include <string>
-namespace __node_rocketmq__ {
+#include <napi.h>
+
+#include <DefaultMQPushConsumer.h>
-using v8::Context;
-using v8::Function;
-using v8::FunctionTemplate;
-using v8::Isolate;
-using v8::Local;
-using v8::Object;
-using v8::String;
-using v8::Value;
+namespace __node_rocketmq__ {
-class RocketMQPushConsumer : public Nan::ObjectWrap {
-public:
- static NAN_MODULE_INIT(Init);
- static int OnMessage(CPushConsumer* consumer_ptr, CMessageExt* msg_ext);
- static std::string GetMessageColumn(char* name, CMessageExt* msg);
+class ConsumerMessageListener;
-private:
- explicit RocketMQPushConsumer(const char* group_id, const char* instance_name);
- ~RocketMQPushConsumer();
+class RocketMQPushConsumer : public Napi::ObjectWrap<RocketMQPushConsumer> {
+ public:
+ static Napi::Object Init(Napi::Env env, Napi::Object exports);
- static NAN_METHOD(New);
- static NAN_METHOD(Start);
- static NAN_METHOD(Shutdown);
- static NAN_METHOD(Subscribe);
- static NAN_METHOD(SetListener);
- static NAN_METHOD(SetSessionCredentials);
+ RocketMQPushConsumer(const Napi::CallbackInfo& info);
+ ~RocketMQPushConsumer();
- static Nan::Persistent<v8::Function> constructor;
+ private:
+ Napi::Value SetSessionCredentials(const Napi::CallbackInfo& info);
- void SetOptions(Local<Object>);
- static void HandleMessageInEventLoop(uv_async_t* async);
+ Napi::Value Start(const Napi::CallbackInfo& info);
+ Napi::Value Shutdown(const Napi::CallbackInfo& info);
-protected:
- CPushConsumer* GetConsumer()
- {
- return consumer_ptr;
- }
+ Napi::Value Subscribe(const Napi::CallbackInfo& info);
+ Napi::Value SetListener(const Napi::CallbackInfo& info);
- Nan::Callback* GetListenerFunction()
- {
- Nan::Callback* cb;
- cb = &listener_func;
- return cb;
- }
+ private:
+ void SetOptions(const Napi::Object& options);
-private:
- CPushConsumer* consumer_ptr;
- Nan::Callback listener_func;
+ private:
+ rocketmq::DefaultMQPushConsumer consumer_;
+ std::unique_ptr<ConsumerMessageListener> listener_;
};
-}
+} // namespace __node_rocketmq__
#endif
diff --git a/src/rocketmq.cpp b/src/rocketmq.cpp
index 3b426fb..63fd41f 100644
--- a/src/rocketmq.cpp
+++ b/src/rocketmq.cpp
@@ -14,35 +14,21 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-#include <nan.h>
+#include <napi.h>
+#include "consumer_ack.h"
#include "producer.h"
#include "push_consumer.h"
-#include "consumer_ack.h"
namespace __node_rocketmq__ {
-#if defined(__APPLE__)
-uv_lib_t lib;
-
-NAN_METHOD(DLOpen)
-{
- Nan::Utf8String filename(info[0]);
- uv_dlopen(*filename, &lib);
+Napi::Object Init(Napi::Env env, Napi::Object exports) {
+ RocketMQProducer::Init(env, exports);
+ RocketMQPushConsumer::Init(env, exports);
+ ConsumerAck::Init(env, exports);
+ return exports;
}
-#endif
-NAN_MODULE_INIT(Init)
-{
- RocketMQProducer::Init(target);
- RocketMQPushConsumer::Init(target);
- ConsumerAck::Init(target);
+NODE_API_MODULE(rocketmq, Init)
-#if defined(__APPLE__)
- Nan::Set(target, Nan::New("macosDLOpen").ToLocalChecked(), Nan::New<v8::FunctionTemplate>(DLOpen)->GetFunction());
-#endif
-}
-
-NODE_MODULE(rocketmq, Init)
-
-}
+} // namespace __node_rocketmq__
diff --git a/src/workers/producer/send_message.h b/src/workers/producer/send_message.h
deleted file mode 100644
index 6974e5a..0000000
--- a/src/workers/producer/send_message.h
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef __ROCKETMQ_SEND_MESSAGE_H__
-#define __ROCKETMQ_SEND_MESSAGE_H__
-
-#include <nan.h>
-#include <CProducer.h>
-#include <MQClientException.h>
-
-namespace __node_rocketmq__ {
-
-using namespace std;
-
-class ProducerSendMessageWorker : public Nan::AsyncWorker {
-public:
- ProducerSendMessageWorker(Nan::Callback* callback, RocketMQProducer* producer, CMessage* msg) :
- AsyncWorker(callback),
- msg(msg),
- producer(producer)
- {
- }
-
- ~ProducerSendMessageWorker()
- {
- DestroyMessage(msg);
- }
-
- void Execute()
- {
- try
- {
- SendMessageSync(producer->GetProducer(), msg, &send_ret);
- }
- catch(const runtime_error e)
- {
- SetErrorMessage(e.what());
- }
- catch(const std::exception& e)
- {
- SetErrorMessage(e.what());
- }
- }
-
- void HandleOKCallback()
- {
- Nan::HandleScope scope;
-
- Local<Value> argv[] = {
- Nan::Undefined(),
- Nan::New<v8::Number>((unsigned int)send_ret.sendStatus),
- Nan::New<v8::String>(send_ret.msgId).ToLocalChecked(),
- Nan::New<v8::Number>((long long)send_ret.offset)
- };
- callback->Call(4, argv);
- }
-
-private:
- CMessage* msg;
- RocketMQProducer* producer;
-
- CSendResult send_ret;
-};
-
-}
-
-#endif
diff --git a/src/workers/producer/start_or_shutdown.h b/src/workers/producer/start_or_shutdown.h
deleted file mode 100644
index dde99a6..0000000
--- a/src/workers/producer/start_or_shutdown.h
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef __ROCKETMQ_PRODUCER_START_OR_SHUTDOWN_H__
-#define __ROCKETMQ_PRODUCER_START_OR_SHUTDOWN_H__
-
-#include <nan.h>
-#include <CProducer.h>
-#include <MQClientException.h>
-
-namespace __node_rocketmq__ {
-
-using namespace std;
-
-enum ProducerWorkerType {
- START_PRODUCER = 0,
- SHUTDOWN_PRODUCER
-};
-
-class ProducerStartOrShutdownWorker : public Nan::AsyncWorker {
-public:
- ProducerStartOrShutdownWorker(Nan::Callback* callback, CProducer* producer_ptr, ProducerWorkerType type) :
- AsyncWorker(callback),
- producer_ptr(producer_ptr),
- ret(0),
- type(type)
- {
- }
-
- ~ProducerStartOrShutdownWorker()
- {
- }
-
- void Execute()
- {
- try
- {
- switch(type) {
- case START_PRODUCER:
- ret = StartProducer(producer_ptr); break;
- case SHUTDOWN_PRODUCER:
- ret = ShutdownProducer(producer_ptr); break;
- default: break;
- }
- }
- catch(const runtime_error e)
- {
- SetErrorMessage(e.what());
- }
- catch(const exception& e)
- {
- SetErrorMessage(e.what());
- }
- }
-
- void HandleOKCallback()
- {
- Nan::HandleScope scope;
-
- Local<Value> argv[] = {
- Nan::Undefined(),
- Nan::New<v8::Number>((int)ret),
- };
- callback->Call(2, argv);
- }
-
-private:
- CProducer* producer_ptr;
- int ret;
- ProducerWorkerType type;
-};
-
-}
-
-#endif
diff --git a/src/workers/push_consumer/start_or_shutdown.h b/src/workers/push_consumer/start_or_shutdown.h
deleted file mode 100644
index 1169059..0000000
--- a/src/workers/push_consumer/start_or_shutdown.h
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef __ROCKETMQ_PUSH_CONSUMER_START_OR_SHUTDOWN_H__
-#define __ROCKETMQ_PUSH_CONSUMER_START_OR_SHUTDOWN_H__
-
-#include <nan.h>
-#include <CPushConsumer.h>
-#include <MQClientException.h>
-
-namespace __node_rocketmq__ {
-
-using namespace std;
-
-enum PushConsumerWorkerType {
- START_PUSH_CONSUMER = 0,
- SHUTDOWN_PUSH_CONSUMER
-};
-
-class PushConsumerStartOrShutdownWorker : public Nan::AsyncWorker {
-public:
- PushConsumerStartOrShutdownWorker(Nan::Callback* callback, CPushConsumer* consumer_ptr, PushConsumerWorkerType type) :
- AsyncWorker(callback),
- consumer_ptr(consumer_ptr),
- ret(0),
- type(type)
- {
- }
-
- ~PushConsumerStartOrShutdownWorker()
- {
- }
-
- void Execute()
- {
- try
- {
- switch(type) {
- case START_PUSH_CONSUMER:
- ret = StartPushConsumer(consumer_ptr); break;
- case SHUTDOWN_PUSH_CONSUMER:
- ret = ShutdownPushConsumer(consumer_ptr); break;
- default: break;
- }
- }
- catch(const runtime_error e)
- {
- SetErrorMessage(e.what());
- }
- catch(const exception& e)
- {
- SetErrorMessage(e.what());
- }
- }
-
- void HandleOKCallback()
- {
- Nan::HandleScope scope;
-
- Local<Value> argv[] = {
- Nan::Undefined(),
- Nan::New<v8::Number>((int)ret),
- };
- callback->Call(2, argv);
- }
-
-private:
- CPushConsumer* consumer_ptr;
- int ret;
- PushConsumerWorkerType type;
-};
-
-}
-
-#endif