Posted to commits@hudi.apache.org by vi...@apache.org on 2020/07/19 17:29:40 UTC

[hudi] branch master updated: [HUDI-92] Provide reasonable names for Spark DAG stages in HUDI. (#1289)

This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new b71f25f  [HUDI-92] Provide reasonable names for Spark DAG stages in HUDI. (#1289)
b71f25f is described below

commit b71f25f210c4004a2dcc97a9967399e74f870fc7
Author: Prashant Wason <pw...@uber.com>
AuthorDate: Sun Jul 19 10:29:25 2020 -0700

    [HUDI-92] Provide reasonable names for Spark DAG stages in HUDI. (#1289)
---
 README.md                                          | 12 ++++++++++++
 .../apache/hudi/client/CompactionAdminClient.java  |  3 +++
 .../org/apache/hudi/index/HoodieIndexUtils.java    |  1 +
 .../apache/hudi/index/bloom/HoodieBloomIndex.java  |  1 +
 .../java/org/apache/hudi/table/HoodieTable.java    |  2 ++
 .../table/action/clean/CleanActionExecutor.java    |  3 +++
 .../table/action/commit/UpsertPartitioner.java     |  1 +
 .../compact/HoodieMergeOnReadTableCompactor.java   |  2 ++
 .../MergeOnReadRollbackActionExecutor.java         |  1 +
 .../hudi/table/action/rollback/RollbackHelper.java |  1 +
 .../action/savepoint/SavepointActionExecutor.java  |  1 +
 .../hudi/index/bloom/TestHoodieBloomIndex.java     |  2 +-
 .../index/bloom/TestHoodieGlobalBloomIndex.java    |  2 +-
 .../apache/hudi/io/TestHoodieCommitArchiveLog.java |  2 +-
 .../org/apache/hudi/io/TestHoodieMergeHandle.java  |  2 +-
 .../table/action/compact/TestHoodieCompactor.java  |  2 +-
 .../hudi/testutils/HoodieClientTestHarness.java    | 22 +++++++++++++++++-----
 .../hudi/testutils/HoodieClientTestUtils.java      | 21 +++++++++++++++++++++
 .../apache/hudi/utilities/HDFSParquetImporter.java |  1 +
 .../hudi/utilities/HoodieSnapshotCopier.java       |  1 +
 .../hudi/utilities/HoodieSnapshotExporter.java     |  2 ++
 .../hudi/utilities/HoodieWithTimelineServer.java   |  1 +
 .../hudi/utilities/perf/TimelineServerPerf.java    |  1 +
 .../hudi/utilities/sources/AvroDFSSource.java      |  1 +
 .../hudi/utilities/sources/HiveIncrPullSource.java |  1 +
 25 files changed, 79 insertions(+), 10 deletions(-)
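
For context, the recurring change in the diff below is to call JavaSparkContext#setJobGroup (or, in one place, setJobDescription) immediately before the Spark action it describes, so the resulting jobs and stages carry a readable name in the Spark UI and History Server. A minimal, self-contained sketch of that pattern follows; the class name, data, and description string are illustrative only and are not taken from the Hudi code:
```
import java.util.Arrays;
import java.util.List;

import org.apache.spark.api.java.JavaSparkContext;

public class JobGroupExample {
  public static void main(String[] args) {
    JavaSparkContext jsc = new JavaSparkContext("local[2]", "JobGroupExample");

    // Label the jobs triggered from this point on, so the Spark UI / History
    // Server shows a meaningful group id and description instead of an
    // anonymous stage name.
    jsc.setJobGroup(JobGroupExample.class.getSimpleName(), "Square the input numbers");
    List<Integer> squares = jsc.parallelize(Arrays.asList(1, 2, 3, 4))
        .map(x -> x * x)
        .collect();

    System.out.println(squares);
    jsc.stop();
  }
}
```
setJobGroup also allows cancelling the whole group via cancelJobGroup, but in this commit it is used purely to give the DAG stages recognizable names.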

diff --git a/README.md b/README.md
index 66b9138..542ff2e 100644
--- a/README.md
+++ b/README.md
@@ -92,6 +92,18 @@ spark-2.4.4-bin-hadoop2.7/bin/spark-shell \
   --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
 ```
 
+## Running Tests
+
+All tests can be run with Maven:
+```
+mvn test
+```
+
+To run tests with Spark event logging enabled, define the Spark event log directory. This allows visualizing the test DAGs and stages in the Spark History Server UI.
+```
+mvn test -DSPARK_EVLOG_DIR=/path/for/spark/event/log
+```
+
 ## Quickstart
 
 Please visit [https://hudi.apache.org/docs/quick-start-guide.html](https://hudi.apache.org/docs/quick-start-guide.html) to quickly explore Hudi's capabilities using spark-shell. 
diff --git a/hudi-client/src/main/java/org/apache/hudi/client/CompactionAdminClient.java b/hudi-client/src/main/java/org/apache/hudi/client/CompactionAdminClient.java
index 627348c..48042e7 100644
--- a/hudi-client/src/main/java/org/apache/hudi/client/CompactionAdminClient.java
+++ b/hudi-client/src/main/java/org/apache/hudi/client/CompactionAdminClient.java
@@ -85,6 +85,7 @@ public class CompactionAdminClient extends AbstractHoodieClient {
     if (plan.getOperations() != null) {
       List<CompactionOperation> ops = plan.getOperations().stream()
           .map(CompactionOperation::convertFromAvroRecordInstance).collect(Collectors.toList());
+      jsc.setJobGroup(this.getClass().getSimpleName(), "Validate compaction operations");
       return jsc.parallelize(ops, parallelism).map(op -> {
         try {
           return validateCompactionOperation(metaClient, compactionInstant, op, Option.of(fsView));
@@ -350,6 +351,7 @@ public class CompactionAdminClient extends AbstractHoodieClient {
     } else {
       LOG.info("The following compaction renaming operations needs to be performed to un-schedule");
       if (!dryRun) {
+        jsc.setJobGroup(this.getClass().getSimpleName(), "Execute unschedule operations");
         return jsc.parallelize(renameActions, parallelism).map(lfPair -> {
           try {
             LOG.info("RENAME " + lfPair.getLeft().getPath() + " => " + lfPair.getRight().getPath());
@@ -392,6 +394,7 @@ public class CompactionAdminClient extends AbstractHoodieClient {
           "Number of Compaction Operations :" + plan.getOperations().size() + " for instant :" + compactionInstant);
       List<CompactionOperation> ops = plan.getOperations().stream()
           .map(CompactionOperation::convertFromAvroRecordInstance).collect(Collectors.toList());
+      jsc.setJobGroup(this.getClass().getSimpleName(), "Generate compaction unscheduling operations");
       return jsc.parallelize(ops, parallelism).flatMap(op -> {
         try {
           return getRenamingActionsForUnschedulingCompactionOperation(metaClient, compactionInstant, op,
diff --git a/hudi-client/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java b/hudi-client/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
index 806dcf5..ebd30b3 100644
--- a/hudi-client/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
+++ b/hudi-client/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
@@ -49,6 +49,7 @@ public class HoodieIndexUtils {
   public static List<Pair<String, HoodieBaseFile>> getLatestBaseFilesForAllPartitions(final List<String> partitions,
                                                                                       final JavaSparkContext jsc,
                                                                                       final HoodieTable hoodieTable) {
+    jsc.setJobGroup(HoodieIndexUtils.class.getSimpleName(), "Load latest base files from all partitions");
     return jsc.parallelize(partitions, Math.max(partitions.size(), 1))
         .flatMap(partitionPath -> {
           Option<HoodieInstant> latestCommitTime = hoodieTable.getMetaClient().getCommitsTimeline()
diff --git a/hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java b/hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
index ca960ef..e03f38a 100644
--- a/hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
+++ b/hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
@@ -199,6 +199,7 @@ public class HoodieBloomIndex<T extends HoodieRecordPayload> extends HoodieIndex
 
     if (config.getBloomIndexPruneByRanges()) {
       // also obtain file ranges, if range pruning is enabled
+      jsc.setJobDescription("Obtain key ranges for file slices (range pruning=on)");
       return jsc.parallelize(partitionPathFileIDList, Math.max(partitionPathFileIDList.size(), 1)).mapToPair(pf -> {
         try {
           HoodieRangeInfoHandle<T> rangeInfoHandle = new HoodieRangeInfoHandle<T>(config, hoodieTable, pf);
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
index 49d8858..b545eae 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -448,6 +448,7 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
         }
 
         // Now delete partially written files
+        jsc.setJobGroup(this.getClass().getSimpleName(), "Delete all partially written files");
         jsc.parallelize(new ArrayList<>(groupByPartition.values()), config.getFinalizeWriteParallelism())
             .map(partitionWithFileList -> {
               final FileSystem fileSystem = metaClient.getFs();
@@ -489,6 +490,7 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
    */
   private void waitForAllFiles(JavaSparkContext jsc, Map<String, List<Pair<String, String>>> groupByPartition, FileVisibility visibility) {
     // This will either ensure all files to be deleted are present.
+    jsc.setJobGroup(this.getClass().getSimpleName(), "Wait for all files to appear/disappear");
     boolean checkPassed =
         jsc.parallelize(new ArrayList<>(groupByPartition.entrySet()), config.getFinalizeWriteParallelism())
             .map(partitionWithFileList -> waitForCondition(partitionWithFileList.getKey(),
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
index 57feebc..c72a453 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
@@ -81,6 +81,7 @@ public class CleanActionExecutor extends BaseActionExecutor<HoodieCleanMetadata>
       int cleanerParallelism = Math.min(partitionsToClean.size(), config.getCleanerParallelism());
       LOG.info("Using cleanerParallelism: " + cleanerParallelism);
 
+      jsc.setJobGroup(this.getClass().getSimpleName(), "Generates list of file slices to be cleaned");
       Map<String, List<String>> cleanOps = jsc
           .parallelize(partitionsToClean, cleanerParallelism)
           .map(partitionPathToClean -> Pair.of(partitionPathToClean, planner.getDeletePaths(partitionPathToClean)))
@@ -147,6 +148,8 @@ public class CleanActionExecutor extends BaseActionExecutor<HoodieCleanMetadata>
         (int) (cleanerPlan.getFilesToBeDeletedPerPartition().values().stream().mapToInt(List::size).count()),
         config.getCleanerParallelism());
     LOG.info("Using cleanerParallelism: " + cleanerParallelism);
+    
+    jsc.setJobGroup(this.getClass().getSimpleName(), "Perform cleaning of partitions");
     List<Tuple2<String, PartitionCleanStat>> partitionCleanStats = jsc
         .parallelize(cleanerPlan.getFilesToBeDeletedPerPartition().entrySet().stream()
             .flatMap(x -> x.getValue().stream().map(y -> new Tuple2<>(x.getKey(), y)))
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
index a598710..8857fc3 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
@@ -210,6 +210,7 @@ public class UpsertPartitioner<T extends HoodieRecordPayload<T>> extends Partiti
 
     Map<String, List<SmallFile>> partitionSmallFilesMap = new HashMap<>();
     if (partitionPaths != null && partitionPaths.size() > 0) {
+      jsc.setJobGroup(this.getClass().getSimpleName(), "Getting small files from partitions");
       JavaRDD<String> partitionPathRdds = jsc.parallelize(partitionPaths, partitionPaths.size());
       partitionSmallFilesMap = partitionPathRdds.mapToPair((PairFunction<String, String, List<SmallFile>>)
           partitionPath -> new Tuple2<>(partitionPath, getSmallFiles(partitionPath))).collectAsMap();
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/compact/HoodieMergeOnReadTableCompactor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/compact/HoodieMergeOnReadTableCompactor.java
index a2ce958..80afcac 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/compact/HoodieMergeOnReadTableCompactor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/compact/HoodieMergeOnReadTableCompactor.java
@@ -94,6 +94,7 @@ public class HoodieMergeOnReadTableCompactor implements HoodieCompactor {
         .map(CompactionOperation::convertFromAvroRecordInstance).collect(toList());
     LOG.info("Compactor compacting " + operations + " files");
 
+    jsc.setJobGroup(this.getClass().getSimpleName(), "Compacting file slices");
     return jsc.parallelize(operations, operations.size())
         .map(s -> compact(table, metaClient, config, s, compactionInstantTime)).flatMap(List::iterator);
   }
@@ -192,6 +193,7 @@ public class HoodieMergeOnReadTableCompactor implements HoodieCompactor {
 
     SliceView fileSystemView = hoodieTable.getSliceView();
     LOG.info("Compaction looking for files to compact in " + partitionPaths + " partitions");
+    jsc.setJobGroup(this.getClass().getSimpleName(), "Looking for files to compact");
     List<HoodieCompactionOperation> operations = jsc.parallelize(partitionPaths, partitionPaths.size())
         .flatMap((FlatMapFunction<String, CompactionOperation>) partitionPath -> fileSystemView
             .getLatestFileSlices(partitionPath)
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/MergeOnReadRollbackActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/MergeOnReadRollbackActionExecutor.java
index d30f0e2..481fb2d 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/MergeOnReadRollbackActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/MergeOnReadRollbackActionExecutor.java
@@ -118,6 +118,7 @@ public class MergeOnReadRollbackActionExecutor extends BaseRollbackActionExecuto
     List<String> partitions = FSUtils.getAllPartitionPaths(table.getMetaClient().getFs(), table.getMetaClient().getBasePath(),
         config.shouldAssumeDatePartitioning());
     int sparkPartitions = Math.max(Math.min(partitions.size(), config.getRollbackParallelism()), 1);
+    jsc.setJobGroup(this.getClass().getSimpleName(), "Generate all rollback requests");
     return jsc.parallelize(partitions, Math.min(partitions.size(), sparkPartitions)).flatMap(partitionPath -> {
       HoodieActiveTimeline activeTimeline = table.getMetaClient().reloadActiveTimeline();
       List<RollbackRequest> partitionRollbackRequests = new ArrayList<>();
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/RollbackHelper.java b/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/RollbackHelper.java
index 4a9c20a..f5b92e7 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/RollbackHelper.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/RollbackHelper.java
@@ -85,6 +85,7 @@ public class RollbackHelper implements Serializable {
     };
 
     int sparkPartitions = Math.max(Math.min(rollbackRequests.size(), config.getRollbackParallelism()), 1);
+    jsc.setJobGroup(this.getClass().getSimpleName(), "Perform rollback actions");
     return jsc.parallelize(rollbackRequests, sparkPartitions).mapToPair(rollbackRequest -> {
       final Map<FileStatus, Boolean> filesToDeletedStatus = new HashMap<>();
       switch (rollbackRequest.getRollbackAction()) {
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java
index 5fee8a6..ac95118 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java
@@ -87,6 +87,7 @@ public class SavepointActionExecutor extends BaseActionExecutor<HoodieSavepointM
       ValidationUtils.checkArgument(HoodieTimeline.compareTimestamps(instantTime, HoodieTimeline.GREATER_THAN_OR_EQUALS, lastCommitRetained),
           "Could not savepoint commit " + instantTime + " as this is beyond the lookup window " + lastCommitRetained);
 
+      jsc.setJobGroup(this.getClass().getSimpleName(), "Collecting latest files for savepoint " + instantTime);
       Map<String, List<String>> latestFilesMap = jsc.parallelize(FSUtils.getAllPartitionPaths(table.getMetaClient().getFs(),
           table.getMetaClient().getBasePath(), config.shouldAssumeDatePartitioning()))
           .mapToPair(partitionPath -> {
diff --git a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
index 2e91a84..719f042 100644
--- a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
+++ b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
@@ -86,7 +86,7 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness {
 
   @BeforeEach
   public void setUp() throws Exception {
-    initSparkContexts("TestHoodieBloomIndex");
+    initSparkContexts();
     initPath();
     initFileSystem();
     // We have some records to be tagged (two different partitions)
diff --git a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
index 6aab654..bb227ae 100644
--- a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
+++ b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
@@ -71,7 +71,7 @@ public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness {
 
   @BeforeEach
   public void setUp() throws Exception {
-    initSparkContexts("TestHoodieGlobalBloomIndex");
+    initSparkContexts();
     initPath();
     // We have some records to be tagged (two different partitions)
     String schemaStr = FileIOUtils.readAsUTFString(getClass().getResourceAsStream("/exampleSchema.txt"));
diff --git a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCommitArchiveLog.java b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCommitArchiveLog.java
index 9cd3b3f..127cfac 100644
--- a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCommitArchiveLog.java
+++ b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCommitArchiveLog.java
@@ -60,7 +60,7 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
   public void init() throws Exception {
     initDFS();
     initPath();
-    initSparkContexts("TestHoodieCommitArchiveLog");
+    initSparkContexts();
     hadoopConf = dfs.getConf();
     hadoopConf.addResource(dfs.getConf());
     dfs.mkdirs(new Path(basePath));
diff --git a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java
index e06f9b3..18744ef 100644
--- a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java
+++ b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java
@@ -57,7 +57,7 @@ public class TestHoodieMergeHandle extends HoodieClientTestHarness {
 
   @BeforeEach
   public void setUp() throws Exception {
-    initSparkContexts("TestHoodieMergeHandle");
+    initSparkContexts();
     initPath();
     initFileSystem();
     initTestDataGenerator();
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java b/hudi-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
index 7fa64a5..6819cc5 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
@@ -66,7 +66,7 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {
   @BeforeEach
   public void setUp() throws Exception {
     // Initialize a local spark env
-    initSparkContexts("TestHoodieCompactor");
+    initSparkContexts();
 
     // Create a temp folder as the base path
     initPath();
diff --git a/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java b/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
index 69e1776..a3b534c 100644
--- a/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
+++ b/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
@@ -41,6 +41,8 @@ import org.apache.hudi.table.HoodieTable;
 
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.SQLContext;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -56,7 +58,8 @@ import java.util.concurrent.atomic.AtomicInteger;
 public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness implements Serializable {
 
   private static final Logger LOG = LoggerFactory.getLogger(HoodieClientTestHarness.class);
-
+  
+  private String testMethodName;
   protected transient JavaSparkContext jsc = null;
   protected transient Configuration hadoopConf = null;
   protected transient SQLContext sqlContext;
@@ -82,6 +85,15 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im
   protected transient MiniDFSCluster dfsCluster;
   protected transient DistributedFileSystem dfs;
 
+  @BeforeEach
+  public void setTestMethodName(TestInfo testInfo) {
+    if (testInfo.getTestMethod().isPresent()) {
+      testMethodName = testInfo.getTestMethod().get().getName();
+    } else {
+      testMethodName = "Unknown";
+    }
+  }
+
   /**
    * Initializes resource group for the subclasses of {@link HoodieClientTestBase}.
    */
@@ -113,7 +125,7 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im
    */
   protected void initSparkContexts(String appName) {
     // Initialize a local spark env
-    jsc = new JavaSparkContext(HoodieClientTestUtils.getSparkConfForTest(appName));
+    jsc = new JavaSparkContext(HoodieClientTestUtils.getSparkConfForTest(appName + "#" + testMethodName));
     jsc.setLogLevel("ERROR");
     hadoopConf = jsc.hadoopConfiguration();
 
@@ -122,11 +134,11 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im
   }
 
   /**
-   * Initializes the Spark contexts ({@link JavaSparkContext} and {@link SQLContext}) with a default name
-   * <b>TestHoodieClient</b>.
+   * Initializes the Spark contexts ({@link JavaSparkContext} and {@link SQLContext}) 
+   * with a default name matching the name of the class.
    */
   protected void initSparkContexts() {
-    initSparkContexts("TestHoodieClient");
+    initSparkContexts(this.getClass().getSimpleName());
   }
 
   /**
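
The net effect of the harness change above is that each test's Spark application is named <TestClassSimpleName>#<testMethodName>, which is what makes individual tests distinguishable in the event logs. A rough sketch of the same JUnit 5 TestInfo mechanism, using a made-up test class purely for illustration:
```
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;

class AppNameExampleTest {
  private String appName;

  @BeforeEach
  void captureTestName(TestInfo testInfo) {
    // Same idea as HoodieClientTestHarness#setTestMethodName: remember the
    // currently running test method so it can be appended to the app name.
    String method = testInfo.getTestMethod().map(m -> m.getName()).orElse("Unknown");
    appName = getClass().getSimpleName() + "#" + method;
  }

  @Test
  void derivesReadableName() {
    // Here appName would be "AppNameExampleTest#derivesReadableName".
    System.out.println(appName);
  }
}
```
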
diff --git a/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java b/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java
index 1bb8275..5c57c25 100644
--- a/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java
+++ b/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java
@@ -146,9 +146,30 @@ public class HoodieClientTestUtils {
     new RandomAccessFile(path, "rw").setLength(length);
   }
 
+  /**
+   * Returns a Spark config for this test.
+   *
+   * The following properties may be set to customize the Spark context:
+   *   SPARK_EVLOG_DIR: Local directory where event logs should be saved. This
+   *                    allows viewing the logs with spark-history-server.
+   *
+   * @note When running the tests using maven, use the following syntax to set
+   *       a property:
+   *          mvn -DSPARK_XXX=yyy ...
+   *
+   * @param appName A name for the Spark application. Shown in the Spark web UI.
+   * @return A Spark config
+   */
   public static SparkConf getSparkConfForTest(String appName) {
     SparkConf sparkConf = new SparkConf().setAppName(appName)
         .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer").setMaster("local[8]");
+
+    String evlogDir = System.getProperty("SPARK_EVLOG_DIR");
+    if (evlogDir != null) {
+      sparkConf.set("spark.eventLog.enabled", "true");
+      sparkConf.set("spark.eventLog.dir", evlogDir);
+    }
+
     return HoodieReadClient.addHoodieSupport(sparkConf);
   }
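
A possible usage sketch of the helper above, assuming it is invoked from code on the hudi-client test classpath (MyHudiTest is an illustrative app name). Running Maven with -DSPARK_EVLOG_DIR=/path/for/spark/event/log makes the returned config enable event logging:
```
import org.apache.hudi.testutils.HoodieClientTestUtils;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class SparkConfForTestExample {
  public static void main(String[] args) {
    // Picks up -DSPARK_EVLOG_DIR (if set) and turns on spark.eventLog.* accordingly.
    SparkConf conf = HoodieClientTestUtils.getSparkConfForTest("MyHudiTest");
    JavaSparkContext jsc = new JavaSparkContext(conf);
    System.out.println("eventLog.enabled=" + conf.get("spark.eventLog.enabled", "false"));
    jsc.stop();
  }
}
```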
 
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java
index 4befaec..e0cfba0 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java
@@ -166,6 +166,7 @@ public class HDFSParquetImporter implements Serializable {
     AvroReadSupport.setAvroReadSchema(jsc.hadoopConfiguration(), (new Schema.Parser().parse(schemaStr)));
     ParquetInputFormat.setReadSupportClass(job, (AvroReadSupport.class));
 
+    jsc.setJobGroup(this.getClass().getSimpleName(), "Build records for import");
     return jsc.newAPIHadoopFile(cfg.srcPath, ParquetInputFormat.class, Void.class, GenericRecord.class,
             job.getConfiguration())
         // To reduce large number of tasks.
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotCopier.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotCopier.java
index 651cbbf..916d019 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotCopier.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotCopier.java
@@ -97,6 +97,7 @@ public class HoodieSnapshotCopier implements Serializable {
         fs.delete(new Path(outputDir), true);
       }
 
+      jsc.setJobGroup(this.getClass().getSimpleName(), "Creating a snapshot");
       jsc.parallelize(partitions, partitions.size()).flatMap(partition -> {
         // Only take latest version files <= latestCommit.
         FileSystem fs1 = FSUtils.getFs(baseDir, serConf.newCopy());
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java
index 7e21b4e..0743839 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java
@@ -175,6 +175,7 @@ public class HoodieSnapshotExporter {
         ? defaultPartitioner
         : ReflectionUtils.loadClass(cfg.outputPartitioner);
 
+    jsc.setJobGroup(this.getClass().getSimpleName(), "Exporting as non-HUDI dataset");
     final BaseFileOnlyView fsView = getBaseFileOnlyView(jsc, cfg);
     Iterator<String> exportingFilePaths = jsc
         .parallelize(partitions, partitions.size())
@@ -193,6 +194,7 @@ public class HoodieSnapshotExporter {
   private void exportAsHudi(JavaSparkContext jsc, Config cfg, List<String> partitions, String latestCommitTimestamp) throws IOException {
     final BaseFileOnlyView fsView = getBaseFileOnlyView(jsc, cfg);
     final SerializableConfiguration serConf = new SerializableConfiguration(jsc.hadoopConfiguration());
+    jsc.setJobGroup(this.getClass().getSimpleName(), "Exporting as HUDI dataset");
     jsc.parallelize(partitions, partitions.size()).flatMap(partition -> {
       // Only take latest version files <= latestCommit.
       List<Tuple2<String, String>> filePaths = new ArrayList<>();
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieWithTimelineServer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieWithTimelineServer.java
index 06fdda3..06c1084 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieWithTimelineServer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieWithTimelineServer.java
@@ -86,6 +86,7 @@ public class HoodieWithTimelineServer implements Serializable {
     System.out.println("Driver Hostname is :" + driverHost);
     List<String> messages = new ArrayList<>();
     IntStream.range(0, cfg.numPartitions).forEach(i -> messages.add("Hello World"));
+    jsc.setJobGroup(this.getClass().getSimpleName(), "Sending requests to driver host");
     List<String> gotMessages = jsc.parallelize(messages).map(msg -> sendRequest(driverHost, cfg.serverPort)).collect();
     System.out.println("Got Messages :" + gotMessages);
     ValidationUtils.checkArgument(gotMessages.equals(messages), "Got expected reply from Server");
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/perf/TimelineServerPerf.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/perf/TimelineServerPerf.java
index a6692cc..6aaa6bd 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/perf/TimelineServerPerf.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/perf/TimelineServerPerf.java
@@ -132,6 +132,7 @@ public class TimelineServerPerf implements Serializable {
 
   public List<PerfStats> runLookups(JavaSparkContext jsc, List<String> partitionPaths, SyncableFileSystemView fsView,
       int numIterations, int concurrency) {
+    jsc.setJobGroup(this.getClass().getSimpleName(), "Lookup all performance stats");
     return jsc.parallelize(partitionPaths, cfg.numExecutors).flatMap(p -> {
       ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(100);
       final List<PerfStats> result = new ArrayList<>();
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroDFSSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroDFSSource.java
index c8aa3e8..a2f7df9 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroDFSSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroDFSSource.java
@@ -56,6 +56,7 @@ public class AvroDFSSource extends AvroSource {
   }
 
   private JavaRDD<GenericRecord> fromFiles(String pathStr) {
+    sparkContext.setJobGroup(this.getClass().getSimpleName(), "Fetch Avro data from files");
     JavaPairRDD<AvroKey, NullWritable> avroRDD = sparkContext.newAPIHadoopFile(pathStr, AvroKeyInputFormat.class,
         AvroKey.class, NullWritable.class, sparkContext.hadoopConfiguration());
     return avroRDD.keys().map(r -> ((GenericRecord) r.datum()));
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HiveIncrPullSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HiveIncrPullSource.java
index a8bea98..5853190 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HiveIncrPullSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HiveIncrPullSource.java
@@ -128,6 +128,7 @@ public class HiveIncrPullSource extends AvroSource {
       String pathStr = commitDeltaFiles.stream().map(f -> f.getPath().toString()).collect(Collectors.joining(","));
       JavaPairRDD<AvroKey, NullWritable> avroRDD = sparkContext.newAPIHadoopFile(pathStr, AvroKeyInputFormat.class,
           AvroKey.class, NullWritable.class, sparkContext.hadoopConfiguration());
+      sparkContext.setJobGroup(this.getClass().getSimpleName(), "Fetch new data");
       return new InputBatch<>(Option.of(avroRDD.keys().map(r -> ((GenericRecord) r.datum()))),
           String.valueOf(commitToPull.get()));
     } catch (IOException ioe) {