You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by if...@apache.org on 2021/03/28 07:50:13 UTC

[rocketmq-client-nodejs] branch n-api updated (494ae5d -> bee1190)

This is an automated email from the ASF dual-hosted git repository.

ifplusor pushed a change to branch n-api
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-nodejs.git.


 discard 494ae5d  fix: compile on linux and ci
 discard f38eae3  chore: bump rocketmq-client-cpp to compatible with the old version of CMake
 discard d7d8287  refactor: powered by N-API and rocketmq-client-cpp@re_dev
     new bee1190  refactor: powered by N-API and rocketmq-client-cpp@re_dev

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (494ae5d)
            \
             N -- N -- N   refs/heads/n-api (bee1190)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 deps/rocketmq         |  2 +-
 src/producer.cpp      | 63 +++++++++++++++++++++--------------------
 src/push_consumer.cpp | 77 +++++++++++++++++++++++++--------------------------
 3 files changed, 70 insertions(+), 72 deletions(-)

[rocketmq-client-nodejs] 01/01: refactor: powered by N-API and rocketmq-client-cpp@re_dev

Posted by if...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ifplusor pushed a commit to branch n-api
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-nodejs.git

commit bee1190bef77a620a2b97e7cf9b117bdc83aad91
Author: James Yin <yw...@hotmail.com>
AuthorDate: Fri Mar 26 12:30:05 2021 +0800

    refactor: powered by N-API and rocketmq-client-cpp@re_dev
---
 .clang-format                                 | 111 ++++++
 .gitignore                                    |   6 +-
 .gitmodules                                   |   5 +-
 README.md                                     |  10 +-
 binding.gyp                                   |  28 +-
 deps/rocketmq                                 |   2 +-
 lib/common.js                                 |  28 --
 lib/env_init.js                               |  32 --
 lib/producer.js                               |   6 +-
 lib/push_consumer.js                          |   6 +-
 package.json                                  |  22 +-
 script/download_lib.js                        | 131 -------
 script/get_linux_distro_route.js              | 102 -----
 src/consumer_ack.cpp                          |  86 ++--
 src/consumer_ack.h                            |  51 +--
 src/consumer_ack_inner.cpp                    |  77 ----
 src/consumer_ack_inner.h                      |  42 --
 src/producer.cpp                              | 439 +++++++++++----------
 src/producer.h                                |  48 +--
 src/push_consumer.cpp                         | 539 ++++++++++----------------
 src/push_consumer.h                           |  66 ++--
 src/rocketmq.cpp                              |  32 +-
 src/workers/producer/send_message.h           |  80 ----
 src/workers/producer/start_or_shutdown.h      |  88 -----
 src/workers/push_consumer/start_or_shutdown.h |  88 -----
 25 files changed, 686 insertions(+), 1439 deletions(-)

diff --git a/.clang-format b/.clang-format
new file mode 100644
index 0000000..4aad29c
--- /dev/null
+++ b/.clang-format
@@ -0,0 +1,111 @@
+---
+Language:        Cpp
+# BasedOnStyle:  Google
+AccessModifierOffset: -1
+AlignAfterOpenBracket: Align
+AlignConsecutiveAssignments: false
+AlignConsecutiveDeclarations: false
+AlignEscapedNewlines: Right
+AlignOperands:   true
+AlignTrailingComments: true
+AllowAllParametersOfDeclarationOnNextLine: true
+AllowShortBlocksOnASingleLine: false
+AllowShortCaseLabelsOnASingleLine: false
+AllowShortFunctionsOnASingleLine: Inline
+AllowShortIfStatementsOnASingleLine: true
+AllowShortLoopsOnASingleLine: true
+AlwaysBreakAfterDefinitionReturnType: None
+AlwaysBreakAfterReturnType: None
+AlwaysBreakBeforeMultilineStrings: false
+AlwaysBreakTemplateDeclarations: true
+BinPackArguments: false
+BinPackParameters: false
+BraceWrapping:
+  AfterClass:      false
+  AfterControlStatement: false
+  AfterEnum:       false
+  AfterFunction:   false
+  AfterNamespace:  false
+  AfterObjCDeclaration: false
+  AfterStruct:     false
+  AfterUnion:      false
+  AfterExternBlock: false
+  BeforeCatch:     false
+  BeforeElse:      false
+  IndentBraces:    false
+  SplitEmptyFunction: true
+  SplitEmptyRecord: true
+  SplitEmptyNamespace: true
+BreakBeforeBinaryOperators: None
+BreakBeforeBraces: Attach
+BreakBeforeInheritanceComma: false
+BreakBeforeTernaryOperators: true
+BreakConstructorInitializersBeforeComma: false
+BreakConstructorInitializers: BeforeColon
+BreakAfterJavaFieldAnnotations: false
+BreakStringLiterals: true
+ColumnLimit:      80
+CommentPragmas:  '^ IWYU pragma:'
+CompactNamespaces: false
+ConstructorInitializerAllOnOneLineOrOnePerLine: true
+ConstructorInitializerIndentWidth: 4
+ContinuationIndentWidth: 4
+Cpp11BracedListStyle: true
+DerivePointerAlignment: false
+DisableFormat:   false
+ExperimentalAutoDetectBinPacking: false
+FixNamespaceComments: true
+ForEachMacros:
+  - foreach
+  - Q_FOREACH
+  - BOOST_FOREACH
+IncludeBlocks:   Preserve
+IncludeCategories:
+  - Regex:           '^<ext/.*\.h>'
+    Priority:        2
+  - Regex:           '^<.*\.h>'
+    Priority:        1
+  - Regex:           '^<.*'
+    Priority:        2
+  - Regex:           '.*'
+    Priority:        3
+IncludeIsMainRegex: '([-_](test|unittest))?$'
+IndentCaseLabels: true
+IndentPPDirectives: None
+IndentWidth:     2
+IndentWrappedFunctionNames: false
+JavaScriptQuotes: Leave
+JavaScriptWrapImports: true
+KeepEmptyLinesAtTheStartOfBlocks: false
+MacroBlockBegin: ''
+MacroBlockEnd:   ''
+MaxEmptyLinesToKeep: 1
+NamespaceIndentation: None
+ObjCBlockIndentWidth: 2
+ObjCSpaceAfterProperty: false
+ObjCSpaceBeforeProtocolList: false
+PenaltyBreakAssignment: 2
+PenaltyBreakBeforeFirstCallParameter: 1
+PenaltyBreakComment: 300
+PenaltyBreakFirstLessLess: 120
+PenaltyBreakString: 1000
+PenaltyExcessCharacter: 1000000
+PenaltyReturnTypeOnItsOwnLine: 200
+PointerAlignment: Left
+ReflowComments:  true
+SortIncludes:    true
+SortUsingDeclarations: true
+SpaceAfterCStyleCast: false
+SpaceAfterTemplateKeyword: true
+SpaceBeforeAssignmentOperators: true
+SpaceBeforeParens: ControlStatements
+SpaceInEmptyParentheses: false
+SpacesBeforeTrailingComments: 2
+SpacesInAngles:  false
+SpacesInContainerLiterals: true
+SpacesInCStyleCastParentheses: false
+SpacesInParentheses: false
+SpacesInSquareBrackets: false
+Standard:        Auto
+TabWidth:        8
+UseTab:          Never
diff --git a/.gitignore b/.gitignore
index a50fd0b..6978ed0 100644
--- a/.gitignore
+++ b/.gitignore
@@ -197,6 +197,8 @@ $RECYCLE.BIN/
 # Windows shortcuts
 *.lnk
 
-build
 package-lock.json
-.vscode
+.vscode/
+build/
+Debug/
+Release/
diff --git a/.gitmodules b/.gitmodules
index c9fc461..51ac186 100644
--- a/.gitmodules
+++ b/.gitmodules
@@ -1,3 +1,4 @@
 [submodule "deps/rocketmq"]
