You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by gr...@apache.org on 2020/04/06 19:11:39 UTC

[kudu] branch master updated: KUDU-3099: Remove System.exit() calls from KuduBackup/KuduRestore

This is an automated email from the ASF dual-hosted git repository.

granthenke pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 5432d31  KUDU-3099: Remove System.exit() calls from KuduBackup/KuduRestore
5432d31 is described below

commit 5432d316a1c0e012b247b0c83ce86a481a4597dc
Author: waleed <wa...@gmail.com>
AuthorDate: Thu Apr 2 17:04:49 2020 -0500

    KUDU-3099: Remove System.exit() calls from KuduBackup/KuduRestore
    
    The System.exit() calls have a side effect that can cause Spark to fail even
    if the run function returns 0 on success. Rather than call System.exit()
    the run() method will return true on a successful run. We then throw
    a RuntimeException() in main if we find that run() failed, otherwise
    we call SparkSession's stop() method to cleanly shutdown Spark.
    Unfortunately the issue isn't easy to reproduce but we had one
    environment exhibiting the problem and we confirmed that this patch
    fixes the issue. TestKuduBackup.scala was modified where assertFalse()
    is used to check for failure and assertTrue() for success.
    
    Change-Id: I7d1b4796b6280adecd7dab685a0281af6b2570ce
    Reviewed-on: http://gerrit.cloudera.org:8080/15638
    Tested-by: Grant Henke <gr...@apache.org>
    Reviewed-by: Grant Henke <gr...@apache.org>
---
 .../scala/org/apache/kudu/backup/KuduBackup.scala  | 14 ++++----
 .../scala/org/apache/kudu/backup/KuduRestore.scala | 14 ++++----
 .../org/apache/kudu/backup/TestKuduBackup.scala    | 42 +++++++++++-----------
 3 files changed, 37 insertions(+), 33 deletions(-)

diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackup.scala b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackup.scala
index 2c7347a..58e7edb 100644
--- a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackup.scala
+++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackup.scala
@@ -97,7 +97,7 @@ object KuduBackup {
     io.writeTableMetadata(tableMetadata, metadataPath)
   }
 
-  def run(options: BackupOptions, session: SparkSession): Int = {
+  def run(options: BackupOptions, session: SparkSession): Boolean = {
     // Set the job group for all the spark backup jobs.
     // Note: The job description will be overridden by each Kudu table job.
     session.sparkContext.setJobGroup(s"Kudu Backup @ ${options.toMs}", "Kudu Backup")
@@ -156,10 +156,7 @@ object KuduBackup {
         log.error(
           s"Failed to back up table $tableName: Look back in the logs for the full exception. Error: ${ex.toString}")
     }
-    if (backupResults.exists(_._2.isFailure))
-      1
-    else
-      0
+    !backupResults.exists(_._2.isFailure)
   }
 
   def main(args: Array[String]): Unit = {
@@ -172,6 +169,11 @@ object KuduBackup {
       .appName("Kudu Table Backup")
       .getOrCreate()
 
-    System.exit(run(options, session))
+    val isRunSuccessful: Boolean = run(options, session)
+    if (!isRunSuccessful) {
+      throw new RuntimeException("Kudu Table Backup application failed")
+    }
+
+    session.stop()
   }
 }
diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala
index 684ad3e..b2e8b6e 100644
--- a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala
+++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala
@@ -153,7 +153,7 @@ object KuduRestore {
     }
   }
 
-  def run(options: RestoreOptions, session: SparkSession): Int = {
+  def run(options: RestoreOptions, session: SparkSession): Boolean = {
     // Set the job group for all the spark restore jobs.
     // Note: The job description will be overridden by each Kudu table job.
     session.sparkContext.setJobGroup(s"Kudu Restore @ ${options.timestampMs}", "Kudu Restore")
@@ -203,10 +203,7 @@ object KuduRestore {
         log.error(
           s"Failed to restore table $tableName: Look back in the logs for the full exception. Error: ${ex.toString}")
     }
-    if (restoreResults.exists(_._2.isFailure))
-      1
-    else
-      0
+    !restoreResults.exists(_._2.isFailure)
   }
 
   // Kudu isn't good at creating a lot of tablets at once, and by default tables may only be created
@@ -315,6 +312,11 @@ object KuduRestore {
       .appName("Kudu Table Restore")
       .getOrCreate()
 
-    System.exit(run(options, session))
+    val isRunSuccessful: Boolean = run(options, session)
+    if (!isRunSuccessful) {
+      throw new RuntimeException("Kudu Table Restore application failed!")
+    }
+
+    session.stop()
   }
 }
