You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by da...@apache.org on 2017/11/21 21:12:41 UTC

kudu git commit: KUDU-1454 [part 2]: enable scan locality for Spark

Repository: kudu
Updated Branches:
  refs/heads/master 9f26c9d15 -> 3abca98c5


KUDU-1454 [part 2]: enable scan locality for Spark

This patch provides support to take advantage of scan locality in the Spark
integration, so that scans take place at the closest replica. It also uses
the READ_AT_SNAPSHOT read mode without setting a timestamp to ensure
consistency for these scans.

Change-Id: I5f0ea53c2c15ca05da141ff1763018854b84fcea
Reviewed-on: http://gerrit.cloudera.org:8080/8560
Tested-by: Kudu Jenkins
Reviewed-by: Dan Burkert <da...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/3abca98c
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/3abca98c
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/3abca98c

Branch: refs/heads/master
Commit: 3abca98c58b540f6ba341f5b983ecbd90da43dcf
Parents: 9f26c9d
Author: hahao <ha...@cloudera.com>
Authored: Wed Nov 15 15:19:00 2017 -0800
Committer: Dan Burkert <da...@apache.org>
Committed: Tue Nov 21 21:12:33 2017 +0000

----------------------------------------------------------------------
 .../apache/kudu/spark/kudu/DefaultSource.scala  | 23 ++++++++++++++++----
 .../apache/kudu/spark/kudu/KuduContext.scala    |  5 +++--
 .../org/apache/kudu/spark/kudu/KuduRDD.scala    | 10 +++++++++
 .../kudu/spark/kudu/DefaultSourceTest.scala     | 15 +++++++++++++
 4 files changed, 47 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/3abca98c/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
index fbc2d38..c0b1b0a 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
@@ -49,6 +49,7 @@ class DefaultSource extends RelationProvider with CreatableRelationProvider
   val KUDU_MASTER = "kudu.master"
   val OPERATION = "kudu.operation"
   val FAULT_TOLERANT_SCANNER = "kudu.faultTolerantScan"
+  val SCAN_LOCALITY = "kudu.scanLocality"
 
   def defaultMasterAddrs: String = InetAddress.getLocalHost.getCanonicalHostName
 
@@ -69,8 +70,10 @@ class DefaultSource extends RelationProvider with CreatableRelationProvider
     val operationType = getOperationType(parameters.getOrElse(OPERATION, "upsert"))
     val faultTolerantScanner = Try(parameters.getOrElse(FAULT_TOLERANT_SCANNER, "false").toBoolean)
       .getOrElse(false)
+    val scanLocality = getScanLocalityType(parameters.getOrElse(SCAN_LOCALITY, "leader_only"))
 
