You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ad...@apache.org on 2019/01/24 01:11:58 UTC
[kudu] 01/05: [spark] Add some logging to trace KuduContext operations
This is an automated email from the ASF dual-hosted git repository.
adar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
commit bc2369ea50f5b4b4cc6371444871b44f04ed62d1
Author: Will Berkeley <wd...@gmail.org>
AuthorDate: Tue Jan 22 14:39:08 2019 -0800
[spark] Add some logging to trace KuduContext operations
This patch adds logging to help track how long an entire KuduContext
operation takes, and how long each part of it takes on each executor.
This information has been sorely lacking in cases where Spark's laziness
makes it very difficult to attribute slowness to Kudu (vs. other
components of the Spark job).
Unfortunately, it's not as straightforward to add this sort of logging
to the read path (KuduRDD): Spark may read batches from Kudu lazily, and
batches may be small enough that logging each one would be too verbose
to be useful.
I tested this patch manually on a 3-node cluster and confirmed that the
expected log messages appeared on the driver and on the executors, e.g.:
19/01/22 15:18:13 INFO kudu.KuduContext: applying operations of type 'insert' to table 'impala::default.aaa'
19/01/22 15:18:13 INFO kudu.KuduContext: applied 1 operations of type 'insert' to table 'impala::default.aaa'
Change-Id: I6741f2584c1bc6b229d10d37297515474318f94c
Reviewed-on: http://gerrit.cloudera.org:8080/12252
Reviewed-by: Grant Henke <gr...@apache.org>
Tested-by: Will Berkeley <wd...@gmail.com>
---
.../org/apache/kudu/spark/kudu/KuduContext.scala | 19 ++++++++++++++++---
.../org/apache/kudu/spark/kudu/OperationType.scala | 10 ++++++++++
2 files changed, 26 insertions(+), 3 deletions(-)
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
index d6295f9..461886c 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
@@ -62,6 +62,7 @@ import org.apache.kudu.Type
@InterfaceStability.Evolving
class KuduContext(val kuduMaster: String, sc: SparkContext, val socketReadTimeoutMs: Option[Long])
extends Serializable {
+ val log: Logger = LoggerFactory.getLogger(getClass)
def this(kuduMaster: String, sc: SparkContext) = this(kuduMaster, sc, None)
@@ -242,7 +243,9 @@ class KuduContext(val kuduMaster: String, sc: SparkContext, val socketReadTimeou
data: DataFrame,
tableName: String,
writeOptions: KuduWriteOptions = new KuduWriteOptions): Unit = {
+ log.info(s"inserting into table '$tableName'")
writeRows(data, tableName, Insert, writeOptions)
+ log.info(s"inserted ${numInserts.value} rows into table '$tableName'")
}
/**
@@ -261,7 +264,9 @@ class KuduContext(val kuduMaster: String, sc: SparkContext, val socketReadTimeou
"Use KuduContext.insertRows(data, tableName, new KuduWriteOptions(ignoreDuplicateRowErrors = true))")
def insertIgnoreRows(data: DataFrame, tableName: String): Unit = {
val writeOptions = KuduWriteOptions(ignoreDuplicateRowErrors = true)
+ log.info(s"inserting into table '$tableName'")
writeRows(data, tableName, Insert, writeOptions)
+ log.info(s"inserted ${numInserts.value} rows into table '$tableName'")
}
/**
@@ -275,7 +280,9 @@ class KuduContext(val kuduMaster: String, sc: SparkContext, val socketReadTimeou
data: DataFrame,
tableName: String,
writeOptions: KuduWriteOptions = new KuduWriteOptions): Unit = {
+ log.info(s"upserting into table '$tableName'")
writeRows(data, tableName, Upsert, writeOptions)
+ log.info(s"upserted ${numUpserts.value} rows into table '$tableName'")
}
/**
@@ -289,7 +296,9 @@ class KuduContext(val kuduMaster: String, sc: SparkContext, val socketReadTimeou
data: DataFrame,
tableName: String,
writeOptions: KuduWriteOptions = new KuduWriteOptions): Unit = {
+ log.info(s"updating rows in table '$tableName'")
writeRows(data, tableName, Update, writeOptions)
+ log.info(s"updated ${numUpdates.value} rows in table '$tableName'")
}
/**
@@ -304,7 +313,9 @@ class KuduContext(val kuduMaster: String, sc: SparkContext, val socketReadTimeou
data: DataFrame,
tableName: String,
writeOptions: KuduWriteOptions = new KuduWriteOptions): Unit = {
+ log.info(s"deleting rows from table '$tableName'")
writeRows(data, tableName, Delete, writeOptions)
+ log.info(s"deleted ${numDeletes.value} rows from table '$tableName'")
}
private[kudu] def writeRows(
@@ -337,7 +348,7 @@ class KuduContext(val kuduMaster: String, sc: SparkContext, val socketReadTimeou
rows: Iterator[InternalRow],
schema: StructType,
tableName: String,
- operationType: OperationType,
+ opType: OperationType,
lastPropagatedTimestamp: Long,
writeOptions: KuduWriteOptions): RowErrorsAndOverflowStatus = {
// Since each executor has its own KuduClient, update executor's propagated timestamp
@@ -353,10 +364,11 @@ class KuduContext(val kuduMaster: String, sc: SparkContext, val socketReadTimeou
session.setIgnoreAllDuplicateRows(writeOptions.ignoreDuplicateRowErrors)
val typeConverter = CatalystTypeConverters.createToScalaConverter(schema)
var numRows = 0
+ log.info(s"applying operations of type '${opType.toString}' to table '$tableName'")
try {
for (internalRow <- rows) {
val row = typeConverter(internalRow).asInstanceOf[Row]
- val operation = operationType.operation(table)
+ val operation = opType.operation(table)
for ((sparkIdx, kuduIdx) <- indices) {
if (row.isNullAt(sparkIdx)) {
if (table.getSchema.getColumnByIndex(kuduIdx).isKey) {
@@ -404,7 +416,8 @@ class KuduContext(val kuduMaster: String, sc: SparkContext, val socketReadTimeou
// Update timestampAccumulator with the client's last propagated
// timestamp on each executor.
timestampAccumulator.add(syncClient.getLastPropagatedTimestamp)
- addForOperation(numRows, operationType)
+ addForOperation(numRows, opType)
+ log.info(s"applied $numRows operations of type '${opType.toString()}' to table '$tableName'")
}
session.getPendingErrors
}
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/OperationType.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/OperationType.scala
index 143ea57..5312305 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/OperationType.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/OperationType.scala
@@ -25,16 +25,26 @@ import org.apache.kudu.client.Operation
*/
private[kudu] sealed trait OperationType {
def operation(table: KuduTable): Operation
+
+ def toString(): String
}
private[kudu] case object Insert extends OperationType {
override def operation(table: KuduTable): Operation = table.newInsert()
+
+ override def toString(): String = "insert"
}
private[kudu] case object Update extends OperationType {
override def operation(table: KuduTable): Operation = table.newUpdate()
+
+ override def toString(): String = "update"
}
private[kudu] case object Upsert extends OperationType {
override def operation(table: KuduTable): Operation = table.newUpsert()
+
+ override def toString(): String = "upsert"
}
private[kudu] case object Delete extends OperationType {
override def operation(table: KuduTable): Operation = table.newDelete()
+
+ override def toString(): String = "delete"
}