Posted to commits@kudu.apache.org by da...@apache.org on 2016/08/18 19:27:27 UTC

[3/3] kudu git commit: [spark] Add insert-ignore, update, and delete as write options

[spark] Add insert-ignore, update, and delete as write options

Change-Id: I2781104c8a655da0287977b93188e9a65e7d68bb
Reviewed-on: http://gerrit.cloudera.org:8080/4016
Tested-by: Kudu Jenkins
Reviewed-by: Will Berkeley <wd...@gmail.com>
Reviewed-by: Todd Lipcon <to...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/9acba45a
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/9acba45a
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/9acba45a

Branch: refs/heads/master
Commit: 9acba45ac147fbae5349b7bc559a75ca41c4084b
Parents: 8e09910
Author: Dan Burkert <da...@cloudera.com>
Authored: Mon Aug 15 18:51:37 2016 -0700
Committer: Dan Burkert <da...@cloudera.com>
Committed: Thu Aug 18 19:26:55 2016 +0000

----------------------------------------------------------------------
 .../apache/kudu/spark/kudu/DefaultSource.scala  | 60 +++++++++++---------
 .../apache/kudu/spark/kudu/KuduContext.scala    | 40 ++++++++-----
 .../org/apache/kudu/spark/kudu/KuduRDD.scala    | 10 ++--
 .../apache/kudu/spark/kudu/OperationType.scala  | 44 ++++++++++++++
 .../org/apache/kudu/spark/kudu/package.scala    |  2 +-
 .../kudu/spark/kudu/DefaultSourceTest.scala     | 23 ++++++++
 6 files changed, 131 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
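
For readers skimming the change: a minimal usage sketch (not part of this patch) of the new write options. The "kudu.operation" values come from the DefaultSource change below; the "kudu.table" and "kudu.master" option keys and the DataFrame `df` are assumed here, as their definitions are not shown in this diff.

    // Hedged usage sketch: pick the write operation through a data source option.
    import org.apache.kudu.spark.kudu._

    df.write
      .options(Map(
        "kudu.master"    -> "master1:7051",      // assumed option key
        "kudu.table"     -> "my_table",          // assumed option key
        "kudu.operation" -> "insert-ignore"))    // or "insert", "upsert" (default), "update", "delete"
      .mode("append")                            // only SaveMode.Append is supported
      .kudu                                      // implicit from the kudu package object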


http://git-wip-us.apache.org/repos/asf/kudu/blob/9acba45a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
index a3384b2..286a75c 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
@@ -27,13 +27,16 @@ import org.apache.kudu.Type
 import org.apache.kudu.annotations.InterfaceStability
 import org.apache.kudu.client._
 import org.apache.kudu.client.KuduPredicate.ComparisonOp
-import org.apache.spark.sql.SaveMode._
 
 import scala.collection.JavaConverters._
 
 /**
-  * DefaultSource for integration with Spark's dataframe datasources.
-  * This class will produce a relationProvider based on input given to it from Spark.
+  * Data source for integration with Spark's [[DataFrame]] API.
+  *
+  * Serves as a factory for [[KuduRelation]] instances for Spark. Spark will
+  * automatically look for a [[RelationProvider]] implementation named
+  * `DefaultSource` when the user specifies the path of a source during DDL
+  * operations through [[org.apache.spark.sql.DataFrameReader.format]].
   */
 @InterfaceStability.Unstable
 class DefaultSource extends RelationProvider with CreatableRelationProvider {
@@ -53,16 +56,21 @@ class DefaultSource extends RelationProvider with CreatableRelationProvider {
                               parameters: Map[String, String]):
   BaseRelation = {
     val tableName = parameters.getOrElse(TABLE_KEY,
-      throw new IllegalArgumentException(s"Kudu table name must be specified in create options " +
-        s"using key '$TABLE_KEY'"))
+      throw new IllegalArgumentException(
+        s"Kudu table name must be specified in create options using key '$TABLE_KEY'"))
     val kuduMaster = parameters.getOrElse(KUDU_MASTER, "localhost")
-    val upsert = parameters.getOrElse(OPERATION, "upsert").toLowerCase match {
-      case "upsert" => true
-      case "insert" => false
-      case _ => throw new UnsupportedOperationException(s"$OPERATION must be upsert or insert")
+
+    val opParam = parameters.getOrElse(OPERATION, "upsert")
+    val operationType = opParam.toLowerCase match {
+      case "insert" => Insert
+      case "insert-ignore" => InsertIgnore
+      case "upsert" => Upsert
+      case "update" => Update
+      case "delete" => Delete
+      case _ => throw new IllegalArgumentException(s"Unsupported operation type '$opParam'")
     }
 
-    new KuduRelation(tableName, kuduMaster, upsert)(sqlContext)
+    new KuduRelation(tableName, kuduMaster, operationType)(sqlContext)
   }
 
   /**
@@ -79,7 +87,7 @@ class DefaultSource extends RelationProvider with CreatableRelationProvider {
                               parameters: Map[String, String], data: DataFrame): BaseRelation = {
     val kuduRelation = createRelation(sqlContext, parameters)
     mode match {
-      case Append => kuduRelation.asInstanceOf[KuduRelation].insert(data, false)
+      case SaveMode.Append => kuduRelation.asInstanceOf[KuduRelation].insert(data, false)
       case _ => throw new UnsupportedOperationException(
         "Currently, only Append is supported")
     }
@@ -92,14 +100,14 @@ class DefaultSource extends RelationProvider with CreatableRelationProvider {
   * Implementation of Spark BaseRelation.
   *
   * @param tableName Kudu table that we plan to read from
-  * @param kuduMaster Kudu master addresses
-  * @param upsert Whether the relation will be inserted or upserted by default
+  * @param masterAddrs Kudu master addresses
+  * @param operationType The default operation type to perform when writing to the relation
   * @param sqlContext SparkSQL context
   */
 @InterfaceStability.Unstable
 class KuduRelation(private val tableName: String,
-                   private val kuduMaster: String,
-                   private val upsert: Boolean)(
+                   private val masterAddrs: String,
+                   private val operationType: OperationType)(
                    val sqlContext: SQLContext)
 extends BaseRelation
 with PrunedFilteredScan
@@ -107,7 +115,7 @@ with InsertableRelation {
 
   import KuduRelation._
 
-  private val context: KuduContext = new KuduContext(kuduMaster)
+  private val context: KuduContext = new KuduContext(masterAddrs)
   private val table: KuduTable = context.syncClient.openTable(tableName)
 
   override def unhandledFilters(filters: Array[Filter]): Array[Filter] =
@@ -123,7 +131,7 @@ with InsertableRelation {
     val fields: Array[StructField] =
       table.getSchema.getColumns.asScala.map { columnSchema =>
         val sparkType = kuduTypeToSparkType(columnSchema.getType)
-        new StructField(columnSchema.getName, sparkType, columnSchema.isNullable)
+        StructField(columnSchema.getName, sparkType, columnSchema.isNullable)
       }.toArray
 
     new StructType(fields)
@@ -138,8 +146,8 @@ with InsertableRelation {
     */
   override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
     val predicates = filters.flatMap(filterToPredicate)
-    new KuduRDD(kuduMaster, 1024 * 1024 * 20, requiredColumns, predicates,
-      table, context, sqlContext.sparkContext)
+    new KuduRDD(masterAddrs, 1024 * 1024 * 20, requiredColumns, predicates,
+                table, context, sqlContext.sparkContext)
   }
 
   /**
@@ -192,21 +200,19 @@ with InsertableRelation {
   }
 
   /**
-    * By default, upserts data into an existing Kudu table.
-    * If the kudu.upsert parameter is set to false, data is inserted instead of upserted.
+    * Writes data into an existing Kudu table.
+    *
+    * If the `kudu.operation` parameter is set, the data is written with that
+    * operation type; if it is unset, the data is upserted.
     *
     * @param data [[DataFrame]] to be inserted into Kudu
     * @param overwrite must be false; otherwise, throws [[UnsupportedOperationException]]
     */
   override def insert(data: DataFrame, overwrite: Boolean): Unit = {
     if (overwrite) {
-      throw new UnsupportedOperationException("overwrite is not supported")
-    }
-    if (upsert) {
-      context.upsertRows(data, tableName)
-    } else {
-      context.insertRows(data, tableName)
+      throw new UnsupportedOperationException("overwrite is not yet supported")
     }
+    context.writeRows(data, tableName, operationType)
   }
 }
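
The reader side goes through the same DefaultSource. A hedged sketch, assuming the same option keys as above; predicate pushdown happens through buildScan/unhandledFilters as shown in this file.

    // Reader-side sketch: Spark resolves DefaultSource from the package name
    // passed to format(), and createRelation() builds the KuduRelation backing
    // the returned DataFrame.
    val df = sqlContext.read
      .format("org.apache.kudu.spark.kudu")
      .option("kudu.master", "master1:7051")   // assumed option key
      .option("kudu.table", "my_table")        // assumed option key
      .load()

    df.filter("key > 100").show()              // supported filters are pushed down as KuduPredicates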
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/9acba45a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
index ae463b7..0fa9df9 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
@@ -129,17 +129,18 @@ class KuduContext(kuduMaster: String) extends Serializable {
     * @param tableName the Kudu table to insert into
     */
   def insertRows(data: DataFrame, tableName: String): Unit = {
-    writeRows(data, tableName, table => table.newInsert())
+    writeRows(data, tableName, Insert)
   }
 
   /**
-    * Updates a Kudu table with the rows of a [[DataFrame]].
+    * Inserts the rows of a [[DataFrame]] into a Kudu table, ignoring any new
+    * rows that have a primary key conflict with existing rows.
     *
-    * @param data the data to update into Kudu
-    * @param tableName the Kudu table to update
+    * @param data the data to insert into Kudu
+    * @param tableName the Kudu table to insert into
     */
-  def updateRows(data: DataFrame, tableName: String): Unit = {
-    writeRows(data, tableName, table => table.newUpdate())
+  def insertIgnoreRows(data: DataFrame, tableName: String): Unit = {
+    writeRows(data, tableName, InsertIgnore)
   }
 
   /**
@@ -149,7 +150,17 @@ class KuduContext(kuduMaster: String) extends Serializable {
     * @param tableName the Kudu table to upsert into
     */
   def upsertRows(data: DataFrame, tableName: String): Unit = {
-    writeRows(data, tableName, table => table.newUpsert())
+    writeRows(data, tableName, Upsert)
+  }
+
+  /**
+    * Updates a Kudu table with the rows of a [[DataFrame]].
+    *
+    * @param data the data to update into Kudu
+    * @param tableName the Kudu table to update
+    */
+  def updateRows(data: DataFrame, tableName: String): Unit = {
+    writeRows(data, tableName, Update)
   }
 
   /**
@@ -157,16 +168,16 @@ class KuduContext(kuduMaster: String) extends Serializable {
     *
     * @param data the data to delete from Kudu
     *             note that only the key columns should be specified for deletes
-    * @param tableName
+    * @param tableName the Kudu table to delete from
     */
   def deleteRows(data: DataFrame, tableName: String): Unit = {
-    writeRows(data, tableName, table => table.newDelete())
+    writeRows(data, tableName, Delete)
   }
 
-  private def writeRows(data: DataFrame, tableName: String, newOp: KuduTable => Operation) {
+  private[kudu] def writeRows(data: DataFrame, tableName: String, operation: OperationType) {
     val schema = data.schema
     data.foreachPartition(iterator => {
-      val pendingErrors = writePartitionRows(iterator, schema, tableName, newOp)
+      val pendingErrors = writePartitionRows(iterator, schema, tableName, operation)
       val errorCount = pendingErrors.getRowErrors.length
       if (errorCount > 0) {
         val errors = pendingErrors.getRowErrors.take(5).map(_.getErrorStatus).mkString
@@ -179,17 +190,17 @@ class KuduContext(kuduMaster: String) extends Serializable {
   private def writePartitionRows(rows: Iterator[Row],
                                  schema: StructType,
                                  tableName: String,
-                                 newOp: KuduTable => Operation): RowErrorsAndOverflowStatus = {
+                                 operationType: OperationType): RowErrorsAndOverflowStatus = {
     val table: KuduTable = syncClient.openTable(tableName)
-    val kuduSchema = table.getSchema
     val indices: Array[(Int, Int)] = schema.fields.zipWithIndex.map({ case (field, sparkIdx) =>
       sparkIdx -> table.getSchema.getColumnIndex(field.name)
     })
     val session: KuduSession = syncClient.newSession
     session.setFlushMode(FlushMode.AUTO_FLUSH_BACKGROUND)
+    session.setIgnoreAllDuplicateRows(operationType.ignoreDuplicateRowErrors)
     try {
       for (row <- rows) {
-        val operation = newOp(table)
+        val operation = operationType.operation(table)
         for ((sparkIdx, kuduIdx) <- indices) {
           if (row.isNullAt(sparkIdx)) {
             operation.getRow.setNull(kuduIdx)
@@ -214,7 +225,6 @@ class KuduContext(kuduMaster: String) extends Serializable {
     }
     session.getPendingErrors
   }
-
 }
 
 private object KuduConnection {
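
The same write paths can be driven directly through KuduContext, without the data source. A minimal sketch, assuming an existing `kuduContext`, a DataFrame `df` matching the table schema, and a table named "my_table":

    // Direct KuduContext usage sketch; method names are the ones defined above.
    kuduContext.insertIgnoreRows(df, "my_table")            // insert, ignoring duplicate-key rows
    kuduContext.upsertRows(df, "my_table")                  // insert or update on key conflict
    kuduContext.updateRows(df, "my_table")                  // rows must already exist
    kuduContext.deleteRows(df.select("key"), "my_table")    // only key columns; "key" is assumed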

http://git-wip-us.apache.org/repos/asf/kudu/blob/9acba45a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
index b3b69ec..bfd0b55 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
@@ -50,10 +50,10 @@ class KuduRDD(val kuduMaster: String,
 
   override protected def getPartitions: Array[Partition] = {
     val builder = kuduContext.syncClient
-                         .newScanTokenBuilder(table)
-                         .batchSizeBytes(batchSize)
-                         .setProjectedColumnNames(projectedCols.toSeq.asJava)
-                         .cacheBlocks(true)
+                             .newScanTokenBuilder(table)
+                             .batchSizeBytes(batchSize)
+                             .setProjectedColumnNames(projectedCols.toSeq.asJava)
+                             .cacheBlocks(true)
 
     for (predicate <- predicates) {
       builder.addPredicate(predicate)
@@ -83,7 +83,7 @@ class KuduRDD(val kuduMaster: String,
   */
 private[spark] class KuduPartition(val index: Int,
                                    val scanToken: Array[Byte],
-                                   val locations : Array[String]) extends Partition {}
+                                   val locations: Array[String]) extends Partition {}
 
 /**
   * A Spark SQL [[Row]] iterator which wraps a [[KuduScanner]].

http://git-wip-us.apache.org/repos/asf/kudu/blob/9acba45a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/OperationType.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/OperationType.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/OperationType.scala
new file mode 100644
index 0000000..fd23d05
--- /dev/null
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/OperationType.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kudu.spark.kudu
+
+import org.apache.kudu.client.{KuduTable, Operation}
+
+/**
+  * OperationType enumerates the types of Kudu write operations.
+  */
+private[kudu] sealed trait OperationType {
+  def operation(table: KuduTable): Operation
+  def ignoreDuplicateRowErrors: Boolean = false
+}
+private[kudu] case object Insert extends OperationType {
+  override def operation(table: KuduTable): Operation = table.newInsert()
+}
+private[kudu] case object InsertIgnore extends OperationType {
+  override def operation(table: KuduTable): Operation = table.newInsert()
+  override def ignoreDuplicateRowErrors: Boolean = true
+}
+private[kudu] case object Update extends OperationType {
+  override def operation(table: KuduTable): Operation = table.newUpdate()
+}
+private[kudu] case object Upsert extends OperationType {
+  override def operation(table: KuduTable): Operation = table.newUpsert()
+}
+private[kudu] case object Delete extends OperationType {
+  override def operation(table: KuduTable): Operation = table.newDelete()
+}
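
A short sketch of how these case objects are consumed by writePartitionRows above: the operation type both configures the session (duplicate-row handling for InsertIgnore) and creates the per-row Operation. `session` and `table` are assumed to already exist.

    // Consumption sketch, mirroring writePartitionRows:
    val opType: OperationType = InsertIgnore
    session.setIgnoreAllDuplicateRows(opType.ignoreDuplicateRowErrors)  // true only for InsertIgnore
    val op: Operation = opType.operation(table)                         // table.newInsert() in this case
    // ... populate op.getRow and session.apply(op) as in writePartitionRows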

http://git-wip-us.apache.org/repos/asf/kudu/blob/9acba45a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/package.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/package.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/package.scala
index fe28f70..6f49023 100755
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/package.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/package.scala
@@ -35,4 +35,4 @@ package object kudu {
   implicit class KuduDataFrameWriter(writer: DataFrameWriter) {
     def kudu = writer.format("org.apache.kudu.spark.kudu").save
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kudu/blob/9acba45a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
index a7dd209..2dfce76 100644
--- a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
+++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
@@ -124,6 +124,29 @@ class DefaultSourceTest extends FunSuite with TestContext with BeforeAndAfter {
     deleteRow(101)
   }
 
+  test("insert ignore rows") {
+    val df = sqlContext.read.options(kuduOptions).kudu
+    val baseDF = df.limit(1) // filter down to just the first row
+
+    // change the c2 string to abc and insert
+    val updateDF = baseDF.withColumn("c2_s", lit("abc"))
+    kuduContext.insertIgnoreRows(updateDF, tableName)
+
+    // change the key and insert
+    val insertDF = df.limit(1).withColumn("key", df("key").plus(100)).withColumn("c2_s", lit("def"))
+    kuduContext.insertIgnoreRows(insertDF, tableName)
+
+    // read the data back
+    val newDF = sqlContext.read.options(kuduOptions).kudu
+    val collectedUpdate = newDF.filter("key = 0").collect()
+    assertEquals("0", collectedUpdate(0).getAs[String]("c2_s"))
+    val collectedInsert = newDF.filter("key = 100").collect()
+    assertEquals("def", collectedInsert(0).getAs[String]("c2_s"))
+
+    // restore the original state of the table
+    deleteRow(100)
+  }
+
   test("upsert rows") {
     val df = sqlContext.read.options(kuduOptions).kudu
     val baseDF = df.limit(1) // filter down to just the first row