Posted to reviews@spark.apache.org by jerryshao <gi...@git.apache.org> on 2014/10/29 06:16:38 UTC

[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

GitHub user jerryshao opened a pull request:

    https://github.com/apache/spark/pull/2991

    [SPARK-4062][Streaming]Add ReliableKafkaReceiver in Spark Streaming Kafka connector

    Add ReliableKafkaReceiver to the Kafka connector to prevent data loss when the write-ahead log (WAL) is enabled in Spark Streaming. Details and the design doc are available in [SPARK-4062](https://issues.apache.org/jira/browse/SPARK-4062).

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/jerryshao/apache-spark kafka-refactor

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/2991.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2991
    
----
commit c173cc1b018de65225509ddf9de6c12468450896
Author: jerryshao <sa...@intel.com>
Date:   2014-10-23T08:19:35Z

    Initial commit for reliable Kafka receiver

commit 46bb2db679478cf878ad4d1dedde3767cd69211e
Author: jerryshao <sa...@intel.com>
Date:   2014-10-27T08:55:13Z

    Code refactor and add some unit tests

commit a18c3e0286f99ea3803c5e743b78beff43d34cc0
Author: jerryshao <sa...@intel.com>
Date:   2014-10-28T02:56:18Z

    Add some comments

commit cd64678d4868b2bb53d375e27918f5f81aa651a9
Author: jerryshao <sa...@intel.com>
Date:   2014-10-28T05:38:23Z

    Change the ordering of imports

commit 5cc4cb198662cb35008d9a2e46e320f75ce35a71
Author: jerryshao <sa...@intel.com>
Date:   2014-10-29T05:08:50Z

    Address the comments

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2991#discussion_r19992995
  
    --- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala ---
    @@ -0,0 +1,212 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming.kafka
    +
    +import java.util.Properties
    +import java.util.concurrent.{ConcurrentHashMap, Executors}
    +
    +import scala.collection.Map
    +import scala.collection.mutable
    +import scala.reflect.{classTag, ClassTag}
    +
    +import kafka.common.TopicAndPartition
    +import kafka.consumer.{Consumer, ConsumerConfig, ConsumerConnector}
    +import kafka.serializer.Decoder
    +import kafka.utils.{ZkUtils, ZKGroupTopicDirs, ZKStringSerializer, VerifiableProperties}
    +import org.I0Itec.zkclient.ZkClient
    +
    +import org.apache.spark.{SparkEnv, Logging}
    +import org.apache.spark.storage.{StreamBlockId, StorageLevel}
    +import org.apache.spark.streaming.receiver.{BlockGeneratorListener, BlockGenerator, Receiver}
    +
    +private[streaming]
    +class ReliableKafkaReceiver[
    +  K: ClassTag,
    +  V: ClassTag,
    +  U <: Decoder[_]: ClassTag,
    +  T <: Decoder[_]: ClassTag](
    +    kafkaParams: Map[String, String],
    +    topics: Map[String, Int],
    +    storageLevel: StorageLevel)
    +    extends Receiver[Any](storageLevel) with Logging {
    +
    +  /** High level consumer to connect to Kafka */
    +  private var consumerConnector: ConsumerConnector = null
    +
    +  /** zkClient to connect to Zookeeper to commit the offsets */
    +  private var zkClient: ZkClient = null
    +
    +  private val groupId = kafkaParams("group.id")
    +
    +  private lazy val env = SparkEnv.get
    +
    +  private val AUTO_OFFSET_COMMIT = "auto.commit.enable"
    +
    +  /** A HashMap to manage the offset for each topic/partition, this HashMap is called in
    +    * synchronized block, so mutable HashMap will not meet concurrency issue */
    +  private lazy val topicPartitionOffsetMap = new mutable.HashMap[TopicAndPartition, Long]
    +
    +  /** A concurrent HashMap to store the stream block id and related offset snapshot */
    +  private lazy val blockOffsetMap =
    +    new ConcurrentHashMap[StreamBlockId, Map[TopicAndPartition, Long]]
    +
    +  private lazy val blockGeneratorListener = new BlockGeneratorListener {
    +    override def onStoreData(data: Any, metadata: Any): Unit = {
    +      if (metadata != null) {
    +        val kafkaMetadata = metadata.asInstanceOf[(TopicAndPartition, Long)]
    +        topicPartitionOffsetMap.put(kafkaMetadata._1, kafkaMetadata._2)
    +      }
    +    }
    +
    +    override def onGenerateBlock(blockId: StreamBlockId): Unit = {
    +      // Get a snapshot of current offset map and store with related block id. Since this hook
    +      // function is called in synchronized block, so we can get the snapshot without explicit lock.
    +      val offsetSnapshot = topicPartitionOffsetMap.toMap
    +      blockOffsetMap.put(blockId, offsetSnapshot)
    +      topicPartitionOffsetMap.clear()
    +    }
    +
    +    override def onPushBlock(blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
    +      store(arrayBuffer.asInstanceOf[mutable.ArrayBuffer[Any]])
    +
    +      // Commit and remove the related offsets.
    +      Option(blockOffsetMap.get(blockId)).foreach { offsetMap =>
    +        commitOffset(offsetMap)
    +      }
    +      blockOffsetMap.remove(blockId)
    +    }
    +
    +    override def onError(message: String, throwable: Throwable): Unit = {
    +      reportError(message, throwable)
    +    }
    +  }
    +
    +  /** Manage the BlockGenerator in receiver itself for better managing block store and offset
    --- End diff --
    
    Please fix the comment style. Multi-line comments should be formatted as:
    ```
    /**
     *
     */
    ```
    
    https://cwiki.apache.org/confluence/display/SPARK/Spark+Code+Style+Guide#SparkCodeStyleGuide-Codedocumentationstyle
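
As an illustration of the requested style, the two-line comment on `topicPartitionOffsetMap` in the diff above could be laid out as shown below. This is only a sketch: the enclosing `OffsetState` object is a hypothetical wrapper, and `String` keys stand in for Kafka's `TopicAndPartition` so the snippet compiles without the Kafka dependency.

```scala
import scala.collection.mutable

// Hypothetical wrapper, present only to make the snippet self-contained.
object OffsetState {
  /**
   * A HashMap to manage the offset for each topic/partition. It is only accessed
   * inside a synchronized block, so a plain mutable HashMap is sufficient.
   */
  val topicPartitionOffsetMap = new mutable.HashMap[String, Long]
}
```

The opening `/**` sits on its own line, each following line starts with an aligned ` * `, and the closing ` */` is also on its own line.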




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2991#issuecomment-63026569
  
      [Test build #23362 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/23362/consoleFull) for PR 2991 at commit [`5461f1c`](https://github.com/apache/spark/commit/5461f1c43b0e98aa7b583f14569eefd833b19df0).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2991#issuecomment-62682114
  
      [Test build #23251 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/23251/consoleFull) for PR 2991 at commit [`b798535`](https://github.com/apache/spark/commit/b798535f1e6c8c981bfb700fe6083e881072d210).
     * This patch **fails MiMa tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2991#discussion_r20130193
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala ---
    @@ -80,9 +89,10 @@ private[streaming] class BlockGenerator(
        * Push a single data item into the buffer. All received data items
        * will be periodically pushed into BlockManager.
        */
    -  def += (data: Any): Unit = synchronized {
    +  def += (data: Any, metadata: Any = null): Unit = synchronized {
         waitToPush()
         currentBuffer += data
    +    listener.onStoreData(data, metadata)
    --- End diff --
    
    Actually, that is more obvious and introduces minimal extra logic in the BlockGenerator, keeping it more generic. It is tricky to document things like "method 1 and method 2 of BlockGenerator are called under the same lock, but the other methods are not".
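
For context, the hunk above invokes a listener hook inside the synchronized `+=` method, so the append and the callback share one lock. A minimal sketch of that pattern follows, assuming simplified stand-ins: `StoreListener` and `SimpleBuffer` are hypothetical names rather than Spark classes, and the `waitToPush()` call is omitted.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical, simplified listener; the real BlockGeneratorListener has more hooks.
trait StoreListener {
  def onStoreData(data: Any, metadata: Any): Unit
}

// Hypothetical, simplified buffer standing in for BlockGenerator.
class SimpleBuffer(listener: StoreListener) {
  private val currentBuffer = new ArrayBuffer[Any]

  // The append and the listener callback both run under this instance's monitor,
  // so the listener always observes data in the order it was buffered.
  def +=(data: Any, metadata: Any = null): Unit = synchronized {
    currentBuffer += data
    listener.onStoreData(data, metadata)
  }

  def size: Int = synchronized { currentBuffer.size }
}
```

The trade-off raised in the comment is visible here: the locking guarantee covers only `+=` and `size`, which is exactly the kind of per-method contract that is hard to document.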




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the pull request:

    https://github.com/apache/spark/pull/2991#issuecomment-62452662
  
    @jerryshao This is a very good observation. Since this is going to be an experimental update, it is okay to do it this way for now. If there are issues with large Kafka installations, we can try to optimize this further later. Maybe Kafka 0.9 will help.
    
    Regarding performance, do you have a sense of how large the drop in performance is going to be?




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2991#issuecomment-62679992
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/23250/
    Test FAILed.




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2991#discussion_r20129302
  
    --- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaReceiver.scala ---
    @@ -0,0 +1,135 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming.kafka
    +
    +import java.util.Properties
    +import java.util.concurrent.Executors
    +
    +import scala.collection.Map
    +import scala.reflect.{classTag, ClassTag}
    +
    +import kafka.consumer._
    +import kafka.serializer.Decoder
    +import kafka.utils.VerifiableProperties
    +import kafka.utils.ZKStringSerializer
    +import org.I0Itec.zkclient._
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.storage.StorageLevel
    +import org.apache.spark.streaming.receiver.Receiver
    +
    +private[streaming]
    +class KafkaReceiver[
    +  K: ClassTag,
    +  V: ClassTag,
    +  U <: Decoder[_]: ClassTag,
    +  T <: Decoder[_]: ClassTag](
    +    kafkaParams: Map[String, String],
    +    topics: Map[String, Int],
    +    storageLevel: StorageLevel
    +  ) extends Receiver[Any](storageLevel) with Logging {
    +
    +  // Connection to Kafka
    +  var consumerConnector: ConsumerConnector = null
    +
    +  def onStop() {
    +    if (consumerConnector != null) {
    +      consumerConnector.shutdown()
    +    }
    +  }
    +
    +  def onStart() {
    +
    +    logInfo("Starting Kafka Consumer Stream with group: " + kafkaParams("group.id"))
    +
    +    // Kafka connection properties
    +    val props = new Properties()
    +    kafkaParams.foreach(param => props.put(param._1, param._2))
    +
    +    val zkConnect = kafkaParams("zookeeper.connect")
    +    // Create the connection to the cluster
    +    logInfo("Connecting to Zookeeper: " + zkConnect)
    +    val consumerConfig = new ConsumerConfig(props)
    +    consumerConnector = Consumer.create(consumerConfig)
    +    logInfo("Connected to " + zkConnect)
    +
    +    // When auto.offset.reset is defined, it is our responsibility to try and whack the
    +    // consumer group zk node.
    +    if (kafkaParams.contains("auto.offset.reset")) {
    +      tryZookeeperConsumerGroupCleanup(zkConnect, kafkaParams("group.id"))
    +    }
    +
    +    val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
    +      .newInstance(consumerConfig.props)
    +      .asInstanceOf[Decoder[K]]
    +    val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
    +      .newInstance(consumerConfig.props)
    +      .asInstanceOf[Decoder[V]]
    +
    +    // Create Threads for each Topic/Message Stream we are listening
    +    val topicMessageStreams = consumerConnector.createMessageStreams(
    +      topics, keyDecoder, valueDecoder)
    +
    +    val executorPool = Executors.newFixedThreadPool(topics.values.sum)
    +    try {
    +      // Start the messages handler for each partition
    +      topicMessageStreams.values.foreach { streams =>
    +        streams.foreach { stream => executorPool.submit(new MessageHandler(stream)) }
    +      }
    +    } finally {
    +      executorPool.shutdown() // Just causes threads to terminate after work is done
    +    }
    +  }
    +
    +  // Handles Kafka Messages
    +  private class MessageHandler[K: ClassTag, V: ClassTag](stream: KafkaStream[K, V])
    +    extends Runnable {
    +    def run() {
    +      logInfo("Starting MessageHandler.")
    +      try {
    +        for (msgAndMetadata <- stream) {
    +          store((msgAndMetadata.key, msgAndMetadata.message))
    +        }
    +      } catch {
    +        case e: Throwable => logError("Error handling message; exiting", e)
    +      }
    +    }
    +  }
    +
    +  // It is our responsibility to delete the consumer group when specifying auto.offset.reset. This
    +  // is because Kafka 0.7.2 only honors this param when the group is not in zookeeper.
    +  //
    +  // The kafka high level consumer doesn't expose setting offsets currently, this is a trick copied
    +  // from Kafka's ConsoleConsumer. See code related to 'auto.offset.reset' when it is set to
    +  // 'smallest'/'largest':
    +  // scalastyle:off
    +  // https://github.com/apache/kafka/blob/0.7.2/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
    +  // scalastyle:on
    +  private def tryZookeeperConsumerGroupCleanup(zkUrl: String, groupId: String) {
    +    val dir = "/consumers/" + groupId
    +    logInfo("Cleaning up temporary Zookeeper data under " + dir + ".")
    --- End diff --
    
    Please use Scala string interpolation.
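
A sketch of what the two flagged lines might look like with the `s"..."` interpolator. The `InterpolationExample` object and `cleanupMessage` helper are hypothetical, introduced only so the snippet runs without Spark's `logInfo`; in the receiver the interpolated string would be passed to `logInfo` directly.

```scala
object InterpolationExample {
  // Hypothetical helper: builds the same log message as the diff, using interpolation
  // instead of string concatenation.
  def cleanupMessage(groupId: String): String = {
    val dir = s"/consumers/$groupId"
    s"Cleaning up temporary Zookeeper data under $dir."
  }
}
```

For example, `InterpolationExample.cleanupMessage("g1")` yields `Cleaning up temporary Zookeeper data under /consumers/g1.`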




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on the pull request:

    https://github.com/apache/spark/pull/2991#issuecomment-62674043
  
    Hi TD, I just updated the code as you suggested. Would you mind taking a look at it?




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2991#discussion_r20210320
  
    --- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala ---
    @@ -0,0 +1,251 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming.kafka
    +
    +import java.util.Properties
    +import java.util.concurrent.ConcurrentHashMap
    +
    +import scala.collection.Map
    +import scala.collection.mutable
    +import scala.reflect.{classTag, ClassTag}
    +
    +import kafka.common.TopicAndPartition
    +import kafka.consumer.{Consumer, ConsumerConfig, ConsumerConnector, KafkaStream}
    +import kafka.serializer.Decoder
    +import kafka.utils.{ZkUtils, ZKGroupTopicDirs, ZKStringSerializer, VerifiableProperties}
    +import org.I0Itec.zkclient.ZkClient
    +
    +import org.apache.spark.{SparkEnv, Logging}
    +import org.apache.spark.storage.{StreamBlockId, StorageLevel}
    +import org.apache.spark.streaming.receiver.{BlockGeneratorListener, BlockGenerator, Receiver}
    +import org.apache.spark.util.Utils
    +
    +
    +/**
    + * ReliableKafkaReceiver offers the ability to reliably store data into BlockManager without loss.
    + * It is turned off by default and will be enabled when
    + * spark.streaming.receiver.writeAheadLog.enable is true. The difference compared to KafkaReceiver
    + * is that this receiver manages topic-partition/offset itself and updates the offset information
    + * after data is reliably stored as write-ahead log. Offsets will only be updated when data is
    + * reliably stored, so the potential data loss problem of KafkaReceiver can be eliminated.
    + *
    + * Note: ReliableKafkaReceiver will set auto.commit.enable to false to turn off automatic offset
    + * commit mechanism in Kafka consumer. So setting this configuration manually within kafkaParams
    + * will not take effect.
    + */
    +private[streaming]
    +class ReliableKafkaReceiver[
    +  K: ClassTag,
    +  V: ClassTag,
    +  U <: Decoder[_]: ClassTag,
    +  T <: Decoder[_]: ClassTag](
    +    kafkaParams: Map[String, String],
    +    topics: Map[String, Int],
    +    storageLevel: StorageLevel)
    +    extends Receiver[(K, V)](storageLevel) with Logging {
    +
    +  private val groupId = kafkaParams("group.id")
    +
    +  private val AUTO_OFFSET_COMMIT = "auto.commit.enable"
    +
    +  private def conf() = SparkEnv.get.conf
    +
    +  /** High level consumer to connect to Kafka. */
    +  private var consumerConnector: ConsumerConnector = null
    +
    +  /** zkClient to connect to Zookeeper to commit the offsets. */
    +  private var zkClient: ZkClient = null
    +
    +  /**
    +   * A HashMap to manage the offset for each topic/partition, this HashMap is called in
    +   * synchronized block, so mutable HashMap will not meet concurrency issue.
    +   */
    +  private var topicPartitionOffsetMap: mutable.HashMap[TopicAndPartition, Long] = null
    +
    +  /** A concurrent HashMap to store the stream block id and related offset snapshot. */
    +  private var blockOffsetMap: ConcurrentHashMap[StreamBlockId, Map[TopicAndPartition, Long]] = null
    +
    +  /**
    +   * Manage the BlockGenerator in receiver itself for better managing block store and offset
    +   * commit.
    +   */
    +  private var blockGenerator: BlockGenerator = null
    +
    +  /** Kafka offsets checkpoint listener to register into BlockGenerator for offsets checkpoint. */
    +  private final class OffsetCheckpointListener extends BlockGeneratorListener {
    +
    +    override def onGenerateBlock(blockId: StreamBlockId): Unit = {
    +      // Get a snapshot of current offset map and store with related block id. Since this hook
    +      // function is called in synchronized block, so we can get the snapshot without explicit lock.
    +      val offsetSnapshot = topicPartitionOffsetMap.toMap
    --- End diff --
    
    There is no synchronization on the blockGenerator here. Since the insertion is synchronized on the block generator, taking the snapshot of the map also has to be synchronized on the same object.
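
One way to read this request is that the write path and the snapshot path must take the same lock. The following is a minimal sketch under stated assumptions, not the code that was eventually merged: `OffsetTracker` is a hypothetical class, a private `lock` object stands in for the block generator instance, and `String` keys replace `TopicAndPartition`.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the receiver's offset bookkeeping.
class OffsetTracker {
  // Stand-in for the object both code paths synchronize on (the block generator in the PR).
  private val lock = new Object
  private val offsets = new mutable.HashMap[String, Long]

  // Write path: record the latest offset seen for a partition.
  def record(partition: String, offset: Long): Unit = lock.synchronized {
    offsets.put(partition, offset)
  }

  // Snapshot path: copy and clear under the same lock, so no update can slip in
  // between taking the snapshot and clearing the map.
  def snapshotAndClear(): Map[String, Long] = lock.synchronized {
    val snapshot = offsets.toMap
    offsets.clear()
    snapshot
  }
}
```

Because both methods hold `lock`, a concurrent `record` call lands either entirely before the snapshot or entirely after the clear, never in between.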




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2991#issuecomment-62675074
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/23245/
    Test FAILed.




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2991#issuecomment-60881849
  
      [Test build #22438 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22438/consoleFull) for PR 2991 at commit [`09d57c5`](https://github.com/apache/spark/commit/09d57c5270cf876e52d49db702e2330c2b6a6e10).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2991#discussion_r20129842
  
    --- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala ---
    @@ -0,0 +1,241 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming.kafka
    +
    +import java.util.Properties
    +import java.util.concurrent.ConcurrentHashMap
    +
    +import scala.collection.Map
    +import scala.collection.mutable
    +import scala.reflect.{classTag, ClassTag}
    +
    +import kafka.common.TopicAndPartition
    +import kafka.consumer.{Consumer, ConsumerConfig, ConsumerConnector, KafkaStream}
    +import kafka.serializer.Decoder
    +import kafka.utils.{ZkUtils, ZKGroupTopicDirs, ZKStringSerializer, VerifiableProperties}
    +import org.I0Itec.zkclient.ZkClient
    +
    +import org.apache.spark.{SparkEnv, Logging}
    +import org.apache.spark.storage.{StreamBlockId, StorageLevel}
    +import org.apache.spark.streaming.receiver.{BlockGeneratorListener, BlockGenerator, Receiver}
    +import org.apache.spark.util.Utils
    +
    +private[streaming]
    +class ReliableKafkaReceiver[
    +  K: ClassTag,
    +  V: ClassTag,
    +  U <: Decoder[_]: ClassTag,
    +  T <: Decoder[_]: ClassTag](
    +    kafkaParams: Map[String, String],
    +    topics: Map[String, Int],
    +    storageLevel: StorageLevel)
    +    extends Receiver[Any](storageLevel) with Logging {
    +
    +  /** High level consumer to connect to Kafka. */
    +  private var consumerConnector: ConsumerConnector = null
    +
    +  /** zkClient to connect to Zookeeper to commit the offsets. */
    +  private var zkClient: ZkClient = null
    +
    +  private val groupId = kafkaParams("group.id")
    --- End diff --
    
    OK, I will.




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2991#issuecomment-62678538
  
      [Test build #23253 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/23253/consoleFull) for   PR 2991 at commit [`e501b3c`](https://github.com/apache/spark/commit/e501b3ce04d43f514676c953799ba91963415227).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2991#issuecomment-62834307
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/23297/
    Test FAILed.




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the pull request:

    https://github.com/apache/spark/pull/2991#issuecomment-62530387
  
    I have merged #1420, but GitHub has not synced yet. If it does not update soon, you can use the master branch of the real Apache git repo to get the latest changes.
    
    https://git-wip-us.apache.org/repos/asf?p=spark.git;a=summary 
    





[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on the pull request:

    https://github.com/apache/spark/pull/2991#issuecomment-63153874
  
    Yes, I will, thanks a lot, greatly appreciate your help.




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2991#discussion_r20247390
  
    --- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala ---
    @@ -0,0 +1,251 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming.kafka
    +
    +import java.util.Properties
    +import java.util.concurrent.ConcurrentHashMap
    +
    +import scala.collection.Map
    +import scala.collection.mutable
    +import scala.reflect.{classTag, ClassTag}
    +
    +import kafka.common.TopicAndPartition
    +import kafka.consumer.{Consumer, ConsumerConfig, ConsumerConnector, KafkaStream}
    +import kafka.serializer.Decoder
    +import kafka.utils.{ZkUtils, ZKGroupTopicDirs, ZKStringSerializer, VerifiableProperties}
    +import org.I0Itec.zkclient.ZkClient
    +
    +import org.apache.spark.{SparkEnv, Logging}
    +import org.apache.spark.storage.{StreamBlockId, StorageLevel}
    +import org.apache.spark.streaming.receiver.{BlockGeneratorListener, BlockGenerator, Receiver}
    +import org.apache.spark.util.Utils
    +
    +
    +/**
    + * ReliableKafkaReceiver offers the ability to reliably store data into BlockManager without loss.
    + * It is turned off by default and will be enabled when
    + * spark.streaming.receiver.writeAheadLog.enable is true. The difference compared to KafkaReceiver
    + * is that this receiver manages topic-partition/offset itself and updates the offset information
    + * after data is reliably stored as write-ahead log. Offsets will only be updated when data is
    + * reliably stored, so the potential data loss problem of KafkaReceiver can be eliminated.
    + *
    + * Note: ReliableKafkaReceiver will set auto.commit.enable to false to turn off automatic offset
    + * commit mechanism in Kafka consumer. So setting this configuration manually within kafkaParams
    + * will not take effect.
    + */
    +private[streaming]
    +class ReliableKafkaReceiver[
    +  K: ClassTag,
    +  V: ClassTag,
    +  U <: Decoder[_]: ClassTag,
    +  T <: Decoder[_]: ClassTag](
    +    kafkaParams: Map[String, String],
    +    topics: Map[String, Int],
    +    storageLevel: StorageLevel)
    +    extends Receiver[(K, V)](storageLevel) with Logging {
    +
    +  private val groupId = kafkaParams("group.id")
    +
    +  private val AUTO_OFFSET_COMMIT = "auto.commit.enable"
    +
    +  private def conf() = SparkEnv.get.conf
    +
    +  /** High level consumer to connect to Kafka. */
    +  private var consumerConnector: ConsumerConnector = null
    +
    +  /** zkClient to connect to Zookeeper to commit the offsets. */
    +  private var zkClient: ZkClient = null
    +
    +  /**
    +   * A HashMap to manage the offset for each topic/partition, this HashMap is called in
    +   * synchronized block, so mutable HashMap will not meet concurrency issue.
    +   */
    +  private var topicPartitionOffsetMap: mutable.HashMap[TopicAndPartition, Long] = null
    +
    +  /** A concurrent HashMap to store the stream block id and related offset snapshot. */
    +  private var blockOffsetMap: ConcurrentHashMap[StreamBlockId, Map[TopicAndPartition, Long]] = null
    +
    +  /**
    +   * Manage the BlockGenerator in receiver itself for better managing block store and offset
    +   * commit.
    +   */
    +  private var blockGenerator: BlockGenerator = null
    +
    +  /** Kafka offsets checkpoint listener to register into BlockGenerator for offsets checkpoint. */
    +  private final class OffsetCheckpointListener extends BlockGeneratorListener {
    +
    +    override def onGenerateBlock(blockId: StreamBlockId): Unit = {
    +      // Get a snapshot of current offset map and store with related block id. Since this hook
    +      // function is called in synchronized block, so we can get the snapshot without explicit lock.
    +      val offsetSnapshot = topicPartitionOffsetMap.toMap
    --- End diff --
    
    Let me try to modify the code to give you a sense of what I think is a better design. Only then will I know whether what I am thinking makes sense.




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2991#discussion_r20272077
  
    --- Diff: external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala ---
    @@ -0,0 +1,160 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming.kafka
    +
    +import java.io.File
    +
    +import scala.collection.mutable
    +import scala.concurrent.duration._
    +import scala.language.postfixOps
    +import scala.util.Random
    +
    +import kafka.serializer.StringDecoder
    +import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
    +import org.scalatest.concurrent.Eventually
    +
    +import org.apache.spark.storage.StorageLevel
    +import org.apache.spark.streaming.StreamingContext
    +import org.apache.spark.util.Utils
    +
    +class ReliableKafkaStreamSuite extends KafkaStreamSuiteBase with Eventually {
    +  val topic = "topic"
    +  val data = Map("a" -> 10, "b" -> 10, "c" -> 10)
    +  var groupId: String = _
    +  var kafkaParams: Map[String, String] = _
    +
    +  before {
    +    beforeFunction()  // call this first to start ZK and Kafka
    +    groupId = s"test-consumer-${Random.nextInt(10000)}"
    +    kafkaParams = Map(
    +      "zookeeper.connect" -> zkAddress,
    +      "group.id" -> groupId,
    +      "auto.offset.reset" -> "smallest"
    +    )
    +  }
    +
    +  after {
    +    afterFunction()
    +  }
    +
    +  test("Reliable Kafka input stream") {
    +    sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
    +    ssc = new StreamingContext(sparkConf, batchDuration)
    +    val checkpointDir = s"${System.getProperty("java.io.tmpdir", "/tmp")}/" +
    +      s"test-checkpoint${Random.nextInt(10000)}"
    +    Utils.registerShutdownDeleteDir(new File(checkpointDir))
    +    ssc.checkpoint(checkpointDir)
    +    createTopic(topic)
    +    produceAndSendMessage(topic, data)
    +
    +    val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
    +      ssc,
    +      kafkaParams,
    +      Map(topic -> 1),
    +      StorageLevel.MEMORY_ONLY)
    +    val result = new mutable.HashMap[String, Long]()
    +    stream.map { case (k, v) => v }.foreachRDD { r =>
    +        val ret = r.collect()
    +        ret.foreach { v =>
    +          val count = result.getOrElseUpdate(v, 0) + 1
    +          result.put(v, count)
    +        }
    +      }
    +    ssc.start()
    +    eventually(timeout(10000 milliseconds), interval(100 milliseconds)) {
    +      // A basic process verification for ReliableKafkaReceiver.
    +      // Verify whether received message number is equal to the sent message number.
    +      assert(data.size === result.size)
    +      // Verify whether each message is the same as the data to be verified.
    +      data.keys.foreach { k => assert(data(k) === result(k).toInt) }
    +    }
    +    ssc.stop()
    +  }
    +/*
    --- End diff --
    
    I will do it.




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2991#issuecomment-63118606
  
      [Test build #23380 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/23380/consoleFull) for   PR 2991 at commit [`5461f1c`](https://github.com/apache/spark/commit/5461f1c43b0e98aa7b583f14569eefd833b19df0).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the pull request:

    https://github.com/apache/spark/pull/2991#issuecomment-63139795
  
    Alright, I am merging this!




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2991#discussion_r19993764
  
    --- Diff: external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala ---
    @@ -0,0 +1,162 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming.kafka
    +
    +import java.io.File
    +
    +import scala.collection.mutable
    +
    +import kafka.serializer.StringDecoder
    +import kafka.utils.{ZkUtils, ZKGroupTopicDirs}
    +
    +import org.apache.spark.SparkConf
    +import org.apache.spark.storage.StorageLevel
    +import org.apache.spark.streaming.StreamingContext
    +import org.apache.spark.util.Utils
    +
    +class ReliableKafkaStreamSuite extends KafkaStreamSuite {
    --- End diff --
    
    Throughout the test suites, in each unit test, could you add comments describing what each step is doing? It can be at the granularity of `// Do this and then verify that`.
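
For illustration only, here is a small self-contained sketch of the comment granularity requested above. It is not code from this pull request: `StepCommentExample` and `countValues` are hypothetical names, and the counting helper merely stands in for the stream under test so the example runs without Kafka or Spark.

```scala
import scala.collection.mutable

object StepCommentExample {
  // Hypothetical helper standing in for the stream under test:
  // counts how many times each value was received.
  def countValues(received: Seq[String]): Map[String, Long] = {
    val result = mutable.HashMap[String, Long]()
    received.foreach { v => result(v) = result.getOrElse(v, 0L) + 1L }
    result.toMap
  }

  def main(args: Array[String]): Unit = {
    // Build the input: each of "a", "b" and "c" is sent 10 times.
    val data = Map("a" -> 10, "b" -> 10, "c" -> 10)
    val sent = data.toSeq.flatMap { case (k, n) => Seq.fill(n)(k) }

    // Run the code under test, then verify that no key was lost or invented.
    val result = countValues(sent)
    assert(result.size == data.size)

    // Verify that every key was counted exactly as many times as it was sent.
    data.foreach { case (k, n) => assert(result(k) == n.toLong) }
  }
}
```

Each comment names the action and the property checked right after it, so a reader can follow the test without tracing the helper functions.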




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on the pull request:

    https://github.com/apache/spark/pull/2991#issuecomment-61196953
  
    OK, will do :).




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2991#issuecomment-60875582
  
      [Test build #22425 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22425/consoleFull) for   PR 2991 at commit [`5cc4cb1`](https://github.com/apache/spark/commit/5cc4cb198662cb35008d9a2e46e320f75ce35a71).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on the pull request:

    https://github.com/apache/spark/pull/2991#issuecomment-61930141
  
    Hi @tdas , would you mind reviewing this code? Thanks a lot.




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on the pull request:

    https://github.com/apache/spark/pull/2991#issuecomment-63025874
  
    Jenkins, retest this please.




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2991#issuecomment-62876618
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/23305/
    Test FAILed.




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2991#issuecomment-62352709
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/23142/
    Test PASSed.




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on the pull request:

    https://github.com/apache/spark/pull/2991#issuecomment-62993250
  
    OK, I will, thanks a lot, greatly appreciated.




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2991#discussion_r20247329
  
    --- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala ---
    @@ -0,0 +1,251 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming.kafka
    +
    +import java.util.Properties
    +import java.util.concurrent.ConcurrentHashMap
    +
    +import scala.collection.Map
    +import scala.collection.mutable
    +import scala.reflect.{classTag, ClassTag}
    +
    +import kafka.common.TopicAndPartition
    +import kafka.consumer.{Consumer, ConsumerConfig, ConsumerConnector, KafkaStream}
    +import kafka.serializer.Decoder
    +import kafka.utils.{ZkUtils, ZKGroupTopicDirs, ZKStringSerializer, VerifiableProperties}
    +import org.I0Itec.zkclient.ZkClient
    +
    +import org.apache.spark.{SparkEnv, Logging}
    +import org.apache.spark.storage.{StreamBlockId, StorageLevel}
    +import org.apache.spark.streaming.receiver.{BlockGeneratorListener, BlockGenerator, Receiver}
    +import org.apache.spark.util.Utils
    +
    +
    +/**
    + * ReliableKafkaReceiver offers the ability to reliably store data into BlockManager without loss.
    + * It is turned off by default and will be enabled when
    + * spark.streaming.receiver.writeAheadLog.enable is true. The difference compared to KafkaReceiver
    + * is that this receiver manages topic-partition/offset itself and updates the offset information
    + * after data is reliably stored as write-ahead log. Offsets will only be updated when data is
    + * reliably stored, so the potential data loss problem of KafkaReceiver can be eliminated.
    + *
    + * Note: ReliableKafkaReceiver will set auto.commit.enable to false to turn off automatic offset
    + * commit mechanism in Kafka consumer. So setting this configuration manually within kafkaParams
    + * will not take effect.
    + */
    +private[streaming]
    +class ReliableKafkaReceiver[
    +  K: ClassTag,
    +  V: ClassTag,
    +  U <: Decoder[_]: ClassTag,
    +  T <: Decoder[_]: ClassTag](
    +    kafkaParams: Map[String, String],
    +    topics: Map[String, Int],
    +    storageLevel: StorageLevel)
    +    extends Receiver[(K, V)](storageLevel) with Logging {
    +
    +  private val groupId = kafkaParams("group.id")
    +
    +  private val AUTO_OFFSET_COMMIT = "auto.commit.enable"
    +
    +  private def conf() = SparkEnv.get.conf
    +
    +  /** High level consumer to connect to Kafka. */
    +  private var consumerConnector: ConsumerConnector = null
    +
    +  /** zkClient to connect to Zookeeper to commit the offsets. */
    +  private var zkClient: ZkClient = null
    +
    +  /**
    +   * A HashMap to manage the offset for each topic/partition, this HashMap is called in
    +   * synchronized block, so mutable HashMap will not meet concurrency issue.
    +   */
    +  private var topicPartitionOffsetMap: mutable.HashMap[TopicAndPartition, Long] = null
    +
    +  /** A concurrent HashMap to store the stream block id and related offset snapshot. */
    +  private var blockOffsetMap: ConcurrentHashMap[StreamBlockId, Map[TopicAndPartition, Long]] = null
    +
    +  /**
    +   * Manage the BlockGenerator in receiver itself for better managing block store and offset
    +   * commit.
    +   */
    +  private var blockGenerator: BlockGenerator = null
    +
    +  /** Kafka offsets checkpoint listener to register into BlockGenerator for offsets checkpoint. */
    +  private final class OffsetCheckpointListener extends BlockGeneratorListener {
    +
    +    override def onGenerateBlock(blockId: StreamBlockId): Unit = {
    +      // Get a snapshot of current offset map and store with related block id. Since this hook
    +      // function is called in synchronized block, so we can get the snapshot without explicit lock.
    +      val offsetSnapshot = topicPartitionOffsetMap.toMap
    --- End diff --
    
    Aaaah, I get it, but this is soooo non-intuitive. That synchronized block is not in this file, so the logic is hard to follow. Maybe we should separate the locked functionality into two locks: the lock of the BlockGenerator is used to replace the ArrayBuffer in the BlockGenerator, and another lock, in the ReliableKafkaReceiver, is used to update the offsets. Even though there are two locks, that's a cleaner design because the locks have clearly separated responsibilities: the BlockGenerator lock does not need to be concerned with the Receiver lock, and the Receiver lock should not have to worry about locks in the BlockGenerator (as long as deadlock is avoided). To avoid deadlocks, the callbacks should not be called from within synchronized sections in the BlockGenerator.
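
A minimal sketch of the two-lock idea described above, under stated assumptions; it is not the code from this pull request. `SketchBlockListener`, `SketchBlockGenerator` and `SketchOffsetTracker` are hypothetical stand-ins for `BlockGeneratorListener`, `BlockGenerator` and the receiver's offset bookkeeping, with records reduced to strings and partitions to plain names.

```scala
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for BlockGeneratorListener: only the hook needed here.
trait SketchBlockListener {
  def onGenerateBlock(blockId: Long): Unit
}

// Hypothetical stand-in for BlockGenerator: its lock guards only the buffer swap.
class SketchBlockGenerator(listener: SketchBlockListener) {
  private var currentBuffer = new ArrayBuffer[String]
  private var nextBlockId = 0L

  def addData(record: String): Unit = synchronized {
    currentBuffer += record
  }

  def generateBlock(): (Long, Seq[String]) = {
    // Swap the buffer while holding the generator's own lock...
    val (blockId, block) = synchronized {
      val filled = currentBuffer
      currentBuffer = new ArrayBuffer[String]
      val id = nextBlockId
      nextBlockId += 1
      (id, filled.toList)
    }
    // ...and invoke the callback only after that lock has been released,
    // so the generator lock is never held while the offset lock is taken.
    listener.onGenerateBlock(blockId)
    (blockId, block)
  }
}

// Hypothetical stand-in for the receiver side: a separate lock guards the offsets.
class SketchOffsetTracker extends SketchBlockListener {
  private val offsetLock = new Object
  private val offsets = mutable.HashMap[String, Long]()
  private val blockOffsets = mutable.HashMap[Long, Map[String, Long]]()

  def updateOffset(partition: String, offset: Long): Unit = offsetLock.synchronized {
    offsets(partition) = offset
  }

  // Record an immutable snapshot of the current offsets for this block.
  override def onGenerateBlock(blockId: Long): Unit = offsetLock.synchronized {
    blockOffsets(blockId) = offsets.toMap
  }

  def snapshotFor(blockId: Long): Option[Map[String, Long]] = offsetLock.synchronized {
    blockOffsets.get(blockId)
  }
}
```

Because the callback runs outside the generator's lock, neither lock is ever acquired while the other is held, which rules out lock-ordering deadlocks between the two classes. The trade-off in this sketch is that a record can be added and its offset updated between the buffer swap and the snapshot, so a snapshot may run ahead of the block's actual contents. A real design would have to deal with that gap.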




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2991#discussion_r20129965
  
    --- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala ---
    @@ -0,0 +1,241 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming.kafka
    +
    +import java.util.Properties
    +import java.util.concurrent.ConcurrentHashMap
    +
    +import scala.collection.Map
    +import scala.collection.mutable
    +import scala.reflect.{classTag, ClassTag}
    +
    +import kafka.common.TopicAndPartition
    +import kafka.consumer.{Consumer, ConsumerConfig, ConsumerConnector, KafkaStream}
    +import kafka.serializer.Decoder
    +import kafka.utils.{ZkUtils, ZKGroupTopicDirs, ZKStringSerializer, VerifiableProperties}
    +import org.I0Itec.zkclient.ZkClient
    +
    +import org.apache.spark.{SparkEnv, Logging}
    +import org.apache.spark.storage.{StreamBlockId, StorageLevel}
    +import org.apache.spark.streaming.receiver.{BlockGeneratorListener, BlockGenerator, Receiver}
    +import org.apache.spark.util.Utils
    +
    +private[streaming]
    +class ReliableKafkaReceiver[
    +  K: ClassTag,
    +  V: ClassTag,
    +  U <: Decoder[_]: ClassTag,
    +  T <: Decoder[_]: ClassTag](
    +    kafkaParams: Map[String, String],
    +    topics: Map[String, Int],
    +    storageLevel: StorageLevel)
    +    extends Receiver[Any](storageLevel) with Logging {
    +
    +  /** High level consumer to connect to Kafka. */
    +  private var consumerConnector: ConsumerConnector = null
    +
    +  /** zkClient to connect to Zookeeper to commit the offsets. */
    +  private var zkClient: ZkClient = null
    +
    +  private val groupId = kafkaParams("group.id")
    +
    +  private def conf() = SparkEnv.get.conf
    +
    +  private val AUTO_OFFSET_COMMIT = "auto.commit.enable"
    +
    +  /**
    --- End diff --
    
    Well, on second thought, these comments could be argued both ways. They could be considered either inline comments or Scaladoc. They are fine as they are.




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2991#discussion_r20196611
  
    --- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala ---
    @@ -53,112 +46,17 @@ class KafkaInputDStream[
         @transient ssc_ : StreamingContext,
         kafkaParams: Map[String, String],
         topics: Map[String, Int],
    +    reliableReceiveEnabled: Boolean,
         storageLevel: StorageLevel
       ) extends ReceiverInputDStream[(K, V)](ssc_) with Logging {
     
       def getReceiver(): Receiver[(K, V)] = {
    -    new KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
    +    if (!reliableReceiveEnabled) {
    +      new KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
    +        .asInstanceOf[Receiver[(K, V)]]
    +    } else {
    +      new ReliableKafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
             .asInstanceOf[Receiver[(K, V)]]
    --- End diff --
    
    If you make the ReliableKafkaReceiver extend Receiver[(K, V)], then this `asInstanceOf` is not necessary.
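
A small sketch of why the cast disappears, using hypothetical stand-in types (`SketchReceiver`, `PlainReceiver`, `ReliableReceiver`) instead of Spark's `Receiver` classes. When both implementations extend the base type parameterized with `(K, V)`, the factory method type-checks without any `asInstanceOf`.

```scala
// Hypothetical stand-ins for the receiver hierarchy, only to show the typing.
abstract class SketchReceiver[T] {
  def describe: String
}

class PlainReceiver[K, V] extends SketchReceiver[(K, V)] {
  def describe: String = "plain"
}

class ReliableReceiver[K, V] extends SketchReceiver[(K, V)] {
  def describe: String = "reliable"
}

object ReceiverFactory {
  // Both branches already conform to SketchReceiver[(K, V)], so no cast is needed.
  def getReceiver[K, V](reliable: Boolean): SketchReceiver[(K, V)] = {
    if (reliable) new ReliableReceiver[K, V] else new PlainReceiver[K, V]
  }
}
```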




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2991#discussion_r20003429
  
    --- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala ---
    @@ -0,0 +1,212 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming.kafka
    +
    +import java.util.Properties
    +import java.util.concurrent.{ConcurrentHashMap, Executors}
    +
    +import scala.collection.Map
    +import scala.collection.mutable
    +import scala.reflect.{classTag, ClassTag}
    +
    +import kafka.common.TopicAndPartition
    +import kafka.consumer.{Consumer, ConsumerConfig, ConsumerConnector}
    +import kafka.serializer.Decoder
    +import kafka.utils.{ZkUtils, ZKGroupTopicDirs, ZKStringSerializer, VerifiableProperties}
    +import org.I0Itec.zkclient.ZkClient
    +
    +import org.apache.spark.{SparkEnv, Logging}
    +import org.apache.spark.storage.{StreamBlockId, StorageLevel}
    +import org.apache.spark.streaming.receiver.{BlockGeneratorListener, BlockGenerator, Receiver}
    +
    +private[streaming]
    +class ReliableKafkaReceiver[
    +  K: ClassTag,
    +  V: ClassTag,
    +  U <: Decoder[_]: ClassTag,
    +  T <: Decoder[_]: ClassTag](
    +    kafkaParams: Map[String, String],
    +    topics: Map[String, Int],
    +    storageLevel: StorageLevel)
    +    extends Receiver[Any](storageLevel) with Logging {
    +
    +  /** High level consumer to connect to Kafka */
    +  private var consumerConnector: ConsumerConnector = null
    +
    +  /** zkClient to connect to Zookeeper to commit the offsets */
    +  private var zkClient: ZkClient = null
    +
    +  private val groupId = kafkaParams("group.id")
    +
    +  private lazy val env = SparkEnv.get
    +
    +  private val AUTO_OFFSET_COMMIT = "auto.commit.enable"
    +
    +  /** A HashMap to manage the offset for each topic/partition, this HashMap is called in
    +    * synchronized block, so mutable HashMap will not meet concurrency issue */
    +  private lazy val topicPartitionOffsetMap = new mutable.HashMap[TopicAndPartition, Long]
    +
    +  /** A concurrent HashMap to store the stream block id and related offset snapshot */
    +  private lazy val blockOffsetMap =
    +    new ConcurrentHashMap[StreamBlockId, Map[TopicAndPartition, Long]]
    +
    +  private lazy val blockGeneratorListener = new BlockGeneratorListener {
    --- End diff --
    
    It would be good to define a named class for this generator listener.
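
One way the suggestion could look, sketched with a reduced, hypothetical listener trait (`SketchListener`) rather than the real `BlockGeneratorListener`. A named inner class replaces the anonymous one, so its class name contains `GeneratedBlockHandler` instead of an `$anon` suffix.

```scala
import scala.collection.mutable

// Hypothetical listener interface, a reduced stand-in for BlockGeneratorListener.
trait SketchListener {
  def onGenerateBlock(blockId: Int): Unit
  def onError(message: String, throwable: Throwable): Unit
}

class SketchReceiverWithNamedListener {
  val generatedBlocks = new mutable.ArrayBuffer[Int]
  val errors = new mutable.ArrayBuffer[String]

  /** Named inner class instead of an anonymous `new SketchListener { ... }`. */
  private final class GeneratedBlockHandler extends SketchListener {
    override def onGenerateBlock(blockId: Int): Unit = {
      generatedBlocks += blockId
    }

    override def onError(message: String, throwable: Throwable): Unit = {
      errors += message
    }
  }

  val listener: SketchListener = new GeneratedBlockHandler
}
```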




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2991#issuecomment-63002988
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/23345/
    Test PASSed.




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2991#issuecomment-60887196
  
      [Test build #22438 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22438/consoleFull) for   PR 2991 at commit [`09d57c5`](https://github.com/apache/spark/commit/09d57c5270cf876e52d49db702e2330c2b6a6e10).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2991#issuecomment-62675072
  
      [Test build #23245 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/23245/consoleFull) for   PR 2991 at commit [`ea873e4`](https://github.com/apache/spark/commit/ea873e424424747c8807adbc0c217a5fbf80e060).
     * This patch **fails MiMa tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on the pull request:

    https://github.com/apache/spark/pull/2991#issuecomment-62850049
  
    Hi TD, I've made some changes:
    
    1. Small code style and comment changes.
    2. Re-enabled JavaKafkaStreamSuite; a previous change caused the Java-related test to be ignored.
    3. Uncommented the unit test.
    
    We still have the synchronized issue; I think we should figure out a better way.




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2991#discussion_r19993595
  
    --- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala ---
    @@ -0,0 +1,212 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming.kafka
    +
    +import java.util.Properties
    +import java.util.concurrent.{ConcurrentHashMap, Executors}
    +
    +import scala.collection.Map
    +import scala.collection.mutable
    +import scala.reflect.{classTag, ClassTag}
    +
    +import kafka.common.TopicAndPartition
    +import kafka.consumer.{Consumer, ConsumerConfig, ConsumerConnector}
    +import kafka.serializer.Decoder
    +import kafka.utils.{ZkUtils, ZKGroupTopicDirs, ZKStringSerializer, VerifiableProperties}
    +import org.I0Itec.zkclient.ZkClient
    +
    +import org.apache.spark.{SparkEnv, Logging}
    +import org.apache.spark.storage.{StreamBlockId, StorageLevel}
    +import org.apache.spark.streaming.receiver.{BlockGeneratorListener, BlockGenerator, Receiver}
    +
    +private[streaming]
    +class ReliableKafkaReceiver[
    +  K: ClassTag,
    +  V: ClassTag,
    +  U <: Decoder[_]: ClassTag,
    +  T <: Decoder[_]: ClassTag](
    +    kafkaParams: Map[String, String],
    +    topics: Map[String, Int],
    +    storageLevel: StorageLevel)
    +    extends Receiver[Any](storageLevel) with Logging {
    +
    +  /** High level consumer to connect to Kafka */
    +  private var consumerConnector: ConsumerConnector = null
    +
    +  /** zkClient to connect to Zookeeper to commit the offsets */
    +  private var zkClient: ZkClient = null
    +
    +  private val groupId = kafkaParams("group.id")
    +
    +  private lazy val env = SparkEnv.get
    +
    +  private val AUTO_OFFSET_COMMIT = "auto.commit.enable"
    +
    +  /** A HashMap to manage the offset for each topic/partition, this HashMap is called in
    +    * synchronized block, so mutable HashMap will not meet concurrency issue */
    +  private lazy val topicPartitionOffsetMap = new mutable.HashMap[TopicAndPartition, Long]
    +
    +  /** A concurrent HashMap to store the stream block id and related offset snapshot */
    +  private lazy val blockOffsetMap =
    +    new ConcurrentHashMap[StreamBlockId, Map[TopicAndPartition, Long]]
    +
    +  private lazy val blockGeneratorListener = new BlockGeneratorListener {
    +    override def onStoreData(data: Any, metadata: Any): Unit = {
    +      if (metadata != null) {
    +        val kafkaMetadata = metadata.asInstanceOf[(TopicAndPartition, Long)]
    +        topicPartitionOffsetMap.put(kafkaMetadata._1, kafkaMetadata._2)
    +      }
    +    }
    +
    +    override def onGenerateBlock(blockId: StreamBlockId): Unit = {
    +      // Get a snapshot of current offset map and store with related block id. Since this hook
    +      // function is called in synchronized block, so we can get the snapshot without explicit lock.
    +      val offsetSnapshot = topicPartitionOffsetMap.toMap
    +      blockOffsetMap.put(blockId, offsetSnapshot)
    +      topicPartitionOffsetMap.clear()
    +    }
    +
    +    override def onPushBlock(blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
    +      store(arrayBuffer.asInstanceOf[mutable.ArrayBuffer[Any]])
    +
    +      // Commit and remove the related offsets.
    +      Option(blockOffsetMap.get(blockId)).foreach { offsetMap =>
    +        commitOffset(offsetMap)
    +      }
    +      blockOffsetMap.remove(blockId)
    +    }
    +
    +    override def onError(message: String, throwable: Throwable): Unit = {
    +      reportError(message, throwable)
    +    }
    +  }
    +
    +  /** Manage the BlockGenerator in receiver itself for better managing block store and offset
    +    * commit */
    +  private var blockGenerator: BlockGenerator = null
    +
    +  override def onStop(): Unit = {
    +    if (consumerConnector != null) {
    +      consumerConnector.shutdown()
    +      consumerConnector = null
    +    }
    +
    +    if (zkClient != null) {
    +      zkClient.close()
    +      zkClient = null
    +    }
    +
    +    blockGenerator.stop()
    +  }
    +
    +  override def onStart(): Unit = {
    +    logInfo(s"Starting Kafka Consumer Stream with group: $groupId")
    +
    +    blockGenerator = new BlockGenerator(blockGeneratorListener, streamId, env.conf)
    +
    +    if (kafkaParams.contains(AUTO_OFFSET_COMMIT) && kafkaParams(AUTO_OFFSET_COMMIT) == "true") {
    +      logWarning(s"$AUTO_OFFSET_COMMIT should be set to false in ReliableKafkaReceiver, " +
    +        "otherwise we will manually set it to false to turn off auto offset commit in Kafka")
    +    }
    +
    +    val props = new Properties()
    +    kafkaParams.foreach(param => props.put(param._1, param._2))
    +    // Manually set "auto.commit.enable" to "false" no matter user explicitly set it to true,
    +    // we have to make sure this property is set to false to turn off auto commit mechanism in
    +    // Kafka.
    +    props.setProperty(AUTO_OFFSET_COMMIT, "false")
    +
    +    val consumerConfig = new ConsumerConfig(props)
    +
    +    assert(consumerConfig.autoCommitEnable == false)
    +
    +    logInfo(s"Connecting to Zookeeper: ${consumerConfig.zkConnect}")
    +    consumerConnector = Consumer.create(consumerConfig)
    +    logInfo(s"Connected to Zookeeper: ${consumerConfig.zkConnect}")
    +
    +    zkClient = new ZkClient(consumerConfig.zkConnect, consumerConfig.zkSessionTimeoutMs,
    +      consumerConfig.zkConnectionTimeoutMs, ZKStringSerializer)
    +
    +    // start BlockGenerator
    +    blockGenerator.start()
    +
    +    val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
    +      .newInstance(consumerConfig.props)
    +      .asInstanceOf[Decoder[K]]
    +
    +    val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
    +      .newInstance(consumerConfig.props)
    +      .asInstanceOf[Decoder[V]]
    +
    +    val topicMessageStreams = consumerConnector.createMessageStreams(
    +      topics, keyDecoder, valueDecoder)
    +
    +    val executorPool = Executors.newFixedThreadPool(topics.values.sum)
    --- End diff --
    
    Can you use `o.a.spark.util.Utils.newDaemonFixedThreadPool()` for this so that the threads get nice names (easier to debug in jstack)?
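
To illustrate what such a helper provides, here is a self-contained sketch built only on `java.util.concurrent`. The name `newNamedDaemonFixedThreadPool` and the `prefix-N` naming scheme are assumptions for this example and are not Spark's actual implementation.

```scala
import java.util.concurrent.{ExecutorService, Executors, ThreadFactory}
import java.util.concurrent.atomic.AtomicInteger

object NamedPools {
  // Hypothetical helper, standing in for the Spark utility mentioned above.
  def newNamedDaemonFixedThreadPool(nThreads: Int, prefix: String): ExecutorService = {
    val counter = new AtomicInteger(0)
    val factory = new ThreadFactory {
      override def newThread(r: Runnable): Thread = {
        // Each worker gets a readable name such as "prefix-0", visible in jstack output.
        val t = new Thread(r, s"$prefix-${counter.getAndIncrement()}")
        // Daemon threads do not keep the JVM alive on shutdown.
        t.setDaemon(true)
        t
      }
    }
    Executors.newFixedThreadPool(nThreads, factory)
  }
}
```

A pool created with `Executors.newFixedThreadPool(n)` alone names its workers `pool-N-thread-M`, which is harder to attribute in a thread dump.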




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2991#discussion_r20129324
  
    --- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala ---
    @@ -0,0 +1,241 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming.kafka
    +
    +import java.util.Properties
    +import java.util.concurrent.ConcurrentHashMap
    +
    +import scala.collection.Map
    +import scala.collection.mutable
    +import scala.reflect.{classTag, ClassTag}
    +
    +import kafka.common.TopicAndPartition
    +import kafka.consumer.{Consumer, ConsumerConfig, ConsumerConnector, KafkaStream}
    +import kafka.serializer.Decoder
    +import kafka.utils.{ZkUtils, ZKGroupTopicDirs, ZKStringSerializer, VerifiableProperties}
    +import org.I0Itec.zkclient.ZkClient
    +
    +import org.apache.spark.{SparkEnv, Logging}
    +import org.apache.spark.storage.{StreamBlockId, StorageLevel}
    +import org.apache.spark.streaming.receiver.{BlockGeneratorListener, BlockGenerator, Receiver}
    +import org.apache.spark.util.Utils
    +
    +private[streaming]
    +class ReliableKafkaReceiver[
    +  K: ClassTag,
    +  V: ClassTag,
    +  U <: Decoder[_]: ClassTag,
    +  T <: Decoder[_]: ClassTag](
    +    kafkaParams: Map[String, String],
    +    topics: Map[String, Int],
    +    storageLevel: StorageLevel)
    +    extends Receiver[Any](storageLevel) with Logging {
    +
    +  /** High level consumer to connect to Kafka. */
    +  private var consumerConnector: ConsumerConnector = null
    +
    +  /** zkClient to connect to Zookeeper to commit the offsets. */
    +  private var zkClient: ZkClient = null
    +
    +  private val groupId = kafkaParams("group.id")
    --- End diff --
    
    Nit: Can you please co-locate the vals and the vars? All the vals first and then all the vars.




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2991#issuecomment-61224748
  
      [Test build #22595 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22595/consoleFull) for   PR 2991 at commit [`48752b2`](https://github.com/apache/spark/commit/48752b254533a377fd95e47c62d8b6c0133f7fbf).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2991#issuecomment-62834197
  
      [Test build #23297 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/23297/consoleFull) for   PR 2991 at commit [`9f636b3`](https://github.com/apache/spark/commit/9f636b35cfd19d510219eb94e50077914ada158a).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2991#issuecomment-63023457
  
      [Test build #23352 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/23352/consoleFull) for   PR 2991 at commit [`5461f1c`](https://github.com/apache/spark/commit/5461f1c43b0e98aa7b583f14569eefd833b19df0).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2991#discussion_r20272068
  
    --- Diff: project/MimaExcludes.scala ---
    @@ -85,6 +85,10 @@ object MimaExcludes {
                   "org.apache.hadoop.mapred.SparkHadoopMapRedUtil"),
                 ProblemFilters.exclude[MissingTypesProblem](
                   "org.apache.spark.rdd.PairRDDFunctions")
    +          ) ++ Seq(
    +            // SPARK-4062
    +            ProblemFilters.exclude[MissingMethodProblem](
    --- End diff --
    
    I'm not sure; the MiMa checker complains about this, so I added it here.




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2991#issuecomment-61220739
  
      [Test build #22595 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22595/consoleFull) for   PR 2991 at commit [`48752b2`](https://github.com/apache/spark/commit/48752b254533a377fd95e47c62d8b6c0133f7fbf).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2991#issuecomment-60892857
  
      [Test build #22446 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22446/consoleFull) for   PR 2991 at commit [`bb57d05`](https://github.com/apache/spark/commit/bb57d05b2e3579c9c3e59429918082937a99e87f).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2991#discussion_r20130143
  
    --- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaReceiver.scala ---
    @@ -0,0 +1,135 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming.kafka
    +
    +import java.util.Properties
    +import java.util.concurrent.Executors
    +
    +import scala.collection.Map
    +import scala.reflect.{classTag, ClassTag}
    +
    +import kafka.consumer._
    +import kafka.serializer.Decoder
    +import kafka.utils.VerifiableProperties
    +import kafka.utils.ZKStringSerializer
    +import org.I0Itec.zkclient._
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.storage.StorageLevel
    +import org.apache.spark.streaming.receiver.Receiver
    +
    +private[streaming]
    +class KafkaReceiver[
    +  K: ClassTag,
    +  V: ClassTag,
    +  U <: Decoder[_]: ClassTag,
    +  T <: Decoder[_]: ClassTag](
    +    kafkaParams: Map[String, String],
    +    topics: Map[String, Int],
    +    storageLevel: StorageLevel
    +  ) extends Receiver[Any](storageLevel) with Logging {
    +
    +  // Connection to Kafka
    +  var consumerConnector: ConsumerConnector = null
    +
    +  def onStop() {
    +    if (consumerConnector != null) {
    +      consumerConnector.shutdown()
    +    }
    +  }
    +
    +  def onStart() {
    +
    +    logInfo("Starting Kafka Consumer Stream with group: " + kafkaParams("group.id"))
    +
    +    // Kafka connection properties
    +    val props = new Properties()
    +    kafkaParams.foreach(param => props.put(param._1, param._2))
    +
    +    val zkConnect = kafkaParams("zookeeper.connect")
    +    // Create the connection to the cluster
    +    logInfo("Connecting to Zookeeper: " + zkConnect)
    +    val consumerConfig = new ConsumerConfig(props)
    +    consumerConnector = Consumer.create(consumerConfig)
    +    logInfo("Connected to " + zkConnect)
    +
    +    // When auto.offset.reset is defined, it is our responsibility to try and whack the
    +    // consumer group zk node.
    +    if (kafkaParams.contains("auto.offset.reset")) {
    +      tryZookeeperConsumerGroupCleanup(zkConnect, kafkaParams("group.id"))
    +    }
    +
    +    val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
    +      .newInstance(consumerConfig.props)
    +      .asInstanceOf[Decoder[K]]
    +    val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
    +      .newInstance(consumerConfig.props)
    +      .asInstanceOf[Decoder[V]]
    +
    +    // Create Threads for each Topic/Message Stream we are listening
    --- End diff --
    
    "Threads" => "threads"




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2991#discussion_r19993737
  
    --- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala ---
    @@ -0,0 +1,212 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming.kafka
    +
    +import java.util.Properties
    +import java.util.concurrent.{ConcurrentHashMap, Executors}
    +
    +import scala.collection.Map
    +import scala.collection.mutable
    +import scala.reflect.{classTag, ClassTag}
    +
    +import kafka.common.TopicAndPartition
    +import kafka.consumer.{Consumer, ConsumerConfig, ConsumerConnector}
    +import kafka.serializer.Decoder
    +import kafka.utils.{ZkUtils, ZKGroupTopicDirs, ZKStringSerializer, VerifiableProperties}
    +import org.I0Itec.zkclient.ZkClient
    +
    +import org.apache.spark.{SparkEnv, Logging}
    +import org.apache.spark.storage.{StreamBlockId, StorageLevel}
    +import org.apache.spark.streaming.receiver.{BlockGeneratorListener, BlockGenerator, Receiver}
    +
    +private[streaming]
    +class ReliableKafkaReceiver[
    +  K: ClassTag,
    +  V: ClassTag,
    +  U <: Decoder[_]: ClassTag,
    +  T <: Decoder[_]: ClassTag](
    +    kafkaParams: Map[String, String],
    +    topics: Map[String, Int],
    +    storageLevel: StorageLevel)
    +    extends Receiver[Any](storageLevel) with Logging {
    +
    +  /** High level consumer to connect to Kafka */
    +  private var consumerConnector: ConsumerConnector = null
    +
    +  /** zkClient to connect to Zookeeper to commit the offsets */
    +  private var zkClient: ZkClient = null
    +
    +  private val groupId = kafkaParams("group.id")
    +
    +  private lazy val env = SparkEnv.get
    +
    +  private val AUTO_OFFSET_COMMIT = "auto.commit.enable"
    +
    +  /** A HashMap to manage the offset for each topic/partition, this HashMap is called in
    +    * synchronized block, so mutable HashMap will not meet concurrency issue */
    +  private lazy val topicPartitionOffsetMap = new mutable.HashMap[TopicAndPartition, Long]
    +
    +  /** A concurrent HashMap to store the stream block id and related offset snapshot */
    +  private lazy val blockOffsetMap =
    +    new ConcurrentHashMap[StreamBlockId, Map[TopicAndPartition, Long]]
    +
    +  private lazy val blockGeneratorListener = new BlockGeneratorListener {
    +    override def onStoreData(data: Any, metadata: Any): Unit = {
    +      if (metadata != null) {
    +        val kafkaMetadata = metadata.asInstanceOf[(TopicAndPartition, Long)]
    +        topicPartitionOffsetMap.put(kafkaMetadata._1, kafkaMetadata._2)
    +      }
    +    }
    +
    +    override def onGenerateBlock(blockId: StreamBlockId): Unit = {
    +      // Get a snapshot of current offset map and store with related block id. Since this hook
    +      // function is called in synchronized block, so we can get the snapshot without explicit lock.
    +      val offsetSnapshot = topicPartitionOffsetMap.toMap
    +      blockOffsetMap.put(blockId, offsetSnapshot)
    +      topicPartitionOffsetMap.clear()
    +    }
    +
    +    override def onPushBlock(blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
    +      store(arrayBuffer.asInstanceOf[mutable.ArrayBuffer[Any]])
    +
    +      // Commit and remove the related offsets.
    +      Option(blockOffsetMap.get(blockId)).foreach { offsetMap =>
    +        commitOffset(offsetMap)
    +      }
    +      blockOffsetMap.remove(blockId)
    +    }
    +
    +    override def onError(message: String, throwable: Throwable): Unit = {
    +      reportError(message, throwable)
    +    }
    +  }
    +
    +  /** Manage the BlockGenerator in receiver itself for better managing block store and offset
    +    * commit */
    +  private var blockGenerator: BlockGenerator = null
    +
    +  override def onStop(): Unit = {
    +    if (consumerConnector != null) {
    +      consumerConnector.shutdown()
    +      consumerConnector = null
    +    }
    +
    +    if (zkClient != null) {
    +      zkClient.close()
    +      zkClient = null
    +    }
    +
    +    blockGenerator.stop()
    +  }
    +
    +  override def onStart(): Unit = {
    +    logInfo(s"Starting Kafka Consumer Stream with group: $groupId")
    +
    +    blockGenerator = new BlockGenerator(blockGeneratorListener, streamId, env.conf)
    +
    +    if (kafkaParams.contains(AUTO_OFFSET_COMMIT) && kafkaParams(AUTO_OFFSET_COMMIT) == "true") {
    +      logWarning(s"$AUTO_OFFSET_COMMIT should be set to false in ReliableKafkaReceiver, " +
    +        "otherwise we will manually set it to false to turn off auto offset commit in Kafka")
    +    }
    +
    +    val props = new Properties()
    +    kafkaParams.foreach(param => props.put(param._1, param._2))
    +    // Manually set "auto.commit.enable" to "false" no matter user explicitly set it to true,
    +    // we have to make sure this property is set to false to turn off auto commit mechanism in
    +    // Kafka.
    +    props.setProperty(AUTO_OFFSET_COMMIT, "false")
    +
    +    val consumerConfig = new ConsumerConfig(props)
    +
    +    assert(consumerConfig.autoCommitEnable == false)
    +
    +    logInfo(s"Connecting to Zookeeper: ${consumerConfig.zkConnect}")
    +    consumerConnector = Consumer.create(consumerConfig)
    +    logInfo(s"Connected to Zookeeper: ${consumerConfig.zkConnect}")
    +
    +    zkClient = new ZkClient(consumerConfig.zkConnect, consumerConfig.zkSessionTimeoutMs,
    +      consumerConfig.zkConnectionTimeoutMs, ZKStringSerializer)
    +
    +    // start BlockGenerator
    +    blockGenerator.start()
    +
    +    val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
    +      .newInstance(consumerConfig.props)
    +      .asInstanceOf[Decoder[K]]
    +
    +    val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
    +      .newInstance(consumerConfig.props)
    +      .asInstanceOf[Decoder[V]]
    +
    +    val topicMessageStreams = consumerConnector.createMessageStreams(
    +      topics, keyDecoder, valueDecoder)
    +
    +    val executorPool = Executors.newFixedThreadPool(topics.values.sum)
    +
    +    try {
    +      topicMessageStreams.values.foreach { streams =>
    +        streams.foreach { stream =>
    +          executorPool.submit(new Runnable {
    +            override def run(): Unit = {
    +              logInfo(s"Starting message process thread ${Thread.currentThread().getId}.")
    +              try {
    +                for (msgAndMetadata <- stream) {
    +                  val topicAndPartition = TopicAndPartition(
    +                    msgAndMetadata.topic, msgAndMetadata.partition)
    +                  val metadata = (topicAndPartition, msgAndMetadata.offset)
    +
    +                  blockGenerator += ((msgAndMetadata.key, msgAndMetadata.message), metadata)
    +                }
    +              } catch {
    +                case e: Throwable => logError("Error handling message; existing", e)
    +              }
    +            }
    +          })
    +        }
    +      }
    +    } finally {
    +      executorPool.shutdown()
    +    }
    +  }
    +
    +  /**
    +   * Commit the offset of Kafka's topic/partition, the commit mechanism follow Kafka 0.8.x's
    +   * metadata schema in Zookeeper.
    +   */
    +  private def commitOffset(offsetMap: Map[TopicAndPartition, Long]): Unit = {
    +    if (zkClient == null) {
    +      logError(s"zkClient $zkClient should be initialized at started")
    --- End diff --
    
    Shouldn't this throw an error and stop the receiver? If the offsets cannot be committed, isn't that an invalid execution that should not continue at all? If that is the case, then please call `stop(errorMessage)`.
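
A minimal sketch of the behavior requested in this comment, assuming a simplified stand-in class rather than Spark's `Receiver`. The names `CommitDemo` and `stopReason`, and the string partition keys, are hypothetical and used only for illustration; the one idea taken from the comment is to call `stop(errorMessage)` instead of logging and continuing.

```scala
// Hypothetical stand-in for a receiver; this is not Spark's Receiver class.
class CommitDemo(private var zkClient: AnyRef) {
  // Records why the receiver was stopped, if it was.
  var stopReason: Option[String] = None

  // Mimics the idea of stop(message): mark the receiver as stopped.
  def stop(message: String): Unit = {
    stopReason = Some(message)
  }

  // Returns true only when the offsets could be committed.
  def commitOffset(offsetMap: Map[String, Long]): Boolean = {
    if (zkClient == null) {
      // Stop instead of logging and continuing: without a client,
      // no offset can be committed.
      stop("zkClient is not initialized; cannot commit offsets")
      false
    } else {
      // A real implementation would write each offset to Zookeeper here.
      true
    }
  }
}
```

With a `null` client, `commitOffset` returns `false` and `stopReason` holds the error message, so the failure is visible to the caller rather than hidden in a log line.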




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2991#issuecomment-62834301
  
      [Test build #23297 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/23297/consoleFull) for   PR 2991 at commit [`9f636b3`](https://github.com/apache/spark/commit/9f636b35cfd19d510219eb94e50077914ada158a).
     * This patch **fails Scala style tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2991#discussion_r20265529
  
    --- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala ---
    @@ -0,0 +1,251 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming.kafka
    +
    +import java.util.Properties
    +import java.util.concurrent.ConcurrentHashMap
    +
    +import scala.collection.Map
    +import scala.collection.mutable
    +import scala.reflect.{classTag, ClassTag}
    +
    +import kafka.common.TopicAndPartition
    +import kafka.consumer.{Consumer, ConsumerConfig, ConsumerConnector, KafkaStream}
    +import kafka.serializer.Decoder
    +import kafka.utils.{ZkUtils, ZKGroupTopicDirs, ZKStringSerializer, VerifiableProperties}
    +import org.I0Itec.zkclient.ZkClient
    +
    +import org.apache.spark.{SparkEnv, Logging}
    +import org.apache.spark.storage.{StreamBlockId, StorageLevel}
    +import org.apache.spark.streaming.receiver.{BlockGeneratorListener, BlockGenerator, Receiver}
    +import org.apache.spark.util.Utils
    +
    +
    +/**
    + * ReliableKafkaReceiver offers the ability to reliably store data into BlockManager without loss.
    + * It is turned off by default and will be enabled when
    + * spark.streaming.receiver.writeAheadLog.enable is true. The difference compared to KafkaReceiver
    + * is that this receiver manages topic-partition/offset itself and updates the offset information
    + * after data is reliably stored as write-ahead log. Offsets will only be updated when data is
    + * reliably stored, so the potential data loss problem of KafkaReceiver can be eliminated.
    + *
    + * Note: ReliableKafkaReceiver will set auto.commit.enable to false to turn off automatic offset
    + * commit mechanism in Kafka consumer. So setting this configuration manually within kafkaParams
    + * will not take effect.
    + */
    +private[streaming]
    +class ReliableKafkaReceiver[
    +  K: ClassTag,
    +  V: ClassTag,
    +  U <: Decoder[_]: ClassTag,
    +  T <: Decoder[_]: ClassTag](
    +    kafkaParams: Map[String, String],
    +    topics: Map[String, Int],
    +    storageLevel: StorageLevel)
    +    extends Receiver[(K, V)](storageLevel) with Logging {
    +
    +  private val groupId = kafkaParams("group.id")
    +
    +  private val AUTO_OFFSET_COMMIT = "auto.commit.enable"
    +
    +  private def conf() = SparkEnv.get.conf
    +
    +  /** High level consumer to connect to Kafka. */
    +  private var consumerConnector: ConsumerConnector = null
    +
    +  /** zkClient to connect to Zookeeper to commit the offsets. */
    +  private var zkClient: ZkClient = null
    +
    +  /**
    +   * A HashMap to manage the offset for each topic/partition, this HashMap is called in
    +   * synchronized block, so mutable HashMap will not meet concurrency issue.
    +   */
    +  private var topicPartitionOffsetMap: mutable.HashMap[TopicAndPartition, Long] = null
    +
    +  /** A concurrent HashMap to store the stream block id and related offset snapshot. */
    +  private var blockOffsetMap: ConcurrentHashMap[StreamBlockId, Map[TopicAndPartition, Long]] = null
    +
    +  /**
    +   * Manage the BlockGenerator in receiver itself for better managing block store and offset
    +   * commit.
    +   */
    +  private var blockGenerator: BlockGenerator = null
    +
    +  /** Kafka offsets checkpoint listener to register into BlockGenerator for offsets checkpoint. */
    +  private final class OffsetCheckpointListener extends BlockGeneratorListener {
    +
    +    override def onGenerateBlock(blockId: StreamBlockId): Unit = {
    +      // Get a snapshot of current offset map and store with related block id. Since this hook
    +      // function is called in synchronized block, so we can get the snapshot without explicit lock.
    +      val offsetSnapshot = topicPartitionOffsetMap.toMap
    --- End diff --
    
    I spent some time on this today, and I am almost there.
    In fact, there were a bunch of issues with the unit tests as well (leaking context in case of failure, tests being run twice, etc.). I refactored them and I have this branch. Not all the unit tests in the new test suite pass (the failing ones are commented out). Can you take a look?




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2991#discussion_r20271438
  
    --- Diff: project/MimaExcludes.scala ---
    @@ -85,6 +85,10 @@ object MimaExcludes {
                   "org.apache.hadoop.mapred.SparkHadoopMapRedUtil"),
                 ProblemFilters.exclude[MissingTypesProblem](
                   "org.apache.spark.rdd.PairRDDFunctions")
    +          ) ++ Seq(
    +            // SPARK-4062
    +            ProblemFilters.exclude[MissingMethodProblem](
    --- End diff --
    
    Why is this necessary? We are not updating any of the public classes. `org.apache.spark.streaming.kafka.KafkaReceiver#MessageHandler.this` belongs to an inner class of a private class, so it should not matter.




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/2991




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2991#discussion_r19992945
  
    --- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala ---
    @@ -0,0 +1,212 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming.kafka
    +
    +import java.util.Properties
    +import java.util.concurrent.{ConcurrentHashMap, Executors}
    +
    +import scala.collection.Map
    +import scala.collection.mutable
    +import scala.reflect.{classTag, ClassTag}
    +
    +import kafka.common.TopicAndPartition
    +import kafka.consumer.{Consumer, ConsumerConfig, ConsumerConnector}
    +import kafka.serializer.Decoder
    +import kafka.utils.{ZkUtils, ZKGroupTopicDirs, ZKStringSerializer, VerifiableProperties}
    +import org.I0Itec.zkclient.ZkClient
    +
    +import org.apache.spark.{SparkEnv, Logging}
    +import org.apache.spark.storage.{StreamBlockId, StorageLevel}
    +import org.apache.spark.streaming.receiver.{BlockGeneratorListener, BlockGenerator, Receiver}
    +
    +private[streaming]
    +class ReliableKafkaReceiver[
    +  K: ClassTag,
    +  V: ClassTag,
    +  U <: Decoder[_]: ClassTag,
    +  T <: Decoder[_]: ClassTag](
    +    kafkaParams: Map[String, String],
    +    topics: Map[String, Int],
    +    storageLevel: StorageLevel)
    +    extends Receiver[Any](storageLevel) with Logging {
    +
    +  /** High level consumer to connect to Kafka */
    +  private var consumerConnector: ConsumerConnector = null
    +
    +  /** zkClient to connect to Zookeeper to commit the offsets */
    +  private var zkClient: ZkClient = null
    +
    +  private val groupId = kafkaParams("group.id")
    +
    +  private lazy val env = SparkEnv.get
    +
    +  private val AUTO_OFFSET_COMMIT = "auto.commit.enable"
    +
    +  /** A HashMap to manage the offset for each topic/partition, this HashMap is called in
    +    * synchronized block, so mutable HashMap will not meet concurrency issue */
    +  private lazy val topicPartitionOffsetMap = new mutable.HashMap[TopicAndPartition, Long]
    --- End diff --
    
    Could you use `var`s instead of `lazy val`s? Basically, the receiver should be designed to support multiple lifecycles (start, stop). A `lazy val` is initialized only on the first `start` and is likely not cleared when `stop` is called. So it is a better design pattern to use `var`s with explicit initialization on start(), rather than `lazy val`s.
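
The pattern suggested in this comment can be sketched as follows, assuming a simplified stand-in class rather than Spark's `Receiver`. `LifecycleDemo`, `record`, and `size` are hypothetical names used only for illustration. The point is that state held in a `var` is created in `start()` and released in `stop()`, so a second start/stop cycle begins with fresh state.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a receiver with a start/stop lifecycle;
// this is not Spark's Receiver class.
class LifecycleDemo {
  // A var, not a lazy val: each start() builds a new map and
  // stop() clears and releases it.
  private var offsets: mutable.HashMap[String, Long] = null

  def start(): Unit = {
    offsets = new mutable.HashMap[String, Long]
  }

  def record(partition: String, offset: Long): Unit = {
    offsets.put(partition, offset)
  }

  // Number of tracked partitions; 0 when the receiver is not running.
  def size: Int = if (offsets == null) 0 else offsets.size

  def stop(): Unit = {
    if (offsets != null) {
      offsets.clear()
      offsets = null
    }
  }
}
```

A `lazy val` would instead be initialized once on first access and keep the same map across restarts, so entries recorded in one run could leak into the next unless they were cleared by hand.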




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2991#issuecomment-62675496
  
      [Test build #23251 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/23251/consoleFull) for   PR 2991 at commit [`b798535`](https://github.com/apache/spark/commit/b798535f1e6c8c981bfb700fe6083e881072d210).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2991#issuecomment-62347257
  
      [Test build #23142 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/23142/consoleFull) for   PR 2991 at commit [`c174454`](https://github.com/apache/spark/commit/c1744547d0369045fc53a715fa227e7b34edce55).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2991#issuecomment-62357457
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/23144/




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on the pull request:

    https://github.com/apache/spark/pull/2991#issuecomment-62486799
  
    OK, got it, thanks a lot. I will add this class with the experimental annotation. The performance comparison is still being tested; some tuning and configuration work has delayed it, and we will bring the results back to you soon.




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2991#discussion_r20130302
  
    --- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaReceiver.scala ---
    @@ -0,0 +1,135 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming.kafka
    +
    +import java.util.Properties
    +import java.util.concurrent.Executors
    +
    +import scala.collection.Map
    +import scala.reflect.{classTag, ClassTag}
    +
    +import kafka.consumer._
    +import kafka.serializer.Decoder
    +import kafka.utils.VerifiableProperties
    +import kafka.utils.ZKStringSerializer
    +import org.I0Itec.zkclient._
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.storage.StorageLevel
    +import org.apache.spark.streaming.receiver.Receiver
    +
    +private[streaming]
    +class KafkaReceiver[
    +  K: ClassTag,
    +  V: ClassTag,
    +  U <: Decoder[_]: ClassTag,
    +  T <: Decoder[_]: ClassTag](
    +    kafkaParams: Map[String, String],
    +    topics: Map[String, Int],
    +    storageLevel: StorageLevel
    +  ) extends Receiver[Any](storageLevel) with Logging {
    +
    +  // Connection to Kafka
    +  var consumerConnector: ConsumerConnector = null
    +
    +  def onStop() {
    +    if (consumerConnector != null) {
    +      consumerConnector.shutdown()
    +    }
    +  }
    +
    +  def onStart() {
    +
    +    logInfo("Starting Kafka Consumer Stream with group: " + kafkaParams("group.id"))
    +
    +    // Kafka connection properties
    +    val props = new Properties()
    +    kafkaParams.foreach(param => props.put(param._1, param._2))
    +
    +    val zkConnect = kafkaParams("zookeeper.connect")
    +    // Create the connection to the cluster
    +    logInfo("Connecting to Zookeeper: " + zkConnect)
    +    val consumerConfig = new ConsumerConfig(props)
    +    consumerConnector = Consumer.create(consumerConfig)
    +    logInfo("Connected to " + zkConnect)
    +
    +    // When auto.offset.reset is defined, it is our responsibility to try and whack the
    +    // consumer group zk node.
    +    if (kafkaParams.contains("auto.offset.reset")) {
    +      tryZookeeperConsumerGroupCleanup(zkConnect, kafkaParams("group.id"))
    +    }
    +
    +    val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
    +      .newInstance(consumerConfig.props)
    +      .asInstanceOf[Decoder[K]]
    +    val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
    +      .newInstance(consumerConfig.props)
    +      .asInstanceOf[Decoder[V]]
    +
    +    // Create Threads for each Topic/Message Stream we are listening
    +    val topicMessageStreams = consumerConnector.createMessageStreams(
    +      topics, keyDecoder, valueDecoder)
    +
    +    val executorPool = Executors.newFixedThreadPool(topics.values.sum)
    +    try {
    +      // Start the messages handler for each partition
    +      topicMessageStreams.values.foreach { streams =>
    +        streams.foreach { stream => executorPool.submit(new MessageHandler(stream)) }
    +      }
    +    } finally {
    +      executorPool.shutdown() // Just causes threads to terminate after work is done
    +    }
    +  }
    +
    +  // Handles Kafka Messages
    +  private class MessageHandler[K: ClassTag, V: ClassTag](stream: KafkaStream[K, V])
    +    extends Runnable {
    +    def run() {
    +      logInfo("Starting MessageHandler.")
    +      try {
    +        for (msgAndMetadata <- stream) {
    +          store((msgAndMetadata.key, msgAndMetadata.message))
    +        }
    +      } catch {
    +        case e: Throwable => logError("Error handling message; exiting", e)
    +      }
    +    }
    +  }
    +
    +  // It is our responsibility to delete the consumer group when specifying auto.offset.reset. This
    +  // is because Kafka 0.7.2 only honors this param when the group is not in zookeeper.
    +  //
    +  // The kafka high level consumer doesn't expose setting offsets currently, this is a trick copied
    +  // from Kafka's ConsoleConsumer. See code related to 'auto.offset.reset' when it is set to
    +  // 'smallest'/'largest':
    +  // scalastyle:off
    +  // https://github.com/apache/kafka/blob/0.7.2/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
    +  // scalastyle:on
    +  private def tryZookeeperConsumerGroupCleanup(zkUrl: String, groupId: String) {
    +    val dir = "/consumers/" + groupId
    +    logInfo("Cleaning up temporary Zookeeper data under " + dir + ".")
    --- End diff --
    
    Right, sorry, I should merge that. But if I merge that, you will have to update this PR.




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2991#discussion_r20131127
  
    --- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala ---
    @@ -0,0 +1,241 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming.kafka
    +
    +import java.util.Properties
    +import java.util.concurrent.ConcurrentHashMap
    +
    +import scala.collection.Map
    +import scala.collection.mutable
    +import scala.reflect.{classTag, ClassTag}
    +
    +import kafka.common.TopicAndPartition
    +import kafka.consumer.{Consumer, ConsumerConfig, ConsumerConnector, KafkaStream}
    +import kafka.serializer.Decoder
    +import kafka.utils.{ZkUtils, ZKGroupTopicDirs, ZKStringSerializer, VerifiableProperties}
    +import org.I0Itec.zkclient.ZkClient
    +
    +import org.apache.spark.{SparkEnv, Logging}
    +import org.apache.spark.storage.{StreamBlockId, StorageLevel}
    +import org.apache.spark.streaming.receiver.{BlockGeneratorListener, BlockGenerator, Receiver}
    +import org.apache.spark.util.Utils
    +
    +private[streaming]
    --- End diff --
    
    Please add Scaladoc describing this class. It should explain how it uses `store(multipleObjects)` to get more reliability.




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2991#issuecomment-62357452
  
      [Test build #23144 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/23144/consoleFull) for   PR 2991 at commit [`3ba4f73`](https://github.com/apache/spark/commit/3ba4f73bf1aa3431fd5c4679441b9f0ff900b883).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2991#issuecomment-62860993
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/23300/
    Test FAILed.




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2991#issuecomment-63043173
  
    **[Test build #23362 timed out](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/23362/consoleFull)**     for PR 2991 at commit [`5461f1c`](https://github.com/apache/spark/commit/5461f1c43b0e98aa7b583f14569eefd833b19df0)     after a configured wait of `120m`.




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2991#discussion_r20265416
  
    --- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala ---
    @@ -0,0 +1,251 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming.kafka
    +
    +import java.util.Properties
    +import java.util.concurrent.ConcurrentHashMap
    +
    +import scala.collection.Map
    +import scala.collection.mutable
    +import scala.reflect.{classTag, ClassTag}
    +
    +import kafka.common.TopicAndPartition
    +import kafka.consumer.{Consumer, ConsumerConfig, ConsumerConnector, KafkaStream}
    +import kafka.serializer.Decoder
    +import kafka.utils.{ZkUtils, ZKGroupTopicDirs, ZKStringSerializer, VerifiableProperties}
    +import org.I0Itec.zkclient.ZkClient
    +
    +import org.apache.spark.{SparkEnv, Logging}
    +import org.apache.spark.storage.{StreamBlockId, StorageLevel}
    +import org.apache.spark.streaming.receiver.{BlockGeneratorListener, BlockGenerator, Receiver}
    +import org.apache.spark.util.Utils
    +
    +
    +/**
    + * ReliableKafkaReceiver offers the ability to reliably store data into BlockManager without loss.
    + * It is turned off by default and will be enabled when
    + * spark.streaming.receiver.writeAheadLog.enable is true. The difference compared to KafkaReceiver
    + * is that this receiver manages topic-partition/offset itself and updates the offset information
    + * after data is reliably stored as write-ahead log. Offsets will only be updated when data is
    + * reliably stored, so the potential data loss problem of KafkaReceiver can be eliminated.
    + *
    + * Note: ReliableKafkaReceiver will set auto.commit.enable to false to turn off automatic offset
    + * commit mechanism in Kafka consumer. So setting this configuration manually within kafkaParams
    + * will not take effect.
    + */
    +private[streaming]
    +class ReliableKafkaReceiver[
    +  K: ClassTag,
    +  V: ClassTag,
    +  U <: Decoder[_]: ClassTag,
    +  T <: Decoder[_]: ClassTag](
    +    kafkaParams: Map[String, String],
    +    topics: Map[String, Int],
    +    storageLevel: StorageLevel)
    +    extends Receiver[(K, V)](storageLevel) with Logging {
    +
    +  private val groupId = kafkaParams("group.id")
    +
    +  private val AUTO_OFFSET_COMMIT = "auto.commit.enable"
    +
    +  private def conf() = SparkEnv.get.conf
    +
    +  /** High level consumer to connect to Kafka. */
    +  private var consumerConnector: ConsumerConnector = null
    +
    +  /** zkClient to connect to Zookeeper to commit the offsets. */
    +  private var zkClient: ZkClient = null
    +
    +  /**
    +   * A HashMap to manage the offset for each topic/partition, this HashMap is called in
    +   * synchronized block, so mutable HashMap will not meet concurrency issue.
    +   */
    +  private var topicPartitionOffsetMap: mutable.HashMap[TopicAndPartition, Long] = null
    +
    +  /** A concurrent HashMap to store the stream block id and related offset snapshot. */
    +  private var blockOffsetMap: ConcurrentHashMap[StreamBlockId, Map[TopicAndPartition, Long]] = null
    +
    +  /**
    +   * Manage the BlockGenerator in receiver itself for better managing block store and offset
    +   * commit.
    +   */
    +  private var blockGenerator: BlockGenerator = null
    +
    +  /** Kafka offsets checkpoint listener to register into BlockGenerator for offsets checkpoint. */
    +  private final class OffsetCheckpointListener extends BlockGeneratorListener {
    +
    +    override def onGenerateBlock(blockId: StreamBlockId): Unit = {
    +      // Get a snapshot of current offset map and store with related block id. Since this hook
    +      // function is called in synchronized block, so we can get the snapshot without explicit lock.
    +      val offsetSnapshot = topicPartitionOffsetMap.toMap
    --- End diff --
    
    Yes, that sounds great. Previously the synchronized section in BlockGenerator covered the whole method; we can narrow the locked section to get better performance.
    
    There is one thing I think we should take care of: adding a message to BlockGenerator and updating the offset should happen in one synchronized section, and that section should be mutually exclusive with stream block generation and the offset snapshot cut (the latter two actions should also be in the same synchronized section). Otherwise we could get wrong results.
    
    If we introduce two locks, we should still follow this rule and avoid deadlock.
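
A minimal sketch of the rule described above, assuming a single lock guards both the message buffer and the offset map. `OffsetTrackingBuffer`, `add`, and `cutBlock` are hypothetical names used only for illustration; they are not part of Spark's BlockGenerator API.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the receiver's buffer plus offset map.
final class OffsetTrackingBuffer[T] {
  private val lock = new Object
  private var buffer = mutable.ArrayBuffer.empty[T]
  private val offsets = mutable.HashMap.empty[(String, Int), Long]

  /** Append a message and record its offset in one synchronized section. */
  def add(topic: String, partition: Int, offset: Long, message: T): Unit =
    lock.synchronized {
      buffer += message
      offsets((topic, partition)) = offset
    }

  /** Cut the current block and snapshot the offsets under the same lock. */
  def cutBlock(): (Seq[T], Map[(String, Int), Long]) =
    lock.synchronized {
      val block = buffer.toList
      buffer = mutable.ArrayBuffer.empty[T]
      (block, offsets.toMap)
    }
}
```

Because both methods take the same lock, a snapshot can never include an offset whose message is missing from the block it was cut with. With two locks the same invariant can hold only if every code path acquires them in one fixed order.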




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2991#discussion_r19992880
  
    --- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala ---
    @@ -0,0 +1,212 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming.kafka
    +
    +import java.util.Properties
    +import java.util.concurrent.{ConcurrentHashMap, Executors}
    +
    +import scala.collection.Map
    +import scala.collection.mutable
    +import scala.reflect.{classTag, ClassTag}
    +
    +import kafka.common.TopicAndPartition
    +import kafka.consumer.{Consumer, ConsumerConfig, ConsumerConnector}
    +import kafka.serializer.Decoder
    +import kafka.utils.{ZkUtils, ZKGroupTopicDirs, ZKStringSerializer, VerifiableProperties}
    +import org.I0Itec.zkclient.ZkClient
    +
    +import org.apache.spark.{SparkEnv, Logging}
    +import org.apache.spark.storage.{StreamBlockId, StorageLevel}
    +import org.apache.spark.streaming.receiver.{BlockGeneratorListener, BlockGenerator, Receiver}
    +
    +private[streaming]
    +class ReliableKafkaReceiver[
    +  K: ClassTag,
    +  V: ClassTag,
    +  U <: Decoder[_]: ClassTag,
    +  T <: Decoder[_]: ClassTag](
    +    kafkaParams: Map[String, String],
    +    topics: Map[String, Int],
    +    storageLevel: StorageLevel)
    +    extends Receiver[Any](storageLevel) with Logging {
    +
    +  /** High level consumer to connect to Kafka */
    +  private var consumerConnector: ConsumerConnector = null
    +
    +  /** zkClient to connect to Zookeeper to commit the offsets */
    +  private var zkClient: ZkClient = null
    +
    +  private val groupId = kafkaParams("group.id")
    +
    +  private lazy val env = SparkEnv.get
    --- End diff --
    
    Can you make `env` a def instead of a lazy val? It is easier to reason about when it gets instantiated. In fact, do you need a handle to env at all? You just need the conf, so the method can be `def conf() = SparkEnv.get.conf`.
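
A small sketch of the difference being pointed out, assuming a counter in place of the real environment lookup. `EnvDemo`, `currentConf`, and `Holder` are hypothetical names for illustration; `EnvDemo.currentConf()` merely stands in for `SparkEnv.get.conf`.

```scala
object EnvDemo {
  // Counts how often the (hypothetical) environment lookup runs.
  var lookups = 0
  def currentConf(): Map[String, String] = {
    lookups += 1
    Map("lookup" -> lookups.toString)
  }
}

class Holder {
  // Evaluated once, on first access, then cached in the instance.
  lazy val cachedConf: Map[String, String] = EnvDemo.currentConf()

  // Evaluated on every call, so the moment of lookup is explicit.
  def conf(): Map[String, String] = EnvDemo.currentConf()
}
```

With the `def`, each call site decides when the lookup happens; with the `lazy val`, the value is fixed by whichever access happens to come first.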




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2991#discussion_r20265849
  
    --- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala ---
    @@ -0,0 +1,251 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming.kafka
    +
    +import java.util.Properties
    +import java.util.concurrent.ConcurrentHashMap
    +
    +import scala.collection.Map
    +import scala.collection.mutable
    +import scala.reflect.{classTag, ClassTag}
    +
    +import kafka.common.TopicAndPartition
    +import kafka.consumer.{Consumer, ConsumerConfig, ConsumerConnector, KafkaStream}
    +import kafka.serializer.Decoder
    +import kafka.utils.{ZkUtils, ZKGroupTopicDirs, ZKStringSerializer, VerifiableProperties}
    +import org.I0Itec.zkclient.ZkClient
    +
    +import org.apache.spark.{SparkEnv, Logging}
    +import org.apache.spark.storage.{StreamBlockId, StorageLevel}
    +import org.apache.spark.streaming.receiver.{BlockGeneratorListener, BlockGenerator, Receiver}
    +import org.apache.spark.util.Utils
    +
    +
    +/**
    + * ReliableKafkaReceiver offers the ability to reliably store data into BlockManager without loss.
    + * It is turned off by default and will be enabled when
    + * spark.streaming.receiver.writeAheadLog.enable is true. The difference compared to KafkaReceiver
    + * is that this receiver manages topic-partition/offset itself and updates the offset information
    + * after data is reliably stored as write-ahead log. Offsets will only be updated when data is
    + * reliably stored, so the potential data loss problem of KafkaReceiver can be eliminated.
    + *
    + * Note: ReliableKafkaReceiver will set auto.commit.enable to false to turn off automatic offset
    + * commit mechanism in Kafka consumer. So setting this configuration manually within kafkaParams
    + * will not take effect.
    + */
    +private[streaming]
    +class ReliableKafkaReceiver[
    +  K: ClassTag,
    +  V: ClassTag,
    +  U <: Decoder[_]: ClassTag,
    +  T <: Decoder[_]: ClassTag](
    +    kafkaParams: Map[String, String],
    +    topics: Map[String, Int],
    +    storageLevel: StorageLevel)
    +    extends Receiver[(K, V)](storageLevel) with Logging {
    +
    +  private val groupId = kafkaParams("group.id")
    +
    +  private val AUTO_OFFSET_COMMIT = "auto.commit.enable"
    +
    +  private def conf() = SparkEnv.get.conf
    +
    +  /** High level consumer to connect to Kafka. */
    +  private var consumerConnector: ConsumerConnector = null
    +
    +  /** zkClient to connect to Zookeeper to commit the offsets. */
    +  private var zkClient: ZkClient = null
    +
    +  /**
    +   * A HashMap to manage the offset for each topic/partition, this HashMap is called in
    +   * synchronized block, so mutable HashMap will not meet concurrency issue.
    +   */
    +  private var topicPartitionOffsetMap: mutable.HashMap[TopicAndPartition, Long] = null
    +
    +  /** A concurrent HashMap to store the stream block id and related offset snapshot. */
    +  private var blockOffsetMap: ConcurrentHashMap[StreamBlockId, Map[TopicAndPartition, Long]] = null
    +
    +  /**
    +   * Manage the BlockGenerator in receiver itself for better managing block store and offset
    +   * commit.
    +   */
    +  private var blockGenerator: BlockGenerator = null
    +
    +  /** Kafka offsets checkpoint listener to register into BlockGenerator for offsets checkpoint. */
    +  private final class OffsetCheckpointListener extends BlockGeneratorListener {
    +
    +    override def onGenerateBlock(blockId: StreamBlockId): Unit = {
    +      // Get a snapshot of current offset map and store with related block id. Since this hook
    +      // function is called in synchronized block, so we can get the snapshot without explicit lock.
    +      val offsetSnapshot = topicPartitionOffsetMap.toMap
    --- End diff --
    
    Yeah, I will, thanks a lot.




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2991#discussion_r20128588
  
    --- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala ---
    @@ -53,112 +46,17 @@ class KafkaInputDStream[
         @transient ssc_ : StreamingContext,
         kafkaParams: Map[String, String],
         topics: Map[String, Int],
    +    reliableReceiveEnabled: Boolean,
    --- End diff --
    
    Better to name it as "useReliableReceiver"




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2991#discussion_r20271138
  
    --- Diff: external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala ---
    @@ -0,0 +1,173 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming.kafka
    +
    +import java.io.File
    +
    +import scala.collection.mutable
    +
    +import kafka.serializer.StringDecoder
    +import kafka.utils.{ZkUtils, ZKGroupTopicDirs}
    +
    +import org.apache.spark.SparkConf
    +import org.apache.spark.storage.StorageLevel
    +import org.apache.spark.streaming.StreamingContext
    +import org.apache.spark.util.Utils
    +
    +class ReliableKafkaStreamSuite extends KafkaStreamSuite {
    +  import KafkaTestUtils._
    +
    +  test("Reliable Kafka input stream") {
    +    val sparkConf = new SparkConf()
    +      .setMaster(master)
    +      .setAppName(framework)
    +      .set("spark.streaming.receiver.writeAheadLog.enable", "true")
    +    val ssc = new StreamingContext(sparkConf, batchDuration)
    +    val checkpointDir = s"${System.getProperty("java.io.tmpdir", "/tmp")}/" +
    +      s"test-checkpoint${random.nextInt(10000)}"
    +    Utils.registerShutdownDeleteDir(new File(checkpointDir))
    +    ssc.checkpoint(checkpointDir)
    +
    +    val topic = "test"
    +    val sent = Map("a" -> 1, "b" -> 1, "c" -> 1)
    +    createTopic(topic)
    +    produceAndSendMessage(topic, sent)
    +
    +    val kafkaParams = Map("zookeeper.connect" -> s"$zkHost:$zkPort",
    +      "group.id" -> s"test-consumer-${random.nextInt(10000)}",
    +      "auto.offset.reset" -> "smallest")
    +
    +    val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
    +      ssc,
    +      kafkaParams,
    +      Map(topic -> 1),
    +      StorageLevel.MEMORY_ONLY)
    +    val result = new mutable.HashMap[String, Long]()
    +    stream.map { case (k, v) => v }
    +      .foreachRDD { r =>
    +        val ret = r.collect()
    +        ret.foreach { v =>
    +          val count = result.getOrElseUpdate(v, 0) + 1
    +          result.put(v, count)
    +        }
    +      }
    +    ssc.start()
    +    ssc.awaitTermination(3000)
    +
    +    // A basic process verification for ReliableKafkaReceiver.
    +    // Verify whether received message number is equal to the sent message number.
    +    assert(sent.size === result.size)
    +    // Verify whether each message is the same as the data to be verified.
    +    sent.keys.foreach { k => assert(sent(k) === result(k).toInt) }
    +
    +    ssc.stop()
    +  }
    +
    +  test("Verify the offset commit") {
    +    // Verify the corretness of offset commit mechanism.
    --- End diff --
    
    This is still there :)




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2991#discussion_r20131105
  
    --- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala ---
    @@ -71,7 +71,8 @@ object KafkaUtils {
           topics: Map[String, Int],
           storageLevel: StorageLevel
         ): ReceiverInputDStream[(K, V)] = {
    -    new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, storageLevel)
    +    val WALEnabled = ssc.conf.getBoolean("spark.streaming.receiver.writeAheadLog.enable", false)
    --- End diff --
    
    WALEnabled => walEnabled




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on the pull request:

    https://github.com/apache/spark/pull/2991#issuecomment-62840906
  
    I think there's still a synchronization issue. Would you mind taking a look at the comment here (https://github.com/jerryshao/apache-spark/pull/5)? Thanks a lot.




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the pull request:

    https://github.com/apache/spark/pull/2991#issuecomment-62499225
  
    I have made comments on that PR (#1420), and I will merge it as soon as you have made the changes. After that, please update this PR, including both receivers. BTW, to keep things simple (since it is really late in the release cycle), please keep the existing receiver in the same file to minimize the diff.




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the pull request:

    https://github.com/apache/spark/pull/2991#issuecomment-62976670
  
    @jerryshao Here is another round of changes from me. 
    You correctly identified a flaw in the lock logic in the last change I made. I played around with different implementations and came up with two that I think are correct and that try to preserve the BlockGenerator performance for existing receivers. Both of them are pull requests on your repo.
    
    1. Refactor 1, https://github.com/jerryshao/apache-spark/pull/7: Here I attempt to fix the flaw by taking the receiver lock before updating the buffer. This is done by extending BlockGenerator and overriding the updateCurrentBuffer method to take the receiver lock first. This ensures deadlock-free locking by always taking locks in the same order: receiver lock followed by block generator lock. The default block generator code path is not affected, so other receivers should not be affected either.
    
    2. Refactor 2, https://github.com/jerryshao/apache-spark/pull/6: I essentially reverted to your original proposal. :) After trying out all possible implementations and finally arriving at "Refactor 1", I realized that it was more complicated than what you proposed. So I reverted to that and added a lot of Scaladoc explaining the behavior. Personally I am in favor of this now.
    
    Besides, I also eliminated more duplicate code from the unit tests and merged two unit tests to reduce test run times. Please take a look.
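
Editorial illustration of the lock ordering described under "Refactor 1" above. This is only a minimal sketch under stated assumptions, not the code from either pull request: SketchBlockGenerator, SketchReceiver, store, cutBlock and latestSnapshot are hypothetical stand-ins, and the real BlockGenerator has a different shape. It shows both code paths taking the receiver lock first and the generator lock second.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for a block generator; not Spark's BlockGenerator.
class SketchBlockGenerator {
  private val buffer = ArrayBuffer.empty[Any]

  // Default path: only the generator lock (this) is taken.
  def add(data: Any): Unit = synchronized { buffer.append(data) }

  // Cuts the current buffer into a block under the generator lock.
  def updateCurrentBuffer(): Seq[Any] = synchronized {
    val block = buffer.toList
    buffer.clear()
    block
  }
}

// Hypothetical receiver that keeps its offset bookkeeping consistent with block cuts.
class SketchReceiver {
  private val receiverLock = new Object
  private var lastOffset = -1L
  private var snapshots = List.empty[(Long, Int)] // (offset at cut time, block size)

  private val generator = new SketchBlockGenerator {
    // Take the receiver lock first, then the generator lock inside super.
    override def updateCurrentBuffer(): Seq[Any] = receiverLock.synchronized {
      val block = super.updateCurrentBuffer()
      snapshots = (lastOffset, block.size) :: snapshots
      block
    }
  }

  // Same order here: receiver lock, then generator lock inside add().
  def store(data: Any, offset: Long): Unit = receiverLock.synchronized {
    generator.add(data)
    lastOffset = offset
  }

  def cutBlock(): Seq[Any] = generator.updateCurrentBuffer()

  def latestSnapshot: Option[(Long, Int)] = receiverLock.synchronized(snapshots.headOption)
}
```

Because no path ever takes the generator lock before the receiver lock, the two locks cannot be acquired in opposite orders by different threads. A receiver that never overrides updateCurrentBuffer pays for only the single generator lock, which is the point made above about the default code path.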





[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2991#issuecomment-62682120
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/23251/
    Test FAILed.




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2991#issuecomment-62350594
  
      [Test build #23144 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/23144/consoleFull) for   PR 2991 at commit [`3ba4f73`](https://github.com/apache/spark/commit/3ba4f73bf1aa3431fd5c4679441b9f0ff900b883).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2991#discussion_r20197017
  
    --- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaReceiver.scala ---
    @@ -0,0 +1,135 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming.kafka
    +
    +import java.util.Properties
    +import java.util.concurrent.Executors
    +
    +import scala.collection.Map
    +import scala.reflect.{classTag, ClassTag}
    +
    +import kafka.consumer._
    +import kafka.serializer.Decoder
    +import kafka.utils.VerifiableProperties
    +import kafka.utils.ZKStringSerializer
    +import org.I0Itec.zkclient._
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.storage.StorageLevel
    +import org.apache.spark.streaming.receiver.Receiver
    +
    +private[streaming]
    +class KafkaReceiver[
    +  K: ClassTag,
    +  V: ClassTag,
    +  U <: Decoder[_]: ClassTag,
    +  T <: Decoder[_]: ClassTag](
    +    kafkaParams: Map[String, String],
    +    topics: Map[String, Int],
    +    storageLevel: StorageLevel
    +  ) extends Receiver[Any](storageLevel) with Logging {
    +
    +  // Connection to Kafka
    +  var consumerConnector: ConsumerConnector = null
    +
    +  def onStop() {
    +    if (consumerConnector != null) {
    +      consumerConnector.shutdown()
    +    }
    +  }
    +
    +  def onStart() {
    +
    +    logInfo("Starting Kafka Consumer Stream with group: " + kafkaParams("group.id"))
    +
    +    // Kafka connection properties
    +    val props = new Properties()
    +    kafkaParams.foreach(param => props.put(param._1, param._2))
    +
    +    val zkConnect = kafkaParams("zookeeper.connect")
    +    // Create the connection to the cluster
    +    logInfo("Connecting to Zookeeper: " + zkConnect)
    +    val consumerConfig = new ConsumerConfig(props)
    +    consumerConnector = Consumer.create(consumerConfig)
    +    logInfo("Connected to " + zkConnect)
    +
    +    // When auto.offset.reset is defined, it is our responsibility to try and whack the
    +    // consumer group zk node.
    +    if (kafkaParams.contains("auto.offset.reset")) {
    +      tryZookeeperConsumerGroupCleanup(zkConnect, kafkaParams("group.id"))
    +    }
    +
    +    val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
    +      .newInstance(consumerConfig.props)
    +      .asInstanceOf[Decoder[K]]
    +    val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
    +      .newInstance(consumerConfig.props)
    +      .asInstanceOf[Decoder[V]]
    +
    +    // Create Threads for each Topic/Message Stream we are listening
    +    val topicMessageStreams = consumerConnector.createMessageStreams(
    +      topics, keyDecoder, valueDecoder)
    +
    +    val executorPool = Executors.newFixedThreadPool(topics.values.sum)
    +    try {
    +      // Start the messages handler for each partition
    +      topicMessageStreams.values.foreach { streams =>
    +        streams.foreach { stream => executorPool.submit(new MessageHandler(stream)) }
    +      }
    +    } finally {
    +      executorPool.shutdown() // Just causes threads to terminate after work is done
    +    }
    +  }
    +
    +  // Handles Kafka Messages
    +  private class MessageHandler[K: ClassTag, V: ClassTag](stream: KafkaStream[K, V])
    +    extends Runnable {
    +    def run() {
    +      logInfo("Starting MessageHandler.")
    +      try {
    +        for (msgAndMetadata <- stream) {
    --- End diff --
    
    Can you replace this with a while loop? Scala's for loops are less efficient than a plain old while loop.
    
    http://dynamicsofprogramming.blogspot.co.uk/2013/01/loop-performance-and-local-variables-in.html
    
    Not sure how much of a performance difference it makes, but it's better to use a while loop anyway.
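
Editorial illustration of the two loop forms being compared. It is a sketch over a plain Iterable[Int] rather than a KafkaStream, and LoopSketch and its method names are hypothetical. A `for (x <- xs)` loop is rewritten by the Scala compiler into `xs.foreach(...)` with a closure, while the while loop drives the iterator directly. How much this matters in practice depends on the Scala version and the JIT, so it should be measured rather than assumed.

```scala
object LoopSketch {
  // The compiler rewrites this for loop to xs.foreach(x => total += x).
  def sumWithFor(xs: Iterable[Int]): Long = {
    var total = 0L
    for (x <- xs) total += x
    total
  }

  // Equivalent while loop that pulls elements from the iterator directly.
  def sumWithWhile(xs: Iterable[Int]): Long = {
    var total = 0L
    val it = xs.iterator
    while (it.hasNext) total += it.next()
    total
  }
}
```

Both methods compute the same result; only the loop machinery differs.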




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2991#discussion_r20134004
  
    --- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaReceiver.scala ---
    @@ -0,0 +1,135 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming.kafka
    +
    +import java.util.Properties
    +import java.util.concurrent.Executors
    +
    +import scala.collection.Map
    +import scala.reflect.{classTag, ClassTag}
    +
    +import kafka.consumer._
    +import kafka.serializer.Decoder
    +import kafka.utils.VerifiableProperties
    +import kafka.utils.ZKStringSerializer
    +import org.I0Itec.zkclient._
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.storage.StorageLevel
    +import org.apache.spark.streaming.receiver.Receiver
    +
    +private[streaming]
    +class KafkaReceiver[
    +  K: ClassTag,
    +  V: ClassTag,
    +  U <: Decoder[_]: ClassTag,
    +  T <: Decoder[_]: ClassTag](
    +    kafkaParams: Map[String, String],
    +    topics: Map[String, Int],
    +    storageLevel: StorageLevel
    +  ) extends Receiver[Any](storageLevel) with Logging {
    +
    +  // Connection to Kafka
    +  var consumerConnector: ConsumerConnector = null
    +
    +  def onStop() {
    +    if (consumerConnector != null) {
    +      consumerConnector.shutdown()
    +    }
    +  }
    +
    +  def onStart() {
    +
    +    logInfo("Starting Kafka Consumer Stream with group: " + kafkaParams("group.id"))
    +
    +    // Kafka connection properties
    +    val props = new Properties()
    +    kafkaParams.foreach(param => props.put(param._1, param._2))
    +
    +    val zkConnect = kafkaParams("zookeeper.connect")
    +    // Create the connection to the cluster
    +    logInfo("Connecting to Zookeeper: " + zkConnect)
    +    val consumerConfig = new ConsumerConfig(props)
    +    consumerConnector = Consumer.create(consumerConfig)
    +    logInfo("Connected to " + zkConnect)
    +
    +    // When auto.offset.reset is defined, it is our responsibility to try and whack the
    +    // consumer group zk node.
    +    if (kafkaParams.contains("auto.offset.reset")) {
    +      tryZookeeperConsumerGroupCleanup(zkConnect, kafkaParams("group.id"))
    +    }
    +
    +    val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
    +      .newInstance(consumerConfig.props)
    +      .asInstanceOf[Decoder[K]]
    +    val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
    +      .newInstance(consumerConfig.props)
    +      .asInstanceOf[Decoder[V]]
    +
    +    // Create Threads for each Topic/Message Stream we are listening
    +    val topicMessageStreams = consumerConnector.createMessageStreams(
    +      topics, keyDecoder, valueDecoder)
    +
    +    val executorPool = Executors.newFixedThreadPool(topics.values.sum)
    +    try {
    +      // Start the messages handler for each partition
    +      topicMessageStreams.values.foreach { streams =>
    +        streams.foreach { stream => executorPool.submit(new MessageHandler(stream)) }
    +      }
    +    } finally {
    +      executorPool.shutdown() // Just causes threads to terminate after work is done
    +    }
    +  }
    +
    +  // Handles Kafka Messages
    +  private class MessageHandler[K: ClassTag, V: ClassTag](stream: KafkaStream[K, V])
    +    extends Runnable {
    +    def run() {
    +      logInfo("Starting MessageHandler.")
    +      try {
    +        for (msgAndMetadata <- stream) {
    +          store((msgAndMetadata.key, msgAndMetadata.message))
    +        }
    +      } catch {
    +        case e: Throwable => logError("Error handling message; exiting", e)
    +      }
    +    }
    +  }
    +
    +  // It is our responsibility to delete the consumer group when specifying auto.offset.reset. This
    +  // is because Kafka 0.7.2 only honors this param when the group is not in zookeeper.
    +  //
    +  // The kafka high level consumer doesn't expose setting offsets currently, this is a trick copied
    +  // from Kafka's ConsoleConsumer. See code related to 'auto.offset.reset' when it is set to
    +  // 'smallest'/'largest':
    +  // scalastyle:off
    +  // https://github.com/apache/kafka/blob/0.7.2/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
    +  // scalastyle:on
    +  private def tryZookeeperConsumerGroupCleanup(zkUrl: String, groupId: String) {
    +    val dir = "/consumers/" + groupId
    +    logInfo("Cleaning up temporary Zookeeper data under " + dir + ".")
    --- End diff --
    
    Yeah, I will.




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on the pull request:

    https://github.com/apache/spark/pull/2991#issuecomment-62840663
  
    I will do it. Thanks a lot for your refactoring work and review; it is much appreciated.




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2991#issuecomment-63043183
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/23362/
    Test FAILed.




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2991#issuecomment-60901164
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22446/
    Test PASSed.




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2991#issuecomment-60878843
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22425/
    Test FAILed.




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2991#issuecomment-63129827
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/23380/
    Test PASSed.




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2991#discussion_r19992762
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala ---
    @@ -80,9 +89,10 @@ private[streaming] class BlockGenerator(
        * Push a single data item into the buffer. All received data items
        * will be periodically pushed into BlockManager.
        */
    -  def += (data: Any): Unit = synchronized {
    +  def += (data: Any, metadata: Any = null): Unit = synchronized {
         waitToPush()
         currentBuffer += data
    +    listener.onStoreData(data, metadata)
    --- End diff --
    
    What is the point of calling onStoreData right after inserting into the buffer?




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on the pull request:

    https://github.com/apache/spark/pull/2991#issuecomment-62998448
  
    Hi TD, this test is quite flaky; it failed several times in my local test runs:
    
    ```
    - block addition, block to batch allocation and cleanup with write ahead log *** FAILED *** (21 milliseconds)
    [info]   java.io.FileNotFoundException: File /tmp/1415929501402-0/receivedBlockMetadata/log-0-1000 does not exist.
    [info]   at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:397)
    [info]   at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:324)
    [info]   at org.apache.spark.streaming.util.WriteAheadLogSuite$.getLogFilesInDirectory(WriteAheadLogSuite.scala:344)
    [info]   at org.apache.spark.streaming.ReceivedBlockTrackerSuite.getWriteAheadLogFiles(ReceivedBlockTrackerSuite.scala:226)
    [info]   at org.apache.spark.streaming.ReceivedBlockTrackerSuite$$anonfun$4.apply$mcV$sp(ReceivedBlockTrackerSuite.scala:171)
    [info]   at org.apache.spark.streaming.ReceivedBlockTrackerSuite$$anonfun$4.apply(ReceivedBlockTrackerSuite.scala:96)
    [info]   at org.apache.spark.streaming.ReceivedBlockTrackerSuite$$anonfun$4.apply(ReceivedBlockTrackerSuite.scala:96)
    [info]   at 
    ....
    ```




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2991#discussion_r20265751
  
    --- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala ---
    @@ -0,0 +1,251 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming.kafka
    +
    +import java.util.Properties
    +import java.util.concurrent.ConcurrentHashMap
    +
    +import scala.collection.Map
    +import scala.collection.mutable
    +import scala.reflect.{classTag, ClassTag}
    +
    +import kafka.common.TopicAndPartition
    +import kafka.consumer.{Consumer, ConsumerConfig, ConsumerConnector, KafkaStream}
    +import kafka.serializer.Decoder
    +import kafka.utils.{ZkUtils, ZKGroupTopicDirs, ZKStringSerializer, VerifiableProperties}
    +import org.I0Itec.zkclient.ZkClient
    +
    +import org.apache.spark.{SparkEnv, Logging}
    +import org.apache.spark.storage.{StreamBlockId, StorageLevel}
    +import org.apache.spark.streaming.receiver.{BlockGeneratorListener, BlockGenerator, Receiver}
    +import org.apache.spark.util.Utils
    +
    +
    +/**
    + * ReliableKafkaReceiver offers the ability to reliably store data into BlockManager without loss.
    + * It is turned off by default and will be enabled when
    + * spark.streaming.receiver.writeAheadLog.enable is true. The difference compared to KafkaReceiver
    + * is that this receiver manages topic-partition/offset itself and updates the offset information
    + * after data is reliably stored as write-ahead log. Offsets will only be updated when data is
    + * reliably stored, so the potential data loss problem of KafkaReceiver can be eliminated.
    + *
    + * Note: ReliableKafkaReceiver will set auto.commit.enable to false to turn off automatic offset
    + * commit mechanism in Kafka consumer. So setting this configuration manually within kafkaParams
    + * will not take effect.
    + */
    +private[streaming]
    +class ReliableKafkaReceiver[
    +  K: ClassTag,
    +  V: ClassTag,
    +  U <: Decoder[_]: ClassTag,
    +  T <: Decoder[_]: ClassTag](
    +    kafkaParams: Map[String, String],
    +    topics: Map[String, Int],
    +    storageLevel: StorageLevel)
    +    extends Receiver[(K, V)](storageLevel) with Logging {
    +
    +  private val groupId = kafkaParams("group.id")
    +
    +  private val AUTO_OFFSET_COMMIT = "auto.commit.enable"
    +
    +  private def conf() = SparkEnv.get.conf
    +
    +  /** High level consumer to connect to Kafka. */
    +  private var consumerConnector: ConsumerConnector = null
    +
    +  /** zkClient to connect to Zookeeper to commit the offsets. */
    +  private var zkClient: ZkClient = null
    +
    +  /**
    +   * A HashMap to manage the offset for each topic/partition, this HashMap is called in
    +   * synchronized block, so mutable HashMap will not meet concurrency issue.
    +   */
    +  private var topicPartitionOffsetMap: mutable.HashMap[TopicAndPartition, Long] = null
    +
    +  /** A concurrent HashMap to store the stream block id and related offset snapshot. */
    +  private var blockOffsetMap: ConcurrentHashMap[StreamBlockId, Map[TopicAndPartition, Long]] = null
    +
    +  /**
    +   * Manage the BlockGenerator in receiver itself for better managing block store and offset
    +   * commit.
    +   */
    +  private var blockGenerator: BlockGenerator = null
    +
    +  /** Kafka offsets checkpoint listener to register into BlockGenerator for offsets checkpoint. */
    +  private final class OffsetCheckpointListener extends BlockGeneratorListener {
    +
    +    override def onGenerateBlock(blockId: StreamBlockId): Unit = {
    +      // Get a snapshot of current offset map and store with related block id. Since this hook
    +      // function is called in synchronized block, so we can get the snapshot without explicit lock.
    +      val offsetSnapshot = topicPartitionOffsetMap.toMap
    --- End diff --
    
    Here is the Pull Request to your repository. Please take a look.
    https://github.com/jerryshao/apache-spark/pull/5
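
Editorial illustration of the snapshot-then-commit idea described in the scaladoc of the quoted diff: offsets are snapshotted when a block is generated and committed only once that block has been stored. This is a sketch under assumptions, not the receiver's actual code. OffsetCheckpointSketch, recordMessage, onBlockStored and committedOffset are hypothetical names, block ids are plain strings, a (topic, partition) tuple stands in for TopicAndPartition, and an in-memory map stands in for ZooKeeper.

```scala
import scala.collection.mutable

class OffsetCheckpointSketch {
  type TopicPartition = (String, Int) // stand-in for kafka.common.TopicAndPartition

  // Latest offset seen per topic/partition for data currently being buffered.
  private val currentOffsets = mutable.HashMap.empty[TopicPartition, Long]
  // Offset snapshot taken at the moment each block was generated.
  private val blockOffsets = mutable.HashMap.empty[String, Map[TopicPartition, Long]]
  // Stand-in for the offsets committed to ZooKeeper.
  private val committed = mutable.HashMap.empty[TopicPartition, Long]

  def recordMessage(tp: TopicPartition, offset: Long): Unit = synchronized {
    currentOffsets(tp) = offset
  }

  // Called when a block is cut: remember which offsets that block covers.
  def onGenerateBlock(blockId: String): Unit = synchronized {
    blockOffsets(blockId) = currentOffsets.toMap
  }

  // Hypothetical hook, called only after the block has been reliably stored.
  def onBlockStored(blockId: String): Unit = synchronized {
    blockOffsets.remove(blockId).foreach(snapshot => committed ++= snapshot)
  }

  def committedOffset(tp: TopicPartition): Option[Long] = synchronized {
    committed.get(tp)
  }
}
```

If the receiver fails between onGenerateBlock and onBlockStored, nothing has been committed for that block, so its messages would be read again after a restart instead of being lost.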




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2991#issuecomment-62876612
  
    **[Test build #23305 timed out](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/23305/consoleFull)**     for PR 2991 at commit [`2a20a01`](https://github.com/apache/spark/commit/2a20a0107af78f9db85bea28acd6cd4730af84f7)     after a configured wait of `120m`.




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on the pull request:

    https://github.com/apache/spark/pull/2991#issuecomment-63017519
  
    Jenkins, retest this please.




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2991#discussion_r20271214
  
    --- Diff: external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala ---
    @@ -0,0 +1,160 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming.kafka
    +
    +import java.io.File
    +
    +import scala.collection.mutable
    +import scala.concurrent.duration._
    +import scala.language.postfixOps
    +import scala.util.Random
    +
    +import kafka.serializer.StringDecoder
    +import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
    +import org.scalatest.concurrent.Eventually
    +
    +import org.apache.spark.storage.StorageLevel
    +import org.apache.spark.streaming.StreamingContext
    +import org.apache.spark.util.Utils
    +
    +class ReliableKafkaStreamSuite extends KafkaStreamSuiteBase with Eventually {
    +  val topic = "topic"
    +  val data = Map("a" -> 10, "b" -> 10, "c" -> 10)
    +  var groupId: String = _
    +  var kafkaParams: Map[String, String] = _
    +
    +  before {
    +    beforeFunction()  // call this first to start ZK and Kafka
    +    groupId = s"test-consumer-${Random.nextInt(10000)}"
    +    kafkaParams = Map(
    +      "zookeeper.connect" -> zkAddress,
    +      "group.id" -> groupId,
    +      "auto.offset.reset" -> "smallest"
    +    )
    +  }
    +
    +  after {
    +    afterFunction()
    +  }
    +
    +  test("Reliable Kafka input stream") {
    +    sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
    +    ssc = new StreamingContext(sparkConf, batchDuration)
    +    val checkpointDir = s"${System.getProperty("java.io.tmpdir", "/tmp")}/" +
    +      s"test-checkpoint${Random.nextInt(10000)}"
    +    Utils.registerShutdownDeleteDir(new File(checkpointDir))
    +    ssc.checkpoint(checkpointDir)
    +    createTopic(topic)
    +    produceAndSendMessage(topic, data)
    +
    +    val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
    +      ssc,
    +      kafkaParams,
    +      Map(topic -> 1),
    +      StorageLevel.MEMORY_ONLY)
    +    val result = new mutable.HashMap[String, Long]()
    +    stream.map { case (k, v) => v }.foreachRDD { r =>
    +        val ret = r.collect()
    +        ret.foreach { v =>
    +          val count = result.getOrElseUpdate(v, 0) + 1
    +          result.put(v, count)
    +        }
    +      }
    +    ssc.start()
    +    eventually(timeout(10000 milliseconds), interval(100 milliseconds)) {
    +      // A basic process verification for ReliableKafkaReceiver.
    +      // Verify whether received message number is equal to the sent message number.
    +      assert(data.size === result.size)
    +      // Verify whether each message is the same as the data to be verified.
    +      data.keys.foreach { k => assert(data(k) === result(k).toInt) }
    +    }
    +    ssc.stop()
    +  }
    +/*
    --- End diff --
    
    This test is still commented out. 





[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2991#discussion_r19994012
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala ---
    @@ -80,9 +89,10 @@ private[streaming] class BlockGenerator(
        * Push a single data item into the buffer. All received data items
        * will be periodically pushed into BlockManager.
        */
    -  def += (data: Any): Unit = synchronized {
    +  def += (data: Any, metadata: Any = null): Unit = synchronized {
         waitToPush()
         currentBuffer += data
    +    listener.onStoreData(data, metadata)
    --- End diff --
    
    The reason for putting `onStoreData` here is that it is then synchronized under the same mutex as the buffer update. If we moved it into the caller class, we would need a second mutex, which would add extra synchronization logic.
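
The point about sharing one mutex can be sketched as follows. This is a minimal illustration, not the code in the PR: `StoreListener`, `SketchBuffer`, and `OffsetRecorder` are hypothetical stand-ins, and a plain `String` key stands in for Kafka's `TopicAndPartition`.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the listener hook discussed above; not Spark's actual trait.
trait StoreListener {
  def onStoreData(data: Any, metadata: Any): Unit
}

// Simplified buffer: the append and the listener callback share one monitor,
// so the listener's own state needs no separate lock.
class SketchBuffer(listener: StoreListener) {
  private val currentBuffer = mutable.ArrayBuffer.empty[Any]

  def add(data: Any, metadata: Any = null): Unit = synchronized {
    currentBuffer += data
    listener.onStoreData(data, metadata)
  }

  def size: Int = synchronized { currentBuffer.size }
}

// Listener that records the latest offset per partition key in a plain,
// non-concurrent map; it is only ever touched from inside SketchBuffer.add.
class OffsetRecorder extends StoreListener {
  val offsets = mutable.HashMap.empty[String, Long]

  override def onStoreData(data: Any, metadata: Any): Unit = metadata match {
    case (partition: String, offset: Long) => offsets.put(partition, offset)
    case _ => // no metadata attached; nothing to track
  }
}
```

If the callback were instead invoked by the caller after `add` returns, the caller would need its own lock spanning both steps to keep the buffer and the offset map consistent.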




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2991#issuecomment-62685127
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/23253/




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2991#issuecomment-62679987
  
      [Test build #23250 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/23250/consoleFull) for   PR 2991 at commit [`e5e21c1`](https://github.com/apache/spark/commit/e5e21c1b54222cc5595e42933f739ca3f078ed36).
     * This patch **fails MiMa tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the pull request:

    https://github.com/apache/spark/pull/2991#issuecomment-62838566
  
    @jerryshao Please enable the unit tests that I commented out and check that they work correctly. Thanks for helping out; sorry I could not get them working completely before handing them off to you.




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2991#issuecomment-62685122
  
      [Test build #23253 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/23253/consoleFull) for   PR 2991 at commit [`e501b3c`](https://github.com/apache/spark/commit/e501b3ce04d43f514676c953799ba91963415227).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2991#issuecomment-60901153
  
      [Test build #22446 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22446/consoleFull) for   PR 2991 at commit [`bb57d05`](https://github.com/apache/spark/commit/bb57d05b2e3579c9c3e59429918082937a99e87f).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2991#issuecomment-63129819
  
      [Test build #23380 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/23380/consoleFull) for   PR 2991 at commit [`5461f1c`](https://github.com/apache/spark/commit/5461f1c43b0e98aa7b583f14569eefd833b19df0).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2991#discussion_r19993617
  
    --- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala ---
    @@ -0,0 +1,212 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming.kafka
    +
    +import java.util.Properties
    +import java.util.concurrent.{ConcurrentHashMap, Executors}
    +
    +import scala.collection.Map
    +import scala.collection.mutable
    +import scala.reflect.{classTag, ClassTag}
    +
    +import kafka.common.TopicAndPartition
    +import kafka.consumer.{Consumer, ConsumerConfig, ConsumerConnector}
    +import kafka.serializer.Decoder
    +import kafka.utils.{ZkUtils, ZKGroupTopicDirs, ZKStringSerializer, VerifiableProperties}
    +import org.I0Itec.zkclient.ZkClient
    +
    +import org.apache.spark.{SparkEnv, Logging}
    +import org.apache.spark.storage.{StreamBlockId, StorageLevel}
    +import org.apache.spark.streaming.receiver.{BlockGeneratorListener, BlockGenerator, Receiver}
    +
    +private[streaming]
    +class ReliableKafkaReceiver[
    +  K: ClassTag,
    +  V: ClassTag,
    +  U <: Decoder[_]: ClassTag,
    +  T <: Decoder[_]: ClassTag](
    +    kafkaParams: Map[String, String],
    +    topics: Map[String, Int],
    +    storageLevel: StorageLevel)
    +    extends Receiver[Any](storageLevel) with Logging {
    +
    +  /** High level consumer to connect to Kafka */
    +  private var consumerConnector: ConsumerConnector = null
    +
    +  /** zkClient to connect to Zookeeper to commit the offsets */
    +  private var zkClient: ZkClient = null
    +
    +  private val groupId = kafkaParams("group.id")
    +
    +  private lazy val env = SparkEnv.get
    +
    +  private val AUTO_OFFSET_COMMIT = "auto.commit.enable"
    +
    +  /** A HashMap to manage the offset for each topic/partition, this HashMap is called in
    +    * synchronized block, so mutable HashMap will not meet concurrency issue */
    +  private lazy val topicPartitionOffsetMap = new mutable.HashMap[TopicAndPartition, Long]
    +
    +  /** A concurrent HashMap to store the stream block id and related offset snapshot */
    +  private lazy val blockOffsetMap =
    +    new ConcurrentHashMap[StreamBlockId, Map[TopicAndPartition, Long]]
    +
    +  private lazy val blockGeneratorListener = new BlockGeneratorListener {
    +    override def onStoreData(data: Any, metadata: Any): Unit = {
    +      if (metadata != null) {
    +        val kafkaMetadata = metadata.asInstanceOf[(TopicAndPartition, Long)]
    +        topicPartitionOffsetMap.put(kafkaMetadata._1, kafkaMetadata._2)
    +      }
    +    }
    +
    +    override def onGenerateBlock(blockId: StreamBlockId): Unit = {
    +      // Get a snapshot of current offset map and store with related block id. Since this hook
    +      // function is called in synchronized block, so we can get the snapshot without explicit lock.
    +      val offsetSnapshot = topicPartitionOffsetMap.toMap
    +      blockOffsetMap.put(blockId, offsetSnapshot)
    +      topicPartitionOffsetMap.clear()
    +    }
    +
    +    override def onPushBlock(blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
    +      store(arrayBuffer.asInstanceOf[mutable.ArrayBuffer[Any]])
    +
    +      // Commit and remove the related offsets.
    +      Option(blockOffsetMap.get(blockId)).foreach { offsetMap =>
    +        commitOffset(offsetMap)
    +      }
    +      blockOffsetMap.remove(blockId)
    +    }
    +
    +    override def onError(message: String, throwable: Throwable): Unit = {
    +      reportError(message, throwable)
    +    }
    +  }
    +
    +  /** Manage the BlockGenerator in receiver itself for better managing block store and offset
    +    * commit */
    +  private var blockGenerator: BlockGenerator = null
    +
    +  override def onStop(): Unit = {
    +    if (consumerConnector != null) {
    +      consumerConnector.shutdown()
    +      consumerConnector = null
    +    }
    +
    +    if (zkClient != null) {
    +      zkClient.close()
    +      zkClient = null
    +    }
    +
    +    blockGenerator.stop()
    +  }
    +
    +  override def onStart(): Unit = {
    +    logInfo(s"Starting Kafka Consumer Stream with group: $groupId")
    +
    +    blockGenerator = new BlockGenerator(blockGeneratorListener, streamId, env.conf)
    +
    +    if (kafkaParams.contains(AUTO_OFFSET_COMMIT) && kafkaParams(AUTO_OFFSET_COMMIT) == "true") {
    +      logWarning(s"$AUTO_OFFSET_COMMIT should be set to false in ReliableKafkaReceiver, " +
    +        "otherwise we will manually set it to false to turn off auto offset commit in Kafka")
    +    }
    +
    +    val props = new Properties()
    +    kafkaParams.foreach(param => props.put(param._1, param._2))
    +    // Manually set "auto.commit.enable" to "false" no matter user explicitly set it to true,
    +    // we have to make sure this property is set to false to turn off auto commit mechanism in
    +    // Kafka.
    +    props.setProperty(AUTO_OFFSET_COMMIT, "false")
    +
    +    val consumerConfig = new ConsumerConfig(props)
    +
    +    assert(consumerConfig.autoCommitEnable == false)
    +
    +    logInfo(s"Connecting to Zookeeper: ${consumerConfig.zkConnect}")
    +    consumerConnector = Consumer.create(consumerConfig)
    +    logInfo(s"Connected to Zookeeper: ${consumerConfig.zkConnect}")
    +
    +    zkClient = new ZkClient(consumerConfig.zkConnect, consumerConfig.zkSessionTimeoutMs,
    +      consumerConfig.zkConnectionTimeoutMs, ZKStringSerializer)
    +
    +    // start BlockGenerator
    +    blockGenerator.start()
    +
    +    val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
    +      .newInstance(consumerConfig.props)
    +      .asInstanceOf[Decoder[K]]
    +
    +    val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
    +      .newInstance(consumerConfig.props)
    +      .asInstanceOf[Decoder[V]]
    +
    +    val topicMessageStreams = consumerConnector.createMessageStreams(
    +      topics, keyDecoder, valueDecoder)
    +
    +    val executorPool = Executors.newFixedThreadPool(topics.values.sum)
    +
    +    try {
    +      topicMessageStreams.values.foreach { streams =>
    +        streams.foreach { stream =>
    +          executorPool.submit(new Runnable {
    --- End diff --
    
    Can you convert the Runnable into a dedicated inner class? This code is deeply nested and therefore hard to understand.
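
One way the suggested refactoring could look is sketched below. The names `SketchReceiver` and `MessageHandler` are hypothetical, and an `Iterator[String]` stands in for a Kafka message stream, so this only illustrates the shape of the change, not the PR's actual code.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, Executors, TimeUnit}

// Hypothetical receiver-like class; an Iterator[String] stands in for a Kafka stream.
class SketchReceiver(streams: Seq[Iterator[String]]) {
  val received = new ConcurrentLinkedQueue[String]()

  // Named inner class replacing an anonymous `new Runnable { ... }` at the submit site.
  private class MessageHandler(stream: Iterator[String]) extends Runnable {
    override def run(): Unit = stream.foreach(msg => received.add(msg))
  }

  def start(): Unit = {
    val pool = Executors.newFixedThreadPool(math.max(streams.size, 1))
    try {
      streams.foreach(s => pool.submit(new MessageHandler(s)))
    } finally {
      pool.shutdown() // already-submitted handlers still run to completion
    }
    pool.awaitTermination(10, TimeUnit.SECONDS)
  }
}
```

With the handler named, the submit loop shrinks to a single line and the per-message logic can be read on its own.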




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2991#issuecomment-63023461
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/23352/




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the pull request:

    https://github.com/apache/spark/pull/2991#issuecomment-61168870
  
    @jerryshao Thanks for this change! I am taking a look at it, but the code does not merge cleanly. Please update it against the master branch.




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2991#issuecomment-62862783
  
      [Test build #23305 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/23305/consoleFull) for   PR 2991 at commit [`2a20a01`](https://github.com/apache/spark/commit/2a20a0107af78f9db85bea28acd6cd4730af84f7).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2991#issuecomment-63017550
  
      [Test build #23352 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/23352/consoleFull) for   PR 2991 at commit [`5461f1c`](https://github.com/apache/spark/commit/5461f1c43b0e98aa7b583f14569eefd833b19df0).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the pull request:

    https://github.com/apache/spark/pull/2991#issuecomment-62522054
  
    Things to do for this PR
    1. Revert the change that separates out KafkaReceiver, to minimize the diff. We do not want anything to affect the main code path.
    2. Do not use `BlockGenerator.onStoreData`. Instead, add synchronization in the `ReliableKafkaReceiver`, so that it synchronously adds data to the BlockGenerator and updates the offset in the map. It should also synchronously deal with generated blocks by pushing them and committing the Kafka offsets. It's best to keep all Kafka-specific complexity in the Kafka code rather than pushing it down into the BlockGenerator code. Later, depending on whether other receivers need the functionality, we can push it into BlockGenerator.
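
Item 2 could take roughly the following shape. This is a sketch under stated assumptions: `SketchReliableReceiver`, `storeMessage`, and `pushAndCommit` are hypothetical names, the `commit` callback stands in for writing offsets to ZooKeeper, and no Spark or Kafka classes are used.

```scala
import scala.collection.mutable

// All names here are hypothetical stand-ins, not Spark or Kafka APIs.
class SketchReliableReceiver(commit: Map[String, Long] => Unit) {
  private val buffer = mutable.ArrayBuffer.empty[String]
  private val latestOffsets = mutable.HashMap.empty[String, Long]
  // Exposed only so the sketch can be inspected; assumed to be used from one thread.
  val pushedBlocks = mutable.ArrayBuffer.empty[Seq[String]]

  // Store a message and record its offset under one lock, so a block boundary
  // can never fall between the two updates.
  def storeMessage(msg: String, partition: String, offset: Long): Unit = synchronized {
    buffer += msg
    latestOffsets.put(partition, offset)
  }

  // Cut a block together with the offset snapshot that covers exactly its contents.
  private def generateBlock(): (Seq[String], Map[String, Long]) = synchronized {
    val block = buffer.toList
    val snapshot = latestOffsets.toMap
    buffer.clear()
    latestOffsets.clear()
    (block, snapshot)
  }

  // Push the block first, then commit offsets, so offsets are only committed
  // for data that has already been stored.
  def pushAndCommit(): Unit = {
    val (block, snapshot) = generateBlock()
    if (block.nonEmpty) {
      pushedBlocks += block
      commit(snapshot)
    }
  }
}
```

Here all Kafka-specific bookkeeping lives in the receiver, and the generic buffering code needs no metadata hook.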




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2991#issuecomment-62674113
  
      [Test build #23250 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/23250/consoleFull) for   PR 2991 at commit [`e5e21c1`](https://github.com/apache/spark/commit/e5e21c1b54222cc5595e42933f739ca3f078ed36).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2991#discussion_r20130229
  
    --- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala ---
    @@ -0,0 +1,212 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming.kafka
    +
    +import java.util.Properties
    +import java.util.concurrent.{ConcurrentHashMap, Executors}
    +
    +import scala.collection.Map
    +import scala.collection.mutable
    +import scala.reflect.{classTag, ClassTag}
    +
    +import kafka.common.TopicAndPartition
    +import kafka.consumer.{Consumer, ConsumerConfig, ConsumerConnector}
    +import kafka.serializer.Decoder
    +import kafka.utils.{ZkUtils, ZKGroupTopicDirs, ZKStringSerializer, VerifiableProperties}
    +import org.I0Itec.zkclient.ZkClient
    +
    +import org.apache.spark.{SparkEnv, Logging}
    +import org.apache.spark.storage.{StreamBlockId, StorageLevel}
    +import org.apache.spark.streaming.receiver.{BlockGeneratorListener, BlockGenerator, Receiver}
    +
    +private[streaming]
    +class ReliableKafkaReceiver[
    +  K: ClassTag,
    +  V: ClassTag,
    +  U <: Decoder[_]: ClassTag,
    +  T <: Decoder[_]: ClassTag](
    +    kafkaParams: Map[String, String],
    +    topics: Map[String, Int],
    +    storageLevel: StorageLevel)
    +    extends Receiver[Any](storageLevel) with Logging {
    +
    +  /** High level consumer to connect to Kafka */
    +  private var consumerConnector: ConsumerConnector = null
    +
    +  /** zkClient to connect to Zookeeper to commit the offsets */
    +  private var zkClient: ZkClient = null
    +
    +  private val groupId = kafkaParams("group.id")
    +
    +  private lazy val env = SparkEnv.get
    +
    +  private val AUTO_OFFSET_COMMIT = "auto.commit.enable"
    +
    +  /** A HashMap to manage the offset for each topic/partition, this HashMap is called in
    +    * synchronized block, so mutable HashMap will not meet concurrency issue */
    +  private lazy val topicPartitionOffsetMap = new mutable.HashMap[TopicAndPartition, Long]
    +
    +  /** A concurrent HashMap to store the stream block id and related offset snapshot */
    +  private lazy val blockOffsetMap =
    +    new ConcurrentHashMap[StreamBlockId, Map[TopicAndPartition, Long]]
    +
    +  private lazy val blockGeneratorListener = new BlockGeneratorListener {
    +    override def onStoreData(data: Any, metadata: Any): Unit = {
    +      if (metadata != null) {
    +        val kafkaMetadata = metadata.asInstanceOf[(TopicAndPartition, Long)]
    +        topicPartitionOffsetMap.put(kafkaMetadata._1, kafkaMetadata._2)
    +      }
    +    }
    +
    +    override def onGenerateBlock(blockId: StreamBlockId): Unit = {
    +      // Get a snapshot of current offset map and store with related block id. Since this hook
    +      // function is called in synchronized block, so we can get the snapshot without explicit lock.
    +      val offsetSnapshot = topicPartitionOffsetMap.toMap
    +      blockOffsetMap.put(blockId, offsetSnapshot)
    +      topicPartitionOffsetMap.clear()
    +    }
    +
    +    override def onPushBlock(blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
    +      store(arrayBuffer.asInstanceOf[mutable.ArrayBuffer[Any]])
    +
    +      // Commit and remove the related offsets.
    +      Option(blockOffsetMap.get(blockId)).foreach { offsetMap =>
    +        commitOffset(offsetMap)
    +      }
    +      blockOffsetMap.remove(blockId)
    +    }
    +
    +    override def onError(message: String, throwable: Throwable): Unit = {
    +      reportError(message, throwable)
    +    }
    +  }
    +
    +  /** Manage the BlockGenerator in receiver itself for better managing block store and offset
    +    * commit */
    +  private var blockGenerator: BlockGenerator = null
    +
    +  override def onStop(): Unit = {
    +    if (consumerConnector != null) {
    +      consumerConnector.shutdown()
    +      consumerConnector = null
    +    }
    +
    +    if (zkClient != null) {
    +      zkClient.close()
    +      zkClient = null
    +    }
    +
    +    blockGenerator.stop()
    +  }
    +
    +  override def onStart(): Unit = {
    +    logInfo(s"Starting Kafka Consumer Stream with group: $groupId")
    +
    +    blockGenerator = new BlockGenerator(blockGeneratorListener, streamId, env.conf)
    +
    +    if (kafkaParams.contains(AUTO_OFFSET_COMMIT) && kafkaParams(AUTO_OFFSET_COMMIT) == "true") {
    +      logWarning(s"$AUTO_OFFSET_COMMIT should be set to false in ReliableKafkaReceiver, " +
    +        "otherwise we will manually set it to false to turn off auto offset commit in Kafka")
    +    }
    +
    +    val props = new Properties()
    +    kafkaParams.foreach(param => props.put(param._1, param._2))
    +    // Manually set "auto.commit.enable" to "false" no matter user explicitly set it to true,
    +    // we have to make sure this property is set to false to turn off auto commit mechanism in
    +    // Kafka.
    +    props.setProperty(AUTO_OFFSET_COMMIT, "false")
    +
    +    val consumerConfig = new ConsumerConfig(props)
    +
    +    assert(consumerConfig.autoCommitEnable == false)
    +
    +    logInfo(s"Connecting to Zookeeper: ${consumerConfig.zkConnect}")
    +    consumerConnector = Consumer.create(consumerConfig)
    +    logInfo(s"Connected to Zookeeper: ${consumerConfig.zkConnect}")
    +
    +    zkClient = new ZkClient(consumerConfig.zkConnect, consumerConfig.zkSessionTimeoutMs,
    +      consumerConfig.zkConnectionTimeoutMs, ZKStringSerializer)
    +
    +    // start BlockGenerator
    +    blockGenerator.start()
    +
    +    val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
    +      .newInstance(consumerConfig.props)
    +      .asInstanceOf[Decoder[K]]
    +
    +    val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
    +      .newInstance(consumerConfig.props)
    +      .asInstanceOf[Decoder[V]]
    +
    +    val topicMessageStreams = consumerConnector.createMessageStreams(
    +      topics, keyDecoder, valueDecoder)
    +
    +    val executorPool = Executors.newFixedThreadPool(topics.values.sum)
    --- End diff --
    
    This change was not made.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2991#discussion_r20129828
  
    --- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaReceiver.scala ---
    @@ -0,0 +1,135 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming.kafka
    +
    +import java.util.Properties
    +import java.util.concurrent.Executors
    +
    +import scala.collection.Map
    +import scala.reflect.{classTag, ClassTag}
    +
    +import kafka.consumer._
    +import kafka.serializer.Decoder
    +import kafka.utils.VerifiableProperties
    +import kafka.utils.ZKStringSerializer
    +import org.I0Itec.zkclient._
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.storage.StorageLevel
    +import org.apache.spark.streaming.receiver.Receiver
    +
    +private[streaming]
    +class KafkaReceiver[
    +  K: ClassTag,
    +  V: ClassTag,
    +  U <: Decoder[_]: ClassTag,
    +  T <: Decoder[_]: ClassTag](
    +    kafkaParams: Map[String, String],
    +    topics: Map[String, Int],
    +    storageLevel: StorageLevel
    +  ) extends Receiver[Any](storageLevel) with Logging {
    +
    +  // Connection to Kafka
    +  var consumerConnector: ConsumerConnector = null
    +
    +  def onStop() {
    +    if (consumerConnector != null) {
    +      consumerConnector.shutdown()
    +    }
    +  }
    +
    +  def onStart() {
    +
    +    logInfo("Starting Kafka Consumer Stream with group: " + kafkaParams("group.id"))
    +
    +    // Kafka connection properties
    +    val props = new Properties()
    +    kafkaParams.foreach(param => props.put(param._1, param._2))
    +
    +    val zkConnect = kafkaParams("zookeeper.connect")
    +    // Create the connection to the cluster
    +    logInfo("Connecting to Zookeeper: " + zkConnect)
    +    val consumerConfig = new ConsumerConfig(props)
    +    consumerConnector = Consumer.create(consumerConfig)
    +    logInfo("Connected to " + zkConnect)
    +
    +    // When auto.offset.reset is defined, it is our responsibility to try and whack the
    +    // consumer group zk node.
    +    if (kafkaParams.contains("auto.offset.reset")) {
    +      tryZookeeperConsumerGroupCleanup(zkConnect, kafkaParams("group.id"))
    +    }
    +
    +    val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
    +      .newInstance(consumerConfig.props)
    +      .asInstanceOf[Decoder[K]]
    +    val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
    +      .newInstance(consumerConfig.props)
    +      .asInstanceOf[Decoder[V]]
    +
    +    // Create Threads for each Topic/Message Stream we are listening
    +    val topicMessageStreams = consumerConnector.createMessageStreams(
    +      topics, keyDecoder, valueDecoder)
    +
    +    val executorPool = Executors.newFixedThreadPool(topics.values.sum)
    +    try {
    +      // Start the messages handler for each partition
    +      topicMessageStreams.values.foreach { streams =>
    +        streams.foreach { stream => executorPool.submit(new MessageHandler(stream)) }
    +      }
    +    } finally {
    +      executorPool.shutdown() // Just causes threads to terminate after work is done
    +    }
    +  }
    +
    +  // Handles Kafka Messages
    +  private class MessageHandler[K: ClassTag, V: ClassTag](stream: KafkaStream[K, V])
    +    extends Runnable {
    +    def run() {
    +      logInfo("Starting MessageHandler.")
    +      try {
    +        for (msgAndMetadata <- stream) {
    +          store((msgAndMetadata.key, msgAndMetadata.message))
    +        }
    +      } catch {
    +        case e: Throwable => logError("Error handling message; exiting", e)
    +      }
    +    }
    +  }
    +
    +  // It is our responsibility to delete the consumer group when specifying auto.offset.reset. This
    +  // is because Kafka 0.7.2 only honors this param when the group is not in zookeeper.
    +  //
    +  // The kafka high level consumer doesn't expose setting offsets currently, this is a trick copied
    +  // from Kafka's ConsoleConsumer. See code related to 'auto.offset.reset' when it is set to
    +  // 'smallest'/'largest':
    +  // scalastyle:off
    +  // https://github.com/apache/kafka/blob/0.7.2/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
    +  // scalastyle:on
    +  private def tryZookeeperConsumerGroupCleanup(zkUrl: String, groupId: String) {
    +    val dir = "/consumers/" + groupId
    +    logInfo("Cleaning up temporary Zookeeper data under " + dir + ".")
    --- End diff --
    
    Hi TD, this is quite old code and should be removed, since we have already updated Kafka to 0.8. Would you mind taking a look at PR #1420? It has been pending for a long while.




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2991#issuecomment-60887199
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22438/




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2991#discussion_r20270965
  
    --- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala ---
    @@ -0,0 +1,266 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming.kafka
    +
    +import java.util.Properties
    +import java.util.concurrent.{ThreadPoolExecutor, ConcurrentHashMap}
    +
    +import scala.collection.{Map, mutable}
    +import scala.reflect.{ClassTag, classTag}
    +
    +import kafka.common.TopicAndPartition
    +import kafka.consumer.{Consumer, ConsumerConfig, ConsumerConnector, KafkaStream}
    +import kafka.message.MessageAndMetadata
    +import kafka.serializer.Decoder
    +import kafka.utils.{VerifiableProperties, ZKGroupTopicDirs, ZKStringSerializer, ZkUtils}
    +import org.I0Itec.zkclient.ZkClient
    +
    +import org.apache.spark.{Logging, SparkEnv}
    +import org.apache.spark.storage.{StorageLevel, StreamBlockId}
    +import org.apache.spark.streaming.receiver.{BlockGenerator, BlockGeneratorListener, Receiver}
    +import org.apache.spark.util.Utils
    +
    +/**
    + * ReliableKafkaReceiver offers the ability to reliably store data into BlockManager without loss.
    + * It is turned off by default and will be enabled when
    + * spark.streaming.receiver.writeAheadLog.enable is true. The difference compared to KafkaReceiver
    + * is that this receiver manages topic-partition/offset itself and updates the offset information
    + * after data is reliably stored as write-ahead log. Offsets will only be updated when data is
    + * reliably stored, so the potential data loss problem of KafkaReceiver can be eliminated.
    + *
    + * Note: ReliableKafkaReceiver will set auto.commit.enable to false to turn off automatic offset
    + * commit mechanism in Kafka consumer. So setting this configuration manually within kafkaParams
    + * will not take effect.
    + */
    +private[streaming]
    +class ReliableKafkaReceiver[
    +  K: ClassTag,
    +  V: ClassTag,
    +  U <: Decoder[_]: ClassTag,
    +  T <: Decoder[_]: ClassTag](
    +    kafkaParams: Map[String, String],
    +    topics: Map[String, Int],
    +    storageLevel: StorageLevel)
    +    extends Receiver[(K, V)](storageLevel) with Logging {
    +
    +  private val groupId = kafkaParams("group.id")
    +  private val AUTO_OFFSET_COMMIT = "auto.commit.enable"
    +  private def conf = SparkEnv.get.conf
    +
    +  /** High level consumer to connect to Kafka. */
    +  private var consumerConnector: ConsumerConnector = null
    +
    +  /** zkClient to connect to Zookeeper to commit the offsets. */
    +  private var zkClient: ZkClient = null
    +
    +  /**
    +   * A HashMap to manage the offset for each topic/partition, this HashMap is called in
    +   * synchronized block, so mutable HashMap will not meet concurrency issue.
    +   */
    +  private var topicPartitionOffsetMap: mutable.HashMap[TopicAndPartition, Long] = null
    +
    +  /** A concurrent HashMap to store the stream block id and related offset snapshot. */
    +  private var blockOffsetMap: ConcurrentHashMap[StreamBlockId, Map[TopicAndPartition, Long]] = null
    +
    +  /**
    +   * Manage the BlockGenerator in receiver itself for better managing block store and offset
    +   * commit.
    +   */
    +  private var blockGenerator: BlockGenerator = null
    +
    +  /** Threadpool running the handlers for receiving message from multiple topics and partitions. */
    +  private var messageHandlerThreadPool: ThreadPoolExecutor = null
    +
    +  override def onStart(): Unit = {
    +    logInfo(s"Starting Kafka Consumer Stream with group: $groupId")
    +
    +    // Initialize the topic-partition / offset hash map.
    +    topicPartitionOffsetMap = new mutable.HashMap[TopicAndPartition, Long]
    +
    +    // Initialize the stream block id / offset snapshot hash map.
    +    blockOffsetMap = new ConcurrentHashMap[StreamBlockId, Map[TopicAndPartition, Long]]()
    +
    +    // Initialize the block generator for storing Kafka message.
    +    blockGenerator = new BlockGenerator(new GeneratedBlockHandler, streamId, conf)
    +
    +    if (kafkaParams.contains(AUTO_OFFSET_COMMIT) && kafkaParams(AUTO_OFFSET_COMMIT) == "true") {
    +      logWarning(s"$AUTO_OFFSET_COMMIT should be set to false in ReliableKafkaReceiver, " +
    --- End diff --
    
    Good addition!
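
The check praised here warns when the user passes auto.commit.enable=true, and the receiver then forces the property to false before building the consumer config. A minimal, self-contained sketch of that override follows, using only java.util.Properties. The object name AutoCommitOverride and both helper methods are hypothetical and are not part of the patch.

```scala
import java.util.Properties

object AutoCommitOverride {
  // Property name taken from the diff above.
  val AutoOffsetCommit = "auto.commit.enable"

  // Hypothetical helper: true when the user explicitly asked for auto commit,
  // which is the case the receiver logs a warning for.
  def userRequestedAutoCommit(kafkaParams: Map[String, String]): Boolean =
    kafkaParams.get(AutoOffsetCommit) == Some("true")

  // Hypothetical helper: copy the user parameters, then force auto commit off
  // regardless of what the user supplied.
  def buildProps(kafkaParams: Map[String, String]): Properties = {
    val props = new Properties()
    kafkaParams.foreach { case (k, v) => props.setProperty(k, v) }
    props.setProperty(AutoOffsetCommit, "false")
    props
  }
}
```

Because the override is applied after the user parameters are copied, the user-supplied value can never win, which matches the note in the class documentation that setting this configuration in kafkaParams will not take effect.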




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the pull request:

    https://github.com/apache/spark/pull/2991#issuecomment-62997454
  
    Let's see if this passes Jenkins; I hadn't tried that yet.




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on the pull request:

    https://github.com/apache/spark/pull/2991#issuecomment-60875453
  
    Hi @tdas, would you mind taking a look at this? Thanks a lot.




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on the pull request:

    https://github.com/apache/spark/pull/2991#issuecomment-62094150
  
    Thanks a lot for your review. I will address the above comments. Besides, we are testing this with StreamBench and will report the results to you later.




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the pull request:

    https://github.com/apache/spark/pull/2991#issuecomment-62982811
  
    I did more refactoring for Refactoring 2, creating https://github.com/jerryshao/apache-spark/pull/8 . This is what I finally recommend for merging. Please take a look. I have run the 3 Kafka test suites repeatedly for a long time, and they seem to pass consistently.




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the pull request:

    https://github.com/apache/spark/pull/2991#issuecomment-63117684
  
    Jenkins, retest this please.




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2991#discussion_r20210141
  
    --- Diff: external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala ---
    @@ -0,0 +1,173 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming.kafka
    +
    +import java.io.File
    +
    +import scala.collection.mutable
    +
    +import kafka.serializer.StringDecoder
    +import kafka.utils.{ZkUtils, ZKGroupTopicDirs}
    +
    +import org.apache.spark.SparkConf
    +import org.apache.spark.storage.StorageLevel
    +import org.apache.spark.streaming.StreamingContext
    +import org.apache.spark.util.Utils
    +
    +class ReliableKafkaStreamSuite extends KafkaStreamSuite {
    +  import KafkaTestUtils._
    +
    +  test("Reliable Kafka input stream") {
    +    val sparkConf = new SparkConf()
    +      .setMaster(master)
    +      .setAppName(framework)
    +      .set("spark.streaming.receiver.writeAheadLog.enable", "true")
    +    val ssc = new StreamingContext(sparkConf, batchDuration)
    +    val checkpointDir = s"${System.getProperty("java.io.tmpdir", "/tmp")}/" +
    +      s"test-checkpoint${random.nextInt(10000)}"
    +    Utils.registerShutdownDeleteDir(new File(checkpointDir))
    +    ssc.checkpoint(checkpointDir)
    +
    +    val topic = "test"
    +    val sent = Map("a" -> 1, "b" -> 1, "c" -> 1)
    +    createTopic(topic)
    +    produceAndSendMessage(topic, sent)
    +
    +    val kafkaParams = Map("zookeeper.connect" -> s"$zkHost:$zkPort",
    +      "group.id" -> s"test-consumer-${random.nextInt(10000)}",
    +      "auto.offset.reset" -> "smallest")
    +
    +    val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
    +      ssc,
    +      kafkaParams,
    +      Map(topic -> 1),
    +      StorageLevel.MEMORY_ONLY)
    +    val result = new mutable.HashMap[String, Long]()
    +    stream.map { case (k, v) => v }
    +      .foreachRDD { r =>
    +        val ret = r.collect()
    +        ret.foreach { v =>
    +          val count = result.getOrElseUpdate(v, 0) + 1
    +          result.put(v, count)
    +        }
    +      }
    +    ssc.start()
    +    ssc.awaitTermination(3000)
    +
    +    // A basic process verification for ReliableKafkaReceiver.
    +    // Verify whether received message number is equal to the sent message number.
    +    assert(sent.size === result.size)
    +    // Verify whether each message is the same as the data to be verified.
    +    sent.keys.foreach { k => assert(sent(k) === result(k).toInt) }
    +
    +    ssc.stop()
    +  }
    +
    +  test("Verify the offset commit") {
    +    // Verify the corretness of offset commit mechanism.
    --- End diff --
    
    nit: spelling mistake




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2991#issuecomment-61224754
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22595/




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2991#discussion_r20129945
  
    --- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala ---
    @@ -0,0 +1,241 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming.kafka
    +
    +import java.util.Properties
    +import java.util.concurrent.ConcurrentHashMap
    +
    +import scala.collection.Map
    +import scala.collection.mutable
    +import scala.reflect.{classTag, ClassTag}
    +
    +import kafka.common.TopicAndPartition
    +import kafka.consumer.{Consumer, ConsumerConfig, ConsumerConnector, KafkaStream}
    +import kafka.serializer.Decoder
    +import kafka.utils.{ZkUtils, ZKGroupTopicDirs, ZKStringSerializer, VerifiableProperties}
    +import org.I0Itec.zkclient.ZkClient
    +
    +import org.apache.spark.{SparkEnv, Logging}
    +import org.apache.spark.storage.{StreamBlockId, StorageLevel}
    +import org.apache.spark.streaming.receiver.{BlockGeneratorListener, BlockGenerator, Receiver}
    +import org.apache.spark.util.Utils
    +
    +private[streaming]
    +class ReliableKafkaReceiver[
    +  K: ClassTag,
    +  V: ClassTag,
    +  U <: Decoder[_]: ClassTag,
    +  T <: Decoder[_]: ClassTag](
    +    kafkaParams: Map[String, String],
    +    topics: Map[String, Int],
    +    storageLevel: StorageLevel)
    +    extends Receiver[Any](storageLevel) with Logging {
    +
    +  /** High level consumer to connect to Kafka. */
    +  private var consumerConnector: ConsumerConnector = null
    +
    +  /** zkClient to connect to Zookeeper to commit the offsets. */
    +  private var zkClient: ZkClient = null
    +
    +  private val groupId = kafkaParams("group.id")
    +
    +  private def conf() = SparkEnv.get.conf
    +
    +  private val AUTO_OFFSET_COMMIT = "auto.commit.enable"
    +
    +  /**
    --- End diff --
    
    It is fine to leave it as is in this PR, but in future please use `//` comments or `/* ... */` comments for inline commenting. The `/** ... */` style is used only for scala docs.  See https://cwiki.apache.org/confluence/display/SPARK/Spark+Code+Style+Guide#SparkCodeStyleGuide-Codedocumentationstyle
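
To illustrate the distinction drawn in this comment, here is a small sketch showing the three comment forms side by side. The class OffsetTracker and its members are hypothetical and exist only to give the comments something to attach to.

```scala
/**
 * Scaladoc comment: documents the class itself and appears in generated docs.
 * OffsetTracker is a hypothetical example class, not part of the patch.
 */
class OffsetTracker {
  // Line comment: explains an implementation detail inline.
  private var lastOffset: Long = -1L

  /* Block comment: also suitable for inline, non-documentation notes. */
  def update(offset: Long): Unit = { lastOffset = offset }

  /** Returns the most recently recorded offset, or -1 if none was recorded. */
  def current: Long = lastOffset
}
```

Only the `/** ... */` blocks are picked up by scaladoc, which is why the style guide reserves that form for API documentation.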




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on the pull request:

    https://github.com/apache/spark/pull/2991#issuecomment-62523259
  
    Greatly appreciate your comments, thanks a lot. I will change the code as you suggested. 




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on the pull request:

    https://github.com/apache/spark/pull/2991#issuecomment-62347765
  
    Hi @tdas, thanks a lot for your comments. I've addressed all the comments you mentioned before. Would you mind taking a look at the updated version? Thanks a lot.
    
    Besides, there is one concern I have to raise: the overhead of committing offsets to Zookeeper. Since we now update the offsets in ZK after pushing data into the WAL and BM, a commit request is issued every 200ms in the normal case. I'm not sure whether this frequency will put significant load on ZK, but compared to Kafka's default commit interval (1 minute), it is very frequent. In my local test it works fine because my cluster is small, but I'm not sure what happens once a cluster reaches hundreds of nodes.
    
    If we need this synchronous offset commit mechanism, the problem cannot be easily solved, even with the low-level API. I think it can be addressed by Kafka 0.9, which will manage offsets itself rather than relying on ZK, so the ZK overhead will be alleviated.
    
    So what is your opinion?
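
As a rough back-of-the-envelope check of the concern above, the two intervals quoted in this comment (200ms per commit versus Kafka's 1 minute default) can be compared directly. This is only a sketch: the object CommitRate is hypothetical, and numReceivers is an assumed parameter, since the thread does not say how many receivers a large deployment would run.

```scala
object CommitRate {
  // Interval figures quoted in the comment above.
  val receiverCommitIntervalMs = 200L
  val kafkaDefaultCommitIntervalMs = 60L * 1000L

  // Number of commit rounds issued per minute for a given interval.
  def commitsPerMinute(intervalMs: Long): Double = 60000.0 / intervalMs

  // Aggregate commit rounds per second across a hypothetical number of receivers.
  def clusterCommitsPerSecond(numReceivers: Int, intervalMs: Long): Double =
    numReceivers * 1000.0 / intervalMs

  // How many times more often the receiver commits than Kafka's default.
  val ratio: Double =
    commitsPerMinute(receiverCommitIntervalMs) / commitsPerMinute(kafkaDefaultCommitIntervalMs)
}
```

Under these figures a single receiver commits 300 times per minute instead of once, and an assumed 100 receivers would issue about 500 commit rounds per second in total. Whether Zookeeper handles that comfortably depends on the deployment, which is the open question raised here.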
     




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2991#issuecomment-62996965
  
      [Test build #23345 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/23345/consoleFull) for   PR 2991 at commit [`5461f1c`](https://github.com/apache/spark/commit/5461f1c43b0e98aa7b583f14569eefd833b19df0).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the pull request:

    https://github.com/apache/spark/pull/2991#issuecomment-62093876
  
    I have made some high-level comments, but I am going to do another, deeper round to understand the logic and correctness. In the meantime, since the WAL has been merged, could you test whether this patch works as expected with the WAL turned on?




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2991#issuecomment-60878841
  
      [Test build #22425 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22425/consoleFull) for   PR 2991 at commit [`5cc4cb1`](https://github.com/apache/spark/commit/5cc4cb198662cb35008d9a2e46e320f75ce35a71).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2991#issuecomment-62352704
  
      [Test build #23142 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/23142/consoleFull) for   PR 2991 at commit [`c174454`](https://github.com/apache/spark/commit/c1744547d0369045fc53a715fa227e7b34edce55).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on the pull request:

    https://github.com/apache/spark/pull/2991#issuecomment-62862109
  
    Jenkins, retest this please.




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2991#issuecomment-62860980
  
      [Test build #23300 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/23300/consoleFull) for   PR 2991 at commit [`2a20a01`](https://github.com/apache/spark/commit/2a20a0107af78f9db85bea28acd6cd4730af84f7).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2991#discussion_r20139202
  
    --- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaReceiver.scala ---
    @@ -0,0 +1,135 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming.kafka
    +
    +import java.util.Properties
    +import java.util.concurrent.Executors
    +
    +import scala.collection.Map
    +import scala.reflect.{classTag, ClassTag}
    +
    +import kafka.consumer._
    +import kafka.serializer.Decoder
    +import kafka.utils.VerifiableProperties
    +import kafka.utils.ZKStringSerializer
    +import org.I0Itec.zkclient._
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.storage.StorageLevel
    +import org.apache.spark.streaming.receiver.Receiver
    +
    +private[streaming]
    +class KafkaReceiver[
    +  K: ClassTag,
    +  V: ClassTag,
    +  U <: Decoder[_]: ClassTag,
    +  T <: Decoder[_]: ClassTag](
    +    kafkaParams: Map[String, String],
    +    topics: Map[String, Int],
    +    storageLevel: StorageLevel
    +  ) extends Receiver[Any](storageLevel) with Logging {
    --- End diff --
    
    This should probably be Receiver[(K, V)]. I know this is how it is in the existing receiver, but that is probably wrong.
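
A minimal sketch of what the type parameter buys, assuming a reduced stand-in for the receiver base class. `SketchReceiver`, `UntypedKafkaReceiver` and `TypedKafkaReceiver` are hypothetical names invented for this illustration; only the `Receiver[Any]` versus `Receiver[(K, V)]` distinction comes from the review comment.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for a receiver base class, reduced to a typed store().
abstract class SketchReceiver[T] {
  val stored: ArrayBuffer[T] = ArrayBuffer.empty[T]
  def store(item: T): Unit = { stored += item; () }
}

// Mirrors "extends Receiver[Any]": anything can be stored.
class UntypedKafkaReceiver extends SketchReceiver[Any]

// Mirrors "extends Receiver[(K, V)]": only key/value pairs can be stored.
class TypedKafkaReceiver[K, V] extends SketchReceiver[(K, V)]

object ReceiverTypingSketch {
  def main(args: Array[String]): Unit = {
    val untyped = new UntypedKafkaReceiver
    untyped.store(("key", "value"))
    untyped.store(42) // compiles, although it is not a key/value pair

    val typed = new TypedKafkaReceiver[String, String]
    typed.store(("key", "value"))
    // typed.store(42) // rejected at compile time: type mismatch

    println(untyped.stored)
    println(typed.stored)
  }
}
```

With the pair type in the signature, a mistaken `store` call is caught by the compiler instead of surfacing later as a cast failure downstream.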




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2991#discussion_r20139743
  
    --- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaReceiver.scala ---
    @@ -0,0 +1,135 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming.kafka
    +
    +import java.util.Properties
    +import java.util.concurrent.Executors
    +
    +import scala.collection.Map
    +import scala.reflect.{classTag, ClassTag}
    +
    +import kafka.consumer._
    +import kafka.serializer.Decoder
    +import kafka.utils.VerifiableProperties
    +import kafka.utils.ZKStringSerializer
    +import org.I0Itec.zkclient._
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.storage.StorageLevel
    +import org.apache.spark.streaming.receiver.Receiver
    +
    +private[streaming]
    +class KafkaReceiver[
    +  K: ClassTag,
    +  V: ClassTag,
    +  U <: Decoder[_]: ClassTag,
    +  T <: Decoder[_]: ClassTag](
    +    kafkaParams: Map[String, String],
    +    topics: Map[String, Int],
    +    storageLevel: StorageLevel
    +  ) extends Receiver[Any](storageLevel) with Logging {
    --- End diff --
    
    OK, I will fix this.




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2991#issuecomment-63002982
  
      [Test build #23345 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/23345/consoleFull) for   PR 2991 at commit [`5461f1c`](https://github.com/apache/spark/commit/5461f1c43b0e98aa7b583f14569eefd833b19df0).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2991#discussion_r20215912
  
    --- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala ---
    @@ -0,0 +1,251 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming.kafka
    +
    +import java.util.Properties
    +import java.util.concurrent.ConcurrentHashMap
    +
    +import scala.collection.Map
    +import scala.collection.mutable
    +import scala.reflect.{classTag, ClassTag}
    +
    +import kafka.common.TopicAndPartition
    +import kafka.consumer.{Consumer, ConsumerConfig, ConsumerConnector, KafkaStream}
    +import kafka.serializer.Decoder
    +import kafka.utils.{ZkUtils, ZKGroupTopicDirs, ZKStringSerializer, VerifiableProperties}
    +import org.I0Itec.zkclient.ZkClient
    +
    +import org.apache.spark.{SparkEnv, Logging}
    +import org.apache.spark.storage.{StreamBlockId, StorageLevel}
    +import org.apache.spark.streaming.receiver.{BlockGeneratorListener, BlockGenerator, Receiver}
    +import org.apache.spark.util.Utils
    +
    +
    +/**
    + * ReliableKafkaReceiver offers the ability to reliably store data into BlockManager without loss.
    + * It is turned off by default and will be enabled when
    + * spark.streaming.receiver.writeAheadLog.enable is true. The difference compared to KafkaReceiver
    + * is that this receiver manages topic-partition/offset itself and updates the offset information
    + * after data is reliably stored as write-ahead log. Offsets will only be updated when data is
    + * reliably stored, so the potential data loss problem of KafkaReceiver can be eliminated.
    + *
    + * Note: ReliableKafkaReceiver will set auto.commit.enable to false to turn off automatic offset
    + * commit mechanism in Kafka consumer. So setting this configuration manually within kafkaParams
    + * will not take effect.
    + */
    +private[streaming]
    +class ReliableKafkaReceiver[
    +  K: ClassTag,
    +  V: ClassTag,
    +  U <: Decoder[_]: ClassTag,
    +  T <: Decoder[_]: ClassTag](
    +    kafkaParams: Map[String, String],
    +    topics: Map[String, Int],
    +    storageLevel: StorageLevel)
    +    extends Receiver[(K, V)](storageLevel) with Logging {
    +
    +  private val groupId = kafkaParams("group.id")
    +
    +  private val AUTO_OFFSET_COMMIT = "auto.commit.enable"
    +
    +  private def conf() = SparkEnv.get.conf
    +
    +  /** High level consumer to connect to Kafka. */
    +  private var consumerConnector: ConsumerConnector = null
    +
    +  /** zkClient to connect to Zookeeper to commit the offsets. */
    +  private var zkClient: ZkClient = null
    +
    +  /**
    +   * A HashMap to manage the offset for each topic/partition, this HashMap is called in
    +   * synchronized block, so mutable HashMap will not meet concurrency issue.
    +   */
    +  private var topicPartitionOffsetMap: mutable.HashMap[TopicAndPartition, Long] = null
    +
    +  /** A concurrent HashMap to store the stream block id and related offset snapshot. */
    +  private var blockOffsetMap: ConcurrentHashMap[StreamBlockId, Map[TopicAndPartition, Long]] = null
    +
    +  /**
    +   * Manage the BlockGenerator in receiver itself for better managing block store and offset
    +   * commit.
    +   */
    +  private var blockGenerator: BlockGenerator = null
    +
    +  /** Kafka offsets checkpoint listener to register into BlockGenerator for offsets checkpoint. */
    +  private final class OffsetCheckpointListener extends BlockGeneratorListener {
    +
    +    override def onGenerateBlock(blockId: StreamBlockId): Unit = {
    +      // Get a snapshot of current offset map and store with related block id. Since this hook
    +      // function is called in synchronized block, so we can get the snapshot without explicit lock.
    +      val offsetSnapshot = topicPartitionOffsetMap.toMap
    --- End diff --
    
    The hook function `onGenerateBlock` is called from `updateCurrentBuffer`, which is synchronized on the BlockGenerator object, so I don't think we need to add another lock here. While `updateCurrentBuffer` holds that lock, no data can be added to the BlockGenerator and no offset can be inserted into topicPartitionOffsetMap.
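
The locking argument above can be sketched with simplified stand-ins. Everything here (`SketchListener`, `SketchBlockGenerator`, `addWithCallback`, `OffsetSnapshotSketch`) is hypothetical and is not Spark's BlockGenerator API. The sketch also assumes that every offset update runs while the generator's lock is held, which is the condition the argument depends on.

```scala
import scala.collection.mutable

// Hypothetical listener, modelled on the hook discussed in the comment.
trait SketchListener {
  def onGenerateBlock(blockId: Long): Unit
}

// Hypothetical, simplified generator: adds and block cuts share one lock.
class SketchBlockGenerator(listener: SketchListener) {
  private val buffer = mutable.ArrayBuffer.empty[String]
  private var nextBlockId = 0L

  // Adds a record and runs the caller's state update under the same lock.
  def addWithCallback(record: String)(update: => Unit): Unit = synchronized {
    buffer += record
    update
  }

  // Cuts a block; the listener runs while this generator's lock is held,
  // so no addWithCallback call can interleave with the snapshot.
  def updateCurrentBuffer(): List[String] = synchronized {
    val block = buffer.toList
    buffer.clear()
    listener.onGenerateBlock(nextBlockId)
    nextBlockId += 1
    block
  }
}

object OffsetSnapshotSketch {
  // Plain mutable maps: only touched while the generator's lock is held.
  val offsets = mutable.HashMap.empty[String, Long]
  val snapshots = mutable.HashMap.empty[Long, Map[String, Long]]

  val generator = new SketchBlockGenerator(new SketchListener {
    def onGenerateBlock(blockId: Long): Unit =
      snapshots(blockId) = offsets.toMap // immutable copy of current offsets
  })

  def main(args: Array[String]): Unit = {
    generator.addWithCallback("m0") { offsets("topic-0") = 0L }
    generator.addWithCallback("m1") { offsets("topic-0") = 1L }
    val block = generator.updateCurrentBuffer()
    println(s"block: $block")
    println(s"snapshots: $snapshots")
  }
}
```

If any code path updated the offset map without taking the generator's lock, the snapshot could observe a partially updated map, and an explicit lock or a concurrent map would be needed after all.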




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2991#issuecomment-62849975
  
      [Test build #23300 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/23300/consoleFull) for   PR 2991 at commit [`2a20a01`](https://github.com/apache/spark/commit/2a20a0107af78f9db85bea28acd6cd4730af84f7).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-4062][Streaming]Add ReliableKafkaReceiv...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2991#issuecomment-62669315
  
      [Test build #23245 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/23245/consoleFull) for   PR 2991 at commit [`ea873e4`](https://github.com/apache/spark/commit/ea873e424424747c8807adbc0c217a5fbf80e060).
     * This patch merges cleanly.

