You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/07/21 07:54:12 UTC

[GitHub] [hudi] haripriyarhp opened a new issue, #6166: [SUPPORT] Missing records when using Kafka Hudi sink to write to S3.

haripriyarhp opened a new issue, #6166:
URL: https://github.com/apache/hudi/issues/6166

   **_Tips before filing an issue_**
   
   - Have you gone through our [FAQs](https://hudi.apache.org/learn/faq/)?
   
   - Join the mailing list to engage in conversations and get faster support at dev-subscribe@hudi.apache.org.
   
   - If you have triaged this as a bug, then file an [issue](https://issues.apache.org/jira/projects/HUDI/issues) directly.
   
   **Describe the problem you faced**
   
   I am using the Kafka Hudi sink to write to S3. I am having mismatch in the number messages present in a topic and the number of records showing up in Athena for both MoR and CoW. For MoR, even after running the compaction there are some missing records. 
   
   **To Reproduce**
   
   Steps to reproduce the behavior:
   
   1. Initially, I sent 100 messages to a topic. It refelected in Athena after compaction. 
   2. Later sent 100 more new messages + some updates + some duplicates of previous 100. Record count was not correct. 
   3. And later sent like 1000 messages and still record count was not correct after compaction.
   4. The config file properties are 
   {
       "name": "hudi-sink",
       "config": {
   		"bootstrap.servers": "localhost:9092",
   		"connector.class": "org.apache.hudi.connect.HoodieSinkConnector",
   		"tasks.max": "4",
   		"control.topic.name": "hudi-control-topic-mor",
   		"topics": "sensor",
   		"hoodie.table.name": "sensor-mor",
   		"hoodie.table.type": "MERGE_ON_READ",
   		"key.converter": "org.apache.kafka.connect.storage.StringConverter",
   		"value.converter": "org.apache.kafka.connect.storage.StringConverter",
   		"hoodie.base.path": "s3a://path/sensor_mor",
   		"hoodie.datasource.write.recordkey.field":"oid,styp,sname,ts",
   		"hoodie.datasource.write.partitionpath.field":"gid,datatype,origin,oid",
   		"hoodie.datasource.write.keygenerator.type":"COMPLEX",
   		"hoodie.datasource.write.hive_style_partitioning": "true",
   		"hoodie.compact.inline.max.delta.commits":2,
   		"fs.s3a.fast.upload": "true",
   		"fs.s3a.access.key": "myaccesskey",
   		"fs.s3a.secret.key": "secretkey",
   		"hoodie.schemaprovider.class": "org.apache.hudi.schema.SchemaRegistryProvider",
   		"hoodie.deltastreamer.schemaprovider.registry.url": "http://localhost:8081/subjects/sensor/versions/latest",
   		"hoodie.kafka.commit.interval.secs": 60
         }
   }
   
   **Expected behavior**
   
   Irrespective of the messages sent to topic (could be new messages or duplicates or updates), as described, the connector should append them to tables. 
   
   **Environment Description**
   
   * Hudi version : 0.11.0
   
   * Spark version : 3.1.3
   
   * Hive version :
   
   * Hadoop version : 3.2
     
   * Storage (HDFS/S3/GCS..) : S3
     
   * Running on Docker? (yes/no) : No
    
   
   **Additional context**
   
   Add any other context about the problem here.
   
   **Stacktrace**
   
   ```Add the stacktrace of the error.```
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] rmahindra123 commented on issue #6166: [SUPPORT] Missing records when using Kafka Hudi sink to write to S3.

Posted by GitBox <gi...@apache.org>.
rmahindra123 commented on issue #6166:
URL: https://github.com/apache/hudi/issues/6166#issuecomment-1195091228

   @haripriyarhp Can you verify that all .log files were compacted to parquet files after you ran compaction? Want to make sure records are not in the log files when querying 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] haripriyarhp commented on issue #6166: [SUPPORT] Missing records when using Kafka Hudi sink to write to S3.

Posted by GitBox <gi...@apache.org>.
haripriyarhp commented on issue #6166:
URL: https://github.com/apache/hudi/issues/6166#issuecomment-1199226564

   @rmahindra123 :  Unfortunately, I am not able to share the .hoodie folder. Just to add, yesterday I tried it out again. I sent messages to a topic in batches. Below are the steps I followed
   1. Sent a batch of 100 records to kafka. Ran compaction. No.of messages in kafka and no.of records in Athena, matched.
   2. Sent a batch of another 100 records to Kafka -> Compaction -> no.of msgs in kafka = no.of records in Athena.
   3. Sent a batch of another 100 records (here there were some duplicates ) -> Compaction -> no.of.msgs in Kafka = no. of records in Athena.
   4. Sent another batch 98 records (some were duplicates) -> compaction -> no.of messages != no.of records in Athena.  There were no more files to be compacted. About 24 records were missing.
   5.  Sent another 100 records. -> compaction -> record count did not match. there was same 24 missing. 
   
   More or less, I followed the above steps several times before I raised the issue here. Each time, after few runs the record count does not match even after running compaction.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] haripriyarhp commented on issue #6166: [SUPPORT] Missing records when using Kafka Hudi sink to write to S3.

