You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by gr...@apache.org on 2019/06/06 23:57:32 UTC

[kudu] 03/03: KUDU-2785: Add splitSizeBytes to the backup job

This is an automated email from the ASF dual-hosted git repository.

granthenke pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 5c87afd4f2160344d651948e90234ee512adcfd8
Author: Grant Henke <gr...@apache.org>
AuthorDate: Tue Jun 4 10:46:51 2019 -0500

    KUDU-2785: Add splitSizeBytes to the backup job
    
    This patch adds an experimental and hidden option
    to use the new scanner splitSizeBytes feature in the
    backup job.
    
    Change-Id: If6b8d02b71b1f463e4d0d9e04203d8edbd5e016b
    Reviewed-on: http://gerrit.cloudera.org:8080/13511
    Tested-by: Kudu Jenkins
    Reviewed-by: Mike Percy <mp...@apache.org>
---
 .../org/apache/kudu/backup/KuduBackupRDD.scala     |  4 +++
 .../scala/org/apache/kudu/backup/Options.scala     | 13 ++++++++-
 .../org/apache/kudu/backup/TestKuduBackup.scala    | 34 ++++++++++++++++++++++
 3 files changed, 50 insertions(+), 1 deletion(-)

diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackupRDD.scala b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackupRDD.scala
index c9837ce..a9a7072 100644
--- a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackupRDD.scala
+++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackupRDD.scala
@@ -57,6 +57,10 @@ class KuduBackupRDD private[kudu] (
       .prefetching(options.scanPrefetching)
       .keepAlivePeriodMs(options.keepAlivePeriodMs)
 
+    options.splitSizeBytes.foreach { size =>
+      builder.setSplitSizeBytes(size)
+    }
+
     // Set a hybrid time for the scan to ensure application consistency.
     val toMicros = TimeUnit.MILLISECONDS.toMicros(options.toMs)
     val toHTT =
diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/Options.scala b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/Options.scala
index 67cd261..1a8b3b9 100644
--- a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/Options.scala
+++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/Options.scala
@@ -39,7 +39,8 @@ case class BackupOptions(
     scanPrefetching: Boolean = BackupOptions.DefaultScanPrefetching,
     keepAlivePeriodMs: Long = BackupOptions.DefaultKeepAlivePeriodMs,
     failOnFirstError: Boolean = BackupOptions.DefaultFailOnFirstError,
-    numParallelBackups: Int = BackupOptions.DefaultNumParallelBackups)
+    numParallelBackups: Int = BackupOptions.DefaultNumParallelBackups,
+    splitSizeBytes: Option[Long] = None)
 
 object BackupOptions {
   val DefaultForceFull: Boolean = false
@@ -54,6 +55,7 @@ object BackupOptions {
   val DefaultKeepAlivePeriodMs: Long = AsyncKuduClient.DEFAULT_KEEP_ALIVE_PERIOD_MS
   val DefaultFailOnFirstError: Boolean = false
   val DefaultNumParallelBackups = 1
+  val DefaultSplitSizeBytes: Option[Long] = None
 
   // We use the program name to make the help output show a the spark invocation required.
   val ClassName: String = KuduBackup.getClass.getCanonicalName.dropRight(1) // Remove trailing `$`
@@ -146,6 +148,15 @@ object BackupOptions {
         .hidden()
         .optional()
 
+      opt[Long]("splitSizeBytes")
+        .action((v, o) => o.copy(splitSizeBytes = Some(v)))
+        .text(
+          "Sets the target number of bytes per spark task. If set, tablet's primary key range " +
+            "will be split to generate uniform task sizes instead of the default of 1 task per " +
+            "tablet. This option is experimental.")
+        .hidden()
+        .optional()
+
       help("help").text("prints this usage text")
 
       arg[String]("<table>...")
diff --git a/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala b/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala
index 566e630..2e278bb 100644
--- a/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala
+++ b/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala
@@ -29,8 +29,10 @@ import org.apache.kudu.Schema
 import org.apache.kudu.Type
 import org.apache.kudu.ColumnSchema.ColumnSchemaBuilder
 import org.apache.kudu.spark.kudu.SparkListenerUtil.withJobDescriptionCollector
+import org.apache.kudu.spark.kudu.SparkListenerUtil.withJobTaskCounter
 import org.apache.kudu.spark.kudu._
 import org.apache.kudu.test.CapturingLogAppender
+import org.apache.kudu.test.KuduTestHarness.TabletServerConfig
 import org.apache.kudu.test.RandomUtils
 import org.apache.kudu.util.DataGenerator.DataGeneratorBuilder
 import org.apache.kudu.util.HybridTimeUtil
@@ -323,6 +325,38 @@ class TestKuduBackup extends KuduTestSuite {
     }
   }
 
+  @TabletServerConfig(
+    flags = Array(
+      "--flush_threshold_mb=1",
+      "--flush_threshold_secs=1",
+      // Disable rowset compaction so the small flushed DRSs are not merged back together.
+      "--enable_rowset_compaction=false"
+    ))
+  @Test
+  def testBackupWithSplitSizeBytes(): Unit = {
+    // Create a table with a single range partition (by default: 1 backup task per tablet).
+    val tableName = "split-size-table"
+    val options = new CreateTableOptions().setRangePartitionColumns(List("key").asJava)
+    val table = kuduClient.createTable(tableName, schema, options)
+
+    // Upsert rowCount rows with a 32 KiB row data size so the tablet has enough data to split.
+    val rowCount = 1000
+    upsertRowsWithRowDataSize(table, rowCount, 32 * 1024)
+
+    // Give the MRS time to flush (flush_threshold_secs=1); NOTE(review): sleep-based, may flake.
+    Thread.sleep(5 * 1000)
+
+    // Back up with a small splitSizeBytes (1 KiB) and count the Spark tasks that ran.
+    val backupOptions = createBackupOptions(Seq(tableName)).copy(splitSizeBytes = Some(1024L))
+    val actualNumTasks = withJobTaskCounter(ss.sparkContext) { () =>
+      assertEquals(0, runBackup(backupOptions))
+    }
+    validateBackup(backupOptions, rowCount, false)
+
+    // The single partition must have been split into more than one task.
+    assertTrue(actualNumTasks > 1)
+  }
+
   @Test
   def testBackupAndRestoreTableWithManyPartitions(): Unit = {
     val kNumPartitions = 100