Posted to commits@seatunnel.apache.org by GitBox <gi...@apache.org> on 2022/04/19 11:53:18 UTC

[GitHub] [incubator-seatunnel] whb-bigdata opened a new issue, #1713: [Bug] [Module Name] Bug title

whb-bigdata opened a new issue, #1713:
URL: https://github.com/apache/incubator-seatunnel/issues/1713

   ### Search before asking
   
   - [X] I have searched the [issues](https://github.com/apache/incubator-seatunnel/issues?q=is%3Aissue+label%3A%22bug%22) and found no similar issues.
   
   
   ### What happened
   
   The Flink Kafka source throws an error:
   Exception StackTrace:org.apache.flink.table.api.ValidationException: Temporary table '`default_catalog`.`default_database`.`score_v3_1935`' already exists
           at org.apache.flink.table.catalog.CatalogManager.lambda$createTemporaryTable$11(CatalogManager.java:684)
           at java.util.HashMap.compute(HashMap.java:1197)
           at org.apache.flink.table.catalog.CatalogManager.createTemporaryTable(CatalogManager.java:678)
           at org.apache.flink.table.api.internal.TableEnvironmentImpl.createTemporaryView(TableEnvironmentImpl.java:502)
           at org.apache.flink.table.api.internal.TableEnvironmentImpl.createTemporaryView(TableEnvironmentImpl.java:488)
           at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.createTemporaryView(StreamTableEnvironmentImpl.java:264)
           at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.registerDataStream(StreamTableEnvironmentImpl.java:463)
           at org.apache.seatunnel.flink.stream.FlinkStreamExecution.registerResultTable(FlinkStreamExecution.java:98)
           at org.apache.seatunnel.flink.stream.FlinkStreamExecution.start(FlinkStreamExecution.java:57)
           at org.apache.seatunnel.Seatunnel.entryPoint(Seatunnel.java:107)
           at org.apache.seatunnel.Seatunnel.run(Seatunnel.java:65)
           at org.apache.seatunnel.SeatunnelFlink.main(SeatunnelFlink.java:29)
           at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
           at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
           at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
           at java.lang.reflect.Method.invoke(Method.java:498)
           at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
           at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
           at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
           at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
           at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
           at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
           at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
           at java.security.AccessController.doPrivileged(Native Method)
           at javax.security.auth.Subject.doAs(Subject.java:422)
           at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
           at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
           at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
   The error indicates that the temporary table already exists.
   
   ### SeaTunnel Version
   
   Apache SeaTunnel (Incubating) 2.1.0 
   
   ### SeaTunnel Config
   
   ```conf
   env {
     # You can set flink configuration here
     execution.parallelism = 1
     #execution.checkpoint.interval = 10000
     #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
   }
   
   source {
     KafkaTableStream {
       consumer.bootstrap.servers = "xxxxxxxx"
       consumer.group.id = "XXXXXXXXXXXXXXX"
       topics = XXXXXXXXXXXXXXXXXXXX
       format.type = csv
       #schema = "{\"min_position\": 5,\"has_more_items\": false,\"items_html\": \"Car\",\"new_latent_count\": 4}"
       schema = "[{\"field\":\"name\",\"type\":\"string\"},{\"field\":\"age\",\"type\":\"int\"}]"
       format.field-delimiter = ";"
       format.allow-comments = "true"
       format.ignore-parse-errors = "true"
       result_table_name = "score_v3"
     }
   }
   transform {
     sql {
       #source_table_name = "score_v3"
       sql = "select name,age from score_v3"
       result_table_name = "score_v4"
     }
   }
   sink {
     ConsoleSink{}
   }
   ```
   
   
   ### Running Command
   
   ```shell
   bin/start-seatunnel-flink.sh --config ./config/application.conf
   ```
   
   
   ### Error Exception
   
   ```log
   Caused by: org.apache.flink.table.api.ValidationException: Temporary table '`default_catalog`.`default_database`.`score_v3_1935`' already exists
           at org.apache.flink.table.catalog.CatalogManager.lambda$createTemporaryTable$11(CatalogManager.java:684) ~[flink-table_2.12-1.13.3.jar:1.13.3]
           at java.util.HashMap.compute(HashMap.java:1197) ~[?:1.8.0_212]
           at org.apache.flink.table.catalog.CatalogManager.createTemporaryTable(CatalogManager.java:678) ~[flink-table_2.12-1.13.3.jar:1.13.3]
           at org.apache.flink.table.api.internal.TableEnvironmentImpl.createTemporaryView(TableEnvironmentImpl.java:502) ~[flink-table_2.12-1.13.3.jar:1.13.3]
           at org.apache.flink.table.api.internal.TableEnvironmentImpl.createTemporaryView(TableEnvironmentImpl.java:488) ~[flink-table_2.12-1.13.3.jar:1.13.3]
           at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.createTemporaryView(StreamTableEnvironmentImpl.java:264) ~[flink-table_2.12-1.13.3.jar:1.13.3]
           at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.registerDataStream(StreamTableEnvironmentImpl.java:463) ~[flink-table_2.12-1.13.3.jar:1.13.3]
           at org.apache.seatunnel.flink.stream.FlinkStreamExecution.registerResultTable(FlinkStreamExecution.java:98) ~[?:?]
           at org.apache.seatunnel.flink.stream.FlinkStreamExecution.start(FlinkStreamExecution.java:57) ~[?:?]
           at org.apache.seatunnel.Seatunnel.entryPoint(Seatunnel.java:107) ~[?:?]
           at org.apache.seatunnel.Seatunnel.run(Seatunnel.java:65) ~[?:?]
           at org.apache.seatunnel.SeatunnelFlink.main(SeatunnelFlink.java:29) ~[?:?]
           at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_212]
           at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_212]
           at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_212]
           at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_212]
           at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
   ```
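
The trace above shows the exception being raised inside `HashMap.compute` when `createTemporaryView` is called for a name that is already registered. The following is a minimal plain-Java model of that failure mode, not Flink or SeaTunnel code: `TempTableRegistry` and `registerIfAbsent` are hypothetical names introduced only for illustration, and the guard shown is an assumed workaround, not necessarily how the project resolved the issue.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a temporary-table catalog; NOT Flink's CatalogManager. */
public class TempTableRegistry {
    private final Map<String, Object> tables = new HashMap<>();

    /** Rejects a second registration under the same name, like the error in the trace. */
    public void createTemporaryView(String name, Object view) {
        tables.compute(name, (key, existing) -> {
            if (existing != null) {
                throw new IllegalStateException("Temporary table '" + key + "' already exists");
            }
            return view;
        });
    }

    /** Assumed guard: registers only when the name is free; returns true if it registered. */
    public boolean registerIfAbsent(String name, Object view) {
        if (tables.containsKey(name)) {
            return false;
        }
        createTemporaryView(name, view);
        return true;
    }

    public static void main(String[] args) {
        TempTableRegistry registry = new TempTableRegistry();
        registry.createTemporaryView("score_v3", new Object());
        try {
            // Registering the same name again reproduces the "already exists" failure.
            registry.createTemporaryView("score_v3", new Object());
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
        // With the guard, the duplicate registration is skipped instead of failing.
        System.out.println(registry.registerIfAbsent("score_v3", new Object()));
    }
}
```

The model only illustrates why a repeated registration fails; it does not explain why the name `score_v3_1935` was registered twice in this job.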
   
   
   ### Flink or Spark Version
   
   _No response_
   
   ### Java or Scala Version
   
   _No response_
   
   ### Screenshots
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] ruanwenjun commented on issue #1713: [Bug] [Flink Kafka source] Temporary table already exists

Posted by GitBox <gi...@apache.org>.
ruanwenjun commented on issue #1713:
URL: https://github.com/apache/incubator-seatunnel/issues/1713#issuecomment-1103172609

   @whb-bigdata Hi, thanks for your feedback. This bug has been fixed by #1526, and the fix will be released in [2.1.1](https://github.com/apache/incubator-seatunnel/releases/tag/2.1.1).




[GitHub] [incubator-seatunnel] ruanwenjun closed issue #1713: [Bug] [Flink Kafka source] Temporary table already exists

Posted by GitBox <gi...@apache.org>.
ruanwenjun closed issue #1713: [Bug] [Flink Kafka source] Temporary table already exists
URL: https://github.com/apache/incubator-seatunnel/issues/1713

