Posted to commits@spark.apache.org by td...@apache.org on 2014/11/14 23:33:42 UTC

spark git commit: [SPARK-4062][Streaming] Add ReliableKafkaReceiver in Spark Streaming Kafka connector

Repository: spark
Updated Branches:
  refs/heads/master 0cbdb01e1 -> 5930f64bf


[SPARK-4062][Streaming] Add ReliableKafkaReceiver in Spark Streaming Kafka connector

Add ReliableKafkaReceiver to the Kafka connector to prevent data loss when the write-ahead log (WAL) in Spark Streaming is enabled. Details and the design doc can be found in [SPARK-4062](https://issues.apache.org/jira/browse/SPARK-4062).
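
A minimal usage sketch (not part of this patch; the app name, topic, ZooKeeper address and checkpoint path are illustrative). With the WAL flag set, KafkaUtils.createStream transparently returns a stream backed by the new ReliableKafkaReceiver:

    import kafka.serializer.StringDecoder

    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    object ReliableKafkaExample {
      def main(args: Array[String]): Unit = {
        // Enabling the WAL is what switches createStream over to ReliableKafkaReceiver.
        val sparkConf = new SparkConf()
          .setMaster("local[4]")  // receivers need more than one core when run locally
          .setAppName("ReliableKafkaExample")
          .set("spark.streaming.receiver.writeAheadLog.enable", "true")
        val ssc = new StreamingContext(sparkConf, Seconds(2))
        // The write-ahead log is kept under the checkpoint directory.
        ssc.checkpoint("/tmp/reliable-kafka-checkpoint")

        val kafkaParams = Map(
          "zookeeper.connect" -> "localhost:2181",
          "group.id" -> "reliable-example",
          "auto.offset.reset" -> "smallest")

        val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
          ssc, kafkaParams, Map("my-topic" -> 1), StorageLevel.MEMORY_AND_DISK_SER)

        stream.map(_._2).count().print()
        ssc.start()
        ssc.awaitTermination()
      }
    }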

Author: jerryshao <sa...@intel.com>
Author: Tathagata Das <ta...@gmail.com>
Author: Saisai Shao <sa...@intel.com>

Closes #2991 from jerryshao/kafka-refactor and squashes the following commits:

5461f1c [Saisai Shao] Merge pull request #8 from tdas/kafka-refactor3
eae4ad6 [Tathagata Das] Refactored KafkaStreamSuiteBase to eliminate KafkaTestUtils and made the Java suite more robust.
fab14c7 [Tathagata Das] minor update.
149948b [Tathagata Das] Fixed mistake
14630aa [Tathagata Das] Minor updates.
d9a452c [Tathagata Das] Minor updates.
ec2e95e [Tathagata Das] Removed the receiver's locks and essentially reverted to Saisai's original design.
2a20a01 [jerryshao] Address some comments
9f636b3 [Saisai Shao] Merge pull request #5 from tdas/kafka-refactor
b2b2f84 [Tathagata Das] Refactored Kafka receiver logic and Kafka testsuites
e501b3c [jerryshao] Add Mima excludes
b798535 [jerryshao] Fix the missed issue
e5e21c1 [jerryshao] Change to while loop
ea873e4 [jerryshao] Further address the comments
98f3d07 [jerryshao] Fix comment style
4854ee9 [jerryshao] Address all the comments
96c7a1d [jerryshao] Update the ReliableKafkaReceiver unit test
8135d31 [jerryshao] Fix flaky test
a949741 [jerryshao] Address the comments
16bfe78 [jerryshao] Change the ordering of imports
0894aef [jerryshao] Add some comments
77c3e50 [jerryshao] Code refactor and add some unit tests
dd9aeeb [jerryshao] Initial commit for reliable Kafka receiver


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5930f64b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5930f64b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5930f64b

Branch: refs/heads/master
Commit: 5930f64bf0d2516304b21bd49eac361a54caabdd
Parents: 0cbdb01
Author: jerryshao <sa...@intel.com>
Authored: Fri Nov 14 14:33:37 2014 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Fri Nov 14 14:33:37 2014 -0800

----------------------------------------------------------------------
 .../streaming/kafka/KafkaInputDStream.scala     |  33 ++-
 .../spark/streaming/kafka/KafkaUtils.scala      |   4 +-
 .../streaming/kafka/ReliableKafkaReceiver.scala | 282 +++++++++++++++++++
 .../streaming/kafka/JavaKafkaStreamSuite.java   |  44 +--
 .../streaming/kafka/KafkaStreamSuite.scala      | 216 +++++++-------
 .../kafka/ReliableKafkaStreamSuite.scala        | 140 +++++++++
 project/MimaExcludes.scala                      |   4 +
 .../streaming/receiver/BlockGenerator.scala     |  55 +++-
 .../receiver/ReceiverSupervisorImpl.scala       |   8 +-
 .../apache/spark/streaming/ReceiverSuite.scala  |   8 +-
 10 files changed, 651 insertions(+), 143 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5930f64b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
index 28ac592..4d26b64 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
@@ -17,13 +17,12 @@
 
 package org.apache.spark.streaming.kafka
 
+import java.util.Properties
+
 import scala.collection.Map
 import scala.reflect.{classTag, ClassTag}
 
-import java.util.Properties
-import java.util.concurrent.Executors
-
-import kafka.consumer._
+import kafka.consumer.{KafkaStream, Consumer, ConsumerConfig, ConsumerConnector}
 import kafka.serializer.Decoder
 import kafka.utils.VerifiableProperties
 
@@ -32,6 +31,7 @@ import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.StreamingContext
 import org.apache.spark.streaming.dstream._
 import org.apache.spark.streaming.receiver.Receiver
+import org.apache.spark.util.Utils
 
 /**
  * Input stream that pulls messages from a Kafka Broker.
@@ -51,12 +51,16 @@ class KafkaInputDStream[
     @transient ssc_ : StreamingContext,
     kafkaParams: Map[String, String],
     topics: Map[String, Int],
+    useReliableReceiver: Boolean,
     storageLevel: StorageLevel
   ) extends ReceiverInputDStream[(K, V)](ssc_) with Logging {
 
   def getReceiver(): Receiver[(K, V)] = {
-    new KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
-        .asInstanceOf[Receiver[(K, V)]]
+    if (!useReliableReceiver) {
+      new KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
+    } else {
+      new ReliableKafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
+    }
   }
 }
 
@@ -69,14 +73,15 @@ class KafkaReceiver[
     kafkaParams: Map[String, String],
     topics: Map[String, Int],
     storageLevel: StorageLevel
-  ) extends Receiver[Any](storageLevel) with Logging {
+  ) extends Receiver[(K, V)](storageLevel) with Logging {
 
   // Connection to Kafka
-  var consumerConnector : ConsumerConnector = null
+  var consumerConnector: ConsumerConnector = null
 
   def onStop() {
     if (consumerConnector != null) {
       consumerConnector.shutdown()
+      consumerConnector = null
     }
   }
 
@@ -102,11 +107,11 @@ class KafkaReceiver[
       .newInstance(consumerConfig.props)
       .asInstanceOf[Decoder[V]]
 
-    // Create Threads for each Topic/Message Stream we are listening
+    // Create threads for each topic/message stream we are listening to
     val topicMessageStreams = consumerConnector.createMessageStreams(
       topics, keyDecoder, valueDecoder)
 
-    val executorPool = Executors.newFixedThreadPool(topics.values.sum)
+    val executorPool = Utils.newDaemonFixedThreadPool(topics.values.sum, "KafkaMessageHandler")
     try {
       // Start the messages handler for each partition
       topicMessageStreams.values.foreach { streams =>
@@ -117,13 +122,15 @@ class KafkaReceiver[
     }
   }
 
-  // Handles Kafka Messages
-  private class MessageHandler[K: ClassTag, V: ClassTag](stream: KafkaStream[K, V])
+  // Handles Kafka messages
+  private class MessageHandler(stream: KafkaStream[K, V])
     extends Runnable {
     def run() {
       logInfo("Starting MessageHandler.")
       try {
-        for (msgAndMetadata <- stream) {
+        val streamIterator = stream.iterator()
+        while (streamIterator.hasNext()) {
+          val msgAndMetadata = streamIterator.next()
           store((msgAndMetadata.key, msgAndMetadata.message))
         }
       } catch {

http://git-wip-us.apache.org/repos/asf/spark/blob/5930f64b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
index ec812e1..b4ac929 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
@@ -70,7 +70,8 @@ object KafkaUtils {
       topics: Map[String, Int],
       storageLevel: StorageLevel
     ): ReceiverInputDStream[(K, V)] = {
-    new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, storageLevel)
+    val walEnabled = ssc.conf.getBoolean("spark.streaming.receiver.writeAheadLog.enable", false)
+    new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, walEnabled, storageLevel)
   }
 
   /**
@@ -99,7 +100,6 @@ object KafkaUtils {
    * @param topics    Map of (topic_name -> numPartitions) to consume. Each partition is consumed
    *                  in its own thread.
    * @param storageLevel RDD storage level.
-   *
    */
   def createStream(
       jssc: JavaStreamingContext,

http://git-wip-us.apache.org/repos/asf/spark/blob/5930f64b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala
new file mode 100644
index 0000000..be734b8
--- /dev/null
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import java.util.Properties
+import java.util.concurrent.{ThreadPoolExecutor, ConcurrentHashMap}
+
+import scala.collection.{Map, mutable}
+import scala.reflect.{ClassTag, classTag}
+
+import kafka.common.TopicAndPartition
+import kafka.consumer.{Consumer, ConsumerConfig, ConsumerConnector, KafkaStream}
+import kafka.message.MessageAndMetadata
+import kafka.serializer.Decoder
+import kafka.utils.{VerifiableProperties, ZKGroupTopicDirs, ZKStringSerializer, ZkUtils}
+import org.I0Itec.zkclient.ZkClient
+
+import org.apache.spark.{Logging, SparkEnv}
+import org.apache.spark.storage.{StorageLevel, StreamBlockId}
+import org.apache.spark.streaming.receiver.{BlockGenerator, BlockGeneratorListener, Receiver}
+import org.apache.spark.util.Utils
+
+/**
+ * ReliableKafkaReceiver offers the ability to reliably store data into BlockManager without loss.
+ * It is turned off by default and will be enabled when
+ * spark.streaming.receiver.writeAheadLog.enable is true. The difference compared to KafkaReceiver
+ * is that this receiver manages the topic-partition/offset itself and updates the offset
+ * information only after the data has been reliably stored in the write-ahead log, so the
+ * potential data loss problem of KafkaReceiver is eliminated.
+ *
+ * Note: ReliableKafkaReceiver sets auto.commit.enable to false to turn off the automatic offset
+ * commit mechanism in the Kafka consumer, so setting this configuration manually within
+ * kafkaParams will not take effect.
+ */
+private[streaming]
+class ReliableKafkaReceiver[
+  K: ClassTag,
+  V: ClassTag,
+  U <: Decoder[_]: ClassTag,
+  T <: Decoder[_]: ClassTag](
+    kafkaParams: Map[String, String],
+    topics: Map[String, Int],
+    storageLevel: StorageLevel)
+    extends Receiver[(K, V)](storageLevel) with Logging {
+
+  private val groupId = kafkaParams("group.id")
+  private val AUTO_OFFSET_COMMIT = "auto.commit.enable"
+  private def conf = SparkEnv.get.conf
+
+  /** High level consumer to connect to Kafka. */
+  private var consumerConnector: ConsumerConnector = null
+
+  /** zkClient to connect to Zookeeper to commit the offsets. */
+  private var zkClient: ZkClient = null
+
+  /**
+   * A HashMap to manage the offset for each topic/partition. This HashMap is only accessed
+   * inside synchronized blocks, so a mutable HashMap does not run into concurrency issues.
+   */
+  private var topicPartitionOffsetMap: mutable.HashMap[TopicAndPartition, Long] = null
+
+  /** A concurrent HashMap to store the stream block id and related offset snapshot. */
+  private var blockOffsetMap: ConcurrentHashMap[StreamBlockId, Map[TopicAndPartition, Long]] = null
+
+  /**
+   * Manage the BlockGenerator inside the receiver itself, to better coordinate block storing
+   * and offset committing.
+   */
+  private var blockGenerator: BlockGenerator = null
+
+  /** Thread pool running the handlers for receiving messages from multiple topics and partitions. */
+  private var messageHandlerThreadPool: ThreadPoolExecutor = null
+
+  override def onStart(): Unit = {
+    logInfo(s"Starting Kafka Consumer Stream with group: $groupId")
+
+    // Initialize the topic-partition / offset hash map.
+    topicPartitionOffsetMap = new mutable.HashMap[TopicAndPartition, Long]
+
+    // Initialize the stream block id / offset snapshot hash map.
+    blockOffsetMap = new ConcurrentHashMap[StreamBlockId, Map[TopicAndPartition, Long]]()
+
+    // Initialize the block generator for storing Kafka messages.
+    blockGenerator = new BlockGenerator(new GeneratedBlockHandler, streamId, conf)
+
+    if (kafkaParams.contains(AUTO_OFFSET_COMMIT) && kafkaParams(AUTO_OFFSET_COMMIT) == "true") {
+      logWarning(s"$AUTO_OFFSET_COMMIT should be set to false in ReliableKafkaReceiver, " +
+        "otherwise we will manually set it to false to turn off auto offset commit in Kafka")
+    }
+
+    val props = new Properties()
+    kafkaParams.foreach(param => props.put(param._1, param._2))
+    // Manually set "auto.commit.enable" to "false" no matter user explicitly set it to true,
+    // we have to make sure this property is set to false to turn off auto commit mechanism in
+    // Kafka.
+    props.setProperty(AUTO_OFFSET_COMMIT, "false")
+
+    val consumerConfig = new ConsumerConfig(props)
+
+    assert(!consumerConfig.autoCommitEnable)
+
+    logInfo(s"Connecting to Zookeeper: ${consumerConfig.zkConnect}")
+    consumerConnector = Consumer.create(consumerConfig)
+    logInfo(s"Connected to Zookeeper: ${consumerConfig.zkConnect}")
+
+    zkClient = new ZkClient(consumerConfig.zkConnect, consumerConfig.zkSessionTimeoutMs,
+      consumerConfig.zkConnectionTimeoutMs, ZKStringSerializer)
+
+    messageHandlerThreadPool = Utils.newDaemonFixedThreadPool(
+      topics.values.sum, "KafkaMessageHandler")
+
+    blockGenerator.start()
+
+    val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
+      .newInstance(consumerConfig.props)
+      .asInstanceOf[Decoder[K]]
+
+    val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
+      .newInstance(consumerConfig.props)
+      .asInstanceOf[Decoder[V]]
+
+    val topicMessageStreams = consumerConnector.createMessageStreams(
+      topics, keyDecoder, valueDecoder)
+
+    topicMessageStreams.values.foreach { streams =>
+      streams.foreach { stream =>
+        messageHandlerThreadPool.submit(new MessageHandler(stream))
+      }
+    }
+  }
+
+  override def onStop(): Unit = {
+    if (messageHandlerThreadPool != null) {
+      messageHandlerThreadPool.shutdown()
+      messageHandlerThreadPool = null
+    }
+
+    if (consumerConnector != null) {
+      consumerConnector.shutdown()
+      consumerConnector = null
+    }
+
+    if (zkClient != null) {
+      zkClient.close()
+      zkClient = null
+    }
+
+    if (blockGenerator != null) {
+      blockGenerator.stop()
+      blockGenerator = null
+    }
+
+    if (topicPartitionOffsetMap != null) {
+      topicPartitionOffsetMap.clear()
+      topicPartitionOffsetMap = null
+    }
+
+    if (blockOffsetMap != null) {
+      blockOffsetMap.clear()
+      blockOffsetMap = null
+    }
+  }
+
+  /** Store a Kafka message and the associated metadata as a tuple. */
+  private def storeMessageAndMetadata(
+      msgAndMetadata: MessageAndMetadata[K, V]): Unit = {
+    val topicAndPartition = TopicAndPartition(msgAndMetadata.topic, msgAndMetadata.partition)
+    val data = (msgAndMetadata.key, msgAndMetadata.message)
+    val metadata = (topicAndPartition, msgAndMetadata.offset)
+    blockGenerator.addDataWithCallback(data, metadata)
+  }
+
+  /** Update stored offset */
+  private def updateOffset(topicAndPartition: TopicAndPartition, offset: Long): Unit = {
+    topicPartitionOffsetMap.put(topicAndPartition, offset)
+  }
+
+  /**
+   * Remember the current offsets for each topic and partition. This is called when a block is
+   * generated.
+   */
+  private def rememberBlockOffsets(blockId: StreamBlockId): Unit = {
+    // Get a snapshot of current offset map and store with related block id.
+    val offsetSnapshot = topicPartitionOffsetMap.toMap
+    blockOffsetMap.put(blockId, offsetSnapshot)
+    topicPartitionOffsetMap.clear()
+  }
+
+  /** Store the ready-to-be-stored block and commit the related offsets to zookeeper. */
+  private def storeBlockAndCommitOffset(
+      blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
+    store(arrayBuffer.asInstanceOf[mutable.ArrayBuffer[(K, V)]])
+    Option(blockOffsetMap.get(blockId)).foreach(commitOffset)
+    blockOffsetMap.remove(blockId)
+  }
+
+  /**
+   * Commit the offsets of Kafka topics/partitions. The commit mechanism follows Kafka 0.8.x's
+   * metadata schema in Zookeeper.
+   */
+  private def commitOffset(offsetMap: Map[TopicAndPartition, Long]): Unit = {
+    if (zkClient == null) {
+      val thrown = new IllegalStateException("Zookeeper client is unexpectedly null")
+      stop("Zookeeper client is not initialized before commit offsets to ZK", thrown)
+      return
+    }
+
+    for ((topicAndPart, offset) <- offsetMap) {
+      try {
+        val topicDirs = new ZKGroupTopicDirs(groupId, topicAndPart.topic)
+        val zkPath = s"${topicDirs.consumerOffsetDir}/${topicAndPart.partition}"
+
+        ZkUtils.updatePersistentPath(zkClient, zkPath, offset.toString)
+      } catch {
+        case e: Exception =>
+          logWarning(s"Exception during commit offset $offset for topic" +
+            s"${topicAndPart.topic}, partition ${topicAndPart.partition}", e)
+      }
+
+      logInfo(s"Committed offset $offset for topic ${topicAndPart.topic}, " +
+        s"partition ${topicAndPart.partition}")
+    }
+  }
+
+  /** Class to handle received Kafka messages. */
+  private final class MessageHandler(stream: KafkaStream[K, V]) extends Runnable {
+    override def run(): Unit = {
+      while (!isStopped) {
+        try {
+          val streamIterator = stream.iterator()
+          while (streamIterator.hasNext) {
+            storeMessageAndMetadata(streamIterator.next)
+          }
+        } catch {
+          case e: Exception =>
+            logError("Error handling message", e)
+        }
+      }
+    }
+  }
+
+  /** Class to handle blocks generated by the block generator. */
+  private final class GeneratedBlockHandler extends BlockGeneratorListener {
+
+    def onAddData(data: Any, metadata: Any): Unit = {
+      // Update the offset of the data that was added to the generator
+      if (metadata != null) {
+        val (topicAndPartition, offset) = metadata.asInstanceOf[(TopicAndPartition, Long)]
+        updateOffset(topicAndPartition, offset)
+      }
+    }
+
+    def onGenerateBlock(blockId: StreamBlockId): Unit = {
+      // Remember the offsets of topics/partitions when a block has been generated
+      rememberBlockOffsets(blockId)
+    }
+
+    def onPushBlock(blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
+      // Store block and commit the blocks offset
+      storeBlockAndCommitOffset(blockId, arrayBuffer)
+    }
+
+    def onError(message: String, throwable: Throwable): Unit = {
+      reportError(message, throwable)
+    }
+  }
+}
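
To summarize the offset bookkeeping implemented above: offsets are staged per topic/partition as messages are buffered (onAddData), snapshotted per block when the BlockGenerator cuts a block (onGenerateBlock), and committed to ZooKeeper only after the block has been stored (onPushBlock). A stripped-down, self-contained sketch of that flow, using illustrative stand-in types rather than Spark's private classes:

    import java.util.concurrent.ConcurrentHashMap
    import scala.collection.mutable

    // Stand-ins for kafka.common.TopicAndPartition and org.apache.spark.storage.StreamBlockId.
    case class TopicAndPartition(topic: String, partition: Int)
    case class StreamBlockId(streamId: Int, uniqueId: Long)

    class OffsetBookkeeping {
      // Offsets seen since the last block was generated (accessed under the generator's lock).
      private val current = new mutable.HashMap[TopicAndPartition, Long]
      // Per-block offset snapshots, committed once the corresponding block is reliably stored.
      private val perBlock = new ConcurrentHashMap[StreamBlockId, Map[TopicAndPartition, Long]]

      // Mirrors onAddData: remember the latest offset for this topic/partition.
      def add(tp: TopicAndPartition, offset: Long): Unit = synchronized {
        current.put(tp, offset)
      }

      // Mirrors onGenerateBlock: freeze the staged offsets under the new block id.
      def generated(blockId: StreamBlockId): Unit = synchronized {
        perBlock.put(blockId, current.toMap)
        current.clear()
      }

      // Mirrors onPushBlock: once the block is stored, commit its offsets and forget them.
      def pushed(blockId: StreamBlockId)(commit: Map[TopicAndPartition, Long] => Unit): Unit = {
        Option(perBlock.get(blockId)).foreach(commit)
        perBlock.remove(blockId)
      }
    }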

http://git-wip-us.apache.org/repos/asf/spark/blob/5930f64b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
index efb0099..6e1abf3 100644
--- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
+++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
@@ -20,7 +20,10 @@ package org.apache.spark.streaming.kafka;
 import java.io.Serializable;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Random;
 
+import org.apache.spark.SparkConf;
+import org.apache.spark.streaming.Duration;
 import scala.Predef;
 import scala.Tuple2;
 import scala.collection.JavaConverters;
@@ -32,8 +35,6 @@ import kafka.serializer.StringDecoder;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.function.Function;
 import org.apache.spark.storage.StorageLevel;
-import org.apache.spark.streaming.Duration;
-import org.apache.spark.streaming.LocalJavaStreamingContext;
 import org.apache.spark.streaming.api.java.JavaDStream;
 import org.apache.spark.streaming.api.java.JavaPairDStream;
 import org.apache.spark.streaming.api.java.JavaStreamingContext;
@@ -42,25 +43,27 @@ import org.junit.Test;
 import org.junit.After;
 import org.junit.Before;
 
-public class JavaKafkaStreamSuite extends LocalJavaStreamingContext implements Serializable {
-  private transient KafkaStreamSuite testSuite = new KafkaStreamSuite();
+public class JavaKafkaStreamSuite implements Serializable {
+  private transient JavaStreamingContext ssc = null;
+  private transient Random random = new Random();
+  private transient KafkaStreamSuiteBase suiteBase = null;
 
   @Before
-  @Override
   public void setUp() {
-    testSuite.beforeFunction();
+    suiteBase = new KafkaStreamSuiteBase() { };
+    suiteBase.setupKafka();
     System.clearProperty("spark.driver.port");
-    //System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock");
-    ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000));
+    SparkConf sparkConf = new SparkConf()
+      .setMaster("local[4]").setAppName(this.getClass().getSimpleName());
+    ssc = new JavaStreamingContext(sparkConf, new Duration(500));
   }
 
   @After
-  @Override
   public void tearDown() {
     ssc.stop();
     ssc = null;
     System.clearProperty("spark.driver.port");
-    testSuite.afterFunction();
+    suiteBase.tearDownKafka();
   }
 
   @Test
@@ -74,15 +77,15 @@ public class JavaKafkaStreamSuite extends LocalJavaStreamingContext implements S
     sent.put("b", 3);
     sent.put("c", 10);
 
-    testSuite.createTopic(topic);
+    suiteBase.createTopic(topic);
     HashMap<String, Object> tmp = new HashMap<String, Object>(sent);
-    testSuite.produceAndSendMessage(topic,
-      JavaConverters.mapAsScalaMapConverter(tmp).asScala().toMap(
-        Predef.<Tuple2<String, Object>>conforms()));
+    suiteBase.produceAndSendMessage(topic,
+        JavaConverters.mapAsScalaMapConverter(tmp).asScala().toMap(
+            Predef.<Tuple2<String, Object>>conforms()));
 
     HashMap<String, String> kafkaParams = new HashMap<String, String>();
-    kafkaParams.put("zookeeper.connect", testSuite.zkHost() + ":" + testSuite.zkPort());
-    kafkaParams.put("group.id", "test-consumer-" + KafkaTestUtils.random().nextInt(10000));
+    kafkaParams.put("zookeeper.connect", suiteBase.zkAddress());
+    kafkaParams.put("group.id", "test-consumer-" + random.nextInt(10000));
     kafkaParams.put("auto.offset.reset", "smallest");
 
     JavaPairDStream<String, String> stream = KafkaUtils.createStream(ssc,
@@ -124,11 +127,16 @@ public class JavaKafkaStreamSuite extends LocalJavaStreamingContext implements S
     );
 
     ssc.start();
-    ssc.awaitTermination(3000);
-
+    long startTime = System.currentTimeMillis();
+    boolean sizeMatches = false;
+    while (!sizeMatches && System.currentTimeMillis() - startTime < 20000) {
+      sizeMatches = sent.size() == result.size();
+      Thread.sleep(200);
+    }
     Assert.assertEquals(sent.size(), result.size());
     for (String k : sent.keySet()) {
       Assert.assertEquals(sent.get(k).intValue(), result.get(k).intValue());
     }
+    ssc.stop();
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/5930f64b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
index 6943326..b19c053 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
@@ -19,51 +19,57 @@ package org.apache.spark.streaming.kafka
 
 import java.io.File
 import java.net.InetSocketAddress
-import java.util.{Properties, Random}
+import java.util.Properties
 
 import scala.collection.mutable
+import scala.concurrent.duration._
+import scala.language.postfixOps
+import scala.util.Random
 
 import kafka.admin.CreateTopicCommand
 import kafka.common.{KafkaException, TopicAndPartition}
-import kafka.producer.{KeyedMessage, ProducerConfig, Producer}
-import kafka.utils.ZKStringSerializer
+import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
 import kafka.serializer.{StringDecoder, StringEncoder}
 import kafka.server.{KafkaConfig, KafkaServer}
-
+import kafka.utils.ZKStringSerializer
 import org.I0Itec.zkclient.ZkClient
+import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer}
+import org.scalatest.{BeforeAndAfter, FunSuite}
+import org.scalatest.concurrent.Eventually
 
-import org.apache.zookeeper.server.ZooKeeperServer
-import org.apache.zookeeper.server.NIOServerCnxnFactory
-
-import org.apache.spark.streaming.{StreamingContext, TestSuiteBase}
+import org.apache.spark.{Logging, SparkConf}
 import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{Milliseconds, StreamingContext}
 import org.apache.spark.util.Utils
 
-class KafkaStreamSuite extends TestSuiteBase {
-  import KafkaTestUtils._
-
-  val zkHost = "localhost"
-  var zkPort: Int = 0
-  val zkConnectionTimeout = 6000
-  val zkSessionTimeout = 6000
-
-  protected var brokerPort = 9092
-  protected var brokerConf: KafkaConfig = _
-  protected var zookeeper: EmbeddedZookeeper = _
-  protected var zkClient: ZkClient = _
-  protected var server: KafkaServer = _
-  protected var producer: Producer[String, String] = _
-
-  override def useManualClock = false
-
-  override def beforeFunction() {
+/**
+ * This is an abstract base class for Kafka test suites. It has the functionality to set up
+ * and tear down local Kafka servers, and to push data using Kafka producers.
+ */
+abstract class KafkaStreamSuiteBase extends FunSuite with Eventually with Logging {
+
+  var zkAddress: String = _
+  var zkClient: ZkClient = _
+
+  private val zkHost = "localhost"
+  private val zkConnectionTimeout = 6000
+  private val zkSessionTimeout = 6000
+  private var zookeeper: EmbeddedZookeeper = _
+  private var zkPort: Int = 0
+  private var brokerPort = 9092
+  private var brokerConf: KafkaConfig = _
+  private var server: KafkaServer = _
+  private var producer: Producer[String, String] = _
+
+  def setupKafka() {
     // Zookeeper server startup
     zookeeper = new EmbeddedZookeeper(s"$zkHost:$zkPort")
     // Get the actual zookeeper binding port
     zkPort = zookeeper.actualPort
+    zkAddress = s"$zkHost:$zkPort"
     logInfo("==================== 0 ====================")
 
-    zkClient = new ZkClient(s"$zkHost:$zkPort", zkSessionTimeout, zkConnectionTimeout,
+    zkClient = new ZkClient(zkAddress, zkSessionTimeout, zkConnectionTimeout,
       ZKStringSerializer)
     logInfo("==================== 1 ====================")
 
@@ -71,7 +77,7 @@ class KafkaStreamSuite extends TestSuiteBase {
     var bindSuccess: Boolean = false
     while(!bindSuccess) {
       try {
-        val brokerProps = getBrokerConfig(brokerPort, s"$zkHost:$zkPort")
+        val brokerProps = getBrokerConfig()
         brokerConf = new KafkaConfig(brokerProps)
         server = new KafkaServer(brokerConf)
         logInfo("==================== 2 ====================")
@@ -89,53 +95,30 @@ class KafkaStreamSuite extends TestSuiteBase {
 
     Thread.sleep(2000)
     logInfo("==================== 4 ====================")
-    super.beforeFunction()
   }
 
-  override def afterFunction() {
-    producer.close()
-    server.shutdown()
-    brokerConf.logDirs.foreach { f => Utils.deleteRecursively(new File(f)) }
-
-    zkClient.close()
-    zookeeper.shutdown()
-
-    super.afterFunction()
-  }
-
-  test("Kafka input stream") {
-    val ssc = new StreamingContext(master, framework, batchDuration)
-    val topic = "topic1"
-    val sent = Map("a" -> 5, "b" -> 3, "c" -> 10)
-    createTopic(topic)
-    produceAndSendMessage(topic, sent)
+  def tearDownKafka() {
+    if (producer != null) {
+      producer.close()
+      producer = null
+    }
 
-    val kafkaParams = Map("zookeeper.connect" -> s"$zkHost:$zkPort",
-      "group.id" -> s"test-consumer-${random.nextInt(10000)}",
-      "auto.offset.reset" -> "smallest")
+    if (server != null) {
+      server.shutdown()
+      server = null
+    }
 
-    val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
-      ssc,
-      kafkaParams,
-      Map(topic -> 1),
-      StorageLevel.MEMORY_ONLY)
-    val result = new mutable.HashMap[String, Long]()
-    stream.map { case (k, v) => v }
-      .countByValue()
-      .foreachRDD { r =>
-        val ret = r.collect()
-        ret.toMap.foreach { kv =>
-          val count = result.getOrElseUpdate(kv._1, 0) + kv._2
-          result.put(kv._1, count)
-        }
-      }
-    ssc.start()
-    ssc.awaitTermination(3000)
+    brokerConf.logDirs.foreach { f => Utils.deleteRecursively(new File(f)) }
 
-    assert(sent.size === result.size)
-    sent.keys.foreach { k => assert(sent(k) === result(k).toInt) }
+    if (zkClient != null) {
+      zkClient.close()
+      zkClient = null
+    }
 
-    ssc.stop()
+    if (zookeeper != null) {
+      zookeeper.shutdown()
+      zookeeper = null
+    }
   }
 
   private def createTestMessage(topic: String, sent: Map[String, Int])
@@ -150,58 +133,43 @@ class KafkaStreamSuite extends TestSuiteBase {
     CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "0")
     logInfo("==================== 5 ====================")
     // wait until metadata is propagated
-    waitUntilMetadataIsPropagated(Seq(server), topic, 0, 1000)
+    waitUntilMetadataIsPropagated(topic, 0)
   }
 
   def produceAndSendMessage(topic: String, sent: Map[String, Int]) {
-    val brokerAddr = brokerConf.hostName + ":" + brokerConf.port
-    producer = new Producer[String, String](new ProducerConfig(getProducerConfig(brokerAddr)))
+    producer = new Producer[String, String](new ProducerConfig(getProducerConfig()))
     producer.send(createTestMessage(topic, sent): _*)
+    producer.close()
     logInfo("==================== 6 ====================")
   }
-}
-
-object KafkaTestUtils {
-  val random = new Random()
 
-  def getBrokerConfig(port: Int, zkConnect: String): Properties = {
+  private def getBrokerConfig(): Properties = {
     val props = new Properties()
     props.put("broker.id", "0")
     props.put("host.name", "localhost")
-    props.put("port", port.toString)
+    props.put("port", brokerPort.toString)
     props.put("log.dir", Utils.createTempDir().getAbsolutePath)
-    props.put("zookeeper.connect", zkConnect)
+    props.put("zookeeper.connect", zkAddress)
     props.put("log.flush.interval.messages", "1")
     props.put("replica.socket.timeout.ms", "1500")
     props
   }
 
-  def getProducerConfig(brokerList: String): Properties = {
+  private def getProducerConfig(): Properties = {
+    val brokerAddr = brokerConf.hostName + ":" + brokerConf.port
     val props = new Properties()
-    props.put("metadata.broker.list", brokerList)
+    props.put("metadata.broker.list", brokerAddr)
     props.put("serializer.class", classOf[StringEncoder].getName)
     props
   }
 
-  def waitUntilTrue(condition: () => Boolean, waitTime: Long): Boolean = {
-    val startTime = System.currentTimeMillis()
-    while (true) {
-      if (condition())
-        return true
-      if (System.currentTimeMillis() > startTime + waitTime)
-        return false
-      Thread.sleep(waitTime.min(100L))
+  private def waitUntilMetadataIsPropagated(topic: String, partition: Int) {
+    eventually(timeout(1000 milliseconds), interval(100 milliseconds)) {
+      assert(
+        server.apis.leaderCache.keySet.contains(TopicAndPartition(topic, partition)),
+        s"Partition [$topic, $partition] metadata not propagated after timeout"
+      )
     }
-    // Should never go to here
-    throw new RuntimeException("unexpected error")
-  }
-
-  def waitUntilMetadataIsPropagated(servers: Seq[KafkaServer], topic: String, partition: Int,
-      timeout: Long) {
-    assert(waitUntilTrue(() =>
-      servers.foldLeft(true)(_ && _.apis.leaderCache.keySet.contains(
-        TopicAndPartition(topic, partition))), timeout),
-      s"Partition [$topic, $partition] metadata not propagated after timeout")
   }
 
   class EmbeddedZookeeper(val zkConnect: String) {
@@ -227,3 +195,53 @@ object KafkaTestUtils {
     }
   }
 }
+
+
+class KafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter {
+  var ssc: StreamingContext = _
+
+  before {
+    setupKafka()
+  }
+
+  after {
+    if (ssc != null) {
+      ssc.stop()
+      ssc = null
+    }
+    tearDownKafka()
+  }
+
+  test("Kafka input stream") {
+    val sparkConf = new SparkConf().setMaster("local[4]").setAppName(this.getClass.getSimpleName)
+    ssc = new StreamingContext(sparkConf, Milliseconds(500))
+    val topic = "topic1"
+    val sent = Map("a" -> 5, "b" -> 3, "c" -> 10)
+    createTopic(topic)
+    produceAndSendMessage(topic, sent)
+
+    val kafkaParams = Map("zookeeper.connect" -> zkAddress,
+      "group.id" -> s"test-consumer-${Random.nextInt(10000)}",
+      "auto.offset.reset" -> "smallest")
+
+    val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
+      ssc, kafkaParams, Map(topic -> 1), StorageLevel.MEMORY_ONLY)
+    val result = new mutable.HashMap[String, Long]()
+    stream.map(_._2).countByValue().foreachRDD { r =>
+      val ret = r.collect()
+      ret.toMap.foreach { kv =>
+        val count = result.getOrElseUpdate(kv._1, 0) + kv._2
+        result.put(kv._1, count)
+      }
+    }
+    ssc.start()
+    eventually(timeout(10000 milliseconds), interval(100 milliseconds)) {
+      assert(sent.size === result.size)
+      sent.keys.foreach { k =>
+        assert(sent(k) === result(k).toInt)
+      }
+    }
+    ssc.stop()
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/spark/blob/5930f64b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala
new file mode 100644
index 0000000..64ccc92
--- /dev/null
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+
+import java.io.File
+
+import scala.collection.mutable
+import scala.concurrent.duration._
+import scala.language.postfixOps
+import scala.util.Random
+
+import com.google.common.io.Files
+import kafka.serializer.StringDecoder
+import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
+import org.apache.commons.io.FileUtils
+import org.scalatest.BeforeAndAfter
+import org.scalatest.concurrent.Eventually
+
+import org.apache.spark.SparkConf
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{Milliseconds, StreamingContext}
+
+class ReliableKafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter with Eventually {
+
+  val sparkConf = new SparkConf()
+    .setMaster("local[4]")
+    .setAppName(this.getClass.getSimpleName)
+    .set("spark.streaming.receiver.writeAheadLog.enable", "true")
+  val data = Map("a" -> 10, "b" -> 10, "c" -> 10)
+
+
+  var groupId: String = _
+  var kafkaParams: Map[String, String] = _
+  var ssc: StreamingContext = _
+  var tempDirectory: File = null
+
+  before {
+    setupKafka()
+    groupId = s"test-consumer-${Random.nextInt(10000)}"
+    kafkaParams = Map(
+      "zookeeper.connect" -> zkAddress,
+      "group.id" -> groupId,
+      "auto.offset.reset" -> "smallest"
+    )
+
+    ssc = new StreamingContext(sparkConf, Milliseconds(500))
+    tempDirectory = Files.createTempDir()
+    ssc.checkpoint(tempDirectory.getAbsolutePath)
+  }
+
+  after {
+    if (ssc != null) {
+      ssc.stop()
+    }
+    if (tempDirectory != null && tempDirectory.exists()) {
+      FileUtils.deleteDirectory(tempDirectory)
+      tempDirectory = null
+    }
+    tearDownKafka()
+  }
+
+
+  test("Reliable Kafka input stream with single topic") {
+    val topic = "test-topic"
+    createTopic(topic)
+    produceAndSendMessage(topic, data)
+
+    // Verify that no offset has been committed for this group/topic/partition before starting.
+    assert(getCommitOffset(groupId, topic, 0) === None)
+
+    val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
+      ssc, kafkaParams, Map(topic -> 1), StorageLevel.MEMORY_ONLY)
+    val result = new mutable.HashMap[String, Long]()
+    stream.map { case (k, v) => v }.foreachRDD { r =>
+        val ret = r.collect()
+        ret.foreach { v =>
+          val count = result.getOrElseUpdate(v, 0) + 1
+          result.put(v, count)
+        }
+      }
+    ssc.start()
+    eventually(timeout(20000 milliseconds), interval(200 milliseconds)) {
+      // A basic end-to-end verification for ReliableKafkaReceiver.
+      // Verify that the number of received keys equals the number of keys sent.
+      assert(data.size === result.size)
+      // Verify that the count received for each key matches the count that was sent.
+      data.keys.foreach { k => assert(data(k) === result(k).toInt) }
+      // Verify that the committed offset equals the offset of the last message consumed.
+      assert(getCommitOffset(groupId, topic, 0) === Some(29L))
+    }
+    ssc.stop()
+  }
+
+  test("Reliable Kafka input stream with multiple topics") {
+    val topics = Map("topic1" -> 1, "topic2" -> 1, "topic3" -> 1)
+    topics.foreach { case (t, _) =>
+      createTopic(t)
+      produceAndSendMessage(t, data)
+    }
+
+    // Before starting, verify that no offsets have been committed for any group/topic/partition.
+    topics.foreach { case (t, _) => assert(getCommitOffset(groupId, t, 0) === None) }
+
+    // Consume all the data sent to the broker, which will commit the offsets internally.
+    val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
+      ssc, kafkaParams, topics, StorageLevel.MEMORY_ONLY)
+    stream.foreachRDD(_ => Unit)
+    ssc.start()
+    eventually(timeout(20000 milliseconds), interval(100 milliseconds)) {
+      // Verify the offset for each group/topic to see whether they are equal to the expected one.
+      topics.foreach { case (t, _) => assert(getCommitOffset(groupId, t, 0) === Some(29L)) }
+    }
+    ssc.stop()
+  }
+
+
+  /** Get the committed partition offset from Zookeeper. */
+  private def getCommitOffset(groupId: String, topic: String, partition: Int): Option[Long] = {
+    assert(zkClient != null, "Zookeeper client is not initialized")
+    val topicDirs = new ZKGroupTopicDirs(groupId, topic)
+    val zkPath = s"${topicDirs.consumerOffsetDir}/$partition"
+    ZkUtils.readDataMaybeNull(zkClient, zkPath)._1.map(_.toLong)
+  }
+}
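
For context on what getCommitOffset reads: with Kafka 0.8.x, ZKGroupTopicDirs resolves to the standard high-level-consumer offset layout in ZooKeeper, so the offsets these tests verify (and that ReliableKafkaReceiver.commitOffset writes) live at paths like the ones in this small sketch (group and topic names are hypothetical):

    import kafka.utils.ZKGroupTopicDirs

    // Hypothetical group and topic, mirroring the tests above.
    val dirs = new ZKGroupTopicDirs("test-consumer-1234", "test-topic")
    // dirs.consumerOffsetDir == "/consumers/test-consumer-1234/offsets/test-topic"
    val zkPath = s"${dirs.consumerOffsetDir}/0"
    // zkPath == "/consumers/test-consumer-1234/offsets/test-topic/0"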

http://git-wip-us.apache.org/repos/asf/spark/blob/5930f64b/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index a94d09b..8a2a865 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -85,6 +85,10 @@ object MimaExcludes {
               "org.apache.hadoop.mapred.SparkHadoopMapRedUtil"),
             ProblemFilters.exclude[MissingTypesProblem](
               "org.apache.spark.rdd.PairRDDFunctions")
+          ) ++ Seq(
+            // SPARK-4062
+            ProblemFilters.exclude[MissingMethodProblem](
+              "org.apache.spark.streaming.kafka.KafkaReceiver#MessageHandler.this")
           )
 
         case v if v.startsWith("1.1") =>

http://git-wip-us.apache.org/repos/asf/spark/blob/5930f64b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
index 0316b68..55765dc 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
@@ -27,9 +27,38 @@ import org.apache.spark.streaming.util.{RecurringTimer, SystemClock}
 
 /** Listener object for BlockGenerator events */
 private[streaming] trait BlockGeneratorListener {
-  /** Called when a new block needs to be pushed */
+  /**
+   * Called after a data item is added into the BlockGenerator. The data addition and this
+   * callback are synchronized with the block generation and its associated callback,
+   * so block generation waits for the active data addition+callback to complete. This is useful
+   * for updating metadata on successful buffering of a data item, specifically metadata that
+   * will be useful when a block is generated. Any long blocking operation in this callback
+   * will hurt the throughput.
+   */
+  def onAddData(data: Any, metadata: Any)
+
+  /**
+   * Called when a new block of data is generated by the block generator. The block generation
+   * and this callback are synchronized with the data addition and its associated callback, so
+   * the data addition waits for the block generation+callback to complete. This is useful
+   * for updating metadata when a block has been generated, specifically metadata that will
+   * be useful when the block has been successfully stored. Any long blocking operation in this
+   * callback will hurt the throughput.
+   */
+  def onGenerateBlock(blockId: StreamBlockId)
+
+  /**
+   * Called when a new block is ready to be pushed. Callers are supposed to store the block into
+   * Spark in this method. Internally this is called from a single thread that is not
+   * synchronized with any other callbacks. Hence it is okay to do long blocking operations
+   * in this callback.
+   */
   def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_])
-  /** Called when an error has occurred in BlockGenerator */
+
+  /**
+   * Called when an error has occurred in the BlockGenerator. Can be called from many places,
+   * so it is better not to do any long blocking operations in this callback.
+   */
   def onError(message: String, throwable: Throwable)
 }
 
@@ -80,9 +109,20 @@ private[streaming] class BlockGenerator(
    * Push a single data item into the buffer. All received data items
    * will be periodically pushed into BlockManager.
    */
-  def += (data: Any): Unit = synchronized {
+  def addData(data: Any): Unit = synchronized {
+    waitToPush()
+    currentBuffer += data
+  }
+
+  /**
+   * Push a single data item into the buffer. After buffering the data, the
+   * `BlockGeneratorListener.onAddData` callback will be called. All received data items
+   * will be periodically pushed into BlockManager.
+   */
+  def addDataWithCallback(data: Any, metadata: Any) = synchronized {
     waitToPush()
     currentBuffer += data
+    listener.onAddData(data, metadata)
   }
 
   /** Change the buffer to which single records are added to. */
@@ -93,14 +133,15 @@ private[streaming] class BlockGenerator(
       if (newBlockBuffer.size > 0) {
         val blockId = StreamBlockId(receiverId, time - blockInterval)
         val newBlock = new Block(blockId, newBlockBuffer)
+        listener.onGenerateBlock(blockId)
         blocksForPushing.put(newBlock)  // put is blocking when queue is full
         logDebug("Last element in " + blockId + " is " + newBlockBuffer.last)
       }
     } catch {
       case ie: InterruptedException =>
         logInfo("Block updating timer thread was interrupted")
-      case t: Throwable =>
-        reportError("Error in block updating thread", t)
+      case e: Exception =>
+        reportError("Error in block updating thread", e)
     }
   }
 
@@ -126,8 +167,8 @@ private[streaming] class BlockGenerator(
     } catch {
       case ie: InterruptedException =>
         logInfo("Block pushing thread was interrupted")
-      case t: Throwable =>
-        reportError("Error in block pushing thread", t)
+      case e: Exception =>
+        reportError("Error in block pushing thread", e)
     }
   }
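
The expanded listener contract above has four callbacks with different threading guarantees. A compact sketch of an implementation that respects them; BlockGenerator and BlockGeneratorListener are private[streaming], so this is an internal illustration of the contract rather than user-facing API, and the println bodies are placeholders:

    import scala.collection.mutable.ArrayBuffer

    import org.apache.spark.storage.StreamBlockId
    import org.apache.spark.streaming.receiver.BlockGeneratorListener

    class LoggingBlockListener extends BlockGeneratorListener {
      // Runs under the same lock as block generation: keep it cheap (e.g. record an offset).
      def onAddData(data: Any, metadata: Any): Unit =
        println(s"buffered one item with metadata $metadata")

      // Also synchronized with data addition: snapshot per-block metadata here.
      def onGenerateBlock(blockId: StreamBlockId): Unit =
        println(s"block $blockId generated")

      // Called from the single block-pushing thread: long blocking work (storing) is fine here.
      def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_]): Unit =
        println(s"pushing block $blockId with ${arrayBuffer.size} items")

      // May be called from several threads: avoid long blocking work.
      def onError(message: String, throwable: Throwable): Unit =
        println(s"error in block generator: $message")
    }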
 

http://git-wip-us.apache.org/repos/asf/spark/blob/5930f64b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
index 5360412..3b1233e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
@@ -27,10 +27,10 @@ import akka.actor.{Actor, Props}
 import akka.pattern.ask
 import com.google.common.base.Throwables
 import org.apache.hadoop.conf.Configuration
+
 import org.apache.spark.{Logging, SparkEnv, SparkException}
 import org.apache.spark.storage.StreamBlockId
 import org.apache.spark.streaming.scheduler._
-import org.apache.spark.streaming.util.WriteAheadLogFileSegment
 import org.apache.spark.util.{AkkaUtils, Utils}
 
 /**
@@ -99,6 +99,10 @@ private[streaming] class ReceiverSupervisorImpl(
 
   /** Divides received data records into data blocks for pushing in BlockManager. */
   private val blockGenerator = new BlockGenerator(new BlockGeneratorListener {
+    def onAddData(data: Any, metadata: Any): Unit = { }
+
+    def onGenerateBlock(blockId: StreamBlockId): Unit = { }
+
     def onError(message: String, throwable: Throwable) {
       reportError(message, throwable)
     }
@@ -110,7 +114,7 @@ private[streaming] class ReceiverSupervisorImpl(
 
   /** Push a single record of received data into block generator. */
   def pushSingle(data: Any) {
-    blockGenerator += (data)
+    blockGenerator.addData(data)
   }
 
   /** Store an ArrayBuffer of received data as a data block into Spark's memory. */

http://git-wip-us.apache.org/repos/asf/spark/blob/5930f64b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
index 0f6a948..e26c0c6 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
@@ -138,7 +138,7 @@ class ReceiverSuite extends FunSuite with Timeouts {
     blockGenerator.start()
     var count = 0
     while(System.currentTimeMillis - startTime < waitTime) {
-      blockGenerator += count
+      blockGenerator.addData(count)
       generatedData += count
       count += 1
       Thread.sleep(10)
@@ -168,7 +168,7 @@ class ReceiverSuite extends FunSuite with Timeouts {
     blockGenerator.start()
     var count = 0
     while(System.currentTimeMillis - startTime < waitTime) {
-      blockGenerator += count
+      blockGenerator.addData(count)
       generatedData += count
       count += 1
       Thread.sleep(1)
@@ -299,6 +299,10 @@ class ReceiverSuite extends FunSuite with Timeouts {
     val arrayBuffers = new ArrayBuffer[ArrayBuffer[Int]]
     val errors = new ArrayBuffer[Throwable]
 
+    def onAddData(data: Any, metadata: Any) { }
+
+    def onGenerateBlock(blockId: StreamBlockId) { }
+
     def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_]) {
       val bufferOfInts = arrayBuffer.map(_.asInstanceOf[Int])
       arrayBuffers += bufferOfInts

