You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2017/08/22 14:11:04 UTC

[1/2] incubator-rocketmq-externals git commit: Release rocketmq-mysql 1.1.0 version

Repository: incubator-rocketmq-externals
Updated Branches:
  refs/heads/release-rocketmq-mysql-1.1.0 [created] a0aeee629


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPositionManager.java
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPositionManager.java b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPositionManager.java
new file mode 100644
index 0000000..fd6555c
--- /dev/null
+++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPositionManager.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mysql.position;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Set;
+import javax.sql.DataSource;
+import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.client.consumer.PullResult;
+import org.apache.rocketmq.client.consumer.PullStatus;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.mysql.Config;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BinlogPositionManager {
+    private Logger logger = LoggerFactory.getLogger(BinlogPositionManager.class);
+
+    private DataSource dataSource;
+    private Config config;
+
+    private String binlogFilename;
+    private Long nextPosition;
+
+    public BinlogPositionManager(Config config, DataSource dataSource) {
+        this.config = config;
+        this.dataSource = dataSource;
+    }
+
+    public void initBeginPosition() throws Exception {
+
+        if (config.startType == null || config.startType.equals("DEFAULT")) {
+            initPositionDefault();
+
+        } else if (config.startType.equals("NEW_EVENT")) {
+            initPositionFromBinlogTail();
+
+        } else if (config.startType.equals("LAST_PROCESSED")) {
+            initPositionFromMqTail();
+
+        } else if (config.startType.equals("SPECIFIED")) {
+            binlogFilename = config.binlogFilename;
+            nextPosition = config.nextPosition;
+
+        }
+
+        if (binlogFilename == null || nextPosition == null) {
+            throw new Exception("binlogFilename | nextPosition is null.");
+        }
+    }
+
+    private void initPositionDefault() throws Exception {
+
+        try {
+            initPositionFromMqTail();
+        } catch (Exception e) {
+            logger.error("Init position from mq error.", e);
+        }
+
+        if (binlogFilename == null || nextPosition == null) {
+            initPositionFromBinlogTail();
+        }
+
+    }
+
+    private void initPositionFromMqTail() throws Exception {
+        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("BINLOG_CONSUMER_GROUP");
+        consumer.setNamesrvAddr(config.mqNamesrvAddr);
+        consumer.setMessageModel(MessageModel.valueOf("BROADCASTING"));
+        consumer.start();
+
+        Set<MessageQueue> queues = consumer.fetchSubscribeMessageQueues(config.mqTopic);
+        MessageQueue queue = queues.iterator().next();
+
+        if (queue != null) {
+            Long offset = consumer.maxOffset(queue);
+            if (offset > 0)
+                offset--;
+
+            PullResult pullResult = consumer.pull(queue, "*", offset, 100);
+
+            if (pullResult.getPullStatus() == PullStatus.FOUND) {
+                MessageExt msg = pullResult.getMsgFoundList().get(0);
+                String json = new String(msg.getBody(), "UTF-8");
+
+                JSONObject js = JSON.parseObject(json);
+                binlogFilename = (String) js.get("binlogFilename");
+                nextPosition = js.getLong("nextPosition");
+            }
+        }
+
+    }
+
+    private void initPositionFromBinlogTail() throws SQLException {
+        String sql = "SHOW MASTER STATUS";
+
+        Connection conn = null;
+        ResultSet rs = null;
+
+        try {
+            Connection connection = dataSource.getConnection();
+            rs = connection.createStatement().executeQuery(sql);
+
+            while (rs.next()) {
+                binlogFilename = rs.getString("File");
+                nextPosition = rs.getLong("Position");
+            }
+
+        } finally {
+
+            if (conn != null) {
+                conn.close();
+            }
+            if (rs != null) {
+                rs.close();
+            }
+        }
+
+    }
+
+    public String getBinlogFilename() {
+        return binlogFilename;
+    }
+
+    public Long getPosition() {
+        return nextPosition;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/productor/RocketMQProducer.java
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/productor/RocketMQProducer.java b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/productor/RocketMQProducer.java
new file mode 100644
index 0000000..38aca7f
--- /dev/null
+++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/productor/RocketMQProducer.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mysql.productor;
+
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.mysql.Config;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RocketMQProducer {
+    private static final Logger LOGGER = LoggerFactory.getLogger(RocketMQProducer.class);
+
+    private DefaultMQProducer producer;
+    private Config config;
+
+    public RocketMQProducer(Config config) {
+        this.config = config;
+    }
+
+    public void start() throws MQClientException {
+        producer = new DefaultMQProducer("BINLOG_PRODUCER_GROUP");
+        producer.setNamesrvAddr(config.mqNamesrvAddr);
+        producer.start();
+    }
+
+    public long push(String json) throws Exception {
+        LOGGER.debug(json);
+
+        Message message = new Message(config.mqTopic, json.getBytes("UTF-8"));
+        SendResult sendResult = producer.send(message);
+
+        return sendResult.getQueueOffset();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/Database.java
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/Database.java b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/Database.java
new file mode 100644
index 0000000..b8e8321
--- /dev/null
+++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/Database.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mysql.schema;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
+import javax.sql.DataSource;
+import org.apache.rocketmq.mysql.binlog.EventProcessor;
+import org.apache.rocketmq.mysql.schema.column.ColumnParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class Database {
+    private static final Logger LOGGER = LoggerFactory.getLogger(EventProcessor.class);
+
+    private static final String SQL = "select table_name,column_name,data_type,column_type,character_set_name " +
+        "from information_schema.columns " +
+        "where table_schema = ?";
+    private String name;
+
+    private DataSource dataSource;
+
+    private Map<String, Table> tableMap = new HashMap<String, Table>();
+
+    public Database(String name, DataSource dataSource) {
+        this.name = name;
+        this.dataSource = dataSource;
+    }
+
+    public void init() throws SQLException {
+        Connection conn = null;
+        PreparedStatement ps = null;
+        ResultSet rs = null;
+
+        try {
+            conn = dataSource.getConnection();
+
+            ps = conn.prepareStatement(SQL);
+            ps.setString(1, name);
+            rs = ps.executeQuery();
+
+            while (rs.next()) {
+                String tableName = rs.getString(1);
+                String colName = rs.getString(2);
+                String dataType = rs.getString(3);
+                String colType = rs.getString(4);
+                String charset = rs.getString(5);
+
+                ColumnParser columnParser = ColumnParser.getColumnParser(dataType, colType, charset);
+
+                if (!tableMap.containsKey(tableName)) {
+                    addTable(tableName);
+                }
+                Table table = tableMap.get(tableName);
+                table.addCol(colName);
+                table.addParser(columnParser);
+            }
+
+        } finally {
+            if (conn != null) {
+                conn.close();
+            }
+            if (ps != null) {
+                ps.close();
+            }
+            if (rs != null) {
+                rs.close();
+            }
+        }
+
+    }
+
+    private void addTable(String tableName) {
+
+        LOGGER.info("Schema load -- DATABASE:{},\tTABLE:{}", name, tableName);
+
+        Table table = new Table(name, tableName);
+        tableMap.put(tableName, table);
+    }
+
+    public Table getTable(String tableName) {
+
+        return tableMap.get(tableName);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/Schema.java
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/Schema.java b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/Schema.java
new file mode 100644
index 0000000..2baf2a2
--- /dev/null
+++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/Schema.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mysql.schema;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import javax.sql.DataSource;
+import org.apache.rocketmq.mysql.binlog.EventProcessor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class Schema {
+    private static final Logger LOGGER = LoggerFactory.getLogger(EventProcessor.class);
+
+    private static final String SQL = "select schema_name from information_schema.schemata";
+
+    private static final List<String> IGNORED_DATABASES = new ArrayList<>(
+        Arrays.asList(new String[] {"information_schema", "mysql", "performance_schema", "sys"})
+    );
+
+    private DataSource dataSource;
+
+    private Map<String, Database> dbMap;
+
+    public Schema(DataSource dataSource) {
+        this.dataSource = dataSource;
+    }
+
+    public void load() throws SQLException {
+
+        dbMap = new HashMap<>();
+
+        Connection conn = null;
+        PreparedStatement ps = null;
+        ResultSet rs = null;
+
+        try {
+            conn = dataSource.getConnection();
+
+            ps = conn.prepareStatement(SQL);
+            rs = ps.executeQuery();
+
+            while (rs.next()) {
+                String dbName = rs.getString(1);
+                if (!IGNORED_DATABASES.contains(dbName)) {
+                    Database database = new Database(dbName, dataSource);
+                    dbMap.put(dbName, database);
+                }
+            }
+
+        } finally {
+
+            if (conn != null) {
+                conn.close();
+            }
+            if (ps != null) {
+                ps.close();
+            }
+            if (rs != null) {
+                rs.close();
+            }
+        }
+
+        for (Database db : dbMap.values()) {
+            db.init();
+        }
+
+    }
+
+    public Table getTable(String dbName, String tableName) {
+
+        if (dbMap == null) {
+            reload();
+        }
+
+        Database database = dbMap.get(dbName);
+        if (database == null) {
+            return null;
+        }
+
+        Table table = database.getTable(tableName);
+        if (table == null) {
+            return null;
+        }
+
+        return table;
+    }
+
+    private void reload() {
+
+        while (true) {
+            try {
+                load();
+                break;
+            } catch (Exception e) {
+                LOGGER.error("Reload schema error.", e);
+            }
+        }
+    }
+
+    public void reset() {
+        dbMap = null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/Table.java
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/Table.java b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/Table.java
new file mode 100644
index 0000000..54592a9
--- /dev/null
+++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/Table.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mysql.schema;
+
+import java.util.LinkedList;
+import java.util.List;
+import org.apache.rocketmq.mysql.schema.column.ColumnParser;
+
+public class Table {
+    private String database;
+    private String name;
+    private List<String> colList = new LinkedList<String>();
+    private List<ColumnParser> parserList = new LinkedList<ColumnParser>();
+
+    public Table(String database, String table) {
+        this.database = database;
+        this.name = table;
+    }
+
+    public void addCol(String column) {
+        colList.add(column);
+    }
+
+    public void addParser(ColumnParser columnParser) {
+        parserList.add(columnParser);
+    }
+
+    public List<String> getColList() {
+        return colList;
+    }
+
+    public String getDatabase() {
+        return database;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public List<ColumnParser> getParserList() {
+        return parserList;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/BigIntColumnParser.java
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/BigIntColumnParser.java b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/BigIntColumnParser.java
new file mode 100644
index 0000000..667db75
--- /dev/null
+++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/BigIntColumnParser.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mysql.schema.column;
+
+import java.math.BigInteger;
+
+public class BigIntColumnParser extends ColumnParser {
+
+    private static BigInteger max = BigInteger.ONE.shiftLeft(64);
+
+    private boolean signed;
+
+    public BigIntColumnParser(String colType) {
+        this.signed = !colType.matches(".* unsigned$");
+    }
+
+    @Override
+    public Object getValue(Object value) {
+
+        if (value == null) {
+            return null;
+        }
+
+        if (value instanceof BigInteger) {
+            return value;
+        }
+
+        Long l = (Long) value;
+        if (!signed && l < 0) {
+            return max.add(BigInteger.valueOf(l));
+        } else {
+            return l;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/ColumnParser.java
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/ColumnParser.java b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/ColumnParser.java
new file mode 100644
index 0000000..5f2920b
--- /dev/null
+++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/ColumnParser.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mysql.schema.column;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public abstract class ColumnParser {
+
+    public static ColumnParser getColumnParser(String dataType, String colType, String charset) {
+
+        switch (dataType) {
+            case "tinyint":
+            case "smallint":
+            case "mediumint":
+            case "int":
+                return new IntColumnParser(dataType, colType);
+            case "bigint":
+                return new BigIntColumnParser(colType);
+            case "tinytext":
+            case "text":
+            case "mediumtext":
+            case "longtext":
+            case "varchar":
+            case "char":
+                return new StringColumnParser(charset);
+            case "date":
+            case "datetime":
+            case "timestamp":
+                return new DateTimeColumnParser();
+            case "time":
+                return new TimeColumnParser();
+            case "year":
+                return new YearColumnParser();
+            case "enum":
+                return new EnumColumnParser(colType);
+            case "set":
+                return new SetColumnParser(colType);
+            default:
+                return new DefaultColumnParser();
+        }
+    }
+
+    public static String[] extractEnumValues(String colType) {
+        String[] enumValues = {};
+        Matcher matcher = Pattern.compile("(enum|set)\\((.*)\\)").matcher(colType);
+        if (matcher.matches()) {
+            enumValues = matcher.group(2).replace("'", "").split(",");
+        }
+
+        return enumValues;
+    }
+
+    public abstract Object getValue(Object value);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/DateTimeColumnParser.java
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/DateTimeColumnParser.java b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/DateTimeColumnParser.java
new file mode 100644
index 0000000..6b60abd
--- /dev/null
+++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/DateTimeColumnParser.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mysql.schema.column;
+
+import java.sql.Timestamp;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.TimeZone;
+
+public class DateTimeColumnParser extends ColumnParser {
+
+    private static SimpleDateFormat dateTimeFormat;
+    private static SimpleDateFormat dateTimeUtcFormat;
+
+    static {
+        dateTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+        dateTimeUtcFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+        dateTimeUtcFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+    }
+
+    @Override
+    public Object getValue(Object value) {
+
+        if (value == null) {
+            return null;
+        }
+
+        if (value instanceof Timestamp) {
+            return dateTimeFormat.format(value);
+        }
+
+        if (value instanceof Long) {
+            return dateTimeUtcFormat.format(new Date((Long) value));
+        }
+
+        return value;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/DefaultColumnParser.java
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/DefaultColumnParser.java b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/DefaultColumnParser.java
new file mode 100644
index 0000000..46eb48e
--- /dev/null
+++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/DefaultColumnParser.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mysql.schema.column;
+
+import org.apache.commons.codec.binary.Base64;
+
+public class DefaultColumnParser extends ColumnParser {
+
+    @Override
+    public Object getValue(Object value) {
+
+        if (value == null) {
+            return null;
+        }
+
+        if (value instanceof byte[]) {
+            return Base64.encodeBase64String((byte[]) value);
+        }
+
+        return value;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/EnumColumnParser.java
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/EnumColumnParser.java b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/EnumColumnParser.java
new file mode 100644
index 0000000..2942103
--- /dev/null
+++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/EnumColumnParser.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mysql.schema.column;
+
+public class EnumColumnParser extends ColumnParser {
+
+    private String[] enumValues;
+
+    public EnumColumnParser(String colType) {
+        enumValues = extractEnumValues(colType);
+    }
+
+    @Override
+    public Object getValue(Object value) {
+
+        if (value == null) {
+            return null;
+        }
+
+        if (value instanceof String) {
+            return value;
+        }
+
+        Integer i = (Integer) value;
+        if (i == 0) {
+            return null;
+        } else {
+            return enumValues[i - 1];
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/IntColumnParser.java
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/IntColumnParser.java b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/IntColumnParser.java
new file mode 100644
index 0000000..96cf999
--- /dev/null
+++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/IntColumnParser.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mysql.schema.column;
+
+public class IntColumnParser extends ColumnParser {
+
+    private int bits;
+    private boolean signed;
+
+    public IntColumnParser(String dataType, String colType) {
+
+        switch (dataType) {
+            case "tinyint":
+                bits = 8;
+                break;
+            case "smallint":
+                bits = 16;
+                break;
+            case "mediumint":
+                bits = 24;
+                break;
+            case "int":
+                bits = 32;
+        }
+
+        this.signed = !colType.matches(".* unsigned$");
+    }
+
+    @Override
+    public Object getValue(Object value) {
+
+        if (value == null) {
+            return null;
+        }
+
+        if (value instanceof Long) {
+            return value;
+        }
+
+        if (value instanceof Integer) {
+            Integer i = (Integer) value;
+            if (signed || i > 0) {
+                return i;
+            } else {
+                return (1L << bits) + i;
+            }
+        }
+
+        return value;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/SetColumnParser.java
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/SetColumnParser.java b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/SetColumnParser.java
new file mode 100644
index 0000000..fb28c30
--- /dev/null
+++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/SetColumnParser.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mysql.schema.column;
+
+public class SetColumnParser extends ColumnParser {
+
+    private String[] enumValues;
+
+    public SetColumnParser(String colType) {
+        enumValues = extractEnumValues(colType);
+    }
+
+    @Override
+    public Object getValue(Object value) {
+        if (value == null) {
+            return null;
+        }
+
+        if (value instanceof String) {
+            return value;
+        }
+
+        StringBuilder builder = new StringBuilder();
+        long l = (Long) value;
+
+        boolean needSplit = false;
+        for (int i = 0; i < enumValues.length; i++) {
+            if (((l >> i) & 1) == 1) {
+                if (needSplit)
+                    builder.append(",");
+
+                builder.append(enumValues[i]);
+                needSplit = true;
+            }
+        }
+
+        return builder.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/StringColumnParser.java
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/StringColumnParser.java b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/StringColumnParser.java
new file mode 100644
index 0000000..19068c9
--- /dev/null
+++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/StringColumnParser.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mysql.schema.column;
+
+import org.apache.commons.codec.Charsets;
+
+public class StringColumnParser extends ColumnParser {
+
+    private String charset;
+
+    public StringColumnParser(String charset) {
+        this.charset = charset.toLowerCase();
+    }
+
+    @Override
+    public Object getValue(Object value) {
+
+        if (value == null) {
+            return null;
+        }
+
+        if (value instanceof String) {
+            return value;
+        }
+
+        byte[] bytes = (byte[]) value;
+
+        switch (charset) {
+            case "utf8":
+            case "utf8mb4":
+                return new String(bytes, Charsets.UTF_8);
+            case "latin1":
+            case "ascii":
+                return new String(bytes, Charsets.ISO_8859_1);
+            case "ucs2":
+                return new String(bytes, Charsets.UTF_16);
+            default:
+                return new String(bytes, Charsets.toCharset(charset));
+
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/TimeColumnParser.java
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/TimeColumnParser.java b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/TimeColumnParser.java
new file mode 100644
index 0000000..384b06e
--- /dev/null
+++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/TimeColumnParser.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mysql.schema.column;
+
+import java.sql.Time;
+import java.sql.Timestamp;
+
+public class TimeColumnParser extends ColumnParser {
+
+    @Override
+    public Object getValue(Object value) {
+
+        if (value == null) {
+            return null;
+        }
+
+        if (value instanceof Timestamp) {
+
+            return new Time(((Timestamp) value).getTime());
+        }
+
+        return value;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/YearColumnParser.java
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/YearColumnParser.java b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/YearColumnParser.java
new file mode 100644
index 0000000..0419933
--- /dev/null
+++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/YearColumnParser.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mysql.schema.column;
+
+import java.sql.Date;
+import java.util.Calendar;
+
+public class YearColumnParser extends ColumnParser {
+
+    @Override
+    public Object getValue(Object value) {
+
+        if (value == null) {
+            return null;
+        }
+
+        if (value instanceof Date) {
+            Calendar calendar = Calendar.getInstance();
+            calendar.setTime((Date) value);
+            return calendar.get(Calendar.YEAR);
+        }
+
+        return value;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/src/main/resources/logback.xml
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/src/main/resources/logback.xml b/rocketmq-mysql/src/main/resources/logback.xml
new file mode 100644
index 0000000..d4993de
--- /dev/null
+++ b/rocketmq-mysql/src/main/resources/logback.xml
@@ -0,0 +1,79 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+
+<configuration>
+
+
+    <appender name="DefaultConsoleAppender" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%d{yyy-MM-dd HH:mm:ss,GMT+8} %p - %m%n</pattern>
+            <charset class="java.nio.charset.Charset">UTF-8</charset>
+        </encoder>
+    </appender>
+
+
+    <appender name="DefaultFileAppender"
+              class="ch.qos.logback.core.rolling.RollingFileAppender">
+        <file>./logs/rocketmq_mysql.log</file>
+        <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
+            <fileNamePattern>./logs/rocketmq_mysql.%i.log
+            </fileNamePattern>
+            <minIndex>1</minIndex>
+            <maxIndex>10</maxIndex>
+        </rollingPolicy>
+        <triggeringPolicy
+            class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
+            <maxFileSize>100MB</maxFileSize>
+        </triggeringPolicy>
+        <encoder>
+            <pattern>%d{yyy-MM-dd HH:mm:ss,GMT+8} %p - %m%n</pattern>
+            <charset class="java.nio.charset.Charset">UTF-8</charset>
+        </encoder>
+    </appender>
+
+    <appender name="PositionAppender"
+              class="ch.qos.logback.core.rolling.RollingFileAppender">
+        <file>./logs/position.log</file>
+        <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
+            <fileNamePattern>./logs/position.%i.log
+            </fileNamePattern>
+            <minIndex>1</minIndex>
+            <maxIndex>10</maxIndex>
+        </rollingPolicy>
+        <triggeringPolicy
+            class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
+            <maxFileSize>10MB</maxFileSize>
+        </triggeringPolicy>
+        <encoder>
+            <pattern>%d{yyy-MM-dd HH:mm:ss,GMT+8} - %m%n</pattern>
+            <charset class="java.nio.charset.Charset">UTF-8</charset>
+        </encoder>
+    </appender>
+
+    <root>
+        <level value="INFO"/>
+        <appender-ref ref="DefaultConsoleAppender"/>
+        <appender-ref ref="DefaultFileAppender"/>
+    </root>
+
+    <logger name="PositionLogger" additivity="false">
+        <level value="INFO"/>
+        <appender-ref ref="PositionAppender"/>
+    </logger>
+
+</configuration>

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/src/main/resources/rocketmq_mysql.conf
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/src/main/resources/rocketmq_mysql.conf b/rocketmq-mysql/src/main/resources/rocketmq_mysql.conf
new file mode 100644
index 0000000..4a7a35f
--- /dev/null
+++ b/rocketmq-mysql/src/main/resources/rocketmq_mysql.conf
@@ -0,0 +1,28 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+
+mysqlAddr=
+mysqlPort=
+mysqlUsername=
+mysqlPassword=
+
+mqNamesrvAddr=
+mqTopic=
+
+#startType=
+#binlogFilename=
+#nextPosition=
+#maxTransactionRows=
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/src/test/java/org/apache/rocketmq/mysql/BigIntColumnParserTest.java
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/src/test/java/org/apache/rocketmq/mysql/BigIntColumnParserTest.java b/rocketmq-mysql/src/test/java/org/apache/rocketmq/mysql/BigIntColumnParserTest.java
new file mode 100644
index 0000000..ebf0926
--- /dev/null
+++ b/rocketmq-mysql/src/test/java/org/apache/rocketmq/mysql/BigIntColumnParserTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mysql;
+
+import java.math.BigInteger;
+import org.apache.rocketmq.mysql.schema.column.BigIntColumnParser;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class BigIntColumnParserTest {
+
+    @Test
+    public void testBigInt() {
+        BigIntColumnParser parser = new BigIntColumnParser("bigint(20) unsigned");
+
+        BigInteger v1 = (BigInteger) parser.getValue(Long.MIN_VALUE);
+        BigInteger v2 = BigInteger.valueOf(Long.MAX_VALUE).add(BigInteger.ONE);
+        assertEquals(v1, v2);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/src/test/java/org/apache/rocketmq/mysql/EnumColumnParserTest.java
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/src/test/java/org/apache/rocketmq/mysql/EnumColumnParserTest.java b/rocketmq-mysql/src/test/java/org/apache/rocketmq/mysql/EnumColumnParserTest.java
new file mode 100644
index 0000000..5c40060
--- /dev/null
+++ b/rocketmq-mysql/src/test/java/org/apache/rocketmq/mysql/EnumColumnParserTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mysql;
+
+import org.apache.rocketmq.mysql.schema.column.EnumColumnParser;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+
+public class EnumColumnParserTest {
+
+    @Test
+    public void testEnum() {
+        String colType = "enum('a','b','c','d')";
+
+        EnumColumnParser parser = new EnumColumnParser(colType);
+        String v = (String) parser.getValue(3);
+        assertEquals(v, "c");
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/src/test/java/org/apache/rocketmq/mysql/IntColumnParserTest.java
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/src/test/java/org/apache/rocketmq/mysql/IntColumnParserTest.java b/rocketmq-mysql/src/test/java/org/apache/rocketmq/mysql/IntColumnParserTest.java
new file mode 100644
index 0000000..4972947
--- /dev/null
+++ b/rocketmq-mysql/src/test/java/org/apache/rocketmq/mysql/IntColumnParserTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mysql;
+
+import org.apache.rocketmq.mysql.schema.column.IntColumnParser;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class IntColumnParserTest {
+
+    @Test
+    public void testInt() {
+        IntColumnParser parser = new IntColumnParser("int", "int(10) unsigned");
+
+        Long v1 = (Long) parser.getValue(Integer.MIN_VALUE);
+        Long v2 = (long) Integer.MAX_VALUE + 1;
+        assertEquals(v1, v2);
+    }
+
+    @Test
+    public void testSmallint() {
+        IntColumnParser parser = new IntColumnParser("smallint", "smallint(5) unsigned");
+
+        Long v1 = (Long) parser.getValue((int) Short.MIN_VALUE);
+        Long v2 = (long) (Short.MAX_VALUE + 1);
+        assertEquals(v1, v2);
+    }
+
+    @Test
+    public void testTinyint() {
+        IntColumnParser parser = new IntColumnParser("tinyint", "tinyint(3) unsigned");
+
+        Long v1 = (Long) parser.getValue((int) Byte.MIN_VALUE);
+        Long v2 = (long) (Byte.MAX_VALUE + 1);
+        assertEquals(v1, v2);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/src/test/java/org/apache/rocketmq/mysql/SetColumnParserTest.java
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/src/test/java/org/apache/rocketmq/mysql/SetColumnParserTest.java b/rocketmq-mysql/src/test/java/org/apache/rocketmq/mysql/SetColumnParserTest.java
new file mode 100644
index 0000000..3fbf4ba
--- /dev/null
+++ b/rocketmq-mysql/src/test/java/org/apache/rocketmq/mysql/SetColumnParserTest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mysql;
+
+import org.apache.rocketmq.mysql.schema.column.SetColumnParser;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class SetColumnParserTest {
+
+    @Test
+    public void testSet() {
+        String colType = "set('a','b','c','d')";
+
+        SetColumnParser parser = new SetColumnParser(colType);
+        String v = (String) parser.getValue(1001L);
+        assertEquals(v, "a,d");
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/style/copyright/Apache.xml
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/style/copyright/Apache.xml b/rocketmq-mysql/style/copyright/Apache.xml
new file mode 100644
index 0000000..e3e3dec
--- /dev/null
+++ b/rocketmq-mysql/style/copyright/Apache.xml
@@ -0,0 +1,23 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+
+<component name="CopyrightManager">
+    <copyright>
+        <option name="myName" value="Apache" />
+        <option name="notice" value="Licensed to the Apache Software Foundation (ASF) under one or more&#10;contributor license agreements.  See the NOTICE file distributed with&#10;this work for additional information regarding copyright ownership.&#10;The ASF licenses this file to You under the Apache License, Version 2.0&#10;(the &quot;License&quot;); you may not use this file except in compliance with&#10;the License.  You may obtain a copy of the License at&#10;&#10;    http://www.apache.org/licenses/LICENSE-2.0&#10;&#10;Unless required by applicable law or agreed to in writing, software&#10;distributed under the License is distributed on an &quot;AS IS&quot; BASIS,&#10;WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.&#10;See the License for the specific language governing permissions and&#10;limitations under the License." />
+    </copyright>
+</component>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/style/copyright/profiles_settings.xml
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/style/copyright/profiles_settings.xml b/rocketmq-mysql/style/copyright/profiles_settings.xml
new file mode 100644
index 0000000..747c7e2
--- /dev/null
+++ b/rocketmq-mysql/style/copyright/profiles_settings.xml
@@ -0,0 +1,64 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+
+<component name="CopyrightManager">
+    <settings default="Apache">
+        <module2copyright>
+            <element module="All" copyright="Apache"/>
+        </module2copyright>
+        <LanguageOptions name="GSP">
+            <option name="fileTypeOverride" value="3"/>
+            <option name="prefixLines" value="false"/>
+        </LanguageOptions>
+        <LanguageOptions name="HTML">
+            <option name="fileTypeOverride" value="3"/>
+            <option name="prefixLines" value="false"/>
+        </LanguageOptions>
+        <LanguageOptions name="JAVA">
+            <option name="fileTypeOverride" value="3" />
+            <option name="addBlankAfter" value="false" />
+        </LanguageOptions>
+        <LanguageOptions name="JSP">
+            <option name="fileTypeOverride" value="3"/>
+            <option name="prefixLines" value="false"/>
+        </LanguageOptions>
+        <LanguageOptions name="JSPX">
+            <option name="fileTypeOverride" value="3"/>
+            <option name="prefixLines" value="false"/>
+        </LanguageOptions>
+        <LanguageOptions name="MXML">
+            <option name="fileTypeOverride" value="3"/>
+            <option name="prefixLines" value="false"/>
+        </LanguageOptions>
+        <LanguageOptions name="Properties">
+            <option name="fileTypeOverride" value="3"/>
+            <option name="block" value="false"/>
+        </LanguageOptions>
+        <LanguageOptions name="SPI">
+            <option name="fileTypeOverride" value="3"/>
+            <option name="block" value="false"/>
+        </LanguageOptions>
+        <LanguageOptions name="XML">
+            <option name="fileTypeOverride" value="3"/>
+            <option name="prefixLines" value="false"/>
+        </LanguageOptions>
+        <LanguageOptions name="__TEMPLATE__">
+            <option name="separateBefore" value="true"/>
+            <option name="lenBefore" value="1"/>
+        </LanguageOptions>
+    </settings>
+</component>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/style/rmq_checkstyle.xml
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/style/rmq_checkstyle.xml b/rocketmq-mysql/style/rmq_checkstyle.xml
new file mode 100644
index 0000000..2872eb7
--- /dev/null
+++ b/rocketmq-mysql/style/rmq_checkstyle.xml
@@ -0,0 +1,134 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+
+<!DOCTYPE module PUBLIC
+    "-//Puppy Crawl//DTD Check Configuration 1.3//EN"
+    "http://www.puppycrawl.com/dtds/configuration_1_3.dtd">
+<!--Refer http://checkstyle.sourceforge.net/reports/google-java-style.html#s2.2-file-encoding -->
+<module name="Checker">
+
+    <property name="localeLanguage" value="en"/>
+
+    <!--To configure the check to report on the first instance in each file-->
+    <module name="FileTabCharacter"/>
+
+    <!-- header -->
+    <module name="RegexpHeader">
+        <property name="header" value="/\*\nLicensed to the Apache Software Foundation*"/>
+    </module>
+
+    <module name="RegexpSingleline">
+        <property name="format" value="System\.out\.println"/>
+        <property name="message" value="Prohibit invoking System.out.println in source code !"/>
+    </module>
+
+    <module name="RegexpSingleline">
+        <property name="format" value="//FIXME"/>
+        <property name="message" value="Recommended fix FIXME task !"/>
+    </module>
+
+    <module name="RegexpSingleline">
+        <property name="format" value="//TODO"/>
+        <property name="message" value="Recommended fix TODO task !"/>
+    </module>
+
+    <module name="RegexpSingleline">
+        <property name="format" value="@alibaba"/>
+        <property name="message" value="Recommended remove @alibaba keyword!"/>
+    </module>
+    <module name="RegexpSingleline">
+        <property name="format" value="@taobao"/>
+        <property name="message" value="Recommended remove @taobao keyword!"/>
+    </module>
+    <module name="RegexpSingleline">
+        <property name="format" value="@author"/>
+        <property name="message" value="Recommended remove @author tag in javadoc!"/>
+    </module>
+
+    <module name="RegexpSingleline">
+        <property name="format"
+                  value=".*[\u3400-\u4DB5\u4E00-\u9FA5\u9FA6-\u9FBB\uF900-\uFA2D\uFA30-\uFA6A\uFA70-\uFAD9\uFF00-\uFFEF\u2E80-\u2EFF\u3000-\u303F\u31C0-\u31EF]+.*"/>
+        <property name="message" value="Not allow chinese character !"/>
+    </module>
+
+    <module name="FileLength">
+        <property name="max" value="3000"/>
+    </module>
+
+    <module name="TreeWalker">
+
+        <module name="UnusedImports">
+            <property name="processJavadoc" value="true"/>
+        </module>
+        <module name="RedundantImport"/>
+
+        <!--<module name="IllegalImport" />-->
+
+        <!--Checks that classes that override equals() also override hashCode()-->
+        <module name="EqualsHashCode"/>
+        <!--Checks for over-complicated boolean expressions. Currently finds code like if (topic == true), topic || true, !false, etc.-->
+        <module name="SimplifyBooleanExpression"/>
+        <module name="OneStatementPerLine"/>
+        <module name="UnnecessaryParentheses"/>
+        <!--Checks for over-complicated boolean return statements. For example the following code-->
+        <module name="SimplifyBooleanReturn"/>
+
+        <!--Check that the default is after all the cases in producerGroup switch statement-->
+        <module name="DefaultComesLast"/>
+        <!--Detects empty statements (standalone ";" semicolon)-->
+        <module name="EmptyStatement"/>
+        <!--Checks that long constants are defined with an upper ell-->
+        <module name="UpperEll"/>
+        <module name="ConstantName">
+            <property name="format" value="(^[A-Z][A-Z0-9]*(_[A-Z0-9]+)*$)|(^log$)"/>
+        </module>
+        <!--Checks that local, non-final variable names conform to producerGroup format specified by the format property-->
+        <module name="LocalVariableName"/>
+        <!--Validates identifiers for local, final variables, including catch parameters-->
+        <module name="LocalFinalVariableName"/>
+        <!--Validates identifiers for non-static fields-->
+        <module name="MemberName"/>
+        <!--Validates identifiers for class type parameters-->
+        <module name="ClassTypeParameterName">
+            <property name="format" value="^[A-Z0-9]*$"/>
+        </module>
+        <!--Validates identifiers for method type parameters-->
+        <module name="MethodTypeParameterName">
+            <property name="format" value="^[A-Z0-9]*$"/>
+        </module>
+        <module name="PackageName"/>
+        <module name="ParameterName"/>
+        <module name="StaticVariableName"/>
+        <module name="TypeName"/>
+        <!--Checks that there are no import statements that use the * notation-->
+        <module name="AvoidStarImport"/>
+
+        <!--whitespace-->
+        <module name="GenericWhitespace"/>
+        <module name="NoWhitespaceBefore"/>
+        <module name="NoWhitespaceAfter"/>
+        <module name="WhitespaceAround">
+            <property name="allowEmptyConstructors" value="true"/>
+            <property name="allowEmptyMethods" value="true"/>
+        </module>
+        <module name="Indentation"/>
+        <module name="MethodParamPad"/>
+        <module name="ParenPad"/>
+        <module name="TypecastParenPad"/>
+    </module>
+</module>

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/style/rmq_codeStyle.xml
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/style/rmq_codeStyle.xml b/rocketmq-mysql/style/rmq_codeStyle.xml
new file mode 100644
index 0000000..7c7ce54
--- /dev/null
+++ b/rocketmq-mysql/style/rmq_codeStyle.xml
@@ -0,0 +1,143 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+
+<code_scheme name="rocketmq">
+    <option name="USE_SAME_INDENTS" value="true"/>
+    <option name="IGNORE_SAME_INDENTS_FOR_LANGUAGES" value="true"/>
+    <option name="OTHER_INDENT_OPTIONS">
+        <value>
+            <option name="INDENT_SIZE" value="4"/>
+            <option name="CONTINUATION_INDENT_SIZE" value="4"/>
+            <option name="TAB_SIZE" value="4"/>
+            <option name="USE_TAB_CHARACTER" value="false"/>
+            <option name="SMART_TABS" value="false"/>
+            <option name="LABEL_INDENT_SIZE" value="0"/>
+            <option name="LABEL_INDENT_ABSOLUTE" value="false"/>
+            <option name="USE_RELATIVE_INDENTS" value="false"/>
+        </value>
+    </option>
+    <option name="PREFER_LONGER_NAMES" value="false"/>
+    <option name="CLASS_COUNT_TO_USE_IMPORT_ON_DEMAND" value="1000"/>
+    <option name="NAMES_COUNT_TO_USE_IMPORT_ON_DEMAND" value="1000"/>
+    <option name="PACKAGES_TO_USE_IMPORT_ON_DEMAND">
+        <value/>
+    </option>
+    <option name="IMPORT_LAYOUT_TABLE">
+        <value>
+            <package name="" withSubpackages="true" static="false"/>
+            <emptyLine/>
+            <package name="" withSubpackages="true" static="true"/>
+        </value>
+    </option>
+    <option name="JD_ALIGN_PARAM_COMMENTS" value="false"/>
+    <option name="JD_ALIGN_EXCEPTION_COMMENTS" value="false"/>
+    <option name="JD_P_AT_EMPTY_LINES" value="false"/>
+    <option name="JD_KEEP_INVALID_TAGS" value="false"/>
+    <option name="JD_DO_NOT_WRAP_ONE_LINE_COMMENTS" value="true"/>
+    <option name="KEEP_CONTROL_STATEMENT_IN_ONE_LINE" value="false"/>
+    <option name="KEEP_BLANK_LINES_IN_DECLARATIONS" value="1"/>
+    <option name="KEEP_BLANK_LINES_IN_CODE" value="1"/>
+    <option name="KEEP_BLANK_LINES_BEFORE_RBRACE" value="1"/>
+    <option name="WHILE_ON_NEW_LINE" value="true"/>
+    <option name="ALIGN_MULTILINE_PARAMETERS" value="false"/>
+    <option name="ALIGN_MULTILINE_FOR" value="false"/>
+    <option name="SPACE_AFTER_TYPE_CAST" value="true"/>
+    <option name="SPACE_BEFORE_ARRAY_INITIALIZER_LBRACE" value="true"/>
+    <option name="METHOD_PARAMETERS_WRAP" value="1"/>
+    <option name="ARRAY_INITIALIZER_LBRACE_ON_NEXT_LINE" value="true"/>
+    <option name="LABELED_STATEMENT_WRAP" value="1"/>
+    <option name="WRAP_COMMENTS" value="true"/>
+    <option name="METHOD_ANNOTATION_WRAP" value="1"/>
+    <option name="CLASS_ANNOTATION_WRAP" value="1"/>
+    <option name="FIELD_ANNOTATION_WRAP" value="1"/>
+    <JavaCodeStyleSettings>
+        <option name="CLASS_NAMES_IN_JAVADOC" value="3"/>
+    </JavaCodeStyleSettings>
+    <XML>
+        <option name="XML_LEGACY_SETTINGS_IMPORTED" value="true"/>
+    </XML>
+    <ADDITIONAL_INDENT_OPTIONS fileType="haml">
+        <option name="INDENT_SIZE" value="2"/>
+    </ADDITIONAL_INDENT_OPTIONS>
+    <codeStyleSettings language="Groovy">
+        <option name="KEEP_CONTROL_STATEMENT_IN_ONE_LINE" value="false"/>
+        <option name="KEEP_BLANK_LINES_IN_DECLARATIONS" value="1"/>
+        <option name="KEEP_BLANK_LINES_IN_CODE" value="1"/>
+        <option name="KEEP_BLANK_LINES_BEFORE_RBRACE" value="1"/>
+        <option name="ALIGN_MULTILINE_PARAMETERS" value="false"/>
+        <option name="ALIGN_MULTILINE_FOR" value="false"/>
+        <option name="METHOD_PARAMETERS_WRAP" value="1"/>
+        <option name="METHOD_ANNOTATION_WRAP" value="1"/>
+        <option name="CLASS_ANNOTATION_WRAP" value="1"/>
+        <option name="FIELD_ANNOTATION_WRAP" value="1"/>
+        <option name="PARENT_SETTINGS_INSTALLED" value="true"/>
+        <indentOptions>
+            <option name="CONTINUATION_INDENT_SIZE" value="4"/>
+        </indentOptions>
+    </codeStyleSettings>
+    <codeStyleSettings language="HOCON">
+        <option name="KEEP_BLANK_LINES_BEFORE_RBRACE" value="1"/>
+        <option name="PARENT_SETTINGS_INSTALLED" value="true"/>
+    </codeStyleSettings>
+    <codeStyleSettings language="JAVA">
+        <option name="KEEP_CONTROL_STATEMENT_IN_ONE_LINE" value="false"/>
+        <option name="KEEP_BLANK_LINES_IN_DECLARATIONS" value="1"/>
+        <option name="KEEP_BLANK_LINES_IN_CODE" value="1"/>
+        <option name="KEEP_BLANK_LINES_BEFORE_RBRACE" value="1"/>
+        <option name="WHILE_ON_NEW_LINE" value="true"/>
+        <option name="ALIGN_MULTILINE_PARAMETERS" value="false"/>
+        <option name="ALIGN_MULTILINE_FOR" value="false"/>
+        <option name="SPACE_BEFORE_ARRAY_INITIALIZER_LBRACE" value="true"/>
+        <option name="METHOD_PARAMETERS_WRAP" value="1"/>
+        <option name="ARRAY_INITIALIZER_LBRACE_ON_NEXT_LINE" value="true"/>
+        <option name="LABELED_STATEMENT_WRAP" value="1"/>
+        <option name="METHOD_ANNOTATION_WRAP" value="1"/>
+        <option name="CLASS_ANNOTATION_WRAP" value="1"/>
+        <option name="FIELD_ANNOTATION_WRAP" value="1"/>
+        <option name="PARENT_SETTINGS_INSTALLED" value="true"/>
+        <indentOptions>
+            <option name="CONTINUATION_INDENT_SIZE" value="4"/>
+        </indentOptions>
+    </codeStyleSettings>
+    <codeStyleSettings language="JSON">
+        <option name="KEEP_BLANK_LINES_IN_CODE" value="1"/>
+        <option name="PARENT_SETTINGS_INSTALLED" value="true"/>
+    </codeStyleSettings>
+    <codeStyleSettings language="Scala">
+        <option name="KEEP_BLANK_LINES_IN_DECLARATIONS" value="1"/>
+        <option name="KEEP_BLANK_LINES_IN_CODE" value="1"/>
+        <option name="KEEP_BLANK_LINES_BEFORE_RBRACE" value="1"/>
+        <option name="WHILE_ON_NEW_LINE" value="true"/>
+        <option name="ALIGN_MULTILINE_PARAMETERS" value="false"/>
+        <option name="ALIGN_MULTILINE_FOR" value="false"/>
+        <option name="METHOD_PARAMETERS_WRAP" value="1"/>
+        <option name="METHOD_ANNOTATION_WRAP" value="1"/>
+        <option name="CLASS_ANNOTATION_WRAP" value="1"/>
+        <option name="FIELD_ANNOTATION_WRAP" value="1"/>
+        <option name="PARENT_SETTINGS_INSTALLED" value="true"/>
+        <indentOptions>
+            <option name="INDENT_SIZE" value="4"/>
+            <option name="CONTINUATION_INDENT_SIZE" value="4"/>
+            <option name="TAB_SIZE" value="4"/>
+        </indentOptions>
+    </codeStyleSettings>
+    <codeStyleSettings language="XML">
+        <indentOptions>
+            <option name="CONTINUATION_INDENT_SIZE" value="4"/>
+        </indentOptions>
+    </codeStyleSettings>
+</code_scheme>
\ No newline at end of file


[2/2] incubator-rocketmq-externals git commit: Release rocketmq-mysql 1.1.0 version

Posted by yu...@apache.org.
Release rocketmq-mysql 1.1.0 version


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/commit/a0aeee62
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/tree/a0aeee62
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/diff/a0aeee62

Branch: refs/heads/release-rocketmq-mysql-1.1.0
Commit: a0aeee629f4dc3fbb86ccbf3408011092c719b1e
Parents: 
Author: yukon <yu...@apache.org>
Authored: Tue Aug 22 22:08:24 2017 +0800
Committer: yukon <yu...@apache.org>
Committed: Tue Aug 22 22:08:24 2017 +0800

----------------------------------------------------------------------
 .gitignore                                      |   1 +
 README.md                                       |  26 ++
 rocketmq-mysql/.gitignore                       |  14 +
 rocketmq-mysql/LICENSE                          | 201 +++++++++++++
 rocketmq-mysql/LICENSE-BIN                      | 301 +++++++++++++++++++
 rocketmq-mysql/NOTICE                           |   5 +
 rocketmq-mysql/NOTICE-BIN                       |   5 +
 rocketmq-mysql/README.md                        |  42 +++
 rocketmq-mysql/doc/dataflow.png                 | Bin 0 -> 28277 bytes
 rocketmq-mysql/doc/overview.png                 | Bin 0 -> 35155 bytes
 rocketmq-mysql/pom.xml                          | 275 +++++++++++++++++
 rocketmq-mysql/src/main/assembly/assembly.xml   |  61 ++++
 .../src/main/assembly/scripts/start.sh          |  23 ++
 .../src/main/assembly/scripts/stop.sh           |  18 ++
 .../java/org/apache/rocketmq/mysql/Config.java  | 130 ++++++++
 .../org/apache/rocketmq/mysql/Replicator.java   | 129 ++++++++
 .../apache/rocketmq/mysql/binlog/DataRow.java   |  76 +++++
 .../rocketmq/mysql/binlog/EventListener.java    |  65 ++++
 .../rocketmq/mysql/binlog/EventProcessor.java   | 285 ++++++++++++++++++
 .../rocketmq/mysql/binlog/Transaction.java      |  88 ++++++
 .../rocketmq/mysql/position/BinlogPosition.java |  47 +++
 .../mysql/position/BinlogPositionLogThread.java |  47 +++
 .../mysql/position/BinlogPositionManager.java   | 149 +++++++++
 .../mysql/productor/RocketMQProducer.java       |  52 ++++
 .../apache/rocketmq/mysql/schema/Database.java  | 104 +++++++
 .../apache/rocketmq/mysql/schema/Schema.java    | 126 ++++++++
 .../org/apache/rocketmq/mysql/schema/Table.java |  58 ++++
 .../mysql/schema/column/BigIntColumnParser.java |  50 +++
 .../mysql/schema/column/ColumnParser.java       |  71 +++++
 .../schema/column/DateTimeColumnParser.java     |  53 ++++
 .../schema/column/DefaultColumnParser.java      |  37 +++
 .../mysql/schema/column/EnumColumnParser.java   |  46 +++
 .../mysql/schema/column/IntColumnParser.java    |  66 ++++
 .../mysql/schema/column/SetColumnParser.java    |  54 ++++
 .../mysql/schema/column/StringColumnParser.java |  57 ++++
 .../mysql/schema/column/TimeColumnParser.java   |  39 +++
 .../mysql/schema/column/YearColumnParser.java   |  40 +++
 rocketmq-mysql/src/main/resources/logback.xml   |  79 +++++
 .../src/main/resources/rocketmq_mysql.conf      |  28 ++
 .../rocketmq/mysql/BigIntColumnParserTest.java  |  37 +++
 .../rocketmq/mysql/EnumColumnParserTest.java    |  37 +++
 .../rocketmq/mysql/IntColumnParserTest.java     |  54 ++++
 .../rocketmq/mysql/SetColumnParserTest.java     |  36 +++
 rocketmq-mysql/style/copyright/Apache.xml       |  23 ++
 .../style/copyright/profiles_settings.xml       |  64 ++++
 rocketmq-mysql/style/rmq_checkstyle.xml         | 134 +++++++++
 rocketmq-mysql/style/rmq_codeStyle.xml          | 143 +++++++++
 47 files changed, 3476 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..485dee6
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1 @@
+.idea

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..8cd617b
--- /dev/null
+++ b/README.md
@@ -0,0 +1,26 @@
+# RocketMQ Externals
+
+There are many Apache RocketMQ external projects contributed and maintained by community.
+
+## RocketMQ-Console
+A newly designed RocketMQ's console using spring-boot.
+
+## RocketMQ-JMS
+RocketMQ's JMS 1.1 spec. implementation.
+
+## RocketMQ-Flume
+Flume RocketMQ source and sink implementation.
+
+
+## RocketMQ-Spark
+
+Integration of Apache Spark-Streaming and Apache RocketMQ. Both push & pull consumers are provided. For more details please refer to [README](https://github.com/apache/incubator-rocketmq-externals/tree/master/rocketmq-spark).
+
+## RocketMQ-Docker
+Apache RocketMQ Docker provides Dockerfile and bash scripts for building and running docker image.
+
+## RocketMQ-MySQL
+This project is a data replicator between MySQL and other systems.For more details please refer to [README](https://github.com/apache/incubator-rocketmq-externals/tree/master/rocketmq-mysql).
+
+## Others
+[RocketMQ-Druid](https://github.com/druid-io/druid/tree/master/extensions-contrib/druid-rocketmq), [RocketMQ-Ignite](https://github.com/apache/ignite/tree/master/modules/rocketmq) and [RocketMQ-Storm](https://github.com/apache/storm/tree/master/external/storm-rocketmq) integration can be found in those repositories.

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/.gitignore
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/.gitignore b/rocketmq-mysql/.gitignore
new file mode 100644
index 0000000..3311eab
--- /dev/null
+++ b/rocketmq-mysql/.gitignore
@@ -0,0 +1,14 @@
+*dependency-reduced-pom.xml
+.classpath
+.project
+.settings/
+target/
+devenv
+*.log*
+*.iml
+.idea/
+*.versionsBackup
+*bin
+!NOTICE-BIN
+!LICENSE-BIN
+.DS_Store
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/LICENSE
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/LICENSE b/rocketmq-mysql/LICENSE
new file mode 100644
index 0000000..b67d909
--- /dev/null
+++ b/rocketmq-mysql/LICENSE
@@ -0,0 +1,201 @@
+Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+
+      "Contribution" shall mean any work of authorship, including
+      the original version of the Work and any modifications or additions
+      to that Work or Derivative Works thereof, that is intentionally
+      submitted to Licensor for inclusion in the Work by the copyright owner
+      or by an individual or Legal Entity authorized to submit on behalf of
+      the copyright owner. For the purposes of this definition, "submitted"
+      means any form of electronic, verbal, or written communication sent
+      to the Licensor or its representatives, including but not limited to
+      communication on electronic mailing lists, source code control systems,
+      and issue tracking systems that are managed by, or on behalf of, the
+      Licensor for the purpose of discussing and improving the Work, but
+      excluding communication that is conspicuously marked or otherwise
+      designated in writing by the copyright owner as "Not a Contribution."
+
+      "Contributor" shall mean Licensor and any individual or Legal Entity
+      on behalf of whom a Contribution has been received by Licensor and
+      subsequently incorporated within the Work.
+
+   2. Grant of Copyright License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      copyright license to reproduce, prepare Derivative Works of,
+      publicly display, publicly perform, sublicense, and distribute the
+      Work and such Derivative Works in Source or Object form.
+
+   3. Grant of Patent License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      (except as stated in this section) patent license to make, have made,
+      use, offer to sell, sell, import, and otherwise transfer the Work,
+      where such license applies only to those patent claims licensable
+      by such Contributor that are necessarily infringed by their
+      Contribution(s) alone or by combination of their Contribution(s)
+      with the Work to which such Contribution(s) was submitted. If You
+      institute patent litigation against any entity (including a
+      cross-claim or counterclaim in a lawsuit) alleging that the Work
+      or a Contribution incorporated within the Work constitutes direct
+      or contributory patent infringement, then any patent licenses
+      granted to You under this License for that Work shall terminate
+      as of the date such litigation is filed.
+
+   4. Redistribution. You may reproduce and distribute copies of the
+      Work or Derivative Works thereof in any medium, with or without
+      modifications, and in Source or Object form, provided that You
+      meet the following conditions:
+
+      (a) You must give any other recipients of the Work or
+          Derivative Works a copy of this License; and
+
+      (b) You must cause any modified files to carry prominent notices
+          stating that You changed the files; and
+
+      (c) You must retain, in the Source form of any Derivative Works
+          that You distribute, all copyright, patent, trademark, and
+          attribution notices from the Source form of the Work,
+          excluding those notices that do not pertain to any part of
+          the Derivative Works; and
+
+      (d) If the Work includes a "NOTICE" text file as part of its
+          distribution, then any Derivative Works that You distribute must
+          include a readable copy of the attribution notices contained
+          within such NOTICE file, excluding those notices that do not
+          pertain to any part of the Derivative Works, in at least one
+          of the following places: within a NOTICE text file distributed
+          as part of the Derivative Works; within the Source form or
+          documentation, if provided along with the Derivative Works; or,
+          within a display generated by the Derivative Works, if and
+          wherever such third-party notices normally appear. The contents
+          of the NOTICE file are for informational purposes only and
+          do not modify the License. You may add Your own attribution
+          notices within Derivative Works that You distribute, alongside
+          or as an addendum to the NOTICE text from the Work, provided
+          that such additional attribution notices cannot be construed
+          as modifying the License.
+
+      You may add Your own copyright statement to Your modifications and
+      may provide additional or different license terms and conditions
+      for use, reproduction, or distribution of Your modifications, or
+      for any such Derivative Works as a whole, provided Your use,
+      reproduction, and distribution of the Work otherwise complies with
+      the conditions stated in this License.
+
+   5. Submission of Contributions. Unless You explicitly state otherwise,
+      any Contribution intentionally submitted for inclusion in the Work
+      by You to the Licensor shall be under the terms and conditions of
+      this License, without any additional terms or conditions.
+      Notwithstanding the above, nothing herein shall supersede or modify
+      the terms of any separate license agreement you may have executed
+      with Licensor regarding such Contributions.
+
+   6. Trademarks. This License does not grant permission to use the trade
+      names, trademarks, service marks, or product names of the Licensor,
+      except as required for reasonable and customary use in describing the
+      origin of the Work and reproducing the content of the NOTICE file.
+
+   7. Disclaimer of Warranty. Unless required by applicable law or
+      agreed to in writing, Licensor provides the Work (and each
+      Contributor provides its Contributions) on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+      implied, including, without limitation, any warranties or conditions
+      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+      PARTICULAR PURPOSE. You are solely responsible for determining the
+      appropriateness of using or redistributing the Work and assume any
+      risks associated with Your exercise of permissions under this License.
+
+   8. Limitation of Liability. In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "{}"
+      replaced with your own identifying information. (Don't include
+      the brackets!)  The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright {}
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/LICENSE-BIN
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/LICENSE-BIN b/rocketmq-mysql/LICENSE-BIN
new file mode 100644
index 0000000..22b0aa4
--- /dev/null
+++ b/rocketmq-mysql/LICENSE-BIN
@@ -0,0 +1,301 @@
+Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+
+      "Contribution" shall mean any work of authorship, including
+      the original version of the Work and any modifications or additions
+      to that Work or Derivative Works thereof, that is intentionally
+      submitted to Licensor for inclusion in the Work by the copyright owner
+      or by an individual or Legal Entity authorized to submit on behalf of
+      the copyright owner. For the purposes of this definition, "submitted"
+      means any form of electronic, verbal, or written communication sent
+      to the Licensor or its representatives, including but not limited to
+      communication on electronic mailing lists, source code control systems,
+      and issue tracking systems that are managed by, or on behalf of, the
+      Licensor for the purpose of discussing and improving the Work, but
+      excluding communication that is conspicuously marked or otherwise
+      designated in writing by the copyright owner as "Not a Contribution."
+
+      "Contributor" shall mean Licensor and any individual or Legal Entity
+      on behalf of whom a Contribution has been received by Licensor and
+      subsequently incorporated within the Work.
+
+   2. Grant of Copyright License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      copyright license to reproduce, prepare Derivative Works of,
+      publicly display, publicly perform, sublicense, and distribute the
+      Work and such Derivative Works in Source or Object form.
+
+   3. Grant of Patent License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      (except as stated in this section) patent license to make, have made,
+      use, offer to sell, sell, import, and otherwise transfer the Work,
+      where such license applies only to those patent claims licensable
+      by such Contributor that are necessarily infringed by their
+      Contribution(s) alone or by combination of their Contribution(s)
+      with the Work to which such Contribution(s) was submitted. If You
+      institute patent litigation against any entity (including a
+      cross-claim or counterclaim in a lawsuit) alleging that the Work
+      or a Contribution incorporated within the Work constitutes direct
+      or contributory patent infringement, then any patent licenses
+      granted to You under this License for that Work shall terminate
+      as of the date such litigation is filed.
+
+   4. Redistribution. You may reproduce and distribute copies of the
+      Work or Derivative Works thereof in any medium, with or without
+      modifications, and in Source or Object form, provided that You
+      meet the following conditions:
+
+      (a) You must give any other recipients of the Work or
+          Derivative Works a copy of this License; and
+
+      (b) You must cause any modified files to carry prominent notices
+          stating that You changed the files; and
+
+      (c) You must retain, in the Source form of any Derivative Works
+          that You distribute, all copyright, patent, trademark, and
+          attribution notices from the Source form of the Work,
+          excluding those notices that do not pertain to any part of
+          the Derivative Works; and
+
+      (d) If the Work includes a "NOTICE" text file as part of its
+          distribution, then any Derivative Works that You distribute must
+          include a readable copy of the attribution notices contained
+          within such NOTICE file, excluding those notices that do not
+          pertain to any part of the Derivative Works, in at least one
+          of the following places: within a NOTICE text file distributed
+          as part of the Derivative Works; within the Source form or
+          documentation, if provided along with the Derivative Works; or,
+          within a display generated by the Derivative Works, if and
+          wherever such third-party notices normally appear. The contents
+          of the NOTICE file are for informational purposes only and
+          do not modify the License. You may add Your own attribution
+          notices within Derivative Works that You distribute, alongside
+          or as an addendum to the NOTICE text from the Work, provided
+          that such additional attribution notices cannot be construed
+          as modifying the License.
+
+      You may add Your own copyright statement to Your modifications and
+      may provide additional or different license terms and conditions
+      for use, reproduction, or distribution of Your modifications, or
+      for any such Derivative Works as a whole, provided Your use,
+      reproduction, and distribution of the Work otherwise complies with
+      the conditions stated in this License.
+
+   5. Submission of Contributions. Unless You explicitly state otherwise,
+      any Contribution intentionally submitted for inclusion in the Work
+      by You to the Licensor shall be under the terms and conditions of
+      this License, without any additional terms or conditions.
+      Notwithstanding the above, nothing herein shall supersede or modify
+      the terms of any separate license agreement you may have executed
+      with Licensor regarding such Contributions.
+
+   6. Trademarks. This License does not grant permission to use the trade
+      names, trademarks, service marks, or product names of the Licensor,
+      except as required for reasonable and customary use in describing the
+      origin of the Work and reproducing the content of the NOTICE file.
+
+   7. Disclaimer of Warranty. Unless required by applicable law or
+      agreed to in writing, Licensor provides the Work (and each
+      Contributor provides its Contributions) on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+      implied, including, without limitation, any warranties or conditions
+      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+      PARTICULAR PURPOSE. You are solely responsible for determining the
+      appropriateness of using or redistributing the Work and assume any
+      risks associated with Your exercise of permissions under this License.
+
+   8. Limitation of Liability. In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "{}"
+      replaced with your own identifying information. (Don't include
+      the brackets!)  The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright {}
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+
+
+------
+This product has a bundle logback, which is available under the EPL v1.0 License.
+The source code of logback can be found at https://github.com/qos-ch/logback.
+
+Logback LICENSE
+---------------
+
+Logback: the reliable, generic, fast and flexible logging framework.
+Copyright (C) 1999-2015, QOS.ch. All rights reserved.
+
+This program and the accompanying materials are dual-licensed under
+either the terms of the Eclipse Public License v1.0 as published by
+the Eclipse Foundation
+
+  or (per the licensee's choosing)
+
+under the terms of the GNU Lesser General Public License version 2.1
+as published by the Free Software Foundation.
+
+------
+This product has a bundle slf4j, which is available under the MIT License.
+The source code of slf4j can be found at https://github.com/qos-ch/slf4j.
+
+ Copyright (c) 2004-2017 QOS.ch
+ All rights reserved.
+
+ Permission is hereby granted, free  of charge, to any person obtaining
+ a  copy  of this  software  and  associated  documentation files  (the
+ "Software"), to  deal in  the Software without  restriction, including
+ without limitation  the rights to  use, copy, modify,  merge, publish,
+ distribute,  sublicense, and/or sell  copies of  the Software,  and to
+ permit persons to whom the Software  is furnished to do so, subject to
+ the following conditions:
+
+ The  above  copyright  notice  and  this permission  notice  shall  be
+ included in all copies or substantial portions of the Software.
+
+ THE  SOFTWARE IS  PROVIDED  "AS  IS", WITHOUT  WARRANTY  OF ANY  KIND,
+ EXPRESS OR  IMPLIED, INCLUDING  BUT NOT LIMITED  TO THE  WARRANTIES OF
+ MERCHANTABILITY,    FITNESS    FOR    A   PARTICULAR    PURPOSE    AND
+ NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
+ LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+ OF CONTRACT, TORT OR OTHERWISE,  ARISING FROM, OUT OF OR IN CONNECTION
+ WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+------
+This product has a bundle fastjson, which is available under the ASL2 License.
+The source code of fastjson can be found at https://github.com/alibaba/fastjson.
+
+ Copyright 1999-2017 Alibaba Group Holding Ltd.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+------
+ This product has a bundle druid, which is available under the ASL2 License.
+ The source code of druid can be found at https://github.com/alibaba/druid.
+
+  Copyright 1999-2017 Alibaba Group Holding Ltd.
+
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+
+------
+This product has a bundle commons-codec, which is available under the ASL2 License.
+The source code of commons-codec can be found at http://svn.apache.org/viewvc/commons/proper/codec/trunk/.
+
+Apache Commons Codec
+Copyright 2002-2016 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+src/test/org/apache/commons/codec/language/DoubleMetaphoneTest.java
+contains test data from http://aspell.net/test/orig/batch0.tab.
+Copyright (C) 2002 Kevin Atkinson (kevina@gnu.org)
+
+------
+This product has a bundle mysql-binlog-connector-java, which is available under the ASL2 License.
+The source code of mysql-binlog-connector-java can be found at https://github.com/shyiko/mysql-binlog-connector-java.

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/NOTICE
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/NOTICE b/rocketmq-mysql/NOTICE
new file mode 100644
index 0000000..5384857
--- /dev/null
+++ b/rocketmq-mysql/NOTICE
@@ -0,0 +1,5 @@
+Apache RocketMQ (incubating)
+Copyright 2016-2017 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/NOTICE-BIN
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/NOTICE-BIN b/rocketmq-mysql/NOTICE-BIN
new file mode 100644
index 0000000..5384857
--- /dev/null
+++ b/rocketmq-mysql/NOTICE-BIN
@@ -0,0 +1,5 @@
+Apache RocketMQ (incubating)
+Copyright 2016-2017 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/README.md
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/README.md b/rocketmq-mysql/README.md
new file mode 100644
index 0000000..65efb05
--- /dev/null
+++ b/rocketmq-mysql/README.md
@@ -0,0 +1,42 @@
+# RocketMQ-MySQL
+
+
+## Overview
+![overview](./doc/overview.png)
+
+The RocketMQ-MySQL is a data replicator between MySQL and other systems. The replicator simulates a MySQL slave instance, parses the binlog event 
+and sends it to RocketMQ in json format. Besides MySQL, other systems can also consume data from RocketMQ. With the RocketMQ-MySQL Replicator, more systems can easily process data from MySQL binlog at a very low cost.
+
+## Dataflow
+![dataflow](./doc/dataflow.png)
+
+* 1. Firstly, get the last data from the queue, and get the binlog position from this
+ data. If the data queue is empty, use the latest binlog position of MySQL. Besides that, customized setting of position of the wanted binlog is also supported.
+* 2. Send a binlog dump request to MySQL.
+* 3. MySQL pushes binlog event to the replicator. The replicator parses the data and accumulates it as a transaction-object.
+* 4. Add the next-position of the transaction to the transaction-object and send it in json format to the queue.
+* 5. Record the binlog position and the offset in the queue of the latest transaction every second.
+
+
+## Quick Start
+
+* 1. Create an account with MySQL replication permission, which is used to simulate the MySQL slave to get the binlog event, and the replication must be in row mode.
+* 2. Create a topic in the RocketMQ to store binlog events to ensure that the downstream system consumes the data in order. Make sure the topic must have only one queue.
+* 3. Configure the relevant information of MySQL and RocketMQ in the RocketMQ-MySQL.conf file.
+* 4. Execute "mvn install", then start the replicator (via execute "nohup ./start.sh &").
+* 5. Subscribe and process the messages.
+
+
+## Configuration Instruction
+|key               |nullable|default    |description|
+|------------------|--------|-----------|-----------|
+|mysqlAddr         |false   |           |MySQL address|
+|mysqlPort         |false   |           |MySQL port|
+|mysqlUsername     |false   |           |username of MySQL account|
+|mysqlPassword     |false   |           |password of MySQL account|
+|mqNamesrvAddr     |false   |           |RocketMQ name server address (e.g.,127.0.0.1:9876)|
+|mqTopic           |false   |           |RocketMQ topic name|
+|startType         |true    |DEFAULT    |The way that the replicator starts processing data,there are four options available:<br>- DEFAULT: try to start processing data in the "LAST_PROCESSED" way,if failed, then in the "NEW_EVENT" way<br>- LAST_PROCESSED: starts processing data from the last processed event<br>- NEW_EVENT: starts processing data from the tail of binlog<br>- SPECIFIED: starts processing data from the position that user specified,if you choose this option,the binlogFilename and nextPosition must not be null|
+|binlogFilename    |true    |           |If "startType" is "SPECIFIED",the replicator will begin to replicate from this binlog file|
+|nextPosition      |true    |           |If "startType" is "SPECIFIED",the replicator will begin to replicate from this position|
+|maxTransactionRows|true    |100        |max rows of the transaction pushed to RocketMQ|
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/doc/dataflow.png
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/doc/dataflow.png b/rocketmq-mysql/doc/dataflow.png
new file mode 100644
index 0000000..ed12b52
Binary files /dev/null and b/rocketmq-mysql/doc/dataflow.png differ

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/doc/overview.png
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/doc/overview.png b/rocketmq-mysql/doc/overview.png
new file mode 100644
index 0000000..0a3ec82
Binary files /dev/null and b/rocketmq-mysql/doc/overview.png differ

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/pom.xml
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/pom.xml b/rocketmq-mysql/pom.xml
new file mode 100644
index 0000000..23e7468
--- /dev/null
+++ b/rocketmq-mysql/pom.xml
@@ -0,0 +1,275 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>org.apache</groupId>
+    <artifactId>rocketmq-mysql-replicator</artifactId>
+    <version>1.1.0</version>
+
+    <scm>
+        <url>https://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals.git</url>
+        <connection>scm:git:https://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals.git</connection>
+        <developerConnection>scm:git:https://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals.git
+        </developerConnection>
+        <tag>HEAD</tag>
+    </scm>
+
+    <mailingLists>
+        <mailingList>
+            <name>Development List</name>
+            <subscribe>dev-subscribe@rocketmq.incubator.apache.org</subscribe>
+            <unsubscribe>dev-unsubscribe@rocketmq.incubator.apache.org</unsubscribe>
+            <post>dev@rocketmq.incubator.apache.org</post>
+        </mailingList>
+        <mailingList>
+            <name>User List</name>
+            <subscribe>users-subscribe@rocketmq.incubator.apache.org</subscribe>
+            <unsubscribe>users-unsubscribe@rocketmq.incubator.apache.org</unsubscribe>
+            <post>users@rocketmq.incubator.apache.org</post>
+        </mailingList>
+        <mailingList>
+            <name>Commits List</name>
+            <subscribe>commits-subscribe@rocketmq.incubator.apache.org</subscribe>
+            <unsubscribe>commits-unsubscribe@rocketmq.incubator.apache.org</unsubscribe>
+            <post>commits@rocketmq.incubator.apache.org</post>
+        </mailingList>
+    </mailingLists>
+
+    <developers>
+        <developer>
+            <id>Apache RocketMQ</id>
+            <name>Apache RocketMQ of ASF</name>
+            <url>https://rocketmq.apache.org/</url>
+        </developer>
+    </developers>
+
+    <licenses>
+        <license>
+            <name>Apache License, Version 2.0</name>
+            <url>http://www.apache.org/licenses/LICENSE-2.0</url>
+            <distribution>repo</distribution>
+        </license>
+    </licenses>
+
+    <organization>
+        <name>Apache Software Foundation</name>
+        <url>http://www.apache.org</url>
+    </organization>
+
+    <issueManagement>
+        <system>jira</system>
+        <url>https://issues.apache.org/jira/browse/RocketMQ</url>
+    </issueManagement>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+
+        <maven.test.skip>false</maven.test.skip>
+        <maven.javadoc.skip>true</maven.javadoc.skip>
+        <maven.compiler.source>1.7</maven.compiler.source>
+        <maven.compiler.target>1.7</maven.compiler.target>
+        <rocketmq.version>4.0.0-incubating</rocketmq.version>
+    </properties>
+
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-client</artifactId>
+            <version>${rocketmq.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.github.shyiko</groupId>
+            <artifactId>mysql-binlog-connector-java</artifactId>
+            <version>0.12.1</version>
+        </dependency>
+        <dependency>
+            <groupId>mysql</groupId>
+            <artifactId>mysql-connector-java</artifactId>
+            <version>6.0.6</version>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <version>1.7.5</version>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-classic</artifactId>
+            <version>1.0.13</version>
+        </dependency>
+        <dependency>
+            <groupId>com.alibaba</groupId>
+            <artifactId>druid</artifactId>
+            <version>1.0.31</version>
+        </dependency>
+        <dependency>
+            <groupId>commons-codec</groupId>
+            <artifactId>commons-codec</artifactId>
+            <version>1.9</version>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.11</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <finalName>rocketmq-mysql</finalName>
+        <sourceDirectory>${project.basedir}/src/main/java</sourceDirectory>
+        <outputDirectory>${project.basedir}/target/classes</outputDirectory>
+        <resources>
+            <resource>
+                <directory>${project.basedir}/src/main/resources</directory>
+                <filtering>true</filtering>
+            </resource>
+        </resources>
+        <plugins>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>clirr-maven-plugin</artifactId>
+                <version>2.8</version>
+            </plugin>
+            <plugin>
+                <artifactId>maven-enforcer-plugin</artifactId>
+                <version>1.4.1</version>
+                <executions>
+                    <execution>
+                        <id>enforce-ban-circular-dependencies</id>
+                        <goals>
+                            <goal>enforce</goal>
+                        </goals>
+                    </execution>
+                </executions>
+                <configuration>
+                    <rules>
+                        <banCircularDependencies/>
+                    </rules>
+                    <fail>true</fail>
+                </configuration>
+                <dependencies>
+                    <dependency>
+                        <groupId>org.codehaus.mojo</groupId>
+                        <artifactId>extra-enforcer-rules</artifactId>
+                        <version>1.0-beta-6</version>
+                    </dependency>
+                </dependencies>
+            </plugin>
+            <plugin>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.5.1</version>
+                <configuration>
+                    <source>${maven.compiler.source}</source>
+                    <target>${maven.compiler.target}</target>
+                    <compilerVersion>${maven.compiler.source}</compilerVersion>
+                    <showDeprecation>true</showDeprecation>
+                    <showWarnings>true</showWarnings>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-javadoc-plugin</artifactId>
+                <version>2.10.4</version>
+                <configuration>
+                    <charset>UTF-8</charset>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>attach-javadocs</id>
+                        <goals>
+                            <goal>jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <artifactId>maven-source-plugin</artifactId>
+                <version>3.0.1</version>
+                <executions>
+                    <execution>
+                        <id>attach-sources</id>
+                        <goals>
+                            <goal>jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>findbugs-maven-plugin</artifactId>
+                <version>3.0.4</version>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.rat</groupId>
+                <artifactId>apache-rat-plugin</artifactId>
+                <version>0.12</version>
+                <configuration>
+                    <excludes>
+                        <exclude>README.md</exclude>
+                    </excludes>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-checkstyle-plugin</artifactId>
+                <version>2.17</version>
+                <executions>
+                    <execution>
+                        <id>verify</id>
+                        <phase>verify</phase>
+                        <configuration>
+                            <configLocation>style/rmq_checkstyle.xml</configLocation>
+                            <encoding>UTF-8</encoding>
+                            <consoleOutput>true</consoleOutput>
+                            <failsOnError>true</failsOnError>
+                            <includeTestSourceDirectory>false</includeTestSourceDirectory>
+                            <includeTestResources>false</includeTestResources>
+                        </configuration>
+                        <goals>
+                            <goal>check</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+
+            <plugin>
+                <artifactId>maven-jar-plugin</artifactId>
+                <version>2.4</version>
+                <configuration>
+                    <archive>
+                        <addMavenDescriptor>false</addMavenDescriptor>
+                        <manifest>
+                            <addClasspath>true</addClasspath>
+                            <classpathPrefix>lib/</classpathPrefix>
+                        </manifest>
+                    </archive>
+                    <excludes>
+                        <exclude>rocketmq_mysql.conf</exclude>
+                        <exclude>logback.xml</exclude>
+                    </excludes>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <version>2.4</version>
+                <configuration>
+                    <descriptors>
+                        <descriptor>src/main/assembly/assembly.xml</descriptor>
+                    </descriptors>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>make-assembly</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/src/main/assembly/assembly.xml
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/src/main/assembly/assembly.xml b/rocketmq-mysql/src/main/assembly/assembly.xml
new file mode 100644
index 0000000..b280aa6
--- /dev/null
+++ b/rocketmq-mysql/src/main/assembly/assembly.xml
@@ -0,0 +1,61 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<assembly
+	xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
+	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
+	<id>pack</id>
+	<formats>
+		<format>tar.gz</format>
+		<format>dir</format>
+	</formats>
+	<includeBaseDirectory>false</includeBaseDirectory>
+	<dependencySets>
+		<dependencySet>
+			<useProjectArtifact>true</useProjectArtifact>
+			<outputDirectory>lib</outputDirectory>
+		</dependencySet>
+	</dependencySets>
+	<fileSets>
+		<fileSet>
+			<directory>src/main/assembly/scripts</directory>
+			<outputDirectory>bin</outputDirectory>
+			<fileMode>0755</fileMode>
+		</fileSet>
+		<fileSet>
+			<directory>target/classes</directory>
+			<outputDirectory>conf</outputDirectory>
+			<fileMode>0755</fileMode>
+			<includes>
+				<include>*.conf</include>
+				<include>logback.xml</include>
+			</includes>
+		</fileSet>
+	</fileSets>
+
+	<files>
+		<file>
+			<source>LICENSE-BIN</source>
+			<destName>LICENSE</destName>
+		</file>
+		<file>
+			<source>NOTICE-BIN</source>
+			<destName>NOTICE</destName>
+		</file>
+	</files>
+</assembly>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/src/main/assembly/scripts/start.sh
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/src/main/assembly/scripts/start.sh b/rocketmq-mysql/src/main/assembly/scripts/start.sh
new file mode 100644
index 0000000..e159f36
--- /dev/null
+++ b/rocketmq-mysql/src/main/assembly/scripts/start.sh
@@ -0,0 +1,23 @@
+#!/usr/bin/env bash
+
+binPath=$(cd "$(dirname "$0")"; pwd);
+cd $binPath
+cd ..
+parentPath=`pwd`
+libPath=$parentPath/lib/
+
+
+function exportClassPath(){
+    jarFileList=`find "$libPath" -name *.jar |awk -F'/' '{print $(NF)}' 2>>/dev/null`
+    CLASSPATH=".:$binPath";
+    for jarItem in $jarFileList
+      do
+        CLASSPATH="$CLASSPATH:$libPath$jarItem"
+    done
+    CLASSPATH=$CLASSPATH:./conf
+    export CLASSPATH
+}
+ulimit -n 65535
+exportClassPath
+
+java -server -Xms512m -Xmx512m -Xss2m -XX:NewRatio=2  -XX:+UseGCOverheadLimit -XX:-UseParallelGC -XX:ParallelGCThreads=24 org.apache.rocketmq.mysql.Replicator

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/src/main/assembly/scripts/stop.sh
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/src/main/assembly/scripts/stop.sh b/rocketmq-mysql/src/main/assembly/scripts/stop.sh
new file mode 100755
index 0000000..f0e3c0d
--- /dev/null
+++ b/rocketmq-mysql/src/main/assembly/scripts/stop.sh
@@ -0,0 +1,18 @@
+#!/bin/bash
+
+PROGRAM_NAME="org.apache.rocketmq.mysql.Replicator"
+PIDS=`ps -ef | grep $PROGRAM_NAME | grep -v "grep" | awk '{print $2}'`
+
+if [ -z $PIDS ]; then
+    echo "No this process."
+else
+    echo "Find process is $PIDS."
+fi
+
+#####kill####
+echo -e "Stopping the $PROGRAM_NAME...\c"
+for PID in $PIDS ; do
+    kill  $PID
+done
+
+echo "SUCCESS!"

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/Config.java
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/Config.java b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/Config.java
new file mode 100644
index 0000000..6c14cb4
--- /dev/null
+++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/Config.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mysql;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.reflect.Method;
+import java.util.Properties;
+
+
+public class Config {
+
+    public String mysqlAddr;
+    public Integer mysqlPort;
+    public String mysqlUsername;
+    public String mysqlPassword;
+
+    public String mqNamesrvAddr;
+    public String mqTopic;
+
+    public String startType = "DEFAULT";
+    public String binlogFilename;
+    public Long nextPosition;
+    public Integer maxTransactionRows = 100;
+
+    public void load() throws IOException {
+
+        InputStream in = Config.class.getClassLoader().getResourceAsStream("rocketmq_mysql.conf");
+        Properties properties = new Properties();
+        properties.load(in);
+
+        properties2Object(properties, this);
+
+    }
+
+    private void properties2Object(final Properties p, final Object object) {
+        Method[] methods = object.getClass().getMethods();
+        for (Method method : methods) {
+            String mn = method.getName();
+            if (mn.startsWith("set")) {
+                try {
+                    String tmp = mn.substring(4);
+                    String first = mn.substring(3, 4);
+
+                    String key = first.toLowerCase() + tmp;
+                    String property = p.getProperty(key);
+                    if (property != null) {
+                        Class<?>[] pt = method.getParameterTypes();
+                        if (pt != null && pt.length > 0) {
+                            String cn = pt[0].getSimpleName();
+                            Object arg;
+                            if (cn.equals("int") || cn.equals("Integer")) {
+                                arg = Integer.parseInt(property);
+                            } else if (cn.equals("long") || cn.equals("Long")) {
+                                arg = Long.parseLong(property);
+                            } else if (cn.equals("double") || cn.equals("Double")) {
+                                arg = Double.parseDouble(property);
+                            } else if (cn.equals("boolean") || cn.equals("Boolean")) {
+                                arg = Boolean.parseBoolean(property);
+                            } else if (cn.equals("float") || cn.equals("Float")) {
+                                arg = Float.parseFloat(property);
+                            } else if (cn.equals("String")) {
+                                arg = property;
+                            } else {
+                                continue;
+                            }
+                            method.invoke(object, arg);
+                        }
+                    }
+                } catch (Throwable ignored) {
+                }
+            }
+        }
+    }
+
+    public void setMysqlAddr(String mysqlAddr) {
+        this.mysqlAddr = mysqlAddr;
+    }
+
+    public void setMysqlPort(Integer mysqlPort) {
+        this.mysqlPort = mysqlPort;
+    }
+
+    public void setMysqlUsername(String mysqlUsername) {
+        this.mysqlUsername = mysqlUsername;
+    }
+
+    public void setMysqlPassword(String mysqlPassword) {
+        this.mysqlPassword = mysqlPassword;
+    }
+
+    public void setBinlogFilename(String binlogFilename) {
+        this.binlogFilename = binlogFilename;
+    }
+
+    public void setNextPosition(Long nextPosition) {
+        this.nextPosition = nextPosition;
+    }
+
+    public void setMaxTransactionRows(Integer maxTransactionRows) {
+        this.maxTransactionRows = maxTransactionRows;
+    }
+
+    public void setMqNamesrvAddr(String mqNamesrvAddr) {
+        this.mqNamesrvAddr = mqNamesrvAddr;
+    }
+
+    public void setMqTopic(String mqTopic) {
+        this.mqTopic = mqTopic;
+    }
+
+    public void setStartType(String startType) {
+        this.startType = startType;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/Replicator.java
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/Replicator.java b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/Replicator.java
new file mode 100644
index 0000000..ae3c984
--- /dev/null
+++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/Replicator.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mysql;
+
+import org.apache.rocketmq.mysql.binlog.EventProcessor;
+import org.apache.rocketmq.mysql.binlog.Transaction;
+import org.apache.rocketmq.mysql.position.BinlogPositionLogThread;
+import org.apache.rocketmq.mysql.productor.RocketMQProducer;
+import org.apache.rocketmq.mysql.position.BinlogPosition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class Replicator {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(Replicator.class);
+
+    private static final Logger POSITION_LOGGER = LoggerFactory.getLogger("PositionLogger");
+
+    private Config config;
+
+    private EventProcessor eventProcessor;
+
+    private RocketMQProducer rocketMQProducer;
+
+    private Object lock = new Object();
+    private BinlogPosition nextBinlogPosition;
+    private long nextQueueOffset;
+    private long xid;
+
+    public static void main(String[] args) {
+
+        Replicator replicator = new Replicator();
+        replicator.start();
+    }
+
+    public void start() {
+
+        try {
+            config = new Config();
+            config.load();
+
+            rocketMQProducer = new RocketMQProducer(config);
+            rocketMQProducer.start();
+
+            BinlogPositionLogThread binlogPositionLogThread = new BinlogPositionLogThread(this);
+            binlogPositionLogThread.start();
+
+            eventProcessor = new EventProcessor(this);
+            eventProcessor.start();
+
+        } catch (Exception e) {
+            LOGGER.error("Start error.", e);
+            System.exit(1);
+        }
+    }
+
+    public void commit(Transaction transaction, boolean isComplete) {
+
+        String json = transaction.toJson();
+
+        for (int i = 0; i < 3; i++) {
+            try {
+                if (isComplete) {
+                    long offset = rocketMQProducer.push(json);
+
+                    synchronized (lock) {
+                        xid = transaction.getXid();
+                        nextBinlogPosition = transaction.getNextBinlogPosition();
+                        nextQueueOffset = offset;
+                    }
+
+                } else {
+                    rocketMQProducer.push(json);
+                }
+                break;
+
+            } catch (Exception e) {
+                LOGGER.error("Push error,retry:" + (i + 1) + ",", e);
+            }
+        }
+    }
+
+    public void logPosition() {
+
+        String binlogFilename = null;
+        long xid = 0L;
+        long nextPosition = 0L;
+        long nextOffset = 0L;
+
+        synchronized (lock) {
+            if (nextBinlogPosition != null) {
+                xid = this.xid;
+                binlogFilename = nextBinlogPosition.getBinlogFilename();
+                nextPosition = nextBinlogPosition.getPosition();
+                nextOffset = nextQueueOffset;
+            }
+        }
+
+        if (binlogFilename != null) {
+            POSITION_LOGGER.info("XID: {},   BINLOG_FILE: {},   NEXT_POSITION: {},   NEXT_OFFSET: {}",
+                xid, binlogFilename, nextPosition, nextOffset);
+        }
+
+    }
+
+    public Config getConfig() {
+        return config;
+    }
+
+    public BinlogPosition getNextBinlogPosition() {
+        return nextBinlogPosition;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/DataRow.java
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/DataRow.java b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/DataRow.java
new file mode 100644
index 0000000..646c018
--- /dev/null
+++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/DataRow.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mysql.binlog;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.rocketmq.mysql.schema.Table;
+import org.apache.rocketmq.mysql.schema.column.ColumnParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DataRow {
+    private Logger logger = LoggerFactory.getLogger(DataRow.class);
+
+    private String type;
+    private Table table;
+    private Serializable[] row;
+
+    public DataRow(String type, Table table, Serializable[] row) {
+        this.type = type;
+        this.table = table;
+        this.row = row;
+    }
+
+    public Map toMap() {
+
+        try {
+            if (table.getColList().size() == row.length) {
+                Map<String, Object> dataMap = new HashMap<>();
+                List<String> keyList = table.getColList();
+                List<ColumnParser> parserList = table.getParserList();
+
+                for (int i = 0; i < keyList.size(); i++) {
+                    Object value = row[i];
+                    ColumnParser parser = parserList.get(i);
+                    dataMap.put(keyList.get(i), parser.getValue(value));
+                }
+
+                Map<String, Object> map = new HashMap<>();
+                map.put("database", table.getDatabase());
+                map.put("table", table.getName());
+                map.put("type", type);
+                map.put("data", dataMap);
+
+                return map;
+            } else {
+                logger.error("Table schema changed,discard data: {} - {}, {}  {}",
+                    table.getDatabase().toUpperCase(), table.getName().toUpperCase(), type, row.toString());
+
+                return null;
+            }
+        } catch (Exception e) {
+            logger.error("Row parse error,discard data: {} - {}, {}  {}",
+                table.getDatabase().toUpperCase(), table.getName().toUpperCase(), type, row.toString());
+        }
+
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/EventListener.java
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/EventListener.java b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/EventListener.java
new file mode 100644
index 0000000..b5005bc
--- /dev/null
+++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/EventListener.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mysql.binlog;
+
+import com.github.shyiko.mysql.binlog.BinaryLogClient;
+import com.github.shyiko.mysql.binlog.event.Event;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+public class EventListener implements BinaryLogClient.EventListener, BinaryLogClient.LifecycleListener {
+
+    private BlockingQueue<Event> queue;
+
+    public EventListener(BlockingQueue<Event> queue) {
+        this.queue = queue;
+    }
+
+    @Override
+    public void onEvent(Event event) {
+        try {
+            while (true) {
+                if (queue.offer(event, 100, TimeUnit.MILLISECONDS)) {
+                    return;
+                }
+            }
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+    }
+
+    @Override
+    public void onConnect(BinaryLogClient client) {
+
+    }
+
+    @Override
+    public void onCommunicationFailure(BinaryLogClient client, Exception e) {
+
+    }
+
+    @Override
+    public void onEventDeserializationFailure(BinaryLogClient client, Exception e) {
+
+    }
+
+    @Override
+    public void onDisconnect(BinaryLogClient client) {
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/EventProcessor.java
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/EventProcessor.java b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/EventProcessor.java
new file mode 100644
index 0000000..a730403
--- /dev/null
+++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/EventProcessor.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mysql.binlog;
+
+import com.alibaba.druid.pool.DruidDataSourceFactory;
+import com.github.shyiko.mysql.binlog.BinaryLogClient;
+import com.github.shyiko.mysql.binlog.event.DeleteRowsEventData;
+import com.github.shyiko.mysql.binlog.event.Event;
+import com.github.shyiko.mysql.binlog.event.EventHeaderV4;
+import com.github.shyiko.mysql.binlog.event.QueryEventData;
+import com.github.shyiko.mysql.binlog.event.TableMapEventData;
+import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData;
+import com.github.shyiko.mysql.binlog.event.WriteRowsEventData;
+import com.github.shyiko.mysql.binlog.event.XidEventData;
+import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+import javax.sql.DataSource;
+import org.apache.rocketmq.mysql.Config;
+import org.apache.rocketmq.mysql.Replicator;
+import org.apache.rocketmq.mysql.position.BinlogPosition;
+import org.apache.rocketmq.mysql.position.BinlogPositionManager;
+import org.apache.rocketmq.mysql.schema.Schema;
+import org.apache.rocketmq.mysql.schema.Table;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class EventProcessor {
+    private static final Logger LOGGER = LoggerFactory.getLogger(EventProcessor.class);
+
+    private Replicator replicator;
+    private Config config;
+
+    private DataSource dataSource;
+
+    private BinlogPositionManager binlogPositionManager;
+
+    private BlockingQueue<Event> queue = new LinkedBlockingQueue<>(100);
+
+    private BinaryLogClient binaryLogClient;
+
+    private EventListener eventListener;
+
+    private Schema schema;
+
+    private Map<Long, Table> tableMap = new HashMap<>();
+
+    private Transaction transaction;
+
+    public EventProcessor(Replicator replicator) {
+
+        this.replicator = replicator;
+        this.config = replicator.getConfig();
+    }
+
+    public void start() throws Exception {
+
+        initDataSource();
+
+        binlogPositionManager = new BinlogPositionManager(config, dataSource);
+        binlogPositionManager.initBeginPosition();
+
+        schema = new Schema(dataSource);
+        schema.load();
+
+        eventListener = new EventListener(queue);
+        binaryLogClient = new BinaryLogClient(config.mysqlAddr,
+            config.mysqlPort,
+            config.mysqlUsername,
+            config.mysqlPassword);
+        binaryLogClient.setBlocking(true);
+        binaryLogClient.setServerId(1001);
+
+        EventDeserializer eventDeserializer = new EventDeserializer();
+        eventDeserializer.setCompatibilityMode(EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG,
+            EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY);
+        binaryLogClient.setEventDeserializer(eventDeserializer);
+        binaryLogClient.registerEventListener(eventListener);
+        binaryLogClient.setBinlogFilename(binlogPositionManager.getBinlogFilename());
+        binaryLogClient.setBinlogPosition(binlogPositionManager.getPosition());
+
+        binaryLogClient.connect(3000);
+
+        LOGGER.info("Started.");
+
+        doProcess();
+    }
+
+    private void doProcess() {
+
+        while (true) {
+
+            try {
+                Event event = queue.poll(1000, TimeUnit.MILLISECONDS);
+                if (event == null) {
+                    checkConnection();
+                    continue;
+                }
+
+                switch (event.getHeader().getEventType()) {
+                    case TABLE_MAP:
+                        processTableMapEvent(event);
+                        break;
+
+                    case WRITE_ROWS:
+                    case EXT_WRITE_ROWS:
+                        processWriteEvent(event);
+                        break;
+
+                    case UPDATE_ROWS:
+                    case EXT_UPDATE_ROWS:
+                        processUpdateEvent(event);
+                        break;
+
+                    case DELETE_ROWS:
+                    case EXT_DELETE_ROWS:
+                        processDeleteEvent(event);
+                        break;
+
+                    case QUERY:
+                        processQueryEvent(event);
+                        break;
+
+                    case XID:
+                        processXidEvent(event);
+                        break;
+
+                }
+            } catch (Exception e) {
+                LOGGER.error("Binlog process error.", e);
+            }
+
+        }
+    }
+
+    private void checkConnection() throws Exception {
+
+        if (!binaryLogClient.isConnected()) {
+            BinlogPosition binlogPosition = replicator.getNextBinlogPosition();
+            if (binlogPosition != null) {
+                binaryLogClient.setBinlogFilename(binlogPosition.getBinlogFilename());
+                binaryLogClient.setBinlogPosition(binlogPosition.getPosition());
+            }
+
+            binaryLogClient.connect(3000);
+        }
+    }
+
+    private void processTableMapEvent(Event event) {
+        TableMapEventData data = event.getData();
+        String dbName = data.getDatabase();
+        String tableName = data.getTable();
+        Long tableId = data.getTableId();
+
+        Table table = schema.getTable(dbName, tableName);
+
+        tableMap.put(tableId, table);
+    }
+
+    private void processWriteEvent(Event event) {
+        WriteRowsEventData data = event.getData();
+        Long tableId = data.getTableId();
+        List<Serializable[]> list = data.getRows();
+
+        for (Serializable[] row : list) {
+            addRow("WRITE", tableId, row);
+        }
+    }
+
+    private void processUpdateEvent(Event event) {
+        UpdateRowsEventData data = event.getData();
+        Long tableId = data.getTableId();
+        List<Map.Entry<Serializable[], Serializable[]>> list = data.getRows();
+
+        for (Map.Entry<Serializable[], Serializable[]> entry : list) {
+            addRow("UPDATE", tableId, entry.getValue());
+        }
+    }
+
+    private void processDeleteEvent(Event event) {
+        DeleteRowsEventData data = event.getData();
+        Long tableId = data.getTableId();
+        List<Serializable[]> list = data.getRows();
+
+        for (Serializable[] row : list) {
+            addRow("DELETE", tableId, row);
+        }
+
+    }
+
+    private static Pattern createTablePattern =
+        Pattern.compile("^(CREATE|ALTER)\\s+TABLE", Pattern.CASE_INSENSITIVE);
+
+    private void processQueryEvent(Event event) {
+        QueryEventData data = event.getData();
+        String sql = data.getSql();
+
+        if (createTablePattern.matcher(sql).find()) {
+            schema.reset();
+        }
+    }
+
+    private void processXidEvent(Event event) {
+        EventHeaderV4 header = event.getHeader();
+        XidEventData data = event.getData();
+
+        String binlogFilename = binaryLogClient.getBinlogFilename();
+        Long position = header.getNextPosition();
+        Long xid = data.getXid();
+
+        BinlogPosition binlogPosition = new BinlogPosition(binlogFilename, position);
+        transaction.setNextBinlogPosition(binlogPosition);
+        transaction.setXid(xid);
+
+        replicator.commit(transaction, true);
+
+        transaction = new Transaction(config);
+    }
+
+    private void addRow(String type, Long tableId, Serializable[] row) {
+
+        if (transaction == null) {
+            transaction = new Transaction(config);
+        }
+
+        Table t = tableMap.get(tableId);
+        if (t != null) {
+
+            while (true) {
+                if (transaction.addRow(type, t, row)) {
+                    break;
+
+                } else {
+                    transaction.setNextBinlogPosition(replicator.getNextBinlogPosition());
+                    replicator.commit(transaction, false);
+                    transaction = new Transaction(config);
+                }
+            }
+
+        }
+    }
+
+    private void initDataSource() throws Exception {
+        Map<String, String> map = new HashMap<>();
+        map.put("driverClassName", "com.mysql.jdbc.Driver");
+        map.put("url", "jdbc:mysql://" + config.mysqlAddr + ":" + config.mysqlPort + "?useSSL=true&verifyServerCertificate=false");
+        map.put("username", config.mysqlUsername);
+        map.put("password", config.mysqlPassword);
+        map.put("initialSize", "2");
+        map.put("maxActive", "2");
+        map.put("maxWait", "60000");
+        map.put("timeBetweenEvictionRunsMillis", "60000");
+        map.put("minEvictableIdleTimeMillis", "300000");
+        map.put("validationQuery", "SELECT 1 FROM DUAL");
+        map.put("testWhileIdle", "true");
+
+        dataSource = DruidDataSourceFactory.createDataSource(map);
+    }
+
+    public Config getConfig() {
+        return config;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/Transaction.java
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/Transaction.java b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/Transaction.java
new file mode 100644
index 0000000..396815a
--- /dev/null
+++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/Transaction.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mysql.binlog;
+
+import com.alibaba.fastjson.JSONObject;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import org.apache.rocketmq.mysql.Config;
+import org.apache.rocketmq.mysql.position.BinlogPosition;
+import org.apache.rocketmq.mysql.schema.Table;
+
+public class Transaction {
+    private BinlogPosition nextBinlogPosition;
+    private Long xid;
+
+    private Config config;
+
+    private List<DataRow> list = new LinkedList<>();
+
+    public Transaction(Config config) {
+        this.config = config;
+    }
+
+    public boolean addRow(String type, Table table, Serializable[] row) {
+
+        if (list.size() == config.maxTransactionRows) {
+            return false;
+        } else {
+            DataRow dataRow = new DataRow(type, table, row);
+            list.add(dataRow);
+            return true;
+        }
+
+    }
+
+    public String toJson() {
+
+        List<Map> rows = new LinkedList<>();
+        for (DataRow dataRow : list) {
+            Map rowMap = dataRow.toMap();
+            if (rowMap != null) {
+                rows.add(rowMap);
+            }
+        }
+
+        Map<String, Object> map = new HashMap<>();
+        map.put("xid", xid);
+        map.put("binlogFilename", nextBinlogPosition.getBinlogFilename());
+        map.put("nextPosition", nextBinlogPosition.getPosition());
+        map.put("rows", rows);
+
+        return JSONObject.toJSONString(map);
+    }
+
+    public BinlogPosition getNextBinlogPosition() {
+        return nextBinlogPosition;
+    }
+
+    public void setNextBinlogPosition(BinlogPosition nextBinlogPosition) {
+        this.nextBinlogPosition = nextBinlogPosition;
+    }
+
+    public void setXid(Long xid) {
+        this.xid = xid;
+    }
+
+    public Long getXid() {
+        return xid;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPosition.java
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPosition.java b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPosition.java
new file mode 100644
index 0000000..5ba436c
--- /dev/null
+++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPosition.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mysql.position;
+
+public class BinlogPosition {
+
+    private String binlogFilename;
+    private Long position;
+
+    public BinlogPosition(String binlogFilename, Long position) {
+        this.binlogFilename = binlogFilename;
+        this.position = position;
+    }
+
+    public String getBinlogFilename() {
+        return binlogFilename;
+    }
+
+    public void setBinlogFilename(String binlogFilename) {
+        this.binlogFilename = binlogFilename;
+    }
+
+    public Long getPosition() {
+        return position;
+    }
+
+    public void setPosition(Long position) {
+        this.position = position;
+    }
+}
+
+

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPositionLogThread.java
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPositionLogThread.java b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPositionLogThread.java
new file mode 100644
index 0000000..dedb08f
--- /dev/null
+++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPositionLogThread.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mysql.position;
+
+import org.apache.rocketmq.mysql.Replicator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BinlogPositionLogThread extends Thread {
+    private Logger logger = LoggerFactory.getLogger(BinlogPositionLogThread.class);
+
+    private Replicator replicator;
+
+    public BinlogPositionLogThread(Replicator replicator) {
+        this.replicator = replicator;
+        setDaemon(true);
+    }
+
+    @Override
+    public void run() {
+
+        while (true) {
+            try {
+                Thread.sleep(1000);
+            } catch (InterruptedException e) {
+                logger.error("Offset thread interrupted.", e);
+            }
+
+            replicator.logPosition();
+        }
+    }
+}