You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2019/03/25 00:40:14 UTC
[spark] branch master updated: [SPARK-27260][SS] Upgrade to Kafka
2.2.0
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 6ef94e0 [SPARK-27260][SS] Upgrade to Kafka 2.2.0
6ef94e0 is described below
commit 6ef94e0f18d3c2b482564fddd3e6181c05b75280
Author: Dongjoon Hyun <dh...@apple.com>
AuthorDate: Sun Mar 24 17:39:57 2019 -0700
[SPARK-27260][SS] Upgrade to Kafka 2.2.0
## What changes were proposed in this pull request?
This PR aims to update Kafka dependency to 2.2.0 to bring the following improvement and bug fixes.
- https://issues.apache.org/jira/projects/KAFKA/versions/12344063
Due to [KAFKA-4453](https://issues.apache.org/jira/browse/KAFKA-4453), data plane API and controller plane API are separated. Apache Spark needs the following changes.
```scala
- servers.head.apis.metadataCache
+ servers.head.dataPlaneRequestProcessor.metadataCache
```
## How was this patch tested?
Pass the Jenkins with the existing tests.
Closes #24190 from dongjoon-hyun/SPARK-27260.
Authored-by: Dongjoon Hyun <dh...@apple.com>
Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
.../src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala | 3 ++-
.../scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala | 3 ++-
pom.xml | 2 +-
3 files changed, 5 insertions(+), 3 deletions(-)
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
index 9fa88b4..70b6e67 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
@@ -429,7 +429,8 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
}
private def waitUntilMetadataIsPropagated(topic: String, partition: Int): Unit = {
- def isPropagated = server.apis.metadataCache.getPartitionInfo(topic, partition) match {
+ def isPropagated = server.dataPlaneRequestProcessor.metadataCache
+ .getPartitionInfo(topic, partition) match {
case Some(partitionState) =>
zkUtils.getLeaderForPartition(topic, partition).isDefined &&
Request.isValidBrokerId(partitionState.basePartitionState.leader) &&
diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
index fd9fd00..5dec970 100644
--- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
+++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
@@ -286,7 +286,8 @@ private[kafka010] class KafkaTestUtils extends Logging {
}
private def waitUntilMetadataIsPropagated(topic: String, partition: Int): Unit = {
- def isPropagated = server.apis.metadataCache.getPartitionInfo(topic, partition) match {
+ def isPropagated = server.dataPlaneRequestProcessor.metadataCache
+ .getPartitionInfo(topic, partition) match {
case Some(partitionState) =>
val leader = partitionState.basePartitionState.leader
val isr = partitionState.basePartitionState.isr
diff --git a/pom.xml b/pom.xml
index 33b9922..1977505 100644
--- a/pom.xml
+++ b/pom.xml
@@ -131,7 +131,7 @@
<!-- Version used for internal directory structure -->
<hive.version.short>1.2.1</hive.version.short>
<!-- note that this should be compatible with Kafka brokers version 0.10 and up -->
- <kafka.version>2.1.1</kafka.version>
+ <kafka.version>2.2.0</kafka.version>
<derby.version>10.12.1.1</derby.version>
<parquet.version>1.10.1</parquet.version>
<orc.version>1.5.5</orc.version>
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org