You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2017/08/22 14:04:34 UTC

incubator-rocketmq-externals git commit: Prepare release mysql replicator 1.1.0 version

Repository: incubator-rocketmq-externals
Updated Branches:
  refs/heads/master fdf7fc4a3 -> 343ab198e


Prepare release mysql replicator 1.1.0 version

Closes #26 from zhaoqun911/mysql-develop.


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/commit/343ab198
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/tree/343ab198
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/diff/343ab198

Branch: refs/heads/master
Commit: 343ab198eb7987e50588d8209ded6a4db5e9de83
Parents: fdf7fc4
Author: zhaoqun007 <91...@zhaoqun911.cn>
Authored: Mon Aug 14 19:54:09 2017 +0800
Committer: yukon <yu...@apache.org>
Committed: Tue Aug 22 22:03:21 2017 +0800

----------------------------------------------------------------------
 rocketmq-mysql/LICENSE-BIN                      |   4 +-
 rocketmq-mysql/pom.xml                          |   8 +-
 .../apache/rocketmq/mysql/binlog/DataRow.java   |  12 +-
 .../rocketmq/mysql/binlog/EventListener.java    |  33 +++-
 .../rocketmq/mysql/binlog/EventProcessor.java   | 196 ++++++++-----------
 .../rocketmq/mysql/binlog/Transaction.java      |  10 +-
 .../mysql/position/BinlogPositionManager.java   |  11 +-
 .../schema/column/DateTimeColumnParser.java     |  15 +-
 8 files changed, 144 insertions(+), 145 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/343ab198/rocketmq-mysql/LICENSE-BIN
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/LICENSE-BIN b/rocketmq-mysql/LICENSE-BIN
index 5349fbd..22b0aa4 100644
--- a/rocketmq-mysql/LICENSE-BIN
+++ b/rocketmq-mysql/LICENSE-BIN
@@ -297,5 +297,5 @@ contains test data from http://aspell.net/test/orig/batch0.tab.
 Copyright (C) 2002 Kevin Atkinson (kevina@gnu.org)
 
 ------
-This product has a bundle open-replicator, which is available under the ASL2 License.
-The source code of open-replicator can be found at https://github.com/whitesock/open-replicator.
+This product has a bundle mysql-binlog-connector-java, which is available under the ASL2 License.
+The source code of mysql-binlog-connector-java can be found at https://github.com/shyiko/mysql-binlog-connector-java.

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/343ab198/rocketmq-mysql/pom.xml
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/pom.xml b/rocketmq-mysql/pom.xml
index c09826c..23e7468 100644
--- a/rocketmq-mysql/pom.xml
+++ b/rocketmq-mysql/pom.xml
@@ -6,7 +6,7 @@
 
     <groupId>org.apache</groupId>
     <artifactId>rocketmq-mysql-replicator</artifactId>
-    <version>1.0.0</version>
+    <version>1.1.0</version>
 
     <scm>
         <url>https://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals.git</url>
@@ -82,9 +82,9 @@
             <version>${rocketmq.version}</version>
         </dependency>
         <dependency>
-            <groupId>com.zendesk</groupId>
-            <artifactId>open-replicator</artifactId>
-            <version>1.6.0</version>
+            <groupId>com.github.shyiko</groupId>
+            <artifactId>mysql-binlog-connector-java</artifactId>
+            <version>0.12.1</version>
         </dependency>
         <dependency>
             <groupId>mysql</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/343ab198/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/DataRow.java
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/DataRow.java b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/DataRow.java
index 772ffd5..646c018 100644
--- a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/DataRow.java
+++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/DataRow.java
@@ -17,8 +17,7 @@
 
 package org.apache.rocketmq.mysql.binlog;
 
