You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by gr...@apache.org on 2019/11/25 03:09:14 UTC
[kudu] 02/02: [spark] Add a test for sink based writing
This is an automated email from the ASF dual-hosted git repository.
granthenke pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
commit 1d382435d1d2e46d279503ab64225342cf0068ea
Author: Grant Henke <gr...@apache.org>
AuthorDate: Thu Oct 31 09:05:38 2019 -0500
[spark] Add a test for sink based writing
Adds a test to verify that writing via the KuduSink, as opposed to the
KuduContext, works as expected.
Change-Id: Ic1f28be80ad21b0783d8a0889ad7b1847601442b
Reviewed-on: http://gerrit.cloudera.org:8080/14603
Tested-by: Kudu Jenkins
Reviewed-by: Andrew Wong <aw...@cloudera.com>
---
.../apache/kudu/spark/kudu/DefaultSourceTest.scala | 37 ++++++++++++++++++++++
1 file changed, 37 insertions(+)
diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
index 6dca719..b2226e5 100644
--- a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
+++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
@@ -20,6 +20,7 @@ import scala.collection.JavaConverters._
import scala.collection.immutable.IndexedSeq
import org.apache.spark.sql.Row
import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.DataTypes
import org.apache.spark.sql.types.StructField
@@ -338,6 +339,42 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {
}
@Test
+ def testWriteWithSink() { // Writes one upsert and one insert through DataFrame.write.format("kudu"), then reads both rows back.
+ val df = sqlContext.read.options(kuduOptions).format("kudu").load
+ val baseDF = df.limit(1) // Keep at most one row. NOTE(review): no ordering is specified; the key = 0 assertion below assumes this is the key-0 row -- confirm.
+
+ // Overwrite c2_s with the literal "abc" on that row and write it back without setting kudu.operation.
+ val upsertDF = baseDF.withColumn("c2_s", lit("abc"))
+ upsertDF.write
+ .format("kudu")
+ .option("kudu.master", harness.getMasterAddressesAsString)
+ .option("kudu.table", tableName)
+ // kudu.operation is left unset on purpose: the default operation is upsert.
+ .mode(SaveMode.Append)
+ .save()
+
+ // Take one row, shift its key by 100 and set c2_s to "def", then write it with an explicit "insert" operation.
+ val insertDF = df
+ .limit(1)
+ .withColumn("key", df("key").plus(100))
+ .withColumn("c2_s", lit("def"))
+ insertDF.write
+ .format("kudu")
+ .option("kudu.master", harness.getMasterAddressesAsString)
+ .option("kudu.table", tableName)
+ .option("kudu.operation", "insert")
+ .mode(SaveMode.Append)
+ .save()
+
+ // Reload the table and check both writes: the key-0 row must carry "abc" and the key-100 row must carry "def".
+ val newDF = sqlContext.read.options(kuduOptions).format("kudu").load
+ val collectedUpdate = newDF.filter("key = 0").collect() // Row targeted by the upsert.
+ assertEquals("abc", collectedUpdate(0).getAs[String]("c2_s"))
+ val collectedInsert = newDF.filter("key = 100").collect() // Row created by the insert (original key + 100).
+ assertEquals("def", collectedInsert(0).getAs[String]("c2_s"))
+ }
+
+ @Test
def testUpsertRowsIgnoreNulls() {
val nonNullDF =
sqlContext.createDataFrame(Seq((0, "foo"))).toDF("key", "val")