Posted to issues@streampark.apache.org by GitBox <gi...@apache.org> on 2022/11/19 06:05:51 UTC
[GitHub] [incubator-streampark] zhangxiangyang opened a new issue, #2066: [Bug] 2.0.0 version submit flink sql job on standalone cluster , unable to start.
zhangxiangyang opened a new issue, #2066:
URL: https://github.com/apache/incubator-streampark/issues/2066
### Search before asking
- [X] I had searched in the [issues](https://github.com/apache/incubator-streampark/issues?q=is%3Aissue+label%3A%22bug%22) and found no similar issues.
### What happened
Following the official example, I ran a Flink SQL job. When it is submitted to the standalone cluster, the job fails to start.
SQL Demo:
CREATE TABLE user_log (
  user_id VARCHAR,
  item_id VARCHAR,
  category_id VARCHAR,
  behavior VARCHAR,
  ts TIMESTAMP(3)
) WITH (
  'connector.type' = 'kafka', -- use the kafka connector
  'connector.version' = 'universal', -- Kafka version; universal supports 0.11 and later
  'connector.topic' = 'user_behavior', -- kafka topic
  'connector.properties.bootstrap.servers'='kafka-1:9092,kafka-2:9092,kafka-3:9092',
  'connector.startup-mode' = 'earliest-offset', -- read from the earliest offset
  'update-mode' = 'append',
  'format.type' = 'json', -- the source data format is JSON
  'format.derive-schema' = 'true' -- derive the JSON parsing rules from the DDL schema
);
CREATE TABLE pvuv_sink (
  dt VARCHAR,
  pv BIGINT,
  uv BIGINT
) WITH (
  'connector.type' = 'jdbc', -- use the jdbc connector
  'connector.url' = 'jdbc:mysql://test-mysql:3306/test', -- jdbc url
  'connector.table' = 'pvuv_sink', -- table name
  'connector.username' = 'root', -- username
  'connector.password' = '123456', -- password
  'connector.write.flush.max-rows' = '1' -- default is 5000 rows; set to 1 for the demo
);
INSERT INTO pvuv_sink
SELECT
  DATE_FORMAT(ts, 'yyyy-MM-dd HH:00') dt,
  COUNT(*) AS pv,
  COUNT(DISTINCT user_id) AS uv
FROM user_log
GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd HH:00');
Dependencies:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>1.16.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc</artifactId>
    <version>1.16.0</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.48</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-sql-connector-kafka</artifactId>
    <version>1.16.0</version>
</dependency>
### StreamPark Version
2.0.0
### Java Version
_No response_
### Flink Version
flink 1.16.0
### Scala Version of Flink
2.12
### Error Exception
```log
2022-11-19 13:52:50 | ERROR | streampark-deploy-executor-0 | org.apache.streampark.flink.submit.impl.RemoteSubmit:71] [StreamPark] REMOTE mode submit by restApi fail.
2022-11-19 13:52:50 | WARN | streampark-deploy-executor-0 | org.apache.streampark.flink.submit.impl.RemoteSubmit:63] [StreamPark] [flink-submit] RestAPI Submit Plan failed,try JobGraph Submit Plan now.
2022-11-19 13:52:50 | INFO | streampark-deploy-executor-0 | org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend:273] Using predefined options: DEFAULT.
2022-11-19 13:52:50 | WARN | streampark-deploy-executor-0 | org.apache.streampark.flink.core.FlinkTableInitializer:63] [StreamPark] Usage:can't fond config,you can set "--conf $path " in main arguments
2022-11-19 13:52:50 | INFO | streampark-deploy-executor-0 | org.apache.streampark.flink.core.FlinkTableInitializer:39] [StreamPark] blinkPlanner will be use.
2022-11-19 13:52:50 | INFO | streampark-deploy-executor-0 | org.apache.streampark.flink.core.FlinkTableInitializer:39] [StreamPark] components should work in streaming mode
2022-11-19 13:52:53 | INFO | streampark-deploy-executor-0 | org.apache.streampark.flink.core.FlinkSqlExecutor:39] [StreamPark] create table:CREATE TABLE user_log (
user_id VARCHAR,
item_id VARCHAR,
category_id VARCHAR,
behavior VARCHAR,
ts TIMESTAMP(3)
) WITH (
'connector.type' = 'kafka',
'connector.version' = 'universal',
'connector.topic' = 'user_behavior',
'connector.properties.bootstrap.servers'='kafka-1:9092,kafka-2:9092,kafka-3:9092',
'connector.startup-mode' = 'earliest-offset',
'update-mode' = 'append',
'format.type' = 'json',
'format.derive-schema' = 'true'
)
2022-11-19 13:52:53 | INFO | streampark-deploy-executor-0 | org.apache.streampark.flink.core.FlinkSqlExecutor:39] [StreamPark] create table:CREATE TABLE pvuv_sink (
dt VARCHAR,
pv BIGINT,
uv BIGINT
) WITH (
'connector.type' = 'jdbc',
'connector.url' = 'jdbc:mysql://test-mysql:3306/test',
'connector.table' = 'pvuv_sink',
'connector.username' = 'root',
'connector.password' = '123456',
'connector.write.flush.max-rows' = '1'
)
2022-11-19 13:52:53 | ERROR | streampark-deploy-executor-0 | org.apache.streampark.flink.submit.impl.RemoteSubmit:71] [StreamPark] REMOTE mode submit by jobGraph fail.
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: findAndCreateTableSource failed.
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
at org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:158)
at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:82)
at org.apache.streampark.flink.submit.trait.FlinkSubmitTrait.getJobGraph(FlinkSubmitTrait.scala:181)
at org.apache.streampark.flink.submit.trait.FlinkSubmitTrait.getJobGraph$(FlinkSubmitTrait.scala:164)
at org.apache.streampark.flink.submit.impl.RemoteSubmit$.jobGraphSubmit(RemoteSubmit.scala:118)
at org.apache.streampark.flink.submit.impl.RemoteSubmit$.$anonfun$doSubmit$2(RemoteSubmit.scala:48)
at org.apache.streampark.flink.submit.trait.FlinkSubmitTrait.$anonfun$trySubmit$5(FlinkSubmitTrait.scala:154)
at scala.util.Try$.apply(Try.scala:209)
at org.apache.streampark.flink.submit.trait.FlinkSubmitTrait.$anonfun$trySubmit$3(FlinkSubmitTrait.scala:154)
at scala.util.Failure.getOrElse(Try.scala:218)
at org.apache.streampark.flink.submit.trait.FlinkSubmitTrait.trySubmit(FlinkSubmitTrait.scala:152)
at org.apache.streampark.flink.submit.trait.FlinkSubmitTrait.trySubmit$(FlinkSubmitTrait.scala:144)
at org.apache.streampark.flink.submit.impl.RemoteSubmit$.doSubmit(RemoteSubmit.scala:48)
at org.apache.streampark.flink.submit.trait.FlinkSubmitTrait.submit(FlinkSubmitTrait.scala:114)
at org.apache.streampark.flink.submit.trait.FlinkSubmitTrait.submit$(FlinkSubmitTrait.scala:59)
at org.apache.streampark.flink.submit.impl.RemoteSubmit$.submit(RemoteSubmit.scala:37)
at org.apache.streampark.flink.submit.FlinkSubmit$.submit(FlinkSubmit.scala:29)
at org.apache.streampark.flink.submit.FlinkSubmit.submit(FlinkSubmit.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.streampark.flink.submit.FlinkSubmitter$.$anonfun$submit$1(FlinkSubmitter.scala:56)
at org.apache.streampark.flink.proxy.FlinkShimsProxy$.$anonfun$proxy$1(FlinkShimsProxy.scala:68)
at org.apache.streampark.common.util.ClassLoaderUtils$.runAsClassLoader(ClassLoaderUtils.scala:39)
at org.apache.streampark.flink.proxy.FlinkShimsProxy$.proxy(FlinkShimsProxy.scala:68)
at org.apache.streampark.flink.submit.FlinkSubmitter$.submit(FlinkSubmitter.scala:51)
at org.apache.streampark.flink.submit.FlinkSubmitter.submit(FlinkSubmitter.scala)
at org.apache.streampark.console.core.service.impl.ApplicationServiceImpl.lambda$start$7(ApplicationServiceImpl.java:1408)
at java.util.concurrent.CompletableFuture$AsyncSupply.run$$$capture(CompletableFuture.java:1604)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.flink.table.api.TableException: findAndCreateTableSource failed.
at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:47)
at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:78)
at org.apache.flink.table.planner.plan.schema.LegacyCatalogSourceTable.findAndCreateLegacyTableSource(LegacyCatalogSourceTable.scala:181)
at org.apache.flink.table.planner.plan.schema.LegacyCatalogSourceTable.toRel(LegacyCatalogSourceTable.scala:92)
at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3619)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2559)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2175)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2095)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2038)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:669)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:657)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3462)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:215)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:191)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:1498)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:1253)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertValidatedSqlNode(SqlToOperationConverter.java:374)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertValidatedSqlNodeOrFail(SqlToOperationConverter.java:384)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:828)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertValidatedSqlNode(SqlToOperationConverter.java:351)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:262)
at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106)
at org.apache.flink.table.api.internal.StatementSetImpl.addInsertSql(StatementSetImpl.java:62)
at org.apache.flink.table.api.bridge.scala.internal.StreamStatementSetImpl.addInsertSql(StreamStatementSetImpl.scala:36)
at org.apache.flink.table.api.bridge.scala.internal.StreamStatementSetImpl.addInsertSql(StreamStatementSetImpl.scala:27)
at org.apache.streampark.flink.core.FlinkSqlExecutor$.$anonfun$executeSql$3(FlinkSqlExecutor.scala:106)
at org.apache.streampark.flink.core.FlinkSqlExecutor$.$anonfun$executeSql$3$adapted(FlinkSqlExecutor.scala:48)
at scala.collection.immutable.List.foreach(List.scala:388)
at org.apache.streampark.flink.core.FlinkSqlExecutor$.executeSql(FlinkSqlExecutor.scala:48)
at org.apache.streampark.flink.core.FlinkStreamTableTrait.sql(FlinkStreamTableTrait.scala:82)
at org.apache.streampark.flink.cli.SqlClient$StreamSqlApp$.handle(SqlClient.scala:64)
at org.apache.streampark.flink.core.scala.FlinkStreamTable.main(FlinkStreamTable.scala:47)
at org.apache.streampark.flink.core.scala.FlinkStreamTable.main$(FlinkStreamTable.scala:44)
at org.apache.streampark.flink.cli.SqlClient$StreamSqlApp$.main(SqlClient.scala:63)
at org.apache.streampark.flink.cli.SqlClient$.delayedEndpoint$org$apache$streampark$flink$cli$SqlClient$1(SqlClient.scala:55)
at org.apache.streampark.flink.cli.SqlClient$delayedInit$body.apply(SqlClient.scala:30)
at scala.Function0.apply$mcV$sp(Function0.scala:34)
at scala.Function0.apply$mcV$sp$(Function0.scala:34)
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App.$anonfun$main$1$adapted(App.scala:76)
at scala.collection.immutable.List.foreach(List.scala:388)
at scala.App.main(App.scala:76)
at scala.App.main$(App.scala:74)
at org.apache.streampark.flink.cli.SqlClient$.main(SqlClient.scala:30)
at org.apache.streampark.flink.cli.SqlClient.main(SqlClient.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
... 35 more
Caused by: org.apache.flink.table.factories.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in
the classpath.
Reason: Required context properties mismatch.
The matching candidates:
org.apache.flink.table.sources.CsvAppendTableSourceFactory
Mismatched properties:
'connector.type' expects 'filesystem', but is 'kafka'
'format.type' expects 'csv', but is 'json'
The following properties are requested:
connector.properties.bootstrap.servers=kafka-1:9092,kafka-2:9092,kafka-3:9092
connector.startup-mode=earliest-offset
connector.topic=user_behavior
connector.type=kafka
connector.version=universal
format.derive-schema=true
format.type=json
schema.0.data-type=VARCHAR(2147483647)
schema.0.name=user_id
schema.1.data-type=VARCHAR(2147483647)
schema.1.name=item_id
schema.2.data-type=VARCHAR(2147483647)
schema.2.name=category_id
schema.3.data-type=VARCHAR(2147483647)
schema.3.name=behavior
schema.4.data-type=TIMESTAMP(3)
schema.4.name=ts
update-mode=append
The following factories have been considered:
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
at org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:313)
at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:191)
at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:152)
at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:106)
at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:43)
... 85 more
2022-11-19 13:52:53 | ERROR | streampark-deploy-executor-0 | org.apache.streampark.flink.submit.impl.RemoteSubmit:71] [StreamPark] [flink-submit] Both Rest API Submit Plan and JobGraph Submit Plan failed.
```
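The "Mismatched properties" section of the log suggests the planner went through Flink's legacy `TableSourceFactory` lookup and only found the built-in CSV factories. As far as is known, Flink 1.14 removed the legacy Kafka SQL connector that understood `'connector.type' = 'kafka'` style options, so on Flink 1.16 the DDL likely needs the newer `'connector' = ...` option keys. The following is a sketch of the same two tables rewritten that way, assuming the Flink 1.16 Kafka and JDBC SQL connectors; it has not been run against this cluster. `'connector.version'`, `'update-mode'` and `'format.derive-schema'` have no counterpart in the new options. The `PRIMARY KEY` on `dt` is an addition not present in the original demo: because the `GROUP BY` aggregate emits updated rows, the JDBC sink appears to need a key in order to write in upsert mode.

```sql
-- Sketch only: the demo tables declared with Flink 1.16-style connector options.
CREATE TABLE user_log (
  user_id VARCHAR,
  item_id VARCHAR,
  category_id VARCHAR,
  behavior VARCHAR,
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'kafka',                     -- replaces 'connector.type' and 'connector.version'
  'topic' = 'user_behavior',                 -- replaces 'connector.topic'
  'properties.bootstrap.servers' = 'kafka-1:9092,kafka-2:9092,kafka-3:9092',
  'scan.startup.mode' = 'earliest-offset',   -- replaces 'connector.startup-mode'
  'format' = 'json'                          -- JSON fields are mapped from the DDL columns
);

CREATE TABLE pvuv_sink (
  dt VARCHAR,
  pv BIGINT,
  uv BIGINT,
  -- added in this sketch: the aggregate query produces updates, so the sink needs a key
  PRIMARY KEY (dt) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://test-mysql:3306/test',
  'table-name' = 'pvuv_sink',                -- replaces 'connector.table'
  'username' = 'root',
  'password' = '123456',
  'sink.buffer-flush.max-rows' = '1'         -- replaces 'connector.write.flush.max-rows'
);
```

The `INSERT INTO pvuv_sink ...` statement from the demo can stay unchanged. For the upsert to behave as expected on the MySQL side, the physical `pvuv_sink` table in MySQL would also need a primary or unique key on `dt`.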
### Screenshots
_No response_
### Are you willing to submit PR?
- [X] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@streampark.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [incubator-streampark] zhangxiangyang closed issue #2066: [Bug] 2.0.0 version submit flink sql job on standalone cluster , unable to start.
Posted by GitBox <gi...@apache.org>.
zhangxiangyang closed issue #2066: [Bug] 2.0.0 version submit flink sql job on standalone cluster , unable to start.
URL: https://github.com/apache/incubator-streampark/issues/2066