You are viewing a plain-text version of this content; the link to the canonical version is not included in this copy.
Posted to commits@kudu.apache.org by al...@apache.org on 2019/02/07 22:16:46 UTC

[kudu] 01/03: KUDU-2676 Support restoring tables over the maximum allowed replicas

This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit ce60d6408f5ac0dd4f9f53ce2ab9a9ce76aad211
Author: Will Berkeley <wd...@gmail.org>
AuthorDate: Tue Feb 5 12:47:15 2019 -0800

    KUDU-2676 Support restoring tables over the maximum allowed replicas
    
    Backup can back up any table, but restore could only restore tables with
    60 or fewer tablets (with default settings) because Kudu limits the
    number of tablets created as part of creating a table. Additional
    tablets can be added by adding more range partitions.
    
    This patch fixes the issue by creating a restored table in two steps:
    1. Creating the table with a single range partition (or no range
       partition if the table does not have any range partitions).
    2. Creating all the rest of the range partitions by adding them one at a
       time.
    This happens up front before any rows are restored.
    
    There are two additional tests:
    1. A test that creates, backs up, and restores a table with 101
       range partitions.
    2. A test that creates, backs up, and restores a table with 0 range
       partitions.
    
    Change-Id: If6527d083fc85c25ab0fa7d21cbb339f63374de6
    Reviewed-on: http://gerrit.cloudera.org:8080/12372
    Reviewed-by: Grant Henke <gr...@apache.org>
    Tested-by: Kudu Jenkins
    Reviewed-by: Mike Percy <mp...@apache.org>
---
 .../scala/org/apache/kudu/backup/KuduRestore.scala | 36 ++++++++++++--
 .../org/apache/kudu/backup/TableMetadata.scala     | 13 +++--
 .../org/apache/kudu/backup/TestKuduBackup.scala    | 58 ++++++++++++++++++++--
 3 files changed, 92 insertions(+), 15 deletions(-)

diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala
index 9a0173c..b8a94b7 100644
--- a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala
+++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala
@@ -24,9 +24,10 @@ import java.nio.file.Paths
 
 import com.google.common.io.CharStreams
 import com.google.protobuf.util.JsonFormat
-import org.apache.hadoop.fs.FileSystem
 import org.apache.hadoop.fs.{Path => HPath}
+
 import org.apache.kudu.backup.Backup.TableMetadataPB
+import org.apache.kudu.client.AlterTableOptions
 import org.apache.kudu.spark.kudu.KuduContext
 import org.apache.kudu.spark.kudu.KuduWriteOptions
 import org.apache.spark.sql.SparkSession
