Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/07/04 04:43:57 UTC

[GitHub] [hudi] ccchenhe opened a new issue, #6034: [SUPPORT] Flink Bucket Index Can't Update Records Using 0.11.1

ccchenhe opened a new issue, #6034:
URL: https://github.com/apache/hudi/issues/6034

   **Describe the problem you faced**
   
   While testing the Hudi bucket index, I submitted two Flink applications that consume the same topic with different group.ids.
   
   Application A uses 0.10.0 with index.type (not hoodie.index.type) = 'BLOOM'.
   Application B uses 0.11.1 with index.type (not hoodie.index.type) = 'BUCKET'.
   
   For example, given these two binlog events:
   |id|DML type|detail|
   |--|--|--|
   |1|insert|insert into xx (id, update_time, status) values (1, 1656860400, 0)|
   |1|update|update xx set update_time = 1656860460, status = 1 where id = 1 |
   
   We expect the final state to be status = 1 for id = 1:
   |application|hudi version|id|record detail|correct?|
   |--|--|--|--|--|
   |application A|0.10.0|id = 1|status=1, update_time = 1656860460|yes|
   |application B|0.11.1|id = 1|status=0, update_time = 1656860400|no|
   
   **To Reproduce**
   
   Steps to reproduce the behavior:
   
   1. Create the Kafka source table
   ```sql
   create table kafka_source_table(
   `database` string
   ,`table` string
   ,`type` string
   ,`ts` bigint
   ,`maxwell_ts` bigint
   ,`session_id` string
   ,`sequence_id` int
   ,`xid` bigint
   ,`xoffset` int
   ,`primary_key` Array<Int>
   ,`primary_key_columns` Array<String>
   ,`data` Map<String, String>
   ,`old` Map<String, String>
    ,`event_ts` AS TO_TIMESTAMP(FROM_UNIXTIME(`ts`, 'yyyy-MM-dd HH:mm:ss')) -- computed column; renamed here so it does not clash with the physical `ts` field
    ,WATERMARK FOR `event_ts` AS `event_ts` - INTERVAL '10' SECOND
   )with (
        'connector' = 'kafka'
       ,'properties.security.protocol' = 'SASL_PLAINTEXT'
       ,'properties.sasl.mechanism' = 'PLAIN'
       ,'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="xxxx" password="xxxx";'
       ,'scan.startup.mode' = 'earliest-offset' 
       ,'format' = 'json'
       ,'json.ignore-parse-errors' = 'true'
       ,'properties.fetch.message.max.bytes' = '10485760'
       ,'properties.socket.receive.buffer.bytes' = '1048576'
       ,'properties.request.timeout.ms' = '60000'
       ,'topic'='xxxxxxxx'
       ,'properties.group.id' = 'xxx'
       ,'properties.bootstrap.servers'='xxxx'
   ); 
   ```
   2. Create the Hudi sink table using the BUCKET index
   ```sql 
   create table sink_hudi (accounting_statement_id STRING ,
   amount BIGINT ,
   channel_detail_id STRING ,
   channel_id BIGINT ,
   clearing_agreement_item_id DECIMAL(20, 0) ,
   clearing_date DATE ,
   client_id STRING ,
   complete_time BIGINT ,
   create_time BIGINT ,
   currency STRING ,
   detail_status INT ,
   discrepancy_statement_id STRING ,
   entity_id DECIMAL(20, 0) ,
   extra_info STRING ,
   fund_direction INT ,
   id STRING ,
   identity_id STRING ,
   institution_no STRING ,
   mid STRING ,
   payment_id STRING ,
   reference_id STRING ,
   sub_channel_code STRING ,
   system_code STRING ,
   transaction_id STRING ,
   transaction_time BIGINT ,
   update_time BIGINT ,
   version BIGINT ,
   writeoff_statement_id STRING ,
   `database` STRING, `table` STRING, 
   `_event` Row<`ts` bigint,`database` string,`table` string,`type` string,`maxwell_ts` BIGINT>
   , grass_date STRING  ) 
   WITH(
        'compaction.schedule.enabled' = 'true'
        ,'compaction.async.enabled' = 'false'
        ,'compaction.tasks' = '8'
        ,'compaction.delta_commits' = '15'
        ,'hoodie.table.type' = 'COPY_ON_WRITE' 
        ,'hoodie.parquet.max.file.size' = '268435456'
        ,'hoodie.datasource.write.recordkey.field' = 'database,table,id'
        ,'hoodie.datasource.write.precombine.field' = 'update_time'
        ,'hoodie.parquet.small.file.limit' = '104857600'
        ,'hoodie.parquet.compression.codec'= 'snappy'
        ,'connector' = 'hudi'
        ,'path' = '$hdfsPath'
        ,'index.bootstrap.enabled' = 'true'
        ,'index.state.ttl' = '0'
        ,'index.type' = 'BUCKET'
        ,'hoodie.index.type' = 'BLOOM'
        ,'hoodie.index.class' = 'org.apache.hudi.index.bucket.HoodieBucketIndex'
        ,'hive_sync.partition_fields' = 'grass_date'
        ,'hive_sync.metastore.uris' = '$thrift://xxx'
        ,'hive_sync.db' = '$hiveDatabaseName'
        ,'hive_sync.table' = '$hiveTableName'
        ,'hive_sync.enable' = 'true'
        ,'hive_sync.use_jdbc' = 'false'
        ,'hive_sync.mode' = 'hms'
        ,'hoodie.datasource.write.hive_style_partitioning'= 'true'
        ,'write.operation' = 'upsert'
        ,'write.tasks'='4'
        ,'write.index_bootstrap.tasks'='32'
        ,'write.bucket_assign.tasks'='16'
        ,'write.rate.limit' = '64000'
        ,'write.precombine.field' = 'update_time'
        ,'hoodie.payload.ordering.field' = 'update_time'
        ,'write.payload.class' = 'org.apache.hudi.common.model.DefaultHoodieRecordPayload'
        ,'hoodie.datasource.write.partitionpath.field' = 'grass_date'
        ,'hoodie.datasource.write.keygenerator.class' = 'org.apache.hudi.keygen.ComplexAvroKeyGenerator'
        ,'hive_sync.partition_extractor_class' = 'org.apache.hudi.hive.MultiPartKeysValueExtractor'
        ,'hoodie.bucket.index.num.buckets' = '20'
        ,'hoodie.bucket.index.hash.field' = 'database,table,id'
   )
   ```
   3. Create the Hudi sink table using the Flink state index
   ```sql
   -- ...
   -- same
   -- ...
   WITH(
   
         'index.type' = 'BLOOM'
       -- ,'hoodie.index.class' = 'org.apache.hudi.index.bucket.HoodieBucketIndex'
       -- ,'hoodie.bucket.index.num.buckets' = '20'
       -- other config is same
   )
   ```
   4. Two jobs consume the same topic with different group IDs and sink into different Hudi tables: one uses the Flink state index, the other the bucket index.
   ```sql
   insert into sink_hudi
   SELECT CAST(JSON_VALUE(`data`, '$.accounting_statement_id') as STRING),
   CAST(JSON_VALUE(`data`, '$.amount') as BIGINT), 
   CAST(JSON_VALUE(`data`, '$.channel_detail_id') as STRING), 
   CAST(JSON_VALUE(`data`, '$.channel_id') as BIGINT), 
   CAST(JSON_VALUE(`data`, '$.clearing_agreement_item_id') as DECIMAL(20, 0)), 
   CAST(JSON_VALUE(`data`, '$.clearing_date') as DATE), 
   CAST(JSON_VALUE(`data`, '$.client_id') as STRING), 
   CAST(JSON_VALUE(`data`, '$.complete_time') as BIGINT), 
   CAST(JSON_VALUE(`data`, '$.create_time') as BIGINT), 
   CAST(JSON_VALUE(`data`, '$.currency') as STRING), 
   CAST(JSON_VALUE(`data`, '$.detail_status') as INT), 
   CAST(JSON_VALUE(`data`, '$.discrepancy_statement_id') as STRING), 
   CAST(JSON_VALUE(`data`, '$.entity_id') as DECIMAL(20, 0)), 
   CAST(JSON_VALUE(`data`, '$.extra_info') as STRING), 
   CAST(JSON_VALUE(`data`, '$.fund_direction') as INT), 
   CAST(JSON_VALUE(`data`, '$.id') as STRING), 
   CAST(JSON_VALUE(`data`, '$.identity_id') as STRING), 
   CAST(JSON_VALUE(`data`, '$.institution_no') as STRING), 
   CAST(JSON_VALUE(`data`, '$.mid') as STRING), 
   CAST(JSON_VALUE(`data`, '$.payment_id') as STRING), 
   CAST(JSON_VALUE(`data`, '$.reference_id') as STRING), 
   CAST(JSON_VALUE(`data`, '$.sub_channel_code') as STRING), 
   CAST(JSON_VALUE(`data`, '$.system_code') as STRING), 
   CAST(JSON_VALUE(`data`, '$.transaction_id') as STRING), 
   CAST(JSON_VALUE(`data`, '$.transaction_time') as BIGINT), 
   CAST(IFNULL(JSON_VALUE(`data`, '$.update_time'), '0') as BIGINT), 
   CAST(JSON_VALUE(`data`, '$.version') as BIGINT), 
   CAST(JSON_VALUE(`data`, '$.writeoff_statement_id') as STRING), 
   `database`, 
   `table`,
   Row(`ts`, `database`, `table`, `type`, `maxwell_ts`), 
   DATE_FORMAT(TO_TIMESTAMP(FROM_UNIXTIME(CAST(JSON_VALUE(`data`, '$.create_time') as bigint), 'yyyy-MM-dd HH:mm:ss')), 'yyyy-MM-dd') 
   
   from kafka_source_table
   
   ```
   5. Compare the two Hudi tables
   ```sql
   select *
   from stateTable t1
   left join bucketTable t2
   on t1.id = t2.id
   and t1.`database` = t2.`database`
   and t1.`table` = t2.`table`
   where t1.update_time > t2.update_time
   and t1.grass_date >= '2022-06-30'
   ```
   
   **Expected behavior**
   
   The bucket-indexed table (application B) should apply the update and end up with the same result as the Flink state (BLOOM) pipeline: status = 1 and update_time = 1656860460 for id = 1.
   
   **Environment Description**
   * Flink version : 1.13.14

   * Hudi version : 0.11.1

   * Spark version : not used

   * Hive version : 3.1.2

   * Hadoop version : 3.2.0

   * Storage (HDFS/S3/GCS..) : HDFS

   * Running on Docker? (yes/no) : no
   
   
   
   



[GitHub] [hudi] danny0405 commented on issue #6034: [SUPPORT] Flink Bucket Index Can't Update Records Using 0.11.1

Posted by GitBox <gi...@apache.org>.
danny0405 commented on issue #6034:
URL: https://github.com/apache/hudi/issues/6034#issuecomment-1173659440

   You can set up partitions. Can you show some characteristics of the data that duplicates? It would help to dig into the reason.



[GitHub] [hudi] danny0405 commented on issue #6034: [SUPPORT] Flink Bucket Index Can't Update Records Using 0.11.1

Posted by GitBox <gi...@apache.org>.
danny0405 commented on issue #6034:
URL: https://github.com/apache/hudi/issues/6034#issuecomment-1173601299

   One thing you need to know is that the bucket index does not support updates across different partitions, unless your source stream is CDC itself.
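   For illustration, here is a minimal, hedged sketch of how a bucket index routes records (class, method, and partition names are made up for this example and are not Hudi's internals; the assumption is that the bucket id is a hash of the record key fields modulo the bucket number, resolved only within the record's partition):

   ```java
   // Illustrative only: bucket-index routing under the assumption
   // bucketId = hash(record key fields) % numBuckets, scoped to one partition.
   public class BucketRoutingSketch {

       static int bucketId(String recordKey, int numBuckets) {
           // Non-negative hash of the concatenated key fields, e.g. "database,table,id".
           return (recordKey.hashCode() & Integer.MAX_VALUE) % numBuckets;
       }

       public static void main(String[] args) {
           int numBuckets = 20; // matches 'hoodie.bucket.index.num.buckets' = '20' above
           String key = "database_00000001,table_00000001,10401000107002207010151820438060";

           // Hypothetical partition paths: the index resolves (partition, bucketId) only;
           // it never looks for the key in other partitions.
           String insertPartition = "grass_date=2022-07-01";
           String updatePartition = "grass_date=2022-07-02";

           System.out.printf("insert -> %s/bucket-%d%n", insertPartition, bucketId(key, numBuckets));
           System.out.printf("update -> %s/bucket-%d%n", updatePartition, bucketId(key, numBuckets));
           // Same bucket id but a different partition path: the update would land in a new
           // file group, leaving the originally inserted record untouched.
       }
   }
   ```

   So if the partition path derived for an update differs from the one derived for the original insert, the bucket index writes a second copy instead of updating in place.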



[GitHub] [hudi] danny0405 commented on issue #6034: [SUPPORT] Flink Bucket Index Can't Update Records Using 0.11.1

Posted by GitBox <gi...@apache.org>.
danny0405 commented on issue #6034:
URL: https://github.com/apache/hudi/issues/6034#issuecomment-1306541144

   Closing because of prolonged inactivity.



[GitHub] [hudi] fengjian428 commented on issue #6034: [SUPPORT] Flink Bucket Index Can't Update Records Using 0.11.1

Posted by GitBox <gi...@apache.org>.
fengjian428 commented on issue #6034:
URL: https://github.com/apache/hudi/issues/6034#issuecomment-1174061213

   There are no duplicates in either table. Is it possible the events in Kafka were not the same when you ingested data into the bucket-indexed table?



[GitHub] [hudi] ccchenhe commented on issue #6034: [SUPPORT] Flink Bucket Index Can't Update Records Using 0.11.1

Posted by GitBox <gi...@apache.org>.
ccchenhe commented on issue #6034:
URL: https://github.com/apache/hudi/issues/6034#issuecomment-1174748167

   > > You can set up partitions. Can you show some characteristics of the data that duplicates? It would help to dig into the reason.
   > > ok
   > 
   > Now the same id has two records: one is an insert, the other an update.
   > 
   > ```json
   > // insert
   > {"database":"database_00000001","table":"table_00000001","type":"insert","ts":1656643663,"maxwell_ts":1656643663889000,"xid":8498,"xoffset":1,"primary_key":[10401000107002207010151820438060],"primary_key_columns":["id"],"data":{"id":"10401000107002207010151820438060","version":1,"create_time":1656643663,"update_time":1656643663},"old":{}}
   > // update
   > {"database":"database_00000001","table":"table_00000001","type":"update","ts":1656660063,"maxwell_ts":1656660063062000,"xid":7210,"xoffset":1,"primary_key":[10401000107002207010151820438060],"primary_key_columns":["id"],"data":{"id":"10401000107002207010151820438060","version":2,"create_time":1656643663,"update_time":1656660063},"old":{"version":1,"update_time":1656643663}}
   > ```
   > 
   > The application using the Flink BLOOM state index consumed these two records from Kafka, and we got ![image](https://user-images.githubusercontent.com/20533543/177280265-1f3dc430-ec16-453c-9484-525e031e83ef.png)
   > 
   > The application using the Flink bucket index consumed these two records from Kafka, and we got <img alt="image" width="1792" src="https://user-images.githubusercontent.com/20533543/177280462-66a9e285-73ae-484c-ac91-c80acc61a3ac.png">
   
   I'm sure that all of the records were saved to Kafka and consumed, because each binlog is also double-written to HDFS through another channel.



[GitHub] [hudi] danny0405 commented on issue #6034: [SUPPORT] Flink Bucket Index Can't Update Records Using 0.11.1

Posted by GitBox <gi...@apache.org>.
danny0405 commented on issue #6034:
URL: https://github.com/apache/hudi/issues/6034#issuecomment-1175023540

   I see many unnecessary options in the SQL; can you try these options again:
   ```sql
         'hoodie.table.type' = 'COPY_ON_WRITE' 
        ,'hoodie.datasource.write.recordkey.field' = 'database,table,id'
        ,'hoodie.datasource.write.precombine.field' = 'update_time'
        ,'hoodie.parquet.compression.codec'= 'snappy'
        ,'connector' = 'hudi'
        ,'path' = '$hdfsPath'
        ,'index.bootstrap.enabled' = 'true'
        ,'index.type' = 'BUCKET'
        ,'hive_sync.partition_fields' = 'grass_date'
        ,'hive_sync.metastore.uris' = '$thrift://xxx'
        ,'hive_sync.db' = '$hiveDatabaseName'
        ,'hive_sync.table' = '$hiveTableName'
        ,'hive_sync.enable' = 'true'
        ,'hive_sync.use_jdbc' = 'false'
        ,'hive_sync.mode' = 'hms'
        ,'hoodie.datasource.write.hive_style_partitioning'= 'true'
        ,'write.tasks'='4'
        ,'write.index_bootstrap.tasks'='32'
        ,'write.rate.limit' = '64000'
        ,'write.precombine.field' = 'update_time'
        ,'hoodie.datasource.write.partitionpath.field' = 'grass_date'
        ,'hive_sync.partition_extractor_class' = 'org.apache.hudi.hive.MultiPartKeysValueExtractor'
        ,'hoodie.bucket.index.num.buckets' = '20'
        ,'hoodie.bucket.index.hash.field' = 'database,table,id'
   ```



[GitHub] [hudi] ccchenhe commented on issue #6034: [SUPPORT] Flink Bucket Index Can't Update Records Using 0.11.1

Posted by GitBox <gi...@apache.org>.
ccchenhe commented on issue #6034:
URL: https://github.com/apache/hudi/issues/6034#issuecomment-1174746611

   > You can set up partitions. Can you show some characteristics of the data that duplicates? It would help to dig into the reason.
   ok
   
   Now the same id has two records: one is an insert, the other an update.
   ```json
   // insert
   {"database":"database_00000001","table":"table_00000001","type":"insert","ts":1656643663,"maxwell_ts":1656643663889000,"xid":8498,"xoffset":1,"primary_key":[10401000107002207010151820438060],"primary_key_columns":["id"],"data":{"id":"10401000107002207010151820438060","version":1,"create_time":1656643663,"update_time":1656643663},"old":{}}
   // update
   {"database":"database_00000001","table":"table_00000001","type":"update","ts":1656660063,"maxwell_ts":1656660063062000,"xid":7210,"xoffset":1,"primary_key":[10401000107002207010151820438060],"primary_key_columns":["id"],"data":{"id":"10401000107002207010151820438060","version":2,"create_time":1656643663,"update_time":1656660063},"old":{"version":1,"update_time":1656643663}}
   
   ```
   The application using the Flink BLOOM state index consumed these two records from Kafka, and we got:
   ![image](https://user-images.githubusercontent.com/20533543/177280265-1f3dc430-ec16-453c-9484-525e031e83ef.png)
   
   The application using the Flink bucket index consumed these two records from Kafka, and we got:
   <img width="1792" alt="image" src="https://user-images.githubusercontent.com/20533543/177280462-66a9e285-73ae-484c-ac91-c80acc61a3ac.png">
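   For reference, a minimal sketch (illustrative only, not the actual DefaultHoodieRecordPayload code) of the merge behaviour the table above is configured for: with the precombine / 'hoodie.payload.ordering.field' set to update_time, the record with the larger update_time is expected to win, so the update event (update_time = 1656660063) should replace the insert (update_time = 1656643663) in both pipelines.

   ```java
   // Illustrative merge by ordering field: keep the record with the larger 'update_time'.
   import java.util.HashMap;
   import java.util.Map;

   public class OrderingFieldMergeSketch {

       static Map<String, Object> merge(Map<String, Object> stored, Map<String, Object> incoming) {
           long storedTs = ((Number) stored.get("update_time")).longValue();
           long incomingTs = ((Number) incoming.get("update_time")).longValue();
           return incomingTs >= storedTs ? incoming : stored;
       }

       public static void main(String[] args) {
           Map<String, Object> insert = new HashMap<>();
           insert.put("id", "10401000107002207010151820438060");
           insert.put("version", 1);
           insert.put("update_time", 1656643663L);

           Map<String, Object> update = new HashMap<>();
           update.put("id", "10401000107002207010151820438060");
           update.put("version", 2);
           update.put("update_time", 1656660063L);

           // Expected outcome: the update (version = 2) survives the merge.
           System.out.println(merge(insert, update));
       }
   }
   ```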
   
   
   



[GitHub] [hudi] ccchenhe commented on issue #6034: [SUPPORT] Flink Bucket Index Can't Update Records Using 0.11.1

Posted by GitBox <gi...@apache.org>.
ccchenhe commented on issue #6034:
URL: https://github.com/apache/hudi/issues/6034#issuecomment-1173612993

   > 
   
   Yep, my Kafka data source is binlog.
   Or should I understand that when using the bucket index, it is better not to set partitions?



[GitHub] [hudi] ccchenhe commented on issue #6034: [SUPPORT] Flink Bucket Index Can't Update Records Using 0.11.1

Posted by GitBox <gi...@apache.org>.
ccchenhe commented on issue #6034:
URL: https://github.com/apache/hudi/issues/6034#issuecomment-1178805367

   > I see many unnecessary options in the SQL; can you try these options again:
   > 
   > ```sql
   >       'hoodie.table.type' = 'COPY_ON_WRITE' 
   >      ,'hoodie.datasource.write.recordkey.field' = 'database,table,id'
   >      ,'hoodie.datasource.write.precombine.field' = 'update_time'
   >      ,'hoodie.parquet.compression.codec'= 'snappy'
   >      ,'connector' = 'hudi'
   >      ,'path' = '$hdfsPath'
   >      ,'index.bootstrap.enabled' = 'true'
   >      ,'index.type' = 'BUCKET'
   >      ,'hive_sync.partition_fields' = 'grass_date'
   >      ,'hive_sync.metastore.uris' = '$thrift://xxx'
   >      ,'hive_sync.db' = '$hiveDatabaseName'
   >      ,'hive_sync.table' = '$hiveTableName'
   >      ,'hive_sync.enable' = 'true'
   >      ,'hive_sync.use_jdbc' = 'false'
   >      ,'hive_sync.mode' = 'hms'
   >      ,'hoodie.datasource.write.hive_style_partitioning'= 'true'
   >      ,'write.tasks'='4'
   >      ,'write.index_bootstrap.tasks'='32'
   >      ,'write.rate.limit' = '64000'
   >      ,'write.precombine.field' = 'update_time'
   >      ,'hoodie.datasource.write.partitionpath.field' = 'grass_date'
   >      ,'hive_sync.partition_extractor_class' = 'org.apache.hudi.hive.MultiPartKeysValueExtractor'
   >      ,'hoodie.bucket.index.num.buckets' = '20'
   >      ,'hoodie.bucket.index.hash.field' = 'database,table,id'
   > ```
   
   After testing, it still does not work :(
   



[GitHub] [hudi] danny0405 closed issue #6034: [SUPPORT] Flink Bucket Index Can't Update Records Using 0.11.1

Posted by GitBox <gi...@apache.org>.
danny0405 closed issue #6034: [SUPPORT] Flink Bucket Index Can't Update Records Using 0.11.1
URL: https://github.com/apache/hudi/issues/6034


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org