You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Averell <lv...@gmail.com> on 2019/03/04 12:47:14 UTC

S3 parquet sink - failed with S3 connection exception

Hello everyone,

I have a job which is writing some streams into parquet files in S3. I use
Flink 1.7.2 on EMR 5.21.
My job had been running well, but suddenly it failed to make a checkpoint
with the full stack trace mentioned below. After that failure, the job
restarted from the last successful checkpoint, but it could not make any
further checkpoint - all subsequent checkpoints failed with the same reason. 
Searching on the Internet, I could only find one explanation: an S3Object has
not been closed properly.

Could someone please help?

Thanks and regards,
Averell


 /The program finished with the following exception:

org.apache.flink.util.FlinkException: Triggering a savepoint for the job
8cf68432acb3b4ced2cbc97bc23b4af5 failed.
	at
org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:723)
	at
org.apache.flink.client.cli.CliFrontend.lambda$savepoint$9(CliFrontend.java:701)
	at
org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:985)
	at org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:698)
	at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1065)
	at
org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844)
	at
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
	at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
Caused by: java.util.concurrent.CompletionException:
java.util.concurrent.CompletionException: java.lang.Exception: Checkpoint
failed: Could not perform checkpoint 33 for operator Sink: S3 - Instant
(1/4).
	at
org.apache.flink.runtime.jobmaster.JobMaster.lambda$triggerSavepoint$13(JobMaster.java:972)
	at
java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
	at
java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
	at
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
	at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
	at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
	at
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
	at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
	at
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
	at
akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
	at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
	at akka.actor.ActorCell.invoke(ActorCell.scala:495)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
	at akka.dispatch.Mailbox.run(Mailbox.scala:224)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.util.concurrent.CompletionException: java.lang.Exception:
Checkpoint failed: Could not perform checkpoint 33 for operator Sink: S3 -
Instant (1/4).
	at
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
	at
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
	at
java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
	at
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
	at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
	at
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
	at
org.apache.flink.runtime.checkpoint.PendingCheckpoint.abortWithCause(PendingCheckpoint.java:452)
	at
org.apache.flink.runtime.checkpoint.PendingCheckpoint.abortError(PendingCheckpoint.java:447)
	at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.discardCheckpoint(CheckpointCoordinator.java:1258)
	at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.failUnacknowledgedPendingCheckpointsFor(CheckpointCoordinator.java:918)
	at
org.apache.flink.runtime.executiongraph.ExecutionGraph.notifyExecutionChange(ExecutionGraph.java:1779)
	at
org.apache.flink.runtime.executiongraph.ExecutionVertex.notifyStateTransition(ExecutionVertex.java:756)
	at
org.apache.flink.runtime.executiongraph.Execution.transitionState(Execution.java:1353)
	at
org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1113)
	at
org.apache.flink.runtime.executiongraph.Execution.markFailed(Execution.java:945)
	at
org.apache.flink.runtime.executiongraph.ExecutionGraph.updateState(ExecutionGraph.java:1576)
	at
org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:542)
	at sun.reflect.GeneratedMethodAccessor33.invoke(Unknown Source)
	at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
	at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
	... 15 more
Caused by: java.lang.Exception: Checkpoint failed: Could not perform
checkpoint 33 for operator Sink: S3 - Instant (1/4).
	... 30 more
Caused by: java.lang.Exception: Could not perform checkpoint 33 for operator
Sink: S3 - Instant (1/4).
	at
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:595)
	at
org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:396)
	at
org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:292)
	at
org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:200)
	at
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:209)
	at
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
	at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Could not complete snapshot 33 for operator
Sink: S3 - Instant (1/4).
	at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:422)
	at
org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1113)
	at
org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1055)
	at
org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:729)
	at
org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:641)
	at
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:586)
	... 8 more
Caused by: java.io.IOException: Uploading parts failed
	at
org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.awaitPendingPartUploadToComplete(RecoverableMultiPartUploadImpl.java:231)
	at
