You are viewing a plain-text version of this content. (The canonical link to the original was not preserved in this plain-text copy.)
Posted to commits@kudu.apache.org by gr...@apache.org on 2020/11/09 15:07:55 UTC
[kudu] 02/02: KUDU-1563 Use DELETE_IGNORE in KuduRestore job
This is an automated email from the ASF dual-hosted git repository.
granthenke pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
commit 5faf79726605e3a5d8b33bd18ce94c8c09241d3f
Author: Grant Henke <gr...@apache.org>
AuthorDate: Fri Oct 30 11:35:04 2020 -0500
KUDU-1563 Use DELETE_IGNORE in KuduRestore job
This patch changes the KuduRestore job to use DELETE_IGNORE
operations instead of DELETE when the cluster supports ignore
operations.
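`session.setIgnoreAllNotFoundRows(true)` is retained for backward
compatibility: when the cluster does not support ignore operations, the
job falls back to plain DELETE operations, and the session-level flag is
still needed so that NotFound errors (e.g. on task retries) are ignored.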
Change-Id: Ib6f6d5a31be77630e79ff1566e796eb5183a5d22
Reviewed-on: http://gerrit.cloudera.org:8080/16683
Reviewed-by: Attila Bukor <ab...@apache.org>
Reviewed-by: Alexey Serbin <as...@cloudera.com>
Tested-by: Grant Henke <gr...@apache.org>
---
.../main/scala/org/apache/kudu/backup/KuduRestore.scala | 13 ++++++++++---
.../scala/org/apache/kudu/backup/TestKuduBackup.scala | 17 ++++++++++++++++-
2 files changed, 26 insertions(+), 4 deletions(-)
diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala
index 6549cd6..ef01a8d 100644
--- a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala
+++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala
@@ -91,6 +91,7 @@ object KuduRestore {
.load(backup.path.toString)
// Default the the row action column with a value of "UPSERT" so that the
// rows from a full backup, which don't have a row action, are upserted.
+ // TODO(ghenke): Consider using INSERT_IGNORE for full backups.
.na
.fill(RowAction.UPSERT.getValue, Seq(rowActionCol))
@@ -106,8 +107,8 @@ object KuduRestore {
val session = context.syncClient.newSession
session.setFlushMode(FlushMode.AUTO_FLUSH_BACKGROUND)
// In the case of task retries we need to ignore NotFound errors for deleted rows.
- // TODO(KUDU-1563): Implement server side ignore capabilities to improve performance
- // and reliability.
+ // This can't occur if DELETE_IGNORE is used, but still needs to be set in the case
+ // DELETE is used for backwards compatibility.
session.setIgnoreAllNotFoundRows(true)
try for (internalRow <- internalRows) {
// Convert the InternalRows to Rows.
@@ -120,7 +121,13 @@ object KuduRestore {
// Generate an operation based on the row action.
val operation = rowAction match {
case RowAction.UPSERT => table.newUpsert()
- case RowAction.DELETE => table.newDelete()
+ case RowAction.DELETE => {
+ if (context.supportsIgnoreOperations) {
+ table.newDeleteIgnore()
+ } else {
+ table.newDelete()
+ }
+ }
case _ => throw new IllegalStateException(s"Unsupported RowAction: $rowAction")
}
// Convert the Spark row to a partial row and set it on the operation.
diff --git a/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala b/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala
index da8fe58..817875e 100644
--- a/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala
+++ b/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala
@@ -32,8 +32,9 @@ import org.apache.kudu.spark.kudu.SparkListenerUtil.withJobDescriptionCollector
import org.apache.kudu.spark.kudu.SparkListenerUtil.withJobTaskCounter
import org.apache.kudu.spark.kudu._
import org.apache.kudu.test.CapturingLogAppender
-import org.apache.kudu.test.KuduTestHarness.TabletServerConfig
+import org.apache.kudu.test.KuduTestHarness
import org.apache.kudu.test.RandomUtils
+import org.apache.kudu.test.KuduTestHarness.TabletServerConfig
import org.apache.kudu.util.DataGenerator.DataGeneratorBuilder
import org.apache.kudu.util.HybridTimeUtil
import org.apache.kudu.util.SchemaGenerator.SchemaGeneratorBuilder
@@ -591,6 +592,20 @@ class TestKuduBackup extends KuduTestSuite {
@Test
def testDeleteIgnore(): Unit = {
+ doDeleteIgnoreTest()
+ }
+
+ /**
+ * Identical to the above test, but exercising the old session based delete ignore operations,
+ * ensuring we functionally support the same semantics.
+ */
+ @Test
+ @KuduTestHarness.MasterServerConfig(flags = Array("--master_support_ignore_operations=false"))
+ def testLegacyDeleteIgnore(): Unit = {
+ doDeleteIgnoreTest()
+ }
+
+ def doDeleteIgnoreTest(): Unit = {
insertRows(table, 100) // Insert data into the default test table.
// Run and validate initial backup.