You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Hyukjin Kwon (JIRA)" <ji...@apache.org> on 2019/05/21 04:18:17 UTC

[jira] [Resolved] (SPARK-22685) Spark Streaming using Kinesis doesn't work if shard checkpoints exist in DynamoDB

     [ https://issues.apache.org/jira/browse/SPARK-22685?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hyukjin Kwon resolved SPARK-22685.
----------------------------------
    Resolution: Incomplete

> Spark Streaming using Kinesis doesn't work if shard checkpoints exist in DynamoDB
> ---------------------------------------------------------------------------------
>
>                 Key: SPARK-22685
>                 URL: https://issues.apache.org/jira/browse/SPARK-22685
>             Project: Spark
>          Issue Type: Bug
>          Components: DStreams
>    Affects Versions: 2.2.0
>            Reporter: Grega Kespret
>            Priority: Major
>              Labels: bulk-closed
>
> Apologize if this is not the best place to post this / if the description is lacking some needed info. Please let me know and I will update.
> This was cross-posted on [StackOverflow|https://stackoverflow.com/questions/47644984/spark-streaming-using-kinesis-doesnt-work-if-shard-checkpoints-exist-in-dynamod].
> **TL;DR** – If shard checkpoints don't exist in DynamoDB (== completely fresh), Spark Streaming application reading from Kinesis works flawlessly. However, if the checkpoints exist (e.g. due to app restart), it fails most of the times.
> ----
> The app uses **Spark Streaming 2.2.0** and **spark-streaming-kinesis-asl_2.11**. 
> When starting the app with checkpointed shard data (written by KCL to DynamoDB), after a few successful batches (number varies), this is what I can see in the logs:
> First, **Leases are lost**:
> {code}
> 17/12/01 05:16:50 INFO LeaseRenewer: Worker 10.0.182.119:9781acd5-6cb3-4a39-a235-46f1254eb885 lost lease with key shardId-000000000515
> {code}
> Then in random order: **Can't update checkpoint - instance doesn't hold the lease for this shard** and **com.amazonaws.SdkClientException: Unable to execute HTTP request: The target server failed to respond** follow, bringing down the whole app in a few batches:
> {code}
>     17/12/01 05:17:10 ERROR ProcessTask: ShardId shardId-000000000394: Caught exception:
>     com.amazonaws.SdkClientException: Unable to execute HTTP request: The target server failed to respond
>             at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleRetryableException(AmazonHttpClient.java:1069)
>             at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1035)
>             at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:742)
>             at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:716)
>             at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
>             at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
>             at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
>             at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
>             at com.amazonaws.services.kinesis.AmazonKinesisClient.doInvoke(AmazonKinesisClient.java:1948)
>             at com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:1924)
>             at com.amazonaws.services.kinesis.AmazonKinesisClient.executeGetRecords(AmazonKinesisClient.java:969)
>             at com.amazonaws.services.kinesis.AmazonKinesisClient.getRecords(AmazonKinesisClient.java:945)
>             at com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxy.get(KinesisProxy.java:156)
>             at com.amazonaws.services.kinesis.clientlibrary.proxies.MetricsCollectingKinesisProxyDecorator.get(MetricsCollectingKinesisProxyDecorator.java:74)
>             at com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisDataFetcher.getRecords(KinesisDataFetcher.java:68)
>             at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ProcessTask.getRecordsResultAndRecordMillisBehindLatest(ProcessTask.java:291)
>             at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ProcessTask.getRecordsResult(ProcessTask.java:256)
>             at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ProcessTask.call(ProcessTask.java:127)
>             at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:49)
>             at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:24)
>             at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>             at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>             at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>             at java.lang.Thread.run(Thread.java:748)
>       Caused by: org.apache.http.NoHttpResponseException: The target server failed to respond
>             at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:143)
>             at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:57)
>             at org.apache.http.impl.io.AbstractMessageParser.parse(AbstractMessageParser.java:261)
>             at org.apache.http.impl.DefaultBHttpClientConnection.receiveResponseHeader(DefaultBHttpClientConnection.java:165)
>             at org.apache.http.impl.conn.CPoolProxy.receiveResponseHeader(CPoolProxy.java:167)
>             at org.apache.http.protocol.HttpRequestExecutor.doReceiveResponse(HttpRequestExecutor.java:272)
>             at com.amazonaws.http.protocol.SdkHttpRequestExecutor.doReceiveResponse(SdkHttpRequestExecutor.java:82)
>             at org.apache.http.protocol.HttpRequestExecutor.execute(HttpRequestExecutor.java:124)
>             at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:271)
>             at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:184)
>             at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:184)
>             at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82)
>             at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:55)
>             at com.amazonaws.http.apache.client.impl.SdkHttpClient.execute(SdkHttpClient.java:72)
>             at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1190)
>             at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1030)
>             ... 22 more
> {code}
> and
> {code}
>     17/12/01 05:20:59 ERROR KinesisRecordProcessor: ShutdownException:  Caught shutdown exception, skipping checkpoint.
>     com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException: Can't update checkpoint - instance doesn't hold the lease for this shard
>             at com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibLeaseCoordinator.setCheckpoint(KinesisClientLibLeaseCoordinator.java:173)
>             at com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer.advancePosition(RecordProcessorCheckpointer.java:216)
>             at com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer.checkpoint(RecordProcessorCheckpointer.java:137)
>             at com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer.checkpoint(RecordProcessorCheckpointer.java:103)
>             at org.apache.spark.streaming.kinesis.KinesisCheckpointer$$anonfun$checkpoint$1$$anonfun$apply$1.apply$mcV$sp(KinesisCheckpointer.scala:94)
>             at org.apache.spark.streaming.kinesis.KinesisCheckpointer$$anonfun$checkpoint$1$$anonfun$apply$1.apply(KinesisCheckpointer.scala:94)
>             at org.apache.spark.streaming.kinesis.KinesisCheckpointer$$anonfun$checkpoint$1$$anonfun$apply$1.apply(KinesisCheckpointer.scala:94)
>             at scala.util.Try$.apply(Try.scala:192)
>             at org.apache.spark.streaming.kinesis.KinesisRecordProcessor$.retryRandom(KinesisRecordProcessor.scala:158)
>             at org.apache.spark.streaming.kinesis.KinesisCheckpointer$$anonfun$checkpoint$1.apply(KinesisCheckpointer.scala:94)
>             at org.apache.spark.streaming.kinesis.KinesisCheckpointer$$anonfun$checkpoint$1.apply(KinesisCheckpointer.scala:88)
>             at scala.Option.foreach(Option.scala:257)
>             at org.apache.spark.streaming.kinesis.KinesisCheckpointer.checkpoint(KinesisCheckpointer.scala:88)
>             at org.apache.spark.streaming.kinesis.KinesisCheckpointer.org$apache$spark$streaming$kinesis$KinesisCheckpointer$$checkpointAll(KinesisCheckpointer.scala:116)
>             at org.apache.spark.streaming.kinesis.KinesisCheckpointer$$anonfun$1.apply$mcVJ$sp(KinesisCheckpointer.scala:130)
>             at org.apache.spark.streaming.util.RecurringTimer.triggerActionForNextInterval(RecurringTimer.scala:94)
>             at org.apache.spark.streaming.util.RecurringTimer.org$apache$spark$streaming$util$RecurringTimer$$loop(RecurringTimer.scala:106)
>             at org.apache.spark.streaming.util.RecurringTimer$$anon$1.run(RecurringTimer.scala:29)
> {code}
> Right now, the workaround is to go and delete all the checkpoint data of all shards from DynamoDB so that the app starts from the `InitialPositionInStream.LATEST`. Obviously, the downside of that is that checkpoint information is not used at all, and data is lost.
> I may have missed something obvious, so any help would be appreciated.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org