org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.awaitPendingPartsUpload(RecoverableMultiPartUploadImpl.java:215)
	at
org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.snapshotAndGetRecoverable(RecoverableMultiPartUploadImpl.java:151)
	at
org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.snapshotAndGetCommitter(RecoverableMultiPartUploadImpl.java:123)
	at
org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.snapshotAndGetCommitter(RecoverableMultiPartUploadImpl.java:56)
	at
org.apache.flink.fs.s3.common.writer.S3RecoverableFsDataOutputStream.closeForCommit(S3RecoverableFsDataOutputStream.java:167)
	at
org.apache.flink.streaming.api.functions.sink.filesystem.PartFileWriter.closeForCommit(PartFileWriter.java:71)
	at
org.apache.flink.streaming.api.functions.sink.filesystem.BulkPartWriter.closeForCommit(BulkPartWriter.java:63)
	at
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.closePartFile(Bucket.java:239)
	at
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.prepareBucketForCheckpointing(Bucket.java:280)
	at
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.onReceptionOfCheckpoint(Bucket.java:253)
	at
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.snapshotActiveBuckets(Buckets.java:244)
	at
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.snapshotState(Buckets.java:235)
	at
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.snapshotState(StreamingFileSink.java:347)
	at
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
	at
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
	at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
	at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:395)
	... 13 more
Caused by: java.io.InterruptedIOException: upload part on
output/instant/dt=2019-01-09/part-0-24:
org.apache.flink.fs.s3base.shaded.com.amazonaws.SdkClientException: Unable
to execute HTTP request: Timeout waiting for connection from pool
	at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AUtils.translateInterruptedException(S3AUtils.java:340)
	at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:171)
	at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:111)
	at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:260)
	at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:317)
	at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:256)
	at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:231)
	at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.retry(WriteOperationHelper.java:123)
	at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.uploadPart(WriteOperationHelper.java:471)
	at
org.apache.flink.fs.s3hadoop.HadoopS3AccessHelper.uploadPart(HadoopS3AccessHelper.java:73)
	at
org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl$UploadTask.run(RecoverableMultiPartUploadImpl.java:318)
	at
org.apache.flink.fs.s3.common.utils.BackPressuringExecutor$SemaphoreReleasingRunnable.run(BackPressuringExecutor.java:92)
	at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
Caused by:
org.apache.flink.fs.s3base.shaded.com.amazonaws.SdkClientException: Unable
to execute HTTP request: Timeout waiting for connection from pool
	at
org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleRetryableException(AmazonHttpClient.java:1114)
	at
org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1064)
	at
org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
	at
org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
	at
org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
	at
org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
	at
org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
	at
org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
	at
org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4325)
	at
org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4272)
	at
org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.doUploadPart(AmazonS3Client.java:3306)
	at
org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.uploadPart(AmazonS3Client.java:3291)
	at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.uploadPart(S3AFileSystem.java:1576)
	at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.lambda$uploadPart$8(WriteOperationHelper.java:474)
	at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
	... 12 more
Caused by:
org.apache.flink.fs.s3base.shaded.org.apache.http.conn.ConnectionPoolTimeoutException:
Timeout waiting for connection from pool
	at
org.apache.flink.fs.s3base.shaded.org.apache.http.impl.conn.PoolingHttpClientConnectionManager.leaseConnection(PoolingHttpClientConnectionManager.java:292)
	at
org.apache.flink.fs.s3base.shaded.org.apache.http.impl.conn.PoolingHttpClientConnectionManager$1.get(PoolingHttpClientConnectionManager.java:269)
	at sun.reflect.GeneratedMethodAccessor58.invoke(Unknown Source)
	at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at
org.apache.flink.fs.s3base.shaded.com.amazonaws.http.conn.ClientConnectionRequestFactory$Handler.invoke(ClientConnectionRequestFactory.java:70)
	at
org.apache.flink.fs.s3base.shaded.com.amazonaws.http.conn.$Proxy29.get(Unknown
Source)
	at
