Posted to user@flink.apache.org by Till Rohrmann <tr...@apache.org> on 2020/10/01 09:21:20 UTC

Re: Blobserver dying mid-application

Hi Andreas,

Do the logs of the JM contain any information about the failure?

Theoretically, each task submission to a `TaskExecutor` can trigger a new
connection to the BlobServer. This depends a bit on how large your
TaskInformation is and whether this information is being offloaded to the
BlobServer. What you can definitely try to do is to increase the
blob.fetch.backlog in order to see whether this solves the problem.

How many jobs do you submit to the Flink cluster, and on what timeline?
Maybe you can share a few more details about the application you are
running.

Cheers,
Till

On Wed, Sep 30, 2020 at 11:49 PM Hailu, Andreas <An...@gs.com>
wrote:

> Hello folks, I’m seeing application failures where our Blobserver is
> refusing connections mid application:
>
>
>
> 2020-09-30 13:56:06,227 INFO  [flink-akka.actor.default-dispatcher-18]
> org.apache.flink.runtime.taskexecutor.TaskExecutor            -
> Un-registering task and sending final execution state FINISHED to
> JobManager for task DataSink (TextOutputFormat
> (hdfs:/user/p2epda/lake/delp_prod/PROD/APPROVED/staging/datastore/MandateTradingLine/tmp_pipeline_collapse)
> - UTF-8) 3d1890b47f4398d805cf0c1b54286f71.
>
> 2020-09-30 13:56:06,423 INFO  [flink-akka.actor.default-dispatcher-18]
> org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable      - Free slot
> TaskSlot(index:0, state:ACTIVE, resource profile:
> ResourceProfile{cpuCores=1.7976931348623157E308, heapMemoryInMB=2147483647,
> directMemoryInMB=2147483647, nativeMemoryInMB=2147483647,
> networkMemoryInMB=2147483647, managedMemoryInMB=3046}, allocationId:
> e8be16ec74f16c795d95b89cd08f5c37, jobId: e808de0373bd515224434b7ec1efe249).
>
> 2020-09-30 13:56:06,424 INFO  [flink-akka.actor.default-dispatcher-18]
> org.apache.flink.runtime.taskexecutor.JobLeaderService        - Remove job
> e808de0373bd515224434b7ec1efe249 from job leader monitoring.
>
> 2020-09-30 13:56:06,424 INFO  [flink-akka.actor.default-dispatcher-18]
> org.apache.flink.runtime.taskexecutor.TaskExecutor            - Close
> JobManager connection for job e808de0373bd515224434b7ec1efe249.
>
> 2020-09-30 13:56:06,426 INFO  [flink-akka.actor.default-dispatcher-18]
> org.apache.flink.runtime.taskexecutor.TaskExecutor            - Close
> JobManager connection for job e808de0373bd515224434b7ec1efe249.
>
> 2020-09-30 13:56:06,426 INFO  [flink-akka.actor.default-dispatcher-18]
> org.apache.flink.runtime.taskexecutor.JobLeaderService        - Cannot
> reconnect to job e808de0373bd515224434b7ec1efe249 because it is not
> registered.
>
> 2020-09-30 13:56:09,918 INFO  [CHAIN DataSource (dataset | Read Staging
> From File System | AVRO) -> Map (Map at
> readAvroFileWithFilter(FlinkReadUtils.java:82)) -> Filter (Filter at
> validateData(DAXTask.java:97)) -> FlatMap (FlatMap at
> handleBloomFilter(PreMergeTask.java:187)) -> FlatMap (FlatMap at
> collapsePipelineIfRequired(Task.java:160)) (1/1)]
> org.apache.flink.runtime.blob.BlobClient                      - Downloading
> 48b8ba9f3de039f74483085f90e5ad64/p-0b27cb203799adb76d2a434ed3d64052d832cff3-6915d0cd0fef97f728cd890986b2bf39
> from d43723-430.dc.gs.com/10.48.128.14:46473 (retry 3)
>
> 2020-09-30 13:56:09,920 ERROR [CHAIN DataSource (dataset | Read Staging
> From File System | AVRO) -> Map (Map at
> readAvroFileWithFilter(FlinkReadUtils.java:82)) -> Filter (Filter at
> validateData(DAXTask.java:97)) -> FlatMap (FlatMap at
> handleBloomFilter(PreMergeTask.java:187)) -> FlatMap (FlatMap at
> collapsePipelineIfRequired(Task.java:160)) (1/1)]
> org.apache.flink.runtime.blob.BlobClient                      - Failed to
> fetch BLOB
> 48b8ba9f3de039f74483085f90e5ad64/p-0b27cb203799adb76d2a434ed3d64052d832cff3-6915d0cd0fef97f728cd890986b2bf39
> from d43723-430.dc.gs.com/10.48.128.14:46473 and store it under
> /fs/htmp/yarn/local/usercache/delp_prod/appcache/application_1599723434208_15328880/blobStore-e2888df1-c7be-4ce6-b6b6-58e7c24a140a/incoming/temp-00000004
> Retrying...
>
> 2020-09-30 13:56:09,920 INFO  [CHAIN DataSource (dataset | Read Staging
> From File System | AVRO) -> Map (Map at
> readAvroFileWithFilter(FlinkReadUtils.java:82)) -> Filter (Filter at
> validateData(DAXTask.java:97)) -> FlatMap (FlatMap at
> handleBloomFilter(PreMergeTask.java:187)) -> FlatMap (FlatMap at
> collapsePipelineIfRequired(Task.java:160)) (1/1)]
> org.apache.flink.runtime.blob.BlobClient                      - Downloading
> 48b8ba9f3de039f74483085f90e5ad64/p-0b27cb203799adb76d2a434ed3d64052d832cff3-6915d0cd0fef97f728cd890986b2bf39
> from d43723-430.dc.gs.com/10.48.128.14:46473 (retry 4)
>
> 2020-09-30 13:56:09,922 ERROR [CHAIN DataSource (dataset | Read Staging
> From File System | AVRO) -> Map (Map at
> readAvroFileWithFilter(FlinkReadUtils.java:82)) -> Filter (Filter at
> validateData(DAXTask.java:97)) -> FlatMap (FlatMap at
> handleBloomFilter(PreMergeTask.java:187)) -> FlatMap (FlatMap at
> collapsePipelineIfRequired(Task.java:160)) (1/1)]
> org.apache.flink.runtime.blob.BlobClient                      - Failed to
> fetch BLOB
> 48b8ba9f3de039f74483085f90e5ad64/p-0b27cb203799adb76d2a434ed3d64052d832cff3-6915d0cd0fef97f728cd890986b2bf39
> from d43723-430.dc.gs.com/10.48.128.14:46473 and store it under
> /fs/htmp/yarn/local/usercache/delp_prod/appcache/application_1599723434208_15328880/blobStore-e2888df1-c7be-4ce6-b6b6-58e7c24a140a/incoming/temp-00000004
> Retrying...
>
> 2020-09-30 13:56:09,922 INFO  [CHAIN DataSource (dataset | Read Staging
> From File System | AVRO) -> Map (Map at
> readAvroFileWithFilter(FlinkReadUtils.java:82)) -> Filter (Filter at
> validateData(DAXTask.java:97)) -> FlatMap (FlatMap at
> handleBloomFilter(PreMergeTask.java:187)) -> FlatMap (FlatMap at
> collapsePipelineIfRequired(Task.java:160)) (1/1)]
> org.apache.flink.runtime.blob.BlobClient                      - Downloading
> 48b8ba9f3de039f74483085f90e5ad64/p-0b27cb203799adb76d2a434ed3d64052d832cff3-6915d0cd0fef97f728cd890986b2bf39
> from d43723-430.dc.gs.com/10.48.128.14:46473 (retry 5)
>
> 2020-09-30 13:56:09,925 ERROR [CHAIN DataSource (dataset | Read Staging
> From File System | AVRO) -> Map (Map at
> readAvroFileWithFilter(FlinkReadUtils.java:82)) -> Filter (Filter at
> validateData(DAXTask.java:97)) -> FlatMap (FlatMap at
> handleBloomFilter(PreMergeTask.java:187)) -> FlatMap (FlatMap at
> collapsePipelineIfRequired(Task.java:160)) (1/1)]
> org.apache.flink.runtime.blob.BlobClient                      - Failed to
> fetch BLOB
> 48b8ba9f3de039f74483085f90e5ad64/p-0b27cb203799adb76d2a434ed3d64052d832cff3-6915d0cd0fef97f728cd890986b2bf39
> from d43723-430.dc.gs.com/10.48.128.14:46473 and store it under
> /fs/htmp/yarn/local/usercache/delp_prod/appcache/application_1599723434208_15328880/blobStore-e2888df1-c7be-4ce6-b6b6-58e7c24a140a/incoming/temp-00000004
> No retries left.
>
> java.io.IOException: Could not connect to BlobServer at address
> d43723-430.dc.gs.com/10.48.128.14:46473
>
>                 at
> org.apache.flink.runtime.blob.BlobClient.<init>(BlobClient.java:100)
>
>                 at
> org.apache.flink.runtime.blob.BlobClient.downloadFromBlobServer(BlobClient.java:143)
>
>                 at
> org.apache.flink.runtime.blob.AbstractBlobCache.getFileInternal(AbstractBlobCache.java:181)
>
>                 at
> org.apache.flink.runtime.blob.PermanentBlobCache.getFile(PermanentBlobCache.java:202)
>
>                 at
> org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerTask(BlobLibraryCacheManager.java:120)
>
>                 at
> org.apache.flink.runtime.taskmanager.Task.createUserCodeClassloader(Task.java:915)
>
>                 at
> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:595)
>
>                 at
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>
>                 at java.lang.Thread.run(Thread.java:745)
>
> Caused by: java.net.ConnectException: Connection refused (Connection
> refused)
>
>                 at java.net.PlainSocketImpl.socketConnect(Native Method)
>
>                 at
> java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
>
>                 at
> java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
>
>                 at
> java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
>
>                 at
> java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
>
>                 at java.net.Socket.connect(Socket.java:589)
>
>                 at
> org.apache.flink.runtime.blob.BlobClient.<init>(BlobClient.java:95)
>
>                 ... 8 more
>
>
>
> Prior to the above connection refused error, I don’t see any exceptions or
> failures. We’re running this application with v1.9.2 on YARN, 26 Task
> Managers with 2 cores each, and with the default BLOB server
> configurations. The application itself then has many jobs it submits to the
> cluster. Does this sound like a blob.fetch.backlog/concurrent-connections
> config problem (defaulted to 1000 and 50 respectively)? I wasn’t sure how
> chatty each TM is with the server. How can we tell if it’s either a max
> concurrent-conn or backlog problem?
>
>
>
> Best,
>
> Andreas
>
> ------------------------------
>
> Your Personal Data: We may collect and process information about you that
> may be subject to data protection laws. For more information about how we
> use and disclose your personal data, how we protect your information, our
> legal basis to use your information, your rights and who you can contact,
> please refer to: www.gs.com/privacy-notices
>

Re: Blobserver dying mid-application

Posted by Till Rohrmann <tr...@apache.org>.
Hi Andreas,

Yes, two Flink session clusters won't share the same BlobServer.

Is the problem easily reproducible? If yes, then it could be very helpful
to monitor the backlog length as Chesnay suggested.

One more piece of information is that we create a new TCP connection for
every blob we are downloading from the BlobServer. So if a task needs
multiple jars to run, then every jar is downloaded via its own connection.
However, this should happen sequentially.

Cheers,
Till



On Thu, Oct 1, 2020 at 10:34 PM Hailu, Andreas <An...@gs.com> wrote:

> @Chesnay:
>
> I see. I actually had a separate thread with Robert Metzger a while ago
> regarding connection-related issues we’re plagued with at higher
> parallelisms, and his guidance led us to look into our somaxconn config. We
> increased it from 128 to 1024 in early September. We use the same generic
> JAR for all of our apps, so I don’t think JAR size is the cause. Just so I’m
> clear: when you say Flink session cluster – if we have 2 independent Flink
> applications A & B with JobManagers that just happen to be running on the
> same DataNode, they don’t share Blobservers, right?
>
>
>
> In regard to historical behavior, no, I haven’t seen these Blobserver
> connection problems until after the somaxconn config change. From an app
> perspective, the only way these ones are different is that they’re wide
> rather than deep i.e. large # of jobs to submit instead of a small handful
> of jobs with large amounts of data to process. If we have many jobs to
> submit, as soon as one is complete, we’re trying to submit the next.
>
>
>
> I saw an example today of an application using 10 TaskManagers w/ 2 slots,
> with a total of 194 jobs to submit and at most 20 running in parallel, fail
> with the same error. I’m happy to try increasing both the concurrent
> connections and backlog to 128 and 2048 respectively, but I still can’t
> make sense of how a backlog of 1,000 connections is being met by 10 Task
> Managers/20 connections at worst.
>
>
>
> $ sysctl -a | grep net.core.somaxconn
>
> net.core.somaxconn = 1024
>
>
>
> // ah
>
>
>
> *From:* Chesnay Schepler <ch...@apache.org>
> *Sent:* Thursday, October 1, 2020 1:41 PM
> *To:* Hailu, Andreas [Engineering] <An...@ny.email.gs.com>; Till
> Rohrmann <tr...@apache.org>
> *Cc:* user@flink.apache.org; Nico Kruber <ni...@ververica.com>
> *Subject:* Re: Blobserver dying mid-application
>
>
>
> All jobs running in a Flink session cluster talk to the same blob server.
>
>
>
> The time when tasks are submitted depends on the job; for streaming jobs
> all tasks are deployed when the job starts running; in case of batch jobs
> the submission can be staggered.
>
>
>
> I'm only aware of 2 cases where we transfer data via the blob server;
>
> a) retrieval of jars required for the user code to run  (this is what you
> see in the stack trace)
>
> b) retrieval of TaskInformation, which _should_ only happen if your job is
> quite large, but let's assume it does.
>
>
>
> For a) there should be at most numberOfSlots * numberOfTaskExecutors
> concurrent connections, in the worst case of each slot working on a
> different job, as each would download the jars for their respective job. If
> multiple slots are used for the same job at the same time, then the job jar
> is only retrieved once.
>
>
>
> For b) the limit should also be numberOfSlots * numberOfTaskExecutors; it
> is done once per task, and there are only so many tasks that can run at the
> same time.
>
>
>
> Thus from what I can tell there should be at most 104 (26 task executors *
> 2 slots * 2) concurrent attempts, of which only 54 should land in the
> backlog.
>
>
>
> Did you run into this issue before?
>
> If not, is this application different than your existing applications? Is
> the jar particularly big, or are the jobs particularly short running or
> more complex than others?
>
>
>
> One thing to note is that the backlog relies entirely on OS functionality,
> which can be subject to an upper limit enforced by the OS.
>
> The configured backlog size is just a hint to the OS, but it may ignore
> it; it appears that 128 is not an uncommon upper limit, but maybe there are
> lower settings out there.
>
> You can check this limit via sysctl -a | grep net.core.somaxconn
>
> Maybe this value is set to 0, effectively disabling the backlog?
>
>
>
> It may also be worthwhile to monitor the number of such connections. (netstat
> -ant | grep -c SYN_REC)
>
>
>
> @Nico Do you have any ideas?
>
>
>
> On 10/1/2020 6:26 PM, Hailu, Andreas wrote:
>
> Hi Chesnay, Till, thanks for responding.
>
>
>
> @Chesnay:
>
> Apologies, I said cores when I meant slots ☺ So a total of 26 Task
> managers with 2 slots each for a grand total of 52 parallelism.
>
>
>
> @Till:
>
> For this application, we have a grand total of 78 jobs, with some of them
> demanding more parallelism than others. Each job has multiple operators –
> depending on the size of the data we’re operating on, we could submit 1
> whopper with 52 parallelism, or multiple smaller jobs submitted in parallel
> with a sum of 52 parallelism. When does a task submission to a
> `TaskExecutor` take place? Is that on job submission or something else? I’m
> just curious as a parallelism of 52 seems on the lower side to breach 1K
> connections in the queue, unless interactions with the Blobserver are much
> more frequent than I think. Is it possible that separate Flink jobs share
> the same Blobserver? Because we have thousands of Flink applications
> running concurrently in our YARN cluster.
>
>
>
> // ah
>
>
>
> *From:* Chesnay Schepler <ch...@apache.org>
> *Sent:* Thursday, October 1, 2020 5:42 AM
> *To:* Till Rohrmann <tr...@apache.org>; Hailu, Andreas [Engineering]
> <An...@ny.email.gs.com>
> *Cc:* user@flink.apache.org
> *Subject:* Re: Blobserver dying mid-application
>
>
>
> It would also be good to know how many slots you have on each task
> executor.
>
>
>
> On 10/1/2020 11:21 AM, Till Rohrmann wrote:
>
> Hi Andreas,
>
>
>
> do the logs of the JM contain any information?
>
>
>
> Theoretically, each task submission to a `TaskExecutor` can trigger a new
> connection to the BlobServer. This depends a bit on how large your
> TaskInformation is and whether this information is being offloaded to the
> BlobServer. What you can definitely try to do is to increase the
> blob.fetch.backlog in order to see whether this solves the problem.
>
>
>
> How many jobs and in with what timeline do you submit them to the Flink
> cluster? Maybe you can share a bit more details about the application you
> are running.
>
>
>
> Cheers,
>
> Till
>
>
>
> On Wed, Sep 30, 2020 at 11:49 PM Hailu, Andreas <An...@gs.com>
> wrote:
>
> Hello folks, I’m seeing application failures where our Blobserver is
> refusing connections mid application:
>
>
>
> 2020-09-30 13:56:06,227 INFO  [flink-akka.actor.default-dispatcher-18]
> org.apache.flink.runtime.taskexecutor.TaskExecutor            -
> Un-registering task and sending final execution state FINISHED to
> JobManager for task DataSink (TextOutputFormat
> (hdfs:/user/p2epda/lake/delp_prod/PROD/APPROVED/staging/datastore/MandateTradingLine/tmp_pipeline_collapse)
> - UTF-8) 3d1890b47f4398d805cf0c1b54286f71.
>
> 2020-09-30 13:56:06,423 INFO  [flink-akka.actor.default-dispatcher-18]
> org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable      - Free slot
> TaskSlot(index:0, state:ACTIVE, resource profile:
> ResourceProfile{cpuCores=1.7976931348623157E308, heapMemoryInMB=2147483647,
> directMemoryInMB=2147483647, nativeMemoryInMB=2147483647,
> networkMemoryInMB=2147483647, managedMemoryInMB=3046}, allocationId:
> e8be16ec74f16c795d95b89cd08f5c37, jobId: e808de0373bd515224434b7ec1efe249).
>
> 2020-09-30 13:56:06,424 INFO  [flink-akka.actor.default-dispatcher-18]
> org.apache.flink.runtime.taskexecutor.JobLeaderService        - Remove job
> e808de0373bd515224434b7ec1efe249 from job leader monitoring.
>
> 2020-09-30 13:56:06,424 INFO  [flink-akka.actor.default-dispatcher-18]
> org.apache.flink.runtime.taskexecutor.TaskExecutor            - Close
> JobManager connection for job e808de0373bd515224434b7ec1efe249.
>
> 2020-09-30 13:56:06,426 INFO  [flink-akka.actor.default-dispatcher-18]
> org.apache.flink.runtime.taskexecutor.TaskExecutor            - Close
> JobManager connection for job e808de0373bd515224434b7ec1efe249.
>
> 2020-09-30 13:56:06,426 INFO  [flink-akka.actor.default-dispatcher-18]
> org.apache.flink.runtime.taskexecutor.JobLeaderService        - Cannot
> reconnect to job e808de0373bd515224434b7ec1efe249 because it is not
> registered.
>
> 2020-09-30 13:56:09,918 INFO  [CHAIN DataSource (dataset | Read Staging
> From File System | AVRO) -> Map (Map at
> readAvroFileWithFilter(FlinkReadUtils.java:82)) -> Filter (Filter at
> validateData(DAXTask.java:97)) -> FlatMap (FlatMap at
> handleBloomFilter(PreMergeTask.java:187)) -> FlatMap (FlatMap at
> collapsePipelineIfRequired(Task.java:160)) (1/1)]
> org.apache.flink.runtime.blob.BlobClient                      - Downloading
> 48b8ba9f3de039f74483085f90e5ad64/p-0b27cb203799adb76d2a434ed3d64052d832cff3-6915d0cd0fef97f728cd890986b2bf39
> from d43723-430.dc.gs.com/10.48.128.14:46473 (retry 3)
>
> 2020-09-30 13:56:09,920 ERROR [CHAIN DataSource (dataset | Read Staging
> From File System | AVRO) -> Map (Map at
> readAvroFileWithFilter(FlinkReadUtils.java:82)) -> Filter (Filter at
> validateData(DAXTask.java:97)) -> FlatMap (FlatMap at
> handleBloomFilter(PreMergeTask.java:187)) -> FlatMap (FlatMap at
> collapsePipelineIfRequired(Task.java:160)) (1/1)]
> org.apache.flink.runtime.blob.BlobClient                      - Failed to
> fetch BLOB
> 48b8ba9f3de039f74483085f90e5ad64/p-0b27cb203799adb76d2a434ed3d64052d832cff3-6915d0cd0fef97f728cd890986b2bf39
> from d43723-430.dc.gs.com/10.48.128.14:46473 and store it under
> /fs/htmp/yarn/local/usercache/delp_prod/appcache/application_1599723434208_15328880/blobStore-e2888df1-c7be-4ce6-b6b6-58e7c24a140a/incoming/temp-00000004
> Retrying...
>
> 2020-09-30 13:56:09,920 INFO  [CHAIN DataSource (dataset | Read Staging
> From File System | AVRO) -> Map (Map at
> readAvroFileWithFilter(FlinkReadUtils.java:82)) -> Filter (Filter at
> validateData(DAXTask.java:97)) -> FlatMap (FlatMap at
> handleBloomFilter(PreMergeTask.java:187)) -> FlatMap (FlatMap at
> collapsePipelineIfRequired(Task.java:160)) (1/1)]
> org.apache.flink.runtime.blob.BlobClient                      - Downloading
> 48b8ba9f3de039f74483085f90e5ad64/p-0b27cb203799adb76d2a434ed3d64052d832cff3-6915d0cd0fef97f728cd890986b2bf39
> from d43723-430.dc.gs.com/10.48.128.14:46473 (retry 4)
>
> 2020-09-30 13:56:09,922 ERROR [CHAIN DataSource (dataset | Read Staging
> From File System | AVRO) -> Map (Map at
> readAvroFileWithFilter(FlinkReadUtils.java:82)) -> Filter (Filter at
> validateData(DAXTask.java:97)) -> FlatMap (FlatMap at
> handleBloomFilter(PreMergeTask.java:187)) -> FlatMap (FlatMap at
> collapsePipelineIfRequired(Task.java:160)) (1/1)]
> org.apache.flink.runtime.blob.BlobClient                      - Failed to
> fetch BLOB
> 48b8ba9f3de039f74483085f90e5ad64/p-0b27cb203799adb76d2a434ed3d64052d832cff3-6915d0cd0fef97f728cd890986b2bf39
> from d43723-430.dc.gs.com/10.48.128.14:46473 and store it under
> /fs/htmp/yarn/local/usercache/delp_prod/appcache/application_1599723434208_15328880/blobStore-e2888df1-c7be-4ce6-b6b6-58e7c24a140a/incoming/temp-00000004
> Retrying...
>
> 2020-09-30 13:56:09,922 INFO  [CHAIN DataSource (dataset | Read Staging
> From File System | AVRO) -> Map (Map at
> readAvroFileWithFilter(FlinkReadUtils.java:82)) -> Filter (Filter at
> validateData(DAXTask.java:97)) -> FlatMap (FlatMap at
> handleBloomFilter(PreMergeTask.java:187)) -> FlatMap (FlatMap at
> collapsePipelineIfRequired(Task.java:160)) (1/1)]
> org.apache.flink.runtime.blob.BlobClient                      - Downloading
> 48b8ba9f3de039f74483085f90e5ad64/p-0b27cb203799adb76d2a434ed3d64052d832cff3-6915d0cd0fef97f728cd890986b2bf39
> from d43723-430.dc.gs.com/10.48.128.14:46473 (retry 5)
>
> 2020-09-30 13:56:09,925 ERROR [CHAIN DataSource (dataset | Read Staging
> From File System | AVRO) -> Map (Map at
> readAvroFileWithFilter(FlinkReadUtils.java:82)) -> Filter (Filter at
> validateData(DAXTask.java:97)) -> FlatMap (FlatMap at
> handleBloomFilter(PreMergeTask.java:187)) -> FlatMap (FlatMap at
> collapsePipelineIfRequired(Task.java:160)) (1/1)]
> org.apache.flink.runtime.blob.BlobClient                      - Failed to
> fetch BLOB
> 48b8ba9f3de039f74483085f90e5ad64/p-0b27cb203799adb76d2a434ed3d64052d832cff3-6915d0cd0fef97f728cd890986b2bf39
> from d43723-430.dc.gs.com/10.48.128.14:46473 and store it under
> /fs/htmp/yarn/local/usercache/delp_prod/appcache/application_1599723434208_15328880/blobStore-e2888df1-c7be-4ce6-b6b6-58e7c24a140a/incoming/temp-00000004
> No retries left.
>
> java.io.IOException: Could not connect to BlobServer at address
> d43723-430.dc.gs.com/10.48.128.14:46473
>
>                 at
> org.apache.flink.runtime.blob.BlobClient.<init>(BlobClient.java:100)
>
>                 at
> org.apache.flink.runtime.blob.BlobClient.downloadFromBlobServer(BlobClient.java:143)
>
>                 at
> org.apache.flink.runtime.blob.AbstractBlobCache.getFileInternal(AbstractBlobCache.java:181)
>
>                 at
> org.apache.flink.runtime.blob.PermanentBlobCache.getFile(PermanentBlobCache.java:202)
>
>                 at
> org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerTask(BlobLibraryCacheManager.java:120)
>
>                 at
> org.apache.flink.runtime.taskmanager.Task.createUserCodeClassloader(Task.java:915)
>
>                 at
> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:595)
>
>                 at
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>
>                 at java.lang.Thread.run(Thread.java:745)
>
> Caused by: java.net.ConnectException: Connection refused (Connection
> refused)
>
>                 at java.net.PlainSocketImpl.socketConnect(Native Method)
>
>                 at
> java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
>
>                 at
> java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
>
>                 at
> java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
>
>                 at
> java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
>
>                 at java.net.Socket.connect(Socket.java:589)
>
>                 at
> org.apache.flink.runtime.blob.BlobClient.<init>(BlobClient.java:95)
>
>                 ... 8 more
>
>
>
> Prior to the above connection refused error, I don’t see any exceptions or
> failures. We’re running this application with v1.9.2 on YARN, 26 Task
> Managers with 2 cores each, and with the default BLOB server
> configurations. The application itself then has many jobs it submits to the
> cluster. Does this sound like a blob.fetch.backlog/concurrent-connections
> config problem (defaulted to 1000 and 50 respectively)? I wasn’t sure how
> chatty each TM is with the server. How can we tell if it’s either a max
> concurrent-conn or backlog problem?
>
>
>
> Best,
>
> Andreas
>

RE: Blobserver dying mid-application

Posted by "Hailu, Andreas" <An...@gs.com>.
@Chesnay:
I see. I actually had a separate thread with Robert Metzger a while ago regarding connection-related issues we’re plagued with at higher parallelisms, and his guidance led us to look into our somaxconn config. We increased it from 128 to 1024 in early September. We use the same generic JAR for all of our apps, so I don’t think JAR size is the cause. Just so I’m clear: when you say Flink session cluster – if we have 2 independent Flink applications A & B with JobManagers that just happen to be running on the same DataNode, they don’t share Blobservers, right?

In regard to historical behavior, no, I haven’t seen these Blobserver connection problems until after the somaxconn config change. From an app perspective, the only way these ones are different is that they’re wide rather than deep i.e. large # of jobs to submit instead of a small handful of jobs with large amounts of data to process. If we have many jobs to submit, as soon as one is complete, we’re trying to submit the next.

I saw an example today of an application using 10 TaskManagers w/ 2 slots, with a total of 194 jobs to submit and at most 20 running in parallel, fail with the same error. I’m happy to try increasing both the concurrent connections and backlog to 128 and 2048 respectively, but I still can’t make sense of how a backlog of 1,000 connections is being met by 10 Task Managers/20 connections at worst.
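
For reference, the change proposed here might look like this in flink-conf.yaml. This is only a sketch: it assumes the standard option names blob.fetch.num-concurrent and blob.fetch.backlog (the two settings whose defaults of 50 and 1000 are quoted in this thread), and the values are the ones proposed above, not tested recommendations.

```yaml
# flink-conf.yaml (sketch; values taken from the proposal above)
blob.fetch.num-concurrent: 128   # max concurrent BLOB fetches served (default 50)
blob.fetch.backlog: 2048         # requested accept backlog for the BlobServer socket (default 1000)
```

As Chesnay notes below, the backlog is only a hint to the OS, so with net.core.somaxconn at 1024 a requested backlog of 2048 would probably not apply in full.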

$ sysctl -a | grep net.core.somaxconn
net.core.somaxconn = 1024

// ah

From: Chesnay Schepler <ch...@apache.org>
Sent: Thursday, October 1, 2020 1:41 PM
To: Hailu, Andreas [Engineering] <An...@ny.email.gs.com>; Till Rohrmann <tr...@apache.org>
Cc: user@flink.apache.org; Nico Kruber <ni...@ververica.com>
Subject: Re: Blobserver dying mid-application

All jobs running in a Flink session cluster talk to the same blob server.

The time when tasks are submitted depends on the job; for streaming jobs all tasks are deployed when the job starts running; in case of batch jobs the submission can be staggered.

I'm only aware of 2 cases where we transfer data via the blob server;
a) retrieval of jars required for the user code to run  (this is what you see in the stack trace)
b) retrieval of TaskInformation, which _should_ only happen if your job is quite large, but let's assume it does.

For a) there should be at most numberOfSlots * numberOfTaskExecutors concurrent connections, in the worst case of each slot working on a different job, as each would download the jars for their respective job. If multiple slots are used for the same job at the same time, then the job jar is only retrieved once.

For b) the limit should also be numberOfSlots * numberOfTaskExecutors; it is done once per task, and there are only so many tasks that can run at the same time.

Thus from what I can tell there should be at most 104 (26 task executors * 2 slots * 2) concurrent attempts, of which only 54 should land in the backlog.
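
The bound above can be reproduced with a few lines of Python. This is just a worked version of the arithmetic in this thread; the function name and its parameters are made up for illustration, and the 50 is the default concurrent-connection limit quoted in the original question.

```python
def blob_connection_bound(task_executors, slots_per_executor,
                          transfer_kinds=2, max_concurrent=50):
    """Worst-case concurrent BlobServer connection attempts, and how many
    of them would have to wait in the backlog.

    transfer_kinds=2 covers the two cases listed above (jars and
    TaskInformation); max_concurrent=50 is the default limit quoted in
    this thread. Both are assumptions of this sketch.
    """
    attempts = task_executors * slots_per_executor * transfer_kinds
    queued = max(0, attempts - max_concurrent)
    return attempts, queued


print(blob_connection_bound(26, 2))  # (104, 54)
print(blob_connection_bound(10, 2))  # (40, 0)
```

The second call corresponds to the 10-TaskManager application described at the top of this message: by this estimate it would not even exceed the concurrent-connection limit, which is why a full backlog is hard to explain from these numbers alone.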

Did you run into this issue before?
If not, is this application different than your existing applications? Is the jar particularly big, or are the jobs particularly short running or more complex than others?

One thing to note is that the backlog relies entirely on OS functionality, which can be subject to an upper limit enforced by the OS.
The configured backlog size is just a hint to the OS, but it may ignore it; it appears that 128 is not an uncommon upper limit, but maybe there are lower settings out there.
You can check this limit via sysctl -a | grep net.core.somaxconn
Maybe this value is set to 0, effectively disabling the backlog?
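
On Linux, a backlog passed to listen() that exceeds net.core.somaxconn is silently capped to that value, so the backlog that actually applies is the smaller of the two. A tiny sketch of that rule follows; the function name is hypothetical, and how connections beyond the cap are treated differs between kernels and settings.

```python
def effective_backlog(configured_backlog, somaxconn):
    """Backlog the kernel will honor under the Linux rule that listen()
    caps the requested value at net.core.somaxconn."""
    return min(configured_backlog, somaxconn)


print(effective_backlog(1000, 128))   # 128
print(effective_backlog(1000, 1024))  # 1000
print(effective_backlog(2048, 1024))  # 1024
```

With somaxconn at 128, the default backlog of 1000 would be cut to 128; with somaxconn at 1024 it applies in full, while a configured 2048 would be capped at 1024.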

It may also be worthwhile to monitor the number of such connections. (netstat -ant | grep -c SYN_REC)
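
To track that number over time rather than from a one-off grep, the count could be taken programmatically. A minimal sketch, assuming the Linux `netstat -ant` layout in which the connection state is the last column and half-open connections are reported as SYN_RECV (which the SYN_REC pattern above also matches); the function name is made up for illustration.

```python
def count_sockets_in_state(netstat_output, state="SYN_RECV"):
    """Count TCP lines of `netstat -ant` output whose state column equals `state`.

    Assumes the Linux netstat layout, where the state is the last
    whitespace-separated field of each TCP line.
    """
    count = 0
    for line in netstat_output.splitlines():
        fields = line.split()
        if fields and fields[0].startswith("tcp") and fields[-1] == state:
            count += 1
    return count
```

The input can come from subprocess.run(["netstat", "-ant"], capture_output=True, text=True).stdout, sampled every second or so on the JobManager host while the jobs are being submitted.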

@Nico Do you have any ideas?

On 10/1/2020 6:26 PM, Hailu, Andreas wrote:
Hi Chesnay, Till, thanks for responding.

@Chesnay:
Apologies, I said cores when I meant slots ☺ So a total of 26 Task managers with 2 slots each for a grand total of 52 parallelism.

@Till:
For this application, we have a grand total of 78 jobs, with some of them demanding more parallelism than others. Each job has multiple operators – depending on the size of the data we’re operating on, we could submit 1 whopper with 52 parallelism, or multiple smaller jobs submitted in parallel with a sum of 52 parallelism. When does a task submission to a `TaskExecutor` take place? Is that on job submission or something else? I’m just curious as a parallelism of 52 seems on the lower side to breach 1K connections in the queue, unless interactions with the Blobserver are much more frequent than I think. Is it possible that separate Flink jobs share the same Blobserver? Because we have thousands of Flink applications running concurrently in our YARN cluster.

// ah

From: Chesnay Schepler <ch...@apache.org>
Sent: Thursday, October 1, 2020 5:42 AM
To: Till Rohrmann <tr...@apache.org>; Hailu, Andreas [Engineering] <An...@ny.email.gs.com>
Cc: user@flink.apache.org
Subject: Re: Blobserver dying mid-application

It would also be good to know how many slots you have on each task executor.

On 10/1/2020 11:21 AM, Till Rohrmann wrote:
Hi Andreas,

do the logs of the JM contain any information?

Theoretically, each task submission to a `TaskExecutor` can trigger a new connection to the BlobServer. This depends a bit on how large your TaskInformation is and whether this information is being offloaded to the BlobServer. What you can definitely try to do is to increase the blob.fetch.backlog in order to see whether this solves the problem.

How many jobs and in with what timeline do you submit them to the Flink cluster? Maybe you can share a bit more details about the application you are running.

Cheers,
Till

On Wed, Sep 30, 2020 at 11:49 PM Hailu, Andreas <An...@gs.com> wrote:
Hello folks, I’m seeing application failures where our Blobserver is refusing connections mid application:

2020-09-30 13:56:06,227 INFO  [flink-akka.actor.default-dispatcher-18] org.apache.flink.runtime.taskexecutor.TaskExecutor            - Un-registering task and sending final execution state FINISHED to JobManager for task DataSink (TextOutputFormat (hdfs:/user/p2epda/lake/delp_prod/PROD/APPROVED/staging/datastore/MandateTradingLine/tmp_pipeline_collapse) - UTF-8) 3d1890b47f4398d805cf0c1b54286f71.
2020-09-30 13:56:06,423 INFO  [flink-akka.actor.default-dispatcher-18] org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable      - Free slot TaskSlot(index:0, state:ACTIVE, resource profile: ResourceProfile{cpuCores=1.7976931348623157E308, heapMemoryInMB=2147483647, directMemoryInMB=2147483647, nativeMemoryInMB=2147483647, networkMemoryInMB=2147483647, managedMemoryInMB=3046}, allocationId: e8be16ec74f16c795d95b89cd08f5c37, jobId: e808de0373bd515224434b7ec1efe249).
2020-09-30 13:56:06,424 INFO  [flink-akka.actor.default-dispatcher-18] org.apache.flink.runtime.taskexecutor.JobLeaderService        - Remove job e808de0373bd515224434b7ec1efe249 from job leader monitoring.
2020-09-30 13:56:06,424 INFO  [flink-akka.actor.default-dispatcher-18] org.apache.flink.runtime.taskexecutor.TaskExecutor            - Close JobManager connection for job e808de0373bd515224434b7ec1efe249.
2020-09-30 13:56:06,426 INFO  [flink-akka.actor.default-dispatcher-18] org.apache.flink.runtime.taskexecutor.TaskExecutor            - Close JobManager connection for job e808de0373bd515224434b7ec1efe249.
2020-09-30 13:56:06,426 INFO  [flink-akka.actor.default-dispatcher-18] org.apache.flink.runtime.taskexecutor.JobLeaderService        - Cannot reconnect to job e808de0373bd515224434b7ec1efe249 because it is not registered.
2020-09-30 13:56:09,918 INFO  [CHAIN DataSource (dataset | Read Staging From File System | AVRO) -> Map (Map at readAvroFileWithFilter(FlinkReadUtils.java:82)) -> Filter (Filter at validateData(DAXTask.java:97)) -> FlatMap (FlatMap at handleBloomFilter(PreMergeTask.java:187)) -> FlatMap (FlatMap at collapsePipelineIfRequired(Task.java:160)) (1/1)] org.apache.flink.runtime.blob.BlobClient                      - Downloading 48b8ba9f3de039f74483085f90e5ad64/p-0b27cb203799adb76d2a434ed3d64052d832cff3-6915d0cd0fef97f728cd890986b2bf39 from d43723-430.dc.gs.com/10.48.128.14:46473 (retry 3)
2020-09-30 13:56:09,920 ERROR [CHAIN DataSource (dataset | Read Staging From File System | AVRO) -> Map (Map at readAvroFileWithFilter(FlinkReadUtils.java:82)) -> Filter (Filter at validateData(DAXTask.java:97)) -> FlatMap (FlatMap at handleBloomFilter(PreMergeTask.java:187)) -> FlatMap (FlatMap at collapsePipelineIfRequired(Task.java:160)) (1/1)] org.apache.flink.runtime.blob.BlobClient                      - Failed to fetch BLOB 48b8ba9f3de039f74483085f90e5ad64/p-0b27cb203799adb76d2a434ed3d64052d832cff3-6915d0cd0fef97f728cd890986b2bf39 from d43723-430.dc.gs.com/10.48.128.14:46473 and store it under /fs/htmp/yarn/local/usercache/delp_prod/appcache/application_1599723434208_15328880/blobStore-e2888df1-c7be-4ce6-b6b6-58e7c24a140a/incoming/temp-00000004 Retrying...
2020-09-30 13:56:09,920 INFO  [CHAIN DataSource (dataset | Read Staging From File System | AVRO) -> Map (Map at readAvroFileWithFilter(FlinkReadUtils.java:82)) -> Filter (Filter at validateData(DAXTask.java:97)) -> FlatMap (FlatMap at handleBloomFilter(PreMergeTask.java:187)) -> FlatMap (FlatMap at collapsePipelineIfRequired(Task.java:160)) (1/1)] org.apache.flink.runtime.blob.BlobClient                      - Downloading 48b8ba9f3de039f74483085f90e5ad64/p-0b27cb203799adb76d2a434ed3d64052d832cff3-6915d0cd0fef97f728cd890986b2bf39 from d43723-430.dc.gs.com/10.48.128.14:46473 (retry 4)
2020-09-30 13:56:09,922 ERROR [CHAIN DataSource (dataset | Read Staging From File System | AVRO) -> Map (Map at readAvroFileWithFilter(FlinkReadUtils.java:82)) -> Filter (Filter at validateData(DAXTask.java:97)) -> FlatMap (FlatMap at handleBloomFilter(PreMergeTask.java:187)) -> FlatMap (FlatMap at collapsePipelineIfRequired(Task.java:160)) (1/1)] org.apache.flink.runtime.blob.BlobClient                      - Failed to fetch BLOB 48b8ba9f3de039f74483085f90e5ad64/p-0b27cb203799adb76d2a434ed3d64052d832cff3-6915d0cd0fef97f728cd890986b2bf39 from d43723-430.dc.gs.com/10.48.128.14:46473 and store it under /fs/htmp/yarn/local/usercache/delp_prod/appcache/application_1599723434208_15328880/blobStore-e2888df1-c7be-4ce6-b6b6-58e7c24a140a/incoming/temp-00000004 Retrying...
2020-09-30 13:56:09,922 INFO  [CHAIN DataSource (dataset | Read Staging From File System | AVRO) -> Map (Map at readAvroFileWithFilter(FlinkReadUtils.java:82)) -> Filter (Filter at validateData(DAXTask.java:97)) -> FlatMap (FlatMap at handleBloomFilter(PreMergeTask.java:187)) -> FlatMap (FlatMap at collapsePipelineIfRequired(Task.java:160)) (1/1)] org.apache.flink.runtime.blob.BlobClient                      - Downloading 48b8ba9f3de039f74483085f90e5ad64/p-0b27cb203799adb76d2a434ed3d64052d832cff3-6915d0cd0fef97f728cd890986b2bf39 from d43723-430.dc.gs.com/10.48.128.14:46473 (retry 5)
2020-09-30 13:56:09,925 ERROR [CHAIN DataSource (dataset | Read Staging From File System | AVRO) -> Map (Map at readAvroFileWithFilter(FlinkReadUtils.java:82)) -> Filter (Filter at validateData(DAXTask.java:97)) -> FlatMap (FlatMap at handleBloomFilter(PreMergeTask.java:187)) -> FlatMap (FlatMap at collapsePipelineIfRequired(Task.java:160)) (1/1)] org.apache.flink.runtime.blob.BlobClient                      - Failed to fetch BLOB 48b8ba9f3de039f74483085f90e5ad64/p-0b27cb203799adb76d2a434ed3d64052d832cff3-6915d0cd0fef97f728cd890986b2bf39 from d43723-430.dc.gs.com/10.48.128.14:46473 and store it under /fs/htmp/yarn/local/usercache/delp_prod/appcache/application_1599723434208_15328880/blobStore-e2888df1-c7be-4ce6-b6b6-58e7c24a140a/incoming/temp-00000004 No retries left.
java.io.IOException: Could not connect to BlobServer at address d43723-430.dc.gs.com/10.48.128.14:46473
                at org.apache.flink.runtime.blob.BlobClient.<init>(BlobClient.java:100)
                at org.apache.flink.runtime.blob.BlobClient.downloadFromBlobServer(BlobClient.java:143)
                at org.apache.flink.runtime.blob.AbstractBlobCache.getFileInternal(AbstractBlobCache.java:181)
                at org.apache.flink.runtime.blob.PermanentBlobCache.getFile(PermanentBlobCache.java:202)
                at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerTask(BlobLibraryCacheManager.java:120)
                at org.apache.flink.runtime.taskmanager.Task.createUserCodeClassloader(Task.java:915)
                at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:595)
                at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
                at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.ConnectException: Connection refused (Connection refused)
                at java.net.PlainSocketImpl.socketConnect(Native Method)
                at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
                at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
                at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
                at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
                at java.net.Socket.connect(Socket.java:589)
                at org.apache.flink.runtime.blob.BlobClient.<init>(BlobClient.java:95)
                ... 8 more

Prior to the above connection refused error, I don’t see any exceptions or failures. We’re running this application with v1.9.2 on YARN, 26 Task Managers with 2 cores each, and with the default BLOB server configurations. The application itself then has many jobs it submits to the cluster. Does this sound like a blob.fetch.backlog/concurrent-connections config problem (defaulted to 1000 and 50 respectively)? I wasn’t sure how chatty each TM is with the server. How can we tell if it’s either a max concurrent-conn or backlog problem?

Best,
Andreas

________________________________

Your Personal Data: We may collect and process information about you that may be subject to data protection laws. For more information about how we use and disclose your personal data, how we protect your information, our legal basis to use your information, your rights and who you can contact, please refer to: www.gs.com/privacy-notices




Re: Blobserver dying mid-application

Posted by Chesnay Schepler <ch...@apache.org>.
All jobs running in a Flink session cluster talk to the same blob server.

The time when tasks are submitted depends on the job; for streaming jobs 
all tasks are deployed when the job starts running; in case of batch 
jobs the submission can be staggered.

I'm only aware of 2 cases where we transfer data via the blob server;
a) retrieval of jars required for the user code to run  (this is what 
you see in the stack trace)
b) retrieval of TaskInformation, which _should_ only happen if your job 
is quite large, but let's assume it does.

For a) there should be at most numberOfSlots * numberOfTaskExecutors 
concurrent connections, in the worst case of each slot working on a 
different job, as each would download the jars for their respective job. 
If multiple slots are used for the same job at the same time, then the 
job jar is only retrieved once.

For b) the limit should also be numberOfSlots * numberOfTaskExecutors; 
it is done once per task, and there are only so many tasks that can run 
at the same time.

Thus from what I can tell there should be at most 104 (26 task executors 
* 2 slots * 2) concurrent attempts, of which only 54 should land in the 
backlog.
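
As a rough check of the arithmetic above, here is a minimal Python sketch. It assumes the worst case described in a) and b), i.e. one jar download plus one TaskInformation download per slot, and it takes 50 as the concurrent-connection limit (the default quoted earlier in this thread). The function and parameter names are made up for illustration and are not Flink APIs.

```python
def blob_connection_estimate(task_executors, slots_per_executor,
                             transfers_per_slot=2, max_concurrent=50):
    """Worst-case estimate of simultaneous BlobServer connection attempts.

    transfers_per_slot=2 models one jar download (case a) plus one
    TaskInformation download (case b) per slot. max_concurrent is assumed
    to be the concurrent-connections default of 50 mentioned in this thread.
    """
    attempts = task_executors * slots_per_executor * transfers_per_slot
    served = min(attempts, max_concurrent)
    queued = attempts - served  # these would wait in the OS-level backlog
    return attempts, served, queued


attempts, served, queued = blob_connection_estimate(26, 2)
print(attempts, served, queued)  # 104 50 54
```

With 26 task executors and 2 slots each this gives 104 attempts, 50 served at once and 54 queued, far below a backlog of 1000.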

Did you run into this issue before?
If not, is this application different from your existing applications? 
Is the jar particularly big, or are the jobs particularly short-running 
or more complex than the others?

One thing to note is that the backlog relies entirely on OS 
functionality, which can be subject to an upper limit enforced by the OS.
The configured backlog size is just a hint to the OS, but it may ignore 
it; it appears that 128 is not an uncommon upper limit, but maybe there 
are lower settings out there.
You can check this limit via sysctl -a | grep net.core.somaxconn
Maybe this value is set to 0, effectively disabling the backlog?
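
To illustrate the OS cap: on Linux, a backlog passed to listen() that is larger than net.core.somaxconn is silently truncated to that value. The Python sketch below applies that rule. The helper names are hypothetical, and reading /proc/sys/net/core/somaxconn only works on Linux.

```python
from pathlib import Path

SOMAXCONN_PATH = Path("/proc/sys/net/core/somaxconn")  # Linux only


def read_somaxconn(path=SOMAXCONN_PATH):
    """Return the kernel's listen-backlog cap, or None if it cannot be read."""
    try:
        return int(path.read_text().strip())
    except (OSError, ValueError):
        return None


def effective_backlog(configured, somaxconn):
    """Backlog the kernel would actually use for a listening socket."""
    if somaxconn is None:
        return configured  # cap unknown; assume the request is honoured
    return min(configured, somaxconn)


print(effective_backlog(1000, 128))  # 128
print(effective_backlog(1000, read_somaxconn()))
```

So a configured backlog of 1000 on a host with somaxconn at 128 would behave like a backlog of 128.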

It may also be worthwhile to monitor the number of such connections 
(netstat -ant | grep -c SYN_REC).
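
If you want to track this count per port over time rather than eyeball it, a small Python sketch along these lines could parse `netstat -ant` style output. It assumes the usual Linux column layout (Proto, Recv-Q, Send-Q, Local Address, Foreign Address, State) and the state name SYN_RECV, which the grep pattern above matches as a substring. The function name and the sample text are made up for illustration.

```python
def count_syn_recv(netstat_output, local_port=None):
    """Count sockets in SYN_RECV state in `netstat -ant` style output.

    If local_port is given, only sockets whose local address ends with
    that port are counted.
    """
    count = 0
    for line in netstat_output.splitlines():
        fields = line.split()
        # Expected columns: Proto Recv-Q Send-Q Local Foreign State
        if len(fields) < 6 or not fields[0].startswith("tcp"):
            continue
        local_addr, state = fields[3], fields[5]
        if state != "SYN_RECV":
            continue
        if local_port is not None and not local_addr.endswith(f":{local_port}"):
            continue
        count += 1
    return count


# Made-up sample; the foreign addresses and port 8081 are placeholders.
sample = """\
Proto Recv-Q Send-Q Local Address           Foreign Address         State
tcp        0      0 10.48.128.14:46473      10.0.0.7:51234          SYN_RECV
tcp        0      0 10.48.128.14:46473      10.0.0.8:51301          ESTABLISHED
tcp        0      0 10.48.128.14:8081       10.0.0.9:40022          SYN_RECV
"""
print(count_syn_recv(sample))         # 2
print(count_syn_recv(sample, 46473))  # 1
```

On the JobManager host the input could come from subprocess.run(["netstat", "-ant"], capture_output=True, text=True).stdout, filtered to the BlobServer port.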

@Nico Do you have any ideas?

On 10/1/2020 6:26 PM, Hailu, Andreas wrote:
>
> Hi Chesnay, Till, thanks for responding.
>
> @Chesnay:
>
> Apologies, I said cores when I meant slots ☺ So a total of 26 Task 
> managers with 2 slots each for a grand total of 52 parallelism.
>
> @Till:
>
> For this application, we have a grand total of 78 jobs, with some of 
> them demanding more parallelism than others. Each job has multiple 
> operators – depending on the size of the data we’re operating on, we 
> could submit 1 whopper with 52 parallelism, or multiple smaller jobs 
> submitted in parallel with a sum of 52 parallelism. When does a task 
> submission to a `TaskExecutor` take place? Is that on job submission 
> or something else? I’m just curious as a parallelism of 52 seems on 
> the lower side to breach 1K connections in the queue, unless 
> interactions with the Blobserver are much more frequent than I think. 
> Is it possible that separate Flink jobs share the same Blobserver? 
> Because we have thousands of Flink applications running concurrently 
> in our YARN cluster.
>
> // ah
>
> From: Chesnay Schepler <ch...@apache.org>
> Sent: Thursday, October 1, 2020 5:42 AM
> To: Till Rohrmann <tr...@apache.org>; Hailu, Andreas 
> [Engineering] <An...@ny.email.gs.com>
> Cc: user@flink.apache.org
> Subject: Re: Blobserver dying mid-application
>
> It would also be good to know how many slots you have on each task 
> executor.
>
> On 10/1/2020 11:21 AM, Till Rohrmann wrote:
>
>     Hi Andreas,
>
>     do the logs of the JM contain any information?
>
>     Theoretically, each task submission to a `TaskExecutor` can
>     trigger a new connection to the BlobServer. This depends a bit on
>     how large your TaskInformation is and whether this information is
>     being offloaded to the BlobServer. What you can definitely try to
>     do is to increase the blob.fetch.backlog in order to see whether
>     this solves the problem.
>
>     How many jobs do you submit to the Flink cluster, and on what
>     timeline? Maybe you can share a bit more detail about the
>     application you are running.
>
>     Cheers,
>
>     Till
>
>     On Wed, Sep 30, 2020 at 11:49 PM Hailu, Andreas
>     <Andreas.Hailu@gs.com> wrote:
>
>         Hello folks, I’m seeing application failures where our
>         Blobserver is refusing connections mid application:
>
>         2020-09-30 13:56:06,227 INFO
>         [flink-akka.actor.default-dispatcher-18]
>         org.apache.flink.runtime.taskexecutor.TaskExecutor -
>         Un-registering task and sending final execution state FINISHED
>         to JobManager for task DataSink (TextOutputFormat
>         (hdfs:/user/p2epda/lake/delp_prod/PROD/APPROVED/staging/datastore/MandateTradingLine/tmp_pipeline_collapse)
>         - UTF-8) 3d1890b47f4398d805cf0c1b54286f71.
>
>         2020-09-30 13:56:06,423 INFO
>         [flink-akka.actor.default-dispatcher-18]
>         org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable -
>         Free slot TaskSlot(index:0, state:ACTIVE, resource profile:
>         ResourceProfile{cpuCores=1.7976931348623157E308,
>         heapMemoryInMB=2147483647, directMemoryInMB=2147483647,
>         nativeMemoryInMB=2147483647, networkMemoryInMB=2147483647,
>         managedMemoryInMB=3046}, allocationId:
>         e8be16ec74f16c795d95b89cd08f5c37, jobId:
>         e808de0373bd515224434b7ec1efe249).
>
>         2020-09-30 13:56:06,424 INFO
>         [flink-akka.actor.default-dispatcher-18]
>         org.apache.flink.runtime.taskexecutor.JobLeaderService -
>         Remove job e808de0373bd515224434b7ec1efe249 from job leader
>         monitoring.
>
>         2020-09-30 13:56:06,424 INFO
>         [flink-akka.actor.default-dispatcher-18]
>         org.apache.flink.runtime.taskexecutor.TaskExecutor - Close
>         JobManager connection for job e808de0373bd515224434b7ec1efe249.
>
>         2020-09-30 13:56:06,426 INFO
>         [flink-akka.actor.default-dispatcher-18]
>         org.apache.flink.runtime.taskexecutor.TaskExecutor - Close
>         JobManager connection for job e808de0373bd515224434b7ec1efe249.
>
>         2020-09-30 13:56:06,426 INFO
>         [flink-akka.actor.default-dispatcher-18]
>         org.apache.flink.runtime.taskexecutor.JobLeaderService -
>         Cannot reconnect to job e808de0373bd515224434b7ec1efe249
>         because it is not registered.
>
>         2020-09-30 13:56:09,918 INFO  [CHAIN DataSource (dataset |
>         Read Staging From File System | AVRO) -> Map (Map at
>         readAvroFileWithFilter(FlinkReadUtils.java:82)) -> Filter
>         (Filter at validateData(DAXTask.java:97)) -> FlatMap (FlatMap
>         at handleBloomFilter(PreMergeTask.java:187)) -> FlatMap
>         (FlatMap at collapsePipelineIfRequired(Task.java:160)) (1/1)]
>         org.apache.flink.runtime.blob.BlobClient - Downloading
>         48b8ba9f3de039f74483085f90e5ad64/p-0b27cb203799adb76d2a434ed3d64052d832cff3-6915d0cd0fef97f728cd890986b2bf39
>         from d43723-430.dc.gs.com/10.48.128.14:46473 (retry 3)
>
>         2020-09-30 13:56:09,920 ERROR [CHAIN DataSource (dataset |
>         Read Staging From File System | AVRO) -> Map (Map at
>         readAvroFileWithFilter(FlinkReadUtils.java:82)) -> Filter
>         (Filter at validateData(DAXTask.java:97)) -> FlatMap (FlatMap
>         at handleBloomFilter(PreMergeTask.java:187)) -> FlatMap
>         (FlatMap at collapsePipelineIfRequired(Task.java:160)) (1/1)]
>         org.apache.flink.runtime.blob.BlobClient - Failed to fetch
>         BLOB
>         48b8ba9f3de039f74483085f90e5ad64/p-0b27cb203799adb76d2a434ed3d64052d832cff3-6915d0cd0fef97f728cd890986b2bf39
>         from d43723-430.dc.gs.com/10.48.128.14:46473 and store it
>         under
>         /fs/htmp/yarn/local/usercache/delp_prod/appcache/application_1599723434208_15328880/blobStore-e2888df1-c7be-4ce6-b6b6-58e7c24a140a/incoming/temp-00000004
>         Retrying...
>
>         2020-09-30 13:56:09,920 INFO  [CHAIN DataSource (dataset |
>         Read Staging From File System | AVRO) -> Map (Map at
>         readAvroFileWithFilter(FlinkReadUtils.java:82)) -> Filter
>         (Filter at validateData(DAXTask.java:97)) -> FlatMap (FlatMap
>         at handleBloomFilter(PreMergeTask.java:187)) -> FlatMap
>         (FlatMap at collapsePipelineIfRequired(Task.java:160)) (1/1)]
>         org.apache.flink.runtime.blob.BlobClient - Downloading
>         48b8ba9f3de039f74483085f90e5ad64/p-0b27cb203799adb76d2a434ed3d64052d832cff3-6915d0cd0fef97f728cd890986b2bf39
>         from d43723-430.dc.gs.com/10.48.128.14:46473 (retry 4)
>
>         2020-09-30 13:56:09,922 ERROR [CHAIN DataSource (dataset |
>         Read Staging From File System | AVRO) -> Map (Map at
>         readAvroFileWithFilter(FlinkReadUtils.java:82)) -> Filter
>         (Filter at validateData(DAXTask.java:97)) -> FlatMap (FlatMap
>         at handleBloomFilter(PreMergeTask.java:187)) -> FlatMap
>         (FlatMap at collapsePipelineIfRequired(Task.java:160)) (1/1)]
>         org.apache.flink.runtime.blob.BlobClient - Failed to fetch
>         BLOB
>         48b8ba9f3de039f74483085f90e5ad64/p-0b27cb203799adb76d2a434ed3d64052d832cff3-6915d0cd0fef97f728cd890986b2bf39
>         from d43723-430.dc.gs.com/10.48.128.14:46473 and store it
>         under
>         /fs/htmp/yarn/local/usercache/delp_prod/appcache/application_1599723434208_15328880/blobStore-e2888df1-c7be-4ce6-b6b6-58e7c24a140a/incoming/temp-00000004
>         Retrying...
>
>         2020-09-30 13:56:09,922 INFO  [CHAIN DataSource (dataset |
>         Read Staging From File System | AVRO) -> Map (Map at
>         readAvroFileWithFilter(FlinkReadUtils.java:82)) -> Filter
>         (Filter at validateData(DAXTask.java:97)) -> FlatMap (FlatMap
>         at handleBloomFilter(PreMergeTask.java:187)) -> FlatMap
>         (FlatMap at collapsePipelineIfRequired(Task.java:160)) (1/1)]
>         org.apache.flink.runtime.blob.BlobClient - Downloading
>         48b8ba9f3de039f74483085f90e5ad64/p-0b27cb203799adb76d2a434ed3d64052d832cff3-6915d0cd0fef97f728cd890986b2bf39
>         from d43723-430.dc.gs.com/10.48.128.14:46473 (retry 5)
>
>         2020-09-30 13:56:09,925 ERROR [CHAIN DataSource (dataset |
>         Read Staging From File System | AVRO) -> Map (Map at
>         readAvroFileWithFilter(FlinkReadUtils.java:82)) -> Filter
>         (Filter at validateData(DAXTask.java:97)) -> FlatMap (FlatMap
>         at handleBloomFilter(PreMergeTask.java:187)) -> FlatMap
>         (FlatMap at collapsePipelineIfRequired(Task.java:160)) (1/1)]
>         org.apache.flink.runtime.blob.BlobClient - Failed to fetch
>         BLOB
>         48b8ba9f3de039f74483085f90e5ad64/p-0b27cb203799adb76d2a434ed3d64052d832cff3-6915d0cd0fef97f728cd890986b2bf39
>         from d43723-430.dc.gs.com/10.48.128.14:46473 and store it
>         under
>         /fs/htmp/yarn/local/usercache/delp_prod/appcache/application_1599723434208_15328880/blobStore-e2888df1-c7be-4ce6-b6b6-58e7c24a140a/incoming/temp-00000004
>         No retries left.
>
>         java.io.IOException: Could not connect to BlobServer at
>         address d43723-430.dc.gs.com/10.48.128.14:46473
>
>         at
>         org.apache.flink.runtime.blob.BlobClient.<init>(BlobClient.java:100)
>
>         at
>         org.apache.flink.runtime.blob.BlobClient.downloadFromBlobServer(BlobClient.java:143)
>
>         at
>         org.apache.flink.runtime.blob.AbstractBlobCache.getFileInternal(AbstractBlobCache.java:181)
>
>         at
>         org.apache.flink.runtime.blob.PermanentBlobCache.getFile(PermanentBlobCache.java:202)
>
>         at
>         org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerTask(BlobLibraryCacheManager.java:120)
>
>         at
>         org.apache.flink.runtime.taskmanager.Task.createUserCodeClassloader(Task.java:915)
>
>         at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:595)
>
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>
>         at java.lang.Thread.run(Thread.java:745)
>
>         Caused by: java.net.ConnectException: Connection refused
>         (Connection refused)
>
>         at java.net.PlainSocketImpl.socketConnect(Native Method)
>
>         at
>         java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
>
>         at
>         java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
>
>         at
>         java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
>
>         at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
>
>         at java.net.Socket.connect(Socket.java:589)
>
>         at
>         org.apache.flink.runtime.blob.BlobClient.<init>(BlobClient.java:95)
>
>         ... 8 more
>
>         Prior to the above connection refused error, I don’t see any
>         exceptions or failures. We’re running this application with
>         v1.9.2 on YARN, 26 Task Managers with 2 cores each, and with
>         the default BLOB server configurations. The application itself
>         then has many jobs it submits to the cluster. Does this sound
>         like a blob.fetch.backlog/concurrent-connections config
>         problem (defaulted to 1000 and 50 respectively)? I wasn’t sure
>         how chatty each TM is with the server. How can we tell if it’s
>         either a max concurrent-conn or backlog problem?
>
>         Best,
>
>         Andreas
>



RE: Blobserver dying mid-application

Posted by "Hailu, Andreas" <An...@gs.com>.
Hi Chesnay, Till, thanks for responding.

@Chesnay:
Apologies, I said cores when I meant slots ☺ So a total of 26 Task managers with 2 slots each for a grand total of 52 parallelism.

@Till:
For this application, we have a grand total of 78 jobs, with some of them demanding more parallelism than others. Each job has multiple operators – depending on the size of the data we’re operating on, we could submit 1 whopper with 52 parallelism, or multiple smaller jobs submitted in parallel with a sum of 52 parallelism. When does a task submission to a `TaskExecutor` take place? Is that on job submission or something else? I’m just curious as a parallelism of 52 seems on the lower side to breach 1K connections in the queue, unless interactions with the Blobserver are much more frequent than I think. Is it possible that separate Flink jobs share the same Blobserver? Because we have thousands of Flink applications running concurrently in our YARN cluster.

// ah

From: Chesnay Schepler <ch...@apache.org>
Sent: Thursday, October 1, 2020 5:42 AM
To: Till Rohrmann <tr...@apache.org>; Hailu, Andreas [Engineering] <An...@ny.email.gs.com>
Cc: user@flink.apache.org
Subject: Re: Blobserver dying mid-application

It would also be good to know how many slots you have on each task executor.

On 10/1/2020 11:21 AM, Till Rohrmann wrote:
Hi Andreas,

do the logs of the JM contain any information?

Theoretically, each task submission to a `TaskExecutor` can trigger a new connection to the BlobServer. This depends a bit on how large your TaskInformation is and whether this information is being offloaded to the BlobServer. What you can definitely try to do is to increase the blob.fetch.backlog in order to see whether this solves the problem.

How many jobs do you submit to the Flink cluster, and on what timeline? Maybe you can share a bit more detail about the application you are running.

Cheers,
Till

On Wed, Sep 30, 2020 at 11:49 PM Hailu, Andreas <An...@gs.com> wrote:
Hello folks, I’m seeing application failures where our Blobserver is refusing connections mid application:

2020-09-30 13:56:06,227 INFO  [flink-akka.actor.default-dispatcher-18] org.apache.flink.runtime.taskexecutor.TaskExecutor            - Un-registering task and sending final execution state FINISHED to JobManager for task DataSink (TextOutputFormat (hdfs:/user/p2epda/lake/delp_prod/PROD/APPROVED/staging/datastore/MandateTradingLine/tmp_pipeline_collapse) - UTF-8) 3d1890b47f4398d805cf0c1b54286f71.
2020-09-30 13:56:06,423 INFO  [flink-akka.actor.default-dispatcher-18] org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable      - Free slot TaskSlot(index:0, state:ACTIVE, resource profile: ResourceProfile{cpuCores=1.7976931348623157E308, heapMemoryInMB=2147483647, directMemoryInMB=2147483647, nativeMemoryInMB=2147483647, networkMemoryInMB=2147483647, managedMemoryInMB=3046}, allocationId: e8be16ec74f16c795d95b89cd08f5c37, jobId: e808de0373bd515224434b7ec1efe249).
2020-09-30 13:56:06,424 INFO  [flink-akka.actor.default-dispatcher-18] org.apache.flink.runtime.taskexecutor.JobLeaderService        - Remove job e808de0373bd515224434b7ec1efe249 from job leader monitoring.
2020-09-30 13:56:06,424 INFO  [flink-akka.actor.default-dispatcher-18] org.apache.flink.runtime.taskexecutor.TaskExecutor            - Close JobManager connection for job e808de0373bd515224434b7ec1efe249.
2020-09-30 13:56:06,426 INFO  [flink-akka.actor.default-dispatcher-18] org.apache.flink.runtime.taskexecutor.TaskExecutor            - Close JobManager connection for job e808de0373bd515224434b7ec1efe249.
2020-09-30 13:56:06,426 INFO  [flink-akka.actor.default-dispatcher-18] org.apache.flink.runtime.taskexecutor.JobLeaderService        - Cannot reconnect to job e808de0373bd515224434b7ec1efe249 because it is not registered.
2020-09-30 13:56:09,918 INFO  [CHAIN DataSource (dataset | Read Staging From File System | AVRO) -> Map (Map at readAvroFileWithFilter(FlinkReadUtils.java:82)) -> Filter (Filter at validateData(DAXTask.java:97)) -> FlatMap (FlatMap at handleBloomFilter(PreMergeTask.java:187)) -> FlatMap (FlatMap at collapsePipelineIfRequired(Task.java:160)) (1/1)] org.apache.flink.runtime.blob.BlobClient                      - Downloading 48b8ba9f3de039f74483085f90e5ad64/p-0b27cb203799adb76d2a434ed3d64052d832cff3-6915d0cd0fef97f728cd890986b2bf39 from d43723-430.dc.gs.com/10.48.128.14:46473 (retry 3)
2020-09-30 13:56:09,920 ERROR [CHAIN DataSource (dataset | Read Staging From File System | AVRO) -> Map (Map at readAvroFileWithFilter(FlinkReadUtils.java:82)) -> Filter (Filter at validateData(DAXTask.java:97)) -> FlatMap (FlatMap at handleBloomFilter(PreMergeTask.java:187)) -> FlatMap (FlatMap at collapsePipelineIfRequired(Task.java:160)) (1/1)] org.apache.flink.runtime.blob.BlobClient                      - Failed to fetch BLOB 48b8ba9f3de039f74483085f90e5ad64/p-0b27cb203799adb76d2a434ed3d64052d832cff3-6915d0cd0fef97f728cd890986b2bf39 from d43723-430.dc.gs.com/10.48.128.14:46473 and store it under /fs/htmp/yarn/local/usercache/delp_prod/appcache/application_1599723434208_15328880/blobStore-e2888df1-c7be-4ce6-b6b6-58e7c24a140a/incoming/temp-00000004 Retrying...
2020-09-30 13:56:09,920 INFO  [CHAIN DataSource (dataset | Read Staging From File System | AVRO) -> Map (Map at readAvroFileWithFilter(FlinkReadUtils.java:82)) -> Filter (Filter at validateData(DAXTask.java:97)) -> FlatMap (FlatMap at handleBloomFilter(PreMergeTask.java:187)) -> FlatMap (FlatMap at collapsePipelineIfRequired(Task.java:160)) (1/1)] org.apache.flink.runtime.blob.BlobClient                      - Downloading 48b8ba9f3de039f74483085f90e5ad64/p-0b27cb203799adb76d2a434ed3d64052d832cff3-6915d0cd0fef97f728cd890986b2bf39 from d43723-430.dc.gs.com/10.48.128.14:46473 (retry 4)
2020-09-30 13:56:09,922 ERROR [CHAIN DataSource (dataset | Read Staging From File System | AVRO) -> Map (Map at readAvroFileWithFilter(FlinkReadUtils.java:82)) -> Filter (Filter at validateData(DAXTask.java:97)) -> FlatMap (FlatMap at handleBloomFilter(PreMergeTask.java:187)) -> FlatMap (FlatMap at collapsePipelineIfRequired(Task.java:160)) (1/1)] org.apache.flink.runtime.blob.BlobClient                      - Failed to fetch BLOB 48b8ba9f3de039f74483085f90e5ad64/p-0b27cb203799adb76d2a434ed3d64052d832cff3-6915d0cd0fef97f728cd890986b2bf39 from d43723-430.dc.gs.com/10.48.128.14:46473 and store it under /fs/htmp/yarn/local/usercache/delp_prod/appcache/application_1599723434208_15328880/blobStore-e2888df1-c7be-4ce6-b6b6-58e7c24a140a/incoming/temp-00000004 Retrying...
2020-09-30 13:56:09,922 INFO  [CHAIN DataSource (dataset | Read Staging From File System | AVRO) -> Map (Map at readAvroFileWithFilter(FlinkReadUtils.java:82)) -> Filter (Filter at validateData(DAXTask.java:97)) -> FlatMap (FlatMap at handleBloomFilter(PreMergeTask.java:187)) -> FlatMap (FlatMap at collapsePipelineIfRequired(Task.java:160)) (1/1)] org.apache.flink.runtime.blob.BlobClient                      - Downloading 48b8ba9f3de039f74483085f90e5ad64/p-0b27cb203799adb76d2a434ed3d64052d832cff3-6915d0cd0fef97f728cd890986b2bf39 from d43723-430.dc.gs.com/10.48.128.14:46473 (retry 5)
2020-09-30 13:56:09,925 ERROR [CHAIN DataSource (dataset | Read Staging From File System | AVRO) -> Map (Map at readAvroFileWithFilter(FlinkReadUtils.java:82)) -> Filter (Filter at validateData(DAXTask.java:97)) -> FlatMap (FlatMap at handleBloomFilter(PreMergeTask.java:187)) -> FlatMap (FlatMap at collapsePipelineIfRequired(Task.java:160)) (1/1)] org.apache.flink.runtime.blob.BlobClient                      - Failed to fetch BLOB 48b8ba9f3de039f74483085f90e5ad64/p-0b27cb203799adb76d2a434ed3d64052d832cff3-6915d0cd0fef97f728cd890986b2bf39 from d43723-430.dc.gs.com/10.48.128.14:46473 and store it under /fs/htmp/yarn/local/usercache/delp_prod/appcache/application_1599723434208_15328880/blobStore-e2888df1-c7be-4ce6-b6b6-58e7c24a140a/incoming/temp-00000004 No retries left.
java.io.IOException: Could not connect to BlobServer at address d43723-430.dc.gs.com/10.48.128.14:46473
                at org.apache.flink.runtime.blob.BlobClient.<init>(BlobClient.java:100)
                at org.apache.flink.runtime.blob.BlobClient.downloadFromBlobServer(BlobClient.java:143)
                at org.apache.flink.runtime.blob.AbstractBlobCache.getFileInternal(AbstractBlobCache.java:181)
                at org.apache.flink.runtime.blob.PermanentBlobCache.getFile(PermanentBlobCache.java:202)
                at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerTask(BlobLibraryCacheManager.java:120)
                at org.apache.flink.runtime.taskmanager.Task.createUserCodeClassloader(Task.java:915)
                at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:595)
                at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
                at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.ConnectException: Connection refused (Connection refused)
                at java.net.PlainSocketImpl.socketConnect(Native Method)
                at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
                at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
                at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
                at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
                at java.net.Socket.connect(Socket.java:589)
                at org.apache.flink.runtime.blob.BlobClient.<init>(BlobClient.java:95)
                ... 8 more

Prior to the above connection refused error, I don’t see any exceptions or failures. We’re running this application with v1.9.2 on YARN, 26 Task Managers with 2 cores each, and with the default BLOB server configurations. The application itself then has many jobs it submits to the cluster. Does this sound like a blob.fetch.backlog/concurrent-connections config problem (defaulted to 1000 and 50 respectively)? I wasn’t sure how chatty each TM is with the server. How can we tell if it’s either a max concurrent-conn or backlog problem?
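
One way to start narrowing this down is to probe the BlobServer port directly from a TaskManager host while the failures are happening. The sketch below is only an illustration and not part of Flink: the class name BlobServerProbe, its Result enum and the 2000 ms timeout are hypothetical choices, and it uses nothing beyond java.net.Socket, the same call that appears in the stack trace above. The assumption behind it is that a "Connection refused" usually means nothing is accepting on that port at all, whereas an overloaded listener more often shows up as a connect timeout; that behaviour depends on the OS and its TCP settings, so treat the result as a hint rather than a diagnosis. The two settings in question should correspond to the keys blob.fetch.backlog and blob.fetch.num-concurrent.

```java
import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketTimeoutException;

/** Hypothetical diagnostic helper; not part of Flink. */
public class BlobServerProbe {

    /** Coarse classification of a single TCP connect attempt. */
    enum Result { CONNECTED, REFUSED, TIMED_OUT, OTHER_ERROR }

    /** Tries one TCP connect to host:port and reports how it ended. */
    static Result probe(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return Result.CONNECTED;
        } catch (ConnectException e) {
            // Typically "Connection refused": the peer rejected the connect,
            // usually because no process is listening on that port.
            return Result.REFUSED;
        } catch (SocketTimeoutException e) {
            // No answer within timeoutMillis: the host may be unreachable or
            // the listener may be too busy to complete the handshake.
            return Result.TIMED_OUT;
        } catch (IOException e) {
            // Anything else, for example an unresolvable host name.
            return Result.OTHER_ERROR;
        }
    }

    public static void main(String[] args) {
        if (args.length != 2) {
            System.err.println("usage: java BlobServerProbe <host> <port>");
            return;
        }
        String host = args[0];
        int port = Integer.parseInt(args[1]);
        System.out.println(host + ":" + port + " -> " + probe(host, port, 2000));
    }
}
```

Run it with the host and port from the log line, for example "java BlobServerProbe d43723-430.dc.gs.com 46473". If it keeps reporting REFUSED after the job has failed, that points towards the BlobServer (or the JobManager process hosting it) no longer listening, which is where the JobManager logs would help.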

Best,
Andreas

________________________________

Your Personal Data: We may collect and process information about you that may be subject to data protection laws. For more information about how we use and disclose your personal data, how we protect your information, our legal basis to use your information, your rights and who you can contact, please refer to: www.gs.com/privacy-notices


Re: Blobserver dying mid-application

Posted by Chesnay Schepler <ch...@apache.org>.
It would also be good to know how many slots you have on each task executor.

On 10/1/2020 11:21 AM, Till Rohrmann wrote:
> Hi Andreas,
>
> do the logs of the JM contain any information?
>
> Theoretically, each task submission to a `TaskExecutor` can trigger a 
> new connection to the BlobServer. This depends a bit on how large your 
> TaskInformation is and whether this information is being offloaded to 
> the BlobServer. What you can definitely try to do is to increase the 
> blob.fetch.backlog in order to see whether this solves the problem.
>
> How many jobs do you submit to the Flink cluster, and on what
> timeline? Maybe you can share a bit more detail about the
> application you are running.
>
> Cheers,
> Till
>
> On Wed, Sep 30, 2020 at 11:49 PM Hailu, Andreas <Andreas.Hailu@gs.com> wrote:
>
>     Hello folks, I’m seeing application failures where our Blobserver
>     is refusing connections mid application:
>
>     2020-09-30 13:56:06,227 INFO
>     [flink-akka.actor.default-dispatcher-18]
>     org.apache.flink.runtime.taskexecutor.TaskExecutor -
>     Un-registering task and sending final execution state FINISHED to
>     JobManager for task DataSink (TextOutputFormat
>     (hdfs:/user/p2epda/lake/delp_prod/PROD/APPROVED/staging/datastore/MandateTradingLine/tmp_pipeline_collapse)
>     - UTF-8) 3d1890b47f4398d805cf0c1b54286f71.
>
>     2020-09-30 13:56:06,423 INFO
>     [flink-akka.actor.default-dispatcher-18]
>     org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable - Free
>     slot TaskSlot(index:0, state:ACTIVE, resource profile:
>     ResourceProfile{cpuCores=1.7976931348623157E308,
>     heapMemoryInMB=2147483647, directMemoryInMB=2147483647,
>     nativeMemoryInMB=2147483647, networkMemoryInMB=2147483647,
>     managedMemoryInMB=3046}, allocationId:
>     e8be16ec74f16c795d95b89cd08f5c37, jobId:
>     e808de0373bd515224434b7ec1efe249).
>
>     2020-09-30 13:56:06,424 INFO
>     [flink-akka.actor.default-dispatcher-18]
>     org.apache.flink.runtime.taskexecutor.JobLeaderService - Remove
>     job e808de0373bd515224434b7ec1efe249 from job leader monitoring.
>
>     2020-09-30 13:56:06,424 INFO
>     [flink-akka.actor.default-dispatcher-18]
>     org.apache.flink.runtime.taskexecutor.TaskExecutor - Close
>     JobManager connection for job e808de0373bd515224434b7ec1efe249.
>
>     2020-09-30 13:56:06,426 INFO
>     [flink-akka.actor.default-dispatcher-18]
>     org.apache.flink.runtime.taskexecutor.TaskExecutor - Close
>     JobManager connection for job e808de0373bd515224434b7ec1efe249.
>
>     2020-09-30 13:56:06,426 INFO
>     [flink-akka.actor.default-dispatcher-18]
>     org.apache.flink.runtime.taskexecutor.JobLeaderService - Cannot
>     reconnect to job e808de0373bd515224434b7ec1efe249 because it is
>     not registered.
>
>     2020-09-30 13:56:09,918 INFO  [CHAIN DataSource (dataset | Read
>     Staging From File System | AVRO) -> Map (Map at
>     readAvroFileWithFilter(FlinkReadUtils.java:82)) -> Filter (Filter
>     at validateData(DAXTask.java:97)) -> FlatMap (FlatMap at
>     handleBloomFilter(PreMergeTask.java:187)) -> FlatMap (FlatMap at
>     collapsePipelineIfRequired(Task.java:160)) (1/1)]
>     org.apache.flink.runtime.blob.BlobClient - Downloading
>     48b8ba9f3de039f74483085f90e5ad64/p-0b27cb203799adb76d2a434ed3d64052d832cff3-6915d0cd0fef97f728cd890986b2bf39
>     from d43723-430.dc.gs.com/10.48.128.14:46473
>     (retry 3)
>
>     2020-09-30 13:56:09,920 ERROR [CHAIN DataSource (dataset | Read
>     Staging From File System | AVRO) -> Map (Map at
>     readAvroFileWithFilter(FlinkReadUtils.java:82)) -> Filter (Filter
>     at validateData(DAXTask.java:97)) -> FlatMap (FlatMap at
>     handleBloomFilter(PreMergeTask.java:187)) -> FlatMap (FlatMap at
>     collapsePipelineIfRequired(Task.java:160)) (1/1)]
>     org.apache.flink.runtime.blob.BlobClient - Failed to fetch BLOB
>     48b8ba9f3de039f74483085f90e5ad64/p-0b27cb203799adb76d2a434ed3d64052d832cff3-6915d0cd0fef97f728cd890986b2bf39
>     from d43723-430.dc.gs.com/10.48.128.14:46473
>     and store it
>     under
>     /fs/htmp/yarn/local/usercache/delp_prod/appcache/application_1599723434208_15328880/blobStore-e2888df1-c7be-4ce6-b6b6-58e7c24a140a/incoming/temp-00000004
>     Retrying...
>
>     2020-09-30 13:56:09,920 INFO  [CHAIN DataSource (dataset | Read
>     Staging From File System | AVRO) -> Map (Map at
>     readAvroFileWithFilter(FlinkReadUtils.java:82)) -> Filter (Filter
>     at validateData(DAXTask.java:97)) -> FlatMap (FlatMap at
>     handleBloomFilter(PreMergeTask.java:187)) -> FlatMap (FlatMap at
>     collapsePipelineIfRequired(Task.java:160)) (1/1)]
>     org.apache.flink.runtime.blob.BlobClient - Downloading
>     48b8ba9f3de039f74483085f90e5ad64/p-0b27cb203799adb76d2a434ed3d64052d832cff3-6915d0cd0fef97f728cd890986b2bf39
>     from d43723-430.dc.gs.com/10.48.128.14:46473
>     (retry 4)
>
>     2020-09-30 13:56:09,922 ERROR [CHAIN DataSource (dataset | Read
>     Staging From File System | AVRO) -> Map (Map at
>     readAvroFileWithFilter(FlinkReadUtils.java:82)) -> Filter (Filter
>     at validateData(DAXTask.java:97)) -> FlatMap (FlatMap at
>     handleBloomFilter(PreMergeTask.java:187)) -> FlatMap (FlatMap at
>     collapsePipelineIfRequired(Task.java:160)) (1/1)]
>     org.apache.flink.runtime.blob.BlobClient - Failed to fetch BLOB
>     48b8ba9f3de039f74483085f90e5ad64/p-0b27cb203799adb76d2a434ed3d64052d832cff3-6915d0cd0fef97f728cd890986b2bf39
>     from d43723-430.dc.gs.com/10.48.128.14:46473
>     and store it
>     under
>     /fs/htmp/yarn/local/usercache/delp_prod/appcache/application_1599723434208_15328880/blobStore-e2888df1-c7be-4ce6-b6b6-58e7c24a140a/incoming/temp-00000004
>     Retrying...
>
>     2020-09-30 13:56:09,922 INFO  [CHAIN DataSource (dataset | Read
>     Staging From File System | AVRO) -> Map (Map at
>     readAvroFileWithFilter(FlinkReadUtils.java:82)) -> Filter (Filter
>     at validateData(DAXTask.java:97)) -> FlatMap (FlatMap at
>     handleBloomFilter(PreMergeTask.java:187)) -> FlatMap (FlatMap at
>     collapsePipelineIfRequired(Task.java:160)) (1/1)]
>     org.apache.flink.runtime.blob.BlobClient - Downloading
>     48b8ba9f3de039f74483085f90e5ad64/p-0b27cb203799adb76d2a434ed3d64052d832cff3-6915d0cd0fef97f728cd890986b2bf39
>     from d43723-430.dc.gs.com/10.48.128.14:46473
>     (retry 5)
>
>     2020-09-30 13:56:09,925 ERROR [CHAIN DataSource (dataset | Read
>     Staging From File System | AVRO) -> Map (Map at
>     readAvroFileWithFilter(FlinkReadUtils.java:82)) -> Filter (Filter
>     at validateData(DAXTask.java:97)) -> FlatMap (FlatMap at
>     handleBloomFilter(PreMergeTask.java:187)) -> FlatMap (FlatMap at
>     collapsePipelineIfRequired(Task.java:160)) (1/1)]
>     org.apache.flink.runtime.blob.BlobClient - Failed to fetch BLOB
>     48b8ba9f3de039f74483085f90e5ad64/p-0b27cb203799adb76d2a434ed3d64052d832cff3-6915d0cd0fef97f728cd890986b2bf39
>     from d43723-430.dc.gs.com/10.48.128.14:46473
>     and store it
>     under
>     /fs/htmp/yarn/local/usercache/delp_prod/appcache/application_1599723434208_15328880/blobStore-e2888df1-c7be-4ce6-b6b6-58e7c24a140a/incoming/temp-00000004
>     No retries left.
>
>     java.io.IOException: Could not connect to BlobServer at address
>     d43723-430.dc.gs.com/10.48.128.14:46473
>
>                     at
>     org.apache.flink.runtime.blob.BlobClient.<init>(BlobClient.java:100)
>
>                     at
>     org.apache.flink.runtime.blob.BlobClient.downloadFromBlobServer(BlobClient.java:143)
>
>                     at
>     org.apache.flink.runtime.blob.AbstractBlobCache.getFileInternal(AbstractBlobCache.java:181)
>
>                     at
>     org.apache.flink.runtime.blob.PermanentBlobCache.getFile(PermanentBlobCache.java:202)
>
>                     at
>     org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerTask(BlobLibraryCacheManager.java:120)
>
>                     at
>     org.apache.flink.runtime.taskmanager.Task.createUserCodeClassloader(Task.java:915)
>
>                     at
>     org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:595)
>
>                     at
>     org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>
>                     at java.lang.Thread.run(Thread.java:745)
>
>     Caused by: java.net.ConnectException: Connection refused
>     (Connection refused)
>
>                     at java.net.PlainSocketImpl.socketConnect(Native
>     Method)
>
>                     at
>     java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
>
>                     at
>     java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
>
>                     at
>     java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
>
>                     at
>     java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
>
>                     at java.net.Socket.connect(Socket.java:589)
>
>                     at
>     org.apache.flink.runtime.blob.BlobClient.<init>(BlobClient.java:95)
>
>                     ... 8 more
>
>     Prior to the above connection refused error, I don’t see any
>     exceptions or failures. We’re running this application with v1.9.2
>     on YARN, 26 Task Managers with 2 cores each, and with the default
>     BLOB server configurations. The application itself then has many
>     jobs it submits to the cluster. Does this sound like a
>     blob.fetch.backlog/concurrent-connections config problem
>     (defaulted to 1000 and 50 respectively)? I wasn’t sure how chatty
>     each TM is with the server. How can we tell if it’s either a max
>     concurrent-conn or backlog problem?
>
>     Best,
>
>     Andreas
>