Posted to commits@seatunnel.apache.org by GitBox <gi...@apache.org> on 2022/03/28 10:28:29 UTC

[GitHub] [incubator-seatunnel] realdengziqi opened a new pull request #1598: [Bug][seatunnel-connector-flink-KafkaSink] fix not being able to send data to Kafka.

realdengziqi opened a new pull request #1598:
URL: https://github.com/apache/incubator-seatunnel/pull/1598


   
   ## Purpose of this pull request
   
   Fix the Kafka sink not being able to send data to Kafka.
   The problem appears to be caused by an outdated API. After I changed the sink to use `executeInsert`, SeaTunnel worked fine.
   Here is my application config. You can use it to test the change.
   ```
   env {
     # You can set flink configuration here
     execution.parallelism = 1
     #execution.checkpoint.interval = 10000
     #execution.checkpoint.data-uri = "hdfs://hadoop102:9092/checkpoint"
   }
   
   
   source {
     KafkaTableStream {
       consumer.bootstrap.servers = "hadoop102:9092"
       consumer.group.id = "seatunnel-test"
       topics = test_csv
       result_table_name = test123
       format.type = csv
       schema = "[{\"field\":\"name\",\"type\":\"string\"},{\"field\":\"age\",\"type\":\"string\"}]"
       format.field-delimiter = ";"
       format.allow-comments = "true"
       format.ignore-parse-errors = "true"
     }
   }
   
   transform {
     sql {
       source_table_name = test123
       sql = "select name,age from test123"
     }
   }
   
   sink {
     KafkaTable {
       producer.bootstrap.servers = "hadoop102:9092"
       topics = test_sink
     }
   }
   
   ```
   
   
   ## Check list
   
   * [x] Code changes are covered with tests, or they do not need tests for this reason:
   * [ ] If any new Jar binary package is added in your PR, please add a License Notice according to the
     [New License Guide](https://github.com/apache/incubator-seatunnel/blob/dev/docs/en/developement/NewLicenseGuide.md)
   * [ ] If necessary, please update the documentation to describe the new feature. https://github.com/apache/incubator-seatunnel/tree/dev/docs
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-seatunnel] BenJFan commented on pull request #1598: [Bug][seatunnel-connector-flink-KafkaSink] fix not being able to send data to Kafka.

Posted by GitBox <gi...@apache.org>.
BenJFan commented on pull request #1598:
URL: https://github.com/apache/incubator-seatunnel/pull/1598#issuecomment-1080505065


   In #1593, @tobezhou33 said that this method creates two jobs in one process, which causes the data to be written to the sink twice. Please check this.





[GitHub] [incubator-seatunnel] realdengziqi commented on pull request #1598: [Bug][seatunnel-connector-flink-KafkaSink] fix not being able to send data to Kafka.

Posted by GitBox <gi...@apache.org>.
realdengziqi commented on pull request #1598:
URL: https://github.com/apache/incubator-seatunnel/pull/1598#issuecomment-1081361103


   @CalvinKirs @BenJFan @tobezhou33 I modified the code so that it passes checkstyle, and it compiles locally.





[GitHub] [incubator-seatunnel] realdengziqi commented on pull request #1598: [Bug][seatunnel-connector-flink-KafkaSink] fix not being able to send data to Kafka.

Posted by GitBox <gi...@apache.org>.
realdengziqi commented on pull request #1598:
URL: https://github.com/apache/incubator-seatunnel/pull/1598#issuecomment-1081090818


   @CalvinKirs @BenJFan @tobezhou33 Thanks for your help. After trying it, I confirmed that `executeInsert` does cause two Flink jobs to be submitted. So I followed tobezhou33's advice and used the DataStream API with `FlinkKafkaProducer` and `JsonRowSerializationSchema` to solve the problem. I tested it on my own cluster and it works fine. The new commit is [9ea79fd](https://github.com/apache/incubator-seatunnel/commit/9ea79fdb027d6584369cbebf8cf92f470a7d995f).





[GitHub] [incubator-seatunnel] tobezhou33 commented on pull request #1598: [Bug][seatunnel-connector-flink-KafkaSink] fix not being able to send data to Kafka.

Posted by GitBox <gi...@apache.org>.
tobezhou33 commented on pull request #1598:
URL: https://github.com/apache/incubator-seatunnel/pull/1598#issuecomment-1080513751


   I have used `table.executeInsert` for this, but I found that it submits two jobs: `executeInsert` calls `execEnv.executeAsync()` to submit a job named 'insert-into_', and with the old planner it sends the same message to the sink twice.
   ![image](https://user-images.githubusercontent.com/32997128/160385393-4877fbb3-e5f2-40c1-bdcd-599cf18f5bb4.png)
   
   
   I think `tableEnv.execute` and `StreamExecutionEnvironment().execute()` can only be executed once.
   
   `KafkaTable.java` uses jsonFormatDescriptor. Maybe we can use the DataStream API with `FlinkKafkaProducer` and `JsonRowSerializationSchema` instead.
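   
   To make the double submission concrete, here is a toy Java model of the behaviour described in this comment. It does not use Flink at all: `ToyEnvironment`, `register`, and `DoubleSubmitDemo` are hypothetical names invented for illustration. The model simply assumes, as reported in this thread, that `executeInsert` submits a job on its own and that a later `execute()` call submits the same pipeline again.
   
   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.List;
   
   // Toy model only: these classes are hypothetical and are NOT Flink APIs.
   class ToyEnvironment {
       // Records that the registered pipeline emits each time it runs.
       private final List<String> pipeline = new ArrayList<>();
       // Stands in for the Kafka topic that the sink writes to.
       private final List<String> sink;
   
       ToyEnvironment(List<String> sink) {
           this.sink = sink;
       }
   
       void register(List<String> records) {
           pipeline.addAll(records);
       }
   
       // Assumption from the thread: executeInsert submits a job immediately.
       void executeInsert() {
           submit();
       }
   
       // Models the single execute() call made at the end of the program.
       void execute() {
           submit();
       }
   
       // Every submission runs the whole registered pipeline into the sink.
       private void submit() {
           sink.addAll(pipeline);
       }
   }
   
   class DoubleSubmitDemo {
       // executeInsert followed by execute: the pipeline is submitted twice.
       static List<String> runWithExecuteInsert() {
           List<String> sink = new ArrayList<>();
           ToyEnvironment env = new ToyEnvironment(sink);
           env.register(Arrays.asList("a", "b"));
           env.executeInsert();
           env.execute();
           return sink;
       }
   
       // Sink attached to the stream, then one execute: a single submission.
       static List<String> runWithDataStreamSink() {
           List<String> sink = new ArrayList<>();
           ToyEnvironment env = new ToyEnvironment(sink);
           env.register(Arrays.asList("a", "b"));
           env.execute();
           return sink;
       }
   
       public static void main(String[] args) {
           System.out.println(runWithExecuteInsert());  // prints [a, b, a, b]
           System.out.println(runWithDataStreamSink()); // prints [a, b]
       }
   }
   ```
   
   In this simplified picture, attaching the sink to the stream and leaving the one `execute()` call in place avoids the duplicate write, which is the motivation for the DataStream-based suggestion.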





[GitHub] [incubator-seatunnel] realdengziqi commented on pull request #1598: [Bug][seatunnel-connector-flink-KafkaSink] fix not being able to send data to Kafka.

Posted by GitBox <gi...@apache.org>.
realdengziqi commented on pull request #1598:
URL: https://github.com/apache/incubator-seatunnel/pull/1598#issuecomment-1081365304


   @BenJFan Thanks. I have resolved the conflict and force-pushed again.





[GitHub] [incubator-seatunnel] CalvinKirs merged pull request #1598: [Bug][seatunnel-connector-flink-KafkaSink] fix not being able to send data to Kafka.

Posted by GitBox <gi...@apache.org>.
CalvinKirs merged pull request #1598:
URL: https://github.com/apache/incubator-seatunnel/pull/1598


   

