You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this rendering.
Posted to commits@kudu.apache.org by gr...@apache.org on 2020/11/09 15:07:53 UTC
[kudu] branch master updated (07cdc32 -> 5faf797)
This is an automated email from the ASF dual-hosted git repository.
granthenke pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git.
from 07cdc32 KUDU-1563. Add a feature flag for IGNORE operations
new 7fbe341 KUDU-1563 Support ignore operations in kudu-spark
new 5faf797 KUDU-1563 Use DELETE_IGNORE in KuduRestore job
The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
Summary of changes:
.../scala/org/apache/kudu/backup/KuduRestore.scala | 13 ++++++--
.../org/apache/kudu/backup/TestKuduBackup.scala | 17 +++++++++-
.../org/apache/kudu/spark/kudu/DefaultSource.scala | 13 ++++++--
.../org/apache/kudu/spark/kudu/KuduContext.scala | 22 +++++++++++--
.../org/apache/kudu/spark/kudu/OperationType.scala | 15 +++++++++
.../apache/kudu/spark/kudu/DefaultSourceTest.scala | 38 ++++++++++++++++++++++
6 files changed, 109 insertions(+), 9 deletions(-)
[kudu] 01/02: KUDU-1563 Support ignore operations in kudu-spark
Posted by gr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
granthenke pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
commit 7fbe341e51a9e4245d8b3017cecf11e393c3a22b
Author: Grant Henke <gr...@apache.org>
AuthorDate: Fri Oct 30 10:32:31 2020 -0500
KUDU-1563 Support ignore operations in kudu-spark
This patch adds support for the INSERT_IGNORE, UPDATE_IGNORE,
and DELETE_IGNORE operations into the Kudu Spark integration.
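It leverages `AsyncKuduClient.supportsIgnoreOperations()` to
handle INSERT_IGNORE operations in a backward-compatible way: on
clusters that do not support ignore operations, INSERT_IGNORE falls
back to INSERT with session-level duplicate-row ignoring.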
Change-Id: If4b4dc0ec996a88afead0f9da0024457e568b0f4
Reviewed-on: http://gerrit.cloudera.org:8080/16681
Reviewed-by: Attila Bukor <ab...@apache.org>
Tested-by: Kudu Jenkins
Reviewed-by: Alexey Serbin <as...@cloudera.com>
---
.../org/apache/kudu/spark/kudu/DefaultSource.scala | 13 ++++++--
.../org/apache/kudu/spark/kudu/KuduContext.scala | 22 +++++++++++--
.../org/apache/kudu/spark/kudu/OperationType.scala | 15 +++++++++
.../apache/kudu/spark/kudu/DefaultSourceTest.scala | 38 ++++++++++++++++++++++
4 files changed, 83 insertions(+), 5 deletions(-)
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
index 8fcfe71..2dcf335 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
@@ -203,8 +203,10 @@ class DefaultSource
private def getWriteOptions(parameters: Map[String, String]): KuduWriteOptions = {
val ignoreDuplicateRowErrors =
- Try(parameters(IGNORE_DUPLICATE_ROW_ERRORS).toBoolean).getOrElse(false) ||
- Try(parameters(OPERATION) == "insert-ignore").getOrElse(false)
+ parameters
+ .get(IGNORE_DUPLICATE_ROW_ERRORS)
+ .map(_.toBoolean)
+ .getOrElse(defaultIgnoreDuplicateRowErrors)
val ignoreNull =
parameters.get(IGNORE_NULL).map(_.toBoolean).getOrElse(defaultIgnoreNull)
val repartition =
@@ -241,10 +243,15 @@ class DefaultSource
private def stringToOperationType(opParam: String): OperationType = {
opParam.toLowerCase(Locale.ENGLISH) match {
case "insert" => Insert
- case "insert-ignore" => Insert
+ case "insert_ignore" => InsertIgnore
+ case "insert-ignore" => InsertIgnore
case "upsert" => Upsert
case "update" => Update
+ case "update_ignore" => UpdateIgnore
+ case "update-ignore" => UpdateIgnore
case "delete" => Delete
+ case "delete_ignore" => DeleteIgnore
+ case "delete-ignore" => DeleteIgnore
case _ =>
throw new IllegalArgumentException(s"Unsupported operation type '$opParam'")
}
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
index de37ac1..563533e 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
@@ -91,9 +91,12 @@ class KuduContext(val kuduMaster: String, sc: SparkContext, val socketReadTimeou
private def addForOperation(count: Long, opType: OperationType, tableName: String): Unit = {
opType match {
case Insert => numInserts.add((tableName, count))
+ case InsertIgnore => numInserts.add((tableName, count))
case Upsert => numUpserts.add((tableName, count))
case Update => numUpdates.add((tableName, count))
+ case UpdateIgnore => numUpdates.add((tableName, count))
case Delete => numDeletes.add((tableName, count))
+ case DeleteIgnore => numDeletes.add((tableName, count))
}
}
@@ -153,6 +156,8 @@ class KuduContext(val kuduMaster: String, sc: SparkContext, val socketReadTimeou
c
}
+ @transient lazy val supportsIgnoreOperations: Boolean = syncClient.supportsIgnoreOperations()
+
// Visible for testing.
private[kudu] val authnCredentials: Array[Byte] = {
Subject.doAs(KuduContext.getSubject(sc), new PrivilegedAction[Array[Byte]] {
@@ -381,15 +386,24 @@ class KuduContext(val kuduMaster: String, sc: SparkContext, val socketReadTimeou
rdd = repartitionRows(rdd, tableName, schema, writeOptions)
}
+ // If the cluster doesn't support the INSERT_IGNORE operation fallback to the old
+ // session based style.
+ var adjustedOperation = operation
+ var adjustedWriteOptions = writeOptions
+ if (operation == InsertIgnore && !supportsIgnoreOperations) {
+ adjustedOperation = Insert;
+ adjustedWriteOptions = writeOptions.copy(ignoreDuplicateRowErrors = true);
+ }
+
// Write the rows for each Spark partition.
rdd.foreachPartition(iterator => {
val pendingErrors = writePartitionRows(
iterator,
schema,
tableName,
- operation,
+ adjustedOperation,
lastPropagatedTimestamp,
- writeOptions)
+ adjustedWriteOptions)
if (pendingErrors.getRowErrors.nonEmpty) {
val errors = pendingErrors.getRowErrors
val sample = errors.take(5).map(_.getErrorStatus).mkString
@@ -467,6 +481,10 @@ class KuduContext(val kuduMaster: String, sc: SparkContext, val socketReadTimeou
val rowConverter = new RowConverter(table.getSchema, schema, writeOptions.ignoreNull)
val session: KuduSession = syncClient.newSession
session.setFlushMode(FlushMode.AUTO_FLUSH_BACKGROUND)
+ if (writeOptions.ignoreDuplicateRowErrors) {
+ log.warn(
+ "kudu.ignoreDuplicateRowErrors is deprecated and slow. Use the insert_ignore operation instead.")
+ }
session.setIgnoreAllDuplicateRows(writeOptions.ignoreDuplicateRowErrors)
var numRows = 0
log.info(s"applying operations of type '${opType.toString}' to table '$tableName'")
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/OperationType.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/OperationType.scala
index 63d7807..125ee19 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/OperationType.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/OperationType.scala
@@ -33,11 +33,21 @@ private[kudu] case object Insert extends OperationType {
override def toString(): String = "insert"
}
+private[kudu] case object InsertIgnore extends OperationType {
+ override def operation(table: KuduTable): Operation = table.newInsertIgnore()
+
+ override def toString(): String = "insert_ignore"
+}
private[kudu] case object Update extends OperationType {
override def operation(table: KuduTable): Operation = table.newUpdate()
override def toString(): String = "update"
}
+private[kudu] case object UpdateIgnore extends OperationType {
+ override def operation(table: KuduTable): Operation = table.newUpdateIgnore()
+
+ override def toString(): String = "update_ignore"
+}
private[kudu] case object Upsert extends OperationType {
override def operation(table: KuduTable): Operation = table.newUpsert()
@@ -48,3 +58,8 @@ private[kudu] case object Delete extends OperationType {
override def toString(): String = "delete"
}
+private[kudu] case object DeleteIgnore extends OperationType {
+ override def operation(table: KuduTable): Operation = table.newDeleteIgnore()
+
+ override def toString(): String = "delete_ignore"
+}
diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
index e82af8b..76f2af1 100644
--- a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
+++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StructType
import org.junit.Assert._
import org.apache.kudu.client.CreateTableOptions
+import org.apache.kudu.test.KuduTestHarness
import org.apache.kudu.test.RandomUtils
import org.apache.kudu.spark.kudu.SparkListenerUtil.withJobTaskCounter
import org.apache.kudu.test.KuduTestHarness.MasterServerConfig
@@ -262,6 +263,43 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {
val newOptions: Map[String, String] = Map(
"kudu.table" -> tableName,
"kudu.master" -> harness.getMasterAddressesAsString,
+ "kudu.operation" -> "insert_ignore")
+ updateDF.write.options(newOptions).mode("append").format("kudu").save
+
+ // change the key and insert
+ val insertDF = df
+ .limit(1)
+ .withColumn("key", df("key").plus(100))
+ .withColumn("c2_s", lit("def"))
+ insertDF.write.options(newOptions).mode("append").format("kudu").save
+
+ // read the data back
+ val newDF = sqlContext.read.options(kuduOptions).format("kudu").load
+ val collectedUpdate = newDF.filter("key = 0").collect()
+ assertEquals("0", collectedUpdate(0).getAs[String]("c2_s"))
+ val collectedInsert = newDF.filter("key = 100").collect()
+ assertEquals("def", collectedInsert(0).getAs[String]("c2_s"))
+
+ // restore the original state of the table
+ deleteRow(100)
+ }
+
+ /**
+ * Identical to the above test, but exercising the old session based insert ignore operations,
+ * ensuring we functionally support the same semantics.
+ * Also uses "insert-ignore" instead of "insert_ignore".
+ */
+ @Test
+ @KuduTestHarness.MasterServerConfig(flags = Array("--master_support_ignore_operations=false"))
+ def testLegacyInsertIgnoreRowsWriteOption() {
+ val df = sqlContext.read.options(kuduOptions).format("kudu").load
+ val baseDF = df.limit(1) // filter down to just the first row
+
+ // change the c2 string to abc and insert
+ val updateDF = baseDF.withColumn("c2_s", lit("abc"))
+ val newOptions: Map[String, String] = Map(
+ "kudu.table" -> tableName,
+ "kudu.master" -> harness.getMasterAddressesAsString,
"kudu.operation" -> "insert-ignore")
updateDF.write.options(newOptions).mode("append").format("kudu").save
[kudu] 02/02: KUDU-1563 Use DELETE_IGNORE in KuduRestore job
Posted by gr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
granthenke pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
commit 5faf79726605e3a5d8b33bd18ce94c8c09241d3f
Author: Grant Henke <gr...@apache.org>
AuthorDate: Fri Oct 30 11:35:04 2020 -0500
KUDU-1563 Use DELETE_IGNORE in KuduRestore job
This patch changes the KuduRestore job to use DELETE_IGNORE
operations instead of DELETE when the cluster supports ignore
operations.
`session.setIgnoreAllNotFoundRows(true)` is retained so that NotFound
errors are still ignored when the job falls back to plain DELETE
operations on clusters that do not support ignore operations.
Change-Id: Ib6f6d5a31be77630e79ff1566e796eb5183a5d22
Reviewed-on: http://gerrit.cloudera.org:8080/16683
Reviewed-by: Attila Bukor <ab...@apache.org>
Reviewed-by: Alexey Serbin <as...@cloudera.com>
Tested-by: Grant Henke <gr...@apache.org>
---
.../main/scala/org/apache/kudu/backup/KuduRestore.scala | 13 ++++++++++---
.../scala/org/apache/kudu/backup/TestKuduBackup.scala | 17 ++++++++++++++++-
2 files changed, 26 insertions(+), 4 deletions(-)
diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala
index 6549cd6..ef01a8d 100644
--- a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala
+++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala
@@ -91,6 +91,7 @@ object KuduRestore {
.load(backup.path.toString)
// Default the the row action column with a value of "UPSERT" so that the
// rows from a full backup, which don't have a row action, are upserted.
+ // TODO(ghenke): Consider using INSERT_IGNORE for full backups.
.na
.fill(RowAction.UPSERT.getValue, Seq(rowActionCol))
@@ -106,8 +107,8 @@ object KuduRestore {
val session = context.syncClient.newSession
session.setFlushMode(FlushMode.AUTO_FLUSH_BACKGROUND)
// In the case of task retries we need to ignore NotFound errors for deleted rows.
- // TODO(KUDU-1563): Implement server side ignore capabilities to improve performance
- // and reliability.
+ // This can't occur if DELETE_IGNORE is used, but still needs to be set in the case
+ // DELETE is used for backwards compatibility.
session.setIgnoreAllNotFoundRows(true)
try for (internalRow <- internalRows) {
// Convert the InternalRows to Rows.
@@ -120,7 +121,13 @@ object KuduRestore {
// Generate an operation based on the row action.
val operation = rowAction match {
case RowAction.UPSERT => table.newUpsert()
- case RowAction.DELETE => table.newDelete()
+ case RowAction.DELETE => {
+ if (context.supportsIgnoreOperations) {
+ table.newDeleteIgnore()
+ } else {
+ table.newDelete()
+ }
+ }
case _ => throw new IllegalStateException(s"Unsupported RowAction: $rowAction")
}
// Convert the Spark row to a partial row and set it on the operation.
diff --git a/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala b/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala
index da8fe58..817875e 100644
--- a/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala
+++ b/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala
@@ -32,8 +32,9 @@ import org.apache.kudu.spark.kudu.SparkListenerUtil.withJobDescriptionCollector
import org.apache.kudu.spark.kudu.SparkListenerUtil.withJobTaskCounter
import org.apache.kudu.spark.kudu._
import org.apache.kudu.test.CapturingLogAppender
-import org.apache.kudu.test.KuduTestHarness.TabletServerConfig
+import org.apache.kudu.test.KuduTestHarness
import org.apache.kudu.test.RandomUtils
+import org.apache.kudu.test.KuduTestHarness.TabletServerConfig
import org.apache.kudu.util.DataGenerator.DataGeneratorBuilder
import org.apache.kudu.util.HybridTimeUtil
import org.apache.kudu.util.SchemaGenerator.SchemaGeneratorBuilder
@@ -591,6 +592,20 @@ class TestKuduBackup extends KuduTestSuite {
@Test
def testDeleteIgnore(): Unit = {
+ doDeleteIgnoreTest()
+ }
+
+ /**
+ * Identical to the above test, but exercising the old session based delete ignore operations,
+ * ensuring we functionally support the same semantics.
+ */
+ @Test
+ @KuduTestHarness.MasterServerConfig(flags = Array("--master_support_ignore_operations=false"))
+ def testLegacyDeleteIgnore(): Unit = {
+ doDeleteIgnoreTest()
+ }
+
+ def doDeleteIgnoreTest(): Unit = {
insertRows(table, 100) // Insert data into the default test table.
// Run and validate initial backup.