You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/02/24 09:36:54 UTC

[GitHub] [hudi] scxwhite opened a new issue #4899: clustering cause Duplicate Records in Merge on Read [SUPPORT]

scxwhite opened a new issue #4899:
URL: https://github.com/apache/hudi/issues/4899


   
   
   **Describe the problem you faced**
   
   I reproduced this problem using the following code(spark local run). In the following code, I repeatedly update 10000 pieces of data, but if I execute the following code more than 5 times, the program will report an error.
   The problem may occur in clustering. When merging small files, the old submitted files are obtained(SparkSizeBasedClusteringPlanStrategy#getFileSlicesEligibleForClustering).
   
   **To Reproduce**
   
   
   ```
   
   import org.apache.hudi.DataSourceWriteOptions;
   import org.apache.hudi.client.clustering.update.strategy.SparkRejectUpdateStrategy;
   import org.apache.hudi.common.config.HoodieMetadataConfig;
   import org.apache.hudi.common.model.*;
   import org.apache.hudi.common.table.HoodieTableConfig;
   import org.apache.hudi.common.table.marker.MarkerType;
   import org.apache.hudi.config.*;
   import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
   import org.apache.spark.sql.*;
   
   import java.util.*;
   
   import static org.apache.hudi.common.model.HoodieTableType.*;
   
   /**
    * Created by sucx
    */
   public class Test {
   
   
       public static void main(String[] args) {
           List<String> data = new ArrayList<>();
           List<String> dtList = new ArrayList<>();
           dtList.add("197001");
           Random random = new Random();
           for (int i = 0; i < 100000; i++) {
               String dt = dtList.get(random.nextInt(dtList.size()));
               data.add("{\"dt\":\"" + dt + "\",\"id\":\"" + random.nextInt(10000) + "\",\"gmt_modified\":" + System.currentTimeMillis() + "}");
           }
   
           SparkSession sparkSession = SparkSession.builder().config("spark.serializer","org.apache.spark.serializer.KryoSerializer").master("local[*]").getOrCreate();
   
           Dataset<String> dataset = sparkSession.createDataset(data, Encoders.STRING());
           Dataset<Row> json = sparkSession.read().json(dataset);
           json.printSchema();
           int dataKeepTime = 5;
           json.selectExpr("dt", "id", "gmt_modified", "'' as name").toDF()
                   .write()
                   .format("org.apache.hudi")
                   .option(HoodieTableConfig.TYPE.key(), MERGE_ON_READ.name())
                   .option(DataSourceWriteOptions.OPERATION().key(), WriteOperationType.UPSERT.value())
                   .option(DataSourceWriteOptions.TABLE_TYPE().key(), MERGE_ON_READ.name())
                   .option(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "id")
                   .option(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "dt")
                   .option(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE.key(), true)
                   .option(HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key(), "gmt_modified")
                   .option(HoodieWriteConfig.COMBINE_BEFORE_UPSERT.key(), true)
                   .option(HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key(), 200)
                   .option(HoodieWriteConfig.FINALIZE_WRITE_PARALLELISM_VALUE.key(), 200)
                   .option(HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key(), DefaultHoodieRecordPayload.class.getName())
                   .option(HoodieWriteConfig.AVRO_EXTERNAL_SCHEMA_TRANSFORMATION_ENABLE.key(), false)
                   .option(HoodieWriteConfig.MARKERS_TYPE.key(), MarkerType.DIRECT.toString())
                   .option(HoodieCompactionConfig.PAYLOAD_CLASS_NAME.key(), DefaultHoodieRecordPayload.class.getName())
                   .option(HoodieCompactionConfig.CLEANER_FILE_VERSIONS_RETAINED.key(), dataKeepTime)
                   .option(HoodieCompactionConfig.AUTO_CLEAN.key(), false)
                   .option(HoodieCompactionConfig.CLEANER_INCREMENTAL_MODE_ENABLE.key(), false)
                   .option(HoodieCompactionConfig.CLEANER_COMMITS_RETAINED.key(), dataKeepTime)
                   .option(HoodieCompactionConfig.MIN_COMMITS_TO_KEEP.key(), dataKeepTime + 1)
                   .option(HoodieCompactionConfig.MAX_COMMITS_TO_KEEP.key(), dataKeepTime + 2)
                   .option(HoodieCompactionConfig.TARGET_IO_PER_COMPACTION_IN_MB.key(), 500 * 1024)
                   .option(HoodieCompactionConfig.CLEANER_POLICY.key(), HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS.name())
                   .option(HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key(), 128 * 1024 * 1024)
                   .option(HoodieCompactionConfig.FAILED_WRITES_CLEANER_POLICY.key(), HoodieFailedWritesCleaningPolicy.EAGER.name())
                   .option(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key(), 256 * 1024 * 1024)
                   .option(HoodieCompactionConfig.INLINE_COMPACT.key(), true)
                   .option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(), 1)
                   .option(HoodieClusteringConfig.INLINE_CLUSTERING.key(), true)
                   .option(HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMITS.key(), 1)
                   .option(HoodieClusteringConfig.PLAN_STRATEGY_MAX_BYTES_PER_OUTPUT_FILEGROUP.key(), 256 * 1024 * 1024L)
                   .option(HoodieClusteringConfig.PLAN_STRATEGY_TARGET_FILE_MAX_BYTES.key(), 256 * 1024 * 1024L)
                   .option(HoodieClusteringConfig.PLAN_STRATEGY_SMALL_FILE_LIMIT.key(), 128 * 1024 * 1024L)
                   .option(HoodieClusteringConfig.UPDATES_STRATEGY.key(), SparkRejectUpdateStrategy.class.getName())
                   .option(HoodieMetadataConfig.ENABLE.key(), true)
                   .option(HoodieMetadataConfig.MIN_COMMITS_TO_KEEP.key(), dataKeepTime + 1)
                   .option(HoodieMetadataConfig.MAX_COMMITS_TO_KEEP.key(), dataKeepTime + 2)
                   .option(HoodieMetadataConfig.CLEANER_COMMITS_RETAINED.key(), dataKeepTime)
                   .option(HoodieClusteringConfig.PRESERVE_COMMIT_METADATA.key(), true)
                   .option(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY, "gmt_modified")
                   .option(HoodiePayloadProps.PAYLOAD_EVENT_TIME_FIELD_PROP_KEY, "gmt_modified")
                   .option(HoodieIndexConfig.BLOOM_INDEX_KEYS_PER_BUCKET.key(), 100000)
                   .option(HoodieIndexConfig.BLOOM_INDEX_PARALLELISM.key(), 200)
                   .option(HoodieIndexConfig.BLOOM_INDEX_PRUNE_BY_RANGES.key(), true)
                   .option(HoodieTableConfig.NAME.key(), "sucx")
                   .mode(SaveMode.Append)
                   .save("/tmp/hudi/sucx_test");
           long hudi = sparkSession.read().format("hudi")
                   .load("/tmp/hudi/sucx_test")
                   .count();
           if (hudi > 10000) {
               throw new RuntimeException("count grater than 10000:" + hudi);
           }
           System.out.println("the count is:" + hudi);
       }
   
   }
   
   
   ```
   
   **Expected behavior**
   
   A clear and concise description of what you expected to happen.
   
   **Environment Description**
   
   * Hudi version : 0.9.0
   
   * Spark version : 3.0.0
   
   * Hive version : 
   
   * Hadoop version :
   
   * Storage (HDFS/S3/GCS..) : file
   
   * Running on Docker? (yes/no) : no
   
   
   **Additional context**
   
   Add any other context about the problem here.
   
   details: https://github.com/apache/hudi/issues/4311
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] nsivabalan closed issue #4899: clustering cause Duplicate Records in Merge on Read [SUPPORT]

