You are viewing a plain-text version of this content. The canonical link for it (originally a "here" hyperlink) is not preserved in this plain-text copy.
Posted to notifications@shardingsphere.apache.org by du...@apache.org on 2023/06/07 07:28:15 UTC

[shardingsphere] branch master updated: Improve performance of integer unique key table inventory data splitting (#26089)

This is an automated email from the ASF dual-hosted git repository.

duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 2c45ed22b7d Improve performance of integer unique key table inventory data splitting (#26089)
2c45ed22b7d is described below

commit 2c45ed22b7d5337351b5507cfc2c02e4c4d6430f
Author: Hongsheng Zhong <zh...@apache.org>
AuthorDate: Wed Jun 7 15:27:57 2023 +0800

    Improve performance of integer unique key table inventory data splitting (#26089)
    
    * Improve performance of integer unique key table inventory data splitting
    
    * Update unit test
    
    * Rename method
---
 .../spi/sqlbuilder/PipelineSQLBuilder.java         | 21 ++++---
 .../core/prepare/InventoryTaskSplitter.java        | 62 ++++++++++----------
 .../sqlbuilder/AbstractPipelineSQLBuilder.java     | 13 ++---
 .../core/util/IntervalToRangeIterator.java         | 62 ++++++++++++++++++++
 .../core/sqlbuilder/FixturePipelineSQLBuilder.java | 10 ++--
 .../core/util/IntervalToRangeIteratorTest.java     | 67 ++++++++++++++++++++++
 .../core/prepare/InventoryTaskSplitterTest.java    |  2 +-
 7 files changed, 182 insertions(+), 55 deletions(-)

diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java
index d0610dcb3be..440ff4f8363 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java
@@ -136,7 +136,6 @@ public interface PipelineSQLBuilder extends TypedSPI {
      * @param tableName table name
      * @return count SQL
      */
-    // TODO keep it for now, it might be used later
     String buildCountSQL(String schemaName, String tableName);
     
     /**
@@ -148,6 +147,16 @@ public interface PipelineSQLBuilder extends TypedSPI {
      */
     Optional<String> buildEstimatedCountSQL(String schemaName, String tableName);
     
+    /**
+     * Build unique key minimum maximum values SQL.
+     *
+     * @param schemaName schema name
+     * @param tableName table name
+     * @param uniqueKey unique key
+     * @return min max unique key SQL
+     */
+    String buildUniqueKeyMinMaxValuesSQL(String schemaName, String tableName, String uniqueKey);
+    
     /**
      * Build query all ordering SQL.
      *
@@ -169,16 +178,6 @@ public interface PipelineSQLBuilder extends TypedSPI {
      */
     String buildCheckEmptySQL(String schemaName, String tableName);
     
-    /**
-     * Build split by primary key range SQL.
-     *
-     * @param schemaName schema name
-     * @param tableName table name
-     * @param uniqueKey unique key
-     * @return split SQL
-     */
-    String buildSplitByPrimaryKeyRangeSQL(String schemaName, String tableName, String uniqueKey);
-    
     /**
      * Build CRC32 SQL.
      *
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
index 86b781f41ab..c30d6f30e2b 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.data.pipeline.core.prepare;
 
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.Range;
 import org.apache.shardingsphere.data.pipeline.api.config.ImporterConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.process.PipelineReadConfiguration;
@@ -43,15 +44,17 @@ import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.InventoryDumpe
 import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataUtils;
 import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
 import org.apache.shardingsphere.data.pipeline.core.task.PipelineTaskUtils;
+import org.apache.shardingsphere.data.pipeline.core.util.IntervalToRangeIterator;
 import org.apache.shardingsphere.data.pipeline.core.util.PipelineJdbcUtils;
 import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
 import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
 import org.apache.shardingsphere.data.pipeline.util.spi.PipelineTypedSPILoader;
 
+import javax.sql.DataSource;
 import java.sql.Connection;
-import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.sql.Statement;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedList;
@@ -161,7 +164,8 @@ public final class InventoryTaskSplitter {
                 return result;
             }
         }
-        jobItemContext.updateInventoryRecordsCount(InventoryRecordsCountCalculator.getTableRecordsCount(dumperConfig, dataSource));
+        long tableRecordsCount = InventoryRecordsCountCalculator.getTableRecordsCount(dumperConfig, dataSource);
+        jobItemContext.updateInventoryRecordsCount(tableRecordsCount);
         if (!dumperConfig.hasUniqueKey()) {
             return Collections.singletonList(new NoUniqueKeyPosition());
         }
@@ -169,7 +173,7 @@ public final class InventoryTaskSplitter {
         if (1 == uniqueKeyColumns.size()) {
             int firstColumnDataType = uniqueKeyColumns.get(0).getDataType();
             if (PipelineJdbcUtils.isIntegerColumn(firstColumnDataType)) {
-                return getPositionByIntegerUniqueKeyRange(dumperConfig, jobItemContext, dataSource);
+                return getPositionByIntegerUniqueKeyRange(dumperConfig, tableRecordsCount, jobItemContext, dataSource);
             }
             if (PipelineJdbcUtils.isStringColumn(firstColumnDataType)) {
                 // TODO Support string unique key table splitting. Ascii characters ordering are different in different versions of databases.
@@ -179,40 +183,36 @@ public final class InventoryTaskSplitter {
         return Collections.singletonList(new UnsupportedKeyPosition());
     }
     
-    private Collection<IngestPosition> getPositionByIntegerUniqueKeyRange(final InventoryDumperConfiguration dumperConfig, final InventoryIncrementalJobItemContext jobItemContext,
-                                                                          final PipelineDataSourceWrapper dataSource) {
+    private Collection<IngestPosition> getPositionByIntegerUniqueKeyRange(final InventoryDumperConfiguration dumperConfig, final long tableRecordsCount,
+                                                                          final InventoryIncrementalJobItemContext jobItemContext, final PipelineDataSourceWrapper dataSource) {
+        if (0 == tableRecordsCount) {
+            return Collections.singletonList(new IntegerPrimaryKeyPosition(0, 0));
+        }
         Collection<IngestPosition> result = new LinkedList<>();
-        String uniqueKey = dumperConfig.getUniqueKeyColumns().get(0).getName();
-        String sql = PipelineTypedSPILoader.getDatabaseTypedService(PipelineSQLBuilder.class, dataSource.getDatabaseType().getType())
-                .buildSplitByPrimaryKeyRangeSQL(dumperConfig.getSchemaName(new LogicTableName(dumperConfig.getLogicTableName())), dumperConfig.getActualTableName(), uniqueKey);
+        Range<Long> uniqueKeyValuesRange = getUniqueKeyValuesRange(jobItemContext, dataSource, dumperConfig);
         int shardingSize = jobItemContext.getJobProcessContext().getPipelineProcessConfig().getRead().getShardingSize();
+        long splitCount = tableRecordsCount / shardingSize + (tableRecordsCount % shardingSize > 0 ? 1 : 0);
+        long interval = (uniqueKeyValuesRange.getMaximum() - uniqueKeyValuesRange.getMinimum()) / splitCount;
+        IntervalToRangeIterator rangeIterator = new IntervalToRangeIterator(uniqueKeyValuesRange.getMinimum(), uniqueKeyValuesRange.getMaximum(), interval);
+        while (rangeIterator.hasNext()) {
+            Range<Long> range = rangeIterator.next();
+            result.add(new IntegerPrimaryKeyPosition(range.getMinimum(), range.getMaximum()));
+        }
+        return result;
+    }
+    
+    private Range<Long> getUniqueKeyValuesRange(final InventoryIncrementalJobItemContext jobItemContext, final DataSource dataSource, final InventoryDumperConfiguration dumperConfig) {
+        String uniqueKey = dumperConfig.getUniqueKeyColumns().get(0).getName();
+        String sql = PipelineTypedSPILoader.getDatabaseTypedService(PipelineSQLBuilder.class, jobItemContext.getJobConfig().getSourceDatabaseType())
+                .buildUniqueKeyMinMaxValuesSQL(dumperConfig.getSchemaName(new LogicTableName(dumperConfig.getLogicTableName())), dumperConfig.getActualTableName(), uniqueKey);
         try (
                 Connection connection = dataSource.getConnection();
-                PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
-            // TODO query minimum value less than 0
-            long beginId = 0;
-            for (int i = 0; i < Integer.MAX_VALUE; i++) {
-                preparedStatement.setLong(1, beginId);
-                preparedStatement.setLong(2, shardingSize);
-                try (ResultSet resultSet = preparedStatement.executeQuery()) {
-                    if (!resultSet.next()) {
-                        break;
-                    }
-                    long endId = resultSet.getLong(1);
-                    if (0 == endId) {
-                        break;
-                    }
-                    result.add(new IntegerPrimaryKeyPosition(beginId, endId));
-                    beginId = endId + 1;
-                }
-            }
-            // fix empty table missing inventory task
-            if (result.isEmpty()) {
-                result.add(new IntegerPrimaryKeyPosition(0, 0));
-            }
+                Statement statement = connection.createStatement();
+                ResultSet resultSet = statement.executeQuery(sql)) {
+            resultSet.next();
+            return Range.between(resultSet.getLong(1), resultSet.getLong(2));
         } catch (final SQLException ex) {
             throw new SplitPipelineJobByUniqueKeyException(dumperConfig.getActualTableName(), uniqueKey, ex);
         }
-        return result;
     }
 }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java
index 6494932f763..443655123ee 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java
@@ -190,6 +190,12 @@ public abstract class AbstractPipelineSQLBuilder implements PipelineSQLBuilder {
         return String.format("SELECT COUNT(*) FROM %s", getQualifiedTableName(schemaName, tableName));
     }
     
+    @Override
+    public String buildUniqueKeyMinMaxValuesSQL(final String schemaName, final String tableName, final String uniqueKey) {
+        String quotedUniqueKey = quote(uniqueKey);
+        return String.format("SELECT MIN(%s), MAX(%s) FROM %s", quotedUniqueKey, quotedUniqueKey, getQualifiedTableName(schemaName, tableName));
+    }
+    
     @Override
     public String buildQueryAllOrderingSQL(final String schemaName, final String tableName, final List<String> columnNames, final String uniqueKey, final boolean firstQuery) {
         String qualifiedTableName = getQualifiedTableName(schemaName, tableName);
@@ -203,11 +209,4 @@ public abstract class AbstractPipelineSQLBuilder implements PipelineSQLBuilder {
     public String buildCheckEmptySQL(final String schemaName, final String tableName) {
         return String.format("SELECT * FROM %s LIMIT 1", getQualifiedTableName(schemaName, tableName));
     }
-    
-    @Override
-    public String buildSplitByPrimaryKeyRangeSQL(final String schemaName, final String tableName, final String uniqueKey) {
-        String quotedUniqueKey = quote(uniqueKey);
-        return String.format("SELECT MAX(%s),COUNT(1) FROM (SELECT %s FROM %s WHERE %s>=? ORDER BY %s LIMIT ?) t",
-                quotedUniqueKey, quotedUniqueKey, getQualifiedTableName(schemaName, tableName), quotedUniqueKey, quotedUniqueKey);
-    }
 }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/IntervalToRangeIterator.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/IntervalToRangeIterator.java
new file mode 100644
index 00000000000..81ce2ff6878
--- /dev/null
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/IntervalToRangeIterator.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.util;
+
+import org.apache.commons.lang3.Range;
+
+import java.util.Iterator;
+
+/**
+ * Interval to range iterator.
+ * <p>
+ * It's not thread-safe.
+ * </p>
+ */
+public final class IntervalToRangeIterator implements Iterator<Range<Long>> {
+    
+    private final long maximum;
+    
+    private final long interval;
+    
+    private long current;
+    
+    public IntervalToRangeIterator(final long minimum, final long maximum, final long interval) {
+        if (minimum > maximum) {
+            throw new IllegalArgumentException("minimum greater than maximum");
+        }
+        if (interval < 0) {
+            throw new IllegalArgumentException("interval is less than zero");
+        }
+        this.maximum = maximum;
+        this.interval = interval;
+        this.current = minimum;
+    }
+    
+    @Override
+    public boolean hasNext() {
+        return current <= maximum;
+    }
+    
+    @Override
+    public Range<Long> next() {
+        long upperLimit = Math.min(maximum, current + interval);
+        Range<Long> result = Range.between(current, upperLimit);
+        current = upperLimit + 1;
+        return result;
+    }
+}
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/FixturePipelineSQLBuilder.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/FixturePipelineSQLBuilder.java
index 47eaa986d4d..1a67f178f1b 100644
--- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/FixturePipelineSQLBuilder.java
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/FixturePipelineSQLBuilder.java
@@ -79,18 +79,18 @@ public final class FixturePipelineSQLBuilder implements PipelineSQLBuilder {
     }
     
     @Override
-    public String buildQueryAllOrderingSQL(final String schemaName, final String tableName, final List<String> columnNames, final String uniqueKey, final boolean firstQuery) {
+    public String buildUniqueKeyMinMaxValuesSQL(final String schemaName, final String tableName, final String uniqueKey) {
         return "";
     }
     
     @Override
-    public String buildCheckEmptySQL(final String schemaName, final String tableName) {
-        return null;
+    public String buildQueryAllOrderingSQL(final String schemaName, final String tableName, final List<String> columnNames, final String uniqueKey, final boolean firstQuery) {
+        return "";
     }
     
     @Override
-    public String buildSplitByPrimaryKeyRangeSQL(final String schemaName, final String tableName, final String uniqueKey) {
-        return "";
+    public String buildCheckEmptySQL(final String schemaName, final String tableName) {
+        return null;
     }
     
     @Override
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/IntervalToRangeIteratorTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/IntervalToRangeIteratorTest.java
new file mode 100644
index 00000000000..bfd036aaa8e
--- /dev/null
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/IntervalToRangeIteratorTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.util;
+
+import org.apache.commons.lang3.Range;
+import org.junit.jupiter.api.Test;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+class IntervalToRangeIteratorTest {
+    
+    @Test
+    void assertMinimumGreaterThanMaximum() {
+        assertThrows(IllegalArgumentException.class, () -> new IntervalToRangeIterator(200L, 100L, 10L));
+    }
+    
+    @Test
+    void assertIntervalLessThanZero() {
+        assertThrows(IllegalArgumentException.class, () -> new IntervalToRangeIterator(100L, 200L, -10L));
+    }
+    
+    @Test
+    void assertSmallRangeCorrect() {
+        IntervalToRangeIterator iterator = new IntervalToRangeIterator(200L, 200L, 100L);
+        List<Range<Long>> actual = new LinkedList<>();
+        while (iterator.hasNext()) {
+            actual.add(iterator.next());
+        }
+        assertThat(actual.size(), is(1));
+        assertThat(actual.get(0).getMinimum(), is(200L));
+        assertThat(actual.get(0).getMaximum(), is(200L));
+    }
+    
+    @Test
+    void assertLargeRangeCorrect() {
+        IntervalToRangeIterator iterator = new IntervalToRangeIterator(200L, 400L, 100L);
+        List<Range<Long>> actual = new LinkedList<>();
+        while (iterator.hasNext()) {
+            actual.add(iterator.next());
+        }
+        assertThat(actual.size(), is(2));
+        assertThat(actual.get(0).getMinimum(), is(200L));
+        assertThat(actual.get(0).getMaximum(), is(300L));
+        assertThat(actual.get(1).getMinimum(), is(301L));
+        assertThat(actual.get(1).getMaximum(), is(400L));
+    }
+}
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/prepare/InventoryTaskSplitterTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
index a6e76f8d71e..42d3e694fc2 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
@@ -110,7 +110,7 @@ class InventoryTaskSplitterTest {
         assertThat(actual.size(), is(1));
         assertThat(actual.get(0).getTaskId(), is("ds_0.t_order#0"));
         IntegerPrimaryKeyPosition keyPosition = (IntegerPrimaryKeyPosition) actual.get(0).getTaskProgress().getPosition();
-        assertThat(keyPosition.getBeginValue(), is(0L));
+        assertThat(keyPosition.getBeginValue(), is(1L));
         assertThat(keyPosition.getEndValue(), is(999L));
     }