You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2023/03/15 03:15:49 UTC

[shardingsphere] branch master updated: Refactor pipeline parse datetime/time of MySQL binlog (#24598)

This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 90bf6660d66 Refactor pipeline parse datetime/time of MySQL binlog (#24598)
90bf6660d66 is described below

commit 90bf6660d661278b1bb928c092f17f9da4e8ec97
Author: Xinze Guo <10...@users.noreply.github.com>
AuthorDate: Wed Mar 15 11:15:40 2023 +0800

    Refactor pipeline parse datetime/time of MySQL binlog (#24598)
    
    * Refactor pipeline parse datetime/time of MySQL binlog
    
    * Fix unit test
    
    * Rename
---
 .../time/MySQLDatetime2BinlogProtocolValue.java    | 21 +++++++++--------
 .../time/MySQLDatetimeBinlogProtocolValue.java     | 21 +++++++++++------
 .../column/value/time/MySQLFractionalSeconds.java  | 27 +++++++---------------
 .../value/time/MySQLTime2BinlogProtocolValue.java  |  6 ++++-
 .../time/MySQLTimestamp2BinlogProtocolValue.java   |  6 +++--
 .../MySQLDatetime2BinlogProtocolValueTest.java     | 15 ++++++++----
 .../time/MySQLDatetimeBinlogProtocolValueTest.java |  6 ++++-
 .../time/MySQLTime2BinlogProtocolValueTest.java    | 14 ++++++-----
 .../MySQLTimestamp2BinlogProtocolValueTest.java    |  9 ++++----
 .../test/resources/env/scenario/general/mysql.xml  |  6 ++---
 10 files changed, 74 insertions(+), 57 deletions(-)

diff --git a/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/time/MySQLDatetime2BinlogProtocolValue.java b/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/time/MySQLDatetime2BinlogProtocolValue.java
index 09f6402c491..aeeb9bac3e0 100644
--- a/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/time/MySQLDatetime2BinlogProtocolValue.java
+++ b/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/time/MySQLDatetime2BinlogProtocolValue.java
@@ -22,6 +22,8 @@ import org.apache.shardingsphere.db.protocol.mysql.packet.binlog.row.column.valu
 import org.apache.shardingsphere.db.protocol.mysql.payload.MySQLPacketPayload;
 
 import java.io.Serializable;
+import java.sql.Timestamp;
+import java.time.LocalDateTime;
 
 /**
  * MySQL DATETIME2 binlog protocol value.
@@ -46,15 +48,16 @@ public final class MySQLDatetime2BinlogProtocolValue implements MySQLBinlogProto
     
     private Serializable readDatetime(final MySQLBinlogColumnDef columnDef, final long datetime, final MySQLPacketPayload payload) {
         long datetimeWithoutSign = datetime & (0x8000000000L - 1);
-        return readDate(datetimeWithoutSign >> 17) + " " + readTime(datetimeWithoutSign % (1 << 17)) + new MySQLFractionalSeconds(columnDef.getColumnMeta(), payload);
-    }
-    
-    private String readDate(final long date) {
+        long date = datetimeWithoutSign >> 17;
         long yearAndMonth = date >> 5;
-        return String.format("%d-%02d-%02d", yearAndMonth / 13, yearAndMonth % 13, date % (1 << 5));
-    }
-    
-    private String readTime(final long time) {
-        return String.format("%02d:%02d:%02d", time >> 12, (time >> 6) % (1 << 6), time % (1 << 6));
+        int year = (int) (yearAndMonth / 13);
+        int month = (int) (yearAndMonth % 13);
+        int day = (int) (date % (1 << 5));
+        long time = datetimeWithoutSign % (1 << 17);
+        int hour = (int) (time >> 12);
+        int minute = (int) ((time >> 6) % (1 << 6));
+        int second = (int) (time % (1 << 6));
+        MySQLFractionalSeconds fractionalSeconds = new MySQLFractionalSeconds(columnDef.getColumnMeta(), payload);
+        return Timestamp.valueOf(LocalDateTime.of(year, month, day, hour, minute, second, fractionalSeconds.getNanos()));
     }
 }
diff --git a/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/time/MySQLDatetimeBinlogProtocolValue.java b/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/time/MySQLDatetimeBinlogProtocolValue.java
index fd2d677b6bf..fe8a10f4096 100644
--- a/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/time/MySQLDatetimeBinlogProtocolValue.java
+++ b/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/time/MySQLDatetimeBinlogProtocolValue.java
@@ -22,6 +22,9 @@ import org.apache.shardingsphere.db.protocol.mysql.packet.binlog.row.column.valu
 import org.apache.shardingsphere.db.protocol.mysql.payload.MySQLPacketPayload;
 
 import java.io.Serializable;
+import java.sql.Timestamp;
+import java.time.LocalDateTime;
+import java.util.Date;
 
 /**
  * MySQL DATETIME binlog protocol value.
@@ -33,14 +36,18 @@ public final class MySQLDatetimeBinlogProtocolValue implements MySQLBinlogProtoc
     @Override
     public Serializable read(final MySQLBinlogColumnDef columnDef, final MySQLPacketPayload payload) {
         long datetime = payload.readInt8();
-        return 0 == datetime ? MySQLTimeValueUtil.DATETIME_OF_ZERO : String.format("%s %s", readDate((int) (datetime / 1000000)), readTime((int) (datetime % 1000000)));
+        return 0 == datetime ? MySQLTimeValueUtil.DATETIME_OF_ZERO : readDateTime(datetime);
     }
     
-    private String readDate(final int date) {
-        return String.format("%04d-%02d-%02d", date / 10000, (date % 10000) / 100, date % 100);
-    }
-    
-    private String readTime(final int time) {
-        return String.format("%02d:%02d:%02d", time / 10000, (time % 10000) / 100, time % 100);
+    private Date readDateTime(final long datetime) {
+        int date = (int) (datetime / 1000000);
+        int year = date / 10000;
+        int month = (date % 10000) / 100;
+        int day = date % 100;
+        int time = (int) (datetime % 1000000);
+        int hour = time / 10000;
+        int minute = (time % 10000) / 100;
+        int second = time % 100;
+        return Timestamp.valueOf(LocalDateTime.of(year, month, day, hour, minute, second));
     }
 }
diff --git a/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/time/MySQLFractionalSeconds.java b/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/time/MySQLFractionalSeconds.java
index 0525e567285..dd62ca40a4e 100644
--- a/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/time/MySQLFractionalSeconds.java
+++ b/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/time/MySQLFractionalSeconds.java
@@ -17,6 +17,7 @@
 
 package org.apache.shardingsphere.db.protocol.mysql.packet.binlog.row.column.value.time;
 
+import lombok.Getter;
 import org.apache.shardingsphere.db.protocol.mysql.payload.MySQLPacketPayload;
 
 /**
@@ -26,41 +27,29 @@ import org.apache.shardingsphere.db.protocol.mysql.payload.MySQLPacketPayload;
  */
 public final class MySQLFractionalSeconds {
     
-    private final int fraction;
+    @Getter
+    private final int nanos;
     
     private final int fractionalSecondsPrecision;
     
     public MySQLFractionalSeconds(final int columnMeta, final MySQLPacketPayload payload) {
         fractionalSecondsPrecision = columnMeta;
-        fraction = readFraction(payload);
+        nanos = convertFractionalSecondsToNanos(payload);
     }
     
-    private int readFraction(final MySQLPacketPayload payload) {
+    private int convertFractionalSecondsToNanos(final MySQLPacketPayload payload) {
         switch (fractionalSecondsPrecision) {
             case 1:
             case 2:
-                return payload.readInt1() * 10000;
+                return payload.readInt1() * 10000 * 1000;
             case 3:
             case 4:
-                return payload.getByteBuf().readUnsignedShort() * 100;
+                return payload.getByteBuf().readUnsignedShort() * 100 * 1000;
             case 5:
             case 6:
-                return payload.getByteBuf().readUnsignedMedium();
+                return payload.getByteBuf().readUnsignedMedium() * 1000;
             default:
                 return 0;
         }
     }
-    
-    @Override
-    public String toString() {
-        if (0 == fractionalSecondsPrecision) {
-            return "";
-        }
-        StringBuilder result = new StringBuilder(Integer.toString(fraction));
-        for (int i = result.length(); i < fractionalSecondsPrecision; i++) {
-            result.append("0");
-        }
-        result.setLength(fractionalSecondsPrecision);
-        return "." + result;
-    }
 }
diff --git a/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/time/MySQLTime2BinlogProtocolValue.java b/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/time/MySQLTime2BinlogProtocolValue.java
index 13fabd7d093..8fa6d0bb66c 100644
--- a/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/time/MySQLTime2BinlogProtocolValue.java
+++ b/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/time/MySQLTime2BinlogProtocolValue.java
@@ -22,6 +22,7 @@ import org.apache.shardingsphere.db.protocol.mysql.packet.binlog.row.column.valu
 import org.apache.shardingsphere.db.protocol.mysql.payload.MySQLPacketPayload;
 
 import java.io.Serializable;
+import java.time.LocalTime;
 
 /**
  * TIME2 type value of MySQL binlog protocol.
@@ -41,6 +42,9 @@ public final class MySQLTime2BinlogProtocolValue implements MySQLBinlogProtocolV
             return MySQLTimeValueUtil.ZERO_OF_TIME;
         }
         MySQLFractionalSeconds fractionalSeconds = new MySQLFractionalSeconds(columnDef.getColumnMeta(), payload);
-        return String.format("%02d:%02d:%02d%s", (time >> 12) % (1 << 10), (time >> 6) % (1 << 6), time % (1 << 6), fractionalSeconds);
+        int hour = (time >> 12) % (1 << 10);
+        int minute = (time >> 6) % (1 << 6);
+        int second = time % (1 << 6);
+        return LocalTime.of(hour, minute, second).withNano(fractionalSeconds.getNanos());
     }
 }
diff --git a/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/time/MySQLTimestamp2BinlogProtocolValue.java b/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/time/MySQLTimestamp2BinlogProtocolValue.java
index 610e484200a..ac6a8a8da04 100644
--- a/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/time/MySQLTimestamp2BinlogProtocolValue.java
+++ b/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/time/MySQLTimestamp2BinlogProtocolValue.java
@@ -37,7 +37,9 @@ public final class MySQLTimestamp2BinlogProtocolValue implements MySQLBinlogProt
         if (0 == seconds) {
             return MySQLTimeValueUtil.DATETIME_OF_ZERO;
         }
-        String result = MySQLTimeValueUtil.getSimpleDateFormat().format(new Timestamp(seconds * 1000L));
-        return columnDef.getColumnMeta() > 0 ? result + new MySQLFractionalSeconds(columnDef.getColumnMeta(), payload) : result;
+        int nanos = columnDef.getColumnMeta() > 0 ? new MySQLFractionalSeconds(columnDef.getColumnMeta(), payload).getNanos() : 0;
+        Timestamp result = new Timestamp(seconds * 1000L);
+        result.setNanos(nanos);
+        return result;
     }
 }
diff --git a/db-protocol/mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/time/MySQLDatetime2BinlogProtocolValueTest.java b/db-protocol/mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/time/MySQLDatetime2BinlogProtocolValueTest.java
index f1176f787b4..5b9839927c8 100644
--- a/db-protocol/mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/time/MySQLDatetime2BinlogProtocolValueTest.java
+++ b/db-protocol/mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/time/MySQLDatetime2BinlogProtocolValueTest.java
@@ -27,6 +27,9 @@ import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
 
+import java.sql.Timestamp;
+import java.time.LocalDateTime;
+
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.mockito.Mockito.when;
@@ -50,14 +53,16 @@ public final class MySQLDatetime2BinlogProtocolValueTest {
     @Test
     public void assertReadWithoutFraction() {
         when(payload.readInt1()).thenReturn(0xfe, 0xf3, 0xff, 0x7e, 0xfb);
-        assertThat(new MySQLDatetime2BinlogProtocolValue().read(columnDef, payload), is("9999-12-31 23:59:59"));
+        LocalDateTime expected = LocalDateTime.of(9999, 12, 31, 23, 59, 59);
+        assertThat(new MySQLDatetime2BinlogProtocolValue().read(columnDef, payload), is(Timestamp.valueOf(expected)));
     }
     
     @Test
     public void assertReadWithoutFraction1() {
         columnDef.setColumnMeta(1);
         when(payload.readInt1()).thenReturn(0xfe, 0xf3, 0xff, 0x7e, 0xfb, 0x00);
-        assertThat(new MySQLDatetime2BinlogProtocolValue().read(columnDef, payload), is("9999-12-31 23:59:59.0"));
+        LocalDateTime expected = LocalDateTime.of(9999, 12, 31, 23, 59, 59, 0);
+        assertThat(new MySQLDatetime2BinlogProtocolValue().read(columnDef, payload), is(Timestamp.valueOf(expected)));
     }
     
     @Test
@@ -66,7 +71,8 @@ public final class MySQLDatetime2BinlogProtocolValueTest {
         when(payload.readInt1()).thenReturn(0xfe, 0xf3, 0xff, 0x7e, 0xfb);
         when(payload.getByteBuf()).thenReturn(byteBuf);
         when(byteBuf.readUnsignedShort()).thenReturn(9990);
-        assertThat(new MySQLDatetime2BinlogProtocolValue().read(columnDef, payload), is("9999-12-31 23:59:59.999"));
+        LocalDateTime expected = LocalDateTime.of(9999, 12, 31, 23, 59, 59, 999 * 1000 * 1000);
+        assertThat(new MySQLDatetime2BinlogProtocolValue().read(columnDef, payload), is(Timestamp.valueOf(expected)));
     }
     
     @Test
@@ -75,7 +81,8 @@ public final class MySQLDatetime2BinlogProtocolValueTest {
         when(payload.readInt1()).thenReturn(0xfe, 0xf3, 0xff, 0x7e, 0xfb);
         when(payload.getByteBuf()).thenReturn(byteBuf);
         when(byteBuf.readUnsignedMedium()).thenReturn(999990);
-        assertThat(new MySQLDatetime2BinlogProtocolValue().read(columnDef, payload), is("9999-12-31 23:59:59.99999"));
+        LocalDateTime expected = LocalDateTime.of(9999, 12, 31, 23, 59, 59, 999990000);
+        assertThat(new MySQLDatetime2BinlogProtocolValue().read(columnDef, payload), is(Timestamp.valueOf(expected)));
     }
     
     @Test
diff --git a/db-protocol/mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/time/MySQLDatetimeBinlogProtocolValueTest.java b/db-protocol/mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/time/MySQLDatetimeBinlogProtocolValueTest.java
index b6f65333ac7..f3189fdc5e2 100644
--- a/db-protocol/mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/time/MySQLDatetimeBinlogProtocolValueTest.java
+++ b/db-protocol/mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/time/MySQLDatetimeBinlogProtocolValueTest.java
@@ -24,6 +24,9 @@ import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
 
+import java.sql.Timestamp;
+import java.time.LocalDateTime;
+
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.mockito.Mockito.when;
@@ -40,7 +43,8 @@ public final class MySQLDatetimeBinlogProtocolValueTest {
     @Test
     public void assertRead() {
         when(payload.readInt8()).thenReturn(99991231235959L);
-        assertThat(new MySQLDatetimeBinlogProtocolValue().read(columnDef, payload), is("9999-12-31 23:59:59"));
+        LocalDateTime expected = LocalDateTime.of(9999, 12, 31, 23, 59, 59);
+        assertThat(new MySQLDatetimeBinlogProtocolValue().read(columnDef, payload), is(Timestamp.valueOf(expected)));
     }
     
     @Test
diff --git a/db-protocol/mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/time/MySQLTime2BinlogProtocolValueTest.java b/db-protocol/mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/time/MySQLTime2BinlogProtocolValueTest.java
index cdef42deb56..4ffd8217727 100644
--- a/db-protocol/mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/time/MySQLTime2BinlogProtocolValueTest.java
+++ b/db-protocol/mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/time/MySQLTime2BinlogProtocolValueTest.java
@@ -27,6 +27,8 @@ import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
 
+import java.time.LocalTime;
+
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.mockito.Mockito.when;
@@ -52,7 +54,7 @@ public final class MySQLTime2BinlogProtocolValueTest {
     public void assertRead() {
         when(payload.getByteBuf()).thenReturn(byteBuf);
         when(byteBuf.readUnsignedMedium()).thenReturn(0x800000 | (0x10 << 12) | (0x08 << 6) | 0x04);
-        assertThat(new MySQLTime2BinlogProtocolValue().read(columnDef, payload), is("16:08:04"));
+        assertThat(new MySQLTime2BinlogProtocolValue().read(columnDef, payload), is(LocalTime.of(16, 8, 4)));
     }
     
     @Test
@@ -61,24 +63,24 @@ public final class MySQLTime2BinlogProtocolValueTest {
         when(payload.getByteBuf()).thenReturn(byteBuf);
         when(payload.readInt1()).thenReturn(90);
         when(byteBuf.readUnsignedMedium()).thenReturn(0x800000 | (0x10 << 12) | (0x08 << 6) | 0x04);
-        assertThat(new MySQLTime2BinlogProtocolValue().read(columnDef, payload), is("16:08:04.9"));
+        assertThat(new MySQLTime2BinlogProtocolValue().read(columnDef, payload), is(LocalTime.of(16, 8, 4).withNano(900000000)));
     }
     
     @Test
     public void assertReadWithFraction3() {
         columnDef.setColumnMeta(3);
         when(payload.getByteBuf()).thenReturn(byteBuf);
-        when(byteBuf.readUnsignedShort()).thenReturn(90);
+        when(byteBuf.readUnsignedShort()).thenReturn(9000);
         when(byteBuf.readUnsignedMedium()).thenReturn(0x800000 | (0x10 << 12) | (0x08 << 6) | 0x04);
-        assertThat(new MySQLTime2BinlogProtocolValue().read(columnDef, payload), is("16:08:04.900"));
+        assertThat(new MySQLTime2BinlogProtocolValue().read(columnDef, payload), is(LocalTime.of(16, 8, 4).withNano(900000000)));
     }
     
     @Test
     public void assertReadWithFraction6() {
         columnDef.setColumnMeta(6);
         when(payload.getByteBuf()).thenReturn(byteBuf);
-        when(byteBuf.readUnsignedMedium()).thenReturn(0x800000 | (0x10 << 12) | (0x08 << 6) | 0x04, 90);
-        assertThat(new MySQLTime2BinlogProtocolValue().read(columnDef, payload), is("16:08:04.900000"));
+        when(byteBuf.readUnsignedMedium()).thenReturn(0x800000 | (0x10 << 12) | (0x08 << 6) | 0x04, 10123);
+        assertThat(new MySQLTime2BinlogProtocolValue().read(columnDef, payload), is(LocalTime.of(16, 8, 4).withNano(10123000)));
     }
     
     @Test
diff --git a/db-protocol/mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/time/MySQLTimestamp2BinlogProtocolValueTest.java b/db-protocol/mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/time/MySQLTimestamp2BinlogProtocolValueTest.java
index 45bc36a1d49..2c25aaafbea 100644
--- a/db-protocol/mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/time/MySQLTimestamp2BinlogProtocolValueTest.java
+++ b/db-protocol/mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/time/MySQLTimestamp2BinlogProtocolValueTest.java
@@ -54,19 +54,18 @@ public final class MySQLTimestamp2BinlogProtocolValueTest {
     public void assertReadWithoutFraction() {
         int currentSeconds = Long.valueOf(System.currentTimeMillis() / 1000).intValue();
         when(byteBuf.readInt()).thenReturn(currentSeconds);
-        assertThat(new MySQLTimestamp2BinlogProtocolValue().read(columnDef, payload), is(MySQLTimeValueUtil.getSimpleDateFormat().format(new Timestamp(currentSeconds * 1000L))));
+        assertThat(new MySQLTimestamp2BinlogProtocolValue().read(columnDef, payload), is(new Timestamp(currentSeconds * 1000L)));
     }
     
     @Test
     public void assertReadWithFraction() {
         columnDef.setColumnMeta(1);
-        long currentTimeMillis = System.currentTimeMillis();
+        long currentTimeMillis = 1678795614082L;
         int currentSeconds = Long.valueOf(currentTimeMillis / 1000).intValue();
-        int currentMilliseconds = Long.valueOf(currentTimeMillis % 10).intValue();
+        int currentMilliseconds = Long.valueOf(currentTimeMillis % 100).intValue();
         when(payload.readInt1()).thenReturn(currentMilliseconds);
         when(byteBuf.readInt()).thenReturn(currentSeconds);
-        assertThat(new MySQLTimestamp2BinlogProtocolValue().read(columnDef, payload),
-                is(MySQLTimeValueUtil.getSimpleDateFormat().format(new Timestamp(currentSeconds * 1000L)) + "." + currentMilliseconds));
+        assertThat("currentTimeMillis:" + currentTimeMillis, new MySQLTimestamp2BinlogProtocolValue().read(columnDef, payload), is(new Timestamp(currentSeconds * 1000L + currentMilliseconds * 10L)));
     }
     
     @Test
diff --git a/test/e2e/pipeline/src/test/resources/env/scenario/general/mysql.xml b/test/e2e/pipeline/src/test/resources/env/scenario/general/mysql.xml
index c6d29086606..91a8063ad37 100644
--- a/test/e2e/pipeline/src/test/resources/env/scenario/general/mysql.xml
+++ b/test/e2e/pipeline/src/test/resources/env/scenario/general/mysql.xml
@@ -30,10 +30,10 @@
         `t_float` float NULL,
         `t_double` double NULL,
         `t_decimal` decimal ( 10, 2 ) NULL,
-        `t_timestamp` timestamp NULL,
-        `t_datetime` datetime NULL,
+        `t_timestamp` timestamp(3) NULL,
+        `t_datetime` datetime(6) NULL,
         `t_date` date NULL,
-        `t_time` time NULL,
+        `t_time` time(1) NULL,
         `t_year` year NULL,
         `t_bit` bit(32) NULL,
         `t_binary` binary(128) NULL,