Posted to notifications@shardingsphere.apache.org by "azexcy (via GitHub)" <gi...@apache.org> on 2023/03/07 07:23:28 UTC

[GitHub] [shardingsphere] azexcy opened a new pull request, #24485: E2E add more test fields in the incremental stage of pipeline

azexcy opened a new pull request, #24485:
URL: https://github.com/apache/shardingsphere/pull/24485

   
   
   Changes proposed in this pull request:
     - E2E add more test fields in the incremental stage of pipeline
   
   ---
   
   Before committing this PR, I'm sure that I have checked the following options:
   - [ ] My code follows the [code of conduct](https://shardingsphere.apache.org/community/en/involved/conduct/code/) of this project.
   - [ ] I have self-reviewed the commit code.
   - [ ] I have (or in comment I request) added corresponding labels for the pull request.
   - [ ] I have passed maven check locally : `./mvnw clean install -B -T1C -Dmaven.javadoc.skip -Dmaven.jacoco.skip -e`.
   - [ ] I have made corresponding changes to the documentation.
   - [ ] I have added corresponding unit tests for my changes.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@shardingsphere.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [shardingsphere] sandynz commented on a diff in pull request #24485: E2E add more test fields in the incremental stage of pipeline

Posted by "sandynz (via GitHub)" <gi...@apache.org>.
sandynz commented on code in PR #24485:
URL: https://github.com/apache/shardingsphere/pull/24485#discussion_r1130350307


##########
test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java:
##########
@@ -90,14 +90,15 @@ public void assertMigrationSuccess() throws SQLException, InterruptedException {
         createTargetOrderItemTableRule();
         Pair<List<Object[]>, List<Object[]>> dataPair = PipelineCaseHelper.generateFullInsertData(testParam.getDatabaseType(), PipelineBaseE2EIT.TABLE_INIT_ROW_COUNT);
         log.info("init data begin: {}", LocalDateTime.now());
-        DataSourceExecuteUtil.execute(getSourceDataSource(), getExtraSQLCommand().getFullInsertOrder(getSourceTableOrderName()), dataPair.getLeft());
+        String insertOrderSql = getExtraSQLCommand().getFullInsertOrder(getSourceTableOrderName());

Review Comment:
   `insertOrderSql` could be `insertOrderSQL`



##########
test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java:
##########
@@ -125,38 +125,36 @@ public void assertCDCDataImportSuccess() throws SQLException, InterruptedExcepti
             registerStorageUnit(each);
         }
         createOrderTableRule();
-        try (Connection connection = getProxyDataSource().getConnection()) {
+        DataSource proxyDataSource = generateShardingSphereDataSourceFromProxy();
+        try (Connection connection = proxyDataSource.getConnection()) {
             initSchemaAndTable(connection);
         }
         Pair<List<Object[]>, List<Object[]>> dataPair = PipelineCaseHelper.generateFullInsertData(getDatabaseType(), 20);
         log.info("init data begin: {}", LocalDateTime.now());
-        DataSourceExecuteUtil.execute(getProxyDataSource(), getExtraSQLCommand().getFullInsertOrder(getSourceTableOrderName()), dataPair.getLeft());
+        String insertOrderTableSql = getExtraSQLCommand().getFullInsertOrder(getSourceTableOrderName());
+        DataSourceExecuteUtil.execute(proxyDataSource, insertOrderTableSql, dataPair.getLeft());
         log.info("init data end: {}", LocalDateTime.now());
         try (Connection connection = DriverManager.getConnection(getActualJdbcUrlTemplate(DS_4, false), getUsername(), getPassword())) {
             initSchemaAndTable(connection);
         }
         startCDCClient();
         Awaitility.await().atMost(10, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).until(() -> !queryForListWithLog("SHOW STREAMING LIST").isEmpty());
-        if (getDatabaseType() instanceof MySQLDatabaseType) {
-            startIncrementTask(new MySQLIncrementTask(getProxyDataSource(), getSourceTableOrderName(), new SnowflakeKeyGenerateAlgorithm(), 20));
-        } else {
-            startIncrementTask(new PostgreSQLIncrementTask(getProxyDataSource(), PipelineBaseE2EIT.SCHEMA_NAME, getSourceTableOrderName(), 20));
-        }
+        startIncrementTask(new E2EIncrementalTask(proxyDataSource, getSourceTableOrderName(), insertOrderTableSql, new SnowflakeKeyGenerateAlgorithm(), getDatabaseType(), 20));
         getIncreaseTaskThread().join(10000);
         List<Map<String, Object>> actualProxyList;
-        try (Connection connection = getProxyDataSource().getConnection()) {
+        try (Connection connection = proxyDataSource.getConnection()) {
             ResultSet resultSet = connection.createStatement().executeQuery(String.format("SELECT * FROM %s ORDER BY order_id ASC", getOrderTableNameWithSchema()));
             actualProxyList = transformResultSetToList(resultSet);
         }
-        Awaitility.await().atMost(10, TimeUnit.SECONDS).pollInterval(2, TimeUnit.SECONDS).until(() -> listOrderRecords(getOrderTableNameWithSchema()).size() == actualProxyList.size());
+        Awaitility.await().atMost(20, TimeUnit.SECONDS).pollInterval(2, TimeUnit.SECONDS).until(() -> listOrderRecords(getOrderTableNameWithSchema()).size() == actualProxyList.size());

Review Comment:
   Could we remove the `actualProxyList` check, since `SingleTableInventoryDataConsistencyChecker.check` already covers it?



##########
db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/string/MySQLStringBinlogProtocolValue.java:
##########
@@ -45,7 +45,7 @@ public Serializable read(final MySQLBinlogColumnDef columnDef, final MySQLPacket
             case MYSQL_TYPE_SET:
                 return payload.getByteBuf().readByte();
             case MYSQL_TYPE_STRING:
-                return payload.readStringFix(readActualLength(length, payload));
+                return payload.readStringFixByBytes(readActualLength(length, payload));

Review Comment:
   It should return `new MySQLBinaryString(bytes)`
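   A minimal, self-contained sketch of what the suggestion amounts to, using a hypothetical stand-in for ShardingSphere's `MySQLBinaryString` (the real class and payload API live in the project; names here are illustrative only):

   ```java
   import java.nio.charset.StandardCharsets;
   import java.util.Arrays;

   // Hypothetical stand-in for MySQLBinaryString: wraps the raw bytes read from
   // the binlog so downstream code decides how (and whether) to decode them.
   final class BinaryString {

       private final byte[] bytes;

       BinaryString(final byte[] bytes) {
           this.bytes = bytes;
       }

       byte[] getBytes() {
           return bytes;
       }
   }

   public final class StringBinlogValueSketch {

       // Sketch of the review suggestion: read the fixed-length bytes, then wrap
       // them in a binary-string holder instead of returning raw bytes directly.
       static BinaryString readStringValue(final byte[] payload, final int actualLength) {
           byte[] bytes = Arrays.copyOf(payload, actualLength);
           return new BinaryString(bytes);
       }

       public static void main(final String[] args) {
           BinaryString value = readStringValue("hello-world".getBytes(StandardCharsets.UTF_8), 5);
           System.out.println(new String(value.getBytes(), StandardCharsets.UTF_8));
       }
   }
   ```
   
   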



##########
test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/E2EIncrementalTask.java:
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.test.e2e.data.pipeline.cases.task;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
+import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
+import org.apache.shardingsphere.infra.database.type.dialect.OpenGaussDatabaseType;
+import org.apache.shardingsphere.infra.database.type.dialect.PostgreSQLDatabaseType;
+import org.apache.shardingsphere.sharding.spi.KeyGenerateAlgorithm;
+import org.apache.shardingsphere.test.e2e.data.pipeline.cases.base.BaseIncrementTask;
+import org.apache.shardingsphere.test.e2e.data.pipeline.framework.helper.PipelineCaseHelper;
+import org.apache.shardingsphere.test.e2e.data.pipeline.util.DataSourceExecuteUtil;
+
+import javax.sql.DataSource;
+import java.math.BigDecimal;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.OffsetDateTime;
+import java.time.Year;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+@RequiredArgsConstructor
+@Slf4j
+public final class E2EIncrementalTask extends BaseIncrementTask {
+    
+    private static final List<String> MYSQL_COLUMN_NAMES = Arrays.asList("status", "t_mediumint", "t_smallint", "t_tinyint", "t_unsigned_int", "t_unsigned_mediumint", "t_unsigned_smallint",
+            "t_unsigned_tinyint", "t_float", "t_double", "t_decimal", "t_timestamp", "t_datetime", "t_date", "t_time", "t_year", "t_bit", "t_binary", "t_varbinary", "t_blob", "t_mediumblob",
+            "t_char", "t_text", "t_mediumtext", "t_enum", "t_set", "t_json");
+    
+    private static final List<String> POSTGRESQL_COLUMN_NAMES = Arrays.asList("status", "t_int2", "t_numeric", "t_bool", "t_bytea", "t_char", "t_float", "t_double", "t_json", "t_jsonb", "t_text",
+            "t_date", "t_time", "t_timestamp", "t_timestamptz");
+    
+    private final DataSource dataSource;
+    
+    private final String orderTableName;
+    
+    private final String insertTableSql;
+    
+    private final KeyGenerateAlgorithm primaryKeyGenerateAlgorithm;
+    
+    private final DatabaseType databaseType;
+    
+    private final int loopCount;
+    
+    @Override
+    public void run() {
+        List<Object[]> orderInsertData = PipelineCaseHelper.generateOrderInsertData(databaseType, primaryKeyGenerateAlgorithm, loopCount);
+        List<Object> primaryKeys = new LinkedList<>();
+        for (Object[] each : orderInsertData) {
+            primaryKeys.add(each[0]);
+            DataSourceExecuteUtil.execute(dataSource, insertTableSql, each);
+        }
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        for (int i = 0; i < Math.max(1, loopCount / 3); i++) {
+            // TODO 0000-00-00 00:00:00 now will cause consistency check failed of MySQL.
+            // DataSourceUtil.execute(dataSource, String.format("UPDATE %s SET t_datetime='0000-00-00 00:00:00' WHERE order_id = ?", orderTableName)
+            updateOrderById(primaryKeys.get(random.nextInt(0, primaryKeys.size())));
+        }
+        for (int i = 0; i < Math.max(1, loopCount / 3); i++) {
+            setNullToAllFields(primaryKeys.get(random.nextInt(0, primaryKeys.size())));
+            deleteOrderById(primaryKeys.get(random.nextInt(0, primaryKeys.size())));
+        }
+        log.info("increment task runnable execute successfully.");
+    }
+    
+    private void updateOrderById(final Object orderId) {
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        int randomInt = random.nextInt(-100, 100);
+        if (databaseType instanceof MySQLDatabaseType) {
+            String sql = String.format(buildUpdateSql(MYSQL_COLUMN_NAMES, "?"), orderTableName);
+            log.info("update sql: {}", sql);
+            int randomUnsignedInt = random.nextInt(10, 100);
+            LocalDateTime now = LocalDateTime.now();
+            DataSourceExecuteUtil.execute(dataSource, sql, new Object[]{"中文测试", randomInt, randomInt, randomInt, randomUnsignedInt, randomUnsignedInt, randomUnsignedInt,
+                    randomUnsignedInt, 1.0F, 1.0, new BigDecimal("999"), now, now, now.toLocalDate(), now.toLocalTime(), Year.now().getValue() + 1, new byte[]{}, new byte[]{1, 2, -1, -3},
+                    "D".getBytes(), "A".getBytes(), "T".getBytes(), "E", "text", "mediumText", "3", "3", PipelineCaseHelper.generateJsonString(32, true), orderId});
+            return;
+        }
+        if (databaseType instanceof PostgreSQLDatabaseType || databaseType instanceof OpenGaussDatabaseType) {
+            String sql = String.format(buildUpdateSql(POSTGRESQL_COLUMN_NAMES, "?"), orderTableName);
+            log.info("update sql: {}", sql);
+            DataSourceExecuteUtil.execute(dataSource, sql, new Object[]{"中文测试", randomInt, BigDecimal.valueOf(10000), true, new byte[]{}, "update", PipelineCaseHelper.generateFloat(),
+                    PipelineCaseHelper.generateDouble(), PipelineCaseHelper.generateJsonString(10, true), PipelineCaseHelper.generateJsonString(20, true), "text-update", LocalDate.now(),
+                    LocalTime.now(), Timestamp.valueOf(LocalDateTime.now()), OffsetDateTime.now(), orderId});
+        }
+    }
+    
+    private String buildUpdateSql(final List<String> columnNames, final String placeholder) {

Review Comment:
   `buildUpdateSql` could be `buildUpdateSQL`





[GitHub] [shardingsphere] sandynz merged pull request #24485: Add more test fields in incremental stage for pipeline job E2E

Posted by "sandynz (via GitHub)" <gi...@apache.org>.
sandynz merged PR #24485:
URL: https://github.com/apache/shardingsphere/pull/24485




[GitHub] [shardingsphere] sandynz commented on pull request #24485: E2E add more test fields in the incremental stage of pipeline

Posted by "sandynz (via GitHub)" <gi...@apache.org>.
sandynz commented on PR #24485:
URL: https://github.com/apache/shardingsphere/pull/24485#issuecomment-1461230875

   TODO:
   Find a special float value that cannot be represented exactly, then add it to the E2E tests.
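   For reference, most decimal fractions already qualify: a value like 0.1 has no finite binary expansion, while power-of-two fractions such as 0.25 are stored exactly. A quick self-contained illustration (not project code):

   ```java
   import java.math.BigDecimal;

   public final class FloatExactnessDemo {

       public static void main(final String[] args) {
           // new BigDecimal(double) shows the exact value actually stored.
           // 0.1F is not exactly representable, so the printed expansion
           // differs from the decimal literal "0.1".
           System.out.println(new BigDecimal(0.1F).toPlainString());
           // 0.25 = 1/4 is a binary fraction and round-trips exactly.
           System.out.println(new BigDecimal(0.25F).toPlainString());
       }
   }
   ```
   
   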
   
   




[GitHub] [shardingsphere] sandynz commented on a diff in pull request #24485: E2E add more test fields in the incremental stage of pipeline

Posted by "sandynz (via GitHub)" <gi...@apache.org>.
sandynz commented on code in PR #24485:
URL: https://github.com/apache/shardingsphere/pull/24485#discussion_r1130527251


##########
test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/E2EIncrementalTask.java:
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.test.e2e.data.pipeline.cases.task;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
+import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
+import org.apache.shardingsphere.infra.database.type.dialect.OpenGaussDatabaseType;
+import org.apache.shardingsphere.infra.database.type.dialect.PostgreSQLDatabaseType;
+import org.apache.shardingsphere.sharding.spi.KeyGenerateAlgorithm;
+import org.apache.shardingsphere.test.e2e.data.pipeline.cases.base.BaseIncrementTask;
+import org.apache.shardingsphere.test.e2e.data.pipeline.framework.helper.PipelineCaseHelper;
+import org.apache.shardingsphere.test.e2e.data.pipeline.util.DataSourceExecuteUtil;
+
+import javax.sql.DataSource;
+import java.math.BigDecimal;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.OffsetDateTime;
+import java.time.Year;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+@RequiredArgsConstructor
+@Slf4j
+public final class E2EIncrementalTask extends BaseIncrementTask {
+    
+    private static final List<String> MYSQL_COLUMN_NAMES = Arrays.asList("status", "t_mediumint", "t_smallint", "t_tinyint", "t_unsigned_int", "t_unsigned_mediumint", "t_unsigned_smallint",
+            "t_unsigned_tinyint", "t_float", "t_double", "t_decimal", "t_timestamp", "t_datetime", "t_date", "t_time", "t_year", "t_bit", "t_binary", "t_varbinary", "t_blob", "t_mediumblob",
+            "t_char", "t_text", "t_mediumtext", "t_enum", "t_set", "t_json");
+    
+    private static final List<String> POSTGRESQL_COLUMN_NAMES = Arrays.asList("status", "t_int2", "t_numeric", "t_bool", "t_bytea", "t_char", "t_float", "t_double", "t_json", "t_jsonb", "t_text",
+            "t_date", "t_time", "t_timestamp", "t_timestamptz");
+    
+    private final DataSource dataSource;
+    
+    private final String orderTableName;
+    
+    private final String insertTableSql;

Review Comment:
   Could we remove `insertTableSql` and use the column list to generate it?
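   A minimal sketch of generating the insert statement from the same column list the task already keeps for updates (method and class names here are illustrative, not the project's helpers):

   ```java
   import java.util.Arrays;
   import java.util.List;
   import java.util.stream.Collectors;

   public final class InsertSQLBuilder {

       // Build "INSERT INTO t (c1, c2, ...) VALUES (?, ?, ...)" from a column
       // list, so the SQL no longer needs to be passed in via the constructor.
       static String buildInsertSQL(final String tableName, final List<String> columnNames) {
           String columns = String.join(", ", columnNames);
           String placeholders = columnNames.stream().map(each -> "?").collect(Collectors.joining(", "));
           return String.format("INSERT INTO %s (%s) VALUES (%s)", tableName, columns, placeholders);
       }

       public static void main(final String[] args) {
           // → INSERT INTO t_order (order_id, status, t_json) VALUES (?, ?, ?)
           System.out.println(buildInsertSQL("t_order", Arrays.asList("order_id", "status", "t_json")));
       }
   }
   ```
   
   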



##########
test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/framework/helper/PipelineCaseHelper.java:
##########
@@ -120,12 +127,32 @@ public static String generateJsonString(final int length, final boolean useUnico
         return String.format("{\"test\":\"%s\"}", value);
     }
     
-    private static float generateFloat() {
-        return ThreadLocalRandom.current().nextFloat();
+    /**
+     * Generate float value.
+     *
+     * @return float.
+     */
+    public static float generateFloat() {
+        return ThreadLocalRandom.current().nextInt(-1000, 1000) / 100.0F;
+    }
+    
+    /**
+     * Generate double value.
+     *
+     * @return double
+     */
+    public static double generateDouble() {
+        return ThreadLocalRandom.current().nextInt(-1000000000, 1000000000) / 1000000.0;

Review Comment:
   `1000000.0` could be `1000000.0D`





[GitHub] [shardingsphere] codecov-commenter commented on pull request #24485: E2E add more test fields in the incremental stage of pipeline

Posted by "codecov-commenter (via GitHub)" <gi...@apache.org>.
codecov-commenter commented on PR #24485:
URL: https://github.com/apache/shardingsphere/pull/24485#issuecomment-1458210106

   # [Codecov](https://codecov.io/gh/apache/shardingsphere/pull/24485?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#24485](https://codecov.io/gh/apache/shardingsphere/pull/24485?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (72dfb4e) into [master](https://codecov.io/gh/apache/shardingsphere/commit/134a24a80df9ef1ec959ea217db8cfd6a6f4cc2f?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (134a24a) will **not change** coverage.
   > The diff coverage is `100.00%`.
   
   > :exclamation: Current head 72dfb4e differs from pull request most recent head 24ec38c. Consider uploading reports for the commit 24ec38c to get more accurate results
   
   ```diff
   @@            Coverage Diff            @@
   ##             master   #24485   +/-   ##
   =========================================
     Coverage     50.22%   50.22%           
     Complexity     1588     1588           
   =========================================
     Files          3270     3270           
     Lines         53705    53705           
     Branches       9884     9884           
   =========================================
     Hits          26971    26971           
     Misses        24323    24323           
     Partials       2411     2411           
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/shardingsphere/pull/24485?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...n/value/string/MySQLStringBinlogProtocolValue.java](https://codecov.io/gh/apache/shardingsphere/pull/24485?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGItcHJvdG9jb2wvbXlzcWwvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3NoYXJkaW5nc3BoZXJlL2RiL3Byb3RvY29sL215c3FsL3BhY2tldC9iaW5sb2cvcm93L2NvbHVtbi92YWx1ZS9zdHJpbmcvTXlTUUxTdHJpbmdCaW5sb2dQcm90b2NvbFZhbHVlLmphdmE=) | `100.00% <100.00%> (ø)` | |
   
   




[GitHub] [shardingsphere] sandynz commented on a diff in pull request #24485: E2E add more test fields in the incremental stage of pipeline

Posted by "sandynz (via GitHub)" <gi...@apache.org>.
sandynz commented on code in PR #24485:
URL: https://github.com/apache/shardingsphere/pull/24485#discussion_r1127478004


##########
test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/IncrementalTask.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.test.e2e.data.pipeline.cases.task;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
+import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
+import org.apache.shardingsphere.infra.database.type.dialect.OpenGaussDatabaseType;
+import org.apache.shardingsphere.infra.database.type.dialect.PostgreSQLDatabaseType;
+import org.apache.shardingsphere.sharding.spi.KeyGenerateAlgorithm;
+import org.apache.shardingsphere.test.e2e.data.pipeline.cases.base.BaseIncrementTask;
+import org.apache.shardingsphere.test.e2e.data.pipeline.framework.helper.PipelineCaseHelper;
+import org.apache.shardingsphere.test.e2e.data.pipeline.util.DataSourceExecuteUtil;
+
+import javax.sql.DataSource;
+import java.math.BigDecimal;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.OffsetDateTime;
+import java.time.Year;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+@RequiredArgsConstructor
+@Slf4j
+public final class IncrementalTask extends BaseIncrementTask {
+    
+    private static final String MYSQL_UPDATE_ORDER_BY_ID = "UPDATE `%s` SET status = ?, t_mediumint=?, t_smallint=?, t_tinyint=?, t_unsigned_int=?, t_unsigned_mediumint=?, t_unsigned_smallint=?, "
+            + "t_unsigned_tinyint=?, t_float=?, t_double=?, t_decimal=?, t_timestamp=?, t_datetime=?, t_date=?, t_time=?, t_year=?, t_bit=?, t_binary=?, t_varbinary=?, t_blob=?, t_mediumblob=?, "
+            + "t_char=?, t_text=?, t_mediumtext=?, t_enum=?, t_set=?, t_json=? WHERE order_id = ?";
+    
+    private static final String POSTGRESQL_UPDATE_ORDER_BY_ID = "UPDATE %S SET status=?, t_int2=?, t_numeric=?, t_bool=?, t_bytea=?, t_char=?, t_float=?, t_double=?, t_json=?, t_jsonb=?, t_text=?, "
+            + "t_date=?, t_time=?, t_timestamp=?, t_timestamptz=? WHERE order_id = ?";
+    
+    private final DataSource dataSource;
+    
+    private final String orderTableName;
+    
+    private final String insertTableSql;
+    
+    private final KeyGenerateAlgorithm primaryKeyGenerateAlgorithm;
+    
+    private final DatabaseType databaseType;
+    
+    private final int loopCount;
+    
+    @Override
+    public void run() {
+        List<Object[]> orderInsertData = PipelineCaseHelper.generateOrderInsertData(databaseType, primaryKeyGenerateAlgorithm, loopCount);
+        List<Object> primaryKeys = new LinkedList<>();
+        for (Object[] each : orderInsertData) {
+            primaryKeys.add(each[0]);
+            DataSourceExecuteUtil.execute(dataSource, insertTableSql, each);
+        }
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        for (int i = 0; i < Math.max(1, loopCount / 4); i++) {
+            // TODO 0000-00-00 00:00:00 now will cause consistency check failed of MySQL.
+            // DataSourceUtil.execute(dataSource, String.format("UPDATE %s SET t_datetime='0000-00-00 00:00:00' WHERE order_id = ?", orderTableName)
+            updateOrderById(primaryKeys.get(random.nextInt(0, primaryKeys.size())));
+            if (databaseType instanceof MySQLDatabaseType) {
+                setNullToUnsignedFields(primaryKeys.get(random.nextInt(0, primaryKeys.size())));
+            }
+        }
+        for (int i = 0; i < Math.max(1, loopCount / 5); i++) {
+            deleteOrderById(primaryKeys.get(random.nextInt(0, primaryKeys.size())));
+        }
+        log.info("increment task runnable execute successfully.");
+    }
+    
+    private void updateOrderById(final Object orderId) {
+        int randomInt = ThreadLocalRandom.current().nextInt(-100, 100);
+        if (databaseType instanceof MySQLDatabaseType) {
+            String updateSql = String.format(MYSQL_UPDATE_ORDER_BY_ID, orderTableName);
+            int randomUnsignedInt = ThreadLocalRandom.current().nextInt(10, 100);
+            LocalDateTime now = LocalDateTime.now();
+            DataSourceExecuteUtil.execute(dataSource, updateSql, new Object[]{"中文测试", randomInt, randomInt, randomInt, randomUnsignedInt, randomUnsignedInt, randomUnsignedInt,
+                    randomUnsignedInt, 1.0F, 1.0, new BigDecimal("999"), now, now, now.toLocalDate(), now.toLocalTime(), Year.now().getValue() + 1, "U".getBytes(), "P".getBytes(),
+                    "D".getBytes(), "A".getBytes(), "T".getBytes(), "E", "text", "mediumText", "3", "3", PipelineCaseHelper.generateJsonString(32, true), orderId});
+            return;
+        }
+        if (databaseType instanceof PostgreSQLDatabaseType || databaseType instanceof OpenGaussDatabaseType) {
+            String updateSql = String.format(POSTGRESQL_UPDATE_ORDER_BY_ID, orderTableName);
+            DataSourceExecuteUtil.execute(dataSource, updateSql, new Object[]{"中文测试", randomInt, BigDecimal.valueOf(10000), true, "bytea-update".getBytes(), "update", 1.0F, 2.0,
+                    PipelineCaseHelper.generateJsonString(10, true), PipelineCaseHelper.generateJsonString(20, true), "text-update", LocalDate.now(),
+                    LocalTime.now(), Timestamp.valueOf(LocalDateTime.now()), OffsetDateTime.now(), orderId});
+        }
+    }
+    
+    private void deleteOrderById(final Object orderId) {
+        String sql = String.format("DELETE FROM %s WHERE order_id = ?", orderTableName);
+        DataSourceExecuteUtil.execute(dataSource, sql, new Object[]{orderId});
+    }
+    
+    private void setNullToUnsignedFields(final Object orderId) {
+        DataSourceExecuteUtil.execute(dataSource, String.format("UPDATE %s SET t_unsigned_int = null, t_unsigned_tinyint = null WHERE order_id = ?", orderTableName),

Review Comment:
   Could we set null for all columns?
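   Setting every column to null could reuse the same column list as the update statement; a hedged sketch under that assumption (helper names are illustrative):

   ```java
   import java.util.Arrays;
   import java.util.List;
   import java.util.stream.Collectors;

   public final class NullUpdateSQLBuilder {

       // Build "UPDATE t SET c1 = null, c2 = null, ... WHERE order_id = ?" so
       // the increment task exercises null values for every column at once,
       // not just the two unsigned ones.
       static String buildSetAllNullSQL(final String tableName, final List<String> columnNames) {
           String assignments = columnNames.stream().map(each -> each + " = null").collect(Collectors.joining(", "));
           return String.format("UPDATE %s SET %s WHERE order_id = ?", tableName, assignments);
       }

       public static void main(final String[] args) {
           // → UPDATE t_order SET t_unsigned_int = null, t_unsigned_tinyint = null WHERE order_id = ?
           System.out.println(buildSetAllNullSQL("t_order", Arrays.asList("t_unsigned_int", "t_unsigned_tinyint")));
       }
   }
   ```
   
   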



##########
test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/IncrementalTask.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.test.e2e.data.pipeline.cases.task;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
+import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
+import org.apache.shardingsphere.infra.database.type.dialect.OpenGaussDatabaseType;
+import org.apache.shardingsphere.infra.database.type.dialect.PostgreSQLDatabaseType;
+import org.apache.shardingsphere.sharding.spi.KeyGenerateAlgorithm;
+import org.apache.shardingsphere.test.e2e.data.pipeline.cases.base.BaseIncrementTask;
+import org.apache.shardingsphere.test.e2e.data.pipeline.framework.helper.PipelineCaseHelper;
+import org.apache.shardingsphere.test.e2e.data.pipeline.util.DataSourceExecuteUtil;
+
+import javax.sql.DataSource;
+import java.math.BigDecimal;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.OffsetDateTime;
+import java.time.Year;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+@RequiredArgsConstructor
+@Slf4j
+public final class IncrementalTask extends BaseIncrementTask {
+    
+    private static final String MYSQL_UPDATE_ORDER_BY_ID = "UPDATE `%s` SET status = ?, t_mediumint=?, t_smallint=?, t_tinyint=?, t_unsigned_int=?, t_unsigned_mediumint=?, t_unsigned_smallint=?, "
+            + "t_unsigned_tinyint=?, t_float=?, t_double=?, t_decimal=?, t_timestamp=?, t_datetime=?, t_date=?, t_time=?, t_year=?, t_bit=?, t_binary=?, t_varbinary=?, t_blob=?, t_mediumblob=?, "
+            + "t_char=?, t_text=?, t_mediumtext=?, t_enum=?, t_set=?, t_json=? WHERE order_id = ?";
+    
+    private static final String POSTGRESQL_UPDATE_ORDER_BY_ID = "UPDATE %s SET status=?, t_int2=?, t_numeric=?, t_bool=?, t_bytea=?, t_char=?, t_float=?, t_double=?, t_json=?, t_jsonb=?, t_text=?, "
+            + "t_date=?, t_time=?, t_timestamp=?, t_timestamptz=? WHERE order_id = ?";
+    
+    private final DataSource dataSource;
+    
+    private final String orderTableName;
+    
+    private final String insertTableSql;
+    
+    private final KeyGenerateAlgorithm primaryKeyGenerateAlgorithm;
+    
+    private final DatabaseType databaseType;
+    
+    private final int loopCount;
+    
+    @Override
+    public void run() {
+        List<Object[]> orderInsertData = PipelineCaseHelper.generateOrderInsertData(databaseType, primaryKeyGenerateAlgorithm, loopCount);
+        List<Object> primaryKeys = new LinkedList<>();
+        for (Object[] each : orderInsertData) {
+            primaryKeys.add(each[0]);
+            DataSourceExecuteUtil.execute(dataSource, insertTableSql, each);
+        }
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        for (int i = 0; i < Math.max(1, loopCount / 4); i++) {
+            // TODO Updating t_datetime to '0000-00-00 00:00:00' currently causes the MySQL consistency check to fail.
+            // DataSourceUtil.execute(dataSource, String.format("UPDATE %s SET t_datetime='0000-00-00 00:00:00' WHERE order_id = ?", orderTableName)
+            updateOrderById(primaryKeys.get(random.nextInt(0, primaryKeys.size())));
+            if (databaseType instanceof MySQLDatabaseType) {
+                setNullToUnsignedFields(primaryKeys.get(random.nextInt(0, primaryKeys.size())));
+            }
+        }
+        for (int i = 0; i < Math.max(1, loopCount / 5); i++) {
+            deleteOrderById(primaryKeys.get(random.nextInt(0, primaryKeys.size())));
+        }
+        log.info("Increment task runnable executed successfully.");
+    }
+    
+    private void updateOrderById(final Object orderId) {
+        int randomInt = ThreadLocalRandom.current().nextInt(-100, 100);
+        if (databaseType instanceof MySQLDatabaseType) {
+            String updateSql = String.format(MYSQL_UPDATE_ORDER_BY_ID, orderTableName);
+            int randomUnsignedInt = ThreadLocalRandom.current().nextInt(10, 100);
+            LocalDateTime now = LocalDateTime.now();
+            DataSourceExecuteUtil.execute(dataSource, updateSql, new Object[]{"中文测试", randomInt, randomInt, randomInt, randomUnsignedInt, randomUnsignedInt, randomUnsignedInt,
+                    randomUnsignedInt, 1.0F, 1.0, new BigDecimal("999"), now, now, now.toLocalDate(), now.toLocalTime(), Year.now().getValue() + 1, "U".getBytes(), "P".getBytes(),
+                    "D".getBytes(), "A".getBytes(), "T".getBytes(), "E", "text", "mediumText", "3", "3", PipelineCaseHelper.generateJsonString(32, true), orderId});
+            return;
+        }
+        if (databaseType instanceof PostgreSQLDatabaseType || databaseType instanceof OpenGaussDatabaseType) {
+            String updateSql = String.format(POSTGRESQL_UPDATE_ORDER_BY_ID, orderTableName);
+            DataSourceExecuteUtil.execute(dataSource, updateSql, new Object[]{"中文测试", randomInt, BigDecimal.valueOf(10000), true, "bytea-update".getBytes(), "update", 1.0F, 2.0,

Review Comment:
   Could we use real binary data in the test instead of ASCII chars? e.g. `new byte[] {-1, 0, 1}`
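
   A minimal sketch of what the suggestion could look like (the class and method names here are hypothetical, not from the PR): use byte values outside the printable ASCII range so the binary columns (e.g. t_binary, t_varbinary, t_blob, t_bytea) are exercised with real binary payloads.

   ```java
   public final class BinarySampleDemo {

       // Values such as -1 and Byte.MIN_VALUE can never come from "U".getBytes(),
       // so they catch charset/encoding bugs that ASCII-only payloads would miss.
       static byte[] binarySample() {
           return new byte[]{-1, 0, 1, Byte.MIN_VALUE, Byte.MAX_VALUE};
       }

       public static void main(String[] args) {
           // prints 5
           System.out.println(binarySample().length);
       }
   }
   ```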
   


