You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by if...@apache.org on 2021/03/27 18:38:35 UTC

[rocketmq-client-nodejs] 01/01: refactor: powered by N-API and rocketmq-client-cpp@re_dev

This is an automated email from the ASF dual-hosted git repository.

ifplusor pushed a commit to branch n-api
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-nodejs.git

commit d7d8287a5882b9f06035fb3b84017aea079b5db7
Author: James Yin <yw...@hotmail.com>
AuthorDate: Fri Mar 26 12:30:05 2021 +0800

    refactor: powered by N-API and rocketmq-client-cpp@re_dev
---
 .clang-format                                 | 111 ++++++
 .gitignore                                    |   6 +-
 .gitmodules                                   |   5 +-
 README.md                                     |  10 +-
 binding.gyp                                   |  28 +-
 deps/rocketmq                                 |   2 +-
 lib/common.js                                 |  28 --
 lib/env_init.js                               |  32 --
 lib/producer.js                               |   6 +-
 lib/push_consumer.js                          |   6 +-
 package.json                                  |  22 +-
 script/download_lib.js                        | 131 ------
 script/get_linux_distro_route.js              | 102 -----
 src/consumer_ack.cpp                          |  86 ++--
 src/consumer_ack.h                            |  51 +--
 src/consumer_ack_inner.cpp                    |  77 ----
 src/consumer_ack_inner.h                      |  42 --
 src/producer.cpp                              | 440 +++++++++++----------
 src/producer.h                                |  48 +--
 src/push_consumer.cpp                         | 546 ++++++++++----------------
 src/push_consumer.h                           |  66 ++--
 src/rocketmq.cpp                              |  32 +-
 src/workers/producer/send_message.h           |  80 ----
 src/workers/producer/start_or_shutdown.h      |  88 -----
 src/workers/push_consumer/start_or_shutdown.h |  88 -----
 25 files changed, 691 insertions(+), 1442 deletions(-)

diff --git a/.clang-format b/.clang-format
new file mode 100644
index 0000000..4aad29c
--- /dev/null
+++ b/.clang-format
@@ -0,0 +1,111 @@
+---
+Language:        Cpp
+# BasedOnStyle:  Google
+AccessModifierOffset: -1
+AlignAfterOpenBracket: Align
+AlignConsecutiveAssignments: false
+AlignConsecutiveDeclarations: false
+AlignEscapedNewlines: Right
+AlignOperands:   true
+AlignTrailingComments: true
+AllowAllParametersOfDeclarationOnNextLine: true
+AllowShortBlocksOnASingleLine: false
+AllowShortCaseLabelsOnASingleLine: false
+AllowShortFunctionsOnASingleLine: Inline
+AllowShortIfStatementsOnASingleLine: true
+AllowShortLoopsOnASingleLine: true
+AlwaysBreakAfterDefinitionReturnType: None
+AlwaysBreakAfterReturnType: None
+AlwaysBreakBeforeMultilineStrings: false
+AlwaysBreakTemplateDeclarations: true
+BinPackArguments: false
+BinPackParameters: false
+BraceWrapping:
+  AfterClass:      false
+  AfterControlStatement: false
+  AfterEnum:       false
+  AfterFunction:   false
+  AfterNamespace:  false
+  AfterObjCDeclaration: false
+  AfterStruct:     false
+  AfterUnion:      false
+  AfterExternBlock: false
+  BeforeCatch:     false
+  BeforeElse:      false
+  IndentBraces:    false
+  SplitEmptyFunction: true
+  SplitEmptyRecord: true
+  SplitEmptyNamespace: true
+BreakBeforeBinaryOperators: None
+BreakBeforeBraces: Attach
+BreakBeforeInheritanceComma: false
+BreakBeforeTernaryOperators: true
+BreakConstructorInitializersBeforeComma: false
+BreakConstructorInitializers: BeforeColon
+BreakAfterJavaFieldAnnotations: false
+BreakStringLiterals: true
+ColumnLimit:      80
+CommentPragmas:  '^ IWYU pragma:'
+CompactNamespaces: false
+ConstructorInitializerAllOnOneLineOrOnePerLine: true
+ConstructorInitializerIndentWidth: 4
+ContinuationIndentWidth: 4
+Cpp11BracedListStyle: true
+DerivePointerAlignment: false
+DisableFormat:   false
+ExperimentalAutoDetectBinPacking: false
+FixNamespaceComments: true
+ForEachMacros:
+  - foreach
+  - Q_FOREACH
+  - BOOST_FOREACH
+IncludeBlocks:   Preserve
+IncludeCategories:
+  - Regex:           '^<ext/.*\.h>'
+    Priority:        2
+  - Regex:           '^<.*\.h>'
+    Priority:        1
+  - Regex:           '^<.*'
+    Priority:        2
+  - Regex:           '.*'
+    Priority:        3
+IncludeIsMainRegex: '([-_](test|unittest))?$'
+IndentCaseLabels: true
+IndentPPDirectives: None
+IndentWidth:     2
+IndentWrappedFunctionNames: false
+JavaScriptQuotes: Leave
+JavaScriptWrapImports: true
+KeepEmptyLinesAtTheStartOfBlocks: false
+MacroBlockBegin: ''
+MacroBlockEnd:   ''
+MaxEmptyLinesToKeep: 1
+NamespaceIndentation: None
+ObjCBlockIndentWidth: 2
+ObjCSpaceAfterProperty: false
+ObjCSpaceBeforeProtocolList: false
+PenaltyBreakAssignment: 2
+PenaltyBreakBeforeFirstCallParameter: 1
+PenaltyBreakComment: 300
+PenaltyBreakFirstLessLess: 120
+PenaltyBreakString: 1000
+PenaltyExcessCharacter: 1000000
+PenaltyReturnTypeOnItsOwnLine: 200
+PointerAlignment: Left
+ReflowComments:  true
+SortIncludes:    true
+SortUsingDeclarations: true
+SpaceAfterCStyleCast: false
+SpaceAfterTemplateKeyword: true
+SpaceBeforeAssignmentOperators: true
+SpaceBeforeParens: ControlStatements
+SpaceInEmptyParentheses: false
+SpacesBeforeTrailingComments: 2
+SpacesInAngles:  false
+SpacesInContainerLiterals: true
+SpacesInCStyleCastParentheses: false
+SpacesInParentheses: false
+SpacesInSquareBrackets: false
+Standard:        Auto
+TabWidth:        8
+UseTab:          Never
diff --git a/.gitignore b/.gitignore
index a50fd0b..6978ed0 100644
--- a/.gitignore
+++ b/.gitignore
@@ -197,6 +197,8 @@ $RECYCLE.BIN/
 # Windows shortcuts
 *.lnk
 
-build
 package-lock.json
-.vscode
+.vscode/
+build/
+Debug/
+Release/
diff --git a/.gitmodules b/.gitmodules
index c9fc461..51ac186 100644
--- a/.gitmodules
+++ b/.gitmodules
@@ -1,3 +1,4 @@
 [submodule "deps/rocketmq"]
