You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by gr...@apache.org on 2019/06/05 23:50:36 UTC

[kudu] 02/02: KUDU-2813: Set Spark job descriptions for backup and restore jobs

This is an automated email from the ASF dual-hosted git repository.

granthenke pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 90d1378c92c6bd310a6dfe72ccf410450262962c
Author: Grant Henke <gr...@apache.org>
AuthorDate: Tue Jun 4 16:09:02 2019 -0500

    KUDU-2813: Set Spark job descriptions for backup and restore jobs
    
    This patch adds Spark job descriptions to the backup
    and restore jobs. This will make debugging and
    visualizing job progress in the Spark UI more
    straightforward.
    
    Change-Id: I9c051d39d0d601f239fd97dd93aed7e0ce5405ee
    Reviewed-on: http://gerrit.cloudera.org:8080/13512
    Tested-by: Kudu Jenkins
    Reviewed-by: Mike Percy <mp...@apache.org>
---
 .../scala/org/apache/kudu/backup/KuduBackup.scala  |  8 ++++
 .../scala/org/apache/kudu/backup/KuduRestore.scala | 14 ++++++-
 .../org/apache/kudu/backup/TestKuduBackup.scala    | 29 ++++++++++++++
 .../spark/tools/DistributedDataGeneratorTest.scala |  4 +-
 .../apache/kudu/spark/kudu/DefaultSourceTest.scala |  4 +-
 .../apache/kudu/spark/kudu/SparkListenerUtil.scala | 45 ++++++++++++++++++----
 6 files changed, 91 insertions(+), 13 deletions(-)

diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackup.scala b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackup.scala
index 00f60b7..91092a0 100644
--- a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackup.scala
+++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackup.scala
@@ -73,6 +73,10 @@ object KuduBackup {
         tableOptions = tableOptions.copy(forceFull = true)
       }
     }
+
+    val jobTypeStr = if (incremental) "incremental" else "full"
+    session.sparkContext.setJobDescription(s"Kudu Backup($jobTypeStr): $tableName")
+
     val rdd = new KuduBackupRDD(table, tableOptions, incremental, context, session.sparkContext)
     val df =
       session.sqlContext
