You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by td...@apache.org on 2015/05/23 02:39:04 UTC
spark git commit: [SPARK-7788] Made KinesisReceiver.onStart()
non-blocking
Repository: spark
Updated Branches:
refs/heads/master 3d8760d76 -> 1c388a998
[SPARK-7788] Made KinesisReceiver.onStart() non-blocking
KinesisReceiver calls worker.run(), which is a blocking call (a while loop) as per the source code of the kinesis-client library - https://github.com/awslabs/amazon-kinesis-client/blob/v1.2.1/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java.
This results in an infinite loop when calling sparkStreamingContext.stop(stopSparkContext = false, stopGracefully = true), perhaps because ReceiverTracker is never able to register the receiver (its receiverInfo field is an empty map), causing it to be stuck in an infinite loop while waiting for the running flag to be set to false.
Author: Tathagata Das <ta...@gmail.com>
Closes #6348 from tdas/SPARK-7788 and squashes the following commits:
2584683 [Tathagata Das] Added receiver id in thread name
6cf1cd4 [Tathagata Das] Made KinesisReceiver.onStart non-blocking
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1c388a99
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1c388a99
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1c388a99
Branch: refs/heads/master
Commit: 1c388a9985999e043fa002618a357bc8f0a8b65a
Parents: 3d8760d
Author: Tathagata Das <ta...@gmail.com>
Authored: Fri May 22 17:39:01 2015 -0700
Committer: Tathagata Das <ta...@gmail.com>
Committed: Fri May 22 17:39:01 2015 -0700
----------------------------------------------------------------------
.../streaming/kinesis/KinesisReceiver.scala | 30 ++++++++++++++++----
1 file changed, 25 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/1c388a99/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
----------------------------------------------------------------------
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
index 800202e..7dd8bfd 100644
--- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
+++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
@@ -18,6 +18,8 @@ package org.apache.spark.streaming.kinesis
import java.util.UUID
+import scala.util.control.NonFatal
+
import com.amazonaws.auth.{AWSCredentials, AWSCredentialsProvider, BasicAWSCredentials, DefaultAWSCredentialsProviderChain}
import com.amazonaws.services.kinesis.clientlibrary.interfaces.{IRecordProcessor, IRecordProcessorFactory}
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.{InitialPositionInStream, KinesisClientLibConfiguration, Worker}
@@ -98,6 +100,9 @@ private[kinesis] class KinesisReceiver(
*/
private var worker: Worker = null
+ /** Thread running the worker */
+ private var workerThread: Thread = null
+
/**
* This is called when the KinesisReceiver starts and must be non-blocking.
* The KCL creates and manages the receiving/processing thread pool through Worker.run().
@@ -126,8 +131,19 @@ private[kinesis] class KinesisReceiver(
}
worker = new Worker(recordProcessorFactory, kinesisClientLibConfiguration)
- worker.run()
-
+ workerThread = new Thread() {
+ override def run(): Unit = {
+ try {
+ worker.run()
+ } catch {
+ case NonFatal(e) =>
+ restart("Error running the KCL worker in Receiver", e)
+ }
+ }
+ }
+ workerThread.setName(s"Kinesis Receiver ${streamId}")
+ workerThread.setDaemon(true)
+ workerThread.start()
logInfo(s"Started receiver with workerId $workerId")
}
@@ -137,10 +153,14 @@ private[kinesis] class KinesisReceiver(
* The KCL will do its best to drain and checkpoint any in-flight records upon shutdown.
*/
override def onStop() {
- if (worker != null) {
- worker.shutdown()
+ if (workerThread != null) {
+ if (worker != null) {
+ worker.shutdown()
+ worker = null
+ }
+ workerThread.join()
+ workerThread = null
logInfo(s"Stopped receiver for workerId $workerId")
- worker = null
}
workerId = null
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org