-path = deps/rocketmq
-url = https://github.com/apache/rocketmq-client-cpp.git
+	path = deps/rocketmq
+	url = https://github.com/apache/rocketmq-client-cpp.git
+	branch = re_dev
diff --git a/README.md b/README.md
index cdb8f65..dd15ba7 100644
--- a/README.md
+++ b/README.md
@@ -6,12 +6,12 @@
 [![TravisCI](https://travis-ci.org/apache/rocketmq-client-nodejs.svg)](https://travis-ci.org/apache/rocketmq-client-nodejs)
 [![Dependency](https://david-dm.org/apache/rocketmq-client-nodejs.svg)](https://david-dm.org/apache/rocketmq-client-nodejs)
 
-This official Node.js client is a lightweight wrapper around  [rocketmq-client-cpp](https://github.com/apache/rocketmq-client-cpp), a finely tuned CPP client.
+This official Node.js client is a lightweight wrapper around [re_dev branch of rocketmq-client-cpp](https://github.com/apache/rocketmq-client-cpp/tree/re_dev), a finely tuned CPP client.
 
 
 > **Notice 1:** This client is still in `dev` version. Use it cautiously in production.
 
-> **Notice 2:** This SDK is now only support macOS and Ubuntu **14.04**. Ubuntu 16+ is not supported and CentOS is not tested yet.
+> **Notice 2:** This SDK is now only tested on macOS. Ubuntu and CentOS is not tested yet.
 
 ## Installation
 
@@ -50,7 +50,8 @@ new Producer(groupId[, instanceName][, options]);
   - `compressLevel`: the compress level (0-9) of this producer, default to `5` where `0` is fastest and `9` is most compressed;
   - `sendMessageTimeout`: send message timeout millisecond, default to `3000` and suggestion is 2000 - 3000ms;
   - `maxMessageSize`: max message size with unit (B), default to `1024 * 128` which means 128K;
-  - `logFileNum`: C++ core logic log file number, default to 3 and log file path is `$HOME/logs/rocketmq-cpp`;
+  - `logDir`: the folder where C++ core logic log store, default log file path is `$HOME/logs/rocketmq-cpp`;
+  - `logFileNum`: C++ core logic log file number, default to 3;
   - `logFileSize`: size of each C++ core logic log file with unit (B);
   - `logLevel`: C++ core logic log level in `"fatal"`, `"error"`, `"warn"`, `"info"`, `"debug"`, `"trace"` and `"num"`.
 
@@ -177,7 +178,8 @@ new PushConsumer(groupId[, instanceName][, options]);
   - `nameServer`: the name server of RocketMQ;
   - `threadCount`: the thread number of underlying C++ logic;
   - `maxBatchSize`: message max batch size;
-  - `logFileNum`: C++ core logic log file number, default to 3 and log file path is `$HOME/logs/rocketmq-cpp`;
+  - `logDir`: the folder where C++ core logic log store, default log file path is `$HOME/logs/rocketmq-cpp`;
+  - `logFileNum`: C++ core logic log file number, default to 3;
   - `logFileSize`: size of each C++ core logic log file with unit (B);
   - `logLevel`: C++ core logic log level in `"fatal"`, `"error"`, `"warn"`, `"info"`, `"debug"`, `"trace"` and `"num"`.
 
diff --git a/binding.gyp b/binding.gyp
index 8f4a46e..c7c1d71 100644
--- a/binding.gyp
+++ b/binding.gyp
@@ -21,21 +21,21 @@
         "src/rocketmq.cpp",
         "src/producer.cpp",
         "src/push_consumer.cpp",
-        "src/consumer_ack.cpp",
-        "src/consumer_ack_inner.cpp"
+        "src/consumer_ack.cpp"
       ],
       "include_dirs": [
         "deps/rocketmq/include",
-        "<!(node -e \"require('nan')\")"
+        "<!@(node -p \"require('node-addon-api').include\")"
+      ],
+      "library_dirs": [
+        "<(module_root_dir)/deps/rocketmq/bin"
       ],
       "conditions": [
         ["OS==\"linux\"", {
-          "libraries": [
-            "<(module_root_dir)/deps/lib/librocketmq.a"
-          ],
-          "cflags_cc!": [ "-fno-exceptions", "-pthread", "-Wl,--no-as-needed", "-ldl" ],
-          "cflags_cc": [ "-Wno-ignored-qualifiers" ],
-          "cflags": [ "-std=c++11", "-g" ]
+          "libraries": [ "-lrocketmq" ],
+          "cflags_cc!": [ "-fno-exceptions", "-fno-rtti", "-pthread", "-Wl,--no-as-needed", "-ldl" ],
+          "cflags_cc": [ "-Wall", "-std=c++11" ],
+          "cflags": [ "-g" ]
         }],
         ["OS==\"win\"", {
           "libraries": [
@@ -49,13 +49,13 @@
           ]
         }],
         ["OS==\"mac\"", {
+          "libraries": [ "-lrocketmq" ],
           "xcode_settings": {
-            "GCC_ENABLE_CPP_EXCEPTIONS": "YES"
+            "GCC_ENABLE_CPP_EXCEPTIONS": "YES",
+            'GCC_ENABLE_CPP_RTTI': 'YES'
           },
-          "cflags!": [ "-fno-exceptions" ],
-          "cflags_cc!": [ "-fno-exceptions", "-pthread", "-Wl,--no-as-needed", "-ldl" ],
-          "cflags_cc": [ "-Wno-ignored-qualifiers" ],
-          "cflags": [ "-std=c++11", "-stdlib=libc++" ]
+          "cflags_cc!": [ "-fno-exceptions", "-fno-rtti", "-pthread", "-Wl,--no-as-needed", "-ldl" ],
+          "cflags_cc": [ "-Wall", "-std=c++11", "-stdlib=libc++" ]
         }]
       ]
     }
diff --git a/deps/rocketmq b/deps/rocketmq
index d5887b6..db0e209 160000
--- a/deps/rocketmq
+++ b/deps/rocketmq
@@ -1 +1 @@
-Subproject commit d5887b63ddbba16fec562f64dca7f77ce9ca0bb1
+Subproject commit db0e20972a78d70930acce5eaff1456408b72c73
diff --git a/lib/common.js b/lib/common.js
deleted file mode 100644
index 2386605..0000000
--- a/lib/common.js
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-"use strict";
-
-exports.requireBinding = function(name) {
-    let mod;
-    try {
-        mod = require(`../build/Debug/${name}`);
-    } catch(e) {
-        mod = require(`../build/Release/${name}`);
-    }
-
-    return mod;
-};
diff --git a/lib/env_init.js b/lib/env_init.js
deleted file mode 100644
index 0c26a1d..0000000
--- a/lib/env_init.js
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-"use strict";
-
-const os = require("os");
-const path = require("path");
-
-const common = require("./common");
-
-const binding = common.requireBinding("rocketmq");
-
-switch(os.platform()) {
-case "darwin":
-    process.env.EVENT_NOKQUEUE = "1";
-    binding.macosDLOpen(path.join(__dirname, "../deps/lib/librocketmq.dylib"));
-    break;
-default: break;
-}
diff --git a/lib/producer.js b/lib/producer.js
index f4f5c93..02b777e 100644
--- a/lib/producer.js
+++ b/lib/producer.js
@@ -16,13 +16,9 @@
  */
 "use strict";
 
-require("./env_init");
-
 const assert = require("assert");
 
-const common = require("./common");
-
-const binding = common.requireBinding("rocketmq");
+const binding = require("bindings")("rocketmq");
 
 const START_OR_SHUTDOWN = Symbol("RocketMQProducer#startOrShutdown");
 
diff --git a/lib/push_consumer.js b/lib/push_consumer.js
index 3bb51de..7cf97ab 100644
--- a/lib/push_consumer.js
+++ b/lib/push_consumer.js
@@ -16,14 +16,10 @@
  */
 "use strict";
 
-require("./env_init");
-
 const assert = require("assert");
 const EventEmitter = require("events").EventEmitter;
 
-const common = require("./common");
-
-const binding = common.requireBinding("rocketmq");
+const binding = require("bindings")("rocketmq");
 
 const START_OR_SHUTDOWN = Symbol("RocketMQPushConsumer#startOrShutdown");
 const START_STATUS = {
diff --git a/package.json b/package.json
index 074198d..5e38e40 100644
--- a/package.json
+++ b/package.json
@@ -1,25 +1,23 @@
 {
   "name": "apache-rocketmq",
-  "version": "1.0.0-rc1",
-  "cppSDKVersion": "1.2.0",
+  "version": "2.0.0-rc1",
+  "cppSDKVersion": "3.0.0",
   "description": "RocketMQ binding for Node.js",
+  "license": "Apache-2.0",
+  "author": "James Yin <if...@apache.org>",
   "main": "index.js",
   "scripts": {
+    "preinstall": "git submodule update --init --recommend-shallow",
+    "install": "deps/rocketmq/build.sh && node-gyp rebuild",
     "test": "npm run lint && echo 'temp example test' && node example/producer.js && node example/push_consumer.js",
-    "lint": "eslint .",
-    "install": "node ./script/download_lib.js && node-gyp rebuild"
+    "lint": "eslint ."
   },
-  "author": "XadillaX <i...@2333.moe>",
-  "license": "Apache-2.0",
   "dependencies": {
-    "co": "^4.6.0",
-    "destroy": "^1.0.4",
-    "getos": "^3.1.1",
-    "mkdirp": "^0.5.1",
-    "nan": "^2.11.1",
-    "urllib": "^2.31.3"
+    "bindings": "^1.5.0",
+    "node-addon-api": "^3.1.0"
   },
   "devDependencies": {
+    "co": "^4.6.0",
     "eslint": "^5.9.0",
     "eslint-config-rocketmq-style": "^1.0.0"
   }
diff --git a/script/download_lib.js b/script/download_lib.js
deleted file mode 100644
index 5cfebfd..0000000
--- a/script/download_lib.js
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-"use strict";
-
-const fs = require("fs");
-const os = require("os");
-const path = require("path");
-
-const co = require("co");
-const destroy = require("destroy");
-const _mkdirp = require("mkdirp");
-const urllib = require("urllib");
-
-const getLinuxDistroRoute = require("./get_linux_distro_route");
-const pkg = require("../package");
-
-let REGISTRY_MIRROR =
-    process.env.NODE_ROCKETMQ_REGISTRY ||
-    "https://opensource-rocketmq-client.oss-cn-hangzhou.aliyuncs.com";
-if(!REGISTRY_MIRROR.endsWith("/")) REGISTRY_MIRROR += "/";
-
-const CPP_SDK_VERSION = pkg.cppSDKVersion;
-const LIB_DIR = path.join(__dirname, "..", "deps", "lib");
-const URL_ROOT = `${REGISTRY_MIRROR}cpp-client`;
-
-function mkdirp(dir) {
-    return new Promise((resolve, reject) => {
-        _mkdirp(dir, null, err => {
-            if(err) reject(err);
-            else resolve();
-        });
-    });
-}
-
-function *getUrlArray() {
-    const platform = os.platform();
-    const ret = [];
-    let distro;
-
-    switch(platform) {
-    case "win32":
-        ret.push(`${URL_ROOT}/windows/${CPP_SDK_VERSION}/rocketmq-client-cpp.dll`);
-        ret.push(`${URL_ROOT}/windows/${CPP_SDK_VERSION}/rocketmq-client-cpp.lib`);
-        break;
-
-    case "darwin":
-        ret.push(`${URL_ROOT}/mac/${CPP_SDK_VERSION}/librocketmq.dylib`);
-        break;
-
-    case "linux":
-        distro = yield getLinuxDistroRoute();
-        ret.push(`${URL_ROOT}/linux/${CPP_SDK_VERSION}/${distro}/librocketmq.a`);
-        break;
-
-    default: throw new Error(`Unsupported platform ${platform}`);
-    }
-
-    return ret;
-}
-
-co(function *() {
-    let urls;
-    try {
-        urls = yield getUrlArray();
-    } catch(e) {
-        console.error(`[rocketmq sdk] [error] ${e.message}`);
-        process.exit(4);
-    }
-
-    yield mkdirp(LIB_DIR);
-    let writeTimes = 0;
-    for(const url of urls) {
-        console.log(`[rocketmq sdk] [info] downloading [${url}]...`);
-
-        const resp = yield urllib.request(url, {
-            timeout: 60000 * 5,
-            followRedirect: true,
-            streaming: true
-        });
-
-        if(resp.status !== 200) {
-            destroy(resp.res);
-            console.error(`[rocketmq sdk] [error] error status ${resp.status} while downloading [${url}].`);
-            process.exit(4);
-        }
-
-        const readStream = resp.res;
-        const filename = path.join(LIB_DIR, path.basename(url));
-        const writeStream = fs.createWriteStream(filename, {
-            encoding: "binary"
-        });
-
-        // eslint-disable-next-line
-        function handleDownladCallback(err) {
-            if(err) {
-                console.error(`[rocketmq sdk] [error] error occurred while downloading [${url}] to [${filename}].`);
-                console.error(err.stack);
-                process.exit(4);
-            }
-
-            writeTimes++;
-            destroy(resp.res);
-
-            console.log(`[rocketmq sdk] [info] downloaded library [${url}].`);
-            if(writeTimes === urls.length) {
-                console.log("[rocketmq sdk] [info] all libraries have been written to disk.");
-                process.exit(0);
-            }
-        }
-
-        readStream.on("error", handleDownladCallback);
-        writeStream.on("error", handleDownladCallback);
-        writeStream.on("finish", handleDownladCallback);
-
-        readStream.pipe(writeStream);
-    }
-});
diff --git a/script/get_linux_distro_route.js b/script/get_linux_distro_route.js
deleted file mode 100644
index f03c827..0000000
--- a/script/get_linux_distro_route.js
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-"use strict";
-
-const assert = require("assert");
-
-const getos = require("getos");
-
-const RHEL_MAP_ARRAY = [
-    "Centos",
-    "Red Hat Linux",
-    "RHEL",
-    "Scientific Linux",
-    "ScientificSL",
-    "ScientificCERNSLC",
-    "ScientificFermiLTS",
-    "ScientificSLF"
-];
-const NORMAL_MAP_ARRAY = [
-    "Alpine Linux",
-    "Amazon Linux",
-    "Arch Linux",
-    "Chakra",
-    "Debian",
-    "elementary OS",
-    "IYCC",
-    "Linux Mint",
-    "Manjaro Linux",
-    "Ubuntu Linux"
-];
-
-function realGetLinuxDistroRoute(dist, release) {
-    let major;
-    if(release) {
-        major = Number(release.split(".")[0]);
-    }
-
-    // RHEL Distros
-    if(RHEL_MAP_ARRAY.includes(dist)) {
-        assert(major >= 5 && major <= 7, `Only support ${dist} 5-7.`);
-        return `RHEL${major}.x`;
-    }
-
-    // Fedora
-    if("Fedora" === dist) {
-        if(major <= 18) {
-            return "RHEL6.x";
-        } else if(major === 19) {
-            return "RHEL7.x";
-        }
-    }
-
-    if("Ubuntu Linux" === dist) {
-        assert([ 14, 16, 18 ].includes(major));
-        return `UBUNTU/${major}.04`;
-    }
-
-    if("Debian" === dist) {
-        assert(major >= 8 && major <= 10);
-        return `UBUNTU/${(major + 2) / 2}.04`;
-    }
-
-    // Ubuntu Distros
-    if(!NORMAL_MAP_ARRAY.includes(dist)) {
-        console.error(`[rocketmq sdk] [warn] ${dist} may not supported, fallback to use Ubuntu library.`);
-    }
-
-    return "UBUNTU/14.04";
-}
-
-function getLinuxDistroRoute() {
-    return new Promise((resolve, reject) => {
-        getos(function(err, ret) {
-            if(err) return reject(err);
-
-            let route;
-            try {
-                route = realGetLinuxDistroRoute(ret.dist, ret.release);
-            } catch(e) {
-                return reject(e);
-            }
-
-            resolve(route);
-        });
-    });
-}
-
-module.exports = getLinuxDistroRoute;
diff --git a/src/consumer_ack.cpp b/src/consumer_ack.cpp
index e2f5c09..8de4aaf 100644
--- a/src/consumer_ack.cpp
+++ b/src/consumer_ack.cpp
@@ -15,76 +15,52 @@
  * limitations under the License.
  */
 #include "consumer_ack.h"
+#include <exception>
+#include "napi.h"
 
 namespace __node_rocketmq__ {
 
-Nan::Persistent<Function> ConsumerAck::constructor;
+Napi::Object ConsumerAck::Init(Napi::Env env, Napi::Object exports) {
+  Napi::Function func = DefineClass(
+      env, "ConsumerAck", {InstanceMethod<&ConsumerAck::Done>("done")});
 
-ConsumerAck::ConsumerAck() :
-    inner(NULL)
-{
-}
+  Napi::FunctionReference* constructor = new Napi::FunctionReference();
+  *constructor = Napi::Persistent(func);
+  env.SetInstanceData<Napi::FunctionReference>(constructor);
 
-ConsumerAck::~ConsumerAck()
-{
-    inner = NULL;
+  exports.Set("ConsumerAck", func);
+  return exports;
 }
 
-NAN_MODULE_INIT(ConsumerAck::Init)
-{
-    Local<FunctionTemplate> tpl = Nan::New<FunctionTemplate>(New);
-    tpl->SetClassName(Nan::New("ConsumerAck").ToLocalChecked());
-    tpl->InstanceTemplate()->SetInternalFieldCount(1);
-
-    Nan::SetPrototypeMethod(tpl, "done", Done);
-
-    constructor.Reset(tpl->GetFunction());
-    Nan::Set(target, Nan::New("ConsumerAck").ToLocalChecked(), tpl->GetFunction());
+Napi::Object ConsumerAck::NewInstance(Napi::Env env) {
+  Napi::Object obj = env.GetInstanceData<Napi::FunctionReference>()->New({});
+  return obj;
 }
 
-NAN_METHOD(ConsumerAck::New)
-{
-    Isolate* isolate = info.GetIsolate();
-    Local<Context> context = Context::New(isolate);
-
-    if(!info.IsConstructCall())
-    {
-        Local<Function> _constructor = Nan::New<Function>(constructor);
-        info.GetReturnValue().Set(_constructor->NewInstance(context, 0, 0).ToLocalChecked());
-        return;
-    }
+ConsumerAck::ConsumerAck(const Napi::CallbackInfo& info)
+    : Napi::ObjectWrap<ConsumerAck>(info) {}
 
-    ConsumerAck* producer = new ConsumerAck();
-    producer->Wrap(info.This());
-    info.GetReturnValue().Set(info.This());
+void ConsumerAck::SetPromise(std::promise<bool>&& promise) {
+  promise_ = std::move(promise);
 }
 
-NAN_METHOD(ConsumerAck::Done)
-{
-    ConsumerAck* ack = ObjectWrap::Unwrap<ConsumerAck>(info.Holder());
-    bool succ = true;
-
-    if(info.Length() >= 1)
-    {
-        succ = (info[0]->IsUndefined() || info[0]->IsNull() || Nan::To<bool>(info[0]).FromJust());
-    }
+void ConsumerAck::Done(bool ack) {
+  promise_.set_value(ack);
+}
 
-    // call inner ack's `Ack` function to emit the true `Acker`'s `Ack` function
-    // and finish waiting of consume thread
-    CConsumeStatus status = succ ?
-        CConsumeStatus::E_CONSUME_SUCCESS :
-        CConsumeStatus::E_RECONSUME_LATER;
-    ack->Ack(status);
+void ConsumerAck::Done(std::exception_ptr exception) {
+  promise_.set_exception(exception);
 }
 
-void ConsumerAck::Ack(CConsumeStatus status)
-{
-    if(inner)
-    {
-        // call inner ack in the main event loop
-        inner->Ack(status);
-        inner = NULL;
+Napi::Value ConsumerAck::Done(const Napi::CallbackInfo& info) {
+  if (info.Length() >= 1) {
+    Napi::Value ack = info[0];
+    if (ack.IsBoolean() && !ack.ToBoolean()) {
+      Done(false);
     }
+  }
+  Done(true);
+  return info.Env().Undefined();
 }
 
-}
+}  // namespace __node_rocketmq__
diff --git a/src/consumer_ack.h b/src/consumer_ack.h
index 682d85e..ff0ebd1 100644
--- a/src/consumer_ack.h
+++ b/src/consumer_ack.h
@@ -17,50 +17,31 @@
 #ifndef __ROCKETMQ_CONSUMER_ACK_H__
 #define __ROCKETMQ_CONSUMER_ACK_H__
 
-#include "consumer_ack_inner.h"
-#include <nan.h>
+#include <future>
 
-namespace __node_rocketmq__ {
-
-using v8::Context;
-using v8::Function;
-using v8::FunctionTemplate;
-using v8::Isolate;
-using v8::Local;
-using v8::Object;
-using v8::String;
-using v8::Value;
+#include <napi.h>
 
-class ConsumerAck : public Nan::ObjectWrap {
-public:
-    static NAN_MODULE_INIT(Init);
-
-private:
-    explicit ConsumerAck();
-    ~ConsumerAck();
+namespace __node_rocketmq__ {
 
-    static NAN_METHOD(New);
-    static NAN_METHOD(Done);
+class ConsumerAck : public Napi::ObjectWrap<ConsumerAck> {
+ public:
+  static Napi::Object Init(Napi::Env env, Napi::Object exports);
+  static Napi::Object NewInstance(Napi::Env env);
 
-    void Ack(CConsumeStatus status);
+  ConsumerAck(const Napi::CallbackInfo& info);
 
-    static Nan::Persistent<v8::Function> constructor;
+  void SetPromise(std::promise<bool>&& promise);
 
-public:
-    void SetInner(ConsumerAckInner* _inner)
-    {
-        inner = _inner;
-    }
+  void Done(bool ack);
+  void Done(std::exception_ptr exception);
 
-    static Nan::Persistent<v8::Function>& GetConstructor()
-    {
-        return constructor;
-    }
+ private:
+  Napi::Value Done(const Napi::CallbackInfo& info);
 
-private:
-    ConsumerAckInner* inner;
+ private:
+  std::promise<bool> promise_;
 };
 
-}
+}  // namespace __node_rocketmq__
 
 #endif
diff --git a/src/consumer_ack_inner.cpp b/src/consumer_ack_inner.cpp
deleted file mode 100644
index 47b1a07..0000000
--- a/src/consumer_ack_inner.cpp
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "consumer_ack_inner.h"
-
-namespace __node_rocketmq__ {
-
-ConsumerAckInner::ConsumerAckInner() :
-    acked(false)
-{
-    uv_cond_init(&cond);
-    uv_mutex_init(&mutex);
-}
-
-ConsumerAckInner::~ConsumerAckInner()
-{
-    uv_mutex_destroy(&mutex);
-    uv_cond_destroy(&cond);
-}
-
-void ConsumerAckInner::Ack(CConsumeStatus _status)
-{
-    uv_mutex_lock(&mutex);
-    bool _acked = acked;
-
-    if(_acked)
-    {
-        uv_mutex_unlock(&mutex);
-        return;
-    }
-
-    status = _status;
-    acked = true;
-
-    // tell `this->WaitResult()` to continue
-    uv_cond_signal(&cond);
-    uv_mutex_unlock(&mutex);
-}
-
-CConsumeStatus ConsumerAckInner::WaitResult()
-{
-    uv_mutex_lock(&mutex);
-
-    // if `cond signal` send before `WaitResult()`,
-    // `uv_cond_wait` will be blocked and never continue
-    //
-    // so we have to return result directly without `uv_cond_wait`
-    if(acked)
-    {
-        CConsumeStatus _status = status;
-        uv_mutex_unlock(&mutex);
-        return _status;
-    }
-
-    // wait for `this->Ack()` and that will emit `uv_cond_signal` to let it stop wait
-    uv_cond_wait(&cond, &mutex);
-
-    CConsumeStatus _status = status;
-    uv_mutex_unlock(&mutex);
-
-    return _status;
-}
-
-}
diff --git a/src/consumer_ack_inner.h b/src/consumer_ack_inner.h
deleted file mode 100644
index 9caa105..0000000
--- a/src/consumer_ack_inner.h
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef __ROCKETMQ_CONSUMER_ACK_INNER_H__
-#define __ROCKETMQ_CONSUMER_ACK_INNER_H__
-
-#include <uv.h>
-#include <CPushConsumer.h>
-
-namespace __node_rocketmq__ {
-
-class ConsumerAckInner {
-public:
-    ConsumerAckInner();
-    ~ConsumerAckInner();
-
-    void Ack(CConsumeStatus _status);
-    CConsumeStatus WaitResult();
-
-private:
-    bool acked;
-    CConsumeStatus status;
-    uv_mutex_t mutex;
-    uv_cond_t cond;
-};
-
-}
-
-#endif
diff --git a/src/producer.cpp b/src/producer.cpp
index 3ae4047..45dfe1f 100644
--- a/src/producer.cpp
+++ b/src/producer.cpp
@@ -15,253 +15,277 @@
  * limitations under the License.
  */
 #include "producer.h"
-#include "workers/producer/send_message.h"
-#include "workers/producer/start_or_shutdown.h"
 
-#include <MQClientException.h>
+#include <cstddef>
+#include <exception>
 #include <string>
-using namespace std;
 
-namespace __node_rocketmq__ {
-
-#define NAN_GET_CPRODUCER() \
-    RocketMQProducer* _v8_producer = ObjectWrap::Unwrap<RocketMQProducer>(info.Holder()); \
-    CProducer* producer_ptr = _v8_producer->GetProducer();
-
-Nan::Persistent<Function> RocketMQProducer::constructor;
+#include <napi.h>
 
-RocketMQProducer::RocketMQProducer(const char* group_id, const char* instance_name)
-{
-    producer_ptr = CreateProducer(group_id);
-    if(instance_name)
-    {
-        SetProducerInstanceName(producer_ptr, instance_name);
-    }
-}
+#include <ClientRPCHook.h>
+#include <LoggerConfig.h>
+#include <MQException.h>
+#include <MQMessage.h>
+#include <SendCallback.h>
 
-RocketMQProducer::~RocketMQProducer()
-{
-    try
-    {
-        ShutdownProducer(producer_ptr);
-    }
-    catch (...)
-    {
-        //
-    }
+namespace __node_rocketmq__ {
 
-    DestroyProducer(producer_ptr);
+Napi::Object RocketMQProducer::Init(Napi::Env env, Napi::Object exports) {
+  Napi::Function func =
+      DefineClass(env,
+                  "RocketMQProducer",
+                  {
+                      InstanceMethod<&RocketMQProducer::Start>("start"),
+                      InstanceMethod<&RocketMQProducer::Shutdown>("shutdown"),
+                      InstanceMethod<&RocketMQProducer::Send>("send"),
+                      InstanceMethod<&RocketMQProducer::SetSessionCredentials>(
+                          "setSessionCredentials"),
+                  });
+
+  Napi::FunctionReference* constructor = new Napi::FunctionReference();
+  *constructor = Napi::Persistent(func);
+  env.SetInstanceData<Napi::FunctionReference>(constructor);
+
+  exports.Set("Producer", func);
+  return exports;
 }
 
-void RocketMQProducer::SetOptions(Local<Object> options)
-{
-    // set name server
-    Local<Value> _name_server_v = Nan::Get(options, Nan::New<String>("nameServer").ToLocalChecked()).ToLocalChecked();
-    if(_name_server_v->IsString())
-    {
-        Nan::Utf8String namesrv(_name_server_v);
-        SetProducerNameServerAddress(producer_ptr, *namesrv);
-    }
-
-    // set group name
-    Local<Value> _group_name_v = Nan::Get(options, Nan::New<String>("groupName").ToLocalChecked()).ToLocalChecked();
-    if(_group_name_v->IsString())
-    {
-        Nan::Utf8String group_name(_group_name_v);
-        SetProducerGroupName(producer_ptr, *group_name);
-    }
+RocketMQProducer::RocketMQProducer(const Napi::CallbackInfo& info)
+    : Napi::ObjectWrap<RocketMQProducer>(info), producer_("") {
+  const Napi::Value group_name = info[0];
+  if (group_name.IsString()) {
+    producer_.set_group_name(group_name.ToString());
+  }
 
-    // set log num & single log size
-    int file_num = 3;
-    int64 file_size = 104857600;
-    Local<Value> _log_file_num_v = Nan::Get(options, Nan::New<String>("logFileNum").ToLocalChecked()).ToLocalChecked();
-    Local<Value> _log_file_size_v = Nan::Get(options, Nan::New<String>("logFileSize").ToLocalChecked()).ToLocalChecked();
-    if(_log_file_num_v->IsNumber())
-    {
-        file_num = _log_file_num_v->Int32Value();
-    }
-    if(_log_file_size_v->IsNumber())
-    {
-        file_size = _log_file_size_v->Int32Value();
-    }
-    SetProducerLogFileNumAndSize(producer_ptr, file_num, file_size);
-
-    // set log level
-    Local<Value> _log_level_v = Nan::Get(options, Nan::New<String>("logLevel").ToLocalChecked()).ToLocalChecked();
-    if(_log_level_v->IsNumber())
-    {
-        int level = _log_level_v->Int32Value();
-        SetProducerLogLevel(producer_ptr, (CLogLevel)level);
-    }
+  const Napi::Value instance_name = info[1];
+  if (instance_name.IsString()) {
+    producer_.set_instance_name(instance_name.ToString());
+  }
 
-    // set compress level
-    Local<Value> _compress_level_v = Nan::Get(options, Nan::New<String>("compressLevel").ToLocalChecked()).ToLocalChecked();
-    if(_compress_level_v->IsNumber()) {
-        int level = _compress_level_v->Int32Value();
-        SetProducerCompressLevel(producer_ptr, level);
-    }
+  const Napi::Value options = info[2];
+  if (options.IsObject()) {
+    // try to set options
+    SetOptions(options.ToObject());
+  }
+}
 
-    // set send message timeout
-    Local<Value> _send_message_timeout_v = Nan::Get(options, Nan::New<String>("sendMessageTimeout").ToLocalChecked()).ToLocalChecked();
-    if(_send_message_timeout_v->IsNumber())
-    {
-        int timeout = _send_message_timeout_v->Int32Value();
-        SetProducerSendMsgTimeout(producer_ptr, timeout);
-    }
+RocketMQProducer::~RocketMQProducer() {
+  producer_.shutdown();
+}
 
-    // set max message size
-    Local<Value> _max_message_size_v = Nan::Get(options, Nan::New<String>("maxMessageSize").ToLocalChecked()).ToLocalChecked();
-    if(_max_message_size_v->IsNumber())
-    {
-        int size = _max_message_size_v->Int32Value();
-        SetProducerMaxMessageSize(producer_ptr, size);
+void RocketMQProducer::SetOptions(const Napi::Object& options) {
+  // set name server
+  Napi::Value name_server = options.Get("nameServer");
+  if (name_server.IsString()) {
+    producer_.set_namesrv_addr(name_server.ToString());
+  }
+
+  // set group name
+  Napi::Value group_name = options.Get("groupName");
+  if (group_name.IsString()) {
+    producer_.set_group_name(group_name.ToString());
+  }
+
+  // set max message size
+  Napi::Value max_message_size = options.Get("maxMessageSize");
+  if (max_message_size.IsNumber()) {
+    producer_.set_max_message_size(max_message_size.ToNumber());
+  }
+
+  // set compress level
+  Napi::Value compress_level = options.Get("compressLevel");
+  if (compress_level.IsNumber()) {
+    producer_.set_compress_level(compress_level.ToNumber());
+  }
+
+  // set send message timeout
+  Napi::Value send_message_timeout = options.Get("sendMessageTimeout");
+  if (send_message_timeout.IsNumber()) {
+    producer_.set_send_msg_timeout(send_message_timeout.ToNumber());
+  }
+
+  // set log level
+  Napi::Value log_level = options.Get("logLevel");
+  if (log_level.IsNumber()) {
+    int32_t level = log_level.ToNumber();
+    if (level >= 0 && level < rocketmq::LogLevel::LOG_LEVEL_LEVEL_NUM) {
+      rocketmq::GetDefaultLoggerConfig().set_level(
+          static_cast<rocketmq::LogLevel>(level));
     }
+  }
+
+  // set log directory
+  Napi::Value log_dir = options.Get("logDir");
+  if (log_dir.IsString()) {
+    rocketmq::GetDefaultLoggerConfig().set_path(log_dir.ToString());
+  }
+
+  // set log file size
+  Napi::Value log_file_size = options.Get("logFileSize");
+  if (log_file_size.IsNumber()) {
+    rocketmq::GetDefaultLoggerConfig().set_file_count(log_file_size.ToNumber());
+  }
+
+  // set log file num
+  Napi::Value log_file_num = options.Get("logFileNum");
+  if (log_file_num.IsNumber()) {
+    rocketmq::GetDefaultLoggerConfig().set_file_count(log_file_num.ToNumber());
+  }
 }
 
-NAN_MODULE_INIT(RocketMQProducer::Init)
-{
-    Local<FunctionTemplate> tpl = Nan::New<FunctionTemplate>(New);
-    tpl->SetClassName(Nan::New("RocketMQProducer").ToLocalChecked());
-    tpl->InstanceTemplate()->SetInternalFieldCount(1);
+Napi::Value RocketMQProducer::SetSessionCredentials(
+    const Napi::CallbackInfo& info) {
+  Napi::String access_key = info[0].As<Napi::String>();
+  Napi::String secret_key = info[1].As<Napi::String>();
+  Napi::String ons_channel = info[2].As<Napi::String>();
 
-    Nan::SetPrototypeMethod(tpl, "start", Start);
-    Nan::SetPrototypeMethod(tpl, "shutdown", Shutdown);
-    Nan::SetPrototypeMethod(tpl, "send", Send);
-    Nan::SetPrototypeMethod(tpl, "setSessionCredentials", SetSessionCredentials);
+  auto rpc_hook = std::make_shared<rocketmq::ClientRPCHook>(
+      rocketmq::SessionCredentials(access_key, secret_key, ons_channel));
+  producer_.setRPCHook(rpc_hook);
 
-    constructor.Reset(tpl->GetFunction());
-    Nan::Set(target, Nan::New("Producer").ToLocalChecked(), tpl->GetFunction());
+  return info.Env().Undefined();
 }
 
-NAN_METHOD(RocketMQProducer::New)
-{
-    Isolate* isolate = info.GetIsolate();
-    Local<Context> context = Context::New(isolate);
-
-    if(!info.IsConstructCall())
-    {
-        const int argc = 3;
-        Local<Value> argv[argc] = { info[0], info[1], info[2] };
-        Local<Function> _constructor = Nan::New<v8::Function>(constructor);
-        info.GetReturnValue().Set(_constructor->NewInstance(context, argc, argv).ToLocalChecked());
-        return;
-    }
+class ProducerStartWorker : public Napi::AsyncWorker {
+ public:
+  ProducerStartWorker(const Napi::Function& callback,
+                      const rocketmq::DefaultMQProducer& producer)
+      : Napi::AsyncWorker(callback), producer_(producer) {}
 
-    Nan::Utf8String group_id(info[0]);
-    Nan::Utf8String instance_name(info[1]);
-    Local<Object> options = Nan::To<Object>(info[2]).ToLocalChecked();
-    RocketMQProducer* producer = new RocketMQProducer(*group_id, info[1]->IsNull() ? NULL : *instance_name);
+  void Execute() override { producer_.start(); }
 
-    producer->Wrap(info.This());
+ private:
+  rocketmq::DefaultMQProducer producer_;
+};
 
-    // try to set options
-    try
-    {
-        producer->SetOptions(options);
-    }
-    catch (runtime_error e)
-    {
-        Nan::ThrowError(e.what());
-        return;
-    }
-
-    info.GetReturnValue().Set(info.This());
+Napi::Value RocketMQProducer::Start(const Napi::CallbackInfo& info) {
+  Napi::Function callback = info[0].As<Napi::Function>();
+  auto* worker = new ProducerStartWorker(callback, producer_);
+  worker->Queue();
+  return info.Env().Undefined();
 }
 
-NAN_METHOD(RocketMQProducer::Start)
-{
-    NAN_GET_CPRODUCER();
-
-    Nan::Callback* callback = (info[0]->IsFunction()) ?
-        new Nan::Callback(Nan::To<Function>(info[0]).ToLocalChecked()) :
-        NULL;
-
-    Nan::AsyncQueueWorker(new ProducerStartOrShutdownWorker(callback, producer_ptr, ProducerWorkerType::START_PRODUCER));
-}
+class ProducerShutdownWorker : public Napi::AsyncWorker {
+ public:
+  ProducerShutdownWorker(const Napi::Function& callback,
+                         const rocketmq::DefaultMQProducer& producer)
+      : Napi::AsyncWorker(callback), producer_(producer) {}
 
-NAN_METHOD(RocketMQProducer::Shutdown)
-{
-    NAN_GET_CPRODUCER();
+  void Execute() override { producer_.shutdown(); }
 
-    Nan::Callback* callback = (info[0]->IsFunction()) ?
-        new Nan::Callback(Nan::To<Function>(info[0]).ToLocalChecked()) :
-        NULL;
+ private:
+  rocketmq::DefaultMQProducer producer_;
+};
 
-    Nan::AsyncQueueWorker(new ProducerStartOrShutdownWorker(callback, producer_ptr, ProducerWorkerType::SHUTDOWN_PRODUCER));
+Napi::Value RocketMQProducer::Shutdown(const Napi::CallbackInfo& info) {
+  Napi::Function callback = info[0].As<Napi::Function>();
+  auto* worker = new ProducerShutdownWorker(callback, producer_);
+  worker->Queue();
+  return info.Env().Undefined();
 }
 
-NAN_METHOD(RocketMQProducer::SetSessionCredentials)
-{
-    NAN_GET_CPRODUCER();
-
-    Nan::Utf8String access_key(info[0]);
-    Nan::Utf8String secret_key(info[1]);
-    Nan::Utf8String ons_channel(info[2]);
-
-    int ret;
-    try
-    {
-        ret = SetProducerSessionCredentials(producer_ptr, *access_key, *secret_key, *ons_channel);
+class ProducerSendCallback : public rocketmq::AutoDeleteSendCallback {
+ private:
+  struct ResultOrException {
+    std::unique_ptr<rocketmq::SendResult> result;
+    std::exception_ptr exception;
+  };
+
+ public:
+  ProducerSendCallback(Napi::Env&& env, Napi::Function&& callback)
+      : callback_(
+            Callback::New(env, callback, "RocketMQ Send Callback", 0, 1)) {}
+
+  ~ProducerSendCallback() { callback_.Release(); }
+
+  void onSuccess(rocketmq::SendResult& send_result) override {
+    auto* data =
+        new ResultOrException{std::unique_ptr<rocketmq::SendResult>(
+                                  new rocketmq::SendResult(send_result)),
+                              nullptr};
+    napi_status status = callback_.BlockingCall(data);
+    if (status != napi_ok) {
+      // TODO: Handle error
+      std::exit(-1);
     }
-    catch(runtime_error e)
-    {
-        Nan::ThrowError(e.what());
+  }
+
+  void onException(rocketmq::MQException& exception) noexcept override {
+    auto* data =
+        new ResultOrException{nullptr, std::make_exception_ptr(exception)};
+    napi_status status = callback_.BlockingCall(data);
+    if (status != napi_ok) {
+      // TODO: Handle error
+      std::exit(-1);
     }
-    catch(std::exception& e)
-    {
-        Nan::ThrowError(e.what());
+  }
+
+  static void CallJs(Napi::Env env,
+                     Napi::Function callback,
+                     std::nullptr_t*,
+                     ResultOrException* data) {
+    std::unique_ptr<ResultOrException> data_guard(data);
+    if (env != nullptr) {
+      if (callback != nullptr) {
+        if (data->exception) {
+          try {
+            std::rethrow_exception(data->exception);
+          } catch (const std::exception& e) {
+            callback.Call(Napi::Object::New(callback.Env()),
+                          {Napi::Error::New(env, e.what()).Value()});
+          }
+        } else {
+          callback.Call(Napi::Object::New(callback.Env()),
+                        {env.Undefined(),
+                         Napi::Number::New(env, data->result->send_status()),
+                         Napi::String::New(env, data->result->msg_id()),
+                         Napi::Number::New(env, data->result->queue_offset())});
+        }
+      }
     }
-    catch(rocketmq::MQException& e)
-    {
-        Nan::ThrowError(e.what());
+  }
+
+ private:
+  using Callback = Napi::TypedThreadSafeFunction<std::nullptr_t,
+                                                 ResultOrException,
+                                                 &ProducerSendCallback::CallJs>;
+
+  Callback callback_;
+};
+
+Napi::Value RocketMQProducer::Send(const Napi::CallbackInfo& info) {
+  rocketmq::MQMessage message = [&]() {
+    Napi::String topic = info[0].As<Napi::String>();
+    Napi::Value body = info[1];
+    if (body.IsString()) {
+      return rocketmq::MQMessage(topic, body.ToString());
+    } else {
+      Napi::Buffer<char> buffer = body.As<Napi::Buffer<char>>();
+      return rocketmq::MQMessage(topic,
+                                 std::string(buffer.Data(), buffer.Length()));
     }
-    info.GetReturnValue().Set(ret);
-}
-
-NAN_METHOD(RocketMQProducer::Send)
-{
-    Nan::Utf8String topic(info[0]);
-    Local<Object> options = Nan::To<Object>(info[2]).ToLocalChecked();
-
-    CMessage* msg = CreateMessage(*topic);
+  }();
 
-    Local<Value> _tags_to_be_checked = Nan::Get(options, Nan::New<String>("tags").ToLocalChecked()).ToLocalChecked();
-    Local<Value> _keys_to_be_checked = Nan::Get(options, Nan::New<String>("keys").ToLocalChecked()).ToLocalChecked();
+  const Napi::Value options_v = info[2];
+  if (options_v.IsObject()) {
+    const Napi::Object options = options_v.ToObject();
 
-    if(_tags_to_be_checked->IsString())
-    {
-        Nan::Utf8String tags(_tags_to_be_checked);
-        SetMessageTags(msg, *tags);
+    Napi::Value tags = options.Get("tags");
+    if (tags.IsString()) {
+      message.set_tags(tags.ToString());
     }
 
-    if(_keys_to_be_checked->IsString())
-    {
-        Nan::Utf8String keys(_keys_to_be_checked);
-        SetMessageKeys(msg, *keys);
+    Napi::Value keys = options.Get("keys");
+    if (keys.IsString()) {
+      message.set_keys(keys.ToString());
     }
+  }
 
-    // set message body:
-    //   1. if it's a string, call `SetMessageBody`;
-    //   2. if it's a buffer, call `SetByteMessageBody`.
-    if(info[1]->IsString())
-    {
-        Nan::Utf8String body(info[1]);
-        SetMessageBody(msg, *body);
-    }
-    else
-    {
-        Local<Object> node_buff_object = Nan::To<Object>(info[1]).ToLocalChecked();
-        unsigned int length = node::Buffer::Length(node_buff_object);
-        const char* buff = node::Buffer::Data(node_buff_object);
-        SetByteMessageBody(msg, buff, length);
-    }
+  auto* send_callback =
+      new ProducerSendCallback(info.Env(), info[3].As<Napi::Function>());
+  producer_.send(message, send_callback);
 
-    Nan::Callback* callback = (info[3]->IsFunction()) ?
-        new Nan::Callback(Nan::To<Function>(info[3]).ToLocalChecked()) :
-        NULL;
-
-    RocketMQProducer* producer = ObjectWrap::Unwrap<RocketMQProducer>(info.Holder());
-    Nan::AsyncQueueWorker(new ProducerSendMessageWorker(callback, producer, msg));
+  return info.Env().Undefined();
 }
 
-}
+}  // namespace __node_rocketmq__
diff --git a/src/producer.h b/src/producer.h
index d20cd29..780efa4 100644
--- a/src/producer.h
+++ b/src/producer.h
@@ -17,46 +17,34 @@
 #ifndef __ROCKETMQ_PRODUCER_H__
 #define __ROCKETMQ_PRODUCER_H__
 
-#include <CProducer.h>
-#include <nan.h>
+#include <napi.h>
 
-namespace __node_rocketmq__ {
+#include <DefaultMQProducer.h>
 
-using v8::Context;
-using v8::Function;
-using v8::FunctionTemplate;
-using v8::Isolate;
-using v8::Local;
-using v8::Object;
-using v8::String;
-using v8::Value;
+namespace __node_rocketmq__ {
 
-class RocketMQProducer : public Nan::ObjectWrap {
-public:
-    static NAN_MODULE_INIT(Init);
+class RocketMQProducer : public Napi::ObjectWrap<RocketMQProducer> {
+ public:
+  static Napi::Object Init(Napi::Env env, Napi::Object exports);
 
-public:
-    CProducer* GetProducer() { return producer_ptr; }
+  RocketMQProducer(const Napi::CallbackInfo& info);
+  ~RocketMQProducer();
 
-private:
-    explicit RocketMQProducer(const char* group_id, const char* instance_name);
-    ~RocketMQProducer();
+ private:
+  Napi::Value SetSessionCredentials(const Napi::CallbackInfo& info);
 
-    static NAN_METHOD(New);
-    static NAN_METHOD(Start);
-    static NAN_METHOD(Shutdown);
-    static NAN_METHOD(Send);
-    static NAN_METHOD(SetSessionCredentials);
+  Napi::Value Start(const Napi::CallbackInfo& info);
+  Napi::Value Shutdown(const Napi::CallbackInfo& info);
 
-    static Nan::Persistent<Function> constructor;
+  Napi::Value Send(const Napi::CallbackInfo& info);
 
-private:
-    void SetOptions(Local<Object> options);
+ private:
+  void SetOptions(const Napi::Object& options);
 
-private:
-    CProducer* producer_ptr;
+ private:
+  rocketmq::DefaultMQProducer producer_;
 };
 
-}
+}  // namespace __node_rocketmq__
 
 #endif
diff --git a/src/push_consumer.cpp b/src/push_consumer.cpp
index 074a64d..63a786f 100644
--- a/src/push_consumer.cpp
+++ b/src/push_consumer.cpp
@@ -14,381 +14,259 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#include <map>
 #include "push_consumer.h"
-#include "consumer_ack.h"
-#include "workers/push_consumer/start_or_shutdown.h"
-
-using namespace std;
-
-namespace __node_rocketmq__ {
 
-struct MessageHandlerParam
-{
-    RocketMQPushConsumer* consumer;
-    ConsumerAckInner* ack;
-    CMessageExt* msg;
-};
-char message_handler_param_keys[5][8] = { "topic", "tags", "keys", "body", "msgId" };
-
-uv_mutex_t _get_msg_ext_column_lock;
-
-map<CPushConsumer*, RocketMQPushConsumer*> _push_consumer_map;
+#include <exception>
+#include <future>
 
-#define NAN_GET_CPUSHCONSUMER() \
-    RocketMQPushConsumer* _v8_consumer = ObjectWrap::Unwrap<RocketMQPushConsumer>(info.Holder()); \
-    CPushConsumer* consumer_ptr = _v8_consumer->GetConsumer();
+#include <napi.h>
 
-Nan::Persistent<Function> RocketMQPushConsumer::constructor;
+#include <ClientRPCHook.h>
+#include <LoggerConfig.h>
+#include <MQMessageListener.h>
 
-RocketMQPushConsumer::RocketMQPushConsumer(const char* group_id, const char* instance_name) :
-    consumer_ptr(NULL)
-{
-    consumer_ptr = CreatePushConsumer(group_id);
+#include "consumer_ack.h"
 
-    if(instance_name)
-    {
-        SetPushConsumerInstanceName(consumer_ptr, instance_name);
-    }
+using namespace std;
 
-    _push_consumer_map[consumer_ptr] = this;
+namespace __node_rocketmq__ {
 
-    RegisterMessageCallback(consumer_ptr, RocketMQPushConsumer::OnMessage);
+Napi::Object RocketMQPushConsumer::Init(Napi::Env env, Napi::Object exports) {
+  Napi::Function func = DefineClass(
+      env,
+      "RocketMQPushConsumer",
+      {
+          InstanceMethod<&RocketMQPushConsumer::Start>("start"),
+          InstanceMethod<&RocketMQPushConsumer::Shutdown>("shutdown"),
+          InstanceMethod<&RocketMQPushConsumer::Subscribe>("subscribe"),
+          InstanceMethod<&RocketMQPushConsumer::SetListener>("setListener"),
+          InstanceMethod<&RocketMQPushConsumer::SetSessionCredentials>(
+              "setSessionCredentials"),
+      });
+
+  Napi::FunctionReference* constructor = new Napi::FunctionReference();
+  *constructor = Napi::Persistent(func);
+  env.SetInstanceData<Napi::FunctionReference>(constructor);
+
+  exports.Set("PushConsumer", func);
+  return exports;
 }
 
-RocketMQPushConsumer::~RocketMQPushConsumer()
-{
-    try
-    {
-        ShutdownPushConsumer(consumer_ptr);
-        auto it = _push_consumer_map.find(consumer_ptr);
-        if(it != _push_consumer_map.end())
-        {
-            _push_consumer_map.erase(consumer_ptr);
-        }
-    }
-    catch(...)
-    {
-        //
-    }
+RocketMQPushConsumer::RocketMQPushConsumer(const Napi::CallbackInfo& info)
+    : Napi::ObjectWrap<RocketMQPushConsumer>(info), consumer_("") {
+  const Napi::Value group_name = info[0];
+  if (group_name.IsString()) {
+    consumer_.set_group_name(group_name.ToString());
+  }
 
-    DestroyPushConsumer(consumer_ptr);
-    consumer_ptr = NULL;
-}
-
-void RocketMQPushConsumer::SetOptions(Local<Object> options)
-{
-    // set name server
-    Local<Value> _name_server_v = Nan::Get(options, Nan::New<String>("nameServer").ToLocalChecked()).ToLocalChecked();
-    if(_name_server_v->IsString())
-    {
-        Nan::Utf8String namesrv(_name_server_v);
-        SetPushConsumerNameServerAddress(consumer_ptr, *namesrv);
-    }
+  const Napi::Value instance_name = info[1];
+  if (instance_name.IsString()) {
+    consumer_.set_instance_name(instance_name.ToString());
+  }
 
-    // set thread count
-    Local<Value> _thread_count_v = Nan::Get(options, Nan::New<String>("threadCount").ToLocalChecked()).ToLocalChecked();
-    if(_thread_count_v->IsNumber())
-    {
-        int thread_count = Nan::To<int32_t>(_thread_count_v).FromJust();
-        if(thread_count > 0)
-        {
-            SetPushConsumerThreadCount(consumer_ptr, thread_count);
-        }
-    }
-
-    // set message batch max size
-    Local<Value> _max_batch_size_v = Nan::Get(options, Nan::New<String>("maxBatchSize").ToLocalChecked()).ToLocalChecked();
-    if(_max_batch_size_v->IsNumber())
-    {
-        int max_batch_size = Nan::To<int32_t>(_max_batch_size_v).FromJust();
-        if(max_batch_size > 0)
-        {
-            SetPushConsumerMessageBatchMaxSize(consumer_ptr, max_batch_size);
-        }
-    }
-
-    // set log num & single log size
-    int file_num = 3;
-    int64 file_size = 104857600;
-    Local<Value> _log_file_num_v = Nan::Get(options, Nan::New<String>("logFileNum").ToLocalChecked()).ToLocalChecked();
-    Local<Value> _log_file_size_v = Nan::Get(options, Nan::New<String>("logFileSize").ToLocalChecked()).ToLocalChecked();
-    if(_log_file_num_v->IsNumber())
-    {
-        file_num = _log_file_num_v->Int32Value();
-    }
-    if(_log_file_size_v->IsNumber())
-    {
-        file_size = _log_file_size_v->Int32Value();
-    }
-    SetPushConsumerLogFileNumAndSize(consumer_ptr, file_num, file_size);
-
-    // set log level
-    Local<Value> _log_level_v = Nan::Get(options, Nan::New<String>("logLevel").ToLocalChecked()).ToLocalChecked();
-    if(_log_level_v->IsNumber())
-    {
-        int level = _log_level_v->Int32Value();
-        SetPushConsumerLogLevel(consumer_ptr, (CLogLevel) level);
-    }
+  const Napi::Value options = info[2];
+  if (options.IsObject()) {
+    // try to set options
+    SetOptions(options.ToObject());
+  }
 }
 
-NAN_MODULE_INIT(RocketMQPushConsumer::Init)
-{
-    uv_mutex_init(&_get_msg_ext_column_lock);
-    Local<FunctionTemplate> tpl = Nan::New<FunctionTemplate>(New);
-    tpl->SetClassName(Nan::New("RocketMQPushConsumer").ToLocalChecked());
-    tpl->InstanceTemplate()->SetInternalFieldCount(1);
-
-    Nan::SetPrototypeMethod(tpl, "start", Start);
-    Nan::SetPrototypeMethod(tpl, "shutdown", Shutdown);
-    Nan::SetPrototypeMethod(tpl, "subscribe", Subscribe);
-    Nan::SetPrototypeMethod(tpl, "setListener", SetListener);
-    Nan::SetPrototypeMethod(tpl, "setSessionCredentials", SetSessionCredentials);
-
-    constructor.Reset(tpl->GetFunction());
-    Nan::Set(target, Nan::New("PushConsumer").ToLocalChecked(), tpl->GetFunction());
+RocketMQPushConsumer::~RocketMQPushConsumer() {
+  consumer_.shutdown();
 }
 
-NAN_METHOD(RocketMQPushConsumer::New)
-{
-    Isolate* isolate = info.GetIsolate();
-    Local<Context> context = Context::New(isolate);
-
-    if(!info.IsConstructCall())
-    {
-        const int argc = 3;
-        Local<Value> argv[argc] = { info[0], info[1], info[2] };
-        Local<Function> _constructor = Nan::New<v8::Function>(constructor);
-        info.GetReturnValue().Set(_constructor->NewInstance(context, argc, argv).ToLocalChecked());
-        return;
-    }
-
-    Nan::Utf8String v8_group_id(info[0]);
-    Nan::Utf8String v8_instance_name(info[1]);
-    string group_id = *v8_group_id;
-    string instance_name = *v8_instance_name;
-    Local<Object> options = Nan::To<Object>(info[2]).ToLocalChecked();
-    RocketMQPushConsumer* consumer = new RocketMQPushConsumer(group_id.c_str(), info[1]->IsNull() ? NULL : instance_name.c_str());
-
-    consumer->Wrap(info.This());
-
-    // try to set options
-    try
-    {
-        consumer->SetOptions(options);
-    }
-    catch(const runtime_error e)
-    {
-        Nan::ThrowError(e.what());
-        return;
-    }
-    catch(const std::exception& e)
-    {
-        Nan::ThrowError(e.what());
-        return;
+void RocketMQPushConsumer::SetOptions(const Napi::Object& options) {
+  // set name server
+  Napi::Value name_server = options.Get("nameServer");
+  if (name_server.IsString()) {
+    consumer_.set_namesrv_addr(name_server.ToString());
+  }
+
+  // set group name
+  Napi::Value group_name = options.Get("groupName");
+  if (group_name.IsString()) {
+    consumer_.set_group_name(group_name.ToString());
+  }
+
+  // set thread count
+  Napi::Value thread_count = options.Get("threadCount");
+  if (thread_count.IsNumber()) {
+    consumer_.set_consume_thread_nums(thread_count.ToNumber());
+  }
+
+  // set message batch max size
+  Napi::Value max_batch_size = options.Get("maxBatchSize");
+  if (max_batch_size.IsNumber()) {
+    consumer_.set_consume_message_batch_max_size(max_batch_size.ToNumber());
+  }
+
+  // set log level
+  Napi::Value log_level = options.Get("logLevel");
+  if (log_level.IsNumber()) {
+    int32_t level = log_level.ToNumber();
+    if (level >= 0 && level < rocketmq::LogLevel::LOG_LEVEL_LEVEL_NUM) {
+      rocketmq::GetDefaultLoggerConfig().set_level(
+          static_cast<rocketmq::LogLevel>(level));
     }
-
-    info.GetReturnValue().Set(info.This());
-}
-
-NAN_METHOD(RocketMQPushConsumer::Start)
-{
-    NAN_GET_CPUSHCONSUMER();
-
-    Nan::Callback* callback = (info[0]->IsFunction()) ?
-        new Nan::Callback(Nan::To<Function>(info[0]).ToLocalChecked()) :
-        NULL;
-
-    Nan::AsyncQueueWorker(new PushConsumerStartOrShutdownWorker(callback, consumer_ptr, PushConsumerWorkerType::START_PUSH_CONSUMER));
+  }
+
+  // set log directory
+  Napi::Value log_dir = options.Get("logDir");
+  if (log_dir.IsString()) {
+    rocketmq::GetDefaultLoggerConfig().set_path(log_dir.ToString());
+  }
+
+  // set log file size
+  Napi::Value log_file_size = options.Get("logFileSize");
+  if (log_file_size.IsNumber()) {
+    rocketmq::GetDefaultLoggerConfig().set_file_count(log_file_size.ToNumber());
+  }
+
+  // set log file num
+  Napi::Value log_file_num = options.Get("logFileNum");
+  if (log_file_num.IsNumber()) {
+    rocketmq::GetDefaultLoggerConfig().set_file_count(log_file_num.ToNumber());
+  }
 }
 
-NAN_METHOD(RocketMQPushConsumer::Shutdown)
-{
-    NAN_GET_CPUSHCONSUMER();
+Napi::Value RocketMQPushConsumer::SetSessionCredentials(
+    const Napi::CallbackInfo& info) {
+  Napi::String access_key = info[0].As<Napi::String>();
+  Napi::String secret_key = info[1].As<Napi::String>();
+  Napi::String ons_channel = info[2].As<Napi::String>();
 
-    Nan::Callback* callback = (info[0]->IsFunction()) ?
-        new Nan::Callback(Nan::To<Function>(info[0]).ToLocalChecked()) :
-        NULL;
+  auto rpc_hook = std::make_shared<rocketmq::ClientRPCHook>(
+      rocketmq::SessionCredentials(access_key, secret_key, ons_channel));
+  consumer_.setRPCHook(rpc_hook);
 
-    Nan::AsyncQueueWorker(new PushConsumerStartOrShutdownWorker(callback, consumer_ptr, PushConsumerWorkerType::SHUTDOWN_PUSH_CONSUMER));
+  return info.Env().Undefined();
 }
 
-NAN_METHOD(RocketMQPushConsumer::Subscribe)
-{
-    NAN_GET_CPUSHCONSUMER();
+class ConsumerStartWorker : public Napi::AsyncWorker {
+ public:
+  ConsumerStartWorker(const Napi::Function& callback,
+                      const rocketmq::DefaultMQPushConsumer& consumer)
+      : Napi::AsyncWorker(callback), consumer_(consumer) {}
 
-    Nan::Utf8String v8_topic(info[0]);
-    Nan::Utf8String v8_expression(info[1]);
-    string topic = *v8_topic;
-    string expression = *v8_expression;
+  void Execute() override { consumer_.start(); }
 
-    int ret;
-    try
-    {
-        ret = ::Subscribe(consumer_ptr, topic.c_str(), expression.c_str());
-    }
-    catch(const runtime_error e)
-    {
-        Nan::ThrowError(e.what());
-        return;
-    }
-    catch(const std::exception& e)
-    {
-        Nan::ThrowError(e.what());
-        return;
-    }
-
-    info.GetReturnValue().Set(ret);
-}
-
-NAN_METHOD(RocketMQPushConsumer::SetListener)
-{
-    RocketMQPushConsumer* consumer = ObjectWrap::Unwrap<RocketMQPushConsumer>(info.Holder());
-    if(!consumer->listener_func.IsEmpty())
-    {
-        consumer->listener_func.Reset();
-    }
+ private:
+  rocketmq::DefaultMQPushConsumer consumer_;
+};
 
-    consumer->listener_func.Reset(Nan::To<Function>(info[0]).ToLocalChecked());
+Napi::Value RocketMQPushConsumer::Start(const Napi::CallbackInfo& info) {
+  Napi::Function callback = info[0].As<Napi::Function>();
+  auto* worker = new ConsumerStartWorker(callback, consumer_);
+  worker->Queue();
+  return info.Env().Undefined();
 }
 
-NAN_METHOD(RocketMQPushConsumer::SetSessionCredentials)
-{
-    NAN_GET_CPUSHCONSUMER();
+class ConsumerShutdownWorker : public Napi::AsyncWorker {
+ public:
+  ConsumerShutdownWorker(const Napi::Function& callback,
+                         const rocketmq::DefaultMQPushConsumer& consumer)
+      : Napi::AsyncWorker(callback), consumer_(consumer) {}
 
-    Nan::Utf8String access_key(info[0]);
-    Nan::Utf8String secret_key(info[1]);
-    Nan::Utf8String ons_channel(info[2]);
+  void Execute() override { consumer_.shutdown(); }
 
-    int ret;
-    try
-    {
-        ret = SetPushConsumerSessionCredentials(consumer_ptr, *access_key, *secret_key, *ons_channel);
-    }
-    catch(const runtime_error e)
-    {
-        Nan::ThrowError(e.what());
-        return;
-    }
-    catch(const std::exception& e)
-    {
-        Nan::ThrowError(e.what());
-        return;
-    }
+ private:
+  rocketmq::DefaultMQPushConsumer consumer_;
+};
 
-    info.GetReturnValue().Set(ret);
+Napi::Value RocketMQPushConsumer::Shutdown(const Napi::CallbackInfo& info) {
+  Napi::Function callback = info[0].As<Napi::Function>();
+  auto* worker = new ConsumerShutdownWorker(callback, consumer_);
+  worker->Queue();
+  return info.Env().Undefined();
 }
 
-string RocketMQPushConsumer::GetMessageColumn(char* name, CMessageExt* msg)
-{
-    const char* orig = NULL;
-
-    uv_mutex_lock(&_get_msg_ext_column_lock);
-    switch(name[0])
-    {
-    // topic / tags
-    case 't':
-        orig = name[1] == 'o' ? GetMessageTopic(msg) : GetMessageTags(msg);
-        break;
-
-    // keys
-    case 'k':
-        orig = GetMessageKeys(msg);
-        break;
-
-    // body
-    case 'b':
-        orig = GetMessageBody(msg);
-        break;
-
-    // msgId
-    case 'm':
-        orig = GetMessageId(msg);
-        break;
-
-    default:
-        orig = NULL;
-        break;
-    }
+Napi::Value RocketMQPushConsumer::Subscribe(const Napi::CallbackInfo& info) {
+  Napi::String topic = info[0].As<Napi::String>();
+  Napi::String expression = info[1].As<Napi::String>();
 
-    uv_mutex_unlock(&_get_msg_ext_column_lock);
+  consumer_.subscribe(topic, expression);
 
-    if(!orig) return "";
-    return orig;
+  return info.Env().Undefined();
 }
 
-void close_async_done(uv_handle_t* handle)
-{
-    free(handle);
-}
-
-void RocketMQPushConsumer::HandleMessageInEventLoop(uv_async_t* async)
-{
-    Nan::HandleScope scope;
-
-    Isolate* isolate = Isolate::GetCurrent();
-    Local<Context> context = isolate->GetCurrentContext();
-
-    MessageHandlerParam* param = (MessageHandlerParam*)(async->data);
-    RocketMQPushConsumer* consumer = param->consumer;
-    ConsumerAckInner* ack_inner = param->ack;
-    CMessageExt* msg = param->msg;
-
-    // create the JavaScript ack object and then set inner ack object
-    Local<Function> cons = Nan::New<Function>(ConsumerAck::GetConstructor());
-    Local<Object> ack_obj = cons->NewInstance(context, 0, 0).ToLocalChecked();
-    ConsumerAck* ack = ObjectWrap::Unwrap<ConsumerAck>(ack_obj);
-    ack->SetInner(ack_inner);
-
-    // TODO: const char *GetMessageProperty(CMessageExt *msgExt, const char *key);
-    Local<Object> result = Nan::New<Object>();
-    for(int i = 0; i < 5; i++)
-    {
-        Nan::Set(
-            result,
-            Nan::New(message_handler_param_keys[i]).ToLocalChecked(),
-            Nan::New(RocketMQPushConsumer::GetMessageColumn(message_handler_param_keys[i], msg)).ToLocalChecked());
+class ConsumerMessageListener : public rocketmq::MessageListenerConcurrently {
+ private:
+  struct MessageAndPromise {
+    rocketmq::MQMessageExt message;
+    std::promise<bool> promise;
+  };
+
+ public:
+  ConsumerMessageListener(Napi::Env& env, Napi::Function&& callback)
+      : listener_(
+            Listener::New(env, callback, "RocketMQ Message Listener", 0, 1)) {}
+
+  ~ConsumerMessageListener() { listener_.Release(); }
+
+  rocketmq::ConsumeStatus consumeMessage(
+      std::vector<rocketmq::MQMessageExt>& msgs) override {
+    for (auto& msg : msgs) {
+      MessageAndPromise data{msg, std::promise<bool>()};
+      auto future = data.promise.get_future();
+      listener_.BlockingCall(&data);
+      try {
+        if (!future.get()) {
+          return rocketmq::ConsumeStatus::RECONSUME_LATER;
+        }
+      } catch (const std::exception& e) {
+        return rocketmq::ConsumeStatus::RECONSUME_LATER;
+      }
     }
-
-    Local<Value> argv[2] = {
-        result,
-        ack_obj
-    };
-    Nan::Callback* callback = consumer->GetListenerFunction();
-    callback->Call(2, argv);
-
-    uv_close((uv_handle_t*)async, close_async_done);
-}
-
-int RocketMQPushConsumer::OnMessage(CPushConsumer* consumer_ptr, CMessageExt* msg_ext)
-{
-    RocketMQPushConsumer* consumer = _push_consumer_map[consumer_ptr];
-    if (!consumer)
-    {
-        // TODO: error handle
-        return CConsumeStatus::E_RECONSUME_LATER;
+    return rocketmq::ConsumeStatus::CONSUME_SUCCESS;
+  };
+
+  static void CallJs(Napi::Env env,
+                     Napi::Function listener,
+                     std::nullptr_t*,
+                     MessageAndPromise* data) {
+    if (env != nullptr) {
+      if (listener != nullptr) {
+        Napi::Object message = Napi::Object::New(env);
+        message.Set("topic", data->message.topic());
+        message.Set("tags", data->message.tags());
+        message.Set("keys", data->message.keys());
+        message.Set("body", data->message.body());
+        message.Set("msgId", data->message.msg_id());
+
+        Napi::Object ack = ConsumerAck::NewInstance(env);
+        ConsumerAck* consumer_ack = Napi::ObjectWrap<ConsumerAck>::Unwrap(ack);
+        consumer_ack->SetPromise(std::move(data->promise));
+
+        try {
+          listener.Call(Napi::Object::New(listener.Env()), {message, ack});
+        } catch (const Napi::Error& e) {
+          try {
+            consumer_ack->Done(std::current_exception());
+          } catch (const std::future_error&) {
+            // ignore
+          }
+        }
+        return;
+      }
     }
+    data->promise.set_value(false);
+  }
 
-    ConsumerAckInner ack_inner;
-
-    // create async parameter
-    MessageHandlerParam param;
-    param.consumer = consumer;
-    param.ack = &ack_inner;
-    param.msg = msg_ext;
+ private:
+  using Listener =
+      Napi::TypedThreadSafeFunction<std::nullptr_t,
+                                    MessageAndPromise,
+                                    &ConsumerMessageListener::CallJs>;
 
-    // create a new async handler and bind with `RocketMQPushConsumer::HandleMessageInEventLoop`
-    uv_async_t* async = (uv_async_t*)malloc(sizeof(uv_async_t));
-    uv_async_init(uv_default_loop(), async, RocketMQPushConsumer::HandleMessageInEventLoop);
-    async->data = (void*)(&param);
-
-    // send async handler
-    uv_async_send(async);
-
-    // wait for result
-    CConsumeStatus status = ack_inner.WaitResult();
+  Listener listener_;
+};
 
-    return status;
+Napi::Value RocketMQPushConsumer::SetListener(const Napi::CallbackInfo& info) {
+  Napi::Env env = info.Env();
+  listener_.reset(
+      new ConsumerMessageListener(env, info[0].As<Napi::Function>()));
+  consumer_.registerMessageListener(listener_.get());
+  return env.Undefined();
 }
 
-}
+}  // namespace __node_rocketmq__
diff --git a/src/push_consumer.h b/src/push_consumer.h
index 6250d4e..47b3d14 100644
--- a/src/push_consumer.h
+++ b/src/push_consumer.h
@@ -17,62 +17,40 @@
 #ifndef __ROCKETMQ_PUSH_CONSUMER_H__
 #define __ROCKETMQ_PUSH_CONSUMER_H__
 
-#include <CPushConsumer.h>
-#include <uv.h>
-#include <nan.h>
 #include <string>
 
-namespace __node_rocketmq__ {
+#include <napi.h>
+
+#include <DefaultMQPushConsumer.h>
 
-using v8::Context;
-using v8::Function;
-using v8::FunctionTemplate;
-using v8::Isolate;
-using v8::Local;
-using v8::Object;
-using v8::String;
-using v8::Value;
+namespace __node_rocketmq__ {
 
-class RocketMQPushConsumer : public Nan::ObjectWrap {
-public:
-    static NAN_MODULE_INIT(Init);
-    static int OnMessage(CPushConsumer* consumer_ptr, CMessageExt* msg_ext);
-    static std::string GetMessageColumn(char* name, CMessageExt* msg);
+class ConsumerMessageListener;
 
-private:
-    explicit RocketMQPushConsumer(const char* group_id, const char* instance_name);
-    ~RocketMQPushConsumer();
+class RocketMQPushConsumer : public Napi::ObjectWrap<RocketMQPushConsumer> {
+ public:
+  static Napi::Object Init(Napi::Env env, Napi::Object exports);
 
-    static NAN_METHOD(New);
-    static NAN_METHOD(Start);
-    static NAN_METHOD(Shutdown);
-    static NAN_METHOD(Subscribe);
-    static NAN_METHOD(SetListener);
-    static NAN_METHOD(SetSessionCredentials);
+  RocketMQPushConsumer(const Napi::CallbackInfo& info);
+  ~RocketMQPushConsumer();
 
-    static Nan::Persistent<v8::Function> constructor;
+ private:
+  Napi::Value SetSessionCredentials(const Napi::CallbackInfo& info);
 
-    void SetOptions(Local<Object>);
-    static void HandleMessageInEventLoop(uv_async_t* async);
+  Napi::Value Start(const Napi::CallbackInfo& info);
+  Napi::Value Shutdown(const Napi::CallbackInfo& info);
 
-protected:
-    CPushConsumer* GetConsumer()
-    {
-        return consumer_ptr;
-    }
+  Napi::Value Subscribe(const Napi::CallbackInfo& info);
+  Napi::Value SetListener(const Napi::CallbackInfo& info);
 
-    Nan::Callback* GetListenerFunction()
-    {
-        Nan::Callback* cb;
-        cb = &listener_func;
-        return cb;
-    }
+ private:
+  void SetOptions(const Napi::Object& options);
 
-private:
-    CPushConsumer* consumer_ptr;
-    Nan::Callback listener_func;
+ private:
+  rocketmq::DefaultMQPushConsumer consumer_;
+  std::unique_ptr<ConsumerMessageListener> listener_;
 };
 
-}
+}  // namespace __node_rocketmq__
 
 #endif
diff --git a/src/rocketmq.cpp b/src/rocketmq.cpp
index 3b426fb..63fd41f 100644
--- a/src/rocketmq.cpp
+++ b/src/rocketmq.cpp
@@ -14,35 +14,21 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#include <nan.h>
+#include <napi.h>
 
+#include "consumer_ack.h"
 #include "producer.h"
 #include "push_consumer.h"
-#include "consumer_ack.h"
 
 namespace __node_rocketmq__ {
 
-#if defined(__APPLE__)
-uv_lib_t lib;
-
-NAN_METHOD(DLOpen)
-{
-    Nan::Utf8String filename(info[0]);
-    uv_dlopen(*filename, &lib);
+Napi::Object Init(Napi::Env env, Napi::Object exports) {
+  RocketMQProducer::Init(env, exports);
+  RocketMQPushConsumer::Init(env, exports);
+  ConsumerAck::Init(env, exports);
+  return exports;
 }
-#endif
 
-NAN_MODULE_INIT(Init)
-{
-    RocketMQProducer::Init(target);
-    RocketMQPushConsumer::Init(target);
-    ConsumerAck::Init(target);
+NODE_API_MODULE(rocketmq, Init)
 
-#if defined(__APPLE__)
-    Nan::Set(target, Nan::New("macosDLOpen").ToLocalChecked(), Nan::New<v8::FunctionTemplate>(DLOpen)->GetFunction());
-#endif
-}
-
-NODE_MODULE(rocketmq, Init)
-
-}
+}  // namespace __node_rocketmq__
diff --git a/src/workers/producer/send_message.h b/src/workers/producer/send_message.h
deleted file mode 100644
index 6974e5a..0000000
--- a/src/workers/producer/send_message.h
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef __ROCKETMQ_SEND_MESSAGE_H__
-#define __ROCKETMQ_SEND_MESSAGE_H__
-
-#include <nan.h>
-#include <CProducer.h>
-#include <MQClientException.h>
-
-namespace __node_rocketmq__ {
-
-using namespace std;
-
-class ProducerSendMessageWorker : public Nan::AsyncWorker {
-public:
-    ProducerSendMessageWorker(Nan::Callback* callback, RocketMQProducer* producer, CMessage* msg) :
-        AsyncWorker(callback),
-        msg(msg),
-        producer(producer)
-    {
-    }
-
-    ~ProducerSendMessageWorker()
-    {
-        DestroyMessage(msg);
-    }
-
-    void Execute()
-    {
-        try
-        {
-            SendMessageSync(producer->GetProducer(), msg, &send_ret);
-        }
-        catch(const runtime_error e)
-        {
-            SetErrorMessage(e.what());
-        }
-        catch(const std::exception& e)
-        {
-            SetErrorMessage(e.what());
-        }
-    }
-
-    void HandleOKCallback()
-    {
-        Nan::HandleScope scope;
-
-        Local<Value> argv[] = {
-            Nan::Undefined(),
-            Nan::New<v8::Number>((unsigned int)send_ret.sendStatus),
-            Nan::New<v8::String>(send_ret.msgId).ToLocalChecked(),
-            Nan::New<v8::Number>((long long)send_ret.offset)
-        };
-        callback->Call(4, argv);
-    }
-
-private:
-    CMessage* msg;
-    RocketMQProducer* producer;
-
-    CSendResult send_ret;
-};
-
-}
-
-#endif
diff --git a/src/workers/producer/start_or_shutdown.h b/src/workers/producer/start_or_shutdown.h
deleted file mode 100644
index dde99a6..0000000
--- a/src/workers/producer/start_or_shutdown.h
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef __ROCKETMQ_PRODUCER_START_OR_SHUTDOWN_H__
-#define __ROCKETMQ_PRODUCER_START_OR_SHUTDOWN_H__
-
-#include <nan.h>
-#include <CProducer.h>
-#include <MQClientException.h>
-
-namespace __node_rocketmq__ {
-
-using namespace std;
-
-enum ProducerWorkerType {
-    START_PRODUCER = 0,
-    SHUTDOWN_PRODUCER
-};
-
-class ProducerStartOrShutdownWorker : public Nan::AsyncWorker {
-public:
-    ProducerStartOrShutdownWorker(Nan::Callback* callback, CProducer* producer_ptr, ProducerWorkerType type) :
-        AsyncWorker(callback),
-        producer_ptr(producer_ptr),
-        ret(0),
-        type(type)
-    {
-    }
-
-    ~ProducerStartOrShutdownWorker()
-    {
-    }
-
-    void Execute()
-    {
-        try
-        {
-            switch(type) {
-            case START_PRODUCER:
-                ret = StartProducer(producer_ptr); break;
-            case SHUTDOWN_PRODUCER:
-                ret = ShutdownProducer(producer_ptr); break;
-            default: break;
-            }
-        }
-        catch(const runtime_error e)
-        {
-            SetErrorMessage(e.what());
-        }
-        catch(const exception& e)
-        {
-            SetErrorMessage(e.what());
-        }
-    }
-
-    void HandleOKCallback()
-    {
-        Nan::HandleScope scope;
-
-        Local<Value> argv[] = {
-            Nan::Undefined(),
-            Nan::New<v8::Number>((int)ret),
-        };
-        callback->Call(2, argv);
-    }
-
-private:
-    CProducer* producer_ptr;
-    int ret;
-    ProducerWorkerType type;
-};
-
-}
-
-#endif
diff --git a/src/workers/push_consumer/start_or_shutdown.h b/src/workers/push_consumer/start_or_shutdown.h
deleted file mode 100644
index 1169059..0000000
--- a/src/workers/push_consumer/start_or_shutdown.h
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef __ROCKETMQ_PUSH_CONSUMER_START_OR_SHUTDOWN_H__
-#define __ROCKETMQ_PUSH_CONSUMER_START_OR_SHUTDOWN_H__
-
-#include <nan.h>
-#include <CPushConsumer.h>
-#include <MQClientException.h>
-
-namespace __node_rocketmq__ {
-
-using namespace std;
-
-enum PushConsumerWorkerType {
-    START_PUSH_CONSUMER = 0,
-    SHUTDOWN_PUSH_CONSUMER
-};
-
-class PushConsumerStartOrShutdownWorker : public Nan::AsyncWorker {
-public:
-    PushConsumerStartOrShutdownWorker(Nan::Callback* callback, CPushConsumer* consumer_ptr, PushConsumerWorkerType type) :
-        AsyncWorker(callback),
-        consumer_ptr(consumer_ptr),
-        ret(0),
-        type(type)
-    {
-    }
-
-    ~PushConsumerStartOrShutdownWorker()
-    {
-    }
-
-    void Execute()
-    {
-        try
-        {
-            switch(type) {
-            case START_PUSH_CONSUMER:
-                ret = StartPushConsumer(consumer_ptr); break;
-            case SHUTDOWN_PUSH_CONSUMER:
-                ret = ShutdownPushConsumer(consumer_ptr); break;
-            default: break;
-            }
-        }
-        catch(const runtime_error e)
-        {
-            SetErrorMessage(e.what());
-        }
-        catch(const exception& e)
-        {
-            SetErrorMessage(e.what());
-        }
-    }
-
-    void HandleOKCallback()
-    {
-        Nan::HandleScope scope;
-
-        Local<Value> argv[] = {
-            Nan::Undefined(),
-            Nan::New<v8::Number>((int)ret),
-        };
-        callback->Call(2, argv);
-    }
-
-private:
-    CPushConsumer* consumer_ptr;
-    int ret;
-    PushConsumerWorkerType type;
-};
-
-}
-
-#endif