org.apache.flink.fs.s3base.shaded.org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:191)
	at
org.apache.flink.fs.s3base.shaded.org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:185)
	at
org.apache.flink.fs.s3base.shaded.org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
	at
org.apache.flink.fs.s3base.shaded.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
	at
org.apache.flink.fs.s3base.shaded.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56)
	at
org.apache.flink.fs.s3base.shaded.com.amazonaws.http.apache.client.impl.SdkHttpClient.execute(SdkHttpClient.java:72)
	at
org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1236)
	at
org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1056)
	... 25 more
/




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: S3 parquet sink - failed with S3 connection exception

Posted by Averell <lv...@gmail.com>.
Hello Kostas,

Thanks for your time.

I started that job from scratch, with the checkpoint interval set to 15
minutes. It completed the first 13 checkpoints successfully and only started
failing from the 14th. I waited for about 20 more checkpoints, but all failed.
Then I cancelled the job, restored from the last successful checkpoint, and
there were no more issues.

Today, I had another try - restoring from the last successful checkpoint
from yesterday. Result: started getting the same error from the first
checkpoint after restore. 
Tried to cancel and restore again, then no more issue until now (35 more
checkpoints already).

Regarding my job: I have 6 different S3-file-source streams
connected/unioned together, and then connected to a 7th S3-file-source
broadcast stream. Sinks are S3 parquet files and Elasticsearch.
Checkpointing is incremental and uses RocksDB.
This broadcast stream is one of the new changes to my job. The previous
version with 4 out of those 6 sources has been running well for more than a
month without any issue.
TM/JM logs for the first run today (the failure one) are attached.
The Yarn/EMR cluster is dedicated to the job.

I have a feeling that the issue comes from that broadcast stream (as
mentioned in the document, it doesn't use RocksDB). But not quite sure.

Thanks and regards,
Averell

logs.gz
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1586/logs.gz>  



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: S3 parquet sink - failed with S3 connection exception

Posted by Averell <lv...@gmail.com>.
Hi Kostas and everyone,

I tried to change setFailOnCheckpointingErrors from True to False, and got
the following trace in Flink GUI when the checkpoint/uploading failed. Not
sure whether it would be of any help in identifying the issue.

BTW, could you please tell me where to find the log file that the Flink GUI's
Exception tab reads from?

Thanks and regards,
Averell

java.lang.ArrayIndexOutOfBoundsException: 122626
	at
org.apache.parquet.column.values.dictionary.DictionaryValuesWriter$PlainLongDictionaryValuesWriter.fallBackDictionaryEncodedData(DictionaryValuesWriter.java:397)
	at
org.apache.parquet.column.values.dictionary.DictionaryValuesWriter.fallBackAllValuesTo(DictionaryValuesWriter.java:130)
	at
org.apache.parquet.column.values.fallback.FallbackValuesWriter.fallBack(FallbackValuesWriter.java:153)
	at
org.apache.parquet.column.values.fallback.FallbackValuesWriter.checkFallback(FallbackValuesWriter.java:147)
	at
org.apache.parquet.column.values.fallback.FallbackValuesWriter.writeLong(FallbackValuesWriter.java:181)
	at
org.apache.parquet.column.impl.ColumnWriterV1.write(ColumnWriterV1.java:228)
	at
org.apache.parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.addLong(MessageColumnIO.java:449)
	at
org.apache.parquet.avro.AvroWriteSupport.writeValueWithoutConversion(AvroWriteSupport.java:327)
	at
org.apache.parquet.avro.AvroWriteSupport.writeValue(AvroWriteSupport.java:278)
	at
org.apache.parquet.avro.AvroWriteSupport.writeRecordFields(AvroWriteSupport.java:191)
	at
org.apache.parquet.avro.AvroWriteSupport.write(AvroWriteSupport.java:165)
	at
org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:128)
	at org.apache.parquet.hadoop.ParquetWriter.write(ParquetWriter.java:299)
	at
