Posted to commits@seatunnel.apache.org by GitBox <gi...@apache.org> on 2022/04/19 11:53:18 UTC
[GitHub] [incubator-seatunnel] whb-bigdata opened a new issue, #1713: [Bug] [Module Name] Bug title
whb-bigdata opened a new issue, #1713:
URL: https://github.com/apache/incubator-seatunnel/issues/1713
### Search before asking
- [X] I had searched in the [issues](https://github.com/apache/incubator-seatunnel/issues?q=is%3Aissue+label%3A%22bug%22) and found no similar issues.
### What happened
The Flink Kafka source throws an error.
Exception StackTrace: org.apache.flink.table.api.ValidationException: Temporary table '`default_catalog`.`default_database`.`score_v3_1935`' already exists
at org.apache.flink.table.catalog.CatalogManager.lambda$createTemporaryTable$11(CatalogManager.java:684)
at java.util.HashMap.compute(HashMap.java:1197)
at org.apache.flink.table.catalog.CatalogManager.createTemporaryTable(CatalogManager.java:678)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.createTemporaryView(TableEnvironmentImpl.java:502)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.createTemporaryView(TableEnvironmentImpl.java:488)
at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.createTemporaryView(StreamTableEnvironmentImpl.java:264)
at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.registerDataStream(StreamTableEnvironmentImpl.java:463)
at org.apache.seatunnel.flink.stream.FlinkStreamExecution.registerResultTable(FlinkStreamExecution.java:98)
at org.apache.seatunnel.flink.stream.FlinkStreamExecution.start(FlinkStreamExecution.java:57)
at org.apache.seatunnel.Seatunnel.entryPoint(Seatunnel.java:107)
at org.apache.seatunnel.Seatunnel.run(Seatunnel.java:65)
at org.apache.seatunnel.SeatunnelFlink.main(SeatunnelFlink.java:29)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
The error says that the temporary table already exists.
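
For readers unfamiliar with this error, here is a minimal sketch of the check that the top frames of the stack trace point at: `CatalogManager.createTemporaryTable` runs a lambda inside `HashMap.compute`, and that lambda rejects a name that is already registered. The class `TemporaryTableSketch`, its field, and the use of `IllegalStateException` in place of Flink's `ValidationException` are all hypothetical stand-ins, not Flink or SeaTunnel code. It only models what happens when the same temporary table name is registered twice; it does not claim to show the root cause in SeaTunnel.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of a temporary-table registry; not Flink's CatalogManager.
public class TemporaryTableSketch {
    private final Map<String, Object> temporaryTables = new HashMap<>();

    // Registers a table under the given name, failing if the name is taken.
    public void createTemporaryTable(String name, Object table) {
        temporaryTables.compute(name, (key, existing) -> {
            if (existing != null) {
                // Stand-in for Flink's ValidationException.
                throw new IllegalStateException(
                        "Temporary table '" + key + "' already exists");
            }
            return table;
        });
    }

    public static void main(String[] args) {
        TemporaryTableSketch catalog = new TemporaryTableSketch();
        // First registration succeeds.
        catalog.createTemporaryTable("score_v3", "source stream");
        try {
            // Second registration under the same name is rejected.
            catalog.createTemporaryTable("score_v3", "source stream");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Because the lambda throws before returning a value, the existing mapping is left unchanged and the exception propagates to the caller, which matches the shape of the trace above.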
### SeaTunnel Version
Apache SeaTunnel (Incubating) 2.1.0
### SeaTunnel Config
```conf
env {
  # You can set flink configuration here
  execution.parallelism = 1
  #execution.checkpoint.interval = 10000
  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
}
source {
  KafkaTableStream {
    consumer.bootstrap.servers = "xxxxxxxx"
    consumer.group.id = "XXXXXXXXXXXXXXX"
    topics = XXXXXXXXXXXXXXXXXXXX
    format.type = csv
    #schema = "{\"min_position\": 5,\"has_more_items\": false,\"items_html\": \"Car\",\"new_latent_count\": 4}"
    schema = "[{\"field\":\"name\",\"type\":\"string\"},{\"field\":\"age\",\"type\":\"int\"}]"
    format.field-delimiter = ";"
    format.allow-comments = "true"
    format.ignore-parse-errors = "true"
    result_table_name = "score_v3"
  }
}
transform {
  sql {
    #source_table_name = "score_v3"
    sql = "select name,age from score_v3"
    result_table_name = "score_v4"
  }
}
sink {
  ConsoleSink{}
}
```
### Running Command
```shell
bin/start-seatunnel-flink.sh --config ./config/application.conf
```
### Error Exception
```log
Caused by: org.apache.flink.table.api.ValidationException: Temporary table '`default_catalog`.`default_database`.`score_v3_1935`' already exists
at org.apache.flink.table.catalog.CatalogManager.lambda$createTemporaryTable$11(CatalogManager.java:684) ~[flink-table_2.12-1.13.3.jar:1.13.3]
at java.util.HashMap.compute(HashMap.java:1197) ~[?:1.8.0_212]
at org.apache.flink.table.catalog.CatalogManager.createTemporaryTable(CatalogManager.java:678) ~[flink-table_2.12-1.13.3.jar:1.13.3]
at org.apache.flink.table.api.internal.TableEnvironmentImpl.createTemporaryView(TableEnvironmentImpl.java:502) ~[flink-table_2.12-1.13.3.jar:1.13.3]
at org.apache.flink.table.api.internal.TableEnvironmentImpl.createTemporaryView(TableEnvironmentImpl.java:488) ~[flink-table_2.12-1.13.3.jar:1.13.3]
at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.createTemporaryView(StreamTableEnvironmentImpl.java:264) ~[flink-table_2.12-1.13.3.jar:1.13.3]
at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.registerDataStream(StreamTableEnvironmentImpl.java:463) ~[flink-table_2.12-1.13.3.jar:1.13.3]
at org.apache.seatunnel.flink.stream.FlinkStreamExecution.registerResultTable(FlinkStreamExecution.java:98) ~[?:?]
at org.apache.seatunnel.flink.stream.FlinkStreamExecution.start(FlinkStreamExecution.java:57) ~[?:?]
at org.apache.seatunnel.Seatunnel.entryPoint(Seatunnel.java:107) ~[?:?]
at org.apache.seatunnel.Seatunnel.run(Seatunnel.java:65) ~[?:?]
at org.apache.seatunnel.SeatunnelFlink.main(SeatunnelFlink.java:29) ~[?:?]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_212]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_212]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_212]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_212]
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
```
### Flink or Spark Version
_No response_
### Java or Scala Version
_No response_
### Screenshots
_No response_
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [incubator-seatunnel] ruanwenjun commented on issue #1713: [Bug] [Flink Kafka source] Temporary table already exists
Posted by GitBox <gi...@apache.org>.
ruanwenjun commented on issue #1713:
URL: https://github.com/apache/incubator-seatunnel/issues/1713#issuecomment-1103172609
@whb-bigdata Hi, thanks for your feedback. This bug has been fixed by #1526, and the fix will be released in [2.1.1](https://github.com/apache/incubator-seatunnel/releases/tag/2.1.1).
[GitHub] [incubator-seatunnel] ruanwenjun closed issue #1713: [Bug] [Flink Kafka source] Temporary table already exists
Posted by GitBox <gi...@apache.org>.
ruanwenjun closed issue #1713: [Bug] [Flink Kafka source] Temporary table already exists
URL: https://github.com/apache/incubator-seatunnel/issues/1713