Posted to user@flink.apache.org by Dan Diephouse <da...@netzooid.com> on 2020/10/07 21:15:39 UTC

Network issue leading to "No pooled slot available"

I am now using the S3 StreamingFileSink to send data to an S3 bucket.
If/when the network connection has issues, it seems to put Flink into an
irrecoverable state. Am I understanding this correctly? Any suggestions on
how to troubleshoot / fix?

Here is what I'm observing:

*1. Network is dropped*

*2. S3 connections do not exit gracefully*

2020-10-07 20:58:07.468  WARN 1 --- [aa565930b86fb).] o.apache.flink.runtime.taskmanager.Task  : Task 'Sink: Unnamed (1/12)' did not react to cancelling signal for 30 seconds, but is stuck in method:
java.base@14.0.2/sun.nio.ch.Net.poll(Native Method)
java.base@14.0.2/sun.nio.ch.NioSocketImpl.park(NioSocketImpl.java:181)
java.base@14.0.2/sun.nio.ch.NioSocketImpl.timedRead(NioSocketImpl.java:285)
java.base@14.0.2/sun.nio.ch.NioSocketImpl.implRead(NioSocketImpl.java:309)
java.base@14.0.2/sun.nio.ch.NioSocketImpl.read(NioSocketImpl.java:350)
java.base@14.0.2/sun.nio.ch.NioSocketImpl$1.read(NioSocketImpl.java:803)
java.base@14.0.2/java.net.Socket$SocketInputStream.read(Socket.java:982)
java.base@14.0.2/sun.security.ssl.SSLSocketInputRecord.read(SSLSocketInputRecord.java:469)
java.base@14.0.2/sun.security.ssl.SSLSocketInputRecord.readHeader(SSLSocketInputRecord.java:463)
java.base@14.0.2/sun.security.ssl.SSLSocketInputRecord.decode(SSLSocketInputRecord.java:160)
java.base@14.0.2/sun.security.ssl.SSLTransport.decode(SSLTransport.java:110)
java.base@14.0.2/sun.security.ssl.SSLSocketImpl.decode(SSLSocketImpl.java:1475)
java.base@14.0.2/sun.security.ssl.SSLSocketImpl.readHandshakeRecord(SSLSocketImpl.java:1381)
java.base@14.0.2/sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:441)
java.base@14.0.2/sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:412)
app//org.apache.http.conn.ssl.SSLConnectionSocketFactory.createLayeredSocket(SSLConnectionSocketFactory.java:436)
app//org.apache.http.conn.ssl.SSLConnectionSocketFactory.connectSocket(SSLConnectionSocketFactory.java:384)
app//com.amazonaws.http.conn.ssl.SdkTLSSocketFactory.connectSocket(SdkTLSSocketFactory.java:142)
app//org.apache.http.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:142)
app//org.apache.http.impl.conn.PoolingHttpClientConnectionManager.connect(PoolingHttpClientConnectionManager.java:374)
jdk.internal.reflect.GeneratedMethodAccessor92.invoke(Unknown Source)
java.base@14.0.2/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.base@14.0.2/java.lang.reflect.Method.invoke(Method.java:564)
app//com.amazonaws.http.conn.ClientConnectionManagerFactory$Handler.invoke(ClientConnectionManagerFactory.java:76)
app//com.amazonaws.http.conn.$Proxy89.connect(Unknown Source)
app//org.apache.http.impl.execchain.MainClientExec.establishRoute(MainClientExec.java:393)
app//org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:236)
app//org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186)
app//org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
app//org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
app//org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56)
app//com.amazonaws.http.apache.client.impl.SdkHttpClient.execute(SdkHttpClient.java:72)
app//com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1323)
app//com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1139)
app//com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:796)
app//com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:764)
app//com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:738)
app//com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:698)
app//com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:680)
app//com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:544)
app//com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:524)
app//com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5054)
app//com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5000)
app//com.amazonaws.services.s3.AmazonS3Client.initiateMultipartUpload(AmazonS3Client.java:3574)
app//org.apache.hadoop.fs.s3a.S3AFileSystem.initiateMultipartUpload(S3AFileSystem.java:2597)
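
The frames above show the task thread parked in a plain `java.net.Socket` read during the TLS handshake (`Net.poll` under `SSLSocketImpl.startHandshake`). The following minimal JDK-only sketch is not Flink or AWS SDK code. It illustrates why an interrupt alone may not free such a thread. On a platform thread, as in the JDK 14 stack above, `Thread.interrupt()` does not unblock a socket read. Closing the socket does: the `Socket.close()` Javadoc says a thread blocked in I/O on the socket will throw a `SocketException`. A socket timeout (`SO_TIMEOUT`) would also end the read. The loopback peer that never replies is an assumption standing in for the dropped network.

```java
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

// JDK-only sketch: a platform thread blocked in a socket read ignores
// Thread.interrupt(), but closing the socket from another thread unblocks it.
class SocketInterruptDemo {

    static final class Result {
        final boolean blockedAfterInterrupt; // reader still alive 500 ms after interrupt()
        final Throwable readFailure;         // exception that finally ended the read, if any

        Result(boolean blockedAfterInterrupt, Throwable readFailure) {
            this.blockedAfterInterrupt = blockedAfterInterrupt;
            this.readFailure = readFailure;
        }
    }

    static Result run() throws Exception {
        // A loopback peer that accepts the connection but never sends a byte,
        // standing in for a remote end that went silent when the network dropped.
        try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress());
             Socket client = new Socket(server.getInetAddress(), server.getLocalPort());
             Socket silentPeer = server.accept()) {

            AtomicReference<Throwable> failure = new AtomicReference<>();
            CountDownLatch started = new CountDownLatch(1);

            Thread reader = new Thread(() -> {
                try {
                    InputStream in = client.getInputStream();
                    started.countDown();
                    in.read(); // no SO_TIMEOUT: blocks until data, EOF, or close()
                } catch (IOException e) {
                    failure.set(e);
                }
            }, "blocked-reader");
            reader.setDaemon(true); // never keep the JVM alive if something goes wrong
            reader.start();
            started.await();
            Thread.sleep(200); // give the reader time to enter read()

            reader.interrupt(); // roughly what task cancellation tries first
            reader.join(500);
            boolean blocked = reader.isAlive();

            client.close(); // per Socket.close() docs, the blocked reader gets a SocketException
            reader.join(5_000);
            return new Result(blocked, failure.get());
        }
    }

    public static void main(String[] args) throws Exception {
        Result r = run();
        System.out.println("still blocked after interrupt: " + r.blockedAfterInterrupt);
        System.out.println("read ended with: " + r.readFailure);
    }
}
```

Closing the connection from outside, or bounding every read with a timeout, is the general JDK mechanism for escaping such a read. Whether a given HTTP client does either on interrupt depends on that client.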

*3. Tasks do not complete*

2020-10-07 21:00:37.458 ERROR 1 --- [9580107498927).] o.a.f.runtime.taskexecutor.TaskExecutor  : Task did not exit gracefully within 180 + seconds.

org.apache.flink.util.FlinkRuntimeException: Task did not exit gracefully within 180 + seconds.
at org.apache.flink.runtime.taskmanager.Task$TaskCancelerWatchDog.run(Task.java:1572) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]

2020-10-07 21:00:37.459 ERROR 1 --- [9580107498927).] o.a.f.runtime.minicluster.MiniCluster    : TaskManager #0 failed.

org.apache.flink.util.FlinkRuntimeException: Task did not exit gracefully within 180 + seconds.
at org.apache.flink.runtime.taskmanager.Task$TaskCancelerWatchDog.run(Task.java:1572) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]
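
The warning in step 2 and the fatal error in step 3 fit a single cancellation round, under two assumptions. The first is Flink 1.11's default `task.cancellation.interval` of 30 s: the task is interrupted, and if it is still running after that interval, the "did not react" warning is logged. The second is the default `task.cancellation.timeout` of 180 s, the watchdog that fails the TaskManager. The thread names in the two entries differ, so they may be different subtasks cancelled together; treat this as a rough check. A small `java.time` sketch of the arithmetic:

```java
import java.time.Duration;
import java.time.LocalTime;

// Rough check of the two log timestamps, assuming Flink 1.11 defaults:
// task.cancellation.interval = 30 s, task.cancellation.timeout = 180 s.
class CancellationTimeline {
    static final Duration INTERVAL = Duration.ofSeconds(30); // "did not react ... for 30 seconds"
    static final Duration TIMEOUT = Duration.ofSeconds(180); // "did not exit gracefully within 180 + seconds"

    /** Time between the first "stuck" warning and the watchdog's fatal error. */
    static Duration gap(LocalTime firstWarning, LocalTime watchdogError) {
        return Duration.between(firstWarning, watchdogError);
    }

    public static void main(String[] args) {
        LocalTime warning = LocalTime.parse("20:58:07.468"); // step 2 log line
        LocalTime fatal = LocalTime.parse("21:00:37.458");   // step 3 log line
        Duration observed = gap(warning, fatal);
        // If both timers start when cancellation begins, the expected gap is 180 s - 30 s.
        Duration expected = TIMEOUT.minus(INTERVAL);
        System.out.println("observed gap: " + observed.toMillis() + " ms"); // 149990 ms
        System.out.println("expected gap: " + expected.toMillis() + " ms"); // 150000 ms
    }
}
```

The observed gap is about 150 s, so the watchdog most likely fired because the interrupted thread never left the socket read. In this MiniCluster, failing TaskManager #0 is consistent with the missing slots in step 4.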

*4. When trying to restart, there are no slots*

2020-10-07 21:00:37.486  INFO 1 --- [t-dispatcher-46] o.a.f.r.executiongraph.ExecutionGraph    : Compute vehicle location from tag reads -> Sink: Vehicle Event Sink (2/12) (064e7b4692314286791305bfa636b209) switched from SCHEDULED to FAILED on not deployed.

java.util.concurrent.CompletionException: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: No pooled slot available and request to ResourceManager for new slot failed
at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331) ~[na:na]
at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346) ~[na:na]
at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:632) ~[na:na]
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[na:na]
at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2152) ~[na:na]
at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.slotRequestToResourceManagerFailed(SlotPoolImpl.java:360) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.lambda$requestSlotFromResourceManager$1(SlotPoolImpl.java:348) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[na:na]
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[na:na]
at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478) ~[na:na]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) ~[akka-actor_2.11-2.5.21.jar:2.5.21]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) ~[akka-actor_2.11-2.5.21.jar:2.5.21]
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) ~[scala-library-2.11.12.jar:na]
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) ~[akka-actor_2.11-2.5.21.jar:2.5.21]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) ~[scala-library-2.11.12.jar:na]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[scala-library-2.11.12.jar:na]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[scala-library-2.11.12.jar:na]
at akka.actor.Actor$class.aroundReceive(Actor.scala:517) ~[akka-actor_2.11-2.5.21.jar:2.5.21]
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) ~[akka-actor_2.11-2.5.21.jar:2.5.21]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) ~[akka-actor_2.11-2.5.21.jar:2.5.21]
at akka.actor.ActorCell.invoke(ActorCell.scala:561) ~[akka-actor_2.11-2.5.21.jar:2.5.21]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) ~[akka-actor_2.11-2.5.21.jar:2.5.21]
at akka.dispatch.Mailbox.run(Mailbox.scala:225) ~[akka-actor_2.11-2.5.21.jar:2.5.21]
at akka.dispatch.Mailbox.exec(Mailbox.scala:235) ~[akka-actor_2.11-2.5.21.jar:2.5.21]
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) ~[akka-actor_2.11-2.5.21.jar:2.5.21]
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) ~[akka-actor_2.11-2.5.21.jar:2.5.21]
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) ~[akka-actor_2.11-2.5.21.jar:2.5.21]
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) ~[akka-actor_2.11-2.5.21.jar:2.5.21]
Caused by: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: No pooled slot available and request to ResourceManager for new slot failed
... 27 common frames omitted
Caused by: org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException: Could not fulfill slot request d0e3ac18beece314e56aa23cd2265d92.
at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.registerSlotRequest(SlotManagerImpl.java:391) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.runtime.resourcemanager.ResourceManager.requestSlot(ResourceManager.java:457) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:na]
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
at java.base/java.lang.reflect.Method.invoke(Method.java:564) ~[na:na]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
... 20 common frames omitted
Caused by: org.apache.flink.runtime.resourcemanager.exceptions.UnfulfillableSlotRequestException: Could not fulfill slot request d0e3ac18beece314e56aa23cd2265d92. Requested resource profile (ResourceProfile{UNKNOWN}) is unfulfillable.
at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.lambda$fulfillPendingSlotRequestWithPendingTaskManagerSlot$11(SlotManagerImpl.java:882) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.util.OptionalConsumer.ifNotPresent(OptionalConsumer.java:52) ~[flink-core-1.11.2.jar:1.11.2]
at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.fulfillPendingSlotRequestWithPendingTaskManagerSlot(SlotManagerImpl.java:878) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.lambda$internalRequestSlot$9(SlotManagerImpl.java:865) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.util.OptionalConsumer.ifNotPresent(OptionalConsumer.java:52) ~[flink-core-1.11.2.jar:1.11.2]
at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.internalRequestSlot(SlotManagerImpl.java:865) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.registerSlotRequest(SlotManagerImpl.java:386) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
... 27 common frames omitted

Any thoughts / suggestions are much appreciated.

-- 
Dan Diephouse
@dandiep

Re: Network issue leading to "No pooled slot available"

Posted by Dan Diephouse <da...@netzooid.com>.
Quick update: the SDK upgrade appears to work outside my test case too. I have
not encountered this issue at all since the update.
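
The test case mentioned in the quoted messages below was not posted. The following is a hypothetical harness of that kind, assuming only the JDK. It runs a task on its own thread, interrupts it, and reports whether the task returned within a grace period. `InterruptCheck` and `respectsInterrupt` are illustrative names. The two sample tasks stand in for an interruptible call and for one that ignores the interrupt flag; neither is the real S3 client.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical harness: does a task return promptly after Thread.interrupt()?
class InterruptCheck {

    /** True if {@code task} finished within {@code graceMillis} of being interrupted. */
    static boolean respectsInterrupt(Runnable task, long warmupMillis, long graceMillis)
            throws InterruptedException {
        Thread worker = new Thread(task, "task-under-test");
        worker.setDaemon(true);       // a hung task must not keep the JVM alive
        worker.start();
        Thread.sleep(warmupMillis);   // let the task reach its blocking call
        worker.interrupt();           // roughly what Flink does when cancelling a task
        worker.join(graceMillis);
        return !worker.isAlive();
    }

    public static void main(String[] args) throws InterruptedException {
        // Interruptible: Thread.sleep throws InterruptedException promptly.
        boolean sleeper = respectsInterrupt(() -> {
            try {
                Thread.sleep(10_000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the flag and exit
            }
        }, 100, 1_000);

        // Not interruptible: a loop that never checks the interrupt flag.
        boolean spinner = respectsInterrupt(() -> {
            long end = System.nanoTime() + TimeUnit.SECONDS.toNanos(2);
            while (System.nanoTime() < end) {
                Thread.onSpinWait();
            }
        }, 100, 500);

        System.out.println("sleep respects interrupt: " + sleeper);     // true
        System.out.println("spin loop respects interrupt: " + spinner); // false
    }
}
```

Pointing the same harness at a real upload against an unreachable endpoint would be one way to compare SDK versions, though the timings would need tuning for that setup.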

On Thu, Oct 8, 2020 at 11:15 PM Khachatryan Roman <khachatryan.roman@gmail.com> wrote:

> Thanks for checking this workaround!
>
> I've created a Jira issue [1] to check whether the AWS SDK version in the
> Flink distribution can be upgraded.
>
> Regards,
> Roman
>
>
> On Fri, Oct 9, 2020 at 12:54 AM Dan Diephouse <da...@netzooid.com> wrote:
>
>> Well, I just dropped in the latest AWS SDK for Java (1.11.878), and now it
>> appears to respect interrupts in a test case I created. (The test fails
>> with the SDK version that Flink uses.)
>>
>> I will try it in a full-fledged Flink environment and report back.
>>
>> On Thu, Oct 8, 2020 at 3:41 PM Dan Diephouse <da...@netzooid.com> wrote:
>>
>>> Did some digging... it definitely appears that the Amazon SDK is not
>>> picking up the interrupt. I will try playing with the connection
>>> timeout. Hadoop defaults it to 200000 ms, which may be part of the problem.
>>> Does anyone have any other ideas?
>>>
>>> In theory this should be fixed by SDK v2, which uses NIO, but I don't
>>> think I'm up for all the changes that would require in the downstream
>>> components.
>>>
>>> On Thu, Oct 8, 2020 at 8:36 AM Dan Diephouse <da...@netzooid.com> wrote:
>>>
>>>> Using the latest, 1.11.2.
>>>>
>>>> I would assume the interruption is being ignored in the Hadoop / S3
>>>> layer. I was looking at the defaults and (if I understood correctly) the
>>>> client will retry 20 times, which would explain why it never gets
>>>> cancelled...
>>>>
>>>> On Thu, Oct 8, 2020 at 1:27 AM Khachatryan Roman <
>>>> khachatryan.roman@gmail.com> wrote:
>>>>
>>>>> Hi Dan Diephouse,
>>>>>
>>>>> From the logs you provided, it does look like 1 causes 2, which leads
>>>>> to 3 and then 4, where 2 is a bug.
>>>>> It's unclear, though, where the interruption is ignored (Flink, Hadoop
>>>>> FS, or the S3 client).
>>>>>
>>>>> What version of Flink are you using?
>>>>>
>>>>> Regards,
>>>>> Roman
>>>>>

-- 
Dan Diephouse
@dandiep
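
Here is a back-of-the-envelope comparison of the figures from this thread, under stated assumptions. The socket timeout is 200,000 ms, presumably Hadoop's `fs.s3a.connection.timeout`. There are 20 attempts in total. Every attempt hangs for the full timeout, and retry back-off is ignored. The result is set against Flink's 180-second cancellation timeout seen in the logs. Under these assumptions, a single stalled attempt already outlasts the watchdog, regardless of the retry count.

```java
import java.time.Duration;

// Back-of-the-envelope bound: attempts x per-attempt timeout, no back-off.
// 200,000 ms and 20 attempts are the figures quoted in this thread.
class RetryBudget {

    static Duration worstCase(Duration perAttemptTimeout, int attempts) {
        return perAttemptTimeout.multipliedBy(attempts);
    }

    public static void main(String[] args) {
        Duration socketTimeout = Duration.ofMillis(200_000);    // "Hadoop defaults it to 200000 ms"
        int attempts = 20;                                       // "the client will retry 20 times"
        Duration cancellationTimeout = Duration.ofSeconds(180);  // Flink watchdog in the logs

        Duration single = worstCase(socketTimeout, 1);
        Duration total = worstCase(socketTimeout, attempts);
        System.out.println("one attempt: " + single.getSeconds() + " s");  // 200 s
        System.out.println("all attempts: " + total.toMinutes() + " min"); // 66 min
        System.out.println("one attempt exceeds cancellation timeout: "
                + (single.compareTo(cancellationTimeout) > 0));            // true
    }
}
```

If "retry 20 times" actually means 21 attempts, the total grows by one more timeout. The conclusion about a single attempt is unchanged either way.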

Re: Network issue leading to "No pooled slot available"

Posted by Khachatryan Roman <kh...@gmail.com>.
Thanks for checking this workaround!

I've created a Jira issue [1] to check whether the AWS SDK version in the
Flink distribution can be upgraded.

Regards,
Roman


On Fri, Oct 9, 2020 at 12:54 AM Dan Diephouse <da...@netzooid.com> wrote:

> Well, I just dropped in the latest Amazon 1.11.878 SDK and now it
> appears to respect interrupts in a test case I created. (the test fails
> with the SDK that is in use by Flink)
>
> I will try it in a full fledged Flink environment and report back.
>
> On Thu, Oct 8, 2020 at 3:41 PM Dan Diephouse <da...@netzooid.com> wrote:
>
>> Did some digging... definitely appears that the Amazon SDK definitely is
>> not picking up the interrupt.  I will try playing with the connection
>> timeout. Hadoop defaults it to 200000 ms, which may be part of the problem.
>> Anyone have any other ideas?
>>
>> In theory this should be fixed by SDK v2 which uses NIO, but I don't
>> think I'm up for all the changes that would involve in the downstream
>> components.
>>
>> On Thu, Oct 8, 2020 at 8:36 AM Dan Diephouse <da...@netzooid.com> wrote:
>>
>>> Using the latest - 1.11.2.
>>>
>>> I would assume the interruption is being ignored in the Hadoop / S3
>>> layer. I was looking at the defaults and (if I understood correctly) the
>>> client will retry 20 times. Which would explain why it never gets
>>> cancelled...
>>>
>>> On Thu, Oct 8, 2020 at 1:27 AM Khachatryan Roman <
>>> khachatryan.roman@gmail.com> wrote:
>>>
>>>> Hi Dan Diephouse,
>>>>
>>>> From the logs you provided indeed it looks like 1 causes 2 => 3 => 4,
>>>> where 2 is a bug.
>>>> It's unclear though where the interruption is ignored (Flink/Hadoop
>>>> FS/S3 client).
>>>>
>>>> What version of Flink are you using?
>>>>
>>>> Regards,
>>>> Roman
>>>>
>>>>
>>>>
>>>
>>> --
>>> Dan Diephouse
>>> @dandiep
>>>
>>
>>
>> --
>> Dan Diephouse
>> @dandiep
>>
>
>
> --
> Dan Diephouse
> @dandiep
>
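The two Flink messages in the original report ("did not react to cancelling signal for 30 seconds" in step 2 and "Task did not exit gracefully within 180 + seconds" in step 3) line up with Flink's task cancellation settings `task.cancellation.interval` (default 30000 ms) and `task.cancellation.timeout` (default 180000 ms). The task thread is interrupted repeatedly, and if it is still running when the timeout expires, the TaskManager is failed. In a MiniCluster that leaves no TaskManager at all, which is consistent with the "No pooled slot available" error in step 4. The sketch below is a simplified, JDK-only model of that pattern, not Flink's actual `TaskCancelerWatchDog`; the class and method names are hypothetical, and a worker that swallows interrupts stands in for the thread stuck in the S3 socket read.

```java
import java.util.concurrent.TimeUnit;

/**
 * Simplified model of a cancellation watchdog: interrupt the task thread at a
 * fixed interval and escalate if it is still alive when the timeout expires.
 * Illustration only; this is not Flink's TaskCancelerWatchDog.
 */
public class CancelWatchdogSketch {

    /** Returns true if the worker outlived the timeout and the watchdog had to escalate. */
    static boolean cancel(Thread worker, long intervalMs, long timeoutMs) throws InterruptedException {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (worker.isAlive()) {
            long remainingMs = TimeUnit.NANOSECONDS.toMillis(deadline - System.nanoTime());
            if (remainingMs <= 0) {
                return true; // Flink would fail the whole TaskManager at this point
            }
            worker.interrupt();                             // repeated cancellation attempt
            worker.join(Math.min(intervalMs, remainingMs)); // give the worker a chance to exit
        }
        return false;
    }

    /** Swallows interrupts, like a thread stuck in a non-interruptible socket read. */
    static Thread stubbornWorker(long runForMs) {
        Thread t = new Thread(() -> {
            long end = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(runForMs);
            while (System.nanoTime() - end < 0) {
                try {
                    Thread.sleep(10);
                } catch (InterruptedException ignored) {
                    // interrupt swallowed: the flag is cleared and the loop keeps going
                }
            }
        });
        t.setDaemon(true);
        return t;
    }

    /** Exits as soon as it is interrupted. */
    static Thread cooperativeWorker() {
        Thread t = new Thread(() -> {
            try {
                Thread.sleep(60_000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the flag and return
            }
        });
        t.setDaemon(true);
        return t;
    }

    public static void main(String[] args) throws InterruptedException {
        Thread stubborn = stubbornWorker(2_000);
        stubborn.start();
        System.out.println("stubborn task escalated: " + cancel(stubborn, 50, 300));

        Thread cooperative = cooperativeWorker();
        cooperative.start();
        System.out.println("cooperative task escalated: " + cancel(cooperative, 50, 300));
    }
}
```

Setting `task.cancellation.timeout` to 0 disables the watchdog, but that only trades a failed TaskManager for a task that may hang indefinitely; the real fix is making the blocked call return.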

Re: Network issue leading to "No pooled slot available"

Posted by Dan Diephouse <da...@netzooid.com>.
Well, I just dropped in the latest Amazon SDK (1.11.878), and it now
appears to respect interrupts in a test case I created (the test fails
with the SDK version that Flink uses).

I will try it in a full-fledged Flink environment and report back.
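The mailing-list thread does not include Dan's test case. As a rough, JDK-only illustration of why an interrupt alone may not free the stuck sink thread, the sketch below blocks a platform thread in a plain `java.net.Socket` read (the same kind of `NioSocketImpl.timedRead` frame as in the stack trace), interrupts it, and then closes the socket. The class name and structure are hypothetical and no AWS SDK is involved, so it says nothing about which SDK version fixes the problem; it only shows that a blocking socket read is released by a timeout or by closing the socket, not by `Thread.interrupt()`.

```java
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Does a thread blocked in a classic java.net.Socket read react to Thread.interrupt()?
 * The local server completes the TCP connection but never sends a byte, so the
 * client read blocks, much like the stalled TLS handshake in the stack trace.
 */
public class SocketInterruptCheck {

    /** What happened to the blocked reader. */
    static final class Result {
        final boolean aliveAfterInterrupt; // still blocked after being interrupted?
        final Throwable errorAfterClose;   // exception thrown once the socket was closed

        Result(boolean aliveAfterInterrupt, Throwable errorAfterClose) {
            this.aliveAfterInterrupt = aliveAfterInterrupt;
            this.errorAfterClose = errorAfterClose;
        }
    }

    static Result run() throws Exception {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (ServerSocket server = new ServerSocket(0, 1, loopback);
             Socket client = new Socket(loopback, server.getLocalPort())) {
            client.setSoTimeout(10_000); // long read timeout, standing in for a large connection timeout
            AtomicReference<Throwable> error = new AtomicReference<>();

            Thread reader = new Thread(() -> {
                try {
                    InputStream in = client.getInputStream();
                    in.read(); // blocks: the server never writes
                } catch (IOException e) {
                    error.set(e);
                }
            });
            reader.setDaemon(true);
            reader.start();
            Thread.sleep(200);  // let the reader block inside read()

            reader.interrupt(); // what a task cancellation does first
            reader.join(500);
            boolean aliveAfterInterrupt = reader.isAlive();

            client.close();     // closing the socket is what actually unblocks the read
            reader.join(5_000);
            return new Result(aliveAfterInterrupt, error.get());
        }
    }

    public static void main(String[] args) throws Exception {
        Result r = run();
        System.out.println("still blocked after interrupt: " + r.aliveAfterInterrupt);
        System.out.println("after close: " + r.errorAfterClose);
    }
}
```

For platform threads this is standard JDK behaviour, so a client library that wants to honour interrupts has to check the interrupt flag itself (for example before each retry) or close the connection from another thread.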

On Thu, Oct 8, 2020 at 3:41 PM Dan Diephouse <da...@netzooid.com> wrote:

> Did some digging... it definitely appears that the Amazon SDK is
> not picking up the interrupt. I will try playing with the connection
> timeout. Hadoop defaults it to 200000 ms, which may be part of the problem.
> Does anyone have any other ideas?
>
> In theory this should be fixed by SDK v2, which uses NIO, but I don't think
> I'm up for all the changes that would be required in the downstream components.
>
> On Thu, Oct 8, 2020 at 8:36 AM Dan Diephouse <da...@netzooid.com> wrote:
>
>> Using the latest - 1.11.2.
>>
>> I would assume the interruption is being ignored in the Hadoop / S3
>> layer. I was looking at the defaults and (if I understood correctly) the
>> client will retry 20 times, which would explain why it never gets
>> cancelled...
>>
>> On Thu, Oct 8, 2020 at 1:27 AM Khachatryan Roman <
>> khachatryan.roman@gmail.com> wrote:
>>
>>> Hi Dan Diephouse,
>>>
>>> From the logs you provided, it does look like 1 causes 2 => 3 => 4,
>>> where 2 is a bug.
>>> It's unclear, though, where the interruption is ignored (Flink, Hadoop
>>> FS, or the S3 client).
>>>
>>> What version of Flink are you using?
>>>
>>> Regards,
>>> Roman
>>>
>>>
>>>
>>
>> --
>> Dan Diephouse
>> @dandiep
>>
>
>
> --
> Dan Diephouse
> @dandiep
>


-- 
Dan Diephouse
@dandiep

Re: Network issue leading to "No pooled slot available"

Posted by Dan Diephouse <da...@netzooid.com>.
Did some digging... it definitely appears that the Amazon SDK is
not picking up the interrupt. I will try playing with the connection
timeout. Hadoop defaults it to 200000 ms, which may be part of the problem.
Does anyone have any other ideas?
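A rough budget makes the mismatch concrete. Assuming the values mentioned in this thread (a 200000 ms `fs.s3a.connection.timeout` and, if the reading of the defaults is right, 20 attempts) and Flink's default `task.cancellation.timeout` of 180000 ms, a single attempt stuck in a socket read can already outlast the cancellation watchdog. The sketch ignores retry back-off and assumes every attempt blocks for the full timeout; the class and method names are hypothetical.

```java
import java.time.Duration;

/** Back-of-the-envelope comparison of S3 client timeouts with Flink's cancellation watchdog. */
public class CancellationBudget {

    /** Worst case if every attempt blocks for the full socket timeout (back-off delays ignored). */
    static Duration worstCaseBlocking(Duration perAttemptTimeout, int attempts) {
        return perAttemptTimeout.multipliedBy(attempts);
    }

    public static void main(String[] args) {
        Duration perAttempt = Duration.ofMillis(200_000); // fs.s3a.connection.timeout, as quoted in the thread
        int attempts = 20;                                // retry count, as quoted in the thread
        Duration watchdog = Duration.ofMillis(180_000);   // Flink task.cancellation.timeout default

        Duration worst = worstCaseBlocking(perAttempt, attempts);
        System.out.println("per attempt:  " + perAttempt.getSeconds() + " s");
        System.out.println("all attempts: " + worst.getSeconds() + " s");
        System.out.println("watchdog:     " + watchdog.getSeconds() + " s");
        System.out.println("one attempt outlasts watchdog: " + (perAttempt.compareTo(watchdog) > 0));
    }
}
```

With Flink's S3 filesystem, Hadoop `fs.s3a.` keys can be set in `flink-conf.yaml` with an `s3.` prefix (for example `s3.connection.timeout`), which Flink translates back to the corresponding `fs.s3a.` key. Keeping the per-attempt timeout, multiplied by the number of attempts, below the cancellation timeout is one way to let the stuck call fail on its own.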

In theory this should be fixed by SDK v2, which uses NIO, but I don't think
I'm up for all the changes that would be required in the downstream components.
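The JDK-level difference behind the NIO remark can be shown with plain NIO: a `SocketChannel` is an `InterruptibleChannel`, so interrupting a thread blocked in `read` closes the channel and makes the read throw `ClosedByInterruptException`. This is only a JDK illustration; whether AWS SDK v2 behaves this way depends on the HTTP client it is configured with, which the sketch does not cover, and the class name is hypothetical.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.concurrent.atomic.AtomicReference;

/** Shows that a blocking NIO SocketChannel read is released by Thread.interrupt(). */
public class ChannelInterruptCheck {

    /** Returns the exception the blocked reader saw after being interrupted (null if none). */
    static Throwable run() throws Exception {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 1);
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress())) {
                AtomicReference<Throwable> error = new AtomicReference<>();

                Thread reader = new Thread(() -> {
                    try {
                        client.read(ByteBuffer.allocate(1)); // blocking mode: waits for data that never comes
                    } catch (IOException e) {
                        error.set(e);
                    }
                });
                reader.setDaemon(true);
                reader.start();
                Thread.sleep(200);  // let the reader block inside read()

                reader.interrupt(); // interruptible channel: closes it and wakes the reader
                reader.join(5_000);
                return error.get();
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Throwable t = run();
        System.out.println("reader saw: " + t);
        System.out.println("closed by interrupt: " + (t instanceof ClosedByInterruptException));
    }
}
```

The flip side is that the interrupt closes the channel for good, so a client built on interruptible channels cannot reuse that connection after a cancellation.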

On Thu, Oct 8, 2020 at 8:36 AM Dan Diephouse <da...@netzooid.com> wrote:

> Using the latest - 1.11.2.
>
> I would assume the interruption is being ignored in the Hadoop / S3 layer.
> I was looking at the defaults and (if I understood correctly) the client
> will retry 20 times, which would explain why it never gets cancelled...
>
> On Thu, Oct 8, 2020 at 1:27 AM Khachatryan Roman <
> khachatryan.roman@gmail.com> wrote:
>
>> Hi Dan Diephouse,
>>
>> From the logs you provided, it does look like 1 causes 2 => 3 => 4,
>> where 2 is a bug.
>> It's unclear, though, where the interruption is ignored (Flink, Hadoop FS,
>> or the S3 client).
>>
>> What version of Flink are you using?
>>
>> Regards,
>> Roman
>>
>>
>> On Wed, Oct 7, 2020 at 11:16 PM Dan Diephouse <da...@netzooid.com> wrote:
>>
>>> I am now using the S3 StreamingFileSink to send data to an S3 bucket.
>>> If/when the network connection has issues, it seems to put Flink into an
>>> irrecoverable state. Am I understanding this correctly? Any suggestions on
>>> how to troubleshoot / fix?
>>>
>>> Here is what I'm observing:
>>>
>>> *1. Network is dropped*
>>>
>>> *2. S3 connections do not exit gracefully*
>>>
>>> 2020-10-07 20:58:07.468  WARN 1 --- [aa565930b86fb).]
>>> o.apache.flink.runtime.taskmanager.Task  : Task 'Sink: Unnamed (1/12)' did
>>> not react to cancelling signal for 30 seconds, but is stuck in method:
>>> java.base@14.0.2/sun.nio.ch.Net.poll(Native Method)
>>> java.base@14.0.2/sun.nio.ch.NioSocketImpl.park(NioSocketImpl.java:181)
>>> java.base@14.0.2/sun.nio.ch.NioSocketImpl.timedRead(NioSocketImpl.java:285)
>>> java.base@14.0.2/sun.nio.ch.NioSocketImpl.implRead(NioSocketImpl.java:309)
>>> java.base@14.0.2/sun.nio.ch.NioSocketImpl.read(NioSocketImpl.java:350)
>>> java.base@14.0.2/sun.nio.ch.NioSocketImpl$1.read(NioSocketImpl.java:803)
>>> java.base@14.0.2/java.net.Socket$SocketInputStream.read(Socket.java:982)
>>> java.base@14.0.2/sun.security.ssl.SSLSocketInputRecord.read(SSLSocketInputRecord.java:469)
>>> java.base@14.0.2/sun.security.ssl.SSLSocketInputRecord.readHeader(SSLSocketInputRecord.java:463)
>>> java.base@14.0.2/sun.security.ssl.SSLSocketInputRecord.decode(SSLSocketInputRecord.java:160)
>>> java.base@14.0.2/sun.security.ssl.SSLTransport.decode(SSLTransport.java:110)
>>> java.base@14.0.2/sun.security.ssl.SSLSocketImpl.decode(SSLSocketImpl.java:1475)
>>> java.base@14.0.2/sun.security.ssl.SSLSocketImpl.readHandshakeRecord(SSLSocketImpl.java:1381)
>>> java.base@14.0.2/sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:441)
>>> java.base@14.0.2/sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:412)
>>> app//org.apache.http.conn.ssl.SSLConnectionSocketFactory.createLayeredSocket(SSLConnectionSocketFactory.java:436)
>>> app//org.apache.http.conn.ssl.SSLConnectionSocketFactory.connectSocket(SSLConnectionSocketFactory.java:384)
>>> app//com.amazonaws.http.conn.ssl.SdkTLSSocketFactory.connectSocket(SdkTLSSocketFactory.java:142)
>>> app//org.apache.http.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:142)
>>> app//org.apache.http.impl.conn.PoolingHttpClientConnectionManager.connect(PoolingHttpClientConnectionManager.java:374)
>>> jdk.internal.reflect.GeneratedMethodAccessor92.invoke(Unknown Source)
>>> java.base@14.0.2/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> java.base@14.0.2/java.lang.reflect.Method.invoke(Method.java:564)
>>> app//com.amazonaws.http.conn.ClientConnectionManagerFactory$Handler.invoke(ClientConnectionManagerFactory.java:76)
>>> app//com.amazonaws.http.conn.$Proxy89.connect(Unknown Source)
>>> app//org.apache.http.impl.execchain.MainClientExec.establishRoute(MainClientExec.java:393)
>>> app//org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:236)
>>> app//org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186)
>>> app//org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
>>> app//org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
>>> app//org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56)
>>> app//com.amazonaws.http.apache.client.impl.SdkHttpClient.execute(SdkHttpClient.java:72)
>>> app//com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1323)
>>> app//com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1139)
>>> app//com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:796)
>>> app//com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:764)
>>> app//com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:738)
>>> app//com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:698)
>>> app//com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:680)
>>> app//com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:544)
>>> app//com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:524)
>>>
>>> app//com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5054)
>>>
>>> app//com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5000)
>>>
>>> app//com.amazonaws.services.s3.AmazonS3Client.initiateMultipartUpload(AmazonS3Client.java:3574)
>>>
>>> app//org.apache.hadoop.fs.s3a.S3AFileSystem.initiateMultipartUpload(S3AFileSystem.java:2597)
>>>
>>> *3. Tasks do not complete*
>>>
>>> [...]
>>>
>>> *4. When trying to restart, there are no slots*
>>>
>>> [...]
>>>
>>> Any thoughts / suggestions are much appreciated.
>>>
>>> --
>>> Dan Diephouse
>>> @dandiep
>>>
>>
>
> --
> Dan Diephouse
> @dandiep
>


-- 
Dan Diephouse
@dandiep

Re: Network issue leading to "No pooled slot available"

Posted by Dan Diephouse <da...@netzooid.com>.
Using the latest version, 1.11.2.

I would assume the interruption is being ignored in the Hadoop/S3 layer.
Looking at the defaults, and if I understood them correctly, the client
will retry 20 times, which would explain why the task never gets cancelled.
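A rough way to test this hypothesis is to compare how long a single S3 request can spend retrying with Flink's cancellation budget. The Java sketch below makes several simplifying assumptions:

- Each attempt waits out one connection-establishment timeout plus one socket timeout.
- Backoff between attempts is ignored.
- The attempt count and timeouts are placeholders. Substitute whatever `fs.s3a.attempts.maximum`, `fs.s3a.connection.establish.timeout` and `fs.s3a.connection.timeout` resolve to in your deployment, and check them against the bundled Hadoop version's `core-default.xml`.

The 180-second budget is Flink's default `task.cancellation.timeout`. The 30-second warning in the log matches the default `task.cancellation.interval`.

```java
import java.time.Duration;

public class CancellationBudget {

    // Flink's default task.cancellation.timeout (180 s). Past this, the watchdog
    // gives up on the stuck task and fails the whole TaskManager.
    static final Duration FLINK_CANCELLATION_TIMEOUT = Duration.ofSeconds(180);

    // Simplifying assumption: every attempt waits out one connection-establishment
    // timeout plus one socket (read) timeout; backoff between attempts is ignored.
    static Duration worstCaseBlocking(int attempts, Duration establishTimeout, Duration socketTimeout) {
        return establishTimeout.plus(socketTimeout).multipliedBy(attempts);
    }

    // True if a request blocking this long would outlive Flink's cancellation timeout.
    static boolean exceedsCancellationBudget(Duration blocking) {
        return blocking.compareTo(FLINK_CANCELLATION_TIMEOUT) > 0;
    }

    // Per-attempt time that keeps all attempts within the cancellation timeout.
    static Duration perAttemptBudget(int attempts) {
        return FLINK_CANCELLATION_TIMEOUT.dividedBy(attempts);
    }

    public static void main(String[] args) {
        // Placeholder values (assumptions): substitute your effective S3A settings.
        int attempts = 20;
        Duration establish = Duration.ofSeconds(5);
        Duration socket = Duration.ofSeconds(200);

        Duration worst = worstCaseBlocking(attempts, establish, socket);
        System.out.println("worst case per request: " + worst.getSeconds() + " s");
        System.out.println("exceeds cancellation timeout: " + exceedsCancellationBudget(worst));
        System.out.println("per-attempt budget: " + perAttemptBudget(attempts).toMillis() + " ms");
    }
}
```

With these placeholder numbers, one stuck request could block for 4100 s, far beyond the 180 s budget. Staying inside the budget with 20 attempts would leave about 9 s per attempt.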

On Thu, Oct 8, 2020 at 1:27 AM Khachatryan Roman <
khachatryan.roman@gmail.com> wrote:

> Hi Dan Diephouse,
>
> From the logs you provided, it does look like 1 causes 2 => 3 => 4,
> where 2 is a bug.
> It's unclear, though, where the interruption is being ignored (Flink, the
> Hadoop FS, or the S3 client).
>
> What version of Flink are you using?
>
> Regards,
> Roman
>
>
> On Wed, Oct 7, 2020 at 11:16 PM Dan Diephouse <da...@netzooid.com> wrote:
>
>> I am now using the S3 StreamingFileSink to send data to an S3 bucket.
>> If/when the network connection has issues, it seems to put Flink into an
>> irrecoverable state. Am I understanding this correctly? Any suggestions on
>> how to troubleshoot / fix?
>>
>> Here is what I'm observing:
>>
>> *1. Network is dropped *
>>
>> *2. S3 connections do not exit gracefully*
>>
>> [...]
>>
>> *3. Tasks do not complete*
>>
>> [...]
>>
>> *4. When trying to restart, there are no slots*
>>
>> [...]
>>
>> Any thoughts / suggestions are much appreciated.
>>
>> --
>> Dan Diephouse
>> @dandiep
>>
>

-- 
Dan Diephouse
@dandiep

Re: Network issue leading to "No pooled slot available"

Posted by Khachatryan Roman <kh...@gmail.com>.
Hi Dan Diephouse,

From the logs you provided, it does look like 1 causes 2 => 3 => 4, where
2 is a bug.
It's unclear, though, where the interruption is being ignored (Flink, the
Hadoop FS, or the S3 client).
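Some context helps narrow down where the interruption gets lost. Flink cancels a task by interrupting its thread. It repeats the interrupt and logs a "did not react to cancelling signal" warning every `task.cancellation.interval`. Any layer that catches `InterruptedException` and simply keeps retrying will therefore hide the cancellation.

The Java sketch below is a simplified model, not the actual Hadoop or AWS SDK code. `runWithRetries` is a hypothetical retry loop whose blocking step is an interruptible sleep. It runs once with the interrupt honored and once with it swallowed.

Note that the stack trace shows the thread inside a plain `java.net.Socket` read. Such a read does not respond to `Thread.interrupt()` at all. It returns only when data arrives, the socket timeout expires, or the socket is closed, so the configured timeouts matter as much as interrupt handling.

```java
public class InterruptAwareRetry {

    // Hypothetical retry loop; returns the number of attempts made.
    static int runWithRetries(int maxAttempts, boolean honorInterrupt) {
        int attempts = 0;
        while (attempts < maxAttempts) {
            attempts++;
            try {
                // Stand-in for an interruptible wait, e.g. a retry backoff.
                Thread.sleep(5);
            } catch (InterruptedException e) {
                if (honorInterrupt) {
                    // Restore the flag so callers up the stack also see the cancellation.
                    Thread.currentThread().interrupt();
                    return attempts;
                }
                // Swallowing the exception clears the interrupt flag, so the loop
                // keeps retrying and the cancellation signal is lost.
            }
        }
        return attempts;
    }

    public static void main(String[] args) {
        Thread.currentThread().interrupt();            // simulate Flink cancelling the task
        int honoring = runWithRetries(20, true);
        boolean flagPreserved = Thread.interrupted();  // read and clear the flag

        Thread.currentThread().interrupt();
        int swallowing = runWithRetries(20, false);
        Thread.interrupted();                          // clear any leftover flag

        System.out.println("honoring: " + honoring + " attempt(s), flag preserved: " + flagPreserved);
        System.out.println("swallowing: " + swallowing + " attempt(s)");
    }
}
```

Run as is, the honoring loop stops after 1 attempt and leaves the interrupt flag set. The swallowing loop runs all 20 attempts, because catching `InterruptedException` clears the flag.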

What version of Flink are you using?

Regards,
Roman


On Wed, Oct 7, 2020 at 11:16 PM Dan Diephouse <da...@netzooid.com> wrote:

> I am now using the S3 StreamingFileSink to send data to an S3 bucket.
> If/when the network connection has issues, it seems to put Flink into an
> irrecoverable state. Am I understanding this correctly? Any suggestions on
> how to troubleshoot / fix?
>
> Here is what I'm observing:
>
> *1. Network is dropped *
>
> *2. S3 connections do not exit gracefully*
>
> [...]
>
> *3. Tasks do not complete*
>
> [...]
>
> *4. When trying to restart, there are no slots*
>
> [...]
> ~[akka-actor_2.11-2.5.21.jar:2.5.21]
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> ~[akka-actor_2.11-2.5.21.jar:2.5.21]
> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> ~[akka-actor_2.11-2.5.21.jar:2.5.21]
> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> ~[akka-actor_2.11-2.5.21.jar:2.5.21]
> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> ~[akka-actor_2.11-2.5.21.jar:2.5.21]
> at
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> ~[akka-actor_2.11-2.5.21.jar:2.5.21]
> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> ~[akka-actor_2.11-2.5.21.jar:2.5.21]
> at
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> ~[akka-actor_2.11-2.5.21.jar:2.5.21]
> Caused by:
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
> No pooled slot available and request to ResourceManager for new slot failed
> ... 27 common frames omitted
> Caused by:
> org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException:
> Could not fulfill slot request d0e3ac18beece314e56aa23cd2265d92.
> at
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.registerSlotRequest(SlotManagerImpl.java:391)
> ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
> at
> org.apache.flink.runtime.resourcemanager.ResourceManager.requestSlot(ResourceManager.java:457)
> ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
> Method) ~[na:na]
> at
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> ~[na:na]
> at
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> ~[na:na]
> at java.base/java.lang.reflect.Method.invoke(Method.java:564) ~[na:na]
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284)
> ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
> ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
> ... 20 common frames omitted
> Caused by:
> org.apache.flink.runtime.resourcemanager.exceptions.UnfulfillableSlotRequestException:
> Could not fulfill slot request d0e3ac18beece314e56aa23cd2265d92. Requested
> resource profile (ResourceProfile{UNKNOWN}) is unfulfillable.
> at
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.lambda$fulfillPendingSlotRequestWithPendingTaskManagerSlot$11(SlotManagerImpl.java:882)
> ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
> at
> org.apache.flink.util.OptionalConsumer.ifNotPresent(OptionalConsumer.java:52)
> ~[flink-core-1.11.2.jar:1.11.2]
> at
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.fulfillPendingSlotRequestWithPendingTaskManagerSlot(SlotManagerImpl.java:878)
> ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
> at
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.lambda$internalRequestSlot$9(SlotManagerImpl.java:865)
> ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
> at
> org.apache.flink.util.OptionalConsumer.ifNotPresent(OptionalConsumer.java:52)
> ~[flink-core-1.11.2.jar:1.11.2]
> at
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.internalRequestSlot(SlotManagerImpl.java:865)
> ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
> at
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.registerSlotRequest(SlotManagerImpl.java:386)
> ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
> ... 27 common frames omitted
>
> Any thoughts / suggestions are much appreciated.
>
> --
> Dan Diephouse
> @dandiep
>