Posted to commits@kudu.apache.org by to...@apache.org on 2016/08/13 02:08:47 UTC

[2/5] kudu git commit: KUDU-1533 Spark Kudu Rdd/Dataframe upsert

KUDU-1533 Spark Kudu Rdd/Dataframe upsert

This patch improves the Kudu SparkSQL integration in two ways:

1) Removed support for all SaveModes except Append for the
CreatableRelationProvider trait of DefaultSource. This is an
improvement because the other modes cannot be correctly implemented
for Kudu without support for table truncation, and because some modes
require automatic table creation, for which there is no satisfactory
mechanism to specify things like the partition schema.

2) Added {insert, update, upsert, delete}Rows methods to KuduContext.
These are now the preferred way to write to Kudu tables.
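
For instance, a minimal sketch of the new API (the master address,
table name, and column names are placeholders, and sqlContext is
assumed to be available, e.g. in spark-shell):

  import org.apache.kudu.spark.kudu._

  val kuduContext = new KuduContext("kudu.master:7051")
  val df = sqlContext.read.options(
    Map("kudu.master" -> "kudu.master:7051", "kudu.table" -> "test_table")).kudu

  kuduContext.insertRows(df, "test_table")   // strict insert; duplicate keys are errors
  kuduContext.upsertRows(df, "test_table")   // insert new rows, update existing ones
  kuduContext.updateRows(df, "test_table")   // rows must already exist
  kuduContext.deleteRows(df.select("id"), "test_table")  // pass only the key column(s)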

Additionally, inserts to Kudu tables from Spark SQL using
DefaultSource are now upserts by default. They can be reverted to
strict inserts by setting the kudu.operation parameter to insert.
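
A sketch of both write paths through the datasource, using the same
placeholder names as above (the kudu.operation key and its
upsert/insert values come from DefaultSource in this patch):

  // Upsert (the new default); only SaveMode.Append is accepted
  df.write.options(Map("kudu.master" -> "kudu.master:7051",
                       "kudu.table" -> "test_table")).mode("append").kudu

  // Strict inserts instead of upserts
  df.write.options(Map("kudu.master" -> "kudu.master:7051",
                       "kudu.table" -> "test_table",
                       "kudu.operation" -> "insert")).mode("append").kudu

  // Any other SaveMode, e.g. "overwrite", now throws UnsupportedOperationException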

These changes may break some existing clients, so they have been
documented in the release notes. Additionally, the enhancements to
the KuduContext API, and its preferred status over using the
DefaultSource to write to Kudu tables, have been documented in the
examples and the release notes.

Change-Id: Ib8e0d50fb74dc2ce5e757e8a56fc1e863f699822
Reviewed-on: http://gerrit.cloudera.org:8080/3871
Tested-by: Kudu Jenkins
Reviewed-by: Dan Burkert <da...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/95321972
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/95321972
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/95321972

Branch: refs/heads/master
Commit: 953219721744dcae6d6f6776ffab67a989c44f17
Parents: af5c059
Author: Will Berkeley <wd...@gmail.com>
Authored: Tue Aug 9 01:21:34 2016 -0400
Committer: Dan Burkert <da...@cloudera.com>
Committed: Fri Aug 12 19:32:40 2016 +0000

----------------------------------------------------------------------
 docs/developing.adoc                            |  67 +++++----
 docs/release_notes.adoc                         |   9 +-
 .../apache/kudu/spark/kudu/DefaultSource.scala  |  54 ++++----
 .../apache/kudu/spark/kudu/KuduContext.scala    |  70 +++++++---
 .../org/apache/kudu/spark/kudu/package.scala    |   6 +-
 .../kudu/spark/kudu/DefaultSourceTest.scala     | 136 ++++++++++++++-----
 6 files changed, 239 insertions(+), 103 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/95321972/docs/developing.adoc
----------------------------------------------------------------------
diff --git a/docs/developing.adoc b/docs/developing.adoc
index 715cd99..b8af3a6 100644
--- a/docs/developing.adoc
+++ b/docs/developing.adoc
@@ -96,42 +96,59 @@ example Maven pom.xml files.
 See link:kudu_impala_integration.html[Using Impala With Kudu] for guidance on installing
 and using Impala with Kudu, including several `impala-shell` examples.
 
-== Kudu integration with Spark
+== Kudu Integration with Spark
 
-Kudu integrates with spark through the spark data source api as of version 0.9
-Include the kudu-spark using the --jars
+Kudu integrates with Spark through the Data Source API as of version 0.9.
+Include the kudu-spark jar using the --jars option:
 [source]
 ----
-spark-shell --jars /kudu-spark-0.9.0.jar
+spark-shell --jars kudu-spark-0.9.0.jar
 ----
-Then import kudu-spark and create a dataframe:
-[source]
+Then import kudu-spark and create a dataframe:
+[source,scala]
 ----
-// Import kudu datasource
 import org.apache.kudu.spark.kudu._
-val kuduDataFrame =  sqlContext.read.options(Map("kudu.master"-> "your.kudu.master.here","kudu.table"-> "your.kudu.table.here")).kudu
-// Then query using spark api or register a temporary table and use spark sql
-kuduDataFrame.select("id").filter("id">=5).show()
-// Register kuduDataFrame as a temporary table for spark-sql
-kuduDataFrame.registerTempTable("kudu_table")
-// Select from the dataframe
-sqlContext.sql("select id from kudu_table where id>=5").show()
 
-// create a new kudu table from a dataframe
-val kuduContext = new KuduContext("your.kudu.master.here")
-kuduContext.createTable("testcreatetable", df.schema, Seq("key"), new CreateTableOptions().setNumReplicas(1))
+// Read a table from Kudu
+val df = sqlContext.read.options(Map("kudu.master" -> "kudu.master:7051","kudu.table" -> "kudu_table")).kudu
+
+// Query using the Spark API...
+df.select("id").filter("id" >= 5).show()
+
+// ...or register a temporary table and use SQL
+df.registerTempTable("kudu_table")
+val filteredDF = sqlContext.sql("select id from kudu_table where id >= 5")
+filteredDF.show()
+
+// Use KuduContext to create, delete, or write to Kudu tables
+val kuduContext = new KuduContext("kudu.master:7051")
+
+// Create a new Kudu table from a dataframe schema
+// NB: No rows from the dataframe are inserted into the table
+kuduContext.createTable("test_table", df.schema, Seq("key"), new CreateTableOptions().setNumReplicas(1))
+
+// Insert data
+kuduContext.insertRows(df, "test_table")
+
+// Delete data
+kuduContext.deleteRows(filteredDF, "test_table")
+
+// Upsert data
+kuduContext.upsertRows(df, "test_table")
 
-// then we can insert data into the kudu table
-df.write.options(Map("kudu.master"-> "your.kudu.master.here","kudu.table"-> "your.kudu.table.here")).mode("append").kudu
+// Update data
+val alteredDF = df.select(df("id"), (df("count") + 1).as("count"))
+kuduContext.updateRows(alteredDF, "test_table")
 
-// to update existing data change the mode to 'overwrite'
-df.write.options(Map("kudu.master"-> "your.kudu.master.here","kudu.table"-> "your.kudu.table.here")).mode("overwrite").kudu
+// Data can also be written to the Kudu table using the data source, though the methods on KuduContext are preferred
+// NB: The default is to upsert rows; to perform standard inserts instead, set "kudu.operation" -> "insert" in the options map
+// NB: Only mode Append is supported
+df.write.options(Map("kudu.master" -> "kudu.master:7051", "kudu.table" -> "test_table")).mode("append").kudu
 
-// to check for existance of a kudu table
-kuduContext.tableExists("your.kudu.table.here")
+// Check for the existence of a Kudu table
+kuduContext.tableExists("another_table")
 
-// to delete a kudu table
-kuduContext.deleteTable("your.kudu.table.here")
+// Delete a Kudu table
+kuduContext.deleteTable("unwanted_table")
 ----
 
 == Integration with MapReduce, YARN, and Other Frameworks

http://git-wip-us.apache.org/repos/asf/kudu/blob/95321972/docs/release_notes.adoc
----------------------------------------------------------------------
diff --git a/docs/release_notes.adoc b/docs/release_notes.adoc
index ae0b5a2..fdb5719 100644
--- a/docs/release_notes.adoc
+++ b/docs/release_notes.adoc
@@ -103,6 +103,11 @@ To upgrade to Kudu 0.10.0, see link:installation.html#upgrade[Upgrade from 0.9.x
   several RPC metrics. Any users previously explicitly fetching or monitoring metrics
   related to Remote Bootstrap should update their scripts to reflect the new names.
 
+- The SparkSQL datasource for Kudu no longer supports mode `Overwrite`. Users should
+  use the new `KuduContext.upsertRows` method instead. Additionally, inserts using the
+  datasource are now upserts by default. The older behavior can be restored by setting
+  the `kudu.operation` parameter to `insert`.
+
 [[rn_0.10.0_new_features]]
 ==== New features
 
@@ -112,7 +117,9 @@ To upgrade to Kudu 0.10.0, see link:installation.html#upgrade[Upgrade from 0.9.x
 
 - TODO(mpercy): add something about reserved space configuration
 
-
+- The Spark integration's `KuduContext` now supports four new methods for writing to
+  Kudu tables: `insertRows`, `upsertRows`, `updateRows`, and `deleteRows`. These are
+  now the preferred way to write to Kudu tables from Spark.
 
 [[rn_0.10.0_improvements]]
 ==== Improvements and optimizations

http://git-wip-us.apache.org/repos/asf/kudu/blob/95321972/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
index 138eff1..a3384b2 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
@@ -22,25 +22,25 @@ import java.sql.Timestamp
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
-import org.apache.spark.sql.{DataFrame, Row, SQLContext, SaveMode}
+import org.apache.spark.sql.{DataFrame, Row, SaveMode, SQLContext}
 import org.apache.kudu.Type
 import org.apache.kudu.annotations.InterfaceStability
 import org.apache.kudu.client._
 import org.apache.kudu.client.KuduPredicate.ComparisonOp
-import org.apache.kudu.client.SessionConfiguration.FlushMode
 import org.apache.spark.sql.SaveMode._
 
 import scala.collection.JavaConverters._
 
 /**
   * DefaultSource for integration with Spark's dataframe datasources.
-  * This class will produce a relationProvider based on input given to it from spark.
+  * This class will produce a relationProvider based on input given to it from Spark.
   */
 @InterfaceStability.Unstable
 class DefaultSource extends RelationProvider with CreatableRelationProvider {
 
   val TABLE_KEY = "kudu.table"
   val KUDU_MASTER = "kudu.master"
+  val OPERATION = "kudu.operation"
 
   /**
     * Construct a BaseRelation using the provided context and parameters.
@@ -56,35 +56,32 @@ class DefaultSource extends RelationProvider with CreatableRelationProvider {
       throw new IllegalArgumentException(s"Kudu table name must be specified in create options " +
         s"using key '$TABLE_KEY'"))
     val kuduMaster = parameters.getOrElse(KUDU_MASTER, "localhost")
+    val upsert = parameters.getOrElse(OPERATION, "upsert").toLowerCase match {
+      case "upsert" => true
+      case "insert" => false
+      case _ => throw new UnsupportedOperationException(s"$OPERATION must be upsert or insert")
+    }
 
-    new KuduRelation(tableName, kuduMaster)(sqlContext)
+    new KuduRelation(tableName, kuduMaster, upsert)(sqlContext)
   }
 
   /**
     * Creates a relation and inserts data to specified table.
     *
     * @param sqlContext
-    * @param mode Append will not overwrite existing data, Overwrite will perform update, but will
-    *             not insert data, use upsert on KuduContext if you require both
-    * @param parameters Nessisary parameters for kudu.table and kudu.master
+    * @param mode Only Append mode is supported. It will upsert or insert data
+    *             to an existing table, depending on the upsert parameter.
+    * @param parameters Necessary parameters for kudu.table and kudu.master
     * @param data Dataframe to save into kudu
     * @return returns populated base relation
     */
   override def createRelation(sqlContext: SQLContext, mode: SaveMode,
                               parameters: Map[String, String], data: DataFrame): BaseRelation = {
-    val tableName = parameters.getOrElse(TABLE_KEY,
-      throw new IllegalArgumentException(s"Kudu table name must be specified in create options " +
-        s"using key '$TABLE_KEY'"))
-
-    val kuduMaster = parameters.getOrElse(KUDU_MASTER, "localhost")
-
-    val kuduRelation = new KuduRelation(tableName, kuduMaster)(sqlContext)
+    val kuduRelation = createRelation(sqlContext, parameters)
     mode match {
-      case Append | Ignore => kuduRelation.insert(data, overwrite = false)
-      case Overwrite => kuduRelation.insert(data, overwrite = true)
-      case ErrorIfExists =>
-          throw new UnsupportedOperationException(
-            "ErrorIfExists is currently not supported")
+      case Append => kuduRelation.asInstanceOf[KuduRelation].insert(data, false)
+      case _ => throw new UnsupportedOperationException(
+        "Currently, only Append is supported")
     }
 
     kuduRelation
@@ -96,11 +93,13 @@ class DefaultSource extends RelationProvider with CreatableRelationProvider {
   *
   * @param tableName Kudu table that we plan to read from
   * @param kuduMaster Kudu master addresses
+  * @param upsert Whether the relation will be inserted or upserted by default
   * @param sqlContext SparkSQL context
   */
 @InterfaceStability.Unstable
 class KuduRelation(private val tableName: String,
-                   private val kuduMaster: String)(
+                   private val kuduMaster: String,
+                   private val upsert: Boolean)(
                    val sqlContext: SQLContext)
 extends BaseRelation
 with PrunedFilteredScan
@@ -193,12 +192,21 @@ with InsertableRelation {
   }
 
   /**
-    * Inserts data into an existing Kudu table.
+    * By default, upserts data into an existing Kudu table.
+    * If the kudu.operation parameter is set to insert, data is inserted instead of upserted.
+    *
     * @param data [[DataFrame]] to be inserted into Kudu
-    * @param overwrite If True it will update existing records, but will not perform inserts.
+    * @param overwrite must be false; otherwise, throws [[UnsupportedOperationException]]
     */
   override def insert(data: DataFrame, overwrite: Boolean): Unit = {
-    context.writeRows(data, tableName, overwrite)
+    if (overwrite) {
+      throw new UnsupportedOperationException("overwrite is not supported")
+    }
+    if (upsert) {
+      context.upsertRows(data, tableName)
+    } else {
+      context.insertRows(data, tableName)
+    }
   }
 }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/95321972/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
index 3a42365..ae463b7 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
@@ -67,20 +67,23 @@ class KuduContext(kuduMaster: String) extends Serializable {
 
   /**
     * Check if kudu table already exists
-    * @param tableName tablename to check
+    *
+    * @param tableName name of table to check
     * @return true if table exists, false if table does not exist
     */
   def tableExists(tableName: String): Boolean = syncClient.tableExists(tableName)
 
   /**
     * Delete kudu table
-    * @param tableName tablename to delete
+    *
+    * @param tableName name of table to delete
     * @return DeleteTableResponse
     */
   def deleteTable(tableName: String): DeleteTableResponse = syncClient.deleteTable(tableName)
 
   /**
     * Creates a kudu table for the given schema. Partitioning can be specified through options.
+    *
     * @param tableName table to create
     * @param schema struct schema of table
     * @param keys primary keys of the table
@@ -120,15 +123,50 @@ class KuduContext(kuduMaster: String) extends Serializable {
   }
 
   /**
-    * Inserts or updates rows in kudu from a [[DataFrame]].
-    * @param data `DataFrame` to insert/update
-    * @param tableName table to perform insertion on
-    * @param overwrite true=update, false=insert
+    * Inserts the rows of a [[DataFrame]] into a Kudu table.
+    *
+    * @param data the data to insert
+    * @param tableName the Kudu table to insert into
+    */
+  def insertRows(data: DataFrame, tableName: String): Unit = {
+    writeRows(data, tableName, table => table.newInsert())
+  }
+
+  /**
+    * Updates a Kudu table with the rows of a [[DataFrame]].
+    *
+    * @param data the data to update into Kudu
+    * @param tableName the Kudu table to update
     */
-  def writeRows(data: DataFrame, tableName: String, overwrite: Boolean) {
+  def updateRows(data: DataFrame, tableName: String): Unit = {
+    writeRows(data, tableName, table => table.newUpdate())
+  }
+
+  /**
+    * Upserts the rows of a [[DataFrame]] into a Kudu table.
+    *
+    * @param data the data to upsert into Kudu
+    * @param tableName the Kudu table to upsert into
+    */
+  def upsertRows(data: DataFrame, tableName: String): Unit = {
+    writeRows(data, tableName, table => table.newUpsert())
+  }
+
+  /**
+    * Deletes the rows of a [[DataFrame]] from a Kudu table.
+    *
+    * @param data the data to delete from Kudu
+    *             note that only the key columns should be specified for deletes
+    * @param tableName the Kudu table to delete the rows from
+    */
+  def deleteRows(data: DataFrame, tableName: String): Unit = {
+    writeRows(data, tableName, table => table.newDelete())
+  }
+
+  private def writeRows(data: DataFrame, tableName: String, newOp: KuduTable => Operation) {
     val schema = data.schema
     data.foreachPartition(iterator => {
-      val pendingErrors = writeRows(iterator, schema, tableName, overwrite)
+      val pendingErrors = writePartitionRows(iterator, schema, tableName, newOp)
       val errorCount = pendingErrors.getRowErrors.length
       if (errorCount > 0) {
         val errors = pendingErrors.getRowErrors.take(5).map(_.getErrorStatus).mkString
@@ -138,15 +176,10 @@ class KuduContext(kuduMaster: String) extends Serializable {
     })
   }
 
-  /**
-    * Saves partitions of a [[DataFrame]] into Kudu.
-    * @param rows rows to insert or update
-    * @param tableName table to insert or update on
-    */
-  def writeRows(rows: Iterator[Row],
-                schema: StructType,
-                tableName: String,
-                performAsUpdate : Boolean = false): RowErrorsAndOverflowStatus = {
+  private def writePartitionRows(rows: Iterator[Row],
+                                 schema: StructType,
+                                 tableName: String,
+                                 newOp: KuduTable => Operation): RowErrorsAndOverflowStatus = {
     val table: KuduTable = syncClient.openTable(tableName)
     val kuduSchema = table.getSchema
     val indices: Array[(Int, Int)] = schema.fields.zipWithIndex.map({ case (field, sparkIdx) =>
@@ -154,10 +187,9 @@ class KuduContext(kuduMaster: String) extends Serializable {
     })
     val session: KuduSession = syncClient.newSession
     session.setFlushMode(FlushMode.AUTO_FLUSH_BACKGROUND)
-    session.setIgnoreAllDuplicateRows(true)
     try {
       for (row <- rows) {
-        val operation = if (performAsUpdate) { table.newUpdate() } else { table.newInsert() }
+        val operation = newOp(table)
         for ((sparkIdx, kuduIdx) <- indices) {
           if (row.isNullAt(sparkIdx)) {
             operation.getRow.setNull(kuduIdx)

http://git-wip-us.apache.org/repos/asf/kudu/blob/95321972/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/package.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/package.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/package.scala
index ffc45ed..fe28f70 100755
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/package.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/package.scala
@@ -32,7 +32,7 @@ package object kudu {
     * Adds a method, `kudu`, to DataFrameWriter that allows writes to Kudu using
     * the DataFileWriter
     */
-    implicit class KuduDataFrameWriter(writer: DataFrameWriter) {
-      def kudu = writer.format("org.apache.kudu.spark.kudu").save
-    }
+  implicit class KuduDataFrameWriter(writer: DataFrameWriter) {
+    def kudu = writer.format("org.apache.kudu.spark.kudu").save
+  }
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/95321972/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
index 8e5d68e..a7dd209 100644
--- a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
+++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
@@ -72,42 +72,36 @@ class DefaultSourceTest extends FunSuite with TestContext with BeforeAndAfter {
   }
 
   test("table creation") {
-    if(kuduContext.tableExists("testcreatetable")) {
-      kuduContext.deleteTable("testcreatetable")
+    val tableName = "testcreatetable"
+    if (kuduContext.tableExists(tableName)) {
+      kuduContext.deleteTable(tableName)
     }
-
     val df = sqlContext.read.options(kuduOptions).kudu
-
-    kuduContext.createTable("testcreatetable", df.schema, Seq("key"),
+    kuduContext.createTable(tableName, df.schema, Seq("key"),
                             new CreateTableOptions().setRangePartitionColumns(List("key").asJava)
                                                     .setNumReplicas(1))
+    kuduContext.insertRows(df, tableName)
 
     // now use new options to refer to the new table name
     val newOptions: Map[String, String] = Map(
-      "kudu.table" -> "testcreatetable",
+      "kudu.table" -> tableName,
       "kudu.master" -> miniCluster.getMasterAddresses)
-
-    df.write.options(newOptions).mode("append").kudu
-
     val checkDf = sqlContext.read.options(newOptions).kudu
 
     assert(checkDf.schema === df.schema)
-
-    assertTrue(kuduContext.tableExists("testcreatetable"))
+    assertTrue(kuduContext.tableExists(tableName))
     assert(checkDf.count == 10)
-    kuduContext.deleteTable("testcreatetable")
 
-    assertFalse(kuduContext.tableExists("testcreatetable"))
+    kuduContext.deleteTable(tableName)
+    assertFalse(kuduContext.tableExists(tableName))
   }
 
   test("insertion") {
-    val df = sqlContext.read.options(      kuduOptions).kudu
+    val df = sqlContext.read.options(kuduOptions).kudu
     val changedDF = df.limit(1).withColumn("key", df("key").plus(100)).withColumn("c2_s", lit("abc"))
-    changedDF.show
-    changedDF.write.options(kuduOptions).mode("append").kudu
+    kuduContext.insertRows(changedDF, tableName)
 
     val newDF = sqlContext.read.options(kuduOptions).kudu
-    newDF.show
     val collected = newDF.filter("key = 100").collect()
     assertEquals("abc", collected(0).getAs[String]("c2_s"))
 
@@ -117,11 +111,9 @@ class DefaultSourceTest extends FunSuite with TestContext with BeforeAndAfter {
   test("insertion multiple") {
     val df = sqlContext.read.options(kuduOptions).kudu
     val changedDF = df.limit(2).withColumn("key", df("key").plus(100)).withColumn("c2_s", lit("abc"))
-    changedDF.show
-    changedDF.write.options(kuduOptions).mode("append").kudu
+    kuduContext.insertRows(changedDF, tableName)
 
     val newDF = sqlContext.read.options(kuduOptions).kudu
-    newDF.show
     val collected = newDF.filter("key = 100").collect()
     assertEquals("abc", collected(0).getAs[String]("c2_s"))
 
@@ -132,24 +124,43 @@ class DefaultSourceTest extends FunSuite with TestContext with BeforeAndAfter {
     deleteRow(101)
   }
 
-  test("update row") {
+  test("upsert rows") {
     val df = sqlContext.read.options(kuduOptions).kudu
     val baseDF = df.limit(1) // filter down to just the first row
-    baseDF.show
+
     // change the c2 string to abc and update
-    val changedDF = baseDF.withColumn("c2_s", lit("abc"))
-    changedDF.show
-    changedDF.write.options(kuduOptions).mode("overwrite").kudu
+    val updateDF = baseDF.withColumn("c2_s", lit("abc"))
+    kuduContext.upsertRows(updateDF, tableName)
 
-    //read the data back
+    // change the key and insert
+    val insertDF = df.limit(1).withColumn("key", df("key").plus(100)).withColumn("c2_s", lit("def"))
+    kuduContext.upsertRows(insertDF, tableName)
+
+    // read the data back
     val newDF = sqlContext.read.options(kuduOptions).kudu
-    newDF.show
-    val collected = newDF.filter("key = 0").collect()
-    assertEquals("abc", collected(0).getAs[String]("c2_s"))
+    val collectedUpdate = newDF.filter("key = 0").collect()
+    assertEquals("abc", collectedUpdate(0).getAs[String]("c2_s"))
+    val collectedInsert = newDF.filter("key = 100").collect()
+    assertEquals("def", collectedInsert(0).getAs[String]("c2_s"))
 
-    //rewrite the original value
-    baseDF.withColumn("c2_s", lit("0")).write.options(kuduOptions)
-      .mode("overwrite").kudu
+    // restore the original state of the table
+    kuduContext.updateRows(baseDF.filter("key = 0").withColumn("c2_s", lit("0")), tableName)
+    deleteRow(100)
+  }
+
+  test("delete rows") {
+    val df = sqlContext.read.options(kuduOptions).kudu
+    val deleteDF = df.filter("key = 0").select("key")
+    kuduContext.deleteRows(deleteDF, tableName)
+
+    // read the data back
+    val newDF = sqlContext.read.options(kuduOptions).kudu
+    val collectedDelete = newDF.filter("key = 0").collect()
+    assertEquals(0, collectedDelete.length)
+
+    // restore the original state of the table
+    val insertDF = df.limit(1).filter("key = 0")
+    kuduContext.insertRows(insertDF, tableName)
   }
 
   test("out of order selection") {
@@ -263,4 +274,65 @@ class DefaultSourceTest extends FunSuite with TestContext with BeforeAndAfter {
     assert(results.get(0).getInt(0).equals(2))
     assert(results.get(0).getString(1).equals("2"))
   }
+
+  test("Test SQL: insert into") {
+    val insertTable = "insertintotest"
+
+    // read 0 rows just to get the schema
+    val df = sqlContext.sql(s"SELECT * FROM $tableName LIMIT 0")
+    kuduContext.createTable(insertTable, df.schema, Seq("key"),
+      new CreateTableOptions().setRangePartitionColumns(List("key").asJava)
+        .setNumReplicas(1))
+
+    val newOptions: Map[String, String] = Map(
+      "kudu.table" -> insertTable,
+      "kudu.master" -> miniCluster.getMasterAddresses)
+    sqlContext.read.options(newOptions).kudu.registerTempTable(insertTable)
+
+    sqlContext.sql(s"INSERT INTO TABLE $insertTable SELECT * FROM $tableName")
+    val results = sqlContext.sql(s"SELECT key FROM $insertTable").collectAsList()
+    assertEquals(10, results.size())
+  }
+
+  test("Test SQL: insert overwrite unsupported") {
+    val insertTable = "insertoverwritetest"
+
+    // read 0 rows just to get the schema
+    val df = sqlContext.sql(s"SELECT * FROM $tableName LIMIT 0")
+    kuduContext.createTable(insertTable, df.schema, Seq("key"),
+      new CreateTableOptions().setRangePartitionColumns(List("key").asJava)
+        .setNumReplicas(1))
+
+    val newOptions: Map[String, String] = Map(
+      "kudu.table" -> insertTable,
+      "kudu.master" -> miniCluster.getMasterAddresses)
+    sqlContext.read.options(newOptions).kudu.registerTempTable(insertTable)
+
+    try {
+      sqlContext.sql(s"INSERT OVERWRITE TABLE $insertTable SELECT * FROM $tableName")
+      fail()
+    } catch {
+      case _: UnsupportedOperationException => // good
+      case _ => fail()
+    }
+  }
+
+  test("Test write using DefaultSource") {
+    val df = sqlContext.read.options(kuduOptions).kudu
+
+    val newTable = "testwritedatasourcetable"
+    kuduContext.createTable(newTable, df.schema, Seq("key"),
+        new CreateTableOptions().setRangePartitionColumns(List("key").asJava)
+          .setNumReplicas(1))
+
+    val newOptions: Map[String, String] = Map(
+      "kudu.table" -> newTable,
+      "kudu.master" -> miniCluster.getMasterAddresses)
+    df.write.options(newOptions).mode("append").kudu
+
+    val checkDf = sqlContext.read.options(newOptions).kudu
+    assert(checkDf.schema === df.schema)
+    assertTrue(kuduContext.tableExists(newTable))
+    assert(checkDf.count == 10)
+  }
 }