You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2023/04/13 13:04:23 UTC
[shardingsphere] branch master updated: Fix the error of incremental tasks caused by datetime containing zero value in MySQL (#25144)
This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new d11d1f0f0bd Fix the error of incremental tasks caused by datetime containing zero value in MySQL (#25144)
d11d1f0f0bd is described below
commit d11d1f0f0bd78bd3074544ed56df4bd61bc11bb7
Author: Xinze Guo <10...@users.noreply.github.com>
AuthorDate: Thu Apr 13 21:04:13 2023 +0800
Fix the error of incremental tasks caused by datetime containing zero value in MySQL (#25144)
* Fix MySQLClient always printing an error log when binlog decoding fails.
* Fix MySQL datetime zero value not being processed correctly
* Add incremental task
* Fix jdbcUrl parameters that may not take effect
* Fix jdbcUrl parameters that may not take effect
---
.../time/MySQLDatetime2BinlogProtocolValue.java | 3 +
...rdingSpherePipelineDataSourceConfiguration.java | 6 +-
.../pipeline/mysql/ingest/client/MySQLClient.java | 3 +-
.../general/MySQLTimeTypesMigrationE2EIT.java | 78 ++++++++++++++++++++++
4 files changed, 88 insertions(+), 2 deletions(-)
diff --git a/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/time/MySQLDatetime2BinlogProtocolValue.java b/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/time/MySQLDatetime2BinlogProtocolValue.java
index 20a5d511579..2604ec3f642 100644
--- a/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/time/MySQLDatetime2BinlogProtocolValue.java
+++ b/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/time/MySQLDatetime2BinlogProtocolValue.java
@@ -48,6 +48,9 @@ public final class MySQLDatetime2BinlogProtocolValue implements MySQLBinlogProto
private Serializable readDatetime(final MySQLBinlogColumnDef columnDef, final long datetime, final MySQLPacketPayload payload) {
long datetimeWithoutSign = datetime & (0x8000000000L - 1);
+ if (0 == datetimeWithoutSign) {
+ return MySQLTimeValueUtils.DATETIME_OF_ZERO;
+ }
long date = datetimeWithoutSign >> 17;
long yearAndMonth = date >> 5;
int year = (int) (yearAndMonth / 13);
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/ShardingSpherePipelineDataSourceConfiguration.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/ShardingSpherePipelineDataSourceConfiguration.java
index 23f115a4629..3bbfe647169 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/ShardingSpherePipelineDataSourceConfiguration.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/ShardingSpherePipelineDataSourceConfiguration.java
@@ -58,8 +58,12 @@ public final class ShardingSpherePipelineDataSourceConfiguration implements Pipe
private final DatabaseType databaseType;
public ShardingSpherePipelineDataSourceConfiguration(final String param) {
- parameter = param;
rootConfig = YamlEngine.unmarshal(param, YamlRootConfiguration.class, true);
+ // Remove dataSourceProperties, because a parameter in dataSourceProperties would override the same parameter in jdbcUrl
+ for (Map<String, Object> each : rootConfig.getDataSources().values()) {
+ each.remove("dataSourceProperties");
+ }
+ parameter = YamlEngine.marshal(rootConfig);
Map<String, Object> props = rootConfig.getDataSources().values().iterator().next();
databaseType = DatabaseTypeEngine.getDatabaseType(getJdbcUrl(props));
appendJdbcQueryProperties(databaseType.getType());
diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
index 44a5a0ea659..6d82af059f7 100644
--- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
+++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
@@ -110,7 +110,6 @@ public final class MySQLClient {
}
}).connect(connectInfo.getHost(), connectInfo.getPort()).channel();
serverInfo = waitExpectedResponse(ServerInfo.class);
- reconnectTimes.set(0);
running = true;
}
@@ -305,6 +304,7 @@ public final class MySQLClient {
if (msg instanceof AbstractBinlogEvent) {
lastBinlogEvent = (AbstractBinlogEvent) msg;
blockingEventQueue.put(lastBinlogEvent);
+ reconnectTimes.set(0);
}
}
@@ -332,6 +332,7 @@ public final class MySQLClient {
}
reconnectTimes.incrementAndGet();
connect();
+ log.info("reconnect times {}", reconnectTimes.get());
subscribe(lastBinlogEvent.getFileName(), lastBinlogEvent.getPosition());
}
}
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLTimeTypesMigrationE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLTimeTypesMigrationE2EIT.java
new file mode 100644
index 00000000000..f597af52cbc
--- /dev/null
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLTimeTypesMigrationE2EIT.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.test.e2e.data.pipeline.cases.migration.general;
+
+import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
+import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
+import org.apache.shardingsphere.test.e2e.data.pipeline.cases.PipelineContainerComposer;
+import org.apache.shardingsphere.test.e2e.data.pipeline.cases.migration.AbstractMigrationE2EIT;
+import org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineE2ECondition;
+import org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineE2ESettings;
+import org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineE2ETestCaseArgumentsProvider;
+import org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineTestParameter;
+import org.junit.jupiter.api.condition.EnabledIf;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ArgumentsSource;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+/**
+ * E2E IT for time types of MySQL, includes.
+ * 1) datetime,timestamp,date default null
+ */
+@PipelineE2ESettings(fetchSingle = true, database = @PipelineE2ESettings.PipelineE2EDatabaseSettings(type = "MySQL", scenarioFiles = "env/common/none.xml"))
+public class MySQLTimeTypesMigrationE2EIT extends AbstractMigrationE2EIT {
+
+ @ParameterizedTest(name = "{0}")
+ @EnabledIf("isEnabled")
+ @ArgumentsSource(PipelineE2ETestCaseArgumentsProvider.class)
+ void assertIllegalTimeTypesValueMigrationSuccess(final PipelineTestParameter testParam) throws Exception {
+ try (PipelineContainerComposer containerComposer = new PipelineContainerComposer(testParam, new MigrationJobType())) {
+ String sql = "CREATE TABLE `time_e2e` ( `id` int NOT NULL, `t_timestamp` timestamp NULL DEFAULT NULL, `t_datetime` datetime DEFAULT NULL, `t_date` date DEFAULT NULL, "
+ + "`t_year` year DEFAULT NULL, PRIMARY KEY (`id`)) ENGINE=InnoDB;";
+ containerComposer.sourceExecuteWithLog(sql);
+ insertOneRecordWithZeroValue(containerComposer, 1);
+ addMigrationSourceResource(containerComposer);
+ addMigrationTargetResource(containerComposer);
+ startMigration(containerComposer, "time_e2e", "time_e2e");
+ String jobId = listJobId(containerComposer).get(0);
+ containerComposer.waitJobPrepareSuccess(String.format("SHOW MIGRATION STATUS '%s'", jobId));
+ insertOneRecordWithZeroValue(containerComposer, 2);
+ containerComposer.waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
+ assertCheckMigrationSuccess(containerComposer, jobId, "DATA_MATCH");
+ }
+ }
+
+ private void insertOneRecordWithZeroValue(final PipelineContainerComposer containerComposer, final int id) throws SQLException {
+ try (Connection connection = containerComposer.getSourceDataSource().getConnection()) {
+ PreparedStatement preparedStatement = connection.prepareStatement("INSERT INTO `time_e2e`(id, t_timestamp, t_datetime, t_date, t_year) VALUES (?, ?, ?, ?, ?)");
+ preparedStatement.setObject(1, id);
+ preparedStatement.setObject(2, "0000-00-00 00:00:00");
+ preparedStatement.setObject(3, "0000-00-00 00:00:00");
+ preparedStatement.setObject(4, "0000-00-00");
+ preparedStatement.setObject(5, "0000");
+ preparedStatement.execute();
+ }
+ }
+
+ private static boolean isEnabled() {
+ return PipelineE2ECondition.isEnabled(new MySQLDatabaseType());
+ }
+}