You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2022/02/15 19:27:09 UTC
[kudu] branch master updated: [java] KUDU-3350 add the support for deleteIgnoreRows
This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new c7afc43 [java] KUDU-3350 add the support for deleteIgnoreRows
c7afc43 is described below
commit c7afc431861fbd3e1947d2358795cade1ab2f71e
Author: Hongjiang Zhang <ho...@ebay.com>
AuthorDate: Tue Feb 8 09:20:19 2022 +0800
[java] KUDU-3350 add the support for deleteIgnoreRows
Adds a new deleteIgnoreRows() wrapper for DELETE_IGNORE operations
introduced with KUDU-1563 (see https://github.com/apache/kudu/commit/7fbe341e51a9e4245d8b3017cecf11e393c3a22b)
Change-Id: I6f89ced9ffa4a79f46661873f01c38aefb1d78d5
Reviewed-on: http://gerrit.cloudera.org:8080/18211
Tested-by: Kudu Jenkins
Reviewed-by: Alexey Serbin <as...@cloudera.com>
---
.../org/apache/kudu/spark/kudu/KuduContext.scala | 19 +++++
.../apache/kudu/spark/kudu/DefaultSourceTest.scala | 80 ++++++++++++++++++++++
2 files changed, 99 insertions(+)
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
index a5163f3..67533fd 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
@@ -330,6 +330,25 @@ class KuduContext(
}
/**
+ * Deletes the rows of a [[DataFrame]] from a Kudu table, ignoring any non-existing
+ * rows (DELETE_IGNORE semantics: a delete whose key is absent is not an error).
+ *
+ * @param data the data to delete from Kudu;
+ * note that only the key columns should be specified for deletes
+ * @param tableName The Kudu table to delete from
+ * @param writeOptions the Kudu write options
+ */
+ def deleteIgnoreRows(
+ data: DataFrame,
+ tableName: String,
+ writeOptions: KuduWriteOptions = new KuduWriteOptions): Unit = {
+ log.info(s"deleting rows from table '$tableName'")
+ writeRows(data, tableName, DeleteIgnore, writeOptions)
+ // "up to": the accumulator counts attempted deletes; ignored (absent) keys are included.
+ log.info(
+ s"deleted up to ${numDeletes.value.get(tableName)} rows from table '$tableName' using DELETE_IGNORE")
+ }
+
+ /**
* Deletes the rows of a [[DataFrame]] from a Kudu table.
*
* @param data the data to delete from Kudu
diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
index 9ea1818..a90565c 100644
--- a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
+++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
@@ -66,6 +66,86 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {
}
/**
+ * A simple test to delete rows from an empty table.
+ * First, delete-ignore rows from the empty table (must not fail).
+ * Next, insert data into the kudu table and delete all of it afterwards.
+ * Finally, delete again from the now-empty table through deleteIgnoreRows.
+ */
+ @Test
+ def testDeleteRowsFromEmptyTable(): Unit = {
+ val origDf = sqlContext.read.options(kuduOptions).format("kudu").load
+ val tableName = "testEmptyTable"
+ if (kuduContext.tableExists(tableName)) {
+ kuduContext.deleteTable(tableName)
+ }
+ kuduContext.createTable(
+ tableName,
+ origDf.schema,
+ Seq("key"),
+ new CreateTableOptions()
+ .addHashPartitions(List("key").asJava, harness.getTabletServers.size() * 3))
+ val df = sqlContext.read.options(kuduOptions).format("kudu").load
+ // delete rows from the empty table; with DELETE_IGNORE this must succeed.
+ kuduContext.deleteIgnoreRows(df, tableName)
+ // insert rows.
+ val newKuduTableOptions =
+ Map("kudu.table" -> tableName, "kudu.master" -> harness.getMasterAddressesAsString)
+ val insertsBefore = kuduContext.numInserts.value.get(tableName)
+ kuduContext.insertRows(df, tableName)
+ assertEquals(insertsBefore + df.count(), kuduContext.numInserts.value.get(tableName))
+ // delete all of the newly inserted rows.
+ kuduContext.deleteRows(df, tableName)
+ val newDf = sqlContext.read.options(newKuduTableOptions).format("kudu").load()
+ assertEquals(newDf.collectAsList().size(), 0)
+ // delete all of the rows again; harmless on the now-empty table thanks to DELETE_IGNORE.
+ kuduContext.deleteIgnoreRows(df, tableName)
+ }
+
+ /**
+ * A simple test with two threads to delete the data from the same table.
+ * After applying deleteIgnoreRows, there should be no errors even though
+ * half of the delete operations will be applied on non-existing rows.
+ */
+ @Test
+ def testDuplicateDelete(): Unit = {
+ val totalRows = 10000
+ val origDf = sqlContext.read.options(kuduOptions).format("kudu").load
+ insertRows(table, totalRows, rowCount)
+ val tableName = "testDuplicateDelete"
+ if (kuduContext.tableExists(tableName)) {
+ kuduContext.deleteTable(tableName)
+ }
+ kuduContext.createTable(
+ tableName,
+ origDf.schema,
+ Seq("key"),
+ new CreateTableOptions()
+ .addHashPartitions(List("key").asJava, harness.getTabletServers.size() * 3)
+ .setNumReplicas(3))
+ val newKuduTableOptions =
+ Map("kudu.table" -> tableName, "kudu.master" -> harness.getMasterAddressesAsString)
+ kuduContext.insertRows(origDf, tableName)
+ val df = sqlContext.read.options(newKuduTableOptions).format("kudu").load()
+ assertEquals(df.collectAsList().size(), totalRows + rowCount)
+ // Two concurrent delete-ignore passes over the same rows; at most one can win each
+ // delete, the other must be silently ignored rather than raising a NotFound error.
+ val deleteThread1 = new Thread {
+ override def run: Unit = {
+ kuduContext.deleteIgnoreRows(df, tableName)
+ }
+ }
+ val deleteThread2 = new Thread {
+ override def run: Unit = {
+ kuduContext.deleteIgnoreRows(df, tableName)
+ }
+ }
+ deleteThread1.start()
+ deleteThread2.start()
+ // NOTE(review): join(3000) returns after 3s even if the thread is still running, so a
+ // slow delete could race the final assertion — TODO confirm timeout is sufficient or
+ // join without a timeout / assert !thread.isAlive afterwards.
+ deleteThread1.join(3000)
+ deleteThread2.join(3000)
+ val newDf = sqlContext.read.options(newKuduTableOptions).format("kudu").load()
+ assertEquals(newDf.collectAsList().size(), 0)
+ }
+
+ /**
* A simple test to verify the legacy package reader/writer
* syntax still works. This should be removed when the
* deprecated `kudu` methods are removed.