You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by gr...@apache.org on 2018/11/06 23:07:34 UTC

kudu git commit: KUDU-2584: Prevent flaky off-by-one errors in backup tests

Repository: kudu
Updated Branches:
  refs/heads/master 6ce61d6e6 -> aa20ef057


KUDU-2584: Prevent flaky off-by-one errors in backup tests

This patch adds 1 ms to the target snapshot time when
a backup is taken in the backup tests. This ensures that we don’t have
flakes due to off-by-one errors where not all of the values are read.

The underlying reason for adding 1 ms is that we pass
the timestamp in millisecond granularity, but the snapshot time
consists of microseconds plus a logical clock. This
means that if data is inserted in the same millisecond as
the target time, truncating the sub-millisecond part can
place the snapshot before that data, leaving it unread.

Additionally, this patch copies over the timestamp
propagation call from the KuduRDD and ensures
the Spark tests use the Kudu client from the
KuduContext. This should further prevent future
snapshot issues.

This patch also includes an auto-formatting change in
KuduBackupOptions that must have been missed in
a previous commit.

Change-Id: Ia0f1b4a4138cc8c913543a68fad748927cdc439d
Reviewed-on: http://gerrit.cloudera.org:8080/11815
Tested-by: Grant Henke <gr...@apache.org>
Reviewed-by: Adar Dembo <ad...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/aa20ef05
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/aa20ef05
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/aa20ef05

Branch: refs/heads/master
Commit: aa20ef0576cd9e2cf4a035ecdf6dbd746d94c586
Parents: 6ce61d6
Author: Grant Henke <gr...@apache.org>
Authored: Mon Oct 29 12:50:11 2018 -0500
Committer: Grant Henke <gr...@apache.org>
Committed: Tue Nov 6 23:07:14 2018 +0000

----------------------------------------------------------------------
 .../apache/kudu/backup/KuduBackupOptions.scala    |  5 +++--
 .../org/apache/kudu/backup/KuduBackupRDD.scala    | 10 ++++++++--
 .../org/apache/kudu/backup/TestKuduBackup.scala   | 18 ++++++++++++++++--
 .../apache/kudu/spark/kudu/KuduTestSuite.scala    |  5 +++--
 4 files changed, 30 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/aa20ef05/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackupOptions.scala
----------------------------------------------------------------------
diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackupOptions.scala b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackupOptions.scala
index c594b1a..82a8eb4 100644
--- a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackupOptions.scala
+++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackupOptions.scala
@@ -87,8 +87,9 @@ object KuduBackupOptions {
 
       opt[Long]("keepAlivePeriodMs")
         .action((v, o) => o.copy(keepAlivePeriodMs = v))
-        .text("Sets the period at which to send keep-alive requests to the tablet server to ensure" +
-          " that scanners do not time out")
+        .text(
+          "Sets the period at which to send keep-alive requests to the tablet server to ensure" +
+            " that scanners do not time out")
         .optional()
 
       arg[String]("<table>...")

http://git-wip-us.apache.org/repos/asf/kudu/blob/aa20ef05/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackupRDD.scala
----------------------------------------------------------------------
diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackupRDD.scala b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackupRDD.scala
index f42b369..9be2bf2 100644
--- a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackupRDD.scala
+++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackupRDD.scala
@@ -87,7 +87,7 @@ class KuduBackupRDD private[kudu] (
     // TODO: Get deletes and updates for incremental backups.
     val scanner =
       KuduScanToken.deserializeIntoScanner(partition.scanToken, client)
-    new RowIterator(scanner, keepAlivePeriodMs)
+    new RowIterator(scanner, kuduContext, keepAlivePeriodMs)
   }
 
   override def getPreferredLocations(partition: Partition): Seq[String] = {
@@ -106,7 +106,10 @@ private case class KuduBackupPartition(index: Int, scanToken: Array[Byte], locat
  * that takes the job partitions and task context and expects to return an Iterator[Row].
  * This implementation facilitates that.
  */
-private class RowIterator(private val scanner: KuduScanner, val keepAlivePeriodMs: Long)
+private class RowIterator(
+    private val scanner: KuduScanner,
+    val kuduContext: KuduContext,
+    val keepAlivePeriodMs: Long)
     extends Iterator[Row] {
 
   private var currentIterator: RowResultIterator = RowResultIterator.empty
@@ -130,6 +133,9 @@ private class RowIterator(private val scanner: KuduScanner, val keepAlivePeriodM
       }
       currentIterator = scanner.nextRows()
     }
+    // Update timestampAccumulator with the client's last propagated
+    // timestamp on each executor.
+    kuduContext.timestampAccumulator.add(kuduContext.syncClient.getLastPropagatedTimestamp)
     KeepKuduScannerAlive()
     currentIterator.hasNext
   }

http://git-wip-us.apache.org/repos/asf/kudu/blob/aa20ef05/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala
----------------------------------------------------------------------
diff --git a/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala b/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala
index e74fdcd..d3ff3c4 100644
--- a/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala
+++ b/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala
@@ -36,6 +36,7 @@ import org.apache.kudu.Type
 import org.apache.kudu.spark.kudu._
 import org.apache.kudu.test.RandomUtils
 import org.apache.kudu.util.DecimalUtil
+import org.apache.kudu.util.HybridTimeUtil
 import org.junit.Assert._
 import org.junit.Test
 import org.slf4j.Logger
@@ -349,9 +350,22 @@ class TestKuduBackup extends KuduTestSuite {
   def backupAndRestore(tableNames: String*): Unit = {
     val dir = Files.createTempDirectory("backup")
     val path = dir.toUri.toString
-
+    val nowMs = System.currentTimeMillis()
+
+    // Log the timestamps to simplify flaky debugging.
+    log.info(s"nowMs: ${System.currentTimeMillis()}")
+    val hts = HybridTimeUtil.HTTimestampToPhysicalAndLogical(kuduClient.getLastPropagatedTimestamp)
+    log.info(s"propagated physicalMicros: ${hts(0)}")
+    log.info(s"propagated logical: ${hts(1)}")
+
+    // Add one millisecond to our target snapshot time. This will ensure we read all of the records
+    // in the backup and prevent flaky off-by-one errors. The underlying reason for adding 1 ms is
+    // that we pass the timestamp in millisecond granularity but the snapshot time has microsecond
+    // granularity. This means if the test runs fast enough that data is inserted with the same
+    // millisecond value as nowMs (after truncating the micros) the records inserted in the
+    // microseconds after truncation could be unread.
     val backupOptions =
-      new KuduBackupOptions(tableNames, path, harness.getMasterAddressesAsString)
+      new KuduBackupOptions(tableNames, path, harness.getMasterAddressesAsString, nowMs + 1)
     KuduBackup.run(backupOptions, ss)
 
     val restoreOptions =

http://git-wip-us.apache.org/repos/asf/kudu/blob/aa20ef05/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduTestSuite.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduTestSuite.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduTestSuite.scala
index 03b2ba2..da43ef6 100644
--- a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduTestSuite.scala
+++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduTestSuite.scala
@@ -125,11 +125,12 @@ trait KuduTestSuite extends JUnitSuite {
 
   @Before
   def setUpBase(): Unit = {
-    kuduClient = harness.getClient
-
     ss = SparkSession.builder().config(conf).getOrCreate()
     kuduContext = new KuduContext(harness.getMasterAddressesAsString, ss.sparkContext)
 
+    // Spark tests should use the client from the kuduContext.
+    kuduClient = kuduContext.syncClient
+
     table = kuduClient.createTable(tableName, schema, tableOptions)
 
     val simpleTableOptions = new CreateTableOptions()