-import com.google.code.or.common.glossary.Column;
-import com.google.code.or.common.glossary.Row;
+import java.io.Serializable;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -32,9 +31,9 @@ public class DataRow {
 
     private String type;
     private Table table;
-    private Row row;
+    private Serializable[] row;
 
-    public DataRow(String type, Table table, Row row) {
+    public DataRow(String type, Table table, Serializable[] row) {
         this.type = type;
         this.table = table;
         this.row = row;
@@ -43,14 +42,13 @@ public class DataRow {
     public Map toMap() {
 
         try {
-            if (table.getColList().size() == row.getColumns().size()) {
+            if (table.getColList().size() == row.length) {
                 Map<String, Object> dataMap = new HashMap<>();
                 List<String> keyList = table.getColList();
                 List<ColumnParser> parserList = table.getParserList();
-                List<Column> valueList = row.getColumns();
 
                 for (int i = 0; i < keyList.size(); i++) {
-                    Object value = valueList.get(i).getValue();
+                    Object value = row[i];
                     ColumnParser parser = parserList.get(i);
                     dataMap.put(keyList.get(i), parser.getValue(value));
                 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/343ab198/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/EventListener.java
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/EventListener.java b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/EventListener.java
index cea36a0..b5005bc 100644
--- a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/EventListener.java
+++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/EventListener.java
@@ -17,20 +17,21 @@
 
 package org.apache.rocketmq.mysql.binlog;
 
-import com.google.code.or.binlog.BinlogEventListener;
-import com.google.code.or.binlog.BinlogEventV4;
+import com.github.shyiko.mysql.binlog.BinaryLogClient;
+import com.github.shyiko.mysql.binlog.event.Event;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
 
-public class EventListener implements BinlogEventListener {
-    private BlockingQueue<BinlogEventV4> queue;
+public class EventListener implements BinaryLogClient.EventListener, BinaryLogClient.LifecycleListener {
 
-    public EventListener(BlockingQueue<BinlogEventV4> queue) {
+    private BlockingQueue<Event> queue;
+
+    public EventListener(BlockingQueue<Event> queue) {
         this.queue = queue;
     }
 
     @Override
-    public void onEvents(BinlogEventV4 event) {
+    public void onEvent(Event event) {
         try {
             while (true) {
                 if (queue.offer(event, 100, TimeUnit.MILLISECONDS)) {
@@ -41,4 +42,24 @@ public class EventListener implements BinlogEventListener {
             e.printStackTrace();
         }
     }
+
+    @Override
+    public void onConnect(BinaryLogClient client) {
+
+    }
+
+    @Override
+    public void onCommunicationFailure(BinaryLogClient client, Exception e) {
+
+    }
+
+    @Override
+    public void onEventDeserializationFailure(BinaryLogClient client, Exception e) {
+
+    }
+
+    @Override
+    public void onDisconnect(BinaryLogClient client) {
+
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/343ab198/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/EventProcessor.java
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/EventProcessor.java b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/EventProcessor.java
index ba35d3e..a730403 100644
--- a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/EventProcessor.java
+++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/EventProcessor.java
@@ -18,20 +18,17 @@
 package org.apache.rocketmq.mysql.binlog;
 
 import com.alibaba.druid.pool.DruidDataSourceFactory;
-import com.google.code.or.OpenReplicator;
-import com.google.code.or.binlog.BinlogEventV4;
-import com.google.code.or.binlog.impl.event.DeleteRowsEvent;
-import com.google.code.or.binlog.impl.event.DeleteRowsEventV2;
-import com.google.code.or.binlog.impl.event.QueryEvent;
-import com.google.code.or.binlog.impl.event.TableMapEvent;
-import com.google.code.or.binlog.impl.event.UpdateRowsEvent;
-import com.google.code.or.binlog.impl.event.UpdateRowsEventV2;
-import com.google.code.or.binlog.impl.event.WriteRowsEvent;
-import com.google.code.or.binlog.impl.event.WriteRowsEventV2;
-import com.google.code.or.binlog.impl.event.XidEvent;
-import com.google.code.or.common.glossary.Pair;
-import com.google.code.or.common.glossary.Row;
-import com.google.code.or.common.util.MySQLConstants;
+import com.github.shyiko.mysql.binlog.BinaryLogClient;
+import com.github.shyiko.mysql.binlog.event.DeleteRowsEventData;
+import com.github.shyiko.mysql.binlog.event.Event;
+import com.github.shyiko.mysql.binlog.event.EventHeaderV4;
+import com.github.shyiko.mysql.binlog.event.QueryEventData;
+import com.github.shyiko.mysql.binlog.event.TableMapEventData;
+import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData;
+import com.github.shyiko.mysql.binlog.event.WriteRowsEventData;
+import com.github.shyiko.mysql.binlog.event.XidEventData;
+import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer;
+import java.io.Serializable;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -59,9 +56,9 @@ public class EventProcessor {
 
     private BinlogPositionManager binlogPositionManager;
 
-    private BlockingQueue<BinlogEventV4> queue = new LinkedBlockingQueue<>(100);
+    private BlockingQueue<Event> queue = new LinkedBlockingQueue<>(100);
 
-    private OpenReplicator openReplicator;
+    private BinaryLogClient binaryLogClient;
 
     private EventListener eventListener;
 
@@ -88,19 +85,22 @@ public class EventProcessor {
         schema.load();
 
         eventListener = new EventListener(queue);
-        openReplicator = new OpenReplicator();
-        openReplicator.setBinlogEventListener(eventListener);
-        openReplicator.setHost(config.mysqlAddr);
-        openReplicator.setPort(config.mysqlPort);
-        openReplicator.setUser(config.mysqlUsername);
-        openReplicator.setPassword(config.mysqlPassword);
-        openReplicator.setStopOnEOF(false);
-        openReplicator.setHeartbeatPeriod(1f);
-        openReplicator.setLevel2BufferSize(50 * 1024 * 1024);
-        openReplicator.setServerId(1001);
-        openReplicator.setBinlogFileName(binlogPositionManager.getBinlogFilename());
-        openReplicator.setBinlogPosition(binlogPositionManager.getPosition());
-        openReplicator.start();
+        binaryLogClient = new BinaryLogClient(config.mysqlAddr,
+            config.mysqlPort,
+            config.mysqlUsername,
+            config.mysqlPassword);
+        binaryLogClient.setBlocking(true);
+        binaryLogClient.setServerId(1001);
+
+        EventDeserializer eventDeserializer = new EventDeserializer();
+        eventDeserializer.setCompatibilityMode(EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG,
+            EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY);
+        binaryLogClient.setEventDeserializer(eventDeserializer);
+        binaryLogClient.registerEventListener(eventListener);
+        binaryLogClient.setBinlogFilename(binlogPositionManager.getBinlogFilename());
+        binaryLogClient.setBinlogPosition(binlogPositionManager.getPosition());
+
+        binaryLogClient.connect(3000);
 
         LOGGER.info("Started.");
 
@@ -112,46 +112,37 @@ public class EventProcessor {
         while (true) {
 
             try {
-                BinlogEventV4 event = queue.poll(100, TimeUnit.MILLISECONDS);
+                Event event = queue.poll(1000, TimeUnit.MILLISECONDS);
                 if (event == null) {
                     checkConnection();
                     continue;
                 }
 
                 switch (event.getHeader().getEventType()) {
-                    case MySQLConstants.TABLE_MAP_EVENT:
+                    case TABLE_MAP:
                         processTableMapEvent(event);
                         break;
 
-                    case MySQLConstants.WRITE_ROWS_EVENT:
+                    case WRITE_ROWS:
+                    case EXT_WRITE_ROWS:
                         processWriteEvent(event);
                         break;
 
-                    case MySQLConstants.WRITE_ROWS_EVENT_V2:
-                        processWriteEventV2(event);
-                        break;
-
-                    case MySQLConstants.UPDATE_ROWS_EVENT:
+                    case UPDATE_ROWS:
+                    case EXT_UPDATE_ROWS:
                         processUpdateEvent(event);
                         break;
 
-                    case MySQLConstants.UPDATE_ROWS_EVENT_V2:
-                        processUpdateEventV2(event);
-                        break;
-
-                    case MySQLConstants.DELETE_ROWS_EVENT:
+                    case DELETE_ROWS:
+                    case EXT_DELETE_ROWS:
                         processDeleteEvent(event);
                         break;
 
-                    case MySQLConstants.DELETE_ROWS_EVENT_V2:
-                        processDeleteEventV2(event);
-                        break;
-
-                    case MySQLConstants.QUERY_EVENT:
+                    case QUERY:
                         processQueryEvent(event);
                         break;
 
-                    case MySQLConstants.XID_EVENT:
+                    case XID:
                         processXidEvent(event);
                         break;
 
@@ -165,86 +156,54 @@ public class EventProcessor {
 
     private void checkConnection() throws Exception {
 
-        if (!openReplicator.isRunning()) {
+        if (!binaryLogClient.isConnected()) {
             BinlogPosition binlogPosition = replicator.getNextBinlogPosition();
             if (binlogPosition != null) {
-                openReplicator.setBinlogFileName(binlogPosition.getBinlogFilename());
-                openReplicator.setBinlogPosition(binlogPosition.getPosition());
+                binaryLogClient.setBinlogFilename(binlogPosition.getBinlogFilename());
+                binaryLogClient.setBinlogPosition(binlogPosition.getPosition());
             }
 
-            openReplicator.start();
+            binaryLogClient.connect(3000);
         }
     }
 
-    private void processTableMapEvent(BinlogEventV4 event) {
-        TableMapEvent tableMapEvent = (TableMapEvent) event;
-        String dbName = tableMapEvent.getDatabaseName().toString();
-        String tableName = tableMapEvent.getTableName().toString();
-        Long tableId = tableMapEvent.getTableId();
+    private void processTableMapEvent(Event event) {
+        TableMapEventData data = event.getData();
+        String dbName = data.getDatabase();
+        String tableName = data.getTable();
+        Long tableId = data.getTableId();
 
         Table table = schema.getTable(dbName, tableName);
 
         tableMap.put(tableId, table);
     }
 
-    private void processWriteEvent(BinlogEventV4 event) {
-        WriteRowsEvent writeRowsEvent = (WriteRowsEvent) event;
-        Long tableId = writeRowsEvent.getTableId();
-        List<Row> list = writeRowsEvent.getRows();
+    private void processWriteEvent(Event event) {
+        WriteRowsEventData data = event.getData();
+        Long tableId = data.getTableId();
+        List<Serializable[]> list = data.getRows();
 
-        for (Row row : list) {
+        for (Serializable[] row : list) {
             addRow("WRITE", tableId, row);
         }
     }
 
-    private void processWriteEventV2(BinlogEventV4 event) {
-        WriteRowsEventV2 writeRowsEventV2 = (WriteRowsEventV2) event;
-        Long tableId = writeRowsEventV2.getTableId();
-        List<Row> list = writeRowsEventV2.getRows();
-
-        for (Row row : list) {
-            addRow("WRITE", tableId, row);
-        }
-
-    }
-
-    private void processUpdateEvent(BinlogEventV4 event) {
-        UpdateRowsEvent updateRowsEvent = (UpdateRowsEvent) event;
-        Long tableId = updateRowsEvent.getTableId();
-        List<Pair<Row>> list = updateRowsEvent.getRows();
-
-        for (Pair<Row> pair : list) {
-            addRow("UPDATE", tableId, pair.getAfter());
-        }
-    }
-
-    private void processUpdateEventV2(BinlogEventV4 event) {
-        UpdateRowsEventV2 updateRowsEventV2 = (UpdateRowsEventV2) event;
-        Long tableId = updateRowsEventV2.getTableId();
-        List<Pair<Row>> list = updateRowsEventV2.getRows();
-
-        for (Pair<Row> pair : list) {
-            addRow("UPDATE", tableId, pair.getAfter());
-        }
-    }
-
-    private void processDeleteEvent(BinlogEventV4 event) {
-        DeleteRowsEvent deleteRowsEvent = (DeleteRowsEvent) event;
-        Long tableId = deleteRowsEvent.getTableId();
-        List<Row> list = deleteRowsEvent.getRows();
+    private void processUpdateEvent(Event event) {
+        UpdateRowsEventData data = event.getData();
+        Long tableId = data.getTableId();
+        List<Map.Entry<Serializable[], Serializable[]>> list = data.getRows();
 
-        for (Row row : list) {
-            addRow("DELETE", tableId, row);
+        for (Map.Entry<Serializable[], Serializable[]> entry : list) {
+            addRow("UPDATE", tableId, entry.getValue());
         }
-
     }
 
-    private void processDeleteEventV2(BinlogEventV4 event) {
-        DeleteRowsEventV2 deleteRowsEventV2 = (DeleteRowsEventV2) event;
-        Long tableId = deleteRowsEventV2.getTableId();
-        List<Row> list = deleteRowsEventV2.getRows();
+    private void processDeleteEvent(Event event) {
+        DeleteRowsEventData data = event.getData();
+        Long tableId = data.getTableId();
+        List<Serializable[]> list = data.getRows();
 
-        for (Row row : list) {
+        for (Serializable[] row : list) {
             addRow("DELETE", tableId, row);
         }
 
@@ -253,20 +212,22 @@ public class EventProcessor {
     private static Pattern createTablePattern =
         Pattern.compile("^(CREATE|ALTER)\\s+TABLE", Pattern.CASE_INSENSITIVE);
 
-    private void processQueryEvent(BinlogEventV4 event) {
-        QueryEvent queryEvent = (QueryEvent) event;
-        String sql = queryEvent.getSql().toString();
+    private void processQueryEvent(Event event) {
+        QueryEventData data = event.getData();
+        String sql = data.getSql();
 
         if (createTablePattern.matcher(sql).find()) {
             schema.reset();
         }
     }
 
-    private void processXidEvent(BinlogEventV4 event) {
-        XidEvent xidEvent = (XidEvent) event;
-        String binlogFilename = xidEvent.getBinlogFilename();
-        Long position = xidEvent.getHeader().getNextPosition();
-        Long xid = xidEvent.getXid();
+    private void processXidEvent(Event event) {
+        EventHeaderV4 header = event.getHeader();
+        XidEventData data = event.getData();
+
+        String binlogFilename = binaryLogClient.getBinlogFilename();
+        Long position = header.getNextPosition();
+        Long xid = data.getXid();
 
         BinlogPosition binlogPosition = new BinlogPosition(binlogFilename, position);
         transaction.setNextBinlogPosition(binlogPosition);
@@ -274,14 +235,13 @@ public class EventProcessor {
 
         replicator.commit(transaction, true);
 
-        transaction = new Transaction(this);
-
+        transaction = new Transaction(config);
     }
 
-    private void addRow(String type, Long tableId, Row row) {
+    private void addRow(String type, Long tableId, Serializable[] row) {
 
         if (transaction == null) {
-            transaction = new Transaction(this);
+            transaction = new Transaction(config);
         }
 
         Table t = tableMap.get(tableId);
@@ -294,7 +254,7 @@ public class EventProcessor {
                 } else {
                     transaction.setNextBinlogPosition(replicator.getNextBinlogPosition());
                     replicator.commit(transaction, false);
-                    transaction = new Transaction(this);
+                    transaction = new Transaction(config);
                 }
             }
 

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/343ab198/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/Transaction.java
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/Transaction.java b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/Transaction.java
index 9656a04..396815a 100644
--- a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/Transaction.java
+++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/Transaction.java
@@ -18,7 +18,7 @@
 package org.apache.rocketmq.mysql.binlog;
 
 import com.alibaba.fastjson.JSONObject;
-import com.google.code.or.common.glossary.Row;
+import java.io.Serializable;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -31,17 +31,15 @@ public class Transaction {
     private BinlogPosition nextBinlogPosition;
     private Long xid;
 
-    private EventProcessor eventProcessor;
     private Config config;
 
     private List<DataRow> list = new LinkedList<>();
 
-    public Transaction(EventProcessor eventProcessor) {
-        this.eventProcessor = eventProcessor;
-        this.config = eventProcessor.getConfig();
+    public Transaction(Config config) {
+        this.config = config;
     }
 
-    public boolean addRow(String type, Table table, Row row) {
+    public boolean addRow(String type, Table table, Serializable[] row) {
 
         if (list.size() == config.maxTransactionRows) {
             return false;

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/343ab198/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPositionManager.java
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPositionManager.java b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPositionManager.java
index bf621b5..fd6555c 100644
--- a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPositionManager.java
+++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPositionManager.java
@@ -31,8 +31,12 @@ import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
 import org.apache.rocketmq.mysql.Config;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class BinlogPositionManager {
+    private Logger logger = LoggerFactory.getLogger(BinlogPositionManager.class);
+
     private DataSource dataSource;
     private Config config;
 
@@ -67,7 +71,12 @@ public class BinlogPositionManager {
     }
 
     private void initPositionDefault() throws Exception {
-        initPositionFromMqTail();
+
+        try {
+            initPositionFromMqTail();
+        } catch (Exception e) {
+            logger.error("Init position from mq error.", e);
+        }
 
         if (binlogFilename == null || nextPosition == null) {
             initPositionFromBinlogTail();

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/343ab198/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/DateTimeColumnParser.java
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/DateTimeColumnParser.java b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/DateTimeColumnParser.java
index 97339d8..6b60abd 100644
--- a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/DateTimeColumnParser.java
+++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/DateTimeColumnParser.java
@@ -19,10 +19,19 @@ package org.apache.rocketmq.mysql.schema.column;
 
 import java.sql.Timestamp;
 import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.TimeZone;
 
 public class DateTimeColumnParser extends ColumnParser {
 
-    private static SimpleDateFormat dateTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+    private static SimpleDateFormat dateTimeFormat;
+    private static SimpleDateFormat dateTimeUtcFormat;
+
+    static {
+        dateTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+        dateTimeUtcFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+        dateTimeUtcFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+    }
 
     @Override
     public Object getValue(Object value) {
@@ -35,6 +44,10 @@ public class DateTimeColumnParser extends ColumnParser {
             return dateTimeFormat.format(value);
         }
 
+        if (value instanceof Long) {
+            return dateTimeUtcFormat.format(new Date((Long) value));
+        }
+
         return value;
     }
 }