Posted to commits@kudu.apache.org by gr...@apache.org on 2019/04/16 21:37:19 UTC
[kudu] branch master updated: [backup] Add initial incremental backup/restore support
This is an automated email from the ASF dual-hosted git repository.
granthenke pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 8a85813 [backup] Add initial incremental backup/restore support
8a85813 is described below
commit 8a85813913b9c258cbaadb0b65cef508bdee30fd
Author: Grant Henke <gr...@apache.org>
AuthorDate: Mon Mar 25 14:10:30 2019 -0500
[backup] Add initial incremental backup/restore support
This patch adds initial support for incremental backup
and restore.
A high-level overview of the changes in this patch (a usage sketch follows the list):
- Added a version to the TableMetadata for future use.
- Broke out I/O and layout logic into a SessionIO class so it
could be easily shared.
- Unified the BackupOptions and RestoreOptions
so common options could be shared.
- Introduced a BackupGraph class to handle chaining
together backups for backup and restore jobs.
- Enhanced the BackupRDD to output an additional
RowAction byte column on backup and restore.
- Enhanced the restore job to use the new RowAction
column and translate its values into operations for
incremental restore jobs.
- Added the ability to restore to a given “time” on a
per-backup basis.
- Added example usage docs to the Backup and Restore
classes.
- Added hasIsDeleted to RowResult and hasColumn to
Schema.
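To make the intended workflow concrete, here is a minimal sketch of driving
the two jobs programmatically; the application name, root path, master
address, and table name are placeholders, and the equivalent spark-submit
invocations are documented on the KuduBackup and KuduRestore classes below.

import org.apache.spark.sql.SparkSession
import org.apache.kudu.backup.{BackupOptions, KuduBackup, KuduRestore, RestoreOptions}

val spark = SparkSession.builder().appName("kudu-backup-sketch").getOrCreate()

// The first run finds no full backup under rootPath and takes a full backup.
// Re-running the same command later finds the previous backup and takes an
// incremental backup starting from its toMs.
KuduBackup.run(
  BackupOptions(
    tables = Seq("my_kudu_table"),
    rootPath = "hdfs:///kudu/backup/path",
    kuduMasterAddresses = "master-1-host"),
  spark)

// Restore replays the selected full backup plus its chain of incrementals.
// timestampMs (defaulting to now) bounds which backups are considered.
KuduRestore.run(
  RestoreOptions(
    tables = Seq("my_kudu_table"),
    rootPath = "hdfs:///kudu/backup/path",
    kuduMasterAddresses = "master-1-host"),
  spark)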
Change-Id: I50b21d921fefbf4d7e8bd1766285258e8014d890
Reviewed-on: http://gerrit.cloudera.org:8080/12879
Tested-by: Kudu Jenkins
Reviewed-by: Mike Percy <mp...@apache.org>
---
java/kudu-backup/src/main/protobuf/backup.proto | 18 +-
.../scala/org/apache/kudu/backup/BackupGraph.scala | 197 +++++++++++++++++++
.../scala/org/apache/kudu/backup/KuduBackup.scala | 87 +++++----
.../org/apache/kudu/backup/KuduBackupOptions.scala | 113 -----------
.../org/apache/kudu/backup/KuduBackupRDD.scala | 91 +++++----
.../scala/org/apache/kudu/backup/KuduRestore.scala | 141 ++++++++------
.../apache/kudu/backup/KuduRestoreOptions.scala | 89 ---------
.../scala/org/apache/kudu/backup/Options.scala | 209 +++++++++++++++++++++
.../scala/org/apache/kudu/backup/RowAction.java | 69 +++++++
.../scala/org/apache/kudu/backup/SessionIO.scala | 193 +++++++++++++++++++
.../org/apache/kudu/backup/TableMetadata.scala | 41 ++--
.../org/apache/kudu/backup/TestBackupGraph.scala | 144 ++++++++++++++
.../org/apache/kudu/backup/TestKuduBackup.scala | 68 +++++--
.../src/main/java/org/apache/kudu/Schema.java | 21 ++-
.../org/apache/kudu/client/AsyncKuduScanner.java | 13 +-
.../java/org/apache/kudu/client/RowResult.java | 9 +
.../org/apache/kudu/spark/kudu/RowConverter.scala | 10 +-
.../org/apache/kudu/spark/kudu/KuduTestSuite.scala | 5 +-
18 files changed, 1123 insertions(+), 395 deletions(-)
diff --git a/java/kudu-backup/src/main/protobuf/backup.proto b/java/kudu-backup/src/main/protobuf/backup.proto
index 372c665..6de3b3e 100644
--- a/java/kudu-backup/src/main/protobuf/backup.proto
+++ b/java/kudu-backup/src/main/protobuf/backup.proto
@@ -35,6 +35,7 @@ message ColumnTypeAttributesMetadataPB {
// Maps to the ColumnSchema class.
// The fields are effectively 1 to 1 mappings of those in ColumnSchema.
+// TODO: How do we handle column additions?
message ColumnMetadataPB {
string name = 1;
string type = 2;
@@ -98,18 +99,21 @@ message PartitionMetadataPB {
// so we can understand and create a table that matches the backed up
// table on restore.
message TableMetadataPB {
+ // A version used to handle any future format/layout changes.
+ int32 version = 1;
// The starting point of a backup. A UNIX timestamp in milliseconds since the epoch.
- int64 from_ms = 1;
+ // If from_ms is 0, this is a full backup.
+ int64 from_ms = 2;
// The end point of a backup. A UNIX timestamp in milliseconds since the epoch.
- int64 to_ms = 2;
+ int64 to_ms = 3;
// The file format used to store the data.
- string data_format = 3;
+ string data_format = 4;
// The name of the table.
- string table_name = 4;
+ string table_name = 5;
// The replication factor of this table.
- int32 num_replicas = 5;
+ int32 num_replicas = 6;
// The metadata for the table's columns.
- repeated ColumnMetadataPB columns = 6;
+ repeated ColumnMetadataPB columns = 7;
// The metadata for the table's partitions.
- PartitionMetadataPB partitions = 7;
+ PartitionMetadataPB partitions = 8;
}
\ No newline at end of file
diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/BackupGraph.scala b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/BackupGraph.scala
new file mode 100644
index 0000000..21c7ff9
--- /dev/null
+++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/BackupGraph.scala
@@ -0,0 +1,197 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.kudu.backup
+
+import java.nio.file.Path
+
+import org.apache.kudu.backup.Backup.TableMetadataPB
+import org.apache.yetus.audience.InterfaceAudience
+import org.apache.yetus.audience.InterfaceStability
+
+import scala.collection.mutable
+
+/**
+ * A directed weighted graph of backups used to pick the optimal series of backups and restores.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+class BackupGraph {
+ // Index of backup.fromMs -> backup for use in chaining backups together.
+ private val adjacencyList = mutable.Map[Long, mutable.ListBuffer[BackupNode]]()
+
+ // A full backup has FromMs of 0.
+ private val FullBackupFromMs = 0
+
+ /**
+ * Add a backup to the graph.
+ * @param backup the backup to add.
+ */
+ def addBackup(backup: BackupNode): Unit = {
+ // Add a weighted edge with the backup.
+ addEdge(backup)
+ }
+
+ private def addEdge(backup: BackupNode): Unit = {
+ val adjacentVertices =
+ adjacencyList.getOrElse(backup.metadata.getFromMs, mutable.ListBuffer[BackupNode]())
+ adjacentVertices += backup
+ adjacencyList.put(backup.metadata.getFromMs, adjacentVertices)
+ }
+
+ /**
+ * @return true if the graph has a full backup.
+ */
+ def hasFullBackup: Boolean = fullBackups.nonEmpty
+
+ /**
+ * @return all the full backups in the graph.
+ */
+ def fullBackups: Seq[BackupNode] = {
+ adjacencyList.getOrElse(FullBackupFromMs, Seq())
+ }
+
+ /**
+ * @return the most recent full backup.
+ * @throws IllegalStateException if no full backup exists.
+ */
+ def mostRecentFull: BackupNode = {
+ if (!hasFullBackup) throw new IllegalStateException("No full backup exists")
+ fullBackups.maxBy(_.metadata.getToMs)
+ }
+
+ /**
+ * @return all backup paths in the graph.
+ */
+ def backupPaths: Seq[BackupPath] = {
+ allPaths(FullBackupFromMs, List()).map(BackupPath)
+ }
+
+ private def allPaths(fromMs: Long, path: List[BackupNode]): List[List[BackupNode]] = {
+ if (!adjacencyList.contains(fromMs)) {
+ List(path)
+ } else {
+ adjacencyList(fromMs).flatMap { node =>
+ allPaths(node.metadata.getToMs, path ++ List(node))
+ }.toList
+ }
+ }
+
+ /**
+ * Returns the backup that should be used as the base for the next backup.
+ *
+ * The logic for picking this backup is as follows:
+ *
+ * 1. Pick the paths with the most recent full backup.
+ * 2. If there are multiple paths, pick the path with the most recent partial backup.
+ * 3. If there are multiple paths, pick the path with the lowest weight.
+ *
+ * This allows concurrent full backups to be taken (or generated by compaction)
+ * while also taking incremental backups.
+ *
+ * While a full backup is running incremental backups will continue to build
+ * off the chain from the previous full. When the new full completes, the
+ * next incremental backup will use that full as its "current" chain.
+ *
+ * @throws IllegalStateException if no full backup exists.
+ */
+ def backupBase: BackupNode = {
+ // 1. Pick the paths with the most recent full backup.
+ val recentFulls = backupPaths.filter(_.fullBackup == mostRecentFull)
+
+ // 2. If there are multiple paths, pick the path with the most recent partial backup.
+ val maxToMs = recentFulls.maxBy(_.toMs).toMs
+ val recentPaths = recentFulls.filter(_.toMs == maxToMs)
+
+ // 3. If there are multiple paths, pick the path with the lowest weight.
+ recentPaths.minBy(_.weight).lastBackup
+ }
+
+ /**
+ * Returns a sequence of backups that should be used to restore.
+ *
+ * The logic for picking this path is as follows:
+ *
+ * 1. Pick the path with the most recent backup.
+ * 2. If there are multiple paths, pick the path with the lowest weight.
+ *
+ * This ensures we always restore the most current state of the data while
+ * also picking the most efficient path (likely a result of compaction).
+ *
+ * @throws IllegalStateException if no full backup exists.
+ */
+ def restorePath: BackupPath = {
+ // 1. Pick the path with the most recent backup.
+ val maxToMs = backupPaths.maxBy(_.toMs).toMs
+ val recentPaths = backupPaths.filter(_.toMs == maxToMs)
+
+ // 2. If there are multiple paths, pick the path with the lowest weight.
+ recentPaths.minBy(_.weight)
+ }
+
+ /**
+ * Returns a new BackupGraph that represents the graph including only nodes with a toMs equal
+ * to or less than the specified time.
+ * @param timeMs the time to filter by.
+ * @return the filtered BackupGraph.
+ */
+ def filterByTime(timeMs: Long): BackupGraph = {
+ val result = new BackupGraph()
+ val distinctBackups = adjacencyList.values.flatten.toSet
+ distinctBackups.filter(_.metadata.getToMs <= timeMs).foreach(result.addBackup)
+ result
+ }
+}
+
+/**
+ * Node class to represent nodes in the backup graph.
+ */
+case class BackupNode(path: Path, metadata: TableMetadataPB) {
+
+ /**
+ * @return The weight/cost of this Node.
+ */
+ def weight: Int = {
+ // Full backups have a weight of 0 and partial backups have a weight of 1.
+ // TODO: Use the size of a partial backup to contribute to weight.
+ if (metadata.getFromMs == 0) 0 else 1
+ }
+}
+
+/**
+ * A backup path is a full backup with a series of incremental backups.
+ */
+case class BackupPath(backups: Seq[BackupNode]) {
+
+ def fullBackup: BackupNode = backups.head
+
+ def lastBackup: BackupNode = backups.last
+
+ /**
+ * @return the toMs for the entire path.
+ */
+ def toMs: Long = backups.last.metadata.getToMs
+
+ /**
+ * @return the weight/cost of the path.
+ */
+ def weight: Int = backups.map(_.weight).sum
+
+ /**
+ * @return A string useful for debugging the path.
+ */
+ def pathString: String = backups.map(_.metadata.getFromMs).mkString(" -> ")
+}
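To illustrate how the graph chains backups, here is a minimal, self-contained
sketch; the paths and timestamps are made up. A full backup has a fromMs of 0,
and each incremental's fromMs matches the previous backup's toMs.

import java.nio.file.Paths
import org.apache.kudu.backup.Backup.TableMetadataPB

def node(fromMs: Long, toMs: Long): BackupNode =
  BackupNode(
    Paths.get(s"/backups/my_table/$toMs"),
    TableMetadataPB.newBuilder().setFromMs(fromMs).setToMs(toMs).build())

val graph = new BackupGraph()
graph.addBackup(node(0, 100))   // full backup
graph.addBackup(node(100, 200)) // incremental built on the full
graph.addBackup(node(200, 300)) // incremental built on the previous incremental

// The next backup chains off the end of the current path.
assert(graph.backupBase.metadata.getToMs == 300L)
// A restore replays the path in order: the full backup, then each incremental.
assert(graph.restorePath.backups.map(_.metadata.getToMs) == Seq(100L, 200L, 300L))
// Restoring "as of" time 200 drops the newer incremental.
assert(graph.filterByTime(200).restorePath.backups.size == 2)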
diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackup.scala b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackup.scala
index 3280c0d..a8d89f6 100644
--- a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackup.scala
+++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackup.scala
@@ -16,16 +16,7 @@
// under the License.
package org.apache.kudu.backup
-import java.net.URLEncoder
-import java.nio.charset.StandardCharsets
-import java.nio.file.Path
-import java.nio.file.Paths
-
-import com.google.protobuf.util.JsonFormat
-import org.apache.hadoop.fs.{Path => HPath}
-import org.apache.kudu.backup.Backup.TableMetadataPB
import org.apache.kudu.spark.kudu.KuduContext
-import org.apache.kudu.spark.kudu.SparkUtil._
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.SparkSession
import org.apache.yetus.audience.InterfaceAudience
@@ -33,57 +24,71 @@ import org.apache.yetus.audience.InterfaceStability
import org.slf4j.Logger
import org.slf4j.LoggerFactory
+/**
+ * The main class for a Kudu backup spark job.
+ *
+ * Example Usage:
+ * spark-submit --class org.apache.kudu.backup.KuduBackup kudu-backup2_2.11-*.jar \
+ * --kuduMasterAddresses master-1-host,master-2-host,master-3-host \
+ * --rootPath hdfs:///kudu/backup/path \
+ * my_kudu_table
+ */
@InterfaceAudience.Private
@InterfaceStability.Unstable
object KuduBackup {
val log: Logger = LoggerFactory.getLogger(getClass)
- def run(options: KuduBackupOptions, session: SparkSession): Unit = {
+ def run(options: BackupOptions, session: SparkSession): Unit = {
+ log.info(s"Backing up to root path: ${options.rootPath}")
val context =
new KuduContext(
options.kuduMasterAddresses,
session.sparkContext
)
- val path = options.path
- log.info(s"Backing up to path: $path")
-
+ val io = new SessionIO(session, options)
// TODO: Make parallel so each table isn't processed serially?
- options.tables.foreach { t =>
- val table = context.syncClient.openTable(t)
- val tablePath = Paths.get(path).resolve(URLEncoder.encode(t, "UTF-8"))
+ options.tables.foreach { tableName =>
+ var tableOptions = options.copy() // Copy the options so we can modify them for the table.
+ val table = context.syncClient.openTable(tableName)
+ val backupPath = io.backupPath(tableName, tableOptions.toMs)
+ val metadataPath = io.backupMetadataPath(backupPath)
+ log.info(s"Backing up table $tableName to path: $backupPath")
- val rdd = new KuduBackupRDD(table, options, context, session.sparkContext)
+ // Unless we are forcing a full backup or a fromMs was set, find the previous backup and
+ // use the `to_ms` metadata as the `from_ms` time for this backup.
+ if (!tableOptions.forceFull || tableOptions.fromMs != 0) {
+ log.info("No fromMs option set, looking for a previous backup.")
+ val graph = io.readBackupGraph(tableName)
+ if (graph.hasFullBackup) {
+ val base = graph.backupBase
+ log.info(s"Setting fromMs to ${base.metadata.getToMs} from backup in path: ${base.path}")
+ tableOptions = tableOptions.copy(fromMs = base.metadata.getToMs)
+ } else {
+ log.info("No full backup was found. Starting a full backup.")
+ tableOptions = tableOptions.copy(fromMs = 0)
+ }
+ }
+ val rdd = new KuduBackupRDD(table, tableOptions, context, session.sparkContext)
val df =
- session.sqlContext.createDataFrame(rdd, sparkSchema(table.getSchema))
- // TODO: Prefix path with the time? Maybe a backup "name" parameter defaulted to something?
- // TODO: Take parameter for the SaveMode.
+ session.sqlContext
+ .createDataFrame(rdd, io.dataSchema(table.getSchema, options.isIncremental))
+
+ // Write the data to the backup path.
+ // The backup path contains the timestampMs and should not already exist.
val writer = df.write.mode(SaveMode.ErrorIfExists)
- // TODO: Restrict format option.
- // TODO: We need to cleanup partial output on failure otherwise.
- // retries of the entire job will fail because the file already exists.
- writer.format(options.format).save(tablePath.toString)
+ writer
+ .format(tableOptions.format)
+ .save(backupPath.toString)
- val tableMetadata = TableMetadata.getTableMetadata(table, options)
- writeTableMetadata(tableMetadata, tablePath, session)
+ // Generate and output the new metadata for this table.
+ // The existence of metadata indicates this backup was successful.
+ val tableMetadata = TableMetadata.getTableMetadata(table, tableOptions)
+ io.writeTableMetadata(tableMetadata, metadataPath)
}
}
- private def writeTableMetadata(
- metadata: TableMetadataPB,
- path: Path,
- session: SparkSession): Unit = {
- val conf = session.sparkContext.hadoopConfiguration
- val hPath = new HPath(path.resolve(TableMetadata.MetadataFileName).toString)
- val fs = hPath.getFileSystem(conf)
- val out = fs.create(hPath, /* overwrite= */ false)
- val json = JsonFormat.printer().print(metadata)
- out.write(json.getBytes(StandardCharsets.UTF_8))
- out.flush()
- out.close()
- }
-
def main(args: Array[String]): Unit = {
- val options = KuduBackupOptions
+ val options = BackupOptions
.parse(args)
.getOrElse(throw new IllegalArgumentException("could not parse the arguments"))
diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackupOptions.scala b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackupOptions.scala
deleted file mode 100644
index a1d7519..0000000
--- a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackupOptions.scala
+++ /dev/null
@@ -1,113 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package org.apache.kudu.backup
-
-import java.net.InetAddress
-
-import org.apache.kudu.client.AsyncKuduClient
-import org.apache.yetus.audience.InterfaceAudience
-import org.apache.yetus.audience.InterfaceStability
-import scopt.OptionParser
-
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-case class KuduBackupOptions(
- tables: Seq[String],
- path: String,
- kuduMasterAddresses: String = InetAddress.getLocalHost.getCanonicalHostName,
- timestampMs: Long = System.currentTimeMillis(),
- format: String = KuduBackupOptions.DefaultFormat,
- scanBatchSize: Int = KuduBackupOptions.DefaultScanBatchSize,
- scanRequestTimeoutMs: Long = KuduBackupOptions.DefaultScanRequestTimeoutMs,
- scanPrefetching: Boolean = KuduBackupOptions.DefaultScanPrefetching,
- keepAlivePeriodMs: Long = KuduBackupOptions.defaultKeepAlivePeriodMs)
-
-object KuduBackupOptions {
- val DefaultFormat: String = "parquet"
- val DefaultScanBatchSize: Int = 1024 * 1024 * 20 // 20 MiB
- val DefaultScanRequestTimeoutMs: Long =
- AsyncKuduClient.DEFAULT_OPERATION_TIMEOUT_MS // 30 seconds
- val DefaultScanPrefetching
- : Boolean = false // TODO: Add a test per KUDU-1260 and enable by default?
- val defaultKeepAlivePeriodMs: Long = AsyncKuduClient.DEFAULT_KEEP_ALIVE_PERIOD_MS
-
- // TODO: clean up usage output.
- // TODO: timeout configurations.
- private val parser: OptionParser[KuduBackupOptions] =
- new OptionParser[KuduBackupOptions]("KuduBackup") {
- opt[String]("path")
- .action((v, o) => o.copy(path = v))
- .text("The root path to output backup data. Accepts any Spark compatible path.")
- .optional()
-
- opt[String]("kuduMasterAddresses")
- .action((v, o) => o.copy(kuduMasterAddresses = v))
- .text("Comma-separated addresses of Kudu masters.")
- .optional()
-
- opt[Long]("timestampMs")
- .action((v, o) => o.copy(timestampMs = v))
- // TODO: Document the limitations based on cluster configuration (ex: ancient history watermark).
- .text("A UNIX timestamp in milliseconds since the epoch to execute scans at.")
- .optional()
-
- opt[String]("format")
- .action((v, o) => o.copy(format = v))
- .text("The file format to use when writing the data.")
- .optional()
-
- opt[Int]("scanBatchSize")
- .action((v, o) => o.copy(scanBatchSize = v))
- .text("The maximum number of bytes returned by the scanner, on each batch.")
- .optional()
-
- opt[Int]("scanRequestTimeoutMs")
- .action((v, o) => o.copy(scanRequestTimeoutMs = v))
- .text("Sets how long in milliseconds each scan request to a server can last.")
- .optional()
-
- opt[Unit]("scanPrefetching")
- .action((_, o) => o.copy(scanPrefetching = true))
- .text("An experimental flag to enable pre-fetching data.")
- .optional()
-
- opt[Long]("keepAlivePeriodMs")
- .action((v, o) => o.copy(keepAlivePeriodMs = v))
- .text(
- "Sets the period at which to send keep-alive requests to the tablet server to ensure" +
- " that scanners do not time out")
- .optional()
-
- arg[String]("<table>...")
- .unbounded()
- .action((v, o) => o.copy(tables = o.tables :+ v))
- .text("A list of tables to be backed up.")
- }
-
- /**
- * Parses the passed arguments into Some[KuduBackupOptions].
- *
- * If the arguments are bad, an error message is displayed
- * and None is returned.
- *
- * @param args The arguments to parse.
- * @return Some[KuduBackupOptions] if parsing was successful, None if not.
- */
- def parse(args: Seq[String]): Option[KuduBackupOptions] = {
- parser.parse(args, KuduBackupOptions(Seq(), null))
- }
-}
diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackupRDD.scala b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackupRDD.scala
index 55f7f98..544e7b2 100644
--- a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackupRDD.scala
+++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackupRDD.scala
@@ -1,19 +1,19 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
package org.apache.kudu.backup
import java.util.concurrent.TimeUnit
@@ -37,39 +37,46 @@ import scala.collection.JavaConverters._
@InterfaceStability.Unstable
class KuduBackupRDD private[kudu] (
@transient val table: KuduTable,
- @transient val options: KuduBackupOptions,
+ @transient val options: BackupOptions,
val kuduContext: KuduContext,
@transient val sc: SparkContext)
extends RDD[Row](sc, Nil) {
// Defined here because the options are transient.
- private val keepAlivePeriodMs = options.keepAlivePeriodMs
+ val incremental: Boolean = options.isIncremental
// TODO: Split large tablets into smaller scan tokens?
override protected def getPartitions: Array[Partition] = {
val client = kuduContext.syncClient
- // Set a hybrid time for the scan to ensure application consistency.
- val timestampMicros = TimeUnit.MILLISECONDS.toMicros(options.timestampMs)
- val hybridTime =
- HybridTimeUtil.physicalAndLogicalToHTTimestamp(timestampMicros, 0)
-
- // Create the scan tokens for each partition.
- val tokens = client
+ val builder = client
.newScanTokenBuilder(table)
.cacheBlocks(false)
- // TODO: Use fault tolerant scans to get mostly.
- // ordered results when KUDU-2466 is fixed.
+ // TODO: Use fault tolerant scans to get mostly ordered results when KUDU-2466 is fixed.
// .setFaultTolerant(true)
.replicaSelection(ReplicaSelection.CLOSEST_REPLICA)
.readMode(ReadMode.READ_AT_SNAPSHOT)
- .snapshotTimestampRaw(hybridTime)
.batchSizeBytes(options.scanBatchSize)
.scanRequestTimeout(options.scanRequestTimeoutMs)
.prefetching(options.scanPrefetching)
.keepAlivePeriodMs(options.keepAlivePeriodMs)
- .build()
+ // Set a hybrid time for the scan to ensure application consistency.
+ val toMicros = TimeUnit.MILLISECONDS.toMicros(options.toMs)
+ val toHTT =
+ HybridTimeUtil.physicalAndLogicalToHTTimestamp(toMicros, 0)
+
+ if (incremental) {
+ val fromMicros = TimeUnit.MILLISECONDS.toMicros(options.fromMs)
+ val fromHTT =
+ HybridTimeUtil.physicalAndLogicalToHTTimestamp(fromMicros, 0)
+ builder.diffScan(fromHTT, toHTT)
+ } else {
+ builder.snapshotTimestampRaw(toHTT)
+ }
+
+ // Create the scan tokens for each partition.
+ val tokens = builder.build()
tokens.asScala.zipWithIndex.map {
case (token, index) =>
// TODO: Support backups from any replica or followers only.
@@ -85,12 +92,11 @@ class KuduBackupRDD private[kudu] (
override def compute(part: Partition, taskContext: TaskContext): Iterator[Row] = {
val client: KuduClient = kuduContext.syncClient
val partition: KuduBackupPartition = part.asInstanceOf[KuduBackupPartition]
- // TODO: Get deletes and updates for incremental backups.
val scanner =
KuduScanToken.deserializeIntoScanner(partition.scanToken, client)
// We don't store the RowResult so we can enable the reuseRowResult optimization.
scanner.setReuseRowResult(true)
- new RowIterator(scanner, kuduContext)
+ new RowIterator(scanner, kuduContext, incremental)
}
override def getPreferredLocations(partition: Partition): Seq[String] = {
@@ -109,7 +115,10 @@ private case class KuduBackupPartition(index: Int, scanToken: Array[Byte], locat
* that takes the job partitions and task context and expects to return an Iterator[Row].
* This implementation facilitates that.
*/
-private class RowIterator(private val scanner: KuduScanner, val kuduContext: KuduContext)
+private class RowIterator(
+ private val scanner: KuduScanner,
+ val kuduContext: KuduContext,
+ val incremental: Boolean)
extends Iterator[Row] {
private val scannerIterator = scanner.iterator()
@@ -130,11 +139,25 @@ private class RowIterator(private val scanner: KuduScanner, val kuduContext: Kud
// sort of zero copy/object pool pattern for performance (we could use that here).
override def next(): Row = {
val rowResult = scannerIterator.next()
- val columnCount = rowResult.getColumnProjection.getColumnCount
- val columns = Array.ofDim[Any](columnCount)
+ val fieldCount = rowResult.getColumnProjection.getColumnCount
+ // If this is an incremental backup, the last column is the is_deleted column.
+ val columnCount = if (incremental) fieldCount - 1 else fieldCount
+ val columns = Array.ofDim[Any](fieldCount)
for (i <- 0 until columnCount) {
columns(i) = rowResult.getObject(i)
}
+ // If this is an incremental backup, translate the is_deleted column into
+ // the "change_type" column as the last field.
+ if (incremental) {
+ val rowAction = if (rowResult.isDeleted) {
+ RowAction.DELETE.getValue
+ } else {
+ // If the row is not deleted, we do not know if it was inserted or updated,
+ // so we use upsert.
+ RowAction.UPSERT.getValue
+ }
+ columns(fieldCount - 1) = rowAction
+ }
Row.fromSeq(columns)
}
}
diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala
index b8a94b7..50517a9 100644
--- a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala
+++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala
@@ -16,82 +16,115 @@
// under the License.
package org.apache.kudu.backup
-import java.io.InputStreamReader
-import java.net.URLEncoder
-import java.nio.charset.StandardCharsets
-import java.nio.file.Path
-import java.nio.file.Paths
-
-import com.google.common.io.CharStreams
-import com.google.protobuf.util.JsonFormat
-import org.apache.hadoop.fs.{Path => HPath}
-
import org.apache.kudu.backup.Backup.TableMetadataPB
import org.apache.kudu.client.AlterTableOptions
+import org.apache.kudu.client.SessionConfiguration.FlushMode
import org.apache.kudu.spark.kudu.KuduContext
-import org.apache.kudu.spark.kudu.KuduWriteOptions
+import org.apache.kudu.spark.kudu.RowConverter
+import org.apache.parquet.hadoop.ParquetInputFormat
import org.apache.spark.sql.SparkSession
import org.apache.yetus.audience.InterfaceAudience
import org.apache.yetus.audience.InterfaceStability
import org.slf4j.Logger
import org.slf4j.LoggerFactory
+/**
+ * The main class for a Kudu restore spark job.
+ *
+ * Example Usage:
+ * spark-submit --class org.apache.kudu.backup.KuduRestore kudu-backup2_2.11-*.jar \
+ * --kuduMasterAddresses master-1-host,master-2-host,master-3-host \
+ * --rootPath hdfs:///kudu/backup/path \
+ * my_kudu_table
+ */
@InterfaceAudience.Private
@InterfaceStability.Unstable
object KuduRestore {
val log: Logger = LoggerFactory.getLogger(getClass)
- def run(options: KuduRestoreOptions, session: SparkSession): Unit = {
+ def run(options: RestoreOptions, session: SparkSession): Unit = {
+ log.info(s"Restoring from path: ${options.rootPath}")
val context =
- new KuduContext(options.kuduMasterAddresses, session.sparkContext)
- val path = options.path
- log.info(s"Restoring from path: $path")
+ new KuduContext(
+ options.kuduMasterAddresses,
+ session.sparkContext
+ )
+ val io = new SessionIO(session, options)
// TODO: Make parallel so each table isn't processed serially.
- options.tables.foreach { t =>
- val tableEncoded = URLEncoder.encode(t, "UTF-8")
- val tablePath = Paths.get(path).resolve(tableEncoded)
- val metadataPath = getMetadataPath(tableEncoded, options)
- val metadata = readTableMetadata(metadataPath, session)
- val restoreName = s"${metadata.getTableName}${options.tableSuffix}"
- val table =
- if (options.createTables) {
- createTableRangePartitionByRangePartition(restoreName, metadata, context)
- } else {
- context.syncClient.openTable(restoreName)
+ options.tables.foreach { tableName =>
+ // TODO: Consider an option to enforce an exact toMS match.
+ val graph = io.readBackupGraph(tableName).filterByTime(options.timestampMs)
+ graph.restorePath.backups.foreach { backup =>
+ log.info(s"Restoring table $tableName from path: ${backup.path}")
+ val isFullRestore = backup.metadata.getFromMs == 0
+ val restoreName = s"${backup.metadata.getTableName}${options.tableSuffix}"
+ // TODO: Store the full metadata to compare/validate for each applied partial.
+
+ // On the full restore we may need to create the table.
+ if (isFullRestore) {
+ if (options.createTables) {
+ log.info(s"Creating restore table $restoreName")
+ createTableRangePartitionByRangePartition(restoreName, backup.metadata, context)
+ }
}
+ val table = context.syncClient.openTable(restoreName)
+ val restoreSchema = io.dataSchema(table.getSchema)
+ val rowActionCol = restoreSchema.fields.last.name
- // TODO: Restrict format option.
- val df = session.sqlContext.read
- .format(metadata.getDataFormat)
- .load(tablePath.toString)
- val writeOptions = new KuduWriteOptions(ignoreDuplicateRowErrors = false, ignoreNull = false)
- // TODO: Use client directly for more control?
- // (session timeout, consistency mode, flush interval, mutation buffer space)
+ // TODO: Restrict format option.
+ var data = session.sqlContext.read
+ .format(backup.metadata.getDataFormat)
+ .schema(restoreSchema)
+ .load(backup.path.toString)
+ // Default the row action column to a value of "UPSERT" so that the
+ // rows from a full backup, which don't have a row action, are upserted.
+ .na
+ .fill(RowAction.UPSERT.getValue, Seq(rowActionCol))
- // Upsert so that Spark task retries do not fail.
- context.upsertRows(df, restoreName, writeOptions)
+ // TODO: Expose more configuration options:
+ // (session timeout, consistency mode, flush interval, mutation buffer space)
+ data.queryExecution.toRdd.foreachPartition { internalRows =>
+ val table = context.syncClient.openTable(restoreName)
+ val converter = new RowConverter(table.getSchema, restoreSchema, false)
+ val session = context.syncClient.newSession
+ session.setFlushMode(FlushMode.AUTO_FLUSH_BACKGROUND)
+ try {
+ for (internalRow <- internalRows) {
+ // Convert the InternalRows to Rows.
+ // This avoids any corruption as reported in SPARK-26880.
+ val row = converter.toRow(internalRow)
+ // Get the operation type based on the row action column.
+ // This will always be the last column in the row.
+ val rowActionValue = row.getByte(row.length - 1)
+ val rowAction = RowAction.fromValue(rowActionValue)
+ // Generate an operation based on the row action.
+ val operation = rowAction match {
+ case RowAction.UPSERT => table.newUpsert()
+ case RowAction.DELETE => table.newDelete()
+ case _ => throw new IllegalStateException(s"Unsupported RowAction: $rowAction")
+ }
+ // Convert the Spark row to a partial row and set it on the operation.
+ val partialRow = converter.toPartialRow(row)
+ operation.setRow(partialRow)
+ session.apply(operation)
+ }
+ } finally {
+ session.close()
+ }
+ // Fail the task if there are any errors.
+ val errorCount = session.getPendingErrors.getRowErrors.length
+ if (errorCount > 0) {
+ val errors =
+ session.getPendingErrors.getRowErrors.take(5).map(_.getErrorStatus).mkString
+ throw new RuntimeException(
+ s"failed to write $errorCount rows from DataFrame to Kudu; sample errors: $errors")
+ }
+ }
+ }
}
}
- private def getMetadataPath(tableName: String, options: KuduRestoreOptions): Path = {
- val rootPath =
- if (options.metadataPath.isEmpty) options.path else options.metadataPath
- Paths.get(rootPath).resolve(tableName)
- }
-
- private def readTableMetadata(path: Path, session: SparkSession): TableMetadataPB = {
- val conf = session.sparkContext.hadoopConfiguration
- val hPath = new HPath(path.resolve(TableMetadata.MetadataFileName).toString)
- val fs = hPath.getFileSystem(conf)
- val in = new InputStreamReader(fs.open(hPath), StandardCharsets.UTF_8)
- val json = CharStreams.toString(in)
- in.close()
- val builder = TableMetadataPB.newBuilder()
- JsonFormat.parser().merge(json, builder)
- builder.build()
- }
-
// Kudu isn't good at creating a lot of tablets at once, and by default tables may only be created
// with at most 60 tablets. Additional tablets can be added later by adding range partitions. So,
// to restore tables with more tablets than that, we need to create the table piece-by-piece. This
@@ -121,7 +154,7 @@ object KuduRestore {
}
def main(args: Array[String]): Unit = {
- val options = KuduRestoreOptions
+ val options = RestoreOptions
.parse(args)
.getOrElse(throw new IllegalArgumentException("could not parse the arguments"))
diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestoreOptions.scala b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestoreOptions.scala
deleted file mode 100644
index 66de017..0000000
--- a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestoreOptions.scala
+++ /dev/null
@@ -1,89 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package org.apache.kudu.backup
-
-import java.net.InetAddress
-
-import org.apache.yetus.audience.InterfaceAudience
-import org.apache.yetus.audience.InterfaceStability
-import scopt.OptionParser
-
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-case class KuduRestoreOptions(
- tables: Seq[String],
- path: String,
- kuduMasterAddresses: String = InetAddress.getLocalHost.getCanonicalHostName,
- tableSuffix: String = KuduRestoreOptions.DefaultTableSuffix,
- createTables: Boolean = KuduRestoreOptions.DefaultCreateTables,
- metadataPath: String = "")
-
-object KuduRestoreOptions {
- val DefaultTableSuffix: String = "-restore"
- val DefaultCreateTables: Boolean = true
-
- // TODO: clean up usage output.
- // TODO: timeout configurations.
- private val parser: OptionParser[KuduRestoreOptions] =
- new OptionParser[KuduRestoreOptions]("KuduRestore") {
- opt[String]("path")
- .action((v, o) => o.copy(path = v))
- .text("The root path to the backup data. Accepts any Spark compatible path.")
- .optional()
-
- opt[String]("kuduMasterAddresses")
- .action((v, o) => o.copy(kuduMasterAddresses = v))
- .text("Comma-separated addresses of Kudu masters.")
- .optional()
-
- opt[Boolean]("createTables")
- .action((v, o) => o.copy(createTables = v))
- .text("true to create tables during restore, false if they already exist.")
- .optional()
-
- opt[String]("tableSuffix")
- .action((v, o) => o.copy(tableSuffix = v))
- .text("The suffix to add to the restored table names. Only used when createTables is true.")
- .optional()
-
- opt[String]("metadataPath")
- .action((v, o) => o.copy(metadataPath = v))
- .text(
- "The root path to look for table metadata. This can be used to change the properties of " +
- "the tables created during restore. By default the backup path is used. " +
- "Only used when createTables is true.")
- .optional()
-
- arg[String]("<table>...")
- .unbounded()
- .action((v, o) => o.copy(tables = o.tables :+ v))
- .text("A list of tables to be restored.")
- }
-
- /**
- * Parses the passed arguments into Some[KuduRestoreOptions].
- *
- * If the arguments are bad, an error message is displayed
- * and None is returned.
- *
- * @param args The arguments to parse.
- * @return Some[KuduRestoreOptions] if parsing was successful, None if not.
- */
- def parse(args: Seq[String]): Option[KuduRestoreOptions] = {
- parser.parse(args, KuduRestoreOptions(Seq(), null))
- }
-}
diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/Options.scala b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/Options.scala
new file mode 100644
index 0000000..a57931f
--- /dev/null
+++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/Options.scala
@@ -0,0 +1,209 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.kudu.backup
+
+import java.net.InetAddress
+
+import org.apache.kudu.client.AsyncKuduClient
+import org.apache.yetus.audience.InterfaceAudience
+import org.apache.yetus.audience.InterfaceStability
+import scopt.OptionParser
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+trait CommonOptions {
+ val tables: Seq[String]
+ val rootPath: String
+ val kuduMasterAddresses: String
+}
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+case class BackupOptions(
+ tables: Seq[String],
+ rootPath: String,
+ kuduMasterAddresses: String = InetAddress.getLocalHost.getCanonicalHostName,
+ toMs: Long = System.currentTimeMillis(),
+ forceFull: Boolean = BackupOptions.DefaultForceFull,
+ fromMs: Long = 0,
+ format: String = BackupOptions.DefaultFormat,
+ scanBatchSize: Int = BackupOptions.DefaultScanBatchSize,
+ scanRequestTimeoutMs: Long = BackupOptions.DefaultScanRequestTimeoutMs,
+ scanPrefetching: Boolean = BackupOptions.DefaultScanPrefetching,
+ keepAlivePeriodMs: Long = BackupOptions.DefaultKeepAlivePeriodMs)
+ extends CommonOptions {
+
+ // If not forcing a full backup and fromMs is not zero, this is an incremental backup.
+ def isIncremental: Boolean = {
+ !forceFull && fromMs != 0
+ }
+}
+
+object BackupOptions {
+ val DefaultForceFull: Boolean = false
+ val DefaultFormat: String = "parquet"
+ val DefaultScanBatchSize: Int = 1024 * 1024 * 20 // 20 MiB
+ val DefaultScanRequestTimeoutMs: Long =
+ AsyncKuduClient.DEFAULT_OPERATION_TIMEOUT_MS // 30 seconds
+ // TODO: Add a test per KUDU-1260 and enable by default?
+ val DefaultScanPrefetching: Boolean = false
+ val DefaultKeepAlivePeriodMs: Long = AsyncKuduClient.DEFAULT_KEEP_ALIVE_PERIOD_MS
+
+ // TODO: clean up usage output.
+ // TODO: timeout configurations.
+ private val parser: OptionParser[BackupOptions] =
+ new OptionParser[BackupOptions]("KuduBackup") {
+ opt[String]("rootPath")
+ .action((v, o) => o.copy(rootPath = v))
+ .text("The root path to output backup data. Accepts any Spark compatible path.")
+ .optional()
+
+ opt[String]("kuduMasterAddresses")
+ .action((v, o) => o.copy(kuduMasterAddresses = v))
+ .text("Comma-separated addresses of Kudu masters.")
+ .optional()
+
+ opt[Boolean]("forceFull")
+ .action((v, o) => o.copy(forceFull = v))
+ .text("If true, this will be a full backup even if another full already exists.")
+ .optional()
+
+ opt[Long]("fromMs")
+ .action((v, o) => o.copy(fromMs = v))
+ .text(
+ "A UNIX timestamp in milliseconds that defines the start time of an incremental backup.")
+ .optional()
+
+ opt[Long]("timestampMs")
+ .action((v, o) => o.copy(toMs = v))
+ // TODO: Document the limitations based on cluster configuration
+ // (ex: ancient history watermark).
+ .text("A UNIX timestamp in milliseconds since the epoch to execute scans at.")
+ .optional()
+
+ opt[String]("format")
+ .action((v, o) => o.copy(format = v))
+ .text("The file format to use when writing the data.")
+ .optional()
+
+ opt[Int]("scanBatchSize")
+ .action((v, o) => o.copy(scanBatchSize = v))
+ .text("The maximum number of bytes returned by the scanner, on each batch.")
+ .optional()
+
+ opt[Int]("scanRequestTimeoutMs")
+ .action((v, o) => o.copy(scanRequestTimeoutMs = v))
+ .text("Sets how long in milliseconds each scan request to a server can last.")
+ .optional()
+
+ opt[Unit]("scanPrefetching")
+ .action((_, o) => o.copy(scanPrefetching = true))
+ .text("An experimental flag to enable pre-fetching data.")
+ .optional()
+
+ opt[Long]("keepAlivePeriodMs")
+ .action((v, o) => o.copy(keepAlivePeriodMs = v))
+ .text(
+ "Sets the period at which to send keep-alive requests to the tablet server to ensure" +
+ " that scanners do not time out")
+ .optional()
+
+ arg[String]("<table>...")
+ .unbounded()
+ .action((v, o) => o.copy(tables = o.tables :+ v))
+ .text("A list of tables to be backed up.")
+ }
+
+ /**
+ * Parses the passed arguments into Some[BackupOptions].
+ *
+ * If the arguments are bad, an error message is displayed
+ * and None is returned.
+ *
+ * @param args The arguments to parse.
+ * @return Some[BackupOptions] if parsing was successful, None if not.
+ */
+ def parse(args: Seq[String]): Option[BackupOptions] = {
+ parser.parse(args, BackupOptions(Seq(), null))
+ }
+}
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+case class RestoreOptions(
+ tables: Seq[String],
+ rootPath: String,
+ kuduMasterAddresses: String = InetAddress.getLocalHost.getCanonicalHostName,
+ tableSuffix: String = RestoreOptions.DefaultTableSuffix,
+ createTables: Boolean = RestoreOptions.DefaultCreateTables,
+ timestampMs: Long = System.currentTimeMillis()
+) extends CommonOptions
+
+object RestoreOptions {
+ val DefaultTableSuffix: String = "-restore"
+ val DefaultCreateTables: Boolean = true
+
+ // TODO: clean up usage output.
+ // TODO: timeout configurations.
+ private val parser: OptionParser[RestoreOptions] =
+ new OptionParser[RestoreOptions]("KuduRestore") {
+ opt[String]("rootPath")
+ .action((v, o) => o.copy(rootPath = v))
+ .text("The root path to the backup data. Accepts any Spark compatible path.")
+ .optional()
+
+ opt[String]("kuduMasterAddresses")
+ .action((v, o) => o.copy(kuduMasterAddresses = v))
+ .text("Comma-separated addresses of Kudu masters.")
+ .optional()
+
+ opt[Boolean]("createTables")
+ .action((v, o) => o.copy(createTables = v))
+ .text("true to create tables during restore, false if they already exist.")
+ .optional()
+
+ opt[String]("tableSuffix")
+ .action((v, o) => o.copy(tableSuffix = v))
+ .text("The suffix to add to the restored table names. Only used when createTables is true.")
+ .optional()
+
+ opt[Long]("timestampMs")
+ .action((v, o) => o.copy(timestampMs = v))
+ .text(
+ "A UNIX timestamp in milliseconds that define the latest time to use when selecting " +
+ "restore candidates.")
+ .optional()
+
+ arg[String]("<table>...")
+ .unbounded()
+ .action((v, o) => o.copy(tables = o.tables :+ v))
+ .text("A list of tables to be restored.")
+ }
+
+ /**
+ * Parses the passed arguments into Some[RestoreOptions].
+ *
+ * If the arguments are bad, an error message is displayed
+ * and None is returned.
+ *
+ * @param args The arguments to parse.
+ * @return Some[RestoreOptions] if parsing was successful, None if not.
+ */
+ def parse(args: Seq[String]): Option[RestoreOptions] = {
+ parser.parse(args, RestoreOptions(Seq(), null))
+ }
+}
diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/RowAction.java b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/RowAction.java
new file mode 100644
index 0000000..54bf7ae
--- /dev/null
+++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/RowAction.java
@@ -0,0 +1,69 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.kudu.backup;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import java.util.Map;
+
+/**
+ * A RowAction is used to represent the action associated with a backed up row.
+ *
+ * Currently UPSERT is the default action, while rows with the IS_DELETED virtual
+ * column have an action of DELETE. This value is serialized as a byte in the
+ * output data format to be as space efficient as possible.
+ *
+ * Given there are currently only 2 options, IS_DELETED or not, we could have used an
+ * IS_DELETED boolean column in the output format, but this RowAction allows for greater
+ * format flexibility to support INSERT or UPDATE in the future if we have full fidelity
+ * and sparse row backups.
+ *
+ * See {@link RowIterator} for backup side usage and {@link KuduRestore} for restore
+ * side usage.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public enum RowAction {
+
+ UPSERT((byte) 0),
+ DELETE((byte) 1);
+
+ /** The byte value used to represent this RowAction */
+ private final byte value;
+
+ private static Map<Byte, RowAction> byteRowAction;
+ static {
+ byteRowAction = new ImmutableMap.Builder<Byte, RowAction>()
+ .put(UPSERT.getValue(), UPSERT)
+ .put(DELETE.getValue(), DELETE)
+ .build();
+ }
+
+ RowAction(byte value) {
+ this.value = value;
+ }
+
+ public byte getValue() {
+ return value;
+ }
+
+ public static RowAction fromValue(Byte value) {
+ return byteRowAction.get(value);
+ }
+}
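A quick sketch of the byte round trip RowAction provides; the byte values
follow the enum declarations above, and an unrecognized byte maps to null,
which the restore job rejects with an IllegalStateException.

val upsert: Byte = RowAction.UPSERT.getValue // 0
val delete: Byte = RowAction.DELETE.getValue // 1
assert(RowAction.fromValue(upsert) == RowAction.UPSERT)
assert(RowAction.fromValue(delete) == RowAction.DELETE)
assert(RowAction.fromValue(9.toByte) == null) // unknown values are not mapped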
diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/SessionIO.scala b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/SessionIO.scala
new file mode 100644
index 0000000..8977174
--- /dev/null
+++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/SessionIO.scala
@@ -0,0 +1,193 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.kudu.backup
+
+import java.io.InputStreamReader
+import java.net.URLEncoder
+import java.nio.charset.StandardCharsets
+import java.nio.file.Path
+import java.nio.file.Paths
+
+import com.google.common.io.CharStreams
+import com.google.protobuf.util.JsonFormat
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.FileSystem
+import org.apache.hadoop.fs.{Path => HPath}
+import org.apache.kudu.Schema
+import org.apache.kudu.backup.Backup.TableMetadataPB
+import org.apache.kudu.backup.SessionIO._
+import org.apache.kudu.spark.kudu.SparkUtil
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.types.ByteType
+import org.apache.spark.sql.types.StructField
+import org.apache.spark.sql.types.StructType
+import org.apache.yetus.audience.InterfaceAudience
+import org.apache.yetus.audience.InterfaceStability
+import org.slf4j.Logger
+import org.slf4j.LoggerFactory
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+/**
+ * A class to encapsulate and centralize the logic for data layout and IO
+ * of metadata and data of the backup and restore jobs.
+ *
+ * The default backup directory structure is:
+ * /<rootPath>/<tableName>/<backup-id>/
+ * .kudu-metadata.json
+ * part-*.parquet
+ *
+ * In the above path the `/<rootPath>` can be used to distinguish separate backup groups.
+ * The `<backup-id>` is currently the `toMs` time for the job.
+ *
+ * TODO: Should the tableName contain the table id?
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+class SessionIO(val session: SparkSession, options: CommonOptions) {
+ val log: Logger = LoggerFactory.getLogger(getClass)
+
+ val conf: Configuration = session.sparkContext.hadoopConfiguration
+ val rootHPath: HPath = new HPath(options.rootPath)
+ val fs: FileSystem = rootHPath.getFileSystem(conf)
+
+ /**
+ * Returns the Spark schema for backup data based on the Kudu Schema.
+ * Additionally handles adding the RowAction column for incremental backup/restore.
+ * @return the Spark schema for backup data.
+ */
+ def dataSchema(schema: Schema, includeRowAction: Boolean = true): StructType = {
+ var fields = SparkUtil.sparkSchema(schema).fields
+ if (includeRowAction) {
+ val changeTypeField = generateRowActionColumn(schema)
+ fields = fields ++ Seq(changeTypeField)
+ }
+ StructType(fields)
+ }
+
+ /**
+ * Generates a RowAction column and handles column name collisions.
+ * The column name can vary because it's accessed positionally.
+ */
+ private def generateRowActionColumn(schema: Schema): StructField = {
+ var columnName = "backup_row_action"
+ // If the column already exists, we need to pick an alternate column name.
+ while (schema.hasColumn(columnName)) {
+ columnName += "_"
+ }
+ StructField(columnName, ByteType)
+ }
+
+ /**
+ * @return the path to the table directory.
+ */
+ def tablePath(tableName: String): Path = {
+ Paths.get(options.rootPath).resolve(URLEncoder.encode(tableName, "UTF-8"))
+ }
+
+ /**
+ * @return the backup path for a table and time.
+ */
+ def backupPath(tableName: String, timestampMs: Long): Path = {
+ tablePath(tableName).resolve(timestampMs.toString)
+ }
+
+ /**
+ * @return the path to the metadata file within a backup path.
+ */
+ def backupMetadataPath(backupPath: Path): Path = {
+ backupPath.resolve(MetadataFileName)
+ }
+
+ /**
+ * Serializes the table metadata to Json and writes it to the metadata path.
+ * @param tableMetadata the metadata to serialize.
+ * @param metadataPath the path to write the metadata file to.
+ */
+ def writeTableMetadata(tableMetadata: TableMetadataPB, metadataPath: Path): Unit = {
+ log.error(s"Writing metadata to $metadataPath")
+ val hPath = new HPath(metadataPath.toString)
+ val out = fs.create(hPath, /* overwrite= */ false)
+ val json = JsonFormat.printer().print(tableMetadata)
+ out.write(json.getBytes(StandardCharsets.UTF_8))
+ out.flush()
+ out.close()
+ }
+
+ /**
+ * Reads an entire backup graph by reading all of the metadata files for the
+ * given table. See [[BackupGraph]] for more details.
+ * @param tableName the table to read a backup graph for.
+ * @return the full backup graph.
+ */
+ def readBackupGraph(tableName: String): BackupGraph = {
+ val backups = readTableBackups(tableName)
+ val graph = new BackupGraph()
+ backups.foreach {
+ case (path, metadata) =>
+ graph.addBackup(BackupNode(path, metadata))
+ }
+ graph
+ }
+
+ /**
+ * Reads and returns all of the metadata for a given table.
+ * @param tableName the table to read the metadata for.
+ * @return a sequence of all the paths and metadata.
+ */
+ // TODO: Also use table-id to find backups.
+ private def readTableBackups(tableName: String): Seq[(Path, TableMetadataPB)] = {
+ val hPath = new HPath(tablePath(tableName).toString)
+ val results = new mutable.ListBuffer[(Path, TableMetadataPB)]()
+ if (fs.exists(hPath)) {
+ val iter = fs.listLocatedStatus(hPath)
+ while (iter.hasNext) {
+ val file = iter.next()
+ if (file.isDirectory) {
+ val metadataHPath = new HPath(file.getPath, MetadataFileName)
+ if (fs.exists(metadataHPath)) {
+ val metadata = readTableMetadata(Paths.get(metadataHPath.toString))
+ results += ((Paths.get(file.getPath.toString), metadata))
+ }
+ }
+ }
+ }
+ log.error(s"Found ${results.size} paths in ${hPath.toString}")
+ results.toList
+ }
+
+ /**
+ * Reads and deserializes the metadata file at the given path.
+ * @param metadataPath the path to the metadata file.
+ * @return the deserialized table metadata.
+ */
+ private def readTableMetadata(metadataPath: Path): TableMetadataPB = {
+ val hPath = new HPath(metadataPath.toString)
+ val in = new InputStreamReader(fs.open(hPath), StandardCharsets.UTF_8)
+ val json = CharStreams.toString(in)
+ in.close()
+ val builder = TableMetadataPB.newBuilder()
+ JsonFormat.parser().merge(json, builder)
+ builder.build()
+ }
+}
+
+object SessionIO {
+ // The name of the metadata file within a backup directory.
+ val MetadataFileName = ".kudu-metadata.json"
+}
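A small illustration of the layout SessionIO produces; the local root path,
table name, and timestamp are placeholders.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[1]").appName("layout-sketch").getOrCreate()
val options = BackupOptions(tables = Seq("my:table"), rootPath = "/tmp/kudu-backups")
val io = new SessionIO(spark, options)

// Table names are URL-encoded and each backup lives in a directory named by its toMs.
val backupDir = io.backupPath("my:table", 1554940000000L)
// -> /tmp/kudu-backups/my%3Atable/1554940000000
val metadataFile = io.backupMetadataPath(backupDir)
// -> /tmp/kudu-backups/my%3Atable/1554940000000/.kudu-metadata.json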
diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/TableMetadata.scala b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/TableMetadata.scala
index 64845b4..0d2c315 100644
--- a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/TableMetadata.scala
+++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/TableMetadata.scala
@@ -1,19 +1,19 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
package org.apache.kudu.backup
import java.math.BigDecimal
@@ -25,7 +25,6 @@ import org.apache.kudu.ColumnSchema.ColumnSchemaBuilder
import org.apache.kudu.ColumnSchema.CompressionAlgorithm
import org.apache.kudu.ColumnSchema.Encoding
import org.apache.kudu.ColumnTypeAttributes.ColumnTypeAttributesBuilder
-import org.apache.kudu.client.Bytes
import org.apache.kudu.client.CreateTableOptions
import org.apache.kudu.client.KuduTable
import org.apache.kudu.client.PartialRow
@@ -42,8 +41,9 @@ import scala.collection.JavaConverters._
object TableMetadata {
val MetadataFileName = ".kudu-metadata.json"
+ val MetadataVersion = 1
- def getTableMetadata(table: KuduTable, options: KuduBackupOptions): TableMetadataPB = {
+ def getTableMetadata(table: KuduTable, options: BackupOptions): TableMetadataPB = {
val columns = table.getSchema.getColumns.asScala.map { col =>
val builder = ColumnMetadataPB
.newBuilder()
@@ -65,8 +65,9 @@ object TableMetadata {
TableMetadataPB
.newBuilder()
- .setFromMs(0) // TODO: fromMs is always zero until we support incremental backups
- .setToMs(options.timestampMs)
+ .setVersion(MetadataVersion)
+ .setFromMs(options.fromMs)
+ .setToMs(options.toMs)
.setDataFormat(options.format)
.setTableName(table.getName)
.addAllColumns(columns.asJava)
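
The metadata now records the backup window explicitly: a full backup starts
from zero, while an incremental starts where the previous backup's toMs left
off. A rough sketch of that rule, assuming lastToMs is taken from the newest
node of the existing BackupGraph:

    // Illustrative only, not the project's code.
    def backupWindow(incremental: Boolean, lastToMs: Long, nowMs: Long): (Long, Long) = {
      val fromMs = if (incremental) lastToMs else 0L
      (fromMs, nowMs)
    }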
diff --git a/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestBackupGraph.scala b/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestBackupGraph.scala
new file mode 100644
index 0000000..ca59921
--- /dev/null
+++ b/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestBackupGraph.scala
@@ -0,0 +1,144 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.kudu.backup
+
+import org.apache.kudu.client.KuduTable
+import org.apache.kudu.spark.kudu._
+import org.junit.Assert._
+import org.junit.Test
+import org.slf4j.Logger
+import org.slf4j.LoggerFactory
+
+class TestBackupGraph extends KuduTestSuite {
+ val log: Logger = LoggerFactory.getLogger(getClass)
+
+ @Test
+ def testSimpleBackupGraph() {
+ val graph = new BackupGraph()
+ val full = createBackupVertex(table, 0, 1)
+ graph.addBackup(full)
+
+ // Validate a graph with only a single full backup.
+ assertEquals(1, graph.fullBackups.size)
+ assertEquals(1, graph.backupPaths.size)
+ val fullPath = graph.backupPaths.head
+ assertEquals("0", fullPath.pathString)
+
+ // Validate a graph with a couple of incremental backups.
+ val inc1 = createBackupVertex(table, 1, 2)
+ graph.addBackup(inc1)
+ val inc2 = createBackupVertex(table, 2, 3)
+ graph.addBackup(inc2)
+ assertEquals(1, graph.fullBackups.size)
+ assertEquals(1, graph.backupPaths.size)
+
+ val incPath = graph.backupPaths.head
+ assertEquals("0 -> 1 -> 2", incPath.pathString)
+ }
+
+ @Test
+ def testForkingBackupGraph() {
+ val graph = new BackupGraph()
+ val full = createBackupVertex(table, 0, 1)
+ graph.addBackup(full)
+ // Duplicate fromMs of 1 creates a fork in the graph.
+ val inc1 = createBackupVertex(table, 1, 2)
+ graph.addBackup(inc1)
+ val inc2 = createBackupVertex(table, 1, 4)
+ graph.addBackup(inc2)
+ val inc3 = createBackupVertex(table, 2, 3)
+ graph.addBackup(inc3)
+ val inc4 = createBackupVertex(table, 4, 5)
+ graph.addBackup(inc4)
+
+ assertEquals(1, graph.fullBackups.size)
+ assertEquals(2, graph.backupPaths.size)
+
+ val path1 = graph.backupPaths.head
+ assertEquals("0 -> 1 -> 2", path1.pathString)
+
+ val path2 = graph.backupPaths.last
+ assertEquals("0 -> 1 -> 4", path2.pathString)
+
+ // Ensure the most recent incremental is used as the base for the next backup and as the end of the restore path.
+ assertEquals(5, graph.backupBase.metadata.getToMs)
+ assertEquals(5, graph.restorePath.toMs)
+ }
+
+ @Test
+ def testMultiFullBackupGraph() {
+ val graph = new BackupGraph()
+ val full1 = createBackupVertex(table, 0, 1)
+ graph.addBackup(full1)
+ val inc1 = createBackupVertex(table, 1, 2)
+ graph.addBackup(inc1)
+ val inc2 = createBackupVertex(table, 2, 4)
+ graph.addBackup(inc2)
+
+ // Add a second full backup.
+ val full2 = createBackupVertex(table, 0, 4)
+ graph.addBackup(full2)
+ val inc3 = createBackupVertex(table, 4, 5)
+ graph.addBackup(inc3)
+
+ assertEquals(2, graph.fullBackups.size)
+ assertEquals(2, graph.backupPaths.size)
+ val path1 = graph.backupPaths.head
+ assertEquals("0 -> 1 -> 2 -> 4", path1.pathString)
+
+ val path2 = graph.backupPaths.last
+ assertEquals("0 -> 4", path2.pathString)
+
+ // Ensure the most recent incremental is used as the base for the next backup and as the end of the restore path.
+ assertEquals(5, graph.backupBase.metadata.getToMs)
+ assertEquals(5, graph.restorePath.toMs)
+ }
+
+ @Test
+ def testFilterByTime() {
+ val graph = new BackupGraph()
+ val full1 = createBackupVertex(table, 0, 1)
+ graph.addBackup(full1)
+ val inc1 = createBackupVertex(table, 1, 2)
+ graph.addBackup(inc1)
+ val inc2 = createBackupVertex(table, 2, 4)
+ graph.addBackup(inc2)
+
+ // Add a second full backup.
+ val full2 = createBackupVertex(table, 0, 4)
+ graph.addBackup(full2)
+ val inc3 = createBackupVertex(table, 4, 5)
+ graph.addBackup(inc3)
+
+ val newGraph = graph.filterByTime(2)
+
+ assertEquals(1, newGraph.fullBackups.size)
+ assertEquals(1, newGraph.backupPaths.size)
+ }
+
+ private def createBackupVertex(table: KuduTable, fromMs: Long, toMs: Long): BackupNode = {
+ val options = new BackupOptions(
+ tables = Seq(table.getName),
+ rootPath = "foo/path",
+ "fooAddresses",
+ fromMs = fromMs,
+ toMs = toMs
+ )
+ val metadata = TableMetadata.getTableMetadata(table, options)
+ BackupNode(null, metadata)
+ }
+}
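
The assertions above pin down the selection rule: when several chains exist,
the one ending at the largest toMs is preferred for both the next backup base
and the restore path. A hedged sketch of that rule; the BackupPath name and
its toMs accessor are taken from the test's usage and may differ in the real
class:

    // Illustrative only: restore from the chain whose final backup is newest.
    def pickRestorePath(paths: Seq[BackupPath]): BackupPath =
      paths.maxBy(_.toMs)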
diff --git a/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala b/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala
index 3180e6b..a707702 100644
--- a/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala
+++ b/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala
@@ -14,25 +14,21 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-
package org.apache.kudu.backup
import java.nio.file.Files
+import java.nio.file.Path
import java.util
import com.google.common.base.Objects
import org.apache.commons.io.FileUtils
-
import org.apache.kudu.client.PartitionSchema.HashBucketSchema
import org.apache.kudu.client._
import org.apache.kudu.ColumnSchema
import org.apache.kudu.Schema
-import org.apache.kudu.Type
import org.apache.kudu.spark.kudu._
import org.apache.kudu.test.RandomUtils
import org.apache.kudu.util.DataGenerator.DataGeneratorBuilder
-import org.apache.kudu.util.DataGenerator
-import org.apache.kudu.util.DecimalUtil
import org.apache.kudu.util.HybridTimeUtil
import org.apache.kudu.util.SchemaGenerator.SchemaGeneratorBuilder
import org.junit.Assert._
@@ -40,8 +36,8 @@ import org.junit.Before
import org.junit.Test
import org.slf4j.Logger
import org.slf4j.LoggerFactory
+
import scala.collection.JavaConverters._
-import scala.util.Random
class TestKuduBackup extends KuduTestSuite {
val log: Logger = LoggerFactory.getLogger(getClass)
@@ -57,7 +53,7 @@ class TestKuduBackup extends KuduTestSuite {
def testSimpleBackupAndRestore() {
insertRows(table, 100) // Insert data into the default test table.
- backupAndRestore(tableName)
+ backupAndRestore(Seq(tableName))
val rdd =
kuduContext.kuduRDD(ss.sparkContext, s"$tableName-restore", List("key"))
@@ -70,6 +66,28 @@ class TestKuduBackup extends KuduTestSuite {
assertTrue(partitionSchemasMatch(tA.getPartitionSchema, tB.getPartitionSchema))
}
+ // TODO: Add a thorough randomized test for full and incremental backup/restore.
+ @Test
+ def testIncrementalBackupAndRestore() {
+ insertRows(table, 100) // Insert data into the default test table.
+
+ val rootDir = Files.createTempDirectory("backup")
+ doBackup(rootDir, Seq(tableName)) // Full backup.
+ insertRows(table, 100, 100) // Insert more data.
+ doBackup(rootDir, Seq(tableName), incremental = true) // Incremental backup.
+ // Delete rows that span the full and incremental backup.
+ Range(50, 150).foreach { i =>
+ deleteRow(i)
+ }
+ doBackup(rootDir, Seq(tableName), incremental = true) // Incremental backup.
+ doRestore(rootDir, Seq(tableName)) // Restore all the backups.
+ FileUtils.deleteDirectory(rootDir.toFile)
+
+ val rdd =
+ kuduContext.kuduRDD(ss.sparkContext, s"$tableName-restore", List("key"))
+ assert(rdd.collect.length == 100)
+ }
+
@Test
def testSimpleBackupAndRestoreWithSpecialCharacters() {
// Use an Impala-style table name to verify url encoding/decoding of the table name works.
@@ -81,7 +99,7 @@ class TestKuduBackup extends KuduTestSuite {
kuduClient.createTable(impalaTableName, simpleSchema, tableOptions)
- backupAndRestore(impalaTableName)
+ backupAndRestore(Seq(impalaTableName))
val rdd = kuduContext
.kuduRDD(ss.sparkContext, s"$impalaTableName-restore", List("key"))
@@ -95,7 +113,7 @@ class TestKuduBackup extends KuduTestSuite {
val tableName = table.getName
loadRandomData(table)
- backupAndRestore(tableName)
+ backupAndRestore(Seq(tableName))
val backupRows = kuduContext.kuduRDD(ss.sparkContext, s"$tableName").collect
val restoreRows =
@@ -121,7 +139,7 @@ class TestKuduBackup extends KuduTestSuite {
insertRows(table1, numRows)
insertRows(table2, numRows)
- backupAndRestore(table1Name, table2Name)
+ backupAndRestore(Seq(table1Name, table2Name))
val rdd1 =
kuduContext.kuduRDD(ss.sparkContext, s"$table1Name-restore", List("key"))
@@ -166,7 +184,7 @@ class TestKuduBackup extends KuduTestSuite {
insertRows(table, kNumPartitions)
// Now backup and restore the table.
- backupAndRestore(tableName)
+ backupAndRestore(Seq(tableName))
}
@Test
@@ -180,7 +198,7 @@ class TestKuduBackup extends KuduTestSuite {
insertRows(table1, 100)
- backupAndRestore(tableName)
+ backupAndRestore(Seq(tableName))
}
// TODO: Move to a Schema equals/equivalent method.
@@ -269,9 +287,14 @@ class TestKuduBackup extends KuduTestSuite {
}
}
- def backupAndRestore(tableNames: String*): Unit = {
- val dir = Files.createTempDirectory("backup")
- val path = dir.toUri.toString
+ def backupAndRestore(tableNames: Seq[String]): Unit = {
+ val rootDir = Files.createTempDirectory("backup")
+ doBackup(rootDir, tableNames)
+ doRestore(rootDir, tableNames)
+ FileUtils.deleteDirectory(rootDir.toFile)
+ }
+
+ def doBackup(rootDir: Path, tableNames: Seq[String], incremental: Boolean = false): Unit = {
val nowMs = System.currentTimeMillis()
// Log the timestamps to simplify flaky debugging.
@@ -286,14 +309,19 @@ class TestKuduBackup extends KuduTestSuite {
// granularity. This means if the test runs fast enough that data is inserted with the same
// millisecond value as nowMs (after truncating the micros) the records inserted in the
// microseconds after truncation could be unread.
- val backupOptions =
- new KuduBackupOptions(tableNames, path, harness.getMasterAddressesAsString, nowMs + 1)
+ val backupOptions = new BackupOptions(
+ tableNames,
+ rootDir.toUri.toString,
+ harness.getMasterAddressesAsString,
+ nowMs + 1,
+ incremental
+ )
KuduBackup.run(backupOptions, ss)
+ }
+ def doRestore(rootDir: Path, tableNames: Seq[String]): Unit = {
val restoreOptions =
- new KuduRestoreOptions(tableNames, path, harness.getMasterAddressesAsString)
+ new RestoreOptions(tableNames, rootDir.toUri.toString, harness.getMasterAddressesAsString)
KuduRestore.run(restoreOptions, ss)
-
- FileUtils.deleteDirectory(dir.toFile)
}
}
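
Outside the test harness, a backup/restore cycle can be sketched roughly as
below, mirroring the constructor argument order used by doBackup and doRestore
above. The SparkSession `spark`, the table name, root path, and master address
string are placeholders, and the options classes may carry further defaulted
parameters not shown here:

    val tables = Seq("my_table")          // placeholder table name
    val root = "hdfs:///kudu-backups"     // placeholder backup root
    val masters = "kudu-master-1:7051"    // placeholder master addresses

    // Full backup, then an incremental one after more rows are written.
    KuduBackup.run(new BackupOptions(tables, root, masters, System.currentTimeMillis(), false), spark)
    KuduBackup.run(new BackupOptions(tables, root, masters, System.currentTimeMillis(), true), spark)
    // Restore replays the full backup plus any incrementals on top of it.
    KuduRestore.run(new RestoreOptions(tables, root, masters), spark)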
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/Schema.java b/java/kudu-client/src/main/java/org/apache/kudu/Schema.java
index f5d746e..0be52aa 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/Schema.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/Schema.java
@@ -198,6 +198,15 @@ public class Schema {
}
/**
+ * Returns true if the column exists.
+ * @param columnName column to search for
+ * @return true if the column exists
+ */
+ public boolean hasColumn(String columnName) {
+ return this.columnsByName.containsKey(columnName);
+ }
+
+ /**
* Get the index for the provided column name.
* @param columnName column to search for
* @return an index in the schema
@@ -307,14 +316,22 @@ public class Schema {
}
/**
+ * @return true if the schema has the IS_DELETED virtual column
+ */
+ @InterfaceAudience.Private
+ @InterfaceStability.Unstable
+ public boolean hasIsDeleted() {
+ return isDeletedIndex != NO_IS_DELETED_INDEX;
+ }
+
+ /**
* @return the index of the IS_DELETED virtual column
* @throws IllegalStateException if no IS_DELETED virtual column exists
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public int getIsDeletedIndex() {
- Preconditions.checkState(isDeletedIndex != NO_IS_DELETED_INDEX,
- "Schema doesn't have an IS_DELETED columns");
+ Preconditions.checkState(hasIsDeleted(), "Schema doesn't have an IS_DELETED column");
return isDeletedIndex;
}
}
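
With hasColumn and hasIsDeleted available, callers can guard look-ups instead
of catching IllegalArgumentException. A small usage sketch; `schema` is any
org.apache.kudu.Schema, for example one obtained from KuduTable.getSchema:

    val keyIndex: Option[Int] =
      if (schema.hasColumn("key")) Some(schema.getColumnIndex("key")) else None
    val isDeletedIndex: Option[Int] =
      if (schema.hasIsDeleted()) Some(schema.getIsDeletedIndex()) else None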
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
index 9fa42c9..3ecc67e 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
@@ -350,16 +350,9 @@ public final class AsyncKuduScanner {
*/
private static ColumnSchema generateIsDeletedColumn(Schema schema) {
String columnName = DEFAULT_IS_DELETED_COL_NAME;
- boolean collision = true;
- while (collision) {
- try {
- // If getColumnIndex doesn't throw an IllegalArgumentException then
- // the column already exists and we need to pick an alternate IS_DELETED column name.
- schema.getColumnIndex(columnName);
- columnName += "_";
- } catch (IllegalArgumentException ex) {
- collision = false;
- }
+ // If the column already exists, pick an alternate IS_DELETED column name.
+ while (schema.hasColumn(columnName)) {
+ columnName += "_";
}
return new ColumnSchema.ColumnSchemaBuilder(columnName, Type.BOOL)
.wireType(Common.DataType.IS_DELETED)
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/RowResult.java b/java/kudu-client/src/main/java/org/apache/kudu/client/RowResult.java
index 7f80670..0d8d502 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/RowResult.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/RowResult.java
@@ -597,6 +597,15 @@ public class RowResult {
}
/**
+ * @return true if the RowResult has the IS_DELETED virtual column
+ */
+ @InterfaceAudience.Private
+ @InterfaceStability.Unstable
+ public boolean hasIsDeleted() {
+ return schema.hasIsDeleted();
+ }
+
+ /**
* @return the value of the IS_DELETED virtual column
* @throws IllegalStateException if no IS_DELETED virtual column exists
*/
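
A hedged sketch of how a scan consumer might use the new accessor, only
consulting the IS_DELETED column when the projection actually contains it;
`scanner` is assumed to be a KuduScanner built elsewhere (for example as a
diff scan):

    while (scanner.hasMoreRows()) {
      val results = scanner.nextRows()
      while (results.hasNext()) {
        val row = results.next()
        // Rows flagged as deleted would become DELETE operations during an
        // incremental restore; other rows remain upserts.
        val deleted = row.hasIsDeleted() && row.isDeleted()
      }
    }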
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/RowConverter.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/RowConverter.scala
index e9f16e4..6df627d 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/RowConverter.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/RowConverter.scala
@@ -34,10 +34,14 @@ import org.apache.yetus.audience.InterfaceStability
class RowConverter(kuduSchema: Schema, schema: StructType, ignoreNull: Boolean) {
private val typeConverter = CatalystTypeConverters.createToScalaConverter(schema)
- private val indices: Array[(Int, Int)] = schema.fields.zipWithIndex.map({
+ private val indices: Array[(Int, Int)] = schema.fields.zipWithIndex.flatMap {
case (field, sparkIdx) =>
- sparkIdx -> kuduSchema.getColumnIndex(field.name)
- })
+ // Support Spark schemas that have more columns than the Kudu table by
+ // ignoring missing Kudu columns.
+ if (kuduSchema.hasColumn(field.name)) {
+ Some(sparkIdx -> kuduSchema.getColumnIndex(field.name))
+ } else None
+ }
/**
* Converts a Spark internalRow to a Spark Row.
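
The flatMap above lets the restore job hand the converter a Spark schema
carrying extra, Kudu-unknown columns (such as the new row-action byte column)
without failing. A rough usage sketch; the column name "row_action" and the
`kuduTable` value are placeholders, not the names the backup job actually uses:

    import org.apache.spark.sql.types._

    val sparkSchema = StructType(Seq(
      StructField("key", IntegerType),
      StructField("val", StringType),
      StructField("row_action", ByteType) // Spark-only; absent from the Kudu table
    ))
    // Columns the Kudu schema does not contain are simply skipped when the
    // index mapping is built.
    val converter = new RowConverter(kuduTable.getSchema, sparkSchema, ignoreNull = false)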
diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduTestSuite.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduTestSuite.scala
index 2f13ed2..4e139e2 100644
--- a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduTestSuite.scala
+++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduTestSuite.scala
@@ -154,10 +154,11 @@ trait KuduTestSuite extends JUnitSuite {
def insertRows(
targetTable: KuduTable,
- rowCount: Integer): IndexedSeq[(Int, Int, String, Long)] = {
+ rowCount: Int,
+ startIndex: Int = 0): IndexedSeq[(Int, Int, String, Long)] = {
val kuduSession = kuduClient.newSession()
- val rows = Range(0, rowCount).map { i =>
+ val rows = Range(startIndex, rowCount + startIndex).map { i =>
val insert = targetTable.newInsert
val row = insert.getRow
row.addInt(0, i)