You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2018/12/27 07:18:40 UTC

[GitHub] ShannonDing closed pull request #1: Refactor: first demo for developing this SDK

ShannonDing closed pull request #1: Refactor: first demo for developing this SDK
URL: https://github.com/apache/rocketmq-client-nodejs/pull/1
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/.eslintrc b/.eslintrc
new file mode 100644
index 0000000..4601220
--- /dev/null
+++ b/.eslintrc
@@ -0,0 +1,5 @@
+{
+    "extends": [
+      "rocketmq-style"
+    ]
+}
diff --git a/.github/ISSUE_TEMPLATE.md b/.github/ISSUE_TEMPLATE.md
new file mode 100644
index 0000000..1c4f079
--- /dev/null
+++ b/.github/ISSUE_TEMPLATE.md
@@ -0,0 +1,20 @@
+<!--
+Thank you for reporting an issue.
+
+This issue tracker is for bugs and issues found within RocketMQ Node.js SDK.
+
+Please fill in as much of the template below as you're able.
+
+Version:
+Platform: output of `uname -a` (UNIX), or version and 32 or 64-bit (Windows)
+Subsystem: if known, please specify affected core module name
+
+If possible, please provide code that demonstrates the problem, keeping it as
+simple and free of external dependencies as you are able.
+-->
+
+* **Version**:
+* **Platform**:
+* **Subsystem**:
+
+<!-- Enter your issue details below this comment. -->
diff --git a/.github/PULL_REQUEST_TEMPLATE.md b/.github/PULL_REQUEST_TEMPLATE.md
new file mode 100644
index 0000000..c2f77b7
--- /dev/null
+++ b/.github/PULL_REQUEST_TEMPLATE.md
@@ -0,0 +1,16 @@
+<!--
+Thank you for your pull request. Please provide a description above and review
+the requirements below.
+
+Bug fixes and new features should include tests and possibly benchmarks.
+
+Contributors guide: https://github.com/apache/rocketmq-client-nodejs/blob/master/CONTRIBUTING.md
+-->
+
+##### Checklist
+<!-- Remove items that do not apply. For completed items, change [ ] to [x]. -->
+
+- [ ] `npm test` passes
+- [ ] tests and/or benchmarks are included
+- [ ] documentation is changed or added
+- [ ] commit message follows [commit guidelines](https://github.com/apache/rocketmq-client-nodejs/blob/master/PULL_REQUEST.md#commit-message-guidelines)
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..c9d16aa
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,188 @@
+# Created by https://www.gitignore.io/api/c++,node,linux,macos,windows
+# Edit at https://www.gitignore.io/?templates=c++,node,linux,macos,windows
+
+### C++ ###
+# Prerequisites
+*.d
+
+# Compiled Object files
+*.slo
+*.lo
+*.o
+*.obj
+
+# Precompiled Headers
+*.gch
+*.pch
+
+# Compiled Dynamic libraries
+*.so
+*.dylib
+*.dll
+
+# Fortran module files
+*.mod
+*.smod
+
+# Compiled Static libraries
+*.lai
+*.la
+*.a
+*.lib
+
+# Executables
+*.exe
+*.out
+*.app
+
+### Linux ###
+*~
+
+# temporary files which can be created if a process still has a handle open of a deleted file
+.fuse_hidden*
+
+# KDE directory preferences
+.directory
+
+# Linux trash folder which might appear on any partition or disk
+.Trash-*
+
+# .nfs files are created when an open file is removed but is still being accessed
+.nfs*
+
+### macOS ###
+# General
+.DS_Store
+.AppleDouble
+.LSOverride
+
+# Icon must end with two \r
+Icon
+
+# Thumbnails
+._*
+
+# Files that might appear in the root of a volume
+.DocumentRevisions-V100
+.fseventsd
+.Spotlight-V100
+.TemporaryItems
+.Trashes
+.VolumeIcon.icns
+.com.apple.timemachine.donotpresent
+
+# Directories potentially created on remote AFP share
+.AppleDB
+.AppleDesktop
+Network Trash Folder
+Temporary Items
+.apdisk
+
+### Node ###
+# Logs
+logs
+*.log
+npm-debug.log*
+yarn-debug.log*
+yarn-error.log*
+
+# Runtime data
+pids
+*.pid
+*.seed
+*.pid.lock
+
+# Directory for instrumented libs generated by jscoverage/JSCover
+lib-cov
+
+# Coverage directory used by tools like istanbul
+coverage
+
+# nyc test coverage
+.nyc_output
+
+# Grunt intermediate storage (https://gruntjs.com/creating-plugins#storing-task-files)
+.grunt
+
+# Bower dependency directory (https://bower.io/)
+bower_components
+
+# node-waf configuration
+.lock-wscript
+
+# Compiled binary addons (https://nodejs.org/api/addons.html)
+build/Release
+
+# Dependency directories
+node_modules/
+jspm_packages/
+
+# TypeScript v1 declaration files
+typings/
+
+# Optional npm cache directory
+.npm
+
+# Optional eslint cache
+.eslintcache
+
+# Optional REPL history
+.node_repl_history
+
+# Output of 'npm pack'
+*.tgz
+
+# Yarn Integrity file
+.yarn-integrity
+
+# dotenv environment variables file
+.env
+
+# parcel-bundler cache (https://parceljs.org/)
+.cache
+
+# next.js build output
+.next
+
+# nuxt.js build output
+.nuxt
+
+# vuepress build output
+.vuepress/dist
+
+# Serverless directories
+.serverless
+
+# FuseBox cache
+.fusebox/
+
+### Windows ###
+# Windows thumbnail cache files
+Thumbs.db
+ehthumbs.db
+ehthumbs_vista.db
+
+# Dump file
+*.stackdump
+
+# Folder config file
+[Dd]esktop.ini
+
+# Recycle Bin used on file shares
+$RECYCLE.BIN/
+
+# Windows Installer files
+*.cab
+*.msi
+*.msix
+*.msm
+*.msp
+
+# Windows shortcuts
+*.lnk
+
+# End of https://www.gitignore.io/api/c++,node,linux,macos,windows
+
+build
+package-lock.json
+.vscode
diff --git a/.gitmodules b/.gitmodules
new file mode 100644
index 0000000..c9fc461
--- /dev/null
+++ b/.gitmodules
@@ -0,0 +1,3 @@
+[submodule "deps/rocketmq"]
+path = deps/rocketmq
+url = https://github.com/apache/rocketmq-client-cpp.git
diff --git a/.travis.yml b/.travis.yml
new file mode 100644
index 0000000..3ced58e
--- /dev/null
+++ b/.travis.yml
@@ -0,0 +1,39 @@
+language: node_js
+
+node_js: 10
+
+before_script:
+  - wget http://us.mirrors.quenda.co/apache/rocketmq/4.3.2/rocketmq-all-4.3.2-bin-release.zip
+  - unzip rocketmq-all-4.3.2-bin-release.zip
+  - cd rocketmq-all-4.3.2-bin-release
+  - perl -i -pe's/-Xms8g -Xmx8g -Xmn4g/-Xms2g -Xmx2g -Xmn1g/g' bin/runbroker.sh
+  - nohup sh bin/mqnamesrv &
+  - nohup sh bin/mqbroker -n localhost:9876 &
+  - sleep 10
+  - ./bin/mqadmin updateTopic -b '127.0.0.1:10911' -n '127.0.0.1:9876' -t test
+  - ./bin/mqadmin updateSubGroup -b '127.0.0.1:10911' -n '127.0.0.1:9876' -g testGroup
+
+script:
+  - npm test
+
+matrix:
+  include:
+    - os: linux
+      dist: trusty
+    - os: linux
+      dist: xenial
+      jdk: openjdk8
+      env:
+        JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-amd64
+      apt:
+        packages:
+          - openjdk8
+    - os: windows
+    - os: osx
+      osx_image: xcode9.3
+      env:
+        JAVA_HOME=/Library/Java/JavaVirtualMachines/jdk1.8.0_112.jdk/Contents/Home/jre
+  allow_failures:
+    - os: linux
+      dist: xenial
+    - os: windows
diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
new file mode 100644
index 0000000..802ac1f
--- /dev/null
+++ b/CONTRIBUTING.md
@@ -0,0 +1 @@
+## TODO
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..d53ea09
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,201 @@
+                                Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+
+      "Contribution" shall mean any work of authorship, including
+      the original version of the Work and any modifications or additions
+      to that Work or Derivative Works thereof, that is intentionally
+      submitted to Licensor for inclusion in the Work by the copyright owner
+      or by an individual or Legal Entity authorized to submit on behalf of
+      the copyright owner. For the purposes of this definition, "submitted"
+      means any form of electronic, verbal, or written communication sent
+      to the Licensor or its representatives, including but not limited to
+      communication on electronic mailing lists, source code control systems,
+      and issue tracking systems that are managed by, or on behalf of, the
+      Licensor for the purpose of discussing and improving the Work, but
+      excluding communication that is conspicuously marked or otherwise
+      designated in writing by the copyright owner as "Not a Contribution."
+
+      "Contributor" shall mean Licensor and any individual or Legal Entity
+      on behalf of whom a Contribution has been received by Licensor and
+      subsequently incorporated within the Work.
+
+   2. Grant of Copyright License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      copyright license to reproduce, prepare Derivative Works of,
+      publicly display, publicly perform, sublicense, and distribute the
+      Work and such Derivative Works in Source or Object form.
+
+   3. Grant of Patent License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      (except as stated in this section) patent license to make, have made,
+      use, offer to sell, sell, import, and otherwise transfer the Work,
+      where such license applies only to those patent claims licensable
+      by such Contributor that are necessarily infringed by their
+      Contribution(s) alone or by combination of their Contribution(s)
+      with the Work to which such Contribution(s) was submitted. If You
+      institute patent litigation against any entity (including a
+      cross-claim or counterclaim in a lawsuit) alleging that the Work
+      or a Contribution incorporated within the Work constitutes direct
+      or contributory patent infringement, then any patent licenses
+      granted to You under this License for that Work shall terminate
+      as of the date such litigation is filed.
+
+   4. Redistribution. You may reproduce and distribute copies of the
+      Work or Derivative Works thereof in any medium, with or without
+      modifications, and in Source or Object form, provided that You
+      meet the following conditions:
+
+      (a) You must give any other recipients of the Work or
+          Derivative Works a copy of this License; and
+
+      (b) You must cause any modified files to carry prominent notices
+          stating that You changed the files; and
+
+      (c) You must retain, in the Source form of any Derivative Works
+          that You distribute, all copyright, patent, trademark, and
+          attribution notices from the Source form of the Work,
+          excluding those notices that do not pertain to any part of
+          the Derivative Works; and
+
+      (d) If the Work includes a "NOTICE" text file as part of its
+          distribution, then any Derivative Works that You distribute must
+          include a readable copy of the attribution notices contained
+          within such NOTICE file, excluding those notices that do not
+          pertain to any part of the Derivative Works, in at least one
+          of the following places: within a NOTICE text file distributed
+          as part of the Derivative Works; within the Source form or
+          documentation, if provided along with the Derivative Works; or,
+          within a display generated by the Derivative Works, if and
+          wherever such third-party notices normally appear. The contents
+          of the NOTICE file are for informational purposes only and
+          do not modify the License. You may add Your own attribution
+          notices within Derivative Works that You distribute, alongside
+          or as an addendum to the NOTICE text from the Work, provided
+          that such additional attribution notices cannot be construed
+          as modifying the License.
+
+      You may add Your own copyright statement to Your modifications and
+      may provide additional or different license terms and conditions
+      for use, reproduction, or distribution of Your modifications, or
+      for any such Derivative Works as a whole, provided Your use,
+      reproduction, and distribution of the Work otherwise complies with
+      the conditions stated in this License.
+
+   5. Submission of Contributions. Unless You explicitly state otherwise,
+      any Contribution intentionally submitted for inclusion in the Work
+      by You to the Licensor shall be under the terms and conditions of
+      this License, without any additional terms or conditions.
+      Notwithstanding the above, nothing herein shall supersede or modify
+      the terms of any separate license agreement you may have executed
+      with Licensor regarding such Contributions.
+
+   6. Trademarks. This License does not grant permission to use the trade
+      names, trademarks, service marks, or product names of the Licensor,
+      except as required for reasonable and customary use in describing the
+      origin of the Work and reproducing the content of the NOTICE file.
+
+   7. Disclaimer of Warranty. Unless required by applicable law or
+      agreed to in writing, Licensor provides the Work (and each
+      Contributor provides its Contributions) on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+      implied, including, without limitation, any warranties or conditions
+      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+      PARTICULAR PURPOSE. You are solely responsible for determining the
+      appropriateness of using or redistributing the Work and assume any
+      risks associated with Your exercise of permissions under this License.
+
+   8. Limitation of Liability. In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "[]"
+      replaced with your own identifying information. (Don't include
+      the brackets!)  The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright [yyyy] [name of copyright owner]
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
diff --git a/README.md b/README.md
index 0743cbd..e4b3054 100644
--- a/README.md
+++ b/README.md
@@ -1 +1,307 @@
-# rocketmq-client-nodejs
+# RocketMQ Client for Node.js
+
+This official Node.js client is a C++ binding of [rocketmq-client-cpp](https://github.com/apache/rocketmq-client-cpp), which has been proven robust and widely adopted within Alibaba Group by many business units for more than three years.
+
+> **Notice 1:** This client is still in `dev` version. Use it cautiously in production.
+
+> **Notice 2:** This SDK currently supports only macOS and Ubuntu **14.04**. Ubuntu 16+ is not supported, and CentOS has not been tested yet.
+
+## Installation
+
+```shell
+$ npm install --save apache-rocketmq
+```
+
+## Examples
+
+You may view [example/producer.js](https://github.com/apache/rocketmq-client-nodejs/blob/master/example/producer.js) and
+[example/push_consumer.js](https://github.com/apache/rocketmq-client-nodejs/blob/master/example/push_consumer.js) for quick start.
+
+## Usage
+
+Require this package first.
+
+```javascript
+const { Producer, PushConsumer } = require("apache-rocketmq");
+```
+
+### Producer
+
+#### Constructor
+
+```javascript
+new Producer(groupId[, instanceName][, options]);
+```
+
+`Producer`'s constructor receives three parameters:
+
++ `groupId`: the group id of the producer;
++ `instanceName`: the instance name of the producer, **optional**;
++ `options`: the options object, **optional**;
+  - `nameServer`: the name server of RocketMQ;
+  - `groupName`: the group name of this producer;
+  - `compressLevel`: the compress level (0-9) of this producer, default to `5` where `0` is fastest and `9` is most compressed;
+  - `sendMessageTimeout`: the timeout for sending a message, in milliseconds; defaults to `3000` (2000 - 3000 ms is recommended);
+  - `maxMessageSize`: max message size with unit (B), default to `1024 * 128` which means 128K;
+  - `logFileNum`: C++ core logic log file number, default to 3 and log file path is `$HOME/logs/rocketmq-cpp`;
+  - `logFileSize`: size of each C++ core logic log file with unit (B);
+  - `logLevel`: C++ core logic log level in `"fatal"`, `"error"`, `"warn"`, `"info"`, `"debug"`, `"trace"` and `"num"`.
+
+e.g.
+
+```javascript
+const { Producer } = require("apache-rocketmq");
+const producer = new Producer("GROUP_ID", "INSTANCE_NAME", {
+    nameServer: "127.0.0.1:9876",
+});
+```
+
+#### start
+
+```javascript
+producer.start([callback]);
+```
+
+`.start` receives a callback function. If no callback is passed, this function will return a Promise object.
+
+e.g.
+
+```javascript
+producer.start(function(err) {
+    if(err) {
+        //
+    }
+});
+
+// or
+
+producer.start().then(() => {
+    //
+}).catch(err => {
+    //
+});
+```
+
+#### shutdown
+
+```javascript
+producer.shutdown([callback]);
+```
+
+`.shutdown` receives a callback function. If no callback is passed, this function will return a Promise object.
+
+e.g.
+
+```javascript
+producer.shutdown(function(err) {
+    if(err) {
+        //
+    }
+});
+
+// or
+
+producer.shutdown().then(() => {
+    //
+}).catch(err => {
+    //
+});
+```
+
+#### send
+
+```javascript
+producer.send(topic, body[, options][, callback]);
+```
+
+`.send` receives 4 parameters, including a callback. If no callback is passed, this function will return a Promise object.
+
++ `topic`: the topic string;
++ `body`: the message body string;
++ `options`: the options object, **optional**;
+  - `keys`: the keys for this message;
+  - `tags`: the tags for this message;
++ `callback`: the callback function, **optional**.
+
+e.g.
+
+```javascript
+producer.send("test", `baz ${i}`, {
+    keys: "foo",
+    tags: "bar"
+}, function(err, result) {
+    if(err) {
+        // ...    
+    } else {
+        console.log(result);
+
+        // console example:
+        //
+        //  { status: 0,
+        //    statusStr: 'OK',
+        //    msgId: '0101007F0000367E0000309DD68B0700',
+        //    offset: 0 }
+    }
+});
+```
+
+##### send `status` and `statusStr`
+
+| `status` | `statusStr`           |
+|----------|-----------------------|
+| `0`      | `OK`                  |
+| `1`      | `FLUSH_DISK_TIMEOUT`  |
+| `2`      | `FLUSH_SLAVE_TIMEOUT` |
+| `3`      | `SLAVE_NOT_AVAILABLE` |
+
+### PushConsumer
+
+#### Constructor
+
+```javascript
+new PushConsumer(groupId[, instanceName][, options]);
+```
+
+`PushConsumer`'s constructor receives three parameters:
+
++ `groupId`: the group id of the push consumer;
++ `instanceName`: the instance name of the push consumer, **optional**;
++ `options`: the options object, **optional**;
+  - `nameServer`: the name server of RocketMQ;
+  - `threadCount`: the thread number of underlying C++ logic;
+  - `maxBatchSize`: message max batch size;
+  - `logFileNum`: C++ core logic log file number, default to 3 and log file path is `$HOME/logs/rocketmq-cpp`;
+  - `logFileSize`: size of each C++ core logic log file with unit (B);
+  - `logLevel`: C++ core logic log level in `"fatal"`, `"error"`, `"warn"`, `"info"`, `"debug"`, `"trace"` and `"num"`.
+
+e.g.
+
+```javascript
+const { PushConsumer } = require("apache-rocketmq");
+const consumer = new PushConsumer("GROUP_ID", "INSTANCE_NAME", {
+    nameServer: "127.0.0.1:9876",
+    threadCount: 3
+});
+```
+
+#### start
+
+```javascript
+consumer.start([callback]);
+```
+
+`.start` receives a callback function. If no callback is passed, this function will return a Promise object.
+
+e.g.
+
+```javascript
+consumer.start(function(err) {
+    if(err) {
+        //
+    }
+});
+
+// or
+
+consumer.start().then(() => {
+    //
+}).catch(err => {
+    //
+});
+```
+
+#### shutdown
+
+```javascript
+consumer.shutdown([callback]);
+```
+
+`.shutdown` receives a callback function. If no callback is passed, this function will return a Promise object.
+
+e.g.
+
+```javascript
+consumer.shutdown(function(err) {
+    if(err) {
+        //
+    }
+});
+
+// or
+
+consumer.shutdown().then(() => {
+    //
+}).catch(err => {
+    //
+});
+```
+
+#### subscribe
+
+Add a subscription relationship to consumer.
+
+```javascript
+consumer.subscribe(topic[, expression]);
+```
+
+`.subscribe` receives two parameters, the second of which is optional.
+
++ `topic`: The topic to be subscribed;
++ `expression`: The additional expression to be subscribed, **optional**. e.g. `*`.
+
+#### On Message Event
+
+If you want to receive messages from the RocketMQ server, you should add a listener for the `message` event, which receives 2
+parameters.
+
+```javascript
+function YOUR_LISTENER(msg, ack) {
+    //
+}
+```
+
++ `msg`: the message object to be consumed;
++ `ack`: the Acknowledge object, which has a `.done()` function.
+
+`msg` object looks like:
+
+```javascript
+{ topic: 'test',
+  tags: 'bar',
+  keys: 'foo',
+  body: 'baz 7',
+  msgId: '0101007F0000367E0000339DD68B0800' }
+```
+
+You may call `ack.done()` to tell RocketMQ that you've consumed the message successfully, which is the same as `ack.done(true)`. You may call `ack.done(false)` to tell it that consumption failed.
+
+e.g.
+
+```javascript
+consumer.on("message", function(msg, ack) {
+    console.log(msg);
+    ack.done();
+});
+```
+
+## Apache RocketMQ Community
+
++ [RocketMQ Community Projects](https://github.com/apache/rocketmq-externals)
+
+## Contact Us
+
++ Mailing Lists: https://rocketmq.apache.org/about/contact/
++ Home: https://rocketmq.apache.org
++ Docs: https://rocketmq.apache.org/docs/quick-start/
++ Issues: https://github.com/apache/rocketmq-client-nodejs/issues
++ Ask: https://stackoverflow.com/questions/tagged/rocketmq
++ Slack: https://rocketmq-community.slack.com/
+
+## How to Contribute
+
+Contributions are warmly welcome! Be it trivial cleanup, major new feature or other suggestion. Read this [how to contribute](CONTRIBUTING.md) guide for more details.
+
+## License
+
+[Apache License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0.html) Copyright (C) Apache Software Foundation
+
diff --git a/binding.gyp b/binding.gyp
new file mode 100644
index 0000000..697d1c4
--- /dev/null
+++ b/binding.gyp
@@ -0,0 +1,48 @@
+{
+  "targets": [
+    {
+      "target_name": "rocketmq",
+      "sources": [
+        "src/rocketmq.cpp",
+        "src/producer.cpp",
+        "src/push_consumer.cpp",
+        "src/consumer_ack.cpp",
+        "src/consumer_ack_inner.cpp"
+      ],
+      "include_dirs": [
+        "deps/rocketmq/include",
+        "<!(node -e \"require('nan')\")"
+      ],
+      "conditions": [
+        ["OS==\"linux\"", {
+          "libraries": [
+            "<(module_root_dir)/deps/lib/librocketmq.a"
+          ],
+          "cflags_cc!": [ "-fno-exceptions", "-pthread", "-Wl,--no-as-needed", "-ldl" ],
+          "cflags_cc": [ "-Wno-ignored-qualifiers" ],
+          "cflags": [ "-std=c++11", "-g" ]
+        }],
+        ["OS==\"win\"", {
+          "libraries": [
+            "<(module_root_dir)/deps/lib/rocketmq-client-cpp.lib"
+          ],
+          "copies": [
+            {
+              "destination": "<(module_root_dir)/build/Release/",
+              "files": [ "<(module_root_dir)/deps/lib/rocketmq-client-cpp.dll" ]
+            }
+          ]
+        }],
+        ["OS==\"mac\"", {
+          "xcode_settings": {
+            "GCC_ENABLE_CPP_EXCEPTIONS": "YES"
+          },
+          "cflags!": [ "-fno-exceptions" ],
+          "cflags_cc!": [ "-fno-exceptions", "-pthread", "-Wl,--no-as-needed", "-ldl" ],
+          "cflags_cc": [ "-Wno-ignored-qualifiers" ],
+          "cflags": [ "-std=c++11", "-stdlib=libc++" ]
+        }]
+      ]
+    }
+  ]
+}
diff --git a/deps/lib/.gitkeep b/deps/lib/.gitkeep
new file mode 100644
index 0000000..e69de29
diff --git a/deps/rocketmq b/deps/rocketmq
new file mode 160000
index 0000000..d5887b6
--- /dev/null
+++ b/deps/rocketmq
@@ -0,0 +1 @@
+Subproject commit d5887b63ddbba16fec562f64dca7f77ce9ca0bb1
diff --git a/example/common.js b/example/common.js
new file mode 100644
index 0000000..efc5153
--- /dev/null
+++ b/example/common.js
@@ -0,0 +1,6 @@
+"use strict";
+
+module.exports = {
+    nameServer: "127.0.0.1:9876",
+    messageCount: 10
+};
diff --git a/example/producer.js b/example/producer.js
new file mode 100644
index 0000000..0f8ac0b
--- /dev/null
+++ b/example/producer.js
@@ -0,0 +1,47 @@
+"use strict";
+
+const co = require("co");
+
+const common = require("./common");
+const Producer = require("../").Producer;
+
+co(function *() { // example producer run under co; each `yield` below waits on the yielded promise
+    const producer = new Producer("testGroup", { // group id, then the options object (no instance name given)
+        nameServer: common.nameServer,
+        groupName: "testGroupName",
+        logFileNum: 5,
+        logFileSize: 1048576000,
+        logLevel: "debug",
+        compressLevel: 3,
+        sendMessageTimeout: 5000,
+        maxMessageSize: 1024 * 256
+    });
+
+    console.time("producer start");
+    try {
+        yield producer.start(); // no callback passed, so a promise is expected back
+    } catch(e) {
+        console.error(e);
+        process.exit(4); // startup failed: stop the example with exit code 4
+    }
+    console.timeEnd("producer start");
+    for(let i = 0; i < common.messageCount; i++) { // messages are sent one at a time, in order
+        console.time(`send ${i}`);
+        try {
+            const ret = yield producer.send("test", `baz ${i}`, { // topic "test", body "baz <i>"
+                keys: "foo",
+                tags: "bar"
+            });
+            console.timeEnd(`send ${i}`);
+            console.log(ret); // print whatever the send resolved with
+        } catch(e) {
+            console.error(e);
+            console.error(e.stack);
+            process.exit(4); // any failed send aborts the example with exit code 4
+        }
+    }
+
+    console.time("producer end");
+    yield producer.shutdown(); // not wrapped in try/catch, unlike start() and send() above
+    console.timeEnd("producer end");
+});
diff --git a/example/push_consumer.js b/example/push_consumer.js
new file mode 100644
index 0000000..9154a97
--- /dev/null
+++ b/example/push_consumer.js
@@ -0,0 +1,58 @@
+"use strict";
+
+const assert = require("assert");
+
+const co = require("co");
+
+const common = require("./common");
+const PushConsumer = require("../").PushConsumer;
+
+co(function *() { // example push consumer run under co; the only `yield` is on consumer.start() at the end
+    const msgs = []; // every message received so far
+    const consumer = new PushConsumer("testGroup", { // group id, then the options object (no instance name given)
+        nameServer: common.nameServer,
+        logFileNum: 5,
+        logFileSize: 1048576000,
+        logLevel: "debug"
+    });
+
+    consumer.subscribe("test", "*");
+    consumer.on("message", function(msg, ack) { // listener is attached before start() is called
+        msgs.push(msg);
+        ack.done(); // acknowledge right away, before any of the checks below run
+
+        // console.log(msg);
+        // return;
+
+        if(msgs.length === common.messageCount) { // runs when the received count reaches the expected count
+            msgs.sort(function(a, b) { // orders by body using string comparison; never returns 0
+                return a.body < b.body ? -1 : 1; // NOTE(review): lexicographic; equals numeric order only for single-digit indices
+            });
+
+            console.log(msgs);
+
+            for(let i = 0; i < msgs.length; i++) { // after sorting, the i-th body must be "baz <i>"
+                assert.deepStrictEqual(msgs[i], {
+                    topic: "test",
+                    tags: "bar",
+                    keys: "foo",
+                    body: `baz ${i}`,
+                    msgId: msgs[i].msgId // compared with itself, so the id value is not actually checked
+                });
+            }
+
+            console.time("consumer end");
+            consumer.shutdown().then(() => {
+                console.timeEnd("consumer end");
+                process.exit(0); // all assertions passed and the consumer shut down
+            }).catch(e => {
+                console.error(e);
+                process.exit(4); // shutdown rejected
+            });
+        }
+    });
+
+    console.time("consumer start");
+    yield consumer.start();
+    console.timeEnd("consumer start");
+});
diff --git a/index.js b/index.js
new file mode 100644
index 0000000..9127a66
--- /dev/null
+++ b/index.js
@@ -0,0 +1,4 @@
+"use strict";
+
+// Public entry points of the package: the producer and push consumer classes.
+exports.Producer = require("./lib/producer");
+exports.PushConsumer = require("./lib/push_consumer");
diff --git a/lib/common.js b/lib/common.js
new file mode 100644
index 0000000..73f3ccb
--- /dev/null
+++ b/lib/common.js
@@ -0,0 +1,12 @@
+"use strict";
+
+exports.requireBinding = function(name) {
+    let mod;
+    try {
+        mod = require(`../build/Debug/${name}`);
+    } catch(e) {
+        mod = require(`../build/Release/${name}`);
+    }
+
+    return mod;
+};
diff --git a/lib/env_init.js b/lib/env_init.js
new file mode 100644
index 0000000..16d8d31
--- /dev/null
+++ b/lib/env_init.js
@@ -0,0 +1,16 @@
+"use strict";
+
+const os = require("os");
+const path = require("path");
+
+const common = require("./common");
+
+const binding = common.requireBinding("rocketmq");
+
+// Platform-specific setup, executed once when this module is first required.
+switch(os.platform()) {
+case "darwin":
+    // NOTE(review): EVENT_NOKQUEUE looks like the libevent switch that disables
+    // its kqueue backend — confirm against the C++ SDK's dependencies.
+    process.env.EVENT_NOKQUEUE = "1";
+    // Pass the path of the bundled dylib to the binding's `macosDLOpen`.
+    binding.macosDLOpen(path.join(__dirname, "../deps/lib/librocketmq.dylib"));
+    break;
+default: break;
+}
diff --git a/lib/producer.js b/lib/producer.js
new file mode 100644
index 0000000..8c5b09c
--- /dev/null
+++ b/lib/producer.js
@@ -0,0 +1,184 @@
+"use strict";
+
+require("./env_init");
+
+const assert = require("assert");
+
+const common = require("./common");
+
+const binding = common.requireBinding("rocketmq");
+
+// Symbol key of the internal helper shared by `start()` and `shutdown()`.
+const START_OR_SHUTDOWN = Symbol("RocketMQProducer#startOrShutdown");
+
+// Send result codes; exported below as `RocketMQProducer.SEND_RESULT`.
+const SEND_RESULT_STATUS = {
+    OK: 0,
+    FLUSH_DISK_TIMEOUT: 1,
+    FLUSH_SLAVE_TIMEOUT: 2,
+    SLAVE_NOT_AVAILABLE: 3
+};
+// Reverse lookup (code -> name) used to fill `statusStr` in send results.
+const SEND_RESULT_STATUS_STR = {
+    0: "OK",
+    1: "FLUSH_DISK_TIMEOUT",
+    2: "FLUSH_SLAVE_TIMEOUT",
+    3: "SLAVE_NOT_AVAILABLE"
+};
+// Defaults merged underneath the options passed to the constructor.
+const DEFAULT_OPTIONS = {};
+// Lifecycle states stored in `producer.status`.
+const START_STATUS = {
+    STOPPED: 0,
+    STARTED: 1,
+    STOPPING: 2,
+    STARTING: 3
+};
+// Numeric values substituted for a string `logLevel` option (case-insensitive).
+const OPTIONS_LOG_LEVEL = {
+    FATAL: 1,
+    ERROR: 2,
+    WARN: 3,
+    INFO: 4,
+    DEBUG: 5,
+    TRACE: 6,
+    NUM: 7
+};
+
+// Number of producers currently started in this process.
+let producerRef = 0;
+// Long-period interval created when the first producer starts and cleared when
+// the last one stops. NOTE(review): presumably only there to keep the event
+// loop alive while a producer is running — confirm.
+let timer;
+
+class RocketMQProducer {
+    /**
+     * RocketMQ Producer constructor
+     * @param {String} groupId the group id
+     * @param {String} [instanceName] the instance name
+     * @param {Object} options the options
+     */
+    constructor(groupId, instanceName, options) {
+        if(typeof instanceName !== "string" && !options) {
+            options = instanceName;
+            instanceName = null;
+        }
+
+        options = Object.assign({}, DEFAULT_OPTIONS, options || {});
+        if(options.logLevel && typeof options.logLevel === "string") {
+            options.logLevel = OPTIONS_LOG_LEVEL[options.logLevel.toUpperCase()] || OPTIONS_LOG_LEVEL.INFO;
+        }
+        this.core = new binding.Producer(groupId, instanceName, options);
+        this.status = START_STATUS.STOPPED;
+    }
+
+    /**
+     * Set session credentials (usually used in Alibaba MQ)
+     * @param {String} accessKey the access key
+     * @param {String} secretKey the secret key
+     * @param {String} onsChannel the ons channel
+     * @return {Number} the result
+     */
+    setSessionCredentials(accessKey, secretKey, onsChannel) {
+        assert(typeof accessKey === "string");
+        assert(typeof secretKey === "string");
+        assert(typeof onsChannel === "string");
+        return !this.core.setSessionCredentials(accessKey, secretKey, onsChannel);
+    }
+
+    [START_OR_SHUTDOWN](method, callback) {
+        let promise;
+        let resolve;
+        let reject;
+        if(!callback) {
+            promise = new Promise((_resolve, _reject) => {
+                resolve = _resolve;
+                reject = _reject;
+            });
+        } else {
+            resolve = reject = callback;
+        }
+
+        this.core[method]((err, ret) => {
+            if(err) return reject(err);
+
+            if(method === "start") {
+                this.status = START_STATUS.STARTED;
+                if(!producerRef) timer = setInterval(function() {}, 24 * 3600 * 1000);
+                producerRef++;
+            } else {
+                this.status = START_STATUS.STOPPED;
+                producerRef--;
+                if(!producerRef) clearInterval(timer);
+            }
+
+            return callback ? resolve(undefined, ret) : resolve(ret);
+        });
+
+        if(!callback) return promise;
+    }
+
+    /**
+     * Start the producer
+     * @param {Function} [callback] the callback function
+     * @return {Promise|undefined} returns a Promise if no callback
+     */
+    start(callback) {
+        assert(this.status === START_STATUS.STOPPED);
+        this.status = START_STATUS.STARTING;
+        return this[START_OR_SHUTDOWN]("start", callback);
+    }
+
+    /**
+     * Shutdown the producer
+     * @param {Function} [callback] the callback function
+     * @return {Promise|undefined} returns a Promise if no callback
+     */
+    shutdown(callback) {
+        assert(this.status === START_STATUS.STARTED);
+        this.status = START_STATUS.STOPPING;
+        return this[START_OR_SHUTDOWN]("shutdown", callback);
+    }
+
+    /**
+     * Send a message
+     * @param {String} topic the topic
+     * @param {String} body the body
+     * @param {Object} [options] the options
+     * @param {Function} [callback] the callback function
+     * @return {Promise|undefined} returns a Promise if no callback
+     */
+    send(topic, body, options, callback) {
+        assert(typeof topic === "string");
+        assert(typeof body === "string" || Buffer.isBuffer(body));
+
+        if(typeof options === "function") {
+            callback = options;
+            options = {};
+        }
+
+        options = options || {};
+
+        let promise;
+        let resolve;
+        let reject;
+        if(!callback) {
+            promise = new Promise((_resolve, _reject) => {
+                resolve = _resolve;
+                reject = _reject;
+            });
+        }
+
+        this.core.send(topic, body, options, function sendMessageCallback(err, status, msgId, offset) {
+            if(err) {
+                if(!callback) return reject(err);
+                return callback(err);
+            }
+
+            const ret = {
+                status,
+                statusStr: SEND_RESULT_STATUS_STR[status] || "UNKNOWN",
+                msgId,
+                offset
+            };
+            if(!callback) return resolve(ret);
+            callback(undefined, ret);
+        });
+        if(promise) return promise;
+    }
+}
+
+RocketMQProducer.SEND_RESULT = SEND_RESULT_STATUS;
+
+module.exports = RocketMQProducer;
diff --git a/lib/push_consumer.js b/lib/push_consumer.js
new file mode 100644
index 0000000..fde2e84
--- /dev/null
+++ b/lib/push_consumer.js
@@ -0,0 +1,126 @@
+"use strict";
+
+require("./env_init");
+
+const assert = require("assert");
+const EventEmitter = require("events").EventEmitter;
+
+const common = require("./common");
+
+const binding = common.requireBinding("rocketmq");
+
+// Symbol key of the internal helper shared by `start()` and `shutdown()`.
+const START_OR_SHUTDOWN = Symbol("RocketMQPushConsumer#startOrShutdown");
+// Lifecycle states stored in `consumer.status`.
+const START_STATUS = {
+    STOPPED: 0,
+    STARTED: 1,
+    STOPPING: 2,
+    STARTING: 3
+};
+// Numeric values substituted for a string `logLevel` option (case-insensitive).
+const OPTIONS_LOG_LEVEL = {
+    FATAL: 1,
+    ERROR: 2,
+    WARN: 3,
+    INFO: 4,
+    DEBUG: 5,
+    TRACE: 6,
+    NUM: 7
+};
+
+// Defaults merged underneath the options passed to the constructor.
+const DEFAULT_OPTIONS = {};
+
+// Number of push consumers currently started in this process (despite the
+// "producer" in the name).
+let producerRef = 0;
+// Long-period interval created when the first consumer starts and cleared when
+// the last one stops. NOTE(review): presumably only there to keep the event
+// loop alive while a consumer is running — confirm.
+let timer;
+
+class RocketMQPushConsumer extends EventEmitter {
+    /**
+     * RocketMQ PushConsumer constructor
+     * @param {String} groupId the group id
+     * @param {String} [instanceName] the instance name
+     * @param {Object} options the options
+     */
+    constructor(groupId, instanceName, options) {
+        super();
+
+        if(typeof instanceName !== "string" && !options) {
+            options = instanceName;
+            instanceName = null;
+        }
+
+        options = Object.assign({}, DEFAULT_OPTIONS, options || {});
+        if(options.logLevel && typeof options.logLevel === "string") {
+            options.logLevel = OPTIONS_LOG_LEVEL[options.logLevel.toUpperCase()] || OPTIONS_LOG_LEVEL.INFO;
+        }
+        this.core = new binding.PushConsumer(groupId, instanceName, options);
+        this.core.setListener(this.emit.bind(this, "message"));
+        this.status = START_STATUS.STOPPED;
+    }
+
+    [START_OR_SHUTDOWN](method, callback) {
+        let promise;
+        let resolve;
+        let reject;
+        if(!callback) {
+            promise = new Promise((_resolve, _reject) => {
+                resolve = _resolve;
+                reject = _reject;
+            });
+        } else {
+            resolve = reject = callback;
+        }
+
+        this.core[method]((err, ret) => {
+            if(err) return reject(err);
+
+            if(method === "start") {
+                this.status = START_STATUS.STARTED;
+                if(!producerRef) timer = setInterval(function() {}, 24 * 3600 * 1000);
+                producerRef++;
+            } else {
+                this.status = START_STATUS.STOPPED;
+                producerRef--;
+                if(!producerRef) clearInterval(timer);
+            }
+
+            return callback ? resolve(undefined, ret) : resolve(ret);
+        });
+
+        if(!callback) return promise;
+    }
+
+    /**
+     * Start the push consumer
+     * @param {Function} [callback] the callback function
+     * @return {Promise|undefined} returns a Promise if no callback
+     */
+    start(callback) {
+        assert(this.status === START_STATUS.STOPPED);
+        this.status = START_STATUS.STARTING;
+        return this[START_OR_SHUTDOWN]("start", callback);
+    }
+
+    /**
+     * Shutdown the push consumer
+     * @param {Function} [callback] the callback function
+     * @return {Promise|undefined} returns a Promise if no callback
+     */
+    shutdown(callback) {
+        assert(this.status === START_STATUS.STARTED);
+        this.status = START_STATUS.STOPPING;
+        return this[START_OR_SHUTDOWN]("shutdown", callback);
+    }
+
+    /**
+     * subscribe a topic
+     * @param {String} topic the topic to be subscribed
+     * @param {String} [expression] the additional expression to be subscribed
+     * @return {Number} the subscribe status result
+     */
+    subscribe(topic, expression) {
+        assert(this.status === START_STATUS.STOPPED);
+        assert(topic && typeof topic === "string");
+        assert(!expression || expression && typeof expression === "string");
+        return this.core.subscribe(topic, expression || "");
+    }
+}
+
+module.exports = RocketMQPushConsumer;
diff --git a/package.json b/package.json
new file mode 100644
index 0000000..ae0df92
--- /dev/null
+++ b/package.json
@@ -0,0 +1,26 @@
+{
+  "name": "apache-rocketmq",
+  "version": "0.0.1-dev",
+  "cppSDKVersion": "1.2.0",
+  "description": "RocketMQ binding for Node.js",
+  "main": "index.js",
+  "scripts": {
+    "test": "npm run lint && echo 'temp example test' && node example/producer.js && node example/push_consumer.js",
+    "lint": "eslint .",
+    "install": "node ./script/download_lib.js && node-gyp rebuild"
+  },
+  "author": "XadillaX <i...@2333.moe>",
+  "license": "Apache-2.0",
+  "dependencies": {
+    "co": "^4.6.0",
+    "destroy": "^1.0.4",
+    "getos": "^3.1.1",
+    "mkdirp": "^0.5.1",
+    "nan": "^2.11.1",
+    "urllib": "^2.31.3"
+  },
+  "devDependencies": {
+    "eslint": "^5.9.0",
+    "eslint-config-rocketmq-style": "^1.0.0"
+  }
+}
diff --git a/script/download_lib.js b/script/download_lib.js
new file mode 100644
index 0000000..2b57a4b
--- /dev/null
+++ b/script/download_lib.js
@@ -0,0 +1,115 @@
+"use strict";
+
+const fs = require("fs");
+const os = require("os");
+const path = require("path");
+
+const co = require("co");
+const destroy = require("destroy");
+const _mkdirp = require("mkdirp");
+const urllib = require("urllib");
+
+const getLinuxDistroRoute = require("./get_linux_distro_route");
+const pkg = require("../package");
+
+// Base URL the prebuilt libraries are fetched from; overridable through the
+// NODE_ROCKETMQ_REGISTRY environment variable. Normalised to end with "/".
+let REGISTRY_MIRROR =
+    process.env.NODE_ROCKETMQ_REGISTRY ||
+    "https://opensource-rocketmq-client.oss-cn-hangzhou.aliyuncs.com";
+if(!REGISTRY_MIRROR.endsWith("/")) REGISTRY_MIRROR += "/";
+
+// C++ SDK version to download, taken from `cppSDKVersion` in package.json.
+const CPP_SDK_VERSION = pkg.cppSDKVersion;
+// Local directory the downloaded libraries are written to.
+const LIB_DIR = path.join(__dirname, "..", "deps", "lib");
+// Common prefix of every download URL.
+const URL_ROOT = `${REGISTRY_MIRROR}cpp-client`;
+
+function mkdirp(dir) {
+    return new Promise((resolve, reject) => {
+        _mkdirp(dir, null, err => {
+            if(err) reject(err);
+            else resolve();
+        });
+    });
+}
+
+function *getUrlArray() {
+    const platform = os.platform();
+    const ret = [];
+    let distro;
+
+    switch(platform) {
+    case "win32":
+        ret.push(`${URL_ROOT}/windows/${CPP_SDK_VERSION}/rocketmq-client-cpp.dll`);
+        ret.push(`${URL_ROOT}/windows/${CPP_SDK_VERSION}/rocketmq-client-cpp.lib`);
+        break;
+
+    case "darwin":
+        ret.push(`${URL_ROOT}/mac/${CPP_SDK_VERSION}/librocketmq.dylib`);
+        break;
+
+    case "linux":
+        distro = yield getLinuxDistroRoute();
+        ret.push(`${URL_ROOT}/linux/${CPP_SDK_VERSION}/${distro}/librocketmq.a`);
+        break;
+
+    default: throw new Error(`Unsupported platform ${platform}`);
+    }
+
+    return ret;
+}
+
+co(function *() {
+    let urls;
+    try {
+        urls = yield getUrlArray();
+    } catch(e) {
+        console.error(`[rocketmq sdk] [error] ${e.message}`);
+        process.exit(4);
+    }
+
+    yield mkdirp(LIB_DIR);
+    let writeTimes = 0;
+    for(const url of urls) {
+        console.log(`[rocketmq sdk] [info] downloading [${url}]...`);
+
+        const resp = yield urllib.request(url, {
+            timeout: 60000 * 5,
+            followRedirect: true,
+            streaming: true
+        });
+
+        if(resp.status !== 200) {
+            destroy(resp.res);
+            console.error(`[rocketmq sdk] [error] error status ${resp.status} while downloading [${url}].`);
+            process.exit(4);
+        }
+
+        const readStream = resp.res;
+        const filename = path.join(LIB_DIR, path.basename(url));
+        const writeStream = fs.createWriteStream(filename, {
+            encoding: "binary"
+        });
+
+        // eslint-disable-next-line
+        function handleDownladCallback(err) {
+            if(err) {
+                console.error(`[rocketmq sdk] [error] error occurred while downloading [${url}] to [${filename}].`);
+                console.error(err.stack);
+                process.exit(4);
+            }
+
+            writeTimes++;
+            destroy(resp.res);
+
+            console.log(`[rocketmq sdk] [info] downloaded library [${url}].`);
+            if(writeTimes === urls.length) {
+                console.log("[rocketmq sdk] [info] all libraries have been written to disk.");
+                process.exit(0);
+            }
+        }
+
+        readStream.on("error", handleDownladCallback);
+        writeStream.on("error", handleDownladCallback);
+        writeStream.on("finish", handleDownladCallback);
+
+        readStream.pipe(writeStream);
+    }
+});
diff --git a/script/get_linux_distro_route.js b/script/get_linux_distro_route.js
new file mode 100644
index 0000000..d5245a1
--- /dev/null
+++ b/script/get_linux_distro_route.js
@@ -0,0 +1,86 @@
+"use strict";
+
+const assert = require("assert");
+
+const getos = require("getos");
+
+// Distribution names (as passed in from `getos`) that use the RHEL<major>.x builds.
+const RHEL_MAP_ARRAY = [
+    "Centos",
+    "Red Hat Linux",
+    "RHEL",
+    "Scientific Linux",
+    "ScientificSL",
+    "ScientificCERNSLC",
+    "ScientificFermiLTS",
+    "ScientificSLF"
+];
+// Distribution names for which the Ubuntu fallback build is used without
+// printing the "may not supported" warning.
+const NORMAL_MAP_ARRAY = [
+    "Alpine Linux",
+    "Amazon Linux",
+    "Arch Linux",
+    "Chakra",
+    "Debian",
+    "elementary OS",
+    "IYCC",
+    "Linux Mint",
+    "Manjaro Linux",
+    "Ubuntu Linux"
+];
+
+function realGetLinuxDistroRoute(dist, release) {
+    let major;
+    if(release) {
+        major = Number(release.split(".")[0]);
+    }
+
+    // RHEL Distros
+    if(RHEL_MAP_ARRAY.includes(dist)) {
+        assert(major >= 5 && major <= 7, `Only support ${dist} 5-7.`);
+        return `RHEL${major}.x`;
+    }
+
+    // Fedora
+    if("Fedora" === dist) {
+        if(major <= 18) {
+            return "RHEL6.x";
+        } else if(major === 19) {
+            return "RHEL7.x";
+        }
+    }
+
+    if("Ubuntu Linux" === dist) {
+        assert([ 14, 16, 18 ].includes(major));
+        return `UBUNTU/${major}.04`;
+    }
+
+    if("Debian" === dist) {
+        assert(major >= 8 && major <= 10);
+        return `UBUNTU/${(major + 2) / 2}.04`;
+    }
+
+    // Ubuntu Distros
+    if(!NORMAL_MAP_ARRAY.includes(dist)) {
+        console.error(`[rocketmq sdk] [warn] ${dist} may not supported, fallback to use Ubuntu library.`);
+    }
+
+    return "UBUNTU/14.04";
+}
+
+function getLinuxDistroRoute() {
+    return new Promise((resolve, reject) => {
+        getos(function(err, ret) {
+            if(err) return reject(err);
+
+            let route;
+            try {
+                route = realGetLinuxDistroRoute(ret.dist, ret.release);
+            } catch(e) {
+                return reject(e);
+            }
+
+            resolve(route);
+        });
+    });
+}
+
+module.exports = getLinuxDistroRoute;
diff --git a/src/consumer_ack.cpp b/src/consumer_ack.cpp
new file mode 100644
index 0000000..d4243ea
--- /dev/null
+++ b/src/consumer_ack.cpp
@@ -0,0 +1,74 @@
+#include "consumer_ack.h"
+
+namespace __node_rocketmq__ {
+
+Nan::Persistent<Function> ConsumerAck::constructor;
+
+// Creates an ack wrapper that is not yet bound to an inner ack object.
+ConsumerAck::ConsumerAck() :
+    inner(NULL)
+{
+}
+
+// Drops the reference to the inner ack; the inner object is not deleted here.
+ConsumerAck::~ConsumerAck()
+{
+    inner = NULL;
+}
+
+// Builds the `ConsumerAck` function template (one internal field, prototype
+// method `done`), stores its constructor for later instantiation from native
+// code and exports it on the module target under the name "ConsumerAck".
+NAN_MODULE_INIT(ConsumerAck::Init)
+{
+    Local<FunctionTemplate> tpl = Nan::New<FunctionTemplate>(New);
+    tpl->SetClassName(Nan::New("ConsumerAck").ToLocalChecked());
+    tpl->InstanceTemplate()->SetInternalFieldCount(1);
+
+    Nan::SetPrototypeMethod(tpl, "done", Done);
+
+    constructor.Reset(tpl->GetFunction());
+    Nan::Set(target, Nan::New("ConsumerAck").ToLocalChecked(), tpl->GetFunction());
+}
+
+// JS constructor of `ConsumerAck`. Takes no arguments.
+//
+// When invoked as a plain function (without `new`) it re-enters itself as a
+// constructor call and returns the resulting instance.
+NAN_METHOD(ConsumerAck::New)
+{
+    if(!info.IsConstructCall())
+    {
+        // Instantiate in the current context. Creating a fresh `v8::Context`
+        // for every call is expensive and would place the object in an
+        // unrelated context.
+        Local<Function> _constructor = Nan::New<Function>(constructor);
+        info.GetReturnValue().Set(Nan::NewInstance(_constructor).ToLocalChecked());
+        return;
+    }
+
+    // Bind a new native object to the JS object under construction.
+    ConsumerAck* ack = new ConsumerAck();
+    ack->Wrap(info.This());
+    info.GetReturnValue().Set(info.This());
+}
+
+// JS method `ack.done([succ])`.
+//
+// The message counts as consumed successfully unless a first argument is
+// given that is neither `undefined` nor `null` and converts to `false`; in
+// that case it is reported as "reconsume later".
+NAN_METHOD(ConsumerAck::Done)
+{
+    ConsumerAck* self = ObjectWrap::Unwrap<ConsumerAck>(info.Holder());
+
+    const bool failed =
+        info.Length() >= 1 &&
+        !info[0]->IsUndefined() &&
+        !info[0]->IsNull() &&
+        !Nan::To<bool>(info[0]).FromJust();
+
+    // call inner ack's `Ack` function to emit the true `Acker`'s `Ack` function
+    // and finish waiting of consume thread
+    if(failed)
+    {
+        self->Ack(CConsumeStatus::E_RECONSUME_LATER);
+    }
+    else
+    {
+        self->Ack(CConsumeStatus::E_CONSUME_SUCCESS);
+    }
+}
+
+// Forwards `status` to the bound inner ack exactly once: after the first call
+// the binding is cleared, so later calls are no-ops.
+void ConsumerAck::Ack(CConsumeStatus status)
+{
+    if(!inner)
+    {
+        return;
+    }
+
+    // call inner ack in the main event loop
+    inner->Ack(status);
+    inner = NULL;
+}
+
+}
\ No newline at end of file
diff --git a/src/consumer_ack.h b/src/consumer_ack.h
new file mode 100644
index 0000000..a90e5ca
--- /dev/null
+++ b/src/consumer_ack.h
@@ -0,0 +1,50 @@
+#ifndef __ROCKETMQ_CONSUMER_ACK_H__
+#define __ROCKETMQ_CONSUMER_ACK_H__
+
+#include "consumer_ack_inner.h"
+#include <nan.h>
+
+namespace __node_rocketmq__ {
+
+using v8::Context;
+using v8::Function;
+using v8::FunctionTemplate;
+using v8::Isolate;
+using v8::Local;
+using v8::Object;
+using v8::String;
+using v8::Value;
+
+// JS-visible acknowledgement object with a single `done()` method; it relays
+// the result to a `ConsumerAckInner` bound through `SetInner()`.
+class ConsumerAck : public Nan::ObjectWrap {
+public:
+    // Registers the class on the addon's export target.
+    static NAN_MODULE_INIT(Init);
+
+private:
+    explicit ConsumerAck();
+    ~ConsumerAck();
+
+    static NAN_METHOD(New);
+    static NAN_METHOD(Done);
+
+    // Forwards the status to `inner` (if still bound) and unbinds it.
+    void Ack(CConsumeStatus status);
+
+    // Persistent handle to the JS constructor function, set in `Init`.
+    static Nan::Persistent<v8::Function> constructor;
+
+public:
+    // Binds the inner ack this object reports to. The pointer is stored as-is;
+    // this class never deletes it.
+    void SetInner(ConsumerAckInner* _inner)
+    {
+        inner = _inner;
+    }
+
+    // Gives native code access to the JS constructor for creating instances.
+    static Nan::Persistent<v8::Function>& GetConstructor()
+    {
+        return constructor;
+    }
+
+private:
+    // Inner ack currently bound, or NULL when unbound / already acked.
+    ConsumerAckInner* inner;
+};
+
+}
+
+#endif
\ No newline at end of file
diff --git a/src/consumer_ack_inner.cpp b/src/consumer_ack_inner.cpp
new file mode 100644
index 0000000..dcf5e89
--- /dev/null
+++ b/src/consumer_ack_inner.cpp
@@ -0,0 +1,61 @@
+#include "consumer_ack_inner.h"
+
+namespace __node_rocketmq__ {
+
+// Starts in the "not acked" state and initialises the condition variable and
+// mutex. `status` is left unset until `Ack()` stores a value.
+ConsumerAckInner::ConsumerAckInner() :
+    acked(false)
+{
+    uv_cond_init(&cond);
+    uv_mutex_init(&mutex);
+}
+
+// Releases the mutex and the condition variable.
+ConsumerAckInner::~ConsumerAckInner()
+{
+    uv_mutex_destroy(&mutex);
+    uv_cond_destroy(&cond);
+}
+
+// Records the consume result and wakes up a waiter in `WaitResult()`.
+// Only the first call has any effect; later calls are ignored.
+void ConsumerAckInner::Ack(CConsumeStatus _status)
+{
+    uv_mutex_lock(&mutex);
+
+    if(!acked)
+    {
+        status = _status;
+        acked = true;
+
+        // tell `this->WaitResult()` to continue
+        uv_cond_signal(&cond);
+    }
+
+    uv_mutex_unlock(&mutex);
+}
+
+// Blocks the calling thread until `Ack()` has been called, then returns the
+// status that was recorded. Returns immediately if the ack already happened.
+CConsumeStatus ConsumerAckInner::WaitResult()
+{
+    uv_mutex_lock(&mutex);
+
+    // Wait on the `acked` predicate instead of on the bare signal:
+    //   - if `Ack()` ran before we got here, the signal is already gone and a
+    //     plain `uv_cond_wait` would block forever;
+    //   - `uv_cond_wait` may return spuriously, in which case `status` has not
+    //     been set yet and we must keep waiting.
+    while(!acked)
+    {
+        uv_cond_wait(&cond, &mutex);
+    }
+
+    CConsumeStatus _status = status;
+    uv_mutex_unlock(&mutex);
+
+    return _status;
+}
+
+}
diff --git a/src/consumer_ack_inner.h b/src/consumer_ack_inner.h
new file mode 100644
index 0000000..402de6b
--- /dev/null
+++ b/src/consumer_ack_inner.h
@@ -0,0 +1,26 @@
+#ifndef __ROCKETMQ_CONSUMER_ACK_INNER_H__
+#define __ROCKETMQ_CONSUMER_ACK_INNER_H__
+
+#include <uv.h>
+#include <CPushConsumer.h>
+
+namespace __node_rocketmq__ {
+
+// One-shot hand-off of a consume status between two threads: one side calls
+// `Ack()`, the other blocks in `WaitResult()` until that has happened.
+class ConsumerAckInner {
+public:
+    ConsumerAckInner();
+    ~ConsumerAckInner();
+
+    // Stores the status and wakes the waiter; only the first call counts.
+    void Ack(CConsumeStatus _status);
+    // Blocks until a status has been stored, then returns it.
+    CConsumeStatus WaitResult();
+
+private:
+    // Whether `Ack()` has already stored a status.
+    bool acked;
+    // The stored status; only meaningful once `acked` is true.
+    CConsumeStatus status;
+    // Protects `acked` and `status`.
+    uv_mutex_t mutex;
+    // Signalled by `Ack()` to release `WaitResult()`.
+    uv_cond_t cond;
+};
+
+}
+
+#endif
\ No newline at end of file
diff --git a/src/producer.cpp b/src/producer.cpp
new file mode 100644
index 0000000..36996c6
--- /dev/null
+++ b/src/producer.cpp
@@ -0,0 +1,251 @@
+#include "producer.h"
+#include "workers/producer/send_message.h"
+#include "workers/producer/start_or_shutdown.h"
+
+#include <MQClientException.h>
+#include <string>
+using namespace std;
+
+namespace __node_rocketmq__ {
+
+// For use inside a NAN_METHOD: unwraps the receiver into `_v8_producer` and
+// declares `producer_ptr` as its underlying CProducer handle.
+#define NAN_GET_CPRODUCER() \
+    RocketMQProducer* _v8_producer = ObjectWrap::Unwrap<RocketMQProducer>(info.Holder()); \
+    CProducer* producer_ptr = _v8_producer->GetProducer();
+
+Nan::Persistent<Function> RocketMQProducer::constructor;
+
+// Creates the underlying CProducer for `group_id` and, when `instance_name`
+// is non-NULL, sets it as the producer's instance name.
+RocketMQProducer::RocketMQProducer(const char* group_id, const char* instance_name)
+{
+    producer_ptr = CreateProducer(group_id);
+    if(instance_name)
+    {
+        SetProducerInstanceName(producer_ptr, instance_name);
+    }
+}
+
+// Shuts the producer down on a best-effort basis, then destroys the handle.
+RocketMQProducer::~RocketMQProducer()
+{
+    try
+    {
+        ShutdownProducer(producer_ptr);
+    }
+    catch (...)
+    {
+        // deliberately ignored: an exception must not escape the destructor
+    }
+
+    DestroyProducer(producer_ptr);
+}
+
+// Applies the recognised keys of the JS `options` object to the underlying
+// producer. Keys whose value has the wrong type are silently skipped.
+//
+// Recognised keys: nameServer, groupName (strings); logFileNum, logFileSize,
+// logLevel, compressLevel, sendMessageTimeout, maxMessageSize (numbers).
+// The log file number and size are always set, defaulting to 3 files of
+// 104857600 bytes each.
+void RocketMQProducer::SetOptions(Local<Object> options)
+{
+    // Fetches `options[key]`.
+    auto get = [&options](const char* key) -> Local<Value> {
+        return Nan::Get(options, Nan::New<String>(key).ToLocalChecked()).ToLocalChecked();
+    };
+
+    // set name server
+    Local<Value> _name_server_v = get("nameServer");
+    if(_name_server_v->IsString())
+    {
+        Nan::Utf8String namesrv(_name_server_v);
+        SetProducerNameServerAddress(producer_ptr, *namesrv);
+    }
+
+    // set group name
+    Local<Value> _group_name_v = get("groupName");
+    if(_group_name_v->IsString())
+    {
+        Nan::Utf8String group_name(_group_name_v);
+        SetProducerGroupName(producer_ptr, *group_name);
+    }
+
+    // set log num & single log size
+    int file_num = 3;
+    int64 file_size = 104857600;
+    Local<Value> _log_file_num_v = get("logFileNum");
+    Local<Value> _log_file_size_v = get("logFileSize");
+    if(_log_file_num_v->IsNumber())
+    {
+        file_num = Nan::To<int32_t>(_log_file_num_v).FromJust();
+    }
+    if(_log_file_size_v->IsNumber())
+    {
+        // Read as 64-bit: a 32-bit conversion would wrap sizes of 2 GiB and more.
+        file_size = Nan::To<int64_t>(_log_file_size_v).FromJust();
+    }
+    SetProducerLogFileNumAndSize(producer_ptr, file_num, file_size);
+
+    // set log level
+    Local<Value> _log_level_v = get("logLevel");
+    if(_log_level_v->IsNumber())
+    {
+        int level = Nan::To<int32_t>(_log_level_v).FromJust();
+        SetProducerLogLevel(producer_ptr, (CLogLevel)level);
+    }
+
+    // set compress level
+    Local<Value> _compress_level_v = get("compressLevel");
+    if(_compress_level_v->IsNumber())
+    {
+        int level = Nan::To<int32_t>(_compress_level_v).FromJust();
+        SetProducerCompressLevel(producer_ptr, level);
+    }
+
+    // set send message timeout
+    Local<Value> _send_message_timeout_v = get("sendMessageTimeout");
+    if(_send_message_timeout_v->IsNumber())
+    {
+        int timeout = Nan::To<int32_t>(_send_message_timeout_v).FromJust();
+        SetProducerSendMsgTimeout(producer_ptr, timeout);
+    }
+
+    // set max message size
+    Local<Value> _max_message_size_v = get("maxMessageSize");
+    if(_max_message_size_v->IsNumber())
+    {
+        int size = Nan::To<int32_t>(_max_message_size_v).FromJust();
+        SetProducerMaxMessageSize(producer_ptr, size);
+    }
+}
+
+// Builds the `RocketMQProducer` function template (one internal field and the
+// prototype methods start / shutdown / send / setSessionCredentials), stores
+// its constructor and exports it on the module target under the name
+// "Producer".
+NAN_MODULE_INIT(RocketMQProducer::Init)
+{
+    Local<FunctionTemplate> tpl = Nan::New<FunctionTemplate>(New);
+    tpl->SetClassName(Nan::New("RocketMQProducer").ToLocalChecked());
+    tpl->InstanceTemplate()->SetInternalFieldCount(1);
+
+    Nan::SetPrototypeMethod(tpl, "start", Start);
+    Nan::SetPrototypeMethod(tpl, "shutdown", Shutdown);
+    Nan::SetPrototypeMethod(tpl, "send", Send);
+    Nan::SetPrototypeMethod(tpl, "setSessionCredentials", SetSessionCredentials);
+
+    constructor.Reset(tpl->GetFunction());
+    Nan::Set(target, Nan::New("Producer").ToLocalChecked(), tpl->GetFunction());
+}
+
+// JS constructor: `new Producer(groupId, instanceName, options)`.
+//
+//   groupId       converted to a UTF-8 string
+//   instanceName  string, or null / undefined for "no instance name"
+//   options       object, applied through `SetOptions`
+//
+// When invoked as a plain function (without `new`) it re-enters itself as a
+// constructor call with the same three arguments.
+NAN_METHOD(RocketMQProducer::New)
+{
+    if(!info.IsConstructCall())
+    {
+        // Instantiate in the current context rather than creating a fresh
+        // `v8::Context` for every call.
+        const int argc = 3;
+        Local<Value> argv[argc] = { info[0], info[1], info[2] };
+        Local<Function> _constructor = Nan::New<v8::Function>(constructor);
+        info.GetReturnValue().Set(Nan::NewInstance(_constructor, argc, argv).ToLocalChecked());
+        return;
+    }
+
+    // Fail with a catchable JS error instead of aborting in `ToLocalChecked`.
+    if(!info[2]->IsObject())
+    {
+        Nan::ThrowTypeError("options must be an object");
+        return;
+    }
+
+    Nan::Utf8String group_id(info[0]);
+    Nan::Utf8String instance_name(info[1]);
+    Local<Object> options = Nan::To<Object>(info[2]).ToLocalChecked();
+
+    // `undefined` is treated like `null`; otherwise it would be stringified
+    // and used as the instance name.
+    const bool has_instance_name = !info[1]->IsNull() && !info[1]->IsUndefined();
+    RocketMQProducer* producer = new RocketMQProducer(*group_id, has_instance_name ? *instance_name : NULL);
+
+    producer->Wrap(info.This());
+
+    // try to set options
+    try
+    {
+        producer->SetOptions(options);
+    }
+    catch (const runtime_error& e)
+    {
+        Nan::ThrowError(e.what());
+        return;
+    }
+
+    info.GetReturnValue().Set(info.This());
+}
+
+// JS method `start([callback])`: queues an asynchronous worker of type
+// START_PRODUCER for the underlying producer. The callback is forwarded to
+// the worker; NULL is passed when the first argument is not a function.
+NAN_METHOD(RocketMQProducer::Start)
+{
+    NAN_GET_CPRODUCER();
+
+    Nan::Callback* callback = (info[0]->IsFunction()) ?
+        new Nan::Callback(Nan::To<Function>(info[0]).ToLocalChecked()) :
+        NULL;
+
+    Nan::AsyncQueueWorker(new ProducerStartOrShutdownWorker(callback, producer_ptr, ProducerWorkerType::START_PRODUCER));
+}
+
+// JS method `shutdown([callback])`: queues an asynchronous worker of type
+// SHUTDOWN_PRODUCER for the underlying producer. The callback is forwarded to
+// the worker; NULL is passed when the first argument is not a function.
+NAN_METHOD(RocketMQProducer::Shutdown)
+{
+    NAN_GET_CPRODUCER();
+
+    Nan::Callback* callback = (info[0]->IsFunction()) ?
+        new Nan::Callback(Nan::To<Function>(info[0]).ToLocalChecked()) :
+        NULL;
+
+    Nan::AsyncQueueWorker(new ProducerStartOrShutdownWorker(callback, producer_ptr, ProducerWorkerType::SHUTDOWN_PRODUCER));
+}
+
+// JS method `setSessionCredentials(accessKey, secretKey, onsChannel)`.
+//
+// Returns the integer result of `SetProducerSessionCredentials`. If the native
+// call throws, a JS error carrying the exception message is thrown instead and
+// no return value is set.
+NAN_METHOD(RocketMQProducer::SetSessionCredentials)
+{
+    NAN_GET_CPRODUCER();
+
+    Nan::Utf8String access_key(info[0]);
+    Nan::Utf8String secret_key(info[1]);
+    Nan::Utf8String ons_channel(info[2]);
+
+    int ret = 0;
+    try
+    {
+        ret = SetProducerSessionCredentials(producer_ptr, *access_key, *secret_key, *ons_channel);
+    }
+    // Most specific handler first; exceptions are caught by const reference.
+    catch(const rocketmq::MQException& e)
+    {
+        Nan::ThrowError(e.what());
+        return;
+    }
+    catch(const std::exception& e)
+    {
+        // also covers std::runtime_error
+        Nan::ThrowError(e.what());
+        return;
+    }
+
+    info.GetReturnValue().Set(ret);
+}
+
+// JS method `send(topic, body, options, callback)`.
+//
+// Builds a CMessage for `topic`, copies the optional string properties
+// `options.tags` and `options.keys` onto it, sets the body and queues a
+// ProducerSendMessageWorker to perform the send asynchronously.
+// NOTE(review): the message is presumably released by the worker — confirm,
+// nothing in this function destroys it.
+NAN_METHOD(RocketMQProducer::Send)
+{
+    Nan::Utf8String topic(info[0]);
+    Local<Object> options = Nan::To<Object>(info[2]).ToLocalChecked();
+
+    CMessage* msg = CreateMessage(*topic);
+
+    Local<Value> _tags_to_be_checked = Nan::Get(options, Nan::New<String>("tags").ToLocalChecked()).ToLocalChecked();
+    Local<Value> _keys_to_be_checked = Nan::Get(options, Nan::New<String>("keys").ToLocalChecked()).ToLocalChecked();
+
+    if(_tags_to_be_checked->IsString())
+    {
+        Nan::Utf8String tags(_tags_to_be_checked);
+        SetMessageTags(msg, *tags);
+    }
+
+    if(_keys_to_be_checked->IsString())
+    {
+        Nan::Utf8String keys(_keys_to_be_checked);
+        SetMessageKeys(msg, *keys);
+    }
+
+    // set message body:
+    //   1. if it's a string, call `SetMessageBody`;
+    //   2. if it's a buffer, call `SetByteMessageBody`.
+    if(info[1]->IsString())
+    {
+        Nan::Utf8String body(info[1]);
+        SetMessageBody(msg, *body);
+    }
+    else
+    {
+        // Anything that is not a string is treated as a Buffer here; the type
+        // is not checked in this function.
+        Local<Object> node_buff_object = Nan::To<Object>(info[1]).ToLocalChecked();
+        unsigned int length = node::Buffer::Length(node_buff_object);
+        const char* buff = node::Buffer::Data(node_buff_object);
+        SetByteMessageBody(msg, buff, length);
+    }
+
+    // NULL is handed to the worker when no callback function was supplied.
+    Nan::Callback* callback = (info[3]->IsFunction()) ?
+        new Nan::Callback(Nan::To<Function>(info[3]).ToLocalChecked()) :
+        NULL;
+
+    RocketMQProducer* producer = ObjectWrap::Unwrap<RocketMQProducer>(info.Holder());
+    Nan::AsyncQueueWorker(new ProducerSendMessageWorker(callback, producer, msg));
+}
+
+}
diff --git a/src/producer.h b/src/producer.h
new file mode 100644
index 0000000..55966f9
--- /dev/null
+++ b/src/producer.h
@@ -0,0 +1,46 @@
+#ifndef __ROCKETMQ_PRODUCER_H__
+#define __ROCKETMQ_PRODUCER_H__
+
+#include <CProducer.h>
+#include <nan.h>
+
+namespace __node_rocketmq__ {
+
+using v8::Context;
+using v8::Function;
+using v8::FunctionTemplate;
+using v8::Isolate;
+using v8::Local;
+using v8::Object;
+using v8::String;
+using v8::Value;
+
+// Node.js wrapper object around a `CProducer` handle of the RocketMQ C API.
+class RocketMQProducer : public Nan::ObjectWrap {
+public:
+    // Registers the class on the addon's export target.
+    static NAN_MODULE_INIT(Init);
+
+public:
+    // Returns the underlying producer handle.
+    CProducer* GetProducer() { return producer_ptr; }
+
+private:
+    explicit RocketMQProducer(const char* group_id, const char* instance_name);
+    ~RocketMQProducer();
+
+    // Methods exposed to JavaScript.
+    static NAN_METHOD(New);
+    static NAN_METHOD(Start);
+    static NAN_METHOD(Shutdown);
+    static NAN_METHOD(Send);
+    static NAN_METHOD(SetSessionCredentials);
+
+    // Persistent handle to the JS constructor function, set in `Init`.
+    static Nan::Persistent<Function> constructor;
+
+private:
+    // Applies the recognised keys of the JS options object to the producer.
+    void SetOptions(Local<Object> options);
+
+private:
+    // Handle created in the constructor and destroyed in the destructor.
+    CProducer* producer_ptr;
+};
+
+}
+
+#endif
diff --git a/src/push_consumer.cpp b/src/push_consumer.cpp
new file mode 100644
index 0000000..94a626b
--- /dev/null
+++ b/src/push_consumer.cpp
@@ -0,0 +1,378 @@
+#include <map>
+#include "push_consumer.h"
+#include "consumer_ack.h"
+#include "workers/push_consumer/start_or_shutdown.h"
+
+using namespace std;
+
+namespace __node_rocketmq__ {
+
+// Bundle passed from RocketMQPushConsumer::OnMessage to
+// RocketMQPushConsumer::HandleMessageInEventLoop through `uv_async_t::data`.
+struct MessageHandlerParam
+{
+    RocketMQPushConsumer* consumer;  // wrapper whose listener is invoked
+    ConsumerAckInner* ack;           // ack object OnMessage waits on
+    CMessageExt* msg;                // message being delivered
+};
+// Property names set on the JavaScript message object; GetMessageColumn
+// dispatches on the first one or two characters of these names.
+char message_handler_param_keys[5][8] = { "topic", "tags", "keys", "body", "msgId" };
+
+// Held by GetMessageColumn around the native message getter calls;
+// initialised in RocketMQPushConsumer::Init.
+uv_mutex_t _get_msg_ext_column_lock;
+
+// Native handle -> wrapper lookup, filled by the constructor, cleared by the
+// destructor and read by OnMessage.
+map<CPushConsumer*, RocketMQPushConsumer*> _push_consumer_map;
+
+// Unwraps `info.Holder()` and declares `_v8_consumer` (the wrapper) and
+// `consumer_ptr` (its native handle) in the calling NAN method.
+#define NAN_GET_CPUSHCONSUMER() \
+    RocketMQPushConsumer* _v8_consumer = ObjectWrap::Unwrap<RocketMQPushConsumer>(info.Holder()); \
+    CPushConsumer* consumer_ptr = _v8_consumer->GetConsumer();
+
+// Storage for the class's static JavaScript constructor handle.
+Nan::Persistent<Function> RocketMQPushConsumer::constructor;
+
+// Creates the native push consumer for `group_id`, applies `instance_name`
+// when one is given, records the native-handle -> wrapper mapping and
+// registers `OnMessage` as the native message callback.
+RocketMQPushConsumer::RocketMQPushConsumer(const char* group_id, const char* instance_name) :
+    consumer_ptr(CreatePushConsumer(group_id))
+{
+    if(instance_name != NULL)
+    {
+        SetPushConsumerInstanceName(consumer_ptr, instance_name);
+    }
+
+    // OnMessage only receives the native handle, so it needs this entry to
+    // find its way back to the wrapper.
+    _push_consumer_map[consumer_ptr] = this;
+    RegisterMessageCallback(consumer_ptr, RocketMQPushConsumer::OnMessage);
+}
+
+// Shuts the native consumer down (best effort), removes the wrapper from the
+// lookup map and destroys the native handle.
+RocketMQPushConsumer::~RocketMQPushConsumer()
+{
+    try
+    {
+        ShutdownPushConsumer(consumer_ptr);
+    }
+    catch(...)
+    {
+        // A destructor must not throw; a failed shutdown is ignored here.
+    }
+
+    // Always drop the lookup entry, even when the shutdown above failed:
+    // otherwise OnMessage could still find a pointer to this destroyed
+    // wrapper. Erasing by key is a no-op when the entry is absent.
+    _push_consumer_map.erase(consumer_ptr);
+
+    DestroyPushConsumer(consumer_ptr);
+    consumer_ptr = NULL;
+}
+
+// Applies the recognised entries of `options` to the native consumer:
+// `nameServer` (string) and `threadCount`, `maxBatchSize`, `logFileNum`,
+// `logFileSize`, `logLevel` (numbers). Entries of any other type are ignored.
+// The log file number/size call is always made, falling back to 3 files with
+// a size of 104857600 when the options do not provide them.
+void RocketMQPushConsumer::SetOptions(Local<Object> options)
+{
+    // set name server
+    Local<Value> _name_server_v = Nan::Get(options, Nan::New<String>("nameServer").ToLocalChecked()).ToLocalChecked();
+    if(_name_server_v->IsString())
+    {
+        Nan::Utf8String namesrv(_name_server_v);
+        SetPushConsumerNameServerAddress(consumer_ptr, *namesrv);
+    }
+
+    // set thread count (non-positive values are ignored)
+    Local<Value> _thread_count_v = Nan::Get(options, Nan::New<String>("threadCount").ToLocalChecked()).ToLocalChecked();
+    if(_thread_count_v->IsNumber())
+    {
+        int thread_count = Nan::To<int32_t>(_thread_count_v).FromJust();
+        if(thread_count > 0)
+        {
+            SetPushConsumerThreadCount(consumer_ptr, thread_count);
+        }
+    }
+
+    // set message batch max size (non-positive values are ignored)
+    Local<Value> _max_batch_size_v = Nan::Get(options, Nan::New<String>("maxBatchSize").ToLocalChecked()).ToLocalChecked();
+    if(_max_batch_size_v->IsNumber())
+    {
+        int max_batch_size = Nan::To<int32_t>(_max_batch_size_v).FromJust();
+        if(max_batch_size > 0)
+        {
+            SetPushConsumerMessageBatchMaxSize(consumer_ptr, max_batch_size);
+        }
+    }
+
+    // set log num & single log size
+    int file_num = 3;
+    int64 file_size = 104857600;
+    Local<Value> _log_file_num_v = Nan::Get(options, Nan::New<String>("logFileNum").ToLocalChecked()).ToLocalChecked();
+    Local<Value> _log_file_size_v = Nan::Get(options, Nan::New<String>("logFileSize").ToLocalChecked()).ToLocalChecked();
+    if(_log_file_num_v->IsNumber())
+    {
+        file_num = Nan::To<int32_t>(_log_file_num_v).FromJust();
+    }
+    if(_log_file_size_v->IsNumber())
+    {
+        // Read as a 64-bit integer: the destination is an int64, and a 32-bit
+        // read would silently truncate sizes above INT32_MAX.
+        file_size = Nan::To<int64_t>(_log_file_size_v).FromJust();
+    }
+    SetPushConsumerLogFileNumAndSize(consumer_ptr, file_num, file_size);
+
+    // set log level
+    Local<Value> _log_level_v = Nan::Get(options, Nan::New<String>("logLevel").ToLocalChecked()).ToLocalChecked();
+    if(_log_level_v->IsNumber())
+    {
+        int level = Nan::To<int32_t>(_log_level_v).FromJust();
+        SetPushConsumerLogLevel(consumer_ptr, (CLogLevel) level);
+    }
+}
+
+// Initialises the message-column mutex, builds the `RocketMQPushConsumer`
+// function template with its prototype methods and exports the resulting
+// constructor on `target` as `PushConsumer`.
+NAN_MODULE_INIT(RocketMQPushConsumer::Init)
+{
+    uv_mutex_init(&_get_msg_ext_column_lock);
+
+    Local<FunctionTemplate> tpl = Nan::New<FunctionTemplate>(New);
+    tpl->SetClassName(Nan::New("RocketMQPushConsumer").ToLocalChecked());
+    // One internal field is reserved on each instance for the wrapped object.
+    tpl->InstanceTemplate()->SetInternalFieldCount(1);
+
+    // Prototype methods visible from JavaScript.
+    Nan::SetPrototypeMethod(tpl, "start", Start);
+    Nan::SetPrototypeMethod(tpl, "shutdown", Shutdown);
+    Nan::SetPrototypeMethod(tpl, "subscribe", Subscribe);
+    Nan::SetPrototypeMethod(tpl, "setListener", SetListener);
+    Nan::SetPrototypeMethod(tpl, "setSessionCredentials", SetSessionCredentials);
+
+    // Keep the constructor for call-without-`new` support and export it.
+    Local<Function> ctor = tpl->GetFunction();
+    constructor.Reset(ctor);
+    Nan::Set(target, Nan::New("PushConsumer").ToLocalChecked(), ctor);
+}
+
+// JavaScript constructor: `new PushConsumer(groupId, instanceName, options)`.
+//
+//   info[0]  group id (converted to a UTF-8 string)
+//   info[1]  instance name; `null` or `undefined` means "none"
+//   info[2]  options object; anything else is treated as an empty object
+//
+// When called without `new`, the call is forwarded to the stored constructor
+// with the same three arguments.
+NAN_METHOD(RocketMQPushConsumer::New)
+{
+    if(!info.IsConstructCall())
+    {
+        const int argc = 3;
+        Local<Value> argv[argc] = { info[0], info[1], info[2] };
+        // Use the context that is already current instead of creating a
+        // brand-new one for every call.
+        Local<Context> context = info.GetIsolate()->GetCurrentContext();
+        Local<Function> _constructor = Nan::New<v8::Function>(constructor);
+        Local<Object> instance;
+        // An empty result means the constructor threw; leave that exception
+        // pending for JavaScript rather than aborting on ToLocalChecked().
+        if(_constructor->NewInstance(context, argc, argv).ToLocal(&instance))
+        {
+            info.GetReturnValue().Set(instance);
+        }
+        return;
+    }
+
+    Nan::Utf8String v8_group_id(info[0]);
+    string group_id = *v8_group_id;
+
+    // `undefined` used to be stringified into the literal name "undefined";
+    // treat it like `null`, i.e. no instance name.
+    const bool has_instance_name = !(info[1]->IsNull() || info[1]->IsUndefined());
+    string instance_name;
+    if(has_instance_name)
+    {
+        Nan::Utf8String v8_instance_name(info[1]);
+        instance_name = *v8_instance_name;
+    }
+
+    // Fall back to an empty object so SetOptions still applies its defaults
+    // when no options object was passed.
+    Local<Object> options = info[2]->IsObject() ?
+        Nan::To<Object>(info[2]).ToLocalChecked() :
+        Nan::New<Object>();
+
+    RocketMQPushConsumer* consumer = new RocketMQPushConsumer(
+        group_id.c_str(),
+        has_instance_name ? instance_name.c_str() : NULL);
+
+    consumer->Wrap(info.This());
+
+    // try to set options
+    try
+    {
+        consumer->SetOptions(options);
+    }
+    catch(const std::exception& e)
+    {
+        // Caught by reference: std::runtime_error derives from
+        // std::exception, so one handler covers both without slicing.
+        Nan::ThrowError(e.what());
+        return;
+    }
+
+    info.GetReturnValue().Set(info.This());
+}
+
+// `start([callback])`: queues an async worker that starts the native consumer.
+// `info[0]` is used as the completion callback only when it is a function.
+NAN_METHOD(RocketMQPushConsumer::Start)
+{
+    RocketMQPushConsumer* wrapper = ObjectWrap::Unwrap<RocketMQPushConsumer>(info.Holder());
+    CPushConsumer* native_consumer = wrapper->GetConsumer();
+
+    Nan::Callback* done = NULL;
+    if(info[0]->IsFunction())
+    {
+        done = new Nan::Callback(Nan::To<Function>(info[0]).ToLocalChecked());
+    }
+
+    Nan::AsyncQueueWorker(new PushConsumerStartOrShutdownWorker(
+        done,
+        native_consumer,
+        PushConsumerWorkerType::START_PUSH_CONSUMER));
+}
+
+// `shutdown([callback])`: queues an async worker that shuts the native
+// consumer down. `info[0]` is used as the completion callback only when it is
+// a function.
+NAN_METHOD(RocketMQPushConsumer::Shutdown)
+{
+    RocketMQPushConsumer* wrapper = ObjectWrap::Unwrap<RocketMQPushConsumer>(info.Holder());
+    CPushConsumer* native_consumer = wrapper->GetConsumer();
+
+    Nan::Callback* done = NULL;
+    if(info[0]->IsFunction())
+    {
+        done = new Nan::Callback(Nan::To<Function>(info[0]).ToLocalChecked());
+    }
+
+    Nan::AsyncQueueWorker(new PushConsumerStartOrShutdownWorker(
+        done,
+        native_consumer,
+        PushConsumerWorkerType::SHUTDOWN_PUSH_CONSUMER));
+}
+
+// `subscribe(topic, expression)`: subscribes the native consumer and returns
+// the integer result of the native `Subscribe` call. A C++ exception raised
+// by the call is rethrown as a JavaScript error.
+NAN_METHOD(RocketMQPushConsumer::Subscribe)
+{
+    NAN_GET_CPUSHCONSUMER();
+
+    Nan::Utf8String v8_topic(info[0]);
+    Nan::Utf8String v8_expression(info[1]);
+    string topic = *v8_topic;
+    string expression = *v8_expression;
+
+    int ret;
+    try
+    {
+        // `::` selects the native C function, not this method.
+        ret = ::Subscribe(consumer_ptr, topic.c_str(), expression.c_str());
+    }
+    catch(const std::exception& e)
+    {
+        // Caught by reference: catching std::runtime_error by value slices
+        // derived exceptions, and it is a std::exception anyway.
+        Nan::ThrowError(e.what());
+        return;
+    }
+
+    info.GetReturnValue().Set(ret);
+}
+
+// `setListener(fn)`: stores `fn` as the function invoked for every delivered
+// message, replacing any previously stored listener. Throws a TypeError when
+// `fn` is not a function.
+NAN_METHOD(RocketMQPushConsumer::SetListener)
+{
+    // Validate first: ToLocalChecked() on a failed conversion aborts the
+    // whole process instead of raising a JavaScript error.
+    if(!info[0]->IsFunction())
+    {
+        Nan::ThrowTypeError("listener must be a function");
+        return;
+    }
+
+    RocketMQPushConsumer* consumer = ObjectWrap::Unwrap<RocketMQPushConsumer>(info.Holder());
+
+    // Resetting with a new function replaces the old one, so no separate
+    // clearing step is needed.
+    consumer->listener_func.Reset(Nan::To<Function>(info[0]).ToLocalChecked());
+}
+
+// `setSessionCredentials(accessKey, secretKey, onsChannel)`: forwards the
+// three strings to the native consumer and returns the integer result of the
+// native call. A C++ exception raised by the call is rethrown as a JavaScript
+// error.
+NAN_METHOD(RocketMQPushConsumer::SetSessionCredentials)
+{
+    NAN_GET_CPUSHCONSUMER();
+
+    Nan::Utf8String access_key(info[0]);
+    Nan::Utf8String secret_key(info[1]);
+    Nan::Utf8String ons_channel(info[2]);
+
+    int ret;
+    try
+    {
+        ret = SetPushConsumerSessionCredentials(consumer_ptr, *access_key, *secret_key, *ons_channel);
+    }
+    catch(const std::exception& e)
+    {
+        // Caught by reference: catching std::runtime_error by value slices
+        // derived exceptions, and it is a std::exception anyway.
+        Nan::ThrowError(e.what());
+        return;
+    }
+
+    info.GetReturnValue().Set(ret);
+}
+
+// Returns the field of `msg` selected by `name` as a string. The field is
+// chosen from the first character of `name` ('t' is split on the second
+// character into topic / tags, 'k' keys, 'b' body, 'm' msgId). Unknown names
+// and NULL results from the native getters yield an empty string. The native
+// getter runs while `_get_msg_ext_column_lock` is held.
+string RocketMQPushConsumer::GetMessageColumn(char* name, CMessageExt* msg)
+{
+    const char* value = NULL;
+
+    uv_mutex_lock(&_get_msg_ext_column_lock);
+    const char first = name[0];
+    if(first == 't')
+    {
+        // "topic" vs "tags"
+        if(name[1] == 'o')
+        {
+            value = GetMessageTopic(msg);
+        }
+        else
+        {
+            value = GetMessageTags(msg);
+        }
+    }
+    else if(first == 'k')
+    {
+        value = GetMessageKeys(msg);
+    }
+    else if(first == 'b')
+    {
+        value = GetMessageBody(msg);
+    }
+    else if(first == 'm')
+    {
+        value = GetMessageId(msg);
+    }
+    uv_mutex_unlock(&_get_msg_ext_column_lock);
+
+    if(value == NULL)
+    {
+        return "";
+    }
+    return value;
+}
+
+// Close callback passed to uv_close(): releases the handle's memory, which
+// OnMessage allocated with malloc().
+void close_async_done(uv_handle_t* handle)
+{
+    free(handle);
+}
+
+// uv_async callback bound by OnMessage. Builds a JavaScript message object
+// and a ConsumerAck object for the pending message, calls the registered
+// listener with `(message, ack)` and then closes the async handle.
+//
+// `async->data` points at the MessageHandlerParam that OnMessage keeps on its
+// own stack while it blocks in WaitResult().
+void RocketMQPushConsumer::HandleMessageInEventLoop(uv_async_t* async)
+{
+    Nan::HandleScope scope;
+
+    Isolate* isolate = Isolate::GetCurrent();
+    Local<Context> context = isolate->GetCurrentContext();
+
+    // Unpack what OnMessage handed over.
+    MessageHandlerParam* param = (MessageHandlerParam*)(async->data);
+    RocketMQPushConsumer* consumer = param->consumer;
+    ConsumerAckInner* ack_inner = param->ack;
+    CMessageExt* msg = param->msg;
+
+    // create the JavaScript ack object and then set inner ack object
+    Local<Function> cons = Nan::New<Function>(ConsumerAck::GetConstructor());
+    Local<Object> ack_obj = cons->NewInstance(context, 0, 0).ToLocalChecked();
+    ConsumerAck* ack = ObjectWrap::Unwrap<ConsumerAck>(ack_obj);
+    ack->SetInner(ack_inner);
+
+    // Copy the five message columns (topic, tags, keys, body, msgId) into a
+    // plain JavaScript object.
+    // TODO: const char *GetMessageProperty(CMessageExt *msgExt, const char *key);
+    Local<Object> result = Nan::New<Object>();
+    for(int i = 0; i < 5; i++)
+    {
+        Nan::Set(
+            result,
+            Nan::New(message_handler_param_keys[i]).ToLocalChecked(),
+            Nan::New(RocketMQPushConsumer::GetMessageColumn(message_handler_param_keys[i], msg)).ToLocalChecked());
+    }
+
+    Local<Value> argv[2] = {
+        result,
+        ack_obj
+    };
+    // NOTE(review): there is no check here that a listener has been set via
+    // setListener() before it is called — confirm callers guarantee that.
+    Nan::Callback* callback = consumer->GetListenerFunction();
+    callback->Call(2, argv);
+
+    // The handle is used for one message only; close_async_done frees it.
+    uv_close((uv_handle_t*)async, close_async_done);
+}
+
+// Native message callback registered in the constructor. Hands the message to
+// HandleMessageInEventLoop through a uv_async handle, then blocks until the
+// ack object reports a result and returns that status. Returns
+// E_RECONSUME_LATER when no wrapper is registered for `consumer_ptr`.
+int RocketMQPushConsumer::OnMessage(CPushConsumer* consumer_ptr, CMessageExt* msg_ext)
+{
+    // Look the wrapper up with find(): operator[] would insert a NULL entry
+    // for an unknown handle and so mutate the shared map from this callback.
+    auto it = _push_consumer_map.find(consumer_ptr);
+    if(it == _push_consumer_map.end() || !it->second)
+    {
+        // TODO: error handle
+        return CConsumeStatus::E_RECONSUME_LATER;
+    }
+    RocketMQPushConsumer* consumer = it->second;
+
+    ConsumerAckInner ack_inner;
+
+    // create async parameter; it lives on this stack frame, which stays alive
+    // because this function blocks in WaitResult() below
+    MessageHandlerParam param;
+    param.consumer = consumer;
+    param.ack = &ack_inner;
+    param.msg = msg_ext;
+
+    // create a new async handler and bind with `RocketMQPushConsumer::HandleMessageInEventLoop`
+    // NOTE(review): libuv documents only uv_async_send() as thread-safe; if
+    // this callback runs off the event-loop thread, uv_async_init() here
+    // needs to be revisited — TODO confirm.
+    uv_async_t* async = (uv_async_t*)malloc(sizeof(uv_async_t));
+    uv_async_init(uv_default_loop(), async, RocketMQPushConsumer::HandleMessageInEventLoop);
+    async->data = (void*)(&param);
+
+    // send async handler
+    uv_async_send(async);
+
+    // wait for result
+    CConsumeStatus status = ack_inner.WaitResult();
+
+    return status;
+}
+
+}
diff --git a/src/push_consumer.h b/src/push_consumer.h
new file mode 100644
index 0000000..8ee466e
--- /dev/null
+++ b/src/push_consumer.h
@@ -0,0 +1,62 @@
+#ifndef __ROCKETMQ_PUSH_CONSUMER_H__
+#define __ROCKETMQ_PUSH_CONSUMER_H__
+
+#include <CPushConsumer.h>
+#include <uv.h>
+#include <nan.h>
+#include <string>
+
+namespace __node_rocketmq__ {
+
+using v8::Context;
+using v8::Function;
+using v8::FunctionTemplate;
+using v8::Isolate;
+using v8::Local;
+using v8::Object;
+using v8::String;
+using v8::Value;
+
+// Nan::ObjectWrap subclass that holds a native `CPushConsumer*` handle and
+// the JavaScript listener callback.
+class RocketMQPushConsumer : public Nan::ObjectWrap {
+public:
+    // Module-init hook declared through NAN_MODULE_INIT.
+    static NAN_MODULE_INIT(Init);
+
+    // Callback with the native message-callback signature.
+    static int OnMessage(CPushConsumer* consumer_ptr, CMessageExt* msg_ext);
+
+    // Returns the message field selected by `name` as a string.
+    static std::string GetMessageColumn(char* name, CMessageExt* msg);
+
+private:
+    explicit RocketMQPushConsumer(const char* group_id, const char* instance_name);
+    ~RocketMQPushConsumer();
+
+    // NAN method entry points.
+    static NAN_METHOD(New);
+    static NAN_METHOD(Start);
+    static NAN_METHOD(Shutdown);
+    static NAN_METHOD(Subscribe);
+    static NAN_METHOD(SetListener);
+    static NAN_METHOD(SetSessionCredentials);
+
+    // Persistent handle to the JavaScript constructor function.
+    static Nan::Persistent<v8::Function> constructor;
+
+    // Takes a JavaScript options object (implementation lives in the .cpp).
+    void SetOptions(Local<Object>);
+
+    // uv_async callback signature; receives the async handle.
+    static void HandleMessageInEventLoop(uv_async_t* async);
+
+protected:
+    // Native consumer handle held by this wrapper.
+    CPushConsumer* GetConsumer() { return consumer_ptr; }
+
+    // Pointer to the stored listener callback; it remains owned by this object.
+    Nan::Callback* GetListenerFunction() { return &listener_func; }
+
+private:
+    CPushConsumer* consumer_ptr;
+    Nan::Callback listener_func;
+};
+
+}
+
+#endif
diff --git a/src/rocketmq.cpp b/src/rocketmq.cpp
new file mode 100644
index 0000000..bc29609
--- /dev/null
+++ b/src/rocketmq.cpp
@@ -0,0 +1,32 @@
+#include <nan.h>
+
+#include "producer.h"
+#include "push_consumer.h"
+#include "consumer_ack.h"
+
+namespace __node_rocketmq__ {
+
+#if defined(__APPLE__)
+uv_lib_t lib;
+
+// `macosDLOpen(filename)`: loads the shared library at `filename` into the
+// file-scope `lib` handle. Throws a JavaScript error carrying the loader's
+// message when the library cannot be opened.
+NAN_METHOD(DLOpen)
+{
+    Nan::Utf8String filename(info[0]);
+
+    // uv_dlopen() returns 0 on success; on failure the reason is available
+    // through uv_dlerror(). Surface it instead of failing silently.
+    if(uv_dlopen(*filename, &lib) != 0)
+    {
+        Nan::ThrowError(uv_dlerror(&lib));
+    }
+}
+#endif
+
+// Addon entry point: runs the Init hooks of the producer, push consumer and
+// consumer-ack classes against `target`; on macOS it additionally exports the
+// `macosDLOpen` function.
+NAN_MODULE_INIT(Init)
+{
+    RocketMQProducer::Init(target);
+    RocketMQPushConsumer::Init(target);
+    ConsumerAck::Init(target);
+
+#if defined(__APPLE__)
+    // Only compiled on Apple platforms, where DLOpen is defined above.
+    Nan::Set(target, Nan::New("macosDLOpen").ToLocalChecked(), Nan::New<v8::FunctionTemplate>(DLOpen)->GetFunction());
+#endif
+}
+
+NODE_MODULE(rocketmq, Init)
+
+}
diff --git a/src/workers/producer/send_message.h b/src/workers/producer/send_message.h
new file mode 100644
index 0000000..f757319
--- /dev/null
+++ b/src/workers/producer/send_message.h
@@ -0,0 +1,64 @@
+#ifndef __ROCKETMQ_SEND_MESSAGE_H__
+#define __ROCKETMQ_SEND_MESSAGE_H__
+
+#include <nan.h>
+#include <CProducer.h>
+#include <MQClientException.h>
+
+namespace __node_rocketmq__ {
+
+using namespace std;
+
+// Async worker that sends one message synchronously off the event loop and
+// reports the outcome to an optional JavaScript callback as
+// `(err, sendStatus, msgId, offset)`.
+//
+// The worker destroys `msg` in its destructor. `callback` may be NULL, in
+// which case the result is dropped.
+class ProducerSendMessageWorker : public Nan::AsyncWorker {
+public:
+    ProducerSendMessageWorker(Nan::Callback* callback, RocketMQProducer* producer, CMessage* msg) :
+        AsyncWorker(callback),
+        msg(msg),
+        producer(producer),
+        send_ret()  // zero-initialised so it is never read as garbage
+    {
+    }
+
+    ~ProducerSendMessageWorker()
+    {
+        DestroyMessage(msg);
+    }
+
+    // Performs the blocking send.
+    void Execute()
+    {
+        try
+        {
+            // NOTE(review): assumes the native API reports success as 0 —
+            // confirm against CProducer.h / CCommon.h.
+            int status = SendMessageSync(producer->GetProducer(), msg, &send_ret);
+            if(status != 0)
+            {
+                SetErrorMessage("SendMessageSync failed");
+            }
+        }
+        catch(const std::exception& e)
+        {
+            // Caught by reference; std::runtime_error is covered as well.
+            SetErrorMessage(e.what());
+        }
+    }
+
+    // Success path: `callback(undefined, sendStatus, msgId, offset)`.
+    void HandleOKCallback()
+    {
+        Nan::HandleScope scope;
+
+        // The caller passes NULL when no function was supplied.
+        if(!callback) return;
+
+        Local<Value> argv[] = {
+            Nan::Undefined(),
+            Nan::New<v8::Number>((unsigned int)send_ret.sendStatus),
+            Nan::New<v8::String>(send_ret.msgId).ToLocalChecked(),
+            Nan::New<v8::Number>((long long)send_ret.offset)
+        };
+        callback->Call(4, argv);
+    }
+
+    // Error path: `callback(Error(message))`, skipped without a callback.
+    void HandleErrorCallback()
+    {
+        Nan::HandleScope scope;
+
+        if(!callback) return;
+
+        Local<Value> argv[] = {
+            Nan::Error(ErrorMessage())
+        };
+        callback->Call(1, argv);
+    }
+
+private:
+    CMessage* msg;
+    RocketMQProducer* producer;
+
+    CSendResult send_ret;
+};
+
+}
+
+#endif
diff --git a/src/workers/producer/start_or_shutdown.h b/src/workers/producer/start_or_shutdown.h
new file mode 100644
index 0000000..42aae60
--- /dev/null
+++ b/src/workers/producer/start_or_shutdown.h
@@ -0,0 +1,72 @@
+#ifndef __ROCKETMQ_PRODUCER_START_OR_SHUTDOWN_H__
+#define __ROCKETMQ_PRODUCER_START_OR_SHUTDOWN_H__
+
+#include <nan.h>
+#include <CProducer.h>
+#include <MQClientException.h>
+
+namespace __node_rocketmq__ {
+
+using namespace std;
+
+// Selects which native call ProducerStartOrShutdownWorker::Execute performs.
+enum ProducerWorkerType {
+    START_PRODUCER = 0,  // call StartProducer
+    SHUTDOWN_PRODUCER    // call ShutdownProducer
+};
+
+// Async worker that starts or shuts down a native producer off the event
+// loop and reports the native return code to an optional JavaScript callback
+// as `(err, ret)`. `callback` may be NULL, in which case the result is
+// dropped.
+class ProducerStartOrShutdownWorker : public Nan::AsyncWorker {
+public:
+    ProducerStartOrShutdownWorker(Nan::Callback* callback, CProducer* producer_ptr, ProducerWorkerType type) :
+        AsyncWorker(callback),
+        producer_ptr(producer_ptr),
+        ret(0),
+        type(type)
+    {
+    }
+
+    ~ProducerStartOrShutdownWorker()
+    {
+    }
+
+    // Runs the native call selected by `type` and stores its return code.
+    void Execute()
+    {
+        try
+        {
+            switch(type) {
+            case START_PRODUCER:
+                ret = StartProducer(producer_ptr); break;
+            case SHUTDOWN_PRODUCER:
+                ret = ShutdownProducer(producer_ptr); break;
+            default: break;
+            }
+        }
+        catch(const std::exception& e)
+        {
+            // Caught by reference; std::runtime_error is covered as well.
+            SetErrorMessage(e.what());
+        }
+    }
+
+    // Success path: `callback(undefined, ret)`.
+    void HandleOKCallback()
+    {
+        Nan::HandleScope scope;
+
+        // The caller passes NULL when no function was supplied.
+        if(!callback) return;
+
+        Local<Value> argv[] = {
+            Nan::Undefined(),
+            Nan::New<v8::Number>((int)ret),
+        };
+        callback->Call(2, argv);
+    }
+
+    // Error path: `callback(Error(message))`, skipped without a callback.
+    void HandleErrorCallback()
+    {
+        Nan::HandleScope scope;
+
+        if(!callback) return;
+
+        Local<Value> argv[] = {
+            Nan::Error(ErrorMessage())
+        };
+        callback->Call(1, argv);
+    }
+
+private:
+    CProducer* producer_ptr;
+    int ret;
+    ProducerWorkerType type;
+};
+
+}
+
+#endif
diff --git a/src/workers/push_consumer/start_or_shutdown.h b/src/workers/push_consumer/start_or_shutdown.h
new file mode 100644
index 0000000..84371b1
--- /dev/null
+++ b/src/workers/push_consumer/start_or_shutdown.h
@@ -0,0 +1,72 @@
+#ifndef __ROCKETMQ_PUSH_CONSUMER_START_OR_SHUTDOWN_H__
+#define __ROCKETMQ_PUSH_CONSUMER_START_OR_SHUTDOWN_H__
+
+#include <nan.h>
+#include <CPushConsumer.h>
+#include <MQClientException.h>
+
+namespace __node_rocketmq__ {
+
+using namespace std;
+
+// Selects which native call PushConsumerStartOrShutdownWorker::Execute performs.
+enum PushConsumerWorkerType {
+    START_PUSH_CONSUMER = 0,  // call StartPushConsumer
+    SHUTDOWN_PUSH_CONSUMER    // call ShutdownPushConsumer
+};
+
+// Async worker that starts or shuts down a native push consumer off the
+// event loop and reports the native return code to an optional JavaScript
+// callback as `(err, ret)`. `callback` may be NULL, in which case the result
+// is dropped.
+class PushConsumerStartOrShutdownWorker : public Nan::AsyncWorker {
+public:
+    PushConsumerStartOrShutdownWorker(Nan::Callback* callback, CPushConsumer* consumer_ptr, PushConsumerWorkerType type) :
+        AsyncWorker(callback),
+        consumer_ptr(consumer_ptr),
+        ret(0),
+        type(type)
+    {
+    }
+
+    ~PushConsumerStartOrShutdownWorker()
+    {
+    }
+
+    // Runs the native call selected by `type` and stores its return code.
+    void Execute()
+    {
+        try
+        {
+            switch(type) {
+            case START_PUSH_CONSUMER:
+                ret = StartPushConsumer(consumer_ptr); break;
+            case SHUTDOWN_PUSH_CONSUMER:
+                ret = ShutdownPushConsumer(consumer_ptr); break;
+            default: break;
+            }
+        }
+        catch(const std::exception& e)
+        {
+            // Caught by reference; std::runtime_error is covered as well.
+            SetErrorMessage(e.what());
+        }
+    }
+
+    // Success path: `callback(undefined, ret)`.
+    void HandleOKCallback()
+    {
+        Nan::HandleScope scope;
+
+        // The caller passes NULL when no function was supplied.
+        if(!callback) return;
+
+        Local<Value> argv[] = {
+            Nan::Undefined(),
+            Nan::New<v8::Number>((int)ret),
+        };
+        callback->Call(2, argv);
+    }
+
+    // Error path: `callback(Error(message))`, skipped without a callback.
+    void HandleErrorCallback()
+    {
+        Nan::HandleScope scope;
+
+        if(!callback) return;
+
+        Local<Value> argv[] = {
+            Nan::Error(ErrorMessage())
+        };
+        callback->Call(1, argv);
+    }
+
+private:
+    CPushConsumer* consumer_ptr;
+    int ret;
+    PushConsumerWorkerType type;
+};
+
+}
+
+#endif


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services