You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ab...@apache.org on 2019/06/01 08:38:14 UTC

[kudu] 04/04: [spark tools] KUDU-2831: Deflake DistributedDataGeneratorTest.testGenerateRandomData

This is an automated email from the ASF dual-hosted git repository.

abukor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 6ecafb43992f4af9c6e740a6c7198feb9440110e
Author: Will Berkeley <wd...@gmail.com>
AuthorDate: Thu May 30 10:55:38 2019 -0700

    [spark tools] KUDU-2831: Deflake DistributedDataGeneratorTest.testGenerateRandomData
    
    Oops. It seems it's not possible to retrieve an accumulator by name. We
    were creating a new accumulator for collisions after the old one was
    cleaned up because the job finished and all references to it ended. This
    meant we never actually counted collisions, so when they happened our
    expectation of the number of rows written was off. This fixes the
    problem by plumbing a reference to the rows written and collisions
    accumulators back to the test code.
    
    Before this patch, I saw 12/1000 failures. After, 0/1000.
    
    Change-Id: Ie2218328e400d76dbeab50cdb0d0241dfe56d20c
    Reviewed-on: http://gerrit.cloudera.org:8080/13477
    Reviewed-by: Grant Henke <gr...@apache.org>
    Tested-by: Kudu Jenkins
    Reviewed-by: Adar Dembo <ad...@cloudera.com>
---
 .../spark/tools/DistributedDataGenerator.scala     | 11 +++++-----
 .../spark/tools/DistributedDataGeneratorTest.scala | 25 ++++++++++++++--------
 2 files changed, 22 insertions(+), 14 deletions(-)

diff --git a/java/kudu-spark-tools/src/main/scala/org/apache/kudu/spark/tools/DistributedDataGenerator.scala b/java/kudu-spark-tools/src/main/scala/org/apache/kudu/spark/tools/DistributedDataGenerator.scala
index 01af3ce..357eb8b 100644
--- a/java/kudu-spark-tools/src/main/scala/org/apache/kudu/spark/tools/DistributedDataGenerator.scala
+++ b/java/kudu-spark-tools/src/main/scala/org/apache/kudu/spark/tools/DistributedDataGenerator.scala
@@ -55,7 +55,7 @@ object GeneratorMetrics {
 object DistributedDataGenerator {
   val log: Logger = LoggerFactory.getLogger(getClass)
 
-  def run(options: DistributedDataGeneratorOptions, ss: SparkSession): Unit = {
+  def run(options: DistributedDataGeneratorOptions, ss: SparkSession): GeneratorMetrics = {
     log.info(s"Running a DistributedDataGenerator with options: $options")
     val sc = ss.sparkContext
     val context = new KuduContext(options.masterAddresses, sc)
@@ -131,8 +131,7 @@ object DistributedDataGenerator {
       session.close()
     }
 
-    log.info(s"Rows written: ${metrics.rowsWritten.value}")
-    log.info(s"Collisions: ${metrics.collisions.value}")
+    metrics
   }
 
   /**
@@ -140,7 +139,7 @@ object DistributedDataGenerator {
    * so tests must create and manage their own.
    */
   @InterfaceAudience.LimitedPrivate(Array("Test"))
-  def testMain(args: Array[String], ss: SparkSession): Unit = {
+  def testMain(args: Array[String], ss: SparkSession): GeneratorMetrics = {
     DistributedDataGeneratorOptions.parse(args) match {
       case None => throw new IllegalArgumentException("Could not parse arguments")
       case Some(config) => run(config, ss)
@@ -150,7 +149,9 @@ object DistributedDataGenerator {
   def main(args: Array[String]): Unit = {
     val conf = new SparkConf().setAppName("DistributedDataGenerator")
     val ss = SparkSession.builder().config(conf).getOrCreate()
-    testMain(args, ss)
+    val metrics = testMain(args, ss)
+    log.info(s"Rows written: ${metrics.rowsWritten.value}")
+    log.info(s"Collisions: ${metrics.collisions.value}")
   }
 }
 
diff --git a/java/kudu-spark-tools/src/test/scala/org/apache/kudu/spark/tools/DistributedDataGeneratorTest.scala b/java/kudu-spark-tools/src/test/scala/org/apache/kudu/spark/tools/DistributedDataGeneratorTest.scala
index 902abab..ca1eff9 100644
--- a/java/kudu-spark-tools/src/test/scala/org/apache/kudu/spark/tools/DistributedDataGeneratorTest.scala
+++ b/java/kudu-spark-tools/src/test/scala/org/apache/kudu/spark/tools/DistributedDataGeneratorTest.scala
@@ -52,10 +52,11 @@ class DistributedDataGeneratorTest extends KuduTestSuite {
       "--type=random",
       randomTableName,
       harness.getMasterAddressesAsString)
-    val rdd = runGeneratorTest(args)
-    val collisions = ss.sparkContext.longAccumulator("row_collisions").value
-    // Collisions could cause the number of row to be less than the number set.
-    assertEquals(numRows - collisions, rdd.collect.length)
+    val (metrics, rdd) = runGeneratorTest(args)
+    val (rowsWritten, collisions) = (metrics.rowsWritten.value, metrics.collisions.value)
+    // Collisions may cause the number of rows written to be less than the number generated.
+    assertEquals(rowsWritten, rdd.collect.length.toLong)
+    assertEquals(numRows, rowsWritten + collisions)
   }
 
   @Test
@@ -67,8 +68,11 @@ class DistributedDataGeneratorTest extends KuduTestSuite {
       "--type=sequential",
       randomTableName,
       harness.getMasterAddressesAsString)
-    val rdd = runGeneratorTest(args)
+    val (metrics, rdd) = runGeneratorTest(args)
+    val (rowsWritten, collisions) = (metrics.rowsWritten.value, metrics.collisions.value)
+    assertEquals(numRows.toLong, rowsWritten)
     assertEquals(numRows, rdd.collect.length)
+    assertEquals(0L, collisions)
   }
 
   @Test
@@ -81,8 +85,11 @@ class DistributedDataGeneratorTest extends KuduTestSuite {
       "--repartition=true",
       randomTableName,
       harness.getMasterAddressesAsString)
-    val rdd = runGeneratorTest(args)
+    val (metrics, rdd) = runGeneratorTest(args)
+    val (rowsWritten, collisions) = (metrics.rowsWritten.value, metrics.collisions.value)
+    assertEquals(numRows.toLong, rowsWritten)
     assertEquals(numRows, rdd.collect.length)
+    assertEquals(0L, collisions)
   }
 
   @Test
@@ -127,11 +134,11 @@ class DistributedDataGeneratorTest extends KuduTestSuite {
     assertEquals(numTasks + numPartitions, actualNumTasks)
   }
 
-  def runGeneratorTest(args: Array[String]): RDD[Row] = {
+  def runGeneratorTest(args: Array[String]): (GeneratorMetrics, RDD[Row]) = {
     val schema = generator.randomSchema()
     val options = generator.randomCreateTableOptions(schema)
     kuduClient.createTable(randomTableName, schema, options)
-    DistributedDataGenerator.testMain(args, ss)
-    kuduContext.kuduRDD(ss.sparkContext, randomTableName)
+    val metrics = DistributedDataGenerator.testMain(args, ss)
+    (metrics, kuduContext.kuduRDD(ss.sparkContext, randomTableName))
   }
 }