Posted to commits@kudu.apache.org by gr...@apache.org on 2018/10/02 19:01:56 UTC

[5/6] kudu git commit: [spark] Add KuduReadOptions to encapsulate the read parameters

[spark] Add KuduReadOptions to encapsulate the read parameters

This patch adds KuduReadOptions and refactors
the existing private APIs to leverage it.
Additionally, some refactoring was done to
KuduWriteOptions so that interaction with
both classes is consistent.

This patch is a preliminary step toward adding
keep-alive support to the Spark integration.

Change-Id: Iaee3d09471cad2cddd816ec77eafaae457faf1c2
Reviewed-on: http://gerrit.cloudera.org:8080/11537
Tested-by: Kudu Jenkins
Reviewed-by: Hao Hao <ha...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/545afa7a
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/545afa7a
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/545afa7a

Branch: refs/heads/master
Commit: 545afa7a4c952457b11889b6b552685dadb0750b
Parents: dc8ae79
Author: Grant Henke <gr...@apache.org>
Authored: Thu Sep 27 10:50:09 2018 -0700
Committer: Grant Henke <gr...@apache.org>
Committed: Tue Oct 2 19:01:16 2018 +0000

----------------------------------------------------------------------
 .../apache/kudu/spark/kudu/DefaultSource.scala  | 152 +++++++++----------
 .../apache/kudu/spark/kudu/KuduContext.scala    |  22 +--
 .../org/apache/kudu/spark/kudu/KuduRDD.scala    |  32 ++--
 .../kudu/spark/kudu/KuduReadOptions.scala       |  50 ++++++
 .../kudu/spark/kudu/KuduWriteOptions.scala      |  21 ++-
 .../org/apache/kudu/spark/kudu/SparkUtil.scala  |   7 +-
 .../kudu/spark/kudu/DefaultSourceTest.scala     |  19 +--
 7 files changed, 170 insertions(+), 133 deletions(-)
----------------------------------------------------------------------
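For context, a minimal sketch of how the consolidated options surface through
the DataFrame API after this patch, assuming the usual "kudu.table" and
"kudu.master" keys (the table name, master address, and values here are
hypothetical; "kudu.batchSize" is the key added by this patch):

    val kuduOptions = Map(
      "kudu.table" -> "my_table",              // hypothetical table name
      "kudu.master" -> "master-host:7051",     // hypothetical master address
      "kudu.batchSize" -> "1048576",           // key added by this patch
      "kudu.scanRequestTimeoutMs" -> "30000")
    val dataFrame = sqlContext.read.options(kuduOptions).kudu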


http://git-wip-us.apache.org/repos/asf/kudu/blob/545afa7a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
index 3f9ae0a..29635a3 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
@@ -30,9 +30,13 @@ import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.SaveMode
+import org.apache.yetus.audience.InterfaceAudience
 import org.apache.yetus.audience.InterfaceStability
+
 import org.apache.kudu.client.KuduPredicate.ComparisonOp
 import org.apache.kudu.client._
+import org.apache.kudu.spark.kudu.KuduReadOptions._
+import org.apache.kudu.spark.kudu.KuduWriteOptions._
 import org.apache.kudu.spark.kudu.SparkUtil._
 
 /**
@@ -43,6 +47,7 @@ import org.apache.kudu.spark.kudu.SparkUtil._
  * `DefaultSource` when the user specifies the path of a source during DDL
  * operations through [[org.apache.spark.sql.DataFrameReader.format]].
  */
+@InterfaceAudience.Private
 @InterfaceStability.Unstable
 class DefaultSource
     extends RelationProvider with CreatableRelationProvider with SchemaRelationProvider {
@@ -56,16 +61,7 @@ class DefaultSource
   val IGNORE_DUPLICATE_ROW_ERRORS = "kudu.ignoreDuplicateRowErrors"
   val SCAN_REQUEST_TIMEOUT_MS = "kudu.scanRequestTimeoutMs"
   val SOCKET_READ_TIMEOUT_MS = "kudu.socketReadTimeoutMs"
-
-  def defaultMasterAddrs: String = InetAddress.getLocalHost.getCanonicalHostName
-
-  def getScanRequestTimeoutMs(parameters: Map[String, String]): Option[Long] = {
-    parameters.get(SCAN_REQUEST_TIMEOUT_MS).map(_.toLong)
-  }
-
-  def getSocketReadTimeoutMs(parameters: Map[String, String]): Option[Long] = {
-    parameters.get(SOCKET_READ_TIMEOUT_MS).map(_.toLong)
-  }
+  val BATCH_SIZE = "kudu.batchSize"
 
   /**
    * Construct a BaseRelation using the provided context and parameters.
@@ -77,33 +73,37 @@ class DefaultSource
   override def createRelation(
       sqlContext: SQLContext,
       parameters: Map[String, String]): BaseRelation = {
+    createRelation(sqlContext, parameters, null)
+  }
+
+  /**
+   * Construct a BaseRelation using the provided context, parameters and schema.
+   *
+   * @param sqlContext SparkSQL context
+   * @param parameters parameters given to us from SparkSQL
+   * @param schema     the schema used to select columns for the relation
+   * @return           a BaseRelation object
+   */
+  override def createRelation(
+      sqlContext: SQLContext,
+      parameters: Map[String, String],
+      schema: StructType): BaseRelation = {
     val tableName = parameters.getOrElse(
       TABLE_KEY,
       throw new IllegalArgumentException(
         s"Kudu table name must be specified in create options using key '$TABLE_KEY'"))
     val kuduMaster = parameters.getOrElse(KUDU_MASTER, defaultMasterAddrs)
     val operationType = getOperationType(parameters.getOrElse(OPERATION, "upsert"))
-    val faultTolerantScanner =
-      Try(parameters.getOrElse(FAULT_TOLERANT_SCANNER, "false").toBoolean)
-        .getOrElse(false)
-    val scanLocality = getScanLocalityType(parameters.getOrElse(SCAN_LOCALITY, "closest_replica"))
-    val ignoreDuplicateRowErrors = Try(parameters(IGNORE_DUPLICATE_ROW_ERRORS).toBoolean)
-      .getOrElse(false) ||
-    Try(parameters(OPERATION) == "insert-ignore").getOrElse(false)
-    val ignoreNull =
-      Try(parameters.getOrElse(IGNORE_NULL, "false").toBoolean).getOrElse(false)
-    val writeOptions =
-      new KuduWriteOptions(ignoreDuplicateRowErrors, ignoreNull)
+    val schemaOption = Option(schema)
+    val readOptions = getReadOptions(parameters)
+    val writeOptions = getWriteOptions(parameters)
 
     new KuduRelation(
       tableName,
       kuduMaster,
-      faultTolerantScanner,
-      scanLocality,
-      getScanRequestTimeoutMs(parameters),
-      getSocketReadTimeoutMs(parameters),
       operationType,
-      None,
+      schemaOption,
+      readOptions,
       writeOptions
     )(sqlContext)
   }
@@ -130,36 +130,47 @@ class DefaultSource
       case _ =>
         throw new UnsupportedOperationException("Currently, only Append is supported")
     }
-
     kuduRelation
   }
 
-  override def createRelation(
-      sqlContext: SQLContext,
-      parameters: Map[String, String],
-      schema: StructType): BaseRelation = {
-    val tableName = parameters.getOrElse(
-      TABLE_KEY,
-      throw new IllegalArgumentException(
-        s"Kudu table name must be specified in create options " +
-          s"using key '$TABLE_KEY'"))
-    val kuduMaster = parameters.getOrElse(KUDU_MASTER, defaultMasterAddrs)
-    val operationType = getOperationType(parameters.getOrElse(OPERATION, "upsert"))
+  private def getReadOptions(parameters: Map[String, String]): KuduReadOptions = {
+    val batchSize = parameters.get(BATCH_SIZE).map(_.toInt).getOrElse(defaultBatchSize)
     val faultTolerantScanner =
-      Try(parameters.getOrElse(FAULT_TOLERANT_SCANNER, "false").toBoolean)
-        .getOrElse(false)
-    val scanLocality = getScanLocalityType(parameters.getOrElse(SCAN_LOCALITY, "closest_replica"))
+      parameters.get(FAULT_TOLERANT_SCANNER).map(_.toBoolean).getOrElse(defaultFaultTolerantScanner)
+    val scanLocality =
+      parameters.get(SCAN_LOCALITY).map(getScanLocalityType).getOrElse(defaultScanLocality)
+    val scanRequestTimeoutMs = parameters.get(SCAN_REQUEST_TIMEOUT_MS).map(_.toLong)
+    val socketReadTimeoutMs = parameters.get(SOCKET_READ_TIMEOUT_MS).map(_.toLong)
 
-    new KuduRelation(
-      tableName,
-      kuduMaster,
-      faultTolerantScanner,
+    KuduReadOptions(
+      batchSize,
       scanLocality,
-      getScanRequestTimeoutMs(parameters),
-      getSocketReadTimeoutMs(parameters),
-      operationType,
-      Some(schema)
-    )(sqlContext)
+      faultTolerantScanner,
+      scanRequestTimeoutMs,
+      socketReadTimeoutMs)
+  }
+
+  private def getWriteOptions(parameters: Map[String, String]): KuduWriteOptions = {
+    val ignoreDuplicateRowErrors =
+      Try(parameters(IGNORE_DUPLICATE_ROW_ERRORS).toBoolean).getOrElse(false) ||
+        Try(parameters(OPERATION) == "insert-ignore").getOrElse(false)
+    val ignoreNull =
+      parameters.get(IGNORE_NULL).map(_.toBoolean).getOrElse(defaultIgnoreNull)
+
+    KuduWriteOptions(ignoreDuplicateRowErrors, ignoreNull)
+  }
+
+  private def defaultMasterAddrs: String = InetAddress.getLocalHost.getCanonicalHostName
+
+  private def getScanLocalityType(opParam: String): ReplicaSelection = {
+    opParam.toLowerCase match {
+      case "leader_only" => ReplicaSelection.LEADER_ONLY
+      case "closest_replica" => ReplicaSelection.CLOSEST_REPLICA
+      case _ =>
+        throw new IllegalArgumentException(s"Unsupported replica selection type '$opParam'")
+    }
   }
 
   private def getOperationType(opParam: String): OperationType = {
@@ -173,15 +184,6 @@ class DefaultSource
         throw new IllegalArgumentException(s"Unsupported operation type '$opParam'")
     }
   }
-
-  private def getScanLocalityType(opParam: String): ReplicaSelection = {
-    opParam.toLowerCase match {
-      case "leader_only" => ReplicaSelection.LEADER_ONLY
-      case "closest_replica" => ReplicaSelection.CLOSEST_REPLICA
-      case _ =>
-        throw new IllegalArgumentException(s"Unsupported replica selection type '$opParam'")
-    }
-  }
 }
 
 /**
@@ -189,31 +191,25 @@ class DefaultSource
  *
  * @param tableName Kudu table that we plan to read from
  * @param masterAddrs Kudu master addresses
- * @param faultTolerantScanner scanner type to be used. Fault tolerant if true,
- *                             otherwise, use non fault tolerant one
- * @param scanLocality If true scan locality is enabled, so that the scan will
- *                     take place at the closest replica.
- * @param scanRequestTimeoutMs Maximum time allowed per scan request, in milliseconds
  * @param operationType The default operation type to perform when writing to the relation
  * @param userSchema A schema used to select columns for the relation
+ * @param readOptions Kudu read options
  * @param writeOptions Kudu write options
  * @param sqlContext SparkSQL context
  */
+@InterfaceAudience.Private
 @InterfaceStability.Unstable
 class KuduRelation(
-    private val tableName: String,
-    private val masterAddrs: String,
-    private val faultTolerantScanner: Boolean,
-    private val scanLocality: ReplicaSelection,
-    private[kudu] val scanRequestTimeoutMs: Option[Long],
-    private[kudu] val socketReadTimeoutMs: Option[Long],
-    private val operationType: OperationType,
-    private val userSchema: Option[StructType],
-    private val writeOptions: KuduWriteOptions = new KuduWriteOptions)(val sqlContext: SQLContext)
+    val tableName: String,
+    val masterAddrs: String,
+    val operationType: OperationType,
+    val userSchema: Option[StructType],
+    val readOptions: KuduReadOptions = new KuduReadOptions,
+    val writeOptions: KuduWriteOptions = new KuduWriteOptions)(val sqlContext: SQLContext)
     extends BaseRelation with PrunedFilteredScan with InsertableRelation {
 
   private val context: KuduContext =
-    new KuduContext(masterAddrs, sqlContext.sparkContext, socketReadTimeoutMs)
+    new KuduContext(masterAddrs, sqlContext.sparkContext, readOptions.socketReadTimeoutMs)
 
   private val table: KuduTable = context.syncClient.openTable(tableName)
 
@@ -241,14 +237,10 @@ class KuduRelation(
     val predicates = filters.flatMap(filterToPredicate)
     new KuduRDD(
       context,
-      1024 * 1024 * 20,
+      table,
       requiredColumns,
       predicates,
-      table,
-      faultTolerantScanner,
-      scanLocality,
-      scanRequestTimeoutMs,
-      socketReadTimeoutMs,
+      readOptions,
       sqlContext.sparkContext
     )
   }
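As a standalone illustration of the parsing idiom getReadOptions adopts above
(the old Try-with-default pattern is replaced by Option combinators, so a
present but malformed value now fails fast instead of silently falling back to
the default; the key and map here are hypothetical):

    val parameters = Map("kudu.fooOption" -> "true")  // hypothetical input
    // Absent key => default; present key => parsed (throws on bad input).
    val foo = parameters.get("kudu.fooOption").map(_.toBoolean).getOrElse(false)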

http://git-wip-us.apache.org/repos/asf/kudu/blob/545afa7a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
index a9975b6..5a12ad6 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
@@ -20,8 +20,6 @@ package org.apache.kudu.spark.kudu
 import java.security.AccessController
 import java.security.PrivilegedAction
 
-import java.sql.Timestamp
-
 import javax.security.auth.Subject
 import javax.security.auth.login.AppConfigurationEntry
 import javax.security.auth.login.Configuration
@@ -45,6 +43,7 @@ import org.apache.yetus.audience.InterfaceAudience
 import org.apache.yetus.audience.InterfaceStability
 import org.slf4j.Logger
 import org.slf4j.LoggerFactory
+
 import org.apache.kudu.client.SessionConfiguration.FlushMode
 import org.apache.kudu.client._
 import org.apache.kudu.spark.kudu.SparkUtil._
@@ -58,7 +57,8 @@ import org.apache.kudu.Type
  * [[KuduContext]] should be created in the driver, and shared with executors
  * as a serializable field.
  */
-@InterfaceStability.Unstable
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
 class KuduContext(val kuduMaster: String, sc: SparkContext, val socketReadTimeoutMs: Option[Long])
     extends Serializable {
 
@@ -137,19 +137,14 @@ class KuduContext(val kuduMaster: String, sc: SparkContext, val socketReadTimeou
   def kuduRDD(
       sc: SparkContext,
       tableName: String,
-      columnProjection: Seq[String] = Nil): RDD[Row] = {
-    // TODO: provide an elegant way to pass various options (faultTolerantScan,
-    // TODO: localityScan, etc) to KuduRDD
+      columnProjection: Seq[String] = Nil,
+      options: KuduReadOptions = KuduReadOptions()): RDD[Row] = {
     new KuduRDD(
       this,
-      1024 * 1024 * 20,
+      syncClient.openTable(tableName),
       columnProjection.toArray,
       Array(),
-      syncClient.openTable(tableName),
-      false,
-      ReplicaSelection.LEADER_ONLY,
-      None,
-      None,
+      options,
       sc)
   }
 
@@ -246,8 +241,7 @@ class KuduContext(val kuduMaster: String, sc: SparkContext, val socketReadTimeou
   @deprecated(
     "Use KuduContext.insertRows(data, tableName, new KuduWriteOptions(ignoreDuplicateRowErrors = true))")
   def insertIgnoreRows(data: DataFrame, tableName: String): Unit = {
-    val writeOptions = new KuduWriteOptions
-    writeOptions.ignoreDuplicateRowErrors = true
+    val writeOptions = KuduWriteOptions(ignoreDuplicateRowErrors = true)
     writeRows(data, tableName, Insert, writeOptions)
   }
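With the reworked kuduRDD signature above, read tuning flows through
KuduReadOptions instead of the removed TODO hack; a sketch (the table name and
projected columns are hypothetical):

    val rdd = kuduContext.kuduRDD(
      sc,
      "my_table",                      // hypothetical table name
      Seq("key", "c2_s"),              // projected columns
      KuduReadOptions(
        faultTolerantScanner = true,
        scanRequestTimeoutMs = Some(30000)))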
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/545afa7a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
index c16a6a0..77dabcc 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
@@ -17,12 +17,13 @@
 package org.apache.kudu.spark.kudu
 
 import scala.collection.JavaConverters._
-
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.Row
 import org.apache.spark.Partition
 import org.apache.spark.SparkContext
 import org.apache.spark.TaskContext
+import org.apache.yetus.audience.InterfaceAudience
+import org.apache.yetus.audience.InterfaceStability
 
 import org.apache.kudu.client._
 import org.apache.kudu.Type
@@ -33,45 +34,42 @@ import org.apache.kudu.client
  *
  * To construct a KuduRDD, use [[KuduContext#kuduRDD]] or a Kudu DataSource.
  */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
 class KuduRDD private[kudu] (
     val kuduContext: KuduContext,
-    @transient val batchSize: Integer,
+    @transient val table: KuduTable,
     @transient val projectedCols: Array[String],
     @transient val predicates: Array[client.KuduPredicate],
-    @transient val table: KuduTable,
-    @transient val isFaultTolerant: Boolean,
-    @transient val scanLocality: ReplicaSelection,
-    @transient val scanRequestTimeoutMs: Option[Long],
-    @transient val socketReadTimeoutMs: Option[Long],
+    @transient val options: KuduReadOptions,
     @transient val sc: SparkContext)
     extends RDD[Row](sc, Nil) {
 
   override protected def getPartitions: Array[Partition] = {
     val builder = kuduContext.syncClient
       .newScanTokenBuilder(table)
-      .batchSizeBytes(batchSize)
+      .batchSizeBytes(options.batchSize)
       .setProjectedColumnNames(projectedCols.toSeq.asJava)
-      .setFaultTolerant(isFaultTolerant)
+      .setFaultTolerant(options.faultTolerantScanner)
       .cacheBlocks(true)
 
     // A scan is partitioned into multiple scan tokens. If scan locality is
     // enabled, each token's scan takes place at the closest replica to the
     // executor. In this case, to ensure the consistency of such a scan, we
     // use READ_AT_SNAPSHOT read mode without setting a timestamp.
-    if (scanLocality == ReplicaSelection.CLOSEST_REPLICA) {
-      builder
-        .replicaSelection(ReplicaSelection.CLOSEST_REPLICA)
-        .readMode(AsyncKuduScanner.ReadMode.READ_AT_SNAPSHOT)
+    builder.replicaSelection(options.scanLocality)
+    if (options.scanLocality == ReplicaSelection.CLOSEST_REPLICA) {
+      builder.readMode(AsyncKuduScanner.ReadMode.READ_AT_SNAPSHOT)
     }
 
-    scanRequestTimeoutMs match {
-      case Some(timeout) => builder.scanRequestTimeout(timeout)
-      case _ =>
+    options.scanRequestTimeoutMs.foreach { timeout =>
+      builder.scanRequestTimeout(timeout)
     }
 
     for (predicate <- predicates) {
       builder.addPredicate(predicate)
     }
+
     val tokens = builder.build().asScala
     tokens.zipWithIndex.map {
       case (token, index) =>
@@ -79,7 +77,7 @@ class KuduRDD private[kudu] (
         // replica selection policy is leader only, to take advantage
         // of scan locality.
         var locations: Array[String] = null
-        if (scanLocality == ReplicaSelection.LEADER_ONLY) {
+        if (options.scanLocality == ReplicaSelection.LEADER_ONLY) {
           locations = Array(token.getTablet.getLeaderReplica.getRpcHost)
         } else {
           locations = token.getTablet.getReplicas.asScala.map(_.getRpcHost).toArray
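As the partitioning logic above shows, CLOSEST_REPLICA scans are coerced to
READ_AT_SNAPSHOT for consistency; a caller preferring leader-only scans can
opt out through the options (a sketch):

    // LEADER_ONLY skips the READ_AT_SNAPSHOT coercion and makes the RDD
    // report the leader replica's host as each partition's preferred location.
    val leaderOnly = KuduReadOptions(scanLocality = ReplicaSelection.LEADER_ONLY)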

http://git-wip-us.apache.org/repos/asf/kudu/blob/545afa7a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduReadOptions.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduReadOptions.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduReadOptions.scala
new file mode 100644
index 0000000..7c9b888
--- /dev/null
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduReadOptions.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kudu.spark.kudu
+
+import org.apache.yetus.audience.InterfaceAudience
+import org.apache.yetus.audience.InterfaceStability
+
+import org.apache.kudu.client.ReplicaSelection
+import org.apache.kudu.spark.kudu.KuduReadOptions._
+
+/**
+ * KuduReadOptions holds configuration of reads to Kudu tables.
+ *
+ * @param batchSize Maximum number of bytes returned by the scanner in each batch
+ * @param scanLocality Replica selection for the scan; with CLOSEST_REPLICA the
+ *                     scan takes place at the closest replica
+ * @param faultTolerantScanner Scanner type to be used: fault tolerant if true,
+ *                             otherwise non fault tolerant
+ * @param scanRequestTimeoutMs Maximum time allowed per scan request, in milliseconds
+ * @param socketReadTimeoutMs Maximum time allowed waiting on data from a socket, in ms
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+case class KuduReadOptions(
+    batchSize: Int = defaultBatchSize,
+    scanLocality: ReplicaSelection = defaultScanLocality,
+    faultTolerantScanner: Boolean = defaultFaultTolerantScanner,
+    scanRequestTimeoutMs: Option[Long] = None,
+    socketReadTimeoutMs: Option[Long] = None)
+
+object KuduReadOptions {
+  val defaultBatchSize: Int = 1024 * 1024 * 20 // TODO: Understand/doc this setting?
+  val defaultScanLocality: ReplicaSelection = ReplicaSelection.CLOSEST_REPLICA
+  val defaultFaultTolerantScanner: Boolean = false
+}
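Since KuduReadOptions is an immutable case class with defaults, per-read
variants compose with named arguments and copy; a sketch:

    val base = KuduReadOptions(scanRequestTimeoutMs = Some(30000))
    // copy returns a new instance; the original options are untouched.
    val faultTolerant = base.copy(faultTolerantScanner = true)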

http://git-wip-us.apache.org/repos/asf/kudu/blob/545afa7a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduWriteOptions.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduWriteOptions.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduWriteOptions.scala
index 1eb21ca..22a6886 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduWriteOptions.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduWriteOptions.scala
@@ -17,20 +17,25 @@
 
 package org.apache.kudu.spark.kudu
 
+import org.apache.yetus.audience.InterfaceAudience
 import org.apache.yetus.audience.InterfaceStability
 
+import org.apache.kudu.spark.kudu.KuduWriteOptions._
+
 /**
  * KuduWriteOptions holds configuration of writes to Kudu tables.
  *
- * The instance of this class is passed to KuduContext write functions,
- * such as insertRows, deleteRows, upsertRows, and updateRows.
- *
  * @param ignoreDuplicateRowErrors when inserting, ignore any new rows that
  *                                 have a primary key conflict with existing rows
  * @param ignoreNull if set to true, update only non-null columns
  */
-@InterfaceStability.Unstable
-class KuduWriteOptions(
-    var ignoreDuplicateRowErrors: Boolean = false,
-    var ignoreNull: Boolean = false)
-    extends Serializable
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+case class KuduWriteOptions(
+    ignoreDuplicateRowErrors: Boolean = defaultIgnoreDuplicateRowErrors,
+    ignoreNull: Boolean = defaultIgnoreNull)
+
+object KuduWriteOptions {
+  val defaultIgnoreDuplicateRowErrors: Boolean = false
+  val defaultIgnoreNull: Boolean = false
+}
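The write options follow the same pattern, so the mutable var-based setup the
patch removes becomes a single immutable expression (mirroring the test
updates below):

    // Before this patch (mutable):
    //   val writeOptions = new KuduWriteOptions
    //   writeOptions.ignoreDuplicateRowErrors = true
    // After (immutable case class):
    val writeOptions = KuduWriteOptions(ignoreDuplicateRowErrors = true)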

http://git-wip-us.apache.org/repos/asf/kudu/blob/545afa7a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/SparkUtil.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/SparkUtil.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/SparkUtil.scala
index 69b6ca4..aae386f 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/SparkUtil.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/SparkUtil.scala
@@ -2,14 +2,15 @@ package org.apache.kudu.spark.kudu
 
 import java.util
 
+import org.apache.spark.sql.types._
+import org.apache.yetus.audience.InterfaceAudience
+import org.apache.yetus.audience.InterfaceStability
+
 import org.apache.kudu.ColumnTypeAttributes.ColumnTypeAttributesBuilder
 import org.apache.kudu.ColumnSchema
 import org.apache.kudu.ColumnTypeAttributes
 import org.apache.kudu.Schema
 import org.apache.kudu.Type
-import org.apache.spark.sql.types._
-import org.apache.yetus.audience.InterfaceAudience
-import org.apache.yetus.audience.InterfaceStability
 
 import scala.collection.JavaConverters._
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/545afa7a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
index ed56b69..624257c 100644
--- a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
+++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
@@ -164,8 +164,7 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {
 
     // change the c2 string to abc and insert
     val updateDF = baseDF.withColumn("c2_s", lit("abc"))
-    val kuduWriteOptions = new KuduWriteOptions
-    kuduWriteOptions.ignoreDuplicateRowErrors = true
+    val kuduWriteOptions = KuduWriteOptions(ignoreDuplicateRowErrors = true)
     kuduContext.insertRows(updateDF, tableName, kuduWriteOptions)
 
     // change the key and insert
@@ -320,14 +319,13 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {
     val nullDF = sqlContext
       .createDataFrame(Seq((0, null.asInstanceOf[String])))
       .toDF("key", "val")
-    val kuduWriteOptions = new KuduWriteOptions
-    kuduWriteOptions.ignoreNull = true
-    kuduContext.upsertRows(nullDF, simpleTableName, kuduWriteOptions)
+    val ignoreNullOptions = KuduWriteOptions(ignoreNull = true)
+    kuduContext.upsertRows(nullDF, simpleTableName, ignoreNullOptions)
     assert(dataDF.collect.toList === nonNullDF.collect.toList)
 
-    kuduWriteOptions.ignoreNull = false
+    val respectNullOptions = KuduWriteOptions(ignoreNull = false)
     kuduContext.updateRows(nonNullDF, simpleTableName)
-    kuduContext.upsertRows(nullDF, simpleTableName, kuduWriteOptions)
+    kuduContext.upsertRows(nullDF, simpleTableName, respectNullOptions)
     assert(dataDF.collect.toList === nullDF.collect.toList)
 
     kuduContext.updateRows(nonNullDF, simpleTableName)
@@ -883,8 +881,7 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {
         df("key")
           .plus(100))
       .withColumn("c2_s", lit("def"))
-    val kuduWriteOptions = new KuduWriteOptions
-    kuduWriteOptions.ignoreDuplicateRowErrors = true
+    val kuduWriteOptions = KuduWriteOptions(ignoreDuplicateRowErrors = true)
     kuduContext.insertRows(updateDF, tableName, kuduWriteOptions)
     assert(kuduContext.syncClient.getLastPropagatedTimestamp > prevTimestamp)
   }
@@ -914,7 +911,7 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {
       "kudu.scanRequestTimeoutMs" -> "1")
     val dataFrame = sqlContext.read.options(kuduOptions).kudu
     val kuduRelation = kuduRelationFromDataFrame(dataFrame)
-    assert(kuduRelation.scanRequestTimeoutMs == Some(1))
+    assert(kuduRelation.readOptions.scanRequestTimeoutMs == Some(1))
   }
 
   /**
@@ -930,6 +927,6 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {
       "kudu.socketReadTimeoutMs" -> "1")
     val dataFrame = sqlContext.read.options(kuduOptions).kudu
     val kuduRelation = kuduRelationFromDataFrame(dataFrame)
-    assert(kuduRelation.socketReadTimeoutMs == Some(1))
+    assert(kuduRelation.readOptions.socketReadTimeoutMs == Some(1))
   }
 }