Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2020/06/07 16:26:26 UTC

[GitHub] [hudi] nandini57 commented on issue #1705: Tracking Hudi Data along transaction time and business time

nandini57 commented on issue #1705:
URL: https://github.com/apache/hudi/issues/1705#issuecomment-640243834


   haha. Sharing a superb short version which I tried implementing from a functional point of view.
   Check Chapter 10:
   https://goldmansachs.github.io/reladomo-kata/reladomo-tour-docs/tour-guide.html#N408B5
   
   Here's my version of it with Hudi.
   1. A custom merge -> DELETE + INSERT ops. Each op needs to be wrapped in a try/catch, with the commit rolled back if it fails (a sketch of this is after the merge code below). I don't get this facility with plain HDFS saves, so Hudi is useful here. Correct me if wrong.
   2. Timestamped record keys so that the merge doesn't uproot data which is already saved. I need full control over the previously saved data to support the bitemporal view (a sketch of the key generation is right below).
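
   recordKeyGen() isn't shown above; the idea is just a strictly increasing timestamp prefix, so a minimal sketch could be:

    // Hypothetical recordKeyGen(): returns a fresh timestamp so that the re-inserted versions
    // never collide with record keys already saved in the table.
    private static String recordKeyGen() {
        return java.time.format.DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS")
                .format(java.time.LocalDateTime.now());
    }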
   
   Need your views on implementing it like this.
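
   getBaseWriter just sets the common Hudi write options shared by both writes; a minimal version looks like this (the record key / precombine / partition field names are placeholders for my schema):

    // Assumed shape of getBaseWriter: the base Hudi writer config reused by the DELETE and INSERT below.
    // Uses org.apache.hudi.DataSourceWriteOptions and org.apache.hudi.config.HoodieWriteConfig.
    private static DataFrameWriter<Row> getBaseWriter(Dataset<Row> df, String tableType, String tableName) {
        return df.write().format("org.apache.hudi")
                .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY(), tableType)
                .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "recordKey")     // placeholder field
                .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY(), "IN_Z")         // placeholder field
                .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "accountId") // placeholder field
                .option(HoodieWriteConfig.TABLE_NAME, tableName);
    }

   The merge itself: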
   
    public static void merge(Dataset<Row> baseDF, Dataset<Row> adjustedDF, SparkSession spark, String tableType, String tableName, String tablePath, String partitionPath) throws IOException {
   
        baseDF.createOrReplaceTempView("base");
        adjustedDF.createOrReplaceTempView("adjustments");

        String findMatchSql = "select b.*, a.FROM_Z as FROM_Z_NEW, a.THRU_Z as THRU_Z_NEW, a.OUT_Z as OUT_Z_NEW, a.IN_Z as IN_Z_NEW "
                + "from base b inner join adjustments a on b.accountId = a.accountId where b.OUT_Z = '99991231'";
        Dataset<Row> matchedDF = spark.sql(findMatchSql).cache(); // cache required: this DF is reused several times below
   
        matchedDF.show(false);
   
        // Current versions of the matched rows; these are deleted before the milestoned versions are re-inserted
        Dataset<Row> toBeDeletedDF = matchedDF.drop("FROM_Z_NEW").drop("THRU_Z_NEW").drop("OUT_Z_NEW").drop("IN_Z_NEW");
   
        // To be invalidated - insert: close out the old version by setting its OUT_Z to the adjustment's IN_Z
        Dataset<Row> invalidatedDF = matchedDF.drop("OUT_Z").withColumnRenamed("IN_Z_NEW", "OUT_Z")
                .drop("FROM_Z_NEW").drop("OUT_Z_NEW").drop("THRU_Z_NEW");
   
        invalidatedDF.show();
   
        // New world view - insert: the surviving business interval of the old row now ends where the adjustment begins
        Dataset<Row> newWorldViewDF = matchedDF.drop("THRU_Z").drop("IN_Z")
                .withColumnRenamed("IN_Z_NEW", "IN_Z").withColumnRenamed("FROM_Z_NEW", "THRU_Z")
                .drop("OUT_Z_NEW").drop("THRU_Z_NEW");
   
        newWorldViewDF.show();
   
        // Rollback to be added if this delete fails; same for the insert below (see the sketch after the method)
        getBaseWriter(toBeDeletedDF, tableType, tableName)
                .option(DataSourceWriteOptions.OPERATION_OPT_KEY(), DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL())
                .option(DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY(), EmptyHoodieRecordPayload.class.getName())
                .mode(SaveMode.Append)
                .save(tablePath);
   
   
        // Re-key the milestoned versions so they don't collide with record keys already saved,
        // then union them with the raw adjustment rows
        Dataset<Row> mergedDF = invalidatedDF.drop("recordKey")
                .withColumn("recordKey", functions.concat_ws("_", functions.lit(recordKeyGen()), functions.col("accountId")))
                .unionByName(newWorldViewDF.drop("recordKey")
                        .withColumn("recordKey", functions.concat_ws("_", functions.lit(recordKeyGen()), functions.col("accountId")))
                        .unionByName(adjustedDF));
   
        mergedDF.createOrReplaceTempView("mergedDF");

        // Drop rows whose business interval collapsed to nothing (FROM_Z >= THRU_Z)
        mergedDF = spark.sql("select * from mergedDF where FROM_Z < THRU_Z");
   
        getBaseWriter(mergedDF, tableType, tableName)
                .option(DataSourceWriteOptions.OPERATION_OPT_KEY(), DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL())
                .mode(SaveMode.Append)
                .save(tablePath);
   
   
       }
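
   And for point 1, this is roughly how I'd wrap each write so that a failure rolls the commit back before rethrowing. saveOrRollback and rollbackLastCommit are placeholders of mine, not Hudi APIs; the real rollback would go through the Hudi write client (or the CLI) against the latest commit on the table:

    // Sketch only: every Hudi write goes through this wrapper so a failed DELETE/INSERT
    // does not leave a half-applied commit behind.
    private static void saveOrRollback(DataFrameWriter<Row> writer, String tablePath) {
        try {
            writer.mode(SaveMode.Append).save(tablePath);
        } catch (Exception e) {
            rollbackLastCommit(tablePath);
            throw new RuntimeException("Hudi write to " + tablePath + " failed and was rolled back", e);
        }
    }

    private static void rollbackLastCommit(String tablePath) {
        // TODO: resolve the latest commit instant on the table and roll it back via the Hudi write client
    }

   The two saves inside merge() would then become saveOrRollback(getBaseWriter(toBeDeletedDF, ...), tablePath) and saveOrRollback(getBaseWriter(mergedDF, ...), tablePath), with the delete/insert options applied before the call.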
   
   
   
   
   
   
   

