Posted to commits@spark.apache.org by td...@apache.org on 2015/05/02 02:54:59 UTC

spark git commit: [SPARK-2808][Streaming][Kafka] update kafka to 0.8.2

Repository: spark
Updated Branches:
  refs/heads/master b88c275e6 -> 478648407


[SPARK-2808][Streaming][Kafka] update kafka to 0.8.2

I don't think this should be merged until after 1.3.0 is final.

Author: cody koeninger <co...@koeninger.org>
Author: Helena Edelson <he...@datastax.com>

Closes #4537 from koeninger/wip-2808-kafka-0.8.2-upgrade and squashes the following commits:

803aa2c [cody koeninger] [SPARK-2808][Streaming][Kafka] code cleanup per TD
e6dfaf6 [cody koeninger] [SPARK-2808][Streaming][Kafka] pointless whitespace change to trigger jenkins again
1770abc [cody koeninger] [SPARK-2808][Streaming][Kafka] make waitUntilLeaderOffset easier to call, call it from python tests as well
d4267e9 [cody koeninger] [SPARK-2808][Streaming][Kafka] fix stderr redirect in python test script
30d991d [cody koeninger] [SPARK-2808][Streaming][Kafka] remove stderr prints since it breaks python 3 syntax
1d896e2 [cody koeninger] [SPARK-2808][Streaming][Kafka] add even even more logging to python test
4c4557f [cody koeninger] [SPARK-2808][Streaming][Kafka] add even more logging to python test
115aeee [cody koeninger] Merge branch 'master' into wip-2808-kafka-0.8.2-upgrade
2712649 [cody koeninger] [SPARK-2808][Streaming][Kafka] add more logging to python test, see why its timing out in jenkins
2b92d3f [cody koeninger] [SPARK-2808][Streaming][Kafka] wait for leader offsets in the java test as well
3824ce3 [cody koeninger] [SPARK-2808][Streaming][Kafka] naming / comments per tdas
61b3464 [cody koeninger] [SPARK-2808][Streaming][Kafka] delay for second send in boundary condition test
af6f3ec [cody koeninger] [SPARK-2808][Streaming][Kafka] delay test until latest leader offset matches expected value
9edab4c [cody koeninger] [SPARK-2808][Streaming][Kafka] more shots in the dark on jenkins failing test
c70ee43 [cody koeninger] [SPARK-2808][Streaming][Kafka] add more asserts to test, try to figure out why it fails on jenkins but not locally
1d10751 [cody koeninger] Merge branch 'master' into wip-2808-kafka-0.8.2-upgrade
ed02d2c [cody koeninger] [SPARK-2808][Streaming][Kafka] move default argument for api version to overloaded method, for binary compat
407382e [cody koeninger] [SPARK-2808][Streaming][Kafka] update kafka to 0.8.2.1
77de6c2 [cody koeninger] Merge branch 'master' into wip-2808-kafka-0.8.2-upgrade
6953429 [cody koeninger] [SPARK-2808][Streaming][Kafka] update kafka to 0.8.2
2e67c66 [Helena Edelson] #SPARK-2808 Update to Kafka 0.8.2.0 GA from beta.
d9dc2bc [Helena Edelson] Merge remote-tracking branch 'upstream/master' into wip-2808-kafka-0.8.2-upgrade
e768164 [Helena Edelson] #2808 update kafka to version 0.8.2


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/47864840
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/47864840
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/47864840

Branch: refs/heads/master
Commit: 4786484076865c56c3fc23c49819b9be2933d287
Parents: b88c275
Author: cody koeninger <co...@koeninger.org>
Authored: Fri May 1 17:54:56 2015 -0700
Committer: Tathagata Das <ta...@gmail.com>
Committed: Fri May 1 17:54:56 2015 -0700

----------------------------------------------------------------------
 external/kafka/pom.xml                          |  2 +-
 .../spark/streaming/kafka/KafkaCluster.scala    | 51 ++++++++++++++++----
 .../spark/streaming/kafka/KafkaTestUtils.scala  | 35 ++++++++++++--
 .../streaming/kafka/JavaKafkaRDDSuite.java      |  3 ++
 .../spark/streaming/kafka/KafkaRDDSuite.scala   | 32 ++++++++----
 python/pyspark/streaming/tests.py               |  8 +--
 6 files changed, 104 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/47864840/external/kafka/pom.xml
----------------------------------------------------------------------
diff --git a/external/kafka/pom.xml b/external/kafka/pom.xml
index f695cff..243ce6e 100644
--- a/external/kafka/pom.xml
+++ b/external/kafka/pom.xml
@@ -44,7 +44,7 @@
     <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka_${scala.binary.version}</artifactId>
-      <version>0.8.1.1</version>
+      <version>0.8.2.1</version>
       <exclusions>
         <exclusion>
           <groupId>com.sun.jmx</groupId>
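
Note: for builds that pull in the Kafka artifact through sbt rather than Maven, the same version bump would look roughly like the sketch below (assumed coordinates; the exclusion list is omitted here because the pom hunk above is truncated):

    // sbt sketch of the equivalent dependency change (Kafka 0.8.1.1 -> 0.8.2.1);
    // "%%" appends the Scala binary version, matching kafka_${scala.binary.version}.
    libraryDependencies += "org.apache.kafka" %% "kafka" % "0.8.2.1"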

http://git-wip-us.apache.org/repos/asf/spark/blob/47864840/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala
index bd76703..6cf254a 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala
@@ -20,9 +20,10 @@ package org.apache.spark.streaming.kafka
 import scala.util.control.NonFatal
 import scala.util.Random
 import scala.collection.mutable.ArrayBuffer
+import scala.collection.JavaConverters._
 import java.util.Properties
 import kafka.api._
-import kafka.common.{ErrorMapping, OffsetMetadataAndError, TopicAndPartition}
+import kafka.common.{ErrorMapping, OffsetAndMetadata, OffsetMetadataAndError, TopicAndPartition}
 import kafka.consumer.{ConsumerConfig, SimpleConsumer}
 import org.apache.spark.SparkException
 
@@ -220,12 +221,22 @@ class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
   // https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI
   // scalastyle:on
 
+  // This 0 indicates the API version; in this case, the original ZooKeeper-backed API.
+  private def defaultConsumerApiVersion: Short = 0
+
   /** Requires Kafka >= 0.8.1.1 */
   def getConsumerOffsets(
       groupId: String,
       topicAndPartitions: Set[TopicAndPartition]
+    ): Either[Err, Map[TopicAndPartition, Long]] =
+    getConsumerOffsets(groupId, topicAndPartitions, defaultConsumerApiVersion)
+
+  def getConsumerOffsets(
+      groupId: String,
+      topicAndPartitions: Set[TopicAndPartition],
+      consumerApiVersion: Short
     ): Either[Err, Map[TopicAndPartition, Long]] = {
-    getConsumerOffsetMetadata(groupId, topicAndPartitions).right.map { r =>
+    getConsumerOffsetMetadata(groupId, topicAndPartitions, consumerApiVersion).right.map { r =>
       r.map { kv =>
         kv._1 -> kv._2.offset
       }
@@ -236,9 +247,16 @@ class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
   def getConsumerOffsetMetadata(
       groupId: String,
       topicAndPartitions: Set[TopicAndPartition]
+    ): Either[Err, Map[TopicAndPartition, OffsetMetadataAndError]] =
+    getConsumerOffsetMetadata(groupId, topicAndPartitions, defaultConsumerApiVersion)
+
+  def getConsumerOffsetMetadata(
+      groupId: String,
+      topicAndPartitions: Set[TopicAndPartition],
+      consumerApiVersion: Short
     ): Either[Err, Map[TopicAndPartition, OffsetMetadataAndError]] = {
     var result = Map[TopicAndPartition, OffsetMetadataAndError]()
-    val req = OffsetFetchRequest(groupId, topicAndPartitions.toSeq)
+    val req = OffsetFetchRequest(groupId, topicAndPartitions.toSeq, consumerApiVersion)
     val errs = new Err
     withBrokers(Random.shuffle(config.seedBrokers), errs) { consumer =>
       val resp = consumer.fetchOffsets(req)
@@ -266,24 +284,39 @@ class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
   def setConsumerOffsets(
       groupId: String,
       offsets: Map[TopicAndPartition, Long]
+    ): Either[Err, Map[TopicAndPartition, Short]] =
+    setConsumerOffsets(groupId, offsets, defaultConsumerApiVersion)
+
+  def setConsumerOffsets(
+      groupId: String,
+      offsets: Map[TopicAndPartition, Long],
+      consumerApiVersion: Short
     ): Either[Err, Map[TopicAndPartition, Short]] = {
-    setConsumerOffsetMetadata(groupId, offsets.map { kv =>
-      kv._1 -> OffsetMetadataAndError(kv._2)
-    })
+    val meta = offsets.map { kv =>
+      kv._1 -> OffsetAndMetadata(kv._2)
+    }
+    setConsumerOffsetMetadata(groupId, meta, consumerApiVersion)
   }
 
   /** Requires Kafka >= 0.8.1.1 */
   def setConsumerOffsetMetadata(
       groupId: String,
-      metadata: Map[TopicAndPartition, OffsetMetadataAndError]
+      metadata: Map[TopicAndPartition, OffsetAndMetadata]
+    ): Either[Err, Map[TopicAndPartition, Short]] =
+    setConsumerOffsetMetadata(groupId, metadata, defaultConsumerApiVersion)
+
+  def setConsumerOffsetMetadata(
+      groupId: String,
+      metadata: Map[TopicAndPartition, OffsetAndMetadata],
+      consumerApiVersion: Short
     ): Either[Err, Map[TopicAndPartition, Short]] = {
     var result = Map[TopicAndPartition, Short]()
-    val req = OffsetCommitRequest(groupId, metadata)
+    val req = OffsetCommitRequest(groupId, metadata, consumerApiVersion)
     val errs = new Err
     val topicAndPartitions = metadata.keySet
     withBrokers(Random.shuffle(config.seedBrokers), errs) { consumer =>
       val resp = consumer.commitOffsets(req)
-      val respMap = resp.requestInfo
+      val respMap = resp.commitStatus
       val needed = topicAndPartitions.diff(result.keySet)
       needed.foreach { tp: TopicAndPartition =>
         respMap.get(tp).foreach { err: Short =>
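
Note: the overloads above keep the original two-argument signatures intact for binary compatibility and route them through a new consumerApiVersion parameter, where version 0 is the original ZooKeeper-backed offset storage. A minimal sketch of how code in the same package exercises the new overloads (KafkaCluster is Spark-internal here; the broker address, group id, topic, offset, and the use of version 1 are assumptions for illustration):

    import kafka.common.TopicAndPartition

    val kc = new KafkaCluster(Map("metadata.broker.list" -> "localhost:9092"))
    val tp = TopicAndPartition("example-topic", 0)

    // Existing callers are untouched: two arguments, default api version 0 (ZK-backed).
    val zkBacked = kc.getConsumerOffsets("example-group", Set(tp))

    // New callers can pass an explicit offset fetch/commit api version.
    val kafkaBacked = kc.getConsumerOffsets("example-group", Set(tp), 1.toShort)

    // Commits return an Either; failures surface on the Left side.
    kc.setConsumerOffsets("example-group", Map(tp -> 42L)).fold(
      errs => throw new Exception(errs.mkString("\n")),
      _ => ()
    )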

http://git-wip-us.apache.org/repos/asf/spark/blob/47864840/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala
index 13e9475..6dc4e95 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala
@@ -29,10 +29,12 @@ import scala.language.postfixOps
 import scala.util.control.NonFatal
 
 import kafka.admin.AdminUtils
+import kafka.api.Request
+import kafka.common.TopicAndPartition
 import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
 import kafka.serializer.StringEncoder
 import kafka.server.{KafkaConfig, KafkaServer}
-import kafka.utils.ZKStringSerializer
+import kafka.utils.{ZKStringSerializer, ZkUtils}
 import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer}
 import org.I0Itec.zkclient.ZkClient
 
@@ -227,12 +229,35 @@ private class KafkaTestUtils extends Logging {
     tryAgain(1)
   }
 
-  private def waitUntilMetadataIsPropagated(topic: String, partition: Int): Unit = {
+  /** Wait until the leader offset for the given topic/partition equals the specified offset */
+  def waitUntilLeaderOffset(
+      topic: String,
+      partition: Int,
+      offset: Long): Unit = {
     eventually(Time(10000), Time(100)) {
+      val kc = new KafkaCluster(Map("metadata.broker.list" -> brokerAddress))
+      val tp = TopicAndPartition(topic, partition)
+      val llo = kc.getLatestLeaderOffsets(Set(tp)).right.get.apply(tp).offset
       assert(
-        server.apis.metadataCache.containsTopicAndPartition(topic, partition),
-        s"Partition [$topic, $partition] metadata not propagated after timeout"
-      )
+        llo == offset,
+        s"$topic $partition $offset not reached after timeout")
+    }
+  }
+
+  private def waitUntilMetadataIsPropagated(topic: String, partition: Int): Unit = {
+    def isPropagated = server.apis.metadataCache.getPartitionInfo(topic, partition) match {
+      case Some(partitionState) =>
+        val leaderAndInSyncReplicas = partitionState.leaderIsrAndControllerEpoch.leaderAndIsr
+
+        ZkUtils.getLeaderForPartition(zkClient, topic, partition).isDefined &&
+          Request.isValidBrokerId(leaderAndInSyncReplicas.leader) &&
+          leaderAndInSyncReplicas.isr.size >= 1
+
+      case _ =>
+        false
+    }
+    eventually(Time(10000), Time(100)) {
+      assert(isPropagated, s"Partition [$topic, $partition] metadata not propagated after timeout")
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/47864840/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java
index a9dc6e5..5cf3796 100644
--- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java
+++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java
@@ -72,6 +72,9 @@ public class JavaKafkaRDDSuite implements Serializable {
     HashMap<String, String> kafkaParams = new HashMap<String, String>();
     kafkaParams.put("metadata.broker.list", kafkaTestUtils.brokerAddress());
 
+    kafkaTestUtils.waitUntilLeaderOffset(topic1, 0, topic1data.length);
+    kafkaTestUtils.waitUntilLeaderOffset(topic2, 0, topic2data.length);
+
     OffsetRange[] offsetRanges = {
       OffsetRange.create(topic1, 0, 0, 1),
       OffsetRange.create(topic2, 0, 0, 1)

http://git-wip-us.apache.org/repos/asf/spark/blob/47864840/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala
index 7d26ce5..39c3fb4 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala
@@ -53,14 +53,15 @@ class KafkaRDDSuite extends FunSuite with BeforeAndAfterAll {
   }
 
   test("basic usage") {
-    val topic = "topicbasic"
+    val topic = s"topicbasic-${Random.nextInt}"
     kafkaTestUtils.createTopic(topic)
     val messages = Set("the", "quick", "brown", "fox")
     kafkaTestUtils.sendMessages(topic, messages.toArray)
 
-
     val kafkaParams = Map("metadata.broker.list" -> kafkaTestUtils.brokerAddress,
-      "group.id" -> s"test-consumer-${Random.nextInt(10000)}")
+      "group.id" -> s"test-consumer-${Random.nextInt}")
+
+    kafkaTestUtils.waitUntilLeaderOffset(topic, 0, messages.size)
 
     val offsetRanges = Array(OffsetRange(topic, 0, 0, messages.size))
 
@@ -73,27 +74,38 @@ class KafkaRDDSuite extends FunSuite with BeforeAndAfterAll {
 
   test("iterator boundary conditions") {
     // the idea is to find e.g. off-by-one errors between what kafka has available and the rdd
-    val topic = "topic1"
+    val topic = s"topicboundary-${Random.nextInt}"
     val sent = Map("a" -> 5, "b" -> 3, "c" -> 10)
     kafkaTestUtils.createTopic(topic)
 
     val kafkaParams = Map("metadata.broker.list" -> kafkaTestUtils.brokerAddress,
-      "group.id" -> s"test-consumer-${Random.nextInt(10000)}")
+      "group.id" -> s"test-consumer-${Random.nextInt}")
 
     val kc = new KafkaCluster(kafkaParams)
 
     // this is the "lots of messages" case
     kafkaTestUtils.sendMessages(topic, sent)
+    val sentCount = sent.values.sum
+    kafkaTestUtils.waitUntilLeaderOffset(topic, 0, sentCount)
+
     // rdd defined from leaders after sending messages, should get the number sent
     val rdd = getRdd(kc, Set(topic))
 
     assert(rdd.isDefined)
-    assert(rdd.get.count === sent.values.sum, "didn't get all sent messages")
 
-    val ranges = rdd.get.asInstanceOf[HasOffsetRanges]
-      .offsetRanges.map(o => TopicAndPartition(o.topic, o.partition) -> o.untilOffset).toMap
+    val ranges = rdd.get.asInstanceOf[HasOffsetRanges].offsetRanges
+    val rangeCount = ranges.map(o => o.untilOffset - o.fromOffset).sum
 
-    kc.setConsumerOffsets(kafkaParams("group.id"), ranges)
+    assert(rangeCount === sentCount, "offset range didn't include all sent messages")
+    assert(rdd.get.count === sentCount, "didn't get all sent messages")
+
+    val rangesMap = ranges.map(o => TopicAndPartition(o.topic, o.partition) -> o.untilOffset).toMap
+
+    // make sure consumer offsets are committed before the next getRdd call
+    kc.setConsumerOffsets(kafkaParams("group.id"), rangesMap).fold(
+      err => throw new Exception(err.mkString("\n")),
+      _ => ()
+    )
 
     // this is the "0 messages" case
     val rdd2 = getRdd(kc, Set(topic))
@@ -101,6 +113,8 @@ class KafkaRDDSuite extends FunSuite with BeforeAndAfterAll {
     val sentOnlyOne = Map("d" -> 1)
 
     kafkaTestUtils.sendMessages(topic, sentOnlyOne)
+    kafkaTestUtils.waitUntilLeaderOffset(topic, 0, sentCount + 1)
+
     assert(rdd2.isDefined)
     assert(rdd2.get.count === 0, "got messages when there shouldn't be any")
 

http://git-wip-us.apache.org/repos/asf/spark/blob/47864840/python/pyspark/streaming/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index 7c06c20..33ea8c9 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -606,7 +606,6 @@ class KafkaStreamTests(PySparkStreamingTestCase):
         result = {}
         for i in rdd.map(lambda x: x[1]).collect():
             result[i] = result.get(i, 0) + 1
-
         self.assertEqual(sendData, result)
 
     def test_kafka_stream(self):
@@ -616,6 +615,7 @@ class KafkaStreamTests(PySparkStreamingTestCase):
 
         self._kafkaTestUtils.createTopic(topic)
         self._kafkaTestUtils.sendMessages(topic, sendData)
+        self._kafkaTestUtils.waitUntilLeaderOffset(topic, 0, sum(sendData.values()))
 
         stream = KafkaUtils.createStream(self.ssc, self._kafkaTestUtils.zkAddress(),
                                          "test-streaming-consumer", {topic: 1},
@@ -631,6 +631,7 @@ class KafkaStreamTests(PySparkStreamingTestCase):
 
         self._kafkaTestUtils.createTopic(topic)
         self._kafkaTestUtils.sendMessages(topic, sendData)
+        self._kafkaTestUtils.waitUntilLeaderOffset(topic, 0, sum(sendData.values()))
 
         stream = KafkaUtils.createDirectStream(self.ssc, [topic], kafkaParams)
         self._validateStreamResult(sendData, stream)
@@ -645,6 +646,7 @@ class KafkaStreamTests(PySparkStreamingTestCase):
 
         self._kafkaTestUtils.createTopic(topic)
         self._kafkaTestUtils.sendMessages(topic, sendData)
+        self._kafkaTestUtils.waitUntilLeaderOffset(topic, 0, sum(sendData.values()))
 
         stream = KafkaUtils.createDirectStream(self.ssc, [topic], kafkaParams, fromOffsets)
         self._validateStreamResult(sendData, stream)
@@ -659,7 +661,7 @@ class KafkaStreamTests(PySparkStreamingTestCase):
 
         self._kafkaTestUtils.createTopic(topic)
         self._kafkaTestUtils.sendMessages(topic, sendData)
-
+        self._kafkaTestUtils.waitUntilLeaderOffset(topic, 0, sum(sendData.values()))
         rdd = KafkaUtils.createRDD(self.sc, kafkaParams, offsetRanges)
         self._validateRddResult(sendData, rdd)
 
@@ -675,7 +677,7 @@ class KafkaStreamTests(PySparkStreamingTestCase):
 
         self._kafkaTestUtils.createTopic(topic)
         self._kafkaTestUtils.sendMessages(topic, sendData)
-
+        self._kafkaTestUtils.waitUntilLeaderOffset(topic, 0, sum(sendData.values()))
         rdd = KafkaUtils.createRDD(self.sc, kafkaParams, offsetRanges, leaders)
         self._validateRddResult(sendData, rdd)
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org