You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by gr...@apache.org on 2019/11/25 03:09:14 UTC

[kudu] 02/02: [spark] Add a test for sink based writing

This is an automated email from the ASF dual-hosted git repository.

granthenke pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 1d382435d1d2e46d279503ab64225342cf0068ea
Author: Grant Henke <gr...@apache.org>
AuthorDate: Thu Oct 31 09:05:38 2019 -0500

    [spark] Add a test for sink based writing
    
    Adds a test to verify writing via the KuduSink, as opposed to the
    KuduContext, works as expected.
    
    Change-Id: Ic1f28be80ad21b0783d8a0889ad7b1847601442b
    Reviewed-on: http://gerrit.cloudera.org:8080/14603
    Tested-by: Kudu Jenkins
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
---
 .../apache/kudu/spark/kudu/DefaultSourceTest.scala | 37 ++++++++++++++++++++++
 1 file changed, 37 insertions(+)

diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
index 6dca719..b2226e5 100644
--- a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
+++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
@@ -20,6 +20,7 @@ import scala.collection.JavaConverters._
 import scala.collection.immutable.IndexedSeq
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.SaveMode
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types.DataTypes
 import org.apache.spark.sql.types.StructField
@@ -338,6 +339,42 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {
   }
 
   @Test
+  def testWriteWithSink() {
+    val df = sqlContext.read.options(kuduOptions).format("kudu").load
+    val baseDF = df.limit(1) // Filter down to just the first row.
+
+    // Change the c2 string to abc and upsert.
+    val upsertDF = baseDF.withColumn("c2_s", lit("abc"))
+    upsertDF.write
+      .format("kudu")
+      .option("kudu.master", harness.getMasterAddressesAsString)
+      .option("kudu.table", tableName)
+      // Default kudu.operation is upsert.
+      .mode(SaveMode.Append)
+      .save()
+
+    // Change the key and insert.
+    val insertDF = df
+      .limit(1)
+      .withColumn("key", df("key").plus(100))
+      .withColumn("c2_s", lit("def"))
+    insertDF.write
+      .format("kudu")
+      .option("kudu.master", harness.getMasterAddressesAsString)
+      .option("kudu.table", tableName)
+      .option("kudu.operation", "insert")
+      .mode(SaveMode.Append)
+      .save()
+
+    // Read the data back.
+    val newDF = sqlContext.read.options(kuduOptions).format("kudu").load
+    val collectedUpdate = newDF.filter("key = 0").collect()
+    assertEquals("abc", collectedUpdate(0).getAs[String]("c2_s"))
+    val collectedInsert = newDF.filter("key = 100").collect()
+    assertEquals("def", collectedInsert(0).getAs[String]("c2_s"))
+  }
+
+  @Test
   def testUpsertRowsIgnoreNulls() {
     val nonNullDF =
       sqlContext.createDataFrame(Seq((0, "foo"))).toDF("key", "val")