Posted to commits@kudu.apache.org by gr...@apache.org on 2019/06/06 23:57:32 UTC
[kudu] 03/03: KUDU-2785: Add splitSizeBytes to the backup job
This is an automated email from the ASF dual-hosted git repository.
granthenke pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
commit 5c87afd4f2160344d651948e90234ee512adcfd8
Author: Grant Henke <gr...@apache.org>
AuthorDate: Tue Jun 4 10:46:51 2019 -0500
KUDU-2785: Add splitSizeBytes to the backup job
This patch adds an experimental and hidden option
to use the new scanner splitSizeBytes feature in the
backup job.
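A rough usage sketch, not part of this patch: the new field can be set directly on the BackupOptions case class defined in Options.scala (on the command line the same knob is the hidden --splitSizeBytes flag added below). The 64 MiB target is an arbitrary illustrative value.

    import org.apache.kudu.backup.BackupOptions

    // Some(bytes) turns on size-based splitting of each tablet's key range;
    // the default of None keeps the existing one-task-per-tablet behavior.
    def withSplitSize(base: BackupOptions): BackupOptions =
      base.copy(splitSizeBytes = Some(64L * 1024 * 1024))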
Change-Id: If6b8d02b71b1f463e4d0d9e04203d8edbd5e016b
Reviewed-on: http://gerrit.cloudera.org:8080/13511
Tested-by: Kudu Jenkins
Reviewed-by: Mike Percy <mp...@apache.org>
---
.../org/apache/kudu/backup/KuduBackupRDD.scala | 4 +++
.../scala/org/apache/kudu/backup/Options.scala | 13 ++++++++-
.../org/apache/kudu/backup/TestKuduBackup.scala | 34 ++++++++++++++++++++++
3 files changed, 50 insertions(+), 1 deletion(-)
diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackupRDD.scala b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackupRDD.scala
index c9837ce..a9a7072 100644
--- a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackupRDD.scala
+++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackupRDD.scala
@@ -57,6 +57,10 @@ class KuduBackupRDD private[kudu] (
.prefetching(options.scanPrefetching)
.keepAlivePeriodMs(options.keepAlivePeriodMs)
+ options.splitSizeBytes.foreach { size =>
+ builder.setSplitSizeBytes(size)
+ }
+
// Set a hybrid time for the scan to ensure application consistency.
val toMicros = TimeUnit.MILLISECONDS.toMicros(options.toMs)
val toHTT =
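The builder configured above is the scan token builder that KuduBackupRDD uses to plan its Spark partitions, so each generated token is capped near the requested size. Below is a standalone sketch of the same client-side knob, assuming the standard Java client scan token API; the master address, table name, and 64 MiB target are illustrative placeholders, not values from this patch.

    import scala.collection.JavaConverters._
    import org.apache.kudu.client.KuduClient

    val client = new KuduClient.KuduClientBuilder("master-1:7051").build()
    try {
      val table = client.openTable("my_table")
      val builder = client.newScanTokenBuilder(table)
      builder.setSplitSizeBytes(64L * 1024 * 1024) // aim for ~64 MiB of data per token
      val tokens = builder.build().asScala
      // Each token becomes one unit of parallel work, e.g. one Spark task.
      println(s"Generated ${tokens.size} scan tokens")
    } finally {
      client.shutdown()
    }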
diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/Options.scala b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/Options.scala
index 67cd261..1a8b3b9 100644
--- a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/Options.scala
+++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/Options.scala
@@ -39,7 +39,8 @@ case class BackupOptions(
scanPrefetching: Boolean = BackupOptions.DefaultScanPrefetching,
keepAlivePeriodMs: Long = BackupOptions.DefaultKeepAlivePeriodMs,
failOnFirstError: Boolean = BackupOptions.DefaultFailOnFirstError,
- numParallelBackups: Int = BackupOptions.DefaultNumParallelBackups)
+ numParallelBackups: Int = BackupOptions.DefaultNumParallelBackups,
+ splitSizeBytes: Option[Long] = None)
object BackupOptions {
val DefaultForceFull: Boolean = false
@@ -54,6 +55,7 @@ object BackupOptions {
val DefaultKeepAlivePeriodMs: Long = AsyncKuduClient.DEFAULT_KEEP_ALIVE_PERIOD_MS
val DefaultFailOnFirstError: Boolean = false
val DefaultNumParallelBackups = 1
+ val DefaultSplitSizeBytes: Option[Long] = None
// We use the program name to make the help output show the spark invocation required.
val ClassName: String = KuduBackup.getClass.getCanonicalName.dropRight(1) // Remove trailing `$`
@@ -146,6 +148,15 @@ object BackupOptions {
.hidden()
.optional()
+ opt[Long]("splitSizeBytes")
+ .action((v, o) => o.copy(splitSizeBytes = Some(v)))
+ .text(
+ "Sets the target number of bytes per spark task. If set, tablet's primary key range " +
+ "will be split to generate uniform task sizes instead of the default of 1 task per " +
+ "tablet. This option is experimental.")
+ .hidden()
+ .optional()
+
help("help").text("prints this usage text")
arg[String]("<table>...")
diff --git a/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala b/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala
index 566e630..2e278bb 100644
--- a/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala
+++ b/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala
@@ -29,8 +29,10 @@ import org.apache.kudu.Schema
import org.apache.kudu.Type
import org.apache.kudu.ColumnSchema.ColumnSchemaBuilder
import org.apache.kudu.spark.kudu.SparkListenerUtil.withJobDescriptionCollector
+import org.apache.kudu.spark.kudu.SparkListenerUtil.withJobTaskCounter
import org.apache.kudu.spark.kudu._
import org.apache.kudu.test.CapturingLogAppender
+import org.apache.kudu.test.KuduTestHarness.TabletServerConfig
import org.apache.kudu.test.RandomUtils
import org.apache.kudu.util.DataGenerator.DataGeneratorBuilder
import org.apache.kudu.util.HybridTimeUtil
@@ -323,6 +325,38 @@ class TestKuduBackup extends KuduTestSuite {
}
}
+ @TabletServerConfig(
+ flags = Array(
+ "--flush_threshold_mb=1",
+ "--flush_threshold_secs=1",
+ // Disable rowset compaction to prevent DRSs from being merged because they are too small.
+ "--enable_rowset_compaction=false"
+ ))
+ @Test
+ def testBackupWithSplitSizeBytes() {
+ // Create a table with a single partition.
+ val tableName = "split-size-table"
+ val options = new CreateTableOptions().setRangePartitionColumns(List("key").asJava)
+ val table = kuduClient.createTable(tableName, schema, options)
+
+ // Insert enough data into the test table so we can split it.
+ val rowCount = 1000
+ upsertRowsWithRowDataSize(table, rowCount, 32 * 1024)
+
+ // Wait for the MemRowSets (MRS) to be flushed.
+ Thread.sleep(5 * 1000)
+
+ // Run a backup job with custom splitSizeBytes and count the tasks.
+ val backupOptions = createBackupOptions(Seq(tableName)).copy(splitSizeBytes = Some(1024))
+ val actualNumTasks = withJobTaskCounter(ss.sparkContext) { () =>
+ assertEquals(0, runBackup(backupOptions))
+ }
+ validateBackup(backupOptions, rowCount, false)
+
+ // Verify there were more tasks than there are partitions.
+ assertTrue(actualNumTasks > 1)
+ }
+
@Test
def testBackupAndRestoreTableWithManyPartitions(): Unit = {
val kNumPartitions = 100