Posted to reviews@spark.apache.org by dongjinleekr <gi...@git.apache.org> on 2018/08/30 12:28:21 UTC

[GitHub] spark pull request #22282: [SPARK-23539][SS] Add support for Kafka headers i...

GitHub user dongjinleekr opened a pull request:

    https://github.com/apache/spark/pull/22282

    [SPARK-23539][SS] Add support for Kafka headers in Structured Streaming

    ## What changes were proposed in this pull request?
    
    This update adds support for Kafka record headers (introduced in Kafka 0.11) to Structured Streaming, on both the read path (source/relation) and the write path (sink).
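A minimal plain-Scala sketch (no Spark or Kafka dependency) of one consequence of exposing headers as a map from string keys to binary values, the `MapType(StringType, BinaryType)` shape checked in the writer diff further down this thread. It assumes headers arrive as an ordered list of (key, bytes) pairs, as Kafka's `Headers` does; Kafka allows the same key more than once, while a map keeps only one value per key. `HeaderShape`, `asMap` and `droppedEntries` are hypothetical names for illustration only.

```scala
object HeaderShape {
  // Hypothetical stand-in for a Kafka record's headers: an ordered list of
  // (key, value) pairs in which the same key may appear more than once.
  type RawHeaders = Seq[(String, Array[Byte])]

  // Collapse the list into a map, as a map<string, binary> column would.
  // Scala's toMap keeps the last value seen for a duplicated key.
  def asMap(headers: RawHeaders): Map[String, Array[Byte]] = headers.toMap

  // Number of header entries lost by the conversion (duplicates only).
  def droppedEntries(headers: RawHeaders): Int = headers.size - asMap(headers).size
}
```

Whether collapsing duplicate keys is acceptable is a design choice; a column holding an array of (key, value) pairs would preserve them.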
    
    ## How was this patch tested?
    
    With the following unit tests:
    
    - KafkaRelationSuite: "default starting and ending offsets with headers" (new)
    - KafkaSinkSuite: "batch - write to kafka" (updated)


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/dongjinleekr/spark feature/SPARK-23539

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/22282.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #22282
    
----
commit ddd08612ef8bdb173f974059d2dc6311e1c7d9a3
Author: Lee Dongjin <do...@...>
Date:   2018-08-26T13:19:52Z

    Remove unused methods from UnsafeArrayData

commit 2af13899ab052cc7b52c25b57f154b78a2c45b2a
Author: Lee Dongjin <do...@...>
Date:   2018-08-26T13:25:25Z

    Implement UnsafeArrayData#fromBinaryArray

commit a8e5c5c0f478a795af1236771236da2074093f3e
Author: Lee Dongjin <do...@...>
Date:   2018-08-27T12:22:18Z

    Implement UnsafeArrayData#fromStringArray

commit 2ca181046cf1102aed14f4957e11e4dd901ba3c7
Author: Lee Dongjin <do...@...>
Date:   2018-08-27T13:28:58Z

    Implement UnsafeMapData#of

commit d0d746d99d0a19ecbb2dc098589adbfd1ef0b5ae
Author: Lee Dongjin <do...@...>
Date:   2018-08-29T13:25:57Z

    Allow empty UnsafeArrayData: no longer throws IllegalArgumentException on an empty or null array argument.

commit b459cf3f391d6e4ee9cb77a7b5ed510d027d9ddd
Author: Lee Dongjin <do...@...>
Date:   2018-08-28T07:50:59Z

    Fix invalid formatting: UnsafeArraySuite

commit f077c5d75a83df3541a95a628726e1d74af8c153
Author: Lee Dongjin <do...@...>
Date:   2018-08-28T07:51:30Z

    Implement Kafka headers functionality

commit 6b4d7754d01e4211d05c83746c848fdfd873f229
Author: Lee Dongjin <do...@...>
Date:   2018-08-29T09:39:55Z

    Add KafkaTestUtils#{sendMessage, sendMessages(String, Array[(String, String, Array[(String, String)])], Option[Int])}

commit c7fb9819989056da4910e2cfc81af332cd603d41
Author: Lee Dongjin <do...@...>
Date:   2018-08-29T13:24:09Z

    Extend KafkaRelationSuite, KafkaSinkSuite to test headers functionality

commit dd2d9390478e4c69b01a7c699e28bfe923ef0db1
Author: Lee Dongjin <do...@...>
Date:   2018-08-30T10:50:38Z

    Minor refinements

commit 229aac85442b03736fc850cae2c3b26becaedade
Author: Lee Dongjin <do...@...>
Date:   2018-08-30T12:21:30Z

    Specify #selectExpr on KafkaSourceSuiteBase's 'Kafka column types' test

----


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22282: [SPARK-23539][SS] Add support for Kafka headers in Struc...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22282
  
    **[Test build #95468 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95468/testReport)** for PR 22282 at commit [`229aac8`](https://github.com/apache/spark/commit/229aac85442b03736fc850cae2c3b26becaedade).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #22282: [SPARK-23539][SS] Add support for Kafka headers in Struc...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22282
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/95731/
    Test FAILed.


---



[GitHub] spark issue #22282: [SPARK-23539][SS] Add support for Kafka headers in Struc...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22282
  
    **[Test build #95731 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95731/testReport)** for PR 22282 at commit [`b0f64e9`](https://github.com/apache/spark/commit/b0f64e91cb4f6306a7c0c60d4a17f1a0aacb3a51).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #22282: [SPARK-23539][SS] Add support for Kafka headers in Struc...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22282
  
    **[Test build #95779 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95779/testReport)** for PR 22282 at commit [`220bd0a`](https://github.com/apache/spark/commit/220bd0a90b0c606b8f74c227218bec7bb6614782).
     * This patch **fails due to an unknown error code, -9**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #22282: [SPARK-23539][SS] Add support for Kafka headers in Struc...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22282
  
    **[Test build #98726 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/98726/testReport)** for PR 22282 at commit [`2c48aae`](https://github.com/apache/spark/commit/2c48aae653551c4c505fe90eef4b1e260db4fcb7).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #22282: [SPARK-23539][SS] Add support for Kafka headers i...

Posted by xuanyuanking <gi...@git.apache.org>.
Github user xuanyuanking commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22282#discussion_r214084903
  
    --- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java ---
    @@ -473,17 +474,6 @@ public static UnsafeArrayData fromPrimitiveArray(
         return result;
       }
     
    -  public static UnsafeArrayData forPrimitiveArray(int offset, int length, int elementSize) {
    -    return fromPrimitiveArray(null, offset, length, elementSize);
    -  }
    -
    -  public static boolean shouldUseGenericArrayData(int elementSize, int length) {
    --- End diff --
    
    Yep, the failed unit-test log confirms this: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95468/testReport/org.apache.spark.sql.catalyst.expressions/CollectionExpressionsSuite/Array_Union/


---



[GitHub] spark issue #22282: [SPARK-23539][SS] Add support for Kafka headers in Struc...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22282
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/95740/
    Test FAILed.


---



[GitHub] spark pull request #22282: [SPARK-23539][SS] Add support for Kafka headers i...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22282#discussion_r215092933
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala ---
    @@ -88,7 +92,30 @@ private[kafka010] abstract class KafkaRowWriter(
           throw new NullPointerException(s"null topic present in the data. Use the " +
             s"${KafkaSourceProvider.TOPIC_OPTION_KEY} option for setting a default topic.")
         }
    -    val record = new ProducerRecord[Array[Byte], Array[Byte]](topic.toString, key, value)
    +    val record = if (projectedRow.isNullAt(3)) {
    +      new ProducerRecord[Array[Byte], Array[Byte]](
    +        topic.toString,
    +        null,
    +        key,
    +        value
    +      )
    +    } else {
    +      val headerMap = projectedRow.getMap(3)
    +      val headers = (0 until headerMap.numElements()).toArray.map(
    +        i =>
    +          new RecordHeader(
    --- End diff --
    
    Yeah, then I also think it is a missing spot in Kafka. I just asked about it on the Kafka dev mailing list.
    https://lists.apache.org/thread.html/2ec3e7e2345e64ac559d98aaa28e0980f07a9778db447168e19d41d2@%3Cdev.kafka.apache.org%3E
    
    If the Kafka community agrees it's a missing spot, either of us can go ahead and fix it. You can take it forward if you're happy to do so.
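A plain-Scala sketch of the branching in the `KafkaRowWriter` diff above: a null headers column yields a record with no headers, otherwise each map entry becomes one header. `OutRecord` is a hypothetical stand-in for Kafka's `ProducerRecord`, and `Option` stands in for the `projectedRow.isNullAt(3)` check; this is an illustration under those assumptions, not the PR's code.

```scala
object WriteSketch {
  // Hypothetical stand-in for ProducerRecord; headers kept as (key, value) pairs.
  final case class OutRecord(
      topic: String,
      key: Array[Byte],
      value: Array[Byte],
      headers: Seq[(String, Array[Byte])])

  // None models a null headers column; Some(map) models a non-null map value.
  def toRecord(
      topic: String,
      key: Array[Byte],
      value: Array[Byte],
      headerMap: Option[Map[String, Array[Byte]]]): OutRecord = {
    val headers: Seq[(String, Array[Byte])] = headerMap match {
      case None    => Seq.empty      // null column: send the record without headers
      case Some(m) => m.toSeq        // one header per map entry
    }
    OutRecord(topic, key, value, headers)
  }
}
```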


---



[GitHub] spark issue #22282: [SPARK-23539][SS] Add support for Kafka headers in Struc...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22282
  
    Merged build finished. Test FAILed.


---



[GitHub] spark pull request #22282: [SPARK-23539][SS] Add support for Kafka headers i...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22282#discussion_r214622600
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala ---
    @@ -115,7 +116,12 @@ private[kafka010] class KafkaRelation(
             cr.partition,
             cr.offset,
             DateTimeUtils.fromJavaTimestamp(new java.sql.Timestamp(cr.timestamp)),
    -        cr.timestampType.id)
    +        cr.timestampType.id,
    +        UnsafeMapData.of(
    +          UnsafeArrayData.fromStringArray(cr.headers().toArray.map(_.key())),
    --- End diff --
    
    Same here.
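The read-path diff builds the map column from two parallel arrays, one of header keys and one of header values, which matches how Spark's `UnsafeMapData` stores a map (a key array plus a value array). A minimal plain-Scala sketch of that split, assuming headers are given as (key, bytes) pairs; `HeaderArrays` and `split` are hypothetical names, and the sketch does not call the `fromStringArray` or `UnsafeMapData.of` helpers added by this PR.

```scala
object HeaderArrays {
  // Split (key, value) pairs into two arrays of equal length, preserving order,
  // so that index i of the keys array pairs with index i of the values array.
  def split(headers: Seq[(String, Array[Byte])]): (Array[String], Array[Array[Byte]]) = {
    val (keys, values) = headers.unzip
    (keys.toArray, values.toArray)
  }
}
```

Note that the split does not deduplicate keys: a record carrying the same header key twice puts both entries in the key array.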


---



[GitHub] spark issue #22282: [SPARK-23539][SS] Add support for Kafka headers in Struc...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22282
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #22282: [SPARK-23539][SS] Add support for Kafka headers in Struc...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22282
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #22282: [SPARK-23539][SS] Add support for Kafka headers in Struc...

Posted by dongjinleekr <gi...@git.apache.org>.
Github user dongjinleekr commented on the issue:

    https://github.com/apache/spark/pull/22282
  
    cc @zsxwing @tdas @dongjoon-hyun @srowen Rebased onto the latest master. Please take a look when you have time. Thanks in advance.


---



[GitHub] spark issue #22282: [SPARK-23539][SS] Add support for Kafka headers in Struc...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22282
  
    **[Test build #95543 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95543/testReport)** for PR 22282 at commit [`8a5ef14`](https://github.com/apache/spark/commit/8a5ef14bacb437bb2b1d3143892d37dac7837760).


---



[GitHub] spark pull request #22282: [SPARK-23539][SS] Add support for Kafka headers i...

Posted by dongjinleekr <gi...@git.apache.org>.
Github user dongjinleekr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22282#discussion_r214345393
  
    --- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java ---
    @@ -473,17 +474,6 @@ public static UnsafeArrayData fromPrimitiveArray(
         return result;
       }
     
    -  public static UnsafeArrayData forPrimitiveArray(int offset, int length, int elementSize) {
    -    return fromPrimitiveArray(null, offset, length, elementSize);
    -  }
    -
    -  public static boolean shouldUseGenericArrayData(int elementSize, int length) {
    --- End diff --
    
    Thank you for your kind guidance. I dropped the commit that removed those methods; it was totally wrong! :)


---



[GitHub] spark pull request #22282: [SPARK-23539][SS] Add support for Kafka headers i...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22282#discussion_r214640616
  
    --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala ---
    @@ -59,14 +59,23 @@ class KafkaSinkSuite extends StreamTest with SharedSQLContext with KafkaTest {
         val topic = newTopic()
         testUtils.createTopic(topic)
         val df = Seq("1", "2", "3", "4", "5").map(v => (topic, v)).toDF("topic", "value")
    +      .withColumn("headers",
    +        map(lit("x"), col("value").plus(1).cast(IntegerType).cast(StringType).cast(BinaryType),
    +          lit("y"), col("value").multiply(2).cast(IntegerType).cast(StringType).cast(BinaryType)))
         df.write
           .format("kafka")
           .option("kafka.bootstrap.servers", testUtils.brokerAddress)
           .option("topic", topic)
           .save()
         checkAnswer(
    -      createKafkaReader(topic).selectExpr("CAST(value as STRING) value"),
    -      Row("1") :: Row("2") :: Row("3") :: Row("4") :: Row("5") :: Nil)
    +      createKafkaReader(topic).selectExpr(
    +        "CAST(value as STRING) value",
    +        "CAST(headers.x AS STRING)",
    +        "CAST(headers.y AS STRING)"
    +      ),
    +      Row("1", "2", "2") :: Row("2", "3", "4") :: Row("3", "4", "6") :: Row("4", "5", "8"
    --- End diff --
    
    nit: `) ::` could go at the end of this line instead of at the start of the next one.
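The expected rows follow from the `headers` column defined in the test: header `x` is `value + 1` and header `y` is `value * 2`, each cast through `IntegerType` and `StringType` to bytes. A plain-Scala sketch that derives the same (value, x, y) triples, assuming the five input values `"1"` to `"5"`; `ExpectedRows` is a hypothetical helper, not part of `KafkaSinkSuite`.

```scala
object ExpectedRows {
  // For each input value v, return (value, headers.x, headers.y) as strings,
  // mirroring value + 1 and value * 2 from the test's withColumn call.
  def expectedRows(values: Seq[String]): Seq[(String, String, String)] =
    values.map { v =>
      val n = v.toInt
      (v, (n + 1).toString, (n * 2).toString)
    }
}
```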


---



[GitHub] spark issue #22282: [SPARK-23539][SS] Add support for Kafka headers in Struc...

Posted by dongjinleekr <gi...@git.apache.org>.
Github user dongjinleekr commented on the issue:

    https://github.com/apache/spark/pull/22282
  
    retest this please.


---



[GitHub] spark issue #22282: [SPARK-23539][SS] Add support for Kafka headers in Struc...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22282
  
    **[Test build #95543 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95543/testReport)** for PR 22282 at commit [`8a5ef14`](https://github.com/apache/spark/commit/8a5ef14bacb437bb2b1d3143892d37dac7837760).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #22282: [SPARK-23539][SS] Add support for Kafka headers in Struc...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22282
  
    **[Test build #95731 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95731/testReport)** for PR 22282 at commit [`b0f64e9`](https://github.com/apache/spark/commit/b0f64e91cb4f6306a7c0c60d4a17f1a0aacb3a51).


---



[GitHub] spark pull request #22282: [SPARK-23539][SS] Add support for Kafka headers i...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22282#discussion_r214632570
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala ---
    @@ -88,7 +92,30 @@ private[kafka010] abstract class KafkaRowWriter(
           throw new NullPointerException(s"null topic present in the data. Use the " +
             s"${KafkaSourceProvider.TOPIC_OPTION_KEY} option for setting a default topic.")
         }
    -    val record = new ProducerRecord[Array[Byte], Array[Byte]](topic.toString, key, value)
    +    val record = if (projectedRow.isNullAt(3)) {
    +      new ProducerRecord[Array[Byte], Array[Byte]](
    +        topic.toString,
    +        null,
    +        key,
    +        value
    +      )
    +    } else {
    +      val headerMap = projectedRow.getMap(3)
    +      val headers = (0 until headerMap.numElements()).toArray.map(
    +        i =>
    +          new RecordHeader(
    --- End diff --
    
    It looks like `RecordHeader` is in the `org.apache.kafka.common.header.internals` package. Are there any alternative public methods or classes for creating a `Header`? Or is it just a missing spot in Kafka?
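One possible alternative to the internal `RecordHeader` class, sketched under assumptions: Kafka's public `org.apache.kafka.common.header.Header` interface declares `key()` returning a `String` and `value()` returning a `byte[]`, so a caller could supply its own implementation. To keep the example self-contained, the trait below is a local mirror of that interface rather than the real one; `HeaderImpl`, `SimpleHeader` and `fromMap` are hypothetical names, and whether this is preferable to `RecordHeader` is exactly the open question in this thread.

```scala
object HeaderImpl {
  // Local mirror of Kafka's public Header interface (key and value accessors).
  // In real code one would extend org.apache.kafka.common.header.Header instead.
  trait Header {
    def key(): String
    def value(): Array[Byte]
  }

  // Minimal immutable implementation; rejects null keys, passes values through.
  final class SimpleHeader(k: String, v: Array[Byte]) extends Header {
    require(k != null, "header key must not be null")
    override def key(): String = k
    override def value(): Array[Byte] = v
  }

  // Build one header per map entry, as the writer diff does with RecordHeader.
  def fromMap(m: Map[String, Array[Byte]]): Seq[Header] =
    m.toSeq.map { case (k, v) => new SimpleHeader(k, v) }
}
```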


---



[GitHub] spark issue #22282: [SPARK-23539][SS] Add support for Kafka headers in Struc...

Posted by dongjinleekr <gi...@git.apache.org>.
Github user dongjinleekr commented on the issue:

    https://github.com/apache/spark/pull/22282
  
    Retest this please.


---



[GitHub] spark issue #22282: [SPARK-23539][SS] Add support for Kafka headers in Struc...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22282
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/95468/
    Test FAILed.


---



[GitHub] spark issue #22282: [SPARK-23539][SS] Add support for Kafka headers in Struc...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22282
  
    **[Test build #95672 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95672/testReport)** for PR 22282 at commit [`253a894`](https://github.com/apache/spark/commit/253a894bedbd3a9642e529ad937dcb99dae346c7).


---



[GitHub] spark issue #22282: [SPARK-23539][SS] Add support for Kafka headers in Struc...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22282
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/95670/
    Test PASSed.


---



[GitHub] spark issue #22282: [SPARK-23539][SS] Add support for Kafka headers in Struc...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22282
  
    **[Test build #95670 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95670/testReport)** for PR 22282 at commit [`2254009`](https://github.com/apache/spark/commit/22540092f7a786585afeb5a861b1c329722e3d0b).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #22282: [SPARK-23539][SS] Add support for Kafka headers in Struc...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22282
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/98726/
    Test PASSed.


---



[GitHub] spark issue #22282: [SPARK-23539][SS] Add support for Kafka headers in Struc...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22282
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/95906/
    Test PASSed.


---



[GitHub] spark pull request #22282: [SPARK-23539][SS] Add support for Kafka headers i...

Posted by xuanyuanking <gi...@git.apache.org>.
Github user xuanyuanking commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22282#discussion_r214075761
  
    --- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java ---
    @@ -473,17 +474,6 @@ public static UnsafeArrayData fromPrimitiveArray(
         return result;
       }
     
    -  public static UnsafeArrayData forPrimitiveArray(int offset, int length, int elementSize) {
    -    return fromPrimitiveArray(null, offset, length, elementSize);
    -  }
    -
    -  public static boolean shouldUseGenericArrayData(int elementSize, int length) {
    --- End diff --
    
    I think `shouldUseGenericArrayData` is still used in generated code; see the code here:
    https://github.com/apache/spark/blob/b459cf3f391d6e4ee9cb77a7b5ed510d027d9ddd/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala#L3633
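For context on why removing the method broke `Array_Union`: generated code calls this check to decide whether an array result can use the compact `UnsafeArrayData` layout or must fall back to `GenericArrayData`. The sketch below restates the size arithmetic assuming the layout is an 8-byte element count, a null bitset padded to whole 8-byte words, then fixed-width values; the exact formula and the `Int.MaxValue / 8` word limit are assumptions to check against the Spark source, and `ArraySizeCheck` is a hypothetical name.

```scala
object ArraySizeCheck {
  // Assumed header size: 8 bytes for the element count plus one null bit per
  // element, rounded up to whole 8-byte words.
  def headerBytes(numElements: Long): Long = 8L + ((numElements + 63) / 64) * 8L

  // True when the total size, rounded up to 8-byte words, would exceed the
  // assumed limit, i.e. when the caller should fall back to a generic array.
  def shouldUseGeneric(elementSize: Int, length: Int): Boolean = {
    val valueBytes = elementSize.toLong * length
    val totalWords = (headerBytes(length) + valueBytes + 7) / 8
    totalWords > Int.MaxValue / 8
  }
}
```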


---



[GitHub] spark issue #22282: [SPARK-23539][SS] Add support for Kafka headers in Struc...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22282
  
    **[Test build #95468 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95468/testReport)** for PR 22282 at commit [`229aac8`](https://github.com/apache/spark/commit/229aac85442b03736fc850cae2c3b26becaedade).


---



[GitHub] spark issue #22282: [SPARK-23539][SS] Add support for Kafka headers in Struc...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22282
  
    **[Test build #98726 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/98726/testReport)** for PR 22282 at commit [`2c48aae`](https://github.com/apache/spark/commit/2c48aae653551c4c505fe90eef4b1e260db4fcb7).


---



[GitHub] spark issue #22282: [SPARK-23539][SS] Add support for Kafka headers in Struc...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22282
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/95672/
    Test PASSed.


---



[GitHub] spark pull request #22282: [SPARK-23539][SS] Add support for Kafka headers i...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22282#discussion_r214622654
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala ---
    @@ -293,7 +294,12 @@ private[kafka010] class KafkaSource(
             cr.partition,
             cr.offset,
             DateTimeUtils.fromJavaTimestamp(new java.sql.Timestamp(cr.timestamp)),
    -        cr.timestampType.id)
    +        cr.timestampType.id,
    +        UnsafeMapData.of(
    +          UnsafeArrayData.fromStringArray(cr.headers().toArray.map(_.key())),
    --- End diff --
    
    Same here.


---



[GitHub] spark issue #22282: [SPARK-23539][SS] Add support for Kafka headers in Struc...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22282
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/95779/
    Test FAILed.


---



[GitHub] spark issue #22282: [SPARK-23539][SS] Add support for Kafka headers in Struc...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22282
  
    **[Test build #95779 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95779/testReport)** for PR 22282 at commit [`220bd0a`](https://github.com/apache/spark/commit/220bd0a90b0c606b8f74c227218bec7bb6614782).


---



[GitHub] spark issue #22282: [SPARK-23539][SS] Add support for Kafka headers in Struc...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22282
  
    Merged build finished. Test FAILed.


---



[GitHub] spark pull request #22282: [SPARK-23539][SS] Add support for Kafka headers i...

Posted by dongjinleekr <gi...@git.apache.org>.
Github user dongjinleekr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22282#discussion_r214345173
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala ---
    @@ -131,9 +158,25 @@ private[kafka010] abstract class KafkaRowWriter(
             throw new IllegalStateException(s"${KafkaWriter.VALUE_ATTRIBUTE_NAME} " +
               s"attribute unsupported type ${t.catalogString}")
         }
    +    val headersExpression = inputSchema
    +      .find(_.name == KafkaWriter.HEADERS_ATTRIBUTE_NAME).getOrElse(
    +      Literal(CatalystTypeConverters.convertToCatalyst(null), MapType(StringType, BinaryType))
    +    )
    +    headersExpression.dataType match {
    +      case MapType(StringType, BinaryType, true) => // good
    +      case t =>
    +        throw new IllegalStateException(s"${KafkaWriter.HEADERS_ATTRIBUTE_NAME} " +
    --- End diff --
    
    Oh, I misunderstood. After reviewing the code, I found that `KafkaRowWriter#createProjection` throws `IllegalStateException` while `KafkaWriter#validateQuery` throws `AnalysisException`. I think the reason lies in the difference between the two methods: the former detects the error from the state of the `InternalRow`, while the latter detects it by analyzing the expression's schema.
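A minimal sketch of that split, using hypothetical stand-ins (`DType`, `MapT`, `AnalysisError`, and the object `HeaderTypeChecks` are illustrative, not Spark classes; the function names only mirror the methods discussed). The analysis-time check inspects the schema before anything is written and reports a user-facing error, while the writer-side check treats a mismatch as a broken invariant, since validation should already have rejected it.

```scala
// Hypothetical, simplified stand-ins for Catalyst data types; not Spark's API.
sealed trait DType
case object StringT extends DType
case object BinaryT extends DType
final case class MapT(key: DType, value: DType) extends DType

// Stand-in for org.apache.spark.sql.AnalysisException.
final class AnalysisError(msg: String) extends Exception(msg)

object HeaderTypeChecks {
  val HeadersAttr = "headers"
  private val Expected: DType = MapT(StringT, BinaryT)

  // Analysis-time check (mirrors the role of KafkaWriter#validateQuery): looks only at
  // the schema before any data is written, so a bad type is reported as a user error.
  def validateQuery(schema: Map[String, DType]): Unit =
    schema.get(HeadersAttr) match {
      case None                     => () // no headers column: nothing to check
      case Some(t) if t == Expected => ()
      case Some(t) =>
        throw new AnalysisError(s"$HeadersAttr attribute type must be map<string,binary>, got $t")
    }

  // Writer-side check (mirrors the role of KafkaRowWriter#createProjection): validation
  // should already have passed, so a mismatch here signals a broken invariant.
  def createProjection(schema: Map[String, DType]): DType =
    schema.getOrElse(HeadersAttr, Expected) match {
      case t if t == Expected => t
      case t =>
        throw new IllegalStateException(s"$HeadersAttr attribute unsupported type $t")
    }
}
```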


---



[GitHub] spark issue #22282: [SPARK-23539][SS] Add support for Kafka headers in Struc...

Posted by xuanyuanking <gi...@git.apache.org>.
Github user xuanyuanking commented on the issue:

    https://github.com/apache/spark/pull/22282
  
    retest this please.


---



[GitHub] spark pull request #22282: [SPARK-23539][SS] Add support for Kafka headers i...

Posted by dongjinleekr <gi...@git.apache.org>.
Github user dongjinleekr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22282#discussion_r214856258
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala ---
    @@ -88,7 +92,30 @@ private[kafka010] abstract class KafkaRowWriter(
           throw new NullPointerException(s"null topic present in the data. Use the " +
             s"${KafkaSourceProvider.TOPIC_OPTION_KEY} option for setting a default topic.")
         }
    -    val record = new ProducerRecord[Array[Byte], Array[Byte]](topic.toString, key, value)
    +    val record = if (projectedRow.isNullAt(3)) {
    +      new ProducerRecord[Array[Byte], Array[Byte]](
    +        topic.toString,
    +        null,
    +        key,
    +        value
    +      )
    +    } else {
    +      val headerMap = projectedRow.getMap(3)
    +      val headers = (0 until headerMap.numElements()).toArray.map(
    +        i =>
    +          new RecordHeader(
    --- End diff --
    
    As of September 2018, `RecordHeader` is the only implementation of the `Header` interface provided by Kafka. As you can see [here](https://memorynotfound.com/spring-kafka-adding-custom-header-kafka-message-example/), this approach is widely used. I think it would be more natural for `RecordHeader` to be hidden behind a builder class, but it isn't; this looks like a gap in Kafka's API.
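To illustrate the kind of builder the comment has in mind, here is a sketch of a hypothetical helper that keeps the concrete header class private. `KafkaHeader` is a stand-in for Kafka's `Header` interface (which exposes `key()` and `value()`), and `HeaderBuilder.of` is invented for illustration; Kafka does not provide it.

```scala
import java.nio.charset.StandardCharsets.UTF_8

// Stand-in for org.apache.kafka.common.header.Header, which exposes key() and value().
trait KafkaHeader {
  def key(): String
  def value(): Array[Byte]
}

// Hypothetical builder: callers only see the KafkaHeader interface, while the
// concrete class stays private (the role RecordHeader plays in Kafka today).
object HeaderBuilder {
  private final class SimpleHeader(k: String, v: Array[Byte]) extends KafkaHeader {
    def key(): String = k
    def value(): Array[Byte] = v
  }

  def of(pairs: (String, Array[Byte])*): Seq[KafkaHeader] =
    pairs.map { case (k, v) => new SimpleHeader(k, v) }.toList
}

object BuilderUsage {
  val headers: Seq[KafkaHeader] =
    HeaderBuilder.of("once" -> "1".getBytes(UTF_8), "twice" -> "2".getBytes(UTF_8))
}
```

With Kafka itself, callers still construct `RecordHeader` directly, as the patch does.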


---



[GitHub] spark issue #22282: [SPARK-23539][SS] Add support for Kafka headers in Struc...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22282
  
    **[Test build #95906 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95906/testReport)** for PR 22282 at commit [`220bd0a`](https://github.com/apache/spark/commit/220bd0a90b0c606b8f74c227218bec7bb6614782).


---



[GitHub] spark issue #22282: [SPARK-23539][SS] Add support for Kafka headers in Struc...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22282
  
    **[Test build #95740 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95740/testReport)** for PR 22282 at commit [`220bd0a`](https://github.com/apache/spark/commit/220bd0a90b0c606b8f74c227218bec7bb6614782).
     * This patch **fails due to an unknown error code, -9**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #22282: [SPARK-23539][SS] Add support for Kafka headers in Struc...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22282
  
    Can one of the admins verify this patch?


---



[GitHub] spark issue #22282: [SPARK-23539][SS] Add support for Kafka headers in Struc...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22282
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #22282: [SPARK-23539][SS] Add support for Kafka headers i...

Posted by dongjinleekr <gi...@git.apache.org>.
Github user dongjinleekr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22282#discussion_r214198620
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala ---
    @@ -131,9 +158,25 @@ private[kafka010] abstract class KafkaRowWriter(
             throw new IllegalStateException(s"${KafkaWriter.VALUE_ATTRIBUTE_NAME} " +
               s"attribute unsupported type ${t.catalogString}")
         }
    +    val headersExpression = inputSchema
    +      .find(_.name == KafkaWriter.HEADERS_ATTRIBUTE_NAME).getOrElse(
    +      Literal(CatalystTypeConverters.convertToCatalyst(null), MapType(StringType, BinaryType))
    +    )
    +    headersExpression.dataType match {
    +      case MapType(StringType, BinaryType, true) => // good
    +      case t =>
    +        throw new IllegalStateException(s"${KafkaWriter.HEADERS_ATTRIBUTE_NAME} " +
    --- End diff --
    
    Just a typo.


---



[GitHub] spark pull request #22282: [SPARK-23539][SS] Add support for Kafka headers i...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22282#discussion_r214618743
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRecordToUnsafeRowConverter.scala ---
    @@ -44,6 +44,11 @@ private[kafka010] class KafkaRecordToUnsafeRowConverter {
           5,
           DateTimeUtils.fromJavaTimestamp(new java.sql.Timestamp(record.timestamp)))
         rowWriter.write(6, record.timestampType.id)
    +    val keys = record.headers.toArray.map(_.key())
    --- End diff --
    
    It might be better to define a local value for `record.headers.toArray`, because each call creates a new array when `headers` is not empty. It also guarantees a consistent view when extracting keys and values, though we know `headers` is unlikely to be modified in between.
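A sketch of the suggested refactoring, with a hypothetical `HeaderList` standing in for `record.headers` (its `toArray` copies on every call, as the comment describes) and a copy counter added only to make the extra allocation visible. `Hdr` is likewise a stand-in, not Kafka's `Header`.

```scala
// Stand-in for a Kafka header; not Kafka's class.
final case class Hdr(key: String, value: Array[Byte])

// Stand-in for record.headers: every toArray call copies the current headers
// into a fresh array. The counter exists only to make that visible.
final class HeaderList(underlying: List[Hdr]) {
  var copies = 0
  def toArray: Array[Hdr] = { copies += 1; underlying.toArray }
}

object HeaderSplit {
  // Pattern the comment points at: separate snapshots feed keys and values.
  def splitTwice(h: HeaderList): (Array[String], Array[Array[Byte]]) =
    (h.toArray.map(_.key), h.toArray.map(_.value))

  // Suggested: one snapshot in a local value, so keys and values come from the
  // same array and only one copy is made.
  def splitOnce(h: HeaderList): (Array[String], Array[Array[Byte]]) = {
    val hs = h.toArray
    (hs.map(_.key), hs.map(_.value))
  }
}
```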


---



[GitHub] spark pull request #22282: [SPARK-23539][SS] Add support for Kafka headers i...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22282#discussion_r214639674
  
    --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala ---
    @@ -136,6 +142,19 @@ class KafkaRelationSuite extends QueryTest with SharedSQLContext with KafkaTest
         checkAnswer(df, (0 to 30).map(_.toString).toDF)
       }
     
    +  test("default starting and ending offsets with headers") {
    +    val topic = newTopic()
    +    testUtils.createTopic(topic, partitions = 3)
    +    testUtils.sendMessage(topic, (null, "1", Array(("once", "1"), ("twice", "2"))), Some(0))
    +    testUtils.sendMessage(topic, (null, "2", Array(("once", "2"), ("twice", "4"))), Some(1))
    +    testUtils.sendMessage(topic, (null, "3", Array(("once", "3"), ("twice", "6"))), Some(2))
    +
    +    // Implicit offset values, should default to earliest and latest
    +    val df = createDF(topic, Map.empty[String, String], None, true)
    --- End diff --
    
    nit: passing the argument by name, as in `includeHeaders = true`, may make the call easier to read.
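A sketch of the difference, using a hypothetical `createDF` with default parameters. Apart from `includeHeaders`, the parameter names and the `String` return value are assumptions made so the example runs without Spark.

```scala
// Hypothetical stand-in for the suite's createDF helper; returns a description
// instead of a DataFrame so the example is self-contained.
object CreateDfExample {
  def createDF(
      topic: String,
      withOptions: Map[String, String] = Map.empty[String, String],
      brokerAddress: Option[String] = None,
      includeHeaders: Boolean = false): String =
    s"topic=$topic, options=${withOptions.size}, broker=${brokerAddress.getOrElse("default")}, " +
      s"includeHeaders=$includeHeaders"
}

object Calls {
  // Positional: the reader has to know what the trailing `true` means.
  val positional = CreateDfExample.createDF("t", Map.empty[String, String], None, true)
  // Named: the intent is visible at the call site, and defaulted arguments can be skipped.
  val named = CreateDfExample.createDF("t", includeHeaders = true)
}
```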


---



[GitHub] spark issue #22282: [SPARK-23539][SS] Add support for Kafka headers in Struc...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22282
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #22282: [SPARK-23539][SS] Add support for Kafka headers in Struc...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22282
  
    Can one of the admins verify this patch?


---



[GitHub] spark issue #22282: [SPARK-23539][SS] Add support for Kafka headers in Struc...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22282
  
    Merged build finished. Test FAILed.


---



[GitHub] spark pull request #22282: [SPARK-23539][SS] Add support for Kafka headers i...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22282#discussion_r214646749
  
    --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala ---
    @@ -1254,6 +1254,9 @@ abstract class KafkaSourceSuiteBase extends KafkaSourceTest {
           .option("startingOffsets", s"earliest")
           .option("subscribe", topic)
           .load()
    +      .selectExpr(
    --- End diff --
    
    I just noticed your comment that the test failed without `selectExpr`. Is that related to the addition of the headers field? I think we should make sure it also works without `selectExpr`, so that we can also check `headers` with the memory sink.


---



[GitHub] spark issue #22282: [SPARK-23539][SS] Add support for Kafka headers in Struc...

Posted by dongjinleekr <gi...@git.apache.org>.
Github user dongjinleekr commented on the issue:

    https://github.com/apache/spark/pull/22282
  
    retest this please.


---



[GitHub] spark issue #22282: [SPARK-23539][SS] Add support for Kafka headers in Struc...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22282
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #22282: [SPARK-23539][SS] Add support for Kafka headers in Struc...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22282
  
    **[Test build #95906 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95906/testReport)** for PR 22282 at commit [`220bd0a`](https://github.com/apache/spark/commit/220bd0a90b0c606b8f74c227218bec7bb6614782).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #22282: [SPARK-23539][SS] Add support for Kafka headers in Struc...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22282
  
    **[Test build #95670 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95670/testReport)** for PR 22282 at commit [`2254009`](https://github.com/apache/spark/commit/22540092f7a786585afeb5a861b1c329722e3d0b).


---



[GitHub] spark issue #22282: [SPARK-23539][SS] Add support for Kafka headers in Struc...

Posted by dongjinleekr <gi...@git.apache.org>.
Github user dongjinleekr commented on the issue:

    https://github.com/apache/spark/pull/22282
  
    As you can see, this PR consists of 3 parts:
    
    1. Extend `UnsafeArrayData` and `UnsafeMapData` (commits 1-6)
    2. Implement the Kafka headers functionality (commits 7 and 10)
    3. Update unit tests (commits 8 and 9)
    
    I have the following questions:
    
    1. Should I split group 1 into a separate issue?
    2. I found that KafkaSourceSuiteBase's 'Kafka column types' test is missing a select expression. Oddly, it passed before the update but fails after it. (That is why the last commit was added: without explicitly selecting the columns, this test does not pass.) Is this intended, or am I misunderstanding something?
    
    Please have a look when you are free. Thanks.


---



[GitHub] spark pull request #22282: [SPARK-23539][SS] Add support for Kafka headers i...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22282#discussion_r214638140
  
    --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala ---
    @@ -1254,6 +1254,9 @@ abstract class KafkaSourceSuiteBase extends KafkaSourceTest {
           .option("startingOffsets", s"earliest")
           .option("subscribe", topic)
           .load()
    +      .selectExpr(
    --- End diff --
    
    It would be better to check the columns exhaustively here, so I'd rather add a check for headers than limit the columns.


---



[GitHub] spark pull request #22282: [SPARK-23539][SS] Add support for Kafka headers i...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22282#discussion_r214635480
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala ---
    @@ -88,7 +92,30 @@ private[kafka010] abstract class KafkaRowWriter(
           throw new NullPointerException(s"null topic present in the data. Use the " +
             s"${KafkaSourceProvider.TOPIC_OPTION_KEY} option for setting a default topic.")
         }
    -    val record = new ProducerRecord[Array[Byte], Array[Byte]](topic.toString, key, value)
    +    val record = if (projectedRow.isNullAt(3)) {
    +      new ProducerRecord[Array[Byte], Array[Byte]](
    --- End diff --
    
    nit: the Spark Scala style guide states the following:
    
    https://github.com/databricks/scala-style-guide#spacing-and-indentation
    
    > For method and class constructor invocations, use 2 space indentation for its parameters and put each in each line when the parameters don't fit in two lines.
    
    There are multiple places where these lines can be compacted into one or two lines.
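For example, with a stand-in class `Rec` that has the same `(topic, partition, key, value)` parameter shape as `ProducerRecord` (an assumption so the sketch runs without Kafka), the null-headers branch fits on a single line.

```scala
// Stand-in with the same parameter shape as ProducerRecord(topic, partition, key, value);
// not Kafka's class.
final class Rec[K, V](val topic: String, val partition: java.lang.Integer, val key: K, val value: V)

object CompactStyle {
  type Bytes = Array[Byte]

  // The arguments fit comfortably on one line, so there is no need to put
  // each of them on its own line.
  def build(topic: String, key: Bytes, value: Bytes): Rec[Bytes, Bytes] =
    new Rec[Bytes, Bytes](topic, null, key, value)
}
```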


---



[GitHub] spark issue #22282: [SPARK-23539][SS] Add support for Kafka headers in Struc...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22282
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/95543/


---



[GitHub] spark pull request #22282: [SPARK-23539][SS] Add support for Kafka headers i...

Posted by tedyu <gi...@git.apache.org>.
Github user tedyu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22282#discussion_r214073971
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala ---
    @@ -131,9 +158,25 @@ private[kafka010] abstract class KafkaRowWriter(
             throw new IllegalStateException(s"${KafkaWriter.VALUE_ATTRIBUTE_NAME} " +
               s"attribute unsupported type ${t.catalogString}")
         }
    +    val headersExpression = inputSchema
    +      .find(_.name == KafkaWriter.HEADERS_ATTRIBUTE_NAME).getOrElse(
    +      Literal(CatalystTypeConverters.convertToCatalyst(null), MapType(StringType, BinaryType))
    +    )
    +    headersExpression.dataType match {
    +      case MapType(StringType, BinaryType, true) => // good
    +      case t =>
    +        throw new IllegalStateException(s"${KafkaWriter.HEADERS_ATTRIBUTE_NAME} " +
    --- End diff --
    
    This exception is different from the AnalysisException thrown in the next class.
    What's the reason?


---



[GitHub] spark issue #22282: [SPARK-23539][SS] Add support for Kafka headers in Struc...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22282
  
    **[Test build #95672 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95672/testReport)** for PR 22282 at commit [`253a894`](https://github.com/apache/spark/commit/253a894bedbd3a9642e529ad937dcb99dae346c7).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #22282: [SPARK-23539][SS] Add support for Kafka headers i...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22282#discussion_r214629265
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala ---
    @@ -88,7 +92,30 @@ private[kafka010] abstract class KafkaRowWriter(
           throw new NullPointerException(s"null topic present in the data. Use the " +
             s"${KafkaSourceProvider.TOPIC_OPTION_KEY} option for setting a default topic.")
         }
    -    val record = new ProducerRecord[Array[Byte], Array[Byte]](topic.toString, key, value)
    +    val record = if (projectedRow.isNullAt(3)) {
    +      new ProducerRecord[Array[Byte], Array[Byte]](
    +        topic.toString,
    +        null,
    +        key,
    +        value
    +      )
    +    } else {
    +      val headerMap = projectedRow.getMap(3)
    +      val headers = (0 until headerMap.numElements()).toArray.map(
    --- End diff --
    
    We could remove `.toArray` here and also `.toIterable` in `headers.toIterable.asJava`, unless there's a performance difference.
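A sketch comparing the two shapes, with a stand-in `KV` class in place of `RecordHeader` and plain sequences in place of the map data's key and value arrays (both assumptions). It uses `scala.collection.JavaConverters`, which works on Scala 2.12 and 2.13 (deprecated in 2.13 in favor of `scala.jdk.CollectionConverters`).

```scala
import scala.collection.JavaConverters._

// Stand-in for a Kafka header; not Kafka's class.
final case class KV(key: String, value: Array[Byte])

object HeaderConversion {
  // Shape in the diff: build an Array, then go through toIterable before asJava.
  def before(keys: IndexedSeq[String], values: IndexedSeq[Array[Byte]]): java.lang.Iterable[KV] = {
    val headers = (0 until keys.length).toArray.map(i => KV(keys(i), values(i)))
    headers.toIterable.asJava
  }

  // Suggested shape: map the Range directly; the resulting IndexedSeq converts
  // straight to a java.util.List, which is a java.lang.Iterable.
  def after(keys: IndexedSeq[String], values: IndexedSeq[Array[Byte]]): java.lang.Iterable[KV] =
    (0 until keys.length).map(i => KV(keys(i), values(i))).asJava
}
```

Both produce the same elements in the same order; whether dropping the intermediate array matters for performance would need to be measured.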


---



[GitHub] spark issue #22282: [SPARK-23539][SS] Add support for Kafka headers in Struc...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22282
  
    **[Test build #95740 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95740/testReport)** for PR 22282 at commit [`220bd0a`](https://github.com/apache/spark/commit/220bd0a90b0c606b8f74c227218bec7bb6614782).


---



[GitHub] spark issue #22282: [SPARK-23539][SS] Add support for Kafka headers in Struc...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22282
  
    Merged build finished. Test PASSed.


---
