Posted to issues@streampark.apache.org by GitBox <gi...@apache.org> on 2022/11/19 06:05:51 UTC

[GitHub] [incubator-streampark] zhangxiangyang opened a new issue, #2066: [Bug] 2.0.0 version submitting Flink SQL job on standalone cluster, unable to start.

zhangxiangyang opened a new issue, #2066:
URL: https://github.com/apache/incubator-streampark/issues/2066

   ### Search before asking
   
   - [X] I had searched in the [issues](https://github.com/apache/incubator-streampark/issues?q=is%3Aissue+label%3A%22bug%22) and found no similar issues.
   
   
   ### What happened
   
   I ran the Flink SQL job from the official example. When submitted to a standalone cluster, the job fails to start.
   SQL Demo:
      CREATE TABLE user_log (
       user_id VARCHAR,
       item_id VARCHAR,
       category_id VARCHAR,
       behavior VARCHAR,
       ts TIMESTAMP(3)
    ) WITH (
   'connector.type' = 'kafka', -- use the kafka connector
   'connector.version' = 'universal',  -- kafka version; 'universal' supports 0.11 and above
   'connector.topic' = 'user_behavior',  -- kafka topic
   'connector.properties.bootstrap.servers'='kafka-1:9092,kafka-2:9092,kafka-3:9092',
   'connector.startup-mode' = 'earliest-offset', -- read from the earliest offset
   'update-mode' = 'append',
   'format.type' = 'json',  -- source data format is json
   'format.derive-schema' = 'true' -- derive the json parsing rules from the DDL schema
    );
   
   CREATE TABLE pvuv_sink (
       dt VARCHAR,
       pv BIGINT,
       uv BIGINT
    ) WITH (
   'connector.type' = 'jdbc', -- use the jdbc connector
   'connector.url' = 'jdbc:mysql://test-mysql:3306/test', -- jdbc url
   'connector.table' = 'pvuv_sink', -- table name
   'connector.username' = 'root', -- username
   'connector.password' = '123456', -- password
   'connector.write.flush.max-rows' = '1' -- default is 5000 rows; set to 1 for the demo
    );
   
   INSERT INTO pvuv_sink
   SELECT
     DATE_FORMAT(ts, 'yyyy-MM-dd HH:00') dt,
     COUNT(*) AS pv,
     COUNT(DISTINCT user_id) AS uv
   FROM user_log
   GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd HH:00');
   
   Dependencies:
       <dependency>
           <groupId>org.apache.flink</groupId>
           <artifactId>flink-json</artifactId>
           <version>1.16.0</version>
       </dependency>
       <dependency>
           <groupId>org.apache.flink</groupId>
           <artifactId>flink-connector-jdbc</artifactId>
           <version>1.16.0</version>
       </dependency>
       <dependency>
           <groupId>mysql</groupId>
           <artifactId>mysql-connector-java</artifactId>
           <version>5.1.48</version>
       </dependency>
       <dependency>
           <groupId>org.apache.flink</groupId>
           <artifactId>flink-sql-connector-kafka</artifactId>
           <version>1.16.0</version>
       </dependency>
   
   
   ### StreamPark Version
   
   2.0.0
   
   ### Java Version
   
   _No response_
   
   ### Flink Version
   
   flink 1.16.0
   
   ### Scala Version of Flink
   
   2.12
   
   ### Error Exception
   
   ```log
   2022-11-19 13:52:50 | ERROR | streampark-deploy-executor-0 | org.apache.streampark.flink.submit.impl.RemoteSubmit:71] [StreamPark] REMOTE mode submit by restApi fail.
   2022-11-19 13:52:50 | WARN  | streampark-deploy-executor-0 | org.apache.streampark.flink.submit.impl.RemoteSubmit:63] [StreamPark] [flink-submit] RestAPI Submit Plan failed,try JobGraph Submit Plan now.
   2022-11-19 13:52:50 | INFO  | streampark-deploy-executor-0 | org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend:273] Using predefined options: DEFAULT.
   2022-11-19 13:52:50 | WARN  | streampark-deploy-executor-0 | org.apache.streampark.flink.core.FlinkTableInitializer:63] [StreamPark] Usage:can't fond config,you can set "--conf $path " in main arguments
   2022-11-19 13:52:50 | INFO  | streampark-deploy-executor-0 | org.apache.streampark.flink.core.FlinkTableInitializer:39] [StreamPark] blinkPlanner will be use.
   2022-11-19 13:52:50 | INFO  | streampark-deploy-executor-0 | org.apache.streampark.flink.core.FlinkTableInitializer:39] [StreamPark] components should work in streaming mode
   2022-11-19 13:52:53 | INFO  | streampark-deploy-executor-0 | org.apache.streampark.flink.core.FlinkSqlExecutor:39] [StreamPark] create table:CREATE TABLE user_log (
       user_id VARCHAR,
       item_id VARCHAR,
       category_id VARCHAR,
       behavior VARCHAR,
       ts TIMESTAMP(3)
    ) WITH (
   'connector.type' = 'kafka', 
   'connector.version' = 'universal',  
   'connector.topic' = 'user_behavior',  
   'connector.properties.bootstrap.servers'='kafka-1:9092,kafka-2:9092,kafka-3:9092',
   'connector.startup-mode' = 'earliest-offset', 
   'update-mode' = 'append',
   'format.type' = 'json',  
   'format.derive-schema' = 'true' 
    )
   2022-11-19 13:52:53 | INFO  | streampark-deploy-executor-0 | org.apache.streampark.flink.core.FlinkSqlExecutor:39] [StreamPark] create table:CREATE TABLE pvuv_sink (
       dt VARCHAR,
       pv BIGINT,
       uv BIGINT
    ) WITH (
   'connector.type' = 'jdbc', 
   'connector.url' = 'jdbc:mysql://test-mysql:3306/test', 
   'connector.table' = 'pvuv_sink', 
   'connector.username' = 'root', 
   'connector.password' = '123456', 
   'connector.write.flush.max-rows' = '1' 
    )
   2022-11-19 13:52:53 | ERROR | streampark-deploy-executor-0 | org.apache.streampark.flink.submit.impl.RemoteSubmit:71] [StreamPark] REMOTE mode submit by jobGraph fail.
   org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: findAndCreateTableSource failed.
   	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
   	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
   	at org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:158)
   	at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:82)
   	at org.apache.streampark.flink.submit.trait.FlinkSubmitTrait.getJobGraph(FlinkSubmitTrait.scala:181)
   	at org.apache.streampark.flink.submit.trait.FlinkSubmitTrait.getJobGraph$(FlinkSubmitTrait.scala:164)
   	at org.apache.streampark.flink.submit.impl.RemoteSubmit$.jobGraphSubmit(RemoteSubmit.scala:118)
   	at org.apache.streampark.flink.submit.impl.RemoteSubmit$.$anonfun$doSubmit$2(RemoteSubmit.scala:48)
   	at org.apache.streampark.flink.submit.trait.FlinkSubmitTrait.$anonfun$trySubmit$5(FlinkSubmitTrait.scala:154)
   	at scala.util.Try$.apply(Try.scala:209)
   	at org.apache.streampark.flink.submit.trait.FlinkSubmitTrait.$anonfun$trySubmit$3(FlinkSubmitTrait.scala:154)
   	at scala.util.Failure.getOrElse(Try.scala:218)
   	at org.apache.streampark.flink.submit.trait.FlinkSubmitTrait.trySubmit(FlinkSubmitTrait.scala:152)
   	at org.apache.streampark.flink.submit.trait.FlinkSubmitTrait.trySubmit$(FlinkSubmitTrait.scala:144)
   	at org.apache.streampark.flink.submit.impl.RemoteSubmit$.doSubmit(RemoteSubmit.scala:48)
   	at org.apache.streampark.flink.submit.trait.FlinkSubmitTrait.submit(FlinkSubmitTrait.scala:114)
   	at org.apache.streampark.flink.submit.trait.FlinkSubmitTrait.submit$(FlinkSubmitTrait.scala:59)
   	at org.apache.streampark.flink.submit.impl.RemoteSubmit$.submit(RemoteSubmit.scala:37)
   	at org.apache.streampark.flink.submit.FlinkSubmit$.submit(FlinkSubmit.scala:29)
   	at org.apache.streampark.flink.submit.FlinkSubmit.submit(FlinkSubmit.scala)
   	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
   	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   	at java.lang.reflect.Method.invoke(Method.java:498)
   	at org.apache.streampark.flink.submit.FlinkSubmitter$.$anonfun$submit$1(FlinkSubmitter.scala:56)
   	at org.apache.streampark.flink.proxy.FlinkShimsProxy$.$anonfun$proxy$1(FlinkShimsProxy.scala:68)
   	at org.apache.streampark.common.util.ClassLoaderUtils$.runAsClassLoader(ClassLoaderUtils.scala:39)
   	at org.apache.streampark.flink.proxy.FlinkShimsProxy$.proxy(FlinkShimsProxy.scala:68)
   	at org.apache.streampark.flink.submit.FlinkSubmitter$.submit(FlinkSubmitter.scala:51)
   	at org.apache.streampark.flink.submit.FlinkSubmitter.submit(FlinkSubmitter.scala)
   	at org.apache.streampark.console.core.service.impl.ApplicationServiceImpl.lambda$start$7(ApplicationServiceImpl.java:1408)
   	at java.util.concurrent.CompletableFuture$AsyncSupply.run$$$capture(CompletableFuture.java:1604)
   	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java)
   	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
   	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
   	at java.lang.Thread.run(Thread.java:750)
   Caused by: org.apache.flink.table.api.TableException: findAndCreateTableSource failed.
   	at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:47)
   	at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:78)
   	at org.apache.flink.table.planner.plan.schema.LegacyCatalogSourceTable.findAndCreateLegacyTableSource(LegacyCatalogSourceTable.scala:181)
   	at org.apache.flink.table.planner.plan.schema.LegacyCatalogSourceTable.toRel(LegacyCatalogSourceTable.scala:92)
   	at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3619)
   	at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2559)
   	at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2175)
   	at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2095)
   	at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2038)
   	at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:669)
   	at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:657)
   	at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3462)
   	at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570)
   	at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:215)
   	at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:191)
   	at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:1498)
   	at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:1253)
   	at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertValidatedSqlNode(SqlToOperationConverter.java:374)
   	at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertValidatedSqlNodeOrFail(SqlToOperationConverter.java:384)
   	at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:828)
   	at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertValidatedSqlNode(SqlToOperationConverter.java:351)
   	at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:262)
   	at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106)
   	at org.apache.flink.table.api.internal.StatementSetImpl.addInsertSql(StatementSetImpl.java:62)
   	at org.apache.flink.table.api.bridge.scala.internal.StreamStatementSetImpl.addInsertSql(StreamStatementSetImpl.scala:36)
   	at org.apache.flink.table.api.bridge.scala.internal.StreamStatementSetImpl.addInsertSql(StreamStatementSetImpl.scala:27)
   	at org.apache.streampark.flink.core.FlinkSqlExecutor$.$anonfun$executeSql$3(FlinkSqlExecutor.scala:106)
   	at org.apache.streampark.flink.core.FlinkSqlExecutor$.$anonfun$executeSql$3$adapted(FlinkSqlExecutor.scala:48)
   	at scala.collection.immutable.List.foreach(List.scala:388)
   	at org.apache.streampark.flink.core.FlinkSqlExecutor$.executeSql(FlinkSqlExecutor.scala:48)
   	at org.apache.streampark.flink.core.FlinkStreamTableTrait.sql(FlinkStreamTableTrait.scala:82)
   	at org.apache.streampark.flink.cli.SqlClient$StreamSqlApp$.handle(SqlClient.scala:64)
   	at org.apache.streampark.flink.core.scala.FlinkStreamTable.main(FlinkStreamTable.scala:47)
   	at org.apache.streampark.flink.core.scala.FlinkStreamTable.main$(FlinkStreamTable.scala:44)
   	at org.apache.streampark.flink.cli.SqlClient$StreamSqlApp$.main(SqlClient.scala:63)
   	at org.apache.streampark.flink.cli.SqlClient$.delayedEndpoint$org$apache$streampark$flink$cli$SqlClient$1(SqlClient.scala:55)
   	at org.apache.streampark.flink.cli.SqlClient$delayedInit$body.apply(SqlClient.scala:30)
   	at scala.Function0.apply$mcV$sp(Function0.scala:34)
   	at scala.Function0.apply$mcV$sp$(Function0.scala:34)
   	at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
   	at scala.App.$anonfun$main$1$adapted(App.scala:76)
   	at scala.collection.immutable.List.foreach(List.scala:388)
   	at scala.App.main(App.scala:76)
   	at scala.App.main$(App.scala:74)
   	at org.apache.streampark.flink.cli.SqlClient$.main(SqlClient.scala:30)
   	at org.apache.streampark.flink.cli.SqlClient.main(SqlClient.scala)
   	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
   	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   	at java.lang.reflect.Method.invoke(Method.java:498)
   	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
   	... 35 more
   Caused by: org.apache.flink.table.factories.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in
   the classpath.
   
   Reason: Required context properties mismatch.
   
   The matching candidates:
   org.apache.flink.table.sources.CsvAppendTableSourceFactory
   Mismatched properties:
   'connector.type' expects 'filesystem', but is 'kafka'
   'format.type' expects 'csv', but is 'json'
   
   The following properties are requested:
   connector.properties.bootstrap.servers=kafka-1:9092,kafka-2:9092,kafka-3:9092
   connector.startup-mode=earliest-offset
   connector.topic=user_behavior
   connector.type=kafka
   connector.version=universal
   format.derive-schema=true
   format.type=json
   schema.0.data-type=VARCHAR(2147483647)
   schema.0.name=user_id
   schema.1.data-type=VARCHAR(2147483647)
   schema.1.name=item_id
   schema.2.data-type=VARCHAR(2147483647)
   schema.2.name=category_id
   schema.3.data-type=VARCHAR(2147483647)
   schema.3.name=behavior
   schema.4.data-type=TIMESTAMP(3)
   schema.4.name=ts
   update-mode=append
   
   The following factories have been considered:
   org.apache.flink.table.sources.CsvBatchTableSourceFactory
   org.apache.flink.table.sources.CsvAppendTableSourceFactory
   	at org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:313)
   	at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:191)
   	at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:152)
   	at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:106)
   	at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:43)
   	... 85 more
   2022-11-19 13:52:53 | ERROR | streampark-deploy-executor-0 | org.apache.streampark.flink.submit.impl.RemoteSubmit:71] [StreamPark] [flink-submit] Both Rest API Submit Plan and JobGraph Submit Plan failed.
   ```
   
   
   ### Screenshots
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@streampark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-streampark] zhangxiangyang closed issue #2066: [Bug] 2.0.0 version submitting Flink SQL job on standalone cluster, unable to start.

Posted by GitBox <gi...@apache.org>.
zhangxiangyang closed issue #2066: [Bug] 2.0.0 version submitting Flink SQL job on standalone cluster, unable to start.
URL: https://github.com/apache/incubator-streampark/issues/2066

