You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2022/04/12 02:50:29 UTC
[shardingsphere] branch master updated: Refactor replication slot cleanup for openGauss scaling job (#16746)
This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 2b53353ef41 Refactor replication slot cleanup for openGauss scaling job (#16746)
2b53353ef41 is described below
commit 2b53353ef4179ffeb5e6a36ca8b44858f7773a97
Author: Hongsheng Zhong <sa...@126.com>
AuthorDate: Tue Apr 12 10:50:21 2022 +0800
Refactor replication slot cleanup for openGauss scaling job (#16746)
* Refactor replication slot cleanup
* Add TODO
* Refactor cleanup, clean all source data sources
* Fix code concurrent modification conflict
---
.../pool/creator/DataSourcePoolCreator.java | 4 ++++
.../core/api/impl/AbstractPipelineJobAPIImpl.java | 4 +---
.../core/api/impl/RuleAlteredJobAPIImpl.java | 15 ++----------
.../pipeline/core/execute/PipelineJobExecutor.java | 16 ++++++++++++-
.../rulealtered/RuleAlteredJobPreparer.java | 28 +++++++++++++++++-----
.../RuleAlteredJobProgressDetector.java | 13 ++++++++++
.../rulealtered/RuleAlteredJobSchedulerCenter.java | 4 +++-
.../scenario/rulealtered/RuleAlteredJobWorker.java | 1 +
.../ingest/OpenGaussPositionInitializer.java | 1 +
.../ingest/wal/OpenGaussLogicalReplication.java | 17 +++++++++----
.../ingest/PostgreSQLPositionInitializer.java | 1 +
.../subscriber/ScalingRegistrySubscriber.java | 3 ---
.../handler/CuratorZookeeperExceptionHandler.java | 2 ++
13 files changed, 78 insertions(+), 31 deletions(-)
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/datasource/pool/creator/DataSourcePoolCreator.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/datasource/pool/creator/DataSourcePoolCreator.java
index 594bea819b1..3fae4695b34 100644
--- a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/datasource/pool/creator/DataSourcePoolCreator.java
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/datasource/pool/creator/DataSourcePoolCreator.java
@@ -40,6 +40,9 @@ import java.util.stream.Collectors;
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class DataSourcePoolCreator {
+ // TODO pipeline doesn't need cache even if cache is enabled, since there might be some temp data sources
+ // TODO when all data source configurations of instance are dropped by DistSQL, cached data source should be closed
+
/**
* Create data sources.
*
@@ -62,6 +65,7 @@ public final class DataSourcePoolCreator {
return GlobalDataSourceRegistry.getInstance().getCachedDataSources().get(dataSourceProps.getInstance());
}
}
+ // TODO when aggregation is enabled, some data source properties should be changed, e.g. maxPoolSize
DataSource result = createDataSource(dataSourceProps.getDataSourceClassName());
Optional<DataSourcePoolMetaData> poolMetaData = DataSourcePoolMetaDataFactory.newInstance(dataSourceProps.getDataSourceClassName());
DataSourceReflection dataSourceReflection = new DataSourceReflection(result);
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
index 1867c5bcb98..549c3b7d259 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
@@ -51,9 +51,6 @@ public abstract class AbstractPipelineJobAPIImpl implements PipelineJobAPI {
public void stop(final String jobId) {
log.info("Stop pipeline job {}", jobId);
JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
- if (jobConfigPOJO.isDisabled()) {
- return;
- }
jobConfigPOJO.setDisabled(true);
jobConfigPOJO.getProps().setProperty("stop_time", LocalDateTime.now().format(DATE_TIME_FORMATTER));
PipelineAPIFactory.getJobConfigurationAPI().updateJobConfiguration(jobConfigPOJO);
@@ -64,6 +61,7 @@ public abstract class AbstractPipelineJobAPIImpl implements PipelineJobAPI {
log.info("Remove pipeline job {}", jobId);
JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
verifyJobStopped(jobConfigPOJO);
+ // TODO release lock
PipelineAPIFactory.getJobOperateAPI().remove(String.valueOf(jobId), null);
PipelineAPIFactory.getGovernanceRepositoryAPI().deleteJob(jobId);
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
index c65784f106e..370265c5dea 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
@@ -37,9 +37,8 @@ import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobExecuti
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineVerifyFailedException;
import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredContext;
import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJob;
-import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobContext;
-import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobPreparer;
import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobProgressDetector;
+import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobSchedulerCenter;
import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobWorker;
import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCheckAlgorithm;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
@@ -67,7 +66,6 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Properties;
-import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
@@ -361,17 +359,8 @@ public final class RuleAlteredJobAPIImpl extends AbstractPipelineJobAPIImpl impl
for (int each : repositoryAPI.getShardingItems(jobId)) {
repositoryAPI.updateShardingJobStatus(jobId, each, JobStatus.FINISHED);
}
+ RuleAlteredJobSchedulerCenter.stop(jobId);
stop(jobId);
- // TODO clean up should be done after the task is complete.
- try {
- TimeUnit.SECONDS.sleep(1L);
- } catch (final InterruptedException ex) {
- log.error(ex.getMessage());
- }
- RuleAlteredJobContext jobContext = new RuleAlteredJobContext(jobConfig);
- RuleAlteredJobPreparer jobPreparer = new RuleAlteredJobPreparer();
- jobPreparer.cleanup(jobContext);
- jobContext.close();
}
@Override
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
index 898528c1ee6..c1acd9ca7fa 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
@@ -18,12 +18,16 @@
package org.apache.shardingsphere.data.pipeline.core.execute;
import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.api.PipelineJobAPIFactory;
+import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPI;
import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.JobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.constant.DataPipelineConstants;
import org.apache.shardingsphere.data.pipeline.core.lock.PipelineSimpleLock;
import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJob;
+import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobPreparer;
+import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobProgressDetector;
import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobSchedulerCenter;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
@@ -49,15 +53,25 @@ public final class PipelineJobExecutor extends AbstractLifecycleExecutor {
}
private void watchGovernanceRepositoryConfiguration() {
+ RuleAlteredJobAPI ruleAlteredJobAPI = PipelineJobAPIFactory.newInstance();
PipelineAPIFactory.getGovernanceRepositoryAPI().watch(DataPipelineConstants.DATA_PIPELINE_ROOT, event -> {
Optional<JobConfigurationPOJO> jobConfigPOJOOptional = getJobConfigPOJO(event);
if (!jobConfigPOJOOptional.isPresent()) {
return;
}
JobConfigurationPOJO jobConfigPOJO = jobConfigPOJOOptional.get();
- if (DataChangedEvent.Type.DELETED == event.getType() || jobConfigPOJO.isDisabled()) {
+ boolean deleted = DataChangedEvent.Type.DELETED == event.getType();
+ boolean disabled = jobConfigPOJO.isDisabled();
+ if (deleted || disabled) {
+ log.info("jobId={}, deleted={}, disabled={}", jobConfigPOJO.getJobName(), deleted, disabled);
RuleAlteredJobSchedulerCenter.stop(jobConfigPOJO.getJobName());
JobConfiguration jobConfig = YamlEngine.unmarshal(jobConfigPOJO.getJobParameter(), JobConfiguration.class, true);
+ if (deleted) {
+ new RuleAlteredJobPreparer().cleanup(jobConfig);
+ } else if (RuleAlteredJobProgressDetector.isJobSuccessful(jobConfig.getHandleConfig().getJobShardingCount(), ruleAlteredJobAPI.getProgress(jobConfig).values())) {
+ log.info("isJobSuccessful=true");
+ new RuleAlteredJobPreparer().cleanup(jobConfig);
+ }
ScalingReleaseSchemaNameLockEvent releaseLockEvent = new ScalingReleaseSchemaNameLockEvent(jobConfig.getWorkflowConfig().getSchemaName());
ShardingSphereEventBus.getInstance().post(releaseLockEvent);
return;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
index a753250a628..f66d063e163 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
@@ -22,6 +22,8 @@ import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.JobConfigu
import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeLine;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
+import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
+import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.api.job.progress.JobProgress;
@@ -39,6 +41,11 @@ import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.prepare.Inve
import org.apache.shardingsphere.data.pipeline.spi.check.datasource.DataSourceChecker;
import org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelFactory;
import org.apache.shardingsphere.data.pipeline.spi.ingest.position.PositionInitializer;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
+import org.apache.shardingsphere.infra.database.type.DatabaseTypeRegistry;
+import org.apache.shardingsphere.infra.datasource.pool.creator.DataSourcePoolCreator;
+import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
+import org.apache.shardingsphere.infra.yaml.config.swapper.YamlDataSourceConfigurationSwapper;
import org.apache.shardingsphere.scaling.core.job.check.EnvironmentCheckerFactory;
import org.apache.shardingsphere.spi.ShardingSphereServiceLoader;
import org.apache.shardingsphere.spi.type.typed.TypedSPIRegistry;
@@ -183,16 +190,25 @@ public final class RuleAlteredJobPreparer {
/**
* Do cleanup work for scaling job.
*
- * @param jobContext job context
+ * @param jobConfig job configuration
*/
- public void cleanup(final RuleAlteredJobContext jobContext) {
- PipelineDataSourceManager dataSourceManager = jobContext.getDataSourceManager();
+ public void cleanup(final JobConfiguration jobConfig) {
try {
- TaskConfiguration taskConfig = jobContext.getTaskConfig();
- PositionInitializer positionInitializer = PositionInitializerFactory.getPositionInitializer(taskConfig.getHandleConfig().getSourceDatabaseType());
- positionInitializer.destroy(dataSourceManager.getDataSource(taskConfig.getDumperConfig().getDataSourceConfig()));
+ cleanup0(jobConfig);
} catch (final SQLException ex) {
log.warn("Scaling job destroying failed", ex);
}
}
+
+ private void cleanup0(final JobConfiguration jobConfig) throws SQLException {
+ DatabaseType databaseType = DatabaseTypeRegistry.getActualDatabaseType(jobConfig.getHandleConfig().getSourceDatabaseType());
+ PositionInitializer positionInitializer = PositionInitializerFactory.getPositionInitializer(databaseType.getName());
+ ShardingSpherePipelineDataSourceConfiguration sourceDataSourceConfig = (ShardingSpherePipelineDataSourceConfiguration)
+ PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getPipelineConfig().getSource().getType(), jobConfig.getPipelineConfig().getSource().getParameter());
+ for (DataSourceProperties each : new YamlDataSourceConfigurationSwapper().getDataSourcePropertiesMap(sourceDataSourceConfig.getRootConfig()).values()) {
+ try (PipelineDataSourceWrapper dataSource = new PipelineDataSourceWrapper(DataSourcePoolCreator.create(each), databaseType)) {
+ positionInitializer.destroy(dataSource);
+ }
+ }
+ }
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobProgressDetector.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobProgressDetector.java
index 4048df2e970..15deded91ed 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobProgressDetector.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobProgressDetector.java
@@ -21,6 +21,7 @@ import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.FinishedPosition;
+import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.api.job.progress.JobProgress;
import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
@@ -57,4 +58,16 @@ public final class RuleAlteredJobProgressDetector {
return jobShardingCount == jobProgresses.size()
&& jobProgresses.stream().allMatch(each -> null != each && !each.getStatus().isRunning());
}
+
+ /**
+ * Whether job is successful.
+ *
+ * @param jobShardingCount job sharding count
+ * @param jobProgresses job progresses
+ * @return successful or not
+ */
+ public static boolean isJobSuccessful(final int jobShardingCount, final Collection<JobProgress> jobProgresses) {
+ return jobShardingCount == jobProgresses.size()
+ && jobProgresses.stream().allMatch(each -> null != each && JobStatus.FINISHED == each.getStatus());
+ }
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobSchedulerCenter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobSchedulerCenter.java
index 4f1db0bcea3..aae0bbfff89 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobSchedulerCenter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobSchedulerCenter.java
@@ -73,13 +73,15 @@ public final class RuleAlteredJobSchedulerCenter {
*/
public static void stop(final String jobId) {
log.info("remove and stop {}", jobId);
- Map<Integer, RuleAlteredJobScheduler> schedulerMap = JOB_SCHEDULER_MAP.remove(jobId);
+ Map<Integer, RuleAlteredJobScheduler> schedulerMap = JOB_SCHEDULER_MAP.get(jobId);
if (null == schedulerMap) {
+ log.info("schedulerMap is null, ignore");
return;
}
for (Entry<Integer, RuleAlteredJobScheduler> entry : schedulerMap.entrySet()) {
entry.getValue().stop();
}
+ JOB_SCHEDULER_MAP.remove(jobId);
}
/**
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
index 95f9c300439..43e777ca587 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
@@ -181,6 +181,7 @@ public final class RuleAlteredJobWorker {
log.info("Switch rule configuration immediately.");
ScalingTaskFinishedEvent taskFinishedEvent = new ScalingTaskFinishedEvent(event.getSchemaName(), event.getActiveVersion(), event.getNewVersion());
ShardingSphereEventBus.getInstance().post(taskFinishedEvent);
+ ShardingSphereEventBus.getInstance().post(new ScalingReleaseSchemaNameLockEvent(event.getSchemaName()));
}
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussPositionInitializer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussPositionInitializer.java
index 8b9ead47f6d..12d2aa93e76 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussPositionInitializer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussPositionInitializer.java
@@ -37,6 +37,7 @@ public final class OpenGaussPositionInitializer implements PositionInitializer {
@Override
public WalPosition init(final DataSource dataSource) throws SQLException {
try (Connection connection = dataSource.getConnection()) {
+ // TODO createSlotIfNotExists
return getWalPosition(connection);
}
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/OpenGaussLogicalReplication.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/OpenGaussLogicalReplication.java
index 12740630642..9f20968ad19 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/OpenGaussLogicalReplication.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/OpenGaussLogicalReplication.java
@@ -17,6 +17,7 @@
package org.apache.shardingsphere.data.pipeline.opengauss.ingest.wal;
+import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlJdbcConfiguration;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.BaseLogSequenceNumber;
@@ -37,8 +38,10 @@ import java.util.Properties;
/**
* Logical replication for openGauss.
*/
+@Slf4j
public final class OpenGaussLogicalReplication {
+ // TODO it should be private
public static final String SLOT_NAME_PREFIX = "sharding_scaling";
public static final String DECODE_PLUGIN = "mppdb_decoding";
@@ -90,7 +93,8 @@ public final class OpenGaussLogicalReplication {
* @throws SQLException SQL exception
*/
public static void createIfNotExists(final Connection connection) throws SQLException {
- if (!isSlotNameExist(connection)) {
+ String slotName = getUniqueSlotName(connection);
+ if (!isSlotNameExist(connection, slotName)) {
createSlotBySQL(connection);
}
}
@@ -102,16 +106,21 @@ public final class OpenGaussLogicalReplication {
* @throws SQLException drop SQL with error
*/
public static void dropSlot(final Connection connection) throws SQLException {
- String sql = String.format("select * from pg_drop_replication_slot('%s')", getUniqueSlotName(connection));
+ String slotName = getUniqueSlotName(connection);
+ if (!isSlotNameExist(connection, slotName)) {
+ log.info("dropSlot, slot not exist, ignore, slotName={}", slotName);
+ return;
+ }
+ String sql = String.format("select * from pg_drop_replication_slot('%s')", slotName);
try (CallableStatement callableStatement = connection.prepareCall(sql)) {
callableStatement.execute();
}
}
- private static boolean isSlotNameExist(final Connection connection) throws SQLException {
+ private static boolean isSlotNameExist(final Connection connection, final String slotName) throws SQLException {
String sql = "select * from pg_replication_slots where slot_name=?";
try (PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
- preparedStatement.setString(1, getUniqueSlotName(connection));
+ preparedStatement.setString(1, slotName);
try (ResultSet resultSet = preparedStatement.executeQuery()) {
return resultSet.next();
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLPositionInitializer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLPositionInitializer.java
index e2de501621b..14b2bfa838c 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLPositionInitializer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLPositionInitializer.java
@@ -36,6 +36,7 @@ import java.sql.SQLException;
@Slf4j
public final class PostgreSQLPositionInitializer implements PositionInitializer {
+ // TODO it should be private, and it should be named with a _PREFIX suffix (e.g. SLOT_NAME_PREFIX)
public static final String SLOT_NAME = "sharding_scaling";
public static final String DECODE_PLUGIN = "test_decoding";
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/cache/subscriber/ScalingRegistrySubscriber.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/cache/subscriber/ScalingRegistrySubscriber.java
index ef496dfd507..46d7681ed1e 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/cache/subscriber/ScalingRegistrySubscriber.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/cache/subscriber/ScalingRegistrySubscriber.java
@@ -21,7 +21,6 @@ import com.google.common.eventbus.Subscribe;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.cache.event.StartScalingEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.rule.ScalingReleaseSchemaNameLockEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.rule.ScalingTaskFinishedEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.version.SchemaVersionPreparedEvent;
import org.apache.shardingsphere.mode.metadata.persist.node.SchemaMetaDataNode;
@@ -81,7 +80,5 @@ public final class ScalingRegistrySubscriber {
} else {
log.error("targetActiveVersion does not match current activeVersion, targetActiveVersion={}, activeVersion={}", targetActiveVersion, activeVersion.orElse(null));
}
- ScalingReleaseSchemaNameLockEvent releaseLockEvent = new ScalingReleaseSchemaNameLockEvent(event.getTargetSchemaName());
- ShardingSphereEventBus.getInstance().post(releaseLockEvent);
}
}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-zookeeper-curator/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/handler/CuratorZookeeperExceptionHandler.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-r [...]
index 4627866fb06..519f1f98341 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-zookeeper-curator/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/handler/CuratorZookeeperExceptionHandler.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-zookeeper-curator/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/handler/CuratorZookeeperExceptionHandler.java
@@ -41,11 +41,13 @@ public final class CuratorZookeeperExceptionHandler {
*/
public static void handleException(final Exception cause) {
if (null == cause) {
+ log.info("cause is null");
return;
}
if (isIgnoredException(cause) || null != cause.getCause() && isIgnoredException(cause.getCause())) {
log.debug("Ignored exception for: {}", cause.getMessage());
} else if (cause instanceof InterruptedException) {
+ log.info("InterruptedException caught");
Thread.currentThread().interrupt();
} else {
throw new ClusterPersistRepositoryException(cause);