You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by gr...@apache.org on 2019/04/23 19:38:46 UTC

[kudu] branch master updated: [backup] Cleanup backup/restore TODO comments

This is an automated email from the ASF dual-hosted git repository.

granthenke pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new a226554  [backup] Cleanup backup/restore TODO comments
a226554 is described below

commit a2265547f671f5efc5a36b8a140aa47aeb1a8be2
Author: Grant Henke <gr...@apache.org>
AuthorDate: Tue Apr 23 10:57:51 2019 -0500

    [backup] Cleanup backup/restore TODO comments
    
    This patch reviews the open TODO items in the Kudu
    backup and restore code. It either addresses them,
    removes them, or opens a jira to track them.
    
    The fixed TODO items are:
    - Support backups using follower replicas
    - Reviewed backup command line options
    - Hides “experimental” options
    
    Change-Id: Ia03dd12b22988763640ff19f331ebe75e3cb5d6f
    Reviewed-on: http://gerrit.cloudera.org:8080/13085
    Tested-by: Kudu Jenkins
    Reviewed-by: Adar Dembo <ad...@cloudera.com>
---
 java/kudu-backup/src/main/protobuf/backup.proto    |  2 +-
 .../scala/org/apache/kudu/backup/BackupGraph.scala |  1 -
 .../scala/org/apache/kudu/backup/KuduBackup.scala  |  9 +-
 .../org/apache/kudu/backup/KuduBackupRDD.scala     | 24 +++---
 .../scala/org/apache/kudu/backup/KuduRestore.scala | 18 ++--
 .../scala/org/apache/kudu/backup/Options.scala     | 97 ++++++++++++++--------
 .../scala/org/apache/kudu/backup/SessionIO.scala   |  2 +-
 .../org/apache/kudu/backup/TestKuduBackup.scala    |  6 +-
 .../scala/org/apache/kudu/backup/TestOptions.scala | 63 ++++++++++++++
 .../scala/org/apache/kudu/spark/kudu/KuduRDD.scala | 11 +--
 10 files changed, 152 insertions(+), 81 deletions(-)