-path = deps/rocketmq
-url = https://github.com/apache/rocketmq-client-cpp.git
+	path = deps/rocketmq
+	url = https://github.com/apache/rocketmq-client-cpp.git
+	branch = re_dev
diff --git a/README.md b/README.md
index cdb8f65..dd15ba7 100644
--- a/README.md
+++ b/README.md
@@ -6,12 +6,12 @@
 [![TravisCI](https://travis-ci.org/apache/rocketmq-client-nodejs.svg)](https://travis-ci.org/apache/rocketmq-client-nodejs)
 [![Dependency](https://david-dm.org/apache/rocketmq-client-nodejs.svg)](https://david-dm.org/apache/rocketmq-client-nodejs)
 
-This official Node.js client is a lightweight wrapper around  [rocketmq-client-cpp](https://github.com/apache/rocketmq-client-cpp), a finely tuned CPP client.
+This official Node.js client is a lightweight wrapper around [re_dev branch of rocketmq-client-cpp](https://github.com/apache/rocketmq-client-cpp/tree/re_dev), a finely tuned CPP client.
 
 
 > **Notice 1:** This client is still in `dev` version. Use it cautiously in production.
 
-> **Notice 2:** This SDK is now only support macOS and Ubuntu **14.04**. Ubuntu 16+ is not supported and CentOS is not tested yet.
+> **Notice 2:** This SDK is now only tested on macOS. Ubuntu and CentOS are not tested yet.
 
 ## Installation
 
@@ -50,7 +50,8 @@ new Producer(groupId[, instanceName][, options]);
   - `compressLevel`: the compress level (0-9) of this producer, default to `5` where `0` is fastest and `9` is most compressed;
   - `sendMessageTimeout`: send message timeout millisecond, default to `3000` and suggestion is 2000 - 3000ms;
   - `maxMessageSize`: max message size with unit (B), default to `1024 * 128` which means 128K;
-  - `logFileNum`: C++ core logic log file number, default to 3 and log file path is `$HOME/logs/rocketmq-cpp`;
+  - `logDir`: the folder where the C++ core logic logs are stored, default to `$HOME/logs/rocketmq-cpp`;
+  - `logFileNum`: C++ core logic log file number, default to 3;
   - `logFileSize`: size of each C++ core logic log file with unit (B);
   - `logLevel`: C++ core logic log level in `"fatal"`, `"error"`, `"warn"`, `"info"`, `"debug"`, `"trace"` and `"num"`.
 
@@ -177,7 +178,8 @@ new PushConsumer(groupId[, instanceName][, options]);
   - `nameServer`: the name server of RocketMQ;
   - `threadCount`: the thread number of underlying C++ logic;
   - `maxBatchSize`: message max batch size;
-  - `logFileNum`: C++ core logic log file number, default to 3 and log file path is `$HOME/logs/rocketmq-cpp`;
+  - `logDir`: the folder where the C++ core logic logs are stored, default to `$HOME/logs/rocketmq-cpp`;
+  - `logFileNum`: C++ core logic log file number, default to 3;
   - `logFileSize`: size of each C++ core logic log file with unit (B);
   - `logLevel`: C++ core logic log level in `"fatal"`, `"error"`, `"warn"`, `"info"`, `"debug"`, `"trace"` and `"num"`.
 
diff --git a/binding.gyp b/binding.gyp
index 8f4a46e..c7c1d71 100644
--- a/binding.gyp
+++ b/binding.gyp
@@ -21,21 +21,21 @@
         "src/rocketmq.cpp",
         "src/producer.cpp",
         "src/push_consumer.cpp",
-        "src/consumer_ack.cpp",
-        "src/consumer_ack_inner.cpp"
+        "src/consumer_ack.cpp"
       ],
       "include_dirs": [
         "deps/rocketmq/include",
-        "<!(node -e \"require('nan')\")"
+        "<!@(node -p \"require('node-addon-api').include\")"
+      ],
+      "library_dirs": [
+        "<(module_root_dir)/deps/rocketmq/bin"
       ],
       "conditions": [
         ["OS==\"linux\"", {
-          "libraries": [
-            "<(module_root_dir)/deps/lib/librocketmq.a"
-          ],
-          "cflags_cc!": [ "-fno-exceptions", "-pthread", "-Wl,--no-as-needed", "-ldl" ],
-          "cflags_cc": [ "-Wno-ignored-qualifiers" ],
-          "cflags": [ "-std=c++11", "-g" ]
+          "libraries": [ "-lrocketmq" ],
+          "cflags_cc!": [ "-fno-exceptions", "-fno-rtti", "-pthread", "-Wl,--no-as-needed", "-ldl" ],
+          "cflags_cc": [ "-Wall", "-std=c++11" ],
+          "cflags": [ "-g" ]
         }],
         ["OS==\"win\"", {
           "libraries": [
@@ -49,13 +49,13 @@
           ]
         }],
         ["OS==\"mac\"", {
+          "libraries": [ "-lrocketmq" ],
           "xcode_settings": {
-            "GCC_ENABLE_CPP_EXCEPTIONS": "YES"
+            "GCC_ENABLE_CPP_EXCEPTIONS": "YES",
+            'GCC_ENABLE_CPP_RTTI': 'YES'
           },
-          "cflags!": [ "-fno-exceptions" ],
-          "cflags_cc!": [ "-fno-exceptions", "-pthread", "-Wl,--no-as-needed", "-ldl" ],
-          "cflags_cc": [ "-Wno-ignored-qualifiers" ],
-          "cflags": [ "-std=c++11", "-stdlib=libc++" ]
+          "cflags_cc!": [ "-fno-exceptions", "-fno-rtti", "-pthread", "-Wl,--no-as-needed", "-ldl" ],
+          "cflags_cc": [ "-Wall", "-std=c++11", "-stdlib=libc++" ]
         }]
       ]
     }
diff --git a/deps/rocketmq b/deps/rocketmq
index d5887b6..8c783fd 160000
--- a/deps/rocketmq
+++ b/deps/rocketmq
@@ -1 +1 @@
-Subproject commit d5887b63ddbba16fec562f64dca7f77ce9ca0bb1
+Subproject commit 8c783fda2979979382776a5edd46cae93eacf4cb
diff --git a/lib/common.js b/lib/common.js
deleted file mode 100644
index 2386605..0000000
--- a/lib/common.js
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-"use strict";
-
-exports.requireBinding = function(name) {
-    let mod;
-    try {
-        mod = require(`../build/Debug/${name}`);
-    } catch(e) {
-        mod = require(`../build/Release/${name}`);
-    }
-
-    return mod;
-};
diff --git a/lib/env_init.js b/lib/env_init.js
deleted file mode 100644
index 0c26a1d..0000000
--- a/lib/env_init.js
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-"use strict";
-
-const os = require("os");
-const path = require("path");
-
-const common = require("./common");
-
-const binding = common.requireBinding("rocketmq");
-
-switch(os.platform()) {
-case "darwin":
-    process.env.EVENT_NOKQUEUE = "1";
-    binding.macosDLOpen(path.join(__dirname, "../deps/lib/librocketmq.dylib"));
-    break;
-default: break;
-}
diff --git a/lib/producer.js b/lib/producer.js
index f4f5c93..02b777e 100644
--- a/lib/producer.js
+++ b/lib/producer.js
@@ -16,13 +16,9 @@
  */
 "use strict";
 
-require("./env_init");
-
 const assert = require("assert");
 
-const common = require("./common");
-
-const binding = common.requireBinding("rocketmq");
+const binding = require("bindings")("rocketmq");
 
 const START_OR_SHUTDOWN = Symbol("RocketMQProducer#startOrShutdown");
 
diff --git a/lib/push_consumer.js b/lib/push_consumer.js
index 3bb51de..7cf97ab 100644
--- a/lib/push_consumer.js
+++ b/lib/push_consumer.js
@@ -16,14 +16,10 @@
  */
 "use strict";
 
-require("./env_init");
-
 const assert = require("assert");
 const EventEmitter = require("events").EventEmitter;
 
-const common = require("./common");
-
-const binding = common.requireBinding("rocketmq");
+const binding = require("bindings")("rocketmq");
 
 const START_OR_SHUTDOWN = Symbol("RocketMQPushConsumer#startOrShutdown");
 const START_STATUS = {
diff --git a/package.json b/package.json
index 074198d..f2ed65e 100644
--- a/package.json
+++ b/package.json
@@ -1,25 +1,23 @@
 {
   "name": "apache-rocketmq",
-  "version": "1.0.0-rc1",
-  "cppSDKVersion": "1.2.0",
+  "version": "2.0.0-rc1",
+  "cppSDKVersion": "3.0.0",
   "description": "RocketMQ binding for Node.js",
+  "license": "Apache-2.0",
+  "author": "James Yin <if...@apache.org>",
   "main": "index.js",
   "scripts": {
+    "preinstall": "git submodule update --init --recommend-shallow",
+    "install": "if [ ! -d deps/rocketmq/bin ]; then deps/rocketmq/build.sh; fi && node-gyp rebuild",
     "test": "npm run lint && echo 'temp example test' && node example/producer.js && node example/push_consumer.js",
-    "lint": "eslint .",
-    "install": "node ./script/download_lib.js && node-gyp rebuild"
+    "lint": "eslint ."
   },
-  "author": "XadillaX <i...@2333.moe>",
-  "license": "Apache-2.0",
   "dependencies": {
-    "co": "^4.6.0",
-    "destroy": "^1.0.4",
-    "getos": "^3.1.1",
-    "mkdirp": "^0.5.1",
-    "nan": "^2.11.1",
-    "urllib": "^2.31.3"
+    "bindings": "^1.5.0",
+    "node-addon-api": "^3.1.0"
   },
   "devDependencies": {
+    "co": "^4.6.0",
     "eslint": "^5.9.0",
     "eslint-config-rocketmq-style": "^1.0.0"
   }
diff --git a/script/download_lib.js b/script/download_lib.js
deleted file mode 100644
index 5cfebfd..0000000
--- a/script/download_lib.js
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-"use strict";
-
-const fs = require("fs");
-const os = require("os");
-const path = require("path");
-
-const co = require("co");
-const destroy = require("destroy");
-const _mkdirp = require("mkdirp");
-const urllib = require("urllib");
-
-const getLinuxDistroRoute = require("./get_linux_distro_route");
-const pkg = require("../package");
-
-let REGISTRY_MIRROR =
-    process.env.NODE_ROCKETMQ_REGISTRY ||
-    "https://opensource-rocketmq-client.oss-cn-hangzhou.aliyuncs.com";
-if(!REGISTRY_MIRROR.endsWith("/")) REGISTRY_MIRROR += "/";
-
-const CPP_SDK_VERSION = pkg.cppSDKVersion;
-const LIB_DIR = path.join(__dirname, "..", "deps", "lib");
-const URL_ROOT = `${REGISTRY_MIRROR}cpp-client`;
-
-function mkdirp(dir) {
-    return new Promise((resolve, reject) => {
-        _mkdirp(dir, null, err => {
-            if(err) reject(err);
-            else resolve();
-        });
-    });
-}
-
-function *getUrlArray() {
-    const platform = os.platform();
-    const ret = [];
-    let distro;
-
-    switch(platform) {
-    case "win32":
-        ret.push(`${URL_ROOT}/windows/${CPP_SDK_VERSION}/rocketmq-client-cpp.dll`);
-        ret.push(`${URL_ROOT}/windows/${CPP_SDK_VERSION}/rocketmq-client-cpp.lib`);
-        break;
-
-    case "darwin":
-        ret.push(`${URL_ROOT}/mac/${CPP_SDK_VERSION}/librocketmq.dylib`);
-        break;
-
-    case "linux":
-        distro = yield getLinuxDistroRoute();
-        ret.push(`${URL_ROOT}/linux/${CPP_SDK_VERSION}/${distro}/librocketmq.a`);
-        break;
-
-    default: throw new Error(`Unsupported platform ${platform}`);
-    }
-
-    return ret;
-}
-
-co(function *() {
-    let urls;
-    try {
-        urls = yield getUrlArray();
-    } catch(e) {
-        console.error(`[rocketmq sdk] [error] ${e.message}`);
-        process.exit(4);
-    }
-
-    yield mkdirp(LIB_DIR);
-    let writeTimes = 0;
-    for(const url of urls) {
-        console.log(`[rocketmq sdk] [info] downloading [${url}]...`);
-
-        const resp = yield urllib.request(url, {
-            timeout: 60000 * 5,
-            followRedirect: true,
-            streaming: true
-        });
-
-        if(resp.status !== 200) {
-            destroy(resp.res);
-            console.error(`[rocketmq sdk] [error] error status ${resp.status} while downloading [${url}].`);
-            process.exit(4);
-        }
-
-        const readStream = resp.res;
-        const filename = path.join(LIB_DIR, path.basename(url));
-        const writeStream = fs.createWriteStream(filename, {
-            encoding: "binary"
-        });
-
-        // eslint-disable-next-line
-        function handleDownladCallback(err) {
-            if(err) {
-                console.error(`[rocketmq sdk] [error] error occurred while downloading [${url}] to [${filename}].`);
-                console.error(err.stack);
-                process.exit(4);
-            }
-
-            writeTimes++;
-            destroy(resp.res);
-
-            console.log(`[rocketmq sdk] [info] downloaded library [${url}].`);
-            if(writeTimes === urls.length) {
-                console.log("[rocketmq sdk] [info] all libraries have been written to disk.");
-                process.exit(0);
-            }
-        }
-
-        readStream.on("error", handleDownladCallback);
-        writeStream.on("error", handleDownladCallback);
-        writeStream.on("finish", handleDownladCallback);
-
-        readStream.pipe(writeStream);
-    }
-});
diff --git a/script/get_linux_distro_route.js b/script/get_linux_distro_route.js
deleted file mode 100644
index f03c827..0000000
--- a/script/get_linux_distro_route.js
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-"use strict";
-
-const assert = require("assert");
-
-const getos = require("getos");
-
-const RHEL_MAP_ARRAY = [
-    "Centos",
-    "Red Hat Linux",
-    "RHEL",
-    "Scientific Linux",
-    "ScientificSL",
-    "ScientificCERNSLC",
-    "ScientificFermiLTS",
-    "ScientificSLF"
-];
-const NORMAL_MAP_ARRAY = [
-    "Alpine Linux",
-    "Amazon Linux",
-    "Arch Linux",
-    "Chakra",
-    "Debian",
-    "elementary OS",
-    "IYCC",
-    "Linux Mint",
-    "Manjaro Linux",
-    "Ubuntu Linux"
-];
-
-function realGetLinuxDistroRoute(dist, release) {
-    let major;
-    if(release) {
-        major = Number(release.split(".")[0]);
-    }
-
-    // RHEL Distros
-    if(RHEL_MAP_ARRAY.includes(dist)) {
-        assert(major >= 5 && major <= 7, `Only support ${dist} 5-7.`);
-        return `RHEL${major}.x`;
-    }
-
-    // Fedora
-    if("Fedora" === dist) {
-        if(major <= 18) {
-            return "RHEL6.x";
-        } else if(major === 19) {
-            return "RHEL7.x";
-        }
-    }
-
-    if("Ubuntu Linux" === dist) {
-        assert([ 14, 16, 18 ].includes(major));
-        return `UBUNTU/${major}.04`;
-    }
-
-    if("Debian" === dist) {
-        assert(major >= 8 && major <= 10);
-        return `UBUNTU/${(major + 2) / 2}.04`;
-    }
-
-    // Ubuntu Distros
-    if(!NORMAL_MAP_ARRAY.includes(dist)) {
-        console.error(`[rocketmq sdk] [warn] ${dist} may not supported, fallback to use Ubuntu library.`);
-    }
-
-    return "UBUNTU/14.04";
-}
-
-function getLinuxDistroRoute() {
-    return new Promise((resolve, reject) => {
-        getos(function(err, ret) {
-            if(err) return reject(err);
-
-            let route;
-            try {
-                route = realGetLinuxDistroRoute(ret.dist, ret.release);
-            } catch(e) {
-                return reject(e);
-            }
-
-            resolve(route);
-        });
-    });
-}
-
-module.exports = getLinuxDistroRoute;
diff --git a/src/consumer_ack.cpp b/src/consumer_ack.cpp
index e2f5c09..8de4aaf 100644
--- a/src/consumer_ack.cpp
+++ b/src/consumer_ack.cpp
@@ -15,76 +15,52 @@
  * limitations under the License.
  */
 #include "consumer_ack.h"
+#include <exception>
+#include "napi.h"
 
 namespace __node_rocketmq__ {
 
-Nan::Persistent<Function> ConsumerAck::constructor;
+Napi::Object ConsumerAck::Init(Napi::Env env, Napi::Object exports) {
+  Napi::Function func = DefineClass(
+      env, "ConsumerAck", {InstanceMethod<&ConsumerAck::Done>("done")});
 
-ConsumerAck::ConsumerAck() :
-    inner(NULL)
-{
-}
+  Napi::FunctionReference* constructor = new Napi::FunctionReference();
+  *constructor = Napi::Persistent(func);
+  env.SetInstanceData<Napi::FunctionReference>(constructor);
 
-ConsumerAck::~ConsumerAck()
-{
-    inner = NULL;
+  exports.Set("ConsumerAck", func);
+  return exports;
 }
 
-NAN_MODULE_INIT(ConsumerAck::Init)
-{
-    Local<FunctionTemplate> tpl = Nan::New<FunctionTemplate>(New);
-    tpl->SetClassName(Nan::New("ConsumerAck").ToLocalChecked());
-    tpl->InstanceTemplate()->SetInternalFieldCount(1);
-
-    Nan::SetPrototypeMethod(tpl, "done", Done);
-
-    constructor.Reset(tpl->GetFunction());
-    Nan::Set(target, Nan::New("ConsumerAck").ToLocalChecked(), tpl->GetFunction());
+Napi::Object ConsumerAck::NewInstance(Napi::Env env) {
+  Napi::Object obj = env.GetInstanceData<Napi::FunctionReference>()->New({});
+  return obj;
 }
 
-NAN_METHOD(ConsumerAck::New)
-{
-    Isolate* isolate = info.GetIsolate();
-    Local<Context> context = Context::New(isolate);
-
-    if(!info.IsConstructCall())
-    {
-        Local<Function> _constructor = Nan::New<Function>(constructor);
-        info.GetReturnValue().Set(_constructor->NewInstance(context, 0, 0).ToLocalChecked());
-        return;
-    }
+ConsumerAck::ConsumerAck(const Napi::CallbackInfo& info)
+    : Napi::ObjectWrap<ConsumerAck>(info) {}
 
-    ConsumerAck* producer = new ConsumerAck();
-    producer->Wrap(info.This());
-    info.GetReturnValue().Set(info.This());
+void ConsumerAck::SetPromise(std::promise<bool>&& promise) {
+  promise_ = std::move(promise);
 }
 
-NAN_METHOD(ConsumerAck::Done)
-{
-    ConsumerAck* ack = ObjectWrap::Unwrap<ConsumerAck>(info.Holder());
-    bool succ = true;
-
-    if(info.Length() >= 1)
-    {
-        succ = (info[0]->IsUndefined() || info[0]->IsNull() || Nan::To<bool>(info[0]).FromJust());
-    }
+void ConsumerAck::Done(bool ack) {
+  promise_.set_value(ack);
+}
 
-    // call inner ack's `Ack` function to emit the true `Acker`'s `Ack` function
-    // and finish waiting of consume thread
-    CConsumeStatus status = succ ?
-        CConsumeStatus::E_CONSUME_SUCCESS :
-        CConsumeStatus::E_RECONSUME_LATER;
-    ack->Ack(status);
+void ConsumerAck::Done(std::exception_ptr exception) {
+  promise_.set_exception(exception);
 }
 
-void ConsumerAck::Ack(CConsumeStatus status)
-{
-    if(inner)
-    {
-        // call inner ack in the main event loop
-        inner->Ack(status);
-        inner = NULL;
+Napi::Value ConsumerAck::Done(const Napi::CallbackInfo& info) {
+  bool ack = true;  // ack by default; only an explicit `false` rejects
+  if (info.Length() >= 1) {
+    if (info[0].IsBoolean() && !info[0].ToBoolean()) {
+      ack = false;
     }
+  }
+  Done(ack);  // resolve exactly once: a 2nd set_value() would throw
+  return info.Env().Undefined();
 }
 
-}
+}  // namespace __node_rocketmq__
diff --git a/src/consumer_ack.h b/src/consumer_ack.h
index 682d85e..ff0ebd1 100644
--- a/src/consumer_ack.h
+++ b/src/consumer_ack.h
@@ -17,50 +17,31 @@
 #ifndef __ROCKETMQ_CONSUMER_ACK_H__
 #define __ROCKETMQ_CONSUMER_ACK_H__
 
-#include "consumer_ack_inner.h"
-#include <nan.h>
+#include <future>
 
-namespace __node_rocketmq__ {
-
-using v8::Context;
-using v8::Function;
-using v8::FunctionTemplate;
-using v8::Isolate;
-using v8::Local;
-using v8::Object;
-using v8::String;
-using v8::Value;
+#include <napi.h>
 
-class ConsumerAck : public Nan::ObjectWrap {
-public:
-    static NAN_MODULE_INIT(Init);
-
-private:
-    explicit ConsumerAck();
-    ~ConsumerAck();
+namespace __node_rocketmq__ {
 
-    static NAN_METHOD(New);
-    static NAN_METHOD(Done);
+class ConsumerAck : public Napi::ObjectWrap<ConsumerAck> {
+ public:
+  static Napi::Object Init(Napi::Env env, Napi::Object exports);
+  static Napi::Object NewInstance(Napi::Env env);
 
-    void Ack(CConsumeStatus status);
+  ConsumerAck(const Napi::CallbackInfo& info);
 
-    static Nan::Persistent<v8::Function> constructor;
+  void SetPromise(std::promise<bool>&& promise);
 
-public:
-    void SetInner(ConsumerAckInner* _inner)
-    {
-        inner = _inner;
-    }
+  void Done(bool ack);
+  void Done(std::exception_ptr exception);
 
-    static Nan::Persistent<v8::Function>& GetConstructor()
-    {
-        return constructor;
-    }
+ private:
+  Napi::Value Done(const Napi::CallbackInfo& info);
 
-private:
-    ConsumerAckInner* inner;
+ private:
+  std::promise<bool> promise_;
 };
 
-}
+}  // namespace __node_rocketmq__
 
 #endif
diff --git a/src/consumer_ack_inner.cpp b/src/consumer_ack_inner.cpp
deleted file mode 100644
index 47b1a07..0000000
--- a/src/consumer_ack_inner.cpp
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "consumer_ack_inner.h"
-
-namespace __node_rocketmq__ {
-
-ConsumerAckInner::ConsumerAckInner() :
-    acked(false)
-{
-    uv_cond_init(&cond);
-    uv_mutex_init(&mutex);
-}
-
-ConsumerAckInner::~ConsumerAckInner()
-{
-    uv_mutex_destroy(&mutex);
-    uv_cond_destroy(&cond);
-}
-
-void ConsumerAckInner::Ack(CConsumeStatus _status)
-{
-    uv_mutex_lock(&mutex);
-    bool _acked = acked;
-
-    if(_acked)
-    {
-        uv_mutex_unlock(&mutex);
-        return;
-    }
-
-    status = _status;
-    acked = true;
-
-    // tell `this->WaitResult()` to continue
-    uv_cond_signal(&cond);
-    uv_mutex_unlock(&mutex);
-}
-
-CConsumeStatus ConsumerAckInner::WaitResult()
-{
-    uv_mutex_lock(&mutex);
-
-    // if `cond signal` send before `WaitResult()`,
-    // `uv_cond_wait` will be blocked and never continue
-    //
-    // so we have to return result directly without `uv_cond_wait`
-    if(acked)
-    {
-        CConsumeStatus _status = status;
-        uv_mutex_unlock(&mutex);
-        return _status;
-    }
-
-    // wait for `this->Ack()` and that will emit `uv_cond_signal` to let it stop wait
-    uv_cond_wait(&cond, &mutex);
-
-    CConsumeStatus _status = status;
-    uv_mutex_unlock(&mutex);
-
-    return _status;
-}
-
-}
diff --git a/src/consumer_ack_inner.h b/src/consumer_ack_inner.h
deleted file mode 100644
index 9caa105..0000000
--- a/src/consumer_ack_inner.h
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef __ROCKETMQ_CONSUMER_ACK_INNER_H__
-#define __ROCKETMQ_CONSUMER_ACK_INNER_H__
-
-#include <uv.h>
-#include <CPushConsumer.h>
-
-namespace __node_rocketmq__ {
-
-class ConsumerAckInner {
-public:
-    ConsumerAckInner();
-    ~ConsumerAckInner();
-
-    void Ack(CConsumeStatus _status);
-    CConsumeStatus WaitResult();
-
-private:
-    bool acked;
-    CConsumeStatus status;
-    uv_mutex_t mutex;
-    uv_cond_t cond;
-};
-
-}
-
-#endif
diff --git a/src/producer.cpp b/src/producer.cpp
index 3ae4047..d965db8 100644
--- a/src/producer.cpp
+++ b/src/producer.cpp
@@ -15,253 +15,276 @@
  * limitations under the License.
  */
 #include "producer.h"
-#include "workers/producer/send_message.h"
-#include "workers/producer/start_or_shutdown.h"
 
-#include <MQClientException.h>
+#include <cstddef>
+#include <exception>
 #include <string>
-using namespace std;
 
-namespace __node_rocketmq__ {
-
-#define NAN_GET_CPRODUCER() \
-    RocketMQProducer* _v8_producer = ObjectWrap::Unwrap<RocketMQProducer>(info.Holder()); \
-    CProducer* producer_ptr = _v8_producer->GetProducer();
-
-Nan::Persistent<Function> RocketMQProducer::constructor;
+#include <napi.h>
 
-RocketMQProducer::RocketMQProducer(const char* group_id, const char* instance_name)
-{
-    producer_ptr = CreateProducer(group_id);
-    if(instance_name)
-    {
-        SetProducerInstanceName(producer_ptr, instance_name);
-    }
-}
+#include <ClientRPCHook.h>
+#include <LoggerConfig.h>
+#include <MQException.h>
+#include <MQMessage.h>
+#include <SendCallback.h>
 
-RocketMQProducer::~RocketMQProducer()
-{
-    try
-    {
-        ShutdownProducer(producer_ptr);
-    }
-    catch (...)
-    {
-        //
-    }
+namespace __node_rocketmq__ {
 
-    DestroyProducer(producer_ptr);
+Napi::Object RocketMQProducer::Init(Napi::Env env, Napi::Object exports) {
+  Napi::Function func =
+      DefineClass(env,
+                  "RocketMQProducer",
+                  {
+                      InstanceMethod<&RocketMQProducer::Start>("start"),
+                      InstanceMethod<&RocketMQProducer::Shutdown>("shutdown"),
+                      InstanceMethod<&RocketMQProducer::Send>("send"),
+                      InstanceMethod<&RocketMQProducer::SetSessionCredentials>(
+                          "setSessionCredentials"),
+                  });
+
+  Napi::FunctionReference* constructor = new Napi::FunctionReference();
+  *constructor = Napi::Persistent(func);
+  env.SetInstanceData<Napi::FunctionReference>(constructor);
+
+  exports.Set("Producer", func);
+  return exports;
 }
 
-void RocketMQProducer::SetOptions(Local<Object> options)
-{
-    // set name server
-    Local<Value> _name_server_v = Nan::Get(options, Nan::New<String>("nameServer").ToLocalChecked()).ToLocalChecked();
-    if(_name_server_v->IsString())
-    {
-        Nan::Utf8String namesrv(_name_server_v);
-        SetProducerNameServerAddress(producer_ptr, *namesrv);
-    }
+RocketMQProducer::RocketMQProducer(const Napi::CallbackInfo& info)
+    : Napi::ObjectWrap<RocketMQProducer>(info), producer_("") {
+  const Napi::Value group_name = info[0];
+  if (group_name.IsString()) {
+    producer_.set_group_name(group_name.ToString());
+  }
 
-    // set group name
-    Local<Value> _group_name_v = Nan::Get(options, Nan::New<String>("groupName").ToLocalChecked()).ToLocalChecked();
-    if(_group_name_v->IsString())
-    {
-        Nan::Utf8String group_name(_group_name_v);
-        SetProducerGroupName(producer_ptr, *group_name);
-    }
+  const Napi::Value instance_name = info[1];
+  if (instance_name.IsString()) {
+    producer_.set_instance_name(instance_name.ToString());
+  }
 
-    // set log num & single log size
-    int file_num = 3;
-    int64 file_size = 104857600;
-    Local<Value> _log_file_num_v = Nan::Get(options, Nan::New<String>("logFileNum").ToLocalChecked()).ToLocalChecked();
-    Local<Value> _log_file_size_v = Nan::Get(options, Nan::New<String>("logFileSize").ToLocalChecked()).ToLocalChecked();
-    if(_log_file_num_v->IsNumber())
-    {
-        file_num = _log_file_num_v->Int32Value();
-    }
-    if(_log_file_size_v->IsNumber())
-    {
-        file_size = _log_file_size_v->Int32Value();
-    }
-    SetProducerLogFileNumAndSize(producer_ptr, file_num, file_size);
-
-    // set log level
-    Local<Value> _log_level_v = Nan::Get(options, Nan::New<String>("logLevel").ToLocalChecked()).ToLocalChecked();
-    if(_log_level_v->IsNumber())
-    {
-        int level = _log_level_v->Int32Value();
-        SetProducerLogLevel(producer_ptr, (CLogLevel)level);
-    }
-
-    // set compress level
-    Local<Value> _compress_level_v = Nan::Get(options, Nan::New<String>("compressLevel").ToLocalChecked()).ToLocalChecked();
-    if(_compress_level_v->IsNumber()) {
-        int level = _compress_level_v->Int32Value();
-        SetProducerCompressLevel(producer_ptr, level);
-    }
+  const Napi::Value options = info[2];
+  if (options.IsObject()) {
+    // try to set options
+    SetOptions(options.ToObject());
+  }
+}
 
-    // set send message timeout
-    Local<Value> _send_message_timeout_v = Nan::Get(options, Nan::New<String>("sendMessageTimeout").ToLocalChecked()).ToLocalChecked();
-    if(_send_message_timeout_v->IsNumber())
-    {
-        int timeout = _send_message_timeout_v->Int32Value();
-        SetProducerSendMsgTimeout(producer_ptr, timeout);
-    }
+RocketMQProducer::~RocketMQProducer() {
+  producer_.shutdown();
+}
 
-    // set max message size
-    Local<Value> _max_message_size_v = Nan::Get(options, Nan::New<String>("maxMessageSize").ToLocalChecked()).ToLocalChecked();
-    if(_max_message_size_v->IsNumber())
-    {
-        int size = _max_message_size_v->Int32Value();
-        SetProducerMaxMessageSize(producer_ptr, size);
+void RocketMQProducer::SetOptions(const Napi::Object& options) {
+  // set name server
+  Napi::Value name_server = options.Get("nameServer");
+  if (name_server.IsString()) {
+    producer_.set_namesrv_addr(name_server.ToString());
+  }
+
+  // set group name
+  Napi::Value group_name = options.Get("groupName");
+  if (group_name.IsString()) {
+    producer_.set_group_name(group_name.ToString());
+  }
+
+  // set max message size
+  Napi::Value max_message_size = options.Get("maxMessageSize");
+  if (max_message_size.IsNumber()) {
+    producer_.set_max_message_size(max_message_size.ToNumber());
+  }
+
+  // set compress level
+  Napi::Value compress_level = options.Get("compressLevel");
+  if (compress_level.IsNumber()) {
+    producer_.set_compress_level(compress_level.ToNumber());
+  }
+
+  // set send message timeout
+  Napi::Value send_message_timeout = options.Get("sendMessageTimeout");
+  if (send_message_timeout.IsNumber()) {
+    producer_.set_send_msg_timeout(send_message_timeout.ToNumber());
+  }
+
+  // set log level
+  Napi::Value log_level = options.Get("logLevel");
+  if (log_level.IsNumber()) {
+    int32_t level = log_level.ToNumber();
+    if (level >= 0 && level < rocketmq::LogLevel::LOG_LEVEL_LEVEL_NUM) {
+      rocketmq::GetDefaultLoggerConfig().set_level(
+          static_cast<rocketmq::LogLevel>(level));
     }
+  }
+
+  // set log directory
+  Napi::Value log_dir = options.Get("logDir");
+  if (log_dir.IsString()) {
+    rocketmq::GetDefaultLoggerConfig().set_path(log_dir.ToString());
+  }
+
+  // set log file size (size of a single log file); must not alter the file count
+  Napi::Value log_file_size = options.Get("logFileSize");
+  if (log_file_size.IsNumber()) {
+    rocketmq::GetDefaultLoggerConfig().set_file_size(log_file_size.ToNumber());
+  }
+
+  // set log file num
+  Napi::Value log_file_num = options.Get("logFileNum");
+  if (log_file_num.IsNumber()) {
+    rocketmq::GetDefaultLoggerConfig().set_file_count(log_file_num.ToNumber());
+  }
 }
 
-NAN_MODULE_INIT(RocketMQProducer::Init)
-{
-    Local<FunctionTemplate> tpl = Nan::New<FunctionTemplate>(New);
-    tpl->SetClassName(Nan::New("RocketMQProducer").ToLocalChecked());
-    tpl->InstanceTemplate()->SetInternalFieldCount(1);
+Napi::Value RocketMQProducer::SetSessionCredentials(
+    const Napi::CallbackInfo& info) {
+  Napi::String access_key = info[0].As<Napi::String>();
+  Napi::String secret_key = info[1].As<Napi::String>();
+  Napi::String ons_channel = info[2].As<Napi::String>();
 
-    Nan::SetPrototypeMethod(tpl, "start", Start);
-    Nan::SetPrototypeMethod(tpl, "shutdown", Shutdown);
-    Nan::SetPrototypeMethod(tpl, "send", Send);
-    Nan::SetPrototypeMethod(tpl, "setSessionCredentials", SetSessionCredentials);
+  auto rpc_hook = std::make_shared<rocketmq::ClientRPCHook>(
+      rocketmq::SessionCredentials(access_key, secret_key, ons_channel));
+  producer_.setRPCHook(rpc_hook);
 
-    constructor.Reset(tpl->GetFunction());
-    Nan::Set(target, Nan::New("Producer").ToLocalChecked(), tpl->GetFunction());
+  return info.Env().Undefined();
 }
 
-NAN_METHOD(RocketMQProducer::New)
-{
-    Isolate* isolate = info.GetIsolate();
-    Local<Context> context = Context::New(isolate);
-
-    if(!info.IsConstructCall())
-    {
-        const int argc = 3;
-        Local<Value> argv[argc] = { info[0], info[1], info[2] };
-        Local<Function> _constructor = Nan::New<v8::Function>(constructor);
-        info.GetReturnValue().Set(_constructor->NewInstance(context, argc, argv).ToLocalChecked());
-        return;
-    }
-
-    Nan::Utf8String group_id(info[0]);
-    Nan::Utf8String instance_name(info[1]);
-    Local<Object> options = Nan::To<Object>(info[2]).ToLocalChecked();
-    RocketMQProducer* producer = new RocketMQProducer(*group_id, info[1]->IsNull() ? NULL : *instance_name);
+class ProducerStartWorker : public Napi::AsyncWorker {
+ public:
+  ProducerStartWorker(const Napi::Function& callback,
+                      const rocketmq::DefaultMQProducer& producer)
+      : Napi::AsyncWorker(callback), producer_(producer) {}
 
-    producer->Wrap(info.This());
+  void Execute() override { producer_.start(); }
 
-    // try to set options
-    try
-    {
-        producer->SetOptions(options);
-    }
-    catch (runtime_error e)
-    {
-        Nan::ThrowError(e.what());
-        return;
-    }
+ private:
+  rocketmq::DefaultMQProducer producer_;
+};
 
-    info.GetReturnValue().Set(info.This());
+Napi::Value RocketMQProducer::Start(const Napi::CallbackInfo& info) {
+  Napi::Function callback = info[0].As<Napi::Function>();
+  auto* worker = new ProducerStartWorker(callback, producer_);
+  worker->Queue();
+  return info.Env().Undefined();
 }
 
-NAN_METHOD(RocketMQProducer::Start)
-{
-    NAN_GET_CPRODUCER();
+class ProducerShutdownWorker : public Napi::AsyncWorker {
+ public:
+  ProducerShutdownWorker(const Napi::Function& callback,
+                         const rocketmq::DefaultMQProducer& producer)
+      : Napi::AsyncWorker(callback), producer_(producer) {}
 
-    Nan::Callback* callback = (info[0]->IsFunction()) ?
-        new Nan::Callback(Nan::To<Function>(info[0]).ToLocalChecked()) :
-        NULL;
+  void Execute() override { producer_.shutdown(); }
 
-    Nan::AsyncQueueWorker(new ProducerStartOrShutdownWorker(callback, producer_ptr, ProducerWorkerType::START_PRODUCER));
-}
-
-NAN_METHOD(RocketMQProducer::Shutdown)
-{
-    NAN_GET_CPRODUCER();
-
-    Nan::Callback* callback = (info[0]->IsFunction()) ?
-        new Nan::Callback(Nan::To<Function>(info[0]).ToLocalChecked()) :
-        NULL;
+ private:
+  rocketmq::DefaultMQProducer producer_;
+};
 
-    Nan::AsyncQueueWorker(new ProducerStartOrShutdownWorker(callback, producer_ptr, ProducerWorkerType::SHUTDOWN_PRODUCER));
+Napi::Value RocketMQProducer::Shutdown(const Napi::CallbackInfo& info) {
+  Napi::Function callback = info[0].As<Napi::Function>();
+  auto* worker = new ProducerShutdownWorker(callback, producer_);
+  worker->Queue();
+  return info.Env().Undefined();
 }
 
-NAN_METHOD(RocketMQProducer::SetSessionCredentials)
-{
-    NAN_GET_CPRODUCER();
-
-    Nan::Utf8String access_key(info[0]);
-    Nan::Utf8String secret_key(info[1]);
-    Nan::Utf8String ons_channel(info[2]);
-
-    int ret;
-    try
-    {
-        ret = SetProducerSessionCredentials(producer_ptr, *access_key, *secret_key, *ons_channel);
+struct ResultOrException {
+  std::unique_ptr<rocketmq::SendResult> result;
+  std::exception_ptr exception;
+};
+
+void CallProducerSendJsCallback(Napi::Env env,
+                                Napi::Function callback,
+                                std::nullptr_t*,
+                                ResultOrException* data) {
+  std::unique_ptr<ResultOrException> data_guard(data);
+  if (env != nullptr) {
+    if (callback != nullptr) {
+      if (data->exception) {
+        try {
+          std::rethrow_exception(data->exception);
+        } catch (const std::exception& e) {
+          callback.Call(Napi::Object::New(callback.Env()),
+                        {Napi::Error::New(env, e.what()).Value()});
+        }
+      } else {
+        callback.Call(Napi::Object::New(callback.Env()),
+                      {env.Undefined(),
+                       Napi::Number::New(env, data->result->send_status()),
+                       Napi::String::New(env, data->result->msg_id()),
+                       Napi::Number::New(env, data->result->queue_offset())});
+      }
     }
-    catch(runtime_error e)
-    {
-        Nan::ThrowError(e.what());
+  }
+}
+
+class ProducerSendCallback : public rocketmq::AutoDeleteSendCallback {
+ public:
+  ProducerSendCallback(Napi::Env&& env, Napi::Function&& callback)
+      : callback_(
+            Callback::New(env, callback, "RocketMQ Send Callback", 0, 1)) {}
+
+  ~ProducerSendCallback() { callback_.Release(); }
+
+  void onSuccess(rocketmq::SendResult& send_result) override {
+    auto* data =
+        new ResultOrException{std::unique_ptr<rocketmq::SendResult>(
+                                  new rocketmq::SendResult(send_result)),
+                              nullptr};
+    napi_status status = callback_.BlockingCall(data);
+    if (status != napi_ok) {
+      // TODO: Handle error
+      std::exit(-1);
     }
-    catch(std::exception& e)
-    {
-        Nan::ThrowError(e.what());
+  }
+
+  void onException(rocketmq::MQException& exception) noexcept override {
+    auto* data =
+        new ResultOrException{nullptr, std::make_exception_ptr(exception)};
+    napi_status status = callback_.BlockingCall(data);
+    if (status != napi_ok) {
+      // TODO: Handle error
+      std::exit(-1);
     }
-    catch(rocketmq::MQException& e)
-    {
-        Nan::ThrowError(e.what());
+  }
+
+ private:
+  using Callback = Napi::TypedThreadSafeFunction<std::nullptr_t,
+                                                 ResultOrException,
+                                                 &CallProducerSendJsCallback>;
+
+  Callback callback_;
+};
+
+Napi::Value RocketMQProducer::Send(const Napi::CallbackInfo& info) {
+  rocketmq::MQMessage message = [&]() {
+    Napi::String topic = info[0].As<Napi::String>();
+    Napi::Value body = info[1];
+    if (body.IsString()) {
+      return rocketmq::MQMessage(topic, body.ToString());
+    } else {
+      Napi::Buffer<char> buffer = body.As<Napi::Buffer<char>>();
+      return rocketmq::MQMessage(topic,
+                                 std::string(buffer.Data(), buffer.Length()));
     }
-    info.GetReturnValue().Set(ret);
-}
-
-NAN_METHOD(RocketMQProducer::Send)
-{
-    Nan::Utf8String topic(info[0]);
-    Local<Object> options = Nan::To<Object>(info[2]).ToLocalChecked();
-
-    CMessage* msg = CreateMessage(*topic);
+  }();
 
-    Local<Value> _tags_to_be_checked = Nan::Get(options, Nan::New<String>("tags").ToLocalChecked()).ToLocalChecked();
-    Local<Value> _keys_to_be_checked = Nan::Get(options, Nan::New<String>("keys").ToLocalChecked()).ToLocalChecked();
+  const Napi::Value options_v = info[2];
+  if (options_v.IsObject()) {
+    const Napi::Object options = options_v.ToObject();
 
-    if(_tags_to_be_checked->IsString())
-    {
-        Nan::Utf8String tags(_tags_to_be_checked);
-        SetMessageTags(msg, *tags);
+    Napi::Value tags = options.Get("tags");
+    if (tags.IsString()) {
+      message.set_tags(tags.ToString());
     }
 
-    if(_keys_to_be_checked->IsString())
-    {
-        Nan::Utf8String keys(_keys_to_be_checked);
-        SetMessageKeys(msg, *keys);
+    Napi::Value keys = options.Get("keys");
+    if (keys.IsString()) {
+      message.set_keys(keys.ToString());
     }
+  }
 
-    // set message body:
-    //   1. if it's a string, call `SetMessageBody`;
-    //   2. if it's a buffer, call `SetByteMessageBody`.
-    if(info[1]->IsString())
-    {
-        Nan::Utf8String body(info[1]);
-        SetMessageBody(msg, *body);
-    }
-    else
-    {
-        Local<Object> node_buff_object = Nan::To<Object>(info[1]).ToLocalChecked();
-        unsigned int length = node::Buffer::Length(node_buff_object);
-        const char* buff = node::Buffer::Data(node_buff_object);
-        SetByteMessageBody(msg, buff, length);
-    }
+  auto* send_callback =
+      new ProducerSendCallback(info.Env(), info[3].As<Napi::Function>());
+  producer_.send(message, send_callback);
 
-    Nan::Callback* callback = (info[3]->IsFunction()) ?
-        new Nan::Callback(Nan::To<Function>(info[3]).ToLocalChecked()) :
-        NULL;
-
-    RocketMQProducer* producer = ObjectWrap::Unwrap<RocketMQProducer>(info.Holder());
-    Nan::AsyncQueueWorker(new ProducerSendMessageWorker(callback, producer, msg));
+  return info.Env().Undefined();
 }
 
-}
+}  // namespace __node_rocketmq__
diff --git a/src/producer.h b/src/producer.h
index d20cd29..780efa4 100644
--- a/src/producer.h
+++ b/src/producer.h
@@ -17,46 +17,34 @@
 #ifndef __ROCKETMQ_PRODUCER_H__
 #define __ROCKETMQ_PRODUCER_H__
 
-#include <CProducer.h>
-#include <nan.h>
+#include <napi.h>
 
-namespace __node_rocketmq__ {
+#include <DefaultMQProducer.h>
 
-using v8::Context;
-using v8::Function;
-using v8::FunctionTemplate;
-using v8::Isolate;
-using v8::Local;
-using v8::Object;
-using v8::String;
-using v8::Value;
+namespace __node_rocketmq__ {
 
-class RocketMQProducer : public Nan::ObjectWrap {
-public:
-    static NAN_MODULE_INIT(Init);
+class RocketMQProducer : public Napi::ObjectWrap<RocketMQProducer> {
+ public:
+  static Napi::Object Init(Napi::Env env, Napi::Object exports);
 
-public:
-    CProducer* GetProducer() { return producer_ptr; }
+  RocketMQProducer(const Napi::CallbackInfo& info);
+  ~RocketMQProducer();
 
-private:
-    explicit RocketMQProducer(const char* group_id, const char* instance_name);
-    ~RocketMQProducer();
+ private:
+  Napi::Value SetSessionCredentials(const Napi::CallbackInfo& info);
 
-    static NAN_METHOD(New);
-    static NAN_METHOD(Start);
-    static NAN_METHOD(Shutdown);
-    static NAN_METHOD(Send);
-    static NAN_METHOD(SetSessionCredentials);
+  Napi::Value Start(const Napi::CallbackInfo& info);
+  Napi::Value Shutdown(const Napi::CallbackInfo& info);
 
-    static Nan::Persistent<Function> constructor;
+  Napi::Value Send(const Napi::CallbackInfo& info);
 
-private:
-    void SetOptions(Local<Object> options);
+ private:
+  void SetOptions(const Napi::Object& options);
 
-private:
-    CProducer* producer_ptr;
+ private:
+  rocketmq::DefaultMQProducer producer_;
 };
 
-}
+}  // namespace __node_rocketmq__
 
 #endif
diff --git a/src/push_consumer.cpp b/src/push_consumer.cpp
index 074a64d..3e207a7 100644
--- a/src/push_consumer.cpp
+++ b/src/push_consumer.cpp
@@ -14,381 +14,258 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#include <map>
 #include "push_consumer.h"
-#include "consumer_ack.h"
-#include "workers/push_consumer/start_or_shutdown.h"
-
-using namespace std;
-
-namespace __node_rocketmq__ {
-
-struct MessageHandlerParam
-{
-    RocketMQPushConsumer* consumer;
-    ConsumerAckInner* ack;
-    CMessageExt* msg;
-};
-char message_handler_param_keys[5][8] = { "topic", "tags", "keys", "body", "msgId" };
-
-uv_mutex_t _get_msg_ext_column_lock;
 
-map<CPushConsumer*, RocketMQPushConsumer*> _push_consumer_map;
+#include <exception>
+#include <future>
 
-#define NAN_GET_CPUSHCONSUMER() \
-    RocketMQPushConsumer* _v8_consumer = ObjectWrap::Unwrap<RocketMQPushConsumer>(info.Holder()); \
-    CPushConsumer* consumer_ptr = _v8_consumer->GetConsumer();
+#include <napi.h>
 
-Nan::Persistent<Function> RocketMQPushConsumer::constructor;
+#include <ClientRPCHook.h>
+#include <LoggerConfig.h>
+#include <MQMessageListener.h>
 
-RocketMQPushConsumer::RocketMQPushConsumer(const char* group_id, const char* instance_name) :
-    consumer_ptr(NULL)
-{
-    consumer_ptr = CreatePushConsumer(group_id);
-
-    if(instance_name)
-    {
-        SetPushConsumerInstanceName(consumer_ptr, instance_name);
-    }
-
-    _push_consumer_map[consumer_ptr] = this;
+#include "consumer_ack.h"
 
-    RegisterMessageCallback(consumer_ptr, RocketMQPushConsumer::OnMessage);
-}
+using namespace std;
 
-RocketMQPushConsumer::~RocketMQPushConsumer()
-{
-    try
-    {
-        ShutdownPushConsumer(consumer_ptr);
-        auto it = _push_consumer_map.find(consumer_ptr);
-        if(it != _push_consumer_map.end())
-        {
-            _push_consumer_map.erase(consumer_ptr);
-        }
-    }
-    catch(...)
-    {
-        //
-    }
+namespace __node_rocketmq__ {
 
-    DestroyPushConsumer(consumer_ptr);
-    consumer_ptr = NULL;
+Napi::Object RocketMQPushConsumer::Init(Napi::Env env, Napi::Object exports) {
+  Napi::Function func = DefineClass(
+      env,
+      "RocketMQPushConsumer",
+      {
+          InstanceMethod<&RocketMQPushConsumer::Start>("start"),
+          InstanceMethod<&RocketMQPushConsumer::Shutdown>("shutdown"),
+          InstanceMethod<&RocketMQPushConsumer::Subscribe>("subscribe"),
+          InstanceMethod<&RocketMQPushConsumer::SetListener>("setListener"),
+          InstanceMethod<&RocketMQPushConsumer::SetSessionCredentials>(
+              "setSessionCredentials"),
+      });
+
+  Napi::FunctionReference* constructor = new Napi::FunctionReference();
+  *constructor = Napi::Persistent(func);
+  env.SetInstanceData<Napi::FunctionReference>(constructor);
+
+  exports.Set("PushConsumer", func);
+  return exports;
 }
 
-void RocketMQPushConsumer::SetOptions(Local<Object> options)
-{
-    // set name server
-    Local<Value> _name_server_v = Nan::Get(options, Nan::New<String>("nameServer").ToLocalChecked()).ToLocalChecked();
-    if(_name_server_v->IsString())
-    {
-        Nan::Utf8String namesrv(_name_server_v);
-        SetPushConsumerNameServerAddress(consumer_ptr, *namesrv);
-    }
-
-    // set thread count
-    Local<Value> _thread_count_v = Nan::Get(options, Nan::New<String>("threadCount").ToLocalChecked()).ToLocalChecked();
-    if(_thread_count_v->IsNumber())
-    {
-        int thread_count = Nan::To<int32_t>(_thread_count_v).FromJust();
-        if(thread_count > 0)
-        {
-            SetPushConsumerThreadCount(consumer_ptr, thread_count);
-        }
-    }
+RocketMQPushConsumer::RocketMQPushConsumer(const Napi::CallbackInfo& info)
+    : Napi::ObjectWrap<RocketMQPushConsumer>(info), consumer_("") {
+  const Napi::Value group_name = info[0];
+  if (group_name.IsString()) {
+    consumer_.set_group_name(group_name.ToString());
+  }
 
-    // set message batch max size
-    Local<Value> _max_batch_size_v = Nan::Get(options, Nan::New<String>("maxBatchSize").ToLocalChecked()).ToLocalChecked();
-    if(_max_batch_size_v->IsNumber())
-    {
-        int max_batch_size = Nan::To<int32_t>(_max_batch_size_v).FromJust();
-        if(max_batch_size > 0)
-        {
-            SetPushConsumerMessageBatchMaxSize(consumer_ptr, max_batch_size);
-        }
-    }
+  const Napi::Value instance_name = info[1];
+  if (instance_name.IsString()) {
+    consumer_.set_instance_name(instance_name.ToString());
+  }
 
-    // set log num & single log size
-    int file_num = 3;
-    int64 file_size = 104857600;
-    Local<Value> _log_file_num_v = Nan::Get(options, Nan::New<String>("logFileNum").ToLocalChecked()).ToLocalChecked();
-    Local<Value> _log_file_size_v = Nan::Get(options, Nan::New<String>("logFileSize").ToLocalChecked()).ToLocalChecked();
-    if(_log_file_num_v->IsNumber())
-    {
-        file_num = _log_file_num_v->Int32Value();
-    }
-    if(_log_file_size_v->IsNumber())
-    {
-        file_size = _log_file_size_v->Int32Value();
-    }
-    SetPushConsumerLogFileNumAndSize(consumer_ptr, file_num, file_size);
-
-    // set log level
-    Local<Value> _log_level_v = Nan::Get(options, Nan::New<String>("logLevel").ToLocalChecked()).ToLocalChecked();
-    if(_log_level_v->IsNumber())
-    {
-        int level = _log_level_v->Int32Value();
-        SetPushConsumerLogLevel(consumer_ptr, (CLogLevel) level);
-    }
+  const Napi::Value options = info[2];
+  if (options.IsObject()) {
+    // try to set options
+    SetOptions(options.ToObject());
+  }
 }
 
-NAN_MODULE_INIT(RocketMQPushConsumer::Init)
-{
-    uv_mutex_init(&_get_msg_ext_column_lock);
-    Local<FunctionTemplate> tpl = Nan::New<FunctionTemplate>(New);
-    tpl->SetClassName(Nan::New("RocketMQPushConsumer").ToLocalChecked());
-    tpl->InstanceTemplate()->SetInternalFieldCount(1);
-
-    Nan::SetPrototypeMethod(tpl, "start", Start);
-    Nan::SetPrototypeMethod(tpl, "shutdown", Shutdown);
-    Nan::SetPrototypeMethod(tpl, "subscribe", Subscribe);
-    Nan::SetPrototypeMethod(tpl, "setListener", SetListener);
-    Nan::SetPrototypeMethod(tpl, "setSessionCredentials", SetSessionCredentials);
-
-    constructor.Reset(tpl->GetFunction());
-    Nan::Set(target, Nan::New("PushConsumer").ToLocalChecked(), tpl->GetFunction());
+RocketMQPushConsumer::~RocketMQPushConsumer() {
+  consumer_.shutdown();
 }
 
-NAN_METHOD(RocketMQPushConsumer::New)
-{
-    Isolate* isolate = info.GetIsolate();
-    Local<Context> context = Context::New(isolate);
-
-    if(!info.IsConstructCall())
-    {
-        const int argc = 3;
-        Local<Value> argv[argc] = { info[0], info[1], info[2] };
-        Local<Function> _constructor = Nan::New<v8::Function>(constructor);
-        info.GetReturnValue().Set(_constructor->NewInstance(context, argc, argv).ToLocalChecked());
-        return;
-    }
-
-    Nan::Utf8String v8_group_id(info[0]);
-    Nan::Utf8String v8_instance_name(info[1]);
-    string group_id = *v8_group_id;
-    string instance_name = *v8_instance_name;
-    Local<Object> options = Nan::To<Object>(info[2]).ToLocalChecked();
-    RocketMQPushConsumer* consumer = new RocketMQPushConsumer(group_id.c_str(), info[1]->IsNull() ? NULL : instance_name.c_str());
-
-    consumer->Wrap(info.This());
-
-    // try to set options
-    try
-    {
-        consumer->SetOptions(options);
-    }
-    catch(const runtime_error e)
-    {
-        Nan::ThrowError(e.what());
-        return;
+void RocketMQPushConsumer::SetOptions(const Napi::Object& options) {
+  // set name server
+  Napi::Value name_server = options.Get("nameServer");
+  if (name_server.IsString()) {
+    consumer_.set_namesrv_addr(name_server.ToString());
+  }
+
+  // set group name
+  Napi::Value group_name = options.Get("groupName");
+  if (group_name.IsString()) {
+    consumer_.set_group_name(group_name.ToString());
+  }
+
+  // set thread count
+  Napi::Value thread_count = options.Get("threadCount");
+  if (thread_count.IsNumber()) {
+    consumer_.set_consume_thread_nums(thread_count.ToNumber());
+  }
+
+  // set message batch max size
+  Napi::Value max_batch_size = options.Get("maxBatchSize");
+  if (max_batch_size.IsNumber()) {
+    consumer_.set_consume_message_batch_max_size(max_batch_size.ToNumber());
+  }
+
+  // set log level
+  Napi::Value log_level = options.Get("logLevel");
+  if (log_level.IsNumber()) {
+    int32_t level = log_level.ToNumber();
+    if (level >= 0 && level < rocketmq::LogLevel::LOG_LEVEL_LEVEL_NUM) {
+      rocketmq::GetDefaultLoggerConfig().set_level(
+          static_cast<rocketmq::LogLevel>(level));
     }
-    catch(const std::exception& e)
-    {
-        Nan::ThrowError(e.what());
-        return;
-    }
-
-    info.GetReturnValue().Set(info.This());
-}
-
-NAN_METHOD(RocketMQPushConsumer::Start)
-{
-    NAN_GET_CPUSHCONSUMER();
-
-    Nan::Callback* callback = (info[0]->IsFunction()) ?
-        new Nan::Callback(Nan::To<Function>(info[0]).ToLocalChecked()) :
-        NULL;
-
-    Nan::AsyncQueueWorker(new PushConsumerStartOrShutdownWorker(callback, consumer_ptr, PushConsumerWorkerType::START_PUSH_CONSUMER));
+  }
+
+  // set log directory
+  Napi::Value log_dir = options.Get("logDir");
+  if (log_dir.IsString()) {
+    rocketmq::GetDefaultLoggerConfig().set_path(log_dir.ToString());
+  }
+
+  // set log file size (size of a single log file); must not alter the file count
+  Napi::Value log_file_size = options.Get("logFileSize");
+  if (log_file_size.IsNumber()) {
+    rocketmq::GetDefaultLoggerConfig().set_file_size(log_file_size.ToNumber());
+  }
+
+  // set log file num
+  Napi::Value log_file_num = options.Get("logFileNum");
+  if (log_file_num.IsNumber()) {
+    rocketmq::GetDefaultLoggerConfig().set_file_count(log_file_num.ToNumber());
+  }
 }
 
-NAN_METHOD(RocketMQPushConsumer::Shutdown)
-{
-    NAN_GET_CPUSHCONSUMER();
+Napi::Value RocketMQPushConsumer::SetSessionCredentials(
+    const Napi::CallbackInfo& info) {
+  Napi::String access_key = info[0].As<Napi::String>();
+  Napi::String secret_key = info[1].As<Napi::String>();
+  Napi::String ons_channel = info[2].As<Napi::String>();
 
-    Nan::Callback* callback = (info[0]->IsFunction()) ?
-        new Nan::Callback(Nan::To<Function>(info[0]).ToLocalChecked()) :
-        NULL;
+  auto rpc_hook = std::make_shared<rocketmq::ClientRPCHook>(
+      rocketmq::SessionCredentials(access_key, secret_key, ons_channel));
+  consumer_.setRPCHook(rpc_hook);
 
-    Nan::AsyncQueueWorker(new PushConsumerStartOrShutdownWorker(callback, consumer_ptr, PushConsumerWorkerType::SHUTDOWN_PUSH_CONSUMER));
+  return info.Env().Undefined();
 }
 
-NAN_METHOD(RocketMQPushConsumer::Subscribe)
-{
-    NAN_GET_CPUSHCONSUMER();
+class ConsumerStartWorker : public Napi::AsyncWorker {
+ public:
+  ConsumerStartWorker(const Napi::Function& callback,
+                      const rocketmq::DefaultMQPushConsumer& consumer)
+      : Napi::AsyncWorker(callback), consumer_(consumer) {}
 
-    Nan::Utf8String v8_topic(info[0]);
-    Nan::Utf8String v8_expression(info[1]);
-    string topic = *v8_topic;
-    string expression = *v8_expression;
+  void Execute() override { consumer_.start(); }
 
-    int ret;
-    try
-    {
-        ret = ::Subscribe(consumer_ptr, topic.c_str(), expression.c_str());
-    }
-    catch(const runtime_error e)
-    {
-        Nan::ThrowError(e.what());
-        return;
-    }
-    catch(const std::exception& e)
-    {
-        Nan::ThrowError(e.what());
-        return;
-    }
-
-    info.GetReturnValue().Set(ret);
-}
-
-NAN_METHOD(RocketMQPushConsumer::SetListener)
-{
-    RocketMQPushConsumer* consumer = ObjectWrap::Unwrap<RocketMQPushConsumer>(info.Holder());
-    if(!consumer->listener_func.IsEmpty())
-    {
-        consumer->listener_func.Reset();
-    }
+ private:
+  rocketmq::DefaultMQPushConsumer consumer_;
+};
 
-    consumer->listener_func.Reset(Nan::To<Function>(info[0]).ToLocalChecked());
+Napi::Value RocketMQPushConsumer::Start(const Napi::CallbackInfo& info) {
+  Napi::Function callback = info[0].As<Napi::Function>();
+  auto* worker = new ConsumerStartWorker(callback, consumer_);
+  worker->Queue();
+  return info.Env().Undefined();
 }
 
-NAN_METHOD(RocketMQPushConsumer::SetSessionCredentials)
-{
-    NAN_GET_CPUSHCONSUMER();
+class ConsumerShutdownWorker : public Napi::AsyncWorker {
+ public:
+  ConsumerShutdownWorker(const Napi::Function& callback,
+                         const rocketmq::DefaultMQPushConsumer& consumer)
+      : Napi::AsyncWorker(callback), consumer_(consumer) {}
 
-    Nan::Utf8String access_key(info[0]);
-    Nan::Utf8String secret_key(info[1]);
-    Nan::Utf8String ons_channel(info[2]);
+  void Execute() override { consumer_.shutdown(); }
 
-    int ret;
-    try
-    {
-        ret = SetPushConsumerSessionCredentials(consumer_ptr, *access_key, *secret_key, *ons_channel);
-    }
-    catch(const runtime_error e)
-    {
-        Nan::ThrowError(e.what());
-        return;
-    }
-    catch(const std::exception& e)
-    {
-        Nan::ThrowError(e.what());
-        return;
-    }
+ private:
+  rocketmq::DefaultMQPushConsumer consumer_;
+};
 
-    info.GetReturnValue().Set(ret);
+Napi::Value RocketMQPushConsumer::Shutdown(const Napi::CallbackInfo& info) {
+  Napi::Function callback = info[0].As<Napi::Function>();
+  auto* worker = new ConsumerShutdownWorker(callback, consumer_);
+  worker->Queue();
+  return info.Env().Undefined();
 }
 
-string RocketMQPushConsumer::GetMessageColumn(char* name, CMessageExt* msg)
-{
-    const char* orig = NULL;
-
-    uv_mutex_lock(&_get_msg_ext_column_lock);
-    switch(name[0])
-    {
-    // topic / tags
-    case 't':
-        orig = name[1] == 'o' ? GetMessageTopic(msg) : GetMessageTags(msg);
-        break;
-
-    // keys
-    case 'k':
-        orig = GetMessageKeys(msg);
-        break;
-
-    // body
-    case 'b':
-        orig = GetMessageBody(msg);
-        break;
-
-    // msgId
-    case 'm':
-        orig = GetMessageId(msg);
-        break;
-
-    default:
-        orig = NULL;
-        break;
-    }
+Napi::Value RocketMQPushConsumer::Subscribe(const Napi::CallbackInfo& info) {
+  Napi::String topic = info[0].As<Napi::String>();
+  Napi::String expression = info[1].As<Napi::String>();
 
-    uv_mutex_unlock(&_get_msg_ext_column_lock);
+  consumer_.subscribe(topic, expression);
 
-    if(!orig) return "";
-    return orig;
+  return info.Env().Undefined();
 }
 
-void close_async_done(uv_handle_t* handle)
-{
-    free(handle);
-}
+struct MessageAndPromise {
+  rocketmq::MQMessageExt message;
+  std::promise<bool> promise;
+};
 
-void RocketMQPushConsumer::HandleMessageInEventLoop(uv_async_t* async)
-{
-    Nan::HandleScope scope;
-
-    Isolate* isolate = Isolate::GetCurrent();
-    Local<Context> context = isolate->GetCurrentContext();
-
-    MessageHandlerParam* param = (MessageHandlerParam*)(async->data);
-    RocketMQPushConsumer* consumer = param->consumer;
-    ConsumerAckInner* ack_inner = param->ack;
-    CMessageExt* msg = param->msg;
-
-    // create the JavaScript ack object and then set inner ack object
-    Local<Function> cons = Nan::New<Function>(ConsumerAck::GetConstructor());
-    Local<Object> ack_obj = cons->NewInstance(context, 0, 0).ToLocalChecked();
-    ConsumerAck* ack = ObjectWrap::Unwrap<ConsumerAck>(ack_obj);
-    ack->SetInner(ack_inner);
-
-    // TODO: const char *GetMessageProperty(CMessageExt *msgExt, const char *key);
-    Local<Object> result = Nan::New<Object>();
-    for(int i = 0; i < 5; i++)
-    {
-        Nan::Set(
-            result,
-            Nan::New(message_handler_param_keys[i]).ToLocalChecked(),
-            Nan::New(RocketMQPushConsumer::GetMessageColumn(message_handler_param_keys[i], msg)).ToLocalChecked());
+void CallConsumerMessageJsListener(Napi::Env env,
+                                   Napi::Function listener,
+                                   std::nullptr_t*,
+                                   MessageAndPromise* data) {
+  if (env != nullptr) {
+    if (listener != nullptr) {
+      Napi::Object message = Napi::Object::New(env);
+      message.Set("topic", data->message.topic());
+      message.Set("tags", data->message.tags());
+      message.Set("keys", data->message.keys());
+      message.Set("body", data->message.body());
+      message.Set("msgId", data->message.msg_id());
+
+      Napi::Object ack = ConsumerAck::NewInstance(env);
+      ConsumerAck* consumer_ack = Napi::ObjectWrap<ConsumerAck>::Unwrap(ack);
+      consumer_ack->SetPromise(std::move(data->promise));
+
+      try {
+        listener.Call(Napi::Object::New(listener.Env()), {message, ack});
+      } catch (const Napi::Error& e) {
+        try {
+          consumer_ack->Done(std::current_exception());
+        } catch (const std::future_error&) {
+          // ignore
+        }
+      }
+      return;
     }
-
-    Local<Value> argv[2] = {
-        result,
-        ack_obj
-    };
-    Nan::Callback* callback = consumer->GetListenerFunction();
-    callback->Call(2, argv);
-
-    uv_close((uv_handle_t*)async, close_async_done);
+  }
+  data->promise.set_value(false);
 }
 
-int RocketMQPushConsumer::OnMessage(CPushConsumer* consumer_ptr, CMessageExt* msg_ext)
-{
-    RocketMQPushConsumer* consumer = _push_consumer_map[consumer_ptr];
-    if (!consumer)
-    {
-        // TODO: error handle
-        return CConsumeStatus::E_RECONSUME_LATER;
+class ConsumerMessageListener : public rocketmq::MessageListenerConcurrently {
+ public:
+  ConsumerMessageListener(Napi::Env& env, Napi::Function&& callback)
+      : listener_(
+            Listener::New(env, callback, "RocketMQ Message Listener", 0, 1)) {}
+
+  ~ConsumerMessageListener() { listener_.Release(); }
+
+  rocketmq::ConsumeStatus consumeMessage(
+      std::vector<rocketmq::MQMessageExt>& msgs) override {
+    for (auto& msg : msgs) {
+      MessageAndPromise data{msg, std::promise<bool>()};
+      auto future = data.promise.get_future();
+      listener_.BlockingCall(&data);
+      try {
+        if (!future.get()) {
+          return rocketmq::ConsumeStatus::RECONSUME_LATER;
+        }
+      } catch (const std::exception& e) {
+        return rocketmq::ConsumeStatus::RECONSUME_LATER;
+      }
     }
+    return rocketmq::ConsumeStatus::CONSUME_SUCCESS;
+  };
 
-    ConsumerAckInner ack_inner;
-
-    // create async parameter
-    MessageHandlerParam param;
-    param.consumer = consumer;
-    param.ack = &ack_inner;
-    param.msg = msg_ext;
+ private:
+  using Listener =
+      Napi::TypedThreadSafeFunction<std::nullptr_t,
+                                    MessageAndPromise,
+                                    &CallConsumerMessageJsListener>;
 
-    // create a new async handler and bind with `RocketMQPushConsumer::HandleMessageInEventLoop`
-    uv_async_t* async = (uv_async_t*)malloc(sizeof(uv_async_t));
-    uv_async_init(uv_default_loop(), async, RocketMQPushConsumer::HandleMessageInEventLoop);
-    async->data = (void*)(&param);
-
-    // send async handler
-    uv_async_send(async);
-
-    // wait for result
-    CConsumeStatus status = ack_inner.WaitResult();
+  Listener listener_;
+};
 
-    return status;
+Napi::Value RocketMQPushConsumer::SetListener(const Napi::CallbackInfo& info) {
+  Napi::Env env = info.Env();
+  listener_.reset(
+      new ConsumerMessageListener(env, info[0].As<Napi::Function>()));
+  consumer_.registerMessageListener(listener_.get());
+  return env.Undefined();
 }
 
-}
+}  // namespace __node_rocketmq__
diff --git a/src/push_consumer.h b/src/push_consumer.h
index 6250d4e..47b3d14 100644
--- a/src/push_consumer.h
+++ b/src/push_consumer.h
@@ -17,62 +17,40 @@
 #ifndef __ROCKETMQ_PUSH_CONSUMER_H__
 #define __ROCKETMQ_PUSH_CONSUMER_H__
 
-#include <CPushConsumer.h>
-#include <uv.h>
-#include <nan.h>
 #include <string>
 
-namespace __node_rocketmq__ {
+#include <napi.h>
+
+#include <DefaultMQPushConsumer.h>
 
-using v8::Context;
-using v8::Function;
-using v8::FunctionTemplate;
-using v8::Isolate;
-using v8::Local;
-using v8::Object;
-using v8::String;
-using v8::Value;
+namespace __node_rocketmq__ {
 
-class RocketMQPushConsumer : public Nan::ObjectWrap {
-public:
-    static NAN_MODULE_INIT(Init);
-    static int OnMessage(CPushConsumer* consumer_ptr, CMessageExt* msg_ext);
-    static std::string GetMessageColumn(char* name, CMessageExt* msg);
+class ConsumerMessageListener;
 
-private:
-    explicit RocketMQPushConsumer(const char* group_id, const char* instance_name);
-    ~RocketMQPushConsumer();
+class RocketMQPushConsumer : public Napi::ObjectWrap<RocketMQPushConsumer> {
+ public:
+  static Napi::Object Init(Napi::Env env, Napi::Object exports);
 
-    static NAN_METHOD(New);
-    static NAN_METHOD(Start);
-    static NAN_METHOD(Shutdown);
-    static NAN_METHOD(Subscribe);
-    static NAN_METHOD(SetListener);
-    static NAN_METHOD(SetSessionCredentials);
+  RocketMQPushConsumer(const Napi::CallbackInfo& info);
+  ~RocketMQPushConsumer();
 
-    static Nan::Persistent<v8::Function> constructor;
+ private:
+  Napi::Value SetSessionCredentials(const Napi::CallbackInfo& info);
 
-    void SetOptions(Local<Object>);
-    static void HandleMessageInEventLoop(uv_async_t* async);
+  Napi::Value Start(const Napi::CallbackInfo& info);
+  Napi::Value Shutdown(const Napi::CallbackInfo& info);
 
-protected:
-    CPushConsumer* GetConsumer()
-    {
-        return consumer_ptr;
-    }
+  Napi::Value Subscribe(const Napi::CallbackInfo& info);
+  Napi::Value SetListener(const Napi::CallbackInfo& info);
 
-    Nan::Callback* GetListenerFunction()
-    {
-        Nan::Callback* cb;
-        cb = &listener_func;
-        return cb;
-    }
+ private:
+  void SetOptions(const Napi::Object& options);
 
-private:
-    CPushConsumer* consumer_ptr;
-    Nan::Callback listener_func;
+ private:
+  rocketmq::DefaultMQPushConsumer consumer_;
+  std::unique_ptr<ConsumerMessageListener> listener_;
 };
 
-}
+}  // namespace __node_rocketmq__
 
 #endif
diff --git a/src/rocketmq.cpp b/src/rocketmq.cpp
index 3b426fb..63fd41f 100644
--- a/src/rocketmq.cpp
+++ b/src/rocketmq.cpp
@@ -14,35 +14,21 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#include <nan.h>
+#include <napi.h>
 
+#include "consumer_ack.h"
 #include "producer.h"
 #include "push_consumer.h"
-#include "consumer_ack.h"
 
 namespace __node_rocketmq__ {
 
-#if defined(__APPLE__)
-uv_lib_t lib;
-
-NAN_METHOD(DLOpen)
-{
-    Nan::Utf8String filename(info[0]);
-    uv_dlopen(*filename, &lib);
+Napi::Object Init(Napi::Env env, Napi::Object exports) {
+  RocketMQProducer::Init(env, exports);
+  RocketMQPushConsumer::Init(env, exports);
+  ConsumerAck::Init(env, exports);
+  return exports;
 }
-#endif
 
-NAN_MODULE_INIT(Init)
-{
-    RocketMQProducer::Init(target);
-    RocketMQPushConsumer::Init(target);
-    ConsumerAck::Init(target);
+NODE_API_MODULE(rocketmq, Init)
 
-#if defined(__APPLE__)
-    Nan::Set(target, Nan::New("macosDLOpen").ToLocalChecked(), Nan::New<v8::FunctionTemplate>(DLOpen)->GetFunction());
-#endif
-}
-
-NODE_MODULE(rocketmq, Init)
-
-}
+}  // namespace __node_rocketmq__
diff --git a/src/workers/producer/send_message.h b/src/workers/producer/send_message.h
deleted file mode 100644
index 6974e5a..0000000
--- a/src/workers/producer/send_message.h
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef __ROCKETMQ_SEND_MESSAGE_H__
-#define __ROCKETMQ_SEND_MESSAGE_H__
-
-#include <nan.h>
-#include <CProducer.h>
-#include <MQClientException.h>
-
-namespace __node_rocketmq__ {
-
-using namespace std;
-
-class ProducerSendMessageWorker : public Nan::AsyncWorker {
-public:
-    ProducerSendMessageWorker(Nan::Callback* callback, RocketMQProducer* producer, CMessage* msg) :
-        AsyncWorker(callback),
-        msg(msg),
-        producer(producer)
-    {
-    }
-
-    ~ProducerSendMessageWorker()
-    {
-        DestroyMessage(msg);
-    }
-
-    void Execute()
-    {
-        try
-        {
-            SendMessageSync(producer->GetProducer(), msg, &send_ret);
-        }
-        catch(const runtime_error e)
-        {
-            SetErrorMessage(e.what());
-        }
-        catch(const std::exception& e)
-        {
-            SetErrorMessage(e.what());
-        }
-    }
-
-    void HandleOKCallback()
-    {
-        Nan::HandleScope scope;
-
-        Local<Value> argv[] = {
-            Nan::Undefined(),
-            Nan::New<v8::Number>((unsigned int)send_ret.sendStatus),
-            Nan::New<v8::String>(send_ret.msgId).ToLocalChecked(),
-            Nan::New<v8::Number>((long long)send_ret.offset)
-        };
-        callback->Call(4, argv);
-    }
-
-private:
-    CMessage* msg;
-    RocketMQProducer* producer;
-
-    CSendResult send_ret;
-};
-
-}
-
-#endif
diff --git a/src/workers/producer/start_or_shutdown.h b/src/workers/producer/start_or_shutdown.h
deleted file mode 100644
index dde99a6..0000000
--- a/src/workers/producer/start_or_shutdown.h
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef __ROCKETMQ_PRODUCER_START_OR_SHUTDOWN_H__
-#define __ROCKETMQ_PRODUCER_START_OR_SHUTDOWN_H__
-
-#include <nan.h>
-#include <CProducer.h>
-#include <MQClientException.h>
-
-namespace __node_rocketmq__ {
-
-using namespace std;
-
-enum ProducerWorkerType {
-    START_PRODUCER = 0,
-    SHUTDOWN_PRODUCER
-};
-
-class ProducerStartOrShutdownWorker : public Nan::AsyncWorker {
-public:
-    ProducerStartOrShutdownWorker(Nan::Callback* callback, CProducer* producer_ptr, ProducerWorkerType type) :
-        AsyncWorker(callback),
-        producer_ptr(producer_ptr),
-        ret(0),
-        type(type)
-    {
-    }
-
-    ~ProducerStartOrShutdownWorker()
-    {
-    }
-
-    void Execute()
-    {
-        try
-        {
-            switch(type) {
-            case START_PRODUCER:
-                ret = StartProducer(producer_ptr); break;
-            case SHUTDOWN_PRODUCER:
-                ret = ShutdownProducer(producer_ptr); break;
-            default: break;
-            }
-        }
-        catch(const runtime_error e)
-        {
-            SetErrorMessage(e.what());
-        }
-        catch(const exception& e)
-        {
-            SetErrorMessage(e.what());
-        }
-    }
-
-    void HandleOKCallback()
-    {
-        Nan::HandleScope scope;
-
-        Local<Value> argv[] = {
-            Nan::Undefined(),
-            Nan::New<v8::Number>((int)ret),
-        };
-        callback->Call(2, argv);
-    }
-
-private:
-    CProducer* producer_ptr;
-    int ret;
-    ProducerWorkerType type;
-};
-
-}
-
-#endif
diff --git a/src/workers/push_consumer/start_or_shutdown.h b/src/workers/push_consumer/start_or_shutdown.h
deleted file mode 100644
index 1169059..0000000
--- a/src/workers/push_consumer/start_or_shutdown.h
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef __ROCKETMQ_PUSH_CONSUMER_START_OR_SHUTDOWN_H__
-#define __ROCKETMQ_PUSH_CONSUMER_START_OR_SHUTDOWN_H__
-
-#include <nan.h>
-#include <CPushConsumer.h>
-#include <MQClientException.h>
-
-namespace __node_rocketmq__ {
-
-using namespace std;
-
-enum PushConsumerWorkerType {
-    START_PUSH_CONSUMER = 0,
-    SHUTDOWN_PUSH_CONSUMER
-};
-
-class PushConsumerStartOrShutdownWorker : public Nan::AsyncWorker {
-public:
-    PushConsumerStartOrShutdownWorker(Nan::Callback* callback, CPushConsumer* consumer_ptr, PushConsumerWorkerType type) :
-        AsyncWorker(callback),
-        consumer_ptr(consumer_ptr),
-        ret(0),
-        type(type)
-    {
-    }
-
-    ~PushConsumerStartOrShutdownWorker()
-    {
-    }
-
-    void Execute()
-    {
-        try
-        {
-            switch(type) {
-            case START_PUSH_CONSUMER:
-                ret = StartPushConsumer(consumer_ptr); break;
-            case SHUTDOWN_PUSH_CONSUMER:
-                ret = ShutdownPushConsumer(consumer_ptr); break;
-            default: break;
-            }
-        }
-        catch(const runtime_error e)
-        {
-            SetErrorMessage(e.what());
-        }
-        catch(const exception& e)
-        {
-            SetErrorMessage(e.what());
-        }
-    }
-
-    void HandleOKCallback()
-    {
-        Nan::HandleScope scope;
-
-        Local<Value> argv[] = {
-            Nan::Undefined(),
-            Nan::New<v8::Number>((int)ret),
-        };
-        callback->Call(2, argv);
-    }
-
-private:
-    CPushConsumer* consumer_ptr;
-    int ret;
-    PushConsumerWorkerType type;
-};
-
-}
-
-#endif