Posted to commits@kudu.apache.org by al...@apache.org on 2022/02/15 19:27:09 UTC

[kudu] branch master updated: [java] KUDU-3350 add the support for deleteIgnoreRows

This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new c7afc43  [java] KUDU-3350 add the support for deleteIgnoreRows
c7afc43 is described below

commit c7afc431861fbd3e1947d2358795cade1ab2f71e
Author: Hongjiang Zhang <ho...@ebay.com>
AuthorDate: Tue Feb 8 09:20:19 2022 +0800

    [java] KUDU-3350 add the support for deleteIgnoreRows
    
    Adds a new deleteIgnoreRows() wrapper for the DELETE_IGNORE operations
    introduced with KUDU-1563 (see https://github.com/apache/kudu/commit/7fbe341e51a9e4245d8b3017cecf11e393c3a22b).
    
    Change-Id: I6f89ced9ffa4a79f46661873f01c38aefb1d78d5
    Reviewed-on: http://gerrit.cloudera.org:8080/18211
    Tested-by: Kudu Jenkins
    Reviewed-by: Alexey Serbin <as...@cloudera.com>
---
 .../org/apache/kudu/spark/kudu/KuduContext.scala   | 19 +++++
 .../apache/kudu/spark/kudu/DefaultSourceTest.scala | 80 ++++++++++++++++++++++
 2 files changed, 99 insertions(+)
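
For readers new to the API, here is a minimal, illustrative sketch of calling
the new deleteIgnoreRows() wrapper from a Spark application. The master
addresses, table name, and key predicate below are hypothetical; only the
KuduContext read/write calls mirror what the diff below adds and what the
existing kudu-spark integration provides.

    import org.apache.spark.sql.SparkSession
    import org.apache.kudu.spark.kudu.KuduContext

    object DeleteIgnoreExample {
      def main(args: Array[String]): Unit = {
        // Hypothetical master addresses and table name, for illustration only.
        val kuduMasters = "kudu-master-1:7051,kudu-master-2:7051"
        val tableName = "events"

        val spark = SparkSession.builder().appName("delete-ignore-example").getOrCreate()
        val kuduContext = new KuduContext(kuduMasters, spark.sparkContext)

        // Rows to delete; only the key columns are needed for deletes.
        val toDelete = spark.read
          .options(Map("kudu.master" -> kuduMasters, "kudu.table" -> tableName))
          .format("kudu")
          .load()
          .select("key")
          .filter("key < 100")

        // Rows that no longer exist are silently skipped instead of failing the job;
        // the default KuduWriteOptions are used here.
        kuduContext.deleteIgnoreRows(toDelete, tableName)

        spark.stop()
      }
    }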

diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
index a5163f3..67533fd 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
@@ -330,6 +330,25 @@ class KuduContext(
   }
 
   /**
+   * Deletes the rows of a [[DataFrame]] from a Kudu table, ignoring any non-existent
+   * rows.
+   *
+   * @param data the data to delete from Kudu
+   *             note that only the key columns should be specified for deletes
+   * @param tableName the Kudu table to delete from
+   * @param writeOptions the Kudu write options
+   */
+  def deleteIgnoreRows(
+      data: DataFrame,
+      tableName: String,
+      writeOptions: KuduWriteOptions = new KuduWriteOptions): Unit = {
+    log.info(s"deleting rows from table '$tableName'")
+    writeRows(data, tableName, DeleteIgnore, writeOptions)
+    log.info(
+      s"deleted up to ${numDeletes.value.get(tableName)} rows from table '$tableName' using DELETE_IGNORE")
+  }
+
+  /**
    * Deletes the rows of a [[DataFrame]] from a Kudu table.
    *
    * @param data the data to delete from Kudu
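
For context on the DELETE_IGNORE semantics mentioned above: the new wrapper maps to
the ignore-variant operations introduced by KUDU-1563. The following is a rough sketch
of the equivalent low-level call path through the synchronous Java client, under the
assumption that KuduTable.newDeleteIgnore() from that change is available; the master
address, table name, and key value are hypothetical.

    import org.apache.kudu.client.KuduClient

    // Minimal sketch of a raw DELETE_IGNORE issued through the Java client from Scala.
    val client = new KuduClient.KuduClientBuilder("kudu-master-1:7051").build()
    try {
      val table = client.openTable("events")
      val session = client.newSession()
      val delete = table.newDeleteIgnore() // missing rows do not produce row errors
      delete.getRow.addInt("key", 42)      // only key columns are needed for a delete
      session.apply(delete)
      session.close()
    } finally {
      client.close()
    }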
diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
index 9ea1818..a90565c 100644
--- a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
+++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
@@ -66,6 +66,86 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {
   }
 
   /**
+   * A simple test to delete rows from an empty table.
+   * First, call deleteIgnoreRows() on the empty table.
+   * Next, insert data into the Kudu table and delete all of it afterwards.
+   * Finally, delete from the now-empty table again through deleteIgnoreRows().
+   */
+  @Test
+  def testDeleteRowsFromEmptyTable(): Unit = {
+    val origDf = sqlContext.read.options(kuduOptions).format("kudu").load
+    val tableName = "testEmptyTable"
+    if (kuduContext.tableExists(tableName)) {
+      kuduContext.deleteTable(tableName)
+    }
+    kuduContext.createTable(
+      tableName,
+      origDf.schema,
+      Seq("key"),
+      new CreateTableOptions()
+        .addHashPartitions(List("key").asJava, harness.getTabletServers.size() * 3))
+    val df = sqlContext.read.options(kuduOptions).format("kudu").load
+    // delete rows from the empty table.
+    kuduContext.deleteIgnoreRows(df, tableName)
+    // insert rows.
+    val newKuduTableOptions =
+      Map("kudu.table" -> tableName, "kudu.master" -> harness.getMasterAddressesAsString)
+    val insertsBefore = kuduContext.numInserts.value.get(tableName)
+    kuduContext.insertRows(df, tableName)
+    assertEquals(insertsBefore + df.count(), kuduContext.numInserts.value.get(tableName))
+    // delete all of the newly inserted rows.
+    kuduContext.deleteRows(df, tableName)
+    val newDf = sqlContext.read.options(newKuduTableOptions).format("kudu").load()
+    assertEquals(newDf.collectAsList().size(), 0)
+    // delete all of the rows again, which is harmless since they no longer exist.
+    kuduContext.deleteIgnoreRows(df, tableName)
+  }
+
+  /**
+   * A simple test with two threads deleting the same data from the same table.
+   * With deleteIgnoreRows, there should be no errors even though half of the
+   * delete operations will be applied to non-existent rows.
+   */
+  @Test
+  def testDuplicateDelete(): Unit = {
+    val totalRows = 10000
+    val origDf = sqlContext.read.options(kuduOptions).format("kudu").load
+    insertRows(table, totalRows, rowCount)
+    val tableName = "testDuplicateDelete"
+    if (kuduContext.tableExists(tableName)) {
+      kuduContext.deleteTable(tableName)
+    }
+    kuduContext.createTable(
+      tableName,
+      origDf.schema,
+      Seq("key"),
+      new CreateTableOptions()
+        .addHashPartitions(List("key").asJava, harness.getTabletServers.size() * 3)
+        .setNumReplicas(3))
+    val newKuduTableOptions =
+      Map("kudu.table" -> tableName, "kudu.master" -> harness.getMasterAddressesAsString)
+    kuduContext.insertRows(origDf, tableName)
+    val df = sqlContext.read.options(newKuduTableOptions).format("kudu").load()
+    assertEquals(df.collectAsList().size(), totalRows + rowCount)
+    val deleteThread1 = new Thread {
+      override def run: Unit = {
+        kuduContext.deleteIgnoreRows(df, tableName)
+      }
+    }
+    val deleteThread2 = new Thread {
+      override def run: Unit = {
+        kuduContext.deleteIgnoreRows(df, tableName)
+      }
+    }
+    deleteThread1.start()
+    deleteThread2.start()
+    deleteThread1.join(3000)
+    deleteThread2.join(3000)
+    val newDf = sqlContext.read.options(newKuduTableOptions).format("kudu").load()
+    assertEquals(newDf.collectAsList().size(), 0)
+  }
+
+  /**
    * A simple test to verify the legacy package reader/writer
    * syntax still works. This should be removed when the
    * deprecated `kudu` methods are removed.