You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2020/04/28 04:32:57 UTC

[GitHub] [incubator-hudi] hddong commented on a change in pull request #1558: [HUDI-796]: added deduping logic for upserts case

hddong commented on a change in pull request #1558:
URL: https://github.com/apache/incubator-hudi/pull/1558#discussion_r416299956



##########
File path: hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java
##########
@@ -64,11 +64,15 @@ public String deduplicate(
       @CliOption(key = {"repairedOutputPath"}, help = "Location to place the repaired files",
           mandatory = true) final String repairedOutputPath,
       @CliOption(key = {"sparkProperties"}, help = "Spark Properties File Path",
-          mandatory = true) final String sparkPropertiesPath)
+          mandatory = true) final String sparkPropertiesPath,
+      @CliOption(key = {"useCommitTimeForDedupe"}, help = "Set it to true if duplicates have never been updated",
+        unspecifiedDefaultValue = "true") final boolean useCommitTimeForDedupe,
+      @CliOption(key = {"dryrun"}, help = "Should we actually add or just print what would be done",

Review comment:
       Let us modify help string of dryrun, statements are inaccurate :)

##########
File path: hudi-cli/src/main/scala/org/apache/hudi/cli/DedupeSparkJob.scala
##########
@@ -103,24 +105,51 @@ class DedupeSparkJob(basePath: String,
     // Mark all files except the one with latest commits for deletion
     dupeMap.foreach(rt => {
       val (key, rows) = rt
-      var maxCommit = -1L
-
-      rows.foreach(r => {
-        val c = r(3).asInstanceOf[String].toLong
-        if (c > maxCommit)
-          maxCommit = c
-      })
-
-      rows.foreach(r => {
-        val c = r(3).asInstanceOf[String].toLong
-        if (c != maxCommit) {
-          val f = r(2).asInstanceOf[String].split("_")(0)
-          if (!fileToDeleteKeyMap.contains(f)) {
-            fileToDeleteKeyMap(f) = HashSet[String]()
+
+      if (useCommitTimeForDedupe) {
+        /*
+        This corresponds to the case where duplicates got created due to INSERT and have never been updated.
+         */
+        var maxCommit = -1L
+
+        rows.foreach(r => {
+          val c = r(3).asInstanceOf[String].toLong
+          if (c > maxCommit)
+            maxCommit = c
+        })
+        rows.foreach(r => {
+          val c = r(3).asInstanceOf[String].toLong
+          if (c != maxCommit) {
+            val f = r(2).asInstanceOf[String].split("_")(0)
+            if (!fileToDeleteKeyMap.contains(f)) {
+              fileToDeleteKeyMap(f) = HashSet[String]()
+            }
+            fileToDeleteKeyMap(f).add(key)
           }
-          fileToDeleteKeyMap(f).add(key)
+        })
+      } else {
+        /*
+        This corresponds to the case where duplicates have been updated at least once.
+        Once updated, duplicates are bound to have same commit time unless forcefully modified.
+         */
+        val size = rows.size - 1
+        var i = 0
+        val loop = new Breaks
+        loop.breakable {

Review comment:
       It's better not use break here, `rows.init` also can get the rows will be delete.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org