-    new KuduRelation(tableName, kuduMaster, faultTolerantScanner, operationType, None)(sqlContext)
+    new KuduRelation(tableName, kuduMaster, faultTolerantScanner,
+      scanLocality, operationType, None)(sqlContext)
   }
 
   /**
@@ -103,9 +106,10 @@ class DefaultSource extends RelationProvider with CreatableRelationProvider
     val operationType = getOperationType(parameters.getOrElse(OPERATION, "upsert"))
     val faultTolerantScanner = Try(parameters.getOrElse(FAULT_TOLERANT_SCANNER, "false").toBoolean)
       .getOrElse(false)
+    val scanLocality = getScanLocalityType(parameters.getOrElse(SCAN_LOCALITY, "leader_only"))
 
-    new KuduRelation(tableName, kuduMaster, faultTolerantScanner, operationType,
-      Some(schema))(sqlContext)
+    new KuduRelation(tableName, kuduMaster, faultTolerantScanner,
+      scanLocality, operationType, Some(schema))(sqlContext)
   }
 
   private def getOperationType(opParam: String): OperationType = {
@@ -118,6 +122,14 @@ class DefaultSource extends RelationProvider with CreatableRelationProvider
       case _ => throw new IllegalArgumentException(s"Unsupported operation type '$opParam'")
     }
   }
+
+  /** Maps a `kudu.scanLocality` value (any case) to a [[ReplicaSelection]]; throws on unknown. */
+  private def getScanLocalityType(opParam: String): ReplicaSelection =
+    opParam.toLowerCase(java.util.Locale.ROOT) match { // ROOT: locale-independent case folding
+      case "leader_only" => ReplicaSelection.LEADER_ONLY
+      case "closest_replica" => ReplicaSelection.CLOSEST_REPLICA
+      case _ => throw new IllegalArgumentException(s"Unsupported replica selection type '$opParam'")
+    }
 }
 
 /**
@@ -127,6 +139,8 @@ class DefaultSource extends RelationProvider with CreatableRelationProvider
   * @param masterAddrs Kudu master addresses
   * @param faultTolerantScanner scanner type to be used. Fault tolerant if true,
   *                             otherwise, use non fault tolerant one
+  * @param scanLocality replica selection policy for scans: LEADER_ONLY scans only the
+  *                     leader, CLOSEST_REPLICA scans the replica closest to the executor.
   * @param operationType The default operation type to perform when writing to the relation
   * @param userSchema A schema used to select columns for the relation
   * @param sqlContext SparkSQL context
@@ -135,6 +149,7 @@ class DefaultSource extends RelationProvider with CreatableRelationProvider
 class KuduRelation(private val tableName: String,
                    private val masterAddrs: String,
                    private val faultTolerantScanner: Boolean,
+                   private val scanLocality: ReplicaSelection,
                    private val operationType: OperationType,
                    private val userSchema: Option[StructType])(
                    val sqlContext: SQLContext)
@@ -182,7 +197,7 @@ class KuduRelation(private val tableName: String,
   override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
     val predicates = filters.flatMap(filterToPredicate)
     new KuduRDD(context, 1024 * 1024 * 20, requiredColumns, predicates,
-                table, faultTolerantScanner, sqlContext.sparkContext)
+                table, faultTolerantScanner, scanLocality, sqlContext.sparkContext)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/kudu/blob/3abca98c/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
index 49068c4..a981f5e 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
@@ -124,9 +124,10 @@ class KuduContext(val kuduMaster: String,
   def kuduRDD(sc: SparkContext,
               tableName: String,
               columnProjection: Seq[String] = Nil): RDD[Row] = {
-    // TODO: provide an elegant way to pass various options (faultTolerantScan, etc) to KuduRDD
+    // TODO: provide an elegant way to pass various options (faultTolerantScan,
+    // scanLocality, etc) to KuduRDD; until then: non-fault-tolerant, LEADER_ONLY.
     new KuduRDD(this, 1024*1024*20, columnProjection.toArray, Array(),
-                syncClient.openTable(tableName), false, sc)
+                syncClient.openTable(tableName), false, ReplicaSelection.LEADER_ONLY, sc)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/kudu/blob/3abca98c/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
index 4202d73..3baa65f 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
@@ -36,6 +36,7 @@ class KuduRDD private[kudu] (val kuduContext: KuduContext,
                              @transient val predicates: Array[client.KuduPredicate],
                              @transient val table: KuduTable,
                              @transient val isFaultTolerant: Boolean,
+                             @transient val scanLocality: ReplicaSelection,
                              @transient val sc: SparkContext) extends RDD[Row](sc, Nil) {
 
   override protected def getPartitions: Array[Partition] = {
@@ -46,6 +47,15 @@ class KuduRDD private[kudu] (val kuduContext: KuduContext,
                              .setFaultTolerant(isFaultTolerant)
                              .cacheBlocks(true)
 
+    // A scan is partitioned into multiple scans. If scan locality is enabled,
+    // each will take place at the closest replica to the executor. In this
+    // case, to ensure the consistency of such scan, we use READ_AT_SNAPSHOT
+    // read mode without setting a timestamp.
+    if (scanLocality == ReplicaSelection.CLOSEST_REPLICA) {
+      builder.replicaSelection(ReplicaSelection.CLOSEST_REPLICA)
+             .readMode(AsyncKuduScanner.ReadMode.READ_AT_SNAPSHOT)
+    }
+
     for (predicate <- predicates) {
       builder.addPredicate(predicate)
     }

http://git-wip-us.apache.org/repos/asf/kudu/blob/3abca98c/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
index 9a90f56..69d1f20 100644
--- a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
+++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
@@ -496,6 +496,21 @@ class DefaultSourceTest extends FunSuite with TestContext with BeforeAndAfter wi
     }.getMessage should include ("Unknown column: foo")
   }
 
+  test("scan locality") {
+    kuduOptions = Map(
+      "kudu.table" -> tableName,
+      "kudu.master" -> miniCluster.getMasterAddresses,
+      "kudu.scanLocality" -> "closest_replica")
+
+    val table = "scanLocalityTest"
+    sqlContext.read.options(kuduOptions).kudu.createOrReplaceTempView(table)
+    val results = sqlContext.sql(s"SELECT * FROM $table").collectAsList()
+    assert(results.size() == rowCount)
+    // Spot-check column 2 nullability; NOTE(review): relies on the scan's row order.
+    assert(!results.get(0).isNullAt(2))
+    assert(results.get(1).isNullAt(2))
+  }
+
   // Verify that the propagated timestamp is properly updated inside
   // the same client.
   test("timestamp propagation") {