You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by GitBox <gi...@apache.org> on 2022/04/11 07:50:59 UTC

[GitHub] [shardingsphere] azexcy opened a new pull request, #16727: - PostgreSQL scaling auto create table optimization

azexcy opened a new pull request, #16727:
URL: https://github.com/apache/shardingsphere/pull/16727

   Fixes 16657
   
   Changes proposed in this pull request:
   -  Use lock, make sure only one thread do prepare operation(eg. create table, create index).
   -  Support PostgreSQL `comment on` grammar at scaling.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@shardingsphere.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [shardingsphere] azexcy commented on a diff in pull request #16727: - PostgreSQL scaling auto create table optimization

Posted by GitBox <gi...@apache.org>.
azexcy commented on code in PR #16727:
URL: https://github.com/apache/shardingsphere/pull/16727#discussion_r847875789


##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java:
##########
@@ -91,29 +91,30 @@ private void prepareTarget(final JobConfiguration jobConfig, final PipelineDataS
             log.info("dataSourcePreparer null, ignore prepare target");
             return;
         }
-        LockContext lockContext = PipelineContext.getContextManager().getInstanceContext().getLockContext();
-        ShardingSphereLock lock = lockContext.getOrCreateSchemaLock(jobConfig.getWorkflowConfig().getSchemaName());
+        final PipelineSimpleLock lock = PipelineSimpleLock.getInstance();
         // TODO make sure only the first thread execute prepare
