Posted to commits@kudu.apache.org by gr...@apache.org on 2019/04/23 19:38:46 UTC
[kudu] branch master updated: [backup] Cleanup backup/restore TODO comments
This is an automated email from the ASF dual-hosted git repository.
granthenke pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new a226554 [backup] Cleanup backup/restore TODO comments
a226554 is described below
commit a2265547f671f5efc5a36b8a140aa47aeb1a8be2
Author: Grant Henke <gr...@apache.org>
AuthorDate: Tue Apr 23 10:57:51 2019 -0500
[backup] Cleanup backup/restore TODO comments
This patch reviews the open TODO items in the Kudu
backup and restore code. Each item is either addressed,
removed, or tracked with a newly filed JIRA.
The fixed TODO items are:
- Support backups using follower replicas
- Review the backup command line options
- Hide “experimental” options
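As a rough example, backup and restore invocations matching
the new usage text could look like the following; the master
addresses, backup path, and jar name are placeholders:
  spark-submit --class org.apache.kudu.backup.KuduBackup [spark-options] kudu-backup2_2.11-*.jar \
    --rootPath hdfs:///kudu/backup/path \
    --kuduMasterAddresses master1-host,master2-host,master3-host \
    my_kudu_table
  spark-submit --class org.apache.kudu.backup.KuduRestore [spark-options] kudu-backup2_2.11-*.jar \
    --rootPath hdfs:///kudu/backup/path \
    --kuduMasterAddresses master1-host,master2-host,master3-host \
    my_kudu_table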
Change-Id: Ia03dd12b22988763640ff19f331ebe75e3cb5d6f
Reviewed-on: http://gerrit.cloudera.org:8080/13085
Tested-by: Kudu Jenkins
Reviewed-by: Adar Dembo <ad...@cloudera.com>
---
java/kudu-backup/src/main/protobuf/backup.proto | 2 +-
.../scala/org/apache/kudu/backup/BackupGraph.scala | 1 -
.../scala/org/apache/kudu/backup/KuduBackup.scala | 9 +-
.../org/apache/kudu/backup/KuduBackupRDD.scala | 24 +++---
.../scala/org/apache/kudu/backup/KuduRestore.scala | 18 ++--
.../scala/org/apache/kudu/backup/Options.scala | 97 ++++++++++++++--------
.../scala/org/apache/kudu/backup/SessionIO.scala | 2 +-
.../org/apache/kudu/backup/TestKuduBackup.scala | 6 +-
.../scala/org/apache/kudu/backup/TestOptions.scala | 63 ++++++++++++++
.../scala/org/apache/kudu/spark/kudu/KuduRDD.scala | 11 +--
10 files changed, 152 insertions(+), 81 deletions(-)
diff --git a/java/kudu-backup/src/main/protobuf/backup.proto b/java/kudu-backup/src/main/protobuf/backup.proto
index 6de3b3e..da2c794 100644
--- a/java/kudu-backup/src/main/protobuf/backup.proto
+++ b/java/kudu-backup/src/main/protobuf/backup.proto
@@ -35,7 +35,7 @@ message ColumnTypeAttributesMetadataPB {
// Maps to the ColumnSchema class.
// The fields are effectively 1 to 1 mappings of those in ColumnSchema.
-// TODO: How do we handle column additions?
+// TODO (KUDU-2788): How do we handle column additions?
message ColumnMetadataPB {
string name = 1;
string type = 2;
diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/BackupGraph.scala b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/BackupGraph.scala
index 21c7ff9..1a6e92a 100644
--- a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/BackupGraph.scala
+++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/BackupGraph.scala
@@ -166,7 +166,6 @@ case class BackupNode(path: Path, metadata: TableMetadataPB) {
*/
def weight: Int = {
// Full backups have a weight of 0 and partial backups have a weight of 1.
- // TODO: Use the size of a partial backup to contribute to weight.
if (metadata.getFromMs == 0) 0 else 1
}
}
diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackup.scala b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackup.scala
index 445bef0..a7c277f 100644
--- a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackup.scala
+++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackup.scala
@@ -26,12 +26,6 @@ import org.slf4j.LoggerFactory
/**
* The main class for a Kudu backup spark job.
- *
- * Example Usage:
- * spark-submit --class org.apache.kudu.backup.KuduBackup kudu-backup2_2.11-*.jar \
- * --kuduMasterAddresses master1-host,master-2-host,master-3-host \
- * --rootPath hdfs:///kudu/backup/path \
- * my_kudu_table
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
@@ -46,7 +40,8 @@ object KuduBackup {
session.sparkContext
)
val io = new SessionIO(session, options)
- // TODO: Make parallel so each table isn't process serially?
+ // TODO (KUDU-2786): Make parallel so each table isn't processed serially.
+ // TODO (KUDU-2787): Handle single table failures.
options.tables.foreach { tableName =>
var tableOptions = options.copy() // Copy the options so we can modify them for the table.
val table = context.syncClient.openTable(tableName)
diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackupRDD.scala b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackupRDD.scala
index 544e7b2..80abb6d 100644
--- a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackupRDD.scala
+++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackupRDD.scala
@@ -45,15 +45,13 @@ class KuduBackupRDD private[kudu] (
// Defined here because the options are transient.
val incremental: Boolean = options.isIncremental
- // TODO: Split large tablets into smaller scan tokens?
+ // TODO (KUDU-2785): Split large tablets into smaller scan tokens?
override protected def getPartitions: Array[Partition] = {
val client = kuduContext.syncClient
val builder = client
.newScanTokenBuilder(table)
.cacheBlocks(false)
- // TODO: Use fault tolerant scans to get mostly ordered results when KUDU-2466 is fixed.
- // .setFaultTolerant(true)
.replicaSelection(ReplicaSelection.CLOSEST_REPLICA)
.readMode(ReadMode.READ_AT_SNAPSHOT)
.batchSizeBytes(options.scanBatchSize)
@@ -79,16 +77,20 @@ class KuduBackupRDD private[kudu] (
val tokens = builder.build()
tokens.asScala.zipWithIndex.map {
case (token, index) =>
- // TODO: Support backups from any replica or followers only.
- // Always run on the leader for data locality.
- val leaderLocation = token.getTablet.getLeaderReplica.getRpcHost
- KuduBackupPartition(index, token.serialize(), Array(leaderLocation))
+ // Only list the leader replica as the preferred location if
+ // replica selection policy is leader only, to take advantage
+ // of scan locality.
+ val locations: Array[String] = {
+ if (options.scanLeaderOnly) {
+ Array(token.getTablet.getLeaderReplica.getRpcHost)
+ } else {
+ token.getTablet.getReplicas.asScala.map(_.getRpcHost).toArray
+ }
+ }
+ KuduBackupPartition(index, token.serialize(), locations)
}.toArray
}
- // TODO: Do we need a custom spark partitioner for any guarantees?
- // override val partitioner = None
-
override def compute(part: Partition, taskContext: TaskContext): Iterator[Row] = {
val client: KuduClient = kuduContext.syncClient
val partition: KuduBackupPartition = part.asInstanceOf[KuduBackupPartition]
@@ -135,8 +137,6 @@ private class RowIterator(
scannerIterator.hasNext(nextRowsCallback)
}
- // TODO: There may be an old KuduRDD implementation where we did some
- // sort of zero copy/object pool pattern for performance (we could use that here).
override def next(): Row = {
val rowResult = scannerIterator.next()
val fieldCount = rowResult.getColumnProjection.getColumnCount
diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala
index 50517a9..c4b5f8b 100644
--- a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala
+++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala
@@ -21,7 +21,6 @@ import org.apache.kudu.client.AlterTableOptions
import org.apache.kudu.client.SessionConfiguration.FlushMode
import org.apache.kudu.spark.kudu.KuduContext
import org.apache.kudu.spark.kudu.RowConverter
-import org.apache.parquet.hadoop.ParquetInputFormat
import org.apache.spark.sql.SparkSession
import org.apache.yetus.audience.InterfaceAudience
import org.apache.yetus.audience.InterfaceStability
@@ -30,12 +29,6 @@ import org.slf4j.LoggerFactory
/**
* The main class for a Kudu restore spark job.
- *
- * Example Usage:
- * spark-submit --class org.apache.kudu.backup.KuduRestore kudu-backup2_2.11-*.jar \
- * --kuduMasterAddresses master1-host,master-2-host,master-3-host \
- * --rootPath hdfs:///kudu/backup/path \
- * my_kudu_table
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
@@ -51,15 +44,16 @@ object KuduRestore {
)
val io = new SessionIO(session, options)
- // TODO: Make parallel so each table isn't processed serially.
+ // TODO (KUDU-2786): Make parallel so each table isn't processed serially.
+ // TODO (KUDU-2787): Handle single table failures.
options.tables.foreach { tableName =>
- // TODO: Consider an option to enforce an exact toMS match.
val graph = io.readBackupGraph(tableName).filterByTime(options.timestampMs)
graph.restorePath.backups.foreach { backup =>
log.info(s"Restoring table $tableName from path: ${backup.path}")
val isFullRestore = backup.metadata.getFromMs == 0
val restoreName = s"${backup.metadata.getTableName}${options.tableSuffix}"
- // TODO: Store the full metadata to compare/validate for each applied partial.
+
+ // TODO (KUDU-2788): Store the full metadata to compare/validate for each applied partial.
// On the full restore we may need to create the table.
if (isFullRestore) {
@@ -72,7 +66,6 @@ object KuduRestore {
val restoreSchema = io.dataSchema(table.getSchema)
val rowActionCol = restoreSchema.fields.last.name
- // TODO: Restrict format option.
var data = session.sqlContext.read
.format(backup.metadata.getDataFormat)
.schema(restoreSchema)
@@ -82,8 +75,7 @@ object KuduRestore {
.na
.fill(RowAction.UPSERT.getValue, Seq(rowActionCol))
- // TODO: Expose more configuration options:
- // (session timeout, consistency mode, flush interval, mutation buffer space)
+ // Write the data to Kudu.
data.queryExecution.toRdd.foreachPartition { internalRows =>
val table = context.syncClient.openTable(restoreName)
val converter = new RowConverter(table.getSchema, restoreSchema, false)
diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/Options.scala b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/Options.scala
index 2874eac..51d468f 100644
--- a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/Options.scala
+++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/Options.scala
@@ -43,6 +43,7 @@ case class BackupOptions(
format: String = BackupOptions.DefaultFormat,
scanBatchSize: Int = BackupOptions.DefaultScanBatchSize,
scanRequestTimeoutMs: Long = BackupOptions.DefaultScanRequestTimeoutMs,
+ scanLeaderOnly: Boolean = BackupOptions.DefaultScanLeaderOnly,
scanPrefetching: Boolean = BackupOptions.DefaultScanPrefetching,
keepAlivePeriodMs: Long = BackupOptions.DefaultKeepAlivePeriodMs)
extends CommonOptions {
@@ -60,69 +61,88 @@ object BackupOptions {
val DefaultScanBatchSize: Int = 1024 * 1024 * 20 // 20 MiB
val DefaultScanRequestTimeoutMs: Long =
AsyncKuduClient.DEFAULT_OPERATION_TIMEOUT_MS // 30 seconds
- // TODO: Add a test per KUDU-1260 and enable by default?
+ val DefaultScanLeaderOnly: Boolean = false
+ // TODO (KUDU-1260): Add a test and enable by default?
val DefaultScanPrefetching: Boolean = false
val DefaultKeepAlivePeriodMs: Long = AsyncKuduClient.DEFAULT_KEEP_ALIVE_PERIOD_MS
- // TODO: clean up usage output.
- // TODO: timeout configurations.
- private val parser: OptionParser[BackupOptions] =
- new OptionParser[BackupOptions]("KuduBackup") {
+ // We use the program name to make the help output show the spark invocation required.
+ val ClassName: String = KuduBackup.getClass.getCanonicalName.dropRight(1) // Remove trailing `$`
+ val ProgramName: String = "spark-submit --class " + ClassName + " [spark-options] " +
+ "<application-jar>"
+
+ val parser: OptionParser[BackupOptions] =
+ new OptionParser[BackupOptions](ProgramName) {
opt[String]("rootPath")
.action((v, o) => o.copy(rootPath = v))
.text("The root path to output backup data. Accepts any Spark compatible path.")
- .optional()
+ .required()
opt[String]("kuduMasterAddresses")
.action((v, o) => o.copy(kuduMasterAddresses = v))
- .text("Comma-separated addresses of Kudu masters.")
+ .text("Comma-separated addresses of Kudu masters. Default: localhost")
.optional()
opt[Boolean]("forceFull")
.action((v, o) => o.copy(forceFull = v))
- .text("If true, this will be a full backup even if another full already exists.")
+ .text("If true, this will be a full backup even if another full already exists. " +
+ "Default: " + DefaultForceFull)
.optional()
opt[Long]("fromMs")
.action((v, o) => o.copy(fromMs = v))
.text(
- "A UNIX timestamp in milliseconds that defines the start time of an incremental backup.")
+ "A UNIX timestamp in milliseconds that defines the start time of an incremental " +
+ "backup. If unset, the fromMs will be defined by previous backups in the root " +
+ "directory.")
.optional()
opt[Long]("timestampMs")
.action((v, o) => o.copy(toMs = v))
- // TODO: Document the limitations based on cluster configuration
- // (ex: ancient history watermark).
- .text("A UNIX timestamp in milliseconds since the epoch to execute scans at.")
- .optional()
-
- opt[String]("format")
- .action((v, o) => o.copy(format = v))
- .text("The file format to use when writing the data.")
+ // TODO (KUDU-2677): Document the limitations based on cluster configuration.
+ .text("A UNIX timestamp in milliseconds since the epoch to execute scans at. " +
+ "Default: `System.currentTimeMillis()`")
.optional()
opt[Int]("scanBatchSize")
.action((v, o) => o.copy(scanBatchSize = v))
- .text("The maximum number of bytes returned by the scanner, on each batch.")
+ .text("The maximum number of bytes returned by the scanner, on each batch. " +
+ "Default: " + DefaultScanBatchSize)
.optional()
opt[Int]("scanRequestTimeoutMs")
.action((v, o) => o.copy(scanRequestTimeoutMs = v))
- .text("Sets how long in milliseconds each scan request to a server can last.")
+ .text("Sets how long in milliseconds each scan request to a server can last. " +
+ "Default: " + DefaultScanRequestTimeoutMs)
.optional()
- opt[Unit]("scanPrefetching")
- .action((_, o) => o.copy(scanPrefetching = true))
- .text("An experimental flag to enable pre-fetching data.")
+ opt[Boolean]("scanLeaderOnly")
+ .action((v, o) => o.copy(scanLeaderOnly = v))
+ .text("If true scans will only use the leader replica, otherwise scans will take place " +
+ "at the closest replica. Default: " + DefaultScanLeaderOnly)
.optional()
opt[Long]("keepAlivePeriodMs")
.action((v, o) => o.copy(keepAlivePeriodMs = v))
- .text(
- "Sets the period at which to send keep-alive requests to the tablet server to ensure" +
- " that scanners do not time out")
+ .text("Sets the period at which to send keep-alive requests to the tablet server to " +
+ "ensure that scanners do not time out. Default: " + DefaultKeepAlivePeriodMs)
+ .optional()
+
+ opt[String]("format")
+ .action((v, o) => o.copy(format = v))
+ .text("The file format to use when writing the data. Default: " + DefaultFormat)
+ .hidden()
+ .optional()
+
+ opt[Unit]("scanPrefetching")
+ .action((_, o) => o.copy(scanPrefetching = true))
+ .text("An experimental flag to enable pre-fetching data. " +
+ "Default: " + DefaultScanPrefetching)
+ .hidden()
.optional()
+ help("help").text("prints this usage text")
+
arg[String]("<table>...")
.unbounded()
.action((v, o) => o.copy(tables = o.tables :+ v))
@@ -158,37 +178,42 @@ object RestoreOptions {
val DefaultTableSuffix: String = "-restore"
val DefaultCreateTables: Boolean = true
- // TODO: clean up usage output.
- // TODO: timeout configurations.
- private val parser: OptionParser[RestoreOptions] =
- new OptionParser[RestoreOptions]("KuduRestore") {
+ val ClassName: String = KuduRestore.getClass.getCanonicalName.dropRight(1) // Remove trailing `$`
+ val ProgramName: String = "spark-submit --class " + ClassName + " [spark-options] " +
+ "<application-jar>"
+
+ val parser: OptionParser[RestoreOptions] =
+ new OptionParser[RestoreOptions](ProgramName) {
opt[String]("rootPath")
.action((v, o) => o.copy(rootPath = v))
.text("The root path to the backup data. Accepts any Spark compatible path.")
- .optional()
+ .required()
opt[String]("kuduMasterAddresses")
.action((v, o) => o.copy(kuduMasterAddresses = v))
- .text("Comma-separated addresses of Kudu masters.")
+ .text("Comma-separated addresses of Kudu masters. Default: localhost")
.optional()
opt[Boolean]("createTables")
.action((v, o) => o.copy(createTables = v))
- .text("true to create tables during restore, false if they already exist.")
+ .text("If true, create the tables during restore. Set to false if the target tables " +
+ "already exist. Default: " + DefaultCreateTables)
.optional()
opt[String]("tableSuffix")
.action((v, o) => o.copy(tableSuffix = v))
- .text("The suffix to add to the restored table names. Only used when createTables is true.")
+ .text("The suffix to add to the restored table names. Only used when createTables is " +
+ "true. Default: " + DefaultTableSuffix)
.optional()
opt[Long]("timestampMs")
.action((v, o) => o.copy(timestampMs = v))
- .text(
- "A UNIX timestamp in milliseconds that define the latest time to use when selecting " +
- "restore candidates.")
+ .text("A UNIX timestamp in milliseconds that defines the latest time to use when " +
+ "selecting restore candidates. Default: `System.currentTimeMillis()`")
.optional()
+ help("help").text("prints this usage text")
+
arg[String]("<table>...")
.unbounded()
.action((v, o) => o.copy(tables = o.tables :+ v))
diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/SessionIO.scala b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/SessionIO.scala
index 40f513f..0030362 100644
--- a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/SessionIO.scala
+++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/SessionIO.scala
@@ -55,7 +55,7 @@ import scala.collection.mutable
* In the above path the `/<rootPath>` can be used to distinguish separate backup groups.
* The `<backup-id>` is currently the `toMs` time for the job.
*
- * TODO: Should the tableName contain the table id?
+ * TODO (KUDU-2788): Should the tableName contain the table id?
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
diff --git a/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala b/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala
index 8e1c6a2..2105bbd 100644
--- a/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala
+++ b/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala
@@ -68,7 +68,7 @@ class TestKuduBackup extends KuduTestSuite {
assertTrue(partitionSchemasMatch(tA.getPartitionSchema, tB.getPartitionSchema))
}
- // TODO: Add a thorough randomized test for full and incremental backup/restore.
+ // TODO (KUDU-2790): Add a thorough randomized test for full and incremental backup/restore.
@Test
def TestIncrementalBackupAndRestore() {
insertRows(table, 100) // Insert data into the default test table.
@@ -225,7 +225,6 @@ class TestKuduBackup extends KuduTestSuite {
backupAndRestore(Seq(tableName))
}
- // TODO: Move to a Schema equals/equivalent method.
def schemasMatch(before: Schema, after: Schema): Boolean = {
if (before eq after) return true
if (before.getColumns.size != after.getColumns.size) return false
@@ -234,7 +233,6 @@ class TestKuduBackup extends KuduTestSuite {
}
}
- // TODO: Move to a ColumnSchema equals/equivalent method.
def columnsMatch(before: ColumnSchema, after: ColumnSchema): Boolean = {
if (before eq after) return true
Objects.equal(before.getName, after.getName) &&
@@ -260,7 +258,6 @@ class TestKuduBackup extends KuduTestSuite {
}
}
- // TODO: Move to a PartitionSchema equals/equivalent method.
def partitionSchemasMatch(before: PartitionSchema, after: PartitionSchema): Boolean = {
if (before eq after) return true
val beforeBuckets = before.getHashBucketSchemas.asScala
@@ -295,7 +292,6 @@ class TestKuduBackup extends KuduTestSuite {
kuduClient.createTable(name, schema, options)
}
- // TODO: Add updates and deletes when incremental backups are supported.
def loadRandomData(table: KuduTable): IndexedSeq[PartialRow] = {
val kuduSession = kuduClient.newSession()
val dataGenerator = new DataGeneratorBuilder()
diff --git a/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestOptions.scala b/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestOptions.scala
new file mode 100644
index 0000000..0eec6f1
--- /dev/null
+++ b/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestOptions.scala
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.kudu.backup
+
+import org.apache.kudu.spark.kudu.KuduTestSuite
+import org.junit.Assert._
+import org.junit.Test
+
+class TestOptions extends KuduTestSuite {
+
+ @Test
+ def testBackupOptionsHelp() {
+ val expectedStr =
+ """Usage: spark-submit --class org.apache.kudu.backup.KuduBackup [spark-options] <application-jar> [options] <table>...
+ |
+ | --rootPath <value> The root path to output backup data. Accepts any Spark compatible path.
+ | --kuduMasterAddresses <value>
+ | Comma-separated addresses of Kudu masters. Default: localhost
+ | --forceFull <value> If true, this will be a full backup even if another full already exists. Default: false
+ | --fromMs <value> A UNIX timestamp in milliseconds that defines the start time of an incremental backup. If unset, the fromMs will be defined by previous backups in the root directory.
+ | --timestampMs <value> A UNIX timestamp in milliseconds since the epoch to execute scans at. Default: `System.currentTimeMillis()`
+ | --scanBatchSize <value> The maximum number of bytes returned by the scanner, on each batch. Default: 20971520
+ | --scanRequestTimeoutMs <value>
+ | Sets how long in milliseconds each scan request to a server can last. Default: 30000
+ | --scanLeaderOnly <value>
| If true, scans will only use the leader replica, otherwise scans will take place at the closest replica. Default: false
+ | --keepAlivePeriodMs <value>
+ | Sets the period at which to send keep-alive requests to the tablet server to ensure that scanners do not time out. Default: 15000
+ | --help prints this usage text
+ | <table>... A list of tables to be backed up.""".stripMargin
+ assertEquals(expectedStr, BackupOptions.parser.renderTwoColumnsUsage)
+ }
+
+ @Test
+ def testRestoreOptionsHelp() {
+ val expectedStr =
+ """Usage: spark-submit --class org.apache.kudu.backup.KuduRestore [spark-options] <application-jar> [options] <table>...
+ |
+ | --rootPath <value> The root path to the backup data. Accepts any Spark compatible path.
+ | --kuduMasterAddresses <value>
+ | Comma-separated addresses of Kudu masters. Default: localhost
+ | --createTables <value> If true, create the tables during restore. Set to false if the target tables already exist. Default: true
+ | --tableSuffix <value> The suffix to add to the restored table names. Only used when createTables is true. Default: -restore
+ | --timestampMs <value> A UNIX timestamp in milliseconds that defines the latest time to use when selecting restore candidates. Default: `System.currentTimeMillis()`
+ | --help prints this usage text
+ | <table>... A list of tables to be restored.""".stripMargin
+ assertEquals(expectedStr, RestoreOptions.parser.renderTwoColumnsUsage)
+ }
+}
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
index 92a8515..c47a084 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
@@ -84,11 +84,12 @@ class KuduRDD private[kudu] (
// Only list the leader replica as the preferred location if
// replica selection policy is leader only, to take advantage
// of scan locality.
- var locations: Array[String] = null
- if (options.scanLocality == ReplicaSelection.LEADER_ONLY) {
- locations = Array(token.getTablet.getLeaderReplica.getRpcHost)
- } else {
- locations = token.getTablet.getReplicas.asScala.map(_.getRpcHost).toArray
+ val locations = {
+ if (options.scanLocality == ReplicaSelection.LEADER_ONLY) {
+ Array(token.getTablet.getLeaderReplica.getRpcHost)
+ } else {
+ token.getTablet.getReplicas.asScala.map(_.getRpcHost).toArray
+ }
}
new KuduPartition(index, token.serialize(), locations)
}.toArray