Posted to issues@flink.apache.org by zohar-pm <gi...@git.apache.org> on 2017/04/19 13:03:34 UTC

[GitHub] flink pull request #3741: [FLINK-6177] Add support for "Distributed Cache" i...

GitHub user zohar-pm opened a pull request:

    https://github.com/apache/flink/pull/3741

    [FLINK-6177] Add support for "Distributed Cache" in streaming applications
    
    Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration.
    If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html).
    In addition to going through the list, please provide a meaningful description of your changes.
    
    - [ ] General
      - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
      - The pull request addresses only one issue
      - Each commit in the PR has a meaningful commit message (including the JIRA id)
    
    - [ ] Documentation
      - Documentation has been added for new functionality
      - Old documentation affected by the pull request has been updated
      - JavaDoc for public methods has been added
    
    - [ ] Tests & Build
      - Functionality added by the pull request is covered by tests
      - `mvn clean verify` has been executed successfully locally or a Travis build has passed


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zohar-pm/flink python-streaming-distributed-cache

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3741.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3741
    
----
commit 03684e8e460143013babb2ec88c66c8fa1119c43
Author: Zohar Mizrahi <zo...@parallelmachines.com>
Date:   2017-04-09T09:11:57Z

    [FLINK-6177] Add support for "Distributed Cache" in streaming applications

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #3741: [FLINK-6177] Add support for "Distributed Cache" in strea...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/3741
  
    Looks quite good now.
    
    If I can ask you for one more followup: To have faster tests, it would be good to add the streaming distributed cache test and the batch distributed cache test to the same file.
    
    Can you change the `DistributedCacheTest` to extend `StreamingMultipleProgramsTestBase` and put your test in there as well? That will cause only one distributed mini cluster to be spawned, which typically saves 1-2 secs in tests. May not seem much, but it adds up over the 1000s of tests Flink has by now...



[GitHub] flink issue #3741: [FLINK-6177] Add support for "Distributed Cache" in strea...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/3741
  
    Thanks for contributing this, the added functionality looks good.
    
    I would prefer to add this change without changing the dependencies and test base classes. You could for example change the test to throw an exception in the "validator function" if the word is not in the cache file. That way you do not need to "collect back" the data.
    
    Minor comment: Generating the input from a collection rather than a file makes the tests usually a bit more lightweight. In all newer tests, we try to do that.
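
A minimal sketch of the two suggestions above, written in plain Java with no Flink dependency so that it stays self-contained. The names `CacheWordValidator` and `writeTempCacheFile` are hypothetical and not part of the PR. In the actual test this logic would presumably live inside a rich function, with the file obtained through `getRuntimeContext().getDistributedCache()` as described in the PR's Javadoc; here a temporary file stands in for the registered cache file and an in-memory list stands in for the collection-based input.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical stand-in for the test's "validator function" (not the PR's code). */
final class CacheWordValidator {

    private final Set<String> cachedWords;

    /** Loads the cache file once, much as a rich function might do in open(). */
    CacheWordValidator(Path cacheFile) {
        try {
            this.cachedWords = new HashSet<>(Files.readAllLines(cacheFile));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Returns the word unchanged, or fails the job if it is not in the cache file. */
    String validate(String word) {
        if (!cachedWords.contains(word)) {
            throw new IllegalStateException("Word not in cache file: " + word);
        }
        return word;
    }

    /** Writes one word per line to a temp file; assumed substitute for the cached file. */
    static Path writeTempCacheFile(List<String> words) {
        try {
            Path file = Files.createTempFile("cache-words", ".txt");
            file.toFile().deleteOnExit();
            return Files.write(file, words);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Input comes from an in-memory collection rather than an input file.
        List<String> input = Arrays.asList("to", "be", "or", "not");
        CacheWordValidator validator = new CacheWordValidator(writeTempCacheFile(input));
        for (String word : input) {
            validator.validate(word); // throws if a word is missing from the cache file
        }
        System.out.println("all words found in cache file");
    }
}
```

Because a missing word raises an exception, the test fails on its own and no results need to be collected back for comparison.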
    




[GitHub] flink pull request #3741: [FLINK-6177] Add support for "Distributed Cache" i...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/3741



[GitHub] flink issue #3741: [FLINK-6177] Add support for "Distributed Cache" in strea...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/3741
  
    merging. will add the missing space while I'm doing it.



[GitHub] flink issue #3741: [FLINK-6177] Add support for "Distributed Cache" in strea...

Posted by zohar-pm <gi...@git.apache.org>.
Github user zohar-pm commented on the issue:

    https://github.com/apache/flink/pull/3741
  
    No problem. Will follow it and will create a new pull request.



[GitHub] flink issue #3741: [FLINK-6177] Add support for "Distributed Cache" in strea...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/3741
  
    @zohar-pm You don't have to open a new one, feel free to update the branch in this one.



[GitHub] flink pull request #3741: [FLINK-6177] Add support for "Distributed Cache" i...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3741#discussion_r114503248
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java ---
    @@ -1776,8 +1787,45 @@ public static void setDefaultLocalParallelism(int parallelism) {
     	protected static void initializeContextEnvironment(StreamExecutionEnvironmentFactory ctx) {
     		contextEnvironmentFactory = ctx;
     	}
    -	
    +
     	protected static void resetContextEnvironment() {
     		contextEnvironmentFactory = null;
     	}
    +
    +	/**
    +	 * Registers a file at the distributed cache under the given name. The file will be accessible
    +	 * from any user-defined function in the (distributed) runtime under a local path. Files
    +	 * may be local files (as long as all relevant workers have access to it), or files in a distributed file system.
    +	 * The runtime will copy the files temporarily to a local cache, if needed.
    +	 * <p>
    +	 * The {@link org.apache.flink.api.common.functions.RuntimeContext} can be obtained inside UDFs via
    +	 * {@link org.apache.flink.api.common.functions.RichFunction#getRuntimeContext()} and provides access
    +	 * {@link org.apache.flink.api.common.cache.DistributedCache} via
    +	 * {@link org.apache.flink.api.common.functions.RuntimeContext#getDistributedCache()}.
    +	 *
    +	 * @param filePath The path of the file, as a URI (e.g. "file:///some/path" or "hdfs://host:port/and/path")
    +	 * @param name The name under which the file is registered.
    +	 */
    +	public void registerCachedFile(String filePath, String name){
    +		registerCachedFile(filePath, name, false);
    +	}
    +
    +	/**
    +	 * Registers a file at the distributed cache under the given name. The file will be accessible
    +	 * from any user-defined function in the (distributed) runtime under a local path. Files
    +	 * may be local files (as long as all relevant workers have access to it), or files in a distributed file system.
    +	 * The runtime will copy the files temporarily to a local cache, if needed.
    +	 * <p>
    +	 * The {@link org.apache.flink.api.common.functions.RuntimeContext} can be obtained inside UDFs via
    +	 * {@link org.apache.flink.api.common.functions.RichFunction#getRuntimeContext()} and provides access
    +	 * {@link org.apache.flink.api.common.cache.DistributedCache} via
    +	 * {@link org.apache.flink.api.common.functions.RuntimeContext#getDistributedCache()}.
    +	 *
    +	 * @param filePath The path of the file, as a URI (e.g. "file:///some/path" or "hdfs://host:port/and/path")
    +	 * @param name The name under which the file is registered.
    +	 * @param executable flag indicating whether the file should be executable
    +	 */
    +	public void registerCachedFile(String filePath, String name, boolean executable){
    --- End diff --
    
    missing space before `{`.



[GitHub] flink pull request #3741: [FLINK-6177] Add support for "Distributed Cache" i...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3741#discussion_r114503212
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java ---
    @@ -1776,8 +1787,45 @@ public static void setDefaultLocalParallelism(int parallelism) {
     	protected static void initializeContextEnvironment(StreamExecutionEnvironmentFactory ctx) {
     		contextEnvironmentFactory = ctx;
     	}
    -	
    +
     	protected static void resetContextEnvironment() {
     		contextEnvironmentFactory = null;
     	}
    +
    +	/**
    +	 * Registers a file at the distributed cache under the given name. The file will be accessible
    +	 * from any user-defined function in the (distributed) runtime under a local path. Files
    +	 * may be local files (as long as all relevant workers have access to it), or files in a distributed file system.
    +	 * The runtime will copy the files temporarily to a local cache, if needed.
    +	 * <p>
    +	 * The {@link org.apache.flink.api.common.functions.RuntimeContext} can be obtained inside UDFs via
    +	 * {@link org.apache.flink.api.common.functions.RichFunction#getRuntimeContext()} and provides access
    +	 * {@link org.apache.flink.api.common.cache.DistributedCache} via
    +	 * {@link org.apache.flink.api.common.functions.RuntimeContext#getDistributedCache()}.
    +	 *
    +	 * @param filePath The path of the file, as a URI (e.g. "file:///some/path" or "hdfs://host:port/and/path")
    +	 * @param name The name under which the file is registered.
    +	 */
    +	public void registerCachedFile(String filePath, String name){
    --- End diff --
    
    missing space before `{`.

