You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2019/02/07 22:16:46 UTC
[kudu] 01/03: KUDU-2676 Support restoring tables over the maximum
allowed replicas
This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
commit ce60d6408f5ac0dd4f9f53ce2ab9a9ce76aad211
Author: Will Berkeley <wd...@gmail.org>
AuthorDate: Tue Feb 5 12:47:15 2019 -0800
KUDU-2676 Support restoring tables over the maximum allowed replicas
Backup can back up any table, but restore could only restore tables with
60 or fewer tablets (with default settings) because Kudu limits the
number of tablets created as part of creating a table. Additional
tablets can be added by adding more range partitions.
This patch fixes the issue by creating a restored table in two steps:
1. Creating the table with a single range partition (or no range
partition if the table does not have any range partitions).
2. Creating all the rest of the range partitions by adding them one at a
time.
This happens up front before any rows are restored.
There are two additional tests:
1. A test that creates, backs up, and restores a table with 102
range partitions (one created with the table, 101 added via alter).
2. A test that creates, backs up, and restores a table with 0 range
partitions.
Change-Id: If6527d083fc85c25ab0fa7d21cbb339f63374de6
Reviewed-on: http://gerrit.cloudera.org:8080/12372
Reviewed-by: Grant Henke <gr...@apache.org>
Tested-by: Kudu Jenkins
Reviewed-by: Mike Percy <mp...@apache.org>
---
.../scala/org/apache/kudu/backup/KuduRestore.scala | 36 ++++++++++++--
.../org/apache/kudu/backup/TableMetadata.scala | 13 +++--
.../org/apache/kudu/backup/TestKuduBackup.scala | 58 ++++++++++++++++++++--
3 files changed, 92 insertions(+), 15 deletions(-)
diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala
index 9a0173c..b8a94b7 100644
--- a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala
+++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala
@@ -24,9 +24,10 @@ import java.nio.file.Paths
import com.google.common.io.CharStreams
import com.google.protobuf.util.JsonFormat
-import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.{Path => HPath}
+
import org.apache.kudu.backup.Backup.TableMetadataPB
+import org.apache.kudu.client.AlterTableOptions
import org.apache.kudu.spark.kudu.KuduContext
import org.apache.kudu.spark.kudu.KuduWriteOptions
import org.apache.spark.sql.SparkSession
@@ -55,10 +56,7 @@ object KuduRestore {
val restoreName = s"${metadata.getTableName}${options.tableSuffix}"
val table =
if (options.createTables) {
- // Read the metadata and generate a schema.
- val schema = TableMetadata.getKuduSchema(metadata)
- val createTableOptions = TableMetadata.getCreateTableOptions(metadata)
- context.createTable(restoreName, schema, createTableOptions)
+ createTableRangePartitionByRangePartition(restoreName, metadata, context)
} else {
context.syncClient.openTable(restoreName)
}
@@ -94,6 +92,34 @@ object KuduRestore {
builder.build()
}
+  // Kudu isn't good at creating a lot of tablets at once, and by default tables may only be
+  // created with at most 60 tablets. Additional tablets can be added later by adding range
+  // partitions. So, to restore tables with more tablets than that, the table is created
+  // piece-by-piece in the simplest way: create it with the first range partition, if there
+  // is one, and then alter it to add the remaining partitions, one partition per alter.
+  //
+  // restoreName: name of the table to create.
+  // metadata: backed-up table metadata supplying the schema, table options and range bounds.
+  // context: KuduContext used to create the table; its sync client issues the alters.
+  private def createTableRangePartitionByRangePartition(
+      restoreName: String,
+      metadata: TableMetadataPB,
+      context: KuduContext): Unit = {
+    // Create the table with the first range partition (or none if there are none).
+    val schema = TableMetadata.getKuduSchema(metadata)
+    val options = TableMetadata.getCreateTableOptionsWithoutRangePartitions(metadata)
+    val bounds = TableMetadata.getRangeBoundPartialRows(metadata)
+    bounds.headOption.foreach {
+      case (lower, upper) => options.addRangePartition(lower, upper)
+    }
+    context.createTable(restoreName, schema, options)
+
+    // Add the rest of the range partitions through alters. drop(1) is used rather than tail
+    // because tail throws UnsupportedOperationException on an empty collection, which would
+    // fail the restore of a table whose metadata lists no range bounds.
+    bounds.drop(1).foreach {
+      case (lower, upper) =>
+        val alterOptions = new AlterTableOptions().addRangePartition(lower, upper)
+        context.syncClient.alterTable(restoreName, alterOptions)
+    }
+  }
+
def main(args: Array[String]): Unit = {
val options = KuduRestoreOptions
.parse(args)
diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/TableMetadata.scala b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/TableMetadata.scala
index acb9fb4..64845b4 100644
--- a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/TableMetadata.scala
+++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/TableMetadata.scala
@@ -276,7 +276,7 @@ object TableMetadata {
}
}
- def getCreateTableOptions(metadata: TableMetadataPB): CreateTableOptions = {
+ def getCreateTableOptionsWithoutRangePartitions(metadata: TableMetadataPB): CreateTableOptions = {
val schema = getKuduSchema(metadata)
val options = new CreateTableOptions()
options.setNumReplicas(metadata.getNumReplicas)
@@ -287,12 +287,15 @@ object TableMetadata {
val rangePartitionColumns =
metadata.getPartitions.getRangePartitions.getColumnNamesList
options.setRangePartitionColumns(rangePartitionColumns)
- metadata.getPartitions.getRangePartitions.getBoundsList.asScala.foreach { b =>
+ options
+ }
+
+ def getRangeBoundPartialRows(metadata: TableMetadataPB): Seq[(PartialRow, PartialRow)] = {
+ val schema = getKuduSchema(metadata)
+ metadata.getPartitions.getRangePartitions.getBoundsList.asScala.map { b =>
val lower = getPartialRow(b.getLowerBoundsList.asScala, schema)
val upper = getPartialRow(b.getUpperBoundsList.asScala, schema)
- options.addRangePartition(lower, upper)
+ (lower, upper)
}
- options
}
-
}
diff --git a/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala b/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala
index d1fe486..3180e6b 100644
--- a/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala
+++ b/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala
@@ -22,11 +22,9 @@ import java.util
import com.google.common.base.Objects
import org.apache.commons.io.FileUtils
+
import org.apache.kudu.client.PartitionSchema.HashBucketSchema
-import org.apache.kudu.client.CreateTableOptions
-import org.apache.kudu.client.KuduTable
-import org.apache.kudu.client.PartialRow
-import org.apache.kudu.client.PartitionSchema
+import org.apache.kudu.client._
import org.apache.kudu.ColumnSchema
import org.apache.kudu.Schema
import org.apache.kudu.Type
@@ -42,7 +40,6 @@ import org.junit.Before
import org.junit.Test
import org.slf4j.Logger
import org.slf4j.LoggerFactory
-
import scala.collection.JavaConverters._
import scala.util.Random
@@ -135,6 +132,57 @@ class TestKuduBackup extends KuduTestSuite {
assertResult(numRows)(rdd2.count())
}
+  @Test
+  def testBackupAndRestoreTableWithManyPartitions(): Unit = {
+    val kNumPartitions = 100
+    val tableName = "many-partitions-table"
+
+    // Builds a partial row of the test schema with its "key" column set to the given value.
+    def keyBound(key: Int): PartialRow = {
+      val row = schema.newPartialRow()
+      row.addInt("key", key)
+      row
+    }
+
+    // Create the table with a single range partition bounded by keys -5 and -4. It is kept
+    // apart from the partitions added below so there's a bounded, non-covered range.
+    val createOptions = new CreateTableOptions()
+      .setRangePartitionColumns(List("key").asJava)
+      .setNumReplicas(1)
+      .addRangePartition(keyBound(-5), keyBound(-4))
+    val table = kuduClient.createTable(tableName, schema, createOptions)
+
+    // Add one range partition bounded by keys i and i + 1 for every i from 0 through
+    // kNumPartitions (inclusive), issuing a separate alter for each.
+    (0 to kNumPartitions).foreach { i =>
+      val alterOptions = new AlterTableOptions().addRangePartition(keyBound(i), keyBound(i + 1))
+      kuduClient.alterTable(tableName, alterOptions)
+    }
+
+    // Insert kNumPartitions rows via the suite's insertRows helper. Per the original note,
+    // each row goes into a different range partition and the initial partition stays empty.
+    insertRows(table, kNumPartitions)
+
+    // Round-trip the table through backup and restore.
+    backupAndRestore(tableName)
+  }
+
+  @Test
+  def testBackupAndRestoreTableWithNoRangePartitions(): Unit = {
+    val tableName = "only-hash-partitions-table"
+
+    // Hash-partition on "key" into 2 buckets; no range partitioning is configured.
+    val hashOnlyOptions = new CreateTableOptions()
+      .addHashPartitions(List("key").asJava, 2)
+      .setNumReplicas(1)
+    val table = kuduClient.createTable(tableName, schema, hashOnlyOptions)
+
+    // Populate the table, then round-trip it through backup and restore.
+    insertRows(table, 100)
+    backupAndRestore(tableName)
+  }
+
// TODO: Move to a Schema equals/equivalent method.
def schemasMatch(before: Schema, after: Schema): Boolean = {
if (before eq after) return true