You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by vo...@apache.org on 2017/06/15 09:50:26 UTC

[2/6] incubator-rocketmq-externals git commit: RocketMQ-MySQL 1.0-snapshot closes apache/incubator-rocketmq-externals#24

RocketMQ-MySQL 1.0-snapshot closes apache/incubator-rocketmq-externals#24


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/commit/5593575c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/tree/5593575c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/diff/5593575c

Branch: refs/heads/master
Commit: 5593575cbbeb4a6e92ba3d47caef0c7cd48e7278
Parents: aaa0758
Author: zhaoqun911 <91...@zhaoqun911.cn>
Authored: Wed Jun 14 20:36:26 2017 +0800
Committer: vongosling <vo...@alibaba-inc.com>
Committed: Wed Jun 14 20:36:26 2017 +0800

----------------------------------------------------------------------
 rocketmq-mysql/.gitignore                       |  14 +
 rocketmq-mysql/LICENSE-BIN                      | 279 ++++++++++++++++
 rocketmq-mysql/NOTICE-BIN                       |   5 +
 rocketmq-mysql/README.md                        |  41 ++-
 rocketmq-mysql/pom.xml                          | 275 ++++++++++++++++
 rocketmq-mysql/src/main/assembly/assembly.xml   |  61 ++++
 .../src/main/assembly/scripts/start.sh          |  23 ++
 .../src/main/assembly/scripts/stop.sh           |  18 ++
 .../java/org/apache/rocketmq/mysql/Config.java  | 130 ++++++++
 .../org/apache/rocketmq/mysql/Replicator.java   | 129 ++++++++
 .../apache/rocketmq/mysql/binlog/DataRow.java   |  79 +++++
 .../rocketmq/mysql/binlog/EventListener.java    |  46 +++
 .../rocketmq/mysql/binlog/EventProcessor.java   | 318 +++++++++++++++++++
 .../rocketmq/mysql/binlog/Transaction.java      |  91 ++++++
 .../rocketmq/mysql/offset/OffsetLogThread.java  |  49 +++
 .../rocketmq/mysql/position/BinlogPosition.java |  47 +++
 .../mysql/position/BinlogPositionManager.java   | 131 ++++++++
 .../mysql/productor/RocketMQProducer.java       |  59 ++++
 .../apache/rocketmq/mysql/schema/Database.java  | 108 +++++++
 .../apache/rocketmq/mysql/schema/Schema.java    | 128 ++++++++
 .../org/apache/rocketmq/mysql/schema/Table.java |  59 ++++
 .../mysql/schema/column/BigIntColumnParser.java |  50 +++
 .../mysql/schema/column/ColumnParser.java       |  76 +++++
 .../schema/column/DateTimeColumnParser.java     |  40 +++
 .../schema/column/DefaultColumnParser.java      |  37 +++
 .../mysql/schema/column/EnumColumnParser.java   |  46 +++
 .../mysql/schema/column/IntColumnParser.java    |  69 ++++
 .../mysql/schema/column/SetColumnParser.java    |  55 ++++
 .../mysql/schema/column/StringColumnParser.java |  57 ++++
 .../mysql/schema/column/TimeColumnParser.java   |  40 +++
 .../mysql/schema/column/YearColumnParser.java   |  40 +++
 rocketmq-mysql/src/main/resources/logback.xml   |  79 +++++
 .../src/main/resources/rocketmq_mysql.conf      |  28 ++
 .../rocketmq/mysql/BigIntColumnParserTest.java  |  38 +++
 .../rocketmq/mysql/EnumColumnParserTest.java    |  38 +++
 .../rocketmq/mysql/IntColumnParserTest.java     |  56 ++++
 .../rocketmq/mysql/SetColumnParserTest.java     |  38 +++
 rocketmq-mysql/style/copyright/Apache.xml       |  23 ++
 .../style/copyright/profiles_settings.xml       |  64 ++++
 rocketmq-mysql/style/rmq_checkstyle.xml         | 134 ++++++++
 rocketmq-mysql/style/rmq_codeStyle.xml          | 143 +++++++++
 41 files changed, 3240 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/5593575c/rocketmq-mysql/.gitignore
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/.gitignore b/rocketmq-mysql/.gitignore
new file mode 100644
index 0000000..3311eab
--- /dev/null
+++ b/rocketmq-mysql/.gitignore
@@ -0,0 +1,14 @@
+*dependency-reduced-pom.xml
+.classpath
+.project
+.settings/
+target/
+devenv
+*.log*
+*.iml
+.idea/
+*.versionsBackup
+*bin
+!NOTICE-BIN
+!LICENSE-BIN
+.DS_Store
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/5593575c/rocketmq-mysql/LICENSE-BIN
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/LICENSE-BIN b/rocketmq-mysql/LICENSE-BIN
new file mode 100644
index 0000000..5d47613
--- /dev/null
+++ b/rocketmq-mysql/LICENSE-BIN
@@ -0,0 +1,279 @@
+Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+
+      "Contribution" shall mean any work of authorship, including
+      the original version of the Work and any modifications or additions
+      to that Work or Derivative Works thereof, that is intentionally
+      submitted to Licensor for inclusion in the Work by the copyright owner
+      or by an individual or Legal Entity authorized to submit on behalf of
+      the copyright owner. For the purposes of this definition, "submitted"
+      means any form of electronic, verbal, or written communication sent
+      to the Licensor or its representatives, including but not limited to
+      communication on electronic mailing lists, source code control systems,
+      and issue tracking systems that are managed by, or on behalf of, the
+      Licensor for the purpose of discussing and improving the Work, but
+      excluding communication that is conspicuously marked or otherwise
+      designated in writing by the copyright owner as "Not a Contribution."
+
+      "Contributor" shall mean Licensor and any individual or Legal Entity
+      on behalf of whom a Contribution has been received by Licensor and
+      subsequently incorporated within the Work.
+
+   2. Grant of Copyright License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      copyright license to reproduce, prepare Derivative Works of,
+      publicly display, publicly perform, sublicense, and distribute the
+      Work and such Derivative Works in Source or Object form.
+
+   3. Grant of Patent License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      (except as stated in this section) patent license to make, have made,
+      use, offer to sell, sell, import, and otherwise transfer the Work,
+      where such license applies only to those patent claims licensable
+      by such Contributor that are necessarily infringed by their
+      Contribution(s) alone or by combination of their Contribution(s)
+      with the Work to which such Contribution(s) was submitted. If You
+      institute patent litigation against any entity (including a
+      cross-claim or counterclaim in a lawsuit) alleging that the Work
+      or a Contribution incorporated within the Work constitutes direct
+      or contributory patent infringement, then any patent licenses
+      granted to You under this License for that Work shall terminate
+      as of the date such litigation is filed.
+
+   4. Redistribution. You may reproduce and distribute copies of the
+      Work or Derivative Works thereof in any medium, with or without
+      modifications, and in Source or Object form, provided that You
+      meet the following conditions:
+
+      (a) You must give any other recipients of the Work or
+          Derivative Works a copy of this License; and
+
+      (b) You must cause any modified files to carry prominent notices
+          stating that You changed the files; and
+
+      (c) You must retain, in the Source form of any Derivative Works
+          that You distribute, all copyright, patent, trademark, and
+          attribution notices from the Source form of the Work,
+          excluding those notices that do not pertain to any part of
+          the Derivative Works; and
+
+      (d) If the Work includes a "NOTICE" text file as part of its
+          distribution, then any Derivative Works that You distribute must
+          include a readable copy of the attribution notices contained
+          within such NOTICE file, excluding those notices that do not
+          pertain to any part of the Derivative Works, in at least one
+          of the following places: within a NOTICE text file distributed
+          as part of the Derivative Works; within the Source form or
+          documentation, if provided along with the Derivative Works; or,
+          within a display generated by the Derivative Works, if and
+          wherever such third-party notices normally appear. The contents
+          of the NOTICE file are for informational purposes only and
+          do not modify the License. You may add Your own attribution
+          notices within Derivative Works that You distribute, alongside
+          or as an addendum to the NOTICE text from the Work, provided
+          that such additional attribution notices cannot be construed
+          as modifying the License.
+
+      You may add Your own copyright statement to Your modifications and
+      may provide additional or different license terms and conditions
+      for use, reproduction, or distribution of Your modifications, or
+      for any such Derivative Works as a whole, provided Your use,
+      reproduction, and distribution of the Work otherwise complies with
+      the conditions stated in this License.
+
+   5. Submission of Contributions. Unless You explicitly state otherwise,
+      any Contribution intentionally submitted for inclusion in the Work
+      by You to the Licensor shall be under the terms and conditions of
+      this License, without any additional terms or conditions.
+      Notwithstanding the above, nothing herein shall supersede or modify
+      the terms of any separate license agreement you may have executed
+      with Licensor regarding such Contributions.
+
+   6. Trademarks. This License does not grant permission to use the trade
+      names, trademarks, service marks, or product names of the Licensor,
+      except as required for reasonable and customary use in describing the
+      origin of the Work and reproducing the content of the NOTICE file.
+
+   7. Disclaimer of Warranty. Unless required by applicable law or
+      agreed to in writing, Licensor provides the Work (and each
+      Contributor provides its Contributions) on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+      implied, including, without limitation, any warranties or conditions
+      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+      PARTICULAR PURPOSE. You are solely responsible for determining the
+      appropriateness of using or redistributing the Work and assume any
+      risks associated with Your exercise of permissions under this License.
+
+   8. Limitation of Liability. In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "{}"
+      replaced with your own identifying information. (Don't include
+      the brackets!)  The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright {}
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+
+
+------
+This product has a bundle logback, which is available under the EPL v1.0 License.
+The source code of logback can be found at https://github.com/qos-ch/logback.
+
+Logback LICENSE
+---------------
+
+Logback: the reliable, generic, fast and flexible logging framework.
+Copyright (C) 1999-2015, QOS.ch. All rights reserved.
+
+This program and the accompanying materials are dual-licensed under
+either the terms of the Eclipse Public License v1.0 as published by
+the Eclipse Foundation
+
+  or (per the licensee's choosing)
+
+under the terms of the GNU Lesser General Public License version 2.1
+as published by the Free Software Foundation.
+
+------
+This product has a bundle slf4j, which is available under the MIT License.
+The source code of slf4j can be found at https://github.com/qos-ch/slf4j.
+
+ Copyright (c) 2004-2017 QOS.ch
+ All rights reserved.
+
+ Permission is hereby granted, free  of charge, to any person obtaining
+ a  copy  of this  software  and  associated  documentation files  (the
+ "Software"), to  deal in  the Software without  restriction, including
+ without limitation  the rights to  use, copy, modify,  merge, publish,
+ distribute,  sublicense, and/or sell  copies of  the Software,  and to
+ permit persons to whom the Software  is furnished to do so, subject to
+ the following conditions:
+
+ The  above  copyright  notice  and  this permission  notice  shall  be
+ included in all copies or substantial portions of the Software.
+
+ THE  SOFTWARE IS  PROVIDED  "AS  IS", WITHOUT  WARRANTY  OF ANY  KIND,
+ EXPRESS OR  IMPLIED, INCLUDING  BUT NOT LIMITED  TO THE  WARRANTIES OF
+ MERCHANTABILITY,    FITNESS    FOR    A   PARTICULAR    PURPOSE    AND
+ NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
+ LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+ OF CONTRACT, TORT OR OTHERWISE,  ARISING FROM, OUT OF OR IN CONNECTION
+ WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+------
+This product has a bundle fastjson, which is available under the ASL2 License.
+The source code of fastjson can be found at https://github.com/alibaba/fastjson.
+
+ Copyright 1999-2016 Alibaba Group Holding Ltd.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+------
+This product has a bundle commons-codec, which is available under the ASL2 License.
+The source code of commons-codec can be found at http://svn.apache.org/viewvc/commons/proper/codec/trunk/.
+
+Apache Commons Codec
+Copyright 2002-2016 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+src/test/org/apache/commons/codec/language/DoubleMetaphoneTest.java
+contains test data from http://aspell.net/test/orig/batch0.tab.
+Copyright (C) 2002 Kevin Atkinson (kevina@gnu.org)

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/5593575c/rocketmq-mysql/NOTICE-BIN
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/NOTICE-BIN b/rocketmq-mysql/NOTICE-BIN
new file mode 100644
index 0000000..5384857
--- /dev/null
+++ b/rocketmq-mysql/NOTICE-BIN
@@ -0,0 +1,5 @@
+Apache RocketMQ (incubating)
+Copyright 2016-2017 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/5593575c/rocketmq-mysql/README.md
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/README.md b/rocketmq-mysql/README.md
index 8b47a0e..cd3b54c 100644
--- a/rocketmq-mysql/README.md
+++ b/rocketmq-mysql/README.md
@@ -4,6 +4,45 @@
 ## Overview
 ![overview](./doc/overview.png)
 
+The RocketMQ-MySQL is a data replicator between MySQL and other systems, 
+the replicator simulates a MySQL slave instance and parses the binlog event 
+and sends it to the RocketMQ in json format ,other systems can consume data from RocketMQ. 
+With the RocketMQ-MySQL Replicator,more systems can process data from MySQL binlog 
+in a simple and low cost method.
+
+## Dataflow
+![dataflow](./doc/dataflow.png)
+
+* 1.Firstly,get the last data from the queue,and get the binlog position from this data,and if the queue data is null,then use the latest binlog position of the MySQL,and surely user can also specify this position on his own;
+* 2.Send a binlog dump request to the MySQL;
+* 3.The MySQL push binlog event to the replicator,the replicator parses the data and accumulate as a transaction-object;
+* 4.Add the next-position of the transaction to the transaction-object and send it in json format to the queue;
+* 5.Record the binlog position and the offset in the queue of the latest transaction every second.
+
+
+## Quick Start
+
+* 1.Create an account with MySQL replication permission,which is used to simulate the MySQL slave to get the binlog event,and the replication must be in row mode;
+* 2.Create a topic in the RocketMQ to store binlog events,in order to ensure that the downstream system consumes the data orderly,the topic must have only one queue;
+* 3.Configure relevant information of MySQL and RocketMQ in the RocketMQ-MySQL.conf file;
+* 4.Execute"mvn install",and then start the replicator(execute "nohup ./start.sh &");
+* 5.Subscribe to and process the messages in your system.
+
+
+## Configuration Instruction
+|key               |nullable|default    |description|
+|------------------|--------|-----------|-----------|
+|mysqlAddr         |false   |           |MySQL address|
+|mysqlPort         |false   |           |MySQL port|
+|mysqlUsername     |false   |           |username of MySQL account|
+|mysqlPassword     |false   |           |password of MySQL account|
+|mqNamesrvAddr     |false   |           |RocketMQ name server address (e.g.,127.0.0.1:9876)|
+|mqTopic           |false   |           |RocketMQ topic name|
+|startType         |true    |NEW_EVENT  |The way that the replicator starts processing data,there are three options available:<br>- NEW_EVENT: starts processing data from the tail of binlog<br>- LAST_PROCESSED: starts processing data from the last processed event<br>- SPECIFIED:starts processing data from the position that user specified,if you choose this option,the binlogFilename and nextPosition must not be null|
+|binlogFilename    |true    |           |If "startType" is "SPECIFIED",the replicator will begin to replicate from this binlog file|
+|nextPosition      |true    |           |If "startType" is "SPECIFIED",the replicator will begin to replicate from this position|
+|maxTransactionRows|true    |100        |max rows of the transaction pushed to RocketMQ|
+=======
 The RocketMQ-MySQL is a data replicator between MySQL and other system,the replicator parse the binglog event and send it in json format to the RocketMQ,and other system can pull data from RocketMQ.
 ## Dataflow
 ![dataflow](./doc/dataflow.png)
@@ -12,4 +51,4 @@ The RocketMQ-MySQL is a data replicator between MySQL and other system,the repli
 * 2.Send a binlog dump request to the MySQL;
 * 3.The MySQL push binglog event to the Replicator,the Replicator parse the data and accumulate as a transaction-object;
 * 4.Add the next-position of the transaction to the transaction-object and send it in json format to the queue;
-* 5.Record the binglog position and the offset in the queue of the latest transaction every second.
\ No newline at end of file
+* 5.Record the binglog position and the offset in the queue of the latest transaction every second.

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/5593575c/rocketmq-mysql/pom.xml
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/pom.xml b/rocketmq-mysql/pom.xml
new file mode 100644
index 0000000..8603bdd
--- /dev/null
+++ b/rocketmq-mysql/pom.xml
@@ -0,0 +1,275 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>org.apache</groupId>
+    <artifactId>rocketmq-mysql-replicator</artifactId>
+    <version>1.0.0-SNAPSHOT</version>
+
+    <scm>
+        <url>https://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals.git</url>
+        <connection>scm:git:https://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals.git</connection>
+        <developerConnection>scm:git:https://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals.git
+        </developerConnection>
+        <tag>HEAD</tag>
+    </scm>
+
+    <mailingLists>
+        <mailingList>
+            <name>Development List</name>
+            <subscribe>dev-subscribe@rocketmq.incubator.apache.org</subscribe>
+            <unsubscribe>dev-unsubscribe@rocketmq.incubator.apache.org</unsubscribe>
+            <post>dev@rocketmq.incubator.apache.org</post>
+        </mailingList>
+        <mailingList>
+            <name>User List</name>
+            <subscribe>users-subscribe@rocketmq.incubator.apache.org</subscribe>
+            <unsubscribe>users-unsubscribe@rocketmq.incubator.apache.org</unsubscribe>
+            <post>users@rocketmq.incubator.apache.org</post>
+        </mailingList>
+        <mailingList>
+            <name>Commits List</name>
+            <subscribe>commits-subscribe@rocketmq.incubator.apache.org</subscribe>
+            <unsubscribe>commits-unsubscribe@rocketmq.incubator.apache.org</unsubscribe>
+            <post>commits@rocketmq.incubator.apache.org</post>
+        </mailingList>
+    </mailingLists>
+
+    <developers>
+        <developer>
+            <id>Apache RocketMQ</id>
+            <name>Apache RocketMQ of ASF</name>
+            <url>https://rocketmq.apache.org/</url>
+        </developer>
+    </developers>
+
+    <licenses>
+        <license>
+            <name>Apache License, Version 2.0</name>
+            <url>http://www.apache.org/licenses/LICENSE-2.0</url>
+            <distribution>repo</distribution>
+        </license>
+    </licenses>
+
+    <organization>
+        <name>Apache Software Foundation</name>
+        <url>http://www.apache.org</url>
+    </organization>
+
+    <issueManagement>
+        <system>jira</system>
+        <url>https://issues.apache.org/jira/browse/RocketMQ</url>
+    </issueManagement>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+
+        <maven.test.skip>false</maven.test.skip>
+        <maven.javadoc.skip>true</maven.javadoc.skip>
+        <maven.compiler.source>1.7</maven.compiler.source>
+        <maven.compiler.target>1.7</maven.compiler.target>
+        <rocketmq.version>4.0.0-incubating</rocketmq.version>
+    </properties>
+
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-client</artifactId>
+            <version>${rocketmq.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.zendesk</groupId>
+            <artifactId>open-replicator</artifactId>
+            <version>1.6.0</version>
+        </dependency>
+        <dependency>
+            <groupId>mysql</groupId>
+            <artifactId>mysql-connector-java</artifactId>
+            <version>5.1.39</version>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <version>1.7.5</version>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-classic</artifactId>
+            <version>1.0.13</version>
+        </dependency>
+        <dependency>
+            <groupId>com.alibaba</groupId>
+            <artifactId>druid</artifactId>
+            <version>1.0.29</version>
+        </dependency>
+        <dependency>
+            <groupId>commons-codec</groupId>
+            <artifactId>commons-codec</artifactId>
+            <version>1.9</version>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.11</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <finalName>rocketmq-mysql</finalName>
+        <sourceDirectory>${project.basedir}/src/main/java</sourceDirectory>
+        <outputDirectory>${project.basedir}/target/classes</outputDirectory>
+        <resources>
+            <resource>
+                <directory>${project.basedir}/src/main/resources</directory>
+                <filtering>true</filtering>
+            </resource>
+        </resources>
+        <plugins>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>clirr-maven-plugin</artifactId>
+                <version>2.8</version>
+            </plugin>
+            <plugin>
+                <artifactId>maven-enforcer-plugin</artifactId>
+                <version>1.4.1</version>
+                <executions>
+                    <execution>
+                        <id>enforce-ban-circular-dependencies</id>
+                        <goals>
+                            <goal>enforce</goal>
+                        </goals>
+                    </execution>
+                </executions>
+                <configuration>
+                    <rules>
+                        <banCircularDependencies/>
+                    </rules>
+                    <fail>true</fail>
+                </configuration>
+                <dependencies>
+                    <dependency>
+                        <groupId>org.codehaus.mojo</groupId>
+                        <artifactId>extra-enforcer-rules</artifactId>
+                        <version>1.0-beta-6</version>
+                    </dependency>
+                </dependencies>
+            </plugin>
+            <plugin>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.5.1</version>
+                <configuration>
+                    <source>${maven.compiler.source}</source>
+                    <target>${maven.compiler.target}</target>
+                    <compilerVersion>${maven.compiler.source}</compilerVersion>
+                    <showDeprecation>true</showDeprecation>
+                    <showWarnings>true</showWarnings>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-javadoc-plugin</artifactId>
+                <version>2.10.4</version>
+                <configuration>
+                    <charset>UTF-8</charset>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>attach-javadocs</id>
+                        <goals>
+                            <goal>jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <artifactId>maven-source-plugin</artifactId>
+                <version>3.0.1</version>
+                <executions>
+                    <execution>
+                        <id>attach-sources</id>
+                        <goals>
+                            <goal>jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>findbugs-maven-plugin</artifactId>
+                <version>3.0.4</version>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.rat</groupId>
+                <artifactId>apache-rat-plugin</artifactId>
+                <version>0.12</version>
+                <configuration>
+                    <excludes>
+                        <exclude>README.md</exclude>
+                    </excludes>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-checkstyle-plugin</artifactId>
+                <version>2.17</version>
+                <executions>
+                    <execution>
+                        <id>verify</id>
+                        <phase>verify</phase>
+                        <configuration>
+                            <configLocation>style/rmq_checkstyle.xml</configLocation>
+                            <encoding>UTF-8</encoding>
+                            <consoleOutput>true</consoleOutput>
+                            <failsOnError>true</failsOnError>
+                            <includeTestSourceDirectory>false</includeTestSourceDirectory>
+                            <includeTestResources>false</includeTestResources>
+                        </configuration>
+                        <goals>
+                            <goal>check</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+
+            <plugin>
+                <artifactId>maven-jar-plugin</artifactId>
+                <version>2.4</version>
+                <configuration>
+                    <archive>
+                        <addMavenDescriptor>false</addMavenDescriptor>
+                        <manifest>
+                            <addClasspath>true</addClasspath>
+                            <classpathPrefix>lib/</classpathPrefix>
+                        </manifest>
+                    </archive>
+                    <excludes>
+                        <exclude>rocketmq_mysql.conf</exclude>
+                        <exclude>logback.xml</exclude>
+                    </excludes>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <version>2.4</version>
+                <configuration>
+                    <descriptors>
+                        <descriptor>src/main/assembly/assembly.xml</descriptor>
+                    </descriptors>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>make-assembly</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/5593575c/rocketmq-mysql/src/main/assembly/assembly.xml
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/src/main/assembly/assembly.xml b/rocketmq-mysql/src/main/assembly/assembly.xml
new file mode 100644
index 0000000..b280aa6
--- /dev/null
+++ b/rocketmq-mysql/src/main/assembly/assembly.xml
@@ -0,0 +1,61 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<assembly
+	xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
+	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
+	<id>pack</id>
+	<formats>
+		<format>tar.gz</format>
+		<format>dir</format>
+	</formats>
+	<includeBaseDirectory>false</includeBaseDirectory>
+	<dependencySets>
+		<dependencySet>
+			<useProjectArtifact>true</useProjectArtifact>
+			<outputDirectory>lib</outputDirectory>
+		</dependencySet>
+	</dependencySets>
+	<fileSets>
+		<fileSet>
+			<directory>src/main/assembly/scripts</directory>
+			<outputDirectory>bin</outputDirectory>
+			<fileMode>0755</fileMode>
+		</fileSet>
+		<fileSet>
+			<directory>target/classes</directory>
+			<outputDirectory>conf</outputDirectory>
+			<fileMode>0755</fileMode>
+			<includes>
+				<include>*.conf</include>
+				<include>logback.xml</include>
+			</includes>
+		</fileSet>
+	</fileSets>
+
+	<files>
+		<file>
+			<source>LICENSE-BIN</source>
+			<destName>LICENSE</destName>
+		</file>
+		<file>
+			<source>NOTICE-BIN</source>
+			<destName>NOTICE</destName>
+		</file>
+	</files>
+</assembly>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/5593575c/rocketmq-mysql/src/main/assembly/scripts/start.sh
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/src/main/assembly/scripts/start.sh b/rocketmq-mysql/src/main/assembly/scripts/start.sh
new file mode 100644
index 0000000..e159f36
--- /dev/null
+++ b/rocketmq-mysql/src/main/assembly/scripts/start.sh
@@ -0,0 +1,23 @@
+#!/usr/bin/env bash
+
+binPath=$(cd "$(dirname "$0")"; pwd);
+cd $binPath
+cd ..
+parentPath=`pwd`
+libPath=$parentPath/lib/
+
+
+function exportClassPath(){
+    jarFileList=`find "$libPath" -name *.jar |awk -F'/' '{print $(NF)}' 2>>/dev/null`
+    CLASSPATH=".:$binPath";
+    for jarItem in $jarFileList
+      do
+        CLASSPATH="$CLASSPATH:$libPath$jarItem"
+    done
+    CLASSPATH=$CLASSPATH:./conf
+    export CLASSPATH
+}
+ulimit -n 65535
+exportClassPath
+
+java -server -Xms512m -Xmx512m -Xss2m -XX:NewRatio=2  -XX:+UseGCOverheadLimit -XX:-UseParallelGC -XX:ParallelGCThreads=24 org.apache.rocketmq.mysql.Replicator

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/5593575c/rocketmq-mysql/src/main/assembly/scripts/stop.sh
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/src/main/assembly/scripts/stop.sh b/rocketmq-mysql/src/main/assembly/scripts/stop.sh
new file mode 100755
index 0000000..f0e3c0d
--- /dev/null
+++ b/rocketmq-mysql/src/main/assembly/scripts/stop.sh
@@ -0,0 +1,18 @@
+#!/bin/bash
+
+PROGRAM_NAME="org.apache.rocketmq.mysql.Replicator"
+PIDS=`ps -ef | grep $PROGRAM_NAME | grep -v "grep" | awk '{print $2}'`
+
+if [ -z $PIDS ]; then
+    echo "No this process."
+else
+    echo "Find process is $PIDS."
+fi
+
+#####kill####
+echo -e "Stopping the $PROGRAM_NAME...\c"
+for PID in $PIDS ; do
+    kill  $PID
+done
+
+echo "SUCCESS!"

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/5593575c/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/Config.java
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/Config.java b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/Config.java
new file mode 100644
index 0000000..0705946
--- /dev/null
+++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/Config.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mysql;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.reflect.Method;
+import java.util.Properties;
+
+
+public class Config {
+
+    public String mysqlAddr;
+    public Integer mysqlPort;
+    public String mysqlUsername;
+    public String mysqlPassword;
+
+    public String mqNamesrvAddr;
+    public String mqTopic;
+
+    public String startType = "NEW_EVENT";
+    public String binlogFilename;
+    public Long nextPosition;
+    public Integer maxTransactionRows = 100;
+
+    public void load() throws IOException {
+
+        InputStream in = Config.class.getClassLoader().getResourceAsStream("rocketmq_mysql.conf");
+        Properties properties = new Properties();
+        properties.load(in);
+
+        properties2Object(properties, this);
+
+    }
+
+    private void properties2Object(final Properties p, final Object object) {
+        Method[] methods = object.getClass().getMethods();
+        for (Method method : methods) {
+            String mn = method.getName();
+            if (mn.startsWith("set")) {
+                try {
+                    String tmp = mn.substring(4);
+                    String first = mn.substring(3, 4);
+
+                    String key = first.toLowerCase() + tmp;
+                    String property = p.getProperty(key);
+                    if (property != null) {
+                        Class<?>[] pt = method.getParameterTypes();
+                        if (pt != null && pt.length > 0) {
+                            String cn = pt[0].getSimpleName();
+                            Object arg = null;
+                            if (cn.equals("int") || cn.equals("Integer")) {
+                                arg = Integer.parseInt(property);
+                            } else if (cn.equals("long") || cn.equals("Long")) {
+                                arg = Long.parseLong(property);
+                            } else if (cn.equals("double") || cn.equals("Double")) {
+                                arg = Double.parseDouble(property);
+                            } else if (cn.equals("boolean") || cn.equals("Boolean")) {
+                                arg = Boolean.parseBoolean(property);
+                            } else if (cn.equals("float") || cn.equals("Float")) {
+                                arg = Float.parseFloat(property);
+                            } else if (cn.equals("String")) {
+                                arg = property;
+                            } else {
+                                continue;
+                            }
+                            method.invoke(object, arg);
+                        }
+                    }
+                } catch (Throwable ignored) {
+                }
+            }
+        }
+    }
+
+    public void setMysqlAddr(String mysqlAddr) {
+        this.mysqlAddr = mysqlAddr;
+    }
+
+    public void setMysqlPort(Integer mysqlPort) {
+        this.mysqlPort = mysqlPort;
+    }
+
+    public void setMysqlUsername(String mysqlUsername) {
+        this.mysqlUsername = mysqlUsername;
+    }
+
+    public void setMysqlPassword(String mysqlPassword) {
+        this.mysqlPassword = mysqlPassword;
+    }
+
+    public void setBinlogFilename(String binlogFilename) {
+        this.binlogFilename = binlogFilename;
+    }
+
+    public void setNextPosition(Long nextPosition) {
+        this.nextPosition = nextPosition;
+    }
+
+    public void setMaxTransactionRows(Integer maxTransactionRows) {
+        this.maxTransactionRows = maxTransactionRows;
+    }
+
+    public void setMqNamesrvAddr(String mqNamesrvAddr) {
+        this.mqNamesrvAddr = mqNamesrvAddr;
+    }
+
+    public void setMqTopic(String mqTopic) {
+        this.mqTopic = mqTopic;
+    }
+
+    public void setStartType(String startType) {
+        this.startType = startType;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/5593575c/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/Replicator.java
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/Replicator.java b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/Replicator.java
new file mode 100644
index 0000000..b358567
--- /dev/null
+++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/Replicator.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mysql;
+
+import org.apache.rocketmq.mysql.binlog.EventProcessor;
+import org.apache.rocketmq.mysql.binlog.Transaction;
+import org.apache.rocketmq.mysql.offset.OffsetLogThread;
+import org.apache.rocketmq.mysql.productor.RocketMQProducer;
+import org.apache.rocketmq.mysql.position.BinlogPosition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class Replicator {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(Replicator.class);
+
+    private static final Logger OFFSET_LOGGER = LoggerFactory.getLogger("OffsetLogger");
+
+    private Config config;
+
+    private EventProcessor eventProcessor;
+
+    private RocketMQProducer rocketMQProducer;
+
+    private Object lock = new Object();
+    private BinlogPosition nextBinlogPosition;
+    private long nextQueueOffset;
+    private long xid;
+
+    public static void main(String[] args) {
+
+        Replicator replicator = new Replicator();
+        replicator.start();
+    }
+
+    public void start() {
+
+        try {
+            config = new Config();
+            config.load();
+
+            rocketMQProducer = new RocketMQProducer(config);
+            rocketMQProducer.start();
+
+            OffsetLogThread offsetLogThread = new OffsetLogThread(this);
+            offsetLogThread.start();
+
+            eventProcessor = new EventProcessor(this);
+            eventProcessor.start();
+
+        } catch (Exception e) {
+            LOGGER.error("Start error.", e);
+            System.exit(1);
+        }
+    }
+
+    public void commit(Transaction transaction, boolean isComplete) {
+
+        String json = transaction.toJson();
+
+        for (int i = 0; i < 3; i++) {
+            try {
+                if (isComplete) {
+                    long offset = rocketMQProducer.push(json);
+
+                    synchronized (lock) {
+                        xid = transaction.getXid();
+                        nextBinlogPosition = transaction.getNextBinlogPosition();
+                        nextQueueOffset = offset;
+                    }
+
+                } else {
+                    rocketMQProducer.push(json);
+                }
+                break;
+
+            } catch (Exception e) {
+                LOGGER.error("Push error,retry:" + (i + 1) + ",", e);
+            }
+        }
+    }
+
+    public void logOffset() {
+
+        String binlogFilename = null;
+        long xid = 0L;
+        long nextPosition = 0L;
+        long nextOffset = 0L;
+
+        synchronized (lock) {
+            if (nextBinlogPosition != null) {
+                xid = this.xid;
+                binlogFilename = nextBinlogPosition.getBinlogFilename();
+                nextPosition = nextBinlogPosition.getPosition();
+                nextOffset = nextQueueOffset;
+            }
+        }
+
+        if (binlogFilename != null) {
+            OFFSET_LOGGER.info("XID: {},   BINLOG_FILE: {},   NEXT_POSITION: {},   NEXT_OFFSET: {}",
+                xid, binlogFilename, nextPosition, nextOffset);
+        }
+
+    }
+
+    public Config getConfig() {
+        return config;
+    }
+
+    public BinlogPosition getNextBinlogPosition() {
+        return nextBinlogPosition;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/5593575c/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/DataRow.java
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/DataRow.java b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/DataRow.java
new file mode 100644
index 0000000..3d9789f
--- /dev/null
+++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/DataRow.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mysql.binlog;
+
+import com.google.code.or.common.glossary.Column;
+import com.google.code.or.common.glossary.Row;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.rocketmq.mysql.schema.Table;
+import org.apache.rocketmq.mysql.schema.column.ColumnParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DataRow {
+
+    private Logger logger = LoggerFactory.getLogger(DataRow.class);
+
+    private String type;
+    private Table table;
+    private Row row;
+
+    public DataRow(String type, Table table, Row row) {
+        this.type = type;
+        this.table = table;
+        this.row = row;
+    }
+
+    public Map toMap() {
+
+        try {
+            if (table.getColList().size() == row.getColumns().size()) {
+                Map<String, Object> dataMap = new HashMap<>();
+                List<String> keyList = table.getColList();
+                List<ColumnParser> parserList = table.getParserList();
+                List<Column> valueList = row.getColumns();
+
+                for (int i = 0; i < keyList.size(); i++) {
+                    Object value = valueList.get(i).getValue();
+                    ColumnParser parser = parserList.get(i);
+                    dataMap.put(keyList.get(i), parser.getValue(value));
+                }
+
+                Map<String, Object> map = new HashMap<>();
+                map.put("database", table.getDatabase());
+                map.put("table", table.getName());
+                map.put("type", type);
+                map.put("data", dataMap);
+
+                return map;
+            } else {
+                logger.error("Table schema changed,discard data: {} - {}, {}  {}",
+                    table.getDatabase().toUpperCase(), table.getName().toUpperCase(), type, row.toString());
+
+                return null;
+            }
+        } catch (Exception e) {
+            logger.error("Row parse error,discard data: {} - {}, {}  {}",
+                table.getDatabase().toUpperCase(), table.getName().toUpperCase(), type, row.toString());
+        }
+
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/5593575c/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/EventListener.java
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/EventListener.java b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/EventListener.java
new file mode 100644
index 0000000..ecc632e
--- /dev/null
+++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/EventListener.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mysql.binlog;
+
+import com.google.code.or.binlog.BinlogEventListener;
+import com.google.code.or.binlog.BinlogEventV4;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+public class EventListener implements BinlogEventListener {
+
+    private BlockingQueue<BinlogEventV4> queue;
+
+    public EventListener(BlockingQueue<BinlogEventV4> queue) {
+        this.queue = queue;
+    }
+
+    @Override
+    public void onEvents(BinlogEventV4 event) {
+
+        try {
+            while (true) {
+                if (queue.offer(event, 100, TimeUnit.MILLISECONDS)) {
+                    return;
+                }
+            }
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/5593575c/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/EventProcessor.java
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/EventProcessor.java b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/EventProcessor.java
new file mode 100644
index 0000000..f937b6d
--- /dev/null
+++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/EventProcessor.java
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mysql.binlog;
+
+import com.alibaba.druid.pool.DruidDataSourceFactory;
+import com.google.code.or.OpenReplicator;
+import com.google.code.or.binlog.BinlogEventV4;
+import com.google.code.or.binlog.impl.event.DeleteRowsEvent;
+import com.google.code.or.binlog.impl.event.DeleteRowsEventV2;
+import com.google.code.or.binlog.impl.event.QueryEvent;
+import com.google.code.or.binlog.impl.event.TableMapEvent;
+import com.google.code.or.binlog.impl.event.UpdateRowsEvent;
+import com.google.code.or.binlog.impl.event.UpdateRowsEventV2;
+import com.google.code.or.binlog.impl.event.WriteRowsEvent;
+import com.google.code.or.binlog.impl.event.WriteRowsEventV2;
+import com.google.code.or.binlog.impl.event.XidEvent;
+import com.google.code.or.common.glossary.Pair;
+import com.google.code.or.common.glossary.Row;
+import com.google.code.or.common.util.MySQLConstants;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.regex.Pattern;
+import javax.sql.DataSource;
+import org.apache.rocketmq.mysql.Config;
+import org.apache.rocketmq.mysql.Replicator;
+import org.apache.rocketmq.mysql.position.BinlogPosition;
+import org.apache.rocketmq.mysql.position.BinlogPositionManager;
+import org.apache.rocketmq.mysql.schema.Schema;
+import org.apache.rocketmq.mysql.schema.Table;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class EventProcessor {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(EventProcessor.class);
+
+    private Replicator replicator;
+    private Config config;
+
+    private DataSource dataSource;
+
+    private BinlogPositionManager binlogPositionManager;
+
+    private BlockingQueue<BinlogEventV4> queue = new LinkedBlockingQueue<>(100);
+
+    private OpenReplicator openReplicator;
+
+    private EventListener eventListener;
+
+    private Schema schema;
+
+    private Map<Long, Table> tableMap = new HashMap<>();
+
+    private Transaction transaction;
+
+    public EventProcessor(Replicator replicator) {
+
+        this.replicator = replicator;
+        this.config = replicator.getConfig();
+    }
+
+    public void start() throws Exception {
+
+        initDataSource();
+
+        binlogPositionManager = new BinlogPositionManager(config, dataSource);
+        binlogPositionManager.initBeginPosition();
+
+        schema = new Schema(dataSource);
+        schema.load();
+
+        eventListener = new EventListener(queue);
+        openReplicator = new OpenReplicator();
+        openReplicator.setBinlogEventListener(eventListener);
+        openReplicator.setHost(config.mysqlAddr);
+        openReplicator.setPort(config.mysqlPort);
+        openReplicator.setUser(config.mysqlUsername);
+        openReplicator.setPassword(config.mysqlPassword);
+        openReplicator.setStopOnEOF(false);
+        openReplicator.setHeartbeatPeriod(1f);
+        openReplicator.setLevel2BufferSize(50 * 1024 * 1024);
+        openReplicator.setServerId(1001);
+        openReplicator.setBinlogFileName(binlogPositionManager.getBinlogFilename());
+        openReplicator.setBinlogPosition(binlogPositionManager.getPosition());
+        openReplicator.start();
+
+        LOGGER.info("Started.");
+
+        doProcess();
+    }
+
+    private void doProcess() {
+
+        while (true) {
+
+            try {
+                BinlogEventV4 event = queue.take();
+
+                switch (event.getHeader().getEventType()) {
+                    case MySQLConstants.TABLE_MAP_EVENT:
+                        processTableMapEvent(event);
+                        break;
+
+                    case MySQLConstants.WRITE_ROWS_EVENT:
+                        processWriteEvent(event);
+                        break;
+
+                    case MySQLConstants.WRITE_ROWS_EVENT_V2:
+                        processWriteEventV2(event);
+                        break;
+
+                    case MySQLConstants.UPDATE_ROWS_EVENT:
+                        processUpdateEvent(event);
+                        break;
+
+                    case MySQLConstants.UPDATE_ROWS_EVENT_V2:
+                        processUpdateEventV2(event);
+                        break;
+
+                    case MySQLConstants.DELETE_ROWS_EVENT:
+                        processDeleteEvent(event);
+                        break;
+
+                    case MySQLConstants.DELETE_ROWS_EVENT_V2:
+                        processDeleteEventV2(event);
+                        break;
+
+                    case MySQLConstants.QUERY_EVENT:
+                        processQueryEvent(event);
+                        break;
+
+                    case MySQLConstants.XID_EVENT:
+                        processXidEvent(event);
+                        break;
+
+                }
+            } catch (Exception e) {
+                LOGGER.error("Binlog process error.", e);
+            }
+
+        }
+    }
+
+    private void processTableMapEvent(BinlogEventV4 event) {
+
+        TableMapEvent tableMapEvent = (TableMapEvent) event;
+        String dbName = tableMapEvent.getDatabaseName().toString();
+        String tableName = tableMapEvent.getTableName().toString();
+        Long tableId = tableMapEvent.getTableId();
+
+        Table table = schema.getTable(dbName, tableName);
+
+        tableMap.put(tableId, table);
+    }
+
+    private void processWriteEvent(BinlogEventV4 event) {
+
+        WriteRowsEvent writeRowsEvent = (WriteRowsEvent) event;
+        Long tableId = writeRowsEvent.getTableId();
+        List<Row> list = writeRowsEvent.getRows();
+
+        for (Row row : list) {
+            addRow("WRITE", tableId, row);
+        }
+    }
+
+    private void processWriteEventV2(BinlogEventV4 event) {
+
+        WriteRowsEventV2 writeRowsEventV2 = (WriteRowsEventV2) event;
+        Long tableId = writeRowsEventV2.getTableId();
+        List<Row> list = writeRowsEventV2.getRows();
+
+        for (Row row : list) {
+            addRow("WRITE", tableId, row);
+        }
+
+    }
+
+    private void processUpdateEvent(BinlogEventV4 event) {
+
+        UpdateRowsEvent updateRowsEvent = (UpdateRowsEvent) event;
+        Long tableId = updateRowsEvent.getTableId();
+        List<Pair<Row>> list = updateRowsEvent.getRows();
+
+        for (Pair<Row> pair : list) {
+            addRow("UPDATE", tableId, pair.getAfter());
+        }
+    }
+
+    private void processUpdateEventV2(BinlogEventV4 event) {
+
+        UpdateRowsEventV2 updateRowsEventV2 = (UpdateRowsEventV2) event;
+        Long tableId = updateRowsEventV2.getTableId();
+        List<Pair<Row>> list = updateRowsEventV2.getRows();
+
+        for (Pair<Row> pair : list) {
+            addRow("UPDATE", tableId, pair.getAfter());
+        }
+    }
+
+    private void processDeleteEvent(BinlogEventV4 event) {
+
+        DeleteRowsEvent deleteRowsEvent = (DeleteRowsEvent) event;
+        Long tableId = deleteRowsEvent.getTableId();
+        List<Row> list = deleteRowsEvent.getRows();
+
+        for (Row row : list) {
+            addRow("DELETE", tableId, row);
+        }
+
+    }
+
+    private void processDeleteEventV2(BinlogEventV4 event) {
+
+        DeleteRowsEventV2 deleteRowsEventV2 = (DeleteRowsEventV2) event;
+        Long tableId = deleteRowsEventV2.getTableId();
+        List<Row> list = deleteRowsEventV2.getRows();
+
+        for (Row row : list) {
+            addRow("DELETE", tableId, row);
+        }
+
+    }
+
+    private static Pattern createTablePattern =
+        Pattern.compile("^(CREATE|ALTER)\\s+TABLE", Pattern.CASE_INSENSITIVE);
+
+    private void processQueryEvent(BinlogEventV4 event) {
+
+        QueryEvent queryEvent = (QueryEvent) event;
+        String sql = queryEvent.getSql().toString();
+
+        if (createTablePattern.matcher(sql).find()) {
+            schema.reset();
+        }
+    }
+
+    private void processXidEvent(BinlogEventV4 event) {
+
+        XidEvent xidEvent = (XidEvent) event;
+        String binlogFilename = xidEvent.getBinlogFilename();
+        Long position = xidEvent.getHeader().getNextPosition();
+        Long xid = xidEvent.getXid();
+
+        BinlogPosition binlogPosition = new BinlogPosition(binlogFilename, position);
+        transaction.setNextBinlogPosition(binlogPosition);
+        transaction.setXid(xid);
+
+        replicator.commit(transaction, true);
+
+        transaction = new Transaction(this);
+
+    }
+
+    private void addRow(String type, Long tableId, Row row) {
+
+        if (transaction == null) {
+            transaction = new Transaction(this);
+        }
+
+        Table t = tableMap.get(tableId);
+        if (t != null) {
+
+            while (true) {
+                if (transaction.addRow(type, t, row)) {
+                    break;
+
+                } else {
+                    transaction.setNextBinlogPosition(replicator.getNextBinlogPosition());
+                    replicator.commit(transaction, false);
+                    transaction = new Transaction(this);
+                }
+            }
+
+        }
+    }
+
+    private void initDataSource() throws Exception {
+
+        Map map = new HashMap();
+        map.put("driverClassName", "com.mysql.jdbc.Driver");
+        map.put("url", "jdbc:mysql://" + config.mysqlAddr + ":" + config.mysqlPort + "?useSSL=true&verifyServerCertificate=false");
+        map.put("username", config.mysqlUsername);
+        map.put("password", config.mysqlPassword);
+        map.put("initialSize", "2");
+        map.put("maxActive", "2");
+        map.put("maxWait", "60000");
+        map.put("timeBetweenEvictionRunsMillis", "60000");
+        map.put("minEvictableIdleTimeMillis", "300000");
+        map.put("validationQuery", "SELECT 1 FROM DUAL");
+        map.put("testWhileIdle", "true");
+
+        dataSource = DruidDataSourceFactory.createDataSource(map);
+    }
+
+    public Config getConfig() {
+        return config;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/5593575c/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/Transaction.java
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/Transaction.java b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/Transaction.java
new file mode 100644
index 0000000..16aa06f
--- /dev/null
+++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/Transaction.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mysql.binlog;
+
+import com.alibaba.fastjson.JSONObject;
+import com.google.code.or.common.glossary.Row;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import org.apache.rocketmq.mysql.Config;
+import org.apache.rocketmq.mysql.position.BinlogPosition;
+import org.apache.rocketmq.mysql.schema.Table;
+
+public class Transaction {
+
+    private BinlogPosition nextBinlogPosition;
+    private Long xid;
+
+    private EventProcessor eventProcessor;
+    private Config config;
+
+    private List<DataRow> list = new LinkedList<>();
+
+    public Transaction(EventProcessor eventProcessor) {
+        this.eventProcessor = eventProcessor;
+        this.config = eventProcessor.getConfig();
+    }
+
+    public boolean addRow(String type, Table table, Row row) {
+
+        if (list.size() == config.maxTransactionRows) {
+            return false;
+        } else {
+            DataRow dataRow = new DataRow(type, table, row);
+            list.add(dataRow);
+            return true;
+        }
+
+    }
+
+    public String toJson() {
+
+        List<Map> rows = new LinkedList<>();
+        for (DataRow dataRow : list) {
+            Map rowMap = dataRow.toMap();
+            if (rowMap != null) {
+                rows.add(rowMap);
+            }
+        }
+
+        Map<String, Object> map = new HashMap<>();
+        map.put("xid", xid);
+        map.put("binlogFilename", nextBinlogPosition.getBinlogFilename());
+        map.put("nextPosition", nextBinlogPosition.getPosition());
+        map.put("rows", rows);
+
+        return JSONObject.toJSONString(map);
+    }
+
+    public BinlogPosition getNextBinlogPosition() {
+        return nextBinlogPosition;
+    }
+
+    public void setNextBinlogPosition(BinlogPosition nextBinlogPosition) {
+        this.nextBinlogPosition = nextBinlogPosition;
+    }
+
+    public void setXid(Long xid) {
+        this.xid = xid;
+    }
+
+    public Long getXid() {
+        return xid;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/5593575c/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/offset/OffsetLogThread.java
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/offset/OffsetLogThread.java b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/offset/OffsetLogThread.java
new file mode 100644
index 0000000..40468af
--- /dev/null
+++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/offset/OffsetLogThread.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mysql.offset;
+
+import org.apache.rocketmq.mysql.Replicator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class OffsetLogThread extends Thread {
+
+    private Logger logger = LoggerFactory.getLogger(OffsetLogThread.class);
+
+    private Replicator replicator;
+
+    public OffsetLogThread(Replicator replicator) {
+        this.replicator = replicator;
+        setDaemon(true);
+    }
+
+    @Override
+    public void run() {
+
+        while (true) {
+
+            try {
+                Thread.sleep(1000);
+            } catch (InterruptedException e) {
+                logger.error("Offset thread interrupted.", e);
+            }
+
+            replicator.logOffset();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/5593575c/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPosition.java
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPosition.java b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPosition.java
new file mode 100644
index 0000000..5ba436c
--- /dev/null
+++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPosition.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mysql.position;
+
+public class BinlogPosition {
+
+    private String binlogFilename;
+    private Long position;
+
+    public BinlogPosition(String binlogFilename, Long position) {
+        this.binlogFilename = binlogFilename;
+        this.position = position;
+    }
+
+    public String getBinlogFilename() {
+        return binlogFilename;
+    }
+
+    public void setBinlogFilename(String binlogFilename) {
+        this.binlogFilename = binlogFilename;
+    }
+
+    public Long getPosition() {
+        return position;
+    }
+
+    public void setPosition(Long position) {
+        this.position = position;
+    }
+}
+
+

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/5593575c/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPositionManager.java
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPositionManager.java b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPositionManager.java
new file mode 100644
index 0000000..67e8d9e
--- /dev/null
+++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPositionManager.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mysql.position;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Set;
+import javax.sql.DataSource;
+import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.client.consumer.PullResult;
+import org.apache.rocketmq.client.consumer.PullStatus;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.mysql.Config;
+
+public class BinlogPositionManager {
+
+    private DataSource dataSource;
+    private Config config;
+
+    private String binlogFilename;
+    private Long nextPosition;
+
+    public BinlogPositionManager(Config config, DataSource dataSource) {
+        this.config = config;
+        this.dataSource = dataSource;
+    }
+
+    public void initBeginPosition() throws Exception {
+
+        if (config.startType == null || config.startType.equals("NEW_EVENT")) {
+
+            initPositionFromBinlogTail();
+        } else if (config.startType.equals("LAST_PROCESSED")) {
+
+            initPositionFromMqTail();
+        } else if (config.startType.equals("SPECIFIED")) {
+
+            binlogFilename = config.binlogFilename;
+            nextPosition = config.nextPosition;
+        }
+
+        if (binlogFilename == null || nextPosition == null) {
+            throw new Exception("binlogFilename | nextPosition is null.");
+        }
+    }
+
+    private void initPositionFromMqTail() throws Exception {
+
+        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("BINLOG_CONSUMER_GROUP");
+        consumer.setNamesrvAddr(config.mqNamesrvAddr);
+        consumer.setMessageModel(MessageModel.valueOf("BROADCASTING"));
+        consumer.start();
+
+        Set<MessageQueue> queues = consumer.fetchSubscribeMessageQueues(config.mqTopic);
+        MessageQueue queue = queues.iterator().next();
+
+        if (queue != null) {
+            Long offset = consumer.maxOffset(queue);
+            if (offset > 0)
+                offset--;
+
+            PullResult pullResult = consumer.pull(queue, "*", offset, 100);
+
+            if (pullResult.getPullStatus() == PullStatus.FOUND) {
+                MessageExt msg = pullResult.getMsgFoundList().get(0);
+                String json = new String(msg.getBody(), "UTF-8");
+
+                JSONObject js = JSON.parseObject(json);
+                binlogFilename = (String) js.get("binlogFilename");
+                nextPosition = js.getLong("nextPosition");
+            }
+        }
+
+    }
+
+    private void initPositionFromBinlogTail() throws SQLException {
+
+        String sql = "SHOW MASTER STATUS";
+
+        Connection conn = null;
+        ResultSet rs = null;
+
+        try {
+            Connection connection = dataSource.getConnection();
+            rs = connection.createStatement().executeQuery(sql);
+
+            while (rs.next()) {
+                binlogFilename = rs.getString("File");
+                nextPosition = rs.getLong("Position");
+            }
+
+        } finally {
+
+            if (conn != null) {
+                conn.close();
+            }
+            if (rs != null) {
+                rs.close();
+            }
+        }
+
+    }
+
+    public String getBinlogFilename() {
+        return binlogFilename;
+    }
+
+    public Long getPosition() {
+        return nextPosition;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/5593575c/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/productor/RocketMQProducer.java
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/productor/RocketMQProducer.java b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/productor/RocketMQProducer.java
new file mode 100644
index 0000000..fb4eb11
--- /dev/null
+++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/productor/RocketMQProducer.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mysql.productor;
+
+import java.io.UnsupportedEncodingException;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.mysql.Config;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RocketMQProducer {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(RocketMQProducer.class);
+
+    private DefaultMQProducer producer;
+    private Config config;
+
+    public RocketMQProducer(Config config) {
+        this.config = config;
+    }
+
+    public void start() throws MQClientException {
+
+        producer = new DefaultMQProducer("BINLOG_PRODUCER_GROUP");
+        producer.setNamesrvAddr(config.mqNamesrvAddr);
+        producer.start();
+    }
+
+    public long push(
+        String json) throws UnsupportedEncodingException, InterruptedException, RemotingException, MQClientException, MQBrokerException {
+
+        LOGGER.debug(json);
+
+        Message message = new Message(config.mqTopic, json.getBytes("UTF-8"));
+        SendResult sendResult = producer.send(message);
+
+        return sendResult.getQueueOffset();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/5593575c/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/Database.java
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/Database.java b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/Database.java
new file mode 100644
index 0000000..604cd7f
--- /dev/null
+++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/Database.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mysql.schema;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
+import javax.sql.DataSource;
+import org.apache.rocketmq.mysql.binlog.EventProcessor;
+import org.apache.rocketmq.mysql.schema.column.ColumnParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class Database {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(EventProcessor.class);
+
+    private String name;
+
+    private DataSource dataSource;
+
+    private Map<String, Table> tableMap = new HashMap<String, Table>();
+
+    public Database(String name, DataSource dataSource) {
+        this.name = name;
+        this.dataSource = dataSource;
+    }
+
+    public void init() throws SQLException {
+
+        String sql = "select table_name,column_name,data_type,column_type,character_set_name " +
+            "from information_schema.columns " +
+            "where table_schema = ?";
+
+        Connection conn = null;
+        PreparedStatement ps = null;
+        ResultSet rs = null;
+
+        try {
+            conn = dataSource.getConnection();
+
+            ps = conn.prepareStatement(sql);
+            ps.setString(1, name);
+            rs = ps.executeQuery();
+
+            while (rs.next()) {
+                String tableName = rs.getString(1);
+                String colName = rs.getString(2);
+                String dataType = rs.getString(3);
+                String colType = rs.getString(4);
+                String charset = rs.getString(5);
+
+                ColumnParser columnParser = ColumnParser.getColumnParser(dataType, colType, charset);
+
+                if (!tableMap.containsKey(tableName)) {
+                    addTable(tableName);
+                }
+                Table table = tableMap.get(tableName);
+                table.addCol(colName);
+                table.addParser(columnParser);
+            }
+
+        } finally {
+
+            if (conn != null) {
+                conn.close();
+            }
+            if (ps != null) {
+                ps.close();
+            }
+            if (rs != null) {
+                rs.close();
+            }
+        }
+
+    }
+
+    private void addTable(String tableName) {
+
+        LOGGER.info("Schema load -- DATABASE:{},\tTABLE:{}", name, tableName);
+
+        Table table = new Table(name, tableName);
+        tableMap.put(tableName, table);
+    }
+
+    public Table getTable(String tableName) {
+
+        return tableMap.get(tableName);
+    }
+}