@@ -55,10 +56,7 @@ object KuduRestore {
       val restoreName = s"${metadata.getTableName}${options.tableSuffix}"
       val table =
         if (options.createTables) {
-          // Read the metadata and generate a schema.
-          val schema = TableMetadata.getKuduSchema(metadata)
-          val createTableOptions = TableMetadata.getCreateTableOptions(metadata)
-          context.createTable(restoreName, schema, createTableOptions)
+          createTableRangePartitionByRangePartition(restoreName, metadata, context)
         } else {
           context.syncClient.openTable(restoreName)
         }
@@ -94,6 +92,34 @@ object KuduRestore {
     builder.build()
   }
 
+  // Kudu isn't good at creating a lot of tablets at once, and by default tables may only be created
+  // with at most 60 tablets. Additional tablets can be added later by adding range partitions. So,
+  // to restore tables with more tablets than that, we need to create the table piece-by-piece. This
+  // does so in the simplest way: creating the table with the first range partition, if there is
+  // one, and then altering it to add the rest of the partitions, one partition at a time.
+  private def createTableRangePartitionByRangePartition(
+      restoreName: String,
+      metadata: TableMetadataPB,
+      context: KuduContext): org.apache.kudu.client.KuduTable = {
+    // Create the table with the first range partition (or none if there are none).
+    val schema = TableMetadata.getKuduSchema(metadata)
+    val options = TableMetadata.getCreateTableOptionsWithoutRangePartitions(metadata)
+    val bounds = TableMetadata.getRangeBoundPartialRows(metadata)
+    bounds.headOption.foreach { case (lower, upper) => options.addRangePartition(lower, upper) }
+    val table = context.createTable(restoreName, schema, options)
+
+    // Add the remaining range partitions one alter at a time. Use drop(1), not tail: tail
+    // throws on an empty Seq, i.e. in the "none" case handled by headOption above.
+    bounds.drop(1).foreach { case (lower, upper) =>
+      val alterOptions = new AlterTableOptions()
+      alterOptions.addRangePartition(lower, upper)
+      context.syncClient.alterTable(restoreName, alterOptions)
+    }
+
+    // Return the created table so both branches of the caller's `val table = if ...` agree.
+    table
+  }
+
   def main(args: Array[String]): Unit = {
     val options = KuduRestoreOptions
       .parse(args)
diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/TableMetadata.scala b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/TableMetadata.scala
index acb9fb4..64845b4 100644
--- a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/TableMetadata.scala
+++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/TableMetadata.scala
@@ -276,7 +276,7 @@ object TableMetadata {
     }
   }
 
-  def getCreateTableOptions(metadata: TableMetadataPB): CreateTableOptions = {
+  def getCreateTableOptionsWithoutRangePartitions(metadata: TableMetadataPB): CreateTableOptions = {
     val schema = getKuduSchema(metadata)
     val options = new CreateTableOptions()
     options.setNumReplicas(metadata.getNumReplicas)
@@ -287,12 +287,15 @@ object TableMetadata {
     val rangePartitionColumns =
       metadata.getPartitions.getRangePartitions.getColumnNamesList
     options.setRangePartitionColumns(rangePartitionColumns)
-    metadata.getPartitions.getRangePartitions.getBoundsList.asScala.foreach { b =>
+    options
+  }
+
+  def getRangeBoundPartialRows(metadata: TableMetadataPB): Seq[(PartialRow, PartialRow)] = {
+    val schema = getKuduSchema(metadata)
+    metadata.getPartitions.getRangePartitions.getBoundsList.asScala.toList.map { b =>
       val lower = getPartialRow(b.getLowerBoundsList.asScala, schema)
       val upper = getPartialRow(b.getUpperBoundsList.asScala, schema)
-      options.addRangePartition(lower, upper)
+      (lower, upper) // collected into an immutable List rather than a mutable Buffer
     }
-    options
   }
-
 }
diff --git a/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala b/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala
index d1fe486..3180e6b 100644
--- a/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala
+++ b/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala
@@ -22,11 +22,9 @@ import java.util
 
 import com.google.common.base.Objects
 import org.apache.commons.io.FileUtils
+
 import org.apache.kudu.client.PartitionSchema.HashBucketSchema
-import org.apache.kudu.client.CreateTableOptions
-import org.apache.kudu.client.KuduTable
-import org.apache.kudu.client.PartialRow
-import org.apache.kudu.client.PartitionSchema
+import org.apache.kudu.client._
 import org.apache.kudu.ColumnSchema
 import org.apache.kudu.Schema
 import org.apache.kudu.Type
@@ -42,7 +40,6 @@ import org.junit.Before
 import org.junit.Test
 import org.slf4j.Logger
 import org.slf4j.LoggerFactory
-
 import scala.collection.JavaConverters._
 import scala.util.Random
 
@@ -135,6 +132,57 @@ class TestKuduBackup extends KuduTestSuite {
     assertResult(numRows)(rdd2.count())
   }
 
+  @Test
+  def testBackupAndRestoreTableWithManyPartitions(): Unit = {
+    val kNumPartitions = 100
+    val tableName = "many-partitions-table"
+    val createOptions = new CreateTableOptions()
+      .setRangePartitionColumns(List("key").asJava)
+      .setNumReplicas(1)
+
+    // Seed the table with a single range [-5, -4) that is kept apart from the ranges added
+    // below, leaving a bounded gap that no partition covers.
+    val seedLower = schema.newPartialRow()
+    seedLower.addInt("key", -5)
+    val seedUpper = schema.newPartialRow()
+    seedUpper.addInt("key", -4)
+    createOptions.addRangePartition(seedLower, seedUpper)
+    val table = kuduClient.createTable(tableName, schema, createOptions)
+
+    // Grow the table one unit-wide range [k, k + 1) at a time via alter. NOTE(review):
+    // `0 to kNumPartitions` is inclusive, so this adds kNumPartitions + 1 ranges.
+    (0 to kNumPartitions).foreach { k =>
+      val rangeLower = schema.newPartialRow()
+      rangeLower.addInt("key", k)
+      val rangeUpper = schema.newPartialRow()
+      rangeUpper.addInt("key", k + 1)
+      kuduClient.alterTable(
+        tableName,
+        new AlterTableOptions().addRangePartition(rangeLower, rangeUpper))
+    }
+
+    // Write kNumPartitions rows; the seed range [-5, -4) is expected to stay empty
+    // (assumes insertRows uses non-negative keys -- TODO confirm).
+    insertRows(table, kNumPartitions)
+
+    // Round-trip the table through backup and restore.
+    backupAndRestore(tableName)
+  }
+
+  @Test
+  def testBackupAndRestoreTableWithNoRangePartitions(): Unit = {
+    val tableName = "only-hash-partitions-table"
+    // NOTE(review): setRangePartitionColumns is never called, so (per the CreateTableOptions
+    // docs, presumably) the table gets one unbounded range on the key rather than no range
+    // partitioning at all -- confirm this is the "no range partitions" case intended.
+    val hashOnly = new CreateTableOptions()
+      .addHashPartitions(List("key").asJava, 2)
+      .setNumReplicas(1)
+    val created = kuduClient.createTable(tableName, schema, hashOnly)
+    insertRows(created, 100)
+    backupAndRestore(tableName)
+  }
+
   // TODO: Move to a Schema equals/equivalent method.
   def schemasMatch(before: Schema, after: Schema): Boolean = {
     if (before eq after) return true