You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kudu.apache.org by Dan Burkert <da...@apache.org> on 2016/08/12 00:41:59 UTC

Kudu Spark integration updates

Hi all,

Will Berkeley has been working on some proposed updates updates to the Kudu
Spark connector (see the code in review here
<https://gerrit.cloudera.org/#/c/3871>).  The high level overview of the
patch is

* Explicit support for inserting, dropping, upserting, and deleting rows in
DataFrames into Kudu tables (via methods on KuduContext).

* Possibly dropping the implementation of CreatableRelationProvider.  I
originally suggested this because the Spark connector can't create a table
without specifying many table properties that aren't part of the
interface.  Since then, I've been convinced by Chris George that this is a
bad idea in terms of usability, and that we should just limit the
implementation to only support the "append" save mode, and error if the
table doesn't exist.

If you use the Kudu Spark connector, please take a look at the patch and
weigh in!  We're trying to get this in for the upcoming 0.10 release.

- Dan

Re: Kudu Spark integration updates

Posted by Dan Burkert <da...@cloudera.com>.
Another big change in that patch is changing the default writing behavior
from Insert with ignore duplicates to Upsert.​

I've been thinking about this a bit, and I think it would be nice if we
could use the API to do upserts or inserts or updates or deletes (with the
default being upsert).  Towards that end I put together a patch that builds
off Will's patch that makes this a configurable parameter:

This example starts with an empty kudu table t, created as:

CREATE TABLE t (a INT32 NOT NULL, b INT32 NOT NULL, c INT32 NOT NULL)
PRIMARY KEY (a)
DISTRIBUTE BY HASH (a) INTO 4 BUCKETS
WITH 1 REPLICA;

Spark Shell output:

scala> sqlContext.read.option("kudu.table", "t").kudu.show
+---+---+---+
|  a|  b|  c|
+---+---+---+
+---+---+---+

// Define some data frames for writing into the table

scala> val insertDF = Seq((1, 1, 1), (2, 2, 2)).toDF(Array("a", "b", "c"):
_*)
insertDF: org.apache.spark.sql.DataFrame = [a: int, b: int, c: int]

scala> val upsertDF = Seq((1, 101, 101), (3, 3, 3)).toDF(Array("a", "b",
"c"): _*)
upsertDF: org.apache.spark.sql.DataFrame = [a: int, b: int, c: int]

scala> val deleteDF = sqlContext.range(1, 2).map(_.getLong(0).toInt).
toDF("a")
deleteDF: org.apache.spark.sql.DataFrame = [a: int]

// Do some inserts/upserts/deletes

scala> insertDF.write.option("kudu.table", "t").option("kudu.operation",
"insert").mode("append").kudu

scala> sqlContext.read.option("kudu.table", "t").kudu.show
+---+---+---+
|  a|  b|  c|
+---+---+---+
|  1|  1|  1|
|  2|  2|  2|
+---+---+---+

// Attempting the insert again will fail due to duplicate key constraint
violations

scala> insertDF.write.option("kudu.table", "t").option("kudu.operation",
"insert").mode("append").kudu
16/08/11 18:03:45 ERROR Executor: Exception in task 7.0 in stage 34.0 (TID
131)
java.lang.RuntimeException: failed to write 1 rows from DataFrame to Kudu;
sample errors: Already present: key already present (error 0)

scala> upsertDF.write.option("kudu.table", "t").option("kudu.operation",
"upsert").mode("append").kudu

scala> sqlContext.read.option("kudu.table", "t").kudu.show
+---+---+---+
|  a|  b|  c|
+---+---+---+
|  1|101|101|
|  2|  2|  2|
|  3|  3|  3|
+---+---+---+

scala> deleteDF.write.option("kudu.table", "t").option("kudu.operation",
"delete").mode("append").kudu

scala> sqlContext.read.option("kudu.table", "t").kudu.show
+---+---+---+
|  a|  b|  c|
+---+---+---+
|  2|  2|  2|
|  3|  3|  3|
+---+---+---+

The patch is here
<https://github.com/danburkert/kudu/commit/1970bc7e490eabc02844a0761e5b90bb759b7470>.
Comments welcome.

- Dan

On Thu, Aug 11, 2016 at 5:41 PM, Dan Burkert <da...@apache.org> wrote:

> Hi all,
>
> Will Berkeley has been working on some proposed updates updates to the
> Kudu Spark connector (see the code in review here
> <https://gerrit.cloudera.org/#/c/3871>).  The high level overview of the
> patch is
>
> * Explicit support for inserting, dropping, upserting, and deleting rows
> in DataFrames into Kudu tables (via methods on KuduContext).
>
> * Possibly dropping the implementation of CreatableRelationProvider.  I
> originally suggested this because the Spark connector can't create a table
> without specifying many table properties that aren't part of the
> interface.  Since then, I've been convinced by Chris George that this is a
> bad idea in terms of usability, and that we should just limit the
> implementation to only support the "append" save mode, and error if the
> table doesn't exist.
>
> If you use the Kudu Spark connector, please take a look at the patch and
> weigh in!  We're trying to get this in for the upcoming 0.10 release.
>
> - Dan
>