Posted to commits@inlong.apache.org by "Yizhou-Yang (via GitHub)" <gi...@apache.org> on 2023/04/26 03:49:13 UTC

[GitHub] [inlong] Yizhou-Yang opened a new pull request, #7924: Kafka partition

Yizhou-Yang opened a new pull request, #7924:
URL: https://github.com/apache/inlong/pull/7924

   ### Prepare a Pull Request
   - Fixes #7900 
   
   ### Motivation
   See issue.
   
   ### Modifications
   Added PrimaryKeyPartitioner, and made modifications to support upsert-kafka
   
   
   ### Verifying this change
   1. Executed the SQL from the issue again.
   2. Created a kafka-mysql task to consume all the messages back into MySQL.
   The MySQL table fluctuated between 1000 and 2000 rows and in the end was reduced to 0 rows, so I assume that the issue has been solved. The format used was json; for canal-json/avro/csv, I only tested that the primary key partitioning takes effect and that there are no errors.
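
The check above depends on every change for one primary key landing in the same Kafka partition, so that a consumer sees its insert, update, and delete in order. A minimal sketch of that property follows, assuming a plain hash-modulo rule; the class and method names are hypothetical and are not taken from the PR.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical illustration; this is not the PR's PrimaryKeyPartitioner.
class KeyedPartitionSketch {

    // Map a primary-key value to a partition index in [0, numPartitions).
    static int partitionFor(String primaryKey, int numPartitions) {
        byte[] keyBytes = primaryKey.getBytes(StandardCharsets.UTF_8);
        // Math.floorMod keeps the result non-negative even for a negative hash.
        return Math.floorMod(Arrays.hashCode(keyBytes), numPartitions);
    }

    public static void main(String[] args) {
        int partitions = 3;
        // An insert, an update and a delete for id=42 all carry the same key.
        int insert = partitionFor("42", partitions);
        int update = partitionFor("42", partitions);
        int delete = partitionFor("42", partitions);
        // Same key, same partition, so per-key ordering is preserved.
        System.out.println(insert == update && update == delete); // prints true
    }
}
```

If changes for one key were instead spread across partitions, a delete could be consumed before the matching insert, which would leave stale rows in the target table.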
   
     
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [inlong] Yizhou-Yang closed pull request #7924: [INLONG-7900][Sort] Support partition by primary key when upsert single table of Kafka

Posted by "Yizhou-Yang (via GitHub)" <gi...@apache.org>.
Yizhou-Yang closed pull request #7924: [INLONG-7900][Sort] Support partition by primary key when upsert single table of Kafka
URL: https://github.com/apache/inlong/pull/7924


[GitHub] [inlong] Yizhou-Yang commented on a diff in pull request #7924: [INLONG-7900][Sort] Support partition by primary key when upsert single table of Kafka

Posted by "Yizhou-Yang (via GitHub)" <gi...@apache.org>.
Yizhou-Yang commented on code in PR #7924:
URL: https://github.com/apache/inlong/pull/7924#discussion_r1188185705


##########
inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/partitioner/PrimaryKeyPartitioner.java:
##########


Review Comment:
   I adjusted the implementation to make it schema-reliant, so that it now extracts the key directly from the row data instead of from the serialized key. Please refresh the code to see the new implementation. I think it is now quite distinct from the raw hash partitioner and might take some effort to merge.
   
   The single-table and multiple-sink scenarios are quite different. When I tried to merge the two scenarios (as I did in the Doris connector), the resulting code proved rather hard to debug and modify, since every change needs to be compatible with both scenarios. I think that strictly separating the two cases within connectors might lead to fewer bugs, especially in the production environment.
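
A schema-reliant extraction of the kind described above could look roughly like the sketch below. It is only an illustration under stated assumptions: an `Object[]` stands in for the row, the key column positions are assumed to be known from the table schema, and the class name `RowKeyPartitionSketch` is hypothetical rather than the PR's code.

```java
import java.util.Arrays;

// Hypothetical sketch: hash the primary-key columns read straight from a row.
class RowKeyPartitionSketch {

    // Positions of the primary-key columns within the row (assumed to come from the schema).
    private final int[] keyPositions;

    RowKeyPartitionSketch(int[] keyPositions) {
        this.keyPositions = keyPositions.clone();
    }

    // Pick the key columns out of the row and hash only those values.
    int partition(Object[] row, int numPartitions) {
        Object[] keyValues = new Object[keyPositions.length];
        for (int i = 0; i < keyPositions.length; i++) {
            keyValues[i] = row[keyPositions[i]];
        }
        // Arrays.hashCode tolerates null elements; floorMod keeps the index non-negative.
        return Math.floorMod(Arrays.hashCode(keyValues), numPartitions);
    }

    public static void main(String[] args) {
        RowKeyPartitionSketch sketch = new RowKeyPartitionSketch(new int[] {0});
        Object[] before = {1001L, "alice", 10};
        Object[] after = {1001L, "alice", 25}; // same key, changed non-key column
        System.out.println(sketch.partition(before, 4) == sketch.partition(after, 4)); // prints true
    }
}
```

Because only the key columns feed the hash, the result does not depend on which serialization format (json, csv, avro) is later applied to the record.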



[GitHub] [inlong] duowan1520 commented on a diff in pull request #7924: [INLONG-7900][Sort] Upsert Kafka Sink Single Table Partition By Primary key

Posted by "duowan1520 (via GitHub)" <gi...@apache.org>.
duowan1520 commented on code in PR #7924:
URL: https://github.com/apache/inlong/pull/7924#discussion_r1180942612


##########
inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/partitioner/PrimaryKeyPartitioner.java:
##########


Review Comment:
   Hi, I noticed that the RawDataHashPartitioner class already contains logic to get partition keys based on the raw data's primary key and pattern. We have added a new partitioner called PrimaryKeyPartitioner, and I was wondering what the relationship is between PrimaryKeyPartitioner and RawDataHashPartitioner?



[GitHub] [inlong] Yizhou-Yang commented on a diff in pull request #7924: [INLONG-7900][Sort] Support partition by primary key when upsert single table of Kafka

Posted by "Yizhou-Yang (via GitHub)" <gi...@apache.org>.
Yizhou-Yang commented on code in PR #7924:
URL: https://github.com/apache/inlong/pull/7924#discussion_r1185687301


##########
inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/partitioner/PrimaryKeyPartitioner.java:
##########


Review Comment:
   Hi! RawDataHashPartitioner is based on canal/debezium data, which is parsed using the corresponding JsonDynamicFormat. In contrast, PrimaryKeyPartitioner is used primarily to support the csv/avro/json formats (while keeping compatibility with canal/debezium, just in case), which have no JsonDynamicFormat and do not need to be parsed. If we used RawDataHashPartitioner, there would be NullPointerExceptions from the missing JsonDynamicFormat.
   
   Another point, design-wise, is that raw data comes mostly from multiple-sink scenarios, so instead of extending that partitioner, I want to make a separate partitioner for the single-table case, so that it is more extensible and easier to maintain in the future.



[GitHub] [inlong] Yizhou-Yang commented on a diff in pull request #7924: [INLONG-7900][Sort] Support partition by primary key when upsert single table of Kafka

Posted by "Yizhou-Yang (via GitHub)" <gi...@apache.org>.
Yizhou-Yang commented on code in PR #7924:
URL: https://github.com/apache/inlong/pull/7924#discussion_r1185687301


##########
inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/partitioner/PrimaryKeyPartitioner.java:
##########


Review Comment:
   Hi! Purpose-wise, the two aim to achieve the same thing. However, RawDataHashPartitioner is based on canal/debezium data, which is parsed using the corresponding JsonDynamicFormat. In contrast, PrimaryKeyPartitioner is used primarily to support the csv/avro/json formats (while keeping compatibility with canal/debezium, just in case), which have no JsonDynamicFormat and do not need to be parsed. If we used RawDataHashPartitioner, there would be NullPointerExceptions from the missing JsonDynamicFormat.



[GitHub] [inlong] Yizhou-Yang commented on a diff in pull request #7924: [INLONG-7900][Sort] Support partition by primary key when upsert single table of Kafka

Posted by "Yizhou-Yang (via GitHub)" <gi...@apache.org>.
Yizhou-Yang commented on code in PR #7924:
URL: https://github.com/apache/inlong/pull/7924#discussion_r1185687301


##########
inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/partitioner/PrimaryKeyPartitioner.java:
##########


Review Comment:
   Hi! RawDataHashPartitioner is based on canal/debezium data, which is parsed using the corresponding JsonDynamicFormat. In contrast, PrimaryKeyPartitioner is used primarily in cases that do not have a JsonDynamicFormat. If we used RawDataHashPartitioner, there would be NullPointerExceptions and it would not work.
   
   Another point, design-wise, is that raw data comes mostly from multiple-sink scenarios, so instead of extending that partitioner, I want to make a separate partitioner for the single-table case, so that it is more extensible and easier to maintain in the future.



[GitHub] [inlong] Yizhou-Yang commented on a diff in pull request #7924: [INLONG-7900][Sort] Support partition by primary key when upsert single table of Kafka

Posted by "Yizhou-Yang (via GitHub)" <gi...@apache.org>.
Yizhou-Yang commented on code in PR #7924:
URL: https://github.com/apache/inlong/pull/7924#discussion_r1188185705


##########
inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/partitioner/PrimaryKeyPartitioner.java:
##########


Review Comment:
   I adjusted the implementation to make it schema-reliant, so that it now extracts the key directly from the row data instead of from the serialized key. Please refresh the code to see the new implementation. I think it is now quite distinct from the raw hash partitioner and might take some effort to merge.
   
   The two scenarios are quite different. When I tried to merge them (as I did in the Doris connector), the resulting code proved rather hard to debug and modify, since every change needs to be compatible with both scenarios. I think that strictly separating the two cases within connectors might lead to fewer bugs, especially in the production environment.



[GitHub] [inlong] Yizhou-Yang commented on a diff in pull request #7924: [INLONG-7900][Sort] Support partition by primary key when upsert single table of Kafka

Posted by "Yizhou-Yang (via GitHub)" <gi...@apache.org>.
Yizhou-Yang commented on code in PR #7924:
URL: https://github.com/apache/inlong/pull/7924#discussion_r1185687301


##########
inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/partitioner/PrimaryKeyPartitioner.java:
##########


Review Comment:
   Hi! RawDataHashPartitioner is based on RawData, which is serialized canal/debezium data that is deserialized using the corresponding JsonDynamicFormat. However, PrimaryKeyPartitioner needs to support the csv/avro/json formats in addition to canal/debezium; these have no JsonDynamicFormat and do not need to be parsed. If we used RawDataHashPartitioner, there would be NullPointerExceptions from the missing JsonDynamicFormat.



[GitHub] [inlong] Yizhou-Yang commented on a diff in pull request #7924: [INLONG-7900][Sort] Support partition by primary key when upsert single table of Kafka

Posted by "Yizhou-Yang (via GitHub)" <gi...@apache.org>.
Yizhou-Yang commented on code in PR #7924:
URL: https://github.com/apache/inlong/pull/7924#discussion_r1185687301


##########
inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/partitioner/PrimaryKeyPartitioner.java:
##########


Review Comment:
   Hi! RawDataHashPartitioner is based on canal/debezium data, which is parsed using the corresponding JsonDynamicFormat. However, PrimaryKeyPartitioner is used primarily to support the csv/avro/json formats (while keeping compatibility with canal/debezium, just in case), which have no JsonDynamicFormat and do not need to be parsed. If we used RawDataHashPartitioner, there would be NullPointerExceptions from the missing JsonDynamicFormat.



[GitHub] [inlong] Yizhou-Yang commented on a diff in pull request #7924: [INLONG-7900][Sort] Support partition by primary key when upsert single table of Kafka

Posted by "Yizhou-Yang (via GitHub)" <gi...@apache.org>.
Yizhou-Yang commented on code in PR #7924:
URL: https://github.com/apache/inlong/pull/7924#discussion_r1185687301


##########
inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/partitioner/PrimaryKeyPartitioner.java:
##########


Review Comment:
   Hi! RawDataHashPartitioner is based on data serialized in the canal/debezium formats, which is deserialized using the corresponding JsonDynamicFormat. However, PrimaryKeyPartitioner needs to support the csv/avro/json formats in addition to canal/debezium; these have no JsonDynamicFormat and do not need to be parsed. If we used RawDataHashPartitioner, there would be NullPointerExceptions from the missing JsonDynamicFormat.



[GitHub] [inlong] Yizhou-Yang commented on a diff in pull request #7924: [INLONG-7900][Sort] Support partition by primary key when upsert single table of Kafka

Posted by "Yizhou-Yang (via GitHub)" <gi...@apache.org>.
Yizhou-Yang commented on code in PR #7924:
URL: https://github.com/apache/inlong/pull/7924#discussion_r1185687301


##########
inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/partitioner/PrimaryKeyPartitioner.java:
##########


Review Comment:
   Hi! RawDataHashPartitioner is based on data serialized in the canal/debezium formats, which is deserialized using the corresponding JsonDynamicFormat. However, PrimaryKeyPartitioner is used primarily to support the csv/avro/json formats (while keeping compatibility with canal/debezium, just in case), which have no JsonDynamicFormat and do not need to be parsed. If we used RawDataHashPartitioner, there would be NullPointerExceptions from the missing JsonDynamicFormat.



[GitHub] [inlong] Yizhou-Yang commented on a diff in pull request #7924: [INLONG-7900][Sort] Support partition by primary key when upsert single table of Kafka

Posted by "Yizhou-Yang (via GitHub)" <gi...@apache.org>.
Yizhou-Yang commented on code in PR #7924:
URL: https://github.com/apache/inlong/pull/7924#discussion_r1185687301


##########
inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/partitioner/PrimaryKeyPartitioner.java:
##########


Review Comment:
   Hi! RawDataHashPartitioner is based on data serialized in the canal/debezium formats, which is deserialized using the corresponding JsonDynamicFormat. However, PrimaryKeyPartitioner is used primarily to support the csv/avro/json formats in addition to canal/debezium; these have no JsonDynamicFormat and do not need to be parsed. If we used RawDataHashPartitioner, there would be NullPointerExceptions from the missing JsonDynamicFormat.



[GitHub] [inlong] duowan1520 commented on a diff in pull request #7924: [INLONG-7900][Sort] Support partition by primary key when upsert single table of Kafka

Posted by "duowan1520 (via GitHub)" <gi...@apache.org>.
duowan1520 commented on code in PR #7924:
URL: https://github.com/apache/inlong/pull/7924#discussion_r1188053827


##########
inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/partitioner/PrimaryKeyPartitioner.java:
##########


Review Comment:
   Should we consider elevating the logic for extracting primary keys to a higher level and then merging the PrimaryKeyPartitioner and RawDataHashPartitioner into a hash-based Partitioner? This would help avoid introducing different partitioners when choosing between synchronizing a single table or multiple tables.
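
One way to picture the merge suggested above is a single hash-based partitioner that receives the key-extraction step as a parameter. The sketch below is only an assumption about what such a design might look like: `UnifiedHashPartitionerSketch`, the use of `java.util.function.Function` as the extractor, and the toy record shapes are all hypothetical, and it does not reflect the actual RawDataHashPartitioner or PrimaryKeyPartitioner code.

```java
import java.util.Objects;
import java.util.function.Function;

// Hypothetical sketch: one hash partitioner, with key extraction supplied from outside.
class UnifiedHashPartitionerSketch<T> {

    // Turns a record into the value that should decide its partition.
    private final Function<T, Object> keyExtractor;

    UnifiedHashPartitionerSketch(Function<T, Object> keyExtractor) {
        this.keyExtractor = Objects.requireNonNull(keyExtractor, "keyExtractor");
    }

    int partition(T record, int numPartitions) {
        Object key = keyExtractor.apply(record);
        // Objects.hashCode returns 0 for a null key; floorMod keeps the index non-negative.
        return Math.floorMod(Objects.hashCode(key), numPartitions);
    }

    public static void main(String[] args) {
        // Single-table case: the key is column 0 of a structured row.
        UnifiedHashPartitionerSketch<Object[]> singleTable =
                new UnifiedHashPartitionerSketch<>(row -> row[0]);
        // Multi-table case: the key is taken from a raw "key|payload" string (toy format).
        UnifiedHashPartitionerSketch<String> multiTable =
                new UnifiedHashPartitionerSketch<>(raw -> raw.substring(0, raw.indexOf('|')));

        System.out.println(singleTable.partition(new Object[] {7L, "x"}, 4));
        System.out.println(multiTable.partition("orders.7|{...}", 4));
    }
}
```

The trade-off raised in this thread still applies: a shared partitioner keeps the option surface small, while every change to it has to remain compatible with both the single-table and the multiple-sink paths.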



[GitHub] [inlong] Yizhou-Yang commented on a diff in pull request #7924: [INLONG-7900][Sort] Support partition by primary key when upsert single table of Kafka

Posted by "Yizhou-Yang (via GitHub)" <gi...@apache.org>.
Yizhou-Yang commented on code in PR #7924:
URL: https://github.com/apache/inlong/pull/7924#discussion_r1186972911


##########
inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/partitioner/PrimaryKeyPartitioner.java:
##########


Review Comment:
   Updating this PR: the user should designate the "partition key" using an additional property, and this new entry will be parsed from the original data and used for hashing.
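
As a rough illustration of the property-driven approach described above, the sketch below resolves a user-supplied list of key field names against the table's field names. The comma-separated value format, the class name `PartitionKeyOptionSketch`, and the method name are assumptions made for the example; the PR's actual option name and parsing are not shown in this thread.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: turn a "partition key" property value into column positions.
class PartitionKeyOptionSketch {

    // optionValue is assumed to be a comma-separated list of field names, e.g. "id, region".
    static int[] resolveKeyPositions(String optionValue, List<String> fieldNames) {
        List<Integer> positions = new ArrayList<>();
        for (String name : optionValue.split(",")) {
            String trimmed = name.trim();
            if (trimmed.isEmpty()) {
                continue; // ignore stray commas and blanks
            }
            int index = fieldNames.indexOf(trimmed);
            if (index < 0) {
                throw new IllegalArgumentException("Unknown partition key field: " + trimmed);
            }
            positions.add(index);
        }
        if (positions.isEmpty()) {
            throw new IllegalArgumentException("No partition key fields configured");
        }
        return positions.stream().mapToInt(Integer::intValue).toArray();
    }

    public static void main(String[] args) {
        List<String> schema = Arrays.asList("id", "name", "region");
        // Resolves "id" to position 0 and "region" to position 2.
        System.out.println(Arrays.toString(resolveKeyPositions("id, region", schema))); // prints [0, 2]
    }
}
```

Failing fast on an unknown or empty field list surfaces a misconfigured property when the job starts rather than as unexpected partitioning at runtime.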



[GitHub] [inlong] Yizhou-Yang commented on a diff in pull request #7924: [INLONG-7900][Sort] Support partition by primary key when upsert single table of Kafka

Posted by "Yizhou-Yang (via GitHub)" <gi...@apache.org>.
Yizhou-Yang commented on code in PR #7924:
URL: https://github.com/apache/inlong/pull/7924#discussion_r1185687301


##########
inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/partitioner/PrimaryKeyPartitioner.java:
##########


Review Comment:
   Hi! RawDataHashPartitioner is based on data serialized in the canal/debezium formats, which is deserialized using the corresponding JsonDynamicFormat. However, PrimaryKeyPartitioner is used primarily to support the csv/avro/json formats (while keeping compatibility with canal/debezium), which have no JsonDynamicFormat and do not need to be parsed. If we used RawDataHashPartitioner, there would be NullPointerExceptions from the missing JsonDynamicFormat.


