Posted to commits@kudu.apache.org by gr...@apache.org on 2019/04/16 21:37:19 UTC

[kudu] branch master updated: [backup] Add initial incremental backup/restore support

This is an automated email from the ASF dual-hosted git repository.

granthenke pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 8a85813  [backup] Add initial incremental backup/restore support
8a85813 is described below

commit 8a85813913b9c258cbaadb0b65cef508bdee30fd
Author: Grant Henke <gr...@apache.org>
AuthorDate: Mon Mar 25 14:10:30 2019 -0500

    [backup] Add initial incremental backup/restore support
    
    This patch adds initial support for incremental backup
    and restore.
    
    A high-level overview of the changes in this patch:
    - Added a version to the TableMetadata for future use.
    - Broke out IO/layout logic into a SessionIO class so it
       could be easily shared.
    - Unified the BackupOptions and RestoreOptions
       so common options could be shared.
    - Introduced a BackupGraph class to handle chaining
       together backups for backup and restore jobs.
    - Enhanced the BackupRDD to output an additional
       RowAction byte column on backup and restore.
    - Enhanced the restore job to use the new RowAction
       column and translate its values into operations for
       incremental restore jobs.
    - Added the ability to restore to a given “time” on a
       per-backup basis.
    - Added example usage docs to the Backup and Restore
       classes (an illustrative invocation follows this list).
    - Added hasIsDeleted to RowResult and hasColumn to
       Schema.
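    
    For illustration, a possible end-to-end invocation that mirrors the
    example usage added to the KuduBackup and KuduRestore class docs
    (the jar name, master addresses, root path, and table name are
    placeholders):
    
      # First run: no previous backup exists under rootPath, so a full
      # backup is taken. Re-running the same command later finds the
      # previous backup's to_ms and takes an incremental backup from it.
      spark-submit --class org.apache.kudu.backup.KuduBackup kudu-backup2_2.11-*.jar \
        --kuduMasterAddresses master1-host,master-2-host,master-3-host \
        --rootPath hdfs:///kudu/backup/path \
        my_kudu_table
    
      # Restore: applies the chosen full backup and its incrementals in order.
      spark-submit --class org.apache.kudu.backup.KuduRestore kudu-backup2_2.11-*.jar \
        --kuduMasterAddresses master1-host,master-2-host,master-3-host \
        --rootPath hdfs:///kudu/backup/path \
        my_kudu_table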
    
    Change-Id: I50b21d921fefbf4d7e8bd1766285258e8014d890
    Reviewed-on: http://gerrit.cloudera.org:8080/12879
    Tested-by: Kudu Jenkins
    Reviewed-by: Mike Percy <mp...@apache.org>
---
 java/kudu-backup/src/main/protobuf/backup.proto    |  18 +-
 .../scala/org/apache/kudu/backup/BackupGraph.scala | 197 +++++++++++++++++++
 .../scala/org/apache/kudu/backup/KuduBackup.scala  |  87 +++++----
 .../org/apache/kudu/backup/KuduBackupOptions.scala | 113 -----------
 .../org/apache/kudu/backup/KuduBackupRDD.scala     |  91 +++++----
 .../scala/org/apache/kudu/backup/KuduRestore.scala | 141 ++++++++------
 .../apache/kudu/backup/KuduRestoreOptions.scala    |  89 ---------
 .../scala/org/apache/kudu/backup/Options.scala     | 209 +++++++++++++++++++++
 .../scala/org/apache/kudu/backup/RowAction.java    |  69 +++++++
 .../scala/org/apache/kudu/backup/SessionIO.scala   | 193 +++++++++++++++++++
 .../org/apache/kudu/backup/TableMetadata.scala     |  41 ++--
 .../org/apache/kudu/backup/TestBackupGraph.scala   | 144 ++++++++++++++
 .../org/apache/kudu/backup/TestKuduBackup.scala    |  68 +++++--
 .../src/main/java/org/apache/kudu/Schema.java      |  21 ++-
 .../org/apache/kudu/client/AsyncKuduScanner.java   |  13 +-
 .../java/org/apache/kudu/client/RowResult.java     |   9 +
 .../org/apache/kudu/spark/kudu/RowConverter.scala  |  10 +-
 .../org/apache/kudu/spark/kudu/KuduTestSuite.scala |   5 +-
 18 files changed, 1123 insertions(+), 395 deletions(-)

diff --git a/java/kudu-backup/src/main/protobuf/backup.proto b/java/kudu-backup/src/main/protobuf/backup.proto
index 372c665..6de3b3e 100644
--- a/java/kudu-backup/src/main/protobuf/backup.proto
+++ b/java/kudu-backup/src/main/protobuf/backup.proto
@@ -35,6 +35,7 @@ message ColumnTypeAttributesMetadataPB {
 
 // Maps to the ColumnSchema class.
 // The fields are effectively 1 to 1 mappings of those in ColumnSchema.
+// TODO: How do we handle column additions?
 message ColumnMetadataPB {
   string name = 1;
   string type = 2;
@@ -98,18 +99,21 @@ message PartitionMetadataPB {
 // so we can understand and create a table that matches the backed up
 // table on restore.
 message TableMetadataPB {
+  // A version used to handle any future format/layout changes.
+  int32 version = 1;
   // The starting point of a backup. A UNIX timestamp in milliseconds since the epoch.
-  int64 from_ms = 1;
+  // If from_ms is 0, this is a full backup.
+  int64 from_ms = 2;
   // The end point of a backup. A UNIX timestamp in milliseconds since the epoch.
-  int64 to_ms = 2;
+  int64 to_ms = 3;
   // The file format used to store the data.
-  string data_format = 3;
+  string data_format = 4;
   // The name of the table.
-  string table_name = 4;
+  string table_name = 5;
   // The replication factor of this table.
-  int32 num_replicas = 5;
+  int32 num_replicas = 6;
   // The metadata for the table's columns.
-  repeated ColumnMetadataPB columns = 6;
+  repeated ColumnMetadataPB columns = 7;
   // The metadata for the table's partitions.
-  PartitionMetadataPB partitions = 7;
+  PartitionMetadataPB partitions = 8;
 }
\ No newline at end of file
diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/BackupGraph.scala b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/BackupGraph.scala
new file mode 100644
index 0000000..21c7ff9
--- /dev/null
+++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/BackupGraph.scala
@@ -0,0 +1,197 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.kudu.backup
+
+import java.nio.file.Path
+
+import org.apache.kudu.backup.Backup.TableMetadataPB
+import org.apache.yetus.audience.InterfaceAudience
+import org.apache.yetus.audience.InterfaceStability
+
+import scala.collection.mutable
+
+/**
+ * A directed weighted graph of backups used to pick the optimal series of backups and restores.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+class BackupGraph {
+  // Index of backup.fromMs -> backup for use in chaining backups together.
+  private val adjacencyList = mutable.Map[Long, mutable.ListBuffer[BackupNode]]()
+
+  // A full backup has FromMs of 0.
+  private val FullBackupFromMs = 0
+
+  /**
+   * Add a backup to the graph.
+   * @param backup the backup to add.
+   */
+  def addBackup(backup: BackupNode): Unit = {
+    // Add a weighted edge with the backup.
+    addEdge(backup)
+  }
+
+  private def addEdge(backup: BackupNode): Unit = {
+    val adjacentVertices =
+      adjacencyList.getOrElse(backup.metadata.getFromMs, mutable.ListBuffer[BackupNode]())
+    adjacentVertices += backup
+    adjacencyList.put(backup.metadata.getFromMs, adjacentVertices)
+  }
+
+  /**
+   * @return true if the graph has a full backup.
+   */
+  def hasFullBackup: Boolean = fullBackups.nonEmpty
+
+  /**
+   * @return all the full backups in the graph.
+   */
+  def fullBackups: Seq[BackupNode] = {
+    adjacencyList.getOrElse(FullBackupFromMs, Seq())
+  }
+
+  /**
+   * @return the most recent full backup.
+   * @throws IllegalStateException if no full backup exists.
+   */
+  def mostRecentFull: BackupNode = {
+    if (!hasFullBackup) throw new IllegalStateException("No full backup exists")
+    fullBackups.maxBy(_.metadata.getToMs)
+  }
+
+  /**
+   * @return all backup paths in the graph.
+   */
+  def backupPaths: Seq[BackupPath] = {
+    allPaths(FullBackupFromMs, List()).map(BackupPath)
+  }
+
+  private def allPaths(fromMs: Long, path: List[BackupNode]): List[List[BackupNode]] = {
+    if (!adjacencyList.contains(fromMs)) {
+      List(path)
+    } else {
+      adjacencyList(fromMs).flatMap { node =>
+        allPaths(node.metadata.getToMs, path ++ List(node))
+      }.toList
+    }
+  }
+
+  /**
+   * Returns the backup that should be used as the base for the next backup.
+   *
+   * The logic for picking this backup is as follows:
+   *
+   *   1. Pick the paths with the most recent full backup.
+   *   2. If there are multiple paths, pick the path with the most recent partial backup.
+   *   3. If there are multiple paths, pick the path with the lowest weight.
+   *
+   * This allows concurrent full backups to be taken (or generated by compaction)
+   * while also taking incremental backups.
+   *
+   * While a full backup is running, incremental backups will continue to build
+   * off the chain from the previous full. When the new full completes, the
+   * next incremental backup will use that full as its "current" chain.
+   *
+   * @throws IllegalStateException if no full backup exists.
+   */
+  def backupBase: BackupNode = {
+    // 1. Pick the paths with the most recent full backup.
+    val recentFulls = backupPaths.filter(_.fullBackup == mostRecentFull)
+
+    // 2. If there are multiple paths, pick the path with the most recent partial backup.
+    val maxToMs = recentFulls.maxBy(_.toMs).toMs
+    val recentPaths = recentFulls.filter(_.toMs == maxToMs)
+
+    // 3. If there are multiple paths, pick the path with the lowest weight.
+    recentPaths.minBy(_.weight).lastBackup
+  }
+
+  /**
+   * Returns a sequence of backups that should be used to restore.
+   *
+   * The logic for picking this path is as follows:
+   *
+   *   1. Pick the path with the most recent backup.
+   *   2. If there are multiple paths, pick the path with the lowest weight.
+   *
+   * This ensures we always restore the most current state of the data while
+   * also picking the most efficient path (likely a result of compaction).
+   *
+   * @throws IllegalStateException if no full backup exists.
+   */
+  def restorePath: BackupPath = {
+    //  1. Pick the path with the most recent backup.
+    val maxToMs = backupPaths.maxBy(_.toMs).toMs
+    val recentPaths = backupPaths.filter(_.toMs == maxToMs)
+
+    // 2. If there are multiple paths, pick the path with the lowest weight.
+    recentPaths.minBy(_.weight)
+  }
+
+  /**
+   * Returns a new BackupGraph that includes only the nodes with a toMs less than
+   * or equal to the specified time.
+   * @param timeMs the time to filter by.
+   * @return the filtered BackupGraph.
+   */
+  def filterByTime(timeMs: Long): BackupGraph = {
+    val result = new BackupGraph()
+    val distinctBackups = adjacencyList.values.flatten.toSet
+    distinctBackups.filter(_.metadata.getToMs <= timeMs).foreach(result.addBackup)
+    result
+  }
+}
+
+/**
+ * Node class to represent nodes in the backup graph.
+ */
+case class BackupNode(path: Path, metadata: TableMetadataPB) {
+
+  /**
+   * @return The weight/cost of this Node.
+   */
+  def weight: Int = {
+    // Full backups have a weight of 0 and partial backups have a weight of 1.
+    // TODO: Use the size of a partial backup to contribute to weight.
+    if (metadata.getFromMs == 0) 0 else 1
+  }
+}
+
+/**
+ * A backup path is a full backup with a series of incremental backups.
+ */
+case class BackupPath(backups: Seq[BackupNode]) {
+
+  def fullBackup: BackupNode = backups.head
+
+  def lastBackup: BackupNode = backups.last
+
+  /**
+   * @return the toMs for the entire path.
+   */
+  def toMs: Long = backups.last.metadata.getToMs
+
+  /**
+   * @return the weight/cost of the path.
+   */
+  def weight: Int = backups.map(_.weight).sum
+
+  /**
+   * @return A string useful for debugging the path.
+   */
+  def pathString: String = backups.map(_.metadata.getFromMs).mkString(" -> ")
+}
diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackup.scala b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackup.scala
index 3280c0d..a8d89f6 100644
--- a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackup.scala
+++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackup.scala
@@ -16,16 +16,7 @@
 // under the License.
 package org.apache.kudu.backup
 
-import java.net.URLEncoder
-import java.nio.charset.StandardCharsets
-import java.nio.file.Path
-import java.nio.file.Paths
-
-import com.google.protobuf.util.JsonFormat
-import org.apache.hadoop.fs.{Path => HPath}
-import org.apache.kudu.backup.Backup.TableMetadataPB
 import org.apache.kudu.spark.kudu.KuduContext
-import org.apache.kudu.spark.kudu.SparkUtil._
 import org.apache.spark.sql.SaveMode
 import org.apache.spark.sql.SparkSession
 import org.apache.yetus.audience.InterfaceAudience
@@ -33,57 +24,71 @@ import org.apache.yetus.audience.InterfaceStability
 import org.slf4j.Logger
 import org.slf4j.LoggerFactory
 
+/**
+ * The main class for a Kudu backup spark job.
+ *
+ * Example Usage:
+ *   spark-submit --class org.apache.kudu.backup.KuduBackup kudu-backup2_2.11-*.jar \
+ *     --kuduMasterAddresses master1-host,master-2-host,master-3-host \
+ *     --rootPath hdfs:///kudu/backup/path \
+ *     my_kudu_table
+ */
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
 object KuduBackup {
   val log: Logger = LoggerFactory.getLogger(getClass)
 
-  def run(options: KuduBackupOptions, session: SparkSession): Unit = {
+  def run(options: BackupOptions, session: SparkSession): Unit = {
+    log.info(s"Backing up to root path: ${options.rootPath}")
     val context =
       new KuduContext(
         options.kuduMasterAddresses,
         session.sparkContext
       )
-    val path = options.path
-    log.info(s"Backing up to path: $path")
-
+    val io = new SessionIO(session, options)
     // TODO: Make parallel so each table isn't process serially?
-    options.tables.foreach { t =>
-      val table = context.syncClient.openTable(t)
-      val tablePath = Paths.get(path).resolve(URLEncoder.encode(t, "UTF-8"))
+    options.tables.foreach { tableName =>
+      var tableOptions = options.copy() // Copy the options so we can modify them for the table.
+      val table = context.syncClient.openTable(tableName)
+      val backupPath = io.backupPath(tableName, tableOptions.toMs)
+      val metadataPath = io.backupMetadataPath(backupPath)
+      log.info(s"Backing up table $tableName to path: $backupPath")
 
-      val rdd = new KuduBackupRDD(table, options, context, session.sparkContext)
+      // Unless we are forcing a full backup or a fromMs was set, find the previous backup and
+      // use the `to_ms` metadata as the `from_ms` time for this backup.
+      if (!tableOptions.forceFull || tableOptions.fromMs != 0) {
+        log.info("No fromMs option set, looking for a previous backup.")
+        val graph = io.readBackupGraph(tableName)
+        if (graph.hasFullBackup) {
+          val base = graph.backupBase
+          log.info(s"Setting fromMs to ${base.metadata.getToMs} from backup in path: ${base.path}")
+          tableOptions = tableOptions.copy(fromMs = base.metadata.getToMs)
+        } else {
+          log.info("No full backup was found. Starting a full backup.")
+          tableOptions = tableOptions.copy(fromMs = 0)
+        }
+      }
+      val rdd = new KuduBackupRDD(table, tableOptions, context, session.sparkContext)
       val df =
-        session.sqlContext.createDataFrame(rdd, sparkSchema(table.getSchema))
-      // TODO: Prefix path with the time? Maybe a backup "name" parameter defaulted to something?
-      // TODO: Take parameter for the SaveMode.
+        session.sqlContext
+          .createDataFrame(rdd, io.dataSchema(table.getSchema, options.isIncremental))
+
+      // Write the data to the backup path.
+      // The backup path contains the timestampMs and should not already exist.
       val writer = df.write.mode(SaveMode.ErrorIfExists)
-      // TODO: Restrict format option.
-      // TODO: We need to cleanup partial output on failure otherwise.
-      // retries of the entire job will fail because the file already exists.
-      writer.format(options.format).save(tablePath.toString)
+      writer
+        .format(tableOptions.format)
+        .save(backupPath.toString)
 
-      val tableMetadata = TableMetadata.getTableMetadata(table, options)
-      writeTableMetadata(tableMetadata, tablePath, session)
+      // Generate and output the new metadata for this table.
+      // The existence of metadata indicates this backup was successful.
+      val tableMetadata = TableMetadata.getTableMetadata(table, tableOptions)
+      io.writeTableMetadata(tableMetadata, metadataPath)
     }
   }
 
-  private def writeTableMetadata(
-      metadata: TableMetadataPB,
-      path: Path,
-      session: SparkSession): Unit = {
-    val conf = session.sparkContext.hadoopConfiguration
-    val hPath = new HPath(path.resolve(TableMetadata.MetadataFileName).toString)
-    val fs = hPath.getFileSystem(conf)
-    val out = fs.create(hPath, /* overwrite= */ false)
-    val json = JsonFormat.printer().print(metadata)
-    out.write(json.getBytes(StandardCharsets.UTF_8))
-    out.flush()
-    out.close()
-  }
-
   def main(args: Array[String]): Unit = {
-    val options = KuduBackupOptions
+    val options = BackupOptions
       .parse(args)
       .getOrElse(throw new IllegalArgumentException("could not parse the arguments"))
 
diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackupOptions.scala b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackupOptions.scala
deleted file mode 100644
index a1d7519..0000000
--- a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackupOptions.scala
+++ /dev/null
@@ -1,113 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package org.apache.kudu.backup
-
-import java.net.InetAddress
-
-import org.apache.kudu.client.AsyncKuduClient
-import org.apache.yetus.audience.InterfaceAudience
-import org.apache.yetus.audience.InterfaceStability
-import scopt.OptionParser
-
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-case class KuduBackupOptions(
-    tables: Seq[String],
-    path: String,
-    kuduMasterAddresses: String = InetAddress.getLocalHost.getCanonicalHostName,
-    timestampMs: Long = System.currentTimeMillis(),
-    format: String = KuduBackupOptions.DefaultFormat,
-    scanBatchSize: Int = KuduBackupOptions.DefaultScanBatchSize,
-    scanRequestTimeoutMs: Long = KuduBackupOptions.DefaultScanRequestTimeoutMs,
-    scanPrefetching: Boolean = KuduBackupOptions.DefaultScanPrefetching,
-    keepAlivePeriodMs: Long = KuduBackupOptions.defaultKeepAlivePeriodMs)
-
-object KuduBackupOptions {
-  val DefaultFormat: String = "parquet"
-  val DefaultScanBatchSize: Int = 1024 * 1024 * 20 // 20 MiB
-  val DefaultScanRequestTimeoutMs: Long =
-    AsyncKuduClient.DEFAULT_OPERATION_TIMEOUT_MS // 30 seconds
-  val DefaultScanPrefetching
-    : Boolean = false // TODO: Add a test per KUDU-1260 and enable by default?
-  val defaultKeepAlivePeriodMs: Long = AsyncKuduClient.DEFAULT_KEEP_ALIVE_PERIOD_MS
-
-  // TODO: clean up usage output.
-  // TODO: timeout configurations.
-  private val parser: OptionParser[KuduBackupOptions] =
-    new OptionParser[KuduBackupOptions]("KuduBackup") {
-      opt[String]("path")
-        .action((v, o) => o.copy(path = v))
-        .text("The root path to output backup data. Accepts any Spark compatible path.")
-        .optional()
-
-      opt[String]("kuduMasterAddresses")
-        .action((v, o) => o.copy(kuduMasterAddresses = v))
-        .text("Comma-separated addresses of Kudu masters.")
-        .optional()
-
-      opt[Long]("timestampMs")
-        .action((v, o) => o.copy(timestampMs = v))
-        // TODO: Document the limitations based on cluster configuration (ex: ancient history watermark).
-        .text("A UNIX timestamp in milliseconds since the epoch to execute scans at.")
-        .optional()
-
-      opt[String]("format")
-        .action((v, o) => o.copy(format = v))
-        .text("The file format to use when writing the data.")
-        .optional()
-
-      opt[Int]("scanBatchSize")
-        .action((v, o) => o.copy(scanBatchSize = v))
-        .text("The maximum number of bytes returned by the scanner, on each batch.")
-        .optional()
-
-      opt[Int]("scanRequestTimeoutMs")
-        .action((v, o) => o.copy(scanRequestTimeoutMs = v))
-        .text("Sets how long in milliseconds each scan request to a server can last.")
-        .optional()
-
-      opt[Unit]("scanPrefetching")
-        .action((_, o) => o.copy(scanPrefetching = true))
-        .text("An experimental flag to enable pre-fetching data.")
-        .optional()
-
-      opt[Long]("keepAlivePeriodMs")
-        .action((v, o) => o.copy(keepAlivePeriodMs = v))
-        .text(
-          "Sets the period at which to send keep-alive requests to the tablet server to ensure" +
-            " that scanners do not time out")
-        .optional()
-
-      arg[String]("<table>...")
-        .unbounded()
-        .action((v, o) => o.copy(tables = o.tables :+ v))
-        .text("A list of tables to be backed up.")
-    }
-
-  /**
-   * Parses the passed arguments into Some[KuduBackupOptions].
-   *
-   * If the arguments are bad, an error message is displayed
-   * and None is returned.
-   *
-   * @param args The arguments to parse.
-   * @return Some[KuduBackupOptions] if parsing was successful, None if not.
-   */
-  def parse(args: Seq[String]): Option[KuduBackupOptions] = {
-    parser.parse(args, KuduBackupOptions(Seq(), null))
-  }
-}
diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackupRDD.scala b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackupRDD.scala
index 55f7f98..544e7b2 100644
--- a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackupRDD.scala
+++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackupRDD.scala
@@ -1,19 +1,19 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
 package org.apache.kudu.backup
 
 import java.util.concurrent.TimeUnit
@@ -37,39 +37,46 @@ import scala.collection.JavaConverters._
 @InterfaceStability.Unstable
 class KuduBackupRDD private[kudu] (
     @transient val table: KuduTable,
-    @transient val options: KuduBackupOptions,
+    @transient val options: BackupOptions,
     val kuduContext: KuduContext,
     @transient val sc: SparkContext)
     extends RDD[Row](sc, Nil) {
 
   // Defined here because the options are transient.
-  private val keepAlivePeriodMs = options.keepAlivePeriodMs
+  val incremental: Boolean = options.isIncremental
 
   // TODO: Split large tablets into smaller scan tokens?
   override protected def getPartitions: Array[Partition] = {
     val client = kuduContext.syncClient
 
-    // Set a hybrid time for the scan to ensure application consistency.
-    val timestampMicros = TimeUnit.MILLISECONDS.toMicros(options.timestampMs)
-    val hybridTime =
-      HybridTimeUtil.physicalAndLogicalToHTTimestamp(timestampMicros, 0)
-
-    // Create the scan tokens for each partition.
-    val tokens = client
+    val builder = client
       .newScanTokenBuilder(table)
       .cacheBlocks(false)
-      // TODO: Use fault tolerant scans to get mostly.
-      // ordered results when KUDU-2466 is fixed.
+      // TODO: Use fault tolerant scans to get mostly ordered results when KUDU-2466 is fixed.
       // .setFaultTolerant(true)
       .replicaSelection(ReplicaSelection.CLOSEST_REPLICA)
       .readMode(ReadMode.READ_AT_SNAPSHOT)
-      .snapshotTimestampRaw(hybridTime)
       .batchSizeBytes(options.scanBatchSize)
       .scanRequestTimeout(options.scanRequestTimeoutMs)
       .prefetching(options.scanPrefetching)
       .keepAlivePeriodMs(options.keepAlivePeriodMs)
-      .build()
 
+    // Set a hybrid time for the scan to ensure application consistency.
+    val toMicros = TimeUnit.MILLISECONDS.toMicros(options.toMs)
+    val toHTT =
+      HybridTimeUtil.physicalAndLogicalToHTTimestamp(toMicros, 0)
+
+    if (incremental) {
+      val fromMicros = TimeUnit.MILLISECONDS.toMicros(options.fromMs)
+      val fromHTT =
+        HybridTimeUtil.physicalAndLogicalToHTTimestamp(fromMicros, 0)
+      builder.diffScan(fromHTT, toHTT)
+    } else {
+      builder.snapshotTimestampRaw(toHTT)
+    }
+
+    // Create the scan tokens for each partition.
+    val tokens = builder.build()
     tokens.asScala.zipWithIndex.map {
       case (token, index) =>
         // TODO: Support backups from any replica or followers only.
@@ -85,12 +92,11 @@ class KuduBackupRDD private[kudu] (
   override def compute(part: Partition, taskContext: TaskContext): Iterator[Row] = {
     val client: KuduClient = kuduContext.syncClient
     val partition: KuduBackupPartition = part.asInstanceOf[KuduBackupPartition]
-    // TODO: Get deletes and updates for incremental backups.
     val scanner =
       KuduScanToken.deserializeIntoScanner(partition.scanToken, client)
     // We don't store the RowResult so we can enable the reuseRowResult optimization.
     scanner.setReuseRowResult(true)
-    new RowIterator(scanner, kuduContext)
+    new RowIterator(scanner, kuduContext, incremental)
   }
 
   override def getPreferredLocations(partition: Partition): Seq[String] = {
@@ -109,7 +115,10 @@ private case class KuduBackupPartition(index: Int, scanToken: Array[Byte], locat
  * that takes the job partitions and task context and expects to return an Iterator[Row].
  * This implementation facilitates that.
  */
-private class RowIterator(private val scanner: KuduScanner, val kuduContext: KuduContext)
+private class RowIterator(
+    private val scanner: KuduScanner,
+    val kuduContext: KuduContext,
+    val incremental: Boolean)
     extends Iterator[Row] {
 
   private val scannerIterator = scanner.iterator()
@@ -130,11 +139,25 @@ private class RowIterator(private val scanner: KuduScanner, val kuduContext: Kud
   // sort of zero copy/object pool pattern for performance (we could use that here).
   override def next(): Row = {
     val rowResult = scannerIterator.next()
-    val columnCount = rowResult.getColumnProjection.getColumnCount
-    val columns = Array.ofDim[Any](columnCount)
+    val fieldCount = rowResult.getColumnProjection.getColumnCount
+    // If this is an incremental backup, the last column is the is_deleted column.
+    val columnCount = if (incremental) fieldCount - 1 else fieldCount
+    val columns = Array.ofDim[Any](fieldCount)
     for (i <- 0 until columnCount) {
       columns(i) = rowResult.getObject(i)
     }
+    // If this is an incremental backup, translate the is_deleted column into
+    // the "change_type" column as the last field.
+    if (incremental) {
+      val rowAction = if (rowResult.isDeleted) {
+        RowAction.DELETE.getValue
+      } else {
+        // If the row is not deleted, we do not know if it was inserted or updated,
+        // so we use upsert.
+        RowAction.UPSERT.getValue
+      }
+      columns(fieldCount - 1) = rowAction
+    }
     Row.fromSeq(columns)
   }
 }
diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala
index b8a94b7..50517a9 100644
--- a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala
+++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala
@@ -16,82 +16,115 @@
 // under the License.
 package org.apache.kudu.backup
 
-import java.io.InputStreamReader
-import java.net.URLEncoder
-import java.nio.charset.StandardCharsets
-import java.nio.file.Path
-import java.nio.file.Paths
-
-import com.google.common.io.CharStreams
-import com.google.protobuf.util.JsonFormat
-import org.apache.hadoop.fs.{Path => HPath}
-
 import org.apache.kudu.backup.Backup.TableMetadataPB
 import org.apache.kudu.client.AlterTableOptions
+import org.apache.kudu.client.SessionConfiguration.FlushMode
 import org.apache.kudu.spark.kudu.KuduContext
-import org.apache.kudu.spark.kudu.KuduWriteOptions
+import org.apache.kudu.spark.kudu.RowConverter
+import org.apache.parquet.hadoop.ParquetInputFormat
 import org.apache.spark.sql.SparkSession
 import org.apache.yetus.audience.InterfaceAudience
 import org.apache.yetus.audience.InterfaceStability
 import org.slf4j.Logger
 import org.slf4j.LoggerFactory
 
+/**
+ * The main class for a Kudu restore spark job.
+ *
+ * Example Usage:
+ *   spark-submit --class org.apache.kudu.backup.KuduRestore kudu-backup2_2.11-*.jar \
+ *     --kuduMasterAddresses master1-host,master-2-host,master-3-host \
+ *     --rootPath hdfs:///kudu/backup/path \
+ *     my_kudu_table
+ */
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
 object KuduRestore {
   val log: Logger = LoggerFactory.getLogger(getClass)
 
-  def run(options: KuduRestoreOptions, session: SparkSession): Unit = {
+  def run(options: RestoreOptions, session: SparkSession): Unit = {
+    log.info(s"Restoring from path: ${options.rootPath}")
     val context =
-      new KuduContext(options.kuduMasterAddresses, session.sparkContext)
-    val path = options.path
-    log.info(s"Restoring from path: $path")
+      new KuduContext(
+        options.kuduMasterAddresses,
+        session.sparkContext
+      )
+    val io = new SessionIO(session, options)
 
     // TODO: Make parallel so each table isn't processed serially.
-    options.tables.foreach { t =>
-      val tableEncoded = URLEncoder.encode(t, "UTF-8")
-      val tablePath = Paths.get(path).resolve(tableEncoded)
-      val metadataPath = getMetadataPath(tableEncoded, options)
-      val metadata = readTableMetadata(metadataPath, session)
-      val restoreName = s"${metadata.getTableName}${options.tableSuffix}"
-      val table =
-        if (options.createTables) {
-          createTableRangePartitionByRangePartition(restoreName, metadata, context)
-        } else {
-          context.syncClient.openTable(restoreName)
+    options.tables.foreach { tableName =>
+      // TODO: Consider an option to enforce an exact toMS match.
+      val graph = io.readBackupGraph(tableName).filterByTime(options.timestampMs)
+      graph.restorePath.backups.foreach { backup =>
+        log.info(s"Restoring table $tableName from path: ${backup.path}")
+        val isFullRestore = backup.metadata.getFromMs == 0
+        val restoreName = s"${backup.metadata.getTableName}${options.tableSuffix}"
+        // TODO: Store the full metadata to compare/validate for each applied partial.
+
+        // On the full restore we may need to create the table.
+        if (isFullRestore) {
+          if (options.createTables) {
+            log.info(s"Creating restore table $restoreName")
+            createTableRangePartitionByRangePartition(restoreName, backup.metadata, context)
+          }
         }
+        val table = context.syncClient.openTable(restoreName)
+        val restoreSchema = io.dataSchema(table.getSchema)
+        val rowActionCol = restoreSchema.fields.last.name
 
-      // TODO: Restrict format option.
-      val df = session.sqlContext.read
-        .format(metadata.getDataFormat)
-        .load(tablePath.toString)
-      val writeOptions = new KuduWriteOptions(ignoreDuplicateRowErrors = false, ignoreNull = false)
-      // TODO: Use client directly for more control?
-      // (session timeout, consistency mode, flush interval, mutation buffer space)
+        // TODO: Restrict format option.
+        var data = session.sqlContext.read
+          .format(backup.metadata.getDataFormat)
+          .schema(restoreSchema)
+          .load(backup.path.toString)
+          // Default the row action column to a value of "UPSERT" so that the
+          // rows from a full backup, which don't have a row action, are upserted.
+          .na
+          .fill(RowAction.UPSERT.getValue, Seq(rowActionCol))
 
-      // Upsert so that Spark task retries do not fail.
-      context.upsertRows(df, restoreName, writeOptions)
+        // TODO: Expose more configuration options:
+        //   (session timeout, consistency mode, flush interval, mutation buffer space)
+        data.queryExecution.toRdd.foreachPartition { internalRows =>
+          val table = context.syncClient.openTable(restoreName)
+          val converter = new RowConverter(table.getSchema, restoreSchema, false)
+          val session = context.syncClient.newSession
+          session.setFlushMode(FlushMode.AUTO_FLUSH_BACKGROUND)
+          try {
+            for (internalRow <- internalRows) {
+              // Convert the InternalRows to Rows.
+              // This avoids any corruption as reported in SPARK-26880.
+              val row = converter.toRow(internalRow)
+              // Get the operation type based on the row action column.
+              // This will always be the last column in the row.
+              val rowActionValue = row.getByte(row.length - 1)
+              val rowAction = RowAction.fromValue(rowActionValue)
+              // Generate an operation based on the row action.
+              val operation = rowAction match {
+                case RowAction.UPSERT => table.newUpsert()
+                case RowAction.DELETE => table.newDelete()
+                case _ => throw new IllegalStateException(s"Unsupported RowAction: $rowAction")
+              }
+              // Convert the Spark row to a partial row and set it on the operation.
+              val partialRow = converter.toPartialRow(row)
+              operation.setRow(partialRow)
+              session.apply(operation)
+            }
+          } finally {
+            session.close()
+          }
+          // Fail the task if there are any errors.
+          val errorCount = session.getPendingErrors.getRowErrors.length
+          if (errorCount > 0) {
+            val errors =
+              session.getPendingErrors.getRowErrors.take(5).map(_.getErrorStatus).mkString
+            throw new RuntimeException(
+              s"failed to write $errorCount rows from DataFrame to Kudu; sample errors: $errors")
+          }
+        }
+      }
     }
   }
 
-  private def getMetadataPath(tableName: String, options: KuduRestoreOptions): Path = {
-    val rootPath =
-      if (options.metadataPath.isEmpty) options.path else options.metadataPath
-    Paths.get(rootPath).resolve(tableName)
-  }
-
-  private def readTableMetadata(path: Path, session: SparkSession): TableMetadataPB = {
-    val conf = session.sparkContext.hadoopConfiguration
-    val hPath = new HPath(path.resolve(TableMetadata.MetadataFileName).toString)
-    val fs = hPath.getFileSystem(conf)
-    val in = new InputStreamReader(fs.open(hPath), StandardCharsets.UTF_8)
-    val json = CharStreams.toString(in)
-    in.close()
-    val builder = TableMetadataPB.newBuilder()
-    JsonFormat.parser().merge(json, builder)
-    builder.build()
-  }
-
   // Kudu isn't good at creating a lot of tablets at once, and by default tables may only be created
   // with at most 60 tablets. Additional tablets can be added later by adding range partitions. So,
   // to restore tables with more tablets than that, we need to create the table piece-by-piece. This
@@ -121,7 +154,7 @@ object KuduRestore {
   }
 
   def main(args: Array[String]): Unit = {
-    val options = KuduRestoreOptions
+    val options = RestoreOptions
       .parse(args)
       .getOrElse(throw new IllegalArgumentException("could not parse the arguments"))
 
diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestoreOptions.scala b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestoreOptions.scala
deleted file mode 100644
index 66de017..0000000
--- a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestoreOptions.scala
+++ /dev/null
@@ -1,89 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package org.apache.kudu.backup
-
-import java.net.InetAddress
-
-import org.apache.yetus.audience.InterfaceAudience
-import org.apache.yetus.audience.InterfaceStability
-import scopt.OptionParser
-
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-case class KuduRestoreOptions(
-    tables: Seq[String],
-    path: String,
-    kuduMasterAddresses: String = InetAddress.getLocalHost.getCanonicalHostName,
-    tableSuffix: String = KuduRestoreOptions.DefaultTableSuffix,
-    createTables: Boolean = KuduRestoreOptions.DefaultCreateTables,
-    metadataPath: String = "")
-
-object KuduRestoreOptions {
-  val DefaultTableSuffix: String = "-restore"
-  val DefaultCreateTables: Boolean = true
-
-  // TODO: clean up usage output.
-  // TODO: timeout configurations.
-  private val parser: OptionParser[KuduRestoreOptions] =
-    new OptionParser[KuduRestoreOptions]("KuduRestore") {
-      opt[String]("path")
-        .action((v, o) => o.copy(path = v))
-        .text("The root path to the backup data. Accepts any Spark compatible path.")
-        .optional()
-
-      opt[String]("kuduMasterAddresses")
-        .action((v, o) => o.copy(kuduMasterAddresses = v))
-        .text("Comma-separated addresses of Kudu masters.")
-        .optional()
-
-      opt[Boolean]("createTables")
-        .action((v, o) => o.copy(createTables = v))
-        .text("true to create tables during restore, false if they already exist.")
-        .optional()
-
-      opt[String]("tableSuffix")
-        .action((v, o) => o.copy(tableSuffix = v))
-        .text("The suffix to add to the restored table names. Only used when createTables is true.")
-        .optional()
-
-      opt[String]("metadataPath")
-        .action((v, o) => o.copy(metadataPath = v))
-        .text(
-          "The root path to look for table metadata. This can be used to change the properties of " +
-            "the tables created during restore. By default the backup path is used. " +
-            "Only used when createTables is true.")
-        .optional()
-
-      arg[String]("<table>...")
-        .unbounded()
-        .action((v, o) => o.copy(tables = o.tables :+ v))
-        .text("A list of tables to be restored.")
-    }
-
-  /**
-   * Parses the passed arguments into Some[KuduRestoreOptions].
-   *
-   * If the arguments are bad, an error message is displayed
-   * and None is returned.
-   *
-   * @param args The arguments to parse.
-   * @return Some[KuduRestoreOptions] if parsing was successful, None if not.
-   */
-  def parse(args: Seq[String]): Option[KuduRestoreOptions] = {
-    parser.parse(args, KuduRestoreOptions(Seq(), null))
-  }
-}
diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/Options.scala b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/Options.scala
new file mode 100644
index 0000000..a57931f
--- /dev/null
+++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/Options.scala
@@ -0,0 +1,209 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.kudu.backup
+
+import java.net.InetAddress
+
+import org.apache.kudu.client.AsyncKuduClient
+import org.apache.yetus.audience.InterfaceAudience
+import org.apache.yetus.audience.InterfaceStability
+import scopt.OptionParser
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+trait CommonOptions {
+  val tables: Seq[String]
+  val rootPath: String
+  val kuduMasterAddresses: String
+}
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+case class BackupOptions(
+    tables: Seq[String],
+    rootPath: String,
+    kuduMasterAddresses: String = InetAddress.getLocalHost.getCanonicalHostName,
+    toMs: Long = System.currentTimeMillis(),
+    forceFull: Boolean = BackupOptions.DefaultForceFull,
+    fromMs: Long = 0,
+    format: String = BackupOptions.DefaultFormat,
+    scanBatchSize: Int = BackupOptions.DefaultScanBatchSize,
+    scanRequestTimeoutMs: Long = BackupOptions.DefaultScanRequestTimeoutMs,
+    scanPrefetching: Boolean = BackupOptions.DefaultScanPrefetching,
+    keepAlivePeriodMs: Long = BackupOptions.DefaultKeepAlivePeriodMs)
+    extends CommonOptions {
+
+  // If not forcing a full backup and fromMs is not zero, this is an incremental backup.
+  def isIncremental: Boolean = {
+    !forceFull && fromMs != 0
+  }
+}
+
+object BackupOptions {
+  val DefaultForceFull: Boolean = false
+  val DefaultFormat: String = "parquet"
+  val DefaultScanBatchSize: Int = 1024 * 1024 * 20 // 20 MiB
+  val DefaultScanRequestTimeoutMs: Long =
+    AsyncKuduClient.DEFAULT_OPERATION_TIMEOUT_MS // 30 seconds
+  // TODO: Add a test per KUDU-1260 and enable by default?
+  val DefaultScanPrefetching: Boolean = false
+  val DefaultKeepAlivePeriodMs: Long = AsyncKuduClient.DEFAULT_KEEP_ALIVE_PERIOD_MS
+
+  // TODO: clean up usage output.
+  // TODO: timeout configurations.
+  private val parser: OptionParser[BackupOptions] =
+    new OptionParser[BackupOptions]("KuduBackup") {
+      opt[String]("rootPath")
+        .action((v, o) => o.copy(rootPath = v))
+        .text("The root path to output backup data. Accepts any Spark compatible path.")
+        .optional()
+
+      opt[String]("kuduMasterAddresses")
+        .action((v, o) => o.copy(kuduMasterAddresses = v))
+        .text("Comma-separated addresses of Kudu masters.")
+        .optional()
+
+      opt[Boolean]("forceFull")
+        .action((v, o) => o.copy(forceFull = v))
+        .text("If true, this will be a full backup even if another full already exists.")
+        .optional()
+
+      opt[Long]("fromMs")
+        .action((v, o) => o.copy(fromMs = v))
+        .text(
+          "A UNIX timestamp in milliseconds that defines the start time of an incremental backup.")
+        .optional()
+
+      opt[Long]("timestampMs")
+        .action((v, o) => o.copy(toMs = v))
+        // TODO: Document the limitations based on cluster configuration
+        //  (ex: ancient history watermark).
+        .text("A UNIX timestamp in milliseconds since the epoch to execute scans at.")
+        .optional()
+
+      opt[String]("format")
+        .action((v, o) => o.copy(format = v))
+        .text("The file format to use when writing the data.")
+        .optional()
+
+      opt[Int]("scanBatchSize")
+        .action((v, o) => o.copy(scanBatchSize = v))
+        .text("The maximum number of bytes returned by the scanner, on each batch.")
+        .optional()
+
+      opt[Int]("scanRequestTimeoutMs")
+        .action((v, o) => o.copy(scanRequestTimeoutMs = v))
+        .text("Sets how long in milliseconds each scan request to a server can last.")
+        .optional()
+
+      opt[Unit]("scanPrefetching")
+        .action((_, o) => o.copy(scanPrefetching = true))
+        .text("An experimental flag to enable pre-fetching data.")
+        .optional()
+
+      opt[Long]("keepAlivePeriodMs")
+        .action((v, o) => o.copy(keepAlivePeriodMs = v))
+        .text(
+          "Sets the period at which to send keep-alive requests to the tablet server to ensure" +
+            " that scanners do not time out")
+        .optional()
+
+      arg[String]("<table>...")
+        .unbounded()
+        .action((v, o) => o.copy(tables = o.tables :+ v))
+        .text("A list of tables to be backed up.")
+    }
+
+  /**
+   * Parses the passed arguments into Some[BackupOptions].
+   *
+   * If the arguments are bad, an error message is displayed
+   * and None is returned.
+   *
+   * @param args The arguments to parse.
+   * @return Some[BackupOptions] if parsing was successful, None if not.
+   */
+  def parse(args: Seq[String]): Option[BackupOptions] = {
+    parser.parse(args, BackupOptions(Seq(), null))
+  }
+}
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+case class RestoreOptions(
+    tables: Seq[String],
+    rootPath: String,
+    kuduMasterAddresses: String = InetAddress.getLocalHost.getCanonicalHostName,
+    tableSuffix: String = RestoreOptions.DefaultTableSuffix,
+    createTables: Boolean = RestoreOptions.DefaultCreateTables,
+    timestampMs: Long = System.currentTimeMillis()
+) extends CommonOptions
+
+object RestoreOptions {
+  val DefaultTableSuffix: String = "-restore"
+  val DefaultCreateTables: Boolean = true
+
+  // TODO: clean up usage output.
+  // TODO: timeout configurations.
+  private val parser: OptionParser[RestoreOptions] =
+    new OptionParser[RestoreOptions]("KuduRestore") {
+      opt[String]("rootPath")
+        .action((v, o) => o.copy(rootPath = v))
+        .text("The root path to the backup data. Accepts any Spark compatible path.")
+        .optional()
+
+      opt[String]("kuduMasterAddresses")
+        .action((v, o) => o.copy(kuduMasterAddresses = v))
+        .text("Comma-separated addresses of Kudu masters.")
+        .optional()
+
+      opt[Boolean]("createTables")
+        .action((v, o) => o.copy(createTables = v))
+        .text("true to create tables during restore, false if they already exist.")
+        .optional()
+
+      opt[String]("tableSuffix")
+        .action((v, o) => o.copy(tableSuffix = v))
+        .text("The suffix to add to the restored table names. Only used when createTables is true.")
+        .optional()
+
+      opt[Long]("timestampMs")
+        .action((v, o) => o.copy(timestampMs = v))
+        .text(
+          "A UNIX timestamp in milliseconds that define the latest time to use when selecting " +
+            "restore candidates.")
+        .optional()
+
+      arg[String]("<table>...")
+        .unbounded()
+        .action((v, o) => o.copy(tables = o.tables :+ v))
+        .text("A list of tables to be restored.")
+    }
+
+  /**
+   * Parses the passed arguments into Some[RestoreOptions].
+   *
+   * If the arguments are bad, an error message is displayed
+   * and None is returned.
+   *
+   * @param args The arguments to parse.
+   * @return Some[RestoreOptions] if parsing was successful, None if not.
+   */
+  def parse(args: Seq[String]): Option[RestoreOptions] = {
+    parser.parse(args, RestoreOptions(Seq(), null))
+  }
+}
diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/RowAction.java b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/RowAction.java
new file mode 100644
index 0000000..54bf7ae
--- /dev/null
+++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/RowAction.java
@@ -0,0 +1,69 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.kudu.backup;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import java.util.Map;
+
+/**
+ * A RowAction is used to represent the action associated with a backed up row.
+ *
+ * Currently UPSERT is the default action, while rows with the IS_DELETED virtual
+ * column have an action of DELETE. This value is serialized as a byte in the
+ * output data format to be as space efficient as possible.
+ *
+ * Given there are currently only 2 options, IS_DELETED or not, we could have used an
+ * IS_DELETED boolean column in the output format, but this RowAction allows for greater
+ * format flexibility to support INSERT or UPDATE in the future if we have full fidelity
+ * and sparse row backups.
+ *
+ * See {@link RowIterator} for backup side usage and {@link KuduRestore} for restore
+ * side usage.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public enum RowAction {
+
+  UPSERT((byte) 0),
+  DELETE((byte) 1);
+
+  /** The byte value used to represent this RowAction */
+  private final byte value;
+
+  private static Map<Byte, RowAction> byteRowAction;
+  static {
+    byteRowAction = new ImmutableMap.Builder<Byte, RowAction>()
+        .put(UPSERT.getValue(), UPSERT)
+        .put(DELETE.getValue(), DELETE)
+        .build();
+  }
+
+  RowAction(byte value) {
+    this.value = value;
+  }
+
+  public byte getValue() {
+    return value;
+  }
+
+  public static RowAction fromValue(Byte value) {
+    return byteRowAction.get(value);
+  }
+}
diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/SessionIO.scala b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/SessionIO.scala
new file mode 100644
index 0000000..8977174
--- /dev/null
+++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/SessionIO.scala
@@ -0,0 +1,193 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.kudu.backup
+
+import java.io.InputStreamReader
+import java.net.URLEncoder
+import java.nio.charset.StandardCharsets
+import java.nio.file.Path
+import java.nio.file.Paths
+
+import com.google.common.io.CharStreams
+import com.google.protobuf.util.JsonFormat
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.FileSystem
+import org.apache.hadoop.fs.{Path => HPath}
+import org.apache.kudu.Schema
+import org.apache.kudu.backup.Backup.TableMetadataPB
+import org.apache.kudu.backup.SessionIO._
+import org.apache.kudu.spark.kudu.SparkUtil
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.types.ByteType
+import org.apache.spark.sql.types.StructField
+import org.apache.spark.sql.types.StructType
+import org.apache.yetus.audience.InterfaceAudience
+import org.apache.yetus.audience.InterfaceStability
+import org.slf4j.Logger
+import org.slf4j.LoggerFactory
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+/**
+ * A class to encapsulate and centralize the logic for data layout and IO
+ * of metadata and data of the backup and restore jobs.
+ *
+ * The default backup directory structure is:
+ * /<rootPath>/<tableName>/<backup-id>/
+ *   .kudu-metadata.json
+ *   part-*.parquet
+ *
+ * In the above path the `/<rootPath>` can be used to distinguish separate backup groups.
+ * The `<backup-id>` is currently the `toMs` time for the job.
+ *
+ * TODO: Should the tableName contain the table id?
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+class SessionIO(val session: SparkSession, options: CommonOptions) {
+  val log: Logger = LoggerFactory.getLogger(getClass)
+
+  val conf: Configuration = session.sparkContext.hadoopConfiguration
+  val rootHPath: HPath = new HPath(options.rootPath)
+  val fs: FileSystem = rootHPath.getFileSystem(conf)
+
+  /**
+   * Returns the Spark schema for backup data based on the Kudu Schema.
+   * Additionally handles adding the RowAction column for incremental backup/restore.
+   * @return the Spark schema for backup data.
+   */
+  def dataSchema(schema: Schema, includeRowAction: Boolean = true): StructType = {
+    var fields = SparkUtil.sparkSchema(schema).fields
+    if (includeRowAction) {
+      val changeTypeField = generateRowActionColumn(schema)
+      fields = fields ++ Seq(changeTypeField)
+    }
+    StructType(fields)
+  }
+
+  /**
+   * Generates a RowAction column and handles column name collisions.
+   * The column name can vary because it's accessed positionally.
+   */
+  private def generateRowActionColumn(schema: Schema): StructField = {
+    var columnName = "backup_row_action"
+    // If the column already exists, we need to pick an alternate column name.
+    while (schema.hasColumn(columnName)) {
+      columnName += "_"
+    }
+    StructField(columnName, ByteType)
+  }
+
+  /**
+   * @return the path to the table directory.
+   */
+  def tablePath(tableName: String): Path = {
+    Paths.get(options.rootPath).resolve(URLEncoder.encode(tableName, "UTF-8"))
+  }
+
+  /**
+   * @return the backup path for a table and time.
+   */
+  def backupPath(tableName: String, timestampMs: Long): Path = {
+    tablePath(tableName).resolve(timestampMs.toString)
+  }
+
+  /**
+   * @return the path to the metadata file within a backup path.
+   */
+  def backupMetadataPath(backupPath: Path): Path = {
+    backupPath.resolve(MetadataFileName)
+  }
+
+  /**
+   * Serializes the table metadata to Json and writes it to the metadata path.
+   * @param tableMetadata the metadata to serialize.
+   * @param metadataPath the path to write the metadata file to.
+   */
+  def writeTableMetadata(tableMetadata: TableMetadataPB, metadataPath: Path): Unit = {
+    log.error(s"Writing metadata to $metadataPath")
+    val hPath = new HPath(metadataPath.toString)
+    val out = fs.create(hPath, /* overwrite= */ false)
+    val json = JsonFormat.printer().print(tableMetadata)
+    out.write(json.getBytes(StandardCharsets.UTF_8))
+    out.flush()
+    out.close()
+  }
+
+  /**
+   * Reads an entire backup graph by reading all of the metadata files for the
+   * given table. See [[BackupGraph]] for more details.
+   * @param tableName the table to read a backup graph for.
+   * @return the full backup graph.
+   */
+  def readBackupGraph(tableName: String): BackupGraph = {
+    val backups = readTableBackups(tableName)
+    val graph = new BackupGraph()
+    backups.foreach {
+      case (path, metadata) =>
+        graph.addBackup(BackupNode(path, metadata))
+    }
+    graph
+  }
+
+  /**
+   * Reads and returns all of the metadata for a given table.
+   * @param tableName the table to read the metadata for.
+   * @return a sequence of all the paths and metadata.
+   */
+  // TODO: Also use table-id to find backups.
+  private def readTableBackups(tableName: String): Seq[(Path, TableMetadataPB)] = {
+    val hPath = new HPath(tablePath(tableName).toString)
+    val results = new mutable.ListBuffer[(Path, TableMetadataPB)]()
+    if (fs.exists(hPath)) {
+      val iter = fs.listLocatedStatus(hPath)
+      while (iter.hasNext) {
+        val file = iter.next()
+        if (file.isDirectory) {
+          val metadataHPath = new HPath(file.getPath, MetadataFileName)
+          if (fs.exists(metadataHPath)) {
+            val metadata = readTableMetadata(Paths.get(metadataHPath.toString))
+            results += ((Paths.get(file.getPath.toString), metadata))
+          }
+        }
+      }
+    }
+    log.error(s"Found ${results.size} paths in ${hPath.toString}")
+    results.toList
+  }
+
+  /**
+   * Reads and deserializes the metadata file at the given path.
+   * @param metadataPath the path to the metadata file.
+   * @return the deserialized table metadata.
+   */
+  private def readTableMetadata(metadataPath: Path): TableMetadataPB = {
+    val hPath = new HPath(metadataPath.toString)
+    val in = new InputStreamReader(fs.open(hPath), StandardCharsets.UTF_8)
+    val json = CharStreams.toString(in)
+    in.close()
+    val builder = TableMetadataPB.newBuilder()
+    JsonFormat.parser().merge(json, builder)
+    builder.build()
+  }
+}
+
+object SessionIO {
+  // The name of the metadata file within a backup directory.
+  val MetadataFileName = ".kudu-metadata.json"
+}
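
A minimal sketch of what `dataSchema` above yields for a simple two-column
table; `io` and `kuduTable` are placeholder names for a constructed SessionIO
and an opened KuduTable, and the printed form is abbreviated:

    // Assumes a Kudu table with columns (key INT32, val STRING).
    // The RowAction column is appended last as a ByteType field and is read
    // positionally by the backup/restore jobs, so its exact name only
    // matters for collision avoidance.
    val backupSchema = io.dataSchema(kuduTable.getSchema)
    // roughly: StructType(
    //   StructField(key, IntegerType, ...),
    //   StructField(val, StringType, ...),
    //   StructField(backup_row_action, ByteType, ...))
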
diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/TableMetadata.scala b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/TableMetadata.scala
index 64845b4..0d2c315 100644
--- a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/TableMetadata.scala
+++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/TableMetadata.scala
@@ -1,19 +1,19 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
 package org.apache.kudu.backup
 
 import java.math.BigDecimal
@@ -25,7 +25,6 @@ import org.apache.kudu.ColumnSchema.ColumnSchemaBuilder
 import org.apache.kudu.ColumnSchema.CompressionAlgorithm
 import org.apache.kudu.ColumnSchema.Encoding
 import org.apache.kudu.ColumnTypeAttributes.ColumnTypeAttributesBuilder
-import org.apache.kudu.client.Bytes
 import org.apache.kudu.client.CreateTableOptions
 import org.apache.kudu.client.KuduTable
 import org.apache.kudu.client.PartialRow
@@ -42,8 +41,9 @@ import scala.collection.JavaConverters._
 object TableMetadata {
 
   val MetadataFileName = ".kudu-metadata.json"
+  val MetadataVersion = 1
 
-  def getTableMetadata(table: KuduTable, options: KuduBackupOptions): TableMetadataPB = {
+  def getTableMetadata(table: KuduTable, options: BackupOptions): TableMetadataPB = {
     val columns = table.getSchema.getColumns.asScala.map { col =>
       val builder = ColumnMetadataPB
         .newBuilder()
@@ -65,8 +65,9 @@ object TableMetadata {
 
     TableMetadataPB
       .newBuilder()
-      .setFromMs(0) // TODO: fromMs is always zero until we support incremental backups
-      .setToMs(options.timestampMs)
+      .setVersion(MetadataVersion)
+      .setFromMs(options.fromMs)
+      .setToMs(options.toMs)
       .setDataFormat(options.format)
       .setTableName(table.getName)
       .addAllColumns(columns.asJava)
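
The new version, fromMs, and toMs fields are what allow backups to be chained.
A hedged sketch of the implied relationship, using only the generated
TableMetadataPB getters; the helper names here are made up for illustration:

    import org.apache.kudu.backup.Backup.TableMetadataPB

    // A full backup starts from time zero, while an incremental backup's
    // fromMs lines up with the toMs of the backup it builds on.
    def isFullBackup(m: TableMetadataPB): Boolean = m.getFromMs == 0
    def chainsFrom(child: TableMetadataPB, parent: TableMetadataPB): Boolean =
      child.getFromMs == parent.getToMs
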
diff --git a/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestBackupGraph.scala b/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestBackupGraph.scala
new file mode 100644
index 0000000..ca59921
--- /dev/null
+++ b/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestBackupGraph.scala
@@ -0,0 +1,144 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.kudu.backup
+
+import org.apache.kudu.client.KuduTable
+import org.apache.kudu.spark.kudu._
+import org.junit.Assert._
+import org.junit.Test
+import org.slf4j.Logger
+import org.slf4j.LoggerFactory
+
+class TestBackupGraph extends KuduTestSuite {
+  val log: Logger = LoggerFactory.getLogger(getClass)
+
+  @Test
+  def testSimpleBackupGraph() {
+    val graph = new BackupGraph()
+    val full = createBackupVertex(table, 0, 1)
+    graph.addBackup(full)
+
+    // Validate a graph with only a single full backup.
+    assertEquals(1, graph.fullBackups.size)
+    assertEquals(1, graph.backupPaths.size)
+    val fullPath = graph.backupPaths.head
+    assertEquals("0", fullPath.pathString)
+
+    // Validate a graph with a couple incremental backups.
+    val inc1 = createBackupVertex(table, 1, 2)
+    graph.addBackup(inc1)
+    val inc2 = createBackupVertex(table, 2, 3)
+    graph.addBackup(inc2)
+    assertEquals(1, graph.fullBackups.size)
+    assertEquals(1, graph.backupPaths.size)
+
+    val incPath = graph.backupPaths.head
+    assertEquals("0 -> 1 -> 2", incPath.pathString)
+  }
+
+  @Test
+  def testForkingBackupGraph() {
+    val graph = new BackupGraph()
+    val full = createBackupVertex(table, 0, 1)
+    graph.addBackup(full)
+    // Duplicate fromMs of 1 creates a fork in the graph.
+    val inc1 = createBackupVertex(table, 1, 2)
+    graph.addBackup(inc1)
+    val inc2 = createBackupVertex(table, 1, 4)
+    graph.addBackup(inc2)
+    val inc3 = createBackupVertex(table, 2, 3)
+    graph.addBackup(inc3)
+    val inc4 = createBackupVertex(table, 4, 5)
+    graph.addBackup(inc4)
+
+    assertEquals(1, graph.fullBackups.size)
+    assertEquals(2, graph.backupPaths.size)
+
+    val path1 = graph.backupPaths.head
+    assertEquals("0 -> 1 -> 2", path1.pathString)
+
+    val path2 = graph.backupPaths.last
+    assertEquals("0 -> 1 -> 4", path2.pathString)
+
+    // Ensure the most recent incremental is used as the backup base and restore path endpoint.
+    assertEquals(5, graph.backupBase.metadata.getToMs)
+    assertEquals(5, graph.restorePath.toMs)
+  }
+
+  @Test
+  def testMultiFullBackupGraph() {
+    val graph = new BackupGraph()
+    val full1 = createBackupVertex(table, 0, 1)
+    graph.addBackup(full1)
+    val inc1 = createBackupVertex(table, 1, 2)
+    graph.addBackup(inc1)
+    val inc2 = createBackupVertex(table, 2, 4)
+    graph.addBackup(inc2)
+
+    // Add a second full backup.
+    val full2 = createBackupVertex(table, 0, 4)
+    graph.addBackup(full2)
+    val inc3 = createBackupVertex(table, 4, 5)
+    graph.addBackup(inc3)
+
+    assertEquals(2, graph.fullBackups.size)
+    assertEquals(2, graph.backupPaths.size)
+    val path1 = graph.backupPaths.head
+    assertEquals("0 -> 1 -> 2 -> 4", path1.pathString)
+
+    val path2 = graph.backupPaths.last
+    assertEquals("0 -> 4", path2.pathString)
+
+    // Ensure the most recent incremental is used as the backup base and restore path endpoint.
+    assertEquals(5, graph.backupBase.metadata.getToMs)
+    assertEquals(5, graph.restorePath.toMs)
+  }
+
+  @Test
+  def testFilterByTime() {
+    val graph = new BackupGraph()
+    val full1 = createBackupVertex(table, 0, 1)
+    graph.addBackup(full1)
+    val inc1 = createBackupVertex(table, 1, 2)
+    graph.addBackup(inc1)
+    val inc2 = createBackupVertex(table, 2, 4)
+    graph.addBackup(inc2)
+
+    // Add a second full backup.
+    val full2 = createBackupVertex(table, 0, 4)
+    graph.addBackup(full2)
+    val inc3 = createBackupVertex(table, 4, 5)
+    graph.addBackup(inc3)
+
+    val newGraph = graph.filterByTime(2)
+
+    assertEquals(1, newGraph.fullBackups.size)
+    assertEquals(1, newGraph.backupPaths.size)
+  }
+
+  private def createBackupVertex(table: KuduTable, fromMs: Long, toMs: Long): BackupNode = {
+    val options = new BackupOptions(
+      tables = Seq(table.getName),
+      rootPath = "foo/path",
+      "fooAddresses",
+      fromMs = fromMs,
+      toMs = toMs
+    )
+    val metadata = TableMetadata.getTableMetadata(table, options)
+    BackupNode(null, metadata)
+  }
+}
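
A hedged sketch of how a restore job might resolve which backups to replay,
using only the APIs exercised above (readBackupGraph, filterByTime,
restorePath); `session`, `options`, the table name, and the timestamp are
placeholders, and the actual KuduRestore entry point may differ:

    // Read every backup's metadata for the table, trim the graph to backups
    // taken at or before a target time, and take the resulting restore path
    // (one full backup followed by its chained incrementals).
    val io = new SessionIO(session, options)
    val graph = io.readBackupGraph("my_table").filterByTime(1555450000000L)
    val restorePath = graph.restorePath
    println(s"Restoring backups up to toMs=${restorePath.toMs}")
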
diff --git a/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala b/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala
index 3180e6b..a707702 100644
--- a/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala
+++ b/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala
@@ -14,25 +14,21 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-
 package org.apache.kudu.backup
 
 import java.nio.file.Files
+import java.nio.file.Path
 import java.util
 
 import com.google.common.base.Objects
 import org.apache.commons.io.FileUtils
-
 import org.apache.kudu.client.PartitionSchema.HashBucketSchema
 import org.apache.kudu.client._
 import org.apache.kudu.ColumnSchema
 import org.apache.kudu.Schema
-import org.apache.kudu.Type
 import org.apache.kudu.spark.kudu._
 import org.apache.kudu.test.RandomUtils
 import org.apache.kudu.util.DataGenerator.DataGeneratorBuilder
-import org.apache.kudu.util.DataGenerator
-import org.apache.kudu.util.DecimalUtil
 import org.apache.kudu.util.HybridTimeUtil
 import org.apache.kudu.util.SchemaGenerator.SchemaGeneratorBuilder
 import org.junit.Assert._
@@ -40,8 +36,8 @@ import org.junit.Before
 import org.junit.Test
 import org.slf4j.Logger
 import org.slf4j.LoggerFactory
+
 import scala.collection.JavaConverters._
-import scala.util.Random
 
 class TestKuduBackup extends KuduTestSuite {
   val log: Logger = LoggerFactory.getLogger(getClass)
@@ -57,7 +53,7 @@ class TestKuduBackup extends KuduTestSuite {
   def testSimpleBackupAndRestore() {
     insertRows(table, 100) // Insert data into the default test table.
 
-    backupAndRestore(tableName)
+    backupAndRestore(Seq(tableName))
 
     val rdd =
       kuduContext.kuduRDD(ss.sparkContext, s"$tableName-restore", List("key"))
@@ -70,6 +66,28 @@ class TestKuduBackup extends KuduTestSuite {
     assertTrue(partitionSchemasMatch(tA.getPartitionSchema, tB.getPartitionSchema))
   }
 
+  // TODO: Add a thorough randomized test for full and incremental backup/restore.
+  @Test
+  def testIncrementalBackupAndRestore() {
+    insertRows(table, 100) // Insert data into the default test table.
+
+    val rootDir = Files.createTempDirectory("backup")
+    doBackup(rootDir, Seq(tableName)) // Full backup.
+    insertRows(table, 100, 100) // Insert more data.
+    doBackup(rootDir, Seq(tableName), incremental = true) // Incremental backup.
+    // Delete rows that span the full and incremental backup.
+    Range(50, 150).foreach { i =>
+      deleteRow(i)
+    }
+    doBackup(rootDir, Seq(tableName), incremental = true) // Incremental backup.
+    doRestore(rootDir, Seq(tableName)) // Restore all the backups.
+    FileUtils.deleteDirectory(rootDir.toFile)
+
+    val rdd =
+      kuduContext.kuduRDD(ss.sparkContext, s"$tableName-restore", List("key"))
+    assert(rdd.collect.length == 100)
+  }
+
   @Test
   def testSimpleBackupAndRestoreWithSpecialCharacters() {
     // Use an Impala-style table name to verify url encoding/decoding of the table name works.
@@ -81,7 +99,7 @@ class TestKuduBackup extends KuduTestSuite {
 
     kuduClient.createTable(impalaTableName, simpleSchema, tableOptions)
 
-    backupAndRestore(impalaTableName)
+    backupAndRestore(Seq(impalaTableName))
 
     val rdd = kuduContext
       .kuduRDD(ss.sparkContext, s"$impalaTableName-restore", List("key"))
@@ -95,7 +113,7 @@ class TestKuduBackup extends KuduTestSuite {
     val tableName = table.getName
     loadRandomData(table)
 
-    backupAndRestore(tableName)
+    backupAndRestore(Seq(tableName))
 
     val backupRows = kuduContext.kuduRDD(ss.sparkContext, s"$tableName").collect
     val restoreRows =
@@ -121,7 +139,7 @@ class TestKuduBackup extends KuduTestSuite {
     insertRows(table1, numRows)
     insertRows(table2, numRows)
 
-    backupAndRestore(table1Name, table2Name)
+    backupAndRestore(Seq(table1Name, table2Name))
 
     val rdd1 =
       kuduContext.kuduRDD(ss.sparkContext, s"$table1Name-restore", List("key"))
@@ -166,7 +184,7 @@ class TestKuduBackup extends KuduTestSuite {
     insertRows(table, kNumPartitions)
 
     // Now backup and restore the table.
-    backupAndRestore(tableName)
+    backupAndRestore(Seq(tableName))
   }
 
   @Test
@@ -180,7 +198,7 @@ class TestKuduBackup extends KuduTestSuite {
 
     insertRows(table1, 100)
 
-    backupAndRestore(tableName)
+    backupAndRestore(Seq(tableName))
   }
 
   // TODO: Move to a Schema equals/equivalent method.
@@ -269,9 +287,14 @@ class TestKuduBackup extends KuduTestSuite {
     }
   }
 
-  def backupAndRestore(tableNames: String*): Unit = {
-    val dir = Files.createTempDirectory("backup")
-    val path = dir.toUri.toString
+  def backupAndRestore(tableNames: Seq[String]): Unit = {
+    val rootDir = Files.createTempDirectory("backup")
+    doBackup(rootDir, tableNames)
+    doRestore(rootDir, tableNames)
+    FileUtils.deleteDirectory(rootDir.toFile)
+  }
+
+  def doBackup(rootDir: Path, tableNames: Seq[String], incremental: Boolean = false): Unit = {
     val nowMs = System.currentTimeMillis()
 
     // Log the timestamps to simplify flaky debugging.
@@ -286,14 +309,19 @@ class TestKuduBackup extends KuduTestSuite {
     // granularity. This means if the test runs fast enough that data is inserted with the same
     // millisecond value as nowMs (after truncating the micros) the records inserted in the
     // microseconds after truncation could be unread.
-    val backupOptions =
-      new KuduBackupOptions(tableNames, path, harness.getMasterAddressesAsString, nowMs + 1)
+    val backupOptions = new BackupOptions(
+      tableNames,
+      rootDir.toUri.toString,
+      harness.getMasterAddressesAsString,
+      nowMs + 1,
+      incremental
+    )
     KuduBackup.run(backupOptions, ss)
+  }
 
+  def doRestore(rootDir: Path, tableNames: Seq[String]): Unit = {
     val restoreOptions =
-      new KuduRestoreOptions(tableNames, path, harness.getMasterAddressesAsString)
+      new RestoreOptions(tableNames, rootDir.toUri.toString, harness.getMasterAddressesAsString)
     KuduRestore.run(restoreOptions, ss)
-
-    FileUtils.deleteDirectory(dir.toFile)
   }
 }
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/Schema.java b/java/kudu-client/src/main/java/org/apache/kudu/Schema.java
index f5d746e..0be52aa 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/Schema.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/Schema.java
@@ -198,6 +198,15 @@ public class Schema {
   }
 
   /**
+   * Returns true if the column exists.
+   * @param columnName column to search for
+   * @return true if the column exists
+   */
+  public boolean hasColumn(String columnName) {
+    return this.columnsByName.containsKey(columnName);
+  }
+
+  /**
    * Get the index for the provided column name.
    * @param columnName column to search for
    * @return an index in the schema
@@ -307,14 +316,22 @@ public class Schema {
   }
 
   /**
+   * @return true if the schema has the IS_DELETED virtual column
+   */
+  @InterfaceAudience.Private
+  @InterfaceStability.Unstable
+  public boolean hasIsDeleted() {
+    return isDeletedIndex != NO_IS_DELETED_INDEX;
+  }
+
+  /**
    * @return the index of the IS_DELETED virtual column
    * @throws IllegalStateException if no IS_DELETED virtual column exists
    */
   @InterfaceAudience.Private
   @InterfaceStability.Unstable
   public int getIsDeletedIndex() {
-    Preconditions.checkState(isDeletedIndex != NO_IS_DELETED_INDEX,
-        "Schema doesn't have an IS_DELETED columns");
+    Preconditions.checkState(hasIsDeleted(), "Schema doesn't have an IS_DELETED column");
     return isDeletedIndex;
   }
 }
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
index 9fa42c9..3ecc67e 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
@@ -350,16 +350,9 @@ public final class AsyncKuduScanner {
    */
   private static ColumnSchema generateIsDeletedColumn(Schema schema) {
     String columnName = DEFAULT_IS_DELETED_COL_NAME;
-    boolean collision = true;
-    while (collision) {
-      try {
-        // If getColumnIndex doesn't throw an IllegalArgumentException then
-        // the column already exists and we need to pick an alternate IS_DELETED column name.
-        schema.getColumnIndex(columnName);
-        columnName += "_";
-      } catch (IllegalArgumentException ex) {
-        collision = false;
-      }
+    // If the column already exists, pick an alternate column name.
+    while (schema.hasColumn(columnName)) {
+      columnName += "_";
     }
     return new ColumnSchema.ColumnSchemaBuilder(columnName, Type.BOOL)
             .wireType(Common.DataType.IS_DELETED)
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/RowResult.java b/java/kudu-client/src/main/java/org/apache/kudu/client/RowResult.java
index 7f80670..0d8d502 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/RowResult.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/RowResult.java
@@ -597,6 +597,15 @@ public class RowResult {
   }
 
   /**
+   * @return true if the RowResult has the IS_DELETED virtual column
+   */
+  @InterfaceAudience.Private
+  @InterfaceStability.Unstable
+  public boolean hasIsDeleted() {
+    return schema.hasIsDeleted();
+  }
+
+  /**
    * @return the value of the IS_DELETED virtual column
    * @throws IllegalStateException if no IS_DELETED virtual column exists
    */
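
A hedged Scala sketch of how a scan consumer might use the new accessors,
guarding getIsDeleted() with hasIsDeleted() so the same loop handles scans
with and without the virtual column; `scanner` is a placeholder KuduScanner:

    while (scanner.hasMoreRows) {
      val batch = scanner.nextRows()
      while (batch.hasNext) {
        val row = batch.next()
        // Only diff-style scans expose IS_DELETED; treat absence as "not deleted".
        val deleted = row.hasIsDeleted && row.getIsDeleted
        // ... apply the row as an upsert, or as a delete when `deleted` is true ...
      }
    }
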
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/RowConverter.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/RowConverter.scala
index e9f16e4..6df627d 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/RowConverter.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/RowConverter.scala
@@ -34,10 +34,14 @@ import org.apache.yetus.audience.InterfaceStability
 class RowConverter(kuduSchema: Schema, schema: StructType, ignoreNull: Boolean) {
 
   private val typeConverter = CatalystTypeConverters.createToScalaConverter(schema)
-  private val indices: Array[(Int, Int)] = schema.fields.zipWithIndex.map({
+  private val indices: Array[(Int, Int)] = schema.fields.zipWithIndex.flatMap {
     case (field, sparkIdx) =>
-      sparkIdx -> kuduSchema.getColumnIndex(field.name)
-  })
+      // Support Spark schemas that have more columns than the Kudu table by
+      // skipping Spark fields that have no matching Kudu column.
+      if (kuduSchema.hasColumn(field.name)) {
+        Some(sparkIdx -> kuduSchema.getColumnIndex(field.name))
+      } else None
+  }
 
   /**
    * Converts a Spark internalRow to a Spark Row.
diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduTestSuite.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduTestSuite.scala
index 2f13ed2..4e139e2 100644
--- a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduTestSuite.scala
+++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduTestSuite.scala
@@ -154,10 +154,11 @@ trait KuduTestSuite extends JUnitSuite {
 
   def insertRows(
       targetTable: KuduTable,
-      rowCount: Integer): IndexedSeq[(Int, Int, String, Long)] = {
+      rowCount: Int,
+      startIndex: Int = 0): IndexedSeq[(Int, Int, String, Long)] = {
     val kuduSession = kuduClient.newSession()
 
-    val rows = Range(0, rowCount).map { i =>
+    val rows = Range(startIndex, rowCount + startIndex).map { i =>
       val insert = targetTable.newInsert
       val row = insert.getRow
       row.addInt(0, i)