You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ab...@apache.org on 2019/06/01 08:38:14 UTC
[kudu] 04/04: [spark tools] KUDU-2831: Deflake DistributedDataGeneratorTest.testGenerateRandomData
This is an automated email from the ASF dual-hosted git repository.
abukor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
commit 6ecafb43992f4af9c6e740a6c7198feb9440110e
Author: Will Berkeley <wd...@gmail.com>
AuthorDate: Thu May 30 10:55:38 2019 -0700
[spark tools] KUDU-2831: Deflake DistributedDataGeneratorTest.testGenerateRandomData
Oops. It seems it's not possible to retrieve an accumulator by name. We
were creating a new accumulator for collisions after the old one had
been cleaned up, because the job had finished and no references to it
remained. This meant we never actually counted collisions, so when they
happened our expectation of the number of rows written was off. This
fixes the problem by plumbing references to the rows-written and
collisions accumulators back to the test code.
Before this patch, I saw 12/1000 failures. After, 0/1000.
Change-Id: Ie2218328e400d76dbeab50cdb0d0241dfe56d20c
Reviewed-on: http://gerrit.cloudera.org:8080/13477
Reviewed-by: Grant Henke <gr...@apache.org>
Tested-by: Kudu Jenkins
Reviewed-by: Adar Dembo <ad...@cloudera.com>
---
.../spark/tools/DistributedDataGenerator.scala | 11 +++++-----
.../spark/tools/DistributedDataGeneratorTest.scala | 25 ++++++++++++++--------
2 files changed, 22 insertions(+), 14 deletions(-)
diff --git a/java/kudu-spark-tools/src/main/scala/org/apache/kudu/spark/tools/DistributedDataGenerator.scala b/java/kudu-spark-tools/src/main/scala/org/apache/kudu/spark/tools/DistributedDataGenerator.scala
index 01af3ce..357eb8b 100644
--- a/java/kudu-spark-tools/src/main/scala/org/apache/kudu/spark/tools/DistributedDataGenerator.scala
+++ b/java/kudu-spark-tools/src/main/scala/org/apache/kudu/spark/tools/DistributedDataGenerator.scala
@@ -55,7 +55,7 @@ object GeneratorMetrics {
object DistributedDataGenerator {
val log: Logger = LoggerFactory.getLogger(getClass)
- def run(options: DistributedDataGeneratorOptions, ss: SparkSession): Unit = {
+ def run(options: DistributedDataGeneratorOptions, ss: SparkSession): GeneratorMetrics = {
log.info(s"Running a DistributedDataGenerator with options: $options")
val sc = ss.sparkContext
val context = new KuduContext(options.masterAddresses, sc)
@@ -131,8 +131,7 @@ object DistributedDataGenerator {
session.close()
}
- log.info(s"Rows written: ${metrics.rowsWritten.value}")
- log.info(s"Collisions: ${metrics.collisions.value}")
+ metrics
}
/**
@@ -140,7 +139,7 @@ object DistributedDataGenerator {
* so tests must create and manage their own.
*/
@InterfaceAudience.LimitedPrivate(Array("Test"))
- def testMain(args: Array[String], ss: SparkSession): Unit = {
+ def testMain(args: Array[String], ss: SparkSession): GeneratorMetrics = {
DistributedDataGeneratorOptions.parse(args) match {
case None => throw new IllegalArgumentException("Could not parse arguments")
case Some(config) => run(config, ss)
@@ -150,7 +149,9 @@ object DistributedDataGenerator {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("DistributedDataGenerator")
val ss = SparkSession.builder().config(conf).getOrCreate()
- testMain(args, ss)
+ val metrics = testMain(args, ss)
+ log.info(s"Rows written: ${metrics.rowsWritten.value}")
+ log.info(s"Collisions: ${metrics.collisions.value}")
}
}
diff --git a/java/kudu-spark-tools/src/test/scala/org/apache/kudu/spark/tools/DistributedDataGeneratorTest.scala b/java/kudu-spark-tools/src/test/scala/org/apache/kudu/spark/tools/DistributedDataGeneratorTest.scala
index 902abab..ca1eff9 100644
--- a/java/kudu-spark-tools/src/test/scala/org/apache/kudu/spark/tools/DistributedDataGeneratorTest.scala
+++ b/java/kudu-spark-tools/src/test/scala/org/apache/kudu/spark/tools/DistributedDataGeneratorTest.scala
@@ -52,10 +52,11 @@ class DistributedDataGeneratorTest extends KuduTestSuite {
"--type=random",
randomTableName,
harness.getMasterAddressesAsString)
- val rdd = runGeneratorTest(args)
- val collisions = ss.sparkContext.longAccumulator("row_collisions").value
- // Collisions could cause the number of row to be less than the number set.
- assertEquals(numRows - collisions, rdd.collect.length)
+ val (metrics, rdd) = runGeneratorTest(args)
+ val (rowsWritten, collisions) = (metrics.rowsWritten.value, metrics.collisions.value)
+ // Collisions may cause the number of rows written to be less than the number generated.
+ assertEquals(rowsWritten, rdd.collect.length.toLong)
+ assertEquals(numRows, rowsWritten + collisions)
}
@Test
@@ -67,8 +68,11 @@ class DistributedDataGeneratorTest extends KuduTestSuite {
"--type=sequential",
randomTableName,
harness.getMasterAddressesAsString)
- val rdd = runGeneratorTest(args)
+ val (metrics, rdd) = runGeneratorTest(args)
+ val (rowsWritten, collisions) = (metrics.rowsWritten.value, metrics.collisions.value)
+ assertEquals(numRows.toLong, rowsWritten)
assertEquals(numRows, rdd.collect.length)
+ assertEquals(0L, collisions)
}
@Test
@@ -81,8 +85,11 @@ class DistributedDataGeneratorTest extends KuduTestSuite {
"--repartition=true",
randomTableName,
harness.getMasterAddressesAsString)
- val rdd = runGeneratorTest(args)
+ val (metrics, rdd) = runGeneratorTest(args)
+ val (rowsWritten, collisions) = (metrics.rowsWritten.value, metrics.collisions.value)
+ assertEquals(numRows.toLong, rowsWritten)
assertEquals(numRows, rdd.collect.length)
+ assertEquals(0L, collisions)
}
@Test
@@ -127,11 +134,11 @@ class DistributedDataGeneratorTest extends KuduTestSuite {
assertEquals(numTasks + numPartitions, actualNumTasks)
}
- def runGeneratorTest(args: Array[String]): RDD[Row] = {
+ def runGeneratorTest(args: Array[String]): (GeneratorMetrics, RDD[Row]) = {
val schema = generator.randomSchema()
val options = generator.randomCreateTableOptions(schema)
kuduClient.createTable(randomTableName, schema, options)
- DistributedDataGenerator.testMain(args, ss)
- kuduContext.kuduRDD(ss.sparkContext, randomTableName)
+ val metrics = DistributedDataGenerator.testMain(args, ss)
+ (metrics, kuduContext.kuduRDD(ss.sparkContext, randomTableName))
}
}