You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ad...@apache.org on 2019/01/24 01:11:58 UTC

[kudu] 01/05: [spark] Add some logging to trace KuduContext operations

This is an automated email from the ASF dual-hosted git repository.

adar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit bc2369ea50f5b4b4cc6371444871b44f04ed62d1
Author: Will Berkeley <wd...@gmail.org>
AuthorDate: Tue Jan 22 14:39:08 2019 -0800

    [spark] Add some logging to trace KuduContext operations
    
    This patch adds some logging to help track how long an entire
    KuduContext operation takes and also how long each part takes on each
    executor. This information has been sorely lacking in some cases where
    Spark's laziness makes it very difficult to attribute slowness to Kudu
    (vs. other components of the Spark job).
    
    Unfortunately, it's not as straightforward to add this sort of logging
    to the read path (KuduRDD) because Spark may read batches from Kudu
    lazily, and batches may be small enough that per-batch logging would be
    too verbose to be useful.
    
    I tested this patch manually on a 3-node cluster and confirmed I saw the
    expected log messages on the driver and on the executors, e.g.
    
    19/01/22 15:18:13 INFO kudu.KuduContext: applying operations of type 'insert' to table 'impala::default.aaa'
    19/01/22 15:18:13 INFO kudu.KuduContext: applied 1 operations of type 'insert' to table 'impala::default.aaa'
    
    Change-Id: I6741f2584c1bc6b229d10d37297515474318f94c
    Reviewed-on: http://gerrit.cloudera.org:8080/12252
    Reviewed-by: Grant Henke <gr...@apache.org>
    Tested-by: Will Berkeley <wd...@gmail.com>
---
 .../org/apache/kudu/spark/kudu/KuduContext.scala      | 19 ++++++++++++++++---
 .../org/apache/kudu/spark/kudu/OperationType.scala    | 10 ++++++++++
 2 files changed, 26 insertions(+), 3 deletions(-)

diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
index d6295f9..461886c 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
@@ -62,6 +62,7 @@ import org.apache.kudu.Type
 @InterfaceStability.Evolving
 class KuduContext(val kuduMaster: String, sc: SparkContext, val socketReadTimeoutMs: Option[Long])
     extends Serializable {
+  val log: Logger = LoggerFactory.getLogger(getClass)
 
   def this(kuduMaster: String, sc: SparkContext) = this(kuduMaster, sc, None)
 
@@ -242,7 +243,9 @@ class KuduContext(val kuduMaster: String, sc: SparkContext, val socketReadTimeou
       data: DataFrame,
       tableName: String,
       writeOptions: KuduWriteOptions = new KuduWriteOptions): Unit = {
+    log.info(s"inserting into table '$tableName'")
     writeRows(data, tableName, Insert, writeOptions)
+    log.info(s"inserted ${numInserts.value} rows into table '$tableName'")
   }
 
   /**
@@ -261,7 +264,9 @@ class KuduContext(val kuduMaster: String, sc: SparkContext, val socketReadTimeou
     "Use KuduContext.insertRows(data, tableName, new KuduWriteOptions(ignoreDuplicateRowErrors = true))")
   def insertIgnoreRows(data: DataFrame, tableName: String): Unit = {
     val writeOptions = KuduWriteOptions(ignoreDuplicateRowErrors = true)
+    log.info(s"inserting into table '$tableName'")
     writeRows(data, tableName, Insert, writeOptions)
+    log.info(s"inserted ${numInserts.value} rows into table '$tableName'")
   }
 
   /**
@@ -275,7 +280,9 @@ class KuduContext(val kuduMaster: String, sc: SparkContext, val socketReadTimeou
       data: DataFrame,
       tableName: String,
       writeOptions: KuduWriteOptions = new KuduWriteOptions): Unit = {
+    log.info(s"upserting into table '$tableName'")
     writeRows(data, tableName, Upsert, writeOptions)
+    log.info(s"upserted ${numUpserts.value} rows into table '$tableName'")
   }
 
   /**
@@ -289,7 +296,9 @@ class KuduContext(val kuduMaster: String, sc: SparkContext, val socketReadTimeou
       data: DataFrame,
       tableName: String,
       writeOptions: KuduWriteOptions = new KuduWriteOptions): Unit = {
+    log.info(s"updating rows in table '$tableName'")
     writeRows(data, tableName, Update, writeOptions)
+    log.info(s"updated ${numUpdates.value} rows in table '$tableName'")
   }
 
   /**
@@ -304,7 +313,9 @@ class KuduContext(val kuduMaster: String, sc: SparkContext, val socketReadTimeou
       data: DataFrame,
       tableName: String,
       writeOptions: KuduWriteOptions = new KuduWriteOptions): Unit = {
+    log.info(s"deleting rows from table '$tableName'")
     writeRows(data, tableName, Delete, writeOptions)
+    log.info(s"deleted ${numDeletes.value} rows from table '$tableName'")
   }
 
   private[kudu] def writeRows(
@@ -337,7 +348,7 @@ class KuduContext(val kuduMaster: String, sc: SparkContext, val socketReadTimeou
       rows: Iterator[InternalRow],
       schema: StructType,
       tableName: String,
-      operationType: OperationType,
+      opType: OperationType,
       lastPropagatedTimestamp: Long,
       writeOptions: KuduWriteOptions): RowErrorsAndOverflowStatus = {
     // Since each executor has its own KuduClient, update executor's propagated timestamp
@@ -353,10 +364,11 @@ class KuduContext(val kuduMaster: String, sc: SparkContext, val socketReadTimeou
     session.setIgnoreAllDuplicateRows(writeOptions.ignoreDuplicateRowErrors)
     val typeConverter = CatalystTypeConverters.createToScalaConverter(schema)
     var numRows = 0
+    log.info(s"applying operations of type '${opType.toString}' to table '$tableName'")
     try {
       for (internalRow <- rows) {
         val row = typeConverter(internalRow).asInstanceOf[Row]
-        val operation = operationType.operation(table)
+        val operation = opType.operation(table)
         for ((sparkIdx, kuduIdx) <- indices) {
           if (row.isNullAt(sparkIdx)) {
             if (table.getSchema.getColumnByIndex(kuduIdx).isKey) {
@@ -404,7 +416,8 @@ class KuduContext(val kuduMaster: String, sc: SparkContext, val socketReadTimeou
       // Update timestampAccumulator with the client's last propagated
       // timestamp on each executor.
       timestampAccumulator.add(syncClient.getLastPropagatedTimestamp)
-      addForOperation(numRows, operationType)
+      addForOperation(numRows, opType)
+      log.info(s"applied $numRows operations of type '${opType.toString()}' to table '$tableName'")
     }
     session.getPendingErrors
   }
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/OperationType.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/OperationType.scala
index 143ea57..5312305 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/OperationType.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/OperationType.scala
@@ -25,16 +25,26 @@ import org.apache.kudu.client.Operation
  */
 private[kudu] sealed trait OperationType {
   def operation(table: KuduTable): Operation
+
+  def toString(): String
 }
 private[kudu] case object Insert extends OperationType {
   override def operation(table: KuduTable): Operation = table.newInsert()
+
+  override def toString(): String = "insert"
 }
 private[kudu] case object Update extends OperationType {
   override def operation(table: KuduTable): Operation = table.newUpdate()
+
+  override def toString(): String = "update"
 }
 private[kudu] case object Upsert extends OperationType {
   override def operation(table: KuduTable): Operation = table.newUpsert()
+
+  override def toString(): String = "upsert"
 }
 private[kudu] case object Delete extends OperationType {
   override def operation(table: KuduTable): Operation = table.newDelete()
+
+  override def toString(): String = "delete"
 }