You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2017/08/22 14:04:34 UTC
incubator-rocketmq-externals git commit: Prepare release mysql
replicator 1.1.0 version
Repository: incubator-rocketmq-externals
Updated Branches:
refs/heads/master fdf7fc4a3 -> 343ab198e
Prepare release mysql replicator 1.1.0 version
Closes #26 from zhaoqun911/mysql-develop.
Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/commit/343ab198
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/tree/343ab198
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/diff/343ab198
Branch: refs/heads/master
Commit: 343ab198eb7987e50588d8209ded6a4db5e9de83
Parents: fdf7fc4
Author: zhaoqun007 <91...@zhaoqun911.cn>
Authored: Mon Aug 14 19:54:09 2017 +0800
Committer: yukon <yu...@apache.org>
Committed: Tue Aug 22 22:03:21 2017 +0800
----------------------------------------------------------------------
rocketmq-mysql/LICENSE-BIN | 4 +-
rocketmq-mysql/pom.xml | 8 +-
.../apache/rocketmq/mysql/binlog/DataRow.java | 12 +-
.../rocketmq/mysql/binlog/EventListener.java | 33 +++-
.../rocketmq/mysql/binlog/EventProcessor.java | 196 ++++++++-----------
.../rocketmq/mysql/binlog/Transaction.java | 10 +-
.../mysql/position/BinlogPositionManager.java | 11 +-
.../schema/column/DateTimeColumnParser.java | 15 +-
8 files changed, 144 insertions(+), 145 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/343ab198/rocketmq-mysql/LICENSE-BIN
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/LICENSE-BIN b/rocketmq-mysql/LICENSE-BIN
index 5349fbd..22b0aa4 100644
--- a/rocketmq-mysql/LICENSE-BIN
+++ b/rocketmq-mysql/LICENSE-BIN
@@ -297,5 +297,5 @@ contains test data from http://aspell.net/test/orig/batch0.tab.
Copyright (C) 2002 Kevin Atkinson (kevina@gnu.org)
------
-This product has a bundle open-replicator, which is available under the ASL2 License.
-The source code of open-replicator can be found at https://github.com/whitesock/open-replicator.
+This product has a bundle mysql-binlog-connector-java, which is available under the ASL2 License.
+The source code of mysql-binlog-connector-java can be found at https://github.com/shyiko/mysql-binlog-connector-java.
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/343ab198/rocketmq-mysql/pom.xml
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/pom.xml b/rocketmq-mysql/pom.xml
index c09826c..23e7468 100644
--- a/rocketmq-mysql/pom.xml
+++ b/rocketmq-mysql/pom.xml
@@ -6,7 +6,7 @@
<groupId>org.apache</groupId>
<artifactId>rocketmq-mysql-replicator</artifactId>
- <version>1.0.0</version>
+ <version>1.1.0</version>
<scm>
<url>https://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals.git</url>
@@ -82,9 +82,9 @@
<version>${rocketmq.version}</version>
</dependency>
<dependency>
- <groupId>com.zendesk</groupId>
- <artifactId>open-replicator</artifactId>
- <version>1.6.0</version>
+ <groupId>com.github.shyiko</groupId>
+ <artifactId>mysql-binlog-connector-java</artifactId>
+ <version>0.12.1</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/343ab198/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/DataRow.java
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/DataRow.java b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/DataRow.java
index 772ffd5..646c018 100644
--- a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/DataRow.java
+++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/DataRow.java
@@ -17,8 +17,7 @@
package org.apache.rocketmq.mysql.binlog;
-import com.google.code.or.common.glossary.Column;
-import com.google.code.or.common.glossary.Row;
+import java.io.Serializable;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -32,9 +31,9 @@ public class DataRow {
private String type;
private Table table;
- private Row row;
+ private Serializable[] row;
- public DataRow(String type, Table table, Row row) {
+ public DataRow(String type, Table table, Serializable[] row) {
this.type = type;
this.table = table;
this.row = row;
@@ -43,14 +42,13 @@ public class DataRow {
public Map toMap() {
try {
- if (table.getColList().size() == row.getColumns().size()) {
+ if (table.getColList().size() == row.length) {
Map<String, Object> dataMap = new HashMap<>();
List<String> keyList = table.getColList();
List<ColumnParser> parserList = table.getParserList();
- List<Column> valueList = row.getColumns();
for (int i = 0; i < keyList.size(); i++) {
- Object value = valueList.get(i).getValue();
+ Object value = row[i];
ColumnParser parser = parserList.get(i);
dataMap.put(keyList.get(i), parser.getValue(value));
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/343ab198/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/EventListener.java
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/EventListener.java b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/EventListener.java
index cea36a0..b5005bc 100644
--- a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/EventListener.java
+++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/EventListener.java
@@ -17,20 +17,21 @@
package org.apache.rocketmq.mysql.binlog;
-import com.google.code.or.binlog.BinlogEventListener;
-import com.google.code.or.binlog.BinlogEventV4;
+import com.github.shyiko.mysql.binlog.BinaryLogClient;
+import com.github.shyiko.mysql.binlog.event.Event;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
-public class EventListener implements BinlogEventListener {
- private BlockingQueue<BinlogEventV4> queue;
+public class EventListener implements BinaryLogClient.EventListener, BinaryLogClient.LifecycleListener {
- public EventListener(BlockingQueue<BinlogEventV4> queue) {
+ private BlockingQueue<Event> queue;
+
+ public EventListener(BlockingQueue<Event> queue) {
this.queue = queue;
}
@Override
- public void onEvents(BinlogEventV4 event) {
+ public void onEvent(Event event) {
try {
while (true) {
if (queue.offer(event, 100, TimeUnit.MILLISECONDS)) {
@@ -41,4 +42,24 @@ public class EventListener implements BinlogEventListener {
e.printStackTrace();
}
}
+
+ @Override
+ public void onConnect(BinaryLogClient client) {
+
+ }
+
+ @Override
+ public void onCommunicationFailure(BinaryLogClient client, Exception e) {
+
+ }
+
+ @Override
+ public void onEventDeserializationFailure(BinaryLogClient client, Exception e) {
+
+ }
+
+ @Override
+ public void onDisconnect(BinaryLogClient client) {
+
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/343ab198/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/EventProcessor.java
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/EventProcessor.java b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/EventProcessor.java
index ba35d3e..a730403 100644
--- a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/EventProcessor.java
+++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/EventProcessor.java
@@ -18,20 +18,17 @@
package org.apache.rocketmq.mysql.binlog;
import com.alibaba.druid.pool.DruidDataSourceFactory;
-import com.google.code.or.OpenReplicator;
-import com.google.code.or.binlog.BinlogEventV4;
-import com.google.code.or.binlog.impl.event.DeleteRowsEvent;
-import com.google.code.or.binlog.impl.event.DeleteRowsEventV2;
-import com.google.code.or.binlog.impl.event.QueryEvent;
-import com.google.code.or.binlog.impl.event.TableMapEvent;
-import com.google.code.or.binlog.impl.event.UpdateRowsEvent;
-import com.google.code.or.binlog.impl.event.UpdateRowsEventV2;
-import com.google.code.or.binlog.impl.event.WriteRowsEvent;
-import com.google.code.or.binlog.impl.event.WriteRowsEventV2;
-import com.google.code.or.binlog.impl.event.XidEvent;
-import com.google.code.or.common.glossary.Pair;
-import com.google.code.or.common.glossary.Row;
-import com.google.code.or.common.util.MySQLConstants;
+import com.github.shyiko.mysql.binlog.BinaryLogClient;
+import com.github.shyiko.mysql.binlog.event.DeleteRowsEventData;
+import com.github.shyiko.mysql.binlog.event.Event;
+import com.github.shyiko.mysql.binlog.event.EventHeaderV4;
+import com.github.shyiko.mysql.binlog.event.QueryEventData;
+import com.github.shyiko.mysql.binlog.event.TableMapEventData;
+import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData;
+import com.github.shyiko.mysql.binlog.event.WriteRowsEventData;
+import com.github.shyiko.mysql.binlog.event.XidEventData;
+import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer;
+import java.io.Serializable;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -59,9 +56,9 @@ public class EventProcessor {
private BinlogPositionManager binlogPositionManager;
- private BlockingQueue<BinlogEventV4> queue = new LinkedBlockingQueue<>(100);
+ private BlockingQueue<Event> queue = new LinkedBlockingQueue<>(100);
- private OpenReplicator openReplicator;
+ private BinaryLogClient binaryLogClient;
private EventListener eventListener;
@@ -88,19 +85,22 @@ public class EventProcessor {
schema.load();
eventListener = new EventListener(queue);
- openReplicator = new OpenReplicator();
- openReplicator.setBinlogEventListener(eventListener);
- openReplicator.setHost(config.mysqlAddr);
- openReplicator.setPort(config.mysqlPort);
- openReplicator.setUser(config.mysqlUsername);
- openReplicator.setPassword(config.mysqlPassword);
- openReplicator.setStopOnEOF(false);
- openReplicator.setHeartbeatPeriod(1f);
- openReplicator.setLevel2BufferSize(50 * 1024 * 1024);
- openReplicator.setServerId(1001);
- openReplicator.setBinlogFileName(binlogPositionManager.getBinlogFilename());
- openReplicator.setBinlogPosition(binlogPositionManager.getPosition());
- openReplicator.start();
+ binaryLogClient = new BinaryLogClient(config.mysqlAddr,
+ config.mysqlPort,
+ config.mysqlUsername,
+ config.mysqlPassword);
+ binaryLogClient.setBlocking(true);
+ binaryLogClient.setServerId(1001);
+
+ EventDeserializer eventDeserializer = new EventDeserializer();
+ eventDeserializer.setCompatibilityMode(EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG,
+ EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY);
+ binaryLogClient.setEventDeserializer(eventDeserializer);
+ binaryLogClient.registerEventListener(eventListener);
+ binaryLogClient.setBinlogFilename(binlogPositionManager.getBinlogFilename());
+ binaryLogClient.setBinlogPosition(binlogPositionManager.getPosition());
+
+ binaryLogClient.connect(3000);
LOGGER.info("Started.");
@@ -112,46 +112,37 @@ public class EventProcessor {
while (true) {
try {
- BinlogEventV4 event = queue.poll(100, TimeUnit.MILLISECONDS);
+ Event event = queue.poll(1000, TimeUnit.MILLISECONDS);
if (event == null) {
checkConnection();
continue;
}
switch (event.getHeader().getEventType()) {
- case MySQLConstants.TABLE_MAP_EVENT:
+ case TABLE_MAP:
processTableMapEvent(event);
break;
- case MySQLConstants.WRITE_ROWS_EVENT:
+ case WRITE_ROWS:
+ case EXT_WRITE_ROWS:
processWriteEvent(event);
break;
- case MySQLConstants.WRITE_ROWS_EVENT_V2:
- processWriteEventV2(event);
- break;
-
- case MySQLConstants.UPDATE_ROWS_EVENT:
+ case UPDATE_ROWS:
+ case EXT_UPDATE_ROWS:
processUpdateEvent(event);
break;
- case MySQLConstants.UPDATE_ROWS_EVENT_V2:
- processUpdateEventV2(event);
- break;
-
- case MySQLConstants.DELETE_ROWS_EVENT:
+ case DELETE_ROWS:
+ case EXT_DELETE_ROWS:
processDeleteEvent(event);
break;
- case MySQLConstants.DELETE_ROWS_EVENT_V2:
- processDeleteEventV2(event);
- break;
-
- case MySQLConstants.QUERY_EVENT:
+ case QUERY:
processQueryEvent(event);
break;
- case MySQLConstants.XID_EVENT:
+ case XID:
processXidEvent(event);
break;
@@ -165,86 +156,54 @@ public class EventProcessor {
private void checkConnection() throws Exception {
- if (!openReplicator.isRunning()) {
+ if (!binaryLogClient.isConnected()) {
BinlogPosition binlogPosition = replicator.getNextBinlogPosition();
if (binlogPosition != null) {
- openReplicator.setBinlogFileName(binlogPosition.getBinlogFilename());
- openReplicator.setBinlogPosition(binlogPosition.getPosition());
+ binaryLogClient.setBinlogFilename(binlogPosition.getBinlogFilename());
+ binaryLogClient.setBinlogPosition(binlogPosition.getPosition());
}
- openReplicator.start();
+ binaryLogClient.connect(3000);
}
}
- private void processTableMapEvent(BinlogEventV4 event) {
- TableMapEvent tableMapEvent = (TableMapEvent) event;
- String dbName = tableMapEvent.getDatabaseName().toString();
- String tableName = tableMapEvent.getTableName().toString();
- Long tableId = tableMapEvent.getTableId();
+ private void processTableMapEvent(Event event) {
+ TableMapEventData data = event.getData();
+ String dbName = data.getDatabase();
+ String tableName = data.getTable();
+ Long tableId = data.getTableId();
Table table = schema.getTable(dbName, tableName);
tableMap.put(tableId, table);
}
- private void processWriteEvent(BinlogEventV4 event) {
- WriteRowsEvent writeRowsEvent = (WriteRowsEvent) event;
- Long tableId = writeRowsEvent.getTableId();
- List<Row> list = writeRowsEvent.getRows();
+ private void processWriteEvent(Event event) {
+ WriteRowsEventData data = event.getData();
+ Long tableId = data.getTableId();
+ List<Serializable[]> list = data.getRows();
- for (Row row : list) {
+ for (Serializable[] row : list) {
addRow("WRITE", tableId, row);
}
}
- private void processWriteEventV2(BinlogEventV4 event) {
- WriteRowsEventV2 writeRowsEventV2 = (WriteRowsEventV2) event;
- Long tableId = writeRowsEventV2.getTableId();
- List<Row> list = writeRowsEventV2.getRows();
-
- for (Row row : list) {
- addRow("WRITE", tableId, row);
- }
-
- }
-
- private void processUpdateEvent(BinlogEventV4 event) {
- UpdateRowsEvent updateRowsEvent = (UpdateRowsEvent) event;
- Long tableId = updateRowsEvent.getTableId();
- List<Pair<Row>> list = updateRowsEvent.getRows();
-
- for (Pair<Row> pair : list) {
- addRow("UPDATE", tableId, pair.getAfter());
- }
- }
-
- private void processUpdateEventV2(BinlogEventV4 event) {
- UpdateRowsEventV2 updateRowsEventV2 = (UpdateRowsEventV2) event;
- Long tableId = updateRowsEventV2.getTableId();
- List<Pair<Row>> list = updateRowsEventV2.getRows();
-
- for (Pair<Row> pair : list) {
- addRow("UPDATE", tableId, pair.getAfter());
- }
- }
-
- private void processDeleteEvent(BinlogEventV4 event) {
- DeleteRowsEvent deleteRowsEvent = (DeleteRowsEvent) event;
- Long tableId = deleteRowsEvent.getTableId();
- List<Row> list = deleteRowsEvent.getRows();
+ private void processUpdateEvent(Event event) {
+ UpdateRowsEventData data = event.getData();
+ Long tableId = data.getTableId();
+ List<Map.Entry<Serializable[], Serializable[]>> list = data.getRows();
- for (Row row : list) {
- addRow("DELETE", tableId, row);
+ for (Map.Entry<Serializable[], Serializable[]> entry : list) {
+ addRow("UPDATE", tableId, entry.getValue());
}
-
}
- private void processDeleteEventV2(BinlogEventV4 event) {
- DeleteRowsEventV2 deleteRowsEventV2 = (DeleteRowsEventV2) event;
- Long tableId = deleteRowsEventV2.getTableId();
- List<Row> list = deleteRowsEventV2.getRows();
+ private void processDeleteEvent(Event event) {
+ DeleteRowsEventData data = event.getData();
+ Long tableId = data.getTableId();
+ List<Serializable[]> list = data.getRows();
- for (Row row : list) {
+ for (Serializable[] row : list) {
addRow("DELETE", tableId, row);
}
@@ -253,20 +212,22 @@ public class EventProcessor {
private static Pattern createTablePattern =
Pattern.compile("^(CREATE|ALTER)\\s+TABLE", Pattern.CASE_INSENSITIVE);
- private void processQueryEvent(BinlogEventV4 event) {
- QueryEvent queryEvent = (QueryEvent) event;
- String sql = queryEvent.getSql().toString();
+ private void processQueryEvent(Event event) {
+ QueryEventData data = event.getData();
+ String sql = data.getSql();
if (createTablePattern.matcher(sql).find()) {
schema.reset();
}
}
- private void processXidEvent(BinlogEventV4 event) {
- XidEvent xidEvent = (XidEvent) event;
- String binlogFilename = xidEvent.getBinlogFilename();
- Long position = xidEvent.getHeader().getNextPosition();
- Long xid = xidEvent.getXid();
+ private void processXidEvent(Event event) {
+ EventHeaderV4 header = event.getHeader();
+ XidEventData data = event.getData();
+
+ String binlogFilename = binaryLogClient.getBinlogFilename();
+ Long position = header.getNextPosition();
+ Long xid = data.getXid();
BinlogPosition binlogPosition = new BinlogPosition(binlogFilename, position);
transaction.setNextBinlogPosition(binlogPosition);
@@ -274,14 +235,13 @@ public class EventProcessor {
replicator.commit(transaction, true);
- transaction = new Transaction(this);
-
+ transaction = new Transaction(config);
}
- private void addRow(String type, Long tableId, Row row) {
+ private void addRow(String type, Long tableId, Serializable[] row) {
if (transaction == null) {
- transaction = new Transaction(this);
+ transaction = new Transaction(config);
}
Table t = tableMap.get(tableId);
@@ -294,7 +254,7 @@ public class EventProcessor {
} else {
transaction.setNextBinlogPosition(replicator.getNextBinlogPosition());
replicator.commit(transaction, false);
- transaction = new Transaction(this);
+ transaction = new Transaction(config);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/343ab198/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/Transaction.java
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/Transaction.java b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/Transaction.java
index 9656a04..396815a 100644
--- a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/Transaction.java
+++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/Transaction.java
@@ -18,7 +18,7 @@
package org.apache.rocketmq.mysql.binlog;
import com.alibaba.fastjson.JSONObject;
-import com.google.code.or.common.glossary.Row;
+import java.io.Serializable;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
@@ -31,17 +31,15 @@ public class Transaction {
private BinlogPosition nextBinlogPosition;
private Long xid;
- private EventProcessor eventProcessor;
private Config config;
private List<DataRow> list = new LinkedList<>();
- public Transaction(EventProcessor eventProcessor) {
- this.eventProcessor = eventProcessor;
- this.config = eventProcessor.getConfig();
+ public Transaction(Config config) {
+ this.config = config;
}
- public boolean addRow(String type, Table table, Row row) {
+ public boolean addRow(String type, Table table, Serializable[] row) {
if (list.size() == config.maxTransactionRows) {
return false;
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/343ab198/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPositionManager.java
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPositionManager.java b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPositionManager.java
index bf621b5..fd6555c 100644
--- a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPositionManager.java
+++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPositionManager.java
@@ -31,8 +31,12 @@ import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.mysql.Config;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class BinlogPositionManager {
+ private Logger logger = LoggerFactory.getLogger(BinlogPositionManager.class);
+
private DataSource dataSource;
private Config config;
@@ -67,7 +71,12 @@ public class BinlogPositionManager {
}
private void initPositionDefault() throws Exception {
- initPositionFromMqTail();
+
+ try {
+ initPositionFromMqTail();
+ } catch (Exception e) {
+ logger.error("Init position from mq error.", e);
+ }
if (binlogFilename == null || nextPosition == null) {
initPositionFromBinlogTail();
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/343ab198/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/DateTimeColumnParser.java
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/DateTimeColumnParser.java b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/DateTimeColumnParser.java
index 97339d8..6b60abd 100644
--- a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/DateTimeColumnParser.java
+++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/DateTimeColumnParser.java
@@ -19,10 +19,19 @@ package org.apache.rocketmq.mysql.schema.column;
import java.sql.Timestamp;
import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.TimeZone;
public class DateTimeColumnParser extends ColumnParser {
- private static SimpleDateFormat dateTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ private static SimpleDateFormat dateTimeFormat;
+ private static SimpleDateFormat dateTimeUtcFormat;
+
+ static {
+ dateTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ dateTimeUtcFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ dateTimeUtcFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+ }
@Override
public Object getValue(Object value) {
@@ -35,6 +44,10 @@ public class DateTimeColumnParser extends ColumnParser {
return dateTimeFormat.format(value);
}
+ if (value instanceof Long) {
+ return dateTimeUtcFormat.format(new Date((Long) value));
+ }
+
return value;
}
}