org.apache.flink.formats.parquet.ParquetBulkWriter.addElement(ParquetBulkWriter.java:52)
	at
org.apache.flink.streaming.api.functions.sink.filesystem.BulkPartWriter.write(BulkPartWriter.java:50)
	at
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:214)
	at
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:268)
	at
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.invoke(StreamingFileSink.java:370)
	at
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
	at
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
	at
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
	at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
	at java.lang.Thread.run(Thread.java:748)






--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: S3 parquet sink - failed with S3 connection exception

Posted by Averell <lv...@gmail.com>.
Hi Kostas, and everyone,

Just an update on my issue. I have tried the following:
 * changed the S3-related configuration in Hadoop as suggested by the Hadoop
documentation [1]: 
 increased /fs.s3a.threads.max/ from 10 to 100, and
/fs.s3a.connection.maximum/ from 15 to 120. For reference, I have only
3 S3 sinks, with parallelisms of 4, 4, and 1.
 * followed AWS's documentation [2] to increase the EMRFS maxConnections to
200. However, I doubt that this would make any difference, because when
creating the S3 parquet bucket sink I needed to use an "s3a://..." path;
"s3://..." does not seem to be supported by Flink yet. 
 * reduced the parallelism for my S3 continuous files reader.

However, the problem still occurred randomly (random across job executions).
When it occurred, the only solution was to cancel the job and restart from the
last successful checkpoint.

Thanks and regards,
Averell

[1]  Hadoop-AWS module: Integration with Amazon Web Services
<https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#aTimeout_waiting_for_connection_from_pool_when_writing_to_S3A>  
[2]  emr-timeout-connection-wait
<https://aws.amazon.com/premiumsupport/knowledge-center/emr-timeout-connection-wait/>  



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: S3 parquet sink - failed with S3 connection exception

Posted by Kostas Kloudas <kk...@gmail.com>.
Hi Averell,

Did you have other failures before (from which you managed to resume
successfully)?
Can you share a bit more details about your job and potentially the TM/JM
logs?

The only thing I found about this is here
https://forums.aws.amazon.com/thread.jspa?threadID=130172
but Flink does not directly call getObject(). We rely on Hadoop code to
read objects.
In addition, we do not interact directly with the connection pool.

Cheers,
Kostas

On Mon, Mar 4, 2019 at 1:53 PM Averell <lv...@gmail.com> wrote:

