You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by td...@apache.org on 2015/02/04 23:20:52 UTC

spark git commit: [SPARK-4707][STREAMING] Reliable Kafka Receiver can lose data if the blo...

Repository: spark
Updated Branches:
  refs/heads/master b0c002195 -> f0500f9fa


[SPARK-4707][STREAMING] Reliable Kafka Receiver can lose data if the blo...

...ck generator fails to store data.

The Reliable Kafka Receiver commits offsets only when events are actually stored, which ensures that on restart we will actually start where we left off. But if the failure happens in the store() call and the block generator reports an error, the receiver does not do anything and will continue reading from the current offset, not from the last commit. This means that messages between the last commit and the current offset will be lost.

This PR attempts the store call up to four times; if every attempt fails, it stops the receiver with an error message and the last exception that was received from the store.

Author: Hari Shreedharan <hs...@apache.org>

Closes #3655 from harishreedharan/kafka-failure-fix and squashes the following commits:

5e2e7ad [Hari Shreedharan] [SPARK-4707][STREAMING] Reliable Kafka Receiver can lose data if the block generator fails to store data.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f0500f9f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f0500f9f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f0500f9f

Branch: refs/heads/master
Commit: f0500f9fa378d81e4b4038a66a40eee15806b677
Parents: b0c0021
Author: Hari Shreedharan <hs...@apache.org>
Authored: Wed Feb 4 14:20:44 2015 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Wed Feb 4 14:20:44 2015 -0800

----------------------------------------------------------------------
 .../streaming/kafka/ReliableKafkaReceiver.scala | 27 +++++++++++++++++---
 1 file changed, 23 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f0500f9f/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala
index be734b8..c4a44c1 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala
@@ -201,12 +201,31 @@ class ReliableKafkaReceiver[
     topicPartitionOffsetMap.clear()
   }
 
-  /** Store the ready-to-be-stored block and commit the related offsets to zookeeper. */
+  /**
+   * Store the ready-to-be-stored block and commit the related offsets to zookeeper. This method
+   * will try a fixed number of times to push the block. If the push fails, the receiver is stopped.
+   */
   private def storeBlockAndCommitOffset(
       blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
-    store(arrayBuffer.asInstanceOf[mutable.ArrayBuffer[(K, V)]])
-    Option(blockOffsetMap.get(blockId)).foreach(commitOffset)
-    blockOffsetMap.remove(blockId)
+    var count = 0
+    var pushed = false
+    var exception: Exception = null
+    while (!pushed && count <= 3) {
+      try {
+        store(arrayBuffer.asInstanceOf[mutable.ArrayBuffer[(K, V)]])
+        pushed = true
+      } catch {
+        case ex: Exception =>
+          count += 1
+          exception = ex
+      }
+    }
+    if (pushed) {
+      Option(blockOffsetMap.get(blockId)).foreach(commitOffset)
+      blockOffsetMap.remove(blockId)
+    } else {
+      stop("Error while storing block into Spark", exception)
+    }
   }
 
   /**


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org