-        boolean skipPrepare = !lock.tryLock("prepareTargetTablesLock", 100);
+        boolean skipPrepare = !lock.tryLock(PREPARE_LOCK_KEY + jobConfig.getHandleConfig().getJobId(), 100);
         if (skipPrepare) {
-            // TODO polling until first thread finish prepare, then just return
-            while (lock.isLocked("prepareTargetTablesLock")) {
+            while (true) {
                 try {
                     TimeUnit.SECONDS.sleep(1);
                 } catch (InterruptedException e) {
-                    lock.releaseLock("prepareTargetTablesLock");
+                    lock.releaseLock(PREPARE_LOCK_KEY + jobConfig.getHandleConfig().getJobId());
+                }
+                // TODO just return ,because the first thread finish prepared
+                if (lock.tryLock(PREPARE_LOCK_KEY + jobConfig.getHandleConfig().getJobId(), 100)) {
+                    lock.releaseLock(PREPARE_LOCK_KEY + jobConfig.getHandleConfig().getJobId());
+                    return;
                 }
             }
-            lock.releaseLock("prepareTargetTablesLock");
-            return;
         }
         try {
             JobDataNodeLine tablesFirstDataNodes = JobDataNodeLine.unmarshal(jobConfig.getHandleConfig().getTablesFirstDataNodes());
             PrepareTargetTablesParameter prepareTargetTablesParameter = new PrepareTargetTablesParameter(tablesFirstDataNodes,
                     jobConfig.getPipelineConfig(), dataSourceManager);
             dataSourcePreparer.prepareTargetTables(prepareTargetTablesParameter);
         } finally {
-            lock.releaseLock("prepareTargetTablesLock");
+            lock.releaseLock(PREPARE_LOCK_KEY + jobConfig.getHandleConfig().getJobId());
         }

Review Comment:
   done



##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java:
##########
@@ -91,29 +91,30 @@ private void prepareTarget(final JobConfiguration jobConfig, final PipelineDataS
             log.info("dataSourcePreparer null, ignore prepare target");
             return;
         }
-        LockContext lockContext = PipelineContext.getContextManager().getInstanceContext().getLockContext();
-        ShardingSphereLock lock = lockContext.getOrCreateSchemaLock(jobConfig.getWorkflowConfig().getSchemaName());
+        final PipelineSimpleLock lock = PipelineSimpleLock.getInstance();
         // TODO make sure only the first thread execute prepare
-        boolean skipPrepare = !lock.tryLock("prepareTargetTablesLock", 100);
+        boolean skipPrepare = !lock.tryLock(PREPARE_LOCK_KEY + jobConfig.getHandleConfig().getJobId(), 100);
         if (skipPrepare) {
-            // TODO polling until first thread finish prepare, then just return
-            while (lock.isLocked("prepareTargetTablesLock")) {
+            while (true) {
                 try {
                     TimeUnit.SECONDS.sleep(1);
                 } catch (InterruptedException e) {

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@shardingsphere.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [shardingsphere] sandynz commented on a diff in pull request #16727: - PostgreSQL scaling auto create table optimization

Posted by GitBox <gi...@apache.org>.
sandynz commented on code in PR #16727:
URL: https://github.com/apache/shardingsphere/pull/16727#discussion_r847234702


##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java:
##########
@@ -91,29 +91,30 @@ private void prepareTarget(final JobConfiguration jobConfig, final PipelineDataS
             log.info("dataSourcePreparer null, ignore prepare target");
             return;
         }
-        LockContext lockContext = PipelineContext.getContextManager().getInstanceContext().getLockContext();
-        ShardingSphereLock lock = lockContext.getOrCreateSchemaLock(jobConfig.getWorkflowConfig().getSchemaName());
+        final PipelineSimpleLock lock = PipelineSimpleLock.getInstance();
         // TODO make sure only the first thread execute prepare
-        boolean skipPrepare = !lock.tryLock("prepareTargetTablesLock", 100);
+        boolean skipPrepare = !lock.tryLock(PREPARE_LOCK_KEY + jobConfig.getHandleConfig().getJobId(), 100);
         if (skipPrepare) {
-            // TODO polling until first thread finish prepare, then just return
-            while (lock.isLocked("prepareTargetTablesLock")) {
+            while (true) {
                 try {
                     TimeUnit.SECONDS.sleep(1);
                 } catch (InterruptedException e) {
-                    lock.releaseLock("prepareTargetTablesLock");
+                    lock.releaseLock(PREPARE_LOCK_KEY + jobConfig.getHandleConfig().getJobId());
+                }
+                // TODO just return ,because the first thread finish prepared
+                if (lock.tryLock(PREPARE_LOCK_KEY + jobConfig.getHandleConfig().getJobId(), 100)) {
+                    lock.releaseLock(PREPARE_LOCK_KEY + jobConfig.getHandleConfig().getJobId());
+                    return;
                 }
             }
-            lock.releaseLock("prepareTargetTablesLock");
-            return;
         }
         try {
             JobDataNodeLine tablesFirstDataNodes = JobDataNodeLine.unmarshal(jobConfig.getHandleConfig().getTablesFirstDataNodes());
             PrepareTargetTablesParameter prepareTargetTablesParameter = new PrepareTargetTablesParameter(tablesFirstDataNodes,
                     jobConfig.getPipelineConfig(), dataSourceManager);
             dataSourcePreparer.prepareTargetTables(prepareTargetTablesParameter);
         } finally {
-            lock.releaseLock("prepareTargetTablesLock");
+            lock.releaseLock(PREPARE_LOCK_KEY + jobConfig.getHandleConfig().getJobId());
         }

Review Comment:
   It's better to extract a new method to get lock name, but not hard-coded several times.



##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java:
##########
@@ -91,29 +91,30 @@ private void prepareTarget(final JobConfiguration jobConfig, final PipelineDataS
             log.info("dataSourcePreparer null, ignore prepare target");
             return;
         }
-        LockContext lockContext = PipelineContext.getContextManager().getInstanceContext().getLockContext();
-        ShardingSphereLock lock = lockContext.getOrCreateSchemaLock(jobConfig.getWorkflowConfig().getSchemaName());
+        final PipelineSimpleLock lock = PipelineSimpleLock.getInstance();
         // TODO make sure only the first thread execute prepare
-        boolean skipPrepare = !lock.tryLock("prepareTargetTablesLock", 100);
+        boolean skipPrepare = !lock.tryLock(PREPARE_LOCK_KEY + jobConfig.getHandleConfig().getJobId(), 100);
         if (skipPrepare) {
-            // TODO polling until first thread finish prepare, then just return
-            while (lock.isLocked("prepareTargetTablesLock")) {
+            while (true) {
                 try {
                     TimeUnit.SECONDS.sleep(1);
                 } catch (InterruptedException e) {

Review Comment:
   It's better to limit while loop count, but not `while (true)`, it might cause issue.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@shardingsphere.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [shardingsphere] azexcy commented on a diff in pull request #16727: - PostgreSQL scaling auto create table optimization

Posted by GitBox <gi...@apache.org>.
azexcy commented on code in PR #16727:
URL: https://github.com/apache/shardingsphere/pull/16727#discussion_r847129504


##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java:
##########
@@ -87,9 +91,30 @@ private void prepareTarget(final JobConfiguration jobConfig, final PipelineDataS
             log.info("dataSourcePreparer null, ignore prepare target");
             return;
         }
-        JobDataNodeLine tablesFirstDataNodes = JobDataNodeLine.unmarshal(jobConfig.getHandleConfig().getTablesFirstDataNodes());
-        PrepareTargetTablesParameter prepareTargetTablesParameter = new PrepareTargetTablesParameter(tablesFirstDataNodes, jobConfig.getPipelineConfig(), dataSourceManager);
-        dataSourcePreparer.prepareTargetTables(prepareTargetTablesParameter);
+        LockContext lockContext = PipelineContext.getContextManager().getInstanceContext().getLockContext();
+        ShardingSphereLock lock = lockContext.getOrCreateSchemaLock(jobConfig.getWorkflowConfig().getSchemaName());
+        // TODO make sure only the first thread execute prepare
+        boolean skipPrepare = !lock.tryLock("prepareTargetTablesLock", 100);

Review Comment:
   changed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@shardingsphere.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [shardingsphere] sandynz merged pull request #16727: - PostgreSQL scaling auto create table optimization

Posted by GitBox <gi...@apache.org>.
sandynz merged PR #16727:
URL: https://github.com/apache/shardingsphere/pull/16727


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@shardingsphere.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [shardingsphere] azexcy commented on a diff in pull request #16727: - PostgreSQL scaling auto create table optimization

Posted by GitBox <gi...@apache.org>.
azexcy commented on code in PR #16727:
URL: https://github.com/apache/shardingsphere/pull/16727#discussion_r847130207


##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/prepare/datasource/PostgreSQLDataSourcePreparer.java:
##########
@@ -132,12 +137,12 @@ private String rewriteActualIndexSql(final String sql, final String actualTableN
                 try (Connection sourceConnection = dataSource.getConnection()) {
                     String actualTableName = dataNode.getTableName();
                     StringJoiner joiner = new StringJoiner(";");
-                    Pair<String, List<String>> pkPiar = queryTablePrimaryKey(sourceConnection, actualTableName);
-                    joiner.add(queryCreateTableSql(sourceConnection, actualTableName, pkPiar.getRight()));
-                    queryCreateIndexes(sourceConnection, actualTableName, pkPiar.getLeft()).forEach(joiner::add);
-                    String logicTableName = each.getLogicTableName();
+                    Pair<String, List<String>> pkPair = queryTablePrimaryKey(sourceConnection, actualTableName);
+                    joiner.add(queryCreateTableSql(sourceConnection, actualTableName, pkPair.getRight()));
+                    queryCreateIndexes(sourceConnection, actualTableName, pkPair.getLeft()).forEach(joiner::add);

Review Comment:
   done , changed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@shardingsphere.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [shardingsphere] sandynz commented on a diff in pull request #16727: - PostgreSQL scaling auto create table optimization

Posted by GitBox <gi...@apache.org>.
sandynz commented on code in PR #16727:
URL: https://github.com/apache/shardingsphere/pull/16727#discussion_r847063159


##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java:
##########
@@ -87,9 +91,30 @@ private void prepareTarget(final JobConfiguration jobConfig, final PipelineDataS
             log.info("dataSourcePreparer null, ignore prepare target");
             return;
         }
-        JobDataNodeLine tablesFirstDataNodes = JobDataNodeLine.unmarshal(jobConfig.getHandleConfig().getTablesFirstDataNodes());
-        PrepareTargetTablesParameter prepareTargetTablesParameter = new PrepareTargetTablesParameter(tablesFirstDataNodes, jobConfig.getPipelineConfig(), dataSourceManager);
-        dataSourcePreparer.prepareTargetTables(prepareTargetTablesParameter);
+        LockContext lockContext = PipelineContext.getContextManager().getInstanceContext().getLockContext();
+        ShardingSphereLock lock = lockContext.getOrCreateSchemaLock(jobConfig.getWorkflowConfig().getSchemaName());
+        // TODO make sure only the first thread execute prepare
+        boolean skipPrepare = !lock.tryLock("prepareTargetTablesLock", 100);

Review Comment:
   Seems `tryLock`'s parameter `lockName` is still handled as `schemaName` in `ShardingSphereDistributeGlobalLock.innerTryLock`. `LockContext` is just designed for schema lock for now.
   
   Could we use `PipelineSimpleLock` to implement it?



##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/prepare/datasource/PostgreSQLDataSourcePreparer.java:
##########
@@ -132,12 +137,12 @@ private String rewriteActualIndexSql(final String sql, final String actualTableN
                 try (Connection sourceConnection = dataSource.getConnection()) {
                     String actualTableName = dataNode.getTableName();
                     StringJoiner joiner = new StringJoiner(";");
-                    Pair<String, List<String>> pkPiar = queryTablePrimaryKey(sourceConnection, actualTableName);
-                    joiner.add(queryCreateTableSql(sourceConnection, actualTableName, pkPiar.getRight()));
-                    queryCreateIndexes(sourceConnection, actualTableName, pkPiar.getLeft()).forEach(joiner::add);
-                    String logicTableName = each.getLogicTableName();
+                    Pair<String, List<String>> pkPair = queryTablePrimaryKey(sourceConnection, actualTableName);
+                    joiner.add(queryCreateTableSql(sourceConnection, actualTableName, pkPair.getRight()));
+                    queryCreateIndexes(sourceConnection, actualTableName, pkPair.getLeft()).forEach(joiner::add);

Review Comment:
   It's better to use `primaryKey` to replace `pk`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@shardingsphere.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org