You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by gr...@apache.org on 2020/11/09 15:07:55 UTC

[kudu] 02/02: KUDU-1563 Use DELETE_IGNORE in KuduRestore job

This is an automated email from the ASF dual-hosted git repository.

granthenke pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 5faf79726605e3a5d8b33bd18ce94c8c09241d3f
Author: Grant Henke <gr...@apache.org>
AuthorDate: Fri Oct 30 11:35:04 2020 -0500

    KUDU-1563 Use DELETE_IGNORE in KuduRestore job
    
    This patch changes the KuduRestore job to use DELETE_IGNORE
    operations instead of DELETE when the cluster supports ignore
    operations.
    
    `session.setIgnoreAllNotFoundRows(true)` is retained to support
    falling back to DELETE operations for backward compatibility.
    
    Change-Id: Ib6f6d5a31be77630e79ff1566e796eb5183a5d22
    Reviewed-on: http://gerrit.cloudera.org:8080/16683
    Reviewed-by: Attila Bukor <ab...@apache.org>
    Reviewed-by: Alexey Serbin <as...@cloudera.com>
    Tested-by: Grant Henke <gr...@apache.org>
---
 .../main/scala/org/apache/kudu/backup/KuduRestore.scala | 13 ++++++++++---
 .../scala/org/apache/kudu/backup/TestKuduBackup.scala   | 17 ++++++++++++++++-
 2 files changed, 26 insertions(+), 4 deletions(-)

diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala
index 6549cd6..ef01a8d 100644
--- a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala
+++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala
@@ -91,6 +91,7 @@ object KuduRestore {
         .load(backup.path.toString)
         // Default the row action column with a value of "UPSERT" so that the
         // rows from a full backup, which don't have a row action, are upserted.
+        // TODO(ghenke): Consider using INSERT_IGNORE for full backups.
         .na
         .fill(RowAction.UPSERT.getValue, Seq(rowActionCol))
 
@@ -106,8 +107,8 @@ object KuduRestore {
         val session = context.syncClient.newSession
         session.setFlushMode(FlushMode.AUTO_FLUSH_BACKGROUND)
         // In the case of task retries we need to ignore NotFound errors for deleted rows.
-        // TODO(KUDU-1563): Implement server side ignore capabilities to improve performance
-        //  and reliability.
+        // Such errors can't occur if DELETE_IGNORE is used, but the flag still needs to be
+        // set in case DELETE is used for backwards compatibility.
         session.setIgnoreAllNotFoundRows(true)
         try for (internalRow <- internalRows) {
           // Convert the InternalRows to Rows.
@@ -120,7 +121,13 @@ object KuduRestore {
           // Generate an operation based on the row action.
           val operation = rowAction match {
             case RowAction.UPSERT => table.newUpsert()
-            case RowAction.DELETE => table.newDelete()
+            case RowAction.DELETE => {
+              if (context.supportsIgnoreOperations) {
+                table.newDeleteIgnore()
+              } else {
+                table.newDelete()
+              }
+            }
             case _ => throw new IllegalStateException(s"Unsupported RowAction: $rowAction")
           }
           // Convert the Spark row to a partial row and set it on the operation.
diff --git a/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala b/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala
index da8fe58..817875e 100644
--- a/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala
+++ b/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala
@@ -32,8 +32,9 @@ import org.apache.kudu.spark.kudu.SparkListenerUtil.withJobDescriptionCollector
 import org.apache.kudu.spark.kudu.SparkListenerUtil.withJobTaskCounter
 import org.apache.kudu.spark.kudu._
 import org.apache.kudu.test.CapturingLogAppender
-import org.apache.kudu.test.KuduTestHarness.TabletServerConfig
+import org.apache.kudu.test.KuduTestHarness
 import org.apache.kudu.test.RandomUtils
+import org.apache.kudu.test.KuduTestHarness.TabletServerConfig
 import org.apache.kudu.util.DataGenerator.DataGeneratorBuilder
 import org.apache.kudu.util.HybridTimeUtil
 import org.apache.kudu.util.SchemaGenerator.SchemaGeneratorBuilder
@@ -591,6 +592,20 @@ class TestKuduBackup extends KuduTestSuite {
 
   @Test
   def testDeleteIgnore(): Unit = {
+    doDeleteIgnoreTest()
+  }
+
+  /**
+   * Identical to the above test, but exercising the old session-based delete ignore operations,
+   * ensuring we functionally support the same semantics.
+   */
+  @Test
+  @KuduTestHarness.MasterServerConfig(flags = Array("--master_support_ignore_operations=false"))
+  def testLegacyDeleteIgnore(): Unit = {
+    doDeleteIgnoreTest()
+  }
+
+  def doDeleteIgnoreTest(): Unit = {
     insertRows(table, 100) // Insert data into the default test table.
 
     // Run and validate initial backup.