Posted to commits@kudu.apache.org by gr...@apache.org on 2019/04/23 17:15:31 UTC
[kudu] 03/03: [backup] Fix fromMs override option
This is an automated email from the ASF dual-hosted git repository.
granthenke pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
commit 2b023b979358cf3716c3b282ff21879bd97cec4a
Author: Grant Henke <gr...@apache.org>
AuthorDate: Mon Apr 22 21:20:00 2019 -0500
[backup] Fix fromMs override option
Fix the logic for deciding when to look up old backups.
Previously, the fromMs argument wasn’t being handled
correctly.
It also contains a small change to avoid converting
back and forth between Path and HPath multiple times.
Change-Id: I8a3abc47dd9d1441ba269dfc9405691f79e6615d
Reviewed-on: http://gerrit.cloudera.org:8080/13080
Tested-by: Kudu Jenkins
Reviewed-by: Will Berkeley <wd...@gmail.com>
---
.../scala/org/apache/kudu/backup/KuduBackup.scala | 14 +++---
.../scala/org/apache/kudu/backup/Options.scala | 7 +--
.../scala/org/apache/kudu/backup/SessionIO.scala | 7 ++-
.../org/apache/kudu/backup/TestKuduBackup.scala | 50 ++++++++++++++++++----
4 files changed, 58 insertions(+), 20 deletions(-)
diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackup.scala b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackup.scala
index a8d89f6..445bef0 100644
--- a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackup.scala
+++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackup.scala
@@ -56,22 +56,26 @@ object KuduBackup {
// Unless we are forcing a full backup or a fromMs was set, find the previous backup and
// use the `to_ms` metadata as the `from_ms` time for this backup.
- if (!tableOptions.forceFull || tableOptions.fromMs != 0) {
- log.info("No fromMs option set, looking for a previous backup.")
+ if (tableOptions.forceFull) {
+ log.info("Performing a full backup, forceFull was set to true")
+ } else if (tableOptions.fromMs != BackupOptions.DefaultFromMS) {
+ log.info(s"Performing an incremental backup, fromMs was set to ${tableOptions.fromMs}")
+ } else {
+ log.info("Looking for a previous backup, forceFull or fromMs options are not set.")
val graph = io.readBackupGraph(tableName)
if (graph.hasFullBackup) {
val base = graph.backupBase
log.info(s"Setting fromMs to ${base.metadata.getToMs} from backup in path: ${base.path}")
tableOptions = tableOptions.copy(fromMs = base.metadata.getToMs)
} else {
- log.info("No full backup was found. Starting a full backup.")
- tableOptions = tableOptions.copy(fromMs = 0)
+ log.info("No previous backup was found. Starting a full backup.")
+ tableOptions = tableOptions.copy(forceFull = true)
}
}
val rdd = new KuduBackupRDD(table, tableOptions, context, session.sparkContext)
val df =
session.sqlContext
- .createDataFrame(rdd, io.dataSchema(table.getSchema, options.isIncremental))
+ .createDataFrame(rdd, io.dataSchema(table.getSchema, tableOptions.isIncremental))
// Write the data to the backup path.
// The backup path contains the timestampMs and should not already exist.
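For readers following along outside the diff, the corrected decision reduces to
the following sketch (identifiers as in the patch above; surrounding code omitted):

    if (tableOptions.forceFull) {
      // Explicit full backup: skip any lookup of previous backups.
    } else if (tableOptions.fromMs != BackupOptions.DefaultFromMS) {
      // Explicit fromMs: incremental backup from that time, no graph lookup.
    } else {
      // Neither option set: consult the on-disk backup graph.
      val graph = io.readBackupGraph(tableName)
      if (graph.hasFullBackup) {
        // Chain onto the previous backup: its to_ms becomes this run's from_ms.
        tableOptions = tableOptions.copy(fromMs = graph.backupBase.metadata.getToMs)
      } else {
        // Nothing to chain onto: fall back to a full backup.
        tableOptions = tableOptions.copy(forceFull = true)
      }
    }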
diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/Options.scala b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/Options.scala
index a57931f..2874eac 100644
--- a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/Options.scala
+++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/Options.scala
@@ -39,7 +39,7 @@ case class BackupOptions(
kuduMasterAddresses: String = InetAddress.getLocalHost.getCanonicalHostName,
toMs: Long = System.currentTimeMillis(),
forceFull: Boolean = BackupOptions.DefaultForceFull,
- fromMs: Long = 0,
+ fromMs: Long = BackupOptions.DefaultFromMS,
format: String = BackupOptions.DefaultFormat,
scanBatchSize: Int = BackupOptions.DefaultScanBatchSize,
scanRequestTimeoutMs: Long = BackupOptions.DefaultScanRequestTimeoutMs,
@@ -47,14 +47,15 @@ case class BackupOptions(
keepAlivePeriodMs: Long = BackupOptions.DefaultKeepAlivePeriodMs)
extends CommonOptions {
- // If not forcing a full backup and fromMs is not zero, this is an incremental backup.
+ // If not forcing a full backup and fromMs is set, this is an incremental backup.
def isIncremental: Boolean = {
- !forceFull && fromMs != 0
+ !forceFull && fromMs != BackupOptions.DefaultFromMS
}
}
object BackupOptions {
val DefaultForceFull: Boolean = false
+ val DefaultFromMS: Long = 0
val DefaultFormat: String = "parquet"
val DefaultScanBatchSize: Int = 1024 * 1024 * 20 // 20 MiB
val DefaultScanRequestTimeoutMs: Long =
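A quick sketch of how isIncremental resolves under the new default (named
arguments as in the test changes below; the table and path values here are
placeholders):

    // forceFull wins regardless of fromMs.
    BackupOptions(tables = Seq("t"), rootPath = "file:///tmp/b",
      forceFull = true).isIncremental                 // false

    // An explicit, non-default fromMs makes the backup incremental.
    BackupOptions(tables = Seq("t"), rootPath = "file:///tmp/b",
      fromMs = 1556000000000L).isIncremental          // true

    // With both left at their defaults, KuduBackup resolves fromMs from the
    // backup graph (or falls back to forceFull = true) before this is read.
    BackupOptions(tables = Seq("t"), rootPath = "file:///tmp/b").isIncremental  // false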
diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/SessionIO.scala b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/SessionIO.scala
index 8977174..40f513f 100644
--- a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/SessionIO.scala
+++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/SessionIO.scala
@@ -161,7 +161,7 @@ class SessionIO(val session: SparkSession, options: CommonOptions) {
if (file.isDirectory) {
val metadataHPath = new HPath(file.getPath, MetadataFileName)
if (fs.exists(metadataHPath)) {
- val metadata = readTableMetadata(Paths.get(metadataHPath.toString))
+ val metadata = readTableMetadata(metadataHPath)
results += ((Paths.get(file.getPath.toString), metadata))
}
}
@@ -176,9 +176,8 @@ class SessionIO(val session: SparkSession, options: CommonOptions) {
* @param metadataPath the path to the metadata file.
* @return the deserialized table metadata.
*/
- private def readTableMetadata(metadataPath: Path): TableMetadataPB = {
- val hPath = new HPath(metadataPath.toString)
- val in = new InputStreamReader(fs.open(hPath), StandardCharsets.UTF_8)
+ private def readTableMetadata(metadataPath: HPath): TableMetadataPB = {
+ val in = new InputStreamReader(fs.open(metadataPath), StandardCharsets.UTF_8)
val json = CharStreams.toString(in)
in.close()
val builder = TableMetadataPB.newBuilder()
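The HPath change removes a needless round trip through java.nio; roughly (a
sketch, with fs being the FileSystem handle already used in this class):

    // Before: HPath -> java.nio.Path, then back to HPath inside readTableMetadata.
    val metadataHPath = new HPath(file.getPath, MetadataFileName)
    readTableMetadata(Paths.get(metadataHPath.toString))  // rebuilt an HPath internally

    // After: the Hadoop path is passed straight through and opened directly.
    val in = new InputStreamReader(fs.open(metadataHPath), StandardCharsets.UTF_8)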
diff --git a/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala b/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala
index a707702..8e1c6a2 100644
--- a/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala
+++ b/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala
@@ -19,6 +19,7 @@ package org.apache.kudu.backup
import java.nio.file.Files
import java.nio.file.Path
import java.util
+import java.util.concurrent.TimeUnit
import com.google.common.base.Objects
import org.apache.commons.io.FileUtils
@@ -27,6 +28,7 @@ import org.apache.kudu.client._
import org.apache.kudu.ColumnSchema
import org.apache.kudu.Schema
import org.apache.kudu.spark.kudu._
+import org.apache.kudu.test.CapturingLogAppender
import org.apache.kudu.test.RandomUtils
import org.apache.kudu.util.DataGenerator.DataGeneratorBuilder
import org.apache.kudu.util.HybridTimeUtil
@@ -74,12 +76,12 @@ class TestKuduBackup extends KuduTestSuite {
val rootDir = Files.createTempDirectory("backup")
doBackup(rootDir, Seq(tableName)) // Full backup.
insertRows(table, 100, 100) // Insert more data.
- doBackup(rootDir, Seq(tableName), incremental = true) // Incremental backup.
+ doBackup(rootDir, Seq(tableName)) // Incremental backup.
// Delete rows that span the full and incremental backup.
Range(50, 150).foreach { i =>
deleteRow(i)
}
- doBackup(rootDir, Seq(tableName), incremental = true) // Incremental backup.
+ doBackup(rootDir, Seq(tableName)) // Incremental backup.
doRestore(rootDir, Seq(tableName)) // Restore all the backups.
FileUtils.deleteDirectory(rootDir.toFile)
@@ -89,6 +91,28 @@ class TestKuduBackup extends KuduTestSuite {
}
@Test
+ def testForceIncrementalBackup() {
+ insertRows(table, 100) // Insert data into the default test table.
+ val beforeMs = getPropagatedTimestampMs
+ insertRows(table, 100, 100) // Insert more data.
+
+ val rootDir = Files.createTempDirectory("backup")
+
+ // Capture the logs to validate job internals.
+ val logs = new CapturingLogAppender()
+
+ // Force an incremental backup without a full backup.
+ // It will use a diff scan and won't check the existing dependency graph.
+ val handle = logs.attach()
+ doBackup(rootDir, Seq(tableName), fromMs = beforeMs) // Incremental backup.
+ handle.close()
+
+ assertTrue(Files.isDirectory(rootDir))
+ assertEquals(1, rootDir.toFile.list().length)
+ assertTrue(logs.getAppendedText.contains("fromMs was set"))
+ }
+
+ @Test
def testSimpleBackupAndRestoreWithSpecialCharacters() {
// Use an Impala-style table name to verify url encoding/decoding of the table name works.
val impalaTableName = "impala::default.test"
@@ -294,7 +318,10 @@ class TestKuduBackup extends KuduTestSuite {
FileUtils.deleteDirectory(rootDir.toFile)
}
- def doBackup(rootDir: Path, tableNames: Seq[String], incremental: Boolean = false): Unit = {
+ def doBackup(
+ rootDir: Path,
+ tableNames: Seq[String],
+ fromMs: Long = BackupOptions.DefaultFromMS): Unit = {
val nowMs = System.currentTimeMillis()
// Log the timestamps to simplify flaky debugging.
@@ -310,11 +337,11 @@ class TestKuduBackup extends KuduTestSuite {
// millisecond value as nowMs (after truncating the micros) the records inserted in the
// microseconds after truncation could be unread.
val backupOptions = new BackupOptions(
- tableNames,
- rootDir.toUri.toString,
- harness.getMasterAddressesAsString,
- nowMs + 1,
- incremental
+ tables = tableNames,
+ rootPath = rootDir.toUri.toString,
+ kuduMasterAddresses = harness.getMasterAddressesAsString,
+ toMs = nowMs + 1,
+ fromMs = fromMs
)
KuduBackup.run(backupOptions, ss)
}
@@ -324,4 +351,11 @@ class TestKuduBackup extends KuduTestSuite {
new RestoreOptions(tableNames, rootDir.toUri.toString, harness.getMasterAddressesAsString)
KuduRestore.run(restoreOptions, ss)
}
+
+ private def getPropagatedTimestampMs: Long = {
+ val propagatedTimestamp = harness.getClient.getLastPropagatedTimestamp
+ val physicalTimeMicros =
+ HybridTimeUtil.HTTimestampToPhysicalAndLogical(propagatedTimestamp).head
+ TimeUnit.MILLISECONDS.convert(physicalTimeMicros, TimeUnit.MICROSECONDS)
+ }
}
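For reference, the helper above only extracts the physical (microsecond)
component of the propagated hybrid-time timestamp and truncates it to
milliseconds; a minimal worked example with a made-up value:

    import java.util.concurrent.TimeUnit

    val physicalTimeMicros = 1555972800123456L  // hypothetical value
    TimeUnit.MILLISECONDS.convert(physicalTimeMicros, TimeUnit.MICROSECONDS)
    // == 1555972800123L; the sub-millisecond part is dropped, which is the
    // truncation the toMs = nowMs + 1 comment in doBackup guards against.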