You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2019/11/21 08:38:48 UTC

[GitHub] [incubator-hudi] simonqin edited a comment on issue #1021: how can i deal this problem when partition's value changed with the same row_key?

simonqin edited a comment on issue #1021: how can i deal this problem when partition's value changed with the same row_key? 
URL: https://github.com/apache/incubator-hudi/issues/1021#issuecomment-555870581
 
 
   @bhasudha I tested three versions, such as 0.4.7, 0.5.0, master,all of them have errors.
   here is my test code.This is modified according to the run method in HoodieClientExample.java of hudi-client.:
   ```)
   public void runTest() throws Exception {
   
       SparkConf sparkConf = new SparkConf().setAppName("hoodie-client-example");
       sparkConf.setMaster("local[1]");
       sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
       sparkConf.set("spark.kryoserializer.buffer.max", "512m");
       JavaSparkContext jsc = new JavaSparkContext(sparkConf);
   
       // Generator of some records to be loaded in.
       HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
   
       // initialize the table, if not done already
       Path path = new Path(tablePath);
       FileSystem fs = FSUtils.getFs(tablePath, jsc.hadoopConfiguration());
       if (!fs.exists(path)) {
         HoodieTableMetaClient.initTableType(jsc.hadoopConfiguration(), tablePath, HoodieTableType.valueOf(tableType),
                 tableName, HoodieAvroPayload.class.getName());
       }
   
       // Create the write client to write some records in
       HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath)
               .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2).forTable(tableName)
               .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(IndexType.GLOBAL_BLOOM).build())
               .withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(11, 12).build()).build();
       HoodieWriteClient client = new HoodieWriteClient(jsc, cfg);
   
       /**
        * Write 1 (only inserts)
        */
       String newCommitTime = client.startCommit();
       logger.info("Starting commit " + newCommitTime);
   //    List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 100);
       List<HoodieRecord> records = generateInserts(newCommitTime);
       JavaRDD<HoodieRecord> writeRecords = jsc.<HoodieRecord>parallelize(records, 1);
       client.upsert(writeRecords, newCommitTime);
   
       /**
        * Write 2 (updates)
        */
       newCommitTime = client.startCommit();
       logger.info("Starting commit " + newCommitTime);
   //    records.addAll(dataGen.generateUpdates(newCommitTime, 100));
       records.clear();
       records.addAll(generateUpdates(newCommitTime));
       writeRecords = jsc.<HoodieRecord>parallelize(records, 1);
       client.upsert(writeRecords, newCommitTime);
   
       /**
        * Schedule a compaction and also perform compaction on a MOR dataset
        */
       if (HoodieTableType.valueOf(tableType) == HoodieTableType.MERGE_ON_READ) {
         Option<String> instant = client.scheduleCompaction(Option.empty());
         JavaRDD<WriteStatus> writeStatues = client.compact(instant.get());
         client.commitCompaction(instant.get(), writeStatues, Option.empty());
       }
     }
   
     public List<HoodieRecord> generateInserts(String commitTime) throws IOException {
       List<HoodieRecord> inserts = new ArrayList<>();
   
       String partitionPath = "2016/03/15";
       HoodieKey key = new HoodieKey("1", partitionPath);
       HoodieRecord record = new HoodieRecord(key, HoodieTestDataGenerator.generateRandomValue(key, commitTime));
       inserts.add(record);
   
       return inserts;
     }
   
     public List<HoodieRecord> generateUpdates(String commitTime) throws IOException {
       List<HoodieRecord> updates = new ArrayList<>();
   
       String partitionPath = "2016/04/15";
       HoodieKey key = new HoodieKey("1", partitionPath);
       HoodieRecord record = new HoodieRecord(key, HoodieTestDataGenerator.generateRandomValue(key, commitTime));
       updates.add(record);
   
       return updates;
     }
   
   error log:
   16214 [Executor task launch worker-0] INFO  org.apache.hudi.common.table.view.AbstractTableFileSystemView  - Building file system view for partition (2016/04/15)
   16214 [Executor task launch worker-0] INFO  org.apache.hudi.common.table.view.AbstractTableFileSystemView  - #files found in partition (2016/04/15) =0, Time taken =0
   16214 [Executor task launch worker-0] INFO  org.apache.hudi.common.table.view.AbstractTableFileSystemView  - addFilesToView: NumFiles=0, FileGroupsCreationTime=0, StoreTimeTaken=0
   16214 [Executor task launch worker-0] INFO  org.apache.hudi.common.table.view.HoodieTableFileSystemView  - Adding file-groups for partition :2016/04/15, #FileGroups=0
   16214 [Executor task launch worker-0] INFO  org.apache.hudi.common.table.view.AbstractTableFileSystemView  - Time to load partition (2016/04/15) =0
   16214 [Executor task launch worker-0] ERROR org.apache.hudi.table.HoodieCopyOnWriteTable  - Error upserting bucketType UPDATE for partition :0
   java.util.NoSuchElementException: No value present in Option
   	at org.apache.hudi.common.util.Option.get(Option.java:88)
   	at org.apache.hudi.io.HoodieMergeHandle.<init>(HoodieMergeHandle.java:74)
   	at org.apache.hudi.table.HoodieCopyOnWriteTable.getUpdateHandle(HoodieCopyOnWriteTable.java:220)
   	at org.apache.hudi.table.HoodieCopyOnWriteTable.handleUpdate(HoodieCopyOnWriteTable.java:177)
   	at org.apache.hudi.table.HoodieCopyOnWriteTable.handleUpsertPartition(HoodieCopyOnWriteTable.java:257)
   	at org.apache.hudi.HoodieWriteClient.lambda$upsertRecordsInternal$507693af$1(HoodieWriteClient.java:428)
   	at org.apache.spark.api.java.JavaRDDLike$$anonfun$mapPartitionsWithIndex$1.apply(JavaRDDLike.scala:102)
   	at org.apache.spark.api.java.JavaRDDLike$$anonfun$mapPartitionsWithIndex$1.apply(JavaRDDLike.scala:102)
   	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$26.apply(RDD.scala:843)
   	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$26.apply(RDD.scala:843)
   	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
   	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
   	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
   	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
   	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
   	at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:336)
   	at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:334)
   	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:973)
   	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:948)
   	at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:888)
   	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:948)
   	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:694)
   	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
   	at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
   	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
   	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
   	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
   	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
   	at org.apache.spark.scheduler.Task.run(Task.scala:99)
   	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
   	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
   	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
   	at java.lang.Thread.run(Thread.java:748)

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services