Posted to notifications@shardingsphere.apache.org by GitBox <gi...@apache.org> on 2022/05/03 10:17:54 UTC

[GitHub] [shardingsphere] nickfan opened a new issue, #17299: ShardingSphere-Proxy Unable to work with apache flink cdc connector.

nickfan opened a new issue, #17299:
URL: https://github.com/apache/shardingsphere/issues/17299

   ## Bug Report
   
   **For English only**, other languages will not be accepted.
   
   Before reporting a bug, make sure you have:
   
   - Searched open and closed [GitHub issues](https://github.com/apache/shardingsphere/issues).
   - Read documentation: [ShardingSphere Doc](https://shardingsphere.apache.org/document/current/en/overview).
   
   Please pay attention to the issues you submit, because we may need more details. 
   If there is no further response and we cannot reproduce the problem with the current information, we will **close it**.
   
   Please answer these questions before submitting your issue. Thanks!
   
   ### Which version of ShardingSphere did you use?
   5.1.0
   
   ### Which project did you use? ShardingSphere-JDBC or ShardingSphere-Proxy?
   
   ShardingSphere-Proxy
   
   ### Expected behavior
   The Apache Flink CDC connector works together with Apache ShardingSphere.
   
   ### Actual behavior
   
   When Apache ShardingSphere-Proxy is used as the CDC data source, an error occurs when the connector tries to enumerate the database tables.
   
   ### Reason analyze (If you can)
   
   ShardingSphere is missing the `information_schema` database, which provides the metadata of the instance's databases; maybe that is the reason?
   
   ### Steps to reproduce the behavior, such as: SQL to execute, sharding rule configuration, when exception occur etc.
   
   Reference issue:
   https://github.com/ververica/flink-cdc-connectors/issues/1148
   
   ### Example codes for reproduce this issue (such as a github link).
   
   Reference issue:
   https://github.com/ververica/flink-cdc-connectors/issues/1148
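   For illustration, a Flink SQL source definition along these lines is what triggers the failing discovery step. The hostname, port, credentials, and columns below are placeholders, not taken from the actual setup:
   
   ```sql
   -- Hypothetical mysql-cdc source pointed at ShardingSphere-Proxy.
   -- 'proxy-host:3307' and the credentials are placeholders.
   CREATE TABLE cdc_source (
     id BIGINT,
     org_id INT,
     PRIMARY KEY (id) NOT ENFORCED
   ) WITH (
     'connector' = 'mysql-cdc',
     'hostname' = 'proxy-host',
     'port' = '3307',
     'username' = 'root',
     'password' = 'root',
     'database-name' = 'sharding_db',
     'table-name' = 't_order'
   );
   ```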
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@shardingsphere.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [shardingsphere] RaigorJiang closed issue #17299: ShardingSphere-Proxy Unable to work with apache flink cdc connector.

Posted by GitBox <gi...@apache.org>.
RaigorJiang closed issue #17299: ShardingSphere-Proxy Unable to work with apache flink cdc connector.
URL: https://github.com/apache/shardingsphere/issues/17299



[GitHub] [shardingsphere] nickfan commented on issue #17299: ShardingSphere-Proxy Unable to work with apache flink cdc connector.

Posted by GitBox <gi...@apache.org>.
nickfan commented on issue #17299:
URL: https://github.com/apache/shardingsphere/issues/17299#issuecomment-1119195695

   > Hi @nickfan The support for `information_schema` is being improved, please pay attention to the subsequent version releases. Related issue: #16234
   
   I upgraded ShardingSphere to 5.1.1, and it still causes the same issue (even with `information_schema` included):
   
   ```log
   jobmanager_1      | 2022-05-06T01:54:19.211855085Z 2022-05-06 01:54:19,211 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Starting execution of job insert-into_myhive.default.es_spl_queue_log (3b8fad9f3ed3a1d1d6ba4cb09583f2f9) under job master id 00000000000000000000000000000000.
   jobmanager_1      | 2022-05-06T01:54:19.213168090Z 2022-05-06 01:54:19,213 INFO  org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Starting split enumerator for source Source: TableSourceScan(table=[[myhive, default, cdc_init_spl_queue_log]], fields=[id, org_id, spl_order_id, tracking_code]) -> DropUpdateBefore.
   jobmanager_1      | 2022-05-06T01:54:19.219152022Z 2022-05-06 01:54:19,219 INFO  com.ververica.cdc.connectors.mysql.source.connection.JdbcConnectionPools [] - Create and register connection pool 172.16.0.33:3308
   jobmanager_1      | 2022-05-06T01:54:19.220544261Z 2022-05-06 01:54:19,220 INFO  com.ververica.cdc.connectors.shaded.com.zaxxer.hikari.HikariDataSource [] - connection-pool-172.16.0.33:3308 - Starting...
   jobmanager_1      | 2022-05-06T01:54:19.306373014Z 2022-05-06 01:54:19,306 INFO  com.ververica.cdc.connectors.shaded.com.zaxxer.hikari.HikariDataSource [] - connection-pool-172.16.0.33:3308 - Start completed.
   jobmanager_1      | 2022-05-06T01:54:19.341044487Z 2022-05-06 01:54:19,340 INFO  io.debezium.jdbc.JdbcConnection                              [] - Connection gracefully closed
   jobmanager_1      | 2022-05-06T01:54:19.341182589Z 2022-05-06 01:54:19,341 INFO  com.ververica.cdc.connectors.mysql.MySqlValidator            [] - MySQL validation passed.
   jobmanager_1      | 2022-05-06T01:54:19.348825206Z 2022-05-06 01:54:19,348 INFO  com.ververica.cdc.connectors.mysql.source.utils.TableDiscoveryUtils [] - Read list of available databases
   jobmanager_1      | 2022-05-06T01:54:19.351583669Z 2022-05-06 01:54:19,351 INFO  com.ververica.cdc.connectors.mysql.source.utils.TableDiscoveryUtils [] -        list of available databases is: [sharding_db, mysql, information_schema, performance_schema, sys]
   jobmanager_1      | 2022-05-06T01:54:19.351597610Z 2022-05-06 01:54:19,351 INFO  com.ververica.cdc.connectors.mysql.source.utils.TableDiscoveryUtils [] - Read list of available tables in each database
   jobmanager_1      | 2022-05-06T01:54:19.369729366Z 2022-05-06 01:54:19,369 WARN  com.ververica.cdc.connectors.mysql.source.utils.TableDiscoveryUtils [] -        skipping database 'sharding_db' due to error reading tables: No database selected
   jobmanager_1      | 2022-05-06T01:54:19.371206896Z 2022-05-06 01:54:19,371 WARN  com.ververica.cdc.connectors.mysql.source.utils.TableDiscoveryUtils [] -        skipping database 'mysql' due to error reading tables: No database selected
   jobmanager_1      | 2022-05-06T01:54:19.372318346Z 2022-05-06 01:54:19,372 WARN  com.ververica.cdc.connectors.mysql.source.utils.TableDiscoveryUtils [] -        skipping database 'information_schema' due to error reading tables: No database selected
   jobmanager_1      | 2022-05-06T01:54:19.373209226Z 2022-05-06 01:54:19,373 WARN  com.ververica.cdc.connectors.mysql.source.utils.TableDiscoveryUtils [] -        skipping database 'performance_schema' due to error reading tables: No database selected
   jobmanager_1      | 2022-05-06T01:54:19.374866886Z 2022-05-06 01:54:19,374 WARN  com.ververica.cdc.connectors.mysql.source.utils.TableDiscoveryUtils [] -        skipping database 'sys' due to error reading tables: No database selected
   jobmanager_1      | 2022-05-06T01:54:19.376215140Z 2022-05-06 01:54:19,376 INFO  io.debezium.jdbc.JdbcConnection                              [] - Connection gracefully closed
   jobmanager_1      | 2022-05-06T01:54:19.381521176Z 2022-05-06 01:54:19,378 ERROR org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Failed to create Source Enumerator for source Source: TableSourceScan(table=[[myhive, default, cdc_init_spl_queue_log]], fields=[id, org_id, spl_order_id, tracking_code]) -> DropUpdateBefore
   jobmanager_1      | 2022-05-06T01:54:19.381544918Z org.apache.flink.util.FlinkRuntimeException: Failed to discover captured tables for enumerator
   jobmanager_1      | 2022-05-06T01:54:19.381548337Z      at com.ververica.cdc.connectors.mysql.source.MySqlSource.createEnumerator(MySqlSource.java:170) ~[flink-sql-connector-mysql-cdc-2.1.0.jar:2.1.0]
   jobmanager_1      | 2022-05-06T01:54:19.381551337Z      at org.apache.flink.runtime.source.coordinator.SourceCoordinator.start(SourceCoordinator.java:128) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
   jobmanager_1      | 2022-05-06T01:54:19.381554248Z      at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.applyCall(RecreateOnResetOperatorCoordinator.java:291) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
   jobmanager_1      | 2022-05-06T01:54:19.381557585Z      at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.start(RecreateOnResetOperatorCoordinator.java:70) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
   jobmanager_1      | 2022-05-06T01:54:19.381560391Z      at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.start(OperatorCoordinatorHolder.java:194) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
   jobmanager_1      | 2022-05-06T01:54:19.381563002Z      at org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.startAllOperatorCoordinators(DefaultOperatorCoordinatorHandler.java:85) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
   jobmanager_1      | 2022-05-06T01:54:19.381565617Z      at org.apache.flink.runtime.scheduler.SchedulerBase.startScheduling(SchedulerBase.java:592) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
   jobmanager_1      | 2022-05-06T01:54:19.381568183Z      at org.apache.flink.runtime.jobmaster.JobMaster.startScheduling(JobMaster.java:955) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
   jobmanager_1      | 2022-05-06T01:54:19.381570771Z      at org.apache.flink.runtime.jobmaster.JobMaster.startJobExecution(JobMaster.java:873) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
   jobmanager_1      | 2022-05-06T01:54:19.381573286Z      at org.apache.flink.runtime.jobmaster.JobMaster.onStart(JobMaster.java:383) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
   jobmanager_1      | 2022-05-06T01:54:19.381575813Z      at org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStart(RpcEndpoint.java:181) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
   jobmanager_1      | 2022-05-06T01:54:19.381579351Z      at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.start(AkkaRpcActor.java:605) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
   jobmanager_1      | 2022-05-06T01:54:19.381582075Z      at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:180) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
   jobmanager_1      | 2022-05-06T01:54:19.381584698Z      at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.12-1.13.3.jar:1.13.3]
   jobmanager_1      | 2022-05-06T01:54:19.381587246Z      at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.12-1.13.3.jar:1.13.3]
   jobmanager_1      | 2022-05-06T01:54:19.381589746Z      at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) [flink-dist_2.12-1.13.3.jar:1.13.3]
   jobmanager_1      | 2022-05-06T01:54:19.381592192Z      at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) [flink-dist_2.12-1.13.3.jar:1.13.3]
   jobmanager_1      | 2022-05-06T01:54:19.381604319Z      at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.12-1.13.3.jar:1.13.3]
   jobmanager_1      | 2022-05-06T01:54:19.381607039Z      at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.12-1.13.3.jar:1.13.3]
   jobmanager_1      | 2022-05-06T01:54:19.381609600Z      at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-dist_2.12-1.13.3.jar:1.13.3]
   jobmanager_1      | 2022-05-06T01:54:19.381612323Z      at akka.actor.Actor.aroundReceive(Actor.scala:517) [flink-dist_2.12-1.13.3.jar:1.13.3]
   jobmanager_1      | 2022-05-06T01:54:19.381614900Z      at akka.actor.Actor.aroundReceive$(Actor.scala:515) [flink-dist_2.12-1.13.3.jar:1.13.3]
   jobmanager_1      | 2022-05-06T01:54:19.381617444Z      at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.12-1.13.3.jar:1.13.3]
   jobmanager_1      | 2022-05-06T01:54:19.381619887Z      at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.12-1.13.3.jar:1.13.3]
   jobmanager_1      | 2022-05-06T01:54:19.381622342Z      at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.12-1.13.3.jar:1.13.3]
   jobmanager_1      | 2022-05-06T01:54:19.381624803Z      at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.12-1.13.3.jar:1.13.3]
   jobmanager_1      | 2022-05-06T01:54:19.381627320Z      at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.12-1.13.3.jar:1.13.3]
   jobmanager_1      | 2022-05-06T01:54:19.381629931Z      at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.12-1.13.3.jar:1.13.3]
   jobmanager_1      | 2022-05-06T01:54:19.381632520Z      at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.12-1.13.3.jar:1.13.3]
   jobmanager_1      | 2022-05-06T01:54:19.381635121Z      at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.12-1.13.3.jar:1.13.3]
   jobmanager_1      | 2022-05-06T01:54:19.381637742Z      at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.12-1.13.3.jar:1.13.3]
   jobmanager_1      | 2022-05-06T01:54:19.381640473Z      at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.12-1.13.3.jar:1.13.3]
   jobmanager_1      | 2022-05-06T01:54:19.381643027Z Caused by: java.lang.IllegalArgumentException: Can't find any matched tables, please check your configured database-name: [sharding_db] and table-name: [sharding_db.spl_queue_log]
   jobmanager_1      | 2022-05-06T01:54:19.381645744Z      at com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils.discoverCapturedTables(DebeziumUtils.java:167) ~[flink-sql-connector-mysql-cdc-2.1.0.jar:2.1.0]
   jobmanager_1      | 2022-05-06T01:54:19.381648582Z      at com.ververica.cdc.connectors.mysql.source.MySqlSource.createEnumerator(MySqlSource.java:161) ~[flink-sql-connector-mysql-cdc-2.1.0.jar:2.1.0]
   jobmanager_1      | 2022-05-06T01:54:19.381651603Z      ... 31 more
   ```
   ![image](https://user-images.githubusercontent.com/100613/167054643-e8389b8b-9574-4dc5-b2b0-beeb95a5f22c.png)
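   Based on the warnings above, the connector's table discovery appears to run roughly the following statements against the Proxy (an approximation reconstructed from the log, not necessarily the connector's exact SQL):
   
   ```sql
   -- Reconstructed discovery sequence; the actual Debezium statements may differ.
   SHOW DATABASES;
   -- Then, for each database returned, something like:
   SHOW FULL TABLES IN `sharding_db` WHERE Table_type = 'BASE TABLE';
   -- Against the Proxy this per-database step fails with "No database selected",
   -- so every database is skipped and no table matches the configured filter.
   ```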
   



[GitHub] [shardingsphere] RaigorJiang commented on issue #17299: ShardingSphere-Proxy Unable to work with apache flink cdc connector.

Posted by GitBox <gi...@apache.org>.
RaigorJiang commented on issue #17299:
URL: https://github.com/apache/shardingsphere/issues/17299#issuecomment-1151752740

   Closed due to no response.



[GitHub] [shardingsphere] RaigorJiang commented on issue #17299: ShardingSphere-Proxy Unable to work with apache flink cdc connector.

Posted by GitBox <gi...@apache.org>.
RaigorJiang commented on issue #17299:
URL: https://github.com/apache/shardingsphere/issues/17299#issuecomment-1119202471

   <img width="1503" alt="image" src="https://user-images.githubusercontent.com/5668787/167055600-0fa77d42-2424-490a-84e9-ddbf4087901a.png">
   
   `No database selected` was reported, so did you not specify a database when connecting to the Proxy?
   What SQL did your CDC connector send to the Proxy when `No database selected` was raised?



[GitHub] [shardingsphere] RaigorJiang commented on issue #17299: ShardingSphere-Proxy Unable to work with apache flink cdc connector.

Posted by GitBox <gi...@apache.org>.
RaigorJiang commented on issue #17299:
URL: https://github.com/apache/shardingsphere/issues/17299#issuecomment-1119588943

   @nickfan 
   Yes, the metadata schemas (like `information_schema`) are still simulated; you can refer to:
   https://github.com/apache/shardingsphere/tree/master/shardingsphere-infra/shardingsphere-infra-common/src/main/resources/schema/mysql
   
   In the future, ShardingSphere will update the information in the metadata schemas asynchronously. 
   If you are interested, we can improve it together.
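   As a quick check of how much of the simulated schema is populated, queries like the following can be run through a MySQL client connected to the Proxy (a diagnostic sketch only; results will vary by version):
   
   ```sql
   -- Inspect the simulated information_schema through the Proxy.
   USE information_schema;
   SHOW TABLES;
   SELECT TABLE_SCHEMA, TABLE_NAME
   FROM information_schema.TABLES
   WHERE TABLE_SCHEMA = 'sharding_db';
   ```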



[GitHub] [shardingsphere] nickfan commented on issue #17299: ShardingSphere-Proxy Unable to work with apache flink cdc connector.

Posted by GitBox <gi...@apache.org>.
nickfan commented on issue #17299:
URL: https://github.com/apache/shardingsphere/issues/17299#issuecomment-1119238133

   ![image](https://user-images.githubusercontent.com/100613/167060373-4aee9cad-4fb5-4f3b-95db-e40ebaff1356.png)
   Are the ShardingSphere `information_schema` tables just a mockup, or did I miss something?
   
   I've put together the relevant configuration files for reproducing these issues.
   
   Apache ShardingSphere 5.1.1
   
   server.yaml
   ```yaml
   
   mode:
     type: Cluster
     repository:
       type: ZooKeeper
       props:
         namespace: governance_ds
         server-lists: zookeeper:2181
         retryIntervalMilliseconds: 500
         timeToLiveSeconds: 60
         maxRetries: 3
         operationTimeoutMilliseconds: 500
     overwrite: false
   
   rules:
     - !AUTHORITY
       users:
         - root@%:root
         - sharding@:sharding
       provider:
         type: ALL_PRIVILEGES_PERMITTED
   #  - !TRANSACTION
   #    defaultType: XA
   #    providerType: Atomikos
   
   props:
     max-connections-size-per-query: 1
     kernel-executor-size: 16  # Infinite by default.
     proxy-frontend-flush-threshold: 128  # The default value is 128.
     proxy-opentracing-enabled: false
   #  proxy-hint-enabled: false
     sql-show: true
   #  check-table-metadata-enabled: false
   #  show-process-list-enabled: false
   #    # Proxy backend query fetch size. A larger value may increase the memory usage of ShardingSphere Proxy.
   #    # The default value is -1, which means set the minimum value for different JDBC drivers.
   #  proxy-backend-query-fetch-size: -1
   #  check-duplicate-table-enabled: false
   #  proxy-frontend-executor-size: 0 # Proxy frontend executor size. The default value is 0, which means let Netty decide.
   #    # Available options of proxy backend executor suitable: OLAP(default), OLTP. The OLTP option may reduce time cost of writing packets to client, but it may increase the latency of SQL execution
   #    # and block other clients if client connections are more than `proxy-frontend-executor-size`, especially executing slow SQL.
   #  proxy-backend-executor-suitable: OLAP
   #  proxy-frontend-max-connections: 0 # Less than or equal to 0 means no limitation.
   #  sql-federation-enabled: false
   #    # Available proxy backend driver type: JDBC (default), ExperimentalVertx
   #  proxy-backend-driver-type: JDBC
   
   ```
   
   config-sharding.yaml
   ```yaml
   schemaName: sharding_db
   dataSources:
     ds_0:
       url: jdbc:mysql://mysql:3306/demo_ds_0?serverTimezone=UTC&useSSL=false
       username: root
       password: root
       connectionTimeoutMilliseconds: 30000
       idleTimeoutMilliseconds: 60000
       maxLifetimeMilliseconds: 1800000
       maxPoolSize: 50
       minPoolSize: 1
     ds_1:
       url: jdbc:mysql://mysql:3306/demo_ds_1?serverTimezone=UTC&useSSL=false
       username: root
       password: root
       connectionTimeoutMilliseconds: 30000
       idleTimeoutMilliseconds: 60000
       maxLifetimeMilliseconds: 1800000
       maxPoolSize: 50
       minPoolSize: 1
   rules:
   - !SHARDING
     tables:
       t_order:
         actualDataNodes: ds_${0..1}.t_order_${0..1}
         tableStrategy:
           standard:
             shardingColumn: org_id
             shardingAlgorithmName: t_order_inline
         keyGenerateStrategy:
           column: id
           keyGeneratorName: snowflake
       t_order_item:
         actualDataNodes: ds_${0..1}.t_order_item_${0..1}
         tableStrategy:
           standard:
             shardingColumn: org_id
             shardingAlgorithmName: t_order_item_inline
         keyGenerateStrategy:
           column: id
           keyGeneratorName: snowflake
       t_org_option:
         actualDataNodes: ds_${0..1}.t_org_option
         tableStrategy:
           none:
         keyGenerateStrategy:
           column: id
           keyGeneratorName: snowflake
       t_global_option:
         actualDataNodes: ds_${0..1}.t_global_option
         tableStrategy:
           none:
         keyGenerateStrategy:
           column: id
           keyGeneratorName: snowflake
     bindingTables:
       - t_order
       - t_order_item
       - t_org_option
     broadcastTables:
       - t_global_option
     defaultDatabaseStrategy:
       standard:
         shardingColumn: org_id
         shardingAlgorithmName: database_inline
     defaultTableStrategy:
       none:
     defaultKeyGenerateStrategy:
       column: id
       keyGeneratorName: snowflake
     defaultShardingColumn: org_id
     shardingAlgorithms:
       database_inline:
         type: INLINE
         props:
           algorithm-expression: ds_${org_id % 2}
       t_order_inline:
         type: INLINE
         props:
           algorithm-expression: t_order_${org_id % 2}
       t_order_item_inline:
         type: INLINE
         props:
           algorithm-expression: t_order_item_${org_id % 2}
     keyGenerators:
       snowflake:
         type: SNOWFLAKE
         props:
           worker-id: 123
   ```
   
   MySQL source schema init SQL:
   
   ```sql
   CREATE DATABASE IF NOT EXISTS `demo_ds_0` DEFAULT CHARACTER SET = `utf8` DEFAULT COLLATE = `utf8_general_ci`;
   CREATE DATABASE IF NOT EXISTS `demo_ds_1` DEFAULT CHARACTER SET = `utf8` DEFAULT COLLATE = `utf8_general_ci`;
   
   USE `demo_ds_0`;
   
   CREATE TABLE IF NOT EXISTS `t_global_option` (
     `id` bigint(20) unsigned NOT NULL COMMENT 'ID',
     `option_name` varchar(255) NOT NULL COMMENT '选项名',
     `option_value` varchar(255) DEFAULT NULL COMMENT '选项值',
     `created_at` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
     `updated_at` datetime DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
     PRIMARY KEY (`id`),
     UNIQUE KEY `option_name` (`option_name`)
   ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
   
   CREATE TABLE IF NOT EXISTS `t_org_option` (
     `id` bigint(20) unsigned NOT NULL COMMENT 'ID',
     `org_id` int(11) NOT NULL COMMENT '组织id',
     `option_name` varchar(255) NOT NULL COMMENT '选项名',
     `option_value` varchar(255) DEFAULT NULL COMMENT '选项值',
     `created_at` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
     `updated_at` datetime DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
     PRIMARY KEY (`id`),
     UNIQUE KEY `option_name` (`option_name`,`org_id`),
     KEY `org_id` (`org_id`)
   ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
   
   
   CREATE TABLE IF NOT EXISTS `t_order_0` (
     `id` bigint(20) unsigned NOT NULL COMMENT 'ID',
     `org_id` int(11) NOT NULL DEFAULT '0' COMMENT '组织id',
     `order_source_platform` varchar(255) DEFAULT NULL COMMENT '订单来源平台',
     `order_source_no` varchar(255) DEFAULT NULL COMMENT '订单来源单号',
     `created_at` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
     `updated_at` datetime DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
     PRIMARY KEY (`id`),
     KEY `org_id` (`org_id`),
     KEY `created_at` (`created_at`)
   ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
   
   CREATE TABLE IF NOT EXISTS `t_order_1` (
     `id` bigint(20) unsigned NOT NULL COMMENT 'ID',
     `org_id` int(11) NOT NULL DEFAULT '0' COMMENT '组织id',
     `order_source_platform` varchar(255) DEFAULT NULL COMMENT '订单来源平台',
     `order_source_no` varchar(255) DEFAULT NULL COMMENT '订单来源单号',
     `created_at` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
     `updated_at` datetime DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
     PRIMARY KEY (`id`),
     KEY `org_id` (`org_id`),
     KEY `created_at` (`created_at`)
   ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
   
   CREATE TABLE IF NOT EXISTS `t_order_item_0` (
     `id` bigint(20) unsigned NOT NULL COMMENT 'ID',
     `org_id` int(11) NOT NULL DEFAULT '0' COMMENT '组织id',
     `order_id` bigint(20) NOT NULL DEFAULT '0' COMMENT '所属订单id',
     `order_source_no` varchar(255) DEFAULT NULL COMMENT '订单来源单号',
     `sku` varchar(255) NOT NULL COMMENT '平台SKU',
     `unit` varchar(255) NOT NULL COMMENT '单位',
     `qty` decimal(10,4) NOT NULL DEFAULT '0.0000' COMMENT '数量',
     `created_at` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
     `updated_at` datetime DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
     PRIMARY KEY (`id`),
     UNIQUE KEY `sku` (`sku`,`order_id`,`org_id`),
     KEY `org_id` (`org_id`),
     KEY `order_id` (`order_id`,`org_id`),
     KEY `created_at` (`created_at`,`org_id`)
   ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
   
   CREATE TABLE IF NOT EXISTS `t_order_item_1` (
     `id` bigint(20) unsigned NOT NULL COMMENT 'ID',
     `org_id` int(11) NOT NULL DEFAULT '0' COMMENT '组织id',
     `order_id` bigint(20) NOT NULL DEFAULT '0' COMMENT '所属订单id',
     `order_source_no` varchar(255) DEFAULT NULL COMMENT '订单来源单号',
     `sku` varchar(255) NOT NULL COMMENT '平台SKU',
     `unit` varchar(255) NOT NULL COMMENT '单位',
     `qty` decimal(10,4) NOT NULL DEFAULT '0.0000' COMMENT '数量',
     `created_at` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
     `updated_at` datetime DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
     PRIMARY KEY (`id`),
     UNIQUE KEY `sku` (`sku`,`order_id`,`org_id`),
     KEY `org_id` (`org_id`),
     KEY `order_id` (`order_id`,`org_id`),
     KEY `created_at` (`created_at`,`org_id`)
   ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
   
   
   USE `demo_ds_1`;
   
   CREATE TABLE IF NOT EXISTS `t_global_option` (
     `id` bigint(20) unsigned NOT NULL COMMENT 'ID',
     `option_name` varchar(255) NOT NULL COMMENT '选项名',
     `option_value` varchar(255) DEFAULT NULL COMMENT '选项值',
     `created_at` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
     `updated_at` datetime DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
     PRIMARY KEY (`id`),
     UNIQUE KEY `option_name` (`option_name`)
   ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
   
   CREATE TABLE IF NOT EXISTS `t_org_option` (
     `id` bigint(20) unsigned NOT NULL COMMENT 'ID',
     `org_id` int(11) NOT NULL COMMENT '组织id',
     `option_name` varchar(255) NOT NULL COMMENT '选项名',
     `option_value` varchar(255) DEFAULT NULL COMMENT '选项值',
     `created_at` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
     `updated_at` datetime DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
     PRIMARY KEY (`id`),
     UNIQUE KEY `option_name` (`option_name`,`org_id`),
     KEY `org_id` (`org_id`)
   ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
   
   
   CREATE TABLE IF NOT EXISTS `t_order_0` (
     `id` bigint(20) unsigned NOT NULL COMMENT 'ID',
     `org_id` int(11) NOT NULL DEFAULT '0' COMMENT '组织id',
     `order_source_platform` varchar(255) DEFAULT NULL COMMENT '订单来源平台',
     `order_source_no` varchar(255) DEFAULT NULL COMMENT '订单来源单号',
     `created_at` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
     `updated_at` datetime DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
     PRIMARY KEY (`id`),
     KEY `org_id` (`org_id`),
     KEY `created_at` (`created_at`)
   ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
   
   CREATE TABLE IF NOT EXISTS `t_order_1` (
     `id` bigint(20) unsigned NOT NULL COMMENT 'ID',
     `org_id` int(11) NOT NULL DEFAULT '0' COMMENT '组织id',
     `order_source_platform` varchar(255) DEFAULT NULL COMMENT '订单来源平台',
     `order_source_no` varchar(255) DEFAULT NULL COMMENT '订单来源单号',
     `created_at` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
     `updated_at` datetime DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
     PRIMARY KEY (`id`),
     KEY `org_id` (`org_id`),
     KEY `created_at` (`created_at`)
   ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
   
   CREATE TABLE IF NOT EXISTS `t_order_item_0` (
     `id` bigint(20) unsigned NOT NULL COMMENT 'ID',
     `org_id` int(11) NOT NULL DEFAULT '0' COMMENT '组织id',
     `order_id` bigint(20) NOT NULL DEFAULT '0' COMMENT '所属订单id',
     `order_source_no` varchar(255) DEFAULT NULL COMMENT '订单来源单号',
     `sku` varchar(255) NOT NULL COMMENT '平台SKU',
     `unit` varchar(255) NOT NULL COMMENT '单位',
     `qty` decimal(10,4) NOT NULL DEFAULT '0.0000' COMMENT '数量',
     `created_at` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
     `updated_at` datetime DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
     PRIMARY KEY (`id`),
     UNIQUE KEY `sku` (`sku`,`order_id`,`org_id`),
     KEY `org_id` (`org_id`),
     KEY `order_id` (`order_id`,`org_id`),
     KEY `created_at` (`created_at`,`org_id`)
   ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
   
   CREATE TABLE IF NOT EXISTS `t_order_item_1` (
     `id` bigint(20) unsigned NOT NULL COMMENT 'ID',
     `org_id` int(11) NOT NULL DEFAULT '0' COMMENT '组织id',
     `order_id` bigint(20) NOT NULL DEFAULT '0' COMMENT '所属订单id',
     `order_source_no` varchar(255) DEFAULT NULL COMMENT '订单来源单号',
     `sku` varchar(255) NOT NULL COMMENT '平台SKU',
     `unit` varchar(255) NOT NULL COMMENT '单位',
     `qty` decimal(10,4) NOT NULL DEFAULT '0.0000' COMMENT '数量',
     `created_at` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
     `updated_at` datetime DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
     PRIMARY KEY (`id`),
     UNIQUE KEY `sku` (`sku`,`order_id`,`org_id`),
     KEY `org_id` (`org_id`),
     KEY `order_id` (`order_id`,`org_id`),
     KEY `created_at` (`created_at`,`org_id`)
   ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
   
   ```
   
   Mock data SQL for the ShardingSphere `sharding_db` schema:
   
   ```sql
   
   USE `sharding_db`;
   
   -- t_global_option table
   INSERT INTO `t_global_option` (`option_name`, `option_value`) VALUES ('k1', 'v1');
   INSERT INTO `t_global_option` (`option_name`, `option_value`) VALUES ('k2', 'v2');
   INSERT INTO `t_global_option` (`option_name`, `option_value`) VALUES ('k3', 'v3');
   
   -- t_org_option table
   INSERT INTO `t_org_option` (`org_id`, `option_name`, `option_value`) VALUES (1,'k1', 'v1');
   INSERT INTO `t_org_option` (`org_id`, `option_name`, `option_value`) VALUES (1,'k2', 'v2');
   INSERT INTO `t_org_option` (`org_id`, `option_name`, `option_value`) VALUES (1,'k3', 'v3');
   
   INSERT INTO `t_org_option` (`org_id`, `option_name`, `option_value`) VALUES (2,'k1', 'v1');
   INSERT INTO `t_org_option` (`org_id`, `option_name`, `option_value`) VALUES (2,'k2', 'v2');
   INSERT INTO `t_org_option` (`org_id`, `option_name`, `option_value`) VALUES (2,'k3', 'v3');
   
   INSERT INTO `t_org_option` (`org_id`, `option_name`, `option_value`) VALUES (3,'k1', 'v1');
   INSERT INTO `t_org_option` (`org_id`, `option_name`, `option_value`) VALUES (3,'k2', 'v2');
   INSERT INTO `t_org_option` (`org_id`, `option_name`, `option_value`) VALUES (3,'k3', 'v3');
   
   INSERT INTO `t_org_option` (`org_id`, `option_name`, `option_value`) VALUES (4,'k1', 'v1');
   INSERT INTO `t_org_option` (`org_id`, `option_name`, `option_value`) VALUES (4,'k2', 'v2');
   INSERT INTO `t_org_option` (`org_id`, `option_name`, `option_value`) VALUES (4,'k3', 'v3');
   
   INSERT INTO `t_org_option` (`org_id`, `option_name`, `option_value`) VALUES (5,'k1', 'v1');
   INSERT INTO `t_org_option` (`org_id`, `option_name`, `option_value`) VALUES (5,'k2', 'v2');
   INSERT INTO `t_org_option` (`org_id`, `option_name`, `option_value`) VALUES (5,'k3', 'v3');
   
   
   INSERT INTO `t_order` (`org_id`, `order_source_platform`, `order_source_no`) VALUES (1,'amazon.com', 'amz-o1-so1');
   INSERT INTO `t_order` (`org_id`, `order_source_platform`, `order_source_no`) VALUES (1,'amazon.com', 'amz-o1-so2');
   INSERT INTO `t_order` (`org_id`, `order_source_platform`, `order_source_no`) VALUES (1,'amazon.com', 'amz-o1-so3');
   
   INSERT INTO `t_order` (`org_id`, `order_source_platform`, `order_source_no`) VALUES (2,'amazon.com', 'amz-o2-so1');
   INSERT INTO `t_order` (`org_id`, `order_source_platform`, `order_source_no`) VALUES (2,'amazon.com', 'amz-o2-so2');
   INSERT INTO `t_order` (`org_id`, `order_source_platform`, `order_source_no`) VALUES (2,'amazon.com', 'amz-o2-so3');
   
   INSERT INTO `t_order` (`org_id`, `order_source_platform`, `order_source_no`) VALUES (3,'amazon.com', 'amz-o3-so1');
   INSERT INTO `t_order` (`org_id`, `order_source_platform`, `order_source_no`) VALUES (3,'amazon.com', 'amz-o3-so2');
   INSERT INTO `t_order` (`org_id`, `order_source_platform`, `order_source_no`) VALUES (3,'amazon.com', 'amz-o3-so3');
   
   INSERT INTO `t_order` (`org_id`, `order_source_platform`, `order_source_no`) VALUES (4,'amazon.com', 'amz-o4-so1');
   INSERT INTO `t_order` (`org_id`, `order_source_platform`, `order_source_no`) VALUES (4,'amazon.com', 'amz-o4-so2');
   INSERT INTO `t_order` (`org_id`, `order_source_platform`, `order_source_no`) VALUES (4,'amazon.com', 'amz-o4-so3');
   
   INSERT INTO `t_order` (`org_id`, `order_source_platform`, `order_source_no`) VALUES (5,'amazon.com', 'amz-o5-so1');
   INSERT INTO `t_order` (`org_id`, `order_source_platform`, `order_source_no`) VALUES (5,'amazon.com', 'amz-o5-so2');
   INSERT INTO `t_order` (`org_id`, `order_source_platform`, `order_source_no`) VALUES (5,'amazon.com', 'amz-o5-so3');
   
   
   ```
   
   
   Flink version: 1.13.3; CDC connector version: 2.1.0.
   
   Flink SQL definitions for the MySQL CDC source and the Elasticsearch sink:
   
   ```sql
   
   CREATE TABLE IF NOT EXISTS cdc_init_ds_t_order (
       id BIGINT,
       org_id INTEGER,
       order_source_platform VARCHAR(255),
       order_source_no VARCHAR(255),
       created_at TIMESTAMP(0),
       updated_at TIMESTAMP(0),
       PRIMARY KEY (id) NOT ENFORCED
   ) WITH (
       'connector' = 'mysql-cdc',
       'scan.startup.mode' = 'initial',
       'hostname' = '172.16.0.33',
       'port' = '3308',
       'username' = 'root',
       'password' = 'root',
       'database-name' = 'sharding_db',
       'table-name' = 't_order'
   );
   
   CREATE TABLE IF NOT EXISTS es_ds_t_order (
       id BIGINT,
       org_id INTEGER,
       order_source_platform VARCHAR(255),
       order_source_no VARCHAR(255),
       created_at TIMESTAMP(0),
       updated_at TIMESTAMP(0),
       PRIMARY KEY (id) NOT ENFORCED
   ) WITH (
       'connector' = 'elasticsearch-7',
       'hosts' = 'http://elasticsearch:9200',
       'index' = 't_order'
   );
   
   ```
   
   Flink SQL task that reads from the MySQL CDC source and sinks into Elasticsearch:
   
   ```sql
   INSERT INTO es_ds_t_order SELECT * FROM cdc_init_ds_t_order;
   ```
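   
   The failing table-discovery step can also be reproduced by hand. The statements below are an approximation of what the Debezium layer inside the CDC connector issues during discovery (the exact SQL is a connector internal, so treat this as a sketch); running them in a plain `mysql` client session connected to ShardingSphere-Proxy on port 3308 shows the first statement succeeding while the per-database table enumeration fails with `No database selected`:
   
   ```sql
   -- Discovery step 1: enumerate databases (works against the proxy;
   -- the log confirms it returns sharding_db, mysql, information_schema, ...)
   SHOW DATABASES;
   
   -- Discovery step 2: enumerate base tables per database. Against a real
   -- MySQL server this works without USE-ing the database first; against
   -- ShardingSphere-Proxy it fails with "No database selected".
   SHOW FULL TABLES IN `sharding_db` WHERE Table_Type = 'BASE TABLE';
   ```
   
   Because every database is skipped in step 2, the enumerator ends up with an empty table list, which matches the `Can't find any matched tables` error in the log below.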
   
   Flink console log:
   
   ```log
   jobmanager_1      | 2022-05-06T03:41:18.091783496Z 2022-05-06 03:41:18,091 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher     [] - Received JobGraph submission 6f4d2fb809cd27113aa5a210af958eca (insert-into_myhive.default.es_ds_t_order).
   jobmanager_1      | 2022-05-06T03:41:18.092316247Z 2022-05-06 03:41:18,092 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher     [] - Submitting job 6f4d2fb809cd27113aa5a210af958eca (insert-into_myhive.default.es_ds_t_order).
   jobmanager_1      | 2022-05-06T03:41:18.115033279Z 2022-05-06 03:41:18,114 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService             [] - Starting RPC endpoint for org.apache.flink.runtime.jobmaster.JobMaster at akka://flink/user/rpc/jobmanager_17 .
   jobmanager_1      | 2022-05-06T03:41:18.115982107Z 2022-05-06 03:41:18,115 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Initializing job insert-into_myhive.default.es_ds_t_order (6f4d2fb809cd27113aa5a210af958eca).
   jobmanager_1      | 2022-05-06T03:41:18.117063069Z 2022-05-06 03:41:18,117 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Using restart back off time strategy FailureRateRestartBackoffTimeStrategy(FailureRateRestartBackoffTimeStrategy(failuresIntervalMS=180000,backoffTimeMS=180000,maxFailuresPerInterval=4) for insert-into_myhive.default.es_ds_t_order (6f4d2fb809cd27113aa5a210af958eca).
   jobmanager_1      | 2022-05-06T03:41:18.117757348Z 2022-05-06 03:41:18,117 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Running initialization on master for job insert-into_myhive.default.es_ds_t_order (6f4d2fb809cd27113aa5a210af958eca).
   jobmanager_1      | 2022-05-06T03:41:18.117771898Z 2022-05-06 03:41:18,117 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Successfully ran initialization on master in 0 ms.
   jobmanager_1      | 2022-05-06T03:41:18.146655076Z 2022-05-06 03:41:18,146 INFO  org.apache.flink.runtime.scheduler.adapter.DefaultExecutionTopology [] - Built 1 pipelined regions in 0 ms
   jobmanager_1      | 2022-05-06T03:41:18.147124380Z 2022-05-06 03:41:18,147 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Using job/cluster config to configure application-defined state backend: org.apache.flink.runtime.state.hashmap.HashMapStateBackend@749f20e
   jobmanager_1      | 2022-05-06T03:41:18.147162910Z 2022-05-06 03:41:18,147 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Using application-defined state backend: org.apache.flink.runtime.state.hashmap.HashMapStateBackend@b868bf4
   jobmanager_1      | 2022-05-06T03:41:18.147354271Z 2022-05-06 03:41:18,147 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Checkpoint storage is set to 'filesystem': (checkpoints "hdfs://namenode:8020/flink/checkpoints")
   jobmanager_1      | 2022-05-06T03:41:18.188205038Z 2022-05-06 03:41:18,188 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - No checkpoint found during restore.
   jobmanager_1      | 2022-05-06T03:41:18.188312206Z 2022-05-06 03:41:18,188 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Using failover strategy org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy@7077d08 for insert-into_myhive.default.es_ds_t_order (6f4d2fb809cd27113aa5a210af958eca).
   jobmanager_1      | 2022-05-06T03:41:18.188662958Z 2022-05-06 03:41:18,188 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Starting execution of job insert-into_myhive.default.es_ds_t_order (6f4d2fb809cd27113aa5a210af958eca) under job master id 00000000000000000000000000000000.
   jobmanager_1      | 2022-05-06T03:41:18.189276971Z 2022-05-06 03:41:18,189 INFO  org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Starting split enumerator for source Source: TableSourceScan(table=[[myhive, default, cdc_init_ds_t_order]], fields=[id, org_id, order_source_platform, order_source_no, created_at, updated_at]) -> DropUpdateBefore.
   jobmanager_1      | 2022-05-06T03:41:18.193002757Z 2022-05-06 03:41:18,192 WARN  com.ververica.cdc.connectors.shaded.com.zaxxer.hikari.pool.PoolBase [] - connection-pool-172.16.0.33:3308 - Failed to validate connection com.mysql.cj.jdbc.ConnectionImpl@7590c18e (No operations allowed after connection closed.). Possibly consider using a shorter maxLifetime value.
   jobmanager_1      | 2022-05-06T03:41:18.197250937Z 2022-05-06 03:41:18,196 WARN  com.ververica.cdc.connectors.shaded.com.zaxxer.hikari.pool.PoolBase [] - connection-pool-172.16.0.33:3308 - Failed to validate connection com.mysql.cj.jdbc.ConnectionImpl@58d6122f (No operations allowed after connection closed.). Possibly consider using a shorter maxLifetime value.
   jobmanager_1      | 2022-05-06T03:41:18.198771011Z 2022-05-06 03:41:18,198 WARN  com.ververica.cdc.connectors.shaded.com.zaxxer.hikari.pool.PoolBase [] - connection-pool-172.16.0.33:3308 - Failed to validate connection com.mysql.cj.jdbc.ConnectionImpl@301caba0 (No operations allowed after connection closed.). Possibly consider using a shorter maxLifetime value.
   jobmanager_1      | 2022-05-06T03:41:18.200223069Z 2022-05-06 03:41:18,200 WARN  com.ververica.cdc.connectors.shaded.com.zaxxer.hikari.pool.PoolBase [] - connection-pool-172.16.0.33:3308 - Failed to validate connection com.mysql.cj.jdbc.ConnectionImpl@4373809c (No operations allowed after connection closed.). Possibly consider using a shorter maxLifetime value.
   jobmanager_1      | 2022-05-06T03:41:18.201675640Z 2022-05-06 03:41:18,201 WARN  com.ververica.cdc.connectors.shaded.com.zaxxer.hikari.pool.PoolBase [] - connection-pool-172.16.0.33:3308 - Failed to validate connection com.mysql.cj.jdbc.ConnectionImpl@4413048f (No operations allowed after connection closed.). Possibly consider using a shorter maxLifetime value.
   jobmanager_1      | 2022-05-06T03:41:18.203528540Z 2022-05-06 03:41:18,203 WARN  com.ververica.cdc.connectors.shaded.com.zaxxer.hikari.pool.PoolBase [] - connection-pool-172.16.0.33:3308 - Failed to validate connection com.mysql.cj.jdbc.ConnectionImpl@2cbc3148 (No operations allowed after connection closed.). Possibly consider using a shorter maxLifetime value.
   jobmanager_1      | 2022-05-06T03:41:18.204849069Z 2022-05-06 03:41:18,204 WARN  com.ververica.cdc.connectors.shaded.com.zaxxer.hikari.pool.PoolBase [] - connection-pool-172.16.0.33:3308 - Failed to validate connection com.mysql.cj.jdbc.ConnectionImpl@7928ded4 (No operations allowed after connection closed.). Possibly consider using a shorter maxLifetime value.
   jobmanager_1      | 2022-05-06T03:41:18.206113172Z 2022-05-06 03:41:18,206 WARN  com.ververica.cdc.connectors.shaded.com.zaxxer.hikari.pool.PoolBase [] - connection-pool-172.16.0.33:3308 - Failed to validate connection com.mysql.cj.jdbc.ConnectionImpl@2600d60d (No operations allowed after connection closed.). Possibly consider using a shorter maxLifetime value.
   jobmanager_1      | 2022-05-06T03:41:18.216072540Z 2022-05-06 03:41:18,215 WARN  com.ververica.cdc.connectors.shaded.com.zaxxer.hikari.pool.PoolBase [] - connection-pool-172.16.0.33:3308 - Failed to validate connection com.mysql.cj.jdbc.ConnectionImpl@7a6b893e (No operations allowed after connection closed.). Possibly consider using a shorter maxLifetime value.
   jobmanager_1      | 2022-05-06T03:41:18.217582928Z 2022-05-06 03:41:18,217 WARN  com.ververica.cdc.connectors.shaded.com.zaxxer.hikari.pool.PoolBase [] - connection-pool-172.16.0.33:3308 - Failed to validate connection com.mysql.cj.jdbc.ConnectionImpl@3790d14a (No operations allowed after connection closed.). Possibly consider using a shorter maxLifetime value.
   jobmanager_1      | 2022-05-06T03:41:18.220663460Z 2022-05-06 03:41:18,220 WARN  com.ververica.cdc.connectors.shaded.com.zaxxer.hikari.pool.PoolBase [] - connection-pool-172.16.0.33:3308 - Failed to validate connection com.mysql.cj.jdbc.ConnectionImpl@8f17712 (No operations allowed after connection closed.). Possibly consider using a shorter maxLifetime value.
   jobmanager_1      | 2022-05-06T03:41:18.221933286Z 2022-05-06 03:41:18,221 WARN  com.ververica.cdc.connectors.shaded.com.zaxxer.hikari.pool.PoolBase [] - connection-pool-172.16.0.33:3308 - Failed to validate connection com.mysql.cj.jdbc.ConnectionImpl@28e168bf (No operations allowed after connection closed.). Possibly consider using a shorter maxLifetime value.
   jobmanager_1      | 2022-05-06T03:41:18.223072751Z 2022-05-06 03:41:18,222 WARN  com.ververica.cdc.connectors.shaded.com.zaxxer.hikari.pool.PoolBase [] - connection-pool-172.16.0.33:3308 - Failed to validate connection com.mysql.cj.jdbc.ConnectionImpl@e6536e4 (No operations allowed after connection closed.). Possibly consider using a shorter maxLifetime value.
   jobmanager_1      | 2022-05-06T03:41:18.224661737Z 2022-05-06 03:41:18,224 WARN  com.ververica.cdc.connectors.shaded.com.zaxxer.hikari.pool.PoolBase [] - connection-pool-172.16.0.33:3308 - Failed to validate connection com.mysql.cj.jdbc.ConnectionImpl@3fcebce3 (No operations allowed after connection closed.). Possibly consider using a shorter maxLifetime value.
   jobmanager_1      | 2022-05-06T03:41:18.225772747Z 2022-05-06 03:41:18,225 WARN  com.ververica.cdc.connectors.shaded.com.zaxxer.hikari.pool.PoolBase [] - connection-pool-172.16.0.33:3308 - Failed to validate connection com.mysql.cj.jdbc.ConnectionImpl@3c8415ef (No operations allowed after connection closed.). Possibly consider using a shorter maxLifetime value.
   jobmanager_1      | 2022-05-06T03:41:18.227405302Z 2022-05-06 03:41:18,227 WARN  com.ververica.cdc.connectors.shaded.com.zaxxer.hikari.pool.PoolBase [] - connection-pool-172.16.0.33:3308 - Failed to validate connection com.mysql.cj.jdbc.ConnectionImpl@3876e142 (No operations allowed after connection closed.). Possibly consider using a shorter maxLifetime value.
   jobmanager_1      | 2022-05-06T03:41:18.228714051Z 2022-05-06 03:41:18,228 WARN  com.ververica.cdc.connectors.shaded.com.zaxxer.hikari.pool.PoolBase [] - connection-pool-172.16.0.33:3308 - Failed to validate connection com.mysql.cj.jdbc.ConnectionImpl@72157b0b (No operations allowed after connection closed.). Possibly consider using a shorter maxLifetime value.
   jobmanager_1      | 2022-05-06T03:41:18.231013816Z 2022-05-06 03:41:18,230 WARN  com.ververica.cdc.connectors.shaded.com.zaxxer.hikari.pool.PoolBase [] - connection-pool-172.16.0.33:3308 - Failed to validate connection com.mysql.cj.jdbc.ConnectionImpl@6e2223cb (No operations allowed after connection closed.). Possibly consider using a shorter maxLifetime value.
   jobmanager_1      | 2022-05-06T03:41:18.232069357Z 2022-05-06 03:41:18,232 WARN  com.ververica.cdc.connectors.shaded.com.zaxxer.hikari.pool.PoolBase [] - connection-pool-172.16.0.33:3308 - Failed to validate connection com.mysql.cj.jdbc.ConnectionImpl@6815a9f9 (No operations allowed after connection closed.). Possibly consider using a shorter maxLifetime value.
   jobmanager_1      | 2022-05-06T03:41:18.235841071Z 2022-05-06 03:41:18,235 WARN  com.ververica.cdc.connectors.shaded.com.zaxxer.hikari.pool.PoolBase [] - connection-pool-172.16.0.33:3308 - Failed to validate connection com.mysql.cj.jdbc.ConnectionImpl@67663948 (No operations allowed after connection closed.). Possibly consider using a shorter maxLifetime value.
   jobmanager_1      | 2022-05-06T03:41:18.315131903Z 2022-05-06 03:41:18,314 INFO  io.debezium.jdbc.JdbcConnection                              [] - Connection gracefully closed
   jobmanager_1      | 2022-05-06T03:41:18.315290827Z 2022-05-06 03:41:18,315 INFO  com.ververica.cdc.connectors.mysql.MySqlValidator            [] - MySQL validation passed.
   jobmanager_1      | 2022-05-06T03:41:18.315798138Z 2022-05-06 03:41:18,315 INFO  com.ververica.cdc.connectors.mysql.source.utils.TableDiscoveryUtils [] - Read list of available databases
   jobmanager_1      | 2022-05-06T03:41:18.316828871Z 2022-05-06 03:41:18,316 INFO  com.ververica.cdc.connectors.mysql.source.utils.TableDiscoveryUtils [] -        list of available databases is: [sharding_db, mysql, information_schema, performance_schema, sys]
   jobmanager_1      | 2022-05-06T03:41:18.316848004Z 2022-05-06 03:41:18,316 INFO  com.ververica.cdc.connectors.mysql.source.utils.TableDiscoveryUtils [] - Read list of available tables in each database
   jobmanager_1      | 2022-05-06T03:41:18.341224657Z 2022-05-06 03:41:18,341 WARN  com.ververica.cdc.connectors.mysql.source.utils.TableDiscoveryUtils [] -        skipping database 'sharding_db' due to error reading tables: No database selected
   jobmanager_1      | 2022-05-06T03:41:18.342691048Z 2022-05-06 03:41:18,342 WARN  com.ververica.cdc.connectors.mysql.source.utils.TableDiscoveryUtils [] -        skipping database 'mysql' due to error reading tables: No database selected
   jobmanager_1      | 2022-05-06T03:41:18.343667712Z 2022-05-06 03:41:18,343 WARN  com.ververica.cdc.connectors.mysql.source.utils.TableDiscoveryUtils [] -        skipping database 'information_schema' due to error reading tables: No database selected
   jobmanager_1      | 2022-05-06T03:41:18.344465873Z 2022-05-06 03:41:18,344 WARN  com.ververica.cdc.connectors.mysql.source.utils.TableDiscoveryUtils [] -        skipping database 'performance_schema' due to error reading tables: No database selected
   jobmanager_1      | 2022-05-06T03:41:18.345258206Z 2022-05-06 03:41:18,345 WARN  com.ververica.cdc.connectors.mysql.source.utils.TableDiscoveryUtils [] -        skipping database 'sys' due to error reading tables: No database selected
   jobmanager_1      | 2022-05-06T03:41:18.346396216Z 2022-05-06 03:41:18,346 INFO  io.debezium.jdbc.JdbcConnection                              [] - Connection gracefully closed
   jobmanager_1      | 2022-05-06T03:41:18.347640778Z 2022-05-06 03:41:18,346 ERROR org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Failed to create Source Enumerator for source Source: TableSourceScan(table=[[myhive, default, cdc_init_ds_t_order]], fields=[id, org_id, order_source_platform, order_source_no, created_at, updated_at]) -> DropUpdateBefore
   jobmanager_1      | 2022-05-06T03:41:18.347663736Z org.apache.flink.util.FlinkRuntimeException: Failed to discover captured tables for enumerator
   jobmanager_1      | 2022-05-06T03:41:18.347668994Z      at com.ververica.cdc.connectors.mysql.source.MySqlSource.createEnumerator(MySqlSource.java:170) ~[flink-sql-connector-mysql-cdc-2.1.0.jar:2.1.0]
   jobmanager_1      | 2022-05-06T03:41:18.347674499Z      at org.apache.flink.runtime.source.coordinator.SourceCoordinator.start(SourceCoordinator.java:128) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
   jobmanager_1      | 2022-05-06T03:41:18.347679133Z      at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.applyCall(RecreateOnResetOperatorCoordinator.java:291) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
   jobmanager_1      | 2022-05-06T03:41:18.347683960Z      at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.start(RecreateOnResetOperatorCoordinator.java:70) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
   jobmanager_1      | 2022-05-06T03:41:18.347690047Z      at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.start(OperatorCoordinatorHolder.java:194) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
   jobmanager_1      | 2022-05-06T03:41:18.347695171Z      at org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.startAllOperatorCoordinators(DefaultOperatorCoordinatorHandler.java:85) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
   jobmanager_1      | 2022-05-06T03:41:18.347702122Z      at org.apache.flink.runtime.scheduler.SchedulerBase.startScheduling(SchedulerBase.java:592) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
   jobmanager_1      | 2022-05-06T03:41:18.347706608Z      at org.apache.flink.runtime.jobmaster.JobMaster.startScheduling(JobMaster.java:955) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
   jobmanager_1      | 2022-05-06T03:41:18.347710381Z      at org.apache.flink.runtime.jobmaster.JobMaster.startJobExecution(JobMaster.java:873) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
   jobmanager_1      | 2022-05-06T03:41:18.347714329Z      at org.apache.flink.runtime.jobmaster.JobMaster.onStart(JobMaster.java:383) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
   jobmanager_1      | 2022-05-06T03:41:18.347718472Z      at org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStart(RpcEndpoint.java:181) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
   jobmanager_1      | 2022-05-06T03:41:18.347724528Z      at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.start(AkkaRpcActor.java:605) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
   jobmanager_1      | 2022-05-06T03:41:18.347729888Z      at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:180) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
   jobmanager_1      | 2022-05-06T03:41:18.347733598Z      at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.12-1.13.3.jar:1.13.3]
   jobmanager_1      | 2022-05-06T03:41:18.347737241Z      at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.12-1.13.3.jar:1.13.3]
   jobmanager_1      | 2022-05-06T03:41:18.347741029Z      at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) [flink-dist_2.12-1.13.3.jar:1.13.3]
   jobmanager_1      | 2022-05-06T03:41:18.347745238Z      at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) [flink-dist_2.12-1.13.3.jar:1.13.3]
   jobmanager_1      | 2022-05-06T03:41:18.347762754Z      at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.12-1.13.3.jar:1.13.3]
   jobmanager_1      | 2022-05-06T03:41:18.347767108Z      at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.12-1.13.3.jar:1.13.3]
   jobmanager_1      | 2022-05-06T03:41:18.347771037Z      at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-dist_2.12-1.13.3.jar:1.13.3]
   jobmanager_1      | 2022-05-06T03:41:18.347774681Z      at akka.actor.Actor.aroundReceive(Actor.scala:517) [flink-dist_2.12-1.13.3.jar:1.13.3]
   jobmanager_1      | 2022-05-06T03:41:18.347778529Z      at akka.actor.Actor.aroundReceive$(Actor.scala:515) [flink-dist_2.12-1.13.3.jar:1.13.3]
   jobmanager_1      | 2022-05-06T03:41:18.347782516Z      at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.12-1.13.3.jar:1.13.3]
   jobmanager_1      | 2022-05-06T03:41:18.347786140Z      at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.12-1.13.3.jar:1.13.3]
   jobmanager_1      | 2022-05-06T03:41:18.347791771Z      at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.12-1.13.3.jar:1.13.3]
   jobmanager_1      | 2022-05-06T03:41:18.347795948Z      at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.12-1.13.3.jar:1.13.3]
   jobmanager_1      | 2022-05-06T03:41:18.347799511Z      at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.12-1.13.3.jar:1.13.3]
   jobmanager_1      | 2022-05-06T03:41:18.347803973Z      at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.12-1.13.3.jar:1.13.3]
   jobmanager_1      | 2022-05-06T03:41:18.347808945Z      at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.12-1.13.3.jar:1.13.3]
   jobmanager_1      | 2022-05-06T03:41:18.347813429Z      at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.12-1.13.3.jar:1.13.3]
   jobmanager_1      | 2022-05-06T03:41:18.347818245Z      at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.12-1.13.3.jar:1.13.3]
   jobmanager_1      | 2022-05-06T03:41:18.347825951Z      at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.12-1.13.3.jar:1.13.3]
   jobmanager_1      | 2022-05-06T03:41:18.347830790Z Caused by: java.lang.IllegalArgumentException: Can't find any matched tables, please check your configured database-name: [sharding_db] and table-name: [sharding_db.t_order]
   jobmanager_1      | 2022-05-06T03:41:18.347834971Z      at com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils.discoverCapturedTables(DebeziumUtils.java:167) ~[flink-sql-connector-mysql-cdc-2.1.0.jar:2.1.0]
   jobmanager_1      | 2022-05-06T03:41:18.347839264Z      at com.ververica.cdc.connectors.mysql.source.MySqlSource.createEnumerator(MySqlSource.java:161) ~[flink-sql-connector-mysql-cdc-2.1.0.jar:2.1.0]
   jobmanager_1      | 2022-05-06T03:41:18.347843703Z      ... 31 more
   jobmanager_1      | 2022-05-06T03:41:18.347847624Z 2022-05-06 03:41:18,347 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Starting scheduling with scheduling strategy [org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy]
   jobmanager_1      | 2022-05-06T03:41:18.347852911Z 2022-05-06 03:41:18,347 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Job insert-into_myhive.default.es_ds_t_order (6f4d2fb809cd27113aa5a210af958eca) switched from state CREATED to RUNNING.
   jobmanager_1      | 2022-05-06T03:41:18.348142922Z 2022-05-06 03:41:18,348 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: TableSourceScan(table=[[myhive, default, cdc_init_ds_t_order]], fields=[id, org_id, order_source_platform, order_source_no, created_at, updated_at]) -> DropUpdateBefore (1/1) (1d29c53fdde1442b97a7dec0aa8650c1) switched from CREATED to SCHEDULED.
   jobmanager_1      | 2022-05-06T03:41:18.348157054Z 2022-05-06 03:41:18,348 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - ChangelogNormalize(key=[id]) -> NotNullEnforcer(fields=[id]) -> Sink: Sink(table=[myhive.default.es_ds_t_order], fields=[id, org_id, order_source_platform, order_source_no, created_at, updated_at]) (1/1) (529eade9aab81d77b0dcce25d963b98a) switched from CREATED to SCHEDULED.
   jobmanager_1      | 2022-05-06T03:41:18.348674695Z 2022-05-06 03:41:18,348 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Connecting to ResourceManager akka.tcp://flink@jobmanager:6123/user/rpc/resourcemanager_*(00000000000000000000000000000000)
   jobmanager_1      | 2022-05-06T03:41:18.349373848Z 2022-05-06 03:41:18,349 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Resolved ResourceManager address, beginning registration
   jobmanager_1      | 2022-05-06T03:41:18.350244971Z 2022-05-06 03:41:18,349 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Trying to recover from a global failure.
   jobmanager_1      | 2022-05-06T03:41:18.350260239Z org.apache.flink.util.FlinkException: Global failure triggered by OperatorCoordinator for 'Source: TableSourceScan(table=[[myhive, default, cdc_init_ds_t_order]], fields=[id, org_id, order_source_platform, order_source_no, created_at, updated_at]) -> DropUpdateBefore' (operator cbc357ccb763df2852fee8c4fc7d55f2).
   jobmanager_1      | 2022-05-06T03:41:18.350266930Z      at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:553) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
   jobmanager_1      | 2022-05-06T03:41:18.350272463Z      at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$QuiesceableContext.failJob(RecreateOnResetOperatorCoordinator.java:223) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
   jobmanager_1      | 2022-05-06T03:41:18.350276072Z      at org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.failJob(SourceCoordinatorContext.java:285) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
   jobmanager_1      | 2022-05-06T03:41:18.350279092Z      at org.apache.flink.runtime.source.coordinator.SourceCoordinator.start(SourceCoordinator.java:132) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
   jobmanager_1      | 2022-05-06T03:41:18.350281885Z      at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.applyCall(RecreateOnResetOperatorCoordinator.java:291) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
   jobmanager_1      | 2022-05-06T03:41:18.350285118Z      at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.start(RecreateOnResetOperatorCoordinator.java:70) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
   jobmanager_1      | 2022-05-06T03:41:18.350288023Z      at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.start(OperatorCoordinatorHolder.java:194) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
   jobmanager_1      | 2022-05-06T03:41:18.350291058Z      at org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.startAllOperatorCoordinators(DefaultOperatorCoordinatorHandler.java:85) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
   jobmanager_1      | 2022-05-06T03:41:18.350293962Z      at org.apache.flink.runtime.scheduler.SchedulerBase.startScheduling(SchedulerBase.java:592) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
   jobmanager_1      | 2022-05-06T03:41:18.350306268Z      at org.apache.flink.runtime.jobmaster.JobMaster.startScheduling(JobMaster.java:955) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
   jobmanager_1      | 2022-05-06T03:41:18.350309420Z      at org.apache.flink.runtime.jobmaster.JobMaster.startJobExecution(JobMaster.java:873) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
   jobmanager_1      | 2022-05-06T03:41:18.350313763Z      at org.apache.flink.runtime.jobmaster.JobMaster.onStart(JobMaster.java:383) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
   jobmanager_1      | 2022-05-06T03:41:18.350316686Z      at org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStart(RpcEndpoint.java:181) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
   jobmanager_1      | 2022-05-06T03:41:18.350319330Z      at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.start(AkkaRpcActor.java:605) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
   jobmanager_1      | 2022-05-06T03:41:18.350321993Z      at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:180) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
   jobmanager_1      | 2022-05-06T03:41:18.350324769Z      at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
   jobmanager_1      | 2022-05-06T03:41:18.350327411Z      at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
   jobmanager_1      | 2022-05-06T03:41:18.350330302Z      at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
   jobmanager_1      | 2022-05-06T03:41:18.350333090Z      at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
   jobmanager_1      | 2022-05-06T03:41:18.350335686Z      at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
   jobmanager_1      | 2022-05-06T03:41:18.350338336Z      at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.12-1.13.3.jar:1.13.3]
   jobmanager_1      | 2022-05-06T03:41:18.350341798Z      at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-dist_2.12-1.13.3.jar:1.13.3]
   jobmanager_1      | 2022-05-06T03:41:18.350344360Z      at akka.actor.Actor.aroundReceive(Actor.scala:517) [flink-dist_2.12-1.13.3.jar:1.13.3]
   jobmanager_1      | 2022-05-06T03:41:18.350346964Z      at akka.actor.Actor.aroundReceive$(Actor.scala:515) [flink-dist_2.12-1.13.3.jar:1.13.3]
   jobmanager_1      | 2022-05-06T03:41:18.350349746Z      at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.12-1.13.3.jar:1.13.3]
   jobmanager_1      | 2022-05-06T03:41:18.350352767Z      at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.12-1.13.3.jar:1.13.3]
   jobmanager_1      | 2022-05-06T03:41:18.350355517Z      at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.12-1.13.3.jar:1.13.3]
   jobmanager_1      | 2022-05-06T03:41:18.350358205Z      at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.12-1.13.3.jar:1.13.3]
   jobmanager_1      | 2022-05-06T03:41:18.350361353Z      at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.12-1.13.3.jar:1.13.3]
   jobmanager_1      | 2022-05-06T03:41:18.350363931Z      at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.12-1.13.3.jar:1.13.3]
   jobmanager_1      | 2022-05-06T03:41:18.350366514Z      at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.12-1.13.3.jar:1.13.3]
   jobmanager_1      | 2022-05-06T03:41:18.350371921Z      at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.12-1.13.3.jar:1.13.3]
   jobmanager_1      | 2022-05-06T03:41:18.350374603Z      at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.12-1.13.3.jar:1.13.3]
   jobmanager_1      | 2022-05-06T03:41:18.350377633Z      at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.12-1.13.3.jar:1.13.3]
   jobmanager_1      | 2022-05-06T03:41:18.350380439Z Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to discover captured tables for enumerator
   jobmanager_1      | 2022-05-06T03:41:18.350383227Z      at com.ververica.cdc.connectors.mysql.source.MySqlSource.createEnumerator(MySqlSource.java:170) ~[flink-sql-connector-mysql-cdc-2.1.0.jar:2.1.0]
   jobmanager_1      | 2022-05-06T03:41:18.350386109Z      at org.apache.flink.runtime.source.coordinator.SourceCoordinator.start(SourceCoordinator.java:128) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
   jobmanager_1      | 2022-05-06T03:41:18.350388798Z      ... 30 more
   jobmanager_1      | 2022-05-06T03:41:18.350391575Z Caused by: java.lang.IllegalArgumentException: Can't find any matched tables, please check your configured database-name: [sharding_db] and table-name: [sharding_db.t_order]
   jobmanager_1      | 2022-05-06T03:41:18.350395685Z      at com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils.discoverCapturedTables(DebeziumUtils.java:167) ~[flink-sql-connector-mysql-cdc-2.1.0.jar:2.1.0]
   jobmanager_1      | 2022-05-06T03:41:18.350399407Z      at com.ververica.cdc.connectors.mysql.source.MySqlSource.createEnumerator(MySqlSource.java:161) ~[flink-sql-connector-mysql-cdc-2.1.0.jar:2.1.0]
   jobmanager_1      | 2022-05-06T03:41:18.350403720Z      at org.apache.flink.runtime.source.coordinator.SourceCoordinator.start(SourceCoordinator.java:128) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
   jobmanager_1      | 2022-05-06T03:41:18.350407603Z      ... 30 more
   jobmanager_1      | 2022-05-06T03:41:18.350410381Z 2022-05-06 03:41:18,350 INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - Registering job manager 00000000000000000000000000000000@akka.tcp://flink@jobmanager:6123/user/rpc/jobmanager_17 for job 6f4d2fb809cd27113aa5a210af958eca.
   jobmanager_1      | 2022-05-06T03:41:18.350413912Z 2022-05-06 03:41:18,350 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Job insert-into_myhive.default.es_ds_t_order (6f4d2fb809cd27113aa5a210af958eca) switched from state RUNNING to RESTARTING.
   jobmanager_1      | 2022-05-06T03:41:18.350712711Z 2022-05-06 03:41:18,350 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: TableSourceScan(table=[[myhive, default, cdc_init_ds_t_order]], fields=[id, org_id, order_source_platform, order_source_no, created_at, updated_at]) -> DropUpdateBefore (1/1) (1d29c53fdde1442b97a7dec0aa8650c1) switched from SCHEDULED to CANCELING.
   jobmanager_1      | 2022-05-06T03:41:18.350736699Z 2022-05-06 03:41:18,350 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: TableSourceScan(table=[[myhive, default, cdc_init_ds_t_order]], fields=[id, org_id, order_source_platform, order_source_no, created_at, updated_at]) -> DropUpdateBefore (1/1) (1d29c53fdde1442b97a7dec0aa8650c1) switched from CANCELING to CANCELED.
   jobmanager_1      | 2022-05-06T03:41:18.350820209Z 2022-05-06 03:41:18,350 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Discarding the results produced by task execution 1d29c53fdde1442b97a7dec0aa8650c1.
   jobmanager_1      | 2022-05-06T03:41:18.350837088Z 2022-05-06 03:41:18,350 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - ChangelogNormalize(key=[id]) -> NotNullEnforcer(fields=[id]) -> Sink: Sink(table=[myhive.default.es_ds_t_order], fields=[id, org_id, order_source_platform, order_source_no, created_at, updated_at]) (1/1) (529eade9aab81d77b0dcce25d963b98a) switched from SCHEDULED to CANCELING.
   jobmanager_1      | 2022-05-06T03:41:18.350853127Z 2022-05-06 03:41:18,350 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - ChangelogNormalize(key=[id]) -> NotNullEnforcer(fields=[id]) -> Sink: Sink(table=[myhive.default.es_ds_t_order], fields=[id, org_id, order_source_platform, order_source_no, created_at, updated_at]) (1/1) (529eade9aab81d77b0dcce25d963b98a) switched from CANCELING to CANCELED.
   jobmanager_1      | 2022-05-06T03:41:18.350888673Z 2022-05-06 03:41:18,350 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Discarding the results produced by task execution 529eade9aab81d77b0dcce25d963b98a.
   jobmanager_1      | 2022-05-06T03:41:18.351132167Z 2022-05-06 03:41:18,351 INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - Registered job manager 00000000000000000000000000000000@akka.tcp://flink@jobmanager:6123/user/rpc/jobmanager_17 for job 6f4d2fb809cd27113aa5a210af958eca.
   jobmanager_1      | 2022-05-06T03:41:18.388171957Z 2022-05-06 03:41:18,388 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - JobManager successfully registered at ResourceManager, leader id: 00000000000000000000000000000000.
   jobmanager_1      | 2022-05-06T03:41:18.388349125Z 2022-05-06 03:41:18,388 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Clearing resource requirements of job 6f4d2fb809cd27113aa5a210af958eca
   jobmanager_1      | 2022-05-06T03:44:18.352081613Z 2022-05-06 03:44:18,351 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Job insert-into_myhive.default.es_ds_t_order (6f4d2fb809cd27113aa5a210af958eca) switched from state RESTARTING to RUNNING.
   jobmanager_1      | 2022-05-06T03:44:18.352196316Z 2022-05-06 03:44:18,352 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - No checkpoint found during restore.
   jobmanager_1      | 2022-05-06T03:44:18.352229069Z 2022-05-06 03:44:18,352 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Resetting the Operator Coordinators to an empty state.
   jobmanager_1      | 2022-05-06T03:44:18.352240632Z 2022-05-06 03:44:18,352 INFO  org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator [] - Resetting coordinator to checkpoint.
   jobmanager_1      | 2022-05-06T03:44:18.352655863Z 2022-05-06 03:44:18,352 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: TableSourceScan(table=[[myhive, default, cdc_init_ds_t_order]], fields=[id, org_id, order_source_platform, order_source_no, created_at, updated_at]) -> DropUpdateBefore (1/1) (ee9671e9cae75dc457921722d8b71100) switched from CREATED to SCHEDULED.
   jobmanager_1      | 2022-05-06T03:44:18.352668293Z 2022-05-06 03:44:18,352 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - ChangelogNormalize(key=[id]) -> NotNullEnforcer(fields=[id]) -> Sink: Sink(table=[myhive.default.es_ds_t_order], fields=[id, org_id, order_source_platform, order_source_no, created_at, updated_at]) (1/1) (492ecc6a9fe9a7ef92dbd5af27a4cab8) switched from CREATED to SCHEDULED.
   jobmanager_1      | 2022-05-06T03:44:18.352752881Z 2022-05-06 03:44:18,352 INFO  org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Closing SourceCoordinator for source Source: TableSourceScan(table=[[myhive, default, cdc_init_ds_t_order]], fields=[id, org_id, order_source_platform, order_source_no, created_at, updated_at]) -> DropUpdateBefore.
   jobmanager_1      | 2022-05-06T03:44:18.352835438Z 2022-05-06 03:44:18,352 INFO  org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Source coordinator for source Source: TableSourceScan(table=[[myhive, default, cdc_init_ds_t_order]], fields=[id, org_id, order_source_platform, order_source_no, created_at, updated_at]) -> DropUpdateBefore closed.
   jobmanager_1      | 2022-05-06T03:44:18.352933577Z 2022-05-06 03:44:18,352 INFO  org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Starting split enumerator for source Source: TableSourceScan(table=[[myhive, default, cdc_init_ds_t_order]], fields=[id, org_id, order_source_platform, order_source_no, created_at, updated_at]) -> DropUpdateBefore.
   jobmanager_1      | 2022-05-06T03:44:18.353052378Z 2022-05-06 03:44:18,352 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Received resource requirements from job 6f4d2fb809cd27113aa5a210af958eca: [ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN}, numberOfRequiredSlots=1}]
   ```
   ![image](https://user-images.githubusercontent.com/100613/167064411-82a49b44-03af-4a06-b6ea-9b73812ccc58.png)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@shardingsphere.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [shardingsphere] RaigorJiang commented on issue #17299: ShardingSphere-Proxy Unable to work with apache flink cdc connector.

Posted by GitBox <gi...@apache.org>.
RaigorJiang commented on issue #17299:
URL: https://github.com/apache/shardingsphere/issues/17299#issuecomment-1118231744

   Hi @nickfan 
   Support for `information_schema` is being improved; please watch for subsequent version releases.
   Related issue: #16234
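
   To make the failing setup concrete: a CDC source table of the kind shown in the log (`cdc_init_ds_t_order`) would be declared in Flink SQL roughly as below. Only the database/table names (`sharding_db`, `t_order`) and the field list come from the error message and log above — hostname, port, credentials, and column types are illustrative placeholders. On startup, the connector's split enumerator discovers the configured tables through the server's metadata, which is where ShardingSphere-Proxy's incomplete `information_schema` support surfaces as the `Can't find any matched tables` error.

   ```sql
   -- Sketch of a Flink SQL mysql-cdc source pointed at ShardingSphere-Proxy.
   -- Placeholder values: hostname, port, username, password, and column types.
   CREATE TABLE cdc_init_ds_t_order (
     id BIGINT,
     org_id BIGINT,
     order_source_platform STRING,
     order_source_no STRING,
     created_at TIMESTAMP(3),
     updated_at TIMESTAMP(3),
     PRIMARY KEY (id) NOT ENFORCED
   ) WITH (
     'connector' = 'mysql-cdc',
     'hostname' = 'shardingsphere-proxy',  -- placeholder: proxy host
     'port' = '3307',                      -- placeholder: proxy frontend port
     'username' = 'root',                  -- placeholder
     'password' = 'root',                  -- placeholder
     'database-name' = 'sharding_db',
     'table-name' = 't_order'
   );
   ```

   Against a plain MySQL server this declaration lets the enumerator match `sharding_db.t_order` during discovery; against the proxy, the missing metadata makes discovery return no tables even though the logical table is queryable.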