> Hello everyone,
>
> I have a job which is writing some streams into parquet files in S3. I use
> Flink 1.7.2 on EMR 5.21.
> My job had been running well, but suddenly it failed to make a checkpoint
> with the full stack trace mentioned below. After that failure, the job
> restarted from the last successful checkpoint, but it could not make any
> further checkpoint - all subsequent checkpoints failed with the same
> reason.
> Searching on Internet I could only find one explanation: S3Object has not
> been closed properly.
>
> Could someone please help?
>
> Thanks and regards,
> Averell
>
>
>  /The program finished with the following exception:
>
> org.apache.flink.util.FlinkException: Triggering a savepoint for the job
> 8cf68432acb3b4ced2cbc97bc23b4af5 failed.
>         at
>
> org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:723)
>         at
>
> org.apache.flink.client.cli.CliFrontend.lambda$savepoint$9(CliFrontend.java:701)
>         at
>
> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:985)
>         at
> org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:698)
>         at
>
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1065)
>         at
>
> org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
>         at java.security.AccessController.doPrivileged(Native Method)
>         at javax.security.auth.Subject.doAs(Subject.java:422)
>         at
>
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844)
>         at
>
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>         at
> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
> Caused by: java.util.concurrent.CompletionException:
> java.util.concurrent.CompletionException: java.lang.Exception: Checkpoint
> failed: Could not perform checkpoint 33 for operator Sink: S3 - Instant
> (1/4).
>         at
>
> org.apache.flink.runtime.jobmaster.JobMaster.lambda$triggerSavepoint$13(JobMaster.java:972)
>         at
>
> java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
>         at
>
> java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
>         at
>
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
>         at
>
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
>         at
>
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
>         at
>
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
>         at
>
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
>         at
>
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
>         at
>
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>         at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>         at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>         at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>         at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>         at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>         at
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>         at
>
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>         at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>         at
>
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.util.concurrent.CompletionException: java.lang.Exception:
> Checkpoint failed: Could not perform checkpoint 33 for operator Sink: S3 -
> Instant (1/4).
>         at
>
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
>         at
>
> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
>         at
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
>         at
>
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
>         at
>
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>         at
>
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>         at
>
> org.apache.flink.runtime.checkpoint.PendingCheckpoint.abortWithCause(PendingCheckpoint.java:452)
>         at
>
> org.apache.flink.runtime.checkpoint.PendingCheckpoint.abortError(PendingCheckpoint.java:447)
>         at
>
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.discardCheckpoint(CheckpointCoordinator.java:1258)
>         at
>
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.failUnacknowledgedPendingCheckpointsFor(CheckpointCoordinator.java:918)
>         at
>
> org.apache.flink.runtime.executiongraph.ExecutionGraph.notifyExecutionChange(ExecutionGraph.java:1779)
>         at
>
> org.apache.flink.runtime.executiongraph.ExecutionVertex.notifyStateTransition(ExecutionVertex.java:756)
>         at
>
> org.apache.flink.runtime.executiongraph.Execution.transitionState(Execution.java:1353)
>         at
>
> org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1113)
>         at
>
> org.apache.flink.runtime.executiongraph.Execution.markFailed(Execution.java:945)
>         at
>
> org.apache.flink.runtime.executiongraph.ExecutionGraph.updateState(ExecutionGraph.java:1576)
>         at
>
> org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:542)
>         at sun.reflect.GeneratedMethodAccessor33.invoke(Unknown Source)
>         at
>
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:498)
>         at
>
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
>         at
>
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
>         ... 15 more
> Caused by: java.lang.Exception: Checkpoint failed: Could not perform
> checkpoint 33 for operator Sink: S3 - Instant (1/4).
>         ... 30 more
> Caused by: java.lang.Exception: Could not perform checkpoint 33 for
> operator
> Sink: S3 - Instant (1/4).
>         at
>
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:595)
>         at
>
> org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:396)
>         at
>
> org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:292)
>         at
>
> org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:200)
>         at
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:209)
>         at
>
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>         at
>
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>         at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.Exception: Could not complete snapshot 33 for operator
> Sink: S3 - Instant (1/4).
>         at
>
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:422)
>         at
>
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1113)
>         at
>
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1055)
>         at
>
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:729)
>         at
>
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:641)
>         at
>
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:586)
>         ... 8 more
> Caused by: java.io.IOException: Uploading parts failed
>         at
>
> org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.awaitPendingPartUploadToComplete(RecoverableMultiPartUploadImpl.java:231)
>         at
>
> org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.awaitPendingPartsUpload(RecoverableMultiPartUploadImpl.java:215)
>         at
>
> org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.snapshotAndGetRecoverable(RecoverableMultiPartUploadImpl.java:151)
>         at
>
> org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.snapshotAndGetCommitter(RecoverableMultiPartUploadImpl.java:123)
>         at
>
> org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.snapshotAndGetCommitter(RecoverableMultiPartUploadImpl.java:56)
>         at
>
> org.apache.flink.fs.s3.common.writer.S3RecoverableFsDataOutputStream.closeForCommit(S3RecoverableFsDataOutputStream.java:167)
>         at
>
> org.apache.flink.streaming.api.functions.sink.filesystem.PartFileWriter.closeForCommit(PartFileWriter.java:71)
>         at
>
> org.apache.flink.streaming.api.functions.sink.filesystem.BulkPartWriter.closeForCommit(BulkPartWriter.java:63)
>         at
>
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.closePartFile(Bucket.java:239)
>         at
>
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.prepareBucketForCheckpointing(Bucket.java:280)
>         at
>
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.onReceptionOfCheckpoint(Bucket.java:253)
>         at
>
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.snapshotActiveBuckets(Buckets.java:244)
>         at
>
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.snapshotState(Buckets.java:235)
>         at
>
> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.snapshotState(StreamingFileSink.java:347)
>         at
>
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
>         at
>
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
>         at
>
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
>         at
>
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:395)
>         ... 13 more
> Caused by: java.io.InterruptedIOException: upload part on output/instant/dt=2019-01-09/part-0-24: org.apache.flink.fs.s3base.shaded.com.amazonaws.SdkClientException: Unable to execute HTTP request: Timeout waiting for connection from pool
>         at
>
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AUtils.translateInterruptedException(S3AUtils.java:340)
>         at
>
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:171)
>         at
>
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:111)
>         at
>
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:260)
>         at
>
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:317)
>         at
>
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:256)
>         at
>
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:231)
>         at
>
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.retry(WriteOperationHelper.java:123)
>         at
>
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.uploadPart(WriteOperationHelper.java:471)
>         at
>
> org.apache.flink.fs.s3hadoop.HadoopS3AccessHelper.uploadPart(HadoopS3AccessHelper.java:73)
>         at
>
> org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl$UploadTask.run(RecoverableMultiPartUploadImpl.java:318)
>         at
>
> org.apache.flink.fs.s3.common.utils.BackPressuringExecutor$SemaphoreReleasingRunnable.run(BackPressuringExecutor.java:92)
>         at
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         ... 1 more
> Caused by: org.apache.flink.fs.s3base.shaded.com.amazonaws.SdkClientException: Unable to execute HTTP request: Timeout waiting for connection from pool
>         at
>
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleRetryableException(AmazonHttpClient.java:1114)
>         at
>
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1064)
>         at
>
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
>         at
>
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
>         at
>
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
>         at
>
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
>         at
>
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
>         at
>
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
>         at
>
> org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4325)
>         at
>
> org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4272)
>         at
>
> org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.doUploadPart(AmazonS3Client.java:3306)
>         at
>
> org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.uploadPart(AmazonS3Client.java:3291)
>         at
>
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.uploadPart(S3AFileSystem.java:1576)
>         at
>
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.lambda$uploadPart$8(WriteOperationHelper.java:474)
>         at
>
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
>         ... 12 more
> Caused by: org.apache.flink.fs.s3base.shaded.org.apache.http.conn.ConnectionPoolTimeoutException: Timeout waiting for connection from pool
>         at
>
> org.apache.flink.fs.s3base.shaded.org.apache.http.impl.conn.PoolingHttpClientConnectionManager.leaseConnection(PoolingHttpClientConnectionManager.java:292)
>         at
>
> org.apache.flink.fs.s3base.shaded.org.apache.http.impl.conn.PoolingHttpClientConnectionManager$1.get(PoolingHttpClientConnectionManager.java:269)
>         at sun.reflect.GeneratedMethodAccessor58.invoke(Unknown Source)
>         at
>
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:498)
>         at
>
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.conn.ClientConnectionRequestFactory$Handler.invoke(ClientConnectionRequestFactory.java:70)
>         at
>
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.conn.$Proxy29.get(Unknown Source)
>         at
>
> org.apache.flink.fs.s3base.shaded.org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:191)
>         at
>
> org.apache.flink.fs.s3base.shaded.org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:185)
>         at
>
> org.apache.flink.fs.s3base.shaded.org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
>         at
>
> org.apache.flink.fs.s3base.shaded.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
>         at
>
> org.apache.flink.fs.s3base.shaded.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56)
>         at
>
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.apache.client.impl.SdkHttpClient.execute(SdkHttpClient.java:72)
>         at
>
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1236)
>         at
>
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1056)
>         ... 25 more
> /
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>