Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2020/11/21 07:08:08 UTC

[GitHub] [hudi] bithw1 opened a new issue #2267: [SUPPORT] Urgent: MOR seems not working as expected

bithw1 opened a new issue #2267:
URL: https://github.com/apache/hudi/issues/2267


   
   
   Hi,
   I am doing a POC with Hudi, but I am quite new to it and I am stuck here! I would like to ask about the Hudi merge-on-read table, specifically the read-optimized query
   (reading only the base parquet data, not including the delta data).
   
   I am using Hudi 0.6.0 + Spark 2.4.4
   
   
   I use the following code to run 3 operations in sequence: `1. overwrite, 2. upsert, 3. upsert`.
   
   1. After the operations are done, when I look at the HDFS files, there are 3 parquet files there. I had thought that only one parquet file would be there, and that the two upsert operations would be written to avro files as delta commits.
   
   ```
   [root@host1] hdfs dfs -ls /data/hudi_demo/hudi_hive_read_write_mor_4/2020-11-19
   Found 4 items
   -rw-r--r--   2 root supergroup         93 2020-11-21 14:45 /data/hudi_demo/hudi_hive_read_write_mor_4/2020-11-19/.hoodie_partition_metadata
   -rw-r--r--   2 root supergroup     435000 2020-11-21 14:45 /data/hudi_demo/hudi_hive_read_write_mor_4/2020-11-19/3f7b1807-8099-4629-8fb7-c8a74ba9298d-0_0-21-21_20201121144541.parquet
   -rw-r--r--   2 root supergroup     434787 2020-11-21 14:45 /data/hudi_demo/hudi_hive_read_write_mor_4/2020-11-19/3f7b1807-8099-4629-8fb7-c8a74ba9298d-0_0-51-49_20201121144550.parquet
   -rw-r--r--   2 root supergroup     434842 2020-11-21 14:45 /data/hudi_demo/hudi_hive_read_write_mor_4/2020-11-19/3f7b1807-8099-4629-8fb7-c8a74ba9298d-0_0-81-77_20201121144554.parquet
   ```
   
   2. In the code, I used the following snippet to disable compaction. I am not sure this is the correct way to disable it; I disabled compaction simply because I want to see the delta commit files:
   
   ```
         //disable async compact
         .option(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE_OPT_KEY, "false")
         .option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS_PROP, 100)
         //disable inline compact
         .option(HoodieCompactionConfig.INLINE_COMPACT_PROP, "false")
   ```
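
   One way to check whether a write actually produced delta commits is to list the Hudi timeline; a sketch, assuming the default `.hoodie` metadata directory under the base path:

   ```
   hdfs dfs -ls /data/hudi_demo/hudi_hive_read_write_mor_4/.hoodie
   # for a MOR table each write shows up as an <instant>.deltacommit file;
   # a completed compaction would show up as an <instant>.commit file
   ```
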
   3. In Hive,
   
   I run the following query, intending to read only the base parquet data without the delta commit data:
   `select * from xyz.hudi_hive_read_write_mor_4_ro;`
   However, it returns all of the most recent records (with the largest creation_date for each updated record), like a snapshot query.
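
   For contrast, Hive sync registers two tables for a MOR dataset (assuming the standard `_ro`/`_rt` suffixes that Hudi's Hive sync creates):

   ```
   -- read-optimized view: reads only the base parquet files
   select * from xyz.hudi_hive_read_write_mor_4_ro;
   -- real-time view: merges base files with delta log files at read time
   select * from xyz.hudi_hive_read_write_mor_4_rt;
   ```

   Since the partition listing above shows no `.log` files, both views read the same data here, which matches what you are seeing.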
   
   4. In spark shell,
   
   I run the same query:
   
   ```
   import org.apache.hudi.DataSourceReadOptions

   val basepath = "/data/hudi_demo/hudi_hive_read_write_mor_4"

   // read-optimized query: reads only the base parquet files
   val df = spark.read.format("hudi")
     .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)
     .load(basepath + "/*")

   df.show
   ```
   
   The result is the same as the query in Hive.
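
   The merged (snapshot) view can also be requested explicitly for comparison; a minimal sketch, assuming the 0.6.0 read options (where snapshot should also be the default query type):

   ```
   // snapshot view: base parquet files merged with any delta log files
   val snapshotDf = spark.read.format("hudi")
     .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
     .load(basepath + "/*")

   snapshotDf.show
   ```

   If the read-optimized and snapshot results match, the table has no un-compacted delta data, which is consistent with the partition listing above.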
   
   
   Following is the code that I use for the POC:
   
   ```
   package org.example.hudi
   
   import org.apache.hudi.DataSourceWriteOptions
   import org.apache.hudi.config.{HoodieCompactionConfig, HoodieIndexConfig, HoodieWriteConfig}
   import org.apache.hudi.index.HoodieIndex
   import org.apache.spark.sql.{SaveMode, SparkSession}
   
   case class MyOrder(
                       name: String,
                       price: String,
                       creation_date: String,
                       dt: String)
   
   object MORTest {
     val overwrite1Data = Seq(
       MyOrder("A", "12.7", "2020-11-18 14:43:32", "2020-11-19"),
       MyOrder("B", "13.2", "2020-11-18 14:42:21", "2020-11-19"),
       MyOrder("C", "11.6", "2020-11-18 14:47:19", "2020-11-19"),
       MyOrder("D", "10.4", "2020-11-18 14:46:50", "2020-11-19")
     )
   
     val insertUpdate1Data = Seq(
       MyOrder("A", "13.7", "2020-11-18 14:53:32", "2020-11-19"), //update A
       MyOrder("E", "11.6", "2020-11-18 14:47:19", "2020-11-19"), //add E,F
       MyOrder("F", "10.4", "2020-11-18 14:46:50", "2020-11-19")
     )
   
     val insertUpdate2Data = Seq(
       MyOrder("X", "13.7", "2020-11-18 14:53:32", "2020-11-19"), //add X, Y
       MyOrder("Y", "18.2", "2020-11-19 14:42:21", "2020-11-19"),
       MyOrder("F", "17.4", "2020-11-18 15:46:50", "2020-11-19") //Update F
     )
   
     val spark = SparkSession.builder.appName("MORTest")
       .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
       .config("spark.sql.warehouse.dir", "hdfs:///user/hive/warehouse")
       .enableHiveSupport().getOrCreate()
   
     val hudi_table = "hudi_hive_read_write_mor_4"
   
     val base_path = s"/data/hudi_demo/$hudi_table"
   
     def run(op: Int) = {
       val (data, saveMode) = op match {
         case 1 => (overwrite1Data, SaveMode.Overwrite)
         case 2 => (insertUpdate1Data, SaveMode.Append)
         case 3 => (insertUpdate2Data, SaveMode.Append)
       }
       import spark.implicits._
       val insertData = spark.createDataset(data)
       insertData.write.format("hudi")
         .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "name")
         .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "creation_date")
         .option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY, "xyz")
         .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, hudi_table)
         .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
         .option(HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH, "true")
         .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "dt")
   
         //table type: MOR
         .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
   
         //disable async compact
         .option(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE_OPT_KEY, "false")
         .option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS_PROP, 100)
         //disable inline compact
         .option(HoodieCompactionConfig.INLINE_COMPACT_PROP, "false")
   
   
         .option(DataSourceWriteOptions.HIVE_URL_OPT_KEY, "jdbc:hive2://10.41.90.208:10000")
         .option(HoodieWriteConfig.TABLE_NAME, hudi_table)
         .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, "org.apache.hudi.hive.MultiPartKeysValueExtractor")
         .option(HoodieIndexConfig.INDEX_TYPE_PROP, HoodieIndex.IndexType.GLOBAL_BLOOM.name())
         .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "dt")
         .option("hoodie.insert.shuffle.parallelism", "2")
         .option("hoodie.upsert.shuffle.parallelism", "2")
         .mode(saveMode)
      .save(base_path)
     }
   
   
     def main(args: Array[String]): Unit = {
       //do overwrite
       run(1)
   
       //do upsert
       run(2)
   
       //do upsert
       run(3)
   
       println("===MOR is done=====")
     }
   
   }
   ```




[GitHub] [hudi] bithw1 commented on issue #2267: [SUPPORT] Urgent: Please Help: MOR seems not working as expected

Posted by GitBox <gi...@apache.org>.
bithw1 commented on issue #2267:
URL: https://github.com/apache/hudi/issues/2267#issuecomment-731670954


   Hi @bvaradar, thanks a lot for the great help. I followed your instructions, and it works as expected now.
   So, I am closing this issue now.





[GitHub] [hudi] bvaradar commented on issue #2267: [SUPPORT] Urgent: Please Help: MOR seems not working as expected

Posted by GitBox <gi...@apache.org>.
bvaradar commented on issue #2267:
URL: https://github.com/apache/hudi/issues/2267#issuecomment-731650631


   @bithw1 : The 3 Parquet files are different versions of the same file. 
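
   This is visible in the names: Hudi base files are named `<fileId>_<writeToken>_<instantTime>.parquet`, so the listing above breaks down as:

   ```
   3f7b1807-8099-4629-8fb7-c8a74ba9298d-0 _ 0-21-21 _ 20201121144541 .parquet
   ^ file ID (identical in all 3 files)     ^ token    ^ commit instant
   ```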
   
   Your second and third batches have both new records and updates. Hudi will write new records to parquet files but can route updates to delta files. Since you have very few records in the initial parquet file, Hudi routes both the new records and the updates to the same parquet file group, and a new version of the file is created.
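
   A sketch of the knob likely involved, assuming the standard small-file handling config (`hoodie.parquet.small.file.limit`, default around 100 MB); setting it to 0 should stop inserts from being packed into existing small base files:

   ```
   // assumption: with small-file packing disabled, inserts create new base
   // files, so updates to existing keys are routed to .log delta files
   .option("hoodie.parquet.small.file.limit", "0")
   ```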
   
   If you only have updates in batch 2, you will see the delta files (see the sketch below).
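
   For instance, a hypothetical batch with only existing keys, reusing the `MyOrder` case class from the code above:

   ```
   // updates only: both keys already exist, so with compaction disabled this
   // upsert should produce .log (avro delta) files, not a new parquet version
   val updateOnlyData = Seq(
     MyOrder("A", "14.1", "2020-11-18 16:53:32", "2020-11-19"), // update A again
     MyOrder("B", "13.9", "2020-11-18 16:42:21", "2020-11-19")  // update B
   )
   ```

   After such a write, the partition listing should show a `.log.*` file alongside the parquet files.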





[GitHub] [hudi] bithw1 closed issue #2267: [SUPPORT] Urgent: Please Help: MOR seems not working as expected

Posted by GitBox <gi...@apache.org>.
bithw1 closed issue #2267:
URL: https://github.com/apache/hudi/issues/2267


   

