You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2019/03/25 00:40:14 UTC

[spark] branch master updated: [SPARK-27260][SS] Upgrade to Kafka 2.2.0

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 6ef94e0  [SPARK-27260][SS] Upgrade to Kafka 2.2.0
6ef94e0 is described below

commit 6ef94e0f18d3c2b482564fddd3e6181c05b75280
Author: Dongjoon Hyun <dh...@apple.com>
AuthorDate: Sun Mar 24 17:39:57 2019 -0700

    [SPARK-27260][SS] Upgrade to Kafka 2.2.0
    
    ## What changes were proposed in this pull request?
    
    This PR aims to update the Kafka dependency to 2.2.0 to bring in the following improvements and bug fixes.
    - https://issues.apache.org/jira/projects/KAFKA/versions/12344063
    
    Due to [KAFKA-4453](https://issues.apache.org/jira/browse/KAFKA-4453), the data plane API and the controller plane API are now separated. Apache Spark needs the following change.
    ```scala
    - servers.head.apis.metadataCache
    + servers.head.dataPlaneRequestProcessor.metadataCache
    ```
    
    ## How was this patch tested?
    
    Pass Jenkins with the existing tests.
    
    Closes #24190 from dongjoon-hyun/SPARK-27260.
    
    Authored-by: Dongjoon Hyun <dh...@apple.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala  | 3 ++-
 .../scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala     | 3 ++-
 pom.xml                                                                | 2 +-
 3 files changed, 5 insertions(+), 3 deletions(-)

diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
index 9fa88b4..70b6e67 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
@@ -429,7 +429,8 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
   }
 
   private def waitUntilMetadataIsPropagated(topic: String, partition: Int): Unit = {
-    def isPropagated = server.apis.metadataCache.getPartitionInfo(topic, partition) match {
+    def isPropagated = server.dataPlaneRequestProcessor.metadataCache
+        .getPartitionInfo(topic, partition) match {
       case Some(partitionState) =>
         zkUtils.getLeaderForPartition(topic, partition).isDefined &&
           Request.isValidBrokerId(partitionState.basePartitionState.leader) &&
diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
index fd9fd00..5dec970 100644
--- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
+++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
@@ -286,7 +286,8 @@ private[kafka010] class KafkaTestUtils extends Logging {
   }
 
   private def waitUntilMetadataIsPropagated(topic: String, partition: Int): Unit = {
-    def isPropagated = server.apis.metadataCache.getPartitionInfo(topic, partition) match {
+    def isPropagated = server.dataPlaneRequestProcessor.metadataCache
+        .getPartitionInfo(topic, partition) match {
       case Some(partitionState) =>
         val leader = partitionState.basePartitionState.leader
         val isr = partitionState.basePartitionState.isr
diff --git a/pom.xml b/pom.xml
index 33b9922..1977505 100644
--- a/pom.xml
+++ b/pom.xml
@@ -131,7 +131,7 @@
     <!-- Version used for internal directory structure -->
     <hive.version.short>1.2.1</hive.version.short>
     <!-- note that this should be compatible with Kafka brokers version 0.10 and up -->
-    <kafka.version>2.1.1</kafka.version>
+    <kafka.version>2.2.0</kafka.version>
     <derby.version>10.12.1.1</derby.version>
     <parquet.version>1.10.1</parquet.version>
     <orc.version>1.5.5</orc.version>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org