Posted to commits@kudu.apache.org by gr...@apache.org on 2020/08/04 17:59:49 UTC

[kudu] branch master updated: [KUDU-3177] Added kudu.snapshotTimestampMicros to kudu spark readOptions as optional property

This is an automated email from the ASF dual-hosted git repository.

granthenke pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 40289e2  [KUDU-3177] Added kudu.snapshotTimestampMicros to kudu spark readOptions as optional property
40289e2 is described below

commit 40289e2a2faa021826b9424864ab2935507bef33
Author: kevinmccarthy <00...@gmail.com>
AuthorDate: Mon Aug 3 07:24:50 2020 -0700

    [KUDU-3177] Added kudu.snapshotTimestampMicros to kudu spark readOptions
    as optional property
    
    Added property snapshotTimestampMs to spark read options, which
    allows consistent scans when the timestamp is set before the first
    dataFrame read.
    
    Change-Id: I00862c0e174a964efc6cab0b8141b1ac5a1bebc0
    Reviewed-on: http://gerrit.cloudera.org:8080/16276
    Tested-by: Kudu Jenkins
    Reviewed-by: Grant Henke <gr...@apache.org>
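
A minimal usage sketch (not part of the commit): with this change, a Spark job can pin its reads to a point in time by passing the new option before the first DataFrame read. The master address, table name, and timestamp below are placeholders.

    // Read a consistent snapshot of the table as of one minute ago.
    val snapshotMs = System.currentTimeMillis() - 60 * 1000 // milliseconds since the epoch
    val df = spark.read
      .format("kudu")
      .option("kudu.master", "master-1:7051")
      .option("kudu.table", "my_table")
      .option("kudu.snapshotTimestampMs", snapshotMs.toString)
      .load()
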
---
 .../org/apache/kudu/spark/kudu/DefaultSource.scala |  6 +-
 .../scala/org/apache/kudu/spark/kudu/KuduRDD.scala |  7 +-
 .../apache/kudu/spark/kudu/KuduReadOptions.scala   |  6 +-
 .../apache/kudu/spark/kudu/DefaultSourceTest.scala | 77 ++++++++++++++++++++++
 .../org/apache/kudu/spark/kudu/KuduTestSuite.scala |  7 ++
 5 files changed, 100 insertions(+), 3 deletions(-)

diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
index 499da90..8fcfe71 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
@@ -71,6 +71,7 @@ class DefaultSource
   val SPLIT_SIZE_BYTES = "kudu.splitSizeBytes"
   val HANDLE_SCHEMA_DRIFT = "kudu.handleSchemaDrift"
   val USE_DRIVER_METADATA = "kudu.useDriverMetadata"
+  val SNAPSHOT_TIMESTAMP_MS = "kudu.snapshotTimestampMs"
 
   /**
    * A nice alias for the data source so that when specifying the format
@@ -186,6 +187,7 @@ class DefaultSource
     val splitSizeBytes = parameters.get(SPLIT_SIZE_BYTES).map(_.toLong)
     val useDriverMetadata =
       parameters.get(USE_DRIVER_METADATA).map(_.toBoolean).getOrElse(defaultUseDriverMetadata)
+    val snapshotTimestampMs = parameters.get(SNAPSHOT_TIMESTAMP_MS).map(_.toLong)
     KuduReadOptions(
       batchSize,
       scanLocality,
@@ -194,7 +196,9 @@ class DefaultSource
       scanRequestTimeoutMs,
       /* socketReadTimeoutMs= */ None,
       splitSizeBytes,
-      useDriverMetadata)
+      useDriverMetadata,
+      snapshotTimestampMs
+    )
   }
 
   private def getWriteOptions(parameters: Map[String, String]): KuduWriteOptions = {
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
index 1d9ac64..62a2529 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
@@ -68,10 +68,15 @@ class KuduRDD private[kudu] (
     // case, to ensure the consistency of such scan, we use READ_AT_SNAPSHOT
     // read mode without setting a timestamp.
     builder.replicaSelection(options.scanLocality)
-    if (options.scanLocality == ReplicaSelection.CLOSEST_REPLICA) {
+    if (options.scanLocality == ReplicaSelection.CLOSEST_REPLICA ||
+      options.snapshotTimestampMs.isDefined) {
       builder.readMode(AsyncKuduScanner.ReadMode.READ_AT_SNAPSHOT)
     }
 
+    options.snapshotTimestampMs.foreach { timestamp =>
+      builder.snapshotTimestampMicros(timestamp * 1000)
+    }
+
     options.scanRequestTimeoutMs.foreach { timeout =>
       builder.scanRequestTimeout(timeout)
     }
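
For reference (not part of the patch), the scanner configuration above corresponds roughly to the following direct use of the Kudu Java client, sketched with the synchronous client and assuming a masterAddresses string and a table named "my_table"; note the millisecond-to-microsecond conversion:

    import org.apache.kudu.client.{AsyncKuduScanner, KuduClient}

    val client = new KuduClient.KuduClientBuilder(masterAddresses).build()
    val table = client.openTable("my_table")
    val snapshotTimestampMs = 1596240000000L // milliseconds, as supplied via the new read option
    val scanner = client
      .newScannerBuilder(table)
      .readMode(AsyncKuduScanner.ReadMode.READ_AT_SNAPSHOT) // fixed-snapshot read mode
      .snapshotTimestampMicros(snapshotTimestampMs * 1000)  // the client API expects microseconds
      .build()
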
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduReadOptions.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduReadOptions.scala
index f0eae25..405c589 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduReadOptions.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduReadOptions.scala
@@ -41,6 +41,9 @@ import org.apache.kudu.spark.kudu.KuduReadOptions._
  * @param useDriverMetadata If true, sends the table metadata from the driver to the tasks instead
  *                          of relying on calls to the Kudu master for each task to get the current
  *                          table metadata.
+ * @param snapshotTimestampMs Sets a timestamp in unixtime milliseconds to use for READ_AT_SNAPSHOT
+ *                            to allow repeatable reads. If not set, the timestamp is generated
+ *                            by the server.
  */
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
@@ -52,7 +55,8 @@ case class KuduReadOptions(
     scanRequestTimeoutMs: Option[Long] = None,
     socketReadTimeoutMs: Option[Long] = None,
     splitSizeBytes: Option[Long] = None,
-    useDriverMetadata: Boolean = defaultUseDriverMetadata)
+    useDriverMetadata: Boolean = defaultUseDriverMetadata,
+    snapshotTimestampMs: Option[Long] = None)
 
 object KuduReadOptions {
   val defaultBatchSize: Int = 1024 * 1024 * 20 // TODO: Understand/doc this setting?
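
As a hedged aside (not in the patch), the same option can also be supplied programmatically when reading through KuduContext rather than the DataFrame API, assuming the kuduRDD overload that accepts a KuduReadOptions; the master address, table, and column names below are placeholders:

    import org.apache.kudu.spark.kudu.{KuduContext, KuduReadOptions}

    val kuduContext = new KuduContext("master-1:7051", spark.sparkContext)
    // Pin the scan to a snapshot taken one minute ago (value is in milliseconds).
    val readOptions = KuduReadOptions(snapshotTimestampMs = Some(System.currentTimeMillis() - 60000))
    val rdd = kuduContext.kuduRDD(spark.sparkContext, "my_table", Seq("key", "c1"), readOptions)
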
diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
index 0ef1e7d..b504116 100644
--- a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
+++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
@@ -18,6 +18,7 @@
 package org.apache.kudu.spark.kudu
 
 import java.nio.charset.StandardCharsets
+import java.util
 
 import scala.collection.JavaConverters._
 import scala.collection.immutable.IndexedSeq
@@ -750,6 +751,82 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {
   }
 
   @Test
+  def testSnapshotTimestampMsPropagation() {
+    kuduOptions = Map(
+      "kudu.table" -> tableName,
+      "kudu.master" -> harness.getMasterAddressesAsString,
+      "kudu.snapshotTimestampMs" -> "86400000000")
+    val dataFrameSnapshotTimestamp = sqlContext.read.options(kuduOptions).format("kudu").load
+    val kuduRelationSnapshotTimestamp = kuduRelationFromDataFrame(dataFrameSnapshotTimestamp)
+
+    kuduOptions =
+      Map("kudu.table" -> tableName, "kudu.master" -> harness.getMasterAddressesAsString)
+    val dataFrameNoneSnapshotTimestamp = sqlContext.read.options(kuduOptions).format("kudu").load
+    val kuduRelationNoneSnapshotTimestamp = kuduRelationFromDataFrame(
+      dataFrameNoneSnapshotTimestamp)
+    assert(kuduRelationSnapshotTimestamp.readOptions.snapshotTimestampMs.contains(86400000000L))
+    assert(kuduRelationNoneSnapshotTimestamp.readOptions.snapshotTimestampMs.isEmpty)
+  }
+
+  @Test
+  def testReadDataFrameAtSnapshot() {
+    insertRows(table, 100, 1)
+    val timestamp = getLastPropagatedTimestampMs()
+    insertRows(table, 100, 100)
+    kuduOptions = Map(
+      "kudu.table" -> tableName,
+      "kudu.master" -> harness.getMasterAddressesAsString,
+      "kudu.snapshotTimestampMs" -> s"$timestamp")
+    val dataFrameWithSnapshotTimestamp = sqlContext.read.options(kuduOptions).format("kudu").load
+    kuduOptions =
+      Map("kudu.table" -> tableName, "kudu.master" -> harness.getMasterAddressesAsString)
+    val dataFrameWithoutSnapshotTimestamp = sqlContext.read.options(kuduOptions).format("kudu").load
+    assertEquals(100, dataFrameWithSnapshotTimestamp.collect().length)
+    assertEquals(200, dataFrameWithoutSnapshotTimestamp.collect().length)
+  }
+
+  @Test
+  def testSnapshotTimestampBeyondMaxAge(): Unit = {
+    val extraConfigs = new util.HashMap[String, String]()
+    val tableName = "snapshot_test"
+    extraConfigs.put("kudu.table.history_max_age_sec", "1");
+    kuduClient.createTable(
+      tableName,
+      schema,
+      tableOptions.setExtraConfigs(extraConfigs)
+    )
+    val timestamp = getLastPropagatedTimestampMs()
+    kuduOptions = Map(
+      "kudu.table" -> tableName,
+      "kudu.master" -> harness.getMasterAddressesAsString,
+      "kudu.snapshotTimestampMs" -> s"$timestamp")
+    insertRows(table, 100, 1)
+    Thread.sleep(2000)
+    val df = sqlContext.read.options(kuduOptions).format("kudu").load
+    val exception = intercept[Exception] {
+      df.count()
+    }
+    assertTrue(
+      exception.getMessage.contains(
+        "snapshot scan end timestamp is earlier than the ancient history mark")
+    )
+  }
+
+  @Test
+  def testSnapshotTimestampBeyondCurrentTimestamp(): Unit = {
+    val timestamp = getLastPropagatedTimestampMs() + 100000
+    kuduOptions = Map(
+      "kudu.table" -> tableName,
+      "kudu.master" -> harness.getMasterAddressesAsString,
+      "kudu.snapshotTimestampMs" -> s"$timestamp")
+    val df = sqlContext.read.options(kuduOptions).format("kudu").load
+    val exception = intercept[Exception] {
+      df.count()
+    }
+    assertTrue(exception.getMessage.contains("cannot verify timestamp"))
+  }
+
+  @Test
   @MasterServerConfig(
     flags = Array(
       "--mock_table_metrics_for_testing=true",
diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduTestSuite.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduTestSuite.scala
index b502bc9..804efb7 100644
--- a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduTestSuite.scala
+++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduTestSuite.scala
@@ -35,6 +35,7 @@ import org.apache.kudu.test.KuduTestHarness
 import org.apache.kudu.util.CharUtil
 import org.apache.kudu.util.DateUtil
 import org.apache.kudu.util.DecimalUtil
+import org.apache.kudu.util.HybridTimeUtil
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.SparkSession
@@ -207,6 +208,12 @@ trait KuduTestSuite extends JUnitSuite {
     rows
   }
 
+  def getLastPropagatedTimestampMs(): Long = {
+    HybridTimeUtil
+      .HTTimestampToPhysicalAndLogical(kuduClient.getLastPropagatedTimestamp)
+      .head / 1000
+  }
+
   def upsertRowsWithRowDataSize(
       targetTable: KuduTable,
       rowCount: Integer,