You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by da...@apache.org on 2016/08/18 19:27:27 UTC
[3/3] kudu git commit: [spark] Add insert-ignore, update,
and delete as write options
[spark] Add insert-ignore, update, and delete as write options
Change-Id: I2781104c8a655da0287977b93188e9a65e7d68bb
Reviewed-on: http://gerrit.cloudera.org:8080/4016
Tested-by: Kudu Jenkins
Reviewed-by: Will Berkeley <wd...@gmail.com>
Reviewed-by: Todd Lipcon <to...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/9acba45a
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/9acba45a
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/9acba45a
Branch: refs/heads/master
Commit: 9acba45ac147fbae5349b7bc559a75ca41c4084b
Parents: 8e09910
Author: Dan Burkert <da...@cloudera.com>
Authored: Mon Aug 15 18:51:37 2016 -0700
Committer: Dan Burkert <da...@cloudera.com>
Committed: Thu Aug 18 19:26:55 2016 +0000
----------------------------------------------------------------------
.../apache/kudu/spark/kudu/DefaultSource.scala | 60 +++++++++++---------
.../apache/kudu/spark/kudu/KuduContext.scala | 40 ++++++++-----
.../org/apache/kudu/spark/kudu/KuduRDD.scala | 10 ++--
.../apache/kudu/spark/kudu/OperationType.scala | 44 ++++++++++++++
.../org/apache/kudu/spark/kudu/package.scala | 2 +-
.../kudu/spark/kudu/DefaultSourceTest.scala | 23 ++++++++
6 files changed, 131 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/9acba45a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
index a3384b2..286a75c 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
@@ -27,13 +27,16 @@ import org.apache.kudu.Type
import org.apache.kudu.annotations.InterfaceStability
import org.apache.kudu.client._
import org.apache.kudu.client.KuduPredicate.ComparisonOp
-import org.apache.spark.sql.SaveMode._
import scala.collection.JavaConverters._
/**
- * DefaultSource for integration with Spark's dataframe datasources.
- * This class will produce a relationProvider based on input given to it from Spark.
+ * Data source for integration with Spark's [[DataFrame]] API.
+ *
+ * Serves as a factory for [[KuduRelation]] instances for Spark. Spark will
+ * automatically look for a [[RelationProvider]] implementation named
+ * `DefaultSource` when the user specifies the path of a source during DDL
+ * operations through [[org.apache.spark.sql.DataFrameReader.format]].
*/
@InterfaceStability.Unstable
class DefaultSource extends RelationProvider with CreatableRelationProvider {
@@ -53,16 +56,21 @@ class DefaultSource extends RelationProvider with CreatableRelationProvider {
parameters: Map[String, String]):
BaseRelation = {
val tableName = parameters.getOrElse(TABLE_KEY,
- throw new IllegalArgumentException(s"Kudu table name must be specified in create options " +
- s"using key '$TABLE_KEY'"))
+ throw new IllegalArgumentException(
+ s"Kudu table name must be specified in create options using key '$TABLE_KEY'"))
val kuduMaster = parameters.getOrElse(KUDU_MASTER, "localhost")
- val upsert = parameters.getOrElse(OPERATION, "upsert").toLowerCase match {
- case "upsert" => true
- case "insert" => false
- case _ => throw new UnsupportedOperationException(s"$OPERATION must be upsert or insert")
+
+ val opParam = parameters.getOrElse(OPERATION, "upsert")
+ val operationType = opParam.toLowerCase match {
+ case "insert" => Insert
+ case "insert-ignore" => InsertIgnore
+ case "upsert" => Upsert
+ case "update" => Update
+ case "delete" => Delete
+ case _ => throw new IllegalArgumentException(s"Unsupported operation type '$opParam'")
}
- new KuduRelation(tableName, kuduMaster, upsert)(sqlContext)
+ new KuduRelation(tableName, kuduMaster, operationType)(sqlContext)
}
/**
@@ -79,7 +87,7 @@ class DefaultSource extends RelationProvider with CreatableRelationProvider {
parameters: Map[String, String], data: DataFrame): BaseRelation = {
val kuduRelation = createRelation(sqlContext, parameters)
mode match {
- case Append => kuduRelation.asInstanceOf[KuduRelation].insert(data, false)
+ case SaveMode.Append => kuduRelation.asInstanceOf[KuduRelation].insert(data, false)
case _ => throw new UnsupportedOperationException(
"Currently, only Append is supported")
}
@@ -92,14 +100,14 @@ class DefaultSource extends RelationProvider with CreatableRelationProvider {
* Implementation of Spark BaseRelation.
*
* @param tableName Kudu table that we plan to read from
- * @param kuduMaster Kudu master addresses
- * @param upsert Whether the relation will be inserted or upserted by default
+ * @param masterAddrs Kudu master addresses
+ * @param operationType The default operation type to perform when writing to the relation
* @param sqlContext SparkSQL context
*/
@InterfaceStability.Unstable
class KuduRelation(private val tableName: String,
- private val kuduMaster: String,
- private val upsert: Boolean)(
+ private val masterAddrs: String,
+ private val operationType: OperationType)(
val sqlContext: SQLContext)
extends BaseRelation
with PrunedFilteredScan
@@ -107,7 +115,7 @@ with InsertableRelation {
import KuduRelation._
- private val context: KuduContext = new KuduContext(kuduMaster)
+ private val context: KuduContext = new KuduContext(masterAddrs)
private val table: KuduTable = context.syncClient.openTable(tableName)
override def unhandledFilters(filters: Array[Filter]): Array[Filter] =
@@ -123,7 +131,7 @@ with InsertableRelation {
val fields: Array[StructField] =
table.getSchema.getColumns.asScala.map { columnSchema =>
val sparkType = kuduTypeToSparkType(columnSchema.getType)
- new StructField(columnSchema.getName, sparkType, columnSchema.isNullable)
+ StructField(columnSchema.getName, sparkType, columnSchema.isNullable)
}.toArray
new StructType(fields)
@@ -138,8 +146,8 @@ with InsertableRelation {
*/
override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
val predicates = filters.flatMap(filterToPredicate)
- new KuduRDD(kuduMaster, 1024 * 1024 * 20, requiredColumns, predicates,
- table, context, sqlContext.sparkContext)
+ new KuduRDD(masterAddrs, 1024 * 1024 * 20, requiredColumns, predicates,
+ table, context, sqlContext.sparkContext)
}
/**
@@ -192,21 +200,19 @@ with InsertableRelation {
}
/**
- * By default, upserts data into an existing Kudu table.
- * If the kudu.upsert parameter is set to false, data is inserted instead of upserted.
+ * Writes data into an existing Kudu table.
+ *
+ * If the `kudu.operation` parameter is set, the data will use that operation
+ * type. If the parameter is unset, the data will be upserted.
*
* @param data [[DataFrame]] to be inserted into Kudu
* @param overwrite must be false; otherwise, throws [[UnsupportedOperationException]]
*/
override def insert(data: DataFrame, overwrite: Boolean): Unit = {
if (overwrite) {
- throw new UnsupportedOperationException("overwrite is not supported")
- }
- if (upsert) {
- context.upsertRows(data, tableName)
- } else {
- context.insertRows(data, tableName)
+ throw new UnsupportedOperationException("overwrite is not yet supported")
}
+ context.writeRows(data, tableName, operationType)
}
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/9acba45a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
index ae463b7..0fa9df9 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
@@ -129,17 +129,18 @@ class KuduContext(kuduMaster: String) extends Serializable {
* @param tableName the Kudu table to insert into
*/
def insertRows(data: DataFrame, tableName: String): Unit = {
- writeRows(data, tableName, table => table.newInsert())
+ writeRows(data, tableName, Insert)
}
/**
- * Updates a Kudu table with the rows of a [[DataFrame]].
+ * Inserts the rows of a [[DataFrame]] into a Kudu table, ignoring any new
+ * rows that have a primary key conflict with existing rows.
*
- * @param data the data to update into Kudu
- * @param tableName the Kudu table to update
+ * @param data the data to insert into Kudu
+ * @param tableName the Kudu table to insert into
*/
- def updateRows(data: DataFrame, tableName: String): Unit = {
- writeRows(data, tableName, table => table.newUpdate())
+ def insertIgnoreRows(data: DataFrame, tableName: String): Unit = {
+ writeRows(data, tableName, InsertIgnore)
}
/**
@@ -149,7 +150,17 @@ class KuduContext(kuduMaster: String) extends Serializable {
* @param tableName the Kudu table to upsert into
*/
def upsertRows(data: DataFrame, tableName: String): Unit = {
- writeRows(data, tableName, table => table.newUpsert())
+ writeRows(data, tableName, Upsert)
+ }
+
+ /**
+ * Updates a Kudu table with the rows of a [[DataFrame]].
+ *
+ * @param data the data to update in Kudu
+ * @param tableName the Kudu table to update
+ */
+ def updateRows(data: DataFrame, tableName: String): Unit = {
+ writeRows(data, tableName, Update)
}
/**
@@ -157,16 +168,16 @@ class KuduContext(kuduMaster: String) extends Serializable {
*
* @param data the data to delete from Kudu
* note that only the key columns should be specified for deletes
- * @param tableName
+ * @param tableName the Kudu table to delete from
*/
def deleteRows(data: DataFrame, tableName: String): Unit = {
- writeRows(data, tableName, table => table.newDelete())
+ writeRows(data, tableName, Delete)
}
- private def writeRows(data: DataFrame, tableName: String, newOp: KuduTable => Operation) {
+ private[kudu] def writeRows(data: DataFrame, tableName: String, operation: OperationType) {
val schema = data.schema
data.foreachPartition(iterator => {
- val pendingErrors = writePartitionRows(iterator, schema, tableName, newOp)
+ val pendingErrors = writePartitionRows(iterator, schema, tableName, operation)
val errorCount = pendingErrors.getRowErrors.length
if (errorCount > 0) {
val errors = pendingErrors.getRowErrors.take(5).map(_.getErrorStatus).mkString
@@ -179,17 +190,17 @@ class KuduContext(kuduMaster: String) extends Serializable {
private def writePartitionRows(rows: Iterator[Row],
schema: StructType,
tableName: String,
- newOp: KuduTable => Operation): RowErrorsAndOverflowStatus = {
+ operationType: OperationType): RowErrorsAndOverflowStatus = {
val table: KuduTable = syncClient.openTable(tableName)
- val kuduSchema = table.getSchema
val indices: Array[(Int, Int)] = schema.fields.zipWithIndex.map({ case (field, sparkIdx) =>
sparkIdx -> table.getSchema.getColumnIndex(field.name)
})
val session: KuduSession = syncClient.newSession
session.setFlushMode(FlushMode.AUTO_FLUSH_BACKGROUND)
+ session.setIgnoreAllDuplicateRows(operationType.ignoreDuplicateRowErrors)
try {
for (row <- rows) {
- val operation = newOp(table)
+ val operation = operationType.operation(table)
for ((sparkIdx, kuduIdx) <- indices) {
if (row.isNullAt(sparkIdx)) {
operation.getRow.setNull(kuduIdx)
@@ -214,7 +225,6 @@ class KuduContext(kuduMaster: String) extends Serializable {
}
session.getPendingErrors
}
-
}
private object KuduConnection {
http://git-wip-us.apache.org/repos/asf/kudu/blob/9acba45a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
index b3b69ec..bfd0b55 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
@@ -50,10 +50,10 @@ class KuduRDD(val kuduMaster: String,
override protected def getPartitions: Array[Partition] = {
val builder = kuduContext.syncClient
- .newScanTokenBuilder(table)
- .batchSizeBytes(batchSize)
- .setProjectedColumnNames(projectedCols.toSeq.asJava)
- .cacheBlocks(true)
+ .newScanTokenBuilder(table)
+ .batchSizeBytes(batchSize)
+ .setProjectedColumnNames(projectedCols.toSeq.asJava)
+ .cacheBlocks(true)
for (predicate <- predicates) {
builder.addPredicate(predicate)
@@ -83,7 +83,7 @@ class KuduRDD(val kuduMaster: String,
*/
private[spark] class KuduPartition(val index: Int,
val scanToken: Array[Byte],
- val locations : Array[String]) extends Partition {}
+ val locations: Array[String]) extends Partition {}
/**
* A Spark SQL [[Row]] iterator which wraps a [[KuduScanner]].
http://git-wip-us.apache.org/repos/asf/kudu/blob/9acba45a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/OperationType.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/OperationType.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/OperationType.scala
new file mode 100644
index 0000000..fd23d05
--- /dev/null
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/OperationType.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kudu.spark.kudu
+
+import org.apache.kudu.client.{KuduTable, Operation}
+
+/**
+ * Enumerates the Kudu write operations the Spark integration can issue for each row.
+ */
+private[kudu] sealed trait OperationType {
+  def operation(table: KuduTable): Operation // new operation of this type on `table`
+  def ignoreDuplicateRowErrors: Boolean = false // fed to KuduSession.setIgnoreAllDuplicateRows
+}
+private[kudu] case object Insert extends OperationType {
+  override def operation(table: KuduTable): Operation = table.newInsert()
+}
+private[kudu] case object InsertIgnore extends OperationType { // Insert, skipping PK conflicts
+  override def operation(table: KuduTable): Operation = table.newInsert()
+  override def ignoreDuplicateRowErrors: Boolean = true
+}
+private[kudu] case object Update extends OperationType {
+  override def operation(table: KuduTable): Operation = table.newUpdate()
+}
+private[kudu] case object Upsert extends OperationType {
+  override def operation(table: KuduTable): Operation = table.newUpsert()
+}
+private[kudu] case object Delete extends OperationType { // rows should contain only key columns
+  override def operation(table: KuduTable): Operation = table.newDelete()
+}
http://git-wip-us.apache.org/repos/asf/kudu/blob/9acba45a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/package.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/package.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/package.scala
index fe28f70..6f49023 100755
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/package.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/package.scala
@@ -35,4 +35,4 @@ package object kudu {
implicit class KuduDataFrameWriter(writer: DataFrameWriter) {
def kudu = writer.format("org.apache.kudu.spark.kudu").save
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kudu/blob/9acba45a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
index a7dd209..2dfce76 100644
--- a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
+++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
@@ -124,6 +124,29 @@ class DefaultSourceTest extends FunSuite with TestContext with BeforeAndAfter {
deleteRow(101)
}
+ test("insert ignore rows") {
+ val df = sqlContext.read.options(kuduOptions).kudu
+ val baseDF = df.limit(1) // filter down to just the first row
+
+ // change the c2_s string to abc and insert; the key already exists, so the row is ignored
+ val updateDF = baseDF.withColumn("c2_s", lit("abc"))
+ kuduContext.insertIgnoreRows(updateDF, tableName)
+
+ // change the key and insert
+ val insertDF = df.limit(1).withColumn("key", df("key").plus(100)).withColumn("c2_s", lit("def"))
+ kuduContext.insertIgnoreRows(insertDF, tableName)
+
+ // read the data back
+ val newDF = sqlContext.read.options(kuduOptions).kudu
+ val collectedUpdate = newDF.filter("key = 0").collect()
+ assertEquals("0", collectedUpdate(0).getAs[String]("c2_s"))
+ val collectedInsert = newDF.filter("key = 100").collect()
+ assertEquals("def", collectedInsert(0).getAs[String]("c2_s"))
+
+ // restore the original state of the table
+ deleteRow(100)
+ }
+
test("upsert rows") {
val df = sqlContext.read.options(kuduOptions).kudu
val baseDF = df.limit(1) // filter down to just the first row