diff --git a/java/kudu-backup/src/main/protobuf/backup.proto b/java/kudu-backup/src/main/protobuf/backup.proto
index 6de3b3e..da2c794 100644
--- a/java/kudu-backup/src/main/protobuf/backup.proto
+++ b/java/kudu-backup/src/main/protobuf/backup.proto
@@ -35,7 +35,7 @@ message ColumnTypeAttributesMetadataPB {
 
 // Maps to the ColumnSchema class.
 // The fields are effectively 1 to 1 mappings of those in ColumnSchema.
-// TODO: How do we handle column additions?
+// TODO (KUDU-2788): How do we handle column additions?
 message ColumnMetadataPB {
   string name = 1;
   string type = 2;
diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/BackupGraph.scala b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/BackupGraph.scala
index 21c7ff9..1a6e92a 100644
--- a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/BackupGraph.scala
+++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/BackupGraph.scala
@@ -166,7 +166,6 @@ case class BackupNode(path: Path, metadata: TableMetadataPB) {
    */
   def weight: Int = {
     // Full backups have a weight of 0 and partial backups have a weight of 1.
-    // TODO: Use the size of a partial backup to contribute to weight.
     if (metadata.getFromMs == 0) 0 else 1
   }
 }
diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackup.scala b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackup.scala
index 445bef0..a7c277f 100644
--- a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackup.scala
+++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackup.scala
@@ -26,12 +26,6 @@ import org.slf4j.LoggerFactory
 
 /**
  * The main class for a Kudu backup spark job.
- *
- * Example Usage:
- *   spark-submit --class org.apache.kudu.backup.KuduBackup kudu-backup2_2.11-*.jar \
- *     --kuduMasterAddresses master1-host,master-2-host,master-3-host \
- *     --rootPath hdfs:///kudu/backup/path \
- *     my_kudu_table
  */
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
@@ -46,7 +40,8 @@ object KuduBackup {
         session.sparkContext
       )
     val io = new SessionIO(session, options)
-    // TODO: Make parallel so each table isn't process serially?
+    // TODO (KUDU-2786): Make parallel so each table isn't process serially.
+    // TODO (KUDU-2787): Handle single table failures.
     options.tables.foreach { tableName =>
       var tableOptions = options.copy() // Copy the options so we can modify them for the table.
       val table = context.syncClient.openTable(tableName)
diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackupRDD.scala b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackupRDD.scala
index 544e7b2..80abb6d 100644
--- a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackupRDD.scala
+++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackupRDD.scala
@@ -45,15 +45,13 @@ class KuduBackupRDD private[kudu] (
   // Defined here because the options are transient.
   val incremental: Boolean = options.isIncremental
 
-  // TODO: Split large tablets into smaller scan tokens?
+  // TODO (KUDU-2785): Split large tablets into smaller scan tokens?
   override protected def getPartitions: Array[Partition] = {
     val client = kuduContext.syncClient
 
     val builder = client
       .newScanTokenBuilder(table)
       .cacheBlocks(false)
-      // TODO: Use fault tolerant scans to get mostly ordered results when KUDU-2466 is fixed.
-      // .setFaultTolerant(true)
       .replicaSelection(ReplicaSelection.CLOSEST_REPLICA)
       .readMode(ReadMode.READ_AT_SNAPSHOT)
       .batchSizeBytes(options.scanBatchSize)
@@ -79,16 +77,20 @@ class KuduBackupRDD private[kudu] (
     val tokens = builder.build()
     tokens.asScala.zipWithIndex.map {
       case (token, index) =>
-        // TODO: Support backups from any replica or followers only.
-        // Always run on the leader for data locality.
-        val leaderLocation = token.getTablet.getLeaderReplica.getRpcHost
-        KuduBackupPartition(index, token.serialize(), Array(leaderLocation))
+        // Only list the leader replica as the preferred location if
+        // replica selection policy is leader only, to take advantage
+        // of scan locality.
+        val locations: Array[String] = {
+          if (options.scanLeaderOnly) {
+            Array(token.getTablet.getLeaderReplica.getRpcHost)
+          } else {
+            token.getTablet.getReplicas.asScala.map(_.getRpcHost).toArray
+          }
+        }
+        KuduBackupPartition(index, token.serialize(), locations)
     }.toArray
   }
 
-  // TODO: Do we need a custom spark partitioner for any guarantees?
-  // override val partitioner = None
-
   override def compute(part: Partition, taskContext: TaskContext): Iterator[Row] = {
     val client: KuduClient = kuduContext.syncClient
     val partition: KuduBackupPartition = part.asInstanceOf[KuduBackupPartition]
@@ -135,8 +137,6 @@ private class RowIterator(
     scannerIterator.hasNext(nextRowsCallback)
   }
 
-  // TODO: There may be an old KuduRDD implementation where we did some
-  // sort of zero copy/object pool pattern for performance (we could use that here).
   override def next(): Row = {
     val rowResult = scannerIterator.next()
     val fieldCount = rowResult.getColumnProjection.getColumnCount
diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala
index 50517a9..c4b5f8b 100644
--- a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala
+++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala
@@ -21,7 +21,6 @@ import org.apache.kudu.client.AlterTableOptions
 import org.apache.kudu.client.SessionConfiguration.FlushMode
 import org.apache.kudu.spark.kudu.KuduContext
 import org.apache.kudu.spark.kudu.RowConverter
-import org.apache.parquet.hadoop.ParquetInputFormat
 import org.apache.spark.sql.SparkSession
 import org.apache.yetus.audience.InterfaceAudience
 import org.apache.yetus.audience.InterfaceStability
@@ -30,12 +29,6 @@ import org.slf4j.LoggerFactory
 
 /**
  * The main class for a Kudu restore spark job.
- *
- * Example Usage:
- *   spark-submit --class org.apache.kudu.backup.KuduRestore kudu-backup2_2.11-*.jar \
- *     --kuduMasterAddresses master1-host,master-2-host,master-3-host \
- *     --rootPath hdfs:///kudu/backup/path \
- *     my_kudu_table
  */
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
@@ -51,15 +44,16 @@ object KuduRestore {
       )
     val io = new SessionIO(session, options)
 
-    // TODO: Make parallel so each table isn't processed serially.
+    // TODO (KUDU-2786): Make parallel so each table isn't processed serially.
+    // TODO (KUDU-2787): Handle single table failures.
     options.tables.foreach { tableName =>
-      // TODO: Consider an option to enforce an exact toMS match.
       val graph = io.readBackupGraph(tableName).filterByTime(options.timestampMs)
       graph.restorePath.backups.foreach { backup =>
         log.info(s"Restoring table $tableName from path: ${backup.path}")
         val isFullRestore = backup.metadata.getFromMs == 0
         val restoreName = s"${backup.metadata.getTableName}${options.tableSuffix}"
-        // TODO: Store the full metadata to compare/validate for each applied partial.
+
+        // TODO (KUDU-2788): Store the full metadata to compare/validate for each applied partial.
 
         // On the full restore we may need to create the table.
         if (isFullRestore) {
@@ -72,7 +66,6 @@ object KuduRestore {
         val restoreSchema = io.dataSchema(table.getSchema)
         val rowActionCol = restoreSchema.fields.last.name
 
-        // TODO: Restrict format option.
         var data = session.sqlContext.read
           .format(backup.metadata.getDataFormat)
           .schema(restoreSchema)
@@ -82,8 +75,7 @@ object KuduRestore {
           .na
           .fill(RowAction.UPSERT.getValue, Seq(rowActionCol))
 
-        // TODO: Expose more configuration options:
-        //   (session timeout, consistency mode, flush interval, mutation buffer space)
+        // Write the data to Kudu.
         data.queryExecution.toRdd.foreachPartition { internalRows =>
           val table = context.syncClient.openTable(restoreName)
           val converter = new RowConverter(table.getSchema, restoreSchema, false)
diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/Options.scala b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/Options.scala
index 2874eac..51d468f 100644
--- a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/Options.scala
+++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/Options.scala
@@ -43,6 +43,7 @@ case class BackupOptions(
     format: String = BackupOptions.DefaultFormat,
     scanBatchSize: Int = BackupOptions.DefaultScanBatchSize,
     scanRequestTimeoutMs: Long = BackupOptions.DefaultScanRequestTimeoutMs,
+    scanLeaderOnly: Boolean = BackupOptions.DefaultScanLeaderOnly,
     scanPrefetching: Boolean = BackupOptions.DefaultScanPrefetching,
     keepAlivePeriodMs: Long = BackupOptions.DefaultKeepAlivePeriodMs)
     extends CommonOptions {
@@ -60,69 +61,88 @@ object BackupOptions {
   val DefaultScanBatchSize: Int = 1024 * 1024 * 20 // 20 MiB
   val DefaultScanRequestTimeoutMs: Long =
     AsyncKuduClient.DEFAULT_OPERATION_TIMEOUT_MS // 30 seconds
-  // TODO: Add a test per KUDU-1260 and enable by default?
+  val DefaultScanLeaderOnly: Boolean = false
+  // TODO (KUDU-1260): Add a test and enable by default?
   val DefaultScanPrefetching: Boolean = false
   val DefaultKeepAlivePeriodMs: Long = AsyncKuduClient.DEFAULT_KEEP_ALIVE_PERIOD_MS
 
-  // TODO: clean up usage output.
-  // TODO: timeout configurations.
-  private val parser: OptionParser[BackupOptions] =
-    new OptionParser[BackupOptions]("KuduBackup") {
+  // We use the program name to make the help output show a the spark invocation required.
+  val ClassName: String = KuduBackup.getClass.getCanonicalName.dropRight(1) // Remove trailing `$`
+  val ProgramName: String = "spark-submit --class " + ClassName + " [spark-options] " +
+    "<application-jar>"
+
+  val parser: OptionParser[BackupOptions] =
+    new OptionParser[BackupOptions](ProgramName) {
       opt[String]("rootPath")
         .action((v, o) => o.copy(rootPath = v))
         .text("The root path to output backup data. Accepts any Spark compatible path.")
-        .optional()
+        .required()
 
       opt[String]("kuduMasterAddresses")
         .action((v, o) => o.copy(kuduMasterAddresses = v))
-        .text("Comma-separated addresses of Kudu masters.")
+        .text("Comma-separated addresses of Kudu masters. Default: localhost")
         .optional()
 
       opt[Boolean]("forceFull")
         .action((v, o) => o.copy(forceFull = v))
-        .text("If true, this will be a full backup even if another full already exists.")
+        .text("If true, this will be a full backup even if another full already exists. " +
+          "Default: " + DefaultForceFull)
         .optional()
 
       opt[Long]("fromMs")
         .action((v, o) => o.copy(fromMs = v))
         .text(
-          "A UNIX timestamp in milliseconds that defines the start time of an incremental backup.")
+          "A UNIX timestamp in milliseconds that defines the start time of an incremental " +
+            "backup. If unset, the fromMs will be defined by previous backups in the root " +
+            "directory.")
         .optional()
 
       opt[Long]("timestampMs")
         .action((v, o) => o.copy(toMs = v))
-        // TODO: Document the limitations based on cluster configuration
-        //  (ex: ancient history watermark).
-        .text("A UNIX timestamp in milliseconds since the epoch to execute scans at.")
-        .optional()
-
-      opt[String]("format")
-        .action((v, o) => o.copy(format = v))
-        .text("The file format to use when writing the data.")
+        // TODO (KUDU-2677): Document the limitations based on cluster configuration.
+        .text("A UNIX timestamp in milliseconds since the epoch to execute scans at. " +
+          "Default: `System.currentTimeMillis()`")
         .optional()
 
       opt[Int]("scanBatchSize")
         .action((v, o) => o.copy(scanBatchSize = v))
-        .text("The maximum number of bytes returned by the scanner, on each batch.")
+        .text("The maximum number of bytes returned by the scanner, on each batch. " +
+          "Default: " + DefaultScanBatchSize)
         .optional()
 
       opt[Int]("scanRequestTimeoutMs")
         .action((v, o) => o.copy(scanRequestTimeoutMs = v))
-        .text("Sets how long in milliseconds each scan request to a server can last.")
+        .text("Sets how long in milliseconds each scan request to a server can last. " +
+          "Default: " + DefaultScanRequestTimeoutMs)
         .optional()
 
-      opt[Unit]("scanPrefetching")
-        .action((_, o) => o.copy(scanPrefetching = true))
-        .text("An experimental flag to enable pre-fetching data.")
+      opt[Boolean]("scanLeaderOnly")
+        .action((v, o) => o.copy(scanLeaderOnly = v))
+        .text("If true scans will only use the leader replica, otherwise scans will take place " +
+          "at the closest replica. Default: " + DefaultScanLeaderOnly)
         .optional()
 
       opt[Long]("keepAlivePeriodMs")
         .action((v, o) => o.copy(keepAlivePeriodMs = v))
-        .text(
-          "Sets the period at which to send keep-alive requests to the tablet server to ensure" +
-            " that scanners do not time out")
+        .text("Sets the period at which to send keep-alive requests to the tablet server to " +
+          "ensure that scanners do not time out. Default: " + DefaultKeepAlivePeriodMs)
+        .optional()
+
+      opt[String]("format")
+        .action((v, o) => o.copy(format = v))
+        .text("The file format to use when writing the data. Default: " + DefaultFormat)
+        .hidden()
+        .optional()
+
+      opt[Unit]("scanPrefetching")
+        .action((_, o) => o.copy(scanPrefetching = true))
+        .text("An experimental flag to enable pre-fetching data. " +
+          "Default: " + DefaultScanPrefetching)
+        .hidden()
         .optional()
 
+      help("help").text("prints this usage text")
+
       arg[String]("<table>...")
         .unbounded()
         .action((v, o) => o.copy(tables = o.tables :+ v))
@@ -158,37 +178,42 @@ object RestoreOptions {
   val DefaultTableSuffix: String = "-restore"
   val DefaultCreateTables: Boolean = true
 
-  // TODO: clean up usage output.
-  // TODO: timeout configurations.
-  private val parser: OptionParser[RestoreOptions] =
-    new OptionParser[RestoreOptions]("KuduRestore") {
+  val ClassName: String = KuduRestore.getClass.getCanonicalName.dropRight(1) // Remove trailing `$`
+  val ProgramName: String = "spark-submit --class " + ClassName + " [spark-options] " +
+    "<application-jar>"
+
+  val parser: OptionParser[RestoreOptions] =
+    new OptionParser[RestoreOptions](ProgramName) {
       opt[String]("rootPath")
         .action((v, o) => o.copy(rootPath = v))
         .text("The root path to the backup data. Accepts any Spark compatible path.")
-        .optional()
+        .required()
 
       opt[String]("kuduMasterAddresses")
         .action((v, o) => o.copy(kuduMasterAddresses = v))
-        .text("Comma-separated addresses of Kudu masters.")
+        .text("Comma-separated addresses of Kudu masters. Default: localhost")
         .optional()
 
       opt[Boolean]("createTables")
         .action((v, o) => o.copy(createTables = v))
-        .text("true to create tables during restore, false if they already exist.")
+        .text("If true, create the tables during restore. Set to false if the target tables " +
+          "already exist. Default: " + DefaultCreateTables)
         .optional()
 
       opt[String]("tableSuffix")
         .action((v, o) => o.copy(tableSuffix = v))
-        .text("The suffix to add to the restored table names. Only used when createTables is true.")
+        .text("The suffix to add to the restored table names. Only used when createTables is " +
+          "true. Default: " + DefaultTableSuffix)
         .optional()
 
       opt[Long]("timestampMs")
         .action((v, o) => o.copy(timestampMs = v))
-        .text(
-          "A UNIX timestamp in milliseconds that define the latest time to use when selecting " +
-            "restore candidates.")
+        .text("A UNIX timestamp in milliseconds that defines the latest time to use when " +
+          "selecting restore candidates. Default: `System.currentTimeMillis()`")
         .optional()
 
+      help("help").text("prints this usage text")
+
       arg[String]("<table>...")
         .unbounded()
         .action((v, o) => o.copy(tables = o.tables :+ v))
diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/SessionIO.scala b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/SessionIO.scala
index 40f513f..0030362 100644
--- a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/SessionIO.scala
+++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/SessionIO.scala
@@ -55,7 +55,7 @@ import scala.collection.mutable
  * In the above path the `/<rootPath>` can be used to distinguish separate backup groups.
  * The `<backup-id>` is currently the `toMs` time for the job.
  *
- * TODO: Should the tableName contain the table id?
+ * TODO (KUDU-2788): Should the tableName contain the table id?
  */
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
diff --git a/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala b/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala
index 8e1c6a2..2105bbd 100644
--- a/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala
+++ b/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala
@@ -68,7 +68,7 @@ class TestKuduBackup extends KuduTestSuite {
     assertTrue(partitionSchemasMatch(tA.getPartitionSchema, tB.getPartitionSchema))
   }
 
-  // TODO: Add a thorough randomized test for full and incremental backup/restore.
+  // TODO (KUDU-2790): Add a thorough randomized test for full and incremental backup/restore.
   @Test
   def TestIncrementalBackupAndRestore() {
     insertRows(table, 100) // Insert data into the default test table.
@@ -225,7 +225,6 @@ class TestKuduBackup extends KuduTestSuite {
     backupAndRestore(Seq(tableName))
   }
 
-  // TODO: Move to a Schema equals/equivalent method.
   def schemasMatch(before: Schema, after: Schema): Boolean = {
     if (before eq after) return true
     if (before.getColumns.size != after.getColumns.size) return false
@@ -234,7 +233,6 @@ class TestKuduBackup extends KuduTestSuite {
     }
   }
 
-  // TODO: Move to a ColumnSchema equals/equivalent method.
   def columnsMatch(before: ColumnSchema, after: ColumnSchema): Boolean = {
     if (before eq after) return true
     Objects.equal(before.getName, after.getName) &&
@@ -260,7 +258,6 @@ class TestKuduBackup extends KuduTestSuite {
     }
   }
 
-  // TODO: Move to a PartitionSchema equals/equivalent method.
   def partitionSchemasMatch(before: PartitionSchema, after: PartitionSchema): Boolean = {
     if (before eq after) return true
     val beforeBuckets = before.getHashBucketSchemas.asScala
@@ -295,7 +292,6 @@ class TestKuduBackup extends KuduTestSuite {
     kuduClient.createTable(name, schema, options)
   }
 
-  // TODO: Add updates and deletes when incremental backups are supported.
   def loadRandomData(table: KuduTable): IndexedSeq[PartialRow] = {
     val kuduSession = kuduClient.newSession()
     val dataGenerator = new DataGeneratorBuilder()
diff --git a/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestOptions.scala b/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestOptions.scala
new file mode 100644
index 0000000..0eec6f1
--- /dev/null
+++ b/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestOptions.scala
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.kudu.backup
+
+import org.apache.kudu.spark.kudu.KuduTestSuite
+import org.junit.Assert._
+import org.junit.Test
+
+class TestOptions extends KuduTestSuite {
+
+  @Test
+  def testBackupOptionsHelp() {
+    val expectedStr =
+      """Usage: spark-submit --class org.apache.kudu.backup.KuduBackup [spark-options] <application-jar> [options] <table>...
+        |
+        |  --rootPath <value>       The root path to output backup data. Accepts any Spark compatible path.
+        |  --kuduMasterAddresses <value>
+        |                           Comma-separated addresses of Kudu masters. Default: localhost
+        |  --forceFull <value>      If true, this will be a full backup even if another full already exists. Default: false
+        |  --fromMs <value>         A UNIX timestamp in milliseconds that defines the start time of an incremental backup. If unset, the fromMs will be defined by previous backups in the root directory.
+        |  --timestampMs <value>    A UNIX timestamp in milliseconds since the epoch to execute scans at. Default: `System.currentTimeMillis()`
+        |  --scanBatchSize <value>  The maximum number of bytes returned by the scanner, on each batch. Default: 20971520
+        |  --scanRequestTimeoutMs <value>
+        |                           Sets how long in milliseconds each scan request to a server can last. Default: 30000
+        |  --scanLeaderOnly <value>
+        |                           If true scans will only use the leader replica, otherwise scans will take place at the closest replica. Default: false
+        |  --keepAlivePeriodMs <value>
+        |                           Sets the period at which to send keep-alive requests to the tablet server to ensure that scanners do not time out. Default: 15000
+        |  --help                   prints this usage text
+        |  <table>...               A list of tables to be backed up.""".stripMargin
+    assertEquals(expectedStr, BackupOptions.parser.renderTwoColumnsUsage)
+  }
+
+  @Test
+  def testRestoreOptionsHelp() {
+    val expectedStr =
+      """Usage: spark-submit --class org.apache.kudu.backup.KuduRestore [spark-options] <application-jar> [options] <table>...
+        |
+        |  --rootPath <value>       The root path to the backup data. Accepts any Spark compatible path.
+        |  --kuduMasterAddresses <value>
+        |                           Comma-separated addresses of Kudu masters. Default: localhost
+        |  --createTables <value>   If true, create the tables during restore. Set to false if the target tables already exist. Default: true
+        |  --tableSuffix <value>    The suffix to add to the restored table names. Only used when createTables is true. Default: -restore
+        |  --timestampMs <value>    A UNIX timestamp in milliseconds that defines the latest time to use when selecting restore candidates. Default: `System.currentTimeMillis()`
+        |  --help                   prints this usage text
+        |  <table>...               A list of tables to be restored.""".stripMargin
+    assertEquals(expectedStr, RestoreOptions.parser.renderTwoColumnsUsage)
+  }
+}
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
index 92a8515..c47a084 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
@@ -84,11 +84,12 @@ class KuduRDD private[kudu] (
         // Only list the leader replica as the preferred location if
         // replica selection policy is leader only, to take advantage
         // of scan locality.
-        var locations: Array[String] = null
-        if (options.scanLocality == ReplicaSelection.LEADER_ONLY) {
-          locations = Array(token.getTablet.getLeaderReplica.getRpcHost)
-        } else {
-          locations = token.getTablet.getReplicas.asScala.map(_.getRpcHost).toArray
+        val locations = {
+          if (options.scanLocality == ReplicaSelection.LEADER_ONLY) {
+            Array(token.getTablet.getLeaderReplica.getRpcHost)
+          } else {
+            token.getTablet.getReplicas.asScala.map(_.getRpcHost).toArray
+          }
         }
         new KuduPartition(index, token.serialize(), locations)
     }.toArray