You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/09/07 18:27:40 UTC

[GitHub] [hudi] nsivabalan commented on a diff in pull request #6615: [HUDI-4758] Add validations to java spark examples

nsivabalan commented on code in PR #6615:
URL: https://github.com/apache/hudi/pull/6615#discussion_r965133110


##########
hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/quickstart/HoodieSparkQuickstart.java:
##########
@@ -65,30 +66,42 @@ public static void main(String[] args) {
   public static void runQuickstart(JavaSparkContext jsc, SparkSession spark, String tableName, String tablePath) {
     final HoodieExampleDataGenerator<HoodieAvroPayload> dataGen = new HoodieExampleDataGenerator<>();
 
-    insertData(spark, jsc, tablePath, tableName, dataGen);
+    Dataset<Row> insertQueryDataIn = insertData(spark, jsc, tablePath, tableName, dataGen);

Review Comment:
   insertDf



##########
hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/quickstart/HoodieSparkQuickstart.java:
##########
@@ -65,30 +66,42 @@ public static void main(String[] args) {
   public static void runQuickstart(JavaSparkContext jsc, SparkSession spark, String tableName, String tablePath) {
     final HoodieExampleDataGenerator<HoodieAvroPayload> dataGen = new HoodieExampleDataGenerator<>();
 
-    insertData(spark, jsc, tablePath, tableName, dataGen);
+    Dataset<Row> insertQueryDataIn = insertData(spark, jsc, tablePath, tableName, dataGen);
     queryData(spark, jsc, tablePath, tableName, dataGen);
+    assert insertQueryDataIn.except(spark.sql("SELECT begin_lat, begin_lon, driver, end_lat, end_lon, fare, partitionpath, rider, ts, uuid FROM hudi_ro_table")).count() == 0;
 
-    updateData(spark, jsc, tablePath, tableName, dataGen);
+    Dataset<Row> updateQueryDataIn = updateData(spark, jsc, tablePath, tableName, dataGen);
     queryData(spark, jsc, tablePath, tableName, dataGen);
+    assert spark.sql("SELECT begin_lat, begin_lon, driver, end_lat, end_lon, fare, partitionpath, rider, ts, uuid FROM hudi_ro_table").except(insertQueryDataIn).except(updateQueryDataIn).count() == 0;
 
     incrementalQuery(spark, tablePath, tableName);
     pointInTimeQuery(spark, tablePath, tableName);
 
-    delete(spark, tablePath, tableName);
+    Dataset<Row> beforeDelete = spark.sql("SELECT begin_lat, begin_lon, driver, end_lat, end_lon, fare, partitionpath, rider, ts, uuid FROM hudi_ro_table");
+    Dataset<Row> deleteQueryIn = delete(spark, tablePath, tableName);

Review Comment:
   deletedDf



##########
hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/quickstart/HoodieSparkQuickstart.java:
##########
@@ -65,30 +66,42 @@ public static void main(String[] args) {
   public static void runQuickstart(JavaSparkContext jsc, SparkSession spark, String tableName, String tablePath) {
     final HoodieExampleDataGenerator<HoodieAvroPayload> dataGen = new HoodieExampleDataGenerator<>();
 
-    insertData(spark, jsc, tablePath, tableName, dataGen);
+    Dataset<Row> insertQueryDataIn = insertData(spark, jsc, tablePath, tableName, dataGen);
     queryData(spark, jsc, tablePath, tableName, dataGen);
+    assert insertQueryDataIn.except(spark.sql("SELECT begin_lat, begin_lon, driver, end_lat, end_lon, fare, partitionpath, rider, ts, uuid FROM hudi_ro_table")).count() == 0;
 
-    updateData(spark, jsc, tablePath, tableName, dataGen);
+    Dataset<Row> updateQueryDataIn = updateData(spark, jsc, tablePath, tableName, dataGen);
     queryData(spark, jsc, tablePath, tableName, dataGen);
+    assert spark.sql("SELECT begin_lat, begin_lon, driver, end_lat, end_lon, fare, partitionpath, rider, ts, uuid FROM hudi_ro_table").except(insertQueryDataIn).except(updateQueryDataIn).count() == 0;
 
     incrementalQuery(spark, tablePath, tableName);
     pointInTimeQuery(spark, tablePath, tableName);
 
-    delete(spark, tablePath, tableName);
+    Dataset<Row> beforeDelete = spark.sql("SELECT begin_lat, begin_lon, driver, end_lat, end_lon, fare, partitionpath, rider, ts, uuid FROM hudi_ro_table");
+    Dataset<Row> deleteQueryIn = delete(spark, tablePath, tableName);
     queryData(spark, jsc, tablePath, tableName, dataGen);
+    assert beforeDelete.except(deleteQueryIn).except(spark.sql("SELECT begin_lat, begin_lon, driver, end_lat, end_lon, fare, partitionpath, rider, ts, uuid FROM hudi_ro_table")).count() == 0;
 
-    insertOverwriteData(spark, jsc, tablePath, tableName, dataGen);
+    Dataset<Row> beforeOverwrite = spark.sql("SELECT begin_lat, begin_lon, driver, end_lat, end_lon, fare, partitionpath, rider, ts, uuid FROM hudi_ro_table");
+    Dataset<Row> overwriteDataIn = insertOverwriteData(spark, jsc, tablePath, tableName, dataGen);
+    Dataset<Row> afterOverwrite = spark.sql("SELECT begin_lat, begin_lon, driver, end_lat, end_lon, fare, partitionpath, rider, ts, uuid FROM hudi_ro_table");
+    Dataset<Row> overwriteIntersect = beforeOverwrite.intersect(afterOverwrite);
+    assert afterOverwrite.except(overwriteIntersect).except(overwriteDataIn).count() == 0;
     queryData(spark, jsc, tablePath, tableName, dataGen);
 
+    Dataset<Row> beforeDeleteByPartition = spark.sql(
+        "SELECT begin_lat, begin_lon, driver, end_lat, end_lon, fare, partitionpath, rider, ts, uuid FROM hudi_ro_table WHERE partitionpath NOT IN ("
+            + String.join(", ", HoodieExampleDataGenerator.DEFAULT_PARTITION_PATHS) + ")");
     deleteByPartition(spark, tablePath, tableName);
     queryData(spark, jsc, tablePath, tableName, dataGen);
+    assert spark.sql("SELECT begin_lat, begin_lon, driver, end_lat, end_lon, fare, partitionpath, rider, ts, uuid FROM hudi_ro_table").except(beforeDeleteByPartition).count() == 0;

Review Comment:
   Let's ensure we don't delete all partition paths, but just 1 or 2.



##########
hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/quickstart/HoodieSparkQuickstart.java:
##########
@@ -65,30 +66,42 @@ public static void main(String[] args) {
   public static void runQuickstart(JavaSparkContext jsc, SparkSession spark, String tableName, String tablePath) {
     final HoodieExampleDataGenerator<HoodieAvroPayload> dataGen = new HoodieExampleDataGenerator<>();
 
-    insertData(spark, jsc, tablePath, tableName, dataGen);
+    Dataset<Row> insertQueryDataIn = insertData(spark, jsc, tablePath, tableName, dataGen);
     queryData(spark, jsc, tablePath, tableName, dataGen);
+    assert insertQueryDataIn.except(spark.sql("SELECT begin_lat, begin_lon, driver, end_lat, end_lon, fare, partitionpath, rider, ts, uuid FROM hudi_ro_table")).count() == 0;
 
-    updateData(spark, jsc, tablePath, tableName, dataGen);
+    Dataset<Row> updateQueryDataIn = updateData(spark, jsc, tablePath, tableName, dataGen);
     queryData(spark, jsc, tablePath, tableName, dataGen);
+    assert spark.sql("SELECT begin_lat, begin_lon, driver, end_lat, end_lon, fare, partitionpath, rider, ts, uuid FROM hudi_ro_table").except(insertQueryDataIn).except(updateQueryDataIn).count() == 0;
 
     incrementalQuery(spark, tablePath, tableName);
     pointInTimeQuery(spark, tablePath, tableName);
 
-    delete(spark, tablePath, tableName);
+    Dataset<Row> beforeDelete = spark.sql("SELECT begin_lat, begin_lon, driver, end_lat, end_lon, fare, partitionpath, rider, ts, uuid FROM hudi_ro_table");

Review Comment:
   snapshotBeforeDeleteDf



##########
hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/quickstart/HoodieSparkQuickstart.java:
##########
@@ -171,16 +185,19 @@ public static void updateData(SparkSession spark, JavaSparkContext jsc, String t
         .option(TBL_NAME.key(), tableName)
         .mode(Append)
         .save(tablePath);
+    return df;
   }
 
   /**
    * Deleta data based in data information.
    */
-  public static void delete(SparkSession spark, String tablePath, String tableName) {
+  public static Dataset<Row> delete(SparkSession spark, String tablePath, String tableName) {
 
     Dataset<Row> roViewDF = spark.read().format("org.apache.hudi").load(tablePath + "/*/*/*/*");
     roViewDF.createOrReplaceTempView("hudi_ro_table");
-    Dataset<Row> df = spark.sql("select uuid, partitionpath, ts from  hudi_ro_table limit 2");
+    //Dataset<Row> df = spark.sql("select uuid, partitionpath, ts from  hudi_ro_table limit 2");
+    Dataset<Row> ret = spark.sql("SELECT begin_lat, begin_lon, driver, end_lat, end_lon, fare, partitionpath, rider, ts, uuid FROM hudi_ro_table limit 2");

Review Comment:
   minor. rename `ret` -> `toBeDeletedDf`



##########
hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/quickstart/HoodieSparkQuickstart.java:
##########
@@ -171,16 +185,19 @@ public static void updateData(SparkSession spark, JavaSparkContext jsc, String t
         .option(TBL_NAME.key(), tableName)
         .mode(Append)
         .save(tablePath);
+    return df;
   }
 
   /**
    * Deleta data based in data information.
    */
-  public static void delete(SparkSession spark, String tablePath, String tableName) {
+  public static Dataset<Row> delete(SparkSession spark, String tablePath, String tableName) {
 
     Dataset<Row> roViewDF = spark.read().format("org.apache.hudi").load(tablePath + "/*/*/*/*");
     roViewDF.createOrReplaceTempView("hudi_ro_table");
-    Dataset<Row> df = spark.sql("select uuid, partitionpath, ts from  hudi_ro_table limit 2");
+    //Dataset<Row> df = spark.sql("select uuid, partitionpath, ts from  hudi_ro_table limit 2");

Review Comment:
   Remove the commented-out code.



##########
hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/quickstart/HoodieSparkQuickstart.java:
##########
@@ -65,30 +66,42 @@ public static void main(String[] args) {
   public static void runQuickstart(JavaSparkContext jsc, SparkSession spark, String tableName, String tablePath) {
     final HoodieExampleDataGenerator<HoodieAvroPayload> dataGen = new HoodieExampleDataGenerator<>();
 
-    insertData(spark, jsc, tablePath, tableName, dataGen);
+    Dataset<Row> insertQueryDataIn = insertData(spark, jsc, tablePath, tableName, dataGen);
     queryData(spark, jsc, tablePath, tableName, dataGen);
+    assert insertQueryDataIn.except(spark.sql("SELECT begin_lat, begin_lon, driver, end_lat, end_lon, fare, partitionpath, rider, ts, uuid FROM hudi_ro_table")).count() == 0;
 
-    updateData(spark, jsc, tablePath, tableName, dataGen);
+    Dataset<Row> updateQueryDataIn = updateData(spark, jsc, tablePath, tableName, dataGen);
     queryData(spark, jsc, tablePath, tableName, dataGen);
+    assert spark.sql("SELECT begin_lat, begin_lon, driver, end_lat, end_lon, fare, partitionpath, rider, ts, uuid FROM hudi_ro_table").except(insertQueryDataIn).except(updateQueryDataIn).count() == 0;
 
     incrementalQuery(spark, tablePath, tableName);
     pointInTimeQuery(spark, tablePath, tableName);
 
-    delete(spark, tablePath, tableName);
+    Dataset<Row> beforeDelete = spark.sql("SELECT begin_lat, begin_lon, driver, end_lat, end_lon, fare, partitionpath, rider, ts, uuid FROM hudi_ro_table");

Review Comment:
   If this query is repeated, declare it once as a constant and re-use it:
   `SELECT begin_lat, begin_lon, driver, end_lat, end_lon, fare, partitionpath, rider, ts, uuid FROM hudi_ro_table`
   



##########
hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/quickstart/HoodieSparkQuickstart.java:
##########
@@ -65,30 +66,42 @@ public static void main(String[] args) {
   public static void runQuickstart(JavaSparkContext jsc, SparkSession spark, String tableName, String tablePath) {
     final HoodieExampleDataGenerator<HoodieAvroPayload> dataGen = new HoodieExampleDataGenerator<>();
 
-    insertData(spark, jsc, tablePath, tableName, dataGen);
+    Dataset<Row> insertQueryDataIn = insertData(spark, jsc, tablePath, tableName, dataGen);
     queryData(spark, jsc, tablePath, tableName, dataGen);
+    assert insertQueryDataIn.except(spark.sql("SELECT begin_lat, begin_lon, driver, end_lat, end_lon, fare, partitionpath, rider, ts, uuid FROM hudi_ro_table")).count() == 0;
 
-    updateData(spark, jsc, tablePath, tableName, dataGen);
+    Dataset<Row> updateQueryDataIn = updateData(spark, jsc, tablePath, tableName, dataGen);

Review Comment:
   updateDf



##########
hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/quickstart/HoodieSparkQuickstart.java:
##########
@@ -65,30 +66,42 @@ public static void main(String[] args) {
   public static void runQuickstart(JavaSparkContext jsc, SparkSession spark, String tableName, String tablePath) {
     final HoodieExampleDataGenerator<HoodieAvroPayload> dataGen = new HoodieExampleDataGenerator<>();
 
-    insertData(spark, jsc, tablePath, tableName, dataGen);
+    Dataset<Row> insertQueryDataIn = insertData(spark, jsc, tablePath, tableName, dataGen);
     queryData(spark, jsc, tablePath, tableName, dataGen);
+    assert insertQueryDataIn.except(spark.sql("SELECT begin_lat, begin_lon, driver, end_lat, end_lon, fare, partitionpath, rider, ts, uuid FROM hudi_ro_table")).count() == 0;
 
-    updateData(spark, jsc, tablePath, tableName, dataGen);
+    Dataset<Row> updateQueryDataIn = updateData(spark, jsc, tablePath, tableName, dataGen);
     queryData(spark, jsc, tablePath, tableName, dataGen);
+    assert spark.sql("SELECT begin_lat, begin_lon, driver, end_lat, end_lon, fare, partitionpath, rider, ts, uuid FROM hudi_ro_table").except(insertQueryDataIn).except(updateQueryDataIn).count() == 0;

Review Comment:
   this might need some fixes. consider all error paths. 



##########
hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/quickstart/HoodieSparkQuickstart.java:
##########
@@ -65,30 +66,42 @@ public static void main(String[] args) {
   public static void runQuickstart(JavaSparkContext jsc, SparkSession spark, String tableName, String tablePath) {
     final HoodieExampleDataGenerator<HoodieAvroPayload> dataGen = new HoodieExampleDataGenerator<>();
 
-    insertData(spark, jsc, tablePath, tableName, dataGen);
+    Dataset<Row> insertQueryDataIn = insertData(spark, jsc, tablePath, tableName, dataGen);
     queryData(spark, jsc, tablePath, tableName, dataGen);
+    assert insertQueryDataIn.except(spark.sql("SELECT begin_lat, begin_lon, driver, end_lat, end_lon, fare, partitionpath, rider, ts, uuid FROM hudi_ro_table")).count() == 0;

Review Comment:
   You can probably split this into two lines:
   Dataset<Row> hudiDf = spark.sql("SELECT begin_lat, begin_lon, driver, end_lat, end_lon, fare, partitionpath, rider, ts, uuid FROM hudi_ro_table");
   assert insertDf.except(hudiDf).count() == 0;



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org