You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2023/04/13 13:04:23 UTC

[shardingsphere] branch master updated: Fix the error of incremental tasks caused by datetime containing zero value in MySQL (#25144)

This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new d11d1f0f0bd Fix the error of incremental tasks caused by datetime containing zero value in MySQL (#25144)
d11d1f0f0bd is described below

commit d11d1f0f0bd78bd3074544ed56df4bd61bc11bb7
Author: Xinze Guo <10...@users.noreply.github.com>
AuthorDate: Thu Apr 13 21:04:13 2023 +0800

    Fix the error of incremental tasks caused by datetime containing zero value in MySQL (#25144)
    
    * Fix MySQLClient always print error log when decode binlog failed.
    
    * Fix MySQL datetime zero value not being processed correctly
    
    * Add incremental task
    
    * Fix the parameter may not be in effect at jdbcUrl
    
    * Fix the parameter may not be in effect at jdbcUrl
---
 .../time/MySQLDatetime2BinlogProtocolValue.java    |  3 +
 ...rdingSpherePipelineDataSourceConfiguration.java |  6 +-
 .../pipeline/mysql/ingest/client/MySQLClient.java  |  3 +-
 .../general/MySQLTimeTypesMigrationE2EIT.java      | 78 ++++++++++++++++++++++
 4 files changed, 88 insertions(+), 2 deletions(-)

diff --git a/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/time/MySQLDatetime2BinlogProtocolValue.java b/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/time/MySQLDatetime2BinlogProtocolValue.java
index 20a5d511579..2604ec3f642 100644
--- a/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/time/MySQLDatetime2BinlogProtocolValue.java
+++ b/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/time/MySQLDatetime2BinlogProtocolValue.java
@@ -48,6 +48,9 @@ public final class MySQLDatetime2BinlogProtocolValue implements MySQLBinlogProto
     
     private Serializable readDatetime(final MySQLBinlogColumnDef columnDef, final long datetime, final MySQLPacketPayload payload) {
         long datetimeWithoutSign = datetime & (0x8000000000L - 1);
+        if (0 == datetimeWithoutSign) {
+            return MySQLTimeValueUtils.DATETIME_OF_ZERO;
+        }
         long date = datetimeWithoutSign >> 17;
         long yearAndMonth = date >> 5;
         int year = (int) (yearAndMonth / 13);
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/ShardingSpherePipelineDataSourceConfiguration.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/ShardingSpherePipelineDataSourceConfiguration.java
index 23f115a4629..3bbfe647169 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/ShardingSpherePipelineDataSourceConfiguration.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/ShardingSpherePipelineDataSourceConfiguration.java
@@ -58,8 +58,12 @@ public final class ShardingSpherePipelineDataSourceConfiguration implements Pipe
     private final DatabaseType databaseType;
     
     public ShardingSpherePipelineDataSourceConfiguration(final String param) {
-        parameter = param;
         rootConfig = YamlEngine.unmarshal(param, YamlRootConfiguration.class, true);
+        // Need remove dataSourceProperties, because if the parameter at dataSourceProperties will override parameter at jdbcUrl
+        for (Map<String, Object> each : rootConfig.getDataSources().values()) {
+            each.remove("dataSourceProperties");
+        }
+        parameter = YamlEngine.marshal(rootConfig);
         Map<String, Object> props = rootConfig.getDataSources().values().iterator().next();
         databaseType = DatabaseTypeEngine.getDatabaseType(getJdbcUrl(props));
         appendJdbcQueryProperties(databaseType.getType());
diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
index 44a5a0ea659..6d82af059f7 100644
--- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
+++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
@@ -110,7 +110,6 @@ public final class MySQLClient {
                     }
                 }).connect(connectInfo.getHost(), connectInfo.getPort()).channel();
         serverInfo = waitExpectedResponse(ServerInfo.class);
-        reconnectTimes.set(0);
         running = true;
     }
     
@@ -305,6 +304,7 @@ public final class MySQLClient {
             if (msg instanceof AbstractBinlogEvent) {
                 lastBinlogEvent = (AbstractBinlogEvent) msg;
                 blockingEventQueue.put(lastBinlogEvent);
+                reconnectTimes.set(0);
             }
         }
         
@@ -332,6 +332,7 @@ public final class MySQLClient {
             }
             reconnectTimes.incrementAndGet();
             connect();
+            log.info("reconnect times {}", reconnectTimes.get());
             subscribe(lastBinlogEvent.getFileName(), lastBinlogEvent.getPosition());
         }
     }
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLTimeTypesMigrationE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLTimeTypesMigrationE2EIT.java
new file mode 100644
index 00000000000..f597af52cbc
--- /dev/null
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLTimeTypesMigrationE2EIT.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.test.e2e.data.pipeline.cases.migration.general;
+
+import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
+import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
+import org.apache.shardingsphere.test.e2e.data.pipeline.cases.PipelineContainerComposer;
+import org.apache.shardingsphere.test.e2e.data.pipeline.cases.migration.AbstractMigrationE2EIT;
+import org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineE2ECondition;
+import org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineE2ESettings;
+import org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineE2ETestCaseArgumentsProvider;
+import org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineTestParameter;
+import org.junit.jupiter.api.condition.EnabledIf;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ArgumentsSource;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+/**
+ * E2E IT for time types of MySQL, includes.
+ * 1) datetime,timestamp,date default null
+ */
+@PipelineE2ESettings(fetchSingle = true, database = @PipelineE2ESettings.PipelineE2EDatabaseSettings(type = "MySQL", scenarioFiles = "env/common/none.xml"))
+public class MySQLTimeTypesMigrationE2EIT extends AbstractMigrationE2EIT {
+
+    @ParameterizedTest(name = "{0}")
+    @EnabledIf("isEnabled")
+    @ArgumentsSource(PipelineE2ETestCaseArgumentsProvider.class)
+    void assertIllegalTimeTypesValueMigrationSuccess(final PipelineTestParameter testParam) throws Exception {
+        try (PipelineContainerComposer containerComposer = new PipelineContainerComposer(testParam, new MigrationJobType())) {
+            String sql = "CREATE TABLE `time_e2e` ( `id` int NOT NULL, `t_timestamp` timestamp NULL DEFAULT NULL, `t_datetime` datetime DEFAULT NULL, `t_date` date DEFAULT NULL, "
+                    + "`t_year` year DEFAULT NULL, PRIMARY KEY (`id`)) ENGINE=InnoDB;";
+            containerComposer.sourceExecuteWithLog(sql);
+            insertOneRecordWithZeroValue(containerComposer, 1);
+            addMigrationSourceResource(containerComposer);
+            addMigrationTargetResource(containerComposer);
+            startMigration(containerComposer, "time_e2e", "time_e2e");
+            String jobId = listJobId(containerComposer).get(0);
+            containerComposer.waitJobPrepareSuccess(String.format("SHOW MIGRATION STATUS '%s'", jobId));
+            insertOneRecordWithZeroValue(containerComposer, 2);
+            containerComposer.waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
+            assertCheckMigrationSuccess(containerComposer, jobId, "DATA_MATCH");
+        }
+    }
+
+    private void insertOneRecordWithZeroValue(final PipelineContainerComposer containerComposer, final int id) throws SQLException {
+        try (Connection connection = containerComposer.getSourceDataSource().getConnection()) {
+            PreparedStatement preparedStatement = connection.prepareStatement("INSERT INTO `time_e2e`(id, t_timestamp, t_datetime, t_date, t_year) VALUES (?, ?, ?, ?, ?)");
+            preparedStatement.setObject(1, id);
+            preparedStatement.setObject(2, "0000-00-00 00:00:00");
+            preparedStatement.setObject(3, "0000-00-00 00:00:00");
+            preparedStatement.setObject(4, "0000-00-00");
+            preparedStatement.setObject(5, "0000");
+            preparedStatement.execute();
+        }
+    }
+
+    private static boolean isEnabled() {
+        return PipelineE2ECondition.isEnabled(new MySQLDatabaseType());
+    }
+}