You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2019/09/03 06:00:39 UTC

[GitHub] [incubator-hudi] vinothchandar commented on issue #859: Hudi upsert after a delete in partition will cause valid records inserted to disappear.

vinothchandar commented on issue #859: Hudi upsert after a delete in partition will cause valid records inserted to disappear.
URL: https://github.com/apache/incubator-hudi/issues/859#issuecomment-527316262
 
 
   Thanks for the detailed explanation. I made the simple change to include a `team` field and used it as the partitioning field (entire code attached below). I can actually see both the records! 
   
   ```
   +-------------------+--------------------+------------------+----------------------+--------------------+---+------+----+---+
   |_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name| id|  name|team| ts|
   +-------------------+--------------------+------------------+----------------------+--------------------+---+------+----+---+
   |     20190902225356|  20190902225356_0_2|                 2|                  hudi|8eebf79c-b40c-4c0...|  2|vinoth|hudi|  1|
   |     20190902225400|  20190902225400_0_3|                 3|                  hudi|8eebf79c-b40c-4c0...|  3|balaji|hudi|  3|
   +-------------------+--------------------+------------------+----------------------+--------------------+---+------+----+---+
   
   $ ls -l /tmp/hoodie/sample-table/hudi/
   total 1276
   -rw-r--r-- 1 vinoth vinoth 433981 Sep  2 22:53 8eebf79c-b40c-4c05-ad60-aee5bf9dc31c-0_0-34-36_20190902225358.parquet
   -rw-r--r-- 1 vinoth vinoth 434180 Sep  2 22:53 8eebf79c-b40c-4c05-ad60-aee5bf9dc31c-0_0-5-5_20190902225356.parquet
   -rw-r--r-- 1 vinoth vinoth 434104 Sep  2 22:54 8eebf79c-b40c-4c05-ad60-aee5bf9dc31c-0_0-67-64_20190902225400.parquet
   vinoth@balerion: incubator-hudi$ ls -l /tmp/hoodie/sample-table/.hoodie/
   total 32
   -rw-r--r-- 1 vinoth vinoth 1085 Sep  2 22:53 20190902225356.clean
   -rw-r--r-- 1 vinoth vinoth 2057 Sep  2 22:53 20190902225356.commit
   -rw-r--r-- 1 vinoth vinoth 1085 Sep  2 22:53 20190902225358.clean
   -rw-r--r-- 1 vinoth vinoth 2071 Sep  2 22:53 20190902225358.commit
   -rw-r--r-- 1 vinoth vinoth 1085 Sep  2 22:54 20190902225400.clean
   -rw-r--r-- 1 vinoth vinoth 2070 Sep  2 22:54 20190902225400.commit
   drwxr-xr-x 2 vinoth vinoth 4096 Sep  2 22:53 archived
   -rw-r--r-- 1 vinoth vinoth  176 Sep  2 22:53 hoodie.properties
   vinoth@balerion: incubator-hudi$ 
   ```
   
   The commit times we see also make sense: vinoth has the first commit time and balaji the last one.
   
   Can you try rerunning the program below and paste me the output with the logs? Also, if you can provide the listings of the .hoodie and data files (like above), it will help us get to the bottom of it.
   
   ```
   /* Licensed to the Apache Software Foundation (ASF) under one
       * or more contributor license agreements.  See the NOTICE file
       * distributed with this work for additional information
       * regarding copyright ownership.  The ASF licenses this file
       * to you under the Apache License, Version 2.0 (the
       * "License"); you may not use this file except in compliance
       * with the License.  You may obtain a copy of the License at
       *
       *      http://www.apache.org/licenses/LICENSE-2.0
       *
       * Unless required by applicable law or agreed to in writing, software
       * distributed under the License is distributed on an "AS IS" BASIS,
       * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
       * See the License for the specific language governing permissions and
       * limitations under the License.
       */
   
   import com.beust.jcommander.JCommander;
   import com.beust.jcommander.Parameter;
   import java.util.Arrays;
   import java.util.List;
   import org.apache.hadoop.fs.FileSystem;
   import org.apache.hudi.DataSourceWriteOptions;
   import org.apache.hudi.EmptyHoodieRecordPayload;
   import org.apache.hudi.HoodieDataSourceHelpers;
   import org.apache.hudi.NonpartitionedKeyGenerator;
   import org.apache.hudi.SimpleKeyGenerator;
   import org.apache.hudi.common.model.HoodieTableType;
   import org.apache.hudi.config.HoodieWriteConfig;
   import org.apache.log4j.LogManager;
   import org.apache.log4j.Logger;
   import org.apache.spark.api.java.JavaSparkContext;
   import org.apache.spark.sql.DataFrameWriter;
   import org.apache.spark.sql.Dataset;
   import org.apache.spark.sql.Row;
   import org.apache.spark.sql.SaveMode;
   import org.apache.spark.sql.SparkSession;
   
   /**
    * Sample program that writes &amp; reads hoodie datasets via the Spark datasource.
    *
    * <p>It performs three commits against the same table and prints the table after each one:
    * an insert of two records, an upsert of one of those keys using
    * {@link EmptyHoodieRecordPayload}, and an upsert of a new key. All records use the
    * {@code team} field as the partition path.
    */
   public class HoodieIssue859 {

     @Parameter(names = {"--table-path", "-p"}, description = "path for Hoodie sample table")
     private String tablePath = "file:///tmp/hoodie/sample-table";

     @Parameter(names = {"--table-name", "-n"}, description = "table name for Hoodie sample table")
     private String tableName = "hoodie_test";

     @Parameter(names = {"--table-type", "-t"}, description = "One of COPY_ON_WRITE or MERGE_ON_READ")
     private String tableType = HoodieTableType.COPY_ON_WRITE.name();

     @Parameter(names = {"--help", "-h"}, help = true)
     public Boolean help = false;

     private static final Logger logger = LogManager.getLogger(HoodieIssue859.class);

     /**
      * Parses the command line into a new instance and runs the sample.
      *
      * @param args command-line arguments; see the {@code @Parameter} fields for supported options
      * @throws Exception if any step of {@link #run()} fails
      */
     public static void main(String[] args) throws Exception {
       HoodieIssue859 cli = new HoodieIssue859();
       JCommander cmd = new JCommander(cli, args);

       if (cli.help) {
         cmd.usage();
         System.exit(1);
       }
       cli.run();
     }

     /**
      * Loads the table from {@code tablePath + "/*"} through the hoodie datasource and shows
      * every row.
      *
      * @param spark session used for the read and the SQL query
      */
     private void printTable(SparkSession spark) {
       Dataset<Row> hoodieROViewDF = spark.read().format("org.apache.hudi").load(tablePath +  "/*");
       // createOrReplaceTempView supersedes registerTempTable (deprecated since Spark 2.0);
       // both replace an existing view of the same name, which matters because this method
       // is called once per commit.
       hoodieROViewDF.createOrReplaceTempView("hoodie_ro");
       spark.sql("select * from hoodie_ro").show();
     }

     /**
      * Builds a hoodie datasource writer carrying the options shared by all three commits.
      * Callers add the operation type, save mode and (optionally) payload class on top.
      *
      * @param inputDF rows to be written
      * @return a writer configured with record key {@code id}, partition path field {@code team}
      *     and precombine field {@code ts}
      */
     private DataFrameWriter<Row> getBaseWriter(Dataset<Row> inputDF) {
       return inputDF.write().format("org.apache.hudi") // specify the hoodie source
           .option("hoodie.insert.shuffle.parallelism",
               "2") // any hoodie client config can be passed like this
           .option("hoodie.upsert.shuffle.parallelism",
               "2") // full list in HoodieWriteConfig & its package
           .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY(), tableType)
           .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(),
               "id") // This is the record key
           .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(),
               "team") // this is the partition to place it into
           .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY(),
               "ts") // use to combine duplicate records in input/with disk val
           .option(HoodieWriteConfig.TABLE_NAME, tableName) // Used by hive sync and queries
           .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY(), SimpleKeyGenerator.class.getCanonicalName());
     }

     /**
      * Turns a list of JSON documents into a DataFrame using a single-partition RDD.
      *
      * @param spark session used to read the JSON
      * @param jssc context used to parallelize the records
      * @param records one JSON document per element
      * @return the parsed rows
      */
     private static Dataset<Row> readJson(SparkSession spark, JavaSparkContext jssc, List<String> records) {
       return spark.read().json(jssc.parallelize(records, 1));
     }

     /**
      * Runs the three commits in order, logging the latest commit instant and printing the
      * table after each. The Spark session is stopped when the method exits, whether it
      * completes normally or a step throws.
      *
      * @throws Exception if session setup, a write, or a read fails
      */
     public void run() throws Exception {

       // Spark session setup..
       SparkSession spark = SparkSession.builder().appName("Hoodie Spark APP")
           .config("spark.serializer",
               "org.apache.spark.serializer.KryoSerializer").master("local[1]")
           .getOrCreate();
       try {
         JavaSparkContext jssc = new JavaSparkContext(spark.sparkContext());
         FileSystem fs = FileSystem.get(jssc.hadoopConfiguration());

         // Step 1: insert ids 1 and 2; SaveMode.Overwrite is used for this first write only.
         List<String> insertRecords = Arrays.asList("{'id' : 1, 'name': 'kabeer', 'ts': 1, 'team': 'hudi'}",
             "{'id' : 2, 'name': 'vinoth', 'ts': 1, 'team': 'hudi'}");
         Dataset<Row> inputDF1 = readJson(spark, jssc, insertRecords);
         getBaseWriter(inputDF1)
             .option(DataSourceWriteOptions.OPERATION_OPT_KEY(), DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL())
             .mode(SaveMode.Overwrite)
             .save(tablePath);
         logger.info("First commit at instant time :" + HoodieDataSourceHelpers.latestCommit(fs, tablePath));
         printTable(spark);

         // Step 2: upsert id 1 with EmptyHoodieRecordPayload as the payload class
         // (the "delete" step of the scenario).
         List<String> deleteRecords = Arrays.asList("{'id' : 1, 'name': 'kabeer', 'ts': 1, 'team': 'hudi'}");
         Dataset<Row> inputDF2 = readJson(spark, jssc, deleteRecords);
         getBaseWriter(inputDF2)
             .option(DataSourceWriteOptions.OPERATION_OPT_KEY(), DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL())
             .option(DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY(), EmptyHoodieRecordPayload.class.getName())
             .mode(SaveMode.Append)
             .save(tablePath);
         logger.info("Second commit at instant time :" + HoodieDataSourceHelpers.latestCommit(fs, tablePath));
         printTable(spark);

         // Step 3: upsert a brand-new key (id 3) into the same partition.
         List<String> upsertRecords = Arrays.asList("{'id' : 3, 'name': 'balaji', 'ts': 3, 'team': 'hudi'}");
         Dataset<Row> inputDF3 = readJson(spark, jssc, upsertRecords);
         getBaseWriter(inputDF3)
             .option(DataSourceWriteOptions.OPERATION_OPT_KEY(), DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL())
             .mode(SaveMode.Append)
             .save(tablePath);
         logger.info("Third commit at instant time :" + HoodieDataSourceHelpers.latestCommit(fs, tablePath));
         printTable(spark);
       } finally {
         // Release the session (and its underlying SparkContext) even when a step fails.
         spark.stop();
       }
     }
   }
   
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services