diff --git a/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala b/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala
index d802568..b513b20 100644
--- a/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala
+++ b/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala
@@ -149,7 +149,7 @@ class TestKuduBackup extends KuduTestSuite {
     // default setting to not fail on individual table errors. The failure is indicated by the
     // return value.
     val options = createBackupOptions(Seq("missingTable"))
-    assertEquals(1, runBackup(options))
+    assertFalse(runBackup(options))
   }
 
   @Test
@@ -172,7 +172,7 @@ class TestKuduBackup extends KuduTestSuite {
     // table that does exist, it should not throw an exception, and it should return 1 to indicate
     // some error. The logs should contain a message about the missing table.
     val options = createBackupOptions(Seq("missingTable", tableName))
-    val logs = captureLogs(() => assertEquals(1, KuduBackup.run(options, ss)))
+    val logs = captureLogs(() => assertFalse(KuduBackup.run(options, ss)))
     assertTrue(logs.contains("the table does not exist"))
 
     // Restore the backup of the non-failed table and validate the end result.
@@ -185,7 +185,7 @@ class TestKuduBackup extends KuduTestSuite {
     // fail-on-first-error option is set.
     val failFastOptions = createRestoreOptions(Seq(tableName)).copy(failOnFirstError = true)
     try {
-      assertEquals(1, runRestore(failFastOptions))
+      assertFalse(runRestore(failFastOptions))
       fail()
     } catch {
       case e: RuntimeException =>
@@ -194,7 +194,7 @@ class TestKuduBackup extends KuduTestSuite {
 
     // Check that a restore of a table with no backups does not fail fast or throw an exception if
     // default no-fail-fast option is set.
-    assertEquals(1, runRestore(createRestoreOptions(Seq("missingTable"))))
+    assertFalse(runRestore(createRestoreOptions(Seq("missingTable"))))
   }
 
   @Test
@@ -205,7 +205,7 @@ class TestKuduBackup extends KuduTestSuite {
     // There's no guarantee about the order restores run in, so it doesn't work to test fail-fast
     // and then the default no-fail-fast because the actual table may have been restored.
     val logs = captureLogs(
-      () => assertEquals(1, runRestore(createRestoreOptions(Seq("missingTable", tableName)))))
+      () => assertFalse(runRestore(createRestoreOptions(Seq("missingTable", tableName)))))
     assertTrue(logs.contains("Failed to restore table"))
   }
 
@@ -222,7 +222,7 @@ class TestKuduBackup extends KuduTestSuite {
     // It will use a diff scan and won't check the existing dependency graph.
     val options = createBackupOptions(Seq(tableName), fromMs = beforeMs)
     val logs = captureLogs { () =>
-      assertEquals(0, runBackup(options))
+      assertTrue(runBackup(options))
     }
     assertTrue(logs.contains("Performing an incremental backup: fromMs was set to"))
     validateBackup(options, 100, true)
@@ -238,7 +238,7 @@ class TestKuduBackup extends KuduTestSuite {
     // Force a full backup. It should contain all the rows.
     val options = createBackupOptions(Seq(tableName), forceFull = true)
     val logs = captureLogs { () =>
-      assertEquals(0, runBackup(options))
+      assertTrue(runBackup(options))
     }
     assertTrue(logs.contains("Performing a full backup: forceFull was set to true"))
     validateBackup(options, 200, false)
@@ -270,17 +270,17 @@ class TestKuduBackup extends KuduTestSuite {
     // generated rows. There may be duplicates in the generated rows. Instead, the restored table is
     // checked against the original table, which is less exhaustive but still a good test.
     val options = createBackupOptions(Seq(tableName))
-    assertEquals(0, runBackup(options))
+    assertTrue(runBackup(options))
 
     // Run 1 to 5 incremental backups.
     val incrementalCount = random.nextInt(5) + 1
     (0 to incrementalCount).foreach { i =>
       loadRandomData(table, maxRows)
       val incOptions = createBackupOptions(Seq(tableName))
-      assertEquals(0, runBackup(incOptions))
+      assertTrue(runBackup(incOptions))
     }
 
-    assertEquals(0, runRestore(createRestoreOptions(Seq(tableName))))
+    assertTrue(runRestore(createRestoreOptions(Seq(tableName))))
     validateTablesMatch(tableName, s"$tableName-restore")
   }
 
@@ -296,8 +296,8 @@ class TestKuduBackup extends KuduTestSuite {
     insertRows(table1, numRows)
     insertRows(table2, numRows)
 
-    assertEquals(0, runBackup(createBackupOptions(Seq(table1Name, table2Name))))
-    assertEquals(0, runRestore(createRestoreOptions(Seq(table1Name, table2Name))))
+    assertTrue(runBackup(createBackupOptions(Seq(table1Name, table2Name))))
+    assertTrue(runRestore(createRestoreOptions(Seq(table1Name, table2Name))))
 
     val rdd1 = kuduContext.kuduRDD(ss.sparkContext, s"$table1Name-restore", List("key"))
     assertResult(numRows)(rdd1.count())
@@ -316,8 +316,8 @@ class TestKuduBackup extends KuduTestSuite {
       tableName
     }
 
-    assertEquals(0, runBackup(createBackupOptions(tableNames).copy(numParallelBackups = 3)))
-    assertEquals(0, runRestore(createRestoreOptions(tableNames).copy(numParallelRestores = 4)))
+    assertTrue(runBackup(createBackupOptions(tableNames).copy(numParallelBackups = 3)))
+    assertTrue(runRestore(createRestoreOptions(tableNames).copy(numParallelRestores = 4)))
 
     tableNames.map { tableName =>
       val rdd = kuduContext.kuduRDD(ss.sparkContext, s"$tableName-restore", List("key"))
@@ -349,7 +349,7 @@ class TestKuduBackup extends KuduTestSuite {
     // Run a backup job with custom splitSizeBytes and count the tasks.
     val backupOptions = createBackupOptions(Seq(tableName)).copy(splitSizeBytes = Some(1024))
     val actualNumTasks = withJobTaskCounter(ss.sparkContext) { () =>
-      assertEquals(0, runBackup(backupOptions))
+      assertTrue(runBackup(backupOptions))
     }
     validateBackup(backupOptions, rowCount, false)
 
@@ -469,7 +469,7 @@ class TestKuduBackup extends KuduTestSuite {
     backupAndValidateTable(tableName, 10, true)
 
     // Restore the table and validate.
-    assertEquals(0, runRestore(createRestoreOptions(Seq(tableName))))
+    assertTrue(runRestore(createRestoreOptions(Seq(tableName))))
 
     val restoreTable = kuduClient.openTable(s"$tableName-restore")
     val scanner = kuduClient.newScannerBuilder(restoreTable).build()
@@ -532,7 +532,7 @@ class TestKuduBackup extends KuduTestSuite {
     backupAndValidateTable(tableName, 15, true)
 
     // Restore the table and validate.
-    assertEquals(0, runRestore(createRestoreOptions(Seq(tableName))))
+    assertTrue(runRestore(createRestoreOptions(Seq(tableName))))
 
     val restoreTable = kuduClient.openTable(s"$tableName-restore")
     val scanner = kuduClient.newScannerBuilder(restoreTable).build()
@@ -691,11 +691,11 @@ class TestKuduBackup extends KuduTestSuite {
       expectIncremental: Boolean = false) = {
     val options = createBackupOptions(Seq(tableName))
     // Run the backup.
-    assertEquals(0, runBackup(options))
+    assertTrue(runBackup(options))
     validateBackup(options, expectedRowCount, expectIncremental)
   }
 
-  def runBackup(options: BackupOptions): Int = {
+  def runBackup(options: BackupOptions): Boolean = {
     // Log the timestamps to simplify flaky debugging.
     log.info(s"nowMs: ${System.currentTimeMillis()}")
     val hts = HybridTimeUtil.HTTimestampToPhysicalAndLogical(kuduClient.getLastPropagatedTimestamp)
@@ -733,14 +733,14 @@ class TestKuduBackup extends KuduTestSuite {
 
   def restoreAndValidateTable(tableName: String, expectedRowCount: Long) = {
     val options = createRestoreOptions(Seq(tableName))
-    assertEquals(0, runRestore(options))
+    assertTrue(runRestore(options))
 
     // Verify the table data.
     val rdd = kuduContext.kuduRDD(ss.sparkContext, s"$tableName-restore")
     assertEquals(rdd.collect.length, expectedRowCount)
   }
 
-  def runRestore(options: RestoreOptions): Int = {
+  def runRestore(options: RestoreOptions): Boolean = {
     KuduRestore.run(options, ss)
   }