Posted to commits@kudu.apache.org by gr...@apache.org on 2019/04/23 17:15:31 UTC

[kudu] 03/03: [backup] Fix fromMs override option

This is an automated email from the ASF dual-hosted git repository.

granthenke pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 2b023b979358cf3716c3b282ff21879bd97cec4a
Author: Grant Henke <gr...@apache.org>
AuthorDate: Mon Apr 22 21:20:00 2019 -0500

    [backup] Fix fromMs override option
    
    Fix the logic for deciding when to look up old backups.
    Previously the fromMs argument wasn’t being handled
    correctly.
    
    Additionally contains a small change to avoid converting
    back and forth from Path to HPath multiple times.
    
    Change-Id: I8a3abc47dd9d1441ba269dfc9405691f79e6615d
    Reviewed-on: http://gerrit.cloudera.org:8080/13080
    Tested-by: Kudu Jenkins
    Reviewed-by: Will Berkeley <wd...@gmail.com>
---
 .../scala/org/apache/kudu/backup/KuduBackup.scala  | 14 +++---
 .../scala/org/apache/kudu/backup/Options.scala     |  7 +--
 .../scala/org/apache/kudu/backup/SessionIO.scala   |  7 ++-
 .../org/apache/kudu/backup/TestKuduBackup.scala    | 50 ++++++++++++++++++----
 4 files changed, 58 insertions(+), 20 deletions(-)

diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackup.scala b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackup.scala
index a8d89f6..445bef0 100644
--- a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackup.scala
+++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackup.scala
@@ -56,22 +56,26 @@ object KuduBackup {
 
       // Unless we are forcing a full backup or a fromMs was set, find the previous backup and
       // use the `to_ms` metadata as the `from_ms` time for this backup.
-      if (!tableOptions.forceFull || tableOptions.fromMs != 0) {
-        log.info("No fromMs option set, looking for a previous backup.")
+      if (tableOptions.forceFull) {
+        log.info("Performing a full backup, forceFull was set to true")
+      } else if (tableOptions.fromMs != BackupOptions.DefaultFromMS) {
+        log.info(s"Performing an incremental backup, fromMs was set to ${tableOptions.fromMs}")
+      } else {
+        log.info("Looking for a previous backup, forceFull or fromMs options are not set.")
         val graph = io.readBackupGraph(tableName)
         if (graph.hasFullBackup) {
           val base = graph.backupBase
           log.info(s"Setting fromMs to ${base.metadata.getToMs} from backup in path: ${base.path}")
           tableOptions = tableOptions.copy(fromMs = base.metadata.getToMs)
         } else {
-          log.info("No full backup was found. Starting a full backup.")
-          tableOptions = tableOptions.copy(fromMs = 0)
+          log.info("No previous backup was found. Starting a full backup.")
+          tableOptions = tableOptions.copy(forceFull = true)
         }
       }
       val rdd = new KuduBackupRDD(table, tableOptions, context, session.sparkContext)
       val df =
         session.sqlContext
-          .createDataFrame(rdd, io.dataSchema(table.getSchema, options.isIncremental))
+          .createDataFrame(rdd, io.dataSchema(table.getSchema, tableOptions.isIncremental))
 
       // Write the data to the backup path.
       // The backup path contains the timestampMs and should not already exist.
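
The hunk above reduces to a three-way decision. Below is a minimal sketch of
the post-patch control flow, using simplified stand-ins (Opts, resolve,
previousToMs) rather than the real BackupOptions and backup-graph classes, so
it compiles on its own:

    // Simplified stand-in for BackupOptions; 0 plays the role of DefaultFromMS.
    case class Opts(forceFull: Boolean = false, fromMs: Long = 0)

    def resolve(opts: Opts, previousToMs: Option[Long]): Opts = {
      if (opts.forceFull) opts              // 1. forceFull: full backup, no lookup
      else if (opts.fromMs != 0) opts       // 2. explicit fromMs wins: no lookup
      else previousToMs match {             // 3. neither set: consult the graph
        case Some(toMs) => opts.copy(fromMs = toMs)    // chain off the last backup
        case None       => opts.copy(forceFull = true) // no history: go full
      }
    }

The behavioral fix is branch 2: an explicit fromMs is now honored instead of
being overwritten by the previous backup's `to_ms`.
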
diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/Options.scala b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/Options.scala
index a57931f..2874eac 100644
--- a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/Options.scala
+++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/Options.scala
@@ -39,7 +39,7 @@ case class BackupOptions(
     kuduMasterAddresses: String = InetAddress.getLocalHost.getCanonicalHostName,
     toMs: Long = System.currentTimeMillis(),
     forceFull: Boolean = BackupOptions.DefaultForceFull,
-    fromMs: Long = 0,
+    fromMs: Long = BackupOptions.DefaultFromMS,
     format: String = BackupOptions.DefaultFormat,
     scanBatchSize: Int = BackupOptions.DefaultScanBatchSize,
     scanRequestTimeoutMs: Long = BackupOptions.DefaultScanRequestTimeoutMs,
@@ -47,14 +47,15 @@ case class BackupOptions(
     keepAlivePeriodMs: Long = BackupOptions.DefaultKeepAlivePeriodMs)
     extends CommonOptions {
 
-  // If not forcing a full backup and fromMs is not zero, this is an incremental backup.
+  // If not forcing a full backup and fromMs is set, this is an incremental backup.
   def isIncremental: Boolean = {
-    !forceFull && fromMs != 0
+    !forceFull && fromMs != BackupOptions.DefaultFromMS
   }
 }
 
 object BackupOptions {
   val DefaultForceFull: Boolean = false
+  val DefaultFromMS: Long = 0
   val DefaultFormat: String = "parquet"
   val DefaultScanBatchSize: Int = 1024 * 1024 * 20 // 20 MiB
   val DefaultScanRequestTimeoutMs: Long =
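
With the DefaultFromMS sentinel factored out, isIncremental lines up with the
branching in KuduBackup.scala above: a backup is incremental exactly when the
caller did not force a full backup and supplied a non-default fromMs. A few
illustrative evaluations (the table name, root path, and timestamp here are
made up for the example):

    val base = BackupOptions(tables = Seq("t"), rootPath = "hdfs:///backups")

    base.isIncremental                                // false: full backup
    base.copy(forceFull = true).isIncremental         // false: forced full backup
    base.copy(fromMs = 1556067600000L).isIncremental  // true: fromMs was supplied
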
diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/SessionIO.scala b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/SessionIO.scala
index 8977174..40f513f 100644
--- a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/SessionIO.scala
+++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/SessionIO.scala
@@ -161,7 +161,7 @@ class SessionIO(val session: SparkSession, options: CommonOptions) {
         if (file.isDirectory) {
           val metadataHPath = new HPath(file.getPath, MetadataFileName)
           if (fs.exists(metadataHPath)) {
-            val metadata = readTableMetadata(Paths.get(metadataHPath.toString))
+            val metadata = readTableMetadata(metadataHPath)
             results += ((Paths.get(file.getPath.toString), metadata))
           }
         }
@@ -176,9 +176,8 @@ class SessionIO(val session: SparkSession, options: CommonOptions) {
    * @param metadataPath the path to the metadata file.
    * @return the deserialized table metadata.
    */
-  private def readTableMetadata(metadataPath: Path): TableMetadataPB = {
-    val hPath = new HPath(metadataPath.toString)
-    val in = new InputStreamReader(fs.open(hPath), StandardCharsets.UTF_8)
+  private def readTableMetadata(metadataPath: HPath): TableMetadataPB = {
+    val in = new InputStreamReader(fs.open(metadataPath), StandardCharsets.UTF_8)
     val json = CharStreams.toString(in)
     in.close()
     val builder = TableMetadataPB.newBuilder()
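
The two SessionIO hunks above carry the Path/HPath cleanup mentioned in the
commit message. Previously the caller flattened the Hadoop path to a string,
wrapped it in a java.nio.file.Path, and readTableMetadata immediately
converted it back, i.e. HPath -> String -> Path -> String -> HPath:

    // Before:
    readTableMetadata(Paths.get(metadataHPath.toString))  // caller re-wraps the HPath
    val hPath = new HPath(metadataPath.toString)          // ...and it is converted back
    val in = new InputStreamReader(fs.open(hPath), StandardCharsets.UTF_8)

    // After: the Hadoop path flows straight through to fs.open().
    readTableMetadata(metadataHPath)
    val in = new InputStreamReader(fs.open(metadataPath), StandardCharsets.UTF_8)
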
diff --git a/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala b/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala
index a707702..8e1c6a2 100644
--- a/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala
+++ b/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala
@@ -19,6 +19,7 @@ package org.apache.kudu.backup
 import java.nio.file.Files
 import java.nio.file.Path
 import java.util
+import java.util.concurrent.TimeUnit
 
 import com.google.common.base.Objects
 import org.apache.commons.io.FileUtils
@@ -27,6 +28,7 @@ import org.apache.kudu.client._
 import org.apache.kudu.ColumnSchema
 import org.apache.kudu.Schema
 import org.apache.kudu.spark.kudu._
+import org.apache.kudu.test.CapturingLogAppender
 import org.apache.kudu.test.RandomUtils
 import org.apache.kudu.util.DataGenerator.DataGeneratorBuilder
 import org.apache.kudu.util.HybridTimeUtil
@@ -74,12 +76,12 @@ class TestKuduBackup extends KuduTestSuite {
     val rootDir = Files.createTempDirectory("backup")
     doBackup(rootDir, Seq(tableName)) // Full backup.
     insertRows(table, 100, 100) // Insert more data.
-    doBackup(rootDir, Seq(tableName), incremental = true) // Incremental backup.
+    doBackup(rootDir, Seq(tableName)) // Incremental backup.
     // Delete rows that span the full and incremental backup.
     Range(50, 150).foreach { i =>
       deleteRow(i)
     }
-    doBackup(rootDir, Seq(tableName), incremental = true) // Incremental backup.
+    doBackup(rootDir, Seq(tableName)) // Incremental backup.
     doRestore(rootDir, Seq(tableName)) // Restore all the backups.
     FileUtils.deleteDirectory(rootDir.toFile)
 
@@ -89,6 +91,28 @@ class TestKuduBackup extends KuduTestSuite {
   }
 
   @Test
+  def testForceIncrementalBackup() {
+    insertRows(table, 100) // Insert data into the default test table.
+    val beforeMs = getPropagatedTimestampMs
+    insertRows(table, 100, 100) // Insert more data.
+
+    val rootDir = Files.createTempDirectory("backup")
+
+    // Capture the logs to validate job internals.
+    val logs = new CapturingLogAppender()
+
+    // Force an incremental backup without a full backup.
+    // It will use a diff scan and won't check the existing dependency graph.
+    val handle = logs.attach()
+    doBackup(rootDir, Seq(tableName), fromMs = beforeMs) // Incremental backup.
+    handle.close()
+
+    assertTrue(Files.isDirectory(rootDir))
+    assertEquals(1, rootDir.toFile.list().length)
+    assertTrue(logs.getAppendedText.contains("fromMs was set"))
+  }
+
+  @Test
   def testSimpleBackupAndRestoreWithSpecialCharacters() {
     // Use an Impala-style table name to verify url encoding/decoding of the table name works.
     val impalaTableName = "impala::default.test"
@@ -294,7 +318,10 @@ class TestKuduBackup extends KuduTestSuite {
     FileUtils.deleteDirectory(rootDir.toFile)
   }
 
-  def doBackup(rootDir: Path, tableNames: Seq[String], incremental: Boolean = false): Unit = {
+  def doBackup(
+      rootDir: Path,
+      tableNames: Seq[String],
+      fromMs: Long = BackupOptions.DefaultFromMS): Unit = {
     val nowMs = System.currentTimeMillis()
 
     // Log the timestamps to simplify flaky debugging.
@@ -310,11 +337,11 @@ class TestKuduBackup extends KuduTestSuite {
     // millisecond value as nowMs (after truncating the micros) the records inserted in the
     // microseconds after truncation could be unread.
     val backupOptions = new BackupOptions(
-      tableNames,
-      rootDir.toUri.toString,
-      harness.getMasterAddressesAsString,
-      nowMs + 1,
-      incremental
+      tables = tableNames,
+      rootPath = rootDir.toUri.toString,
+      kuduMasterAddresses = harness.getMasterAddressesAsString,
+      toMs = nowMs + 1,
+      fromMs = fromMs
     )
     KuduBackup.run(backupOptions, ss)
   }
@@ -324,4 +351,11 @@ class TestKuduBackup extends KuduTestSuite {
       new RestoreOptions(tableNames, rootDir.toUri.toString, harness.getMasterAddressesAsString)
     KuduRestore.run(restoreOptions, ss)
   }
+
+  private def getPropagatedTimestampMs: Long = {
+    val propagatedTimestamp = harness.getClient.getLastPropagatedTimestamp
+    val physicalTimeMicros =
+      HybridTimeUtil.HTTimestampToPhysicalAndLogical(propagatedTimestamp).head
+    TimeUnit.MILLISECONDS.convert(physicalTimeMicros, TimeUnit.MICROSECONDS)
+  }
 }
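
The getPropagatedTimestampMs helper added above converts a Kudu HybridTime
timestamp to wall-clock milliseconds: HTTimestampToPhysicalAndLogical returns
the physical component in microseconds (alongside a logical counter), and the
TimeUnit conversion truncates to milliseconds. A worked example with a
made-up physical timestamp:

    import java.util.concurrent.TimeUnit

    val physicalTimeMicros = 1555972800123456L  // hypothetical value, in micros
    val ms = TimeUnit.MILLISECONDS.convert(physicalTimeMicros, TimeUnit.MICROSECONDS)
    // ms == 1555972800123L: the trailing 456 microseconds are truncated, which
    // is why doBackup() passes `nowMs + 1` as toMs so records inserted in those
    // truncated microseconds are not missed.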