Posted by GitBox <gi...@apache.org>.
haripriyarhp commented on issue #6166:
URL: https://github.com/apache/hudi/issues/6166#issuecomment-1191910129

   Okay, let me clarify. 
   First, i sent 100 messages. It was fine. Athena also showed 100 records
   Next I sent 100 new messages + 25 updates + 25 duplicates of previous 100 messages. In total there are 250 messages in Kafka but Athena showed only 247. Irrespective of inserts, duplicates or updates, I am assuming that the connector should append the messages. 
   Later on, I continued sending several rounds of messages and found that the count did not match. Few records were missing (somewhere between 20 -50) for around 500-600 messages sent to Kafka. I made this test several times And each time, there were some missing records. 
   I tested with CoW too, it also had missing records. The no.of records in Athena was always less than no.of messages in Kafka 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] rmahindra123 commented on issue #6166: [SUPPORT] Missing records when using Kafka Hudi sink to write to S3.

Posted by GitBox <gi...@apache.org>.
rmahindra123 commented on issue #6166:
URL: https://github.com/apache/hudi/issues/6166#issuecomment-1195094414

   Also, could you run a parquet query to see if all records are in hudi? It would be helpful if you could share your .hoodie folder here. Thanks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] rmahindra123 commented on issue #6166: [SUPPORT] Missing records when using Kafka Hudi sink to write to S3.

Posted by GitBox <gi...@apache.org>.
rmahindra123 commented on issue #6166:
URL: https://github.com/apache/hudi/issues/6166#issuecomment-1191855029

   @haripriyarhp the current kafka connector only supports inserts and not updates. Could you clarify the below comment:
   
   >> Later sent 100 more new messages + some updates + some duplicates of previous 100. Record count was not correct.
   
   In this case, do you see more records in the Hudi table than the kafka records? Or vice versa.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] haripriyarhp commented on issue #6166: [SUPPORT] Missing records when using Kafka Hudi sink to write to S3.

Posted by GitBox <gi...@apache.org>.
haripriyarhp commented on issue #6166:
URL: https://github.com/apache/hudi/issues/6166#issuecomment-1197932705

   @rmahindra123 : Yes, all the files were compacted. There are no more open compaction.requested files. All the compaction was completed and only then I queried the Athena table. 
   
   > Also, could you run a parquet query to see if all records are in hudi? It would be helpful if you could share your .hoodie folder here. Thanks
   
   Is this some special query? Because I query the Athena table. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] ad1happy2go commented on issue #6166: [SUPPORT] Missing records when using Kafka Hudi sink to write to S3.

Posted by "ad1happy2go (via GitHub)" <gi...@apache.org>.
ad1happy2go commented on issue #6166:
URL: https://github.com/apache/hudi/issues/6166#issuecomment-1534596219

   Able to reproduce this. Actually MOR table supports upserts. But we are seeing dupes.
   
   ```
   # Publish first 100 records
   bash setupKafka.sh -n 100 -k test1
       Before Compaction - 100
       After Compaction - 100
   
   # Publish next  100 records all with new keys
   bash setupKafka.sh -n 100 -t -o 100 -k test1
       Before Compaction - 200
       After Compaction - 200
   
   # Publish next  100 records 50 upsets and 50 new keys
   bash setupKafka.sh -n 100 -t -o 150 -k test1
       Before Compaction - 285 [ Distinct count of ("_hoodie_partition_path","volume") = 250)
       After Compaction - 285
       So, dupes coming after this step.    
   
   
   bash setupKafka.sh -n 100 -t -o 180 -k test1
       Before Compaction - 369 Distinct count = 280
       After Compaction - 369 Distinct count = 280
   ```
   connect properties used 
   ```
   {
       "name": "test1",
       "config": {
                   "bootstrap.servers": "kafkabroker:9092",
                   "connector.class": "org.apache.hudi.connect.HoodieSinkConnector",
                   "tasks.max": "4",
                   "key.converter": "org.apache.kafka.connect.storage.StringConverter",
                   "value.converter": "org.apache.kafka.connect.storage.StringConverter",
                   "value.converter.schemas.enable": "false",
                   "topics": "test1",
                   "hoodie.table.name": "test1",
                   "hoodie.table.type": "MERGE_ON_READ",
                   "hoodie.base.path": "file:///tmp/hoodie/test1",
                   "hoodie.datasource.write.recordkey.field": "volume",
                   "hoodie.datasource.write.partitionpath.field": "date",
                   "hoodie.schemaprovider.class": "org.apache.hudi.schema.SchemaRegistryProvider",
                   "hoodie.deltastreamer.schemaprovider.registry.url": "http://localhost:8082/subjects/test1/versions/latest",
                   "hoodie.kafka.commit.interval.secs": 60,
                   "hoodie.compact.schedule.inline":"true",
                   "hoodie.compact.inline.max.delta.commits":1
         }
   }
   ```
   
   Also, I noticed the data loss with async clustering enabled.
    
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org