You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by gr...@apache.org on 2020/11/09 15:07:53 UTC

[kudu] branch master updated (07cdc32 -> 5faf797)

This is an automated email from the ASF dual-hosted git repository.

granthenke pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git.


    from 07cdc32  KUDU-1563. Add a feature flag for IGNORE operations
     new 7fbe341  KUDU-1563 Support ignore operations in kudu-spark
     new 5faf797  KUDU-1563 Use DELETE_IGNORE in KuduRestore job

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../scala/org/apache/kudu/backup/KuduRestore.scala | 13 ++++++--
 .../org/apache/kudu/backup/TestKuduBackup.scala    | 17 +++++++++-
 .../org/apache/kudu/spark/kudu/DefaultSource.scala | 13 ++++++--
 .../org/apache/kudu/spark/kudu/KuduContext.scala   | 22 +++++++++++--
 .../org/apache/kudu/spark/kudu/OperationType.scala | 15 +++++++++
 .../apache/kudu/spark/kudu/DefaultSourceTest.scala | 38 ++++++++++++++++++++++
 6 files changed, 109 insertions(+), 9 deletions(-)


[kudu] 01/02: KUDU-1563 Support ignore operations in kudu-spark

Posted by gr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

granthenke pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 7fbe341e51a9e4245d8b3017cecf11e393c3a22b
Author: Grant Henke <gr...@apache.org>
AuthorDate: Fri Oct 30 10:32:31 2020 -0500

    KUDU-1563 Support ignore operations in kudu-spark
    
    This patch adds support for the INSERT_IGNORE, UPDATE_IGNORE,
    and DELETE_IGNORE operations into the Kudu Spark integration.
    
    It leverages `AsyncKuduClient.supportsIgnoreOperations()` to
    handle INSERT_IGNORE operations in a compatible way.
    
    Change-Id: If4b4dc0ec996a88afead0f9da0024457e568b0f4
    Reviewed-on: http://gerrit.cloudera.org:8080/16681
    Reviewed-by: Attila Bukor <ab...@apache.org>
    Tested-by: Kudu Jenkins
    Reviewed-by: Alexey Serbin <as...@cloudera.com>
---
 .../org/apache/kudu/spark/kudu/DefaultSource.scala | 13 ++++++--
 .../org/apache/kudu/spark/kudu/KuduContext.scala   | 22 +++++++++++--
 .../org/apache/kudu/spark/kudu/OperationType.scala | 15 +++++++++
 .../apache/kudu/spark/kudu/DefaultSourceTest.scala | 38 ++++++++++++++++++++++
 4 files changed, 83 insertions(+), 5 deletions(-)

diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
index 8fcfe71..2dcf335 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
@@ -203,8 +203,10 @@ class DefaultSource
 
   private def getWriteOptions(parameters: Map[String, String]): KuduWriteOptions = {
     val ignoreDuplicateRowErrors =
-    Try(parameters(IGNORE_DUPLICATE_ROW_ERRORS).toBoolean).getOrElse(false) ||
-    Try(parameters(OPERATION) == "insert-ignore").getOrElse(false)
+      parameters
+        .get(IGNORE_DUPLICATE_ROW_ERRORS)
+        .map(_.toBoolean)
+        .getOrElse(defaultIgnoreDuplicateRowErrors)
     val ignoreNull =
       parameters.get(IGNORE_NULL).map(_.toBoolean).getOrElse(defaultIgnoreNull)
     val repartition =
@@ -241,10 +243,15 @@ class DefaultSource
   private def stringToOperationType(opParam: String): OperationType = {
     opParam.toLowerCase(Locale.ENGLISH) match {
       case "insert" => Insert
-      case "insert-ignore" => Insert
+      case "insert_ignore" => InsertIgnore
+      case "insert-ignore" => InsertIgnore
       case "upsert" => Upsert
       case "update" => Update
+      case "update_ignore" => UpdateIgnore
+      case "update-ignore" => UpdateIgnore
       case "delete" => Delete
+      case "delete_ignore" => DeleteIgnore
+      case "delete-ignore" => DeleteIgnore
       case _ =>
         throw new IllegalArgumentException(s"Unsupported operation type '$opParam'")
     }
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
index de37ac1..563533e 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
@@ -91,9 +91,12 @@ class KuduContext(val kuduMaster: String, sc: SparkContext, val socketReadTimeou
   private def addForOperation(count: Long, opType: OperationType, tableName: String): Unit = {
     opType match {
       case Insert => numInserts.add((tableName, count))
+      case InsertIgnore => numInserts.add((tableName, count))
       case Upsert => numUpserts.add((tableName, count))
       case Update => numUpdates.add((tableName, count))
+      case UpdateIgnore => numUpdates.add((tableName, count))
       case Delete => numDeletes.add((tableName, count))
+      case DeleteIgnore => numDeletes.add((tableName, count))
     }
   }
 
@@ -153,6 +156,8 @@ class KuduContext(val kuduMaster: String, sc: SparkContext, val socketReadTimeou
     c
   }
 
+  @transient lazy val supportsIgnoreOperations: Boolean = syncClient.supportsIgnoreOperations()
+
   // Visible for testing.
   private[kudu] val authnCredentials: Array[Byte] = {
     Subject.doAs(KuduContext.getSubject(sc), new PrivilegedAction[Array[Byte]] {
@@ -381,15 +386,24 @@ class KuduContext(val kuduMaster: String, sc: SparkContext, val socketReadTimeou
       rdd = repartitionRows(rdd, tableName, schema, writeOptions)
     }
 
+    // If the cluster doesn't support the INSERT_IGNORE operation, fall back to the old
+    // session-based style.
+    var adjustedOperation = operation
+    var adjustedWriteOptions = writeOptions
+    if (operation == InsertIgnore && !supportsIgnoreOperations) {
+      adjustedOperation = Insert;
+      adjustedWriteOptions = writeOptions.copy(ignoreDuplicateRowErrors = true);
+    }
+
     // Write the rows for each Spark partition.
     rdd.foreachPartition(iterator => {
       val pendingErrors = writePartitionRows(
         iterator,
         schema,
         tableName,
-        operation,
+        adjustedOperation,
         lastPropagatedTimestamp,
-        writeOptions)
+        adjustedWriteOptions)
       if (pendingErrors.getRowErrors.nonEmpty) {
         val errors = pendingErrors.getRowErrors
         val sample = errors.take(5).map(_.getErrorStatus).mkString
@@ -467,6 +481,10 @@ class KuduContext(val kuduMaster: String, sc: SparkContext, val socketReadTimeou
     val rowConverter = new RowConverter(table.getSchema, schema, writeOptions.ignoreNull)
     val session: KuduSession = syncClient.newSession
     session.setFlushMode(FlushMode.AUTO_FLUSH_BACKGROUND)
+    if (writeOptions.ignoreDuplicateRowErrors) {
+      log.warn(
+        "kudu.ignoreDuplicateRowErrors is deprecated and slow. Use the insert_ignore operation instead.")
+    }
     session.setIgnoreAllDuplicateRows(writeOptions.ignoreDuplicateRowErrors)
     var numRows = 0
     log.info(s"applying operations of type '${opType.toString}' to table '$tableName'")
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/OperationType.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/OperationType.scala
index 63d7807..125ee19 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/OperationType.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/OperationType.scala
@@ -33,11 +33,21 @@ private[kudu] case object Insert extends OperationType {
 
   override def toString(): String = "insert"
 }
+private[kudu] case object InsertIgnore extends OperationType {
+  override def operation(table: KuduTable): Operation = table.newInsertIgnore()
+
+  override def toString(): String = "insert_ignore"
+}
 private[kudu] case object Update extends OperationType {
   override def operation(table: KuduTable): Operation = table.newUpdate()
 
   override def toString(): String = "update"
 }
+private[kudu] case object UpdateIgnore extends OperationType {
+  override def operation(table: KuduTable): Operation = table.newUpdateIgnore()
+
+  override def toString(): String = "update_ignore"
+}
 private[kudu] case object Upsert extends OperationType {
   override def operation(table: KuduTable): Operation = table.newUpsert()
 
@@ -48,3 +58,8 @@ private[kudu] case object Delete extends OperationType {
 
   override def toString(): String = "delete"
 }
+private[kudu] case object DeleteIgnore extends OperationType {
+  override def operation(table: KuduTable): Operation = table.newDeleteIgnore()
+
+  override def toString(): String = "delete_ignore"
+}
diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
index e82af8b..76f2af1 100644
--- a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
+++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.types.StructField
 import org.apache.spark.sql.types.StructType
 import org.junit.Assert._
 import org.apache.kudu.client.CreateTableOptions
+import org.apache.kudu.test.KuduTestHarness
 import org.apache.kudu.test.RandomUtils
 import org.apache.kudu.spark.kudu.SparkListenerUtil.withJobTaskCounter
 import org.apache.kudu.test.KuduTestHarness.MasterServerConfig
@@ -262,6 +263,43 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {
     val newOptions: Map[String, String] = Map(
       "kudu.table" -> tableName,
       "kudu.master" -> harness.getMasterAddressesAsString,
+      "kudu.operation" -> "insert_ignore")
+    updateDF.write.options(newOptions).mode("append").format("kudu").save
+
+    // change the key and insert
+    val insertDF = df
+      .limit(1)
+      .withColumn("key", df("key").plus(100))
+      .withColumn("c2_s", lit("def"))
+    insertDF.write.options(newOptions).mode("append").format("kudu").save
+
+    // read the data back
+    val newDF = sqlContext.read.options(kuduOptions).format("kudu").load
+    val collectedUpdate = newDF.filter("key = 0").collect()
+    assertEquals("0", collectedUpdate(0).getAs[String]("c2_s"))
+    val collectedInsert = newDF.filter("key = 100").collect()
+    assertEquals("def", collectedInsert(0).getAs[String]("c2_s"))
+
+    // restore the original state of the table
+    deleteRow(100)
+  }
+
+  /**
+   * Identical to the above test, but exercising the old session based insert ignore operations,
+   * ensuring we functionally support the same semantics.
+   * Also uses "insert-ignore" instead of "insert_ignore".
+   */
+  @Test
+  @KuduTestHarness.MasterServerConfig(flags = Array("--master_support_ignore_operations=false"))
+  def testLegacyInsertIgnoreRowsWriteOption() {
+    val df = sqlContext.read.options(kuduOptions).format("kudu").load
+    val baseDF = df.limit(1) // filter down to just the first row
+
+    // change the c2 string to abc and insert
+    val updateDF = baseDF.withColumn("c2_s", lit("abc"))
+    val newOptions: Map[String, String] = Map(
+      "kudu.table" -> tableName,
+      "kudu.master" -> harness.getMasterAddressesAsString,
       "kudu.operation" -> "insert-ignore")
     updateDF.write.options(newOptions).mode("append").format("kudu").save
 


[kudu] 02/02: KUDU-1563 Use DELETE_IGNORE in KuduRestore job

Posted by gr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

granthenke pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 5faf79726605e3a5d8b33bd18ce94c8c09241d3f
Author: Grant Henke <gr...@apache.org>
AuthorDate: Fri Oct 30 11:35:04 2020 -0500

    KUDU-1563 Use DELETE_IGNORE in KuduRestore job
    
    This patch changes the KuduRestore job to use DELETE_IGNORE
    operations instead of DELETE when the cluster supports ignore
    operations.
    
    `session.setIgnoreAllNotFoundRows(true)` is retained to support
    falling back to DELETE operations for backward compatibility.
    
    Change-Id: Ib6f6d5a31be77630e79ff1566e796eb5183a5d22
    Reviewed-on: http://gerrit.cloudera.org:8080/16683
    Reviewed-by: Attila Bukor <ab...@apache.org>
    Reviewed-by: Alexey Serbin <as...@cloudera.com>
    Tested-by: Grant Henke <gr...@apache.org>
---
 .../main/scala/org/apache/kudu/backup/KuduRestore.scala | 13 ++++++++++---
 .../scala/org/apache/kudu/backup/TestKuduBackup.scala   | 17 ++++++++++++++++-
 2 files changed, 26 insertions(+), 4 deletions(-)

diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala
index 6549cd6..ef01a8d 100644
--- a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala
+++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala
@@ -91,6 +91,7 @@ object KuduRestore {
         .load(backup.path.toString)
         // Default the row action column with a value of "UPSERT" so that the
         // rows from a full backup, which don't have a row action, are upserted.
+        // TODO(ghenke): Consider using INSERT_IGNORE for full backups.
         .na
         .fill(RowAction.UPSERT.getValue, Seq(rowActionCol))
 
@@ -106,8 +107,8 @@ object KuduRestore {
         val session = context.syncClient.newSession
         session.setFlushMode(FlushMode.AUTO_FLUSH_BACKGROUND)
         // In the case of task retries we need to ignore NotFound errors for deleted rows.
-        // TODO(KUDU-1563): Implement server side ignore capabilities to improve performance
-        //  and reliability.
+        // This can't occur if DELETE_IGNORE is used, but still needs to be set in the case
+        // DELETE is used for backwards compatibility.
         session.setIgnoreAllNotFoundRows(true)
         try for (internalRow <- internalRows) {
           // Convert the InternalRows to Rows.
@@ -120,7 +121,13 @@ object KuduRestore {
           // Generate an operation based on the row action.
           val operation = rowAction match {
             case RowAction.UPSERT => table.newUpsert()
-            case RowAction.DELETE => table.newDelete()
+            case RowAction.DELETE => {
+              if (context.supportsIgnoreOperations) {
+                table.newDeleteIgnore()
+              } else {
+                table.newDelete()
+              }
+            }
             case _ => throw new IllegalStateException(s"Unsupported RowAction: $rowAction")
           }
           // Convert the Spark row to a partial row and set it on the operation.
diff --git a/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala b/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala
index da8fe58..817875e 100644
--- a/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala
+++ b/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala
@@ -32,8 +32,9 @@ import org.apache.kudu.spark.kudu.SparkListenerUtil.withJobDescriptionCollector
 import org.apache.kudu.spark.kudu.SparkListenerUtil.withJobTaskCounter
 import org.apache.kudu.spark.kudu._
 import org.apache.kudu.test.CapturingLogAppender
-import org.apache.kudu.test.KuduTestHarness.TabletServerConfig
+import org.apache.kudu.test.KuduTestHarness
 import org.apache.kudu.test.RandomUtils
+import org.apache.kudu.test.KuduTestHarness.TabletServerConfig
 import org.apache.kudu.util.DataGenerator.DataGeneratorBuilder
 import org.apache.kudu.util.HybridTimeUtil
 import org.apache.kudu.util.SchemaGenerator.SchemaGeneratorBuilder
@@ -591,6 +592,20 @@ class TestKuduBackup extends KuduTestSuite {
 
   @Test
   def testDeleteIgnore(): Unit = {
+    doDeleteIgnoreTest()
+  }
+
+  /**
+   * Identical to the above test, but exercising the old session based delete ignore operations,
+   * ensuring we functionally support the same semantics.
+   */
+  @Test
+  @KuduTestHarness.MasterServerConfig(flags = Array("--master_support_ignore_operations=false"))
+  def testLegacyDeleteIgnore(): Unit = {
+    doDeleteIgnoreTest()
+  }
+
+  def doDeleteIgnoreTest(): Unit = {
     insertRows(table, 100) // Insert data into the default test table.
 
     // Run and validate initial backup.