Posted to commits@kudu.apache.org by da...@apache.org on 2017/11/21 21:10:12 UTC

kudu git commit: KUDU-1454 [part 1]: update propagated timestamp on the driver

Repository: kudu
Updated Branches:
  refs/heads/master 48eab213c -> 9f26c9d15


KUDU-1454 [part 1]: update propagated timestamp on the driver

Currently, Spark uses multiple clients simultaneously, possibly on
different JVMs, for reads and writes. Each client may have the
propagated timestamp properly updated after each read/write. However,
the KuduClient on the driver does not have the correct propagated
timestamp. This is problematic if the user wants to use READ_AT_SNAPSHOT
ReadMode to get a consistent snapshot.

This patch uses a Spark Accumulator to properly update the propagated
timestamp on the driver by taking the maximum of the timestamps
reported by the executors. It also propagates the driver's last
propagated timestamp back to the executors for further writes. A test
is added accordingly and was run in both 'local' and 'local-cluster'
modes.
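
As a rough, standalone illustration of the AccumulatorV2 pattern the
patch relies on (not part of the patch; the class and object names
below are hypothetical), the sketch shows a max-keeping accumulator
being registered on the driver, updated inside executor tasks, and
read back on the driver once the action completes:

import org.apache.spark.sql.SparkSession
import org.apache.spark.util.AccumulatorV2

// Hypothetical stand-in for TimestampAccumulator: each task reports the
// largest value it saw, Spark merges the per-task copies, and only the
// driver reads the merged result.
class MaxAccumulator(var current: Long = 0L) extends AccumulatorV2[Long, Long] {
  override def isZero: Boolean = current == 0L
  override def copy(): AccumulatorV2[Long, Long] = new MaxAccumulator(current)
  override def reset(): Unit = { current = 0L }
  override def add(v: Long): Unit = { current = current.max(v) }
  override def merge(other: AccumulatorV2[Long, Long]): Unit = {
    current = current.max(other.value)
  }
  override def value: Long = current
}

object MaxAccumulatorSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("max-accumulator-sketch")
      .getOrCreate()
    val sc = spark.sparkContext

    val maxSeen = new MaxAccumulator()
    sc.register(maxSeen, "maxSeen")

    // Each task adds the values it observes; after the action finishes the
    // driver sees the merged maximum, much like the driver-side KuduClient
    // picks up the latest propagated timestamp after a write or scan.
    sc.parallelize(Seq(3L, 17L, 8L, 42L), numSlices = 2).foreach(v => maxSeen.add(v))

    println(s"max observed across tasks: ${maxSeen.value}")
    spark.stop()
  }
}

The sc.register(...) call is what lets Spark ship per-task copies of the
accumulator to the executors and merge them back on the driver; the
patch does the same for TimestampAccumulator in KuduContext.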

Change-Id: Id0a078ae8ebaa6a859be75c822879291192c5842
Reviewed-on: http://gerrit.cloudera.org:8080/8552
Tested-by: Kudu Jenkins
Reviewed-by: Dan Burkert <da...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/9f26c9d1
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/9f26c9d1
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/9f26c9d1

Branch: refs/heads/master
Commit: 9f26c9d159e37b27b0c6f60dbcec796b14ded605
Parents: 48eab21
Author: hahao <ha...@cloudera.com>
Authored: Mon Nov 13 17:35:00 2017 -0800
Committer: Dan Burkert <da...@apache.org>
Committed: Tue Nov 21 21:09:59 2017 +0000

----------------------------------------------------------------------
 .../apache/kudu/spark/kudu/KuduContext.scala    | 54 +++++++++++++++++++-
 .../org/apache/kudu/spark/kudu/KuduRDD.scala    |  9 +++-
 .../kudu/spark/kudu/DefaultSourceTest.scala     | 40 ++++++++++++++-
 3 files changed, 98 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/9f26c9d1/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
index 6961430..49068c4 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
@@ -30,6 +30,7 @@ import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.types.{DataType, DataTypes, StructType}
 import org.apache.spark.sql.{DataFrame, Row}
+import org.apache.spark.util.AccumulatorV2
 import org.apache.yetus.audience.InterfaceStability
 import org.slf4j.{Logger, LoggerFactory}
 
@@ -48,6 +49,45 @@ import org.apache.kudu.{ColumnSchema, Schema, Type}
 @InterfaceStability.Unstable
 class KuduContext(val kuduMaster: String,
                   sc: SparkContext) extends Serializable {
+
+  /**
+    * TimestampAccumulator accumulates the maximum of the clients'
+    * propagated timestamps across all executors and can only be read
+    * by the driver.
+    */
+  private[kudu] class TimestampAccumulator(var timestamp: Long = 0L)
+      extends AccumulatorV2[Long, Long] {
+    override def isZero: Boolean = {
+      timestamp == 0
+    }
+
+    override def copy(): AccumulatorV2[Long, Long] = {
+      new TimestampAccumulator(timestamp)
+    }
+
+    override def reset(): Unit = {
+      timestamp = 0L
+    }
+
+    override def add(v: Long): Unit = {
+      timestamp = timestamp.max(v)
+    }
+
+    override def merge(other: AccumulatorV2[Long, Long]): Unit = {
+      timestamp = timestamp.max(other.value)
+
+      // Since each executor holds its own copy of the client for every write/scan
+      // operation, we need to update the propagated timestamp on the driver based on
+      // the latest propagated timestamp from all executors through TimestampAccumulator.
+      syncClient.updateLastPropagatedTimestamp(timestampAccumulator.value)
+    }
+
+    override def value: Long = timestamp
+  }
+
+  val timestampAccumulator = new TimestampAccumulator()
+  sc.register(timestampAccumulator)
+
   import kudu.KuduContext._
 
   @Deprecated()
@@ -200,8 +240,11 @@ class KuduContext(val kuduMaster: String,
 
   private[kudu] def writeRows(data: DataFrame, tableName: String, operation: OperationType) {
     val schema = data.schema
+    // Get the client's last propagated timestamp on the driver.
+    val lastPropagatedTimestamp = syncClient.getLastPropagatedTimestamp
     data.foreachPartition(iterator => {
-      val pendingErrors = writePartitionRows(iterator, schema, tableName, operation)
+      val pendingErrors = writePartitionRows(iterator, schema, tableName, operation,
+                                             lastPropagatedTimestamp)
       val errorCount = pendingErrors.getRowErrors.length
       if (errorCount > 0) {
         val errors = pendingErrors.getRowErrors.take(5).map(_.getErrorStatus).mkString
@@ -214,7 +257,11 @@ class KuduContext(val kuduMaster: String,
   private def writePartitionRows(rows: Iterator[Row],
                                  schema: StructType,
                                  tableName: String,
-                                 operationType: OperationType): RowErrorsAndOverflowStatus = {
+                                 operationType: OperationType,
+                                 lastPropagatedTimestamp: Long): RowErrorsAndOverflowStatus = {
+    // Since each executor has its own KuduClient, update the executor's propagated
+    // timestamp based on the driver's last propagated timestamp.
+    syncClient.updateLastPropagatedTimestamp(lastPropagatedTimestamp)
     val table: KuduTable = syncClient.openTable(tableName)
     val indices: Array[(Int, Int)] = schema.fields.zipWithIndex.map({ case (field, sparkIdx) =>
       sparkIdx -> table.getSchema.getColumnIndex(field.name)
@@ -246,6 +293,9 @@ class KuduContext(val kuduMaster: String,
       }
     } finally {
       session.close()
+      // Update timestampAccumulator with the client's last propagated
+      // timestamp on each executor.
+      timestampAccumulator.add(syncClient.getLastPropagatedTimestamp)
     }
     session.getPendingErrors
   }

http://git-wip-us.apache.org/repos/asf/kudu/blob/9f26c9d1/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
index c1b45eb..4202d73 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
@@ -61,7 +61,7 @@ class KuduRDD private[kudu] (val kuduContext: KuduContext,
     val client: KuduClient = kuduContext.syncClient
     val partition: KuduPartition = part.asInstanceOf[KuduPartition]
     val scanner = KuduScanToken.deserializeIntoScanner(partition.scanToken, client)
-    new RowIterator(scanner)
+    new RowIterator(scanner, kuduContext)
   }
 
   override def getPreferredLocations(partition: Partition): Seq[String] = {
@@ -79,8 +79,10 @@ private class KuduPartition(val index: Int,
 /**
   * A Spark SQL [[Row]] iterator which wraps a [[KuduScanner]].
   * @param scanner the wrapped scanner
+  * @param kuduContext the kudu context
   */
-private class RowIterator(private val scanner: KuduScanner) extends Iterator[Row] {
+private class RowIterator(private val scanner: KuduScanner,
+                          private val kuduContext: KuduContext) extends Iterator[Row] {
 
   private var currentIterator: RowResultIterator = null
 
@@ -91,6 +93,9 @@ private class RowIterator(private val scanner: KuduScanner) extends Iterator[Row
         throw new RuntimeException("Kudu task interrupted")
       }
       currentIterator = scanner.nextRows()
+      // Update timestampAccumulator with the client's last propagated
+      // timestamp on each executor.
+      kuduContext.timestampAccumulator.add(kuduContext.syncClient.getLastPropagatedTimestamp)
     }
     currentIterator.hasNext
   }

http://git-wip-us.apache.org/repos/asf/kudu/blob/9f26c9d1/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
index a65c530..9a90f56 100644
--- a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
+++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
@@ -195,7 +195,6 @@ class DefaultSourceTest extends FunSuite with TestContext with BeforeAndAfter wi
     val df = sqlContext.read.options(kuduOptions).kudu.select( "c2_s", "c1_i", "key")
     val collected = df.collect()
     assert(collected(0).getString(0).equals("0"))
-
   }
 
   test("table non fault tolerant scan") {
@@ -495,6 +494,45 @@ class DefaultSourceTest extends FunSuite with TestContext with BeforeAndAfter wi
     intercept[IllegalArgumentException] {
       sqlContext.read.options(kuduOptions).schema(userSchema).kudu
     }.getMessage should include ("Unknown column: foo")
+  }
+
+  // Verify that the propagated timestamp on the driver's client is
+  // properly updated by both reads and writes.
+  test("timestamp propagation") {
+    val df = sqlContext.read.options(kuduOptions).kudu
+    val insertDF = df.limit(1)
+                      .withColumn("key", df("key")
+                      .plus(100))
+                      .withColumn("c2_s", lit("abc"))
+
+    // Initiate a write via KuduContext, and verify that the client now
+    // has a propagated timestamp.
+    kuduContext.insertRows(insertDF, tableName)
+    assert(kuduContext.syncClient.getLastPropagatedTimestamp > 0)
+    var prevTimestamp = kuduContext.syncClient.getLastPropagatedTimestamp
 
+    // Initiate a read via DataFrame, and verify that the read moves the
+    // client's propagated timestamp forward.
+    val newDF = sqlContext.read.options(kuduOptions).kudu
+    val collected = newDF.filter("key = 100").collect()
+    assertEquals("abc", collected(0).getAs[String]("c2_s"))
+    assert(kuduContext.syncClient.getLastPropagatedTimestamp > prevTimestamp)
+    prevTimestamp = kuduContext.syncClient.getLastPropagatedTimestamp
+
+    // Initiate a read via KuduContext, and verify that the read moves the
+    // client's propagated timestamp forward.
+    val rdd = kuduContext.kuduRDD(ss.sparkContext, tableName, List("key"))
+    assert(rdd.collect.length == 11)
+    assert(kuduContext.syncClient.getLastPropagatedTimestamp > prevTimestamp)
+    prevTimestamp = kuduContext.syncClient.getLastPropagatedTimestamp
+
+    // Initiate another write via KuduContext, and verify that the write
+    // moves the client's propagated timestamp forward.
+    val updateDF = df.limit(1)
+                     .withColumn("key", df("key")
+                     .plus(100))
+                     .withColumn("c2_s", lit("def"))
+    kuduContext.insertIgnoreRows(updateDF, tableName)
+    assert(kuduContext.syncClient.getLastPropagatedTimestamp > prevTimestamp)
   }
 }