@@ -93,6 +97,10 @@ object KuduBackup {
   }
 
   def run(options: BackupOptions, session: SparkSession): Int = {
+    // Set the job group for all the spark backup jobs.
+    // Note: The job description will be overridden by each Kudu table job.
+    session.sparkContext.setJobGroup(s"Kudu Backup @ ${options.toMs}", "Kudu Backup")
+
     log.info(s"Backing up to root path: ${options.rootPath}")
     val context =
       new KuduContext(
diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala
index 791486b..2d3f1a0 100644
--- a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala
+++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala
@@ -55,9 +55,14 @@ object KuduRestore {
       throw new RuntimeException(s"No valid backups found for table: $tableName")
     }
     val graph = backupMap(tableName)
-    val lastMetadata = graph.restorePath.backups.last.metadata
+    val restorePath = graph.restorePath
+    val lastMetadata = restorePath.backups.last.metadata
     val restoreName = s"${lastMetadata.getTableName}${options.tableSuffix}"
-    graph.restorePath.backups.foreach { backup =>
+    val numJobs = restorePath.backups.size
+    var currentJob = 1
+    restorePath.backups.foreach { backup =>
+      session.sparkContext.setJobDescription(s"Kudu Restore($currentJob/$numJobs): $tableName")
+
       log.info(s"Restoring table $tableName from path: ${backup.path}")
       val metadata = backup.metadata
       val isFullRestore = metadata.getFromMs == 0
@@ -143,10 +148,15 @@ object KuduRestore {
           }
         }
       }
+      currentJob += 1
     }
   }
 
   def run(options: RestoreOptions, session: SparkSession): Int = {
+    // Set the job group for all the spark restore jobs.
+    // Note: The job description will be overridden by each Kudu table job.
+    session.sparkContext.setJobGroup(s"Kudu Restore @ ${options.timestampMs}", "Kudu Restore")
+
     log.info(s"Restoring from path: ${options.rootPath}")
     val context =
       new KuduContext(
diff --git a/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala b/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala
index eb210d9..566e630 100644
--- a/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala
+++ b/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala
@@ -28,6 +28,7 @@ import org.apache.kudu.ColumnSchema
 import org.apache.kudu.Schema
 import org.apache.kudu.Type
 import org.apache.kudu.ColumnSchema.ColumnSchemaBuilder
+import org.apache.kudu.spark.kudu.SparkListenerUtil.withJobDescriptionCollector
 import org.apache.kudu.spark.kudu._
 import org.apache.kudu.test.CapturingLogAppender
 import org.apache.kudu.test.RandomUtils
@@ -95,6 +96,34 @@ class TestKuduBackup extends KuduTestSuite {
   }
 
   @Test
+  def testBackupAndRestoreJobNames() {
+    val rowCount = 100
+    insertRows(table, rowCount) // Insert data into the default test table.
+
+    // Backup the table and verify the job description.
+    val fullDesc = withJobDescriptionCollector(ss.sparkContext) { () =>
+      runBackup(createBackupOptions(Seq(table.getName)))
+    }
+    assertEquals(1, fullDesc.size)
+    assertEquals("Kudu Backup(full): test", fullDesc.head)
+
+    // Backup again and verify the job description.
+    val incDesc = withJobDescriptionCollector(ss.sparkContext) { () =>
+      runBackup(createBackupOptions(Seq(table.getName)))
+    }
+    assertEquals(1, incDesc.size)
+    assertEquals("Kudu Backup(incremental): test", incDesc.head)
+
+    // Restore the table and verify the job descriptions.
+    val restoreDesc = withJobDescriptionCollector(ss.sparkContext) { () =>
+      runRestore(createRestoreOptions(Seq(table.getName)))
+    }
+    assertEquals(2, restoreDesc.size)
+    assertTrue(restoreDesc.contains("Kudu Restore(1/2): test"))
+    assertTrue(restoreDesc.contains("Kudu Restore(2/2): test"))
+  }
+
+  @Test
   def testBackupAndRestoreWithNoRows(): Unit = {
     backupAndValidateTable(tableName, 0, false)
     backupAndValidateTable(tableName, 0, true)
diff --git a/java/kudu-spark-tools/src/test/scala/org/apache/kudu/spark/tools/DistributedDataGeneratorTest.scala b/java/kudu-spark-tools/src/test/scala/org/apache/kudu/spark/tools/DistributedDataGeneratorTest.scala
index ca1eff9..f794e4a 100644
--- a/java/kudu-spark-tools/src/test/scala/org/apache/kudu/spark/tools/DistributedDataGeneratorTest.scala
+++ b/java/kudu-spark-tools/src/test/scala/org/apache/kudu/spark/tools/DistributedDataGeneratorTest.scala
@@ -103,7 +103,7 @@ class DistributedDataGeneratorTest extends KuduTestSuite {
       harness.getMasterAddressesAsString)
 
     // count the number of tasks that end.
-    val actualNumTasks = withJobTaskCounter(ss.sparkContext) { _ =>
+    val actualNumTasks = withJobTaskCounter(ss.sparkContext) { () =>
       runGeneratorTest(args)
     }
     assertEquals(numTasks, actualNumTasks)
@@ -121,7 +121,7 @@ class DistributedDataGeneratorTest extends KuduTestSuite {
       harness.getMasterAddressesAsString)
 
     // count the number of tasks that end.
-    val actualNumTasks = withJobTaskCounter(ss.sparkContext) { _ =>
+    val actualNumTasks = withJobTaskCounter(ss.sparkContext) { () =>
       runGeneratorTest(args)
     }
 
diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
index 4d1944b..787e13b 100644
--- a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
+++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
@@ -451,7 +451,7 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {
     kuduContext.captureRows = true
 
     // Count the number of tasks that end.
-    val actualNumTasks = withJobTaskCounter(ss.sparkContext) { _ =>
+    val actualNumTasks = withJobTaskCounter(ss.sparkContext) { () =>
       kuduContext.insertRows(
         dataDF,
         tableName,
@@ -1039,7 +1039,7 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {
       "kudu.splitSizeBytes" -> "1024")
 
     // count the number of tasks that end.
-    val actualNumTasks = withJobTaskCounter(ss.sparkContext) { _ =>
+    val actualNumTasks = withJobTaskCounter(ss.sparkContext) { () =>
       val t = "scanWithKeyRangeTest"
       sqlContext.read.options(kuduOptions).format("kudu").load.createOrReplaceTempView(t)
       val results = sqlContext.sql(s"SELECT * FROM $t").collectAsList()
diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/SparkListenerUtil.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/SparkListenerUtil.scala
index b9bb885..b6b4209 100644
--- a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/SparkListenerUtil.scala
+++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/SparkListenerUtil.scala
@@ -21,23 +21,45 @@ import org.apache.kudu.test.junit.AssertHelpers.BooleanExpression
 import org.apache.spark.SparkContext
 import org.apache.spark.scheduler.SparkListener
 import org.apache.spark.scheduler.SparkListenerJobEnd
+import org.apache.spark.scheduler.SparkListenerJobStart
 import org.apache.spark.scheduler.SparkListenerTaskEnd
 
+import scala.collection.mutable.ListBuffer
+
 object SparkListenerUtil {
 
-  // TODO: Use org.apache.spark.TestUtils.withListener if it becomes public test API
-  def withJobTaskCounter(sc: SparkContext)(body: Any => Unit): Int = {
+  def withJobTaskCounter(sc: SparkContext)(body: () => Unit): Int = {
     // Add a SparkListener to count the number of tasks that end.
     var numTasks = 0
-    var jobDone = false
     val listener: SparkListener = new SparkListener {
       override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
         numTasks += 1
       }
-      override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
-        jobDone = true
+    }
+    withListener(sc, listener)(body)
+    numTasks
+  }
+
+  def withJobDescriptionCollector(sc: SparkContext)(body: () => Unit): List[String] = {
+    // Add a SparkListener to collect the job descriptions.
+    val jobDescriptions = new ListBuffer[String]()
+    val listener: SparkListener = new SparkListener {
+      override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
+        // TODO: Use SparkContext.SPARK_JOB_DESCRIPTION when public.
+        val description = jobStart.properties.getProperty("spark.job.description")
+        if (description != null) {
+          jobDescriptions += description
+        }
       }
     }
+    withListener(sc, listener)(body)
+    jobDescriptions.toList
+  }
+
+  // TODO: Use org.apache.spark.TestUtils.withListener if it becomes public test API
+  def withListener[L <: SparkListener](sc: SparkContext, listener: L)(body: () => Unit): Unit = {
+    val jobDoneListener = new JobDoneListener
+    sc.addSparkListener(jobDoneListener)
     sc.addSparkListener(listener)
     try {
       body()
@@ -46,10 +68,19 @@ object SparkListenerUtil {
       // private API, we use the jobEnd event to know that all of the taskEnd events
       // must have been processed.
       AssertHelpers.assertEventuallyTrue("Spark job did not complete", new BooleanExpression {
-        override def get(): Boolean = jobDone
+        override def get(): Boolean = jobDoneListener.isDone
       }, 5000)
       sc.removeSparkListener(listener)
+      sc.removeSparkListener(jobDoneListener)
     }
-    numTasks
+  }
+
+  private class JobDoneListener extends SparkListener {
+    var jobDone = false
+
+    override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
+      jobDone = true
+    }
+    def isDone = jobDone
   }
 }