You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2023/03/15 03:15:49 UTC
[shardingsphere] branch master updated: Refactor pipeline parse datetime/time of MySQL binlog (#24598)
This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 90bf6660d66 Refactor pipeline parse datetime/time of MySQL binlog (#24598)
90bf6660d66 is described below
commit 90bf6660d661278b1bb928c092f17f9da4e8ec97
Author: Xinze Guo <10...@users.noreply.github.com>
AuthorDate: Wed Mar 15 11:15:40 2023 +0800
Refactor pipeline parse datetime/time of MySQL binlog (#24598)
* Refactor pipeline parse datetime/time of MySQL binlog
* Fix unit test
* Rename
---
.../time/MySQLDatetime2BinlogProtocolValue.java | 21 +++++++++--------
.../time/MySQLDatetimeBinlogProtocolValue.java | 21 +++++++++++------
.../column/value/time/MySQLFractionalSeconds.java | 27 +++++++---------------
.../value/time/MySQLTime2BinlogProtocolValue.java | 6 ++++-
.../time/MySQLTimestamp2BinlogProtocolValue.java | 6 +++--
.../MySQLDatetime2BinlogProtocolValueTest.java | 15 ++++++++----
.../time/MySQLDatetimeBinlogProtocolValueTest.java | 6 ++++-
.../time/MySQLTime2BinlogProtocolValueTest.java | 14 ++++++-----
.../MySQLTimestamp2BinlogProtocolValueTest.java | 9 ++++----
.../test/resources/env/scenario/general/mysql.xml | 6 ++---
10 files changed, 74 insertions(+), 57 deletions(-)
diff --git a/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/time/MySQLDatetime2BinlogProtocolValue.java b/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/time/MySQLDatetime2BinlogProtocolValue.java
index 09f6402c491..aeeb9bac3e0 100644
--- a/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/time/MySQLDatetime2BinlogProtocolValue.java
+++ b/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/time/MySQLDatetime2BinlogProtocolValue.java
@@ -22,6 +22,8 @@ import org.apache.shardingsphere.db.protocol.mysql.packet.binlog.row.column.valu
import org.apache.shardingsphere.db.protocol.mysql.payload.MySQLPacketPayload;
import java.io.Serializable;
+import java.sql.Timestamp;
+import java.time.LocalDateTime;
/**
* MySQL DATETIME2 binlog protocol value.
@@ -46,15 +48,16 @@ public final class MySQLDatetime2BinlogProtocolValue implements MySQLBinlogProto
private Serializable readDatetime(final MySQLBinlogColumnDef columnDef, final long datetime, final MySQLPacketPayload payload) {
long datetimeWithoutSign = datetime & (0x8000000000L - 1);
- return readDate(datetimeWithoutSign >> 17) + " " + readTime(datetimeWithoutSign % (1 << 17)) + new MySQLFractionalSeconds(columnDef.getColumnMeta(), payload);
- }
-
- private String readDate(final long date) {
+ long date = datetimeWithoutSign >> 17;
long yearAndMonth = date >> 5;
- return String.format("%d-%02d-%02d", yearAndMonth / 13, yearAndMonth % 13, date % (1 << 5));
- }
-
- private String readTime(final long time) {
- return String.format("%02d:%02d:%02d", time >> 12, (time >> 6) % (1 << 6), time % (1 << 6));
+ int year = (int) (yearAndMonth / 13);
+ int month = (int) (yearAndMonth % 13);
+ int day = (int) (date % (1 << 5));
+ long time = datetimeWithoutSign % (1 << 17);
+ int hour = (int) (time >> 12);
+ int minute = (int) ((time >> 6) % (1 << 6));
+ int second = (int) (time % (1 << 6));
+ MySQLFractionalSeconds fractionalSeconds = new MySQLFractionalSeconds(columnDef.getColumnMeta(), payload);
+ return Timestamp.valueOf(LocalDateTime.of(year, month, day, hour, minute, second, fractionalSeconds.getNanos()));
}
}
diff --git a/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/time/MySQLDatetimeBinlogProtocolValue.java b/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/time/MySQLDatetimeBinlogProtocolValue.java
index fd2d677b6bf..fe8a10f4096 100644
--- a/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/time/MySQLDatetimeBinlogProtocolValue.java
+++ b/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/time/MySQLDatetimeBinlogProtocolValue.java
@@ -22,6 +22,9 @@ import org.apache.shardingsphere.db.protocol.mysql.packet.binlog.row.column.valu
import org.apache.shardingsphere.db.protocol.mysql.payload.MySQLPacketPayload;
import java.io.Serializable;
+import java.sql.Timestamp;
+import java.time.LocalDateTime;
+import java.util.Date;
/**
* MySQL DATETIME binlog protocol value.
@@ -33,14 +36,18 @@ public final class MySQLDatetimeBinlogProtocolValue implements MySQLBinlogProtoc
@Override
public Serializable read(final MySQLBinlogColumnDef columnDef, final MySQLPacketPayload payload) {
long datetime = payload.readInt8();
- return 0 == datetime ? MySQLTimeValueUtil.DATETIME_OF_ZERO : String.format("%s %s", readDate((int) (datetime / 1000000)), readTime((int) (datetime % 1000000)));
+ return 0 == datetime ? MySQLTimeValueUtil.DATETIME_OF_ZERO : readDateTime(datetime);
}
- private String readDate(final int date) {
- return String.format("%04d-%02d-%02d", date / 10000, (date % 10000) / 100, date % 100);
- }
-
- private String readTime(final int time) {
- return String.format("%02d:%02d:%02d", time / 10000, (time % 10000) / 100, time % 100);
+ private Date readDateTime(final long datetime) {
+ int date = (int) (datetime / 1000000);
+ int year = date / 10000;
+ int month = (date % 10000) / 100;
+ int day = date % 100;
+ int time = (int) (datetime % 1000000);
+ int hour = time / 10000;
+ int minute = (time % 10000) / 100;
+ int second = time % 100;
+ return Timestamp.valueOf(LocalDateTime.of(year, month, day, hour, minute, second));
}
}
diff --git a/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/time/MySQLFractionalSeconds.java b/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/time/MySQLFractionalSeconds.java
index 0525e567285..dd62ca40a4e 100644
--- a/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/time/MySQLFractionalSeconds.java
+++ b/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/time/MySQLFractionalSeconds.java
@@ -17,6 +17,7 @@
package org.apache.shardingsphere.db.protocol.mysql.packet.binlog.row.column.value.time;
+import lombok.Getter;
import org.apache.shardingsphere.db.protocol.mysql.payload.MySQLPacketPayload;
/**
@@ -26,41 +27,29 @@ import org.apache.shardingsphere.db.protocol.mysql.payload.MySQLPacketPayload;
*/
public final class MySQLFractionalSeconds {
- private final int fraction;
+ @Getter
+ private final int nanos;
private final int fractionalSecondsPrecision;
public MySQLFractionalSeconds(final int columnMeta, final MySQLPacketPayload payload) {
fractionalSecondsPrecision = columnMeta;
- fraction = readFraction(payload);
+ nanos = convertFractionalSecondsToNanos(payload);
}
- private int readFraction(final MySQLPacketPayload payload) {
+ private int convertFractionalSecondsToNanos(final MySQLPacketPayload payload) {
switch (fractionalSecondsPrecision) {
case 1:
case 2:
- return payload.readInt1() * 10000;
+ return payload.readInt1() * 10000 * 1000;
case 3:
case 4:
- return payload.getByteBuf().readUnsignedShort() * 100;
+ return payload.getByteBuf().readUnsignedShort() * 100 * 1000;
case 5:
case 6:
- return payload.getByteBuf().readUnsignedMedium();
+ return payload.getByteBuf().readUnsignedMedium() * 1000;
default:
return 0;
}
}
-
- @Override
- public String toString() {
- if (0 == fractionalSecondsPrecision) {
- return "";
- }
- StringBuilder result = new StringBuilder(Integer.toString(fraction));
- for (int i = result.length(); i < fractionalSecondsPrecision; i++) {
- result.append("0");
- }
- result.setLength(fractionalSecondsPrecision);
- return "." + result;
- }
}
diff --git a/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/time/MySQLTime2BinlogProtocolValue.java b/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/time/MySQLTime2BinlogProtocolValue.java
index 13fabd7d093..8fa6d0bb66c 100644
--- a/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/time/MySQLTime2BinlogProtocolValue.java
+++ b/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/time/MySQLTime2BinlogProtocolValue.java
@@ -22,6 +22,7 @@ import org.apache.shardingsphere.db.protocol.mysql.packet.binlog.row.column.valu
import org.apache.shardingsphere.db.protocol.mysql.payload.MySQLPacketPayload;
import java.io.Serializable;
+import java.time.LocalTime;
/**
* TIME2 type value of MySQL binlog protocol.
@@ -41,6 +42,9 @@ public final class MySQLTime2BinlogProtocolValue implements MySQLBinlogProtocolV
return MySQLTimeValueUtil.ZERO_OF_TIME;
}
MySQLFractionalSeconds fractionalSeconds = new MySQLFractionalSeconds(columnDef.getColumnMeta(), payload);
- return String.format("%02d:%02d:%02d%s", (time >> 12) % (1 << 10), (time >> 6) % (1 << 6), time % (1 << 6), fractionalSeconds);
+ int hour = (time >> 12) % (1 << 10);
+ int minute = (time >> 6) % (1 << 6);
+ int second = time % (1 << 6);
+ return LocalTime.of(hour, minute, second).withNano(fractionalSeconds.getNanos());
}
}
diff --git a/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/time/MySQLTimestamp2BinlogProtocolValue.java b/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/time/MySQLTimestamp2BinlogProtocolValue.java
index 610e484200a..ac6a8a8da04 100644
--- a/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/time/MySQLTimestamp2BinlogProtocolValue.java
+++ b/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/time/MySQLTimestamp2BinlogProtocolValue.java
@@ -37,7 +37,9 @@ public final class MySQLTimestamp2BinlogProtocolValue implements MySQLBinlogProt
if (0 == seconds) {
return MySQLTimeValueUtil.DATETIME_OF_ZERO;
}
- String result = MySQLTimeValueUtil.getSimpleDateFormat().format(new Timestamp(seconds * 1000L));
- return columnDef.getColumnMeta() > 0 ? result + new MySQLFractionalSeconds(columnDef.getColumnMeta(), payload) : result;
+ int nanos = columnDef.getColumnMeta() > 0 ? new MySQLFractionalSeconds(columnDef.getColumnMeta(), payload).getNanos() : 0;
+ Timestamp result = new Timestamp(seconds * 1000L);
+ result.setNanos(nanos);
+ return result;
}
}
diff --git a/db-protocol/mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/time/MySQLDatetime2BinlogProtocolValueTest.java b/db-protocol/mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/time/MySQLDatetime2BinlogProtocolValueTest.java
index f1176f787b4..5b9839927c8 100644
--- a/db-protocol/mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/time/MySQLDatetime2BinlogProtocolValueTest.java
+++ b/db-protocol/mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/time/MySQLDatetime2BinlogProtocolValueTest.java
@@ -27,6 +27,9 @@ import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
+import java.sql.Timestamp;
+import java.time.LocalDateTime;
+
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Mockito.when;
@@ -50,14 +53,16 @@ public final class MySQLDatetime2BinlogProtocolValueTest {
@Test
public void assertReadWithoutFraction() {
when(payload.readInt1()).thenReturn(0xfe, 0xf3, 0xff, 0x7e, 0xfb);
- assertThat(new MySQLDatetime2BinlogProtocolValue().read(columnDef, payload), is("9999-12-31 23:59:59"));
+ LocalDateTime expected = LocalDateTime.of(9999, 12, 31, 23, 59, 59);
+ assertThat(new MySQLDatetime2BinlogProtocolValue().read(columnDef, payload), is(Timestamp.valueOf(expected)));
}
@Test
public void assertReadWithoutFraction1() {
columnDef.setColumnMeta(1);
when(payload.readInt1()).thenReturn(0xfe, 0xf3, 0xff, 0x7e, 0xfb, 0x00);
- assertThat(new MySQLDatetime2BinlogProtocolValue().read(columnDef, payload), is("9999-12-31 23:59:59.0"));
+ LocalDateTime expected = LocalDateTime.of(9999, 12, 31, 23, 59, 59, 0);
+ assertThat(new MySQLDatetime2BinlogProtocolValue().read(columnDef, payload), is(Timestamp.valueOf(expected)));
}
@Test
@@ -66,7 +71,8 @@ public final class MySQLDatetime2BinlogProtocolValueTest {
when(payload.readInt1()).thenReturn(0xfe, 0xf3, 0xff, 0x7e, 0xfb);
when(payload.getByteBuf()).thenReturn(byteBuf);
when(byteBuf.readUnsignedShort()).thenReturn(9990);
- assertThat(new MySQLDatetime2BinlogProtocolValue().read(columnDef, payload), is("9999-12-31 23:59:59.999"));
+ LocalDateTime expected = LocalDateTime.of(9999, 12, 31, 23, 59, 59, 999 * 1000 * 1000);
+ assertThat(new MySQLDatetime2BinlogProtocolValue().read(columnDef, payload), is(Timestamp.valueOf(expected)));
}
@Test
@@ -75,7 +81,8 @@ public final class MySQLDatetime2BinlogProtocolValueTest {
when(payload.readInt1()).thenReturn(0xfe, 0xf3, 0xff, 0x7e, 0xfb);
when(payload.getByteBuf()).thenReturn(byteBuf);
when(byteBuf.readUnsignedMedium()).thenReturn(999990);
- assertThat(new MySQLDatetime2BinlogProtocolValue().read(columnDef, payload), is("9999-12-31 23:59:59.99999"));
+ LocalDateTime expected = LocalDateTime.of(9999, 12, 31, 23, 59, 59, 999990000);
+ assertThat(new MySQLDatetime2BinlogProtocolValue().read(columnDef, payload), is(Timestamp.valueOf(expected)));
}
@Test
diff --git a/db-protocol/mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/time/MySQLDatetimeBinlogProtocolValueTest.java b/db-protocol/mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/time/MySQLDatetimeBinlogProtocolValueTest.java
index b6f65333ac7..f3189fdc5e2 100644
--- a/db-protocol/mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/time/MySQLDatetimeBinlogProtocolValueTest.java
+++ b/db-protocol/mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/time/MySQLDatetimeBinlogProtocolValueTest.java
@@ -24,6 +24,9 @@ import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
+import java.sql.Timestamp;
+import java.time.LocalDateTime;
+
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Mockito.when;
@@ -40,7 +43,8 @@ public final class MySQLDatetimeBinlogProtocolValueTest {
@Test
public void assertRead() {
when(payload.readInt8()).thenReturn(99991231235959L);
- assertThat(new MySQLDatetimeBinlogProtocolValue().read(columnDef, payload), is("9999-12-31 23:59:59"));
+ LocalDateTime expected = LocalDateTime.of(9999, 12, 31, 23, 59, 59);
+ assertThat(new MySQLDatetimeBinlogProtocolValue().read(columnDef, payload), is(Timestamp.valueOf(expected)));
}
@Test
diff --git a/db-protocol/mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/time/MySQLTime2BinlogProtocolValueTest.java b/db-protocol/mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/time/MySQLTime2BinlogProtocolValueTest.java
index cdef42deb56..4ffd8217727 100644
--- a/db-protocol/mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/time/MySQLTime2BinlogProtocolValueTest.java
+++ b/db-protocol/mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/time/MySQLTime2BinlogProtocolValueTest.java
@@ -27,6 +27,8 @@ import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
+import java.time.LocalTime;
+
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Mockito.when;
@@ -52,7 +54,7 @@ public final class MySQLTime2BinlogProtocolValueTest {
public void assertRead() {
when(payload.getByteBuf()).thenReturn(byteBuf);
when(byteBuf.readUnsignedMedium()).thenReturn(0x800000 | (0x10 << 12) | (0x08 << 6) | 0x04);
- assertThat(new MySQLTime2BinlogProtocolValue().read(columnDef, payload), is("16:08:04"));
+ assertThat(new MySQLTime2BinlogProtocolValue().read(columnDef, payload), is(LocalTime.of(16, 8, 4)));
}
@Test
@@ -61,24 +63,24 @@ public final class MySQLTime2BinlogProtocolValueTest {
when(payload.getByteBuf()).thenReturn(byteBuf);
when(payload.readInt1()).thenReturn(90);
when(byteBuf.readUnsignedMedium()).thenReturn(0x800000 | (0x10 << 12) | (0x08 << 6) | 0x04);
- assertThat(new MySQLTime2BinlogProtocolValue().read(columnDef, payload), is("16:08:04.9"));
+ assertThat(new MySQLTime2BinlogProtocolValue().read(columnDef, payload), is(LocalTime.of(16, 8, 4).withNano(900000000)));
}
@Test
public void assertReadWithFraction3() {
columnDef.setColumnMeta(3);
when(payload.getByteBuf()).thenReturn(byteBuf);
- when(byteBuf.readUnsignedShort()).thenReturn(90);
+ when(byteBuf.readUnsignedShort()).thenReturn(9000);
when(byteBuf.readUnsignedMedium()).thenReturn(0x800000 | (0x10 << 12) | (0x08 << 6) | 0x04);
- assertThat(new MySQLTime2BinlogProtocolValue().read(columnDef, payload), is("16:08:04.900"));
+ assertThat(new MySQLTime2BinlogProtocolValue().read(columnDef, payload), is(LocalTime.of(16, 8, 4).withNano(900000000)));
}
@Test
public void assertReadWithFraction6() {
columnDef.setColumnMeta(6);
when(payload.getByteBuf()).thenReturn(byteBuf);
- when(byteBuf.readUnsignedMedium()).thenReturn(0x800000 | (0x10 << 12) | (0x08 << 6) | 0x04, 90);
- assertThat(new MySQLTime2BinlogProtocolValue().read(columnDef, payload), is("16:08:04.900000"));
+ when(byteBuf.readUnsignedMedium()).thenReturn(0x800000 | (0x10 << 12) | (0x08 << 6) | 0x04, 10123);
+ assertThat(new MySQLTime2BinlogProtocolValue().read(columnDef, payload), is(LocalTime.of(16, 8, 4).withNano(10123000)));
}
@Test
diff --git a/db-protocol/mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/time/MySQLTimestamp2BinlogProtocolValueTest.java b/db-protocol/mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/time/MySQLTimestamp2BinlogProtocolValueTest.java
index 45bc36a1d49..2c25aaafbea 100644
--- a/db-protocol/mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/time/MySQLTimestamp2BinlogProtocolValueTest.java
+++ b/db-protocol/mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/time/MySQLTimestamp2BinlogProtocolValueTest.java
@@ -54,19 +54,18 @@ public final class MySQLTimestamp2BinlogProtocolValueTest {
public void assertReadWithoutFraction() {
int currentSeconds = Long.valueOf(System.currentTimeMillis() / 1000).intValue();
when(byteBuf.readInt()).thenReturn(currentSeconds);
- assertThat(new MySQLTimestamp2BinlogProtocolValue().read(columnDef, payload), is(MySQLTimeValueUtil.getSimpleDateFormat().format(new Timestamp(currentSeconds * 1000L))));
+ assertThat(new MySQLTimestamp2BinlogProtocolValue().read(columnDef, payload), is(new Timestamp(currentSeconds * 1000L)));
}
@Test
public void assertReadWithFraction() {
columnDef.setColumnMeta(1);
- long currentTimeMillis = System.currentTimeMillis();
+ long currentTimeMillis = 1678795614082L;
int currentSeconds = Long.valueOf(currentTimeMillis / 1000).intValue();
- int currentMilliseconds = Long.valueOf(currentTimeMillis % 10).intValue();
+ int currentMilliseconds = Long.valueOf(currentTimeMillis % 100).intValue();
when(payload.readInt1()).thenReturn(currentMilliseconds);
when(byteBuf.readInt()).thenReturn(currentSeconds);
- assertThat(new MySQLTimestamp2BinlogProtocolValue().read(columnDef, payload),
- is(MySQLTimeValueUtil.getSimpleDateFormat().format(new Timestamp(currentSeconds * 1000L)) + "." + currentMilliseconds));
+ assertThat("currentTimeMillis:" + currentTimeMillis, new MySQLTimestamp2BinlogProtocolValue().read(columnDef, payload), is(new Timestamp(currentSeconds * 1000L + currentMilliseconds * 10L)));
}
@Test
diff --git a/test/e2e/pipeline/src/test/resources/env/scenario/general/mysql.xml b/test/e2e/pipeline/src/test/resources/env/scenario/general/mysql.xml
index c6d29086606..91a8063ad37 100644
--- a/test/e2e/pipeline/src/test/resources/env/scenario/general/mysql.xml
+++ b/test/e2e/pipeline/src/test/resources/env/scenario/general/mysql.xml
@@ -30,10 +30,10 @@
`t_float` float NULL,
`t_double` double NULL,
`t_decimal` decimal ( 10, 2 ) NULL,
- `t_timestamp` timestamp NULL,
- `t_datetime` datetime NULL,
+ `t_timestamp` timestamp(3) NULL,
+ `t_datetime` datetime(6) NULL,
`t_date` date NULL,
- `t_time` time NULL,
+ `t_time` time(1) NULL,
`t_year` year NULL,
`t_bit` bit(32) NULL,
`t_binary` binary(128) NULL,