Posted to reviews@spark.apache.org by vackosar <gi...@git.apache.org> on 2018/08/19 18:03:50 UTC

[GitHub] spark pull request #22143: [SPARK-24647][SS] Report KafkaStreamWriter's writ...

GitHub user vackosar opened a pull request:

    https://github.com/apache/spark/pull/22143

    [SPARK-24647][SS] Report KafkaStreamWriter's written min and max offsets via CustomMetrics.
    
    ## What changes were proposed in this pull request?
    
    Report KafkaStreamWriter's written min and max offsets via CustomMetrics. This is important for data lineage projects like Spline. Related issue: https://issues.apache.org/jira/browse/SPARK-24647
    
    ## How was this patch tested?
    
    Unit tests.
    
    Please review http://spark.apache.org/contributing.html before opening a pull request.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/AbsaOSS/spark feature/SPARK-24647-kafka-writer-offsets

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/22143.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #22143
    
----
commit 767a0156a1acef515974d48bbcb6cfdb17f68d90
Author: Kosar, Vaclav: Functions Transformation <va...@...>
Date:   2018-08-17T13:31:57Z

    [SPARK-24647][SS] Report KafkaStreamWriter's written min and max offsets via CustomMetrics.

----


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22143: [SPARK-24647][SS] Report KafkaStreamWriter's written min...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22143
  
    Can one of the admins verify this patch?


---



[GitHub] spark pull request #22143: [SPARK-24647][SS] Report KafkaStreamWriter's writ...

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22143#discussion_r211336368
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala ---
    @@ -19,18 +19,23 @@ package org.apache.spark.sql.kafka010
     
     import scala.collection.JavaConverters._
     
    +import org.json4s.JsonDSL._
    +import org.json4s.jackson.JsonMethods._
    +
     import org.apache.spark.sql.catalyst.InternalRow
     import org.apache.spark.sql.catalyst.expressions.Attribute
     import org.apache.spark.sql.kafka010.KafkaWriter.validateQuery
    +import org.apache.spark.sql.sources.v2.CustomMetrics
     import org.apache.spark.sql.sources.v2.writer._
    -import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
    +import org.apache.spark.sql.sources.v2.writer.streaming.{StreamWriter, SupportsCustomWriterMetrics}
     import org.apache.spark.sql.types.StructType
     
     /**
      * Dummy commit message. The DataSourceV2 framework requires a commit message implementation but we
      * don't need to really send one.
      */
    -case object KafkaWriterCommitMessage extends WriterCommitMessage
    +case class KafkaWriterCommitMessage(minOffset: KafkaSourceOffset, maxOffset: KafkaSourceOffset)
    --- End diff --
    
    It's kind of odd that the writer commit message includes a source offset. IMO, it would be better to define a `KafkaSinkOffset` or, if it can be common, something like `KafkaOffsets`.
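
A minimal sketch of what such a neutral offsets type could look like. Everything here is an assumption for illustration: `KafkaOffsets` is the name floated in the comment above and is not an existing Spark class, and `TopicPartition` and `WriterCommitMessage` are local stand-ins for the Kafka and Spark types so that the snippet compiles on its own.

```scala
// Stand-in for org.apache.kafka.common.TopicPartition, defined locally
// only so this sketch runs without kafka-clients on the classpath.
final case class TopicPartition(topic: String, partition: Int)

// Stand-in for Spark's WriterCommitMessage marker interface.
trait WriterCommitMessage extends Serializable

// Hypothetical neutral type: one offset per topic partition, with no
// "source" or "sink" in its name so readers and writers could share it.
final case class KafkaOffsets(partitionToOffsets: Map[TopicPartition, Long])

// The commit message from the diff, rewritten against the neutral type.
final case class KafkaWriterCommitMessage(
    minOffset: KafkaOffsets,
    maxOffset: KafkaOffsets) extends WriterCommitMessage
```

Whether a shared type or a dedicated sink-side type is preferable depends on how much of the existing source offset behaviour the writer actually needs.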


---



[GitHub] spark issue #22143: [SPARK-24647][SS] Report KafkaStreamWriter's written min...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22143
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #22143: [SPARK-24647][SS] Report KafkaStreamWriter's written min...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22143
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94937/
    Test FAILed.


---



[GitHub] spark pull request #22143: [SPARK-24647][SS] Report KafkaStreamWriter's writ...

Posted by vackosar <gi...@git.apache.org>.
Github user vackosar commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22143#discussion_r211379697
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala ---
    @@ -19,18 +19,23 @@ package org.apache.spark.sql.kafka010
     
     import scala.collection.JavaConverters._
     
    +import org.json4s.JsonDSL._
    +import org.json4s.jackson.JsonMethods._
    +
     import org.apache.spark.sql.catalyst.InternalRow
     import org.apache.spark.sql.catalyst.expressions.Attribute
     import org.apache.spark.sql.kafka010.KafkaWriter.validateQuery
    +import org.apache.spark.sql.sources.v2.CustomMetrics
     import org.apache.spark.sql.sources.v2.writer._
    -import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
    +import org.apache.spark.sql.sources.v2.writer.streaming.{StreamWriter, SupportsCustomWriterMetrics}
     import org.apache.spark.sql.types.StructType
     
     /**
      * Dummy commit message. The DataSourceV2 framework requires a commit message implementation but we
      * don't need to really send one.
      */
    -case object KafkaWriterCommitMessage extends WriterCommitMessage
    +case class KafkaWriterCommitMessage(minOffset: KafkaSourceOffset, maxOffset: KafkaSourceOffset)
    --- End diff --
    
    I would have to rename the class itself to avoid adding a duplicate class. I would love to do that; I am just not sure whether it would be accepted.


---



[GitHub] spark issue #22143: [SPARK-24647][SS] Report KafkaStreamWriter's written min...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22143
  
    Can one of the admins verify this patch?


---



[GitHub] spark issue #22143: [SPARK-24647][SS] Report KafkaStreamWriter's written min...

Posted by vackosar <gi...@git.apache.org>.
Github user vackosar commented on the issue:

    https://github.com/apache/spark/pull/22143
  
    @arunmahadevan min and max are used because there can be other writers to the same topic running in a different job. The messages sent would then become interleaved, and one would have to return a large number of intervals to be accurate. This approach gives sufficient information about where the data ended up being written, while also being resilient and simple. Would you recommend adding this as a Javadoc?
    
    To explain the motivation, I updated the description of this PR using the description from the Jira. (To track data lineage, we need to know where data was read from and written to, at least approximately.)
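
The trade-off described above can be illustrated with a small sketch. It assumes a single partition whose offsets are shared with another writer; `OffsetRangeSketch`, `exactIntervals`, and `minMax` are hypothetical names used only for this illustration and do not appear in the PR.

```scala
object OffsetRangeSketch {
  // Exact description: collapse the written offsets into contiguous closed
  // intervals. With interleaved writers this list can grow very long.
  def exactIntervals(offsets: Seq[Long]): List[(Long, Long)] =
    offsets.sorted.foldLeft(List.empty[(Long, Long)]) {
      // Extend the most recent interval when the offset is adjacent to it.
      case ((lo, hi) :: rest, o) if o == hi + 1 => (lo, o) :: rest
      // Otherwise start a new single-offset interval.
      case (acc, o) => (o, o) :: acc
    }.reverse

  // Approximate description: a single (min, max) pair, whatever the gaps.
  def minMax(offsets: Seq[Long]): Option[(Long, Long)] =
    if (offsets.isEmpty) None else Some((offsets.min, offsets.max))
}
```

If this job wrote offsets 10, 12, and 15 while another job took 11, 13, and 14, `exactIntervals` returns three single-offset intervals, whereas `minMax` returns the single pair (10, 15), which also covers the other job's records.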


---



[GitHub] spark issue #22143: [SPARK-24647][SS] Report KafkaStreamWriter's written min...

Posted by koeninger <gi...@git.apache.org>.
Github user koeninger commented on the issue:

    https://github.com/apache/spark/pull/22143
  
    Jenkins, ok to test


---



[GitHub] spark issue #22143: [SPARK-24647][SS] Report KafkaStreamWriter's written min...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22143
  
    **[Test build #94937 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94937/testReport)** for PR 22143 at commit [`767a015`](https://github.com/apache/spark/commit/767a0156a1acef515974d48bbcb6cfdb17f68d90).


---



[GitHub] spark pull request #22143: [SPARK-24647][SS] Report KafkaStreamWriter's writ...

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22143#discussion_r211339535
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala ---
    @@ -61,12 +63,30 @@ private[kafka010] class KafkaWriteTask(
     private[kafka010] abstract class KafkaRowWriter(
         inputSchema: Seq[Attribute], topic: Option[String]) {
     
    +  import scala.collection.JavaConverters._
    +
    +  protected val minOffsetAccumulator: collection.concurrent.Map[TopicPartition, Long] =
    +    new ConcurrentHashMap[TopicPartition, Long]().asScala
    --- End diff --
    
    Why is this a concurrent map?


---



[GitHub] spark issue #22143: [SPARK-24647][SS] Report KafkaStreamWriter's written min...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22143
  
    Can one of the admins verify this patch?


---



[GitHub] spark pull request #22143: [SPARK-24647][SS] Report KafkaStreamWriter's writ...

Posted by vackosar <gi...@git.apache.org>.
Github user vackosar commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22143#discussion_r211381706
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala ---
    @@ -116,3 +133,66 @@ class KafkaStreamDataWriter(
         }
       }
     }
    +
    +private[kafka010] case class KafkaWriterCustomMetrics(
    +    minOffset: KafkaSourceOffset,
    +    maxOffset: KafkaSourceOffset) extends CustomMetrics {
    +  override def json(): String = {
    +    val jsonVal = ("minOffset" -> parse(minOffset.json)) ~
    +      ("maxOffset" -> parse(maxOffset.json))
    +    compact(render(jsonVal))
    +  }
    +
    +  override def toString: String = json()
    +}
    +
    +private[kafka010] object KafkaWriterCustomMetrics {
    +
    +  import Math.{min, max}
    +
    +  def apply(messages: Array[WriterCommitMessage]): KafkaWriterCustomMetrics = {
    +    val minMax = collate(messages)
    +    KafkaWriterCustomMetrics(minMax._1, minMax._2)
    +  }
    +
    +  private def collate(messages: Array[WriterCommitMessage]):
    --- End diff --
    
    Thanks, I will rename to something with minMax.


---



[GitHub] spark issue #22143: [SPARK-24647][SS] Report KafkaStreamWriter's written min...

Posted by vackosar <gi...@git.apache.org>.
Github user vackosar commented on the issue:

    https://github.com/apache/spark/pull/22143
  
    @cloud-fan are you OK with merging the PR?


---



[GitHub] spark issue #22143: [SPARK-24647][SS] Report KafkaStreamWriter's written min...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22143
  
    **[Test build #94941 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94941/testReport)** for PR 22143 at commit [`c812eff`](https://github.com/apache/spark/commit/c812effdf54f3ce371fde561c26e2485ce996b64).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `case class KafkaWriterCommitMessage(minOffset: KafkaSourceOffset, maxOffset: KafkaSourceOffset)`


---



[GitHub] spark issue #22143: [SPARK-24647][SS] Report KafkaStreamWriter's written min...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22143
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94941/
    Test PASSed.


---



[GitHub] spark pull request #22143: [SPARK-24647][SS] Report KafkaStreamWriter's writ...

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22143#discussion_r211336988
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala ---
    @@ -116,3 +133,66 @@ class KafkaStreamDataWriter(
         }
       }
     }
    +
    +private[kafka010] case class KafkaWriterCustomMetrics(
    +    minOffset: KafkaSourceOffset,
    +    maxOffset: KafkaSourceOffset) extends CustomMetrics {
    +  override def json(): String = {
    +    val jsonVal = ("minOffset" -> parse(minOffset.json)) ~
    +      ("maxOffset" -> parse(maxOffset.json))
    +    compact(render(jsonVal))
    +  }
    +
    +  override def toString: String = json()
    +}
    +
    +private[kafka010] object KafkaWriterCustomMetrics {
    +
    +  import Math.{min, max}
    +
    +  def apply(messages: Array[WriterCommitMessage]): KafkaWriterCustomMetrics = {
    +    val minMax = collate(messages)
    +    KafkaWriterCustomMetrics(minMax._1, minMax._2)
    +  }
    +
    +  private def collate(messages: Array[WriterCommitMessage]):
    --- End diff --
    
    It would be good to leave a comment on what this does. It seems to be computing the min/max offset per partition? If so, choosing an apt name for that function would make it clearer.
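
A sketch of how a documented, more descriptively named helper might read, assuming the function does merge per-task offsets into a per-partition minimum and maximum. The body of `collate` is not shown in the diff, so `TaskOffsets`, `MinMaxOffsets`, and `minMaxOffsetsPerPartition` are hypothetical, and `TopicPartition` is a local stand-in for the Kafka class.

```scala
// Stand-in for org.apache.kafka.common.TopicPartition (assumption, so the
// sketch is self-contained).
final case class TopicPartition(topic: String, partition: Int)

// Hypothetical per-task result: lowest and highest offset written to each
// partition by one writer task.
final case class TaskOffsets(
    min: Map[TopicPartition, Long],
    max: Map[TopicPartition, Long])

object MinMaxOffsets {
  /**
   * Merges the offsets reported by all tasks into a pair of maps:
   * the smallest and the largest offset written per topic partition.
   */
  def minMaxOffsetsPerPartition(
      messages: Seq[TaskOffsets]): (Map[TopicPartition, Long], Map[TopicPartition, Long]) = {
    // Group entries from all tasks by partition and reduce them with `pick`.
    def mergeWith(
        maps: Seq[Map[TopicPartition, Long]],
        pick: (Long, Long) => Long): Map[TopicPartition, Long] =
      maps.flatten.groupBy(_._1).map { case (tp, entries) =>
        tp -> entries.map(_._2).reduce(pick)
      }

    (mergeWith(messages.map(_.min), (a, b) => math.min(a, b)),
      mergeWith(messages.map(_.max), (a, b) => math.max(a, b)))
  }
}
```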


---



[GitHub] spark pull request #22143: [SPARK-24647][SS] Report KafkaStreamWriter's writ...

Posted by vackosar <gi...@git.apache.org>.
Github user vackosar commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22143#discussion_r211379432
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala ---
    @@ -61,12 +63,30 @@ private[kafka010] class KafkaWriteTask(
     private[kafka010] abstract class KafkaRowWriter(
         inputSchema: Seq[Attribute], topic: Option[String]) {
     
    +  import scala.collection.JavaConverters._
    +
    +  protected val minOffsetAccumulator: collection.concurrent.Map[TopicPartition, Long] =
    +    new ConcurrentHashMap[TopicPartition, Long]().asScala
    --- End diff --
    
    This map is accessed concurrently in callbacks for different partitions. This can be seen from the call hierarchy and the docs of Kafka's send method.
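
A minimal sketch of why a concurrent map helps here, assuming (as stated above) that send callbacks may update the accumulators from more than one thread. `OffsetTracker` and `record` are hypothetical names, and `TopicPartition` is a local stand-in for the Kafka class. The sketch relies on `ConcurrentHashMap.merge`, which applies the remapping function atomically per key.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.function.BiFunction

// Stand-in for org.apache.kafka.common.TopicPartition (assumption).
final case class TopicPartition(topic: String, partition: Int)

// Hypothetical accumulator that a send callback could call on completion.
class OffsetTracker {
  private type JLong = java.lang.Long

  private val minOffsets = new ConcurrentHashMap[TopicPartition, JLong]()
  private val maxOffsets = new ConcurrentHashMap[TopicPartition, JLong]()

  private val keepMin = new BiFunction[JLong, JLong, JLong] {
    override def apply(a: JLong, b: JLong): JLong =
      java.lang.Long.valueOf(math.min(a.longValue, b.longValue))
  }
  private val keepMax = new BiFunction[JLong, JLong, JLong] {
    override def apply(a: JLong, b: JLong): JLong =
      java.lang.Long.valueOf(math.max(a.longValue, b.longValue))
  }

  // merge() is atomic per key, so concurrent callbacks cannot lose updates.
  def record(tp: TopicPartition, offset: Long): Unit = {
    minOffsets.merge(tp, java.lang.Long.valueOf(offset), keepMin)
    maxOffsets.merge(tp, java.lang.Long.valueOf(offset), keepMax)
  }

  def min(tp: TopicPartition): Option[Long] = Option(minOffsets.get(tp)).map(_.longValue)
  def max(tp: TopicPartition): Option[Long] = Option(maxOffsets.get(tp)).map(_.longValue)
}
```

With a plain mutable map, two callbacks that read and then write the same key could overwrite each other's result; the atomic `merge` avoids that without an explicit lock.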


---



[GitHub] spark issue #22143: [SPARK-24647][SS] Report KafkaStreamWriter's written min...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22143
  
    **[Test build #94937 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94937/testReport)** for PR 22143 at commit [`767a015`](https://github.com/apache/spark/commit/767a0156a1acef515974d48bbcb6cfdb17f68d90).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `case class KafkaWriterCommitMessage(minOffset: KafkaSourceOffset, maxOffset: KafkaSourceOffset)`


---



[GitHub] spark issue #22143: [SPARK-24647][SS] Report KafkaStreamWriter's written min...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22143
  
    **[Test build #94941 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94941/testReport)** for PR 22143 at commit [`c812eff`](https://github.com/apache/spark/commit/c812effdf54f3ce371fde561c26e2485ce996b64).


---



[GitHub] spark issue #22143: [SPARK-24647][SS] Report KafkaStreamWriter's written min...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22143
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #22143: [SPARK-24647][SS] Report KafkaStreamWriter's written min...

Posted by vackosar <gi...@git.apache.org>.
Github user vackosar commented on the issue:

    https://github.com/apache/spark/pull/22143
  
    @arunmahadevan @jose-torres @cloud-fan you may be interested in this one.


---



[GitHub] spark issue #22143: [SPARK-24647][SS] Report KafkaStreamWriter's written min...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22143
  
    Can one of the admins verify this patch?


---
