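You are viewing a plain-text version of this content. The canonical link to it was a hyperlink ("here") on the original page, and that link target was not preserved in this plain-text copy.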
Posted to notifications@shardingsphere.apache.org by du...@apache.org on 2023/06/07 07:28:15 UTC
[shardingsphere] branch master updated: Improve performance of integer unique key table inventory data splitting (#26089)
This is an automated email from the ASF dual-hosted git repository.
duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 2c45ed22b7d Improve performance of integer unique key table inventory data splitting (#26089)
2c45ed22b7d is described below
commit 2c45ed22b7d5337351b5507cfc2c02e4c4d6430f
Author: Hongsheng Zhong <zh...@apache.org>
AuthorDate: Wed Jun 7 15:27:57 2023 +0800
Improve performance of integer unique key table inventory data splitting (#26089)
* Improve performance of integer unique key table inventory data splitting
* Update unit test
* Rename method
---
.../spi/sqlbuilder/PipelineSQLBuilder.java | 21 ++++---
.../core/prepare/InventoryTaskSplitter.java | 62 ++++++++++----------
.../sqlbuilder/AbstractPipelineSQLBuilder.java | 13 ++---
.../core/util/IntervalToRangeIterator.java | 62 ++++++++++++++++++++
.../core/sqlbuilder/FixturePipelineSQLBuilder.java | 10 ++--
.../core/util/IntervalToRangeIteratorTest.java | 67 ++++++++++++++++++++++
.../core/prepare/InventoryTaskSplitterTest.java | 2 +-
7 files changed, 182 insertions(+), 55 deletions(-)
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java
index d0610dcb3be..440ff4f8363 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java
@@ -136,7 +136,6 @@ public interface PipelineSQLBuilder extends TypedSPI {
* @param tableName table name
* @return count SQL
*/
- // TODO keep it for now, it might be used later
String buildCountSQL(String schemaName, String tableName);
/**
@@ -148,6 +147,16 @@ public interface PipelineSQLBuilder extends TypedSPI {
*/
Optional<String> buildEstimatedCountSQL(String schemaName, String tableName);
+ /**
+ * Build unique key minimum maximum values SQL.
+ *
+ * @param schemaName schema name
+ * @param tableName table name
+ * @param uniqueKey unique key
+ * @return min max unique key SQL
+ */
+ String buildUniqueKeyMinMaxValuesSQL(String schemaName, String tableName, String uniqueKey);
+
/**
* Build query all ordering SQL.
*
@@ -169,16 +178,6 @@ public interface PipelineSQLBuilder extends TypedSPI {
*/
String buildCheckEmptySQL(String schemaName, String tableName);
- /**
- * Build split by primary key range SQL.
- *
- * @param schemaName schema name
- * @param tableName table name
- * @param uniqueKey unique key
- * @return split SQL
- */
- String buildSplitByPrimaryKeyRangeSQL(String schemaName, String tableName, String uniqueKey);
-
/**
* Build CRC32 SQL.
*
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
index 86b781f41ab..c30d6f30e2b 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.data.pipeline.core.prepare;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.Range;
import org.apache.shardingsphere.data.pipeline.api.config.ImporterConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.process.PipelineReadConfiguration;
@@ -43,15 +44,17 @@ import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.InventoryDumpe
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataUtils;
import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTaskUtils;
+import org.apache.shardingsphere.data.pipeline.core.util.IntervalToRangeIterator;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineJdbcUtils;
import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
import org.apache.shardingsphere.data.pipeline.util.spi.PipelineTypedSPILoader;
+import javax.sql.DataSource;
import java.sql.Connection;
-import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
+import java.sql.Statement;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
@@ -161,7 +164,8 @@ public final class InventoryTaskSplitter {
return result;
}
}
- jobItemContext.updateInventoryRecordsCount(InventoryRecordsCountCalculator.getTableRecordsCount(dumperConfig, dataSource));
+ long tableRecordsCount = InventoryRecordsCountCalculator.getTableRecordsCount(dumperConfig, dataSource);
+ jobItemContext.updateInventoryRecordsCount(tableRecordsCount);
if (!dumperConfig.hasUniqueKey()) {
return Collections.singletonList(new NoUniqueKeyPosition());
}
@@ -169,7 +173,7 @@ public final class InventoryTaskSplitter {
if (1 == uniqueKeyColumns.size()) {
int firstColumnDataType = uniqueKeyColumns.get(0).getDataType();
if (PipelineJdbcUtils.isIntegerColumn(firstColumnDataType)) {
- return getPositionByIntegerUniqueKeyRange(dumperConfig, jobItemContext, dataSource);
+ return getPositionByIntegerUniqueKeyRange(dumperConfig, tableRecordsCount, jobItemContext, dataSource);
}
if (PipelineJdbcUtils.isStringColumn(firstColumnDataType)) {
// TODO Support string unique key table splitting. Ascii characters ordering are different in different versions of databases.
@@ -179,40 +183,36 @@ public final class InventoryTaskSplitter {
return Collections.singletonList(new UnsupportedKeyPosition());
}
- private Collection<IngestPosition> getPositionByIntegerUniqueKeyRange(final InventoryDumperConfiguration dumperConfig, final InventoryIncrementalJobItemContext jobItemContext,
- final PipelineDataSourceWrapper dataSource) {
+ private Collection<IngestPosition> getPositionByIntegerUniqueKeyRange(final InventoryDumperConfiguration dumperConfig, final long tableRecordsCount,
+ final InventoryIncrementalJobItemContext jobItemContext, final PipelineDataSourceWrapper dataSource) {
+ if (0 == tableRecordsCount) {
+ return Collections.singletonList(new IntegerPrimaryKeyPosition(0, 0));
+ }
Collection<IngestPosition> result = new LinkedList<>();
- String uniqueKey = dumperConfig.getUniqueKeyColumns().get(0).getName();
- String sql = PipelineTypedSPILoader.getDatabaseTypedService(PipelineSQLBuilder.class, dataSource.getDatabaseType().getType())
- .buildSplitByPrimaryKeyRangeSQL(dumperConfig.getSchemaName(new LogicTableName(dumperConfig.getLogicTableName())), dumperConfig.getActualTableName(), uniqueKey);
+ Range<Long> uniqueKeyValuesRange = getUniqueKeyValuesRange(jobItemContext, dataSource, dumperConfig);
int shardingSize = jobItemContext.getJobProcessContext().getPipelineProcessConfig().getRead().getShardingSize();
+ long splitCount = tableRecordsCount / shardingSize + (tableRecordsCount % shardingSize > 0 ? 1 : 0);
+ long interval = (uniqueKeyValuesRange.getMaximum() - uniqueKeyValuesRange.getMinimum()) / splitCount;
+ IntervalToRangeIterator rangeIterator = new IntervalToRangeIterator(uniqueKeyValuesRange.getMinimum(), uniqueKeyValuesRange.getMaximum(), interval);
+ while (rangeIterator.hasNext()) {
+ Range<Long> range = rangeIterator.next();
+ result.add(new IntegerPrimaryKeyPosition(range.getMinimum(), range.getMaximum()));
+ }
+ return result;
+ }
+
+ private Range<Long> getUniqueKeyValuesRange(final InventoryIncrementalJobItemContext jobItemContext, final DataSource dataSource, final InventoryDumperConfiguration dumperConfig) {
+ String uniqueKey = dumperConfig.getUniqueKeyColumns().get(0).getName();
+ String sql = PipelineTypedSPILoader.getDatabaseTypedService(PipelineSQLBuilder.class, jobItemContext.getJobConfig().getSourceDatabaseType())
+ .buildUniqueKeyMinMaxValuesSQL(dumperConfig.getSchemaName(new LogicTableName(dumperConfig.getLogicTableName())), dumperConfig.getActualTableName(), uniqueKey);
try (
Connection connection = dataSource.getConnection();
- PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
- // TODO query minimum value less than 0
- long beginId = 0;
- for (int i = 0; i < Integer.MAX_VALUE; i++) {
- preparedStatement.setLong(1, beginId);
- preparedStatement.setLong(2, shardingSize);
- try (ResultSet resultSet = preparedStatement.executeQuery()) {
- if (!resultSet.next()) {
- break;
- }
- long endId = resultSet.getLong(1);
- if (0 == endId) {
- break;
- }
- result.add(new IntegerPrimaryKeyPosition(beginId, endId));
- beginId = endId + 1;
- }
- }
- // fix empty table missing inventory task
- if (result.isEmpty()) {
- result.add(new IntegerPrimaryKeyPosition(0, 0));
- }
+ Statement statement = connection.createStatement();
+ ResultSet resultSet = statement.executeQuery(sql)) {
+ resultSet.next();
+ return Range.between(resultSet.getLong(1), resultSet.getLong(2));
} catch (final SQLException ex) {
throw new SplitPipelineJobByUniqueKeyException(dumperConfig.getActualTableName(), uniqueKey, ex);
}
- return result;
}
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java
index 6494932f763..443655123ee 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java
@@ -190,6 +190,12 @@ public abstract class AbstractPipelineSQLBuilder implements PipelineSQLBuilder {
return String.format("SELECT COUNT(*) FROM %s", getQualifiedTableName(schemaName, tableName));
}
+ @Override
+ public String buildUniqueKeyMinMaxValuesSQL(final String schemaName, final String tableName, final String uniqueKey) {
+ String quotedUniqueKey = quote(uniqueKey);
+ return String.format("SELECT MIN(%s), MAX(%s) FROM %s", quotedUniqueKey, quotedUniqueKey, getQualifiedTableName(schemaName, tableName));
+ }
+
@Override
public String buildQueryAllOrderingSQL(final String schemaName, final String tableName, final List<String> columnNames, final String uniqueKey, final boolean firstQuery) {
String qualifiedTableName = getQualifiedTableName(schemaName, tableName);
@@ -203,11 +209,4 @@ public abstract class AbstractPipelineSQLBuilder implements PipelineSQLBuilder {
public String buildCheckEmptySQL(final String schemaName, final String tableName) {
return String.format("SELECT * FROM %s LIMIT 1", getQualifiedTableName(schemaName, tableName));
}
-
- @Override
- public String buildSplitByPrimaryKeyRangeSQL(final String schemaName, final String tableName, final String uniqueKey) {
- String quotedUniqueKey = quote(uniqueKey);
- return String.format("SELECT MAX(%s),COUNT(1) FROM (SELECT %s FROM %s WHERE %s>=? ORDER BY %s LIMIT ?) t",
- quotedUniqueKey, quotedUniqueKey, getQualifiedTableName(schemaName, tableName), quotedUniqueKey, quotedUniqueKey);
- }
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/IntervalToRangeIterator.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/IntervalToRangeIterator.java
new file mode 100644
index 00000000000..81ce2ff6878
--- /dev/null
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/IntervalToRangeIterator.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.util;
+
+import org.apache.commons.lang3.Range;
+
+import java.util.Iterator;
+
+/**
+ * Interval to range iterator.
+ * <p>
+ * It's not thread-safe.
+ * </p>
+ */
+public final class IntervalToRangeIterator implements Iterator<Range<Long>> {
+
+ private final long maximum;
+
+ private final long interval;
+
+ private long current;
+
+ public IntervalToRangeIterator(final long minimum, final long maximum, final long interval) {
+ if (minimum > maximum) {
+ throw new IllegalArgumentException("minimum greater than maximum");
+ }
+ if (interval < 0) {
+ throw new IllegalArgumentException("interval is less than zero");
+ }
+ this.maximum = maximum;
+ this.interval = interval;
+ this.current = minimum;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return current <= maximum;
+ }
+
+ @Override
+ public Range<Long> next() {
+ long upperLimit = Math.min(maximum, current + interval);
+ Range<Long> result = Range.between(current, upperLimit);
+ current = upperLimit + 1;
+ return result;
+ }
+}
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/FixturePipelineSQLBuilder.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/FixturePipelineSQLBuilder.java
index 47eaa986d4d..1a67f178f1b 100644
--- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/FixturePipelineSQLBuilder.java
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/FixturePipelineSQLBuilder.java
@@ -79,18 +79,18 @@ public final class FixturePipelineSQLBuilder implements PipelineSQLBuilder {
}
@Override
- public String buildQueryAllOrderingSQL(final String schemaName, final String tableName, final List<String> columnNames, final String uniqueKey, final boolean firstQuery) {
+ public String buildUniqueKeyMinMaxValuesSQL(final String schemaName, final String tableName, final String uniqueKey) {
return "";
}
@Override
- public String buildCheckEmptySQL(final String schemaName, final String tableName) {
- return null;
+ public String buildQueryAllOrderingSQL(final String schemaName, final String tableName, final List<String> columnNames, final String uniqueKey, final boolean firstQuery) {
+ return "";
}
@Override
- public String buildSplitByPrimaryKeyRangeSQL(final String schemaName, final String tableName, final String uniqueKey) {
- return "";
+ public String buildCheckEmptySQL(final String schemaName, final String tableName) {
+ return null;
}
@Override
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/IntervalToRangeIteratorTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/IntervalToRangeIteratorTest.java
new file mode 100644
index 00000000000..bfd036aaa8e
--- /dev/null
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/IntervalToRangeIteratorTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.util;
+
+import org.apache.commons.lang3.Range;
+import org.junit.jupiter.api.Test;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+class IntervalToRangeIteratorTest {
+
+ @Test
+ void assertMinimumGreaterThanMaximum() {
+ assertThrows(IllegalArgumentException.class, () -> new IntervalToRangeIterator(200L, 100L, 10L));
+ }
+
+ @Test
+ void assertIntervalLessThanZero() {
+ assertThrows(IllegalArgumentException.class, () -> new IntervalToRangeIterator(100L, 200L, -10L));
+ }
+
+ @Test
+ void assertSmallRangeCorrect() {
+ IntervalToRangeIterator iterator = new IntervalToRangeIterator(200L, 200L, 100L);
+ List<Range<Long>> actual = new LinkedList<>();
+ while (iterator.hasNext()) {
+ actual.add(iterator.next());
+ }
+ assertThat(actual.size(), is(1));
+ assertThat(actual.get(0).getMinimum(), is(200L));
+ assertThat(actual.get(0).getMaximum(), is(200L));
+ }
+
+ @Test
+ void assertLargeRangeCorrect() {
+ IntervalToRangeIterator iterator = new IntervalToRangeIterator(200L, 400L, 100L);
+ List<Range<Long>> actual = new LinkedList<>();
+ while (iterator.hasNext()) {
+ actual.add(iterator.next());
+ }
+ assertThat(actual.size(), is(2));
+ assertThat(actual.get(0).getMinimum(), is(200L));
+ assertThat(actual.get(0).getMaximum(), is(300L));
+ assertThat(actual.get(1).getMinimum(), is(301L));
+ assertThat(actual.get(1).getMaximum(), is(400L));
+ }
+}
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/prepare/InventoryTaskSplitterTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
index a6e76f8d71e..42d3e694fc2 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
@@ -110,7 +110,7 @@ class InventoryTaskSplitterTest {
assertThat(actual.size(), is(1));
assertThat(actual.get(0).getTaskId(), is("ds_0.t_order#0"));
IntegerPrimaryKeyPosition keyPosition = (IntegerPrimaryKeyPosition) actual.get(0).getTaskProgress().getPosition();
- assertThat(keyPosition.getBeginValue(), is(0L));
+ assertThat(keyPosition.getBeginValue(), is(1L));
assertThat(keyPosition.getEndValue(), is(999L));
}