Posted to commits@hudi.apache.org by "michael1991 (via GitHub)" <gi...@apache.org> on 2023/02/17 08:36:01 UTC

[GitHub] [hudi] michael1991 opened a new issue, #7988: [SUPPORT] Facing issues when using custom Payload class

michael1991 opened a new issue, #7988:
URL: https://github.com/apache/hudi/issues/7988

   **_Tips before filing an issue_**
   
   - Have you gone through our [FAQs](https://hudi.apache.org/learn/faq/)?
   - Yes, but there is no complete example.
   
   - Join the mailing list to engage in conversations and get faster support at dev-subscribe@hudi.apache.org.
   - No.
   
   - If you have triaged this as a bug, then file an [issue](https://issues.apache.org/jira/projects/HUDI/issues) directly.
   - No.
   
   **Describe the problem you faced**
   
   I'm using Hudi 0.12.0 on GCP Dataproc 2.1.3 with Spark 3.3.0, and I hit the stacktrace below when I try to use a custom payload class via the following steps:
   1. Package a single jar containing the custom payload class.
   2. Set spark.driver.extraClassPath and spark.executor.extraClassPath during SparkSession creation.
   3. Configure WRITE_PAYLOAD_CLASS_NAME with the custom payload class on write (a sketch of this configuration follows below).
   Please do me a favor and help me handle this situation. Thanks in advance!
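   
   For reference, step 3 looks roughly like the following (table name, field names, and paths are placeholders, not our real values):
   ```java
   // A minimal sketch of step 3 (names, fields, and paths are placeholders):
   // hoodie.datasource.write.payload.class (WRITE_PAYLOAD_CLASS_NAME) points
   // Hudi at the custom payload class used to merge records on upsert.
   // df is an existing Dataset<Row> holding the incremental batch.
   df.write()
       .format("hudi")
       .option("hoodie.table.name", "my_table")
       .option("hoodie.datasource.write.table.type", "MERGE_ON_READ")
       .option("hoodie.datasource.write.recordkey.field", "key")
       .option("hoodie.datasource.write.precombine.field", "ts")
       .option("hoodie.datasource.write.payload.class", "com.x.y.z.payload.CustomPayload")
       .mode("append")
       .save("gs://my-bucket/tables/my_table");
   ```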
   
   **To Reproduce**
   
   Steps to reproduce the behavior:
   
   1. Follow the three steps described above (package the jar, set extraClassPath, set WRITE_PAYLOAD_CLASS_NAME).
   2. Upsert into the table; the write job fails with the stacktrace below.
   
   **Expected behavior**
   
   The upsert completes successfully, with records merged by the custom payload class.
   
   **Environment Description**
   
   * Hudi version : 0.12.0
   
   * Spark version : 3.3.0
   
   * Hive version : not used
   
   * Hadoop version : 3.3.3
   
   * Storage (HDFS/S3/GCS..) : GCS
   
   * Running on Docker? (yes/no) : no
   
   
   **Additional context**
   
   Sorry, I'm not sure what else is relevant.
   
   **Stacktrace**
   
   ```
   Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 15.0 failed 4 times, most recent failure: Lost task 0.3 in stage 15.0 (TID 797) (executor 2): java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.rdd.MapPartitionsRDD.f of type scala.Function3 in instance of org.apache.spark.rdd.MapPartitionsRDD
   	at java.base/java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2076)
   	at java.base/java.io.ObjectStreamClass$FieldReflector.checkObjectFieldValueTypes(ObjectStreamClass.java:2039)
   	at java.base/java.io.ObjectStreamClass.checkObjFieldValueTypes(ObjectStreamClass.java:1293)
   	at java.base/java.io.ObjectInputStream.defaultCheckFieldValues(ObjectInputStream.java:2512)
   	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2419)
   	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
   	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
   	at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
   	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
   	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
   	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
   	at java.base/java.io.ObjectInputStream.readArray(ObjectInputStream.java:2134)
   	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1675)
   	at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
   	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
   	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
   	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
   	at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
   	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
   	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
   	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
   	at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
   	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
   	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
   	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
   	at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:489)
   	at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:447)
   	at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:527)
   	at jdk.internal.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
   	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
   	at java.base/java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1046)
   	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2357)
   	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
   	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
   	at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
   	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
   	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
   	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
   	at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
   	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
   	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
   	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
   	at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:489)
   	at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:447)
   	at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:527)
   	at jdk.internal.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
   	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
   	at java.base/java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1046)
   	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2357)
   	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
   	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
   	at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
   	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
   	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
   	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
   	at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
   	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
   	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
   	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
   	at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:489)
   	at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:447)
   	at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:87)
   	at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:129)
   	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:85)
   	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
   	at org.apache.spark.scheduler.Task.run(Task.scala:136)
   	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
   	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
   	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
   	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
   	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
   	at java.base/java.lang.Thread.run(Thread.java:829)
   
   Driver stacktrace:
   	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2673)
   	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2609)
   	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2608)
   	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
   	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
   	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
   	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2608)
   	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
   	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
   	at scala.Option.foreach(Option.scala:407)
   	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
   	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2861)
   	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2803)
   	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2792)
   	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
   	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
   	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2236)
   	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2257)
   	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2276)
   	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2301)
   	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1021)
   	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
   	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
   	at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
   	at org.apache.spark.rdd.RDD.collect(RDD.scala:1020)
   	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$countByKey$1(PairRDDFunctions.scala:367)
   	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
   	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
   	at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
   	at org.apache.spark.rdd.PairRDDFunctions.countByKey(PairRDDFunctions.scala:367)
   	at org.apache.spark.api.java.JavaPairRDD.countByKey(JavaPairRDD.scala:314)
   	at org.apache.hudi.data.HoodieJavaPairRDD.countByKey(HoodieJavaPairRDD.java:105)
   	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.buildProfile(BaseSparkCommitActionExecutor.java:187)
   	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.execute(BaseSparkCommitActionExecutor.java:156)
   	at org.apache.hudi.table.action.deltacommit.SparkUpsertPreppedDeltaCommitActionExecutor.execute(SparkUpsertPreppedDeltaCommitActionExecutor.java:45)
   	at org.apache.hudi.table.HoodieSparkMergeOnReadTable.upsertPrepped(HoodieSparkMergeOnReadTable.java:112)
   	at org.apache.hudi.table.HoodieSparkMergeOnReadTable.upsertPrepped(HoodieSparkMergeOnReadTable.java:76)
   	at org.apache.hudi.client.SparkRDDWriteClient.upsertPreppedRecords(SparkRDDWriteClient.java:169)
   	at org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter.commit(SparkHoodieBackedTableMetadataWriter.java:166)
   	at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.update(HoodieBackedTableMetadataWriter.java:934)
   	at org.apache.hudi.table.action.BaseActionExecutor.lambda$writeTableMetadata$2(BaseActionExecutor.java:77)
   	at org.apache.hudi.common.util.Option.ifPresent(Option.java:97)
   	at org.apache.hudi.table.action.BaseActionExecutor.writeTableMetadata(BaseActionExecutor.java:77)
   	at org.apache.hudi.table.action.rollback.BaseRollbackActionExecutor.finishRollback(BaseRollbackActionExecutor.java:247)
   	at org.apache.hudi.table.action.rollback.BaseRollbackActionExecutor.runRollback(BaseRollbackActionExecutor.java:116)
   	at org.apache.hudi.table.action.rollback.BaseRollbackActionExecutor.execute(BaseRollbackActionExecutor.java:137)
   	at org.apache.hudi.table.HoodieSparkCopyOnWriteTable.rollback(HoodieSparkCopyOnWriteTable.java:281)
   	at org.apache.hudi.client.BaseHoodieWriteClient.rollback(BaseHoodieWriteClient.java:772)
   	... 52 more
   Caused by: java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.rdd.MapPartitionsRDD.f of type scala.Function3 in instance of org.apache.spark.rdd.MapPartitionsRDD
   	at java.base/java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2076)
   	at java.base/java.io.ObjectStreamClass$FieldReflector.checkObjectFieldValueTypes(ObjectStreamClass.java:2039)
   	at java.base/java.io.ObjectStreamClass.checkObjFieldValueTypes(ObjectStreamClass.java:1293)
   	at java.base/java.io.ObjectInputStream.defaultCheckFieldValues(ObjectInputStream.java:2512)
   	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2419)
   	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
   	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
   	at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
   	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
   	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
   	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
   	at java.base/java.io.ObjectInputStream.readArray(ObjectInputStream.java:2134)
   	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1675)
   	at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
   	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
   	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
   	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
   	at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
   	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
   	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
   	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
   	at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
   	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
   	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
   	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
   	at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:489)
   	at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:447)
   	at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:527)
   	at jdk.internal.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
   	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
   	at java.base/java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1046)
   	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2357)
   	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
   	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
   	at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
   	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
   	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
   	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
   	at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
   	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
   	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
   	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
   	at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:489)
   	at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:447)
   	at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:527)
   	at jdk.internal.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
   	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
   	at java.base/java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1046)
   	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2357)
   	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
   	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
   	at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
   	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
   	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
   	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
   	at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
   	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
   	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
   	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
   	at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:489)
   	at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:447)
   	at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:87)
   	at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:129)
   	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:85)
   	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
   	at org.apache.spark.scheduler.Task.run(Task.scala:136)
   	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
   	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
   	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
   	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
   	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
   	at java.base/java.lang.Thread.run(Thread.java:829)
   ```
   
   




[GitHub] [hudi] michael1991 commented on issue #7988: [SUPPORT] Facing issues based on custom Payload implementation setting

Posted by "michael1991 (via GitHub)" <gi...@apache.org>.
michael1991 commented on issue #7988:
URL: https://github.com/apache/hudi/issues/7988#issuecomment-1437741101

   Problem solved here!
   The key point for Dataproc is that Hudi is configured automatically during cluster initialization: if we overwrite spark.{driver/executor}.extraClassPath, the Hudi bundle jar gets removed from the extraClassPath configuration, so the executors can no longer find the Hudi classes, which is what surfaces as the SerializedLambda ClassCastException above.
   So, tips for Dataproc with Hudi:
   Option 1: package your own Hudi bundle jar that includes the custom implementation.
   Option 2: keep the Hudi bundle jar while overwriting extraClassPath, e.g. hudi-bundle.jar:custom.jar (see the sketch below).
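   
   A minimal sketch of option 2 (both jar paths are illustrative placeholders; check where the bundle actually lives on your cluster):
   ```java
   import org.apache.spark.sql.SparkSession;
   
   // Keep the pre-installed Hudi bundle jar on the classpath when overriding
   // extraClassPath; otherwise Dataproc's auto-configured Hudi jars are lost
   // and executors fail to deserialize the job (the ClassCastException above).
   // On Dataproc these properties are typically passed at job submit time.
   SparkSession spark = SparkSession.builder()
       .appName("hudi-custom-payload")
       .config("spark.driver.extraClassPath",
           "/usr/lib/hudi/hudi-spark-bundle.jar:/opt/jars/custom-payload.jar")
       .config("spark.executor.extraClassPath",
           "/usr/lib/hudi/hudi-spark-bundle.jar:/opt/jars/custom-payload.jar")
       .getOrCreate();
   ```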




[GitHub] [hudi] michael1991 closed issue #7988: [SUPPORT] Facing issues based on custom Payload implementation setting

Posted by "michael1991 (via GitHub)" <gi...@apache.org>.
michael1991 closed issue #7988: [SUPPORT] Facing issues based on custom Payload implementation setting
URL: https://github.com/apache/hudi/issues/7988




[GitHub] [hudi] michael1991 commented on issue #7988: [SUPPORT] Facing issues when using custom Payload class

Posted by "michael1991 (via GitHub)" <gi...@apache.org>.
michael1991 commented on issue #7988:
URL: https://github.com/apache/hudi/issues/7988#issuecomment-1436228601

   Hi @yihua, thanks for your quick response! Sorry for the late reply; it was the weekend.
   My custom payload implementation looks like this:
   ```java
   package com.x.y.z.payload;
   
   import org.apache.avro.Schema;
   import org.apache.avro.generic.GenericRecord;
   import org.apache.avro.generic.GenericRecordBuilder;
   import org.apache.avro.generic.IndexedRecord;
   import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
   import org.apache.hudi.common.util.Option;
   
   import java.io.IOException;
   import java.util.List;
   import java.util.Properties;
   
   import static org.apache.hudi.common.model.HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS;
   
   /**
    * Custom Payload Class to Process Incremental Log
    */
   public class CustomPayload extends OverwriteWithLatestAvroPayload {
   
       public CustomPayload(GenericRecord record, Comparable orderingVal) {
           super(record, orderingVal);
       }
   
       public CustomPayload(Option<GenericRecord> record) {
           super(record);
       }
   
       @Override
       public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema, Properties properties) throws IOException {
           Option<IndexedRecord> recordOption = getInsertValue(schema);
           if (!recordOption.isPresent()) {
               return Option.empty();
           }
   
           GenericRecord insertRecord = (GenericRecord) recordOption.get();
           GenericRecord currentRecord = (GenericRecord) currentValue;
   
           if (isDeleteRecord(insertRecord)) {
               return Option.empty();
           } else {
               final GenericRecordBuilder builder = new GenericRecordBuilder(schema);
               List<Schema.Field> fields = schema.getFields();
               for (Schema.Field field : fields) {
                   String fieldName = field.name();
                   Object insertFieldValue = insertRecord.get(field.name());
                   Object currentFieldValue = currentRecord.get(field.pos());
   
                   switch (fieldName) {
                       case "double_price_1":
                       case "double_price_2":
                           Double insertPrice = toDouble(insertFieldValue);
                           Double currentPrice = toDouble(currentFieldValue);
                           if (insertPrice != null && (currentPrice == null || insertPrice < currentPrice)) {
                               builder.set(field, insertFieldValue);
                           } else {
                               builder.set(field, currentFieldValue);
                           }
                           break;
                       case "int_flag_1":
                       case "int_flag_2":
                           int insertFlag = Integer.parseInt(insertFieldValue.toString());
                           int currentFlag = Integer.parseInt(currentFieldValue.toString());
                           builder.set(field, currentFlag == 0 && insertFlag == 1 ? insertFieldValue : currentFieldValue);
                           break;
                       case "long_time_1":
                       case "long_time_2":
                           Long insertTime = toLong(insertFieldValue);
                           Long currentTime = toLong(currentFieldValue);
                           if (insertTime != null && (currentTime == null || insertTime < currentTime)) {
                               builder.set(field, insertFieldValue);
                           } else {
                               builder.set(field, currentFieldValue);
                           }
                           break;
                       default:
                           if (HOODIE_META_COLUMNS_NAME_TO_POS.containsKey(fieldName) || (insertFieldValue != null && currentFieldValue == null)) {
                               builder.set(field, insertFieldValue);
                           } else {
                               builder.set(field, currentFieldValue);
                           }
                           break;
                   }
               }
               return Option.of(builder.build());
           }
       }
   
       private Double toDouble(Object object) {
           if (object == null) {
               return null;
           } else {
               try {
                   return Double.parseDouble(object.toString());
               } catch (Exception exception) {
                   return null;
               }
           }
       }
   
       private Long toLong(Object object) {
           if (object == null) {
               return null;
           } else {
               try {
                   return Long.parseLong(object.toString());
               } catch (Exception exception) {
                   return null;
               }
           }
       }
   
   }
   
   ```
   **Sample schema**: key, ts, int_flag_1, double_price_1, long_time_1, int_flag_2, double_price_2, long_time_2
   flag: 0 or 1, int type
   price: double price
   long_time: timestamp
   key: record key
   ts: precombine column, though unnecessary in our scenario
   
   Sorry, I can't share sample data or a unit test.
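   
   To illustrate the merge rules with hypothetical values: if the current row has double_price_1 = 10.5 and the incoming row has 9.9, the merged row keeps 9.9 (the lower price wins); int_flag_1 flips from 0 to 1 only when the current flag is 0 and the incoming flag is 1; and long_time_1 keeps the earlier (smaller) timestamp.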
   
   BTW, I saw [PartialUpdateAvroPayload](https://github.com/apache/hudi/blob/master/hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateAvroPayload.java) before, but we need more flexible update logic for our scenario, so we tried to implement it ourselves.




[GitHub] [hudi] yihua commented on issue #7988: [SUPPORT] Facing issues when using custom Payload class

Posted by "yihua (via GitHub)" <gi...@apache.org>.
yihua commented on issue #7988:
URL: https://github.com/apache/hudi/issues/7988#issuecomment-1435415262

   Hi @michael1991, thanks for the question. Could you put up a draft of your payload implementation? The issue is likely due to the payload implementation and related to the schema of the records. If possible, could you also provide sample data or a unit test to reproduce this problem?
   
   If you're interested in the partial updates, there's already a payload implementation for that, [`PartialUpdateAvroPayload`](https://github.com/apache/hudi/blob/master/hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateAvroPayload.java), which is going to be released in 0.13.0.

