Posted to commits@beam.apache.org by "Ben Sidhom (JIRA)" <ji...@apache.org> on 2018/08/08 18:35:00 UTC

[jira] [Commented] (BEAM-5110) Reconcile Flink JVM singleton management with deployment

    [ https://issues.apache.org/jira/browse/BEAM-5110?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16573681#comment-16573681 ] 

Ben Sidhom commented on BEAM-5110:
----------------------------------

Note that we can partially fix the issue by requiring that users who deploy in standalone "sessions" drop the Flink job server jar into the lib/ directory of all workers. That ensures that the necessary classes are available in the static top-level classloader.

However, classloading in Flink is inverted from the usual behavior: "By default, Flink inverts classloading order, meaning it looks into the user code classloader first, and only looks into the parent (application classloader) if the class is not part of the dynamically loaded user code."

Making this work in practice also requires a change in the Flink runner code that creates the connection to remote environments. We have to ensure that _no_ user jars are staged at that point, because any classes in staged jars are loaded dynamically and take precedence over the same classes in the root classloader.
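
To make the failure mode concrete, here is a minimal sketch of how child-first loading can duplicate static singleton state. It is not Beam or Flink code: ChildFirstDemo, Registry and ChildFirstLoader are hypothetical names, and the loader only imitates the inverted lookup order described above. It assumes the compiled classes are readable as resources on the classpath.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Field;

/** Hypothetical demo; none of these names come from Beam or Flink. */
final class ChildFirstDemo {

  /** Stand-in for a class that holds JVM-wide singleton state. */
  static final class Registry {
    static final Object INSTANCE = new Object();
  }

  /** Defines Registry itself before asking the parent, like a child-first loader. */
  static final class ChildFirstLoader extends ClassLoader {
    ChildFirstLoader(ClassLoader parent) {
      super(parent);
    }

    @Override
    protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
      if (!name.equals(Registry.class.getName())) {
        return super.loadClass(name, resolve); // all other classes: usual parent-first
      }
      synchronized (getClassLoadingLock(name)) {
        Class<?> c = findLoadedClass(name);
        if (c == null) {
          byte[] bytes = readClassBytes(name);
          c = defineClass(name, bytes, 0, bytes.length); // a second, independent copy
        }
        if (resolve) {
          resolveClass(c);
        }
        return c;
      }
    }

    /** Reads the class file through the parent loader (assumes it is on the classpath). */
    private byte[] readClassBytes(String name) throws ClassNotFoundException {
      String path = name.replace('.', '/') + ".class";
      try (InputStream in = getParent().getResourceAsStream(path)) {
        if (in == null) {
          throw new ClassNotFoundException(name);
        }
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buf = new byte[4096];
        for (int n = in.read(buf); n != -1; n = in.read(buf)) {
          out.write(buf, 0, n);
        }
        return out.toByteArray();
      } catch (IOException e) {
        throw new ClassNotFoundException(name, e);
      }
    }
  }

  /** Returns the Registry.INSTANCE value visible through the given loader. */
  static Object instanceSeenBy(ClassLoader loader) {
    try {
      Class<?> c = Class.forName(Registry.class.getName(), true, loader);
      Field f = c.getDeclaredField("INSTANCE");
      f.setAccessible(true); // the copy may live in a different runtime package
      return f.get(null);
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException(e);
    }
  }

  /** True if a child loader of the given kind sees the same singleton as its parent. */
  static boolean sharesSingletonWithParent(boolean childFirst) {
    ClassLoader parent = ChildFirstDemo.class.getClassLoader();
    ClassLoader child = childFirst ? new ChildFirstLoader(parent) : new ClassLoader(parent) {};
    return instanceSeenBy(parent) == instanceSeenBy(child);
  }

  public static void main(String[] args) {
    System.out.println("parent-first shares singleton: " + sharesSingletonWithParent(false));
    System.out.println("child-first shares singleton: " + sharesSingletonWithParent(true));
  }
}
```

With the default parent-first delegation the child loader should resolve to the parent's copy of Registry, so both sides see one instance. With the child-first loader the class is defined a second time and its static field is initialized again, which is the same kind of duplication reported for BatchFlinkExecutableStageContext.BatchFactory.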

> Reconcile Flink JVM singleton management with deployment
> --------------------------------------------------------
>
>                 Key: BEAM-5110
>                 URL: https://issues.apache.org/jira/browse/BEAM-5110
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>            Reporter: Ben Sidhom
>            Assignee: Ben Sidhom
>            Priority: Major
>
> [~angoenka] noticed through debugging that multiple instances of BatchFlinkExecutableStageContext.BatchFactory are loaded for a given job when executing in standalone cluster mode. This context factory is responsible for maintaining singleton state within a TaskManager (JVM) in order to share SDK Environments across workers in a given job. Loading the class more than once breaks singleton semantics and results in an indeterminate number of Environments being created.
> It turns out that the [Flink classloading mechanism|https://ci.apache.org/projects/flink/flink-docs-release-1.5/monitoring/debugging_classloading.html] is determined by deployment mode. Note that "user code" as referenced by this link is actually the Flink job server jar. Actual end-user code lives inside of the SDK Environment and uploaded artifacts.
> In order to maintain singletons without resorting to IPC (for example, using file locks and/or additional gRPC servers), we need to force non-dynamic classloading. For example, this happens when jobs are submitted to YARN for one-off deployments via `flink run`. However, connecting to an existing (Flink standalone) deployment results in dynamic classloading.
> We should investigate this behavior and either document (and attempt to enforce) deployment modes that are consistent with our requirements, or (if possible) create a custom classloader that enforces singleton loading.


