You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by td...@apache.org on 2015/05/02 02:54:59 UTC
spark git commit: [SPARK-2808][Streaming][Kafka] update kafka to 0.8.2
Repository: spark
Updated Branches:
refs/heads/master b88c275e6 -> 478648407
[SPARK-2808][Streaming][Kafka] update kafka to 0.8.2
I don't think this should be merged until after 1.3.0 is final.
Author: cody koeninger <co...@koeninger.org>
Author: Helena Edelson <he...@datastax.com>
Closes #4537 from koeninger/wip-2808-kafka-0.8.2-upgrade and squashes the following commits:
803aa2c [cody koeninger] [SPARK-2808][Streaming][Kafka] code cleanup per TD
e6dfaf6 [cody koeninger] [SPARK-2808][Streaming][Kafka] pointless whitespace change to trigger jenkins again
1770abc [cody koeninger] [SPARK-2808][Streaming][Kafka] make waitUntilLeaderOffset easier to call, call it from python tests as well
d4267e9 [cody koeninger] [SPARK-2808][Streaming][Kafka] fix stderr redirect in python test script
30d991d [cody koeninger] [SPARK-2808][Streaming][Kafka] remove stderr prints since they break Python 3 syntax
1d896e2 [cody koeninger] [SPARK-2808][Streaming][Kafka] add even even more logging to python test
4c4557f [cody koeninger] [SPARK-2808][Streaming][Kafka] add even more logging to python test
115aeee [cody koeninger] Merge branch 'master' into wip-2808-kafka-0.8.2-upgrade
2712649 [cody koeninger] [SPARK-2808][Streaming][Kafka] add more logging to Python test, see why it's timing out in Jenkins
2b92d3f [cody koeninger] [SPARK-2808][Streaming][Kafka] wait for leader offsets in the java test as well
3824ce3 [cody koeninger] [SPARK-2808][Streaming][Kafka] naming / comments per tdas
61b3464 [cody koeninger] [SPARK-2808][Streaming][Kafka] delay for second send in boundary condition test
af6f3ec [cody koeninger] [SPARK-2808][Streaming][Kafka] delay test until latest leader offset matches expected value
9edab4c [cody koeninger] [SPARK-2808][Streaming][Kafka] more shots in the dark on jenkins failing test
c70ee43 [cody koeninger] [SPARK-2808][Streaming][Kafka] add more asserts to test, try to figure out why it fails on jenkins but not locally
1d10751 [cody koeninger] Merge branch 'master' into wip-2808-kafka-0.8.2-upgrade
ed02d2c [cody koeninger] [SPARK-2808][Streaming][Kafka] move default argument for api version to overloaded method, for binary compat
407382e [cody koeninger] [SPARK-2808][Streaming][Kafka] update kafka to 0.8.2.1
77de6c2 [cody koeninger] Merge branch 'master' into wip-2808-kafka-0.8.2-upgrade
6953429 [cody koeninger] [SPARK-2808][Streaming][Kafka] update kafka to 0.8.2
2e67c66 [Helena Edelson] #SPARK-2808 Update to Kafka 0.8.2.0 GA from beta.
d9dc2bc [Helena Edelson] Merge remote-tracking branch 'upstream/master' into wip-2808-kafka-0.8.2-upgrade
e768164 [Helena Edelson] #2808 update kafka to version 0.8.2
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/47864840
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/47864840
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/47864840
Branch: refs/heads/master
Commit: 4786484076865c56c3fc23c49819b9be2933d287
Parents: b88c275
Author: cody koeninger <co...@koeninger.org>
Authored: Fri May 1 17:54:56 2015 -0700
Committer: Tathagata Das <ta...@gmail.com>
Committed: Fri May 1 17:54:56 2015 -0700
----------------------------------------------------------------------
external/kafka/pom.xml | 2 +-
.../spark/streaming/kafka/KafkaCluster.scala | 51 ++++++++++++++++----
.../spark/streaming/kafka/KafkaTestUtils.scala | 35 ++++++++++++--
.../streaming/kafka/JavaKafkaRDDSuite.java | 3 ++
.../spark/streaming/kafka/KafkaRDDSuite.scala | 32 ++++++++----
python/pyspark/streaming/tests.py | 8 +--
6 files changed, 104 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/47864840/external/kafka/pom.xml
----------------------------------------------------------------------
diff --git a/external/kafka/pom.xml b/external/kafka/pom.xml
index f695cff..243ce6e 100644
--- a/external/kafka/pom.xml
+++ b/external/kafka/pom.xml
@@ -44,7 +44,7 @@
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_${scala.binary.version}</artifactId>
- <version>0.8.1.1</version>
+ <version>0.8.2.1</version>
<exclusions>
<exclusion>
<groupId>com.sun.jmx</groupId>
http://git-wip-us.apache.org/repos/asf/spark/blob/47864840/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala
index bd76703..6cf254a 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala
@@ -20,9 +20,10 @@ package org.apache.spark.streaming.kafka
import scala.util.control.NonFatal
import scala.util.Random
import scala.collection.mutable.ArrayBuffer
+import scala.collection.JavaConverters._
import java.util.Properties
import kafka.api._
-import kafka.common.{ErrorMapping, OffsetMetadataAndError, TopicAndPartition}
+import kafka.common.{ErrorMapping, OffsetAndMetadata, OffsetMetadataAndError, TopicAndPartition}
import kafka.consumer.{ConsumerConfig, SimpleConsumer}
import org.apache.spark.SparkException
@@ -220,12 +221,22 @@ class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
// https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI
// scalastyle:on
+ // this 0 here indicates api version, in this case the original ZK backed api.
+ private def defaultConsumerApiVersion: Short = 0
+
/** Requires Kafka >= 0.8.1.1 */
def getConsumerOffsets(
groupId: String,
topicAndPartitions: Set[TopicAndPartition]
+ ): Either[Err, Map[TopicAndPartition, Long]] =
+ getConsumerOffsets(groupId, topicAndPartitions, defaultConsumerApiVersion)
+
+ def getConsumerOffsets(
+ groupId: String,
+ topicAndPartitions: Set[TopicAndPartition],
+ consumerApiVersion: Short
): Either[Err, Map[TopicAndPartition, Long]] = {
- getConsumerOffsetMetadata(groupId, topicAndPartitions).right.map { r =>
+ getConsumerOffsetMetadata(groupId, topicAndPartitions, consumerApiVersion).right.map { r =>
r.map { kv =>
kv._1 -> kv._2.offset
}
@@ -236,9 +247,16 @@ class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
def getConsumerOffsetMetadata(
groupId: String,
topicAndPartitions: Set[TopicAndPartition]
+ ): Either[Err, Map[TopicAndPartition, OffsetMetadataAndError]] =
+ getConsumerOffsetMetadata(groupId, topicAndPartitions, defaultConsumerApiVersion)
+
+ def getConsumerOffsetMetadata(
+ groupId: String,
+ topicAndPartitions: Set[TopicAndPartition],
+ consumerApiVersion: Short
): Either[Err, Map[TopicAndPartition, OffsetMetadataAndError]] = {
var result = Map[TopicAndPartition, OffsetMetadataAndError]()
- val req = OffsetFetchRequest(groupId, topicAndPartitions.toSeq)
+ val req = OffsetFetchRequest(groupId, topicAndPartitions.toSeq, consumerApiVersion)
val errs = new Err
withBrokers(Random.shuffle(config.seedBrokers), errs) { consumer =>
val resp = consumer.fetchOffsets(req)
@@ -266,24 +284,39 @@ class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
def setConsumerOffsets(
groupId: String,
offsets: Map[TopicAndPartition, Long]
+ ): Either[Err, Map[TopicAndPartition, Short]] =
+ setConsumerOffsets(groupId, offsets, defaultConsumerApiVersion)
+
+ def setConsumerOffsets(
+ groupId: String,
+ offsets: Map[TopicAndPartition, Long],
+ consumerApiVersion: Short
): Either[Err, Map[TopicAndPartition, Short]] = {
- setConsumerOffsetMetadata(groupId, offsets.map { kv =>
- kv._1 -> OffsetMetadataAndError(kv._2)
- })
+ val meta = offsets.map { kv =>
+ kv._1 -> OffsetAndMetadata(kv._2)
+ }
+ setConsumerOffsetMetadata(groupId, meta, consumerApiVersion)
}
/** Requires Kafka >= 0.8.1.1 */
def setConsumerOffsetMetadata(
groupId: String,
- metadata: Map[TopicAndPartition, OffsetMetadataAndError]
+ metadata: Map[TopicAndPartition, OffsetAndMetadata]
+ ): Either[Err, Map[TopicAndPartition, Short]] =
+ setConsumerOffsetMetadata(groupId, metadata, defaultConsumerApiVersion)
+
+ def setConsumerOffsetMetadata(
+ groupId: String,
+ metadata: Map[TopicAndPartition, OffsetAndMetadata],
+ consumerApiVersion: Short
): Either[Err, Map[TopicAndPartition, Short]] = {
var result = Map[TopicAndPartition, Short]()
- val req = OffsetCommitRequest(groupId, metadata)
+ val req = OffsetCommitRequest(groupId, metadata, consumerApiVersion)
val errs = new Err
val topicAndPartitions = metadata.keySet
withBrokers(Random.shuffle(config.seedBrokers), errs) { consumer =>
val resp = consumer.commitOffsets(req)
- val respMap = resp.requestInfo
+ val respMap = resp.commitStatus
val needed = topicAndPartitions.diff(result.keySet)
needed.foreach { tp: TopicAndPartition =>
respMap.get(tp).foreach { err: Short =>
http://git-wip-us.apache.org/repos/asf/spark/blob/47864840/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala
index 13e9475..6dc4e95 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala
@@ -29,10 +29,12 @@ import scala.language.postfixOps
import scala.util.control.NonFatal
import kafka.admin.AdminUtils
+import kafka.api.Request
+import kafka.common.TopicAndPartition
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
import kafka.serializer.StringEncoder
import kafka.server.{KafkaConfig, KafkaServer}
-import kafka.utils.ZKStringSerializer
+import kafka.utils.{ZKStringSerializer, ZkUtils}
import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer}
import org.I0Itec.zkclient.ZkClient
@@ -227,12 +229,35 @@ private class KafkaTestUtils extends Logging {
tryAgain(1)
}
- private def waitUntilMetadataIsPropagated(topic: String, partition: Int): Unit = {
+ /** Wait until the leader offset for the given topic/partition equals the specified offset */
+ def waitUntilLeaderOffset(
+ topic: String,
+ partition: Int,
+ offset: Long): Unit = {
eventually(Time(10000), Time(100)) {
+ val kc = new KafkaCluster(Map("metadata.broker.list" -> brokerAddress))
+ val tp = TopicAndPartition(topic, partition)
+ val llo = kc.getLatestLeaderOffsets(Set(tp)).right.get.apply(tp).offset
assert(
- server.apis.metadataCache.containsTopicAndPartition(topic, partition),
- s"Partition [$topic, $partition] metadata not propagated after timeout"
- )
+ llo == offset,
+ s"$topic $partition $offset not reached after timeout")
+ }
+ }
+
+ private def waitUntilMetadataIsPropagated(topic: String, partition: Int): Unit = {
+ def isPropagated = server.apis.metadataCache.getPartitionInfo(topic, partition) match {
+ case Some(partitionState) =>
+ val leaderAndInSyncReplicas = partitionState.leaderIsrAndControllerEpoch.leaderAndIsr
+
+ ZkUtils.getLeaderForPartition(zkClient, topic, partition).isDefined &&
+ Request.isValidBrokerId(leaderAndInSyncReplicas.leader) &&
+ leaderAndInSyncReplicas.isr.size >= 1
+
+ case _ =>
+ false
+ }
+ eventually(Time(10000), Time(100)) {
+ assert(isPropagated, s"Partition [$topic, $partition] metadata not propagated after timeout")
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/47864840/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java
index a9dc6e5..5cf3796 100644
--- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java
+++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java
@@ -72,6 +72,9 @@ public class JavaKafkaRDDSuite implements Serializable {
HashMap<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("metadata.broker.list", kafkaTestUtils.brokerAddress());
+ kafkaTestUtils.waitUntilLeaderOffset(topic1, 0, topic1data.length);
+ kafkaTestUtils.waitUntilLeaderOffset(topic2, 0, topic2data.length);
+
OffsetRange[] offsetRanges = {
OffsetRange.create(topic1, 0, 0, 1),
OffsetRange.create(topic2, 0, 0, 1)
http://git-wip-us.apache.org/repos/asf/spark/blob/47864840/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala
index 7d26ce5..39c3fb4 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala
@@ -53,14 +53,15 @@ class KafkaRDDSuite extends FunSuite with BeforeAndAfterAll {
}
test("basic usage") {
- val topic = "topicbasic"
+ val topic = s"topicbasic-${Random.nextInt}"
kafkaTestUtils.createTopic(topic)
val messages = Set("the", "quick", "brown", "fox")
kafkaTestUtils.sendMessages(topic, messages.toArray)
-
val kafkaParams = Map("metadata.broker.list" -> kafkaTestUtils.brokerAddress,
- "group.id" -> s"test-consumer-${Random.nextInt(10000)}")
+ "group.id" -> s"test-consumer-${Random.nextInt}")
+
+ kafkaTestUtils.waitUntilLeaderOffset(topic, 0, messages.size)
val offsetRanges = Array(OffsetRange(topic, 0, 0, messages.size))
@@ -73,27 +74,38 @@ class KafkaRDDSuite extends FunSuite with BeforeAndAfterAll {
test("iterator boundary conditions") {
// the idea is to find e.g. off-by-one errors between what kafka has available and the rdd
- val topic = "topic1"
+ val topic = s"topicboundary-${Random.nextInt}"
val sent = Map("a" -> 5, "b" -> 3, "c" -> 10)
kafkaTestUtils.createTopic(topic)
val kafkaParams = Map("metadata.broker.list" -> kafkaTestUtils.brokerAddress,
- "group.id" -> s"test-consumer-${Random.nextInt(10000)}")
+ "group.id" -> s"test-consumer-${Random.nextInt}")
val kc = new KafkaCluster(kafkaParams)
// this is the "lots of messages" case
kafkaTestUtils.sendMessages(topic, sent)
+ val sentCount = sent.values.sum
+ kafkaTestUtils.waitUntilLeaderOffset(topic, 0, sentCount)
+
// rdd defined from leaders after sending messages, should get the number sent
val rdd = getRdd(kc, Set(topic))
assert(rdd.isDefined)
- assert(rdd.get.count === sent.values.sum, "didn't get all sent messages")
- val ranges = rdd.get.asInstanceOf[HasOffsetRanges]
- .offsetRanges.map(o => TopicAndPartition(o.topic, o.partition) -> o.untilOffset).toMap
+ val ranges = rdd.get.asInstanceOf[HasOffsetRanges].offsetRanges
+ val rangeCount = ranges.map(o => o.untilOffset - o.fromOffset).sum
- kc.setConsumerOffsets(kafkaParams("group.id"), ranges)
+ assert(rangeCount === sentCount, "offset range didn't include all sent messages")
+ assert(rdd.get.count === sentCount, "didn't get all sent messages")
+
+ val rangesMap = ranges.map(o => TopicAndPartition(o.topic, o.partition) -> o.untilOffset).toMap
+
+ // make sure consumer offsets are committed before the next getRdd call
+ kc.setConsumerOffsets(kafkaParams("group.id"), rangesMap).fold(
+ err => throw new Exception(err.mkString("\n")),
+ _ => ()
+ )
// this is the "0 messages" case
val rdd2 = getRdd(kc, Set(topic))
@@ -101,6 +113,8 @@ class KafkaRDDSuite extends FunSuite with BeforeAndAfterAll {
val sentOnlyOne = Map("d" -> 1)
kafkaTestUtils.sendMessages(topic, sentOnlyOne)
+ kafkaTestUtils.waitUntilLeaderOffset(topic, 0, sentCount + 1)
+
assert(rdd2.isDefined)
assert(rdd2.get.count === 0, "got messages when there shouldn't be any")
http://git-wip-us.apache.org/repos/asf/spark/blob/47864840/python/pyspark/streaming/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index 7c06c20..33ea8c9 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -606,7 +606,6 @@ class KafkaStreamTests(PySparkStreamingTestCase):
result = {}
for i in rdd.map(lambda x: x[1]).collect():
result[i] = result.get(i, 0) + 1
-
self.assertEqual(sendData, result)
def test_kafka_stream(self):
@@ -616,6 +615,7 @@ class KafkaStreamTests(PySparkStreamingTestCase):
self._kafkaTestUtils.createTopic(topic)
self._kafkaTestUtils.sendMessages(topic, sendData)
+ self._kafkaTestUtils.waitUntilLeaderOffset(topic, 0, sum(sendData.values()))
stream = KafkaUtils.createStream(self.ssc, self._kafkaTestUtils.zkAddress(),
"test-streaming-consumer", {topic: 1},
@@ -631,6 +631,7 @@ class KafkaStreamTests(PySparkStreamingTestCase):
self._kafkaTestUtils.createTopic(topic)
self._kafkaTestUtils.sendMessages(topic, sendData)
+ self._kafkaTestUtils.waitUntilLeaderOffset(topic, 0, sum(sendData.values()))
stream = KafkaUtils.createDirectStream(self.ssc, [topic], kafkaParams)
self._validateStreamResult(sendData, stream)
@@ -645,6 +646,7 @@ class KafkaStreamTests(PySparkStreamingTestCase):
self._kafkaTestUtils.createTopic(topic)
self._kafkaTestUtils.sendMessages(topic, sendData)
+ self._kafkaTestUtils.waitUntilLeaderOffset(topic, 0, sum(sendData.values()))
stream = KafkaUtils.createDirectStream(self.ssc, [topic], kafkaParams, fromOffsets)
self._validateStreamResult(sendData, stream)
@@ -659,7 +661,7 @@ class KafkaStreamTests(PySparkStreamingTestCase):
self._kafkaTestUtils.createTopic(topic)
self._kafkaTestUtils.sendMessages(topic, sendData)
-
+ self._kafkaTestUtils.waitUntilLeaderOffset(topic, 0, sum(sendData.values()))
rdd = KafkaUtils.createRDD(self.sc, kafkaParams, offsetRanges)
self._validateRddResult(sendData, rdd)
@@ -675,7 +677,7 @@ class KafkaStreamTests(PySparkStreamingTestCase):
self._kafkaTestUtils.createTopic(topic)
self._kafkaTestUtils.sendMessages(topic, sendData)
-
+ self._kafkaTestUtils.waitUntilLeaderOffset(topic, 0, sum(sendData.values()))
rdd = KafkaUtils.createRDD(self.sc, kafkaParams, offsetRanges, leaders)
self._validateRddResult(sendData, rdd)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org