Posted to commits@kudu.apache.org by gr...@apache.org on 2020/08/04 17:59:49 UTC
[kudu] branch master updated: [KUDU-3177] Added kudu.snapshotTimestampMicros to kudu spark readOptions as optional property
This is an automated email from the ASF dual-hosted git repository.
granthenke pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 40289e2 [KUDU-3177] Added kudu.snapshotTimestampMicros to kudu spark readOptions as optional property
40289e2 is described below
commit 40289e2a2faa021826b9424864ab2935507bef33
Author: kevinmccarthy <00...@gmail.com>
AuthorDate: Mon Aug 3 07:24:50 2020 -0700
[KUDU-3177] Added kudu.snapshotTimestampMicros to kudu spark readOptions
as optional property
Added property snapshotTimestampMs to spark read options which will
allow consistent scans when the timestamp is set before the first
DataFrame read.
Change-Id: I00862c0e174a964efc6cab0b8141b1ac5a1bebc0
Reviewed-on: http://gerrit.cloudera.org:8080/16276
Tested-by: Kudu Jenkins
Reviewed-by: Grant Henke <gr...@apache.org>
---
.../org/apache/kudu/spark/kudu/DefaultSource.scala | 6 +-
.../scala/org/apache/kudu/spark/kudu/KuduRDD.scala | 7 +-
.../apache/kudu/spark/kudu/KuduReadOptions.scala | 6 +-
.../apache/kudu/spark/kudu/DefaultSourceTest.scala | 77 ++++++++++++++++++++++
.../org/apache/kudu/spark/kudu/KuduTestSuite.scala | 7 ++
5 files changed, 100 insertions(+), 3 deletions(-)
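
For context, the new option is passed the same way as the other kudu-spark read options. Below is a minimal usage sketch (not part of the commit); the master address and table name are placeholders, and a SparkSession named spark is assumed:

    // Pin every scan of this DataFrame to a single snapshot taken one
    // minute ago (the option value is milliseconds since the Unix epoch).
    val kuduOptions = Map(
      "kudu.master" -> "master-1:7051",                 // placeholder address
      "kudu.table" -> "my_table",                       // placeholder table
      "kudu.snapshotTimestampMs" -> (System.currentTimeMillis() - 60000).toString)
    val df = spark.read.options(kuduOptions).format("kudu").load
    // All reads of `df` now run in READ_AT_SNAPSHOT mode at that timestamp.
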
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
index 499da90..8fcfe71 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
@@ -71,6 +71,7 @@ class DefaultSource
val SPLIT_SIZE_BYTES = "kudu.splitSizeBytes"
val HANDLE_SCHEMA_DRIFT = "kudu.handleSchemaDrift"
val USE_DRIVER_METADATA = "kudu.useDriverMetadata"
+ val SNAPSHOT_TIMESTAMP_MS = "kudu.snapshotTimestampMs"
/**
* A nice alias for the data source so that when specifying the format
@@ -186,6 +187,7 @@ class DefaultSource
val splitSizeBytes = parameters.get(SPLIT_SIZE_BYTES).map(_.toLong)
val useDriverMetadata =
parameters.get(USE_DRIVER_METADATA).map(_.toBoolean).getOrElse(defaultUseDriverMetadata)
+ val snapshotTimestampMs = parameters.get(SNAPSHOT_TIMESTAMP_MS).map(_.toLong)
KuduReadOptions(
batchSize,
scanLocality,
@@ -194,7 +196,9 @@ class DefaultSource
scanRequestTimeoutMs,
/* socketReadTimeoutMs= */ None,
splitSizeBytes,
- useDriverMetadata)
+ useDriverMetadata,
+ snapshotTimestampMs
+ )
}
private def getWriteOptions(parameters: Map[String, String]): KuduWriteOptions = {
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
index 1d9ac64..62a2529 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
@@ -68,10 +68,15 @@ class KuduRDD private[kudu] (
// case, to ensure the consistency of such scan, we use READ_AT_SNAPSHOT
// read mode without setting a timestamp.
builder.replicaSelection(options.scanLocality)
- if (options.scanLocality == ReplicaSelection.CLOSEST_REPLICA) {
+ if (options.scanLocality == ReplicaSelection.CLOSEST_REPLICA ||
+ options.snapshotTimestampMs.isDefined) {
builder.readMode(AsyncKuduScanner.ReadMode.READ_AT_SNAPSHOT)
}
+ options.snapshotTimestampMs.foreach { timestamp =>
+ builder.snapshotTimestampMicros(timestamp * 1000)
+ }
+
options.scanRequestTimeoutMs.foreach { timeout =>
builder.scanRequestTimeout(timeout)
}
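
The hunk above is the core behavior change: whenever kudu.snapshotTimestampMs is set, the scan is forced into READ_AT_SNAPSHOT mode and the millisecond value is converted to microseconds before being handed to the scanner. As a rough sketch of the equivalent setup against the Java client outside Spark (the master address and table name are placeholders, not part of this commit):

    import org.apache.kudu.client.{AsyncKuduScanner, KuduClient, KuduTable}

    // Sketch only: open a client and table, then build a snapshot scanner.
    val client: KuduClient = new KuduClient.KuduClientBuilder("master-1:7051").build()
    val table: KuduTable = client.openTable("my_table")
    val snapshotMs = System.currentTimeMillis() - 60000
    val scanner = client
      .newScannerBuilder(table)
      .readMode(AsyncKuduScanner.ReadMode.READ_AT_SNAPSHOT)
      .snapshotTimestampMicros(snapshotMs * 1000)  // the Spark option is in ms; the scanner expects micros
      .build()
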
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduReadOptions.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduReadOptions.scala
index f0eae25..405c589 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduReadOptions.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduReadOptions.scala
@@ -41,6 +41,9 @@ import org.apache.kudu.spark.kudu.KuduReadOptions._
* @param useDriverMetadata If true, sends the table metadata from the driver to the tasks instead
* of relying on calls to the Kudu master for each task to get the current
* table metadata.
+ * @param snapshotTimestampMs Sets a timestamp in unixtime milliseconds to use for READ_AT_SNAPSHOT
+ * to allow repeatable reads. If not set, the timestamp is generated
+ * by the server.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
@@ -52,7 +55,8 @@ case class KuduReadOptions(
scanRequestTimeoutMs: Option[Long] = None,
socketReadTimeoutMs: Option[Long] = None,
splitSizeBytes: Option[Long] = None,
- useDriverMetadata: Boolean = defaultUseDriverMetadata)
+ useDriverMetadata: Boolean = defaultUseDriverMetadata,
+ snapshotTimestampMs: Option[Long] = None)
object KuduReadOptions {
val defaultBatchSize: Int = 1024 * 1024 * 20 // TODO: Understand/doc this setting?
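
The same setting is also available when building KuduReadOptions programmatically rather than through string options. A small sketch using named arguments (every other field keeps its default):

    import org.apache.kudu.spark.kudu.KuduReadOptions

    // Pin reads to a snapshot one minute in the past; other fields use defaults.
    val readOptions = KuduReadOptions(
      snapshotTimestampMs = Some(System.currentTimeMillis() - 60000))

Such an instance can then be handed to kudu-spark APIs that accept a KuduReadOptions (for example KuduContext.kuduRDD, in versions that expose that parameter).
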
diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
index 0ef1e7d..b504116 100644
--- a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
+++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
@@ -18,6 +18,7 @@
package org.apache.kudu.spark.kudu
import java.nio.charset.StandardCharsets
+import java.util
import scala.collection.JavaConverters._
import scala.collection.immutable.IndexedSeq
@@ -750,6 +751,82 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {
}
@Test
+ def testSnapshotTimestampMsPropagation() {
+ kuduOptions = Map(
+ "kudu.table" -> tableName,
+ "kudu.master" -> harness.getMasterAddressesAsString,
+ "kudu.snapshotTimestampMs" -> "86400000000")
+ val dataFrameSnapshotTimestamp = sqlContext.read.options(kuduOptions).format("kudu").load
+ val kuduRelationSnapshotTimestamp = kuduRelationFromDataFrame(dataFrameSnapshotTimestamp)
+
+ kuduOptions =
+ Map("kudu.table" -> tableName, "kudu.master" -> harness.getMasterAddressesAsString)
+ val dataFrameNoneSnapshotTimestamp = sqlContext.read.options(kuduOptions).format("kudu").load
+ val kuduRelationNoneSnapshotTimestamp = kuduRelationFromDataFrame(
+ dataFrameNoneSnapshotTimestamp)
+ assert(kuduRelationSnapshotTimestamp.readOptions.snapshotTimestampMs.contains(86400000000L))
+ assert(kuduRelationNoneSnapshotTimestamp.readOptions.snapshotTimestampMs.isEmpty)
+ }
+
+ @Test
+ def testReadDataFrameAtSnapshot() {
+ insertRows(table, 100, 1)
+ val timestamp = getLastPropagatedTimestampMs()
+ insertRows(table, 100, 100)
+ kuduOptions = Map(
+ "kudu.table" -> tableName,
+ "kudu.master" -> harness.getMasterAddressesAsString,
+ "kudu.snapshotTimestampMs" -> s"$timestamp")
+ val dataFrameWithSnapshotTimestamp = sqlContext.read.options(kuduOptions).format("kudu").load
+ kuduOptions =
+ Map("kudu.table" -> tableName, "kudu.master" -> harness.getMasterAddressesAsString)
+ val dataFrameWithoutSnapshotTimestamp = sqlContext.read.options(kuduOptions).format("kudu").load
+ assertEquals(100, dataFrameWithSnapshotTimestamp.collect().length)
+ assertEquals(200, dataFrameWithoutSnapshotTimestamp.collect().length)
+ }
+
+ @Test
+ def testSnapshotTimestampBeyondMaxAge(): Unit = {
+ val extraConfigs = new util.HashMap[String, String]()
+ val tableName = "snapshot_test"
+ extraConfigs.put("kudu.table.history_max_age_sec", "1");
+ kuduClient.createTable(
+ tableName,
+ schema,
+ tableOptions.setExtraConfigs(extraConfigs)
+ )
+ val timestamp = getLastPropagatedTimestampMs()
+ kuduOptions = Map(
+ "kudu.table" -> tableName,
+ "kudu.master" -> harness.getMasterAddressesAsString,
+ "kudu.snapshotTimestampMs" -> s"$timestamp")
+ insertRows(table, 100, 1)
+ Thread.sleep(2000)
+ val df = sqlContext.read.options(kuduOptions).format("kudu").load
+ val exception = intercept[Exception] {
+ df.count()
+ }
+ assertTrue(
+ exception.getMessage.contains(
+ "snapshot scan end timestamp is earlier than the ancient history mark")
+ )
+ }
+
+ @Test
+ def testSnapshotTimestampBeyondCurrentTimestamp(): Unit = {
+ val timestamp = getLastPropagatedTimestampMs() + 100000
+ kuduOptions = Map(
+ "kudu.table" -> tableName,
+ "kudu.master" -> harness.getMasterAddressesAsString,
+ "kudu.snapshotTimestampMs" -> s"$timestamp")
+ val df = sqlContext.read.options(kuduOptions).format("kudu").load
+ val exception = intercept[Exception] {
+ df.count()
+ }
+ assertTrue(exception.getMessage.contains("cannot verify timestamp"))
+ }
+
+ @Test
@MasterServerConfig(
flags = Array(
"--mock_table_metrics_for_testing=true",
diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduTestSuite.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduTestSuite.scala
index b502bc9..804efb7 100644
--- a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduTestSuite.scala
+++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduTestSuite.scala
@@ -35,6 +35,7 @@ import org.apache.kudu.test.KuduTestHarness
import org.apache.kudu.util.CharUtil
import org.apache.kudu.util.DateUtil
import org.apache.kudu.util.DecimalUtil
+import org.apache.kudu.util.HybridTimeUtil
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SparkSession
@@ -207,6 +208,12 @@ trait KuduTestSuite extends JUnitSuite {
rows
}
+ def getLastPropagatedTimestampMs(): Long = {
+ HybridTimeUtil
+ .HTTimestampToPhysicalAndLogical(kuduClient.getLastPropagatedTimestamp)
+ .head / 1000
+ }
+
def upsertRowsWithRowDataSize(
targetTable: KuduTable,
rowCount: Integer,