Posted by GitBox <gi...@apache.org>.
nsivabalan closed issue #4899:
URL: https://github.com/apache/hudi/issues/4899


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] nsivabalan commented on issue #4899: clustering cause Duplicate Records in Merge on Read [SUPPORT]

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on issue #4899:
URL: https://github.com/apache/hudi/issues/4899#issuecomment-1052120084


   got it, thanks! please re-open if you run into any issues. thanks for bringing up the issue. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] nsivabalan commented on issue #4899: clustering cause Duplicate Records in Merge on Read [SUPPORT]

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on issue #4899:
URL: https://github.com/apache/hudi/issues/4899#issuecomment-1050302330


   I ran this tests 8 to 9 times w/ latest master and could not reproduce (I did not see the failure). and even verified that getLatestFileSlices(partition) always returns just 1 file slice which I correlated w/ actual data. Can you verify on your end please.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] nsivabalan commented on issue #4899: clustering cause Duplicate Records in Merge on Read [SUPPORT]

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on issue #4899:
URL: https://github.com/apache/hudi/issues/4899#issuecomment-1050283769


   @scxwhite : thanks for reporting the problem. I remember we fixing around clustering and timeline interplays. Can you give it a try with 0.10.1. I will try to see whats going on. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] scxwhite commented on issue #4899: clustering cause Duplicate Records in Merge on Read [SUPPORT]

Posted by GitBox <gi...@apache.org>.
scxwhite commented on issue #4899:
URL: https://github.com/apache/hudi/issues/4899#issuecomment-1050557047


   @nsivabalan  Thank you for your reply, I have tested the 0.9.0 and 0.10.0 versions of hudi, these two versions have problems, but 0.10.1 is no problem.I will try to upgrade the version of hudi to 0.10.1


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org