Posted to dev@flink.apache.org by Shuyi Chen <su...@gmail.com> on 2018/05/14 06:14:33 UTC

[DISCUSS] Adding new interfaces in [Stream]ExecutionEnvironment

Hi Flink devs,

To support loading external libraries and creating UDFs from them via DDL
in Flink SQL, we want to use Flink's Blob Server to distribute the external
libraries at runtime and load them into the user code classloader
automatically.

However, the current [Stream]ExecutionEnvironment.registerCachedFile
interface only distinguishes between executable and non-executable blobs.
At runtime there is no way to tell whether a blob file is a library that
should be loaded into the user code classloader in the RuntimeContext.
Therefore, I propose adding an enum called *BlobType* that explicitly
indicates the type of the Blob file being distributed, together with the
following interface in [Stream]ExecutionEnvironment to support it. More
generally, I think the Flink runtime could use the new BlobType information
to preprocess the Blob files if needed.

/**
 * Registers a file at the distributed cache under the given name. The file will be accessible
 * from any user-defined function in the (distributed) runtime under a local path. Files
 * may be local files (as long as all relevant workers have access to it), or files in a
 * distributed file system.
 * The runtime will copy the files temporarily to a local cache, if needed.
 *
 * <p>The {@link org.apache.flink.api.common.functions.RuntimeContext} can be obtained inside
 * UDFs via {@link org.apache.flink.api.common.functions.RichFunction#getRuntimeContext()} and
 * provides access to {@link org.apache.flink.api.common.cache.DistributedCache} via
 * {@link org.apache.flink.api.common.functions.RuntimeContext#getDistributedCache()}.
 *
 * @param filePath The path of the file, as a URI (e.g. "file:///some/path" or
 *                 "hdfs://host:port/and/path")
 * @param name The name under which the file is registered.
 * @param blobType indicating the type of the Blob file
 */
public void registerCachedFile(String filePath, String name, DistributedCache.BlobType blobType) {...}

Optionally, we can add another interface for registering UDF Jars, which
will be implemented on top of the interface above:

public void registerJarFile(String filePath, String name) {...}

The following existing interface will be marked deprecated:

public void registerCachedFile(String filePath, String name, boolean executable) {...}

And the following interface will be implemented using the new interface
proposed above with an EXECUTABLE BlobType:

public void registerCachedFile(String filePath, String name) { ... }

Thanks a lot.
Shuyi

"So you have to trust that the dots will somehow connect in your future."
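
For illustration, the relationship between the proposed overloads could look
roughly like the Java sketch below. It is not Flink code and not the
author's implementation: CachedFileRegistry, the constants NON_EXECUTABLE
and LIBRARY, and shouldLoadIntoClassLoader() are names assumed for this
sketch. Only BlobType, EXECUTABLE, registerCachedFile and registerJarFile
come from the proposal above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical stand-in for the proposed DistributedCache.BlobType; constant names are assumptions. */
enum BlobType { NON_EXECUTABLE, EXECUTABLE, LIBRARY }

/** Toy registry that mimics only the registration surface discussed above; it is not Flink code. */
class CachedFileRegistry {
    /** One registered file: where it lives and how the runtime should treat it. */
    static final class Entry {
        final String filePath;
        final BlobType blobType;

        Entry(String filePath, BlobType blobType) {
            this.filePath = filePath;
            this.blobType = blobType;
        }
    }

    private final Map<String, Entry> entries = new LinkedHashMap<>();

    /** Proposed overload: the caller states the blob type explicitly. */
    public void registerCachedFile(String filePath, String name, BlobType blobType) {
        entries.put(name, new Entry(filePath, blobType));
    }

    /** Existing boolean overload, mapped onto the enum so old callers keep working. */
    @Deprecated
    public void registerCachedFile(String filePath, String name, boolean executable) {
        registerCachedFile(filePath, name,
                executable ? BlobType.EXECUTABLE : BlobType.NON_EXECUTABLE);
    }

    /** Optional convenience method for UDF jars, implemented via the enum overload. */
    public void registerJarFile(String filePath, String name) {
        registerCachedFile(filePath, name, BlobType.LIBRARY);
    }

    /** True if the named file was registered as a library for the user code classloader. */
    public boolean shouldLoadIntoClassLoader(String name) {
        Entry e = entries.get(name);
        return e != null && e.blobType == BlobType.LIBRARY;
    }
}
```

The point of the enum in this sketch is that the runtime side can branch on
the blob type, which a single boolean "executable" flag cannot express.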

Re: [DISCUSS] Adding new interfaces in [Stream]ExecutionEnvironment

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Marvin,

The discussion was not resolved yet.
Can you tell us a bit about your use case? Maybe that would help with the
decision.
Which of the discussed approaches would work best for your use case and why?

Thanks, Fabian

2018-06-25 13:27 GMT+02:00 Marvin777 <xy...@gmail.com>:

> Hi, Shuyi:
>
> What is the progress of the discussion?  We also look forward to this
> feature.
> Thanks.
>
> Shuyi Chen <su...@gmail.com> wrote on Fri, Jun 8, 2018 at 3:04 PM:
>
> > Thanks a lot for the comments, Till and Fabian.
> >
> > The RemoteEnvironment does provide a way to specify jar files at
> > construction, but we want the jar files to be specified dynamically in
> > the user code, e.g. in a DDL statement, and the jar files might be in a
> > remote DFS. As we discussed, I think there are 2 approaches:
> >
> > 1) add a new interface env.registerJarFile(jarFiles...), which ships the
> > JAR files using JobGraph.addJar(). In this case, all jars will be loaded
> > by default at runtime. This approach is the same as how the SQL client
> > ships UDF jars now.
> > 2) add a new interface env.registerJarFile(name, jarFiles...). It will
> > work similarly to env.registerCachedFile(), registering a set of Jar
> > files under a key name, and we can add a new interface in RuntimeContext
> > as Fabian suggests, i.e.,
> > RuntimeContext.getClassloaderWithJar(<key name>). The user will then be
> > able to load the functions in the remote jar dynamically using the
> > returned ClassLoader.
> >
> > Comparing the 2 approaches:
> >
> >    - Approach 1) will be simpler for users.
> >    - Approach 2) will allow us to use different versions of a class in
> >    the same code, and might solve some dependency conflict issues. Also,
> >    in 2) we can load Jars on demand, while in 1) all jars will be loaded
> >    by default.
> >
> > I think we can support both interfaces. For the SQL DDL implementation,
> > both will work; approach 2) will be more complicated, but with the
> > benefits stated above. However, the implementation choice should be
> > transparent to the end user. Also, I am wondering whether, outside of
> > the SQL DDL, this new functionality/interface would be helpful in other
> > scenarios. Maybe that will help make the interface better and more
> > generic. Thanks a lot.
> >
> > Shuyi
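
A minimal Java sketch of approach 2), assuming the jars are addressable by
URLs that the JVM can open directly (e.g. file: URLs; a jar on a remote DFS
would first have to be fetched, or would need a registered URL handler).
NamedJarRegistry is a hypothetical stand-in and not part of Flink; only the
method names registerJarFile and getClassloaderWithJar are taken from the
discussion.

```java
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of approach 2): jar sets registered under a key, loaded on demand. */
class NamedJarRegistry {
    private final Map<String, List<URL>> jarsByName = new HashMap<>();

    /** Registers one or more jar locations under a key name. */
    public void registerJarFile(String name, String... jarFiles) {
        List<URL> urls = jarsByName.computeIfAbsent(name, k -> new ArrayList<>());
        for (String jar : jarFiles) {
            try {
                urls.add(URI.create(jar).toURL());
            } catch (MalformedURLException e) {
                // Schemes without a URL handler in this JVM end up here.
                throw new IllegalArgumentException("Unsupported jar location: " + jar, e);
            }
        }
    }

    /** Builds a classloader that sees the jars registered under the key, on top of a parent. */
    public URLClassLoader getClassloaderWithJar(String name, ClassLoader parent) {
        List<URL> urls = jarsByName.get(name);
        if (urls == null) {
            throw new IllegalArgumentException("No jars registered under name: " + name);
        }
        return new URLClassLoader(urls.toArray(new URL[0]), parent);
    }
}
```

Because each key gets its own classloader in this sketch, two keys could
point at different versions of the same class, which is the dependency
isolation benefit mentioned for approach 2).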
> >
> > On Tue, Jun 5, 2018 at 1:47 AM Fabian Hueske <fh...@gmail.com> wrote:
> >
> > > We could also offer a feature that lets users request classloaders
> > > with additional jars.
> > > This could work as follows:
> > >
> > > 1) Users register jar files in the ExecutionEnvironment (similar to
> > > cached files) with a name, e.g.,
> > > env.registerJarFile("~/myJar.jar", "myName");
> > > 2) In a function, the user can request a user classloader with the
> > > additional classes, e.g.,
> > > RuntimeContext.getClassloaderWithJar("myName");
> > > This could also support loading multiple jar files in the same
> > > classloader.
> > >
> > > IMO, the interesting part of Shuyi's proposal is to be able to
> > > dynamically load code from remote locations without fetching it to the
> > > client first.
> > >
> > > Best, Fabian
> > >
> > >
> > > 2018-05-29 12:42 GMT+02:00 Till Rohrmann <tr...@apache.org>:
> > >
> > > > I see Shuyi's point that it would be nice to allow programmatically
> > > > adding jar files which should be part of the user code classloader.
> > > > Actually, we expose this functionality in the `RemoteEnvironment`,
> > > > where you can specify additional jars which shall be shipped to the
> > > > cluster in the constructor. I assume that is exactly the
> > > > functionality you are looking for. In that sense, it might be an API
> > > > inconsistency that we allow it for some cases and not for others.
> > > >
> > > > But I could also see that the whole functionality of dynamically
> > > > loading jars at runtime could perfectly live in the `UdfSqlOperator`.
> > > > This, of course, would entail that one has to take care of cleaning
> > > > up the downloaded resources. But it should be possible to first
> > > > download the resources and create a custom URLClassLoader at startup
> > > > and then use this class loader when calling into the UDF.
> > > >
> > > > Cheers,
> > > > Till
> > > >
> > > > On Wed, May 16, 2018 at 9:28 PM, Shuyi Chen <su...@gmail.com> wrote:
> > > >
> > > > > Hi Aljoscha, Fabian, Rong, Ted and Timo,
> > > > >
> > > > > Thanks a lot for the feedback. Let me clarify the usage scenario in
> > > > > a bit more detail. The context is that we want to add support for
> > > > > SQL DDL in Flink SQL to load UDFs from external JARs located either
> > > > > in the local filesystem, in HDFS, or at an HTTP endpoint. The local
> > > > > FS option is mostly for debugging, when the user submits the job jar
> > > > > locally, and the latter 2 are for production use. Below is an
> > > > > example user application with the *CREATE FUNCTION* DDL (Note:
> > > > > grammar and interface not finalized yet).
> > > > >
> > > > > ------------------------------------------------------------
> > > > >
> > > > > val env = StreamExecutionEnvironment.getExecutionEnvironment
> > > > > val tEnv = TableEnvironment.getTableEnvironment(env)
> > > > > // setup the DataStream
> > > > > //......
> > > > >
> > > > > // register the DataStream under the name "OrderA"
> > > > > tEnv.registerDataStream("OrderA", orderA, 'user, 'product, 'amount)
> > > > > tEnv.sqlDDL(
> > > > >   "create function helloFunc as 'com.example.udf.HelloWorld' using jars " +
> > > > >   "('hdfs:///users/david/libraries/my-udf-1.0.1-SNAPSHOT.jar')")
> > > > > val result = tEnv.sqlQuery(
> > > > >   "SELECT user, helloFunc(product), amount FROM OrderA WHERE amount > 2")
> > > > > result.toAppendStream[Order].print()
> > > > > env.execute()
> > > > > ------------------------------------------------------------
> > > > >
> > > > > The example application above does the following:
> > > > > 1) it registers a DataStream as a Calcite table
> > > > > (*org.apache.calcite.schema.Table*) under the name "OrderA", so SQL
> > > > > can reference the DataStream as table "OrderA".
> > > > > 2) it uses the SQL *CREATE FUNCTION* DDL (grammar and interface not
> > > > > finalized yet) to create a SQL UDF called *helloFunc* from a JAR
> > > > > located in a remote HDFS path.
> > > > > 3) it issues a SQL query that uses the *helloFunc* UDF defined above
> > > > > and generates a Flink table (*org.apache.flink.table.api.Table*).
> > > > > 4) it converts the Flink table back to a DataStream and prints it.
> > > > >
> > > > > Steps 1), 3), and 4) are already implemented. To implement 2), the
> > > > > *tEnv.sqlDDL()* function needs to do the following:
> > > > >
> > > > > a) parse the DDL into a SqlNode to extract the UDF *udfClasspath*,
> > > > > the UDF remote paths *udfUrls[]* and the UDF SQL name *udfName*.
> > > > > b) use the URLClassLoader to load the JARs specified in *udfUrls[]*,
> > > > > and register the SQL UDF through the {Batch/Stream/}TableEnvironment
> > > > > registerFunction methods using *udfClasspath* under the name
> > > > > *udfName*.
> > > > > c) register the JARs *udfUrls[]* through the
> > > > > {Stream}ExecutionEnvironment, so that the JARs can be distributed to
> > > > > all the TaskManagers at runtime.
> > > > >
> > > > >
> > > > > Since the CREATE FUNCTION DDL is executed within the user
> > > > > application, I don't think we have access to the ClusterClient at
> > > > > the point when *tEnv.sqlDDL()* is executed. Also, the JARs can be in
> > > > > a remote filesystem (which is the main usage scenario), so the user
> > > > > can't really prepare the jar statically in advance.
> > > > >
> > > > > For a normal user application, I think {Stream}ExecutionEnvironment
> > > > > is the right place for the functionality, since it provides methods
> > > > > to control the job execution and to interact with the outside
> > > > > world, and it already does similar things through the
> > > > > *registerCachedFile* interface.
> > > > >
> > > > > However, in that case, the SQL FUNCTION DDL and the SQL client will
> > > > > use 2 different routes to register UDF jars, one through
> > > > > *JobGraph.jobConfiguration* and the other through
> > > > > *JobGraph.userJars*. So maybe we can, as Fabian suggests, add
> > > > > registerUserJarFile()/getUserJarFiles() interfaces in
> > > > > {Stream}ExecutionEnvironment, which store the jars internally in a
> > > > > List, and when generating the JobGraph, copy the jars to the
> > > > > JobGraph using {Stream}ExecutionEnvironment.getUserJarFiles() and
> > > > > JobGraph.addJar() (Note: streaming and batch implementations might
> > > > > vary). In that case, both the SQL FUNCTION DDL and the SQL client
> > > > > will use *JobGraph.userJars* to ship the UDF jars.
> > > > >
> > > > > Hope that clarifies things. What do you guys think? Thanks a lot.
> > > > >
> > > > > Cheers!
> > > > > Shuyi
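
The registerUserJarFile()/getUserJarFiles() idea can be sketched in Java as
follows. This is an illustration under stated assumptions, not Flink code:
SketchEnvironment and SketchJobGraph are hypothetical stand-ins for
{Stream}ExecutionEnvironment and JobGraph, jar locations are modelled as
plain strings, and createJobGraph() is an assumed name for the point where
the job graph is generated.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Stand-in for JobGraph: only the addJar()/userJars idea from the thread is modelled. */
class SketchJobGraph {
    private final List<String> userJars = new ArrayList<>();

    /** Adds a jar to ship with the job, ignoring duplicates. */
    void addJar(String jarPath) {
        if (!userJars.contains(jarPath)) {
            userJars.add(jarPath);
        }
    }

    List<String> getUserJars() {
        return Collections.unmodifiableList(userJars);
    }
}

/** Stand-in for the execution environment: collects jars until the job graph is built. */
class SketchEnvironment {
    private final List<String> userJarFiles = new ArrayList<>();

    /** Remembers a jar that the user code (e.g. a DDL statement) asked for. */
    public void registerUserJarFile(String jarPath) {
        userJarFiles.add(jarPath);
    }

    public List<String> getUserJarFiles() {
        return Collections.unmodifiableList(userJarFiles);
    }

    /** Copies every registered jar into the job graph when it is generated. */
    public SketchJobGraph createJobGraph() {
        SketchJobGraph graph = new SketchJobGraph();
        for (String jar : getUserJarFiles()) {
            graph.addJar(jar);
        }
        return graph;
    }
}
```

With this shape, jars registered from a DDL statement and jars supplied by
a client would both end up in the same user-jar list of the job graph.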
> > > > >
> > > > > On Wed, May 16, 2018 at 9:45 AM, Rong Rong <wa...@gmail.com> wrote:
> > > > >
> > > > > > I think the question here is whether registering Jar files (or
> > > > > > other executable files) during job submission is sufficient for
> > > > > > @shuyi's use case.
> > > > > >
> > > > > > If I understand correctly, the dynamic distribution of external
> > > > > > libraries at runtime is meant to deal with DDL/DSL such as:
> > > > > >     CREATE FUNCTION my_fun FROM url://<some_remote_jar>
> > > > > > during execution. Correct me if I am wrong @shuyi, but the basic
> > > > > > assumption that "we can locate and ship all executable JARs during
> > > > > > job submission" no longer holds for your use case, right?
> > > > > >
> > > > > > I guess we are missing details here regarding the "distribution of
> > > > > > external libraries in runtime" part. Maybe you can share more
> > > > > > examples of this use case. Would this be included in the design
> > > > > > doc @Timo?
> > > > > >
> > > > > > --
> > > > > > Rong
> > > > > >
> > > > > > On Wed, May 16, 2018 at 5:41 AM, Timo Walther <twalthr@apache.org> wrote:
> > > > > >
> > > > > > > Yes, we are using the addJar functionality of the JobGraph as
> > > > > > > well for the SQL Client.
> > > > > > >
> > > > > > > I think the execution environment is not the right place to
> > > > > > > specify jars. The location of the jars depends on the submission
> > > > > > > method. If a local path is specified in the main() method of a
> > > > > > > packaged Flink jar, it would not work when such a program is
> > > > > > > submitted through the REST API.
> > > > > > >
> > > > > > > Regards,
> > > > > > > Timo
> > > > > > >
> > > > > > > On 16.05.18 at 14:32, Aljoscha Krettek wrote:
> > > > > > >
> > > > > > >> I think this functionality is already there, we just have to
> > > > > > >> expose it in the right places: ClusterClient.submitJob() takes
> > > > > > >> a JobGraph, and JobGraph has the method addJar() for adding jars
> > > > > > >> that need to be in the classloader for executing a user program.
> > > > > > >>
> > > > > > > >> On 16. May 2018, at 12:34, Fabian Hueske <fh...@gmail.com> wrote:
> > > > > > > >>>
> > > > > > > >>> Hi Ted,
> > > > > > > >>>
> > > > > > > >>> The design doc is in late draft status and proposes support for
> > > > > > > >>> SQL DDL statements (CREATE TABLE, CREATE FUNCTION, etc.).
> > > > > > > >>> The question about registering JARs came up because we need a
> > > > > > > >>> way to distribute JAR files that contain the code of
> > > > > > > >>> user-defined functions.
> > > > > > > >>>
> > > > > > > >>> The design doc will soon be shared on the dev mailing list to
> > > > > > > >>> gather feedback from the community.
> > > > > > >>>
> > > > > > >>> Best, Fabian
> > > > > > >>>
> > > > > > >>> 2018-05-16 10:45 GMT+02:00 Ted Yu <yu...@gmail.com>:
> > > > > > >>>
> > > > > > >>> bq. In a design document, Timo mentioned that we can ship
> > > multiple
> > > > > JAR
> > > > > > >>>> files
> > > > > > >>>>
> > > > > > >>>> Mind telling us where the design doc can be retrieved ?
> > > > > > >>>>
> > > > > > >>>> Thanks
> > > > > > >>>>
> > > > > > >>>> On Wed, May 16, 2018 at 1:29 AM, Fabian Hueske <
> > > fhueske@gmail.com
> > > > >
> > > > > > >>>> wrote:
> > > > > > >>>>
> > > > > > > >>>>> Hi,
> > > > > > > >>>>>
> > > > > > > >>>>> I'm not sure if we need to modify the existing method.
> > > > > > > >>>>> What we need is a bit different from what registerCachedFile()
> > > > > > > >>>>> provides.
> > > > > > > >>>>> The method ensures that a file is copied to each TaskManager
> > > > > > > >>>>> and can be locally accessed from a function's RuntimeContext.
> > > > > > > >>>>> In our case, we don't need to access the file but would like
> > > > > > > >>>>> to make sure that it is loaded into the class loader.
> > > > > > > >>>>> So, we could also just add a method like registerUserJarFile().
> > > > > > > >>>>>
> > > > > > > >>>>> In a design document, Timo mentioned that we can ship multiple
> > > > > > > >>>>> JAR files with a job.
> > > > > > > >>>>> So, we could also implement the UDF shipping logic by loading
> > > > > > > >>>>> the Jar file(s) to the client and distributing them from there.
> > > > > > > >>>>> In that case, we would not need to add a new method to the
> > > > > > > >>>>> execution environment.
> > > > > > > >>>>>
> > > > > > > >>>>> Best,
> > > > > > > >>>>> Fabian
> > > > > > >>>>>
> > > > > > >>>>> 2018-05-15 3:50 GMT+02:00 Rong Rong <wa...@gmail.com>:
> > > > > > >>>>>
> > > > > > >>>>> +1. This could be very useful for "dynamic" UDF.
> > > > > > >>>>>>
> > > > > > >>>>>> Just to clarify, if I understand correctly, we are tying
> to
> > > use
> > > > an
> > > > > > >>>>>> ENUM
> > > > > > >>>>>> indicator to
> > > > > > >>>>>> (1) Replace the current Boolean isExecutable flag.
> > > > > > >>>>>> (2) Provide additional information used by
> > > ExecutionEnvironment
> > > > to
> > > > > > >>>>>>
> > > > > > >>>>> decide
> > > > > > >>>>
> > > > > > >>>>> when/where to use the DistributedCached file.
> > > > > > >>>>>>
> > > > > > >>>>>> In this case, DistributedCache.CacheType or
> > > > > > DistributedCache.FileType
> > > > > > >>>>>> sounds more intuitive, what do you think?
> > > > > > >>>>>>
> > > > > > >>>>>> Also, I was wondering is there any other useful
> information
> > > for
> > > > > the
> > > > > > >>>>>>
> > > > > > >>>>> cached
> > > > > > >>>>>
> > > > > > >>>>>> file to be passed to runtime.
> > > > > > >>>>>> If we are just talking about including the library to the
> > > > > > classloader,
> > > > > > >>>>>>
> > > > > > >>>>> can
> > > > > > >>>>>
> > > > > > >>>>>> we directly extend the interface with
> > > > > > >>>>>>
> > > > > > >>>>>> public void registerCachedFile(
> > > > > > >>>>>>     String filePath,
> > > > > > >>>>>>     String name,
> > > > > > >>>>>>     boolean executable,
> > > > > > >>>>>>     boolean includeInClassLoader)
> > > > > > >>>>>>
> > > > > > >>>>>>
> > > > > > >>>>>> Thanks,
> > > > > > >>>>>> Rong
> > > > > > >>>>>>
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > > "So you have to trust that the dots will somehow connect in your
> > > future."
> > > > >
> > > >
> > >
> >
> >
> > --
> > "So you have to trust that the dots will somehow connect in your future."
> >
>

Re: [DISCUSS] Adding new interfaces in [Stream]ExecutionEnvironment

Posted by Marvin777 <xy...@gmail.com>.
Hi, Shuyi:

What is the progress of the discussion?  We also look forward to this
feature.
Thanks.

Shuyi Chen <su...@gmail.com> wrote on Fri, Jun 8, 2018 at 3:04 PM:

> Thanks a lot for the comments, Till and Fabian.
>
> The RemoteEnvironment does provide a way to specify jar files at
> construction, but we want the jar files to be specified dynamically in the
> user code, e.g. in a DDL statement, and the jar files might be in a remote
> DFS. As we discussed, I think there are 2 approaches:
>
> 1) add a new interface env.registerJarFile(jarFiles...), which ships the
> JAR files using JobGraph.addJar(). In this case, all jars will be loaded by
> default at runtime. This approach is the same as how the SQL client ships
> UDF jars now.
> 2) add a new interface env.registerJarFile(name, jarFiles...). It will
> work similarly to env.registerCachedFile(), registering a set of Jar files
> under a key name, and we can add a new interface in RuntimeContext as
> Fabian suggests, i.e., RuntimeContext.getClassloaderWithJar(<key name>).
> The user will then be able to load the functions in the remote jar
> dynamically using the returned ClassLoader.
>
> Comparing the 2 approaches:
>
>    - Approach 1) will be simpler for users.
>    - Approach 2) will allow us to use different versions of a class in the
>    same code, and might solve some dependency conflict issues. Also, in 2)
>    we can load Jars on demand, while in 1) all jars will be loaded by
>    default.
>
> I think we can support both interfaces. For the SQL DDL implementation,
> both will work; approach 2) will be more complicated, but with the
> benefits stated above. However, the implementation choice should be
> transparent to the end user. Also, I am wondering whether, outside of the
> SQL DDL, this new functionality/interface would be helpful in other
> scenarios. Maybe that will help make the interface better and more
> generic. Thanks a lot.
>
> Shuyi
>
> On Tue, Jun 5, 2018 at 1:47 AM Fabian Hueske <fh...@gmail.com> wrote:
>
> > We could also offer a feature that lets users request classloaders with
> > additional jars.
> > This could work as follows:
> >
> > 1) Users register jar files in the ExecutionEnvironment (similar to
> > cached files) with a name, e.g.,
> > env.registerJarFile("~/myJar.jar", "myName");
> > 2) In a function, the user can request a user classloader with the
> > additional classes, e.g.,
> > RuntimeContext.getClassloaderWithJar("myName");
> > This could also support loading multiple jar files in the same
> > classloader.
> >
> > IMO, the interesting part of Shuyi's proposal is to be able to
> > dynamically load code from remote locations without fetching it to the
> > client first.
> >
> > Best, Fabian
> >
> >
> > 2018-05-29 12:42 GMT+02:00 Till Rohrmann <tr...@apache.org>:
> >
> > > I see Shuyi's point that it would be nice to allow programmatically
> > > adding jar files which should be part of the user code classloader.
> > > Actually, we expose this functionality in the `RemoteEnvironment`,
> > > where you can specify additional jars which shall be shipped to the
> > > cluster in the constructor. I assume that is exactly the functionality
> > > you are looking for. In that sense, it might be an API inconsistency
> > > that we allow it for some cases and not for others.
> > >
> > > But I could also see that the whole functionality of dynamically
> > > loading jars at runtime could perfectly live in the `UdfSqlOperator`.
> > > This, of course, would entail that one has to take care of cleaning up
> > > the downloaded resources. But it should be possible to first download
> > > the resources and create a custom URLClassLoader at startup and then
> > > use this class loader when calling into the UDF.
> > >
> > > Cheers,
> > > Till
> > >
> > > On Wed, May 16, 2018 at 9:28 PM, Shuyi Chen <su...@gmail.com> wrote:
> > >
> > > > Hi Aljoscha, Fabian, Rong, Ted and Timo,
> > > >
> > > > Thanks a lot for the feedback. Let me clarify the usage scenario in a
> > > > bit more detail. The context is that we want to add support for SQL
> > > > DDL in Flink SQL to load UDFs from external JARs located either in
> > > > the local filesystem, in HDFS, or at an HTTP endpoint. The local FS
> > > > option is mostly for debugging, when the user submits the job jar
> > > > locally, and the latter 2 are for production use. Below is an example
> > > > user application with the *CREATE FUNCTION* DDL (Note: grammar and
> > > > interface not finalized yet).
> > > >
> > > > ------------------------------------------------------------
> > > >
> > > > val env = StreamExecutionEnvironment.getExecutionEnvironment
> > > > val tEnv = TableEnvironment.getTableEnvironment(env)
> > > > // setup the DataStream
> > > > //......
> > > >
> > > > // register the DataStream under the name "OrderA"
> > > > tEnv.registerDataStream("OrderA", orderA, 'user, 'product, 'amount)
> > > > tEnv.sqlDDL(
> > > >   "create function helloFunc as 'com.example.udf.HelloWorld' using jars " +
> > > >   "('hdfs:///users/david/libraries/my-udf-1.0.1-SNAPSHOT.jar')")
> > > > val result = tEnv.sqlQuery(
> > > >   "SELECT user, helloFunc(product), amount FROM OrderA WHERE amount > 2")
> > > > result.toAppendStream[Order].print()
> > > > env.execute()
> > > > ------------------------------------------------------------
> > > >
> > > > The example application above does the following:
> > > > 1) it registers a DataStream as a Calcite table
> > > > (*org.apache.calcite.schema.Table*) under the name "OrderA", so SQL
> > > > can reference the DataStream as table "OrderA".
> > > > 2) it uses the SQL *CREATE FUNCTION* DDL (grammar and interface not
> > > > finalized yet) to create a SQL UDF called *helloFunc* from a JAR
> > > > located in a remote HDFS path.
> > > > 3) it issues a SQL query that uses the *helloFunc* UDF defined above
> > > > and generates a Flink table (*org.apache.flink.table.api.Table*).
> > > > 4) it converts the Flink table back to a DataStream and prints it.
> > > >
> > > > Steps 1), 3), and 4) are already implemented. To implement 2), the
> > > > *tEnv.sqlDDL()* function needs to do the following:
> > > >
> > > > a) parse the DDL into a SqlNode to extract the UDF *udfClasspath*,
> > > > the UDF remote paths *udfUrls[]* and the UDF SQL name *udfName*.
> > > > b) use the URLClassLoader to load the JARs specified in *udfUrls[]*,
> > > > and register the SQL UDF through the {Batch/Stream/}TableEnvironment
> > > > registerFunction methods using *udfClasspath* under the name
> > > > *udfName*.
> > > > c) register the JARs *udfUrls[]* through the
> > > > {Stream}ExecutionEnvironment, so that the JARs can be distributed to
> > > > all the TaskManagers at runtime.
> > > >
> > > >
> > > > Since the CREATE FUNCTION DDL is executed within the user
> > > > application, I don't think we have access to the ClusterClient at the
> > > > point when *tEnv.sqlDDL()* is executed. Also, the JARs can be in a
> > > > remote filesystem (which is the main usage scenario), so the user
> > > > can't really prepare the jar statically in advance.
> > > >
> > > > For a normal user application, I think {Stream}ExecutionEnvironment
> > > > is the right place for the functionality, since it provides methods
> > > > to control the job execution and to interact with the outside world,
> > > > and it already does similar things through the *registerCachedFile*
> > > > interface.
> > > >
> > > > However, in that case, the SQL FUNCTION DDL and the SQL client will
> > > > use 2 different routes to register UDF jars, one through
> > > > *JobGraph.jobConfiguration* and the other through
> > > > *JobGraph.userJars*. So maybe we can, as Fabian suggests, add
> > > > registerUserJarFile()/getUserJarFiles() interfaces in
> > > > {Stream}ExecutionEnvironment, which store the jars internally in a
> > > > List, and when generating the JobGraph, copy the jars to the JobGraph
> > > > using {Stream}ExecutionEnvironment.getUserJarFiles() and
> > > > JobGraph.addJar() (Note: streaming and batch implementations might
> > > > vary). In that case, both the SQL FUNCTION DDL and the SQL client
> > > > will use *JobGraph.userJars* to ship the UDF jars.
> > > >
> > > > Hope that clarifies things. What do you guys think? Thanks a lot.
> > > >
> > > > Cheers!
> > > > Shuyi
> > > >
> > > > --
> > > > "So you have to trust that the dots will somehow connect in your
> > future."
> > > >
> > >
> >
>
>
> --
> "So you have to trust that the dots will somehow connect in your future."
>

Re: [DISCUSS] Adding new interfaces in [Stream]ExecutionEnvironment

Posted by Shuyi Chen <su...@gmail.com>.
Thanks a lot for the comments, Till and Fabian.

The RemoteEnvironment does provide a way to specify jar files at
construction, but we want the jar files to be specified dynamically in the
user code, e.g. in a DDL statement, and the jar files might be in a remote
DFS. As we discussed, I think there are 2 approaches:

1) Add a new interface env.registerJarFile(jarFiles...), which ships the JAR
files using JobGraph.addJar(). In this case, all jars will be loaded by
default at runtime. This is the same way the SQL client ships UDF jars now.
2) Add a new interface env.registerJarFile(name, jarFiles...). Similar to
env.registerCachedFile(), it registers a set of jar files under a key name,
and we can add a new interface in RuntimeContext as Fabian suggests, i.e.,
RuntimeContext.getClassloaderWithJar(<key name>). Users will then be able to
load the functions in the remote jars dynamically using the returned
ClassLoader.

Comparing the 2 approaches:

   - Approach 1) will be simpler for users.
   - Approach 2) will allow us to use different versions of a class in the
   same code, and might solve some dependency conflict issues. Also in 2), we
   can load jars on demand, while in 1) all jars will be loaded by default.
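
To make approach 2) a bit more concrete, here is a minimal sketch in plain
Java. It is only an illustration under assumptions: NamedJarRegistry is a
hypothetical stand-in and not part of Flink, and its two methods merely
mirror the proposed env.registerJarFile(name, jarFiles...) and
RuntimeContext.getClassloaderWithJar(<key name>). It also assumes the jar
URIs use a scheme the JVM has a URL handler for (e.g. file or http); an
hdfs:// path would need Flink's own file system support instead.

```java
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for the proposed API; not part of Flink. */
final class NamedJarRegistry {

    // Jar locations grouped by the key name they were registered under.
    private final Map<String, List<URL>> jarsByName = new ConcurrentHashMap<>();

    /** Mirrors the proposed env.registerJarFile(name, jarFiles...). */
    void registerJarFile(String name, String... jarUris) {
        List<URL> urls = new ArrayList<>();
        for (String uri : jarUris) {
            try {
                urls.add(URI.create(uri).toURL());
            } catch (MalformedURLException e) {
                throw new IllegalArgumentException("No URL handler for: " + uri, e);
            }
        }
        jarsByName.put(name, urls);
    }

    /** Mirrors the proposed RuntimeContext.getClassloaderWithJar(name). */
    URLClassLoader getClassloaderWithJar(String name) {
        List<URL> urls = jarsByName.get(name);
        if (urls == null) {
            throw new IllegalArgumentException("No jars registered under: " + name);
        }
        // The loader is created only when requested, i.e. jars are loaded on demand.
        return new URLClassLoader(
                urls.toArray(new URL[0]), NamedJarRegistry.class.getClassLoader());
    }

    public static void main(String[] args) throws Exception {
        NamedJarRegistry registry = new NamedJarRegistry();
        // The jar paths below are made up for illustration.
        registry.registerJarFile("myName", "file:///tmp/udf-a.jar", "file:///tmp/udf-b.jar");
        try (URLClassLoader loader = registry.getClassloaderWithJar("myName")) {
            System.out.println(loader.getURLs().length + " jar(s) registered under myName");
        }
    }
}
```

Since every call returns a separate URLClassLoader, two key names could point
to different versions of the same class without conflicting, which is the
benefit mentioned for approach 2).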

I think we can support both interfaces. For the SQL DDL implementation, both
will work; approach 2) will be more complicated, but comes with the nice
benefits stated above. However, the implementation choice should be
transparent to the end user. Also, I am wondering whether these new
interfaces would be helpful in other scenarios outside of SQL DDL. Knowing
that might help make the interface better and more generic. Thanks a
lot.

Shuyi

On Tue, Jun 5, 2018 at 1:47 AM Fabian Hueske <fh...@gmail.com> wrote:

> We could also offer a feature that users can request classloaders with
> additional jars.
> This could work as follows:
>
> 1) Users register jar files in the ExecutionEnvironment (similar to cached
> files) with a name, e.g., env.registerJarFile("~/myJar.jar", "myName");
> 2) In a function, the user can request a user classloader with the
> additional classes, e.g., RuntimeContext.getClassloaderWithJar("myName");
> This could also support to load multiple jar files in the same classloader.
>
> IMO, the interesting part of Shuyi's proposal is to be able to dynamically
> load code from remote locations without fetching it to the client first.
>
> Best, Fabian


-- 
"So you have to trust that the dots will somehow connect in your future."

Re: [DISCUSS] Adding new interfaces in [Stream]ExecutionEnvironment

Posted by Fabian Hueske <fh...@gmail.com>.
We could also offer a feature that lets users request classloaders with
additional jars.
This could work as follows:

1) Users register jar files in the ExecutionEnvironment (similar to cached
files) with a name, e.g., env.registerJarFile("~/myJar.jar", "myName");
2) In a function, the user can request a user classloader with the
additional classes, e.g., RuntimeContext.getClassloaderWithJar("myName");
This could also support loading multiple jar files in the same classloader.

IMO, the interesting part of Shuyi's proposal is to be able to dynamically
load code from remote locations without fetching it to the client first.

Best, Fabian


2018-05-29 12:42 GMT+02:00 Till Rohrmann <tr...@apache.org>:

> I see Shuyi's point that it would be nice to allow adding jar files which
> should be part of the user code classloader programmatically. Actually, we
> expose this functionality in the `RemoteEnvironment` where you can specify
> additional jars which shall be shipped to the cluster in the constructor. I
> assume that is exactly the functionality you are looking for. In that
> sense, it might be an API inconsistency that we allow it for some cases and
> for others not.
>
> But I could also see that the whole functionality of dynamically loading
> jars at runtime could also perfectly live in the `UdfSqlOperator`. This, of
> course, would entail that one has to take care of clean up of the
> downloaded resources. But it should be possible to first download the
> resources and create a custom URLClassLoader at startup and then use this
> class loader when calling into the UDF.
>
> Cheers,
> Till

Re: [DISCUSS] Adding new interfaces in [Stream]ExecutionEnvironment

Posted by Till Rohrmann <tr...@apache.org>.
I see Shuyi's point that it would be nice to allow programmatically adding
jar files that should be part of the user code classloader. Actually, we
expose this functionality in the `RemoteEnvironment`, where you can specify
additional jars in the constructor which shall be shipped to the cluster. I
assume that is exactly the functionality you are looking for. In that
sense, it might be an API inconsistency that we allow it in some cases and
not in others.

But I could also see the whole functionality of dynamically loading
jars at runtime living entirely in the `UdfSqlOperator`. This, of
course, would entail that one has to take care of cleaning up the
downloaded resources. But it should be possible to first download the
resources and create a custom URLClassLoader at startup and then use this
class loader when calling into the UDF.
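
A rough sketch of that download, load and clean-up cycle, using only the JDK.
DownloadedJarLoader is a hypothetical helper (not an existing Flink class)
that an operator could create at startup and close on shutdown; where exactly
those hooks would live in the `UdfSqlOperator` is an assumption.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical helper: copies remote jars locally and exposes them via a URLClassLoader. */
final class DownloadedJarLoader implements AutoCloseable {
    private final List<Path> localCopies = new ArrayList<>();
    private final URLClassLoader classLoader;

    DownloadedJarLoader(List<URL> remoteJars, ClassLoader parent) {
        try {
            List<URL> localUrls = new ArrayList<>();
            for (URL remote : remoteJars) {
                Path local = Files.createTempFile("udf-", ".jar");
                localCopies.add(local);
                try (InputStream in = remote.openStream()) {
                    Files.copy(in, local, StandardCopyOption.REPLACE_EXISTING);
                }
                localUrls.add(local.toUri().toURL());
            }
            classLoader = new URLClassLoader(localUrls.toArray(new URL[0]), parent);
        } catch (IOException e) {
            deleteLocalCopies(); // do not leave partial downloads behind
            throw new UncheckedIOException(e);
        }
    }

    /** The class loader to use when calling into the UDF. */
    ClassLoader classLoader() {
        return classLoader;
    }

    /** Snapshot of the downloaded files, mainly useful for inspection. */
    List<Path> localCopies() {
        return new ArrayList<>(localCopies);
    }

    /** Closes the class loader first, then removes the downloaded files. */
    @Override
    public void close() {
        try {
            classLoader.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            deleteLocalCopies();
        }
    }

    private void deleteLocalCopies() {
        for (Path p : localCopies) {
            try {
                Files.deleteIfExists(p);
            } catch (IOException ignored) {
                // best effort: a leftover temp file is not fatal
            }
        }
        localCopies.clear();
    }
}

final class DownloadedJarLoaderDemo {
    /** Returns how many downloaded copies are still on disk after close(). */
    static int copiesLeftAfterClose() {
        try {
            // A local temp file stands in for the remote jar in this demo.
            Path fakeRemote = Files.createTempFile("remote-", ".jar");
            try {
                DownloadedJarLoader loader = new DownloadedJarLoader(
                        Collections.singletonList(fakeRemote.toUri().toURL()),
                        ClassLoader.getSystemClassLoader());
                List<Path> copies = loader.localCopies();
                loader.close();
                int left = 0;
                for (Path p : copies) {
                    if (Files.exists(p)) {
                        left++;
                    }
                }
                return left;
            } finally {
                Files.deleteIfExists(fakeRemote);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("copies left after close: " + copiesLeftAfterClose());
    }
}
```

Closing the class loader before deleting the files matters on platforms that
keep opened jars locked.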

Cheers,
Till

On Wed, May 16, 2018 at 9:28 PM, Shuyi Chen <su...@gmail.com> wrote:

> Hi Aljoscha, Fabian, Rong, Ted and Timo,
>
> Thanks a lot for the feedback. Let me clarify the usage scenario in a bit
> more detail. The context is that we want to add support in Flink SQL for
> SQL DDL that loads UDFs from external JARs located in the local filesystem,
> in HDFS, or at an HTTP endpoint. The local filesystem option is mainly for
> debugging, when the user submits the job jar locally; the latter two are
> for production use. Below is an example user application with the
> *CREATE FUNCTION* DDL (note: grammar and interface are not finalized yet).
>
> ---------------------------------------------------------------------------
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val tEnv = TableEnvironment.getTableEnvironment(env)
>
> // setup the DataStream
> //......
>
> // register the DataStream under the name "OrderA"
> tEnv.registerDataStream("OrderA", orderA, 'user, 'product, 'amount)
>
> tEnv.sqlDDL(
>   "create function helloFunc as 'com.example.udf.HelloWorld' using jars " +
>   "('hdfs:///users/david/libraries/my-udf-1.0.1-SNAPSHOT.jar')")
>
> val result = tEnv.sqlQuery(
>   "SELECT user, helloFunc(product), amount FROM OrderA WHERE amount > 2")
>
> result.toAppendStream[Order].print()
>
> env.execute()
> ---------------------------------------------------------------------------
>
> The example application above does the following:
> 1) It registers a DataStream as a Calcite table
> (*org.apache.calcite.schema.Table*) under the name "OrderA", so SQL can
> reference the DataStream as table "OrderA".
> 2) It uses the SQL *CREATE FUNCTION* DDL (grammar and interface not
> finalized yet) to create a SQL UDF called *helloFunc* from a JAR located at
> a remote HDFS path.
> 3) It issues a SQL query that uses the *helloFunc* UDF defined above and
> generates a Flink table (*org.apache.flink.table.api.Table*).
> 4) It converts the Flink table back to a DataStream and prints it.
>
> Steps 1), 3), and 4) are already implemented. To implement 2), the
> *tEnv.sqlDDL()* function needs to do the following:
>
> a) Parse the DDL into a SqlNode to extract the UDF class *udfClasspath*,
> the UDF remote paths *udfUrls[]*, and the UDF SQL name *udfName*.
> b) Use a URLClassLoader to load the JARs specified in *udfUrls[]*, and
> register the SQL UDF *udfClasspath* under the name *udfName* using the
> registerFunction methods of the {Batch/Stream/}TableEnvironment.
> c) Register the JARs *udfUrls[]* through the {Stream}ExecutionEnvironment,
> so that the JARs can be distributed to all the TaskManagers at runtime.
>
>
> Since the CREATE FUNCTION DDL is executed within the user application, I
> don't think we have access to the ClusterClient at the point when
> *tEnv.sqlDDL()* is executed. Also, the JARs can be in a remote filesystem
> (which is the main usage scenario), so the user can't really prepare the
> jar statically in advance.
>
> For a normal user application, I think {Stream}ExecutionEnvironment is the
> right place for the functionality, since it provides methods to control the
> job execution and to interact with the outside world, and it already does
> something similar through the *registerCachedFile* interface.
>
> However, in that case, the SQL FUNCTION DDL and the SQL Client would use
> two different routes to register UDF jars, one through
> *JobGraph.jobConfiguration* and the other through *JobGraph.userJars*. So
> maybe we can, as Fabian suggests, *add
> registerUserJarFile()/getUserJarFiles() interfaces to
> {Stream}ExecutionEnvironment, which store the jars internally in a List,
> and, when generating the JobGraph, copy the jars to the JobGraph using
> {Stream}ExecutionEnvironment.getUserJarFiles() and JobGraph.addJar()*
> (note: streaming and batch implementations might vary). In that case, both
> the SQL FUNCTION DDL and the SQL Client will use *JobGraph.userJars* to
> ship the UDF jars.
>
> Hope that clarifies things. What do you guys think? Thanks a lot.
>
> Cheers!
> Shuyi

Re: [DISCUSS] Adding new interfaces in [Stream]ExecutionEnvironment

Posted by Shuyi Chen <su...@gmail.com>.
Hi Aljoscha, Fabian, Rong, Ted and Timo,

Thanks a lot for the feedback. Let me clarify the usage scenario in a bit
more detail. The context is that we want to add support in Flink SQL for
SQL DDL that loads UDFs from external JARs located in the local filesystem,
in HDFS, or at an HTTP endpoint. The local filesystem option is mainly for
debugging, when the user submits the job jar locally; the latter two are
for production use. Below is an example user application with the
*CREATE FUNCTION* DDL (note: grammar and interface are not finalized yet).

-----------------------------------------------------------------------------
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)

// setup the DataStream
//......

// register the DataStream under the name "OrderA"
tEnv.registerDataStream("OrderA", orderA, 'user, 'product, 'amount)

tEnv.sqlDDL(
  "create function helloFunc as 'com.example.udf.HelloWorld' using jars " +
  "('hdfs:///users/david/libraries/my-udf-1.0.1-SNAPSHOT.jar')")

val result = tEnv.sqlQuery(
  "SELECT user, helloFunc(product), amount FROM OrderA WHERE amount > 2")

result.toAppendStream[Order].print()

env.execute()
-----------------------------------------------------------------------------

The example application above does the following:
1) It registers a DataStream as a Calcite table
(*org.apache.calcite.schema.Table*) under the name "OrderA", so SQL can
reference the DataStream as table "OrderA".
2) It uses the SQL *CREATE FUNCTION* DDL (grammar and interface not
finalized yet) to create a SQL UDF called *helloFunc* from a JAR located at
a remote HDFS path.
3) It issues a SQL query that uses the *helloFunc* UDF defined above and
generates a Flink table (*org.apache.flink.table.api.Table*).
4) It converts the Flink table back to a DataStream and prints it.

Steps 1), 3), and 4) are already implemented. To implement 2), we need the
*tEnv.sqlDDL()* function to do the following:

a) Parse the DDL into a SqlNode to extract the UDF class *udfClasspath*,
the UDF remote paths *udfUrls[]*, and the UDF SQL name *udfName*.
b) Use a URLClassLoader to load the JARs specified in *udfUrls[]*, and
register the SQL UDF *udfClasspath* under the name *udfName* using the
{Batch/Stream/}TableEnvironment registerFunction methods.
c) Register the JARs *udfUrls[]* through the {Stream}ExecutionEnvironment,
so that the JARs can be distributed to all the TaskManagers at runtime.
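
As a rough illustration of step b), here is a minimal Java sketch of loading
a UDF class from a set of JAR URLs with a URLClassLoader. The class name
UdfLoader and the method loadUdfClass are hypothetical and not part of Flink;
the sketch only covers class loading, not the registerFunction call. It also
assumes the URLs use a scheme the JVM can open (e.g. file: or http:); an
hdfs: URL would need a registered URL stream handler or a prior download.

```java
import java.net.URL;
import java.net.URLClassLoader;

// Hypothetical helper, not a Flink class: loads a UDF class from the given JARs.
final class UdfLoader {
    private UdfLoader() {}

    // udfClasspath: fully qualified class name, e.g. "com.example.udf.HelloWorld"
    // udfUrls:      locations of the JARs that contain the UDF and its dependencies
    static Class<?> loadUdfClass(String udfClasspath, URL[] udfUrls)
            throws ClassNotFoundException {
        // Delegate to the current context classloader first, then to the JARs.
        ClassLoader parent = Thread.currentThread().getContextClassLoader();
        // The loader is deliberately left open: classes it defines may still
        // load further classes lazily while the UDF is in use.
        URLClassLoader loader = new URLClassLoader(udfUrls, parent);
        return Class.forName(udfClasspath, true, loader);
    }
}
```

The returned class could then be instantiated and passed to one of the
registerFunction methods under *udfName*.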


Since the CREATE FUNCTION DDL is executed within the user application, I
don't think we have access to the ClusterClient at the point when
*tEnv.sqlDDL()* is executed. Also, the JARs can be in a remote filesystem
(which is the main usage scenario), so the user can't really prepare the
jars statically in advance.

For a normal user application, I think {Stream}ExecutionEnvironment is the
right place for this functionality, since it provides methods to control the
job execution and to interact with the outside world, and it already does
something similar through the *registerCachedFile* interface.

However, in that case, the SQL FUNCTION DDL and the SQL client will use two
different routes to register UDF jars, one through
*JobGraph.jobConfiguration* and the other through *JobGraph.userJars*. So
*maybe we can, as Fabian suggests, add
registerUserJarFile()/getUserJarFiles() interfaces to
{Stream}ExecutionEnvironment, which stores the jars internally in a List,
and when generating the JobGraph, copy the jars to the JobGraph using
{Stream}ExecutionEnvironment.getUserJarFiles() and JobGraph.addJar()*
(note: streaming and batch implementations might vary). In that case, both
the SQL FUNCTION DDL and the SQL client will use *JobGraph.userJars* to ship
the UDF jars.
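
To make the idea concrete, here is a minimal, self-contained Java sketch of
the registerUserJarFile()/getUserJarFiles() proposal. SketchEnvironment and
SketchJobGraph are hypothetical stand-ins for {Stream}ExecutionEnvironment
and JobGraph, written only for illustration; the real classes and the final
method signatures may well look different.

```java
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for JobGraph; it only models the user-jar list.
final class SketchJobGraph {
    private final List<URI> userJars = new ArrayList<>();

    // Adds a jar that must be on the user code classloader; duplicates are ignored.
    void addJar(URI jar) {
        if (!userJars.contains(jar)) {
            userJars.add(jar);
        }
    }

    List<URI> getUserJars() {
        return Collections.unmodifiableList(userJars);
    }
}

// Hypothetical stand-in for {Stream}ExecutionEnvironment.
final class SketchEnvironment {
    // Jars registered by the program, kept internally in a List.
    private final List<URI> userJarFiles = new ArrayList<>();

    void registerUserJarFile(String jarPath) {
        userJarFiles.add(URI.create(jarPath));
    }

    List<URI> getUserJarFiles() {
        return Collections.unmodifiableList(userJarFiles);
    }

    // When the job graph is generated, copy the registered jars into it.
    SketchJobGraph createJobGraph() {
        SketchJobGraph jobGraph = new SketchJobGraph();
        for (URI jar : getUserJarFiles()) {
            jobGraph.addJar(jar);
        }
        return jobGraph;
    }
}
```

With this shape, *tEnv.sqlDDL()* would only need to call
registerUserJarFile() for each entry in *udfUrls[]*, and the jars would reach
the job graph through the same user-jar list that the SQL client uses.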

I hope that clarifies things. What do you think? Thanks a lot.

Cheers!
Shuyi

On Wed, May 16, 2018 at 9:45 AM, Rong Rong <wa...@gmail.com> wrote:

> I think the question here is whether registering Jar files (or other
> executable files) during job submission is sufficient for @shuyi's use
> case.
>
> If I understand correctly, the dynamic distribution of external libraries
> at runtime is used to deal with DDL/DSL such as:
>     CREATE FUNCTION my_fun FROM url://<some_remote_jar>
> during execution. Correct me if I am wrong, @shuyi: the basic assumption
> that "we can locate and ship all executable JARs during job submission" no
> longer holds for your use case, right?
>
> I guess we are missing details here regarding the "distribution of external
> libraries in runtime" part. Maybe you can share more examples of this use
> case. Would this be included in the design doc, @Timo?
>
> --
> Rong



-- 
"So you have to trust that the dots will somehow connect in your future."

Re: [DISCUSS] Adding new interfaces in [Stream]ExecutionEnvironment

Posted by Rong Rong <wa...@gmail.com>.
I think the question here is whether registering Jar files (or other
executable files) during job submission is sufficient for @shuyi's use
case.

If I understand correctly, the dynamic distribution of external libraries
at runtime is used to deal with DDL/DSL such as:
    CREATE FUNCTION my_fun FROM url://<some_remote_jar>
during execution. Correct me if I am wrong, @shuyi: the basic assumption
that "we can locate and ship all executable JARs during job submission" no
longer holds for your use case, right?

I guess we are missing details here regarding the "distribution of external
libraries in runtime" part. Maybe you can share more examples of this use
case. Would this be included in the design doc, @Timo?

--
Rong

On Wed, May 16, 2018 at 5:41 AM, Timo Walther <tw...@apache.org> wrote:

> Yes, we are using the addJar functionality of the JobGraph as well for
> the SQL Client.
>
> I think the execution environment is not the right place to specify jars.
> The location of the jars depends on the submission method. If a local path
> is specified in the main() method of a packaged Flink jar, it would not
> work when such a program is submitted through the REST API.
>
> Regards,
> Timo

Re: [DISCUSS] Adding new interfaces in [Stream]ExecutionEnvironment

Posted by Timo Walther <tw...@apache.org>.
Yes, we are using the addJar functionality of the JobGraph as well
for the SQL Client.

I think the execution environment is not the right place to specify 
jars. The location of the jars depends on the submission method. If a 
local path is specified in the main() method of a packaged Flink jar, it 
would not work when such a program is submitted through the REST API.
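
To illustrate the concern, here is a small Java sketch that tells a
client-local jar path apart from one on a shared or remote filesystem by
looking at its URI scheme. The class JarLocations and the method
isClientLocal are hypothetical and only meant as an illustration; they are
not part of Flink, and the sketch assumes URI-style paths.

```java
import java.net.URI;

// Hypothetical helper: classifies a jar path by its URI scheme.
final class JarLocations {
    private JarLocations() {}

    // Returns true if the path has no scheme or the "file" scheme, i.e. it
    // refers to the filesystem of whichever machine runs main().
    static boolean isClientLocal(String jarPath) {
        String scheme = URI.create(jarPath).getScheme();
        return scheme == null || scheme.equalsIgnoreCase("file");
    }
}
```

A path for which isClientLocal() returns true is only meaningful on the
machine where main() happens to run, which is exactly what differs between
submission methods.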

Regards,
Timo

On 16.05.18 at 14:32, Aljoscha Krettek wrote:
> I think this functionality is already there; we just have to expose it in the right places: ClusterClient.submitJob() takes a JobGraph, and JobGraph has an addJar() method for adding jars that need to be in the classloader for executing a user program.



Re: [DISCUSS] Adding new interfaces in [Stream]ExecutionEnvironment

Posted by Aljoscha Krettek <al...@apache.org>.
I think this functionality is already there; we just have to expose it in the right places: ClusterClient.submitJob() takes a JobGraph, and JobGraph has an addJar() method for adding jars that need to be in the classloader for executing a user program.

> On 16. May 2018, at 12:34, Fabian Hueske <fh...@gmail.com> wrote:
> 
> Hi Ted,
> 
> The design doc is in late draft status and proposes support for SQL DDL
> statements (CREATE TABLE, CREATE FUNCTION, etc.).
> The question about registering JARs came up because we need a way to
> distribute JAR files that contain the code of user-defined functions.
> 
> The design doc will soon be shared on the dev mailing list to gather
> feedback from the community.
> 
> Best, Fabian


Re: [DISCUSS] Adding new interfaces in [Stream]ExecutionEnvironment

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Ted,

The design doc is in late draft status and proposes support for SQL DDL
statements (CREATE TABLE, CREATE FUNCTION, etc.).
The question about registering JARs came up because we need a way to
distribute JAR files that contain the code of user-defined functions.

The design doc will soon be shared on the dev mailing list to gather
feedback from the community.

Best, Fabian

2018-05-16 10:45 GMT+02:00 Ted Yu <yu...@gmail.com>:

> bq. In a design document, Timo mentioned that we can ship multiple JAR
> files
>
> Mind telling us where the design doc can be retrieved?
>
> Thanks

Re: [DISCUSS] Adding new interfaces in [Stream]ExecutionEnvironment

Posted by Ted Yu <yu...@gmail.com>.
bq. In a design document, Timo mentioned that we can ship multiple JAR files

Mind telling us where the design doc can be retrieved?

Thanks

On Wed, May 16, 2018 at 1:29 AM, Fabian Hueske <fh...@gmail.com> wrote:

> Hi,
>
> I'm not sure if we need to modify the existing method.
> What we need is a bit different from what registerCachedFile() provides.
> The method ensures that a file is copied to each TaskManager and can be
> locally accessed from a function's RuntimeContext.
> In our case, we don't need to access the file but would like to make sure
> that it is loaded into the class loader.
> So, we could also just add a method like registerUserJarFile().
>
> In a design document, Timo mentioned that we can ship multiple JAR files
> with a job.
> So, we could also implement the UDF shipping logic by loading the JAR
> file(s) to the client and distributing them from there.
> In that case, we would not need to add a new method to the execution
> environment.
>
> Best,
> Fabian

Re: [DISCUSS] Adding new interfaces in [Stream]ExecutionEnvironment

Posted by Fabian Hueske <fh...@gmail.com>.
Hi,

I'm not sure if we need to modify the existing method.
What we need is a bit different from what registerCachedFile() provides.
The method ensures that a file is copied to each TaskManager and can be
locally accessed from a function's RuntimeContext.
In our case, we don't need to access the file but would like to make sure
that it is loaded into the class loader.
So, we could also just add a method like registerUserJarFile().
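
As a rough illustration of what "loaded into the class loader" could mean,
here is a minimal sketch using only the JDK. It is not Flink code:
UserJarRegistry and buildUserCodeClassLoader() are hypothetical names, and
registerUserJarFile() is only the method name floated above. How Flink would
actually ship the JARs to the TaskManagers is left out entirely.

```java
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (assumption): collects JAR paths for the user code class loader. */
class UserJarRegistry {
    private final List<URL> jarUrls = new ArrayList<>();

    /** Records a local JAR path; the file is not opened or validated here. */
    void registerUserJarFile(String localPath) {
        try {
            jarUrls.add(Paths.get(localPath).toUri().toURL());
        } catch (MalformedURLException e) {
            throw new IllegalArgumentException("Not a usable path: " + localPath, e);
        }
    }

    /** Builds a class loader that can resolve classes from all registered JARs. */
    URLClassLoader buildUserCodeClassLoader(ClassLoader parent) {
        return new URLClassLoader(jarUrls.toArray(new URL[0]), parent);
    }
}
```

The point of the sketch is that the function never needs a local file handle
to the JAR; it only needs the JAR to be on the class loader's search path,
which is a different contract from the one registerCachedFile() offers.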

In a design document, Timo mentioned that we can ship multiple JAR files
with a job.
So, we could also implement the UDF shipping logic by loading the JAR
file(s) to the client and distributing them from there.
In that case, we would not need to add a new method to the execution
environment.

Best,
Fabian

2018-05-15 3:50 GMT+02:00 Rong Rong <wa...@gmail.com>:

> +1. This could be very useful for "dynamic" UDF.
>
> Just to clarify, if I understand correctly, we are trying to use an ENUM
> indicator to
> (1) Replace the current Boolean isExecutable flag.
> (2) Provide additional information used by ExecutionEnvironment to decide
> when/where to use the DistributedCache file.
>
> In this case, DistributedCache.CacheType or DistributedCache.FileType
> sounds more intuitive. What do you think?
>
> Also, I was wondering whether there is any other useful information about
> the cached file that should be passed to the runtime.
> If we are just talking about including the library in the classloader, can
> we directly extend the interface with
>
> public void registerCachedFile(
>     String filePath,
>     String name,
>     boolean executable,
>     boolean includeInClassLoader)
>
>
> Thanks,
> Rong

Re: [DISCUSS] Adding new interfaces in [Stream]ExecutionEnvironment

Posted by Rong Rong <wa...@gmail.com>.
+1. This could be very useful for "dynamic" UDF.

Just to clarify, if I understand correctly, we are trying to use an ENUM
indicator to
(1) Replace the current Boolean isExecutable flag.
(2) Provide additional information used by ExecutionEnvironment to decide
when/where to use the DistributedCache file.

In this case, DistributedCache.CacheType or DistributedCache.FileType
sounds more intuitive. What do you think?

Also, I was wondering whether there is any other useful information about
the cached file that should be passed to the runtime.
If we are just talking about including the library in the classloader, can
we directly extend the interface with

public void registerCachedFile(
    String filePath,
    String name,
    boolean executable,
    boolean includeInClassLoader)
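
For comparison, the enum-based alternative might look roughly like the sketch
below. None of this is Flink's API: FileType, its constants, and
CachedFileRegistry are hypothetical names, and the registry only records
registrations rather than distributing anything. It is meant to show how a
single enum can carry both flags and how the existing boolean overload could
delegate to it.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical enum (assumption): one value per kind of cached file. */
enum FileType {
    PLAIN(false, false),
    EXECUTABLE(true, false),
    LIBRARY(false, true);

    final boolean executable;
    final boolean includeInClassLoader;

    FileType(boolean executable, boolean includeInClassLoader) {
        this.executable = executable;
        this.includeInClassLoader = includeInClassLoader;
    }
}

/** Toy stand-in for the execution environment; it only remembers what was registered. */
class CachedFileRegistry {
    private final Map<String, FileType> typesByName = new LinkedHashMap<>();

    /** Enum-based registration, analogous to the proposed three-argument method. */
    void registerCachedFile(String filePath, String name, FileType type) {
        typesByName.put(name, type);
    }

    /** Legacy boolean overload, kept for compatibility and mapped onto the enum. */
    @Deprecated
    void registerCachedFile(String filePath, String name, boolean executable) {
        registerCachedFile(filePath, name, executable ? FileType.EXECUTABLE : FileType.PLAIN);
    }

    /** Returns the recorded type, or null if the name was never registered. */
    FileType typeOf(String name) {
        return typesByName.get(name);
    }
}
```

With two booleans, a caller can express combinations that may not make sense
(for example, an executable that is also a class path entry); an enum limits
callers to the combinations that are explicitly listed, at the cost of a new
constant for every new kind of file.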


Thanks,
Rong

On Sun, May 13, 2018 at 11:14 PM, Shuyi Chen <su...@gmail.com> wrote:

> Hi Flink devs,
>
> In an effort to support loading external libraries and creating UDFs from
> external libraries using DDL in Flink SQL, we want to use Flink’s Blob
> Server to distribute the external libraries at runtime and load those
> libraries into the user code classloader automatically.
>
> However, the current [Stream]ExecutionEnvironment.registerCachedFile
> interface only supports registering executable or non-executable blobs.
> It’s not possible to tell at runtime whether the blob files are libraries
> that should be loaded into the user code classloader in RuntimeContext.
> Therefore, I propose adding an enum called *BlobType* to explicitly
> indicate the type of the Blob file being distributed, and the following
> interface in [Stream]ExecutionEnvironment to support it. In general, I
> think the new BlobType information can be used by the Flink runtime to
> preprocess the Blob files if needed.
>
> /**
>  * Registers a file at the distributed cache under the given name. The file
>  * will be accessible from any user-defined function in the (distributed)
>  * runtime under a local path. Files may be local files (as long as all
>  * relevant workers have access to it), or files in a distributed file system.
>  * The runtime will copy the files temporarily to a local cache, if needed.
>  *
>  * <p>The {@link org.apache.flink.api.common.functions.RuntimeContext} can be
>  * obtained inside UDFs via
>  * {@link org.apache.flink.api.common.functions.RichFunction#getRuntimeContext()}
>  * and provides access
>  * {@link org.apache.flink.api.common.cache.DistributedCache} via
>  * {@link org.apache.flink.api.common.functions.RuntimeContext#getDistributedCache()}.
>  *
>  * @param filePath The path of the file, as a URI (e.g. "file:///some/path"
>  *                 or "hdfs://host:port/and/path")
>  * @param name The name under which the file is registered.
>  * @param blobType indicating the type of the Blob file
>  */
>
> public void registerCachedFile(String filePath, String name,
> DistributedCache.BlobType blobType) {...}
>
> Optionally, we can add another interface to register UDF JARs, implemented
> on top of the interface above.
>
> public void registerJarFile(String filePath, String name) {...}
>
> The following existing interface will be marked deprecated:
>
> public void registerCachedFile(String filePath, String name, boolean
> executable) {...}
>
> And the following interface will be implemented using the new interface
> proposed above with an EXECUTABLE BlobType:
>
> public void registerCachedFile(String filePath, String name) { ... }
>
> Thanks a lot.
> Shuyi
>
> "So you have to trust that the dots will somehow connect in your future."
>